Blame SOURCES/client-Switch-to-non-blocking-sockets.patch

68bf20
From 1962e6128a4d86a7c54977577e1e4224cadbb5f7 Mon Sep 17 00:00:00 2001
1f3433
From: Alexander Scheel <ascheel@redhat.com>
1f3433
Date: Wed, 2 Aug 2017 15:11:49 -0400
1f3433
Subject: [PATCH] [client] Switch to non-blocking sockets
1f3433
1f3433
Switch the gssproxy client library to non-blocking sockets, allowing
1f3433
for timeout and retry operations.  The client will automatically retry
1f3433
both send() and recv() operations three times on ETIMEDOUT.  If the
1f3433
combined send() and recv() hit the three time limit, ETIMEDOUT will be
1f3433
exposed to the caller in the minor status.
1f3433
1f3433
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
1f3433
Reviewed-by: Simo Sorce <simo@redhat.com>
1f3433
[rharwood@redhat.com: commit message cleanups, rebased]
1f3433
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
1f3433
(cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
1f3433
---
68bf20
 proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++---
1f3433
 1 file changed, 295 insertions(+), 22 deletions(-)
1f3433
1f3433
diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
1f3433
index 2133618..dba23a6 100644
1f3433
--- a/proxy/src/client/gpm_common.c
1f3433
+++ b/proxy/src/client/gpm_common.c
1f3433
@@ -7,9 +7,15 @@
1f3433
 #include <stdlib.h>
1f3433
 #include <time.h>
1f3433
 #include <pthread.h>
1f3433
+#include <sys/epoll.h>
1f3433
+#include <fcntl.h>
1f3433
+#include <sys/timerfd.h>
1f3433
 
1f3433
 #define FRAGMENT_BIT (1 << 31)
1f3433
 
1f3433
+#define RESPONSE_TIMEOUT 15
1f3433
+#define MAX_TIMEOUT_RETRY 3
1f3433
+
1f3433
 struct gpm_ctx {
1f3433
     pthread_mutex_t lock;
1f3433
     int fd;
1f3433
@@ -20,6 +26,9 @@ struct gpm_ctx {
1f3433
     gid_t gid;
1f3433
 
1f3433
     int next_xid;
1f3433
+
1f3433
+    int epollfd;
1f3433
+    int timerfd;
1f3433
 };
1f3433
 
1f3433
 /* a single global struct is not particularly efficient,
1f3433
@@ -39,6 +48,8 @@ static void gpm_init_once(void)
1f3433
     pthread_mutex_init(&gpm_global_ctx.lock, &attr);
1f3433
 
1f3433
     gpm_global_ctx.fd = -1;
1f3433
+    gpm_global_ctx.epollfd = -1;
1f3433
+    gpm_global_ctx.timerfd = -1;
1f3433
 
1f3433
     seedp = time(NULL) + getpid() + pthread_self();
1f3433
     gpm_global_ctx.next_xid = rand_r(&seedp);
1f3433
@@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
1f3433
     struct sockaddr_un addr = {0};
1f3433
     char name[PATH_MAX];
1f3433
     int ret;
1f3433
+    unsigned flags;
1f3433
     int fd = -1;
1f3433
 
1f3433
     ret = get_pipe_name(name);
1f3433
@@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
1f3433
         goto done;
1f3433
     }
1f3433
 
1f3433
+    ret = fcntl(fd, F_GETFD, &flags);
1f3433
+    if (ret != 0) {
1f3433
+        ret = errno;
1f3433
+        goto done;
1f3433
+    }
1f3433
+
1f3433
+    ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
1f3433
+    if (ret != 0) {
1f3433
+        ret = errno;
1f3433
+        goto done;
1f3433
+    }
1f3433
+
1f3433
     ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
1f3433
     if (ret == -1) {
1f3433
         ret = errno;
1f3433
@@ -163,6 +187,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
1f3433
     return pthread_mutex_unlock(&gpmctx->lock);
1f3433
 }
1f3433
 
1f3433
+static void gpm_timer_close(struct gpm_ctx *gpmctx) {
1f3433
+    if (gpmctx->timerfd < 0) {
1f3433
+        return;
1f3433
+    }
1f3433
+
1f3433
+    close(gpmctx->timerfd);
1f3433
+    gpmctx->timerfd = -1;
1f3433
+}
1f3433
+
1f3433
+static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
1f3433
+    int ret;
1f3433
+    struct itimerspec its;
1f3433
+
1f3433
+    if (gpmctx->timerfd >= 0) {
1f3433
+        gpm_timer_close(gpmctx);
1f3433
+    }
1f3433
+
1f3433
+    gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
1f3433
+    if (gpmctx->timerfd < 0) {
1f3433
+        return errno;
1f3433
+    }
1f3433
+
1f3433
+    its.it_interval.tv_sec = timeout_seconds;
1f3433
+    its.it_interval.tv_nsec = 0;
1f3433
+    its.it_value.tv_sec = timeout_seconds;
1f3433
+    its.it_value.tv_nsec = 0;
1f3433
+
1f3433
+    ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
1f3433
+    if (ret) {
1f3433
+        ret = errno;
1f3433
+        gpm_timer_close(gpmctx);
1f3433
+        return ret;
1f3433
+    }
1f3433
+
1f3433
+    return 0;
1f3433
+}
1f3433
+
1f3433
+static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
1f3433
+    if (gpmctx->epollfd < 0) {
1f3433
+        return;
1f3433
+    }
1f3433
+
1f3433
+    close(gpmctx->epollfd);
1f3433
+    gpmctx->epollfd = -1;
1f3433
+}
1f3433
+
1f3433
+static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
1f3433
+    struct epoll_event ev;
1f3433
+    int ret;
1f3433
+
1f3433
+    if (gpmctx->epollfd >= 0) {
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+    }
1f3433
+
1f3433
+    gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
1f3433
+    if (gpmctx->epollfd == -1) {
1f3433
+        return errno;
1f3433
+    }
1f3433
+
1f3433
+    /* Add timer */
1f3433
+    ev.events = EPOLLIN;
1f3433
+    ev.data.fd = gpmctx->timerfd;
1f3433
+    ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev;;
1f3433
+    if (ret == -1) {
1f3433
+        ret = errno;
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+        return ret;
1f3433
+    }
1f3433
+
1f3433
+    return ret;
1f3433
+}
1f3433
+
1f3433
+static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
1f3433
+    int ret;
1f3433
+    int epoll_ret;
1f3433
+    struct epoll_event ev;
1f3433
+    struct epoll_event events[2];
1f3433
+    uint64_t timer_read;
1f3433
+
1f3433
+    if (gpmctx->epollfd < 0) {
1f3433
+        ret = gpm_epoll_setup(gpmctx);
1f3433
+        if (ret)
1f3433
+            return ret;
1f3433
+    }
1f3433
+
1f3433
+    ev.events = event_flags;
1f3433
+    ev.data.fd = gpmctx->fd;
1f3433
+    epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev;;
1f3433
+    if (epoll_ret == -1) {
1f3433
+        ret = errno;
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+        return ret;
1f3433
+    }
1f3433
+
1f3433
+    do {
1f3433
+        epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
1f3433
+    } while (epoll_ret < 0 && errno == EINTR);
1f3433
+
1f3433
+    if (epoll_ret < 0) {
1f3433
+        /* Error while waiting that isn't EINTR */
1f3433
+        ret = errno;
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+    } else if (epoll_ret == 0) {
1f3433
+        /* Shouldn't happen as timeout == -1; treat it like a timeout
1f3433
+         * occurred. */
1f3433
+        ret = ETIMEDOUT;
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+    } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
1f3433
+        /* Got an event which is only our timer */
1f3433
+        ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
1f3433
+        if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
1f3433
+            /* In the case when reading from the timer failed, don't hide the
1f3433
+             * timer error behind ETIMEDOUT such that it isn't retried */
1f3433
+            ret = errno;
1f3433
+        } else {
1f3433
+            /* If ret == 0, then we definitely timed out. Else, if ret == -1
1f3433
+             * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
1f3433
+             * edge case where epoll thinks the timer can be read, but it
1f3433
+             * is blocking more; treat it like a TIMEOUT and retry, as
1f3433
+             * nothing around us would handle EAGAIN from timer and retry
1f3433
+             * it. */
1f3433
+            ret = ETIMEDOUT;
1f3433
+        }
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+    } else {
1f3433
+        /* If ret == 2, then we ignore the timerfd; that way if the next
1f3433
+         * operation cannot be performed immediately, we timeout and retry.
1f3433
+         * If ret == 1 and data.fd == gpmctx->fd, return 0. */
1f3433
+        ret = 0;
1f3433
+    }
1f3433
+
1f3433
+    epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
1f3433
+    if (epoll_ret == -1) {
1f3433
+        /* If we previously had an error, expose that error instead of
1f3433
+         * clobbering it with errno; else if no error, then assume it is
1f3433
+         * better to notify of the error deleting the event than it is
1f3433
+         * to continue. */
1f3433
+        if (ret == 0)
1f3433
+            ret = errno;
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
+    }
1f3433
+
1f3433
+    return ret;
1f3433
+}
1f3433
+
1f3433
+static int gpm_retry_socket(struct gpm_ctx *gpmctx)
1f3433
+{
1f3433
+    gpm_epoll_close(gpmctx);
1f3433
+    gpm_close_socket(gpmctx);
1f3433
+    return gpm_open_socket(gpmctx);
1f3433
+}
1f3433
+
1f3433
 /* must be called after the lock has been grabbed */
1f3433
 static int gpm_send_buffer(struct gpm_ctx *gpmctx,
1f3433
                            char *buffer, uint32_t length)
1f3433
@@ -183,8 +359,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
1f3433
     retry = false;
1f3433
     do {
1f3433
         do {
1f3433
+            ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
1f3433
+            if (ret != 0) {
1f3433
+                goto done;
1f3433
+            }
1f3433
+
1f3433
             ret = 0;
1f3433
-            wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
1f3433
+            wn = write(gpmctx->fd, &size, sizeof(uint32_t));
1f3433
             if (wn == -1) {
1f3433
                 ret = errno;
1f3433
             }
1f3433
@@ -192,8 +373,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
1f3433
         if (wn != 4) {
1f3433
             /* reopen and retry once */
1f3433
             if (retry == false) {
1f3433
-                gpm_close_socket(gpmctx);
1f3433
-                ret = gpm_open_socket(gpmctx);
1f3433
+                ret = gpm_retry_socket(gpmctx);
1f3433
                 if (ret == 0) {
1f3433
                     retry = true;
1f3433
                     continue;
1f3433
@@ -208,9 +388,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
1f3433
 
1f3433
     pos = 0;
1f3433
     while (length > pos) {
1f3433
-        wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
1f3433
+        ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
1f3433
+        if (ret) {
1f3433
+            goto done;
1f3433
+        }
1f3433
+
1f3433
+        wn = write(gpmctx->fd, buffer + pos, length - pos);
1f3433
         if (wn == -1) {
1f3433
-            if (errno == EINTR) {
1f3433
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
1f3433
                 continue;
1f3433
             }
1f3433
             ret = errno;
1f3433
@@ -231,7 +416,7 @@ done:
1f3433
 
1f3433
 /* must be called after the lock has been grabbed */
1f3433
 static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
1f3433
-                           char *buffer, uint32_t *length)
1f3433
+                           char **buffer, uint32_t *length)
1f3433
 {
1f3433
     uint32_t size;
1f3433
     ssize_t rn;
1f3433
@@ -239,6 +424,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
1f3433
     int ret;
1f3433
 
1f3433
     do {
1f3433
+        ret = gpm_epoll_wait(gpmctx, EPOLLIN);
1f3433
+        if (ret) {
1f3433
+            goto done;
1f3433
+        }
1f3433
+
1f3433
         ret = 0;
1f3433
         rn = read(gpmctx->fd, &size, sizeof(uint32_t));
1f3433
         if (rn == -1) {
1f3433
@@ -258,11 +448,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
1f3433
         goto done;
1f3433
     }
1f3433
 
1f3433
+    *buffer = malloc(*length);
1f3433
+    if (*buffer == NULL) {
1f3433
+        ret = ENOMEM;
1f3433
+        goto done;
1f3433
+    }
1f3433
+
1f3433
     pos = 0;
1f3433
     while (*length > pos) {
1f3433
-        rn = read(gpmctx->fd, buffer + pos, *length - pos);
1f3433
+        ret = gpm_epoll_wait(gpmctx, EPOLLIN);
1f3433
+        if (ret) {
1f3433
+            goto done;
1f3433
+        }
1f3433
+
1f3433
+        rn = read(gpmctx->fd, *buffer + pos, *length - pos);
1f3433
         if (rn == -1) {
1f3433
-            if (errno == EINTR) {
1f3433
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
1f3433
                 continue;
1f3433
             }
1f3433
             ret = errno;
1f3433
@@ -281,6 +482,7 @@ done:
1f3433
     if (ret) {
1f3433
         /* on errors we can only close the fd and return */
1f3433
         gpm_close_socket(gpmctx);
1f3433
+        gpm_epoll_close(gpmctx);
1f3433
     }
1f3433
     return ret;
1f3433
 }
1f3433
@@ -309,6 +511,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
1f3433
     return &gpm_global_ctx;
1f3433
 }
1f3433
 
1f3433
+static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
1f3433
+                              uint32_t send_length, char** recv_buffer,
1f3433
+                              uint32_t *recv_length)
1f3433
+{
1f3433
+    int ret;
1f3433
+    int retry_count;
1f3433
+
1f3433
+    /* setup timer */
1f3433
+    ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
1f3433
+    if (ret)
1f3433
+        return ret;
1f3433
+
1f3433
+    for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
1f3433
+        /* send to proxy */
1f3433
+        ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
1f3433
+
1f3433
+        if (ret == 0) {
1f3433
+            /* No error, continue to recv */
1f3433
+        } else if (ret == ETIMEDOUT) {
1f3433
+            /* Close and reopen socket before trying again */
1f3433
+            ret = gpm_retry_socket(gpmctx);
1f3433
+            if (ret != 0)
1f3433
+                return ret;
1f3433
+            ret = ETIMEDOUT;
1f3433
+
1f3433
+            /* RETRY entire send */
1f3433
+            continue;
1f3433
+        } else {
1f3433
+            /* Other error */
1f3433
+            return ret;
1f3433
+        }
1f3433
+
1f3433
+        /* receive answer */
1f3433
+        ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
1f3433
+        if (ret == 0) {
1f3433
+            /* No error */
1f3433
+            break;
1f3433
+        } else if (ret == ETIMEDOUT) {
1f3433
+            /* Close and reopen socket before trying again */
1f3433
+            ret = gpm_retry_socket(gpmctx);
1f3433
+
1f3433
+            /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
1f3433
+            free(recv_buffer);
1f3433
+            recv_buffer = NULL;
1f3433
+
1f3433
+            if (ret != 0)
1f3433
+                return ret;
1f3433
+            ret = ETIMEDOUT;
1f3433
+        } else {
1f3433
+            /* Other error */
1f3433
+            return ret;
1f3433
+        }
1f3433
+    }
1f3433
+
1f3433
+    return ret;
1f3433
+}
1f3433
+
1f3433
 OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
1f3433
                              gss_buffer_t buffer)
1f3433
 {
1f3433
@@ -399,15 +658,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
1f3433
     gp_rpc_msg msg;
1f3433
     XDR xdr_call_ctx;
1f3433
     XDR xdr_reply_ctx;
1f3433
-    char buffer[MAX_RPC_SIZE];
1f3433
-    uint32_t length;
1f3433
+    char *send_buffer = NULL;
1f3433
+    char *recv_buffer = NULL;
1f3433
+    uint32_t send_length;
1f3433
+    uint32_t recv_length;
1f3433
     uint32_t xid;
1f3433
     bool xdrok;
1f3433
     bool sockgrab = false;
1f3433
     int ret;
1f3433
 
1f3433
-    xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
1f3433
-    xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
1f3433
+    send_buffer = malloc(MAX_RPC_SIZE);
1f3433
+    if (send_buffer == NULL)
1f3433
+        return ENOMEM;
1f3433
+
1f3433
+    xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
1f3433
 
1f3433
     memset(&msg, 0, sizeof(gp_rpc_msg));
1f3433
     msg.header.type = GP_RPC_CALL;
1f3433
@@ -450,22 +714,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
1f3433
         goto done;
1f3433
     }
1f3433
 
1f3433
-    /* send to proxy */
1f3433
-    ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
1f3433
-    if (ret) {
1f3433
-        goto done;
1f3433
-    }
1f3433
+    /* set send_length */
1f3433
+    send_length = xdr_getpos(&xdr_call_ctx);
1f3433
 
1f3433
-    /* receive answer */
1f3433
-    ret = gpm_recv_buffer(gpmctx, buffer, &length);
1f3433
-    if (ret) {
1f3433
+    /* Send request, receive response with timeout */
1f3433
+    ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
1f3433
+                             &recv_length);
1f3433
+    if (ret)
1f3433
         goto done;
1f3433
-    }
1f3433
 
1f3433
     /* release the lock */
1f3433
     gpm_release_sock(gpmctx);
1f3433
     sockgrab = false;
1f3433
 
1f3433
+    /* Create the reply context */
1f3433
+    xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
1f3433
+
1f3433
     /* decode header */
1f3433
     memset(&msg, 0, sizeof(gp_rpc_msg));
1f3433
     xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg;;
1f3433
@@ -489,12 +753,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
1f3433
     }
1f3433
 
1f3433
 done:
1f3433
+    gpm_timer_close(gpmctx);
1f3433
+    gpm_epoll_close(gpmctx);
1f3433
+
1f3433
     if (sockgrab) {
1f3433
         gpm_release_sock(gpmctx);
1f3433
     }
1f3433
     xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg;;
1f3433
     xdr_destroy(&xdr_call_ctx);
1f3433
-    xdr_destroy(&xdr_reply_ctx);
1f3433
+
1f3433
+    if (recv_buffer != NULL)
1f3433
+        xdr_destroy(&xdr_reply_ctx);
1f3433
+
1f3433
+    free(send_buffer);
1f3433
+    free(recv_buffer);
1f3433
+
1f3433
     return ret;
1f3433
 }
1f3433