From a63f1839e93e6d7cf19b2f86b6d0601d54ca703c Mon Sep 17 00:00:00 2001
From: Sakshi <sabansal@redhat.com>
Date: Thu, 16 Jul 2015 14:31:03 +0530
Subject: [PATCH 49/80] dht: lock on subvols to prevent lookup vs rmdir race
There is a possibility that while an rmdir is completed on
some non-hashed subvol and proceeding to others, a lookup
selfheal can recreate the same directory on those subvols
for which the rmdir had succeeded. Now the deletion of the
parent directory will fail with an ENOTEMPTY.
To fix this take blocking inodelk on the subvols before
starting rmdir. Selfheal must also take blocking inodelk
before creating the entry.
BUG: 1115367
Change-Id: If8d2ac06d8fb05b968d864e979ec21d3e7a38f22
Signed-off-by: Sakshi <sabansal@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/71479
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
---
xlators/cluster/dht/src/dht-common.c | 192 +++++++++++++++++++---
xlators/cluster/dht/src/dht-common.h | 11 +-
xlators/cluster/dht/src/dht-helper.c | 33 +++-
xlators/cluster/dht/src/dht-rename.c | 2 +-
xlators/cluster/dht/src/dht-selfheal.c | 285 ++++++++++++++++++++++++++------
5 files changed, 435 insertions(+), 88 deletions(-)
diff --git a/xlators/cluster/dht/src/dht-common.c b/xlators/cluster/dht/src/dht-common.c
index 7755eba..36244e7 100644
--- a/xlators/cluster/dht/src/dht-common.c
+++ b/xlators/cluster/dht/src/dht-common.c
@@ -60,6 +60,9 @@ int32_t dht_set_fixed_dir_stat (struct iatt *stat)
int
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this);
+
+int
dht_aggregate_quota_xattr (dict_t *dst, char *key, data_t *value)
{
int ret = -1;
@@ -4673,6 +4676,10 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
if (IA_ISINVAL(orig_entry->d_stat.ia_type)) {
/*stat failed somewhere- ignore this entry*/
+ gf_msg_debug (this->name, EINVAL,
+ "Invalid stat, ignoring entry "
+ "%s gfid %s", orig_entry->d_name,
+ uuid_utoa (orig_entry->d_stat.ia_gfid));
continue;
}
@@ -4685,7 +4692,6 @@ dht_readdirp_cbk (call_frame_t *frame, void *cookie, xlator_t *this, int op_ret,
* corresponding hashed subvolume will take care of the
* directory entry.
*/
-
if (readdir_optimize) {
if (prev->this == local->first_up_subvol)
goto list;
@@ -5203,7 +5209,7 @@ out:
if (local && local->lock.locks) {
/* store op_errno for failure case*/
local->op_errno = op_errno;
- local->refresh_layout_unlock (frame, this, op_ret);
+ local->refresh_layout_unlock (frame, this, op_ret, 1);
if (op_ret == 0) {
DHT_STACK_UNWIND (mknod, frame, op_ret, op_errno,
@@ -5261,7 +5267,7 @@ dht_mknod_linkfile_create_cbk (call_frame_t *frame, void *cookie,
return 0;
err:
if (local && local->lock.locks) {
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
} else {
DHT_STACK_UNWIND (mknod, frame, -1,
op_errno, NULL, NULL, NULL,
@@ -5369,7 +5375,7 @@ dht_mknod_do (call_frame_t *frame)
local->umask, local->params);
return 0;
err:
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
return 0;
}
@@ -5384,7 +5390,8 @@ dht_mknod_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
-dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret)
+dht_mknod_finish (call_frame_t *frame, xlator_t *this, int op_ret,
+ int invoke_cbk)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@@ -5459,7 +5466,7 @@ dht_mknod_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- dht_mknod_finish (frame, this, -1);
+ dht_mknod_finish (frame, this, -1, 0);
return 0;
}
@@ -5490,7 +5497,7 @@ dht_mknod_lock (call_frame_t *frame, xlator_t *subvol)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
- dht_mknod_lock_cbk);
+ IGNORE_ENOENT_ESTALE, dht_mknod_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@@ -6030,7 +6037,7 @@ out:
if (local && local->lock.locks) {
/* store op_errno for failure case*/
local->op_errno = op_errno;
- local->refresh_layout_unlock (frame, this, op_ret);
+ local->refresh_layout_unlock (frame, this, op_ret, 1);
if (op_ret == 0) {
DHT_STACK_UNWIND (create, frame, op_ret, op_errno, fd,
@@ -6089,7 +6096,7 @@ dht_create_linkfile_create_cbk (call_frame_t *frame, void *cookie,
err:
if (local && local->lock.locks) {
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
} else {
DHT_STACK_UNWIND (create, frame, -1,
op_errno, NULL, NULL, NULL,
@@ -6256,7 +6263,7 @@ dht_create_do (call_frame_t *frame)
local->umask, local->fd, local->params);
return 0;
err:
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
return 0;
}
@@ -6270,7 +6277,8 @@ dht_create_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int32_t
-dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret)
+dht_create_finish (call_frame_t *frame, xlator_t *this, int op_ret,
+ int invoke_cbk)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@@ -6345,7 +6353,7 @@ dht_create_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- dht_create_finish (frame, this, -1);
+ dht_create_finish (frame, this, -1, 0);
return 0;
}
@@ -6376,7 +6384,7 @@ dht_create_lock (call_frame_t *frame, xlator_t *subvol)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
- dht_create_lock_cbk);
+ IGNORE_ENOENT_ESTALE, dht_create_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@@ -6800,8 +6808,8 @@ dht_rmdir_selfheal_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int
dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
- int op_ret, int op_errno, struct iatt *preparent,
- struct iatt *postparent, dict_t *xdata)
+ int op_ret, int op_errno, struct iatt *preparent,
+ struct iatt *postparent, dict_t *xdata)
{
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
@@ -6821,7 +6829,8 @@ dht_rmdir_hashed_subvol_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
local->op_errno = op_errno;
local->op_ret = -1;
if (conf->subvolume_cnt != 1) {
- if (op_errno != ENOENT && op_errno != EACCES) {
+ if (op_errno != ENOENT && op_errno != EACCES
+ && op_errno != ESTALE) {
local->need_selfheal = 1;
}
}
@@ -6845,6 +6854,7 @@ unlock:
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
if (local->need_selfheal) {
+ dht_rmdir_unlock (frame, this);
local->layout =
dht_layout_get (this, local->loc.inode);
@@ -6871,6 +6881,7 @@ unlock:
dht_set_fixed_dir_stat (&local->preparent);
dht_set_fixed_dir_stat (&local->postparent);
+ dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
local->op_errno, &local->preparent,
&local->postparent, NULL);
@@ -6939,6 +6950,7 @@ unlock:
if (done) {
if (local->need_selfheal && local->fop_succeeded) {
+ dht_rmdir_unlock (frame, this);
local->layout =
dht_layout_get (this, local->loc.inode);
@@ -6976,6 +6988,7 @@ unlock:
dht_set_fixed_dir_stat (&local->preparent);
dht_set_fixed_dir_stat (&local->postparent);
+ dht_rmdir_unlock (frame, this);
DHT_STACK_UNWIND (rmdir, frame, local->op_ret,
local->op_errno, &local->preparent,
&local->postparent, NULL);
@@ -6987,11 +7000,110 @@ unlock:
int
+dht_rmdir_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ DHT_STACK_DESTROY (frame);
+ return 0;
+}
+
+
+int
+dht_rmdir_unlock (call_frame_t *frame, xlator_t *this)
+{
+ dht_local_t *local = NULL, *lock_local = NULL;
+ call_frame_t *lock_frame = NULL;
+ int lock_count = 0;
+
+ local = frame->local;
+ lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
+
+ if (lock_count == 0)
+ goto done;
+
+ lock_frame = copy_frame (frame);
+ if (lock_frame == NULL)
+ goto done;
+
+ lock_local = dht_local_init (lock_frame, &local->loc, NULL,
+ lock_frame->root->op);
+ if (lock_local == NULL)
+ goto done;
+
+ lock_local->lock.locks = local->lock.locks;
+ lock_local->lock.lk_count = local->lock.lk_count;
+
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+ dht_unlock_inodelk (lock_frame, lock_local->lock.locks,
+ lock_local->lock.lk_count,
+ dht_rmdir_unlock_cbk);
+ lock_frame = NULL;
+
+done:
+ if (lock_frame != NULL) {
+ DHT_STACK_DESTROY (lock_frame);
+ }
+
+ return 0;
+}
+
+
+int
+dht_rmdir_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+ int32_t op_ret, int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ int i = 0;
+
+ VALIDATE_OR_GOTO (this->private, err);
+
+ conf = this->private;
+ local = frame->local;
+
+ if (op_ret < 0) {
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_INODE_LK_ERROR,
+ "acquiring inodelk failed rmdir for %s)",
+ local->loc.path);
+
+ local->op_ret = -1;
+ local->op_errno = op_errno;
+ goto err;
+ }
+
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ if (local->hashed_subvol &&
+ (local->hashed_subvol == conf->subvolumes[i]))
+ continue;
+
+ STACK_WIND (frame, dht_rmdir_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->rmdir,
+ &local->loc, local->flags, NULL);
+ }
+
+ return 0;
+
+err:
+ /* No harm in calling an extra rmdir unlock */
+ dht_rmdir_unlock (frame, this);
+ DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
+ &local->preparent, &local->postparent, NULL);
+
+ return 0;
+}
+
+
+int
dht_rmdir_do (call_frame_t *frame, xlator_t *this)
{
dht_local_t *local = NULL;
dht_conf_t *conf = NULL;
- int i = 0;
+ dht_lock_t **lk_array = NULL;
+ int i = 0, ret = -1;
+ int count = 1;
xlator_t *hashed_subvol = NULL;
char gfid[GF_UUID_BUF_SIZE] ={0};
@@ -7005,7 +7117,6 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
local->call_cnt = conf->subvolume_cnt;
-
/* first remove from non-hashed_subvol */
hashed_subvol = dht_subvol_get_hashed (this, &local->loc);
@@ -7029,15 +7140,39 @@ dht_rmdir_do (call_frame_t *frame, xlator_t *this)
return 0;
}
- for (i = 0; i < conf->subvolume_cnt; i++) {
- if (hashed_subvol &&
- (hashed_subvol == conf->subvolumes[i]))
- continue;
+ count = conf->subvolume_cnt;
- STACK_WIND (frame, dht_rmdir_cbk,
- conf->subvolumes[i],
- conf->subvolumes[i]->fops->rmdir,
- &local->loc, local->flags, NULL);
+ lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
+ if (lk_array == NULL) {
+ local->op_ret = -1;
+ local->op_errno = ENOMEM;
+ goto err;
+ }
+
+ for (i = 0; i < count; i++) {
+ lk_array[i] = dht_lock_new (frame->this,
+ conf->subvolumes[i],
+ &local->loc, F_WRLCK,
+ DHT_LAYOUT_HEAL_DOMAIN);
+ if (lk_array[i] == NULL) {
+ local->op_ret = -1;
+ local->op_errno = EINVAL;
+ goto err;
+ }
+ }
+
+ local->lock.locks = lk_array;
+ local->lock.lk_count = count;
+
+ ret = dht_blocking_inodelk (frame, lk_array, count,
+ IGNORE_ENOENT_ESTALE,
+ dht_rmdir_lock_cbk);
+ if (ret < 0) {
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+ local->op_ret = -1;
+ local->op_errno = errno ? errno : EINVAL;
+ goto err;
}
return 0;
@@ -7046,6 +7181,11 @@ err:
dht_set_fixed_dir_stat (&local->preparent);
dht_set_fixed_dir_stat (&local->postparent);
+ if (lk_array != NULL) {
+ dht_lock_array_free (lk_array, count);
+ GF_FREE (lk_array);
+ }
+
DHT_STACK_UNWIND (rmdir, frame, local->op_ret, local->op_errno,
&local->preparent, &local->postparent, NULL);
return 0;
diff --git a/xlators/cluster/dht/src/dht-common.h b/xlators/cluster/dht/src/dht-common.h
index edfb805..d06224c 100644
--- a/xlators/cluster/dht/src/dht-common.h
+++ b/xlators/cluster/dht/src/dht-common.h
@@ -50,7 +50,7 @@ typedef int (*dht_defrag_cbk_fn_t) (xlator_t *this, xlator_t *dst_node,
call_frame_t *frame, int ret);
typedef int (*dht_refresh_layout_unlock) (call_frame_t *frame, xlator_t *this,
- int op_ret);
+ int op_ret, int invoke_cbk);
typedef int (*dht_refresh_layout_done_handle) (call_frame_t *frame);
@@ -145,6 +145,11 @@ typedef enum {
qdstatfs_action_COMPARE,
} qdstatfs_action_t;
+typedef enum {
+ FAIL_ON_ANY_ERROR,
+ IGNORE_ENOENT_ESTALE
+} dht_reaction_type_t;
+
struct dht_skip_linkto_unlink {
gf_boolean_t handle_valid_link;
@@ -275,6 +280,7 @@ struct dht_local {
fop_inodelk_cbk_t inodelk_cbk;
dht_lock_t **locks;
int lk_count;
+ dht_reaction_type_t reaction;
/* whether locking failed on _any_ of the "locks" above */
int op_ret;
@@ -1132,7 +1138,8 @@ dht_nonblocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
*/
int
dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
- int lk_count, fop_inodelk_cbk_t inodelk_cbk);
+ int lk_count, dht_reaction_type_t reaction,
+ fop_inodelk_cbk_t inodelk_cbk);
int32_t
dht_unlock_inodelk (call_frame_t *frame, dht_lock_t **lk_array, int lk_count,
diff --git a/xlators/cluster/dht/src/dht-helper.c b/xlators/cluster/dht/src/dht-helper.c
index df31cdb..881db81 100644
--- a/xlators/cluster/dht/src/dht-helper.c
+++ b/xlators/cluster/dht/src/dht-helper.c
@@ -496,6 +496,7 @@ dht_lock_new (xlator_t *this, xlator_t *xl, loc_t *loc, short type,
lock->xl = xl;
lock->type = type;
+
lock->domain = gf_strdup (domain);
if (lock->domain == NULL) {
dht_lock_free (lock);
@@ -1978,21 +1979,41 @@ dht_blocking_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
int32_t op_ret, int32_t op_errno, dict_t *xdata)
{
int lk_index = 0;
+ int i = 0;
dht_local_t *local = NULL;
lk_index = (long) cookie;
local = frame->local;
-
if (op_ret == 0) {
local->lock.locks[lk_index]->locked = _gf_true;
} else {
- local->lock.op_ret = -1;
- local->lock.op_errno = op_errno;
- goto cleanup;
+ switch (op_errno) {
+ case ESTALE:
+ case ENOENT:
+ if (local->lock.reaction != IGNORE_ENOENT_ESTALE) {
+ local->lock.op_ret = -1;
+ local->lock.op_errno = op_errno;
+ goto cleanup;
+ }
+ break;
+ default:
+ local->lock.op_ret = -1;
+ local->lock.op_errno = op_errno;
+ goto cleanup;
+ }
}
if (lk_index == (local->lock.lk_count - 1)) {
+ for (i = 0; (i < local->lock.lk_count) &&
+ (!local->lock.locks[i]->locked); i++)
+ ;
+
+ if (i == local->lock.lk_count) {
+ local->lock.op_ret = -1;
+ local->lock.op_errno = op_errno;
+ }
+
dht_inodelk_done (frame);
} else {
dht_blocking_inodelk_rec (frame, ++lk_index);
@@ -2066,7 +2087,8 @@ out:
int
dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
- int lk_count, fop_inodelk_cbk_t inodelk_cbk)
+ int lk_count, dht_reaction_type_t reaction,
+ fop_inodelk_cbk_t inodelk_cbk)
{
int ret = -1;
call_frame_t *lock_frame = NULL;
@@ -2088,6 +2110,7 @@ dht_blocking_inodelk (call_frame_t *frame, dht_lock_t **lk_array,
dht_set_lkowner (lk_array, lk_count, &lock_frame->root->lk_owner);
local = lock_frame->local;
+ local->lock.reaction = reaction;
local->main_frame = frame;
dht_blocking_inodelk_rec (lock_frame, 0);
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
index 79b8706..3b636c5 100644
--- a/xlators/cluster/dht/src/dht-rename.c
+++ b/xlators/cluster/dht/src/dht-rename.c
@@ -1320,7 +1320,7 @@ dht_rename_lock (call_frame_t *frame)
local->lock.lk_count = count;
ret = dht_blocking_inodelk (frame, lk_array, count,
- dht_rename_lock_cbk);
+ FAIL_ON_ANY_ERROR, dht_rename_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
local->lock.lk_count = 0;
diff --git a/xlators/cluster/dht/src/dht-selfheal.c b/xlators/cluster/dht/src/dht-selfheal.c
index fd55303..307116a 100644
--- a/xlators/cluster/dht/src/dht-selfheal.c
+++ b/xlators/cluster/dht/src/dht-selfheal.c
@@ -82,7 +82,8 @@ dht_selfheal_unlock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
}
int
-dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
+dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret,
+ int invoke_cbk)
{
dht_local_t *local = NULL, *lock_local = NULL;
call_frame_t *lock_frame = NULL;
@@ -90,7 +91,6 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
local = frame->local;
lock_count = dht_lock_count (local->lock.locks, local->lock.lk_count);
-
if (lock_count == 0)
goto done;
@@ -117,8 +117,9 @@ dht_selfheal_dir_finish (call_frame_t *frame, xlator_t *this, int ret)
lock_frame = NULL;
done:
- local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
- local->op_errno, NULL);
+ if (invoke_cbk)
+ local->selfheal.dir_cbk (frame, NULL, frame->this, ret,
+ local->op_errno, NULL);
if (lock_frame != NULL) {
DHT_STACK_DESTROY (lock_frame);
}
@@ -160,13 +161,13 @@ dht_refresh_layout_done (call_frame_t *frame)
dht_layout_unref (frame->this, heal);
- dht_selfheal_dir_finish (frame, frame->this, 0);
+ dht_selfheal_dir_finish (frame, frame->this, 0, 1);
}
return 0;
err:
- dht_selfheal_dir_finish (frame, frame->this, -1);
+ dht_selfheal_dir_finish (frame, frame->this, -1, 1);
return 0;
}
@@ -226,8 +227,7 @@ unlock:
return 0;
err:
- local->refresh_layout_unlock (frame, this, -1);
-
+ local->refresh_layout_unlock (frame, this, -1, 1);
return 0;
}
@@ -293,7 +293,7 @@ dht_refresh_layout (call_frame_t *frame)
return 0;
out:
- local->refresh_layout_unlock (frame, this, -1);
+ local->refresh_layout_unlock (frame, this, -1, 1);
return 0;
}
@@ -322,7 +322,7 @@ dht_selfheal_layout_lock_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
return 0;
err:
- dht_selfheal_dir_finish (frame, this, -1);
+ dht_selfheal_dir_finish (frame, this, -1, 1);
return 0;
}
@@ -583,7 +583,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
local->lock.locks = lk_array;
local->lock.lk_count = count;
- ret = dht_blocking_inodelk (frame, lk_array, count,
+ ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
dht_selfheal_layout_lock_cbk);
if (ret < 0) {
local->lock.locks = NULL;
@@ -594,13 +594,7 @@ dht_selfheal_layout_lock (call_frame_t *frame, dht_layout_t *layout,
return 0;
err:
if (lk_array != NULL) {
- int tmp_count = 0, i = 0;
-
- for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
- ;
- }
-
- dht_lock_array_free (lk_array, tmp_count);
+ dht_lock_array_free (lk_array, count);
GF_FREE (lk_array);
}
@@ -653,7 +647,7 @@ dht_selfheal_dir_xattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
- dht_selfheal_dir_finish (frame, this, 0);
+ dht_selfheal_dir_finish (frame, this, 0, 1);
}
return 0;
@@ -886,7 +880,7 @@ dht_selfheal_dir_xattr (call_frame_t *frame, loc_t *loc, dht_layout_t *layout)
missing_xattr, loc->path);
if (missing_xattr == 0) {
- dht_selfheal_dir_finish (frame, this, 0);
+ dht_selfheal_dir_finish (frame, this, 0, 1);
return 0;
}
@@ -1013,7 +1007,7 @@ dht_selfheal_dir_xattr_for_nameless_lookup (call_frame_t *frame, loc_t *loc,
missing_xattr, loc->path);
if (missing_xattr == 0) {
- dht_selfheal_dir_finish (frame, this, 0);
+ dht_selfheal_dir_finish (frame, this, 0, 1);
return 0;
}
@@ -1081,7 +1075,7 @@ dht_selfheal_dir_setattr_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_should_heal_layout);
if (ret < 0) {
- dht_selfheal_dir_finish (frame, this, -1);
+ dht_selfheal_dir_finish (frame, this, -1, 1);
}
}
@@ -1112,7 +1106,7 @@ dht_selfheal_dir_setattr (call_frame_t *frame, loc_t *loc, struct iatt *stbuf,
dht_should_heal_layout);
if (ret < 0) {
- dht_selfheal_dir_finish (frame, this, -1);
+ dht_selfheal_dir_finish (frame, this, -1, 1);
}
return 0;
@@ -1150,7 +1144,7 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_layout_t *layout = NULL;
call_frame_t *prev = NULL;
xlator_t *subvol = NULL;
- int i = 0;
+ int i = 0, ret = -1;
int this_call_cnt = 0;
char gfid[GF_UUID_BUF_SIZE] = {0};
@@ -1182,11 +1176,13 @@ dht_selfheal_dir_mkdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
dht_iatt_merge (this, &local->stbuf, stbuf, prev->this);
dht_iatt_merge (this, &local->preparent, preparent, prev->this);
dht_iatt_merge (this, &local->postparent, postparent, prev->this);
+ ret = 0;
out:
this_call_cnt = dht_frame_return (frame);
if (is_last_call (this_call_cnt)) {
+ dht_selfheal_dir_finish (frame, this, ret, 0);
dht_selfheal_dir_setattr (frame, &local->loc, &local->stbuf, 0xffffff, layout);
}
@@ -1239,32 +1235,21 @@ out:
}
int
-dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
- dht_layout_t *layout, int force)
+dht_selfheal_dir_mkdir_lookup_done (call_frame_t *frame, xlator_t *this)
{
- int missing_dirs = 0;
+ dht_local_t *local = NULL;
int i = 0;
int ret = -1;
- dht_local_t *local = NULL;
- xlator_t *this = NULL;
dict_t *dict = NULL;
+ dht_layout_t *layout = NULL;
+ loc_t *loc = NULL;
- local = frame->local;
- this = frame->this;
-
- local->selfheal.force_mkdir = force ? _gf_true : _gf_false;
-
- for (i = 0; i < layout->cnt; i++) {
- if (layout->list[i].err == ENOENT || force)
- missing_dirs++;
- }
+ VALIDATE_OR_GOTO (this->private, err);
- if (missing_dirs == 0) {
- dht_selfheal_dir_setattr (frame, loc, &local->stbuf, 0xffffffff, layout);
- return 0;
- }
+ local = frame->local;
+ layout = local->layout;
+ loc = &local->loc;
- local->call_cnt = missing_dirs;
if (!gf_uuid_is_null (local->gfid)) {
dict = dict_new ();
if (!dict)
@@ -1278,6 +1263,7 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
" key = gfid-req", loc->path);
} else if (local->params) {
/* Send the dictionary from higher layers directly */
+
dict = dict_ref (local->params);
}
/* Set acls */
@@ -1290,7 +1276,8 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
"dict is NULL, need to make sure gfids are same");
for (i = 0; i < layout->cnt; i++) {
- if (layout->list[i].err == ENOENT || force) {
+ if (layout->list[i].err == ENOENT ||
+ local->selfheal.force_mkdir) {
gf_msg_debug (this->name, 0,
"Creating directory %s on subvol %s",
loc->path, layout->list[i].xlator->name);
@@ -1309,6 +1296,202 @@ dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
dict_unref (dict);
return 0;
+
+err:
+ dht_selfheal_dir_finish (frame, this, -1, 1);
+ return 0;
+}
+
+int
+dht_selfheal_dir_mkdir_lookup_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int op_ret, int op_errno,
+ inode_t *inode, struct iatt *stbuf,
+ dict_t *xattr, struct iatt *postparent)
+{
+ dht_local_t *local = NULL;
+ int i = 0;
+ int this_call_cnt = 0;
+ int missing_dirs = 0;
+ dht_layout_t *layout = NULL;
+ loc_t *loc = NULL;
+
+ VALIDATE_OR_GOTO (this->private, err);
+
+ local = frame->local;
+ layout = local->layout;
+ loc = &local->loc;
+
+ this_call_cnt = dht_frame_return (frame);
+
+ LOCK (&frame->lock);
+ {
+ if ((op_ret < 0) && (op_errno == ENOENT || op_errno == ESTALE))
+ local->selfheal.hole_cnt = !local->selfheal.hole_cnt ? 1
+ : local->selfheal.hole_cnt + 1;
+ }
+ UNLOCK (&frame->lock);
+
+ if (is_last_call (this_call_cnt)) {
+ if (local->selfheal.hole_cnt == layout->cnt) {
+ gf_msg_debug (this->name, op_errno,
+ "Lookup failed, an rmdir could have "
+ "deleted this entry %s", loc->name);
+ local->op_errno = op_errno;
+ goto err;
+ } else {
+ for (i = 0; i < layout->cnt; i++) {
+ if (layout->list[i].err == ENOENT ||
+ layout->list[i].err == ESTALE ||
+ local->selfheal.force_mkdir)
+ missing_dirs++;
+ }
+
+ if (missing_dirs == 0) {
+ dht_selfheal_dir_finish (frame, this, 0, 0);
+ dht_selfheal_dir_setattr (frame, loc,
+ &local->stbuf,
+ 0xffffffff, layout);
+ return 0;
+ }
+
+ local->call_cnt = missing_dirs;
+ dht_selfheal_dir_mkdir_lookup_done (frame, this);
+ }
+ }
+
+ return 0;
+
+err:
+ dht_selfheal_dir_finish (frame, this, -1, 1);
+ return 0;
+}
+
+
+int
+dht_selfheal_dir_mkdir_lock_cbk (call_frame_t *frame, void *cookie,
+ xlator_t *this, int32_t op_ret,
+ int32_t op_errno, dict_t *xdata)
+{
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ int i = 0;
+
+ VALIDATE_OR_GOTO (this->private, err);
+
+ conf = this->private;
+ local = frame->local;
+
+ local->call_cnt = conf->subvolume_cnt;
+
+ if (op_ret < 0) {
+
+ /* We get this error when the directory entry was not created
+ * on a newky attatched tier subvol. Hence proceed and do mkdir
+ * on the tier subvol.
+ */
+ if (op_errno == EINVAL) {
+ local->call_cnt = 1;
+ dht_selfheal_dir_mkdir_lookup_done (frame, this);
+ return 0;
+ }
+
+ gf_msg (this->name, GF_LOG_WARNING, op_errno,
+ DHT_MSG_INODE_LK_ERROR,
+ "acquiring inodelk failed for %s",
+ local->loc.path);
+
+ local->op_errno = op_errno;
+ goto err;
+ }
+
+ /* After getting locks, perform lookup again to ensure that the
+ directory was not deleted by a racing rmdir
+ */
+
+ for (i = 0; i < conf->subvolume_cnt; i++) {
+ STACK_WIND (frame, dht_selfheal_dir_mkdir_lookup_cbk,
+ conf->subvolumes[i],
+ conf->subvolumes[i]->fops->lookup,
+ &local->loc, NULL);
+ }
+
+ return 0;
+
+err:
+ dht_selfheal_dir_finish (frame, this, -1, 1);
+ return 0;
+}
+
+int
+dht_selfheal_dir_mkdir (call_frame_t *frame, loc_t *loc,
+ dht_layout_t *layout, int force)
+{
+ int missing_dirs = 0;
+ int i = 0;
+ int ret = -1;
+ int count = 1;
+ dht_local_t *local = NULL;
+ dht_conf_t *conf = NULL;
+ xlator_t *this = NULL;
+ dht_lock_t **lk_array = NULL;
+
+ local = frame->local;
+ this = frame->this;
+ conf = this->private;
+
+ local->selfheal.force_mkdir = force;
+ local->selfheal.hole_cnt = 0;
+
+ for (i = 0; i < layout->cnt; i++) {
+ if (layout->list[i].err == ENOENT || force)
+ missing_dirs++;
+ }
+
+ if (missing_dirs == 0) {
+ dht_selfheal_dir_setattr (frame, loc, &local->stbuf,
+ 0xffffffff, layout);
+ return 0;
+ }
+
+ count = conf->subvolume_cnt;
+
+ /* Locking on all subvols in the mkdir phase of lookup selfheal is
+ is done to synchronize with rmdir/rename.
+ */
+ lk_array = GF_CALLOC (count, sizeof (*lk_array), gf_common_mt_char);
+ if (lk_array == NULL)
+ goto err;
+
+ for (i = 0; i < count; i++) {
+ lk_array[i] = dht_lock_new (frame->this,
+ conf->subvolumes[i],
+ &local->loc, F_WRLCK,
+ DHT_LAYOUT_HEAL_DOMAIN);
+ if (lk_array[i] == NULL)
+ goto err;
+ }
+
+ local->lock.locks = lk_array;
+ local->lock.lk_count = count;
+
+ ret = dht_blocking_inodelk (frame, lk_array, count,
+ IGNORE_ENOENT_ESTALE,
+ dht_selfheal_dir_mkdir_lock_cbk);
+
+ if (ret < 0) {
+ local->lock.locks = NULL;
+ local->lock.lk_count = 0;
+ goto err;
+ }
+
+ return 0;
+err:
+ if (lk_array != NULL) {
+ dht_lock_array_free (lk_array, count);
+ GF_FREE (lk_array);
+ }
+
+ return -1;
}
int
@@ -1882,7 +2065,7 @@ dht_selfheal_directory (call_frame_t *frame, dht_selfheal_dir_cbk_t dir_cbk,
sorry_no_fix:
/* TODO: need to put appropriate local->op_errno */
- dht_selfheal_dir_finish (frame, this, ret);
+ dht_selfheal_dir_finish (frame, this, ret, 1);
return 0;
}
@@ -1950,7 +2133,7 @@ dht_selfheal_directory_for_nameless_lookup (call_frame_t *frame,
sorry_no_fix:
/* TODO: need to put appropriate local->op_errno */
- dht_selfheal_dir_finish (frame, this, ret);
+ dht_selfheal_dir_finish (frame, this, ret, 1);
return 0;
@@ -2301,7 +2484,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
local->lock.locks = lk_array;
local->lock.lk_count = count;
- ret = dht_blocking_inodelk (frame, lk_array, count,
+ ret = dht_blocking_inodelk (frame, lk_array, count, FAIL_ON_ANY_ERROR,
dht_update_commit_hash_for_layout_resume);
if (ret < 0) {
local->lock.locks = NULL;
@@ -2312,13 +2495,7 @@ dht_update_commit_hash_for_layout (call_frame_t *frame)
return 0;
err:
if (lk_array != NULL) {
- int tmp_count = 0, i = 0;
-
- for (i = 0; (i < count) && (lk_array[i]); i++, tmp_count++) {
- ;
- }
-
- dht_lock_array_free (lk_array, tmp_count);
+ dht_lock_array_free (lk_array, count);
GF_FREE (lk_array);
}
--
1.7.1