Moved queue ping code into queue code

This commit is contained in:
2014-08-05 21:02:16 +01:00
parent 19c4351df9
commit 826c48f3ad
3 changed files with 11 additions and 5 deletions

View File

@@ -86,3 +86,8 @@ void queue_clear(queue *q) {
pthread_cond_broadcast(q->cond); pthread_cond_broadcast(q->cond);
QUEUE_UNLOCK(q); QUEUE_UNLOCK(q);
} }
void queue_ping(queue *q) {
QUEUE_LOCK(q);
pthread_cond_broadcast(q->cond);
QUEUE_UNLOCK(q);
}

View File

@@ -52,6 +52,7 @@ extern "C" {
int queue_remove(queue *q, queue_item *item); int queue_remove(queue *q, queue_item *item);
queue_item* queue_fetchone(queue *q, bool blocking); queue_item* queue_fetchone(queue *q, bool blocking);
void queue_clear(queue *q); void queue_clear(queue *q);
void queue_ping(queue *q);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@@ -125,7 +125,7 @@ void* thread_mgt(void* arg) {
int64_t queue_count_diff = pool->queue->count - last_queue_count; int64_t queue_count_diff = pool->queue->count - last_queue_count;
int64_t watermark = pool->thread_count * pool->queue_factor; int64_t watermark = pool->thread_count * pool->queue_factor;
if (queue_count_diff > watermark && pool->thread_count <= pool->max_threads) { if (queue_count_diff > watermark && pool->thread_count < pool->max_threads) {
//New thread //New thread
thread *th = thread_new(pool); thread *th = thread_new(pool);
thread_pool_add_thread(pool, th); thread_pool_add_thread(pool, th);
@@ -165,10 +165,10 @@ void* thread_mgt(void* arg) {
LL_FOREACH(pool->threads, elem) { LL_FOREACH(pool->threads, elem) {
elem->stop = true; elem->stop = true;
} }
//Ping the queue to wake up the threads
QUEUE_LOCK(pool->queue); //Ping the queue to wake up the threads (this should prompt them to check the stop flag)
pthread_cond_broadcast(pool->queue->cond); queue_ping(pool->queue);
QUEUE_UNLOCK(pool->queue);
//Remove threads //Remove threads
LL_FOREACH_SAFE(pool->threads, elem, tmp) { LL_FOREACH_SAFE(pool->threads, elem, tmp) {
thread_stop(elem); thread_stop(elem);