Merge pull request #71 from hessu/feature/sctp
sctp: Improve SCTP performance and logging, mark SCTP in status web
This commit is contained in:
commit
f4d444c277
|
|
@ -16,7 +16,7 @@ jobs:
|
|||
|
||||
- name: install deps
|
||||
run: |
|
||||
sudo apt-get install -y libevent-dev perl-modules libio-socket-inet6-perl libjson-xs-perl libwww-perl
|
||||
sudo apt-get install -y libevent-dev perl-modules libio-socket-inet6-perl libjson-xs-perl libwww-perl libsctp-dev
|
||||
|
||||
- name: configure
|
||||
run: |
|
||||
|
|
|
|||
|
|
@ -32,6 +32,16 @@ worker.h
|
|||
marks duplicate packets as such. Dupecheck returns the packets to
|
||||
the workers which then pass on them to the right clients.
|
||||
|
||||
ssl.c
|
||||
SSL/TLS encryption and authentication code for APRS-IS sockets.
|
||||
|
||||
sctp.c
|
||||
SCTP protocol support. sctp(7) says: "Like TCP, SCTP provides
|
||||
reliable, connection oriented data delivery with congestion control.
|
||||
Unlike TCP, SCTP also provides message boundary preservation,
|
||||
ordered and unordered message delivery, multi-streaming and
|
||||
multi-homing."
|
||||
|
||||
login.c
|
||||
Contains a login_handler() function which is called by the
|
||||
worker thread to process an incoming "user" command from
|
||||
|
|
|
|||
|
|
@ -1590,7 +1590,10 @@ int accept_listener_status(cJSON *listeners, cJSON *totals)
|
|||
cJSON_AddNumberToObject(jl, "fd", l->fd);
|
||||
cJSON_AddNumberToObject(jl, "id", l->id);
|
||||
cJSON_AddStringToObject(jl, "name", l->name);
|
||||
cJSON_AddStringToObject(jl, "proto", (l->udp) ? "udp" : "tcp");
|
||||
if (l->ai_protocol == IPPROTO_SCTP)
|
||||
cJSON_AddStringToObject(jl, "proto", "sctp");
|
||||
else
|
||||
cJSON_AddStringToObject(jl, "proto", (l->udp) ? "udp" : "tcp");
|
||||
cJSON_AddStringToObject(jl, "addr", l->addr_s);
|
||||
if (l->filter_s)
|
||||
cJSON_AddStringToObject(jl, "filter", l->filter_s);
|
||||
|
|
|
|||
|
|
@ -2,12 +2,12 @@ Source: aprsc
|
|||
Section: hamradio
|
||||
Priority: extra
|
||||
Maintainer: Heikki Hannikainen <hessu@hes.iki.fi>
|
||||
Build-Depends: debhelper (>= 4), fakeroot (>= 1), lsb-release (>= 3), lsb-base (>= 3), libevent-dev, libssl-dev, libcap-dev, libz-dev
|
||||
Build-Depends: debhelper (>= 4), fakeroot (>= 1), lsb-release (>= 3), lsb-base (>= 3), libevent-dev, libssl-dev, libcap-dev, libz-dev, libsctp-dev
|
||||
Standards-Version: 3.7.2
|
||||
|
||||
Package: aprsc
|
||||
Architecture: any
|
||||
Depends: ${shlibs:Depends}, adduser (>= 2), libwww-perl (>= 5), libjson-xs-perl (>= 2), libcap2-bin (>= 2)
|
||||
Depends: ${shlibs:Depends}, adduser (>= 2), libwww-perl (>= 5), libjson-xs-perl (>= 2), libcap2-bin (>= 2), libsctp1 (>= 1)
|
||||
Description: APRS-IS server
|
||||
aprsc is an APRS-IS server intended to be used on core and Tier2 servers.
|
||||
.
|
||||
|
|
|
|||
227
src/sctp.c
227
src/sctp.c
|
|
@ -36,6 +36,12 @@ int sctp_set_client_sockopt(struct client_t *c)
|
|||
hlog(LOG_ERR, "setsockopt(%s, SCTP_DEFAULT_SEND_PARAM): %s", c->addr_rem, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
int enable = 1;
|
||||
if (setsockopt(c->fd, IPPROTO_SCTP, SCTP_NODELAY, &enable, sizeof(enable)) == -1) {
|
||||
hlog(LOG_ERR, "setsockopt(%s, SCTP_NODELAY): %s", c->addr_rem, strerror(errno));
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* which notifications do we want? */
|
||||
struct sctp_event_subscribe subscribe;
|
||||
|
|
@ -86,10 +92,10 @@ static int sctp_rx_assoc_change(struct client_t *c, union sctp_notification *sn)
|
|||
{
|
||||
switch (sn->sn_assoc_change.sac_state) {
|
||||
case SCTP_COMM_UP:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_COMM_UP", c->addr_rem, c->username);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP COMM_UP - connection established", c->addr_rem, c->username);
|
||||
break;
|
||||
case SCTP_COMM_LOST:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_COMM_LOST", c->addr_rem, c->username);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP COMM_LOST - connection failed", c->addr_rem, c->username);
|
||||
break;
|
||||
case SCTP_RESTART:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_RESTART", c->addr_rem, c->username);
|
||||
|
|
@ -118,22 +124,22 @@ static int sctp_rx_peer_addr_change(struct client_t *c, union sctp_notification
|
|||
|
||||
switch (sn->sn_paddr_change.spc_state) {
|
||||
case SCTP_ADDR_AVAILABLE:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_ADDR_AVAILABLE: %s", c->addr_rem, c->username, addr_s);
|
||||
hlog(LOG_DEBUG, "%s/%s: SCTP peer address available: %s", c->addr_rem, c->username, addr_s);
|
||||
break;
|
||||
case SCTP_ADDR_UNREACHABLE:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_ADDR_UNREACHABLE: %s", c->addr_rem, c->username, addr_s);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP peer address unreachable: %s", c->addr_rem, c->username, addr_s);
|
||||
break;
|
||||
case SCTP_ADDR_REMOVED:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_ADDR_REMOVED: %s", c->addr_rem, c->username, addr_s);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP peer address removed: %s", c->addr_rem, c->username, addr_s);
|
||||
break;
|
||||
case SCTP_ADDR_ADDED:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_ADDR_ADDED: %s", c->addr_rem, c->username, addr_s);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP peer address added: %s", c->addr_rem, c->username, addr_s);
|
||||
break;
|
||||
case SCTP_ADDR_MADE_PRIM:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_ADDR_MADE_PRIM: %s", c->addr_rem, c->username, addr_s);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP peer address made primary: %s", c->addr_rem, c->username, addr_s);
|
||||
break;
|
||||
case SCTP_ADDR_CONFIRMED:
|
||||
hlog(LOG_DEBUG, "%s/%s: Received SCTP_ADDR_CONFIRMED: %s", c->addr_rem, c->username, addr_s);
|
||||
hlog(LOG_INFO, "%s/%s: SCTP peer address confirmed: %s", c->addr_rem, c->username, addr_s);
|
||||
break;
|
||||
default:
|
||||
hlog(LOG_DEBUG, "%s/%s: SCTP Received unexpected peer_addr_change %d: %s",
|
||||
|
|
@ -157,8 +163,8 @@ static int sctp_rx_notification(struct client_t *c, struct msghdr *m)
|
|||
case SCTP_SHUTDOWN_EVENT: {
|
||||
struct sctp_shutdown_event *shut;
|
||||
shut = (struct sctp_shutdown_event *)m->msg_iov->iov_base;
|
||||
hlog(LOG_INFO, "SCTP shutdown on assoc id %d", shut->sse_assoc_id);
|
||||
break;
|
||||
hlog(LOG_INFO, "%s/%s: SCTP shutdown on assoc id %d", c->addr_rem, c->username, shut->sse_assoc_id);
|
||||
return -1;
|
||||
}
|
||||
case SCTP_ASSOC_CHANGE:
|
||||
return sctp_rx_assoc_change(c, sn);
|
||||
|
|
@ -173,11 +179,20 @@ static int sctp_rx_notification(struct client_t *c, struct msghdr *m)
|
|||
case SCTP_PARTIAL_DELIVERY_EVENT:
|
||||
hlog(LOG_DEBUG, "%s/%s: SCTP partial delivery event", c->addr_rem, c->username);
|
||||
return 0;
|
||||
case SCTP_ADAPTATION_INDICATION:
|
||||
hlog(LOG_DEBUG, "%s/%s: SCTP adaptation indication", c->addr_rem, c->username);
|
||||
return 0;
|
||||
case SCTP_AUTHENTICATION_INDICATION:
|
||||
hlog(LOG_DEBUG, "%s/%s: SCTP authentication indication", c->addr_rem, c->username);
|
||||
return 0;
|
||||
case SCTP_SENDER_DRY_EVENT:
|
||||
hlog(LOG_DEBUG, "%s/%s: SCTP sender dry", c->addr_rem, c->username);
|
||||
return 0;
|
||||
default:
|
||||
hlog(LOG_ERR, "%s/%s: sctp_rx_notification: Received unexpected notification: %d",
|
||||
c->addr_rem, c->username, sn->sn_header.sn_type);
|
||||
};
|
||||
|
||||
hlog(LOG_ERR, "%s/%s: sctp_rx_notification: Received unexpected notification: %d",
|
||||
c->addr_rem, c->username, sn->sn_header.sn_type);
|
||||
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
|
@ -225,28 +240,25 @@ int sctp_readable(struct worker_t *self, struct client_t *c)
|
|||
return 0;
|
||||
}
|
||||
|
||||
hlog(LOG_INFO, "sctp_readable: recvmsg returned %d: %s", e, strerror(errno));
|
||||
|
||||
hlog(LOG_INFO, "%s/%s: sctp_readable: recvmsg returned %d: %s", c->addr_rem, c->username, e, strerror(errno));
|
||||
client_close(self, c, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (e == 0) {
|
||||
hlog( LOG_DEBUG, "sctp_readable: EOF from socket fd %d (%s @ %s)",
|
||||
c->fd, c->addr_rem, c->addr_loc );
|
||||
hlog(LOG_DEBUG, "%s/%s: sctp_readable: EOF from socket fd %d",
|
||||
c->addr_rem, c->username, c->fd);
|
||||
client_close(self, c, CLIERR_EOF);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (inmsg.msg_flags & MSG_NOTIFICATION) {
|
||||
hlog(LOG_DEBUG, "sctp_readable: got MSG_NOTIFICATION");
|
||||
//hlog(LOG_DEBUG, "%s/%s: sctp_readable: got MSG_NOTIFICATION", c->addr_rem, c->username);
|
||||
sctp_rx_notification(c, &inmsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
//hlog_packet(LOG_DEBUG, iov.iov_base, e, "sctp_readable: got data: ");
|
||||
c->ibuf[e++] = '\r';
|
||||
c->ibuf[e++] = '\n';
|
||||
|
||||
return client_postread(self, c, e);
|
||||
}
|
||||
|
|
@ -268,146 +280,51 @@ int sctp_writable(struct worker_t *self, struct client_t *c)
|
|||
|
||||
int sctp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
|
||||
{
|
||||
//hlog_packet(LOG_DEBUG, p, len, "client_write_sctp %d bytes: ", len);
|
||||
//hlog_packet(LOG_DEBUG, p, len, "%s/%s: client_write_sctp %d bytes: ", c->addr_rem, c->username, len);
|
||||
|
||||
if (len == 0)
|
||||
return 0;
|
||||
if (len > 0) {
|
||||
c->obuf_writes++;
|
||||
if (client_buffer_outgoing_data(self, c, p, len) == -12)
|
||||
return -12;
|
||||
clientaccount_add( c, IPPROTO_SCTP, 0, 0, len, 0, 0, 0);
|
||||
if (c->obuf_writes > obuf_writes_threshold) {
|
||||
// Lots and lots of writes, switch to buffering...
|
||||
if (c->obuf_flushsize == 0) {
|
||||
c->obuf_flushsize = c->obuf_size / 2;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/* Is it over the flush size ? */
|
||||
if (c->obuf_end > c->obuf_flushsize || ((len == 0) && (c->obuf_end > c->obuf_start))) {
|
||||
int to_send = c->obuf_end - c->obuf_start;
|
||||
int i = send(c->fd, c->obuf + c->obuf_start, to_send, 0);
|
||||
|
||||
clientaccount_add( c, IPPROTO_SCTP, 0, 0, len, 0, 0, 0);
|
||||
|
||||
int i = send(c->fd, p, len-2, 0);
|
||||
|
||||
if (i < 0) {
|
||||
hlog(LOG_ERR, "SCTP transmit error to fd %d / %s: %s",
|
||||
c->fd, c->addr_rem, strerror(errno));
|
||||
} else if (i != len -2) {
|
||||
hlog(LOG_ERR, "SCTP transmit incomplete to fd %d / %s: wrote %d of %d bytes, errno: %s",
|
||||
c->fd, c->addr_rem, i, len-2, strerror(errno));
|
||||
} else {
|
||||
//hlog(LOG_DEBUG, "SCTP transmit ok to %s: %d bytes", c->addr_rem, i);
|
||||
if (i < 0) {
|
||||
hlog(LOG_ERR, "%s/%s: SCTP transmit error to fd %d: %s",
|
||||
c->addr_rem, c->username, c->fd, strerror(errno));
|
||||
client_close(self, c, errno);
|
||||
return -9;
|
||||
} else if (i != to_send) {
|
||||
// Incomplete write with SCTP is not great, as we might not have ordered delivery.
|
||||
hlog(LOG_ERR, "%s/%s: SCTP transmit incomplete to fd %d: wrote %d of %d bytes, errno: %s",
|
||||
c->addr_rem, c->username, c->fd, i, to_send, strerror(errno));
|
||||
client_close(self, c, errno);
|
||||
return -9;
|
||||
} else {
|
||||
//hlog(LOG_DEBUG, "%s/%s: SCTP transmit ok: %d bytes", c->addr_rem, c->username, i);
|
||||
}
|
||||
c->obuf_start += i;
|
||||
c->obuf_wtime = tick;
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
|
||||
#if 0
|
||||
|
||||
|
||||
/*
|
||||
* SCTP notification received
|
||||
*/
|
||||
|
||||
static int sctp_rx_assoc_change(struct listen_t *l, union sctp_notification *sn)
|
||||
{
|
||||
switch (sn->sn_assoc_change.sac_state) {
|
||||
case SCTP_COMM_UP:
|
||||
hlog(LOG_DEBUG, "Received SCTP_COMM_UP");
|
||||
break;
|
||||
case SCTP_COMM_LOST:
|
||||
hlog(LOG_DEBUG, "Received SCTP_COMM_LOST");
|
||||
break;
|
||||
case SCTP_RESTART:
|
||||
hlog(LOG_DEBUG, "Received SCTP_RESTART");
|
||||
break;
|
||||
case SCTP_SHUTDOWN_COMP:
|
||||
hlog(LOG_DEBUG, "Received SCTP_SHUTDOWN_COMP");
|
||||
break;
|
||||
case SCTP_CANT_STR_ASSOC:
|
||||
hlog(LOG_DEBUG, "Received SCTP_CANT_STR_ASSOC");
|
||||
break;
|
||||
default:
|
||||
hlog(LOG_DEBUG, "Received assoc_change %d", sn->sn_assoc_change.sac_state);
|
||||
break;
|
||||
/* All done ? */
|
||||
if (c->obuf_start >= c->obuf_end) {
|
||||
//hlog(LOG_DEBUG, "%s/%s: client_write obuf empty", c->addr_rem, c->username, c->addr_rem);
|
||||
c->obuf_start = 0;
|
||||
c->obuf_end = 0;
|
||||
}
|
||||
|
||||
if (sn->sn_assoc_change.sac_state == SCTP_COMM_UP)
|
||||
return sn->sn_assoc_change.sac_assoc_id;
|
||||
|
||||
return 0;
|
||||
|
||||
return len;
|
||||
}
|
||||
|
||||
static int sctp_rx_notification(struct listen_t *l, struct msghdr *m)
|
||||
{
|
||||
union sctp_notification *sn;
|
||||
|
||||
sn = (union sctp_notification *)m->msg_iov->iov_base;
|
||||
|
||||
switch(sn->sn_header.sn_type) {
|
||||
case SCTP_SHUTDOWN_EVENT: {
|
||||
struct sctp_shutdown_event *shut;
|
||||
shut = (struct sctp_shutdown_event *)m->msg_iov->iov_base;
|
||||
hlog(LOG_INFO, "SCTP shutdown on assoc id %d", shut->sse_assoc_id);
|
||||
break;
|
||||
}
|
||||
case SCTP_ASSOC_CHANGE:
|
||||
return sctp_rx_assoc_change(l, sn);
|
||||
};
|
||||
|
||||
hlog(LOG_ERR, "sctp_rx_notification: Received unexpected notification: %d", sn->sn_header.sn_type);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive something on an SCTP socket
|
||||
*/
|
||||
|
||||
|
||||
typedef union {
|
||||
struct sctp_initmsg init;
|
||||
struct sctp_sndrcvinfo sndrcvinfo;
|
||||
} _sctp_cmsg_data_t;
|
||||
|
||||
typedef union {
|
||||
struct sockaddr_storage ss;
|
||||
struct sockaddr_in v4;
|
||||
struct sockaddr_in6 v6;
|
||||
struct sockaddr sa;
|
||||
} sockaddr_storage_t;
|
||||
|
||||
static void accept_sctp(struct listen_t *l)
|
||||
{
|
||||
int e;
|
||||
struct msghdr inmsg;
|
||||
char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
|
||||
sockaddr_storage_t msgname;
|
||||
struct iovec iov;
|
||||
char buf[2000];
|
||||
|
||||
/* space to receive data */
|
||||
iov.iov_base = buf;
|
||||
iov.iov_len = sizeof(buf);
|
||||
inmsg.msg_flags = 0;
|
||||
inmsg.msg_iov = &iov;
|
||||
inmsg.msg_iovlen = 1;
|
||||
/* or control messages */
|
||||
inmsg.msg_control = incmsg;
|
||||
inmsg.msg_controllen = sizeof(incmsg);
|
||||
inmsg.msg_name = &msgname;
|
||||
inmsg.msg_namelen = sizeof(msgname);
|
||||
|
||||
e = recvmsg(l->fd, &inmsg, MSG_WAITALL);
|
||||
if (e < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
hlog(LOG_DEBUG, "accept_sctp: EAGAIN");
|
||||
return;
|
||||
}
|
||||
|
||||
hlog(LOG_INFO, "accept_sctp: recvmsg returned %d: %s", e, strerror(errno));
|
||||
}
|
||||
|
||||
if (inmsg.msg_flags & MSG_NOTIFICATION) {
|
||||
hlog(LOG_DEBUG, "accept_sctp: got MSG_NOTIFICATION");
|
||||
int associd = sctp_rx_notification(l, &inmsg);
|
||||
} else {
|
||||
hlog_packet(LOG_DEBUG, iov.iov_base, e, "accept_sctp: got data: ");
|
||||
}
|
||||
|
||||
}
|
||||
#endif
|
||||
|
||||
|
||||
#endif
|
||||
|
|
|
|||
|
|
@ -153,7 +153,7 @@
|
|||
<tr ng-repeat="c in status.uplinks">
|
||||
<td class='username'>{{ c.username }}</td>
|
||||
<td><a ng-href="{{ server_status_href(c) }}">{{ c.addr_rem }}</a></td>
|
||||
<td>{{ c.mode }}</td>
|
||||
<td>{{ c.mode }} <span ng-show="c.proto == 'sctp'">SCTP</span></td>
|
||||
<td>{{ c.t_connect | datetime }}</td>
|
||||
<td>{{ c.since_connect | duration }}</td>
|
||||
<td>{{ c.since_last_read | duration }}</td>
|
||||
|
|
@ -212,6 +212,7 @@
|
|||
<td><a ng-show="linkable(c)" ng-href="{{ server_status_href(c) }}">{{ c.addr_rem }}</a>
|
||||
<span ng-show="!linkable(c)">{{ c.addr_rem }}</span>
|
||||
<span ng-show="c.udp_downstream">+UDP</span>
|
||||
<span ng-show="c.proto == 'sctp'">SCTP</span>
|
||||
</td>
|
||||
<td><span ng-show='c.verified == 0' class='red'>No</span>
|
||||
<span ng-show='c.verified == 1'>Yes</span>
|
||||
|
|
|
|||
11
src/worker.c
11
src/worker.c
|
|
@ -960,7 +960,7 @@ static char *client_buffer_outgoing_allocate(struct client_t *c, int len)
|
|||
return c->obuf + c->obuf_end;
|
||||
}
|
||||
|
||||
static int client_buffer_outgoing_data(struct worker_t *self, struct client_t *c, char *p, int len)
|
||||
int client_buffer_outgoing_data(struct worker_t *self, struct client_t *c, char *p, int len)
|
||||
{
|
||||
char *buf_tail = client_buffer_outgoing_allocate(c, len);
|
||||
|
||||
|
|
@ -1662,7 +1662,7 @@ static void send_keepalives(struct worker_t *self)
|
|||
c->obuf_flushsize = flushlevel;
|
||||
} else {
|
||||
/* just fush if there was anything to write */
|
||||
if (c->ai_protocol == IPPROTO_TCP) {
|
||||
if (c->ai_protocol == IPPROTO_TCP || c->ai_protocol == IPPROTO_SCTP) {
|
||||
rc = c->write(self, c, buf, 0);
|
||||
if (rc < -2) continue; // destroyed..
|
||||
}
|
||||
|
|
@ -2116,8 +2116,8 @@ static struct cJSON *worker_client_json(struct client_t *c, int liveup_info)
|
|||
if (c->ibuf_end > 0) {
|
||||
s = hex_encode(c->ibuf, c->ibuf_end);
|
||||
cJSON_AddStringToObject(jc, "ibuf", s);
|
||||
hlog(LOG_DEBUG, "Encoded ibuf %d bytes: '%.*s'", c->ibuf_end, c->ibuf_end, c->ibuf);
|
||||
hlog(LOG_DEBUG, "Hex: %s", s);
|
||||
//hlog(LOG_DEBUG, "Encoded ibuf %d bytes: '%.*s'", c->ibuf_end, c->ibuf_end, c->ibuf);
|
||||
//hlog(LOG_DEBUG, "Hex: %s", s);
|
||||
hfree(s);
|
||||
}
|
||||
|
||||
|
|
@ -2193,6 +2193,9 @@ static struct cJSON *worker_client_json(struct client_t *c, int liveup_info)
|
|||
|
||||
cJSON_AddStringToObject(jc, "mode", mode);
|
||||
}
|
||||
|
||||
if (c->ai_protocol == IPPROTO_SCTP)
|
||||
cJSON_AddStringToObject(jc, "proto", "sctp");
|
||||
|
||||
#ifdef USE_SSL
|
||||
if (c->cert_subject[0])
|
||||
|
|
|
|||
|
|
@ -491,6 +491,7 @@ extern void pbuf_dump(FILE *fp);
|
|||
extern void pbuf_dupe_dump(FILE *fp);
|
||||
|
||||
extern int client_postread(struct worker_t *self, struct client_t *c, int r);
|
||||
extern int client_buffer_outgoing_data(struct worker_t *self, struct client_t *c, char *p, int len);
|
||||
|
||||
extern int client_printf(struct worker_t *self, struct client_t *c, const char *fmt, ...);
|
||||
extern int client_write(struct worker_t *self, struct client_t *c, char *p, int len);
|
||||
|
|
@ -507,6 +508,8 @@ extern void workers_start(void);
|
|||
extern int keepalive_interval;
|
||||
extern int fileno_limit;
|
||||
|
||||
extern int obuf_writes_threshold;
|
||||
|
||||
extern struct client_udp_t *udpclients;
|
||||
extern struct client_udp_t *udppeers;
|
||||
extern void client_udp_free(struct client_udp_t *u);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
|
||||
# sctp hub
|
||||
|
||||
ServerId TESTING-1
|
||||
PassCode 31421
|
||||
MyEmail email@example.com
|
||||
MyAdmin "Admin, N0CALL"
|
||||
|
||||
### Directories #########
|
||||
# Data directory (for database files)
|
||||
RunDir data
|
||||
|
||||
### Intervals #########
|
||||
# Interval specification format examples:
|
||||
# 600 (600 seconds), 5m, 2h, 1h30m, 1d3h15m24s, etc...
|
||||
|
||||
# When no data is received from an upstream server in N seconds, switch to
|
||||
# another server
|
||||
UpstreamTimeout 90s
|
||||
|
||||
# When no data is received from a downstream server in N seconds, disconnect
|
||||
ClientTimeout 48h
|
||||
|
||||
### TCP listener ##########
|
||||
# Listen <socketname> <porttype> tcp <address to bind> <port>
|
||||
# socketname: any name you wish to show up in logs and statistics
|
||||
# porttype: one of:
|
||||
# fullfeed - everything, after dupe filtering
|
||||
# dupefeed - everything that comes in - with dupes!
|
||||
# msgonly - messages only
|
||||
# userfilter - user-specified filters
|
||||
#
|
||||
Listen "Full feed" fullfeed tcp ::0 55152 acl "cfg-aprsc/acl-all.acl"
|
||||
Listen "Full feed SCTP" fullfeed sctp ::0 55152 acl "cfg-aprsc/acl-all.acl"
|
||||
Listen "Igate port" igate tcp 0.0.0.0 55582 acl "cfg-aprsc/acl-all.acl"
|
||||
|
||||
### HTTP listener ##########
|
||||
# Status port provides a status view to web browsers.
|
||||
# It starts up by default on 0.0.0.0:14501.
|
||||
HTTPStatus 127.0.0.1 55501
|
||||
|
||||
|
||||
|
|
@ -0,0 +1,45 @@
|
|||
|
||||
# sctp hub
|
||||
|
||||
ServerId TESTING-2
|
||||
PassCode 31421
|
||||
MyEmail email@example.com
|
||||
MyAdmin "Admin, N0CALL"
|
||||
|
||||
### Directories #########
|
||||
# Data directory (for database files)
|
||||
RunDir data
|
||||
|
||||
### Intervals #########
|
||||
# Interval specification format examples:
|
||||
# 600 (600 seconds), 5m, 2h, 1h30m, 1d3h15m24s, etc...
|
||||
|
||||
# When no data is received from an upstream server in N seconds, switch to
|
||||
# another server
|
||||
UpstreamTimeout 90s
|
||||
|
||||
# When no data is received from a downstream server in N seconds, disconnect
|
||||
ClientTimeout 48h
|
||||
|
||||
### TCP/SCTP listener ##########
|
||||
# Listen <socketname> <porttype> tcp <address to bind> <port>
|
||||
# socketname: any name you wish to show up in logs and statistics
|
||||
# porttype: one of:
|
||||
# fullfeed - everything, after dupe filtering
|
||||
# dupefeed - everything that comes in - with dupes!
|
||||
# msgonly - messages only
|
||||
# userfilter - user-specified filters
|
||||
#
|
||||
Listen "Full feed" fullfeed tcp ::0 55153 acl "cfg-aprsc/acl-all.acl"
|
||||
Listen "Full feed SCTP" fullfeed sctp ::0 55153 acl "cfg-aprsc/acl-all.acl"
|
||||
Listen "Igate port" igate tcp 0.0.0.0 55583 acl "cfg-aprsc/acl-all.acl"
|
||||
|
||||
### SCTP uplink
|
||||
Uplink fullsctp full sctp 127.0.0.1 55152
|
||||
|
||||
### HTTP listener ##########
|
||||
# Status port provides a status view to web browsers.
|
||||
# It starts up by default on 0.0.0.0:14501.
|
||||
HTTPStatus 127.0.0.1 55502
|
||||
|
||||
|
||||
|
|
@ -27,7 +27,6 @@ my %products = (
|
|||
'cfgdir' => 'cfg-aprsc',
|
||||
'pidfile' => 'logs/aprsc.pid',
|
||||
'env' => { 'APRSC_NO_VERSION_REPORT' => '1' }
|
||||
|
||||
},
|
||||
'javap' => {
|
||||
'binary' => './javaprssrvr/java',
|
||||
|
|
@ -48,11 +47,11 @@ my %products = (
|
|||
}
|
||||
);
|
||||
|
||||
sub new($$)
|
||||
sub new($$;$)
|
||||
{
|
||||
my($class, $config) = @_;
|
||||
my $self = bless { @_ }, $class;
|
||||
|
||||
my($class, $config, $instance) = @_;
|
||||
my $self = bless { }, $class;
|
||||
|
||||
if (defined $ENV{'TEST_PRODUCT'}) {
|
||||
$self->{'prod_name'} = $ENV{'TEST_PRODUCT'};
|
||||
} else {
|
||||
|
|
@ -78,6 +77,14 @@ sub new($$)
|
|||
|
||||
$self->{'cmdline'} = $prod->{'binary'} . ' ' . $prod->{'stdargs'} . ' '
|
||||
. $prod->{'cfgfileargs'} . ' ' . $cfgfile;
|
||||
|
||||
if (defined $prod->{'pidfile'}) {
|
||||
$self->{'pidfile'} = $prod->{'pidfile'};
|
||||
}
|
||||
if (defined $instance) {
|
||||
$self->{'pidfile'} = $instance . '.pid';
|
||||
$self->{'cmdline'} .= " -p " . $self->{'pidfile'};
|
||||
}
|
||||
|
||||
$self->{'error'} = 'No errors yet';
|
||||
|
||||
|
|
@ -101,8 +108,8 @@ sub start($)
|
|||
return "Product already running.";
|
||||
}
|
||||
|
||||
if (defined $self->{'prod'}->{'pidfile'}) {
|
||||
my $pf = $self->{'prod'}->{'pidfile'};
|
||||
if (defined $self->{'pidfile'}) {
|
||||
my $pf = $self->{'pidfile'};
|
||||
if (open(PF, $pf)) {
|
||||
my $pl = <PF>;
|
||||
close(PF);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,181 @@
|
|||
|
||||
#
|
||||
# Test SCTP, only on aprsc
|
||||
#
|
||||
|
||||
use Test;
|
||||
|
||||
BEGIN {
|
||||
plan tests => (!defined $ENV{'TEST_PRODUCT'} || $ENV{'TEST_PRODUCT'} =~ /aprsc/) ? 4 + 1 + 4 + 3 + 2 : 0;
|
||||
};
|
||||
|
||||
if (defined $ENV{'TEST_PRODUCT'} && $ENV{'TEST_PRODUCT'} !~ /aprsc/) {
|
||||
exit(0);
|
||||
}
|
||||
|
||||
use runproduct;
|
||||
use LWP;
|
||||
use LWP::UserAgent;
|
||||
use HTTP::Request::Common;
|
||||
use JSON::XS;
|
||||
use Ham::APRS::IS;
|
||||
use istest;
|
||||
use Data::Dumper;
|
||||
|
||||
# set up the JSON module
|
||||
my $json = new JSON::XS;
|
||||
|
||||
if (!$json) {
|
||||
die "JSON loading failed";
|
||||
}
|
||||
|
||||
$json->latin1(0);
|
||||
$json->ascii(1);
|
||||
$json->utf8(0);
|
||||
|
||||
# set up http client ############
|
||||
|
||||
my $ua = LWP::UserAgent->new;
|
||||
|
||||
$ua->agent(
|
||||
agent => "httpaprstester/1.0",
|
||||
timeout => 10,
|
||||
max_redirect => 0,
|
||||
);
|
||||
|
||||
# set up two aprsc processes, the leaf connects to the hub with SCTP
|
||||
my $phub = new runproduct('sctp-hub', 'hub');
|
||||
my $pleaf = new runproduct('sctp-leaf', 'leaf');
|
||||
|
||||
ok(defined $phub, 1, "Failed to initialize product runner");
|
||||
ok(defined $pleaf, 1, "Failed to initialize product runner");
|
||||
ok($phub->start(), 1, "Failed to start hub");
|
||||
ok($pleaf->start(), 1, "Failed to start leaf");
|
||||
|
||||
# wait for the leaf to connect to the hub
|
||||
my $timeout_at = time() + 10;
|
||||
|
||||
my $uplink;
|
||||
|
||||
while (time() < $timeout_at) {
|
||||
# first get the status page without any traffic
|
||||
$res = $ua->simple_request(HTTP::Request::Common::GET("http://127.0.0.1:55502/status.json"));
|
||||
if ($res->code ne 200) {
|
||||
continue;
|
||||
}
|
||||
my $j1 = $json->decode($res->decoded_content(charset => 'none'));
|
||||
if (!defined $j1) {
|
||||
continue;
|
||||
}
|
||||
if (!defined $j1->{'uplinks'} || length($j1->{'uplinks'}) < 1) {
|
||||
continue;
|
||||
}
|
||||
$uplink = $j1->{'uplinks'}->[0];
|
||||
if ($uplink->{'username'} eq 'TESTING-1') {
|
||||
last;
|
||||
}
|
||||
undef $uplink;
|
||||
sleep(1);
|
||||
}
|
||||
|
||||
ok(defined $uplink, 1, "Leaf server failed to establish an SCTP uplink with hub");
|
||||
|
||||
# establish connections
|
||||
|
||||
my $login_tx = "N0GAT";
|
||||
my $i_tx = new Ham::APRS::IS("localhost:55583", $login_tx);
|
||||
ok(defined $i_tx, 1, "Failed to initialize Ham::APRS::IS");
|
||||
|
||||
my $login_rx = "N1GAT";
|
||||
my $i_rx = new Ham::APRS::IS("localhost:55152", $login_rx);
|
||||
ok(defined $i_rx, 1, "Failed to initialize Ham::APRS::IS");
|
||||
|
||||
$ret = $i_rx->connect('retryuntil' => 8);
|
||||
ok($ret, 1, "Failed to connect to the server: " . $i_rx->{'error'});
|
||||
|
||||
my $ret;
|
||||
$ret = $i_tx->connect('retryuntil' => 8);
|
||||
ok($ret, 1, "Failed to connect to the server: " . $i_tx->{'error'});
|
||||
|
||||
############################################
|
||||
|
||||
my $flush_interval = 300;
|
||||
my $bytelimit = 4*1024*1024;
|
||||
my $window = 64*1024;
|
||||
#my $window = 2*1024;
|
||||
my $outstanding = 0;
|
||||
my $txn = 0;
|
||||
my $rxn = 0;
|
||||
my $txl = 0;
|
||||
my $rxl = 0;
|
||||
my @l = ();
|
||||
my $txq = '';
|
||||
my $txq_l = 0;
|
||||
|
||||
my $start_t = time();
|
||||
|
||||
while ($txl < $bytelimit) {
|
||||
$s = "M" . ($txn % 10000 + 10) . ">APRS,qAR,$login_tx:!6028.51N/02505.68E# packet $txn blaa blaa blaa blaa END";
|
||||
push @l, $s;
|
||||
$s .= "\r\n";
|
||||
my $sl = length($s);
|
||||
$txl += $sl;
|
||||
$txq_l += $sl;
|
||||
$txq .= $s;
|
||||
$txn++;
|
||||
|
||||
if ($txq_l >= $flush_interval) {
|
||||
$i_tx->sendline($txq, 1);
|
||||
$outstanding += $txq_l;
|
||||
$txq_l = 0;
|
||||
$txq = '';
|
||||
}
|
||||
|
||||
while (($outstanding > $window) && (my $rx = $i_rx->getline_noncomment(1))) {
|
||||
my $exp = shift @l;
|
||||
if ($exp ne $rx) {
|
||||
warn "Ouch, received wrong packet: $rx\nExpected: $exp";
|
||||
}
|
||||
my $rx_l = length($rx) + 2;
|
||||
$outstanding -= $rx_l;
|
||||
$rxn++;
|
||||
$rxl += $rx_l;
|
||||
}
|
||||
}
|
||||
|
||||
if ($txq_l > 0) {
|
||||
warn "flushing the rest\n";
|
||||
$i_tx->sendline($txq, 1);
|
||||
$outstanding += $txq_l;
|
||||
$txq_l = 0;
|
||||
$txq = 0;
|
||||
}
|
||||
|
||||
warn "reading the rest, have received $rxn packets, sent $txn\n";
|
||||
while (($outstanding > 0) && (my $rx = $i_rx->getline_noncomment(2))) {
|
||||
my $exp = shift @l;
|
||||
if ($exp ne $rx) {
|
||||
warn "Ouch, received wrong packet: $rx\n";
|
||||
}
|
||||
my $rx_l = length($rx) + 2;
|
||||
$outstanding -= $rx_l;
|
||||
$rxn++;
|
||||
$rxl += $rx_l;
|
||||
}
|
||||
warn "after reading the rest, have received $rxn packets, sent $txn, outstanding $outstanding bytes\n";
|
||||
|
||||
$end_t = time();
|
||||
$dur_t = $end_t - $start_t;
|
||||
|
||||
warn sprintf("took %.3f seconds, %.0f packets/sec\n", $dur_t, $rxn / $dur_t);
|
||||
|
||||
ok($rxn, $txn, "Received wrong number of lines from blob");
|
||||
ok($rxl, $txl, "Received wrong number of bytes from blob");
|
||||
ok($outstanding, 0, "There are outstanding bytes in the server after timeout");
|
||||
|
||||
# stop
|
||||
|
||||
ok($phub->stop(), 1, "Failed to stop hub");
|
||||
ok($pleaf->stop(), 1, "Failed to stop leaf");
|
||||
|
||||
|
||||
Loading…
Reference in New Issue