socket interface modernisation, client keepalives

git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@30 3ce903b1-3385-4e86-93cd-f9a4a239f7ac
This commit is contained in:
Matti Aarnio 2008-03-01 01:45:12 +00:00
parent 3ae9cabc59
commit c1eb1c8fd2
9 changed files with 169 additions and 62 deletions

View File

@ -51,7 +51,7 @@ struct listen_t {
struct listen_t *next;
struct listen_t **prevp;
struct sockaddr_in sin;
union sockaddr_u sa;
socklen_t addr_len;
int fd;
@ -127,7 +127,7 @@ int open_tcp_listener(struct listen_t *l)
arg = 1;
setsockopt(f, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg));
if (bind(f, (struct sockaddr *)&l->sin, sizeof(l->sin))) {
if (bind(f, (struct sockaddr *)&l->sa, l->addr_len)) {
hlog(LOG_CRIT, "bind(%s): %s", l->addr_s, strerror(errno));
close(f);
return -1;
@ -149,15 +149,16 @@ int open_listeners(void)
struct listen_config_t *lc;
struct listen_t *l;
struct hostent *he;
char eb[80];
char eb[80], *s;
int opened = 0, i;
for (lc = listen_config; (lc); lc = lc->next) {
l = listener_alloc();
l->sin.sin_family = AF_INET;
l->sin.sin_port = htons(lc->port);
l->sa.si.sin_family = AF_INET;
l->sa.si.sin_port = htons(lc->port);
l->addr_len = sizeof(l->sa.si);
if (lc->host) {
if (!(he = gethostbyname(lc->host))) {
h_strerror(h_errno, eb, sizeof(eb));
@ -165,12 +166,16 @@ int open_listeners(void)
listener_free(l);
continue;
}
memcpy(&l->sin.sin_addr.s_addr, he->h_addr_list[0], he->h_length);
memcpy(&l->sa.si.sin_addr.s_addr, he->h_addr_list[0], he->h_length);
} else {
l->sin.sin_addr.s_addr = INADDR_ANY;
l->sa.si.sin_addr.s_addr = INADDR_ANY;
}
aptoa(l->sin.sin_addr, ntohs(l->sin.sin_port), eb, sizeof(eb));
eb[0] = '[';
inet_ntop(l->sa.sa.sa_family, &l->sa.si.sin_addr, eb+1, sizeof(eb)-1);
s = eb + strlen(eb);
sprintf(s, "]:%d", ntohs(((l->sa.sa.sa_family == AF_INET) ? l->sa.si.sin_port : l->sa.si6.sin6_port)));
l->addr_s = hstrdup(eb);
if (open_tcp_listener(l) >= 0) {
@ -185,7 +190,7 @@ int open_listeners(void)
/* Copy filter definitions */
for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
if (i < (sizeof(lc->filters)/sizeof(lc->filters[0])))
l->filters[i] = lc->filters[i];
l->filters[i] = (lc->filters[i]) ? hstrdup(lc->filters[i]) : NULL;
else
l->filters[i] = NULL;
}
@ -224,14 +229,14 @@ void close_listeners(void)
struct client_t *do_accept(struct listen_t *l)
{
char eb[80];
int fd, i;
struct client_t *c;
struct sockaddr_in *sin;
struct sockaddr sa;
union sockaddr_u sa;
socklen_t addr_len = sizeof(sa);
char eb[200];
char *s;
if ((fd = accept(l->fd, &sa, &addr_len)) < 0) {
if ((fd = accept(l->fd, (struct sockaddr*)&sa, &addr_len)) < 0) {
int e = errno;
switch (e) {
/* Errors reporting really bad internal (programming) bugs */
@ -263,9 +268,15 @@ struct client_t *do_accept(struct listen_t *l)
}
c = client_alloc();
sin = (struct sockaddr_in *)&c->addr;
c->fd = fd;
c->addr = sa;
c->state = CSTATE_LOGIN;
eb[0] = '[';
inet_ntop(sa.sa.sa_family, &sa.si.sin_addr, eb+1, sizeof(eb)-1);
s = eb + strlen(eb);
sprintf(s, "]:%d", ntohs((sa.sa.sa_family == AF_INET) ? sa.si.sin_port : sa.si6.sin6_port));
aptoa(sin->sin_addr, ntohs(sin->sin_port), eb, sizeof(eb));
c->addr_s = hstrdup(eb);
hlog(LOG_DEBUG, "%s - Accepted connection on fd %d from %s", l->addr_s, c->fd, eb);
@ -353,8 +364,10 @@ void accept_thread(void *asdf)
close_listeners();
/* start listening on the sockets */
if ((listen_n = open_listeners()) <= 0)
if ((listen_n = open_listeners()) <= 0) {
hlog(LOG_CRIT, "Failed to listen on any ports.");
exit(2);
}
hlog(LOG_DEBUG, "Generating polling list...");
acceptpfd = hmalloc(listen_n * sizeof(*acceptpfd));

View File

@ -234,16 +234,19 @@ int main(int argc, char **argv)
accept_reconfiguring = 1;
}
}
/*
/*
if (now >= next_expiry) {
next_expiry = now + expiry_interval;
// expire
// pbuf_expire();
}
if (now >= next_stats) {
next_stats = now + stats_interval;
// log stats
}
*/
}
hlog(LOG_INFO, "Signalling accept_thread to shut down...");

View File

@ -8,6 +8,7 @@
MyCall N0CALL
MyEmail email@example.com
MyAdmin "My Name, MYCALL"
MyHostname aprsc-1.example.com
### Directories #########
# Data directory (for database files)

View File

@ -52,6 +52,7 @@ char *logname = def_logname; /* syslog entries use this program name */
char *mycall = NULL;
char *myemail = NULL;
char *myadmin = NULL;
char *myhostname = "undefined-hostname";
char *new_mycall = NULL;
char *new_myemail = NULL;
char *new_myadmin = NULL;
@ -93,6 +94,7 @@ static struct cfgcmd cfg_cmds[] = {
{ "logdir", _CFUNC_ do_string, &new_logdir },
{ "mycall", _CFUNC_ do_string, &new_mycall },
{ "myemail", _CFUNC_ do_string, &new_myemail },
{ "myhostname", _CFUNC_ do_string, &myhostname },
{ "myadmin", _CFUNC_ do_string, &new_myadmin },
{ "workerthreads", _CFUNC_ do_int, &workers_configured },
{ "statsinterval", _CFUNC_ do_interval, &stats_interval },
@ -180,7 +182,7 @@ int do_interval(time_t *dest, int argc, char **argv)
/*
* Parse a Listen directive
*
* listen <label> <?> tcp <hostname> <portnum> [<filter> [..<more_filters>]]
* listen <label> <typelabel> tcp <hostname> <portnum> [<filter> [..<more_filters>]]
*
*/
@ -215,7 +217,6 @@ int do_listen(struct listen_config_t **lq, int argc, char **argv)
l->filters[i] = hstrdup(argv[i+6]);
}
}
/* put in the list */
l->next = *lq;

View File

@ -53,6 +53,7 @@ extern int verbose;
extern char *mycall;
extern char *myemail;
extern char *myadmin;
extern char *myhostname;
extern char *cfgfile;
extern char *pidfile;

View File

@ -131,22 +131,22 @@ int login_handler(struct worker_t *self, struct client_t *c, char *s, int len)
}
}
client_printf(self, c, "# logresp %s %s, server %s\r\n",
username,
(c->validated) ? "verified" : "unverified",
mycall
);
client_printf(self, c, "# logresp %s %s, server %s hostname %s\r\n",
username,
(c->validated) ? "verified" : "unverified",
mycall, myhostname
);
hlog(LOG_DEBUG, "%s: login '%s'%s%s%s%s%s%s%s%s",
c->addr_s, username,
(c->validated) ? " pass_ok" : "",
(!c->validated && given_passcode >= 0) ? " pass_invalid" : "",
(given_passcode < 0) ? " pass_none" : "",
(c->udp_port) ? " UDP" : "",
(c->app_name) ? " app " : "",
(c->app_name) ? c->app_name : "",
(c->app_version) ? " ver " : "",
(c->app_version) ? c->app_version : ""
c->addr_s, username,
(c->validated) ? " pass_ok" : "",
(!c->validated && given_passcode >= 0) ? " pass_invalid" : "",
(given_passcode < 0) ? " pass_none" : "",
(c->udp_port) ? " UDP" : "",
(c->app_name) ? " app " : "",
(c->app_name) ? c->app_name : "",
(c->app_version) ? " ver " : "",
(c->app_version) ? c->app_version : ""
);
return 0;

View File

@ -41,6 +41,7 @@
#endif
#include <inttypes.h>
#include "worker.h"
typedef uint32_t spkey_t;
@ -48,8 +49,8 @@ struct spblk {
struct spblk *leftlink;
struct spblk *rightlink;
struct spblk *uplink;
spkey_t key;
void *data;
spkey_t key;
};
struct sptree {

View File

@ -39,10 +39,15 @@
#include "filter.h"
time_t now; /* current time, updated by the main thread */
time_t next_keepalive;
extern int ibuf_size;
struct worker_t *worker_threads = NULL;
int workers_running = 0;
int keepalive_interval = 20; /* 20 seconds by default */
/* global packet buffer */
rwlock_t pbuf_global_rwlock = RWL_INITIALIZER;
struct pbuf_t *pbuf_global = NULL;
@ -57,11 +62,11 @@ struct client_t *client_alloc(void)
memset((void *)c, 0, sizeof(*c));
c->fd = -1;
c->ibuf_size = ibuf_size;
c->ibuf = hmalloc(c->ibuf_size);
c->obuf_size = obuf_size;
c->obuf = hmalloc(c->obuf_size);
c->ibuf_size = 1500;
c->ibuf = hmalloc(c->ibuf_size);
c->obuf_size = obuf_size;
c->obuf = hmalloc(c->obuf_size);
return c;
}
@ -108,7 +113,8 @@ void close_client(struct worker_t *self, struct client_t *c)
close(c->fd);
/* remove from polling list */
xpoll_remove(self->xp, c->xfd);
if (self->xp)
xpoll_remove(self->xp, c->xfd);
/* link the list together over this node */
if (c->next)
@ -135,9 +141,17 @@ int client_write(struct worker_t *self, struct client_t *c, char *p, int len)
*/
if (len > c->obuf_size - (c->obuf_end - c->obuf_start)) {
/* Oh crap, the data will not fit even if we move stuff.
* We'd overflow in any case, so we bail out.
*/
return -1;
Lets try writing.. */
int i = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
if (i > 0) {
c->obuf_start += i;
c->obuf_writes++;
}
/* Is it still out of space ? */
if (len > c->obuf_size - (c->obuf_end - c->obuf_start)) {
/* Oh crap, the data will still not fit! */
return -1;
}
}
/* okay, move stuff to the beginning to make space in the end */
memmove((void *)c->obuf, (void *)c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
@ -149,10 +163,22 @@ int client_write(struct worker_t *self, struct client_t *c, char *p, int len)
memcpy((void *)c->obuf + c->obuf_end, p, len);
c->obuf_end += len;
/* Is it over the flush size ? */
if (c->obuf_end > c->obuf_flushsize) {
int i = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
if (i > 0) {
c->obuf_start += i;
c->obuf_writes++;
}
}
/* All done ? */
if (c->obuf_start >= c->obuf_end) {
c->obuf_start = 0;
c->obuf_end = 0;
return len;
}
/* tell the poller that we have outgoing data */
/* TODO: should try writing right away, if there is a sufficient amount of
* data in obuf!
*/
xpoll_outgoing(self->xp, c->xfd, 1);
return len;
@ -187,6 +213,9 @@ int client_printf(struct worker_t *self, struct client_t *c, const char *fmt, ..
int handle_client_readable(struct worker_t *self, struct client_t *c)
{
int r;
char *s;
char *ibuf_end;
char *row_start;
r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
if (r == 0) {
@ -195,7 +224,11 @@ int handle_client_readable(struct worker_t *self, struct client_t *c)
return -1;
}
if (r < 0) {
if (errno == EINTR || errno == EAGAIN)
return 0; /* D'oh.. return again latter */
hlog(LOG_DEBUG, "read: Error from client fd %d (%s): %s", c->fd, c->addr_s, strerror(errno));
hlog(LOG_DEBUG, " .. ibuf=%p ibuf_end=%d ibuf_size=%d", c->ibuf, c->ibuf_end, c->ibuf_size-c->ibuf_end-1);
close_client(self, c);
return -1;
}
@ -206,9 +239,9 @@ int handle_client_readable(struct worker_t *self, struct client_t *c)
* without the CRLF (we accept either CR or LF or both, but make sure
* to always output CRLF
*/
char *s;
char *ibuf_end = c->ibuf + c->ibuf_end;
char *row_start = c->ibuf;
ibuf_end = c->ibuf + c->ibuf_end;
row_start = c->ibuf;
for (s = c->ibuf; s < ibuf_end; s++) {
if (*s == '\r' || *s == '\n') {
/* found EOL */
@ -246,6 +279,9 @@ int handle_client_writeable(struct worker_t *self, struct client_t *c)
r = write(c->fd, c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
if (r < 0) {
if (errno == EINTR || errno == EAGAIN)
return 0;
hlog(LOG_DEBUG, "write: Error from client fd %d (%s): %s", c->fd, c->addr_s, strerror(errno));
close_client(self, c);
return -1;
@ -325,11 +361,45 @@ void collect_new_clients(struct worker_t *self)
/* add to polling list */
c->xfd = xpoll_add(self->xp, c->fd, (void *)c);
client_printf(self, c, "# %s\r\n", SERVERID);
client_printf(self, c, "# Hello %s -- %s - %s - %s\r\n", c->addr_s, SERVERID, mycall, myhostname);
}
}
/*
* Send keepalives to client sockets
*/
void send_keepalives(struct worker_t *self)
{
struct client_t *c;
struct tm t;
char buf[130], *s;
int len;
sprintf(buf, "# %.40s %.40s ",SERVERID, myhostname);
s = buf + strlen(buf);
gmtime_r(&now, &t);
s += strftime(s, 40, "%Y-%m-%d %T UTC\r\n", &t);
len = (s - buf);
for (c = self->clients; (c); c = c->next) {
int flushlevel = c->obuf_flushsize;
/* Send keepalives only to sockets AFTER
they have completed their login.. */
if (c->state != CSTATE_CONNECTED)
continue;
c->obuf_flushsize = 0;
/* Write out immediately */
client_write(self, c, buf, len);
c->obuf_flushsize = flushlevel;
}
}
/*
* Process outgoing packets, write them to clients
*/
@ -360,6 +430,7 @@ void process_outgoing(struct worker_t *self)
void worker_thread(struct worker_t *self)
{
sigset_t sigs_to_block;
time_t next_keepalive = now + keepalive_interval;
sigemptyset(&sigs_to_block);
sigaddset(&sigs_to_block, SIGALRM);
@ -392,6 +463,12 @@ void worker_thread(struct worker_t *self)
/* if we have new stuff in the global packet buffer, process it */
if (*self->pbuf_global_prevp)
process_outgoing(self);
/* time of next keepalive broadcast ? */
if (next_keepalive < now) {
send_keepalives(self);
next_keepalive += keepalive_interval;
}
}
/* stop polling */
@ -402,10 +479,7 @@ void worker_thread(struct worker_t *self)
while (self->clients)
close_client(self, self->clients);
/* FIXME: should move thread-local pbuf_free_* and
* pbuf_incoming entries to the global pool to avoid
* leaking!
*/
/* workers_stop() will clean up thread-local pbuf pools */
hlog(LOG_DEBUG, "Worker %d shut down.", self->id);
}
@ -441,6 +515,8 @@ void workers_stop(int stop_all)
else
hlog(LOG_INFO, "Worker %d has terminated.", w->id);
for (p = w->pbuf_free_small; p; p = pn) {
pn = p->next;
pbuf_free(p);
@ -472,7 +548,7 @@ void workers_start(void)
struct worker_t **prevp;
workers_stop(0);
while (workers_running < workers_configured) {
hlog(LOG_DEBUG, "Starting a worker thread...");
i = 0;

View File

@ -60,6 +60,8 @@ extern time_t now; /* current time - updated by the main thread */
#define T_TELEMETRY 16
#define F_DUPE 1 /* duplicate of a previously seen packet */
#define F_DUPEKEY 2 /* first of the unique keys.. */
#define F_FASTPOS 4 /* last position packet of given object/source id */
struct client_t; /* forward declarator */
@ -113,12 +115,19 @@ extern struct pbuf_t **pbuf_global_prevp;
struct worker_t; /* used in client_t, but introduced later */
struct filter_t; /* used in client_t, but introduced later */
union sockaddr_u {
struct sockaddr sa;
struct sockaddr_in si;
struct sockaddr_in6 si6;
};
struct client_t {
struct client_t *next;
struct client_t **prevp;
struct sockaddr addr;
union sockaddr_u addr;
int fd;
int udp_port;
char *addr_s;
@ -127,15 +136,17 @@ struct client_t {
/* first stage read buffer - used to crunch out lines to packet buffers */
char *ibuf;
int ibuf_size; /* size of buffer */
int ibuf_end; /* where data in buffer ends */
int ibuf_size; /* size of buffer */
int ibuf_end; /* where data in buffer ends */
/* output buffer */
char *obuf;
int obuf_size; /* size of buffer */
int obuf_start; /* where data in buffer starts */
int obuf_end; /* where data in buffer ends */
int obuf_size; /* size of buffer */
int obuf_start; /* where data in buffer starts */
int obuf_end; /* where data in buffer ends */
int obuf_flushsize; /* how much data in buf before forced write() at adding ? */
int obuf_writes; /* how many times (since last check) the socket has been written ? */
/* state of the client... one of CSTATE_* */
int state;
char *username;