+ Almost completely implement messaging support for aprsc: keep

track of heard stations per client, and pass messages to them.
With tests. Expiration missing, and courtesy posits.


git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@365 3ce903b1-3385-4e86-93cd-f9a4a239f7ac
This commit is contained in:
Heikki Hannikainen 2012-05-29 21:12:59 +00:00
parent 7a8658fcfe
commit ec67313248
14 changed files with 360 additions and 49 deletions

View File

@ -66,7 +66,7 @@ profile:
OBJS = aprsc.o accept.o worker.o \
login.o incoming.o dupecheck.o outgoing.o \
clientlist.o \
clientlist.o client_heard.o \
parse_aprs.o parse_qc.o \
config.o netlib.o xpoll.o \
cfgfile.o passcode.o uplink.o \

99
src/client_heard.c Normal file
View File

@ -0,0 +1,99 @@
/*
* aprsc
*
* (c) Heikki Hannikainen, OH7LZB <hessu@hes.iki.fi>
*
* This program is licensed under the BSD license, which can be found
* in the file LICENSE.
*
*/
/*
* The client's heard list contains a list of stations heard by
* a given station. It's used for message routing by the
* message destination callsign.
*
* The heard list is only touched by the worker thread operating
* on that client socket, so it shouldn't need any locking at all.
*/
#include <strings.h>
#include <string.h>
#include "client_heard.h"
#include "hlog.h"
#include "hmalloc.h"
/*
* Update the heard list, either update timestamp of a heard
* callsign or insert a new entry
*/
void client_heard_update(struct client_t *c, struct pbuf_t *pb)
{
struct client_heard_t *h;
int call_len;
call_len = pb->srccall_end - pb->data;
hlog(LOG_DEBUG, "client_heard fd %d: updating heard table for %.*s", c->fd, call_len, pb->data);
for (h = c->client_heard; (h); h = h->next) {
if (call_len == h->call_len
&& strncasecmp(pb->data, h->callsign, h->call_len) == 0) {
// OK, found it from the list
hlog(LOG_DEBUG, "client_heard fd %d: found, updating %.*s", c->fd, call_len, pb->data);
h->last_heard = pb->t;
return;
}
}
/* Not found, insert. */
hlog(LOG_DEBUG, "client_heard fd %d: inserting %.*s", c->fd, call_len, pb->data);
h = hmalloc(sizeof(*h));
strncpy(h->callsign, pb->data, call_len);
h->callsign[sizeof(h->callsign)-1] = 0;
h->call_len = call_len;
h->last_heard = pb->t;
/* insert in beginning of linked list */
h->next = c->client_heard;
h->prevp = &c->client_heard;
if (h->next)
h->next->prevp = &h->next;
c->client_heard = h;
}
int client_heard_check(struct client_t *c, const char *callsign, int call_len)
{
struct client_heard_t *h;
hlog(LOG_DEBUG, "client_heard_check fd %d: checking heard table for %.*s", c->fd, call_len, callsign);
// TODO: expire old entries on the go
for (h = c->client_heard; (h); h = h->next) {
if (call_len == h->call_len
&& strncasecmp(callsign, h->callsign, h->call_len) == 0) {
// OK, found it from the list
hlog(LOG_DEBUG, "client_heard_check fd %d: found %.*s", c->fd, call_len, callsign);
return 1;
}
}
return 0;
}
/*
* Free the whole client heard list
*/
void client_heard_free(struct client_t *c)
{
struct client_heard_t *h;
while (c->client_heard) {
h = c->client_heard->next;
hfree(c->client_heard);
c->client_heard = h;
}
}

View File

@ -23,6 +23,7 @@
#include "historydb.h"
#include "cfgfile.h"
#include "keyhash.h"
#include "client_heard.h"
// static double rad2deg(double a) { return a * (180.0 * M_1_PI); }
@ -2192,6 +2193,16 @@ int filter_process(struct worker_t *self, struct client_t *c, struct pbuf_t *pb)
{
struct filter_t *f;
/* messaging support: if (1) this is a text message,
* (2) the client is an igate port,
* and (3) the message's recipient has been heard
* recently on the port, gate the message.
*/
if ((c->flags & CLFLAGS_IGATE) && (pb->packettype & T_MESSAGE)
&& client_heard_check(c, pb->dstname, pb->dstname_len)) {
return 1;
}
f = c->negdefaultfilters;
for ( ; f; f = f->h.next ) {
int rc = filter_process_one(c, pb, f);

View File

@ -50,7 +50,10 @@ struct history_cell_t {
int packetlen;
char *packet;
char packetbuf[170]; /* Maybe a dozen packets are bigger than
170 bytes long out of some 17 000 .. */
170 bytes long out of some 17 000 ...
The bigger ones are malloc()'ed to the
*packet pointer, small ones go to
packetbuf where *packet points to. */
};
#define HISTORYDB_CELL_SIZE sizeof(struct history_cell_t)

View File

@ -24,6 +24,7 @@
#include "parse_aprs.h"
#include "parse_qc.h"
#include "filter.h"
#include "client_heard.h"
#include "cellmalloc.h"
@ -577,6 +578,13 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
/* Filter preprocessing before sending this to dupefilter.. */
filter_preprocess_dupefilter(pb);
/* If the packet came in on a filtered port, and if it's a position packet
* (not object/item), mark the station as heard on this port, so that
* messages can be routed to it.
*/
if ((c->flags & CLFLAGS_IGATE) && (pb->packettype & T_POSITION))
client_heard_update(c, pb);
/* put the buffer in the thread's incoming queue */
*self->pbuf_incoming_local_last = pb;
self->pbuf_incoming_local_last = &pb->next;

View File

@ -1148,11 +1148,13 @@ int parse_aprs(struct worker_t *self, struct pbuf_t *pb)
// them the same way in filters as we do those with real
// positions..
{
/* collect destination callsign of the message */
char keybuf[CALLSIGNLEN_MAX+1];
const char *p;
int i;
struct history_cell_t *history;
pb->dstname = body;
p = body;
for (i = 0; i < CALLSIGNLEN_MAX; ++i) {
keybuf[i] = *p;
@ -1160,9 +1162,19 @@ int parse_aprs(struct worker_t *self, struct pbuf_t *pb)
// to 9 chars, while our historydb is not.
if (*p == 0 || *p == ' ' || *p == ':')
break;
p++;
}
keybuf[i] = 0;
pb->dstname_len = p - body;
hlog(LOG_DEBUG, "message: dstname len %d", pb->dstname_len);
/*
* This adds a position for a message based on the
* recipient, causing it to match an area filter.
* This is not what javAPRSSrvr does, so let's not do it
* quite yet. Compatibility first, at first.
*/
/*
i = historydb_lookup( keybuf, i, &history );
if (i > 0) {
pb->lat = history->lat;
@ -1172,6 +1184,7 @@ int parse_aprs(struct worker_t *self, struct pbuf_t *pb)
pb->flags |= F_HASPOS;
return 1;
}
*/
}
return 0;

View File

@ -31,6 +31,7 @@
#include "filter.h"
#include "dupecheck.h"
#include "clientlist.h"
#include "client_heard.h"
#include "cellmalloc.h"
time_t now; /* current time, updated by the main thread, MAY be spun around by the simulator */
@ -313,6 +314,8 @@ void client_free(struct client_t *c)
filter_free(c->posuserfilters);
filter_free(c->neguserfilters);
client_heard_free(c);
client_udp_free(c->udpclient);
clientlist_remove(c);

View File

@ -105,6 +105,7 @@ struct pbuf_t {
uint16_t flags; /* bitmask: one or more of F_* */
uint16_t srcname_len; /* parsed length of source (object, item, srcall) name 3..9 */
uint16_t dstcall_len; /* parsed length of destination callsign *including* SSID */
uint16_t dstname_len; /* parsed length of message destination including SSID */
uint16_t entrycall_len;
int packet_len; /* the actual length of the packet, including CRLF */
@ -116,6 +117,7 @@ struct pbuf_t {
const char *qconst_start; /* "qAX,incomingSSID:" -- for q and e filters */
const char *info_start; /* pointer to start of info field */
const char *srcname; /* source's name (either srccall or object/item name) */
const char *dstname; /* message destination callsign */
float lat; /* if the packet is PT_POSITION, latitude and longitude go here */
float lng; /* .. in RADIAN */
@ -152,8 +154,17 @@ union sockaddr_u {
struct sockaddr_in6 si6;
};
#define WBUF_ADJUSTER 0 /* Client WBUF adjustment can be usefull -- but code is infant.. */
/* list of message recipient callsigns heard on a client port */
struct client_heard_t {
char callsign[CALLSIGNLEN_MAX+1];
int call_len;
time_t last_heard;
struct client_heard_t *next;
struct client_heard_t **prevp;
};
#define WBUF_ADJUSTER 0 /* Client WBUF adjustment can be usefull -- but code is infant.. */
struct portaccount_t { /* Port accounter tracks port usage, and traffic
Reporting looks up these via listener list. */
@ -270,6 +281,11 @@ struct client_t {
struct filter_t *posuserfilters;
struct filter_t *neguserfilters;
/* List of station callsigns (not objects/items!) which have been
* heard by this client. Only collected for filtered ports!
* Used for deciding if messages should be routed here.
*/
struct client_heard_t *client_heard;
// Maybe we use these four items, or maybe not.
// They are there for experimenting with outgoing queue processing algorithms.

View File

@ -28,7 +28,7 @@ TNCLogPorts=127.0.0.1:14508
# Status Page Parameters
#
statusPorts=55501
plainStatusPorts554511
plainStatusPorts=554511
portTable=10152;Full APRS-IS Feed;14580;User-defined Filtered Feed;14581;User-defined Filtered Feed;1314;Message-only Feed
#
# Client Port Properties

View File

@ -24,14 +24,14 @@ my $server_call = "TESTING";
my $i_rx = new Ham::APRS::IS("localhost:55152", $login);
ok(defined $i_rx, 1, "Failed to initialize Ham::APRS::IS");
warn "accepting\n";
#warn "accepting\n";
my $is1 = $iss1->accept();
ok(defined $is1, (1), "Failed to accept connection 1 from server");
warn "sending login prompt\n";
#warn "sending login prompt\n";
$iss1->send_login_prompt($is1);
warn "sending login ok\n";
#warn "sending login ok\n";
$iss1->send_login_ok($is1);
my $ret;
@ -52,7 +52,7 @@ ok($ret, 1, "Failed to connect to the server: " . $i_rx->{'error'});
# }
#
warn "doing test 1\n";
#warn "doing test 1\n";
# (2):
istest::txrx(\&ok, $is1, $i_rx,
@ -62,17 +62,17 @@ istest::txrx(\&ok, $is1, $i_rx,
# unbind the IPv4 server and create IPv6 server
$iss1->unbind();
warn "switching to ipv6\n";
#warn "switching to ipv6\n";
my $iss6 = new Ham::APRS::IS_Fake('[::1]:54153', 'CORE6');
ok(defined $iss6, 1, "Test failed to initialize listening server socket on IPv6");
$iss6->bind_and_listen();
warn "disconnecting uplink 1\n";
#warn "disconnecting uplink 1\n";
$is1->disconnect();
warn "accepting ipv6 connect\n";
#warn "accepting ipv6 connect\n";
my $is6 = $iss6->accept();
ok(defined $is6, (1), "Failed to accept connection ipv6 from server");

View File

@ -33,15 +33,11 @@ BEGIN {
push @packets, $packet;
}
plan tests => 9 + ($#packets+1) + 3 + 2;
plan tests => 6 + ($#packets+1) + 5;
};
ok(1); # If we made it this far, we're ok.
my $iss1 = new Ham::APRS::IS_Fake('127.0.0.1:54153', 'CORE1');
ok(defined $iss1, 1, "Test failed to initialize listening server socket");
$iss1->bind_and_listen();
my $iss6 = new Ham::APRS::IS_Fake('[::1]:54153', 'CORE6');
ok(defined $iss6, 1, "Test failed to initialize listening server socket on IPv6");
$iss6->bind_and_listen();
@ -56,12 +52,6 @@ my $server_call = "TESTING";
my $i_rx = new Ham::APRS::IS("localhost:55152", $login);
ok(defined $i_rx, 1, "Failed to initialize Ham::APRS::IS");
my $is1 = $iss1->accept();
ok(defined $is1, (1), "Failed to accept connection 1 from server");
$iss1->send_login_prompt($is1);
my $log1 = $is1->getline_noncomment(1);
$iss1->send_login_ok($is1);
my $is6 = $iss6->accept();
ok(defined $is6, (1), "Failed to accept connection ipv6 from server");
$iss6->send_login_prompt($is6);
@ -74,16 +64,13 @@ ok($ret, 1, "Failed to connect to the server: " . $i_rx->{'error'});
# do the actual tests
# (1):
#istest::txrx(\&ok, $is1, $i_rx,
# "SRC>DST,DIGI1,DIGI2*,qAI,FOOBA,BLAA:testing qAI (1)",
# "SRC>DST,DIGI1,DIGI2*,qAI,FOOBA,BLAA,7F000001,$server_call:testing qAI (1)");
my $maxlen = 509;
$maxlen = 510 if ($ENV{'TEST_PRODUCT'} eq 'javap');
# (1):
foreach my $packet (@packets) {
my $expect = $packet;
$expect =~ s/:/,00000000000000000000000000000001,$server_call:/;
if (length($expect) > 509) {
if (length($expect) > $maxlen) {
$is6->sendline($packet);
ok(1);
} else {
@ -96,8 +83,6 @@ foreach my $packet (@packets) {
my $read1;
$read1 = $is6->getline_noncomment(1);
ok($read1, undef, "Ouch, received data from read-only upstream connection ipv6");
$read1 = $is1->getline_noncomment(1);
ok($read1, undef, "Ouch, received data from read-only upstream connection 1");
$read1 = $i_rx->getline_noncomment(1);
ok($read1, undef, "Ouch, received unexpected data from full stream");

View File

@ -33,7 +33,7 @@ BEGIN {
push @packets, $packet;
}
plan tests => 9 + ($#packets+1) + 3 + 2;
plan tests => 8 + ($#packets+1) + 2 + 2;
};
ok(1); # If we made it this far, we're ok.
@ -56,11 +56,11 @@ my $server_call = "TESTING";
my $i_rx = new Ham::APRS::IS("localhost:55152", $login);
ok(defined $i_rx, 1, "Failed to initialize Ham::APRS::IS");
my $is1 = $iss1->accept();
ok(defined $is1, (1), "Failed to accept connection 1 from server");
$iss1->send_login_prompt($is1);
my $log1 = $is1->getline_noncomment(1);
$iss1->send_login_ok($is1);
#my $is1 = $iss1->accept();
#ok(defined $is1, (1), "Failed to accept connection 1 from server");
#$iss1->send_login_prompt($is1);
#my $log1 = $is1->getline_noncomment(1);
#$iss1->send_login_ok($is1);
my $is6 = $iss6->accept();
ok(defined $is6, (1), "Failed to accept connection ipv6 from server");
@ -74,18 +74,20 @@ ok($ret, 1, "Failed to connect to the server: " . $i_rx->{'error'});
# do the actual tests
# (1):
#istest::txrx(\&ok, $is1, $i_rx,
# "SRC>DST,DIGI1,DIGI2*,qAI,FOOBA,BLAA:testing qAI (1)",
# "SRC>DST,DIGI1,DIGI2*,qAI,FOOBA,BLAA,7F000001,$server_call:testing qAI (1)");
my $maxlen = 509;
$maxlen = 510 if ($ENV{'TEST_PRODUCT'} eq 'javap');
# (1):
foreach my $packet (@packets) {
my $expect = $packet;
$expect =~ s/IGATE:/IGATE,00000000000000000000000000000001,$server_call:/;
if (length($expect) > 509) {
$is6->sendline($packet);
ok(1);
if (length($expect) > $maxlen) {
my $res = $is6->sendline($packet);
if ($res) {
ok(1);
} else {
ok(undef, 1, "Ouch, write to server failed");
}
} else {
istest::txrx(\&ok, $is6, $i_rx,
$packet,
@ -96,8 +98,8 @@ foreach my $packet (@packets) {
my $read1;
$read1 = $is6->getline_noncomment(1);
ok($read1, undef, "Ouch, received data from read-only upstream connection ipv6");
$read1 = $is1->getline_noncomment(1);
ok($read1, undef, "Ouch, received data from read-only upstream connection 1");
#$read1 = $is1->getline_noncomment(1);
#ok($read1, undef, "Ouch, received data from read-only upstream connection 1");
$read1 = $i_rx->getline_noncomment(1);
ok($read1, undef, "Ouch, received unexpected data from full stream");

171
tests/t/40messaging.t Normal file
View File

@ -0,0 +1,171 @@
#
# Test messaging features:
#
# On a filtered igate port (14580), no messages should come out at first.
# When a position of a station has been heard, messages for that station
# should come out, together with a complementary position packet
# of the originating packet (but not too often?).
#
# Messages transmitted to any SSID must be passed.
#
# Are messages transmitted to objects passed, too?
#
# When a position has been heard, positions for the same callsign-ssid
# from other igates should come out too, to assist TX igates to know
# the station is on the Internet.
#
use Test;
BEGIN { plan tests => 7 + 5 + 2 + 2 + 6 + 4 };
use runproduct;
use istest;
use Ham::APRS::IS;
use Encode;
use utf8;
my $enc_utf8 = find_encoding("UTF-8") || die "Could not load encoding UTF-8"; # note: strict UTF-8
ok(1); # If we made it this far, we're ok.
my $p = new runproduct('basic');
ok(defined $p, 1, "Failed to initialize product runner");
ok($p->start(), 1, "Failed to start product");
my $server_call = "TESTING";
my $login_tx = "N0GATE";
my $i_tx = new Ham::APRS::IS("localhost:55580", $login_tx,
'filter' => 'r/60.4752/25.0947/1');
ok(defined $i_tx, 1, "Failed to initialize Ham::APRS::IS");
# We set a filter on the rx so that the helper packets get through
my $login_rx = "N1GATE";
my $i_rx = new Ham::APRS::IS("localhost:55580", $login_rx,
'filter' => 'r/60.4752/25.0947/1');
ok(defined $i_rx, 1, "Failed to initialize Ham::APRS::IS");
my $ret;
$ret = $i_tx->connect('retryuntil' => 8);
ok($ret, 1, "Failed to connect to the server: " . $i_tx->{'error'});
$ret = $i_rx->connect('retryuntil' => 8);
ok($ret, 1, "Failed to connect to the server: " . $i_rx->{'error'});
# do the actual tests
my $msg_src = "M1SRC";
my $msg_dst = "M1DST";
my($tx, $rx, $helper);
# first, verify that a message packet is not passed to a filtered port
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,$login_tx,I::%-9.9s:message", $msg_dst);
$helper = "H1LP>APRS,OH2RDG*,WIDE:!6028.51N/02505.68E# should pass";
istest::should_drop(\&ok, $i_tx, $i_rx, $tx, $helper);
# now, transmit a position packet on the receiving filtered port
$tx = "$msg_dst>APRS,OH2RDG*,WIDE,$login_rx,I:!6028.51N/02505.68E# should pass";
$rx = "$msg_dst>APRS,OH2RDG*,WIDE,qAR,$login_rx:!6028.51N/02505.68E# should pass";
istest::txrx(\&ok, $i_rx, $i_tx, $tx, $rx);
# then, a message packet should magically pass!
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,%s,I::%-9.9s:message", $login_tx, $msg_dst);
$rx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,qAR,%s::%-9.9s:message", $login_tx, $msg_dst);
istest::txrx(\&ok, $i_tx, $i_rx, $tx, $rx);
# Another message! With UTF-8 content.
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,%s,I::%-9.9s:Blää blåå 日本語{1d", $login_tx, $msg_dst);
$rx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,qAR,%s::%-9.9s:Blää blåå 日本語{1d", $login_tx, $msg_dst);
$tx = $enc_utf8->encode($tx);
$rx = $enc_utf8->encode($rx);
istest::txrx(\&ok, $i_tx, $i_rx, $tx, $rx);
# Also, it should pass to another SSID!
# NO, javaprssrvr does not pass this.
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,%s,I::%-9.9s:message with SSID{a", $login_tx, $msg_dst . '-5');
#$rx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,qAR,%s::%-9.9s:message with SSID{a", $login_tx, $msg_dst . '-5');
#istest::txrx(\&ok, $i_tx, $i_rx, $tx, $rx);
$helper = "H1LP>APRS,OH2RDG*,WIDE:!6028.51N/02505.68E# should pass5";
istest::should_drop(\&ok, $i_tx, $i_rx, $tx, $helper);
#
# Message to an OBJECT
#
my $msg_obj = 'OBJDST';
# transmit the object on the receiving filtered port
$tx = sprintf("$msg_dst>APRS,OH2RDG*,WIDE,$login_rx,I:;%-9.9s*111111z6028.51N/02505.68Ercomment", $msg_obj);
$rx = sprintf("$msg_dst>APRS,OH2RDG*,WIDE,qAR,$login_rx:;%-9.9s*111111z6028.51N/02505.68Ercomment", $msg_obj);
istest::txrx(\&ok, $i_rx, $i_tx, $tx, $rx);
# no, it should not pass at the moment
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,%s,I::%-9.9s:message to object", $login_tx, $msg_obj);
#$rx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,qAR,%s::%-9.9s:message to object", $login_tx, $msg_obj);
#istest::txrx(\&ok, $i_tx, $i_rx, $tx, $rx);
$helper = "H1LP>APRS,OH2RDG*,WIDE:!6028.51N/02505.68E# should pass6";
istest::should_drop(\&ok, $i_tx, $i_rx, $tx, $helper);
#
# Message to an ITEM
#
my $msg_item = 'ITEDST';
# transmit the item on the receiving filtered port
$tx = sprintf("$msg_dst>APRS,OH2RDG*,WIDE,$login_rx,I:)%s!6028.51N/02505.68Ercomment", $msg_item);
$rx = sprintf("$msg_dst>APRS,OH2RDG*,WIDE,qAR,$login_rx:)%s!6028.51N/02505.68Ercomment", $msg_item);
istest::txrx(\&ok, $i_rx, $i_tx, $tx, $rx);
# no, it should not pass at the moment
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,%s,I::%-9.9s:message to item", $login_tx, $msg_item);
#$rx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,qAR,%s::%-9.9s:message to item", $login_tx, $msg_item);
#istest::txrx(\&ok, $i_tx, $i_rx, $tx, $rx);
$helper = "H1LP>APRS,OH2RDG*,WIDE:!6028.51N/02505.68E# should pass7";
istest::should_drop(\&ok, $i_tx, $i_rx, $tx, $helper);
#
# Connect another igate and see what happens when there are
# two gates hearing the same station!
#
# We set a filter on the rx so that the helper packets get through
my $login_rx2 = "N2GATE";
my $i_rx2 = new Ham::APRS::IS("localhost:55580", $login_rx2,
'filter' => 'r/60.4752/25.0947/1');
ok(defined $i_rx2, 1, "Failed to initialize Ham::APRS::IS");
$ret = $i_rx2->connect('retryuntil' => 8);
ok($ret, 1, "Failed to connect to the server: " . $i_rx2->{'error'});
# Now, transmit a position packet on the second receiving filtered port.
# It will come out on the first receiving filtered port due to the
# range filter *and* due to it being heard there, too.
$tx = "$msg_dst>APRS,OH2RDG*,WIDE,$login_rx,I:!6028.51N/02505.68E# should pass 2nd";
$rx = "$msg_dst>APRS,OH2RDG*,WIDE,qAr,$login_rx:!6028.51N/02505.68E# should pass 2nd";
istest::txrx(\&ok, $i_rx2, $i_tx, $tx, $rx);
my $read1 = $i_rx->getline_noncomment(1);
ok($read1, $rx, "Got wrong line from first rx port");
# then, a message packet should magically pass! To both!
$tx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,%s,I::%-9.9s:two gates", $login_tx, $msg_dst);
$rx = sprintf("$msg_src>APRS,OH2RDG*,WIDE,qAR,%s::%-9.9s:two gates", $login_tx, $msg_dst);
istest::txrx(\&ok, $i_tx, $i_rx2, $tx, $rx);
$read1 = $i_rx->getline_noncomment(1);
ok($read1, $rx, "Got wrong message line from first rx port");
# disconnect
$ret = $i_rx->disconnect();
ok($ret, 1, "Failed to disconnect from the server: " . $i_rx->{'error'});
$ret = $i_rx2->disconnect();
ok($ret, 1, "Failed to disconnect from the server: " . $i_rx2->{'error'});
$ret = $i_tx->disconnect();
ok($ret, 1, "Failed to disconnect from the server: " . $i_tx->{'error'});
# stop
ok($p->stop(), 1, "Failed to stop product");