472fdf
From 1962e6128a4d86a7c54977577e1e4224cadbb5f7 Mon Sep 17 00:00:00 2001
472fdf
From: Alexander Scheel <ascheel@redhat.com>
472fdf
Date: Wed, 2 Aug 2017 15:11:49 -0400
472fdf
Subject: [PATCH] [client] Switch to non-blocking sockets
472fdf
472fdf
Switch the gssproxy client library to non-blocking sockets, allowing
472fdf
for timeout and retry operations.  The client will automatically retry
472fdf
both send() and recv() operations three times on ETIMEDOUT.  If the
472fdf
combined send() and recv() hit the three time limit, ETIMEDOUT will be
472fdf
exposed to the caller in the minor status.
472fdf
472fdf
Signed-off-by: Alexander Scheel <ascheel@redhat.com>
472fdf
Reviewed-by: Simo Sorce <simo@redhat.com>
472fdf
[rharwood@redhat.com: commit message cleanups, rebased]
472fdf
Reviewed-by: Robbie Harwood <rharwood@redhat.com>
472fdf
(cherry picked from commit d035646c8feb0b78f0c157580ca02c46cd00dd7e)
472fdf
---
472fdf
 proxy/src/client/gpm_common.c | 317 +++++++++++++++++++++++++++++++---
472fdf
 1 file changed, 295 insertions(+), 22 deletions(-)
472fdf
472fdf
diff --git a/proxy/src/client/gpm_common.c b/proxy/src/client/gpm_common.c
472fdf
index 2133618..dba23a6 100644
472fdf
--- a/proxy/src/client/gpm_common.c
472fdf
+++ b/proxy/src/client/gpm_common.c
472fdf
@@ -7,9 +7,15 @@
472fdf
 #include <stdlib.h>
472fdf
 #include <time.h>
472fdf
 #include <pthread.h>
472fdf
+#include <sys/epoll.h>
472fdf
+#include <fcntl.h>
472fdf
+#include <sys/timerfd.h>
472fdf
 
472fdf
 #define FRAGMENT_BIT (1 << 31)
472fdf
 
472fdf
+#define RESPONSE_TIMEOUT 15
472fdf
+#define MAX_TIMEOUT_RETRY 3
472fdf
+
472fdf
 struct gpm_ctx {
472fdf
     pthread_mutex_t lock;
472fdf
     int fd;
472fdf
@@ -20,6 +26,9 @@ struct gpm_ctx {
472fdf
     gid_t gid;
472fdf
 
472fdf
     int next_xid;
472fdf
+
472fdf
+    int epollfd;
472fdf
+    int timerfd;
472fdf
 };
472fdf
 
472fdf
 /* a single global struct is not particularly efficient,
472fdf
@@ -39,6 +48,8 @@ static void gpm_init_once(void)
472fdf
     pthread_mutex_init(&gpm_global_ctx.lock, &attr);
472fdf
 
472fdf
     gpm_global_ctx.fd = -1;
472fdf
+    gpm_global_ctx.epollfd = -1;
472fdf
+    gpm_global_ctx.timerfd = -1;
472fdf
 
472fdf
     seedp = time(NULL) + getpid() + pthread_self();
472fdf
     gpm_global_ctx.next_xid = rand_r(&seedp);
472fdf
@@ -69,6 +80,7 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
472fdf
     struct sockaddr_un addr = {0};
472fdf
     char name[PATH_MAX];
472fdf
     int ret;
472fdf
+    unsigned flags;
472fdf
     int fd = -1;
472fdf
 
472fdf
     ret = get_pipe_name(name);
472fdf
@@ -86,6 +98,18 @@ static int gpm_open_socket(struct gpm_ctx *gpmctx)
472fdf
         goto done;
472fdf
     }
472fdf
 
472fdf
+    ret = fcntl(fd, F_GETFD, &flags);
472fdf
+    if (ret != 0) {
472fdf
+        ret = errno;
472fdf
+        goto done;
472fdf
+    }
472fdf
+
472fdf
+    ret = fcntl(fd, F_SETFD, flags | O_NONBLOCK);
472fdf
+    if (ret != 0) {
472fdf
+        ret = errno;
472fdf
+        goto done;
472fdf
+    }
472fdf
+
472fdf
     ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
472fdf
     if (ret == -1) {
472fdf
         ret = errno;
472fdf
@@ -163,6 +187,158 @@ static int gpm_release_sock(struct gpm_ctx *gpmctx)
472fdf
     return pthread_mutex_unlock(&gpmctx->lock);
472fdf
 }
472fdf
 
472fdf
+static void gpm_timer_close(struct gpm_ctx *gpmctx) {
472fdf
+    if (gpmctx->timerfd < 0) {
472fdf
+        return;
472fdf
+    }
472fdf
+
472fdf
+    close(gpmctx->timerfd);
472fdf
+    gpmctx->timerfd = -1;
472fdf
+}
472fdf
+
472fdf
+static int gpm_timer_setup(struct gpm_ctx *gpmctx, int timeout_seconds) {
472fdf
+    int ret;
472fdf
+    struct itimerspec its;
472fdf
+
472fdf
+    if (gpmctx->timerfd >= 0) {
472fdf
+        gpm_timer_close(gpmctx);
472fdf
+    }
472fdf
+
472fdf
+    gpmctx->timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC);
472fdf
+    if (gpmctx->timerfd < 0) {
472fdf
+        return errno;
472fdf
+    }
472fdf
+
472fdf
+    its.it_interval.tv_sec = timeout_seconds;
472fdf
+    its.it_interval.tv_nsec = 0;
472fdf
+    its.it_value.tv_sec = timeout_seconds;
472fdf
+    its.it_value.tv_nsec = 0;
472fdf
+
472fdf
+    ret = timerfd_settime(gpmctx->timerfd, 0, &its, NULL);
472fdf
+    if (ret) {
472fdf
+        ret = errno;
472fdf
+        gpm_timer_close(gpmctx);
472fdf
+        return ret;
472fdf
+    }
472fdf
+
472fdf
+    return 0;
472fdf
+}
472fdf
+
472fdf
+static void gpm_epoll_close(struct gpm_ctx *gpmctx) {
472fdf
+    if (gpmctx->epollfd < 0) {
472fdf
+        return;
472fdf
+    }
472fdf
+
472fdf
+    close(gpmctx->epollfd);
472fdf
+    gpmctx->epollfd = -1;
472fdf
+}
472fdf
+
472fdf
+static int gpm_epoll_setup(struct gpm_ctx *gpmctx) {
472fdf
+    struct epoll_event ev;
472fdf
+    int ret;
472fdf
+
472fdf
+    if (gpmctx->epollfd >= 0) {
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+    }
472fdf
+
472fdf
+    gpmctx->epollfd = epoll_create1(EPOLL_CLOEXEC);
472fdf
+    if (gpmctx->epollfd == -1) {
472fdf
+        return errno;
472fdf
+    }
472fdf
+
472fdf
+    /* Add timer */
472fdf
+    ev.events = EPOLLIN;
472fdf
+    ev.data.fd = gpmctx->timerfd;
472fdf
+    ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->timerfd, &ev;;
472fdf
+    if (ret == -1) {
472fdf
+        ret = errno;
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+        return ret;
472fdf
+    }
472fdf
+
472fdf
+    return ret;
472fdf
+}
472fdf
+
472fdf
+static int gpm_epoll_wait(struct gpm_ctx *gpmctx, uint32_t event_flags) {
472fdf
+    int ret;
472fdf
+    int epoll_ret;
472fdf
+    struct epoll_event ev;
472fdf
+    struct epoll_event events[2];
472fdf
+    uint64_t timer_read;
472fdf
+
472fdf
+    if (gpmctx->epollfd < 0) {
472fdf
+        ret = gpm_epoll_setup(gpmctx);
472fdf
+        if (ret)
472fdf
+            return ret;
472fdf
+    }
472fdf
+
472fdf
+    ev.events = event_flags;
472fdf
+    ev.data.fd = gpmctx->fd;
472fdf
+    epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_ADD, gpmctx->fd, &ev;;
472fdf
+    if (epoll_ret == -1) {
472fdf
+        ret = errno;
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+        return ret;
472fdf
+    }
472fdf
+
472fdf
+    do {
472fdf
+        epoll_ret = epoll_wait(gpmctx->epollfd, events, 2, -1);
472fdf
+    } while (epoll_ret < 0 && errno == EINTR);
472fdf
+
472fdf
+    if (epoll_ret < 0) {
472fdf
+        /* Error while waiting that isn't EINTR */
472fdf
+        ret = errno;
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+    } else if (epoll_ret == 0) {
472fdf
+        /* Shouldn't happen as timeout == -1; treat it like a timeout
472fdf
+         * occurred. */
472fdf
+        ret = ETIMEDOUT;
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+    } else if (epoll_ret == 1 && events[0].data.fd == gpmctx->timerfd) {
472fdf
+        /* Got an event which is only our timer */
472fdf
+        ret = read(gpmctx->timerfd, &timer_read, sizeof(uint64_t));
472fdf
+        if (ret == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
472fdf
+            /* In the case when reading from the timer failed, don't hide the
472fdf
+             * timer error behind ETIMEDOUT such that it isn't retried */
472fdf
+            ret = errno;
472fdf
+        } else {
472fdf
+            /* If ret == 0, then we definitely timed out. Else, if ret == -1
472fdf
+             * and errno == EAGAIN or errno == EWOULDBLOCK, we're in a weird
472fdf
+             * edge case where epoll thinks the timer can be read, but it
472fdf
+             * is blocking more; treat it like a TIMEOUT and retry, as
472fdf
+             * nothing around us would handle EAGAIN from timer and retry
472fdf
+             * it. */
472fdf
+            ret = ETIMEDOUT;
472fdf
+        }
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+    } else {
472fdf
+        /* If ret == 2, then we ignore the timerfd; that way if the next
472fdf
+         * operation cannot be performed immediately, we timeout and retry.
472fdf
+         * If ret == 1 and data.fd == gpmctx->fd, return 0. */
472fdf
+        ret = 0;
472fdf
+    }
472fdf
+
472fdf
+    epoll_ret = epoll_ctl(gpmctx->epollfd, EPOLL_CTL_DEL, gpmctx->fd, NULL);
472fdf
+    if (epoll_ret == -1) {
472fdf
+        /* If we previously had an error, expose that error instead of
472fdf
+         * clobbering it with errno; else if no error, then assume it is
472fdf
+         * better to notify of the error deleting the event than it is
472fdf
+         * to continue. */
472fdf
+        if (ret == 0)
472fdf
+            ret = errno;
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
+    }
472fdf
+
472fdf
+    return ret;
472fdf
+}
472fdf
+
472fdf
+static int gpm_retry_socket(struct gpm_ctx *gpmctx)
472fdf
+{
472fdf
+    gpm_epoll_close(gpmctx);
472fdf
+    gpm_close_socket(gpmctx);
472fdf
+    return gpm_open_socket(gpmctx);
472fdf
+}
472fdf
+
472fdf
 /* must be called after the lock has been grabbed */
472fdf
 static int gpm_send_buffer(struct gpm_ctx *gpmctx,
472fdf
                            char *buffer, uint32_t length)
472fdf
@@ -183,8 +359,13 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
472fdf
     retry = false;
472fdf
     do {
472fdf
         do {
472fdf
+            ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
472fdf
+            if (ret != 0) {
472fdf
+                goto done;
472fdf
+            }
472fdf
+
472fdf
             ret = 0;
472fdf
-            wn = send(gpmctx->fd, &size, sizeof(uint32_t), MSG_NOSIGNAL);
472fdf
+            wn = write(gpmctx->fd, &size, sizeof(uint32_t));
472fdf
             if (wn == -1) {
472fdf
                 ret = errno;
472fdf
             }
472fdf
@@ -192,8 +373,7 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
472fdf
         if (wn != 4) {
472fdf
             /* reopen and retry once */
472fdf
             if (retry == false) {
472fdf
-                gpm_close_socket(gpmctx);
472fdf
-                ret = gpm_open_socket(gpmctx);
472fdf
+                ret = gpm_retry_socket(gpmctx);
472fdf
                 if (ret == 0) {
472fdf
                     retry = true;
472fdf
                     continue;
472fdf
@@ -208,9 +388,14 @@ static int gpm_send_buffer(struct gpm_ctx *gpmctx,
472fdf
 
472fdf
     pos = 0;
472fdf
     while (length > pos) {
472fdf
-        wn = send(gpmctx->fd, buffer + pos, length - pos, MSG_NOSIGNAL);
472fdf
+        ret = gpm_epoll_wait(gpmctx, EPOLLOUT);
472fdf
+        if (ret) {
472fdf
+            goto done;
472fdf
+        }
472fdf
+
472fdf
+        wn = write(gpmctx->fd, buffer + pos, length - pos);
472fdf
         if (wn == -1) {
472fdf
-            if (errno == EINTR) {
472fdf
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
472fdf
                 continue;
472fdf
             }
472fdf
             ret = errno;
472fdf
@@ -231,7 +416,7 @@ done:
472fdf
 
472fdf
 /* must be called after the lock has been grabbed */
472fdf
 static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
472fdf
-                           char *buffer, uint32_t *length)
472fdf
+                           char **buffer, uint32_t *length)
472fdf
 {
472fdf
     uint32_t size;
472fdf
     ssize_t rn;
472fdf
@@ -239,6 +424,11 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
472fdf
     int ret;
472fdf
 
472fdf
     do {
472fdf
+        ret = gpm_epoll_wait(gpmctx, EPOLLIN);
472fdf
+        if (ret) {
472fdf
+            goto done;
472fdf
+        }
472fdf
+
472fdf
         ret = 0;
472fdf
         rn = read(gpmctx->fd, &size, sizeof(uint32_t));
472fdf
         if (rn == -1) {
472fdf
@@ -258,11 +448,22 @@ static int gpm_recv_buffer(struct gpm_ctx *gpmctx,
472fdf
         goto done;
472fdf
     }
472fdf
 
472fdf
+    *buffer = malloc(*length);
472fdf
+    if (*buffer == NULL) {
472fdf
+        ret = ENOMEM;
472fdf
+        goto done;
472fdf
+    }
472fdf
+
472fdf
     pos = 0;
472fdf
     while (*length > pos) {
472fdf
-        rn = read(gpmctx->fd, buffer + pos, *length - pos);
472fdf
+        ret = gpm_epoll_wait(gpmctx, EPOLLIN);
472fdf
+        if (ret) {
472fdf
+            goto done;
472fdf
+        }
472fdf
+
472fdf
+        rn = read(gpmctx->fd, *buffer + pos, *length - pos);
472fdf
         if (rn == -1) {
472fdf
-            if (errno == EINTR) {
472fdf
+            if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
472fdf
                 continue;
472fdf
             }
472fdf
             ret = errno;
472fdf
@@ -281,6 +482,7 @@ done:
472fdf
     if (ret) {
472fdf
         /* on errors we can only close the fd and return */
472fdf
         gpm_close_socket(gpmctx);
472fdf
+        gpm_epoll_close(gpmctx);
472fdf
     }
472fdf
     return ret;
472fdf
 }
472fdf
@@ -309,6 +511,63 @@ static struct gpm_ctx *gpm_get_ctx(void)
472fdf
     return &gpm_global_ctx;
472fdf
 }
472fdf
 
472fdf
+static int gpm_send_recv_loop(struct gpm_ctx *gpmctx, char *send_buffer,
472fdf
+                              uint32_t send_length, char** recv_buffer,
472fdf
+                              uint32_t *recv_length)
472fdf
+{
472fdf
+    int ret;
472fdf
+    int retry_count;
472fdf
+
472fdf
+    /* setup timer */
472fdf
+    ret = gpm_timer_setup(gpmctx, RESPONSE_TIMEOUT);
472fdf
+    if (ret)
472fdf
+        return ret;
472fdf
+
472fdf
+    for (retry_count = 0; retry_count < MAX_TIMEOUT_RETRY; retry_count++) {
472fdf
+        /* send to proxy */
472fdf
+        ret = gpm_send_buffer(gpmctx, send_buffer, send_length);
472fdf
+
472fdf
+        if (ret == 0) {
472fdf
+            /* No error, continue to recv */
472fdf
+        } else if (ret == ETIMEDOUT) {
472fdf
+            /* Close and reopen socket before trying again */
472fdf
+            ret = gpm_retry_socket(gpmctx);
472fdf
+            if (ret != 0)
472fdf
+                return ret;
472fdf
+            ret = ETIMEDOUT;
472fdf
+
472fdf
+            /* RETRY entire send */
472fdf
+            continue;
472fdf
+        } else {
472fdf
+            /* Other error */
472fdf
+            return ret;
472fdf
+        }
472fdf
+
472fdf
+        /* receive answer */
472fdf
+        ret = gpm_recv_buffer(gpmctx, recv_buffer, recv_length);
472fdf
+        if (ret == 0) {
472fdf
+            /* No error */
472fdf
+            break;
472fdf
+        } else if (ret == ETIMEDOUT) {
472fdf
+            /* Close and reopen socket before trying again */
472fdf
+            ret = gpm_retry_socket(gpmctx);
472fdf
+
472fdf
+            /* Free buffer and set it to NULL to prevent free(xdr_reply_ctx) */
472fdf
+            free(recv_buffer);
472fdf
+            recv_buffer = NULL;
472fdf
+
472fdf
+            if (ret != 0)
472fdf
+                return ret;
472fdf
+            ret = ETIMEDOUT;
472fdf
+        } else {
472fdf
+            /* Other error */
472fdf
+            return ret;
472fdf
+        }
472fdf
+    }
472fdf
+
472fdf
+    return ret;
472fdf
+}
472fdf
+
472fdf
 OM_uint32 gpm_release_buffer(OM_uint32 *minor_status,
472fdf
                              gss_buffer_t buffer)
472fdf
 {
472fdf
@@ -399,15 +658,20 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
472fdf
     gp_rpc_msg msg;
472fdf
     XDR xdr_call_ctx;
472fdf
     XDR xdr_reply_ctx;
472fdf
-    char buffer[MAX_RPC_SIZE];
472fdf
-    uint32_t length;
472fdf
+    char *send_buffer = NULL;
472fdf
+    char *recv_buffer = NULL;
472fdf
+    uint32_t send_length;
472fdf
+    uint32_t recv_length;
472fdf
     uint32_t xid;
472fdf
     bool xdrok;
472fdf
     bool sockgrab = false;
472fdf
     int ret;
472fdf
 
472fdf
-    xdrmem_create(&xdr_call_ctx, buffer, MAX_RPC_SIZE, XDR_ENCODE);
472fdf
-    xdrmem_create(&xdr_reply_ctx, buffer, MAX_RPC_SIZE, XDR_DECODE);
472fdf
+    send_buffer = malloc(MAX_RPC_SIZE);
472fdf
+    if (send_buffer == NULL)
472fdf
+        return ENOMEM;
472fdf
+
472fdf
+    xdrmem_create(&xdr_call_ctx, send_buffer, MAX_RPC_SIZE, XDR_ENCODE);
472fdf
 
472fdf
     memset(&msg, 0, sizeof(gp_rpc_msg));
472fdf
     msg.header.type = GP_RPC_CALL;
472fdf
@@ -450,22 +714,22 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
472fdf
         goto done;
472fdf
     }
472fdf
 
472fdf
-    /* send to proxy */
472fdf
-    ret = gpm_send_buffer(gpmctx, buffer, xdr_getpos(&xdr_call_ctx));
472fdf
-    if (ret) {
472fdf
-        goto done;
472fdf
-    }
472fdf
+    /* set send_length */
472fdf
+    send_length = xdr_getpos(&xdr_call_ctx);
472fdf
 
472fdf
-    /* receive answer */
472fdf
-    ret = gpm_recv_buffer(gpmctx, buffer, &length);
472fdf
-    if (ret) {
472fdf
+    /* Send request, receive response with timeout */
472fdf
+    ret = gpm_send_recv_loop(gpmctx, send_buffer, send_length, &recv_buffer,
472fdf
+                             &recv_length);
472fdf
+    if (ret)
472fdf
         goto done;
472fdf
-    }
472fdf
 
472fdf
     /* release the lock */
472fdf
     gpm_release_sock(gpmctx);
472fdf
     sockgrab = false;
472fdf
 
472fdf
+    /* Create the reply context */
472fdf
+    xdrmem_create(&xdr_reply_ctx, recv_buffer, recv_length, XDR_DECODE);
472fdf
+
472fdf
     /* decode header */
472fdf
     memset(&msg, 0, sizeof(gp_rpc_msg));
472fdf
     xdrok = xdr_gp_rpc_msg(&xdr_reply_ctx, &msg;;
472fdf
@@ -489,12 +753,21 @@ int gpm_make_call(int proc, union gp_rpc_arg *arg, union gp_rpc_res *res)
472fdf
     }
472fdf
 
472fdf
 done:
472fdf
+    gpm_timer_close(gpmctx);
472fdf
+    gpm_epoll_close(gpmctx);
472fdf
+
472fdf
     if (sockgrab) {
472fdf
         gpm_release_sock(gpmctx);
472fdf
     }
472fdf
     xdr_free((xdrproc_t)xdr_gp_rpc_msg, (char *)&msg;;
472fdf
     xdr_destroy(&xdr_call_ctx);
472fdf
-    xdr_destroy(&xdr_reply_ctx);
472fdf
+
472fdf
+    if (recv_buffer != NULL)
472fdf
+        xdr_destroy(&xdr_reply_ctx);
472fdf
+
472fdf
+    free(send_buffer);
472fdf
+    free(recv_buffer);
472fdf
+
472fdf
     return ret;
472fdf
 }
472fdf