From 3de9cc04cdf5a65825cc86c8239734a284775470 Mon Sep 17 00:00:00 2001 From: Raghavendra G Date: Wed, 6 Feb 2019 17:30:30 +0530 Subject: [PATCH 524/529] program/GF-DUMP: Shield ping processing from traffic to Glusterfs Program Since poller thread bears the brunt of execution till the request is handed over to io-threads, poller thread experiencies lock contention(s) in the control flow till io-threads, which slows it down. This delay invariably affects reading ping requests from network and responding to them, resulting in increased ping latencies, which sometimes results in a ping-timer-expiry on client leading to disconnect of transport. So, this patch aims to free up poller thread from executing code of Glusterfs Program. We do this by making * Glusterfs Program registering itself asking rpcsvc to execute its actors in its own threads. * GF-DUMP Program registering itself asking rpcsvc to _NOT_ execute its actors in its own threads. Otherwise program's ownthreads become bottleneck in processing ping traffic. This means that poller thread reads a ping packet, invokes its actor and hands the response msg to transport queue. Change-Id: I526268c10bdd5ef93f322a4f95385137550a6a49 Signed-off-by: Raghavendra G BUG: 1390151 Reviewed-on: https://review.gluster.org/17105 NetBSD-regression: NetBSD Build System CentOS-regression: Gluster Build System Smoke: Gluster Build System Reviewed-by: Amar Tumballi Reviewed-by: Jeff Darcy (cherry picked from commit 2e72b24707f1886833db0b09e48b3f48b8d68d37) Reviewed-on: https://code.engineering.redhat.com/gerrit/162426 Tested-by: RHGS Build Bot --- rpc/rpc-lib/src/rpcsvc.c | 90 ++++++++++++++++++++++++++- rpc/rpc-lib/src/rpcsvc.h | 18 +++++- xlators/protocol/server/src/server-helpers.c | 4 -- xlators/protocol/server/src/server-rpc-fops.c | 1 + 4 files changed, 106 insertions(+), 7 deletions(-) diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 695e9fb..faa1956 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -304,6 +304,7 @@ rpcsvc_program_actor (rpcsvc_request_t *req) goto err; } + req->ownthread = program->ownthread; req->synctask = program->synctask; err = SUCCESS; @@ -411,6 +412,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans, req->progver = rpc_call_progver (callmsg); req->procnum = rpc_call_progproc (callmsg); req->trans = rpc_transport_ref (trans); + gf_client_ref (req->trans->xl_private); req->count = msg->count; req->msg[0] = progmsg; req->iobref = iobref_ref (msg->iobref); @@ -426,6 +428,7 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans, req->trans_private = msg->private; INIT_LIST_HEAD (&req->txlist); + INIT_LIST_HEAD (&req->request_list); req->payloadsize = 0; /* By this time, the data bytes for the auth scheme would have already @@ -576,7 +579,7 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, rpcsvc_request_t *req = NULL; int ret = -1; uint16_t port = 0; - gf_boolean_t is_unix = _gf_false; + gf_boolean_t is_unix = _gf_false, empty = _gf_false; gf_boolean_t unprivileged = _gf_false; drc_cached_op_t *reply = NULL; rpcsvc_drc_globals_t *drc = NULL; @@ -692,6 +695,20 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans, (synctask_fn_t) actor_fn, rpcsvc_check_and_reply_error, NULL, req); + } else if (req->ownthread) { + pthread_mutex_lock (&req->prog->queue_lock); + { + empty = list_empty (&req->prog->request_queue); + + list_add_tail (&req->request_list, + &req->prog->request_queue); + + if (empty) + pthread_cond_signal (&req->prog->queue_cond); + } + pthread_mutex_unlock (&req->prog->queue_lock); + + ret = 0; } else { ret = actor_fn (req); } @@ -1572,6 +1589,12 @@ rpcsvc_program_unregister (rpcsvc_t *svc, rpcsvc_program_t *program) " Ver: %d, Port: %d", prog->progname, prog->prognum, prog->progver, prog->progport); + if (prog->ownthread) { + prog->alive = _gf_false; + ret = 0; + goto out; + } + pthread_mutex_lock (&svc->rpclock); { list_del_init (&prog->program); @@ -1838,6 +1861,56 @@ out: return ret; } +void * +rpcsvc_request_handler (void *arg) +{ + rpcsvc_program_t *program = arg; + rpcsvc_request_t *req = NULL; + rpcsvc_actor_t *actor = NULL; + gf_boolean_t done = _gf_false; + int ret = 0; + + if (!program) + return NULL; + + while (1) { + pthread_mutex_lock (&program->queue_lock); + { + if (!program->alive + && list_empty (&program->request_queue)) { + done = 1; + goto unlock; + } + + while (list_empty (&program->request_queue)) + pthread_cond_wait (&program->queue_cond, + &program->queue_lock); + + req = list_entry (program->request_queue.next, + typeof (*req), request_list); + + list_del_init (&req->request_list); + } + unlock: + pthread_mutex_unlock (&program->queue_lock); + + if (done) + break; + + THIS = req->svc->xl; + + actor = rpcsvc_program_actor (req); + + ret = actor->actor (req); + + if (ret != 0) { + rpcsvc_check_and_reply_error (ret, NULL, req); + } + } + + return NULL; +} + int rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) { @@ -1878,6 +1951,21 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) memcpy (newprog, program, sizeof (*program)); INIT_LIST_HEAD (&newprog->program); + INIT_LIST_HEAD (&newprog->request_queue); + pthread_mutex_init (&newprog->queue_lock, NULL); + pthread_cond_init (&newprog->queue_cond, NULL); + + newprog->alive = _gf_true; + + /* make sure synctask gets priority over ownthread */ + if (newprog->synctask) + newprog->ownthread = _gf_false; + + if (newprog->ownthread) { + gf_thread_create (&newprog->thread, NULL, + rpcsvc_request_handler, + newprog, "reqhnd"); + } pthread_mutex_lock (&svc->rpclock); { diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index d3aafac..58c0055 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -233,7 +233,9 @@ struct rpcsvc_request { */ rpcsvc_auth_data_t verf; - /* Execute this request's actor function as a synctask?*/ + /* Execute this request's actor function in ownthread of program?*/ + gf_boolean_t ownthread; + gf_boolean_t synctask; /* Container for a RPC program wanting to store a temp * request-specific item. @@ -245,6 +247,10 @@ struct rpcsvc_request { /* pointer to cached reply for use in DRC */ drc_cached_op_t *reply; + + /* request queue in rpcsvc */ + struct list_head request_list; + }; #define rpcsvc_request_program(req) ((rpcsvc_program_t *)((req)->prog)) @@ -395,10 +401,18 @@ struct rpcsvc_program { */ int min_auth; - /* Execute actor function as a synctask? */ + /* Execute actor function in program's own thread? */ + /* This will reduce the workload on poller threads */ + gf_boolean_t ownthread; + gf_boolean_t alive; + gf_boolean_t synctask; /* list member to link to list of registered services with rpcsvc */ struct list_head program; + struct list_head request_queue; + pthread_mutex_t queue_lock; + pthread_cond_t queue_cond; + pthread_t thread; }; typedef struct rpcsvc_cbk_program { diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c index 30045ef..7cc3d15 100644 --- a/xlators/protocol/server/src/server-helpers.c +++ b/xlators/protocol/server/src/server-helpers.c @@ -557,10 +557,6 @@ get_frame_from_request (rpcsvc_request_t *req) } } - /* Add a ref for this fop */ - if (client) - gf_client_ref (client); - frame->root->uid = req->uid; frame->root->gid = req->gid; frame->root->pid = req->pid; diff --git a/xlators/protocol/server/src/server-rpc-fops.c b/xlators/protocol/server/src/server-rpc-fops.c index b7bb26a..db4242d 100644 --- a/xlators/protocol/server/src/server-rpc-fops.c +++ b/xlators/protocol/server/src/server-rpc-fops.c @@ -6143,4 +6143,5 @@ struct rpcsvc_program glusterfs3_3_fop_prog = { .progver = GLUSTER_FOP_VERSION, .numactors = GLUSTER_FOP_PROCCNT, .actors = glusterfs3_3_fop_actors, + .ownthread = _gf_true, }; -- 1.8.3.1