From cc069c64db88d42bf51c260bd40c257bb36fa5b3 Mon Sep 17 00:00:00 2001 From: Sam Stevens Date: Fri, 8 Aug 2014 21:46:02 +0100 Subject: [PATCH] More work on new server loop --- src/log.c | 2 + src/main-loop.c | 120 +++++++++++++++++++++++++++++++++++++++++++++--- src/main-loop.h | 31 +++++++++++-- src/queue.c | 8 ++++ src/queue.h | 1 + src/socket.c | 12 +---- src/socket.h | 1 - src/util.c | 15 +++++- 8 files changed, 168 insertions(+), 22 deletions(-) diff --git a/src/log.c b/src/log.c index a5d8e29..5fc058b 100644 --- a/src/log.c +++ b/src/log.c @@ -82,7 +82,9 @@ void*log_loop(void* arg) { time_t ctime; struct tm *tinfo = calloc(1,sizeof(struct tm)); while(true) { + //Read next message pointer from pipe if (read(l->pRead, buf, sizeof(void*)) <= 0) { + //zero length indicates the write end closed (EOF) if (l->running == false) { break; } diff --git a/src/main-loop.c b/src/main-loop.c index 9d7b87d..faa36e2 100644 --- a/src/main-loop.c +++ b/src/main-loop.c @@ -1,6 +1,11 @@ #include #include #include +#include +#include + +#include +#include #include "main-loop.h" #include "mime.h" @@ -8,6 +13,33 @@ #include "socket.h" #include "thread-pool.h" #include "queue.h" +#include "log.h" + +hmain_connection* hmain_connection_new(int fd, hmain_status *status) { + static u_int64_t nextid = 1; + + hmain_connection *conn = calloc(1, sizeof(hmain_connection)); + conn->cid = __atomic_fetch_add(&u_int64_t, 1, __ATOMIC_SEQ_CST); + conn->fd = fd; + conn->status = status; + conn->opened = time(NULL); + conn->last_activity = conn->opened; + conn->pending_responses = http_response_list_new(); + + return conn; +} +void hmain_connection_close(hmain_connection *conn) { + close(fd); + //TODO: remove from all queues +} +void hmain_connection_delete(hmain_connection *conn) { + http_response_list_delete(conn->pending_responses); + free(conn->clientaddr); + free(conn); +} + +#define EVENT_IS(event, type) ((event.events & type) == type) +#define EVENT_ISNOT(event, type) (!EVENT_IS(event, type)) void hmain_setup(hmain_status **statusptr) { hmain_status *status = *statusptr; @@ -16,6 +48,7 @@ void hmain_setup(hmain_status **statusptr) { fatal("hmain already setup"); } status = calloc(1, sizeof(hmain_status)); + status->shutdown = false; //Start Logging log_register_add(log_new("stderr", stderr), true, LALL & ~(LINFO|LDEBUG)); @@ -33,7 +66,6 @@ void hmain_setup(hmain_status **statusptr) { //Open our listening socket status->sfd = svr_create(); - svr_setnonblock(status->sfd); svr_listen(status->sfd, status->config->listen_port); //Open epoll for socket @@ -55,35 +87,48 @@ void hmain_setup(hmain_status **statusptr) { pool->min_threads = 1; pool->max_threads = 2; pool->func = thloop_read; - status->pool.read = pool; + status->pools[POOL_READ] = pool; + thread_pool_start(pool); pool = thread_pool_new("write", queue_new()); pool->min_threads = 1; pool->max_threads = 2; pool->func = thloop_write; - status->pool.write = pool; + status->pools[POOL_WRITE] = pool; + thread_pool_start(pool); pool = thread_pool_new("disk_read", queue_new()); pool->min_threads = 1; pool->max_threads = 2; pool->func = thloop_disk_read; - status->pool.disk_read = pool; + status->pools[POOL_DISK_READ] = pool; + thread_pool_start(pool); pool = thread_pool_new("worker", queue_new()); pool->min_threads = 1; pool->max_threads = 5; pool->func = thloop_worker; - status->pool.workers = pool; + status->pools[POOL_WORKERS] = pool; + thread_pool_start(pool); + } void hmain_teardown(hmain_status *status) { if (status == NULL) { fatal("hmain is not setup"); } + //Stop pools + size_t pool_count = sizeof(status->pools) / sizeof(status->pools[0]); + for(int i=0; ipools[i]); + queue_delete(status->pools[i]->queue); + thread_pool_delete(status->pools[i]); + } + //Close epoll close(status->epollfd); - //Close the listening connection + //Close the listening socket svr_release(status->sfd); //Cleanup the mime detector @@ -97,7 +142,70 @@ void hmain_teardown(hmain_status *status) { free(status); } void hmain_loop(hmain_status *status) { + struct epoll_event *current_event; + struct epoll_event *events = calloc(EPOLL_MAXEVENTS, sizeof(struct epoll_event)); + struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in)); + socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in); + + while(status->shutdown == false) { + int event_count = epoll_wait(status->epollfd, events, EPOLL_MAXEVENTS, 1000); + for(int i=0; idata.fd == status->sfd) { + //New connection(s) + while(true) { + struct epoll_event new_event; + int clientfd = accept4(status->sfd, (struct sockaddr*)clientaddr, clientaddr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); + if (clientfd < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + //Done with new connections + break; + } else { + warning(true, "Error accepting new connection"); + } + } + hmain_connection *conn = hmain_connection_new(clientfd, status); + conn->clientaddr = calloc(1, sizeof(struct sockaddr_in)); + memcpy(conn->clientaddr, clientaddr, clientaddr_len); + new_event.data.fd = clientfd; + new_event.data.ptr = conn; + new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; + if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, status->sfd, &new_event) < 0) { + fatal("Could not register new connection with epoll"); + } + } + } else { + //Data is ready to read on existing connection + hmain_connection *conn = (hmain_connection*)current_event->data.ptr; + if (conn->isReading == true) { + continue; + } + queue_add(status->pools[POOL_READ]->queue, queue_item_new2("READ", (void*)conn)); + } + } else if (EVENT_IS(current_event, EPOLLOUT)) { + //Data can be written to connection + hmain_connection *conn = (hmain_connection*)current_event->data.ptr; + if (conn->isWriting == true || conn->pending_responses == NULL) { + continue; + } + queue_add(status->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", (void*)conn)); + } + }//for events + }//while shutdown == false + free(events); + free(clientaddr); } void* thloop_read(void * arg) { diff --git a/src/main-loop.h b/src/main-loop.h index 710fe7c..b6c1fcf 100644 --- a/src/main-loop.h +++ b/src/main-loop.h @@ -8,22 +8,47 @@ #ifndef MAIN_LOOP_H #define MAIN_LOOP_H +#include +#include #include "config.h" #include "thread-pool.h" +#include "http.h" #ifdef __cplusplus extern "C" { #endif +#define EPOLL_MAXEVENTS 128 + + typedef enum hmain_pool { + POOL_READ, POOL_WRITE, POOL_WORKERS, POOL_DISK_READ + }; + typedef struct hmain_status { config_server *config; int sfd; int epollfd; - struct { - thread_pool *read, *write, *workers, *disk_read; - } pool; + thread_pool *pools[4]; + bool shutdown; + hmain_connection *connections; } hmain_status; + typedef struct hmain_connection { + uint64_t cid; + hmain_status *status; + int fd; + struct sockaddr_in* clientaddr; + bool isReading, isWriting; + time_t opened; + time_t last_activity; + http_response_list *pending_responses; + hmain_connection *next; + } hmain_connection; + + hmain_connection* hmain_connection_new(int fd, hmain_status *status); + void hmain_connection_close(hmain_connection *conn); + void hmain_connection_delete(hmain_connection *conn); + void hmain_setup(hmain_status **statusptr); void hmain_teardown(hmain_status *status); void hmain_loop(hmain_status *status); diff --git a/src/queue.c b/src/queue.c index a30dffa..0ae29d1 100644 --- a/src/queue.c +++ b/src/queue.c @@ -1,6 +1,7 @@ #include #include #include +#include #include "queue.h" #include "util.h" @@ -10,6 +11,13 @@ queue_item* queue_item_new() { queue_item *item = calloc(1, sizeof(queue_item)); return item; } +queue_item* queue_item_new2(char* tag, void* data) { + queue_item *item = queue_item_new(); + item->tag[0] = '\0'; + strncat(item->tag, tag, (sizeof(item->tag)/sizeof(char))+1); + item->data = data; + return item; +} void queue_item_delete(queue_item *item) { free(item); } diff --git a/src/queue.h b/src/queue.h index 99083ec..9a0ebae 100644 --- a/src/queue.h +++ b/src/queue.h @@ -24,6 +24,7 @@ extern "C" { } queue_item; queue_item* queue_item_new(); + queue_item* queue_item_new2(char* tag, void* data); void queue_item_delete(queue_item *item); typedef struct queue { diff --git a/src/socket.c b/src/socket.c index f1c7fab..94b222a 100644 --- a/src/socket.c +++ b/src/socket.c @@ -111,7 +111,7 @@ char* skt_clientaddr(skt_info *skt) { int svr_create() { int fd = 0; - fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0); + fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); if (fd < 0) { fatal("could not create socket"); } @@ -134,16 +134,6 @@ void svr_listen(int fd, uint16_t port) { } info("Listening on port %u", port); } -void svr_setnonblock(int fd) { - int flags = fcntl(fd, F_GETFL, 0); - if (flags < 0) { - fatal("failed to set nonblocking on server socket"); - } - flags |= O_NONBLOCK; - if (fcntl(fd, F_SETFL, flags) < 0) { - fatal("failed to set nonblocking on server socket"); - } -} void svr_release(int fd) { if (close(fd) < 0) { warning(true, "could not close socket"); diff --git a/src/socket.h b/src/socket.h index 22e911a..1233f9a 100644 --- a/src/socket.h +++ b/src/socket.h @@ -47,7 +47,6 @@ extern "C" { int svr_create(); void svr_listen(int fd, uint16_t port); - void svr_setnonblock(int fd); void svr_release(int fd); bool svr_canaccept(int fd); skt_info* svr_accept(int fd); diff --git a/src/util.c b/src/util.c index c86dbc7..f563c55 100644 --- a/src/util.c +++ b/src/util.c @@ -21,7 +21,20 @@ void fatal(char* fmt, ...) { vsnprintf(msg, LOG_LENGTH, fmt, va); va_end(va); - LOG(LFATAL, msg); + if (errno != 0) { + char *errnostr = calloc(64, sizeof(char)); + strerror_r(errno, errnostr, 64); + char *errstr = calloc(strlen(msg)+strlen(errnostr)+3, sizeof(char)); + strcat(errstr, msg); + strcat(errstr, ": "); + strcat(errstr, errnostr); + LOG(LFATAL, errstr); + free(errnostr); + free(errstr); + } else { + LOG(LFATAL, msg); + } + log_register_clear(); exit(EXIT_FAILURE); }