#include #include #include #include #include #include #include #include #include #include "configfile.h" #include "event.h" #include "helper.h" #include "list.h" #include "logging.h" #include "network.h" #include "plugins.h" #include "rrdtool.h" #include "sockaddr.h" #define PKTSIZE 1400 struct net_entry { struct list_head list; struct event_fd *event; struct sockaddr_in addr; int socket; }; static LIST_HEAD(srv_list); static LIST_HEAD(cli_list); static char *fwd_buf; static int fwd_buf_len; void net_submit_flush(void) { if (fwd_buf_len == 0) return; struct net_entry *entry; list_for_each_entry(entry, &cli_list, list) sendto(entry->socket, fwd_buf, fwd_buf_len, 0, (struct sockaddr *)&entry->addr, sizeof(entry->addr)); fwd_buf_len = 0; } int net_submit(const char *hostname, const char *pluginname, const char *filename, int ds_id, const char *data) { if (list_empty(&cli_list)) return 0; int size = snprintf(fwd_buf + fwd_buf_len, PKTSIZE - fwd_buf_len, "%s:%s:%s:%d %s\n", hostname, pluginname, filename, ds_id, data); if (size < 0 || size >= PKTSIZE - fwd_buf_len) { /* the complete buffer is already full */ if (fwd_buf_len == 0) { log_print(LOG_ERROR, "net_submit(): arguments too long"); return -1; } /* flush & retry */ net_submit_flush(); return net_submit(hostname, pluginname, filename, ds_id, data); } fwd_buf_len += size; return 0; } static int net_receive(int socket, void *privdata) { int recvsize; if (ioctl(socket, FIONREAD, &recvsize) == -1) { log_print(LOG_WARN, "net_receive(): ioctl(FIONREAD)"); return 0; } char *buf = malloc(recvsize); if (buf == NULL) { log_print(LOG_WARN, "net_receive(): out of memory"); return 0; } int size = recv(socket, buf, recvsize, 0); if (size <= 0) { log_print(LOG_WARN, "net_receive(): recv()"); free(buf); return 0; } char *delim; int pos = 0; while ((delim = memchr(buf + pos, '\n', size - pos)) != NULL) { *delim = '\0'; char *data[2]; int ret = strsplit(buf + pos, " ", data, 2); pos = (delim - buf) +1; if (ret != 2) { log_print(LOG_ERROR, "net_receive(): abort data-split"); continue; } char *part[4]; ret = strsplit(data[0], ":", part, 4); if (ret != 4) { log_print(LOG_ERROR, "net_receive(): abort header-split"); continue; } rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]); } free(buf); return 0; } static struct net_entry * create_net_entry(const char *value) { struct net_entry *entry = malloc(sizeof(struct net_entry)); if (entry == NULL) return NULL; if (parse_sockaddr(value, &entry->addr) < 0) { free(entry); return NULL; } entry->socket = socket(PF_INET, SOCK_DGRAM, 0); if (entry->socket < 0) { free(entry); return NULL; } return entry; } static int net_init_srv_cb(const char *value, void *privdata) { struct net_entry *entry = create_net_entry(value); if (entry == NULL) { log_print(LOG_ERROR, "net_init_srv_cb(): can not create net_entry"); return -1; } if (bind(entry->socket, (struct sockaddr *)&entry->addr, sizeof(entry->addr)) < 0) { log_print(LOG_ERROR, "net_init_srv_cb(): bind()"); close(entry->socket); free(entry); return -1; } entry->event = event_add_readfd(NULL, entry->socket, net_receive, NULL); list_add(&entry->list, &srv_list); log_print(LOG_INFO, "listen on %s", get_sockaddr_buf(&entry->addr)); return 0; } static int net_init_cli_cb(const char *value, void *privdata) { struct net_entry *entry = create_net_entry(value); if (entry == NULL) { log_print(LOG_ERROR, "net_init_cli_cb(): can not create net_entry"); return -1; } list_add(&entry->list, &cli_list); log_print(LOG_INFO, "forwarding to %s", get_sockaddr_buf(&entry->addr)); return 0; } int net_init(void) { config_get_strings("global", "listen", net_init_srv_cb, NULL); config_get_strings("global", "forward", net_init_cli_cb, NULL); fwd_buf = malloc(PKTSIZE); if (fwd_buf == NULL) { log_print(LOG_ERROR, "net_submit(): out of memory"); return -1; } return 0; } void net_close(void) { struct net_entry *entry, *tmp; list_for_each_entry_safe(entry, tmp, &cli_list, list) { list_del(&entry->list); close(entry->socket); free(entry); } list_for_each_entry_safe(entry, tmp, &srv_list, list) { list_del(&entry->list); event_remove_fd(entry->event); close(entry->socket); free(entry); } fwd_buf_len = 0; if (fwd_buf != NULL) free(fwd_buf); }