IS2 UDP corepeer: make it pass packets

This commit is contained in:
Heikki Hannikainen 2017-04-15 19:57:19 +03:00
parent 3da3454fd5
commit 8dd4b107de
4 changed files with 161 additions and 21 deletions

View File

@ -63,6 +63,26 @@ static int is2_write_message(struct worker_t *self, struct client_t *c, IS2Messa
return r;
}
/*
* Write an UDP message to a client, return result from c->write
*/
static int is2_corepeer_write_message(struct worker_t *self, struct client_t *c, IS2Message *m)
{
/* Could optimize by writing directly on client obuf...
* if it doesn't fit there, we're going to disconnect anyway.
*/
int len = is2_message__get_packed_size(m);
void *buf = is2_allocate_buffer(len);
is2_message__pack(m, buf + IS2_HEAD_LEN);
int r = udp_client_write(self, c, buf, len + IS2_HEAD_LEN + IS2_TAIL_LEN); // TODO: return value check!
hfree(buf);
hlog(LOG_DEBUG, "%s/%s: IS2 UDP: serialized length %d, frame %d, wrote %d", c->addr_rem, c->username, len, len + IS2_HEAD_LEN + IS2_TAIL_LEN, r);
return r;
}
/*
* Transmit a server signature to a new client
*/
@ -595,6 +615,28 @@ int is2_input_handler(struct worker_t *self, struct client_t *c, IS2Message *m)
return 0;
}
/*
* IS2 input handler, corepeer state
*/
int is2_input_handler_corepeer(struct worker_t *self, struct client_t *c, IS2Message *m)
{
switch (m->type) {
case IS2_MESSAGE__TYPE__KEEPALIVE_PING:
return is2_in_ping(self, c, m);
break;
case IS2_MESSAGE__TYPE__IS_PACKET:
return is2_in_packet(self, c, m);
break;
default:
hlog(LOG_WARNING, "%s/%s: IS2 UDP: unknown message type %d",
c->addr_rem, c->username, m->type);
break;
};
return 0;
}
/*
* Unpack a single message from the input buffer.
*/
@ -677,22 +719,57 @@ int is2_deframe_input(struct worker_t *self, struct client_t *c, int start_at)
return i;
}
int is2_corepeer_deframe_input(struct worker_t *self, struct client_t *c, char *ibuf, int len)
{
if (len < IS2_MINIMUM_FRAME_LEN) {
hlog_packet(LOG_DEBUG, ibuf, len, "%s/%s: IS2 UDP: Too short UDP frame (%d): ",
c->addr_rem, c->username, len);
return -1;
}
if (*ibuf != STX) {
hlog_packet(LOG_WARNING, ibuf, len, "%s/%s: IS2 UDP: Frame missing STX in beginning: ",
c->addr_rem, c->username);
return -1;
}
uint32_t *ip = (uint32_t *)ibuf;
uint32_t clen = ntohl(*ip) & 0xffffff;
if (clen < IS2_MINIMUM_FRAME_CONTENT_LEN) {
hlog_packet(LOG_WARNING, ibuf, len, "%s/%s: IS2 UDP: Too short frame content (%d): ",
c->addr_rem, c->username, clen);
return -1;
}
if (IS2_HEAD_LEN + clen + IS2_TAIL_LEN > len) {
hlog_packet(LOG_WARNING, ibuf, len, "%s/%s: IS2 UDP: Frame length points behind buffer end (%d+%d buflen %d): ", c->addr_rem, c->username, clen, IS2_HEAD_LEN + IS2_TAIL_LEN, len);
return -1;
}
if (ibuf[IS2_HEAD_LEN + clen] != ETX) {
hlog_packet(LOG_WARNING, ibuf, len, "%s/%s: IS2: Frame missing terminating ETX: ",
c->addr_rem, c->username);
return -1;
}
//hlog_packet(LOG_DEBUG, this, left, "%s/%s: IS2: framing ok: ", c->addr_rem, c->username);
is2_unpack_message(self, c, ibuf + IS2_HEAD_LEN, clen);
return 1;
}
/*
* Write a single packet to a client.
*
* OPTIMIZE: generate packet once, or reuse incoming prepacked buffer
*/
int is2_write_packet(struct worker_t *self, struct client_t *c, char *p, int len)
static int is2_encode_packet(IS2Message *m, ProtobufCBinaryData *data, char *p, int len)
{
/* trim away CR/LF */
len = len - 2;
//hlog(LOG_DEBUG, "%s/%s: IS2: writing IS packet of %d bytes", c->addr_rem, c->username, len);
ProtobufCBinaryData data;
data.data = (uint8_t *)p;
data.len = len;
data->data = (uint8_t *)p;
data->len = len;
int n = 1; // just one packet
int i;
@ -703,19 +780,64 @@ int is2_write_packet(struct worker_t *self, struct client_t *c, char *p, int len
ispacket__init(subs[i]);
subs[i]->type = ISPACKET__TYPE__IS_PACKET;
subs[i]->has_is_packet_data = 1;
subs[i]->is_packet_data = data;
subs[i]->is_packet_data = *data;
}
m->type = IS2_MESSAGE__TYPE__IS_PACKET;
m->n_is_packet = n;
m->is_packet = subs;
return i;
}
void is2_free_encoded_packets(IS2Message *m)
{
int i;
for (i = 0; i < m->n_is_packet; i++)
hfree(m->is_packet[i]);
hfree(m->is_packet);
}
int is2_write_packet(struct worker_t *self, struct client_t *c, char *p, int len)
{
/* trim away CR/LF */
len = len - 2;
//hlog(LOG_DEBUG, "%s/%s: IS2: writing IS packet of %d bytes", c->addr_rem, c->username, len);
IS2Message m = IS2_MESSAGE__INIT;
m.type = IS2_MESSAGE__TYPE__IS_PACKET;
m.n_is_packet = n;
m.is_packet = subs;
ProtobufCBinaryData data;
is2_encode_packet(&m, &data, p, len);
int ret = is2_write_message(self, c, &m);
for (i = 0; i < n; i++)
hfree(subs[i]);
hfree(subs);
is2_free_encoded_packets(&m);
return ret;
}
/*
* Write a single packet to an IS2 UDP peer
*
* OPTIMIZE: generate packet once, or reuse incoming prepacked buffer
*/
int is2_corepeer_write_packet(struct worker_t *self, struct client_t *c, char *p, int len)
{
/* trim away CR/LF */
len = len - 2;
//hlog(LOG_DEBUG, "%s/%s: IS2: writing IS packet of %d bytes", c->addr_rem, c->username, len);
IS2Message m = IS2_MESSAGE__INIT;
ProtobufCBinaryData data;
is2_encode_packet(&m, &data, p, len);
int ret = is2_corepeer_write_message(self, c, &m);
is2_free_encoded_packets(&m);
return ret;
}
@ -849,6 +971,8 @@ int is2_corepeer_control_in(struct worker_t *self, struct client_t *c, char *p,
/* OK, enable */
c->corepeer_is2 = 1;
c->write_packet = is2_corepeer_write_packet;
c->is2_input_handler = is2_input_handler_corepeer;
/* Do send offers every 10 minutes away */
c->next_is2_peer_offer = tick + COREPEER_IS2_PROPOSE_T_MAX + random() % 5;

View File

@ -10,6 +10,7 @@ extern int is2_input_handler(struct worker_t *self, struct client_t *c, IS2Messa
extern int is2_out_server_signature(struct worker_t *self, struct client_t *c);
extern int is2_deframe_input(struct worker_t *self, struct client_t *c, int start_at);
extern int is2_corepeer_deframe_input(struct worker_t *self, struct client_t *c, char *ibuf, int len);
extern int is2_write_packet(struct worker_t *self, struct client_t *c, char *p, int len);

View File

@ -412,6 +412,9 @@ void client_free(struct client_t *c)
client_udp_free(c->udpclient);
clientlist_remove(c);
if (c->corepeer_is2_challenge)
hfree(c->corepeer_is2_challenge);
#ifdef USE_SSL
if (c->ssl_con)
@ -911,19 +914,24 @@ void client_close(struct worker_t *self, struct client_t *c, int errnum)
self->client_count--;
}
int udp_client_write_packet(struct worker_t *self, struct client_t *c, char *p, int len)
{
return udp_client_write(self, c, p, len-2);
}
int udp_client_write(struct worker_t *self, struct client_t *c, char *p, int len)
{
/* Every packet ends with CRLF, but they are not sent over UDP ! */
/* Existing system doesn't send keepalives via UDP.. */
int i = sendto( c->udpclient->fd, p, len-2, MSG_DONTWAIT,
int i = sendto( c->udpclient->fd, p, len, MSG_DONTWAIT,
&c->udpaddr.sa, c->udpaddrlen );
if (i < 0) {
hlog(LOG_ERR, "UDP transmit error to %s udp port %d: %s",
c->addr_rem, c->udp_port, strerror(errno));
} else if (i != len -2) {
} else if (i != len) {
hlog(LOG_ERR, "UDP transmit incomplete to %s udp port %d: wrote %d of %d bytes, errno: %s",
c->addr_rem, c->udp_port, i, len-2, strerror(errno));
c->addr_rem, c->udp_port, i, len, strerror(errno));
}
// hlog( LOG_DEBUG, "UDP from %d to client port %d, sendto rc=%d", c->udpclient->portnum, c->udp_port, i );
@ -1014,7 +1022,7 @@ static int tcp_client_write(struct worker_t *self, struct client_t *c, char *p,
/* a TCP client with a udp downstream socket? */
if (c->udp_port && c->udpclient && len > 0 && *p != '#') {
return udp_client_write(self, c, p, len);
return udp_client_write_packet(self, c, p, len);
}
/* Count the number of writes towards this client, the keepalive
@ -1213,6 +1221,11 @@ static int handle_corepeer_readable(struct worker_t *self, struct client_t *c)
return 0;
}
if (c->corepeer_is2) {
is2_corepeer_deframe_input(self, c, c->ibuf, r);
return 0;
}
c->handler_line_in(self, rc, IPPROTO_UDP, c->ibuf, r);
return 0;
@ -1545,7 +1558,8 @@ static void collect_new_clients(struct worker_t *self)
}
c->handler_client_readable = &handle_corepeer_readable;
c->write_packet = c->write = &udp_client_write;
c->write_packet = &udp_client_write_packet;
c->write = &udp_client_write;
continue;
}

View File

@ -525,6 +525,7 @@ extern struct client_udp_t *udppeers;
extern void client_udp_free(struct client_udp_t *u);
extern struct client_udp_t *client_udp_alloc(struct client_udp_t **root, int fd, int portnum);
extern struct client_udp_t *client_udp_find(struct client_udp_t *root, int af, int portnum);
extern int udp_client_write(struct worker_t *self, struct client_t *c, char *p, int len);
extern void inbound_connects_account(const int add, struct portaccount_t *p);