From a207b3e1e6c0ca8041de6c1d54b1a45aa1355596 Mon Sep 17 00:00:00 2001 From: Heikki Hannikainen Date: Sat, 12 May 2012 11:19:53 +0000 Subject: [PATCH] + implemented 'multiro' uplinks: parallel read-only links to multiple hubs, to be used for aprs.fi uplink aggregation + fixed passing packets from uplinks to clients git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@337 3ce903b1-3385-4e86-93cd-f9a4a239f7ac --- src/config.c | 39 +++++- src/config.h | 7 ++ src/dupecheck.c | 5 +- src/incoming.c | 26 ++-- src/uplink.c | 202 ++++++++++++++++++++++-------- src/worker.c | 10 +- src/worker.h | 2 + tests/Makefile | 12 ++ tests/aggregator/10aggr_join.t | 103 +++++++++++++++ tests/cfg-aprsc/aggregator | 73 +++++++++++ tests/cfg-aprsc/uplinks | 73 +++++++++++ tests/libperl/Ham/APRS/IS.pm | 17 +++ tests/libperl/Ham/APRS/IS_Fake.pm | 92 ++++++++++++++ tests/libperl/istest.pm | 6 + tests/libperl/runproduct.pm | 2 +- 15 files changed, 592 insertions(+), 77 deletions(-) create mode 100644 tests/aggregator/10aggr_join.t create mode 100644 tests/cfg-aprsc/aggregator create mode 100644 tests/cfg-aprsc/uplinks create mode 100644 tests/libperl/Ham/APRS/IS_Fake.pm diff --git a/src/config.c b/src/config.c index 54684b9..ee2eb0d 100644 --- a/src/config.c +++ b/src/config.c @@ -324,9 +324,16 @@ int do_uplink(struct uplink_config_t **lq, int argc, char **argv) /* argv[1] is name label for this uplink */ - if (strcasecmp(argv[2],"ro")==0) { - clflags |= CLFLAGS_PORT_RO; - } // FIXME: other tokens ?? + if (strcasecmp(argv[2], "ro")==0) { + clflags |= CLFLAGS_PORT_RO; + } else if (strcasecmp(argv[2], "multiro")==0) { + clflags |= CLFLAGS_PORT_RO|CLFLAGS_UPLINKMULTI; + } else if (strcasecmp(argv[2], "full") == 0) { + /* regular */ + } else { + hlog(LOG_ERR, "Uplink: Unsupported uplink type '%s'\n", argv[2]); + return -2; + } memset(&req, 0, sizeof(req)); req.ai_family = 0; @@ -372,6 +379,7 @@ int do_uplink(struct uplink_config_t **lq, int argc, char **argv) l->host = hstrdup(argv[4]); l->port = hstrdup(argv[5]); l->client_flags = clflags; + l->state = UPLINK_ST_UNKNOWN; for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) { l->filters[i] = NULL; @@ -598,6 +606,31 @@ int read_config(void) failed = 1; } + /* validate uplink config: if there is a single 'multiro' connection + * configured, all of the uplinks must be 'multiro' + */ + int uplink_config_failed = 0; + int got_multiro = 0; + int got_non_multiro = 0; + struct uplink_config_t *up; + for (up = new_uplink_config; (up); up = up->next) { + if (up->client_flags & CLFLAGS_UPLINKMULTI) + got_multiro = 1; + else + got_non_multiro = 1; + if ((up->client_flags & CLFLAGS_UPLINKMULTI) && !(up->client_flags & CLFLAGS_PORT_RO)) { + uplink_config_failed = 1; + hlog(LOG_WARNING, "Config: uplink with non-RO MULTI uplink - would cause a loop, not allowed."); + } + } + if ((got_multiro) && (got_non_multiro)) { + hlog(LOG_WARNING, "Config: Configured both multiro and non-multiro uplinks - would cause a loop, not allowed."); + failed = 1; + free_uplink_config(&new_uplink_config); + } + if (uplink_config_failed) + free_uplink_config(&new_uplink_config); + if (new_fileno_limit > 0 && new_fileno_limit != fileno_limit) { /* Adjust process global fileno limit */ int e; diff --git a/src/config.h b/src/config.h index 4e20b18..1220138 100644 --- a/src/config.h +++ b/src/config.h @@ -107,8 +107,15 @@ struct uplink_config_t { const char *filters[10]; /* up to 10 filters, NULL when not defined */ int client_flags; + int state; /* the state of the uplink */ }; +#define UPLINK_ST_UNKNOWN -1 +#define UPLINK_ST_NOT_LINKED 0 +#define UPLINK_ST_CONNECTING 1 +#define UPLINK_ST_CONNECTED 2 +#define UPLINK_ST_LINKED 3 + extern struct listen_config_t *listen_config; extern struct peerip_config_t *peerip_config; extern struct uplink_config_t *uplink_config; diff --git a/src/dupecheck.c b/src/dupecheck.c index bc8e85f..ea8ce11 100644 --- a/src/dupecheck.c +++ b/src/dupecheck.c @@ -486,8 +486,8 @@ static void dupecheck_thread(void) w->pbuf_incoming_count = 0; pthread_mutex_unlock(&w->pbuf_incoming_mutex); - // hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d", - // c, w->id, dupecheck_seqnum); + hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d", + c, w->id, dupecheck_seqnum); for (pb = pb_list; (pb); pb = pbnext) { int rc = dupecheck(pb); @@ -505,6 +505,7 @@ static void dupecheck_thread(void) pb_out_dupe_last = pb; pb->seqnum = ++dupecheck_dupe_seqnum; ++pb_out_dupe_count; + //hlog(LOG_DEBUG, "is duplicate"); } n++; } diff --git a/src/incoming.c b/src/incoming.c index 9b69ede..a126e98 100644 --- a/src/incoming.c +++ b/src/incoming.c @@ -434,23 +434,23 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) /* look for the '>' */ src_end = memchr(s, '>', pathlen < CALLSIGNLEN_MAX+1 ? pathlen : CALLSIGNLEN_MAX+1); if (!src_end) - return -1; // No ">" in packet start.. + return -2; // No ">" in packet start.. path_start = src_end+1; if (path_start >= packet_end) - return -1; + return -3; if (src_end - s > CALLSIGNLEN_MAX || src_end - s < CALLSIGNLEN_MIN) - return -1; /* too long source callsign */ + return -4; /* too long source callsign */ info_start = path_end+1; // @":"+1 - first char of the payload if (info_start >= packet_end) - return -1; + return -5; /* see that there is at least some data in the packet */ info_end = packet_end; if (info_end <= info_start) - return -1; + return -6; /* look up end of dstcall (excluding SSID - this is the way dupecheck and * mic-e parser wants it) @@ -464,7 +464,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) dstcall_end++; if (dstcall_end - path_start > CALLSIGNLEN_MAX) - return -1; /* too long for destination callsign */ + return -7; /* too long for destination callsign */ /* where does the digipeater path start? */ via_start = dstcall_end; @@ -478,8 +478,8 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) /* if disallow_unverified is anabled, don't allow unverified clients * to send packets where srccall != login */ - if (!c->validated && !originated_by_client && disallow_unverified) - return -1; + if (!c->validated && !originated_by_client && disallow_unverified && !(c->flags & CLFLAGS_UPLINKPORT)) + return -8; /* process Q construct, path_append_len of path_append will be copied * to the end of the path later @@ -499,7 +499,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) if (!pb) { // This should never happen... hlog(LOG_INFO, "pbuf_get failed to get a block"); - return -1; // No room :-( + return -9; // No room :-( } pb->next = NULL; // pbuf arrives pre-zeroed @@ -530,7 +530,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) pb->qconst_start = pb->data + (q_start - s); } else { fprintf(stderr, "q construct bug: did not find a good construct or produce a new one for:\n%s\n", s); - return -1; + return -10; } /* Copy the modified or appended part of the packet header -- qcons */ @@ -560,7 +560,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) pb->dstcall_len = via_start - src_end - 1; pb->info_start = info_start; - // hlog(LOG_DEBUG, "After parsing and Qc algorithm: %.*s", pb->packet_len-2, pb->data); + //hlog(LOG_DEBUG, "After parsing and Qc algorithm: %.*s", pb->packet_len-2, pb->data); /* just try APRS parsing */ rc = parse_aprs(self, pb); @@ -628,9 +628,9 @@ int incoming_uplinksim_handler(struct worker_t *self, struct client_t *c, char * /* failed parsing */ if (e == -42) - hlog(LOG_DEBUG, "Packet too short (%d): %.*s", len, len, s); + hlog(LOG_DEBUG, "Uplinksim: Packet too short (%d): %.*s", len, len, s); else - hlog(LOG_DEBUG, "Failed parsing (%d): %.*s",e,len,s); + hlog(LOG_DEBUG, "Uplinksim: Failed parsing (%d): %.*s",e,len,s); } return 0; diff --git a/src/uplink.c b/src/uplink.c index 3507535..a3299fb 100644 --- a/src/uplink.c +++ b/src/uplink.c @@ -32,12 +32,13 @@ #include "filter.h" #include "passcode.h" +#define MAX_UPLINKS 32 int uplink_reconfiguring; int uplink_shutting_down; pthread_mutex_t uplink_client_mutex = PTHREAD_MUTEX_INITIALIZER; -struct client_t *uplink_client; +struct client_t *uplink_client[MAX_UPLINKS]; int uplink_running; pthread_t uplink_th; @@ -68,23 +69,28 @@ int uplink_sighandler(int signum) } /* - * Open the uplinking socket + * Close uplinking sockets */ void close_uplinkers(void) { int rc; + + hlog(LOG_INFO, "Closing all uplinks"); if ((rc = pthread_mutex_lock(&uplink_client_mutex))) { hlog( LOG_ERR, "close_uplinkers(): could not lock uplink_client_mutex: %s", strerror(rc) ); return; } - - if (uplink_client && uplink_client->fd >= 0) { - hlog( LOG_DEBUG, "Closing uplinking socket .. fd %d ...", uplink_client->fd ); - shutdown(uplink_client->fd, SHUT_RDWR); + + int i; + for (i = 0; i < MAX_UPLINKS; i++) { + if ((uplink_client[i]) && uplink_client[i]->fd >= 0) { + hlog( LOG_DEBUG, "Closing uplinking socket %d (fd %d) %s ...", i, uplink_client[i]->fd, uplink_client[i]->addr_ss ); + shutdown(uplink_client[i]->fd, SHUT_RDWR); + } } - + if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) { hlog( LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc) ); return; @@ -96,7 +102,7 @@ void uplink_close(struct client_t *c) { int rc; - hlog(LOG_DEBUG, "Uplink socket has been closed."); + hlog(LOG_DEBUG, "Uplink to %s has been closed.", c->addr_ss); if ((rc = pthread_mutex_lock(&uplink_client_mutex))) { hlog(LOG_ERR, "close_uplinkers(): could not lock uplink_client_mutex: %s", strerror(rc)); @@ -105,7 +111,7 @@ void uplink_close(struct client_t *c) -- uplink_connects.gauge; - uplink_client = NULL; // there can be only one! + uplink_client[c->uplink_index] = NULL; // there can be only one! if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) { hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc)); @@ -120,9 +126,9 @@ int uplink_login_handler(struct worker_t *self, struct client_t *c, char *s, int char buf[1000]; int passcode, rc; - #ifndef FIXED_IOBUFS - if (!c->username) c->username = hstrdup("simulator"); + if (!c->username) + c->username = hstrdup("simulator"); #else if (!*c->username) strcpy(c->username, "simulator"); @@ -155,6 +161,7 @@ int uplink_login_handler(struct worker_t *self, struct client_t *c, char *s, int int make_uplink(struct uplink_config_t *l) { int fd, i, arg; + int uplink_index; struct client_t *c; union sockaddr_u sa; /* large enough for also IPv6 address */ socklen_t addr_len; @@ -172,7 +179,17 @@ int make_uplink(struct uplink_config_t *l) req.ai_protocol = IPPROTO_TCP; req.ai_flags = 0; ai = NULL; - + + /* find a free uplink slot */ + for (uplink_index = 0; uplink_index < MAX_UPLINKS; uplink_index++) { + if (!uplink_client[uplink_index]) + break; + } + if (uplink_index == MAX_UPLINKS) { + hlog(LOG_ERR, "Uplink: No available uplink slots, %d used", MAX_UPLINKS); + return -2; + } + if (strcasecmp(l->proto, "tcp") == 0) { // well, do nothing for now. } else if (strcasecmp(l->proto, "udp") == 0) { @@ -194,13 +211,14 @@ int make_uplink(struct uplink_config_t *l) return -2; } + l->state = UPLINK_ST_CONNECTING; i = getaddrinfo(l->host, l->port, &req, &ai); if (i != 0) { - hlog(LOG_INFO,"Uplink: address resolving failure of '%s' '%s'",l->host,l->port); + hlog(LOG_INFO,"Uplink: address resolving failure of '%s' '%s'", l->host, l->port); + l->state = UPLINK_ST_NOT_LINKED; return i; } - i = 0; for (a = ai; a && i < 20 ; a = a->ai_next, ++i) { ap[i] = a; /* Up to 20 first addresses */ @@ -224,8 +242,7 @@ int make_uplink(struct uplink_config_t *l) // FIXME: format socket IP address to text sprintf(addr_s, "%s:%s", l->host, l->port); - - hlog(LOG_INFO, "Making uplink TCP socket: %s %s", l->host, l->port); + hlog(LOG_INFO, "Uplink: Connecting to %s:%s", l->host, l->port); if ((fd = socket(a->ai_family, a->ai_socktype, a->ai_protocol)) < 0) { hlog(LOG_CRIT, "Uplink: socket(): %s\n", strerror(errno)); @@ -234,24 +251,72 @@ int make_uplink(struct uplink_config_t *l) arg = 1; setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg)); - - if (connect(fd, a->ai_addr, a->ai_addrlen)) { - hlog(LOG_CRIT, "Uplink: connect(%s): %s", addr_s, strerror(errno)); + + /* set non-blocking mode at this point, so that we can make a + * non-blocking connect() with a short timeout + */ + if (fcntl(fd, F_SETFL, O_NONBLOCK)) { + hlog(LOG_CRIT, "Uplink: Failed to set non-blocking mode on new socket: %s", strerror(errno)); close(fd); fd = -1; continue; } - if (fd >= 0) - break; /* Successfull connect! */ + + if (connect(fd, a->ai_addr, a->ai_addrlen) && errno != EINPROGRESS) { + hlog(LOG_ERR, "Uplink: connect(%s) failed: %s", addr_s, strerror(errno)); + close(fd); + fd = -1; + continue; + } + + /* Only wait a few seconds for the connection to be created. + * If the connection setup is very slow, it is unlikely to + * perform well enough anyway. + */ + struct pollfd connect_fd; + connect_fd.fd = fd; + connect_fd.events = POLLOUT; + connect_fd.revents = 0; + + int r = poll(&connect_fd, 1, 3000); + hlog(LOG_DEBUG, "Uplink: poll after connect returned %d, revents %d", r, connect_fd.revents); + + if (r < 0) { + hlog(LOG_ERR, "Uplink: connect to %s: poll failed: %s", addr_s, strerror(errno)); + close(fd); + fd = -1; + continue; + } + + if (r < 1) { + hlog(LOG_ERR, "Uplink: connect to %s timed out", addr_s); + close(fd); + fd = -1; + continue; + } + + socklen_t optlen = sizeof(arg); + getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *)&arg, &optlen); + if (arg == 0) { + /* Successful connect! */ + hlog(LOG_DEBUG, "Uplink: successfull connect"); + break; + } + + hlog(LOG_ERR, "Uplink: connect to %s failed: %s", addr_s, strerror(arg)); + close(fd); + fd = -1; } freeaddrinfo(ai); /* Not needed anymore.. */ if (fd < 0) { + l->state = UPLINK_ST_NOT_LINKED; return -3; /* No successfull connection at any address.. */ } c = client_alloc(); + c->uplink_index = uplink_index; c->fd = fd; c->addr = sa; c->state = CSTATE_CONNECTED; @@ -299,20 +364,14 @@ int make_uplink(struct uplink_config_t *l) hlog(LOG_INFO, "%s - Uplink connection on fd %d from %s", c->addr_ss, c->fd, c->addr_s); - uplink_client = c; - - + uplink_client[uplink_index] = c; + l->state = UPLINK_ST_CONNECTED; + // for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) { // if (l->filters[i]) // filter_parse(c, l->filters[i], 0); /* system filters */ // } - /* set non-blocking mode */ - if (fcntl(c->fd, F_SETFL, O_NONBLOCK)) { - hlog(LOG_ERR, "Uplink: %s - Failed to set non-blocking mode on socket: %s", c->addr_ss, strerror(errno)); - goto err; - } - /* Push it on the first worker, which ever it is.. */ @@ -334,7 +393,7 @@ int make_uplink(struct uplink_config_t *l) hlog(LOG_ERR, "make_uplink(): could not unlock new_clients_mutex: %s", strerror(pe)); goto err; } - + ++ uplink_connects.gauge; ++ uplink_connects.counter; ++ uplink_connects.refcount; /* <-- that does not get decremented at any time.. */ @@ -345,7 +404,8 @@ int make_uplink(struct uplink_config_t *l) err: client_free(c); - uplink_client = NULL; + uplink_client[uplink_index] = NULL; + l->state = UPLINK_ST_NOT_LINKED; return -1; } @@ -358,7 +418,8 @@ void uplink_thread(void *asdf) { sigset_t sigs_to_block; int rc; - + int next_uplink = -1; /* the index to the next regular uplink candidate */ + pthreads_profiling_reset("uplink"); sigemptyset(&sigs_to_block); @@ -372,46 +433,80 @@ void uplink_thread(void *asdf) sigaddset(&sigs_to_block, SIGUSR1); sigaddset(&sigs_to_block, SIGUSR2); pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL); - - hlog(LOG_INFO, "Uplink_thread starting..."); - + + hlog(LOG_INFO, "Uplink thread starting..."); + uplink_reconfiguring = 1; while (!uplink_shutting_down) { if (uplink_reconfiguring) { uplink_reconfiguring = 0; close_uplinkers(); - hlog(LOG_INFO, "Uplink thread ready."); + hlog(LOG_INFO, "Uplink thread configured."); } /* sleep for 1 second */ poll(NULL, 0, 1000); + /* speed up shutdown */ + if (uplink_shutting_down) + continue; + if ((rc = pthread_mutex_lock(&uplink_client_mutex))) { hlog(LOG_ERR, "uplink_thread(): could not lock uplink_client_mutex: %s", strerror(rc)); continue; } - - if (!uplink_client) { - int n = 0; - struct uplink_config_t *l = uplink_config; - for (; l ; l = l->next ) - ++n; - l = uplink_config; - if (n > 0) { - n = random() % n; - for (; l && n > 0; l = l->next, --n) - ; - if (!l) l = uplink_config; - if (l) + + /* Check if all we have a single regular uplink connection up, out of all + * the configured ones. Also, check that all the UPLINKMULTI links are + * connected. + */ + + int has_uplink = 0; /* do we have a single regular uplink? */ + int avail_uplink = 0; /* how many regular uplinks are configured? */ + + struct uplink_config_t *l = uplink_config; + for (; l; l = l->next) { + if (l->client_flags & CLFLAGS_UPLINKMULTI) { + /* MULTI uplink, needs to be up */ + if (l->state < UPLINK_ST_CONNECTING) make_uplink(l); + } else { + /* regular uplink, need to have one connected */ + if (l->state >= UPLINK_ST_CONNECTING) + has_uplink++; + avail_uplink++; } } - + + if (avail_uplink && !has_uplink) { + hlog(LOG_INFO, "Uplink: %d uplinks configured, %d are connected, need to pick new", avail_uplink, has_uplink); + /* we have regular uplinks but none are connected, + * pick the next one and connect */ + next_uplink++; + if (next_uplink >= avail_uplink) + next_uplink = 0; + hlog(LOG_DEBUG, "Uplink: picked uplink index %d as the new candidate", next_uplink); + l = uplink_config; + int i = 0; + while ((l) && i < next_uplink) { + if (!(l->client_flags & CLFLAGS_UPLINKMULTI)) + i++; + l = l->next; + } + if (l) { + hlog(LOG_DEBUG, "Uplink: trying %s (%s:%s)", l->name, l->host, l->port); + make_uplink(l); + } + } + if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) { - hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc)); + hlog(LOG_CRIT, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc)); continue; } + + /* sleep for 4 seconds between successful rounds */ + poll(NULL, 0, 4000); } hlog(LOG_DEBUG, "Uplinker thread shutting down uplinking sockets..."); @@ -420,8 +515,9 @@ void uplink_thread(void *asdf) /* - * Start / stop dupecheck + * Start / stop the uplinks maintainer thread */ + void uplink_start(void) { if (uplink_running) diff --git a/src/worker.c b/src/worker.c index 3d59b5d..55550ab 100644 --- a/src/worker.c +++ b/src/worker.c @@ -728,7 +728,7 @@ int handle_client_readable(struct worker_t *self, struct client_t *c) char *row_start; if (c->fd < 0) { - hlog(LOG_DEBUG, "client no longer alive, closing (%s)", c->fd, c->addr_s); + hlog(LOG_DEBUG, "socket no longer alive, closing (%s)", c->fd, c->addr_s); close_client(self, c); return -1; } @@ -738,7 +738,7 @@ int handle_client_readable(struct worker_t *self, struct client_t *c) r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1); if (r == 0) { - hlog( LOG_DEBUG, "read: EOF from client fd %d (%s @ %s)", + hlog( LOG_DEBUG, "read: EOF from socket fd %d (%s @ %s)", c->fd, c->addr_s, c->addr_ss ); close_client(self, c); return -1; @@ -747,7 +747,7 @@ int handle_client_readable(struct worker_t *self, struct client_t *c) if (errno == EINTR || errno == EAGAIN) return 0; /* D'oh.. return again latter */ - hlog( LOG_DEBUG, "read: Error from client fd %d (%s): %s", + hlog( LOG_DEBUG, "read: Error from socket fd %d (%s): %s", c->fd, c->addr_s, strerror(errno)); hlog( LOG_DEBUG, " .. ibuf=%p ibuf_end=%d ibuf_size=%d", c->ibuf, c->ibuf_end, c->ibuf_size-c->ibuf_end-1); @@ -821,13 +821,13 @@ int handle_client_writeable(struct worker_t *self, struct client_t *c) if (errno == EINTR || errno == EAGAIN) return 0; - hlog(LOG_DEBUG, "write: Error from client fd %d (%s): %s", c->fd, c->addr_s, strerror(errno)); + hlog(LOG_DEBUG, "write: Error from socket fd %d (%s): %s", c->fd, c->addr_s, strerror(errno)); close_client(self, c); return -1; } c->obuf_start += r; - //hlog(LOG_DEBUG, "write: %d bytes to client fd %d (%s) - %d in obuf", r, c->fd, c->addr_s, c->obuf_end - c->obuf_start); + //hlog(LOG_DEBUG, "write: %d bytes to socket fd %d (%s) - %d in obuf", r, c->fd, c->addr_s, c->obuf_end - c->obuf_start); if (c->obuf_start == c->obuf_end) { xpoll_outgoing(&self->xp, c->xfd, 0); c->obuf_start = c->obuf_end = 0; diff --git a/src/worker.h b/src/worker.h index f5f061b..62a62af 100644 --- a/src/worker.h +++ b/src/worker.h @@ -208,6 +208,7 @@ struct client_t { char *addr_hex; /* client IP address in hex format */ char *addr_ss; /* server IP address in text format */ #endif + int uplink_index; /* uplink array index */ int portnum; time_t connect_time;/* Time of connection */ time_t last_read; /* Time of last read - not necessarily last packet... */ @@ -248,6 +249,7 @@ struct client_t { #define CLFLAGS_MESSAGEONLY 0x400 /* Together with filter t/m -- which really implements it */ #define CLFLAGS_CLIENTONLY 0x800 /* Client connected on client-only port */ #define CLFLAGS_IGATE 0x1000 /* Igate port */ +#define CLFLAGS_UPLINKMULTI 0x2000 /* Allow multiple parallel outgoing connections */ CStateEnum state; /* state of the client... one of CSTATE_* */ char warned; /* the client has been warned that it has bad filter definition */ diff --git a/tests/Makefile b/tests/Makefile index 7f56446..b1d1983 100644 --- a/tests/Makefile +++ b/tests/Makefile @@ -8,6 +8,18 @@ INST_ARCHLIB = libperl PERL=/usr/bin/perl +TESTID=`echo $$$$_$$RANDOM$$RANDOM$$RANDOM` +TEST_SRCCALLS="XX7FIT-1,XX7FIT-2" +TEST_IGATE="XX7IG" + test: + rm -f logs/aprsc.log PERL_DL_NONLAZY=1 $(PERL) "-MExtUtils::Command::MM" "-e" "test_harness($(TEST_VERBOSE), '$(INST_LIB)', '$(INST_ARCHLIB)')" $(TEST_FILES) +AGGREGATOR_FILES = aggregator/*.t + +aggrtest: + rm -f logs/aprsc.log + TEST_ID=$(TESTID) TEST_IGATE=$(TEST_IGATE) TEST_SRCCALLS=$(TEST_SRCCALLS) \ + PERL_DL_NONLAZY=1 $(PERL) "-MExtUtils::Command::MM" "-e" "test_harness($(TEST_VERBOSE), '$(INST_LIB)', '$(INST_ARCHLIB)')" $(AGGREGATOR_FILES) + diff --git a/tests/aggregator/10aggr_join.t b/tests/aggregator/10aggr_join.t new file mode 100644 index 0000000..a002c07 --- /dev/null +++ b/tests/aggregator/10aggr_join.t @@ -0,0 +1,103 @@ +######################### +# Feed data to an aggregator server from two servers +# and validate that no dupes come out +######################### + +use Test; + +my @packets; + +my $id_ok; + +BEGIN { + my $test_srcs = $ENV{'TEST_SRCCALLS'}; + my @test_srcl = split(',', $test_srcs); + my $test_igate = $ENV{'TEST_IGATE'}; + my $id = $ENV{'TEST_ID'}; + my $tlm_seq = int(rand(1000)); + + foreach my $src (@test_srcl) { + push @packets, "$src>APZMDR,TRACE2-2,qAR,$test_igate:!/0(KkThq\">{2Oaprs.fi system testing COM_$id"; + push @packets, "$src>APZMDR,TRACE2-2,qAR,$test_igate:>system testing STA_$id status"; + push @packets, "$src>ID,qAR,$test_igate:Stupid testing BEACON_$id beacon"; + push @packets, "$src>APRS,WIDE2-2,qAR,${test_igate}::OH7LZB-10:MSG_$id"; + push @packets, "$src>APRS,WIDE2-2,qAR,${test_igate}:T#$tlm_seq,1,010,100,500,900,10101010"; + push @packets, "$src>APRS,WIDE2-2,qAR,${test_igate}:_03041723c344s009g014t038r000p000P000h38b10207wDVP"; + # bad packet to force raw packets flush + push @packets, "$src>APZMDR,TRACE2-2,qAR,$test_igate:!/0(KBAD LAST BAD_$id"; + } + + plan tests => 9 + 2 + ($#packets+1)*2 + 2 + 2 + 1 +}; + +use Ham::APRS::IS_Fake; +use runproduct; + +my $ret; + +ok(1); # modules load fine + +my $iss1 = new Ham::APRS::IS_Fake('127.0.0.1:10153', 'CORE1'); +my $iss2 = new Ham::APRS::IS_Fake('127.0.0.1:10154', 'CORE2'); +ok(1); # there's a working socket + +$iss1->bind_and_listen(); +$iss2->bind_and_listen(); +ok(1); + +# initialize the product runner using the basic configuration +my $p = new runproduct('aggregator'); +ok(defined $p, 1, "Failed to initialize product runner"); +ok($p->start(), 1, "Failed to start product"); + +my $is1 = $iss1->accept(); +ok(defined $is1, (1), "Failed to accept connection 1 from server"); +my $is2 = $iss2->accept(); +ok(defined $is2, (1), "Failed to accept connection 2 from server"); + +$iss1->send_login_prompt($is1); +$iss1->send_login_ok($is1); +$iss2->send_login_prompt($is2); +$iss2->send_login_ok($is2); + +my $read1 = $is1->getline_noncomment(1); +my $read2 = $is2->getline_noncomment(2); +ok($read1, qr/^user TESTING pass 31421 /, "Did not receive 'user' login command on connection 1"); +ok($read2, qr/^user TESTING pass 31421 /, "Did not receive 'user' login command on connection 2"); + +# create client connection +my $cl = new Ham::APRS::IS("localhost:10152", 'CL1ENT'); +ok(defined $cl, 1, "Failed to initialize Ham::APRS::IS"); +$ret = $cl->connect('retryuntil' => 8); +ok($ret, 1, "Failed to connect to the server: " . $cl->{'error'}); + +foreach $i (@packets) { + ok($is1->sendline($i), 1, "failed to write packet to socket 1"); + ok($is2->sendline($i), 1, "failed to write packet to socket 2"); +} + +sleep(3); + +$read1 = $is1->getline_noncomment(1); +$read2 = $is2->getline_noncomment(2); +ok($read1, undef, "Ouch, received data from read-only upstream connection 1"); +ok($read2, undef, "Ouch, received data from read-only upstream connection 2"); + +while (my $l = $cl->getline_noncomment(1)) { + warn "got: $l\n"; +} + +#my $read1 = istest::read_and_disconnect($sock1); +#my $read2 = istest::read_and_disconnect($sock2); + +$ret = $is1->disconnect(); +ok($ret, 1, "Failed to disconnect 1: " . $is1->{'error'}); +$ret = $is2->disconnect(); +ok($ret, 1, "Failed to disconnect 2: " . $is2->{'error'}); + +ok($p->stop(), 1, "Failed to stop product"); + + +######################### + + diff --git a/tests/cfg-aprsc/aggregator b/tests/cfg-aprsc/aggregator new file mode 100644 index 0000000..24cc158 --- /dev/null +++ b/tests/cfg-aprsc/aggregator @@ -0,0 +1,73 @@ +# +# USE RCS !!! +# $Id$ +# + +# Configuration for aprsc, an APRS-IS server for core servers + +MyCall TESTING +MyEmail email@example.com +MyAdmin "Admin, N0CALL" + +### Directories ######### +# Data directory (for database files) +RunDir data + +# Log directory +LogDir logs + +### Intervals ######### +# Interval specification format examples: +# 600 (600 seconds), 5m, 2h, 1h30m, 1d3h15m24s, etc... + +# Statistics logging +StatsInterval 1m + +# How often to scan the caches for old entries +ExpiryInterval 30s + +# How long to cache last position of each station +LastPositionCache 48h + +# How long to cache duplicate filter entries (practically all packets) +DupeFilterCache 30s + +# When no data is received from an upstream server in N seconds, switch to +# another server +UpstreamTimeout 10s + +# When no data is received from a downstream server in N seconds, disconnect +ClientTimeout 48h + +### TCP listener ########## +# Listen tcp
+# socketname: any name you wish to show up in logs and statistics +# porttype: one of: +# fullfeed - everything, after dupe filtering +# dupefeed - everything that comes in - with dupes! +# msgonly - messages only +# userfilter - user-specified filters +# +Listen "Messages only" messageonly tcp 0.0.0.0 1314 t/m +Listen "Full feed with CWOP" fullfeed tcp 0.0.0.0 10152 t/c* +Listen "Igate port" igate tcp 0.0.0.0 14580 +Listen "Client-only port" clientonly tcp 0.0.0.0 14581 + +Uplink full1 multiro tcp 127.0.0.1 10153 +Uplink full2 multiro tcp 127.0.0.1 10154 + +### Internals ############ +# Only use 3 threads in these basic tests, to keep startup/shutdown times +# short. +WorkerThreads 3 + +# When running this server as super-user, the server can (in many systems) +# increase several resource limits, and do other things that less privileged +# server can not do. +# +# The FileLimit is resource limit on how many simultaneous connections and +# some other internal resources the system can use at the same time. +# If the server is not being run as super-user, this setting has no effect. +# +FileLimit 10000 + diff --git a/tests/cfg-aprsc/uplinks b/tests/cfg-aprsc/uplinks new file mode 100644 index 0000000..25bd8c6 --- /dev/null +++ b/tests/cfg-aprsc/uplinks @@ -0,0 +1,73 @@ +# +# USE RCS !!! +# $Id$ +# + +# Configuration for aprsc, an APRS-IS server for core servers + +MyCall TESTING +MyEmail email@example.com +MyAdmin "Admin, N0CALL" + +### Directories ######### +# Data directory (for database files) +RunDir data + +# Log directory +LogDir logs + +### Intervals ######### +# Interval specification format examples: +# 600 (600 seconds), 5m, 2h, 1h30m, 1d3h15m24s, etc... + +# Statistics logging +StatsInterval 1m + +# How often to scan the caches for old entries +ExpiryInterval 30s + +# How long to cache last position of each station +LastPositionCache 48h + +# How long to cache duplicate filter entries (practically all packets) +DupeFilterCache 30s + +# When no data is received from an upstream server in N seconds, switch to +# another server +UpstreamTimeout 10s + +# When no data is received from a downstream server in N seconds, disconnect +ClientTimeout 48h + +### TCP listener ########## +# Listen tcp
+# socketname: any name you wish to show up in logs and statistics +# porttype: one of: +# fullfeed - everything, after dupe filtering +# dupefeed - everything that comes in - with dupes! +# msgonly - messages only +# userfilter - user-specified filters +# +Listen "Messages only" messageonly tcp 0.0.0.0 1314 t/m +Listen "Full feed with CWOP" fullfeed tcp 0.0.0.0 10152 +Listen "Igate port" igate tcp 0.0.0.0 14580 +Listen "Client-only port" clientonly tcp 0.0.0.0 14581 + +Uplink core1 full tcp 10.0.0.5 10153 +Uplink core2 full tcp 10.0.0.6 10154 + +### Internals ############ +# Only use 3 threads in these basic tests, to keep startup/shutdown times +# short. +WorkerThreads 3 + +# When running this server as super-user, the server can (in many systems) +# increase several resource limits, and do other things that less privileged +# server can not do. +# +# The FileLimit is resource limit on how many simultaneous connections and +# some other internal resources the system can use at the same time. +# If the server is not being run as super-user, this setting has no effect. +# +FileLimit 10000 + diff --git a/tests/libperl/Ham/APRS/IS.pm b/tests/libperl/Ham/APRS/IS.pm index 184e754..f27094f 100644 --- a/tests/libperl/Ham/APRS/IS.pm +++ b/tests/libperl/Ham/APRS/IS.pm @@ -199,6 +199,23 @@ sub connected($) } +=head1 accepted($socket) + +Accepts a socket + +=cut + +sub accepted($$) +{ + my($self, $sock) = @_; + + $self->{'sock'} = $sock; + $self->{'sock'}->blocking(0); + $self->{'state'} = 'connected'; + $self->{'error'} = 'Accepted connection successfully'; + $self->{'ibuf'} = ''; +} + # ------------------------------------------------------------------------- # Get a line (blocking) diff --git a/tests/libperl/Ham/APRS/IS_Fake.pm b/tests/libperl/Ham/APRS/IS_Fake.pm new file mode 100644 index 0000000..4adf60d --- /dev/null +++ b/tests/libperl/Ham/APRS/IS_Fake.pm @@ -0,0 +1,92 @@ + +package Ham::APRS::IS_Fake; + +use 5.006; +use strict; +use warnings; + +use IO::Handle '_IOFBF'; +use IO::Socket::INET; +use IO::Select; + +use Ham::APRS::IS; + +=head1 new(hostport, mycall, optionshash) + +Initializes a new Ham::APRS::IS_Fake listening socket. Takes two mandatory arguments, +the host:port pair to listen on and the server's callsign. + + my $is = new Ham::APRS::IS_Fake('*:12765', 'N0CALL'); + +=cut + +sub new($$$;%) +{ + my $that = shift; + my $class = ref($that) || $that; + my $self = { }; + bless ($self, $class); + + my($host_port, $mycall, %options) = @_; + + $self->{'host_port'} = $host_port; + $self->{'mycall'} = $mycall; + #$self->{'filter'} = $options{'filter'} if (defined $options{'filter'}); + + #warn "aprspass for $self->{mycall} is $self->{aprspass}\n"; + + $self->{'state'} = 'init'; + $self->{'error'} = "No errors yet."; + + return $self; +} + +sub bind_and_listen($) +{ + my($self) = @_; + + my($localaddr, $localport) = split(':', $self->{'host_port'}); + + $self->{'lsock'} = IO::Socket::INET->new( + Listen => 10, + LocalAddr => $self->{'host_port'}, + Proto => 'tcp', + ReuseAddr => 1, + ReeusePort => 1); + + die "Could not create socket: $!\n" unless $self->{'lsock'}; +} + +sub accept($) +{ + my($self) = @_; + + my $sock = $self->{'lsock'}->accept(); + + return if (!$sock); + + my $is = new Ham::APRS::IS('client:0', $self->{'mycall'}); + $is->accepted($sock); + + return $is; +} + +## javAPRSSrvr 3.15b07 +#user oh7lzb-af +## logresp oh7lzb-af unverified, server T2FINLAND + +sub send_login_prompt($$) +{ + my($self, $is) = @_; + + return $is->sendline("# IS_Fake 1.00"); +} + +sub send_login_ok($$) +{ + my($self, $is) = @_; + + return $is->sendline("# logresp CALLSIGN unverified, server IS_FAKE"); +} + +1; diff --git a/tests/libperl/istest.pm b/tests/libperl/istest.pm index 58bc4c2..1c52a33 100644 --- a/tests/libperl/istest.pm +++ b/tests/libperl/istest.pm @@ -88,5 +88,11 @@ sub should_drop($$$$$;$) $i_rx->getline_noncomment(); } +sub read_and_disconnect($) +{ + my($i) = @_; + + +} 1; diff --git a/tests/libperl/runproduct.pm b/tests/libperl/runproduct.pm index 608273a..9867e3a 100644 --- a/tests/libperl/runproduct.pm +++ b/tests/libperl/runproduct.pm @@ -18,7 +18,7 @@ use Data::Dumper; my %products = ( 'aprsc' => { 'binary' => '../src/aprsc', - 'stdargs' => '', + 'stdargs' => '-e debug -o file -r logs', 'cfgfileargs' => '-c', 'cfgdir' => 'cfg-aprsc' },