Merge pull request #74 from hessu/optimize

Various CPU optimizations
This commit is contained in:
Heikki Hannikainen 2022-10-31 19:15:33 +02:00 committed by GitHub
commit 47b373b8df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 149 additions and 90 deletions

View File

@ -702,7 +702,7 @@ static void accept_process_udpsubmit(struct listen_t *l, char *buf, int len, cha
udp_pseudoclient->portaccount = l->portaccount; udp_pseudoclient->portaccount = l->portaccount;
e = pseudoclient_push_packet(udp_worker, udp_pseudoclient, username, packet, packet_len); e = pseudoclient_push_packet(udp_worker, udp_pseudoclient, username, packet, packet_len);
clientaccount_add(udp_pseudoclient, IPPROTO_UDP, len, 1, 0, 0, (e < 0) ? e : 0, 0); clientaccount_add_rx(udp_pseudoclient, IPPROTO_UDP, len, 1, (e < 0) ? e : 0, 0);
udp_pseudoclient->portaccount = NULL; udp_pseudoclient->portaccount = NULL;
if (e < 0) if (e < 0)

View File

@ -95,7 +95,6 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
struct pbuf_t *pb, *pb2; struct pbuf_t *pb, *pb2;
struct pbuf_t *freeset[2002]; struct pbuf_t *freeset[2002];
int n, n1, n2, lag; int n, n1, n2, lag;
time_t lastage1 = 0, lastage2 = 0;
time_t expire2 = tick - pbuf_global_dupe_expiration; time_t expire2 = tick - pbuf_global_dupe_expiration;
time_t expire1 = tick - pbuf_global_expiration; time_t expire1 = tick - pbuf_global_expiration;
@ -107,13 +106,20 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
} }
pb = pbuf_global; pb = pbuf_global;
#ifdef GLOBAL_PBUF_PURGER_STATS
time_t lastage1 = 0, lastage2 = 0;
if (pb) if (pb)
lastage1 = pb->t; lastage1 = pb->t;
#endif
n = 0; n = 0;
n1 = 0; n1 = 0;
while ( pbuf_global_count > pbuf_global_count_limit && pb ) { while ( pbuf_global_count > pbuf_global_count_limit && pb ) {
#ifdef GLOBAL_PBUF_PURGER_STATS
lastage1 = pb->t; lastage1 = pb->t;
#endif
if (pb->t >= expire1) if (pb->t >= expire1)
break; // stop at newer than expire1 break; // stop at newer than expire1
@ -139,13 +145,16 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
} }
pb = pbuf_global_dupe; pb = pbuf_global_dupe;
#ifdef GLOBAL_PBUF_PURGER_STATS
if (pb) if (pb)
lastage2 = pb->t; lastage2 = pb->t;
#endif
n = 0; n = 0;
n2 = 0; n2 = 0;
while ( pbuf_global_dupe_count > pbuf_global_dupe_count_limit && pb ) { while ( pbuf_global_dupe_count > pbuf_global_dupe_count_limit && pb ) {
#ifdef GLOBAL_PBUF_PURGER_STATS
lastage2 = pb->t; lastage2 = pb->t;
#endif
if (pb->t >= expire2) if (pb->t >= expire2)
break; // stop at newer than expire2 break; // stop at newer than expire2
lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum); lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum);
@ -169,15 +178,13 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
pbuf_free_many(freeset, n); pbuf_free_many(freeset, n);
} }
#ifdef GLOBAL_PBUF_PURGER_STATS
// debug printout time... map "undefined" lag values to zero. // debug printout time... map "undefined" lag values to zero.
//if (pbuf_lag == 2000000000) pbuf_lag = 0; if (pbuf_lag == 2000000000) pbuf_lag = 0;
//if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0; if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0;
if (lastage1 == 0) lastage1 = tick+2; // makes printout of "-2" (or "-1") if (lastage1 == 0) lastage1 = tick+2; // makes printout of "-2" (or "-1")
if (lastage2 == 0) lastage2 = tick+2; if (lastage2 == 0) lastage2 = tick+2;
/*
static int show_zeros = 1; static int show_zeros = 1;
@ -194,7 +201,7 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
else else
show_zeros = 1; show_zeros = 1;
} }
*/ #endif
} }
@ -394,11 +401,8 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat
int tlen1, tlen2, tlen3; int tlen1, tlen2, tlen3;
int i; int i;
char c; char c;
int have_del = 0;
/* TODO: dupecheck_mangle_store: Check for the necessity to do int have_low_values = 0;
* any futher packet scans when doing the initial scan
* (check for 8-bit / low data => optimize /shortcut)
*/
ilen = addrlen + datalen; ilen = addrlen + datalen;
@ -413,14 +417,13 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat
/********************************************/ /********************************************/
/* remove spaces from the end of the packet */ /* remove spaces from the end of the packet */
memcpy(tb1, ib, ilen);
tlen1 = ilen; tlen1 = ilen;
while (tlen1 > 0 && tb1[tlen1-1] == ' ') while (tlen1 > 0 && ib[tlen1-1] == ' ')
--tlen1; --tlen1;
if (tlen1 != ilen) { if (tlen1 != ilen) {
//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d spaces: '%.*s'", ilen-tlen1, tlen1, tb1); //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d spaces: '%.*s'", ilen-tlen1, tlen1, tb1);
dupecheck_add_buf(tb1, tlen1, DTYPE_SPACE_TRIM); dupecheck_add_buf(ib, tlen1, DTYPE_SPACE_TRIM);
} }
/*************************/ /*************************/
@ -428,9 +431,21 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat
* tb2: 8th bit is cleared * tb2: 8th bit is cleared
* tb3: 8th bit replaced with a space * tb3: 8th bit replaced with a space
*/ */
/* Check for the necessity to do
* any futher packet scans when doing the initial scan
* (check for DEL bytes / low data => optimize /shortcut)
*/
tlen1 = tlen2 = tlen3 = 0; tlen1 = tlen2 = tlen3 = 0;
for (i = 0; i < ilen; i++) { for (i = 0; i < ilen; i++) {
c = ib[i] & 0x7F; c = ib[i];
if (c == 0x7F)
have_del = 1;
if (c < 0x20 && c > 0)
have_low_values = 1;
c = c & 0x7F;
tb2[tlen2++] = c; tb2[tlen2++] = c;
if (ib[i] != c) { if (ib[i] != c) {
/* high bit is on */ /* high bit is on */
@ -455,50 +470,54 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat
* tb1: Low data (0 <= x < 0x20 deleted * tb1: Low data (0 <= x < 0x20 deleted
* tb2: Low data replaced with spaces * tb2: Low data replaced with spaces
*/ */
tlen1 = tlen2 = 0; if (have_low_values) {
for (i = 0; i < ilen; i++) { tlen1 = tlen2 = 0;
c = ib[i]; for (i = 0; i < ilen; i++) {
if (c < 0x20 && c > 0) { c = ib[i];
/* low data, tb2 gets a space and tb1 gets nothing */ if (c < 0x20 && c > 0) {
tb2[tlen2++] = ' '; /* low data, tb2 gets a space and tb1 gets nothing */
} else { tb2[tlen2++] = ' ';
/* regular stuff */ } else {
tb1[tlen1++] = c; /* regular stuff */
tb2[tlen2++] = c; tb1[tlen1++] = c;
tb2[tlen2++] = c;
}
}
if (tlen1 != ilen) {
/* if there was low data, store it */
//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
dupecheck_add_buf(tb1, tlen1, DTYPE_LOWDATA_STRIP);
dupecheck_add_buf(tb2, tlen2, DTYPE_LOWDATA_SPACED);
} }
}
if (tlen1 != ilen) {
/* if there was low data, store it */
//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
dupecheck_add_buf(tb1, tlen1, DTYPE_LOWDATA_STRIP);
dupecheck_add_buf(tb2, tlen2, DTYPE_LOWDATA_SPACED);
} }
/********************************************** /**********************************************
* tb1: Del characters (0x7f) deleted * tb1: Del characters (0x7f) deleted
* tb2: Del characters replaced with spaces * tb2: Del characters replaced with spaces
*/ */
tlen1 = tlen2 = 0; if (have_del) {
for (i = 0; i < ilen; i++) { tlen1 = tlen2 = 0;
c = ib[i]; for (i = 0; i < ilen; i++) {
if (c == 0x7f) { c = ib[i];
/* low data, tb2 gets a space and tb1 gets nothing */ if (c == 0x7f) {
tb2[tlen2++] = ' '; /* low data, tb2 gets a space and tb1 gets nothing */
} else { tb2[tlen2++] = ' ';
/* regular stuff */ } else {
tb1[tlen1++] = c; /* regular stuff */
tb2[tlen2++] = c; tb1[tlen1++] = c;
tb2[tlen2++] = c;
}
}
if (tlen1 != ilen) {
/* if there was low data, store it */
//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
dupecheck_add_buf(tb1, tlen1, DTYPE_DEL_STRIP);
dupecheck_add_buf(tb2, tlen2, DTYPE_DEL_SPACED);
} }
}
if (tlen1 != ilen) {
/* if there was low data, store it */
//hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1);
//hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2);
dupecheck_add_buf(tb1, tlen1, DTYPE_DEL_STRIP);
dupecheck_add_buf(tb2, tlen2, DTYPE_DEL_SPACED);
} }
return 0; return 0;
@ -618,6 +637,7 @@ static int dupecheck_drain_worker(struct worker_t *w,
struct pbuf_t *pb, *pbnext; struct pbuf_t *pb, *pbnext;
int n = 0; int n = 0;
int me; int me;
int pb_out_count_local = 0, pb_out_dupe_count_local = 0;
/* grab worker's list of packets */ /* grab worker's list of packets */
if ((me = pthread_mutex_lock(&w->pbuf_incoming_mutex))) { if ((me = pthread_mutex_lock(&w->pbuf_incoming_mutex))) {
@ -635,16 +655,14 @@ static int dupecheck_drain_worker(struct worker_t *w,
//hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d", //hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d",
// c, w->id, dupecheck_seqnum); // c, w->id, dupecheck_seqnum);
// check that the first packet isn't very old, it indicates we're not doing well
if ((pb_list) && tick - pb_list->t > 10) {
hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s",
pb_list->seqnum, tick - pb_list->t, w->id, pb_list->packet_len-2, pb_list->data);
}
for (pb = pb_list; (pb); pb = pbnext) { for (pb = pb_list; (pb); pb = pbnext) {
if (pb->t > tick + 1) {
hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d, worker %d!\n%*s",
pb->seqnum, pb->t, tick, w->id, pb->packet_len-2, pb->data);
} else if (tick - pb->t > 10) {
hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s",
pb->seqnum, tick - pb->t, w->id, pb->packet_len-2, pb->data);
}
int rc = dupecheck(pb); int rc = dupecheck(pb);
pbnext = pb->next; // it may get modified below.. pbnext = pb->next; // it may get modified below..
@ -664,18 +682,21 @@ static int dupecheck_drain_worker(struct worker_t *w,
*pb_out_prevp = &pb->next; *pb_out_prevp = &pb->next;
*pb_out_last = pb; *pb_out_last = pb;
pb->seqnum = ++dupecheck_seqnum; pb->seqnum = ++dupecheck_seqnum;
*pb_out_count = *pb_out_count + 1; pb_out_count_local++;
} else { } else {
// Duplicate // Duplicate
**pb_out_dupe_prevp = pb; **pb_out_dupe_prevp = pb;
*pb_out_dupe_prevp = &pb->next; *pb_out_dupe_prevp = &pb->next;
*pb_out_dupe_last = pb; *pb_out_dupe_last = pb;
pb->seqnum = ++dupecheck_dupe_seqnum; pb->seqnum = ++dupecheck_dupe_seqnum;
*pb_out_dupe_count = *pb_out_dupe_count + 1; pb_out_dupe_count_local++;
//hlog(LOG_DEBUG, "is duplicate"); //hlog(LOG_DEBUG, "is duplicate");
} }
n++; n++;
} }
*pb_out_count += pb_out_count_local;
*pb_out_dupe_count += pb_out_dupe_count_local;
return n; return n;
} }

View File

@ -964,15 +964,14 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len)
hlog(LOG_DEBUG, "packet too long after inserting new Q construct (%d bytes, max %d)", new_len, PACKETLEN_MAX_LARGE); hlog(LOG_DEBUG, "packet too long after inserting new Q construct (%d bytes, max %d)", new_len, PACKETLEN_MAX_LARGE);
return INERR_LONG_PACKET; return INERR_LONG_PACKET;
} }
/* get a new packet buffer of the correct size; it is zeroed by pbuf_get. */
pb = pbuf_get(self, new_len); pb = pbuf_get(self, new_len);
if (!pb) { if (!pb) {
// This should never happen... LOG_CRIT error logged in pbuf_get() // This should never happen... LOG_CRIT error logged in pbuf_get()
//hlog(LOG_DEBUG, "pbuf_get failed to get packet buffer"); //hlog(LOG_DEBUG, "pbuf_get failed to get packet buffer");
return INERR_OUT_OF_PBUFS; // No room :-( return INERR_OUT_OF_PBUFS; // No room :-(
} }
pb->next = NULL; // OPTIMIZE: pbuf arrives pre-zeroed, this could be removed maybe?
pb->flags = 0;
/* store the source reference */ /* store the source reference */
pb->origin = c; pb->origin = c;
@ -1189,7 +1188,7 @@ in_drop:
/* Account the one incoming packet. /* Account the one incoming packet.
* Incoming bytes were already accounted earlier. * Incoming bytes were already accounted earlier.
*/ */
clientaccount_add(c, l4proto, 0, 1, 0, 0, (e < 0) ? e : 0, 0); clientaccount_add_rx(c, l4proto, 0, 1, (e < 0) ? e : 0, 0);
return 0; return 0;
} }

View File

@ -31,9 +31,9 @@ static inline void send_single(struct worker_t *self, struct client_t *c, char *
* its TCP or SCTP or something. * its TCP or SCTP or something.
*/ */
if (c->udp_port && c->udpclient) if (c->udp_port && c->udpclient)
clientaccount_add( c, IPPROTO_UDP, 0, 0, 0, 1, 0, 0); clientaccount_add_tx( c, IPPROTO_UDP, 0, 1);
else else
clientaccount_add( c, c->ai_protocol, 0, 0, 0, 1, 0, 0); clientaccount_add_tx( c, c->ai_protocol, 0, 1);
c->write(self, c, data, len); c->write(self, c, data, len);
} }
@ -79,7 +79,7 @@ static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb)
/* OPTIMIZE: we walk through all clients for each dupe - how to find it quickly? */ /* OPTIMIZE: we walk through all clients for each dupe - how to find it quickly? */
for (c = self->clients; (c); c = c->next) { for (c = self->clients; (c); c = c->next) {
if (c == origin) { if (c == origin) {
clientaccount_add(c, -1, 0, 0, 0, 0, 0, 1); clientaccount_add_rx(c, -1, 0, 0, 0, 1);
break; break;
} }
} }

View File

@ -286,7 +286,7 @@ int sctp_client_write(struct worker_t *self, struct client_t *c, char *p, int le
c->obuf_writes++; c->obuf_writes++;
if (client_buffer_outgoing_data(self, c, p, len) == -12) if (client_buffer_outgoing_data(self, c, p, len) == -12)
return -12; return -12;
clientaccount_add( c, IPPROTO_SCTP, 0, 0, len, 0, 0, 0); clientaccount_add_tx(c, IPPROTO_SCTP, len, 0);
if (c->obuf_writes > obuf_writes_threshold) { if (c->obuf_writes > obuf_writes_threshold) {
// Lots and lots of writes, switch to buffering... // Lots and lots of writes, switch to buffering...
if (c->obuf_flushsize == 0) { if (c->obuf_flushsize == 0) {

View File

@ -660,7 +660,7 @@ char *hexsockaddr(const struct sockaddr *sa, const int addr_len)
return hstrdup(eb); return hstrdup(eb);
} }
void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes) void clientaccount_add_rx(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int rxerr, int rxdupes)
{ {
struct portaccount_t *pa = NULL; struct portaccount_t *pa = NULL;
int rxdrops = 0; int rxdrops = 0;
@ -674,9 +674,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke
/* worker local accounters do not need locks */ /* worker local accounters do not need locks */
c->localaccount.rxbytes += rxbytes; c->localaccount.rxbytes += rxbytes;
c->localaccount.txbytes += txbytes;
c->localaccount.rxpackets += rxpackets; c->localaccount.rxpackets += rxpackets;
c->localaccount.txpackets += txpackets;
c->localaccount.rxdupes += rxdupes; c->localaccount.rxdupes += rxdupes;
if (rxdrops) { if (rxdrops) {
c->localaccount.rxdrops += 1; c->localaccount.rxdrops += 1;
@ -692,9 +690,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke
if (pa) { if (pa) {
#ifdef HAVE_SYNC_FETCH_AND_ADD #ifdef HAVE_SYNC_FETCH_AND_ADD
__sync_fetch_and_add(&pa->rxbytes, rxbytes); __sync_fetch_and_add(&pa->rxbytes, rxbytes);
__sync_fetch_and_add(&pa->txbytes, txbytes);
__sync_fetch_and_add(&pa->rxpackets, rxpackets); __sync_fetch_and_add(&pa->rxpackets, rxpackets);
__sync_fetch_and_add(&pa->txpackets, txpackets);
__sync_fetch_and_add(&pa->rxdupes, rxdupes); __sync_fetch_and_add(&pa->rxdupes, rxdupes);
if (rxdrops) { if (rxdrops) {
__sync_fetch_and_add(&pa->rxdrops, 1); __sync_fetch_and_add(&pa->rxdrops, 1);
@ -703,9 +699,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke
#else #else
// FIXME: MUTEX !! -- this may or may not need locks.. // FIXME: MUTEX !! -- this may or may not need locks..
pa->rxbytes += rxbytes; pa->rxbytes += rxbytes;
pa->txbytes += txbytes;
pa->rxpackets += rxpackets; pa->rxpackets += rxpackets;
pa->txpackets += txpackets;
pa->rxdupes += rxdupes; pa->rxdupes += rxdupes;
if (rxdrops) { if (rxdrops) {
pa->rxdrops += 1; pa->rxdrops += 1;
@ -729,9 +723,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke
#ifdef HAVE_SYNC_FETCH_AND_ADD #ifdef HAVE_SYNC_FETCH_AND_ADD
__sync_fetch_and_add(&proto->rxbytes, rxbytes); __sync_fetch_and_add(&proto->rxbytes, rxbytes);
__sync_fetch_and_add(&proto->txbytes, txbytes);
__sync_fetch_and_add(&proto->rxpackets, rxpackets); __sync_fetch_and_add(&proto->rxpackets, rxpackets);
__sync_fetch_and_add(&proto->txpackets, txpackets);
if (rxdrops) { if (rxdrops) {
__sync_fetch_and_add(&proto->rxdrops, 1); __sync_fetch_and_add(&proto->rxdrops, 1);
__sync_fetch_and_add(&proto->rxerrs[rxerr], 1); __sync_fetch_and_add(&proto->rxerrs[rxerr], 1);
@ -739,9 +731,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke
#else #else
// FIXME: MUTEX !! -- this may or may not need locks.. // FIXME: MUTEX !! -- this may or may not need locks..
proto->rxbytes += rxbytes; proto->rxbytes += rxbytes;
proto->txbytes += txbytes;
proto->rxpackets += rxpackets; proto->rxpackets += rxpackets;
proto->txpackets += txpackets;
if (rxdrops) { if (rxdrops) {
proto->rxdrops += 1; proto->rxdrops += 1;
proto->rxerrs[rxerr] += 1; proto->rxerrs[rxerr] += 1;
@ -749,6 +739,54 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke
#endif #endif
} }
void clientaccount_add_tx(struct client_t *c, int l4proto, int txbytes, int txpackets)
{
struct portaccount_t *pa = NULL;
/* worker local accounters do not need locks */
c->localaccount.txbytes += txbytes;
c->localaccount.txpackets += txpackets;
if (l4proto == IPPROTO_UDP && c->udpclient && c->udpclient->portaccount) {
pa = c->udpclient->portaccount;
} else if (c->portaccount) {
pa = c->portaccount;
}
if (pa) {
#ifdef HAVE_SYNC_FETCH_AND_ADD
__sync_fetch_and_add(&pa->txbytes, txbytes);
__sync_fetch_and_add(&pa->txpackets, txpackets);
#else
// FIXME: MUTEX !! -- this may or may not need locks..
pa->txbytes += txbytes;
pa->txpackets += txpackets;
#endif
}
struct portaccount_t *proto;
if (l4proto == IPPROTO_TCP)
proto = &client_connects_tcp;
else if (l4proto == IPPROTO_UDP)
proto = &client_connects_udp;
#ifdef USE_SCTP
else if (l4proto == IPPROTO_SCTP)
proto = &client_connects_sctp;
#endif
else
return;
#ifdef HAVE_SYNC_FETCH_AND_ADD
__sync_fetch_and_add(&proto->txbytes, txbytes);
__sync_fetch_and_add(&proto->txpackets, txpackets);
#else
// FIXME: MUTEX !! -- this may or may not need locks..
proto->txbytes += txbytes;
proto->txpackets += txpackets;
#endif
}
static const char *protocol_str(struct client_t *c) static const char *protocol_str(struct client_t *c)
{ {
static const char unknown[] = "UNKNOWN-PROTOCOL"; static const char unknown[] = "UNKNOWN-PROTOCOL";
@ -929,7 +967,7 @@ int udp_client_write(struct worker_t *self, struct client_t *c, char *p, int len
// hlog( LOG_DEBUG, "UDP from %d to client port %d, sendto rc=%d", c->udpclient->portnum, c->udp_port, i ); // hlog( LOG_DEBUG, "UDP from %d to client port %d, sendto rc=%d", c->udpclient->portnum, c->udp_port, i );
if (i > 0) if (i > 0)
clientaccount_add( c, IPPROTO_UDP, 0, 0, i, 0, 0, 0); clientaccount_add_tx( c, IPPROTO_UDP, i, 0);
return i; return i;
} }
@ -990,7 +1028,7 @@ static int ssl_client_write(struct worker_t *self, struct client_t *c, char *p,
c->obuf_writes++; c->obuf_writes++;
if (len > 0) if (len > 0)
clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0); clientaccount_add_tx( c, c->ai_protocol, len, 0);
if (client_buffer_outgoing_data(self, c, p, len) == -12) if (client_buffer_outgoing_data(self, c, p, len) == -12)
return -12; return -12;
@ -1029,7 +1067,7 @@ static int tcp_client_write(struct worker_t *self, struct client_t *c, char *p,
* will be incremented only when we actually transmit a packet * will be incremented only when we actually transmit a packet
* instead of a keepalive. * instead of a keepalive.
*/ */
clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0); clientaccount_add_tx( c, c->ai_protocol, len, 0);
} }
if (client_buffer_outgoing_data(self, c, p, len) == -12) if (client_buffer_outgoing_data(self, c, p, len) == -12)
@ -1195,7 +1233,7 @@ static int handle_corepeer_readable(struct worker_t *self, struct client_t *c)
hlog(LOG_DEBUG, "worker thread passing UDP packet from %s to handler: %*s", addrs, r, c->ibuf); hlog(LOG_DEBUG, "worker thread passing UDP packet from %s to handler: %*s", addrs, r, c->ibuf);
hfree(addrs); hfree(addrs);
*/ */
clientaccount_add( rc, IPPROTO_UDP, r, 0, 0, 0, 0, 0); /* Account byte count. incoming_handler() will account packets. */ clientaccount_add_rx(rc, IPPROTO_UDP, r, 0, 0, 0); /* Account byte count. incoming_handler() will account packets. */
rc->last_read = tick; rc->last_read = tick;
/* Ignore CRs and LFs in UDP input packet - the current core peer system puts 1 APRS packet in each /* Ignore CRs and LFs in UDP input packet - the current core peer system puts 1 APRS packet in each
@ -1256,7 +1294,7 @@ static int deframe_aprsis_input_lines(struct worker_t *self, struct client_t *c)
int client_postread(struct worker_t *self, struct client_t *c, int r) int client_postread(struct worker_t *self, struct client_t *c, int r)
{ {
clientaccount_add(c, c->ai_protocol, r, 0, 0, 0, 0, 0); /* Number of packets is now unknown, clientaccount_add_rx(c, c->ai_protocol, r, 0, 0, 0); /* Number of packets is now unknown,
byte count is collected. byte count is collected.
The incoming_handler() will account The incoming_handler() will account
packets. */ packets. */

View File

@ -523,7 +523,8 @@ extern void port_accounter_drop(struct portaccount_t *p);
extern char *strsockaddr(const struct sockaddr *sa, const int addr_len); extern char *strsockaddr(const struct sockaddr *sa, const int addr_len);
extern char *hexsockaddr(const struct sockaddr *sa, const int addr_len); extern char *hexsockaddr(const struct sockaddr *sa, const int addr_len);
extern void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes); extern void clientaccount_add_rx(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int rxerr, int rxdupes);
extern void clientaccount_add_tx(struct client_t *c, int l4proto, int txbytes, int txpackets);
extern void json_add_rxerrs(cJSON *root, const char *key, long long vals[]); extern void json_add_rxerrs(cJSON *root, const char *key, long long vals[]);
extern int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *peers, cJSON *totals, cJSON *memory); extern int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *peers, cJSON *totals, cJSON *memory);