diff --git a/network.c b/network.c index 512ab16..6d71762 100644 --- a/network.c +++ b/network.c @@ -2,6 +2,7 @@ #include #include #include +#include #include #include #include @@ -16,77 +17,112 @@ #include "plugins.h" #include "rrdtool.h" -#define BUFSIZE 8192 +#define BUFSIZE 1024 -struct fwd_addr { +struct net_entry { struct list_head list; + struct event_fd *event; struct sockaddr_in addr; + int socket; }; -static int fwd_sock; -static LIST_HEAD(fwd_list); +static LIST_HEAD(srv_list); +static LIST_HEAD(cli_list); -// todo: never freed.. -static char *tx_buf, *rx_buf; -static int tx_pos; - -int net_submit(const char *hostname, const char *pluginname, const char *filename, int ds_id, const char *data) -{ - if (list_empty(&fwd_list)) - return 0; - - int size = snprintf(tx_buf + tx_pos, BUFSIZE - tx_pos, "%s:%s:%s:%d %s\n", - hostname, pluginname, filename, ds_id, data); - - if (size < 0 || size >= BUFSIZE - tx_pos) { - log_print(LOG_ERROR, "net_submit(): arguments too long"); - // TODO: retry with flushed buffer? - return -1; - } - - tx_pos += size; - return 0; -} +static char *fwd_buf; +static int fwd_buf_len; void net_submit_flush(void) { - if (tx_pos == 0) + if (fwd_buf_len == 0) return; - struct fwd_addr *entry; - list_for_each_entry(entry, &fwd_list, list) - sendto(fwd_sock, tx_buf, tx_pos, 0, (struct sockaddr *)&entry->addr, sizeof(entry->addr)); + struct net_entry *entry; + list_for_each_entry(entry, &cli_list, list) + sendto(entry->socket, fwd_buf, fwd_buf_len, 0, (struct sockaddr *)&entry->addr, sizeof(entry->addr)); - tx_pos = 0; + fwd_buf_len = 0; +} + +int net_submit(const char *hostname, const char *pluginname, const char *filename, int ds_id, const char *data) +{ + if (list_empty(&cli_list)) + return 0; + + if (fwd_buf == NULL) { + fwd_buf = malloc(BUFSIZE); + if (fwd_buf == NULL) { + log_print(LOG_ERROR, "net_submit(): out of memory"); + return -1; + } + } + + int size = snprintf(fwd_buf + fwd_buf_len, BUFSIZE - fwd_buf_len, "%s:%s:%s:%d %s\n", + hostname, pluginname, filename, ds_id, data); + + if (size < 0 || size >= BUFSIZE - fwd_buf_len) { + /* the complete buffer is already full */ + if (fwd_buf_len == 0) { + log_print(LOG_ERROR, "net_submit(): arguments too long"); + return -1; + } + + /* flush & retry */ + net_submit_flush(); + return net_submit(hostname, pluginname, filename, ds_id, data); + } + + fwd_buf_len += size; + return 0; } static int net_receive(int socket, void *privdata) { - int size = recv(socket, rx_buf, BUFSIZE, 0); - int rx_pos = 0; - - char *delim; - while ((delim = memchr(rx_buf + rx_pos, '\n', size - rx_pos))) { - *delim = '\0'; - - char *data[2]; - int ret = strsplit(rx_buf + rx_pos, " ", data, 2); - if (ret != 2) { - log_print(LOG_ERROR, "net_receive(): abort data-split"); - continue; - } - - char *part[4]; - ret = strsplit(data[0], ":", part, 4); - if (ret != 4) { - log_print(LOG_ERROR, "net_receive(): abort header-split"); - continue; - } - - rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]); - - rx_pos = (delim - rx_buf) +1; + int recvsize; + if (ioctl(socket, FIONREAD, &recvsize) == -1) { + log_print(LOG_WARN, "net_receive(): ioctl(FIONREAD)"); + return 0; } + + char *buf = malloc(recvsize); + if (buf == NULL) { + log_print(LOG_WARN, "net_receive(): out of memory"); + return 0; + } + + int size = recv(socket, buf, recvsize, 0); + if (size <= 0) { + log_print(LOG_WARN, "net_receive(): recv()"); + free(buf); + return 0; + } + + char *delim = memchr(buf, '\n', size); + if (delim == NULL) { + log_print(LOG_WARN, "net_receive(): invalid data"); + free(buf); + return 0; + } + *delim = '\0'; + + char *data[2]; + int ret = strsplit(buf, " ", data, 2); + if (ret != 2) { + log_print(LOG_ERROR, "net_receive(): abort data-split"); + free(buf); + return 0; + } + + char *part[4]; + ret = strsplit(data[0], ":", part, 4); + if (ret != 4) { + log_print(LOG_ERROR, "net_receive(): abort header-split"); + free(buf); + return 0; + } + + rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]); + free(buf); return 0; } @@ -119,49 +155,58 @@ static int parse_saddr(const char *addr, struct sockaddr_in *sa) return (ret != 0) ? 0 : -1; } +static struct net_entry * create_net_entry(const char *value) +{ + struct net_entry *entry = malloc(sizeof(struct net_entry)); + if (entry == NULL) + return NULL; + + if (parse_saddr(value, &entry->addr) == -1) { + free(entry); + return NULL; + } + + entry->socket = socket(PF_INET, SOCK_DGRAM, 0); + if (entry->socket < 0) { + free(entry); + return NULL; + } + + return entry; +} + static int net_init_srv_cb(const char *value, void *privdata) { - struct sockaddr_in addr; - if (parse_saddr(value, &addr) == -1) { - log_print(LOG_WARN, "invalid listen addr: '%s'", value); + struct net_entry *entry = create_net_entry(value); + if (entry == NULL) { + log_print(LOG_ERROR, "net_init_srv_cb(): can not create net_entry"); return -1; } - int sock = socket(PF_INET, SOCK_DGRAM, 0); - if (sock < 0) { - log_print(LOG_ERROR, "net_init_srv_cb(): socket()"); - return -1; - } - - if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { + if (bind(entry->socket, (struct sockaddr *)&entry->addr, sizeof(entry->addr)) < 0) { log_print(LOG_ERROR, "net_init_srv_cb(): bind()"); - close(sock); + close(entry->socket); + free(entry); return -1; } - event_add_readfd(NULL, sock, net_receive, NULL); + entry->event = event_add_readfd(NULL, entry->socket, net_receive, NULL); log_print(LOG_INFO, "listen on %s:%d", - inet_ntoa(addr.sin_addr), - ntohs(addr.sin_port)); + inet_ntoa(entry->addr.sin_addr), + ntohs(entry->addr.sin_port)); return 0; } static int net_init_cli_cb(const char *value, void *privdata) { - struct fwd_addr *entry = malloc(sizeof(struct fwd_addr)); + struct net_entry *entry = create_net_entry(value); if (entry == NULL) { - log_print(LOG_ERROR, "net_init_cli_cb(): out of memory"); + log_print(LOG_ERROR, "net_init_cli_cb(): can not create net_entry"); return -1; } - if (parse_saddr(value, &entry->addr) == -1) { - log_print(LOG_WARN, "invalid listen addr: '%s'", value); - free(entry); - return -1; - } - - list_add(&entry->list, &fwd_list); + list_add(&entry->list, &cli_list); log_print(LOG_INFO, "forwarding to %s:%d", inet_ntoa(entry->addr.sin_addr), ntohs(entry->addr.sin_port)); @@ -171,29 +216,26 @@ static int net_init_cli_cb(const char *value, void *privdata) int net_init(void) { - int srv_cnt = config_get_strings("global", "listen", net_init_srv_cb, NULL); - if (srv_cnt > 0) { - rx_buf = malloc(BUFSIZE); - if (rx_buf == NULL) { - log_print(LOG_ERROR, "net_init(): out of memory"); - return -1; - } - } - - int cli_cnt = config_get_strings("global", "forward", net_init_cli_cb, NULL); - if (cli_cnt > 0) { - tx_buf = malloc(BUFSIZE); - if (tx_buf == NULL) { - log_print(LOG_ERROR, "net_init(): out of memory"); - return -1; - } - - fwd_sock = socket(PF_INET, SOCK_DGRAM, 0); - if (fwd_sock < 0) { - log_print(LOG_ERROR, "net_init(): socket()"); - return -1; - } - } - + config_get_strings("global", "listen", net_init_srv_cb, NULL); + config_get_strings("global", "forward", net_init_cli_cb, NULL); return 0; } + +void net_close(void) { + struct net_entry *entry, *tmp; + list_for_each_entry_safe(entry, tmp, &cli_list, list) { + list_del(&entry->list); + close(entry->socket); + free(entry); + } + + list_for_each_entry_safe(entry, tmp, &srv_list, list) { + list_del(&entry->list); + event_remove_fd(entry->event); + close(entry->socket); + free(entry); + } + + if (fwd_buf != NULL) + free(fwd_buf); +}