887953
From 667e92a8dd0a21902cef39a59bc6c6b77d1f3c26 Mon Sep 17 00:00:00 2001
887953
From: Raghavendra Gowdappa <rgowdapp@redhat.com>
887953
Date: Mon, 11 Feb 2019 12:32:52 +0530
887953
Subject: [PATCH 525/529] rpcsvc: provide each request handler thread its own
887953
 queue
887953
887953
A single global per program queue is contended by all request handler
887953
threads and event threads. This can lead to high contention. So,
887953
reduce the contention by providing each request handler thread its own
887953
private queue.
887953
887953
Thanks to "Manoj Pillai"<mpillai@redhat.com> for the idea of pairing a
887953
single queue with a fixed request-handler-thread and event-thread,
887953
which brought down the performance regression due to overhead of
887953
queuing significantly.
887953
887953
Thanks to "Xavi Hernandez"<xhernandez@redhat.com> for discussion on
887953
how to communicate the event-thread death to request-handler-thread.
887953
887953
Thanks to "Karan Sandha"<ksandha@redhat.com> for voluntarily running
887953
the perf benchmarks to qualify that performance regression introduced
887953
by ping-timer-fixes is fixed with this patch and patiently running
887953
many iterations of regression tests while RCAing the issue.
887953
887953
Thanks to "Milind Changire"<mchangir@redhat.com> for patiently running
887953
the many iterations of perf benchmarking tests while RCAing the
887953
regression caused by ping-timer-expiry fixes.
887953
887953
Change-Id: I578c3fc67713f4234bd3abbec5d3fbba19059ea5
887953
BUG: 1390151
887953
Signed-off-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
887953
(cherry picked from commit 95e380eca19b9f0d03a53429535f15556e5724ad)
887953
Reviewed-on: https://code.engineering.redhat.com/gerrit/162427
887953
Tested-by: RHGS Build Bot <nigelb@redhat.com>
887953
---
887953
 cli/src/cli-rl.c                             |   4 +-
887953
 libglusterfs/src/event-epoll.c               | 156 +++++++++---
887953
 libglusterfs/src/event-poll.c                |  14 +-
887953
 libglusterfs/src/event.c                     |  11 +-
887953
 libglusterfs/src/event.h                     |  19 +-
887953
 rpc/rpc-lib/src/rpc-clnt.c                   |   6 +
887953
 rpc/rpc-lib/src/rpc-transport.c              |   4 +
887953
 rpc/rpc-lib/src/rpc-transport.h              |   3 +
887953
 rpc/rpc-lib/src/rpcsvc.c                     | 339 +++++++++++++++++++++++----
887953
 rpc/rpc-lib/src/rpcsvc.h                     |  32 ++-
887953
 rpc/rpc-transport/socket/src/socket.c        |  29 ++-
887953
 xlators/protocol/server/src/server-helpers.c |   4 +
887953
 xlators/protocol/server/src/server.c         |   3 +
887953
 13 files changed, 530 insertions(+), 94 deletions(-)
887953
887953
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c
887953
index 4745cf4..cffd0a8 100644
887953
--- a/cli/src/cli-rl.c
887953
+++ b/cli/src/cli-rl.c
887953
@@ -109,7 +109,7 @@ cli_rl_process_line (char *line)
887953
 
887953
 int
887953
 cli_rl_stdin (int fd, int idx, int gen, void *data,
887953
-              int poll_out, int poll_in, int poll_err)
887953
+              int poll_out, int poll_in, int poll_err, char event_thread_died)
887953
 {
887953
         struct cli_state *state = NULL;
887953
 
887953
@@ -394,7 +394,7 @@ cli_rl_enable (struct cli_state *state)
887953
         }
887953
 
887953
         ret = event_register (state->ctx->event_pool, 0, cli_rl_stdin, state,
887953
-                              1, 0);
887953
+                              1, 0, 0);
887953
         if (ret == -1)
887953
                 goto out;
887953
 
887953
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
887953
index 7fc53ff..310bce3 100644
887953
--- a/libglusterfs/src/event-epoll.c
887953
+++ b/libglusterfs/src/event-epoll.c
887953
@@ -32,6 +32,7 @@ struct event_slot_epoll {
887953
 	int fd;
887953
 	int events;
887953
 	int gen;
887953
+        int idx;
887953
 	int ref;
887953
 	int do_close;
887953
 	int in_handler;
887953
@@ -39,6 +40,7 @@ struct event_slot_epoll {
887953
 	void *data;
887953
 	event_handler_t handler;
887953
 	gf_lock_t lock;
887953
+        struct list_head poller_death;
887953
 };
887953
 
887953
 struct event_thread_data {
887953
@@ -60,6 +62,7 @@ __event_newtable (struct event_pool *event_pool, int table_idx)
887953
 	for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
887953
 		table[i].fd = -1;
887953
 		LOCK_INIT (&table[i].lock);
887953
+                INIT_LIST_HEAD(&table[i].poller_death);
887953
 	}
887953
 
887953
 	event_pool->ereg[table_idx] = table;
887953
@@ -70,7 +73,8 @@ __event_newtable (struct event_pool *event_pool, int table_idx)
887953
 
887953
 
887953
 static int
887953
-__event_slot_alloc (struct event_pool *event_pool, int fd)
887953
+__event_slot_alloc (struct event_pool *event_pool, int fd,
887953
+                    char notify_poller_death)
887953
 {
887953
         int  i = 0;
887953
 	int  table_idx = -1;
887953
@@ -105,34 +109,42 @@ __event_slot_alloc (struct event_pool *event_pool, int fd)
887953
 
887953
 	table_idx = i;
887953
 
887953
-	for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
887953
-		if (table[i].fd == -1) {
887953
-			/* wipe everything except bump the generation */
887953
-			gen = table[i].gen;
887953
-			memset (&table[i], 0, sizeof (table[i]));
887953
-			table[i].gen = gen + 1;
887953
-
887953
-			LOCK_INIT (&table[i].lock);
887953
+        for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
887953
+                if (table[i].fd == -1) {
887953
+                        /* wipe everything except bump the generation */
887953
+                        gen = table[i].gen;
887953
+                        memset (&table[i], 0, sizeof (table[i]));
887953
+                        table[i].gen = gen + 1;
887953
+
887953
+                        LOCK_INIT (&table[i].lock);
887953
+                        INIT_LIST_HEAD(&table[i].poller_death);
887953
+
887953
+                        table[i].fd = fd;
887953
+                        if (notify_poller_death) {
887953
+                                table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i;
887953
+                                list_add_tail(&table[i].poller_death,
887953
+                                                &event_pool->poller_death);
887953
+                        }
887953
 
887953
-			table[i].fd = fd;
887953
-			event_pool->slots_used[table_idx]++;
887953
+                        event_pool->slots_used[table_idx]++;
887953
 
887953
-			break;
887953
-		}
887953
-	}
887953
+                        break;
887953
+                }
887953
+        }
887953
 
887953
 	return table_idx * EVENT_EPOLL_SLOTS + i;
887953
 }
887953
 
887953
 
887953
 static int
887953
-event_slot_alloc (struct event_pool *event_pool, int fd)
887953
+event_slot_alloc (struct event_pool *event_pool, int fd,
887953
+                  char notify_poller_death)
887953
 {
887953
 	int  idx = -1;
887953
 
887953
 	pthread_mutex_lock (&event_pool->mutex);
887953
 	{
887953
-		idx = __event_slot_alloc (event_pool, fd);
887953
+		idx = __event_slot_alloc (event_pool, fd, notify_poller_death);
887953
 	}
887953
 	pthread_mutex_unlock (&event_pool->mutex);
887953
 
887953
@@ -162,6 +174,7 @@ __event_slot_dealloc (struct event_pool *event_pool, int idx)
887953
 	slot->fd = -1;
887953
         slot->handled_error = 0;
887953
         slot->in_handler = 0;
887953
+        list_del_init(&slot->poller_death);
887953
 	event_pool->slots_used[table_idx]--;
887953
 
887953
 	return;
887953
@@ -180,6 +193,23 @@ event_slot_dealloc (struct event_pool *event_pool, int idx)
887953
 	return;
887953
 }
887953
 
887953
+static int
887953
+event_slot_ref(struct event_slot_epoll *slot)
887953
+{
887953
+        int ref;
887953
+
887953
+        if (!slot)
887953
+                return -1;
887953
+
887953
+	LOCK (&slot->lock);
887953
+	{
887953
+		slot->ref++;
887953
+                ref = slot->ref;
887953
+	}
887953
+	UNLOCK (&slot->lock);
887953
+
887953
+        return ref;
887953
+}
887953
 
887953
 static struct event_slot_epoll *
887953
 event_slot_get (struct event_pool *event_pool, int idx)
887953
@@ -198,15 +228,44 @@ event_slot_get (struct event_pool *event_pool, int idx)
887953
 
887953
 	slot = &table[offset];
887953
 
887953
+        event_slot_ref (slot);
887953
+	return slot;
887953
+}
887953
+
887953
+static void
887953
+__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
887953
+                   int idx)
887953
+{
887953
+        int ref = -1;
887953
+        int fd = -1;
887953
+        int do_close = 0;
887953
+
887953
 	LOCK (&slot->lock);
887953
 	{
887953
-		slot->ref++;
887953
+		--(slot->ref);
887953
+                ref = slot->ref;
887953
 	}
887953
 	UNLOCK (&slot->lock);
887953
 
887953
-	return slot;
887953
-}
887953
+        if (ref)
887953
+                /* slot still alive */
887953
+                goto done;
887953
+
887953
+        LOCK(&slot->lock);
887953
+        {
887953
+                fd = slot->fd;
887953
+                do_close = slot->do_close;
887953
+                slot->do_close = 0;
887953
+        }
887953
+        UNLOCK(&slot->lock);
887953
+
887953
+        __event_slot_dealloc(event_pool, idx);
887953
 
887953
+        if (do_close)
887953
+                sys_close(fd);
887953
+done:
887953
+        return;
887953
+}
887953
 
887953
 static void
887953
 event_slot_unref (struct event_pool *event_pool, struct event_slot_epoll *slot,
887953
@@ -264,7 +323,7 @@ event_pool_new_epoll (int count, int eventthreadcount)
887953
         event_pool->fd = epfd;
887953
 
887953
         event_pool->count = count;
887953
-
887953
+        INIT_LIST_HEAD(&event_pool->poller_death);
887953
         event_pool->eventthreadcount = eventthreadcount;
887953
         event_pool->auto_thread_count = 0;
887953
 
887953
@@ -315,7 +374,8 @@ __slot_update_events (struct event_slot_epoll *slot, int poll_in, int poll_out)
887953
 int
887953
 event_register_epoll (struct event_pool *event_pool, int fd,
887953
                       event_handler_t handler,
887953
-                      void *data, int poll_in, int poll_out)
887953
+                      void *data, int poll_in, int poll_out,
887953
+                      char notify_poller_death)
887953
 {
887953
         int                 idx = -1;
887953
         int                 ret = -1;
887953
@@ -345,7 +405,7 @@ event_register_epoll (struct event_pool *event_pool, int fd,
887953
         if (destroy == 1)
887953
                goto out;
887953
 
887953
-	idx = event_slot_alloc (event_pool, fd);
887953
+	idx = event_slot_alloc (event_pool, fd, notify_poller_death);
887953
 	if (idx == -1) {
887953
 		gf_msg ("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
887953
 			"could not find slot for fd=%d", fd);
887953
@@ -583,7 +643,7 @@ pre_unlock:
887953
                 ret = handler (fd, idx, gen, data,
887953
                                (event->events & (EPOLLIN|EPOLLPRI)),
887953
                                (event->events & (EPOLLOUT)),
887953
-                               (event->events & (EPOLLERR|EPOLLHUP)));
887953
+                               (event->events & (EPOLLERR|EPOLLHUP)), 0);
887953
         }
887953
 out:
887953
 	event_slot_unref (event_pool, slot, idx);
887953
@@ -600,7 +660,10 @@ event_dispatch_epoll_worker (void *data)
887953
         struct event_thread_data *ev_data = data;
887953
 	struct event_pool  *event_pool;
887953
         int                 myindex = -1;
887953
-        int                 timetodie = 0;
887953
+        int                 timetodie = 0, gen = 0;
887953
+        struct list_head    poller_death_notify;
887953
+        struct event_slot_epoll *slot = NULL, *tmp = NULL;
887953
+
887953
 
887953
         GF_VALIDATE_OR_GOTO ("event", ev_data, out);
887953
 
887953
@@ -610,7 +673,7 @@ event_dispatch_epoll_worker (void *data)
887953
         GF_VALIDATE_OR_GOTO ("event", event_pool, out);
887953
 
887953
         gf_msg ("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "Started"
887953
-                " thread with index %d", myindex);
887953
+                " thread with index %d", myindex - 1);
887953
 
887953
         pthread_mutex_lock (&event_pool->mutex);
887953
         {
887953
@@ -627,21 +690,58 @@ event_dispatch_epoll_worker (void *data)
887953
                          * reconfigured always */
887953
                         pthread_mutex_lock (&event_pool->mutex);
887953
                         {
887953
-                                if (event_pool->eventthreadcount <
887953
-                                    myindex) {
887953
+                                if (event_pool->eventthreadcount < myindex) {
887953
+                                        while (event_pool->poller_death_sliced) {
887953
+                                                pthread_cond_wait(
887953
+                                                        &event_pool->cond,
887953
+                                                        &event_pool->mutex);
887953
+                                        }
887953
+
887953
+                                        INIT_LIST_HEAD(&poller_death_notify);
887953
+
887953
                                         /* if found true in critical section,
887953
                                          * die */
887953
                                         event_pool->pollers[myindex - 1] = 0;
887953
                                         event_pool->activethreadcount--;
887953
                                         timetodie = 1;
887953
+                                        gen = ++event_pool->poller_gen;
887953
+                                        list_for_each_entry(slot, &event_pool->poller_death,
887953
+                                                            poller_death)
887953
+                                        {
887953
+                                                event_slot_ref(slot);
887953
+                                        }
887953
+
887953
+                                        list_splice_init(&event_pool->poller_death,
887953
+                                                         &poller_death_notify);
887953
+                                        event_pool->poller_death_sliced = 1;
887953
+
887953
                                         pthread_cond_broadcast (&event_pool->cond);
887953
                                 }
887953
                         }
887953
                         pthread_mutex_unlock (&event_pool->mutex);
887953
                         if (timetodie) {
887953
+                                list_for_each_entry(slot, &poller_death_notify, poller_death)
887953
+                                {
887953
+                                        slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
887953
+                                }
887953
+
887953
+                                pthread_mutex_lock(&event_pool->mutex);
887953
+                                {
887953
+                                        list_for_each_entry_safe(slot, tmp, &poller_death_notify, poller_death)
887953
+                                        {
887953
+                                                __event_slot_unref(event_pool, slot, slot->idx);
887953
+                                        }
887953
+
887953
+                                        list_splice(&poller_death_notify,
887953
+                                                        &event_pool->poller_death);
887953
+                                        event_pool->poller_death_sliced = 0;
887953
+                                        pthread_cond_broadcast(&event_pool->cond);
887953
+                                }
887953
+                                pthread_mutex_unlock(&event_pool->mutex);
887953
+
887953
                                 gf_msg ("epoll", GF_LOG_INFO, 0,
887953
                                         LG_MSG_EXITED_EPOLL_THREAD, "Exited "
887953
-                                        "thread with index %d", myindex);
887953
+                                        "thread with index %d", myindex - 1);
887953
                                 goto out;
887953
                         }
887953
                 }
887953
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
887953
index 3bffc47..ca00071 100644
887953
--- a/libglusterfs/src/event-poll.c
887953
+++ b/libglusterfs/src/event-poll.c
887953
@@ -36,12 +36,14 @@ struct event_slot_poll {
887953
 static int
887953
 event_register_poll (struct event_pool *event_pool, int fd,
887953
                      event_handler_t handler,
887953
-                     void *data, int poll_in, int poll_out);
887953
+                     void *data, int poll_in, int poll_out,
887953
+                     char notify_poller_death);
887953
 
887953
 
887953
 static int
887953
 __flush_fd (int fd, int idx, int gen, void *data,
887953
-            int poll_in, int poll_out, int poll_err)
887953
+            int poll_in, int poll_out, int poll_err,
887953
+            char notify_poller_death)
887953
 {
887953
         char buf[64];
887953
         int ret = -1;
887953
@@ -153,7 +155,7 @@ event_pool_new_poll (int count, int eventthreadcount)
887953
         }
887953
 
887953
         ret = event_register_poll (event_pool, event_pool->breaker[0],
887953
-                                   __flush_fd, NULL, 1, 0);
887953
+                                   __flush_fd, NULL, 1, 0, 0);
887953
         if (ret == -1) {
887953
                 gf_msg ("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
887953
                         "could not register pipe fd with poll event loop");
887953
@@ -180,7 +182,8 @@ event_pool_new_poll (int count, int eventthreadcount)
887953
 static int
887953
 event_register_poll (struct event_pool *event_pool, int fd,
887953
                      event_handler_t handler,
887953
-                     void *data, int poll_in, int poll_out)
887953
+                     void *data, int poll_in, int poll_out,
887953
+                     char notify_poller_death)
887953
 {
887953
         int idx = -1;
887953
 
887953
@@ -389,7 +392,8 @@ unlock:
887953
                 ret = handler (ufds[i].fd, idx, 0, data,
887953
                                (ufds[i].revents & (POLLIN|POLLPRI)),
887953
                                (ufds[i].revents & (POLLOUT)),
887953
-                               (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)));
887953
+                               (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)),
887953
+                               0);
887953
 
887953
         return ret;
887953
 }
887953
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
887953
index bba6f84..8463c19 100644
887953
--- a/libglusterfs/src/event.c
887953
+++ b/libglusterfs/src/event.c
887953
@@ -58,14 +58,16 @@ event_pool_new (int count, int eventthreadcount)
887953
 int
887953
 event_register (struct event_pool *event_pool, int fd,
887953
                 event_handler_t handler,
887953
-                void *data, int poll_in, int poll_out)
887953
+                void *data, int poll_in, int poll_out,
887953
+                char notify_poller_death)
887953
 {
887953
         int ret = -1;
887953
 
887953
         GF_VALIDATE_OR_GOTO ("event", event_pool, out);
887953
 
887953
         ret = event_pool->ops->event_register (event_pool, fd, handler, data,
887953
-                                               poll_in, poll_out);
887953
+                                               poll_in, poll_out,
887953
+                                               notify_poller_death);
887953
 out:
887953
         return ret;
887953
 }
887953
@@ -170,7 +172,8 @@ out:
887953
 
887953
 int
887953
 poller_destroy_handler (int fd, int idx, int gen, void *data,
887953
-                       int poll_out, int poll_in, int poll_err)
887953
+                       int poll_out, int poll_in, int poll_err,
887953
+                       char event_thread_exit)
887953
 {
887953
         struct event_destroy_data *destroy = NULL;
887953
         int                        readfd  = -1, ret = -1;
887953
@@ -239,7 +242,7 @@ event_dispatch_destroy (struct event_pool *event_pool)
887953
         /* From the main thread register an event on the pipe fd[0],
887953
          */
887953
         idx = event_register (event_pool, fd[0], poller_destroy_handler,
887953
-                              &data, 1, 0);
887953
+                              &data, 1, 0, 0);
887953
         if (idx < 0)
887953
                 goto out;
887953
 
887953
diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h
887953
index c60b14a..875cd7d 100644
887953
--- a/libglusterfs/src/event.h
887953
+++ b/libglusterfs/src/event.h
887953
@@ -12,6 +12,7 @@
887953
 #define _EVENT_H_
887953
 
887953
 #include <pthread.h>
887953
+#include "list.h"
887953
 
887953
 struct event_pool;
887953
 struct event_ops;
887953
@@ -24,7 +25,8 @@ struct event_data {
887953
 
887953
 
887953
 typedef int (*event_handler_t) (int fd, int idx, int gen, void *data,
887953
-				int poll_in, int poll_out, int poll_err);
887953
+				int poll_in, int poll_out, int poll_err,
887953
+                                char event_thread_exit);
887953
 
887953
 #define EVENT_EPOLL_TABLES 1024
887953
 #define EVENT_EPOLL_SLOTS 1024
887953
@@ -41,6 +43,13 @@ struct event_pool {
887953
 	struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
887953
 	int slots_used[EVENT_EPOLL_TABLES];
887953
 
887953
+        struct list_head poller_death;
887953
+        int poller_death_sliced; /* track whether the list of fds interested
887953
+                                  * poller_death is sliced. If yes, new thread
887953
+                                  * death notification has to wait till the
887953
+                                  * list is added back
887953
+                                  */
887953
+        int poller_gen;
887953
 	int used;
887953
 	int changed;
887953
 
887953
@@ -54,7 +63,7 @@ struct event_pool {
887953
          * epoll. */
887953
         int eventthreadcount; /* number of event threads to execute. */
887953
         pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,
887953
-                                                     * and live status */
887953
+                                               * and live status */
887953
         int destroy;
887953
         int activethreadcount;
887953
 
887953
@@ -83,7 +92,8 @@ struct event_ops {
887953
 
887953
         int (*event_register) (struct event_pool *event_pool, int fd,
887953
                                event_handler_t handler,
887953
-                               void *data, int poll_in, int poll_out);
887953
+                               void *data, int poll_in, int poll_out,
887953
+                               char notify_poller_death);
887953
 
887953
         int (*event_select_on) (struct event_pool *event_pool, int fd, int idx,
887953
                                 int poll_in, int poll_out);
887953
@@ -107,7 +117,8 @@ int event_select_on (struct event_pool *event_pool, int fd, int idx,
887953
 		     int poll_in, int poll_out);
887953
 int event_register (struct event_pool *event_pool, int fd,
887953
 		    event_handler_t handler,
887953
-		    void *data, int poll_in, int poll_out);
887953
+		    void *data, int poll_in, int poll_out,
887953
+                    char notify_poller_death);
887953
 int event_unregister (struct event_pool *event_pool, int fd, int idx);
887953
 int event_unregister_close (struct event_pool *event_pool, int fd, int idx);
887953
 int event_dispatch (struct event_pool *event_pool);
887953
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
887953
index fd7e3ec..fe5e3fd 100644
887953
--- a/rpc/rpc-lib/src/rpc-clnt.c
887953
+++ b/rpc/rpc-lib/src/rpc-clnt.c
887953
@@ -1013,6 +1013,12 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
887953
                  */
887953
                 ret = 0;
887953
                 break;
887953
+
887953
+        case RPC_TRANSPORT_EVENT_THREAD_DIED:
887953
+            /* only meaningful on a server, no need of handling this event on a
887953
+             * client */
887953
+            ret = 0;
887953
+            break;
887953
         }
887953
 
887953
 out:
887953
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
887953
index b737ff2..db02338 100644
887953
--- a/rpc/rpc-lib/src/rpc-transport.c
887953
+++ b/rpc/rpc-lib/src/rpc-transport.c
887953
@@ -294,6 +294,10 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
887953
                 goto fail;
887953
         }
887953
 
887953
+        if (dict_get(options, "notify-poller-death")) {
887953
+                trans->notify_poller_death = 1;
887953
+        }
887953
+
887953
 	gf_log ("rpc-transport", GF_LOG_DEBUG,
887953
 		"attempt to load file %s", name);
887953
 
887953
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
887953
index c97f98d..cf77c9d 100644
887953
--- a/rpc/rpc-lib/src/rpc-transport.h
887953
+++ b/rpc/rpc-lib/src/rpc-transport.h
887953
@@ -99,6 +99,7 @@ typedef enum {
887953
         RPC_TRANSPORT_MSG_RECEIVED,         /* Complete rpc msg has been read */
887953
         RPC_TRANSPORT_CONNECT,              /* client is connected to server */
887953
         RPC_TRANSPORT_MSG_SENT,
887953
+        RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
887953
 } rpc_transport_event_t;
887953
 
887953
 struct rpc_transport_msg {
887953
@@ -218,6 +219,8 @@ struct rpc_transport {
887953
          */
887953
         gf_boolean_t               connect_failed;
887953
         gf_atomic_t                disconnect_progress;
887953
+        char notify_poller_death;
887953
+        char poller_death_accept;
887953
 };
887953
 
887953
 struct rpc_transport_ops {
887953
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
887953
index faa1956..c769463 100644
887953
--- a/rpc/rpc-lib/src/rpcsvc.c
887953
+++ b/rpc/rpc-lib/src/rpcsvc.c
887953
@@ -8,6 +8,7 @@
887953
   cases as published by the Free Software Foundation.
887953
 */
887953
 
887953
+#include <math.h>
887953
 #include "rpcsvc.h"
887953
 #include "rpc-transport.h"
887953
 #include "dict.h"
887953
@@ -56,9 +57,76 @@ int
887953
 rpcsvc_notify (rpc_transport_t *trans, void *mydata,
887953
                rpc_transport_event_t event, void *data, ...);
887953
 
887953
+void *
887953
+rpcsvc_request_handler(void *arg);
887953
+
887953
 static int
887953
 rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr);
887953
 
887953
+void
887953
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
887953
+                           rpcsvc_request_queue_t *queue, char status[])
887953
+{
887953
+        int queue_index = 0, status_index = 0, set_bit = 0;
887953
+
887953
+        if (queue != &prog->request_queue[0]) {
887953
+                queue_index = (queue - &prog->request_queue[0]);
887953
+        }
887953
+
887953
+        status_index = queue_index / 8;
887953
+        set_bit = queue_index % 8;
887953
+
887953
+        status[status_index] ^= (1 << set_bit);
887953
+
887953
+        return;
887953
+}
887953
+
887953
+static int
887953
+get_rightmost_set_bit(int n)
887953
+{
887953
+        return log2(n & -n);
887953
+}
887953
+
887953
+int
887953
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
887953
+{
887953
+        int queue_index = 0, max_index = 0, i = 0;
887953
+        unsigned int right_most_unset_bit = 0;
887953
+
887953
+        right_most_unset_bit = 8;
887953
+
887953
+        max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
887953
+        for (i = 0; i < max_index; i++) {
887953
+                if (prog->request_queue_status[i] == 0) {
887953
+                        right_most_unset_bit = 0;
887953
+                        break;
887953
+                } else {
887953
+                        right_most_unset_bit = get_rightmost_set_bit(
887953
+                                        ~prog->request_queue_status[i]);
887953
+                        if (right_most_unset_bit < 8) {
887953
+                                break;
887953
+                        }
887953
+                }
887953
+        }
887953
+
887953
+        if (right_most_unset_bit > 7) {
887953
+                queue_index = -1;
887953
+        } else {
887953
+                queue_index = i * 8;
887953
+                queue_index += right_most_unset_bit;
887953
+
887953
+                if (queue_index > EVENT_MAX_THREADS) {
887953
+                        queue_index = -1;
887953
+                }
887953
+        }
887953
+
887953
+        if (queue_index != -1) {
887953
+                prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
887953
+        }
887953
+
887953
+        return queue_index;
887953
+}
887953
+
887953
 rpcsvc_notify_wrapper_t *
887953
 rpcsvc_notify_wrapper_alloc (void)
887953
 {
887953
@@ -412,7 +480,6 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
887953
         req->progver = rpc_call_progver (callmsg);
887953
         req->procnum = rpc_call_progproc (callmsg);
887953
         req->trans = rpc_transport_ref (trans);
887953
-        gf_client_ref (req->trans->xl_private);
887953
         req->count = msg->count;
887953
         req->msg[0] = progmsg;
887953
         req->iobref = iobref_ref (msg->iobref);
887953
@@ -570,6 +637,73 @@ rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
887953
         return 0;
887953
 }
887953
 
887953
+void
887953
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
887953
+{
887953
+        rpcsvc_request_queue_t *queue = NULL;
887953
+        int num = 0;
887953
+        void *value = NULL;
887953
+        rpcsvc_request_t *req = NULL;
887953
+        char empty = 0;
887953
+
887953
+        value = pthread_getspecific(prog->req_queue_key);
887953
+        if (value == NULL) {
887953
+                return;
887953
+        }
887953
+
887953
+        num = ((unsigned long)value) - 1;
887953
+
887953
+        queue = &prog->request_queue[num];
887953
+
887953
+        if (queue->gen == gen) {
887953
+                /* duplicate event */
887953
+                gf_log(GF_RPCSVC, GF_LOG_INFO,
887953
+                                "not queuing duplicate event thread death. "
887953
+                                "queue %d program %s",
887953
+                                num, prog->progname);
887953
+                return;
887953
+        }
887953
+
887953
+        rpcsvc_alloc_request(svc, req);
887953
+        req->prognum = RPCSVC_INFRA_PROGRAM;
887953
+        req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
887953
+        gf_log(GF_RPCSVC, GF_LOG_INFO,
887953
+                        "queuing event thread death request to queue %d of program %s", num,
887953
+                        prog->progname);
887953
+
887953
+        pthread_mutex_lock(&queue->queue_lock);
887953
+        {
887953
+                empty = list_empty(&queue->request_queue);
887953
+
887953
+                list_add_tail(&req->request_list, &queue->request_queue);
887953
+                queue->gen = gen;
887953
+
887953
+                if (empty && queue->waiting)
887953
+                        pthread_cond_signal(&queue->queue_cond);
887953
+        }
887953
+        pthread_mutex_unlock(&queue->queue_lock);
887953
+
887953
+        return;
887953
+}
887953
+
887953
+int
887953
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
887953
+{
887953
+        rpcsvc_program_t *prog = NULL;
887953
+
887953
+        pthread_mutex_lock (&svc->rpclock);
887953
+        {
887953
+                list_for_each_entry(prog, &svc->programs, program)
887953
+                {
887953
+                        if (prog->ownthread)
887953
+                                rpcsvc_queue_event_thread_death(svc, prog, gen);
887953
+                }
887953
+        }
887953
+        pthread_mutex_unlock (&svc->rpclock);
887953
+
887953
+        return 0;
887953
+}
887953
+
887953
 int
887953
 rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
887953
                         rpc_transport_pollin_t *msg)
887953
@@ -581,8 +715,12 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
887953
         uint16_t                port           = 0;
887953
         gf_boolean_t            is_unix        = _gf_false, empty = _gf_false;
887953
         gf_boolean_t            unprivileged   = _gf_false;
887953
+        gf_boolean_t            spawn_request_handler   = _gf_false;
887953
         drc_cached_op_t        *reply          = NULL;
887953
         rpcsvc_drc_globals_t   *drc            = NULL;
887953
+        rpcsvc_request_queue_t *queue          = NULL;
887953
+        long                    num            = 0;
887953
+        void                   *value          = NULL;
887953
 
887953
         if (!trans || !svc)
887953
                 return -1;
887953
@@ -696,20 +834,83 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
887953
                                             rpcsvc_check_and_reply_error, NULL,
887953
                                             req);
887953
                 } else if (req->ownthread) {
887953
-                        pthread_mutex_lock (&req->prog->queue_lock);
887953
+                        value = pthread_getspecific(req->prog->req_queue_key);
887953
+                        if (value == NULL) {
887953
+                                pthread_mutex_lock(&req->prog->thr_lock);
887953
+                                {
887953
+                                        num = rpcsvc_get_free_queue_index(req->prog);
887953
+                                        if (num != -1) {
887953
+                                                num++;
887953
+                                                value = (void *)num;
887953
+                                                ret = pthread_setspecific(req->prog->req_queue_key,
887953
+                                                                value);
887953
+                                                if (ret < 0) {
887953
+                                                        gf_log(GF_RPCSVC, GF_LOG_WARNING,
887953
+                                                               "setting request queue in TLS failed");
887953
+                                                        rpcsvc_toggle_queue_status(
887953
+                                                                        req->prog, &req->prog->request_queue[num - 1],
887953
+                                                                        req->prog->request_queue_status);
887953
+                                                        num = -1;
887953
+                                                } else {
887953
+                                                        spawn_request_handler = 1;
887953
+                                                }
887953
+                                        }
887953
+                                }
887953
+                                pthread_mutex_unlock(&req->prog->thr_lock);
887953
+                        }
887953
+
887953
+                        if (num == -1)
887953
+                                goto noqueue;
887953
+
887953
+                        num = ((unsigned long)value) - 1;
887953
+
887953
+                        queue = &req->prog->request_queue[num];
887953
+
887953
+                        if (spawn_request_handler) {
887953
+                                ret = gf_thread_create(&queue->thread, NULL,
887953
+                                                       rpcsvc_request_handler, queue,
887953
+                                                       "rpcrqhnd");
887953
+                                if (!ret) {
887953
+                                        gf_log(GF_RPCSVC, GF_LOG_INFO,
887953
+                                               "spawned a request handler "
887953
+                                               "thread for queue %d",
887953
+                                               (int)num);
887953
+
887953
+                                        req->prog->threadcount++;
887953
+                                } else {
887953
+                                        gf_log(GF_RPCSVC, GF_LOG_INFO,
887953
+                                               "spawning a request handler "
887953
+                                               "thread for queue %d failed",
887953
+                                               (int)num);
887953
+                                        ret = pthread_setspecific(req->prog->req_queue_key, 0);
887953
+                                        if (ret < 0) {
887953
+                                                gf_log(GF_RPCSVC, GF_LOG_WARNING,
887953
+                                                       "resetting request "
887953
+                                                       "queue in TLS failed");
887953
+                                        }
887953
+
887953
+                                        rpcsvc_toggle_queue_status(
887953
+                                                        req->prog, &req->prog->request_queue[num - 1],
887953
+                                                        req->prog->request_queue_status);
887953
+
887953
+                                        goto noqueue;
887953
+                                }
887953
+                        }
887953
+
887953
+                        pthread_mutex_lock(&queue->queue_lock);
887953
                         {
887953
-                                empty = list_empty (&req->prog->request_queue);
887953
+                                empty = list_empty(&queue->request_queue);
887953
 
887953
-                                list_add_tail (&req->request_list,
887953
-                                               &req->prog->request_queue);
887953
+                                list_add_tail(&req->request_list, &queue->request_queue);
887953
 
887953
-                                if (empty)
887953
-                                        pthread_cond_signal (&req->prog->queue_cond);
887953
+                                if (empty && queue->waiting)
887953
+                                        pthread_cond_signal(&queue->queue_cond);
887953
                         }
887953
-                        pthread_mutex_unlock (&req->prog->queue_lock);
887953
+                        pthread_mutex_unlock(&queue->queue_lock);
887953
 
887953
                         ret = 0;
887953
                 } else {
887953
+noqueue:
887953
                         ret = actor_fn (req);
887953
                 }
887953
         }
887953
@@ -838,6 +1039,12 @@ rpcsvc_notify (rpc_transport_t *trans, void *mydata,
887953
                         "got MAP_XID event, which should have not come");
887953
                 ret = 0;
887953
                 break;
887953
+
887953
+        case RPC_TRANSPORT_EVENT_THREAD_DIED:
887953
+            rpcsvc_handle_event_thread_death(svc, trans,
887953
+                                             (int)(unsigned long)data);
887953
+            ret = 0;
887953
+            break;
887953
         }
887953
 
887953
 out:
887953
@@ -1779,6 +1986,7 @@ rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name)
887953
                         goto out;
887953
                 }
887953
 
887953
+                dict_del(options, "notify-poller-death");
887953
                 GF_FREE (transport_name);
887953
                 transport_name = NULL;
887953
                 count++;
887953
@@ -1864,50 +2072,87 @@ out:
887953
 void *
887953
 rpcsvc_request_handler (void *arg)
887953
 {
887953
-        rpcsvc_program_t *program = arg;
887953
-        rpcsvc_request_t *req     = NULL;
887953
+        rpcsvc_request_queue_t *queue = NULL;
887953
+        rpcsvc_program_t *program = NULL;
887953
+        rpcsvc_request_t *req = NULL, *tmp_req = NULL;
887953
         rpcsvc_actor_t   *actor   = NULL;
887953
         gf_boolean_t      done    = _gf_false;
887953
         int               ret     = 0;
887953
+        struct list_head tmp_list = {
887953
+                0,
887953
+        };
887953
+
887953
+        queue = arg;
887953
+        program = queue->program;
887953
+
887953
+        INIT_LIST_HEAD(&tmp_list);
887953
 
887953
         if (!program)
887953
                 return NULL;
887953
 
887953
         while (1) {
887953
-                pthread_mutex_lock (&program->queue_lock);
887953
+                pthread_mutex_lock(&queue->queue_lock);
887953
                 {
887953
-                        if (!program->alive
887953
-                            && list_empty (&program->request_queue)) {
887953
+                        if (!program->alive && list_empty(&queue->request_queue)) {
887953
                                 done = 1;
887953
                                 goto unlock;
887953
                         }
887953
-
887953
-                        while (list_empty (&program->request_queue))
887953
-                                pthread_cond_wait (&program->queue_cond,
887953
-                                                   &program->queue_lock);
887953
-
887953
-                        req = list_entry (program->request_queue.next,
887953
-                                          typeof (*req), request_list);
887953
-
887953
-                        list_del_init (&req->request_list);
887953
+                        while (list_empty(&queue->request_queue)) {
887953
+                                queue->waiting = _gf_true;
887953
+                                pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
887953
+                        }
887953
+                        queue->waiting = _gf_false;
887953
+                        if (!list_empty(&queue->request_queue)) {
887953
+                                INIT_LIST_HEAD(&tmp_list);
887953
+                                list_splice_init(&queue->request_queue, &tmp_list);
887953
+                        }
887953
+                }
887953
+unlock:
887953
+                pthread_mutex_unlock(&queue->queue_lock);
887953
+                list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
887953
+                {
887953
+                        list_del_init(&req->request_list);
887953
+                        if (req) {
887953
+                                if (req->prognum == RPCSVC_INFRA_PROGRAM) {
887953
+                                        switch (req->procnum) {
887953
+                                        case RPCSVC_PROC_EVENT_THREAD_DEATH:
887953
+                                                gf_log(GF_RPCSVC, GF_LOG_INFO,
887953
+                                                       "event thread died, exiting request handler "
887953
+                                                       "thread for queue %d of program %s",
887953
+                                                       (int)(queue - &program->request_queue[0]),
887953
+                                                       program->progname);
887953
+                                                done = 1;
887953
+
887953
+                                                pthread_mutex_lock(&program->thr_lock);
887953
+                                                {
887953
+                                                        rpcsvc_toggle_queue_status(
887953
+                                                                        program, queue,
887953
+                                                                        program->request_queue_status);
887953
+                                                        program->threadcount--;
887953
+                                                }
887953
+                                                pthread_mutex_unlock(&program->thr_lock);
887953
+                                                rpcsvc_request_destroy(req);
887953
+                                                break;
887953
+
887953
+                                        default:
887953
+                                                break;
887953
+                                        }
887953
+                                } else {
887953
+                                        THIS = req->svc->xl;
887953
+                                        actor = rpcsvc_program_actor(req);
887953
+                                        ret = actor->actor(req);
887953
+
887953
+                                        if (ret != 0) {
887953
+                                                rpcsvc_check_and_reply_error(ret, NULL, req);
887953
+                                        }
887953
+
887953
+                                        req = NULL;
887953
+                                }
887953
+                        }
887953
                 }
887953
-        unlock:
887953
-                pthread_mutex_unlock (&program->queue_lock);
887953
-
887953
                 if (done)
887953
                         break;
887953
-
887953
-                THIS = req->svc->xl;
887953
-
887953
-                actor = rpcsvc_program_actor (req);
887953
-
887953
-                ret = actor->actor (req);
887953
-
887953
-                if (ret != 0) {
887953
-                        rpcsvc_check_and_reply_error (ret, NULL, req);
887953
-                }
887953
         }
887953
-
887953
         return NULL;
887953
 }
887953
 
887953
@@ -1917,6 +2162,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
887953
         int               ret                = -1;
887953
         rpcsvc_program_t *newprog            = NULL;
887953
         char              already_registered = 0;
887953
+        int               i                  = 0;
887953
 
887953
         if (!svc) {
887953
                 goto out;
887953
@@ -1951,9 +2197,16 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
887953
         memcpy (newprog, program, sizeof (*program));
887953
 
887953
         INIT_LIST_HEAD (&newprog->program);
887953
-        INIT_LIST_HEAD (&newprog->request_queue);
887953
-        pthread_mutex_init (&newprog->queue_lock, NULL);
887953
-        pthread_cond_init (&newprog->queue_cond, NULL);
887953
+
887953
+        for (i = 0; i < EVENT_MAX_THREADS; i++) {
887953
+                INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
887953
+                pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
887953
+                pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
887953
+                newprog->request_queue[i].program = newprog;
887953
+        }
887953
+
887953
+        pthread_mutex_init(&newprog->thr_lock, NULL);
887953
+        pthread_cond_init(&newprog->thr_cond, NULL);
887953
 
887953
         newprog->alive = _gf_true;
887953
 
887953
@@ -1962,9 +2215,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
887953
                 newprog->ownthread = _gf_false;
887953
 
887953
         if (newprog->ownthread) {
887953
-                gf_thread_create (&newprog->thread, NULL,
887953
-                                  rpcsvc_request_handler,
887953
-                                  newprog, "reqhnd");
887953
+                struct event_pool *ep = svc->ctx->event_pool;
887953
+                newprog->eventthreadcount = ep->eventthreadcount;
887953
+
887953
+                pthread_key_create(&newprog->req_queue_key, NULL);
887953
+                newprog->thr_queue = 1;
887953
         }
887953
 
887953
         pthread_mutex_lock (&svc->rpclock);
887953
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
887953
index 58c0055..f500bab 100644
887953
--- a/rpc/rpc-lib/src/rpcsvc.h
887953
+++ b/rpc/rpc-lib/src/rpcsvc.h
887953
@@ -33,6 +33,16 @@
887953
 #define MAX_IOVEC 16
887953
 #endif
887953
 
887953
+/* TODO: we should store prognums at a centralized location to avoid conflict
887953
+         or use a robust random number generator to avoid conflicts
887953
+*/
887953
+
887953
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
887953
+
887953
+typedef enum {
887953
+    RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
887953
+} rpcsvc_infra_procnum_t;
887953
+
887953
 #define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 /* Default for protocol/server */
887953
 #define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
887953
 #define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536
887953
@@ -349,6 +359,16 @@ typedef struct rpcsvc_actor_desc {
887953
         drc_op_type_t           op_type;
887953
 } rpcsvc_actor_t;
887953
 
887953
+typedef struct rpcsvc_request_queue {
887953
+        int gen;
887953
+        struct list_head       request_queue;
887953
+        pthread_mutex_t        queue_lock;
887953
+        pthread_cond_t         queue_cond;
887953
+        pthread_t              thread;
887953
+        struct rpcsvc_program *program;
887953
+        gf_boolean_t           waiting;
887953
+} rpcsvc_request_queue_t;
887953
+
887953
 /* Describes a program and its version along with the function pointers
887953
  * required to handle the procedures/actors of each program/version.
887953
  * Never changed ever by any thread so no need for a lock.
887953
@@ -409,10 +429,14 @@ struct rpcsvc_program {
887953
         gf_boolean_t            synctask;
887953
         /* list member to link to list of registered services with rpcsvc */
887953
         struct list_head        program;
887953
-        struct list_head        request_queue;
887953
-        pthread_mutex_t         queue_lock;
887953
-        pthread_cond_t          queue_cond;
887953
-        pthread_t               thread;
887953
+        rpcsvc_request_queue_t  request_queue[EVENT_MAX_THREADS];
887953
+        char                    request_queue_status[EVENT_MAX_THREADS / 8 + 1];
887953
+        pthread_mutex_t         thr_lock;
887953
+        pthread_cond_t          thr_cond;
887953
+        int                     thr_queue;
887953
+        pthread_key_t           req_queue_key;
887953
+        int                     threadcount;
887953
+        int                     eventthreadcount;
887953
 };
887953
 
887953
 typedef struct rpcsvc_cbk_program {
887953
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
887953
index e28c5cd..df984f8 100644
887953
--- a/rpc/rpc-transport/socket/src/socket.c
887953
+++ b/rpc/rpc-transport/socket/src/socket.c
887953
@@ -2419,7 +2419,8 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
887953
 /* reads rpc_requests during pollin */
887953
 static int
887953
 socket_event_handler (int fd, int idx, int gen, void *data,
887953
-                      int poll_in, int poll_out, int poll_err)
887953
+                      int poll_in, int poll_out, int poll_err,
887953
+                      char event_thread_died)
887953
 {
887953
         rpc_transport_t  *this          = NULL;
887953
         socket_private_t *priv          = NULL;
887953
@@ -2429,6 +2430,13 @@ socket_event_handler (int fd, int idx, int gen, void *data,
887953
 
887953
         this = data;
887953
 
887953
+        if (event_thread_died) {
887953
+                /* to avoid duplicate notifications,
887953
+                 * notify only for listener sockets
887953
+                 */
887953
+                return 0;
887953
+        }
887953
+
887953
         GF_VALIDATE_OR_GOTO ("socket", this, out);
887953
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
887953
         GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
887953
@@ -2720,7 +2728,8 @@ socket_spawn (rpc_transport_t *this)
887953
 
887953
 static int
887953
 socket_server_event_handler (int fd, int idx, int gen, void *data,
887953
-                             int poll_in, int poll_out, int poll_err)
887953
+                             int poll_in, int poll_out, int poll_err,
887953
+                             char event_thread_died)
887953
 {
887953
         rpc_transport_t             *this = NULL;
887953
         socket_private_t        *priv = NULL;
887953
@@ -2742,6 +2751,12 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
887953
         priv = this->private;
887953
         ctx  = this->ctx;
887953
 
887953
+        if (event_thread_died) {
887953
+                rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED,
887953
+                                     (void *)(unsigned long)gen);
887953
+                return 0;
887953
+        }
887953
+
887953
         /* NOTE:
887953
          * We have done away with the critical section in this function. since
887953
          * there's little that it helps with. There's no other code that
887953
@@ -2840,6 +2855,7 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
887953
                 new_trans->mydata = this->mydata;
887953
                 new_trans->notify = this->notify;
887953
                 new_trans->listener = this;
887953
+                new_trans->notify_poller_death = this->poller_death_accept;
887953
                 new_priv = new_trans->private;
887953
 
887953
                 if (new_sockaddr.ss_family == AF_UNIX) {
887953
@@ -2935,7 +2951,8 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
887953
                                                         new_sock,
887953
                                                         socket_event_handler,
887953
                                                         new_trans,
887953
-                                                        1, 0);
887953
+                                                        1, 0,
887953
+                                                        new_trans->notify_poller_death);
887953
                                 if (new_priv->idx == -1) {
887953
                                         ret = -1;
887953
                                         gf_log(this->name, GF_LOG_ERROR,
887953
@@ -3388,7 +3405,8 @@ handler:
887953
                 else {
887953
                         priv->idx = event_register (ctx->event_pool, priv->sock,
887953
                                                     socket_event_handler,
887953
-                                                    this, 1, 1);
887953
+                                                    this, 1, 1,
887953
+                                                    this->notify_poller_death);
887953
                         if (priv->idx == -1) {
887953
                                 gf_log ("", GF_LOG_WARNING,
887953
                                         "failed to register the event");
887953
@@ -3560,7 +3578,8 @@ socket_listen (rpc_transport_t *this)
887953
 
887953
                 priv->idx = event_register (ctx->event_pool, priv->sock,
887953
                                             socket_server_event_handler,
887953
-                                            this, 1, 0);
887953
+                                            this, 1, 0,
887953
+                                            this->notify_poller_death);
887953
 
887953
                 if (priv->idx == -1) {
887953
                         gf_log (this->name, GF_LOG_WARNING,
887953
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
887953
index 7cc3d15..30045ef 100644
887953
--- a/xlators/protocol/server/src/server-helpers.c
887953
+++ b/xlators/protocol/server/src/server-helpers.c
887953
@@ -557,6 +557,10 @@ get_frame_from_request (rpcsvc_request_t *req)
887953
                 }
887953
         }
887953
 
887953
+        /* Add a ref for this fop */
887953
+        if (client)
887953
+                gf_client_ref (client);
887953
+
887953
         frame->root->uid      = req->uid;
887953
         frame->root->gid      = req->gid;
887953
         frame->root->pid      = req->pid;
887953
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
887953
index ba3b831..d32f5dd 100644
887953
--- a/xlators/protocol/server/src/server.c
887953
+++ b/xlators/protocol/server/src/server.c
887953
@@ -1342,6 +1342,9 @@ init (xlator_t *this)
887953
                 ret = -1;
887953
                 goto out;
887953
         }
887953
+
887953
+        ret = dict_set_int32(this->options, "notify-poller-death", 1);
887953
+
887953
         ret = rpcsvc_create_listeners (conf->rpc, this->options,
887953
                                        this->name);
887953
         if (ret < 1) {
887953
-- 
887953
1.8.3.1
887953