Blame SOURCES/bz720543-pcmk-remote_properly_version_the_remote_connection_protocol.patch

ed0026
commit 330600b4cafd37f51d109728f9377bc921a6aa7f
ed0026
Author: Andrew Beekhof <andrew@beekhof.net>
ed0026
Date:   Mon Oct 14 13:50:41 2013 +1100
ed0026
ed0026
    Feature: remote: Properly version the remote connection protocol
ed0026
    
ed0026
    (cherry picked from commit aa024490a023bd0d0d5568f08b2c60b61adb15f5)
ed0026
ed0026
diff --git a/include/crm/common/ipcs.h b/include/crm/common/ipcs.h
ed0026
index 3e78d51..84249a3 100644
ed0026
--- a/include/crm/common/ipcs.h
ed0026
+++ b/include/crm/common/ipcs.h
ed0026
@@ -41,14 +41,17 @@ enum client_type {
ed0026
 struct crm_remote_s {
ed0026
     /* Shared */
ed0026
     char *buffer;
ed0026
+    size_t buffer_size;
ed0026
+    size_t buffer_offset;
ed0026
     int auth_timeout;
ed0026
-    bool authenticated;         /* CIB-only */
ed0026
+    int tcp_socket;
ed0026
     mainloop_io_t *source;
ed0026
 
ed0026
-    char *token;                /* CIB Only */
ed0026
-
ed0026
-    int tcp_socket;
ed0026
+    /* CIB-only */
ed0026
+    bool authenticated;
ed0026
+    char *token;
ed0026
 
ed0026
+    /* TLS only */
ed0026
 #  ifdef HAVE_GNUTLS_GNUTLS_H
ed0026
     gnutls_session_t *tls_session;
ed0026
     bool tls_handshake_complete;
ed0026
diff --git a/lib/common/remote.c b/lib/common/remote.c
ed0026
index 8b00f16..297c75a 100644
ed0026
--- a/lib/common/remote.c
ed0026
+++ b/lib/common/remote.c
ed0026
@@ -34,6 +34,8 @@
ed0026
 #include <fcntl.h>
ed0026
 #include <glib.h>
ed0026
 
ed0026
+#include <bzlib.h>
ed0026
+
ed0026
 #include <crm/common/ipcs.h>
ed0026
 #include <crm/common/xml.h>
ed0026
 #include <crm/common/mainloop.h>
ed0026
@@ -57,6 +59,17 @@ const int anon_tls_kx_order[] = {
ed0026
     0
ed0026
 };
ed0026
 
ed0026
+struct crm_remote_header_v0 
ed0026
+{
ed0026
+    uint64_t id;
ed0026
+    uint64_t flags;
ed0026
+    uint32_t error;
ed0026
+    uint32_t version;
ed0026
+    uint32_t size_total;
ed0026
+    uint32_t payload_uncompressed;
ed0026
+    uint32_t payload_compressed;
ed0026
+};
ed0026
+
ed0026
 int
ed0026
 crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
ed0026
 {
ed0026
@@ -176,97 +189,6 @@ crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
ed0026
 
ed0026
     return rc < 0 ? rc : total_send;
ed0026
 }
ed0026
-
ed0026
-/*!
ed0026
- * \internal
ed0026
- * \brief Read bytes off non blocking tls session.
ed0026
- *
ed0026
- * \param session - tls session to read
ed0026
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
ed0026
- *
ed0026
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
ed0026
- *       This function will return once max_size is met, the socket read buffer
ed0026
- *       is empty, or an error is encountered.
ed0026
- *
ed0026
- * \retval '\0' terminated buffer on success
ed0026
- */
ed0026
-static char *
ed0026
-crm_recv_tls(gnutls_session_t * session, size_t max_size, size_t * recv_len, int *disconnected)
ed0026
-{
ed0026
-    char *buf = NULL;
ed0026
-    int rc = 0;
ed0026
-    size_t len = 0;
ed0026
-    size_t chunk_size = max_size ? max_size : 1024;
ed0026
-    size_t buf_size = 0;
ed0026
-    size_t read_size = 0;
ed0026
-
ed0026
-    if (session == NULL) {
ed0026
-        if (disconnected) {
ed0026
-            *disconnected = 1;
ed0026
-        }
ed0026
-        goto done;
ed0026
-    }
ed0026
-
ed0026
-    buf = calloc(1, chunk_size + 1);
ed0026
-    buf_size = chunk_size;
ed0026
-
ed0026
-    while (TRUE) {
ed0026
-        read_size = buf_size - len;
ed0026
-
ed0026
-        /* automatically grow the buffer when needed if max_size is not set. */
ed0026
-        if (!max_size && (read_size < (chunk_size / 2))) {
ed0026
-            buf_size += chunk_size;
ed0026
-            crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
ed0026
-                      buf_size);
ed0026
-            buf = realloc(buf, buf_size + 1);
ed0026
-            CRM_ASSERT(buf != NULL);
ed0026
-
ed0026
-            read_size = buf_size - len;
ed0026
-        }
ed0026
-
ed0026
-        rc = gnutls_record_recv(*session, buf + len, read_size);
ed0026
-
ed0026
-        if (rc > 0) {
ed0026
-            crm_trace("Got %d more bytes.", rc);
ed0026
-            len += rc;
ed0026
-            /* always null terminate buffer, the +1 to alloc always allows for this. */
ed0026
-            buf[len] = '\0';
ed0026
-        }
ed0026
-        if (max_size && (max_size == read_size)) {
ed0026
-            crm_trace("Buffer max read size %d met", max_size);
ed0026
-            goto done;
ed0026
-        }
ed0026
-
ed0026
-        /* process any errors. */
ed0026
-        if (rc == GNUTLS_E_INTERRUPTED) {
ed0026
-            crm_trace("EINTR encoutered, retry tls read");
ed0026
-        } else if (rc == GNUTLS_E_AGAIN) {
ed0026
-            crm_trace("non-blocking, exiting read on rc = %d", rc);
ed0026
-            goto done;
ed0026
-        } else if (rc <= 0) {
ed0026
-            if (rc == 0) {
ed0026
-                crm_debug("EOF encoutered during TLS read");
ed0026
-            } else {
ed0026
-                crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
ed0026
-            }
ed0026
-            if (disconnected) {
ed0026
-                *disconnected = 1;
ed0026
-            }
ed0026
-            goto done;
ed0026
-        }
ed0026
-    }
ed0026
-
ed0026
-  done:
ed0026
-    if (recv_len) {
ed0026
-        *recv_len = len;
ed0026
-    }
ed0026
-    if (!len) {
ed0026
-        free(buf);
ed0026
-        buf = NULL;
ed0026
-    }
ed0026
-    return buf;
ed0026
-
ed0026
-}
ed0026
 #endif
ed0026
 
ed0026
 static int
ed0026
@@ -310,141 +232,57 @@ crm_send_plaintext(int sock, const char *buf, size_t len)
ed0026
 
ed0026
 }
ed0026
 
ed0026
-/*!
ed0026
- * \internal
ed0026
- * \brief Read bytes off non blocking socket.
ed0026
- *
ed0026
- * \param session - tls session to read
ed0026
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
ed0026
- *
ed0026
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
ed0026
- *       This function will return once max_size is met, the socket read buffer
ed0026
- *       is empty, or an error is encountered.
ed0026
- *
ed0026
- * \retval '\0' terminated buffer on success
ed0026
- */
ed0026
-static char *
ed0026
-crm_recv_plaintext(int sock, size_t max_size, size_t * recv_len, int *disconnected)
ed0026
-{
ed0026
-    char *buf = NULL;
ed0026
-    ssize_t rc = 0;
ed0026
-    ssize_t len = 0;
ed0026
-    ssize_t chunk_size = max_size ? max_size : 1024;
ed0026
-    size_t buf_size = 0;
ed0026
-    size_t read_size = 0;
ed0026
-
ed0026
-    if (sock <= 0) {
ed0026
-        if (disconnected) {
ed0026
-            *disconnected = 1;
ed0026
-        }
ed0026
-        goto done;
ed0026
-    }
ed0026
-
ed0026
-    buf = calloc(1, chunk_size + 1);
ed0026
-    buf_size = chunk_size;
ed0026
-
ed0026
-    while (TRUE) {
ed0026
-        errno = 0;
ed0026
-        read_size = buf_size - len;
ed0026
-
ed0026
-        /* automatically grow the buffer when needed if max_size is not set. */
ed0026
-        if (!max_size && (read_size < (chunk_size / 2))) {
ed0026
-            buf_size += chunk_size;
ed0026
-            crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
ed0026
-                      buf_size);
ed0026
-            buf = realloc(buf, buf_size + 1);
ed0026
-            CRM_ASSERT(buf != NULL);
ed0026
-
ed0026
-            read_size = buf_size - len;
ed0026
-        }
ed0026
-
ed0026
-        rc = read(sock, buf + len, chunk_size);
ed0026
-
ed0026
-        if (rc > 0) {
ed0026
-            crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
ed0026
-            len += rc;
ed0026
-            /* always null terminate buffer, the +1 to alloc always allows for this. */
ed0026
-            buf[len] = '\0';
ed0026
-        }
ed0026
-        if (max_size && (max_size == read_size)) {
ed0026
-            crm_trace("Buffer max read size %d met", max_size);
ed0026
-            goto done;
ed0026
-        }
ed0026
-
ed0026
-        if (rc > 0) {
ed0026
-            continue;
ed0026
-        } else if (rc == 0) {
ed0026
-            if (disconnected) {
ed0026
-                *disconnected = 1;
ed0026
-            }
ed0026
-            crm_trace("EOF encoutered during read");
ed0026
-            goto done;
ed0026
-        }
ed0026
-
ed0026
-        /* process errors */
ed0026
-        if (errno == EINTR) {
ed0026
-            crm_trace("EINTER encoutered, retry socket read.");
ed0026
-        } else if (errno == EAGAIN) {
ed0026
-            crm_trace("non-blocking, exiting read on rc = %d", rc);
ed0026
-            goto done;
ed0026
-        } else if (errno <= 0) {
ed0026
-            if (disconnected) {
ed0026
-                *disconnected = 1;
ed0026
-            }
ed0026
-            crm_debug("Error receiving message: %d", (int)rc);
ed0026
-            goto done;
ed0026
-        }
ed0026
-    }
ed0026
-
ed0026
-  done:
ed0026
-    if (recv_len) {
ed0026
-        *recv_len = len;
ed0026
-    }
ed0026
-    if (!len) {
ed0026
-        free(buf);
ed0026
-        buf = NULL;
ed0026
-    }
ed0026
-    return buf;
ed0026
-}
ed0026
-
ed0026
 static int
ed0026
-crm_remote_send_raw(crm_remote_t * remote, const char *buf, size_t len)
ed0026
+crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
ed0026
 {
ed0026
+    int lpc = 0;
ed0026
     int rc = -ESOCKTNOSUPPORT;
ed0026
 
ed0026
-    if (remote->tcp_socket) {
ed0026
-        rc = crm_send_plaintext(remote->tcp_socket, buf, len);
ed0026
+    for(; lpc < iovs; lpc++) {
ed0026
+        if (remote->tcp_socket) {
ed0026
+            rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
ed0026
 #ifdef HAVE_GNUTLS_GNUTLS_H
ed0026
 
ed0026
-    } else if (remote->tls_session) {
ed0026
-        rc = crm_send_tls(remote->tls_session, buf, len);
ed0026
+        } else if (remote->tls_session) {
ed0026
+            rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
ed0026
 #endif
ed0026
-    } else {
ed0026
-        crm_err("Unsupported connection type");
ed0026
+        } else {
ed0026
+            crm_err("Unsupported connection type");
ed0026
+        }
ed0026
     }
ed0026
     return rc;
ed0026
 }
ed0026
 
ed0026
+#define PCMK_TLS_VERSION 1
ed0026
+
ed0026
 int
ed0026
 crm_remote_send(crm_remote_t * remote, xmlNode * msg)
ed0026
 {
ed0026
+    static uint64_t id = 0;
ed0026
     int rc = -1;
ed0026
-    char *xml_text = NULL;
ed0026
-    int len = 0;
ed0026
+    char *xml_text = dump_xml_unformatted(msg);
ed0026
 
ed0026
-    xml_text = dump_xml_unformatted(msg);
ed0026
-    if (xml_text) {
ed0026
-        len = strlen(xml_text);
ed0026
-    } else {
ed0026
+    struct iovec iov[2];
ed0026
+    struct crm_remote_header_v0 *header = calloc(1, sizeof(struct crm_remote_header_v0));
ed0026
+
ed0026
+    if (xml_text == NULL) {
ed0026
         crm_err("Invalid XML, can not send msg");
ed0026
         return -1;
ed0026
     }
ed0026
 
ed0026
-    rc = crm_remote_send_raw(remote, xml_text, len);
ed0026
-    if (rc >= 0) {
ed0026
-        rc = crm_remote_send_raw(remote, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR));
ed0026
-    }
ed0026
+    iov[0].iov_base = header;
ed0026
+    iov[0].iov_len = sizeof(struct crm_remote_header_v0);
ed0026
+
ed0026
+    iov[1].iov_base = xml_text;
ed0026
+    iov[1].iov_len = 1 + strlen(xml_text);
ed0026
 
ed0026
+    id++;
ed0026
+    header->id = id;
ed0026
+    header->version = PCMK_TLS_VERSION;
ed0026
+    header->size_total = iov[0].iov_len + iov[1].iov_len;
ed0026
+    header->payload_uncompressed = iov[1].iov_len;
ed0026
+
ed0026
+    rc = crm_remote_sendv(remote, iov, 2);
ed0026
     if (rc < 0) {
ed0026
         crm_err("Failed to send remote msg, rc = %d", rc);
ed0026
     }
ed0026
@@ -461,44 +299,55 @@ crm_remote_send(crm_remote_t * remote, xmlNode * msg)
ed0026
 xmlNode *
ed0026
 crm_remote_parse_buffer(crm_remote_t * remote)
ed0026
 {
ed0026
-    char *buf = NULL;
ed0026
-    char *start = NULL;
ed0026
-    char *end = NULL;
ed0026
     xmlNode *xml = NULL;
ed0026
+    size_t offset = sizeof(struct crm_remote_header_v0);
ed0026
+    struct crm_remote_header_v0 *header = NULL;
ed0026
 
ed0026
     if (remote->buffer == NULL) {
ed0026
         return NULL;
ed0026
+
ed0026
+    } else if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
ed0026
+        return NULL;
ed0026
     }
ed0026
 
ed0026
     /* take ownership of the buffer */
ed0026
-    buf = remote->buffer;
ed0026
-    remote->buffer = NULL;
ed0026
-
ed0026
-    /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */
ed0026
-    start = buf;
ed0026
-    end = strstr(start, REMOTE_MSG_TERMINATOR);
ed0026
+    remote->buffer_offset = 0;
ed0026
+
ed0026
+    header = (struct crm_remote_header_v0 *)remote->buffer;
ed0026
+    /* Support compression on the receiving end now, in case we ever want to add it later */
ed0026
+    if (header->payload_compressed) {
ed0026
+        int rc = 0;
ed0026
+        unsigned int size_u = 1 + header->payload_uncompressed;
ed0026
+        char *uncompressed = calloc(1, offset + size_u);
ed0026
+
ed0026
+        crm_trace("Decompressing message data %d bytes into %d bytes",
ed0026
+                 header->payload_compressed, size_u);
ed0026
+
ed0026
+        rc = BZ2_bzBuffToBuffDecompress(uncompressed + offset, &size_u,
ed0026
+                                        remote->buffer + offset,
ed0026
+                                        header->payload_compressed, 1, 0);
ed0026
+
ed0026
+        if (rc != BZ_OK) {
ed0026
+            crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
ed0026
+            free(uncompressed);
ed0026
+            return NULL;
ed0026
+        }
ed0026
 
ed0026
-    while (!xml && end) {
ed0026
+        CRM_ASSERT(size_u == header->payload_uncompressed);
ed0026
 
ed0026
-        /* grab the message */
ed0026
-        end[0] = '\0';
ed0026
-        end += strlen(REMOTE_MSG_TERMINATOR);
ed0026
+        memcpy(uncompressed, remote->buffer, offset);       /* Preserve the header */
ed0026
+        header = (struct crm_remote_header_v0 *)uncompressed;
ed0026
 
ed0026
-        xml = string2xml(start);
ed0026
-        if (xml == NULL) {
ed0026
-            crm_err("Couldn't parse: '%.120s'", start);
ed0026
-        }
ed0026
-        start = end;
ed0026
-        end = strstr(start, REMOTE_MSG_TERMINATOR);
ed0026
+        free(remote->buffer);
ed0026
+        remote->buffer_size = offset + size_u;
ed0026
+        remote->buffer = uncompressed;
ed0026
     }
ed0026
 
ed0026
-    if (xml && start) {
ed0026
-        /* we have msgs left over, save it until next time */
ed0026
-        remote->buffer = strdup(start);
ed0026
-        free(buf);
ed0026
-    } else if (!xml) {
ed0026
-        /* no msg present */
ed0026
-        remote->buffer = buf;
ed0026
+    CRM_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
ed0026
+
ed0026
+    xml = string2xml(remote->buffer + offset);
ed0026
+    if (xml == NULL) {
ed0026
+        crm_err("Couldn't parse: '%.120s'", remote->buffer + offset);
ed0026
     }
ed0026
 
ed0026
     return xml;
ed0026
@@ -559,6 +408,105 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
ed0026
     return rc;
ed0026
 }
ed0026
 
ed0026
+
ed0026
+/*!
ed0026
+ * \internal
ed0026
+ * \brief Read bytes off non blocking remote connection.
ed0026
+ *
ed0026
+ * \note only use with NON-Blocking sockets. Should only be used after polling socket.
ed0026
+ *       This function will return once max_size is met, the socket read buffer
ed0026
+ *       is empty, or an error is encountered.
ed0026
+ *
ed0026
+ * \retval number of bytes received
ed0026
+ */
ed0026
+static size_t
ed0026
+crm_remote_recv_once(crm_remote_t * remote)
ed0026
+{
ed0026
+    int rc = 0;
ed0026
+    size_t read_len = sizeof(struct crm_remote_header_v0);
ed0026
+
ed0026
+    if(remote->buffer_offset >= sizeof(struct crm_remote_header_v0)) {
ed0026
+        struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
ed0026
+
ed0026
+        read_len = hdr->size_total;
ed0026
+    }
ed0026
+
ed0026
+    /* automatically grow the buffer when needed */
ed0026
+    if(remote->buffer_size < read_len) {
ed0026
+        remote->buffer_size = 2 * read_len;
ed0026
+        crm_trace("Expanding buffer to %u bytes", remote->buffer_size);
ed0026
+
ed0026
+        remote->buffer = realloc(remote->buffer, remote->buffer_size + 1);
ed0026
+        CRM_ASSERT(remote->buffer != NULL);
ed0026
+    }
ed0026
+
ed0026
+    if (remote->tcp_socket) {
ed0026
+        errno = 0;
ed0026
+        rc = read(remote->tcp_socket,
ed0026
+                  remote->buffer + remote->buffer_offset,
ed0026
+                  remote->buffer_size - remote->buffer_offset);
ed0026
+        if(rc < 0) {
ed0026
+            rc = -errno;
ed0026
+        }
ed0026
+
ed0026
+#ifdef HAVE_GNUTLS_GNUTLS_H
ed0026
+    } else if (remote->tls_session) {
ed0026
+        rc = gnutls_record_recv(*(remote->tls_session),
ed0026
+                                remote->buffer + remote->buffer_offset,
ed0026
+                                remote->buffer_size - remote->buffer_offset);
ed0026
+        if (rc == GNUTLS_E_INTERRUPTED) {
ed0026
+            rc = -EINTR;
ed0026
+        } else if (rc == GNUTLS_E_AGAIN) {
ed0026
+            rc = -EAGAIN;
ed0026
+        } else if (rc < 0) {
ed0026
+            crm_info("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
ed0026
+            rc = -pcmk_err_generic;
ed0026
+        }
ed0026
+#endif
ed0026
+    } else {
ed0026
+        crm_err("Unsupported connection type");
ed0026
+        return -ESOCKTNOSUPPORT;
ed0026
+    }
ed0026
+
ed0026
+    /* process any errors. */
ed0026
+    if (rc > 0) {
ed0026
+        remote->buffer_offset += rc;
ed0026
+        /* always null terminate buffer, the +1 to alloc always allows for this. */
ed0026
+        remote->buffer[remote->buffer_offset] = '\0';
ed0026
+        crm_trace("Received %u more bytes, %u total", rc, remote->buffer_offset);
ed0026
+
ed0026
+    } else if (rc == -EINTR || rc == -EAGAIN) {
ed0026
+        crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
ed0026
+
ed0026
+    } else if (rc == 0) {
ed0026
+        crm_debug("EOF encoutered after %u bytes", remote->buffer_offset);
ed0026
+        return -ENOTCONN;
ed0026
+
ed0026
+    } else if (rc <= 0) {
ed0026
+        crm_debug("Error receiving message after %u bytes: %s (%d)",
ed0026
+                  remote->buffer_offset, gnutls_strerror(rc), rc);
ed0026
+        return -ENOTCONN;
ed0026
+    }
ed0026
+
ed0026
+    if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
ed0026
+        crm_trace("Not enough data to fill header: %u < %u bytes",
ed0026
+                  remote->buffer_offset, sizeof(struct crm_remote_header_v0));
ed0026
+        return -EAGAIN;
ed0026
+
ed0026
+    } else {
ed0026
+        struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
ed0026
+
ed0026
+        if(remote->buffer_offset < hdr->size_total) {
ed0026
+            crm_trace("Read less than the advertised length: %u < %u bytes",
ed0026
+                      remote->buffer_offset, hdr->size_total);
ed0026
+            return -EAGAIN;
ed0026
+        }
ed0026
+    }
ed0026
+
ed0026
+    crm_trace("Read full message of %u bytes", remote->buffer_offset);
ed0026
+    return remote->buffer_offset;
ed0026
+}
ed0026
+
ed0026
 /*!
ed0026
  * \internal
ed0026
  * \brief Read data off the socket until at least one full message is present or timeout occures.
ed0026
@@ -569,10 +517,8 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
ed0026
 gboolean
ed0026
 crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
ed0026
 {
ed0026
-    int ret;
ed0026
-    size_t request_len = 0;
ed0026
+    int rc;
ed0026
     time_t start = time(NULL);
ed0026
-    char *raw_request = NULL;
ed0026
     int remaining_timeout = 0;
ed0026
 
ed0026
     if (total_timeout == 0) {
ed0026
@@ -588,57 +534,30 @@ crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconne
ed0026
         /* read some more off the tls buffer if we still have time left. */
ed0026
         crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
ed0026
                   total_timeout, remaining_timeout);
ed0026
-        ret = crm_remote_ready(remote, remaining_timeout);
ed0026
-        raw_request = NULL;
ed0026
+        rc = crm_remote_ready(remote, remaining_timeout);
ed0026
 
ed0026
-        if (ret == 0) {
ed0026
+        if (rc == 0) {
ed0026
             crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
ed0026
             return FALSE;
ed0026
 
ed0026
-        } else if (ret < 0) {
ed0026
-            if (errno != EINTR) {
ed0026
-                crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret,
ed0026
-                          errno);
ed0026
-                *disconnected = 1;
ed0026
-                return FALSE;
ed0026
-            }
ed0026
-            crm_debug("poll EINTR encountered during poll, retrying");
ed0026
-
ed0026
-        } else if (remote->tcp_socket) {
ed0026
-            raw_request = crm_recv_plaintext(remote->tcp_socket, 0, &request_len, disconnected);
ed0026
+        } else if(rc < 0) {
ed0026
+            crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);
ed0026
 
ed0026
-#ifdef HAVE_GNUTLS_GNUTLS_H
ed0026
-        } else if (remote->tls_session) {
ed0026
-            raw_request = crm_recv_tls(remote->tls_session, 0, &request_len, disconnected);
ed0026
-#endif
ed0026
         } else {
ed0026
-            crm_err("Unsupported connection type");
ed0026
-        }
ed0026
-
ed0026
-        remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
ed0026
-
ed0026
-        if (!raw_request) {
ed0026
-            crm_debug("Empty msg received after poll");
ed0026
-            continue;
ed0026
+            rc = crm_remote_recv_once(remote);
ed0026
+            if(rc > 0) {
ed0026
+                return TRUE;
ed0026
+            } else if (rc < 0) {
ed0026
+                crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
ed0026
+            }
ed0026
         }
ed0026
 
ed0026
-        if (remote->buffer) {
ed0026
-            int old_len = strlen(remote->buffer);
ed0026
-
ed0026
-            crm_trace("Expanding recv buffer from %d to %d", old_len, old_len + request_len);
ed0026
-
ed0026
-            remote->buffer = realloc(remote->buffer, old_len + request_len + 1);
ed0026
-            memcpy(remote->buffer + old_len, raw_request, request_len);
ed0026
-            *(remote->buffer + old_len + request_len) = '\0';
ed0026
-            free(raw_request);
ed0026
-
ed0026
-        } else {
ed0026
-            remote->buffer = raw_request;
ed0026
+        if(rc == -ENOTCONN) {
ed0026
+            *disconnected = 1;
ed0026
+            return FALSE;
ed0026
         }
ed0026
 
ed0026
-        if (strstr(remote->buffer, REMOTE_MSG_TERMINATOR)) {
ed0026
-            return TRUE;
ed0026
-        }
ed0026
+        remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
ed0026
     }
ed0026
 
ed0026
     return FALSE;