12a457
From a6a4b068fe1feec882d471101c7931415bf05226 Mon Sep 17 00:00:00 2001
12a457
From: Krutika Dhananjay <kdhananj@redhat.com>
12a457
Date: Thu, 12 May 2016 15:06:59 +0530
12a457
Subject: [PATCH 164/167] features/shard: Get hard-link-count in {unlink,rename}_cbk before deleting shards
12a457
12a457
        Backport of: http://review.gluster.org/#/c/14334/
12a457
        release-3.7 patch: http://review.gluster.org/#/c/14450/
12a457
12a457
Change-Id: I078cd9baf32ce5dc92edcf86c67f0cfcd38dd9a6
12a457
BUG: 1333643
12a457
Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com>
12a457
Reviewed-on: https://code.engineering.redhat.com/gerrit/74766
12a457
Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
12a457
Tested-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
12a457
---
12a457
 libglusterfs/src/glusterfs.h           |    1 +
12a457
 tests/bugs/shard/unlinks-and-renames.t |  282 ++++++++++++++++++++++++++
12a457
 xlators/cluster/dht/src/dht-rename.c   |   21 ++-
12a457
 xlators/features/shard/src/shard.c     |  343 +++++++++++++++++++-------------
12a457
 xlators/storage/posix/src/posix.c      |   99 ++++++++--
12a457
 5 files changed, 582 insertions(+), 164 deletions(-)
12a457
 create mode 100644 tests/bugs/shard/unlinks-and-renames.t
12a457
12a457
diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
12a457
index c8c4590..9d1ea8c 100644
12a457
--- a/libglusterfs/src/glusterfs.h
12a457
+++ b/libglusterfs/src/glusterfs.h
12a457
@@ -250,6 +250,7 @@
12a457
 #define TIER_LINKFILE_GFID           "tier-linkfile-gfid"
12a457
 #define DHT_SKIP_OPEN_FD_UNLINK     "dont-unlink-for-open-fd"
12a457
 #define DHT_IATT_IN_XDATA_KEY       "dht-get-iatt-in-xattr"
12a457
+#define GET_LINK_COUNT              "get-link-count"
12a457
 
12a457
 /*CTR and Marker requires inode dentry link count from posix*/
12a457
 #define GF_RESPONSE_LINK_COUNT_XDATA "gf_response_link_count"
12a457
diff --git a/tests/bugs/shard/unlinks-and-renames.t b/tests/bugs/shard/unlinks-and-renames.t
12a457
new file mode 100644
12a457
index 0000000..751874c
12a457
--- /dev/null
12a457
+++ b/tests/bugs/shard/unlinks-and-renames.t
12a457
@@ -0,0 +1,282 @@
12a457
+#!/bin/bash
12a457
+
12a457
+. $(dirname $0)/../../include.rc
12a457
+. $(dirname $0)/../../volume.rc
12a457
+
12a457
+cleanup
12a457
+
12a457
+# The aim of this test script is to exercise the various codepaths of unlink
12a457
+# and rename fops in sharding and make sure they work fine.
12a457
+#
12a457
+
12a457
+#################################################
12a457
+################### UNLINK ######################
12a457
+#################################################
12a457
+
12a457
+TEST glusterd
12a457
+TEST pidof glusterd
12a457
+TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0,1}
12a457
+TEST $CLI volume set $V0 features.shard on
12a457
+TEST $CLI volume start $V0
12a457
+TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0
12a457
+
12a457
+TEST mkdir $M0/dir
12a457
+TEST touch $M0/dir/foo
12a457
+TEST touch $M0/dir/new
12a457
+
12a457
+######################################
12a457
+##### Unlink with /.shard absent #####
12a457
+######################################
12a457
+TEST truncate -s 5M $M0/dir/foo
12a457
+TEST ! stat $B0/${V0}0/.shard
12a457
+TEST ! stat $B0/${V0}1/.shard
12a457
+# Test to ensure that unlink doesn't fail due to absence of /.shard
12a457
+TEST unlink $M0/dir/foo
12a457
+
12a457
+##################################################
12a457
+##### Unlink of a sharded file without holes #####
12a457
+##################################################
12a457
+# Create a 9M sharded file
12a457
+TEST dd if=/dev/zero of=$M0/dir/new bs=1024 count=9216
12a457
+gfid_new=$(get_gfid_string $M0/dir/new)
12a457
+# Ensure its shards are created.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_new.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_new.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_new.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_new.2
12a457
+TEST unlink $M0/dir/new
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_new.1
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_new.1
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_new.2
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_new.2
12a457
+TEST ! stat $M0/dir/new
12a457
+TEST ! stat $B0/${V0}0/dir/new
12a457
+TEST ! stat $B0/${V0}1/dir/new
12a457
+
12a457
+#######################################
12a457
+##### Unlink with /.shard present #####
12a457
+#######################################
12a457
+TEST truncate -s 5M $M0/dir/foo
12a457
+gfid_foo=$(get_gfid_string $M0/dir/foo)
12a457
+# Ensure its shards are absent.
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_foo.1
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_foo.1
12a457
+# Test to ensure that unlink of a sparse file works fine.
12a457
+TEST unlink $M0/dir/foo
12a457
+TEST ! stat $B0/${V0}0/dir/foo
12a457
+TEST ! stat $B0/${V0}1/dir/foo
12a457
+TEST ! stat $M0/dir/foo
12a457
+
12a457
+#############################################################
12a457
+##### Unlink of a file with only one block (the zeroth) #####
12a457
+#############################################################
12a457
+TEST touch $M0/dir/foo
12a457
+TEST dd if=/dev/zero of=$M0/dir/foo bs=1024 count=1024
12a457
+# Test to ensure that unlink of a sparse file works fine.
12a457
+TEST unlink $M0/dir/foo
12a457
+TEST ! stat $B0/${V0}0/dir/foo
12a457
+TEST ! stat $B0/${V0}1/dir/foo
12a457
+TEST ! stat $M0/dir/foo
12a457
+
12a457
+####################################################
12a457
+##### Unlink of a sharded file with hard-links #####
12a457
+####################################################
12a457
+# Create a 9M sharded file
12a457
+TEST dd if=/dev/zero of=$M0/dir/original bs=1024 count=9216
12a457
+gfid_original=$(get_gfid_string $M0/dir/original)
12a457
+# Ensure its shards are created.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_original.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_original.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_original.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_original.2
12a457
+# Create a hard link.
12a457
+TEST ln $M0/dir/original $M0/link
12a457
+# Now delete the original file.
12a457
+TEST unlink $M0/dir/original
12a457
+# Ensure the shards are still intact.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_original.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_original.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_original.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_original.2
12a457
+TEST ! stat $M0/dir/original
12a457
+TEST stat $M0/link
12a457
+TEST stat $B0/${V0}0/link
12a457
+TEST stat $B0/${V0}1/link
12a457
+# Now delete the last link.
12a457
+TEST unlink $M0/link
12a457
+# Ensure that the shards are all cleaned up.
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_original.1
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_original.1
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_original.2
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_original.2
12a457
+TEST ! stat $M0/link
12a457
+TEST ! stat $B0/${V0}0/link
12a457
+TEST ! stat $B0/${V0}1/link
12a457
+
12a457
+EXPECT_WITHIN $UMOUNT_TIMEOUT "Y" force_umount $M0
12a457
+TEST $CLI volume stop $V0
12a457
+TEST $CLI volume delete $V0
12a457
+
12a457
+cleanup
12a457
+
12a457
+#################################################
12a457
+################### RENAME ######################
12a457
+#################################################
12a457
+
12a457
+TEST glusterd
12a457
+TEST pidof glusterd
12a457
+TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0,1}
12a457
+TEST $CLI volume set $V0 features.shard on
12a457
+TEST $CLI volume start $V0
12a457
+TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0
12a457
+
12a457
+TEST mkdir $M0/dir
12a457
+TEST touch $M0/dir/src
12a457
+TEST touch $M0/dir/dst
12a457
+
12a457
+######################################
12a457
+##### Rename with /.shard absent #####
12a457
+######################################
12a457
+TEST truncate -s 5M $M0/dir/dst
12a457
+TEST ! stat $B0/${V0}0/.shard
12a457
+TEST ! stat $B0/${V0}1/.shard
12a457
+# Test to ensure that rename doesn't fail due to absence of /.shard
12a457
+TEST mv -f $M0/dir/src $M0/dir/dst
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST   stat $M0/dir/dst
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+TEST   stat $B0/${V0}0/dir/dst
12a457
+TEST   stat $B0/${V0}1/dir/dst
12a457
+
12a457
+##################################################
12a457
+##### Rename to a sharded file without holes #####
12a457
+##################################################
12a457
+TEST unlink $M0/dir/dst
12a457
+TEST touch $M0/dir/src
12a457
+# Create a 9M sharded file
12a457
+TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=9216
12a457
+gfid_dst=$(get_gfid_string $M0/dir/dst)
12a457
+# Ensure its shards are created.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_dst.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_dst.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_dst.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_dst.2
12a457
+TEST mv -f $M0/dir/src $M0/dir/dst
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.1
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.1
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.2
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.2
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST   stat $M0/dir/dst
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+TEST   stat $B0/${V0}0/dir/dst
12a457
+TEST   stat $B0/${V0}1/dir/dst
12a457
+
12a457
+###################################################
12a457
+##### Rename of dst file with /.shard present #####
12a457
+###################################################
12a457
+TEST unlink $M0/dir/dst
12a457
+TEST touch $M0/dir/src
12a457
+TEST truncate -s 5M $M0/dir/dst
12a457
+# Test to ensure that unlink of a sparse file works fine.
12a457
+TEST mv -f $M0/dir/src $M0/dir/dst
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST   stat $M0/dir/dst
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+TEST   stat $B0/${V0}0/dir/dst
12a457
+TEST   stat $B0/${V0}1/dir/dst
12a457
+
12a457
+###############################################################
12a457
+##### Rename of dst file with only one block (the zeroth) #####
12a457
+###############################################################
12a457
+TEST unlink $M0/dir/dst
12a457
+TEST touch $M0/dir/src
12a457
+TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=1024
12a457
+# Test to ensure that unlink of a sparse file works fine.
12a457
+TEST mv -f $M0/dir/src $M0/dir/dst
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST   stat $M0/dir/dst
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+TEST   stat $B0/${V0}0/dir/dst
12a457
+TEST   stat $B0/${V0}1/dir/dst
12a457
+
12a457
+########################################################
12a457
+##### Rename to a dst sharded file with hard-links #####
12a457
+########################################################
12a457
+TEST unlink $M0/dir/dst
12a457
+TEST touch $M0/dir/src
12a457
+# Create a 9M sharded file
12a457
+TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=9216
12a457
+gfid_dst=$(get_gfid_string $M0/dir/dst)
12a457
+# Ensure its shards are created.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_dst.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_dst.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_dst.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_dst.2
12a457
+# Create a hard link.
12a457
+TEST ln $M0/dir/dst $M0/link
12a457
+# Now rename src to the dst.
12a457
+TEST mv -f $M0/dir/src $M0/dir/dst
12a457
+# Ensure the shards are still intact.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_dst.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_dst.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_dst.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_dst.2
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+# Now rename another file to the last link.
12a457
+TEST touch $M0/dir/src2
12a457
+TEST mv -f $M0/dir/src2 $M0/link
12a457
+# Ensure that the shards are all cleaned up.
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.1
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.1
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.2
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.2
12a457
+TEST ! stat $M0/dir/src2
12a457
+TEST ! stat $B0/${V0}0/dir/src2
12a457
+TEST ! stat $B0/${V0}1/dir/src2
12a457
+
12a457
+# Rename with non-existent dst and a sharded src
12a457
+TEST touch $M0/dir/src
12a457
+TEST dd if=/dev/zero of=$M0/dir/src bs=1024 count=9216
12a457
+gfid_src=$(get_gfid_string $M0/dir/src)
12a457
+# Ensure its shards are created.
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_src.1
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_src.1
12a457
+TEST stat $B0/${V0}0/.shard/$gfid_src.2
12a457
+TEST stat $B0/${V0}1/.shard/$gfid_src.2
12a457
+# Now rename src to the dst.
12a457
+TEST mv $M0/dir/src $M0/dir/dst
12a457
+
12a457
+TEST   stat $B0/${V0}0/.shard/$gfid_src.1
12a457
+TEST   stat $B0/${V0}1/.shard/$gfid_src.1
12a457
+TEST   stat $B0/${V0}0/.shard/$gfid_src.2
12a457
+TEST   stat $B0/${V0}1/.shard/$gfid_src.2
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+TEST   stat $M0/dir/dst
12a457
+TEST   stat $B0/${V0}0/dir/dst
12a457
+TEST   stat $B0/${V0}1/dir/dst
12a457
+
12a457
+# Rename with non-existent dst and a sharded src with no shards
12a457
+TEST touch $M0/dir/src
12a457
+TEST dd if=/dev/zero of=$M0/dir/src bs=1024 count=1024
12a457
+gfid_src=$(get_gfid_string $M0/dir/src)
12a457
+TEST ! stat $B0/${V0}0/.shard/$gfid_src.1
12a457
+TEST ! stat $B0/${V0}1/.shard/$gfid_src.1
12a457
+# Now rename src to the dst.
12a457
+TEST mv $M0/dir/src $M0/dir/dst
12a457
+TEST ! stat $M0/dir/src
12a457
+TEST ! stat $B0/${V0}0/dir/src
12a457
+TEST ! stat $B0/${V0}1/dir/src
12a457
+TEST   stat $M0/dir/dst
12a457
+TEST   stat $B0/${V0}0/dir/dst
12a457
+TEST   stat $B0/${V0}1/dir/dst
12a457
+
12a457
+cleanup
12a457
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
12a457
index 82a97bc..4ed659b 100644
12a457
--- a/xlators/cluster/dht/src/dht-rename.c
12a457
+++ b/xlators/cluster/dht/src/dht-rename.c
12a457
@@ -565,7 +565,7 @@ dht_rename_unlock_cbk (call_frame_t *frame, void *cookie,
12a457
         DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno,
12a457
                           &local->stbuf, &local->preoldparent,
12a457
                           &local->postoldparent, &local->preparent,
12a457
-                          &local->postparent, NULL);
12a457
+                          &local->postparent, local->xattr);
12a457
         return 0;
12a457
 }
12a457
 
12a457
@@ -876,6 +876,12 @@ dht_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
12a457
                                 uuid_utoa(local->loc.inode->gfid):"");
12a457
                 }
12a457
         }
12a457
+        if (xdata) {
12a457
+                if (!local->xattr)
12a457
+                        local->xattr = dict_ref (xdata);
12a457
+                else
12a457
+                        local->xattr = dict_copy_with_ref (xdata, local->xattr);
12a457
+        }
12a457
 
12a457
         if ((src_cached == dst_cached) && (dst_hashed != dst_cached)) {
12a457
                 link_frame = copy_frame (frame);
12a457
@@ -1026,7 +1032,6 @@ dht_do_rename (call_frame_t *frame)
12a457
         xlator_t    *dst_cached    = NULL;
12a457
         xlator_t    *this          = NULL;
12a457
         xlator_t    *rename_subvol = NULL;
12a457
-        dict_t      *dict          = NULL;
12a457
 
12a457
         local = frame->local;
12a457
         this  = frame->this;
12a457
@@ -1041,11 +1046,12 @@ dht_do_rename (call_frame_t *frame)
12a457
                 rename_subvol = dst_hashed;
12a457
 
12a457
         if ((src_cached != dst_hashed) && (rename_subvol == dst_hashed)) {
12a457
-                DHT_MARKER_DONT_ACCOUNT(dict);
12a457
+                DHT_MARKER_DONT_ACCOUNT(local->xattr_req);
12a457
         }
12a457
 
12a457
         if (rename_subvol == src_cached) {
12a457
-                DHT_CHANGELOG_TRACK_AS_RENAME(dict, &local->loc, &local->loc2);
12a457
+                DHT_CHANGELOG_TRACK_AS_RENAME(local->xattr_req, &local->loc,
12a457
+                                              &local->loc2);
12a457
         }
12a457
 
12a457
         gf_msg_trace (this->name, 0,
12a457
@@ -1056,10 +1062,7 @@ dht_do_rename (call_frame_t *frame)
12a457
                 FRAME_SU_DO (frame, dht_local_t);
12a457
         STACK_WIND (frame, dht_rename_cbk,
12a457
                     rename_subvol, rename_subvol->fops->rename,
12a457
-                    &local->loc, &local->loc2, dict);
12a457
-        if (dict)
12a457
-                dict_unref (dict);
12a457
-
12a457
+                    &local->loc, &local->loc2, local->xattr_req);
12a457
         return 0;
12a457
 }
12a457
 
12a457
@@ -1554,6 +1557,8 @@ dht_rename (call_frame_t *frame, xlator_t *this,
12a457
         local->src_cached = src_cached;
12a457
         local->dst_hashed = dst_hashed;
12a457
         local->dst_cached = dst_cached;
12a457
+        if (xdata)
12a457
+                local->xattr_req = dict_ref (xdata);
12a457
 
12a457
         gf_msg (this->name, GF_LOG_INFO, 0,
12a457
                 DHT_MSG_RENAME_INFO,
12a457
diff --git a/xlators/features/shard/src/shard.c b/xlators/features/shard/src/shard.c
12a457
index c7a57e0..d9a61c1 100644
12a457
--- a/xlators/features/shard/src/shard.c
12a457
+++ b/xlators/features/shard/src/shard.c
12a457
@@ -2135,30 +2135,178 @@ err:
12a457
 }
12a457
 
12a457
 int
12a457
-shard_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
12a457
-                  int32_t op_ret, int32_t op_errno, struct iatt *preparent,
12a457
-                  struct iatt *postparent, dict_t *xdata)
12a457
+shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode);
12a457
+
12a457
+int
12a457
+shard_post_lookup_shards_unlink_handler (call_frame_t *frame, xlator_t *this)
12a457
 {
12a457
-        SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno,  preparent,
12a457
-                            postparent, xdata);
12a457
+        shard_local_t *local = NULL;
12a457
+
12a457
+        local = frame->local;
12a457
 
12a457
+        if ((local->op_ret < 0) && (local->op_errno != ENOENT)) {
12a457
+                if (local->fop == GF_FOP_UNLINK)
12a457
+                        SHARD_STACK_UNWIND (unlink, frame, local->op_ret,
12a457
+                                            local->op_errno, NULL, NULL, NULL);
12a457
+                else
12a457
+                        SHARD_STACK_UNWIND (rename, frame, local->op_ret,
12a457
+                                            local->op_errno, NULL, NULL, NULL,
12a457
+                                            NULL, NULL, NULL);
12a457
+                return 0;
12a457
+        }
12a457
+        local->op_ret = 0;
12a457
+        local->op_errno = 0;
12a457
+
12a457
+        shard_unlink_shards_do (frame, this,
12a457
+                                (local->fop == GF_FOP_RENAME)
12a457
+                                             ? local->loc2.inode
12a457
+                                             : local->loc.inode);
12a457
         return 0;
12a457
 }
12a457
 
12a457
 int
12a457
-shard_unlink_base_file (call_frame_t *frame, xlator_t *this)
12a457
+shard_rename_cbk (call_frame_t *frame, xlator_t *this);
12a457
+
12a457
+int32_t
12a457
+shard_unlink_cbk (call_frame_t *frame, xlator_t *this);
12a457
+
12a457
+int
12a457
+shard_post_resolve_unlink_handler (call_frame_t *frame, xlator_t *this)
12a457
 {
12a457
         shard_local_t *local = NULL;
12a457
 
12a457
         local = frame->local;
12a457
 
12a457
         if (local->op_ret < 0) {
12a457
-                shard_unlink_cbk (frame, 0, this, local->op_ret,
12a457
-                                  local->op_errno, NULL, NULL, NULL);
12a457
+                if (local->op_errno == ENOENT) {
12a457
+                        /* If lookup on /.shard fails with ENOENT, it probably
12a457
+                         * means that the file is being unlinked before it
12a457
+                         * could grow beyond its first block. In this case,
12a457
+                         * unlink boils down to unlinking the base file and
12a457
+                         * unwinding the call.
12a457
+                         */
12a457
+                        local->op_ret = 0;
12a457
+                        local->first_block = local->last_block = 0;
12a457
+                        local->num_blocks = 1;
12a457
+                        if (local->fop == GF_FOP_UNLINK)
12a457
+                                shard_unlink_cbk (frame, this);
12a457
+                        else
12a457
+                                shard_rename_cbk (frame, this);
12a457
+                        return 0;
12a457
+                } else {
12a457
+                        if (local->fop == GF_FOP_UNLINK)
12a457
+                                SHARD_STACK_UNWIND (unlink, frame,
12a457
+                                                    local->op_ret,
12a457
+                                                    local->op_errno, NULL, NULL,
12a457
+                                                    NULL);
12a457
+                        else
12a457
+                                shard_rename_cbk (frame, this);
12a457
+                        return 0;
12a457
+                }
12a457
+        }
12a457
+
12a457
+        if (!local->call_count)
12a457
+                shard_unlink_shards_do (frame, this,
12a457
+                                        (local->fop == GF_FOP_RENAME)
12a457
+                                                     ? local->loc2.inode
12a457
+                                                     : local->loc.inode);
12a457
+        else
12a457
+                shard_common_lookup_shards (frame, this,
12a457
+                                            (local->fop == GF_FOP_RENAME)
12a457
+                                                         ? local->loc2.inode
12a457
+                                                         : local->loc.inode,
12a457
+                                       shard_post_lookup_shards_unlink_handler);
12a457
+        return 0;
12a457
+}
12a457
+
12a457
+int
12a457
+shard_unlink_base_file_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
12a457
+                            int32_t op_ret, int32_t op_errno,
12a457
+                            struct iatt *preparent, struct iatt *postparent,
12a457
+                            dict_t *xdata)
12a457
+{
12a457
+        int                  ret        = 0;
12a457
+        uint32_t             link_count = 0;
12a457
+        shard_local_t       *local      = NULL;
12a457
+        shard_priv_t        *priv       = NULL;
12a457
+
12a457
+        local = frame->local;
12a457
+        priv = this->private;
12a457
+
12a457
+        if (op_ret < 0) {
12a457
+                SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno, NULL, NULL,
12a457
+                                    NULL);
12a457
                 return 0;
12a457
         }
12a457
 
12a457
-        STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD(this),
12a457
+        /* Because link() does not create links for all but the
12a457
+         * base shard, unlink() must delete these shards only when the
12a457
+         * link count is 1. We can return safely now.
12a457
+         */
12a457
+        if ((xdata) && (!dict_get_uint32 (xdata, GET_LINK_COUNT, &link_count))
12a457
+            && (link_count > 1))
12a457
+                goto unwind;
12a457
+
12a457
+        local->first_block = get_lowest_block (0, local->block_size);
12a457
+        local->last_block = get_highest_block (0, local->prebuf.ia_size,
12a457
+                                               local->block_size);
12a457
+        local->num_blocks = local->last_block - local->first_block + 1;
12a457
+
12a457
+        /* num_blocks = 1 implies that the file has not crossed its
12a457
+         * shard block size. So unlink boils down to unlinking just the
12a457
+         * base file. We can safely return now.
12a457
+         */
12a457
+        if (local->num_blocks == 1)
12a457
+                goto unwind;
12a457
+
12a457
+        local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
12a457
+                                       gf_shard_mt_inode_list);
12a457
+        if (!local->inode_list)
12a457
+                goto unwind;
12a457
+
12a457
+        /* Save the xdata and preparent and postparent iatts now. This will be
12a457
+         * used at the time of unwinding the call to the parent xl.
12a457
+         */
12a457
+        local->preoldparent = *preparent;
12a457
+        local->postoldparent = *postparent;
12a457
+        if (xdata)
12a457
+                local->xattr_rsp = dict_ref (xdata);
12a457
+
12a457
+        local->dot_shard_loc.inode = inode_find (this->itable,
12a457
+                                                 priv->dot_shard_gfid);
12a457
+        if (!local->dot_shard_loc.inode) {
12a457
+                ret = shard_init_dot_shard_loc (this, local);
12a457
+                if (ret)
12a457
+                        goto unwind;
12a457
+                shard_lookup_dot_shard (frame, this,
12a457
+                                        shard_post_resolve_unlink_handler);
12a457
+        } else {
12a457
+                shard_common_resolve_shards (frame, this, local->loc.inode,
12a457
+                                             shard_post_resolve_unlink_handler);
12a457
+        }
12a457
+
12a457
+        return 0;
12a457
+
12a457
+unwind:
12a457
+        SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno,  preparent,
12a457
+                            postparent, xdata);
12a457
+        return 0;
12a457
+}
12a457
+
12a457
+int
12a457
+shard_unlink_base_file (call_frame_t *frame, xlator_t *this)
12a457
+{
12a457
+        shard_local_t *local = NULL;
12a457
+
12a457
+        local = frame->local;
12a457
+
12a457
+        if (dict_set_uint32 (local->xattr_req, GET_LINK_COUNT, 0))
12a457
+                gf_msg (this->name, GF_LOG_WARNING, 0,
12a457
+                        SHARD_MSG_DICT_SET_FAILED, "Failed to set "
12a457
+                        GET_LINK_COUNT" in dict");
12a457
+
12a457
+        /* To-Do: Request open-fd count on base file */
12a457
+        STACK_WIND (frame, shard_unlink_base_file_cbk, FIRST_CHILD(this),
12a457
                     FIRST_CHILD(this)->fops->unlink, &local->loc, local->xflag,
12a457
                     local->xattr_req);
12a457
         return 0;
12a457
@@ -2199,6 +2347,17 @@ shard_unlink_block_inode (shard_local_t *local, int shard_block_num)
12a457
 int
12a457
 shard_rename_cbk (call_frame_t *frame, xlator_t *this);
12a457
 
12a457
+int32_t
12a457
+shard_unlink_cbk (call_frame_t *frame, xlator_t *this)
12a457
+{
12a457
+        shard_local_t *local = frame->local;
12a457
+
12a457
+	SHARD_STACK_UNWIND (unlink, frame, local->op_ret, local->op_errno,
12a457
+			    &local->preoldparent, &local->postoldparent,
12a457
+                            local->xattr_rsp);
12a457
+	return 0;
12a457
+}
12a457
+
12a457
 int
12a457
 shard_unlink_shards_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
12a457
                             int32_t op_ret, int32_t op_errno,
12a457
@@ -2225,7 +2384,7 @@ done:
12a457
                 SHARD_UNSET_ROOT_FS_ID (frame, local);
12a457
 
12a457
                 if (local->fop == GF_FOP_UNLINK)
12a457
-                        shard_unlink_base_file (frame, this);
12a457
+                        shard_unlink_cbk (frame, this);
12a457
                 else if (local->fop == GF_FOP_RENAME)
12a457
                         shard_rename_cbk (frame, this);
12a457
                 else
12a457
@@ -2254,9 +2413,16 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
12a457
 
12a457
         priv = this->private;
12a457
         local = frame->local;
12a457
+
12a457
+        /* local->num_blocks includes the base file block. This function only
12a457
+         * deletes the shards under /.shard. So subtract num_blocks by 1.
12a457
+         */
12a457
         local->call_count = call_count = local->num_blocks - 1;
12a457
         last_block = local->last_block;
12a457
 
12a457
+        /* Ignore the inode associated with the base file and start counting
12a457
+         * from 1.
12a457
+         */
12a457
         for (i = 1; i < local->num_blocks; i++) {
12a457
                 if (!local->inode_list[i])
12a457
                         continue;
12a457
@@ -2266,20 +2432,15 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
12a457
         if (!count) {
12a457
                 /* callcount = 0 implies that all of the shards that need to be
12a457
                  * unlinked are non-existent (in other words the file is full of
12a457
-                 * holes). So shard xlator would now proceed to do the final
12a457
-                 * unlink on the base file.
12a457
+                 * holes). So shard xlator can simply return the fop to its
12a457
+                 * parent now.
12a457
                  */
12a457
                 gf_msg_debug (this->name, 0, "All shards that need to be "
12a457
                               "unlinked are non-existent: %s",
12a457
                               uuid_utoa (inode->gfid));
12a457
                 local->num_blocks = 1;
12a457
                 if (local->fop == GF_FOP_UNLINK) {
12a457
-                        gf_msg_debug (this->name, 0, "Proceeding to unlink the"
12a457
-                                      " base file");
12a457
-                        STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD(this),
12a457
-                                    FIRST_CHILD(this)->fops->unlink,
12a457
-                                    &local->loc, local->flags,
12a457
-                                    local->xattr_req);
12a457
+                        shard_unlink_cbk (frame, this);
12a457
                 } else if (local->fop == GF_FOP_RENAME) {
12a457
                         gf_msg_debug (this->name, 0, "Resuming rename()");
12a457
                         shard_rename_cbk (frame, this);
12a457
@@ -2291,6 +2452,8 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
12a457
         cur_block = 1;
12a457
         SHARD_SET_ROOT_FS_ID (frame, local);
12a457
 
12a457
+        /* Ignore the base file and start iterating from the first block shard.
12a457
+         */
12a457
         while (cur_block <= last_block) {
12a457
                 if (!local->inode_list[cur_block]) {
12a457
                         cur_block++;
12a457
@@ -2347,86 +2510,6 @@ next:
12a457
 }
12a457
 
12a457
 int
12a457
-shard_post_lookup_shards_unlink_handler (call_frame_t *frame, xlator_t *this)
12a457
-{
12a457
-        shard_local_t *local = NULL;
12a457
-
12a457
-        local = frame->local;
12a457
-
12a457
-        if ((local->op_ret < 0) && (local->op_errno != ENOENT)) {
12a457
-                if (local->fop == GF_FOP_UNLINK)
12a457
-                        SHARD_STACK_UNWIND (unlink, frame, local->op_ret,
12a457
-                                            local->op_errno, NULL, NULL, NULL);
12a457
-                else
12a457
-                        SHARD_STACK_UNWIND (rename, frame, local->op_ret,
12a457
-                                            local->op_errno, NULL, NULL, NULL,
12a457
-                                            NULL, NULL, NULL);
12a457
-                return 0;
12a457
-        }
12a457
-        local->op_ret = 0;
12a457
-        local->op_errno = 0;
12a457
-
12a457
-        shard_unlink_shards_do (frame, this,
12a457
-                                (local->fop == GF_FOP_RENAME)
12a457
-                                             ? local->loc2.inode
12a457
-                                             : local->loc.inode);
12a457
-        return 0;
12a457
-}
12a457
-
12a457
-int
12a457
-shard_post_resolve_unlink_handler (call_frame_t *frame, xlator_t *this)
12a457
-{
12a457
-        shard_local_t *local = NULL;
12a457
-
12a457
-        local = frame->local;
12a457
-
12a457
-        if (local->op_ret < 0) {
12a457
-                if (local->op_errno == ENOENT) {
12a457
-                        /* If lookup on /.shard fails with ENOENT, it probably
12a457
-                         * means that the file is being unlinked before it
12a457
-                         * could grow beyond its first block. In this case,
12a457
-                         * unlink boils down to unlinking the base file and
12a457
-                         * unwinding the call.
12a457
-                         */
12a457
-                        local->op_ret = 0;
12a457
-                        local->first_block = local->last_block = 0;
12a457
-                        local->num_blocks = 1;
12a457
-                        if (local->fop == GF_FOP_UNLINK)
12a457
-                                STACK_WIND (frame, shard_unlink_cbk,
12a457
-                                            FIRST_CHILD(this),
12a457
-                                            FIRST_CHILD (this)->fops->unlink,
12a457
-                                            &local->loc, local->xflag,
12a457
-                                            local->xattr_req);
12a457
-                        else
12a457
-                                shard_rename_cbk (frame, this);
12a457
-                        return 0;
12a457
-                } else {
12a457
-                        if (local->fop == GF_FOP_UNLINK)
12a457
-                                SHARD_STACK_UNWIND (unlink, frame,
12a457
-                                                    local->op_ret,
12a457
-                                                    local->op_errno, NULL, NULL,
12a457
-                                                    NULL);
12a457
-                        else
12a457
-                                shard_rename_cbk (frame, this);
12a457
-                        return 0;
12a457
-                }
12a457
-        }
12a457
-
12a457
-        if (!local->call_count)
12a457
-                shard_unlink_shards_do (frame, this,
12a457
-                                        (local->fop == GF_FOP_RENAME)
12a457
-                                                     ? local->loc2.inode
12a457
-                                                     : local->loc.inode);
12a457
-        else
12a457
-                shard_common_lookup_shards (frame, this,
12a457
-                                            (local->fop == GF_FOP_RENAME)
12a457
-                                                         ? local->loc2.inode
12a457
-                                                         : local->loc.inode,
12a457
-                                       shard_post_lookup_shards_unlink_handler);
12a457
-        return 0;
12a457
-}
12a457
-
12a457
-int
12a457
 shard_post_lookup_unlink_handler (call_frame_t *frame, xlator_t *this)
12a457
 {
12a457
         int            ret   = -1;
12a457
@@ -2442,46 +2525,7 @@ shard_post_lookup_unlink_handler (call_frame_t *frame, xlator_t *this)
12a457
                 return 0;
12a457
         }
12a457
 
12a457
-        local->first_block = get_lowest_block (0, local->block_size);
12a457
-        local->last_block = get_highest_block (0, local->prebuf.ia_size,
12a457
-                                               local->block_size);
12a457
-        local->num_blocks = local->last_block - local->first_block + 1;
12a457
-
12a457
-        if ((local->num_blocks == 1) || (local->prebuf.ia_nlink > 1)) {
12a457
-                /* num_blocks = 1 implies that the file has not crossed its
12a457
-                 * shard block size. So unlink boils down to unlinking just the
12a457
-                 * base file.
12a457
-                 * Because link() does not create links for all but the
12a457
-                 * base shard, unlink() must delete these shards only when the
12a457
-                 * link count is 1.
12a457
-                 */
12a457
-                STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD (this),
12a457
-                            FIRST_CHILD (this)->fops->unlink, &local->loc,
12a457
-                            local->xflag, local->xattr_req);
12a457
-                return 0;
12a457
-        }
12a457
-
12a457
-        local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
12a457
-                                       gf_shard_mt_inode_list);
12a457
-        if (!local->inode_list)
12a457
-                goto out;
12a457
-
12a457
-        local->dot_shard_loc.inode = inode_find (this->itable,
12a457
-                                                 priv->dot_shard_gfid);
12a457
-        if (!local->dot_shard_loc.inode) {
12a457
-                ret = shard_init_dot_shard_loc (this, local);
12a457
-                if (ret)
12a457
-                        goto out;
12a457
-                shard_lookup_dot_shard (frame, this,
12a457
-                                        shard_post_resolve_unlink_handler);
12a457
-        } else {
12a457
-                shard_common_resolve_shards (frame, this, local->loc.inode,
12a457
-                                             shard_post_resolve_unlink_handler);
12a457
-        }
12a457
-        return 0;
12a457
-
12a457
-out:
12a457
-        SHARD_STACK_UNWIND (unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
12a457
+        shard_unlink_base_file (frame, this);
12a457
         return 0;
12a457
 }
12a457
 
12a457
@@ -2524,7 +2568,6 @@ shard_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
12a457
 
12a457
         shard_lookup_base_file (frame, this, &local->loc,
12a457
                                 shard_post_lookup_unlink_handler);
12a457
-
12a457
         return 0;
12a457
 err:
12a457
         SHARD_STACK_UNWIND (unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
12a457
@@ -2549,9 +2592,10 @@ shard_rename_cbk (call_frame_t *frame, xlator_t *this)
12a457
 int
12a457
 shard_rename_unlink_dst_shards_do (call_frame_t *frame, xlator_t *this)
12a457
 {
12a457
-        int            ret   = -1;
12a457
-        shard_local_t *local = NULL;
12a457
-        shard_priv_t  *priv  = NULL;
12a457
+        int            ret        = -1;
12a457
+        uint32_t       link_count = 0;
12a457
+        shard_local_t *local      = NULL;
12a457
+        shard_priv_t  *priv       = NULL;
12a457
 
12a457
         local = frame->local;
12a457
         priv = this->private;
12a457
@@ -2561,7 +2605,14 @@ shard_rename_unlink_dst_shards_do (call_frame_t *frame, xlator_t *this)
12a457
                                                local->dst_block_size);
12a457
         local->num_blocks = local->last_block - local->first_block + 1;
12a457
 
12a457
-        if ((local->num_blocks == 1) || (local->postbuf.ia_nlink > 1)) {
12a457
+        if ((local->xattr_rsp) &&
12a457
+            (!dict_get_uint32 (local->xattr_rsp, GET_LINK_COUNT, &link_count))
12a457
+            && (link_count > 1)) {
12a457
+                shard_rename_cbk (frame, this);
12a457
+                return 0;
12a457
+        }
12a457
+
12a457
+        if (local->num_blocks == 1) {
12a457
                 shard_rename_cbk (frame, this);
12a457
                 return 0;
12a457
         }
12a457
@@ -2664,6 +2715,12 @@ shard_rename_src_base_file (call_frame_t *frame, xlator_t *this)
12a457
 
12a457
         local = frame->local;
12a457
 
12a457
+        if (dict_set_uint32 (local->xattr_req, GET_LINK_COUNT, 0))
12a457
+                gf_msg (this->name, GF_LOG_WARNING, 0,
12a457
+                        SHARD_MSG_DICT_SET_FAILED, "Failed to set "
12a457
+                        GET_LINK_COUNT" in dict");
12a457
+
12a457
+        /* To-Do: Request open-fd count on dst base file */
12a457
         STACK_WIND (frame, shard_rename_src_cbk, FIRST_CHILD(this),
12a457
                     FIRST_CHILD(this)->fops->rename, &local->loc, &local->loc2,
12a457
                     local->xattr_req);
12a457
diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c
12a457
index 94e3c45..d5c18f1 100644
12a457
--- a/xlators/storage/posix/src/posix.c
12a457
+++ b/xlators/storage/posix/src/posix.c
12a457
@@ -1660,13 +1660,13 @@ out:
12a457
 int32_t
12a457
 posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
12a457
                                     struct iatt *stbuf, int32_t *op_errno,
12a457
-                                    loc_t *loc)
12a457
+                                    loc_t *loc, gf_boolean_t get_link_count,
12a457
+                                    dict_t *rsp_dict)
12a457
 {
12a457
-        int32_t                 ret    = 0;
12a457
-        struct posix_private    *priv  = NULL;
12a457
-        int fd_count = 0;
12a457
-
12a457
-        priv = this->private;
12a457
+        int                    fd_count = 0;
12a457
+        int32_t                ret      = 0;
12a457
+        struct iatt            prebuf   = {0,};
12a457
+        gf_boolean_t           locked   = _gf_false;
12a457
 
12a457
         /*  Unlink the gfid_handle_first */
12a457
         if (stbuf && stbuf->ia_nlink == 1) {
12a457
@@ -1689,6 +1689,18 @@ posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
12a457
                 }
12a457
         }
12a457
 
12a457
+        if (get_link_count) {
12a457
+                LOCK (&loc->inode->lock);
12a457
+                locked = _gf_true;
12a457
+                ret = posix_pstat (this, loc->gfid, real_path, &prebuf);
12a457
+                if (ret) {
12a457
+                        gf_msg (this->name, GF_LOG_ERROR, errno,
12a457
+                                P_MSG_LSTAT_FAILED, "lstat on %s failed",
12a457
+                                real_path);
12a457
+                        goto err;
12a457
+                }
12a457
+        }
12a457
+
12a457
         /* Unlink the actual file */
12a457
         ret = sys_unlink (real_path);
12a457
         if (ret == -1) {
12a457
@@ -1699,9 +1711,23 @@ posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
12a457
                 goto err;
12a457
         }
12a457
 
12a457
+        if (locked) {
12a457
+                UNLOCK (&loc->inode->lock);
12a457
+                locked = _gf_false;
12a457
+        }
12a457
+
12a457
+        ret = dict_set_uint32 (rsp_dict, GET_LINK_COUNT, prebuf.ia_nlink);
12a457
+        if (ret)
12a457
+                gf_msg (this->name, GF_LOG_WARNING, 0, P_MSG_SET_XDATA_FAIL,
12a457
+                        "failed to set "GET_LINK_COUNT" for %s", real_path);
12a457
+
12a457
         return 0;
12a457
 
12a457
 err:
12a457
+        if (locked) {
12a457
+                UNLOCK (&loc->inode->lock);
12a457
+                locked = _gf_false;
12a457
+        }
12a457
         return -1;
12a457
 }
12a457
 
12a457
@@ -1792,6 +1818,7 @@ posix_unlink (call_frame_t *frame, xlator_t *this,
12a457
         void                  *uuid               = NULL;
12a457
         char                   uuid_str[GF_UUID_BUF_SIZE] = {0};
12a457
         char                   gfid_str[GF_UUID_BUF_SIZE] = {0};
12a457
+        gf_boolean_t           get_link_count     = _gf_false;
12a457
 
12a457
         DECLARE_OLD_FS_ID_VAR;
12a457
 
12a457
@@ -1915,18 +1942,23 @@ posix_unlink (call_frame_t *frame, xlator_t *this,
12a457
                 }
12a457
         }
12a457
 
12a457
-        op_ret =  posix_unlink_gfid_handle_and_entry (this, real_path, &stbuf,
12a457
-                                                      &op_errno, loc);
12a457
-        if (op_ret == -1) {
12a457
-                goto out;
12a457
-        }
12a457
-
12a457
         unwind_dict = dict_new ();
12a457
         if (!unwind_dict) {
12a457
                 op_errno = -ENOMEM;
12a457
                 op_ret = -1;
12a457
                 goto out;
12a457
         }
12a457
+
12a457
+        if (xdata && dict_get (xdata, GET_LINK_COUNT))
12a457
+                get_link_count = _gf_true;
12a457
+        op_ret =  posix_unlink_gfid_handle_and_entry (this, real_path, &stbuf,
12a457
+                                                      &op_errno, loc,
12a457
+                                                      get_link_count,
12a457
+                                                      unwind_dict);
12a457
+        if (op_ret == -1) {
12a457
+                goto out;
12a457
+        }
12a457
+
12a457
         if (fdstat_requested) {
12a457
                 op_ret = posix_fdstat (this, fd, &postbuf);
12a457
                 if (op_ret == -1) {
12a457
@@ -2250,6 +2282,8 @@ posix_rename (call_frame_t *frame, xlator_t *this,
12a457
         char                 *pgfid_xattr_key = NULL;
12a457
         int32_t               nlink_samepgfid = 0;
12a457
         dict_t               *unwind_dict     = NULL;
12a457
+        gf_boolean_t          locked          = _gf_false;
12a457
+        gf_boolean_t          get_link_count  = _gf_false;
12a457
 
12a457
         DECLARE_OLD_FS_ID_VAR;
12a457
 
12a457
@@ -2276,6 +2310,13 @@ posix_rename (call_frame_t *frame, xlator_t *this,
12a457
                 goto out;
12a457
         }
12a457
 
12a457
+        unwind_dict = dict_new ();
12a457
+        if (!unwind_dict) {
12a457
+                op_ret = -1;
12a457
+                op_errno = ENOMEM;
12a457
+                goto out;
12a457
+        }
12a457
+
12a457
         op_ret = posix_pstat (this, oldloc->pargfid, par_oldpath, &preoldparent);
12a457
         if (op_ret == -1) {
12a457
                 op_errno = errno;
12a457
@@ -2342,6 +2383,22 @@ posix_rename (call_frame_t *frame, xlator_t *this,
12a457
                                                    this, unlock);
12a457
                 }
12a457
 
12a457
+                if ((xdata) && (dict_get (xdata, GET_LINK_COUNT))
12a457
+                    && (real_newpath) && (was_present)) {
12a457
+                        LOCK (&newloc->inode->lock);
12a457
+                        locked = _gf_true;
12a457
+                        get_link_count = _gf_true;
12a457
+                        op_ret = posix_pstat (this, newloc->gfid, real_newpath,
12a457
+                                              &stbuf);
12a457
+                        if ((op_ret == -1) && (errno != ENOENT)) {
12a457
+                                op_errno = errno;
12a457
+                                gf_msg (this->name, GF_LOG_ERROR, errno,
12a457
+                                        P_MSG_LSTAT_FAILED,
12a457
+                                        "lstat on %s failed", real_newpath);
12a457
+                                goto unlock;
12a457
+                        }
12a457
+                }
12a457
+
12a457
                 op_ret = sys_rename (real_oldpath, real_newpath);
12a457
                 if (op_ret == -1) {
12a457
                         op_errno = errno;
12a457
@@ -2369,6 +2426,18 @@ posix_rename (call_frame_t *frame, xlator_t *this,
12a457
                         goto unlock;
12a457
                 }
12a457
 
12a457
+                if (locked) {
12a457
+                        UNLOCK (&newloc->inode->lock);
12a457
+                        locked = _gf_false;
12a457
+                }
12a457
+
12a457
+                if ((get_link_count) &&
12a457
+                    (dict_set_uint32 (unwind_dict, GET_LINK_COUNT,
12a457
+                                      stbuf.ia_nlink)))
12a457
+                        gf_msg (this->name, GF_LOG_WARNING, 0,
12a457
+                                P_MSG_SET_XDATA_FAIL, "failed to set "
12a457
+                                GET_LINK_COUNT" for %s", real_newpath);
12a457
+
12a457
                 if (!IA_ISDIR (oldloc->inode->ia_type)
12a457
                     && priv->update_pgfid_nlinks) {
12a457
                         MAKE_PGFID_XATTR_KEY (pgfid_xattr_key,
12a457
@@ -2382,6 +2451,10 @@ posix_rename (call_frame_t *frame, xlator_t *this,
12a457
                 }
12a457
         }
12a457
 unlock:
12a457
+        if (locked) {
12a457
+                UNLOCK (&newloc->inode->lock);
12a457
+                locked = _gf_false;
12a457
+        }
12a457
         UNLOCK (&oldloc->inode->lock);
12a457
 
12a457
         if (op_ret < 0) {
12a457
@@ -2430,7 +2503,7 @@ unlock:
12a457
         }
12a457
 
12a457
         if (was_present)
12a457
-                unwind_dict = posix_dict_set_nlink (xdata, NULL, nlink);
12a457
+                unwind_dict = posix_dict_set_nlink (xdata, unwind_dict, nlink);
12a457
         op_ret = 0;
12a457
 out:
12a457
 
12a457
-- 
12a457
1.7.1
12a457