cleanup network
This commit is contained in:
parent
3d0063c419
commit
8210d25ec4
216
network.c
216
network.c
@ -2,6 +2,7 @@
|
|||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <sys/ioctl.h>
|
||||||
#include <sys/socket.h>
|
#include <sys/socket.h>
|
||||||
#include <netinet/in.h>
|
#include <netinet/in.h>
|
||||||
#include <netinet/ip.h>
|
#include <netinet/ip.h>
|
||||||
@ -16,77 +17,112 @@
|
|||||||
#include "plugins.h"
|
#include "plugins.h"
|
||||||
#include "rrdtool.h"
|
#include "rrdtool.h"
|
||||||
|
|
||||||
#define BUFSIZE 8192
|
#define BUFSIZE 1024
|
||||||
|
|
||||||
struct fwd_addr {
|
struct net_entry {
|
||||||
struct list_head list;
|
struct list_head list;
|
||||||
|
struct event_fd *event;
|
||||||
struct sockaddr_in addr;
|
struct sockaddr_in addr;
|
||||||
|
int socket;
|
||||||
};
|
};
|
||||||
|
|
||||||
static int fwd_sock;
|
static LIST_HEAD(srv_list);
|
||||||
static LIST_HEAD(fwd_list);
|
static LIST_HEAD(cli_list);
|
||||||
|
|
||||||
// todo: never freed..
|
static char *fwd_buf;
|
||||||
static char *tx_buf, *rx_buf;
|
static int fwd_buf_len;
|
||||||
static int tx_pos;
|
|
||||||
|
|
||||||
int net_submit(const char *hostname, const char *pluginname, const char *filename, int ds_id, const char *data)
|
|
||||||
{
|
|
||||||
if (list_empty(&fwd_list))
|
|
||||||
return 0;
|
|
||||||
|
|
||||||
int size = snprintf(tx_buf + tx_pos, BUFSIZE - tx_pos, "%s:%s:%s:%d %s\n",
|
|
||||||
hostname, pluginname, filename, ds_id, data);
|
|
||||||
|
|
||||||
if (size < 0 || size >= BUFSIZE - tx_pos) {
|
|
||||||
log_print(LOG_ERROR, "net_submit(): arguments too long");
|
|
||||||
// TODO: retry with flushed buffer?
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tx_pos += size;
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void net_submit_flush(void)
|
void net_submit_flush(void)
|
||||||
{
|
{
|
||||||
if (tx_pos == 0)
|
if (fwd_buf_len == 0)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
struct fwd_addr *entry;
|
struct net_entry *entry;
|
||||||
list_for_each_entry(entry, &fwd_list, list)
|
list_for_each_entry(entry, &cli_list, list)
|
||||||
sendto(fwd_sock, tx_buf, tx_pos, 0, (struct sockaddr *)&entry->addr, sizeof(entry->addr));
|
sendto(entry->socket, fwd_buf, fwd_buf_len, 0, (struct sockaddr *)&entry->addr, sizeof(entry->addr));
|
||||||
|
|
||||||
tx_pos = 0;
|
fwd_buf_len = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int net_submit(const char *hostname, const char *pluginname, const char *filename, int ds_id, const char *data)
|
||||||
|
{
|
||||||
|
if (list_empty(&cli_list))
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
if (fwd_buf == NULL) {
|
||||||
|
fwd_buf = malloc(BUFSIZE);
|
||||||
|
if (fwd_buf == NULL) {
|
||||||
|
log_print(LOG_ERROR, "net_submit(): out of memory");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = snprintf(fwd_buf + fwd_buf_len, BUFSIZE - fwd_buf_len, "%s:%s:%s:%d %s\n",
|
||||||
|
hostname, pluginname, filename, ds_id, data);
|
||||||
|
|
||||||
|
if (size < 0 || size >= BUFSIZE - fwd_buf_len) {
|
||||||
|
/* the complete buffer is already full */
|
||||||
|
if (fwd_buf_len == 0) {
|
||||||
|
log_print(LOG_ERROR, "net_submit(): arguments too long");
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
/* flush & retry */
|
||||||
|
net_submit_flush();
|
||||||
|
return net_submit(hostname, pluginname, filename, ds_id, data);
|
||||||
|
}
|
||||||
|
|
||||||
|
fwd_buf_len += size;
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int net_receive(int socket, void *privdata)
|
static int net_receive(int socket, void *privdata)
|
||||||
{
|
{
|
||||||
int size = recv(socket, rx_buf, BUFSIZE, 0);
|
int recvsize;
|
||||||
int rx_pos = 0;
|
if (ioctl(socket, FIONREAD, &recvsize) == -1) {
|
||||||
|
log_print(LOG_WARN, "net_receive(): ioctl(FIONREAD)");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
char *delim;
|
char *buf = malloc(recvsize);
|
||||||
while ((delim = memchr(rx_buf + rx_pos, '\n', size - rx_pos))) {
|
if (buf == NULL) {
|
||||||
|
log_print(LOG_WARN, "net_receive(): out of memory");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
int size = recv(socket, buf, recvsize, 0);
|
||||||
|
if (size <= 0) {
|
||||||
|
log_print(LOG_WARN, "net_receive(): recv()");
|
||||||
|
free(buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
char *delim = memchr(buf, '\n', size);
|
||||||
|
if (delim == NULL) {
|
||||||
|
log_print(LOG_WARN, "net_receive(): invalid data");
|
||||||
|
free(buf);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
*delim = '\0';
|
*delim = '\0';
|
||||||
|
|
||||||
char *data[2];
|
char *data[2];
|
||||||
int ret = strsplit(rx_buf + rx_pos, " ", data, 2);
|
int ret = strsplit(buf, " ", data, 2);
|
||||||
if (ret != 2) {
|
if (ret != 2) {
|
||||||
log_print(LOG_ERROR, "net_receive(): abort data-split");
|
log_print(LOG_ERROR, "net_receive(): abort data-split");
|
||||||
continue;
|
free(buf);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
char *part[4];
|
char *part[4];
|
||||||
ret = strsplit(data[0], ":", part, 4);
|
ret = strsplit(data[0], ":", part, 4);
|
||||||
if (ret != 4) {
|
if (ret != 4) {
|
||||||
log_print(LOG_ERROR, "net_receive(): abort header-split");
|
log_print(LOG_ERROR, "net_receive(): abort header-split");
|
||||||
continue;
|
free(buf);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]);
|
rrd_submit(part[0], part[1], part[2], atoi(part[3]), data[1]);
|
||||||
|
free(buf);
|
||||||
rx_pos = (delim - rx_buf) +1;
|
|
||||||
}
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,49 +155,58 @@ static int parse_saddr(const char *addr, struct sockaddr_in *sa)
|
|||||||
return (ret != 0) ? 0 : -1;
|
return (ret != 0) ? 0 : -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static struct net_entry * create_net_entry(const char *value)
|
||||||
|
{
|
||||||
|
struct net_entry *entry = malloc(sizeof(struct net_entry));
|
||||||
|
if (entry == NULL)
|
||||||
|
return NULL;
|
||||||
|
|
||||||
|
if (parse_saddr(value, &entry->addr) == -1) {
|
||||||
|
free(entry);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
entry->socket = socket(PF_INET, SOCK_DGRAM, 0);
|
||||||
|
if (entry->socket < 0) {
|
||||||
|
free(entry);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
return entry;
|
||||||
|
}
|
||||||
|
|
||||||
static int net_init_srv_cb(const char *value, void *privdata)
|
static int net_init_srv_cb(const char *value, void *privdata)
|
||||||
{
|
{
|
||||||
struct sockaddr_in addr;
|
struct net_entry *entry = create_net_entry(value);
|
||||||
if (parse_saddr(value, &addr) == -1) {
|
if (entry == NULL) {
|
||||||
log_print(LOG_WARN, "invalid listen addr: '%s'", value);
|
log_print(LOG_ERROR, "net_init_srv_cb(): can not create net_entry");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
int sock = socket(PF_INET, SOCK_DGRAM, 0);
|
if (bind(entry->socket, (struct sockaddr *)&entry->addr, sizeof(entry->addr)) < 0) {
|
||||||
if (sock < 0) {
|
|
||||||
log_print(LOG_ERROR, "net_init_srv_cb(): socket()");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (bind(sock, (struct sockaddr *)&addr, sizeof(addr)) < 0) {
|
|
||||||
log_print(LOG_ERROR, "net_init_srv_cb(): bind()");
|
log_print(LOG_ERROR, "net_init_srv_cb(): bind()");
|
||||||
close(sock);
|
close(entry->socket);
|
||||||
|
free(entry);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
event_add_readfd(NULL, sock, net_receive, NULL);
|
entry->event = event_add_readfd(NULL, entry->socket, net_receive, NULL);
|
||||||
log_print(LOG_INFO, "listen on %s:%d",
|
log_print(LOG_INFO, "listen on %s:%d",
|
||||||
inet_ntoa(addr.sin_addr),
|
inet_ntoa(entry->addr.sin_addr),
|
||||||
ntohs(addr.sin_port));
|
ntohs(entry->addr.sin_port));
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int net_init_cli_cb(const char *value, void *privdata)
|
static int net_init_cli_cb(const char *value, void *privdata)
|
||||||
{
|
{
|
||||||
struct fwd_addr *entry = malloc(sizeof(struct fwd_addr));
|
struct net_entry *entry = create_net_entry(value);
|
||||||
if (entry == NULL) {
|
if (entry == NULL) {
|
||||||
log_print(LOG_ERROR, "net_init_cli_cb(): out of memory");
|
log_print(LOG_ERROR, "net_init_cli_cb(): can not create net_entry");
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (parse_saddr(value, &entry->addr) == -1) {
|
list_add(&entry->list, &cli_list);
|
||||||
log_print(LOG_WARN, "invalid listen addr: '%s'", value);
|
|
||||||
free(entry);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
list_add(&entry->list, &fwd_list);
|
|
||||||
log_print(LOG_INFO, "forwarding to %s:%d",
|
log_print(LOG_INFO, "forwarding to %s:%d",
|
||||||
inet_ntoa(entry->addr.sin_addr),
|
inet_ntoa(entry->addr.sin_addr),
|
||||||
ntohs(entry->addr.sin_port));
|
ntohs(entry->addr.sin_port));
|
||||||
@ -171,29 +216,26 @@ static int net_init_cli_cb(const char *value, void *privdata)
|
|||||||
|
|
||||||
int net_init(void)
|
int net_init(void)
|
||||||
{
|
{
|
||||||
int srv_cnt = config_get_strings("global", "listen", net_init_srv_cb, NULL);
|
config_get_strings("global", "listen", net_init_srv_cb, NULL);
|
||||||
if (srv_cnt > 0) {
|
config_get_strings("global", "forward", net_init_cli_cb, NULL);
|
||||||
rx_buf = malloc(BUFSIZE);
|
|
||||||
if (rx_buf == NULL) {
|
|
||||||
log_print(LOG_ERROR, "net_init(): out of memory");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int cli_cnt = config_get_strings("global", "forward", net_init_cli_cb, NULL);
|
|
||||||
if (cli_cnt > 0) {
|
|
||||||
tx_buf = malloc(BUFSIZE);
|
|
||||||
if (tx_buf == NULL) {
|
|
||||||
log_print(LOG_ERROR, "net_init(): out of memory");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
fwd_sock = socket(PF_INET, SOCK_DGRAM, 0);
|
|
||||||
if (fwd_sock < 0) {
|
|
||||||
log_print(LOG_ERROR, "net_init(): socket()");
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void net_close(void) {
|
||||||
|
struct net_entry *entry, *tmp;
|
||||||
|
list_for_each_entry_safe(entry, tmp, &cli_list, list) {
|
||||||
|
list_del(&entry->list);
|
||||||
|
close(entry->socket);
|
||||||
|
free(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
list_for_each_entry_safe(entry, tmp, &srv_list, list) {
|
||||||
|
list_del(&entry->list);
|
||||||
|
event_remove_fd(entry->event);
|
||||||
|
close(entry->socket);
|
||||||
|
free(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
if (fwd_buf != NULL)
|
||||||
|
free(fwd_buf);
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user