UDP client support, sort of. Reference documentation on it is unclear...

git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@235 3ce903b1-3385-4e86-93cd-f9a4a239f7ac
This commit is contained in:
Matti Aarnio 2008-03-24 03:44:09 +00:00
parent b1c885b9d8
commit b2561d6d0f
6 changed files with 240 additions and 15 deletions

View File

@ -47,6 +47,7 @@
#include "dupecheck.h"
#include "filter.h"
#include "login.h"
#include "incoming.h" /* incoming_handler prototype */
#include "uplink.h"
extern int uplink_simulator;
@ -115,7 +116,7 @@ int accept_sighandler(int signum)
}
/*
* Open the TCP listening socket
* Open the TCP/SCTP listening socket
*/
int open_tcp_listener(struct listen_t *l)
@ -131,10 +132,9 @@ int open_tcp_listener(struct listen_t *l)
}
arg = 1;
setsockopt(f, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg));
#ifdef SO_REUSEPORT
setsockopt(f, SOL_SOCKET, SO_REUSEPORT, (char *)&arg, sizeof(arg));
#else
setsockopt(f, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg));
#endif
if (bind(f, l->ai->ai_addr, l->ai->ai_addrlen)) {
@ -154,6 +154,144 @@ int open_tcp_listener(struct listen_t *l)
return f;
}
/*
* Open the UDP receiving socket
*/
int open_udp_listener(struct listen_t *l)
{
int arg;
int fd, i;
struct client_t *c;
union sockaddr_u sa; /* large enough for also IPv6 address */
socklen_t addr_len = sizeof(sa);
char eb[200];
char sbuf[20];
char *s;
if (udpclient) {
hlog(LOG_ERR, "Multiple UDP client listeners defined! %s %s", l->name, l->addr_s);
return -1;
}
hlog(LOG_INFO, "Binding listening UDP socket: %s", l->addr_s);
if ((fd = socket(l->ai->ai_family, l->ai->ai_socktype, l->ai->ai_protocol)) < 0) {
hlog(LOG_CRIT, "socket(): %s\n", strerror(errno));
return -1;
}
arg = 1;
setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, (char *)&arg, sizeof(arg));
#ifdef SO_REUSEPORT
setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, (char *)&arg, sizeof(arg));
#endif
memcpy( &sa, l->ai->ai_addr, l->ai->ai_addrlen );
if (bind(fd, l->ai->ai_addr, l->ai->ai_addrlen)) {
hlog(LOG_CRIT, "bind(%s): %s", l->addr_s, strerror(errno));
close(fd);
return -1;
}
eb[0] = '[';
eb[1] = 0;
*sbuf = 0;
getnameinfo( (struct sockaddr *)&sa, addr_len,
eb+1, sizeof(eb)-1, sbuf, sizeof(sbuf), NI_NUMERICHOST|NI_NUMERICSERV );
s = eb + strlen(eb);
sprintf(s, "]:%s", sbuf);
c = client_alloc();
c->fd = fd;
c->addr = sa;
c->state = CSTATE_UDP;
c->addr_s = hstrdup(eb);
c->keepalive = tick;
c->flags = l->clientflags;
/* use the default login handler */
c->handler = &incoming_handler; /* Not really used.. */
if (1) {
int len, arg;
/* Set bigger RCVBUF and SNDBUF size for the UDP port.. */
len = sizeof(arg);
arg = 128*1024;
i = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, &arg, len);
i = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, &arg, len);
}
for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) {
if (l->filters[i])
if (filter_parse(c, l->filters[i], 0) < 0) { /* system filters */
hlog(LOG_ERR, "Bad system filter definition: %s", l->filters[i]);
}
}
/* set non-blocking mode */
if (fcntl(c->fd, F_SETFL, O_NONBLOCK)) {
hlog(LOG_ERR, "%s - Failed to set non-blocking mode on socket: %s", l->addr_s, strerror(errno));
goto err;
}
// FIXME: Add this on special list of UDP sockets!
udpclient = c;
#if 0 // At this point in time, we do not have workers running!
static int next_receiving_worker;
struct worker_t *w;
struct worker_t *wc;
/* Use simple round-robin on client feeding. Least clients is
* quite attractive idea, but when clients arrive at huge bursts
* they tend to move in big bunches, and it takes quite some while
* before the worker updates its client-counters.
*/
for (i = 0, w = worker_threads; w ; w = w->next, ++i) {
if ( i >= next_receiving_worker) break;
}
wc = w;
if (! w) {
wc = worker_threads; // ran out of the worker chain, back to the first..
next_receiving_worker = 0; // and reset the index too
}
// in every case, increment the next receiver index for the next call.
++next_receiving_worker;
/* ok, found it... lock the new client queue */
hlog(LOG_DEBUG, "... passing UDP rx to thread %d", wc->id);
if ((i = pthread_mutex_lock(&wc->new_clients_mutex))) {
hlog(LOG_ERR, "open_udp_listener(): could not lock new_clients_mutex: %s", strerror(i));
goto err;
}
/* push the client in the worker's queue */
c->next = wc->new_clients;
c->prevp = &wc->new_clients;
if (c->next)
c->next->prevp = &c->next;
wc->new_clients = c;
/* unlock the queue */
if ((i = pthread_mutex_unlock(&wc->new_clients_mutex))) {
hlog(LOG_ERR, "open_udp_listener(): could not unlock new_clients_mutex: %s", strerror(i));
goto err;
}
#endif
return fd;
err:
client_free(c);
return -1;
}
int open_listeners(void)
{
struct listen_config_t *lc;
@ -163,6 +301,8 @@ int open_listeners(void)
int opened = 0, i;
for (lc = listen_config; (lc); lc = lc->next) {
l = listener_alloc();
l->clientflags = lc->client_flags;
@ -182,7 +322,15 @@ int open_listeners(void)
l->addr_s = hstrdup(eb);
l->name = hstrdup(lc->name);
if (open_tcp_listener(l) >= 0) {
if (l->ai->ai_socktype == SOCK_DGRAM &&
l->ai->ai_protocol == IPPROTO_UDP) {
/* UDP listenting is not quite same as TCP listening.. */
i = open_udp_listener(l);
} else {
/* TCP listenting... */
i = open_tcp_listener(l);
}
if (i >= 0) {
opened++;
hlog(LOG_DEBUG, "... ok, bound");
} else {
@ -244,7 +392,6 @@ struct client_t *do_accept(struct listen_t *l)
static int next_receiving_worker;
struct worker_t *w;
struct worker_t *wc;
int client_min = -1;
static time_t last_EMFILE_report;
if ((fd = accept(l->fd, (struct sockaddr*)&sa, &addr_len)) < 0) {
@ -359,6 +506,7 @@ struct client_t *do_accept(struct listen_t *l)
* is _exactly_ fair.
*/
int client_min = -1;
for (wc = w = worker_threads; (w); w = w->next)
if (w->client_count < client_min || client_min == -1) {
wc = w;

View File

@ -70,7 +70,13 @@ Listen "Regional East Coast US Only Feed" fullfeed tcp 0.0.0.0 24579 -t
#
# Note: service on port 23 _requires_ this program to run as _root_
#
# Note on UDP: the CONTROL connection on TCP socket defines filters. Not the UDP talker itself!
# That is: Control connect on 10157 with UDP port definition will send you APRS
# packets over UDP with positions only within 100 km of yourself!
# (This may or may not match javAPRSSrvr)
#
Listen "Full feed with CWOP" fullfeed tcp 0.0.0.0 23 t/c*
Listen "Full feed with CWOP, UDP" fullfeed udp 0.0.0.0 23000 t/c*
Listen "Messages only" messageonly tcp 0.0.0.0 1314 t/m
Listen "Full feed with CWOP" fullfeed tcp 0.0.0.0 10152 t/c*
Listen "Full feed with CWOP and duplicates" dupefeed tcp ::1 10160 t/c*
@ -89,7 +95,7 @@ Listen "Regional central USA: 49N 107W x 25N 86W" fullfeed tcp 0.0.0.0 14579
#Uplink "Uplink" rw tcp finland.aprs2.net 10152
#Uplink "Uplink" ro tcp finland.aprs2.net 10152
#Listen "uplinksim" uplinksim tcp 127.0.0.1 10190
#Listen "Uplinksim" uplinksim tcp 127.0.0.1 10190
### Internals ############
# The number of worker threads to run - set this to the number of
@ -102,7 +108,7 @@ Listen "Regional central USA: 49N 107W x 25N 86W" fullfeed tcp 0.0.0.0 14579
## to the kernel, the less work each system call must do - search
## for your balance, too much threads has its own penalties.
##
WorkerThreads 20
WorkerThreads 4
# When running this server as super-user, the server can (in many systems)
# increase several resource limits, and do other things that less privileged
@ -110,6 +116,7 @@ WorkerThreads 20
#
# The FileLimit is resource limit on how many simultaneous connections and
# some other internal resources the system can use at the same time.
# If the server is not being run as super-user, this setting has no effect.
# If the server is not being run as super-user, this setting has no effect
# in case it is above what normal user can set.
#
FileLimit 10000

View File

@ -312,7 +312,8 @@ void incoming_flush(struct worker_t *self)
self->pbuf_incoming_count += self->pbuf_incoming_local_count;
pthread_mutex_unlock(&self->pbuf_incoming_mutex);
// hlog(LOG_DEBUG, "incoming_flush() sent out %d packets, incoming_count %d", self->pbuf_incoming_local_count, incoming_count);
// hlog( LOG_DEBUG, "incoming_flush() sent out %d packets, incoming_count %d",
// self->pbuf_incoming_local_count, incoming_count );
/* clean the local lockfree queue */
self->pbuf_incoming_local = NULL;

View File

@ -121,10 +121,23 @@ int login_handler(struct worker_t *self, struct client_t *c, char *s, int len)
break;
}
c->udp_port = atoi(argv[i]);
if (c->udp_port < 1 || c->udp_port > 65535 || c->udp_port == 53) {
if (c->udp_port < 1024 || c->udp_port > 65535) {
hlog(LOG_WARNING, "%s (%s): UDP port number %s is out of range", c->addr_s, username, argv[i]);
c->udp_port = 0;
}
c->udpaddr = c->addr;
if (c->udpaddr.sa.sa_family == AF_INET) {
c->udpaddr.si.sin_port = htons(c->udp_port);
c->udpaddrlen = sizeof(c->udpaddr.si);
} else {
c->udpaddr.si6.sin6_port = htons(c->udp_port);
c->udpaddrlen = sizeof(c->udpaddr.si6);
}
// FIXME: Find the client UDP service socket ??
// FIXME: or is it always the same ? udpclient ??
} else if (strcasecmp(argv[i], "filter") == 0) {
if (!(c->flags & CLFLAGS_USERFILTEROK)) {
return client_printf(self, c, "# No user-specified filters on this port\r\n");

View File

@ -46,8 +46,11 @@ time_t tick; /* real monotonous clock, may or may not be wallclock */
extern int ibuf_size;
struct worker_t *worker_threads = NULL;
int workers_running = 0;
struct worker_t *worker_threads;
struct client_t *udpclient; /* single listening/receiving UDP client socket */
struct client_t *udppeer; /* single listening/receiving UDP peer socket */
int workers_running;
int sock_write_expire = 60; /* 60 seconds OK ? */
int keepalive_interval = 20; /* 20 seconds for individual socket, NOT all in sync! */
int keepalive_poll_freq = 2; /* keepalive analysis scan interval */
@ -172,6 +175,21 @@ int client_write(struct worker_t *self, struct client_t *c, char *p, int len)
*/
c->obuf_writes++;
if (c->udp_port && udpclient && len > 0) {
int i;
i = sendto( udpclient->fd, p, len, MSG_DONTWAIT,
(struct sockaddr *)&c->udpaddr, c->udpaddrlen );
hlog(LOG_DEBUG, "UDP to client port %d, sendto rc=%d, errno %s",
c->udp_port, i, strerror(errno));
// FIXME: UDP write statistics !
if (*p != '#') // Send keepalive also through the tcp control socket..
return i;
}
if (c->state == CSTATE_UDP)
return 0;
if (c->obuf_end + len > c->obuf_size) {
/* Oops, cannot append the data to the output buffer.
* Check if we can make space for it by moving data
@ -337,8 +355,20 @@ int handle_client_readable(struct worker_t *self, struct client_t *c)
close_client(self, c);
return -1;
}
r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
if (c->state == CSTATE_UDP) {
union sockaddr_u sa;
socklen_t fromlen = sizeof(sa);
int freelen = c->ibuf_size - c->ibuf_end;
// FIXME: verify: freelen > X (512 ?)
r = recvfrom( c->fd, c->ibuf + c->ibuf_end, freelen, MSG_DONTWAIT,
(struct sockaddr *) & sa, & fromlen );
// FIXME: received via UDP, now verify source IP address and port!
// FIXME: verify that received packet had CRLF at the tail!
} else {
r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
}
if (r == 0) {
hlog(LOG_DEBUG, "read: EOF from client fd %d (%s)", c->fd, c->addr_s);
close_client(self, c);
@ -551,7 +581,7 @@ void send_keepalives(struct worker_t *self)
rc = client_write(self, c, buf, 0);
if (rc < -2) continue; // destroyed..
}
if (c->obuf_wtime < w_expire) {
if (c->obuf_wtime < w_expire && c->state != CSTATE_UDP) {
// TOO OLD! Shutdown the client
hlog( LOG_DEBUG,"Closing client %p fd %d (%s) due to obuf wtime timeout",
c, c->fd, c->addr_s );
@ -606,7 +636,27 @@ void worker_thread(struct worker_t *self)
pthread_sigmask(SIG_BLOCK, &sigs_to_block, NULL);
hlog(LOG_DEBUG, "Worker %d started.", self->id);
if (self->id == 1 && udpclient) { /* Worker ZERO picks up the udpclient, if any */
int pe;
/* lock the queue */
if ((pe = pthread_mutex_lock(&self->new_clients_mutex))) {
hlog(LOG_ERR, "collect_new_clients(worker %d): could not lock new_clients_mutex: %s", self->id, strerror(pe));
return;
}
/* Put the udp-client on head of the new client list.. */
udpclient->next = self->new_clients;
self->new_clients = udpclient;
/* unlock */
if ((pe = pthread_mutex_unlock(&self->new_clients_mutex))) {
hlog(LOG_ERR, "collect_new_clients(worker %d): could not unlock new_clients_mutex: %s", self->id, strerror(pe));
/* we'd going to deadlock here... */
exit(1);
}
}
while (!self->shutting_down) {
//hlog(LOG_DEBUG, "Worker %d checking for clients...", self->id);
t1 = tick;

View File

@ -129,6 +129,7 @@ extern struct pbuf_t **pbuf_global_dupe_prevp;
/* a network client */
typedef enum {
CSTATE_UDP,
CSTATE_LOGIN,
CSTATE_CONNECTED,
CSTATE_COREPEER
@ -151,6 +152,9 @@ struct client_t {
struct client_t **prevp;
union sockaddr_u addr;
union sockaddr_u udpaddr;
int udpaddrlen;
int fd;
int udp_port;
char *addr_s; /* client IP address in text format */
@ -280,4 +284,6 @@ extern void workers_start(void);
extern int keepalive_interval;
extern int fileno_limit;
extern struct client_t *udpclient;
#endif