Implemented a basic echo server
This commit is contained in:
@@ -43,7 +43,6 @@ OBJECTFILES= \
|
|||||||
${OBJECTDIR}/src/http-server.o \
|
${OBJECTDIR}/src/http-server.o \
|
||||||
${OBJECTDIR}/src/http.o \
|
${OBJECTDIR}/src/http.o \
|
||||||
${OBJECTDIR}/src/log.o \
|
${OBJECTDIR}/src/log.o \
|
||||||
${OBJECTDIR}/src/main-loop.o \
|
|
||||||
${OBJECTDIR}/src/main.o \
|
${OBJECTDIR}/src/main.o \
|
||||||
${OBJECTDIR}/src/mime.o \
|
${OBJECTDIR}/src/mime.o \
|
||||||
${OBJECTDIR}/src/queue.o \
|
${OBJECTDIR}/src/queue.o \
|
||||||
@@ -124,11 +123,6 @@ ${OBJECTDIR}/src/log.o: nbproject/Makefile-${CND_CONF}.mk src/log.c
|
|||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/log.o src/log.c
|
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/log.o src/log.c
|
||||||
|
|
||||||
${OBJECTDIR}/src/main-loop.o: nbproject/Makefile-${CND_CONF}.mk src/main-loop.c
|
|
||||||
${MKDIR} -p ${OBJECTDIR}/src
|
|
||||||
${RM} "$@.d"
|
|
||||||
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/main-loop.o src/main-loop.c
|
|
||||||
|
|
||||||
${OBJECTDIR}/src/main.o: nbproject/Makefile-${CND_CONF}.mk src/main.c
|
${OBJECTDIR}/src/main.o: nbproject/Makefile-${CND_CONF}.mk src/main.c
|
||||||
${MKDIR} -p ${OBJECTDIR}/src
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
|
|||||||
@@ -43,7 +43,6 @@ OBJECTFILES= \
|
|||||||
${OBJECTDIR}/src/http-server.o \
|
${OBJECTDIR}/src/http-server.o \
|
||||||
${OBJECTDIR}/src/http.o \
|
${OBJECTDIR}/src/http.o \
|
||||||
${OBJECTDIR}/src/log.o \
|
${OBJECTDIR}/src/log.o \
|
||||||
${OBJECTDIR}/src/main-loop.o \
|
|
||||||
${OBJECTDIR}/src/main.o \
|
${OBJECTDIR}/src/main.o \
|
||||||
${OBJECTDIR}/src/mime.o \
|
${OBJECTDIR}/src/mime.o \
|
||||||
${OBJECTDIR}/src/queue.o \
|
${OBJECTDIR}/src/queue.o \
|
||||||
@@ -124,11 +123,6 @@ ${OBJECTDIR}/src/log.o: nbproject/Makefile-${CND_CONF}.mk src/log.c
|
|||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/log.o src/log.c
|
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/log.o src/log.c
|
||||||
|
|
||||||
${OBJECTDIR}/src/main-loop.o: nbproject/Makefile-${CND_CONF}.mk src/main-loop.c
|
|
||||||
${MKDIR} -p ${OBJECTDIR}/src
|
|
||||||
${RM} "$@.d"
|
|
||||||
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/main-loop.o src/main-loop.c
|
|
||||||
|
|
||||||
${OBJECTDIR}/src/main.o: nbproject/Makefile-${CND_CONF}.mk src/main.c
|
${OBJECTDIR}/src/main.o: nbproject/Makefile-${CND_CONF}.mk src/main.c
|
||||||
${MKDIR} -p ${OBJECTDIR}/src
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
|
|||||||
@@ -11,9 +11,7 @@
|
|||||||
<itemPath>src/http.h</itemPath>
|
<itemPath>src/http.h</itemPath>
|
||||||
<itemPath>lib/http_parser.h</itemPath>
|
<itemPath>lib/http_parser.h</itemPath>
|
||||||
<itemPath>lib/ini.h</itemPath>
|
<itemPath>lib/ini.h</itemPath>
|
||||||
<itemPath>src/khttp.h</itemPath>
|
|
||||||
<itemPath>src/log.h</itemPath>
|
<itemPath>src/log.h</itemPath>
|
||||||
<itemPath>src/main-loop.h</itemPath>
|
|
||||||
<itemPath>src/main.h</itemPath>
|
<itemPath>src/main.h</itemPath>
|
||||||
<itemPath>src/mime.h</itemPath>
|
<itemPath>src/mime.h</itemPath>
|
||||||
<itemPath>src/queue.h</itemPath>
|
<itemPath>src/queue.h</itemPath>
|
||||||
@@ -42,7 +40,6 @@
|
|||||||
<itemPath>lib/http_parser.c</itemPath>
|
<itemPath>lib/http_parser.c</itemPath>
|
||||||
<itemPath>lib/ini.c</itemPath>
|
<itemPath>lib/ini.c</itemPath>
|
||||||
<itemPath>src/log.c</itemPath>
|
<itemPath>src/log.c</itemPath>
|
||||||
<itemPath>src/main-loop.c</itemPath>
|
|
||||||
<itemPath>src/main.c</itemPath>
|
<itemPath>src/main.c</itemPath>
|
||||||
<itemPath>src/mime.c</itemPath>
|
<itemPath>src/mime.c</itemPath>
|
||||||
<itemPath>src/queue.c</itemPath>
|
<itemPath>src/queue.c</itemPath>
|
||||||
@@ -140,16 +137,10 @@
|
|||||||
</item>
|
</item>
|
||||||
<item path="src/http.h" ex="false" tool="3" flavor2="0">
|
<item path="src/http.h" ex="false" tool="3" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/khttp.h" ex="false" tool="3" flavor2="0">
|
|
||||||
</item>
|
|
||||||
<item path="src/log.c" ex="false" tool="0" flavor2="0">
|
<item path="src/log.c" ex="false" tool="0" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/log.h" ex="false" tool="3" flavor2="0">
|
<item path="src/log.h" ex="false" tool="3" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/main-loop.c" ex="false" tool="0" flavor2="0">
|
|
||||||
</item>
|
|
||||||
<item path="src/main-loop.h" ex="false" tool="3" flavor2="0">
|
|
||||||
</item>
|
|
||||||
<item path="src/main.c" ex="false" tool="0" flavor2="0">
|
<item path="src/main.c" ex="false" tool="0" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/main.h" ex="false" tool="3" flavor2="0">
|
<item path="src/main.h" ex="false" tool="3" flavor2="0">
|
||||||
@@ -268,16 +259,10 @@
|
|||||||
</item>
|
</item>
|
||||||
<item path="src/http.h" ex="false" tool="3" flavor2="0">
|
<item path="src/http.h" ex="false" tool="3" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/khttp.h" ex="false" tool="3" flavor2="0">
|
|
||||||
</item>
|
|
||||||
<item path="src/log.c" ex="false" tool="0" flavor2="0">
|
<item path="src/log.c" ex="false" tool="0" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/log.h" ex="false" tool="3" flavor2="0">
|
<item path="src/log.h" ex="false" tool="3" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/main-loop.c" ex="false" tool="0" flavor2="0">
|
|
||||||
</item>
|
|
||||||
<item path="src/main-loop.h" ex="false" tool="3" flavor2="0">
|
|
||||||
</item>
|
|
||||||
<item path="src/main.c" ex="false" tool="0" flavor2="0">
|
<item path="src/main.c" ex="false" tool="0" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/main.h" ex="false" tool="3" flavor2="0">
|
<item path="src/main.h" ex="false" tool="3" flavor2="0">
|
||||||
|
|||||||
@@ -36,7 +36,7 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n)
|
|||||||
BUFFER_LIST_WR_LOCK(list);
|
BUFFER_LIST_WR_LOCK(list);
|
||||||
|
|
||||||
int blocks = 1;
|
int blocks = 1;
|
||||||
data_buffer *newbuf=NULL;
|
data_buffer *newbuf = data_buffer_new(DATA_BUFFER_SIZE);
|
||||||
while(blocks * DATA_BUFFER_SIZE < n) {
|
while(blocks * DATA_BUFFER_SIZE < n) {
|
||||||
blocks++;
|
blocks++;
|
||||||
LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE));
|
LL_PREPEND(newbuf, data_buffer_new(DATA_BUFFER_SIZE));
|
||||||
@@ -54,8 +54,8 @@ void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n)
|
|||||||
elem->wOffset += copy_count;
|
elem->wOffset += copy_count;
|
||||||
}
|
}
|
||||||
|
|
||||||
LL_CONCAT(list->first, elem);
|
LL_CONCAT(list->first, newbuf);
|
||||||
BUFFER_LIST_WR_LOCK(list);
|
BUFFER_LIST_WR_UNLOCK(list);
|
||||||
}
|
}
|
||||||
void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) {
|
void data_buffer_list_lock(data_buffer_list *list, bool rd, bool wr) {
|
||||||
assert(list != NULL);
|
assert(list != NULL);
|
||||||
|
|||||||
@@ -15,18 +15,13 @@
|
|||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
#define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, false, true)
|
#define BUFFER_LIST_WR_LOCK(l) data_buffer_list_lock(l, true, true)
|
||||||
#define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, false, true)
|
#define BUFFER_LIST_WR_UNLOCK(l) data_buffer_list_unlock(l, true, true)
|
||||||
#define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false)
|
#define BUFFER_LIST_RD_LOCK(l) data_buffer_list_lock(l, true, false)
|
||||||
#define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false)
|
#define BUFFER_LIST_RD_UNLOCK(l) data_buffer_list_unlock(l, true, false)
|
||||||
|
|
||||||
#define DATA_BUFFER_SIZE 16*1024
|
#define DATA_BUFFER_SIZE 16*1024
|
||||||
|
|
||||||
typedef struct data_buffer_list {
|
|
||||||
struct data_buffer *first;
|
|
||||||
pthread_mutex_t *wrlock, *rdlock;
|
|
||||||
} data_buffer_list;
|
|
||||||
|
|
||||||
typedef struct data_buffer {
|
typedef struct data_buffer {
|
||||||
char* buffer;
|
char* buffer;
|
||||||
size_t size;
|
size_t size;
|
||||||
@@ -34,6 +29,11 @@ extern "C" {
|
|||||||
struct data_buffer *next;
|
struct data_buffer *next;
|
||||||
} data_buffer;
|
} data_buffer;
|
||||||
|
|
||||||
|
typedef struct data_buffer_list {
|
||||||
|
data_buffer *first;
|
||||||
|
pthread_mutex_t *wrlock, *rdlock;
|
||||||
|
} data_buffer_list;
|
||||||
|
|
||||||
data_buffer_list* data_buffer_list_new();
|
data_buffer_list* data_buffer_list_new();
|
||||||
void data_buffer_list_delete(data_buffer_list *list);
|
void data_buffer_list_delete(data_buffer_list *list);
|
||||||
void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n);
|
void data_buffer_list_append(data_buffer_list *list, const char* src, size_t n);
|
||||||
|
|||||||
292
src/main-loop.c
292
src/main-loop.c
@@ -1,292 +0,0 @@
|
|||||||
#include <stdlib.h>
|
|
||||||
#include <stdio.h>
|
|
||||||
#include <sys/epoll.h>
|
|
||||||
#include <stdbool.h>
|
|
||||||
#include <time.h>
|
|
||||||
|
|
||||||
#include <ut/utlist.h>
|
|
||||||
#include <errno.h>
|
|
||||||
#include <unistd.h>
|
|
||||||
|
|
||||||
#include "mime.h"
|
|
||||||
#include "log.h"
|
|
||||||
#include "socket.h"
|
|
||||||
#include "thread-pool.h"
|
|
||||||
#include "http_parser.h"
|
|
||||||
#include "http.h"
|
|
||||||
#include "http-reader.h"
|
|
||||||
#include "server-socket.h"
|
|
||||||
#include "util.h"
|
|
||||||
#include "config.h"
|
|
||||||
#include "main-loop.h"
|
|
||||||
|
|
||||||
hmain_connection* hmain_connection_new(int fd, hmain_status *status) {
|
|
||||||
static u_int64_t nextid = 1;
|
|
||||||
|
|
||||||
hmain_connection *conn = calloc(1, sizeof(hmain_connection));
|
|
||||||
conn->cid = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
|
|
||||||
conn->fd = fd;
|
|
||||||
conn->status = status;
|
|
||||||
conn->opened = time(NULL);
|
|
||||||
conn->last_activity = conn->opened;
|
|
||||||
conn->pending_responses = http_response_list_new();
|
|
||||||
conn->pending_write = data_buffer_list_new();
|
|
||||||
conn->mutex = calloc(1, sizeof(pthread_mutex_t));
|
|
||||||
pthread_mutex_init(conn->mutex, NULL);
|
|
||||||
|
|
||||||
return conn;
|
|
||||||
}
|
|
||||||
void hmain_connection_close(hmain_connection *conn) {
|
|
||||||
close(conn->fd);
|
|
||||||
if (conn->pending_write != NULL) {
|
|
||||||
data_buffer_list_delete(conn->pending_write);
|
|
||||||
}
|
|
||||||
//TODO: remove from all queues
|
|
||||||
}
|
|
||||||
void hmain_connection_delete(hmain_connection *conn) {
|
|
||||||
LL_DELETE(conn->status->connections, conn);
|
|
||||||
http_response_list_delete(conn->pending_responses);
|
|
||||||
if (conn->pending_write != NULL) {
|
|
||||||
data_buffer_list_delete(conn->pending_write);
|
|
||||||
}
|
|
||||||
pthread_mutex_destroy(conn->mutex);
|
|
||||||
free(conn->mutex);
|
|
||||||
free(conn->clientaddr);
|
|
||||||
free(conn);
|
|
||||||
}
|
|
||||||
|
|
||||||
#define EVENT_IS(event, type) ((event->events & type) == type)
|
|
||||||
#define EVENT_ISNOT(event, type) (!EVENT_IS(event, type))
|
|
||||||
|
|
||||||
void hmain_setup(hmain_status *status) {
|
|
||||||
status->shutdown = false;
|
|
||||||
|
|
||||||
//Start Logging
|
|
||||||
log_register_add(log_new("stderr", stderr), true, LALL & ~(LINFO|LDEBUG));
|
|
||||||
log_register_add(log_new("stdout", stdout), false, LDEBUG | LINFO);
|
|
||||||
|
|
||||||
//Load mime types
|
|
||||||
mime_init(NULL);
|
|
||||||
|
|
||||||
//Load the config
|
|
||||||
config_server *config = config_server_new();
|
|
||||||
if (config_read_ini("khttpd.ini", config) < 0) {
|
|
||||||
fatal("Could not read config");
|
|
||||||
}
|
|
||||||
status->config = config;
|
|
||||||
|
|
||||||
//Open our listening socket
|
|
||||||
status->sfd = server_socket_create();
|
|
||||||
server_socket_listen(status->sfd, status->config->listen_port);
|
|
||||||
|
|
||||||
//Open epoll socket
|
|
||||||
status->epollfd = epoll_create1(0);
|
|
||||||
if (status->epollfd < 0) {
|
|
||||||
fatal("Failed to create epollfd");
|
|
||||||
}
|
|
||||||
|
|
||||||
//Register socket with epoll
|
|
||||||
struct epoll_event svr_event;
|
|
||||||
svr_event.data.fd = status->sfd;
|
|
||||||
svr_event.events = EPOLLIN | EPOLLET;
|
|
||||||
if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, status->sfd, &svr_event) < 0) {
|
|
||||||
fatal("Could not register server socket with epoll");
|
|
||||||
}
|
|
||||||
|
|
||||||
//Create thread pools/queues
|
|
||||||
thread_pool *pool = thread_pool_new("read", queue_new());
|
|
||||||
pool->min_threads = 1;
|
|
||||||
pool->max_threads = 2;
|
|
||||||
pool->func = thloop_read;
|
|
||||||
thread_pool_start(pool);
|
|
||||||
|
|
||||||
pool = thread_pool_new("write", queue_new());
|
|
||||||
pool->min_threads = 1;
|
|
||||||
pool->max_threads = 2;
|
|
||||||
pool->func = thloop_write;
|
|
||||||
thread_pool_start(pool);
|
|
||||||
|
|
||||||
pool = thread_pool_new("disk_read", queue_new());
|
|
||||||
pool->min_threads = 1;
|
|
||||||
pool->max_threads = 2;
|
|
||||||
pool->func = thloop_disk_read;
|
|
||||||
thread_pool_start(pool);
|
|
||||||
|
|
||||||
pool = thread_pool_new("worker", queue_new());
|
|
||||||
pool->min_threads = 1;
|
|
||||||
pool->max_threads = 5;
|
|
||||||
pool->func = thloop_worker;
|
|
||||||
thread_pool_start(pool);
|
|
||||||
|
|
||||||
}
|
|
||||||
void hmain_teardown(hmain_status *status) {
|
|
||||||
assert(status!=NULL);
|
|
||||||
|
|
||||||
//Stop pools
|
|
||||||
size_t pool_count = sizeof(status->pools) / sizeof(status->pools[0]);
|
|
||||||
for(int i=0; i<pool_count; i++) {
|
|
||||||
thread_pool_stop(status->pools[i]);
|
|
||||||
queue_delete(status->pools[i]->queue);
|
|
||||||
thread_pool_delete(status->pools[i]);
|
|
||||||
}
|
|
||||||
|
|
||||||
//Close all connections
|
|
||||||
hmain_connection *elem;
|
|
||||||
LL_FOREACH(status->connections, elem) {
|
|
||||||
hmain_connection_close(elem);
|
|
||||||
}
|
|
||||||
|
|
||||||
//Close epoll
|
|
||||||
close(status->epollfd);
|
|
||||||
|
|
||||||
//Close the listening socket
|
|
||||||
server_socket_release(status->sfd);
|
|
||||||
|
|
||||||
//Cleanup the mime detector
|
|
||||||
mime_destroy();
|
|
||||||
|
|
||||||
//Stop and remove all loggers
|
|
||||||
log_register_clear();
|
|
||||||
|
|
||||||
//Delete the config and the status
|
|
||||||
config_server_delete(status->config);
|
|
||||||
free(status);
|
|
||||||
}
|
|
||||||
void hmain_loop(hmain_status *status) {
|
|
||||||
struct epoll_event *current_event;
|
|
||||||
struct epoll_event *events = calloc(EPOLL_MAXEVENTS, sizeof(struct epoll_event));
|
|
||||||
|
|
||||||
struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in));
|
|
||||||
socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in);
|
|
||||||
|
|
||||||
while(status->shutdown == false) {
|
|
||||||
int event_count = epoll_wait(status->epollfd, events, EPOLL_MAXEVENTS, 2000);
|
|
||||||
for(int i=0; i<event_count; i++) {
|
|
||||||
current_event = &events[i];
|
|
||||||
|
|
||||||
if (EVENT_IS(current_event, EPOLLERR) ||
|
|
||||||
EVENT_IS(current_event, EPOLLHUP) ||
|
|
||||||
EVENT_ISNOT(current_event, EPOLLIN | EPOLLOUT)) {
|
|
||||||
LOG(LERROR, "epoll; unexpected error or event");
|
|
||||||
//TODO: close socket & cleanup
|
|
||||||
|
|
||||||
} else if (EVENT_IS(current_event, EPOLLRDHUP)) {
|
|
||||||
LOG(LINFO, "connection closed");
|
|
||||||
hmain_connection *conn = (hmain_connection*)current_event->data.ptr;
|
|
||||||
hmain_connection_close(conn);
|
|
||||||
hmain_connection_delete(conn);
|
|
||||||
//TODO: close socket & cleanup
|
|
||||||
} else if (EVENT_IS(current_event, EPOLLIN)) {
|
|
||||||
if (current_event->data.fd == status->sfd) {
|
|
||||||
//New connection(s)
|
|
||||||
while(true) {
|
|
||||||
struct epoll_event new_event;
|
|
||||||
int clientfd = accept4(status->sfd, (struct sockaddr*)clientaddr, &clientaddr_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
|
||||||
if (clientfd < 0) {
|
|
||||||
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
|
||||||
//Done with new connections
|
|
||||||
break;
|
|
||||||
} else {
|
|
||||||
warning(true, "Error accepting new connection");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
hmain_connection *conn = hmain_connection_new(clientfd, status);
|
|
||||||
conn->clientaddr = calloc(1, sizeof(struct sockaddr_in));
|
|
||||||
memcpy(conn->clientaddr, clientaddr, clientaddr_len);
|
|
||||||
new_event.data.fd = clientfd;
|
|
||||||
new_event.data.ptr = conn;
|
|
||||||
new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP;
|
|
||||||
if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, clientfd, &new_event) < 0) {
|
|
||||||
fatal("Could not register new connection with epoll");
|
|
||||||
}
|
|
||||||
LL_APPEND(status->connections, conn);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
//Data is ready to read on existing connection
|
|
||||||
hmain_connection *conn = (hmain_connection*)current_event->data.ptr;
|
|
||||||
if (conn->isReading == true) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
queue_add(status->pools[1]->queue, queue_item_new2("READ", (void*)conn));
|
|
||||||
}
|
|
||||||
} else if (EVENT_IS(current_event, EPOLLOUT)) {
|
|
||||||
//Data can be written to connection
|
|
||||||
hmain_connection *conn = (hmain_connection*)current_event->data.ptr;
|
|
||||||
if (conn->isWriting == true || (conn->pending_responses == NULL && conn->pending_write == NULL)) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
queue_add(status->pools[2]->queue, queue_item_new2("WRITE", (void*)conn));
|
|
||||||
}
|
|
||||||
}//for events
|
|
||||||
}//while shutdown == false
|
|
||||||
free(events);
|
|
||||||
free(clientaddr);
|
|
||||||
}
|
|
||||||
|
|
||||||
void* thloop_read(void * arg) {
|
|
||||||
thread* th = (thread*)arg;
|
|
||||||
|
|
||||||
while(th->stop==false) {
|
|
||||||
queue_item *item = queue_fetchone(th->pool->queue, true);
|
|
||||||
if (item == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
hmain_connection *conn = (hmain_connection*)item->data;
|
|
||||||
CONN_LOCK(conn);
|
|
||||||
|
|
||||||
char buffer[16*1024];
|
|
||||||
int count;
|
|
||||||
do {
|
|
||||||
count = read(conn->fd, buffer, sizeof(buffer)/sizeof(buffer[0]));
|
|
||||||
if (count < 0) {
|
|
||||||
if (!(errno == EAGAIN || errno == EWOULDBLOCK)) {
|
|
||||||
//Socket error
|
|
||||||
warning(true, "failed to read from connection #%lu", conn->cid);
|
|
||||||
}
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
if (count == 0) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
LOG(LDEBUG, "Read %zu", count);
|
|
||||||
|
|
||||||
//conn->pending_write = data_pool_appendbuffer(conn->pending_write, conn->status->buffer_pool, buffer, count);
|
|
||||||
|
|
||||||
queue_add(conn->status->pools[2]->queue, queue_item_new2("WRITE", (void*)conn));
|
|
||||||
/*if (conn->parse_data == NULL) {
|
|
||||||
conn->parse_data = calloc(1, sizeof(hmain_parse_data));
|
|
||||||
conn->parse_data->parser = calloc(1, sizeof(http_parser));
|
|
||||||
http_parser_init(conn->parse_data->parser, HTTP_REQUEST);
|
|
||||||
conn->parse_data->parser->data = conn->parse_data;
|
|
||||||
conn->parse_data->parser_header_state = HSTATE_NONE;
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
} while(count > 0);
|
|
||||||
CONN_UNLOCK(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void* thloop_write(void * arg) {
|
|
||||||
thread* th = (thread*)arg;
|
|
||||||
|
|
||||||
while(th->stop==false) {
|
|
||||||
queue_item *item = queue_fetchone(th->pool->queue, true);
|
|
||||||
if (item == NULL) {
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
hmain_connection *conn = (hmain_connection*)item->data;
|
|
||||||
do {
|
|
||||||
CONN_LOCK(conn);
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
}while(0);
|
|
||||||
CONN_UNLOCK(conn);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void* thloop_disk_read(void * arg){
|
|
||||||
|
|
||||||
}
|
|
||||||
void* thloop_worker(void * arg) {
|
|
||||||
|
|
||||||
}
|
|
||||||
@@ -1,81 +0,0 @@
|
|||||||
/*
|
|
||||||
* File: main-loop.h
|
|
||||||
* Author: sam
|
|
||||||
*
|
|
||||||
* Created on 07 August 2014, 18:44
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef MAIN_LOOP_H
|
|
||||||
#define MAIN_LOOP_H
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#include <stdbool.h>
|
|
||||||
#include <time.h>
|
|
||||||
#include <pthread.h>
|
|
||||||
|
|
||||||
#define EPOLL_MAXEVENTS 128
|
|
||||||
|
|
||||||
#define CONN_LOCK(c) pthread_mutex_lock(c->mutex)
|
|
||||||
#define CONN_UNLOCK(c) pthread_mutex_unlock(c->mutex)
|
|
||||||
|
|
||||||
typedef enum hmain_pool {
|
|
||||||
POOL_READA, POOL_WRITEA, POOL_WORKERSA, POOL_DISK_READA
|
|
||||||
} hmain_pool;
|
|
||||||
|
|
||||||
//typedef enum skt_elem_hstate {HSTATE_NONE, HSTATE_VALUE, HSTATE_FIELD} skt_elem_hstate;
|
|
||||||
|
|
||||||
typedef struct hmain_connection {
|
|
||||||
uint64_t cid;
|
|
||||||
struct hmain_status *status;
|
|
||||||
int fd;
|
|
||||||
struct sockaddr_in* clientaddr;
|
|
||||||
bool isReading, isWriting;
|
|
||||||
time_t opened;
|
|
||||||
time_t last_activity;
|
|
||||||
struct hmain_parse_data *parse_data;
|
|
||||||
http_response_list *pending_responses;
|
|
||||||
data_buffer_list *pending_write;
|
|
||||||
struct hmain_connection *next;
|
|
||||||
pthread_mutex_t *mutex;
|
|
||||||
|
|
||||||
} hmain_connection;
|
|
||||||
|
|
||||||
typedef struct hmain_status {
|
|
||||||
config_server *config;
|
|
||||||
int sfd;
|
|
||||||
int epollfd;
|
|
||||||
thread_pool *pools[4];
|
|
||||||
bool shutdown;
|
|
||||||
hmain_connection *connections;
|
|
||||||
} hmain_status;
|
|
||||||
|
|
||||||
typedef struct hmain_parse_data {
|
|
||||||
http_request *current_request;
|
|
||||||
bool request_complete;
|
|
||||||
http_parser *parser;
|
|
||||||
http_header *parser_current_header;
|
|
||||||
int parser_header_state;
|
|
||||||
} hmain_parse_data;
|
|
||||||
|
|
||||||
hmain_connection* hmain_connection_new(int fd, hmain_status *status);
|
|
||||||
void hmain_connection_close(hmain_connection *conn);
|
|
||||||
void hmain_connection_delete(hmain_connection *conn);
|
|
||||||
|
|
||||||
void hmain_setup(hmain_status *status);
|
|
||||||
void hmain_teardown(hmain_status *status);
|
|
||||||
void hmain_loop(hmain_status *status);
|
|
||||||
|
|
||||||
void* thloop_read(void * arg);
|
|
||||||
void* thloop_write(void * arg);
|
|
||||||
void* thloop_disk_read(void * arg);
|
|
||||||
void* thloop_worker(void * arg);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* MAIN_LOOP_H */
|
|
||||||
|
|
||||||
13
src/main.c
13
src/main.c
@@ -17,10 +17,23 @@
|
|||||||
#include <signal.h>
|
#include <signal.h>
|
||||||
#include <bits/stdio2.h>
|
#include <bits/stdio2.h>
|
||||||
|
|
||||||
|
#include "util.h"
|
||||||
#include "main.h"
|
#include "main.h"
|
||||||
|
#include "server.h"
|
||||||
|
#include "server-state.h"
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
|
||||||
|
//Load the config
|
||||||
|
config_server *config = config_server_new();
|
||||||
|
if (config_read_ini(DEFAULT_CONFIG_FILE, config) < 0) {
|
||||||
|
fatal("Could not read config");
|
||||||
|
}
|
||||||
|
|
||||||
|
server_state *state = server_status_new(config);
|
||||||
|
|
||||||
|
//Run the server
|
||||||
|
server_start(state);
|
||||||
|
|
||||||
return (EXIT_SUCCESS);
|
return (EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
19
src/queue.c
19
src/queue.c
@@ -156,6 +156,25 @@ void queue_unblock(queue *q, uint64_t itemid) {
|
|||||||
}
|
}
|
||||||
QUEUE_UNLOCK(q);
|
QUEUE_UNLOCK(q);
|
||||||
}
|
}
|
||||||
|
size_t queue_unblock_byptr(queue *q, void* ptr) {
|
||||||
|
assert(q!=NULL);
|
||||||
|
|
||||||
|
size_t count = 0;
|
||||||
|
uint64_t itemid = 0;
|
||||||
|
queue_item *elem;
|
||||||
|
QUEUE_LOCK(q);
|
||||||
|
LL_FOREACH(q->list, elem) {
|
||||||
|
if (elem->data == ptr) {
|
||||||
|
elem->blocked = false;
|
||||||
|
count++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count > 0) {
|
||||||
|
pthread_cond_signal(q->cond);
|
||||||
|
}
|
||||||
|
QUEUE_UNLOCK(q);
|
||||||
|
return count;
|
||||||
|
}
|
||||||
void queue_clear(queue *q) {
|
void queue_clear(queue *q) {
|
||||||
assert(q!=NULL);
|
assert(q!=NULL);
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
|
|||||||
@@ -93,6 +93,7 @@ extern "C" {
|
|||||||
int queue_remove_byptr(queue *q, void* ptr);
|
int queue_remove_byptr(queue *q, void* ptr);
|
||||||
queue_item* queue_fetchone(queue *q, bool blocking);
|
queue_item* queue_fetchone(queue *q, bool blocking);
|
||||||
void queue_unblock(queue *q, uint64_t itemid);
|
void queue_unblock(queue *q, uint64_t itemid);
|
||||||
|
size_t queue_unblock_byptr(queue *q, void* ptr);
|
||||||
void queue_clear(queue *q);
|
void queue_clear(queue *q);
|
||||||
void queue_ping(queue *q);
|
void queue_ping(queue *q);
|
||||||
size_t queue_count(queue *q);
|
size_t queue_count(queue *q);
|
||||||
|
|||||||
@@ -7,20 +7,23 @@
|
|||||||
|
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
#include "socket.h"
|
#include "socket.h"
|
||||||
|
#include "server-state.h"
|
||||||
#include "server-connection.h"
|
#include "server-connection.h"
|
||||||
|
|
||||||
server_connection* server_connection_new(socket_info *skt) {
|
server_connection* server_connection_new(socket_info *skt, server_state *state) {
|
||||||
static uint64_t nextid = 1;
|
static uint64_t nextid = 1;
|
||||||
assert(skt!=NULL);
|
assert(skt!=NULL);
|
||||||
|
assert(state!=NULL);
|
||||||
|
|
||||||
server_connection *conn = calloc(1, sizeof(server_connection));
|
server_connection *conn = calloc(1, sizeof(server_connection));
|
||||||
conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
|
conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
|
||||||
conn->last_activity = time(NULL);
|
conn->last_activity = time(NULL);
|
||||||
|
conn->server = state;
|
||||||
conn->parse_state = NULL;
|
conn->parse_state = NULL;
|
||||||
conn->pending_responses = http_response_list_new();
|
conn->pending_responses = http_response_list_new();
|
||||||
conn->pending_writes = data_buffer_list_new();
|
conn->pending_writes = data_buffer_list_new();
|
||||||
conn->skt = skt;
|
conn->skt = skt;
|
||||||
conn->write_qid = 0;
|
pthread_mutex_init(&conn->mutex, NULL);
|
||||||
return conn;
|
return conn;
|
||||||
}
|
}
|
||||||
void server_connection_delete(server_connection *conn) {
|
void server_connection_delete(server_connection *conn) {
|
||||||
@@ -29,5 +32,6 @@ void server_connection_delete(server_connection *conn) {
|
|||||||
http_response_list_delete(conn->pending_responses);
|
http_response_list_delete(conn->pending_responses);
|
||||||
data_buffer_list_delete(conn->pending_writes);
|
data_buffer_list_delete(conn->pending_writes);
|
||||||
skt_delete(conn->skt);
|
skt_delete(conn->skt);
|
||||||
|
pthread_mutex_destroy(&conn->mutex);
|
||||||
free(conn);
|
free(conn);
|
||||||
}
|
}
|
||||||
@@ -14,12 +14,17 @@ extern "C" {
|
|||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
|
#include <pthread.h>
|
||||||
|
|
||||||
#include "data-buffer.h"
|
#include "data-buffer.h"
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
#include "http-reader.h"
|
#include "http-reader.h"
|
||||||
|
#include "server-state.h"
|
||||||
#include "socket.h"
|
#include "socket.h"
|
||||||
|
|
||||||
|
#define CONN_LOCK(c) pthread_mutex_lock(&c->mutex)
|
||||||
|
#define CONN_UNLOCK(c) pthread_mutex_unlock(&c->mutex)
|
||||||
|
|
||||||
typedef struct server_parse_status {
|
typedef struct server_parse_status {
|
||||||
http_request *current_request;
|
http_request *current_request;
|
||||||
bool request_complete;
|
bool request_complete;
|
||||||
@@ -32,14 +37,15 @@ extern "C" {
|
|||||||
uint64_t id;
|
uint64_t id;
|
||||||
struct socket_info *skt;
|
struct socket_info *skt;
|
||||||
time_t last_activity;
|
time_t last_activity;
|
||||||
|
server_state *server;
|
||||||
http_response_list *pending_responses;
|
http_response_list *pending_responses;
|
||||||
data_buffer_list *pending_writes;
|
data_buffer_list *pending_writes;
|
||||||
uint64_t write_qid;//item id in write queue
|
|
||||||
server_parse_status *parse_state;
|
server_parse_status *parse_state;
|
||||||
struct server_connection *next;
|
struct server_connection *next;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
} server_connection;
|
} server_connection;
|
||||||
|
|
||||||
server_connection* server_connection_new(socket_info *skt);
|
server_connection* server_connection_new(socket_info *skt, server_state *state);
|
||||||
void server_connection_delete(server_connection *conn);
|
void server_connection_delete(server_connection *conn);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|||||||
@@ -1,15 +1,57 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
#include "data-buffer.h"
|
||||||
|
#include "queue.h"
|
||||||
|
|
||||||
#include "server-connection.h"
|
#include "server-connection.h"
|
||||||
#include "server-state.h"
|
#include "server-state.h"
|
||||||
#include "server-loop.h"
|
#include "server-loop.h"
|
||||||
|
|
||||||
void* server_loop_read(void* arg) {
|
void* server_loop_read(void* arg) {
|
||||||
|
thread *th = (thread*)arg;
|
||||||
|
|
||||||
|
while(th->stop == false) {
|
||||||
|
queue_item *item = queue_fetchone(th->pool->queue, true);
|
||||||
|
if (item == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
server_connection *conn = (server_connection*)item->data;
|
||||||
|
CONN_LOCK(conn);
|
||||||
|
ssize_t count=0, totalread=0;
|
||||||
|
char readbuf[16*1024];
|
||||||
|
while(true) {
|
||||||
|
count = read(conn->skt->fd, readbuf, sizeof(readbuf)/sizeof(readbuf[0]));
|
||||||
|
if (count < 0) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
item->blocked = true;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
if (errno != EBADF) {
|
||||||
|
char address[INET_ADDRSTRLEN];
|
||||||
|
skt_clientaddr(conn->skt, address, INET_ADDRSTRLEN);
|
||||||
|
warning(true, "[#%lu %s] read error", conn->id, address);
|
||||||
|
}
|
||||||
|
conn->skt->error = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (count == 0) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
totalread += count;
|
||||||
|
data_buffer_list_append(conn->pending_writes, readbuf, count);
|
||||||
|
}
|
||||||
|
if (totalread > 0) {
|
||||||
|
queue_add(conn->server->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", conn));
|
||||||
|
}
|
||||||
|
CONN_UNLOCK(conn);
|
||||||
|
queue_return_item(th->pool->queue, item, item->blocked == false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -1,15 +1,62 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
|
#include <unistd.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
|
#include "ut/utlist.h"
|
||||||
|
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "log.h"
|
#include "log.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
|
#include "data-buffer.h"
|
||||||
|
|
||||||
#include "server-connection.h"
|
#include "server-connection.h"
|
||||||
#include "server-state.h"
|
#include "server-state.h"
|
||||||
#include "server-loop.h"
|
#include "server-loop.h"
|
||||||
|
|
||||||
void* server_loop_write(void* arg) {
|
void* server_loop_write(void* arg) {
|
||||||
|
thread *th = (thread*)arg;
|
||||||
|
|
||||||
|
while(th->stop == false) {
|
||||||
|
queue_item *item = queue_fetchone(th->pool->queue, true);
|
||||||
|
if (item == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
server_connection *conn = (server_connection*)item->data;
|
||||||
|
CONN_LOCK(conn);
|
||||||
|
|
||||||
|
size_t count = 0;
|
||||||
|
while(conn->pending_writes->first != NULL) {
|
||||||
|
BUFFER_LIST_RD_LOCK(conn->pending_writes);
|
||||||
|
data_buffer *next = conn->pending_writes->first;
|
||||||
|
|
||||||
|
count = write(conn->skt->fd, next->buffer+next->rOffset, next->wOffset - next->rOffset);
|
||||||
|
if (count < 0) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
item->blocked = true;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
if (errno != EBADF) {
|
||||||
|
char address[INET_ADDRSTRLEN];
|
||||||
|
skt_clientaddr(conn->skt, address, INET_ADDRSTRLEN);
|
||||||
|
warning(true, "[#%lu %s] write error", conn->id, address);
|
||||||
|
}
|
||||||
|
conn->skt->error = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
next->rOffset += count;
|
||||||
|
|
||||||
|
if (next->rOffset >= next->wOffset) {
|
||||||
|
LL_DELETE(conn->pending_writes->first, next);
|
||||||
|
data_buffer_free(next);
|
||||||
|
}
|
||||||
|
|
||||||
|
BUFFER_LIST_RD_UNLOCK(conn->pending_writes);
|
||||||
|
}
|
||||||
|
|
||||||
|
CONN_UNLOCK(conn);
|
||||||
|
queue_return_item(th->pool->queue, item, item->blocked == false);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
@@ -15,29 +15,41 @@
|
|||||||
#include "ut/utlist.h"
|
#include "ut/utlist.h"
|
||||||
#include "server-socket.h"
|
#include "server-socket.h"
|
||||||
|
|
||||||
void server_loop(server_status *state) {
|
void server_loop(server_state *state) {
|
||||||
assert(state!=NULL);
|
assert(state!=NULL);
|
||||||
assert(state->started==true);
|
assert(state->started==true);
|
||||||
|
|
||||||
struct epoll_event *current_event;
|
struct epoll_event *current_event;
|
||||||
struct epoll_event *events = alloca(sizeof(struct epoll_event)*EP_MAXEVENTS);
|
struct epoll_event *events = alloca(sizeof(struct epoll_event)*EP_MAXEVENTS);
|
||||||
struct server_connection *conn;
|
server_connection *conn;
|
||||||
|
|
||||||
while(state->shutdown_requested == false) {
|
while(state->shutdown_requested == false) {
|
||||||
int event_count = epoll_wait(state->epollfd, events, EP_MAXEVENTS, EP_WAIT_TIME);
|
int event_count = epoll_wait(state->epollfd, events, EP_MAXEVENTS, EP_WAIT_TIME);
|
||||||
for(int i=0; i<event_count; i++) {
|
for(int i=0; i<event_count; i++) {
|
||||||
current_event = &events[i];
|
current_event = &events[i];
|
||||||
conn = current_event->data.ptr != NULL ? EP_CONN(current_event) : NULL;
|
|
||||||
|
|
||||||
if (EP_EVENT_IS (current_event, EPOLLERR | EPOLLHUP) ||
|
conn = NULL;
|
||||||
EP_EVENT_ISNOT(current_event, EPOLLIN | EPOLLOUT | EPOLLRDHUP)) {
|
server_connection *conn_elem;
|
||||||
if (conn == NULL) {
|
LL_FOREACH(state->clients, conn_elem) {
|
||||||
fatal("Unexpected error on unknown socket");
|
if (conn_elem == current_event->data.ptr) {
|
||||||
} else if (conn->skt->fd == state->sfd) {
|
conn = EP_CONN(current_event);
|
||||||
LOG(LWARNING, "Error/Unexpected event on server socket");
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (EP_EVENT_IS(current_event, EPOLLERR) ||
|
||||||
|
EP_EVENT_IS(current_event, EPOLLERR) ||
|
||||||
|
(
|
||||||
|
EP_EVENT_ISNOT(current_event, EPOLLIN) &&
|
||||||
|
EP_EVENT_ISNOT(current_event, EPOLLOUT) &&
|
||||||
|
EP_EVENT_ISNOT(current_event, EPOLLRDHUP)
|
||||||
|
)
|
||||||
|
) {
|
||||||
|
if (current_event->data.fd == state->sfd) {
|
||||||
|
LOG(LERROR, "Error/Unexpected event on server socket");
|
||||||
state->shutdown_requested = true;
|
state->shutdown_requested = true;
|
||||||
break;
|
break;
|
||||||
} else {
|
} else {
|
||||||
|
assert(conn!=NULL);
|
||||||
char* conn_addr = calloc(INET_ADDRSTRLEN, sizeof(char));
|
char* conn_addr = calloc(INET_ADDRSTRLEN, sizeof(char));
|
||||||
skt_clientaddr(conn->skt, conn_addr, INET_ADDRSTRLEN);
|
skt_clientaddr(conn->skt, conn_addr, INET_ADDRSTRLEN);
|
||||||
LOG(LWARNING, "Error on socket %lu [%s]", conn->id, conn_addr);
|
LOG(LWARNING, "Error on socket %lu [%s]", conn->id, conn_addr);
|
||||||
@@ -48,39 +60,48 @@ void server_loop(server_status *state) {
|
|||||||
assert(conn != NULL);
|
assert(conn != NULL);
|
||||||
skt_close(conn->skt);
|
skt_close(conn->skt);
|
||||||
} else if (EP_EVENT_IS(current_event, EPOLLIN)) {
|
} else if (EP_EVENT_IS(current_event, EPOLLIN)) {
|
||||||
assert(conn != NULL);
|
if (current_event->data.fd == state->sfd) {
|
||||||
if (conn->skt->fd == state->sfd) {
|
|
||||||
//New connections
|
//New connections
|
||||||
while (true) {
|
while (true) {
|
||||||
socket_info *skt = server_socket_accept(state->sfd, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
socket_info *skt = server_socket_accept(state->sfd, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||||
if (skt == NULL) {
|
if (skt == NULL) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
server_connection *client = server_connection_new(skt);
|
server_connection *client = server_connection_new(skt, state);
|
||||||
|
|
||||||
struct epoll_event new_event;
|
struct epoll_event new_event;
|
||||||
new_event.data.ptr = client;
|
new_event.data.ptr = client;
|
||||||
new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
|
new_event.events = EPOLLIN | EPOLLOUT | EPOLLRDHUP | EPOLLET;
|
||||||
if (epoll_ctl(state->epollfd, EPOLL_CTL_ADD, client->skt->fd, &new_event) < 0) {
|
if (epoll_ctl(state->epollfd, EPOLL_CTL_ADD, skt->fd, &new_event) < 0) {
|
||||||
warning(true, "Failed to add connection to epoll");
|
warning(true, "Failed to add connection to epoll");
|
||||||
skt_close(client->skt);
|
skt_close(client->skt);
|
||||||
server_connection_delete(client);
|
server_connection_delete(client);
|
||||||
|
} else {
|
||||||
|
LL_APPEND(state->clients, client);
|
||||||
|
queue_add(state->pools[POOL_READ]->queue, queue_item_new2("READ", (void*)client));
|
||||||
}
|
}
|
||||||
LL_APPEND(state->clients, client);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
assert(conn != NULL);
|
assert(conn != NULL);
|
||||||
//Data available on connection
|
//Data available on connection
|
||||||
queue_add(state->pools[POOL_READ]->queue, (void*)conn);
|
if (queue_unblock_byptr(state->pools[POOL_READ]->queue, (void*)conn) == 0) {
|
||||||
|
queue_add(state->pools[POOL_READ]->queue, queue_item_new2("READ", (void*)conn));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
} else if (EP_EVENT_IS(current_event, EPOLLOUT)) {
|
} else if (EP_EVENT_IS(current_event, EPOLLOUT)) {
|
||||||
//Connection is now available to write to
|
//Connection is now available to write to
|
||||||
queue_add(state->pools[POOL_WRITE]->queue, (void*)conn);
|
if (queue_unblock_byptr(state->pools[POOL_READ]->queue, (void*)conn) == 0) {
|
||||||
|
queue_add(state->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", (void*)conn));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//Clean up closed connections
|
//Clean up closed connections
|
||||||
server_connection *elem, *tmp;
|
server_connection *elem, *tmp;
|
||||||
LL_FOREACH(state->clients, elem) {
|
LL_FOREACH_SAFE(state->clients, elem, tmp) {
|
||||||
|
if (elem->skt->error == true) {
|
||||||
|
skt_close(elem->skt);
|
||||||
|
}
|
||||||
if (elem->skt->closed != true) {
|
if (elem->skt->closed != true) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@@ -90,6 +111,10 @@ void server_loop(server_status *state) {
|
|||||||
for(int i=0; i < THREADPOOL_NUM; i++) {
|
for(int i=0; i < THREADPOOL_NUM; i++) {
|
||||||
queue_pending_waitptr(state->pools[i]->queue, elem);
|
queue_pending_waitptr(state->pools[i]->queue, elem);
|
||||||
}
|
}
|
||||||
|
LL_DELETE(state->clients, elem);
|
||||||
|
char address[INET_ADDRSTRLEN] = {0};
|
||||||
|
skt_clientaddr(elem->skt, address, INET_ADDRSTRLEN);
|
||||||
|
LOG(LINFO, "[#%lu %s] Connection Closed", elem->id, address);
|
||||||
server_connection_delete(elem);
|
server_connection_delete(elem);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -18,10 +18,10 @@ extern "C" {
|
|||||||
#define EP_WAIT_TIME 2000
|
#define EP_WAIT_TIME 2000
|
||||||
|
|
||||||
#define EP_CONN(event) (server_connection*)event->data.ptr
|
#define EP_CONN(event) (server_connection*)event->data.ptr
|
||||||
#define EP_EVENT_IS(event, type) ((event->events & type) != 0)
|
#define EP_EVENT_IS(event, type) ((event->events & type) == type)
|
||||||
#define EP_EVENT_ISNOT(event, type) (!EP_EVENT_IS(event, type))
|
#define EP_EVENT_ISNOT(event, type) (!EP_EVENT_IS(event, type))
|
||||||
|
|
||||||
void server_loop(server_status *state);
|
void server_loop(server_state *state);
|
||||||
void* server_loop_read(void* arg);
|
void* server_loop_read(void* arg);
|
||||||
void* server_loop_write(void* arg);
|
void* server_loop_write(void* arg);
|
||||||
void* server_loop_worker(void* arg);
|
void* server_loop_worker(void* arg);
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ void server_socket_listen_epoll(int fd, uint16_t port, int *out_epfd) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
//Register socket with epoll
|
//Register socket with epoll
|
||||||
struct epoll_event svr_event;
|
struct epoll_event svr_event = {0};
|
||||||
svr_event.data.fd = fd;
|
svr_event.data.fd = fd;
|
||||||
svr_event.events = EPOLLIN | EPOLLET;
|
svr_event.events = EPOLLIN | EPOLLET;
|
||||||
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &svr_event) < 0) {
|
if (epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &svr_event) < 0) {
|
||||||
@@ -78,7 +78,10 @@ socket_info* server_socket_accept(int fd, int flags) {
|
|||||||
int clientfd=0;
|
int clientfd=0;
|
||||||
socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in);
|
socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in);
|
||||||
clientfd = accept4(fd, (struct sockaddr*)clientaddr, &clientaddr_len, flags);
|
clientfd = accept4(fd, (struct sockaddr*)clientaddr, &clientaddr_len, flags);
|
||||||
if (clientfd < 0 && errno != EAGAIN && errno != EWOULDBLOCK) {
|
if (clientfd < 0) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
warning(true, "error accepting connection");
|
warning(true, "error accepting connection");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -10,14 +10,14 @@
|
|||||||
#include "server-state.h"
|
#include "server-state.h"
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
#include "thread-pool.h"
|
#include "thread-pool.h"
|
||||||
#include "main-loop.h"
|
#include "server-connection.h"
|
||||||
|
|
||||||
server_status* server_status_new(config_server *config) {
|
server_state* server_status_new(config_server *config) {
|
||||||
assert(config!=NULL);
|
assert(config!=NULL);
|
||||||
assert(config->host_count>0);
|
assert(config->host_count>0);
|
||||||
assert(config->listen_port>0);
|
assert(config->listen_port>0);
|
||||||
|
|
||||||
server_status *status = calloc(1, sizeof(server_status));
|
server_state *status = calloc(1, sizeof(server_state));
|
||||||
status->started = false;
|
status->started = false;
|
||||||
status->stopped = true;
|
status->stopped = true;
|
||||||
status->shutdown_requested = false;
|
status->shutdown_requested = false;
|
||||||
@@ -27,7 +27,7 @@ server_status* server_status_new(config_server *config) {
|
|||||||
|
|
||||||
return status;
|
return status;
|
||||||
}
|
}
|
||||||
void server_status_delete(server_status *status) {
|
void server_status_delete(server_state *status) {
|
||||||
assert(status!=NULL);
|
assert(status!=NULL);
|
||||||
assert(status->stopped==true);
|
assert(status->stopped==true);
|
||||||
assert(status->pools[0]==NULL);
|
assert(status->pools[0]==NULL);
|
||||||
@@ -41,7 +41,7 @@ void server_status_delete(server_status *status) {
|
|||||||
free(status);
|
free(status);
|
||||||
}
|
}
|
||||||
|
|
||||||
void server_start_pools(server_status *status, thread_func pool_functions[]) {
|
void server_start_pools(server_state *status, thread_func pool_functions[]) {
|
||||||
assert(status!=NULL);
|
assert(status!=NULL);
|
||||||
assert(status->pools[0]==NULL);
|
assert(status->pools[0]==NULL);
|
||||||
|
|
||||||
@@ -67,7 +67,7 @@ void server_start_pools(server_status *status, thread_func pool_functions[]) {
|
|||||||
status->pools[POOL_WORKER] = pool;
|
status->pools[POOL_WORKER] = pool;
|
||||||
thread_pool_start(pool);
|
thread_pool_start(pool);
|
||||||
}
|
}
|
||||||
void server_stop_pools(server_status *status) {
|
void server_stop_pools(server_state *status) {
|
||||||
assert(status!=NULL);
|
assert(status!=NULL);
|
||||||
assert(status->pools[0]!=NULL);
|
assert(status->pools[0]!=NULL);
|
||||||
|
|
||||||
|
|||||||
@@ -15,27 +15,26 @@ extern "C" {
|
|||||||
#include "http.h"
|
#include "http.h"
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
#include "thread-pool.h"
|
#include "thread-pool.h"
|
||||||
#include "server-connection.h"
|
|
||||||
|
|
||||||
typedef enum server_pool {
|
typedef enum server_pool {
|
||||||
POOL_READ, POOL_WRITE, POOL_WORKER, /*{*/THREADPOOL_NUM/*}must be last*/
|
POOL_READ, POOL_WRITE, POOL_WORKER, /*{*/THREADPOOL_NUM/*}must be last*/
|
||||||
} server_pool;
|
} server_pool;
|
||||||
|
|
||||||
typedef struct server_status {
|
typedef struct server_state {
|
||||||
config_server *config;
|
config_server *config;
|
||||||
bool started, stopped;
|
bool started, stopped;
|
||||||
bool shutdown_requested;
|
bool shutdown_requested;
|
||||||
int sfd;
|
int sfd;
|
||||||
int epollfd;
|
int epollfd;
|
||||||
thread_pool *pools[THREADPOOL_NUM];
|
thread_pool *pools[THREADPOOL_NUM];
|
||||||
server_connection *clients;
|
struct server_connection *clients;
|
||||||
} server_status;
|
} server_state;
|
||||||
|
|
||||||
server_status* server_status_new(config_server *config);
|
server_state* server_status_new(config_server *config);
|
||||||
void server_status_delete(server_status *status);
|
void server_status_delete(server_state *status);
|
||||||
|
|
||||||
void server_start_pools(server_status *status, thread_func pool_functions[]);
|
void server_start_pools(server_state *status, thread_func pool_functions[]);
|
||||||
void server_stop_pools(server_status *status);
|
void server_stop_pools(server_state *status);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|||||||
13
src/server.c
13
src/server.c
@@ -21,7 +21,7 @@
|
|||||||
#include "server-loop.h"
|
#include "server-loop.h"
|
||||||
#include "server.h"
|
#include "server.h"
|
||||||
|
|
||||||
void server_start(server_status *status, const char* config_file) {
|
void server_start(server_state *status) {
|
||||||
assert(status!=NULL);
|
assert(status!=NULL);
|
||||||
assert(status->stopped==true);
|
assert(status->stopped==true);
|
||||||
|
|
||||||
@@ -34,16 +34,9 @@ void server_start(server_status *status, const char* config_file) {
|
|||||||
//Load mime types
|
//Load mime types
|
||||||
mime_init(NULL);
|
mime_init(NULL);
|
||||||
|
|
||||||
//Load the config
|
|
||||||
config_server *config = config_server_new();
|
|
||||||
if (config_read_ini(config_file, config) < 0) {
|
|
||||||
fatal("Could not read config");
|
|
||||||
}
|
|
||||||
status->config = config;
|
|
||||||
|
|
||||||
//Open the server socket
|
//Open the server socket
|
||||||
status->sfd = server_socket_create();
|
status->sfd = server_socket_create();
|
||||||
server_socket_listen_epoll(status->sfd, config->listen_port, &status->epollfd);
|
server_socket_listen_epoll(status->sfd, status->config->listen_port, &status->epollfd);
|
||||||
|
|
||||||
//Start thread pools
|
//Start thread pools
|
||||||
thread_func pool_functions[] = {
|
thread_func pool_functions[] = {
|
||||||
@@ -62,7 +55,7 @@ void server_start(server_status *status, const char* config_file) {
|
|||||||
//Cleanup after the loop exits
|
//Cleanup after the loop exits
|
||||||
server_teardown(status);
|
server_teardown(status);
|
||||||
}
|
}
|
||||||
void server_teardown(server_status *status) {
|
void server_teardown(server_state *status) {
|
||||||
assert(status!=NULL);
|
assert(status!=NULL);
|
||||||
assert(status->stopped==true);
|
assert(status->stopped==true);
|
||||||
|
|
||||||
|
|||||||
@@ -16,8 +16,8 @@ extern "C" {
|
|||||||
|
|
||||||
#include "server-state.h"
|
#include "server-state.h"
|
||||||
|
|
||||||
void server_start(server_status *status, const char* config_file);
|
void server_start(server_state *status);
|
||||||
void server_teardown(server_status *status);
|
void server_teardown(server_state *status);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -113,7 +113,9 @@ int skt_data_buffer(socket_info *skt, data_buffer_list *list) {
|
|||||||
void skt_close(socket_info* skt) {
|
void skt_close(socket_info* skt) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
if (close(skt->fd) < 0) {
|
if (close(skt->fd) < 0) {
|
||||||
warning(true, "error closing socket");
|
if (errno != EBADF) {
|
||||||
|
warning(true, "error closing socket");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
skt->closed = true;
|
skt->closed = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -58,7 +58,7 @@ void warning(bool use_errno, char* fmt, ...) {
|
|||||||
strcat(errstr, msg);
|
strcat(errstr, msg);
|
||||||
strcat(errstr, ": ");
|
strcat(errstr, ": ");
|
||||||
strcat(errstr, errnostr);
|
strcat(errstr, errnostr);
|
||||||
LOG(LFATAL, errstr);
|
LOG(LWARNING, errstr);
|
||||||
free(errnostr_buf);
|
free(errnostr_buf);
|
||||||
free(errstr);
|
free(errstr);
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
Reference in New Issue
Block a user