Blob Blame History Raw
From 3de9cc04cdf5a65825cc86c8239734a284775470 Mon Sep 17 00:00:00 2001
From: Raghavendra G <rgowdapp@redhat.com>
Date: Wed, 6 Feb 2019 17:30:30 +0530
Subject: [PATCH 524/529] program/GF-DUMP: Shield ping processing from traffic
 to Glusterfs Program

Since poller thread bears the brunt of execution till the request is
handed over to io-threads, poller thread experiencies lock
contention(s) in the control flow till io-threads, which slows it
down. This delay invariably affects reading ping requests from network
and responding to them, resulting in increased ping latencies, which
sometimes results in a ping-timer-expiry on client leading to
disconnect of transport. So, this patch aims to free up poller thread
from executing code of Glusterfs Program. We do this by making

* Glusterfs Program registering itself asking rpcsvc to execute its
  actors in its own threads.
* GF-DUMP Program registering itself asking rpcsvc to _NOT_ execute
  its actors in its own threads. Otherwise program's ownthreads become
  bottleneck in processing ping traffic. This means that poller thread
  reads a ping packet, invokes its actor and hands the response msg to
  transport queue.

Change-Id: I526268c10bdd5ef93f322a4f95385137550a6a49
Signed-off-by: Raghavendra G <rgowdapp@redhat.com>
BUG: 1390151
Reviewed-on: https://review.gluster.org/17105
NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
CentOS-regression: Gluster Build System <jenkins@build.gluster.org>
Smoke: Gluster Build System <jenkins@build.gluster.org>
Reviewed-by: Amar Tumballi <amarts@redhat.com>
Reviewed-by: Jeff Darcy <jeff@pl.atyp.us>
(cherry picked from commit 2e72b24707f1886833db0b09e48b3f48b8d68d37)
Reviewed-on: https://code.engineering.redhat.com/gerrit/162426
Tested-by: RHGS Build Bot <nigelb@redhat.com>
---
 rpc/rpc-lib/src/rpcsvc.c                      | 90 ++++++++++++++++++++++++++-
 rpc/rpc-lib/src/rpcsvc.h                      | 18 +++++-
 xlators/protocol/server/src/server-helpers.c  |  4 --
 xlators/protocol/server/src/server-rpc-fops.c |  1 +
 4 files changed, 106 insertions(+), 7 deletions(-)

diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index 695e9fb..faa1956 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -304,6 +304,7 @@ rpcsvc_program_actor (rpcsvc_request_t *req)
                 goto err;
         }
 
+        req->ownthread = program->ownthread;
         req->synctask = program->synctask;
 
         err = SUCCESS;
@@ -411,6 +412,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
         req->progver = rpc_call_progver (callmsg);
         req->procnum = rpc_call_progproc (callmsg);
         req->trans = rpc_transport_ref (trans);
+        gf_client_ref (req->trans->xl_private);
         req->count = msg->count;
         req->msg[0] = progmsg;
         req->iobref = iobref_ref (msg->iobref);
@@ -426,6 +428,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
         req->trans_private = msg->private;
 
         INIT_LIST_HEAD (&req->txlist);
+        INIT_LIST_HEAD (&req->request_list);
         req->payloadsize = 0;
 
         /* By this time, the data bytes for the auth scheme would have already
@@ -576,7 +579,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
         rpcsvc_request_t       *req            = NULL;
         int                     ret            = -1;
         uint16_t                port           = 0;
-        gf_boolean_t            is_unix        = _gf_false;
+        gf_boolean_t            is_unix        = _gf_false, empty = _gf_false;
         gf_boolean_t            unprivileged   = _gf_false;
         drc_cached_op_t        *reply          = NULL;
         rpcsvc_drc_globals_t   *drc            = NULL;
@@ -692,6 +695,20 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
                                             (synctask_fn_t) actor_fn,
                                             rpcsvc_check_and_reply_error, NULL,
                                             req);
+                } else if (req->ownthread) {
+                        pthread_mutex_lock (&req->prog->queue_lock);
+                        {
+                                empty = list_empty (&req->prog->request_queue);
+
+                                list_add_tail (&req->request_list,
+                                               &req->prog->request_queue);
+
+                                if (empty)
+                                        pthread_cond_signal (&req->prog->queue_cond);
+                        }
+                        pthread_mutex_unlock (&req->prog->queue_lock);
+
+                        ret = 0;
                 } else {
                         ret = actor_fn (req);
                 }
@@ -1572,6 +1589,12 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program)
                 " Ver: %d, Port: %d", prog->progname, prog->prognum,
                 prog->progver, prog->progport);
 
+        if (prog->ownthread) {
+                prog->alive = _gf_false;
+                ret = 0;
+                goto out;
+        }
+
         pthread_mutex_lock (&svc->rpclock);
         {
                 list_del_init (&prog->program);
@@ -1838,6 +1861,56 @@ out:
         return ret;
 }
 
+void *
+rpcsvc_request_handler (void *arg)
+{
+        rpcsvc_program_t *program = arg;
+        rpcsvc_request_t *req     = NULL;
+        rpcsvc_actor_t   *actor   = NULL;
+        gf_boolean_t      done    = _gf_false;
+        int               ret     = 0;
+
+        if (!program)
+                return NULL;
+
+        while (1) {
+                pthread_mutex_lock (&program->queue_lock);
+                {
+                        if (!program->alive
+                            && list_empty (&program->request_queue)) {
+                                done = 1;
+                                goto unlock;
+                        }
+
+                        while (list_empty (&program->request_queue))
+                                pthread_cond_wait (&program->queue_cond,
+                                                   &program->queue_lock);
+
+                        req = list_entry (program->request_queue.next,
+                                          typeof (*req), request_list);
+
+                        list_del_init (&req->request_list);
+                }
+        unlock:
+                pthread_mutex_unlock (&program->queue_lock);
+
+                if (done)
+                        break;
+
+                THIS = req->svc->xl;
+
+                actor = rpcsvc_program_actor (req);
+
+                ret = actor->actor (req);
+
+                if (ret != 0) {
+                        rpcsvc_check_and_reply_error (ret, NULL, req);
+                }
+        }
+
+        return NULL;
+}
+
 int
 rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
 {
@@ -1878,6 +1951,21 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
         memcpy (newprog, program, sizeof (*program));
 
         INIT_LIST_HEAD (&newprog->program);
+        INIT_LIST_HEAD (&newprog->request_queue);
+        pthread_mutex_init (&newprog->queue_lock, NULL);
+        pthread_cond_init (&newprog->queue_cond, NULL);
+
+        newprog->alive = _gf_true;
+
+        /* make sure synctask gets priority over ownthread */
+        if (newprog->synctask)
+                newprog->ownthread = _gf_false;
+
+        if (newprog->ownthread) {
+                gf_thread_create (&newprog->thread, NULL,
+                                  rpcsvc_request_handler,
+                                  newprog, "reqhnd");
+        }
 
         pthread_mutex_lock (&svc->rpclock);
         {
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index d3aafac..58c0055 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -233,7 +233,9 @@ struct rpcsvc_request {
          */
         rpcsvc_auth_data_t      verf;
 
-	/* Execute this request's actor function as a synctask?*/
+        /* Execute this request's actor function in ownthread of program?*/
+        gf_boolean_t            ownthread;
+
         gf_boolean_t            synctask;
         /* Container for a RPC program wanting to store a temp
          * request-specific item.
@@ -245,6 +247,10 @@ struct rpcsvc_request {
 
         /* pointer to cached reply for use in DRC */
         drc_cached_op_t         *reply;
+
+        /* request queue in rpcsvc */
+        struct list_head         request_list;
+
 };
 
 #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog))
@@ -395,10 +401,18 @@ struct rpcsvc_program {
          */
         int                     min_auth;
 
-	/* Execute actor function as a synctask? */
+        /* Execute actor function in program's own thread? */
+        /* This will reduce the workload on poller threads */
+        gf_boolean_t            ownthread;
+        gf_boolean_t            alive;
+
         gf_boolean_t            synctask;
         /* list member to link to list of registered services with rpcsvc */
         struct list_head        program;
+        struct list_head        request_queue;
+        pthread_mutex_t         queue_lock;
+        pthread_cond_t          queue_cond;
+        pthread_t               thread;
 };
 
 typedef struct rpcsvc_cbk_program {
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
index 30045ef..7cc3d15 100644
--- a/xlators/protocol/server/src/server-helpers.c
+++ b/xlators/protocol/server/src/server-helpers.c
@@ -557,10 +557,6 @@ get_frame_from_request (rpcsvc_request_t *req)
                 }
         }
 
-        /* Add a ref for this fop */
-        if (client)
-                gf_client_ref (client);
-
         frame->root->uid      = req->uid;
         frame->root->gid      = req->gid;
         frame->root->pid      = req->pid;
diff --git a/xlators/protocol/server/src/server-rpc-fops.c b/xlators/protocol/server/src/server-rpc-fops.c
index b7bb26a..db4242d 100644
--- a/xlators/protocol/server/src/server-rpc-fops.c
+++ b/xlators/protocol/server/src/server-rpc-fops.c
@@ -6143,4 +6143,5 @@ struct rpcsvc_program glusterfs3_3_fop_prog = {
         .progver   = GLUSTER_FOP_VERSION,
         .numactors = GLUSTER_FOP_PROCCNT,
         .actors    = glusterfs3_3_fop_actors,
+        .ownthread = _gf_true,
 };
-- 
1.8.3.1