cb8e9e
From 3d8c0c9d3cabc61172b1d5d767a33cf334ddf59c Mon Sep 17 00:00:00 2001
cb8e9e
From: vmallika <vmallika@redhat.com>
cb8e9e
Date: Thu, 13 Aug 2015 14:09:22 +0530
cb8e9e
Subject: [PATCH 258/279] quota: Fix crash in quota enforcer
cb8e9e
cb8e9e
This is a backport of http://review.gluster.org/11510
cb8e9e
cb8e9e
With multiple hardlinks check_quota_limit is invoked for each parent
cb8e9e
each of this check_limit can invoke validation
cb8e9e
this can cause frame->local to get corrupted during validation.
cb8e9e
cb8e9e
Testcase tests/bugs/quota/bug-1235182.t fails spuriously with
cb8e9e
this problem
cb8e9e
cb8e9e
> Change-Id: I53adc54b431fb5f43e67a94248102ddaf0d7978f
cb8e9e
> BUG: 1238747
cb8e9e
> Signed-off-by: vmallika <vmallika@redhat.com>
cb8e9e
> Reviewed-on: http://review.gluster.org/11510
cb8e9e
> Tested-by: NetBSD Build System <jenkins@build.gluster.org>
cb8e9e
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>
cb8e9e
cb8e9e
BUG: 1238049
cb8e9e
Change-Id: Iac8d3c54c81536c8d4ba07c2c34ea79d0701b714
cb8e9e
Signed-off-by: vmallika <vmallika@redhat.com>
cb8e9e
Reviewed-on: https://code.engineering.redhat.com/gerrit/55060
cb8e9e
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
cb8e9e
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
cb8e9e
---
cb8e9e
 tests/bugs/quota/bug-1235182.t     |   10 +-
cb8e9e
 xlators/features/quota/src/quota.c |  408 +++++++++++++++++++++++++-----------
cb8e9e
 xlators/features/quota/src/quota.h |    9 +-
cb8e9e
 3 files changed, 297 insertions(+), 130 deletions(-)
cb8e9e
cb8e9e
diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t
cb8e9e
index 0bd43a9..e28b557 100644
cb8e9e
--- a/tests/bugs/quota/bug-1235182.t
cb8e9e
+++ b/tests/bugs/quota/bug-1235182.t
cb8e9e
@@ -9,12 +9,6 @@
cb8e9e
 
cb8e9e
 cleanup;
cb8e9e
 
cb8e9e
-function usage()
cb8e9e
-{
cb8e9e
-        local QUOTA_PATH=$1;
cb8e9e
-        $CLI volume quota $V0 list $QUOTA_PATH | grep "$QUOTA_PATH" | awk '{print $4}'
cb8e9e
-}
cb8e9e
-
cb8e9e
 QDD=$(dirname $0)/quota
cb8e9e
 # compile the test write program and run it
cb8e9e
 build_tester $(dirname $0)/../../basic/quota.c -o $QDD
cb8e9e
@@ -23,7 +17,7 @@ TEST glusterd
cb8e9e
 TEST pidof glusterd;
cb8e9e
 TEST $CLI volume info;
cb8e9e
 
cb8e9e
-TEST $CLI volume create $V0 $H0:$B0/${V0}{1};
cb8e9e
+TEST $CLI volume create $V0 $H0:$B0/${V0};
cb8e9e
 TEST $CLI volume start $V0;
cb8e9e
 
cb8e9e
 TEST $CLI volume quota $V0 enable;
cb8e9e
@@ -47,7 +41,7 @@ echo "Wait for process with pid $PID to complete"
cb8e9e
 wait $PID
cb8e9e
 echo "Process with pid $PID finished"
cb8e9e
 
cb8e9e
-EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "100.0MB" usage "/"
cb8e9e
+EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "100.0MB" quotausage "/"
cb8e9e
 
cb8e9e
 TEST $CLI volume stop $V0
cb8e9e
 TEST $CLI volume delete $V0
cb8e9e
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c
cb8e9e
index df0572f..16ad0fc 100644
cb8e9e
--- a/xlators/features/quota/src/quota.c
cb8e9e
+++ b/xlators/features/quota/src/quota.c
cb8e9e
@@ -155,7 +155,7 @@ err:
cb8e9e
 
cb8e9e
 
cb8e9e
 int32_t
cb8e9e
-quota_local_cleanup (xlator_t *this, quota_local_t *local)
cb8e9e
+quota_local_cleanup (quota_local_t *local)
cb8e9e
 {
cb8e9e
         if (local == NULL) {
cb8e9e
                 goto out;
cb8e9e
@@ -243,6 +243,31 @@ out:
cb8e9e
         return;
cb8e9e
 }
cb8e9e
 
cb8e9e
+void
cb8e9e
+__quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par)
cb8e9e
+{
cb8e9e
+        quota_dentry_t    *dentry = NULL;
cb8e9e
+        quota_dentry_t    *tmp    = NULL;
cb8e9e
+
cb8e9e
+        list_for_each_entry_safe (dentry, tmp, &ctx->parents, next) {
cb8e9e
+                if ((strcmp (dentry->name, name) == 0) &&
cb8e9e
+                    (gf_uuid_compare (dentry->par, par) == 0)) {
cb8e9e
+                        __quota_dentry_free (dentry);
cb8e9e
+                        break;
cb8e9e
+                }
cb8e9e
+        }
cb8e9e
+}
cb8e9e
+
cb8e9e
+void
cb8e9e
+quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par)
cb8e9e
+{
cb8e9e
+        LOCK (&ctx->lock);
cb8e9e
+        {
cb8e9e
+                __quota_dentry_del (ctx, name, par);
cb8e9e
+        }
cb8e9e
+        UNLOCK (&ctx->lock);
cb8e9e
+}
cb8e9e
+
cb8e9e
 static inline inode_t*
cb8e9e
 __quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name)
cb8e9e
 {
cb8e9e
@@ -485,10 +510,18 @@ out:
cb8e9e
 }
cb8e9e
 
cb8e9e
 static inline void
cb8e9e
-quota_link_count_decrement (quota_local_t *local)
cb8e9e
+quota_link_count_decrement (call_frame_t *frame)
cb8e9e
 {
cb8e9e
-        call_stub_t *stub       = NULL;
cb8e9e
-        int          link_count = -1;
cb8e9e
+        call_frame_t   *tmpframe   = NULL;
cb8e9e
+        quota_local_t  *local      = NULL;
cb8e9e
+        call_stub_t    *stub       = NULL;
cb8e9e
+        int             link_count = -1;
cb8e9e
+
cb8e9e
+        local = frame->local;
cb8e9e
+        if (local && local->par_frame) {
cb8e9e
+                local = local->par_frame->local;
cb8e9e
+                tmpframe = frame;
cb8e9e
+        }
cb8e9e
 
cb8e9e
         if (local == NULL)
cb8e9e
                 goto out;
cb8e9e
@@ -506,14 +539,30 @@ quota_link_count_decrement (quota_local_t *local)
cb8e9e
         if (stub != NULL) {
cb8e9e
                 call_resume (stub);
cb8e9e
         }
cb8e9e
+
cb8e9e
 out:
cb8e9e
+        if (tmpframe) {
cb8e9e
+                local = tmpframe->local;
cb8e9e
+                tmpframe->local = NULL;
cb8e9e
+
cb8e9e
+                STACK_DESTROY (frame->root);
cb8e9e
+                if (local)
cb8e9e
+                        quota_local_cleanup (local);
cb8e9e
+        }
cb8e9e
+
cb8e9e
         return;
cb8e9e
 }
cb8e9e
 
cb8e9e
 static inline void
cb8e9e
-quota_handle_validate_error (quota_local_t *local, int32_t op_ret,
cb8e9e
+quota_handle_validate_error (call_frame_t *frame, int32_t op_ret,
cb8e9e
                              int32_t op_errno)
cb8e9e
 {
cb8e9e
+        quota_local_t  *local;
cb8e9e
+
cb8e9e
+        local = frame->local;
cb8e9e
+        if (local && local->par_frame)
cb8e9e
+                local = local->par_frame->local;
cb8e9e
+
cb8e9e
         if (local == NULL)
cb8e9e
                 goto out;
cb8e9e
 
cb8e9e
@@ -527,7 +576,7 @@ quota_handle_validate_error (quota_local_t *local, int32_t op_ret,
cb8e9e
         UNLOCK (&local->lock);
cb8e9e
 
cb8e9e
         /* we abort checking limits on this path to root */
cb8e9e
-        quota_link_count_decrement (local);
cb8e9e
+        quota_link_count_decrement (frame);
cb8e9e
 out:
cb8e9e
         return;
cb8e9e
 }
cb8e9e
@@ -595,7 +644,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 unwind:
cb8e9e
-        quota_handle_validate_error (local, op_ret, op_errno);
cb8e9e
+        quota_handle_validate_error (frame, op_ret, op_errno);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -622,27 +671,65 @@ quota_timeout (struct timeval *tv, int32_t timeout)
cb8e9e
         return timed_out;
cb8e9e
 }
cb8e9e
 
cb8e9e
-static inline void
cb8e9e
-quota_add_parent (quota_dentry_t *dentry, struct list_head *list)
cb8e9e
+/* Return: 1 if new entry added
cb8e9e
+ *         0 no entry added
cb8e9e
+ */
cb8e9e
+static inline int32_t
cb8e9e
+quota_add_parent (struct list_head *list, char *name, uuid_t pgfid)
cb8e9e
 {
cb8e9e
         quota_dentry_t *entry = NULL;
cb8e9e
         gf_boolean_t    found = _gf_false;
cb8e9e
 
cb8e9e
-        if ((dentry == NULL) || (list == NULL)) {
cb8e9e
+        if (list == NULL) {
cb8e9e
                 goto out;
cb8e9e
         }
cb8e9e
 
cb8e9e
         list_for_each_entry (entry, list, next) {
cb8e9e
-                if (gf_uuid_compare (dentry->par, entry->par) == 0) {
cb8e9e
+                if (gf_uuid_compare (pgfid, entry->par) == 0) {
cb8e9e
                         found = _gf_true;
cb8e9e
                         goto out;
cb8e9e
                 }
cb8e9e
         }
cb8e9e
 
cb8e9e
-        list_add_tail (&dentry->next, list);
cb8e9e
+        entry = __quota_dentry_new (NULL, name, pgfid);
cb8e9e
+        list_add_tail (&entry->next, list);
cb8e9e
 
cb8e9e
 out:
cb8e9e
-        return;
cb8e9e
+        if (found)
cb8e9e
+                return 0;
cb8e9e
+        else
cb8e9e
+                return 1;
cb8e9e
+
cb8e9e
+}
cb8e9e
+
cb8e9e
+/* This function iterates the parent list in inode
cb8e9e
+ * context and add unique parent to the list
cb8e9e
+ * Returns number of dentry added to the list
cb8e9e
+ */
cb8e9e
+static inline int32_t
cb8e9e
+quota_add_parents_from_ctx (quota_inode_ctx_t *ctx, struct list_head *list)
cb8e9e
+{
cb8e9e
+        int                ret     = 0;
cb8e9e
+        quota_dentry_t    *dentry  = NULL;
cb8e9e
+        int32_t            count   = 0;
cb8e9e
+
cb8e9e
+        if (ctx == NULL || list == NULL)
cb8e9e
+                goto out;
cb8e9e
+
cb8e9e
+        LOCK (&ctx->lock);
cb8e9e
+        {
cb8e9e
+                list_for_each_entry (dentry, &ctx->parents, next) {
cb8e9e
+                        ret = quota_add_parent (list, dentry->name,
cb8e9e
+                                                dentry->par);
cb8e9e
+
cb8e9e
+                        if (ret == 1)
cb8e9e
+                                count++;
cb8e9e
+                }
cb8e9e
+        }
cb8e9e
+        UNLOCK (&ctx->lock);
cb8e9e
+
cb8e9e
+out:
cb8e9e
+        return count;
cb8e9e
 }
cb8e9e
 
cb8e9e
 int32_t
cb8e9e
@@ -708,23 +795,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 
cb8e9e
         quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);
cb8e9e
 
cb8e9e
-        if (ctx != NULL) {
cb8e9e
-                LOCK (&ctx->lock);
cb8e9e
-                {
cb8e9e
-                        list_for_each_entry (dentry, &ctx->parents, next) {
cb8e9e
-                                /* we built ancestry for a non-directory */
cb8e9e
-                                tmp = __quota_dentry_new (NULL, dentry->name,
cb8e9e
-                                                          dentry->par);
cb8e9e
-                                quota_add_parent (tmp, &parents);
cb8e9e
-
cb8e9e
-                                if (list_empty (&tmp->next)) {
cb8e9e
-                                        __quota_dentry_free (tmp);
cb8e9e
-                                        tmp = NULL;
cb8e9e
-                                }
cb8e9e
-                        }
cb8e9e
-                }
cb8e9e
-                UNLOCK (&ctx->lock);
cb8e9e
-        }
cb8e9e
+        quota_add_parents_from_ctx (ctx, &parents);
cb8e9e
 
cb8e9e
         if (list_empty (&parents)) {
cb8e9e
                 /* we built ancestry for a directory */
cb8e9e
@@ -735,8 +806,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
 
cb8e9e
                 GF_ASSERT (&entry->list != &entries->list);
cb8e9e
 
cb8e9e
-                tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid);
cb8e9e
-                quota_add_parent (tmp, &parents);
cb8e9e
+                quota_add_parent (&parents, entry->d_name, parent->gfid);
cb8e9e
         }
cb8e9e
 
cb8e9e
         local->ancestry_cbk (&parents, local->loc.inode, 0, 0,
cb8e9e
@@ -748,7 +818,7 @@ err:
cb8e9e
 
cb8e9e
 cleanup:
cb8e9e
         STACK_DESTROY (frame->root);
cb8e9e
-        quota_local_cleanup (this, local);
cb8e9e
+        quota_local_cleanup (local);
cb8e9e
 
cb8e9e
         if (parent != NULL) {
cb8e9e
                 inode_unref (parent);
cb8e9e
@@ -838,7 +908,7 @@ err:
cb8e9e
                 }
cb8e9e
 
cb8e9e
                 if (local)
cb8e9e
-                        quota_local_cleanup (this, local);
cb8e9e
+                        quota_local_cleanup (local);
cb8e9e
         }
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
@@ -919,6 +989,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
cb8e9e
         call_frame_t   *frame        = NULL;
cb8e9e
         xlator_t       *this         = NULL;
cb8e9e
         quota_local_t  *local        = NULL;
cb8e9e
+        quota_local_t  *par_local    = NULL;
cb8e9e
         quota_dentry_t *entry        = NULL;
cb8e9e
         inode_t        *parent       = NULL;
cb8e9e
         int             parent_count = 0;
cb8e9e
@@ -927,6 +998,12 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
cb8e9e
         local = frame->local;
cb8e9e
         this = THIS;
cb8e9e
 
cb8e9e
+        if (local->par_frame)
cb8e9e
+                par_local = local->par_frame->local;
cb8e9e
+        else
cb8e9e
+                par_local = local;
cb8e9e
+
cb8e9e
+
cb8e9e
         if ((op_ret < 0) || list_empty (parents)) {
cb8e9e
                 if (op_ret >= 0) {
cb8e9e
                         gf_msg (this->name, GF_LOG_WARNING, EIO,
cb8e9e
@@ -939,7 +1016,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
cb8e9e
                         op_errno = EIO;
cb8e9e
                 }
cb8e9e
 
cb8e9e
-                quota_handle_validate_error (local, -1, op_errno);
cb8e9e
+                quota_handle_validate_error (frame, -1, op_errno);
cb8e9e
                 goto out;
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -947,16 +1024,27 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
cb8e9e
                 parent_count++;
cb8e9e
         }
cb8e9e
 
cb8e9e
-        LOCK (&local->lock);
cb8e9e
+        LOCK (&par_local->lock);
cb8e9e
         {
cb8e9e
-                local->link_count += (parent_count - 1);
cb8e9e
+                par_local->link_count += (parent_count - 1);
cb8e9e
         }
cb8e9e
-        UNLOCK (&local->lock);
cb8e9e
+        UNLOCK (&par_local->lock);
cb8e9e
 
cb8e9e
-        list_for_each_entry (entry, parents, next) {
cb8e9e
-                parent = inode_find (inode->table, entry->par);
cb8e9e
-                quota_check_limit (frame, parent, this, NULL, NULL);
cb8e9e
-                inode_unref (parent);
cb8e9e
+        if (local->par_frame) {
cb8e9e
+                list_for_each_entry (entry, parents, next) {
cb8e9e
+                        parent = inode_find (inode->table, entry->par);
cb8e9e
+                        quota_check_limit (frame, parent, this, NULL, NULL);
cb8e9e
+                        inode_unref (parent);
cb8e9e
+                }
cb8e9e
+        } else {
cb8e9e
+                list_for_each_entry (entry, parents, next) {
cb8e9e
+                        parent = do_quota_check_limit (frame, inode, this,
cb8e9e
+                                                       entry);
cb8e9e
+                        if (parent)
cb8e9e
+                                inode_unref (parent);
cb8e9e
+                        else
cb8e9e
+                                quota_link_count_decrement (frame);
cb8e9e
+                }
cb8e9e
         }
cb8e9e
 
cb8e9e
 out:
cb8e9e
@@ -1129,6 +1217,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
         quota_inode_ctx_t *ctx                 = NULL;
cb8e9e
         quota_priv_t      *priv                = NULL;
cb8e9e
         quota_local_t     *local               = NULL;
cb8e9e
+        quota_local_t     *par_local           = NULL;
cb8e9e
         char               need_validate       = 0;
cb8e9e
         char               just_validated      = 0;
cb8e9e
         gf_boolean_t       hard_limit_exceeded = 0;
cb8e9e
@@ -1144,9 +1233,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
         local  = frame->local;
cb8e9e
         GF_VALIDATE_OR_GOTO (this->name, local, err);
cb8e9e
 
cb8e9e
-        delta = local->delta;
cb8e9e
+        if (local->par_frame) {
cb8e9e
+                par_local = local->par_frame->local;
cb8e9e
+                GF_VALIDATE_OR_GOTO (this->name, par_local, err);
cb8e9e
+        } else {
cb8e9e
+                par_local = local;
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        delta = par_local->delta;
cb8e9e
 
cb8e9e
-        GF_VALIDATE_OR_GOTO (this->name, local->stub, err);
cb8e9e
+        GF_VALIDATE_OR_GOTO (this->name, par_local->stub, err);
cb8e9e
         /* Allow all the trusted clients
cb8e9e
          * Don't block the gluster internal processes like rebalance, gsyncd,
cb8e9e
          * self heal etc from the disk quotas.
cb8e9e
@@ -1157,7 +1253,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
          */
cb8e9e
         if (0 > frame->root->pid) {
cb8e9e
                 ret = 0;
cb8e9e
-                quota_link_count_decrement (local);
cb8e9e
+                quota_link_count_decrement (frame);
cb8e9e
                 goto done;
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -1182,15 +1278,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
         do {
cb8e9e
                 /* In a rename operation, enforce should be stopped at common
cb8e9e
                    ancestor */
cb8e9e
-                if (!gf_uuid_is_null (local->common_ancestor) &&
cb8e9e
-                    !gf_uuid_compare (_inode->gfid, local->common_ancestor)) {
cb8e9e
-                        quota_link_count_decrement (local);
cb8e9e
+                if (!gf_uuid_is_null (par_local->common_ancestor) &&
cb8e9e
+                    !gf_uuid_compare (_inode->gfid, par_local->common_ancestor)
cb8e9e
+                   ) {
cb8e9e
+                        quota_link_count_decrement (frame);
cb8e9e
                         break;
cb8e9e
                 }
cb8e9e
 
cb8e9e
                 ret = quota_check_object_limit (frame, ctx, priv, _inode, this,
cb8e9e
                                                 &op_errno, just_validated,
cb8e9e
-                                                local, &skip_check);
cb8e9e
+                                                par_local, &skip_check);
cb8e9e
                 if (skip_check == _gf_true)
cb8e9e
                         goto done;
cb8e9e
 
cb8e9e
@@ -1203,7 +1300,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
 
cb8e9e
                 ret = quota_check_size_limit (frame, ctx, priv, _inode, this,
cb8e9e
                                               &op_errno, just_validated, delta,
cb8e9e
-                                              local, &skip_check);
cb8e9e
+                                              par_local, &skip_check);
cb8e9e
                 if (skip_check == _gf_true)
cb8e9e
                         goto done;
cb8e9e
 
cb8e9e
@@ -1215,7 +1312,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
                 }
cb8e9e
 
cb8e9e
                 if (__is_root_gfid (_inode->gfid)) {
cb8e9e
-                        quota_link_count_decrement (local);
cb8e9e
+                        quota_link_count_decrement (frame);
cb8e9e
                         break;
cb8e9e
                 }
cb8e9e
 
cb8e9e
@@ -1228,8 +1325,8 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
 
cb8e9e
                 if (parent == NULL) {
cb8e9e
                         ret = quota_build_ancestry (_inode,
cb8e9e
-                                                    quota_check_limit_continuation,
cb8e9e
-                                                    frame);
cb8e9e
+                                                 quota_check_limit_continuation,
cb8e9e
+                                                 frame);
cb8e9e
                         if (ret < 0) {
cb8e9e
                                 op_errno = -ret;
cb8e9e
                                 goto err;
cb8e9e
@@ -1247,7 +1344,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
                 ctx = (quota_inode_ctx_t *)(unsigned long)value;
cb8e9e
         } while (1);
cb8e9e
 
cb8e9e
-
cb8e9e
 done:
cb8e9e
         if (_inode != NULL) {
cb8e9e
                 inode_unref (_inode);
cb8e9e
@@ -1256,12 +1352,66 @@ done:
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 err:
cb8e9e
-        quota_handle_validate_error (local, -1, op_errno);
cb8e9e
+        quota_handle_validate_error (frame, -1, op_errno);
cb8e9e
 
cb8e9e
         inode_unref (_inode);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
+inode_t *
cb8e9e
+do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
+                      quota_dentry_t *dentry)
cb8e9e
+{
cb8e9e
+        int32_t         ret        = -1;
cb8e9e
+        inode_t        *parent     = NULL;
cb8e9e
+        call_frame_t   *new_frame  = NULL;
cb8e9e
+        quota_local_t  *local      = NULL;
cb8e9e
+        quota_local_t  *new_local  = NULL;
cb8e9e
+
cb8e9e
+        local = frame->local;
cb8e9e
+
cb8e9e
+        parent = inode_parent (inode, dentry->par, dentry->name);
cb8e9e
+        if (parent == NULL)
cb8e9e
+                parent = inode_find (inode->table, dentry->par);
cb8e9e
+        if (parent == NULL)
cb8e9e
+                goto out;
cb8e9e
+
cb8e9e
+        new_frame = create_frame (this, this->ctx->pool);
cb8e9e
+        if (new_frame == NULL)
cb8e9e
+                goto out;
cb8e9e
+
cb8e9e
+        new_local = quota_local_new ();
cb8e9e
+        if (new_local == NULL)
cb8e9e
+                goto out;
cb8e9e
+
cb8e9e
+        new_frame->root->uid = new_frame->root->gid = 0;
cb8e9e
+        new_frame->local = new_local;
cb8e9e
+        new_local->par_frame = frame;
cb8e9e
+
cb8e9e
+        quota_check_limit (new_frame, parent, this, NULL, NULL);
cb8e9e
+
cb8e9e
+        ret = 0;
cb8e9e
+out:
cb8e9e
+        if (ret < 0) {
cb8e9e
+                if (parent) {
cb8e9e
+                        /* Caller should decrement link_count, in case parent is
cb8e9e
+                         * NULL
cb8e9e
+                         */
cb8e9e
+                        quota_handle_validate_error (frame, -1, ENOMEM);
cb8e9e
+                }
cb8e9e
+
cb8e9e
+                if (new_frame) {
cb8e9e
+                        new_frame->local = NULL;
cb8e9e
+                        STACK_DESTROY (new_frame->root);
cb8e9e
+                }
cb8e9e
+
cb8e9e
+                if (new_local)
cb8e9e
+                        quota_local_cleanup (new_local);
cb8e9e
+        }
cb8e9e
+
cb8e9e
+        return parent;
cb8e9e
+}
cb8e9e
+
cb8e9e
 static inline int
cb8e9e
 quota_get_limits (xlator_t *this, dict_t *dict, int64_t *hard_lim,
cb8e9e
                   int64_t *soft_lim, int64_t *object_hard_limit,
cb8e9e
@@ -1440,7 +1590,7 @@ out:
cb8e9e
         if (this_inode)
cb8e9e
                 inode_unref (this_inode);
cb8e9e
 
cb8e9e
-        quota_local_cleanup (this, local);
cb8e9e
+        quota_local_cleanup (local);
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
@@ -1632,15 +1782,17 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
cb8e9e
               struct iovec *vector, int32_t count, off_t off,
cb8e9e
               uint32_t flags, struct iobref *iobref, dict_t *xdata)
cb8e9e
 {
cb8e9e
-        quota_priv_t      *priv    = NULL;
cb8e9e
-        int32_t            ret     = -1, op_errno = EINVAL;
cb8e9e
-        int32_t            parents = 0;
cb8e9e
-        uint64_t           size    = 0;
cb8e9e
-        quota_local_t     *local   = NULL;
cb8e9e
-        quota_inode_ctx_t *ctx     = NULL;
cb8e9e
-        quota_dentry_t    *dentry  = NULL, *tmp = NULL;
cb8e9e
-        call_stub_t       *stub    = NULL;
cb8e9e
-        struct list_head   head    = {0, };
cb8e9e
+        quota_priv_t      *priv       = NULL;
cb8e9e
+        int32_t            ret        = -1, op_errno = EINVAL;
cb8e9e
+        int32_t            parents    = 0;
cb8e9e
+        int32_t            fail_count = 0;
cb8e9e
+        uint64_t           size       = 0;
cb8e9e
+        quota_local_t     *local      = NULL;
cb8e9e
+        quota_inode_ctx_t *ctx        = NULL;
cb8e9e
+        quota_dentry_t    *dentry     = NULL, *tmp = NULL;
cb8e9e
+        call_stub_t       *stub       = NULL;
cb8e9e
+        struct list_head   head       = {0, };
cb8e9e
+        inode_t           *par_inode  = NULL;
cb8e9e
 
cb8e9e
         priv = this->private;
cb8e9e
 
cb8e9e
@@ -1679,18 +1831,8 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
cb8e9e
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
cb8e9e
 
cb8e9e
         size = iov_length (vector, count);
cb8e9e
-        if (ctx != NULL) {
cb8e9e
-                LOCK (&ctx->lock);
cb8e9e
-                {
cb8e9e
-                        list_for_each_entry (dentry, &ctx->parents, next) {
cb8e9e
-                                tmp = __quota_dentry_new (NULL, dentry->name,
cb8e9e
-                                                          dentry->par);
cb8e9e
-                                list_add_tail (&tmp->next, &head;;
cb8e9e
-                                parents++;
cb8e9e
-                        }
cb8e9e
-                }
cb8e9e
-                UNLOCK (&ctx->lock);
cb8e9e
-        }
cb8e9e
+
cb8e9e
+        parents = quota_add_parents_from_ctx (ctx, &head;;
cb8e9e
 
cb8e9e
         LOCK (&local->lock);
cb8e9e
         {
cb8e9e
@@ -1707,10 +1849,33 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
cb8e9e
                 quota_check_limit (frame, fd->inode, this, NULL, NULL);
cb8e9e
         } else {
cb8e9e
                 list_for_each_entry_safe (dentry, tmp, &head, next) {
cb8e9e
-                        quota_check_limit (frame, fd->inode, this, dentry->name,
cb8e9e
-                                           dentry->par);
cb8e9e
+                        par_inode = do_quota_check_limit (frame, fd->inode,
cb8e9e
+                                                          this, dentry);
cb8e9e
+                        if (par_inode == NULL) {
cb8e9e
+                                /* remove stale entry from inode ctx */
cb8e9e
+                                quota_dentry_del (ctx, dentry->name,
cb8e9e
+                                                  dentry->par);
cb8e9e
+                                parents--;
cb8e9e
+                                fail_count++;
cb8e9e
+                        } else {
cb8e9e
+                                inode_unref (par_inode);
cb8e9e
+                        }
cb8e9e
                         __quota_dentry_free (dentry);
cb8e9e
                 }
cb8e9e
+
cb8e9e
+                if (parents == 0) {
cb8e9e
+                        LOCK (&local->lock);
cb8e9e
+                        {
cb8e9e
+                                local->link_count++;
cb8e9e
+                        }
cb8e9e
+                        UNLOCK (&local->lock);
cb8e9e
+                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
cb8e9e
+                }
cb8e9e
+
cb8e9e
+                while (fail_count != 0) {
cb8e9e
+                        quota_link_count_decrement (frame);
cb8e9e
+                        fail_count--;
cb8e9e
+                }
cb8e9e
         }
cb8e9e
 
cb8e9e
         return 0;
cb8e9e
@@ -1986,8 +2151,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         quota_local_t     *local = NULL;
cb8e9e
         quota_inode_ctx_t *ctx   = NULL;
cb8e9e
         uint64_t           value = 0;
cb8e9e
-        quota_dentry_t    *dentry = NULL;
cb8e9e
-        quota_dentry_t    *old_dentry = NULL;
cb8e9e
 
cb8e9e
         if (op_ret < 0) {
cb8e9e
                 goto out;
cb8e9e
@@ -2006,20 +2169,7 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
                 goto out;
cb8e9e
         }
cb8e9e
 
cb8e9e
-        LOCK (&ctx->lock);
cb8e9e
-        {
cb8e9e
-                list_for_each_entry (dentry, &ctx->parents, next) {
cb8e9e
-                        if ((strcmp (dentry->name, local->loc.name) == 0) &&
cb8e9e
-                            (gf_uuid_compare (local->loc.parent->gfid,
cb8e9e
-                                           dentry->par) == 0)) {
cb8e9e
-                                old_dentry = dentry;
cb8e9e
-                                break;
cb8e9e
-                        }
cb8e9e
-                }
cb8e9e
-                if (old_dentry)
cb8e9e
-                        __quota_dentry_free (old_dentry);
cb8e9e
-        }
cb8e9e
-        UNLOCK (&ctx->lock);
cb8e9e
+        quota_dentry_del (ctx, local->loc.name, local->loc.parent->gfid);
cb8e9e
 
cb8e9e
 out:
cb8e9e
         QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent,
cb8e9e
@@ -2267,7 +2417,7 @@ off:
cb8e9e
                          FIRST_CHILD(this)->fops->link, &(local->oldloc),
cb8e9e
                          &(local->newloc), local->xdata);
cb8e9e
 
cb8e9e
-        quota_local_cleanup (this, local);
cb8e9e
+        quota_local_cleanup (local);
cb8e9e
         return;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -2526,7 +2676,7 @@ quota_rename_get_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         return 0;
cb8e9e
 
cb8e9e
 out:
cb8e9e
-        quota_handle_validate_error (local, -1, op_errno);
cb8e9e
+        quota_handle_validate_error (frame, -1, op_errno);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -4230,7 +4380,7 @@ quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
cb8e9e
         UNLOCK (&ctx->lock);
cb8e9e
 
cb8e9e
 resume:
cb8e9e
-        quota_link_count_decrement (local);
cb8e9e
+        quota_link_count_decrement (frame);
cb8e9e
         return 0;
cb8e9e
 }
cb8e9e
 
cb8e9e
@@ -4260,7 +4410,7 @@ quota_get_limit_dir_continuation (struct list_head *parents, inode_t *inode,
cb8e9e
                         op_errno = EIO;
cb8e9e
                 }
cb8e9e
 
cb8e9e
-                quota_handle_validate_error (local, -1, op_errno);
cb8e9e
+                quota_handle_validate_error (frame, -1, op_errno);
cb8e9e
                 goto out;
cb8e9e
         }
cb8e9e
 
cb8e9e
@@ -4290,7 +4440,7 @@ quota_statfs_continue (call_frame_t *frame, xlator_t *this, inode_t *inode)
cb8e9e
         ret = quota_validate (frame, local->inode, this,
cb8e9e
                               quota_statfs_validate_cbk);
cb8e9e
         if (0 > ret)
cb8e9e
-                quota_handle_validate_error (local, -1, -ret);
cb8e9e
+                quota_handle_validate_error (frame, -1, -ret);
cb8e9e
 }
cb8e9e
 
cb8e9e
 void
cb8e9e
@@ -4642,13 +4792,17 @@ int32_t
cb8e9e
 quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
cb8e9e
 		off_t offset, size_t len, dict_t *xdata)
cb8e9e
 {
cb8e9e
-        int32_t            ret     = -1, op_errno = EINVAL;
cb8e9e
-        int32_t            parents = 0;
cb8e9e
-        quota_local_t     *local   = NULL;
cb8e9e
-        quota_inode_ctx_t *ctx     = NULL;
cb8e9e
-        quota_priv_t      *priv    = NULL;
cb8e9e
-        quota_dentry_t    *dentry  = NULL;
cb8e9e
-        call_stub_t       *stub    = NULL;
cb8e9e
+        int32_t            ret         = -1, op_errno = EINVAL;
cb8e9e
+        int32_t            parents     = 0;
cb8e9e
+        int32_t            fail_count  = 0;
cb8e9e
+        quota_local_t     *local       = NULL;
cb8e9e
+        quota_inode_ctx_t *ctx         = NULL;
cb8e9e
+        quota_priv_t      *priv        = NULL;
cb8e9e
+        quota_dentry_t    *dentry      = NULL;
cb8e9e
+        quota_dentry_t    *tmp         = NULL;
cb8e9e
+        call_stub_t       *stub        = NULL;
cb8e9e
+        struct list_head   head        = {0, };
cb8e9e
+        inode_t           *par_inode   = NULL;
cb8e9e
 
cb8e9e
         priv = this->private;
cb8e9e
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
cb8e9e
@@ -4685,15 +4839,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
cb8e9e
         priv = this->private;
cb8e9e
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
cb8e9e
 
cb8e9e
-        if (ctx != NULL) {
cb8e9e
-                LOCK (&ctx->lock);
cb8e9e
-                {
cb8e9e
-                        list_for_each_entry (dentry, &ctx->parents, next) {
cb8e9e
-                                parents++;
cb8e9e
-                        }
cb8e9e
-                }
cb8e9e
-                UNLOCK (&ctx->lock);
cb8e9e
-        }
cb8e9e
+        parents = quota_add_parents_from_ctx (ctx, &head;;
cb8e9e
 
cb8e9e
 	/*
cb8e9e
 	 * Note that by using len as the delta we're assuming the range from
cb8e9e
@@ -4708,9 +4854,33 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
cb8e9e
                 local->link_count = 1;
cb8e9e
                 quota_check_limit (frame, fd->inode, this, NULL, NULL);
cb8e9e
         } else {
cb8e9e
-                list_for_each_entry (dentry, &ctx->parents, next) {
cb8e9e
-                        quota_check_limit (frame, fd->inode, this, dentry->name,
cb8e9e
-                                           dentry->par);
cb8e9e
+                list_for_each_entry_safe (dentry, tmp, &head, next) {
cb8e9e
+                        par_inode = do_quota_check_limit (frame, fd->inode,
cb8e9e
+                                                          this, dentry);
cb8e9e
+                        if (par_inode == NULL) {
cb8e9e
+                                /* remove stale entry from inode_ctx */
cb8e9e
+                                quota_dentry_del (ctx, dentry->name,
cb8e9e
+                                                  dentry->par);
cb8e9e
+                                parents--;
cb8e9e
+                                fail_count++;
cb8e9e
+                        } else {
cb8e9e
+                                inode_unref (par_inode);
cb8e9e
+                        }
cb8e9e
+                        __quota_dentry_free (dentry);
cb8e9e
+                }
cb8e9e
+
cb8e9e
+                if (parents == 0) {
cb8e9e
+                        LOCK (&local->lock);
cb8e9e
+                        {
cb8e9e
+                                local->link_count++;
cb8e9e
+                        }
cb8e9e
+                        UNLOCK (&local->lock);
cb8e9e
+                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
cb8e9e
+                }
cb8e9e
+
cb8e9e
+                while (fail_count != 0) {
cb8e9e
+                        quota_link_count_decrement (frame);
cb8e9e
+                        fail_count--;
cb8e9e
                 }
cb8e9e
         }
cb8e9e
 
cb8e9e
diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h
cb8e9e
index 56876f0..0f8cecd 100644
cb8e9e
--- a/xlators/features/quota/src/quota.h
cb8e9e
+++ b/xlators/features/quota/src/quota.h
cb8e9e
@@ -102,7 +102,7 @@
cb8e9e
                 STACK_WIND_TAIL (frame, params);                        \
cb8e9e
                                                                         \
cb8e9e
                 if (_local)                                             \
cb8e9e
-                        quota_local_cleanup (_this, _local);            \
cb8e9e
+                        quota_local_cleanup (_local);                   \
cb8e9e
         } while (0)
cb8e9e
 
cb8e9e
 #define QUOTA_STACK_UNWIND(fop, frame, params...)                       \
cb8e9e
@@ -115,7 +115,7 @@
cb8e9e
                         frame->local = NULL;                            \
cb8e9e
                 }                                                       \
cb8e9e
                 STACK_UNWIND_STRICT (fop, frame, params);               \
cb8e9e
-                quota_local_cleanup (_this, _local);                    \
cb8e9e
+                quota_local_cleanup (_local);                           \
cb8e9e
         } while (0)
cb8e9e
 
cb8e9e
 #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution)     \
cb8e9e
@@ -193,7 +193,6 @@ typedef void
cb8e9e
 
cb8e9e
 struct quota_local {
cb8e9e
         gf_lock_t               lock;
cb8e9e
-        uint32_t                validate_count;
cb8e9e
         uint32_t                link_count;
cb8e9e
         loc_t                   loc;
cb8e9e
         loc_t                   oldloc;
cb8e9e
@@ -219,6 +218,7 @@ struct quota_local {
cb8e9e
         dict_t                 *validate_xdata;
cb8e9e
         int32_t                 quotad_conn_retry;
cb8e9e
         xlator_t               *this;
cb8e9e
+        call_frame_t           *par_frame;
cb8e9e
 };
cb8e9e
 typedef struct quota_local      quota_local_t;
cb8e9e
 
cb8e9e
@@ -266,6 +266,9 @@ int32_t
cb8e9e
 quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
                    char *name, uuid_t par);
cb8e9e
 
cb8e9e
+inode_t *
cb8e9e
+do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
cb8e9e
+                      quota_dentry_t *dentry);
cb8e9e
 int
cb8e9e
 quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict,
cb8e9e
                      loc_t *loc, struct iatt *buf, int32_t *op_errno);
cb8e9e
-- 
cb8e9e
1.7.1
cb8e9e