diff --git a/src/queue.c b/src/queue.c index fe639ce..70d1673 100644 --- a/src/queue.c +++ b/src/queue.c @@ -85,4 +85,9 @@ void queue_clear(queue *q) { } pthread_cond_broadcast(q->cond); QUEUE_UNLOCK(q); +} +void queue_ping(queue *q) { + QUEUE_LOCK(q); + pthread_cond_broadcast(q->cond); + QUEUE_UNLOCK(q); } \ No newline at end of file diff --git a/src/queue.h b/src/queue.h index 6bc06ee..0e8da26 100644 --- a/src/queue.h +++ b/src/queue.h @@ -52,6 +52,7 @@ extern "C" { int queue_remove(queue *q, queue_item *item); queue_item* queue_fetchone(queue *q, bool blocking); void queue_clear(queue *q); + void queue_ping(queue *q); #ifdef __cplusplus } diff --git a/src/thread-pool.c b/src/thread-pool.c index 25dc18e..0a70d1b 100644 --- a/src/thread-pool.c +++ b/src/thread-pool.c @@ -125,7 +125,7 @@ void* thread_mgt(void* arg) { int64_t queue_count_diff = pool->queue->count - last_queue_count; int64_t watermark = pool->thread_count * pool->queue_factor; - if (queue_count_diff > watermark && pool->thread_count <= pool->max_threads) { + if (queue_count_diff > watermark && pool->thread_count < pool->max_threads) { //New thread thread *th = thread_new(pool); thread_pool_add_thread(pool, th); @@ -165,10 +165,10 @@ void* thread_mgt(void* arg) { LL_FOREACH(pool->threads, elem) { elem->stop = true; } - //Ping the queue to wake up the threads - QUEUE_LOCK(pool->queue); - pthread_cond_broadcast(pool->queue->cond); - QUEUE_UNLOCK(pool->queue); + + //Ping the queue to wake up the threads (this should prompt them to check the stop flag) + queue_ping(pool->queue); + //Remove threads LL_FOREACH_SAFE(pool->threads, elem, tmp) { thread_stop(elem);