From de20f58e8ad6baeef2296544822816cfe9eabb4e Mon Sep 17 00:00:00 2001
From: Rajesh Joseph <rjoseph@redhat.com>
Date: Tue, 13 Dec 2016 15:28:42 +0530
Subject: [PATCH 249/257] socket: socket disconnect should wait for poller
thread exit
When SSL is enabled, or if the "transport.socket.own-thread" option is
set, then socket_poller is run as a separate thread. Currently, during
a disconnect or a PARENT_DOWN scenario, we don't wait for this thread
to terminate. PARENT_DOWN will disconnect the socket layer and
clean up resources used by socket_poller.
Therefore, before disconnecting, we should wait for the poller thread to exit.
>Reviewed-on: http://review.gluster.org/16141
>Smoke: Gluster Build System <jenkins@build.gluster.org>
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
>Reviewed-by: Kaushal M <kaushal@redhat.com>
>Reviewed-by: Raghavendra Talur <rtalur@redhat.com>
>Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
Change-Id: I71f984b47d260ffd979102f180a99a0bed29f0d6
BUG: 1398798
Signed-off-by: Rajesh Joseph <rjoseph@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/93555
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
Tested-by: Atin Mukherjee <amukherj@redhat.com>
---
rpc/rpc-lib/src/rpc-clnt-ping.c | 2 +-
rpc/rpc-lib/src/rpc-clnt.c | 4 +-
rpc/rpc-lib/src/rpc-transport.c | 5 +-
rpc/rpc-lib/src/rpc-transport.h | 4 +-
rpc/rpc-lib/src/rpcsvc.c | 2 +-
rpc/rpc-transport/rdma/src/rdma.c | 19 ++---
rpc/rpc-transport/socket/src/socket.c | 98 +++++++++++++++++++++-----
rpc/rpc-transport/socket/src/socket.h | 6 ++
xlators/mgmt/glusterd/src/glusterd-handshake.c | 7 +-
xlators/mgmt/glusterd/src/glusterd-utils.c | 3 +-
xlators/protocol/client/src/client-handshake.c | 4 +-
xlators/protocol/server/src/server.c | 3 +-
12 files changed, 116 insertions(+), 41 deletions(-)
diff --git a/rpc/rpc-lib/src/rpc-clnt-ping.c b/rpc/rpc-lib/src/rpc-clnt-ping.c
index a7ff866..14a1d7c 100644
--- a/rpc/rpc-lib/src/rpc-clnt-ping.c
+++ b/rpc/rpc-lib/src/rpc-clnt-ping.c
@@ -159,7 +159,7 @@ rpc_clnt_ping_timer_expired (void *rpc_ptr)
trans->peerinfo.identifier,
conn->ping_timeout);
- rpc_transport_disconnect (conn->trans);
+ rpc_transport_disconnect (conn->trans, _gf_false);
}
out:
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index fe099f9..f24c934 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1855,7 +1855,7 @@ rpc_clnt_disable (struct rpc_clnt *rpc)
pthread_mutex_unlock (&conn->lock);
if (trans) {
- rpc_transport_disconnect (trans);
+ rpc_transport_disconnect (trans, _gf_true);
}
if (unref)
@@ -1914,7 +1914,7 @@ rpc_clnt_disconnect (struct rpc_clnt *rpc)
pthread_mutex_unlock (&conn->lock);
if (trans) {
- rpc_transport_disconnect (trans);
+ rpc_transport_disconnect (trans, _gf_true);
}
if (unref)
rpc_clnt_unref (rpc);
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index f3ebe2d..e62d94b 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -435,13 +435,14 @@ fail:
int32_t
-rpc_transport_disconnect (rpc_transport_t *this)
+rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
int32_t ret = -1;
GF_VALIDATE_OR_GOTO("rpc_transport", this, fail);
- ret = this->ops->disconnect (this);
+ ret = this->ops->disconnect (this, wait);
+
fail:
return ret;
}
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index 1d4b444..630729d 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -223,7 +223,7 @@ struct rpc_transport_ops {
rpc_transport_reply_t *reply);
int32_t (*connect) (rpc_transport_t *this, int port);
int32_t (*listen) (rpc_transport_t *this);
- int32_t (*disconnect) (rpc_transport_t *this);
+ int32_t (*disconnect) (rpc_transport_t *this, gf_boolean_t wait);
int32_t (*get_peername) (rpc_transport_t *this, char *hostname,
int hostlen);
int32_t (*get_peeraddr) (rpc_transport_t *this, char *peeraddr,
@@ -247,7 +247,7 @@ int32_t
rpc_transport_connect (rpc_transport_t *this, int port);
int32_t
-rpc_transport_disconnect (rpc_transport_t *this);
+rpc_transport_disconnect (rpc_transport_t *this, gf_boolean_t wait);
int32_t
rpc_transport_destroy (rpc_transport_t *this);
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 2a2ca64..2d2a7ff 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -1639,7 +1639,7 @@ rpcsvc_create_listener (rpcsvc_t *svc, dict_t *options, char *name)
ret = 0;
out:
if (!listener && trans) {
- rpc_transport_disconnect (trans);
+ rpc_transport_disconnect (trans, _gf_true);
}
return ret;
diff --git a/rpc/rpc-transport/rdma/src/rdma.c b/rpc/rpc-transport/rdma/src/rdma.c
index 551ac07..d2f04bd 100644
--- a/rpc/rpc-transport/rdma/src/rdma.c
+++ b/rpc/rpc-transport/rdma/src/rdma.c
@@ -51,7 +51,7 @@ static int32_t
gf_rdma_teardown (rpc_transport_t *this);
static int32_t
-gf_rdma_disconnect (rpc_transport_t *this);
+gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait);
static void
gf_rdma_cm_handle_disconnect (rpc_transport_t *this);
@@ -1209,7 +1209,7 @@ gf_rdma_cm_handle_connect_init (struct rdma_cm_event *event)
}
if (ret < 0) {
- gf_rdma_disconnect (this);
+ gf_rdma_disconnect (this, _gf_false);
}
return ret;
@@ -3014,7 +3014,7 @@ gf_rdma_submit_request (rpc_transport_t *this, rpc_transport_req_t *req)
RDMA_MSG_WRITE_PEER_FAILED,
"sending request to peer (%s) failed",
this->peerinfo.identifier);
- rpc_transport_disconnect (this);
+ rpc_transport_disconnect (this, _gf_false);
}
out:
@@ -3051,7 +3051,7 @@ gf_rdma_submit_reply (rpc_transport_t *this, rpc_transport_reply_t *reply)
RDMA_MSG_WRITE_PEER_FAILED,
"sending request to peer (%s) failed",
this->peerinfo.identifier);
- rpc_transport_disconnect (this);
+ rpc_transport_disconnect (this, _gf_false);
}
out:
@@ -4095,7 +4095,7 @@ gf_rdma_process_recv (gf_rdma_peer_t *peer, struct ibv_wc *wc)
out:
if (ret == -1) {
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans, _gf_false);
}
return;
@@ -4216,7 +4216,8 @@ gf_rdma_recv_completion_proc (void *data)
if (peer) {
ibv_ack_cq_events (event_cq, num_wr);
rpc_transport_unref (peer->trans);
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans,
+ _gf_false);
}
if (post) {
@@ -4292,7 +4293,7 @@ gf_rdma_handle_failed_send_completion (gf_rdma_peer_t *peer, struct ibv_wc *wc)
}
if (peer) {
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans, _gf_false);
}
return;
@@ -4343,7 +4344,7 @@ gf_rdma_handle_successful_send_completion (gf_rdma_peer_t *peer,
ret = gf_rdma_pollin_notify (peer, post);
if ((ret == -1) && (peer != NULL)) {
- rpc_transport_disconnect (peer->trans);
+ rpc_transport_disconnect (peer->trans, _gf_false);
}
out:
@@ -4657,7 +4658,7 @@ gf_rdma_init (rpc_transport_t *this)
static int32_t
-gf_rdma_disconnect (rpc_transport_t *this)
+gf_rdma_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
gf_rdma_private_t *priv = NULL;
int32_t ret = 0;
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index ae551dc..da2f97f 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2346,7 +2346,7 @@ out:
return ret;
}
-static int socket_disconnect (rpc_transport_t *this);
+static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
/* reads rpc_requests during pollin */
static int
@@ -2377,7 +2377,7 @@ socket_event_handler (int fd, int idx, void *data,
EINPROGRESS or ENOENT, so nothing more to do, fail
reading/writing anything even if poll_in or poll_out
is set */
- ret = socket_disconnect (this);
+ ret = socket_disconnect (this, _gf_false);
/* Force ret to be -1, as we are officially done with
this socket */
@@ -2426,6 +2426,13 @@ socket_poller (void *ctx)
* conditionally
*/
THIS = this->xl;
+ GF_REF_GET (priv);
+
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_DEBUG, "socket_poller is exiting "
+ "because socket state is OT_PLEASE_DIE");
+ goto err;
+ }
priv->ot_state = OT_RUNNING;
@@ -2496,6 +2503,13 @@ socket_poller (void *ctx)
break;
}
+ if (priv->ot_state == OT_PLEASE_DIE) {
+ gf_log (this->name, GF_LOG_DEBUG,
+ "OT_PLEASE_DIE on %p (exiting socket_poller)",
+ this);
+ break;
+ }
+
if (pfd[1].revents & POLL_MASK_INPUT) {
ret = socket_event_poll_in(this);
if (ret >= 0) {
@@ -2509,7 +2523,6 @@ socket_poller (void *ctx)
gf_log (this->name, GF_LOG_TRACE,
"OT_IDLE on %p (input request)",
this);
- priv->ot_state = OT_IDLE;
break;
}
}
@@ -2526,7 +2539,6 @@ socket_poller (void *ctx)
gf_log (this->name, GF_LOG_TRACE,
"OT_IDLE on %p (output request)",
this);
- priv->ot_state = OT_IDLE;
break;
}
}
@@ -2563,22 +2575,24 @@ socket_poller (void *ctx)
err:
/* All (and only) I/O errors should come here. */
pthread_mutex_lock(&priv->lock);
+ {
+ __socket_teardown_connection (this);
+ sys_close (priv->sock);
+ priv->sock = -1;
- __socket_teardown_connection (this);
- sys_close (priv->sock);
- priv->sock = -1;
-
- sys_close (priv->pipe[0]);
- sys_close (priv->pipe[1]);
- priv->pipe[0] = -1;
- priv->pipe[1] = -1;
-
- priv->ot_state = OT_IDLE;
+ sys_close (priv->pipe[0]);
+ sys_close (priv->pipe[1]);
+ priv->pipe[0] = -1;
+ priv->pipe[1] = -1;
+ priv->ot_state = OT_IDLE;
+ }
pthread_mutex_unlock(&priv->lock);
rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+ GF_REF_PUT (priv);
+
rpc_transport_unref (this);
return NULL;
@@ -2850,16 +2864,39 @@ out:
static int
-socket_disconnect (rpc_transport_t *this)
+socket_disconnect (rpc_transport_t *this, gf_boolean_t wait)
{
- socket_private_t *priv = NULL;
- int ret = -1;
+ socket_private_t *priv = NULL;
+ int ret = -1;
+ char a_byte = 'r';
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
priv = this->private;
+ if (wait && priv->own_thread) {
+ pthread_mutex_lock (&priv->cond_lock);
+ {
+ GF_REF_PUT (priv);
+ /* Change the state to OT_PLEASE_DIE so that
+ * socket_poller can exit. */
+ priv->ot_state = OT_PLEASE_DIE;
+ /* Write something into the pipe so that poller
+ * thread can wake up.*/
+ if (sys_write (priv->pipe[1], &a_byte, 1) < 1) {
+ gf_log (this->name, GF_LOG_WARNING,
+ "write error on pipe");
+ }
+
+ /* Wait for socket_poller to exit */
+ if (!priv->own_thread_done)
+ pthread_cond_wait (&priv->cond,
+ &priv->cond_lock);
+ }
+ pthread_mutex_unlock (&priv->cond_lock);
+ }
+
pthread_mutex_lock (&priv->lock);
{
ret = __socket_disconnect (this);
@@ -2939,6 +2976,7 @@ socket_connect (rpc_transport_t *this, int port)
pthread_mutex_lock (&priv->lock);
{
+ priv->own_thread_done = _gf_false;
if (priv->sock != -1) {
gf_log_callingfn (this->name, GF_LOG_TRACE,
"connect () called on transport "
@@ -3807,6 +3845,26 @@ init_openssl_mt (void)
SSL_load_error_strings();
}
+void
+socket_poller_mayday (void *data)
+{
+ socket_private_t *priv = (socket_private_t *)data;
+
+ if (priv == NULL)
+ return;
+
+ pthread_mutex_lock (&priv->cond_lock);
+ {
+ /* Signal waiting threads before exiting from socket_poller */
+ if (!priv->own_thread_done) {
+ gf_log ("socket", GF_LOG_TRACE, "priv->cond SIGNALED");
+ pthread_cond_signal (&priv->cond);
+ priv->own_thread_done = _gf_true;
+ }
+ }
+ pthread_mutex_unlock (&priv->cond_lock);
+}
+
static int
socket_init (rpc_transport_t *this)
{
@@ -3837,6 +3895,10 @@ socket_init (rpc_transport_t *this)
memset(priv,0,sizeof(*priv));
pthread_mutex_init (&priv->lock, NULL);
+ pthread_mutex_init (&priv->cond_lock, NULL);
+ pthread_cond_init (&priv->cond, NULL);
+
+ GF_REF_INIT (priv, socket_poller_mayday);
priv->sock = -1;
priv->idx = -1;
@@ -4267,6 +4329,8 @@ fini (rpc_transport_t *this)
"transport %p destroyed", this);
pthread_mutex_destroy (&priv->lock);
+ pthread_mutex_destroy (&priv->cond_lock);
+ pthread_cond_destroy (&priv->cond);
if (priv->ssl_private_key) {
GF_FREE(priv->ssl_private_key);
}
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 7c7005b..8528bde 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -27,6 +27,7 @@
#include "dict.h"
#include "mem-pool.h"
#include "globals.h"
+#include "refcount.h"
#ifndef MAX_IOVEC
#define MAX_IOVEC 16
@@ -215,6 +216,8 @@ typedef struct {
};
struct gf_sock_incoming incoming;
pthread_mutex_t lock;
+ pthread_mutex_t cond_lock;
+ pthread_cond_t cond;
int windowsize;
char lowlat;
char nodelay;
@@ -239,10 +242,13 @@ typedef struct {
pthread_t thread;
int pipe[2];
gf_boolean_t own_thread;
+ gf_boolean_t own_thread_done;
ot_state_t ot_state;
uint32_t ot_gen;
gf_boolean_t is_server;
int log_ctr;
+ GF_REF_DECL; /* refcount to keep track of socket_poller
+ threads */
} socket_private_t;
diff --git a/xlators/mgmt/glusterd/src/glusterd-handshake.c b/xlators/mgmt/glusterd/src/glusterd-handshake.c
index 9f162d8..e9aa6b0 100644
--- a/xlators/mgmt/glusterd/src/glusterd-handshake.c
+++ b/xlators/mgmt/glusterd/src/glusterd-handshake.c
@@ -1821,7 +1821,7 @@ __glusterd_mgmt_hndsk_version_ack_cbk (struct rpc_req *req, struct iovec *iov,
out:
if (ret != 0 && peerinfo)
- rpc_transport_disconnect (peerinfo->rpc->conn.trans);
+ rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false);
rcu_read_unlock ();
@@ -1946,7 +1946,8 @@ out:
frame->local = NULL;
STACK_DESTROY (frame->root);
if (peerinfo)
- rpc_transport_disconnect (peerinfo->rpc->conn.trans);
+ rpc_transport_disconnect (peerinfo->rpc->conn.trans,
+ _gf_false);
}
rcu_read_unlock ();
@@ -2216,7 +2217,7 @@ __glusterd_peer_dump_version_cbk (struct rpc_req *req, struct iovec *iov,
out:
if (ret != 0 && peerinfo)
- rpc_transport_disconnect (peerinfo->rpc->conn.trans);
+ rpc_transport_disconnect (peerinfo->rpc->conn.trans, _gf_false);
rcu_read_unlock ();
diff --git a/xlators/mgmt/glusterd/src/glusterd-utils.c b/xlators/mgmt/glusterd/src/glusterd-utils.c
index 98c0059..31ba68b 100644
--- a/xlators/mgmt/glusterd/src/glusterd-utils.c
+++ b/xlators/mgmt/glusterd/src/glusterd-utils.c
@@ -4058,7 +4058,8 @@ gd_check_and_update_rebalance_info (glusterd_volinfo_t *old_volinfo,
//Disconnect from rebalance process
if (glusterd_defrag_rpc_get (old->defrag)) {
- rpc_transport_disconnect (old->defrag->rpc->conn.trans);
+ rpc_transport_disconnect (old->defrag->rpc->conn.trans,
+ _gf_false);
glusterd_defrag_rpc_put (old->defrag);
}
diff --git a/xlators/protocol/client/src/client-handshake.c b/xlators/protocol/client/src/client-handshake.c
index 3284fac..74e7548 100644
--- a/xlators/protocol/client/src/client-handshake.c
+++ b/xlators/protocol/client/src/client-handshake.c
@@ -1557,7 +1557,7 @@ out:
if (conf) {
/* Need this to connect the same transport on different port */
/* ie, glusterd to glusterfsd */
- rpc_transport_disconnect (conf->rpc->conn.trans);
+ rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false);
}
return ret;
@@ -1678,7 +1678,7 @@ out:
STACK_DESTROY (frame->root);
if (ret != 0)
- rpc_transport_disconnect (conf->rpc->conn.trans);
+ rpc_transport_disconnect (conf->rpc->conn.trans, _gf_false);
return ret;
}
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index 726ab69..3c3664d 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -899,7 +899,8 @@ reconfigure (xlator_t *this, dict_t *options)
"unauthorized client, hence "
"terminating the connection %s",
xprt->peerinfo.identifier);
- rpc_transport_disconnect(xprt);
+ rpc_transport_disconnect(xprt,
+ _gf_false);
}
}
}
--
2.9.3