outgoing/worker: Classify upstreams/peers in their own list

This commit is contained in:
Heikki Hannikainen 2012-10-12 02:49:52 +03:00
parent 9cb21af856
commit f5a4298292
3 changed files with 21 additions and 20 deletions

View File

@ -61,34 +61,28 @@ static void process_outgoing_single(struct worker_t *self, struct pbuf_t *pb)
return;
}
if (pb->flags & F_FROM_DOWNSTR) {
/* client is from downstream, send to upstreams and peers */
for (c = self->clients_ups; (c); c = cnext) {
cnext = c->class_next; // client_write() MAY destroy the client object!
if ((c->state == CSTATE_CONNECTED || c->state == CSTATE_COREPEER) && c != pb->origin)
send_single(self, c, pb);
}
}
for (c = self->clients_other; (c); c = cnext) {
cnext = c->class_next; // client_write() MAY destroy the client object!
/* Do not send to clients that are not logged in. */
if (c->state != CSTATE_CONNECTED && c->state != CSTATE_COREPEER) {
if (c->state != CSTATE_CONNECTED) {
//hlog(LOG_DEBUG, "%d/%s: not sending to client: not connected", c->fd, c->username);
continue;
}
if (c->flags & CLFLAGS_INPORT) {
/* Downstream client? If not full feed, process filters
* to see if the packet should be sent.
*/
if (( (c->flags & CLFLAGS_FULLFEED) != CLFLAGS_FULLFEED) && filter_process(self, c, pb) < 1) {
//hlog(LOG_DEBUG, "fd %d: Not fullfeed or not matching filter, not sending.", c->fd);
continue;
}
} else if (c->state == CSTATE_COREPEER || (c->flags & CLFLAGS_UPLINKPORT)) {
/* core peer or uplink? Check that the packet is
* coming from a downstream client.
*/
if ((pb->flags & F_FROM_DOWNSTR) != F_FROM_DOWNSTR) {
//hlog(LOG_DEBUG, "fd %d: Not from downstr, not sending to upstr.", c->fd);
continue;
}
} else {
hlog(LOG_DEBUG, "fd %d: Odd! Client not upstream or downstream. Not sending packets.", c->fd);
/* If not full feed, process filters to see if the packet should be sent. */
if (( (c->flags & CLFLAGS_FULLFEED) != CLFLAGS_FULLFEED) && filter_process(self, c, pb) < 1) {
//hlog(LOG_DEBUG, "fd %d: Not fullfeed or not matching filter, not sending.", c->fd);
continue;
}

View File

@ -1293,14 +1293,20 @@ static void collect_new_clients(struct worker_t *self)
hlog(LOG_DEBUG, "collect_new_clients(worker %d): client fd %d classified readonly", self->id, c->fd);
class_next = self->clients_ro;
class_prevp = &self->clients_ro;
} else if (c->state == CSTATE_COREPEER || (c->flags & CLFLAGS_UPLINKPORT)) {
hlog(LOG_DEBUG, "collect_new_clients(worker %d): client fd %d classified upstream/peer", self->id, c->fd);
class_next = self->clients_ups;
class_prevp = &self->clients_ups;
} else if (c->flags & CLFLAGS_DUPEFEED) {
hlog(LOG_DEBUG, "collect_new_clients(worker %d): client fd %d classified dupefeed", self->id, c->fd);
class_next = self->clients_dupe;
class_prevp = &self->clients_dupe;
} else {
} else if (c->flags & CLFLAGS_INPORT) {
hlog(LOG_DEBUG, "collect_new_clients(worker %d): client fd %d classified other", self->id, c->fd);
class_next = self->clients_other;
class_prevp = &self->clients_other;
} else {
hlog(LOG_ERR, "collect_new_clients(worker %d): client fd %d NOT CLASSIFIED - will not get packets", self->id, c->fd);
}
c->class_next = class_next;
if (class_next)

View File

@ -391,6 +391,7 @@ struct worker_t {
/* c->class_next lists, classified clients for optimized outbound */
struct client_t *clients_dupe; /* dupeclient port clients */
struct client_t *clients_ro; /* read-only clients */
struct client_t *clients_ups; /* upstreams and peers */
struct client_t *clients_other; /* other clients (unoptimized) */
pthread_mutex_t clients_mutex; /* mutex to protect access to the client list by the status dumps */