Blob Blame History Raw
commit 330600b4cafd37f51d109728f9377bc921a6aa7f
Author: Andrew Beekhof <andrew@beekhof.net>
Date:   Mon Oct 14 13:50:41 2013 +1100

    Feature: remote: Properly version the remote connection protocol
    
    (cherry picked from commit aa024490a023bd0d0d5568f08b2c60b61adb15f5)

diff --git a/include/crm/common/ipcs.h b/include/crm/common/ipcs.h
index 3e78d51..84249a3 100644
--- a/include/crm/common/ipcs.h
+++ b/include/crm/common/ipcs.h
@@ -41,14 +41,17 @@ enum client_type {
 struct crm_remote_s {
     /* Shared */
     char *buffer;
+    size_t buffer_size;
+    size_t buffer_offset;
     int auth_timeout;
-    bool authenticated;         /* CIB-only */
+    int tcp_socket;
     mainloop_io_t *source;
 
-    char *token;                /* CIB Only */
-
-    int tcp_socket;
+    /* CIB-only */
+    bool authenticated;
+    char *token;
 
+    /* TLS only */
 #  ifdef HAVE_GNUTLS_GNUTLS_H
     gnutls_session_t *tls_session;
     bool tls_handshake_complete;
diff --git a/lib/common/remote.c b/lib/common/remote.c
index 8b00f16..297c75a 100644
--- a/lib/common/remote.c
+++ b/lib/common/remote.c
@@ -34,6 +34,8 @@
 #include <fcntl.h>
 #include <glib.h>
 
+#include <bzlib.h>
+
 #include <crm/common/ipcs.h>
 #include <crm/common/xml.h>
 #include <crm/common/mainloop.h>
@@ -57,6 +59,17 @@ const int anon_tls_kx_order[] = {
     0
 };
 
+struct crm_remote_header_v0     /* fixed-size header that crm_remote_send() transmits ahead of each message payload */
+{
+    uint64_t id;                    /* message counter: crm_remote_send() stores an incrementing static value */
+    uint64_t flags;                 /* left zero by crm_remote_send() */
+    uint32_t error;                 /* left zero by crm_remote_send() */
+    uint32_t version;               /* protocol version: crm_remote_send() stores PCMK_TLS_VERSION */
+    uint32_t size_total;            /* header bytes + payload bytes; the receiver waits for this many before parsing */
+    uint32_t payload_uncompressed;  /* payload length in bytes, including the terminating NUL */
+    uint32_t payload_compressed;    /* non-zero: payload is bzip2 data of this many bytes (sender leaves it zero) */
+};
+
 int
 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
 {
@@ -176,97 +189,6 @@ crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
 
     return rc < 0 ? rc : total_send;
 }
-
-/*!
- * \internal
- * \brief Read bytes off non blocking tls session.
- *
- * \param session - tls session to read
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
- *
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
- *       This function will return once max_size is met, the socket read buffer
- *       is empty, or an error is encountered.
- *
- * \retval '\0' terminated buffer on success
- */
-static char *
-crm_recv_tls(gnutls_session_t * session, size_t max_size, size_t * recv_len, int *disconnected)
-{
-    char *buf = NULL;
-    int rc = 0;
-    size_t len = 0;
-    size_t chunk_size = max_size ? max_size : 1024;
-    size_t buf_size = 0;
-    size_t read_size = 0;
-
-    if (session == NULL) {
-        if (disconnected) {
-            *disconnected = 1;
-        }
-        goto done;
-    }
-
-    buf = calloc(1, chunk_size + 1);
-    buf_size = chunk_size;
-
-    while (TRUE) {
-        read_size = buf_size - len;
-
-        /* automatically grow the buffer when needed if max_size is not set. */
-        if (!max_size && (read_size < (chunk_size / 2))) {
-            buf_size += chunk_size;
-            crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
-                      buf_size);
-            buf = realloc(buf, buf_size + 1);
-            CRM_ASSERT(buf != NULL);
-
-            read_size = buf_size - len;
-        }
-
-        rc = gnutls_record_recv(*session, buf + len, read_size);
-
-        if (rc > 0) {
-            crm_trace("Got %d more bytes.", rc);
-            len += rc;
-            /* always null terminate buffer, the +1 to alloc always allows for this. */
-            buf[len] = '\0';
-        }
-        if (max_size && (max_size == read_size)) {
-            crm_trace("Buffer max read size %d met", max_size);
-            goto done;
-        }
-
-        /* process any errors. */
-        if (rc == GNUTLS_E_INTERRUPTED) {
-            crm_trace("EINTR encoutered, retry tls read");
-        } else if (rc == GNUTLS_E_AGAIN) {
-            crm_trace("non-blocking, exiting read on rc = %d", rc);
-            goto done;
-        } else if (rc <= 0) {
-            if (rc == 0) {
-                crm_debug("EOF encoutered during TLS read");
-            } else {
-                crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
-            }
-            if (disconnected) {
-                *disconnected = 1;
-            }
-            goto done;
-        }
-    }
-
-  done:
-    if (recv_len) {
-        *recv_len = len;
-    }
-    if (!len) {
-        free(buf);
-        buf = NULL;
-    }
-    return buf;
-
-}
 #endif
 
 static int
@@ -310,141 +232,57 @@ crm_send_plaintext(int sock, const char *buf, size_t len)
 
 }
 
-/*!
- * \internal
- * \brief Read bytes off non blocking socket.
- *
- * \param session - tls session to read
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
- *
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
- *       This function will return once max_size is met, the socket read buffer
- *       is empty, or an error is encountered.
- *
- * \retval '\0' terminated buffer on success
- */
-static char *
-crm_recv_plaintext(int sock, size_t max_size, size_t * recv_len, int *disconnected)
-{
-    char *buf = NULL;
-    ssize_t rc = 0;
-    ssize_t len = 0;
-    ssize_t chunk_size = max_size ? max_size : 1024;
-    size_t buf_size = 0;
-    size_t read_size = 0;
-
-    if (sock <= 0) {
-        if (disconnected) {
-            *disconnected = 1;
-        }
-        goto done;
-    }
-
-    buf = calloc(1, chunk_size + 1);
-    buf_size = chunk_size;
-
-    while (TRUE) {
-        errno = 0;
-        read_size = buf_size - len;
-
-        /* automatically grow the buffer when needed if max_size is not set. */
-        if (!max_size && (read_size < (chunk_size / 2))) {
-            buf_size += chunk_size;
-            crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
-                      buf_size);
-            buf = realloc(buf, buf_size + 1);
-            CRM_ASSERT(buf != NULL);
-
-            read_size = buf_size - len;
-        }
-
-        rc = read(sock, buf + len, chunk_size);
-
-        if (rc > 0) {
-            crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
-            len += rc;
-            /* always null terminate buffer, the +1 to alloc always allows for this. */
-            buf[len] = '\0';
-        }
-        if (max_size && (max_size == read_size)) {
-            crm_trace("Buffer max read size %d met", max_size);
-            goto done;
-        }
-
-        if (rc > 0) {
-            continue;
-        } else if (rc == 0) {
-            if (disconnected) {
-                *disconnected = 1;
-            }
-            crm_trace("EOF encoutered during read");
-            goto done;
-        }
-
-        /* process errors */
-        if (errno == EINTR) {
-            crm_trace("EINTER encoutered, retry socket read.");
-        } else if (errno == EAGAIN) {
-            crm_trace("non-blocking, exiting read on rc = %d", rc);
-            goto done;
-        } else if (errno <= 0) {
-            if (disconnected) {
-                *disconnected = 1;
-            }
-            crm_debug("Error receiving message: %d", (int)rc);
-            goto done;
-        }
-    }
-
-  done:
-    if (recv_len) {
-        *recv_len = len;
-    }
-    if (!len) {
-        free(buf);
-        buf = NULL;
-    }
-    return buf;
-}
-
 static int
-crm_remote_send_raw(crm_remote_t * remote, const char *buf, size_t len)
+crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
 {
+    int lpc = 0;                /* index of the iov segment being sent; segments go out in order */
     int rc = -ESOCKTNOSUPPORT;
 
-    if (remote->tcp_socket) {
-        rc = crm_send_plaintext(remote->tcp_socket, buf, len);
+    for (; lpc < iovs && (lpc == 0 || rc >= 0); lpc++) {    /* stop at the first failed segment so its error is returned, not overwritten */
+        if (remote->tcp_socket) {
+            rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
 #ifdef HAVE_GNUTLS_GNUTLS_H
 
-    } else if (remote->tls_session) {
-        rc = crm_send_tls(remote->tls_session, buf, len);
+        } else if (remote->tls_session) {
+            rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
 #endif
-    } else {
-        crm_err("Unsupported connection type");
+        } else {
+            crm_err("Unsupported connection type");
+        }
     }
     return rc;
 }
 
+#define PCMK_TLS_VERSION 1
+
 int
 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
 {
+    static uint64_t id = 0;
     int rc = -1;
-    char *xml_text = NULL;
-    int len = 0;
+    char *xml_text = dump_xml_unformatted(msg);
 
-    xml_text = dump_xml_unformatted(msg);
-    if (xml_text) {
-        len = strlen(xml_text);
-    } else {
+    struct iovec iov[2];
+    struct crm_remote_header_v0 *header = calloc(1, sizeof(struct crm_remote_header_v0));
+
+    if (xml_text == NULL) {
         crm_err("Invalid XML, can not send msg");
         return -1;
     }
 
-    rc = crm_remote_send_raw(remote, xml_text, len);
-    if (rc >= 0) {
-        rc = crm_remote_send_raw(remote, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR));
-    }
+    iov[0].iov_base = header;
+    iov[0].iov_len = sizeof(struct crm_remote_header_v0);
+
+    iov[1].iov_base = xml_text;
+    iov[1].iov_len = 1 + strlen(xml_text);
 
+    id++;
+    header->id = id;
+    header->version = PCMK_TLS_VERSION;
+    header->size_total = iov[0].iov_len + iov[1].iov_len;
+    header->payload_uncompressed = iov[1].iov_len;
+
+    rc = crm_remote_sendv(remote, iov, 2);
     if (rc < 0) {
         crm_err("Failed to send remote msg, rc = %d", rc);
     }
@@ -461,44 +299,55 @@ crm_remote_send(crm_remote_t * remote, xmlNode * msg)
 xmlNode *
 crm_remote_parse_buffer(crm_remote_t * remote)
 {
-    char *buf = NULL;
-    char *start = NULL;
-    char *end = NULL;
     xmlNode *xml = NULL;
+    size_t offset = sizeof(struct crm_remote_header_v0);
+    struct crm_remote_header_v0 *header = NULL;
 
     if (remote->buffer == NULL) {
         return NULL;
+
+    } else if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
+        return NULL;
     }
 
     /* take ownership of the buffer */
-    buf = remote->buffer;
-    remote->buffer = NULL;
-
-    /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */
-    start = buf;
-    end = strstr(start, REMOTE_MSG_TERMINATOR);
+    remote->buffer_offset = 0;
+
+    header = (struct crm_remote_header_v0 *)remote->buffer;
+    /* Support compression on the receiving end now, in case we ever want to add it later */
+    if (header->payload_compressed) {
+        int rc = 0;
+        unsigned int size_u = 1 + header->payload_uncompressed;
+        char *uncompressed = calloc(1, offset + size_u);
+
+        crm_trace("Decompressing message data %d bytes into %d bytes",
+                 header->payload_compressed, size_u);
+
+        rc = BZ2_bzBuffToBuffDecompress(uncompressed + offset, &size_u,
+                                        remote->buffer + offset,
+                                        header->payload_compressed, 1, 0);
+
+        if (rc != BZ_OK) {
+            crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
+            free(uncompressed);
+            return NULL;
+        }
 
-    while (!xml && end) {
+        CRM_ASSERT(size_u == header->payload_uncompressed);
 
-        /* grab the message */
-        end[0] = '\0';
-        end += strlen(REMOTE_MSG_TERMINATOR);
+        memcpy(uncompressed, remote->buffer, offset);       /* Preserve the header */
+        header = (struct crm_remote_header_v0 *)uncompressed;
 
-        xml = string2xml(start);
-        if (xml == NULL) {
-            crm_err("Couldn't parse: '%.120s'", start);
-        }
-        start = end;
-        end = strstr(start, REMOTE_MSG_TERMINATOR);
+        free(remote->buffer);
+        remote->buffer_size = offset + size_u;
+        remote->buffer = uncompressed;
     }
 
-    if (xml && start) {
-        /* we have msgs left over, save it until next time */
-        remote->buffer = strdup(start);
-        free(buf);
-    } else if (!xml) {
-        /* no msg present */
-        remote->buffer = buf;
+    CRM_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
+
+    xml = string2xml(remote->buffer + offset);
+    if (xml == NULL) {
+        crm_err("Couldn't parse: '%.120s'", remote->buffer + offset);
     }
 
     return xml;
@@ -559,6 +408,105 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
     return rc;
 }
 
+
+/*!
+ * \internal
+ * \brief Perform one non-blocking read from a remote connection into remote->buffer.
+ *
+ * \note only use with NON-Blocking sockets. Should only be used after polling socket.
+ *       A single read is attempted per call; the buffer is first grown so it can hold
+ *       the header or, once the header has arrived, the hdr->size_total it advertises.
+ *
+ * \return buffered byte count once a full message is present; -EAGAIN if incomplete; -ENOTCONN on EOF or error; -ESOCKTNOSUPPORT if no transport is set
+ */
+static int
+crm_remote_recv_once(crm_remote_t * remote)
+{
+    int rc = 0;
+    size_t read_len = sizeof(struct crm_remote_header_v0);
+
+    if(remote->buffer_offset >= sizeof(struct crm_remote_header_v0)) {
+        struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
+
+        read_len = hdr->size_total;
+    }
+
+    /* automatically grow the buffer when needed */
+    if(remote->buffer_size < read_len) {
+        remote->buffer_size = 2 * read_len;
+        crm_trace("Expanding buffer to %llu bytes", (unsigned long long) remote->buffer_size);
+
+        remote->buffer = realloc(remote->buffer, remote->buffer_size + 1);
+        CRM_ASSERT(remote->buffer != NULL);
+    }
+
+    if (remote->tcp_socket) {
+        errno = 0;
+        rc = read(remote->tcp_socket,
+                  remote->buffer + remote->buffer_offset,
+                  remote->buffer_size - remote->buffer_offset);
+        if(rc < 0) {
+            rc = -errno;
+        }
+
+#ifdef HAVE_GNUTLS_GNUTLS_H
+    } else if (remote->tls_session) {
+        rc = gnutls_record_recv(*(remote->tls_session),
+                                remote->buffer + remote->buffer_offset,
+                                remote->buffer_size - remote->buffer_offset);
+        if (rc == GNUTLS_E_INTERRUPTED) {
+            rc = -EINTR;
+        } else if (rc == GNUTLS_E_AGAIN) {
+            rc = -EAGAIN;
+        } else if (rc < 0) {
+            crm_info("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
+            rc = -pcmk_err_generic;
+        }
+#endif
+    } else {
+        crm_err("Unsupported connection type");
+        return -ESOCKTNOSUPPORT;
+    }
+
+    /* process any errors. */
+    if (rc > 0) {
+        remote->buffer_offset += rc;
+        /* always null terminate buffer, the +1 to alloc always allows for this. */
+        remote->buffer[remote->buffer_offset] = '\0';
+        crm_trace("Received %d more bytes, %llu total", rc, (unsigned long long) remote->buffer_offset);
+
+    } else if (rc == -EINTR || rc == -EAGAIN) {
+        crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
+
+    } else if (rc == 0) {
+        crm_debug("EOF encoutered after %llu bytes", (unsigned long long) remote->buffer_offset);
+        return -ENOTCONN;
+
+    } else if (rc <= 0) {       /* rc is a negated errno here: TLS codes were already mapped above */
+        crm_debug("Error receiving message after %llu bytes: %s (%d)",
+                  (unsigned long long) remote->buffer_offset, pcmk_strerror(rc), rc);
+        return -ENOTCONN;
+    }
+
+    if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
+        crm_trace("Not enough data to fill header: %llu < %llu bytes",
+                  (unsigned long long) remote->buffer_offset, (unsigned long long) sizeof(struct crm_remote_header_v0));
+        return -EAGAIN;
+
+    } else {
+        struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
+
+        if(remote->buffer_offset < hdr->size_total) {
+            crm_trace("Read less than the advertised length: %llu < %u bytes",
+                      (unsigned long long) remote->buffer_offset, hdr->size_total);
+            return -EAGAIN;
+        }
+    }
+
+    crm_trace("Read full message of %llu bytes", (unsigned long long) remote->buffer_offset);
+    return (int) remote->buffer_offset;
+}
+
 /*!
  * \internal
  * \brief Read data off the socket until at least one full message is present or timeout occures.
@@ -569,10 +517,8 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
 gboolean
 crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
 {
-    int ret;
-    size_t request_len = 0;
+    int rc;
     time_t start = time(NULL);
-    char *raw_request = NULL;
     int remaining_timeout = 0;
 
     if (total_timeout == 0) {
@@ -588,57 +534,30 @@ crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconne
         /* read some more off the tls buffer if we still have time left. */
         crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
                   total_timeout, remaining_timeout);
-        ret = crm_remote_ready(remote, remaining_timeout);
-        raw_request = NULL;
+        rc = crm_remote_ready(remote, remaining_timeout);
 
-        if (ret == 0) {
+        if (rc == 0) {
             crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
             return FALSE;
 
-        } else if (ret < 0) {
-            if (errno != EINTR) {
-                crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret,
-                          errno);
-                *disconnected = 1;
-                return FALSE;
-            }
-            crm_debug("poll EINTR encountered during poll, retrying");
-
-        } else if (remote->tcp_socket) {
-            raw_request = crm_recv_plaintext(remote->tcp_socket, 0, &request_len, disconnected);
+        } else if(rc < 0) {
+            crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);
 
-#ifdef HAVE_GNUTLS_GNUTLS_H
-        } else if (remote->tls_session) {
-            raw_request = crm_recv_tls(remote->tls_session, 0, &request_len, disconnected);
-#endif
         } else {
-            crm_err("Unsupported connection type");
-        }
-
-        remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
-
-        if (!raw_request) {
-            crm_debug("Empty msg received after poll");
-            continue;
+            rc = crm_remote_recv_once(remote);
+            if(rc > 0) {
+                return TRUE;
+            } else if (rc < 0) {
+                crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
+            }
         }
 
-        if (remote->buffer) {
-            int old_len = strlen(remote->buffer);
-
-            crm_trace("Expanding recv buffer from %d to %d", old_len, old_len + request_len);
-
-            remote->buffer = realloc(remote->buffer, old_len + request_len + 1);
-            memcpy(remote->buffer + old_len, raw_request, request_len);
-            *(remote->buffer + old_len + request_len) = '\0';
-            free(raw_request);
-
-        } else {
-            remote->buffer = raw_request;
+        if(rc == -ENOTCONN) {
+            *disconnected = 1;
+            return FALSE;
         }
 
-        if (strstr(remote->buffer, REMOTE_MSG_TERMINATOR)) {
-            return TRUE;
-        }
+        remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
     }
 
     return FALSE;