IS2: Implement live upgrade for IS2 clients, with test case.

This commit is contained in:
Heikki Hannikainen 2017-04-16 21:32:31 +03:00
parent 3a5b229959
commit 4ca8ea37e7
5 changed files with 147 additions and 44 deletions

View File

@ -1081,6 +1081,7 @@ static int accept_liveupgrade_single(cJSON *client, int *rxerr_map, int rxerr_ma
{
cJSON *fd, *listener_id, *username, *time_connect, *tick_connect;
cJSON *state;
cJSON *link;
cJSON *addr_loc;
cJSON *udp_port;
cJSON *app_name, *app_version;
@ -1138,6 +1139,7 @@ static int accept_liveupgrade_single(cJSON *client, int *rxerr_map, int rxerr_ma
client_heard = cJSON_GetObjectItem(client, "client_heard");
lat = cJSON_GetObjectItem(client, "lat");
lng = cJSON_GetObjectItem(client, "lng");
link = cJSON_GetObjectItem(client, "link");
if (!(
(fd)
@ -1203,15 +1205,30 @@ static int accept_liveupgrade_single(cJSON *client, int *rxerr_map, int rxerr_ma
hfree(client_addr_s);
if ((link) && (link->valuestring) && strcmp(link->valuestring, "is2") == 0) {
c->flags |= CLFLAGS_IS2;
}
if (strcmp(state->valuestring, "connected") == 0) {
c->state = CSTATE_CONNECTED;
c->handler_line_in = &incoming_handler;
/* use the default login handler */
if (c->flags & CLFLAGS_IS2)
c->is2_input_handler = &is2_input_handler;
else
c->handler_line_in = &incoming_handler;
strncpy(c->username, username->valuestring, sizeof(c->username));
c->username[sizeof(c->username)-1] = 0;
c->username_len = strlen(c->username);
} else if (strcmp(state->valuestring, "login") == 0) {
c->state = CSTATE_LOGIN;
c->handler_line_in = &login_handler;
/* use the default login handler */
if (c->flags & CLFLAGS_IS2)
c->is2_input_handler = &is2_input_handler_login;
else
c->handler_line_in = &login_handler;
} else {
hlog(LOG_ERR, "Live upgrade: Client %s is in invalid state '%s' (fd %d)", l->addr_s, state->valuestring, l->fd);
goto err;

View File

@ -2252,6 +2252,9 @@ static struct cJSON *worker_client_json(struct client_t *c, int liveup_info)
if (c->ai_protocol == IPPROTO_SCTP)
cJSON_AddStringToObject(jc, "proto", "sctp");
if (c->flags & CLFLAGS_IS2)
cJSON_AddStringToObject(jc, "link", "is2");
#ifdef USE_SSL
if (c->cert_subject[0])
cJSON_AddStringToObject(jc, "cert_subject", c->cert_subject);

View File

@ -34,8 +34,10 @@ ClientTimeout 48h
# msgonly - messages only
# userfilter - user-specified filters
#
Listen "Full feed with CWOP" is2fullfeed tcp ::0 56152 acl "cfg-aprsc/acl-all.acl"
Listen "Igate port" is2igate tcp 0.0.0.0 56580 acl "cfg-aprsc/acl-all.acl"
Listen "Full feed with CWOP" fullfeed tcp ::0 55152 acl "cfg-aprsc/acl-all.acl"
Listen "Igate port" igate tcp 0.0.0.0 55580 acl "cfg-aprsc/acl-all.acl"
Listen "IS2 Full feed with CWOP" is2fullfeed tcp ::0 56152 acl "cfg-aprsc/acl-all.acl"
Listen "IS2 Igate port" is2igate tcp 0.0.0.0 56580 acl "cfg-aprsc/acl-all.acl"
### Uplink configuration ########
# Uplink <name> <type> tcp <address> <port>

View File

@ -172,31 +172,12 @@ sub connect($;%)
$self->{'sock'}->blocking(0);
# wait for server signature
my $t = time();
while (my $l = $self->is2_frame_in()) {
my $sig = $l->server_signature;
if ($l->type == IS2Message::Type::SERVER_SIGNATURE()) {
if (!$sig) {
$self->{'error'} = "SERVER_SIGNATURE type, but no server signature message";
return 0;
}
$self->{'loginstate'} = 'server_signature';
#warn sprintf("got server signature: serverid '%s' app '%s' version '%s'\n",
# $sig->username, $sig->app_name, $sig->app_version);
last;
} else {
$self->{'error'} = "Wrong type of message received instead of SERVER_SIGNATURE: " . $l->type;
return 0;
}
if (time() - $t > 5) {
$self->{'error'} = "Timed out waiting for server signature";
return 0;
}
if (!$self->wait_signature()) {
return 0;
}
$self->{'loginstate'} = 'server_signature';
if ($self->{'loginstate'} ne 'server_signature') {
$self->{'error'} = "Timed out waiting for server signature";
return 0;
@ -215,10 +196,9 @@ sub connect($;%)
'login_request' => $lm
});
$self->{'sock'}->blocking(1);
$self->is2_frame_out($im->encode);
$self->{'sock'}->blocking(0);
my $t = time();
while (my $l = $self->is2_frame_in()) {
my $rep = $l->login_reply;
if ($l->type == IS2Message::Type::LOGIN_REPLY()) {
@ -284,9 +264,86 @@ sub send_packets($$)
'is_packet' => \@pq
});
$self->{'sock'}->blocking(1);
$self->is2_frame_out($im->encode);
$self->{'sock'}->blocking(0);
}
sub wait_signature($)
{
my($self) = @_;
# wait for server signature
my $t = time();
while (my $l = $self->is2_frame_in()) {
my $sig = $l->server_signature;
if ($l->type == IS2Message::Type::SERVER_SIGNATURE()) {
if (!$sig) {
$self->{'error'} = "SERVER_SIGNATURE type, but no server signature message";
return 0;
}
return 1;
#warn sprintf("got server signature: serverid '%s' app '%s' version '%s'\n",
# $sig->username, $sig->app_name, $sig->app_version);
} else {
$self->{'error'} = "Wrong type of message received instead of SERVER_SIGNATURE: " . $l->type;
return 0;
}
if (time() - $t > 5) {
$self->{'error'} = "Timed out waiting for server signature";
return 0;
}
}
$self->{'error'} = "Failed to read server signature: $!";
return 0;
}
sub ping($)
{
my($self) = @_;
my $timeout = 2;
my $reqid = int(rand(2**30));
my $im = IS2Message->new({
'type' => IS2Message::Type::KEEPALIVE_PING(),
'keepalive_ping' => IS2KeepalivePing->new({
'ping_type' => IS2KeepalivePing::PingType::REQUEST(),
'request_id' => $reqid,
})
});
$self->is2_frame_out($im->encode);
my $t = time();
while (my $l = $self->is2_frame_in($timeout)) {
if ($l->type != IS2Message::Type::KEEPALIVE_PING()) {
warn "IS2 ping: Unexpected type of frame received: " . $l->type . "\n";
next;
}
if (!defined $l->keepalive_ping) {
$self->{'error'} = "IS2 ping reply does not have keepalive_ping payload";
return undef;
}
if ($l->keepalive_ping->ping_type != IS2KeepalivePing::PingType::REPLY()) {
$self->{'error'} = "IS2 ping: Wrong type of frame received: " . $l->type;
return undef;
}
if ($l->keepalive_ping->request_id != $reqid) {
$self->{'error'} = "IS2 ping: Request id mismatch, sent $reqid, got " . $l->keepalive_ping->request_id;
return undef;
}
return 1;
}
$self->{'error'} = "ping get_packets timed out";
return undef;
}
sub send_packet($)
@ -367,9 +424,7 @@ sub set_filter($$)
})
});
$self->{'sock'}->blocking(1);
$self->is2_frame_out($im->encode);
$self->{'sock'}->blocking(0);
my $t = time();
while (my $l = $self->is2_frame_in()) {
@ -422,15 +477,19 @@ sub is2_frame_out($$)
#warn "is2_frame_out: framelen $framelen\n";
$self->{'sock'}->blocking(1);
if (!$self->{'sock'}->print(chr(0x02) . substr($framelen_i, 1) . $frame . chr(0x03))) {
$self->{'error'} = "Failed to write IS2 frame to $self->{host_port}: $!";
$self->{'sock'}->blocking(0);
return 0;
}
if (!$self->{'sock'}->flush) {
$self->{'error'} = "Failed to flush IS2 frame to $self->{host_port}: $!";
$self->{'sock'}->blocking(0);
return 0;
}
$self->{'sock'}->blocking(0);
}
sub is2_frame_in($;$)
@ -510,6 +569,7 @@ sub is2_frame_in($;$)
if (time() > $end_t) {
#warn "is2_frame_in: timeout\n";
$self->{'error'} = "is2_frame_in timed out";
return undef;
}
}

View File

@ -6,7 +6,7 @@
use Test;
BEGIN {
plan tests => (!defined $ENV{'TEST_PRODUCT'} || $ENV{'TEST_PRODUCT'} =~ /aprsc/) ? 2 + 8 + 2 + 8 + 1 : 0;
plan tests => (!defined $ENV{'TEST_PRODUCT'} || $ENV{'TEST_PRODUCT'} =~ /aprsc/) ? 2 + 13 + 2 + 12 + 1 : 0;
};
if (defined $ENV{'TEST_PRODUCT'} && $ENV{'TEST_PRODUCT'} !~ /aprsc/) {
@ -19,6 +19,7 @@ use LWP::UserAgent;
use HTTP::Request::Common;
use JSON::XS;
use Ham::APRS::IS;
use Ham::APRS::IS2;
use Time::HiRes qw( sleep time );
use istest;
@ -33,7 +34,7 @@ $json->latin1(0);
$json->ascii(1);
$json->utf8(0);
my $p = new runproduct('basic');
my $p = new runproduct('is2-basic');
ok(defined $p, 1, "Failed to initialize product runner");
ok($p->start(), 1, "Failed to start product");
@ -71,15 +72,28 @@ ok(defined $i_rx, 1, "Failed to initialize Ham::APRS::IS");
$ret = $i_rx->connect('retryuntil' => 8);
ok($ret, 1, "Failed to connect to the server: " . $i_rx->{'error'});
# IS2 clients
my $is2 = new Ham::APRS::IS2("localhost:56152", "N5CAL-4");
ok(defined $is2, 1, "Failed to initialize Ham::APRS::IS2");
$ret = $is2->connect('retryuntil' => 8);
ok($ret, 1, "Failed to connect to the server: " . $is2->{'error'});
ok($is2->ping(), 1, "Failed to ping the IS2 server: " . $is2->{'error'});
# send a packet, a duplicate, and a third dummy packet
istest::txrx(\&ok, $i_tx, $i_rx,
"SRC>DST,qAR,$login:foo1",
"SRC>DST,qAR,$login:foo1");
my $uniq = "SRC>DST,qAR,$login:foo1";
istest::txrx(\&ok, $i_tx, $i_rx, $uniq, $uniq);
# 11: send the same packet with a different digi path and see that it is dropped
my $helper = "SRC>DST,qAR,$login:dummy1";
istest::should_drop(\&ok, $i_tx, $i_rx,
"SRC>DST,DIGI1*,qAR,$login:foo1", # should drop
"SRC>DST:dummy1", 1); # will pass (helper packet)
$helper, 1, 1); # will pass (helper packet)
# the unique packet and the helper will appear on the IS2 full feed
istest::rx(\&ok, $is2, $uniq, $uniq);
istest::rx(\&ok, $is2, $helper, $helper);
# delete old liveupgrade status file, ignore errors if it doesn't happen to exist yet
my $liveupgrade_json_old = "data/liveupgrade.json.old";
@ -99,15 +113,22 @@ while (time() < $wait_end && ! -e $liveupgrade_json_old) {
#warn sprintf("waited %.3f s\n", time() - $wait_start);
ok(-e $liveupgrade_json_old, 1, "live upgrade not done, timed out in $maxwait s, $liveupgrade_json_old not present");
ok($is2->wait_signature(), 1, "IS2 server signature not received after upgrade: " . $is2->{'error'});
ok($is2->ping(), 1, "Failed to ping the IS2 server: " . $is2->{'error'});
# do the same test again - dupecheck cache has been lost in the
# upgrade
istest::txrx(\&ok, $i_tx, $i_rx,
"SRC>DST,qAR,$login:foo1",
"SRC>DST,qAR,$login:foo1");
istest::txrx(\&ok, $i_tx, $i_rx, $uniq, $uniq);
$helper = "SRC>DST,qAR,$login:dummy2";
istest::should_drop(\&ok, $i_tx, $i_rx,
"SRC>DST,DIGI1*,qAR,$login:foo1", # should drop
"SRC>DST:dumm21", 1); # will pass (helper packet)
$uniq, # should drop
$helper, 1, 1); # will pass (helper packet)
# the unique packet and the helper will appear on the IS2 full feed
istest::rx(\&ok, $is2, $uniq, $uniq);
istest::rx(\&ok, $is2, $helper, $helper);
# it takes some time for worker threads to accumulate statistics
sleep(1.5);