12a457
From e06607387272bff55477967f01c615049f8795cf Mon Sep 17 00:00:00 2001
12a457
From: Kaushal M <kaushal@redhat.com>
12a457
Date: Wed, 27 Apr 2016 16:12:49 +0530
12a457
Subject: [PATCH 122/139] socket: Reap own-threads
12a457
12a457
  Backport of be41e31 from upstream/release-3.7
12a457
12a457
Dead own-threads are reaped periodically (currently every minute). This
12a457
helps avoid memory being leaked, and should help prevent memory
12a457
starvation issues with GlusterD.
12a457
12a457
Change-Id: Ifb3442a91891b164655bb2aa72210b13cee31599
12a457
BUG: 1327751
12a457
Signed-off-by: Kaushal M <kaushal@redhat.com>
12a457
Reviewed-upstream-on: http://review.gluster.org/14143
12a457
Reviewed-on: https://code.engineering.redhat.com/gerrit/73681
12a457
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
12a457
Tested-by: Atin Mukherjee <amukherj@redhat.com>
12a457
---
12a457
 rpc/rpc-transport/socket/src/socket-mem-types.h |    1 +
12a457
 rpc/rpc-transport/socket/src/socket.c           |  121 +++++++++++++++++++++++
12a457
 2 files changed, 122 insertions(+), 0 deletions(-)
12a457
12a457
diff --git a/rpc/rpc-transport/socket/src/socket-mem-types.h b/rpc/rpc-transport/socket/src/socket-mem-types.h
12a457
index 3181406..d1860e6 100644
12a457
--- a/rpc/rpc-transport/socket/src/socket-mem-types.h
12a457
+++ b/rpc/rpc-transport/socket/src/socket-mem-types.h
12a457
@@ -16,6 +16,7 @@
12a457
 typedef enum gf_sock_mem_types_ {
12a457
         gf_sock_connect_error_state_t     = gf_common_mt_end + 1,
12a457
         gf_sock_mt_lock_array,
12a457
+        gf_sock_mt_tid_wrap,
12a457
         gf_sock_mt_end
12a457
 } gf_sock_mem_types_t;
12a457
 
12a457
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
12a457
index 6464cd7..17912c5 100644
12a457
--- a/rpc/rpc-transport/socket/src/socket.c
12a457
+++ b/rpc/rpc-transport/socket/src/socket.c
12a457
@@ -24,6 +24,7 @@
12a457
 #include "common-utils.h"
12a457
 #include "compat-errno.h"
12a457
 #include "socket-mem-types.h"
12a457
+#include "timer.h"
12a457
 
12a457
 /* ugly #includes below */
12a457
 #include "protocol-common.h"
12a457
@@ -191,6 +192,117 @@ struct socket_connect_error_state_ {
12a457
 };
12a457
 typedef struct socket_connect_error_state_ socket_connect_error_state_t;
12a457
 
12a457
+
12a457
+/* This timer and queue are used to reap dead threads. The timer triggers every
12a457
+ * minute and pthread_joins any threads that added themselves to the reap queue
12a457
+ *
12a457
+ * TODO: Make the timer configurable? (Not sure if required)
12a457
+ */
12a457
+static gf_timer_t *reap_timer;
12a457
+static struct list_head reap_queue;
12a457
+static pthread_mutex_t reap_lock = PTHREAD_MUTEX_INITIALIZER;
12a457
+const struct timespec reap_ts = {60, 0};
12a457
+
12a457
+struct tid_wrap {
12a457
+        struct list_head list;
12a457
+        pthread_t tid;
12a457
+};
12a457
+
12a457
+/* _socket_reap_own_threads iterated over the queue of tid's and pthread_joins
12a457
+ * them.  If a thread join fails, it logs the failure and continues
12a457
+ */
12a457
+static void
12a457
+_socket_reap_own_threads() {
12a457
+        struct tid_wrap *node = NULL;
12a457
+        struct tid_wrap *tmp = NULL;
12a457
+        pthread_t tid = 0;
12a457
+        int i = 0;
12a457
+
12a457
+        list_for_each_entry_safe (node, tmp, &reap_queue, list) {
12a457
+                list_del_init (&node->list);
12a457
+                if (pthread_join (node->tid, NULL)) {
12a457
+                        gf_log (THIS->name, GF_LOG_ERROR,
12a457
+                                "own-thread: failed to join thread (tid: %zu)",
12a457
+                                tid);
12a457
+                }
12a457
+                node->tid = 0;
12a457
+                GF_FREE (node);
12a457
+                node = NULL;
12a457
+                i++;
12a457
+        }
12a457
+
12a457
+        if (i) {
12a457
+                gf_log (THIS->name, GF_LOG_TRACE, "reaped %d own-threads", i);
12a457
+        }
12a457
+
12a457
+        return;
12a457
+}
12a457
+
12a457
+/* socket_thread_reaper reaps threads and restarts the reap_timer
12a457
+ */
12a457
+static void
12a457
+socket_thread_reaper () {
12a457
+
12a457
+        pthread_mutex_lock (&reap_lock);
12a457
+
12a457
+        gf_timer_call_cancel (THIS->ctx, reap_timer);
12a457
+        reap_timer = 0;
12a457
+
12a457
+        _socket_reap_own_threads();
12a457
+
12a457
+        reap_timer = gf_timer_call_after (THIS->ctx, reap_ts,
12a457
+                                          socket_thread_reaper, NULL);
12a457
+        if (!reap_timer)
12a457
+                gf_log (THIS->name, GF_LOG_ERROR,
12a457
+                        "failed to restart socket own-thread reap timer");
12a457
+
12a457
+        pthread_mutex_unlock (&reap_lock);
12a457
+
12a457
+        return;
12a457
+}
12a457
+
12a457
+/* socket_thread_reaper_init initializes reap_timer and reap_queue.
12a457
+ * Initializations are done only the first time this is called.
12a457
+ *
12a457
+ * To make sure that the reap_timer is always run, reaper_init it is better to
12a457
+ * call this whenever an own-thread is launched
12a457
+ */
12a457
+static void
12a457
+socket_thread_reaper_init () {
12a457
+        pthread_mutex_lock (&reap_lock);
12a457
+
12a457
+        if (reap_timer == NULL) {
12a457
+                reap_timer = gf_timer_call_after (THIS->ctx, reap_ts,
12a457
+                                                  socket_thread_reaper, NULL);
12a457
+                INIT_LIST_HEAD (&reap_queue);
12a457
+        }
12a457
+
12a457
+        pthread_mutex_unlock (&reap_lock);
12a457
+
12a457
+        return;
12a457
+}
12a457
+
12a457
+/* socket_thread_reaper_add adds the given thread id to the queue of threads
12a457
+ * that will be reaped by socket_thread_reaper
12a457
+ * own-threads need to call this with their thread-ids before dying
12a457
+ */
12a457
+static int
12a457
+socket_thread_reaper_add (pthread_t tid) {
12a457
+        struct tid_wrap *node = NULL;
12a457
+
12a457
+        pthread_mutex_lock (&reap_lock);
12a457
+
12a457
+        node = GF_CALLOC (1, sizeof (*node), gf_sock_mt_tid_wrap);
12a457
+        node->tid = tid;
12a457
+        INIT_LIST_HEAD (&node->list);
12a457
+        list_add_tail (&node->list, &reap_queue);
12a457
+
12a457
+        pthread_mutex_unlock (&reap_lock);
12a457
+
12a457
+        return 0;
12a457
+}
12a457
+
12a457
+
12a457
 static int socket_init (rpc_transport_t *this);
12a457
 
12a457
 static void
12a457
@@ -2527,7 +2639,14 @@ err:
12a457
         priv->ot_state = OT_IDLE;
12a457
         pthread_mutex_unlock(&priv->lock);
12a457
         rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
12a457
+
12a457
+        /* Add the thread to the reap_queue before freeing up the transport and
12a457
+         * dying
12a457
+         */
12a457
+        socket_thread_reaper_add (priv->thread);
12a457
+
12a457
         rpc_transport_unref (this);
12a457
+
12a457
 	return NULL;
12a457
 }
12a457
 
12a457
@@ -2556,6 +2675,8 @@ socket_spawn (rpc_transport_t *this)
12a457
                 gf_log (this->name, GF_LOG_ERROR,
12a457
                         "could not create poll thread");
12a457
         }
12a457
+        /* start the reaper thread */
12a457
+        socket_thread_reaper_init();
12a457
 }
12a457
 
12a457
 static int
12a457
-- 
12a457
1.7.1
12a457