87c3ef
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
87c3ef
From: Xavi Hernandez <xhernandez@redhat.com>
87c3ef
Date: Fri, 12 Jun 2020 00:06:36 +0200
87c3ef
Subject: [PATCH 455/456] locks: prevent deletion of locked entries
87c3ef
87c3ef
To keep consistency inside transactions started by locking an entry or
87c3ef
an inode, this change delays the removal of entries that are currently
87c3ef
locked by one or more clients. Once all locks are released, the removal
87c3ef
is processed.
87c3ef
87c3ef
The detection of stale inodes in the locking
87c3ef
code of EC has also been improved.
87c3ef
87c3ef
>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
87c3ef
>Fixes: #990
87c3ef
87c3ef
Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
87c3ef
BUG: 1812789
87c3ef
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
87c3ef
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
87c3ef
Tested-by: RHGS Build Bot <nigelb@redhat.com>
87c3ef
---
87c3ef
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
87c3ef
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
87c3ef
 xlators/features/locks/src/common.h  |  43 +++++
87c3ef
 xlators/features/locks/src/entrylk.c |  19 +--
87c3ef
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
87c3ef
 xlators/features/locks/src/locks.h   |  23 ++-
87c3ef
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
87c3ef
 7 files changed, 689 insertions(+), 114 deletions(-)
87c3ef
87c3ef
diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
87c3ef
index ffcac07..db86296 100644
87c3ef
--- a/xlators/cluster/ec/src/ec-locks.c
87c3ef
+++ b/xlators/cluster/ec/src/ec-locks.c
87c3ef
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
87c3ef
     ec_t *ec = fop->xl->private;
87c3ef
     ec_cbk_data_t *ans = NULL;
87c3ef
     ec_cbk_data_t *cbk = NULL;
87c3ef
-    uintptr_t locked = 0, notlocked = 0;
87c3ef
+    uintptr_t locked = 0;
87c3ef
+    int32_t good = 0;
87c3ef
+    int32_t eagain = 0;
87c3ef
+    int32_t estale = 0;
87c3ef
     int32_t error = -1;
87c3ef
 
87c3ef
+    /* There are some errors that we'll handle in an special way while trying
87c3ef
+     * to acquire a lock.
87c3ef
+     *
87c3ef
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
87c3ef
+     *            consider that there's contention on the inode, so we consider
87c3ef
+     *            the acquisition a failure and try again with a sequential
87c3ef
+     *            blocking lock request. This will ensure that we get a lock on
87c3ef
+     *            as many bricks as possible (ignoring EAGAIN here would cause
87c3ef
+     *            unnecessary triggers of self-healing).
87c3ef
+     *
87c3ef
+     *            If it's found during a sequential blocking lock request, it's
87c3ef
+     *            considered an error. Lock will only succeed if there are
87c3ef
+     *            enough other bricks locked.
87c3ef
+     *
87c3ef
+     *   ESTALE:  This can appear during parallel or sequential lock request if
87c3ef
+     *            the inode has just been unlinked. We consider this error is
87c3ef
+     *            not recoverable, but we also don't consider it as fatal. So,
87c3ef
+     *            if it happens during parallel lock, we won't attempt a
87c3ef
+     *            sequential one unless there are EAGAIN errors on other
87c3ef
+     *            bricks (and are enough to form a quorum), but if we reach
87c3ef
+     *            quorum counting the ESTALE bricks, we consider the whole
87c3ef
+     *            result of the operation is ESTALE instead of EIO.
87c3ef
+     */
87c3ef
+
87c3ef
     list_for_each_entry(ans, &fop->cbk_list, list)
87c3ef
     {
87c3ef
         if (ans->op_ret >= 0) {
87c3ef
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
87c3ef
                 error = EIO;
87c3ef
             }
87c3ef
             locked |= ans->mask;
87c3ef
+            good = ans->count;
87c3ef
             cbk = ans;
87c3ef
-        } else {
87c3ef
-            if (ans->op_errno == EAGAIN) {
87c3ef
-                switch (fop->uint32) {
87c3ef
-                    case EC_LOCK_MODE_NONE:
87c3ef
-                    case EC_LOCK_MODE_ALL:
87c3ef
-                        /* Goal is to treat non-blocking lock as failure
87c3ef
-                         * even if there is a single EAGAIN*/
87c3ef
-                        notlocked |= ans->mask;
87c3ef
-                        break;
87c3ef
-                }
87c3ef
-            }
87c3ef
+        } else if (ans->op_errno == ESTALE) {
87c3ef
+            estale += ans->count;
87c3ef
+        } else if ((ans->op_errno == EAGAIN) &&
87c3ef
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
87c3ef
+            eagain += ans->count;
87c3ef
         }
87c3ef
     }
87c3ef
 
87c3ef
     if (error == -1) {
87c3ef
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
87c3ef
-            if (notlocked == 0) {
87c3ef
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
87c3ef
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
87c3ef
+         * we retry with a sequential blocking lock request if needed.
87c3ef
+         * Otherwise we succeed. */
87c3ef
+        if ((good + eagain) >= ec->fragments) {
87c3ef
+            if (eagain == 0) {
87c3ef
                 if (fop->answer == NULL) {
87c3ef
                     fop->answer = cbk;
87c3ef
                 }
87c3ef
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
87c3ef
                     case EC_LOCK_MODE_NONE:
87c3ef
                         error = EAGAIN;
87c3ef
                         break;
87c3ef
-
87c3ef
                     case EC_LOCK_MODE_ALL:
87c3ef
                         fop->uint32 = EC_LOCK_MODE_INC;
87c3ef
                         break;
87c3ef
-
87c3ef
                     default:
87c3ef
+                        /* This shouldn't happen because eagain cannot be > 0
87c3ef
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
87c3ef
                         error = EIO;
87c3ef
                         break;
87c3ef
                 }
87c3ef
             }
87c3ef
         } else {
87c3ef
-            if (fop->answer && fop->answer->op_ret < 0)
87c3ef
+            /* We have been unable to find enough candidates that will be able
87c3ef
+             * to take the lock. If we have quorum on some answer, we return
87c3ef
+             * it. Otherwise we check if ESTALE answers allow us to reach
87c3ef
+             * quorum. If so, we return ESTALE. */
87c3ef
+            if (fop->answer && fop->answer->op_ret < 0) {
87c3ef
                 error = fop->answer->op_errno;
87c3ef
-            else
87c3ef
+            } else if ((good + eagain + estale) >= ec->fragments) {
87c3ef
+                error = ESTALE;
87c3ef
+            } else {
87c3ef
                 error = EIO;
87c3ef
+            }
87c3ef
         }
87c3ef
     }
87c3ef
 
87c3ef
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
87c3ef
index 1406e70..0c52853 100644
87c3ef
--- a/xlators/features/locks/src/common.c
87c3ef
+++ b/xlators/features/locks/src/common.c
87c3ef
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
87c3ef
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
87c3ef
         INIT_LIST_HEAD(&pl_inode->metalk_list);
87c3ef
         INIT_LIST_HEAD(&pl_inode->queued_locks);
87c3ef
+        INIT_LIST_HEAD(&pl_inode->waiting);
87c3ef
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
87c3ef
 
87c3ef
         pl_inode->check_mlock_info = _gf_true;
87c3ef
         pl_inode->mlock_enforced = _gf_false;
87c3ef
 
87c3ef
+        /* -2 means never looked up. -1 means something went wrong and link
87c3ef
+         * tracking is disabled. */
87c3ef
+        pl_inode->links = -2;
87c3ef
+
87c3ef
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
87c3ef
         if (ret) {
87c3ef
             pthread_mutex_destroy(&pl_inode->mutex);
87c3ef
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
87c3ef
     }
87c3ef
 
87c3ef
     return 0;
87c3ef
-}
87c3ef
\ No newline at end of file
87c3ef
+}
87c3ef
+
87c3ef
+gf_boolean_t
87c3ef
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
87c3ef
+{
87c3ef
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
87c3ef
+        return _gf_true;
87c3ef
+    }
87c3ef
+
87c3ef
+    if (is_lk_owner_null(owner)) {
87c3ef
+        return _gf_false;
87c3ef
+    }
87c3ef
+    return _gf_true;
87c3ef
+}
87c3ef
+
87c3ef
+static int32_t
87c3ef
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
87c3ef
+{
87c3ef
+    inode_t *inode = NULL;
87c3ef
+    int32_t error = 0;
87c3ef
+
87c3ef
+    if (loc->inode != NULL) {
87c3ef
+        inode = inode_ref(loc->inode);
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+    if (loc->parent == NULL) {
87c3ef
+        error = EINVAL;
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+    if (!gf_uuid_is_null(loc->gfid)) {
87c3ef
+        inode = inode_find(loc->parent->table, loc->gfid);
87c3ef
+        if (inode != NULL) {
87c3ef
+            goto done;
87c3ef
+        }
87c3ef
+    }
87c3ef
+
87c3ef
+    if (loc->name == NULL) {
87c3ef
+        error = EINVAL;
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
87c3ef
+    if (inode == NULL) {
87c3ef
+        /* We haven't found any inode. This means that the file doesn't exist
87c3ef
+         * or that even if it exists, we don't have any knowledge about it, so
87c3ef
+         * we don't have locks on it either, which is fine for our purposes. */
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+done:
87c3ef
+    *pinode = inode;
87c3ef
+
87c3ef
+    return error;
87c3ef
+}
87c3ef
+
87c3ef
+static gf_boolean_t
87c3ef
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
87c3ef
+                    struct timespec *now, struct list_head *contend)
87c3ef
+{
87c3ef
+    pl_dom_list_t *dom;
87c3ef
+    pl_inode_lock_t *lock;
87c3ef
+    gf_boolean_t has_owners = _gf_false;
87c3ef
+
87c3ef
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
87c3ef
+    {
87c3ef
+        list_for_each_entry(lock, &dom->inodelk_list, list)
87c3ef
+        {
87c3ef
+            /* If the lock belongs to the same client, we assume it's related
87c3ef
+             * to the same operation, so we allow the removal to continue. */
87c3ef
+            if (lock->client == client) {
87c3ef
+                continue;
87c3ef
+            }
87c3ef
+            /* If the lock belongs to an internal process, we don't block the
87c3ef
+             * removal. */
87c3ef
+            if (lock->client_pid < 0) {
87c3ef
+                continue;
87c3ef
+            }
87c3ef
+            if (contend == NULL) {
87c3ef
+                return _gf_true;
87c3ef
+            }
87c3ef
+            has_owners = _gf_true;
87c3ef
+            inodelk_contention_notify_check(xl, lock, now, contend);
87c3ef
+        }
87c3ef
+    }
87c3ef
+
87c3ef
+    return has_owners;
87c3ef
+}
87c3ef
+
87c3ef
+int32_t
87c3ef
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
87c3ef
+                        pl_inode_t **ppl_inode, struct list_head *contend)
87c3ef
+{
87c3ef
+    struct timespec now;
87c3ef
+    inode_t *inode;
87c3ef
+    pl_inode_t *pl_inode;
87c3ef
+    int32_t error;
87c3ef
+
87c3ef
+    pl_inode = NULL;
87c3ef
+
87c3ef
+    error = pl_inode_from_loc(loc, &inode);
87c3ef
+    if ((error != 0) || (inode == NULL)) {
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+    pl_inode = pl_inode_get(xl, inode, NULL);
87c3ef
+    if (pl_inode == NULL) {
87c3ef
+        inode_unref(inode);
87c3ef
+        error = ENOMEM;
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+    /* pl_inode_from_loc() already increments ref count for inode, so
87c3ef
+     * we only assign here our reference. */
87c3ef
+    pl_inode->inode = inode;
87c3ef
+
87c3ef
+    timespec_now(&now);
87c3ef
+
87c3ef
+    pthread_mutex_lock(&pl_inode->mutex);
87c3ef
+
87c3ef
+    if (pl_inode->removed) {
87c3ef
+        error = ESTALE;
87c3ef
+        goto unlock;
87c3ef
+    }
87c3ef
+
87c3ef
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
87c3ef
+        error = -1;
87c3ef
+        /* We skip the unlock here because the caller must create a stub when
87c3ef
+         * we return -1 and do a call to pl_inode_remove_complete(), which
87c3ef
+         * assumes the lock is still acquired and will release it once
87c3ef
+         * everything else is prepared. */
87c3ef
+        goto done;
87c3ef
+    }
87c3ef
+
87c3ef
+    pl_inode->is_locked = _gf_true;
87c3ef
+    pl_inode->remove_running++;
87c3ef
+
87c3ef
+unlock:
87c3ef
+    pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
+
87c3ef
+done:
87c3ef
+    *ppl_inode = pl_inode;
87c3ef
+
87c3ef
+    return error;
87c3ef
+}
87c3ef
+
87c3ef
+int32_t
87c3ef
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
87c3ef
+                         struct list_head *contend)
87c3ef
+{
87c3ef
+    pl_inode_lock_t *lock;
87c3ef
+    int32_t error = -1;
87c3ef
+
87c3ef
+    if (stub != NULL) {
87c3ef
+        list_add_tail(&stub->list, &pl_inode->waiting);
87c3ef
+        pl_inode->is_locked = _gf_true;
87c3ef
+    } else {
87c3ef
+        error = ENOMEM;
87c3ef
+
87c3ef
+        while (!list_empty(contend)) {
87c3ef
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
87c3ef
+            list_del_init(&lock->list);
87c3ef
+            __pl_inodelk_unref(lock);
87c3ef
+        }
87c3ef
+    }
87c3ef
+
87c3ef
+    pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
+
87c3ef
+    if (error < 0) {
87c3ef
+        inodelk_contention_notify(xl, contend);
87c3ef
+    }
87c3ef
+
87c3ef
+    inode_unref(pl_inode->inode);
87c3ef
+
87c3ef
+    return error;
87c3ef
+}
87c3ef
+
87c3ef
+void
87c3ef
+pl_inode_remove_wake(struct list_head *list)
87c3ef
+{
87c3ef
+    call_stub_t *stub;
87c3ef
+
87c3ef
+    while (!list_empty(list)) {
87c3ef
+        stub = list_first_entry(list, call_stub_t, list);
87c3ef
+        list_del_init(&stub->list);
87c3ef
+
87c3ef
+        call_resume(stub);
87c3ef
+    }
87c3ef
+}
87c3ef
+
87c3ef
+void
87c3ef
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
87c3ef
+{
87c3ef
+    struct list_head contend, granted;
87c3ef
+    struct timespec now;
87c3ef
+    pl_dom_list_t *dom;
87c3ef
+
87c3ef
+    if (pl_inode == NULL) {
87c3ef
+        return;
87c3ef
+    }
87c3ef
+
87c3ef
+    INIT_LIST_HEAD(&contend);
87c3ef
+    INIT_LIST_HEAD(&granted);
87c3ef
+    timespec_now(&now);
87c3ef
+
87c3ef
+    pthread_mutex_lock(&pl_inode->mutex);
87c3ef
+
87c3ef
+    if (error == 0) {
87c3ef
+        if (pl_inode->links >= 0) {
87c3ef
+            pl_inode->links--;
87c3ef
+        }
87c3ef
+        if (pl_inode->links == 0) {
87c3ef
+            pl_inode->removed = _gf_true;
87c3ef
+        }
87c3ef
+    }
87c3ef
+
87c3ef
+    pl_inode->remove_running--;
87c3ef
+
87c3ef
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
87c3ef
+        pl_inode->is_locked = _gf_false;
87c3ef
+
87c3ef
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
87c3ef
+        {
87c3ef
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
87c3ef
+                                        &contend);
87c3ef
+        }
87c3ef
+    }
87c3ef
+
87c3ef
+    pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
+
87c3ef
+    unwind_granted_inodes(xl, pl_inode, &granted);
87c3ef
+
87c3ef
+    inodelk_contention_notify(xl, &contend);
87c3ef
+
87c3ef
+    inode_unref(pl_inode->inode);
87c3ef
+}
87c3ef
+
87c3ef
+void
87c3ef
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
87c3ef
+                         struct list_head *list)
87c3ef
+{
87c3ef
+    call_stub_t *stub, *tmp;
87c3ef
+
87c3ef
+    if (!pl_inode->is_locked) {
87c3ef
+        return;
87c3ef
+    }
87c3ef
+
87c3ef
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
87c3ef
+    {
87c3ef
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
87c3ef
+                                 NULL)) {
87c3ef
+            list_move_tail(&stub->list, list);
87c3ef
+        }
87c3ef
+    }
87c3ef
+}
87c3ef
+
87c3ef
+/* This function determines if an inodelk attempt can be done now or it needs
87c3ef
+ * to wait.
87c3ef
+ *
87c3ef
+ * Possible return values:
87c3ef
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
87c3ef
+ *        inode has been deleted previously by unlink/rmdir/rename
87c3ef
+ *   = 0: The lock can be attempted.
87c3ef
+ *   > 0: The lock needs to wait because a conflicting remove operation is
87c3ef
+ *        ongoing.
87c3ef
+ */
87c3ef
+int32_t
87c3ef
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
87c3ef
+{
87c3ef
+    pl_dom_list_t *dom;
87c3ef
+    pl_inode_lock_t *ilock;
87c3ef
+
87c3ef
+    /* If the inode has been deleted, we won't allow any lock. */
87c3ef
+    if (pl_inode->removed) {
87c3ef
+        return -ESTALE;
87c3ef
+    }
87c3ef
+
87c3ef
+    /* We only synchronize with locks made for regular operations coming from
87c3ef
+     * the user. Locks done for internal purposes are hard to control and could
87c3ef
+     * lead to long delays or deadlocks quite easily. */
87c3ef
+    if (lock->client_pid < 0) {
87c3ef
+        return 0;
87c3ef
+    }
87c3ef
+    if (!pl_inode->is_locked) {
87c3ef
+        return 0;
87c3ef
+    }
87c3ef
+    if (pl_inode->remove_running > 0) {
87c3ef
+        return 1;
87c3ef
+    }
87c3ef
+
87c3ef
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
87c3ef
+    {
87c3ef
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
87c3ef
+        {
87c3ef
+            /* If a lock from the same client is already granted, we allow this
87c3ef
+             * one to continue. This is necessary to prevent deadlocks when
87c3ef
+             * multiple locks are taken for the same operation.
87c3ef
+             *
87c3ef
+             * On the other side it's unlikely that the same client sends
87c3ef
+             * completely unrelated locks for the same inode.
87c3ef
+             */
87c3ef
+            if (ilock->client == lock->client) {
87c3ef
+                return 0;
87c3ef
+            }
87c3ef
+        }
87c3ef
+    }
87c3ef
+
87c3ef
+    return 1;
87c3ef
+}
87c3ef
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
87c3ef
index ea86b96..6c81ac3 100644
87c3ef
--- a/xlators/features/locks/src/common.h
87c3ef
+++ b/xlators/features/locks/src/common.h
87c3ef
@@ -105,6 +105,15 @@ void
87c3ef
 __pl_inodelk_unref(pl_inode_lock_t *lock);
87c3ef
 
87c3ef
 void
87c3ef
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
+                            struct list_head *granted, pl_dom_list_t *dom,
87c3ef
+                            struct timespec *now, struct list_head *contend);
87c3ef
+
87c3ef
+void
87c3ef
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
+                      struct list_head *granted);
87c3ef
+
87c3ef
+void
87c3ef
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
                           pl_dom_list_t *dom, struct timespec *now,
87c3ef
                           struct list_head *contend);
87c3ef
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
87c3ef
 void
87c3ef
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
87c3ef
 
87c3ef
+void
87c3ef
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
87c3ef
+                                struct timespec *now,
87c3ef
+                                struct list_head *contend);
87c3ef
+
87c3ef
+void
87c3ef
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
87c3ef
+                                struct timespec *now,
87c3ef
+                                struct list_head *contend);
87c3ef
+
87c3ef
 gf_boolean_t
87c3ef
 pl_does_monkey_want_stuck_lock();
87c3ef
 
87c3ef
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
87c3ef
 int
87c3ef
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
87c3ef
 
87c3ef
+gf_boolean_t
87c3ef
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
87c3ef
+
87c3ef
+int32_t
87c3ef
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
87c3ef
+                        pl_inode_t **ppl_inode, struct list_head *contend);
87c3ef
+
87c3ef
+int32_t
87c3ef
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
87c3ef
+                         struct list_head *contend);
87c3ef
+
87c3ef
+void
87c3ef
+pl_inode_remove_wake(struct list_head *list);
87c3ef
+
87c3ef
+void
87c3ef
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
87c3ef
+
87c3ef
+void
87c3ef
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
87c3ef
+                         struct list_head *list);
87c3ef
+
87c3ef
+int32_t
87c3ef
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
87c3ef
+
87c3ef
 #endif /* __COMMON_H__ */
87c3ef
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
87c3ef
index 93c649c..b97836f 100644
87c3ef
--- a/xlators/features/locks/src/entrylk.c
87c3ef
+++ b/xlators/features/locks/src/entrylk.c
87c3ef
@@ -197,9 +197,9 @@ out:
87c3ef
     return revoke_lock;
87c3ef
 }
87c3ef
 
87c3ef
-static gf_boolean_t
87c3ef
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
87c3ef
-                                  struct timespec *now)
87c3ef
+void
87c3ef
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
87c3ef
+                                struct timespec *now, struct list_head *contend)
87c3ef
 {
87c3ef
     posix_locks_private_t *priv;
87c3ef
     int64_t elapsed;
87c3ef
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
87c3ef
     /* If this lock is in a list, it means that we are about to send a
87c3ef
      * notification for it, so no need to do anything else. */
87c3ef
     if (!list_empty(&lock->contend)) {
87c3ef
-        return _gf_false;
87c3ef
+        return;
87c3ef
     }
87c3ef
 
87c3ef
     elapsed = now->tv_sec;
87c3ef
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
87c3ef
         elapsed--;
87c3ef
     }
87c3ef
     if (elapsed < priv->notify_contention_delay) {
87c3ef
-        return _gf_false;
87c3ef
+        return;
87c3ef
     }
87c3ef
 
87c3ef
     /* All contention notifications will be sent outside of the locked
87c3ef
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
87c3ef
 
87c3ef
     lock->contention_time = *now;
87c3ef
 
87c3ef
-    return _gf_true;
87c3ef
+    list_add_tail(&lock->contend, contend);
87c3ef
 }
87c3ef
 
87c3ef
 void
87c3ef
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
87c3ef
                     break;
87c3ef
                 }
87c3ef
             }
87c3ef
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
87c3ef
-                list_add_tail(&tmp->contend, contend);
87c3ef
-            }
87c3ef
+            entrylk_contention_notify_check(this, tmp, now, contend);
87c3ef
         }
87c3ef
     }
87c3ef
 
87c3ef
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
87c3ef
 
87c3ef
         if (bl_ret == 0) {
87c3ef
-            list_add(&bl->blocked_locks, granted);
87c3ef
+            list_add_tail(&bl->blocked_locks, granted);
87c3ef
         }
87c3ef
     }
87c3ef
-    return;
87c3ef
 }
87c3ef
 
87c3ef
 /* Grants locks if possible which are blocked on a lock */
87c3ef
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
87c3ef
index 24dee49..1a07243 100644
87c3ef
--- a/xlators/features/locks/src/inodelk.c
87c3ef
+++ b/xlators/features/locks/src/inodelk.c
87c3ef
@@ -231,9 +231,9 @@ out:
87c3ef
     return revoke_lock;
87c3ef
 }
87c3ef
 
87c3ef
-static gf_boolean_t
87c3ef
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
87c3ef
-                                  struct timespec *now)
87c3ef
+void
87c3ef
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
87c3ef
+                                struct timespec *now, struct list_head *contend)
87c3ef
 {
87c3ef
     posix_locks_private_t *priv;
87c3ef
     int64_t elapsed;
87c3ef
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
87c3ef
     /* If this lock is in a list, it means that we are about to send a
87c3ef
      * notification for it, so no need to do anything else. */
87c3ef
     if (!list_empty(&lock->contend)) {
87c3ef
-        return _gf_false;
87c3ef
+        return;
87c3ef
     }
87c3ef
 
87c3ef
     elapsed = now->tv_sec;
87c3ef
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
87c3ef
         elapsed--;
87c3ef
     }
87c3ef
     if (elapsed < priv->notify_contention_delay) {
87c3ef
-        return _gf_false;
87c3ef
+        return;
87c3ef
     }
87c3ef
 
87c3ef
     /* All contention notifications will be sent outside of the locked
87c3ef
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
87c3ef
 
87c3ef
     lock->contention_time = *now;
87c3ef
 
87c3ef
-    return _gf_true;
87c3ef
+    list_add_tail(&lock->contend, contend);
87c3ef
 }
87c3ef
 
87c3ef
 void
87c3ef
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
87c3ef
                     break;
87c3ef
                 }
87c3ef
             }
87c3ef
-            if (__inodelk_needs_contention_notify(this, l, now)) {
87c3ef
-                list_add_tail(&l->contend, contend);
87c3ef
-            }
87c3ef
+            inodelk_contention_notify_check(this, l, now, contend);
87c3ef
         }
87c3ef
     }
87c3ef
 
87c3ef
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
87c3ef
                struct list_head *contend)
87c3ef
 {
87c3ef
     pl_inode_lock_t *conf = NULL;
87c3ef
-    int ret = -EINVAL;
87c3ef
+    int ret;
87c3ef
 
87c3ef
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
87c3ef
-    if (conf) {
87c3ef
-        ret = __lock_blocked_add(this, dom, lock, can_block);
87c3ef
-        goto out;
87c3ef
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
87c3ef
+    if (ret < 0) {
87c3ef
+        return ret;
87c3ef
+    }
87c3ef
+    if (ret == 0) {
87c3ef
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
87c3ef
+    }
87c3ef
+    if ((ret > 0) || (conf != NULL)) {
87c3ef
+        return __lock_blocked_add(this, dom, lock, can_block);
87c3ef
     }
87c3ef
 
87c3ef
     /* To prevent blocked locks starvation, check if there are any blocked
87c3ef
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
87c3ef
                    "starvation");
87c3ef
         }
87c3ef
 
87c3ef
-        ret = __lock_blocked_add(this, dom, lock, can_block);
87c3ef
-        goto out;
87c3ef
+        return __lock_blocked_add(this, dom, lock, can_block);
87c3ef
     }
87c3ef
     __pl_inodelk_ref(lock);
87c3ef
     gettimeofday(&lock->granted_time, NULL);
87c3ef
     list_add(&lock->list, &dom->inodelk_list);
87c3ef
 
87c3ef
-    ret = 0;
87c3ef
-
87c3ef
-out:
87c3ef
-    return ret;
87c3ef
+    return 0;
87c3ef
 }
87c3ef
 
87c3ef
 /* Return true if the two inodelks have exactly same lock boundaries */
87c3ef
@@ -529,12 +528,11 @@ out:
87c3ef
     return conf;
87c3ef
 }
87c3ef
 
87c3ef
-static void
87c3ef
+void
87c3ef
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
                             struct list_head *granted, pl_dom_list_t *dom,
87c3ef
                             struct timespec *now, struct list_head *contend)
87c3ef
 {
87c3ef
-    int bl_ret = 0;
87c3ef
     pl_inode_lock_t *bl = NULL;
87c3ef
     pl_inode_lock_t *tmp = NULL;
87c3ef
 
87c3ef
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
     {
87c3ef
         list_del_init(&bl->blocked_locks);
87c3ef
 
87c3ef
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
87c3ef
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
87c3ef
 
87c3ef
-        if (bl_ret == 0) {
87c3ef
-            list_add(&bl->blocked_locks, granted);
87c3ef
+        if (bl->status != -EAGAIN) {
87c3ef
+            list_add_tail(&bl->blocked_locks, granted);
87c3ef
         }
87c3ef
     }
87c3ef
-    return;
87c3ef
 }
87c3ef
 
87c3ef
-/* Grant all inodelks blocked on a lock */
87c3ef
 void
87c3ef
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
-                          pl_dom_list_t *dom, struct timespec *now,
87c3ef
-                          struct list_head *contend)
87c3ef
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
+                      struct list_head *granted)
87c3ef
 {
87c3ef
-    struct list_head granted;
87c3ef
     pl_inode_lock_t *lock;
87c3ef
     pl_inode_lock_t *tmp;
87c3ef
+    int32_t op_ret;
87c3ef
+    int32_t op_errno;
87c3ef
 
87c3ef
-    INIT_LIST_HEAD(&granted);
87c3ef
-
87c3ef
-    pthread_mutex_lock(&pl_inode->mutex);
87c3ef
-    {
87c3ef
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
87c3ef
-                                    contend);
87c3ef
-    }
87c3ef
-    pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
-
87c3ef
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
87c3ef
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
87c3ef
     {
87c3ef
-        gf_log(this->name, GF_LOG_TRACE,
87c3ef
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
87c3ef
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
87c3ef
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
87c3ef
-               lock->user_flock.l_len);
87c3ef
-
87c3ef
+        if (lock->status == 0) {
87c3ef
+            op_ret = 0;
87c3ef
+            op_errno = 0;
87c3ef
+            gf_log(this->name, GF_LOG_TRACE,
87c3ef
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
87c3ef
+                   " => Granted",
87c3ef
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
87c3ef
+                   lock->client_pid, lkowner_utoa(&lock->owner),
87c3ef
+                   lock->user_flock.l_start, lock->user_flock.l_len);
87c3ef
+        } else {
87c3ef
+            op_ret = -1;
87c3ef
+            op_errno = -lock->status;
87c3ef
+        }
87c3ef
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
87c3ef
-                     0, 0, lock->volume);
87c3ef
+                     op_ret, op_errno, lock->volume);
87c3ef
 
87c3ef
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
87c3ef
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
87c3ef
         lock->frame = NULL;
87c3ef
     }
87c3ef
 
87c3ef
     pthread_mutex_lock(&pl_inode->mutex);
87c3ef
     {
87c3ef
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
87c3ef
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
87c3ef
         {
87c3ef
             list_del_init(&lock->blocked_locks);
87c3ef
             __pl_inodelk_unref(lock);
87c3ef
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
     pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
 }
87c3ef
 
87c3ef
+/* Grant all inodelks blocked on a lock */
87c3ef
+void
87c3ef
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
87c3ef
+                          pl_dom_list_t *dom, struct timespec *now,
87c3ef
+                          struct list_head *contend)
87c3ef
+{
87c3ef
+    struct list_head granted;
87c3ef
+
87c3ef
+    INIT_LIST_HEAD(&granted);
87c3ef
+
87c3ef
+    pthread_mutex_lock(&pl_inode->mutex);
87c3ef
+    {
87c3ef
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
87c3ef
+                                    contend);
87c3ef
+    }
87c3ef
+    pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
+
87c3ef
+    unwind_granted_inodes(this, pl_inode, &granted);
87c3ef
+}
87c3ef
+
87c3ef
 static void
87c3ef
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
87c3ef
 {
87c3ef
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
87c3ef
                  * and blocked lists, then this means that a parallel
87c3ef
                  * unlock on another inodelk (L2 say) may have 'granted'
87c3ef
                  * L1 and added it to 'granted' list in
87c3ef
-                 * __grant_blocked_node_locks() (although using the
87c3ef
+                 * __grant_blocked_inode_locks() (although using the
87c3ef
                  * 'blocked_locks' member). In that case, the cleanup
87c3ef
                  * codepath must try and grant other overlapping
87c3ef
                  * blocked inodelks from other clients, now that L1 is
87c3ef
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
87c3ef
     gf_boolean_t need_inode_unref = _gf_false;
87c3ef
     struct list_head *pcontend = NULL;
87c3ef
     struct list_head contend;
87c3ef
+    struct list_head wake;
87c3ef
     struct timespec now = {};
87c3ef
     short fl_type;
87c3ef
 
87c3ef
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
87c3ef
         timespec_now(&now;;
87c3ef
     }
87c3ef
 
87c3ef
+    INIT_LIST_HEAD(&wake);
87c3ef
+
87c3ef
     if (ctx)
87c3ef
         pthread_mutex_lock(&ctx->lock);
87c3ef
     pthread_mutex_lock(&pl_inode->mutex);
87c3ef
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
87c3ef
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
87c3ef
                        lock->client_pid, lkowner_utoa(&lock->owner),
87c3ef
                        lock->user_flock.l_start, lock->user_flock.l_len);
87c3ef
-                if (can_block)
87c3ef
+                if (can_block) {
87c3ef
                     unref = _gf_false;
87c3ef
-                /* For all but the case where a non-blocking
87c3ef
-                 * lock attempt fails, the extra ref taken at
87c3ef
-                 * the start of this function must be negated.
87c3ef
-                 */
87c3ef
-                else
87c3ef
-                    need_inode_unref = _gf_true;
87c3ef
+                }
87c3ef
             }
87c3ef
-
87c3ef
-            if (ctx && (!ret || can_block))
87c3ef
+            /* For all but the case where a non-blocking lock attempt fails
87c3ef
+             * with -EAGAIN, the extra ref taken at the start of this function
87c3ef
+             * must be negated. */
87c3ef
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
87c3ef
+            if (ctx && !need_inode_unref) {
87c3ef
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
87c3ef
+            }
87c3ef
         } else {
87c3ef
             /* Irrespective of whether unlock succeeds or not,
87c3ef
              * the extra inode ref that was done at the start of
87c3ef
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
87c3ef
             list_del_init(&retlock->client_list);
87c3ef
             __pl_inodelk_unref(retlock);
87c3ef
 
87c3ef
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
87c3ef
+
87c3ef
             ret = 0;
87c3ef
         }
87c3ef
     out:
87c3ef
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
87c3ef
     if (ctx)
87c3ef
         pthread_mutex_unlock(&ctx->lock);
87c3ef
 
87c3ef
+    pl_inode_remove_wake(&wake);
87c3ef
+
87c3ef
     /* The following (extra) unref corresponds to the ref that
87c3ef
      * was done at the time the lock was granted.
87c3ef
      */
87c3ef
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
87c3ef
                                  inode);
87c3ef
 
87c3ef
             if (ret < 0) {
87c3ef
-                if ((can_block) && (F_UNLCK != lock_type)) {
87c3ef
-                    goto out;
87c3ef
+                if (ret == -EAGAIN) {
87c3ef
+                    if (can_block && (F_UNLCK != lock_type)) {
87c3ef
+                        goto out;
87c3ef
+                    }
87c3ef
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
87c3ef
+                } else {
87c3ef
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
87c3ef
                 }
87c3ef
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
87c3ef
                 op_errno = -ret;
87c3ef
                 goto unwind;
87c3ef
             }
87c3ef
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
87c3ef
index aa267de..6666feb 100644
87c3ef
--- a/xlators/features/locks/src/locks.h
87c3ef
+++ b/xlators/features/locks/src/locks.h
87c3ef
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
87c3ef
 
87c3ef
     struct list_head client_list; /* list of all locks from a client */
87c3ef
     short fl_type;
87c3ef
+
87c3ef
+    int32_t status; /* Error code when we try to grant a lock in blocked
87c3ef
+                       state */
87c3ef
 };
87c3ef
 typedef struct __pl_inode_lock pl_inode_lock_t;
87c3ef
 
87c3ef
@@ -164,13 +167,14 @@ struct __pl_inode {
87c3ef
     struct list_head rw_list;            /* list of waiting r/w requests */
87c3ef
     struct list_head reservelk_list;     /* list of reservelks */
87c3ef
     struct list_head blocked_reservelks; /* list of blocked reservelks */
87c3ef
-    struct list_head
87c3ef
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
87c3ef
-    struct list_head metalk_list; /* Meta lock list */
87c3ef
-                                  /* This is to store the incoming lock
87c3ef
-                                     requests while meta lock is enabled */
87c3ef
-    struct list_head queued_locks;
87c3ef
-    int mandatory; /* if mandatory locking is enabled */
87c3ef
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
87c3ef
+                                            reserve is held*/
87c3ef
+    struct list_head metalk_list;        /* Meta lock list */
87c3ef
+    struct list_head queued_locks;       /* This is to store the incoming lock
87c3ef
+                                            requests while meta lock is enabled */
87c3ef
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
87c3ef
+                                 the inode. */
87c3ef
+    int mandatory;            /* if mandatory locking is enabled */
87c3ef
 
87c3ef
     inode_t *refkeeper; /* hold refs on an inode while locks are
87c3ef
                            held to prevent pruning */
87c3ef
@@ -197,6 +201,11 @@ struct __pl_inode {
87c3ef
     */
87c3ef
     int fop_wind_count;
87c3ef
     pthread_cond_t check_fop_wind_count;
87c3ef
+
87c3ef
+    int32_t links;           /* Number of hard links the inode has. */
87c3ef
+    uint32_t remove_running; /* Number of remove operations running. */
87c3ef
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
87c3ef
+    gf_boolean_t removed;    /* The inode has been deleted. */
87c3ef
 };
87c3ef
 typedef struct __pl_inode pl_inode_t;
87c3ef
 
87c3ef
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
87c3ef
index 7887b82..5ae0125 100644
87c3ef
--- a/xlators/features/locks/src/posix.c
87c3ef
+++ b/xlators/features/locks/src/posix.c
87c3ef
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
87c3ef
         }                                                                      \
87c3ef
     } while (0)
87c3ef
 
87c3ef
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
87c3ef
+                        _args...)                                              \
87c3ef
+    ({                                                                         \
87c3ef
+        struct list_head contend;                                              \
87c3ef
+        pl_inode_t *__pl_inode;                                                \
87c3ef
+        call_stub_t *__stub;                                                   \
87c3ef
+        int32_t __error;                                                       \
87c3ef
+        INIT_LIST_HEAD(&contend);                                              \
87c3ef
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
87c3ef
+                                          &__pl_inode, &contend);              \
87c3ef
+        if (__error < 0) {                                                     \
87c3ef
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
87c3ef
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
87c3ef
+                                               &contend);                      \
87c3ef
+        } else if (__error == 0) {                                             \
87c3ef
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
87c3ef
+                                  _loc2);                                      \
87c3ef
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
87c3ef
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
87c3ef
+        }                                                                      \
87c3ef
+        __error;                                                               \
87c3ef
+    })
87c3ef
+
87c3ef
 gf_boolean_t
87c3ef
 pl_has_xdata_requests(dict_t *xdata)
87c3ef
 {
87c3ef
@@ -2969,11 +2992,85 @@ out:
87c3ef
     return ret;
87c3ef
 }
87c3ef
 
87c3ef
+static int32_t
87c3ef
+pl_request_link_count(dict_t **pxdata)
87c3ef
+{
87c3ef
+    dict_t *xdata;
87c3ef
+
87c3ef
+    xdata = *pxdata;
87c3ef
+    if (xdata == NULL) {
87c3ef
+        xdata = dict_new();
87c3ef
+        if (xdata == NULL) {
87c3ef
+            return ENOMEM;
87c3ef
+        }
87c3ef
+    } else {
87c3ef
+        dict_ref(xdata);
87c3ef
+    }
87c3ef
+
87c3ef
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
87c3ef
+        dict_unref(xdata);
87c3ef
+        return ENOMEM;
87c3ef
+    }
87c3ef
+
87c3ef
+    *pxdata = xdata;
87c3ef
+
87c3ef
+    return 0;
87c3ef
+}
87c3ef
+
87c3ef
+static int32_t
87c3ef
+pl_check_link_count(dict_t *xdata)
87c3ef
+{
87c3ef
+    int32_t count;
87c3ef
+
87c3ef
+    /* In case we are unable to read the link count from xdata, we take a
87c3ef
+     * conservative approach and return -2, which will prevent the inode from
87c3ef
+     * being considered deleted. In fact it will cause link tracking for this
87c3ef
+     * inode to be disabled completely to avoid races. */
87c3ef
+
87c3ef
+    if (xdata == NULL) {
87c3ef
+        return -2;
87c3ef
+    }
87c3ef
+
87c3ef
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
87c3ef
+        return -2;
87c3ef
+    }
87c3ef
+
87c3ef
+    return count;
87c3ef
+}
87c3ef
+
87c3ef
 int32_t
87c3ef
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
87c3ef
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
87c3ef
               struct iatt *postparent)
87c3ef
 {
87c3ef
+    pl_inode_t *pl_inode;
87c3ef
+
87c3ef
+    if (op_ret >= 0) {
87c3ef
+        pl_inode = pl_inode_get(this, inode, NULL);
87c3ef
+        if (pl_inode == NULL) {
87c3ef
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
87c3ef
+                            NULL);
87c3ef
+            return 0;
87c3ef
+        }
87c3ef
+
87c3ef
+        pthread_mutex_lock(&pl_inode->mutex);
87c3ef
+
87c3ef
+        /* We only update the link count if we previously didn't know it.
87c3ef
+         * Doing it always can lead to races since lookup is not executed
87c3ef
+         * atomically most of the times. */
87c3ef
+        if (pl_inode->links == -2) {
87c3ef
+            pl_inode->links = pl_check_link_count(xdata);
87c3ef
+            if (buf->ia_type == IA_IFDIR) {
87c3ef
+                /* Directories have at least 2 links. To avoid special handling
87c3ef
+                 * for directories, we simply decrement the value here to make
87c3ef
+                 * them equivalent to regular files. */
87c3ef
+                pl_inode->links--;
87c3ef
+            }
87c3ef
+        }
87c3ef
+
87c3ef
+        pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
+    }
87c3ef
+
87c3ef
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
87c3ef
                     postparent);
87c3ef
     return 0;
87c3ef
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
87c3ef
 int32_t
87c3ef
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
87c3ef
 {
87c3ef
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
87c3ef
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
87c3ef
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
87c3ef
+    int32_t error;
87c3ef
+
87c3ef
+    error = pl_request_link_count(&xdata);
87c3ef
+    if (error == 0) {
87c3ef
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
87c3ef
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
87c3ef
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
87c3ef
+        dict_unref(xdata);
87c3ef
+    } else {
87c3ef
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
87c3ef
+    }
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -3792,6 +3897,10 @@ unlock:
87c3ef
             gf_proc_dump_write("posixlk-count", "%d", count);
87c3ef
             __dump_posixlks(pl_inode);
87c3ef
         }
87c3ef
+
87c3ef
+        gf_proc_dump_write("links", "%d", pl_inode->links);
87c3ef
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
87c3ef
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
87c3ef
     }
87c3ef
     pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
 
87c3ef
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
87c3ef
               struct iatt *postoldparent, struct iatt *prenewparent,
87c3ef
               struct iatt *postnewparent, dict_t *xdata)
87c3ef
 {
87c3ef
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
87c3ef
+
87c3ef
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
87c3ef
                     postoldparent, prenewparent, postnewparent, xdata);
87c3ef
+
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -4146,10 +4258,15 @@ int32_t
87c3ef
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
87c3ef
           dict_t *xdata)
87c3ef
 {
87c3ef
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
87c3ef
+    int32_t error;
87c3ef
+
87c3ef
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
87c3ef
+                            pl_rename_cbk, oldloc, newloc, xdata);
87c3ef
+    if (error > 0) {
87c3ef
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
87c3ef
+                            NULL, NULL);
87c3ef
+    }
87c3ef
 
87c3ef
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
87c3ef
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
87c3ef
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
87c3ef
               dict_t *xdata)
87c3ef
 {
87c3ef
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
87c3ef
+
87c3ef
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
87c3ef
                     postparent, xdata);
87c3ef
+
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -4282,9 +4402,14 @@ int32_t
87c3ef
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
87c3ef
           dict_t *xdata)
87c3ef
 {
87c3ef
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
87c3ef
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
87c3ef
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
87c3ef
+    int32_t error;
87c3ef
+
87c3ef
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
87c3ef
+                            pl_unlink_cbk, loc, xflag, xdata);
87c3ef
+    if (error > 0) {
87c3ef
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
87c3ef
+    }
87c3ef
+
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
87c3ef
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
87c3ef
              dict_t *xdata)
87c3ef
 {
87c3ef
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
87c3ef
+
87c3ef
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
87c3ef
                                postparent, xdata);
87c3ef
+
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -4360,9 +4488,14 @@ int
87c3ef
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
87c3ef
          dict_t *xdata)
87c3ef
 {
87c3ef
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
87c3ef
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
87c3ef
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
87c3ef
+    int32_t error;
87c3ef
+
87c3ef
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
87c3ef
+                            pl_rmdir_cbk, loc, xflags, xdata);
87c3ef
+    if (error > 0) {
87c3ef
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
87c3ef
+    }
87c3ef
+
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
87c3ef
             int32_t op_errno, inode_t *inode, struct iatt *buf,
87c3ef
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
87c3ef
 {
87c3ef
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
87c3ef
+
87c3ef
+    if (op_ret >= 0) {
87c3ef
+        pthread_mutex_lock(&pl_inode->mutex);
87c3ef
+
87c3ef
+        /* TODO: can happen pl_inode->links == 0 ? */
87c3ef
+        if (pl_inode->links >= 0) {
87c3ef
+            pl_inode->links++;
87c3ef
+        }
87c3ef
+
87c3ef
+        pthread_mutex_unlock(&pl_inode->mutex);
87c3ef
+    }
87c3ef
+
87c3ef
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
87c3ef
                                preparent, postparent, xdata);
87c3ef
     return 0;
87c3ef
@@ -4401,9 +4547,18 @@ int
87c3ef
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
87c3ef
         dict_t *xdata)
87c3ef
 {
87c3ef
+    pl_inode_t *pl_inode;
87c3ef
+
87c3ef
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
87c3ef
+    if (pl_inode == NULL) {
87c3ef
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
87c3ef
+                            NULL);
87c3ef
+        return 0;
87c3ef
+    }
87c3ef
+
87c3ef
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
87c3ef
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
87c3ef
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
87c3ef
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
87c3ef
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
87c3ef
     return 0;
87c3ef
 }
87c3ef
 
87c3ef
-- 
87c3ef
1.8.3.1
87c3ef