From 753f881856efa3d55ea16dd5b088423da683c7f6 Mon Sep 17 00:00:00 2001 From: vmallika Date: Thu, 13 Aug 2015 14:14:41 +0530 Subject: [PATCH 263/279] quota: fix parents caching during build ancestry This is a backport of http://review.gluster.org/11574 In build ancestry, we get the list of parents for a file, these parents are cached in inode ctx. This caching is not happening because posix is not setting d_stat information in the leaf node entry This patch fixes the issue Inode-ctx is not updated with new parent when rename performed on same directory. This patch fixes the issue There is a possibility of caching stale entries, consider below example: 1) build_ancestry invoked on a file 2) rename is invoked on the same file 3) buils_ancestry prepared entries of old parent 4) rename completed and in cbk old parent is replaced with new parent in inode ctx 5) now build_ancestry cbk adds old parent to inode ctx In this patch we also remove stale entries in writev and fallocate > Change-Id: Ib1854a41b47b14eb775326588352015c83d034de > BUG: 1240949 > Signed-off-by: vmallika > Reviewed-on: http://review.gluster.org/11574 > Tested-by: Gluster Build System > Reviewed-by: Raghavendra G BUG: 1245448 Change-Id: I71f514aa9285bb3493b9aee9443009b32d2c5a77 Signed-off-by: vmallika Reviewed-on: https://code.engineering.redhat.com/gerrit/55062 Reviewed-by: Raghavendra Gowdappa Tested-by: Raghavendra Gowdappa --- xlators/features/quota/src/quota.c | 147 ++++++++++++++++++------------------ xlators/features/quota/src/quota.h | 5 +- xlators/storage/posix/src/posix.c | 9 +- 3 files changed, 82 insertions(+), 79 deletions(-) diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c index a444713..30445c4 100644 --- a/xlators/features/quota/src/quota.c +++ b/xlators/features/quota/src/quota.c @@ -640,7 +640,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this, } UNLOCK (&ctx->lock); - quota_check_limit (frame, local->validate_loc.inode, this, NULL, NULL); + quota_check_limit (frame, local->validate_loc.inode, this); return 0; unwind: @@ -1033,13 +1033,13 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode, if (local->par_frame) { list_for_each_entry (entry, parents, next) { parent = inode_find (inode->table, entry->par); - quota_check_limit (frame, parent, this, NULL, NULL); + quota_check_limit (frame, parent, this); inode_unref (parent); } } else { list_for_each_entry (entry, parents, next) { parent = do_quota_check_limit (frame, inode, this, - entry); + entry, _gf_true); if (parent) inode_unref (parent); else @@ -1210,8 +1210,7 @@ out: int32_t -quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, - char *name, uuid_t par) +quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this) { int32_t ret = -1, op_errno = EINVAL; inode_t *_inode = NULL, *parent = NULL; @@ -1224,7 +1223,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, gf_boolean_t hard_limit_exceeded = 0; int64_t delta = 0; uint64_t value = 0; - uuid_t trav_uuid = {0,}; gf_boolean_t skip_check = _gf_false; GF_VALIDATE_OR_GOTO ("quota", this, err); @@ -1272,10 +1270,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, } UNLOCK (&local->lock); - if ( par != NULL ) { - gf_uuid_copy (trav_uuid, par); - } - do { /* In a rename operation, enforce should be stopped at common ancestor */ @@ -1319,13 +1313,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, break; } - parent = inode_parent (_inode, trav_uuid, name); - - if (name != NULL) { - name = NULL; - gf_uuid_clear (trav_uuid); - } - + parent = inode_parent (_inode, 0, NULL); if (parent == NULL) { ret = quota_build_ancestry (_inode, quota_check_limit_continuation, @@ -1363,7 +1351,7 @@ err: inode_t * do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, - quota_dentry_t *dentry) + quota_dentry_t *dentry, gf_boolean_t force) { int32_t ret = -1; inode_t *parent = NULL; @@ -1374,8 +1362,12 @@ do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, local = frame->local; parent = inode_parent (inode, dentry->par, dentry->name); - if (parent == NULL) - parent = inode_find (inode->table, dentry->par); + if (parent == NULL) { + if (force) + parent = inode_find (inode->table, dentry->par); + else + goto out; + } if (parent == NULL) goto out; @@ -1391,7 +1383,7 @@ do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, new_frame->local = new_local; new_local->par_frame = frame; - quota_check_limit (new_frame, parent, this, NULL, NULL); + quota_check_limit (new_frame, parent, this); ret = 0; out: @@ -1849,11 +1841,12 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, /* nameless lookup on this inode, allow quota to reconstruct * ancestry as part of check_limit. */ - quota_check_limit (frame, fd->inode, this, NULL, NULL); + quota_check_limit (frame, fd->inode, this); } else { list_for_each_entry_safe (dentry, tmp, &head, next) { par_inode = do_quota_check_limit (frame, fd->inode, - this, dentry); + this, dentry, + _gf_false); if (par_inode == NULL) { /* remove stale entry from inode ctx */ quota_dentry_del (ctx, dentry->name, @@ -1872,7 +1865,7 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd, local->link_count++; } UNLOCK (&local->lock); - quota_check_limit (frame, fd->inode, this, NULL, NULL); + quota_check_limit (frame, fd->inode, this); } while (fail_count != 0) { @@ -1981,7 +1974,7 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, } UNLOCK (&local->lock); - quota_check_limit (frame, loc->parent, this, NULL, NULL); + quota_check_limit (frame, loc->parent, this); return 0; err: @@ -2130,7 +2123,7 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags, } UNLOCK (&local->lock); - quota_check_limit (frame, loc->parent, this, NULL, NULL); + quota_check_limit (frame, loc->parent, this); return 0; err: QUOTA_STACK_UNWIND (create, frame, -1, op_errno, NULL, NULL, NULL, @@ -2373,20 +2366,22 @@ quota_link_continue (call_frame_t *frame) * This needs re-vist if marker accounts only once * for the links created across directories */ - src_parent = inode_parent (local->oldloc.inode, 0, NULL); - dst_parent = inode_parent (local->newloc.inode, 0, NULL); + if (local->oldloc.parent) + src_parent = inode_ref (local->oldloc.parent); + else + src_parent = inode_parent (local->oldloc.inode, 0, + NULL); + dst_parent = local->newloc.parent; /* No need to check quota limit if src and dst parents are same */ if (src_parent == dst_parent || gf_uuid_compare (src_parent->gfid, dst_parent->gfid) == 0) { inode_unref (src_parent); - inode_unref (dst_parent); - goto off; + goto wind; } inode_unref (src_parent); - inode_unref (dst_parent); } quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0); @@ -2405,7 +2400,7 @@ quota_link_continue (call_frame_t *frame) } UNLOCK (&local->lock); - quota_check_limit (frame, local->newloc.parent, this, NULL, NULL); + quota_check_limit (frame, local->newloc.parent, this); return; err: @@ -2413,14 +2408,10 @@ err: NULL, NULL, NULL); return; -off: - frame->local = NULL; - - STACK_WIND_TAIL (frame, FIRST_CHILD(this), - FIRST_CHILD(this)->fops->link, &(local->oldloc), - &(local->newloc), local->xdata); - - quota_local_cleanup (local); +wind: + STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->link, &(local->oldloc), + &(local->newloc), local->xdata); return; } @@ -2438,15 +2429,6 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, WIND_IF_QUOTAOFF (priv->is_quota_on, off); - /* No need to check quota limit if src and dst parents are same */ - if (oldloc->parent && newloc->parent && - !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) { - gf_msg_debug (this->name, GF_LOG_DEBUG, "link %s -> %s are " - "in the same directory, so skip check limit", - oldloc->path, newloc->path); - goto off; - } - local = quota_local_new (); if (local == NULL) { goto err; @@ -2478,6 +2460,15 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, goto err; } + /* No need to check quota limit if src and dst parents are same */ + if (oldloc->parent && newloc->parent && + !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) { + gf_msg_debug (this->name, GF_LOG_DEBUG, "link %s -> %s are " + "in the same directory, so skip check limit", + oldloc->path, newloc->path); + goto wind; + } + stub = fop_link_stub (frame, quota_link_helper, oldloc, newloc, xdata); if (stub == NULL) { goto err; @@ -2494,7 +2485,11 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc, check_ancestory (frame, newloc->parent); /* source parent can be NULL, so do check_ancestory on a file */ - check_ancestory (frame, oldloc->inode); + if (oldloc->parent) + check_ancestory (frame, oldloc->parent); + else + check_ancestory (frame, oldloc->inode); + return 0; err: @@ -2507,6 +2502,12 @@ off: FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata); return 0; + +wind: + STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this), + FIRST_CHILD(this)->fops->link, oldloc, + newloc, xdata); + return 0; } @@ -2532,13 +2533,10 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this, GF_VALIDATE_OR_GOTO ("quota", local, out); - if (QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) { + if (QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) size = buf->ia_blocks * 512; - } - - if (!QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) { + else goto out; - } ret = quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0); if ((ret == -1) || (ctx == NULL)) { @@ -2674,8 +2672,7 @@ quota_rename_get_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this, goto out; } local->delta = ntoh64 (*size); - quota_check_limit (frame, local->newloc.parent, this, - NULL, NULL); + quota_check_limit (frame, local->newloc.parent, this); return 0; out: @@ -2759,7 +2756,7 @@ quota_rename_continue (call_frame_t *frame) return; } - quota_check_limit (frame, local->newloc.parent, this, NULL, NULL); + quota_check_limit (frame, local->newloc.parent, this); return; err: @@ -2783,15 +2780,6 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, WIND_IF_QUOTAOFF (priv->is_quota_on, off); - /* No need to check quota limit if src and dst parents are same */ - if (oldloc->parent && newloc->parent && - !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) { - gf_msg_debug (this->name, 0, "rename %s -> %s are " - "in the same directory, so skip check limit", - oldloc->path, newloc->path); - goto off; - } - local = quota_local_new (); if (local == NULL) { goto err; @@ -2813,6 +2801,15 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc, goto err; } + /* No need to check quota limit if src and dst parents are same */ + if (oldloc->parent && newloc->parent && + !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) { + gf_msg_debug (this->name, 0, "rename %s -> %s are " + "in the same directory, so skip check limit", + oldloc->path, newloc->path); + goto wind; + } + stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc, xdata); if (stub == NULL) { @@ -2843,7 +2840,12 @@ off: STACK_WIND_TAIL (frame, FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata); + return 0; +wind: + STACK_WIND (frame, quota_rename_cbk, + FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc, + newloc, xdata); return 0; } @@ -2973,7 +2975,7 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath, } UNLOCK (&local->lock); - quota_check_limit (frame, loc->parent, this, NULL, NULL); + quota_check_limit (frame, loc->parent, this); return 0; err: @@ -3915,7 +3917,7 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode, } UNLOCK (&local->lock); - quota_check_limit (frame, loc->parent, this, NULL, NULL); + quota_check_limit (frame, loc->parent, this); return 0; err: @@ -4855,11 +4857,12 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, if (parents == 0) { local->link_count = 1; - quota_check_limit (frame, fd->inode, this, NULL, NULL); + quota_check_limit (frame, fd->inode, this); } else { list_for_each_entry_safe (dentry, tmp, &head, next) { par_inode = do_quota_check_limit (frame, fd->inode, - this, dentry); + this, dentry, + _gf_false); if (par_inode == NULL) { /* remove stale entry from inode_ctx */ quota_dentry_del (ctx, dentry->name, @@ -4878,7 +4881,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode, local->link_count++; } UNLOCK (&local->lock); - quota_check_limit (frame, fd->inode, this, NULL, NULL); + quota_check_limit (frame, fd->inode, this); } while (fail_count != 0) { diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h index 0f8cecd..db27e1e 100644 --- a/xlators/features/quota/src/quota.h +++ b/xlators/features/quota/src/quota.h @@ -263,12 +263,11 @@ void quota_get_limit_dir (call_frame_t *frame, inode_t *cur_inode, xlator_t *this); int32_t -quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, - char *name, uuid_t par); +quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this); inode_t * do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this, - quota_dentry_t *dentry); + quota_dentry_t *dentry, gf_boolean_t force); int quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict, loc_t *loc, struct iatt *buf, int32_t *op_errno); diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c index 60acac4..3f4af01 100644 --- a/xlators/storage/posix/src/posix.c +++ b/xlators/storage/posix/src/posix.c @@ -3504,7 +3504,7 @@ out: int32_t posix_links_in_same_directory (char *dirpath, int count, inode_t *leaf_inode, - inode_t *parent, uint64_t ino, + inode_t *parent, struct stat *stbuf, gf_dirent_t *head, char **path, int type, dict_t *xdata, int32_t *op_errno) { @@ -3540,7 +3540,7 @@ posix_links_in_same_directory (char *dirpath, int count, inode_t *leaf_inode, if ((result == NULL) || *op_errno) break; - if (entry->d_ino != ino) + if (entry->d_ino != stbuf->st_ino) continue; linked_inode = inode_link (leaf_inode, parent, @@ -3564,6 +3564,8 @@ posix_links_in_same_directory (char *dirpath, int count, inode_t *leaf_inode, gf_entry->dict = posix_xattr_fill (this, temppath, &loc, NULL, -1, xdata, NULL); + iatt_from_stat (&(gf_entry->d_stat), stbuf); + list_add_tail (&gf_entry->list, &head->list); loc_wipe (&loc); } @@ -3745,8 +3747,7 @@ posix_get_ancestry_non_directory (xlator_t *this, inode_t *leaf_inode, dirpath[strlen (dirpath) - 1] = '\0'; posix_links_in_same_directory (dirpath, nlink_samepgfid, - leaf_inode, - parent, stbuf.st_ino, head, + leaf_inode, parent, &stbuf, head, path, type, xdata, op_errno); if (parent != NULL) { -- 1.7.1