diff --git a/src/accept.c b/src/accept.c index aa27869..e49f147 100644 --- a/src/accept.c +++ b/src/accept.c @@ -50,8 +50,9 @@ struct listen_t { struct listen_t *next; struct listen_t **prevp; + int id; int fd; - int clientflags; + int client_flags; int portnum; int clients_max; int corepeer; @@ -84,14 +85,17 @@ static struct listen_t *listener_alloc(void) struct listen_t *l = hmalloc(sizeof(*l)); memset( l, 0, sizeof(*l) ); l->fd = -1; - + l->id = -1; + return l; } static void listener_free(struct listen_t *l) { int i; - + + hlog(LOG_DEBUG, "Freeing listener %d '%s': %s", l->id, l->name, l->addr_s); + if (l->udp) { l->udp->configured = 0; l->fd = -1; @@ -112,6 +116,12 @@ static void listener_free(struct listen_t *l) if (l->acl) acl_free(l->acl); + /* merge listener list around this node */ + if (l->next) + l->next->prevp = l->prevp; + if (l->prevp) + *(l->prevp) = l->next; + hfree(l); } @@ -190,9 +200,6 @@ static int open_udp_listener(struct listen_t *l, const struct addrinfo *ai) c->af = ai->ai_family; c->portaccount = l->portaccount; - inbound_connects_account(3, c->portaccount); /* "3" = udp, not listening.. - account all ports + port-specifics */ - if (1) { int len, arg; /* Set bigger socket buffer sizes for the UDP port.. @@ -223,82 +230,130 @@ static int open_udp_listener(struct listen_t *l, const struct addrinfo *ai) return fd; } -static int open_listeners(void) +static int open_listener(struct listen_config_t *lc) +{ + struct listen_t *l; + int i; + + l = listener_alloc(); + l->id = lc->id; + l->hidden = lc->hidden; + l->corepeer = lc->corepeer; + l->client_flags = lc->client_flags; + l->clients_max = lc->clients_max; + + l->portaccount = port_accounter_alloc(); + + /* Pick first of the AIs for this listen definition */ + l->addr_s = strsockaddr( lc->ai->ai_addr, lc->ai->ai_addrlen ); + l->name = hstrdup(lc->name); + l->portnum = lc->portnum; + + hlog(LOG_DEBUG, "Opening listener %d '%s': %s", lc->id, lc->name, l->addr_s); + + if (lc->ai->ai_socktype == SOCK_DGRAM && + lc->ai->ai_protocol == IPPROTO_UDP) { + /* UDP listenting is not quite same as TCP listening.. */ + i = open_udp_listener(l, lc->ai); + } else { + /* TCP listenting... */ + i = open_tcp_listener(l, lc->ai); + } + + if (i < 0) { + hlog(LOG_DEBUG, "... failed"); + listener_free(l); + return -1; + } + + hlog(LOG_DEBUG, "... ok, bound"); + + /* Copy access lists */ + if (lc->acl) + l->acl = acl_dup(lc->acl); + + /* Copy filter definitions */ + for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) { + if (i < (sizeof(lc->filters)/sizeof(lc->filters[0]))) + l->filters[i] = (lc->filters[i]) ? hstrdup(lc->filters[i]) : NULL; + else + l->filters[i] = NULL; + } + + hlog(LOG_DEBUG, "... adding %s to listened sockets", l->addr_s); + // put (first) in the list of listening sockets + l->next = listen_list; + l->prevp = &listen_list; + if (listen_list) + listen_list->prevp = &l->next; + listen_list = l; + + return 0; +} + +struct listen_t *find_listener_id(int id) +{ + struct listen_t *l = listen_list; + + while (l) { + if (l->id == id) + return l; + l = l->next; + } + + return NULL; +} + + +static int open_missing_listeners(void) { struct listen_config_t *lc; - struct listen_t *l; - int opened = 0, i; + int opened = 0; for (lc = listen_config; (lc); lc = lc->next) { - l = listener_alloc(); - l->hidden = lc->hidden; - l->corepeer = lc->corepeer; - l->clientflags = lc->client_flags; - l->clients_max = lc->clients_max; - - l->portaccount = port_accounter_alloc(); - - /* Pick first of the AIs for this listen definition */ - - l->addr_s = strsockaddr( lc->ai->ai_addr, lc->ai->ai_addrlen ); - l->name = hstrdup(lc->name); - l->portnum = lc->portnum; - - if (lc->ai->ai_socktype == SOCK_DGRAM && - lc->ai->ai_protocol == IPPROTO_UDP) { - /* UDP listenting is not quite same as TCP listening.. */ - i = open_udp_listener(l, lc->ai); - } else { - /* TCP listenting... */ - i = open_tcp_listener(l, lc->ai); - } - - if (i >= 0) { - opened++; - hlog(LOG_DEBUG, "... ok, bound"); - } else { - hlog(LOG_DEBUG, "... failed"); - listener_free(l); + if (find_listener_id(lc->id)) { + hlog(LOG_DEBUG, "open_missing_listeners: already listening %d '%s': %s:%d", lc->id, lc->name, lc->host, lc->portnum); continue; } - /* Copy access lists */ - if (lc->acl) - l->acl = acl_dup(lc->acl); - - /* Copy filter definitions */ - for (i = 0; i < (sizeof(l->filters)/sizeof(l->filters[0])); ++i) { - if (i < (sizeof(lc->filters)/sizeof(lc->filters[0]))) - l->filters[i] = (lc->filters[i]) ? hstrdup(lc->filters[i]) : NULL; - else - l->filters[i] = NULL; - } - - hlog(LOG_DEBUG, "... adding %s to listened sockets", l->addr_s); - // put (first) in the list of listening sockets - l->next = listen_list; - l->prevp = &listen_list; - if (listen_list) - listen_list->prevp = &l->next; - listen_list = l; + if (open_listener(lc) == 0) + opened++; } return opened; } +static int close_removed_listeners(void) +{ + int closed = 0; + + hlog(LOG_DEBUG, "Closing removed listening sockets...."); + struct listen_t *l, *next; + next = listen_list; + while (next) { + l = next; + next = l->next; + + struct listen_config_t *lc = find_listen_config_id(listen_config, l->id); + if (!lc) { + hlog(LOG_INFO, "Listener %d (%s) no longer in configuration, closing port....", + l->id, l->addr_s); + listener_free(l); + closed++; + } + } + + return closed; +} + static void close_listeners(void) { if (!listen_list) return; hlog(LOG_DEBUG, "Closing listening sockets...."); - struct listen_t *l; - while (listen_list) { - l = listen_list; - listen_list = listen_list->next; - - listener_free(l); - } + while (listen_list) + listener_free(listen_list); } /* @@ -344,6 +399,8 @@ static void peerip_clients_config(void) c->connect_time = tick; c->last_read = tick; /* not simulated time */ + inbound_connects_account(3, c->udpclient->portaccount); /* "3" = udp, not listening.. */ + /* convert client address to string */ s = strsockaddr( &c->udpaddr.sa, c->udpaddrlen ); @@ -493,7 +550,7 @@ static void accept_udp_recv(struct listen_t *l) addrlen = sizeof(addr); while ((i = recvfrom( l->udp->fd, buf, sizeof(buf)-1, MSG_DONTWAIT|MSG_TRUNC, (struct sockaddr *)&addr, &addrlen )) >= 0) { - if (!(l->clientflags & CLFLAGS_UDPSUBMIT)) { + if (!(l->client_flags & CLFLAGS_UDPSUBMIT)) { hlog(LOG_DEBUG, "accept thread discarded an UDP packet on a listening socket"); continue; } @@ -610,7 +667,7 @@ static void do_accept(struct listen_t *l) c->portnum = l->portnum; c->hidden = l->hidden; c->state = CSTATE_LOGIN; - c->flags = l->clientflags; + c->flags = l->client_flags; /* use the default login handler */ c->handler = &login_handler; c->udpclient = client_udp_find(udpclients, sa.sa.sa_family, l->portnum); @@ -777,7 +834,9 @@ void accept_thread(void *asdf) sigset_t sigs_to_block; int e, n; struct pollfd *acceptpfd = NULL; + struct listen_t **acceptpl = NULL; int listen_n = 0; + int poll_n = 0; struct listen_t *l; pthreads_profiling_reset("accept"); @@ -814,17 +873,33 @@ void accept_thread(void *asdf) while (!accept_shutting_down) { if (accept_reconfiguring) { accept_reconfiguring = 0; - if (listen_n) - close_listeners(); + listen_n -= close_removed_listeners(); /* start listening on the sockets */ - if ((listen_n = open_listeners()) <= 0) { + listen_n += open_missing_listeners(); + + if (listen_n < 1) { hlog(LOG_CRIT, "Failed to listen on any ports."); exit(2); } - hlog(LOG_DEBUG, "Generating polling list..."); - acceptpfd = hmalloc(listen_n * sizeof(*acceptpfd)); + poll_n = 0; + for (l = listen_list; (l); l = l->next) + if (!l->corepeer) + poll_n++; + + hlog(LOG_DEBUG, "Generating polling list for %d/%d listeners...", poll_n, listen_n); + + /* array of FDs for poll() */ + if (acceptpfd) + hfree(acceptpfd); + acceptpfd = hmalloc(poll_n * sizeof(*acceptpfd)); + + /* array of listeners */ + if (acceptpl) + hfree(acceptpl); + acceptpl = hmalloc(poll_n * sizeof(*acceptpl)); + n = 0; for (l = listen_list; (l); l = l->next) { /* The accept thread does not poll() UDP sockets for core peers. @@ -832,7 +907,6 @@ void accept_thread(void *asdf) */ if (l->corepeer) { hlog(LOG_DEBUG, "... %d: fd %d (%s) - not polled, is corepeer", n, (l->udp) ? l->udp->fd : l->fd, l->addr_s); - listen_n--; continue; } @@ -847,6 +921,7 @@ void accept_thread(void *asdf) hlog(LOG_DEBUG, "... %d: fd %d (%s)", n, fd, l->addr_s); acceptpfd[n].fd = fd; acceptpfd[n].events = POLLIN|POLLPRI|POLLERR|POLLHUP; + acceptpl[n] = l; n++; } hlog(LOG_INFO, "Accept thread ready."); @@ -875,7 +950,7 @@ void accept_thread(void *asdf) } /* check for new connections */ - e = poll(acceptpfd, listen_n, 200); + e = poll(acceptpfd, poll_n, 200); if (e == 0) continue; if (e < 0) { @@ -886,8 +961,8 @@ void accept_thread(void *asdf) } /* now, which socket was that on? */ - l = listen_list; - for (n = 0; n < listen_n; n++) { + for (n = 0; n < poll_n; n++) { + l = acceptpl[n]; if (!(l) || (l->udp ? l->udp->fd : l->fd) != acceptpfd[n].fd) { hlog(LOG_CRIT, "accept_thread: polling list and listener list do mot match!"); exit(1); @@ -898,7 +973,6 @@ void accept_thread(void *asdf) else do_accept(l); /* accept a single connection */ } - l = l->next; } } @@ -909,6 +983,9 @@ void accept_thread(void *asdf) http_shutting_down = 1; workers_stop(1); hfree(acceptpfd); + hfree(acceptpl); + acceptpfd = NULL; + acceptpl = NULL; /* free up the pseudo-client */ client_free(udp_pseudoclient); @@ -959,8 +1036,10 @@ int accept_listener_status(cJSON *listeners, cJSON *totals) json_add_rxerrs(jl, "rx_errs", l->portaccount->rxerrs); cJSON_AddItemToArray(listeners, jl); - total_clients += l->portaccount->gauge; - total_connects += l->portaccount->counter; + if (!(l->udp)) { + total_clients += l->portaccount->gauge; + total_connects += l->portaccount->counter; + } /* total_rxbytes += l->portaccount->rxbytes; total_txbytes += l->portaccount->txbytes; diff --git a/src/config.c b/src/config.c index 0b87416..49b19b7 100644 --- a/src/config.c +++ b/src/config.c @@ -220,6 +220,7 @@ void free_listen_config(struct listen_config_t **lc) *lc = this->next; hfree((void*)this->name); hfree((void*)this->host); + hfree((void*)this->proto); for (i = 0; i < (sizeof(this->filters)/sizeof(this->filters[0])); ++i) if (this->filters[i]) hfree((void*)this->filters[i]); @@ -230,6 +231,33 @@ void free_listen_config(struct listen_config_t **lc) } } +struct listen_config_t *find_listen_config_id(struct listen_config_t *l, int id) +{ + while (l) { + if (l->id == id) + return l; + l = l->next; + } + + return NULL; +} + +static struct listen_config_t *find_listen_config(struct listen_config_t *l, + const char *proto, const char *host, int portnum) +{ + while (l) { + if (l->portnum == portnum + && strcmp(l->host, host) == 0 + && strcmp(l->proto, proto) == 0) + return l; + + l = l->next; + } + + return NULL; +} + + /* * Free a peer-ip config tree */ @@ -478,9 +506,11 @@ int do_peergroup(struct peerip_config_t **lq, int argc, char **argv) /* Configure a listener */ li = hmalloc(sizeof(*li)); memset(li, 0, sizeof(*li)); + li->id = random(); li->corepeer = 1; li->name = hstrdup(argv[1]); li->host = fullhost; + li->proto = hstrdup("udp"); li->portnum = localport; li->client_flags = 0; li->clients_max = 1; @@ -770,6 +800,7 @@ int do_listen(struct listen_config_t **lq, int argc, char **argv) /* default parameters for a listener */ int clflags = CLFLAGS_INPORT; int clients_max = 200; + char *proto; memset(&req, 0, sizeof(req)); req.ai_family = 0; @@ -803,23 +834,24 @@ int do_listen(struct listen_config_t **lq, int argc, char **argv) hlog(LOG_ERR, "Listen: unknown port type: %s", argv[2]); } - if (strcasecmp(argv[3], "tcp") == 0) { + proto = argv[3]; + if (strcasecmp(proto, "tcp") == 0) { /* well, do nothing for now. */ - } else if (strcasecmp(argv[3], "udp") == 0) { + } else if (strcasecmp(proto, "udp") == 0) { req.ai_socktype = SOCK_DGRAM; req.ai_protocol = IPPROTO_UDP; #if defined(SOCK_SEQPACKET) && defined(IPPROTO_SCTP) - } else if (strcasecmp(argv[3], "sctp") == 0) { + } else if (strcasecmp(proto, "sctp") == 0) { req.ai_socktype = SOCK_SEQPACKET; req.ai_protocol = IPPROTO_SCTP; #endif } else { - hlog(LOG_ERR, "Listen: Unsupported protocol '%s'\n", argv[3]); + hlog(LOG_ERR, "Listen: Unsupported protocol '%s'\n", proto); return -2; } if ((clflags & CLFLAGS_UDPSUBMIT) && req.ai_protocol != IPPROTO_UDP) { - hlog(LOG_ERR, "Listen: Invalid protocol '%s' for udpsubmit port - only UDP is supported\n", argv[3]); + hlog(LOG_ERR, "Listen: Invalid protocol '%s' for udpsubmit port - only UDP is supported\n", proto); return -2; } @@ -839,6 +871,7 @@ int do_listen(struct listen_config_t **lq, int argc, char **argv) memset(l, 0, sizeof(*l)); l->name = hstrdup(argv[1]); l->host = hstrdup(argv[4]); + l->proto = hstrdup(proto); l->portnum = port; l->client_flags = clflags; l->clients_max = clients_max; @@ -928,6 +961,19 @@ int do_listen(struct listen_config_t **lq, int argc, char **argv) if (port < 1024) listen_low_ports = 1; + /* find existing config for same proto-host-port combination */ + struct listen_config_t *old_l; + old_l = find_listen_config(listen_config, l->proto, l->host, l->portnum); + if (old_l) { + /* this is an old config... see if it changed in a way which + * would require listener reconfiguration + */ + l->id = old_l->id; + } else { + /* new config, assign new id */ + l->id = random(); + } + /* put in the list */ l->next = *lq; if (l->next) @@ -1045,7 +1091,13 @@ int read_config(void) hlog(LOG_CRIT, "Config: rundir not defined."); failed = 1; } + } else { + if (new_rundir) { + hfree(new_rundir); + new_rundir = NULL; + } } + if (!log_dir) { hlog(LOG_CRIT, "Config: logdir not defined."); failed = 1; diff --git a/src/config.h b/src/config.h index debdd32..2e8ac00 100644 --- a/src/config.h +++ b/src/config.h @@ -86,19 +86,25 @@ struct listen_config_t { struct listen_config_t *next; struct listen_config_t **prevp; /* pointer to the *next pointer in the previous node */ - const char *name; /* name of socket */ - const char *host; /* hostname or dotted-quad IP to bind the UDP socket to, default INADDR_ANY */ + int id; /* id of listener config */ + + const char *proto; /* protocol: tcp / udp / sctp */ + const char *name; /* name of socket */ + const char *host; /* hostname or dotted-quad IP to bind the UDP socket to, default INADDR_ANY */ int portnum; int clients_max; - int corepeer; + int corepeer; /* special listener for corepeer packets */ int hidden; struct addrinfo *ai; struct acl_t *acl; const char *filters[LISTEN_MAX_FILTERS]; /* up to 10 filters, NULL when not defined */ - - int client_flags; + + int client_flags; /* cflags set for clients of this socket */ + + /* reconfiguration support flags */ + int changed; /* configuration has changed */ }; struct peerip_config_t { @@ -177,6 +183,7 @@ extern int parse_args_noshell(char *argv[],char *cmd); extern void sanitize_ascii_string(char *s); extern void free_uplink_config(struct uplink_config_t **lc); +extern struct listen_config_t *find_listen_config_id(struct listen_config_t *l, int id); extern int read_config(void); extern void free_config(void); diff --git a/src/login.c b/src/login.c index 1f36d9e..a562828 100644 --- a/src/login.c +++ b/src/login.c @@ -241,6 +241,8 @@ int login_handler(struct worker_t *self, struct client_t *c, int l4proto, char * c->udpaddr.si6.sin6_port = htons(c->udp_port); c->udpaddrlen = sizeof(c->udpaddr.si6); } + + inbound_connects_account(3, c->udpclient->portaccount); /* "3" = udp, not listening.. */ } else { /* Sorry, no UDP service for this port.. */ hlog(LOG_DEBUG, "%s/%s: Requested UDP on client port with no UDP configured", c->addr_rem, username); diff --git a/src/worker.c b/src/worker.c index 6e47676..9bf192d 100644 --- a/src/worker.c +++ b/src/worker.c @@ -103,7 +103,7 @@ struct portaccount_t *port_accounter_alloc(void) p->refcount = 1; pthread_mutex_init( & p->mutex, NULL ); - // hlog(LOG_DEBUG, "new port_accounter %p", p); + hlog(LOG_DEBUG, "new port_accounter %p", p); return p; } @@ -135,6 +135,8 @@ static void port_accounter_add(struct portaccount_t *p) hlog(LOG_ERR, "port_accounter_add: could not lock portaccount: %s", strerror(i)); return; } + + hlog(LOG_DEBUG, "port_accounter_add %p", p); ++ p->refcount; ++ p->counter; @@ -169,7 +171,7 @@ void port_accounter_drop(struct portaccount_t *p) return; } - // hlog(LOG_DEBUG, "port_accounter_drop(%p) refcount=%d", p, r); + hlog(LOG_DEBUG, "port_accounter_drop(%p) refcount=%d", p, r); if (r == 0) { /* Last reference is being destroyed */ @@ -775,12 +777,18 @@ void client_close(struct worker_t *self, struct client_t *c, int errnum) */ if (c->flags & CLFLAGS_UPLINKPORT && c->state != CSTATE_COREPEER) uplink_close(c, errnum); - else { - /* Else if it is an inbound connection, handle their - * population accounting... - */ + + if (c->portaccount) { + /* If port accounting is done, handle population accounting... */ + hlog(LOG_DEBUG, "client_close dropping inbound_connects_account %p", c->portaccount); inbound_connects_account(0, c->portaccount); c->portaccount = NULL; + } else { + hlog(LOG_DEBUG, "client_close: has no portaccount"); + } + + if (c->udp_port && c->udpclient->portaccount) { + inbound_connects_account(2, c->udpclient->portaccount); /* udp client count goes down */ } /* free it up */ @@ -1329,6 +1337,8 @@ static void collect_new_clients(struct worker_t *self) /* corepeer reconfig flag */ hlog(LOG_DEBUG, "collect_new_clients(worker %d): closing all existing peergroup peers", self->id); corepeer_close_all(self); + client_free(c); + i--; /* don't count it in */ continue; }