#include #include #include #include #include #include #include #include #include "configfile.h" #include "event.h" #include "helper.h" #include "list.h" #include "logging.h" #include "network.h" #include "plugins.h" #include "rrdtool.h" #define BUFSIZE 8192 struct fwd_addr { struct list_head list; struct sockaddr_in addr; }; static int fwd_sock; static LIST_HEAD(fwd_list); // todo: never freed.. static char *tx_buf, *rx_buf; static int tx_pos; int net_submit(const char *hostname, const char *pluginname, const char *filename, int ds_id, const char *data) { if (list_empty(&fwd_list)) return 0; int size = snprintf(tx_buf + tx_pos, BUFSIZE - tx_pos, "%s:%s:%s:%d %s\n", hostname, pluginname, filename, ds_id, data); if (size < 0 || size >= BUFSIZE - tx_pos) { log_print(LOG_ERROR, "net_submit(): arguments too long"); // TODO: retry with flushed buffer? return -1; } tx_pos += size; return 0; } void net_submit_flush(void) { if (tx_pos == 0) return; struct fwd_addr *entry; list_for_each_entry(entry, &fwd_list, list) sendto(fwd_sock, tx_buf, tx_pos, 0, (struct sockaddr *)&entry->addr, sizeof(entry->addr)); tx_pos = 0; } static int net_receive(int socket, void *privdata) { int size = recv(socket, rx_buf, BUFSIZE, 0); int rx_pos = 0; char *delim; while ((delim = memchr(rx_buf + rx_pos, '\n', size - rx_pos))) { *delim = '\0'; char *data[2]; int ret = strsplit(rx_buf + rx_pos, " ", data, 2); if (ret != 2) { log_print(LOG_ERROR, "net_receive(): abort data-split"); continue; } char *part[4]; ret = strsplit(data[0], ":", part, 4); if (ret != 4) { log_print(LOG_ERROR, "net_receive(): abort header-split"); continue; } rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]); rx_pos = (delim - rx_buf) +1; } return 0; } static int parse_saddr(const char *addr, struct sockaddr_in *sa) { char *addr_cpy = strdup(addr); if (addr_cpy == NULL) { log_print(LOG_WARN, "parse_saddr(): out of memory"); return -1; } char *tmp; char *ip = strtok_r(addr_cpy, ":", &tmp); if (ip == NULL) { free(addr_cpy); return -1; } char *port = strtok_r(NULL, ":", &tmp); if (port == NULL) { free(addr_cpy); return -1; } sa->sin_family = AF_INET; sa->sin_port = htons(atoi(port)); int ret = inet_aton(ip, &sa->sin_addr); free(addr_cpy); return (ret != 0) ? 0 : -1; } static int net_init_srv_cb(const char *value, void *privdata) { struct sockaddr_in addr; if (parse_saddr(value, &addr) == -1) { log_print(LOG_WARN, "invalid listen addr: '%s'", value); return -1; } int sock = socket(PF_INET, SOCK_DGRAM, 0); if (sock < 0) { log_print(LOG_ERROR, "net_init_srv_cb(): socket()"); return -1; } if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) { log_print(LOG_ERROR, "net_init_srv_cb(): bind()"); close(sock); return -1; } event_add_readfd(NULL, sock, net_receive, NULL); log_print(LOG_INFO, "listen on %s:%d", inet_ntoa(addr.sin_addr), ntohs(addr.sin_port)); return 0; } static int net_init_cli_cb(const char *value, void *privdata) { struct fwd_addr *entry = malloc(sizeof(struct fwd_addr)); if (entry == NULL) { log_print(LOG_ERROR, "net_init_cli_cb(): out of memory"); return -1; } if (parse_saddr(value, &entry->addr) == -1) { log_print(LOG_WARN, "invalid listen addr: '%s'", value); free(entry); return -1; } list_add(&entry->list, &fwd_list); log_print(LOG_INFO, "forwarding to %s:%d", inet_ntoa(entry->addr.sin_addr), ntohs(entry->addr.sin_port)); return 0; } int net_init(void) { int srv_cnt = config_get_strings("global", "listen", net_init_srv_cb, NULL); if (srv_cnt > 0) { rx_buf = malloc(BUFSIZE); if (rx_buf == NULL) { log_print(LOG_ERROR, "net_init(): out of memory"); return -1; } } int cli_cnt = config_get_strings("global", "forward", net_init_cli_cb, NULL); if (cli_cnt > 0) { tx_buf = malloc(BUFSIZE); if (tx_buf == NULL) { log_print(LOG_ERROR, "net_init(): out of memory"); return -1; } fwd_sock = socket(PF_INET, SOCK_DGRAM, 0); if (fwd_sock < 0) { log_print(LOG_ERROR, "net_init(): socket()"); return -1; } } return 0; }