cb8e9e
From a288ff3ea3b792dcafaa4ffa247d3c6032d68f10 Mon Sep 17 00:00:00 2001
cb8e9e
From: Sakshi <sabansal@redhat.com>
cb8e9e
Date: Thu, 16 Jul 2015 14:31:03 +0530
cb8e9e
Subject: [PATCH 311/320] dht : lock on subvols to prevent lookup vs rmdir race
cb8e9e
cb8e9e
There is a possibility that while an rmdir is completed on
cb8e9e
some non-hashed subvol and proceeding to others. A lookup
cb8e9e
selfheal can recreate the same directory on those subvols
cb8e9e
for which the rmdir had succeeded. The fix is to take a
cb8e9e
blocking inodelk on the subvols before starting rmdir.
cb8e9e
Since selfheal requires lock on all subvols, if an rmdir
cb8e9e
is in progess acquiring locks will fail and vice versa.
cb8e9e
cb8e9e
Change-Id: I841a44758c3b88f5e04d1cb73ad36e0cac9fdabb
cb8e9e
BUG: 1115367
cb8e9e
Signed-off-by: Sakshi <sabansal@redhat.com>
cb8e9e
Reviewed-on: http://review.gluster.org/11725
cb8e9e
Tested-by: NetBSD Build System <jenkins@build.gluster.org>
cb8e9e
Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
cb8e9e
Reviewed-on: https://code.engineering.redhat.com/gerrit/56595
cb8e9e
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
cb8e9e
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
cb8e9e
---
cb8e9e
 xlators/cluster/dht/src/dht-common.c   |  180 +++++++++++++++++++++++++++----
cb8e9e
 xlators/cluster/dht/src/dht-common.h   |   14 ++-
cb8e9e
 xlators/cluster/dht/src/dht-helper.c   |   38 ++++++-
cb8e9e
 xlators/cluster/dht/src/dht-rename.c   |    2 +-
cb8e9e
 xlators/cluster/dht/src/dht-selfheal.c |  181 +++++++++++++++++++++++---------
cb8e9e
 5 files changed, 331 insertions(+), 84 deletions(-)
cb8e9e
cb8e9e
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
cb8e9e
index 5c1a693..f819aa6 100644
cb8e9e
--- a/xlators/cluster/dht/src/dht-common.c
cb8e9e
+++ b/xlators/cluster/dht/src/dht-common.c
cb8e9e
@@ -40,6 +40,10 @@ dht_setxattr2 (xlator_t *this, xlator_t *subvol, call_frame_t *frame);
cb8e9e
 int run_defrag = 0;
cb8e9e
 
cb8e9e
 int
cb8e9e
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this);
cb8e9e
+
cb8e9e
+
cb8e9e
+int
cb8e9e
 dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)
cb8e9e
 {
cb8e9e
         int              ret            = -1;
cb8e9e
@@ -4514,7 +4518,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
cb8e9e
                  * corresponding hashed subvolume will take care of the
cb8e9e
                  * directory entry.
cb8e9e
                  */
cb8e9e
-
cb8e9e
                         if (readdir_optimize) {
cb8e9e
                                 if (prev->this == local->first_up_subvol)
cb8e9e
                                         goto list;
cb8e9e
@@ -4999,7 +5002,7 @@ out:
cb8e9e
         if (local && local->lock.locks) {
cb8e9e
                 /* store op_errno for failure case*/
cb8e9e
                 local->op_errno = op_errno;
cb8e9e
-                local->refresh_layout_unlock (frame, this, op_ret);
cb8e9e
+                local->refresh_layout_unlock (frame, this, op_ret, 0);
cb8e9e
 
cb8e9e
                 if (op_ret == 0) {
cb8e9e
                         DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno,
cb8e9e
@@ -5044,7 +5047,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
         if (local->lock.locks)
cb8e9e
-                local->refresh_layout_unlock (frame, this, -1);
cb8e9e
+                local->refresh_layout_unlock (frame, this, -1, 0);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
@@ -5149,7 +5152,7 @@ dht_mknod_do (call_frame_t *frame)
cb8e9e
                                          local->umask, local->params);
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
-        local->refresh_layout_unlock (frame, this, -1);
cb8e9e
+        local->refresh_layout_unlock (frame, this, -1, 0);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
@@ -5164,7 +5167,7 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 }
cb8e9e
 
cb8e9e
 int32_t
cb8e9e
-dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret)
cb8e9e
+dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret, int invoke_cbk)
cb8e9e
 {
cb8e9e
         dht_local_t  *local      = NULL, *lock_local = NULL;
cb8e9e
         call_frame_t *lock_frame = NULL;
cb8e9e
@@ -5239,7 +5242,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
-        dht_mknod_finish (frame, this, -1);
cb8e9e
+        dht_mknod_finish (frame, this, -1, 0);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -5270,7 +5273,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)
cb8e9e
         local->lock.lk_count = count;
cb8e9e
 
cb8e9e
         ret = dht_blocking_inodelk (frame, lk_array, count,
cb8e9e
-                                    dht_mknod_lock_cbk);
cb8e9e
+                                    IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);
cb8e9e
 
cb8e9e
         if (ret < 0) {
cb8e9e
                 local->lock.locks = NULL;
cb8e9e
@@ -5797,7 +5800,7 @@ out:
cb8e9e
         if (local && local->lock.locks) {
cb8e9e
                 /* store op_errno for failure case*/
cb8e9e
                 local->op_errno = op_errno;
cb8e9e
-                local->refresh_layout_unlock (frame, this, op_ret);
cb8e9e
+                local->refresh_layout_unlock (frame, this, op_ret, 0);
cb8e9e
 
cb8e9e
                 if (op_ret == 0) {
cb8e9e
                         DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd,
cb8e9e
@@ -5838,7 +5841,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
         if (local->lock.locks)
cb8e9e
-                local->refresh_layout_unlock (frame, this, -1);
cb8e9e
+                local->refresh_layout_unlock (frame, this, -1, 0);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
@@ -6002,7 +6005,7 @@ dht_create_do (call_frame_t *frame)
cb8e9e
                                          local->umask, local->fd, local->params);
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
-        local->refresh_layout_unlock (frame, this, -1);
cb8e9e
+        local->refresh_layout_unlock (frame, this, -1, 0);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
@@ -6016,7 +6019,7 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 }
cb8e9e
 
cb8e9e
 int32_t
cb8e9e
-dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret)
cb8e9e
+dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret, int invoke_cbk)
cb8e9e
 {
cb8e9e
         dht_local_t  *local      = NULL, *lock_local = NULL;
cb8e9e
         call_frame_t *lock_frame = NULL;
cb8e9e
@@ -6091,7 +6094,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
-        dht_create_finish (frame, this, -1);
cb8e9e
+        dht_create_finish (frame, this, -1, 0);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -6122,7 +6125,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)
cb8e9e
         local->lock.lk_count = count;
cb8e9e
 
cb8e9e
         ret = dht_blocking_inodelk (frame, lk_array, count,
cb8e9e
-                                    dht_create_lock_cbk);
cb8e9e
+                                    IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);
cb8e9e
 
cb8e9e
         if (ret < 0) {
cb8e9e
                 local->lock.locks = NULL;
cb8e9e
@@ -6582,6 +6585,7 @@ unlock:
cb8e9e
         this_call_cnt = dht_frame_return (frame);
cb8e9e
         if (is_last_call (this_call_cnt)) {
cb8e9e
                if (local->need_selfheal) {
cb8e9e
+                        dht_rmdir_unlock (frame, this);
cb8e9e
                         local->layout =
cb8e9e
                                 dht_layout_get (this, local->loc.inode);
cb8e9e
 
cb8e9e
@@ -6605,6 +6609,7 @@ unlock:
cb8e9e
                                                            1);
cb8e9e
                         }
cb8e9e
 
cb8e9e
+                        dht_rmdir_unlock (frame, this);
cb8e9e
                         DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
cb8e9e
                                           local->op_errno, &local->preparent,
cb8e9e
                                           &local->postparent, NULL);
cb8e9e
@@ -6673,6 +6678,7 @@ unlock:
cb8e9e
 
cb8e9e
         if (done) {
cb8e9e
                 if (local->need_selfheal && local->fop_succeeded) {
cb8e9e
+                        dht_rmdir_unlock (frame, this);
cb8e9e
                         local->layout =
cb8e9e
                                 dht_layout_get (this, local->loc.inode);
cb8e9e
 
cb8e9e
@@ -6707,6 +6713,7 @@ unlock:
cb8e9e
 
cb8e9e
                         }
cb8e9e
 
cb8e9e
+                        dht_rmdir_unlock (frame, this);
cb8e9e
                         DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
cb8e9e
                                           local->op_errno, &local->preparent,
cb8e9e
                                           &local->postparent, NULL);
cb8e9e
@@ -6718,11 +6725,110 @@ unlock:
cb8e9e
 
cb8e9e
 
cb8e9e
 int
cb8e9e
+dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
+                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
cb8e9e
+{
cb8e9e
+        DHT_STACK_DESTROY (frame);
cb8e9e
+        return 0;
cb8e9e
+}
cb8e9e
+
cb8e9e
+
cb8e9e
+int
cb8e9e
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)
cb8e9e
+{
cb8e9e
+        dht_local_t  *local      = NULL, *lock_local = NULL;
cb8e9e
+        call_frame_t *lock_frame = NULL;
cb8e9e
+        int           lock_count = 0;
cb8e9e
+
cb8e9e
+        local = frame->local;
cb8e9e
+        lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
cb8e9e
+
cb8e9e
+        if (lock_count == 0)
cb8e9e
+                goto done;
cb8e9e
+
cb8e9e
+        lock_frame = copy_frame (frame);
cb8e9e
+        if (lock_frame == NULL)
cb8e9e
+                goto done;
cb8e9e
+
cb8e9e
+        lock_local = dht_local_init (lock_frame, &local->loc, NULL,
cb8e9e
+                                     lock_frame->root->op);
cb8e9e
+        if (lock_local == NULL)
cb8e9e
+                goto done;
cb8e9e
+
cb8e9e
+        lock_local->lock.locks = local->lock.locks;
cb8e9e
+        lock_local->lock.lk_count = local->lock.lk_count;
cb8e9e
+
cb8e9e
+        local->lock.locks = NULL;
cb8e9e
+        local->lock.lk_count = 0;
cb8e9e
+        dht_unlock_inodelk (lock_frame, lock_local->lock.locks,
cb8e9e
+                            lock_local->lock.lk_count,
cb8e9e
+                            dht_rmdir_unlock_cbk);
cb8e9e
+        lock_frame = NULL;
cb8e9e
+
cb8e9e
+done:
cb8e9e
+        if (lock_frame != NULL) {
cb8e9e
+                DHT_STACK_DESTROY (lock_frame);
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        return 0;
cb8e9e
+}
cb8e9e
+
cb8e9e
+
cb8e9e
+int
cb8e9e
+dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
+                    int32_t op_ret, int32_t op_errno, dict_t *xdata)
cb8e9e
+{
cb8e9e
+        dht_local_t  *local = NULL;
cb8e9e
+        dht_conf_t   *conf  = NULL;
cb8e9e
+        int           i     = 0;
cb8e9e
+
cb8e9e
+        VALIDATE_OR_GOTO (this->private, err);
cb8e9e
+
cb8e9e
+        conf = this->private;
cb8e9e
+        local = frame->local;
cb8e9e
+
cb8e9e
+        if (op_ret < 0) {
cb8e9e
+                gf_msg (this->name, GF_LOG_WARNING, op_errno,
cb8e9e
+                        DHT_MSG_INODE_LK_ERROR,
cb8e9e
+                        "acquiring inodelk failed rmdir for %s)",
cb8e9e
+                        local->loc.path);
cb8e9e
+
cb8e9e
+                local->op_ret = -1;
cb8e9e
+                local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno;
cb8e9e
+                goto err;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        for (i = 0; i < conf->subvolume_cnt; i++) {
cb8e9e
+                if (local->hashed_subvol &&
cb8e9e
+                    (local->hashed_subvol == conf->subvolumes[i]))
cb8e9e
+                        continue;
cb8e9e
+
cb8e9e
+                STACK_WIND (frame, dht_rmdir_cbk,
cb8e9e
+                            conf->subvolumes[i],
cb8e9e
+                            conf->subvolumes[i]->fops->rmdir,
cb8e9e
+                            &local->loc, local->flags, NULL);
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        return 0;
cb8e9e
+
cb8e9e
+err:
cb8e9e
+        /* No harm in calling an extra rmdir unlock */
cb8e9e
+        dht_rmdir_unlock (frame, this);
cb8e9e
+        DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
cb8e9e
+                          &local->preparent, &local->postparent, NULL);
cb8e9e
+
cb8e9e
+        return 0;
cb8e9e
+}
cb8e9e
+
cb8e9e
+
cb8e9e
+int
cb8e9e
 dht_rmdir_do (call_frame_t *frame, xlator_t *this)
cb8e9e
 {
cb8e9e
         dht_local_t  *local = NULL;
cb8e9e
         dht_conf_t   *conf = NULL;
cb8e9e
-        int           i = 0;
cb8e9e
+        dht_lock_t   **lk_array = NULL;
cb8e9e
+        int           i = 0, ret = -1;
cb8e9e
+        int           count = 1;
cb8e9e
         xlator_t     *hashed_subvol = NULL;
cb8e9e
         char gfid[GF_UUID_BUF_SIZE] ={0};
cb8e9e
 
cb8e9e
@@ -6736,7 +6842,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
cb8e9e
 
cb8e9e
         local->call_cnt = conf->subvolume_cnt;
cb8e9e
 
cb8e9e
-
cb8e9e
         /* first remove from non-hashed_subvol */
cb8e9e
         hashed_subvol = dht_subvol_get_hashed (this, &local->loc);
cb8e9e
 
cb8e9e
@@ -6760,20 +6865,49 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
cb8e9e
                 return 0;
cb8e9e
         }
cb8e9e
 
cb8e9e
-        for (i = 0; i < conf->subvolume_cnt; i++) {
cb8e9e
-                if (hashed_subvol &&
cb8e9e
-                    (hashed_subvol == conf->subvolumes[i]))
cb8e9e
-                        continue;
cb8e9e
+        count = conf->subvolume_cnt;
cb8e9e
 
cb8e9e
-                STACK_WIND (frame, dht_rmdir_cbk,
cb8e9e
-                            conf->subvolumes[i],
cb8e9e
-                            conf->subvolumes[i]->fops->rmdir,
cb8e9e
-                            &local->loc, local->flags, NULL);
cb8e9e
+        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
cb8e9e
+        if (lk_array == NULL) {
cb8e9e
+                local->op_ret = -1;
cb8e9e
+                local->op_errno = ENOMEM;
cb8e9e
+                goto err;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        for (i = 0; i < count; i++) {
cb8e9e
+                lk_array[i] = dht_lock_new (frame->this,
cb8e9e
+                                            conf->subvolumes[i],
cb8e9e
+                                            &local->loc, F_WRLCK,
cb8e9e
+                                            DHT_LAYOUT_HEAL_DOMAIN);
cb8e9e
+                if (lk_array[i] == NULL) {
cb8e9e
+                        local->op_ret = -1;
cb8e9e
+                        local->op_errno = EINVAL;
cb8e9e
+                        goto err;
cb8e9e
+                }
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        local->lock.locks = lk_array;
cb8e9e
+        local->lock.lk_count = count;
cb8e9e
+
cb8e9e
+        ret = dht_blocking_inodelk (frame, lk_array, count,
cb8e9e
+                                    IGNORE_ENOENT_ESTALE,
cb8e9e
+                                    dht_rmdir_lock_cbk);
cb8e9e
+        if (ret < 0) {
cb8e9e
+                local->lock.locks = NULL;
cb8e9e
+                local->lock.lk_count = 0;
cb8e9e
+                local->op_ret = -1;
cb8e9e
+                local->op_errno = errno ? errno : EINVAL;
cb8e9e
+                goto err;
cb8e9e
         }
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 err:
cb8e9e
+        if (lk_array != NULL) {
cb8e9e
+                dht_lock_array_free (lk_array, count);
cb8e9e
+                GF_FREE (lk_array);
cb8e9e
+        }
cb8e9e
+
cb8e9e
         DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
cb8e9e
                           &local->preparent, &local->postparent, NULL);
cb8e9e
         return 0;
cb8e9e
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
cb8e9e
index 1b5a084..7f99a06 100644
cb8e9e
--- a/xlators/cluster/dht/src/dht-common.h
cb8e9e
+++ b/xlators/cluster/dht/src/dht-common.h
cb8e9e
@@ -45,7 +45,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t        *this, xlator_t *dst_node,
cb8e9e
                                     call_frame_t    *frame);
cb8e9e
 
cb8e9e
 typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this,
cb8e9e
-                                         int op_ret);
cb8e9e
+                                         int op_ret, int invoke_cbk);
cb8e9e
 
cb8e9e
 typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame);
cb8e9e
 
cb8e9e
@@ -136,6 +136,11 @@ typedef enum {
cb8e9e
         qdstatfs_action_COMPARE,
cb8e9e
 } qdstatfs_action_t;
cb8e9e
 
cb8e9e
+typedef enum {
cb8e9e
+        FAIL_ON_ANY_ERROR,
cb8e9e
+        IGNORE_ENOENT_ESTALE
cb8e9e
+} dht_reaction_type_t;
cb8e9e
+
cb8e9e
 struct dht_skip_linkto_unlink {
cb8e9e
 
cb8e9e
         gf_boolean_t    handle_valid_link;
cb8e9e
@@ -266,6 +271,7 @@ struct dht_local {
cb8e9e
                 fop_inodelk_cbk_t   inodelk_cbk;
cb8e9e
                 dht_lock_t        **locks;
cb8e9e
                 int                 lk_count;
cb8e9e
+                dht_reaction_type_t reaction;
cb8e9e
 
cb8e9e
                 /* whether locking failed on _any_ of the "locks" above */
cb8e9e
                 int                 op_ret;
cb8e9e
@@ -1047,7 +1053,8 @@ dht_fill_dict_to_avoid_unlink_of_migrating_file (dict_t *dict);
cb8e9e
 
cb8e9e
 int
cb8e9e
 dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
-                         int lk_count, fop_inodelk_cbk_t inodelk_cbk);
cb8e9e
+                         int lk_count, dht_reaction_type_t reaction,
cb8e9e
+                         fop_inodelk_cbk_t inodelk_cbk);
cb8e9e
 
cb8e9e
 /* same as dht_nonblocking_inodelk, but issues sequential blocking locks on
cb8e9e
  * @lk_array directly. locks are issued on some order which remains same
cb8e9e
@@ -1055,7 +1062,8 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
  */
cb8e9e
 int
cb8e9e
 dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
-                      int lk_count, fop_inodelk_cbk_t inodelk_cbk);
cb8e9e
+                      int lk_count, dht_reaction_type_t reaction,
cb8e9e
+                      fop_inodelk_cbk_t inodelk_cbk);
cb8e9e
 
cb8e9e
 int32_t
cb8e9e
 dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count,
cb8e9e
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
cb8e9e
index 2e4a53c..1b3fbb0 100644
cb8e9e
--- a/xlators/cluster/dht/src/dht-helper.c
cb8e9e
+++ b/xlators/cluster/dht/src/dht-helper.c
cb8e9e
@@ -347,6 +347,7 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,
cb8e9e
 
cb8e9e
         lock->xl = xl;
cb8e9e
         lock->type = type;
cb8e9e
+
cb8e9e
         lock->domain = gf_strdup (domain);
cb8e9e
         if (lock->domain == NULL) {
cb8e9e
                 dht_lock_free (lock);
cb8e9e
@@ -1692,7 +1693,8 @@ out:
cb8e9e
 
cb8e9e
 int
cb8e9e
 dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
-                         int lk_count, fop_inodelk_cbk_t inodelk_cbk)
cb8e9e
+                         int lk_count, dht_reaction_type_t reaction,
cb8e9e
+                         fop_inodelk_cbk_t inodelk_cbk)
cb8e9e
 {
cb8e9e
         struct gf_flock  flock      = {0,};
cb8e9e
         int              i          = 0, ret = 0;
cb8e9e
@@ -1715,6 +1717,7 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
         dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);
cb8e9e
 
cb8e9e
         local = lock_frame->local;
cb8e9e
+        local->lock.reaction = reaction;
cb8e9e
         local->main_frame = frame;
cb8e9e
 
cb8e9e
         local->call_cnt = lk_count;
cb8e9e
@@ -1745,21 +1748,42 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
                           int32_t op_ret, int32_t op_errno, dict_t *xdata)
cb8e9e
 {
cb8e9e
         int          lk_index = 0;
cb8e9e
+        int          i        = 0;
cb8e9e
         dht_local_t *local    = NULL;
cb8e9e
 
cb8e9e
         lk_index = (long) cookie;
cb8e9e
 
cb8e9e
         local = frame->local;
cb8e9e
-
cb8e9e
         if (op_ret == 0) {
cb8e9e
                 local->lock.locks[lk_index]->locked = _gf_true;
cb8e9e
         } else {
cb8e9e
-                local->lock.op_ret = -1;
cb8e9e
-                local->lock.op_errno = op_errno;
cb8e9e
-                goto cleanup;
cb8e9e
+                switch (op_errno) {
cb8e9e
+                case ESTALE:
cb8e9e
+                case ENOENT:
cb8e9e
+                        if (local->lock.reaction != IGNORE_ENOENT_ESTALE) {
cb8e9e
+                                local->lock.op_ret = -1;
cb8e9e
+                                local->lock.op_errno = op_errno;
cb8e9e
+                                goto cleanup;
cb8e9e
+                        }
cb8e9e
+                        break;
cb8e9e
+                default:
cb8e9e
+                        local->lock.op_ret = -1;
cb8e9e
+                        local->lock.op_errno = op_errno;
cb8e9e
+                        goto cleanup;
cb8e9e
+                }
cb8e9e
         }
cb8e9e
 
cb8e9e
         if (lk_index == (local->lock.lk_count - 1)) {
cb8e9e
+                for (i = 0; (i < local->lock.lk_count) &&
cb8e9e
+                     (!local->lock.locks[i]->locked); i++) {
cb8e9e
+                        ;
cb8e9e
+                }
cb8e9e
+
cb8e9e
+                if (i == local->lock.lk_count) {
cb8e9e
+                        local->lock.op_ret = -1;
cb8e9e
+                        local->lock.op_errno = op_errno;
cb8e9e
+                }
cb8e9e
+
cb8e9e
                 dht_inodelk_done (frame);
cb8e9e
         } else {
cb8e9e
                 dht_blocking_inodelk_rec (frame, ++lk_index);
cb8e9e
@@ -1833,7 +1857,8 @@ out:
cb8e9e
 
cb8e9e
 int
cb8e9e
 dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
-                      int lk_count, fop_inodelk_cbk_t inodelk_cbk)
cb8e9e
+                      int lk_count, dht_reaction_type_t reaction,
cb8e9e
+                      fop_inodelk_cbk_t inodelk_cbk)
cb8e9e
 {
cb8e9e
         int           ret        = -1;
cb8e9e
         call_frame_t *lock_frame = NULL;
cb8e9e
@@ -1855,6 +1880,7 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
cb8e9e
         dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);
cb8e9e
 
cb8e9e
         local = lock_frame->local;
cb8e9e
+        local->lock.reaction = reaction;
cb8e9e
         local->main_frame = frame;
cb8e9e
 
cb8e9e
         dht_blocking_inodelk_rec (lock_frame, 0);
cb8e9e
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
cb8e9e
index 320f875..06d7ac8 100644
cb8e9e
--- a/xlators/cluster/dht/src/dht-rename.c
cb8e9e
+++ b/xlators/cluster/dht/src/dht-rename.c
cb8e9e
@@ -1307,7 +1307,7 @@ dht_rename_lock (call_frame_t *frame)
cb8e9e
         local->lock.lk_count = count;
cb8e9e
 
cb8e9e
         ret = dht_nonblocking_inodelk (frame, lk_array, count,
cb8e9e
-                                       dht_rename_lock_cbk);
cb8e9e
+                                       FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);
cb8e9e
         if (ret < 0) {
cb8e9e
                 local->lock.locks = NULL;
cb8e9e
                 local->lock.lk_count = 0;
cb8e9e
diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c
cb8e9e
index cd1d97f..46491cf 100644
cb8e9e
--- a/xlators/cluster/dht/src/dht-selfheal.c
cb8e9e
+++ b/xlators/cluster/dht/src/dht-selfheal.c
cb8e9e
@@ -82,7 +82,7 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 }
cb8e9e
 
cb8e9e
 int
cb8e9e
-dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
cb8e9e
+dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret, int invoke_cbk)
cb8e9e
 {
cb8e9e
         dht_local_t  *local      = NULL, *lock_local = NULL;
cb8e9e
         call_frame_t *lock_frame = NULL;
cb8e9e
@@ -90,7 +90,6 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
cb8e9e
 
cb8e9e
         local = frame->local;
cb8e9e
         lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
cb8e9e
-
cb8e9e
         if (lock_count == 0)
cb8e9e
                 goto done;
cb8e9e
 
cb8e9e
@@ -117,8 +116,9 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
cb8e9e
         lock_frame = NULL;
cb8e9e
 
cb8e9e
 done:
cb8e9e
-        local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
cb8e9e
-                                 local->op_errno, NULL);
cb8e9e
+        if (!invoke_cbk)
cb8e9e
+                local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
cb8e9e
+                                         local->op_errno, NULL);
cb8e9e
         if (lock_frame != NULL) {
cb8e9e
                 DHT_STACK_DESTROY (lock_frame);
cb8e9e
         }
cb8e9e
@@ -160,13 +160,13 @@ dht_refresh_layout_done (call_frame_t *frame)
cb8e9e
 
cb8e9e
                 dht_layout_unref (frame->this, heal);
cb8e9e
 
cb8e9e
-                dht_selfheal_dir_finish (frame, frame->this, 0);
cb8e9e
+                dht_selfheal_dir_finish (frame, frame->this, 0, 0);
cb8e9e
         }
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 err:
cb8e9e
-        dht_selfheal_dir_finish (frame, frame->this, -1);
cb8e9e
+        dht_selfheal_dir_finish (frame, frame->this, -1, 0);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -224,8 +224,9 @@ unlock:
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 err:
cb8e9e
-        local->refresh_layout_unlock (frame, this, -1);
cb8e9e
+        local->refresh_layout_unlock (frame, this, -1, 0);
cb8e9e
 
cb8e9e
+        dht_selfheal_dir_finish (frame, this, -1, 0);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -291,7 +292,8 @@ dht_refresh_layout (call_frame_t *frame)
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 out:
cb8e9e
-        local->refresh_layout_unlock (frame, this, -1);
cb8e9e
+        local->refresh_layout_unlock (frame, this, -1, 0);
cb8e9e
+        dht_selfheal_dir_finish (frame, this, -1, 0);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -319,7 +321,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 err:
cb8e9e
-        dht_selfheal_dir_finish (frame, this, -1);
cb8e9e
+        dht_selfheal_dir_finish (frame, this, -1, 0);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -580,7 +582,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
cb8e9e
         local->lock.locks = lk_array;
cb8e9e
         local->lock.lk_count = count;
cb8e9e
 
cb8e9e
-        ret = dht_blocking_inodelk (frame, lk_array, count,
cb8e9e
+        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
cb8e9e
                                     dht_selfheal_layout_lock_cbk);
cb8e9e
         if (ret < 0) {
cb8e9e
                 local->lock.locks = NULL;
cb8e9e
@@ -591,13 +593,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
         if (lk_array != NULL) {
cb8e9e
-                int tmp_count = 0, i = 0;
cb8e9e
-
cb8e9e
-                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
cb8e9e
-                        ;
cb8e9e
-                }
cb8e9e
-
cb8e9e
-                dht_lock_array_free (lk_array, tmp_count);
cb8e9e
+                dht_lock_array_free (lk_array, count);
cb8e9e
                 GF_FREE (lk_array);
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -636,7 +632,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         this_call_cnt = dht_frame_return (frame);
cb8e9e
 
cb8e9e
         if (is_last_call (this_call_cnt)) {
cb8e9e
-                dht_selfheal_dir_finish (frame, this, 0);
cb8e9e
+                dht_selfheal_dir_finish (frame, this, 0, 0);
cb8e9e
         }
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
@@ -831,7 +827,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)
cb8e9e
                       missing_xattr, loc->path);
cb8e9e
 
cb8e9e
         if (missing_xattr == 0) {
cb8e9e
-                dht_selfheal_dir_finish (frame, this, 0);
cb8e9e
+                dht_selfheal_dir_finish (frame, this, 0, 0);
cb8e9e
                 return 0;
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -958,7 +954,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,
cb8e9e
                       missing_xattr, loc->path);
cb8e9e
 
cb8e9e
         if (missing_xattr == 0) {
cb8e9e
-                dht_selfheal_dir_finish (frame, this, 0);
cb8e9e
+                dht_selfheal_dir_finish (frame, this, 0, 0);
cb8e9e
                 return 0;
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -1026,7 +1022,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
                                                 dht_should_heal_layout);
cb8e9e
 
cb8e9e
                 if (ret < 0) {
cb8e9e
-                        dht_selfheal_dir_finish (frame, this, -1);
cb8e9e
+                        dht_selfheal_dir_finish (frame, this, -1, 0);
cb8e9e
                 }
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -1057,7 +1053,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,
cb8e9e
                                                 dht_should_heal_layout);
cb8e9e
 
cb8e9e
                 if (ret < 0) {
cb8e9e
-                        dht_selfheal_dir_finish (frame, this, -1);
cb8e9e
+                        dht_selfheal_dir_finish (frame, this, -1, 0);
cb8e9e
                 }
cb8e9e
 
cb8e9e
                 return 0;
cb8e9e
@@ -1095,7 +1091,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         dht_layout_t  *layout = NULL;
cb8e9e
         call_frame_t  *prev = NULL;
cb8e9e
         xlator_t      *subvol = NULL;
cb8e9e
-        int            i = 0;
cb8e9e
+        int            i = 0, ret = -1;
cb8e9e
         int            this_call_cnt = 0;
cb8e9e
         char           gfid[GF_UUID_BUF_SIZE] = {0};
cb8e9e
 
cb8e9e
@@ -1114,7 +1110,6 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         }
cb8e9e
 
cb8e9e
         if (op_ret) {
cb8e9e
-
cb8e9e
                 gf_uuid_unparse(local->loc.gfid, gfid);
cb8e9e
                 gf_msg (this->name, ((op_errno == EEXIST) ? GF_LOG_DEBUG :
cb8e9e
                                      GF_LOG_WARNING),
cb8e9e
@@ -1127,11 +1122,13 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         dht_iatt_merge (this, &local->stbuf, stbuf, prev->this);
cb8e9e
         dht_iatt_merge (this, &local->preparent, preparent, prev->this);
cb8e9e
         dht_iatt_merge (this, &local->postparent, postparent, prev->this);
cb8e9e
+        ret = 0;
cb8e9e
 
cb8e9e
 out:
cb8e9e
         this_call_cnt = dht_frame_return (frame);
cb8e9e
 
cb8e9e
         if (is_last_call (this_call_cnt)) {
cb8e9e
+                dht_selfheal_dir_finish (frame, this, ret, -1);
cb8e9e
                 dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -1184,32 +1181,33 @@ out:
cb8e9e
 }
cb8e9e
 
cb8e9e
 int
cb8e9e
-dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
cb8e9e
-                        dht_layout_t *layout, int force)
cb8e9e
+dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
+                                 int32_t op_ret, int32_t op_errno, dict_t *xdata)
cb8e9e
 {
cb8e9e
-        int           missing_dirs = 0;
cb8e9e
+        dht_local_t  *local = NULL;
cb8e9e
         int           i     = 0;
cb8e9e
         int           ret   = -1;
cb8e9e
-        dht_local_t  *local = NULL;
cb8e9e
-        xlator_t     *this = NULL;
cb8e9e
         dict_t       *dict = NULL;
cb8e9e
+        dht_layout_t  *layout = NULL;
cb8e9e
+        loc_t        *loc   = NULL;
cb8e9e
 
cb8e9e
-        local = frame->local;
cb8e9e
-        this = frame->this;
cb8e9e
+        VALIDATE_OR_GOTO (this->private, err);
cb8e9e
 
cb8e9e
-        local->selfheal.force_mkdir = force ? _gf_true : _gf_false;
cb8e9e
+        local = frame->local;
cb8e9e
+        layout = local->layout;
cb8e9e
+        loc    = &local->loc;
cb8e9e
 
cb8e9e
-        for (i = 0; i < layout->cnt; i++) {
cb8e9e
-                if (layout->list[i].err == ENOENT || force)
cb8e9e
-                        missing_dirs++;
cb8e9e
-        }
cb8e9e
+        if (op_ret < 0) {
cb8e9e
+                gf_msg (this->name, GF_LOG_WARNING, op_errno,
cb8e9e
+                        DHT_MSG_INODE_LK_ERROR,
cb8e9e
+                        "acquiring inodelk failed for %s",
cb8e9e
+                        loc->path);
cb8e9e
 
cb8e9e
-        if (missing_dirs == 0) {
cb8e9e
-                dht_selfheal_dir_setattr (frame, loc, &local->stbuf, 0xffffffff, layout);
cb8e9e
-                return 0;
cb8e9e
+                local->op_ret = -1;
cb8e9e
+                local->op_errno = (op_errno == EAGAIN) ? EBUSY : op_errno;
cb8e9e
+                goto err;
cb8e9e
         }
cb8e9e
 
cb8e9e
-        local->call_cnt = missing_dirs;
cb8e9e
         if (!gf_uuid_is_null (local->gfid)) {
cb8e9e
                 dict = dict_new ();
cb8e9e
                 if (!dict)
cb8e9e
@@ -1223,6 +1221,7 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
cb8e9e
                                 " key = gfid-req", loc->path);
cb8e9e
         } else if (local->params) {
cb8e9e
                 /* Send the dictionary from higher layers directly */
cb8e9e
+
cb8e9e
                 dict = dict_ref (local->params);
cb8e9e
         }
cb8e9e
         /* Set acls */
cb8e9e
@@ -1234,8 +1233,18 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
cb8e9e
                         DHT_MSG_DICT_SET_FAILED,
cb8e9e
                         "dict is NULL, need to make sure gfids are same");
cb8e9e
 
cb8e9e
+
cb8e9e
+        /* We don't have to do a lookup here again:
cb8e9e
+            1) Parallel rmdir would had removed the directory and locking would
cb8e9e
+               have anyway failed with an ESTALE on all subvols. Hence selfheal
cb8e9e
+               will never create the directory.
cb8e9e
+            2) Parallel lookup creating directory does not have to be mutually
cb8e9e
+               exclusive for the mkdir phase of lookup selfheal.
cb8e9e
+        */
cb8e9e
+
cb8e9e
         for (i = 0; i < layout->cnt; i++) {
cb8e9e
-                if (layout->list[i].err == ENOENT || force) {
cb8e9e
+                if (layout->list[i].err == ENOENT ||
cb8e9e
+                    local->selfheal.force_mkdir) {
cb8e9e
                         gf_msg_debug (this->name, 0,
cb8e9e
                                       "Creating directory %s on subvol %s",
cb8e9e
                                       loc->path, layout->list[i].xlator->name);
cb8e9e
@@ -1254,6 +1263,82 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
cb8e9e
                 dict_unref (dict);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
+
cb8e9e
+err:
cb8e9e
+        dht_selfheal_dir_finish (frame, this, -1, 0);
cb8e9e
+        return 0;
cb8e9e
+}
cb8e9e
+
cb8e9e
+int
cb8e9e
+dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
cb8e9e
+                        dht_layout_t *layout, int force)
cb8e9e
+{
cb8e9e
+        int           missing_dirs = 0;
cb8e9e
+        int           i     = 0;
cb8e9e
+        int           ret   = -1;
cb8e9e
+        int           count = 1;
cb8e9e
+        dht_local_t  *local = NULL;
cb8e9e
+        dht_conf_t   *conf  = NULL;
cb8e9e
+        xlator_t     *this = NULL;
cb8e9e
+        dht_lock_t   **lk_array = NULL;
cb8e9e
+
cb8e9e
+        local = frame->local;
cb8e9e
+        this = frame->this;
cb8e9e
+        conf = this->private;
cb8e9e
+
cb8e9e
+        local->selfheal.force_mkdir = force ? _gf_true : _gf_false;
cb8e9e
+
cb8e9e
+        for (i = 0; i < layout->cnt; i++) {
cb8e9e
+                if (layout->list[i].err == ENOENT || force)
cb8e9e
+                        missing_dirs++;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        if (missing_dirs == 0) {
cb8e9e
+                dht_selfheal_dir_setattr (frame, loc, &local->stbuf,
cb8e9e
+                                          0xffffffff, layout);
cb8e9e
+                return 0;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        local->call_cnt = missing_dirs;
cb8e9e
+        count = conf->subvolume_cnt;
cb8e9e
+
cb8e9e
+        /* Locking on all subvols in the mkdir phase of lookup selfheal is
cb8e9e
+           is done to synchronize with rmdir/rename.
cb8e9e
+        */
cb8e9e
+        lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
cb8e9e
+        if (lk_array == NULL)
cb8e9e
+                goto err;
cb8e9e
+
cb8e9e
+        for (i = 0; i < count; i++) {
cb8e9e
+                lk_array[i] = dht_lock_new (frame->this,
cb8e9e
+                                            conf->subvolumes[i],
cb8e9e
+                                            &local->loc, F_WRLCK,
cb8e9e
+                                            DHT_LAYOUT_HEAL_DOMAIN);
cb8e9e
+                if (lk_array[i] == NULL)
cb8e9e
+                        goto err;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        local->lock.locks = lk_array;
cb8e9e
+        local->lock.lk_count = count;
cb8e9e
+
cb8e9e
+        ret = dht_blocking_inodelk (frame, lk_array, count,
cb8e9e
+                                    IGNORE_ENOENT_ESTALE,
cb8e9e
+                                    dht_selfheal_dir_mkdir_lock_cbk);
cb8e9e
+
cb8e9e
+        if (ret < 0) {
cb8e9e
+                local->lock.locks = NULL;
cb8e9e
+                local->lock.lk_count = 0;
cb8e9e
+                goto err;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        return 0;
cb8e9e
+err:
cb8e9e
+        if (lk_array != NULL) {
cb8e9e
+                dht_lock_array_free (lk_array, count);
cb8e9e
+                GF_FREE (lk_array);
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        return -1;
cb8e9e
 }
cb8e9e
 
cb8e9e
 int
cb8e9e
@@ -1825,7 +1910,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,
cb8e9e
 
cb8e9e
 sorry_no_fix:
cb8e9e
         /* TODO: need to put appropriate local->op_errno */
cb8e9e
-        dht_selfheal_dir_finish (frame, this, ret);
cb8e9e
+        dht_selfheal_dir_finish (frame, this, ret, 0);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
@@ -1893,7 +1978,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,
cb8e9e
 
cb8e9e
 sorry_no_fix:
cb8e9e
         /* TODO: need to put appropriate local->op_errno */
cb8e9e
-        dht_selfheal_dir_finish (frame, this, ret);
cb8e9e
+        dht_selfheal_dir_finish (frame, this, ret, 0);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
@@ -2244,7 +2329,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
cb8e9e
         local->lock.locks = lk_array;
cb8e9e
         local->lock.lk_count = count;
cb8e9e
 
cb8e9e
-        ret = dht_blocking_inodelk (frame, lk_array, count,
cb8e9e
+        ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
cb8e9e
                                     dht_update_commit_hash_for_layout_resume);
cb8e9e
         if (ret < 0) {
cb8e9e
                 local->lock.locks = NULL;
cb8e9e
@@ -2255,13 +2340,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
cb8e9e
         return 0;
cb8e9e
 err:
cb8e9e
         if (lk_array != NULL) {
cb8e9e
-                int tmp_count = 0, i = 0;
cb8e9e
-
cb8e9e
-                for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
cb8e9e
-                        ;
cb8e9e
-                }
cb8e9e
-
cb8e9e
-                dht_lock_array_free (lk_array, tmp_count);
cb8e9e
+                dht_lock_array_free (lk_array, count);
cb8e9e
                 GF_FREE (lk_array);
cb8e9e
         }
cb8e9e
 
cb8e9e
-- 
cb8e9e
1.7.1
cb8e9e