diff --git a/nbproject/Makefile-Debug.mk b/nbproject/Makefile-Debug.mk index b93a934..2642742 100644 --- a/nbproject/Makefile-Debug.mk +++ b/nbproject/Makefile-Debug.mk @@ -38,6 +38,7 @@ OBJECTFILES= \ ${OBJECTDIR}/lib/http_parser.o \ ${OBJECTDIR}/lib/ini.o \ ${OBJECTDIR}/src/config.o \ + ${OBJECTDIR}/src/data-buffer.o \ ${OBJECTDIR}/src/http-reader.o \ ${OBJECTDIR}/src/http-server.o \ ${OBJECTDIR}/src/http.o \ @@ -46,6 +47,7 @@ OBJECTFILES= \ ${OBJECTDIR}/src/main.o \ ${OBJECTDIR}/src/mime.o \ ${OBJECTDIR}/src/queue.o \ + ${OBJECTDIR}/src/server-socket.o \ ${OBJECTDIR}/src/socket.o \ ${OBJECTDIR}/src/thread-pool.o \ ${OBJECTDIR}/src/util.o @@ -90,6 +92,11 @@ ${OBJECTDIR}/src/config.o: nbproject/Makefile-${CND_CONF}.mk src/config.c ${RM} "$@.d" $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/config.o src/config.c +${OBJECTDIR}/src/data-buffer.o: nbproject/Makefile-${CND_CONF}.mk src/data-buffer.c + ${MKDIR} -p ${OBJECTDIR}/src + ${RM} "$@.d" + $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/data-buffer.o src/data-buffer.c + ${OBJECTDIR}/src/http-reader.o: nbproject/Makefile-${CND_CONF}.mk src/http-reader.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" @@ -130,6 +137,11 @@ ${OBJECTDIR}/src/queue.o: nbproject/Makefile-${CND_CONF}.mk src/queue.c ${RM} "$@.d" $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/queue.o src/queue.c +${OBJECTDIR}/src/server-socket.o: nbproject/Makefile-${CND_CONF}.mk src/server-socket.c + ${MKDIR} -p ${OBJECTDIR}/src + ${RM} "$@.d" + $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-socket.o src/server-socket.c + ${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" diff --git a/nbproject/Makefile-Release.mk b/nbproject/Makefile-Release.mk index f01dbb9..22190b0 100644 --- a/nbproject/Makefile-Release.mk +++ b/nbproject/Makefile-Release.mk @@ -38,6 +38,7 @@ OBJECTFILES= \ ${OBJECTDIR}/lib/http_parser.o \ ${OBJECTDIR}/lib/ini.o \ ${OBJECTDIR}/src/config.o \ + ${OBJECTDIR}/src/data-buffer.o \ ${OBJECTDIR}/src/http-reader.o \ ${OBJECTDIR}/src/http-server.o \ ${OBJECTDIR}/src/http.o \ @@ -46,6 +47,7 @@ OBJECTFILES= \ ${OBJECTDIR}/src/main.o \ ${OBJECTDIR}/src/mime.o \ ${OBJECTDIR}/src/queue.o \ + ${OBJECTDIR}/src/server-socket.o \ ${OBJECTDIR}/src/socket.o \ ${OBJECTDIR}/src/thread-pool.o \ ${OBJECTDIR}/src/util.o @@ -90,6 +92,11 @@ ${OBJECTDIR}/src/config.o: nbproject/Makefile-${CND_CONF}.mk src/config.c ${RM} "$@.d" $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/config.o src/config.c +${OBJECTDIR}/src/data-buffer.o: nbproject/Makefile-${CND_CONF}.mk src/data-buffer.c + ${MKDIR} -p ${OBJECTDIR}/src + ${RM} "$@.d" + $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/data-buffer.o src/data-buffer.c + ${OBJECTDIR}/src/http-reader.o: nbproject/Makefile-${CND_CONF}.mk src/http-reader.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" @@ -130,6 +137,11 @@ ${OBJECTDIR}/src/queue.o: nbproject/Makefile-${CND_CONF}.mk src/queue.c ${RM} "$@.d" $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/queue.o src/queue.c +${OBJECTDIR}/src/server-socket.o: nbproject/Makefile-${CND_CONF}.mk src/server-socket.c + ${MKDIR} -p ${OBJECTDIR}/src + ${RM} "$@.d" + $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-socket.o src/server-socket.c + ${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" diff --git a/nbproject/configurations.xml b/nbproject/configurations.xml index 97442ba..9082983 100644 --- a/nbproject/configurations.xml +++ b/nbproject/configurations.xml @@ -5,6 +5,7 @@ displayName="Header Files" projectFiles="true"> src/config.h + src/data-buffer.h src/http-reader.h src/http-server.h src/http.h @@ -15,6 +16,7 @@ src/main.h src/mime.h src/queue.h + src/server-socket.h src/socket.h src/thread-pool.h src/util.h @@ -27,6 +29,7 @@ displayName="Source Files" projectFiles="true"> src/config.c + src/data-buffer.c src/http-reader.c src/http-server.c src/http.c @@ -37,6 +40,7 @@ src/main.c src/mime.c src/queue.c + src/server-socket.c src/socket.c src/thread-pool.c src/util.c @@ -107,6 +111,10 @@ + + + + @@ -139,6 +147,10 @@ + + + + @@ -201,6 +213,10 @@ + + + + @@ -233,6 +249,10 @@ + + + + diff --git a/src/data-buffer.c b/src/data-buffer.c new file mode 100644 index 0000000..ffd2b2c --- /dev/null +++ b/src/data-buffer.c @@ -0,0 +1,86 @@ +#include +#include +#include +#include +#include +#include + +#include "ut/utlist.h" + +#include "data-buffer.h" + +data_buffer_list* data_buffer_list_new() { + data_buffer_list *list = calloc(1, sizeof(data_buffer_list)); + list->first = NULL; + list->wrlock = calloc(1, sizeof(pthread_mutex_t)); + pthread_mutex_init(list->wrlock, NULL); + list->rdlock = calloc(1, sizeof(pthread_mutex_t)); + pthread_mutex_init(list->rdlock, NULL); + return list; +} +void data_buffer_list_delete(data_buffer_list *list) { + assert(list!=NULL); + pthread_mutex_destroy(list->wrlock); + pthread_mutex_destroy(list->rdlock); + data_buffer *elem, *tmp; + LL_FOREACH_SAFE(list->first, elem, tmp) { + LL_DELETE(list->first, elem); + data_buffer_free(elem); + } + free(list->wrlock); + free(list->rdlock); + free(list); +} +int data_buffer_list_append(data_buffer_list *list, const char* src, size_t n) { + assert(list!=NULL); + assert(src!=NULL && n>0); + BUFFER_LIST_WR_LOCK(list); + + int blocks = 1; + data_buffer *newbuf=NULL; + while(blocks * DATA_BUFFER_SIZE < n) { + blocks++; + LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE)); + } + + size_t offset = 0; + data_buffer *elem; + LL_FOREACH(newbuf, elem) { + size_t copy_count = n - offset; + if (copy_count > elem->size) { + copy_count = elem->size; + } + memcpy(elem->buffer, src+offset, copy_count); + offset += copy_count; + elem->wOffset += copy_count; + } + + LL_CONCAT(list->first, elem); + BUFFER_LIST_WR_LOCK(list); +} +void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) { + assert(list != NULL); + if (rd == true) pthread_mutex_lock(list->rdlock); + if (wr == true) pthread_mutex_lock(list->wrlock); +} +void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr) { + assert(list != NULL); + if (rd == true) pthread_mutex_unlock(list->rdlock); + if (wr == true) pthread_mutex_unlock(list->wrlock); +} + +data_buffer* data_buffer_new(size_t size) { + assert(size > 0); + + data_buffer* buffer = calloc(1, sizeof(data_buffer)); + buffer->buffer = calloc(size, sizeof(char)); + buffer->size = size; + + return buffer; +} +void data_buffer_free(data_buffer *buffer) { + assert(buffer != NULL); + + free(buffer->buffer); + free(buffer); +} diff --git a/src/data-buffer.h b/src/data-buffer.h new file mode 100644 index 0000000..08a1a51 --- /dev/null +++ b/src/data-buffer.h @@ -0,0 +1,52 @@ +/* + * File: data-buffer.h + * Author: sam + * + * Created on 09 August 2014, 16:54 + */ + +#ifndef DATA_BUFFER_H +#define DATA_BUFFER_H + +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +#define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, false, true) +#define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, false, true) +#define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false) +#define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false) + +#define DATA_BUFFER_SIZE 16*1024 + + typedef struct data_buffer_list { + struct data_buffer *first; + pthread_mutex_t *wrlock, *rdlock; + } data_buffer_list; + + typedef struct data_buffer { + char* buffer; + size_t size; + size_t wOffset, rOffset; + struct data_buffer *next; + } data_buffer; + + data_buffer_list* data_buffer_list_new(); + void data_buffer_list_delete(data_buffer_list *list); + int data_buffer_list_append(data_buffer_list *list, const char* src, size_t n); + void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr); + void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr); + + data_buffer* data_buffer_new(size_t size); + void data_buffer_free(data_buffer *buffer); + + +#ifdef __cplusplus +} +#endif + +#endif /* DATA_BUFFER_H */ + diff --git a/src/http-reader.c b/src/http-reader.c index 5b85443..d91ce37 100644 --- a/src/http-reader.c +++ b/src/http-reader.c @@ -7,16 +7,17 @@ #include "http.h" #include "http_parser.h" #include "http-reader.h" +#include "main-loop.h" #define GET_CB_STR(str, at, length) do { \ str = calloc(length+1, sizeof(char));\ strncpy(str, at, length);\ }while(0); -#define SKT(parser) ((skt_elem*)parser->data) +#define SKT(parser) ((hmain_parse_data*)parser->data) http_parser_settings *parser_settings = NULL; -http_parser_settings* parser_get_settings(skt_elem *elem) { +http_parser_settings* parser_get_settings(hmain_parse_data *data) { if (parser_settings == NULL) { parser_settings = calloc(1, sizeof(http_parser_settings)); parser_settings->on_body = parser_cb_on_body; diff --git a/src/http-reader.h b/src/http-reader.h index 4e11742..4f725fd 100644 --- a/src/http-reader.h +++ b/src/http-reader.h @@ -15,8 +15,9 @@ extern "C" { #include "http_parser.h" #include "http.h" #include "main.h" +#include "main-loop.h" - http_parser_settings* parser_get_settings(skt_elem *elem); + http_parser_settings* parser_get_settings(hmain_parse_data *elem); void parser_free_settings(); int parser_cb_on_message_begin(http_parser* parser); diff --git a/src/http-server.c b/src/http-server.c index f03d6b3..b7c2f3b 100644 --- a/src/http-server.c +++ b/src/http-server.c @@ -15,6 +15,7 @@ #include "config.h" #include "http-server.h" #include "mime.h" +#include "ut/utstring.h" http_response* server_process_request(config_server* config, http_request *request) { http_response* response = NULL; diff --git a/src/main-loop.c b/src/main-loop.c index faa36e2..4044899 100644 --- a/src/main-loop.c +++ b/src/main-loop.c @@ -6,6 +6,7 @@ #include #include +#include #include "main-loop.h" #include "mime.h" @@ -14,40 +15,49 @@ #include "thread-pool.h" #include "queue.h" #include "log.h" +#include "http.h" +#include "http-reader.h" +#include "server-socket.h" hmain_connection* hmain_connection_new(int fd, hmain_status *status) { static u_int64_t nextid = 1; hmain_connection *conn = calloc(1, sizeof(hmain_connection)); - conn->cid = __atomic_fetch_add(&u_int64_t, 1, __ATOMIC_SEQ_CST); + conn->cid = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); conn->fd = fd; conn->status = status; conn->opened = time(NULL); conn->last_activity = conn->opened; conn->pending_responses = http_response_list_new(); + conn->pending_write = data_buffer_list_new(); + conn->mutex = calloc(1, sizeof(pthread_mutex_t)); + pthread_mutex_init(conn->mutex, NULL); return conn; } void hmain_connection_close(hmain_connection *conn) { - close(fd); + close(conn->fd); + if (conn->pending_write != NULL) { + data_buffer_list_delete(conn->pending_write); + } //TODO: remove from all queues } void hmain_connection_delete(hmain_connection *conn) { + LL_DELETE(conn->status->connections, conn); http_response_list_delete(conn->pending_responses); + if (conn->pending_write != NULL) { + data_buffer_list_delete(conn->pending_write); + } + pthread_mutex_destroy(conn->mutex); + free(conn->mutex); free(conn->clientaddr); free(conn); } -#define EVENT_IS(event, type) ((event.events & type) == type) +#define EVENT_IS(event, type) ((event->events & type) == type) #define EVENT_ISNOT(event, type) (!EVENT_IS(event, type)) -void hmain_setup(hmain_status **statusptr) { - hmain_status *status = *statusptr; - - if (status != NULL) { - fatal("hmain already setup"); - } - status = calloc(1, sizeof(hmain_status)); +void hmain_setup(hmain_status *status) { status->shutdown = false; //Start Logging @@ -65,10 +75,10 @@ void hmain_setup(hmain_status **statusptr) { status->config = config; //Open our listening socket - status->sfd = svr_create(); - svr_listen(status->sfd, status->config->listen_port); + status->sfd = server_socket_create(); + server_socket_listen(status->sfd, status->config->listen_port); - //Open epoll for socket + //Open epoll socket status->epollfd = epoll_create1(0); if (status->epollfd < 0) { fatal("Failed to create epollfd"); @@ -78,7 +88,7 @@ void hmain_setup(hmain_status **statusptr) { struct epoll_event svr_event; svr_event.data.fd = status->sfd; svr_event.events = EPOLLIN | EPOLLET; - if (epoll_ctl(s->epollfd, EPOLL_CTL_ADD, status->sfd, &svr_event) < 0) { + if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, status->sfd, &svr_event) < 0) { fatal("Could not register server socket with epoll"); } @@ -125,11 +135,17 @@ void hmain_teardown(hmain_status *status) { thread_pool_delete(status->pools[i]); } + //Close all connections + hmain_connection *elem; + LL_FOREACH(status->connections, elem) { + hmain_connection_close(elem); + } + //Close epoll close(status->epollfd); //Close the listening socket - svr_release(status->sfd); + server_socket_release(status->sfd); //Cleanup the mime detector mime_destroy(); @@ -149,25 +165,28 @@ void hmain_loop(hmain_status *status) { socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in); while(status->shutdown == false) { - int event_count = epoll_wait(status->epollfd, events, EPOLL_MAXEVENTS, 1000); + int event_count = epoll_wait(status->epollfd, events, EPOLL_MAXEVENTS, 2000); for(int i=0; idata.ptr; + hmain_connection_close(conn); + hmain_connection_delete(conn); //TODO: close socket & cleanup } else if (EVENT_IS(current_event, EPOLLIN)) { if (current_event->data.fd == status->sfd) { //New connection(s) while(true) { struct epoll_event new_event; - int clientfd = accept4(status->sfd, (struct sockaddr*)clientaddr, clientaddr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); + int clientfd = accept4(status->sfd, (struct sockaddr*)clientaddr, &clientaddr_len, SOCK_NONBLOCK | SOCK_CLOEXEC); if (clientfd < 0) { if (errno == EAGAIN || errno == EWOULDBLOCK) { //Done with new connections @@ -181,10 +200,11 @@ void hmain_loop(hmain_status *status) { memcpy(conn->clientaddr, clientaddr, clientaddr_len); new_event.data.fd = clientfd; new_event.data.ptr = conn; - new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET; - if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, status->sfd, &new_event) < 0) { + new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP; + if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, clientfd, &new_event) < 0) { fatal("Could not register new connection with epoll"); } + LL_APPEND(status->connections, conn); } } else { //Data is ready to read on existing connection @@ -197,7 +217,7 @@ void hmain_loop(hmain_status *status) { } else if (EVENT_IS(current_event, EPOLLOUT)) { //Data can be written to connection hmain_connection *conn = (hmain_connection*)current_event->data.ptr; - if (conn->isWriting == true || conn->pending_responses == NULL) { + if (conn->isWriting == true || (conn->pending_responses == NULL && conn->pending_write == NULL)) { continue; } queue_add(status->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", (void*)conn)); @@ -209,10 +229,68 @@ void hmain_loop(hmain_status *status) { } void* thloop_read(void * arg) { + thread* th = (thread*)arg; + while(th->stop==false) { + queue_item *item = queue_fetchone(th->pool->queue, true); + if (item == NULL) { + continue; + } + hmain_connection *conn = (hmain_connection*)item->data; + CONN_LOCK(conn); + conn->isReading = true; + + char buffer[16*1024]; + int count; + do { + count = read(conn->fd, buffer, sizeof(buffer)/sizeof(buffer[0])); + if (count < 0) { + if (!(errno == EAGAIN || errno == EWOULDBLOCK)) { + //Socket error + warning(true, "failed to read from connection #%lu", conn->cid); + } + break; + } + if (count == 0) { + break; + } + + LOG(LDEBUG, "Read %zu", count); + + //conn->pending_write = data_pool_appendbuffer(conn->pending_write, conn->status->buffer_pool, buffer, count); + + queue_add(conn->status->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", (void*)conn)); + /*if (conn->parse_data == NULL) { + conn->parse_data = calloc(1, sizeof(hmain_parse_data)); + conn->parse_data->parser = calloc(1, sizeof(http_parser)); + http_parser_init(conn->parse_data->parser, HTTP_REQUEST); + conn->parse_data->parser->data = conn->parse_data; + conn->parse_data->parser_header_state = HSTATE_NONE; + } + */ + } while(count > 0); + conn->isReading = false; + CONN_UNLOCK(conn); + } } void* thloop_write(void * arg) { + thread* th = (thread*)arg; + while(th->stop==false) { + queue_item *item = queue_fetchone(th->pool->queue, true); + if (item == NULL) { + continue; + } + hmain_connection *conn = (hmain_connection*)item->data; + do { + CONN_LOCK(conn); + conn->isWriting = true; + + + }while(0); + conn->isWriting = false; + CONN_UNLOCK(conn); + } } void* thloop_disk_read(void * arg){ diff --git a/src/main-loop.h b/src/main-loop.h index b6c1fcf..235bc0f 100644 --- a/src/main-loop.h +++ b/src/main-loop.h @@ -10,9 +10,14 @@ #include #include +#include + +#include "http_parser.h" + #include "config.h" #include "thread-pool.h" #include "http.h" +#include "data-buffer.h" #ifdef __cplusplus extern "C" { @@ -20,9 +25,30 @@ extern "C" { #define EPOLL_MAXEVENTS 128 +#define CONN_LOCK(c) pthread_mutex_lock(c->mutex) +#define CONN_UNLOCK(c) pthread_mutex_unlock(c->mutex) + typedef enum hmain_pool { POOL_READ, POOL_WRITE, POOL_WORKERS, POOL_DISK_READ - }; + } hmain_pool; + + typedef enum skt_elem_hstate {HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD} skt_elem_hstate; + + typedef struct hmain_connection { + uint64_t cid; + struct hmain_status *status; + int fd; + struct sockaddr_in* clientaddr; + bool isReading, isWriting; + time_t opened; + time_t last_activity; + struct hmain_parse_data *parse_data; + http_response_list *pending_responses; + data_buffer_list *pending_write; + struct hmain_connection *next; + pthread_mutex_t *mutex; + + } hmain_connection; typedef struct hmain_status { config_server *config; @@ -33,23 +59,19 @@ extern "C" { hmain_connection *connections; } hmain_status; - typedef struct hmain_connection { - uint64_t cid; - hmain_status *status; - int fd; - struct sockaddr_in* clientaddr; - bool isReading, isWriting; - time_t opened; - time_t last_activity; - http_response_list *pending_responses; - hmain_connection *next; - } hmain_connection; + typedef struct hmain_parse_data { + http_request *current_request; + bool request_complete; + http_parser *parser; + http_header *parser_current_header; + skt_elem_hstate parser_header_state; + } hmain_parse_data; hmain_connection* hmain_connection_new(int fd, hmain_status *status); void hmain_connection_close(hmain_connection *conn); void hmain_connection_delete(hmain_connection *conn); - void hmain_setup(hmain_status **statusptr); + void hmain_setup(hmain_status *status); void hmain_teardown(hmain_status *status); void hmain_loop(hmain_status *status); diff --git a/src/main.c b/src/main.c index b9e7410..11784f6 100644 --- a/src/main.c +++ b/src/main.c @@ -31,6 +31,7 @@ #include "queue.h" #include "thread-pool.h" #include "log.h" +#include "main-loop.h" int serverfd = 0; volatile static bool stop = false; @@ -41,180 +42,13 @@ static void signal_int(int signum) { } int main(int argc, char** argv) { - log_register_add(log_new("stderr", stderr), true, LALL & ~(LINFO|LDEBUG)); - log_register_add(log_new("stdout", stdout), false, LDEBUG | LINFO); - mime_init(NULL); - config_server *config = config_server_new(); - if (config_read_ini("khttpd.ini", config) < 0) { - fatal("Could not read config"); - } + hmain_status *status = calloc(1, sizeof(hmain_status)); + hmain_setup(status); - signal(SIGINT, signal_int); + hmain_loop(status); - skt_elem *connections = NULL; - - serverfd = svr_create(); - svr_listen(serverfd, config->listen_port); - - while(1) { - uint32_t connections_open; - skt_elem *elem, *tmp; - - //Accept new connections - LL_COUNT(connections, elem, connections_open); - while(connections_open < 100 && svr_canaccept(serverfd)) { - skt_info *info = svr_accept(serverfd); - if (info != NULL) { - skt_elem *elem = skt_elem_new(info); - LL_APPEND(connections, elem); - } - } - - //Read from connections - LL_FOREACH(connections, elem) { - if (skt_canread(elem->info)) { - skt_read(elem->info); - } - } - - //Process sockets - LL_FOREACH(connections, elem) { - if (utstring_len(elem->info->read) > 0) { - //Parse the incoming data - int parsedcount = http_parser_execute( - elem->parser, - parser_get_settings(elem), - utstring_body(elem->info->read), - utstring_len(elem->info->read)); - //Check that all data was read - if (parsedcount != utstring_len(elem->info->read)) { - //emit warning - char warningmsg[2048] = {0}; - snprintf(warningmsg, 2048, - "error parsing request (%s: %s). closing connection", - http_errno_name(elem->parser->http_errno), - http_errno_description(elem->parser->http_errno)); - warning(false, warningmsg); - //send 400 back and close connection - http_response *resp400 = http_response_create_builtin(400, "Request was invalid or could not be read"); - http_header_list_add(resp400->headers, http_header_new(HEADER_CONNECTION, "close"), false); - skt_elem_write_response(elem, resp400, false); - http_response_delete(resp400); - skt_elem_reset(elem); - } - //Clear read data now that we have processed it - utstring_clear(elem->info->read); - //Process request if received - if (elem->request_complete == true) { - http_response *response = server_process_request(config, elem->current_request); - if (response == NULL) { - response = http_response_create_builtin(500, "Request could not be processed"); - http_header_list_add(response->headers, http_header_new(HEADER_CONNECTION, "close"), false); - } - skt_elem_write_response(elem, response, true); - - skt_elem_reset(elem); - } - } - } - - //Write to connections - LL_FOREACH(connections, elem) { - if (utstring_len(elem->info->write) > 0 && elem->info->close == false) { - skt_write(elem->info); - } - } - - time_t current = time(NULL); - time_t timeout = 30; - time_t maxlife = 500; - //Close where needed - LL_FOREACH(connections, elem) { - if (current - elem->info->last_act > timeout) { - info("[#%lu %s] Timeout", elem->info->id, skt_clientaddr(elem->info)); - elem->info->close = true; - } - if (current - elem->info->time_opened > maxlife) { - info("[#%lu %s] Reached max life", elem->info->id, skt_clientaddr(elem->info)); - elem->info->close = true; - } - if (elem->info->close_afterwrite && utstring_len(elem->info->write) == 0) { - elem->info->close = true; - } - if (elem->info->close == true || stop == true) { - skt_close(elem->info); - } - } - //Delete closed connections - LL_FOREACH_SAFE(connections, elem, tmp) { - if (elem->info->closed) { - LL_DELETE(connections, elem); - skt_elem_delete(elem); - } - } - if (stop == true) { - break; - } - } - - mime_destroy(); - config_server_delete(config); - svr_release(serverfd); - log_register_clear(); - serverfd = 0; + hmain_teardown(status); return (EXIT_SUCCESS); -} - -skt_elem* skt_elem_new(skt_info *info) { - skt_elem* elem = calloc(1, sizeof(skt_elem)); - elem->info = info; - elem->parser = calloc(1, sizeof(http_parser)); - http_parser_init(elem->parser, HTTP_REQUEST); - elem->parser->data = (void*)elem; - elem->parser_header_state = HSTATE_NONE; - elem->request_complete = false; - return elem; -} -void skt_elem_reset(skt_elem *elem) { - if (elem->current_request != NULL) { - http_request_delete(elem->current_request); - elem->current_request = NULL; - } - if (elem->parser_current_header != NULL) { - http_header_delete(elem->parser_current_header); - } - elem->parser_current_header = NULL; - elem->parser_header_state = HSTATE_NONE; - elem->request_complete = false; -} -void skt_elem_write_response(skt_elem *elem, http_response *response, bool dispose) { - http_header* connection_header = http_header_list_get(response->headers, HEADER_CONNECTION); - if (connection_header != NULL && strcasecmp(connection_header->content, "close") == 0) { - elem->info->close_afterwrite = true; - } - if (connection_header == NULL) { - if (response->resp->version == HTTP11) { - http_header_list_add(response->headers, http_header_new(HEADER_CONNECTION, "Keep-Alive"), true); - } else if (response->resp->version == HTTP10) { - elem->info->close_afterwrite = true; - } - } - char *response_str = http_response_write(response); - utstring_printf(elem->info->write, "%s", response_str); - free(response_str); - if (dispose == true) { - http_response_delete(response); - } -} -void skt_elem_delete(skt_elem* elem) { - if (elem->info!=NULL) skt_delete(elem->info); - if (elem->current_request!=NULL) http_request_delete(elem->current_request); - if (elem->parser!= NULL) { - elem->parser->data = NULL; - free(elem->parser); - } - - free(elem); -} +} \ No newline at end of file diff --git a/src/main.h b/src/main.h index 3dca40e..9408f69 100644 --- a/src/main.h +++ b/src/main.h @@ -15,32 +15,11 @@ extern "C" { #define SERVER_NAME "KHTTP/0.1" #include -#include "http_parser.h" -#include "socket.h" #include "http.h" #include "util.h" - - typedef enum skt_elem_hstate {HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD} skt_elem_hstate; - typedef struct skt_elem { - skt_info* info; - http_request *current_request; - bool request_complete; - http_parser *parser; - http_header *parser_current_header; - skt_elem_hstate parser_header_state; - struct skt_elem *next; - } skt_elem; - - skt_elem* skt_elem_new(skt_info *info); - void skt_elem_reset(skt_elem *elem); - void skt_elem_write_response(skt_elem *skt, http_response *response, bool dispose); - void skt_elem_delete(skt_elem* elem); int main(int argc, char** argv); - - - #ifdef __cplusplus } #endif diff --git a/src/queue.c b/src/queue.c index 0ae29d1..f016bd7 100644 --- a/src/queue.c +++ b/src/queue.c @@ -8,7 +8,11 @@ #include "ut/utlist.h" queue_item* queue_item_new() { + static uint64_t nextid = 0; + queue_item *item = calloc(1, sizeof(queue_item)); + item->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); + item->blocked = false; return item; } queue_item* queue_item_new2(char* tag, void* data) { @@ -53,7 +57,9 @@ int queue_add(queue *q, queue_item *item) { QUEUE_LOCK(q); DL_APPEND(q->list, item); q->count++; - pthread_cond_signal(q->cond); + if (item->blocked == false) { + pthread_cond_signal(q->cond); + } QUEUE_UNLOCK(q); return 0; } @@ -77,13 +83,39 @@ queue_item* queue_fetchone(queue *q, bool blocking) { pthread_cond_wait(q->cond, q->mutex); } if (q->count > 0) { - item = q->list; - DL_DELETE(q->list, q->list); - q->count--; + queue_item *elem; + LL_FOREACH(q->list, elem) { + if (elem->blocked == false) { + item = elem; + break; + } + } + if (item != NULL) { + item = q->list; + DL_DELETE(q->list, q->list); + q->count--; + } } QUEUE_UNLOCK(q); return item; } +void queue_unblock(queue *q, uint64_t itemid) { + queue_item *item=NULL, *elem=NULL; + QUEUE_LOCK(q); + LL_FOREACH(q->list, elem) { + if (elem->id == itemid) { + if (elem->blocked == true) { + elem->blocked = false; + item = elem; + } + break; + } + } + if (item != NULL) { + pthread_cond_signal(q->cond); + } + QUEUE_UNLOCK(q); +} void queue_clear(queue *q) { QUEUE_LOCK(q); queue_item *elem, *tmp; diff --git a/src/queue.h b/src/queue.h index 9a0ebae..21da5fb 100644 --- a/src/queue.h +++ b/src/queue.h @@ -17,9 +17,11 @@ extern "C" { #include typedef struct queue_item { + uint64_t id; struct queue_item *prev; struct queue_item *next; char tag[16]; + bool blocked; void *data; } queue_item; @@ -52,6 +54,7 @@ extern "C" { int queue_add(queue *q, queue_item *item); int queue_remove(queue *q, queue_item *item); queue_item* queue_fetchone(queue *q, bool blocking); + void queue_unblock(queue *q, uint64_t itemid); void queue_clear(queue *q); void queue_ping(queue *q); size_t queue_count(queue *q); diff --git a/src/server-socket.c b/src/server-socket.c new file mode 100644 index 0000000..66eb1a0 --- /dev/null +++ b/src/server-socket.c @@ -0,0 +1,84 @@ +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "socket.h" +#include "server-socket.h" +#include "main.h" + +int server_socket_create() { + int fd = 0; + fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (fd < 0) { + fatal("could not create socket"); + } + return fd; +} +void server_socket_listen(int fd, uint16_t port) { + struct sockaddr_in server_address; + memset(&server_address, 0, sizeof server_address); + server_address.sin_family = AF_INET; + server_address.sin_addr.s_addr = INADDR_ANY; + server_address.sin_port = htons(port); + + if (bind(fd, (struct sockaddr*)&server_address, sizeof server_address) < 0) { + close(fd); + fatal("Failed to bind to socket"); + } + if (listen(fd, SOMAXCONN) < 0) { + close(fd); + fatal("Could not set socket to listen mode"); + } + info("Listening on port %u", port); +} +void server_socket_release(int fd) { + if (close(fd) < 0) { + warning(true, "could not close socket"); + } +} +bool server_socket_canaccept(int fd) { + struct pollfd* pfd = calloc(1, sizeof(struct pollfd)); + + pfd[0].fd = fd; + pfd[0].events = POLLIN; + + if (poll(pfd, 1, 50/*ms*/) < 0) { + warning(true, "poll failed"); + free(pfd); + return false; + } + if ((pfd[0].revents & POLLIN) == POLLIN) { + free(pfd); + return true; + } + free(pfd); + return false; +} +skt_info* server_socket_accept(int fd, int flags) { + struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in)); + + int clientfd=0; + socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in); + clientfd = accept4(fd, (struct sockaddr*)clientaddr, &clientaddr_len, flags); + if (clientfd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { + warning(true, "error accepting connection"); + return NULL; + } + + skt_info* skt = skt_new(clientfd); + skt->clientaddr = clientaddr; + skt->fd = clientfd; + + info("[#%lu %s] New Connection", skt->id, skt_clientaddr(skt)); + + return skt; + +} \ No newline at end of file diff --git a/src/server-socket.h b/src/server-socket.h new file mode 100644 index 0000000..9f9ccf8 --- /dev/null +++ b/src/server-socket.h @@ -0,0 +1,29 @@ +/* + * File: server-socket.h + * Author: sam + * + * Created on 15 August 2014, 13:22 + */ + +#ifndef SERVER_SOCKET_H +#define SERVER_SOCKET_H + +#include "socket.h" + +#ifdef __cplusplus +extern "C" { +#endif + + int server_socket_create(); + void server_socket_listen(int fd, uint16_t port); + void server_socket_release(int fd); + bool server_socket_canaccept(int fd); + skt_info* server_socket_accept(int fd, int flags); + + +#ifdef __cplusplus +} +#endif + +#endif /* SERVER_SOCKET_H */ + diff --git a/src/socket.c b/src/socket.c index 94b222a..a2290d3 100644 --- a/src/socket.c +++ b/src/socket.c @@ -6,40 +6,40 @@ #include #include #include -#include #include #include -#include #include #include +#include + #include "socket.h" #include "ut/utstring.h" +#include "data-buffer.h" #include "main.h" +#include "ut/utlist.h" u_int64_t skt_nextid() { static u_int64_t id = 0; return __atomic_fetch_add(&id, 1, __ATOMIC_SEQ_CST); } skt_info* skt_new(int fd) { + assert(fd>0); skt_info* skt = calloc(1, sizeof(skt_info)); skt->id = skt_nextid(); skt->fd = fd; - skt->last_act = skt->time_opened = time(NULL); - utstring_new(skt->read); - utstring_new(skt->write); - skt->close = false; - skt->close_afterwrite = false; - skt->closed = false; + skt->time_opened = time(NULL); + skt->error = false; + skt->clientaddr = NULL; return skt; } void skt_delete(skt_info* skt) { - utstring_free(skt->read); - utstring_free(skt->write); + assert(skt != NULL); free(skt->clientaddr); free(skt); } bool skt_canread(skt_info* skt) { + assert(skt != NULL); int len = 0; if (ioctl(skt->fd, FIONREAD, &len) < 0) { warning(true, "ioctl failed"); @@ -47,133 +47,79 @@ bool skt_canread(skt_info* skt) { } return len > 0; } -uint32_t skt_read(skt_info* skt) { - char buffer[1024]; - memset(buffer, 0, 1024); - - int result = read(skt->fd, &buffer,1023); +size_t skt_read(skt_info* skt, char* buffer, size_t bufferlen) { + assert(skt != NULL); + int result = read(skt->fd, buffer, bufferlen); if (result < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { warning(true, "read error"); - skt->close = true; + skt->error = true; } return 0; } - skt->last_act = time(NULL); - utstring_printf(skt->read, "%s", buffer); return result; //Number of bytes read } -uint32_t skt_write(skt_info* skt) { - if (utstring_len(skt->write) == 0) { - return 0; - } +size_t skt_write(skt_info* skt, char* data, size_t len) { + assert(skt != NULL); + assert(data != NULL); - int result = write(skt->fd, utstring_body(skt->write), utstring_len(skt->write)); + int result = write(skt->fd, data, len); if (result < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { warning(true, "write error"); - skt->close = true; + skt->error = true; } return 0; } - - skt->last_act = time(NULL); - - if (result == utstring_len(skt->write)) { - utstring_clear(skt->write); - return result; - } - //remove first x chars - char* newstr = calloc(utstring_len(skt->write) - result + 1, sizeof(char)); - - char* writeBody = utstring_body(skt->write); - strcpy(newstr, writeBody + (sizeof(char) * result)); - - utstring_clear(skt->write); - utstring_printf(skt->write, "%s", newstr); - free(newstr); return result; //bytes written } -void skt_close(skt_info* skt) { - if (skt->closed == true) { - return; +int skt_write_data_buffer(skt_info *skt, data_buffer_list *list) { + assert(skt != NULL); + assert(list != NULL); + BUFFER_LIST_RD_LOCK(list); + + do { + data_buffer *elem = list->first; + size_t written = skt_write(skt, elem->buffer + elem->rOffset, elem->wOffset - elem->rOffset); + if (written == 0) { + break; + } + elem->rOffset += written; + if (elem->rOffset == elem->wOffset) { + BUFFER_LIST_WR_LOCK(list); + LL_DELETE(list->first, elem); + BUFFER_LIST_WR_UNLOCK(list); + data_buffer_free(elem); + } + } while(list->first != NULL); + + int result; + if (skt->error == true) { + result = -1; } - info("[#%lu %s] Closed", skt->id, skt_clientaddr(skt)); + if (list->first == NULL) { + result = 0; + } else { + result = 1; + } + + BUFFER_LIST_RD_UNLOCK(list); + + return result; +} +void skt_close(skt_info* skt) { + assert(skt != NULL); if (close(skt->fd) < 0) { warning(true, "error closing socket"); } - skt->closed = true; } -char* skt_clientaddr(skt_info *skt) { - char* address = inet_ntoa(skt->clientaddr->sin_addr); +const char* skt_clientaddr(skt_info *skt) { + assert(skt != NULL); + char *tmp = calloc(INET_ADDRSTRLEN, sizeof(char)); + const char* address = inet_ntop(AF_INET, &skt->clientaddr->sin_addr, tmp, INET_ADDRSTRLEN); + if (address == NULL) { + warning(true, "error fetching client address"); + free(tmp); + } return address; -} - -int svr_create() { - int fd = 0; - fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); - if (fd < 0) { - fatal("could not create socket"); - } - return fd; -} -void svr_listen(int fd, uint16_t port) { - struct sockaddr_in server_address; - memset(&server_address, 0, sizeof server_address); - server_address.sin_family = AF_INET; - server_address.sin_addr.s_addr = INADDR_ANY; - server_address.sin_port = htons(port); - - if (bind(fd, (struct sockaddr*)&server_address, sizeof server_address) < 0) { - close(fd); - fatal("Failed to bind to socket"); - } - if (listen(fd, SOMAXCONN) < 0) { - close(fd); - fatal("Could not set socket to listen mode"); - } - info("Listening on port %u", port); -} -void svr_release(int fd) { - if (close(fd) < 0) { - warning(true, "could not close socket"); - } -} -bool svr_canaccept(int fd) { - struct pollfd* pfd = calloc(1, sizeof(struct pollfd)); - - pfd[0].fd = fd; - pfd[0].events = POLLIN; - - if (poll(pfd, 1, 50/*ms*/) < 0) { - warning(true, "poll failed"); - free(pfd); - return false; - } - if ((pfd[0].revents & POLLIN) == POLLIN) { - free(pfd); - return true; - } - free(pfd); - return false; -} -skt_info* svr_accept(int fd) { - struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in)); - - int clientfd=0; - socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in); - clientfd = accept(fd, (struct sockaddr*)clientaddr, &clientaddr_len); - if (clientfd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { - warning(true, "error accepting connection"); - return NULL; - } - - skt_info* skt = skt_new(clientfd); - skt->clientaddr = clientaddr; - skt->fd = clientfd; - - info("[#%lu %s] New Connection", skt->id, skt_clientaddr(skt)); - - return skt; - } \ No newline at end of file diff --git a/src/socket.h b/src/socket.h index 1233f9a..74f5022 100644 --- a/src/socket.h +++ b/src/socket.h @@ -19,39 +19,26 @@ extern "C" { #include #include "http.h" #include "ut/utstring.h" +#include "data-buffer.h" - typedef struct skt_info skt_info; - - struct skt_info { + typedef struct skt_info { u_int64_t id; int fd; - time_t time_opened; - time_t last_act; - UT_string *read; - UT_string *write; - bool close; - bool close_afterwrite; - bool closed; struct sockaddr_in* clientaddr; - }; + time_t time_opened; + bool error; + } skt_info; u_int64_t skt_nextid(); skt_info* skt_new(int fd); void skt_delete(skt_info *skt); bool skt_canread(skt_info *skt); - uint32_t skt_read(skt_info *skt); - uint32_t skt_write(skt_info *skt); + size_t skt_read(skt_info *skt, char* buffer, size_t bufferlen); + size_t skt_write(skt_info* skt, char* data, size_t len); + int skt_write_data_buffer(skt_info *skt, data_buffer_list *list); void skt_close(skt_info *skt); - char* skt_clientaddr(skt_info *skt); - - int svr_create(); - void svr_listen(int fd, uint16_t port); - void svr_release(int fd); - bool svr_canaccept(int fd); - skt_info* svr_accept(int fd); - - + const char* skt_clientaddr(skt_info *skt); #ifdef __cplusplus } diff --git a/src/util.c b/src/util.c index f563c55..ceac25f 100644 --- a/src/util.c +++ b/src/util.c @@ -22,14 +22,17 @@ void fatal(char* fmt, ...) { va_end(va); if (errno != 0) { - char *errnostr = calloc(64, sizeof(char)); - strerror_r(errno, errnostr, 64); + char *errnostr_buf = calloc(64, sizeof(char)); + char *errnostr = strerror_r(errno, errnostr_buf, 64); + if (strlen(errnostr) == 0) { + snprintf(errnostr, 64, "Code(%d)", errno); + } char *errstr = calloc(strlen(msg)+strlen(errnostr)+3, sizeof(char)); strcat(errstr, msg); strcat(errstr, ": "); strcat(errstr, errnostr); LOG(LFATAL, errstr); - free(errnostr); + free(errnostr_buf); free(errstr); } else { LOG(LFATAL, msg); @@ -46,14 +49,17 @@ void warning(bool use_errno, char* fmt, ...) { va_end(va); if (use_errno == true) { - char *errnostr = calloc(64, sizeof(char)); - strerror_r(errno, errnostr, 64); + char *errnostr_buf = calloc(64, sizeof(char)); + char *errnostr = strerror_r(errno, errnostr_buf, 64); + if (strlen(errnostr) == 0) { + snprintf(errnostr, 64, "Code(%d)", errno); + } char *errstr = calloc(strlen(msg)+strlen(errnostr)+3, sizeof(char)); strcat(errstr, msg); strcat(errstr, ": "); strcat(errstr, errnostr); - LOG(LWARNING, errstr); - free(errnostr); + LOG(LFATAL, errstr); + free(errnostr_buf); free(errstr); } else { LOG(LWARNING, msg);