diff --git a/nbproject/Makefile-Debug.mk b/nbproject/Makefile-Debug.mk index 94cfe92..547a80b 100644 --- a/nbproject/Makefile-Debug.mk +++ b/nbproject/Makefile-Debug.mk @@ -45,6 +45,7 @@ OBJECTFILES= \ ${OBJECTDIR}/src/mime.o \ ${OBJECTDIR}/src/queue.o \ ${OBJECTDIR}/src/socket.o \ + ${OBJECTDIR}/src/thread-pool.o \ ${OBJECTDIR}/src/util.o @@ -122,6 +123,11 @@ ${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c ${RM} "$@.d" $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/socket.o src/socket.c +${OBJECTDIR}/src/thread-pool.o: nbproject/Makefile-${CND_CONF}.mk src/thread-pool.c + ${MKDIR} -p ${OBJECTDIR}/src + ${RM} "$@.d" + $(COMPILE.c) -g -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/thread-pool.o src/thread-pool.c + ${OBJECTDIR}/src/util.o: nbproject/Makefile-${CND_CONF}.mk src/util.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" diff --git a/nbproject/Makefile-Release.mk b/nbproject/Makefile-Release.mk index 5ba0cec..6a01c53 100644 --- a/nbproject/Makefile-Release.mk +++ b/nbproject/Makefile-Release.mk @@ -45,6 +45,7 @@ OBJECTFILES= \ ${OBJECTDIR}/src/mime.o \ ${OBJECTDIR}/src/queue.o \ ${OBJECTDIR}/src/socket.o \ + ${OBJECTDIR}/src/thread-pool.o \ ${OBJECTDIR}/src/util.o @@ -122,6 +123,11 @@ ${OBJECTDIR}/src/socket.o: nbproject/Makefile-${CND_CONF}.mk src/socket.c ${RM} "$@.d" $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/socket.o src/socket.c +${OBJECTDIR}/src/thread-pool.o: nbproject/Makefile-${CND_CONF}.mk src/thread-pool.c + ${MKDIR} -p ${OBJECTDIR}/src + ${RM} "$@.d" + $(COMPILE.c) -O2 -Werror -DINI_ALLOW_BOM=0 -DINI_ALLOW_MULTILINE=0 -D_GNU_SOURCE -Ilib -std=c99 -MMD -MP -MF "$@.d" -o ${OBJECTDIR}/src/thread-pool.o src/thread-pool.c + ${OBJECTDIR}/src/util.o: nbproject/Makefile-${CND_CONF}.mk src/util.c ${MKDIR} -p ${OBJECTDIR}/src ${RM} "$@.d" diff --git a/nbproject/configurations.xml b/nbproject/configurations.xml index 503c786..9d9f331 100644 --- a/nbproject/configurations.xml +++ b/nbproject/configurations.xml @@ -14,6 +14,7 @@ src/mime.h src/queue.h src/socket.h + src/thread-pool.h src/util.h src/mime.c src/queue.c src/socket.c + src/thread-pool.c src/util.c + + + + @@ -211,6 +217,10 @@ + + + + diff --git a/nbproject/private/private.xml b/nbproject/private/private.xml index 7cd6f01..f6e8961 100644 --- a/nbproject/private/private.xml +++ b/nbproject/private/private.xml @@ -8,10 +8,16 @@ file:/home/sam/NetBeansProjects/KHttp/src/socket.c - file:/home/sam/NetBeansProjects/KHttp/src/util.h + file:/home/sam/NetBeansProjects/KHttp/src/mime.c file:/home/sam/NetBeansProjects/KHttp/src/config.c + file:/home/sam/NetBeansProjects/KHttp/src/mime.h + file:/home/sam/NetBeansProjects/KHttp/src/thread-pool.c file:/home/sam/NetBeansProjects/KHttp/src/main.h + file:/home/sam/NetBeansProjects/KHttp/src/queue.c file:/home/sam/NetBeansProjects/KHttp/src/main.c + file:/home/sam/NetBeansProjects/KHttp/lib/ut/utlist.h + file:/home/sam/NetBeansProjects/KHttp/src/thread-pool.h + file:/home/sam/NetBeansProjects/KHttp/src/queue.h file:/home/sam/NetBeansProjects/KHttp/src/util.c diff --git a/src/main.c b/src/main.c index 7033f42..0c39882 100644 --- a/src/main.c +++ b/src/main.c @@ -29,6 +29,7 @@ #include "http-server.h" #include "mime.h" #include "queue.h" +#include "thread-pool.h" int serverfd = 0; volatile static bool stop = false; diff --git a/src/queue.c b/src/queue.c index d4c450c..fe639ce 100644 --- a/src/queue.c +++ b/src/queue.c @@ -75,4 +75,14 @@ queue_item* queue_fetchone(queue *q, bool blocking) { } QUEUE_UNLOCK(q); return item; +} +void queue_clear(queue *q) { + QUEUE_LOCK(q); + queue_item *elem, *tmp; + DL_FOREACH_SAFE(q->list, elem, tmp) { + queue_item_delete(elem); + DL_DELETE(q->list, elem); + } + pthread_cond_broadcast(q->cond); + QUEUE_UNLOCK(q); } \ No newline at end of file diff --git a/src/queue.h b/src/queue.h index eaa8aeb..6bc06ee 100644 --- a/src/queue.h +++ b/src/queue.h @@ -51,6 +51,7 @@ extern "C" { int queue_add(queue *q, queue_item *item); int queue_remove(queue *q, queue_item *item); queue_item* queue_fetchone(queue *q, bool blocking); + void queue_clear(queue *q); #ifdef __cplusplus } diff --git a/src/thread-pool.c b/src/thread-pool.c new file mode 100644 index 0000000..46daaa0 --- /dev/null +++ b/src/thread-pool.c @@ -0,0 +1,178 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "ut/utlist.h" +#include "thread-pool.h" +#include "util.h" + +uint64_t thread_newid(){ + static uint64_t id = 1; + return __atomic_fetch_add(&id, 1, __ATOMIC_SEQ_CST); +} +thread* thread_new(thread_pool *pool) { + thread* t = calloc(1, sizeof(thread)); + t->tid = thread_newid(); + t->pool = pool; + t->stop = false; + return t; +} +void thread_delete(thread *th) { + free(th); +} +void thread_start(thread *th, thread_func func) { + info("Starting thread [%s][%lu]", th->pool->name, th->tid); + if (pthread_create(&th->pthread, NULL, func, (void*)th)!=0) { + fatal("Failed to start thread"); + } +} +void thread_stop(thread *th) { + info("Stopping thread [%s][%lu]", th->pool->name, th->tid); + if (thread_trystop(th)==false) { + thread_kill(th); + } +} +bool thread_trystop(thread *th) { + void* ret; + struct timespec waittime; + waittime.tv_nsec = 0; + waittime.tv_sec = 2; + if (pthread_timedjoin_np(th->pthread, &ret, &waittime)!=0) { + return false; + } + return true; +} +void thread_kill(thread* th) { + pthread_cancel(th->pthread); + void* ret; + pthread_join(th->pthread, &ret); +} + +thread_pool* thread_pool_new(char *name, queue *queue) { + thread_pool* pool = calloc(1, sizeof(thread_pool)); + pool->name = calloc(strlen(name)+1, sizeof(char)); + strcpy(pool->name, name); + pool->queue = queue; + pool->shutdown = false; + pool->min_threads=1; + pool->max_threads=1; + pool->queue_factor=5; + return pool; +} +void thread_pool_delete(thread_pool *pool) { + free(pool->name); + free(pool->management_thread); + thread *elem, *tmp; + LL_FOREACH_SAFE(pool->threads, elem, tmp){ + LL_DELETE(pool->threads, elem); + thread_delete(elem); + } + free(pool->threads); + free(pool); +} +void thread_pool_start(thread_pool *pool) { + info("Starting thread pool %s", pool->name); + pool->management_thread = thread_new(pool); + thread_start(pool->management_thread, thread_mgt); +} +void thread_pool_stop(thread_pool *pool) { + info("Stopping thread pool %s", pool->name); + pool->shutdown = true; + void* ret; + if (pthread_join(pool->management_thread->pthread, &ret) != 0) { + fatal("Could not join thread pool manager"); + } +} +void thread_pool_add_thread(thread_pool *pool, thread *th) { + thread_start(th, pool->func); + LL_PREPEND(pool->threads, th); + pool->thread_count++; +} +void thread_pool_remove_thread(thread_pool *pool, thread *th) { + LL_DELETE(pool->threads, th); + pool->thread_count--; +} + +void* thread_mgt(void* arg) { + thread* th = (thread*)arg; + thread_pool *pool = th->pool; + + if (pool->min_threads > 0) { + info("Starting initial threads for %s", pool->name); + //Start initial threads + for(int i=0; imin_threads;i++) { + thread *th = thread_new(pool); + thread_pool_add_thread(pool, th); + } + } + if (pool->min_threads > pool->max_threads) { + warning(false, "thread pool %s: min threads exceeds max threads", pool->name); + pool->max_threads = pool->min_threads; + } + + //Management loop + struct timespec loopdelay; + loopdelay.tv_nsec = 10*1000; + loopdelay.tv_sec = 0; + + int64_t last_queue_count = 0; + while(pool->shutdown == false) { + + int64_t queue_count_diff = pool->queue->count - last_queue_count; + int64_t watermark = pool->thread_count * pool->queue_factor; + if (queue_count_diff > watermark && pool->thread_count <= pool->max_threads) { + //New thread + thread *th = thread_new(pool); + thread_pool_add_thread(pool, th); + last_queue_count = pool->queue->count; + } else if ((queue_count_diff*-1) > watermark && pool->thread_count > pool->min_threads) { + //Remove thread (stop the first in the list) + pool->threads->stop = true; + last_queue_count = pool->queue->count; + } + + //cleanup finished threads + void* ret; + thread *elem,*tmp; + LL_FOREACH_SAFE(pool->threads, elem, tmp) { + int joinresult = pthread_tryjoin_np(elem->pthread, &ret); + if (joinresult != 0) { + if (joinresult == EBUSY) { + continue; + } + errno = joinresult; + fatal("thread try join failed"); + } + info("Close thread [%s][%lu]", pool->name, elem->tid); + thread_pool_remove_thread(pool, elem); + thread_delete(elem); + } + + nanosleep(&loopdelay, NULL); + } + //Wait until queue is empty + while(pool->queue->count > 0) { + nanosleep(&loopdelay, NULL); + } + + //Shutdown threads + thread *elem, *tmp; + LL_FOREACH(pool->threads, elem) { + elem->stop = true; + } + //Ping the queue to wake up the threads + QUEUE_LOCK(pool->queue); + pthread_cond_broadcast(pool->queue->cond); + QUEUE_UNLOCK(pool->queue); + //Remove threads + LL_FOREACH_SAFE(pool->threads, elem, tmp) { + thread_stop(elem); + LL_DELETE(pool->threads, elem); + thread_delete(elem); + } +} \ No newline at end of file diff --git a/src/thread-pool.h b/src/thread-pool.h new file mode 100644 index 0000000..5c041a2 --- /dev/null +++ b/src/thread-pool.h @@ -0,0 +1,69 @@ +/* + * File: thread-pool.h + * Author: sam + * + * Created on 05 August 2014, 13:32 + */ + +#ifndef THREAD_POOL_H +#define THREAD_POOL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include "queue.h" + + typedef struct thread { + uint64_t tid; + pthread_t pthread; + struct thread_pool *pool; + bool stop; + struct thread* next; + } thread; + + typedef void* (*thread_func)(void*); + + typedef struct thread_pool { + char* name; + bool shutdown; + thread_func func; + thread *threads; + size_t thread_count; + size_t min_threads; + size_t max_threads; + size_t queue_factor; + thread *management_thread; + queue *queue; + } thread_pool; + + uint64_t thread_newid(); + thread* thread_new(thread_pool *pool); + void thread_delete(thread *th); + void thread_start(thread *th, thread_func func); + void thread_stop(thread *th); + bool thread_trystop(thread *th); + void thread_kill(thread *th); + +#define THREAD_POOL_FOREACH_THREAD(pool, th, i) \ + i=0; \ + th = pool->thread_count > 0 ? pool->threads[i] : NULL; \ + for(;ithread_count;th=pool->threads[i++]) + + thread_pool* thread_pool_new(char *name, queue *queue); + void thread_pool_delete(thread_pool *pool); + void thread_pool_start(thread_pool *pool); + void thread_pool_stop(thread_pool *pool); + void thread_pool_add_thread(thread_pool *pool, thread *th); + void thread_pool_remove_thread(thread_pool *pool, thread *th); + + void* thread_mgt(void* th); + +#ifdef __cplusplus +} +#endif + +#endif /* THREAD_POOL_H */ + diff --git a/src/util.c b/src/util.c index b468558..2f78117 100644 --- a/src/util.c +++ b/src/util.c @@ -12,28 +12,34 @@ #include "util.h" -void fatal(char* msg) { +void fatal(char* fmt, ...) { + char msg[128] = {0}; + va_list va; + va_start(va, fmt); + vsnprintf(msg, 128, fmt, va); + va_end(va); + fprintf(stderr, "\n"); perror(msg); exit(EXIT_FAILURE); } -void warning(bool showPError, char* msg, ...) { - char warning[128] = {0}; +void warning(bool use_errno, char* fmt, ...) { + char msg[128] = {0}; va_list va; - va_start(va, msg); - vsnprintf(warning, 128, msg, va); + va_start(va, fmt); + vsnprintf(msg, 128, fmt, va); va_end(va); - if (showPError == true) { - perror(warning); + if (use_errno == true) { + perror(msg); } else { - fprintf(stderr, "%s\n", warning); + fprintf(stderr, "%s\n", msg); } } -void info(char* msg, ...) { +void info(char* fmt, ...) { va_list va; - va_start(va, msg); - vfprintf(stdout, msg, va); + va_start(va, fmt); + vfprintf(stdout, fmt, va); fputc('\n', stdout); va_end(va); } diff --git a/src/util.h b/src/util.h index 135c31e..3eddd56 100644 --- a/src/util.h +++ b/src/util.h @@ -19,9 +19,9 @@ extern "C" { size_t size; } file_map; - void fatal(char* msg); - void warning(bool showPError, char* msg, ...); - void info(char* msg, ...); + void fatal(char* fmt, ...); + void warning(bool use_errno, char* fmt, ...); + void info(char* fmt, ...); char* str_trimwhitespace(char *str); char** str_splitlines(char *str, size_t *line_count);