Live upgrade: fix UDP, pass ibuf/obuf, and filters.

Fix a crash in shutdown with Uplinks.
This commit is contained in:
Heikki Hannikainen 2012-10-19 22:35:37 +03:00
parent 5700ceebcc
commit 34e05320ba
7 changed files with 186 additions and 27 deletions

View File

@ -923,6 +923,7 @@ static int accept_liveupgrade_single(cJSON *client)
cJSON *fd, *listener_id, *username, *t_connect;
cJSON *state;
cJSON *addr_loc;
cJSON *udp_port;
cJSON *app_name, *app_version;
cJSON *verified;
cJSON *obuf_q;
@ -930,6 +931,7 @@ static int accept_liveupgrade_single(cJSON *client)
cJSON *pkts_rx, *pkts_tx, *pkts_ign;
cJSON *rx_errs;
cJSON *filter;
cJSON *ibuf, *obuf;
unsigned addr_len;
union sockaddr_u sa;
@ -939,6 +941,7 @@ static int accept_liveupgrade_single(cJSON *client)
username = cJSON_GetObjectItem(client, "username");
t_connect = cJSON_GetObjectItem(client, "t_connect");
addr_loc = cJSON_GetObjectItem(client, "addr_loc");
udp_port = cJSON_GetObjectItem(client, "udp_port");
app_name = cJSON_GetObjectItem(client, "app_name");
app_version = cJSON_GetObjectItem(client, "app_version");
verified = cJSON_GetObjectItem(client, "verified");
@ -950,8 +953,11 @@ static int accept_liveupgrade_single(cJSON *client)
pkts_ign = cJSON_GetObjectItem(client, "pkts_ign");
rx_errs = cJSON_GetObjectItem(client, "rx_errs");
filter = cJSON_GetObjectItem(client, "filter");
ibuf = cJSON_GetObjectItem(client, "ibuf");
obuf = cJSON_GetObjectItem(client, "obuf");
if (!((fd) && (listener_id) && (state) && (username) && (t_connect) && (addr_loc) && (app_name) && (app_version)
if (!((fd) && (listener_id) && (state) && (username) && (t_connect)
&& (addr_loc) && (app_name) && (app_version)
&& (verified) && (obuf_q) && (bytes_rx) && (bytes_tx)
&& (pkts_tx) && (pkts_rx) && (pkts_ign) && (rx_errs) && (filter))) {
hlog(LOG_ERR, "Live upgrade: Fields missing from client JSON");
@ -1016,6 +1022,50 @@ static int accept_liveupgrade_single(cJSON *client)
login_set_app_name(c, app_name->valuestring, app_version->valuestring);
// handle client's filter setting
if (c->flags & CLFLAGS_USERFILTEROK) {
// archive a copy of the filters, for status display
strncpy(c->filter_s, filter->valuestring, FILTER_S_SIZE);
c->filter_s[FILTER_S_SIZE-1] = 0;
sanitize_ascii_string(c->filter_s);
char *f = hstrdup(filter->valuestring);
filter_parse(c, f, 1);
hfree(f);
}
// set up UDP downstream if necessary
if (udp_port && udp_port->valueint > 1024 && udp_port->valueint < 65536) {
if (login_setup_udp_feed(c, udp_port->valueint) != 0) {
hlog(LOG_DEBUG, "%s/%s: Requested UDP on client port with no UDP configured", c->addr_rem, c->username);
}
}
// fill up ibuf
if (ibuf && ibuf->valuestring) {
int l = hex_decode(c->ibuf, c->ibuf_size, ibuf->valuestring);
if (l < 0) {
hlog(LOG_ERR, "Live upgrade: %s/%s: Failed to decode ibuf: %s", c->addr_rem, c->username, ibuf->valuestring);
} else {
c->ibuf_end = l;
hlog(LOG_DEBUG, "Live upgrade: Decoded ibuf %d bytes: '%.*s'", l, l, c->ibuf);
hlog(LOG_DEBUG, "Hex: %s", ibuf->valuestring);
}
}
// fill up obuf
if (obuf && obuf->valuestring) {
int l = hex_decode(c->obuf, c->obuf_size, obuf->valuestring);
if (l < 0) {
hlog(LOG_ERR, "Live upgrade: %s/%s: Failed to decode obuf: %s", c->addr_rem, c->username, obuf->valuestring);
} else {
c->obuf_start = 0;
c->obuf_end = l;
hlog(LOG_DEBUG, "Live upgrade: Decoded obuf %d bytes: '%.*s'", l, l, c->obuf);
hlog(LOG_DEBUG, "Hex: %s", obuf->valuestring);
}
}
hlog(LOG_DEBUG, "%s - Accepted live upgrade client on fd %d from %s", c->addr_loc, c->fd, c->addr_rem);
/* set client socket options, return -1 on serious errors */

View File

@ -127,6 +127,26 @@ void login_set_app_name(struct client_t *c, const char *app_name, const char *ap
sanitize_ascii_string(c->app_version);
}
int login_setup_udp_feed(struct client_t *c, int port)
{
if (!c->udpclient)
return -1;
c->udp_port = port;
c->udpaddr = c->addr;
if (c->udpaddr.sa.sa_family == AF_INET) {
c->udpaddr.si.sin_port = htons(c->udp_port);
c->udpaddrlen = sizeof(c->udpaddr.si);
} else {
c->udpaddr.si6.sin6_port = htons(c->udp_port);
c->udpaddrlen = sizeof(c->udpaddr.si6);
}
inbound_connects_account(3, c->udpclient->portaccount); /* "3" = udp, not listening.. */
return 0;
}
/*
* login.c: works in the context of the worker thread
*/
@ -230,27 +250,16 @@ int login_handler(struct worker_t *self, struct client_t *c, int l4proto, char *
hlog(LOG_WARNING, "%s/%s: Missing UDP port number after UDP command", c->addr_rem, username);
break;
}
c->udp_port = atoi(argv[i]);
if (c->udp_port < 1024 || c->udp_port > 65535) {
int udp_port = atoi(argv[i]);
if (udp_port < 1024 || udp_port > 65535) {
hlog(LOG_WARNING, "%s/%s: UDP port number %s is out of range", c->addr_rem, username, argv[i]);
c->udp_port = 0;
break;
}
if (c->udpclient) {
c->udpaddr = c->addr;
if (c->udpaddr.sa.sa_family == AF_INET) {
c->udpaddr.si.sin_port = htons(c->udp_port);
c->udpaddrlen = sizeof(c->udpaddr.si);
} else {
c->udpaddr.si6.sin6_port = htons(c->udp_port);
c->udpaddrlen = sizeof(c->udpaddr.si6);
}
inbound_connects_account(3, c->udpclient->portaccount); /* "3" = udp, not listening.. */
} else {
if (login_setup_udp_feed(c, udp_port) != 0) {
/* Sorry, no UDP service for this port.. */
hlog(LOG_DEBUG, "%s/%s: Requested UDP on client port with no UDP configured", c->addr_rem, username);
c->udp_port = 0;
rc = client_printf(self, c, "# No UDP service available on this port\r\n");
if (rc < -2)
return rc; // client got destroyed

View File

@ -16,6 +16,7 @@
extern int http_udp_upload_login(const char *addr_rem, char *s, char **username);
extern int login_handler(struct worker_t *self, struct client_t *c, int l4proto, char *s, int len);
extern void login_set_app_name(struct client_t *c, const char *app_name, const char *app_ver);
extern int login_setup_udp_feed(struct client_t *c, int port);
#endif

View File

@ -17,6 +17,7 @@
#include <errno.h>
#include <stdlib.h>
#include <sys/utsname.h>
#include <ctype.h>
#include "status.h"
#include "cellmalloc.h"
@ -575,3 +576,57 @@ void status_atend(void)
}
}
/*
* Helper functions for encoding binary data to string buffers in JSON
*/
char *hex_encode(const char *buf, int len)
{
static const char *lut = "0123456789abcdef";
int i;
char *s = hmalloc(len*2+1);
for (i = 0; i < len; i++) {
const char c = buf[i];
s[i*2] = lut[c >> 4];
s[i*2+1] = lut[c & 15];
}
s[i*2] = 0;
return s;
}
int hex_decode(char *obuf, int olen, const char *hex)
{
int i;
int len = strlen(hex);
int oi = 0;
if (len & 1)
return -1; // odd length
if (olen < len / 2)
return -1; // output buffer too large
for (i = 0; i < len; i += 2) {
char h = toupper(hex[i]);
if (h >= 0x30 && h <= 0x39)
h -= 0x30; /* 0..9 are now right */
else if (h >= 0x41 && h <= 0x46)
h = h - 0x41 + 10; /* A-F are now right */
else
return -1;
char l = toupper(hex[i+1]);
if (l >= 0x30 && l <= 0x39)
l -= 0x30; /* 0..9 are now right */
else if (l >= 0x41 && l <= 0x46)
l = l - 0x41 + 10; /* A-F are now right */
else
return -1;
obuf[oi++] = h*16 + l;
}
return oi;
}

View File

@ -24,4 +24,7 @@ extern int status_read_liveupgrade(void);
extern void status_init(void);
extern void status_atend(void);
extern char *hex_encode(const char *buf, int len);
extern int hex_decode(char *obuf, int olen, const char *hex);
#endif

View File

@ -691,7 +691,7 @@ void uplink_start(void)
void uplink_stop(void)
{
int e;
int i, e;
if (!uplink_running)
return;
@ -706,5 +706,21 @@ void uplink_stop(void)
uplink_running = 0;
}
/* free uplink config - and clean up the uplink_client indexed pointers
* which refer to the configs
*/
if ((e = pthread_mutex_lock(&uplink_client_mutex))) {
hlog( LOG_ERR, "uplink_stop(): could not lock uplink_client_mutex: %s", strerror(e) );
return;
}
for (i = 0; i < MAX_UPLINKS; i++)
uplink_client[i] = NULL;
free_uplink_config(&uplink_config);
if ((e = pthread_mutex_unlock(&uplink_client_mutex))) {
hlog( LOG_ERR, "uplink_stop(): could not unlock uplink_client_mutex: %s", strerror(e) );
return;
}
}

View File

@ -34,6 +34,7 @@
#include "client_heard.h"
#include "cellmalloc.h"
#include "version.h"
#include "status.h"
time_t now; /* current time, updated by the main thread, MAY be spun around by the simulator */
time_t tick; /* real monotonous clock, may or may not be wallclock */
@ -94,7 +95,7 @@ cellarena_t *client_cells;
/* clientlist collected at shutdown for live upgrade */
cJSON *worker_shutdown_clients = NULL;
static struct cJSON *worker_client_json(struct client_t *c);
static struct cJSON *worker_client_json(struct client_t *c, int liveup_info);
/* port accounters */
struct portaccount_t *port_accounter_alloc(void)
@ -1381,6 +1382,8 @@ static void collect_new_clients(struct worker_t *self)
/* According to http://www.aprs-is.net/ServerDesign.aspx, the server must
* initially transmit it's software name and version string.
* In case of a live upgrade, this should maybe be skipped, but
* I'll leave it in for now.
*/
client_printf(self, c, "# %s\r\n", verstr_aprsis);
@ -1608,12 +1611,13 @@ void worker_thread(struct worker_t *self)
/* live upgrade: must free all UDP client structs - we need to close the UDP listener fd. */
struct client_t *c;
for (c = self->clients; (c); c = c->next) {
client_udp_free(c->udpclient);
c->udpclient = NULL;
/* collect client state first before closing or freeing anything */
if (worker_shutdown_clients) {
cJSON *jc = worker_client_json(c);
cJSON *jc = worker_client_json(c, 1);
cJSON_AddItemToArray(worker_shutdown_clients, jc);
}
client_udp_free(c->udpclient);
c->udpclient = NULL;
}
} else {
/* close all clients, if not shutting down for a live upgrade */
@ -1850,7 +1854,7 @@ static const char *client_state_string(CStateEnum state)
* (called from another thread - watch out and lock!)
*/
static struct cJSON *worker_client_json(struct client_t *c)
static struct cJSON *worker_client_json(struct client_t *c, int liveup_info)
{
char addr_s[80];
char *s;
@ -1864,8 +1868,29 @@ static struct cJSON *worker_client_json(struct client_t *c)
cJSON *jc = cJSON_CreateObject();
cJSON_AddNumberToObject(jc, "fd", c->fd);
cJSON_AddNumberToObject(jc, "listener_id", c->listener_id);
cJSON_AddStringToObject(jc, "state", client_state_string(c->state));
/* additional information for live upgrade, not published */
if (liveup_info) {
cJSON_AddNumberToObject(jc, "listener_id", c->listener_id);
cJSON_AddStringToObject(jc, "state", client_state_string(c->state));
if (c->udp_port && c->udpclient)
cJSON_AddNumberToObject(jc, "udp_port", c->udp_port);
/* output buffer and input buffer data */
if (c->obuf_end - c->obuf_start > 0) {
s = hex_encode(c->obuf + c->obuf_start, c->obuf_end - c->obuf_start);
cJSON_AddStringToObject(jc, "obuf", s);
hfree(s);
}
if (c->ibuf_end > 0) {
s = hex_encode(c->ibuf, c->ibuf_end);
cJSON_AddStringToObject(jc, "ibuf", s);
hlog(LOG_DEBUG, "Encoded ibuf %d bytes: '%.*s'", c->ibuf_end, c->ibuf_end, c->ibuf);
hlog(LOG_DEBUG, "Hex: %s", s);
hfree(s);
}
}
if (c->state == CSTATE_COREPEER) {
/* cut out ports in the name of security by obscurity */
@ -1882,7 +1907,7 @@ static struct cJSON *worker_client_json(struct client_t *c)
cJSON_AddStringToObject(jc, "addr_loc", c->addr_loc);
}
cJSON_AddStringToObject(jc, "addr_q", c->addr_hex);
//cJSON_AddStringToObject(jc, "addr_q", c->addr_hex);
if (c->udp_port && c->udpclient)
cJSON_AddNumberToObject(jc, "udp_downstream", 1);
@ -1956,7 +1981,7 @@ int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *pe
if (c->hidden || w->client_count > 1000)
continue;
cJSON *jc = worker_client_json(c);
cJSON *jc = worker_client_json(c, 0);
if (c->state == CSTATE_COREPEER) {
cJSON_AddItemToArray(peers, jc);