From 3842ba6080e00fd9484a2a875d982e149f67bc44 Mon Sep 17 00:00:00 2001
From: Jan Friesse <jfriesse@redhat.com>
Date: Tue, 10 Mar 2015 13:20:37 +0100
Subject: [PATCH] Really add cpghum
Signed-off-by: Jan Friesse <jfriesse@redhat.com>
---
test/cpghum.c | 432 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++
1 files changed, 432 insertions(+), 0 deletions(-)
create mode 100644 test/cpghum.c
diff --git a/test/cpghum.c b/test/cpghum.c
new file mode 100644
index 0000000..79184e8
--- /dev/null
+++ b/test/cpghum.c
@@ -0,0 +1,432 @@
+/*
+ * Copyright (c) 2015 Red Hat, Inc.
+ *
+ * All rights reserved.
+ *
+ * Author: Christine Caulfield <ccaulfie@redhat.com>
+ *
+ * This software licensed under BSD license, the text of which follows:
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are met:
+ *
+ * - Redistributions of source code must retain the above copyright notice,
+ * this list of conditions and the following disclaimer.
+ * - Redistributions in binary form must reproduce the above copyright notice,
+ * this list of conditions and the following disclaimer in the documentation
+ * and/or other materials provided with the distribution.
+ * - Neither the name of the MontaVista Software, Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from this
+ * software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+ * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+ * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+ * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+ * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+ * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+ * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+ * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
+ * THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <signal.h>
+#include <unistd.h>
+#include <errno.h>
+#include <unistd.h>
+#include <time.h>
+#include <sys/time.h>
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <sys/select.h>
+#include <sys/uio.h>
+#include <sys/un.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <pthread.h>
+#include <zlib.h>
+#include <libgen.h>
+
+#include <qb/qblog.h>
+#include <qb/qbutil.h>
+
+#include <corosync/corotypes.h>
+#include <corosync/cpg.h>
+
+static cpg_handle_t handle;
+
+static pthread_t thread;
+
+#ifndef timersub
+#define timersub(a, b, result) \
+ do { \
+ (result)->tv_sec = (a)->tv_sec - (b)->tv_sec; \
+ (result)->tv_usec = (a)->tv_usec - (b)->tv_usec; \
+ if ((result)->tv_usec < 0) { \
+ --(result)->tv_sec; \
+ (result)->tv_usec += 1000000; \
+ } \
+ } while (0)
+#endif /* timersub */
+
+static int alarm_notice;
+#define ONE_MEG 1048576
+#define DATASIZE (ONE_MEG*20)
+static char data[DATASIZE];
+static int send_counter = 0;
+static int do_syslog = 0;
+static int quiet = 0;
+static volatile int stopped;
+
+// stats
+static unsigned int length_errors=0;
+static unsigned int crc_errors=0;
+static unsigned int sequence_errors=0;
+static unsigned int packets_sent=0;
+static unsigned int packets_recvd=0;
+static unsigned int send_retries=0;
+static unsigned int send_fails=0;
+
+static void cpg_bm_confchg_fn (
+ cpg_handle_t handle_in,
+ const struct cpg_name *group_name,
+ const struct cpg_address *member_list, size_t member_list_entries,
+ const struct cpg_address *left_list, size_t left_list_entries,
+ const struct cpg_address *joined_list, size_t joined_list_entries)
+{
+}
+
+static unsigned int g_recv_count;
+static unsigned int g_recv_length;
+static unsigned int g_write_size;
+static int g_recv_counter = 0;
+
+static void cpg_bm_deliver_fn (
+ cpg_handle_t handle_in,
+ const struct cpg_name *group_name,
+ uint32_t nodeid,
+ uint32_t pid,
+ void *msg,
+ size_t msg_len)
+{
+ int *value = msg;
+ uLong crc=0;
+ ulong recv_crc = value[1] & 0xFFFFFFFF;
+
+ packets_recvd++;
+ g_recv_length = msg_len;
+
+ // Basic check, packets should all be the right size
+ if (g_write_size && (msg_len != g_write_size)) {
+ length_errors++;
+ fprintf(stderr, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size);
+ if (do_syslog) {
+ syslog(LOG_ERR, "%s: message sizes don't match. got %lu, expected %u\n", group_name->value, msg_len, g_write_size);
+ }
+ }
+
+ // Sequence counters are incrementing in step?
+ if (*value != g_recv_counter) {
+ sequence_errors++;
+ fprintf(stderr, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+ if (do_syslog) {
+ syslog(LOG_ERR, "%s: counters don't match. got %d, expected %d\n", group_name->value, *value, g_recv_counter);
+ }
+ // Catch up or we'll be printing errors for ever
+ g_recv_counter = *value +1;
+ } else {
+ g_recv_counter++;
+ }
+
+ // Check crc
+ crc = crc32(0, NULL, 0);
+ crc = crc32(crc, (Bytef *)&value[2], msg_len-sizeof(int)*2) & 0xFFFFFFFF;
+ if (crc != recv_crc) {
+ crc_errors++;
+ fprintf(stderr, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+ if (do_syslog) {
+ syslog(LOG_ERR, "%s: CRCs don't match. got %lx, expected %lx\n", group_name->value, recv_crc, crc);
+ }
+ }
+
+ g_recv_count++;
+
+}
+
+static cpg_model_v1_data_t model1_data = {
+ .cpg_deliver_fn = cpg_bm_deliver_fn,
+ .cpg_confchg_fn = cpg_bm_confchg_fn,
+};
+
+static cpg_callbacks_t callbacks = {
+ .cpg_deliver_fn = cpg_bm_deliver_fn,
+ .cpg_confchg_fn = cpg_bm_confchg_fn
+};
+
+static struct cpg_name group_name = {
+ .value = "cpghum",
+ .length = 7
+};
+
+static void cpg_test (
+ cpg_handle_t handle_in,
+ int write_size,
+ int delay_time,
+ int print_time)
+{
+ struct timeval tv1, tv2, tv_elapsed;
+ struct iovec iov;
+ unsigned int res;
+ int i;
+ unsigned int *dataint = (unsigned int *)data;
+ uLong crc;
+
+ alarm_notice = 0;
+ iov.iov_base = data;
+ iov.iov_len = write_size;
+
+ g_recv_count = 0;
+ alarm (print_time);
+
+ gettimeofday (&tv1, NULL);
+ do {
+ dataint[0] = send_counter++;
+ for (i=2; i<(DATASIZE-sizeof(int)*2)/4; i++) {
+ dataint[i] = rand();
+ }
+ crc = crc32(0, NULL, 0);
+ dataint[1] = crc32(crc, (Bytef*)&dataint[2], write_size-sizeof(int)*2);
+ resend:
+ res = cpg_mcast_joined (handle_in, CPG_TYPE_AGREED, &iov, 1);
+ if (res == CS_ERR_TRY_AGAIN) {
+ usleep(10000);
+ send_retries++;
+ goto resend;
+ }
+ if (res != CS_OK) {
+ fprintf(stderr, "send failed: %d\n", res);
+ send_fails++;
+ }
+ else {
+ packets_sent++;
+ }
+ usleep(delay_time*1000);
+ } while (alarm_notice == 0 && (res == CS_OK || res == CS_ERR_TRY_AGAIN) && stopped == 0);
+ gettimeofday (&tv2, NULL);
+ timersub (&tv2, &tv1, &tv_elapsed);
+
+ if (!quiet) {
+ printf ("%s: %5d message%s received, ", group_name.value, g_recv_count, g_recv_count==1?"":"s");
+ printf ("%5d bytes per write\n", write_size);
+ }
+
+}
+
+static void sigalrm_handler (int num)
+{
+ alarm_notice = 1;
+}
+
+static void sigint_handler (int num)
+{
+ stopped = 1;
+}
+
+static void* dispatch_thread (void *arg)
+{
+ cpg_dispatch (handle, CS_DISPATCH_BLOCKING);
+ return NULL;
+}
+
+static void usage(char *cmd)
+{
+ fprintf(stderr, "%s [OPTIONS]\n", cmd);
+ fprintf(stderr, "\n");
+ fprintf(stderr, "%s sends CPG messages to all registered users of the CPG.\n", cmd);
+ fprintf(stderr, "The messages have a sequence number and a CRC so that missing or\n");
+ fprintf(stderr, "corrupted messages will be detected and reported.\n");
+ fprintf(stderr, "\n");
+ fprintf(stderr, "%s can also be asked to simply listen for (and check) packets\n", cmd);
+ fprintf(stderr, "so that there is another node in the cluster connected to the CPG.\n");
+ fprintf(stderr, "\n");
+ fprintf(stderr, "When -l is present, packet size is only checked if specified by -w or -W\n");
+ fprintf(stderr, "and it, obviously, must match that of the sender.\n");
+ fprintf(stderr, "\n");
+ fprintf(stderr, "Multiple copies, in different CPGs, can also be run on the same or\n");
+ fprintf(stderr, "different nodes by using the -n option.\n");
+ fprintf(stderr, "\n");
+ fprintf(stderr, "%s can't handle more than 1 sender in the same CPG as it messes with the\n", cmd);
+ fprintf(stderr, "sequence numbers.\n");
+ fprintf(stderr, "\n");
+ fprintf(stderr, " -w Write size in Kbytes, default 4\n");
+ fprintf(stderr, " -W Write size in bytes, default 4096\n");
+ fprintf(stderr, " -n CPG name to use, default 'cpghum'\n");
+ fprintf(stderr, " -d Delay between sending packets (mS), default 1000\n");
+ fprintf(stderr, " -r Number of repetitions, default 100\n");
+ fprintf(stderr, " -p Delay between printing output(S), default 10s\n");
+ fprintf(stderr, " -l Listen and check CRCs only, don't send (^C to quit)\n");
+ fprintf(stderr, " -m cpg_initialise() model. Default 1.\n");
+ fprintf(stderr, " -s Also send errors to syslog (for daemon log correlation).\n");
+ fprintf(stderr, " -q Quiet. Don't print messages every 10 seconds (see also -p)\n");
+ fprintf(stderr, "\n");
+}
+
+int main (int argc, char *argv[]) {
+ int i;
+ unsigned int res;
+ uint32_t maxsize;
+ int opt;
+ int bs;
+ int write_size = 4096;
+ int delay_time = 1000;
+ int repetitions = 100;
+ int print_time = 10;
+ int have_size = 0;
+ int listen_only = 0;
+ int model = 1;
+
+ while ( (opt = getopt(argc, argv, "qlsn:d:r:p:m:w:W:")) != -1 ) {
+ switch (opt) {
+ case 'w': // Write size in K
+ bs = atoi(optarg);
+ if (bs > 0) {
+ write_size = bs*1024;
+ have_size = 1;
+ }
+ break;
+ case 'W': // Write size in bytes
+ bs = atoi(optarg);
+ if (bs > 0) {
+ write_size = bs;
+ have_size = 1;
+ }
+ break;
+ case 'n':
+ strcpy(group_name.value, optarg);
+ group_name.length = strlen(group_name.value);
+ break;
+ case 'd':
+ delay_time = atoi(optarg);
+ break;
+ case 'r':
+ repetitions = atoi(optarg);
+ break;
+ case 'p':
+ print_time = atoi(optarg);
+ break;
+ case 'l':
+ listen_only = 1;
+ break;
+ case 's':
+ do_syslog = 1;
+ break;
+ case 'q':
+ quiet = 1;
+ break;
+ case 'm':
+ model = atoi(optarg);
+ if (model < 0 || model > 1) {
+ fprintf(stderr, "%s: Model must be 0-1\n", argv[0]);
+ exit(1);
+ }
+ break;
+ case '?':
+ usage(basename(argv[0]));
+ exit(0);
+ }
+ }
+
+ qb_log_init("cpghum", LOG_USER, LOG_EMERG);
+ qb_log_ctl(QB_LOG_SYSLOG, QB_LOG_CONF_ENABLED, QB_FALSE);
+ qb_log_filter_ctl(QB_LOG_STDERR, QB_LOG_FILTER_ADD,
+ QB_LOG_FILTER_FILE, "*", LOG_DEBUG);
+ qb_log_ctl(QB_LOG_STDERR, QB_LOG_CONF_ENABLED, QB_TRUE);
+
+ g_write_size = write_size;
+
+ signal (SIGALRM, sigalrm_handler);
+ signal (SIGINT, sigint_handler);
+ switch (model) {
+ case 0:
+ res = cpg_initialize (&handle, &callbacks);
+ break;
+ case 1:
+ res = cpg_model_initialize (&handle, CPG_MODEL_V1, (cpg_model_data_t *)&model1_data, NULL);
+ break;
+ default:
+ res=999; // can't get here but it keeps the compiler happy
+ break;
+ }
+
+ if (res != CS_OK) {
+ printf ("cpg_initialize failed with result %d\n", res);
+ exit (1);
+ }
+ pthread_create (&thread, NULL, dispatch_thread, NULL);
+
+ res = cpg_join (handle, &group_name);
+ if (res != CS_OK) {
+ printf ("cpg_join failed with result %d\n", res);
+ exit (1);
+ }
+
+ if (listen_only) {
+ int secs;
+ if (!quiet) {
+ printf("-- Listening on CPG %s\n", group_name.value);
+ printf("-- Ignore any starting \"counters don't match\" error while we catch up\n");
+ }
+
+ /* Only check packet size if specified on the command-line */
+ if (!have_size) {
+ g_write_size = 0;
+ }
+
+ while (!stopped) {
+ sleep(1);
+ if (++secs > print_time && !quiet) {
+ printf ("%s: %5d message%s received. %d bytes\n", group_name.value, g_recv_count, g_recv_count==1?"":"s", g_recv_length);
+ secs = 0;
+ g_recv_count = 0;
+ }
+ }
+ }
+ else {
+ cpg_max_atomic_msgsize_get (handle, &maxsize);
+ if ( write_size > maxsize) {
+ fprintf(stderr, "INFO: packet size (%d) is larger than the maximum atomic size (%d), libcpg will fragment\n",
+ write_size, maxsize);
+ }
+ for (i = 0; i < repetitions && !stopped; i++) {
+ cpg_test (handle, write_size, delay_time, print_time);
+ signal (SIGALRM, sigalrm_handler);
+ }
+ }
+
+ res = cpg_finalize (handle);
+ if (res != CS_OK) {
+ printf ("cpg_finalize failed with result %d\n", res);
+ exit (1);
+ }
+
+ printf("\n");
+ printf("Stats:\n");
+ if (!listen_only) {
+ printf(" packets sent: %d\n", packets_sent);
+ printf(" send failures: %d\n", send_fails);
+ printf(" send retries: %d\n", send_retries);
+ }
+ if (have_size) {
+ printf(" length errors: %d\n", length_errors);
+ }
+ printf(" packets recvd: %d\n", packets_recvd);
+ printf(" sequence errors: %d\n", sequence_errors);
+ printf(" crc errors: %d\n", crc_errors);
+ printf("\n");
+ return (0);
+}
--
1.7.1