From 667e92a8dd0a21902cef39a59bc6c6b77d1f3c26 Mon Sep 17 00:00:00 2001
From: Raghavendra Gowdappa <rgowdapp@redhat.com>
Date: Mon, 11 Feb 2019 12:32:52 +0530
Subject: [PATCH 525/529] rpcsvc: provide each request handler thread its own
queue
A single global per program queue is contended by all request handler
threads and event threads. This can lead to high contention. So,
reduce the contention by providing each request handler thread its own
private queue.
Thanks to "Manoj Pillai"<mpillai@redhat.com> for the idea of pairing a
single queue with a fixed request-handler-thread and event-thread,
which brought down the performance regression due to overhead of
queuing significantly.
Thanks to "Xavi Hernandez"<xhernandez@redhat.com> for discussion on
how to communicate the event-thread death to request-handler-thread.
Thanks to "Karan Sandha"<ksandha@redhat.com> for voluntarily running
the perf benchmarks to qualify that performance regression introduced
by ping-timer-fixes is fixed with this patch and patiently running
many iterations of regression tests while RCAing the issue.
Thanks to "Milind Changire"<mchangir@redhat.com> for patiently running
the many iterations of perf benchmarking tests while RCAing the
regression caused by ping-timer-expiry fixes.
Change-Id: I578c3fc67713f4234bd3abbec5d3fbba19059ea5
BUG: 1390151
Signed-off-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
(cherry picked from commit 95e380eca19b9f0d03a53429535f15556e5724ad)
Reviewed-on: https://code.engineering.redhat.com/gerrit/162427
Tested-by: RHGS Build Bot <nigelb@redhat.com>
---
cli/src/cli-rl.c | 4 +-
libglusterfs/src/event-epoll.c | 156 +++++++++---
libglusterfs/src/event-poll.c | 14 +-
libglusterfs/src/event.c | 11 +-
libglusterfs/src/event.h | 19 +-
rpc/rpc-lib/src/rpc-clnt.c | 6 +
rpc/rpc-lib/src/rpc-transport.c | 4 +
rpc/rpc-lib/src/rpc-transport.h | 3 +
rpc/rpc-lib/src/rpcsvc.c | 339 +++++++++++++++++++++++----
rpc/rpc-lib/src/rpcsvc.h | 32 ++-
rpc/rpc-transport/socket/src/socket.c | 29 ++-
xlators/protocol/server/src/server-helpers.c | 4 +
xlators/protocol/server/src/server.c | 3 +
13 files changed, 530 insertions(+), 94 deletions(-)
diff --git a/cli/src/cli-rl.c b/cli/src/cli-rl.c
index 4745cf4..cffd0a8 100644
--- a/cli/src/cli-rl.c
+++ b/cli/src/cli-rl.c
@@ -109,7 +109,7 @@ cli_rl_process_line (char *line)
int
cli_rl_stdin (int fd, int idx, int gen, void *data,
- int poll_out, int poll_in, int poll_err)
+ int poll_out, int poll_in, int poll_err, char event_thread_died)
{
struct cli_state *state = NULL;
@@ -394,7 +394,7 @@ cli_rl_enable (struct cli_state *state)
}
ret = event_register (state->ctx->event_pool, 0, cli_rl_stdin, state,
- 1, 0);
+ 1, 0, 0);
if (ret == -1)
goto out;
diff --git a/libglusterfs/src/event-epoll.c b/libglusterfs/src/event-epoll.c
index 7fc53ff..310bce3 100644
--- a/libglusterfs/src/event-epoll.c
+++ b/libglusterfs/src/event-epoll.c
@@ -32,6 +32,7 @@ struct event_slot_epoll {
int fd;
int events;
int gen;
+ int idx;
int ref;
int do_close;
int in_handler;
@@ -39,6 +40,7 @@ struct event_slot_epoll {
void *data;
event_handler_t handler;
gf_lock_t lock;
+ struct list_head poller_death;
};
struct event_thread_data {
@@ -60,6 +62,7 @@ __event_newtable (struct event_pool *event_pool, int table_idx)
for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
table[i].fd = -1;
LOCK_INIT (&table[i].lock);
+ INIT_LIST_HEAD(&table[i].poller_death);
}
event_pool->ereg[table_idx] = table;
@@ -70,7 +73,8 @@ __event_newtable (struct event_pool *event_pool, int table_idx)
static int
-__event_slot_alloc (struct event_pool *event_pool, int fd)
+__event_slot_alloc (struct event_pool *event_pool, int fd,
+ char notify_poller_death)
{
int i = 0;
int table_idx = -1;
@@ -105,34 +109,42 @@ __event_slot_alloc (struct event_pool *event_pool, int fd)
table_idx = i;
- for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
- if (table[i].fd == -1) {
- /* wipe everything except bump the generation */
- gen = table[i].gen;
- memset (&table[i], 0, sizeof (table[i]));
- table[i].gen = gen + 1;
-
- LOCK_INIT (&table[i].lock);
+ for (i = 0; i < EVENT_EPOLL_SLOTS; i++) {
+ if (table[i].fd == -1) {
+ /* wipe everything except bump the generation */
+ gen = table[i].gen;
+ memset (&table[i], 0, sizeof (table[i]));
+ table[i].gen = gen + 1;
+
+ LOCK_INIT (&table[i].lock);
+ INIT_LIST_HEAD(&table[i].poller_death);
+
+ table[i].fd = fd;
+ if (notify_poller_death) {
+ table[i].idx = table_idx * EVENT_EPOLL_SLOTS + i;
+ list_add_tail(&table[i].poller_death,
+ &event_pool->poller_death);
+ }
- table[i].fd = fd;
- event_pool->slots_used[table_idx]++;
+ event_pool->slots_used[table_idx]++;
- break;
- }
- }
+ break;
+ }
+ }
return table_idx * EVENT_EPOLL_SLOTS + i;
}
static int
-event_slot_alloc (struct event_pool *event_pool, int fd)
+event_slot_alloc (struct event_pool *event_pool, int fd,
+ char notify_poller_death)
{
int idx = -1;
pthread_mutex_lock (&event_pool->mutex);
{
- idx = __event_slot_alloc (event_pool, fd);
+ idx = __event_slot_alloc (event_pool, fd, notify_poller_death);
}
pthread_mutex_unlock (&event_pool->mutex);
@@ -162,6 +174,7 @@ __event_slot_dealloc (struct event_pool *event_pool, int idx)
slot->fd = -1;
slot->handled_error = 0;
slot->in_handler = 0;
+ list_del_init(&slot->poller_death);
event_pool->slots_used[table_idx]--;
return;
@@ -180,6 +193,23 @@ event_slot_dealloc (struct event_pool *event_pool, int idx)
return;
}
+static int
+event_slot_ref(struct event_slot_epoll *slot)
+{
+ int ref;
+
+ if (!slot)
+ return -1;
+
+ LOCK (&slot->lock);
+ {
+ slot->ref++;
+ ref = slot->ref;
+ }
+ UNLOCK (&slot->lock);
+
+ return ref;
+}
static struct event_slot_epoll *
event_slot_get (struct event_pool *event_pool, int idx)
@@ -198,15 +228,44 @@ event_slot_get (struct event_pool *event_pool, int idx)
slot = &table[offset];
+ event_slot_ref (slot);
+ return slot;
+}
+
+static void
+__event_slot_unref(struct event_pool *event_pool, struct event_slot_epoll *slot,
+ int idx)
+{
+ int ref = -1;
+ int fd = -1;
+ int do_close = 0;
+
LOCK (&slot->lock);
{
- slot->ref++;
+ --(slot->ref);
+ ref = slot->ref;
}
UNLOCK (&slot->lock);
- return slot;
-}
+ if (ref)
+ /* slot still alive */
+ goto done;
+
+ LOCK(&slot->lock);
+ {
+ fd = slot->fd;
+ do_close = slot->do_close;
+ slot->do_close = 0;
+ }
+ UNLOCK(&slot->lock);
+
+ __event_slot_dealloc(event_pool, idx);
+ if (do_close)
+ sys_close(fd);
+done:
+ return;
+}
static void
event_slot_unref (struct event_pool *event_pool, struct event_slot_epoll *slot,
@@ -264,7 +323,7 @@ event_pool_new_epoll (int count, int eventthreadcount)
event_pool->fd = epfd;
event_pool->count = count;
-
+ INIT_LIST_HEAD(&event_pool->poller_death);
event_pool->eventthreadcount = eventthreadcount;
event_pool->auto_thread_count = 0;
@@ -315,7 +374,8 @@ __slot_update_events (struct event_slot_epoll *slot, int poll_in, int poll_out)
int
event_register_epoll (struct event_pool *event_pool, int fd,
event_handler_t handler,
- void *data, int poll_in, int poll_out)
+ void *data, int poll_in, int poll_out,
+ char notify_poller_death)
{
int idx = -1;
int ret = -1;
@@ -345,7 +405,7 @@ event_register_epoll (struct event_pool *event_pool, int fd,
if (destroy == 1)
goto out;
- idx = event_slot_alloc (event_pool, fd);
+ idx = event_slot_alloc (event_pool, fd, notify_poller_death);
if (idx == -1) {
gf_msg ("epoll", GF_LOG_ERROR, 0, LG_MSG_SLOT_NOT_FOUND,
"could not find slot for fd=%d", fd);
@@ -583,7 +643,7 @@ pre_unlock:
ret = handler (fd, idx, gen, data,
(event->events & (EPOLLIN|EPOLLPRI)),
(event->events & (EPOLLOUT)),
- (event->events & (EPOLLERR|EPOLLHUP)));
+ (event->events & (EPOLLERR|EPOLLHUP)), 0);
}
out:
event_slot_unref (event_pool, slot, idx);
@@ -600,7 +660,10 @@ event_dispatch_epoll_worker (void *data)
struct event_thread_data *ev_data = data;
struct event_pool *event_pool;
int myindex = -1;
- int timetodie = 0;
+ int timetodie = 0, gen = 0;
+ struct list_head poller_death_notify;
+ struct event_slot_epoll *slot = NULL, *tmp = NULL;
+
GF_VALIDATE_OR_GOTO ("event", ev_data, out);
@@ -610,7 +673,7 @@ event_dispatch_epoll_worker (void *data)
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
gf_msg ("epoll", GF_LOG_INFO, 0, LG_MSG_STARTED_EPOLL_THREAD, "Started"
- " thread with index %d", myindex);
+ " thread with index %d", myindex - 1);
pthread_mutex_lock (&event_pool->mutex);
{
@@ -627,21 +690,58 @@ event_dispatch_epoll_worker (void *data)
* reconfigured always */
pthread_mutex_lock (&event_pool->mutex);
{
- if (event_pool->eventthreadcount <
- myindex) {
+ if (event_pool->eventthreadcount < myindex) {
+ while (event_pool->poller_death_sliced) {
+ pthread_cond_wait(
+ &event_pool->cond,
+ &event_pool->mutex);
+ }
+
+ INIT_LIST_HEAD(&poller_death_notify);
+
/* if found true in critical section,
* die */
event_pool->pollers[myindex - 1] = 0;
event_pool->activethreadcount--;
timetodie = 1;
+ gen = ++event_pool->poller_gen;
+ list_for_each_entry(slot, &event_pool->poller_death,
+ poller_death)
+ {
+ event_slot_ref(slot);
+ }
+
+ list_splice_init(&event_pool->poller_death,
+ &poller_death_notify);
+ event_pool->poller_death_sliced = 1;
+
pthread_cond_broadcast (&event_pool->cond);
}
}
pthread_mutex_unlock (&event_pool->mutex);
if (timetodie) {
+ list_for_each_entry(slot, &poller_death_notify, poller_death)
+ {
+ slot->handler(slot->fd, 0, gen, slot->data, 0, 0, 0, 1);
+ }
+
+ pthread_mutex_lock(&event_pool->mutex);
+ {
+ list_for_each_entry_safe(slot, tmp, &poller_death_notify, poller_death)
+ {
+ __event_slot_unref(event_pool, slot, slot->idx);
+ }
+
+ list_splice(&poller_death_notify,
+ &event_pool->poller_death);
+ event_pool->poller_death_sliced = 0;
+ pthread_cond_broadcast(&event_pool->cond);
+ }
+ pthread_mutex_unlock(&event_pool->mutex);
+
gf_msg ("epoll", GF_LOG_INFO, 0,
LG_MSG_EXITED_EPOLL_THREAD, "Exited "
- "thread with index %d", myindex);
+ "thread with index %d", myindex - 1);
goto out;
}
}
diff --git a/libglusterfs/src/event-poll.c b/libglusterfs/src/event-poll.c
index 3bffc47..ca00071 100644
--- a/libglusterfs/src/event-poll.c
+++ b/libglusterfs/src/event-poll.c
@@ -36,12 +36,14 @@ struct event_slot_poll {
static int
event_register_poll (struct event_pool *event_pool, int fd,
event_handler_t handler,
- void *data, int poll_in, int poll_out);
+ void *data, int poll_in, int poll_out,
+ char notify_poller_death);
static int
__flush_fd (int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err)
+ int poll_in, int poll_out, int poll_err,
+ char notify_poller_death)
{
char buf[64];
int ret = -1;
@@ -153,7 +155,7 @@ event_pool_new_poll (int count, int eventthreadcount)
}
ret = event_register_poll (event_pool, event_pool->breaker[0],
- __flush_fd, NULL, 1, 0);
+ __flush_fd, NULL, 1, 0, 0);
if (ret == -1) {
gf_msg ("poll", GF_LOG_ERROR, 0, LG_MSG_REGISTER_PIPE_FAILED,
"could not register pipe fd with poll event loop");
@@ -180,7 +182,8 @@ event_pool_new_poll (int count, int eventthreadcount)
static int
event_register_poll (struct event_pool *event_pool, int fd,
event_handler_t handler,
- void *data, int poll_in, int poll_out)
+ void *data, int poll_in, int poll_out,
+ char notify_poller_death)
{
int idx = -1;
@@ -389,7 +392,8 @@ unlock:
ret = handler (ufds[i].fd, idx, 0, data,
(ufds[i].revents & (POLLIN|POLLPRI)),
(ufds[i].revents & (POLLOUT)),
- (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)));
+ (ufds[i].revents & (POLLERR|POLLHUP|POLLNVAL)),
+ 0);
return ret;
}
diff --git a/libglusterfs/src/event.c b/libglusterfs/src/event.c
index bba6f84..8463c19 100644
--- a/libglusterfs/src/event.c
+++ b/libglusterfs/src/event.c
@@ -58,14 +58,16 @@ event_pool_new (int count, int eventthreadcount)
int
event_register (struct event_pool *event_pool, int fd,
event_handler_t handler,
- void *data, int poll_in, int poll_out)
+ void *data, int poll_in, int poll_out,
+ char notify_poller_death)
{
int ret = -1;
GF_VALIDATE_OR_GOTO ("event", event_pool, out);
ret = event_pool->ops->event_register (event_pool, fd, handler, data,
- poll_in, poll_out);
+ poll_in, poll_out,
+ notify_poller_death);
out:
return ret;
}
@@ -170,7 +172,8 @@ out:
int
poller_destroy_handler (int fd, int idx, int gen, void *data,
- int poll_out, int poll_in, int poll_err)
+ int poll_out, int poll_in, int poll_err,
+ char event_thread_exit)
{
struct event_destroy_data *destroy = NULL;
int readfd = -1, ret = -1;
@@ -239,7 +242,7 @@ event_dispatch_destroy (struct event_pool *event_pool)
/* From the main thread register an event on the pipe fd[0],
*/
idx = event_register (event_pool, fd[0], poller_destroy_handler,
- &data, 1, 0);
+ &data, 1, 0, 0);
if (idx < 0)
goto out;
diff --git a/libglusterfs/src/event.h b/libglusterfs/src/event.h
index c60b14a..875cd7d 100644
--- a/libglusterfs/src/event.h
+++ b/libglusterfs/src/event.h
@@ -12,6 +12,7 @@
#define _EVENT_H_
#include <pthread.h>
+#include "list.h"
struct event_pool;
struct event_ops;
@@ -24,7 +25,8 @@ struct event_data {
typedef int (*event_handler_t) (int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err);
+ int poll_in, int poll_out, int poll_err,
+ char event_thread_exit);
#define EVENT_EPOLL_TABLES 1024
#define EVENT_EPOLL_SLOTS 1024
@@ -41,6 +43,13 @@ struct event_pool {
struct event_slot_epoll *ereg[EVENT_EPOLL_TABLES];
int slots_used[EVENT_EPOLL_TABLES];
+ struct list_head poller_death;
+ int poller_death_sliced; /* track whether the list of fds interested
+ * poller_death is sliced. If yes, new thread
+ * death notification has to wait till the
+ * list is added back
+ */
+ int poller_gen;
int used;
int changed;
@@ -54,7 +63,7 @@ struct event_pool {
* epoll. */
int eventthreadcount; /* number of event threads to execute. */
pthread_t pollers[EVENT_MAX_THREADS]; /* poller thread_id store,
- * and live status */
+ * and live status */
int destroy;
int activethreadcount;
@@ -83,7 +92,8 @@ struct event_ops {
int (*event_register) (struct event_pool *event_pool, int fd,
event_handler_t handler,
- void *data, int poll_in, int poll_out);
+ void *data, int poll_in, int poll_out,
+ char notify_poller_death);
int (*event_select_on) (struct event_pool *event_pool, int fd, int idx,
int poll_in, int poll_out);
@@ -107,7 +117,8 @@ int event_select_on (struct event_pool *event_pool, int fd, int idx,
int poll_in, int poll_out);
int event_register (struct event_pool *event_pool, int fd,
event_handler_t handler,
- void *data, int poll_in, int poll_out);
+ void *data, int poll_in, int poll_out,
+ char notify_poller_death);
int event_unregister (struct event_pool *event_pool, int fd, int idx);
int event_unregister_close (struct event_pool *event_pool, int fd, int idx);
int event_dispatch (struct event_pool *event_pool);
diff --git a/rpc/rpc-lib/src/rpc-clnt.c b/rpc/rpc-lib/src/rpc-clnt.c
index fd7e3ec..fe5e3fd 100644
--- a/rpc/rpc-lib/src/rpc-clnt.c
+++ b/rpc/rpc-lib/src/rpc-clnt.c
@@ -1013,6 +1013,12 @@ rpc_clnt_notify (rpc_transport_t *trans, void *mydata,
*/
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ /* only meaningful on a server, no need of handling this event on a
+ * client */
+ ret = 0;
+ break;
}
out:
diff --git a/rpc/rpc-lib/src/rpc-transport.c b/rpc/rpc-lib/src/rpc-transport.c
index b737ff2..db02338 100644
--- a/rpc/rpc-lib/src/rpc-transport.c
+++ b/rpc/rpc-lib/src/rpc-transport.c
@@ -294,6 +294,10 @@ rpc_transport_load (glusterfs_ctx_t *ctx, dict_t *options, char *trans_name)
goto fail;
}
+ if (dict_get(options, "notify-poller-death")) {
+ trans->notify_poller_death = 1;
+ }
+
gf_log ("rpc-transport", GF_LOG_DEBUG,
"attempt to load file %s", name);
diff --git a/rpc/rpc-lib/src/rpc-transport.h b/rpc/rpc-lib/src/rpc-transport.h
index c97f98d..cf77c9d 100644
--- a/rpc/rpc-lib/src/rpc-transport.h
+++ b/rpc/rpc-lib/src/rpc-transport.h
@@ -99,6 +99,7 @@ typedef enum {
RPC_TRANSPORT_MSG_RECEIVED, /* Complete rpc msg has been read */
RPC_TRANSPORT_CONNECT, /* client is connected to server */
RPC_TRANSPORT_MSG_SENT,
+ RPC_TRANSPORT_EVENT_THREAD_DIED /* event-thread has died */
} rpc_transport_event_t;
struct rpc_transport_msg {
@@ -218,6 +219,8 @@ struct rpc_transport {
*/
gf_boolean_t connect_failed;
gf_atomic_t disconnect_progress;
+ char notify_poller_death;
+ char poller_death_accept;
};
struct rpc_transport_ops {
diff --git a/rpc/rpc-lib/src/rpcsvc.c b/rpc/rpc-lib/src/rpcsvc.c
index faa1956..c769463 100644
--- a/rpc/rpc-lib/src/rpcsvc.c
+++ b/rpc/rpc-lib/src/rpcsvc.c
@@ -8,6 +8,7 @@
cases as published by the Free Software Foundation.
*/
+#include <math.h>
#include "rpcsvc.h"
#include "rpc-transport.h"
#include "dict.h"
@@ -56,9 +57,76 @@ int
rpcsvc_notify (rpc_transport_t *trans, void *mydata,
rpc_transport_event_t event, void *data, ...);
+void *
+rpcsvc_request_handler(void *arg);
+
static int
rpcsvc_match_subnet_v4 (const char *addrtok, const char *ipaddr);
+void
+rpcsvc_toggle_queue_status(rpcsvc_program_t *prog,
+ rpcsvc_request_queue_t *queue, char status[])
+{
+ int queue_index = 0, status_index = 0, set_bit = 0;
+
+ if (queue != &prog->request_queue[0]) {
+ queue_index = (queue - &prog->request_queue[0]);
+ }
+
+ status_index = queue_index / 8;
+ set_bit = queue_index % 8;
+
+ status[status_index] ^= (1 << set_bit);
+
+ return;
+}
+
+static int
+get_rightmost_set_bit(int n)
+{
+ return log2(n & -n);
+}
+
+int
+rpcsvc_get_free_queue_index(rpcsvc_program_t *prog)
+{
+ int queue_index = 0, max_index = 0, i = 0;
+ unsigned int right_most_unset_bit = 0;
+
+ right_most_unset_bit = 8;
+
+ max_index = gf_roof(EVENT_MAX_THREADS, 8) / 8;
+ for (i = 0; i < max_index; i++) {
+ if (prog->request_queue_status[i] == 0) {
+ right_most_unset_bit = 0;
+ break;
+ } else {
+ right_most_unset_bit = get_rightmost_set_bit(
+ ~prog->request_queue_status[i]);
+ if (right_most_unset_bit < 8) {
+ break;
+ }
+ }
+ }
+
+ if (right_most_unset_bit > 7) {
+ queue_index = -1;
+ } else {
+ queue_index = i * 8;
+ queue_index += right_most_unset_bit;
+
+ if (queue_index > EVENT_MAX_THREADS) {
+ queue_index = -1;
+ }
+ }
+
+ if (queue_index != -1) {
+ prog->request_queue_status[i] |= (0x1 << right_most_unset_bit);
+ }
+
+ return queue_index;
+}
+
rpcsvc_notify_wrapper_t *
rpcsvc_notify_wrapper_alloc (void)
{
@@ -412,7 +480,6 @@ rpcsvc_request_init (rpcsvc_t *svc, rpc_transport_t *trans,
req->progver = rpc_call_progver (callmsg);
req->procnum = rpc_call_progproc (callmsg);
req->trans = rpc_transport_ref (trans);
- gf_client_ref (req->trans->xl_private);
req->count = msg->count;
req->msg[0] = progmsg;
req->iobref = iobref_ref (msg->iobref);
@@ -570,6 +637,73 @@ rpcsvc_check_and_reply_error (int ret, call_frame_t *frame, void *opaque)
return 0;
}
+void
+rpcsvc_queue_event_thread_death(rpcsvc_t *svc, rpcsvc_program_t *prog, int gen)
+{
+ rpcsvc_request_queue_t *queue = NULL;
+ int num = 0;
+ void *value = NULL;
+ rpcsvc_request_t *req = NULL;
+ char empty = 0;
+
+ value = pthread_getspecific(prog->req_queue_key);
+ if (value == NULL) {
+ return;
+ }
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &prog->request_queue[num];
+
+ if (queue->gen == gen) {
+ /* duplicate event */
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "not queuing duplicate event thread death. "
+ "queue %d program %s",
+ num, prog->progname);
+ return;
+ }
+
+ rpcsvc_alloc_request(svc, req);
+ req->prognum = RPCSVC_INFRA_PROGRAM;
+ req->procnum = RPCSVC_PROC_EVENT_THREAD_DEATH;
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "queuing event thread death request to queue %d of program %s", num,
+ prog->progname);
+
+ pthread_mutex_lock(&queue->queue_lock);
+ {
+ empty = list_empty(&queue->request_queue);
+
+ list_add_tail(&req->request_list, &queue->request_queue);
+ queue->gen = gen;
+
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
+ }
+ pthread_mutex_unlock(&queue->queue_lock);
+
+ return;
+}
+
+int
+rpcsvc_handle_event_thread_death(rpcsvc_t *svc, rpc_transport_t *trans, int gen)
+{
+ rpcsvc_program_t *prog = NULL;
+
+ pthread_mutex_lock (&svc->rpclock);
+ {
+ list_for_each_entry(prog, &svc->programs, program)
+ {
+ if (prog->ownthread)
+ rpcsvc_queue_event_thread_death(svc, prog, gen);
+ }
+ }
+ pthread_mutex_unlock (&svc->rpclock);
+
+ return 0;
+}
+
int
rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
rpc_transport_pollin_t *msg)
@@ -581,8 +715,12 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
uint16_t port = 0;
gf_boolean_t is_unix = _gf_false, empty = _gf_false;
gf_boolean_t unprivileged = _gf_false;
+ gf_boolean_t spawn_request_handler = _gf_false;
drc_cached_op_t *reply = NULL;
rpcsvc_drc_globals_t *drc = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ long num = 0;
+ void *value = NULL;
if (!trans || !svc)
return -1;
@@ -696,20 +834,83 @@ rpcsvc_handle_rpc_call (rpcsvc_t *svc, rpc_transport_t *trans,
rpcsvc_check_and_reply_error, NULL,
req);
} else if (req->ownthread) {
- pthread_mutex_lock (&req->prog->queue_lock);
+ value = pthread_getspecific(req->prog->req_queue_key);
+ if (value == NULL) {
+ pthread_mutex_lock(&req->prog->thr_lock);
+ {
+ num = rpcsvc_get_free_queue_index(req->prog);
+ if (num != -1) {
+ num++;
+ value = (void *)num;
+ ret = pthread_setspecific(req->prog->req_queue_key,
+ value);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "setting request queue in TLS failed");
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+ num = -1;
+ } else {
+ spawn_request_handler = 1;
+ }
+ }
+ }
+ pthread_mutex_unlock(&req->prog->thr_lock);
+ }
+
+ if (num == -1)
+ goto noqueue;
+
+ num = ((unsigned long)value) - 1;
+
+ queue = &req->prog->request_queue[num];
+
+ if (spawn_request_handler) {
+ ret = gf_thread_create(&queue->thread, NULL,
+ rpcsvc_request_handler, queue,
+ "rpcrqhnd");
+ if (!ret) {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawned a request handler "
+ "thread for queue %d",
+ (int)num);
+
+ req->prog->threadcount++;
+ } else {
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "spawning a request handler "
+ "thread for queue %d failed",
+ (int)num);
+ ret = pthread_setspecific(req->prog->req_queue_key, 0);
+ if (ret < 0) {
+ gf_log(GF_RPCSVC, GF_LOG_WARNING,
+ "resetting request "
+ "queue in TLS failed");
+ }
+
+ rpcsvc_toggle_queue_status(
+ req->prog, &req->prog->request_queue[num - 1],
+ req->prog->request_queue_status);
+
+ goto noqueue;
+ }
+ }
+
+ pthread_mutex_lock(&queue->queue_lock);
{
- empty = list_empty (&req->prog->request_queue);
+ empty = list_empty(&queue->request_queue);
- list_add_tail (&req->request_list,
- &req->prog->request_queue);
+ list_add_tail(&req->request_list, &queue->request_queue);
- if (empty)
- pthread_cond_signal (&req->prog->queue_cond);
+ if (empty && queue->waiting)
+ pthread_cond_signal(&queue->queue_cond);
}
- pthread_mutex_unlock (&req->prog->queue_lock);
+ pthread_mutex_unlock(&queue->queue_lock);
ret = 0;
} else {
+noqueue:
ret = actor_fn (req);
}
}
@@ -838,6 +1039,12 @@ rpcsvc_notify (rpc_transport_t *trans, void *mydata,
"got MAP_XID event, which should have not come");
ret = 0;
break;
+
+ case RPC_TRANSPORT_EVENT_THREAD_DIED:
+ rpcsvc_handle_event_thread_death(svc, trans,
+ (int)(unsigned long)data);
+ ret = 0;
+ break;
}
out:
@@ -1779,6 +1986,7 @@ rpcsvc_create_listeners (rpcsvc_t *svc, dict_t *options, char *name)
goto out;
}
+ dict_del(options, "notify-poller-death");
GF_FREE (transport_name);
transport_name = NULL;
count++;
@@ -1864,50 +2072,87 @@ out:
void *
rpcsvc_request_handler (void *arg)
{
- rpcsvc_program_t *program = arg;
- rpcsvc_request_t *req = NULL;
+ rpcsvc_request_queue_t *queue = NULL;
+ rpcsvc_program_t *program = NULL;
+ rpcsvc_request_t *req = NULL, *tmp_req = NULL;
rpcsvc_actor_t *actor = NULL;
gf_boolean_t done = _gf_false;
int ret = 0;
+ struct list_head tmp_list = {
+ 0,
+ };
+
+ queue = arg;
+ program = queue->program;
+
+ INIT_LIST_HEAD(&tmp_list);
if (!program)
return NULL;
while (1) {
- pthread_mutex_lock (&program->queue_lock);
+ pthread_mutex_lock(&queue->queue_lock);
{
- if (!program->alive
- && list_empty (&program->request_queue)) {
+ if (!program->alive && list_empty(&queue->request_queue)) {
done = 1;
goto unlock;
}
-
- while (list_empty (&program->request_queue))
- pthread_cond_wait (&program->queue_cond,
- &program->queue_lock);
-
- req = list_entry (program->request_queue.next,
- typeof (*req), request_list);
-
- list_del_init (&req->request_list);
+ while (list_empty(&queue->request_queue)) {
+ queue->waiting = _gf_true;
+ pthread_cond_wait(&queue->queue_cond, &queue->queue_lock);
+ }
+ queue->waiting = _gf_false;
+ if (!list_empty(&queue->request_queue)) {
+ INIT_LIST_HEAD(&tmp_list);
+ list_splice_init(&queue->request_queue, &tmp_list);
+ }
+ }
+unlock:
+ pthread_mutex_unlock(&queue->queue_lock);
+ list_for_each_entry_safe(req, tmp_req, &tmp_list, request_list)
+ {
+ list_del_init(&req->request_list);
+ if (req) {
+ if (req->prognum == RPCSVC_INFRA_PROGRAM) {
+ switch (req->procnum) {
+ case RPCSVC_PROC_EVENT_THREAD_DEATH:
+ gf_log(GF_RPCSVC, GF_LOG_INFO,
+ "event thread died, exiting request handler "
+ "thread for queue %d of program %s",
+ (int)(queue - &program->request_queue[0]),
+ program->progname);
+ done = 1;
+
+ pthread_mutex_lock(&program->thr_lock);
+ {
+ rpcsvc_toggle_queue_status(
+ program, queue,
+ program->request_queue_status);
+ program->threadcount--;
+ }
+ pthread_mutex_unlock(&program->thr_lock);
+ rpcsvc_request_destroy(req);
+ break;
+
+ default:
+ break;
+ }
+ } else {
+ THIS = req->svc->xl;
+ actor = rpcsvc_program_actor(req);
+ ret = actor->actor(req);
+
+ if (ret != 0) {
+ rpcsvc_check_and_reply_error(ret, NULL, req);
+ }
+
+ req = NULL;
+ }
+ }
}
- unlock:
- pthread_mutex_unlock (&program->queue_lock);
-
if (done)
break;
-
- THIS = req->svc->xl;
-
- actor = rpcsvc_program_actor (req);
-
- ret = actor->actor (req);
-
- if (ret != 0) {
- rpcsvc_check_and_reply_error (ret, NULL, req);
- }
}
-
return NULL;
}
@@ -1917,6 +2162,7 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
int ret = -1;
rpcsvc_program_t *newprog = NULL;
char already_registered = 0;
+ int i = 0;
if (!svc) {
goto out;
@@ -1951,9 +2197,16 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
memcpy (newprog, program, sizeof (*program));
INIT_LIST_HEAD (&newprog->program);
- INIT_LIST_HEAD (&newprog->request_queue);
- pthread_mutex_init (&newprog->queue_lock, NULL);
- pthread_cond_init (&newprog->queue_cond, NULL);
+
+ for (i = 0; i < EVENT_MAX_THREADS; i++) {
+ INIT_LIST_HEAD(&newprog->request_queue[i].request_queue);
+ pthread_mutex_init(&newprog->request_queue[i].queue_lock, NULL);
+ pthread_cond_init(&newprog->request_queue[i].queue_cond, NULL);
+ newprog->request_queue[i].program = newprog;
+ }
+
+ pthread_mutex_init(&newprog->thr_lock, NULL);
+ pthread_cond_init(&newprog->thr_cond, NULL);
newprog->alive = _gf_true;
@@ -1962,9 +2215,11 @@ rpcsvc_program_register (rpcsvc_t *svc, rpcsvc_program_t *program)
newprog->ownthread = _gf_false;
if (newprog->ownthread) {
- gf_thread_create (&newprog->thread, NULL,
- rpcsvc_request_handler,
- newprog, "reqhnd");
+ struct event_pool *ep = svc->ctx->event_pool;
+ newprog->eventthreadcount = ep->eventthreadcount;
+
+ pthread_key_create(&newprog->req_queue_key, NULL);
+ newprog->thr_queue = 1;
}
pthread_mutex_lock (&svc->rpclock);
diff --git a/rpc/rpc-lib/src/rpcsvc.h b/rpc/rpc-lib/src/rpcsvc.h
index 58c0055..f500bab 100644
--- a/rpc/rpc-lib/src/rpcsvc.h
+++ b/rpc/rpc-lib/src/rpcsvc.h
@@ -33,6 +33,16 @@
#define MAX_IOVEC 16
#endif
+/* TODO: we should store prognums at a centralized location to avoid conflict
+ or use a robust random number generator to avoid conflicts
+*/
+
+#define RPCSVC_INFRA_PROGRAM 7712846 /* random number */
+
+typedef enum {
+ RPCSVC_PROC_EVENT_THREAD_DEATH = 0,
+} rpcsvc_infra_procnum_t;
+
#define RPCSVC_DEFAULT_OUTSTANDING_RPC_LIMIT 64 /* Default for protocol/server */
#define RPCSVC_DEF_NFS_OUTSTANDING_RPC_LIMIT 16 /* Default for nfs/server */
#define RPCSVC_MAX_OUTSTANDING_RPC_LIMIT 65536
@@ -349,6 +359,16 @@ typedef struct rpcsvc_actor_desc {
drc_op_type_t op_type;
} rpcsvc_actor_t;
+typedef struct rpcsvc_request_queue {
+ int gen;
+ struct list_head request_queue;
+ pthread_mutex_t queue_lock;
+ pthread_cond_t queue_cond;
+ pthread_t thread;
+ struct rpcsvc_program *program;
+ gf_boolean_t waiting;
+} rpcsvc_request_queue_t;
+
/* Describes a program and its version along with the function pointers
* required to handle the procedures/actors of each program/version.
* Never changed ever by any thread so no need for a lock.
@@ -409,10 +429,14 @@ struct rpcsvc_program {
gf_boolean_t synctask;
/* list member to link to list of registered services with rpcsvc */
struct list_head program;
- struct list_head request_queue;
- pthread_mutex_t queue_lock;
- pthread_cond_t queue_cond;
- pthread_t thread;
+ rpcsvc_request_queue_t request_queue[EVENT_MAX_THREADS];
+ char request_queue_status[EVENT_MAX_THREADS / 8 + 1];
+ pthread_mutex_t thr_lock;
+ pthread_cond_t thr_cond;
+ int thr_queue;
+ pthread_key_t req_queue_key;
+ int threadcount;
+ int eventthreadcount;
};
typedef struct rpcsvc_cbk_program {
diff --git a/rpc/rpc-transport/socket/src/socket.c b/rpc/rpc-transport/socket/src/socket.c
index e28c5cd..df984f8 100644
--- a/rpc/rpc-transport/socket/src/socket.c
+++ b/rpc/rpc-transport/socket/src/socket.c
@@ -2419,7 +2419,8 @@ static int socket_disconnect (rpc_transport_t *this, gf_boolean_t wait);
/* reads rpc_requests during pollin */
static int
socket_event_handler (int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err)
+ int poll_in, int poll_out, int poll_err,
+ char event_thread_died)
{
rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
@@ -2429,6 +2430,13 @@ socket_event_handler (int fd, int idx, int gen, void *data,
this = data;
+ if (event_thread_died) {
+ /* to avoid duplicate notifications,
+ * notify only for listener sockets
+ */
+ return 0;
+ }
+
GF_VALIDATE_OR_GOTO ("socket", this, out);
GF_VALIDATE_OR_GOTO ("socket", this->private, out);
GF_VALIDATE_OR_GOTO ("socket", this->xl, out);
@@ -2720,7 +2728,8 @@ socket_spawn (rpc_transport_t *this)
static int
socket_server_event_handler (int fd, int idx, int gen, void *data,
- int poll_in, int poll_out, int poll_err)
+ int poll_in, int poll_out, int poll_err,
+ char event_thread_died)
{
rpc_transport_t *this = NULL;
socket_private_t *priv = NULL;
@@ -2742,6 +2751,12 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
priv = this->private;
ctx = this->ctx;
+ if (event_thread_died) {
+ rpc_transport_notify(this, RPC_TRANSPORT_EVENT_THREAD_DIED,
+ (void *)(unsigned long)gen);
+ return 0;
+ }
+
/* NOTE:
* We have done away with the critical section in this function. since
* there's little that it helps with. There's no other code that
@@ -2840,6 +2855,7 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
new_trans->mydata = this->mydata;
new_trans->notify = this->notify;
new_trans->listener = this;
+ new_trans->notify_poller_death = this->poller_death_accept;
new_priv = new_trans->private;
if (new_sockaddr.ss_family == AF_UNIX) {
@@ -2935,7 +2951,8 @@ socket_server_event_handler (int fd, int idx, int gen, void *data,
new_sock,
socket_event_handler,
new_trans,
- 1, 0);
+ 1, 0,
+ new_trans->notify_poller_death);
if (new_priv->idx == -1) {
ret = -1;
gf_log(this->name, GF_LOG_ERROR,
@@ -3388,7 +3405,8 @@ handler:
else {
priv->idx = event_register (ctx->event_pool, priv->sock,
socket_event_handler,
- this, 1, 1);
+ this, 1, 1,
+ this->notify_poller_death);
if (priv->idx == -1) {
gf_log ("", GF_LOG_WARNING,
"failed to register the event");
@@ -3560,7 +3578,8 @@ socket_listen (rpc_transport_t *this)
priv->idx = event_register (ctx->event_pool, priv->sock,
socket_server_event_handler,
- this, 1, 0);
+ this, 1, 0,
+ this->notify_poller_death);
if (priv->idx == -1) {
gf_log (this->name, GF_LOG_WARNING,
diff --git a/xlators/protocol/server/src/server-helpers.c b/xlators/protocol/server/src/server-helpers.c
index 7cc3d15..30045ef 100644
--- a/xlators/protocol/server/src/server-helpers.c
+++ b/xlators/protocol/server/src/server-helpers.c
@@ -557,6 +557,10 @@ get_frame_from_request (rpcsvc_request_t *req)
}
}
+ /* Add a ref for this fop */
+ if (client)
+ gf_client_ref (client);
+
frame->root->uid = req->uid;
frame->root->gid = req->gid;
frame->root->pid = req->pid;
diff --git a/xlators/protocol/server/src/server.c b/xlators/protocol/server/src/server.c
index ba3b831..d32f5dd 100644
--- a/xlators/protocol/server/src/server.c
+++ b/xlators/protocol/server/src/server.c
@@ -1342,6 +1342,9 @@ init (xlator_t *this)
ret = -1;
goto out;
}
+
+ ret = dict_set_int32(this->options, "notify-poller-death", 1);
+
ret = rpcsvc_create_listeners (conf->rpc, this->options,
this->name);
if (ret < 1) {
--
1.8.3.1