From 3842ba6080e00fd9484a2a875d982e149f67bc44 Mon Sep 17 00:00:00 2001 From: Jan Friesse Date: Tue, 10 Mar 2015 13:20:37 +0100 Subject: [PATCH] Really add cpghum Signed-off-by: Jan Friesse --- test/cpghum.c | 432 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 files changed, 432 insertions(+), 0 deletions(-) create mode 100644 test/cpghum.c diff --git a/test/cpghum.c b/test/cpghum.c new file mode 100644 index 0000000..79184e8 --- /dev/null +++ b/test/cpghum.c @@ -0,0 +1,432 @@ +/* + * Copyright (c) 2015 Red Hat, Inc. + * + * All rights reserved. + * + * Author: Christine Caulfield + * + * This software licensed under BSD license, the text of which follows: + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * - Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * - Redistributions in binary form must reproduce the above copyright notice, + * this list of conditions and the following disclaimer in the documentation + * and/or other materials provided with the distribution. + * - Neither the name of the MontaVista Software, Inc. nor the names of its + * contributors may be used to endorse or promote products derived from this + * software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF + * THE POSSIBILITY OF SUCH DAMAGE. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include + +static cpg_handle_t handle; + +static pthread_t thread; + +#ifndef timersub +#define timersub(a, b, result) \ + do { \ + (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \ + (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \ + if ((result)->tv_usec < 0) { \ + --(result)->tv_sec; \ + (result)->tv_usec += 1000000; \ + } \ + } while (0) +#endif /* timersub */ + +static int alarm_notice; +#define ONE_MEG 1048576 +#define DATASIZE (ONE_MEG*20) +static char data[DATASIZE]; +static int send_counter = 0; +static int do_syslog = 0; +static int quiet = 0; +static volatile int stopped; + +// stats +static unsigned int length_errors=0; +static unsigned int crc_errors=0; +static unsigned int sequence_errors=0; +static unsigned int packets_sent=0; +static unsigned int packets_recvd=0; +static unsigned int send_retries=0; +static unsigned int send_fails=0; + +static void cpg_bm_confchg_fn ( + cpg_handle_t handle_in, + const struct cpg_name *group_name, + const struct cpg_address *member_list, size_t member_list_entries, + const struct cpg_address *left_list, size_t left_list_entries, + const struct cpg_address *joined_list, size_t joined_list_entries) +{ +} + +static unsigned int g_recv_count; +static unsigned int g_recv_length; +static unsigned int g_write_size; +static int g_recv_counter = 0; + +static void cpg_bm_deliver_fn ( + cpg_handle_t handle_in, + const struct cpg_name *group_name, + uint32_t nodeid, + uint32_t pid, + void *msg, + size_t msg_len) +{ + int *value = msg; + uLong crc=0; + ulong recv_crc = value[1] & 0xFFFFFFFF; + + packets_recvd++; + g_recv_length = msg_len; + + // Basic check, packets should all be the right size + if (g_write_size && (msg_len != g_write_size)) { + length_errors++; + fprintf(stderr, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size); + if (do_syslog) { + syslog(LOG_ERR, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size); + } + } + + // Sequence counters are incrementing in step? + if (*value != g_recv_counter) { + sequence_errors++; + fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter); + if (do_syslog) { + syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter); + } + // Catch up or we'll be printing errors for ever + g_recv_counter = *value +1; + } else { + g_recv_counter++; + } + + // Check crc + crc = crc32(0, NULL, 0); + crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF; + if (crc != recv_crc) { + crc_errors++; + fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc); + if (do_syslog) { + syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc); + } + } + + g_recv_count++; + +} + +static cpg_model_v1_data_t model1_data = { + .cpg_deliver_fn = cpg_bm_deliver_fn, + .cpg_confchg_fn = cpg_bm_confchg_fn, +}; + +static cpg_callbacks_t callbacks = { + .cpg_deliver_fn = cpg_bm_deliver_fn, + .cpg_confchg_fn = cpg_bm_confchg_fn +}; + +static struct cpg_name group_name = { + .value = "cpghum", + .length = 7 +}; + +static void cpg_test ( + cpg_handle_t handle_in, + int write_size, + int delay_time, + int print_time) +{ + struct timeval tv1, tv2, tv_elapsed; + struct iovec iov; + unsigned int res; + int i; + unsigned int *dataint = (unsigned int *)data; + uLong crc; + + alarm_notice = 0; + iov.iov_base = data; + iov.iov_len = write_size; + + g_recv_count = 0; + alarm (print_time); + + gettimeofday (&tv1, NULL); + do { + dataint[0] = send_counter++; + for (i=2; i<(DATASIZE-sizeof(int)*2)/4; i++) { + dataint[i] = rand(); + } + crc = crc32(0, NULL, 0); + dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2); + resend: + res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1); + if (res == CS_ERR_TRY_AGAIN) { + usleep(10000); + send_retries++; + goto resend; + } + if (res != CS_OK) { + fprintf(stderr, "send failed: %d\n", res); + send_fails++; + } + else { + packets_sent++; + } + usleep(delay_time*1000); + } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0); + gettimeofday (&tv2, NULL); + timersub (&tv2, &tv1, &tv_elapsed); + + if (!quiet) { + printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s"); + printf ("%5d bytes per write\n", write_size); + } + +} + +static void sigalrm_handler (int num) +{ + alarm_notice = 1; +} + +static void sigint_handler (int num) +{ + stopped = 1; +} + +static void* dispatch_thread (void *arg) +{ + cpg_dispatch (handle, CS_DISPATCH_BLOCKING); + return NULL; +} + +static void usage(char *cmd) +{ + fprintf(stderr, "%s [OPTIONS]\n", cmd); + fprintf(stderr, "\n"); + fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd); + fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n"); + fprintf(stderr, "corrupted messages will be detected and reported.\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd); + fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n"); + fprintf(stderr, "and it, obviously, must match that of the sender.\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n"); + fprintf(stderr, "different nodes by using the -n option.\n"); + fprintf(stderr, "\n"); + fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd); + fprintf(stderr, "sequence numbers.\n"); + fprintf(stderr, "\n"); + fprintf(stderr, " -w Write size in Kbytes, default 4\n"); + fprintf(stderr, " -W Write size in bytes, default 4096\n"); + fprintf(stderr, " -n CPG name to use, default 'cpghum'\n"); + fprintf(stderr, " -d Delay between sending packets (mS), default 1000\n"); + fprintf(stderr, " -r Number of repetitions, default 100\n"); + fprintf(stderr, " -p Delay between printing output(S), default 10s\n"); + fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n"); + fprintf(stderr, " -m cpg_initialise() model. Default 1.\n"); + fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n"); + fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n"); + fprintf(stderr, "\n"); +} + +int main (int argc, char *argv[]) { + int i; + unsigned int res; + uint32_t maxsize; + int opt; + int bs; + int write_size = 4096; + int delay_time = 1000; + int repetitions = 100; + int print_time = 10; + int have_size = 0; + int listen_only = 0; + int model = 1; + + while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) { + switch (opt) { + case 'w': // Write size in K + bs = atoi(optarg); + if (bs > 0) { + write_size = bs*1024; + have_size = 1; + } + break; + case 'W': // Write size in bytes + bs = atoi(optarg); + if (bs > 0) { + write_size = bs; + have_size = 1; + } + break; + case 'n': + strcpy(group_name.value, optarg); + group_name.length = strlen(group_name.value); + break; + case 'd': + delay_time = atoi(optarg); + break; + case 'r': + repetitions = atoi(optarg); + break; + case 'p': + print_time = atoi(optarg); + break; + case 'l': + listen_only = 1; + break; + case 's': + do_syslog = 1; + break; + case 'q': + quiet = 1; + break; + case 'm': + model = atoi(optarg); + if (model < 0 || model > 1) { + fprintf(stderr, "%s: Model must be 0-1\n", argv[0]); + exit(1); + } + break; + case '?': + usage(basename(argv[0])); + exit(0); + } + } + + qb_log_init("cpghum", LOG_USER, LOG_EMERG); + qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE); + qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD, + QB_LOG_FILTER_FILE, "*", LOG_DEBUG); + qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE); + + g_write_size = write_size; + + signal (SIGALRM, sigalrm_handler); + signal (SIGINT, sigint_handler); + switch (model) { + case 0: + res = cpg_initialize (&handle, &callbacks); + break; + case 1: + res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL); + break; + default: + res=999; // can't get here but it keeps the compiler happy + break; + } + + if (res != CS_OK) { + printf ("cpg_initialize failed with result %d\n", res); + exit (1); + } + pthread_create (&thread, NULL, dispatch_thread, NULL); + + res = cpg_join (handle, &group_name); + if (res != CS_OK) { + printf ("cpg_join failed with result %d\n", res); + exit (1); + } + + if (listen_only) { + int secs; + if (!quiet) { + printf("-- Listening on CPG %s\n", group_name.value); + printf("-- Ignore any starting \"counters don't match\" error while we catch up\n"); + } + + /* Only check packet size if specified on the command-line */ + if (!have_size) { + g_write_size = 0; + } + + while (!stopped) { + sleep(1); + if (++secs > print_time && !quiet) { + printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length); + secs = 0; + g_recv_count = 0; + } + } + } + else { + cpg_max_atomic_msgsize_get (handle, &maxsize); + if ( write_size > maxsize) { + fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n", + write_size, maxsize); + } + for (i = 0; i < repetitions && !stopped; i++) { + cpg_test (handle, write_size, delay_time, print_time); + signal (SIGALRM, sigalrm_handler); + } + } + + res = cpg_finalize (handle); + if (res != CS_OK) { + printf ("cpg_finalize failed with result %d\n", res); + exit (1); + } + + printf("\n"); + printf("Stats:\n"); + if (!listen_only) { + printf(" packets sent: %d\n", packets_sent); + printf(" send failures: %d\n", send_fails); + printf(" send retries: %d\n", send_retries); + } + if (have_size) { + printf(" length errors: %d\n", length_errors); + } + printf(" packets recvd: %d\n", packets_recvd); + printf(" sequence errors: %d\n", sequence_errors); + printf(" crc errors: %d\n", crc_errors); + printf("\n"); + return (0); +} -- 1.7.1