From 77716a11910ca2b88f37ff549776f7778cc17dae Mon Sep 17 00:00:00 2001 From: Csaba Henk Date: Thu, 9 Aug 2018 11:46:33 +0200 Subject: [PATCH 526/529] fuse: interrupt handling framework - add sub-framework to send timed responses to kernel - add interrupt handler queue - implement INTERRUPT fuse_interrupt looks up handlers for interrupted messages in the queue. If found, it invokes the handler function. Else responds with EAGAIN with a delay. See spec at https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.17#n148 and explanation in comments. Upstream: https://review.gluster.org/20686 > Change-Id: I1a79d3679b31f36e14b4ac8f60b7f2c1ea2badfb > updates: #465 > Signed-off-by: Csaba Henk Change-Id: Idff76920aaa9f87b185dabb0b431a31fcd2a2c77 BUG: 1595246 Signed-off-by: Csaba Henk Reviewed-on: https://code.engineering.redhat.com/gerrit/162549 Tested-by: RHGS Build Bot Reviewed-by: Amar Tumballi Suryanarayan Reviewed-by: Sunil Kumar Heggodu Gopala Acharya --- libglusterfs/src/timespec.c | 16 ++ libglusterfs/src/timespec.h | 1 + xlators/mount/fuse/src/fuse-bridge.c | 464 +++++++++++++++++++++++++++++++- xlators/mount/fuse/src/fuse-bridge.h | 39 +++ xlators/mount/fuse/src/fuse-mem-types.h | 2 + 5 files changed, 521 insertions(+), 1 deletion(-) diff --git a/libglusterfs/src/timespec.c b/libglusterfs/src/timespec.c index 903303d..55f7575 100644 --- a/libglusterfs/src/timespec.c +++ b/libglusterfs/src/timespec.c @@ -72,3 +72,19 @@ void timespec_sub (const struct timespec *begin, const struct timespec *end, res->tv_nsec = end->tv_nsec - begin->tv_nsec; } } + +int +timespec_cmp(const struct timespec *lhs_ts, const struct timespec *rhs_ts) +{ + if (lhs_ts->tv_sec < rhs_ts->tv_sec) { + return -1; + } else if (lhs_ts->tv_sec > rhs_ts->tv_sec) { + return 1; + } else if (lhs_ts->tv_nsec < rhs_ts->tv_nsec) { + return -1; + } else if (lhs_ts->tv_nsec > rhs_ts->tv_nsec) { + return 1; + } + + return 0; +} diff --git a/libglusterfs/src/timespec.h b/libglusterfs/src/timespec.h index 9c393ee..aa37951 100644 --- a/libglusterfs/src/timespec.h +++ b/libglusterfs/src/timespec.h @@ -23,5 +23,6 @@ void timespec_adjust_delta (struct timespec *ts, struct timespec delta); void timespec_sub (const struct timespec *begin, const struct timespec *end, struct timespec *res); +int timespec_cmp(const struct timespec *lhs_ts, const struct timespec *rhs_ts); #endif /* __INCLUDE_TIMESPEC_H__ */ diff --git a/xlators/mount/fuse/src/fuse-bridge.c b/xlators/mount/fuse/src/fuse-bridge.c index f3188d6..0d4b9db 100644 --- a/xlators/mount/fuse/src/fuse-bridge.c +++ b/xlators/mount/fuse/src/fuse-bridge.c @@ -15,6 +15,7 @@ #include "compat-errno.h" #include "glusterfs-acl.h" #include "syscall.h" +#include "timespec.h" #ifdef __NetBSD__ #undef open /* in perfuse.h, pulled from mount-gluster-compat.h */ @@ -426,6 +427,361 @@ fuse_inode_invalidate_fn(xlator_t *this, inode_t *inode) } #endif +static fuse_timed_message_t * +fuse_timed_message_new (void) +{ + fuse_timed_message_t *dmsg = NULL; + + dmsg = GF_MALLOC (sizeof (*dmsg), gf_fuse_mt_timed_message_t); + if (!dmsg) { + return NULL; + } + + /* should be NULL if not set */ + dmsg->fuse_message_body = NULL; + INIT_LIST_HEAD (&dmsg->next); + + return dmsg; +} + +static void +fuse_timed_message_free (fuse_timed_message_t *dmsg) +{ + GF_FREE (dmsg->fuse_message_body); + GF_FREE (dmsg); +} + +static void +send_fuse_timed (xlator_t *this, fuse_timed_message_t *dmsg) +{ + fuse_private_t *priv = NULL; + + priv = this->private; + + if (!priv->timed_response_fuse_thread_started) { + return; + } + + pthread_mutex_lock (&priv->timed_mutex); + { + list_add_tail (&dmsg->next, &priv->timed_list); + pthread_cond_signal (&priv->timed_cond); + } + pthread_mutex_unlock (&priv->timed_mutex); +} + +fuse_interrupt_record_t * +fuse_interrupt_record_new (fuse_in_header_t *finh, + fuse_interrupt_handler_t handler) +{ + fuse_interrupt_record_t *fir = NULL; + + fir = GF_MALLOC (sizeof (*fir), gf_fuse_mt_interrupt_record_t); + if (!fir) { + return NULL; + } + + fir->hit = _gf_false; + fir->interrupt_state = INTERRUPT_NONE; + fir->data = NULL; + + fir->interrupt_handler = handler; + memcpy (&fir->fuse_in_header, finh, sizeof (*finh)); + pthread_cond_init (&fir->handler_cond, NULL); + pthread_mutex_init (&fir->handler_mutex, NULL); + INIT_LIST_HEAD (&fir->next); + + return fir; +} + +static void +fuse_interrupt_record_free (fuse_interrupt_record_t *fir, void **datap) +{ + /* + * If caller wishes, we give back the private data to let them deal with it + * however they want; otherwise we take care of freeing it. + */ + if (datap) { + *datap = fir->data; + } else { + GF_FREE (fir->data); + } + + GF_FREE (fir); +} + +void +fuse_interrupt_record_insert (xlator_t *this, fuse_interrupt_record_t *fir) +{ + fuse_private_t *priv = NULL; + + priv = this->private; + pthread_mutex_lock (&priv->interrupt_mutex); + { + list_add_tail (&fir->next, &priv->interrupt_list); + } + pthread_mutex_unlock (&priv->interrupt_mutex); +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_fetch (xlator_t *this, uint64_t unique, gf_boolean_t reap) +{ + fuse_interrupt_record_t *fir = NULL; + gf_boolean_t found = _gf_false; + fuse_private_t *priv = NULL; + + priv = this->private; + pthread_mutex_lock (&priv->interrupt_mutex); + { + list_for_each_entry (fir, &priv->interrupt_list, next) + { + if (fir->fuse_in_header.unique == unique) { + /* + * If we are to reap, we do it regardless the + * hit flag; otherwise we take the record only + * hasn't yet flagged hit. + */ + if (reap || !fir->hit) { + found = _gf_true; + } + /* + * If we are not reaping (coming from handler + * context), we set the hit flag. + */ + if (!reap) { + fir->hit = _gf_true; + } + break; + } + } + if (found && reap) { + list_del (&fir->next); + } + } + pthread_mutex_unlock (&priv->interrupt_mutex); + + if (found) { + return fir; + } + return NULL; +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_get (xlator_t *this, uint64_t unique) +{ + return fuse_interrupt_record_fetch (this, unique, _gf_false); +} + +static fuse_interrupt_record_t * +fuse_interrupt_record_reap (xlator_t *this, uint64_t unique) +{ + return fuse_interrupt_record_fetch (this, unique, _gf_true); +} + +static void +fuse_interrupt (xlator_t *this, fuse_in_header_t *finh, void *msg, + struct iobuf *iobuf) +{ + struct fuse_interrupt_in *fii = msg; + fuse_interrupt_record_t *fir = NULL; + + gf_log ("glusterfs-fuse", GF_LOG_TRACE, + "unique %" PRIu64 " INTERRUPT for %" PRIu64, finh->unique, + fii->unique); + + fir = fuse_interrupt_record_get (this, fii->unique); + if (fir) { + gf_log ("glusterfs-fuse", GF_LOG_DEBUG, + "unique %" PRIu64 " INTERRUPT for %" PRIu64 + ": handler triggered", + finh->unique, fii->unique); + + fir->interrupt_handler (this, fir); + } else { + fuse_timed_message_t *dmsg = NULL; + + /* + * No record found for this interrupt request. + * + * It's either because the handler for the interrupted message + * does not want to handle interrupt, or this interrupt + * message beat the interrupted which hasn't yet added a record + * to the interrupt queue. Either case we reply with error + * EAGAIN with some (0.01 sec) delay. That will have this + * interrupt request resent, unless the interrupted message + * has been already answered. + * + * So effectively we are looping in between kernel and + * userspace, which will be exited either when the interrupted + * message handler has added an interrupt record, or has + * replied to kernel. See + * + * https://git.kernel.org/pub/scm/linux/kernel/git/torvalds/ + * linux.git/tree/Documentation/filesystems/fuse.txt?h=v4.18#n148 + */ + + gf_log ("glusterfs-fuse", GF_LOG_DEBUG, + "unique %" PRIu64 " INTERRUPT for %" PRIu64 ": no handler found", + finh->unique, fii->unique); + + dmsg = fuse_timed_message_new (); + if (!dmsg) { + gf_log ("glusterfs-fuse", GF_LOG_ERROR, + "unique %" PRIu64 " INTERRUPT for %" PRIu64 + ":" + " failed to allocate timed message", + finh->unique, fii->unique); + + return; + } + + dmsg->fuse_out_header.unique = finh->unique; + dmsg->fuse_out_header.len = sizeof (dmsg->fuse_out_header); + dmsg->fuse_out_header.error = -EAGAIN; + timespec_now (&dmsg->scheduled_ts); + timespec_adjust_delta (&dmsg->scheduled_ts, + (struct timespec){0, 10000000}); + + send_fuse_timed (this, dmsg); + } +} + +/* + * Function to be called in fop cbk context (if the fop engages + * with interrupt handling). + */ +gf_boolean_t +fuse_interrupt_finish_fop (call_frame_t *frame, xlator_t *this, + gf_boolean_t sync, void **datap) +{ + fuse_interrupt_record_t *fir = NULL; + fuse_state_t *state = frame->root->state; + fuse_in_header_t *finh = state->finh; + gf_boolean_t hit = _gf_false; + gf_boolean_t handled = _gf_false; + fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE; + + fir = fuse_interrupt_record_reap (this, finh->unique); + if (!fir) { + /* + * No interrupt record was inserted (however, caller would usually know + * about that and there is no point then in calling this function). + */ + return _gf_false; + } + + /* + * The interrupt handler (if finds the record) modifies fir->hit; however, + * that could have occurred only before fuse_interrupt_record_reap (), so + * we are safe here with a lock-free access. + */ + hit = fir->hit; + if (hit) { + pthread_mutex_lock (&fir->handler_mutex); + { + intstat_orig = fir->interrupt_state; + if (fir->interrupt_state == INTERRUPT_NONE) { + fir->interrupt_state = INTERRUPT_SQUELCHED; + if (sync) { + while (fir->interrupt_state == INTERRUPT_NONE) { + pthread_cond_wait (&fir->handler_cond, + &fir->handler_mutex); + } + } + } + } + pthread_mutex_unlock (&fir->handler_mutex); + } + + gf_log ("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig); + + /* + * From this on fir can only be referred under the conditions that imply + * we are to free it (otherwise interrupt handler might have already freed + * it). + */ + + if (/* there was no interrupt */ + !hit || + /* lost the race against interrupt handler */ + intstat_orig != INTERRUPT_NONE || + /* we took cleaning up on us */ + sync) { + /* cleaning up */ + fuse_interrupt_record_free (fir, datap); + } else if (datap) { + *datap = NULL; + } + + handled = (intstat_orig == INTERRUPT_HANDLED); + if (handled) { + /* + * Fuse request was answered already from interrupt context, we can do + * away with the stack. + */ + free_fuse_state (state); + STACK_DESTROY (frame->root); + } + + /* + * Let caller know if they have to answer the fuse request. + */ + return handled; +} + +/* + * Function to be called in interrupt handler context. + */ +void +fuse_interrupt_finish_interrupt (xlator_t *this, fuse_interrupt_record_t *fir, + fuse_interrupt_state_t intstat, + gf_boolean_t sync, void **datap) +{ + fuse_in_header_t finh = { + 0, + }; + fuse_interrupt_state_t intstat_orig = INTERRUPT_NONE; + + pthread_mutex_lock (&fir->handler_mutex); + { + intstat_orig = fir->interrupt_state; + if (fir->interrupt_state == INTERRUPT_NONE) { + fir->interrupt_state = intstat; + if (sync) { + pthread_cond_signal (&fir->handler_cond); + } + } + finh = fir->fuse_in_header; + } + pthread_mutex_unlock (&fir->handler_mutex); + + gf_log ("glusterfs-fuse", GF_LOG_DEBUG, "intstat_orig=%d", intstat_orig); + + /* + * From this on fir can only be referred under the conditions that imply + * we are to free it (otherwise fop handler might have already freed it). + */ + + if (/* we won the race, response is up to us */ + intstat_orig == INTERRUPT_NONE && + /* interrupt handling was successful, let the kernel know */ + intstat == INTERRUPT_HANDLED) { + send_fuse_err (this, &finh, EINTR); + } + + if (/* lost the race ... */ + intstat_orig != INTERRUPT_NONE && + /* + * ... and there is no contract with fop handler that it does the + * cleanup ... + */ + !sync) { + /* ... so we do! */ + fuse_interrupt_record_free (fir, datap); + } else if (datap) { + *datap = NULL; + } +} int send_fuse_err (xlator_t *this, fuse_in_header_t *finh, int error) @@ -4100,6 +4456,89 @@ notify_kernel_loop (void *data) } #endif +static void * +timed_response_loop (void *data) +{ + ssize_t rv = 0; + size_t len = 0; + xlator_t *this = NULL; + fuse_private_t *priv = NULL; + fuse_timed_message_t *dmsg = NULL; + fuse_timed_message_t *tmp = NULL; + struct timespec now = {0,}; + struct timespec delta = {0,}; + struct iovec iovs[2] = {{0,},}; + fuse_in_header_t finh = {0,}; + + this = data; + priv = this->private; + + for (;;) { + pthread_mutex_lock (&priv->timed_mutex); + { + while (list_empty (&priv->timed_list)) { + pthread_cond_wait (&priv->timed_cond, &priv->timed_mutex); + } + + dmsg = list_entry (priv->timed_list.next, fuse_timed_message_t, + next); + list_for_each_entry (tmp, &priv->timed_list, next) + { + if (timespec_cmp (&tmp->scheduled_ts, &dmsg->scheduled_ts) < 0) { + dmsg = tmp; + } + } + + list_del_init (&dmsg->next); + } + pthread_mutex_unlock (&priv->timed_mutex); + + timespec_now (&now); + if (timespec_cmp (&now, &dmsg->scheduled_ts) < 0) { + timespec_sub (&now, &dmsg->scheduled_ts, &delta); + nanosleep (&delta, NULL); + } + + gf_log ("glusterfs-fuse", GF_LOG_TRACE, + "sending timed message of unique %"PRIu64, + dmsg->fuse_out_header.unique); + + len = dmsg->fuse_out_header.len; + iovs[0] = (struct iovec){&dmsg->fuse_out_header, + sizeof (struct fuse_out_header)}; + iovs[1] = (struct iovec){dmsg->fuse_message_body, + len - sizeof (struct fuse_out_header)}; + /* + * Nasty hack to allow us to use the send_fuse_iov API, + * which we resort to, as the API used in original upstream + * code used is not available in this codebase. + */ + finh.unique = dmsg->fuse_out_header.unique; + rv = send_fuse_iov (this, &finh, iovs, 2); + + fuse_timed_message_free (dmsg); + + if (rv == EBADF) { + break; + } + } + + gf_log ("glusterfs-fuse", GF_LOG_ERROR, "timed response loop terminated"); + + pthread_mutex_lock (&priv->timed_mutex); + { + priv->timed_response_fuse_thread_started = _gf_false; + list_for_each_entry_safe (dmsg, tmp, &priv->timed_list, next) + { + list_del_init (&dmsg->next); + fuse_timed_message_free (dmsg); + } + } + pthread_mutex_unlock (&priv->timed_mutex); + + return NULL; +} + static void fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, struct iobuf *iobuf) @@ -4112,6 +4551,7 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, #if FUSE_KERNEL_MINOR_VERSION >= 9 pthread_t messenger; #endif + pthread_t delayer; priv = this->private; @@ -4160,6 +4600,19 @@ fuse_init (xlator_t *this, fuse_in_header_t *finh, void *msg, fino.flags |= FUSE_BIG_WRITES; } + /* Start the thread processing timed responses */ + ret = gf_thread_create (&delayer, NULL, timed_response_loop, this, + "fusedlyd"); + if (ret != 0) { + gf_log ("glusterfs-fuse", GF_LOG_ERROR, + "failed to start timed response thread (%s)", + strerror (errno)); + + sys_close (priv->fd); + goto out; + } + priv->timed_response_fuse_thread_started = _gf_true; + /* Used for 'reverse invalidation of inode' */ if (fini->minor >= 12) { ret = gf_thread_create (&messenger, NULL, notify_kernel_loop, @@ -5229,6 +5682,8 @@ fuse_priv_dump (xlator_t *this) (int)private->init_recvd); gf_proc_dump_write("strict_volfile_check", "%d", (int)private->strict_volfile_check); + gf_proc_dump_write("timed_response_thread_started", "%d", + (int)private->timed_response_fuse_thread_started); gf_proc_dump_write("reverse_thread_started", "%d", (int)private->reverse_fuse_thread_started); gf_proc_dump_write("use_readdirp", "%d", private->use_readdirp); @@ -5486,7 +5941,7 @@ static fuse_handler_t *fuse_std_ops[FUSE_OP_HIGH] = { [FUSE_SETLKW] = fuse_setlk, [FUSE_ACCESS] = fuse_access, [FUSE_CREATE] = fuse_create, - /* [FUSE_INTERRUPT] */ + [FUSE_INTERRUPT] = fuse_interrupt, /* [FUSE_BMAP] */ [FUSE_DESTROY] = fuse_destroy, /* [FUSE_IOCTL] */ @@ -5611,6 +6066,13 @@ init (xlator_t *this_xl) pthread_cond_init (&priv->invalidate_cond, NULL); pthread_mutex_init (&priv->invalidate_mutex, NULL); + INIT_LIST_HEAD (&priv->timed_list); + pthread_cond_init (&priv->timed_cond, NULL); + pthread_mutex_init (&priv->timed_mutex, NULL); + + INIT_LIST_HEAD (&priv->interrupt_list); + pthread_mutex_init (&priv->interrupt_mutex, NULL); + /* get options from option dictionary */ ret = dict_get_str (options, ZR_MOUNTPOINT_OPT, &value_string); if (ret == -1 || value_string == NULL) { diff --git a/xlators/mount/fuse/src/fuse-bridge.h b/xlators/mount/fuse/src/fuse-bridge.h index 4e32a7f..ba3e000 100644 --- a/xlators/mount/fuse/src/fuse-bridge.h +++ b/xlators/mount/fuse/src/fuse-bridge.h @@ -147,6 +147,16 @@ struct fuse_private { /* LRU Limit, if not set, default is 128k for now */ uint32_t lru_limit; + + /* Delayed fuse response */ + struct list_head timed_list; + pthread_cond_t timed_cond; + pthread_mutex_t timed_mutex; + gf_boolean_t timed_response_fuse_thread_started; + + /* Interrupt subscription */ + struct list_head interrupt_list; + pthread_mutex_t interrupt_mutex; }; typedef struct fuse_private fuse_private_t; @@ -162,6 +172,35 @@ struct fuse_invalidate_node { }; typedef struct fuse_invalidate_node fuse_invalidate_node_t; +struct fuse_timed_message { + struct fuse_out_header fuse_out_header; + void *fuse_message_body; + struct timespec scheduled_ts; + struct list_head next; +}; +typedef struct fuse_timed_message fuse_timed_message_t; + +enum fuse_interrupt_state { + INTERRUPT_NONE, + INTERRUPT_SQUELCHED, + INTERRUPT_HANDLED, +}; +typedef enum fuse_interrupt_state fuse_interrupt_state_t; +struct fuse_interrupt_record; +typedef struct fuse_interrupt_record fuse_interrupt_record_t; +typedef void (*fuse_interrupt_handler_t) (xlator_t *this, + fuse_interrupt_record_t *); +struct fuse_interrupt_record { + struct fuse_in_header fuse_in_header; + void *data; + gf_boolean_t hit; + fuse_interrupt_state_t interrupt_state; + fuse_interrupt_handler_t interrupt_handler; + pthread_cond_t handler_cond; + pthread_mutex_t handler_mutex; + struct list_head next; +}; + struct fuse_graph_switch_args { xlator_t *this; xlator_t *old_subvol; diff --git a/xlators/mount/fuse/src/fuse-mem-types.h b/xlators/mount/fuse/src/fuse-mem-types.h index 721b9a3..4ded879 100644 --- a/xlators/mount/fuse/src/fuse-mem-types.h +++ b/xlators/mount/fuse/src/fuse-mem-types.h @@ -24,6 +24,8 @@ enum gf_fuse_mem_types_ { gf_fuse_mt_gids_t, gf_fuse_mt_invalidate_node_t, gf_fuse_mt_pthread_t, + gf_fuse_mt_timed_message_t, + gf_fuse_mt_interrupt_record_t, gf_fuse_mt_end }; #endif -- 1.8.3.1