Blob Blame History Raw
From 3d8c0c9d3cabc61172b1d5d767a33cf334ddf59c Mon Sep 17 00:00:00 2001
From: vmallika <vmallika@redhat.com>
Date: Thu, 13 Aug 2015 14:09:22 +0530
Subject: [PATCH 258/279] quota: Fix crash in quota enforcer

This is a backport of http://review.gluster.org/11510

With multiple hardlinks check_quota_limit is invoked for each parent
each of this check_limit can invoke validation
this can cause frame->local to get corrupted during validation.

Testcase tests/bugs/quota/bug-1235182.t fails spuriously with
this problem

> Change-Id: I53adc54b431fb5f43e67a94248102ddaf0d7978f
> BUG: 1238747
> Signed-off-by: vmallika <vmallika@redhat.com>
> Reviewed-on: http://review.gluster.org/11510
> Tested-by: NetBSD Build System <jenkins@build.gluster.org>
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>

BUG: 1238049
Change-Id: Iac8d3c54c81536c8d4ba07c2c34ea79d0701b714
Signed-off-by: vmallika <vmallika@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/55060
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
---
 tests/bugs/quota/bug-1235182.t     |   10 +-
 xlators/features/quota/src/quota.c |  408 +++++++++++++++++++++++++-----------
 xlators/features/quota/src/quota.h |    9 +-
 3 files changed, 297 insertions(+), 130 deletions(-)

diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t
index 0bd43a9..e28b557 100644
--- a/tests/bugs/quota/bug-1235182.t
+++ b/tests/bugs/quota/bug-1235182.t
@@ -9,12 +9,6 @@
 
 cleanup;
 
-function usage()
-{
-        local QUOTA_PATH=$1;
-        $CLI volume quota $V0 list $QUOTA_PATH | grep "$QUOTA_PATH" | awk '{print $4}'
-}
-
 QDD=$(dirname $0)/quota
 # compile the test write program and run it
 build_tester $(dirname $0)/../../basic/quota.c -o $QDD
@@ -23,7 +17,7 @@ TEST glusterd
 TEST pidof glusterd;
 TEST $CLI volume info;
 
-TEST $CLI volume create $V0 $H0:$B0/${V0}{1};
+TEST $CLI volume create $V0 $H0:$B0/${V0};
 TEST $CLI volume start $V0;
 
 TEST $CLI volume quota $V0 enable;
@@ -47,7 +41,7 @@ echo "Wait for process with pid $PID to complete"
 wait $PID
 echo "Process with pid $PID finished"
 
-EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "100.0MB" usage "/"
+EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "100.0MB" quotausage "/"
 
 TEST $CLI volume stop $V0
 TEST $CLI volume delete $V0
diff --git a/xlators/features/quota/src/quota.c b/xlators/features/quota/src/quota.c
index df0572f..16ad0fc 100644
--- a/xlators/features/quota/src/quota.c
+++ b/xlators/features/quota/src/quota.c
@@ -155,7 +155,7 @@ err:
 
 
 int32_t
-quota_local_cleanup (xlator_t *this, quota_local_t *local)
+quota_local_cleanup (quota_local_t *local)
 {
         if (local == NULL) {
                 goto out;
@@ -243,6 +243,31 @@ out:
         return;
 }
 
+void
+__quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par)
+{
+        quota_dentry_t    *dentry = NULL;
+        quota_dentry_t    *tmp    = NULL;
+
+        list_for_each_entry_safe (dentry, tmp, &ctx->parents, next) {
+                if ((strcmp (dentry->name, name) == 0) &&
+                    (gf_uuid_compare (dentry->par, par) == 0)) {
+                        __quota_dentry_free (dentry);
+                        break;
+                }
+        }
+}
+
+void
+quota_dentry_del (quota_inode_ctx_t *ctx, const char *name, uuid_t par)
+{
+        LOCK (&ctx->lock);
+        {
+                __quota_dentry_del (ctx, name, par);
+        }
+        UNLOCK (&ctx->lock);
+}
+
 static inline inode_t*
 __quota_inode_parent (inode_t *inode, uuid_t pargfid, const char *name)
 {
@@ -485,10 +510,18 @@ out:
 }
 
 static inline void
-quota_link_count_decrement (quota_local_t *local)
+quota_link_count_decrement (call_frame_t *frame)
 {
-        call_stub_t *stub       = NULL;
-        int          link_count = -1;
+        call_frame_t   *tmpframe   = NULL;
+        quota_local_t  *local      = NULL;
+        call_stub_t    *stub       = NULL;
+        int             link_count = -1;
+
+        local = frame->local;
+        if (local && local->par_frame) {
+                local = local->par_frame->local;
+                tmpframe = frame;
+        }
 
         if (local == NULL)
                 goto out;
@@ -506,14 +539,30 @@ quota_link_count_decrement (quota_local_t *local)
         if (stub != NULL) {
                 call_resume (stub);
         }
+
 out:
+        if (tmpframe) {
+                local = tmpframe->local;
+                tmpframe->local = NULL;
+
+                STACK_DESTROY (frame->root);
+                if (local)
+                        quota_local_cleanup (local);
+        }
+
         return;
 }
 
 static inline void
-quota_handle_validate_error (quota_local_t *local, int32_t op_ret,
+quota_handle_validate_error (call_frame_t *frame, int32_t op_ret,
                              int32_t op_errno)
 {
+        quota_local_t  *local;
+
+        local = frame->local;
+        if (local && local->par_frame)
+                local = local->par_frame->local;
+
         if (local == NULL)
                 goto out;
 
@@ -527,7 +576,7 @@ quota_handle_validate_error (quota_local_t *local, int32_t op_ret,
         UNLOCK (&local->lock);
 
         /* we abort checking limits on this path to root */
-        quota_link_count_decrement (local);
+        quota_link_count_decrement (frame);
 out:
         return;
 }
@@ -595,7 +644,7 @@ quota_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         return 0;
 
 unwind:
-        quota_handle_validate_error (local, op_ret, op_errno);
+        quota_handle_validate_error (frame, op_ret, op_errno);
         return 0;
 }
 
@@ -622,27 +671,65 @@ quota_timeout (struct timeval *tv, int32_t timeout)
         return timed_out;
 }
 
-static inline void
-quota_add_parent (quota_dentry_t *dentry, struct list_head *list)
+/* Return: 1 if new entry added
+ *         0 no entry added
+ */
+static inline int32_t
+quota_add_parent (struct list_head *list, char *name, uuid_t pgfid)
 {
         quota_dentry_t *entry = NULL;
         gf_boolean_t    found = _gf_false;
 
-        if ((dentry == NULL) || (list == NULL)) {
+        if (list == NULL) {
                 goto out;
         }
 
         list_for_each_entry (entry, list, next) {
-                if (gf_uuid_compare (dentry->par, entry->par) == 0) {
+                if (gf_uuid_compare (pgfid, entry->par) == 0) {
                         found = _gf_true;
                         goto out;
                 }
         }
 
-        list_add_tail (&dentry->next, list);
+        entry = __quota_dentry_new (NULL, name, pgfid);
+        list_add_tail (&entry->next, list);
 
 out:
-        return;
+        if (found)
+                return 0;
+        else
+                return 1;
+
+}
+
+/* This function iterates the parent list in inode
+ * context and add unique parent to the list
+ * Returns number of dentry added to the list
+ */
+static inline int32_t
+quota_add_parents_from_ctx (quota_inode_ctx_t *ctx, struct list_head *list)
+{
+        int                ret     = 0;
+        quota_dentry_t    *dentry  = NULL;
+        int32_t            count   = 0;
+
+        if (ctx == NULL || list == NULL)
+                goto out;
+
+        LOCK (&ctx->lock);
+        {
+                list_for_each_entry (dentry, &ctx->parents, next) {
+                        ret = quota_add_parent (list, dentry->name,
+                                                dentry->par);
+
+                        if (ret == 1)
+                                count++;
+                }
+        }
+        UNLOCK (&ctx->lock);
+
+out:
+        return count;
 }
 
 int32_t
@@ -708,23 +795,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
         quota_inode_ctx_get (local->loc.inode, this, &ctx, 0);
 
-        if (ctx != NULL) {
-                LOCK (&ctx->lock);
-                {
-                        list_for_each_entry (dentry, &ctx->parents, next) {
-                                /* we built ancestry for a non-directory */
-                                tmp = __quota_dentry_new (NULL, dentry->name,
-                                                          dentry->par);
-                                quota_add_parent (tmp, &parents);
-
-                                if (list_empty (&tmp->next)) {
-                                        __quota_dentry_free (tmp);
-                                        tmp = NULL;
-                                }
-                        }
-                }
-                UNLOCK (&ctx->lock);
-        }
+        quota_add_parents_from_ctx (ctx, &parents);
 
         if (list_empty (&parents)) {
                 /* we built ancestry for a directory */
@@ -735,8 +806,7 @@ quota_build_ancestry_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
                 GF_ASSERT (&entry->list != &entries->list);
 
-                tmp = __quota_dentry_new (NULL, entry->d_name, parent->gfid);
-                quota_add_parent (tmp, &parents);
+                quota_add_parent (&parents, entry->d_name, parent->gfid);
         }
 
         local->ancestry_cbk (&parents, local->loc.inode, 0, 0,
@@ -748,7 +818,7 @@ err:
 
 cleanup:
         STACK_DESTROY (frame->root);
-        quota_local_cleanup (this, local);
+        quota_local_cleanup (local);
 
         if (parent != NULL) {
                 inode_unref (parent);
@@ -838,7 +908,7 @@ err:
                 }
 
                 if (local)
-                        quota_local_cleanup (this, local);
+                        quota_local_cleanup (local);
         }
 
         return 0;
@@ -919,6 +989,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
         call_frame_t   *frame        = NULL;
         xlator_t       *this         = NULL;
         quota_local_t  *local        = NULL;
+        quota_local_t  *par_local    = NULL;
         quota_dentry_t *entry        = NULL;
         inode_t        *parent       = NULL;
         int             parent_count = 0;
@@ -927,6 +998,12 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
         local = frame->local;
         this = THIS;
 
+        if (local->par_frame)
+                par_local = local->par_frame->local;
+        else
+                par_local = local;
+
+
         if ((op_ret < 0) || list_empty (parents)) {
                 if (op_ret >= 0) {
                         gf_msg (this->name, GF_LOG_WARNING, EIO,
@@ -939,7 +1016,7 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
                         op_errno = EIO;
                 }
 
-                quota_handle_validate_error (local, -1, op_errno);
+                quota_handle_validate_error (frame, -1, op_errno);
                 goto out;
         }
 
@@ -947,16 +1024,27 @@ quota_check_limit_continuation (struct list_head *parents, inode_t *inode,
                 parent_count++;
         }
 
-        LOCK (&local->lock);
+        LOCK (&par_local->lock);
         {
-                local->link_count += (parent_count - 1);
+                par_local->link_count += (parent_count - 1);
         }
-        UNLOCK (&local->lock);
+        UNLOCK (&par_local->lock);
 
-        list_for_each_entry (entry, parents, next) {
-                parent = inode_find (inode->table, entry->par);
-                quota_check_limit (frame, parent, this, NULL, NULL);
-                inode_unref (parent);
+        if (local->par_frame) {
+                list_for_each_entry (entry, parents, next) {
+                        parent = inode_find (inode->table, entry->par);
+                        quota_check_limit (frame, parent, this, NULL, NULL);
+                        inode_unref (parent);
+                }
+        } else {
+                list_for_each_entry (entry, parents, next) {
+                        parent = do_quota_check_limit (frame, inode, this,
+                                                       entry);
+                        if (parent)
+                                inode_unref (parent);
+                        else
+                                quota_link_count_decrement (frame);
+                }
         }
 
 out:
@@ -1129,6 +1217,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         quota_inode_ctx_t *ctx                 = NULL;
         quota_priv_t      *priv                = NULL;
         quota_local_t     *local               = NULL;
+        quota_local_t     *par_local           = NULL;
         char               need_validate       = 0;
         char               just_validated      = 0;
         gf_boolean_t       hard_limit_exceeded = 0;
@@ -1144,9 +1233,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         local  = frame->local;
         GF_VALIDATE_OR_GOTO (this->name, local, err);
 
-        delta = local->delta;
+        if (local->par_frame) {
+                par_local = local->par_frame->local;
+                GF_VALIDATE_OR_GOTO (this->name, par_local, err);
+        } else {
+                par_local = local;
+        }
+
+        delta = par_local->delta;
 
-        GF_VALIDATE_OR_GOTO (this->name, local->stub, err);
+        GF_VALIDATE_OR_GOTO (this->name, par_local->stub, err);
         /* Allow all the trusted clients
          * Don't block the gluster internal processes like rebalance, gsyncd,
          * self heal etc from the disk quotas.
@@ -1157,7 +1253,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
          */
         if (0 > frame->root->pid) {
                 ret = 0;
-                quota_link_count_decrement (local);
+                quota_link_count_decrement (frame);
                 goto done;
         }
 
@@ -1182,15 +1278,16 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
         do {
                 /* In a rename operation, enforce should be stopped at common
                    ancestor */
-                if (!gf_uuid_is_null (local->common_ancestor) &&
-                    !gf_uuid_compare (_inode->gfid, local->common_ancestor)) {
-                        quota_link_count_decrement (local);
+                if (!gf_uuid_is_null (par_local->common_ancestor) &&
+                    !gf_uuid_compare (_inode->gfid, par_local->common_ancestor)
+                   ) {
+                        quota_link_count_decrement (frame);
                         break;
                 }
 
                 ret = quota_check_object_limit (frame, ctx, priv, _inode, this,
                                                 &op_errno, just_validated,
-                                                local, &skip_check);
+                                                par_local, &skip_check);
                 if (skip_check == _gf_true)
                         goto done;
 
@@ -1203,7 +1300,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
 
                 ret = quota_check_size_limit (frame, ctx, priv, _inode, this,
                                               &op_errno, just_validated, delta,
-                                              local, &skip_check);
+                                              par_local, &skip_check);
                 if (skip_check == _gf_true)
                         goto done;
 
@@ -1215,7 +1312,7 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
                 }
 
                 if (__is_root_gfid (_inode->gfid)) {
-                        quota_link_count_decrement (local);
+                        quota_link_count_decrement (frame);
                         break;
                 }
 
@@ -1228,8 +1325,8 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
 
                 if (parent == NULL) {
                         ret = quota_build_ancestry (_inode,
-                                                    quota_check_limit_continuation,
-                                                    frame);
+                                                 quota_check_limit_continuation,
+                                                 frame);
                         if (ret < 0) {
                                 op_errno = -ret;
                                 goto err;
@@ -1247,7 +1344,6 @@ quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
                 ctx = (quota_inode_ctx_t *)(unsigned long)value;
         } while (1);
 
-
 done:
         if (_inode != NULL) {
                 inode_unref (_inode);
@@ -1256,12 +1352,66 @@ done:
         return 0;
 
 err:
-        quota_handle_validate_error (local, -1, op_errno);
+        quota_handle_validate_error (frame, -1, op_errno);
 
         inode_unref (_inode);
         return 0;
 }
 
+inode_t *
+do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
+                      quota_dentry_t *dentry)
+{
+        int32_t         ret        = -1;
+        inode_t        *parent     = NULL;
+        call_frame_t   *new_frame  = NULL;
+        quota_local_t  *local      = NULL;
+        quota_local_t  *new_local  = NULL;
+
+        local = frame->local;
+
+        parent = inode_parent (inode, dentry->par, dentry->name);
+        if (parent == NULL)
+                parent = inode_find (inode->table, dentry->par);
+        if (parent == NULL)
+                goto out;
+
+        new_frame = create_frame (this, this->ctx->pool);
+        if (new_frame == NULL)
+                goto out;
+
+        new_local = quota_local_new ();
+        if (new_local == NULL)
+                goto out;
+
+        new_frame->root->uid = new_frame->root->gid = 0;
+        new_frame->local = new_local;
+        new_local->par_frame = frame;
+
+        quota_check_limit (new_frame, parent, this, NULL, NULL);
+
+        ret = 0;
+out:
+        if (ret < 0) {
+                if (parent) {
+                        /* Caller should decrement link_count, in case parent is
+                         * NULL
+                         */
+                        quota_handle_validate_error (frame, -1, ENOMEM);
+                }
+
+                if (new_frame) {
+                        new_frame->local = NULL;
+                        STACK_DESTROY (new_frame->root);
+                }
+
+                if (new_local)
+                        quota_local_cleanup (new_local);
+        }
+
+        return parent;
+}
+
 static inline int
 quota_get_limits (xlator_t *this, dict_t *dict, int64_t *hard_lim,
                   int64_t *soft_lim, int64_t *object_hard_limit,
@@ -1440,7 +1590,7 @@ out:
         if (this_inode)
                 inode_unref (this_inode);
 
-        quota_local_cleanup (this, local);
+        quota_local_cleanup (local);
 
         return 0;
 }
@@ -1632,15 +1782,17 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
               struct iovec *vector, int32_t count, off_t off,
               uint32_t flags, struct iobref *iobref, dict_t *xdata)
 {
-        quota_priv_t      *priv    = NULL;
-        int32_t            ret     = -1, op_errno = EINVAL;
-        int32_t            parents = 0;
-        uint64_t           size    = 0;
-        quota_local_t     *local   = NULL;
-        quota_inode_ctx_t *ctx     = NULL;
-        quota_dentry_t    *dentry  = NULL, *tmp = NULL;
-        call_stub_t       *stub    = NULL;
-        struct list_head   head    = {0, };
+        quota_priv_t      *priv       = NULL;
+        int32_t            ret        = -1, op_errno = EINVAL;
+        int32_t            parents    = 0;
+        int32_t            fail_count = 0;
+        uint64_t           size       = 0;
+        quota_local_t     *local      = NULL;
+        quota_inode_ctx_t *ctx        = NULL;
+        quota_dentry_t    *dentry     = NULL, *tmp = NULL;
+        call_stub_t       *stub       = NULL;
+        struct list_head   head       = {0, };
+        inode_t           *par_inode  = NULL;
 
         priv = this->private;
 
@@ -1679,18 +1831,8 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
 
         size = iov_length (vector, count);
-        if (ctx != NULL) {
-                LOCK (&ctx->lock);
-                {
-                        list_for_each_entry (dentry, &ctx->parents, next) {
-                                tmp = __quota_dentry_new (NULL, dentry->name,
-                                                          dentry->par);
-                                list_add_tail (&tmp->next, &head);
-                                parents++;
-                        }
-                }
-                UNLOCK (&ctx->lock);
-        }
+
+        parents = quota_add_parents_from_ctx (ctx, &head);
 
         LOCK (&local->lock);
         {
@@ -1707,10 +1849,33 @@ quota_writev (call_frame_t *frame, xlator_t *this, fd_t *fd,
                 quota_check_limit (frame, fd->inode, this, NULL, NULL);
         } else {
                 list_for_each_entry_safe (dentry, tmp, &head, next) {
-                        quota_check_limit (frame, fd->inode, this, dentry->name,
-                                           dentry->par);
+                        par_inode = do_quota_check_limit (frame, fd->inode,
+                                                          this, dentry);
+                        if (par_inode == NULL) {
+                                /* remove stale entry from inode ctx */
+                                quota_dentry_del (ctx, dentry->name,
+                                                  dentry->par);
+                                parents--;
+                                fail_count++;
+                        } else {
+                                inode_unref (par_inode);
+                        }
                         __quota_dentry_free (dentry);
                 }
+
+                if (parents == 0) {
+                        LOCK (&local->lock);
+                        {
+                                local->link_count++;
+                        }
+                        UNLOCK (&local->lock);
+                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
+                }
+
+                while (fail_count != 0) {
+                        quota_link_count_decrement (frame);
+                        fail_count--;
+                }
         }
 
         return 0;
@@ -1986,8 +2151,6 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         quota_local_t     *local = NULL;
         quota_inode_ctx_t *ctx   = NULL;
         uint64_t           value = 0;
-        quota_dentry_t    *dentry = NULL;
-        quota_dentry_t    *old_dentry = NULL;
 
         if (op_ret < 0) {
                 goto out;
@@ -2006,20 +2169,7 @@ quota_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 goto out;
         }
 
-        LOCK (&ctx->lock);
-        {
-                list_for_each_entry (dentry, &ctx->parents, next) {
-                        if ((strcmp (dentry->name, local->loc.name) == 0) &&
-                            (gf_uuid_compare (local->loc.parent->gfid,
-                                           dentry->par) == 0)) {
-                                old_dentry = dentry;
-                                break;
-                        }
-                }
-                if (old_dentry)
-                        __quota_dentry_free (old_dentry);
-        }
-        UNLOCK (&ctx->lock);
+        quota_dentry_del (ctx, local->loc.name, local->loc.parent->gfid);
 
 out:
         QUOTA_STACK_UNWIND (unlink, frame, op_ret, op_errno, preparent,
@@ -2267,7 +2417,7 @@ off:
                          FIRST_CHILD(this)->fops->link, &(local->oldloc),
                          &(local->newloc), local->xdata);
 
-        quota_local_cleanup (this, local);
+        quota_local_cleanup (local);
         return;
 }
 
@@ -2526,7 +2676,7 @@ quota_rename_get_size_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         return 0;
 
 out:
-        quota_handle_validate_error (local, -1, op_errno);
+        quota_handle_validate_error (frame, -1, op_errno);
         return 0;
 }
 
@@ -4230,7 +4380,7 @@ quota_statfs_validate_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         UNLOCK (&ctx->lock);
 
 resume:
-        quota_link_count_decrement (local);
+        quota_link_count_decrement (frame);
         return 0;
 }
 
@@ -4260,7 +4410,7 @@ quota_get_limit_dir_continuation (struct list_head *parents, inode_t *inode,
                         op_errno = EIO;
                 }
 
-                quota_handle_validate_error (local, -1, op_errno);
+                quota_handle_validate_error (frame, -1, op_errno);
                 goto out;
         }
 
@@ -4290,7 +4440,7 @@ quota_statfs_continue (call_frame_t *frame, xlator_t *this, inode_t *inode)
         ret = quota_validate (frame, local->inode, this,
                               quota_statfs_validate_cbk);
         if (0 > ret)
-                quota_handle_validate_error (local, -1, -ret);
+                quota_handle_validate_error (frame, -1, -ret);
 }
 
 void
@@ -4642,13 +4792,17 @@ int32_t
 quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
 		off_t offset, size_t len, dict_t *xdata)
 {
-        int32_t            ret     = -1, op_errno = EINVAL;
-        int32_t            parents = 0;
-        quota_local_t     *local   = NULL;
-        quota_inode_ctx_t *ctx     = NULL;
-        quota_priv_t      *priv    = NULL;
-        quota_dentry_t    *dentry  = NULL;
-        call_stub_t       *stub    = NULL;
+        int32_t            ret         = -1, op_errno = EINVAL;
+        int32_t            parents     = 0;
+        int32_t            fail_count  = 0;
+        quota_local_t     *local       = NULL;
+        quota_inode_ctx_t *ctx         = NULL;
+        quota_priv_t      *priv        = NULL;
+        quota_dentry_t    *dentry      = NULL;
+        quota_dentry_t    *tmp         = NULL;
+        call_stub_t       *stub        = NULL;
+        struct list_head   head        = {0, };
+        inode_t           *par_inode   = NULL;
 
         priv = this->private;
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
@@ -4685,15 +4839,7 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
         priv = this->private;
         GF_VALIDATE_OR_GOTO (this->name, priv, unwind);
 
-        if (ctx != NULL) {
-                LOCK (&ctx->lock);
-                {
-                        list_for_each_entry (dentry, &ctx->parents, next) {
-                                parents++;
-                        }
-                }
-                UNLOCK (&ctx->lock);
-        }
+        parents = quota_add_parents_from_ctx (ctx, &head);
 
 	/*
 	 * Note that by using len as the delta we're assuming the range from
@@ -4708,9 +4854,33 @@ quota_fallocate(call_frame_t *frame, xlator_t *this, fd_t *fd, int32_t mode,
                 local->link_count = 1;
                 quota_check_limit (frame, fd->inode, this, NULL, NULL);
         } else {
-                list_for_each_entry (dentry, &ctx->parents, next) {
-                        quota_check_limit (frame, fd->inode, this, dentry->name,
-                                           dentry->par);
+                list_for_each_entry_safe (dentry, tmp, &head, next) {
+                        par_inode = do_quota_check_limit (frame, fd->inode,
+                                                          this, dentry);
+                        if (par_inode == NULL) {
+                                /* remove stale entry from inode_ctx */
+                                quota_dentry_del (ctx, dentry->name,
+                                                  dentry->par);
+                                parents--;
+                                fail_count++;
+                        } else {
+                                inode_unref (par_inode);
+                        }
+                        __quota_dentry_free (dentry);
+                }
+
+                if (parents == 0) {
+                        LOCK (&local->lock);
+                        {
+                                local->link_count++;
+                        }
+                        UNLOCK (&local->lock);
+                        quota_check_limit (frame, fd->inode, this, NULL, NULL);
+                }
+
+                while (fail_count != 0) {
+                        quota_link_count_decrement (frame);
+                        fail_count--;
                 }
         }
 
diff --git a/xlators/features/quota/src/quota.h b/xlators/features/quota/src/quota.h
index 56876f0..0f8cecd 100644
--- a/xlators/features/quota/src/quota.h
+++ b/xlators/features/quota/src/quota.h
@@ -102,7 +102,7 @@
                 STACK_WIND_TAIL (frame, params);                        \
                                                                         \
                 if (_local)                                             \
-                        quota_local_cleanup (_this, _local);            \
+                        quota_local_cleanup (_local);                   \
         } while (0)
 
 #define QUOTA_STACK_UNWIND(fop, frame, params...)                       \
@@ -115,7 +115,7 @@
                         frame->local = NULL;                            \
                 }                                                       \
                 STACK_UNWIND_STRICT (fop, frame, params);               \
-                quota_local_cleanup (_this, _local);                    \
+                quota_local_cleanup (_local);                           \
         } while (0)
 
 #define QUOTA_FREE_CONTRIBUTION_NODE(_contribution)     \
@@ -193,7 +193,6 @@ typedef void
 
 struct quota_local {
         gf_lock_t               lock;
-        uint32_t                validate_count;
         uint32_t                link_count;
         loc_t                   loc;
         loc_t                   oldloc;
@@ -219,6 +218,7 @@ struct quota_local {
         dict_t                 *validate_xdata;
         int32_t                 quotad_conn_retry;
         xlator_t               *this;
+        call_frame_t           *par_frame;
 };
 typedef struct quota_local      quota_local_t;
 
@@ -266,6 +266,9 @@ int32_t
 quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
                    char *name, uuid_t par);
 
+inode_t *
+do_quota_check_limit (call_frame_t *frame, inode_t *inode, xlator_t *this,
+                      quota_dentry_t *dentry);
 int
 quota_fill_inodectx (xlator_t *this, inode_t *inode, dict_t *dict,
                      loc_t *loc, struct iatt *buf, int32_t *op_errno);
-- 
1.7.1