Calculate worker lag *after* inserting packets in the global buffer, so that

the lag values are large enough. Avoid expiring packets which are not yet
handled by workers.
Added some debug logging.


git-svn-id: http://repo.ham.fi/svn/aprsc/trunk@612 3ce903b1-3385-4e86-93cd-f9a4a239f7ac
This commit is contained in:
Heikki Hannikainen 2012-08-07 16:27:11 +00:00
parent 51b6242d72
commit 7ca9656f75
1 changed files with 34 additions and 40 deletions

View File

@ -42,7 +42,7 @@ int pbuf_global_count;
int pbuf_global_dupe_count;
int pbuf_global_count_limit = 5000; /* Real criteria is expirer.. */
int pbuf_global_dupe_count_limit = 5000; /* .. but we set some minimum packet counts
int pbuf_global_dupe_count_limit = 100; /* .. but we set some minimum packet counts
into the global pbuf queue anyway. */
long long dupecheck_outcount; /* 64 bit counters for statistics */
@ -110,10 +110,13 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
lastage1 = pb->t;
if (pb->t >= expire1)
break; // stop at newer than expire1
lag = pbuf_seqnum_lag(dupecheck_seqnum, pb->seqnum);
if (pbuf_lag >= lag)
if (pbuf_lag >= lag) {
hlog(LOG_DEBUG, "global_pbuf_purger: stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum);
break; // some output-worker is lagging behind this item!
}
freeset[n++] = pb;
++n1;
--pbuf_global_count;
@ -140,9 +143,11 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
if (pb->t >= expire2)
break; // stop at newer than expire2
lag = pbuf_seqnum_lag(dupecheck_dupe_seqnum, pb->seqnum);
if (pbuf_dupe_lag >= lag)
if (pbuf_dupe_lag >= lag) {
hlog(LOG_DEBUG, "global_pbuf_purger: dupe stop at lag %d, dupecheck at %d, pb %d", lag, dupecheck_seqnum, pb->seqnum);
break; // some output-worker is lagging behind this item!
}
freeset[n++] = pb;
++n2;
--pbuf_global_dupe_count;
@ -160,8 +165,8 @@ static void global_pbuf_purger(const int all, int pbuf_lag, int pbuf_dupe_lag)
// debug printout time... map "undefined" lag values to zero.
if (pbuf_lag == 2000000000) pbuf_lag = 0;
if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0;
//if (pbuf_lag == 2000000000) pbuf_lag = 0;
//if (pbuf_dupe_lag == 2000000000) pbuf_dupe_lag = 0;
if (lastage1 == 0) lastage1 = now+2; // makes printout of "-2" (or "-1")
if (lastage2 == 0) lastage2 = now+2;
@ -430,11 +435,11 @@ static int dupecheck_drain_worker(struct worker_t *w,
for (pb = pb_list; (pb); pb = pbnext) {
if (pb->t > tick) {
hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d!\n%s*",
pb->seqnum, pb->t, tick, pb->packet_len-2, pb->data);
hlog(LOG_ERR, "dupecheck: drain got packet from future %d with t %d > tick %d, worker %d!\n%*s",
pb->seqnum, pb->t, tick, w->id, pb->packet_len-2, pb->data);
} else if (tick - pb->t > 10) {
hlog(LOG_ERR, "dupecheck: drain got packet %d aged %d sec from worker %d\n%*s",
pb->seqnum, tick - pb->t, pb->packet_len-2, w->id, pb->data);
pb->seqnum, tick - pb->t, w->id, pb->packet_len-2, pb->data);
}
int rc = dupecheck(pb);
@ -476,8 +481,6 @@ static void dupecheck_thread(void)
int e;
int c, d;
int pb_out_count, pb_out_dupe_count;
int worker_pbuf_lag;
int worker_pbuf_dupe_lag;
time_t cleanup_tick = now;
pthreads_profiling_reset("dupecheck");
@ -504,24 +507,9 @@ static void dupecheck_thread(void)
pb_out_dupe_prevp = &pb_out_dupe;
pb_out_count = pb_out_dupe_count = 0;
pb_out_last = pb_out_dupe_last = NULL;
worker_pbuf_lag = worker_pbuf_dupe_lag = -1;
/* walk through worker threads */
for (w = worker_threads; (w); w = w->next) {
/* find the highest worker lag count... */
/* It really does not matter when we do this calculation, the worker-threads
are running independently of us, and only thing guaranteed for us is that
they are progressing onwards the pbuf sequences. */
c = pbuf_seqnum_lag(dupecheck_seqnum, w->last_pbuf_seqnum);
if (w->last_pbuf_seqnum == 0) c = 2000000000;
if (c > worker_pbuf_lag)
worker_pbuf_lag = c;
c = pbuf_seqnum_lag(dupecheck_dupe_seqnum, w->last_pbuf_dupe_seqnum);
if (c > worker_pbuf_dupe_lag)
worker_pbuf_dupe_lag = c;
/* if there are items in the worker's pbuf_incoming, grab them and process */
if (!w->pbuf_incoming)
continue;
@ -553,14 +541,12 @@ static void dupecheck_thread(void)
if (pb_out) {
*pbuf_global_prevp = pb_out;
pbuf_global_prevp = pb_out_prevp;
pbuf_global_last = pb_out_last;
pbuf_global_count += pb_out_count;
}
if (pb_out_dupe) {
*pbuf_global_dupe_prevp = pb_out_dupe;
pbuf_global_dupe_prevp = pb_out_dupe_prevp;
pbuf_global_dupe_last = pb_out_dupe_last;
pbuf_global_dupe_count += pb_out_dupe_count;
}
@ -570,22 +556,12 @@ static void dupecheck_thread(void)
}
}
#if 0
// If no workers are running, the output processing is also
// impossible, thus the default lag of "-1" is just fine,
// and the out-queue purge is not lag-restricted.
if (worker_pbuf_lag < 0)
worker_pbuf_lag = 2000000000;
if (worker_pbuf_dupe_lag < 0)
worker_pbuf_dupe_lag = 2000000000;
#endif
dupecheck_outcount += pb_out_count;
dupecheck_dupecount += pb_out_dupe_count;
if (cleanup_tick <= now) { // once in a (simulated) minute or so..
cleanup_tick = now + 10;
/*
if ((e = rwl_wrlock(&pbuf_global_rwlock))) {
hlog(LOG_CRIT, "dupecheck: Failed to wrlock pbuf_global_rwlock!");
@ -593,6 +569,24 @@ static void dupecheck_thread(void)
}
*/
/* walk through worker threads */
int worker_pbuf_lag;
int worker_pbuf_dupe_lag;
worker_pbuf_lag = worker_pbuf_dupe_lag = -1;
for (w = worker_threads; (w); w = w->next) {
/* Find the highest worker lag count after we have appended
* the packets in the buffer.
*/
c = pbuf_seqnum_lag(dupecheck_seqnum, w->last_pbuf_seqnum);
if (w->last_pbuf_seqnum == 0)
c = 2000000000;
if (c > worker_pbuf_lag)
worker_pbuf_lag = c;
c = pbuf_seqnum_lag(dupecheck_dupe_seqnum, w->last_pbuf_dupe_seqnum);
if (c > worker_pbuf_dupe_lag)
worker_pbuf_dupe_lag = c;
}
global_pbuf_purger(0, worker_pbuf_lag, worker_pbuf_dupe_lag);
/*