From 053a6a8e21be5f3632bffb6aa3cfe2eae353b87d Mon Sep 17 00:00:00 2001 From: Sam Stevens Date: Fri, 22 Aug 2014 21:26:23 +0100 Subject: [PATCH] Working on server read loop --- src/data-buffer.c | 20 +++++++++---------- src/data-buffer.h | 17 +++++++++------- src/http-reader.c | 3 ++- src/http-reader.h | 8 -------- src/http.c | 31 ++++++++++++++++++++++++----- src/http.h | 13 +++++++------ src/server-connection.c | 43 ++++++++++++++++++++++++++++++++++++++++- src/server-connection.h | 11 ++++++++++- src/server-loop-read.c | 34 ++++++++++++++++++++++++++++---- src/socket.c | 16 ++++++++------- src/socket.h | 4 ++-- 11 files changed, 148 insertions(+), 52 deletions(-) diff --git a/src/data-buffer.c b/src/data-buffer.c index 3254e48..2813aa4 100644 --- a/src/data-buffer.c +++ b/src/data-buffer.c @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -41,7 +42,6 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n) blocks++; LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE)); } - size_t offset = 0; data_buffer *elem; LL_FOREACH(newbuf, elem) { @@ -59,8 +59,8 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n) } void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) { assert(list != NULL); - if (rd == true) pthread_mutex_lock(list->rdlock); if (wr == true) pthread_mutex_lock(list->wrlock); + if (rd == true) pthread_mutex_lock(list->rdlock); } void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr) { assert(list != NULL); @@ -71,15 +71,15 @@ void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr) { data_buffer* data_buffer_new(size_t size) { assert(size > 0); - data_buffer* buffer = calloc(1, sizeof(data_buffer)); - buffer->buffer = calloc(size, sizeof(char)); - buffer->size = size; + data_buffer* buf = calloc(1, sizeof(data_buffer)); + buf->buffer = calloc(size, sizeof(char)); + buf->size = size; - return buffer; + return buf; } -void data_buffer_free(data_buffer *buffer) { - assert(buffer != NULL); +void data_buffer_free(data_buffer *buf) { + assert(buf != NULL); - free(buffer->buffer); - free(buffer); + free(buf->buffer); + free(buf); } diff --git a/src/data-buffer.h b/src/data-buffer.h index f4b043e..e659eb9 100644 --- a/src/data-buffer.h +++ b/src/data-buffer.h @@ -8,6 +8,7 @@ #ifndef DATA_BUFFER_H #define DATA_BUFFER_H +#include #include #include @@ -17,23 +18,26 @@ extern "C" { #define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, true, true) #define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, true, true) +#define BUFFER_LIST_WRONLY_LOCK(l) data_buffer_list_lock(l, false, true) +#define BUFFER_LIST_WRONLY_UNLOCK(l) data_buffer_list_unlock(l, false, true) #define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false) #define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false) #define DATA_BUFFER_SIZE 16*1024 + typedef struct data_buffer_list { + struct data_buffer *first; + pthread_mutex_t *wrlock, *rdlock; + } data_buffer_list; + typedef struct data_buffer { char* buffer; size_t size; - size_t wOffset, rOffset; + size_t wOffset; + size_t rOffset; struct data_buffer *next; } data_buffer; - typedef struct data_buffer_list { - data_buffer *first; - pthread_mutex_t *wrlock, *rdlock; - } data_buffer_list; - data_buffer_list* data_buffer_list_new(); void data_buffer_list_delete(data_buffer_list *list); void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n); @@ -43,7 +47,6 @@ extern "C" { data_buffer* data_buffer_new(size_t size); void data_buffer_free(data_buffer *buffer); - #ifdef __cplusplus } #endif diff --git a/src/http-reader.c b/src/http-reader.c index 498501c..9e49537 100644 --- a/src/http-reader.c +++ b/src/http-reader.c @@ -12,7 +12,7 @@ str = calloc(length+1, sizeof(char));\ strncpy(str, at, length);\ }while(0); -#define SKT(parser) ((request_parse_state*)parser->data) +#define SKT(parser) ((server_parse_status*)parser->data) http_parser_settings *parser_settings = NULL; @@ -43,6 +43,7 @@ int parser_cb_on_message_begin(http_parser* parser) { http_request_delete(SKT(parser)->current_request); } SKT(parser)->current_request = http_request_new(); + SKT(parser)->request_complete = false; return 0; } diff --git a/src/http-reader.h b/src/http-reader.h index c482577..82d515b 100644 --- a/src/http-reader.h +++ b/src/http-reader.h @@ -17,14 +17,6 @@ extern "C" { #include "http.h" #include "http_parser.h" - typedef struct request_parse_state { - http_request *current_request; - bool request_complete; - struct http_parser *parser; - http_header *parser_current_header; - enum {HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD} parser_header_state; - } request_parse_state; - http_parser_settings* parser_get_settings(); void parser_free_settings(); diff --git a/src/http.c b/src/http.c index 59d7cb3..dc2d5ac 100644 --- a/src/http.c +++ b/src/http.c @@ -4,6 +4,8 @@ #include #include #include + +#include "ut/utlist.h" #include "ut/utarray.h" #include "ut/utstring.h" @@ -364,7 +366,7 @@ char* http_response_write(http_response *resp) { return outputStr; } -http_response* http_response_create_builtin(uint16_t code, char* errmsg) { +http_response* http_response_create_builtin(uint16_t code, const char* errmsg) { http_response *resp = http_response_new(http_response_line_new(code)); http_header_list_add(resp->headers, http_header_new(HEADER_CONTENT_TYPE, "text/html"), false); @@ -442,15 +444,34 @@ char* http_chunks_terminate(http_header_list *footers) { http_response_list* http_response_list_new() { http_response_list *list = calloc(1, sizeof(http_response_list)); - list->responses = NULL; - list->count = 0; + assert(list != NULL); + list->first = NULL; return list; } void http_response_list_append(http_response_list *list, http_response* response) { - list->responses = realloc(list->responses, (++list->count)*sizeof(http_response*)); - list->responses[list->count-1] = response; + assert(list != NULL); + assert(response != NULL); + LL_APPEND(list->first, response); +} +http_response* http_response_list_next(http_response_list *list) { + assert(list != NULL); + return http_response_list_next2(list, true); +} +http_response* http_response_list_next2(http_response_list *list, bool remove) { + assert(list != NULL); + if (list->first == NULL) { + return NULL; + } + http_response* resp = list->first; + if (remove == true) { + LL_DELETE(list->first, resp); + resp->next = NULL; + } + return resp; } void http_response_list_delete(http_response_list *list) { + assert(list != NULL); + http_response *elem; HTTP_RESPONSE_LIST_FOREACH(list, elem) { http_response_delete(elem); diff --git a/src/http.h b/src/http.h index 4232696..d004cd4 100644 --- a/src/http.h +++ b/src/http.h @@ -82,6 +82,7 @@ extern "C" { http_request_parsestatus parsestatus; http_header_list *headers; char *body; + struct http_request *next; } http_request; typedef struct http_response { @@ -89,15 +90,13 @@ extern "C" { http_header_list *headers; bool body_chunked; char* body; + struct http_response *next; } http_response; -#define HTTP_RESPONSE_LIST_FOREACH(list, elem) \ - elem = list->count == 0 ? NULL : list->responses[0]; \ - for(int i=0; icount; elem=list->responses[++i]) +#define HTTP_RESPONSE_LIST_FOREACH(list, elem) LL_FOREACH(list->first, elem) typedef struct http_response_list { - http_response **responses; - size_t count; + http_response *first; } http_response_list; char* http_method_getstring(http_request_method method, char* method_other); @@ -130,13 +129,15 @@ extern "C" { void http_response_append_body(http_response *resp, const char* body); void http_response_delete(http_response *resp); char* http_response_write(http_response *resp); - http_response* http_response_create_builtin(uint16_t code, char* errmsg); + http_response* http_response_create_builtin(uint16_t code, const char* errmsg); char* http_chunks_write(char* source); char* http_chunks_terminate(http_header_list *footers); http_response_list* http_response_list_new(); void http_response_list_append(http_response_list *list, http_response* response); + http_response* http_response_list_next(http_response_list *list); + http_response* http_response_list_next2(http_response_list *list, bool remove); void http_response_list_delete(http_response_list *list); #ifdef __cplusplus diff --git a/src/server-connection.c b/src/server-connection.c index 0ba92db..39578a3 100644 --- a/src/server-connection.c +++ b/src/server-connection.c @@ -5,7 +5,9 @@ #include #include +#include "http_parser.h" #include "http.h" +#include "http-reader.h" #include "socket.h" #include "server-state.h" #include "server-connection.h" @@ -19,7 +21,8 @@ server_connection* server_connection_new(socket_info *skt, server_state *state) conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); conn->last_activity = time(NULL); conn->server = state; - conn->parse_state = NULL; + conn->parse_state = server_parse_status_new(); + conn->pending_requests = NULL; conn->pending_responses = http_response_list_new(); conn->pending_writes = data_buffer_list_new(); conn->skt = skt; @@ -29,9 +32,47 @@ server_connection* server_connection_new(socket_info *skt, server_state *state) void server_connection_delete(server_connection *conn) { assert(conn!=NULL); + server_parse_status_delete(conn->parse_state); http_response_list_delete(conn->pending_responses); data_buffer_list_delete(conn->pending_writes); skt_delete(conn->skt); pthread_mutex_destroy(&conn->mutex); free(conn); +} + +server_parse_status* server_parse_status_new() { + server_parse_status *state = calloc(1, sizeof(server_parse_status)); + state->current_request = NULL; + state->parser_current_header = NULL; + state->request_complete = false; + state->parser_header_state = HSTATE_NONE; + state->parser = calloc(1, sizeof(http_parser)); + http_parser_init(state->parser, HTTP_REQUEST); + + return state; +} +void server_parse_status_delete(server_parse_status* state) { + assert(state!=NULL); + + if (state->current_request != NULL) { + http_request_delete(state->current_request); + } + if (state->parser != NULL) { + free(state->parser); + } + if (state->parser_current_header != NULL) { + http_header_delete(state->parser_current_header); + } + free(state); +} +void server_parser_status_reset(server_parse_status* state) { + assert(state!=NULL); + + state->request_complete = false; + state->parser_header_state = HSTATE_NONE; + if (state->parser_current_header != NULL) { + http_header_delete(state->parser_current_header); + state->parser_current_header = NULL; + } + http_parser_init(state->parser, HTTP_REQUEST); } \ No newline at end of file diff --git a/src/server-connection.h b/src/server-connection.h index 475cf75..b2cb8e5 100644 --- a/src/server-connection.h +++ b/src/server-connection.h @@ -25,12 +25,16 @@ extern "C" { #define CONN_LOCK(c) pthread_mutex_lock(&c->mutex) #define CONN_UNLOCK(c) pthread_mutex_unlock(&c->mutex) + typedef enum server_parse_header_state { + HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD + } server_parse_header_state; + typedef struct server_parse_status { http_request *current_request; bool request_complete; http_parser *parser; http_header *parser_current_header; - int parser_header_state; + server_parse_header_state parser_header_state; } server_parse_status; typedef struct server_connection { @@ -38,6 +42,7 @@ extern "C" { struct socket_info *skt; time_t last_activity; server_state *server; + http_request *pending_requests; http_response_list *pending_responses; data_buffer_list *pending_writes; server_parse_status *parse_state; @@ -47,6 +52,10 @@ extern "C" { server_connection* server_connection_new(socket_info *skt, server_state *state); void server_connection_delete(server_connection *conn); + + server_parse_status* server_parse_status_new(); + void server_parse_status_delete(server_parse_status* state); + void server_parser_status_reset(server_parse_status* state); #ifdef __cplusplus } diff --git a/src/server-loop-read.c b/src/server-loop-read.c index 2028dfc..8c38ef7 100644 --- a/src/server-loop-read.c +++ b/src/server-loop-read.c @@ -4,11 +4,16 @@ #include #include +#include "ut/utlist.h" + #include "util.h" #include "log.h" #include "config.h" #include "data-buffer.h" #include "queue.h" +#include "http_parser.h" +#include "http.h" +#include "http-reader.h" #include "server-connection.h" #include "server-state.h" @@ -46,10 +51,31 @@ void* server_loop_read(void* arg) { break; } totalread += count; - data_buffer_list_append(conn->pending_writes, readbuf, count); - } - if (totalread > 0) { - queue_add(conn->server->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", conn)); + bool error = false; + size_t parsed_count = http_parser_execute(conn->parse_state->parser, parser_get_settings(), readbuf, count); + if (conn->parse_state->parser->upgrade) { + //No upgrades of the protocol are supported, send 501 not impl. back + http_response *resp = http_response_create_builtin(501, "Protocol upgrade is not supported"); + http_header_list_add(resp->headers, http_header_new(HEADER_CONNECTION, "Close"), false); + http_response_list_append(conn->pending_responses, resp); + error = true; + } else if (parsed_count != count || HTTP_PARSER_ERRNO(conn->parse_state->parser) != HPE_OK) { + //Error parsing request + http_response *resp = http_response_create_builtin(400, http_errno_description(HTTP_PARSER_ERRNO(conn->parse_state->parser))); + http_header_list_add(resp->headers, http_header_new(HEADER_CONNECTION, "Close"), false); + http_response_list_append(conn->pending_responses, resp); + error = true; + } else if (conn->parse_state->request_complete == true) { + //Request has been read successfully, notify worker queue + LL_APPEND(conn->pending_requests, conn->parse_state->current_request); + server_parser_status_reset(conn->parse_state); + queue_add(conn->server->pools[POOL_WORKER]->queue, queue_item_new2("REQ", (void*)conn)); + } + if (error = true) { + //Write any error directly, this will also close the connection + queue_add(conn->server->pools[POOL_WRITE]->queue, queue_item_new2("RESP", (void*)conn)); + break; + } } CONN_UNLOCK(conn); queue_return_item(th->pool->queue, item, item->blocked == false); diff --git a/src/socket.c b/src/socket.c index fb30b01..d8f593d 100644 --- a/src/socket.c +++ b/src/socket.c @@ -50,9 +50,9 @@ bool skt_canread(socket_info* skt) { } return len > 0; } -size_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) { +ssize_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) { assert(skt != NULL); - int result = read(skt->fd, buffer, bufferlen); + ssize_t result = read(skt->fd, buffer, bufferlen); if (result < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { warning(true, "read error"); @@ -62,11 +62,11 @@ size_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) { } return result; //Number of bytes read } -size_t skt_write(socket_info* skt, char* data, size_t len) { +ssize_t skt_write(socket_info* skt, char* data, size_t len) { assert(skt != NULL); assert(data != NULL); - int result = write(skt->fd, data, len); + ssize_t result = write(skt->fd, data, len); if (result < 0) { if (errno != EAGAIN && errno != EWOULDBLOCK) { warning(true, "write error"); @@ -82,18 +82,20 @@ int skt_data_buffer(socket_info *skt, data_buffer_list *list) { BUFFER_LIST_RD_LOCK(list); do { + BUFFER_LIST_RD_LOCK(list); data_buffer *elem = list->first; - size_t written = skt_write(skt, elem->buffer + elem->rOffset, elem->wOffset - elem->rOffset); + ssize_t written = skt_write(skt, elem->buffer + elem->rOffset, elem->wOffset - elem->rOffset); if (written == 0) { break; } elem->rOffset += written; if (elem->rOffset == elem->wOffset) { - BUFFER_LIST_WR_LOCK(list); + BUFFER_LIST_WRONLY_LOCK(list); LL_DELETE(list->first, elem); - BUFFER_LIST_WR_UNLOCK(list); + BUFFER_LIST_WRONLY_UNLOCK(list); data_buffer_free(elem); } + BUFFER_LIST_RD_UNLOCK(list); } while(list->first != NULL); int result; diff --git a/src/socket.h b/src/socket.h index e1d41e6..0c20f5b 100644 --- a/src/socket.h +++ b/src/socket.h @@ -34,8 +34,8 @@ extern "C" { void skt_delete(socket_info *skt); bool skt_canread(socket_info *skt); - size_t skt_read(socket_info *skt, char* buffer, size_t bufferlen); - size_t skt_write(socket_info* skt, char* data, size_t len); + ssize_t skt_read(socket_info *skt, char* buffer, size_t bufferlen); + ssize_t skt_write(socket_info* skt, char* data, size_t len); int skt_write_data_buffer(socket_info *skt, data_buffer_list *list); void skt_close(socket_info *skt); char* skt_clientaddr(socket_info *skt, char* address, size_t address_len);