use single net pkts
This commit is contained in:
parent
135d209392
commit
bc0bd5fffb
48
network.c
48
network.c
@ -15,40 +15,64 @@
|
|||||||
#include "plugins.h"
|
#include "plugins.h"
|
||||||
#include "rrdtool.h"
|
#include "rrdtool.h"
|
||||||
|
|
||||||
#define BUFSIZE 1024
|
#define BUFSIZE 8192
|
||||||
|
|
||||||
static struct sockaddr_in fwd_sa;
|
static struct sockaddr_in fwd_sa;
|
||||||
static int fwd_sock;
|
static int fwd_sock;
|
||||||
|
|
||||||
// todo: never freed..
|
// todo: never freed..
|
||||||
static char *tx_buf, *rx_buf;
|
static char *tx_buf, *rx_buf;
|
||||||
|
static int tx_pos;
|
||||||
|
|
||||||
int net_submit(char *hostname, char *plugin, char *filename, int ds_id, char *data)
|
int net_submit(char *hostname, char *plugin, char *filename, int ds_id, char *data)
|
||||||
{
|
{
|
||||||
int size = snprintf(tx_buf, BUFSIZE, "%s:%s:%s:%d %s", hostname, plugin, filename, ds_id, data);
|
int size = snprintf(tx_buf + tx_pos, BUFSIZE - tx_pos, "%s:%s:%s:%d %s\n",
|
||||||
if (size < 0 || size >= BUFSIZE) {
|
hostname, plugin, filename, ds_id, data);
|
||||||
|
|
||||||
|
if (size < 0 || size >= BUFSIZE - tx_pos) {
|
||||||
log_print(LOG_ERROR, "net_submit(): arguments too long");
|
log_print(LOG_ERROR, "net_submit(): arguments too long");
|
||||||
|
// TODO: retry with flushed buffer?
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
sendto(fwd_sock, tx_buf, size +1, 0, (struct sockaddr *)&fwd_sa, sizeof(fwd_sa));
|
tx_pos += size;
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int net_submit_flush()
|
||||||
|
{
|
||||||
|
if (tx_pos != 0)
|
||||||
|
sendto(fwd_sock, tx_buf, tx_pos, 0, (struct sockaddr *)&fwd_sa, sizeof(fwd_sa));
|
||||||
|
|
||||||
|
tx_pos = 0;
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int net_receive(int socket)
|
int net_receive(int socket)
|
||||||
{
|
{
|
||||||
recv(socket, rx_buf, BUFSIZE, 0);
|
int size = recv(socket, rx_buf, BUFSIZE, 0);
|
||||||
|
int rx_pos = 0;
|
||||||
|
|
||||||
|
char *delim;
|
||||||
|
while ((delim = memchr(rx_buf + rx_pos, '\n', size - rx_pos))) {
|
||||||
|
*delim = '\0';
|
||||||
|
|
||||||
char *data[2], *part[4];
|
char *data[2], *part[4];
|
||||||
int ret = strsplit(rx_buf, " ", data, 2);
|
int ret = strsplit(rx_buf + rx_pos, " ", data, 2);
|
||||||
if (ret != 2)
|
if (ret != 2) {
|
||||||
return -1;
|
log_print(LOG_ERROR, "net_receive() abort data-split");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
ret = strsplit(data[0], ":", part, 4);
|
ret = strsplit(data[0], ":", part, 4);
|
||||||
if (ret != 4)
|
if (ret != 4) {
|
||||||
return -1;
|
log_print(LOG_ERROR, "net_receive() abort header-split");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]);
|
rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]);
|
||||||
|
|
||||||
|
rx_pos = (delim - rx_buf) +1;
|
||||||
|
}
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -64,6 +88,8 @@ int net_init_cli()
|
|||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
tx_pos = 0;
|
||||||
|
|
||||||
fwd_sock = socket(PF_INET, SOCK_DGRAM, 0);
|
fwd_sock = socket(PF_INET, SOCK_DGRAM, 0);
|
||||||
if (fwd_sock < 0) {
|
if (fwd_sock < 0) {
|
||||||
log_print(LOG_ERROR, "net_init_cli(): socket()");
|
log_print(LOG_ERROR, "net_init_cli(): socket()");
|
||||||
|
@ -8,5 +8,6 @@ int net_init_cli();
|
|||||||
int net_receive(int sock);
|
int net_receive(int sock);
|
||||||
|
|
||||||
int net_submit(char *hostname, char *plugin, char *filename, int ds_id, char *data);
|
int net_submit(char *hostname, char *plugin, char *filename, int ds_id, char *data);
|
||||||
|
int net_submit_flush();
|
||||||
|
|
||||||
#endif /* _NETWORK_H_ */
|
#endif /* _NETWORK_H_ */
|
||||||
|
@ -36,7 +36,7 @@
|
|||||||
struct server_entry {
|
struct server_entry {
|
||||||
struct list_head list;
|
struct list_head list;
|
||||||
void *mysql;
|
void *mysql;
|
||||||
char name[0];
|
char *name;
|
||||||
};
|
};
|
||||||
|
|
||||||
static LIST_HEAD(server_list);
|
static LIST_HEAD(server_list);
|
||||||
@ -175,15 +175,14 @@ static int init(void)
|
|||||||
if (mysql == NULL)
|
if (mysql == NULL)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
int len = strlen(part[0]);
|
struct server_entry *entry = malloc(sizeof(struct server_entry));
|
||||||
struct server_entry *entry = malloc(sizeof(struct server_entry) + len +1);
|
|
||||||
if (entry == NULL) {
|
if (entry == NULL) {
|
||||||
log_print(LOG_ERROR, "p_mysql: out of memory");
|
log_print(LOG_ERROR, "p_mysql: out of memory");
|
||||||
close_connection(mysql);
|
close_connection(mysql);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
strcpy(entry->name, part[0]);
|
entry->name = part[0];
|
||||||
entry->mysql = mysql;
|
entry->mysql = mysql;
|
||||||
|
|
||||||
log_print(LOG_DEBUG, "p_mysql: added server '%s'", entry->name);
|
log_print(LOG_DEBUG, "p_mysql: added server '%s'", entry->name);
|
||||||
|
@ -121,6 +121,9 @@ void plugins_probe(void)
|
|||||||
plugin->lastprobe = now;
|
plugin->lastprobe = now;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (plugin_flags & PLUGIN_NET)
|
||||||
|
net_submit_flush();
|
||||||
}
|
}
|
||||||
|
|
||||||
struct sammler_plugin * plugin_lookup(char *name)
|
struct sammler_plugin * plugin_lookup(char *name)
|
||||||
|
@ -1,9 +1,9 @@
|
|||||||
[global]
|
[global]
|
||||||
hostname localhost
|
hostname localhost
|
||||||
|
|
||||||
#listen 127.0.0.1:5000
|
listen 127.0.0.1:5000
|
||||||
#forward 127.0.0.1:5000
|
forward 127.0.0.1:5000
|
||||||
#forward_only true
|
forward_only true
|
||||||
|
|
||||||
logfile sammler.log
|
logfile sammler.log
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user