Done main server loop

This commit is contained in:
2014-08-19 16:46:04 +01:00
parent d6cb6aa3ea
commit 7b8e84913a
8 changed files with 129 additions and 12 deletions

View File

@@ -16,7 +16,7 @@ extern "C" {
#include <time.h> #include <time.h>
#include <pthread.h> #include <pthread.h>
#define EPOLL_MAXEVENTS 128 #define EPOLL_MAXEVENTS 128
#define CONN_LOCK(c) pthread_mutex_lock(c->mutex) #define CONN_LOCK(c) pthread_mutex_lock(c->mutex)
#define CONN_UNLOCK(c) pthread_mutex_unlock(c->mutex) #define CONN_UNLOCK(c) pthread_mutex_unlock(c->mutex)

View File

@@ -10,7 +10,7 @@
#include "log.h" #include "log.h"
queue_item* queue_item_new() { queue_item* queue_item_new() {
static uint64_t nextid = 0; static uint64_t nextid = 1;
queue_item *item = calloc(1, sizeof(queue_item)); queue_item *item = calloc(1, sizeof(queue_item));
item->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST); item->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
@@ -131,6 +131,7 @@ queue_item* queue_fetchone(queue *q, bool blocking) {
//Add to processing list //Add to processing list
queue_pending_item *token = calloc(1, sizeof(queue_pending_item)); queue_pending_item *token = calloc(1, sizeof(queue_pending_item));
token->qid = item->id; token->qid = item->id;
token->data = item->data;
LL_APPEND(q->processing, token); LL_APPEND(q->processing, token);
} }
} }
@@ -206,7 +207,8 @@ void queue_return_item(queue *q, queue_item *item, bool finished) {
pthread_cond_broadcast(q->processing_cond); pthread_cond_broadcast(q->processing_cond);
QUEUE_UNLOCK(q); QUEUE_UNLOCK(q);
} }
void queue_waitfor_pending(queue *q, uint64_t itemid) { void queue_pending_wait(queue *q, uint64_t itemid) {
assert(q!=NULL);
QUEUE_LOCK(q); QUEUE_LOCK(q);
bool found; bool found;
@@ -218,3 +220,22 @@ void queue_waitfor_pending(queue *q, uint64_t itemid) {
} }
QUEUE_UNLOCK(q); QUEUE_UNLOCK(q);
} }
void queue_pending_waitptr(queue *q, void* ptr) {
uint64_t itemid;
queue_pending_item *elem;
do {
QUEUE_LOCK(q);
itemid = 0;
LL_FOREACH(q->processing, elem) {
if (elem->data == ptr) {
itemid = elem->qid;
break;
}
}
QUEUE_UNLOCK(q);
if (itemid > 0) {
queue_pending_wait(q, itemid);
}
} while(itemid > 0);
}

View File

@@ -73,6 +73,7 @@ extern "C" {
typedef struct queue_pending_item { typedef struct queue_pending_item {
uint64_t qid; uint64_t qid;
void * data;
struct queue_pending_item *next; struct queue_pending_item *next;
} queue_pending_item; } queue_pending_item;
@@ -97,6 +98,7 @@ extern "C" {
size_t queue_count(queue *q); size_t queue_count(queue *q);
void queue_return_item(queue *q, queue_item *item, bool finished); void queue_return_item(queue *q, queue_item *item, bool finished);
void queue_pending_wait(queue *q, uint64_t itemid); void queue_pending_wait(queue *q, uint64_t itemid);
void queue_pending_waitptr(queue *q, void* ptr);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@@ -1,6 +1,9 @@
#include <stdlib.h> #include <stdlib.h>
#include <stdio.h> #include <stdio.h>
#include <stdbool.h> #include <stdbool.h>
#include <sys/epoll.h>
#include <assert.h>
#include <errno.h>
#include "util.h" #include "util.h"
#include "log.h" #include "log.h"
@@ -8,7 +11,89 @@
#include "server-connection.h" #include "server-connection.h"
#include "server-state.h" #include "server-state.h"
#include "server-loop.h"
#include "ut/utlist.h"
#include "server-socket.h"
void server_loop(server_status *state) { void server_loop(server_status *state) {
assert(state!=NULL);
assert(state->started==true);
struct epoll_event *current_event;
struct epoll_event *events = alloca(sizeof(struct epoll_event)*EP_MAXEVENTS);
struct server_connection *conn;
while(state->shutdown_requested == false) {
int event_count = epoll_wait(state->epollfd, events, EP_MAXEVENTS, EP_WAIT_TIME);
for(int i=0; i<event_count; i++) {
current_event = &events[i];
conn = current_event->data.ptr != NULL ? EP_CONN(current_event) : NULL;
if (EP_EVENT_IS (current_event, EPOLLERR | EPOLLHUP) ||
EP_EVENT_ISNOT(current_event, EPOLLIN | EPOLLOUT | EPOLLRDHUP)) {
if (conn == NULL) {
fatal("Unexpected error on unknown socket");
} else if (conn->skt->fd == state->sfd) {
LOG(LWARNING, "Error/Unexpected event on server socket");
state->shutdown_requested = true;
break;
} else {
char* conn_addr = calloc(INET_ADDRSTRLEN, sizeof(char));
skt_clientaddr(conn->skt, conn_addr, INET_ADDRSTRLEN);
LOG(LWARNING, "Error on socket %lu [%s]", conn->id, conn_addr);
free(conn_addr);
skt_close(conn->skt);
}
} else if (EP_EVENT_IS(current_event, EPOLLRDHUP)) {
assert(conn != NULL);
skt_close(conn->skt);
} else if (EP_EVENT_IS(current_event, EPOLLIN)) {
assert(conn != NULL);
if (conn->skt->fd == state->sfd) {
//New connections
while (true) {
socket_info *skt = server_socket_accept(state->sfd, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (skt == NULL) {
break;
}
server_connection *client = server_connection_new(skt);
struct epoll_event new_event;
new_event.data.ptr = client;
new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
if (epoll_ctl(state->epollfd, EPOLL_CTL_ADD, client->skt->fd, &new_event) < 0) {
warning(true, "Failed to add connection to epoll");
skt_close(client->skt);
server_connection_delete(client);
}
LL_APPEND(state->clients, client);
}
} else {
assert(conn != NULL);
//Data available on connection
queue_add(state->pools[POOL_READ]->queue, (void*)conn);
}
} else if (EP_EVENT_IS(current_event, EPOLLOUT)) {
//Connection is now available to write to
queue_add(state->pools[POOL_WRITE]->queue, (void*)conn);
}
}
//Clean up closed connections
server_connection *elem, *tmp;
LL_FOREACH(state->clients, elem) {
if (elem->skt->closed != true) {
continue;
}
for(int i=0; i < THREADPOOL_NUM; i++) {
queue_remove_byptr(state->pools[i]->queue, elem);
}
for(int i=0; i < THREADPOOL_NUM; i++) {
queue_pending_waitptr(state->pools[i]->queue, elem);
}
server_connection_delete(elem);
}
}
state->shutdown_requested = false;
state->started = false;
state->stopped = true;
} }

View File

@@ -14,9 +14,12 @@ extern "C" {
#include "server-state.h" #include "server-state.h"
#define EP_MAXEVENTS 128
#define EP_WAIT_TIME 2000
#define EP_CONN(event) (server_connection*)event->data.ptr #define EP_CONN(event) (server_connection*)event->data.ptr
#define EP_EVENT_IS(event, type) ((event->events & type) == type) #define EP_EVENT_IS(event, type) ((event->events & type) != 0)
#define EP_EVENT_ISNOT(event, type) (!EVENT_IS(event, type)) #define EP_EVENT_ISNOT(event, type) (!EP_EVENT_IS(event, type))
void server_loop(server_status *state); void server_loop(server_status *state);
void* server_loop_read(void* arg); void* server_loop_read(void* arg);

View File

@@ -87,8 +87,10 @@ socket_info* server_socket_accept(int fd, int flags) {
skt->clientaddr = clientaddr; skt->clientaddr = clientaddr;
skt->fd = clientfd; skt->fd = clientfd;
info("[#%lu %s] New Connection", skt->id, skt_clientaddr(skt)); char * address = calloc(INET_ADDRSTRLEN, sizeof(char));
skt_clientaddr(skt, address, INET_ADDRSTRLEN);
info("[#%lu %s] New Connection", skt->id, address);
free(address);
return skt; return skt;
} }

View File

@@ -28,6 +28,7 @@ socket_info* skt_new(int fd) {
socket_info* skt = calloc(1, sizeof(socket_info)); socket_info* skt = calloc(1, sizeof(socket_info));
skt->id = skt_nextid(); skt->id = skt_nextid();
skt->fd = fd; skt->fd = fd;
skt->closed = false;
skt->time_opened = time(NULL); skt->time_opened = time(NULL);
skt->error = false; skt->error = false;
skt->clientaddr = NULL; skt->clientaddr = NULL;
@@ -114,14 +115,16 @@ void skt_close(socket_info* skt) {
if (close(skt->fd) < 0) { if (close(skt->fd) < 0) {
warning(true, "error closing socket"); warning(true, "error closing socket");
} }
skt->closed = true;
} }
const char* skt_clientaddr(socket_info *skt) { char* skt_clientaddr(socket_info *skt, char* address, size_t address_len) {
assert(skt != NULL); assert(skt != NULL);
char *tmp = calloc(INET_ADDRSTRLEN, sizeof(char)); assert(address != NULL);
const char* address = inet_ntop(AF_INET, &skt->clientaddr->sin_addr, tmp, INET_ADDRSTRLEN); assert(address_len >= INET_ADDRSTRLEN);
inet_ntop(AF_INET, &skt->clientaddr->sin_addr, address, address_len);
if (address == NULL) { if (address == NULL) {
warning(true, "error fetching client address"); warning(true, "error fetching client address");
free(tmp); free(address);
} }
return address; return address;
} }

View File

@@ -23,6 +23,7 @@ extern "C" {
typedef struct socket_info { typedef struct socket_info {
u_int64_t id; u_int64_t id;
int fd; int fd;
bool closed;
struct sockaddr_in* clientaddr; struct sockaddr_in* clientaddr;
time_t time_opened; time_t time_opened;
bool error; bool error;
@@ -37,7 +38,7 @@ extern "C" {
size_t skt_write(socket_info* skt, char* data, size_t len); size_t skt_write(socket_info* skt, char* data, size_t len);
int skt_write_data_buffer(socket_info *skt, data_buffer_list *list); int skt_write_data_buffer(socket_info *skt, data_buffer_list *list);
void skt_close(socket_info *skt); void skt_close(socket_info *skt);
const char* skt_clientaddr(socket_info *skt); char* skt_clientaddr(socket_info *skt, char* address, size_t address_len);
#ifdef __cplusplus #ifdef __cplusplus
} }