Blob Blame History Raw
From 411aa49510fa41ba982e03544d367430cbfc7513 Mon Sep 17 00:00:00 2001
From: vmallika <vmallika@redhat.com>
Date: Sun, 12 Jul 2015 08:49:49 +0530
Subject: [PATCH 280/304] quota/marker: fix inode quota with rename

This is a backport of http://review.gluster.org/11578

There are three problems with marker-rename which
is fixed in this patch

Problem 1)
1) mq_reduce_parent_size is not handling inode-quota contribution
2) When dest files exists and IO is happening
   Now renaming will overwrite existing file
   mq_reduce_parent_size called on dest file
   with saved contribution, this can be
   a problem is IO is still happening
   contribution might have changed

Problem 2)
There is a small race between rename and in-progress write
Consider below scenario
1) rename FOP invoked on file 'x'
2) write is still in progress for file 'x'
3) rename takes a lock on old-parent
4) write-update txn blocked on old-parent to acquire lock
5) in rename_cbk, contri xattrs are removed and contribution is deleted and
   lock is released
6) now write-update txn gets the lock and updates the wrong parent
   as it was holding lock on old parent
   so validate parent once the lock is acquired

Problem 3)
when a rename operation is performed, a lock is
held on old parent. This lock is release before
unwinding the rename operation.
This can be a problem if there are in-progress
writes happening during rename, where update txn
can take a lock and update the old parent
as inode table is not updated with new parent

> Change-Id: Ic3316097c001c33533f98592e8fcf234b1ee2aa2
> BUG: 1240991
> Signed-off-by: vmallika <vmallika@redhat.com>
> Reviewed-on: http://review.gluster.org/11578
> Tested-by: Gluster Build System <jenkins@build.gluster.com>
> Tested-by: NetBSD Build System <jenkins@build.gluster.org>
> Reviewed-by: Raghavendra G <rgowdapp@redhat.com>

Change-Id: I4753f2380fc9ef03a1ba612ec4644921478eae0c
BUG: 1240918
Signed-off-by: vmallika <vmallika@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/55711
Reviewed-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
Tested-by: Raghavendra Gowdappa <rgowdapp@redhat.com>
---
 tests/bugs/quota/bug-1235182.t                    |   17 +-
 tests/bugs/quota/bug-1240991.t                    |   43 ++
 xlators/features/marker/src/marker-quota-helper.c |    5 +
 xlators/features/marker/src/marker-quota.c        |  549 ++++++++++-----------
 xlators/features/marker/src/marker-quota.h        |    5 +-
 xlators/features/marker/src/marker.c              |  445 ++++++++---------
 xlators/features/marker/src/marker.h              |    4 +-
 7 files changed, 536 insertions(+), 532 deletions(-)
 create mode 100644 tests/bugs/quota/bug-1240991.t

diff --git a/tests/bugs/quota/bug-1235182.t b/tests/bugs/quota/bug-1235182.t
index e28b557..2f963e6 100644
--- a/tests/bugs/quota/bug-1235182.t
+++ b/tests/bugs/quota/bug-1235182.t
@@ -28,13 +28,22 @@ TEST $CLI volume quota $V0 limit-usage / 1GB
 TEST $CLI volume quota $V0 hard-timeout 0
 TEST $CLI volume quota $V0 soft-timeout 0
 
-$QDD $M0/f1 256 400&
+TEST mkdir $M0/1
+$QDD $M0/1/f1 256 400&
 PID=$!
-EXPECT_WITHIN $PROCESS_UP_TIMEOUT "0" STAT $M0/f1
-TESTS_EXPECTED_IN_LOOP=50
+EXPECT_WITHIN $PROCESS_UP_TIMEOUT "0" STAT $M0/1/f1
+TESTS_EXPECTED_IN_LOOP=150
 for i in {1..50}; do
         ii=`expr $i + 1`;
-        TEST_IN_LOOP mv $M0/f$i $M0/f$ii;
+        touch $M0/$i/f$ii
+        echo Hello > $M0/$i/f$ii
+
+        #rename within same dir
+        TEST_IN_LOOP mv -f $M0/$i/f$i $M0/$i/f$ii;
+
+        #rename to different dir
+        TEST_IN_LOOP mkdir $M0/$ii
+        TEST_IN_LOOP mv -f $M0/$i/f$ii $M0/$ii/f$ii;
 done
 
 echo "Wait for process with pid $PID to complete"
diff --git a/tests/bugs/quota/bug-1240991.t b/tests/bugs/quota/bug-1240991.t
new file mode 100644
index 0000000..2a1a6d1
--- /dev/null
+++ b/tests/bugs/quota/bug-1240991.t
@@ -0,0 +1,43 @@
+#!/bin/bash
+
+# This regression test tries to ensure renaming a directory with content, and
+# no limit set, is accounted properly, when moved into a directory with quota
+# limit set.
+
+. $(dirname $0)/../../include.rc
+. $(dirname $0)/../../volume.rc
+
+cleanup;
+
+TEST glusterd
+TEST pidof glusterd;
+TEST $CLI volume info;
+
+TEST $CLI volume create $V0 $H0:$B0/${V0}
+TEST $CLI volume start $V0;
+
+TEST $CLI volume quota $V0 enable;
+
+TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0;
+
+TEST $CLI volume quota $V0 hard-timeout 0
+TEST $CLI volume quota $V0 soft-timeout 0
+
+TEST mkdir -p $M0/dir/dir1
+TEST $CLI volume quota $V0 limit-objects /dir 20
+
+TEST mkdir $M0/dir/dir1/d{1..5}
+TEST touch $M0/dir/dir1/f{1..5}
+TEST mv $M0/dir/dir1 $M0/dir/dir2
+
+#Number of files under /dir is 5
+EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "5" quota_object_list_field "/dir" 4
+
+#Number of directories under /dir is 7
+EXPECT_WITHIN $MARKER_UPDATE_TIMEOUT "7" quota_object_list_field "/dir" 5
+
+TEST $CLI volume stop $V0
+TEST $CLI volume delete $V0
+EXPECT "1" get_aux
+
+cleanup;
diff --git a/xlators/features/marker/src/marker-quota-helper.c b/xlators/features/marker/src/marker-quota-helper.c
index a103bb9..df76f12 100644
--- a/xlators/features/marker/src/marker-quota-helper.c
+++ b/xlators/features/marker/src/marker-quota-helper.c
@@ -174,6 +174,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx)
 
         LOCK (&ctx->lock);
         {
+                if (list_empty (&ctx->contribution_head))
+                        goto unlock;
+
                 list_for_each_entry (temp, &ctx->contribution_head,
                                      contri_list) {
                         if (gf_uuid_compare (temp->gfid, inode->gfid) == 0) {
@@ -183,7 +186,9 @@ mq_get_contribution_node (inode_t *inode, quota_inode_ctx_t *ctx)
                         }
                 }
         }
+unlock:
         UNLOCK (&ctx->lock);
+
 out:
         return contri;
 }
diff --git a/xlators/features/marker/src/marker-quota.c b/xlators/features/marker/src/marker-quota.c
index 6ccd974..bb9dfa8 100644
--- a/xlators/features/marker/src/marker-quota.c
+++ b/xlators/features/marker/src/marker-quota.c
@@ -2592,15 +2592,11 @@ out:
 
 int32_t
 mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
-                  inode_contribution_t *contri, quota_meta_t *delta,
-                  gf_boolean_t remove_xattr)
+                  inode_contribution_t *contri, quota_meta_t *delta)
 {
         int32_t              ret                         = -1;
         char                 contri_key[CONTRI_KEY_MAX]  = {0, };
 
-        if (remove_xattr == _gf_false)
-                goto done;
-
         GET_CONTRI_KEY (contri_key, contri->gfid, ret);
         if (ret < 0) {
                 gf_log (this->name, GF_LOG_ERROR, "get contri_key "
@@ -2627,7 +2623,6 @@ mq_remove_contri (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
                 }
         }
 
-done:
         LOCK (&contri->lock);
         {
                 contri->contribution += delta->size;
@@ -2781,8 +2776,8 @@ mq_synctask_cleanup (int ret, call_frame_t *frame, void *opaque)
 }
 
 int
-mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
-             int64_t contri)
+mq_synctask1 (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn,
+              loc_t *loc, quota_meta_t *contri)
 {
         int32_t              ret         = -1;
         quota_synctask_t    *args        = NULL;
@@ -2798,7 +2793,14 @@ mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc,
 
         args->this = this;
         loc_copy (&args->loc, loc);
-        args->contri = contri;
+
+        if (contri) {
+                args->contri = *contri;
+        } else {
+                args->contri.size = -1;
+                args->contri.file_count = -1;
+                args->contri.dir_count = -1;
+        }
 
         if (spawn) {
                 ret = synctask_new1 (this->ctx->env, 1024 * 16, task,
@@ -2817,6 +2819,12 @@ out:
         return ret;
 }
 
+int
+mq_synctask (xlator_t *this, synctask_fn_t task, gf_boolean_t spawn, loc_t *loc)
+{
+        return mq_synctask1 (this, task, spawn, loc, NULL);
+}
+
 int32_t
 mq_prevalidate_txn (xlator_t *this, loc_t *origin_loc, loc_t *loc,
                     quota_inode_ctx_t **ctx)
@@ -2861,136 +2869,6 @@ out:
 }
 
 int
-mq_start_quota_txn_v2 (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
-                       inode_contribution_t *contri)
-{
-        int32_t            ret        = -1;
-        loc_t              child_loc  = {0,};
-        loc_t              parent_loc = {0,};
-        gf_boolean_t       locked     = _gf_false;
-        gf_boolean_t       dirty      = _gf_false;
-        gf_boolean_t       status     = _gf_false;
-        quota_meta_t       delta      = {0, };
-
-        GF_VALIDATE_OR_GOTO ("marker", contri, out);
-        GF_REF_GET (contri);
-
-        GF_VALIDATE_OR_GOTO ("marker", loc, out);
-        GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
-        GF_VALIDATE_OR_GOTO ("marker", ctx, out);
-
-        ret = mq_loc_copy (&child_loc, loc);
-        if (ret < 0) {
-                gf_log (this->name, GF_LOG_ERROR, "loc copy failed");
-                goto out;
-        }
-
-        while (!__is_root_gfid (child_loc.gfid)) {
-                /* To improve performance, abort current transaction
-                 * if one is already in progress for same inode
-                 */
-                if (status == _gf_true) {
-                        /* status will alreday set before txn start,
-                         * so it should not be set in first
-                         * loop iteration
-                         */
-                        ret = mq_test_and_set_ctx_updation_status (ctx,
-                                                                   &status);
-                        if (ret < 0 || status == _gf_true)
-                                goto out;
-                }
-
-                ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc);
-                if (ret < 0) {
-                        gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
-                        goto out;
-                }
-
-                ret = mq_lock (this, &parent_loc, F_WRLCK);
-                if (ret < 0)
-                        goto out;
-                locked = _gf_true;
-
-                mq_set_ctx_updation_status (ctx, _gf_false);
-                status = _gf_true;
-
-                ret = mq_get_delta (this, &child_loc, &delta, ctx, contri);
-                if (ret < 0)
-                        goto out;
-
-                if (quota_meta_is_null (&delta))
-                        goto out;
-
-                ret = mq_mark_dirty (this, &parent_loc, 1);
-                if (ret < 0)
-                        goto out;
-                dirty = _gf_true;
-
-                ret = mq_update_contri (this, &child_loc, contri, &delta);
-                if (ret < 0)
-                        goto out;
-
-                ret = mq_update_size (this, &parent_loc, &delta);
-                if (ret < 0) {
-                        gf_log (this->name, GF_LOG_DEBUG, "rollback "
-                                "contri updation");
-                        mq_sub_meta (&delta, NULL);
-                        mq_update_contri (this, &child_loc, contri, &delta);
-                        goto out;
-                }
-
-                ret = mq_mark_dirty (this, &parent_loc, 0);
-                dirty = _gf_false;
-
-                ret = mq_lock (this, &parent_loc, F_UNLCK);
-                locked = _gf_false;
-
-                if (__is_root_gfid (parent_loc.gfid))
-                        break;
-
-                /* Repeate above steps upwards till the root */
-                loc_wipe (&child_loc);
-                ret = mq_loc_copy (&child_loc, &parent_loc);
-                if (ret < 0)
-                        goto out;
-                loc_wipe (&parent_loc);
-
-                ret = mq_inode_ctx_get (child_loc.inode, this, &ctx);
-                if (ret < 0)
-                        goto out;
-
-                if (list_empty (&ctx->contribution_head)) {
-                        gf_log (this->name, GF_LOG_ERROR,
-                                "contribution node list is empty (%s)",
-                                uuid_utoa(child_loc.inode->gfid));
-                        ret = -1;
-                        goto out;
-                }
-
-                GF_REF_PUT (contri);
-                contri = mq_get_contribution_node (child_loc.parent, ctx);
-                GF_ASSERT (contri != NULL);
-        }
-
-out:
-        if (ret >= 0 && dirty)
-                ret = mq_mark_dirty (this, &parent_loc, 0);
-
-        if (locked)
-                ret = mq_lock (this, &parent_loc, F_UNLCK);
-
-        if (ctx && status == _gf_false)
-                mq_set_ctx_updation_status (ctx, _gf_false);
-
-        loc_wipe (&child_loc);
-        loc_wipe (&parent_loc);
-        if (contri)
-                GF_REF_PUT (contri);
-
-        return 0;
-}
-
-int
 mq_create_xattrs_task (void *opaque)
 {
         int32_t                  ret        = -1;
@@ -3067,7 +2945,7 @@ _mq_create_xattrs_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn)
         if (ret < 0 || status == _gf_true)
                 goto out;
 
-        ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc, 0);
+        ret = mq_synctask (this, mq_create_xattrs_task, spawn, &loc);
 out:
         if (ret < 0 && status == _gf_false)
                 mq_set_ctx_create_status (ctx, _gf_false);
@@ -3109,13 +2987,13 @@ mq_reduce_parent_size_task (void *opaque)
         quota_inode_ctx_t       *ctx           = NULL;
         inode_contribution_t    *contribution  = NULL;
         quota_meta_t             delta         = {0, };
+        quota_meta_t             contri        = {0, };
         loc_t                    parent_loc    = {0,};
         gf_boolean_t             locked        = _gf_false;
         gf_boolean_t             dirty         = _gf_false;
         quota_synctask_t        *args          = NULL;
         xlator_t                *this          = NULL;
         loc_t                   *loc           = NULL;
-        int64_t                  contri        = 0;
         gf_boolean_t             remove_xattr  = _gf_true;
 
         GF_ASSERT (opaque);
@@ -3133,26 +3011,6 @@ mq_reduce_parent_size_task (void *opaque)
                 goto out;
         }
 
-        contribution = mq_get_contribution_node (loc->parent, ctx);
-        if (contribution == NULL) {
-                ret = -1;
-                gf_log_callingfn (this->name, GF_LOG_WARNING,
-                                  "contribution for the node %s is NULL",
-                                  loc->path);
-                goto out;
-        }
-
-        if (contri >= 0) {
-                /* contri paramater is supplied only for rename operation.
-                 * remove xattr is alreday performed, we need to skip
-                 * removexattr for rename operation
-                 */
-                remove_xattr = _gf_false;
-                delta.size = contri;
-                delta.file_count = 1;
-                delta.dir_count = 0;
-        }
-
         ret = mq_inode_loc_fill (NULL, loc->parent, &parent_loc);
         if (ret < 0) {
                 gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
@@ -3164,7 +3022,26 @@ mq_reduce_parent_size_task (void *opaque)
                 goto out;
         locked = _gf_true;
 
-        if (contri < 0) {
+        if (contri.size >= 0) {
+                /* contri paramater is supplied only for rename operation.
+                 * remove xattr is alreday performed, we need to skip
+                 * removexattr for rename operation
+                 */
+                remove_xattr = _gf_false;
+                delta.size = contri.size;
+                delta.file_count = contri.file_count;
+                delta.dir_count = contri.dir_count;
+        } else {
+                remove_xattr = _gf_true;
+                contribution = mq_get_contribution_node (loc->parent, ctx);
+                if (contribution == NULL) {
+                        ret = -1;
+                        gf_log (this->name, GF_LOG_DEBUG,
+                                "contribution for the node %s is NULL",
+                                loc->path);
+                        goto out;
+                }
+
                 LOCK (&contribution->lock);
                 {
                         delta.size = contribution->contribution;
@@ -3174,26 +3051,6 @@ mq_reduce_parent_size_task (void *opaque)
                 UNLOCK (&contribution->lock);
         }
 
-        /* TODO: Handle handlinks with better approach
-           Iterating dentry_list without a lock is not a good idea
-        if (loc->inode->ia_type != IA_IFDIR) {
-                list_for_each_entry (dentry, &inode->dentry_list, inode_list) {
-                        if (loc->parent == dentry->parent) {
-                                * If the file has another link within the same
-                                * directory, we should not be reducing the size
-                                * of parent
-                                *
-                                delta = 0;
-                                idelta = 0;
-                                break;
-                        }
-                }
-        }
-        */
-
-        if (quota_meta_is_null (&delta))
-                goto out;
-
         ret = mq_mark_dirty (this, &parent_loc, 1);
         if (ret < 0)
                 goto out;
@@ -3201,9 +3058,13 @@ mq_reduce_parent_size_task (void *opaque)
 
         mq_sub_meta (&delta, NULL);
 
-        ret = mq_remove_contri (this, loc, ctx, contribution, &delta,
-                                remove_xattr);
-        if (ret < 0)
+        if (remove_xattr) {
+                ret = mq_remove_contri (this, loc, ctx, contribution, &delta);
+                if (ret < 0)
+                        goto out;
+        }
+
+        if (quota_meta_is_null (&delta))
                 goto out;
 
         ret = mq_update_size (this, &parent_loc, &delta);
@@ -3229,7 +3090,8 @@ out:
 }
 
 int32_t
-mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri)
+mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc,
+                           quota_meta_t *contri)
 {
         int32_t                  ret           = -1;
         loc_t                    loc           = {0, };
@@ -3252,8 +3114,8 @@ mq_reduce_parent_size_txn (xlator_t *this, loc_t *origin_loc, int64_t contri)
                 goto out;
         }
 
-        ret = mq_synctask (this, mq_reduce_parent_size_task, _gf_true, &loc,
-                           contri);
+        ret = mq_synctask1 (this, mq_reduce_parent_size_task, _gf_true, &loc,
+                            contri);
 out:
         loc_wipe (&loc);
         return ret;
@@ -3262,74 +3124,204 @@ out:
 int
 mq_initiate_quota_task (void *opaque)
 {
-        int32_t                 ret          = -1;
-        quota_inode_ctx_t      *ctx          = NULL;
-        inode_contribution_t   *contribution = NULL;
-        quota_synctask_t       *args         = NULL;
-        xlator_t               *this         = NULL;
-        loc_t                  *loc          = NULL;
-
-        GF_ASSERT (opaque);
+        int32_t                ret        = -1;
+        loc_t                  child_loc  = {0,};
+        loc_t                  parent_loc = {0,};
+        gf_boolean_t           locked     = _gf_false;
+        gf_boolean_t           dirty      = _gf_false;
+        gf_boolean_t           status     = _gf_false;
+        quota_meta_t           delta      = {0, };
+        quota_synctask_t      *args       = NULL;
+        xlator_t              *this       = NULL;
+        loc_t                 *loc        = NULL;
+        inode_contribution_t  *contri     = NULL;
+        quota_inode_ctx_t     *ctx        = NULL;
+        inode_t               *tmp_parent = NULL;
+
+        GF_VALIDATE_OR_GOTO ("marker", opaque, out);
 
         args = (quota_synctask_t *) opaque;
         loc = &args->loc;
         this = args->this;
+
+        GF_VALIDATE_OR_GOTO ("marker", this, out);
         THIS = this;
 
-        ret = mq_inode_ctx_get (loc->inode, this, &ctx);
-        if (ret == -1) {
-                gf_log (this->name, GF_LOG_WARNING,
-                        "inode ctx get failed, aborting quota txn");
+        GF_VALIDATE_OR_GOTO (this->name, loc, out);
+        GF_VALIDATE_OR_GOTO (this->name, loc->inode, out);
+
+        ret = mq_loc_copy (&child_loc, loc);
+        if (ret < 0) {
+                gf_log (this->name, GF_LOG_ERROR, "loc copy failed");
                 goto out;
         }
 
-        /* Create the contribution node if its absent. Is it right to
-           assume that if the contribution node is not there, then
-           create one and proceed instead of returning?
-           Reason for this assumption is for hard links. Suppose
-           hard link for a file f1 present in a directory d1 is
-           created in the directory d2 (as f2). Now, since d2's
-           contribution is not there in f1's inode ctx, d2's
-           contribution xattr wont be created and will create problems
-           for quota operations.
-        */
-        contribution = mq_get_contribution_node (loc->parent, ctx);
-        if (!contribution) {
-                if (!loc_is_root(loc))
-                        gf_log_callingfn (this->name, GF_LOG_TRACE,
-                                          "contribution node for the "
-                                          "path (%s) with parent (%s) "
-                                          "not found", loc->path,
-                                          loc->parent ?
-                                          uuid_utoa (loc->parent->gfid) :
-                                          NULL);
+        while (!__is_root_gfid (child_loc.gfid)) {
 
-                contribution = mq_add_new_contribution_node (this, ctx, loc);
-                if (!contribution) {
-                        if (!loc_is_root(loc))
-                                gf_log_callingfn (this->name, GF_LOG_WARNING,
-                                                  "could not allocate "
-                                                  " contribution node for (%s) "
-                                                  "parent: (%s)", loc->path,
-                                                  loc->parent ?
-                                                 uuid_utoa (loc->parent->gfid) :
-                                                  NULL);
-                        ret = -1;
+                ret = mq_inode_ctx_get (child_loc.inode, this, &ctx);
+                if (ret < 0) {
+                        gf_log (this->name, GF_LOG_WARNING,
+                                "inode ctx get failed for %s, "
+                                "aborting update txn", child_loc.path);
                         goto out;
                 }
-        }
 
-        mq_start_quota_txn_v2 (this, loc, ctx, contribution);
+                /* To improve performance, abort current transaction
+                 * if one is already in progress for same inode
+                 */
+                if (status == _gf_true) {
+                        /* status will alreday set before txn start,
+                         * so it should not be set in first
+                         * loop iteration
+                         */
+                        ret = mq_test_and_set_ctx_updation_status (ctx,
+                                                                   &status);
+                        if (ret < 0 || status == _gf_true)
+                                goto out;
+                }
+
+                ret = mq_inode_loc_fill (NULL, child_loc.parent, &parent_loc);
+                if (ret < 0) {
+                        gf_log (this->name, GF_LOG_ERROR, "loc fill failed");
+                        goto out;
+                }
+
+                ret = mq_lock (this, &parent_loc, F_WRLCK);
+                if (ret < 0)
+                        goto out;
+                locked = _gf_true;
+
+                mq_set_ctx_updation_status (ctx, _gf_false);
+                status = _gf_true;
+
+                /* Contribution node can be NULL in below scenarios and
+                   create if needed:
+
+                   Scenario 1)
+                   In this case create a new contribution node
+                   Suppose hard link for a file f1 present in a directory d1 is
+                   created in the directory d2 (as f2). Now, since d2's
+                   contribution is not there in f1's inode ctx, d2's
+                   contribution xattr wont be created and will create problems
+                   for quota operations.
+
+                   Don't create contribution if parent has been changed after
+                   taking a lock, this can happen when rename is performed
+                   and writes is still in-progress for the same file
+
+                   Scenario 2)
+                   When a rename operation is performed, contribution node
+                   for olp path will be removed.
+
+                   Create contribution node only if oldparent is same as
+                   newparent.
+                   Consider below example
+                   1) rename FOP invoked on file 'x'
+                   2) write is still in progress for file 'x'
+                   3) rename takes a lock on old-parent
+                   4) write-update txn blocked on old-parent to acquire lock
+                   5) in rename_cbk, contri xattrs are removed and contribution
+                      is deleted and lock is released
+                   6) now write-update txn gets the lock and updates the
+                      wrong parent as it was holding lock on old parent
+                      so validate parent once the lock is acquired
+
+                     For more information on thsi problem, please see
+                     doc for marker_rename in file marker.c
+                */
+                contri = mq_get_contribution_node (child_loc.parent, ctx);
+                if (contri == NULL) {
+                        tmp_parent = inode_parent (child_loc.inode, 0, NULL);
+                        if (tmp_parent == NULL) {
+                                ret = -1;
+                                goto out;
+                        }
+                        if (gf_uuid_compare(tmp_parent->gfid,
+                                            parent_loc.gfid)) {
+                                /* abort txn if parent has changed */
+                                ret = 0;
+                                goto out;
+                        }
+
+                        inode_unref (tmp_parent);
+                        tmp_parent = NULL;
+
+                        contri = mq_add_new_contribution_node (this, ctx,
+                                                               &child_loc);
+                        if (contri == NULL) {
+                                gf_log (this->name, GF_LOG_ERROR, "Failed to "
+                                        "create contribution node for %s, "
+                                        "abort update txn", child_loc.path);
+                                ret = -1;
+                                goto out;
+                        }
+                }
+
+                ret = mq_get_delta (this, &child_loc, &delta, ctx, contri);
+                if (ret < 0)
+                        goto out;
+
+                if (quota_meta_is_null (&delta))
+                        goto out;
+
+                ret = mq_mark_dirty (this, &parent_loc, 1);
+                if (ret < 0)
+                        goto out;
+                dirty = _gf_true;
+
+                ret = mq_update_contri (this, &child_loc, contri, &delta);
+                if (ret < 0)
+                        goto out;
+
+                ret = mq_update_size (this, &parent_loc, &delta);
+                if (ret < 0) {
+                        gf_log (this->name, GF_LOG_DEBUG, "rollback "
+                                "contri updation");
+                        mq_sub_meta (&delta, NULL);
+                        mq_update_contri (this, &child_loc, contri, &delta);
+                        goto out;
+                }
+
+                ret = mq_mark_dirty (this, &parent_loc, 0);
+                dirty = _gf_false;
+
+                ret = mq_lock (this, &parent_loc, F_UNLCK);
+                locked = _gf_false;
+
+                if (__is_root_gfid (parent_loc.gfid))
+                        break;
+
+                /* Repeate above steps upwards till the root */
+                loc_wipe (&child_loc);
+                ret = mq_loc_copy (&child_loc, &parent_loc);
+                if (ret < 0)
+                        goto out;
+
+                loc_wipe (&parent_loc);
+                GF_REF_PUT (contri);
+                contri = NULL;
+        }
 
-        ret = 0;
 out:
-        if (contribution)
-                GF_REF_PUT (contribution);
+        if (ret >= 0 && dirty)
+                ret = mq_mark_dirty (this, &parent_loc, 0);
 
-        if (ctx && ret < 0)
+        if (locked)
+                ret = mq_lock (this, &parent_loc, F_UNLCK);
+
+        if (ctx && status == _gf_false)
                 mq_set_ctx_updation_status (ctx, _gf_false);
 
-        return ret;
+        loc_wipe (&child_loc);
+        loc_wipe (&parent_loc);
+
+        if (tmp_parent)
+                inode_unref (tmp_parent);
+
+        if (contri)
+                GF_REF_PUT (contri);
+
+        return 0;
 }
 
 int
@@ -3359,7 +3351,7 @@ _mq_initiate_quota_txn (xlator_t *this, loc_t *origin_loc, gf_boolean_t spawn)
         if (ret < 0 || status == _gf_true)
                 goto out;
 
-        ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc, 0);
+        ret = mq_synctask (this, mq_initiate_quota_task, spawn, &loc);
 
 out:
         if (ret < 0 && status == _gf_false)
@@ -3556,14 +3548,14 @@ mq_update_dirty_inode_txn (xlator_t *this, loc_t *loc)
         GF_VALIDATE_OR_GOTO ("marker", loc, out);
         GF_VALIDATE_OR_GOTO ("marker", loc->inode, out);
 
-        ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true,
-                           loc, 0);
+        ret = mq_synctask (this, mq_update_dirty_inode_task, _gf_true, loc);
 out:
         return ret;
 }
 
 int32_t
-mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
+mq_inspect_directory_xattr (xlator_t *this, quota_inode_ctx_t *ctx,
+                            inode_contribution_t *contribution, loc_t *loc,
                             dict_t *dict, struct iatt buf)
 {
         int32_t               ret                          = 0;
@@ -3572,19 +3564,6 @@ mq_inspect_directory_xattr (xlator_t *this, loc_t *loc, quota_inode_ctx_t *ctx,
         quota_meta_t          contri                       = {0, };
         quota_meta_t          delta                        = {0, };
         char                  contri_key[CONTRI_KEY_MAX]   = {0, };
-        inode_contribution_t *contribution                 = NULL;
-
-        if (!loc_is_root(loc)) {
-                contribution = mq_add_new_contribution_node (this, ctx, loc);
-                if (contribution == NULL) {
-                        if (!gf_uuid_is_null (loc->inode->gfid))
-                                gf_log (this->name, GF_LOG_DEBUG,
-                                        "cannot add a new contribution node "
-                                        "(%s)", uuid_utoa (loc->inode->gfid));
-                        ret = -1;
-                        goto out;
-                }
-        }
 
         ret = dict_get_int8 (dict, QUOTA_DIRTY_KEY, &dirty);
         if (ret < 0) {
@@ -3647,14 +3626,12 @@ create_xattr:
                 ret = mq_create_xattrs_txn (this, loc);
 
 out:
-        if (contribution)
-                GF_REF_PUT (contribution);
-
         return ret;
 }
 
 int32_t
-mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,
+mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx,
+                       inode_contribution_t *contribution, loc_t *loc,
                        dict_t *dict, struct iatt buf)
 {
         int32_t               ret                          = -1;
@@ -3662,15 +3639,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,
         quota_meta_t          contri                       = {0, };
         quota_meta_t          delta                        = {0, };
         char                  contri_key[CONTRI_KEY_MAX]   = {0, };
-        inode_contribution_t *contribution                 = NULL;
-
-        contribution = mq_add_new_contribution_node (this, ctx, loc);
-        if (contribution == NULL) {
-                gf_log_callingfn (this->name, GF_LOG_DEBUG, "cannot allocate "
-                                  "contribution node (path:%s)", loc->path);
-                ret = -1;
-                goto out;
-        }
 
         LOCK (&ctx->lock);
         {
@@ -3708,9 +3676,6 @@ mq_inspect_file_xattr (xlator_t *this, quota_inode_ctx_t *ctx, loc_t *loc,
         /* TODO: revist this code when fixing hardlinks */
 
 out:
-        if (contribution)
-                GF_REF_PUT (contribution);
-
         return ret;
 }
 
@@ -3718,22 +3683,48 @@ int32_t
 mq_xattr_state (xlator_t *this, loc_t *origin_loc, dict_t *dict,
                 struct iatt buf)
 {
-        int32_t               ret     = -1;
-        quota_inode_ctx_t    *ctx     = NULL;
-        loc_t                 loc     = {0, };
+        int32_t               ret             = -1;
+        quota_inode_ctx_t    *ctx             = NULL;
+        loc_t                 loc             = {0, };
+        inode_contribution_t *contribution    = NULL;
+
+        if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))
+            || (buf.ia_type == IA_IFLNK) || (buf.ia_type == IA_IFDIR)) {
+                /* do healing only for these type of files */
+        } else {
+                ret = 0;
+                goto out;
+        }
 
         ret = mq_prevalidate_txn (this, origin_loc, &loc, &ctx);
         if (ret < 0)
                 goto out;
 
-        if (((buf.ia_type == IA_IFREG) && !dht_is_linkfile (&buf, dict))
-            || (buf.ia_type == IA_IFLNK))  {
-                mq_inspect_file_xattr (this, ctx, &loc, dict, buf);
-        } else if (buf.ia_type == IA_IFDIR)
-                mq_inspect_directory_xattr (this, &loc, ctx, dict, buf);
+        if (!loc_is_root(&loc)) {
+                contribution = mq_add_new_contribution_node (this, ctx, &loc);
+                if (contribution == NULL) {
+                        if (!gf_uuid_is_null (loc.inode->gfid))
+                                gf_log (this->name, GF_LOG_WARNING,
+                                        "cannot add a new contribution node "
+                                        "(%s)", uuid_utoa (loc.gfid));
+                        ret = -1;
+                        goto out;
+                }
+        }
+
+        if (buf.ia_type == IA_IFDIR)
+                mq_inspect_directory_xattr (this, ctx, contribution, &loc, dict,
+                                            buf);
+        else
+                mq_inspect_file_xattr (this, ctx, contribution, &loc, dict,
+                                       buf);
 
 out:
         loc_wipe (&loc);
+
+        if (contribution)
+                GF_REF_PUT (contribution);
+
         return ret;
 }
 
diff --git a/xlators/features/marker/src/marker-quota.h b/xlators/features/marker/src/marker-quota.h
index 89ac559..65cb0b2 100644
--- a/xlators/features/marker/src/marker-quota.h
+++ b/xlators/features/marker/src/marker-quota.h
@@ -18,6 +18,7 @@
 #include "xlator.h"
 #include "marker-mem-types.h"
 #include "refcount.h"
+#include "quota-common-utils.h"
 
 #define QUOTA_XATTR_PREFIX "trusted.glusterfs"
 #define QUOTA_DIRTY_KEY "trusted.glusterfs.quota.dirty"
@@ -98,7 +99,7 @@ typedef struct quota_inode_ctx quota_inode_ctx_t;
 struct quota_synctask {
         xlator_t      *this;
         loc_t          loc;
-        int64_t        contri;
+        quota_meta_t   contri;
         gf_boolean_t   is_static;
 };
 typedef struct quota_synctask quota_synctask_t;
@@ -146,7 +147,7 @@ int32_t
 mq_reduce_parent_size (xlator_t *, loc_t *, int64_t);
 
 int32_t
-mq_reduce_parent_size_txn (xlator_t *, loc_t *, int64_t);
+mq_reduce_parent_size_txn (xlator_t *, loc_t *, quota_meta_t *);
 
 int32_t
 mq_rename_update_newpath (xlator_t *, loc_t *);
diff --git a/xlators/features/marker/src/marker.c b/xlators/features/marker/src/marker.c
index ba599da..6265028 100644
--- a/xlators/features/marker/src/marker.c
+++ b/xlators/features/marker/src/marker.c
@@ -210,6 +210,11 @@ marker_local_unref (marker_local_t *local)
         if (local->xdata)
                 dict_unref (local->xdata);
 
+        if (local->lk_frame) {
+                STACK_DESTROY (local->lk_frame->root);
+                local->lk_frame = NULL;
+        }
+
         if (local->oplocal) {
                 marker_local_unref (local->oplocal);
                 local->oplocal = NULL;
@@ -836,7 +841,7 @@ marker_rmdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
         priv = this->private;
 
         if (priv->feature_enabled & GF_QUOTA)
-                mq_reduce_parent_size_txn (this, &local->loc, -1);
+                mq_reduce_parent_size_txn (this, &local->loc, NULL);
 
         if (priv->feature_enabled & GF_XTIME)
                 marker_xtime_update_marks (this, local);
@@ -905,7 +910,7 @@ marker_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
 
         if (priv->feature_enabled & GF_QUOTA) {
                 if (!local->skip_txn)
-                        mq_reduce_parent_size_txn (this, &local->loc, -1);
+                        mq_reduce_parent_size_txn (this, &local->loc, NULL);
         }
 
         if (priv->feature_enabled & GF_XTIME)
@@ -1048,35 +1053,23 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
         frame->local = NULL;
 
         if (op_ret < 0) {
-                if (local->err == 0) {
-                        local->err =  op_errno ? op_errno : EINVAL;
-                }
-
                 gf_log (this->name, GF_LOG_WARNING,
                         "inodelk (UNLOCK) failed on path:%s (gfid:%s) (%s)",
-                        local->parent_loc.path,
-                        uuid_utoa (local->parent_loc.inode->gfid),
+                        oplocal->parent_loc.path,
+                        uuid_utoa (oplocal->parent_loc.inode->gfid),
                         strerror (op_errno));
         }
 
-        if (local->stub != NULL) {
-                call_resume (local->stub);
-                local->stub = NULL;
-        } else if (local->err != 0) {
-                STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL,
-                                     NULL, NULL, NULL, NULL);
-        } else {
-                gf_log (this->name, GF_LOG_CRITICAL,
-                        "continuation stub to unwind the call is absent, hence "
-                        "call will be hung (call-stack id = %"PRIu64")",
-                        frame->root->unique);
-        }
+        if (local->err != 0)
+                goto err;
 
-        mq_reduce_parent_size_txn (this, &oplocal->loc, oplocal->contribution);
+        mq_reduce_parent_size_txn (this, &oplocal->loc, &oplocal->contribution);
 
         if (local->loc.inode != NULL) {
-                mq_reduce_parent_size_txn (this, &local->loc,
-                                           local->contribution);
+                /* If destination file exits before rename, it would have
+                 * been unlinked while renaming a file
+                 */
+                mq_reduce_parent_size_txn (this, &local->loc, NULL);
         }
 
         newloc.inode = inode_ref (oplocal->loc.inode);
@@ -1097,39 +1090,26 @@ marker_rename_done (call_frame_t *frame, void *cookie, xlator_t *this,
                 marker_xtime_update_marks (this, local);
         }
 
+err:
         marker_local_unref (local);
         marker_local_unref (oplocal);
+
         return 0;
 }
 
 
-int32_t
-marker_rename_release_newp_lock (call_frame_t *frame, void *cookie,
-                                 xlator_t *this, int32_t op_ret,
-                                 int32_t op_errno, dict_t *xdata)
+void
+marker_rename_release_oldp_lock (marker_local_t *local, xlator_t *this)
 {
-        marker_local_t  *local = NULL, *oplocal = NULL;
-        struct gf_flock  lock  = {0, };
+        marker_local_t        *oplocal  = NULL;
+        call_frame_t          *lk_frame = NULL;
+        struct gf_flock        lock     = {0, };
 
-        local = frame->local;
         oplocal = local->oplocal;
+        lk_frame = local->lk_frame;
 
-        if (op_ret < 0) {
-                if (local->err == 0) {
-                        local->err = op_errno ? op_errno : EINVAL;
-                }
-
-                gf_log (this->name, GF_LOG_WARNING,
-                        "inodelk (UNLOCK) failed on %s (gfid:%s) (%s)",
-                        oplocal->parent_loc.path,
-                        uuid_utoa (oplocal->parent_loc.inode->gfid),
-                        strerror (op_errno));
-        }
-
-        if (local->next_lock_on == NULL) {
-                marker_rename_done (frame, NULL, this, 0, 0, NULL);
-                goto out;
-        }
+        if (lk_frame == NULL)
+                goto err;
 
         lock.l_type   = F_UNLCK;
         lock.l_whence = SEEK_SET;
@@ -1137,47 +1117,75 @@ marker_rename_release_newp_lock (call_frame_t *frame, void *cookie,
         lock.l_len    = 0;
         lock.l_pid    = 0;
 
-        STACK_WIND (frame,
+        STACK_WIND (lk_frame,
                     marker_rename_done,
                     FIRST_CHILD(this),
                     FIRST_CHILD(this)->fops->inodelk,
-                    this->name, &local->parent_loc, F_SETLKW, &lock, NULL);
+                    this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL);
 
-out:
-        return 0;
+        return;
+
+err:
+        marker_local_unref (local);
+        marker_local_unref (oplocal);
 }
 
 
 int32_t
-marker_rename_release_oldp_lock (call_frame_t *frame, void *cookie,
-                                 xlator_t *this, int32_t op_ret,
-                                 int32_t op_errno, dict_t *xdata)
+marker_rename_unwind (call_frame_t *frame, void *cookie, xlator_t *this,
+                      int32_t op_ret, int32_t op_errno, dict_t *xdata)
 {
-        marker_local_t  *local = NULL, *oplocal = NULL;
-        struct gf_flock  lock  = {0, };
+        marker_local_t       *local    = NULL;
+        marker_local_t       *oplocal  = NULL;
+        quota_inode_ctx_t    *ctx      = NULL;
+        inode_contribution_t *contri   = NULL;
 
         local = frame->local;
         oplocal = local->oplocal;
-
-        if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) {
-                local->err = op_errno;
-        }
+        frame->local = NULL;
 
         //Reset frame uid and gid if set.
         if (cookie == (void *) _GF_UID_GID_CHANGED)
                 MARKER_RESET_UID_GID (frame, frame->root, local);
 
-        lock.l_type   = F_UNLCK;
-        lock.l_whence = SEEK_SET;
-        lock.l_start  = 0;
-        lock.l_len    = 0;
-        lock.l_pid    = 0;
+        if (op_ret < 0)
+                local->err =  op_errno ? op_errno : EINVAL;
+
+        if (local->stub != NULL) {
+                /* Remove contribution node from in-memory even if
+                 * remove-xattr has failed as the rename is already performed
+                 * if local->stub is set, which means rename was sucessful
+                 */
+                mq_inode_ctx_get (oplocal->loc.inode, this, &ctx);
+                if (ctx) {
+                        contri = mq_get_contribution_node (oplocal->loc.parent,
+                                                           ctx);
+                        if (contri) {
+                                QUOTA_FREE_CONTRIBUTION_NODE (ctx, contri);
+                                GF_REF_PUT (contri);
+                        }
+                }
+
+                call_resume (local->stub);
+                local->stub = NULL;
+                local->err = 0;
+        } else if (local->err != 0) {
+                STACK_UNWIND_STRICT (rename, frame, -1, local->err, NULL, NULL,
+                                     NULL, NULL, NULL, NULL);
+        } else {
+                gf_log (this->name, GF_LOG_CRITICAL,
+                        "continuation stub to unwind the call is absent, hence "
+                        "call will be hung (call-stack id = %"PRIu64")",
+                        frame->root->unique);
+        }
+
+        /* If there are in-progress writes on old-path when during rename
+         * operation, update txn will update the wrong path if lock
+         * is released before rename unwind.
+         * So release lock only after rename unwind
+         */
+        marker_rename_release_oldp_lock (local, this);
 
-        STACK_WIND (frame,
-                    marker_rename_release_newp_lock,
-                    FIRST_CHILD(this),
-                    FIRST_CHILD(this)->fops->inodelk,
-                    this->name, &oplocal->parent_loc, F_SETLKW, &lock, NULL);
         return 0;
 }
 
@@ -1249,7 +1257,7 @@ marker_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                 newloc.parent = inode_ref (local->loc.parent);
                 gf_uuid_copy (newloc.gfid, oplocal->loc.inode->gfid);
 
-                STACK_WIND_COOKIE (frame, marker_rename_release_oldp_lock,
+                STACK_WIND_COOKIE (frame, marker_rename_unwind,
                                    frame->cookie, FIRST_CHILD(this),
                                    FIRST_CHILD(this)->fops->removexattr,
                                    &newloc, contri_key, NULL);
@@ -1283,7 +1291,7 @@ out:
         return 0;
 
 quota_err:
-        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
+        marker_rename_unwind (frame, NULL, this, 0, 0, NULL);
         return 0;
 }
 
@@ -1296,60 +1304,7 @@ marker_do_rename (call_frame_t *frame, void *cookie, xlator_t *this,
         marker_local_t       *oplocal                    = NULL;
         char                  contri_key[CONTRI_KEY_MAX] = {0, };
         int32_t               ret                        = 0;
-        int64_t              *contribution               = 0;
-
-        local = frame->local;
-        oplocal = local->oplocal;
-
-        //Reset frame uid and gid if set.
-        if (cookie == (void *) _GF_UID_GID_CHANGED)
-                MARKER_RESET_UID_GID (frame, frame->root, local);
-
-        if ((op_ret < 0) && (op_errno != ENOATTR) && (op_errno != ENODATA)) {
-                local->err = op_errno ? op_errno : EINVAL;
-                gf_log (this->name, GF_LOG_WARNING,
-                        "fetching contribution values from %s (gfid:%s) "
-                        "failed (%s)", local->loc.path,
-                        uuid_utoa (local->loc.inode->gfid),
-                        strerror (op_errno));
-                goto err;
-        }
-
-        if (local->loc.inode != NULL) {
-                GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret);
-                if (ret < 0) {
-                        local->err = errno ? errno : ENOMEM;
-                        goto err;
-                }
-
-                if (dict_get_bin (dict, contri_key,
-                                  (void **) &contribution) == 0) {
-                        local->contribution = ntoh64 (*contribution);
-                }
-        }
-
-        STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this),
-                    FIRST_CHILD(this)->fops->rename, &oplocal->loc,
-                    &local->loc, local->xdata);
-
-        return 0;
-
-err:
-        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
-        return 0;
-}
-
-
-int32_t
-marker_get_newpath_contribution (call_frame_t *frame, void *cookie,
-                                 xlator_t *this, int32_t op_ret,
-                                 int32_t op_errno, dict_t *dict, dict_t *xdata)
-{
-        marker_local_t *local                      = NULL;
-        marker_local_t *oplocal                    = NULL;
-        char            contri_key[CONTRI_KEY_MAX] = {0, };
-        int32_t         ret                        = 0;
-        int64_t        *contribution               = 0;
+        quota_meta_t          contribution               = {0, };
 
         local = frame->local;
         oplocal = local->oplocal;
@@ -1373,68 +1328,51 @@ marker_get_newpath_contribution (call_frame_t *frame, void *cookie,
                 local->err = errno ? errno : ENOMEM;
                 goto err;
         }
+        quota_dict_get_meta (dict, contri_key, &contribution);
+        oplocal->contribution = contribution;
 
-        if (dict_get_bin (dict, contri_key, (void **) &contribution) == 0)
-                oplocal->contribution = ntoh64 (*contribution);
-
-        if (local->loc.inode != NULL) {
-                GET_CONTRI_KEY (contri_key, local->loc.parent->gfid, ret);
-                if (ret < 0) {
-                        local->err = errno ? errno : ENOMEM;
-                        goto err;
-                }
-
-                /* getxattr requires uid and gid to be 0,
-                 * reset them in the callback.
-                 */
-                MARKER_SET_UID_GID (frame, local, frame->root);
-                if (gf_uuid_is_null (local->loc.gfid))
-                        gf_uuid_copy (local->loc.gfid, local->loc.inode->gfid);
-
-                GF_UUID_ASSERT (local->loc.gfid);
-
-                STACK_WIND_COOKIE (frame, marker_do_rename,
-                                   frame->cookie, FIRST_CHILD(this),
-                                   FIRST_CHILD(this)->fops->getxattr,
-                                   &local->loc, contri_key, NULL);
-        } else {
-                marker_do_rename (frame, NULL, this, 0, 0, NULL, NULL);
-        }
+        STACK_WIND (frame, marker_rename_cbk, FIRST_CHILD(this),
+                    FIRST_CHILD(this)->fops->rename, &oplocal->loc,
+                    &local->loc, local->xdata);
 
         return 0;
+
 err:
-        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
+        marker_rename_unwind (frame, NULL, this, 0, 0, NULL);
         return 0;
 }
 
-
 int32_t
-marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,
+marker_get_oldpath_contribution (call_frame_t *lk_frame, void *cookie,
                                  xlator_t *this, int32_t op_ret,
                                  int32_t op_errno, dict_t *xdata)
 {
-        marker_local_t *local                      = NULL;
-        marker_local_t *oplocal                    = NULL;
-        char            contri_key[CONTRI_KEY_MAX] = {0, };
-        int32_t         ret                        = 0;
+        call_frame_t    *frame                      = NULL;
+        marker_local_t  *local                      = NULL;
+        marker_local_t  *oplocal                    = NULL;
+        char             contri_key[CONTRI_KEY_MAX] = {0, };
+        int32_t          ret                        = 0;
 
-        local = frame->local;
+        local = lk_frame->local;
         oplocal = local->oplocal;
+        frame = local->frame;
 
         if (op_ret < 0) {
                 local->err = op_errno ? op_errno : EINVAL;
                 gf_log (this->name, GF_LOG_WARNING,
                         "cannot hold inodelk on %s (gfid:%s) (%s)",
-                        local->next_lock_on->path,
-                        uuid_utoa (local->next_lock_on->inode->gfid),
+                        oplocal->loc.path, uuid_utoa (oplocal->loc.inode->gfid),
                         strerror (op_errno));
-                goto lock_err;
+                goto err;
+
+                STACK_DESTROY (local->lk_frame->root);
+                local->lk_frame = NULL;
         }
 
         GET_CONTRI_KEY (contri_key, oplocal->loc.parent->gfid, ret);
         if (ret < 0) {
                 local->err = errno ? errno : ENOMEM;
-                goto quota_err;
+                goto err;
         }
 
         /* getxattr requires uid and gid to be 0,
@@ -1448,79 +1386,102 @@ marker_get_oldpath_contribution (call_frame_t *frame, void *cookie,
 
         GF_UUID_ASSERT (oplocal->loc.gfid);
 
-        STACK_WIND_COOKIE (frame, marker_get_newpath_contribution,
+        STACK_WIND_COOKIE (frame, marker_do_rename,
                            frame->cookie, FIRST_CHILD(this),
                            FIRST_CHILD(this)->fops->getxattr,
                            &oplocal->loc, contri_key, NULL);
-        return 0;
-
-quota_err:
-        marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
-        return 0;
-
-lock_err:
-        if ((local->next_lock_on == NULL)
-            || (local->next_lock_on == &local->parent_loc)) {
-                local->next_lock_on = NULL;
-                marker_rename_release_oldp_lock (frame, NULL, this, 0, 0, NULL);
-        } else {
-                marker_rename_release_newp_lock (frame, NULL, this, 0, 0, NULL);
-        }
 
         return 0;
-}
-
-
-int32_t
-marker_rename_inodelk_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
-                           int32_t op_ret, int32_t op_errno, dict_t *xdata)
-{
-        marker_local_t  *local = NULL, *oplocal = NULL;
-        loc_t           *loc   = NULL;
-        struct gf_flock  lock  = {0, };
-
-        local = frame->local;
-        oplocal = local->oplocal;
-
-        if (op_ret < 0) {
-                if (local->next_lock_on != &oplocal->parent_loc) {
-                        loc = &oplocal->parent_loc;
-                } else {
-                        loc = &local->parent_loc;
-                }
-
-                local->err = op_errno ? op_errno : EINVAL;
-                gf_log (this->name, GF_LOG_WARNING,
-                        "cannot hold inodelk on %s (gfid:%s) (%s)",
-                        loc->path, uuid_utoa (loc->inode->gfid),
-                        strerror (op_errno));
-                goto err;
-        }
-
-        if (local->next_lock_on != NULL) {
-                lock.l_len    = 0;
-                lock.l_start  = 0;
-                lock.l_type   = F_WRLCK;
-                lock.l_whence = SEEK_SET;
-
-                STACK_WIND (frame,
-                            marker_get_oldpath_contribution,
-                            FIRST_CHILD(this),
-                            FIRST_CHILD(this)->fops->inodelk,
-                            this->name, local->next_lock_on,
-                            F_SETLKW, &lock, NULL);
-        } else {
-                marker_get_oldpath_contribution (frame, 0, this, 0, 0, NULL);
-        }
-
-        return 0;
-
 err:
-        marker_rename_done (frame, NULL, this, 0, 0, NULL);
-        return 0;
-}
-
-
+        marker_rename_unwind (frame, NULL, this, 0, 0, NULL);
+        return 0;
+}
+
+
+/* For a marker_rename FOP, following is the algorithm used for Quota
+ * accounting. The use-case considered is:
+ * 1. rename (src, dst)
+ * 2. both src and dst exist
+ * 3. there are parallel operations on src and dst (lets say through fds
+ *    opened on them before rename was initiated).
+ *
+ * PS: We've not thought through whether this algo works in the presence of
+ *     hardlinks to src and/or dst.
+ *
+ * Algorithm:
+ * ==========
+ *
+ * 1) set inodelk on src-parent
+ *    As part of rename operation, parent can change for the file.
+ *    We need to remove contribution (both on disk xattr and in-memory one)
+ *    to src-parent (and its ancestors) and add the contribution to dst-parent
+ *    (and its ancestors). While we are doing these operations, contribution of
+ *    the file/directory shouldn't be changing as we want to be sure that
+ *      a) what we subtract from src-parent is exactly what we add to dst-parent
+ *      b) we should subtract from src-parent exactly what we contributed to
+ *         src-parent
+ *    So, We hold a lock on src-parent to block any parallel transcations on
+ *    src-inode (since thats the one which survives rename).
+ *
+ *    If there are any parallel transactions on dst-inode they keep succeeding
+ *    till the association of dst-inode with dst-parent is broken because of an
+ *    inode_rename after unwind of rename fop from marker. Only after unwind
+ *    (and hence inode_rename), we delete and subtract the contribution of
+ *    dst-inode to dst-parent. That way we are making sure we subtract exactly
+ *    what dst-inode contributed to dst-parent.
+ *
+ * 2) lookup contribution to src-parent on src-inode.
+ *    We need to save the contribution info for use at step-8.
+ *
+ * 3) wind rename
+ *    Perform rename on disk
+ *
+ * 4) remove xattr on src-loc
+ *    After rename, parent can change, so
+ *    need to remove xattrs storing contribution to src-parent.
+ *
+ * 5) remove contribution node corresponding to src-parent from the in-memory
+ *    list.
+ *    After rename, contri gfid can change and we have
+ *    also removed xattr from file.
+ *    We need to remove in-memory contribution node to prevent updations to
+ *    src-parent even after a successful rename
+ *
+ * 6) unwind rename
+ *    This will ensure that rename is done in the server
+ *    inode table. An inode_rename disassociates src-inode from src-parent and
+ *    associates it with dst-parent. It also disassociates dst-inode from
+ *    dst-parent. After inode_rename, inode_parent on src-inode will give
+ *    dst-parent and inode_parent on dst-inode will return NULL (assuming
+ *    dst-inode doesn't have any hardlinks).
+ *
+ * 7) release inodelk on src-parent
+ *    Lock on src-parent should be released only after
+ *    rename on disk, remove xattr and rename_unwind (and hence inode_rename)
+ *    operations. If lock is released before inode_rename, a parallel
+ *    transaction on src-inode can still update src-parent (as inode_parent on
+ *    src-inode can still return src-parent). This would make the
+ *    contribution from src-inode to src-parent stored in step-2 stale.
+ *
+ * 8) Initiate mq_reduce_parent_size_txn on src-parent to remove contribution
+ *    of src-inode to src-parent. We use the contribution stored in step-2.
+ *    Since, we had acquired the lock on src-parent all along step-2 through
+ *    inode_rename, we can be sure that a parallel transaction wouldn't have
+ *    added a delta to src-parent.
+ *
+ * 9) Initiate mq_reduce_parent_size_txn on dst-parent if dst-inode exists.
+ *    The size reduced from dst-parent and its ancestors is the
+ *    size stored as contribution to dst-parent in dst-inode.
+ *    If the destination file had existed, rename will unlink the
+ *    destination file as part of its operation.
+ *    We need to reduce the size on the dest parent similarly to
+ *    unlink. Since, we are initiating reduce-parent-size transaction after
+ *    inode_rename, we can be sure that a parallel transaction wouldn't add
+ *    delta to dst-parent while we are reducing the contribution of dst-inode
+ *    from its ancestors before rename.
+ *
+ * 10) create contribution xattr to dst-parent on src-inode.
+ */
 int32_t
 marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
                loc_t *newloc, dict_t *xdata)
@@ -1530,7 +1491,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
         marker_local_t *oplocal          = NULL;
         marker_conf_t  *priv             = NULL;
         struct gf_flock lock             = {0, };
-        loc_t          *lock_on          = NULL;
 
         priv = this->private;
 
@@ -1569,19 +1529,6 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
         if (ret < 0)
                 goto err;
 
-        if ((newloc->inode != NULL) && (newloc->parent != oldloc->parent)
-            && (gf_uuid_compare (newloc->parent->gfid,
-                              oldloc->parent->gfid) < 0)) {
-                lock_on = &local->parent_loc;
-                local->next_lock_on = &oplocal->parent_loc;
-        } else {
-                lock_on = &oplocal->parent_loc;
-                if ((newloc->inode != NULL) && (newloc->parent
-                                                != oldloc->parent)) {
-                        local->next_lock_on = &local->parent_loc;
-                }
-        }
-
         lock.l_len    = 0;
         lock.l_start  = 0;
         lock.l_type   = F_WRLCK;
@@ -1589,14 +1536,22 @@ marker_rename (call_frame_t *frame, xlator_t *this, loc_t *oldloc,
 
         local->xdata = dict_ref (xdata);
 
-        if (is_lk_owner_null (&frame->root->lk_owner))
-                set_lk_owner_from_ptr (&frame->root->lk_owner, frame->root);
+        local->frame = frame;
+        local->lk_frame = create_frame (this, this->ctx->pool);
+        if (local->lk_frame == NULL)
+                goto err;
+
+        local->lk_frame->root->uid = 0;
+        local->lk_frame->root->gid = 0;
+        local->lk_frame->local = local;
+        set_lk_owner_from_ptr (&local->lk_frame->root->lk_owner,
+                               local->lk_frame->root);
 
-        STACK_WIND (frame,
-                    marker_rename_inodelk_cbk,
+        STACK_WIND (local->lk_frame,
+                    marker_get_oldpath_contribution,
                     FIRST_CHILD(this),
                     FIRST_CHILD(this)->fops->inodelk,
-                    this->name, lock_on,
+                    this->name, &oplocal->parent_loc,
                     F_SETLKW, &lock, NULL);
 
         return 0;
diff --git a/xlators/features/marker/src/marker.h b/xlators/features/marker/src/marker.h
index f198859..12a51bc 100644
--- a/xlators/features/marker/src/marker.h
+++ b/xlators/features/marker/src/marker.h
@@ -97,7 +97,6 @@ struct marker_local{
         pid_t           pid;
         loc_t           loc;
         loc_t           parent_loc;
-        loc_t          *next_lock_on;
         uid_t           uid;
         gid_t           gid;
         int32_t         ref;
@@ -106,7 +105,8 @@ struct marker_local{
         mode_t          mode;
         int32_t         err;
         call_stub_t    *stub;
-        int64_t         contribution;
+        call_frame_t   *lk_frame;
+        quota_meta_t    contribution;
         struct marker_local *oplocal;
 
         /* marker quota specific */
-- 
1.7.1