Blob Blame History Raw
From de20f58e8ad6baeef2296544822816cfe9eabb4e Mon Sep 17 00:00:00 2001
From: Rajesh Joseph <rjoseph@redhat.com>
Date: Tue, 13 Dec 2016 15:28:42 +0530
Subject: [PATCH 249/257] socket: socket disconnect should wait for poller
 thread exit

When SSL is enabled or if "transport.socket.own-thread" option is set
then socket_poller is run as different thread. Currently during
disconnect or PARENT_DOWN scenario we don't wait for this thread
to terminate. PARENT_DOWN will disconnect the socket layer and
cleanup resources used by socket_poller.

Therefore before disconnect we should wait for poller thread to exit.

>Reviewed-on: http://review.gluster.org/16141
>Smoke: Gluster Build System <jenkins@build.gluster.org>
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
>Reviewed-by: Kaushal M <kaushal@redhat.com>
>Reviewed-by: Raghavendra Talur <rtalur@redhat.com>
>Reviewed-by: Raghavendra G <rgowdapp@redhat.com>

Change-Id: I71f984b47d260ffd979102f180a99a0bed29f0d6
BUG: 1398798
Signed-off-by: Rajesh Joseph <rjoseph@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/93555
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
Tested-by: Atin Mukherjee <amukherj@redhat.com>
---
 rpc/rpc-lib/src/rpc-clnt-ping.c                |  2 +-
 rpc/rpc-lib/src/rpc-clnt.c                     |  4 +-
 rpc/rpc-lib/src/rpc-transport.c                |  5 +-
 rpc/rpc-lib/src/rpc-transport.h                |  4 +-
 rpc/rpc-lib/src/rpcsvc.c                       |  2 +-
 rpc/rpc-transport/rdma/src/rdma.c              | 19 ++---
 rpc/rpc-transport/socket/src/socket.c          | 98 +++++++++++++++++++++-----
 rpc/rpc-transport/socket/src/socket.h          |  6 ++
 xlators/mgmt/glusterd/src/glusterd-handshake.c |  7 +-
 xlators/mgmt/glusterd/src/glusterd-utils.c     |  3 +-
 xlators/protocol/client/src/client-handshake.c |  4 +-
 xlators/protocol/server/src/server.c           |  3 +-
 12 files changed, 116 insertions(+), 41 deletions(-)

diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.c b/rpc/rpc-lib/src/rpc-clnt-ping.c
index a7ff866..14a1d7c 100644
--- a/rpc/rpc-lib/src/rpc-clnt-ping.c
+++ b/rpc/rpc-lib/src/rpc-clnt-ping.c
@@ -159,7 +159,7 @@ rpc_clnt_ping_timer_expired (void *rpc_ptr)
                         trans->peerinfo.identifier,
                         conn->ping_timeout);
 
-                rpc_transport_disconnect (conn->trans);
+                rpc_transport_disconnect (conn->trans, _gf_false);
         }
 
 out:
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index fe099f9..f24c934 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1855,7 +1855,7 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
         pthread_mutex_unlock (&conn->lock);
 
         if (trans) {
-                rpc_transport_disconnect (trans);
+                rpc_transport_disconnect (trans, _gf_true);
         }
 
         if (unref)
@@ -1914,7 +1914,7 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
         pthread_mutex_unlock (&conn->lock);
 
         if (trans) {
-                rpc_transport_disconnect (trans);
+                rpc_transport_disconnect (trans, _gf_true);
         }
         if (unref)
                 rpc_clnt_unref (rpc);
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index f3ebe2d..e62d94b 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -435,13 +435,14 @@ fail:
 
 
 int32_t
-rpc_transport_disconnect (rpc_transport_t *this)
+rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait)
 {
 	int32_t ret = -1;
 
 	GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
 
-	ret = this->ops->disconnect (this);
+        ret = this->ops->disconnect (this, wait);
+
 fail:
 	return ret;
 }
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 1d4b444..630729d 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -223,7 +223,7 @@ struct rpc_transport_ops {
                                    rpc_transport_reply_t *reply);
         int32_t (*connect)        (rpc_transport_t *this, int port);
         int32_t (*listen)         (rpc_transport_t *this);
-        int32_t (*disconnect)     (rpc_transport_t *this);
+        int32_t (*disconnect)     (rpc_transport_t *this, gf_boolean_t wait);
         int32_t (*get_peername)   (rpc_transport_t *this, char *hostname,
                                    int hostlen);
         int32_t (*get_peeraddr)   (rpc_transport_t *this, char *peeraddr,
@@ -247,7 +247,7 @@ int32_t
 rpc_transport_connect (rpc_transport_t *this, int port);
 
 int32_t
-rpc_transport_disconnect (rpc_transport_t *this);
+rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait);
 
 int32_t
 rpc_transport_destroy (rpc_transport_t *this);
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 2a2ca64..2d2a7ff 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1639,7 +1639,7 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
         ret = 0;
 out:
         if (!listener && trans) {
-                rpc_transport_disconnect (trans);
+                rpc_transport_disconnect (trans, _gf_true);
         }
 
         return ret;
diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c
index 551ac07..d2f04bd 100644
--- a/rpc/rpc-transport/rdma/src/rdma.c
+++ b/rpc/rpc-transport/rdma/src/rdma.c
@@ -51,7 +51,7 @@ static int32_t
 gf_rdma_teardown (rpc_transport_t *this);
 
 static int32_t
-gf_rdma_disconnect (rpc_transport_t *this);
+gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait);
 
 static void
 gf_rdma_cm_handle_disconnect (rpc_transport_t *this);
@@ -1209,7 +1209,7 @@ gf_rdma_cm_handle_connect_init (struct rdma_cm_event *event)
         }
 
         if (ret < 0) {
-                gf_rdma_disconnect (this);
+                gf_rdma_disconnect (this, _gf_false);
         }
 
         return ret;
@@ -3014,7 +3014,7 @@ gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
                         RDMA_MSG_WRITE_PEER_FAILED,
                         "sending request to peer (%s) failed",
                         this->peerinfo.identifier);
-                rpc_transport_disconnect (this);
+                rpc_transport_disconnect (this, _gf_false);
         }
 
 out:
@@ -3051,7 +3051,7 @@ gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
                         RDMA_MSG_WRITE_PEER_FAILED,
                         "sending request to peer (%s) failed",
                         this->peerinfo.identifier);
-                rpc_transport_disconnect (this);
+                rpc_transport_disconnect (this, _gf_false);
         }
 
 out:
@@ -4095,7 +4095,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc)
 
 out:
         if (ret == -1) {
-                rpc_transport_disconnect (peer->trans);
+                rpc_transport_disconnect (peer->trans, _gf_false);
         }
 
         return;
@@ -4216,7 +4216,8 @@ gf_rdma_recv_completion_proc (void *data)
                                 if (peer) {
                                         ibv_ack_cq_events (event_cq, num_wr);
                                         rpc_transport_unref (peer->trans);
-                                        rpc_transport_disconnect (peer->trans);
+                                        rpc_transport_disconnect (peer->trans,
+                                                                  _gf_false);
                                 }
 
                                 if (post) {
@@ -4292,7 +4293,7 @@ gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc)
         }
 
         if (peer) {
-                rpc_transport_disconnect (peer->trans);
+                rpc_transport_disconnect (peer->trans, _gf_false);
         }
 
         return;
@@ -4343,7 +4344,7 @@ gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer,
 
         ret = gf_rdma_pollin_notify (peer, post);
         if ((ret == -1) && (peer != NULL)) {
-                rpc_transport_disconnect (peer->trans);
+                rpc_transport_disconnect (peer->trans, _gf_false);
         }
 
 out:
@@ -4657,7 +4658,7 @@ gf_rdma_init (rpc_transport_t *this)
 
 
 static int32_t
-gf_rdma_disconnect (rpc_transport_t *this)
+gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait)
 {
         gf_rdma_private_t *priv = NULL;
         int32_t            ret  = 0;
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index ae551dc..da2f97f 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2346,7 +2346,7 @@ out:
         return ret;
 }
 
-static int socket_disconnect (rpc_transport_t *this);
+static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
 
 /* reads rpc_requests during pollin */
 static int
@@ -2377,7 +2377,7 @@ socket_event_handler (int fd, int idx, void *data,
                         EINPROGRESS or ENOENT, so nothing more to do, fail
                         reading/writing anything even if poll_in or poll_out
                         is set */
-                        ret = socket_disconnect (this);
+                        ret = socket_disconnect (this, _gf_false);
 
                         /* Force ret to be -1, as we are officially done with
                         this socket */
@@ -2426,6 +2426,13 @@ socket_poller (void *ctx)
          * conditionally
          */
         THIS = this->xl;
+        GF_REF_GET (priv);
+
+        if (priv->ot_state == OT_PLEASE_DIE) {
+                gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting "
+                        "because socket state is OT_PLEASE_DIE");
+                goto err;
+        }
 
         priv->ot_state = OT_RUNNING;
 
@@ -2496,6 +2503,13 @@ socket_poller (void *ctx)
 			break;
 		}
 
+                if (priv->ot_state == OT_PLEASE_DIE) {
+                        gf_log (this->name, GF_LOG_DEBUG,
+                                "OT_PLEASE_DIE on %p (exiting socket_poller)",
+                                this);
+                        break;
+                }
+
 		if (pfd[1].revents & POLL_MASK_INPUT) {
 			ret = socket_event_poll_in(this);
 			if (ret >= 0) {
@@ -2509,7 +2523,6 @@ socket_poller (void *ctx)
                                 gf_log (this->name, GF_LOG_TRACE,
                                         "OT_IDLE on %p (input request)",
                                         this);
-                                priv->ot_state = OT_IDLE;
                                 break;
                         }
 		}
@@ -2526,7 +2539,6 @@ socket_poller (void *ctx)
                                 gf_log (this->name, GF_LOG_TRACE,
                                         "OT_IDLE on %p (output request)",
                                         this);
-                                priv->ot_state = OT_IDLE;
                                 break;
                         }
 		}
@@ -2563,22 +2575,24 @@ socket_poller (void *ctx)
 err:
 	/* All (and only) I/O errors should come here. */
         pthread_mutex_lock(&priv->lock);
+        {
+                __socket_teardown_connection (this);
+                sys_close (priv->sock);
+                priv->sock = -1;
 
-        __socket_teardown_connection (this);
-        sys_close (priv->sock);
-        priv->sock = -1;
-
-        sys_close (priv->pipe[0]);
-        sys_close (priv->pipe[1]);
-        priv->pipe[0] = -1;
-        priv->pipe[1] = -1;
-
-        priv->ot_state = OT_IDLE;
+                sys_close (priv->pipe[0]);
+                sys_close (priv->pipe[1]);
+                priv->pipe[0] = -1;
+                priv->pipe[1] = -1;
 
+                priv->ot_state = OT_IDLE;
+        }
         pthread_mutex_unlock(&priv->lock);
 
         rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
 
+        GF_REF_PUT (priv);
+
         rpc_transport_unref (this);
 
 	return NULL;
@@ -2850,16 +2864,39 @@ out:
 
 
 static int
-socket_disconnect (rpc_transport_t *this)
+socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)
 {
-        socket_private_t *priv = NULL;
-        int               ret = -1;
+        socket_private_t *priv   = NULL;
+        int               ret    = -1;
+        char              a_byte = 'r';
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
 
         priv = this->private;
 
+        if (wait && priv->own_thread) {
+                pthread_mutex_lock (&priv->cond_lock);
+                {
+                        GF_REF_PUT (priv);
+                        /* Change the state to OT_PLEASE_DIE so that
+                         * socket_poller can exit. */
+                        priv->ot_state = OT_PLEASE_DIE;
+                        /* Write something into the pipe so that poller
+                         * thread can wake up.*/
+                        if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
+                                gf_log (this->name, GF_LOG_WARNING,
+                                                "write error on pipe");
+                        }
+
+                        /* Wait for socket_poller to exit */
+                        if (!priv->own_thread_done)
+                                pthread_cond_wait (&priv->cond,
+                                                   &priv->cond_lock);
+                }
+                pthread_mutex_unlock (&priv->cond_lock);
+        }
+
         pthread_mutex_lock (&priv->lock);
         {
                 ret = __socket_disconnect (this);
@@ -2939,6 +2976,7 @@ socket_connect (rpc_transport_t *this, int port)
 
         pthread_mutex_lock (&priv->lock);
         {
+                priv->own_thread_done = _gf_false;
                 if (priv->sock != -1) {
                         gf_log_callingfn (this->name, GF_LOG_TRACE,
                                           "connect () called on transport "
@@ -3807,6 +3845,26 @@ init_openssl_mt (void)
         SSL_load_error_strings();
 }
 
+void
+socket_poller_mayday (void *data)
+{
+        socket_private_t *priv = (socket_private_t *)data;
+
+        if (priv == NULL)
+                return;
+
+        pthread_mutex_lock (&priv->cond_lock);
+        {
+                /* Signal waiting threads before exiting from socket_poller */
+                if (!priv->own_thread_done) {
+                        gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED");
+                        pthread_cond_signal (&priv->cond);
+                        priv->own_thread_done = _gf_true;
+                }
+        }
+        pthread_mutex_unlock (&priv->cond_lock);
+}
+
 static int
 socket_init (rpc_transport_t *this)
 {
@@ -3837,6 +3895,10 @@ socket_init (rpc_transport_t *this)
         memset(priv,0,sizeof(*priv));
 
         pthread_mutex_init (&priv->lock, NULL);
+        pthread_mutex_init (&priv->cond_lock, NULL);
+        pthread_cond_init (&priv->cond, NULL);
+
+        GF_REF_INIT (priv, socket_poller_mayday);
 
         priv->sock = -1;
         priv->idx = -1;
@@ -4267,6 +4329,8 @@ fini (rpc_transport_t *this)
                         "transport %p destroyed", this);
 
                 pthread_mutex_destroy (&priv->lock);
+                pthread_mutex_destroy (&priv->cond_lock);
+                pthread_cond_destroy (&priv->cond);
 		if (priv->ssl_private_key) {
 			GF_FREE(priv->ssl_private_key);
 		}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 7c7005b..8528bde 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -27,6 +27,7 @@
 #include "dict.h"
 #include "mem-pool.h"
 #include "globals.h"
+#include "refcount.h"
 
 #ifndef MAX_IOVEC
 #define MAX_IOVEC 16
@@ -215,6 +216,8 @@ typedef struct {
         };
         struct gf_sock_incoming incoming;
         pthread_mutex_t        lock;
+        pthread_mutex_t        cond_lock;
+        pthread_cond_t         cond;
         int                    windowsize;
         char                   lowlat;
         char                   nodelay;
@@ -239,10 +242,13 @@ typedef struct {
 	pthread_t              thread;
 	int                    pipe[2];
 	gf_boolean_t           own_thread;
+        gf_boolean_t           own_thread_done;
         ot_state_t             ot_state;
         uint32_t               ot_gen;
         gf_boolean_t           is_server;
         int                    log_ctr;
+        GF_REF_DECL;           /* refcount to keep track of socket_poller
+                                  threads */
 } socket_private_t;
 
 
diff --git a/xlators/mgmt/glusterd/src/glusterd-handshake.c b/xlators/mgmt/glusterd/src/glusterd-handshake.c
index 9f162d8..e9aa6b0 100644
--- a/xlators/mgmt/glusterd/src/glusterd-handshake.c
+++ b/xlators/mgmt/glusterd/src/glusterd-handshake.c
@@ -1821,7 +1821,7 @@ __glusterd_mgmt_hndsk_version_ack_cbk (struct rpc_req *req, struct iovec *iov,
 out:
 
         if (ret != 0 && peerinfo)
-                rpc_transport_disconnect (peerinfo->rpc->conn.trans);
+                rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false);
 
         rcu_read_unlock ();
 
@@ -1946,7 +1946,8 @@ out:
                 frame->local = NULL;
                 STACK_DESTROY (frame->root);
                 if (peerinfo)
-                        rpc_transport_disconnect (peerinfo->rpc->conn.trans);
+                        rpc_transport_disconnect (peerinfo->rpc->conn.trans,
+                                                  _gf_false);
         }
 
         rcu_read_unlock ();
@@ -2216,7 +2217,7 @@ __glusterd_peer_dump_version_cbk (struct rpc_req *req, struct iovec *iov,
 
 out:
         if (ret != 0 && peerinfo)
-                rpc_transport_disconnect (peerinfo->rpc->conn.trans);
+                rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false);
 
         rcu_read_unlock ();
 
diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c
index 98c0059..31ba68b 100644
--- a/xlators/mgmt/glusterd/src/glusterd-utils.c
+++ b/xlators/mgmt/glusterd/src/glusterd-utils.c
@@ -4058,7 +4058,8 @@ gd_check_and_update_rebalance_info (glusterd_volinfo_t *old_volinfo,
 
         //Disconnect from rebalance process
         if (glusterd_defrag_rpc_get (old->defrag)) {
-                rpc_transport_disconnect (old->defrag->rpc->conn.trans);
+                rpc_transport_disconnect (old->defrag->rpc->conn.trans,
+                                          _gf_false);
                 glusterd_defrag_rpc_put (old->defrag);
         }
 
diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c
index 3284fac..74e7548 100644
--- a/xlators/protocol/client/src/client-handshake.c
+++ b/xlators/protocol/client/src/client-handshake.c
@@ -1557,7 +1557,7 @@ out:
         if (conf) {
                 /* Need this to connect the same transport on different port */
                 /* ie, glusterd to glusterfsd */
-                rpc_transport_disconnect (conf->rpc->conn.trans);
+                rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false);
         }
 
         return ret;
@@ -1678,7 +1678,7 @@ out:
         STACK_DESTROY (frame->root);
 
         if (ret != 0)
-                rpc_transport_disconnect (conf->rpc->conn.trans);
+                rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false);
 
         return ret;
 }
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 726ab69..3c3664d 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -899,7 +899,8 @@ reconfigure (xlator_t *this, dict_t *options)
                                                 "unauthorized client, hence "
                                                 "terminating the connection %s",
                                                 xprt->peerinfo.identifier);
-                                        rpc_transport_disconnect(xprt);
+                                        rpc_transport_disconnect(xprt,
+                                                                 _gf_false);
                                 }
                         }
                 }
-- 
2.9.3