Blame SOURCES/client-Switch-to-non-blocking-sockets.patch

c530df
From 3d08f71f576a381955f07a91198f5dcb320026ba Mon Sep 17 00:00:00 2001
c530df
From: Alexander Scheel <ascheel@redhat.com>
c530df
Date: Wed, 2 Aug 2017 15:11:49 -0400
c530df
Subject: [PATCH] [client] Switch to non-blocking sockets
c530df
c530df
Switch the gssproxy client library to non-blocking sockets, allowing
c530df
for timeout and retry operations.  The client will automatically retry
c530df
both send() and recv() operations three times on ETIMEDOUT.  If the
c530df
combined send() and recv() hit the three time limit, ETIMEDOUT will be
c530df
exposed to the caller in the minor status.
c530df
c530df
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
c530df
Reviewed-by: Simo Sorce <simo@redhat.com>
c530df
[rharwood@redhat.com: commit message cleanups, rebased]
c530df
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
c530df
(cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
c530df
---
c530df
 proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++++++++++---
c530df
 1 file changed, 295 insertions(+), 22 deletions(-)
c530df
c530df
diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
c530df
index 2133618..dba23a6 100644
c530df
--- a/proxy/src/client/gpm_common.c
c530df
+++ b/proxy/src/client/gpm_common.c
c530df
@@ -7,9 +7,15 @@
c530df
 #include <stdlib.h>
c530df
 #include <time.h>
c530df
 #include <pthread.h>
c530df
+#include <sys/epoll.h>
c530df
+#include <fcntl.h>
c530df
+#include <sys/timerfd.h>
c530df
 
c530df
 #define FRAGMENT_BIT (1 << 31)
c530df
 
c530df
+#define RESPONSE_TIMEOUT 15
c530df
+#define MAX_TIMEOUT_RETRY 3
c530df
+
c530df
 struct gpm_ctx {
c530df
     pthread_mutex_t lock;
c530df
     int fd;
c530df
@@ -20,6 +26,9 @@ struct gpm_ctx {
c530df
     gid_t gid;
c530df
 
c530df
     int next_xid;
c530df
+
c530df
+    int epollfd;
c530df
+    int timerfd;
c530df
 };
c530df
 
c530df
 /* a single global struct is not particularly efficient,
c530df
@@ -39,6 +48,8 @@ static void gpm_init_once(void)
c530df
     pthread_mutex_init(&gpm_global_ctx.lock, &attr);
c530df
 
c530df
     gpm_global_ctx.fd = -1;
c530df
+    gpm_global_ctx.epollfd = -1;
c530df
+    gpm_global_ctx.timerfd = -1;
c530df
 
c530df
     seedp = time(NULL) + getpid() + pthread_self();
c530df
     gpm_global_ctx.next_xid = rand_r(&seedp);
c530df
@@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
c530df
     struct sockaddr_un addr = {0};
c530df
     char name[PATH_MAX];
c530df
     int ret;
c530df
+    unsigned flags;
c530df
     int fd = -1;
c530df
 
c530df
     ret = get_pipe_name(name);
c530df
@@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
c530df
         goto done;
c530df
     }
c530df
 
c530df
+    ret = fcntl(fd, F_GETFD, &flags);
c530df
+    if (ret != 0) {
c530df
+        ret = errno;
c530df
+        goto done;
c530df
+    }
c530df
+
c530df
+    ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
c530df
+    if (ret != 0) {
c530df
+        ret = errno;
c530df
+        goto done;
c530df
+    }
c530df
+
c530df
     ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
c530df
     if (ret == -1) {
c530df
         ret = errno;
c530df
@@ -163,6 +187,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
c530df
     return pthread_mutex_unlock(&gpmctx->lock);
c530df
 }
c530df
 
c530df
+static void gpm_timer_close(struct gpm_ctx *gpmctx) {
c530df
+    if (gpmctx->timerfd < 0) {
c530df
+        return;
c530df
+    }
c530df
+
c530df
+    close(gpmctx->timerfd);
c530df
+    gpmctx->timerfd = -1;
c530df
+}
c530df
+
c530df
+static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
c530df
+    int ret;
c530df
+    struct itimerspec its;
c530df
+
c530df
+    if (gpmctx->timerfd >= 0) {
c530df
+        gpm_timer_close(gpmctx);
c530df
+    }
c530df
+
c530df
+    gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
c530df
+    if (gpmctx->timerfd < 0) {
c530df
+        return errno;
c530df
+    }
c530df
+
c530df
+    its.it_interval.tv_sec = timeout_seconds;
c530df
+    its.it_interval.tv_nsec = 0;
c530df
+    its.it_value.tv_sec = timeout_seconds;
c530df
+    its.it_value.tv_nsec = 0;
c530df
+
c530df
+    ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
c530df
+    if (ret) {
c530df
+        ret = errno;
c530df
+        gpm_timer_close(gpmctx);
c530df
+        return ret;
c530df
+    }
c530df
+
c530df
+    return 0;
c530df
+}
c530df
+
c530df
+static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
c530df
+    if (gpmctx->epollfd < 0) {
c530df
+        return;
c530df
+    }
c530df
+
c530df
+    close(gpmctx->epollfd);
c530df
+    gpmctx->epollfd = -1;
c530df
+}
c530df
+
c530df
+static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
c530df
+    struct epoll_event ev;
c530df
+    int ret;
c530df
+
c530df
+    if (gpmctx->epollfd >= 0) {
c530df
+        gpm_epoll_close(gpmctx);
c530df
+    }
c530df
+
c530df
+    gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
c530df
+    if (gpmctx->epollfd == -1) {
c530df
+        return errno;
c530df
+    }
c530df
+
c530df
+    /* Add timer */
c530df
+    ev.events = EPOLLIN;
c530df
+    ev.data.fd = gpmctx->timerfd;
c530df
+    ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev);
c530df
+    if (ret == -1) {
c530df
+        ret = errno;
c530df
+        gpm_epoll_close(gpmctx);
c530df
+        return ret;
c530df
+    }
c530df
+
c530df
+    return ret;
c530df
+}
c530df
+
c530df
+static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
c530df
+    int ret;
c530df
+    int epoll_ret;
c530df
+    struct epoll_event ev;
c530df
+    struct epoll_event events[2];
c530df
+    uint64_t timer_read;
c530df
+
c530df
+    if (gpmctx->epollfd < 0) {
c530df
+        ret = gpm_epoll_setup(gpmctx);
c530df
+        if (ret)
c530df
+            return ret;
c530df
+    }
c530df
+
c530df
+    ev.events = event_flags;
c530df
+    ev.data.fd = gpmctx->fd;
c530df
+    epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev);
c530df
+    if (epoll_ret == -1) {
c530df
+        ret = errno;
c530df
+        gpm_epoll_close(gpmctx);
c530df
+        return ret;
c530df
+    }
c530df
+
c530df
+    do {
c530df
+        epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
c530df
+    } while (epoll_ret < 0 && errno == EINTR);
c530df
+
c530df
+    if (epoll_ret < 0) {
c530df
+        /* Error while waiting that isn't EINTR */
c530df
+        ret = errno;
c530df
+        gpm_epoll_close(gpmctx);
c530df
+    } else if (epoll_ret == 0) {
c530df
+        /* Shouldn't happen as timeout == -1; treat it like a timeout
c530df
+         * occurred. */
c530df
+        ret = ETIMEDOUT;
c530df
+        gpm_epoll_close(gpmctx);
c530df
+    } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
c530df
+        /* Got an event which is only our timer */
c530df
+        ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
c530df
+        if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
c530df
+            /* In the case when reading from the timer failed, don't hide the
c530df
+             * timer error behind ETIMEDOUT such that it isn't retried */
c530df
+            ret = errno;
c530df
+        } else {
c530df
+            /* If ret == 0, then we definitely timed out. Else, if ret == -1
c530df
+             * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
c530df
+             * edge case where epoll thinks the timer can be read, but it
c530df
+             * is blocking more; treat it like a TIMEOUT and retry, as
c530df
+             * nothing around us would handle EAGAIN from timer and retry
c530df
+             * it. */
c530df
+            ret = ETIMEDOUT;
c530df
+        }
c530df
+        gpm_epoll_close(gpmctx);
c530df
+    } else {
c530df
+        /* If ret == 2, then we ignore the timerfd; that way if the next
c530df
+         * operation cannot be performed immediately, we timeout and retry.
c530df
+         * If ret == 1 and data.fd == gpmctx->fd, return 0. */
c530df
+        ret = 0;
c530df
+    }
c530df
+
c530df
+    epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
c530df
+    if (epoll_ret == -1) {
c530df
+        /* If we previously had an error, expose that error instead of
c530df
+         * clobbering it with errno; else if no error, then assume it is
c530df
+         * better to notify of the error deleting the event than it is
c530df
+         * to continue. */
c530df
+        if (ret == 0)
c530df
+            ret = errno;
c530df
+        gpm_epoll_close(gpmctx);
c530df
+    }
c530df
+
c530df
+    return ret;
c530df
+}
c530df
+
c530df
+static int gpm_retry_socket(struct gpm_ctx *gpmctx)
c530df
+{
c530df
+    gpm_epoll_close(gpmctx);
c530df
+    gpm_close_socket(gpmctx);
c530df
+    return gpm_open_socket(gpmctx);
c530df
+}
c530df
+
c530df
 /* must be called after the lock has been grabbed */
c530df
 static int gpm_send_buffer(struct gpm_ctx *gpmctx,
c530df
                            char *buffer, uint32_t length)
c530df
@@ -183,8 +359,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
c530df
     retry = false;
c530df
     do {
c530df
         do {
c530df
+            ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
c530df
+            if (ret != 0) {
c530df
+                goto done;
c530df
+            }
c530df
+
c530df
             ret = 0;
c530df
-            wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
c530df
+            wn = write(gpmctx->fd, &size, sizeof(uint32_t));
c530df
             if (wn == -1) {
c530df
                 ret = errno;
c530df
             }
c530df
@@ -192,8 +373,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
c530df
         if (wn != 4) {
c530df
             /* reopen and retry once */
c530df
             if (retry == false) {
c530df
-                gpm_close_socket(gpmctx);
c530df
-                ret = gpm_open_socket(gpmctx);
c530df
+                ret = gpm_retry_socket(gpmctx);
c530df
                 if (ret == 0) {
c530df
                     retry = true;
c530df
                     continue;
c530df
@@ -208,9 +388,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
c530df
 
c530df
     pos = 0;
c530df
     while (length > pos) {
c530df
-        wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
c530df
+        ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
c530df
+        if (ret) {
c530df
+            goto done;
c530df
+        }
c530df
+
c530df
+        wn = write(gpmctx->fd, buffer + pos, length - pos);
c530df
         if (wn == -1) {
c530df
-            if (errno == EINTR) {
c530df
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
c530df
                 continue;
c530df
             }
c530df
             ret = errno;
c530df
@@ -231,7 +416,7 @@ done:
c530df
 
c530df
 /* must be called after the lock has been grabbed */
c530df
 static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
c530df
-                           char *buffer, uint32_t *length)
c530df
+                           char **buffer, uint32_t *length)
c530df
 {
c530df
     uint32_t size;
c530df
     ssize_t rn;
c530df
@@ -239,6 +424,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
c530df
     int ret;
c530df
 
c530df
     do {
c530df
+        ret = gpm_epoll_wait(gpmctx, EPOLLIN);
c530df
+        if (ret) {
c530df
+            goto done;
c530df
+        }
c530df
+
c530df
         ret = 0;
c530df
         rn = read(gpmctx->fd, &size, sizeof(uint32_t));
c530df
         if (rn == -1) {
c530df
@@ -258,11 +448,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
c530df
         goto done;
c530df
     }
c530df
 
c530df
+    *buffer = malloc(*length);
c530df
+    if (*buffer == NULL) {
c530df
+        ret = ENOMEM;
c530df
+        goto done;
c530df
+    }
c530df
+
c530df
     pos = 0;
c530df
     while (*length > pos) {
c530df
-        rn = read(gpmctx->fd, buffer + pos, *length - pos);
c530df
+        ret = gpm_epoll_wait(gpmctx, EPOLLIN);
c530df
+        if (ret) {
c530df
+            goto done;
c530df
+        }
c530df
+
c530df
+        rn = read(gpmctx->fd, *buffer + pos, *length - pos);
c530df
         if (rn == -1) {
c530df
-            if (errno == EINTR) {
c530df
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
c530df
                 continue;
c530df
             }
c530df
             ret = errno;
c530df
@@ -281,6 +482,7 @@ done:
c530df
     if (ret) {
c530df
         /* on errors we can only close the fd and return */
c530df
         gpm_close_socket(gpmctx);
c530df
+        gpm_epoll_close(gpmctx);
c530df
     }
c530df
     return ret;
c530df
 }
c530df
@@ -309,6 +511,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
c530df
     return &gpm_global_ctx;
c530df
 }
c530df
 
c530df
+static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
c530df
+                              uint32_t send_length, char** recv_buffer,
c530df
+                              uint32_t *recv_length)
c530df
+{
c530df
+    int ret;
c530df
+    int retry_count;
c530df
+
c530df
+    /* setup timer */
c530df
+    ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
c530df
+    if (ret)
c530df
+        return ret;
c530df
+
c530df
+    for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
c530df
+        /* send to proxy */
c530df
+        ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
c530df
+
c530df
+        if (ret == 0) {
c530df
+            /* No error, continue to recv */
c530df
+        } else if (ret == ETIMEDOUT) {
c530df
+            /* Close and reopen socket before trying again */
c530df
+            ret = gpm_retry_socket(gpmctx);
c530df
+            if (ret != 0)
c530df
+                return ret;
c530df
+            ret = ETIMEDOUT;
c530df
+
c530df
+            /* RETRY entire send */
c530df
+            continue;
c530df
+        } else {
c530df
+            /* Other error */
c530df
+            return ret;
c530df
+        }
c530df
+
c530df
+        /* receive answer */
c530df
+        ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
c530df
+        if (ret == 0) {
c530df
+            /* No error */
c530df
+            break;
c530df
+        } else if (ret == ETIMEDOUT) {
c530df
+            /* Close and reopen socket before trying again */
c530df
+            ret = gpm_retry_socket(gpmctx);
c530df
+
c530df
+            /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
c530df
+            free(recv_buffer);
c530df
+            recv_buffer = NULL;
c530df
+
c530df
+            if (ret != 0)
c530df
+                return ret;
c530df
+            ret = ETIMEDOUT;
c530df
+        } else {
c530df
+            /* Other error */
c530df
+            return ret;
c530df
+        }
c530df
+    }
c530df
+
c530df
+    return ret;
c530df
+}
c530df
+
c530df
 OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
c530df
                              gss_buffer_t buffer)
c530df
 {
c530df
@@ -399,15 +658,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
c530df
     gp_rpc_msg msg;
c530df
     XDR xdr_call_ctx;
c530df
     XDR xdr_reply_ctx;
c530df
-    char buffer[MAX_RPC_SIZE];
c530df
-    uint32_t length;
c530df
+    char *send_buffer = NULL;
c530df
+    char *recv_buffer = NULL;
c530df
+    uint32_t send_length;
c530df
+    uint32_t recv_length;
c530df
     uint32_t xid;
c530df
     bool xdrok;
c530df
     bool sockgrab = false;
c530df
     int ret;
c530df
 
c530df
-    xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
c530df
-    xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
c530df
+    send_buffer = malloc(MAX_RPC_SIZE);
c530df
+    if (send_buffer == NULL)
c530df
+        return ENOMEM;
c530df
+
c530df
+    xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
c530df
 
c530df
     memset(&msg, 0, sizeof(gp_rpc_msg));
c530df
     msg.header.type = GP_RPC_CALL;
c530df
@@ -450,22 +714,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
c530df
         goto done;
c530df
     }
c530df
 
c530df
-    /* send to proxy */
c530df
-    ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
c530df
-    if (ret) {
c530df
-        goto done;
c530df
-    }
c530df
+    /* set send_length */
c530df
+    send_length = xdr_getpos(&xdr_call_ctx);
c530df
 
c530df
-    /* receive answer */
c530df
-    ret = gpm_recv_buffer(gpmctx, buffer, &length);
c530df
-    if (ret) {
c530df
+    /* Send request, receive response with timeout */
c530df
+    ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
c530df
+                             &recv_length);
c530df
+    if (ret)
c530df
         goto done;
c530df
-    }
c530df
 
c530df
     /* release the lock */
c530df
     gpm_release_sock(gpmctx);
c530df
     sockgrab = false;
c530df
 
c530df
+    /* Create the reply context */
c530df
+    xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
c530df
+
c530df
     /* decode header */
c530df
     memset(&msg, 0, sizeof(gp_rpc_msg));
c530df
     xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg);
c530df
@@ -489,12 +753,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
c530df
     }
c530df
 
c530df
 done:
c530df
+    gpm_timer_close(gpmctx);
c530df
+    gpm_epoll_close(gpmctx);
c530df
+
c530df
     if (sockgrab) {
c530df
         gpm_release_sock(gpmctx);
c530df
     }
c530df
     xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg);
c530df
     xdr_destroy(&xdr_call_ctx);
c530df
-    xdr_destroy(&xdr_reply_ctx);
c530df
+
c530df
+    if (recv_buffer != NULL)
c530df
+        xdr_destroy(&xdr_reply_ctx);
c530df
+
c530df
+    free(send_buffer);
c530df
+    free(recv_buffer);
c530df
+
c530df
     return ret;
c530df
 }
c530df