commit 330600b4cafd37f51d109728f9377bc921a6aa7f Author: Andrew Beekhof Date: Mon Oct 14 13:50:41 2013 +1100 Feature: remote: Properly version the remote connection protocol (cherry picked from commit aa024490a023bd0d0d5568f08b2c60b61adb15f5) diff --git a/include/crm/common/ipcs.h b/include/crm/common/ipcs.h index 3e78d51..84249a3 100644 --- a/include/crm/common/ipcs.h +++ b/include/crm/common/ipcs.h @@ -41,14 +41,17 @@ enum client_type { struct crm_remote_s { /* Shared */ char *buffer; + size_t buffer_size; + size_t buffer_offset; int auth_timeout; - bool authenticated; /* CIB-only */ + int tcp_socket; mainloop_io_t *source; - char *token; /* CIB Only */ - - int tcp_socket; + /* CIB-only */ + bool authenticated; + char *token; + /* TLS only */ # ifdef HAVE_GNUTLS_GNUTLS_H gnutls_session_t *tls_session; bool tls_handshake_complete; diff --git a/lib/common/remote.c b/lib/common/remote.c index 8b00f16..297c75a 100644 --- a/lib/common/remote.c +++ b/lib/common/remote.c @@ -34,6 +34,8 @@ #include #include +#include + #include #include #include @@ -57,6 +59,17 @@ const int anon_tls_kx_order[] = { 0 }; +struct crm_remote_header_v0 +{ + uint64_t id; + uint64_t flags; + uint32_t error; + uint32_t version; + uint32_t size_total; + uint32_t payload_uncompressed; + uint32_t payload_compressed; +}; + int crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms) { @@ -176,97 +189,6 @@ crm_send_tls(gnutls_session_t * session, const char *buf, size_t len) return rc < 0 ? rc : total_send; } - -/*! - * \internal - * \brief Read bytes off non blocking tls session. - * - * \param session - tls session to read - * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit - * - * \note only use with NON-Blocking sockets. Should only be used after polling socket. - * This function will return once max_size is met, the socket read buffer - * is empty, or an error is encountered. - * - * \retval '\0' terminated buffer on success - */ -static char * -crm_recv_tls(gnutls_session_t * session, size_t max_size, size_t * recv_len, int *disconnected) -{ - char *buf = NULL; - int rc = 0; - size_t len = 0; - size_t chunk_size = max_size ? max_size : 1024; - size_t buf_size = 0; - size_t read_size = 0; - - if (session == NULL) { - if (disconnected) { - *disconnected = 1; - } - goto done; - } - - buf = calloc(1, chunk_size + 1); - buf_size = chunk_size; - - while (TRUE) { - read_size = buf_size - len; - - /* automatically grow the buffer when needed if max_size is not set. */ - if (!max_size && (read_size < (chunk_size / 2))) { - buf_size += chunk_size; - crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, - buf_size); - buf = realloc(buf, buf_size + 1); - CRM_ASSERT(buf != NULL); - - read_size = buf_size - len; - } - - rc = gnutls_record_recv(*session, buf + len, read_size); - - if (rc > 0) { - crm_trace("Got %d more bytes.", rc); - len += rc; - /* always null terminate buffer, the +1 to alloc always allows for this. */ - buf[len] = '\0'; - } - if (max_size && (max_size == read_size)) { - crm_trace("Buffer max read size %d met", max_size); - goto done; - } - - /* process any errors. */ - if (rc == GNUTLS_E_INTERRUPTED) { - crm_trace("EINTR encoutered, retry tls read"); - } else if (rc == GNUTLS_E_AGAIN) { - crm_trace("non-blocking, exiting read on rc = %d", rc); - goto done; - } else if (rc <= 0) { - if (rc == 0) { - crm_debug("EOF encoutered during TLS read"); - } else { - crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc); - } - if (disconnected) { - *disconnected = 1; - } - goto done; - } - } - - done: - if (recv_len) { - *recv_len = len; - } - if (!len) { - free(buf); - buf = NULL; - } - return buf; - -} #endif static int @@ -310,141 +232,57 @@ crm_send_plaintext(int sock, const char *buf, size_t len) } -/*! - * \internal - * \brief Read bytes off non blocking socket. - * - * \param session - tls session to read - * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit - * - * \note only use with NON-Blocking sockets. Should only be used after polling socket. - * This function will return once max_size is met, the socket read buffer - * is empty, or an error is encountered. - * - * \retval '\0' terminated buffer on success - */ -static char * -crm_recv_plaintext(int sock, size_t max_size, size_t * recv_len, int *disconnected) -{ - char *buf = NULL; - ssize_t rc = 0; - ssize_t len = 0; - ssize_t chunk_size = max_size ? max_size : 1024; - size_t buf_size = 0; - size_t read_size = 0; - - if (sock <= 0) { - if (disconnected) { - *disconnected = 1; - } - goto done; - } - - buf = calloc(1, chunk_size + 1); - buf_size = chunk_size; - - while (TRUE) { - errno = 0; - read_size = buf_size - len; - - /* automatically grow the buffer when needed if max_size is not set. */ - if (!max_size && (read_size < (chunk_size / 2))) { - buf_size += chunk_size; - crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size, - buf_size); - buf = realloc(buf, buf_size + 1); - CRM_ASSERT(buf != NULL); - - read_size = buf_size - len; - } - - rc = read(sock, buf + len, chunk_size); - - if (rc > 0) { - crm_trace("Got %d more bytes. errno=%d", (int)rc, errno); - len += rc; - /* always null terminate buffer, the +1 to alloc always allows for this. */ - buf[len] = '\0'; - } - if (max_size && (max_size == read_size)) { - crm_trace("Buffer max read size %d met", max_size); - goto done; - } - - if (rc > 0) { - continue; - } else if (rc == 0) { - if (disconnected) { - *disconnected = 1; - } - crm_trace("EOF encoutered during read"); - goto done; - } - - /* process errors */ - if (errno == EINTR) { - crm_trace("EINTER encoutered, retry socket read."); - } else if (errno == EAGAIN) { - crm_trace("non-blocking, exiting read on rc = %d", rc); - goto done; - } else if (errno <= 0) { - if (disconnected) { - *disconnected = 1; - } - crm_debug("Error receiving message: %d", (int)rc); - goto done; - } - } - - done: - if (recv_len) { - *recv_len = len; - } - if (!len) { - free(buf); - buf = NULL; - } - return buf; -} - static int -crm_remote_send_raw(crm_remote_t * remote, const char *buf, size_t len) +crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs) { + int lpc = 0; int rc = -ESOCKTNOSUPPORT; - if (remote->tcp_socket) { - rc = crm_send_plaintext(remote->tcp_socket, buf, len); + for(; lpc < iovs; lpc++) { + if (remote->tcp_socket) { + rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len); #ifdef HAVE_GNUTLS_GNUTLS_H - } else if (remote->tls_session) { - rc = crm_send_tls(remote->tls_session, buf, len); + } else if (remote->tls_session) { + rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len); #endif - } else { - crm_err("Unsupported connection type"); + } else { + crm_err("Unsupported connection type"); + } } return rc; } +#define PCMK_TLS_VERSION 1 + int crm_remote_send(crm_remote_t * remote, xmlNode * msg) { + static uint64_t id = 0; int rc = -1; - char *xml_text = NULL; - int len = 0; + char *xml_text = dump_xml_unformatted(msg); - xml_text = dump_xml_unformatted(msg); - if (xml_text) { - len = strlen(xml_text); - } else { + struct iovec iov[2]; + struct crm_remote_header_v0 *header = calloc(1, sizeof(struct crm_remote_header_v0)); + + if (xml_text == NULL) { crm_err("Invalid XML, can not send msg"); return -1; } - rc = crm_remote_send_raw(remote, xml_text, len); - if (rc >= 0) { - rc = crm_remote_send_raw(remote, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR)); - } + iov[0].iov_base = header; + iov[0].iov_len = sizeof(struct crm_remote_header_v0); + + iov[1].iov_base = xml_text; + iov[1].iov_len = 1 + strlen(xml_text); + id++; + header->id = id; + header->version = PCMK_TLS_VERSION; + header->size_total = iov[0].iov_len + iov[1].iov_len; + header->payload_uncompressed = iov[1].iov_len; + + rc = crm_remote_sendv(remote, iov, 2); if (rc < 0) { crm_err("Failed to send remote msg, rc = %d", rc); } @@ -461,44 +299,55 @@ crm_remote_send(crm_remote_t * remote, xmlNode * msg) xmlNode * crm_remote_parse_buffer(crm_remote_t * remote) { - char *buf = NULL; - char *start = NULL; - char *end = NULL; xmlNode *xml = NULL; + size_t offset = sizeof(struct crm_remote_header_v0); + struct crm_remote_header_v0 *header = NULL; if (remote->buffer == NULL) { return NULL; + + } else if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) { + return NULL; } /* take ownership of the buffer */ - buf = remote->buffer; - remote->buffer = NULL; - - /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */ - start = buf; - end = strstr(start, REMOTE_MSG_TERMINATOR); + remote->buffer_offset = 0; + + header = (struct crm_remote_header_v0 *)remote->buffer; + /* Support compression on the receiving end now, in case we ever want to add it later */ + if (header->payload_compressed) { + int rc = 0; + unsigned int size_u = 1 + header->payload_uncompressed; + char *uncompressed = calloc(1, offset + size_u); + + crm_trace("Decompressing message data %d bytes into %d bytes", + header->payload_compressed, size_u); + + rc = BZ2_bzBuffToBuffDecompress(uncompressed + offset, &size_u, + remote->buffer + offset, + header->payload_compressed, 1, 0); + + if (rc != BZ_OK) { + crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc); + free(uncompressed); + return NULL; + } - while (!xml && end) { + CRM_ASSERT(size_u == header->payload_uncompressed); - /* grab the message */ - end[0] = '\0'; - end += strlen(REMOTE_MSG_TERMINATOR); + memcpy(uncompressed, remote->buffer, offset); /* Preserve the header */ + header = (struct crm_remote_header_v0 *)uncompressed; - xml = string2xml(start); - if (xml == NULL) { - crm_err("Couldn't parse: '%.120s'", start); - } - start = end; - end = strstr(start, REMOTE_MSG_TERMINATOR); + free(remote->buffer); + remote->buffer_size = offset + size_u; + remote->buffer = uncompressed; } - if (xml && start) { - /* we have msgs left over, save it until next time */ - remote->buffer = strdup(start); - free(buf); - } else if (!xml) { - /* no msg present */ - remote->buffer = buf; + CRM_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0); + + xml = string2xml(remote->buffer + offset); + if (xml == NULL) { + crm_err("Couldn't parse: '%.120s'", remote->buffer + offset); } return xml; @@ -559,6 +408,105 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ ) return rc; } + +/*! + * \internal + * \brief Read bytes off non blocking remote connection. + * + * \note only use with NON-Blocking sockets. Should only be used after polling socket. + * This function will return once max_size is met, the socket read buffer + * is empty, or an error is encountered. + * + * \retval number of bytes received + */ +static size_t +crm_remote_recv_once(crm_remote_t * remote) +{ + int rc = 0; + size_t read_len = sizeof(struct crm_remote_header_v0); + + if(remote->buffer_offset >= sizeof(struct crm_remote_header_v0)) { + struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer; + + read_len = hdr->size_total; + } + + /* automatically grow the buffer when needed */ + if(remote->buffer_size < read_len) { + remote->buffer_size = 2 * read_len; + crm_trace("Expanding buffer to %u bytes", remote->buffer_size); + + remote->buffer = realloc(remote->buffer, remote->buffer_size + 1); + CRM_ASSERT(remote->buffer != NULL); + } + + if (remote->tcp_socket) { + errno = 0; + rc = read(remote->tcp_socket, + remote->buffer + remote->buffer_offset, + remote->buffer_size - remote->buffer_offset); + if(rc < 0) { + rc = -errno; + } + +#ifdef HAVE_GNUTLS_GNUTLS_H + } else if (remote->tls_session) { + rc = gnutls_record_recv(*(remote->tls_session), + remote->buffer + remote->buffer_offset, + remote->buffer_size - remote->buffer_offset); + if (rc == GNUTLS_E_INTERRUPTED) { + rc = -EINTR; + } else if (rc == GNUTLS_E_AGAIN) { + rc = -EAGAIN; + } else if (rc < 0) { + crm_info("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc); + rc = -pcmk_err_generic; + } +#endif + } else { + crm_err("Unsupported connection type"); + return -ESOCKTNOSUPPORT; + } + + /* process any errors. */ + if (rc > 0) { + remote->buffer_offset += rc; + /* always null terminate buffer, the +1 to alloc always allows for this. */ + remote->buffer[remote->buffer_offset] = '\0'; + crm_trace("Received %u more bytes, %u total", rc, remote->buffer_offset); + + } else if (rc == -EINTR || rc == -EAGAIN) { + crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc); + + } else if (rc == 0) { + crm_debug("EOF encoutered after %u bytes", remote->buffer_offset); + return -ENOTCONN; + + } else if (rc <= 0) { + crm_debug("Error receiving message after %u bytes: %s (%d)", + remote->buffer_offset, gnutls_strerror(rc), rc); + return -ENOTCONN; + } + + if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) { + crm_trace("Not enough data to fill header: %u < %u bytes", + remote->buffer_offset, sizeof(struct crm_remote_header_v0)); + return -EAGAIN; + + } else { + struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer; + + if(remote->buffer_offset < hdr->size_total) { + crm_trace("Read less than the advertised length: %u < %u bytes", + remote->buffer_offset, hdr->size_total); + return -EAGAIN; + } + } + + crm_trace("Read full message of %u bytes", remote->buffer_offset); + return remote->buffer_offset; +} + /*! * \internal * \brief Read data off the socket until at least one full message is present or timeout occures. @@ -569,10 +517,8 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ ) gboolean crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected) { - int ret; - size_t request_len = 0; + int rc; time_t start = time(NULL); - char *raw_request = NULL; int remaining_timeout = 0; if (total_timeout == 0) { @@ -588,57 +534,30 @@ crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconne /* read some more off the tls buffer if we still have time left. */ crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d", total_timeout, remaining_timeout); - ret = crm_remote_ready(remote, remaining_timeout); - raw_request = NULL; + rc = crm_remote_ready(remote, remaining_timeout); - if (ret == 0) { + if (rc == 0) { crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout); return FALSE; - } else if (ret < 0) { - if (errno != EINTR) { - crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret, - errno); - *disconnected = 1; - return FALSE; - } - crm_debug("poll EINTR encountered during poll, retrying"); - - } else if (remote->tcp_socket) { - raw_request = crm_recv_plaintext(remote->tcp_socket, 0, &request_len, disconnected); + } else if(rc < 0) { + crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc); -#ifdef HAVE_GNUTLS_GNUTLS_H - } else if (remote->tls_session) { - raw_request = crm_recv_tls(remote->tls_session, 0, &request_len, disconnected); -#endif } else { - crm_err("Unsupported connection type"); - } - - remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000); - - if (!raw_request) { - crm_debug("Empty msg received after poll"); - continue; + rc = crm_remote_recv_once(remote); + if(rc > 0) { + return TRUE; + } else if (rc < 0) { + crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc); + } } - if (remote->buffer) { - int old_len = strlen(remote->buffer); - - crm_trace("Expanding recv buffer from %d to %d", old_len, old_len + request_len); - - remote->buffer = realloc(remote->buffer, old_len + request_len + 1); - memcpy(remote->buffer + old_len, raw_request, request_len); - *(remote->buffer + old_len + request_len) = '\0'; - free(raw_request); - - } else { - remote->buffer = raw_request; + if(rc == -ENOTCONN) { + *disconnected = 1; + return FALSE; } - if (strstr(remote->buffer, REMOTE_MSG_TERMINATOR)) { - return TRUE; - } + remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000); } return FALSE;