diff --git a/src/outgoing.c b/src/outgoing.c index 1b6c3cf..0b03737 100644 --- a/src/outgoing.c +++ b/src/outgoing.c @@ -53,17 +53,17 @@ static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb) if (pb->flags & F_DUPE) { /* Duplicate packet. Don't send, unless client especially wants! */ - for (c = self->clients; (c); c = cnext) { - cnext = c->next; // client_write() MAY destroy the client object! - if (c->flags & CLFLAGS_DUPEFEED && c->state == CSTATE_CONNECTED) + for (c = self->clients_dupe; (c); c = cnext) { + cnext = c->class_next; // client_write() MAY destroy the client object! + if (c->state == CSTATE_CONNECTED) send_single(self, c, pb); } return; } - for (c = self->clients; (c); c = cnext) { - cnext = c->next; // client_write() MAY destroy the client object! + for (c = self->clients_other; (c); c = cnext) { + cnext = c->class_next; // client_write() MAY destroy the client object! /* Do not send to clients that are not logged in. */ if (c->state != CSTATE_CONNECTED && c->state != CSTATE_COREPEER) { @@ -87,9 +87,6 @@ static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb) //hlog(LOG_DEBUG, "fd %d: Not from downstr, not sending to upstr.", c->fd); continue; } - } else if (c->flags & CLFLAGS_DUPEFEED) { - /* Duplicate packets feed? This packet isn't a duplicate, so don't send. */ - continue; } else { hlog(LOG_DEBUG, "fd %d: Odd! Client not upstream or downstream. Not sending packets.", c->fd); continue; diff --git a/src/worker.c b/src/worker.c index 65b5968..b01dd64 100644 --- a/src/worker.c +++ b/src/worker.c @@ -478,7 +478,6 @@ int pass_client_to_worker(struct worker_t *wc, struct client_t *c) return 0; } - char *strsockaddr(const struct sockaddr *sa, const int addr_len) { char eb[200], *s; @@ -761,6 +760,13 @@ void client_close(struct worker_t *self, struct client_t *c, int errnum) if (c->next) c->next->prevp = c->prevp; *c->prevp = c->next; + + /* link the classified clients list together over this node */ + if (c->class_prevp) { + *c->class_prevp = c->class_next; + if (c->class_next) + c->class_next->class_prevp = c->class_prevp; + } /* If this happens to be the uplink, tell the uplink connection * setup module that the connection has gone away. @@ -1280,6 +1286,23 @@ static void collect_new_clients(struct worker_t *self) c->next->prevp = &c->next; self->clients = c; c->prevp = &self->clients; + + struct client_t *class_next; + struct client_t **class_prevp; + if (c->flags & CLFLAGS_DUPEFEED) { + hlog(LOG_DEBUG, "collect_new_clients(worker %d): client fd %d classified dupefeed", self->id, c->fd); + class_next = self->clients_dupe; + class_prevp = &self->clients_dupe; + } else { + hlog(LOG_DEBUG, "collect_new_clients(worker %d): client fd %d classified other", self->id, c->fd); + class_next = self->clients_other; + class_prevp = &self->clients_other; + } + c->class_next = class_next; + if (class_next) + class_next->class_prevp = &c->class_next; + *class_prevp = c; + c->class_prevp = class_prevp; } /* If the new client is an UDP core peer, we will add it's FD to the diff --git a/src/worker.h b/src/worker.h index 4254954..fa61197 100644 --- a/src/worker.h +++ b/src/worker.h @@ -254,6 +254,9 @@ struct client_t { struct client_t *next; struct client_t **prevp; + struct client_t *class_next; + struct client_t **class_prevp; + union sockaddr_u addr; struct portaccount_t *portaccount; /* port specific global account accumulator */ struct portaccount_t localaccount; /* client connection specific account accumulator */ @@ -384,7 +387,10 @@ struct worker_t { int shutting_down; /* should I shut down? */ - struct client_t *clients; /* clients handled by this thread */ + struct client_t *clients; /* all clients handled by this thread */ + /* c->class_next lists, classified clients */ + struct client_t *clients_dupe; /* dupeclient port clients */ + struct client_t *clients_other; /* other clients (unoptimized) */ pthread_mutex_t clients_mutex; /* mutex to protect access to the client list by the status dumps */ struct client_t *new_clients; /* new clients which passed in by accept */