Moved sctp code to sctp.c to make worker.c/accept.c smaller
This commit is contained in:
parent
b3ba956a89
commit
e8e2d2b096
|
|
@ -143,7 +143,7 @@ OBJS = aprsc.o accept.o worker.o errno.o \
|
|||
keyhash.o \
|
||||
filter.o cellmalloc.o historydb.o \
|
||||
counterdata.o status.o cJSON.o \
|
||||
http.o ssl.o version.o \
|
||||
http.o ssl.o sctp.o version.o \
|
||||
@LIBOBJS@
|
||||
|
||||
clean:
|
||||
|
|
|
|||
|
|
@ -202,8 +202,7 @@ static int open_tcp_listener(struct listen_t *l, const struct addrinfo *ai, char
|
|||
}
|
||||
|
||||
#ifdef USE_SCTP
|
||||
|
||||
static int set_sctp_params(struct listen_t *l)
|
||||
static int sctp_set_listen_params(struct listen_t *l)
|
||||
{
|
||||
struct sctp_event_subscribe subscribe;
|
||||
|
||||
|
|
@ -219,7 +218,6 @@ static int set_sctp_params(struct listen_t *l)
|
|||
|
||||
return l->fd;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/*
|
||||
|
|
@ -328,7 +326,7 @@ static int open_listener(struct listen_config_t *lc)
|
|||
lc->ai->ai_protocol == IPPROTO_SCTP) {
|
||||
i = open_tcp_listener(l, lc->ai, "SCTP");
|
||||
if (i >= 0)
|
||||
i = set_sctp_params(l);
|
||||
i = sctp_set_listen_params(l);
|
||||
#endif
|
||||
} else {
|
||||
hlog(LOG_ERR, "Unsupported listener protocol for '%s'", l->name);
|
||||
|
|
|
|||
|
|
@ -0,0 +1,179 @@
|
|||
|
||||
#include "config.h"
|
||||
#include "hlog.h"
|
||||
#include "sctp.h"
|
||||
#include "worker.h"
|
||||
|
||||
/*
|
||||
* Code to support SCTP connections
|
||||
*/
|
||||
|
||||
#ifdef USE_SCTP
|
||||
|
||||
#include <netinet/sctp.h>
|
||||
|
||||
/*
|
||||
* SCTP notification received
|
||||
*/
|
||||
|
||||
static int sctp_rx_assoc_change(struct client_t *c, union sctp_notification *sn)
|
||||
{
|
||||
switch (sn->sn_assoc_change.sac_state) {
|
||||
case SCTP_COMM_UP:
|
||||
hlog(LOG_DEBUG, "Received SCTP_COMM_UP");
|
||||
break;
|
||||
case SCTP_COMM_LOST:
|
||||
hlog(LOG_DEBUG, "Received SCTP_COMM_LOST");
|
||||
break;
|
||||
case SCTP_RESTART:
|
||||
hlog(LOG_DEBUG, "Received SCTP_RESTART");
|
||||
break;
|
||||
case SCTP_SHUTDOWN_COMP:
|
||||
hlog(LOG_DEBUG, "Received SCTP_SHUTDOWN_COMP");
|
||||
break;
|
||||
case SCTP_CANT_STR_ASSOC:
|
||||
hlog(LOG_DEBUG, "Received SCTP_CANT_STR_ASSOC");
|
||||
break;
|
||||
default:
|
||||
hlog(LOG_DEBUG, "Received assoc_change %d", sn->sn_assoc_change.sac_state);
|
||||
break;
|
||||
}
|
||||
|
||||
if (sn->sn_assoc_change.sac_state == SCTP_COMM_UP)
|
||||
return sn->sn_assoc_change.sac_assoc_id;
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
static int sctp_rx_notification(struct client_t *c, struct msghdr *m)
|
||||
{
|
||||
union sctp_notification *sn;
|
||||
|
||||
sn = (union sctp_notification *)m->msg_iov->iov_base;
|
||||
|
||||
switch(sn->sn_header.sn_type) {
|
||||
case SCTP_SHUTDOWN_EVENT: {
|
||||
struct sctp_shutdown_event *shut;
|
||||
shut = (struct sctp_shutdown_event *)m->msg_iov->iov_base;
|
||||
hlog(LOG_INFO, "SCTP shutdown on assoc id %d", shut->sse_assoc_id);
|
||||
break;
|
||||
}
|
||||
case SCTP_ASSOC_CHANGE:
|
||||
return sctp_rx_assoc_change(c, sn);
|
||||
};
|
||||
|
||||
hlog(LOG_ERR, "sctp_rx_notification: Received unexpected notification: %d", sn->sn_header.sn_type);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
typedef union {
|
||||
struct sctp_initmsg init;
|
||||
struct sctp_sndrcvinfo sndrcvinfo;
|
||||
} _sctp_cmsg_data_t;
|
||||
|
||||
typedef union {
|
||||
struct sockaddr_storage ss;
|
||||
struct sockaddr_in v4;
|
||||
struct sockaddr_in6 v6;
|
||||
struct sockaddr sa;
|
||||
} sockaddr_storage_t;
|
||||
|
||||
/*
|
||||
* handle a readable event on SCTP socket
|
||||
*/
|
||||
|
||||
|
||||
int sctp_readable(struct worker_t *self, struct client_t *c)
|
||||
{
|
||||
int e;
|
||||
struct msghdr inmsg;
|
||||
char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
|
||||
sockaddr_storage_t msgname;
|
||||
struct iovec iov;
|
||||
|
||||
/* space to receive data */
|
||||
c->ibuf_end = 0;
|
||||
iov.iov_base = c->ibuf;
|
||||
iov.iov_len = c->ibuf_size - 3;
|
||||
inmsg.msg_iov = &iov;
|
||||
inmsg.msg_iovlen = 1;
|
||||
/* or control messages */
|
||||
inmsg.msg_control = incmsg;
|
||||
inmsg.msg_controllen = sizeof(incmsg);
|
||||
inmsg.msg_name = &msgname;
|
||||
inmsg.msg_namelen = sizeof(msgname);
|
||||
|
||||
e = recvmsg(c->fd, &inmsg, MSG_WAITALL);
|
||||
if (e < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
hlog(LOG_DEBUG, "sctp_readable: EAGAIN");
|
||||
return 0;
|
||||
}
|
||||
|
||||
hlog(LOG_INFO, "sctp_readable: recvmsg returned %d: %s", e, strerror(errno));
|
||||
|
||||
client_close(self, c, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (e == 0) {
|
||||
hlog( LOG_DEBUG, "sctp_readable: EOF from socket fd %d (%s @ %s)",
|
||||
c->fd, c->addr_rem, c->addr_loc );
|
||||
client_close(self, c, CLIERR_EOF);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (inmsg.msg_flags & MSG_NOTIFICATION) {
|
||||
hlog(LOG_DEBUG, "sctp_readable: got MSG_NOTIFICATION");
|
||||
int associd = sctp_rx_notification(c, &inmsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
//hlog_packet(LOG_DEBUG, iov.iov_base, e, "sctp_readable: got data: ");
|
||||
c->ibuf[e++] = '\r';
|
||||
c->ibuf[e++] = '\n';
|
||||
|
||||
return client_postread(self, c, e);
|
||||
}
|
||||
|
||||
/*
|
||||
* SCTP socket is now writeable, but we really don't do SCTP buffering yet...
|
||||
*/
|
||||
|
||||
int sctp_writeable(struct worker_t *self, struct client_t *c)
|
||||
{
|
||||
hlog(LOG_INFO, "sctp_writeable: SCTP tx buffering not implemented, closing socket");
|
||||
client_close(self, c, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Write data to an SCTP client
|
||||
*/
|
||||
|
||||
int sctp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
|
||||
{
|
||||
//hlog_packet(LOG_DEBUG, p, len, "client_write_sctp %d bytes: ", len);
|
||||
|
||||
if (len == 0)
|
||||
return 0;
|
||||
|
||||
int i = send(c->fd, p, len-2, 0);
|
||||
|
||||
if (i < 0) {
|
||||
hlog(LOG_ERR, "SCTP transmit error to fd %d / %s: %s",
|
||||
c->fd, c->addr_rem, strerror(errno));
|
||||
} else if (i != len -2) {
|
||||
hlog(LOG_ERR, "SCTP transmit incomplete to fd %d / %s: wrote %d of %d bytes, errno: %s",
|
||||
c->fd, c->addr_rem, i, len-2, strerror(errno));
|
||||
} else {
|
||||
//hlog(LOG_DEBUG, "SCTP transmit ok to %s: %d bytes", c->addr_rem, i);
|
||||
c->obuf_wtime = tick;
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
|
@ -0,0 +1,11 @@
|
|||
|
||||
#ifndef SCTP_H
|
||||
#define SCTP_H
|
||||
|
||||
#ifdef USE_SCTP
|
||||
extern int sctp_readable(struct worker_t *self, struct client_t *c);
|
||||
extern int sctp_writeable(struct worker_t *self, struct client_t *c);
|
||||
extern int sctp_client_write(struct worker_t *self, struct client_t *c, char *p, int len);
|
||||
#endif
|
||||
|
||||
#endif
|
||||
206
src/worker.c
206
src/worker.c
|
|
@ -35,10 +35,7 @@
|
|||
#include "cellmalloc.h"
|
||||
#include "version.h"
|
||||
#include "status.h"
|
||||
|
||||
#ifdef USE_SCTP
|
||||
#include <netinet/sctp.h>
|
||||
#endif
|
||||
#include "sctp.h"
|
||||
|
||||
|
||||
time_t now; /* current time, updated by the main thread, MAY be spun around by NTP */
|
||||
|
|
@ -826,31 +823,6 @@ void client_close(struct worker_t *self, struct client_t *c, int errnum)
|
|||
self->client_count--;
|
||||
}
|
||||
|
||||
#ifdef USE_SCTP
|
||||
int client_write_sctp(struct worker_t *self, struct client_t *c, char *p, int len)
|
||||
{
|
||||
//hlog_packet(LOG_DEBUG, p, len, "client_write_sctp %d bytes: ", len);
|
||||
|
||||
if (len == 0)
|
||||
return 0;
|
||||
|
||||
int i = send(c->fd, p, len-2, 0);
|
||||
|
||||
if (i < 0) {
|
||||
hlog(LOG_ERR, "SCTP transmit error to fd %d / %s: %s",
|
||||
c->fd, c->addr_rem, strerror(errno));
|
||||
} else if (i != len -2) {
|
||||
hlog(LOG_ERR, "SCTP transmit incomplete to fd %d / %s: wrote %d of %d bytes, errno: %s",
|
||||
c->fd, c->addr_rem, i, len-2, strerror(errno));
|
||||
} else {
|
||||
//hlog(LOG_DEBUG, "SCTP transmit ok to %s: %d bytes", c->addr_rem, i);
|
||||
c->obuf_wtime = tick;
|
||||
}
|
||||
|
||||
return i;
|
||||
}
|
||||
#endif
|
||||
|
||||
/*
|
||||
* write data to a client (well, at least put it in the output buffer)
|
||||
* (this is also used with len=0 to flush current buffer)
|
||||
|
|
@ -906,7 +878,7 @@ int client_write(struct worker_t *self, struct client_t *c, char *p, int len)
|
|||
|
||||
#ifdef USE_SCTP
|
||||
if (c->ai_protocol == IPPROTO_SCTP) {
|
||||
return client_write_sctp(self, c, p, len);
|
||||
return sctp_client_write(self, c, p, len);
|
||||
}
|
||||
#endif
|
||||
|
||||
|
|
@ -1190,143 +1162,11 @@ int client_postread(struct worker_t *self, struct client_t *c, int r)
|
|||
}
|
||||
|
||||
|
||||
/*
|
||||
* handle a readable event on SCTP socket
|
||||
*/
|
||||
|
||||
#ifdef USE_SCTP
|
||||
|
||||
/*
|
||||
* SCTP notification received
|
||||
*/
|
||||
|
||||
static int sctp_rx_assoc_change(struct client_t *c, union sctp_notification *sn)
|
||||
{
|
||||
switch (sn->sn_assoc_change.sac_state) {
|
||||
case SCTP_COMM_UP:
|
||||
hlog(LOG_DEBUG, "Received SCTP_COMM_UP");
|
||||
break;
|
||||
case SCTP_COMM_LOST:
|
||||
hlog(LOG_DEBUG, "Received SCTP_COMM_LOST");
|
||||
break;
|
||||
case SCTP_RESTART:
|
||||
hlog(LOG_DEBUG, "Received SCTP_RESTART");
|
||||
break;
|
||||
case SCTP_SHUTDOWN_COMP:
|
||||
hlog(LOG_DEBUG, "Received SCTP_SHUTDOWN_COMP");
|
||||
break;
|
||||
case SCTP_CANT_STR_ASSOC:
|
||||
hlog(LOG_DEBUG, "Received SCTP_CANT_STR_ASSOC");
|
||||
break;
|
||||
default:
|
||||
hlog(LOG_DEBUG, "Received assoc_change %d", sn->sn_assoc_change.sac_state);
|
||||
break;
|
||||
}
|
||||
|
||||
if (sn->sn_assoc_change.sac_state == SCTP_COMM_UP)
|
||||
return sn->sn_assoc_change.sac_assoc_id;
|
||||
|
||||
return 0;
|
||||
|
||||
}
|
||||
|
||||
static int sctp_rx_notification(struct client_t *c, struct msghdr *m)
|
||||
{
|
||||
union sctp_notification *sn;
|
||||
|
||||
sn = (union sctp_notification *)m->msg_iov->iov_base;
|
||||
|
||||
switch(sn->sn_header.sn_type) {
|
||||
case SCTP_SHUTDOWN_EVENT: {
|
||||
struct sctp_shutdown_event *shut;
|
||||
shut = (struct sctp_shutdown_event *)m->msg_iov->iov_base;
|
||||
hlog(LOG_INFO, "SCTP shutdown on assoc id %d", shut->sse_assoc_id);
|
||||
break;
|
||||
}
|
||||
case SCTP_ASSOC_CHANGE:
|
||||
return sctp_rx_assoc_change(c, sn);
|
||||
};
|
||||
|
||||
hlog(LOG_ERR, "sctp_rx_notification: Received unexpected notification: %d", sn->sn_header.sn_type);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/*
|
||||
* Receive something on an SCTP socket
|
||||
*/
|
||||
|
||||
|
||||
typedef union {
|
||||
struct sctp_initmsg init;
|
||||
struct sctp_sndrcvinfo sndrcvinfo;
|
||||
} _sctp_cmsg_data_t;
|
||||
|
||||
typedef union {
|
||||
struct sockaddr_storage ss;
|
||||
struct sockaddr_in v4;
|
||||
struct sockaddr_in6 v6;
|
||||
struct sockaddr sa;
|
||||
} sockaddr_storage_t;
|
||||
|
||||
|
||||
static int sctp_readable(struct worker_t *self, struct client_t *c)
|
||||
{
|
||||
int e;
|
||||
struct msghdr inmsg;
|
||||
char incmsg[CMSG_SPACE(sizeof(_sctp_cmsg_data_t))];
|
||||
sockaddr_storage_t msgname;
|
||||
struct iovec iov;
|
||||
|
||||
/* space to receive data */
|
||||
c->ibuf_end = 0;
|
||||
iov.iov_base = c->ibuf;
|
||||
iov.iov_len = c->ibuf_size - 3;
|
||||
inmsg.msg_iov = &iov;
|
||||
inmsg.msg_iovlen = 1;
|
||||
/* or control messages */
|
||||
inmsg.msg_control = incmsg;
|
||||
inmsg.msg_controllen = sizeof(incmsg);
|
||||
inmsg.msg_name = &msgname;
|
||||
inmsg.msg_namelen = sizeof(msgname);
|
||||
|
||||
e = recvmsg(c->fd, &inmsg, MSG_WAITALL);
|
||||
if (e < 0) {
|
||||
if (errno == EAGAIN) {
|
||||
hlog(LOG_DEBUG, "sctp_readable: EAGAIN");
|
||||
return 0;
|
||||
}
|
||||
|
||||
hlog(LOG_INFO, "sctp_readable: recvmsg returned %d: %s", e, strerror(errno));
|
||||
|
||||
client_close(self, c, errno);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (e == 0) {
|
||||
hlog( LOG_DEBUG, "sctp_readable: EOF from socket fd %d (%s @ %s)",
|
||||
c->fd, c->addr_rem, c->addr_loc );
|
||||
client_close(self, c, CLIERR_EOF);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (inmsg.msg_flags & MSG_NOTIFICATION) {
|
||||
hlog(LOG_DEBUG, "sctp_readable: got MSG_NOTIFICATION");
|
||||
int associd = sctp_rx_notification(c, &inmsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
//hlog_packet(LOG_DEBUG, iov.iov_base, e, "sctp_readable: got data: ");
|
||||
c->ibuf[e++] = '\r';
|
||||
c->ibuf[e++] = '\n';
|
||||
|
||||
return client_postread(self, c, e);
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
/*
|
||||
* handle an event on an fd
|
||||
*
|
||||
* TODO: replace the conditional jumping with a 'readable' handler pointer
|
||||
*/
|
||||
|
||||
static int handle_client_readable(struct worker_t *self, struct client_t *c)
|
||||
|
|
@ -1344,14 +1184,14 @@ static int handle_client_readable(struct worker_t *self, struct client_t *c)
|
|||
return -1;
|
||||
}
|
||||
|
||||
#ifdef USE_SSL
|
||||
if (c->ssl_con)
|
||||
return ssl_readable(self, c);
|
||||
#endif
|
||||
#ifdef USE_SCTP
|
||||
if (c->ai_protocol == IPPROTO_SCTP)
|
||||
return sctp_readable(self, c);
|
||||
#endif
|
||||
#ifdef USE_SSL
|
||||
if (c->ssl_con)
|
||||
return ssl_readable(self, c);
|
||||
#endif
|
||||
|
||||
r = read(c->fd, c->ibuf + c->ibuf_end, c->ibuf_size - c->ibuf_end - 1);
|
||||
|
||||
|
|
@ -1377,20 +1217,22 @@ static int handle_client_readable(struct worker_t *self, struct client_t *c)
|
|||
return client_postread(self, c, r);
|
||||
}
|
||||
|
||||
/*
|
||||
* client fd is now writeable
|
||||
*
|
||||
* TODO: replace ssl/sctp conditional calls with a 'writable' handler pointer
|
||||
*/
|
||||
|
||||
static int handle_client_writeable(struct worker_t *self, struct client_t *c)
|
||||
{
|
||||
int r;
|
||||
|
||||
if (c->obuf_start == c->obuf_end) {
|
||||
/* there is nothing to write any more */
|
||||
//hlog(LOG_DEBUG, "writable: nothing to write on fd %d (%s)", c->fd, c->addr_rem);
|
||||
xpoll_outgoing(&self->xp, c->xfd, 0);
|
||||
c->obuf_start = c->obuf_end = 0;
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* TODO: call client_try_write */
|
||||
|
||||
#ifdef USE_SCTP
|
||||
if (c->ai_protocol == IPPROTO_SCTP)
|
||||
return sctp_writeable(self, c);
|
||||
#endif
|
||||
#ifdef USE_SSL
|
||||
if (c->ssl_con)
|
||||
return ssl_writeable(self, c);
|
||||
|
|
@ -1428,8 +1270,16 @@ static int handle_client_event(struct xpoll_t *xp, struct xpoll_fd_t *xfd)
|
|||
|
||||
if (xfd->result & XP_OUT) { /* priorize doing output */
|
||||
/* ah, the client is writable */
|
||||
if (handle_client_writeable(self, c) < 0)
|
||||
return 0;
|
||||
|
||||
if (c->obuf_start == c->obuf_end) {
|
||||
/* there is nothing to write any more */
|
||||
//hlog(LOG_DEBUG, "client writable: nothing to write on fd %d (%s)", c->fd, c->addr_rem);
|
||||
xpoll_outgoing(&self->xp, c->xfd, 0);
|
||||
c->obuf_start = c->obuf_end = 0;
|
||||
} else {
|
||||
if (handle_client_writeable(self, c) < 0)
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
if (xfd->result & XP_IN) { /* .. before doing input */
|
||||
|
|
|
|||
Loading…
Reference in New Issue