Blob Blame History Raw
From a6a4b068fe1feec882d471101c7931415bf05226 Mon Sep 17 00:00:00 2001
From: Krutika Dhananjay <kdhananj@redhat.com>
Date: Thu, 12 May 2016 15:06:59 +0530
Subject: [PATCH 164/167] features/shard: Get hard-link-count in {unlink,rename}_cbk before deleting shards

        Backport of: http://review.gluster.org/#/c/14334/
        release-3.7 patch: http://review.gluster.org/#/c/14450/

Change-Id: I078cd9baf32ce5dc92edcf86c67f0cfcd38dd9a6
BUG: 1333643
Signed-off-by: Krutika Dhananjay <kdhananj@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/74766
Reviewed-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
Tested-by: Pranith Kumar Karampuri <pkarampu@redhat.com>
---
 libglusterfs/src/glusterfs.h           |    1 +
 tests/bugs/shard/unlinks-and-renames.t |  282 ++++++++++++++++++++++++++
 xlators/cluster/dht/src/dht-rename.c   |   21 ++-
 xlators/features/shard/src/shard.c     |  343 +++++++++++++++++++-------------
 xlators/storage/posix/src/posix.c      |   99 ++++++++--
 5 files changed, 582 insertions(+), 164 deletions(-)
 create mode 100644 tests/bugs/shard/unlinks-and-renames.t

diff --git a/libglusterfs/src/glusterfs.h b/libglusterfs/src/glusterfs.h
index c8c4590..9d1ea8c 100644
--- a/libglusterfs/src/glusterfs.h
+++ b/libglusterfs/src/glusterfs.h
@@ -250,6 +250,7 @@
 #define TIER_LINKFILE_GFID           "tier-linkfile-gfid"
 #define DHT_SKIP_OPEN_FD_UNLINK     "dont-unlink-for-open-fd"
 #define DHT_IATT_IN_XDATA_KEY       "dht-get-iatt-in-xattr"
+#define GET_LINK_COUNT              "get-link-count"
 
 /*CTR and Marker requires inode dentry link count from posix*/
 #define GF_RESPONSE_LINK_COUNT_XDATA "gf_response_link_count"
diff --git a/tests/bugs/shard/unlinks-and-renames.t b/tests/bugs/shard/unlinks-and-renames.t
new file mode 100644
index 0000000..751874c
--- /dev/null
+++ b/tests/bugs/shard/unlinks-and-renames.t
@@ -0,0 +1,282 @@
+#!/bin/bash
+
+. $(dirname $0)/../../include.rc
+. $(dirname $0)/../../volume.rc
+
+cleanup
+
+# The aim of this test script is to exercise the various codepaths of unlink
+# and rename fops in sharding and make sure they work fine.
+#
+
+#################################################
+################### UNLINK ######################
+#################################################
+
+TEST glusterd
+TEST pidof glusterd
+TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0,1}
+TEST $CLI volume set $V0 features.shard on
+TEST $CLI volume start $V0
+TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0
+
+TEST mkdir $M0/dir
+TEST touch $M0/dir/foo
+TEST touch $M0/dir/new
+
+######################################
+##### Unlink with /.shard absent #####
+######################################
+TEST truncate -s 5M $M0/dir/foo
+TEST ! stat $B0/${V0}0/.shard
+TEST ! stat $B0/${V0}1/.shard
+# Test to ensure that unlink doesn't fail due to absence of /.shard
+TEST unlink $M0/dir/foo
+
+##################################################
+##### Unlink of a sharded file without holes #####
+##################################################
+# Create a 9M sharded file
+TEST dd if=/dev/zero of=$M0/dir/new bs=1024 count=9216
+gfid_new=$(get_gfid_string $M0/dir/new)
+# Ensure its shards are created.
+TEST stat $B0/${V0}0/.shard/$gfid_new.1
+TEST stat $B0/${V0}1/.shard/$gfid_new.1
+TEST stat $B0/${V0}0/.shard/$gfid_new.2
+TEST stat $B0/${V0}1/.shard/$gfid_new.2
+TEST unlink $M0/dir/new
+TEST ! stat $B0/${V0}0/.shard/$gfid_new.1
+TEST ! stat $B0/${V0}1/.shard/$gfid_new.1
+TEST ! stat $B0/${V0}0/.shard/$gfid_new.2
+TEST ! stat $B0/${V0}1/.shard/$gfid_new.2
+TEST ! stat $M0/dir/new
+TEST ! stat $B0/${V0}0/dir/new
+TEST ! stat $B0/${V0}1/dir/new
+
+#######################################
+##### Unlink with /.shard present #####
+#######################################
+TEST truncate -s 5M $M0/dir/foo
+gfid_foo=$(get_gfid_string $M0/dir/foo)
+# Ensure its shards are absent.
+TEST ! stat $B0/${V0}0/.shard/$gfid_foo.1
+TEST ! stat $B0/${V0}1/.shard/$gfid_foo.1
+# Test to ensure that unlink of a sparse file works fine.
+TEST unlink $M0/dir/foo
+TEST ! stat $B0/${V0}0/dir/foo
+TEST ! stat $B0/${V0}1/dir/foo
+TEST ! stat $M0/dir/foo
+
+#############################################################
+##### Unlink of a file with only one block (the zeroth) #####
+#############################################################
+TEST touch $M0/dir/foo
+TEST dd if=/dev/zero of=$M0/dir/foo bs=1024 count=1024
+# Test to ensure that unlink of a file with only the base block works fine.
+TEST unlink $M0/dir/foo
+TEST ! stat $B0/${V0}0/dir/foo
+TEST ! stat $B0/${V0}1/dir/foo
+TEST ! stat $M0/dir/foo
+
+####################################################
+##### Unlink of a sharded file with hard-links #####
+####################################################
+# Create a 9M sharded file
+TEST dd if=/dev/zero of=$M0/dir/original bs=1024 count=9216
+gfid_original=$(get_gfid_string $M0/dir/original)
+# Ensure its shards are created.
+TEST stat $B0/${V0}0/.shard/$gfid_original.1
+TEST stat $B0/${V0}1/.shard/$gfid_original.1
+TEST stat $B0/${V0}0/.shard/$gfid_original.2
+TEST stat $B0/${V0}1/.shard/$gfid_original.2
+# Create a hard link.
+TEST ln $M0/dir/original $M0/link
+# Now delete the original file.
+TEST unlink $M0/dir/original
+# Ensure the shards are still intact.
+TEST stat $B0/${V0}0/.shard/$gfid_original.1
+TEST stat $B0/${V0}1/.shard/$gfid_original.1
+TEST stat $B0/${V0}0/.shard/$gfid_original.2
+TEST stat $B0/${V0}1/.shard/$gfid_original.2
+TEST ! stat $M0/dir/original
+TEST stat $M0/link
+TEST stat $B0/${V0}0/link
+TEST stat $B0/${V0}1/link
+# Now delete the last link.
+TEST unlink $M0/link
+# Ensure that the shards are all cleaned up.
+TEST ! stat $B0/${V0}0/.shard/$gfid_original.1
+TEST ! stat $B0/${V0}1/.shard/$gfid_original.1
+TEST ! stat $B0/${V0}0/.shard/$gfid_original.2
+TEST ! stat $B0/${V0}1/.shard/$gfid_original.2
+TEST ! stat $M0/link
+TEST ! stat $B0/${V0}0/link
+TEST ! stat $B0/${V0}1/link
+
+EXPECT_WITHIN $UMOUNT_TIMEOUT "Y" force_umount $M0
+TEST $CLI volume stop $V0
+TEST $CLI volume delete $V0
+
+cleanup
+
+#################################################
+################### RENAME ######################
+#################################################
+
+TEST glusterd
+TEST pidof glusterd
+TEST $CLI volume create $V0 replica 2 $H0:$B0/${V0}{0,1}
+TEST $CLI volume set $V0 features.shard on
+TEST $CLI volume start $V0
+TEST glusterfs --volfile-id=$V0 --volfile-server=$H0 $M0
+
+TEST mkdir $M0/dir
+TEST touch $M0/dir/src
+TEST touch $M0/dir/dst
+
+######################################
+##### Rename with /.shard absent #####
+######################################
+TEST truncate -s 5M $M0/dir/dst
+TEST ! stat $B0/${V0}0/.shard
+TEST ! stat $B0/${V0}1/.shard
+# Test to ensure that rename doesn't fail due to absence of /.shard
+TEST mv -f $M0/dir/src $M0/dir/dst
+TEST ! stat $M0/dir/src
+TEST   stat $M0/dir/dst
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+TEST   stat $B0/${V0}0/dir/dst
+TEST   stat $B0/${V0}1/dir/dst
+
+##################################################
+##### Rename to a sharded file without holes #####
+##################################################
+TEST unlink $M0/dir/dst
+TEST touch $M0/dir/src
+# Create a 9M sharded file
+TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=9216
+gfid_dst=$(get_gfid_string $M0/dir/dst)
+# Ensure its shards are created.
+TEST stat $B0/${V0}0/.shard/$gfid_dst.1
+TEST stat $B0/${V0}1/.shard/$gfid_dst.1
+TEST stat $B0/${V0}0/.shard/$gfid_dst.2
+TEST stat $B0/${V0}1/.shard/$gfid_dst.2
+TEST mv -f $M0/dir/src $M0/dir/dst
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.1
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.1
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.2
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.2
+TEST ! stat $M0/dir/src
+TEST   stat $M0/dir/dst
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+TEST   stat $B0/${V0}0/dir/dst
+TEST   stat $B0/${V0}1/dir/dst
+
+###################################################
+##### Rename of dst file with /.shard present #####
+###################################################
+TEST unlink $M0/dir/dst
+TEST touch $M0/dir/src
+TEST truncate -s 5M $M0/dir/dst
+# Test to ensure that rename onto a sparse dst file works fine.
+TEST mv -f $M0/dir/src $M0/dir/dst
+TEST ! stat $M0/dir/src
+TEST   stat $M0/dir/dst
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+TEST   stat $B0/${V0}0/dir/dst
+TEST   stat $B0/${V0}1/dir/dst
+
+###############################################################
+##### Rename of dst file with only one block (the zeroth) #####
+###############################################################
+TEST unlink $M0/dir/dst
+TEST touch $M0/dir/src
+TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=1024
+# Test to ensure that rename onto a dst with only the base block works fine.
+TEST mv -f $M0/dir/src $M0/dir/dst
+TEST ! stat $M0/dir/src
+TEST   stat $M0/dir/dst
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+TEST   stat $B0/${V0}0/dir/dst
+TEST   stat $B0/${V0}1/dir/dst
+
+########################################################
+##### Rename to a dst sharded file with hard-links #####
+########################################################
+TEST unlink $M0/dir/dst
+TEST touch $M0/dir/src
+# Create a 9M sharded file
+TEST dd if=/dev/zero of=$M0/dir/dst bs=1024 count=9216
+gfid_dst=$(get_gfid_string $M0/dir/dst)
+# Ensure its shards are created.
+TEST stat $B0/${V0}0/.shard/$gfid_dst.1
+TEST stat $B0/${V0}1/.shard/$gfid_dst.1
+TEST stat $B0/${V0}0/.shard/$gfid_dst.2
+TEST stat $B0/${V0}1/.shard/$gfid_dst.2
+# Create a hard link.
+TEST ln $M0/dir/dst $M0/link
+# Now rename src to the dst.
+TEST mv -f $M0/dir/src $M0/dir/dst
+# Ensure the shards are still intact.
+TEST stat $B0/${V0}0/.shard/$gfid_dst.1
+TEST stat $B0/${V0}1/.shard/$gfid_dst.1
+TEST stat $B0/${V0}0/.shard/$gfid_dst.2
+TEST stat $B0/${V0}1/.shard/$gfid_dst.2
+TEST ! stat $M0/dir/src
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+# Now rename another file to the last link.
+TEST touch $M0/dir/src2
+TEST mv -f $M0/dir/src2 $M0/link
+# Ensure that the shards are all cleaned up.
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.1
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.1
+TEST ! stat $B0/${V0}0/.shard/$gfid_dst.2
+TEST ! stat $B0/${V0}1/.shard/$gfid_dst.2
+TEST ! stat $M0/dir/src2
+TEST ! stat $B0/${V0}0/dir/src2
+TEST ! stat $B0/${V0}1/dir/src2
+
+# Rename of a sharded src (NOTE: dir/dst still exists from the previous test)
+TEST touch $M0/dir/src
+TEST dd if=/dev/zero of=$M0/dir/src bs=1024 count=9216
+gfid_src=$(get_gfid_string $M0/dir/src)
+# Ensure its shards are created.
+TEST stat $B0/${V0}0/.shard/$gfid_src.1
+TEST stat $B0/${V0}1/.shard/$gfid_src.1
+TEST stat $B0/${V0}0/.shard/$gfid_src.2
+TEST stat $B0/${V0}1/.shard/$gfid_src.2
+# Now rename src to the dst.
+TEST mv $M0/dir/src $M0/dir/dst
+
+TEST   stat $B0/${V0}0/.shard/$gfid_src.1
+TEST   stat $B0/${V0}1/.shard/$gfid_src.1
+TEST   stat $B0/${V0}0/.shard/$gfid_src.2
+TEST   stat $B0/${V0}1/.shard/$gfid_src.2
+TEST ! stat $M0/dir/src
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+TEST   stat $M0/dir/dst
+TEST   stat $B0/${V0}0/dir/dst
+TEST   stat $B0/${V0}1/dir/dst
+
+# Rename of a src with no shards under /.shard (NOTE: dir/dst exists from above)
+TEST touch $M0/dir/src
+TEST dd if=/dev/zero of=$M0/dir/src bs=1024 count=1024
+gfid_src=$(get_gfid_string $M0/dir/src)
+TEST ! stat $B0/${V0}0/.shard/$gfid_src.1
+TEST ! stat $B0/${V0}1/.shard/$gfid_src.1
+# Now rename src to the dst.
+TEST mv $M0/dir/src $M0/dir/dst
+TEST ! stat $M0/dir/src
+TEST ! stat $B0/${V0}0/dir/src
+TEST ! stat $B0/${V0}1/dir/src
+TEST   stat $M0/dir/dst
+TEST   stat $B0/${V0}0/dir/dst
+TEST   stat $B0/${V0}1/dir/dst
+
+cleanup
diff --git a/xlators/cluster/dht/src/dht-rename.c b/xlators/cluster/dht/src/dht-rename.c
index 82a97bc..4ed659b 100644
--- a/xlators/cluster/dht/src/dht-rename.c
+++ b/xlators/cluster/dht/src/dht-rename.c
@@ -565,7 +565,7 @@ dht_rename_unlock_cbk (call_frame_t *frame, void *cookie,
         DHT_STACK_UNWIND (rename, frame, local->op_ret, local->op_errno,
                           &local->stbuf, &local->preoldparent,
                           &local->postoldparent, &local->preparent,
-                          &local->postparent, NULL);
+                          &local->postparent, local->xattr);
         return 0;
 }
 
@@ -876,6 +876,12 @@ dht_rename_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                                 uuid_utoa(local->loc.inode->gfid):"");
                 }
         }
+        if (xdata) {
+                if (!local->xattr)
+                        local->xattr = dict_ref (xdata);
+                else
+                        local->xattr = dict_copy_with_ref (xdata, local->xattr);
+        }
 
         if ((src_cached == dst_cached) && (dst_hashed != dst_cached)) {
                 link_frame = copy_frame (frame);
@@ -1026,7 +1032,6 @@ dht_do_rename (call_frame_t *frame)
         xlator_t    *dst_cached    = NULL;
         xlator_t    *this          = NULL;
         xlator_t    *rename_subvol = NULL;
-        dict_t      *dict          = NULL;
 
         local = frame->local;
         this  = frame->this;
@@ -1041,11 +1046,12 @@ dht_do_rename (call_frame_t *frame)
                 rename_subvol = dst_hashed;
 
         if ((src_cached != dst_hashed) && (rename_subvol == dst_hashed)) {
-                DHT_MARKER_DONT_ACCOUNT(dict);
+                DHT_MARKER_DONT_ACCOUNT(local->xattr_req);
         }
 
         if (rename_subvol == src_cached) {
-                DHT_CHANGELOG_TRACK_AS_RENAME(dict, &local->loc, &local->loc2);
+                DHT_CHANGELOG_TRACK_AS_RENAME(local->xattr_req, &local->loc,
+                                              &local->loc2);
         }
 
         gf_msg_trace (this->name, 0,
@@ -1056,10 +1062,7 @@ dht_do_rename (call_frame_t *frame)
                 FRAME_SU_DO (frame, dht_local_t);
         STACK_WIND (frame, dht_rename_cbk,
                     rename_subvol, rename_subvol->fops->rename,
-                    &local->loc, &local->loc2, dict);
-        if (dict)
-                dict_unref (dict);
-
+                    &local->loc, &local->loc2, local->xattr_req);
         return 0;
 }
 
@@ -1554,6 +1557,8 @@ dht_rename (call_frame_t *frame, xlator_t *this,
         local->src_cached = src_cached;
         local->dst_hashed = dst_hashed;
         local->dst_cached = dst_cached;
+        if (xdata)
+                local->xattr_req = dict_ref (xdata);
 
         gf_msg (this->name, GF_LOG_INFO, 0,
                 DHT_MSG_RENAME_INFO,
diff --git a/xlators/features/shard/src/shard.c b/xlators/features/shard/src/shard.c
index c7a57e0..d9a61c1 100644
--- a/xlators/features/shard/src/shard.c
+++ b/xlators/features/shard/src/shard.c
@@ -2135,30 +2135,178 @@ err:
 }
 
 int
-shard_unlink_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
-                  int32_t op_ret, int32_t op_errno, struct iatt *preparent,
-                  struct iatt *postparent, dict_t *xdata)
+shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode);
+
+int
+shard_post_lookup_shards_unlink_handler (call_frame_t *frame, xlator_t *this)
 {
-        SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno,  preparent,
-                            postparent, xdata);
+        shard_local_t *local = NULL;
+
+        local = frame->local;
 
+        if ((local->op_ret < 0) && (local->op_errno != ENOENT)) {
+                if (local->fop == GF_FOP_UNLINK)
+                        SHARD_STACK_UNWIND (unlink, frame, local->op_ret,
+                                            local->op_errno, NULL, NULL, NULL);
+                else
+                        SHARD_STACK_UNWIND (rename, frame, local->op_ret,
+                                            local->op_errno, NULL, NULL, NULL,
+                                            NULL, NULL, NULL);
+                return 0;
+        }
+        local->op_ret = 0;
+        local->op_errno = 0;
+
+        shard_unlink_shards_do (frame, this,
+                                (local->fop == GF_FOP_RENAME)
+                                             ? local->loc2.inode
+                                             : local->loc.inode);
         return 0;
 }
 
 int
-shard_unlink_base_file (call_frame_t *frame, xlator_t *this)
+shard_rename_cbk (call_frame_t *frame, xlator_t *this);
+
+int32_t
+shard_unlink_cbk (call_frame_t *frame, xlator_t *this);
+
+int
+shard_post_resolve_unlink_handler (call_frame_t *frame, xlator_t *this)
 {
         shard_local_t *local = NULL;
 
         local = frame->local;
 
         if (local->op_ret < 0) {
-                shard_unlink_cbk (frame, 0, this, local->op_ret,
-                                  local->op_errno, NULL, NULL, NULL);
+                if (local->op_errno == ENOENT) {
+                        /* If lookup on /.shard fails with ENOENT, it probably
+                         * means that the file never grew beyond its first
+                         * block, so there are no shards to delete. The fop
+                         * on the base file has already completed by now, so
+                         * all that remains is to unwind the call.
+                         */
+                        local->op_ret = 0;
+                        local->first_block = local->last_block = 0;
+                        local->num_blocks = 1;
+                        if (local->fop == GF_FOP_UNLINK)
+                                shard_unlink_cbk (frame, this);
+                        else
+                                shard_rename_cbk (frame, this);
+                        return 0;
+                } else {
+                        if (local->fop == GF_FOP_UNLINK)
+                                SHARD_STACK_UNWIND (unlink, frame,
+                                                    local->op_ret,
+                                                    local->op_errno, NULL, NULL,
+                                                    NULL);
+                        else
+                                shard_rename_cbk (frame, this);
+                        return 0;
+                }
+        }
+
+        if (!local->call_count)
+                shard_unlink_shards_do (frame, this,
+                                        (local->fop == GF_FOP_RENAME)
+                                                     ? local->loc2.inode
+                                                     : local->loc.inode);
+        else
+                shard_common_lookup_shards (frame, this,
+                                            (local->fop == GF_FOP_RENAME)
+                                                         ? local->loc2.inode
+                                                         : local->loc.inode,
+                                       shard_post_lookup_shards_unlink_handler);
+        return 0;
+}
+
+int
+shard_unlink_base_file_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                            int32_t op_ret, int32_t op_errno,
+                            struct iatt *preparent, struct iatt *postparent,
+                            dict_t *xdata)
+{
+        int                  ret        = 0;
+        uint32_t             link_count = 0;
+        shard_local_t       *local      = NULL;
+        shard_priv_t        *priv       = NULL;
+
+        local = frame->local;
+        priv = this->private;
+
+        if (op_ret < 0) {
+                SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno, NULL, NULL,
+                                    NULL);
                 return 0;
         }
 
-        STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD(this),
+        /* link() creates hard links only for the base file and not for
+         * its shards, so unlink() must delete the shards only when the
+         * link count is 1. If other links remain, return safely now.
+         */
+        if ((xdata) && (!dict_get_uint32 (xdata, GET_LINK_COUNT, &link_count))
+            && (link_count > 1))
+                goto unwind;
+
+        local->first_block = get_lowest_block (0, local->block_size);
+        local->last_block = get_highest_block (0, local->prebuf.ia_size,
+                                               local->block_size);
+        local->num_blocks = local->last_block - local->first_block + 1;
+
+        /* num_blocks = 1 implies that the file has not crossed its
+         * shard block size. So unlink boils down to unlinking just the
+         * base file. We can safely return now.
+         */
+        if (local->num_blocks == 1)
+                goto unwind;
+
+        local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
+                                       gf_shard_mt_inode_list);
+        if (!local->inode_list)
+                goto unwind;
+
+        /* Save the xdata and preparent and postparent iatts now. This will be
+         * used at the time of unwinding the call to the parent xl.
+         */
+        local->preoldparent = *preparent;
+        local->postoldparent = *postparent;
+        if (xdata)
+                local->xattr_rsp = dict_ref (xdata);
+
+        local->dot_shard_loc.inode = inode_find (this->itable,
+                                                 priv->dot_shard_gfid);
+        if (!local->dot_shard_loc.inode) {
+                ret = shard_init_dot_shard_loc (this, local);
+                if (ret)
+                        goto unwind;
+                shard_lookup_dot_shard (frame, this,
+                                        shard_post_resolve_unlink_handler);
+        } else {
+                shard_common_resolve_shards (frame, this, local->loc.inode,
+                                             shard_post_resolve_unlink_handler);
+        }
+
+        return 0;
+
+unwind:
+        SHARD_STACK_UNWIND (unlink, frame, op_ret, op_errno,  preparent,
+                            postparent, xdata);
+        return 0;
+}
+
+int
+shard_unlink_base_file (call_frame_t *frame, xlator_t *this)
+{
+        shard_local_t *local = NULL;
+
+        local = frame->local;
+
+        if (dict_set_uint32 (local->xattr_req, GET_LINK_COUNT, 0))
+                gf_msg (this->name, GF_LOG_WARNING, 0,
+                        SHARD_MSG_DICT_SET_FAILED, "Failed to set "
+                        GET_LINK_COUNT" in dict");
+
+        /* To-Do: Request open-fd count on base file */
+        STACK_WIND (frame, shard_unlink_base_file_cbk, FIRST_CHILD(this),
                     FIRST_CHILD(this)->fops->unlink, &local->loc, local->xflag,
                     local->xattr_req);
         return 0;
@@ -2199,6 +2347,17 @@ shard_unlink_block_inode (shard_local_t *local, int shard_block_num)
 int
 shard_rename_cbk (call_frame_t *frame, xlator_t *this);
 
+int32_t
+shard_unlink_cbk (call_frame_t *frame, xlator_t *this)
+{
+        shard_local_t *local = frame->local;
+
+	SHARD_STACK_UNWIND (unlink, frame, local->op_ret, local->op_errno,
+			    &local->preoldparent, &local->postoldparent,
+                            local->xattr_rsp);
+	return 0;
+}
+
 int
 shard_unlink_shards_do_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
                             int32_t op_ret, int32_t op_errno,
@@ -2225,7 +2384,7 @@ done:
                 SHARD_UNSET_ROOT_FS_ID (frame, local);
 
                 if (local->fop == GF_FOP_UNLINK)
-                        shard_unlink_base_file (frame, this);
+                        shard_unlink_cbk (frame, this);
                 else if (local->fop == GF_FOP_RENAME)
                         shard_rename_cbk (frame, this);
                 else
@@ -2254,9 +2413,16 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
 
         priv = this->private;
         local = frame->local;
+
+        /* local->num_blocks includes the base file block. This function only
+         * deletes the shards under /.shard. So subtract 1 from num_blocks.
+         */
         local->call_count = call_count = local->num_blocks - 1;
         last_block = local->last_block;
 
+        /* Ignore the inode associated with the base file and start counting
+         * from 1.
+         */
         for (i = 1; i < local->num_blocks; i++) {
                 if (!local->inode_list[i])
                         continue;
@@ -2266,20 +2432,15 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
         if (!count) {
                 /* callcount = 0 implies that all of the shards that need to be
                  * unlinked are non-existent (in other words the file is full of
-                 * holes). So shard xlator would now proceed to do the final
-                 * unlink on the base file.
+                 * holes). So shard xlator can simply return the fop to its
+                 * parent now.
                  */
                 gf_msg_debug (this->name, 0, "All shards that need to be "
                               "unlinked are non-existent: %s",
                               uuid_utoa (inode->gfid));
                 local->num_blocks = 1;
                 if (local->fop == GF_FOP_UNLINK) {
-                        gf_msg_debug (this->name, 0, "Proceeding to unlink the"
-                                      " base file");
-                        STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD(this),
-                                    FIRST_CHILD(this)->fops->unlink,
-                                    &local->loc, local->flags,
-                                    local->xattr_req);
+                        shard_unlink_cbk (frame, this);
                 } else if (local->fop == GF_FOP_RENAME) {
                         gf_msg_debug (this->name, 0, "Resuming rename()");
                         shard_rename_cbk (frame, this);
@@ -2291,6 +2452,8 @@ shard_unlink_shards_do (call_frame_t *frame, xlator_t *this, inode_t *inode)
         cur_block = 1;
         SHARD_SET_ROOT_FS_ID (frame, local);
 
+        /* Ignore the base file and start iterating from the first block shard.
+         */
         while (cur_block <= last_block) {
                 if (!local->inode_list[cur_block]) {
                         cur_block++;
@@ -2347,86 +2510,6 @@ next:
 }
 
 int
-shard_post_lookup_shards_unlink_handler (call_frame_t *frame, xlator_t *this)
-{
-        shard_local_t *local = NULL;
-
-        local = frame->local;
-
-        if ((local->op_ret < 0) && (local->op_errno != ENOENT)) {
-                if (local->fop == GF_FOP_UNLINK)
-                        SHARD_STACK_UNWIND (unlink, frame, local->op_ret,
-                                            local->op_errno, NULL, NULL, NULL);
-                else
-                        SHARD_STACK_UNWIND (rename, frame, local->op_ret,
-                                            local->op_errno, NULL, NULL, NULL,
-                                            NULL, NULL, NULL);
-                return 0;
-        }
-        local->op_ret = 0;
-        local->op_errno = 0;
-
-        shard_unlink_shards_do (frame, this,
-                                (local->fop == GF_FOP_RENAME)
-                                             ? local->loc2.inode
-                                             : local->loc.inode);
-        return 0;
-}
-
-int
-shard_post_resolve_unlink_handler (call_frame_t *frame, xlator_t *this)
-{
-        shard_local_t *local = NULL;
-
-        local = frame->local;
-
-        if (local->op_ret < 0) {
-                if (local->op_errno == ENOENT) {
-                        /* If lookup on /.shard fails with ENOENT, it probably
-                         * means that the file is being unlinked before it
-                         * could grow beyond its first block. In this case,
-                         * unlink boils down to unlinking the base file and
-                         * unwinding the call.
-                         */
-                        local->op_ret = 0;
-                        local->first_block = local->last_block = 0;
-                        local->num_blocks = 1;
-                        if (local->fop == GF_FOP_UNLINK)
-                                STACK_WIND (frame, shard_unlink_cbk,
-                                            FIRST_CHILD(this),
-                                            FIRST_CHILD (this)->fops->unlink,
-                                            &local->loc, local->xflag,
-                                            local->xattr_req);
-                        else
-                                shard_rename_cbk (frame, this);
-                        return 0;
-                } else {
-                        if (local->fop == GF_FOP_UNLINK)
-                                SHARD_STACK_UNWIND (unlink, frame,
-                                                    local->op_ret,
-                                                    local->op_errno, NULL, NULL,
-                                                    NULL);
-                        else
-                                shard_rename_cbk (frame, this);
-                        return 0;
-                }
-        }
-
-        if (!local->call_count)
-                shard_unlink_shards_do (frame, this,
-                                        (local->fop == GF_FOP_RENAME)
-                                                     ? local->loc2.inode
-                                                     : local->loc.inode);
-        else
-                shard_common_lookup_shards (frame, this,
-                                            (local->fop == GF_FOP_RENAME)
-                                                         ? local->loc2.inode
-                                                         : local->loc.inode,
-                                       shard_post_lookup_shards_unlink_handler);
-        return 0;
-}
-
-int
 shard_post_lookup_unlink_handler (call_frame_t *frame, xlator_t *this)
 {
         int            ret   = -1;
@@ -2442,46 +2525,7 @@ shard_post_lookup_unlink_handler (call_frame_t *frame, xlator_t *this)
                 return 0;
         }
 
-        local->first_block = get_lowest_block (0, local->block_size);
-        local->last_block = get_highest_block (0, local->prebuf.ia_size,
-                                               local->block_size);
-        local->num_blocks = local->last_block - local->first_block + 1;
-
-        if ((local->num_blocks == 1) || (local->prebuf.ia_nlink > 1)) {
-                /* num_blocks = 1 implies that the file has not crossed its
-                 * shard block size. So unlink boils down to unlinking just the
-                 * base file.
-                 * Because link() does not create links for all but the
-                 * base shard, unlink() must delete these shards only when the
-                 * link count is 1.
-                 */
-                STACK_WIND (frame, shard_unlink_cbk, FIRST_CHILD (this),
-                            FIRST_CHILD (this)->fops->unlink, &local->loc,
-                            local->xflag, local->xattr_req);
-                return 0;
-        }
-
-        local->inode_list = GF_CALLOC (local->num_blocks, sizeof (inode_t *),
-                                       gf_shard_mt_inode_list);
-        if (!local->inode_list)
-                goto out;
-
-        local->dot_shard_loc.inode = inode_find (this->itable,
-                                                 priv->dot_shard_gfid);
-        if (!local->dot_shard_loc.inode) {
-                ret = shard_init_dot_shard_loc (this, local);
-                if (ret)
-                        goto out;
-                shard_lookup_dot_shard (frame, this,
-                                        shard_post_resolve_unlink_handler);
-        } else {
-                shard_common_resolve_shards (frame, this, local->loc.inode,
-                                             shard_post_resolve_unlink_handler);
-        }
-        return 0;
-
-out:
-        SHARD_STACK_UNWIND (unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
+        shard_unlink_base_file (frame, this);
         return 0;
 }
 
@@ -2524,7 +2568,6 @@ shard_unlink (call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
 
         shard_lookup_base_file (frame, this, &local->loc,
                                 shard_post_lookup_unlink_handler);
-
         return 0;
 err:
         SHARD_STACK_UNWIND (unlink, frame, -1, ENOMEM, NULL, NULL, NULL);
@@ -2549,9 +2592,10 @@ shard_rename_cbk (call_frame_t *frame, xlator_t *this)
 int
 shard_rename_unlink_dst_shards_do (call_frame_t *frame, xlator_t *this)
 {
-        int            ret   = -1;
-        shard_local_t *local = NULL;
-        shard_priv_t  *priv  = NULL;
+        int            ret        = -1;
+        uint32_t       link_count = 0;
+        shard_local_t *local      = NULL;
+        shard_priv_t  *priv       = NULL;
 
         local = frame->local;
         priv = this->private;
@@ -2561,7 +2605,14 @@ shard_rename_unlink_dst_shards_do (call_frame_t *frame, xlator_t *this)
                                                local->dst_block_size);
         local->num_blocks = local->last_block - local->first_block + 1;
 
-        if ((local->num_blocks == 1) || (local->postbuf.ia_nlink > 1)) {
+        if ((local->xattr_rsp) &&
+            (!dict_get_uint32 (local->xattr_rsp, GET_LINK_COUNT, &link_count))
+            && (link_count > 1)) {
+                shard_rename_cbk (frame, this);
+                return 0;
+        }
+
+        if (local->num_blocks == 1) {
                 shard_rename_cbk (frame, this);
                 return 0;
         }
@@ -2664,6 +2715,12 @@ shard_rename_src_base_file (call_frame_t *frame, xlator_t *this)
 
         local = frame->local;
 
+        if (dict_set_uint32 (local->xattr_req, GET_LINK_COUNT, 0))
+                gf_msg (this->name, GF_LOG_WARNING, 0,
+                        SHARD_MSG_DICT_SET_FAILED, "Failed to set "
+                        GET_LINK_COUNT" in dict");
+
+        /* To-Do: Request open-fd count on dst base file */
         STACK_WIND (frame, shard_rename_src_cbk, FIRST_CHILD(this),
                     FIRST_CHILD(this)->fops->rename, &local->loc, &local->loc2,
                     local->xattr_req);
diff --git a/xlators/storage/posix/src/posix.c b/xlators/storage/posix/src/posix.c
index 94e3c45..d5c18f1 100644
--- a/xlators/storage/posix/src/posix.c
+++ b/xlators/storage/posix/src/posix.c
@@ -1660,13 +1660,13 @@ out:
 int32_t
 posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
                                     struct iatt *stbuf, int32_t *op_errno,
-                                    loc_t *loc)
+                                    loc_t *loc, gf_boolean_t get_link_count,
+                                    dict_t *rsp_dict)
 {
-        int32_t                 ret    = 0;
-        struct posix_private    *priv  = NULL;
-        int fd_count = 0;
-
-        priv = this->private;
+        int                    fd_count = 0;
+        int32_t                ret      = 0;
+        struct iatt            prebuf   = {0,};
+        gf_boolean_t           locked   = _gf_false;
 
         /*  Unlink the gfid_handle_first */
         if (stbuf && stbuf->ia_nlink == 1) {
@@ -1689,6 +1689,18 @@ posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
                 }
         }
 
+        if (get_link_count) {
+                LOCK (&loc->inode->lock);
+                locked = _gf_true;
+                ret = posix_pstat (this, loc->gfid, real_path, &prebuf);
+                if (ret) {
+                        gf_msg (this->name, GF_LOG_ERROR, errno,
+                                P_MSG_LSTAT_FAILED, "lstat on %s failed",
+                                real_path);
+                        goto err;
+                }
+        }
+
         /* Unlink the actual file */
         ret = sys_unlink (real_path);
         if (ret == -1) {
@@ -1699,9 +1711,23 @@ posix_unlink_gfid_handle_and_entry (xlator_t *this, const char *real_path,
                 goto err;
         }
 
+        if (locked) {
+                UNLOCK (&loc->inode->lock);
+                locked = _gf_false;
+        }
+
+        ret = dict_set_uint32 (rsp_dict, GET_LINK_COUNT, prebuf.ia_nlink);
+        if (ret)
+                gf_msg (this->name, GF_LOG_WARNING, 0, P_MSG_SET_XDATA_FAIL,
+                        "failed to set "GET_LINK_COUNT" for %s", real_path);
+
         return 0;
 
 err:
+        if (locked) {
+                UNLOCK (&loc->inode->lock);
+                locked = _gf_false;
+        }
         return -1;
 }
 
@@ -1792,6 +1818,7 @@ posix_unlink (call_frame_t *frame, xlator_t *this,
         void                  *uuid               = NULL;
         char                   uuid_str[GF_UUID_BUF_SIZE] = {0};
         char                   gfid_str[GF_UUID_BUF_SIZE] = {0};
+        gf_boolean_t           get_link_count     = _gf_false;
 
         DECLARE_OLD_FS_ID_VAR;
 
@@ -1915,18 +1942,23 @@ posix_unlink (call_frame_t *frame, xlator_t *this,
                 }
         }
 
-        op_ret =  posix_unlink_gfid_handle_and_entry (this, real_path, &stbuf,
-                                                      &op_errno, loc);
-        if (op_ret == -1) {
-                goto out;
-        }
-
         unwind_dict = dict_new ();
         if (!unwind_dict) {
                 op_errno = -ENOMEM;
                 op_ret = -1;
                 goto out;
         }
+
+        if (xdata && dict_get (xdata, GET_LINK_COUNT))
+                get_link_count = _gf_true;
+        op_ret =  posix_unlink_gfid_handle_and_entry (this, real_path, &stbuf,
+                                                      &op_errno, loc,
+                                                      get_link_count,
+                                                      unwind_dict);
+        if (op_ret == -1) {
+                goto out;
+        }
+
         if (fdstat_requested) {
                 op_ret = posix_fdstat (this, fd, &postbuf);
                 if (op_ret == -1) {
@@ -2250,6 +2282,8 @@ posix_rename (call_frame_t *frame, xlator_t *this,
         char                 *pgfid_xattr_key = NULL;
         int32_t               nlink_samepgfid = 0;
         dict_t               *unwind_dict     = NULL;
+        gf_boolean_t          locked          = _gf_false;
+        gf_boolean_t          get_link_count  = _gf_false;
 
         DECLARE_OLD_FS_ID_VAR;
 
@@ -2276,6 +2310,13 @@ posix_rename (call_frame_t *frame, xlator_t *this,
                 goto out;
         }
 
+        unwind_dict = dict_new ();
+        if (!unwind_dict) {
+                op_ret = -1;
+                op_errno = ENOMEM;
+                goto out;
+        }
+
         op_ret = posix_pstat (this, oldloc->pargfid, par_oldpath, &preoldparent);
         if (op_ret == -1) {
                 op_errno = errno;
@@ -2342,6 +2383,22 @@ posix_rename (call_frame_t *frame, xlator_t *this,
                                                    this, unlock);
                 }
 
+                if ((xdata) && (dict_get (xdata, GET_LINK_COUNT))
+                    && (real_newpath) && (was_present)) {
+                        LOCK (&newloc->inode->lock);
+                        locked = _gf_true;
+                        get_link_count = _gf_true;
+                        op_ret = posix_pstat (this, newloc->gfid, real_newpath,
+                                              &stbuf);
+                        if ((op_ret == -1) && (errno != ENOENT)) {
+                                op_errno = errno;
+                                gf_msg (this->name, GF_LOG_ERROR, errno,
+                                        P_MSG_LSTAT_FAILED,
+                                        "lstat on %s failed", real_newpath);
+                                goto unlock;
+                        }
+                }
+
                 op_ret = sys_rename (real_oldpath, real_newpath);
                 if (op_ret == -1) {
                         op_errno = errno;
@@ -2369,6 +2426,18 @@ posix_rename (call_frame_t *frame, xlator_t *this,
                         goto unlock;
                 }
 
+                if (locked) {
+                        UNLOCK (&newloc->inode->lock);
+                        locked = _gf_false;
+                }
+
+                if ((get_link_count) &&
+                    (dict_set_uint32 (unwind_dict, GET_LINK_COUNT,
+                                      stbuf.ia_nlink)))
+                        gf_msg (this->name, GF_LOG_WARNING, 0,
+                                P_MSG_SET_XDATA_FAIL, "failed to set "
+                                GET_LINK_COUNT" for %s", real_newpath);
+
                 if (!IA_ISDIR (oldloc->inode->ia_type)
                     && priv->update_pgfid_nlinks) {
                         MAKE_PGFID_XATTR_KEY (pgfid_xattr_key,
@@ -2382,6 +2451,10 @@ posix_rename (call_frame_t *frame, xlator_t *this,
                 }
         }
 unlock:
+        if (locked) {
+                UNLOCK (&newloc->inode->lock);
+                locked = _gf_false;
+        }
         UNLOCK (&oldloc->inode->lock);
 
         if (op_ret < 0) {
@@ -2430,7 +2503,7 @@ unlock:
         }
 
         if (was_present)
-                unwind_dict = posix_dict_set_nlink (xdata, NULL, nlink);
+                unwind_dict = posix_dict_set_nlink (xdata, unwind_dict, nlink);
         op_ret = 0;
 out:
 
-- 
1.7.1