+ implemented 'multiro' uplinks: parallel read-only links to multiple

hubs, to be used for aprs.fi uplink aggregation
+ fixed passing packets from uplinks to clients


git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@337 3ce903b1-3385-4e86-93cd-f9a4a239f7ac
This commit is contained in:
Heikki Hannikainen 2012-05-12 11:19:53 +00:00
parent c9268bc14f
commit a207b3e1e6
15 changed files with 592 additions and 77 deletions

View File

@ -324,9 +324,16 @@ int do_uplink(struct uplink_config_t **lq, int argc, char **argv)
/* argv[1] is name label for this uplink */
if (strcasecmp(argv[2],"ro")==0) {
clflags |= CLFLAGS_PORT_RO;
} // FIXME: other tokens ??
if (strcasecmp(argv[2], "ro")==0) {
clflags |= CLFLAGS_PORT_RO;
} else if (strcasecmp(argv[2], "multiro")==0) {
clflags |= CLFLAGS_PORT_RO|CLFLAGS_UPLINKMULTI;
} else if (strcasecmp(argv[2], "full") == 0) {
/* regular */
} else {
hlog(LOG_ERR, "Uplink: Unsupported uplink type '%s'\n", argv[2]);
return -2;
}
memset(&req, 0, sizeof(req));
req.ai_family = 0;
@ -372,6 +379,7 @@ int do_uplink(struct uplink_config_t **lq, int argc, char **argv)
l->host = hstrdup(argv[4]);
l->port = hstrdup(argv[5]);
l->client_flags = clflags;
l->state = UPLINK_ST_UNKNOWN;
for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
l->filters[i] = NULL;
@ -598,6 +606,31 @@ int read_config(void)
failed = 1;
}
/* validate uplink config: if there is a single 'multiro' connection
* configured, all of the uplinks must be 'multiro'
*/
int uplink_config_failed = 0;
int got_multiro = 0;
int got_non_multiro = 0;
struct uplink_config_t *up;
for (up = new_uplink_config; (up); up = up->next) {
if (up->client_flags & CLFLAGS_UPLINKMULTI)
got_multiro = 1;
else
got_non_multiro = 1;
if ((up->client_flags & CLFLAGS_UPLINKMULTI) && !(up->client_flags & CLFLAGS_PORT_RO)) {
uplink_config_failed = 1;
hlog(LOG_WARNING, "Config: uplink with non-RO MULTI uplink - would cause a loop, not allowed.");
}
}
if ((got_multiro) && (got_non_multiro)) {
hlog(LOG_WARNING, "Config: Configured both multiro and non-multiro uplinks - would cause a loop, not allowed.");
failed = 1;
free_uplink_config(&new_uplink_config);
}
if (uplink_config_failed)
free_uplink_config(&new_uplink_config);
if (new_fileno_limit > 0 && new_fileno_limit != fileno_limit) {
/* Adjust process global fileno limit */
int e;

View File

@ -107,8 +107,15 @@ struct uplink_config_t {
const char *filters[10]; /* up to 10 filters, NULL when not defined */
int client_flags;
int state; /* the state of the uplink */
};
#define UPLINK_ST_UNKNOWN -1
#define UPLINK_ST_NOT_LINKED 0
#define UPLINK_ST_CONNECTING 1
#define UPLINK_ST_CONNECTED 2
#define UPLINK_ST_LINKED 3
extern struct listen_config_t *listen_config;
extern struct peerip_config_t *peerip_config;
extern struct uplink_config_t *uplink_config;

View File

@ -486,8 +486,8 @@ static void dupecheck_thread(void)
w->pbuf_incoming_count = 0;
pthread_mutex_unlock(&w->pbuf_incoming_mutex);
// hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d",
// c, w->id, dupecheck_seqnum);
hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d",
c, w->id, dupecheck_seqnum);
for (pb = pb_list; (pb); pb = pbnext) {
int rc = dupecheck(pb);
@ -505,6 +505,7 @@ static void dupecheck_thread(void)
pb_out_dupe_last = pb;
pb->seqnum = ++dupecheck_dupe_seqnum;
++pb_out_dupe_count;
//hlog(LOG_DEBUG, "is duplicate");
}
n++;
}

View File

@ -434,23 +434,23 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
/* look for the '>' */
src_end = memchr(s, '>', pathlen < CALLSIGNLEN_MAX+1 ? pathlen : CALLSIGNLEN_MAX+1);
if (!src_end)
return -1; // No ">" in packet start..
return -2; // No ">" in packet start..
path_start = src_end+1;
if (path_start >= packet_end)
return -1;
return -3;
if (src_end - s > CALLSIGNLEN_MAX || src_end - s < CALLSIGNLEN_MIN)
return -1; /* too long source callsign */
return -4; /* too long source callsign */
info_start = path_end+1; // @":"+1 - first char of the payload
if (info_start >= packet_end)
return -1;
return -5;
/* see that there is at least some data in the packet */
info_end = packet_end;
if (info_end <= info_start)
return -1;
return -6;
/* look up end of dstcall (excluding SSID - this is the way dupecheck and
* mic-e parser wants it)
@ -464,7 +464,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
dstcall_end++;
if (dstcall_end - path_start > CALLSIGNLEN_MAX)
return -1; /* too long for destination callsign */
return -7; /* too long for destination callsign */
/* where does the digipeater path start? */
via_start = dstcall_end;
@ -478,8 +478,8 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
/* if disallow_unverified is anabled, don't allow unverified clients
* to send packets where srccall != login
*/
if (!c->validated && !originated_by_client && disallow_unverified)
return -1;
if (!c->validated && !originated_by_client && disallow_unverified && !(c->flags & CLFLAGS_UPLINKPORT))
return -8;
/* process Q construct, path_append_len of path_append will be copied
* to the end of the path later
@ -499,7 +499,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
if (!pb) {
// This should never happen...
hlog(LOG_INFO, "pbuf_get failed to get a block");
return -1; // No room :-(
return -9; // No room :-(
}
pb->next = NULL; // pbuf arrives pre-zeroed
@ -530,7 +530,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
pb->qconst_start = pb->data + (q_start - s);
} else {
fprintf(stderr, "q construct bug: did not find a good construct or produce a new one for:\n%s\n", s);
return -1;
return -10;
}
/* Copy the modified or appended part of the packet header -- qcons */
@ -560,7 +560,7 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
pb->dstcall_len = via_start - src_end - 1;
pb->info_start = info_start;
// hlog(LOG_DEBUG, "After parsing and Qc algorithm: %.*s", pb->packet_len-2, pb->data);
//hlog(LOG_DEBUG, "After parsing and Qc algorithm: %.*s", pb->packet_len-2, pb->data);
/* just try APRS parsing */
rc = parse_aprs(self, pb);
@ -628,9 +628,9 @@ int incoming_uplinksim_handler(struct worker_t *self, struct client_t *c, char *
/* failed parsing */
if (e == -42)
hlog(LOG_DEBUG, "Packet too short (%d): %.*s", len, len, s);
hlog(LOG_DEBUG, "Uplinksim: Packet too short (%d): %.*s", len, len, s);
else
hlog(LOG_DEBUG, "Failed parsing (%d): %.*s",e,len,s);
hlog(LOG_DEBUG, "Uplinksim: Failed parsing (%d): %.*s",e,len,s);
}
return 0;

View File

@ -32,12 +32,13 @@
#include "filter.h"
#include "passcode.h"
#define MAX_UPLINKS 32
int uplink_reconfiguring;
int uplink_shutting_down;
pthread_mutex_t uplink_client_mutex = PTHREAD_MUTEX_INITIALIZER;
struct client_t *uplink_client;
struct client_t *uplink_client[MAX_UPLINKS];
int uplink_running;
pthread_t uplink_th;
@ -68,23 +69,28 @@ int uplink_sighandler(int signum)
}
/*
* Open the uplinking socket
* Close uplinking sockets
*/
void close_uplinkers(void)
{
int rc;
hlog(LOG_INFO, "Closing all uplinks");
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog( LOG_ERR, "close_uplinkers(): could not lock uplink_client_mutex: %s", strerror(rc) );
return;
}
if (uplink_client && uplink_client->fd >= 0) {
hlog( LOG_DEBUG, "Closing uplinking socket .. fd %d ...", uplink_client->fd );
shutdown(uplink_client->fd, SHUT_RDWR);
int i;
for (i = 0; i < MAX_UPLINKS; i++) {
if ((uplink_client[i]) && uplink_client[i]->fd >= 0) {
hlog( LOG_DEBUG, "Closing uplinking socket %d (fd %d) %s ...", i, uplink_client[i]->fd, uplink_client[i]->addr_ss );
shutdown(uplink_client[i]->fd, SHUT_RDWR);
}
}
if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog( LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc) );
return;
@ -96,7 +102,7 @@ void uplink_close(struct client_t *c)
{
int rc;
hlog(LOG_DEBUG, "Uplink socket has been closed.");
hlog(LOG_DEBUG, "Uplink to %s has been closed.", c->addr_ss);
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog(LOG_ERR, "close_uplinkers(): could not lock uplink_client_mutex: %s", strerror(rc));
@ -105,7 +111,7 @@ void uplink_close(struct client_t *c)
-- uplink_connects.gauge;
uplink_client = NULL; // there can be only one!
uplink_client[c->uplink_index] = NULL; // there can be only one!
if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
@ -120,9 +126,9 @@ int uplink_login_handler(struct worker_t *self, struct client_t *c, char *s, int
char buf[1000];
int passcode, rc;
#ifndef FIXED_IOBUFS
if (!c->username) c->username = hstrdup("simulator");
if (!c->username)
c->username = hstrdup("simulator");
#else
if (!*c->username)
strcpy(c->username, "simulator");
@ -155,6 +161,7 @@ int uplink_login_handler(struct worker_t *self, struct client_t *c, char *s, int
int make_uplink(struct uplink_config_t *l)
{
int fd, i, arg;
int uplink_index;
struct client_t *c;
union sockaddr_u sa; /* large enough for also IPv6 address */
socklen_t addr_len;
@ -172,7 +179,17 @@ int make_uplink(struct uplink_config_t *l)
req.ai_protocol = IPPROTO_TCP;
req.ai_flags = 0;
ai = NULL;
/* find a free uplink slot */
for (uplink_index = 0; uplink_index < MAX_UPLINKS; uplink_index++) {
if (!uplink_client[uplink_index])
break;
}
if (uplink_index == MAX_UPLINKS) {
hlog(LOG_ERR, "Uplink: No available uplink slots, %d used", MAX_UPLINKS);
return -2;
}
if (strcasecmp(l->proto, "tcp") == 0) {
// well, do nothing for now.
} else if (strcasecmp(l->proto, "udp") == 0) {
@ -194,13 +211,14 @@ int make_uplink(struct uplink_config_t *l)
return -2;
}
l->state = UPLINK_ST_CONNECTING;
i = getaddrinfo(l->host, l->port, &req, &ai);
if (i != 0) {
hlog(LOG_INFO,"Uplink: address resolving failure of '%s' '%s'",l->host,l->port);
hlog(LOG_INFO,"Uplink: address resolving failure of '%s' '%s'", l->host, l->port);
l->state = UPLINK_ST_NOT_LINKED;
return i;
}
i = 0;
for (a = ai; a && i < 20 ; a = a->ai_next, ++i) {
ap[i] = a; /* Up to 20 first addresses */
@ -224,8 +242,7 @@ int make_uplink(struct uplink_config_t *l)
// FIXME: format socket IP address to text
sprintf(addr_s, "%s:%s", l->host, l->port);
hlog(LOG_INFO, "Making uplink TCP socket: %s %s", l->host, l->port);
hlog(LOG_INFO, "Uplink: Connecting to %s:%s", l->host, l->port);
if ((fd = socket(a->ai_family, a->ai_socktype, a->ai_protocol)) < 0) {
hlog(LOG_CRIT, "Uplink: socket(): %s\n", strerror(errno));
@ -234,24 +251,72 @@ int make_uplink(struct uplink_config_t *l)
arg = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg));
if (connect(fd, a->ai_addr, a->ai_addrlen)) {
hlog(LOG_CRIT, "Uplink: connect(%s): %s", addr_s, strerror(errno));
/* set non-blocking mode at this point, so that we can make a
* non-blocking connect() with a short timeout
*/
if (fcntl(fd, F_SETFL, O_NONBLOCK)) {
hlog(LOG_CRIT, "Uplink: Failed to set non-blocking mode on new socket: %s", strerror(errno));
close(fd);
fd = -1;
continue;
}
if (fd >= 0)
break; /* Successfull connect! */
if (connect(fd, a->ai_addr, a->ai_addrlen) && errno != EINPROGRESS) {
hlog(LOG_ERR, "Uplink: connect(%s) failed: %s", addr_s, strerror(errno));
close(fd);
fd = -1;
continue;
}
/* Only wait a few seconds for the connection to be created.
* If the connection setup is very slow, it is unlikely to
* perform well enough anyway.
*/
struct pollfd connect_fd;
connect_fd.fd = fd;
connect_fd.events = POLLOUT;
connect_fd.revents = 0;
int r = poll(&connect_fd, 1, 3000);
hlog(LOG_DEBUG, "Uplink: poll after connect returned %d, revents %d", r, connect_fd.revents);
if (r < 0) {
hlog(LOG_ERR, "Uplink: connect to %s: poll failed: %s", addr_s, strerror(errno));
close(fd);
fd = -1;
continue;
}
if (r < 1) {
hlog(LOG_ERR, "Uplink: connect to %s timed out", addr_s);
close(fd);
fd = -1;
continue;
}
socklen_t optlen = sizeof(arg);
getsockopt(fd, SOL_SOCKET, SO_ERROR, (char *)&arg, &optlen);
if (arg == 0) {
/* Successful connect! */
hlog(LOG_DEBUG, "Uplink: successfull connect");
break;
}
hlog(LOG_ERR, "Uplink: connect to %s failed: %s", addr_s, strerror(arg));
close(fd);
fd = -1;
}
freeaddrinfo(ai); /* Not needed anymore.. */
if (fd < 0) {
l->state = UPLINK_ST_NOT_LINKED;
return -3; /* No successfull connection at any address.. */
}
c = client_alloc();
c->uplink_index = uplink_index;
c->fd = fd;
c->addr = sa;
c->state = CSTATE_CONNECTED;
@ -299,20 +364,14 @@ int make_uplink(struct uplink_config_t *l)
hlog(LOG_INFO, "%s - Uplink connection on fd %d from %s", c->addr_ss, c->fd, c->addr_s);
uplink_client = c;
uplink_client[uplink_index] = c;
l->state = UPLINK_ST_CONNECTED;
// for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
// if (l->filters[i])
// filter_parse(c, l->filters[i], 0); /* system filters */
// }
/* set non-blocking mode */
if (fcntl(c->fd, F_SETFL, O_NONBLOCK)) {
hlog(LOG_ERR, "Uplink: %s - Failed to set non-blocking mode on socket: %s", c->addr_ss, strerror(errno));
goto err;
}
/* Push it on the first worker, which ever it is..
*/
@ -334,7 +393,7 @@ int make_uplink(struct uplink_config_t *l)
hlog(LOG_ERR, "make_uplink(): could not unlock new_clients_mutex: %s", strerror(pe));
goto err;
}
++ uplink_connects.gauge;
++ uplink_connects.counter;
++ uplink_connects.refcount; /* <-- that does not get decremented at any time.. */
@ -345,7 +404,8 @@ int make_uplink(struct uplink_config_t *l)
err:
client_free(c);
uplink_client = NULL;
uplink_client[uplink_index] = NULL;
l->state = UPLINK_ST_NOT_LINKED;
return -1;
}
@ -358,7 +418,8 @@ void uplink_thread(void *asdf)
{
sigset_t sigs_to_block;
int rc;
int next_uplink = -1; /* the index to the next regular uplink candidate */
pthreads_profiling_reset("uplink");
sigemptyset(&sigs_to_block);
@ -372,46 +433,80 @@ void uplink_thread(void *asdf)
sigaddset(&sigs_to_block, SIGUSR1);
sigaddset(&sigs_to_block, SIGUSR2);
pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
hlog(LOG_INFO, "Uplink_thread starting...");
hlog(LOG_INFO, "Uplink thread starting...");
uplink_reconfiguring = 1;
while (!uplink_shutting_down) {
if (uplink_reconfiguring) {
uplink_reconfiguring = 0;
close_uplinkers();
hlog(LOG_INFO, "Uplink thread ready.");
hlog(LOG_INFO, "Uplink thread configured.");
}
/* sleep for 1 second */
poll(NULL, 0, 1000);
/* speed up shutdown */
if (uplink_shutting_down)
continue;
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog(LOG_ERR, "uplink_thread(): could not lock uplink_client_mutex: %s", strerror(rc));
continue;
}
if (!uplink_client) {
int n = 0;
struct uplink_config_t *l = uplink_config;
for (; l ; l = l->next )
++n;
l = uplink_config;
if (n > 0) {
n = random() % n;
for (; l && n > 0; l = l->next, --n)
;
if (!l) l = uplink_config;
if (l)
/* Check if all we have a single regular uplink connection up, out of all
* the configured ones. Also, check that all the UPLINKMULTI links are
* connected.
*/
int has_uplink = 0; /* do we have a single regular uplink? */
int avail_uplink = 0; /* how many regular uplinks are configured? */
struct uplink_config_t *l = uplink_config;
for (; l; l = l->next) {
if (l->client_flags & CLFLAGS_UPLINKMULTI) {
/* MULTI uplink, needs to be up */
if (l->state < UPLINK_ST_CONNECTING)
make_uplink(l);
} else {
/* regular uplink, need to have one connected */
if (l->state >= UPLINK_ST_CONNECTING)
has_uplink++;
avail_uplink++;
}
}
if (avail_uplink && !has_uplink) {
hlog(LOG_INFO, "Uplink: %d uplinks configured, %d are connected, need to pick new", avail_uplink, has_uplink);
/* we have regular uplinks but none are connected,
* pick the next one and connect */
next_uplink++;
if (next_uplink >= avail_uplink)
next_uplink = 0;
hlog(LOG_DEBUG, "Uplink: picked uplink index %d as the new candidate", next_uplink);
l = uplink_config;
int i = 0;
while ((l) && i < next_uplink) {
if (!(l->client_flags & CLFLAGS_UPLINKMULTI))
i++;
l = l->next;
}
if (l) {
hlog(LOG_DEBUG, "Uplink: trying %s (%s:%s)", l->name, l->host, l->port);
make_uplink(l);
}
}
if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
hlog(LOG_CRIT, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
continue;
}
/* sleep for 4 seconds between successful rounds */
poll(NULL, 0, 4000);
}
hlog(LOG_DEBUG, "Uplinker thread shutting down uplinking sockets...");
@ -420,8 +515,9 @@ void uplink_thread(void *asdf)
/*
* Start / stop dupecheck
* Start / stop the uplinks maintainer thread
*/
void uplink_start(void)
{
if (uplink_running)

View File

@ -728,7 +728,7 @@ int handle_client_readable(struct worker_t *self, struct client_t *c)
char *row_start;
if (c->fd < 0) {
hlog(LOG_DEBUG, "client no longer alive, closing (%s)", c->fd, c->addr_s);
hlog(LOG_DEBUG, "socket no longer alive, closing (%s)", c->fd, c->addr_s);
close_client(self, c);
return -1;
}
@ -738,7 +738,7 @@ int handle_client_readable(struct worker_t *self, struct client_t *c)
r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
if (r == 0) {
hlog( LOG_DEBUG, "read: EOF from client fd %d (%s @ %s)",
hlog( LOG_DEBUG, "read: EOF from socket fd %d (%s @ %s)",
c->fd, c->addr_s, c->addr_ss );
close_client(self, c);
return -1;
@ -747,7 +747,7 @@ int handle_client_readable(struct worker_t *self, struct client_t *c)
if (errno == EINTR || errno == EAGAIN)
return 0; /* D'oh.. return again latter */
hlog( LOG_DEBUG, "read: Error from client fd %d (%s): %s",
hlog( LOG_DEBUG, "read: Error from socket fd %d (%s): %s",
c->fd, c->addr_s, strerror(errno));
hlog( LOG_DEBUG, " .. ibuf=%p ibuf_end=%d ibuf_size=%d",
c->ibuf, c->ibuf_end, c->ibuf_size-c->ibuf_end-1);
@ -821,13 +821,13 @@ int handle_client_writeable(struct worker_t *self, struct client_t *c)
if (errno == EINTR || errno == EAGAIN)
return 0;
hlog(LOG_DEBUG, "write: Error from client fd %d (%s): %s", c->fd, c->addr_s, strerror(errno));
hlog(LOG_DEBUG, "write: Error from socket fd %d (%s): %s", c->fd, c->addr_s, strerror(errno));
close_client(self, c);
return -1;
}
c->obuf_start += r;
//hlog(LOG_DEBUG, "write: %d bytes to client fd %d (%s) - %d in obuf", r, c->fd, c->addr_s, c->obuf_end - c->obuf_start);
//hlog(LOG_DEBUG, "write: %d bytes to socket fd %d (%s) - %d in obuf", r, c->fd, c->addr_s, c->obuf_end - c->obuf_start);
if (c->obuf_start == c->obuf_end) {
xpoll_outgoing(&self->xp, c->xfd, 0);
c->obuf_start = c->obuf_end = 0;

View File

@ -208,6 +208,7 @@ struct client_t {
char *addr_hex; /* client IP address in hex format */
char *addr_ss; /* server IP address in text format */
#endif
int uplink_index; /* uplink array index */
int portnum;
time_t connect_time;/* Time of connection */
time_t last_read; /* Time of last read - not necessarily last packet... */
@ -248,6 +249,7 @@ struct client_t {
#define CLFLAGS_MESSAGEONLY 0x400 /* Together with filter t/m -- which really implements it */
#define CLFLAGS_CLIENTONLY 0x800 /* Client connected on client-only port */
#define CLFLAGS_IGATE 0x1000 /* Igate port */
#define CLFLAGS_UPLINKMULTI 0x2000 /* Allow multiple parallel outgoing connections */
CStateEnum state; /* state of the client... one of CSTATE_* */
char warned; /* the client has been warned that it has bad filter definition */

View File

@ -8,6 +8,18 @@ INST_ARCHLIB = libperl
PERL=/usr/bin/perl
TESTID=`echo $$$$_$$RANDOM$$RANDOM$$RANDOM`
TEST_SRCCALLS="XX7FIT-1,XX7FIT-2"
TEST_IGATE="XX7IG"
test:
rm -f logs/aprsc.log
PERL_DL_NONLAZY=1 $(PERL) "-MExtUtils::Command::MM" "-e" "test_harness($(TEST_VERBOSE), '$(INST_LIB)', '$(INST_ARCHLIB)')" $(TEST_FILES)
AGGREGATOR_FILES = aggregator/*.t
aggrtest:
rm -f logs/aprsc.log
TEST_ID=$(TESTID) TEST_IGATE=$(TEST_IGATE) TEST_SRCCALLS=$(TEST_SRCCALLS) \
PERL_DL_NONLAZY=1 $(PERL) "-MExtUtils::Command::MM" "-e" "test_harness($(TEST_VERBOSE), '$(INST_LIB)', '$(INST_ARCHLIB)')" $(AGGREGATOR_FILES)

View File

@ -0,0 +1,103 @@
#########################
# Feed data to an aggregator server from two servers
# and validate that no dupes come out
#########################
use Test;
my @packets;
my $id_ok;
BEGIN {
my $test_srcs = $ENV{'TEST_SRCCALLS'};
my @test_srcl = split(',', $test_srcs);
my $test_igate = $ENV{'TEST_IGATE'};
my $id = $ENV{'TEST_ID'};
my $tlm_seq = int(rand(1000));
foreach my $src (@test_srcl) {
push @packets, "$src>APZMDR,TRACE2-2,qAR,$test_igate:!/0(KkThq\">{2Oaprs.fi system testing COM_$id";
push @packets, "$src>APZMDR,TRACE2-2,qAR,$test_igate:>system testing STA_$id status";
push @packets, "$src>ID,qAR,$test_igate:Stupid testing BEACON_$id beacon";
push @packets, "$src>APRS,WIDE2-2,qAR,${test_igate}::OH7LZB-10:MSG_$id";
push @packets, "$src>APRS,WIDE2-2,qAR,${test_igate}:T#$tlm_seq,1,010,100,500,900,10101010";
push @packets, "$src>APRS,WIDE2-2,qAR,${test_igate}:_03041723c344s009g014t038r000p000P000h38b10207wDVP";
# bad packet to force raw packets flush
push @packets, "$src>APZMDR,TRACE2-2,qAR,$test_igate:!/0(KBAD LAST BAD_$id";
}
plan tests => 9 + 2 + ($#packets+1)*2 + 2 + 2 + 1
};
use Ham::APRS::IS_Fake;
use runproduct;
my $ret;
ok(1); # modules load fine
my $iss1 = new Ham::APRS::IS_Fake('127.0.0.1:10153', 'CORE1');
my $iss2 = new Ham::APRS::IS_Fake('127.0.0.1:10154', 'CORE2');
ok(1); # there's a working socket
$iss1->bind_and_listen();
$iss2->bind_and_listen();
ok(1);
# initialize the product runner using the basic configuration
my $p = new runproduct('aggregator');
ok(defined $p, 1, "Failed to initialize product runner");
ok($p->start(), 1, "Failed to start product");
my $is1 = $iss1->accept();
ok(defined $is1, (1), "Failed to accept connection 1 from server");
my $is2 = $iss2->accept();
ok(defined $is2, (1), "Failed to accept connection 2 from server");
$iss1->send_login_prompt($is1);
$iss1->send_login_ok($is1);
$iss2->send_login_prompt($is2);
$iss2->send_login_ok($is2);
my $read1 = $is1->getline_noncomment(1);
my $read2 = $is2->getline_noncomment(2);
ok($read1, qr/^user TESTING pass 31421 /, "Did not receive 'user' login command on connection 1");
ok($read2, qr/^user TESTING pass 31421 /, "Did not receive 'user' login command on connection 2");
# create client connection
my $cl = new Ham::APRS::IS("localhost:10152", 'CL1ENT');
ok(defined $cl, 1, "Failed to initialize Ham::APRS::IS");
$ret = $cl->connect('retryuntil' => 8);
ok($ret, 1, "Failed to connect to the server: " . $cl->{'error'});
foreach $i (@packets) {
ok($is1->sendline($i), 1, "failed to write packet to socket 1");
ok($is2->sendline($i), 1, "failed to write packet to socket 2");
}
sleep(3);
$read1 = $is1->getline_noncomment(1);
$read2 = $is2->getline_noncomment(2);
ok($read1, undef, "Ouch, received data from read-only upstream connection 1");
ok($read2, undef, "Ouch, received data from read-only upstream connection 2");
while (my $l = $cl->getline_noncomment(1)) {
warn "got: $l\n";
}
#my $read1 = istest::read_and_disconnect($sock1);
#my $read2 = istest::read_and_disconnect($sock2);
$ret = $is1->disconnect();
ok($ret, 1, "Failed to disconnect 1: " . $is1->{'error'});
$ret = $is2->disconnect();
ok($ret, 1, "Failed to disconnect 2: " . $is2->{'error'});
ok($p->stop(), 1, "Failed to stop product");
#########################

View File

@ -0,0 +1,73 @@
#
# USE RCS !!!
# $Id$
#
# Configuration for aprsc, an APRS-IS server for core servers
MyCall TESTING
MyEmail email@example.com
MyAdmin "Admin, N0CALL"
### Directories #########
# Data directory (for database files)
RunDir data
# Log directory
LogDir logs
### Intervals #########
# Interval specification format examples:
# 600 (600 seconds), 5m, 2h, 1h30m, 1d3h15m24s, etc...
# Statistics logging
StatsInterval 1m
# How often to scan the caches for old entries
ExpiryInterval 30s
# How long to cache last position of each station
LastPositionCache 48h
# How long to cache duplicate filter entries (practically all packets)
DupeFilterCache 30s
# When no data is received from an upstream server in N seconds, switch to
# another server
UpstreamTimeout 10s
# When no data is received from a downstream server in N seconds, disconnect
ClientTimeout 48h
### TCP listener ##########
# Listen <socketname> <porttype> tcp <address to bind> <port>
# socketname: any name you wish to show up in logs and statistics
# porttype: one of:
# fullfeed - everything, after dupe filtering
# dupefeed - everything that comes in - with dupes!
# msgonly - messages only
# userfilter - user-specified filters
#
Listen "Messages only" messageonly tcp 0.0.0.0 1314 t/m
Listen "Full feed with CWOP" fullfeed tcp 0.0.0.0 10152 t/c*
Listen "Igate port" igate tcp 0.0.0.0 14580
Listen "Client-only port" clientonly tcp 0.0.0.0 14581
Uplink full1 multiro tcp 127.0.0.1 10153
Uplink full2 multiro tcp 127.0.0.1 10154
### Internals ############
# Only use 3 threads in these basic tests, to keep startup/shutdown times
# short.
WorkerThreads 3
# When running this server as super-user, the server can (in many systems)
# increase several resource limits, and do other things that less privileged
# server can not do.
#
# The FileLimit is resource limit on how many simultaneous connections and
# some other internal resources the system can use at the same time.
# If the server is not being run as super-user, this setting has no effect.
#
FileLimit 10000

73
tests/cfg-aprsc/uplinks Normal file
View File

@ -0,0 +1,73 @@
#
# USE RCS !!!
# $Id$
#
# Configuration for aprsc, an APRS-IS server for core servers
MyCall TESTING
MyEmail email@example.com
MyAdmin "Admin, N0CALL"
### Directories #########
# Data directory (for database files)
RunDir data
# Log directory
LogDir logs
### Intervals #########
# Interval specification format examples:
# 600 (600 seconds), 5m, 2h, 1h30m, 1d3h15m24s, etc...
# Statistics logging
StatsInterval 1m
# How often to scan the caches for old entries
ExpiryInterval 30s
# How long to cache last position of each station
LastPositionCache 48h
# How long to cache duplicate filter entries (practically all packets)
DupeFilterCache 30s
# When no data is received from an upstream server in N seconds, switch to
# another server
UpstreamTimeout 10s
# When no data is received from a downstream server in N seconds, disconnect
ClientTimeout 48h
### TCP listener ##########
# Listen <socketname> <porttype> tcp <address to bind> <port>
# socketname: any name you wish to show up in logs and statistics
# porttype: one of:
# fullfeed - everything, after dupe filtering
# dupefeed - everything that comes in - with dupes!
# msgonly - messages only
# userfilter - user-specified filters
#
Listen "Messages only" messageonly tcp 0.0.0.0 1314 t/m
Listen "Full feed with CWOP" fullfeed tcp 0.0.0.0 10152
Listen "Igate port" igate tcp 0.0.0.0 14580
Listen "Client-only port" clientonly tcp 0.0.0.0 14581
Uplink core1 full tcp 10.0.0.5 10153
Uplink core2 full tcp 10.0.0.6 10154
### Internals ############
# Only use 3 threads in these basic tests, to keep startup/shutdown times
# short.
WorkerThreads 3
# When running this server as super-user, the server can (in many systems)
# increase several resource limits, and do other things that less privileged
# server can not do.
#
# The FileLimit is resource limit on how many simultaneous connections and
# some other internal resources the system can use at the same time.
# If the server is not being run as super-user, this setting has no effect.
#
FileLimit 10000

View File

@ -199,6 +199,23 @@ sub connected($)
}
=head1 accepted($socket)
Accepts a socket
=cut
sub accepted($$)
{
my($self, $sock) = @_;
$self->{'sock'} = $sock;
$self->{'sock'}->blocking(0);
$self->{'state'} = 'connected';
$self->{'error'} = 'Accepted connection successfully';
$self->{'ibuf'} = '';
}
# -------------------------------------------------------------------------
# Get a line (blocking)

View File

@ -0,0 +1,92 @@
package Ham::APRS::IS_Fake;
use 5.006;
use strict;
use warnings;
use IO::Handle '_IOFBF';
use IO::Socket::INET;
use IO::Select;
use Ham::APRS::IS;
=head1 new(hostport, mycall, optionshash)
Initializes a new Ham::APRS::IS_Fake listening socket. Takes two mandatory arguments,
the host:port pair to listen on and the server's callsign.
my $is = new Ham::APRS::IS_Fake('*:12765', 'N0CALL');
=cut
sub new($$$;%)
{
my $that = shift;
my $class = ref($that) || $that;
my $self = { };
bless ($self, $class);
my($host_port, $mycall, %options) = @_;
$self->{'host_port'} = $host_port;
$self->{'mycall'} = $mycall;
#$self->{'filter'} = $options{'filter'} if (defined $options{'filter'});
#warn "aprspass for $self->{mycall} is $self->{aprspass}\n";
$self->{'state'} = 'init';
$self->{'error'} = "No errors yet.";
return $self;
}
sub bind_and_listen($)
{
my($self) = @_;
my($localaddr, $localport) = split(':', $self->{'host_port'});
$self->{'lsock'} = IO::Socket::INET->new(
Listen => 10,
LocalAddr => $self->{'host_port'},
Proto => 'tcp',
ReuseAddr => 1,
ReeusePort => 1);
die "Could not create socket: $!\n" unless $self->{'lsock'};
}
sub accept($)
{
my($self) = @_;
my $sock = $self->{'lsock'}->accept();
return if (!$sock);
my $is = new Ham::APRS::IS('client:0', $self->{'mycall'});
$is->accepted($sock);
return $is;
}
## javAPRSSrvr 3.15b07
#user oh7lzb-af
## logresp oh7lzb-af unverified, server T2FINLAND
sub send_login_prompt($$)
{
my($self, $is) = @_;
return $is->sendline("# IS_Fake 1.00");
}
sub send_login_ok($$)
{
my($self, $is) = @_;
return $is->sendline("# logresp CALLSIGN unverified, server IS_FAKE");
}
1;

View File

@ -88,5 +88,11 @@ sub should_drop($$$$$;$)
$i_rx->getline_noncomment();
}
sub read_and_disconnect($)
{
my($i) = @_;
}
1;

View File

@ -18,7 +18,7 @@ use Data::Dumper;
my %products = (
'aprsc' => {
'binary' => '../src/aprsc',
'stdargs' => '',
'stdargs' => '-e debug -o file -r logs',
'cfgfileargs' => '-c',
'cfgdir' => 'cfg-aprsc'
},