From 8503ed9b94777d47352f19ebfa844e151352b87f Mon Sep 17 00:00:00 2001 From: Milind Changire Date: Fri, 2 Mar 2018 15:39:27 +0530 Subject: [PATCH 166/180] rpcsvc: scale rpcsvc_request_handler threads Scale rpcsvc_request_handler threads to match the scaling of event handler threads. Please refer to https://bugzilla.redhat.com/show_bug.cgi?id=1467614#c51 for a discussion about why we need multi-threaded rpcsvc request handlers. mainline: > Reviewed-on: https://review.gluster.org/19337 > Reviewed-by: Raghavendra G > Signed-off-by: Milind Changire (cherry picked from commit 7d641313f46789ec0a7ba0cc04f504724c780855) Change-Id: Ib6838fb8b928e15602a3d36fd66b7ba08999430b BUG: 1549497 Signed-off-by: Milind Changire Reviewed-on: https://code.engineering.redhat.com/gerrit/131596 Tested-by: RHGS Build Bot Reviewed-by: Raghavendra Gowdappa --- glusterfsd/src/Makefile.am | 1 + glusterfsd/src/glusterfsd-mgmt.c | 16 ++++- glusterfsd/src/glusterfsd.h | 2 +- libglusterfs/src/event-poll.c | 7 ++ rpc/rpc-lib/src/rpcsvc.c | 129 +++++++++++++++++++++++++++++++---- rpc/rpc-lib/src/rpcsvc.h | 8 +++ xlators/protocol/server/src/server.c | 10 ++- 7 files changed, 153 insertions(+), 20 deletions(-) diff --git a/glusterfsd/src/Makefile.am b/glusterfsd/src/Makefile.am index 0196204..8ab585c 100644 --- a/glusterfsd/src/Makefile.am +++ b/glusterfsd/src/Makefile.am @@ -22,6 +22,7 @@ AM_CPPFLAGS = $(GF_CPPFLAGS) \ -I$(top_srcdir)/rpc/xdr/src \ -I$(top_builddir)/rpc/xdr/src \ -I$(top_srcdir)/xlators/nfs/server/src \ + -I$(top_srcdir)/xlators/protocol/server/src \ -I$(top_srcdir)/api/src AM_CFLAGS = -Wall $(GF_CFLAGS) diff --git a/glusterfsd/src/glusterfsd-mgmt.c b/glusterfsd/src/glusterfsd-mgmt.c index ca706d1..69d93f5 100644 --- a/glusterfsd/src/glusterfsd-mgmt.c +++ b/glusterfsd/src/glusterfsd-mgmt.c @@ -33,6 +33,7 @@ #include "syncop.h" #include "xlator.h" #include "syscall.h" +#include "server.h" static gf_boolean_t is_mgmt_rpc_reconnect = _gf_false; int need_emancipate = 0; @@ -185,12 +186,15 @@ glusterfs_terminate_response_send (rpcsvc_request_t *req, int op_ret) } void -glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr) +glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this) { struct event_pool *pool = ctx->event_pool; + server_conf_t *conf = this->private; + int thread_count = pool->eventthreadcount; pool->auto_thread_count += incr; - (void) event_reconfigure_threads (pool, pool->eventthreadcount+incr); + (void) event_reconfigure_threads (pool, thread_count+incr); + rpcsvc_ownthread_reconf (conf->rpc, pool->eventthreadcount); } int @@ -839,6 +843,7 @@ glusterfs_handle_attach (rpcsvc_request_t *req) xlator_t *nextchild = NULL; glusterfs_graph_t *newgraph = NULL; glusterfs_ctx_t *ctx = NULL; + xlator_t *protocol_server = NULL; GF_ASSERT (req); this = THIS; @@ -876,7 +881,12 @@ glusterfs_handle_attach (rpcsvc_request_t *req) nextchild->name); goto out; } - glusterfs_autoscale_threads (this->ctx, 1); + /* we need a protocol/server xlator as + * nextchild + */ + protocol_server = this->ctx->active->first; + glusterfs_autoscale_threads (this->ctx, 1, + protocol_server); } } else { gf_log (this->name, GF_LOG_WARNING, diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index 6d1e165..43cef52 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -124,7 +124,7 @@ int glusterfs_volume_top_read_perf (uint32_t blk_size, uint32_t blk_count, char *brick_path, double *throughput, double *time); void -glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr); +glusterfs_autoscale_threads (glusterfs_ctx_t *ctx, int incr, xlator_t *this); extern glusterfs_ctx_t *glusterfsd_ctx; #endif /* __GLUSTERFSD_H__ */ diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c index 3bffc47..b1aca82 100644 --- a/libglusterfs/src/event-poll.c +++ b/libglusterfs/src/event-poll.c @@ -173,6 +173,13 @@ event_pool_new_poll (int count, int eventthreadcount) "thread count (%d) ignored", eventthreadcount); } + /* although, eventhreadcount for poll implementaiton is always + * going to be 1, eventthreadcount needs to be set to 1 so that + * rpcsvc_request_handler() thread scaling works flawlessly in + * both epoll and poll models + */ + event_pool->eventthreadcount = 1; + return event_pool; } diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c index 68e27ab..31b5eb5 100644 --- a/rpc/rpc-lib/src/rpcsvc.c +++ b/rpc/rpc-lib/src/rpcsvc.c @@ -1877,39 +1877,105 @@ rpcsvc_request_handler (void *arg) goto unlock; } - while (list_empty (&program->request_queue)) + while (list_empty (&program->request_queue) && + (program->threadcount <= + program->eventthreadcount)) { pthread_cond_wait (&program->queue_cond, &program->queue_lock); + } - req = list_entry (program->request_queue.next, - typeof (*req), request_list); - - list_del_init (&req->request_list); + if (program->threadcount > program->eventthreadcount) { + done = 1; + program->threadcount--; + + gf_log (GF_RPCSVC, GF_LOG_INFO, + "program '%s' thread terminated; " + "total count:%d", + program->progname, + program->threadcount); + } else if (!list_empty (&program->request_queue)) { + req = list_entry (program->request_queue.next, + typeof (*req), request_list); + + list_del_init (&req->request_list); + } } unlock: pthread_mutex_unlock (&program->queue_lock); + if (req) { + THIS = req->svc->xl; + actor = rpcsvc_program_actor (req); + ret = actor->actor (req); + + if (ret != 0) { + rpcsvc_check_and_reply_error (ret, NULL, req); + } + req = NULL; + } + if (done) break; + } - THIS = req->svc->xl; + return NULL; +} - actor = rpcsvc_program_actor (req); +int +rpcsvc_spawn_threads (rpcsvc_t *svc, rpcsvc_program_t *program) +{ + int ret = 0, delta = 0, creates = 0; - ret = actor->actor (req); + if (!program || !svc) + goto out; - if (ret != 0) { - rpcsvc_check_and_reply_error (ret, NULL, req); + pthread_mutex_lock (&program->queue_lock); + { + delta = program->eventthreadcount - program->threadcount; + + if (delta >= 0) { + while (delta--) { + ret = gf_thread_create (&program->thread, NULL, + rpcsvc_request_handler, + program, "rpcrqhnd"); + if (!ret) { + program->threadcount++; + creates++; + } + } + + if (creates) { + gf_log (GF_RPCSVC, GF_LOG_INFO, + "spawned %d threads for program '%s'; " + "total count:%d", + creates, + program->progname, + program->threadcount); + } + } else { + gf_log (GF_RPCSVC, GF_LOG_INFO, + "terminating %d threads for program '%s'", + -delta, program->progname); + + /* this signal is to just wake up the threads so they + * test for the change in eventthreadcount and kill + * themselves until the program thread count becomes + * equal to the event thread count + */ + pthread_cond_broadcast (&program->queue_cond); } } + pthread_mutex_unlock (&program->queue_lock); - return NULL; +out: + return creates; } int rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) { int ret = -1; + int creates = -1; rpcsvc_program_t *newprog = NULL; char already_registered = 0; @@ -1957,9 +2023,12 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program) newprog->ownthread = _gf_false; if (newprog->ownthread) { - gf_thread_create (&newprog->thread, NULL, - rpcsvc_request_handler, - newprog, "rpcsvcrh"); + newprog->eventthreadcount = 1; + creates = rpcsvc_spawn_threads (svc, newprog); + + if (creates < 1) { + goto out; + } } pthread_mutex_lock (&svc->rpclock); @@ -2816,6 +2885,38 @@ out: return ret; } +/* During reconfigure, Make sure to call this function after event-threads are + * reconfigured as programs' threadcount will be made equal to event threads. + */ + +int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount) +{ + int ret = -1; + rpcsvc_program_t *program = NULL; + + if (!svc) { + ret = 0; + goto out; + } + + pthread_rwlock_wrlock (&svc->rpclock); + { + list_for_each_entry (program, &svc->programs, program) { + if (program->ownthread) { + program->eventthreadcount = + new_eventthreadcount; + rpcsvc_spawn_threads (svc, program); + } + } + } + pthread_rwlock_unlock (&svc->rpclock); + + ret = 0; +out: + return ret; +} + rpcsvc_actor_t gluster_dump_actors[GF_DUMP_MAXVALUE] = { [GF_DUMP_NULL] = {"NULL", GF_DUMP_NULL, NULL, NULL, 0, DRC_NA}, diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h index 73507b6..4ae2350 100644 --- a/rpc/rpc-lib/src/rpcsvc.h +++ b/rpc/rpc-lib/src/rpcsvc.h @@ -412,6 +412,12 @@ struct rpcsvc_program { pthread_mutex_t queue_lock; pthread_cond_t queue_cond; pthread_t thread; + int threadcount; + /* eventthreadcount is just a readonly copy of the actual value + * owned by the event sub-system + * It is used to control the scaling of rpcsvc_request_handler threads + */ + int eventthreadcount; }; typedef struct rpcsvc_cbk_program { @@ -623,4 +629,6 @@ rpcsvc_auth_array (rpcsvc_t *svc, char *volname, int *autharr, int arrlen); rpcsvc_vector_sizer rpcsvc_get_program_vector_sizer (rpcsvc_t *svc, uint32_t prognum, uint32_t progver, int procnum); +extern int +rpcsvc_ownthread_reconf (rpcsvc_t *svc, int new_eventthreadcount); #endif diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c index 6dc9d0f..4627ea0 100644 --- a/xlators/protocol/server/src/server.c +++ b/xlators/protocol/server/src/server.c @@ -990,6 +990,12 @@ do_rpc: ret = server_init_grace_timer (this, options, conf); + /* rpcsvc thread reconfigure should be after events thread + * reconfigure + */ + new_nthread = + ((struct event_pool *)(this->ctx->event_pool))->eventthreadcount; + ret = rpcsvc_ownthread_reconf (rpc_conf, new_nthread); out: THIS = oldTHIS; gf_msg_debug ("", 0, "returning %d", ret); @@ -1569,9 +1575,9 @@ notify (xlator_t *this, int32_t event, void *data, ...) (*trav_p) = (*trav_p)->next; glusterfs_mgmt_pmap_signout (ctx, victim->name); - glusterfs_autoscale_threads (THIS->ctx, -1); + /* we need the protocol/server xlator here as 'this' */ + glusterfs_autoscale_threads (ctx, -1, this); default_notify (victim, GF_EVENT_CLEANUP, data); - } break; -- 1.8.3.1