Merge pull request #70 from hessu/feature/uplink-nonblock

uplink: Less blocking of worker threads
This commit is contained in:
Heikki Hannikainen 2022-10-23 02:53:40 +03:00 committed by GitHub
commit b3b66b9d47
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 72 additions and 49 deletions

View File

@ -42,13 +42,17 @@
#include "filter.h"
#include "ssl.h"
#define MAX_UPLINKS 32
int uplink_reconfiguring;
int uplink_shutting_down;
/* currently-connected uplinks */
struct uplink_client_t {
struct client_t *client;
struct uplink_client_t *next;
struct uplink_client_t **prevp;
} *uplink_clients;
pthread_mutex_t uplink_client_mutex = PTHREAD_MUTEX_INITIALIZER;
struct client_t *uplink_client[MAX_UPLINKS];
int uplink_running;
pthread_t uplink_th;
@ -62,6 +66,18 @@ struct portaccount_t uplink_connects = {
.refcount = 99, /* Global static blocks have extra-high initial refcount */
};
void uplink_client_free(struct uplink_client_t *uc)
{
if (!uc)
return;
*uc->prevp = uc->next;
if (uc->next)
uc->next->prevp = uc->prevp;
hfree(uc);
}
/*
* Close uplinking sockets
*/
@ -77,11 +93,10 @@ void close_uplinkers(void)
return;
}
int i;
for (i = 0; i < MAX_UPLINKS; i++) {
if ((uplink_client[i]) && uplink_client[i]->fd >= 0) {
hlog( LOG_DEBUG, "Closing uplinking socket %d (fd %d) %s ...", i, uplink_client[i]->fd, uplink_client[i]->addr_rem );
shutdown(uplink_client[i]->fd, SHUT_RDWR);
for (struct uplink_client_t *uc = uplink_clients; (uc); uc = uc->next) {
if (uc->client && uc->client->fd >= 0) {
hlog( LOG_DEBUG, "Closing uplinking socket (fd %d) %s ...", uc->client->fd, uc->client->addr_rem );
shutdown(uc->client->fd, SHUT_RDWR);
}
}
@ -100,12 +115,7 @@ void uplink_close(struct client_t *c, int errnum)
{
int rc;
hlog(LOG_INFO, "%s: Uplink [%d] has been closed: %s", c->addr_rem, c->uplink_index, aprsc_strerror(errnum));
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog(LOG_ERR, "uplink_close(): could not lock uplink_client_mutex: %s", strerror(rc));
return;
}
hlog(LOG_INFO, "%s: Uplink has been closed: %s", c->addr_rem, aprsc_strerror(errnum));
if ((rc = pthread_mutex_lock(& uplink_connects.mutex )))
hlog(LOG_ERR, "uplink_close: could not lock uplink_connects: %s", strerror(rc));
@ -113,6 +123,11 @@ void uplink_close(struct client_t *c, int errnum)
if ((rc = pthread_mutex_unlock(& uplink_connects.mutex )))
hlog(LOG_ERR, "uplink_close: could not unlock uplink_connects: %s", strerror(rc));
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog(LOG_ERR, "uplink_close(): could not lock uplink_client_mutex: %s", strerror(rc));
return;
}
struct uplink_config_t *l = uplink_config;
for (; l; l = l->next) {
if (l->client_ptr == (void *)c) {
@ -121,11 +136,16 @@ void uplink_close(struct client_t *c, int errnum)
l->client_ptr = NULL;
}
}
uplink_client[c->uplink_index] = NULL; // there can be only one!
for (struct uplink_client_t *uc = uplink_clients; (uc); uc = uc->next) {
if (uc->client == c) {
uplink_client_free(uc);
break;
}
}
if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog(LOG_ERR, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
hlog(LOG_ERR, "uplink_close(): could not unlock uplink_client_mutex: %s", strerror(rc));
return;
}
return;
@ -342,7 +362,7 @@ int config_uplink_ssl_setup(struct uplink_config_t *l)
int make_uplink(struct uplink_config_t *l)
{
int fd, i, addrc, arg;
int uplink_index;
struct uplink_client_t *uc = NULL;
union sockaddr_u sa; /* large enough for also IPv6 address */
socklen_t addr_len;
struct addrinfo *ai, *a, *ap[21];
@ -358,7 +378,7 @@ int make_uplink(struct uplink_config_t *l)
req.ai_protocol = IPPROTO_TCP;
req.ai_flags = AI_ADDRCONFIG;
ai = NULL;
#ifdef USE_SSL
/* SSL requires both a cert and a key, or none at all */
if ((l->certfile && !l->keyfile) || (l->keyfile && !l->certfile)) {
@ -377,16 +397,6 @@ int make_uplink(struct uplink_config_t *l)
}
#endif
/* find a free uplink slot */
for (uplink_index = 0; uplink_index < MAX_UPLINKS; uplink_index++) {
if (!uplink_client[uplink_index])
break;
}
if (uplink_index == MAX_UPLINKS) {
hlog(LOG_ERR, "Uplink %s: No available uplink slots, %d used", l->name, MAX_UPLINKS);
return -2;
}
if (strcasecmp(l->proto, "tcp") == 0) {
// well, do nothing for now.
} else if (strcasecmp(l->proto, "udp") == 0) {
@ -446,8 +456,8 @@ int make_uplink(struct uplink_config_t *l)
ap[i] = NULL;
addr_s = strsockaddr(a->ai_addr, a->ai_addrlen);
hlog(LOG_INFO, "Uplink %s: Connecting to %s:%s (%s) [link %d, addr %d/%d]",
l->name, l->host, l->port, addr_s, uplink_index, i+1, addrc);
hlog(LOG_INFO, "Uplink %s: Connecting to %s:%s (%s) [addr %d/%d]",
l->name, l->host, l->port, addr_s, i+1, addrc);
i++;
if (i == addrc)
i = 0;
@ -560,7 +570,6 @@ connerr:
}
l->client_ptr = (void *)c;
c->uplink_index = uplink_index;
c->fd = fd;
c->ai_protocol = req.ai_protocol;
c->state = CSTATE_INIT;
@ -600,11 +609,30 @@ connerr:
hfree(s);
hlog(LOG_INFO, "Uplink %s: %s: Connection established on fd %d using source address %s", l->name, c->addr_rem, c->fd, c->addr_loc);
if (set_client_sockopt(c) < 0)
goto err;
uplink_client[uplink_index] = c;
/* insert to the list of currently-connected uplinks */
int rc;
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog(LOG_ERR, "make_uplink(): could not lock uplink_client_mutex: %s", strerror(rc));
goto err;
}
uc = hmalloc(sizeof(*uc));
uc->client = c;
uc->next = uplink_clients;
uc->prevp = &uplink_clients;
if (uc->next)
uc->next->prevp = &uc->next;
uplink_clients = uc;
if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog(LOG_CRIT, "make_uplink(): could not unlock uplink_client_mutex: %s", strerror(rc));
goto err;
}
l->state = UPLINK_ST_CONNECTED;
/* set up SSL if necessary */
@ -635,7 +663,8 @@ connerr:
err:
client_free(c);
uplink_client[uplink_index] = NULL;
if (uc)
uplink_client_free(uc);
l->state = UPLINK_ST_NOT_LINKED;
return -1;
}
@ -696,11 +725,6 @@ void uplink_thread(void *asdf)
continue;
}
if ((rc = pthread_mutex_lock(&uplink_client_mutex))) {
hlog(LOG_ERR, "uplink_thread(): could not lock uplink_client_mutex: %s", strerror(rc));
continue;
}
/* Check if all we have a single regular uplink connection up, out of all
* the configured ones. Also, check that all the UPLINKMULTI links are
* connected.
@ -721,6 +745,8 @@ void uplink_thread(void *asdf)
has_uplink++;
avail_uplink++;
}
if (uplink_shutting_down)
break;
}
if (avail_uplink && !has_uplink) {
@ -744,11 +770,6 @@ void uplink_thread(void *asdf)
}
}
if ((rc = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog(LOG_CRIT, "close_uplinkers(): could not unlock uplink_client_mutex: %s", strerror(rc));
continue;
}
if (avail_uplink && !has_uplink) {
status_error(3600, "no_uplink");
uplink_error_set = 1;
@ -759,6 +780,9 @@ void uplink_thread(void *asdf)
}
}
if (uplink_shutting_down)
break;
/* sleep for 4 seconds between successful rounds */
for (rc = 0; (!uplink_shutting_down) && rc < 4000/200; rc++)
if (poll(NULL, 0, 200) == -1 && errno != EINTR)
@ -789,7 +813,7 @@ void uplink_start(void)
void uplink_stop(void)
{
int i, e;
int e;
if (!uplink_running)
return;
@ -812,8 +836,8 @@ void uplink_stop(void)
return;
}
for (i = 0; i < MAX_UPLINKS; i++)
uplink_client[i] = NULL;
while (uplink_clients)
uplink_client_free(uplink_clients);
free_uplink_config(&uplink_config);

View File

@ -297,7 +297,6 @@ struct client_t {
int fd;
int uplink_index; /* uplink array index */
int portnum;
int listener_id; /* which listener is this client connected to */
time_t connect_time;/* Time of connection, wallclock real time */