7f4c2a
From 3d8c0c9d3cabc61172b1d5d767a33cf334ddf59c Mon Sep 17 00:00:00 2001
7f4c2a
From: vmallika <vmallika@redhat.com>
7f4c2a
Date: Thu, 13 Aug 2015 14:09:22 +0530
7f4c2a
Subject: [PATCH 258/279] quota: Fix crash in quota enforcer
7f4c2a
7f4c2a
This is a backport of http://review.gluster.org/11510
7f4c2a
7f4c2a
With multiple hardlinks check_quota_limit is invoked for each parent
7f4c2a
each of this check_limit can invoke validation
7f4c2a
this can cause frame->local to get corrupted during validation.
7f4c2a
7f4c2a
Testcase tests/bugs/quota/bug-1235182.t fails spuriously with
7f4c2a
this problem
7f4c2a
7f4c2a
> Change-Id: I53adc54b431fb5f43e67a94248102ddaf0d7978f
7f4c2a
> BUG: 1238747
7f4c2a
> Signed-off-by: vmallika <vmallika@redhat.com>
7f4c2a
> Reviewed-on: http://review.gluster.org/11510
7f4c2a
> Tested-by: NetBSD Build System <jenkins@build.gluster.org>
7f4c2a
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
7f4c2a
7f4c2a
BUG: 1238049
7f4c2a
Change-Id: Iac8d3c54c81536c8d4ba07c2c34ea79d0701b714
7f4c2a
Signed-off-by: vmallika <vmallika@redhat.com>
7f4c2a
Reviewed-on: https://code.engineering.redhat.com/gerrit/55060
7f4c2a
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
7f4c2a
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
7f4c2a
---
7f4c2a
 tests/bugs/quota/bug-1235182.t     |   10 +-
7f4c2a
 xlators/features/quota/src/quota.c |  408 +++++++++++++++++++++++++-----------
7f4c2a
 xlators/features/quota/src/quota.h |    9 +-
7f4c2a
 3 files changed, 297 insertions(+), 130 deletions(-)
7f4c2a
7f4c2a
diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t
7f4c2a
index 0bd43a9..e28b557 100644
7f4c2a
--- a/tests/bugs/quota/bug-1235182.t
7f4c2a
+++ b/tests/bugs/quota/bug-1235182.t
7f4c2a
@@ -9,12 +9,6 @@
7f4c2a
 
7f4c2a
 cleanup;
7f4c2a
 
7f4c2a
-function usage()
7f4c2a
-{
7f4c2a
-        local QUOTA_PATH=$1;
7f4c2a
-        $CLI volume quota $V0 list $QUOTA_PATH | grep "$QUOTA_PATH" | awk '{print $4}'
7f4c2a
-}
7f4c2a
-
7f4c2a
 QDD=$(dirname $0)/quota
7f4c2a
 # compile the test write program and run it
7f4c2a
 build_tester $(dirname $0)/../../basic/quota.c -o $QDD
7f4c2a
@@ -23,7 +17,7 @@ TEST glusterd
7f4c2a
 TEST pidof glusterd;
7f4c2a
 TEST $CLI volume info;
7f4c2a
 
7f4c2a
-TEST $CLI volume create $V0 $H0:$B0/${V0}{1};
7f4c2a
+TEST $CLI volume create $V0 $H0:$B0/${V0};
7f4c2a
 TEST $CLI volume start $V0;
7f4c2a
 
7f4c2a
 TEST $CLI volume quota $V0 enable;
7f4c2a
@@ -47,7 +41,7 @@ echo "Wait for process with pid $PID to complete"
7f4c2a
 wait $PID
7f4c2a
 echo "Process with pid $PID finished"
7f4c2a
 
7f4c2a
-EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "100.0MB" usage "/"
7f4c2a
+EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "100.0MB" quotausage "/"
7f4c2a
 
7f4c2a
 TEST $CLI volume stop $V0
7f4c2a
 TEST $CLI volume delete $V0
7f4c2a
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c
7f4c2a
index df0572f..16ad0fc 100644
7f4c2a
--- a/xlators/features/quota/src/quota.c
7f4c2a
+++ b/xlators/features/quota/src/quota.c
7f4c2a
@@ -155,7 +155,7 @@ err:
7f4c2a
 
7f4c2a
 
7f4c2a
 int32_t
7f4c2a
-quota_local_cleanup (xlator_t *this, quota_local_t *local)
7f4c2a
+quota_local_cleanup (quota_local_t *local)
7f4c2a
 {
7f4c2a
         if (local == NULL) {
7f4c2a
                 goto out;
7f4c2a
@@ -243,6 +243,31 @@ out:
7f4c2a
         return;
7f4c2a
 }
7f4c2a
 
7f4c2a
+void
7f4c2a
+__quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par)
7f4c2a
+{
7f4c2a
+        quota_dentry_t    *dentry = NULL;
7f4c2a
+        quota_dentry_t    *tmp    = NULL;
7f4c2a
+
7f4c2a
+        list_for_each_entry_safe (dentry, tmp, &ctx->parents, next) {
7f4c2a
+                if ((strcmp (dentry->name, name) == 0) &&
7f4c2a
+                    (gf_uuid_compare (dentry->par, par) == 0)) {
7f4c2a
+                        __quota_dentry_free (dentry);
7f4c2a
+                        break;
7f4c2a
+                }
7f4c2a
+        }
7f4c2a
+}
7f4c2a
+
7f4c2a
+void
7f4c2a
+quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par)
7f4c2a
+{
7f4c2a
+        LOCK (&ctx->lock);
7f4c2a
+        {
7f4c2a
+                __quota_dentry_del (ctx, name, par);
7f4c2a
+        }
7f4c2a
+        UNLOCK (&ctx->lock);
7f4c2a
+}
7f4c2a
+
7f4c2a
 static inline inode_t*
7f4c2a
 __quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name)
7f4c2a
 {
7f4c2a
@@ -485,10 +510,18 @@ out:
7f4c2a
 }
7f4c2a
 
7f4c2a
 static inline void
7f4c2a
-quota_link_count_decrement (quota_local_t *local)
7f4c2a
+quota_link_count_decrement (call_frame_t *frame)
7f4c2a
 {
7f4c2a
-        call_stub_t *stub       = NULL;
7f4c2a
-        int          link_count = -1;
7f4c2a
+        call_frame_t   *tmpframe   = NULL;
7f4c2a
+        quota_local_t  *local      = NULL;
7f4c2a
+        call_stub_t    *stub       = NULL;
7f4c2a
+        int             link_count = -1;
7f4c2a
+
7f4c2a
+        local = frame->local;
7f4c2a
+        if (local && local->par_frame) {
7f4c2a
+                local = local->par_frame->local;
7f4c2a
+                tmpframe = frame;
7f4c2a
+        }
7f4c2a
 
7f4c2a
         if (local == NULL)
7f4c2a
                 goto out;
7f4c2a
@@ -506,14 +539,30 @@ quota_link_count_decrement (quota_local_t *local)
7f4c2a
         if (stub != NULL) {
7f4c2a
                 call_resume (stub);
7f4c2a
         }
7f4c2a
+
7f4c2a
 out:
7f4c2a
+        if (tmpframe) {
7f4c2a
+                local = tmpframe->local;
7f4c2a
+                tmpframe->local = NULL;
7f4c2a
+
7f4c2a
+                STACK_DESTROY (frame->root);
7f4c2a
+                if (local)
7f4c2a
+                        quota_local_cleanup (local);
7f4c2a
+        }
7f4c2a
+
7f4c2a
         return;
7f4c2a
 }
7f4c2a
 
7f4c2a
 static inline void
7f4c2a
-quota_handle_validate_error (quota_local_t *local, int32_t op_ret,
7f4c2a
+quota_handle_validate_error (call_frame_t *frame, int32_t op_ret,
7f4c2a
                              int32_t op_errno)
7f4c2a
 {
7f4c2a
+        quota_local_t  *local;
7f4c2a
+
7f4c2a
+        local = frame->local;
7f4c2a
+        if (local && local->par_frame)
7f4c2a
+                local = local->par_frame->local;
7f4c2a
+
7f4c2a
         if (local == NULL)
7f4c2a
                 goto out;
7f4c2a
 
7f4c2a
@@ -527,7 +576,7 @@ quota_handle_validate_error (quota_local_t *local, int32_t op_ret,
7f4c2a
         UNLOCK (&local->lock);
7f4c2a
 
7f4c2a
         /* we abort checking limits on this path to root */
7f4c2a
-        quota_link_count_decrement (local);
7f4c2a
+        quota_link_count_decrement (frame);
7f4c2a
 out:
7f4c2a
         return;
7f4c2a
 }
7f4c2a
@@ -595,7 +644,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
         return 0;
7f4c2a
 
7f4c2a
 unwind:
7f4c2a
-        quota_handle_validate_error (local, op_ret, op_errno);
7f4c2a
+        quota_handle_validate_error (frame, op_ret, op_errno);
7f4c2a
         return 0;
7f4c2a
 }
7f4c2a
 
7f4c2a
@@ -622,27 +671,65 @@ quota_timeout (struct timeval *tv, int32_t timeout)
7f4c2a
         return timed_out;
7f4c2a
 }
7f4c2a
 
7f4c2a
-static inline void
7f4c2a
-quota_add_parent (quota_dentry_t *dentry, struct list_head *list)
7f4c2a
+/* Return: 1 if new entry added
7f4c2a
+ *         0 no entry added
7f4c2a
+ */
7f4c2a
+static inline int32_t
7f4c2a
+quota_add_parent (struct list_head *list, char *name, uuid_t pgfid)
7f4c2a
 {
7f4c2a
         quota_dentry_t *entry = NULL;
7f4c2a
         gf_boolean_t    found = _gf_false;
7f4c2a
 
7f4c2a
-        if ((dentry == NULL) || (list == NULL)) {
7f4c2a
+        if (list == NULL) {
7f4c2a
                 goto out;
7f4c2a
         }
7f4c2a
 
7f4c2a
         list_for_each_entry (entry, list, next) {
7f4c2a
-                if (gf_uuid_compare (dentry->par, entry->par) == 0) {
7f4c2a
+                if (gf_uuid_compare (pgfid, entry->par) == 0) {
7f4c2a
                         found = _gf_true;
7f4c2a
                         goto out;
7f4c2a
                 }
7f4c2a
         }
7f4c2a
 
7f4c2a
-        list_add_tail (&dentry->next, list);
7f4c2a
+        entry = __quota_dentry_new (NULL, name, pgfid);
7f4c2a
+        list_add_tail (&entry->next, list);
7f4c2a
 
7f4c2a
 out:
7f4c2a
-        return;
7f4c2a
+        if (found)
7f4c2a
+                return 0;
7f4c2a
+        else
7f4c2a
+                return 1;
7f4c2a
+
7f4c2a
+}
7f4c2a
+
7f4c2a
+/* This function iterates the parent list in inode
7f4c2a
+ * context and add unique parent to the list
7f4c2a
+ * Returns number of dentry added to the list
7f4c2a
+ */
7f4c2a
+static inline int32_t
7f4c2a
+quota_add_parents_from_ctx (quota_inode_ctx_t *ctx, struct list_head *list)
7f4c2a
+{
7f4c2a
+        int                ret     = 0;
7f4c2a
+        quota_dentry_t    *dentry  = NULL;
7f4c2a
+        int32_t            count   = 0;
7f4c2a
+
7f4c2a
+        if (ctx == NULL || list == NULL)
7f4c2a
+                goto out;
7f4c2a
+
7f4c2a
+        LOCK (&ctx->lock);
7f4c2a
+        {
7f4c2a
+                list_for_each_entry (dentry, &ctx->parents, next) {
7f4c2a
+                        ret = quota_add_parent (list, dentry->name,
7f4c2a
+                                                dentry->par);
7f4c2a
+
7f4c2a
+                        if (ret == 1)
7f4c2a
+                                count++;
7f4c2a
+                }
7f4c2a
+        }
7f4c2a
+        UNLOCK (&ctx->lock);
7f4c2a
+
7f4c2a
+out:
7f4c2a
+        return count;
7f4c2a
 }
7f4c2a
 
7f4c2a
 int32_t
7f4c2a
@@ -708,23 +795,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
 
7f4c2a
         quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);
7f4c2a
 
7f4c2a
-        if (ctx != NULL) {
7f4c2a
-                LOCK (&ctx->lock);
7f4c2a
-                {
7f4c2a
-                        list_for_each_entry (dentry, &ctx->parents, next) {
7f4c2a
-                                /* we built ancestry for a non-directory */
7f4c2a
-                                tmp = __quota_dentry_new (NULL, dentry->name,
7f4c2a
-                                                          dentry->par);
7f4c2a
-                                quota_add_parent (tmp, &parents);
7f4c2a
-
7f4c2a
-                                if (list_empty (&tmp->next)) {
7f4c2a
-                                        __quota_dentry_free (tmp);
7f4c2a
-                                        tmp = NULL;
7f4c2a
-                                }
7f4c2a
-                        }
7f4c2a
-                }
7f4c2a
-                UNLOCK (&ctx->lock);
7f4c2a
-        }
7f4c2a
+        quota_add_parents_from_ctx (ctx, &parents);
7f4c2a
 
7f4c2a
         if (list_empty (&parents)) {
7f4c2a
                 /* we built ancestry for a directory */
7f4c2a
@@ -735,8 +806,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
 
7f4c2a
                 GF_ASSERT (&entry->list != &entries->list);
7f4c2a
 
7f4c2a
-                tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid);
7f4c2a
-                quota_add_parent (tmp, &parents);
7f4c2a
+                quota_add_parent (&parents, entry->d_name, parent->gfid);
7f4c2a
         }
7f4c2a
 
7f4c2a
         local->ancestry_cbk (&parents, local->loc.inode, 0, 0,
7f4c2a
@@ -748,7 +818,7 @@ err:
7f4c2a
 
7f4c2a
 cleanup:
7f4c2a
         STACK_DESTROY (frame->root);
7f4c2a
-        quota_local_cleanup (this, local);
7f4c2a
+        quota_local_cleanup (local);
7f4c2a
 
7f4c2a
         if (parent != NULL) {
7f4c2a
                 inode_unref (parent);
7f4c2a
@@ -838,7 +908,7 @@ err:
7f4c2a
                 }
7f4c2a
 
7f4c2a
                 if (local)
7f4c2a
-                        quota_local_cleanup (this, local);
7f4c2a
+                        quota_local_cleanup (local);
7f4c2a
         }
7f4c2a
 
7f4c2a
         return 0;
7f4c2a
@@ -919,6 +989,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
7f4c2a
         call_frame_t   *frame        = NULL;
7f4c2a
         xlator_t       *this         = NULL;
7f4c2a
         quota_local_t  *local        = NULL;
7f4c2a
+        quota_local_t  *par_local    = NULL;
7f4c2a
         quota_dentry_t *entry        = NULL;
7f4c2a
         inode_t        *parent       = NULL;
7f4c2a
         int             parent_count = 0;
7f4c2a
@@ -927,6 +998,12 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
7f4c2a
         local = frame->local;
7f4c2a
         this = THIS;
7f4c2a
 
7f4c2a
+        if (local->par_frame)
7f4c2a
+                par_local = local->par_frame->local;
7f4c2a
+        else
7f4c2a
+                par_local = local;
7f4c2a
+
7f4c2a
+
7f4c2a
         if ((op_ret < 0) || list_empty (parents)) {
7f4c2a
                 if (op_ret >= 0) {
7f4c2a
                         gf_msg (this->name, GF_LOG_WARNING, EIO,
7f4c2a
@@ -939,7 +1016,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
7f4c2a
                         op_errno = EIO;
7f4c2a
                 }
7f4c2a
 
7f4c2a
-                quota_handle_validate_error (local, -1, op_errno);
7f4c2a
+                quota_handle_validate_error (frame, -1, op_errno);
7f4c2a
                 goto out;
7f4c2a
         }
7f4c2a
 
7f4c2a
@@ -947,16 +1024,27 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
7f4c2a
                 parent_count++;
7f4c2a
         }
7f4c2a
 
7f4c2a
-        LOCK (&local->lock);
7f4c2a
+        LOCK (&par_local->lock);
7f4c2a
         {
7f4c2a
-                local->link_count += (parent_count - 1);
7f4c2a
+                par_local->link_count += (parent_count - 1);
7f4c2a
         }
7f4c2a
-        UNLOCK (&local->lock);
7f4c2a
+        UNLOCK (&par_local->lock);
7f4c2a
 
7f4c2a
-        list_for_each_entry (entry, parents, next) {
7f4c2a
-                parent = inode_find (inode->table, entry->par);
7f4c2a
-                quota_check_limit (frame, parent, this, NULL, NULL);
7f4c2a
-                inode_unref (parent);
7f4c2a
+        if (local->par_frame) {
7f4c2a
+                list_for_each_entry (entry, parents, next) {
7f4c2a
+                        parent = inode_find (inode->table, entry->par);
7f4c2a
+                        quota_check_limit (frame, parent, this, NULL, NULL);
7f4c2a
+                        inode_unref (parent);
7f4c2a
+                }
7f4c2a
+        } else {
7f4c2a
+                list_for_each_entry (entry, parents, next) {
7f4c2a
+                        parent = do_quota_check_limit (frame, inode, this,
7f4c2a
+                                                       entry);
7f4c2a
+                        if (parent)
7f4c2a
+                                inode_unref (parent);
7f4c2a
+                        else
7f4c2a
+                                quota_link_count_decrement (frame);
7f4c2a
+                }
7f4c2a
         }
7f4c2a
 
7f4c2a
 out:
7f4c2a
@@ -1129,6 +1217,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
         quota_inode_ctx_t *ctx                 = NULL;
7f4c2a
         quota_priv_t      *priv                = NULL;
7f4c2a
         quota_local_t     *local               = NULL;
7f4c2a
+        quota_local_t     *par_local           = NULL;
7f4c2a
         char               need_validate       = 0;
7f4c2a
         char               just_validated      = 0;
7f4c2a
         gf_boolean_t       hard_limit_exceeded = 0;
7f4c2a
@@ -1144,9 +1233,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
         local  = frame->local;
7f4c2a
         GF_VALIDATE_OR_GOTO (this->name, local, err);
7f4c2a
 
7f4c2a
-        delta = local->delta;
7f4c2a
+        if (local->par_frame) {
7f4c2a
+                par_local = local->par_frame->local;
7f4c2a
+                GF_VALIDATE_OR_GOTO (this->name, par_local, err);
7f4c2a
+        } else {
7f4c2a
+                par_local = local;
7f4c2a
+        }
7f4c2a
+
7f4c2a
+        delta = par_local->delta;
7f4c2a
 
7f4c2a
-        GF_VALIDATE_OR_GOTO (this->name, local->stub, err);
7f4c2a
+        GF_VALIDATE_OR_GOTO (this->name, par_local->stub, err);
7f4c2a
         /* Allow all the trusted clients
7f4c2a
          * Don't block the gluster internal processes like rebalance, gsyncd,
7f4c2a
          * self heal etc from the disk quotas.
7f4c2a
@@ -1157,7 +1253,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
          */
7f4c2a
         if (0 > frame->root->pid) {
7f4c2a
                 ret = 0;
7f4c2a
-                quota_link_count_decrement (local);
7f4c2a
+                quota_link_count_decrement (frame);
7f4c2a
                 goto done;
7f4c2a
         }
7f4c2a
 
7f4c2a
@@ -1182,15 +1278,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
         do {
7f4c2a
                 /* In a rename operation, enforce should be stopped at common
7f4c2a
                    ancestor */
7f4c2a
-                if (!gf_uuid_is_null (local->common_ancestor) &&
7f4c2a
-                    !gf_uuid_compare (_inode->gfid, local->common_ancestor)) {
7f4c2a
-                        quota_link_count_decrement (local);
7f4c2a
+                if (!gf_uuid_is_null (par_local->common_ancestor) &&
7f4c2a
+                    !gf_uuid_compare (_inode->gfid, par_local->common_ancestor)
7f4c2a
+                   ) {
7f4c2a
+                        quota_link_count_decrement (frame);
7f4c2a
                         break;
7f4c2a
                 }
7f4c2a
 
7f4c2a
                 ret = quota_check_object_limit (frame, ctx, priv, _inode, this,
7f4c2a
                                                 &op_errno, just_validated,
7f4c2a
-                                                local, &skip_check);
7f4c2a
+                                                par_local, &skip_check);
7f4c2a
                 if (skip_check == _gf_true)
7f4c2a
                         goto done;
7f4c2a
 
7f4c2a
@@ -1203,7 +1300,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
 
7f4c2a
                 ret = quota_check_size_limit (frame, ctx, priv, _inode, this,
7f4c2a
                                               &op_errno, just_validated, delta,
7f4c2a
-                                              local, &skip_check);
7f4c2a
+                                              par_local, &skip_check);
7f4c2a
                 if (skip_check == _gf_true)
7f4c2a
                         goto done;
7f4c2a
 
7f4c2a
@@ -1215,7 +1312,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
                 }
7f4c2a
 
7f4c2a
                 if (__is_root_gfid (_inode->gfid)) {
7f4c2a
-                        quota_link_count_decrement (local);
7f4c2a
+                        quota_link_count_decrement (frame);
7f4c2a
                         break;
7f4c2a
                 }
7f4c2a
 
7f4c2a
@@ -1228,8 +1325,8 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
 
7f4c2a
                 if (parent == NULL) {
7f4c2a
                         ret = quota_build_ancestry (_inode,
7f4c2a
-                                                    quota_check_limit_continuation,
7f4c2a
-                                                    frame);
7f4c2a
+                                                 quota_check_limit_continuation,
7f4c2a
+                                                 frame);
7f4c2a
                         if (ret < 0) {
7f4c2a
                                 op_errno = -ret;
7f4c2a
                                 goto err;
7f4c2a
@@ -1247,7 +1344,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
                 ctx = (quota_inode_ctx_t *)(unsigned long)value;
7f4c2a
         } while (1);
7f4c2a
 
7f4c2a
-
7f4c2a
 done:
7f4c2a
         if (_inode != NULL) {
7f4c2a
                 inode_unref (_inode);
7f4c2a
@@ -1256,12 +1352,66 @@ done:
7f4c2a
         return 0;
7f4c2a
 
7f4c2a
 err:
7f4c2a
-        quota_handle_validate_error (local, -1, op_errno);
7f4c2a
+        quota_handle_validate_error (frame, -1, op_errno);
7f4c2a
 
7f4c2a
         inode_unref (_inode);
7f4c2a
         return 0;
7f4c2a
 }
7f4c2a
 
7f4c2a
+inode_t *
7f4c2a
+do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
+                      quota_dentry_t *dentry)
7f4c2a
+{
7f4c2a
+        int32_t         ret        = -1;
7f4c2a
+        inode_t        *parent     = NULL;
7f4c2a
+        call_frame_t   *new_frame  = NULL;
7f4c2a
+        quota_local_t  *local      = NULL;
7f4c2a
+        quota_local_t  *new_local  = NULL;
7f4c2a
+
7f4c2a
+        local = frame->local;
7f4c2a
+
7f4c2a
+        parent = inode_parent (inode, dentry->par, dentry->name);
7f4c2a
+        if (parent == NULL)
7f4c2a
+                parent = inode_find (inode->table, dentry->par);
7f4c2a
+        if (parent == NULL)
7f4c2a
+                goto out;
7f4c2a
+
7f4c2a
+        new_frame = create_frame (this, this->ctx->pool);
7f4c2a
+        if (new_frame == NULL)
7f4c2a
+                goto out;
7f4c2a
+
7f4c2a
+        new_local = quota_local_new ();
7f4c2a
+        if (new_local == NULL)
7f4c2a
+                goto out;
7f4c2a
+
7f4c2a
+        new_frame->root->uid = new_frame->root->gid = 0;
7f4c2a
+        new_frame->local = new_local;
7f4c2a
+        new_local->par_frame = frame;
7f4c2a
+
7f4c2a
+        quota_check_limit (new_frame, parent, this, NULL, NULL);
7f4c2a
+
7f4c2a
+        ret = 0;
7f4c2a
+out:
7f4c2a
+        if (ret < 0) {
7f4c2a
+                if (parent) {
7f4c2a
+                        /* Caller should decrement link_count, in case parent is
7f4c2a
+                         * NULL
7f4c2a
+                         */
7f4c2a
+                        quota_handle_validate_error (frame, -1, ENOMEM);
7f4c2a
+                }
7f4c2a
+
7f4c2a
+                if (new_frame) {
7f4c2a
+                        new_frame->local = NULL;
7f4c2a
+                        STACK_DESTROY (new_frame->root);
7f4c2a
+                }
7f4c2a
+
7f4c2a
+                if (new_local)
7f4c2a
+                        quota_local_cleanup (new_local);
7f4c2a
+        }
7f4c2a
+
7f4c2a
+        return parent;
7f4c2a
+}
7f4c2a
+
7f4c2a
 static inline int
7f4c2a
 quota_get_limits (xlator_t *this, dict_t *dict, int64_t *hard_lim,
7f4c2a
                   int64_t *soft_lim, int64_t *object_hard_limit,
7f4c2a
@@ -1440,7 +1590,7 @@ out:
7f4c2a
         if (this_inode)
7f4c2a
                 inode_unref (this_inode);
7f4c2a
 
7f4c2a
-        quota_local_cleanup (this, local);
7f4c2a
+        quota_local_cleanup (local);
7f4c2a
 
7f4c2a
         return 0;
7f4c2a
 }
7f4c2a
@@ -1632,15 +1782,17 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
7f4c2a
               struct iovec *vector, int32_t count, off_t off,
7f4c2a
               uint32_t flags, struct iobref *iobref, dict_t *xdata)
7f4c2a
 {
7f4c2a
-        quota_priv_t      *priv    = NULL;
7f4c2a
-        int32_t            ret     = -1, op_errno = EINVAL;
7f4c2a
-        int32_t            parents = 0;
7f4c2a
-        uint64_t           size    = 0;
7f4c2a
-        quota_local_t     *local   = NULL;
7f4c2a
-        quota_inode_ctx_t *ctx     = NULL;
7f4c2a
-        quota_dentry_t    *dentry  = NULL, *tmp = NULL;
7f4c2a
-        call_stub_t       *stub    = NULL;
7f4c2a
-        struct list_head   head    = {0, };
7f4c2a
+        quota_priv_t      *priv       = NULL;
7f4c2a
+        int32_t            ret        = -1, op_errno = EINVAL;
7f4c2a
+        int32_t            parents    = 0;
7f4c2a
+        int32_t            fail_count = 0;
7f4c2a
+        uint64_t           size       = 0;
7f4c2a
+        quota_local_t     *local      = NULL;
7f4c2a
+        quota_inode_ctx_t *ctx        = NULL;
7f4c2a
+        quota_dentry_t    *dentry     = NULL, *tmp = NULL;
7f4c2a
+        call_stub_t       *stub       = NULL;
7f4c2a
+        struct list_head   head       = {0, };
7f4c2a
+        inode_t           *par_inode  = NULL;
7f4c2a
 
7f4c2a
         priv = this->private;
7f4c2a
 
7f4c2a
@@ -1679,18 +1831,8 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
7f4c2a
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
7f4c2a
 
7f4c2a
         size = iov_length (vector, count);
7f4c2a
-        if (ctx != NULL) {
7f4c2a
-                LOCK (&ctx->lock);
7f4c2a
-                {
7f4c2a
-                        list_for_each_entry (dentry, &ctx->parents, next) {
7f4c2a
-                                tmp = __quota_dentry_new (NULL, dentry->name,
7f4c2a
-                                                          dentry->par);
7f4c2a
-                                list_add_tail (&tmp->next, &head;;
7f4c2a
-                                parents++;
7f4c2a
-                        }
7f4c2a
-                }
7f4c2a
-                UNLOCK (&ctx->lock);
7f4c2a
-        }
7f4c2a
+
7f4c2a
+        parents = quota_add_parents_from_ctx (ctx, &head;;
7f4c2a
 
7f4c2a
         LOCK (&local->lock);
7f4c2a
         {
7f4c2a
@@ -1707,10 +1849,33 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
7f4c2a
                 quota_check_limit (frame, fd->inode, this, NULL, NULL);
7f4c2a
         } else {
7f4c2a
                 list_for_each_entry_safe (dentry, tmp, &head, next) {
7f4c2a
-                        quota_check_limit (frame, fd->inode, this, dentry->name,
7f4c2a
-                                           dentry->par);
7f4c2a
+                        par_inode = do_quota_check_limit (frame, fd->inode,
7f4c2a
+                                                          this, dentry);
7f4c2a
+                        if (par_inode == NULL) {
7f4c2a
+                                /* remove stale entry from inode ctx */
7f4c2a
+                                quota_dentry_del (ctx, dentry->name,
7f4c2a
+                                                  dentry->par);
7f4c2a
+                                parents--;
7f4c2a
+                                fail_count++;
7f4c2a
+                        } else {
7f4c2a
+                                inode_unref (par_inode);
7f4c2a
+                        }
7f4c2a
                         __quota_dentry_free (dentry);
7f4c2a
                 }
7f4c2a
+
7f4c2a
+                if (parents == 0) {
7f4c2a
+                        LOCK (&local->lock);
7f4c2a
+                        {
7f4c2a
+                                local->link_count++;
7f4c2a
+                        }
7f4c2a
+                        UNLOCK (&local->lock);
7f4c2a
+                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
7f4c2a
+                }
7f4c2a
+
7f4c2a
+                while (fail_count != 0) {
7f4c2a
+                        quota_link_count_decrement (frame);
7f4c2a
+                        fail_count--;
7f4c2a
+                }
7f4c2a
         }
7f4c2a
 
7f4c2a
         return 0;
7f4c2a
@@ -1986,8 +2151,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
         quota_local_t     *local = NULL;
7f4c2a
         quota_inode_ctx_t *ctx   = NULL;
7f4c2a
         uint64_t           value = 0;
7f4c2a
-        quota_dentry_t    *dentry = NULL;
7f4c2a
-        quota_dentry_t    *old_dentry = NULL;
7f4c2a
 
7f4c2a
         if (op_ret < 0) {
7f4c2a
                 goto out;
7f4c2a
@@ -2006,20 +2169,7 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
                 goto out;
7f4c2a
         }
7f4c2a
 
7f4c2a
-        LOCK (&ctx->lock);
7f4c2a
-        {
7f4c2a
-                list_for_each_entry (dentry, &ctx->parents, next) {
7f4c2a
-                        if ((strcmp (dentry->name, local->loc.name) == 0) &&
7f4c2a
-                            (gf_uuid_compare (local->loc.parent->gfid,
7f4c2a
-                                           dentry->par) == 0)) {
7f4c2a
-                                old_dentry = dentry;
7f4c2a
-                                break;
7f4c2a
-                        }
7f4c2a
-                }
7f4c2a
-                if (old_dentry)
7f4c2a
-                        __quota_dentry_free (old_dentry);
7f4c2a
-        }
7f4c2a
-        UNLOCK (&ctx->lock);
7f4c2a
+        quota_dentry_del (ctx, local->loc.name, local->loc.parent->gfid);
7f4c2a
 
7f4c2a
 out:
7f4c2a
         QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent,
7f4c2a
@@ -2267,7 +2417,7 @@ off:
7f4c2a
                          FIRST_CHILD(this)->fops->link, &(local->oldloc),
7f4c2a
                          &(local->newloc), local->xdata);
7f4c2a
 
7f4c2a
-        quota_local_cleanup (this, local);
7f4c2a
+        quota_local_cleanup (local);
7f4c2a
         return;
7f4c2a
 }
7f4c2a
 
7f4c2a
@@ -2526,7 +2676,7 @@ quota_rename_get_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
         return 0;
7f4c2a
 
7f4c2a
 out:
7f4c2a
-        quota_handle_validate_error (local, -1, op_errno);
7f4c2a
+        quota_handle_validate_error (frame, -1, op_errno);
7f4c2a
         return 0;
7f4c2a
 }
7f4c2a
 
7f4c2a
@@ -4230,7 +4380,7 @@ quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
7f4c2a
         UNLOCK (&ctx->lock);
7f4c2a
 
7f4c2a
 resume:
7f4c2a
-        quota_link_count_decrement (local);
7f4c2a
+        quota_link_count_decrement (frame);
7f4c2a
         return 0;
7f4c2a
 }
7f4c2a
 
7f4c2a
@@ -4260,7 +4410,7 @@ quota_get_limit_dir_continuation (struct list_head *parents, inode_t *inode,
7f4c2a
                         op_errno = EIO;
7f4c2a
                 }
7f4c2a
 
7f4c2a
-                quota_handle_validate_error (local, -1, op_errno);
7f4c2a
+                quota_handle_validate_error (frame, -1, op_errno);
7f4c2a
                 goto out;
7f4c2a
         }
7f4c2a
 
7f4c2a
@@ -4290,7 +4440,7 @@ quota_statfs_continue (call_frame_t *frame, xlator_t *this, inode_t *inode)
7f4c2a
         ret = quota_validate (frame, local->inode, this,
7f4c2a
                               quota_statfs_validate_cbk);
7f4c2a
         if (0 > ret)
7f4c2a
-                quota_handle_validate_error (local, -1, -ret);
7f4c2a
+                quota_handle_validate_error (frame, -1, -ret);
7f4c2a
 }
7f4c2a
 
7f4c2a
 void
7f4c2a
@@ -4642,13 +4792,17 @@ int32_t
7f4c2a
 quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
7f4c2a
 		off_t offset, size_t len, dict_t *xdata)
7f4c2a
 {
7f4c2a
-        int32_t            ret     = -1, op_errno = EINVAL;
7f4c2a
-        int32_t            parents = 0;
7f4c2a
-        quota_local_t     *local   = NULL;
7f4c2a
-        quota_inode_ctx_t *ctx     = NULL;
7f4c2a
-        quota_priv_t      *priv    = NULL;
7f4c2a
-        quota_dentry_t    *dentry  = NULL;
7f4c2a
-        call_stub_t       *stub    = NULL;
7f4c2a
+        int32_t            ret         = -1, op_errno = EINVAL;
7f4c2a
+        int32_t            parents     = 0;
7f4c2a
+        int32_t            fail_count  = 0;
7f4c2a
+        quota_local_t     *local       = NULL;
7f4c2a
+        quota_inode_ctx_t *ctx         = NULL;
7f4c2a
+        quota_priv_t      *priv        = NULL;
7f4c2a
+        quota_dentry_t    *dentry      = NULL;
7f4c2a
+        quota_dentry_t    *tmp         = NULL;
7f4c2a
+        call_stub_t       *stub        = NULL;
7f4c2a
+        struct list_head   head        = {0, };
7f4c2a
+        inode_t           *par_inode   = NULL;
7f4c2a
 
7f4c2a
         priv = this->private;
7f4c2a
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
7f4c2a
@@ -4685,15 +4839,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
7f4c2a
         priv = this->private;
7f4c2a
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
7f4c2a
 
7f4c2a
-        if (ctx != NULL) {
7f4c2a
-                LOCK (&ctx->lock);
7f4c2a
-                {
7f4c2a
-                        list_for_each_entry (dentry, &ctx->parents, next) {
7f4c2a
-                                parents++;
7f4c2a
-                        }
7f4c2a
-                }
7f4c2a
-                UNLOCK (&ctx->lock);
7f4c2a
-        }
7f4c2a
+        parents = quota_add_parents_from_ctx (ctx, &head;;
7f4c2a
 
7f4c2a
 	/*
7f4c2a
 	 * Note that by using len as the delta we're assuming the range from
7f4c2a
@@ -4708,9 +4854,33 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
7f4c2a
                 local->link_count = 1;
7f4c2a
                 quota_check_limit (frame, fd->inode, this, NULL, NULL);
7f4c2a
         } else {
7f4c2a
-                list_for_each_entry (dentry, &ctx->parents, next) {
7f4c2a
-                        quota_check_limit (frame, fd->inode, this, dentry->name,
7f4c2a
-                                           dentry->par);
7f4c2a
+                list_for_each_entry_safe (dentry, tmp, &head, next) {
7f4c2a
+                        par_inode = do_quota_check_limit (frame, fd->inode,
7f4c2a
+                                                          this, dentry);
7f4c2a
+                        if (par_inode == NULL) {
7f4c2a
+                                /* remove stale entry from inode_ctx */
7f4c2a
+                                quota_dentry_del (ctx, dentry->name,
7f4c2a
+                                                  dentry->par);
7f4c2a
+                                parents--;
7f4c2a
+                                fail_count++;
7f4c2a
+                        } else {
7f4c2a
+                                inode_unref (par_inode);
7f4c2a
+                        }
7f4c2a
+                        __quota_dentry_free (dentry);
7f4c2a
+                }
7f4c2a
+
7f4c2a
+                if (parents == 0) {
7f4c2a
+                        LOCK (&local->lock);
7f4c2a
+                        {
7f4c2a
+                                local->link_count++;
7f4c2a
+                        }
7f4c2a
+                        UNLOCK (&local->lock);
7f4c2a
+                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
7f4c2a
+                }
7f4c2a
+
7f4c2a
+                while (fail_count != 0) {
7f4c2a
+                        quota_link_count_decrement (frame);
7f4c2a
+                        fail_count--;
7f4c2a
                 }
7f4c2a
         }
7f4c2a
 
7f4c2a
diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h
7f4c2a
index 56876f0..0f8cecd 100644
7f4c2a
--- a/xlators/features/quota/src/quota.h
7f4c2a
+++ b/xlators/features/quota/src/quota.h
7f4c2a
@@ -102,7 +102,7 @@
7f4c2a
                 STACK_WIND_TAIL (frame, params);                        \
7f4c2a
                                                                         \
7f4c2a
                 if (_local)                                             \
7f4c2a
-                        quota_local_cleanup (_this, _local);            \
7f4c2a
+                        quota_local_cleanup (_local);                   \
7f4c2a
         } while (0)
7f4c2a
 
7f4c2a
 #define QUOTA_STACK_UNWIND(fop, frame, params...)                       \
7f4c2a
@@ -115,7 +115,7 @@
7f4c2a
                         frame->local = NULL;                            \
7f4c2a
                 }                                                       \
7f4c2a
                 STACK_UNWIND_STRICT (fop, frame, params);               \
7f4c2a
-                quota_local_cleanup (_this, _local);                    \
7f4c2a
+                quota_local_cleanup (_local);                           \
7f4c2a
         } while (0)
7f4c2a
 
7f4c2a
 #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution)     \
7f4c2a
@@ -193,7 +193,6 @@ typedef void
7f4c2a
 
7f4c2a
 struct quota_local {
7f4c2a
         gf_lock_t               lock;
7f4c2a
-        uint32_t                validate_count;
7f4c2a
         uint32_t                link_count;
7f4c2a
         loc_t                   loc;
7f4c2a
         loc_t                   oldloc;
7f4c2a
@@ -219,6 +218,7 @@ struct quota_local {
7f4c2a
         dict_t                 *validate_xdata;
7f4c2a
         int32_t                 quotad_conn_retry;
7f4c2a
         xlator_t               *this;
7f4c2a
+        call_frame_t           *par_frame;
7f4c2a
 };
7f4c2a
 typedef struct quota_local      quota_local_t;
7f4c2a
 
7f4c2a
@@ -266,6 +266,9 @@ int32_t
7f4c2a
 quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
                    char *name, uuid_t par);
7f4c2a
 
7f4c2a
+inode_t *
7f4c2a
+do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
7f4c2a
+                      quota_dentry_t *dentry);
7f4c2a
 int
7f4c2a
 quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict,
7f4c2a
                      loc_t *loc, struct iatt *buf, int32_t *op_errno);
7f4c2a
-- 
7f4c2a
1.7.1
7f4c2a