From 7ca9656f75b5511ac4fe22c9e398f0453350457b Mon Sep 17 00:00:00 2001 From: Heikki Hannikainen Date: Tue, 7 Aug 2012 16:27:11 +0000 Subject: [PATCH] Calculate worker lag *after* inserting packets in the global buffer, so that the lag values are large enough. Avoid expiring packets which are not yet handled by workers. Added some debug logging. git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@612 3ce903b1-3385-4e86-93cd-f9a4a239f7ac --- src/dupecheck.c | 74 +++++++++++++++++++++++-------------------------- 1 file changed, 34 insertions(+), 40 deletions(-) diff --git a/src/dupecheck.c b/src/dupecheck.c index 38b6818..8b0a0db 100644 --- a/src/dupecheck.c +++ b/src/dupecheck.c @@ -42,7 +42,7 @@ int pbuf_global_count; int pbuf_global_dupe_count; int pbuf_global_count_limit = 5000; /* Real criteria is expirer.. */ -int pbuf_global_dupe_count_limit = 5000; /* .. but we set some minimum packet counts +int pbuf_global_dupe_count_limit = 100; /* .. but we set some minimum packet counts into the global pbuf queue anyway. */ long long dupecheck_outcount; /* 64 bit counters for statistics */ @@ -110,10 +110,13 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) lastage1 = pb->t; if (pb->t >= expire1) break; // stop at newer than expire1 + lag = pbuf_seqnum_lag(dupecheck_seqnum, pb->seqnum); - if (pbuf_lag >= lag) + if (pbuf_lag >= lag) { + hlog(LOG_DEBUG, "global_pbuf_purger: stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum); break; // some output-worker is lagging behind this item! - + } + freeset[n++] = pb; ++n1; --pbuf_global_count; @@ -140,9 +143,11 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) if (pb->t >= expire2) break; // stop at newer than expire2 lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum); - if (pbuf_dupe_lag >= lag) + if (pbuf_dupe_lag >= lag) { + hlog(LOG_DEBUG, "global_pbuf_purger: dupe stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum); break; // some output-worker is lagging behind this item! - + } + freeset[n++] = pb; ++n2; --pbuf_global_dupe_count; @@ -160,8 +165,8 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag) // debug printout time... map "undefined" lag values to zero. - if (pbuf_lag == 2000000000) pbuf_lag = 0; - if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0; + //if (pbuf_lag == 2000000000) pbuf_lag = 0; + //if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0; if (lastage1 == 0) lastage1 = now+2; // makes printout of "-2" (or "-1") if (lastage2 == 0) lastage2 = now+2; @@ -430,11 +435,11 @@ static int dupecheck_drain_worker(struct worker_t *w, for (pb = pb_list; (pb); pb = pbnext) { if (pb->t > tick) { - hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d!\n%s*", - pb->seqnum, pb->t, tick, pb->packet_len-2, pb->data); + hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d, worker %d!\n%*s", + pb->seqnum, pb->t, tick, w->id, pb->packet_len-2, pb->data); } else if (tick - pb->t > 10) { hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s", - pb->seqnum, tick - pb->t, pb->packet_len-2, w->id, pb->data); + pb->seqnum, tick - pb->t, w->id, pb->packet_len-2, pb->data); } int rc = dupecheck(pb); @@ -476,8 +481,6 @@ static void dupecheck_thread(void) int e; int c, d; int pb_out_count, pb_out_dupe_count; - int worker_pbuf_lag; - int worker_pbuf_dupe_lag; time_t cleanup_tick = now; pthreads_profiling_reset("dupecheck"); @@ -504,24 +507,9 @@ static void dupecheck_thread(void) pb_out_dupe_prevp = &pb_out_dupe; pb_out_count = pb_out_dupe_count = 0; pb_out_last = pb_out_dupe_last = NULL; - worker_pbuf_lag = worker_pbuf_dupe_lag = -1; /* walk through worker threads */ for (w = worker_threads; (w); w = w->next) { - - /* find the highest worker lag count... */ - - /* It really does not matter when we do this calculation, the worker-threads - are running independently of us, and only thing guaranteed for us is that - they are progressing onwards the pbuf sequences. */ - c = pbuf_seqnum_lag(dupecheck_seqnum, w->last_pbuf_seqnum); - if (w->last_pbuf_seqnum == 0) c = 2000000000; - if (c > worker_pbuf_lag) - worker_pbuf_lag = c; - c = pbuf_seqnum_lag(dupecheck_dupe_seqnum, w->last_pbuf_dupe_seqnum); - if (c > worker_pbuf_dupe_lag) - worker_pbuf_dupe_lag = c; - /* if there are items in the worker's pbuf_incoming, grab them and process */ if (!w->pbuf_incoming) continue; @@ -553,14 +541,12 @@ static void dupecheck_thread(void) if (pb_out) { *pbuf_global_prevp = pb_out; pbuf_global_prevp = pb_out_prevp; - pbuf_global_last = pb_out_last; pbuf_global_count += pb_out_count; } if (pb_out_dupe) { *pbuf_global_dupe_prevp = pb_out_dupe; pbuf_global_dupe_prevp = pb_out_dupe_prevp; - pbuf_global_dupe_last = pb_out_dupe_last; pbuf_global_dupe_count += pb_out_dupe_count; } @@ -570,22 +556,12 @@ static void dupecheck_thread(void) } } -#if 0 - // If no workers are running, the output processing is also - // impossible, thus the default lag of "-1" is just fine, - // and the out-queue purge is not lag-restricted. - - if (worker_pbuf_lag < 0) - worker_pbuf_lag = 2000000000; - if (worker_pbuf_dupe_lag < 0) - worker_pbuf_dupe_lag = 2000000000; -#endif - dupecheck_outcount += pb_out_count; dupecheck_dupecount += pb_out_dupe_count; if (cleanup_tick <= now) { // once in a (simulated) minute or so.. cleanup_tick = now + 10; + /* if ((e = rwl_wrlock(&pbuf_global_rwlock))) { hlog(LOG_CRIT, "dupecheck: Failed to wrlock pbuf_global_rwlock!"); @@ -593,6 +569,24 @@ static void dupecheck_thread(void) } */ + /* walk through worker threads */ + int worker_pbuf_lag; + int worker_pbuf_dupe_lag; + worker_pbuf_lag = worker_pbuf_dupe_lag = -1; + for (w = worker_threads; (w); w = w->next) { + /* Find the highest worker lag count after we have appended + * the packets in the buffer. + */ + c = pbuf_seqnum_lag(dupecheck_seqnum, w->last_pbuf_seqnum); + if (w->last_pbuf_seqnum == 0) + c = 2000000000; + if (c > worker_pbuf_lag) + worker_pbuf_lag = c; + c = pbuf_seqnum_lag(dupecheck_dupe_seqnum, w->last_pbuf_dupe_seqnum); + if (c > worker_pbuf_dupe_lag) + worker_pbuf_dupe_lag = c; + } + global_pbuf_purger(0, worker_pbuf_lag, worker_pbuf_dupe_lag); /*