9ae3f9
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
9ae3f9
From: Xavi Hernandez <xhernandez@redhat.com>
9ae3f9
Date: Fri, 12 Jun 2020 00:06:36 +0200
9ae3f9
Subject: [PATCH 455/456] locks: prevent deletion of locked entries
9ae3f9
9ae3f9
To keep consistency inside transactions started by locking an entry or
9ae3f9
an inode, this change delays the removal of entries that are currently
9ae3f9
locked by one or more clients. Once all locks are released, the removal
9ae3f9
is processed.
9ae3f9
9ae3f9
It has also been improved the detection of stale inodes in the locking
9ae3f9
code of EC.
9ae3f9
9ae3f9
>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
9ae3f9
>Fixes: #990
9ae3f9
9ae3f9
Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
9ae3f9
BUG: 1812789
9ae3f9
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
9ae3f9
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
9ae3f9
Tested-by: RHGS Build Bot <nigelb@redhat.com>
9ae3f9
---
9ae3f9
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
9ae3f9
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
9ae3f9
 xlators/features/locks/src/common.h  |  43 +++++
9ae3f9
 xlators/features/locks/src/entrylk.c |  19 +--
9ae3f9
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
9ae3f9
 xlators/features/locks/src/locks.h   |  23 ++-
9ae3f9
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
9ae3f9
 7 files changed, 689 insertions(+), 114 deletions(-)
9ae3f9
9ae3f9
diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
9ae3f9
index ffcac07..db86296 100644
9ae3f9
--- a/xlators/cluster/ec/src/ec-locks.c
9ae3f9
+++ b/xlators/cluster/ec/src/ec-locks.c
9ae3f9
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
9ae3f9
     ec_t *ec = fop->xl->private;
9ae3f9
     ec_cbk_data_t *ans = NULL;
9ae3f9
     ec_cbk_data_t *cbk = NULL;
9ae3f9
-    uintptr_t locked = 0, notlocked = 0;
9ae3f9
+    uintptr_t locked = 0;
9ae3f9
+    int32_t good = 0;
9ae3f9
+    int32_t eagain = 0;
9ae3f9
+    int32_t estale = 0;
9ae3f9
     int32_t error = -1;
9ae3f9
 
9ae3f9
+    /* There are some errors that we'll handle in an special way while trying
9ae3f9
+     * to acquire a lock.
9ae3f9
+     *
9ae3f9
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
9ae3f9
+     *            consider that there's contention on the inode, so we consider
9ae3f9
+     *            the acquisition a failure and try again with a sequential
9ae3f9
+     *            blocking lock request. This will ensure that we get a lock on
9ae3f9
+     *            as many bricks as possible (ignoring EAGAIN here would cause
9ae3f9
+     *            unnecessary triggers of self-healing).
9ae3f9
+     *
9ae3f9
+     *            If it's found during a sequential blocking lock request, it's
9ae3f9
+     *            considered an error. Lock will only succeed if there are
9ae3f9
+     *            enough other bricks locked.
9ae3f9
+     *
9ae3f9
+     *   ESTALE:  This can appear during parallel or sequential lock request if
9ae3f9
+     *            the inode has just been unlinked. We consider this error is
9ae3f9
+     *            not recoverable, but we also don't consider it as fatal. So,
9ae3f9
+     *            if it happens during parallel lock, we won't attempt a
9ae3f9
+     *            sequential one unless there are EAGAIN errors on other
9ae3f9
+     *            bricks (and are enough to form a quorum), but if we reach
9ae3f9
+     *            quorum counting the ESTALE bricks, we consider the whole
9ae3f9
+     *            result of the operation is ESTALE instead of EIO.
9ae3f9
+     */
9ae3f9
+
9ae3f9
     list_for_each_entry(ans, &fop->cbk_list, list)
9ae3f9
     {
9ae3f9
         if (ans->op_ret >= 0) {
9ae3f9
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
9ae3f9
                 error = EIO;
9ae3f9
             }
9ae3f9
             locked |= ans->mask;
9ae3f9
+            good = ans->count;
9ae3f9
             cbk = ans;
9ae3f9
-        } else {
9ae3f9
-            if (ans->op_errno == EAGAIN) {
9ae3f9
-                switch (fop->uint32) {
9ae3f9
-                    case EC_LOCK_MODE_NONE:
9ae3f9
-                    case EC_LOCK_MODE_ALL:
9ae3f9
-                        /* Goal is to treat non-blocking lock as failure
9ae3f9
-                         * even if there is a single EAGAIN*/
9ae3f9
-                        notlocked |= ans->mask;
9ae3f9
-                        break;
9ae3f9
-                }
9ae3f9
-            }
9ae3f9
+        } else if (ans->op_errno == ESTALE) {
9ae3f9
+            estale += ans->count;
9ae3f9
+        } else if ((ans->op_errno == EAGAIN) &&
9ae3f9
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
9ae3f9
+            eagain += ans->count;
9ae3f9
         }
9ae3f9
     }
9ae3f9
 
9ae3f9
     if (error == -1) {
9ae3f9
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
9ae3f9
-            if (notlocked == 0) {
9ae3f9
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
9ae3f9
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
9ae3f9
+         * we retry with a sequential blocking lock request if needed.
9ae3f9
+         * Otherwise we succeed. */
9ae3f9
+        if ((good + eagain) >= ec->fragments) {
9ae3f9
+            if (eagain == 0) {
9ae3f9
                 if (fop->answer == NULL) {
9ae3f9
                     fop->answer = cbk;
9ae3f9
                 }
9ae3f9
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
9ae3f9
                     case EC_LOCK_MODE_NONE:
9ae3f9
                         error = EAGAIN;
9ae3f9
                         break;
9ae3f9
-
9ae3f9
                     case EC_LOCK_MODE_ALL:
9ae3f9
                         fop->uint32 = EC_LOCK_MODE_INC;
9ae3f9
                         break;
9ae3f9
-
9ae3f9
                     default:
9ae3f9
+                        /* This shouldn't happen because eagain cannot be > 0
9ae3f9
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
9ae3f9
                         error = EIO;
9ae3f9
                         break;
9ae3f9
                 }
9ae3f9
             }
9ae3f9
         } else {
9ae3f9
-            if (fop->answer && fop->answer->op_ret < 0)
9ae3f9
+            /* We have been unable to find enough candidates that will be able
9ae3f9
+             * to take the lock. If we have quorum on some answer, we return
9ae3f9
+             * it. Otherwise we check if ESTALE answers allow us to reach
9ae3f9
+             * quorum. If so, we return ESTALE. */
9ae3f9
+            if (fop->answer && fop->answer->op_ret < 0) {
9ae3f9
                 error = fop->answer->op_errno;
9ae3f9
-            else
9ae3f9
+            } else if ((good + eagain + estale) >= ec->fragments) {
9ae3f9
+                error = ESTALE;
9ae3f9
+            } else {
9ae3f9
                 error = EIO;
9ae3f9
+            }
9ae3f9
         }
9ae3f9
     }
9ae3f9
 
9ae3f9
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
9ae3f9
index 1406e70..0c52853 100644
9ae3f9
--- a/xlators/features/locks/src/common.c
9ae3f9
+++ b/xlators/features/locks/src/common.c
9ae3f9
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
9ae3f9
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
9ae3f9
         INIT_LIST_HEAD(&pl_inode->metalk_list);
9ae3f9
         INIT_LIST_HEAD(&pl_inode->queued_locks);
9ae3f9
+        INIT_LIST_HEAD(&pl_inode->waiting);
9ae3f9
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
9ae3f9
 
9ae3f9
         pl_inode->check_mlock_info = _gf_true;
9ae3f9
         pl_inode->mlock_enforced = _gf_false;
9ae3f9
 
9ae3f9
+        /* -2 means never looked up. -1 means something went wrong and link
9ae3f9
+         * tracking is disabled. */
9ae3f9
+        pl_inode->links = -2;
9ae3f9
+
9ae3f9
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
9ae3f9
         if (ret) {
9ae3f9
             pthread_mutex_destroy(&pl_inode->mutex);
9ae3f9
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
9ae3f9
     }
9ae3f9
 
9ae3f9
     return 0;
9ae3f9
-}
9ae3f9
\ No newline at end of file
9ae3f9
+}
9ae3f9
+
9ae3f9
+gf_boolean_t
9ae3f9
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
9ae3f9
+{
9ae3f9
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
9ae3f9
+        return _gf_true;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (is_lk_owner_null(owner)) {
9ae3f9
+        return _gf_false;
9ae3f9
+    }
9ae3f9
+    return _gf_true;
9ae3f9
+}
9ae3f9
+
9ae3f9
+static int32_t
9ae3f9
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
9ae3f9
+{
9ae3f9
+    inode_t *inode = NULL;
9ae3f9
+    int32_t error = 0;
9ae3f9
+
9ae3f9
+    if (loc->inode != NULL) {
9ae3f9
+        inode = inode_ref(loc->inode);
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (loc->parent == NULL) {
9ae3f9
+        error = EINVAL;
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (!gf_uuid_is_null(loc->gfid)) {
9ae3f9
+        inode = inode_find(loc->parent->table, loc->gfid);
9ae3f9
+        if (inode != NULL) {
9ae3f9
+            goto done;
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (loc->name == NULL) {
9ae3f9
+        error = EINVAL;
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
9ae3f9
+    if (inode == NULL) {
9ae3f9
+        /* We haven't found any inode. This means that the file doesn't exist
9ae3f9
+         * or that even if it exists, we don't have any knowledge about it, so
9ae3f9
+         * we don't have locks on it either, which is fine for our purposes. */
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+done:
9ae3f9
+    *pinode = inode;
9ae3f9
+
9ae3f9
+    return error;
9ae3f9
+}
9ae3f9
+
9ae3f9
+static gf_boolean_t
9ae3f9
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
9ae3f9
+                    struct timespec *now, struct list_head *contend)
9ae3f9
+{
9ae3f9
+    pl_dom_list_t *dom;
9ae3f9
+    pl_inode_lock_t *lock;
9ae3f9
+    gf_boolean_t has_owners = _gf_false;
9ae3f9
+
9ae3f9
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
9ae3f9
+    {
9ae3f9
+        list_for_each_entry(lock, &dom->inodelk_list, list)
9ae3f9
+        {
9ae3f9
+            /* If the lock belongs to the same client, we assume it's related
9ae3f9
+             * to the same operation, so we allow the removal to continue. */
9ae3f9
+            if (lock->client == client) {
9ae3f9
+                continue;
9ae3f9
+            }
9ae3f9
+            /* If the lock belongs to an internal process, we don't block the
9ae3f9
+             * removal. */
9ae3f9
+            if (lock->client_pid < 0) {
9ae3f9
+                continue;
9ae3f9
+            }
9ae3f9
+            if (contend == NULL) {
9ae3f9
+                return _gf_true;
9ae3f9
+            }
9ae3f9
+            has_owners = _gf_true;
9ae3f9
+            inodelk_contention_notify_check(xl, lock, now, contend);
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    return has_owners;
9ae3f9
+}
9ae3f9
+
9ae3f9
+int32_t
9ae3f9
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
9ae3f9
+                        pl_inode_t **ppl_inode, struct list_head *contend)
9ae3f9
+{
9ae3f9
+    struct timespec now;
9ae3f9
+    inode_t *inode;
9ae3f9
+    pl_inode_t *pl_inode;
9ae3f9
+    int32_t error;
9ae3f9
+
9ae3f9
+    pl_inode = NULL;
9ae3f9
+
9ae3f9
+    error = pl_inode_from_loc(loc, &inode;;
9ae3f9
+    if ((error != 0) || (inode == NULL)) {
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    pl_inode = pl_inode_get(xl, inode, NULL);
9ae3f9
+    if (pl_inode == NULL) {
9ae3f9
+        inode_unref(inode);
9ae3f9
+        error = ENOMEM;
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    /* pl_inode_from_loc() already increments ref count for inode, so
9ae3f9
+     * we only assign here our reference. */
9ae3f9
+    pl_inode->inode = inode;
9ae3f9
+
9ae3f9
+    timespec_now(&now;;
9ae3f9
+
9ae3f9
+    pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+    if (pl_inode->removed) {
9ae3f9
+        error = ESTALE;
9ae3f9
+        goto unlock;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
9ae3f9
+        error = -1;
9ae3f9
+        /* We skip the unlock here because the caller must create a stub when
9ae3f9
+         * we return -1 and do a call to pl_inode_remove_complete(), which
9ae3f9
+         * assumes the lock is still acquired and will release it once
9ae3f9
+         * everything else is prepared. */
9ae3f9
+        goto done;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    pl_inode->is_locked = _gf_true;
9ae3f9
+    pl_inode->remove_running++;
9ae3f9
+
9ae3f9
+unlock:
9ae3f9
+    pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+done:
9ae3f9
+    *ppl_inode = pl_inode;
9ae3f9
+
9ae3f9
+    return error;
9ae3f9
+}
9ae3f9
+
9ae3f9
+int32_t
9ae3f9
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
9ae3f9
+                         struct list_head *contend)
9ae3f9
+{
9ae3f9
+    pl_inode_lock_t *lock;
9ae3f9
+    int32_t error = -1;
9ae3f9
+
9ae3f9
+    if (stub != NULL) {
9ae3f9
+        list_add_tail(&stub->list, &pl_inode->waiting);
9ae3f9
+        pl_inode->is_locked = _gf_true;
9ae3f9
+    } else {
9ae3f9
+        error = ENOMEM;
9ae3f9
+
9ae3f9
+        while (!list_empty(contend)) {
9ae3f9
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
9ae3f9
+            list_del_init(&lock->list);
9ae3f9
+            __pl_inodelk_unref(lock);
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+    if (error < 0) {
9ae3f9
+        inodelk_contention_notify(xl, contend);
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    inode_unref(pl_inode->inode);
9ae3f9
+
9ae3f9
+    return error;
9ae3f9
+}
9ae3f9
+
9ae3f9
+void
9ae3f9
+pl_inode_remove_wake(struct list_head *list)
9ae3f9
+{
9ae3f9
+    call_stub_t *stub;
9ae3f9
+
9ae3f9
+    while (!list_empty(list)) {
9ae3f9
+        stub = list_first_entry(list, call_stub_t, list);
9ae3f9
+        list_del_init(&stub->list);
9ae3f9
+
9ae3f9
+        call_resume(stub);
9ae3f9
+    }
9ae3f9
+}
9ae3f9
+
9ae3f9
+void
9ae3f9
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
9ae3f9
+{
9ae3f9
+    struct list_head contend, granted;
9ae3f9
+    struct timespec now;
9ae3f9
+    pl_dom_list_t *dom;
9ae3f9
+
9ae3f9
+    if (pl_inode == NULL) {
9ae3f9
+        return;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    INIT_LIST_HEAD(&contend);
9ae3f9
+    INIT_LIST_HEAD(&granted);
9ae3f9
+    timespec_now(&now;;
9ae3f9
+
9ae3f9
+    pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+    if (error == 0) {
9ae3f9
+        if (pl_inode->links >= 0) {
9ae3f9
+            pl_inode->links--;
9ae3f9
+        }
9ae3f9
+        if (pl_inode->links == 0) {
9ae3f9
+            pl_inode->removed = _gf_true;
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    pl_inode->remove_running--;
9ae3f9
+
9ae3f9
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
9ae3f9
+        pl_inode->is_locked = _gf_false;
9ae3f9
+
9ae3f9
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
9ae3f9
+        {
9ae3f9
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
9ae3f9
+                                        &contend);
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+    unwind_granted_inodes(xl, pl_inode, &granted);
9ae3f9
+
9ae3f9
+    inodelk_contention_notify(xl, &contend);
9ae3f9
+
9ae3f9
+    inode_unref(pl_inode->inode);
9ae3f9
+}
9ae3f9
+
9ae3f9
+void
9ae3f9
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
9ae3f9
+                         struct list_head *list)
9ae3f9
+{
9ae3f9
+    call_stub_t *stub, *tmp;
9ae3f9
+
9ae3f9
+    if (!pl_inode->is_locked) {
9ae3f9
+        return;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
9ae3f9
+    {
9ae3f9
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
9ae3f9
+                                 NULL)) {
9ae3f9
+            list_move_tail(&stub->list, list);
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+}
9ae3f9
+
9ae3f9
+/* This function determines if an inodelk attempt can be done now or it needs
9ae3f9
+ * to wait.
9ae3f9
+ *
9ae3f9
+ * Possible return values:
9ae3f9
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
9ae3f9
+ *        inode has been deleted previously by unlink/rmdir/rename
9ae3f9
+ *   = 0: The lock can be attempted.
9ae3f9
+ *   > 0: The lock needs to wait because a conflicting remove operation is
9ae3f9
+ *        ongoing.
9ae3f9
+ */
9ae3f9
+int32_t
9ae3f9
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
9ae3f9
+{
9ae3f9
+    pl_dom_list_t *dom;
9ae3f9
+    pl_inode_lock_t *ilock;
9ae3f9
+
9ae3f9
+    /* If the inode has been deleted, we won't allow any lock. */
9ae3f9
+    if (pl_inode->removed) {
9ae3f9
+        return -ESTALE;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    /* We only synchronize with locks made for regular operations coming from
9ae3f9
+     * the user. Locks done for internal purposes are hard to control and could
9ae3f9
+     * lead to long delays or deadlocks quite easily. */
9ae3f9
+    if (lock->client_pid < 0) {
9ae3f9
+        return 0;
9ae3f9
+    }
9ae3f9
+    if (!pl_inode->is_locked) {
9ae3f9
+        return 0;
9ae3f9
+    }
9ae3f9
+    if (pl_inode->remove_running > 0) {
9ae3f9
+        return 1;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
9ae3f9
+    {
9ae3f9
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
9ae3f9
+        {
9ae3f9
+            /* If a lock from the same client is already granted, we allow this
9ae3f9
+             * one to continue. This is necessary to prevent deadlocks when
9ae3f9
+             * multiple locks are taken for the same operation.
9ae3f9
+             *
9ae3f9
+             * On the other side it's unlikely that the same client sends
9ae3f9
+             * completely unrelated locks for the same inode.
9ae3f9
+             */
9ae3f9
+            if (ilock->client == lock->client) {
9ae3f9
+                return 0;
9ae3f9
+            }
9ae3f9
+        }
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    return 1;
9ae3f9
+}
9ae3f9
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
9ae3f9
index ea86b96..6c81ac3 100644
9ae3f9
--- a/xlators/features/locks/src/common.h
9ae3f9
+++ b/xlators/features/locks/src/common.h
9ae3f9
@@ -105,6 +105,15 @@ void
9ae3f9
 __pl_inodelk_unref(pl_inode_lock_t *lock);
9ae3f9
 
9ae3f9
 void
9ae3f9
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
+                            struct list_head *granted, pl_dom_list_t *dom,
9ae3f9
+                            struct timespec *now, struct list_head *contend);
9ae3f9
+
9ae3f9
+void
9ae3f9
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
+                      struct list_head *granted);
9ae3f9
+
9ae3f9
+void
9ae3f9
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
                           pl_dom_list_t *dom, struct timespec *now,
9ae3f9
                           struct list_head *contend);
9ae3f9
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
9ae3f9
 void
9ae3f9
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
9ae3f9
 
9ae3f9
+void
9ae3f9
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
9ae3f9
+                                struct timespec *now,
9ae3f9
+                                struct list_head *contend);
9ae3f9
+
9ae3f9
+void
9ae3f9
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
9ae3f9
+                                struct timespec *now,
9ae3f9
+                                struct list_head *contend);
9ae3f9
+
9ae3f9
 gf_boolean_t
9ae3f9
 pl_does_monkey_want_stuck_lock();
9ae3f9
 
9ae3f9
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
9ae3f9
 int
9ae3f9
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
9ae3f9
 
9ae3f9
+gf_boolean_t
9ae3f9
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
9ae3f9
+
9ae3f9
+int32_t
9ae3f9
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
9ae3f9
+                        pl_inode_t **ppl_inode, struct list_head *contend);
9ae3f9
+
9ae3f9
+int32_t
9ae3f9
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
9ae3f9
+                         struct list_head *contend);
9ae3f9
+
9ae3f9
+void
9ae3f9
+pl_inode_remove_wake(struct list_head *list);
9ae3f9
+
9ae3f9
+void
9ae3f9
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
9ae3f9
+
9ae3f9
+void
9ae3f9
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
9ae3f9
+                         struct list_head *list);
9ae3f9
+
9ae3f9
+int32_t
9ae3f9
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
9ae3f9
+
9ae3f9
 #endif /* __COMMON_H__ */
9ae3f9
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
9ae3f9
index 93c649c..b97836f 100644
9ae3f9
--- a/xlators/features/locks/src/entrylk.c
9ae3f9
+++ b/xlators/features/locks/src/entrylk.c
9ae3f9
@@ -197,9 +197,9 @@ out:
9ae3f9
     return revoke_lock;
9ae3f9
 }
9ae3f9
 
9ae3f9
-static gf_boolean_t
9ae3f9
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
9ae3f9
-                                  struct timespec *now)
9ae3f9
+void
9ae3f9
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
9ae3f9
+                                struct timespec *now, struct list_head *contend)
9ae3f9
 {
9ae3f9
     posix_locks_private_t *priv;
9ae3f9
     int64_t elapsed;
9ae3f9
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
9ae3f9
     /* If this lock is in a list, it means that we are about to send a
9ae3f9
      * notification for it, so no need to do anything else. */
9ae3f9
     if (!list_empty(&lock->contend)) {
9ae3f9
-        return _gf_false;
9ae3f9
+        return;
9ae3f9
     }
9ae3f9
 
9ae3f9
     elapsed = now->tv_sec;
9ae3f9
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
9ae3f9
         elapsed--;
9ae3f9
     }
9ae3f9
     if (elapsed < priv->notify_contention_delay) {
9ae3f9
-        return _gf_false;
9ae3f9
+        return;
9ae3f9
     }
9ae3f9
 
9ae3f9
     /* All contention notifications will be sent outside of the locked
9ae3f9
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
9ae3f9
 
9ae3f9
     lock->contention_time = *now;
9ae3f9
 
9ae3f9
-    return _gf_true;
9ae3f9
+    list_add_tail(&lock->contend, contend);
9ae3f9
 }
9ae3f9
 
9ae3f9
 void
9ae3f9
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
9ae3f9
                     break;
9ae3f9
                 }
9ae3f9
             }
9ae3f9
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
9ae3f9
-                list_add_tail(&tmp->contend, contend);
9ae3f9
-            }
9ae3f9
+            entrylk_contention_notify_check(this, tmp, now, contend);
9ae3f9
         }
9ae3f9
     }
9ae3f9
 
9ae3f9
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
9ae3f9
 
9ae3f9
         if (bl_ret == 0) {
9ae3f9
-            list_add(&bl->blocked_locks, granted);
9ae3f9
+            list_add_tail(&bl->blocked_locks, granted);
9ae3f9
         }
9ae3f9
     }
9ae3f9
-    return;
9ae3f9
 }
9ae3f9
 
9ae3f9
 /* Grants locks if possible which are blocked on a lock */
9ae3f9
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
9ae3f9
index 24dee49..1a07243 100644
9ae3f9
--- a/xlators/features/locks/src/inodelk.c
9ae3f9
+++ b/xlators/features/locks/src/inodelk.c
9ae3f9
@@ -231,9 +231,9 @@ out:
9ae3f9
     return revoke_lock;
9ae3f9
 }
9ae3f9
 
9ae3f9
-static gf_boolean_t
9ae3f9
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
9ae3f9
-                                  struct timespec *now)
9ae3f9
+void
9ae3f9
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
9ae3f9
+                                struct timespec *now, struct list_head *contend)
9ae3f9
 {
9ae3f9
     posix_locks_private_t *priv;
9ae3f9
     int64_t elapsed;
9ae3f9
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
9ae3f9
     /* If this lock is in a list, it means that we are about to send a
9ae3f9
      * notification for it, so no need to do anything else. */
9ae3f9
     if (!list_empty(&lock->contend)) {
9ae3f9
-        return _gf_false;
9ae3f9
+        return;
9ae3f9
     }
9ae3f9
 
9ae3f9
     elapsed = now->tv_sec;
9ae3f9
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
9ae3f9
         elapsed--;
9ae3f9
     }
9ae3f9
     if (elapsed < priv->notify_contention_delay) {
9ae3f9
-        return _gf_false;
9ae3f9
+        return;
9ae3f9
     }
9ae3f9
 
9ae3f9
     /* All contention notifications will be sent outside of the locked
9ae3f9
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
9ae3f9
 
9ae3f9
     lock->contention_time = *now;
9ae3f9
 
9ae3f9
-    return _gf_true;
9ae3f9
+    list_add_tail(&lock->contend, contend);
9ae3f9
 }
9ae3f9
 
9ae3f9
 void
9ae3f9
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
9ae3f9
                     break;
9ae3f9
                 }
9ae3f9
             }
9ae3f9
-            if (__inodelk_needs_contention_notify(this, l, now)) {
9ae3f9
-                list_add_tail(&l->contend, contend);
9ae3f9
-            }
9ae3f9
+            inodelk_contention_notify_check(this, l, now, contend);
9ae3f9
         }
9ae3f9
     }
9ae3f9
 
9ae3f9
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
9ae3f9
                struct list_head *contend)
9ae3f9
 {
9ae3f9
     pl_inode_lock_t *conf = NULL;
9ae3f9
-    int ret = -EINVAL;
9ae3f9
+    int ret;
9ae3f9
 
9ae3f9
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
9ae3f9
-    if (conf) {
9ae3f9
-        ret = __lock_blocked_add(this, dom, lock, can_block);
9ae3f9
-        goto out;
9ae3f9
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
9ae3f9
+    if (ret < 0) {
9ae3f9
+        return ret;
9ae3f9
+    }
9ae3f9
+    if (ret == 0) {
9ae3f9
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
9ae3f9
+    }
9ae3f9
+    if ((ret > 0) || (conf != NULL)) {
9ae3f9
+        return __lock_blocked_add(this, dom, lock, can_block);
9ae3f9
     }
9ae3f9
 
9ae3f9
     /* To prevent blocked locks starvation, check if there are any blocked
9ae3f9
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
9ae3f9
                    "starvation");
9ae3f9
         }
9ae3f9
 
9ae3f9
-        ret = __lock_blocked_add(this, dom, lock, can_block);
9ae3f9
-        goto out;
9ae3f9
+        return __lock_blocked_add(this, dom, lock, can_block);
9ae3f9
     }
9ae3f9
     __pl_inodelk_ref(lock);
9ae3f9
     gettimeofday(&lock->granted_time, NULL);
9ae3f9
     list_add(&lock->list, &dom->inodelk_list);
9ae3f9
 
9ae3f9
-    ret = 0;
9ae3f9
-
9ae3f9
-out:
9ae3f9
-    return ret;
9ae3f9
+    return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
 /* Return true if the two inodelks have exactly same lock boundaries */
9ae3f9
@@ -529,12 +528,11 @@ out:
9ae3f9
     return conf;
9ae3f9
 }
9ae3f9
 
9ae3f9
-static void
9ae3f9
+void
9ae3f9
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
                             struct list_head *granted, pl_dom_list_t *dom,
9ae3f9
                             struct timespec *now, struct list_head *contend)
9ae3f9
 {
9ae3f9
-    int bl_ret = 0;
9ae3f9
     pl_inode_lock_t *bl = NULL;
9ae3f9
     pl_inode_lock_t *tmp = NULL;
9ae3f9
 
9ae3f9
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
     {
9ae3f9
         list_del_init(&bl->blocked_locks);
9ae3f9
 
9ae3f9
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
9ae3f9
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
9ae3f9
 
9ae3f9
-        if (bl_ret == 0) {
9ae3f9
-            list_add(&bl->blocked_locks, granted);
9ae3f9
+        if (bl->status != -EAGAIN) {
9ae3f9
+            list_add_tail(&bl->blocked_locks, granted);
9ae3f9
         }
9ae3f9
     }
9ae3f9
-    return;
9ae3f9
 }
9ae3f9
 
9ae3f9
-/* Grant all inodelks blocked on a lock */
9ae3f9
 void
9ae3f9
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
-                          pl_dom_list_t *dom, struct timespec *now,
9ae3f9
-                          struct list_head *contend)
9ae3f9
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
+                      struct list_head *granted)
9ae3f9
 {
9ae3f9
-    struct list_head granted;
9ae3f9
     pl_inode_lock_t *lock;
9ae3f9
     pl_inode_lock_t *tmp;
9ae3f9
+    int32_t op_ret;
9ae3f9
+    int32_t op_errno;
9ae3f9
 
9ae3f9
-    INIT_LIST_HEAD(&granted);
9ae3f9
-
9ae3f9
-    pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
-    {
9ae3f9
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
9ae3f9
-                                    contend);
9ae3f9
-    }
9ae3f9
-    pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
-
9ae3f9
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
9ae3f9
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
9ae3f9
     {
9ae3f9
-        gf_log(this->name, GF_LOG_TRACE,
9ae3f9
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
9ae3f9
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
9ae3f9
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
9ae3f9
-               lock->user_flock.l_len);
9ae3f9
-
9ae3f9
+        if (lock->status == 0) {
9ae3f9
+            op_ret = 0;
9ae3f9
+            op_errno = 0;
9ae3f9
+            gf_log(this->name, GF_LOG_TRACE,
9ae3f9
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
9ae3f9
+                   " => Granted",
9ae3f9
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
9ae3f9
+                   lock->client_pid, lkowner_utoa(&lock->owner),
9ae3f9
+                   lock->user_flock.l_start, lock->user_flock.l_len);
9ae3f9
+        } else {
9ae3f9
+            op_ret = -1;
9ae3f9
+            op_errno = -lock->status;
9ae3f9
+        }
9ae3f9
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
9ae3f9
-                     0, 0, lock->volume);
9ae3f9
+                     op_ret, op_errno, lock->volume);
9ae3f9
 
9ae3f9
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
9ae3f9
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
9ae3f9
         lock->frame = NULL;
9ae3f9
     }
9ae3f9
 
9ae3f9
     pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
     {
9ae3f9
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
9ae3f9
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
9ae3f9
         {
9ae3f9
             list_del_init(&lock->blocked_locks);
9ae3f9
             __pl_inodelk_unref(lock);
9ae3f9
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
     pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
 }
9ae3f9
 
9ae3f9
+/* Grant all inodelks blocked on a lock */
9ae3f9
+void
9ae3f9
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
9ae3f9
+                          pl_dom_list_t *dom, struct timespec *now,
9ae3f9
+                          struct list_head *contend)
9ae3f9
+{
9ae3f9
+    struct list_head granted;
9ae3f9
+
9ae3f9
+    INIT_LIST_HEAD(&granted);
9ae3f9
+
9ae3f9
+    pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
+    {
9ae3f9
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
9ae3f9
+                                    contend);
9ae3f9
+    }
9ae3f9
+    pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+    unwind_granted_inodes(this, pl_inode, &granted);
9ae3f9
+}
9ae3f9
+
9ae3f9
 static void
9ae3f9
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
9ae3f9
 {
9ae3f9
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
9ae3f9
                  * and blocked lists, then this means that a parallel
9ae3f9
                  * unlock on another inodelk (L2 say) may have 'granted'
9ae3f9
                  * L1 and added it to 'granted' list in
9ae3f9
-                 * __grant_blocked_node_locks() (although using the
9ae3f9
+                 * __grant_blocked_inode_locks() (although using the
9ae3f9
                  * 'blocked_locks' member). In that case, the cleanup
9ae3f9
                  * codepath must try and grant other overlapping
9ae3f9
                  * blocked inodelks from other clients, now that L1 is
9ae3f9
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
9ae3f9
     gf_boolean_t need_inode_unref = _gf_false;
9ae3f9
     struct list_head *pcontend = NULL;
9ae3f9
     struct list_head contend;
9ae3f9
+    struct list_head wake;
9ae3f9
     struct timespec now = {};
9ae3f9
     short fl_type;
9ae3f9
 
9ae3f9
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
9ae3f9
         timespec_now(&now;;
9ae3f9
     }
9ae3f9
 
9ae3f9
+    INIT_LIST_HEAD(&wake);
9ae3f9
+
9ae3f9
     if (ctx)
9ae3f9
         pthread_mutex_lock(&ctx->lock);
9ae3f9
     pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
9ae3f9
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
9ae3f9
                        lock->client_pid, lkowner_utoa(&lock->owner),
9ae3f9
                        lock->user_flock.l_start, lock->user_flock.l_len);
9ae3f9
-                if (can_block)
9ae3f9
+                if (can_block) {
9ae3f9
                     unref = _gf_false;
9ae3f9
-                /* For all but the case where a non-blocking
9ae3f9
-                 * lock attempt fails, the extra ref taken at
9ae3f9
-                 * the start of this function must be negated.
9ae3f9
-                 */
9ae3f9
-                else
9ae3f9
-                    need_inode_unref = _gf_true;
9ae3f9
+                }
9ae3f9
             }
9ae3f9
-
9ae3f9
-            if (ctx && (!ret || can_block))
9ae3f9
+            /* For all but the case where a non-blocking lock attempt fails
9ae3f9
+             * with -EAGAIN, the extra ref taken at the start of this function
9ae3f9
+             * must be negated. */
9ae3f9
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
9ae3f9
+            if (ctx && !need_inode_unref) {
9ae3f9
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
9ae3f9
+            }
9ae3f9
         } else {
9ae3f9
             /* Irrespective of whether unlock succeeds or not,
9ae3f9
              * the extra inode ref that was done at the start of
9ae3f9
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
9ae3f9
             list_del_init(&retlock->client_list);
9ae3f9
             __pl_inodelk_unref(retlock);
9ae3f9
 
9ae3f9
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
9ae3f9
+
9ae3f9
             ret = 0;
9ae3f9
         }
9ae3f9
     out:
9ae3f9
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
9ae3f9
     if (ctx)
9ae3f9
         pthread_mutex_unlock(&ctx->lock);
9ae3f9
 
9ae3f9
+    pl_inode_remove_wake(&wake);
9ae3f9
+
9ae3f9
     /* The following (extra) unref corresponds to the ref that
9ae3f9
      * was done at the time the lock was granted.
9ae3f9
      */
9ae3f9
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
9ae3f9
                                  inode);
9ae3f9
 
9ae3f9
             if (ret < 0) {
9ae3f9
-                if ((can_block) && (F_UNLCK != lock_type)) {
9ae3f9
-                    goto out;
9ae3f9
+                if (ret == -EAGAIN) {
9ae3f9
+                    if (can_block && (F_UNLCK != lock_type)) {
9ae3f9
+                        goto out;
9ae3f9
+                    }
9ae3f9
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
9ae3f9
+                } else {
9ae3f9
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
9ae3f9
                 }
9ae3f9
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
9ae3f9
                 op_errno = -ret;
9ae3f9
                 goto unwind;
9ae3f9
             }
9ae3f9
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
9ae3f9
index aa267de..6666feb 100644
9ae3f9
--- a/xlators/features/locks/src/locks.h
9ae3f9
+++ b/xlators/features/locks/src/locks.h
9ae3f9
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
9ae3f9
 
9ae3f9
     struct list_head client_list; /* list of all locks from a client */
9ae3f9
     short fl_type;
9ae3f9
+
9ae3f9
+    int32_t status; /* Error code when we try to grant a lock in blocked
9ae3f9
+                       state */
9ae3f9
 };
9ae3f9
 typedef struct __pl_inode_lock pl_inode_lock_t;
9ae3f9
 
9ae3f9
@@ -164,13 +167,14 @@ struct __pl_inode {
9ae3f9
     struct list_head rw_list;            /* list of waiting r/w requests */
9ae3f9
     struct list_head reservelk_list;     /* list of reservelks */
9ae3f9
     struct list_head blocked_reservelks; /* list of blocked reservelks */
9ae3f9
-    struct list_head
9ae3f9
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
9ae3f9
-    struct list_head metalk_list; /* Meta lock list */
9ae3f9
-                                  /* This is to store the incoming lock
9ae3f9
-                                     requests while meta lock is enabled */
9ae3f9
-    struct list_head queued_locks;
9ae3f9
-    int mandatory; /* if mandatory locking is enabled */
9ae3f9
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
9ae3f9
+                                            reserve is held*/
9ae3f9
+    struct list_head metalk_list;        /* Meta lock list */
9ae3f9
+    struct list_head queued_locks;       /* This is to store the incoming lock
9ae3f9
+                                            requests while meta lock is enabled */
9ae3f9
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
9ae3f9
+                                 the inode. */
9ae3f9
+    int mandatory;            /* if mandatory locking is enabled */
9ae3f9
 
9ae3f9
     inode_t *refkeeper; /* hold refs on an inode while locks are
9ae3f9
                            held to prevent pruning */
9ae3f9
@@ -197,6 +201,11 @@ struct __pl_inode {
9ae3f9
     */
9ae3f9
     int fop_wind_count;
9ae3f9
     pthread_cond_t check_fop_wind_count;
9ae3f9
+
9ae3f9
+    int32_t links;           /* Number of hard links the inode has. */
9ae3f9
+    uint32_t remove_running; /* Number of remove operations running. */
9ae3f9
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
9ae3f9
+    gf_boolean_t removed;    /* The inode has been deleted. */
9ae3f9
 };
9ae3f9
 typedef struct __pl_inode pl_inode_t;
9ae3f9
 
9ae3f9
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
9ae3f9
index 7887b82..5ae0125 100644
9ae3f9
--- a/xlators/features/locks/src/posix.c
9ae3f9
+++ b/xlators/features/locks/src/posix.c
9ae3f9
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
9ae3f9
         }                                                                      \
9ae3f9
     } while (0)
9ae3f9
 
9ae3f9
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
9ae3f9
+                        _args...)                                              \
9ae3f9
+    ({                                                                         \
9ae3f9
+        struct list_head contend;                                              \
9ae3f9
+        pl_inode_t *__pl_inode;                                                \
9ae3f9
+        call_stub_t *__stub;                                                   \
9ae3f9
+        int32_t __error;                                                       \
9ae3f9
+        INIT_LIST_HEAD(&contend);                                              \
9ae3f9
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
9ae3f9
+                                          &__pl_inode, &contend);              \
9ae3f9
+        if (__error < 0) {                                                     \
9ae3f9
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
9ae3f9
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
9ae3f9
+                                               &contend);                      \
9ae3f9
+        } else if (__error == 0) {                                             \
9ae3f9
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
9ae3f9
+                                  _loc2);                                      \
9ae3f9
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
9ae3f9
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
9ae3f9
+        }                                                                      \
9ae3f9
+        __error;                                                               \
9ae3f9
+    })
9ae3f9
+
9ae3f9
 gf_boolean_t
9ae3f9
 pl_has_xdata_requests(dict_t *xdata)
9ae3f9
 {
9ae3f9
@@ -2969,11 +2992,85 @@ out:
9ae3f9
     return ret;
9ae3f9
 }
9ae3f9
 
9ae3f9
+static int32_t
9ae3f9
+pl_request_link_count(dict_t **pxdata)
9ae3f9
+{
9ae3f9
+    dict_t *xdata;
9ae3f9
+
9ae3f9
+    xdata = *pxdata;
9ae3f9
+    if (xdata == NULL) {
9ae3f9
+        xdata = dict_new();
9ae3f9
+        if (xdata == NULL) {
9ae3f9
+            return ENOMEM;
9ae3f9
+        }
9ae3f9
+    } else {
9ae3f9
+        dict_ref(xdata);
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
9ae3f9
+        dict_unref(xdata);
9ae3f9
+        return ENOMEM;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    *pxdata = xdata;
9ae3f9
+
9ae3f9
+    return 0;
9ae3f9
+}
9ae3f9
+
9ae3f9
+static int32_t
9ae3f9
+pl_check_link_count(dict_t *xdata)
9ae3f9
+{
9ae3f9
+    int32_t count;
9ae3f9
+
9ae3f9
+    /* In case we are unable to read the link count from xdata, we take a
9ae3f9
+     * conservative approach and return -2, which will prevent the inode from
9ae3f9
+     * being considered deleted. In fact it will cause link tracking for this
9ae3f9
+     * inode to be disabled completely to avoid races. */
9ae3f9
+
9ae3f9
+    if (xdata == NULL) {
9ae3f9
+        return -2;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
9ae3f9
+        return -2;
9ae3f9
+    }
9ae3f9
+
9ae3f9
+    return count;
9ae3f9
+}
9ae3f9
+
9ae3f9
 int32_t
9ae3f9
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
9ae3f9
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
9ae3f9
               struct iatt *postparent)
9ae3f9
 {
9ae3f9
+    pl_inode_t *pl_inode;
9ae3f9
+
9ae3f9
+    if (op_ret >= 0) {
9ae3f9
+        pl_inode = pl_inode_get(this, inode, NULL);
9ae3f9
+        if (pl_inode == NULL) {
9ae3f9
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
9ae3f9
+                            NULL);
9ae3f9
+            return 0;
9ae3f9
+        }
9ae3f9
+
9ae3f9
+        pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+        /* We only update the link count if we previously didn't know it.
9ae3f9
+         * Doing it always can lead to races since lookup is not executed
9ae3f9
+         * atomically most of the times. */
9ae3f9
+        if (pl_inode->links == -2) {
9ae3f9
+            pl_inode->links = pl_check_link_count(xdata);
9ae3f9
+            if (buf->ia_type == IA_IFDIR) {
9ae3f9
+                /* Directories have at least 2 links. To avoid special handling
9ae3f9
+                 * for directories, we simply decrement the value here to make
9ae3f9
+                 * them equivalent to regular files. */
9ae3f9
+                pl_inode->links--;
9ae3f9
+            }
9ae3f9
+        }
9ae3f9
+
9ae3f9
+        pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
+    }
9ae3f9
+
9ae3f9
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
9ae3f9
                     postparent);
9ae3f9
     return 0;
9ae3f9
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
9ae3f9
 int32_t
9ae3f9
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
9ae3f9
 {
9ae3f9
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
9ae3f9
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
9ae3f9
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
9ae3f9
+    int32_t error;
9ae3f9
+
9ae3f9
+    error = pl_request_link_count(&xdata);
9ae3f9
+    if (error == 0) {
9ae3f9
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
9ae3f9
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
9ae3f9
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
9ae3f9
+        dict_unref(xdata);
9ae3f9
+    } else {
9ae3f9
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
9ae3f9
+    }
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -3792,6 +3897,10 @@ unlock:
9ae3f9
             gf_proc_dump_write("posixlk-count", "%d", count);
9ae3f9
             __dump_posixlks(pl_inode);
9ae3f9
         }
9ae3f9
+
9ae3f9
+        gf_proc_dump_write("links", "%d", pl_inode->links);
9ae3f9
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
9ae3f9
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
9ae3f9
     }
9ae3f9
     pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
 
9ae3f9
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
9ae3f9
               struct iatt *postoldparent, struct iatt *prenewparent,
9ae3f9
               struct iatt *postnewparent, dict_t *xdata)
9ae3f9
 {
9ae3f9
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
9ae3f9
+
9ae3f9
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
9ae3f9
                     postoldparent, prenewparent, postnewparent, xdata);
9ae3f9
+
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -4146,10 +4258,15 @@ int32_t
9ae3f9
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
9ae3f9
           dict_t *xdata)
9ae3f9
 {
9ae3f9
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
9ae3f9
+    int32_t error;
9ae3f9
+
9ae3f9
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
9ae3f9
+                            pl_rename_cbk, oldloc, newloc, xdata);
9ae3f9
+    if (error > 0) {
9ae3f9
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
9ae3f9
+                            NULL, NULL);
9ae3f9
+    }
9ae3f9
 
9ae3f9
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
9ae3f9
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
9ae3f9
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
9ae3f9
               dict_t *xdata)
9ae3f9
 {
9ae3f9
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
9ae3f9
+
9ae3f9
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
9ae3f9
                     postparent, xdata);
9ae3f9
+
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -4282,9 +4402,14 @@ int32_t
9ae3f9
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
9ae3f9
           dict_t *xdata)
9ae3f9
 {
9ae3f9
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
9ae3f9
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
9ae3f9
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
9ae3f9
+    int32_t error;
9ae3f9
+
9ae3f9
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
9ae3f9
+                            pl_unlink_cbk, loc, xflag, xdata);
9ae3f9
+    if (error > 0) {
9ae3f9
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
9ae3f9
+    }
9ae3f9
+
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
9ae3f9
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
9ae3f9
              dict_t *xdata)
9ae3f9
 {
9ae3f9
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
9ae3f9
+
9ae3f9
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
9ae3f9
                                postparent, xdata);
9ae3f9
+
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -4360,9 +4488,14 @@ int
9ae3f9
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
9ae3f9
          dict_t *xdata)
9ae3f9
 {
9ae3f9
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
9ae3f9
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
9ae3f9
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
9ae3f9
+    int32_t error;
9ae3f9
+
9ae3f9
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
9ae3f9
+                            pl_rmdir_cbk, loc, xflags, xdata);
9ae3f9
+    if (error > 0) {
9ae3f9
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
9ae3f9
+    }
9ae3f9
+
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
9ae3f9
             int32_t op_errno, inode_t *inode, struct iatt *buf,
9ae3f9
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
9ae3f9
 {
9ae3f9
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
9ae3f9
+
9ae3f9
+    if (op_ret >= 0) {
9ae3f9
+        pthread_mutex_lock(&pl_inode->mutex);
9ae3f9
+
9ae3f9
+        /* TODO: can happen pl_inode->links == 0 ? */
9ae3f9
+        if (pl_inode->links >= 0) {
9ae3f9
+            pl_inode->links++;
9ae3f9
+        }
9ae3f9
+
9ae3f9
+        pthread_mutex_unlock(&pl_inode->mutex);
9ae3f9
+    }
9ae3f9
+
9ae3f9
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
9ae3f9
                                preparent, postparent, xdata);
9ae3f9
     return 0;
9ae3f9
@@ -4401,9 +4547,18 @@ int
9ae3f9
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
9ae3f9
         dict_t *xdata)
9ae3f9
 {
9ae3f9
+    pl_inode_t *pl_inode;
9ae3f9
+
9ae3f9
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
9ae3f9
+    if (pl_inode == NULL) {
9ae3f9
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
9ae3f9
+                            NULL);
9ae3f9
+        return 0;
9ae3f9
+    }
9ae3f9
+
9ae3f9
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
9ae3f9
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
9ae3f9
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
9ae3f9
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
9ae3f9
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
9ae3f9
     return 0;
9ae3f9
 }
9ae3f9
 
9ae3f9
-- 
9ae3f9
1.8.3.1
9ae3f9