From 8cc8e513633a1a8b12c416e32fb5362fcf4d65dd Mon Sep 17 00:00:00 2001
From: Christine Caulfield
Date: Thu, 5 Mar 2015 16:45:15 +0000
Subject: [PATCH] cpg: Add support for messages larger than 1MB

If a cpg client sends a message larger than 1MB (actually slightly
less, to allow for internal buffers), cpg will now fragment it into
several corosync messages before sending it around the ring.

cpg_mcast_joined() can now return CS_ERR_INTERRUPT, which means that
the cpg membership was disrupted during the send operation and the
message needs to be resent.

The new API call cpg_max_atomic_msgsize_get() returns the maximum
size of a message that will not be fragmented internally.

A new test program, cpghum, was written to stress-test this
functionality; it checks message integrity and order of receipt.

Signed-off-by: Christine Caulfield
Reviewed-by: Jan Friesse
---
(An illustrative usage sketch of the new interface follows the patch.)

 configure.ac               |   1 +
 corosync.spec.in           |   1 +
 exec/cpg.c                 | 182 +++++++++++++++++++++++++++++++++++++++++++-
 include/corosync/cpg.h     |   7 ++
 include/corosync/ipc_cpg.h |  35 ++++++++-
 lib/cpg.c                  | 171 ++++++++++++++++++++++++++++++++++++++++-
 test/Makefile.am           |   3 +-
 7 files changed, 393 insertions(+), 7 deletions(-)

diff --git a/configure.ac b/configure.ac
index 0c371aa..b394329 100644
--- a/configure.ac
+++ b/configure.ac
@@ -163,6 +163,7 @@ AC_CHECK_LIB([pthread], [pthread_create])
 AC_CHECK_LIB([socket], [socket])
 AC_CHECK_LIB([nsl], [t_open])
 AC_CHECK_LIB([rt], [sched_getscheduler])
+AC_CHECK_LIB([z], [crc32])
 
 # Checks for library functions.
 AC_FUNC_ALLOCA
diff --git a/corosync.spec.in b/corosync.spec.in
index 3ca75b7..a2ba584 100644
--- a/corosync.spec.in
+++ b/corosync.spec.in
@@ -40,6 +40,7 @@ Conflicts: openais <= 0.89, openais-devel <= 0.89
 BuildRequires: groff
 BuildRequires: libqb-devel
 BuildRequires: nss-devel
+BuildRequires: zlib-devel
 %if %{with runautogen}
 BuildRequires: autoconf automake libtool
 %endif
diff --git a/exec/cpg.c b/exec/cpg.c
index 1c6fbb9..a18b850 100644
--- a/exec/cpg.c
+++ b/exec/cpg.c
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -83,7 +83,8 @@ enum cpg_message_req_types {
 	MESSAGE_REQ_EXEC_CPG_JOINLIST = 2,
 	MESSAGE_REQ_EXEC_CPG_MCAST = 3,
 	MESSAGE_REQ_EXEC_CPG_DOWNLIST_OLD = 4,
-	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5
+	MESSAGE_REQ_EXEC_CPG_DOWNLIST = 5,
+	MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST = 6,
 };
 
 struct zcb_mapped {
@@ -156,6 +157,8 @@ struct cpg_pd {
 	enum cpd_state cpd_state;
 	unsigned int flags;
 	int initial_totem_conf_sent;
+	uint64_t transition_counter; /* These two are used when sending fragmented messages */
+	uint64_t initial_transition_counter;
 	struct list_head list;
 	struct list_head iteration_instance_list_head;
 	struct list_head zcb_mapped_list_head;
@@ -224,6 +227,10 @@ static void message_handler_req_exec_cpg_mcast (
 	const void *message,
 	unsigned int nodeid);
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid);
+
 static void message_handler_req_exec_cpg_downlist_old (
 	const void *message,
 	unsigned int nodeid);
@@ -238,6 +245,8 @@ static void exec_cpg_joinlist_endian_convert (void *msg);
 
 static void exec_cpg_mcast_endian_convert (void *msg);
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg);
+
 static void exec_cpg_downlist_endian_convert_old (void *msg);
 
 static void exec_cpg_downlist_endian_convert (void *msg);
@@ -250,6 +259,8 @@ static void message_handler_req_lib_cpg_finalize (void *conn, const void *messag
 
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message);
 
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message);
+
 static void message_handler_req_lib_cpg_membership (void *conn,
 	const void *message);
 
@@ -383,7 +394,10 @@ static struct corosync_lib_handler cpg_lib_engine[] =
 		.lib_handler_fn = message_handler_req_lib_cpg_zc_execute,
 		.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
 	},
-
+	{ /* 12 */
+		.lib_handler_fn = message_handler_req_lib_cpg_partial_mcast,
+		.flow_control = CS_LIB_FLOW_CONTROL_REQUIRED
+	},
 };
 
 
@@ -413,6 +427,10 @@ static struct corosync_exec_handler cpg_exec_engine[] =
 		.exec_handler_fn = message_handler_req_exec_cpg_downlist,
 		.exec_endian_convert_fn = exec_cpg_downlist_endian_convert
 	},
+	{ /* 6 - MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST */
+		.exec_handler_fn = message_handler_req_exec_cpg_partial_mcast,
+		.exec_endian_convert_fn = exec_cpg_partial_mcast_endian_convert
+	},
 };
 
 struct corosync_service_engine cpg_service_engine = {
@@ -457,6 +475,17 @@ struct req_exec_cpg_mcast {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_exec_cpg_partial_mcast {
+	struct qb_ipc_request_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_message_source_t source __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct req_exec_cpg_downlist_old {
 	struct qb_ipc_request_header header __attribute__((aligned(8)));
 	mar_uint32_t left_nodes __attribute__((aligned(8)));
@@ -740,6 +769,7 @@ static int notify_lib_joinlist(
 				cpd->cpd_state == CPD_STATE_LEAVE_STARTED) {
 
 				api->ipc_dispatch_send (cpd->conn, buf, size);
+				cpd->transition_counter++;
 			}
 			if (left_list_entries) {
 				if (left_list[0].pid == cpd->pid &&
@@ -1186,6 +1216,19 @@ static void exec_cpg_mcast_endian_convert (void *msg)
 	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
 }
 
+static void exec_cpg_partial_mcast_endian_convert (void *msg)
+{
+	struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = msg;
+
+	swab_coroipc_request_header_t (&req_exec_cpg_mcast->header);
+	swab_mar_cpg_name_t (&req_exec_cpg_mcast->group_name);
+	req_exec_cpg_mcast->pid = swab32(req_exec_cpg_mcast->pid);
+	req_exec_cpg_mcast->msglen = swab32(req_exec_cpg_mcast->msglen);
+	req_exec_cpg_mcast->fraglen = swab32(req_exec_cpg_mcast->fraglen);
+	req_exec_cpg_mcast->type = swab32(req_exec_cpg_mcast->type);
+	swab_mar_message_source_t (&req_exec_cpg_mcast->source);
+}
+
 static struct process_info *process_info_find(const mar_cpg_name_t *group_name, uint32_t pid, unsigned int nodeid)
 {
 	struct list_head *iter;
@@ -1453,6 +1496,68 @@ static void message_handler_req_exec_cpg_mcast (
 	}
 }
 
+static void message_handler_req_exec_cpg_partial_mcast (
+	const void *message,
+	unsigned int nodeid)
+{
+	const struct req_exec_cpg_partial_mcast *req_exec_cpg_mcast = message;
+	struct res_lib_cpg_partial_deliver_callback res_lib_cpg_mcast;
+	int msglen = req_exec_cpg_mcast->fraglen;
+	struct list_head *iter, *pi_iter;
+	struct cpg_pd *cpd;
+	struct iovec iovec[2];
+	int known_node = 0;
+
+	log_printf(LOGSYS_LEVEL_DEBUG, "Got fragmented message from node %d, size = %d bytes\n", nodeid, msglen);
+
+	res_lib_cpg_mcast.header.id = MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK;
+	res_lib_cpg_mcast.header.size = sizeof(res_lib_cpg_mcast) + msglen;
+	res_lib_cpg_mcast.fraglen = msglen;
+	res_lib_cpg_mcast.msglen = req_exec_cpg_mcast->msglen;
+	res_lib_cpg_mcast.pid = req_exec_cpg_mcast->pid;
+	res_lib_cpg_mcast.type = req_exec_cpg_mcast->type;
+	res_lib_cpg_mcast.nodeid = nodeid;
+
+	memcpy(&res_lib_cpg_mcast.group_name, &req_exec_cpg_mcast->group_name,
+		sizeof(mar_cpg_name_t));
+	iovec[0].iov_base = (void *)&res_lib_cpg_mcast;
+	iovec[0].iov_len = sizeof (res_lib_cpg_mcast);
+
+	iovec[1].iov_base = (char*)message+sizeof(*req_exec_cpg_mcast);
+	iovec[1].iov_len = msglen;
+
+	for (iter = cpg_pd_list_head.next; iter != &cpg_pd_list_head; ) {
+		cpd = list_entry(iter, struct cpg_pd, list);
+		iter = iter->next;
+
+		if ((cpd->cpd_state == CPD_STATE_LEAVE_STARTED || cpd->cpd_state == CPD_STATE_JOIN_COMPLETED)
+			&& (mar_name_compare (&cpd->group_name, &req_exec_cpg_mcast->group_name) == 0)) {
+
+			if (!known_node) {
+				/* Try to find, if we know the node */
+				for (pi_iter = process_info_list_head.next;
+					pi_iter != &process_info_list_head; pi_iter = pi_iter->next) {
+
+					struct process_info *pi = list_entry (pi_iter, struct process_info, list);
+
+					if (pi->nodeid == nodeid &&
+						mar_name_compare (&pi->group, &req_exec_cpg_mcast->group_name) == 0) {
+						known_node = 1;
+						break;
+					}
+				}
+			}
+
+			if (!known_node) {
+				log_printf(LOGSYS_LEVEL_WARNING, "Unknown node -> we will not deliver message");
+				return ;
+			}
+
+			api->ipc_dispatch_iov_send (cpd->conn, iovec, 2);
+		}
+	}
+}
+
 static int cpg_exec_send_downlist(void)
 {
@@ -1864,6 +1969,77 @@ static void message_handler_req_lib_cpg_zc_free (
 		res_header.size);
 }
 
+/* Fragmented mcast message from the library */
+static void message_handler_req_lib_cpg_partial_mcast (void *conn, const void *message)
+{
+	const struct req_lib_cpg_partial_mcast *req_lib_cpg_mcast = message;
+	struct cpg_pd *cpd = (struct cpg_pd *)api->ipc_private_data_get (conn);
+	mar_cpg_name_t group_name = cpd->group_name;
+
+	struct iovec req_exec_cpg_iovec[2];
+	struct req_exec_cpg_partial_mcast req_exec_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+	int msglen = req_lib_cpg_mcast->fraglen;
+	int result;
+	cs_error_t error = CS_ERR_NOT_EXIST;
+
+	log_printf(LOGSYS_LEVEL_TRACE, "got fragmented mcast request on %p", conn);
+	log_printf(LOGSYS_LEVEL_DEBUG, "Sending fragmented message size = %d bytes\n", msglen);
+
+	switch (cpd->cpd_state) {
+	case CPD_STATE_UNJOINED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_LEAVE_STARTED:
+		error = CS_ERR_NOT_EXIST;
+		break;
+	case CPD_STATE_JOIN_STARTED:
+		error = CS_OK;
+		break;
+	case CPD_STATE_JOIN_COMPLETED:
+		error = CS_OK;
+		break;
+	}
+
+	res_lib_cpg_partial_send.header.size = sizeof(res_lib_cpg_partial_send);
+	res_lib_cpg_partial_send.header.id = MESSAGE_RES_CPG_PARTIAL_SEND;
+
+	if (req_lib_cpg_mcast->type == LIBCPG_PARTIAL_FIRST) {
+		cpd->initial_transition_counter = cpd->transition_counter;
+	}
+	if (cpd->transition_counter != cpd->initial_transition_counter) {
+		error = CS_ERR_INTERRUPT;
+	}
+
+	if (error == CS_OK) {
+		req_exec_cpg_mcast.header.size = sizeof(req_exec_cpg_mcast) + msglen;
+		req_exec_cpg_mcast.header.id = SERVICE_ID_MAKE(CPG_SERVICE,
+			MESSAGE_REQ_EXEC_CPG_PARTIAL_MCAST);
+		req_exec_cpg_mcast.pid = cpd->pid;
+		req_exec_cpg_mcast.msglen = req_lib_cpg_mcast->msglen;
+		req_exec_cpg_mcast.type = req_lib_cpg_mcast->type;
+		req_exec_cpg_mcast.fraglen = req_lib_cpg_mcast->fraglen;
+		api->ipc_source_set (&req_exec_cpg_mcast.source, conn);
+		memcpy(&req_exec_cpg_mcast.group_name, &group_name,
+			sizeof(mar_cpg_name_t));
+
+		req_exec_cpg_iovec[0].iov_base = (char *)&req_exec_cpg_mcast;
+		req_exec_cpg_iovec[0].iov_len = sizeof(req_exec_cpg_mcast);
+		req_exec_cpg_iovec[1].iov_base = (char *)&req_lib_cpg_mcast->message;
+		req_exec_cpg_iovec[1].iov_len = msglen;
+
+		result = api->totem_mcast (req_exec_cpg_iovec, 2, TOTEM_AGREED);
+		assert(result == 0);
+	} else {
+		log_printf(LOGSYS_LEVEL_ERROR, "*** %p can't mcast to group %s state:%d, error:%d",
+			conn, group_name.value, cpd->cpd_state, error);
+	}
+
+	res_lib_cpg_partial_send.header.error = error;
+	api->ipc_response_send (conn, &res_lib_cpg_partial_send,
+		sizeof (res_lib_cpg_partial_send));
+}
+
 /* Mcast message from the library */
 static void message_handler_req_lib_cpg_mcast (void *conn, const void *message)
 {
diff --git a/include/corosync/cpg.h b/include/corosync/cpg.h
index 55fc4b8..f66fb14 100644
--- a/include/corosync/cpg.h
+++ b/include/corosync/cpg.h
@@ -186,6 +186,13 @@ cs_error_t cpg_fd_get (
 	int *fd);
 
 /**
+ * Get maximum size of a message that will not be fragmented
+ */
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size);
+
+/**
  * Get contexts for a CPG handle
  */
 cs_error_t cpg_context_get (
diff --git a/include/corosync/ipc_cpg.h b/include/corosync/ipc_cpg.h
index a95335a..5008acf 100644
--- a/include/corosync/ipc_cpg.h
+++ b/include/corosync/ipc_cpg.h
@@ -1,5 +1,5 @@
 /*
- * Copyright (c) 2006-2011 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -55,6 +55,7 @@ enum req_cpg_types {
 	MESSAGE_REQ_CPG_ZC_ALLOC = 9,
 	MESSAGE_REQ_CPG_ZC_FREE = 10,
 	MESSAGE_REQ_CPG_ZC_EXECUTE = 11,
+	MESSAGE_REQ_CPG_PARTIAL_MCAST = 12,
 };
 
 enum res_cpg_types {
@@ -75,6 +76,8 @@ enum res_cpg_types {
 	MESSAGE_RES_CPG_ZC_ALLOC = 14,
 	MESSAGE_RES_CPG_ZC_FREE = 15,
 	MESSAGE_RES_CPG_ZC_EXECUTE = 16,
+	MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK = 17,
+	MESSAGE_RES_CPG_PARTIAL_SEND = 18,
 };
 
 enum lib_cpg_confchg_reason {
@@ -85,6 +88,12 @@ enum lib_cpg_confchg_reason {
 	CONFCHG_CPG_REASON_PROCDOWN = 5
 };
 
+enum lib_cpg_partial_types {
+	LIBCPG_PARTIAL_FIRST = 1,
+	LIBCPG_PARTIAL_CONTINUED = 2,
+	LIBCPG_PARTIAL_LAST = 3,
+};
+
 typedef struct {
 	uint32_t length __attribute__((aligned(8)));
 	char value[CPG_MAX_NAME_LENGTH] __attribute__((aligned(8)));
@@ -200,6 +209,10 @@ struct res_lib_cpg_local_get {
 	mar_uint32_t local_nodeid __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_send {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+};
+
 struct req_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t guarantee __attribute__((aligned(8)));
@@ -207,6 +220,15 @@ struct req_lib_cpg_mcast {
 	mar_uint32_t msglen __attribute__((aligned(8)));
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct req_lib_cpg_partial_mcast {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_uint32_t guarantee __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_mcast {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 };
@@ -223,6 +245,17 @@ struct res_lib_cpg_deliver_callback {
 	mar_uint8_t message[] __attribute__((aligned(8)));
 };
 
+struct res_lib_cpg_partial_deliver_callback {
+	struct qb_ipc_response_header header __attribute__((aligned(8)));
+	mar_cpg_name_t group_name __attribute__((aligned(8)));
+	mar_uint32_t msglen __attribute__((aligned(8)));
+	mar_uint32_t fraglen __attribute__((aligned(8)));
+	mar_uint32_t nodeid __attribute__((aligned(8)));
+	mar_uint32_t pid __attribute__((aligned(8)));
+	mar_uint32_t type __attribute__((aligned(8)));
+	mar_uint8_t message[] __attribute__((aligned(8)));
+};
+
 struct res_lib_cpg_flowcontrol_callback {
 	struct qb_ipc_response_header header __attribute__((aligned(8)));
 	mar_uint32_t flow_control_state __attribute__((aligned(8)));
diff --git a/lib/cpg.c b/lib/cpg.c
index 4b92f44..037e8a9 100644
--- a/lib/cpg.c
+++ b/lib/cpg.c
@@ -1,7 +1,7 @@
 /*
  * vi: set autoindent tabstop=4 shiftwidth=4 :
  *
- * Copyright (c) 2006-2012 Red Hat, Inc.
+ * Copyright (c) 2006-2015 Red Hat, Inc.
  *
  * All rights reserved.
  *
@@ -70,6 +70,12 @@
 #endif
 
 /*
+ * Maximum number of times to retry a send when transmitting
+ * a large message fragment
+ */
+#define MAX_RETRIES 100
+
+/*
  * ZCB files have following umask (umask is same as used in libqb)
  */
 #define CPG_MEMORY_MAP_UMASK 077
@@ -83,6 +89,14 @@ struct cpg_inst {
 		cpg_model_v1_data_t model_v1_data;
 	};
 	struct list_head iteration_list_head;
+	uint32_t max_msg_size;
+	char *assembly_buf;
+	uint32_t assembly_buf_ptr;
+	int assembling; /* Flag that says we have started assembling a message.
+			 * It's here to catch the situation where a node joins
+			 * the cluster/group in the middle of a CPG message send
+			 * so we don't pass on a partial message to the client.
+			 */
 };
 
 static void cpg_inst_free (void *inst);
@@ -210,6 +224,8 @@ cs_error_t cpg_model_initialize (
 		}
 	}
 
+	/* Allow space for corosync internal headers */
+	cpg_inst->max_msg_size = IPC_REQUEST_SIZE - 1024;
 	cpg_inst->model_data.model = model;
 	cpg_inst->context = context;
 
@@ -291,6 +307,25 @@ cs_error_t cpg_fd_get (
 	return (error);
 }
 
+cs_error_t cpg_max_atomic_msgsize_get (
+	cpg_handle_t handle,
+	uint32_t *size)
+{
+	cs_error_t error;
+	struct cpg_inst *cpg_inst;
+
+	error = hdb_error_to_cs (hdb_handle_get (&cpg_handle_t_db, handle, (void *)&cpg_inst));
+	if (error != CS_OK) {
+		return (error);
+	}
+
+	*size = cpg_inst->max_msg_size;
+
+	hdb_handle_put (&cpg_handle_t_db, handle);
+
+	return (error);
+}
+
 cs_error_t cpg_context_get (
 	cpg_handle_t handle,
 	void **context)
@@ -339,6 +374,7 @@ cs_error_t cpg_dispatch (
 	struct cpg_inst *cpg_inst;
 	struct res_lib_cpg_confchg_callback *res_cpg_confchg_callback;
 	struct res_lib_cpg_deliver_callback *res_cpg_deliver_callback;
+	struct res_lib_cpg_partial_deliver_callback *res_cpg_partial_deliver_callback;
 	struct res_lib_cpg_totem_confchg_callback *res_cpg_totem_confchg_callback;
 	struct cpg_inst cpg_inst_copy;
 	struct qb_ipc_response_header *dispatch_data;
@@ -361,7 +397,7 @@ cs_error_t cpg_dispatch (
 
 	/*
 	 * Timeout instantly for CS_DISPATCH_ONE_NONBLOCKING or CS_DISPATCH_ALL and
-	 * wait indefinately for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
+	 * wait indefinitely for CS_DISPATCH_ONE or CS_DISPATCH_BLOCKING
	 */
 	if (dispatch_types == CS_DISPATCH_ALL || dispatch_types == CS_DISPATCH_ONE_NONBLOCKING) {
 		timeout = 0;
@@ -428,6 +464,43 @@ cs_error_t cpg_dispatch (
 				res_cpg_deliver_callback->msglen);
 			break;
 
+		case MESSAGE_RES_CPG_PARTIAL_DELIVER_CALLBACK:
+			res_cpg_partial_deliver_callback = (struct res_lib_cpg_partial_deliver_callback *)dispatch_data;
+
+			marshall_from_mar_cpg_name_t (
+				&group_name,
+				&res_cpg_partial_deliver_callback->group_name);
+
+			if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_FIRST) {
+				/*
+				 * Allocate a buffer to contain a full message.
+				 */
+				cpg_inst->assembly_buf = malloc(res_cpg_partial_deliver_callback->msglen);
+				if (!cpg_inst->assembly_buf) {
+					error = CS_ERR_NO_MEMORY;
+					goto error_put;
+				}
+				cpg_inst->assembling = 1;
+				cpg_inst->assembly_buf_ptr = 0;
+			}
+			if (cpg_inst->assembling) {
+				memcpy(cpg_inst->assembly_buf + cpg_inst->assembly_buf_ptr,
+					res_cpg_partial_deliver_callback->message, res_cpg_partial_deliver_callback->fraglen);
+				cpg_inst->assembly_buf_ptr += res_cpg_partial_deliver_callback->fraglen;
+
+				if (res_cpg_partial_deliver_callback->type == LIBCPG_PARTIAL_LAST) {
+					cpg_inst_copy.model_v1_data.cpg_deliver_fn (handle,
+						&group_name,
+						res_cpg_partial_deliver_callback->nodeid,
+						res_cpg_partial_deliver_callback->pid,
+						cpg_inst->assembly_buf,
+						res_cpg_partial_deliver_callback->msglen);
+					free(cpg_inst->assembly_buf);
+					cpg_inst->assembling = 0;
+				}
+			}
+			break;
+
 		case MESSAGE_RES_CPG_CONFCHG_CALLBACK:
 			if (cpg_inst_copy.model_v1_data.cpg_confchg_fn == NULL) {
 				break;
@@ -921,6 +994,12 @@ cs_error_t cpg_zcb_mcast_joined (
 	if (error != CS_OK) {
 		return (error);
 	}
+
+	if (msg_len > IPC_REQUEST_SIZE) {
+		error = CS_ERR_TOO_BIG;
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast = (struct req_lib_cpg_mcast *)(((char *)msg) - sizeof (struct req_lib_cpg_mcast));
 	req_lib_cpg_mcast->header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
@@ -957,6 +1036,88 @@ error_exit:
 	return (error);
 }
 
+static cs_error_t send_fragments (
+	struct cpg_inst *cpg_inst,
+	cpg_guarantee_t guarantee,
+	size_t msg_len,
+	const struct iovec *iovec,
+	unsigned int iov_len)
+{
+	int i;
+	cs_error_t error = CS_OK;
+	struct iovec iov[2];
+	struct req_lib_cpg_partial_mcast req_lib_cpg_mcast;
+	struct res_lib_cpg_partial_send res_lib_cpg_partial_send;
+	size_t sent = 0;
+	size_t iov_sent = 0;
+	int retry_count;
+
+	req_lib_cpg_mcast.header.id = MESSAGE_REQ_CPG_PARTIAL_MCAST;
+	req_lib_cpg_mcast.guarantee = guarantee;
+	req_lib_cpg_mcast.msglen = msg_len;
+
+	iov[0].iov_base = (void *)&req_lib_cpg_mcast;
+	iov[0].iov_len = sizeof (struct req_lib_cpg_partial_mcast);
+
+	i=0;
+	iov_sent = 0 ;
+	qb_ipcc_fc_enable_max_set(cpg_inst->c, 2);
+
+	while (error == CS_OK && sent < msg_len) {
+
+		retry_count = 0;
+		if ( (iovec[i].iov_len - iov_sent) > cpg_inst->max_msg_size) {
+			iov[1].iov_len = cpg_inst->max_msg_size;
+		}
+		else {
+			iov[1].iov_len = iovec[i].iov_len - iov_sent;
+		}
+
+		if (sent == 0) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_FIRST;
+		}
+		else if ((sent + iov[1].iov_len) == msg_len) {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_LAST;
+		}
+		else {
+			req_lib_cpg_mcast.type = LIBCPG_PARTIAL_CONTINUED;
+		}
+
+		req_lib_cpg_mcast.fraglen = iov[1].iov_len;
+		req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_partial_mcast) + iov[1].iov_len;
+		iov[1].iov_base = (char *)iovec[i].iov_base + iov_sent;
+
+resend:
+		error = coroipcc_msg_send_reply_receive (cpg_inst->c, iov, 2,
+			&res_lib_cpg_partial_send,
+			sizeof (res_lib_cpg_partial_send));
+
+		if (error == CS_ERR_TRY_AGAIN) {
+			fprintf(stderr, "sleep. counter=%d\n", retry_count);
+			if (++retry_count > MAX_RETRIES) {
+				goto error_exit;
+			}
+			usleep(10000);
+			goto resend;
+		}
+
+		iov_sent += iov[1].iov_len;
+		sent += iov[1].iov_len;
+
+		/* Next iovec */
+		if (iov_sent >= iovec[i].iov_len) {
+			i++;
+			iov_sent = 0;
+		}
+		error = res_lib_cpg_partial_send.header.error;
+	}
+error_exit:
+	qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
+
+	return error;
+}
+
+
 cs_error_t cpg_mcast_joined (
 	cpg_handle_t handle,
 	cpg_guarantee_t guarantee,
@@ -979,6 +1140,11 @@ cs_error_t cpg_mcast_joined (
 		msg_len += iovec[i].iov_len;
 	}
 
+	if (msg_len > cpg_inst->max_msg_size) {
+		error = send_fragments(cpg_inst, guarantee, msg_len, iovec, iov_len);
+		goto error_exit;
+	}
+
 	req_lib_cpg_mcast.header.size = sizeof (struct req_lib_cpg_mcast) +
 		msg_len;
@@ -994,6 +1160,7 @@ cs_error_t cpg_mcast_joined (
 	error = qb_to_cs_error(qb_ipcc_sendv(cpg_inst->c, iov, iov_len + 1));
 	qb_ipcc_fc_enable_max_set(cpg_inst->c, 1);
 
+error_exit:
 	hdb_handle_put (&cpg_handle_t_db, handle);
 
 	return (error);
diff --git a/test/Makefile.am b/test/Makefile.am
index c19e506..bb11518 100644
--- a/test/Makefile.am
+++ b/test/Makefile.am
@@ -34,7 +34,7 @@ MAINTAINERCLEANFILES = Makefile.in
 
 EXTRA_DIST = ploadstart.sh
 
-noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench \
+noinst_PROGRAMS = cpgverify testcpg testcpg2 cpgbench cpghum \
 	testquorum testvotequorum1 testvotequorum2 \
 	stress_cpgfdget stress_cpgcontext cpgbound testsam \
 	testcpgzc cpgbenchzc testzcgc stress_cpgzc
@@ -48,6 +48,7 @@ testzcgc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgzc_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgfdget_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
 stress_cpgcontext_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la
+cpghum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libcpg.la -lz
 testquorum_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libquorum.la
 testvotequorum1_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
 testvotequorum2_LDADD = $(LIBQB_LIBS) $(top_builddir)/lib/libvotequorum.la
-- 
1.7.1
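
Illustrative usage (a minimal sketch, not part of the patch): one way a
client might drive the new interface. The send_large_message() helper
name and the retry policy are assumptions for illustration; only
cpg_max_atomic_msgsize_get(), cpg_mcast_joined() and the CS_ERR_*
codes come from the patch above.

#include <stdio.h>
#include <unistd.h>
#include <sys/uio.h>
#include <corosync/corotypes.h>
#include <corosync/cpg.h>

/* Send a buffer of arbitrary size to a joined group.  libcpg
 * fragments anything above the atomic limit; CS_ERR_INTERRUPT means
 * the membership changed mid-send and the whole message must be
 * resent from the start. */
static cs_error_t send_large_message (cpg_handle_t handle,
	const void *buf, size_t len)
{
	struct iovec iov;
	uint32_t max_atomic;
	cs_error_t err;

	/* Informational only: above this size the message is fragmented */
	err = cpg_max_atomic_msgsize_get (handle, &max_atomic);
	if (err != CS_OK) {
		return err;
	}
	if (len > max_atomic) {
		printf ("%zu bytes: will be sent as fragments\n", len);
	}

	iov.iov_base = (void *)buf;
	iov.iov_len = len;

	do {
		err = cpg_mcast_joined (handle, CPG_TYPE_AGREED, &iov, 1);
		if (err == CS_ERR_TRY_AGAIN) {
			usleep (10000);	/* flow control: back off, then retry */
		}
		/* On CS_ERR_INTERRUPT simply resend; a real client may
		 * want to cap the number of attempts. */
	} while (err == CS_ERR_TRY_AGAIN || err == CS_ERR_INTERRUPT);

	return err;
}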