From a2c7141364b3e55c7206923ec76997994bff62c0 Mon Sep 17 00:00:00 2001
From: Raghavendra G <rgowdapp@redhat.com>
Date: Fri, 5 May 2017 15:21:30 +0530
Subject: [PATCH 479/486] event/epoll: Add back socket for polling of events
 immediately after reading the entire rpc message from the wire

Currently the socket is added back for future events only after higher
layers (rpc, xlators, etc.) have processed the message. If message
processing involves a significant delay (as in writev replies processed
by Erasure Coding), performance takes a hit. Hence this patch modifies
transport/socket to add the socket back for polling of events
immediately after reading the entire rpc message, but before notifying
the higher layers.
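
For illustration, a minimal self-contained sketch of this ordering,
using plain epoll with EPOLLONESHOT rather than the gluster event
framework (handle_pollin and process_message are hypothetical names,
not part of this patch):

    #include <unistd.h>
    #include <sys/epoll.h>

    static void
    process_message (const char *buf, ssize_t len)
    {
            /* stands in for rpc/xlator processing, which may be slow */
            (void) buf;
            (void) len;
    }

    static void
    handle_pollin (int epfd, int fd)
    {
            char    buf[4096];
            ssize_t len;

            /* 1. read the entire message off the wire */
            len = read (fd, buf, sizeof (buf));
            if (len <= 0)
                    return;

            /* 2. re-arm the (oneshot) fd so the next message can be
             *    polled while this one is still being processed */
            struct epoll_event ev = { .events  = EPOLLIN | EPOLLONESHOT,
                                      .data.fd = fd };
            epoll_ctl (epfd, EPOLL_CTL_MOD, fd, &ev);

            /* 3. only now hand the message to the (slow) upper layers */
            process_message (buf, len);
    }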

credits: Thanks to "Kotresh Hiremath Ravishankar"
         <khiremat@redhat.com> for assistance in fixing a regression in
         bitrot caused by this patch.

>Reviewed-on: https://review.gluster.org/15036
>CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
>NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
>Smoke: Gluster Build System <jenkins@build.gluster.org>
>Reviewed-by: Amar Tumballi <amarts@redhat.com>
>Change-Id: I04b6b9d0b51a1cfb86ecac3c3d87a5f388cf5800
>BUG: 1448364
>Signed-off-by: Raghavendra G <rgowdapp@redhat.com>

Change-Id: I33724b6ab484271aa80c973dff063caf5f7da55b
BUG: 1420796
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/107554
Reviewed-by: Atin Mukherjee <amukherj@redhat.com>
---
 cli/src/cli-rl.c                      |   8 ++-
 glusterfsd/src/glusterfsd-mgmt.c      | 122 +++++++++++++++++++---------------
 libglusterfs/src/event-epoll.c        |  81 ++++++++++++++--------
 libglusterfs/src/event-poll.c         |   4 +-
 libglusterfs/src/event.c              |  59 +++++++++++-----
 libglusterfs/src/event.h              |  11 ++-
 libglusterfs/src/glusterfs.h          |   2 +
 rpc/rpc-transport/socket/src/socket.c |  96 ++++++++++++++++++++------
 rpc/rpc-transport/socket/src/socket.h |   6 ++
 9 files changed, 268 insertions(+), 121 deletions(-)

diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c
index bca37d9..4745cf4 100644
--- a/cli/src/cli-rl.c
+++ b/cli/src/cli-rl.c
@@ -108,11 +108,17 @@ cli_rl_process_line (char *line)
 
 
 int
-cli_rl_stdin (int fd, int idx, void *data,
+cli_rl_stdin (int fd, int idx, int gen, void *data,
               int poll_out, int poll_in, int poll_err)
 {
+        struct cli_state *state = NULL;
+
+        state = data;
+
         rl_callback_read_char ();
 
+        event_handled (state->ctx->event_pool, fd, idx, gen);
+
         return 0;
 }
 
diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c
index 968570c..2d84146 100644
--- a/glusterfsd/src/glusterfsd-mgmt.c
+++ b/glusterfsd/src/glusterfsd-mgmt.c
@@ -1745,8 +1745,7 @@ out:
 
 /* XXX: move these into @ctx */
 static char *oldvolfile = NULL;
-static int oldvollen = 0;
-
+static int   oldvollen;
 
 
 int
@@ -1756,7 +1755,7 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,
         gf_getspec_rsp           rsp   = {0,};
         call_frame_t            *frame = NULL;
         glusterfs_ctx_t         *ctx = NULL;
-        int                      ret   = 0;
+        int                      ret   = 0, locked = 0;
         ssize_t                  size = 0;
         FILE                    *tmpfp = NULL;
         char                    *volfilebuf = NULL;
@@ -1786,74 +1785,85 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,
         ret = 0;
         size = rsp.op_ret;
 
-        if (size == oldvollen && (memcmp (oldvolfile, rsp.spec, size) == 0)) {
-                gf_log (frame->this->name, GF_LOG_INFO,
-                        "No change in volfile, continuing");
-                goto out;
-        }
+        LOCK (&ctx->volfile_lock);
+        {
+                locked = 1;
 
-        tmpfp = tmpfile ();
-        if (!tmpfp) {
-                ret = -1;
-                goto out;
-        }
+                if (size == oldvollen && (memcmp (oldvolfile, rsp.spec, size) == 0)) {
+                        gf_log (frame->this->name, GF_LOG_INFO,
+                                "No change in volfile, continuing");
+                        goto out;
+                }
 
-        fwrite (rsp.spec, size, 1, tmpfp);
-        fflush (tmpfp);
-        if (ferror (tmpfp)) {
-                ret = -1;
-                goto out;
-        }
+                tmpfp = tmpfile ();
+                if (!tmpfp) {
+                        ret = -1;
+                        goto out;
+                }
+
+                fwrite (rsp.spec, size, 1, tmpfp);
+                fflush (tmpfp);
+                if (ferror (tmpfp)) {
+                        ret = -1;
+                        goto out;
+                }
+
+                /*  Check if only options have changed. No need to reload the
+                 *  volfile if topology hasn't changed.
+                 *  glusterfs_volfile_reconfigure returns 3 possible return states
+                 *  return 0          =======> reconfiguration of options has succeeded
+                 *  return 1          =======> the graph has to be reconstructed and all the xlators should be inited
+                 *  return -1(or -ve) =======> Some Internal Error occurred during the operation
+                 */
+
+                ret = glusterfs_volfile_reconfigure (oldvollen, tmpfp, ctx, oldvolfile);
+                if (ret == 0) {
+                        gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG,
+                                "No need to re-load volfile, reconfigure done");
+                        if (oldvolfile)
+                                volfilebuf = GF_REALLOC (oldvolfile, size);
+                        else
+                                volfilebuf = GF_CALLOC (1, size, gf_common_mt_char);
+                        if (!volfilebuf) {
+                                ret = -1;
+                                goto out;
+                        }
+                        oldvolfile = volfilebuf;
+                        oldvollen = size;
+                        memcpy (oldvolfile, rsp.spec, size);
+                        goto out;
+                }
 
-        /*  Check if only options have changed. No need to reload the
-        *  volfile if topology hasn't changed.
-        *  glusterfs_volfile_reconfigure returns 3 possible return states
-        *  return 0          =======> reconfiguration of options has succeeded
-        *  return 1          =======> the graph has to be reconstructed and all the xlators should be inited
-        *  return -1(or -ve) =======> Some Internal Error occurred during the operation
-        */
+                if (ret < 0) {
+                        gf_log ("glusterfsd-mgmt",
+                                GF_LOG_DEBUG, "Reconfigure failed !!");
+                        goto out;
+                }
+
+                ret = glusterfs_process_volfp (ctx, tmpfp);
+                /* tmpfp closed */
+                tmpfp = NULL;
+                if (ret)
+                        goto out;
 
-        ret = glusterfs_volfile_reconfigure (oldvollen, tmpfp, ctx, oldvolfile);
-        if (ret == 0) {
-                gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG,
-                        "No need to re-load volfile, reconfigure done");
                 if (oldvolfile)
                         volfilebuf = GF_REALLOC (oldvolfile, size);
                 else
                         volfilebuf = GF_CALLOC (1, size, gf_common_mt_char);
+
                 if (!volfilebuf) {
                         ret = -1;
                         goto out;
                 }
+
                 oldvolfile = volfilebuf;
                 oldvollen = size;
                 memcpy (oldvolfile, rsp.spec, size);
-                goto out;
         }
+        UNLOCK (&ctx->volfile_lock);
 
-        if (ret < 0) {
-                gf_log ("glusterfsd-mgmt", GF_LOG_DEBUG, "Reconfigure failed !!");
-                goto out;
-        }
-
-        ret = glusterfs_process_volfp (ctx, tmpfp);
-        /* tmpfp closed */
-        tmpfp = NULL;
-        if (ret)
-                goto out;
-
-        if (oldvolfile)
-                volfilebuf = GF_REALLOC (oldvolfile, size);
-        else
-                volfilebuf = GF_CALLOC (1, size, gf_common_mt_char);
+        locked = 0;
 
-        if (!volfilebuf) {
-                ret = -1;
-                goto out;
-        }
-        oldvolfile = volfilebuf;
-        oldvollen = size;
-        memcpy (oldvolfile, rsp.spec, size);
         if (!is_mgmt_rpc_reconnect) {
                 need_emancipate = 1;
                 glusterfs_mgmt_pmap_signin (ctx);
@@ -1861,6 +1871,10 @@ mgmt_getspec_cbk (struct rpc_req *req, struct iovec *iov, int count,
         }
 
 out:
+
+        if (locked)
+                UNLOCK (&ctx->volfile_lock);
+
         STACK_DESTROY (frame->root);
 
         free (rsp.spec);
@@ -2363,6 +2377,8 @@ glusterfs_mgmt_init (glusterfs_ctx_t *ctx)
         if (ctx->mgmt)
                 return 0;
 
+        LOCK_INIT (&ctx->volfile_lock);
+
         if (cmd_args->volfile_server_port)
                 port = cmd_args->volfile_server_port;
 
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index e2b4060..4b76cc9 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -569,38 +569,11 @@ pre_unlock:
         if (!handler)
 		goto out;
 
-	ret = handler (fd, idx, data,
+	ret = handler (fd, idx, gen, data,
 		       (event->events & (EPOLLIN|EPOLLPRI)),
 		       (event->events & (EPOLLOUT)),
 		       (event->events & (EPOLLERR|EPOLLHUP)));
 
-	LOCK (&slot->lock);
-	{
-		slot->in_handler--;
-
-		if (gen != slot->gen) {
-			/* event_unregister() happened while we were
-			   in handler()
-			*/
-			gf_msg_debug ("epoll", 0, "generation bumped on idx=%d"
-                                      " from gen=%d to slot->gen=%d, fd=%d, "
-				      "slot->fd=%d", idx, gen, slot->gen, fd,
-                                      slot->fd);
-			goto post_unlock;
-		}
-
-		/* This call also picks up the changes made by another
-		   thread calling event_select_on_epoll() while this
-		   thread was busy in handler()
-		*/
-                if (slot->in_handler == 0) {
-                        event->events = slot->events;
-                        ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD,
-                                         fd, event);
-                }
-	}
-post_unlock:
-	UNLOCK (&slot->lock);
 out:
 	event_slot_unref (event_pool, slot, idx);
 
@@ -891,6 +864,55 @@ event_pool_destroy_epoll (struct event_pool *event_pool)
         return ret;
 }
 
+static int
+event_handled_epoll (struct event_pool *event_pool, int fd, int idx, int gen)
+{
+        struct event_slot_epoll *slot  = NULL;
+        struct epoll_event epoll_event = {0, };
+        struct event_data *ev_data     = (void *)&epoll_event.data;
+        int                ret         = 0;
+
+	slot = event_slot_get (event_pool, idx);
+
+        assert (slot->fd == fd);
+
+	LOCK (&slot->lock);
+	{
+		slot->in_handler--;
+
+		if (gen != slot->gen) {
+			/* event_unregister() happened while we were
+			   in handler()
+			*/
+			gf_msg_debug ("epoll", 0, "generation bumped on idx=%d"
+                                      " from gen=%d to slot->gen=%d, fd=%d, "
+				      "slot->fd=%d", idx, gen, slot->gen, fd,
+                                      slot->fd);
+			goto post_unlock;
+		}
+
+		/* This call also picks up the changes made by another
+		   thread calling event_select_on_epoll() while this
+		   thread was busy in handler()
+		*/
+                if (slot->in_handler == 0) {
+                        epoll_event.events = slot->events;
+                        ev_data->idx = idx;
+                        ev_data->gen = gen;
+
+                        ret = epoll_ctl (event_pool->fd, EPOLL_CTL_MOD,
+                                         fd, &epoll_event);
+                }
+	}
+post_unlock:
+	UNLOCK (&slot->lock);
+
+        event_slot_unref (event_pool, slot, idx);
+
+        return ret;
+}
+
+
 struct event_ops event_ops_epoll = {
         .new                       = event_pool_new_epoll,
         .event_register            = event_register_epoll,
@@ -899,7 +921,8 @@ struct event_ops event_ops_epoll = {
         .event_unregister_close    = event_unregister_close_epoll,
         .event_dispatch            = event_dispatch_epoll,
         .event_reconfigure_threads = event_reconfigure_threads_epoll,
-        .event_pool_destroy        = event_pool_destroy_epoll
+        .event_pool_destroy        = event_pool_destroy_epoll,
+        .event_handled             = event_handled_epoll,
 };
 
 #endif
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 2006e33..3bffc47 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -40,7 +40,7 @@ event_register_poll (struct event_pool *event_pool, int fd,
 
 
 static int
-__flush_fd (int fd, int idx, void *data,
+__flush_fd (int fd, int idx, int gen, void *data,
             int poll_in, int poll_out, int poll_err)
 {
         char buf[64];
@@ -386,7 +386,7 @@ unlock:
         pthread_mutex_unlock (&event_pool->mutex);
 
         if (handler)
-                ret = handler (ufds[i].fd, idx, data,
+                ret = handler (ufds[i].fd, idx, 0, data,
                                (ufds[i].revents & (POLLIN|POLLPRI)),
                                (ufds[i].revents & (POLLOUT)),
                                (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)));
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
index 09ecce1..a969a18 100644
--- a/libglusterfs/src/event.c
+++ b/libglusterfs/src/event.c
@@ -159,8 +159,9 @@ event_pool_destroy (struct event_pool *event_pool)
         }
         pthread_mutex_unlock (&event_pool->mutex);
 
-        if (!destroy || (activethreadcount > 0))
+        if (!destroy || (activethreadcount > 0)) {
                 goto out;
+        }
 
         ret = event_pool->ops->event_pool_destroy (event_pool);
 out:
@@ -168,19 +169,27 @@ out:
 }
 
 int
-poller_destroy_handler (int fd, int idx, void *data,
+poller_destroy_handler (int fd, int idx, int gen, void *data,
                        int poll_out, int poll_in, int poll_err)
 {
-        int readfd = -1;
-        char buf = '\0';
+        struct event_destroy_data *destroy = NULL;
+        int                        readfd  = -1, ret = -1;
+        char                       buf     = '\0';
 
-        readfd = *(int *)data;
-        if (readfd < 0)
-                return -1;
+        destroy = data;
+        readfd = destroy->readfd;
+        if (readfd < 0) {
+                goto out;
+        }
 
         while (sys_read (readfd, &buf, 1) > 0) {
         }
-        return 0;
+
+        ret = 0;
+out:
+        event_handled (destroy->pool, fd, idx, gen);
+
+        return ret;
 }
 
 /* This function destroys all the poller threads.
@@ -197,11 +206,12 @@ poller_destroy_handler (int fd, int idx, void *data,
 int
 event_dispatch_destroy (struct event_pool *event_pool)
 {
-        int  ret     = -1;
-        int  fd[2]   = {-1};
-        int  idx     = -1;
-        int  flags   = 0;
-        struct timespec   sleep_till = {0, };
+        int                       ret        = -1, threadcount = 0;
+        int  fd[2]                           = {-1};
+        int                       idx        = -1;
+        int                       flags      = 0;
+        struct timespec           sleep_till = {0, };
+        struct event_destroy_data data       = {0, };
 
         GF_VALIDATE_OR_GOTO ("event", event_pool, out);
 
@@ -223,10 +233,13 @@ event_dispatch_destroy (struct event_pool *event_pool)
         if (ret < 0)
                 goto out;
 
+        data.pool = event_pool;
+        data.readfd = fd[1];
+
         /* From the main thread register an event on the pipe fd[0],
          */
         idx = event_register (event_pool, fd[0], poller_destroy_handler,
-                              &fd[1], 1, 0);
+                              &data, 1, 0);
         if (idx < 0)
                 goto out;
 
@@ -235,6 +248,7 @@ event_dispatch_destroy (struct event_pool *event_pool)
          */
         pthread_mutex_lock (&event_pool->mutex);
         {
+                threadcount = event_pool->eventthreadcount;
                 event_pool->destroy = 1;
         }
         pthread_mutex_unlock (&event_pool->mutex);
@@ -254,9 +268,11 @@ event_dispatch_destroy (struct event_pool *event_pool)
                  */
                 int retry = 0;
 
-                while (event_pool->activethreadcount > 0 && retry++ < 10) {
-                        if (sys_write (fd[1], "dummy", 6) == -1)
+                while (event_pool->activethreadcount > 0
+                       && (retry++ < (threadcount + 10))) {
+                        if (sys_write (fd[1], "dummy", 6) == -1) {
                                 break;
+                        }
                         sleep_till.tv_sec = time (NULL) + 1;
                         ret = pthread_cond_timedwait (&event_pool->cond,
                                                       &event_pool->mutex,
@@ -275,3 +291,14 @@ event_dispatch_destroy (struct event_pool *event_pool)
 
         return ret;
 }
+
+int
+event_handled (struct event_pool *event_pool, int fd, int idx, int gen)
+{
+        int ret = 0;
+
+        if (event_pool->ops->event_handled)
+                ret = event_pool->ops->event_handled (event_pool, fd, idx, gen);
+
+        return ret;
+}
diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h
index 1348f5d..c60b14a 100644
--- a/libglusterfs/src/event.h
+++ b/libglusterfs/src/event.h
@@ -23,7 +23,7 @@ struct event_data {
 } __attribute__ ((__packed__, __may_alias__));
 
 
-typedef int (*event_handler_t) (int fd, int idx, void *data,
+typedef int (*event_handler_t) (int fd, int idx, int gen, void *data,
 				int poll_in, int poll_out, int poll_err);
 
 #define EVENT_EPOLL_TABLES 1024
@@ -73,6 +73,11 @@ struct event_pool {
 
 };
 
+struct event_destroy_data {
+        int                readfd;
+        struct event_pool *pool;
+};
+
 struct event_ops {
         struct event_pool * (*new) (int count, int eventthreadcount);
 
@@ -93,6 +98,8 @@ struct event_ops {
         int (*event_reconfigure_threads) (struct event_pool *event_pool,
                                           int newcount);
         int (*event_pool_destroy) (struct event_pool *event_pool);
+        int (*event_handled) (struct event_pool *event_pool, int fd, int idx,
+                              int gen);
 };
 
 struct event_pool *event_pool_new (int count, int eventthreadcount);
@@ -107,4 +114,6 @@ int event_dispatch (struct event_pool *event_pool);
 int event_reconfigure_threads (struct event_pool *event_pool, int value);
 int event_pool_destroy (struct event_pool *event_pool);
 int event_dispatch_destroy (struct event_pool *event_pool);
+int event_handled (struct event_pool *event_pool, int fd, int idx, int gen);
+
 #endif /* _EVENT_H_ */
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index 4e686c3..3544aab 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -509,6 +509,8 @@ struct _glusterfs_ctx {
         int                 notifying;
 
         struct gf_ctx_tw   *tw; /* refcounted timer_wheel */
+
+        gf_lock_t           volfile_lock;
 };
 typedef struct _glusterfs_ctx glusterfs_ctx_t;
 
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index ffb56dd..10f59ee 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -1163,11 +1163,11 @@ out:
 }
 
 
-static int
-socket_event_poll_err (rpc_transport_t *this)
+static gf_boolean_t
+socket_event_poll_err (rpc_transport_t *this, int gen, int idx)
 {
-        socket_private_t *priv = NULL;
-        int               ret = -1;
+        socket_private_t *priv          = NULL;
+        gf_boolean_t      socket_closed = _gf_false;
 
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
@@ -1176,15 +1176,29 @@ socket_event_poll_err (rpc_transport_t *this)
 
         pthread_mutex_lock (&priv->lock);
         {
-                __socket_ioq_flush (this);
-                __socket_reset (this);
+                if ((priv->gen == gen) && (priv->idx == idx)
+                    && (priv->sock != -1)) {
+                        __socket_ioq_flush (this);
+                        __socket_reset (this);
+                        socket_closed = _gf_true;
+                }
         }
         pthread_mutex_unlock (&priv->lock);
 
-        rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+        if (socket_closed) {
+                pthread_mutex_lock (&priv->notify.lock);
+                {
+                        while (priv->notify.in_progress)
+                                pthread_cond_wait (&priv->notify.cond,
+                                                   &priv->notify.lock);
+                }
+                pthread_mutex_unlock (&priv->notify.lock);
+
+                rpc_transport_notify (this, RPC_TRANSPORT_DISCONNECT, this);
+        }
 
 out:
-        return ret;
+        return socket_closed;
 }
 
 
@@ -2262,22 +2276,50 @@ out:
 
 
 static int
-socket_event_poll_in (rpc_transport_t *this)
+socket_event_poll_in (rpc_transport_t *this, gf_boolean_t notify_handled)
 {
         int                     ret    = -1;
         rpc_transport_pollin_t *pollin = NULL;
         socket_private_t       *priv = this->private;
+        glusterfs_ctx_t        *ctx  = NULL;
+
+        ctx = this->ctx;
 
 	ret = socket_proto_state_machine (this, &pollin);
 
+        if (pollin) {
+                pthread_mutex_lock (&priv->notify.lock);
+                {
+                        priv->notify.in_progress++;
+                }
+                pthread_mutex_unlock (&priv->notify.lock);
+        }
+
+
+        if (notify_handled && (ret != -1))
+                event_handled (ctx->event_pool, priv->sock, priv->idx,
+                               priv->gen);
+
 	if (pollin) {
                 priv->ot_state = OT_CALLBACK;
+
                 ret = rpc_transport_notify (this, RPC_TRANSPORT_MSG_RECEIVED,
                                             pollin);
+
                 if (priv->ot_state == OT_CALLBACK) {
                         priv->ot_state = OT_RUNNING;
                 }
+
                 rpc_transport_pollin_destroy (pollin);
+
+                pthread_mutex_lock (&priv->notify.lock);
+                {
+                        --priv->notify.in_progress;
+
+                        if (!priv->notify.in_progress)
+                                pthread_cond_signal (&priv->notify.cond);
+                }
+                pthread_mutex_unlock (&priv->notify.lock);
         }
 
         return ret;
@@ -2360,24 +2402,29 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
 
 /* reads rpc_requests during pollin */
 static int
-socket_event_handler (int fd, int idx, void *data,
+socket_event_handler (int fd, int idx, int gen, void *data,
                       int poll_in, int poll_out, int poll_err)
 {
-        rpc_transport_t  *this = NULL;
-        socket_private_t *priv = NULL;
-	int               ret = -1;
+        rpc_transport_t  *this          = NULL;
+        socket_private_t *priv          = NULL;
+	int               ret           = -1;
+        glusterfs_ctx_t  *ctx           = NULL;
+        gf_boolean_t      socket_closed = _gf_false, notify_handled = _gf_false;
 
         this = data;
+
         GF_VALIDATE_OR_GOTO ("socket", this, out);
         GF_VALIDATE_OR_GOTO ("socket", this->private, out);
         GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
 
         THIS = this->xl;
         priv = this->private;
+        ctx = this->ctx;
 
         pthread_mutex_lock (&priv->lock);
         {
                 priv->idx = idx;
+                priv->gen = gen;
         }
         pthread_mutex_unlock (&priv->lock);
 
@@ -2408,16 +2455,23 @@ socket_event_handler (int fd, int idx, void *data,
         }
 
         if (!ret && poll_in) {
-                ret = socket_event_poll_in (this);
+                ret = socket_event_poll_in (this, !poll_err);
+                notify_handled = _gf_true;
         }
 
         if ((ret < 0) || poll_err) {
                 /* Logging has happened already in earlier cases */
                 gf_log ("transport", ((ret >= 0) ? GF_LOG_INFO : GF_LOG_DEBUG),
                         "EPOLLERR - disconnecting now");
-                socket_event_poll_err (this);
-                rpc_transport_unref (this);
-	}
+
+                socket_closed = socket_event_poll_err (this, gen, idx);
+
+                if (socket_closed)
+                        rpc_transport_unref (this);
+
+	} else if (!notify_handled) {
+                event_handled (ctx->event_pool, fd, idx, gen);
+        }
 
 out:
 	return ret;
@@ -2524,7 +2578,7 @@ socket_poller (void *ctx)
                 }
 
 		if (pfd[1].revents & POLL_MASK_INPUT) {
-			ret = socket_event_poll_in(this);
+			ret = socket_event_poll_in(this, 0);
 			if (ret >= 0) {
 				/* Suppress errors while making progress. */
 				pfd[1].revents &= ~POLL_MASK_ERROR;
@@ -2648,7 +2702,7 @@ socket_spawn (rpc_transport_t *this)
 }
 
 static int
-socket_server_event_handler (int fd, int idx, void *data,
+socket_server_event_handler (int fd, int idx, int gen, void *data,
                              int poll_in, int poll_out, int poll_err)
 {
         rpc_transport_t             *this = NULL;
@@ -2903,6 +2957,8 @@ socket_server_event_handler (int fd, int idx, void *data,
                 }
         }
 out:
+        event_handled (ctx->event_pool, fd, idx, gen);
+
         if (cname && (cname != this->ssl_name)) {
                 GF_FREE(cname);
         }
@@ -3998,6 +4054,8 @@ socket_init (rpc_transport_t *this)
         priv->bio = 0;
         priv->windowsize = GF_DEFAULT_SOCKET_WINDOW_SIZE;
         INIT_LIST_HEAD (&priv->ioq);
+        pthread_mutex_init (&priv->notify.lock, NULL);
+        pthread_cond_init (&priv->notify.cond, NULL);
 
         /* All the below section needs 'this->options' to be present */
         if (!this->options)
diff --git a/rpc/rpc-transport/socket/src/socket.h b/rpc/rpc-transport/socket/src/socket.h
index 8528bde..0c7bf5f 100644
--- a/rpc/rpc-transport/socket/src/socket.h
+++ b/rpc/rpc-transport/socket/src/socket.h
@@ -199,6 +199,7 @@ typedef enum {
 typedef struct {
         int32_t                sock;
         int32_t                idx;
+        int32_t                gen;
         /* -1 = not connected. 0 = in progress. 1 = connected */
         char                   connected;
         /* 1 = connect failed for reasons other than EINPROGRESS/ENOENT
@@ -249,6 +250,11 @@ typedef struct {
         int                    log_ctr;
         GF_REF_DECL;           /* refcount to keep track of socket_poller
                                   threads */
+        struct {
+                pthread_mutex_t  lock;
+                pthread_cond_t   cond;
+                uint64_t         in_progress;
+        } notify;
 } socket_private_t;
 
 
-- 
1.8.3.1