+ implemented client and server inactivity timeouts
+ test cases for both client and server inactivity timeouts = tuned log messages git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@345 3ce903b1-3385-4e86-93cd-f9a4a239f7ac
This commit is contained in:
parent
34afaa97b4
commit
dcd8259c0e
31
src/worker.c
31
src/worker.c
|
|
@ -915,7 +915,7 @@ void collect_new_clients(struct worker_t *self)
|
||||||
/* the new client may end up destroyed right away, never mind it here... */
|
/* the new client may end up destroyed right away, never mind it here... */
|
||||||
client_printf(self, c, "# Hello %s\r\n", c->addr_s, SERVERID);
|
client_printf(self, c, "# Hello %s\r\n", c->addr_s, SERVERID);
|
||||||
}
|
}
|
||||||
hlog( LOG_DEBUG, "Worker %d accepted %d new clients, now total %d",
|
hlog( LOG_DEBUG, "Worker %d accepted %d new connections, now total %d",
|
||||||
self->id, self->xp.pollfd_used - n, self->xp.pollfd_used );
|
self->id, self->xp.pollfd_used - n, self->xp.pollfd_used );
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -957,7 +957,7 @@ void send_keepalives(struct worker_t *self)
|
||||||
// the c may get destroyed from underneath of ourselves!
|
// the c may get destroyed from underneath of ourselves!
|
||||||
cnext = c->next;
|
cnext = c->next;
|
||||||
|
|
||||||
#if 0 /* Or perhaps we should ? */
|
#if 0 /* Or perhaps we should ? Oh yes we should! Especially for read-only uplinks. */
|
||||||
if ( c->flags & (CLFLAGS_UPLINKSIM|CLFLAGS_UPLINKPORT) ||
|
if ( c->flags & (CLFLAGS_UPLINKSIM|CLFLAGS_UPLINKPORT) ||
|
||||||
c->state == CSTATE_COREPEER )
|
c->state == CSTATE_COREPEER )
|
||||||
continue;
|
continue;
|
||||||
|
|
@ -981,24 +981,45 @@ void send_keepalives(struct worker_t *self)
|
||||||
rc = client_write(self, c, buf, 0);
|
rc = client_write(self, c, buf, 0);
|
||||||
if (rc < -2) continue; // destroyed..
|
if (rc < -2) continue; // destroyed..
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* check for input timeouts */
|
||||||
|
if (c->flags & CLFLAGS_INPORT) {
|
||||||
|
if (c->last_read < tick - client_timeout) {
|
||||||
|
hlog(LOG_DEBUG, "Closing client %p fd %d (%s) due to inactivity (%d s)",
|
||||||
|
c, c->fd, c->addr_s, client_timeout);
|
||||||
|
close_client(self, c);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
if (c->last_read < tick - upstream_timeout) {
|
||||||
|
hlog(LOG_DEBUG, "Closing upstream %p fd %d (%s) due to inactivity (%d s)",
|
||||||
|
c, c->fd, c->addr_s, upstream_timeout);
|
||||||
|
close_client(self, c);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/* check for write timeouts */
|
||||||
if (c->obuf_wtime < w_expire && c->state != CSTATE_UDP) {
|
if (c->obuf_wtime < w_expire && c->state != CSTATE_UDP) {
|
||||||
// TOO OLD! Shutdown the client
|
// TOO OLD! Shutdown the client
|
||||||
hlog( LOG_DEBUG,"Closing client %p fd %d (%s) due to obuf wtime timeout",
|
hlog( LOG_DEBUG,"Closing connection %p fd %d (%s) due to obuf wtime timeout",
|
||||||
c, c->fd, c->addr_s );
|
c, c->fd, c->addr_s );
|
||||||
close_client(self, c);
|
close_client(self, c);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/* adjust buffering */
|
||||||
if (c->obuf_writes > obuf_writes_treshold) {
|
if (c->obuf_writes > obuf_writes_treshold) {
|
||||||
// Lots and lots of writes, switch to buffering...
|
// Lots and lots of writes, switch to buffering...
|
||||||
|
|
||||||
if (c->obuf_flushsize == 0)
|
if (c->obuf_flushsize == 0)
|
||||||
// hlog( LOG_DEBUG,"Switch client %p fd %d (%s) to buffered writes", c, c->fd, c->addr_s );
|
// hlog( LOG_DEBUG,"Switch connection %p fd %d (%s) to buffered writes", c, c->fd, c->addr_s );
|
||||||
|
|
||||||
c->obuf_flushsize = c->obuf_size - 200;
|
c->obuf_flushsize = c->obuf_size - 200;
|
||||||
} else {
|
} else {
|
||||||
// Not so much writes, back to "write immediate"
|
// Not so much writes, back to "write immediate"
|
||||||
if (c->obuf_flushsize != 0)
|
if (c->obuf_flushsize != 0)
|
||||||
// hlog( LOG_DEBUG,"Switch client %p fd %d (%s) to unbuffered writes", c, c->fd, c->addr_s );
|
// hlog( LOG_DEBUG,"Switch connection %p fd %d (%s) to unbuffered writes", c, c->fd, c->addr_s );
|
||||||
|
|
||||||
c->obuf_flushsize = 0;
|
c->obuf_flushsize = 0;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,76 @@
|
||||||
|
#########################
|
||||||
|
# Test the client and server socket timeouts
|
||||||
|
#########################
|
||||||
|
|
||||||
|
use Test;
|
||||||
|
|
||||||
|
my $id_ok;
|
||||||
|
|
||||||
|
BEGIN {
|
||||||
|
plan tests => 15
|
||||||
|
};
|
||||||
|
|
||||||
|
use Ham::APRS::IS_Fake;
|
||||||
|
use Ham::APRS::IS;
|
||||||
|
use runproduct;
|
||||||
|
use Time::HiRes qw(time sleep);
|
||||||
|
|
||||||
|
my $ret;
|
||||||
|
|
||||||
|
ok(1); # modules load fine
|
||||||
|
|
||||||
|
my $iss1 = new Ham::APRS::IS_Fake('127.0.0.1:10153', 'CORE1');
|
||||||
|
ok(1); # there's a working socket
|
||||||
|
|
||||||
|
$iss1->bind_and_listen();
|
||||||
|
ok(1);
|
||||||
|
|
||||||
|
# initialize the product runner using the basic configuration
|
||||||
|
my $p = new runproduct('aggregator');
|
||||||
|
ok(defined $p, 1, "Failed to initialize product runner");
|
||||||
|
ok($p->start(), 1, "Failed to start product");
|
||||||
|
|
||||||
|
my $is1 = $iss1->accept();
|
||||||
|
ok(defined $is1, (1), "Failed to accept connection 1 from server");
|
||||||
|
|
||||||
|
$iss1->send_login_prompt($is1);
|
||||||
|
$iss1->send_login_ok($is1);
|
||||||
|
|
||||||
|
my $t_start = time();
|
||||||
|
|
||||||
|
my $read1 = $is1->getline_noncomment(1);
|
||||||
|
ok($read1, qr/^user TESTING pass 31421 /, "Did not receive 'user' login command on connection 1");
|
||||||
|
|
||||||
|
$read1 = $is1->getline_noncomment(30);
|
||||||
|
ok($read1, undef, "Ouch, got data from server when none should have been available");
|
||||||
|
|
||||||
|
my $t_end = time();
|
||||||
|
my $t_dif = $t_end - $t_start;
|
||||||
|
warn sprintf("\ntimeout on uplink socket took %.3f s, should take 10s\n", $t_dif);
|
||||||
|
ok($t_dif > 9 && $t_dif < 15);
|
||||||
|
|
||||||
|
# create client connection
|
||||||
|
my $cl = new Ham::APRS::IS("localhost:10152", 'CL1ENT');
|
||||||
|
ok(defined $cl, 1, "Failed to initialize Ham::APRS::IS");
|
||||||
|
$ret = $cl->connect('retryuntil' => 8);
|
||||||
|
ok($ret, 1, "Failed to connect to the server: " . $cl->{'error'});
|
||||||
|
|
||||||
|
$t_start = time();
|
||||||
|
|
||||||
|
$read1 = $cl->getline_noncomment(60);
|
||||||
|
ok($read1, undef, "Ouch, received data from server on client connection when none should have been available");
|
||||||
|
|
||||||
|
$t_end = time();
|
||||||
|
$t_dif = $t_end - $t_start;
|
||||||
|
warn sprintf("\ntimeout on client socket took %.3f s, should take 20s\n", $t_dif);
|
||||||
|
ok($t_dif > 19 && $t_dif < 25);
|
||||||
|
|
||||||
|
$ret = $is1->disconnect();
|
||||||
|
ok($ret, 1, "Failed to disconnect 1: " . $is1->{'error'});
|
||||||
|
|
||||||
|
ok($p->stop(), 1, "Failed to stop product");
|
||||||
|
|
||||||
|
|
||||||
|
#########################
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -29,12 +29,13 @@ ExpiryInterval 30s
|
||||||
# How long to cache last position of each station
|
# How long to cache last position of each station
|
||||||
LastPositionCache 48h
|
LastPositionCache 48h
|
||||||
|
|
||||||
|
# NOTE: THESE SHORT TIMEOUTS ARE USED BY THE TIMEOUT TEST, DON'T CHANGE IN THIS CONFIG
|
||||||
# When no data is received from an upstream server in N seconds, switch to
|
# When no data is received from an upstream server in N seconds, switch to
|
||||||
# another server
|
# another server
|
||||||
UpstreamTimeout 10s
|
UpstreamTimeout 10s
|
||||||
|
|
||||||
# When no data is received from a downstream server in N seconds, disconnect
|
# When no data is received from a downstream server in N seconds, disconnect
|
||||||
ClientTimeout 48h
|
ClientTimeout 20s
|
||||||
|
|
||||||
### TCP listener ##########
|
### TCP listener ##########
|
||||||
# Listen <socketname> <porttype> tcp <address to bind> <port>
|
# Listen <socketname> <porttype> tcp <address to bind> <port>
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue