Added thread safe work queue

This commit is contained in:
2014-08-04 18:56:29 +01:00
parent cb16d83961
commit 257ce14115
7 changed files with 168 additions and 13 deletions

View File

@@ -15,6 +15,7 @@
#include <sys/mman.h>
#include <ctype.h>
#include <signal.h>
#include <bits/stdio2.h>
#include "http_parser.h"
@@ -27,6 +28,7 @@
#include "config.h"
#include "http-server.h"
#include "mime.h"
#include "queue.h"
int serverfd = 0;
volatile static bool stop = false;

78
src/queue.c Normal file
View File

@@ -0,0 +1,78 @@
#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include "queue.h"
#include "util.h"
#include "ut/utlist.h"
queue_item* queue_item_new() {
queue_item *item = calloc(1, sizeof(queue_item));
return item;
}
void queue_item_delete(queue_item *item) {
free(item);
}
queue* queue_new() {
queue *q = calloc(1, sizeof(queue));
q->count = 0;
q->list = NULL;
q->mutex = calloc(1, sizeof(pthread_mutex_t));
if (pthread_mutex_init(q->mutex, NULL) != 0) {
fatal("Failed to init queue mutex");
}
q->cond = calloc(1, sizeof(pthread_cond_t));
if (pthread_cond_init(q->cond, NULL) != 0) {
fatal("Failed to init queue cond");
}
return q;
}
void queue_delete(queue *q) {
queue_item *elem, *tmp;
DL_FOREACH_SAFE(q->list, elem, tmp) {
queue_item_delete(elem);
DL_DELETE(q->list, elem);
}
pthread_mutex_destroy(q->mutex);
free(q->mutex);
pthread_cond_destroy(q->cond);
free(q->cond);
free(q);
}
int queue_add(queue *q, queue_item *item) {
QUEUE_LOCK(q);
DL_APPEND(q->list, item);
q->count++;
pthread_cond_signal(q->cond);
QUEUE_UNLOCK(q);
return 0;
}
int queue_remove(queue *q, queue_item *item) {
QUEUE_LOCK(q);
queue_item *elem, *tmp;
DL_FOREACH_SAFE(q->list, elem, tmp) {
if (elem == item) {
DL_DELETE(q->list, elem);
queue_item_delete(elem);
break;
}
}
q->count--;
QUEUE_UNLOCK(q);
}
queue_item* queue_fetchone(queue *q, bool blocking) {
queue_item *item = NULL;
QUEUE_LOCK(q);
if (q->count == 0 && blocking == true) {
pthread_cond_wait(q->cond, q->mutex);
}
if (q->count > 0) {
item = q->list;
DL_DELETE(q->list, q->list);
q->count--;
}
QUEUE_UNLOCK(q);
return item;
}

60
src/queue.h Normal file
View File

@@ -0,0 +1,60 @@
/*
* File: queue.h
* Author: sam
*
* Created on 04 August 2014, 14:27
*/
#ifndef QUEUE_H
#define QUEUE_H
#ifdef __cplusplus
extern "C" {
#endif
#include <pthread.h>
#include <stdint.h>
#include <stdbool.h>
typedef struct queue_item {
struct queue_item *prev;
struct queue_item *next;
char tag[16];
void *data;
} queue_item;
queue_item* queue_item_new();
void queue_item_delete(queue_item *item);
typedef struct queue {
queue_item *list;
uint64_t count;
pthread_mutex_t *mutex;
pthread_cond_t *cond;
} queue;
#define QUEUE_LOCK(q) \
do { \
if (pthread_mutex_lock(q->mutex)!=0) { \
fatal("Could not lock queue"); \
} \
} while(0)
#define QUEUE_UNLOCK(q) \
do { \
if (pthread_mutex_unlock(q->mutex)!=0) {\
fatal("Could not unlock queue"); \
} \
} while(0)
queue* queue_new();
void queue_delete(queue *q);
int queue_add(queue *q, queue_item *item);
int queue_remove(queue *q, queue_item *item);
queue_item* queue_fetchone(queue *q, bool blocking);
#ifdef __cplusplus
}
#endif
#endif /* QUEUE_H */