74096c
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
74096c
From: Xavi Hernandez <xhernandez@redhat.com>
74096c
Date: Fri, 12 Jun 2020 00:06:36 +0200
74096c
Subject: [PATCH 455/456] locks: prevent deletion of locked entries
74096c
74096c
To keep consistency inside transactions started by locking an entry or
74096c
an inode, this change delays the removal of entries that are currently
74096c
locked by one or more clients. Once all locks are released, the removal
74096c
is processed.
74096c
74096c
The detection of stale inodes in the locking code of EC has also
been improved.
74096c
74096c
>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
74096c
>Fixes: #990
74096c
74096c
Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
74096c
BUG: 1812789
74096c
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
74096c
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
74096c
Tested-by: RHGS Build Bot <nigelb@redhat.com>
74096c
---
74096c
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
74096c
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
74096c
 xlators/features/locks/src/common.h  |  43 +++++
74096c
 xlators/features/locks/src/entrylk.c |  19 +--
74096c
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
74096c
 xlators/features/locks/src/locks.h   |  23 ++-
74096c
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
74096c
 7 files changed, 689 insertions(+), 114 deletions(-)
74096c
74096c
diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
74096c
index ffcac07..db86296 100644
74096c
--- a/xlators/cluster/ec/src/ec-locks.c
74096c
+++ b/xlators/cluster/ec/src/ec-locks.c
74096c
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
74096c
     ec_t *ec = fop->xl->private;
74096c
     ec_cbk_data_t *ans = NULL;
74096c
     ec_cbk_data_t *cbk = NULL;
74096c
-    uintptr_t locked = 0, notlocked = 0;
74096c
+    uintptr_t locked = 0;
74096c
+    int32_t good = 0;
74096c
+    int32_t eagain = 0;
74096c
+    int32_t estale = 0;
74096c
     int32_t error = -1;
74096c
 
74096c
+    /* There are some errors that we'll handle in an special way while trying
74096c
+     * to acquire a lock.
74096c
+     *
74096c
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
74096c
+     *            consider that there's contention on the inode, so we consider
74096c
+     *            the acquisition a failure and try again with a sequential
74096c
+     *            blocking lock request. This will ensure that we get a lock on
74096c
+     *            as many bricks as possible (ignoring EAGAIN here would cause
74096c
+     *            unnecessary triggers of self-healing).
74096c
+     *
74096c
+     *            If it's found during a sequential blocking lock request, it's
74096c
+     *            considered an error. Lock will only succeed if there are
74096c
+     *            enough other bricks locked.
74096c
+     *
74096c
+     *   ESTALE:  This can appear during parallel or sequential lock request if
74096c
+     *            the inode has just been unlinked. We consider this error is
74096c
+     *            not recoverable, but we also don't consider it as fatal. So,
74096c
+     *            if it happens during parallel lock, we won't attempt a
74096c
+     *            sequential one unless there are EAGAIN errors on other
74096c
+     *            bricks (and are enough to form a quorum), but if we reach
74096c
+     *            quorum counting the ESTALE bricks, we consider the whole
74096c
+     *            result of the operation is ESTALE instead of EIO.
74096c
+     */
74096c
+
74096c
     list_for_each_entry(ans, &fop->cbk_list, list)
74096c
     {
74096c
         if (ans->op_ret >= 0) {
74096c
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
74096c
                 error = EIO;
74096c
             }
74096c
             locked |= ans->mask;
74096c
+            good = ans->count;
74096c
             cbk = ans;
74096c
-        } else {
74096c
-            if (ans->op_errno == EAGAIN) {
74096c
-                switch (fop->uint32) {
74096c
-                    case EC_LOCK_MODE_NONE:
74096c
-                    case EC_LOCK_MODE_ALL:
74096c
-                        /* Goal is to treat non-blocking lock as failure
74096c
-                         * even if there is a single EAGAIN*/
74096c
-                        notlocked |= ans->mask;
74096c
-                        break;
74096c
-                }
74096c
-            }
74096c
+        } else if (ans->op_errno == ESTALE) {
74096c
+            estale += ans->count;
74096c
+        } else if ((ans->op_errno == EAGAIN) &&
74096c
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
74096c
+            eagain += ans->count;
74096c
         }
74096c
     }
74096c
 
74096c
     if (error == -1) {
74096c
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
74096c
-            if (notlocked == 0) {
74096c
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
74096c
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
74096c
+         * we retry with a sequential blocking lock request if needed.
74096c
+         * Otherwise we succeed. */
74096c
+        if ((good + eagain) >= ec->fragments) {
74096c
+            if (eagain == 0) {
74096c
                 if (fop->answer == NULL) {
74096c
                     fop->answer = cbk;
74096c
                 }
74096c
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
74096c
                     case EC_LOCK_MODE_NONE:
74096c
                         error = EAGAIN;
74096c
                         break;
74096c
-
74096c
                     case EC_LOCK_MODE_ALL:
74096c
                         fop->uint32 = EC_LOCK_MODE_INC;
74096c
                         break;
74096c
-
74096c
                     default:
74096c
+                        /* This shouldn't happen because eagain cannot be > 0
74096c
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
74096c
                         error = EIO;
74096c
                         break;
74096c
                 }
74096c
             }
74096c
         } else {
74096c
-            if (fop->answer && fop->answer->op_ret < 0)
74096c
+            /* We have been unable to find enough candidates that will be able
74096c
+             * to take the lock. If we have quorum on some answer, we return
74096c
+             * it. Otherwise we check if ESTALE answers allow us to reach
74096c
+             * quorum. If so, we return ESTALE. */
74096c
+            if (fop->answer && fop->answer->op_ret < 0) {
74096c
                 error = fop->answer->op_errno;
74096c
-            else
74096c
+            } else if ((good + eagain + estale) >= ec->fragments) {
74096c
+                error = ESTALE;
74096c
+            } else {
74096c
                 error = EIO;
74096c
+            }
74096c
         }
74096c
     }
74096c
 
74096c
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
74096c
index 1406e70..0c52853 100644
74096c
--- a/xlators/features/locks/src/common.c
74096c
+++ b/xlators/features/locks/src/common.c
74096c
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
74096c
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
74096c
         INIT_LIST_HEAD(&pl_inode->metalk_list);
74096c
         INIT_LIST_HEAD(&pl_inode->queued_locks);
74096c
+        INIT_LIST_HEAD(&pl_inode->waiting);
74096c
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
74096c
 
74096c
         pl_inode->check_mlock_info = _gf_true;
74096c
         pl_inode->mlock_enforced = _gf_false;
74096c
 
74096c
+        /* -2 means never looked up. -1 means something went wrong and link
74096c
+         * tracking is disabled. */
74096c
+        pl_inode->links = -2;
74096c
+
74096c
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
74096c
         if (ret) {
74096c
             pthread_mutex_destroy(&pl_inode->mutex);
74096c
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
74096c
     }
74096c
 
74096c
     return 0;
74096c
-}
74096c
\ No newline at end of file
74096c
+}
74096c
+
74096c
+gf_boolean_t
74096c
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
74096c
+{
74096c
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
74096c
+        return _gf_true;
74096c
+    }
74096c
+
74096c
+    if (is_lk_owner_null(owner)) {
74096c
+        return _gf_false;
74096c
+    }
74096c
+    return _gf_true;
74096c
+}
74096c
+
74096c
+static int32_t
74096c
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
74096c
+{
74096c
+    inode_t *inode = NULL;
74096c
+    int32_t error = 0;
74096c
+
74096c
+    if (loc->inode != NULL) {
74096c
+        inode = inode_ref(loc->inode);
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+    if (loc->parent == NULL) {
74096c
+        error = EINVAL;
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+    if (!gf_uuid_is_null(loc->gfid)) {
74096c
+        inode = inode_find(loc->parent->table, loc->gfid);
74096c
+        if (inode != NULL) {
74096c
+            goto done;
74096c
+        }
74096c
+    }
74096c
+
74096c
+    if (loc->name == NULL) {
74096c
+        error = EINVAL;
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
74096c
+    if (inode == NULL) {
74096c
+        /* We haven't found any inode. This means that the file doesn't exist
74096c
+         * or that even if it exists, we don't have any knowledge about it, so
74096c
+         * we don't have locks on it either, which is fine for our purposes. */
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+done:
74096c
+    *pinode = inode;
74096c
+
74096c
+    return error;
74096c
+}
74096c
+
74096c
+static gf_boolean_t
74096c
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
74096c
+                    struct timespec *now, struct list_head *contend)
74096c
+{
74096c
+    pl_dom_list_t *dom;
74096c
+    pl_inode_lock_t *lock;
74096c
+    gf_boolean_t has_owners = _gf_false;
74096c
+
74096c
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
74096c
+    {
74096c
+        list_for_each_entry(lock, &dom->inodelk_list, list)
74096c
+        {
74096c
+            /* If the lock belongs to the same client, we assume it's related
74096c
+             * to the same operation, so we allow the removal to continue. */
74096c
+            if (lock->client == client) {
74096c
+                continue;
74096c
+            }
74096c
+            /* If the lock belongs to an internal process, we don't block the
74096c
+             * removal. */
74096c
+            if (lock->client_pid < 0) {
74096c
+                continue;
74096c
+            }
74096c
+            if (contend == NULL) {
74096c
+                return _gf_true;
74096c
+            }
74096c
+            has_owners = _gf_true;
74096c
+            inodelk_contention_notify_check(xl, lock, now, contend);
74096c
+        }
74096c
+    }
74096c
+
74096c
+    return has_owners;
74096c
+}
74096c
+
74096c
+int32_t
74096c
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
74096c
+                        pl_inode_t **ppl_inode, struct list_head *contend)
74096c
+{
74096c
+    struct timespec now;
74096c
+    inode_t *inode;
74096c
+    pl_inode_t *pl_inode;
74096c
+    int32_t error;
74096c
+
74096c
+    pl_inode = NULL;
74096c
+
74096c
+    error = pl_inode_from_loc(loc, &inode);
74096c
+    if ((error != 0) || (inode == NULL)) {
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+    pl_inode = pl_inode_get(xl, inode, NULL);
74096c
+    if (pl_inode == NULL) {
74096c
+        inode_unref(inode);
74096c
+        error = ENOMEM;
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+    /* pl_inode_from_loc() already increments ref count for inode, so
74096c
+     * we only assign here our reference. */
74096c
+    pl_inode->inode = inode;
74096c
+
74096c
+    timespec_now(&now);
74096c
+
74096c
+    pthread_mutex_lock(&pl_inode->mutex);
74096c
+
74096c
+    if (pl_inode->removed) {
74096c
+        error = ESTALE;
74096c
+        goto unlock;
74096c
+    }
74096c
+
74096c
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
74096c
+        error = -1;
74096c
+        /* We skip the unlock here because the caller must create a stub when
74096c
+         * we return -1 and do a call to pl_inode_remove_complete(), which
74096c
+         * assumes the lock is still acquired and will release it once
74096c
+         * everything else is prepared. */
74096c
+        goto done;
74096c
+    }
74096c
+
74096c
+    pl_inode->is_locked = _gf_true;
74096c
+    pl_inode->remove_running++;
74096c
+
74096c
+unlock:
74096c
+    pthread_mutex_unlock(&pl_inode->mutex);
74096c
+
74096c
+done:
74096c
+    *ppl_inode = pl_inode;
74096c
+
74096c
+    return error;
74096c
+}
74096c
+
74096c
+int32_t
74096c
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
74096c
+                         struct list_head *contend)
74096c
+{
74096c
+    pl_inode_lock_t *lock;
74096c
+    int32_t error = -1;
74096c
+
74096c
+    if (stub != NULL) {
74096c
+        list_add_tail(&stub->list, &pl_inode->waiting);
74096c
+        pl_inode->is_locked = _gf_true;
74096c
+    } else {
74096c
+        error = ENOMEM;
74096c
+
74096c
+        while (!list_empty(contend)) {
74096c
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
74096c
+            list_del_init(&lock->list);
74096c
+            __pl_inodelk_unref(lock);
74096c
+        }
74096c
+    }
74096c
+
74096c
+    pthread_mutex_unlock(&pl_inode->mutex);
74096c
+
74096c
+    if (error < 0) {
74096c
+        inodelk_contention_notify(xl, contend);
74096c
+    }
74096c
+
74096c
+    inode_unref(pl_inode->inode);
74096c
+
74096c
+    return error;
74096c
+}
74096c
+
74096c
+void
74096c
+pl_inode_remove_wake(struct list_head *list)
74096c
+{
74096c
+    call_stub_t *stub;
74096c
+
74096c
+    while (!list_empty(list)) {
74096c
+        stub = list_first_entry(list, call_stub_t, list);
74096c
+        list_del_init(&stub->list);
74096c
+
74096c
+        call_resume(stub);
74096c
+    }
74096c
+}
74096c
+
74096c
+void
74096c
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
74096c
+{
74096c
+    struct list_head contend, granted;
74096c
+    struct timespec now;
74096c
+    pl_dom_list_t *dom;
74096c
+
74096c
+    if (pl_inode == NULL) {
74096c
+        return;
74096c
+    }
74096c
+
74096c
+    INIT_LIST_HEAD(&contend);
74096c
+    INIT_LIST_HEAD(&granted);
74096c
+    timespec_now(&now);
74096c
+
74096c
+    pthread_mutex_lock(&pl_inode->mutex);
74096c
+
74096c
+    if (error == 0) {
74096c
+        if (pl_inode->links >= 0) {
74096c
+            pl_inode->links--;
74096c
+        }
74096c
+        if (pl_inode->links == 0) {
74096c
+            pl_inode->removed = _gf_true;
74096c
+        }
74096c
+    }
74096c
+
74096c
+    pl_inode->remove_running--;
74096c
+
74096c
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
74096c
+        pl_inode->is_locked = _gf_false;
74096c
+
74096c
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
74096c
+        {
74096c
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
74096c
+                                        &contend);
74096c
+        }
74096c
+    }
74096c
+
74096c
+    pthread_mutex_unlock(&pl_inode->mutex);
74096c
+
74096c
+    unwind_granted_inodes(xl, pl_inode, &granted);
74096c
+
74096c
+    inodelk_contention_notify(xl, &contend);
74096c
+
74096c
+    inode_unref(pl_inode->inode);
74096c
+}
74096c
+
74096c
+void
74096c
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
74096c
+                         struct list_head *list)
74096c
+{
74096c
+    call_stub_t *stub, *tmp;
74096c
+
74096c
+    if (!pl_inode->is_locked) {
74096c
+        return;
74096c
+    }
74096c
+
74096c
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
74096c
+    {
74096c
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
74096c
+                                 NULL)) {
74096c
+            list_move_tail(&stub->list, list);
74096c
+        }
74096c
+    }
74096c
+}
74096c
+
74096c
+/* This function determines if an inodelk attempt can be done now or it needs
74096c
+ * to wait.
74096c
+ *
74096c
+ * Possible return values:
74096c
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
74096c
+ *        inode has been deleted previously by unlink/rmdir/rename
74096c
+ *   = 0: The lock can be attempted.
74096c
+ *   > 0: The lock needs to wait because a conflicting remove operation is
74096c
+ *        ongoing.
74096c
+ */
74096c
+int32_t
74096c
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
74096c
+{
74096c
+    pl_dom_list_t *dom;
74096c
+    pl_inode_lock_t *ilock;
74096c
+
74096c
+    /* If the inode has been deleted, we won't allow any lock. */
74096c
+    if (pl_inode->removed) {
74096c
+        return -ESTALE;
74096c
+    }
74096c
+
74096c
+    /* We only synchronize with locks made for regular operations coming from
74096c
+     * the user. Locks done for internal purposes are hard to control and could
74096c
+     * lead to long delays or deadlocks quite easily. */
74096c
+    if (lock->client_pid < 0) {
74096c
+        return 0;
74096c
+    }
74096c
+    if (!pl_inode->is_locked) {
74096c
+        return 0;
74096c
+    }
74096c
+    if (pl_inode->remove_running > 0) {
74096c
+        return 1;
74096c
+    }
74096c
+
74096c
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
74096c
+    {
74096c
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
74096c
+        {
74096c
+            /* If a lock from the same client is already granted, we allow this
74096c
+             * one to continue. This is necessary to prevent deadlocks when
74096c
+             * multiple locks are taken for the same operation.
74096c
+             *
74096c
+             * On the other side it's unlikely that the same client sends
74096c
+             * completely unrelated locks for the same inode.
74096c
+             */
74096c
+            if (ilock->client == lock->client) {
74096c
+                return 0;
74096c
+            }
74096c
+        }
74096c
+    }
74096c
+
74096c
+    return 1;
74096c
+}
74096c
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
74096c
index ea86b96..6c81ac3 100644
74096c
--- a/xlators/features/locks/src/common.h
74096c
+++ b/xlators/features/locks/src/common.h
74096c
@@ -105,6 +105,15 @@ void
74096c
 __pl_inodelk_unref(pl_inode_lock_t *lock);
74096c
 
74096c
 void
74096c
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
+                            struct list_head *granted, pl_dom_list_t *dom,
74096c
+                            struct timespec *now, struct list_head *contend);
74096c
+
74096c
+void
74096c
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
74096c
+                      struct list_head *granted);
74096c
+
74096c
+void
74096c
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
                           pl_dom_list_t *dom, struct timespec *now,
74096c
                           struct list_head *contend);
74096c
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
74096c
 void
74096c
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
74096c
 
74096c
+void
74096c
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
74096c
+                                struct timespec *now,
74096c
+                                struct list_head *contend);
74096c
+
74096c
+void
74096c
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
74096c
+                                struct timespec *now,
74096c
+                                struct list_head *contend);
74096c
+
74096c
 gf_boolean_t
74096c
 pl_does_monkey_want_stuck_lock();
74096c
 
74096c
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
74096c
 int
74096c
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
74096c
 
74096c
+gf_boolean_t
74096c
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
74096c
+
74096c
+int32_t
74096c
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
74096c
+                        pl_inode_t **ppl_inode, struct list_head *contend);
74096c
+
74096c
+int32_t
74096c
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
74096c
+                         struct list_head *contend);
74096c
+
74096c
+void
74096c
+pl_inode_remove_wake(struct list_head *list);
74096c
+
74096c
+void
74096c
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
74096c
+
74096c
+void
74096c
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
74096c
+                         struct list_head *list);
74096c
+
74096c
+int32_t
74096c
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
74096c
+
74096c
 #endif /* __COMMON_H__ */
74096c
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
74096c
index 93c649c..b97836f 100644
74096c
--- a/xlators/features/locks/src/entrylk.c
74096c
+++ b/xlators/features/locks/src/entrylk.c
74096c
@@ -197,9 +197,9 @@ out:
74096c
     return revoke_lock;
74096c
 }
74096c
 
74096c
-static gf_boolean_t
74096c
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
74096c
-                                  struct timespec *now)
74096c
+void
74096c
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
74096c
+                                struct timespec *now, struct list_head *contend)
74096c
 {
74096c
     posix_locks_private_t *priv;
74096c
     int64_t elapsed;
74096c
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
74096c
     /* If this lock is in a list, it means that we are about to send a
74096c
      * notification for it, so no need to do anything else. */
74096c
     if (!list_empty(&lock->contend)) {
74096c
-        return _gf_false;
74096c
+        return;
74096c
     }
74096c
 
74096c
     elapsed = now->tv_sec;
74096c
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
74096c
         elapsed--;
74096c
     }
74096c
     if (elapsed < priv->notify_contention_delay) {
74096c
-        return _gf_false;
74096c
+        return;
74096c
     }
74096c
 
74096c
     /* All contention notifications will be sent outside of the locked
74096c
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
74096c
 
74096c
     lock->contention_time = *now;
74096c
 
74096c
-    return _gf_true;
74096c
+    list_add_tail(&lock->contend, contend);
74096c
 }
74096c
 
74096c
 void
74096c
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
74096c
                     break;
74096c
                 }
74096c
             }
74096c
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
74096c
-                list_add_tail(&tmp->contend, contend);
74096c
-            }
74096c
+            entrylk_contention_notify_check(this, tmp, now, contend);
74096c
         }
74096c
     }
74096c
 
74096c
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
74096c
 
74096c
         if (bl_ret == 0) {
74096c
-            list_add(&bl->blocked_locks, granted);
74096c
+            list_add_tail(&bl->blocked_locks, granted);
74096c
         }
74096c
     }
74096c
-    return;
74096c
 }
74096c
 
74096c
 /* Grants locks if possible which are blocked on a lock */
74096c
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
74096c
index 24dee49..1a07243 100644
74096c
--- a/xlators/features/locks/src/inodelk.c
74096c
+++ b/xlators/features/locks/src/inodelk.c
74096c
@@ -231,9 +231,9 @@ out:
74096c
     return revoke_lock;
74096c
 }
74096c
 
74096c
-static gf_boolean_t
74096c
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
74096c
-                                  struct timespec *now)
74096c
+void
74096c
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
74096c
+                                struct timespec *now, struct list_head *contend)
74096c
 {
74096c
     posix_locks_private_t *priv;
74096c
     int64_t elapsed;
74096c
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
74096c
     /* If this lock is in a list, it means that we are about to send a
74096c
      * notification for it, so no need to do anything else. */
74096c
     if (!list_empty(&lock->contend)) {
74096c
-        return _gf_false;
74096c
+        return;
74096c
     }
74096c
 
74096c
     elapsed = now->tv_sec;
74096c
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
74096c
         elapsed--;
74096c
     }
74096c
     if (elapsed < priv->notify_contention_delay) {
74096c
-        return _gf_false;
74096c
+        return;
74096c
     }
74096c
 
74096c
     /* All contention notifications will be sent outside of the locked
74096c
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
74096c
 
74096c
     lock->contention_time = *now;
74096c
 
74096c
-    return _gf_true;
74096c
+    list_add_tail(&lock->contend, contend);
74096c
 }
74096c
 
74096c
 void
74096c
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
74096c
                     break;
74096c
                 }
74096c
             }
74096c
-            if (__inodelk_needs_contention_notify(this, l, now)) {
74096c
-                list_add_tail(&l->contend, contend);
74096c
-            }
74096c
+            inodelk_contention_notify_check(this, l, now, contend);
74096c
         }
74096c
     }
74096c
 
74096c
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
74096c
                struct list_head *contend)
74096c
 {
74096c
     pl_inode_lock_t *conf = NULL;
74096c
-    int ret = -EINVAL;
74096c
+    int ret;
74096c
 
74096c
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
74096c
-    if (conf) {
74096c
-        ret = __lock_blocked_add(this, dom, lock, can_block);
74096c
-        goto out;
74096c
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
74096c
+    if (ret < 0) {
74096c
+        return ret;
74096c
+    }
74096c
+    if (ret == 0) {
74096c
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
74096c
+    }
74096c
+    if ((ret > 0) || (conf != NULL)) {
74096c
+        return __lock_blocked_add(this, dom, lock, can_block);
74096c
     }
74096c
 
74096c
     /* To prevent blocked locks starvation, check if there are any blocked
74096c
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
74096c
                    "starvation");
74096c
         }
74096c
 
74096c
-        ret = __lock_blocked_add(this, dom, lock, can_block);
74096c
-        goto out;
74096c
+        return __lock_blocked_add(this, dom, lock, can_block);
74096c
     }
74096c
     __pl_inodelk_ref(lock);
74096c
     gettimeofday(&lock->granted_time, NULL);
74096c
     list_add(&lock->list, &dom->inodelk_list);
74096c
 
74096c
-    ret = 0;
74096c
-
74096c
-out:
74096c
-    return ret;
74096c
+    return 0;
74096c
 }
74096c
 
74096c
 /* Return true if the two inodelks have exactly same lock boundaries */
74096c
@@ -529,12 +528,11 @@ out:
74096c
     return conf;
74096c
 }
74096c
 
74096c
-static void
74096c
+void
74096c
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
                             struct list_head *granted, pl_dom_list_t *dom,
74096c
                             struct timespec *now, struct list_head *contend)
74096c
 {
74096c
-    int bl_ret = 0;
74096c
     pl_inode_lock_t *bl = NULL;
74096c
     pl_inode_lock_t *tmp = NULL;
74096c
 
74096c
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
     {
74096c
         list_del_init(&bl->blocked_locks);
74096c
 
74096c
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
74096c
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
74096c
 
74096c
-        if (bl_ret == 0) {
74096c
-            list_add(&bl->blocked_locks, granted);
74096c
+        if (bl->status != -EAGAIN) {
74096c
+            list_add_tail(&bl->blocked_locks, granted);
74096c
         }
74096c
     }
74096c
-    return;
74096c
 }
74096c
 
74096c
-/* Grant all inodelks blocked on a lock */
74096c
 void
74096c
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
-                          pl_dom_list_t *dom, struct timespec *now,
74096c
-                          struct list_head *contend)
74096c
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
74096c
+                      struct list_head *granted)
74096c
 {
74096c
-    struct list_head granted;
74096c
     pl_inode_lock_t *lock;
74096c
     pl_inode_lock_t *tmp;
74096c
+    int32_t op_ret;
74096c
+    int32_t op_errno;
74096c
 
74096c
-    INIT_LIST_HEAD(&granted);
74096c
-
74096c
-    pthread_mutex_lock(&pl_inode->mutex);
74096c
-    {
74096c
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
74096c
-                                    contend);
74096c
-    }
74096c
-    pthread_mutex_unlock(&pl_inode->mutex);
74096c
-
74096c
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
74096c
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
74096c
     {
74096c
-        gf_log(this->name, GF_LOG_TRACE,
74096c
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
74096c
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
74096c
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
74096c
-               lock->user_flock.l_len);
74096c
-
74096c
+        if (lock->status == 0) {
74096c
+            op_ret = 0;
74096c
+            op_errno = 0;
74096c
+            gf_log(this->name, GF_LOG_TRACE,
74096c
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
74096c
+                   " => Granted",
74096c
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
74096c
+                   lock->client_pid, lkowner_utoa(&lock->owner),
74096c
+                   lock->user_flock.l_start, lock->user_flock.l_len);
74096c
+        } else {
74096c
+            op_ret = -1;
74096c
+            op_errno = -lock->status;
74096c
+        }
74096c
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
74096c
-                     0, 0, lock->volume);
74096c
+                     op_ret, op_errno, lock->volume);
74096c
 
74096c
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
74096c
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
74096c
         lock->frame = NULL;
74096c
     }
74096c
 
74096c
     pthread_mutex_lock(&pl_inode->mutex);
74096c
     {
74096c
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
74096c
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
74096c
         {
74096c
             list_del_init(&lock->blocked_locks);
74096c
             __pl_inodelk_unref(lock);
74096c
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
     pthread_mutex_unlock(&pl_inode->mutex);
74096c
 }
74096c
 
74096c
+/* Grant all inodelks blocked on a lock */
74096c
+void
74096c
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
74096c
+                          pl_dom_list_t *dom, struct timespec *now,
74096c
+                          struct list_head *contend)
74096c
+{
74096c
+    struct list_head granted;
74096c
+
74096c
+    INIT_LIST_HEAD(&granted);
74096c
+
74096c
+    pthread_mutex_lock(&pl_inode->mutex);
74096c
+    {
74096c
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
74096c
+                                    contend);
74096c
+    }
74096c
+    pthread_mutex_unlock(&pl_inode->mutex);
74096c
+
74096c
+    unwind_granted_inodes(this, pl_inode, &granted);
74096c
+}
74096c
+
74096c
 static void
74096c
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
74096c
 {
74096c
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
74096c
                  * and blocked lists, then this means that a parallel
74096c
                  * unlock on another inodelk (L2 say) may have 'granted'
74096c
                  * L1 and added it to 'granted' list in
74096c
-                 * __grant_blocked_node_locks() (although using the
74096c
+                 * __grant_blocked_inode_locks() (although using the
74096c
                  * 'blocked_locks' member). In that case, the cleanup
74096c
                  * codepath must try and grant other overlapping
74096c
                  * blocked inodelks from other clients, now that L1 is
74096c
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
74096c
     gf_boolean_t need_inode_unref = _gf_false;
74096c
     struct list_head *pcontend = NULL;
74096c
     struct list_head contend;
74096c
+    struct list_head wake;
74096c
     struct timespec now = {};
74096c
     short fl_type;
74096c
 
74096c
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
74096c
         timespec_now(&now;;
74096c
     }
74096c
 
74096c
+    INIT_LIST_HEAD(&wake);
74096c
+
74096c
     if (ctx)
74096c
         pthread_mutex_lock(&ctx->lock);
74096c
     pthread_mutex_lock(&pl_inode->mutex);
74096c
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
74096c
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
74096c
                        lock->client_pid, lkowner_utoa(&lock->owner),
74096c
                        lock->user_flock.l_start, lock->user_flock.l_len);
74096c
-                if (can_block)
74096c
+                if (can_block) {
74096c
                     unref = _gf_false;
74096c
-                /* For all but the case where a non-blocking
74096c
-                 * lock attempt fails, the extra ref taken at
74096c
-                 * the start of this function must be negated.
74096c
-                 */
74096c
-                else
74096c
-                    need_inode_unref = _gf_true;
74096c
+                }
74096c
             }
74096c
-
74096c
-            if (ctx && (!ret || can_block))
74096c
+            /* For all but the case where a non-blocking lock attempt fails
74096c
+             * with -EAGAIN, the extra ref taken at the start of this function
74096c
+             * must be negated. */
74096c
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
74096c
+            if (ctx && !need_inode_unref) {
74096c
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
74096c
+            }
74096c
         } else {
74096c
             /* Irrespective of whether unlock succeeds or not,
74096c
              * the extra inode ref that was done at the start of
74096c
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
74096c
             list_del_init(&retlock->client_list);
74096c
             __pl_inodelk_unref(retlock);
74096c
 
74096c
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
74096c
+
74096c
             ret = 0;
74096c
         }
74096c
     out:
74096c
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
74096c
     if (ctx)
74096c
         pthread_mutex_unlock(&ctx->lock);
74096c
 
74096c
+    pl_inode_remove_wake(&wake);
74096c
+
74096c
     /* The following (extra) unref corresponds to the ref that
74096c
      * was done at the time the lock was granted.
74096c
      */
74096c
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
74096c
                                  inode);
74096c
 
74096c
             if (ret < 0) {
74096c
-                if ((can_block) && (F_UNLCK != lock_type)) {
74096c
-                    goto out;
74096c
+                if (ret == -EAGAIN) {
74096c
+                    if (can_block && (F_UNLCK != lock_type)) {
74096c
+                        goto out;
74096c
+                    }
74096c
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
74096c
+                } else {
74096c
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
74096c
                 }
74096c
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
74096c
                 op_errno = -ret;
74096c
                 goto unwind;
74096c
             }
74096c
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
74096c
index aa267de..6666feb 100644
74096c
--- a/xlators/features/locks/src/locks.h
74096c
+++ b/xlators/features/locks/src/locks.h
74096c
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
74096c
 
74096c
     struct list_head client_list; /* list of all locks from a client */
74096c
     short fl_type;
74096c
+
74096c
+    int32_t status; /* Error code when we try to grant a lock in blocked
74096c
+                       state */
74096c
 };
74096c
 typedef struct __pl_inode_lock pl_inode_lock_t;
74096c
 
74096c
@@ -164,13 +167,14 @@ struct __pl_inode {
74096c
     struct list_head rw_list;            /* list of waiting r/w requests */
74096c
     struct list_head reservelk_list;     /* list of reservelks */
74096c
     struct list_head blocked_reservelks; /* list of blocked reservelks */
74096c
-    struct list_head
74096c
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
74096c
-    struct list_head metalk_list; /* Meta lock list */
74096c
-                                  /* This is to store the incoming lock
74096c
-                                     requests while meta lock is enabled */
74096c
-    struct list_head queued_locks;
74096c
-    int mandatory; /* if mandatory locking is enabled */
74096c
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
74096c
+                                            reserve is held*/
74096c
+    struct list_head metalk_list;        /* Meta lock list */
74096c
+    struct list_head queued_locks;       /* This is to store the incoming lock
74096c
+                                            requests while meta lock is enabled */
74096c
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
74096c
+                                 the inode. */
74096c
+    int mandatory;            /* if mandatory locking is enabled */
74096c
 
74096c
     inode_t *refkeeper; /* hold refs on an inode while locks are
74096c
                            held to prevent pruning */
74096c
@@ -197,6 +201,11 @@ struct __pl_inode {
74096c
     */
74096c
     int fop_wind_count;
74096c
     pthread_cond_t check_fop_wind_count;
74096c
+
74096c
+    int32_t links;           /* Number of hard links the inode has. */
74096c
+    uint32_t remove_running; /* Number of remove operations running. */
74096c
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
74096c
+    gf_boolean_t removed;    /* The inode has been deleted. */
74096c
 };
74096c
 typedef struct __pl_inode pl_inode_t;
74096c
 
74096c
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
74096c
index 7887b82..5ae0125 100644
74096c
--- a/xlators/features/locks/src/posix.c
74096c
+++ b/xlators/features/locks/src/posix.c
74096c
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
74096c
         }                                                                      \
74096c
     } while (0)
74096c
 
74096c
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
74096c
+                        _args...)                                              \
74096c
+    ({                                                                         \
74096c
+        struct list_head contend;                                              \
74096c
+        pl_inode_t *__pl_inode;                                                \
74096c
+        call_stub_t *__stub;                                                   \
74096c
+        int32_t __error;                                                       \
74096c
+        INIT_LIST_HEAD(&contend);                                              \
74096c
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
74096c
+                                          &__pl_inode, &contend);              \
74096c
+        if (__error < 0) {                                                     \
74096c
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
74096c
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
74096c
+                                               &contend);                      \
74096c
+        } else if (__error == 0) {                                             \
74096c
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
74096c
+                                  _loc2);                                      \
74096c
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
74096c
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
74096c
+        }                                                                      \
74096c
+        __error;                                                               \
74096c
+    })
74096c
+
74096c
 gf_boolean_t
74096c
 pl_has_xdata_requests(dict_t *xdata)
74096c
 {
74096c
@@ -2969,11 +2992,85 @@ out:
74096c
     return ret;
74096c
 }
74096c
 
74096c
+static int32_t
74096c
+pl_request_link_count(dict_t **pxdata)
74096c
+{
74096c
+    dict_t *xdata;
74096c
+
74096c
+    xdata = *pxdata;
74096c
+    if (xdata == NULL) {
74096c
+        xdata = dict_new();
74096c
+        if (xdata == NULL) {
74096c
+            return ENOMEM;
74096c
+        }
74096c
+    } else {
74096c
+        dict_ref(xdata);
74096c
+    }
74096c
+
74096c
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
74096c
+        dict_unref(xdata);
74096c
+        return ENOMEM;
74096c
+    }
74096c
+
74096c
+    *pxdata = xdata;
74096c
+
74096c
+    return 0;
74096c
+}
74096c
+
74096c
+static int32_t
74096c
+pl_check_link_count(dict_t *xdata)
74096c
+{
74096c
+    int32_t count;
74096c
+
74096c
+    /* In case we are unable to read the link count from xdata, we take a
74096c
+     * conservative approach and return -2, which will prevent the inode from
74096c
+     * being considered deleted. In fact it will cause link tracking for this
74096c
+     * inode to be disabled completely to avoid races. */
74096c
+
74096c
+    if (xdata == NULL) {
74096c
+        return -2;
74096c
+    }
74096c
+
74096c
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
74096c
+        return -2;
74096c
+    }
74096c
+
74096c
+    return count;
74096c
+}
74096c
+
74096c
 int32_t
74096c
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
74096c
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
74096c
               struct iatt *postparent)
74096c
 {
74096c
+    pl_inode_t *pl_inode;
74096c
+
74096c
+    if (op_ret >= 0) {
74096c
+        pl_inode = pl_inode_get(this, inode, NULL);
74096c
+        if (pl_inode == NULL) {
74096c
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
74096c
+                            NULL);
74096c
+            return 0;
74096c
+        }
74096c
+
74096c
+        pthread_mutex_lock(&pl_inode->mutex);
74096c
+
74096c
+        /* We only update the link count if we previously didn't know it.
74096c
+         * Doing it always can lead to races since lookup is not executed
74096c
+         * atomically most of the times. */
74096c
+        if (pl_inode->links == -2) {
74096c
+            pl_inode->links = pl_check_link_count(xdata);
74096c
+            if (buf->ia_type == IA_IFDIR) {
74096c
+                /* Directories have at least 2 links. To avoid special handling
74096c
+                 * for directories, we simply decrement the value here to make
74096c
+                 * them equivalent to regular files. */
74096c
+                pl_inode->links--;
74096c
+            }
74096c
+        }
74096c
+
74096c
+        pthread_mutex_unlock(&pl_inode->mutex);
74096c
+    }
74096c
+
74096c
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
74096c
                     postparent);
74096c
     return 0;
74096c
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
74096c
 int32_t
74096c
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
74096c
 {
74096c
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
74096c
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
74096c
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
74096c
+    int32_t error;
74096c
+
74096c
+    error = pl_request_link_count(&xdata);
74096c
+    if (error == 0) {
74096c
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
74096c
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
74096c
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
74096c
+        dict_unref(xdata);
74096c
+    } else {
74096c
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
74096c
+    }
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -3792,6 +3897,10 @@ unlock:
74096c
             gf_proc_dump_write("posixlk-count", "%d", count);
74096c
             __dump_posixlks(pl_inode);
74096c
         }
74096c
+
74096c
+        gf_proc_dump_write("links", "%d", pl_inode->links);
74096c
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
74096c
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
74096c
     }
74096c
     pthread_mutex_unlock(&pl_inode->mutex);
74096c
 
74096c
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
74096c
               struct iatt *postoldparent, struct iatt *prenewparent,
74096c
               struct iatt *postnewparent, dict_t *xdata)
74096c
 {
74096c
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
74096c
+
74096c
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
74096c
                     postoldparent, prenewparent, postnewparent, xdata);
74096c
+
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -4146,10 +4258,15 @@ int32_t
74096c
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
74096c
           dict_t *xdata)
74096c
 {
74096c
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
74096c
+    int32_t error;
74096c
+
74096c
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
74096c
+                            pl_rename_cbk, oldloc, newloc, xdata);
74096c
+    if (error > 0) {
74096c
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
74096c
+                            NULL, NULL);
74096c
+    }
74096c
 
74096c
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
74096c
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
74096c
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
74096c
               dict_t *xdata)
74096c
 {
74096c
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
74096c
+
74096c
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
74096c
                     postparent, xdata);
74096c
+
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -4282,9 +4402,14 @@ int32_t
74096c
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
74096c
           dict_t *xdata)
74096c
 {
74096c
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
74096c
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
74096c
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
74096c
+    int32_t error;
74096c
+
74096c
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
74096c
+                            pl_unlink_cbk, loc, xflag, xdata);
74096c
+    if (error > 0) {
74096c
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
74096c
+    }
74096c
+
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
74096c
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
74096c
              dict_t *xdata)
74096c
 {
74096c
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
74096c
+
74096c
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
74096c
                                postparent, xdata);
74096c
+
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -4360,9 +4488,14 @@ int
74096c
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
74096c
          dict_t *xdata)
74096c
 {
74096c
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
74096c
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
74096c
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
74096c
+    int32_t error;
74096c
+
74096c
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
74096c
+                            pl_rmdir_cbk, loc, xflags, xdata);
74096c
+    if (error > 0) {
74096c
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
74096c
+    }
74096c
+
74096c
     return 0;
74096c
 }
74096c
 
74096c
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
74096c
             int32_t op_errno, inode_t *inode, struct iatt *buf,
74096c
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
74096c
 {
74096c
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
74096c
+
74096c
+    if (op_ret >= 0) {
74096c
+        pthread_mutex_lock(&pl_inode->mutex);
74096c
+
74096c
+        /* TODO: can happen pl_inode->links == 0 ? */
74096c
+        if (pl_inode->links >= 0) {
74096c
+            pl_inode->links++;
74096c
+        }
74096c
+
74096c
+        pthread_mutex_unlock(&pl_inode->mutex);
74096c
+    }
74096c
+
74096c
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
74096c
                                preparent, postparent, xdata);
74096c
     return 0;
74096c
@@ -4401,9 +4547,18 @@ int
74096c
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
74096c
         dict_t *xdata)
74096c
 {
74096c
+    pl_inode_t *pl_inode;
74096c
+
74096c
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
74096c
+    if (pl_inode == NULL) {
74096c
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
74096c
+                            NULL);
74096c
+        return 0;
74096c
+    }
74096c
+
74096c
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
74096c
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
74096c
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
74096c
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
74096c
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
74096c
     return 0;
74096c
 }
74096c
 
74096c
-- 
74096c
1.8.3.1
74096c