commit 330600b4cafd37f51d109728f9377bc921a6aa7f
Author: Andrew Beekhof <andrew@beekhof.net>
Date: Mon Oct 14 13:50:41 2013 +1100
Feature: remote: Properly version the remote connection protocol
(cherry picked from commit aa024490a023bd0d0d5568f08b2c60b61adb15f5)
diff --git a/include/crm/common/ipcs.h b/include/crm/common/ipcs.h
index 3e78d51..84249a3 100644
--- a/include/crm/common/ipcs.h
+++ b/include/crm/common/ipcs.h
@@ -41,14 +41,17 @@ enum client_type {
struct crm_remote_s {
/* Shared */
char *buffer;
+ size_t buffer_size;
+ size_t buffer_offset;
int auth_timeout;
- bool authenticated; /* CIB-only */
+ int tcp_socket;
mainloop_io_t *source;
- char *token; /* CIB Only */
-
- int tcp_socket;
+ /* CIB-only */
+ bool authenticated;
+ char *token;
+ /* TLS only */
# ifdef HAVE_GNUTLS_GNUTLS_H
gnutls_session_t *tls_session;
bool tls_handshake_complete;
diff --git a/lib/common/remote.c b/lib/common/remote.c
index 8b00f16..297c75a 100644
--- a/lib/common/remote.c
+++ b/lib/common/remote.c
@@ -34,6 +34,8 @@
#include <fcntl.h>
#include <glib.h>
+#include <bzlib.h>
+
#include <crm/common/ipcs.h>
#include <crm/common/xml.h>
#include <crm/common/mainloop.h>
@@ -57,6 +59,17 @@ const int anon_tls_kx_order[] = {
0
};
+struct crm_remote_header_v0
+{
+ uint64_t id;
+ uint64_t flags;
+ uint32_t error;
+ uint32_t version;
+ uint32_t size_total;
+ uint32_t payload_uncompressed;
+ uint32_t payload_compressed;
+};
+
int
crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
{
@@ -176,97 +189,6 @@ crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
return rc < 0 ? rc : total_send;
}
-
-/*!
- * \internal
- * \brief Read bytes off non blocking tls session.
- *
- * \param session - tls session to read
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
- *
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
- * This function will return once max_size is met, the socket read buffer
- * is empty, or an error is encountered.
- *
- * \retval '\0' terminated buffer on success
- */
-static char *
-crm_recv_tls(gnutls_session_t * session, size_t max_size, size_t * recv_len, int *disconnected)
-{
- char *buf = NULL;
- int rc = 0;
- size_t len = 0;
- size_t chunk_size = max_size ? max_size : 1024;
- size_t buf_size = 0;
- size_t read_size = 0;
-
- if (session == NULL) {
- if (disconnected) {
- *disconnected = 1;
- }
- goto done;
- }
-
- buf = calloc(1, chunk_size + 1);
- buf_size = chunk_size;
-
- while (TRUE) {
- read_size = buf_size - len;
-
- /* automatically grow the buffer when needed if max_size is not set. */
- if (!max_size && (read_size < (chunk_size / 2))) {
- buf_size += chunk_size;
- crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
- buf_size);
- buf = realloc(buf, buf_size + 1);
- CRM_ASSERT(buf != NULL);
-
- read_size = buf_size - len;
- }
-
- rc = gnutls_record_recv(*session, buf + len, read_size);
-
- if (rc > 0) {
- crm_trace("Got %d more bytes.", rc);
- len += rc;
- /* always null terminate buffer, the +1 to alloc always allows for this. */
- buf[len] = '\0';
- }
- if (max_size && (max_size == read_size)) {
- crm_trace("Buffer max read size %d met", max_size);
- goto done;
- }
-
- /* process any errors. */
- if (rc == GNUTLS_E_INTERRUPTED) {
- crm_trace("EINTR encoutered, retry tls read");
- } else if (rc == GNUTLS_E_AGAIN) {
- crm_trace("non-blocking, exiting read on rc = %d", rc);
- goto done;
- } else if (rc <= 0) {
- if (rc == 0) {
- crm_debug("EOF encoutered during TLS read");
- } else {
- crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
- }
- if (disconnected) {
- *disconnected = 1;
- }
- goto done;
- }
- }
-
- done:
- if (recv_len) {
- *recv_len = len;
- }
- if (!len) {
- free(buf);
- buf = NULL;
- }
- return buf;
-
-}
#endif
static int
@@ -310,141 +232,57 @@ crm_send_plaintext(int sock, const char *buf, size_t len)
}
-/*!
- * \internal
- * \brief Read bytes off non blocking socket.
- *
- * \param session - tls session to read
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
- *
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
- * This function will return once max_size is met, the socket read buffer
- * is empty, or an error is encountered.
- *
- * \retval '\0' terminated buffer on success
- */
-static char *
-crm_recv_plaintext(int sock, size_t max_size, size_t * recv_len, int *disconnected)
-{
- char *buf = NULL;
- ssize_t rc = 0;
- ssize_t len = 0;
- ssize_t chunk_size = max_size ? max_size : 1024;
- size_t buf_size = 0;
- size_t read_size = 0;
-
- if (sock <= 0) {
- if (disconnected) {
- *disconnected = 1;
- }
- goto done;
- }
-
- buf = calloc(1, chunk_size + 1);
- buf_size = chunk_size;
-
- while (TRUE) {
- errno = 0;
- read_size = buf_size - len;
-
- /* automatically grow the buffer when needed if max_size is not set. */
- if (!max_size && (read_size < (chunk_size / 2))) {
- buf_size += chunk_size;
- crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
- buf_size);
- buf = realloc(buf, buf_size + 1);
- CRM_ASSERT(buf != NULL);
-
- read_size = buf_size - len;
- }
-
- rc = read(sock, buf + len, chunk_size);
-
- if (rc > 0) {
- crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
- len += rc;
- /* always null terminate buffer, the +1 to alloc always allows for this. */
- buf[len] = '\0';
- }
- if (max_size && (max_size == read_size)) {
- crm_trace("Buffer max read size %d met", max_size);
- goto done;
- }
-
- if (rc > 0) {
- continue;
- } else if (rc == 0) {
- if (disconnected) {
- *disconnected = 1;
- }
- crm_trace("EOF encoutered during read");
- goto done;
- }
-
- /* process errors */
- if (errno == EINTR) {
- crm_trace("EINTER encoutered, retry socket read.");
- } else if (errno == EAGAIN) {
- crm_trace("non-blocking, exiting read on rc = %d", rc);
- goto done;
- } else if (errno <= 0) {
- if (disconnected) {
- *disconnected = 1;
- }
- crm_debug("Error receiving message: %d", (int)rc);
- goto done;
- }
- }
-
- done:
- if (recv_len) {
- *recv_len = len;
- }
- if (!len) {
- free(buf);
- buf = NULL;
- }
- return buf;
-}
-
static int
-crm_remote_send_raw(crm_remote_t * remote, const char *buf, size_t len)
+crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
{
+ int lpc = 0;
int rc = -ESOCKTNOSUPPORT;
- if (remote->tcp_socket) {
- rc = crm_send_plaintext(remote->tcp_socket, buf, len);
+ for(; lpc < iovs; lpc++) {
+ if (remote->tcp_socket) {
+ rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
#ifdef HAVE_GNUTLS_GNUTLS_H
- } else if (remote->tls_session) {
- rc = crm_send_tls(remote->tls_session, buf, len);
+ } else if (remote->tls_session) {
+ rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
#endif
- } else {
- crm_err("Unsupported connection type");
+ } else {
+ crm_err("Unsupported connection type");
+ }
}
return rc;
}
+#define PCMK_TLS_VERSION 1
+
int
crm_remote_send(crm_remote_t * remote, xmlNode * msg)
{
+ static uint64_t id = 0;
int rc = -1;
- char *xml_text = NULL;
- int len = 0;
+ char *xml_text = dump_xml_unformatted(msg);
- xml_text = dump_xml_unformatted(msg);
- if (xml_text) {
- len = strlen(xml_text);
- } else {
+ struct iovec iov[2];
+ struct crm_remote_header_v0 *header = calloc(1, sizeof(struct crm_remote_header_v0));
+
+ if (xml_text == NULL) {
crm_err("Invalid XML, can not send msg");
return -1;
}
- rc = crm_remote_send_raw(remote, xml_text, len);
- if (rc >= 0) {
- rc = crm_remote_send_raw(remote, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR));
- }
+ iov[0].iov_base = header;
+ iov[0].iov_len = sizeof(struct crm_remote_header_v0);
+
+ iov[1].iov_base = xml_text;
+ iov[1].iov_len = 1 + strlen(xml_text);
+ id++;
+ header->id = id;
+ header->version = PCMK_TLS_VERSION;
+ header->size_total = iov[0].iov_len + iov[1].iov_len;
+ header->payload_uncompressed = iov[1].iov_len;
+
+ rc = crm_remote_sendv(remote, iov, 2);
if (rc < 0) {
crm_err("Failed to send remote msg, rc = %d", rc);
}
@@ -461,44 +299,55 @@ crm_remote_send(crm_remote_t * remote, xmlNode * msg)
xmlNode *
crm_remote_parse_buffer(crm_remote_t * remote)
{
- char *buf = NULL;
- char *start = NULL;
- char *end = NULL;
xmlNode *xml = NULL;
+ size_t offset = sizeof(struct crm_remote_header_v0);
+ struct crm_remote_header_v0 *header = NULL;
if (remote->buffer == NULL) {
return NULL;
+
+ } else if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
+ return NULL;
}
/* take ownership of the buffer */
- buf = remote->buffer;
- remote->buffer = NULL;
-
- /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */
- start = buf;
- end = strstr(start, REMOTE_MSG_TERMINATOR);
+ remote->buffer_offset = 0;
+
+ header = (struct crm_remote_header_v0 *)remote->buffer;
+ /* Support compression on the receiving end now, in case we ever want to add it later */
+ if (header->payload_compressed) {
+ int rc = 0;
+ unsigned int size_u = 1 + header->payload_uncompressed;
+ char *uncompressed = calloc(1, offset + size_u);
+
+ crm_trace("Decompressing message data %d bytes into %d bytes",
+ header->payload_compressed, size_u);
+
+ rc = BZ2_bzBuffToBuffDecompress(uncompressed + offset, &size_u,
+ remote->buffer + offset,
+ header->payload_compressed, 1, 0);
+
+ if (rc != BZ_OK) {
+ crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
+ free(uncompressed);
+ return NULL;
+ }
- while (!xml && end) {
+ CRM_ASSERT(size_u == header->payload_uncompressed);
- /* grab the message */
- end[0] = '\0';
- end += strlen(REMOTE_MSG_TERMINATOR);
+ memcpy(uncompressed, remote->buffer, offset); /* Preserve the header */
+ header = (struct crm_remote_header_v0 *)uncompressed;
- xml = string2xml(start);
- if (xml == NULL) {
- crm_err("Couldn't parse: '%.120s'", start);
- }
- start = end;
- end = strstr(start, REMOTE_MSG_TERMINATOR);
+ free(remote->buffer);
+ remote->buffer_size = offset + size_u;
+ remote->buffer = uncompressed;
}
- if (xml && start) {
- /* we have msgs left over, save it until next time */
- remote->buffer = strdup(start);
- free(buf);
- } else if (!xml) {
- /* no msg present */
- remote->buffer = buf;
+ CRM_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
+
+ xml = string2xml(remote->buffer + offset);
+ if (xml == NULL) {
+ crm_err("Couldn't parse: '%.120s'", remote->buffer + offset);
}
return xml;
@@ -559,6 +408,105 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
return rc;
}
+
+/*!
+ * \internal
+ * \brief Read bytes off non blocking remote connection.
+ *
+ * \note only use with NON-Blocking sockets. Should only be used after polling socket.
+ * This function will return once max_size is met, the socket read buffer
+ * is empty, or an error is encountered.
+ *
+ * \retval number of bytes received
+ */
+static size_t
+crm_remote_recv_once(crm_remote_t * remote)
+{
+ int rc = 0;
+ size_t read_len = sizeof(struct crm_remote_header_v0);
+
+ if(remote->buffer_offset >= sizeof(struct crm_remote_header_v0)) {
+ struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
+
+ read_len = hdr->size_total;
+ }
+
+ /* automatically grow the buffer when needed */
+ if(remote->buffer_size < read_len) {
+ remote->buffer_size = 2 * read_len;
+ crm_trace("Expanding buffer to %u bytes", remote->buffer_size);
+
+ remote->buffer = realloc(remote->buffer, remote->buffer_size + 1);
+ CRM_ASSERT(remote->buffer != NULL);
+ }
+
+ if (remote->tcp_socket) {
+ errno = 0;
+ rc = read(remote->tcp_socket,
+ remote->buffer + remote->buffer_offset,
+ remote->buffer_size - remote->buffer_offset);
+ if(rc < 0) {
+ rc = -errno;
+ }
+
+#ifdef HAVE_GNUTLS_GNUTLS_H
+ } else if (remote->tls_session) {
+ rc = gnutls_record_recv(*(remote->tls_session),
+ remote->buffer + remote->buffer_offset,
+ remote->buffer_size - remote->buffer_offset);
+ if (rc == GNUTLS_E_INTERRUPTED) {
+ rc = -EINTR;
+ } else if (rc == GNUTLS_E_AGAIN) {
+ rc = -EAGAIN;
+ } else if (rc < 0) {
+ crm_info("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
+ rc = -pcmk_err_generic;
+ }
+#endif
+ } else {
+ crm_err("Unsupported connection type");
+ return -ESOCKTNOSUPPORT;
+ }
+
+ /* process any errors. */
+ if (rc > 0) {
+ remote->buffer_offset += rc;
+ /* always null terminate buffer, the +1 to alloc always allows for this. */
+ remote->buffer[remote->buffer_offset] = '\0';
+ crm_trace("Received %u more bytes, %u total", rc, remote->buffer_offset);
+
+ } else if (rc == -EINTR || rc == -EAGAIN) {
+ crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
+
+ } else if (rc == 0) {
+ crm_debug("EOF encoutered after %u bytes", remote->buffer_offset);
+ return -ENOTCONN;
+
+ } else if (rc <= 0) {
+ crm_debug("Error receiving message after %u bytes: %s (%d)",
+ remote->buffer_offset, gnutls_strerror(rc), rc);
+ return -ENOTCONN;
+ }
+
+ if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
+ crm_trace("Not enough data to fill header: %u < %u bytes",
+ remote->buffer_offset, sizeof(struct crm_remote_header_v0));
+ return -EAGAIN;
+
+ } else {
+ struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
+
+ if(remote->buffer_offset < hdr->size_total) {
+ crm_trace("Read less than the advertised length: %u < %u bytes",
+ remote->buffer_offset, hdr->size_total);
+ return -EAGAIN;
+ }
+ }
+
+ crm_trace("Read full message of %u bytes", remote->buffer_offset);
+ return remote->buffer_offset;
+}
+
/*!
* \internal
* \brief Read data off the socket until at least one full message is present or timeout occures.
@@ -569,10 +517,8 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
gboolean
crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
{
- int ret;
- size_t request_len = 0;
+ int rc;
time_t start = time(NULL);
- char *raw_request = NULL;
int remaining_timeout = 0;
if (total_timeout == 0) {
@@ -588,57 +534,30 @@ crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconne
/* read some more off the tls buffer if we still have time left. */
crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
total_timeout, remaining_timeout);
- ret = crm_remote_ready(remote, remaining_timeout);
- raw_request = NULL;
+ rc = crm_remote_ready(remote, remaining_timeout);
- if (ret == 0) {
+ if (rc == 0) {
crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
return FALSE;
- } else if (ret < 0) {
- if (errno != EINTR) {
- crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret,
- errno);
- *disconnected = 1;
- return FALSE;
- }
- crm_debug("poll EINTR encountered during poll, retrying");
-
- } else if (remote->tcp_socket) {
- raw_request = crm_recv_plaintext(remote->tcp_socket, 0, &request_len, disconnected);
+ } else if(rc < 0) {
+ crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);
-#ifdef HAVE_GNUTLS_GNUTLS_H
- } else if (remote->tls_session) {
- raw_request = crm_recv_tls(remote->tls_session, 0, &request_len, disconnected);
-#endif
} else {
- crm_err("Unsupported connection type");
- }
-
- remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
-
- if (!raw_request) {
- crm_debug("Empty msg received after poll");
- continue;
+ rc = crm_remote_recv_once(remote);
+ if(rc > 0) {
+ return TRUE;
+ } else if (rc < 0) {
+ crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
+ }
}
- if (remote->buffer) {
- int old_len = strlen(remote->buffer);
-
- crm_trace("Expanding recv buffer from %d to %d", old_len, old_len + request_len);
-
- remote->buffer = realloc(remote->buffer, old_len + request_len + 1);
- memcpy(remote->buffer + old_len, raw_request, request_len);
- *(remote->buffer + old_len + request_len) = '\0';
- free(raw_request);
-
- } else {
- remote->buffer = raw_request;
+ if(rc == -ENOTCONN) {
+ *disconnected = 1;
+ return FALSE;
}
- if (strstr(remote->buffer, REMOTE_MSG_TERMINATOR)) {
- return TRUE;
- }
+ remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
}
return FALSE;