From 3bffc2c201fdf49135624e91452f20cf02b87cac Mon Sep 17 00:00:00 2001 From: Olaf Rempel Date: Wed, 4 Oct 2006 21:45:18 +0200 Subject: [PATCH] new event system --- Makefile | 2 +- cachesyncd.c | 90 +++++++++++++-- event.c | 310 +++++++++++++++++++++++++++++++++++++++++++++++++++ event.h | 26 +++++ multicast.c | 2 +- selector.c | 109 ------------------ selector.h | 6 - 7 files changed, 416 insertions(+), 129 deletions(-) create mode 100644 event.c create mode 100644 event.h delete mode 100644 selector.c delete mode 100644 selector.h diff --git a/Makefile b/Makefile index 86ce6a8..491d5fe 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ # Toplevel Makefile -SRC := cachesyncd.c configfile.c logging.c multicast.c selector.c unixsock.c +SRC := cachesyncd.c configfile.c event.c logging.c multicast.c unixsock.c CFLAGS := -O2 -Wall # ############################ diff --git a/cachesyncd.c b/cachesyncd.c index ab13389..184cb34 100644 --- a/cachesyncd.c +++ b/cachesyncd.c @@ -25,15 +25,21 @@ #include #include +#include +#include +#include + #include "configfile.h" #include "logging.h" -#include "selector.h" +#include "event.h" #include "multicast.h" #include "unixsock.h" #define DEFAULT_CONFIG "cachesyncd.conf" #define DEFAULT_LOGFILE "cachesyncd.log" +#define BUF_SIZE 256 + static struct option opts[] = { {"config", 1, 0, 'c'}, {"debug", 0, 0, 'd'}, @@ -41,6 +47,59 @@ static struct option opts[] = { {0, 0, 0, 0} }; +static int usock, msock; +static char *buf; + +int usock_read_callback(int fd, void *privdata) +{ + int len = read(fd, buf, BUF_SIZE); + if (len <= 0) { + close(fd); + return -1; + } + + mcast_send(msock, buf, len); + return 0; +} + +int usock_accept_callback(int fd, void *privdata) +{ + int con = accept(usock, NULL, NULL); + if (con < 0 ) { + log_print(LOG_ERROR, "selector: accept()"); + return 0; + } + + event_add_readfd(con, usock_read_callback, NULL); + return 0; +} + +int msock_read_callback(int fd, void *privdata) +{ + int len = read(fd, buf, BUF_SIZE); + if (len <= 0) { + log_print(LOG_ERROR, "selector: multicast sock closed?"); + + } else if (!strncmp(buf, "KEEPALIVE", 10)) { + // nothing + + } else if (!strncmp(buf, "DELETE ", 7)) { + log_print(LOG_DEBUG, "delete '%s'", buf +7); + //delete_file(buf +7); + + } else { + log_print(LOG_DEBUG, "recv unknown cmd via multicast: '%s'", buf); + } + + return 0; +} + +int msock_keepalive_timeout(void *privdata) +{ + mcast_send(msock, "KEEPALIVE", 10); + return 0; +} + int main(int argc, char *argv[]) { char *config = DEFAULT_CONFIG; @@ -92,25 +151,32 @@ int main(int argc, char *argv[]) daemon(-1, 0); } - // logfile (absolute filename, dropped privs???) - // (daemon) - // chroot (needs root) - // msockinit (needs root) - // setgid/pid (drop privs) - // usockinit (dropped privs, relative path) - // selector/unlink (dropped privs, relative path) - log_print(LOG_EVERYTIME, "cachesyncd started (pid: %d)", getpid()); - int usock = sock_init(); + buf = malloc(BUF_SIZE); + if (buf == NULL) { + log_print(LOG_ERROR, "selector: out of memory"); + return -1; + } + + usock = sock_init(); if (usock < 0) return -1; - int msock = mcast_init(); + msock = mcast_init(); if (msock < 0) return -1; - selector(usock, msock); + event_add_readfd(usock, usock_accept_callback, NULL); + event_add_readfd(msock, msock_read_callback, NULL); + + struct timeval tv; + tv.tv_sec = 60; + tv.tv_usec = 0; + + event_add_timeout(&tv, msock_keepalive_timeout, NULL); + + event_loop(); return 0; } diff --git a/event.c b/event.c new file mode 100644 index 0000000..9ebf8e6 --- /dev/null +++ b/event.c @@ -0,0 +1,310 @@ +/*************************************************************************** + * Copyright (C) 10/2006 by Olaf Rempel * + * razzor@kopf-tisch.de * + * * + * This program is free software; you can redistribute it and/or modify * + * it under the terms of the GNU General Public License as published by * + * the Free Software Foundation; either version 2 of the License, or * + * (at your option) any later version. * + * * + * This program is distributed in the hope that it will be useful, * + * but WITHOUT ANY WARRANTY; without even the implied warranty of * + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * + * GNU General Public License for more details. * + * * + * You should have received a copy of the GNU General Public License * + * along with this program; if not, write to the * + * Free Software Foundation, Inc., * + * 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. * + ***************************************************************************/ +#include +#include +#include + +#include +#include + +#include "list.h" +#include "logging.h" + +#include "event.h" + +static LIST_HEAD(readfd_list); +static LIST_HEAD(writefd_list); +static LIST_HEAD(exceptfd_list); +static LIST_HEAD(timeout_list); + +struct fd_entry { + struct list_head list; + int fd; + int type; + int (*callback)(int fd, void *privdata); + void *privdata; +}; + +struct timeout_entry { + struct list_head list; + struct timeval timeout; + struct timeval nextrun; + int (*callback)(void *privdata); + void *privdata; +}; + +int event_add_fd(int fd, int type, int (*callback)(int fd, void *privdata), void *privdata) +{ + if (fd < 0 || fd > FD_SETSIZE) + return -1; + + struct fd_entry *entry; + entry = malloc(sizeof(struct fd_entry)); + if (entry == NULL) { + log_print(LOG_ERROR, "event_add_fd(): out of memory"); + return -1; + } + + entry->fd = fd; + entry->type = type; + entry->callback = callback; + entry->privdata = privdata; + + switch (type) { + case FD_READ: + list_add_tail(&entry->list, &readfd_list); + break; + + case FD_WRITE: + list_add_tail(&entry->list, &writefd_list); + break; + + case FD_EXCEPT: + list_add_tail(&entry->list, &exceptfd_list); + break; + + default: + log_print(LOG_ERROR, "add_fd(): unknown type"); + free(entry); + return -1; + } + + return 0; +} + +int event_remove_fd(int fd) +{ + struct fd_entry *entry, *tmp; + list_for_each_entry_safe(entry, tmp, &readfd_list, list) { + if (entry->fd == fd) { + list_del(&entry->list); + free(entry); + return 0; + } + } + + list_for_each_entry_safe(entry, tmp, &writefd_list, list) { + if (entry->fd == fd) { + list_del(&entry->list); + free(entry); + return 0; + } + } + + list_for_each_entry_safe(entry, tmp, &exceptfd_list, list) { + if (entry->fd == fd) { + list_del(&entry->list); + free(entry); + return 0; + } + } + + return -1; +} + +static void calc_nextrun(struct timeval *timeout, struct timeval *nextrun) +{ + struct timeval now; + gettimeofday(&now, NULL); + + nextrun->tv_usec = now.tv_usec + timeout->tv_usec; + nextrun->tv_sec = now.tv_sec + timeout->tv_sec; + + if (nextrun->tv_usec >= 1000000) { + nextrun->tv_usec -= 1000000; + nextrun->tv_sec++; + } +} + +static void calc_timeout(struct timeval *timeout, struct timeval *nextrun) +{ + struct timeval now; + gettimeofday(&now, NULL); + + timeout->tv_usec = nextrun->tv_usec - now.tv_usec; + timeout->tv_sec = nextrun->tv_sec - now.tv_sec; + + if (timeout->tv_usec < 0) { + timeout->tv_usec += 1000000; + timeout->tv_sec--; + } +} + +static void schedule_nextrun(struct timeout_entry *entry) +{ + struct timeout_entry *search; + + list_for_each_entry(search, &timeout_list, list) { + if (search->nextrun.tv_sec > entry->nextrun.tv_sec) { + list_add_tail(&entry->list, &search->list); + return; + + } else if (search->nextrun.tv_sec == entry->nextrun.tv_sec && + search->nextrun.tv_usec > entry->nextrun.tv_usec) { + list_add_tail(&entry->list, &search->list); + return; + } + } + list_add_tail(&entry->list, &timeout_list); +} + +int event_add_timeout(struct timeval *timeout, int (*callback)(void *privdata), void *privdata) +{ + struct timeout_entry *entry; + entry = malloc(sizeof(struct timeout_entry)); + if (entry == NULL) { + log_print(LOG_ERROR, "event_add_timeout(): out of memory"); + return -1; + } + + memcpy(&entry->timeout, timeout, sizeof(entry->timeout)); + entry->callback = callback; + entry->privdata = privdata; + + calc_nextrun(&entry->timeout, &entry->nextrun); + schedule_nextrun(entry); + return 0; +} + +int event_loop(void) +{ + while (1) { + fd_set readfds, *readfds_p = NULL; + fd_set writefds, *writefds_p = NULL; + fd_set exceptfds, *exceptfds_p =NULL; + struct timeval timeout, *timeout_p = NULL; + + if (!list_empty(&readfd_list)) { + struct fd_entry *entry; + + FD_ZERO(&readfds); + list_for_each_entry(entry, &readfd_list, list) + FD_SET(entry->fd, &readfds); + + readfds_p = &readfds; + } + + if (!list_empty(&writefd_list)) { + struct fd_entry *entry; + + FD_ZERO(&writefds); + list_for_each_entry(entry, &writefd_list, list) + FD_SET(entry->fd, &writefds); + + writefds_p = &writefds; + } + + if (!list_empty(&exceptfd_list)) { + struct fd_entry *entry; + + FD_ZERO(&exceptfds); + list_for_each_entry(entry, &exceptfd_list, list) + FD_SET(entry->fd, &exceptfds); + + exceptfds_p = &exceptfds; + } + + if (!list_empty(&timeout_list)) { + struct timeout_entry *entry, *tmp; + list_for_each_entry_safe(entry, tmp, &timeout_list, list) { + + calc_timeout(&timeout, &entry->nextrun); + if (timeout.tv_sec >= 0 && timeout.tv_usec > 0) { + timeout_p = &timeout; + break; + } + + // delayed timeout, exec NOW! + list_del(&entry->list); + int ret = entry->callback(entry->privdata); + if (ret == 0) { + calc_nextrun(&entry->timeout, &entry->nextrun); + schedule_nextrun(entry); + + } else { + free(entry); + } + } + } + + int i = select(FD_SETSIZE, readfds_p, writefds_p, exceptfds_p, timeout_p); + if (i < 0) { + /* On error, -1 is returned, and errno is set + * appropriately; the sets and timeout become + * undefined, so do not rely on their contents + * after an error. + */ + continue; + + } else if (i == 0 && !list_empty(&timeout_list)) { + struct timeout_entry *entry; + entry = list_entry(timeout_list.next, typeof(*entry), list); + list_del(&entry->list); + + int ret = entry->callback(entry->privdata); + if (ret == 0) { + calc_nextrun(&entry->timeout, &entry->nextrun); + schedule_nextrun(entry); + + } else { + free(entry); + } + } + + if (readfds_p) { + struct fd_entry *entry, *tmp; + list_for_each_entry_safe(entry, tmp, &readfd_list, list) { + if (!FD_ISSET(entry->fd, &readfds)) + continue; + + if (entry->callback(entry->fd, entry->privdata) != 0) { + list_del(&entry->list); + free(entry); + } + } + } + + if (writefds_p) { + struct fd_entry *entry, *tmp; + list_for_each_entry_safe(entry, tmp, &writefd_list, list) { + if (FD_ISSET(entry->fd, &writefds)) + continue; + + if (entry->callback(entry->fd, entry->privdata) != 0) { + list_del(&entry->list); + free(entry); + } + } + } + + if (exceptfds_p) { + struct fd_entry *entry, *tmp; + list_for_each_entry_safe(entry, tmp, &exceptfd_list, list) { + if (FD_ISSET(entry->fd, &exceptfds)) + continue; + + if (entry->callback(entry->fd, entry->privdata) != 0) { + list_del(&entry->list); + free(entry); + } + } + } + } +} diff --git a/event.h b/event.h new file mode 100644 index 0000000..a20d1f8 --- /dev/null +++ b/event.h @@ -0,0 +1,26 @@ +#ifndef _EVENT_H_ +#define _EVENT_H_ + +#include + +#define FD_READ 0x01 +#define FD_WRITE 0x02 +#define FD_EXCEPT 0x04 + +#define event_add_readfd(fd, callback, privdata) \ + event_add_fd(fd, FD_READ, callback, privdata) + +#define event_add_writefd(fd, callback, privdata) \ + event_add_fd(fd, FD_WRITE, callback, privdata) + +#define event_add_exceptfd(fd, callback, privdata) \ + event_add_fd(fd, FD_EXCEPT, callback, privdata) + +int event_add_fd(int fd, int type, int (*callback)(int fd, void *privdata), void *privdata); +int event_add_timeout(struct timeval *timeout, int (*callback)(void *privdata), void *privdata); + +int event_remove_fd(int fd); + +int event_loop(void); + +#endif /* _EVENT_H_ */ diff --git a/multicast.c b/multicast.c index c748b8e..227d094 100644 --- a/multicast.c +++ b/multicast.c @@ -96,7 +96,7 @@ int mcast_init() return -1; } - char loop = 1; + char loop = 0; if (setsockopt(sockfd, IPPROTO_IP, IP_MULTICAST_LOOP, &loop, sizeof(loop))) { log_print(LOG_WARN, "mcast_init: setsockopt(IP_MULTICAST_LOOP)"); close(sockfd); diff --git a/selector.c b/selector.c deleted file mode 100644 index d9d45dd..0000000 --- a/selector.c +++ /dev/null @@ -1,109 +0,0 @@ -#include -#include -#include - -#include -#include -#include - -#include "list.h" -#include "logging.h" -#include "multicast.h" -#include "unixsock.h" - -#define BUF_SIZE 256 - -struct fd_entry { - struct list_head list; - int fd; -}; - -static LIST_HEAD(fd_list); - -void selector(int usock, int msock) -{ - fd_set fdsel; - - char *buf = malloc(BUF_SIZE); - if (buf == NULL) { - log_print(LOG_ERROR, "selector: out of memory"); - return; - } - - while (1) { - struct timeval tv; - struct fd_entry *entry, *entry_safe; - - FD_ZERO(&fdsel); - FD_SET(usock, &fdsel); - FD_SET(msock, &fdsel); - - list_for_each_entry(entry, &fd_list, list) { - FD_SET(entry->fd, &fdsel); - } - - tv.tv_sec = 60; - tv.tv_usec = 0; - - int ret = select(FD_SETSIZE, &fdsel, NULL, NULL, &tv); - if (ret < 0) { - log_print(LOG_ERROR, "selector: select()"); - continue; - - } else if (ret == 0) { - mcast_send(msock, "KEEPALIVE", 10); - } - - if (FD_ISSET(msock, &fdsel)) { - int len = read(msock, buf, BUF_SIZE); - if (len <= 0) { - log_print(LOG_ERROR, "selector: multicast sock closed?"); - - } else if (!strncmp(buf, "KEEPALIVE", 10)) { - // nothing - - } else if (!strncmp(buf, "DELETE ", 7)) { - log_print(LOG_DEBUG, "delete '%s'", buf +7); -// delete_file(buf +7); - - } else { - log_print(LOG_DEBUG, "recv unknown cmd via multicast: '%s'", buf); - } - } - - if (FD_ISSET(usock, &fdsel)) { - int usock_con; - - usock_con = accept(usock, NULL, NULL); - if (usock_con != -1) { - entry = malloc(sizeof(struct fd_entry)); - if (entry) { - entry->fd = usock_con; - list_add_tail(&entry->list, &fd_list); - } else { - log_print(LOG_ERROR, "selector: out of memory"); - close(usock_con); - } - - } else { - log_print(LOG_ERROR, "selector: accept()"); - } - } - - list_for_each_entry_safe(entry, entry_safe, &fd_list, list) { - if (FD_ISSET(entry->fd, &fdsel)) { - int len = read(entry->fd, buf, BUF_SIZE); - if (len <= 0) { - list_del(&entry->list); - close(entry->fd); - free(entry); - - } else { - mcast_send(msock, buf, len); - } - } - } - } - msock_close(msock); - sock_close(usock); -} diff --git a/selector.h b/selector.h deleted file mode 100644 index 7cd693e..0000000 --- a/selector.h +++ /dev/null @@ -1,6 +0,0 @@ -#ifndef _SELECTOR_H_ -#define _SELECTOR_H_ - -void selector(int usock, int msock); - -#endif // _SELECTOR_H_