Blob Blame History Raw
From a63f1839e93e6d7cf19b2f86b6d0601d54ca703c Mon Sep 17 00:00:00 2001
From: Sakshi <sabansal@redhat.com>
Date: Thu, 16 Jul 2015 14:31:03 +0530
Subject: [PATCH 49/80] dht: lock on subvols to prevent lookup vs rmdir race

While an rmdir has completed on some non-hashed subvols and is
still proceeding on the others, a lookup selfheal can recreate
the same directory on the subvols where the rmdir had already
succeeded. The subsequent deletion of the parent directory then
fails with ENOTEMPTY.

To fix this, take a blocking inodelk on the subvols before
starting the rmdir. Selfheal must also take a blocking inodelk
before creating the entry.

BUG: 1115367
Change-Id: If8d2ac06d8fb05b968d864e979ec21d3e7a38f22
Signed-off-by: Sakshi <sabansal@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/71479
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
---
 xlators/cluster/dht/src/dht-common.c   |  192 +++++++++++++++++++---
 xlators/cluster/dht/src/dht-common.h   |   11 +-
 xlators/cluster/dht/src/dht-helper.c   |   33 +++-
 xlators/cluster/dht/src/dht-rename.c   |    2 +-
 xlators/cluster/dht/src/dht-selfheal.c |  285 ++++++++++++++++++++++++++------
 5 files changed, 435 insertions(+), 88 deletions(-)

diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 7755eba..36244e7 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -60,6 +60,9 @@ int32_t dht_set_fixed_dir_stat (struct iatt *stat)
 
 
 int
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this);
+
+int
 dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)
 {
         int              ret            = -1;
@@ -4673,6 +4676,10 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
 
                 if (IA_ISINVAL(orig_entry->d_stat.ia_type)) {
                         /*stat failed somewhere- ignore this entry*/
+                        gf_msg_debug (this->name, EINVAL,
+                                      "Invalid stat, ignoring entry "
+                                      "%s gfid %s", orig_entry->d_name,
+                                      uuid_utoa (orig_entry->d_stat.ia_gfid));
                         continue;
                 }
 
@@ -4685,7 +4692,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
                  * corresponding hashed subvolume will take care of the
                  * directory entry.
                  */
-
                         if (readdir_optimize) {
                                 if (prev->this == local->first_up_subvol)
                                         goto list;
@@ -5203,7 +5209,7 @@ out:
         if (local && local->lock.locks) {
                 /* store op_errno for failure case*/
                 local->op_errno = op_errno;
-                local->refresh_layout_unlock (frame, this, op_ret);
+                local->refresh_layout_unlock (frame, this, op_ret, 1);
 
                 if (op_ret == 0) {
                         DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno,
@@ -5261,7 +5267,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,
         return 0;
 err:
         if (local && local->lock.locks) {
-                local->refresh_layout_unlock (frame, this, -1);
+                local->refresh_layout_unlock (frame, this, -1, 1);
         } else {
                 DHT_STACK_UNWIND (mknod, frame, -1,
                                   op_errno, NULL, NULL, NULL,
@@ -5369,7 +5375,7 @@ dht_mknod_do (call_frame_t *frame)
                                          local->umask, local->params);
         return 0;
 err:
-        local->refresh_layout_unlock (frame, this, -1);
+        local->refresh_layout_unlock (frame, this, -1, 1);
 
         return 0;
 }
@@ -5384,7 +5390,8 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 }
 
 int32_t
-dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret)
+dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret,
+                  int invoke_cbk)
 {
         dht_local_t  *local      = NULL, *lock_local = NULL;
         call_frame_t *lock_frame = NULL;
@@ -5459,7 +5466,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
         return 0;
 err:
-        dht_mknod_finish (frame, this, -1);
+        dht_mknod_finish (frame, this, -1, 0);
         return 0;
 }
 
@@ -5490,7 +5497,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)
         local->lock.lk_count = count;
 
         ret = dht_blocking_inodelk (frame, lk_array, count,
-                                    dht_mknod_lock_cbk);
+                                    IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);
 
         if (ret < 0) {
                 local->lock.locks = NULL;
@@ -6030,7 +6037,7 @@ out:
         if (local && local->lock.locks) {
                 /* store op_errno for failure case*/
                 local->op_errno = op_errno;
-                local->refresh_layout_unlock (frame, this, op_ret);
+                local->refresh_layout_unlock (frame, this, op_ret, 1);
 
                 if (op_ret == 0) {
                         DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd,
@@ -6089,7 +6096,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,
 
 err:
         if (local && local->lock.locks) {
-                local->refresh_layout_unlock (frame, this, -1);
+                local->refresh_layout_unlock (frame, this, -1, 1);
         } else {
                 DHT_STACK_UNWIND (create, frame, -1,
                                   op_errno, NULL, NULL, NULL,
@@ -6256,7 +6263,7 @@ dht_create_do (call_frame_t *frame)
                                          local->umask, local->fd, local->params);
         return 0;
 err:
-        local->refresh_layout_unlock (frame, this, -1);
+        local->refresh_layout_unlock (frame, this, -1, 1);
 
         return 0;
 }
@@ -6270,7 +6277,8 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 }
 
 int32_t
-dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret)
+dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret,
+                   int invoke_cbk)
 {
         dht_local_t  *local      = NULL, *lock_local = NULL;
         call_frame_t *lock_frame = NULL;
@@ -6345,7 +6353,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
         return 0;
 err:
-        dht_create_finish (frame, this, -1);
+        dht_create_finish (frame, this, -1, 0);
         return 0;
 }
 
@@ -6376,7 +6384,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)
         local->lock.lk_count = count;
 
         ret = dht_blocking_inodelk (frame, lk_array, count,
-                                    dht_create_lock_cbk);
+                                    IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);
 
         if (ret < 0) {
                 local->lock.locks = NULL;
@@ -6800,8 +6808,8 @@ dht_rmdir_selfheal_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
 int
 dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
-               int op_ret, int op_errno, struct iatt *preparent,
-               struct iatt *postparent, dict_t *xdata)
+                             int op_ret, int op_errno, struct iatt *preparent,
+                             struct iatt *postparent, dict_t *xdata)
 {
         dht_local_t  *local = NULL;
         dht_conf_t   *conf = NULL;
@@ -6821,7 +6829,8 @@ dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                         local->op_errno = op_errno;
                         local->op_ret   = -1;
                         if (conf->subvolume_cnt != 1) {
-                                if (op_errno != ENOENT && op_errno != EACCES) {
+                                if (op_errno != ENOENT && op_errno != EACCES
+                                    && op_errno != ESTALE) {
                                         local->need_selfheal = 1;
                                 }
                         }
@@ -6845,6 +6854,7 @@ unlock:
         this_call_cnt = dht_frame_return (frame);
         if (is_last_call (this_call_cnt)) {
                if (local->need_selfheal) {
+                        dht_rmdir_unlock (frame, this);
                         local->layout =
                                 dht_layout_get (this, local->loc.inode);
 
@@ -6871,6 +6881,7 @@ unlock:
                         dht_set_fixed_dir_stat (&local->preparent);
                         dht_set_fixed_dir_stat (&local->postparent);
 
+                        dht_rmdir_unlock (frame, this);
                         DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
                                           local->op_errno, &local->preparent,
                                           &local->postparent, NULL);
@@ -6939,6 +6950,7 @@ unlock:
 
         if (done) {
                 if (local->need_selfheal && local->fop_succeeded) {
+                        dht_rmdir_unlock (frame, this);
                         local->layout =
                                 dht_layout_get (this, local->loc.inode);
 
@@ -6976,6 +6988,7 @@ unlock:
                         dht_set_fixed_dir_stat (&local->preparent);
                         dht_set_fixed_dir_stat (&local->postparent);
 
+                        dht_rmdir_unlock (frame, this);
                         DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
                                           local->op_errno, &local->preparent,
                                           &local->postparent, NULL);
@@ -6987,11 +7000,110 @@ unlock:
 
 
 int
+dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+        DHT_STACK_DESTROY (frame); /* op_ret/op_errno are not inspected */
+        return 0;
+}
+
+/* Release the inodelks recorded in frame->local using a copied frame. */
+int
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)
+{
+        dht_local_t  *local      = NULL, *lock_local = NULL;
+        call_frame_t *lock_frame = NULL;
+        int           lock_count = 0;
+
+        local = frame->local;
+        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
+
+        if (lock_count == 0)
+                goto done;
+        /* NOTE(review): on either failure below, no unlock is issued. */
+        lock_frame = copy_frame (frame);
+        if (lock_frame == NULL)
+                goto done;
+
+        lock_local = dht_local_init (lock_frame, &local->loc, NULL,
+                                     lock_frame->root->op);
+        if (lock_local == NULL)
+                goto done;
+        /* Hand the lock array to lock_local; frame's local drops it. */
+        lock_local->lock.locks = local->lock.locks;
+        lock_local->lock.lk_count = local->lock.lk_count;
+
+        local->lock.locks = NULL;
+        local->lock.lk_count = 0;
+        dht_unlock_inodelk (lock_frame, lock_local->lock.locks,
+                            lock_local->lock.lk_count,
+                            dht_rmdir_unlock_cbk);
+        lock_frame = NULL;      /* handed off; skip the destroy at done */
+
+done:
+        if (lock_frame != NULL) {
+                DHT_STACK_DESTROY (lock_frame);
+        }
+
+        return 0;
+}
+
+
+int
+dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                    int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+        dht_local_t  *local = NULL;
+        dht_conf_t   *conf  = NULL;
+        int           i     = 0;
+
+        VALIDATE_OR_GOTO (this->private, err);
+
+        conf = this->private;
+        local = frame->local;
+
+        if (op_ret < 0) {
+                gf_msg (this->name, GF_LOG_WARNING, op_errno,
+                        DHT_MSG_INODE_LK_ERROR,
+                        "acquiring inodelk failed rmdir for %s)",
+                        local->loc.path);
+
+                local->op_ret = -1;
+                local->op_errno = op_errno;
+                goto err;
+        }
+
+        for (i = 0; i < conf->subvolume_cnt; i++) {
+                if (local->hashed_subvol &&
+                    (local->hashed_subvol == conf->subvolumes[i]))
+                        continue;
+
+                STACK_WIND (frame, dht_rmdir_cbk,
+                            conf->subvolumes[i],
+                            conf->subvolumes[i]->fops->rmdir,
+                            &local->loc, local->flags, NULL);
+        }
+
+        return 0;
+
+err:
+        /* No harm in calling an extra rmdir unlock */
+        dht_rmdir_unlock (frame, this);
+        DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
+                          &local->preparent, &local->postparent, NULL);
+
+        return 0;
+}
+
+
+int
 dht_rmdir_do (call_frame_t *frame, xlator_t *this)
 {
         dht_local_t  *local = NULL;
         dht_conf_t   *conf = NULL;
-        int           i = 0;
+        dht_lock_t   **lk_array = NULL;
+        int           i = 0, ret = -1;
+        int           count = 1;
         xlator_t     *hashed_subvol = NULL;
         char gfid[GF_UUID_BUF_SIZE] ={0};
 
@@ -7005,7 +7117,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
 
         local->call_cnt = conf->subvolume_cnt;
 
-
         /* first remove from non-hashed_subvol */
         hashed_subvol = dht_subvol_get_hashed (this, &local->loc);
 
@@ -7029,15 +7140,39 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
                 return 0;
         }
 
-        for (i = 0; i < conf->subvolume_cnt; i++) {
-                if (hashed_subvol &&
-                    (hashed_subvol == conf->subvolumes[i]))
-                        continue;
+        count = conf->subvolume_cnt;
 
-                STACK_WIND (frame, dht_rmdir_cbk,
-                            conf->subvolumes[i],
-                            conf->subvolumes[i]->fops->rmdir,
-                            &local->loc, local->flags, NULL);
+        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
+        if (lk_array == NULL) {
+                local->op_ret = -1;
+                local->op_errno = ENOMEM;
+                goto err;
+        }
+
+        for (i = 0; i < count; i++) {
+                lk_array[i] = dht_lock_new (frame->this,
+                                            conf->subvolumes[i],
+                                            &local->loc, F_WRLCK,
+                                            DHT_LAYOUT_HEAL_DOMAIN);
+                if (lk_array[i] == NULL) {
+                        local->op_ret = -1;
+                        local->op_errno = EINVAL;
+                        goto err;
+                }
+        }
+
+        local->lock.locks = lk_array;
+        local->lock.lk_count = count;
+
+        ret = dht_blocking_inodelk (frame, lk_array, count,
+                                    IGNORE_ENOENT_ESTALE,
+                                    dht_rmdir_lock_cbk);
+        if (ret < 0) {
+                local->lock.locks = NULL;
+                local->lock.lk_count = 0;
+                local->op_ret = -1;
+                local->op_errno = errno ? errno : EINVAL;
+                goto err;
         }
 
         return 0;
@@ -7046,6 +7181,11 @@ err:
         dht_set_fixed_dir_stat (&local->preparent);
         dht_set_fixed_dir_stat (&local->postparent);
 
+        if (lk_array != NULL) {
+                dht_lock_array_free (lk_array, count);
+                GF_FREE (lk_array);
+        }
+
         DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
                           &local->preparent, &local->postparent, NULL);
         return 0;
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index edfb805..d06224c 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -50,7 +50,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t        *this, xlator_t *dst_node,
                                     call_frame_t    *frame, int ret);
 
 typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this,
-                                         int op_ret);
+                                         int op_ret, int invoke_cbk);
 
 typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame);
 
@@ -145,6 +145,11 @@ typedef enum {
         qdstatfs_action_COMPARE,
 } qdstatfs_action_t;
 
+typedef enum {  /* how dht_blocking_inodelk reacts to a failed inodelk */
+        FAIL_ON_ANY_ERROR,      /* every inodelk error fails the locking */
+        IGNORE_ENOENT_ESTALE    /* ENOENT/ESTALE on a lock is skipped over */
+} dht_reaction_type_t;
+
 struct dht_skip_linkto_unlink {
 
         gf_boolean_t    handle_valid_link;
@@ -275,6 +280,7 @@ struct dht_local {
                 fop_inodelk_cbk_t   inodelk_cbk;
                 dht_lock_t        **locks;
                 int                 lk_count;
+                dht_reaction_type_t reaction;
 
                 /* whether locking failed on _any_ of the "locks" above */
                 int                 op_ret;
@@ -1132,7 +1138,8 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
  */
 int
 dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
-                      int lk_count, fop_inodelk_cbk_t inodelk_cbk);
+                      int lk_count, dht_reaction_type_t reaction,
+                      fop_inodelk_cbk_t inodelk_cbk);
 
 int32_t
 dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index df31cdb..881db81 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -496,6 +496,7 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,
 
         lock->xl = xl;
         lock->type = type;
+
         lock->domain = gf_strdup (domain);
         if (lock->domain == NULL) {
                 dht_lock_free (lock);
@@ -1978,21 +1979,41 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                           int32_t op_ret, int32_t op_errno, dict_t *xdata)
 {
         int          lk_index = 0;
+        int          i        = 0;
         dht_local_t *local    = NULL;
 
         lk_index = (long) cookie;
 
         local = frame->local;
-
         if (op_ret == 0) {
                 local->lock.locks[lk_index]->locked = _gf_true;
         } else {
-                local->lock.op_ret = -1;
-                local->lock.op_errno = op_errno;
-                goto cleanup;
+                switch (op_errno) {
+                case ESTALE:
+                case ENOENT:
+                        if (local->lock.reaction != IGNORE_ENOENT_ESTALE) {
+                                local->lock.op_ret = -1;
+                                local->lock.op_errno = op_errno;
+                                goto cleanup;
+                        }
+                        break;
+                default:
+                        local->lock.op_ret = -1;
+                        local->lock.op_errno = op_errno;
+                        goto cleanup;
+                }
         }
 
         if (lk_index == (local->lock.lk_count - 1)) {
+                for (i = 0; (i < local->lock.lk_count) &&
+                     (!local->lock.locks[i]->locked); i++)
+                        ;
+
+                if (i == local->lock.lk_count) {
+                        local->lock.op_ret = -1;
+                        local->lock.op_errno = op_errno;
+                }
+
                 dht_inodelk_done (frame);
         } else {
                 dht_blocking_inodelk_rec (frame, ++lk_index);
@@ -2066,7 +2087,8 @@ out:
 
 int
 dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
-                      int lk_count, fop_inodelk_cbk_t inodelk_cbk)
+                      int lk_count, dht_reaction_type_t reaction,
+                      fop_inodelk_cbk_t inodelk_cbk)
 {
         int           ret        = -1;
         call_frame_t *lock_frame = NULL;
@@ -2088,6 +2110,7 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
         dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);
 
         local = lock_frame->local;
+        local->lock.reaction = reaction;
         local->main_frame = frame;
 
         dht_blocking_inodelk_rec (lock_frame, 0);
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
index 79b8706..3b636c5 100644
--- a/xlators/cluster/dht/src/dht-rename.c
+++ b/xlators/cluster/dht/src/dht-rename.c
@@ -1320,7 +1320,7 @@ dht_rename_lock (call_frame_t *frame)
         local->lock.lk_count = count;
 
         ret = dht_blocking_inodelk (frame, lk_array, count,
-                                    dht_rename_lock_cbk);
+                                    FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);
         if (ret < 0) {
                 local->lock.locks = NULL;
                 local->lock.lk_count = 0;
diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c
index fd55303..307116a 100644
--- a/xlators/cluster/dht/src/dht-selfheal.c
+++ b/xlators/cluster/dht/src/dht-selfheal.c
@@ -82,7 +82,8 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 }
 
 int
-dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
+dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret,
+                         int invoke_cbk)
 {
         dht_local_t  *local      = NULL, *lock_local = NULL;
         call_frame_t *lock_frame = NULL;
@@ -90,7 +91,6 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
 
         local = frame->local;
         lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
-
         if (lock_count == 0)
                 goto done;
 
@@ -117,8 +117,9 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
         lock_frame = NULL;
 
 done:
-        local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
-                                 local->op_errno, NULL);
+        if (invoke_cbk)
+                local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
+                                         local->op_errno, NULL);
         if (lock_frame != NULL) {
                 DHT_STACK_DESTROY (lock_frame);
         }
@@ -160,13 +161,13 @@ dht_refresh_layout_done (call_frame_t *frame)
 
                 dht_layout_unref (frame->this, heal);
 
-                dht_selfheal_dir_finish (frame, frame->this, 0);
+                dht_selfheal_dir_finish (frame, frame->this, 0, 1);
         }
 
         return 0;
 
 err:
-        dht_selfheal_dir_finish (frame, frame->this, -1);
+        dht_selfheal_dir_finish (frame, frame->this, -1, 1);
         return 0;
 }
 
@@ -226,8 +227,7 @@ unlock:
         return 0;
 
 err:
-        local->refresh_layout_unlock (frame, this, -1);
-
+        local->refresh_layout_unlock (frame, this, -1, 1);
         return 0;
 }
 
@@ -293,7 +293,7 @@ dht_refresh_layout (call_frame_t *frame)
         return 0;
 
 out:
-        local->refresh_layout_unlock (frame, this, -1);
+        local->refresh_layout_unlock (frame, this, -1, 1);
         return 0;
 }
 
@@ -322,7 +322,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         return 0;
 
 err:
-        dht_selfheal_dir_finish (frame, this, -1);
+        dht_selfheal_dir_finish (frame, this, -1, 1);
         return 0;
 }
 
@@ -583,7 +583,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
         local->lock.locks = lk_array;
         local->lock.lk_count = count;
 
-        ret = dht_blocking_inodelk (frame, lk_array, count,
+        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
                                     dht_selfheal_layout_lock_cbk);
         if (ret < 0) {
                 local->lock.locks = NULL;
@@ -594,13 +594,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
         return 0;
 err:
         if (lk_array != NULL) {
-                int tmp_count = 0, i = 0;
-
-                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
-                        ;
-                }
-
-                dht_lock_array_free (lk_array, tmp_count);
+                dht_lock_array_free (lk_array, count);
                 GF_FREE (lk_array);
         }
 
@@ -653,7 +647,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         this_call_cnt = dht_frame_return (frame);
 
         if (is_last_call (this_call_cnt)) {
-                dht_selfheal_dir_finish (frame, this, 0);
+                dht_selfheal_dir_finish (frame, this, 0, 1);
         }
 
         return 0;
@@ -886,7 +880,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)
                       missing_xattr, loc->path);
 
         if (missing_xattr == 0) {
-                dht_selfheal_dir_finish (frame, this, 0);
+                dht_selfheal_dir_finish (frame, this, 0, 1);
                 return 0;
         }
 
@@ -1013,7 +1007,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,
                       missing_xattr, loc->path);
 
         if (missing_xattr == 0) {
-                dht_selfheal_dir_finish (frame, this, 0);
+                dht_selfheal_dir_finish (frame, this, 0, 1);
                 return 0;
         }
 
@@ -1081,7 +1075,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                                                 dht_should_heal_layout);
 
                 if (ret < 0) {
-                        dht_selfheal_dir_finish (frame, this, -1);
+                        dht_selfheal_dir_finish (frame, this, -1, 1);
                 }
         }
 
@@ -1112,7 +1106,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,
                                                 dht_should_heal_layout);
 
                 if (ret < 0) {
-                        dht_selfheal_dir_finish (frame, this, -1);
+                        dht_selfheal_dir_finish (frame, this, -1, 1);
                 }
 
                 return 0;
@@ -1150,7 +1144,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         dht_layout_t  *layout = NULL;
         call_frame_t  *prev = NULL;
         xlator_t      *subvol = NULL;
-        int            i = 0;
+        int            i = 0, ret = -1;
         int            this_call_cnt = 0;
         char           gfid[GF_UUID_BUF_SIZE] = {0};
 
@@ -1182,11 +1176,13 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         dht_iatt_merge (this, &local->stbuf, stbuf, prev->this);
         dht_iatt_merge (this, &local->preparent, preparent, prev->this);
         dht_iatt_merge (this, &local->postparent, postparent, prev->this);
+        ret = 0;
 
 out:
         this_call_cnt = dht_frame_return (frame);
 
         if (is_last_call (this_call_cnt)) {
+                dht_selfheal_dir_finish (frame, this, ret, 0);
                 dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);
         }
 
@@ -1239,32 +1235,21 @@ out:
 }
 
 int
-dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
-                        dht_layout_t *layout, int force)
+dht_selfheal_dir_mkdir_lookup_done (call_frame_t *frame, xlator_t *this)
 {
-        int           missing_dirs = 0;
+        dht_local_t  *local = NULL;
         int           i     = 0;
         int           ret   = -1;
-        dht_local_t  *local = NULL;
-        xlator_t     *this = NULL;
         dict_t       *dict = NULL;
+        dht_layout_t  *layout = NULL;
+        loc_t        *loc   = NULL;
 
-        local = frame->local;
-        this = frame->this;
-
-        local->selfheal.force_mkdir = force ? _gf_true : _gf_false;
-
-        for (i = 0; i < layout->cnt; i++) {
-                if (layout->list[i].err == ENOENT || force)
-                        missing_dirs++;
-        }
+        VALIDATE_OR_GOTO (this->private, err);
 
-        if (missing_dirs == 0) {
-                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, 0xffffffff, layout);
-                return 0;
-        }
+        local = frame->local;
+        layout = local->layout;
+        loc    = &local->loc;
 
-        local->call_cnt = missing_dirs;
         if (!gf_uuid_is_null (local->gfid)) {
                 dict = dict_new ();
                 if (!dict)
@@ -1278,6 +1263,7 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
                                 " key = gfid-req", loc->path);
         } else if (local->params) {
                 /* Send the dictionary from higher layers directly */
+
                 dict = dict_ref (local->params);
         }
         /* Set acls */
@@ -1290,7 +1276,8 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
                         "dict is NULL, need to make sure gfids are same");
 
         for (i = 0; i < layout->cnt; i++) {
-                if (layout->list[i].err == ENOENT || force) {
+                if (layout->list[i].err == ENOENT ||
+                    local->selfheal.force_mkdir) {
                         gf_msg_debug (this->name, 0,
                                       "Creating directory %s on subvol %s",
                                       loc->path, layout->list[i].xlator->name);
@@ -1309,6 +1296,202 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
                 dict_unref (dict);
 
         return 0;
+
+err:
+        dht_selfheal_dir_finish (frame, this, -1, 1);
+        return 0;
+}
+
+/* Cbk of the lookups wound once the selfheal inodelks are held: tallies
+ * ENOENT/ESTALE replies and, on the last one, fails or continues the heal. */
+int
+dht_selfheal_dir_mkdir_lookup_cbk (call_frame_t *frame, void *cookie,
+                                   xlator_t *this, int op_ret, int op_errno,
+                                   inode_t *inode, struct iatt *stbuf,
+                                   dict_t *xattr, struct iatt *postparent)
+{
+        dht_local_t  *local = NULL;
+        int           i     = 0;
+        int           this_call_cnt = 0;
+        int           missing_dirs = 0;
+        dht_layout_t  *layout = NULL;
+        loc_t         *loc    = NULL;
+
+        VALIDATE_OR_GOTO (this->private, err);
+        local = frame->local;
+        layout = local->layout;
+        loc = &local->loc;
+
+        /* Account for this reply before dropping call_cnt, so the last
+         * callback sees the hole count contributed by every subvol. */
+        LOCK (&frame->lock);
+        {
+                if ((op_ret < 0) && (op_errno == ENOENT || op_errno == ESTALE))
+                        local->selfheal.hole_cnt++;
+        }
+        UNLOCK (&frame->lock);
+
+        this_call_cnt = dht_frame_return (frame);
+        if (!is_last_call (this_call_cnt))
+                return 0;
+
+        if (local->selfheal.hole_cnt == layout->cnt) {
+                gf_msg_debug (this->name, op_errno,
+                              "Lookup failed, an rmdir could have "
+                              "deleted this entry %s", loc->name);
+                local->op_errno = op_errno;
+                goto err;
+        }
+
+        for (i = 0; i < layout->cnt; i++) {
+                if (layout->list[i].err == ENOENT ||
+                    layout->list[i].err == ESTALE ||
+                    local->selfheal.force_mkdir)
+                        missing_dirs++;
+        }
+
+        if (missing_dirs == 0) {
+                dht_selfheal_dir_finish (frame, this, 0, 0);
+                dht_selfheal_dir_setattr (frame, loc, &local->stbuf,
+                                          0xffffffff, layout);
+                return 0;
+        }
+
+        local->call_cnt = missing_dirs;
+        dht_selfheal_dir_mkdir_lookup_done (frame, this);
+        return 0;
+
+err:
+        dht_selfheal_dir_finish (frame, this, -1, 1);
+        return 0;
+}
+
+
+int
+dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie,
+                                 xlator_t *this, int32_t op_ret,
+                                 int32_t op_errno, dict_t *xdata)
+{
+        dht_local_t  *local = NULL;
+        dht_conf_t   *conf  = NULL;
+        int           i     = 0;
+
+        VALIDATE_OR_GOTO (this->private, err);
+
+        conf = this->private;
+        local = frame->local;
+
+	    local->call_cnt = conf->subvolume_cnt;
+
+        if (op_ret < 0) {
+
+                /* We get this error when the directory entry was not created
+                 * on a newly attached tier subvol. Hence proceed and do mkdir
+                 * on the tier subvol.
+                 */
+                if (op_errno == EINVAL) {
+                        local->call_cnt = 1;
+                        dht_selfheal_dir_mkdir_lookup_done (frame, this);
+                        return 0;
+                }
+
+                gf_msg (this->name, GF_LOG_WARNING, op_errno,
+                        DHT_MSG_INODE_LK_ERROR,
+                        "acquiring inodelk failed for %s",
+                        local->loc.path);
+
+                local->op_errno = op_errno;
+                goto err;
+        }
+
+        /* After getting locks, perform lookup again on every subvolume to
+           ensure that the directory was not deleted by a racing rmdir
+        */
+
+        for (i = 0; i < conf->subvolume_cnt; i++) {
+                STACK_WIND (frame, dht_selfheal_dir_mkdir_lookup_cbk,
+                            conf->subvolumes[i],
+                            conf->subvolumes[i]->fops->lookup,
+                            &local->loc, NULL);
+        }
+
+        return 0;
+
+err:
+        dht_selfheal_dir_finish (frame, this, -1, 1);
+        return 0;
+}
+
+int
+dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
+                        dht_layout_t *layout, int force)
+{
+        int           missing_dirs = 0;
+        int           i     = 0;
+        int           ret   = -1;
+        int           count = 1;
+        dht_local_t  *local = NULL;
+        dht_conf_t   *conf  = NULL;
+        xlator_t     *this = NULL;
+        dht_lock_t   **lk_array = NULL;
+
+        local = frame->local;
+        this = frame->this;
+        conf = this->private;
+
+        local->selfheal.force_mkdir = force;
+        local->selfheal.hole_cnt = 0;
+
+        for (i = 0; i < layout->cnt; i++) {
+                if (layout->list[i].err == ENOENT || force)
+                        missing_dirs++;
+        }
+
+        if (missing_dirs == 0) {
+                dht_selfheal_dir_setattr (frame, loc, &local->stbuf,
+                                          0xffffffff, layout);
+                return 0;
+        }
+
+        count = conf->subvolume_cnt;
+
+        /* Locking on all subvols in the mkdir phase of lookup selfheal is
+           done to synchronize with rmdir/rename.
+        */
+        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
+        if (lk_array == NULL)
+                goto err;
+
+        for (i = 0; i < count; i++) {
+                lk_array[i] = dht_lock_new (frame->this,
+                                            conf->subvolumes[i],
+                                            &local->loc, F_WRLCK,
+                                            DHT_LAYOUT_HEAL_DOMAIN);
+                if (lk_array[i] == NULL)
+                        goto err;
+        }
+
+        local->lock.locks = lk_array;
+        local->lock.lk_count = count;
+
+        ret = dht_blocking_inodelk (frame, lk_array, count,
+                                    IGNORE_ENOENT_ESTALE,
+                                    dht_selfheal_dir_mkdir_lock_cbk);
+
+        if (ret < 0) {
+                local->lock.locks = NULL;
+                local->lock.lk_count = 0;
+                goto err;
+        }
+
+        return 0;
+err:
+        if (lk_array != NULL) {
+                dht_lock_array_free (lk_array, count);
+                GF_FREE (lk_array);
+        }
+
+        return -1;
 }
 
 int
@@ -1882,7 +2065,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,
 
 sorry_no_fix:
         /* TODO: need to put appropriate local->op_errno */
-        dht_selfheal_dir_finish (frame, this, ret);
+        dht_selfheal_dir_finish (frame, this, ret, 1);
 
         return 0;
 }
@@ -1950,7 +2133,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,
 
 sorry_no_fix:
         /* TODO: need to put appropriate local->op_errno */
-        dht_selfheal_dir_finish (frame, this, ret);
+        dht_selfheal_dir_finish (frame, this, ret, 1);
 
         return 0;
 
@@ -2301,7 +2484,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
         local->lock.locks = lk_array;
         local->lock.lk_count = count;
 
-        ret = dht_blocking_inodelk (frame, lk_array, count,
+        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
                                     dht_update_commit_hash_for_layout_resume);
         if (ret < 0) {
                 local->lock.locks = NULL;
@@ -2312,13 +2495,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
         return 0;
 err:
         if (lk_array != NULL) {
-                int tmp_count = 0, i = 0;
-
-                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
-                        ;
-                }
-
-                dht_lock_array_free (lk_array, tmp_count);
+                dht_lock_array_free (lk_array, count);
                 GF_FREE (lk_array);
         }
 
-- 
1.7.1