|
|
ed0026 |
commit 330600b4cafd37f51d109728f9377bc921a6aa7f
|
|
|
ed0026 |
Author: Andrew Beekhof <andrew@beekhof.net>
|
|
|
ed0026 |
Date: Mon Oct 14 13:50:41 2013 +1100
|
|
|
ed0026 |
|
|
|
ed0026 |
Feature: remote: Properly version the remote connection protocol
|
|
|
ed0026 |
|
|
|
ed0026 |
(cherry picked from commit aa024490a023bd0d0d5568f08b2c60b61adb15f5)
|
|
|
ed0026 |
|
|
|
ed0026 |
diff --git a/include/crm/common/ipcs.h b/include/crm/common/ipcs.h
|
|
|
ed0026 |
index 3e78d51..84249a3 100644
|
|
|
ed0026 |
--- a/include/crm/common/ipcs.h
|
|
|
ed0026 |
+++ b/include/crm/common/ipcs.h
|
|
|
ed0026 |
@@ -41,14 +41,17 @@ enum client_type {
|
|
|
ed0026 |
struct crm_remote_s {
|
|
|
ed0026 |
/* Shared */
|
|
|
ed0026 |
char *buffer;
|
|
|
ed0026 |
+ size_t buffer_size;
|
|
|
ed0026 |
+ size_t buffer_offset;
|
|
|
ed0026 |
int auth_timeout;
|
|
|
ed0026 |
- bool authenticated; /* CIB-only */
|
|
|
ed0026 |
+ int tcp_socket;
|
|
|
ed0026 |
mainloop_io_t *source;
|
|
|
ed0026 |
|
|
|
ed0026 |
- char *token; /* CIB Only */
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- int tcp_socket;
|
|
|
ed0026 |
+ /* CIB-only */
|
|
|
ed0026 |
+ bool authenticated;
|
|
|
ed0026 |
+ char *token;
|
|
|
ed0026 |
|
|
|
ed0026 |
+ /* TLS only */
|
|
|
ed0026 |
# ifdef HAVE_GNUTLS_GNUTLS_H
|
|
|
ed0026 |
gnutls_session_t *tls_session;
|
|
|
ed0026 |
bool tls_handshake_complete;
|
|
|
ed0026 |
diff --git a/lib/common/remote.c b/lib/common/remote.c
|
|
|
ed0026 |
index 8b00f16..297c75a 100644
|
|
|
ed0026 |
--- a/lib/common/remote.c
|
|
|
ed0026 |
+++ b/lib/common/remote.c
|
|
|
ed0026 |
@@ -34,6 +34,8 @@
|
|
|
ed0026 |
#include <fcntl.h>
|
|
|
ed0026 |
#include <glib.h>
|
|
|
ed0026 |
|
|
|
ed0026 |
+#include <bzlib.h>
|
|
|
ed0026 |
+
|
|
|
ed0026 |
#include <crm/common/ipcs.h>
|
|
|
ed0026 |
#include <crm/common/xml.h>
|
|
|
ed0026 |
#include <crm/common/mainloop.h>
|
|
|
ed0026 |
@@ -57,6 +59,17 @@ const int anon_tls_kx_order[] = {
|
|
|
ed0026 |
0
|
|
|
ed0026 |
};
|
|
|
ed0026 |
|
|
|
ed0026 |
+struct crm_remote_header_v0
|
|
|
ed0026 |
+{
|
|
|
ed0026 |
+ uint64_t id;
|
|
|
ed0026 |
+ uint64_t flags;
|
|
|
ed0026 |
+ uint32_t error;
|
|
|
ed0026 |
+ uint32_t version;
|
|
|
ed0026 |
+ uint32_t size_total;
|
|
|
ed0026 |
+ uint32_t payload_uncompressed;
|
|
|
ed0026 |
+ uint32_t payload_compressed;
|
|
|
ed0026 |
+};
|
|
|
ed0026 |
+
|
|
|
ed0026 |
int
|
|
|
ed0026 |
crm_initiate_client_tls_handshake(crm_remote_t * remote, int timeout_ms)
|
|
|
ed0026 |
{
|
|
|
ed0026 |
@@ -176,97 +189,6 @@ crm_send_tls(gnutls_session_t * session, const char *buf, size_t len)
|
|
|
ed0026 |
|
|
|
ed0026 |
return rc < 0 ? rc : total_send;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
-
|
|
|
ed0026 |
-/*!
|
|
|
ed0026 |
- * \internal
|
|
|
ed0026 |
- * \brief Read bytes off non blocking tls session.
|
|
|
ed0026 |
- *
|
|
|
ed0026 |
- * \param session - tls session to read
|
|
|
ed0026 |
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
|
|
|
ed0026 |
- *
|
|
|
ed0026 |
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
|
|
|
ed0026 |
- * This function will return once max_size is met, the socket read buffer
|
|
|
ed0026 |
- * is empty, or an error is encountered.
|
|
|
ed0026 |
- *
|
|
|
ed0026 |
- * \retval '\0' terminated buffer on success
|
|
|
ed0026 |
- */
|
|
|
ed0026 |
-static char *
|
|
|
ed0026 |
-crm_recv_tls(gnutls_session_t * session, size_t max_size, size_t * recv_len, int *disconnected)
|
|
|
ed0026 |
-{
|
|
|
ed0026 |
- char *buf = NULL;
|
|
|
ed0026 |
- int rc = 0;
|
|
|
ed0026 |
- size_t len = 0;
|
|
|
ed0026 |
- size_t chunk_size = max_size ? max_size : 1024;
|
|
|
ed0026 |
- size_t buf_size = 0;
|
|
|
ed0026 |
- size_t read_size = 0;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- if (session == NULL) {
|
|
|
ed0026 |
- if (disconnected) {
|
|
|
ed0026 |
- *disconnected = 1;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- buf = calloc(1, chunk_size + 1);
|
|
|
ed0026 |
- buf_size = chunk_size;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- while (TRUE) {
|
|
|
ed0026 |
- read_size = buf_size - len;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- /* automatically grow the buffer when needed if max_size is not set. */
|
|
|
ed0026 |
- if (!max_size && (read_size < (chunk_size / 2))) {
|
|
|
ed0026 |
- buf_size += chunk_size;
|
|
|
ed0026 |
- crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
|
|
|
ed0026 |
- buf_size);
|
|
|
ed0026 |
- buf = realloc(buf, buf_size + 1);
|
|
|
ed0026 |
- CRM_ASSERT(buf != NULL);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- read_size = buf_size - len;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- rc = gnutls_record_recv(*session, buf + len, read_size);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- if (rc > 0) {
|
|
|
ed0026 |
- crm_trace("Got %d more bytes.", rc);
|
|
|
ed0026 |
- len += rc;
|
|
|
ed0026 |
- /* always null terminate buffer, the +1 to alloc always allows for this. */
|
|
|
ed0026 |
- buf[len] = '\0';
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- if (max_size && (max_size == read_size)) {
|
|
|
ed0026 |
- crm_trace("Buffer max read size %d met", max_size);
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- /* process any errors. */
|
|
|
ed0026 |
- if (rc == GNUTLS_E_INTERRUPTED) {
|
|
|
ed0026 |
- crm_trace("EINTR encoutered, retry tls read");
|
|
|
ed0026 |
- } else if (rc == GNUTLS_E_AGAIN) {
|
|
|
ed0026 |
- crm_trace("non-blocking, exiting read on rc = %d", rc);
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- } else if (rc <= 0) {
|
|
|
ed0026 |
- if (rc == 0) {
|
|
|
ed0026 |
- crm_debug("EOF encoutered during TLS read");
|
|
|
ed0026 |
- } else {
|
|
|
ed0026 |
- crm_debug("Error receiving message: %s (%d)", gnutls_strerror(rc), rc);
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- if (disconnected) {
|
|
|
ed0026 |
- *disconnected = 1;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- done:
|
|
|
ed0026 |
- if (recv_len) {
|
|
|
ed0026 |
- *recv_len = len;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- if (!len) {
|
|
|
ed0026 |
- free(buf);
|
|
|
ed0026 |
- buf = NULL;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- return buf;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
-}
|
|
|
ed0026 |
#endif
|
|
|
ed0026 |
|
|
|
ed0026 |
static int
|
|
|
ed0026 |
@@ -310,141 +232,57 @@ crm_send_plaintext(int sock, const char *buf, size_t len)
|
|
|
ed0026 |
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
-/*!
|
|
|
ed0026 |
- * \internal
|
|
|
ed0026 |
- * \brief Read bytes off non blocking socket.
|
|
|
ed0026 |
- *
|
|
|
ed0026 |
- * \param session - tls session to read
|
|
|
ed0026 |
- * \param max_size - max bytes allowed to read for buffer. 0 assumes no limit
|
|
|
ed0026 |
- *
|
|
|
ed0026 |
- * \note only use with NON-Blocking sockets. Should only be used after polling socket.
|
|
|
ed0026 |
- * This function will return once max_size is met, the socket read buffer
|
|
|
ed0026 |
- * is empty, or an error is encountered.
|
|
|
ed0026 |
- *
|
|
|
ed0026 |
- * \retval '\0' terminated buffer on success
|
|
|
ed0026 |
- */
|
|
|
ed0026 |
-static char *
|
|
|
ed0026 |
-crm_recv_plaintext(int sock, size_t max_size, size_t * recv_len, int *disconnected)
|
|
|
ed0026 |
-{
|
|
|
ed0026 |
- char *buf = NULL;
|
|
|
ed0026 |
- ssize_t rc = 0;
|
|
|
ed0026 |
- ssize_t len = 0;
|
|
|
ed0026 |
- ssize_t chunk_size = max_size ? max_size : 1024;
|
|
|
ed0026 |
- size_t buf_size = 0;
|
|
|
ed0026 |
- size_t read_size = 0;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- if (sock <= 0) {
|
|
|
ed0026 |
- if (disconnected) {
|
|
|
ed0026 |
- *disconnected = 1;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- buf = calloc(1, chunk_size + 1);
|
|
|
ed0026 |
- buf_size = chunk_size;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- while (TRUE) {
|
|
|
ed0026 |
- errno = 0;
|
|
|
ed0026 |
- read_size = buf_size - len;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- /* automatically grow the buffer when needed if max_size is not set. */
|
|
|
ed0026 |
- if (!max_size && (read_size < (chunk_size / 2))) {
|
|
|
ed0026 |
- buf_size += chunk_size;
|
|
|
ed0026 |
- crm_trace("Grow buffer by %d more bytes. buf is now %d bytes", (int)chunk_size,
|
|
|
ed0026 |
- buf_size);
|
|
|
ed0026 |
- buf = realloc(buf, buf_size + 1);
|
|
|
ed0026 |
- CRM_ASSERT(buf != NULL);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- read_size = buf_size - len;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- rc = read(sock, buf + len, chunk_size);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- if (rc > 0) {
|
|
|
ed0026 |
- crm_trace("Got %d more bytes. errno=%d", (int)rc, errno);
|
|
|
ed0026 |
- len += rc;
|
|
|
ed0026 |
- /* always null terminate buffer, the +1 to alloc always allows for this. */
|
|
|
ed0026 |
- buf[len] = '\0';
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- if (max_size && (max_size == read_size)) {
|
|
|
ed0026 |
- crm_trace("Buffer max read size %d met", max_size);
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- if (rc > 0) {
|
|
|
ed0026 |
- continue;
|
|
|
ed0026 |
- } else if (rc == 0) {
|
|
|
ed0026 |
- if (disconnected) {
|
|
|
ed0026 |
- *disconnected = 1;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- crm_trace("EOF encoutered during read");
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- /* process errors */
|
|
|
ed0026 |
- if (errno == EINTR) {
|
|
|
ed0026 |
- crm_trace("EINTER encoutered, retry socket read.");
|
|
|
ed0026 |
- } else if (errno == EAGAIN) {
|
|
|
ed0026 |
- crm_trace("non-blocking, exiting read on rc = %d", rc);
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- } else if (errno <= 0) {
|
|
|
ed0026 |
- if (disconnected) {
|
|
|
ed0026 |
- *disconnected = 1;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- crm_debug("Error receiving message: %d", (int)rc);
|
|
|
ed0026 |
- goto done;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- done:
|
|
|
ed0026 |
- if (recv_len) {
|
|
|
ed0026 |
- *recv_len = len;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- if (!len) {
|
|
|
ed0026 |
- free(buf);
|
|
|
ed0026 |
- buf = NULL;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- return buf;
|
|
|
ed0026 |
-}
|
|
|
ed0026 |
-
|
|
|
ed0026 |
static int
|
|
|
ed0026 |
-crm_remote_send_raw(crm_remote_t * remote, const char *buf, size_t len)
|
|
|
ed0026 |
+crm_remote_sendv(crm_remote_t * remote, struct iovec * iov, int iovs)
|
|
|
ed0026 |
{
|
|
|
ed0026 |
+ int lpc = 0;
|
|
|
ed0026 |
int rc = -ESOCKTNOSUPPORT;
|
|
|
ed0026 |
|
|
|
ed0026 |
- if (remote->tcp_socket) {
|
|
|
ed0026 |
- rc = crm_send_plaintext(remote->tcp_socket, buf, len);
|
|
|
ed0026 |
+ for(; lpc < iovs; lpc++) {
|
|
|
ed0026 |
+ if (remote->tcp_socket) {
|
|
|
ed0026 |
+ rc = crm_send_plaintext(remote->tcp_socket, iov[lpc].iov_base, iov[lpc].iov_len);
|
|
|
ed0026 |
#ifdef HAVE_GNUTLS_GNUTLS_H
|
|
|
ed0026 |
|
|
|
ed0026 |
- } else if (remote->tls_session) {
|
|
|
ed0026 |
- rc = crm_send_tls(remote->tls_session, buf, len);
|
|
|
ed0026 |
+ } else if (remote->tls_session) {
|
|
|
ed0026 |
+ rc = crm_send_tls(remote->tls_session, iov[lpc].iov_base, iov[lpc].iov_len);
|
|
|
ed0026 |
#endif
|
|
|
ed0026 |
- } else {
|
|
|
ed0026 |
- crm_err("Unsupported connection type");
|
|
|
ed0026 |
+ } else {
|
|
|
ed0026 |
+ crm_err("Unsupported connection type");
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
}
|
|
|
ed0026 |
return rc;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
+#define PCMK_TLS_VERSION 1
|
|
|
ed0026 |
+
|
|
|
ed0026 |
int
|
|
|
ed0026 |
crm_remote_send(crm_remote_t * remote, xmlNode * msg)
|
|
|
ed0026 |
{
|
|
|
ed0026 |
+ static uint64_t id = 0;
|
|
|
ed0026 |
int rc = -1;
|
|
|
ed0026 |
- char *xml_text = NULL;
|
|
|
ed0026 |
- int len = 0;
|
|
|
ed0026 |
+ char *xml_text = dump_xml_unformatted(msg);
|
|
|
ed0026 |
|
|
|
ed0026 |
- xml_text = dump_xml_unformatted(msg);
|
|
|
ed0026 |
- if (xml_text) {
|
|
|
ed0026 |
- len = strlen(xml_text);
|
|
|
ed0026 |
- } else {
|
|
|
ed0026 |
+ struct iovec iov[2];
|
|
|
ed0026 |
+ struct crm_remote_header_v0 *header = calloc(1, sizeof(struct crm_remote_header_v0));
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ if (xml_text == NULL) {
|
|
|
ed0026 |
crm_err("Invalid XML, can not send msg");
|
|
|
ed0026 |
return -1;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
- rc = crm_remote_send_raw(remote, xml_text, len);
|
|
|
ed0026 |
- if (rc >= 0) {
|
|
|
ed0026 |
- rc = crm_remote_send_raw(remote, REMOTE_MSG_TERMINATOR, strlen(REMOTE_MSG_TERMINATOR));
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
+ iov[0].iov_base = header;
|
|
|
ed0026 |
+ iov[0].iov_len = sizeof(struct crm_remote_header_v0);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ iov[1].iov_base = xml_text;
|
|
|
ed0026 |
+ iov[1].iov_len = 1 + strlen(xml_text);
|
|
|
ed0026 |
|
|
|
ed0026 |
+ id++;
|
|
|
ed0026 |
+ header->id = id;
|
|
|
ed0026 |
+ header->version = PCMK_TLS_VERSION;
|
|
|
ed0026 |
+ header->size_total = iov[0].iov_len + iov[1].iov_len;
|
|
|
ed0026 |
+ header->payload_uncompressed = iov[1].iov_len;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ rc = crm_remote_sendv(remote, iov, 2);
|
|
|
ed0026 |
if (rc < 0) {
|
|
|
ed0026 |
crm_err("Failed to send remote msg, rc = %d", rc);
|
|
|
ed0026 |
}
|
|
|
ed0026 |
@@ -461,44 +299,55 @@ crm_remote_send(crm_remote_t * remote, xmlNode * msg)
|
|
|
ed0026 |
xmlNode *
|
|
|
ed0026 |
crm_remote_parse_buffer(crm_remote_t * remote)
|
|
|
ed0026 |
{
|
|
|
ed0026 |
- char *buf = NULL;
|
|
|
ed0026 |
- char *start = NULL;
|
|
|
ed0026 |
- char *end = NULL;
|
|
|
ed0026 |
xmlNode *xml = NULL;
|
|
|
ed0026 |
+ size_t offset = sizeof(struct crm_remote_header_v0);
|
|
|
ed0026 |
+ struct crm_remote_header_v0 *header = NULL;
|
|
|
ed0026 |
|
|
|
ed0026 |
if (remote->buffer == NULL) {
|
|
|
ed0026 |
return NULL;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ } else if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
|
|
|
ed0026 |
+ return NULL;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
/* take ownership of the buffer */
|
|
|
ed0026 |
- buf = remote->buffer;
|
|
|
ed0026 |
- remote->buffer = NULL;
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- /* MSGS are separated by a '\r\n\r\n'. Split a message off the buffer and return it. */
|
|
|
ed0026 |
- start = buf;
|
|
|
ed0026 |
- end = strstr(start, REMOTE_MSG_TERMINATOR);
|
|
|
ed0026 |
+ remote->buffer_offset = 0;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ header = (struct crm_remote_header_v0 *)remote->buffer;
|
|
|
ed0026 |
+ /* Support compression on the receiving end now, in case we ever want to add it later */
|
|
|
ed0026 |
+ if (header->payload_compressed) {
|
|
|
ed0026 |
+ int rc = 0;
|
|
|
ed0026 |
+ unsigned int size_u = 1 + header->payload_uncompressed;
|
|
|
ed0026 |
+ char *uncompressed = calloc(1, offset + size_u);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ crm_trace("Decompressing message data %d bytes into %d bytes",
|
|
|
ed0026 |
+ header->payload_compressed, size_u);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ rc = BZ2_bzBuffToBuffDecompress(uncompressed + offset, &size_u,
|
|
|
ed0026 |
+ remote->buffer + offset,
|
|
|
ed0026 |
+ header->payload_compressed, 1, 0);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ if (rc != BZ_OK) {
|
|
|
ed0026 |
+ crm_err("Decompression failed: %s (%d)", bz2_strerror(rc), rc);
|
|
|
ed0026 |
+ free(uncompressed);
|
|
|
ed0026 |
+ return NULL;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
|
|
|
ed0026 |
- while (!xml && end) {
|
|
|
ed0026 |
+ CRM_ASSERT(size_u == header->payload_uncompressed);
|
|
|
ed0026 |
|
|
|
ed0026 |
- /* grab the message */
|
|
|
ed0026 |
- end[0] = '\0';
|
|
|
ed0026 |
- end += strlen(REMOTE_MSG_TERMINATOR);
|
|
|
ed0026 |
+ memcpy(uncompressed, remote->buffer, offset); /* Preserve the header */
|
|
|
ed0026 |
+ header = (struct crm_remote_header_v0 *)uncompressed;
|
|
|
ed0026 |
|
|
|
ed0026 |
- xml = string2xml(start);
|
|
|
ed0026 |
- if (xml == NULL) {
|
|
|
ed0026 |
- crm_err("Couldn't parse: '%.120s'", start);
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- start = end;
|
|
|
ed0026 |
- end = strstr(start, REMOTE_MSG_TERMINATOR);
|
|
|
ed0026 |
+ free(remote->buffer);
|
|
|
ed0026 |
+ remote->buffer_size = offset + size_u;
|
|
|
ed0026 |
+ remote->buffer = uncompressed;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
- if (xml && start) {
|
|
|
ed0026 |
- /* we have msgs left over, save it until next time */
|
|
|
ed0026 |
- remote->buffer = strdup(start);
|
|
|
ed0026 |
- free(buf);
|
|
|
ed0026 |
- } else if (!xml) {
|
|
|
ed0026 |
- /* no msg present */
|
|
|
ed0026 |
- remote->buffer = buf;
|
|
|
ed0026 |
+ CRM_ASSERT(remote->buffer[sizeof(struct crm_remote_header_v0) + header->payload_uncompressed - 1] == 0);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ xml = string2xml(remote->buffer + offset);
|
|
|
ed0026 |
+ if (xml == NULL) {
|
|
|
ed0026 |
+ crm_err("Couldn't parse: '%.120s'", remote->buffer + offset);
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
return xml;
|
|
|
ed0026 |
@@ -559,6 +408,105 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
|
|
|
ed0026 |
return rc;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+/*!
|
|
|
ed0026 |
+ * \internal
|
|
|
ed0026 |
+ * \brief Read bytes off non blocking remote connection.
|
|
|
ed0026 |
+ *
|
|
|
ed0026 |
+ * \note only use with NON-Blocking sockets. Should only be used after polling socket.
|
|
|
ed0026 |
+ * This function will return once max_size is met, the socket read buffer
|
|
|
ed0026 |
+ * is empty, or an error is encountered.
|
|
|
ed0026 |
+ *
|
|
|
ed0026 |
+ * \retval number of bytes received
|
|
|
ed0026 |
+ */
|
|
|
ed0026 |
+static size_t
|
|
|
ed0026 |
+crm_remote_recv_once(crm_remote_t * remote)
|
|
|
ed0026 |
+{
|
|
|
ed0026 |
+ int rc = 0;
|
|
|
ed0026 |
+ size_t read_len = sizeof(struct crm_remote_header_v0);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ if(remote->buffer_offset >= sizeof(struct crm_remote_header_v0)) {
|
|
|
ed0026 |
+ struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ read_len = hdr->size_total;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ /* automatically grow the buffer when needed */
|
|
|
ed0026 |
+ if(remote->buffer_size < read_len) {
|
|
|
ed0026 |
+ remote->buffer_size = 2 * read_len;
|
|
|
ed0026 |
+ crm_trace("Expanding buffer to %u bytes", remote->buffer_size);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ remote->buffer = realloc(remote->buffer, remote->buffer_size + 1);
|
|
|
ed0026 |
+ CRM_ASSERT(remote->buffer != NULL);
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ if (remote->tcp_socket) {
|
|
|
ed0026 |
+ errno = 0;
|
|
|
ed0026 |
+ rc = read(remote->tcp_socket,
|
|
|
ed0026 |
+ remote->buffer + remote->buffer_offset,
|
|
|
ed0026 |
+ remote->buffer_size - remote->buffer_offset);
|
|
|
ed0026 |
+ if(rc < 0) {
|
|
|
ed0026 |
+ rc = -errno;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+#ifdef HAVE_GNUTLS_GNUTLS_H
|
|
|
ed0026 |
+ } else if (remote->tls_session) {
|
|
|
ed0026 |
+ rc = gnutls_record_recv(*(remote->tls_session),
|
|
|
ed0026 |
+ remote->buffer + remote->buffer_offset,
|
|
|
ed0026 |
+ remote->buffer_size - remote->buffer_offset);
|
|
|
ed0026 |
+ if (rc == GNUTLS_E_INTERRUPTED) {
|
|
|
ed0026 |
+ rc = -EINTR;
|
|
|
ed0026 |
+ } else if (rc == GNUTLS_E_AGAIN) {
|
|
|
ed0026 |
+ rc = -EAGAIN;
|
|
|
ed0026 |
+ } else if (rc < 0) {
|
|
|
ed0026 |
+ crm_info("TLS receive failed: %s (%d)", gnutls_strerror(rc), rc);
|
|
|
ed0026 |
+ rc = -pcmk_err_generic;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+#endif
|
|
|
ed0026 |
+ } else {
|
|
|
ed0026 |
+ crm_err("Unsupported connection type");
|
|
|
ed0026 |
+ return -ESOCKTNOSUPPORT;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ /* process any errors. */
|
|
|
ed0026 |
+ if (rc > 0) {
|
|
|
ed0026 |
+ remote->buffer_offset += rc;
|
|
|
ed0026 |
+ /* always null terminate buffer, the +1 to alloc always allows for this. */
|
|
|
ed0026 |
+ remote->buffer[remote->buffer_offset] = '\0';
|
|
|
ed0026 |
+ crm_trace("Received %u more bytes, %u total", rc, remote->buffer_offset);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ } else if (rc == -EINTR || rc == -EAGAIN) {
|
|
|
ed0026 |
+ crm_trace("non-blocking, exiting read: %s (%d)", pcmk_strerror(rc), rc);
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ } else if (rc == 0) {
|
|
|
ed0026 |
+ crm_debug("EOF encoutered after %u bytes", remote->buffer_offset);
|
|
|
ed0026 |
+ return -ENOTCONN;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ } else if (rc <= 0) {
|
|
|
ed0026 |
+ crm_debug("Error receiving message after %u bytes: %s (%d)",
|
|
|
ed0026 |
+ remote->buffer_offset, gnutls_strerror(rc), rc);
|
|
|
ed0026 |
+ return -ENOTCONN;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ if(remote->buffer_offset < sizeof(struct crm_remote_header_v0)) {
|
|
|
ed0026 |
+ crm_trace("Not enough data to fill header: %u < %u bytes",
|
|
|
ed0026 |
+ remote->buffer_offset, sizeof(struct crm_remote_header_v0));
|
|
|
ed0026 |
+ return -EAGAIN;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ } else {
|
|
|
ed0026 |
+ struct crm_remote_header_v0 *hdr = (struct crm_remote_header_v0 *)remote->buffer;
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ if(remote->buffer_offset < hdr->size_total) {
|
|
|
ed0026 |
+ crm_trace("Read less than the advertised length: %u < %u bytes",
|
|
|
ed0026 |
+ remote->buffer_offset, hdr->size_total);
|
|
|
ed0026 |
+ return -EAGAIN;
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
+
|
|
|
ed0026 |
+ crm_trace("Read full message of %u bytes", remote->buffer_offset);
|
|
|
ed0026 |
+ return remote->buffer_offset;
|
|
|
ed0026 |
+}
|
|
|
ed0026 |
+
|
|
|
ed0026 |
/*!
|
|
|
ed0026 |
* \internal
|
|
|
ed0026 |
* \brief Read data off the socket until at least one full message is present or timeout occures.
|
|
|
ed0026 |
@@ -569,10 +517,8 @@ crm_remote_ready(crm_remote_t * remote, int timeout /* ms */ )
|
|
|
ed0026 |
gboolean
|
|
|
ed0026 |
crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconnected)
|
|
|
ed0026 |
{
|
|
|
ed0026 |
- int ret;
|
|
|
ed0026 |
- size_t request_len = 0;
|
|
|
ed0026 |
+ int rc;
|
|
|
ed0026 |
time_t start = time(NULL);
|
|
|
ed0026 |
- char *raw_request = NULL;
|
|
|
ed0026 |
int remaining_timeout = 0;
|
|
|
ed0026 |
|
|
|
ed0026 |
if (total_timeout == 0) {
|
|
|
ed0026 |
@@ -588,57 +534,30 @@ crm_remote_recv(crm_remote_t * remote, int total_timeout /*ms */ , int *disconne
|
|
|
ed0026 |
/* read some more off the tls buffer if we still have time left. */
|
|
|
ed0026 |
crm_trace("waiting to receive remote msg, starting timeout %d, remaining_timeout %d",
|
|
|
ed0026 |
total_timeout, remaining_timeout);
|
|
|
ed0026 |
- ret = crm_remote_ready(remote, remaining_timeout);
|
|
|
ed0026 |
- raw_request = NULL;
|
|
|
ed0026 |
+ rc = crm_remote_ready(remote, remaining_timeout);
|
|
|
ed0026 |
|
|
|
ed0026 |
- if (ret == 0) {
|
|
|
ed0026 |
+ if (rc == 0) {
|
|
|
ed0026 |
crm_err("poll timed out (%d ms) while waiting to receive msg", remaining_timeout);
|
|
|
ed0026 |
return FALSE;
|
|
|
ed0026 |
|
|
|
ed0026 |
- } else if (ret < 0) {
|
|
|
ed0026 |
- if (errno != EINTR) {
|
|
|
ed0026 |
- crm_debug("poll returned error while waiting for msg, rc: %d, errno: %d", ret,
|
|
|
ed0026 |
- errno);
|
|
|
ed0026 |
- *disconnected = 1;
|
|
|
ed0026 |
- return FALSE;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
- crm_debug("poll EINTR encountered during poll, retrying");
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- } else if (remote->tcp_socket) {
|
|
|
ed0026 |
- raw_request = crm_recv_plaintext(remote->tcp_socket, 0, &request_len, disconnected);
|
|
|
ed0026 |
+ } else if(rc < 0) {
|
|
|
ed0026 |
+ crm_debug("poll() failed: %s (%d)", pcmk_strerror(rc), rc);
|
|
|
ed0026 |
|
|
|
ed0026 |
-#ifdef HAVE_GNUTLS_GNUTLS_H
|
|
|
ed0026 |
- } else if (remote->tls_session) {
|
|
|
ed0026 |
- raw_request = crm_recv_tls(remote->tls_session, 0, &request_len, disconnected);
|
|
|
ed0026 |
-#endif
|
|
|
ed0026 |
} else {
|
|
|
ed0026 |
- crm_err("Unsupported connection type");
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- if (!raw_request) {
|
|
|
ed0026 |
- crm_debug("Empty msg received after poll");
|
|
|
ed0026 |
- continue;
|
|
|
ed0026 |
+ rc = crm_remote_recv_once(remote);
|
|
|
ed0026 |
+ if(rc > 0) {
|
|
|
ed0026 |
+ return TRUE;
|
|
|
ed0026 |
+ } else if (rc < 0) {
|
|
|
ed0026 |
+ crm_debug("recv() failed: %s (%d)", pcmk_strerror(rc), rc);
|
|
|
ed0026 |
+ }
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
- if (remote->buffer) {
|
|
|
ed0026 |
- int old_len = strlen(remote->buffer);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- crm_trace("Expanding recv buffer from %d to %d", old_len, old_len + request_len);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- remote->buffer = realloc(remote->buffer, old_len + request_len + 1);
|
|
|
ed0026 |
- memcpy(remote->buffer + old_len, raw_request, request_len);
|
|
|
ed0026 |
- *(remote->buffer + old_len + request_len) = '\0';
|
|
|
ed0026 |
- free(raw_request);
|
|
|
ed0026 |
-
|
|
|
ed0026 |
- } else {
|
|
|
ed0026 |
- remote->buffer = raw_request;
|
|
|
ed0026 |
+ if(rc == -ENOTCONN) {
|
|
|
ed0026 |
+ *disconnected = 1;
|
|
|
ed0026 |
+ return FALSE;
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
- if (strstr(remote->buffer, REMOTE_MSG_TERMINATOR)) {
|
|
|
ed0026 |
- return TRUE;
|
|
|
ed0026 |
- }
|
|
|
ed0026 |
+ remaining_timeout = remaining_timeout - ((time(NULL) - start) * 1000);
|
|
|
ed0026 |
}
|
|
|
ed0026 |
|
|
|
ed0026 |
return FALSE;
|