diff --git a/network.c b/network.c index d751e38..7873643 100644 --- a/network.c +++ b/network.c @@ -15,40 +15,64 @@ #include "plugins.h" #include "rrdtool.h" -#define BUFSIZE 1024 +#define BUFSIZE 8192 static struct sockaddr_in fwd_sa; static int fwd_sock; // todo: never freed.. static char *tx_buf, *rx_buf; +static int tx_pos; int net_submit(char *hostname, char *plugin, char *filename, int ds_id, char *data) { - int size = snprintf(tx_buf, BUFSIZE, "%s:%s:%s:%d %s", hostname, plugin, filename, ds_id, data); - if (size < 0 || size >= BUFSIZE) { + int size = snprintf(tx_buf + tx_pos, BUFSIZE - tx_pos, "%s:%s:%s:%d %s\n", + hostname, plugin, filename, ds_id, data); + + if (size < 0 || size >= BUFSIZE - tx_pos) { log_print(LOG_ERROR, "net_submit(): arguments too long"); + // TODO: retry with flushed buffer? return -1; } - sendto(fwd_sock, tx_buf, size +1, 0, (struct sockaddr *)&fwd_sa, sizeof(fwd_sa)); + tx_pos += size; + return 0; +} + +int net_submit_flush() +{ + if (tx_pos != 0) + sendto(fwd_sock, tx_buf, tx_pos, 0, (struct sockaddr *)&fwd_sa, sizeof(fwd_sa)); + + tx_pos = 0; return 0; } int net_receive(int socket) { - recv(socket, rx_buf, BUFSIZE, 0); + int size = recv(socket, rx_buf, BUFSIZE, 0); + int rx_pos = 0; - char *data[2], *part[4]; - int ret = strsplit(rx_buf, " ", data, 2); - if (ret != 2) - return -1; + char *delim; + while ((delim = memchr(rx_buf + rx_pos, '\n', size - rx_pos))) { + *delim = '\0'; - ret = strsplit(data[0], ":", part, 4); - if (ret != 4) - return -1; + char *data[2], *part[4]; + int ret = strsplit(rx_buf + rx_pos, " ", data, 2); + if (ret != 2) { + log_print(LOG_ERROR, "net_receive() abort data-split"); + continue; + } - rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]); + ret = strsplit(data[0], ":", part, 4); + if (ret != 4) { + log_print(LOG_ERROR, "net_receive() abort header-split"); + continue; + } + rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]); + + rx_pos = (delim - rx_buf) +1; + } return 0; } @@ -64,6 +88,8 @@ int net_init_cli() return -1; } + tx_pos = 0; + fwd_sock = socket(PF_INET, SOCK_DGRAM, 0); if (fwd_sock < 0) { log_print(LOG_ERROR, "net_init_cli(): socket()"); diff --git a/network.h b/network.h index 9b098cd..cec9130 100644 --- a/network.h +++ b/network.h @@ -8,5 +8,6 @@ int net_init_cli(); int net_receive(int sock); int net_submit(char *hostname, char *plugin, char *filename, int ds_id, char *data); +int net_submit_flush(); #endif /* _NETWORK_H_ */ diff --git a/p_mysql.c b/p_mysql.c index ad7530d..98a04bf 100644 --- a/p_mysql.c +++ b/p_mysql.c @@ -36,7 +36,7 @@ struct server_entry { struct list_head list; void *mysql; - char name[0]; + char *name; }; static LIST_HEAD(server_list); @@ -175,15 +175,14 @@ static int init(void) if (mysql == NULL) continue; - int len = strlen(part[0]); - struct server_entry *entry = malloc(sizeof(struct server_entry) + len +1); + struct server_entry *entry = malloc(sizeof(struct server_entry)); if (entry == NULL) { log_print(LOG_ERROR, "p_mysql: out of memory"); close_connection(mysql); continue; } - strcpy(entry->name, part[0]); + entry->name = part[0]; entry->mysql = mysql; log_print(LOG_DEBUG, "p_mysql: added server '%s'", entry->name); diff --git a/plugins.c b/plugins.c index 76cef51..ff8f4e6 100644 --- a/plugins.c +++ b/plugins.c @@ -121,6 +121,9 @@ void plugins_probe(void) plugin->lastprobe = now; } } + + if (plugin_flags & PLUGIN_NET) + net_submit_flush(); } struct sammler_plugin * plugin_lookup(char *name) diff --git a/sammler.conf b/sammler.conf index d9afcc2..25cef2a 100644 --- a/sammler.conf +++ b/sammler.conf @@ -1,9 +1,9 @@ [global] hostname localhost -#listen 127.0.0.1:5000 -#forward 127.0.0.1:5000 -#forward_only true +listen 127.0.0.1:5000 +forward 127.0.0.1:5000 +forward_only true logfile sammler.log