Working on server read loop

This commit is contained in:
2014-08-22 21:26:23 +01:00
parent 1650dbc1a3
commit 053a6a8e21
11 changed files with 148 additions and 52 deletions

View File

@@ -1,4 +1,5 @@
#include <stdio.h> #include <stdio.h>
#include <stddef.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdbool.h> #include <stdbool.h>
#include <pthread.h> #include <pthread.h>
@@ -41,7 +42,6 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n)
blocks++; blocks++;
LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE)); LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE));
} }
size_t offset = 0; size_t offset = 0;
data_buffer *elem; data_buffer *elem;
LL_FOREACH(newbuf, elem) { LL_FOREACH(newbuf, elem) {
@@ -59,8 +59,8 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n)
} }
void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) { void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) {
assert(list != NULL); assert(list != NULL);
if (rd == true) pthread_mutex_lock(list->rdlock);
if (wr == true) pthread_mutex_lock(list->wrlock); if (wr == true) pthread_mutex_lock(list->wrlock);
if (rd == true) pthread_mutex_lock(list->rdlock);
} }
void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr) { void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr) {
assert(list != NULL); assert(list != NULL);
@@ -71,15 +71,15 @@ void data_buffer_list_unlock(data_buffer_list *list, bool rd, bool wr) {
data_buffer* data_buffer_new(size_t size) { data_buffer* data_buffer_new(size_t size) {
assert(size > 0); assert(size > 0);
data_buffer* buffer = calloc(1, sizeof(data_buffer)); data_buffer* buf = calloc(1, sizeof(data_buffer));
buffer->buffer = calloc(size, sizeof(char)); buf->buffer = calloc(size, sizeof(char));
buffer->size = size; buf->size = size;
return buffer; return buf;
} }
void data_buffer_free(data_buffer *buffer) { void data_buffer_free(data_buffer *buf) {
assert(buffer != NULL); assert(buf != NULL);
free(buffer->buffer); free(buf->buffer);
free(buffer); free(buf);
} }

View File

@@ -8,6 +8,7 @@
#ifndef DATA_BUFFER_H #ifndef DATA_BUFFER_H
#define DATA_BUFFER_H #define DATA_BUFFER_H
#include <stddef.h>
#include <stdbool.h> #include <stdbool.h>
#include <pthread.h> #include <pthread.h>
@@ -17,23 +18,26 @@ extern "C" {
#define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, true, true) #define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, true, true)
#define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, true, true) #define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, true, true)
#define BUFFER_LIST_WRONLY_LOCK(l) data_buffer_list_lock(l, false, true)
#define BUFFER_LIST_WRONLY_UNLOCK(l) data_buffer_list_unlock(l, false, true)
#define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false) #define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false)
#define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false) #define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false)
#define DATA_BUFFER_SIZE 16*1024 #define DATA_BUFFER_SIZE 16*1024
typedef struct data_buffer_list {
struct data_buffer *first;
pthread_mutex_t *wrlock, *rdlock;
} data_buffer_list;
typedef struct data_buffer { typedef struct data_buffer {
char* buffer; char* buffer;
size_t size; size_t size;
size_t wOffset, rOffset; size_t wOffset;
size_t rOffset;
struct data_buffer *next; struct data_buffer *next;
} data_buffer; } data_buffer;
typedef struct data_buffer_list {
data_buffer *first;
pthread_mutex_t *wrlock, *rdlock;
} data_buffer_list;
data_buffer_list* data_buffer_list_new(); data_buffer_list* data_buffer_list_new();
void data_buffer_list_delete(data_buffer_list *list); void data_buffer_list_delete(data_buffer_list *list);
void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n); void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n);
@@ -43,7 +47,6 @@ extern "C" {
data_buffer* data_buffer_new(size_t size); data_buffer* data_buffer_new(size_t size);
void data_buffer_free(data_buffer *buffer); void data_buffer_free(data_buffer *buffer);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@@ -12,7 +12,7 @@
str = calloc(length+1, sizeof(char));\ str = calloc(length+1, sizeof(char));\
strncpy(str, at, length);\ strncpy(str, at, length);\
}while(0); }while(0);
#define SKT(parser) ((request_parse_state*)parser->data) #define SKT(parser) ((server_parse_status*)parser->data)
http_parser_settings *parser_settings = NULL; http_parser_settings *parser_settings = NULL;
@@ -43,6 +43,7 @@ int parser_cb_on_message_begin(http_parser* parser) {
http_request_delete(SKT(parser)->current_request); http_request_delete(SKT(parser)->current_request);
} }
SKT(parser)->current_request = http_request_new(); SKT(parser)->current_request = http_request_new();
SKT(parser)->request_complete = false;
return 0; return 0;
} }

View File

@@ -17,14 +17,6 @@ extern "C" {
#include "http.h" #include "http.h"
#include "http_parser.h" #include "http_parser.h"
typedef struct request_parse_state {
http_request *current_request;
bool request_complete;
struct http_parser *parser;
http_header *parser_current_header;
enum {HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD} parser_header_state;
} request_parse_state;
http_parser_settings* parser_get_settings(); http_parser_settings* parser_get_settings();
void parser_free_settings(); void parser_free_settings();

View File

@@ -4,6 +4,8 @@
#include <string.h> #include <string.h>
#include <stdbool.h> #include <stdbool.h>
#include <time.h> #include <time.h>
#include "ut/utlist.h"
#include "ut/utarray.h" #include "ut/utarray.h"
#include "ut/utstring.h" #include "ut/utstring.h"
@@ -364,7 +366,7 @@ char* http_response_write(http_response *resp) {
return outputStr; return outputStr;
} }
http_response* http_response_create_builtin(uint16_t code, char* errmsg) { http_response* http_response_create_builtin(uint16_t code, const char* errmsg) {
http_response *resp = http_response_new(http_response_line_new(code)); http_response *resp = http_response_new(http_response_line_new(code));
http_header_list_add(resp->headers, http_header_new(HEADER_CONTENT_TYPE, "text/html"), false); http_header_list_add(resp->headers, http_header_new(HEADER_CONTENT_TYPE, "text/html"), false);
@@ -442,15 +444,34 @@ char* http_chunks_terminate(http_header_list *footers) {
http_response_list* http_response_list_new() { http_response_list* http_response_list_new() {
http_response_list *list = calloc(1, sizeof(http_response_list)); http_response_list *list = calloc(1, sizeof(http_response_list));
list->responses = NULL; assert(list != NULL);
list->count = 0; list->first = NULL;
return list; return list;
} }
void http_response_list_append(http_response_list *list, http_response* response) { void http_response_list_append(http_response_list *list, http_response* response) {
list->responses = realloc(list->responses, (++list->count)*sizeof(http_response*)); assert(list != NULL);
list->responses[list->count-1] = response; assert(response != NULL);
LL_APPEND(list->first, response);
}
http_response* http_response_list_next(http_response_list *list) {
assert(list != NULL);
return http_response_list_next2(list, true);
}
http_response* http_response_list_next2(http_response_list *list, bool remove) {
assert(list != NULL);
if (list->first == NULL) {
return NULL;
}
http_response* resp = list->first;
if (remove == true) {
LL_DELETE(list->first, resp);
resp->next = NULL;
}
return resp;
} }
void http_response_list_delete(http_response_list *list) { void http_response_list_delete(http_response_list *list) {
assert(list != NULL);
http_response *elem; http_response *elem;
HTTP_RESPONSE_LIST_FOREACH(list, elem) { HTTP_RESPONSE_LIST_FOREACH(list, elem) {
http_response_delete(elem); http_response_delete(elem);

View File

@@ -82,6 +82,7 @@ extern "C" {
http_request_parsestatus parsestatus; http_request_parsestatus parsestatus;
http_header_list *headers; http_header_list *headers;
char *body; char *body;
struct http_request *next;
} http_request; } http_request;
typedef struct http_response { typedef struct http_response {
@@ -89,15 +90,13 @@ extern "C" {
http_header_list *headers; http_header_list *headers;
bool body_chunked; bool body_chunked;
char* body; char* body;
struct http_response *next;
} http_response; } http_response;
#define HTTP_RESPONSE_LIST_FOREACH(list, elem) \ #define HTTP_RESPONSE_LIST_FOREACH(list, elem) LL_FOREACH(list->first, elem)
elem = list->count == 0 ? NULL : list->responses[0]; \
for(int i=0; i<list->count; elem=list->responses[++i])
typedef struct http_response_list { typedef struct http_response_list {
http_response **responses; http_response *first;
size_t count;
} http_response_list; } http_response_list;
char* http_method_getstring(http_request_method method, char* method_other); char* http_method_getstring(http_request_method method, char* method_other);
@@ -130,13 +129,15 @@ extern "C" {
void http_response_append_body(http_response *resp, const char* body); void http_response_append_body(http_response *resp, const char* body);
void http_response_delete(http_response *resp); void http_response_delete(http_response *resp);
char* http_response_write(http_response *resp); char* http_response_write(http_response *resp);
http_response* http_response_create_builtin(uint16_t code, char* errmsg); http_response* http_response_create_builtin(uint16_t code, const char* errmsg);
char* http_chunks_write(char* source); char* http_chunks_write(char* source);
char* http_chunks_terminate(http_header_list *footers); char* http_chunks_terminate(http_header_list *footers);
http_response_list* http_response_list_new(); http_response_list* http_response_list_new();
void http_response_list_append(http_response_list *list, http_response* response); void http_response_list_append(http_response_list *list, http_response* response);
http_response* http_response_list_next(http_response_list *list);
http_response* http_response_list_next2(http_response_list *list, bool remove);
void http_response_list_delete(http_response_list *list); void http_response_list_delete(http_response_list *list);
#ifdef __cplusplus #ifdef __cplusplus

View File

@@ -5,7 +5,9 @@
#include <time.h> #include <time.h>
#include <assert.h> #include <assert.h>
#include "http_parser.h"
#include "http.h" #include "http.h"
#include "http-reader.h"
#include "socket.h" #include "socket.h"
#include "server-state.h" #include "server-state.h"
#include "server-connection.h" #include "server-connection.h"
@@ -19,7 +21,8 @@ server_connection* server_connection_new(socket_info *skt, server_state *state)
conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
conn->last_activity = time(NULL); conn->last_activity = time(NULL);
conn->server = state; conn->server = state;
conn->parse_state = NULL; conn->parse_state = server_parse_status_new();
conn->pending_requests = NULL;
conn->pending_responses = http_response_list_new(); conn->pending_responses = http_response_list_new();
conn->pending_writes = data_buffer_list_new(); conn->pending_writes = data_buffer_list_new();
conn->skt = skt; conn->skt = skt;
@@ -29,9 +32,47 @@ server_connection* server_connection_new(socket_info *skt, server_state *state)
void server_connection_delete(server_connection *conn) { void server_connection_delete(server_connection *conn) {
assert(conn!=NULL); assert(conn!=NULL);
server_parse_status_delete(conn->parse_state);
http_response_list_delete(conn->pending_responses); http_response_list_delete(conn->pending_responses);
data_buffer_list_delete(conn->pending_writes); data_buffer_list_delete(conn->pending_writes);
skt_delete(conn->skt); skt_delete(conn->skt);
pthread_mutex_destroy(&conn->mutex); pthread_mutex_destroy(&conn->mutex);
free(conn); free(conn);
} }
server_parse_status* server_parse_status_new() {
server_parse_status *state = calloc(1, sizeof(server_parse_status));
state->current_request = NULL;
state->parser_current_header = NULL;
state->request_complete = false;
state->parser_header_state = HSTATE_NONE;
state->parser = calloc(1, sizeof(http_parser));
http_parser_init(state->parser, HTTP_REQUEST);
return state;
}
void server_parse_status_delete(server_parse_status* state) {
assert(state!=NULL);
if (state->current_request != NULL) {
http_request_delete(state->current_request);
}
if (state->parser != NULL) {
free(state->parser);
}
if (state->parser_current_header != NULL) {
http_header_delete(state->parser_current_header);
}
free(state);
}
void server_parser_status_reset(server_parse_status* state) {
assert(state!=NULL);
state->request_complete = false;
state->parser_header_state = HSTATE_NONE;
if (state->parser_current_header != NULL) {
http_header_delete(state->parser_current_header);
state->parser_current_header = NULL;
}
http_parser_init(state->parser, HTTP_REQUEST);
}

View File

@@ -25,12 +25,16 @@ extern "C" {
#define CONN_LOCK(c) pthread_mutex_lock(&c->mutex) #define CONN_LOCK(c) pthread_mutex_lock(&c->mutex)
#define CONN_UNLOCK(c) pthread_mutex_unlock(&c->mutex) #define CONN_UNLOCK(c) pthread_mutex_unlock(&c->mutex)
typedef enum server_parse_header_state {
HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD
} server_parse_header_state;
typedef struct server_parse_status { typedef struct server_parse_status {
http_request *current_request; http_request *current_request;
bool request_complete; bool request_complete;
http_parser *parser; http_parser *parser;
http_header *parser_current_header; http_header *parser_current_header;
int parser_header_state; server_parse_header_state parser_header_state;
} server_parse_status; } server_parse_status;
typedef struct server_connection { typedef struct server_connection {
@@ -38,6 +42,7 @@ extern "C" {
struct socket_info *skt; struct socket_info *skt;
time_t last_activity; time_t last_activity;
server_state *server; server_state *server;
http_request *pending_requests;
http_response_list *pending_responses; http_response_list *pending_responses;
data_buffer_list *pending_writes; data_buffer_list *pending_writes;
server_parse_status *parse_state; server_parse_status *parse_state;
@@ -48,6 +53,10 @@ extern "C" {
server_connection* server_connection_new(socket_info *skt, server_state *state); server_connection* server_connection_new(socket_info *skt, server_state *state);
void server_connection_delete(server_connection *conn); void server_connection_delete(server_connection *conn);
server_parse_status* server_parse_status_new();
void server_parse_status_delete(server_parse_status* state);
void server_parser_status_reset(server_parse_status* state);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@@ -4,11 +4,16 @@
#include <unistd.h> #include <unistd.h>
#include <errno.h> #include <errno.h>
#include "ut/utlist.h"
#include "util.h" #include "util.h"
#include "log.h" #include "log.h"
#include "config.h" #include "config.h"
#include "data-buffer.h" #include "data-buffer.h"
#include "queue.h" #include "queue.h"
#include "http_parser.h"
#include "http.h"
#include "http-reader.h"
#include "server-connection.h" #include "server-connection.h"
#include "server-state.h" #include "server-state.h"
@@ -46,10 +51,31 @@ void* server_loop_read(void* arg) {
break; break;
} }
totalread += count; totalread += count;
data_buffer_list_append(conn->pending_writes, readbuf, count); bool error = false;
size_t parsed_count = http_parser_execute(conn->parse_state->parser, parser_get_settings(), readbuf, count);
if (conn->parse_state->parser->upgrade) {
//No upgrades of the protocol are supported, send 501 not impl. back
http_response *resp = http_response_create_builtin(501, "Protocol upgrade is not supported");
http_header_list_add(resp->headers, http_header_new(HEADER_CONNECTION, "Close"), false);
http_response_list_append(conn->pending_responses, resp);
error = true;
} else if (parsed_count != count || HTTP_PARSER_ERRNO(conn->parse_state->parser) != HPE_OK) {
//Error parsing request
http_response *resp = http_response_create_builtin(400, http_errno_description(HTTP_PARSER_ERRNO(conn->parse_state->parser)));
http_header_list_add(resp->headers, http_header_new(HEADER_CONNECTION, "Close"), false);
http_response_list_append(conn->pending_responses, resp);
error = true;
} else if (conn->parse_state->request_complete == true) {
//Request has been read successfully, notify worker queue
LL_APPEND(conn->pending_requests, conn->parse_state->current_request);
server_parser_status_reset(conn->parse_state);
queue_add(conn->server->pools[POOL_WORKER]->queue, queue_item_new2("REQ", (void*)conn));
}
if (error = true) {
//Write any error directly, this will also close the connection
queue_add(conn->server->pools[POOL_WRITE]->queue, queue_item_new2("RESP", (void*)conn));
break;
} }
if (totalread > 0) {
queue_add(conn->server->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", conn));
} }
CONN_UNLOCK(conn); CONN_UNLOCK(conn);
queue_return_item(th->pool->queue, item, item->blocked == false); queue_return_item(th->pool->queue, item, item->blocked == false);

View File

@@ -50,9 +50,9 @@ bool skt_canread(socket_info* skt) {
} }
return len > 0; return len > 0;
} }
size_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) { ssize_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) {
assert(skt != NULL); assert(skt != NULL);
int result = read(skt->fd, buffer, bufferlen); ssize_t result = read(skt->fd, buffer, bufferlen);
if (result < 0) { if (result < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { if (errno != EAGAIN && errno != EWOULDBLOCK) {
warning(true, "read error"); warning(true, "read error");
@@ -62,11 +62,11 @@ size_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) {
} }
return result; //Number of bytes read return result; //Number of bytes read
} }
size_t skt_write(socket_info* skt, char* data, size_t len) { ssize_t skt_write(socket_info* skt, char* data, size_t len) {
assert(skt != NULL); assert(skt != NULL);
assert(data != NULL); assert(data != NULL);
int result = write(skt->fd, data, len); ssize_t result = write(skt->fd, data, len);
if (result < 0) { if (result < 0) {
if (errno != EAGAIN && errno != EWOULDBLOCK) { if (errno != EAGAIN && errno != EWOULDBLOCK) {
warning(true, "write error"); warning(true, "write error");
@@ -82,18 +82,20 @@ int skt_data_buffer(socket_info *skt, data_buffer_list *list) {
BUFFER_LIST_RD_LOCK(list); BUFFER_LIST_RD_LOCK(list);
do { do {
BUFFER_LIST_RD_LOCK(list);
data_buffer *elem = list->first; data_buffer *elem = list->first;
size_t written = skt_write(skt, elem->buffer + elem->rOffset, elem->wOffset - elem->rOffset); ssize_t written = skt_write(skt, elem->buffer + elem->rOffset, elem->wOffset - elem->rOffset);
if (written == 0) { if (written == 0) {
break; break;
} }
elem->rOffset += written; elem->rOffset += written;
if (elem->rOffset == elem->wOffset) { if (elem->rOffset == elem->wOffset) {
BUFFER_LIST_WR_LOCK(list); BUFFER_LIST_WRONLY_LOCK(list);
LL_DELETE(list->first, elem); LL_DELETE(list->first, elem);
BUFFER_LIST_WR_UNLOCK(list); BUFFER_LIST_WRONLY_UNLOCK(list);
data_buffer_free(elem); data_buffer_free(elem);
} }
BUFFER_LIST_RD_UNLOCK(list);
} while(list->first != NULL); } while(list->first != NULL);
int result; int result;

View File

@@ -34,8 +34,8 @@ extern "C" {
void skt_delete(socket_info *skt); void skt_delete(socket_info *skt);
bool skt_canread(socket_info *skt); bool skt_canread(socket_info *skt);
size_t skt_read(socket_info *skt, char* buffer, size_t bufferlen); ssize_t skt_read(socket_info *skt, char* buffer, size_t bufferlen);
size_t skt_write(socket_info* skt, char* data, size_t len); ssize_t skt_write(socket_info* skt, char* data, size_t len);
int skt_write_data_buffer(socket_info *skt, data_buffer_list *list); int skt_write_data_buffer(socket_info *skt, data_buffer_list *list);
void skt_close(socket_info *skt); void skt_close(socket_info *skt);
char* skt_clientaddr(socket_info *skt, char* address, size_t address_len); char* skt_clientaddr(socket_info *skt, char* address, size_t address_len);