From 668b55b7dd86b23e635cfb2264bc5e50f4cd888d Mon Sep 17 00:00:00 2001 From: Krutika Dhananjay Date: Tue, 9 Jan 2018 15:11:00 +0530 Subject: [PATCH 461/493] mount/fuse: Add support for multi-threaded fuse readers > Upstream: https://review.gluster.org/19226 > Github issue #412 > Change-Id: I94aa1505e5ae6a133683d473e0e4e0edd139b76b Usage: Use 'reader-thread-count=' as command line option to set the thread count at the time of mounting the volume. Next task is to make these threads auto-scale based on the load, instead of having the user remount the volume everytime to change the thread count. Change-Id: I94aa1505e5ae6a133683d473e0e4e0edd139b76b BUG: 1651040 Signed-off-by: Krutika Dhananjay Reviewed-on: https://code.engineering.redhat.com/gerrit/158514 Tested-by: RHGS Build Bot Reviewed-by: Sunil Kumar Heggodu Gopala Acharya --- glusterfsd/src/glusterfsd.c | 26 ++++ glusterfsd/src/glusterfsd.h | 1 + libglusterfs/src/glusterfs.h | 1 + xlators/mount/fuse/src/fuse-bridge.c | 231 ++++++++++++++++++---------- xlators/mount/fuse/src/fuse-bridge.h | 9 +- xlators/mount/fuse/src/fuse-helpers.c | 3 + xlators/mount/fuse/src/fuse-mem-types.h | 1 + xlators/mount/fuse/utils/mount.glusterfs.in | 7 + 8 files changed, 196 insertions(+), 83 deletions(-) diff --git a/glusterfsd/src/glusterfsd.c b/glusterfsd/src/glusterfsd.c index 78f3719..03bca24 100644 --- a/glusterfsd/src/glusterfsd.c +++ b/glusterfsd/src/glusterfsd.c @@ -238,6 +238,8 @@ static struct argp_option gf_options[] = { "Enable localtime logging"}, {"event-history", ARGP_FUSE_EVENT_HISTORY_KEY, "BOOL", OPTION_ARG_OPTIONAL, "disable/enable fuse event-history"}, + {"reader-thread-count", ARGP_READER_THREAD_COUNT_KEY, "INTEGER", + OPTION_ARG_OPTIONAL, "set fuse reader thread count"}, {0, 0, 0, 0, "Miscellaneous Options:"}, {0, } }; @@ -557,6 +559,17 @@ set_fuse_mount_options (glusterfs_ctx_t *ctx, dict_t *options) goto err; } } + if (cmd_args->reader_thread_count) { + ret = dict_set_uint32 (options, "reader-thread-count", + cmd_args->reader_thread_count); + if (ret < 0) { + gf_msg ("glusterfsd", GF_LOG_ERROR, 0, glusterfsd_msg_4, + "failed to set dict value for key " + "reader-thread-count"); + goto err; + } + } + ret = 0; err: return ret; @@ -1307,6 +1320,19 @@ no_oom_api: argp_failure (state, -1, 0, "unknown event-history setting \"%s\"", arg); break; + case ARGP_READER_THREAD_COUNT_KEY: + if (gf_string2uint32 (arg, &cmd_args->reader_thread_count)) { + argp_failure (state, -1, 0, + "unknown reader thread count option %s", + arg); + } else if ((cmd_args->reader_thread_count < 1) || + (cmd_args->reader_thread_count > 64)) { + argp_failure (state, -1, 0, + "Invalid reader thread count %s. " + "Valid range: [\"1, 64\"]", arg); + } + + break; } return 0; diff --git a/glusterfsd/src/glusterfsd.h b/glusterfsd/src/glusterfsd.h index f66947b..75cb1d8 100644 --- a/glusterfsd/src/glusterfsd.h +++ b/glusterfsd/src/glusterfsd.h @@ -99,6 +99,7 @@ enum argp_option_keys { ARGP_LOCALTIME_LOGGING_KEY = 177, ARGP_SUBDIR_MOUNT_KEY = 178, ARGP_FUSE_EVENT_HISTORY_KEY = 179, + ARGP_READER_THREAD_COUNT_KEY = 180, }; struct _gfd_vol_top_priv { diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h index 5e641fd..3e2f426 100644 --- a/libglusterfs/src/glusterfs.h +++ b/libglusterfs/src/glusterfs.h @@ -446,6 +446,7 @@ struct _cmd_args { char *subdir_mount; char *event_history; + uint32_t reader_thread_count; }; typedef struct _cmd_args cmd_args_t; diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index fbb4c53..8d1e3a0 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -665,7 +665,8 @@ fuse_lookup_resume (fuse_state_t *state) } static void -fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lookup (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -693,7 +694,8 @@ do_forget(xlator_t *this, uint64_t unique, uint64_t nodeid, uint64_t nlookup) } static void -fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_forget_in *ffi = msg; @@ -714,7 +716,8 @@ fuse_forget (xlator_t *this, fuse_in_header_t *finh, void *msg) #if FUSE_KERNEL_MINOR_VERSION >= 16 static void -fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_batch_forget(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_batch_forget_in *fbfi = msg; struct fuse_forget_one *ffo = (struct fuse_forget_one *) (fbfi + 1); @@ -932,7 +935,8 @@ fuse_getattr_resume (fuse_state_t *state) } static void -fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { #if FUSE_KERNEL_MINOR_VERSION >= 9 struct fuse_getattr_in *fgi = msg; @@ -1265,7 +1269,8 @@ fuse_setattr_resume (fuse_state_t *state) } static void -fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_setattr_in *fsi = msg; @@ -1492,7 +1497,8 @@ fuse_access_resume (fuse_state_t *state) } static void -fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_access (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_access_in *fai = msg; fuse_state_t *state = NULL; @@ -1566,7 +1572,8 @@ fuse_readlink_resume (fuse_state_t *state) } static void -fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_state_t *state = NULL; @@ -1616,7 +1623,8 @@ fuse_mknod_resume (fuse_state_t *state) } static void -fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mknod (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_mknod_in *fmi = msg; char *name = (char *)(fmi + 1); @@ -1686,7 +1694,8 @@ fuse_mkdir_resume (fuse_state_t *state) } static void -fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_mkdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_mkdir_in *fmi = msg; char *name = (char *)(fmi + 1); @@ -1738,7 +1747,8 @@ fuse_unlink_resume (fuse_state_t *state) } static void -fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_unlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -1775,7 +1785,8 @@ fuse_rmdir_resume (fuse_state_t *state) } static void -fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rmdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; fuse_state_t *state = NULL; @@ -1825,7 +1836,8 @@ fuse_symlink_resume (fuse_state_t *state) } static void -fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_symlink (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; char *linkname = name + strlen (name) + 1; @@ -1947,7 +1959,8 @@ fuse_rename_resume (fuse_state_t *state) } static void -fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_rename (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_rename_in *fri = msg; char *oldname = (char *)(fri + 1); @@ -1997,7 +2010,8 @@ fuse_link_resume (fuse_state_t *state) } static void -fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_link (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_link_in *fli = msg; char *name = (char *)(fli + 1); @@ -2186,7 +2200,8 @@ fuse_create_resume (fuse_state_t *state) } static void -fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_create (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { #if FUSE_KERNEL_MINOR_VERSION >= 12 struct fuse_create_in *fci = msg; @@ -2280,7 +2295,8 @@ fuse_open_resume (fuse_state_t *state) } static void -fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_open (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_open_in *foi = msg; fuse_state_t *state = NULL; @@ -2357,7 +2373,8 @@ fuse_readv_resume (fuse_state_t *state) } static void -fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readv (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -2433,8 +2450,6 @@ void fuse_write_resume (fuse_state_t *state) { struct iobref *iobref = NULL; - struct iobuf *iobuf = NULL; - iobref = iobref_new (); if (!iobref) { @@ -2447,8 +2462,7 @@ fuse_write_resume (fuse_state_t *state) return; } - iobuf = ((fuse_private_t *) (state->this->private))->iobuf; - iobref_add (iobref, iobuf); + iobref_add (iobref, state->iobuf); gf_log ("glusterfs-fuse", GF_LOG_TRACE, "%"PRIu64": WRITE (%p, size=%"GF_PRI_SIZET", offset=%"PRId64")", @@ -2462,7 +2476,8 @@ fuse_write_resume (fuse_state_t *state) } static void -fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { /* WRITE is special, metadata is attached to in_header, * and msg is the payload as-is. @@ -2505,6 +2520,7 @@ fuse_write (xlator_t *this, fuse_in_header_t *finh, void *msg) state->vector.iov_base = msg; state->vector.iov_len = fwi->size; + state->iobuf = iobuf; fuse_resolve_and_resume (state, fuse_write_resume); @@ -2543,7 +2559,8 @@ fuse_lseek_resume (fuse_state_t *state) } static void -fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_lseek (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lseek_in *ffi = msg; fuse_state_t *state = NULL; @@ -2579,7 +2596,8 @@ fuse_flush_resume (fuse_state_t *state) } static void -fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_flush (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_flush_in *ffi = msg; @@ -2615,7 +2633,8 @@ fuse_internal_release (xlator_t *this, fd_t *fd) } static void -fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_release (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_release_in *fri = msg; fd_t *fd = NULL; @@ -2660,7 +2679,8 @@ fuse_fsync_resume (fuse_state_t *state) } static void -fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsync (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fsync_in *fsi = msg; @@ -2735,7 +2755,8 @@ fuse_opendir_resume (fuse_state_t *state) } static void -fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_opendir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { /* struct fuse_open_in *foi = msg; @@ -2877,7 +2898,8 @@ fuse_readdir_resume (fuse_state_t *state) } static void -fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -3028,7 +3050,8 @@ fuse_readdirp_resume (fuse_state_t *state) static void -fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_readdirp (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_read_in *fri = msg; @@ -3075,7 +3098,8 @@ fuse_fallocate_resume(fuse_state_t *state) } static void -fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fallocate_in *ffi = msg; fuse_state_t *state = NULL; @@ -3093,7 +3117,8 @@ fuse_fallocate(xlator_t *this, fuse_in_header_t *finh, void *msg) #endif /* FUSE minor version >= 19 */ static void -fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_releasedir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_release_in *fri = msg; fuse_state_t *state = NULL; @@ -3134,7 +3159,8 @@ fuse_fsyncdir_resume (fuse_state_t *state) } static void -fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_fsyncdir (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_fsync_in *fsi = msg; @@ -3221,7 +3247,8 @@ fuse_statfs_resume (fuse_state_t *state) static void -fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_statfs (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_state_t *state = NULL; @@ -3273,7 +3300,8 @@ fuse_setxattr_resume (fuse_state_t *state) static void -fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_setxattr_in *fsi = msg; char *name = (char *)(fsi + 1); @@ -3604,7 +3632,8 @@ fuse_getxattr_resume (fuse_state_t *state) static void -fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_getxattr_in *fgxi = msg; char *name = (char *)(fgxi + 1); @@ -3710,7 +3739,8 @@ fuse_listxattr_resume (fuse_state_t *state) static void -fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_listxattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_getxattr_in *fgxi = msg; fuse_state_t *state = NULL; @@ -3766,7 +3796,8 @@ fuse_removexattr_resume (fuse_state_t *state) static void -fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_removexattr (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { char *name = msg; @@ -3865,7 +3896,8 @@ fuse_getlk_resume (fuse_state_t *state) static void -fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_getlk (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lk_in *fli = msg; @@ -3957,7 +3989,8 @@ fuse_setlk_resume (fuse_state_t *state) static void -fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_setlk (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_lk_in *fli = msg; @@ -4056,7 +4089,8 @@ notify_kernel_loop (void *data) #endif static void -fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { struct fuse_init_in *fini = msg; struct fuse_init_out fino = {0,}; @@ -4227,7 +4261,8 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg) static void -fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { send_fuse_err (this, finh, ENOSYS); @@ -4236,7 +4271,8 @@ fuse_enosys (xlator_t *this, fuse_in_header_t *finh, void *msg) static void -fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_destroy (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { send_fuse_err (this, finh, 0); @@ -4826,6 +4862,7 @@ fuse_graph_sync (xlator_t *this) new_graph_id = priv->next_graph->id; priv->next_graph = NULL; need_first_lookup = 1; + priv->handle_graph_switch = _gf_true; while (!priv->event_recvd) { ret = pthread_cond_wait (&priv->sync_cond, @@ -4854,6 +4891,8 @@ unlock: { old_subvol->switched = 1; winds_on_old_subvol = old_subvol->winds; + priv->handle_graph_switch = _gf_false; + pthread_cond_broadcast (&priv->migrate_cond); } pthread_mutex_unlock (&priv->sync_mutex); @@ -4861,6 +4900,13 @@ unlock: xlator_notify (old_subvol, GF_EVENT_PARENT_DOWN, old_subvol, NULL); } + } else { + pthread_mutex_lock (&priv->sync_mutex); + { + priv->handle_graph_switch = _gf_false; + pthread_cond_broadcast (&priv->migrate_cond); + } + pthread_mutex_unlock (&priv->sync_mutex); } return 0; @@ -4897,7 +4943,6 @@ fuse_thread_proc (void *data) const size_t msg0_size = sizeof (*finh) + 128; fuse_handler_t **fuse_ops = NULL; struct pollfd pfd[2] = {{0,}}; - gf_boolean_t mount_finished = _gf_false; this = data; priv = this->private; @@ -4914,32 +4959,40 @@ fuse_thread_proc (void *data) /* THIS has to be reset here */ THIS = this; - if (!mount_finished) { - memset(pfd,0,sizeof(pfd)); - pfd[0].fd = priv->status_pipe[0]; - pfd[0].events = POLLIN | POLLHUP | POLLERR; - pfd[1].fd = priv->fd; - pfd[1].events = POLLIN | POLLHUP | POLLERR; - if (poll(pfd,2,-1) < 0) { - gf_log (this->name, GF_LOG_ERROR, - "poll error %s", strerror(errno)); - break; - } - if (pfd[0].revents & POLLIN) { - if (fuse_get_mount_status(this) != 0) { + pthread_mutex_lock (&priv->sync_mutex); + { + if (!priv->mount_finished) { + memset(pfd, 0, sizeof(pfd)); + pfd[0].fd = priv->status_pipe[0]; + pfd[0].events = POLLIN | POLLHUP | POLLERR; + pfd[1].fd = priv->fd; + pfd[1].events = POLLIN | POLLHUP | POLLERR; + if (poll(pfd, 2, -1) < 0) { + gf_log (this->name, GF_LOG_ERROR, + "poll error %s", + strerror(errno)); + pthread_mutex_unlock (&priv->sync_mutex); break; } - mount_finished = _gf_true; - } - else if (pfd[0].revents) { - gf_log (this->name, GF_LOG_ERROR, - "mount pipe closed without status"); - break; - } - if (!pfd[1].revents) { - continue; + if (pfd[0].revents & POLLIN) { + if (fuse_get_mount_status(this) != 0) { + pthread_mutex_unlock (&priv->sync_mutex); + break; + } + priv->mount_finished = _gf_true; + } else if (pfd[0].revents) { + gf_log (this->name, GF_LOG_ERROR, + "mount pipe closed without status"); + pthread_mutex_unlock (&priv->sync_mutex); + break; + } + if (!pfd[1].revents) { + pthread_mutex_unlock (&priv->sync_mutex); + continue; + } } } + pthread_mutex_unlock (&priv->sync_mutex); /* * We don't want to block on readv while we're still waiting @@ -5034,8 +5087,6 @@ fuse_thread_proc (void *data) break; } - priv->iobuf = iobuf; - /* * This can be moved around a bit, but it's important to do it * *after* the readv. Otherwise, a graph switch could occur @@ -5078,9 +5129,9 @@ fuse_thread_proc (void *data) if (finh->opcode >= FUSE_OP_HIGH) /* turn down MacFUSE specific messages */ - fuse_enosys (this, finh, msg); + fuse_enosys (this, finh, msg, NULL); else - fuse_ops[finh->opcode] (this, finh, msg); + fuse_ops[finh->opcode] (this, finh, msg, iobuf); iobuf_unref (iobuf); continue; @@ -5152,8 +5203,6 @@ fuse_priv_dump (xlator_t *this) private->volfile_size); gf_proc_dump_write("mount_point", "%s", private->mount_point); - gf_proc_dump_write("iobuf", "%p", - private->iobuf); gf_proc_dump_write("fuse_thread_started", "%d", (int)private->fuse_thread_started); gf_proc_dump_write("direct_io_mode", "%d", @@ -5279,6 +5328,7 @@ unlock: int notify (xlator_t *this, int32_t event, void *data, ...) { + int i = 0; int32_t ret = 0; fuse_private_t *private = NULL; gf_boolean_t start_thread = _gf_false; @@ -5327,14 +5377,21 @@ notify (xlator_t *this, int32_t event, void *data, ...) pthread_mutex_unlock (&private->sync_mutex); if (start_thread) { - ret = gf_thread_create (&private->fuse_thread, NULL, - fuse_thread_proc, this, - "fuseproc"); - if (ret != 0) { - gf_log (this->name, GF_LOG_DEBUG, - "pthread_create() failed (%s)", - strerror (errno)); - break; + private->fuse_thread = GF_CALLOC (private->reader_thread_count, + sizeof (pthread_t), + gf_fuse_mt_pthread_t); + for (i = 0; i < private->reader_thread_count; i++) { + + ret = gf_thread_create (&private->fuse_thread[i], + NULL, + fuse_thread_proc, this, + "fuseproc"); + if (ret != 0) { + gf_log (this->name, GF_LOG_DEBUG, + "pthread_create() failed (%s)", + strerror (errno)); + break; + } } } @@ -5441,7 +5498,8 @@ static fuse_handler_t *fuse_dump_ops[FUSE_OP_HIGH]; static void -fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) +fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) { fuse_private_t *priv = NULL; struct iovec diov[6] = {{0,},}; @@ -5473,7 +5531,7 @@ fuse_dumper (xlator_t *this, fuse_in_header_t *finh, void *msg) "failed to dump fuse message (R): %s", strerror (errno)); - priv->fuse_ops0[finh->opcode] (this, finh, msg); + priv->fuse_ops0[finh->opcode] (this, finh, msg, NULL); } @@ -5578,6 +5636,9 @@ init (xlator_t *this_xl) GF_OPTION_INIT (ZR_ATTR_TIMEOUT_OPT, priv->attribute_timeout, double, cleanup_exit); + GF_OPTION_INIT ("reader-thread-count", priv->reader_thread_count, uint32, + cleanup_exit); + GF_OPTION_INIT (ZR_ENTRY_TIMEOUT_OPT, priv->entry_timeout, double, cleanup_exit); @@ -5793,6 +5854,7 @@ init (xlator_t *this_xl) pthread_mutex_init (&priv->fuse_dump_mutex, NULL); pthread_cond_init (&priv->sync_cond, NULL); + pthread_cond_init (&priv->migrate_cond, NULL); pthread_mutex_init (&priv->sync_mutex, NULL); priv->event_recvd = 0; @@ -5992,5 +6054,12 @@ struct volume_options options[] = { .description = "This option can be used to enable or disable fuse " "event history.", }, + { .key = {"reader-thread-count"}, + .type = GF_OPTION_TYPE_INT, + .default_value = "1", + .min = 1, + .max = 64, + .description = "Sets fuse reader thread count.", + }, { .key = {NULL} }, }; diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h index 2dfef64..4ca76e9 100644 --- a/xlators/mount/fuse/src/fuse-bridge.h +++ b/xlators/mount/fuse/src/fuse-bridge.h @@ -52,7 +52,7 @@ typedef struct fuse_in_header fuse_in_header_t; typedef void (fuse_handler_t) (xlator_t *this, fuse_in_header_t *finh, - void *msg); + void *msg, struct iobuf *iobuf); struct fuse_private { int fd; @@ -62,7 +62,8 @@ struct fuse_private { char *mount_point; struct iobuf *iobuf; - pthread_t fuse_thread; + pthread_t *fuse_thread; + uint32_t reader_thread_count; char fuse_thread_started; uint32_t direct_io_mode; @@ -140,6 +141,9 @@ struct fuse_private { /* whether to run the unmount daemon */ gf_boolean_t auto_unmount; + gf_boolean_t mount_finished; + gf_boolean_t handle_graph_switch; + pthread_cond_t migrate_cond; }; typedef struct fuse_private fuse_private_t; @@ -391,6 +395,7 @@ typedef struct { int32_t fd_no; gf_seek_what_t whence; + struct iobuf *iobuf; } fuse_state_t; typedef struct { diff --git a/xlators/mount/fuse/src/fuse-helpers.c b/xlators/mount/fuse/src/fuse-helpers.c index c59ff77..c2d4d0c 100644 --- a/xlators/mount/fuse/src/fuse-helpers.c +++ b/xlators/mount/fuse/src/fuse-helpers.c @@ -123,6 +123,9 @@ get_fuse_state (xlator_t *this, fuse_in_header_t *finh) pthread_mutex_lock (&priv->sync_mutex); { + while (priv->handle_graph_switch) + pthread_cond_wait (&priv->migrate_cond, + &priv->sync_mutex); active_subvol = fuse_active_subvol (state->this); active_subvol->winds++; } diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h index 2b4b473..721b9a3 100644 --- a/xlators/mount/fuse/src/fuse-mem-types.h +++ b/xlators/mount/fuse/src/fuse-mem-types.h @@ -23,6 +23,7 @@ enum gf_fuse_mem_types_ { gf_fuse_mt_graph_switch_args_t, gf_fuse_mt_gids_t, gf_fuse_mt_invalidate_node_t, + gf_fuse_mt_pthread_t, gf_fuse_mt_end }; #endif diff --git a/xlators/mount/fuse/utils/mount.glusterfs.in b/xlators/mount/fuse/utils/mount.glusterfs.in index b39bb98..817619e 100755 --- a/xlators/mount/fuse/utils/mount.glusterfs.in +++ b/xlators/mount/fuse/utils/mount.glusterfs.in @@ -221,6 +221,10 @@ start_glusterfs () cmd_line=$(echo "$cmd_line --event-history=$event_history"); fi + if [ -n "$reader_thread_count" ]; then + cmd_line=$(echo "$cmd_line --reader-thread-count=$reader_thread_count"); + fi + if [ -n "$volume_name" ]; then cmd_line=$(echo "$cmd_line --volume-name=$volume_name"); fi @@ -496,6 +500,9 @@ with_options() "event-history") event_history=$value ;; + "reader-thread-count") + reader_thread_count=$value + ;; "no-root-squash") if [ $value = "yes" ] || [ $value = "on" ] || -- 1.8.3.1