diff --git a/nbproject/Makefile-Debug.mk b/nbproject/Makefile-Debug.mk index ddf8bb8..aca0d44 100644 --- a/nbproject/Makefile-Debug.mk +++ b/nbproject/Makefile-Debug.mk @@ -43,7 +43,6 @@ OBJECTFILES= \ ${OBJECTDIR}/src/http-server.o \ ${OBJECTDIR}/src/http.o \ ${OBJECTDIR}/src/log.o \ - ${OBJECTDIR}/src/main-loop.o \ ${OBJECTDIR}/src/main.o \ ${OBJECTDIR}/src/mime.o \ ${OBJECTDIR}/src/queue.o \ @@ -124,11 +123,6 @@ ${OBJECTDIR}/src/log.o: nbproject/Makefile-${CND_CONF}.mk src/log.c ${RM} "$@.d" $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/log.o src/log.c -${OBJECTDIR}/src/main-loop.o: nbproject/Makefile-${CND_CONF}.mk src/main-loop.c - ${MKDIR} -p ${OBJECTDIR}/src - ${RM} "$@.d" - $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/main-loop.o src/main-loop.c - ${OBJECTDIR}/src/main.o: nbproject/Makefile-${CND_CONF}.mk src/main.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" diff --git a/nbproject/Makefile-Release.mk b/nbproject/Makefile-Release.mk index c8094fc..40274f1 100644 --- a/nbproject/Makefile-Release.mk +++ b/nbproject/Makefile-Release.mk @@ -43,7 +43,6 @@ OBJECTFILES= \ ${OBJECTDIR}/src/http-server.o \ ${OBJECTDIR}/src/http.o \ ${OBJECTDIR}/src/log.o \ - ${OBJECTDIR}/src/main-loop.o \ ${OBJECTDIR}/src/main.o \ ${OBJECTDIR}/src/mime.o \ ${OBJECTDIR}/src/queue.o \ @@ -124,11 +123,6 @@ ${OBJECTDIR}/src/log.o: nbproject/Makefile-${CND_CONF}.mk src/log.c ${RM} "$@.d" $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/log.o src/log.c -${OBJECTDIR}/src/main-loop.o: nbproject/Makefile-${CND_CONF}.mk src/main-loop.c - ${MKDIR} -p ${OBJECTDIR}/src - ${RM} "$@.d" - $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/main-loop.o src/main-loop.c - ${OBJECTDIR}/src/main.o: nbproject/Makefile-${CND_CONF}.mk src/main.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" diff --git a/nbproject/configurations.xml b/nbproject/configurations.xml index c6fe371..d36544f 100644 --- a/nbproject/configurations.xml +++ b/nbproject/configurations.xml @@ -11,9 +11,7 @@ src/http.h lib/http_parser.h lib/ini.h - src/khttp.h src/log.h - src/main-loop.h src/main.h src/mime.h src/queue.h @@ -42,7 +40,6 @@ lib/http_parser.c lib/ini.c src/log.c - src/main-loop.c src/main.c src/mime.c src/queue.c @@ -140,16 +137,10 @@ - - - - - - @@ -268,16 +259,10 @@ - - - - - - diff --git a/src/data-buffer.c b/src/data-buffer.c index 34b59d5..3254e48 100644 --- a/src/data-buffer.c +++ b/src/data-buffer.c @@ -36,7 +36,7 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n) BUFFER_LIST_WR_LOCK(list); int blocks = 1; - data_buffer *newbuf=NULL; + data_buffer *newbuf = data_buffer_new(DATA_BUFFER_SIZE); while(blocks * DATA_BUFFER_SIZE < n) { blocks++; LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE)); @@ -54,8 +54,8 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n) elem->wOffset += copy_count; } - LL_CONCAT(list->first, elem); - BUFFER_LIST_WR_LOCK(list); + LL_CONCAT(list->first, newbuf); + BUFFER_LIST_WR_UNLOCK(list); } void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) { assert(list != NULL); diff --git a/src/data-buffer.h b/src/data-buffer.h index b619300..f4b043e 100644 --- a/src/data-buffer.h +++ b/src/data-buffer.h @@ -15,18 +15,13 @@ extern "C" { #endif -#define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, false, true) -#define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, false, true) +#define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, true, true) +#define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, true, true) #define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false) #define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false) #define DATA_BUFFER_SIZE 16*1024 - typedef struct data_buffer_list { - struct data_buffer *first; - pthread_mutex_t *wrlock, *rdlock; - } data_buffer_list; - typedef struct data_buffer { char* buffer; size_t size; @@ -34,6 +29,11 @@ extern "C" { struct data_buffer *next; } data_buffer; + typedef struct data_buffer_list { + data_buffer *first; + pthread_mutex_t *wrlock, *rdlock; + } data_buffer_list; + data_buffer_list* data_buffer_list_new(); void data_buffer_list_delete(data_buffer_list *list); void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n); diff --git a/src/main-loop.c b/src/main-loop.c deleted file mode 100644 index e6eb0fb..0000000 --- a/src/main-loop.c +++ /dev/null @@ -1,292 +0,0 @@ -#include -#include -#include -#include -#include - -#include -#include -#include - -#include "mime.h" -#include "log.h" -#include "socket.h" -#include "thread-pool.h" -#include "http_parser.h" -#include "http.h" -#include "http-reader.h" -#include "server-socket.h" -#include "util.h" -#include "config.h" -#include "main-loop.h" - -hmain_connection* hmain_connection_new(int fd, hmain_status *status) { - static u_int64_t nextid = 1; - - hmain_connection *conn = calloc(1, sizeof(hmain_connection)); - conn->cid = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); - conn->fd = fd; - conn->status = status; - conn->opened = time(NULL); - conn->last_activity = conn->opened; - conn->pending_responses = http_response_list_new(); - conn->pending_write = data_buffer_list_new(); - conn->mutex = calloc(1, sizeof(pthread_mutex_t)); - pthread_mutex_init(conn->mutex, NULL); - - return conn; -} -void hmain_connection_close(hmain_connection *conn) { - close(conn->fd); - if (conn->pending_write != NULL) { - data_buffer_list_delete(conn->pending_write); - } - //TODO: remove from all queues -} -void hmain_connection_delete(hmain_connection *conn) { - LL_DELETE(conn->status->connections, conn); - http_response_list_delete(conn->pending_responses); - if (conn->pending_write != NULL) { - data_buffer_list_delete(conn->pending_write); - } - pthread_mutex_destroy(conn->mutex); - free(conn->mutex); - free(conn->clientaddr); - free(conn); -} - -#define EVENT_IS(event, type) ((event->events & type) == type) -#define EVENT_ISNOT(event, type) (!EVENT_IS(event, type)) - -void hmain_setup(hmain_status *status) { - status->shutdown = false; - - //Start Logging - log_register_add(log_new("stderr", stderr), true, LALL & ~(LINFO|LDEBUG)); - log_register_add(log_new("stdout", stdout), false, LDEBUG | LINFO); - - //Load mime types - mime_init(NULL); - - //Load the config - config_server *config = config_server_new(); - if (config_read_ini("khttpd.ini", config) < 0) { - fatal("Could not read config"); - } - status->config = config; - - //Open our listening socket - status->sfd = server_socket_create(); - server_socket_listen(status->sfd, status->config->listen_port); - - //Open epoll socket - status->epollfd = epoll_create1(0); - if (status->epollfd < 0) { - fatal("Failed to create epollfd"); - } - - //Register socket with epoll - struct epoll_event svr_event; - svr_event.data.fd = status->sfd; - svr_event.events = EPOLLIN | EPOLLET; - if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, status->sfd, &svr_event) < 0) { - fatal("Could not register server socket with epoll"); - } - - //Create thread pools/queues - thread_pool *pool = thread_pool_new("read", queue_new()); - pool->min_threads = 1; - pool->max_threads = 2; - pool->func = thloop_read; - thread_pool_start(pool); - - pool = thread_pool_new("write", queue_new()); - pool->min_threads = 1; - pool->max_threads = 2; - pool->func = thloop_write; - thread_pool_start(pool); - - pool = thread_pool_new("disk_read", queue_new()); - pool->min_threads = 1; - pool->max_threads = 2; - pool->func = thloop_disk_read; - thread_pool_start(pool); - - pool = thread_pool_new("worker", queue_new()); - pool->min_threads = 1; - pool->max_threads = 5; - pool->func = thloop_worker; - thread_pool_start(pool); - -} -void hmain_teardown(hmain_status *status) { - assert(status!=NULL); - - //Stop pools - size_t pool_count = sizeof(status->pools) / sizeof(status->pools[0]); - for(int i=0; ipools[i]); - queue_delete(status->pools[i]->queue); - thread_pool_delete(status->pools[i]); - } - - //Close all connections - hmain_connection *elem; - LL_FOREACH(status->connections, elem) { - hmain_connection_close(elem); - } - - //Close epoll - close(status->epollfd); - - //Close the listening socket - server_socket_release(status->sfd); - - //Cleanup the mime detector - mime_destroy(); - - //Stop and remove all loggers - log_register_clear(); - - //Delete the config and the status - config_server_delete(status->config); - free(status); -} -void hmain_loop(hmain_status *status) { - struct epoll_event *current_event; - struct epoll_event *events = calloc(EPOLL_MAXEVENTS, sizeof(struct epoll_event)); - - struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in)); - socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in); - - while(status->shutdown == false) { - int event_count = epoll_wait(status->epollfd, events, EPOLL_MAXEVENTS, 2000); - for(int i=0; idata.ptr; - hmain_connection_close(conn); - hmain_connection_delete(conn); - //TODO: close socket & cleanup - } else if (EVENT_IS(current_event, EPOLLIN)) { - if (current_event->data.fd == status->sfd) { - //New connection(s) - while(true) { - struct epoll_event new_event; - int clientfd = accept4(status->sfd, (struct sockaddr*)clientaddr, &clientaddr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); - if (clientfd < 0) { - if (errno == EAGAIN || errno == EWOULDBLOCK) { - //Done with new connections - break; - } else { - warning(true, "Error accepting new connection"); - } - } - hmain_connection *conn = hmain_connection_new(clientfd, status); - conn->clientaddr = calloc(1, sizeof(struct sockaddr_in)); - memcpy(conn->clientaddr, clientaddr, clientaddr_len); - new_event.data.fd = clientfd; - new_event.data.ptr = conn; - new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP; - if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, clientfd, &new_event) < 0) { - fatal("Could not register new connection with epoll"); - } - LL_APPEND(status->connections, conn); - } - } else { - //Data is ready to read on existing connection - hmain_connection *conn = (hmain_connection*)current_event->data.ptr; - if (conn->isReading == true) { - continue; - } - queue_add(status->pools[1]->queue, queue_item_new2("READ", (void*)conn)); - } - } else if (EVENT_IS(current_event, EPOLLOUT)) { - //Data can be written to connection - hmain_connection *conn = (hmain_connection*)current_event->data.ptr; - if (conn->isWriting == true || (conn->pending_responses == NULL && conn->pending_write == NULL)) { - continue; - } - queue_add(status->pools[2]->queue, queue_item_new2("WRITE", (void*)conn)); - } - }//for events - }//while shutdown == false - free(events); - free(clientaddr); -} - -void* thloop_read(void * arg) { - thread* th = (thread*)arg; - - while(th->stop==false) { - queue_item *item = queue_fetchone(th->pool->queue, true); - if (item == NULL) { - continue; - } - hmain_connection *conn = (hmain_connection*)item->data; - CONN_LOCK(conn); - - char buffer[16*1024]; - int count; - do { - count = read(conn->fd, buffer, sizeof(buffer)/sizeof(buffer[0])); - if (count < 0) { - if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { - //Socket error - warning(true, "failed to read from connection #%lu", conn->cid); - } - break; - } - if (count == 0) { - break; - } - - LOG(LDEBUG, "Read %zu", count); - - //conn->pending_write = data_pool_appendbuffer(conn->pending_write, conn->status->buffer_pool, buffer, count); - - queue_add(conn->status->pools[2]->queue, queue_item_new2("WRITE", (void*)conn)); - /*if (conn->parse_data == NULL) { - conn->parse_data = calloc(1, sizeof(hmain_parse_data)); - conn->parse_data->parser = calloc(1, sizeof(http_parser)); - http_parser_init(conn->parse_data->parser, HTTP_REQUEST); - conn->parse_data->parser->data = conn->parse_data; - conn->parse_data->parser_header_state = HSTATE_NONE; - } - */ - } while(count > 0); - CONN_UNLOCK(conn); - } -} -void* thloop_write(void * arg) { - thread* th = (thread*)arg; - - while(th->stop==false) { - queue_item *item = queue_fetchone(th->pool->queue, true); - if (item == NULL) { - continue; - } - hmain_connection *conn = (hmain_connection*)item->data; - do { - CONN_LOCK(conn); - - - - }while(0); - CONN_UNLOCK(conn); - } -} -void* thloop_disk_read(void * arg){ - -} -void* thloop_worker(void * arg) { - -} \ No newline at end of file diff --git a/src/main-loop.h b/src/main-loop.h deleted file mode 100644 index cebbc1a..0000000 --- a/src/main-loop.h +++ /dev/null @@ -1,81 +0,0 @@ -/* - * File: main-loop.h - * Author: sam - * - * Created on 07 August 2014, 18:44 - */ - -#ifndef MAIN_LOOP_H -#define MAIN_LOOP_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include - - #define EPOLL_MAXEVENTS 128 - -#define CONN_LOCK(c) pthread_mutex_lock(c->mutex) -#define CONN_UNLOCK(c) pthread_mutex_unlock(c->mutex) - - typedef enum hmain_pool { - POOL_READA, POOL_WRITEA, POOL_WORKERSA, POOL_DISK_READA - } hmain_pool; - - //typedef enum skt_elem_hstate {HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD} skt_elem_hstate; - - typedef struct hmain_connection { - uint64_t cid; - struct hmain_status *status; - int fd; - struct sockaddr_in* clientaddr; - bool isReading, isWriting; - time_t opened; - time_t last_activity; - struct hmain_parse_data *parse_data; - http_response_list *pending_responses; - data_buffer_list *pending_write; - struct hmain_connection *next; - pthread_mutex_t *mutex; - - } hmain_connection; - - typedef struct hmain_status { - config_server *config; - int sfd; - int epollfd; - thread_pool *pools[4]; - bool shutdown; - hmain_connection *connections; - } hmain_status; - - typedef struct hmain_parse_data { - http_request *current_request; - bool request_complete; - http_parser *parser; - http_header *parser_current_header; - int parser_header_state; - } hmain_parse_data; - - hmain_connection* hmain_connection_new(int fd, hmain_status *status); - void hmain_connection_close(hmain_connection *conn); - void hmain_connection_delete(hmain_connection *conn); - - void hmain_setup(hmain_status *status); - void hmain_teardown(hmain_status *status); - void hmain_loop(hmain_status *status); - - void* thloop_read(void * arg); - void* thloop_write(void * arg); - void* thloop_disk_read(void * arg); - void* thloop_worker(void * arg); - -#ifdef __cplusplus -} -#endif - -#endif /* MAIN_LOOP_H */ - diff --git a/src/main.c b/src/main.c index dbd544b..5bfae3f 100644 --- a/src/main.c +++ b/src/main.c @@ -17,10 +17,23 @@ #include #include +#include "util.h" #include "main.h" +#include "server.h" +#include "server-state.h" int main(int argc, char** argv) { + //Load the config + config_server *config = config_server_new(); + if (config_read_ini(DEFAULT_CONFIG_FILE, config) < 0) { + fatal("Could not read config"); + } + + server_state *state = server_status_new(config); + + //Run the server + server_start(state); return (EXIT_SUCCESS); } \ No newline at end of file diff --git a/src/queue.c b/src/queue.c index 2e4cd0b..c81f6ff 100644 --- a/src/queue.c +++ b/src/queue.c @@ -156,6 +156,25 @@ void queue_unblock(queue *q, uint64_t itemid) { } QUEUE_UNLOCK(q); } +size_t queue_unblock_byptr(queue *q, void* ptr) { + assert(q!=NULL); + + size_t count = 0; + uint64_t itemid = 0; + queue_item *elem; + QUEUE_LOCK(q); + LL_FOREACH(q->list, elem) { + if (elem->data == ptr) { + elem->blocked = false; + count++; + } + } + if (count > 0) { + pthread_cond_signal(q->cond); + } + QUEUE_UNLOCK(q); + return count; +} void queue_clear(queue *q) { assert(q!=NULL); QUEUE_LOCK(q); diff --git a/src/queue.h b/src/queue.h index aa1da1d..8c8ddaf 100644 --- a/src/queue.h +++ b/src/queue.h @@ -93,6 +93,7 @@ extern "C" { int queue_remove_byptr(queue *q, void* ptr); queue_item* queue_fetchone(queue *q, bool blocking); void queue_unblock(queue *q, uint64_t itemid); + size_t queue_unblock_byptr(queue *q, void* ptr); void queue_clear(queue *q); void queue_ping(queue *q); size_t queue_count(queue *q); diff --git a/src/server-connection.c b/src/server-connection.c index b848991..0ba92db 100644 --- a/src/server-connection.c +++ b/src/server-connection.c @@ -7,20 +7,23 @@ #include "http.h" #include "socket.h" +#include "server-state.h" #include "server-connection.h" -server_connection* server_connection_new(socket_info *skt) { +server_connection* server_connection_new(socket_info *skt, server_state *state) { static uint64_t nextid = 1; assert(skt!=NULL); + assert(state!=NULL); server_connection *conn = calloc(1, sizeof(server_connection)); conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); conn->last_activity = time(NULL); + conn->server = state; conn->parse_state = NULL; conn->pending_responses = http_response_list_new(); conn->pending_writes = data_buffer_list_new(); conn->skt = skt; - conn->write_qid = 0; + pthread_mutex_init(&conn->mutex, NULL); return conn; } void server_connection_delete(server_connection *conn) { @@ -29,5 +32,6 @@ void server_connection_delete(server_connection *conn) { http_response_list_delete(conn->pending_responses); data_buffer_list_delete(conn->pending_writes); skt_delete(conn->skt); + pthread_mutex_destroy(&conn->mutex); free(conn); } \ No newline at end of file diff --git a/src/server-connection.h b/src/server-connection.h index 8632b6d..475cf75 100644 --- a/src/server-connection.h +++ b/src/server-connection.h @@ -14,11 +14,16 @@ extern "C" { #include #include +#include #include "data-buffer.h" #include "http.h" #include "http-reader.h" +#include "server-state.h" #include "socket.h" + +#define CONN_LOCK(c) pthread_mutex_lock(&c->mutex) +#define CONN_UNLOCK(c) pthread_mutex_unlock(&c->mutex) typedef struct server_parse_status { http_request *current_request; @@ -32,14 +37,15 @@ extern "C" { uint64_t id; struct socket_info *skt; time_t last_activity; + server_state *server; http_response_list *pending_responses; data_buffer_list *pending_writes; - uint64_t write_qid;//item id in write queue - server_parse_status *parse_state; + server_parse_status *parse_state; struct server_connection *next; + pthread_mutex_t mutex; } server_connection; - server_connection* server_connection_new(socket_info *skt); + server_connection* server_connection_new(socket_info *skt, server_state *state); void server_connection_delete(server_connection *conn); #ifdef __cplusplus diff --git a/src/server-loop-read.c b/src/server-loop-read.c index 756ef8f..2028dfc 100644 --- a/src/server-loop-read.c +++ b/src/server-loop-read.c @@ -1,15 +1,57 @@ #include #include #include +#include +#include #include "util.h" #include "log.h" #include "config.h" +#include "data-buffer.h" +#include "queue.h" #include "server-connection.h" #include "server-state.h" #include "server-loop.h" void* server_loop_read(void* arg) { + thread *th = (thread*)arg; + while(th->stop == false) { + queue_item *item = queue_fetchone(th->pool->queue, true); + if (item == NULL) { + continue; + } + server_connection *conn = (server_connection*)item->data; + CONN_LOCK(conn); + ssize_t count=0, totalread=0; + char readbuf[16*1024]; + while(true) { + count = read(conn->skt->fd, readbuf, sizeof(readbuf)/sizeof(readbuf[0])); + if (count < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + item->blocked = true; + break; + } else { + if (errno != EBADF) { + char address[INET_ADDRSTRLEN]; + skt_clientaddr(conn->skt, address, INET_ADDRSTRLEN); + warning(true, "[#%lu %s] read error", conn->id, address); + } + conn->skt->error = true; + break; + } + } + if (count == 0) { + break; + } + totalread += count; + data_buffer_list_append(conn->pending_writes, readbuf, count); + } + if (totalread > 0) { + queue_add(conn->server->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", conn)); + } + CONN_UNLOCK(conn); + queue_return_item(th->pool->queue, item, item->blocked == false); + } } \ No newline at end of file diff --git a/src/server-loop-write.c b/src/server-loop-write.c index 1eb03b4..1694fbd 100644 --- a/src/server-loop-write.c +++ b/src/server-loop-write.c @@ -1,15 +1,62 @@ #include #include #include +#include +#include + +#include "ut/utlist.h" #include "util.h" #include "log.h" #include "config.h" +#include "data-buffer.h" #include "server-connection.h" #include "server-state.h" #include "server-loop.h" void* server_loop_write(void* arg) { + thread *th = (thread*)arg; + while(th->stop == false) { + queue_item *item = queue_fetchone(th->pool->queue, true); + if (item == NULL) { + continue; + } + server_connection *conn = (server_connection*)item->data; + CONN_LOCK(conn); + + size_t count = 0; + while(conn->pending_writes->first != NULL) { + BUFFER_LIST_RD_LOCK(conn->pending_writes); + data_buffer *next = conn->pending_writes->first; + + count = write(conn->skt->fd, next->buffer+next->rOffset, next->wOffset - next->rOffset); + if (count < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + item->blocked = true; + break; + } else { + if (errno != EBADF) { + char address[INET_ADDRSTRLEN]; + skt_clientaddr(conn->skt, address, INET_ADDRSTRLEN); + warning(true, "[#%lu %s] write error", conn->id, address); + } + conn->skt->error = true; + break; + } + } + next->rOffset += count; + + if (next->rOffset >= next->wOffset) { + LL_DELETE(conn->pending_writes->first, next); + data_buffer_free(next); + } + + BUFFER_LIST_RD_UNLOCK(conn->pending_writes); + } + + CONN_UNLOCK(conn); + queue_return_item(th->pool->queue, item, item->blocked == false); + } } \ No newline at end of file diff --git a/src/server-loop.c b/src/server-loop.c index f27734e..7de95a3 100644 --- a/src/server-loop.c +++ b/src/server-loop.c @@ -15,29 +15,41 @@ #include "ut/utlist.h" #include "server-socket.h" -void server_loop(server_status *state) { +void server_loop(server_state *state) { assert(state!=NULL); assert(state->started==true); struct epoll_event *current_event; struct epoll_event *events = alloca(sizeof(struct epoll_event)*EP_MAXEVENTS); - struct server_connection *conn; + server_connection *conn; while(state->shutdown_requested == false) { int event_count = epoll_wait(state->epollfd, events, EP_MAXEVENTS, EP_WAIT_TIME); for(int i=0; idata.ptr != NULL ? EP_CONN(current_event) : NULL; - if (EP_EVENT_IS (current_event, EPOLLERR | EPOLLHUP) || - EP_EVENT_ISNOT(current_event, EPOLLIN | EPOLLOUT | EPOLLRDHUP)) { - if (conn == NULL) { - fatal("Unexpected error on unknown socket"); - } else if (conn->skt->fd == state->sfd) { - LOG(LWARNING, "Error/Unexpected event on server socket"); + conn = NULL; + server_connection *conn_elem; + LL_FOREACH(state->clients, conn_elem) { + if (conn_elem == current_event->data.ptr) { + conn = EP_CONN(current_event); + } + } + + if (EP_EVENT_IS(current_event, EPOLLERR) || + EP_EVENT_IS(current_event, EPOLLERR) || + ( + EP_EVENT_ISNOT(current_event, EPOLLIN) && + EP_EVENT_ISNOT(current_event, EPOLLOUT) && + EP_EVENT_ISNOT(current_event, EPOLLRDHUP) + ) + ) { + if (current_event->data.fd == state->sfd) { + LOG(LERROR, "Error/Unexpected event on server socket"); state->shutdown_requested = true; break; } else { + assert(conn!=NULL); char* conn_addr = calloc(INET_ADDRSTRLEN, sizeof(char)); skt_clientaddr(conn->skt, conn_addr, INET_ADDRSTRLEN); LOG(LWARNING, "Error on socket %lu [%s]", conn->id, conn_addr); @@ -48,39 +60,48 @@ void server_loop(server_status *state) { assert(conn != NULL); skt_close(conn->skt); } else if (EP_EVENT_IS(current_event, EPOLLIN)) { - assert(conn != NULL); - if (conn->skt->fd == state->sfd) { + if (current_event->data.fd == state->sfd) { //New connections while (true) { socket_info *skt = server_socket_accept(state->sfd, SOCK_NONBLOCK | SOCK_CLOEXEC); if (skt == NULL) { break; } - server_connection *client = server_connection_new(skt); + server_connection *client = server_connection_new(skt, state); + struct epoll_event new_event; new_event.data.ptr = client; - new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; - if (epoll_ctl(state->epollfd, EPOLL_CTL_ADD, client->skt->fd, &new_event) < 0) { + new_event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET; + if (epoll_ctl(state->epollfd, EPOLL_CTL_ADD, skt->fd, &new_event) < 0) { warning(true, "Failed to add connection to epoll"); skt_close(client->skt); server_connection_delete(client); + } else { + LL_APPEND(state->clients, client); + queue_add(state->pools[POOL_READ]->queue, queue_item_new2("READ", (void*)client)); } - LL_APPEND(state->clients, client); } } else { assert(conn != NULL); //Data available on connection - queue_add(state->pools[POOL_READ]->queue, (void*)conn); + if (queue_unblock_byptr(state->pools[POOL_READ]->queue, (void*)conn) == 0) { + queue_add(state->pools[POOL_READ]->queue, queue_item_new2("READ", (void*)conn)); + } } } else if (EP_EVENT_IS(current_event, EPOLLOUT)) { //Connection is now available to write to - queue_add(state->pools[POOL_WRITE]->queue, (void*)conn); + if (queue_unblock_byptr(state->pools[POOL_READ]->queue, (void*)conn) == 0) { + queue_add(state->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", (void*)conn)); + } } } //Clean up closed connections server_connection *elem, *tmp; - LL_FOREACH(state->clients, elem) { + LL_FOREACH_SAFE(state->clients, elem, tmp) { + if (elem->skt->error == true) { + skt_close(elem->skt); + } if (elem->skt->closed != true) { continue; } @@ -90,6 +111,10 @@ void server_loop(server_status *state) { for(int i=0; i < THREADPOOL_NUM; i++) { queue_pending_waitptr(state->pools[i]->queue, elem); } + LL_DELETE(state->clients, elem); + char address[INET_ADDRSTRLEN] = {0}; + skt_clientaddr(elem->skt, address, INET_ADDRSTRLEN); + LOG(LINFO, "[#%lu %s] Connection Closed", elem->id, address); server_connection_delete(elem); } } diff --git a/src/server-loop.h b/src/server-loop.h index e95c834..11442be 100644 --- a/src/server-loop.h +++ b/src/server-loop.h @@ -18,10 +18,10 @@ extern "C" { #define EP_WAIT_TIME 2000 #define EP_CONN(event) (server_connection*)event->data.ptr -#define EP_EVENT_IS(event, type) ((event->events & type) != 0) +#define EP_EVENT_IS(event, type) ((event->events & type) == type) #define EP_EVENT_ISNOT(event, type) (!EP_EVENT_IS(event, type)) - void server_loop(server_status *state); + void server_loop(server_state *state); void* server_loop_read(void* arg); void* server_loop_write(void* arg); void* server_loop_worker(void* arg); diff --git a/src/server-socket.c b/src/server-socket.c index 7e1e803..e92ac66 100644 --- a/src/server-socket.c +++ b/src/server-socket.c @@ -57,7 +57,7 @@ void server_socket_listen_epoll(int fd, uint16_t port, int *out_epfd) { } //Register socket with epoll - struct epoll_event svr_event; + struct epoll_event svr_event = {0}; svr_event.data.fd = fd; svr_event.events = EPOLLIN | EPOLLET; if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &svr_event) < 0) { @@ -78,7 +78,10 @@ socket_info* server_socket_accept(int fd, int flags) { int clientfd=0; socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in); clientfd = accept4(fd, (struct sockaddr*)clientaddr, &clientaddr_len, flags); - if (clientfd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + if (clientfd < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) { + return NULL; + } warning(true, "error accepting connection"); return NULL; } diff --git a/src/server-state.c b/src/server-state.c index 6ba3aa3..fb387a9 100644 --- a/src/server-state.c +++ b/src/server-state.c @@ -10,14 +10,14 @@ #include "server-state.h" #include "queue.h" #include "thread-pool.h" -#include "main-loop.h" +#include "server-connection.h" -server_status* server_status_new(config_server *config) { +server_state* server_status_new(config_server *config) { assert(config!=NULL); assert(config->host_count>0); assert(config->listen_port>0); - server_status *status = calloc(1, sizeof(server_status)); + server_state *status = calloc(1, sizeof(server_state)); status->started = false; status->stopped = true; status->shutdown_requested = false; @@ -27,7 +27,7 @@ server_status* server_status_new(config_server *config) { return status; } -void server_status_delete(server_status *status) { +void server_status_delete(server_state *status) { assert(status!=NULL); assert(status->stopped==true); assert(status->pools[0]==NULL); @@ -41,7 +41,7 @@ void server_status_delete(server_status *status) { free(status); } -void server_start_pools(server_status *status, thread_func pool_functions[]) { +void server_start_pools(server_state *status, thread_func pool_functions[]) { assert(status!=NULL); assert(status->pools[0]==NULL); @@ -67,7 +67,7 @@ void server_start_pools(server_status *status, thread_func pool_functions[]) { status->pools[POOL_WORKER] = pool; thread_pool_start(pool); } -void server_stop_pools(server_status *status) { +void server_stop_pools(server_state *status) { assert(status!=NULL); assert(status->pools[0]!=NULL); diff --git a/src/server-state.h b/src/server-state.h index 9c57397..447e1c9 100644 --- a/src/server-state.h +++ b/src/server-state.h @@ -15,27 +15,26 @@ extern "C" { #include "http.h" #include "config.h" #include "thread-pool.h" -#include "server-connection.h" typedef enum server_pool { POOL_READ, POOL_WRITE, POOL_WORKER, /*{*/THREADPOOL_NUM/*}must be last*/ } server_pool; - typedef struct server_status { + typedef struct server_state { config_server *config; bool started, stopped; bool shutdown_requested; int sfd; int epollfd; thread_pool *pools[THREADPOOL_NUM]; - server_connection *clients; - } server_status; + struct server_connection *clients; + } server_state; - server_status* server_status_new(config_server *config); - void server_status_delete(server_status *status); + server_state* server_status_new(config_server *config); + void server_status_delete(server_state *status); - void server_start_pools(server_status *status, thread_func pool_functions[]); - void server_stop_pools(server_status *status); + void server_start_pools(server_state *status, thread_func pool_functions[]); + void server_stop_pools(server_state *status); #ifdef __cplusplus } diff --git a/src/server.c b/src/server.c index 020fe53..1ce3034 100644 --- a/src/server.c +++ b/src/server.c @@ -21,7 +21,7 @@ #include "server-loop.h" #include "server.h" -void server_start(server_status *status, const char* config_file) { +void server_start(server_state *status) { assert(status!=NULL); assert(status->stopped==true); @@ -34,16 +34,9 @@ void server_start(server_status *status, const char* config_file) { //Load mime types mime_init(NULL); - //Load the config - config_server *config = config_server_new(); - if (config_read_ini(config_file, config) < 0) { - fatal("Could not read config"); - } - status->config = config; - //Open the server socket status->sfd = server_socket_create(); - server_socket_listen_epoll(status->sfd, config->listen_port, &status->epollfd); + server_socket_listen_epoll(status->sfd, status->config->listen_port, &status->epollfd); //Start thread pools thread_func pool_functions[] = { @@ -62,7 +55,7 @@ void server_start(server_status *status, const char* config_file) { //Cleanup after the loop exits server_teardown(status); } -void server_teardown(server_status *status) { +void server_teardown(server_state *status) { assert(status!=NULL); assert(status->stopped==true); diff --git a/src/server.h b/src/server.h index d7c8e35..76370ce 100644 --- a/src/server.h +++ b/src/server.h @@ -16,8 +16,8 @@ extern "C" { #include "server-state.h" - void server_start(server_status *status, const char* config_file); - void server_teardown(server_status *status); + void server_start(server_state *status); + void server_teardown(server_state *status); #ifdef __cplusplus } diff --git a/src/socket.c b/src/socket.c index fda2cef..fb30b01 100644 --- a/src/socket.c +++ b/src/socket.c @@ -113,7 +113,9 @@ int skt_data_buffer(socket_info *skt, data_buffer_list *list) { void skt_close(socket_info* skt) { assert(skt != NULL); if (close(skt->fd) < 0) { - warning(true, "error closing socket"); + if (errno != EBADF) { + warning(true, "error closing socket"); + } } skt->closed = true; } diff --git a/src/util.c b/src/util.c index ceac25f..a35bc2d 100644 --- a/src/util.c +++ b/src/util.c @@ -58,7 +58,7 @@ void warning(bool use_errno, char* fmt, ...) { strcat(errstr, msg); strcat(errstr, ": "); strcat(errstr, errnostr); - LOG(LFATAL, errstr); + LOG(LWARNING, errstr); free(errnostr_buf); free(errstr); } else {