12a457
From a3d7051a82e5547f8e8d0e5263f403968df13902 Mon Sep 17 00:00:00 2001
12a457
From: Pranith Kumar K <pkarampu@redhat.com>
12a457
Date: Thu, 17 Mar 2016 09:32:02 +0530
12a457
Subject: [PATCH 72/80] syncop: Add parallel dir scan functionality
12a457
12a457
Most of this functionality's ideas are contributed
12a457
by Richard Wareing, in his patch:
12a457
https://bugzilla.redhat.com/show_bug.cgi?id=1221737#c1
12a457
12a457
VERY BIG thanks to him :-).
12a457
12a457
After starting porting/testing the patch above, I found a few things we can
12a457
improve in this patch based on the results we got in testing.
12a457
1) We are reading all the indices before we launch self-heals. In some customer
12a457
cases I worked on there were almost 5million files/directories that needed
12a457
heal. With such a big number self-heal daemon will be OOM killed if we go
12a457
this route. So I modified this to launch heals based on a queue length
12a457
limit.
12a457
12a457
2) We found that for directory hierarchies, multi-threaded self-heal
12a457
patch was not giving better results compared to single-threaded
12a457
self-heal because of the order problems. We improved index xlator to
12a457
give gfid type to make sure that all directories in the indices are
12a457
healed before the files that follow in that iteration of readdir
12a457
output(http://review.gluster.org/13553). In our testing this lead to
12a457
zero errors of self-heals as we were only doing self-heals in parallel
12a457
for files and not directories. I think we can further improve self-heal
12a457
speed for directories by doing name heals in parallel based on similar
12a457
techniques Richard's patch showed. I think the best thing there would be to
12a457
introduce synccond_t infra (pthread_cond_t kind of infra for syncops)
12a457
which I am planning to implement for future releases.
12a457
12a457
3) Based on 1), 2) and the fact that afr already does retries of the
12a457
indices in a loop I removed retries again in the threads.
12a457
12a457
4) After the refactor, the changes required to bring in multi-threaded
12a457
self-heal for ec would just be ~10 lines, most of it will be about
12a457
options initialization.
12a457
12a457
Our tests found that we are able to easily saturate network :-).
12a457
12a457
High level description of the final feature:
12a457
Traditionally self-heal daemon reads the indices (gfids) that need to be healed
12a457
from the brick and initiates heal one gfid at a time. Goal of this feature is
12a457
to add parallelization to the way we do self-heals in a way we do not regress
12a457
in any case but increase parallelization wherever we can. As part of this following
12a457
knobs are introduced to improve parallelization:
12a457
1) We can launch 'max-jobs' number of heals in parallel.
12a457
2) We can keep reading indices as long as the wait-q for heals doesn't go over
12a457
   'max-qlen' passed as arguments to multi-threaded dir_scan.
12a457
12a457
As a first cut, we always do healing of directories in serial order one at a time
12a457
but for files we launch heals in parallel. In future we can do name-heals of dir
12a457
in parallel, but this is not implemented as of now. Reason for this is mentioned
12a457
already in '2)' above.
12a457
12a457
AFR/EC can introduce options like max-shd-threads/wait-qlength which can be set
12a457
by users to increase the rate of heals when they want. Please note that the
12a457
options will take effect only for the next crawl.
12a457
12a457
 >BUG: 1221737
12a457
 >Change-Id: I8fc0afc334def87797f6d41e309cefc722a317d2
12a457
 >Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
12a457
 >Reviewed-on: http://review.gluster.org/13569
12a457
 >NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
12a457
 >CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
12a457
 >Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
12a457
 >Smoke: Gluster Build System <jenkins@build.gluster.com>
12a457
12a457
 >BUG: 1325857
12a457
 >Change-Id: I23235bbb923208eee6a8be711bbfb14350edb11b
12a457
 >Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
12a457
 >Reviewed-on: http://review.gluster.org/13967
12a457
 >Smoke: Gluster Build System <jenkins@build.gluster.com>
12a457
 >NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
12a457
 >CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
12a457
12a457
BUG: 1314724
12a457
Change-Id: I803f5397aa86b83d04d8c7a237a6535707106fd0
12a457
Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
12a457
Reviewed-on: https://code.engineering.redhat.com/gerrit/72370
12a457
---
12a457
 libglusterfs/src/mem-types.h    |    1 +
12a457
 libglusterfs/src/syncop-utils.c |  238 +++++++++++++++++++++++++++++++++++++++
12a457
 libglusterfs/src/syncop-utils.h |    7 +
12a457
 3 files changed, 246 insertions(+), 0 deletions(-)
12a457
12a457
diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
12a457
index 84949c6..dd96cc6 100644
12a457
--- a/libglusterfs/src/mem-types.h
12a457
+++ b/libglusterfs/src/mem-types.h
12a457
@@ -155,6 +155,7 @@ enum gf_common_mem_types_ {
12a457
         gf_common_mt_synctask,
12a457
         gf_common_mt_syncstack,
12a457
         gf_common_mt_syncenv,
12a457
+        gf_common_mt_scan_data,
12a457
         gf_common_mt_end
12a457
 };
12a457
 #endif
12a457
diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c
12a457
index 16ae1f7..5e6b9fa 100644
12a457
--- a/libglusterfs/src/syncop-utils.c
12a457
+++ b/libglusterfs/src/syncop-utils.c
12a457
@@ -14,9 +14,24 @@
12a457
 #endif
12a457
 
12a457
 #include "syncop.h"
12a457
+#include "syncop-utils.h"
12a457
 #include "common-utils.h"
12a457
 #include "libglusterfs-messages.h"
12a457
 
12a457
+struct syncop_dir_scan_data {
12a457
+        xlator_t *subvol;
12a457
+        loc_t *parent;
12a457
+        void *data;
12a457
+        gf_dirent_t *q;
12a457
+        gf_dirent_t *entry;
12a457
+        pthread_cond_t *cond;
12a457
+        pthread_mutex_t *mut;
12a457
+        syncop_dir_scan_fn_t fn;
12a457
+        uint32_t *jobs_running;
12a457
+        uint32_t *qlen;
12a457
+        int32_t  *retval;
12a457
+};
12a457
+
12a457
 int
12a457
 syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid)
12a457
 {
12a457
@@ -224,6 +239,229 @@ out:
12a457
         return ret;
12a457
 }
12a457
 
12a457
+static void
12a457
+_scan_data_destroy (struct syncop_dir_scan_data *data)
12a457
+{
12a457
+        GF_FREE (data);
12a457
+}
12a457
+
12a457
+static int
12a457
+_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque)
12a457
+{
12a457
+        struct syncop_dir_scan_data *scan_data = opaque;
12a457
+
12a457
+        _scan_data_destroy (scan_data);
12a457
+        return 0;
12a457
+}
12a457
+
12a457
+static int
12a457
+_dir_scan_job_fn (void *data)
12a457
+{
12a457
+        struct syncop_dir_scan_data *scan_data = data;
12a457
+        gf_dirent_t                 *entry     = NULL;
12a457
+        int                         ret        = 0;
12a457
+
12a457
+        entry = scan_data->entry;
12a457
+        scan_data->entry = NULL;
12a457
+        do {
12a457
+                ret = scan_data->fn (scan_data->subvol, entry,
12a457
+                                     scan_data->parent,
12a457
+                                     scan_data->data);
12a457
+                gf_dirent_entry_free (entry);
12a457
+                entry = NULL;
12a457
+                pthread_mutex_lock (scan_data->mut);
12a457
+                {
12a457
+                        if (ret || list_empty (&scan_data->q->list)) {
12a457
+                                (*scan_data->jobs_running)--;
12a457
+                                *scan_data->retval |= ret;
12a457
+                                pthread_cond_broadcast (scan_data->cond);
12a457
+                        } else {
12a457
+                                entry = list_first_entry (&scan_data->q->list,
12a457
+                                                  typeof (*scan_data->q), list);
12a457
+                                list_del_init (&entry->list);
12a457
+                                (*scan_data->qlen)--;
12a457
+                        }
12a457
+                }
12a457
+                pthread_mutex_unlock (scan_data->mut);
12a457
+        } while (entry);
12a457
+
12a457
+        return ret;
12a457
+}
12a457
+
12a457
+static int
12a457
+_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q,
12a457
+                    gf_dirent_t *entry, int *retval, pthread_mutex_t *mut,
12a457
+                    pthread_cond_t *cond, uint32_t *jobs_running,
12a457
+                    uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data)
12a457
+{
12a457
+        int     ret = 0;
12a457
+        struct syncop_dir_scan_data *scan_data = NULL;
12a457
+
12a457
+
12a457
+        scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data),
12a457
+                               gf_common_mt_scan_data);
12a457
+        if (!scan_data) {
12a457
+                ret = -ENOMEM;
12a457
+                goto out;
12a457
+        }
12a457
+
12a457
+        scan_data->subvol       = subvol;
12a457
+        scan_data->parent       = parent;
12a457
+        scan_data->data         = data;
12a457
+        scan_data->mut          = mut;
12a457
+        scan_data->cond         = cond;
12a457
+        scan_data->fn           = fn;
12a457
+        scan_data->jobs_running = jobs_running;
12a457
+        scan_data->entry        = entry;
12a457
+        scan_data->q            = q;
12a457
+        scan_data->qlen         = qlen;
12a457
+        scan_data->retval       = retval;
12a457
+
12a457
+        ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn,
12a457
+                            _dir_scan_job_fn_cbk, NULL, scan_data);
12a457
+out:
12a457
+        if (ret < 0) {
12a457
+                gf_dirent_entry_free (entry);
12a457
+                _scan_data_destroy (scan_data);
12a457
+                pthread_mutex_lock (mut);
12a457
+                {
12a457
+                        *jobs_running = *jobs_running - 1;
12a457
+                }
12a457
+                pthread_mutex_unlock (mut);
12a457
+                /*No need to cond-broadcast*/
12a457
+        }
12a457
+        return ret;
12a457
+}
12a457
+
12a457
+int
12a457
+syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
12a457
+                    syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs,
12a457
+                    uint32_t max_qlen)
12a457
+{
12a457
+        fd_t        *fd    = NULL;
12a457
+        uint64_t    offset = 0;
12a457
+        gf_dirent_t *last = NULL;
12a457
+        int         ret    = 0;
12a457
+        int         retval = 0;
12a457
+        gf_dirent_t q;
12a457
+        gf_dirent_t *entry = NULL;
12a457
+        gf_dirent_t *tmp = NULL;
12a457
+        uint32_t    jobs_running = 0;
12a457
+        uint32_t    qlen = 0;
12a457
+        pthread_cond_t cond;
12a457
+        pthread_mutex_t mut;
12a457
+        gf_boolean_t cond_init = _gf_false;
12a457
+        gf_boolean_t mut_init = _gf_false;
12a457
+        gf_dirent_t entries;
12a457
+
12a457
+        /*For this functionality to be implemented in general, we need
12a457
+         * synccond_t infra which doesn't block the executing thread. Until then
12a457
+         * return failures inside synctask if they use this.*/
12a457
+        if (synctask_get())
12a457
+                return -ENOTSUP;
12a457
+
12a457
+        if (max_jobs == 0)
12a457
+                return -EINVAL;
12a457
+
12a457
+        /*Code becomes simpler this way. cond_wait just on qlength.
12a457
+         * Little bit of cheating*/
12a457
+        if (max_qlen == 0)
12a457
+                max_qlen = 1;
12a457
+
12a457
+        ret = syncop_dirfd (subvol, loc, &fd, pid);
12a457
+        if (ret)
12a457
+                goto out;
12a457
+
12a457
+        INIT_LIST_HEAD (&entries.list);
12a457
+        INIT_LIST_HEAD (&q.list);
12a457
+        ret = pthread_mutex_init (&mut, NULL);
12a457
+        if (ret)
12a457
+                goto out;
12a457
+        mut_init = _gf_true;
12a457
+
12a457
+        ret = pthread_cond_init (&cond, NULL);
12a457
+        if (ret)
12a457
+                goto out;
12a457
+        cond_init = _gf_true;
12a457
+
12a457
+        while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries,
12a457
+                                      xdata, NULL))) {
12a457
+                if (ret < 0)
12a457
+                        break;
12a457
+
12a457
+                if (ret > 0) {
12a457
+                        /* If the entries are only '.', and '..' then ret
12a457
+                         * value will be non-zero. so set it to zero here. */
12a457
+                        ret = 0;
12a457
+                }
12a457
+
12a457
+                last = list_last_entry (&entries.list, typeof (*last), list);
12a457
+                offset = last->d_off;
12a457
+
12a457
+                list_for_each_entry_safe (entry, tmp, &entries.list, list) {
12a457
+                        list_del_init (&entry->list);
12a457
+                        if (!strcmp (entry->d_name, ".") ||
12a457
+                            !strcmp (entry->d_name, "..")) {
12a457
+                                gf_dirent_entry_free (entry);
12a457
+                                continue;
12a457
+                        }
12a457
+
12a457
+                        if (entry->d_type == IA_IFDIR) {
12a457
+                                ret = fn (subvol, entry, loc, data);
12a457
+                                gf_dirent_entry_free (entry);
12a457
+                                if (ret)
12a457
+                                        break;
12a457
+                                continue;
12a457
+                        }
12a457
+
12a457
+                        pthread_mutex_lock (&mut;;
12a457
+                        {
12a457
+                                while (qlen == max_qlen)
12a457
+                                        pthread_cond_wait (&cond, &mut;;
12a457
+                                if (max_jobs == jobs_running) {
12a457
+                                        list_add_tail (&entry->list, &q.list);
12a457
+                                        qlen++;
12a457
+                                        entry = NULL;
12a457
+                                } else {
12a457
+                                        jobs_running++;
12a457
+                                }
12a457
+                        }
12a457
+                        pthread_mutex_unlock (&mut;;
12a457
+                        if (retval) /*Any jobs failed?*/
12a457
+                                break;
12a457
+
12a457
+                        if (!entry)
12a457
+                                continue;
12a457
+
12a457
+                        ret = _run_dir_scan_task (subvol, loc, &q, entry,
12a457
+                                                  &retval, &mut, &cond,
12a457
+                                                &jobs_running, &qlen, fn, data);
12a457
+                        if (ret)
12a457
+                                break;
12a457
+                }
12a457
+        }
12a457
+
12a457
+out:
12a457
+        if (fd)
12a457
+                fd_unref (fd);
12a457
+        if (mut_init && cond_init) {
12a457
+                pthread_mutex_lock (&mut;;
12a457
+                {
12a457
+                        while (jobs_running)
12a457
+                                pthread_cond_wait (&cond, &mut;;
12a457
+                }
12a457
+                pthread_mutex_unlock (&mut;;
12a457
+                gf_dirent_free (&q);
12a457
+                gf_dirent_free (&entries);
12a457
+        }
12a457
+
12a457
+        if (mut_init)
12a457
+                pthread_mutex_destroy (&mut;;
12a457
+        if (cond_init)
12a457
+                pthread_cond_destroy (&cond;;
12a457
+        return ret|retval;
12a457
+}
12a457
+
12a457
 int
12a457
 syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
12a457
                  int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
12a457
diff --git a/libglusterfs/src/syncop-utils.h b/libglusterfs/src/syncop-utils.h
12a457
index 7a9ccac..52bcfd9 100644
12a457
--- a/libglusterfs/src/syncop-utils.h
12a457
+++ b/libglusterfs/src/syncop-utils.h
12a457
@@ -11,12 +11,19 @@
12a457
 #ifndef _SYNCOP_UTILS_H
12a457
 #define _SYNCOP_UTILS_H
12a457
 
12a457
+typedef int (*syncop_dir_scan_fn_t) (xlator_t *subvol, gf_dirent_t *entry,
12a457
+                                     loc_t *parent, void *data);
12a457
 int
12a457
 syncop_ftw (xlator_t *subvol, loc_t *loc, int pid, void *data,
12a457
             int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
12a457
                        void *data));
12a457
 
12a457
 int
12a457
+syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
12a457
+                    syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs,
12a457
+                    uint32_t max_qlen);
12a457
+
12a457
+int
12a457
 syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
12a457
                  int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
12a457
                             void *data));
12a457
-- 
12a457
1.7.1
12a457