diff --git a/src/accept.c b/src/accept.c index 6519d30..73734d0 100644 --- a/src/accept.c +++ b/src/accept.c @@ -702,7 +702,7 @@ static void accept_process_udpsubmit(struct listen_t *l, char *buf, int len, cha udp_pseudoclient->portaccount = l->portaccount; e = pseudoclient_push_packet(udp_worker, udp_pseudoclient, username, packet, packet_len); - clientaccount_add(udp_pseudoclient, IPPROTO_UDP, len, 1, 0, 0, (e < 0) ? e : 0, 0); + clientaccount_add_rx(udp_pseudoclient, IPPROTO_UDP, len, 1, (e < 0) ? e : 0, 0); udp_pseudoclient->portaccount = NULL; if (e < 0) diff --git a/src/dupecheck.c b/src/dupecheck.c index 276d4b6..94a12e6 100644 --- a/src/dupecheck.c +++ b/src/dupecheck.c @@ -95,7 +95,6 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) struct pbuf_t *pb, *pb2; struct pbuf_t *freeset[2002]; int n, n1, n2, lag; - time_t lastage1 = 0, lastage2 = 0; time_t expire2 = tick - pbuf_global_dupe_expiration; time_t expire1 = tick - pbuf_global_expiration; @@ -107,13 +106,20 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) } pb = pbuf_global; + +#ifdef GLOBAL_PBUF_PURGER_STATS + time_t lastage1 = 0, lastage2 = 0; if (pb) lastage1 = pb->t; +#endif + n = 0; n1 = 0; while ( pbuf_global_count > pbuf_global_count_limit && pb ) { +#ifdef GLOBAL_PBUF_PURGER_STATS lastage1 = pb->t; +#endif if (pb->t >= expire1) break; // stop at newer than expire1 @@ -139,13 +145,16 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) } pb = pbuf_global_dupe; +#ifdef GLOBAL_PBUF_PURGER_STATS if (pb) lastage2 = pb->t; +#endif n = 0; n2 = 0; while ( pbuf_global_dupe_count > pbuf_global_dupe_count_limit && pb ) { - +#ifdef GLOBAL_PBUF_PURGER_STATS lastage2 = pb->t; +#endif if (pb->t >= expire2) break; // stop at newer than expire2 lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum); @@ -169,15 +178,13 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) pbuf_free_many(freeset, n); } +#ifdef GLOBAL_PBUF_PURGER_STATS // debug printout time... map "undefined" lag values to zero. - //if (pbuf_lag == 2000000000) pbuf_lag = 0; - //if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0; - + if (pbuf_lag == 2000000000) pbuf_lag = 0; + if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0; if (lastage1 == 0) lastage1 = tick+2; // makes printout of "-2" (or "-1") if (lastage2 == 0) lastage2 = tick+2; - - /* static int show_zeros = 1; @@ -194,7 +201,7 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) else show_zeros = 1; } - */ +#endif } @@ -394,11 +401,8 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat int tlen1, tlen2, tlen3; int i; char c; - - /* TODO: dupecheck_mangle_store: Check for the necessity to do - * any futher packet scans when doing the initial scan - * (check for 8-bit / low data => optimize /shortcut) - */ + int have_del = 0; + int have_low_values = 0; ilen = addrlen + datalen; @@ -413,14 +417,13 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat /********************************************/ /* remove spaces from the end of the packet */ - memcpy(tb1, ib, ilen); tlen1 = ilen; - while (tlen1 > 0 && tb1[tlen1-1] == ' ') + while (tlen1 > 0 && ib[tlen1-1] == ' ') --tlen1; if (tlen1 != ilen) { //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d spaces: '%.*s'", ilen-tlen1, tlen1, tb1); - dupecheck_add_buf(tb1, tlen1, DTYPE_SPACE_TRIM); + dupecheck_add_buf(ib, tlen1, DTYPE_SPACE_TRIM); } /*************************/ @@ -428,9 +431,21 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat * tb2: 8th bit is cleared * tb3: 8th bit replaced with a space */ + + /* Check for the necessity to do + * any futher packet scans when doing the initial scan + * (check for DEL bytes / low data => optimize /shortcut) + */ + tlen1 = tlen2 = tlen3 = 0; for (i = 0; i < ilen; i++) { - c = ib[i] & 0x7F; + c = ib[i]; + if (c == 0x7F) + have_del = 1; + if (c < 0x20 && c > 0) + have_low_values = 1; + + c = c & 0x7F; tb2[tlen2++] = c; if (ib[i] != c) { /* high bit is on */ @@ -455,50 +470,54 @@ static int dupecheck_mangle_store(const char *addr, int addrlen, const char *dat * tb1: Low data (0 <= x < 0x20 deleted * tb2: Low data replaced with spaces */ - tlen1 = tlen2 = 0; - for (i = 0; i < ilen; i++) { - c = ib[i]; - if (c < 0x20 && c > 0) { - /* low data, tb2 gets a space and tb1 gets nothing */ - tb2[tlen2++] = ' '; - } else { - /* regular stuff */ - tb1[tlen1++] = c; - tb2[tlen2++] = c; + if (have_low_values) { + tlen1 = tlen2 = 0; + for (i = 0; i < ilen; i++) { + c = ib[i]; + if (c < 0x20 && c > 0) { + /* low data, tb2 gets a space and tb1 gets nothing */ + tb2[tlen2++] = ' '; + } else { + /* regular stuff */ + tb1[tlen1++] = c; + tb2[tlen2++] = c; + } + } + + if (tlen1 != ilen) { + /* if there was low data, store it */ + //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1); + //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2); + dupecheck_add_buf(tb1, tlen1, DTYPE_LOWDATA_STRIP); + dupecheck_add_buf(tb2, tlen2, DTYPE_LOWDATA_SPACED); } - } - - if (tlen1 != ilen) { - /* if there was low data, store it */ - //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1); - //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2); - dupecheck_add_buf(tb1, tlen1, DTYPE_LOWDATA_STRIP); - dupecheck_add_buf(tb2, tlen2, DTYPE_LOWDATA_SPACED); } /********************************************** * tb1: Del characters (0x7f) deleted * tb2: Del characters replaced with spaces */ - tlen1 = tlen2 = 0; - for (i = 0; i < ilen; i++) { - c = ib[i]; - if (c == 0x7f) { - /* low data, tb2 gets a space and tb1 gets nothing */ - tb2[tlen2++] = ' '; - } else { - /* regular stuff */ - tb1[tlen1++] = c; - tb2[tlen2++] = c; + if (have_del) { + tlen1 = tlen2 = 0; + for (i = 0; i < ilen; i++) { + c = ib[i]; + if (c == 0x7f) { + /* low data, tb2 gets a space and tb1 gets nothing */ + tb2[tlen2++] = ' '; + } else { + /* regular stuff */ + tb1[tlen1++] = c; + tb2[tlen2++] = c; + } + } + + if (tlen1 != ilen) { + /* if there was low data, store it */ + //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1); + //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2); + dupecheck_add_buf(tb1, tlen1, DTYPE_DEL_STRIP); + dupecheck_add_buf(tb2, tlen2, DTYPE_DEL_SPACED); } - } - - if (tlen1 != ilen) { - /* if there was low data, store it */ - //hlog(LOG_DEBUG, "dupecheck_mangle_store: removed %d low chars: '%.*s'", ilen-tlen1, tlen1, tb1); - //hlog(LOG_DEBUG, "dupecheck_mangle_store: replaced %d low chars: '%.*s'", ilen-tlen1, tlen2, tb2); - dupecheck_add_buf(tb1, tlen1, DTYPE_DEL_STRIP); - dupecheck_add_buf(tb2, tlen2, DTYPE_DEL_SPACED); } return 0; @@ -618,6 +637,7 @@ static int dupecheck_drain_worker(struct worker_t *w, struct pbuf_t *pb, *pbnext; int n = 0; int me; + int pb_out_count_local = 0, pb_out_dupe_count_local = 0; /* grab worker's list of packets */ if ((me = pthread_mutex_lock(&w->pbuf_incoming_mutex))) { @@ -635,16 +655,14 @@ static int dupecheck_drain_worker(struct worker_t *w, //hlog(LOG_DEBUG, "Dupecheck got %d packets from worker %d; n=%d", // c, w->id, dupecheck_seqnum); - + + // check that the first packet isn't very old, it indicates we're not doing well + if ((pb_list) && tick - pb_list->t > 10) { + hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s", + pb_list->seqnum, tick - pb_list->t, w->id, pb_list->packet_len-2, pb_list->data); + } + for (pb = pb_list; (pb); pb = pbnext) { - if (pb->t > tick + 1) { - hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d, worker %d!\n%*s", - pb->seqnum, pb->t, tick, w->id, pb->packet_len-2, pb->data); - } else if (tick - pb->t > 10) { - hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s", - pb->seqnum, tick - pb->t, w->id, pb->packet_len-2, pb->data); - } - int rc = dupecheck(pb); pbnext = pb->next; // it may get modified below.. @@ -664,18 +682,21 @@ static int dupecheck_drain_worker(struct worker_t *w, *pb_out_prevp = &pb->next; *pb_out_last = pb; pb->seqnum = ++dupecheck_seqnum; - *pb_out_count = *pb_out_count + 1; + pb_out_count_local++; } else { // Duplicate **pb_out_dupe_prevp = pb; *pb_out_dupe_prevp = &pb->next; *pb_out_dupe_last = pb; pb->seqnum = ++dupecheck_dupe_seqnum; - *pb_out_dupe_count = *pb_out_dupe_count + 1; + pb_out_dupe_count_local++; //hlog(LOG_DEBUG, "is duplicate"); } n++; } + + *pb_out_count += pb_out_count_local; + *pb_out_dupe_count += pb_out_dupe_count_local; return n; } diff --git a/src/incoming.c b/src/incoming.c index 67f3f9b..6dda7df 100644 --- a/src/incoming.c +++ b/src/incoming.c @@ -964,15 +964,14 @@ int incoming_parse(struct worker_t *self, struct client_t *c, char *s, int len) hlog(LOG_DEBUG, "packet too long after inserting new Q construct (%d bytes, max %d)", new_len, PACKETLEN_MAX_LARGE); return INERR_LONG_PACKET; } - + + /* get a new packet buffer of the correct size; it is zeroed by pbuf_get. */ pb = pbuf_get(self, new_len); if (!pb) { // This should never happen... LOG_CRIT error logged in pbuf_get() //hlog(LOG_DEBUG, "pbuf_get failed to get packet buffer"); return INERR_OUT_OF_PBUFS; // No room :-( } - pb->next = NULL; // OPTIMIZE: pbuf arrives pre-zeroed, this could be removed maybe? - pb->flags = 0; /* store the source reference */ pb->origin = c; @@ -1189,7 +1188,7 @@ in_drop: /* Account the one incoming packet. * Incoming bytes were already accounted earlier. */ - clientaccount_add(c, l4proto, 0, 1, 0, 0, (e < 0) ? e : 0, 0); + clientaccount_add_rx(c, l4proto, 0, 1, (e < 0) ? e : 0, 0); return 0; } diff --git a/src/outgoing.c b/src/outgoing.c index f304c0e..94006d4 100644 --- a/src/outgoing.c +++ b/src/outgoing.c @@ -31,9 +31,9 @@ static inline void send_single(struct worker_t *self, struct client_t *c, char * * its TCP or SCTP or something. */ if (c->udp_port && c->udpclient) - clientaccount_add( c, IPPROTO_UDP, 0, 0, 0, 1, 0, 0); + clientaccount_add_tx( c, IPPROTO_UDP, 0, 1); else - clientaccount_add( c, c->ai_protocol, 0, 0, 0, 1, 0, 0); + clientaccount_add_tx( c, c->ai_protocol, 0, 1); c->write(self, c, data, len); } @@ -79,7 +79,7 @@ static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb) /* OPTIMIZE: we walk through all clients for each dupe - how to find it quickly? */ for (c = self->clients; (c); c = c->next) { if (c == origin) { - clientaccount_add(c, -1, 0, 0, 0, 0, 0, 1); + clientaccount_add_rx(c, -1, 0, 0, 0, 1); break; } } diff --git a/src/sctp.c b/src/sctp.c index 1822af2..4ba008b 100644 --- a/src/sctp.c +++ b/src/sctp.c @@ -286,7 +286,7 @@ int sctp_client_write(struct worker_t *self, struct client_t *c, char *p, int le c->obuf_writes++; if (client_buffer_outgoing_data(self, c, p, len) == -12) return -12; - clientaccount_add( c, IPPROTO_SCTP, 0, 0, len, 0, 0, 0); + clientaccount_add_tx(c, IPPROTO_SCTP, len, 0); if (c->obuf_writes > obuf_writes_threshold) { // Lots and lots of writes, switch to buffering... if (c->obuf_flushsize == 0) { diff --git a/src/worker.c b/src/worker.c index 7fc6fe7..4384777 100644 --- a/src/worker.c +++ b/src/worker.c @@ -660,7 +660,7 @@ char *hexsockaddr(const struct sockaddr *sa, const int addr_len) return hstrdup(eb); } -void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes) +void clientaccount_add_rx(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int rxerr, int rxdupes) { struct portaccount_t *pa = NULL; int rxdrops = 0; @@ -674,9 +674,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke /* worker local accounters do not need locks */ c->localaccount.rxbytes += rxbytes; - c->localaccount.txbytes += txbytes; c->localaccount.rxpackets += rxpackets; - c->localaccount.txpackets += txpackets; c->localaccount.rxdupes += rxdupes; if (rxdrops) { c->localaccount.rxdrops += 1; @@ -692,9 +690,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke if (pa) { #ifdef HAVE_SYNC_FETCH_AND_ADD __sync_fetch_and_add(&pa->rxbytes, rxbytes); - __sync_fetch_and_add(&pa->txbytes, txbytes); __sync_fetch_and_add(&pa->rxpackets, rxpackets); - __sync_fetch_and_add(&pa->txpackets, txpackets); __sync_fetch_and_add(&pa->rxdupes, rxdupes); if (rxdrops) { __sync_fetch_and_add(&pa->rxdrops, 1); @@ -703,9 +699,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke #else // FIXME: MUTEX !! -- this may or may not need locks.. pa->rxbytes += rxbytes; - pa->txbytes += txbytes; pa->rxpackets += rxpackets; - pa->txpackets += txpackets; pa->rxdupes += rxdupes; if (rxdrops) { pa->rxdrops += 1; @@ -729,9 +723,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke #ifdef HAVE_SYNC_FETCH_AND_ADD __sync_fetch_and_add(&proto->rxbytes, rxbytes); - __sync_fetch_and_add(&proto->txbytes, txbytes); __sync_fetch_and_add(&proto->rxpackets, rxpackets); - __sync_fetch_and_add(&proto->txpackets, txpackets); if (rxdrops) { __sync_fetch_and_add(&proto->rxdrops, 1); __sync_fetch_and_add(&proto->rxerrs[rxerr], 1); @@ -739,9 +731,7 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke #else // FIXME: MUTEX !! -- this may or may not need locks.. proto->rxbytes += rxbytes; - proto->txbytes += txbytes; proto->rxpackets += rxpackets; - proto->txpackets += txpackets; if (rxdrops) { proto->rxdrops += 1; proto->rxerrs[rxerr] += 1; @@ -749,6 +739,54 @@ void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpacke #endif } +void clientaccount_add_tx(struct client_t *c, int l4proto, int txbytes, int txpackets) +{ + struct portaccount_t *pa = NULL; + + /* worker local accounters do not need locks */ + c->localaccount.txbytes += txbytes; + c->localaccount.txpackets += txpackets; + + if (l4proto == IPPROTO_UDP && c->udpclient && c->udpclient->portaccount) { + pa = c->udpclient->portaccount; + } else if (c->portaccount) { + pa = c->portaccount; + } + + if (pa) { +#ifdef HAVE_SYNC_FETCH_AND_ADD + __sync_fetch_and_add(&pa->txbytes, txbytes); + __sync_fetch_and_add(&pa->txpackets, txpackets); +#else + // FIXME: MUTEX !! -- this may or may not need locks.. + pa->txbytes += txbytes; + pa->txpackets += txpackets; +#endif + } + + struct portaccount_t *proto; + + if (l4proto == IPPROTO_TCP) + proto = &client_connects_tcp; + else if (l4proto == IPPROTO_UDP) + proto = &client_connects_udp; +#ifdef USE_SCTP + else if (l4proto == IPPROTO_SCTP) + proto = &client_connects_sctp; +#endif + else + return; + +#ifdef HAVE_SYNC_FETCH_AND_ADD + __sync_fetch_and_add(&proto->txbytes, txbytes); + __sync_fetch_and_add(&proto->txpackets, txpackets); +#else + // FIXME: MUTEX !! -- this may or may not need locks.. + proto->txbytes += txbytes; + proto->txpackets += txpackets; +#endif +} + static const char *protocol_str(struct client_t *c) { static const char unknown[] = "UNKNOWN-PROTOCOL"; @@ -929,7 +967,7 @@ int udp_client_write(struct worker_t *self, struct client_t *c, char *p, int len // hlog( LOG_DEBUG, "UDP from %d to client port %d, sendto rc=%d", c->udpclient->portnum, c->udp_port, i ); if (i > 0) - clientaccount_add( c, IPPROTO_UDP, 0, 0, i, 0, 0, 0); + clientaccount_add_tx( c, IPPROTO_UDP, i, 0); return i; } @@ -990,7 +1028,7 @@ static int ssl_client_write(struct worker_t *self, struct client_t *c, char *p, c->obuf_writes++; if (len > 0) - clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0); + clientaccount_add_tx( c, c->ai_protocol, len, 0); if (client_buffer_outgoing_data(self, c, p, len) == -12) return -12; @@ -1029,7 +1067,7 @@ static int tcp_client_write(struct worker_t *self, struct client_t *c, char *p, * will be incremented only when we actually transmit a packet * instead of a keepalive. */ - clientaccount_add( c, c->ai_protocol, 0, 0, len, 0, 0, 0); + clientaccount_add_tx( c, c->ai_protocol, len, 0); } if (client_buffer_outgoing_data(self, c, p, len) == -12) @@ -1195,7 +1233,7 @@ static int handle_corepeer_readable(struct worker_t *self, struct client_t *c) hlog(LOG_DEBUG, "worker thread passing UDP packet from %s to handler: %*s", addrs, r, c->ibuf); hfree(addrs); */ - clientaccount_add( rc, IPPROTO_UDP, r, 0, 0, 0, 0, 0); /* Account byte count. incoming_handler() will account packets. */ + clientaccount_add_rx(rc, IPPROTO_UDP, r, 0, 0, 0); /* Account byte count. incoming_handler() will account packets. */ rc->last_read = tick; /* Ignore CRs and LFs in UDP input packet - the current core peer system puts 1 APRS packet in each @@ -1256,7 +1294,7 @@ static int deframe_aprsis_input_lines(struct worker_t *self, struct client_t *c) int client_postread(struct worker_t *self, struct client_t *c, int r) { - clientaccount_add(c, c->ai_protocol, r, 0, 0, 0, 0, 0); /* Number of packets is now unknown, + clientaccount_add_rx(c, c->ai_protocol, r, 0, 0, 0); /* Number of packets is now unknown, byte count is collected. The incoming_handler() will account packets. */ diff --git a/src/worker.h b/src/worker.h index 2f2f3ed..2d4a2e4 100644 --- a/src/worker.h +++ b/src/worker.h @@ -523,7 +523,8 @@ extern void port_accounter_drop(struct portaccount_t *p); extern char *strsockaddr(const struct sockaddr *sa, const int addr_len); extern char *hexsockaddr(const struct sockaddr *sa, const int addr_len); -extern void clientaccount_add(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int txbytes, int txpackets, int rxerr, int rxdupes); +extern void clientaccount_add_rx(struct client_t *c, int l4proto, int rxbytes, int rxpackets, int rxerr, int rxdupes); +extern void clientaccount_add_tx(struct client_t *c, int l4proto, int txbytes, int txpackets); extern void json_add_rxerrs(cJSON *root, const char *key, long long vals[]); extern int worker_client_list(cJSON *workers, cJSON *clients, cJSON *uplinks, cJSON *peers, cJSON *totals, cJSON *memory);