commit db21da87bba6017c8343f9c6f255b21813ffd5d0 Author: Fabio M. Di Nitto Date: Tue Oct 15 06:46:36 2019 +0200 [host] rename variables to make it easier to read the code Signed-off-by: Fabio M. Di Nitto diff --git a/libknet/host.c b/libknet/host.c index abb1f89..ac26b89 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -569,7 +569,7 @@ static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, int clear_buf) { - size_t i, j; /* circular buffer indexes */ + size_t head, tail; /* circular buffer indexes */ seq_num_t seq_dist; char *dst_cbuf = host->circular_buffer; char *dst_cbuf_defrag = host->circular_buffer_defrag; @@ -585,13 +585,13 @@ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, i seq_dist = *dst_seq_num - seq_num; } - j = seq_num % KNET_CBUFFER_SIZE; + head = seq_num % KNET_CBUFFER_SIZE; if (seq_dist < KNET_CBUFFER_SIZE) { /* seq num is in ring buffer */ if (!defrag_buf) { - return (dst_cbuf[j] == 0) ? 1 : 0; + return (dst_cbuf[head] == 0) ? 1 : 0; } else { - return (dst_cbuf_defrag[j] == 0) ? 1 : 0; + return (dst_cbuf_defrag[head] == 0) ? 
1 : 0; } } else if (seq_dist <= SEQ_MAX - KNET_CBUFFER_SIZE) { memset(dst_cbuf, 0, KNET_CBUFFER_SIZE); @@ -600,16 +600,16 @@ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, i } /* cleaning up circular buffer */ - i = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; + tail = (*dst_seq_num + 1) % KNET_CBUFFER_SIZE; - if (i > j) { - memset(dst_cbuf + i, 0, KNET_CBUFFER_SIZE - i); - memset(dst_cbuf, 0, j + 1); - memset(dst_cbuf_defrag + i, 0, KNET_CBUFFER_SIZE - i); - memset(dst_cbuf_defrag, 0, j + 1); + if (tail > head) { + memset(dst_cbuf + tail, 0, KNET_CBUFFER_SIZE - tail); + memset(dst_cbuf, 0, head + 1); + memset(dst_cbuf_defrag + tail, 0, KNET_CBUFFER_SIZE - tail); + memset(dst_cbuf_defrag, 0, head + 1); } else { - memset(dst_cbuf + i, 0, j - i + 1); - memset(dst_cbuf_defrag + i, 0, j - i + 1); + memset(dst_cbuf + tail, 0, head - tail + 1); + memset(dst_cbuf_defrag + tail, 0, head - tail + 1); } *dst_seq_num = seq_num; commit 1e473cf26d55c2b6ff8d5bfaa5aa689554de803c Author: Fabio M. Di Nitto Date: Tue Oct 15 06:53:24 2019 +0200 [host] fix defrag buffers reclaim logic The problem: - let's assume a 2 nodes (A and B) cluster setup - node A sends fragmented packets to node B and there is packet loss on the network. - node B receives all those fragments and attempts to reassemble them. - node A sends packet seq_num X in Y fragments. - node B receives only part of the fragments and stores them in a defrag buf. - packet loss stops. - node A continues to send packets and a seq_num roll-over takes place. - node A sends a new packet seq_num X in Y fragments. - node B gets confused here because the parts of the old packet seq_num X are still stored and the buffer has not been reclaimed. - node B continues to rebuild packet seq_num X with old stale data and new data from after the roll-over. - node B completes reassembling the packet and delivers junk to the application. 
The solution: Add a much stronger buffer reclaim logic that will apply on each received packet and not only when defrag buffers are needed, as there might be a mix of fragmented and not fragmented packets in-flight. The new logic creates a window of N packets that can be handled at the same time (based on the number of buffers) and clear everything else. Fixes https://github.com/kronosnet/kronosnet/issues/261 Signed-off-by: Fabio M. Di Nitto diff --git a/libknet/host.c b/libknet/host.c index ac26b89..85d4626 100644 --- a/libknet/host.c +++ b/libknet/host.c @@ -562,6 +562,35 @@ static void _clear_cbuffers(struct knet_host *host, seq_num_t rx_seq_num) } } +static void _reclaim_old_defrag_bufs(struct knet_host *host, seq_num_t seq_num) +{ + seq_num_t head, tail; /* seq_num boundaries */ + int i; + + head = seq_num + 1; + tail = seq_num - (KNET_MAX_LINK + 1); + + /* + * expire old defrag buffers + */ + for (i = 0; i < KNET_MAX_LINK; i++) { + if (host->defrag_buf[i].in_use) { + /* + * head has done a rollover to 0+ + */ + if (tail > head) { + if ((host->defrag_buf[i].pckt_seq >= head) && (host->defrag_buf[i].pckt_seq <= tail)) { + host->defrag_buf[i].in_use = 0; + } + } else { + if ((host->defrag_buf[i].pckt_seq >= head) || (host->defrag_buf[i].pckt_seq <= tail)){ + host->defrag_buf[i].in_use = 0; + } + } + } + } +} + /* * check if a given packet seq num is in the circular buffers * defrag_buf = 0 -> use normal cbuf 1 -> use the defrag buffer lookup @@ -579,6 +608,8 @@ int _seq_num_lookup(struct knet_host *host, seq_num_t seq_num, int defrag_buf, i _clear_cbuffers(host, seq_num); } + _reclaim_old_defrag_bufs(host, seq_num); + if (seq_num < *dst_seq_num) { seq_dist = (SEQ_MAX - seq_num) + *dst_seq_num; } else { commit 5bd88ebd63af20577095c2c98975f0f1781ba46a Author: Fabio M. Di Nitto Date: Tue Oct 15 07:02:05 2019 +0200 [rx] copy data into the defrag buffer only if we know the size of the frame Signed-off-by: Fabio M. 
Di Nitto diff --git a/libknet/threads_rx.c b/libknet/threads_rx.c index b2a5dad..6c26cdc 100644 --- a/libknet/threads_rx.c +++ b/libknet/threads_rx.c @@ -186,8 +186,10 @@ static int pckt_defrag(knet_handle_t knet_h, struct knet_header *inbuf, ssize_t defrag_buf->frag_size = *len; } - memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), - inbuf->khp_data_userdata, *len); + if (defrag_buf->frag_size) { + memmove(defrag_buf->buf + ((inbuf->khp_data_frag_seq - 1) * defrag_buf->frag_size), + inbuf->khp_data_userdata, *len); + } defrag_buf->frag_recv++; defrag_buf->frag_map[inbuf->khp_data_frag_seq] = 1; commit cd59986900510119d8e7b63d33ad35466d480858 Author: Fabio M. Di Nitto Date: Tue Oct 15 07:16:22 2019 +0200 [test] add ability to knet_bench to specify a fixed packet size for perf test Signed-off-by: Fabio M. Di Nitto diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index dc04239..54b5303 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -67,6 +67,8 @@ static int test_type = TEST_PING; static uint64_t perf_by_size_size = 1 * ONE_GIGABYTE; static uint64_t perf_by_time_secs = 10; +static uint32_t force_packet_size = 0; + struct node { int nodeid; int links; @@ -109,6 +111,7 @@ static void print_help(void) printf(" -s nodeid that will generate traffic for benchmarks\n"); printf(" -S [size|seconds] when used in combination with -T perf-by-size it indicates how many GB of traffic to generate for the test. (default: 1GB)\n"); printf(" when used in combination with -T perf-by-time it indicates how many Seconds of traffic to generate for the test. 
(default: 10 seconds)\n"); + printf(" -x force packet size for perf-by-time or perf-by-size\n"); printf(" -C repeat the test continously (default: off)\n"); printf(" -X[XX] show stats at the end of the run (default: 1)\n"); printf(" 1: show handle stats, 2: show summary link stats\n"); @@ -250,7 +253,7 @@ static void setup_knet(int argc, char *argv[]) memset(nodes, 0, sizeof(nodes)); - while ((rv = getopt(argc, argv, "aCT:S:s:ldfom:wb:t:n:c:p:X::P:z:h")) != EOF) { + while ((rv = getopt(argc, argv, "aCT:S:s:ldfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); @@ -406,6 +409,13 @@ static void setup_knet(int argc, char *argv[]) perf_by_size_size = (uint64_t)atoi(optarg) * ONE_GIGABYTE; perf_by_time_secs = (uint64_t)atoi(optarg); break; + case 'x': + force_packet_size = (uint32_t)atoi(optarg); + if ((force_packet_size < 1) || (force_packet_size > 65536)) { + printf("Unsupported packet size %u (accepted 1 - 65536)\n", force_packet_size); + exit(FAIL); + } + break; case 'C': continous = 1; break; @@ -874,7 +884,7 @@ static int setup_send_buffers_common(struct knet_mmsghdr *msg, struct iovec *iov printf("TXT: Unable to malloc!\n"); return -1; } - memset(tx_buf[i], 0, KNET_MAX_PACKET_SIZE); + memset(tx_buf[i], i, KNET_MAX_PACKET_SIZE); iov_out[i].iov_base = (void *)tx_buf[i]; memset(&msg[i].msg_hdr, 0, sizeof(struct msghdr)); msg[i].msg_hdr.msg_iov = &iov_out[i]; @@ -898,6 +908,9 @@ static void send_perf_data_by_size(void) setup_send_buffers_common(msg, iov_out, tx_buf); while (packetsize <= KNET_MAX_PACKET_SIZE) { + if (force_packet_size) { + packetsize = force_packet_size; + } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } @@ -926,7 +939,7 @@ static void send_perf_data_by_size(void) knet_send(knet_h, ctrl_message, TEST_STOP, channel); - if (packetsize == KNET_MAX_PACKET_SIZE) { + if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) { break; } @@ -1175,6 +1188,9 @@ static void send_perf_data_by_time(void) 
memset(&clock_end, 0, sizeof(clock_start)); while (packetsize <= KNET_MAX_PACKET_SIZE) { + if (force_packet_size) { + packetsize = force_packet_size; + } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; } @@ -1205,7 +1221,7 @@ static void send_perf_data_by_time(void) knet_send(knet_h, ctrl_message, TEST_STOP, channel); - if (packetsize == KNET_MAX_PACKET_SIZE) { + if ((packetsize == KNET_MAX_PACKET_SIZE) || (force_packet_size)) { break; } commit e28e2ea7c7e8139a6792ec1508215d4560b53e65 Author: Fabio M. Di Nitto Date: Wed Oct 16 08:10:23 2019 +0200 [test] add packet verification option to knet_bench Signed-off-by: Fabio M. Di Nitto diff --git a/libknet/tests/knet_bench.c b/libknet/tests/knet_bench.c index 54b5303..c9e1c06 100644 --- a/libknet/tests/knet_bench.c +++ b/libknet/tests/knet_bench.c @@ -47,6 +47,7 @@ static char *compresscfg = NULL; static char *cryptocfg = NULL; static int machine_output = 0; static int use_access_lists = 0; +static int use_pckt_verification = 0; static int bench_shutdown_in_progress = 0; static pthread_mutex_t shutdown_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -76,6 +77,11 @@ struct node { struct sockaddr_storage address[KNET_MAX_LINK]; }; +struct pckt_ver { + uint32_t len; + uint32_t chksum; +}; + static void print_help(void) { printf("knet_bench usage:\n"); @@ -117,6 +123,7 @@ static void print_help(void) printf(" 1: show handle stats, 2: show summary link stats\n"); printf(" 3: show detailed link stats\n"); printf(" -a enable machine parsable output (default: off).\n"); + printf(" -v enable packet verification for performance tests (default: off).\n"); } static void parse_nodes(char *nodesinfo[MAX_NODES], int onidx, int port, struct node nodes[MAX_NODES], int *thisidx) @@ -253,7 +260,7 @@ static void setup_knet(int argc, char *argv[]) memset(nodes, 0, sizeof(nodes)); - while ((rv = getopt(argc, argv, "aCT:S:s:ldfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) { + while ((rv = getopt(argc, argv, 
"aCT:S:s:lvdfom:wb:t:n:c:p:x:X::P:z:h")) != EOF) { switch(rv) { case 'h': print_help(); @@ -411,11 +418,14 @@ static void setup_knet(int argc, char *argv[]) break; case 'x': force_packet_size = (uint32_t)atoi(optarg); - if ((force_packet_size < 1) || (force_packet_size > 65536)) { - printf("Unsupported packet size %u (accepted 1 - 65536)\n", force_packet_size); + if ((force_packet_size < 64) || (force_packet_size > 65536)) { + printf("Unsupported packet size %u (accepted 64 - 65536)\n", force_packet_size); exit(FAIL); } break; + case 'v': + use_pckt_verification = 1; + break; case 'C': continous = 1; break; @@ -654,6 +664,24 @@ static void setup_knet(int argc, char *argv[]) } } +/* + * calculate weak chksum (stole from corosync for debugging purposes) + */ +static uint32_t compute_chsum(const unsigned char *data, uint32_t data_len) +{ + unsigned int i; + unsigned int checksum = 0; + + for (i = 0; i < data_len; i++) { + if (checksum & 1) { + checksum |= 0x10000; + } + + checksum = ((checksum >> 1) + (unsigned char)data[i]) & 0xffff; + } + return (checksum); +} + static void *_rx_thread(void *args) { int rx_epoll; @@ -766,6 +794,20 @@ static void *_rx_thread(void *args) } continue; } + if (use_pckt_verification) { + struct pckt_ver *recv_pckt = (struct pckt_ver *)msg[i].msg_hdr.msg_iov->iov_base; + uint32_t chksum; + + if (msg[i].msg_len != recv_pckt->len) { + printf("Wrong packet len received: %u expected: %u!\n", msg[i].msg_len, recv_pckt->len); + exit(FAIL); + } + chksum = compute_chsum((const unsigned char *)msg[i].msg_hdr.msg_iov->iov_base + sizeof(struct pckt_ver), msg[i].msg_len - sizeof(struct pckt_ver)); + if (recv_pckt->chksum != chksum){ + printf("Wrong packet checksum received: %u expected: %u!\n", recv_pckt->chksum, chksum); + exit(FAIL); + } + } rx_pkts++; rx_bytes = rx_bytes + msg[i].msg_len; current_pckt_size = msg[i].msg_len; @@ -913,6 +955,11 @@ static void send_perf_data_by_size(void) } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = 
packetsize; + if (use_pckt_verification) { + struct pckt_ver *tx_pckt = (struct pckt_ver *)iov_out[i].iov_base; /* iov_base already points at the tx buffer; taking its address would overwrite the iovec itself instead of the packet header */ + tx_pckt->len = iov_out[i].iov_len; + tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver)); + } } total_pkts_to_tx = perf_by_size_size / packetsize; @@ -1193,6 +1240,11 @@ static void send_perf_data_by_time(void) } for (i = 0; i < PCKT_FRAG_MAX; i++) { iov_out[i].iov_len = packetsize; + if (use_pckt_verification) { + struct pckt_ver *tx_pckt = (struct pckt_ver *)iov_out[i].iov_base; + tx_pckt->len = iov_out[i].iov_len; + tx_pckt->chksum = compute_chsum((const unsigned char *)iov_out[i].iov_base + sizeof(struct pckt_ver), iov_out[i].iov_len - sizeof(struct pckt_ver)); + } } printf("[info]: testing with %u bytes packet size for %" PRIu64 " seconds.\n", packetsize, perf_by_time_secs);