Blob Blame History Raw
From 753f881856efa3d55ea16dd5b088423da683c7f6 Mon Sep 17 00:00:00 2001
From: vmallika <vmallika@redhat.com>
Date: Thu, 13 Aug 2015 14:14:41 +0530
Subject: [PATCH 263/279] quota: fix parents caching during build ancestry

This is a backport of http://review.gluster.org/11574

In build ancestry, we get the list of parents for a file,
these parents are cached in inode ctx.
This caching is not happening because posix is not setting
d_stat information in the leaf node entry
This patch fixes the issue

Inode-ctx is not updated with new parent when
rename performed on same directory.
This patch fixes the issue

There is a possibility of caching stale entries,
consider below example:
1) build_ancestry invoked on a file
2) rename is invoked on the same file
3) buils_ancestry prepared entries of old parent
4) rename completed and in cbk old parent is replaced with
       new parent in inode ctx
5) now build_ancestry cbk adds old parent to inode ctx

In this patch we also remove stale entries in writev and fallocate

> Change-Id: Ib1854a41b47b14eb775326588352015c83d034de
> BUG: 1240949
> Signed-off-by: vmallika <vmallika@redhat.com>
> Reviewed-on: http://review.gluster.org/11574
> Tested-by: Gluster Build System <jenkins@build.gluster.com>
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>

BUG: 1245448
Change-Id: I71f514aa9285bb3493b9aee9443009b32d2c5a77
Signed-off-by: vmallika <vmallika@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/55062
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
---
 xlators/features/quota/src/quota.c |  147 ++++++++++++++++++------------------
 xlators/features/quota/src/quota.h |    5 +-
 xlators/storage/posix/src/posix.c  |    9 +-
 3 files changed, 82 insertions(+), 79 deletions(-)

diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c
index a444713..30445c4 100644
--- a/xlators/features/quota/src/quota.c
+++ b/xlators/features/quota/src/quota.c
@@ -640,7 +640,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         }
         UNLOCK (&ctx->lock);
 
-        quota_check_limit (frame, local->validate_loc.inode, this, NULL, NULL);
+        quota_check_limit (frame, local->validate_loc.inode, this);
         return 0;
 
 unwind:
@@ -1033,13 +1033,13 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
         if (local->par_frame) {
                 list_for_each_entry (entry, parents, next) {
                         parent = inode_find (inode->table, entry->par);
-                        quota_check_limit (frame, parent, this, NULL, NULL);
+                        quota_check_limit (frame, parent, this);
                         inode_unref (parent);
                 }
         } else {
                 list_for_each_entry (entry, parents, next) {
                         parent = do_quota_check_limit (frame, inode, this,
-                                                       entry);
+                                                       entry, _gf_true);
                         if (parent)
                                 inode_unref (parent);
                         else
@@ -1210,8 +1210,7 @@ out:
 
 
 int32_t
-quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
-                   char *name, uuid_t par)
+quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this)
 {
         int32_t            ret                 = -1, op_errno = EINVAL;
         inode_t           *_inode              = NULL, *parent = NULL;
@@ -1224,7 +1223,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         gf_boolean_t       hard_limit_exceeded = 0;
         int64_t            delta               = 0;
         uint64_t           value               = 0;
-        uuid_t             trav_uuid           = {0,};
         gf_boolean_t       skip_check          = _gf_false;
 
         GF_VALIDATE_OR_GOTO ("quota", this, err);
@@ -1272,10 +1270,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         }
         UNLOCK (&local->lock);
 
-        if ( par != NULL ) {
-                gf_uuid_copy (trav_uuid, par);
-        }
-
         do {
                 /* In a rename operation, enforce should be stopped at common
                    ancestor */
@@ -1319,13 +1313,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
                         break;
                 }
 
-                parent = inode_parent (_inode, trav_uuid, name);
-
-                if (name != NULL) {
-                        name = NULL;
-                        gf_uuid_clear (trav_uuid);
-                }
-
+                parent = inode_parent (_inode, 0, NULL);
                 if (parent == NULL) {
                         ret = quota_build_ancestry (_inode,
                                                  quota_check_limit_continuation,
@@ -1363,7 +1351,7 @@ err:
 
 inode_t *
 do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
-                      quota_dentry_t *dentry)
+                      quota_dentry_t *dentry, gf_boolean_t force)
 {
         int32_t         ret        = -1;
         inode_t        *parent     = NULL;
@@ -1374,8 +1362,12 @@ do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         local = frame->local;
 
         parent = inode_parent (inode, dentry->par, dentry->name);
-        if (parent == NULL)
-                parent = inode_find (inode->table, dentry->par);
+        if (parent == NULL) {
+                if (force)
+                        parent = inode_find (inode->table, dentry->par);
+                else
+                        goto out;
+        }
         if (parent == NULL)
                 goto out;
 
@@ -1391,7 +1383,7 @@ do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         new_frame->local = new_local;
         new_local->par_frame = frame;
 
-        quota_check_limit (new_frame, parent, this, NULL, NULL);
+        quota_check_limit (new_frame, parent, this);
 
         ret = 0;
 out:
@@ -1849,11 +1841,12 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
                 /* nameless lookup on this inode, allow quota to reconstruct
                  * ancestry as part of check_limit.
                  */
-                quota_check_limit (frame, fd->inode, this, NULL, NULL);
+                quota_check_limit (frame, fd->inode, this);
         } else {
                 list_for_each_entry_safe (dentry, tmp, &head, next) {
                         par_inode = do_quota_check_limit (frame, fd->inode,
-                                                          this, dentry);
+                                                          this, dentry,
+                                                          _gf_false);
                         if (par_inode == NULL) {
                                 /* remove stale entry from inode ctx */
                                 quota_dentry_del (ctx, dentry->name,
@@ -1872,7 +1865,7 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
                                 local->link_count++;
                         }
                         UNLOCK (&local->lock);
-                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
+                        quota_check_limit (frame, fd->inode, this);
                 }
 
                 while (fail_count != 0) {
@@ -1981,7 +1974,7 @@ quota_mkdir (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
         }
         UNLOCK (&local->lock);
 
-        quota_check_limit (frame, loc->parent, this, NULL, NULL);
+        quota_check_limit (frame, loc->parent, this);
         return 0;
 
 err:
@@ -2130,7 +2123,7 @@ quota_create (call_frame_t *frame, xlator_t *this, loc_t *loc, int32_t flags,
         }
         UNLOCK (&local->lock);
 
-        quota_check_limit (frame, loc->parent, this, NULL, NULL);
+        quota_check_limit (frame, loc->parent, this);
         return 0;
 err:
         QUOTA_STACK_UNWIND (create, frame, -1, op_errno, NULL, NULL, NULL,
@@ -2373,20 +2366,22 @@ quota_link_continue (call_frame_t *frame)
                  * This needs re-vist if marker accounts only once
                  * for the links created across directories
                  */
-                src_parent = inode_parent (local->oldloc.inode, 0, NULL);
-                dst_parent = inode_parent (local->newloc.inode, 0, NULL);
+                if (local->oldloc.parent)
+                        src_parent = inode_ref (local->oldloc.parent);
+                else
+                        src_parent = inode_parent (local->oldloc.inode, 0,
+                                                   NULL);
+                dst_parent = local->newloc.parent;
 
                 /* No need to check quota limit if src and dst parents are same
                  */
                 if (src_parent == dst_parent ||
                     gf_uuid_compare (src_parent->gfid, dst_parent->gfid) == 0) {
                         inode_unref (src_parent);
-                        inode_unref (dst_parent);
-                        goto off;
+                        goto wind;
                 }
 
                 inode_unref (src_parent);
-                inode_unref (dst_parent);
         }
 
         quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0);
@@ -2405,7 +2400,7 @@ quota_link_continue (call_frame_t *frame)
         }
         UNLOCK (&local->lock);
 
-        quota_check_limit (frame, local->newloc.parent, this, NULL, NULL);
+        quota_check_limit (frame, local->newloc.parent, this);
         return;
 
 err:
@@ -2413,14 +2408,10 @@ err:
                             NULL, NULL, NULL);
         return;
 
-off:
-        frame->local = NULL;
-
-        STACK_WIND_TAIL (frame, FIRST_CHILD(this),
-                         FIRST_CHILD(this)->fops->link, &(local->oldloc),
-                         &(local->newloc), local->xdata);
-
-        quota_local_cleanup (local);
+wind:
+        STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this),
+                    FIRST_CHILD(this)->fops->link, &(local->oldloc),
+                    &(local->newloc), local->xdata);
         return;
 }
 
@@ -2438,15 +2429,6 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
 
         WIND_IF_QUOTAOFF (priv->is_quota_on, off);
 
-        /* No need to check quota limit if src and dst parents are same */
-        if (oldloc->parent && newloc->parent &&
-            !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
-                gf_msg_debug (this->name, GF_LOG_DEBUG, "link %s -> %s are "
-                              "in the same directory, so skip check limit",
-                              oldloc->path, newloc->path);
-                goto off;
-        }
-
         local = quota_local_new ();
         if (local == NULL) {
                 goto err;
@@ -2478,6 +2460,15 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
                 goto err;
         }
 
+        /* No need to check quota limit if src and dst parents are same */
+        if (oldloc->parent && newloc->parent &&
+            !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
+                gf_msg_debug (this->name, GF_LOG_DEBUG, "link %s -> %s are "
+                              "in the same directory, so skip check limit",
+                              oldloc->path, newloc->path);
+                goto wind;
+        }
+
         stub = fop_link_stub (frame, quota_link_helper, oldloc, newloc, xdata);
         if (stub == NULL) {
                 goto err;
@@ -2494,7 +2485,11 @@ quota_link (call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
         check_ancestory (frame, newloc->parent);
 
         /* source parent can be NULL, so do check_ancestory on a file */
-        check_ancestory (frame, oldloc->inode);
+        if (oldloc->parent)
+                check_ancestory (frame, oldloc->parent);
+        else
+                check_ancestory (frame, oldloc->inode);
+
         return 0;
 
 err:
@@ -2507,6 +2502,12 @@ off:
                          FIRST_CHILD(this)->fops->link, oldloc,
                          newloc, xdata);
         return 0;
+
+wind:
+        STACK_WIND (frame, quota_link_cbk, FIRST_CHILD(this),
+                    FIRST_CHILD(this)->fops->link, oldloc,
+                    newloc, xdata);
+        return 0;
 }
 
 
@@ -2532,13 +2533,10 @@ quota_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
         GF_VALIDATE_OR_GOTO ("quota", local, out);
 
-        if (QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) {
+        if (QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type))
                 size = buf->ia_blocks * 512;
-        }
-
-        if (!QUOTA_REG_OR_LNK_FILE (local->oldloc.inode->ia_type)) {
+        else
                 goto out;
-        }
 
         ret = quota_inode_ctx_get (local->oldloc.inode, this, &ctx, 0);
         if ((ret == -1) || (ctx == NULL)) {
@@ -2674,8 +2672,7 @@ quota_rename_get_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 goto out;
         }
         local->delta = ntoh64 (*size);
-        quota_check_limit (frame, local->newloc.parent, this,
-                           NULL, NULL);
+        quota_check_limit (frame, local->newloc.parent, this);
         return 0;
 
 out:
@@ -2759,7 +2756,7 @@ quota_rename_continue (call_frame_t *frame)
                         return;
         }
 
-        quota_check_limit (frame, local->newloc.parent, this, NULL, NULL);
+        quota_check_limit (frame, local->newloc.parent, this);
         return;
 
 err:
@@ -2783,15 +2780,6 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
 
         WIND_IF_QUOTAOFF (priv->is_quota_on, off);
 
-        /* No need to check quota limit if src and dst parents are same */
-        if (oldloc->parent && newloc->parent &&
-            !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
-                gf_msg_debug (this->name, 0, "rename %s -> %s are "
-                              "in the same directory, so skip check limit",
-                              oldloc->path, newloc->path);
-                goto off;
-        }
-
         local = quota_local_new ();
         if (local == NULL) {
                 goto err;
@@ -2813,6 +2801,15 @@ quota_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                 goto err;
         }
 
+        /* No need to check quota limit if src and dst parents are same */
+        if (oldloc->parent && newloc->parent &&
+            !gf_uuid_compare(oldloc->parent->gfid, newloc->parent->gfid)) {
+                gf_msg_debug (this->name, 0, "rename %s -> %s are "
+                              "in the same directory, so skip check limit",
+                              oldloc->path, newloc->path);
+                goto wind;
+        }
+
         stub = fop_rename_stub (frame, quota_rename_helper, oldloc, newloc,
                                 xdata);
         if (stub == NULL) {
@@ -2843,7 +2840,12 @@ off:
         STACK_WIND_TAIL (frame, FIRST_CHILD(this),
                          FIRST_CHILD(this)->fops->rename, oldloc,
                          newloc, xdata);
+        return 0;
 
+wind:
+        STACK_WIND (frame, quota_rename_cbk,
+                    FIRST_CHILD(this), FIRST_CHILD(this)->fops->rename, oldloc,
+                    newloc, xdata);
         return 0;
 }
 
@@ -2973,7 +2975,7 @@ quota_symlink (call_frame_t *frame, xlator_t *this, const char *linkpath,
         }
         UNLOCK (&local->lock);
 
-        quota_check_limit (frame, loc->parent, this, NULL, NULL);
+        quota_check_limit (frame, loc->parent, this);
         return 0;
 
 err:
@@ -3915,7 +3917,7 @@ quota_mknod (call_frame_t *frame, xlator_t *this, loc_t *loc, mode_t mode,
         }
         UNLOCK (&local->lock);
 
-        quota_check_limit (frame, loc->parent, this, NULL, NULL);
+        quota_check_limit (frame, loc->parent, this);
         return 0;
 
 err:
@@ -4855,11 +4857,12 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
 
         if (parents == 0) {
                 local->link_count = 1;
-                quota_check_limit (frame, fd->inode, this, NULL, NULL);
+                quota_check_limit (frame, fd->inode, this);
         } else {
                 list_for_each_entry_safe (dentry, tmp, &head, next) {
                         par_inode = do_quota_check_limit (frame, fd->inode,
-                                                          this, dentry);
+                                                          this, dentry,
+                                                          _gf_false);
                         if (par_inode == NULL) {
                                 /* remove stale entry from inode_ctx */
                                 quota_dentry_del (ctx, dentry->name,
@@ -4878,7 +4881,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
                                 local->link_count++;
                         }
                         UNLOCK (&local->lock);
-                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
+                        quota_check_limit (frame, fd->inode, this);
                 }
 
                 while (fail_count != 0) {
diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h
index 0f8cecd..db27e1e 100644
--- a/xlators/features/quota/src/quota.h
+++ b/xlators/features/quota/src/quota.h
@@ -263,12 +263,11 @@ void
 quota_get_limit_dir (call_frame_t *frame, inode_t *cur_inode, xlator_t *this);
 
 int32_t
-quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
-                   char *name, uuid_t par);
+quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this);
 
 inode_t *
 do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
-                      quota_dentry_t *dentry);
+                      quota_dentry_t *dentry, gf_boolean_t force);
 int
 quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict,
                      loc_t *loc, struct iatt *buf, int32_t *op_errno);
diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c
index 60acac4..3f4af01 100644
--- a/xlators/storage/posix/src/posix.c
+++ b/xlators/storage/posix/src/posix.c
@@ -3504,7 +3504,7 @@ out:
 
 int32_t
 posix_links_in_same_directory (char *dirpath, int count, inode_t *leaf_inode,
-                               inode_t *parent, uint64_t ino,
+                               inode_t *parent, struct stat *stbuf,
                                gf_dirent_t *head, char **path,
                                int type, dict_t *xdata, int32_t *op_errno)
 {
@@ -3540,7 +3540,7 @@ posix_links_in_same_directory (char *dirpath, int count, inode_t *leaf_inode,
                 if ((result == NULL) || *op_errno)
                         break;
 
-                if (entry->d_ino != ino)
+                if (entry->d_ino != stbuf->st_ino)
                         continue;
 
                 linked_inode = inode_link (leaf_inode, parent,
@@ -3564,6 +3564,8 @@ posix_links_in_same_directory (char *dirpath, int count, inode_t *leaf_inode,
                         gf_entry->dict
                                 = posix_xattr_fill (this, temppath, &loc, NULL,
                                                     -1, xdata, NULL);
+                        iatt_from_stat (&(gf_entry->d_stat), stbuf);
+
                         list_add_tail (&gf_entry->list, &head->list);
                         loc_wipe (&loc);
                 }
@@ -3745,8 +3747,7 @@ posix_get_ancestry_non_directory (xlator_t *this, inode_t *leaf_inode,
                 dirpath[strlen (dirpath) - 1] = '\0';
 
                 posix_links_in_same_directory (dirpath, nlink_samepgfid,
-                                               leaf_inode,
-                                               parent, stbuf.st_ino, head,
+                                               leaf_inode, parent, &stbuf, head,
                                                path, type, xdata, op_errno);
 
                 if (parent != NULL) {
-- 
1.7.1