diff --git a/src/main-loop.h b/src/main-loop.h index e1d687e..cebbc1a 100644 --- a/src/main-loop.h +++ b/src/main-loop.h @@ -16,7 +16,7 @@ extern "C" { #include #include -#define EPOLL_MAXEVENTS 128 + #define EPOLL_MAXEVENTS 128 #define CONN_LOCK(c) pthread_mutex_lock(c->mutex) #define CONN_UNLOCK(c) pthread_mutex_unlock(c->mutex) diff --git a/src/queue.c b/src/queue.c index c0dbb8f..2e4cd0b 100644 --- a/src/queue.c +++ b/src/queue.c @@ -10,7 +10,7 @@ #include "log.h" queue_item* queue_item_new() { - static uint64_t nextid = 0; + static uint64_t nextid = 1; queue_item *item = calloc(1, sizeof(queue_item)); item->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); @@ -131,6 +131,7 @@ queue_item* queue_fetchone(queue *q, bool blocking) { //Add to processing list queue_pending_item *token = calloc(1, sizeof(queue_pending_item)); token->qid = item->id; + token->data = item->data; LL_APPEND(q->processing, token); } } @@ -206,7 +207,8 @@ void queue_return_item(queue *q, queue_item *item, bool finished) { pthread_cond_broadcast(q->processing_cond); QUEUE_UNLOCK(q); } -void queue_waitfor_pending(queue *q, uint64_t itemid) { +void queue_pending_wait(queue *q, uint64_t itemid) { + assert(q!=NULL); QUEUE_LOCK(q); bool found; @@ -217,4 +219,23 @@ void queue_waitfor_pending(queue *q, uint64_t itemid) { QUEUE_HAS_PENDING(q, itemid, found); } QUEUE_UNLOCK(q); +} +void queue_pending_waitptr(queue *q, void* ptr) { + uint64_t itemid; + + queue_pending_item *elem; + do { + QUEUE_LOCK(q); + itemid = 0; + LL_FOREACH(q->processing, elem) { + if (elem->data == ptr) { + itemid = elem->qid; + break; + } + } + QUEUE_UNLOCK(q); + if (itemid > 0) { + queue_pending_wait(q, itemid); + } + } while(itemid > 0); } \ No newline at end of file diff --git a/src/queue.h b/src/queue.h index 6f517db..aa1da1d 100644 --- a/src/queue.h +++ b/src/queue.h @@ -73,6 +73,7 @@ extern "C" { typedef struct queue_pending_item { uint64_t qid; + void * data; struct queue_pending_item *next; } queue_pending_item; @@ -97,6 +98,7 @@ extern "C" { size_t queue_count(queue *q); void queue_return_item(queue *q, queue_item *item, bool finished); void queue_pending_wait(queue *q, uint64_t itemid); + void queue_pending_waitptr(queue *q, void* ptr); #ifdef __cplusplus } diff --git a/src/server-loop.c b/src/server-loop.c index 1cd4590..f27734e 100644 --- a/src/server-loop.c +++ b/src/server-loop.c @@ -1,6 +1,9 @@ #include #include #include +#include +#include +#include #include "util.h" #include "log.h" @@ -8,7 +11,89 @@ #include "server-connection.h" #include "server-state.h" +#include "server-loop.h" +#include "ut/utlist.h" +#include "server-socket.h" void server_loop(server_status *state) { + assert(state!=NULL); + assert(state->started==true); + struct epoll_event *current_event; + struct epoll_event *events = alloca(sizeof(struct epoll_event)*EP_MAXEVENTS); + struct server_connection *conn; + + while(state->shutdown_requested == false) { + int event_count = epoll_wait(state->epollfd, events, EP_MAXEVENTS, EP_WAIT_TIME); + for(int i=0; idata.ptr != NULL ? EP_CONN(current_event) : NULL; + + if (EP_EVENT_IS (current_event, EPOLLERR | EPOLLHUP) || + EP_EVENT_ISNOT(current_event, EPOLLIN | EPOLLOUT | EPOLLRDHUP)) { + if (conn == NULL) { + fatal("Unexpected error on unknown socket"); + } else if (conn->skt->fd == state->sfd) { + LOG(LWARNING, "Error/Unexpected event on server socket"); + state->shutdown_requested = true; + break; + } else { + char* conn_addr = calloc(INET_ADDRSTRLEN, sizeof(char)); + skt_clientaddr(conn->skt, conn_addr, INET_ADDRSTRLEN); + LOG(LWARNING, "Error on socket %lu [%s]", conn->id, conn_addr); + free(conn_addr); + skt_close(conn->skt); + } + } else if (EP_EVENT_IS(current_event, EPOLLRDHUP)) { + assert(conn != NULL); + skt_close(conn->skt); + } else if (EP_EVENT_IS(current_event, EPOLLIN)) { + assert(conn != NULL); + if (conn->skt->fd == state->sfd) { + //New connections + while (true) { + socket_info *skt = server_socket_accept(state->sfd, SOCK_NONBLOCK | SOCK_CLOEXEC); + if (skt == NULL) { + break; + } + server_connection *client = server_connection_new(skt); + struct epoll_event new_event; + new_event.data.ptr = client; + new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; + if (epoll_ctl(state->epollfd, EPOLL_CTL_ADD, client->skt->fd, &new_event) < 0) { + warning(true, "Failed to add connection to epoll"); + skt_close(client->skt); + server_connection_delete(client); + } + LL_APPEND(state->clients, client); + } + } else { + assert(conn != NULL); + //Data available on connection + queue_add(state->pools[POOL_READ]->queue, (void*)conn); + } + } else if (EP_EVENT_IS(current_event, EPOLLOUT)) { + //Connection is now available to write to + queue_add(state->pools[POOL_WRITE]->queue, (void*)conn); + } + } + + //Clean up closed connections + server_connection *elem, *tmp; + LL_FOREACH(state->clients, elem) { + if (elem->skt->closed != true) { + continue; + } + for(int i=0; i < THREADPOOL_NUM; i++) { + queue_remove_byptr(state->pools[i]->queue, elem); + } + for(int i=0; i < THREADPOOL_NUM; i++) { + queue_pending_waitptr(state->pools[i]->queue, elem); + } + server_connection_delete(elem); + } + } + state->shutdown_requested = false; + state->started = false; + state->stopped = true; } \ No newline at end of file diff --git a/src/server-loop.h b/src/server-loop.h index 33c29e2..e95c834 100644 --- a/src/server-loop.h +++ b/src/server-loop.h @@ -14,9 +14,12 @@ extern "C" { #include "server-state.h" +#define EP_MAXEVENTS 128 +#define EP_WAIT_TIME 2000 + #define EP_CONN(event) (server_connection*)event->data.ptr -#define EP_EVENT_IS(event, type) ((event->events & type) == type) -#define EP_EVENT_ISNOT(event, type) (!EVENT_IS(event, type)) +#define EP_EVENT_IS(event, type) ((event->events & type) != 0) +#define EP_EVENT_ISNOT(event, type) (!EP_EVENT_IS(event, type)) void server_loop(server_status *state); void* server_loop_read(void* arg); diff --git a/src/server-socket.c b/src/server-socket.c index fa504b4..7e1e803 100644 --- a/src/server-socket.c +++ b/src/server-socket.c @@ -87,8 +87,10 @@ socket_info* server_socket_accept(int fd, int flags) { skt->clientaddr = clientaddr; skt->fd = clientfd; - info("[#%lu %s] New Connection", skt->id, skt_clientaddr(skt)); + char * address = calloc(INET_ADDRSTRLEN, sizeof(char)); + skt_clientaddr(skt, address, INET_ADDRSTRLEN); + info("[#%lu %s] New Connection", skt->id, address); + free(address); return skt; - } \ No newline at end of file diff --git a/src/socket.c b/src/socket.c index 1f5c3f7..fda2cef 100644 --- a/src/socket.c +++ b/src/socket.c @@ -28,6 +28,7 @@ socket_info* skt_new(int fd) { socket_info* skt = calloc(1, sizeof(socket_info)); skt->id = skt_nextid(); skt->fd = fd; + skt->closed = false; skt->time_opened = time(NULL); skt->error = false; skt->clientaddr = NULL; @@ -114,14 +115,16 @@ void skt_close(socket_info* skt) { if (close(skt->fd) < 0) { warning(true, "error closing socket"); } + skt->closed = true; } -const char* skt_clientaddr(socket_info *skt) { +char* skt_clientaddr(socket_info *skt, char* address, size_t address_len) { assert(skt != NULL); - char *tmp = calloc(INET_ADDRSTRLEN, sizeof(char)); - const char* address = inet_ntop(AF_INET, &skt->clientaddr->sin_addr, tmp, INET_ADDRSTRLEN); + assert(address != NULL); + assert(address_len >= INET_ADDRSTRLEN); + inet_ntop(AF_INET, &skt->clientaddr->sin_addr, address, address_len); if (address == NULL) { warning(true, "error fetching client address"); - free(tmp); + free(address); } return address; } \ No newline at end of file diff --git a/src/socket.h b/src/socket.h index e4fa36f..e1d41e6 100644 --- a/src/socket.h +++ b/src/socket.h @@ -23,6 +23,7 @@ extern "C" { typedef struct socket_info { u_int64_t id; int fd; + bool closed; struct sockaddr_in* clientaddr; time_t time_opened; bool error; @@ -37,7 +38,7 @@ extern "C" { size_t skt_write(socket_info* skt, char* data, size_t len); int skt_write_data_buffer(socket_info *skt, data_buffer_list *list); void skt_close(socket_info *skt); - const char* skt_clientaddr(socket_info *skt); + char* skt_clientaddr(socket_info *skt, char* address, size_t address_len); #ifdef __cplusplus }