From 8cc8e513633a1a8b12c416e32fb5362fcf4d65dd Mon Sep 17 00:00:00 2001
From: Christine Caulfield <ccaulfie@redhat.com>
Date: Thu, 5 Mar 2015 16:45:15 +0000
Subject: [PATCH] cpg: Add support for messages larger than 1Mb

If a cpg client sends a message larger than 1Mb (actually slightly
less, to allow for internal buffers), cpg will now fragment it into
several corosync messages before sending it around the ring.

cpg_mcast_joined() can now return CS_ERR_INTERRUPT, which means that
the cpg membership was disrupted during the send operation and the
message needs to be resent.

The new API call cpg_max_atomic_msgsize_get() returns the maximum size
of a message that will not be fragmented internally.

A new test program, cpghum, was written to stress-test this
functionality; it checks message integrity and order of receipt.

Signed-off-by: Christine Caulfield <ccaulfie@redhat.com>
Reviewed-by: Jan Friesse <jfriesse@redhat.com>
---
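For illustration only (not part of the patch itself): a minimal sketch of
how a caller might use the new call and handle CS_ERR_INTERRUPT. The helper
name send_large() and the retry limit are arbitrary; the rest is the public
libcpg API.

    #include <stdint.h>
    #include <sys/uio.h>
    #include <corosync/corotypes.h>
    #include <corosync/cpg.h>

    static cs_error_t send_large (cpg_handle_t handle, void *buf, size_t len)
    {
        struct iovec iov = { .iov_base = buf, .iov_len = len };
        uint32_t atomic_max;
        cs_error_t err;
        int retries = 0;

        err = cpg_max_atomic_msgsize_get (handle, &atomic_max);
        if (err != CS_OK) {
            return err;
        }
        /* Anything larger than atomic_max is fragmented internally by libcpg */

        do {
            err = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
            /*
             * CS_ERR_INTERRUPT means the cpg membership changed while the
             * fragments were in flight; the whole message must be resent.
             */
        } while (err == CS_ERR_INTERRUPT && ++retries < 10);

        return err;
    }

The resend-on-CS_ERR_INTERRUPT loop mirrors the daemon-side transition_counter
check below: a fragmented send that spans a configuration change is failed as
a whole rather than delivered partially.
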
 configure.ac               |    1 +
 corosync.spec.in           |    1 +
 exec/cpg.c                 |  182 +++++++++++++++++++++++++++++++++++++++++++-
 include/corosync/cpg.h     |    7 ++
 include/corosync/ipc_cpg.h |   35 ++++++++-
 lib/cpg.c                  |  171 ++++++++++++++++++++++++++++++++++++++++-
 test/Makefile.am           |    3 +-
 7 files changed, 393 insertions(+), 7 deletions(-)

diff --git a/configure.ac b/configure.ac
index 0c371aa..b394329 100644
--- a/configure.ac
+++ b/configure.ac
@@ -163,6 +163,7 @@ AC_CHECK_LIB([pthread], [pthread_create])
 AC_CHECK_LIB([socket], [socket])
 AC_CHECK_LIB([nsl], [t_open])
 AC_CHECK_LIB([rt], [sched_getscheduler])
+AC_CHECK_LIB([z], [crc32])
 
 # Checks for library functions.
 AC_FUNC_ALLOCA
diff --git a/corosync.spec.in b/corosync.spec.in
index 3ca75b7..a2ba584 100644
--- a/corosync.spec.in
+++ b/corosync.spec.in
@@ -40,6 +40,7 @@ Conflicts: openais <= 0.89, openais-devel <= 0.89
 BuildRequires: groff
 BuildRequires: libqb-devel
 BuildRequires: nss-devel
+BuildRequires: zlib-devel
 %if %{with runautogen}
 BuildRequires: autoconf automake libtool
 %endif
diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..a18b850 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
 	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
 	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+	MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
 };
 
 struct zcb_mapped {
@@ -156,6 +157,8 @@ struct cpg_pd {
 	enum cpd_state cpd_state;
 	unsigned int flags;
 	int initial_totem_conf_sent;
+	uint64_t transition_counter; /* These two are used when sending fragmented messages */
+	uint64_t initial_transition_counter;
 	struct list_head list;
 	struct list_head iteration_instance_list_head;
 	struct list_head zcb_mapped_list_head;
@@ -224,6 +227,10 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_exec_cpg_downlist_old (
 	const void *message,
 	unsigned int nodeid);
@@ -238,6 +245,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
 static void exec_cpg_downlist_endian_convert_old (void *msg);
 
 static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +259,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
 
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
 
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
 static void message_handler_req_lib_cpg_membership (void *conn,
 						    const void *message);
 
@@ -383,7 +394,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
 		.lib_handler_fn				= message_handler_req_lib_cpg_zc_execute,
 		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
 	},
-
+	{ /* 12 */
+		.lib_handler_fn				= message_handler_req_lib_cpg_partial_mcast,
+		.flow_control				= CS_LIB_FLOW_CONTROL_REQUIRED
+	},
 
 };
 
@@ -413,6 +427,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 		.exec_handler_fn	= message_handler_req_exec_cpg_downlist,
 		.exec_endian_convert_fn	= exec_cpg_downlist_endian_convert
 	},
+	{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+		.exec_handler_fn	= message_handler_req_exec_cpg_partial_mcast,
+		.exec_endian_convert_fn	= exec_cpg_partial_mcast_endian_convert
+	},
 };
 
 struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +475,17 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_exec_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct req_exec_cpg_downlist_old {
 	struct qb_ipc_request_header header __attribute__((aligned(8)));
 	mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -740,6 +769,7 @@ static int notify_lib_joinlist(
 					cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
 
 					api->ipc_dispatch_send (cpd->conn, buf, size);
+					cpd->transition_counter++;
 				}
 				if (left_list_entries) {
 					if (left_list[0].pid == cpd->pid &&
@@ -1186,6 +1216,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
 	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+	struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
+
+	swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
+	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
+	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
+	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
+	req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
+	req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
+}
+
 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid) {
 	struct list_head *iter;
 
@@ -1453,6 +1496,68 @@ static void message_handler_req_exec_cpg_mcast (
 	}
 }
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid)
+{
+	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+	int msglen = req_exec_cpg_mcast->fraglen;
+	struct list_head *iter, *pi_iter;
+	struct cpg_pd *cpd;
+	struct iovec iovec[2];
+	int known_node = 0;
+
+	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+	res_lib_cpg_mcast.fraglen = msglen;
+	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+	res_lib_cpg_mcast.nodeid = nodeid;
+
+	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+	       sizeof(mar_cpg_name_t));
+	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+	iovec[1].iov_len = msglen;
+
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+		cpd = list_entry(iter, struct cpg_pd, list);
+		iter = iter->next;
+
+		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+		    && (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+			if (!known_node) {
+				/* Try to find, if we know the node */
+				for (pi_iter = process_info_list_head.next;
+				     pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+					struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+					if (pi->nodeid == nodeid &&
+					    mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+						known_node = 1;
+						break;
+					}
+				}
+			}
+
+			if (!known_node) {
+				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+				return ;
+			}
+
+			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+		}
+	}
+}
+
 
 static int cpg_exec_send_downlist(void)
 {
@@ -1864,6 +1969,77 @@ static void message_handler_req_lib_cpg_zc_free (
 		res_header.size);
 }
 
+/* Fragmented mcast message from the library */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+	mar_cpg_name_t group_name = cpd->group_name;
+
+	struct iovec req_exec_cpg_iovec[2];
+	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+	int msglen = req_lib_cpg_mcast->fraglen;
+	int result;
+	cs_error_t error = CS_ERR_NOT_EXIST;
+
+	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+	switch (cpd->cpd_state) {
+	case CPD_STATE_UNJOINED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_LEAVE_STARTED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_JOIN_STARTED:
+		error = CS_OK;
+		break;
+	case CPD_STATE_JOIN_COMPLETED:
+		error = CS_OK;
+		break;
+	}
+
+	res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
+	res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
+
+	if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
+		cpd->initial_transition_counter = cpd->transition_counter;
+	}
+	if (cpd->transition_counter != cpd->initial_transition_counter) {
+		error = CS_ERR_INTERRUPT;
+	}
+
+	if (error == CS_OK) {
+		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+							       MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+		req_exec_cpg_mcast.pid = cpd->pid;
+		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+		       sizeof(mar_cpg_name_t));
+
+		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+		req_exec_cpg_iovec[1].iov_len = msglen;
+
+		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+		assert(result == 0);
+	} else {
+		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+			   conn, group_name.value, cpd->cpd_state, error);
+	}
+
+	res_lib_cpg_partial_send.header.error = error;
+	api->ipc_response_send (conn, &res_lib_cpg_partial_send,
+				sizeof (res_lib_cpg_partial_send));
+}
+
 /* Mcast message from the library */
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
 {
diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h
index 55fc4b8..f66fb14 100644
--- a/include/corosync/cpg.h
+++ b/include/corosync/cpg.h
@@ -186,6 +186,13 @@ cs_error_t cpg_fd_get (
 	int *fd);
 
 /**
+ * Get maximum size of a message that will not be fragmented
+ */
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size);
+
+/**
  * Get contexts for a CPG handle
  */
 cs_error_t cpg_context_get (
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..5008acf 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -55,6 +55,7 @@ enum req_cpg_types {
 	MESSAGE_REQ_CPG_ZC_ALLOC = 9,
 	MESSAGE_REQ_CPG_ZC_FREE = 10,
 	MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+	MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
 };
 
 enum res_cpg_types {
@@ -75,6 +76,8 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ZC_ALLOC = 14,
 	MESSAGE_RES_CPG_ZC_FREE = 15,
 	MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+	MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
+	MESSAGE_RES_CPG_PARTIAL_SEND = 18,
 };
 
 enum lib_cpg_confchg_reason {
@@ -85,6 +88,12 @@ enum lib_cpg_confchg_reason {
 	CONFCHG_CPG_REASON_PROCDOWN = 5
 };
 
+enum lib_cpg_partial_types {
+	LIBCPG_PARTIAL_FIRST = 1,
+	LIBCPG_PARTIAL_CONTINUED = 2,
+	LIBCPG_PARTIAL_LAST = 3,
+};
+
 typedef struct {
 	uint32_t length __attribute__((aligned(8)));
 	char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -200,6 +209,10 @@ struct res_lib_cpg_local_get {
 	mar_uint32_t local_nodeid __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_send {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+};
+
 struct req_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t guarantee __attribute__((aligned(8)));
@@ -207,6 +220,15 @@ struct req_lib_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_lib_cpg_partial_mcast {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_uint32_t guarantee __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 };
@@ -223,6 +245,17 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_deliver_callback {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t nodeid __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_flowcontrol_callback {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..037e8a9 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
 /*
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  *
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -70,6 +70,12 @@
 #endif
 
 /*
+ * Maximum number of times to retry a send when transmitting
+ * a large message fragment
+ */
+#define MAX_RETRIES 100
+
+/*
  * ZCB files have following umask (umask is same as used in libqb)
  */
 #define CPG_MEMORY_MAP_UMASK		077
@@ -83,6 +89,14 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct list_head iteration_list_head;
+    uint32_t max_msg_size;
+    char *assembly_buf;
+    uint32_t assembly_buf_ptr;
+    int assembling; /* Flag that says we have started assembling a message.
+					 * It's here to catch the situation where a node joins
+					 * the cluster/group in the middle of a CPG message send
+					 * so we don't pass on a partial message to the client.
+					 */
 };
 static void cpg_inst_free (void *inst);
 
@@ -210,6 +224,8 @@ cs_error_t cpg_model_initialize (
 		}
 	}
 
+	/* Allow space for corosync internal headers */
+	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
 	cpg_inst->model_data.model = model;
 	cpg_inst->context = context;
 
@@ -291,6 +307,25 @@ cs_error_t cpg_fd_get (
 	return (error);
 }
 
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	*size = cpg_inst->max_msg_size;
+
+	hdb_handle_put (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
 cs_error_t cpg_context_get (
 	cpg_handle_t handle,
 	void **context)
@@ -339,6 +374,7 @@ cs_error_t cpg_dispatch (
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
 	struct cpg_inst cpg_inst_copy;
 	struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +397,7 @@ cs_error_t cpg_dispatch (
 
 	/*
 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
-	 * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
 	 */
 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 		timeout = 0;
@@ -428,6 +464,43 @@ cs_error_t cpg_dispatch (
 					res_cpg_deliver_callback->msglen);
 				break;
 
+			case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+				res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+				marshall_from_mar_cpg_name_t (
+					&group_name,
+					&res_cpg_partial_deliver_callback->group_name);
+
+				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+					/*
+					 * Allocate a buffer to contain a full message.
+					 */
+					cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+					if (!cpg_inst->assembly_buf) {
+						error = CS_ERR_NO_MEMORY;
+						goto error_put;
+					}
+					cpg_inst->assembling = 1;
+					cpg_inst->assembly_buf_ptr = 0;
+				}
+				if (cpg_inst->assembling) {
+					memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
+					       res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+					cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+					if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+						cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+							&group_name,
+							res_cpg_partial_deliver_callback->nodeid,
+							res_cpg_partial_deliver_callback->pid,
+							cpg_inst->assembly_buf,
+							res_cpg_partial_deliver_callback->msglen);
+						free(cpg_inst->assembly_buf);
+						cpg_inst->assembling = 0;
+					}
+				}
+				break;
+
 			case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
 				if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
 					break;
@@ -921,6 +994,12 @@ cs_error_t cpg_zcb_mcast_joined (
 	if (error != CS_OK) {
 		return (error);
 	}
+
+	if (msg_len > IPC_REQUEST_SIZE) {
+		error = CS_ERR_TOO_BIG;
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
@@ -957,6 +1036,88 @@ error_exit:
 	return (error);
 }
 
+static cs_error_t send_fragments (
+	struct cpg_inst *cpg_inst,
+	cpg_guarantee_t guarantee,
+	size_t msg_len,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i;
+	cs_error_t error = CS_OK;
+	struct iovec iov[2];
+	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+	size_t sent = 0;
+	size_t iov_sent = 0;
+	int retry_count;
+
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+	i=0;
+	iov_sent = 0 ;
+	qb_ipcc_fc_enable_max_set(cpg_inst->c,  2);
+
+	while (error == CS_OK && sent < msg_len) {
+
+		retry_count = 0;
+		if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
+			iov[1].iov_len = cpg_inst->max_msg_size;
+		}
+		else {
+			iov[1].iov_len = iovec[i].iov_len - iov_sent;
+		}
+
+		if (sent == 0) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+		}
+		else if ((sent + iov[1].iov_len) == msg_len) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+		}
+		else {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+		}
+
+		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+
+	resend:
+		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
+							 &res_lib_cpg_partial_send,
+							 sizeof (res_lib_cpg_partial_send));
+
+		if (error == CS_ERR_TRY_AGAIN) {
+			fprintf(stderr, "sleep. counter=%d\n", retry_count);
+			if (++retry_count > MAX_RETRIES) {
+				goto error_exit;
+			}
+			usleep(10000);
+			goto resend;
+		}
+
+		iov_sent += iov[1].iov_len;
+		sent += iov[1].iov_len;
+
+		/* Next iovec */
+		if (iov_sent >= iovec[i].iov_len) {
+			i++;
+			iov_sent = 0;
+		}
+		error = res_lib_cpg_partial_send.header.error;
+	}
+error_exit:
+	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
+
+	return error;
+}
+
+
 cs_error_t cpg_mcast_joined (
 	cpg_handle_t handle,
 	cpg_guarantee_t guarantee,
@@ -979,6 +1140,11 @@ cs_error_t cpg_mcast_joined (
 		msg_len += iovec[i].iov_len;
 	}
 
+	if (msg_len > cpg_inst->max_msg_size) {
+		error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
 
@@ -994,6 +1160,7 @@ cs_error_t cpg_mcast_joined (
 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
 	qb_ipcc_fc_enable_max_set(cpg_inst->c,  1);
 
+error_exit:
 	hdb_handle_put (&cpg_handle_t_db, handle);
 
 	return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES	= Makefile.in
 
 EXTRA_DIST		= ploadstart.sh
 
-noinst_PROGRAMS		= cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS		= cpgverify testcpg testcpg2 cpgbench cpghum \
 			  testquorum testvotequorum1 testvotequorum2	\
 			  stress_cpgfdget stress_cpgcontext cpgbound testsam \
 			  testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD		= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgzc_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgfdget_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgcontext_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD            = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
 testquorum_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
 testvotequorum1_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
 testvotequorum2_LDADD	= $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
-- 
1.7.1