Blob Blame History Raw
From a3d7051a82e5547f8e8d0e5263f403968df13902 Mon Sep 17 00:00:00 2001
From: Pranith Kumar K <pkarampu@redhat.com>
Date: Thu, 17 Mar 2016 09:32:02 +0530
Subject: [PATCH 72/80] syncop: Add parallel dir scan functionality

Most of this functionality's ideas are contributed
by Richard Wareing, in his patch:
https://bugzilla.redhat.com/show_bug.cgi?id=1221737#c1

VERY BIG thanks to him :-).

After starting porting/testing the patch above, I found a few things we can
improve in this patch based on the results we got in testing.
1) We are reading all the indices before we launch self-heals. In some customer
cases I worked on there were almost 5million files/directories that needed
heal. With such a big number self-heal daemon will be OOM killed if we go
this route. So I modified this to launch heals based on a queue length
limit.

2) We found that for directory hierarchies, multi-threaded self-heal
patch was not giving better results compared to single-threaded
self-heal because of the order problems. We improved index xlator to
give gfid type to make sure that all directories in the indices are
healed before the files that follow in that iteration of readdir
output(http://review.gluster.org/13553). In our testing this lead to
zero errors of self-heals as we were only doing self-heals in parallel
for files and not directories. I think we can further improve self-heal
speed for directories by doing name heals in parallel based on similar
techniques Richard's patch showed. I think the best thing there would be to
introduce synccond_t infra (pthread_cond_t kind of infra for syncops)
which I am planning to implement for future releases.

3) Based on 1), 2) and the fact that afr already does retries of the
indices in a loop I removed retries again in the threads.

4) After the refactor, the changes required to bring in multi-threaded
self-heal for ec would just be ~10 lines, most of it will be about
options initialization.

Our tests found that we are able to easily saturate network :-).

High level description of the final feature:
Traditionally self-heal daemon reads the indices (gfids) that need to be healed
from the brick and initiates heal one gfid at a time. Goal of this feature is
to add parallelization to the way we do self-heals in a way we do not regress
in any case but increase parallelization wherever we can. As part of this following
knobs are introduced to improve parallelization:
1) We can launch 'max-jobs' number of heals in parallel.
2) We can keep reading indices as long as the wait-q for heals doesn't go over
   'max-qlen' passed as arguments to multi-threaded dir_scan.

As a first cut, we always do healing of directories in serial order one at a time
but for files we launch heals in parallel. In future we can do name-heals of dir
in parallel, but this is not implemented as of now. Reason for this is mentioned
already in '2)' above.

AFR/EC can introduce options like max-shd-threads/wait-qlength which can be set
by users to increase the rate of heals when they want. Please note that the
options will take effect only for the next crawl.

 >BUG: 1221737
 >Change-Id: I8fc0afc334def87797f6d41e309cefc722a317d2
 >Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
 >Reviewed-on: http://review.gluster.org/13569
 >NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
 >CentOS-regression: Gluster Build System <jenkins@build.gluster.com>
 >Reviewed-by: Jeff Darcy <jdarcy@redhat.com>
 >Smoke: Gluster Build System <jenkins@build.gluster.com>

 >BUG: 1325857
 >Change-Id: I23235bbb923208eee6a8be711bbfb14350edb11b
 >Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
 >Reviewed-on: http://review.gluster.org/13967
 >Smoke: Gluster Build System <jenkins@build.gluster.com>
 >NetBSD-regression: NetBSD Build System <jenkins@build.gluster.org>
 >CentOS-regression: Gluster Build System <jenkins@build.gluster.com>

BUG: 1314724
Change-Id: I803f5397aa86b83d04d8c7a237a6535707106fd0
Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/72370
---
 libglusterfs/src/mem-types.h    |    1 +
 libglusterfs/src/syncop-utils.c |  238 +++++++++++++++++++++++++++++++++++++++
 libglusterfs/src/syncop-utils.h |    7 +
 3 files changed, 246 insertions(+), 0 deletions(-)

diff --git a/libglusterfs/src/mem-types.h b/libglusterfs/src/mem-types.h
index 84949c6..dd96cc6 100644
--- a/libglusterfs/src/mem-types.h
+++ b/libglusterfs/src/mem-types.h
@@ -155,6 +155,7 @@ enum gf_common_mem_types_ {
         gf_common_mt_synctask,
         gf_common_mt_syncstack,
         gf_common_mt_syncenv,
+        gf_common_mt_scan_data,
         gf_common_mt_end
 };
 #endif
diff --git a/libglusterfs/src/syncop-utils.c b/libglusterfs/src/syncop-utils.c
index 16ae1f7..5e6b9fa 100644
--- a/libglusterfs/src/syncop-utils.c
+++ b/libglusterfs/src/syncop-utils.c
@@ -14,9 +14,24 @@
 #endif
 
 #include "syncop.h"
+#include "syncop-utils.h"
 #include "common-utils.h"
 #include "libglusterfs-messages.h"
 
+struct syncop_dir_scan_data {
+        xlator_t *subvol;
+        loc_t *parent;
+        void *data;
+        gf_dirent_t *q;
+        gf_dirent_t *entry;
+        pthread_cond_t *cond;
+        pthread_mutex_t *mut;
+        syncop_dir_scan_fn_t fn;
+        uint32_t *jobs_running;
+        uint32_t *qlen;
+        int32_t  *retval;
+};
+
 int
 syncop_dirfd (xlator_t *subvol, loc_t *loc, fd_t **fd, int pid)
 {
@@ -224,6 +239,229 @@ out:
         return ret;
 }
 
+static void
+_scan_data_destroy (struct syncop_dir_scan_data *data)
+{
+        GF_FREE (data);
+}
+
+static int
+_dir_scan_job_fn_cbk (int ret, call_frame_t *frame, void *opaque)
+{
+        struct syncop_dir_scan_data *scan_data = opaque;
+
+        _scan_data_destroy (scan_data);
+        return 0;
+}
+
+static int
+_dir_scan_job_fn (void *data)
+{
+        struct syncop_dir_scan_data *scan_data = data;
+        gf_dirent_t                 *entry     = NULL;
+        int                         ret        = 0;
+
+        entry = scan_data->entry;
+        scan_data->entry = NULL;
+        do {
+                ret = scan_data->fn (scan_data->subvol, entry,
+                                     scan_data->parent,
+                                     scan_data->data);
+                gf_dirent_entry_free (entry);
+                entry = NULL;
+                pthread_mutex_lock (scan_data->mut);
+                {
+                        if (ret || list_empty (&scan_data->q->list)) {
+                                (*scan_data->jobs_running)--;
+                                *scan_data->retval |= ret;
+                                pthread_cond_broadcast (scan_data->cond);
+                        } else {
+                                entry = list_first_entry (&scan_data->q->list,
+                                                  typeof (*scan_data->q), list);
+                                list_del_init (&entry->list);
+                                (*scan_data->qlen)--;
+                        }
+                }
+                pthread_mutex_unlock (scan_data->mut);
+        } while (entry);
+
+        return ret;
+}
+
+static int
+_run_dir_scan_task (xlator_t *subvol, loc_t *parent, gf_dirent_t *q,
+                    gf_dirent_t *entry, int *retval, pthread_mutex_t *mut,
+                    pthread_cond_t *cond, uint32_t *jobs_running,
+                    uint32_t *qlen, syncop_dir_scan_fn_t fn, void *data)
+{
+        int     ret = 0;
+        struct syncop_dir_scan_data *scan_data = NULL;
+
+
+        scan_data = GF_CALLOC (1, sizeof (struct syncop_dir_scan_data),
+                               gf_common_mt_scan_data);
+        if (!scan_data) {
+                ret = -ENOMEM;
+                goto out;
+        }
+
+        scan_data->subvol       = subvol;
+        scan_data->parent       = parent;
+        scan_data->data         = data;
+        scan_data->mut          = mut;
+        scan_data->cond         = cond;
+        scan_data->fn           = fn;
+        scan_data->jobs_running = jobs_running;
+        scan_data->entry        = entry;
+        scan_data->q            = q;
+        scan_data->qlen         = qlen;
+        scan_data->retval       = retval;
+
+        ret = synctask_new (subvol->ctx->env, _dir_scan_job_fn,
+                            _dir_scan_job_fn_cbk, NULL, scan_data);
+out:
+        if (ret < 0) {
+                gf_dirent_entry_free (entry);
+                _scan_data_destroy (scan_data);
+                pthread_mutex_lock (mut);
+                {
+                        *jobs_running = *jobs_running - 1;
+                }
+                pthread_mutex_unlock (mut);
+                /*No need to cond-broadcast*/
+        }
+        return ret;
+}
+
+int
+syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
+                    syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs,
+                    uint32_t max_qlen)
+{
+        fd_t        *fd    = NULL;
+        uint64_t    offset = 0;
+        gf_dirent_t *last = NULL;
+        int         ret    = 0;
+        int         retval = 0;
+        gf_dirent_t q;
+        gf_dirent_t *entry = NULL;
+        gf_dirent_t *tmp = NULL;
+        uint32_t    jobs_running = 0;
+        uint32_t    qlen = 0;
+        pthread_cond_t cond;
+        pthread_mutex_t mut;
+        gf_boolean_t cond_init = _gf_false;
+        gf_boolean_t mut_init = _gf_false;
+        gf_dirent_t entries;
+
+        /*For this functionality to be implemented in general, we need
+         * synccond_t infra which doesn't block the executing thread. Until then
+         * return failures inside synctask if they use this.*/
+        if (synctask_get())
+                return -ENOTSUP;
+
+        if (max_jobs == 0)
+                return -EINVAL;
+
+        /*Code becomes simpler this way. cond_wait just on qlength.
+         * Little bit of cheating*/
+        if (max_qlen == 0)
+                max_qlen = 1;
+
+        ret = syncop_dirfd (subvol, loc, &fd, pid);
+        if (ret)
+                goto out;
+
+        INIT_LIST_HEAD (&entries.list);
+        INIT_LIST_HEAD (&q.list);
+        ret = pthread_mutex_init (&mut, NULL);
+        if (ret)
+                goto out;
+        mut_init = _gf_true;
+
+        ret = pthread_cond_init (&cond, NULL);
+        if (ret)
+                goto out;
+        cond_init = _gf_true;
+
+        while ((ret = syncop_readdir (subvol, fd, 131072, offset, &entries,
+                                      xdata, NULL))) {
+                if (ret < 0)
+                        break;
+
+                if (ret > 0) {
+                        /* If the entries are only '.', and '..' then ret
+                         * value will be non-zero. so set it to zero here. */
+                        ret = 0;
+                }
+
+                last = list_last_entry (&entries.list, typeof (*last), list);
+                offset = last->d_off;
+
+                list_for_each_entry_safe (entry, tmp, &entries.list, list) {
+                        list_del_init (&entry->list);
+                        if (!strcmp (entry->d_name, ".") ||
+                            !strcmp (entry->d_name, "..")) {
+                                gf_dirent_entry_free (entry);
+                                continue;
+                        }
+
+                        if (entry->d_type == IA_IFDIR) {
+                                ret = fn (subvol, entry, loc, data);
+                                gf_dirent_entry_free (entry);
+                                if (ret)
+                                        break;
+                                continue;
+                        }
+
+                        pthread_mutex_lock (&mut);
+                        {
+                                while (qlen == max_qlen)
+                                        pthread_cond_wait (&cond, &mut);
+                                if (max_jobs == jobs_running) {
+                                        list_add_tail (&entry->list, &q.list);
+                                        qlen++;
+                                        entry = NULL;
+                                } else {
+                                        jobs_running++;
+                                }
+                        }
+                        pthread_mutex_unlock (&mut);
+                        if (retval) /*Any jobs failed?*/
+                                break;
+
+                        if (!entry)
+                                continue;
+
+                        ret = _run_dir_scan_task (subvol, loc, &q, entry,
+                                                  &retval, &mut, &cond,
+                                                &jobs_running, &qlen, fn, data);
+                        if (ret)
+                                break;
+                }
+        }
+
+out:
+        if (fd)
+                fd_unref (fd);
+        if (mut_init && cond_init) {
+                pthread_mutex_lock (&mut);
+                {
+                        while (jobs_running)
+                                pthread_cond_wait (&cond, &mut);
+                }
+                pthread_mutex_unlock (&mut);
+                gf_dirent_free (&q);
+                gf_dirent_free (&entries);
+        }
+
+        if (mut_init)
+                pthread_mutex_destroy (&mut);
+        if (cond_init)
+                pthread_cond_destroy (&cond);
+        return ret|retval;
+}
+
 int
 syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
                  int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
diff --git a/libglusterfs/src/syncop-utils.h b/libglusterfs/src/syncop-utils.h
index 7a9ccac..52bcfd9 100644
--- a/libglusterfs/src/syncop-utils.h
+++ b/libglusterfs/src/syncop-utils.h
@@ -11,12 +11,19 @@
 #ifndef _SYNCOP_UTILS_H
 #define _SYNCOP_UTILS_H
 
+typedef int (*syncop_dir_scan_fn_t) (xlator_t *subvol, gf_dirent_t *entry,
+                                     loc_t *parent, void *data);
 int
 syncop_ftw (xlator_t *subvol, loc_t *loc, int pid, void *data,
             int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
                        void *data));
 
 int
+syncop_mt_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
+                    syncop_dir_scan_fn_t fn, dict_t *xdata, uint32_t max_jobs,
+                    uint32_t max_qlen);
+
+int
 syncop_dir_scan (xlator_t *subvol, loc_t *loc, int pid, void *data,
                  int (*fn) (xlator_t *subvol, gf_dirent_t *entry, loc_t *parent,
                             void *data));
-- 
1.7.1