working on server
This commit is contained in:
@@ -49,6 +49,8 @@ OBJECTFILES= \
|
|||||||
${OBJECTDIR}/src/queue.o \
|
${OBJECTDIR}/src/queue.o \
|
||||||
${OBJECTDIR}/src/server-connection.o \
|
${OBJECTDIR}/src/server-connection.o \
|
||||||
${OBJECTDIR}/src/server-socket.o \
|
${OBJECTDIR}/src/server-socket.o \
|
||||||
|
${OBJECTDIR}/src/server-state.o \
|
||||||
|
${OBJECTDIR}/src/server.o \
|
||||||
${OBJECTDIR}/src/socket.o \
|
${OBJECTDIR}/src/socket.o \
|
||||||
${OBJECTDIR}/src/thread-pool.o \
|
${OBJECTDIR}/src/thread-pool.o \
|
||||||
${OBJECTDIR}/src/util.o
|
${OBJECTDIR}/src/util.o
|
||||||
@@ -148,6 +150,16 @@ ${OBJECTDIR}/src/server-socket.o: nbproject/Makefile-${CND_CONF}.mk src/server-s
|
|||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-socket.o src/server-socket.c
|
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-socket.o src/server-socket.c
|
||||||
|
|
||||||
|
${OBJECTDIR}/src/server-state.o: nbproject/Makefile-${CND_CONF}.mk src/server-state.c
|
||||||
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
|
${RM} "$@.d"
|
||||||
|
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-state.o src/server-state.c
|
||||||
|
|
||||||
|
${OBJECTDIR}/src/server.o: nbproject/Makefile-${CND_CONF}.mk src/server.c
|
||||||
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
|
${RM} "$@.d"
|
||||||
|
$(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server.o src/server.c
|
||||||
|
|
||||||
${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c
|
${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c
|
||||||
${MKDIR} -p ${OBJECTDIR}/src
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
|
|||||||
@@ -49,6 +49,8 @@ OBJECTFILES= \
|
|||||||
${OBJECTDIR}/src/queue.o \
|
${OBJECTDIR}/src/queue.o \
|
||||||
${OBJECTDIR}/src/server-connection.o \
|
${OBJECTDIR}/src/server-connection.o \
|
||||||
${OBJECTDIR}/src/server-socket.o \
|
${OBJECTDIR}/src/server-socket.o \
|
||||||
|
${OBJECTDIR}/src/server-state.o \
|
||||||
|
${OBJECTDIR}/src/server.o \
|
||||||
${OBJECTDIR}/src/socket.o \
|
${OBJECTDIR}/src/socket.o \
|
||||||
${OBJECTDIR}/src/thread-pool.o \
|
${OBJECTDIR}/src/thread-pool.o \
|
||||||
${OBJECTDIR}/src/util.o
|
${OBJECTDIR}/src/util.o
|
||||||
@@ -148,6 +150,16 @@ ${OBJECTDIR}/src/server-socket.o: nbproject/Makefile-${CND_CONF}.mk src/server-s
|
|||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-socket.o src/server-socket.c
|
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-socket.o src/server-socket.c
|
||||||
|
|
||||||
|
${OBJECTDIR}/src/server-state.o: nbproject/Makefile-${CND_CONF}.mk src/server-state.c
|
||||||
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
|
${RM} "$@.d"
|
||||||
|
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server-state.o src/server-state.c
|
||||||
|
|
||||||
|
${OBJECTDIR}/src/server.o: nbproject/Makefile-${CND_CONF}.mk src/server.c
|
||||||
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
|
${RM} "$@.d"
|
||||||
|
$(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/server.o src/server.c
|
||||||
|
|
||||||
${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c
|
${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c
|
||||||
${MKDIR} -p ${OBJECTDIR}/src
|
${MKDIR} -p ${OBJECTDIR}/src
|
||||||
${RM} "$@.d"
|
${RM} "$@.d"
|
||||||
|
|||||||
@@ -19,6 +19,8 @@
|
|||||||
<itemPath>src/queue.h</itemPath>
|
<itemPath>src/queue.h</itemPath>
|
||||||
<itemPath>src/server-connection.h</itemPath>
|
<itemPath>src/server-connection.h</itemPath>
|
||||||
<itemPath>src/server-socket.h</itemPath>
|
<itemPath>src/server-socket.h</itemPath>
|
||||||
|
<itemPath>src/server-state.h</itemPath>
|
||||||
|
<itemPath>src/server.h</itemPath>
|
||||||
<itemPath>src/socket.h</itemPath>
|
<itemPath>src/socket.h</itemPath>
|
||||||
<itemPath>src/thread-pool.h</itemPath>
|
<itemPath>src/thread-pool.h</itemPath>
|
||||||
<itemPath>src/util.h</itemPath>
|
<itemPath>src/util.h</itemPath>
|
||||||
@@ -45,6 +47,8 @@
|
|||||||
<itemPath>src/queue.c</itemPath>
|
<itemPath>src/queue.c</itemPath>
|
||||||
<itemPath>src/server-connection.c</itemPath>
|
<itemPath>src/server-connection.c</itemPath>
|
||||||
<itemPath>src/server-socket.c</itemPath>
|
<itemPath>src/server-socket.c</itemPath>
|
||||||
|
<itemPath>src/server-state.c</itemPath>
|
||||||
|
<itemPath>src/server.c</itemPath>
|
||||||
<itemPath>src/socket.c</itemPath>
|
<itemPath>src/socket.c</itemPath>
|
||||||
<itemPath>src/thread-pool.c</itemPath>
|
<itemPath>src/thread-pool.c</itemPath>
|
||||||
<itemPath>src/util.c</itemPath>
|
<itemPath>src/util.c</itemPath>
|
||||||
@@ -161,6 +165,14 @@
|
|||||||
</item>
|
</item>
|
||||||
<item path="src/server-socket.h" ex="false" tool="3" flavor2="0">
|
<item path="src/server-socket.h" ex="false" tool="3" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
|
<item path="src/server-state.c" ex="false" tool="0" flavor2="0">
|
||||||
|
</item>
|
||||||
|
<item path="src/server-state.h" ex="false" tool="3" flavor2="0">
|
||||||
|
</item>
|
||||||
|
<item path="src/server.c" ex="false" tool="0" flavor2="0">
|
||||||
|
</item>
|
||||||
|
<item path="src/server.h" ex="false" tool="3" flavor2="0">
|
||||||
|
</item>
|
||||||
<item path="src/socket.c" ex="false" tool="0" flavor2="0">
|
<item path="src/socket.c" ex="false" tool="0" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/socket.h" ex="false" tool="3" flavor2="0">
|
<item path="src/socket.h" ex="false" tool="3" flavor2="0">
|
||||||
@@ -271,6 +283,14 @@
|
|||||||
</item>
|
</item>
|
||||||
<item path="src/server-socket.h" ex="false" tool="3" flavor2="0">
|
<item path="src/server-socket.h" ex="false" tool="3" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
|
<item path="src/server-state.c" ex="false" tool="0" flavor2="0">
|
||||||
|
</item>
|
||||||
|
<item path="src/server-state.h" ex="false" tool="3" flavor2="0">
|
||||||
|
</item>
|
||||||
|
<item path="src/server.c" ex="false" tool="0" flavor2="0">
|
||||||
|
</item>
|
||||||
|
<item path="src/server.h" ex="false" tool="3" flavor2="0">
|
||||||
|
</item>
|
||||||
<item path="src/socket.c" ex="false" tool="0" flavor2="0">
|
<item path="src/socket.c" ex="false" tool="0" flavor2="0">
|
||||||
</item>
|
</item>
|
||||||
<item path="src/socket.h" ex="false" tool="3" flavor2="0">
|
<item path="src/socket.h" ex="false" tool="3" flavor2="0">
|
||||||
|
|||||||
@@ -12,7 +12,6 @@
|
|||||||
#include "log.h"
|
#include "log.h"
|
||||||
#include "socket.h"
|
#include "socket.h"
|
||||||
#include "thread-pool.h"
|
#include "thread-pool.h"
|
||||||
#include "log.h"
|
|
||||||
#include "http_parser.h"
|
#include "http_parser.h"
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
#include "http-reader.h"
|
#include "http-reader.h"
|
||||||
|
|||||||
82
src/queue.c
82
src/queue.c
@@ -2,10 +2,12 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
#include "ut/utlist.h"
|
#include "ut/utlist.h"
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
|
#include "log.h"
|
||||||
|
|
||||||
queue_item* queue_item_new() {
|
queue_item* queue_item_new() {
|
||||||
static uint64_t nextid = 0;
|
static uint64_t nextid = 0;
|
||||||
@@ -18,7 +20,7 @@ queue_item* queue_item_new() {
|
|||||||
queue_item* queue_item_new2(char* tag, void* data) {
|
queue_item* queue_item_new2(char* tag, void* data) {
|
||||||
queue_item *item = queue_item_new();
|
queue_item *item = queue_item_new();
|
||||||
item->tag[0] = '\0';
|
item->tag[0] = '\0';
|
||||||
strncat(item->tag, tag, (sizeof(item->tag)/sizeof(char))-1);
|
strncat(item->tag, tag, QUEUE_ITEM_TAG_LEN-1);
|
||||||
item->data = data;
|
item->data = data;
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
@@ -38,22 +40,32 @@ queue* queue_new() {
|
|||||||
if (pthread_cond_init(q->cond, NULL) != 0) {
|
if (pthread_cond_init(q->cond, NULL) != 0) {
|
||||||
fatal("Failed to init queue cond");
|
fatal("Failed to init queue cond");
|
||||||
}
|
}
|
||||||
|
q->processing_cond = calloc(1, sizeof(pthread_cond_t));
|
||||||
|
if (pthread_cond_init(q->processing_cond, NULL) != 0) {
|
||||||
|
fatal("Failed to init queue processing cond");
|
||||||
|
}
|
||||||
|
|
||||||
return q;
|
return q;
|
||||||
}
|
}
|
||||||
void queue_delete(queue *q) {
|
void queue_delete(queue *q) {
|
||||||
queue_item *elem, *tmp;
|
assert(q!=NULL);
|
||||||
DL_FOREACH_SAFE(q->list, elem, tmp) {
|
size_t pending_count = 0;
|
||||||
queue_item_delete(elem);
|
QUEUE_PENDING_COUNT(q, pending_count);
|
||||||
DL_DELETE(q->list, elem);
|
if (pending_count > 0) {
|
||||||
|
warning(false, "queue has pending items at deletion");
|
||||||
}
|
}
|
||||||
|
queue_clear(q);
|
||||||
pthread_mutex_destroy(q->mutex);
|
pthread_mutex_destroy(q->mutex);
|
||||||
free(q->mutex);
|
free(q->mutex);
|
||||||
pthread_cond_destroy(q->cond);
|
pthread_cond_destroy(q->cond);
|
||||||
free(q->cond);
|
free(q->cond);
|
||||||
|
pthread_cond_destroy(q->processing_cond);
|
||||||
|
free(q->processing_cond);
|
||||||
free(q);
|
free(q);
|
||||||
}
|
}
|
||||||
int queue_add(queue *q, queue_item *item) {
|
int queue_add(queue *q, queue_item *item) {
|
||||||
|
assert(q!=NULL);
|
||||||
|
assert(item!=NULL);
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
DL_APPEND(q->list, item);
|
DL_APPEND(q->list, item);
|
||||||
q->count++;
|
q->count++;
|
||||||
@@ -64,6 +76,8 @@ int queue_add(queue *q, queue_item *item) {
|
|||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int queue_remove(queue *q, queue_item *item) {
|
int queue_remove(queue *q, queue_item *item) {
|
||||||
|
assert(q!=NULL);
|
||||||
|
assert(item!=NULL);
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
int result = 0;
|
int result = 0;
|
||||||
queue_item *elem, *tmp;
|
queue_item *elem, *tmp;
|
||||||
@@ -80,6 +94,7 @@ int queue_remove(queue *q, queue_item *item) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
int queue_remove_byptr(queue *q, void* ptr) {
|
int queue_remove_byptr(queue *q, void* ptr) {
|
||||||
|
assert(q!=NULL);
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
|
|
||||||
int result = 0;
|
int result = 0;
|
||||||
@@ -96,6 +111,7 @@ int queue_remove_byptr(queue *q, void* ptr) {
|
|||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
queue_item* queue_fetchone(queue *q, bool blocking) {
|
queue_item* queue_fetchone(queue *q, bool blocking) {
|
||||||
|
assert(q!=NULL);
|
||||||
queue_item *item = NULL;
|
queue_item *item = NULL;
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
if (q->count == 0 && blocking == true) {
|
if (q->count == 0 && blocking == true) {
|
||||||
@@ -112,12 +128,17 @@ queue_item* queue_fetchone(queue *q, bool blocking) {
|
|||||||
if (item != NULL) {
|
if (item != NULL) {
|
||||||
DL_DELETE(q->list, item);
|
DL_DELETE(q->list, item);
|
||||||
q->count--;
|
q->count--;
|
||||||
|
//Add to processing list
|
||||||
|
queue_pending_item *token = calloc(1, sizeof(queue_pending_item));
|
||||||
|
token->qid = item->id;
|
||||||
|
LL_APPEND(q->processing, token);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
QUEUE_UNLOCK(q);
|
QUEUE_UNLOCK(q);
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
void queue_unblock(queue *q, uint64_t itemid) {
|
void queue_unblock(queue *q, uint64_t itemid) {
|
||||||
|
assert(q!=NULL);
|
||||||
queue_item *item=NULL, *elem=NULL;
|
queue_item *item=NULL, *elem=NULL;
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
LL_FOREACH(q->list, elem) {
|
LL_FOREACH(q->list, elem) {
|
||||||
@@ -135,24 +156,65 @@ void queue_unblock(queue *q, uint64_t itemid) {
|
|||||||
QUEUE_UNLOCK(q);
|
QUEUE_UNLOCK(q);
|
||||||
}
|
}
|
||||||
void queue_clear(queue *q) {
|
void queue_clear(queue *q) {
|
||||||
|
assert(q!=NULL);
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
queue_item *elem, *tmp;
|
{
|
||||||
DL_FOREACH_SAFE(q->list, elem, tmp) {
|
queue_item *elem, *tmp;
|
||||||
queue_item_delete(elem);
|
DL_FOREACH_SAFE(q->list, elem, tmp) {
|
||||||
DL_DELETE(q->list, elem);
|
queue_item_delete(elem);
|
||||||
|
DL_DELETE(q->list, elem);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
{
|
||||||
|
queue_pending_item *elem, *tmp;
|
||||||
|
LL_FOREACH_SAFE(q->processing, elem, tmp) {
|
||||||
|
LL_DELETE(q->processing, elem);
|
||||||
|
free(elem);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
pthread_cond_broadcast(q->cond);
|
|
||||||
QUEUE_UNLOCK(q);
|
QUEUE_UNLOCK(q);
|
||||||
}
|
}
|
||||||
void queue_ping(queue *q) {
|
void queue_ping(queue *q) {
|
||||||
|
assert(q!=NULL);
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
pthread_cond_broadcast(q->cond);
|
pthread_cond_broadcast(q->cond);
|
||||||
QUEUE_UNLOCK(q);
|
QUEUE_UNLOCK(q);
|
||||||
}
|
}
|
||||||
size_t queue_count(queue *q) {
|
size_t queue_count(queue *q) {
|
||||||
|
assert(q!=NULL);
|
||||||
size_t count;
|
size_t count;
|
||||||
QUEUE_LOCK(q);
|
QUEUE_LOCK(q);
|
||||||
count = q->count;
|
count = q->count;
|
||||||
QUEUE_UNLOCK(q);
|
QUEUE_UNLOCK(q);
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void queue_return_item(queue *q, queue_item *item, bool finished) {
|
||||||
|
assert(q!=NULL);
|
||||||
|
assert(item!=NULL);
|
||||||
|
|
||||||
|
QUEUE_LOCK(q);
|
||||||
|
QUEUE_PENDING_REMOVE(q, item->id);
|
||||||
|
QUEUE_UNLOCK(q);
|
||||||
|
|
||||||
|
if (finished == true) {
|
||||||
|
queue_item_delete(item);
|
||||||
|
} else {
|
||||||
|
queue_add(q, item);
|
||||||
|
}
|
||||||
|
QUEUE_LOCK(q);
|
||||||
|
pthread_cond_broadcast(q->processing_cond);
|
||||||
|
QUEUE_UNLOCK(q);
|
||||||
|
}
|
||||||
|
void queue_waitfor_pending(queue *q, uint64_t itemid) {
|
||||||
|
QUEUE_LOCK(q);
|
||||||
|
|
||||||
|
bool found;
|
||||||
|
QUEUE_HAS_PENDING(q, itemid, found);
|
||||||
|
while(found == true) {
|
||||||
|
pthread_cond_wait(q->processing_cond, q->mutex);
|
||||||
|
|
||||||
|
QUEUE_HAS_PENDING(q, itemid, found);
|
||||||
|
}
|
||||||
|
QUEUE_UNLOCK(q);
|
||||||
|
}
|
||||||
76
src/queue.h
76
src/queue.h
@@ -16,25 +16,7 @@ extern "C" {
|
|||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
#include <stdbool.h>
|
#include <stdbool.h>
|
||||||
|
|
||||||
typedef struct queue_item {
|
#define QUEUE_ITEM_TAG_LEN 16
|
||||||
uint64_t id;
|
|
||||||
struct queue_item *prev;
|
|
||||||
struct queue_item *next;
|
|
||||||
char tag[16];
|
|
||||||
bool blocked;
|
|
||||||
void *data;
|
|
||||||
} queue_item;
|
|
||||||
|
|
||||||
queue_item* queue_item_new();
|
|
||||||
queue_item* queue_item_new2(char* tag, void* data);
|
|
||||||
void queue_item_delete(queue_item *item);
|
|
||||||
|
|
||||||
typedef struct queue {
|
|
||||||
queue_item *list;
|
|
||||||
uint64_t count;
|
|
||||||
pthread_mutex_t *mutex;
|
|
||||||
pthread_cond_t *cond;
|
|
||||||
} queue;
|
|
||||||
|
|
||||||
#define QUEUE_LOCK(q) \
|
#define QUEUE_LOCK(q) \
|
||||||
do { \
|
do { \
|
||||||
@@ -49,6 +31,60 @@ extern "C" {
|
|||||||
} \
|
} \
|
||||||
} while(0)
|
} while(0)
|
||||||
|
|
||||||
|
#define QUEUE_PENDING_REMOVE(q, itemid) \
|
||||||
|
do { \
|
||||||
|
queue_pending_item *elem, *tmp; \
|
||||||
|
LL_FOREACH_SAFE(q->processing, elem, tmp) { \
|
||||||
|
if (elem->qid == itemid) { \
|
||||||
|
LL_DELETE(q->processing, elem); \
|
||||||
|
free(elem); \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
} while(0)
|
||||||
|
#define QUEUE_HAS_PENDING(q, itemid, found) \
|
||||||
|
do { \
|
||||||
|
found = false; \
|
||||||
|
queue_pending_item *elem; \
|
||||||
|
LL_FOREACH(q->processing, elem) { \
|
||||||
|
if (elem->qid == itemid) { \
|
||||||
|
found = true; \
|
||||||
|
break; \
|
||||||
|
} \
|
||||||
|
} \
|
||||||
|
} while(0)
|
||||||
|
#define QUEUE_PENDING_COUNT(q, counter) \
|
||||||
|
do { \
|
||||||
|
queue_pending_item *elem; \
|
||||||
|
LL_COUNT(q->processing, elem, counter); \
|
||||||
|
} while(0)
|
||||||
|
|
||||||
|
typedef struct queue_item {
|
||||||
|
uint64_t id;
|
||||||
|
struct queue_item *prev;
|
||||||
|
struct queue_item *next;
|
||||||
|
char tag[QUEUE_ITEM_TAG_LEN];
|
||||||
|
bool blocked;
|
||||||
|
void *data;
|
||||||
|
} queue_item;
|
||||||
|
|
||||||
|
queue_item* queue_item_new();
|
||||||
|
queue_item* queue_item_new2(char* tag, void* data);
|
||||||
|
void queue_item_delete(queue_item *item);
|
||||||
|
|
||||||
|
typedef struct queue_pending_item {
|
||||||
|
uint64_t qid;
|
||||||
|
struct queue_pending_item *next;
|
||||||
|
} queue_pending_item;
|
||||||
|
|
||||||
|
typedef struct queue {
|
||||||
|
queue_item *list;
|
||||||
|
uint64_t count;
|
||||||
|
queue_pending_item *processing;
|
||||||
|
pthread_mutex_t *mutex;
|
||||||
|
pthread_cond_t *cond;
|
||||||
|
pthread_cond_t *processing_cond;
|
||||||
|
} queue;
|
||||||
|
|
||||||
queue* queue_new();
|
queue* queue_new();
|
||||||
void queue_delete(queue *q);
|
void queue_delete(queue *q);
|
||||||
int queue_add(queue *q, queue_item *item);
|
int queue_add(queue *q, queue_item *item);
|
||||||
@@ -59,6 +95,8 @@ extern "C" {
|
|||||||
void queue_clear(queue *q);
|
void queue_clear(queue *q);
|
||||||
void queue_ping(queue *q);
|
void queue_ping(queue *q);
|
||||||
size_t queue_count(queue *q);
|
size_t queue_count(queue *q);
|
||||||
|
void queue_return_item(queue *q, queue_item *item, bool finished);
|
||||||
|
void queue_pending_wait(queue *q, uint64_t itemid);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -6,12 +6,12 @@
|
|||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
|
||||||
#include "http.h"
|
#include "http.h"
|
||||||
|
#include "socket.h"
|
||||||
#include "server-connection.h"
|
#include "server-connection.h"
|
||||||
|
|
||||||
|
server_connection* server_connection_new(socket_info *skt) {
|
||||||
server_connection* server_connection_new(skt_info *skt) {
|
|
||||||
static uint64_t nextid = 1;
|
static uint64_t nextid = 1;
|
||||||
assert(skt!=null);
|
assert(skt!=NULL);
|
||||||
|
|
||||||
server_connection *conn = calloc(1, sizeof(server_connection));
|
server_connection *conn = calloc(1, sizeof(server_connection));
|
||||||
conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
|
conn->id = __atomic_fetch_add(&nextid, 1, __ATOMIC_SEQ_CST);
|
||||||
|
|||||||
@@ -20,18 +20,26 @@ extern "C" {
|
|||||||
#include "http-reader.h"
|
#include "http-reader.h"
|
||||||
#include "socket.h"
|
#include "socket.h"
|
||||||
|
|
||||||
|
typedef struct server_parse_status {
|
||||||
|
http_request *current_request;
|
||||||
|
bool request_complete;
|
||||||
|
http_parser *parser;
|
||||||
|
http_header *parser_current_header;
|
||||||
|
int parser_header_state;
|
||||||
|
} server_parse_status;
|
||||||
|
|
||||||
typedef struct server_connection {
|
typedef struct server_connection {
|
||||||
uint64_t id;
|
uint64_t id;
|
||||||
struct skt_info *skt;
|
struct socket_info *skt;
|
||||||
time_t last_activity;
|
time_t last_activity;
|
||||||
http_response_list *pending_responses;
|
http_response_list *pending_responses;
|
||||||
data_buffer_list *pending_writes;
|
data_buffer_list *pending_writes;
|
||||||
uint64_t write_qid;//item id in write queue
|
uint64_t write_qid;//item id in write queue
|
||||||
request_parse_state *parse_state;
|
server_parse_status *parse_state;
|
||||||
struct server_connection *next;
|
struct server_connection *next;
|
||||||
} server_connection;
|
} server_connection;
|
||||||
|
|
||||||
server_connection* server_connection_new(skt_info *skt);
|
server_connection* server_connection_new(socket_info *skt);
|
||||||
void server_connection_delete(server_connection *conn);
|
void server_connection_delete(server_connection *conn);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|||||||
@@ -63,7 +63,7 @@ bool server_socket_canaccept(int fd) {
|
|||||||
free(pfd);
|
free(pfd);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
skt_info* server_socket_accept(int fd, int flags) {
|
socket_info* server_socket_accept(int fd, int flags) {
|
||||||
struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in));
|
struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in));
|
||||||
|
|
||||||
int clientfd=0;
|
int clientfd=0;
|
||||||
@@ -74,7 +74,7 @@ skt_info* server_socket_accept(int fd, int flags) {
|
|||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
skt_info* skt = skt_new(clientfd);
|
socket_info* skt = skt_new(clientfd);
|
||||||
skt->clientaddr = clientaddr;
|
skt->clientaddr = clientaddr;
|
||||||
skt->fd = clientfd;
|
skt->fd = clientfd;
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ extern "C" {
|
|||||||
void server_socket_listen(int fd, uint16_t port);
|
void server_socket_listen(int fd, uint16_t port);
|
||||||
void server_socket_release(int fd);
|
void server_socket_release(int fd);
|
||||||
bool server_socket_canaccept(int fd);
|
bool server_socket_canaccept(int fd);
|
||||||
skt_info* server_socket_accept(int fd, int flags);
|
socket_info* server_socket_accept(int fd, int flags);
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|||||||
79
src/server-state.c
Normal file
79
src/server-state.c
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
#include <stdlib.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <assert.h>
|
||||||
|
|
||||||
|
#include "ut/utlist.h"
|
||||||
|
|
||||||
|
#include "util.h"
|
||||||
|
#include "config.h"
|
||||||
|
#include "server-state.h"
|
||||||
|
#include "queue.h"
|
||||||
|
#include "thread-pool.h"
|
||||||
|
|
||||||
|
server_status* server_status_new(config_server *config) {
|
||||||
|
assert(config!=NULL);
|
||||||
|
assert(config->host_count>0);
|
||||||
|
assert(config->listen_port>0);
|
||||||
|
|
||||||
|
server_status *status = calloc(1, sizeof(server_status));
|
||||||
|
status->started = false;
|
||||||
|
status->stopped = true;
|
||||||
|
status->shutdown_requested = false;
|
||||||
|
status->config = config;
|
||||||
|
status->epollfd = status->sfd = 0;
|
||||||
|
status->clients = NULL;
|
||||||
|
|
||||||
|
return status;
|
||||||
|
}
|
||||||
|
void server_status_delete(server_status *status) {
|
||||||
|
assert(status!=NULL);
|
||||||
|
assert(status->stopped==true);
|
||||||
|
assert(status->pools[0]==NULL);
|
||||||
|
|
||||||
|
server_connection *elem, *tmp;
|
||||||
|
LL_FOREACH_SAFE(status->clients, elem, tmp) {
|
||||||
|
LL_DELETE(status->clients, elem);
|
||||||
|
server_connection_delete(elem);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(status);
|
||||||
|
}
|
||||||
|
|
||||||
|
void server_start_pools(server_status *status) {
|
||||||
|
assert(status!=NULL);
|
||||||
|
assert(status->pools[0]==NULL);
|
||||||
|
|
||||||
|
//Create thread pools/queues
|
||||||
|
thread_pool *pool = thread_pool_new("read", queue_new());
|
||||||
|
pool->min_threads = 1;
|
||||||
|
pool->max_threads = 2;
|
||||||
|
//pool->func = thloop_read;
|
||||||
|
status->pools[POOL_READ] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
|
||||||
|
pool = thread_pool_new("write", queue_new());
|
||||||
|
pool->min_threads = 1;
|
||||||
|
pool->max_threads = 2;
|
||||||
|
//pool->func = thloop_write;
|
||||||
|
status->pools[POOL_WRITE] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
|
||||||
|
pool = thread_pool_new("worker", queue_new());
|
||||||
|
pool->min_threads = 1;
|
||||||
|
pool->max_threads = 5;
|
||||||
|
//pool->func = thloop_worker;
|
||||||
|
status->pools[POOL_WORKER] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
}
|
||||||
|
void server_stop_pools(server_status *status) {
|
||||||
|
assert(status!=NULL);
|
||||||
|
assert(status->pools[0]!=NULL);
|
||||||
|
|
||||||
|
for(int i=0; i<THREADPOOL_NUM; i++) {
|
||||||
|
thread_pool_stop(status->pools[i]);
|
||||||
|
queue_delete(status->pools[i]->queue);
|
||||||
|
thread_pool_stop(status->pools[i]);
|
||||||
|
}
|
||||||
|
memset(status->pools, 0, sizeof(status->pools));
|
||||||
|
}
|
||||||
45
src/server-state.h
Normal file
45
src/server-state.h
Normal file
@@ -0,0 +1,45 @@
|
|||||||
|
/*
|
||||||
|
* File: server-state.h
|
||||||
|
* Author: sam
|
||||||
|
*
|
||||||
|
* Created on 17 August 2014, 22:18
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef SERVER_STATE_H
|
||||||
|
#define SERVER_STATE_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#include "http.h"
|
||||||
|
#include "config.h"
|
||||||
|
#include "thread-pool.h"
|
||||||
|
#include "server-connection.h"
|
||||||
|
|
||||||
|
typedef enum server_pool {
|
||||||
|
POOL_READ, POOL_WRITE, POOL_WORKER, /*{*/THREADPOOL_NUM/*}must be last*/
|
||||||
|
} server_pool;
|
||||||
|
|
||||||
|
typedef struct server_status {
|
||||||
|
config_server *config;
|
||||||
|
bool started, stopped;
|
||||||
|
bool shutdown_requested;
|
||||||
|
int sfd;
|
||||||
|
int epollfd;
|
||||||
|
thread_pool *pools[THREADPOOL_NUM];
|
||||||
|
server_connection *clients;
|
||||||
|
} server_status;
|
||||||
|
|
||||||
|
server_status* server_status_new(config_server *config);
|
||||||
|
void server_status_delete(server_status *status);
|
||||||
|
|
||||||
|
void server_start_pools(server_status *status);
|
||||||
|
void server_stop_pools(server_status *status);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* SERVER_STATE_H */
|
||||||
|
|
||||||
17
src/server.c
Normal file
17
src/server.c
Normal file
@@ -0,0 +1,17 @@
|
|||||||
|
#include <stdlib.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
|
||||||
|
#include "log.h"
|
||||||
|
#include "util.h"
|
||||||
|
#include "mime.h"
|
||||||
|
#include "queue.h"
|
||||||
|
#include "thread-pool.h"
|
||||||
|
#include "http_parser.h"
|
||||||
|
#include "http.h"
|
||||||
|
#include "http-reader.h"
|
||||||
|
#include "config.h"
|
||||||
|
|
||||||
|
#include "server-socket.h"
|
||||||
|
#include "server-connection.h"
|
||||||
|
#include "server.h"
|
||||||
22
src/server.h
Normal file
22
src/server.h
Normal file
@@ -0,0 +1,22 @@
|
|||||||
|
/*
|
||||||
|
* File: server.h
|
||||||
|
* Author: sam
|
||||||
|
*
|
||||||
|
* Created on 17 August 2014, 22:01
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef SERVER_H
|
||||||
|
#define SERVER_H
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* SERVER_H */
|
||||||
|
|
||||||
19
src/socket.c
19
src/socket.c
@@ -23,9 +23,9 @@ u_int64_t skt_nextid() {
|
|||||||
static u_int64_t id = 1;
|
static u_int64_t id = 1;
|
||||||
return __atomic_fetch_add(&id, 1, __ATOMIC_SEQ_CST);
|
return __atomic_fetch_add(&id, 1, __ATOMIC_SEQ_CST);
|
||||||
}
|
}
|
||||||
skt_info* skt_new(int fd) {
|
socket_info* skt_new(int fd) {
|
||||||
assert(fd>0);
|
assert(fd>0);
|
||||||
skt_info* skt = calloc(1, sizeof(skt_info));
|
socket_info* skt = calloc(1, sizeof(socket_info));
|
||||||
skt->id = skt_nextid();
|
skt->id = skt_nextid();
|
||||||
skt->fd = fd;
|
skt->fd = fd;
|
||||||
skt->time_opened = time(NULL);
|
skt->time_opened = time(NULL);
|
||||||
@@ -33,13 +33,14 @@ skt_info* skt_new(int fd) {
|
|||||||
skt->clientaddr = NULL;
|
skt->clientaddr = NULL;
|
||||||
return skt;
|
return skt;
|
||||||
}
|
}
|
||||||
void skt_delete(skt_info* skt) {
|
|
||||||
|
void skt_delete(socket_info* skt) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
free(skt->clientaddr);
|
free(skt->clientaddr);
|
||||||
free(skt);
|
free(skt);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool skt_canread(skt_info* skt) {
|
bool skt_canread(socket_info* skt) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
int len = 0;
|
int len = 0;
|
||||||
if (ioctl(skt->fd, FIONREAD, &len) < 0) {
|
if (ioctl(skt->fd, FIONREAD, &len) < 0) {
|
||||||
@@ -48,7 +49,7 @@ bool skt_canread(skt_info* skt) {
|
|||||||
}
|
}
|
||||||
return len > 0;
|
return len > 0;
|
||||||
}
|
}
|
||||||
size_t skt_read(skt_info* skt, char* buffer, size_t bufferlen) {
|
size_t skt_read(socket_info* skt, char* buffer, size_t bufferlen) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
int result = read(skt->fd, buffer, bufferlen);
|
int result = read(skt->fd, buffer, bufferlen);
|
||||||
if (result < 0) {
|
if (result < 0) {
|
||||||
@@ -60,7 +61,7 @@ size_t skt_read(skt_info* skt, char* buffer, size_t bufferlen) {
|
|||||||
}
|
}
|
||||||
return result; //Number of bytes read
|
return result; //Number of bytes read
|
||||||
}
|
}
|
||||||
size_t skt_write(skt_info* skt, char* data, size_t len) {
|
size_t skt_write(socket_info* skt, char* data, size_t len) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
assert(data != NULL);
|
assert(data != NULL);
|
||||||
|
|
||||||
@@ -74,7 +75,7 @@ size_t skt_write(skt_info* skt, char* data, size_t len) {
|
|||||||
}
|
}
|
||||||
return result; //bytes written
|
return result; //bytes written
|
||||||
}
|
}
|
||||||
int skt_write_data_buffer(skt_info *skt, data_buffer_list *list) {
|
int skt_data_buffer(socket_info *skt, data_buffer_list *list) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
assert(list != NULL);
|
assert(list != NULL);
|
||||||
BUFFER_LIST_RD_LOCK(list);
|
BUFFER_LIST_RD_LOCK(list);
|
||||||
@@ -108,13 +109,13 @@ int skt_write_data_buffer(skt_info *skt, data_buffer_list *list) {
|
|||||||
|
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
void skt_close(skt_info* skt) {
|
void skt_close(socket_info* skt) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
if (close(skt->fd) < 0) {
|
if (close(skt->fd) < 0) {
|
||||||
warning(true, "error closing socket");
|
warning(true, "error closing socket");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const char* skt_clientaddr(skt_info *skt) {
|
const char* skt_clientaddr(socket_info *skt) {
|
||||||
assert(skt != NULL);
|
assert(skt != NULL);
|
||||||
char *tmp = calloc(INET_ADDRSTRLEN, sizeof(char));
|
char *tmp = calloc(INET_ADDRSTRLEN, sizeof(char));
|
||||||
const char* address = inet_ntop(AF_INET, &skt->clientaddr->sin_addr, tmp, INET_ADDRSTRLEN);
|
const char* address = inet_ntop(AF_INET, &skt->clientaddr->sin_addr, tmp, INET_ADDRSTRLEN);
|
||||||
|
|||||||
20
src/socket.h
20
src/socket.h
@@ -20,24 +20,24 @@ extern "C" {
|
|||||||
|
|
||||||
#include "data-buffer.h"
|
#include "data-buffer.h"
|
||||||
|
|
||||||
typedef struct skt_info {
|
typedef struct socket_info {
|
||||||
u_int64_t id;
|
u_int64_t id;
|
||||||
int fd;
|
int fd;
|
||||||
struct sockaddr_in* clientaddr;
|
struct sockaddr_in* clientaddr;
|
||||||
time_t time_opened;
|
time_t time_opened;
|
||||||
bool error;
|
bool error;
|
||||||
} skt_info;
|
} socket_info;
|
||||||
|
|
||||||
u_int64_t skt_nextid();
|
u_int64_t skt_nextid();
|
||||||
skt_info* skt_new(int fd);
|
socket_info* skt_new(int fd);
|
||||||
void skt_delete(skt_info *skt);
|
void skt_delete(socket_info *skt);
|
||||||
|
|
||||||
bool skt_canread(skt_info *skt);
|
bool skt_canread(socket_info *skt);
|
||||||
size_t skt_read(skt_info *skt, char* buffer, size_t bufferlen);
|
size_t skt_read(socket_info *skt, char* buffer, size_t bufferlen);
|
||||||
size_t skt_write(skt_info* skt, char* data, size_t len);
|
size_t skt_write(socket_info* skt, char* data, size_t len);
|
||||||
int skt_write_data_buffer(skt_info *skt, data_buffer_list *list);
|
int skt_write_data_buffer(socket_info *skt, data_buffer_list *list);
|
||||||
void skt_close(skt_info *skt);
|
void skt_close(socket_info *skt);
|
||||||
const char* skt_clientaddr(skt_info *skt);
|
const char* skt_clientaddr(socket_info *skt);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user