More work on new server loop
This commit is contained in:
@@ -82,7 +82,9 @@ void*log_loop(void* arg) {
|
|||||||
time_t ctime;
|
time_t ctime;
|
||||||
struct tm *tinfo = calloc(1,sizeof(struct tm));
|
struct tm *tinfo = calloc(1,sizeof(struct tm));
|
||||||
while(true) {
|
while(true) {
|
||||||
|
//Read next message pointer from pipe
|
||||||
if (read(l->pRead, buf, sizeof(void*)) <= 0) {
|
if (read(l->pRead, buf, sizeof(void*)) <= 0) {
|
||||||
|
//zero length indicates the write end closed (EOF)
|
||||||
if (l->running == false) {
|
if (l->running == false) {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|||||||
120
src/main-loop.c
120
src/main-loop.c
@@ -1,6 +1,11 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <sys/epoll.h>
|
#include <sys/epoll.h>
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <time.h>
|
||||||
|
|
||||||
|
#include <ut/utlist.h>
|
||||||
|
#include <errno.h>
|
||||||
|
|
||||||
#include "main-loop.h"
|
#include "main-loop.h"
|
||||||
#include "mime.h"
|
#include "mime.h"
|
||||||
@@ -8,6 +13,33 @@
|
|||||||
#include "socket.h"
|
#include "socket.h"
|
||||||
#include "thread-pool.h"
|
#include "thread-pool.h"
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
|
#include "log.h"
|
||||||
|
|
||||||
|
hmain_connection* hmain_connection_new(int fd, hmain_status *status) {
|
||||||
|
static u_int64_t nextid = 1;
|
||||||
|
|
||||||
|
hmain_connection *conn = calloc(1, sizeof(hmain_connection));
|
||||||
|
conn->cid = __atomic_fetch_add(&u_int64_t, 1, __ATOMIC_SEQ_CST);
|
||||||
|
conn->fd = fd;
|
||||||
|
conn->status = status;
|
||||||
|
conn->opened = time(NULL);
|
||||||
|
conn->last_activity = conn->opened;
|
||||||
|
conn->pending_responses = http_response_list_new();
|
||||||
|
|
||||||
|
return conn;
|
||||||
|
}
|
||||||
|
void hmain_connection_close(hmain_connection *conn) {
|
||||||
|
close(fd);
|
||||||
|
//TODO: remove from all queues
|
||||||
|
}
|
||||||
|
void hmain_connection_delete(hmain_connection *conn) {
|
||||||
|
http_response_list_delete(conn->pending_responses);
|
||||||
|
free(conn->clientaddr);
|
||||||
|
free(conn);
|
||||||
|
}
|
||||||
|
|
||||||
|
#define EVENT_IS(event, type) ((event.events & type) == type)
|
||||||
|
#define EVENT_ISNOT(event, type) (!EVENT_IS(event, type))
|
||||||
|
|
||||||
void hmain_setup(hmain_status **statusptr) {
|
void hmain_setup(hmain_status **statusptr) {
|
||||||
hmain_status *status = *statusptr;
|
hmain_status *status = *statusptr;
|
||||||
@@ -16,6 +48,7 @@ void hmain_setup(hmain_status **statusptr) {
|
|||||||
fatal("hmain already setup");
|
fatal("hmain already setup");
|
||||||
}
|
}
|
||||||
status = calloc(1, sizeof(hmain_status));
|
status = calloc(1, sizeof(hmain_status));
|
||||||
|
status->shutdown = false;
|
||||||
|
|
||||||
//Start Logging
|
//Start Logging
|
||||||
log_register_add(log_new("stderr", stderr), true, LALL & ~(LINFO|LDEBUG));
|
log_register_add(log_new("stderr", stderr), true, LALL & ~(LINFO|LDEBUG));
|
||||||
@@ -33,7 +66,6 @@ void hmain_setup(hmain_status **statusptr) {
|
|||||||
|
|
||||||
//Open our listening socket
|
//Open our listening socket
|
||||||
status->sfd = svr_create();
|
status->sfd = svr_create();
|
||||||
svr_setnonblock(status->sfd);
|
|
||||||
svr_listen(status->sfd, status->config->listen_port);
|
svr_listen(status->sfd, status->config->listen_port);
|
||||||
|
|
||||||
//Open epoll for socket
|
//Open epoll for socket
|
||||||
@@ -55,35 +87,48 @@ void hmain_setup(hmain_status **statusptr) {
|
|||||||
pool->min_threads = 1;
|
pool->min_threads = 1;
|
||||||
pool->max_threads = 2;
|
pool->max_threads = 2;
|
||||||
pool->func = thloop_read;
|
pool->func = thloop_read;
|
||||||
status->pool.read = pool;
|
status->pools[POOL_READ] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
|
||||||
pool = thread_pool_new("write", queue_new());
|
pool = thread_pool_new("write", queue_new());
|
||||||
pool->min_threads = 1;
|
pool->min_threads = 1;
|
||||||
pool->max_threads = 2;
|
pool->max_threads = 2;
|
||||||
pool->func = thloop_write;
|
pool->func = thloop_write;
|
||||||
status->pool.write = pool;
|
status->pools[POOL_WRITE] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
|
||||||
pool = thread_pool_new("disk_read", queue_new());
|
pool = thread_pool_new("disk_read", queue_new());
|
||||||
pool->min_threads = 1;
|
pool->min_threads = 1;
|
||||||
pool->max_threads = 2;
|
pool->max_threads = 2;
|
||||||
pool->func = thloop_disk_read;
|
pool->func = thloop_disk_read;
|
||||||
status->pool.disk_read = pool;
|
status->pools[POOL_DISK_READ] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
|
||||||
pool = thread_pool_new("worker", queue_new());
|
pool = thread_pool_new("worker", queue_new());
|
||||||
pool->min_threads = 1;
|
pool->min_threads = 1;
|
||||||
pool->max_threads = 5;
|
pool->max_threads = 5;
|
||||||
pool->func = thloop_worker;
|
pool->func = thloop_worker;
|
||||||
status->pool.workers = pool;
|
status->pools[POOL_WORKERS] = pool;
|
||||||
|
thread_pool_start(pool);
|
||||||
|
|
||||||
|
|
||||||
}
|
}
|
||||||
void hmain_teardown(hmain_status *status) {
|
void hmain_teardown(hmain_status *status) {
|
||||||
if (status == NULL) {
|
if (status == NULL) {
|
||||||
fatal("hmain is not setup");
|
fatal("hmain is not setup");
|
||||||
}
|
}
|
||||||
|
//Stop pools
|
||||||
|
size_t pool_count = sizeof(status->pools) / sizeof(status->pools[0]);
|
||||||
|
for(int i=0; i<pool_count; i++) {
|
||||||
|
thread_pool_stop(status->pools[i]);
|
||||||
|
queue_delete(status->pools[i]->queue);
|
||||||
|
thread_pool_delete(status->pools[i]);
|
||||||
|
}
|
||||||
|
|
||||||
//Close epoll
|
//Close epoll
|
||||||
close(status->epollfd);
|
close(status->epollfd);
|
||||||
|
|
||||||
//Close the listening connection
|
//Close the listening socket
|
||||||
svr_release(status->sfd);
|
svr_release(status->sfd);
|
||||||
|
|
||||||
//Cleanup the mime detector
|
//Cleanup the mime detector
|
||||||
@@ -97,7 +142,70 @@ void hmain_teardown(hmain_status *status) {
|
|||||||
free(status);
|
free(status);
|
||||||
}
|
}
|
||||||
void hmain_loop(hmain_status *status) {
|
void hmain_loop(hmain_status *status) {
|
||||||
|
struct epoll_event *current_event;
|
||||||
|
struct epoll_event *events = calloc(EPOLL_MAXEVENTS, sizeof(struct epoll_event));
|
||||||
|
|
||||||
|
struct sockaddr_in* clientaddr = calloc(1, sizeof(struct sockaddr_in));
|
||||||
|
socklen_t clientaddr_len = (socklen_t)sizeof(struct sockaddr_in);
|
||||||
|
|
||||||
|
while(status->shutdown == false) {
|
||||||
|
int event_count = epoll_wait(status->epollfd, events, EPOLL_MAXEVENTS, 1000);
|
||||||
|
for(int i=0; i<event_count; i++) {
|
||||||
|
current_event = &events[i];
|
||||||
|
|
||||||
|
if (EVENT_IS(current_event, EPOLLERR) ||
|
||||||
|
EVENT_IS(current_event, EPOLLHUP) ||
|
||||||
|
EVENT_ISNOT(current_event, EPOLLIN)) {
|
||||||
|
LOG(LERROR, "epoll; unexpected error or event");
|
||||||
|
//TODO: close socket & cleanup
|
||||||
|
|
||||||
|
} else if (EVENT_IS(current_event, EPOLLRDHUP)) {
|
||||||
|
LOG(LINFO, "connection closed");
|
||||||
|
//TODO: close socket & cleanup
|
||||||
|
} else if (EVENT_IS(current_event, EPOLLIN)) {
|
||||||
|
if (current_event->data.fd == status->sfd) {
|
||||||
|
//New connection(s)
|
||||||
|
while(true) {
|
||||||
|
struct epoll_event new_event;
|
||||||
|
int clientfd = accept4(status->sfd, (struct sockaddr*)clientaddr, clientaddr_len, SOCK_NONBLOCK | SOCK_CLOEXEC);
|
||||||
|
if (clientfd < 0) {
|
||||||
|
if (errno == EAGAIN || errno == EWOULDBLOCK) {
|
||||||
|
//Done with new connections
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
warning(true, "Error accepting new connection");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
hmain_connection *conn = hmain_connection_new(clientfd, status);
|
||||||
|
conn->clientaddr = calloc(1, sizeof(struct sockaddr_in));
|
||||||
|
memcpy(conn->clientaddr, clientaddr, clientaddr_len);
|
||||||
|
new_event.data.fd = clientfd;
|
||||||
|
new_event.data.ptr = conn;
|
||||||
|
new_event.events = EPOLLIN | EPOLLOUT | EPOLLHUP | EPOLLET;
|
||||||
|
if (epoll_ctl(status->epollfd, EPOLL_CTL_ADD, status->sfd, &new_event) < 0) {
|
||||||
|
fatal("Could not register new connection with epoll");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
//Data is ready to read on existing connection
|
||||||
|
hmain_connection *conn = (hmain_connection*)current_event->data.ptr;
|
||||||
|
if (conn->isReading == true) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
queue_add(status->pools[POOL_READ]->queue, queue_item_new2("READ", (void*)conn));
|
||||||
|
}
|
||||||
|
} else if (EVENT_IS(current_event, EPOLLOUT)) {
|
||||||
|
//Data can be written to connection
|
||||||
|
hmain_connection *conn = (hmain_connection*)current_event->data.ptr;
|
||||||
|
if (conn->isWriting == true || conn->pending_responses == NULL) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
queue_add(status->pools[POOL_WRITE]->queue, queue_item_new2("WRITE", (void*)conn));
|
||||||
|
}
|
||||||
|
}//for events
|
||||||
|
}//while shutdown == false
|
||||||
|
free(events);
|
||||||
|
free(clientaddr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* thloop_read(void * arg) {
|
void* thloop_read(void * arg) {
|
||||||
|
|||||||
@@ -8,22 +8,47 @@
|
|||||||
#ifndef MAIN_LOOP_H
|
#ifndef MAIN_LOOP_H
|
||||||
#define MAIN_LOOP_H
|
#define MAIN_LOOP_H
|
||||||
|
|
||||||
|
#include <stdbool.h>
|
||||||
|
#include <time.h>
|
||||||
#include "config.h"
|
#include "config.h"
|
||||||
#include "thread-pool.h"
|
#include "thread-pool.h"
|
||||||
|
#include "http.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define EPOLL_MAXEVENTS 128
|
||||||
|
|
||||||
|
typedef enum hmain_pool {
|
||||||
|
POOL_READ, POOL_WRITE, POOL_WORKERS, POOL_DISK_READ
|
||||||
|
};
|
||||||
|
|
||||||
typedef struct hmain_status {
|
typedef struct hmain_status {
|
||||||
config_server *config;
|
config_server *config;
|
||||||
int sfd;
|
int sfd;
|
||||||
int epollfd;
|
int epollfd;
|
||||||
struct {
|
thread_pool *pools[4];
|
||||||
thread_pool *read, *write, *workers, *disk_read;
|
bool shutdown;
|
||||||
} pool;
|
hmain_connection *connections;
|
||||||
} hmain_status;
|
} hmain_status;
|
||||||
|
|
||||||
|
typedef struct hmain_connection {
|
||||||
|
uint64_t cid;
|
||||||
|
hmain_status *status;
|
||||||
|
int fd;
|
||||||
|
struct sockaddr_in* clientaddr;
|
||||||
|
bool isReading, isWriting;
|
||||||
|
time_t opened;
|
||||||
|
time_t last_activity;
|
||||||
|
http_response_list *pending_responses;
|
||||||
|
hmain_connection *next;
|
||||||
|
} hmain_connection;
|
||||||
|
|
||||||
|
hmain_connection* hmain_connection_new(int fd, hmain_status *status);
|
||||||
|
void hmain_connection_close(hmain_connection *conn);
|
||||||
|
void hmain_connection_delete(hmain_connection *conn);
|
||||||
|
|
||||||
void hmain_setup(hmain_status **statusptr);
|
void hmain_setup(hmain_status **statusptr);
|
||||||
void hmain_teardown(hmain_status *status);
|
void hmain_teardown(hmain_status *status);
|
||||||
void hmain_loop(hmain_status *status);
|
void hmain_loop(hmain_status *status);
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <pthread.h>
|
#include <pthread.h>
|
||||||
|
#include <string.h>
|
||||||
|
|
||||||
#include "queue.h"
|
#include "queue.h"
|
||||||
#include "util.h"
|
#include "util.h"
|
||||||
@@ -10,6 +11,13 @@ queue_item* queue_item_new() {
|
|||||||
queue_item *item = calloc(1, sizeof(queue_item));
|
queue_item *item = calloc(1, sizeof(queue_item));
|
||||||
return item;
|
return item;
|
||||||
}
|
}
|
||||||
|
queue_item* queue_item_new2(char* tag, void* data) {
|
||||||
|
queue_item *item = queue_item_new();
|
||||||
|
item->tag[0] = '\0';
|
||||||
|
strncat(item->tag, tag, (sizeof(item->tag)/sizeof(char))+1);
|
||||||
|
item->data = data;
|
||||||
|
return item;
|
||||||
|
}
|
||||||
void queue_item_delete(queue_item *item) {
|
void queue_item_delete(queue_item *item) {
|
||||||
free(item);
|
free(item);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ extern "C" {
|
|||||||
} queue_item;
|
} queue_item;
|
||||||
|
|
||||||
queue_item* queue_item_new();
|
queue_item* queue_item_new();
|
||||||
|
queue_item* queue_item_new2(char* tag, void* data);
|
||||||
void queue_item_delete(queue_item *item);
|
void queue_item_delete(queue_item *item);
|
||||||
|
|
||||||
typedef struct queue {
|
typedef struct queue {
|
||||||
|
|||||||
12
src/socket.c
12
src/socket.c
@@ -111,7 +111,7 @@ char* skt_clientaddr(skt_info *skt) {
|
|||||||
|
|
||||||
int svr_create() {
|
int svr_create() {
|
||||||
int fd = 0;
|
int fd = 0;
|
||||||
fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);
|
fd = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
|
||||||
if (fd < 0) {
|
if (fd < 0) {
|
||||||
fatal("could not create socket");
|
fatal("could not create socket");
|
||||||
}
|
}
|
||||||
@@ -134,16 +134,6 @@ void svr_listen(int fd, uint16_t port) {
|
|||||||
}
|
}
|
||||||
info("Listening on port %u", port);
|
info("Listening on port %u", port);
|
||||||
}
|
}
|
||||||
void svr_setnonblock(int fd) {
|
|
||||||
int flags = fcntl(fd, F_GETFL, 0);
|
|
||||||
if (flags < 0) {
|
|
||||||
fatal("failed to set nonblocking on server socket");
|
|
||||||
}
|
|
||||||
flags |= O_NONBLOCK;
|
|
||||||
if (fcntl(fd, F_SETFL, flags) < 0) {
|
|
||||||
fatal("failed to set nonblocking on server socket");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
void svr_release(int fd) {
|
void svr_release(int fd) {
|
||||||
if (close(fd) < 0) {
|
if (close(fd) < 0) {
|
||||||
warning(true, "could not close socket");
|
warning(true, "could not close socket");
|
||||||
|
|||||||
@@ -47,7 +47,6 @@ extern "C" {
|
|||||||
|
|
||||||
int svr_create();
|
int svr_create();
|
||||||
void svr_listen(int fd, uint16_t port);
|
void svr_listen(int fd, uint16_t port);
|
||||||
void svr_setnonblock(int fd);
|
|
||||||
void svr_release(int fd);
|
void svr_release(int fd);
|
||||||
bool svr_canaccept(int fd);
|
bool svr_canaccept(int fd);
|
||||||
skt_info* svr_accept(int fd);
|
skt_info* svr_accept(int fd);
|
||||||
|
|||||||
13
src/util.c
13
src/util.c
@@ -21,7 +21,20 @@ void fatal(char* fmt, ...) {
|
|||||||
vsnprintf(msg, LOG_LENGTH, fmt, va);
|
vsnprintf(msg, LOG_LENGTH, fmt, va);
|
||||||
va_end(va);
|
va_end(va);
|
||||||
|
|
||||||
|
if (errno != 0) {
|
||||||
|
char *errnostr = calloc(64, sizeof(char));
|
||||||
|
strerror_r(errno, errnostr, 64);
|
||||||
|
char *errstr = calloc(strlen(msg)+strlen(errnostr)+3, sizeof(char));
|
||||||
|
strcat(errstr, msg);
|
||||||
|
strcat(errstr, ": ");
|
||||||
|
strcat(errstr, errnostr);
|
||||||
|
LOG(LFATAL, errstr);
|
||||||
|
free(errnostr);
|
||||||
|
free(errstr);
|
||||||
|
} else {
|
||||||
LOG(LFATAL, msg);
|
LOG(LFATAL, msg);
|
||||||
|
}
|
||||||
|
|
||||||
log_register_clear();
|
log_register_clear();
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user