14f8ab
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
14f8ab
From: Xavi Hernandez <xhernandez@redhat.com>
14f8ab
Date: Fri, 12 Jun 2020 00:06:36 +0200
14f8ab
Subject: [PATCH 455/456] locks: prevent deletion of locked entries
14f8ab
14f8ab
To keep consistency inside transactions started by locking an entry or
14f8ab
an inode, this change delays the removal of entries that are currently
14f8ab
locked by one or more clients. Once all locks are released, the removal
14f8ab
is processed.
14f8ab
14f8ab
The detection of stale inodes in the locking code of EC has also been
14f8ab
improved.
14f8ab
14f8ab
>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
14f8ab
>Fixes: #990
14f8ab
14f8ab
Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
14f8ab
BUG: 1812789
14f8ab
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
14f8ab
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
14f8ab
Tested-by: RHGS Build Bot <nigelb@redhat.com>
14f8ab
---
14f8ab
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
14f8ab
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
14f8ab
 xlators/features/locks/src/common.h  |  43 +++++
14f8ab
 xlators/features/locks/src/entrylk.c |  19 +--
14f8ab
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
14f8ab
 xlators/features/locks/src/locks.h   |  23 ++-
14f8ab
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
14f8ab
 7 files changed, 689 insertions(+), 114 deletions(-)
14f8ab
14f8ab
diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
14f8ab
index ffcac07..db86296 100644
14f8ab
--- a/xlators/cluster/ec/src/ec-locks.c
14f8ab
+++ b/xlators/cluster/ec/src/ec-locks.c
14f8ab
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
14f8ab
     ec_t *ec = fop->xl->private;
14f8ab
     ec_cbk_data_t *ans = NULL;
14f8ab
     ec_cbk_data_t *cbk = NULL;
14f8ab
-    uintptr_t locked = 0, notlocked = 0;
14f8ab
+    uintptr_t locked = 0;
14f8ab
+    int32_t good = 0;
14f8ab
+    int32_t eagain = 0;
14f8ab
+    int32_t estale = 0;
14f8ab
     int32_t error = -1;
14f8ab
 
14f8ab
+    /* There are some errors that we'll handle in a special way while trying
14f8ab
+     * to acquire a lock.
14f8ab
+     *
14f8ab
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
14f8ab
+     *            consider that there's contention on the inode, so we consider
14f8ab
+     *            the acquisition a failure and try again with a sequential
14f8ab
+     *            blocking lock request. This will ensure that we get a lock on
14f8ab
+     *            as many bricks as possible (ignoring EAGAIN here would cause
14f8ab
+     *            unnecessary triggers of self-healing).
14f8ab
+     *
14f8ab
+     *            If it's found during a sequential blocking lock request, it's
14f8ab
+     *            considered an error. Lock will only succeed if there are
14f8ab
+     *            enough other bricks locked.
14f8ab
+     *
14f8ab
+     *   ESTALE:  This can appear during parallel or sequential lock request if
14f8ab
+     *            the inode has just been unlinked. We consider this error is
14f8ab
+     *            not recoverable, but we also don't consider it as fatal. So,
14f8ab
+     *            if it happens during parallel lock, we won't attempt a
14f8ab
+     *            sequential one unless there are EAGAIN errors on other
14f8ab
+     *            bricks (and are enough to form a quorum), but if we reach
14f8ab
+     *            quorum counting the ESTALE bricks, we consider the whole
14f8ab
+     *            result of the operation is ESTALE instead of EIO.
14f8ab
+     */
14f8ab
+
14f8ab
     list_for_each_entry(ans, &fop->cbk_list, list)
14f8ab
     {
14f8ab
         if (ans->op_ret >= 0) {
14f8ab
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
14f8ab
                 error = EIO;
14f8ab
             }
14f8ab
             locked |= ans->mask;
14f8ab
+            good = ans->count;
14f8ab
             cbk = ans;
14f8ab
-        } else {
14f8ab
-            if (ans->op_errno == EAGAIN) {
14f8ab
-                switch (fop->uint32) {
14f8ab
-                    case EC_LOCK_MODE_NONE:
14f8ab
-                    case EC_LOCK_MODE_ALL:
14f8ab
-                        /* Goal is to treat non-blocking lock as failure
14f8ab
-                         * even if there is a single EAGAIN*/
14f8ab
-                        notlocked |= ans->mask;
14f8ab
-                        break;
14f8ab
-                }
14f8ab
-            }
14f8ab
+        } else if (ans->op_errno == ESTALE) {
14f8ab
+            estale += ans->count;
14f8ab
+        } else if ((ans->op_errno == EAGAIN) &&
14f8ab
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
14f8ab
+            eagain += ans->count;
14f8ab
         }
14f8ab
     }
14f8ab
 
14f8ab
     if (error == -1) {
14f8ab
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
14f8ab
-            if (notlocked == 0) {
14f8ab
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
14f8ab
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
14f8ab
+         * we retry with a sequential blocking lock request if needed.
14f8ab
+         * Otherwise we succeed. */
14f8ab
+        if ((good + eagain) >= ec->fragments) {
14f8ab
+            if (eagain == 0) {
14f8ab
                 if (fop->answer == NULL) {
14f8ab
                     fop->answer = cbk;
14f8ab
                 }
14f8ab
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
14f8ab
                     case EC_LOCK_MODE_NONE:
14f8ab
                         error = EAGAIN;
14f8ab
                         break;
14f8ab
-
14f8ab
                     case EC_LOCK_MODE_ALL:
14f8ab
                         fop->uint32 = EC_LOCK_MODE_INC;
14f8ab
                         break;
14f8ab
-
14f8ab
                     default:
14f8ab
+                        /* This shouldn't happen because eagain cannot be > 0
14f8ab
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
14f8ab
                         error = EIO;
14f8ab
                         break;
14f8ab
                 }
14f8ab
             }
14f8ab
         } else {
14f8ab
-            if (fop->answer && fop->answer->op_ret < 0)
14f8ab
+            /* We have been unable to find enough candidates that will be able
14f8ab
+             * to take the lock. If we have quorum on some answer, we return
14f8ab
+             * it. Otherwise we check if ESTALE answers allow us to reach
14f8ab
+             * quorum. If so, we return ESTALE. */
14f8ab
+            if (fop->answer && fop->answer->op_ret < 0) {
14f8ab
                 error = fop->answer->op_errno;
14f8ab
-            else
14f8ab
+            } else if ((good + eagain + estale) >= ec->fragments) {
14f8ab
+                error = ESTALE;
14f8ab
+            } else {
14f8ab
                 error = EIO;
14f8ab
+            }
14f8ab
         }
14f8ab
     }
14f8ab
 
14f8ab
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
14f8ab
index 1406e70..0c52853 100644
14f8ab
--- a/xlators/features/locks/src/common.c
14f8ab
+++ b/xlators/features/locks/src/common.c
14f8ab
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
14f8ab
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
14f8ab
         INIT_LIST_HEAD(&pl_inode->metalk_list);
14f8ab
         INIT_LIST_HEAD(&pl_inode->queued_locks);
14f8ab
+        INIT_LIST_HEAD(&pl_inode->waiting);
14f8ab
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
14f8ab
 
14f8ab
         pl_inode->check_mlock_info = _gf_true;
14f8ab
         pl_inode->mlock_enforced = _gf_false;
14f8ab
 
14f8ab
+        /* -2 means never looked up. -1 means something went wrong and link
14f8ab
+         * tracking is disabled. */
14f8ab
+        pl_inode->links = -2;
14f8ab
+
14f8ab
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
14f8ab
         if (ret) {
14f8ab
             pthread_mutex_destroy(&pl_inode->mutex);
14f8ab
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
14f8ab
     }
14f8ab
 
14f8ab
     return 0;
14f8ab
-}
14f8ab
\ No newline at end of file
14f8ab
+}
14f8ab
+
14f8ab
+gf_boolean_t
14f8ab
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
14f8ab
+{
14f8ab
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
14f8ab
+        return _gf_true;
14f8ab
+    }
14f8ab
+
14f8ab
+    if (is_lk_owner_null(owner)) {
14f8ab
+        return _gf_false;
14f8ab
+    }
14f8ab
+    return _gf_true;
14f8ab
+}
14f8ab
+
14f8ab
+static int32_t
14f8ab
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
14f8ab
+{
14f8ab
+    inode_t *inode = NULL;
14f8ab
+    int32_t error = 0;
14f8ab
+
14f8ab
+    if (loc->inode != NULL) {
14f8ab
+        inode = inode_ref(loc->inode);
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+    if (loc->parent == NULL) {
14f8ab
+        error = EINVAL;
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+    if (!gf_uuid_is_null(loc->gfid)) {
14f8ab
+        inode = inode_find(loc->parent->table, loc->gfid);
14f8ab
+        if (inode != NULL) {
14f8ab
+            goto done;
14f8ab
+        }
14f8ab
+    }
14f8ab
+
14f8ab
+    if (loc->name == NULL) {
14f8ab
+        error = EINVAL;
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
14f8ab
+    if (inode == NULL) {
14f8ab
+        /* We haven't found any inode. This means that the file doesn't exist
14f8ab
+         * or that even if it exists, we don't have any knowledge about it, so
14f8ab
+         * we don't have locks on it either, which is fine for our purposes. */
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+done:
14f8ab
+    *pinode = inode;
14f8ab
+
14f8ab
+    return error;
14f8ab
+}
14f8ab
+
14f8ab
+static gf_boolean_t
14f8ab
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
14f8ab
+                    struct timespec *now, struct list_head *contend)
14f8ab
+{
14f8ab
+    pl_dom_list_t *dom;
14f8ab
+    pl_inode_lock_t *lock;
14f8ab
+    gf_boolean_t has_owners = _gf_false;
14f8ab
+
14f8ab
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
14f8ab
+    {
14f8ab
+        list_for_each_entry(lock, &dom->inodelk_list, list)
14f8ab
+        {
14f8ab
+            /* If the lock belongs to the same client, we assume it's related
14f8ab
+             * to the same operation, so we allow the removal to continue. */
14f8ab
+            if (lock->client == client) {
14f8ab
+                continue;
14f8ab
+            }
14f8ab
+            /* If the lock belongs to an internal process, we don't block the
14f8ab
+             * removal. */
14f8ab
+            if (lock->client_pid < 0) {
14f8ab
+                continue;
14f8ab
+            }
14f8ab
+            if (contend == NULL) {
14f8ab
+                return _gf_true;
14f8ab
+            }
14f8ab
+            has_owners = _gf_true;
14f8ab
+            inodelk_contention_notify_check(xl, lock, now, contend);
14f8ab
+        }
14f8ab
+    }
14f8ab
+
14f8ab
+    return has_owners;
14f8ab
+}
14f8ab
+
14f8ab
+int32_t
14f8ab
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
14f8ab
+                        pl_inode_t **ppl_inode, struct list_head *contend)
14f8ab
+{
14f8ab
+    struct timespec now;
14f8ab
+    inode_t *inode;
14f8ab
+    pl_inode_t *pl_inode;
14f8ab
+    int32_t error;
14f8ab
+
14f8ab
+    pl_inode = NULL;
14f8ab
+
14f8ab
+    error = pl_inode_from_loc(loc, &inode);
14f8ab
+    if ((error != 0) || (inode == NULL)) {
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+    pl_inode = pl_inode_get(xl, inode, NULL);
14f8ab
+    if (pl_inode == NULL) {
14f8ab
+        inode_unref(inode);
14f8ab
+        error = ENOMEM;
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+    /* pl_inode_from_loc() already increments ref count for inode, so
14f8ab
+     * we only assign here our reference. */
14f8ab
+    pl_inode->inode = inode;
14f8ab
+
14f8ab
+    timespec_now(&now);
14f8ab
+
14f8ab
+    pthread_mutex_lock(&pl_inode->mutex);
14f8ab
+
14f8ab
+    if (pl_inode->removed) {
14f8ab
+        error = ESTALE;
14f8ab
+        goto unlock;
14f8ab
+    }
14f8ab
+
14f8ab
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
14f8ab
+        error = -1;
14f8ab
+        /* We skip the unlock here because the caller must create a stub when
14f8ab
+         * we return -1 and do a call to pl_inode_remove_complete(), which
14f8ab
+         * assumes the lock is still acquired and will release it once
14f8ab
+         * everything else is prepared. */
14f8ab
+        goto done;
14f8ab
+    }
14f8ab
+
14f8ab
+    pl_inode->is_locked = _gf_true;
14f8ab
+    pl_inode->remove_running++;
14f8ab
+
14f8ab
+unlock:
14f8ab
+    pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
+
14f8ab
+done:
14f8ab
+    *ppl_inode = pl_inode;
14f8ab
+
14f8ab
+    return error;
14f8ab
+}
14f8ab
+
14f8ab
+int32_t
14f8ab
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
14f8ab
+                         struct list_head *contend)
14f8ab
+{
14f8ab
+    pl_inode_lock_t *lock;
14f8ab
+    int32_t error = -1;
14f8ab
+
14f8ab
+    if (stub != NULL) {
14f8ab
+        list_add_tail(&stub->list, &pl_inode->waiting);
14f8ab
+        pl_inode->is_locked = _gf_true;
14f8ab
+    } else {
14f8ab
+        error = ENOMEM;
14f8ab
+
14f8ab
+        while (!list_empty(contend)) {
14f8ab
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
14f8ab
+            list_del_init(&lock->list);
14f8ab
+            __pl_inodelk_unref(lock);
14f8ab
+        }
14f8ab
+    }
14f8ab
+
14f8ab
+    pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
+
14f8ab
+    if (error < 0) {
14f8ab
+        inodelk_contention_notify(xl, contend);
14f8ab
+    }
14f8ab
+
14f8ab
+    inode_unref(pl_inode->inode);
14f8ab
+
14f8ab
+    return error;
14f8ab
+}
14f8ab
+
14f8ab
+void
14f8ab
+pl_inode_remove_wake(struct list_head *list)
14f8ab
+{
14f8ab
+    call_stub_t *stub;
14f8ab
+
14f8ab
+    while (!list_empty(list)) {
14f8ab
+        stub = list_first_entry(list, call_stub_t, list);
14f8ab
+        list_del_init(&stub->list);
14f8ab
+
14f8ab
+        call_resume(stub);
14f8ab
+    }
14f8ab
+}
14f8ab
+
14f8ab
+void
14f8ab
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
14f8ab
+{
14f8ab
+    struct list_head contend, granted;
14f8ab
+    struct timespec now;
14f8ab
+    pl_dom_list_t *dom;
14f8ab
+
14f8ab
+    if (pl_inode == NULL) {
14f8ab
+        return;
14f8ab
+    }
14f8ab
+
14f8ab
+    INIT_LIST_HEAD(&contend);
14f8ab
+    INIT_LIST_HEAD(&granted);
14f8ab
+    timespec_now(&now);
14f8ab
+
14f8ab
+    pthread_mutex_lock(&pl_inode->mutex);
14f8ab
+
14f8ab
+    if (error == 0) {
14f8ab
+        if (pl_inode->links >= 0) {
14f8ab
+            pl_inode->links--;
14f8ab
+        }
14f8ab
+        if (pl_inode->links == 0) {
14f8ab
+            pl_inode->removed = _gf_true;
14f8ab
+        }
14f8ab
+    }
14f8ab
+
14f8ab
+    pl_inode->remove_running--;
14f8ab
+
14f8ab
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
14f8ab
+        pl_inode->is_locked = _gf_false;
14f8ab
+
14f8ab
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
14f8ab
+        {
14f8ab
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
14f8ab
+                                        &contend);
14f8ab
+        }
14f8ab
+    }
14f8ab
+
14f8ab
+    pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
+
14f8ab
+    unwind_granted_inodes(xl, pl_inode, &granted);
14f8ab
+
14f8ab
+    inodelk_contention_notify(xl, &contend);
14f8ab
+
14f8ab
+    inode_unref(pl_inode->inode);
14f8ab
+}
14f8ab
+
14f8ab
+void
14f8ab
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
14f8ab
+                         struct list_head *list)
14f8ab
+{
14f8ab
+    call_stub_t *stub, *tmp;
14f8ab
+
14f8ab
+    if (!pl_inode->is_locked) {
14f8ab
+        return;
14f8ab
+    }
14f8ab
+
14f8ab
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
14f8ab
+    {
14f8ab
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
14f8ab
+                                 NULL)) {
14f8ab
+            list_move_tail(&stub->list, list);
14f8ab
+        }
14f8ab
+    }
14f8ab
+}
14f8ab
+
14f8ab
+/* This function determines if an inodelk attempt can be done now or it needs
14f8ab
+ * to wait.
14f8ab
+ *
14f8ab
+ * Possible return values:
14f8ab
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
14f8ab
+ *        inode has been deleted previously by unlink/rmdir/rename
14f8ab
+ *   = 0: The lock can be attempted.
14f8ab
+ *   > 0: The lock needs to wait because a conflicting remove operation is
14f8ab
+ *        ongoing.
14f8ab
+ */
14f8ab
+int32_t
14f8ab
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
14f8ab
+{
14f8ab
+    pl_dom_list_t *dom;
14f8ab
+    pl_inode_lock_t *ilock;
14f8ab
+
14f8ab
+    /* If the inode has been deleted, we won't allow any lock. */
14f8ab
+    if (pl_inode->removed) {
14f8ab
+        return -ESTALE;
14f8ab
+    }
14f8ab
+
14f8ab
+    /* We only synchronize with locks made for regular operations coming from
14f8ab
+     * the user. Locks done for internal purposes are hard to control and could
14f8ab
+     * lead to long delays or deadlocks quite easily. */
14f8ab
+    if (lock->client_pid < 0) {
14f8ab
+        return 0;
14f8ab
+    }
14f8ab
+    if (!pl_inode->is_locked) {
14f8ab
+        return 0;
14f8ab
+    }
14f8ab
+    if (pl_inode->remove_running > 0) {
14f8ab
+        return 1;
14f8ab
+    }
14f8ab
+
14f8ab
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
14f8ab
+    {
14f8ab
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
14f8ab
+        {
14f8ab
+            /* If a lock from the same client is already granted, we allow this
14f8ab
+             * one to continue. This is necessary to prevent deadlocks when
14f8ab
+             * multiple locks are taken for the same operation.
14f8ab
+             *
14f8ab
+             * On the other side it's unlikely that the same client sends
14f8ab
+             * completely unrelated locks for the same inode.
14f8ab
+             */
14f8ab
+            if (ilock->client == lock->client) {
14f8ab
+                return 0;
14f8ab
+            }
14f8ab
+        }
14f8ab
+    }
14f8ab
+
14f8ab
+    return 1;
14f8ab
+}
14f8ab
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
14f8ab
index ea86b96..6c81ac3 100644
14f8ab
--- a/xlators/features/locks/src/common.h
14f8ab
+++ b/xlators/features/locks/src/common.h
14f8ab
@@ -105,6 +105,15 @@ void
14f8ab
 __pl_inodelk_unref(pl_inode_lock_t *lock);
14f8ab
 
14f8ab
 void
14f8ab
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
+                            struct list_head *granted, pl_dom_list_t *dom,
14f8ab
+                            struct timespec *now, struct list_head *contend);
14f8ab
+
14f8ab
+void
14f8ab
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
+                      struct list_head *granted);
14f8ab
+
14f8ab
+void
14f8ab
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
                           pl_dom_list_t *dom, struct timespec *now,
14f8ab
                           struct list_head *contend);
14f8ab
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
14f8ab
 void
14f8ab
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
14f8ab
 
14f8ab
+void
14f8ab
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
14f8ab
+                                struct timespec *now,
14f8ab
+                                struct list_head *contend);
14f8ab
+
14f8ab
+void
14f8ab
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
14f8ab
+                                struct timespec *now,
14f8ab
+                                struct list_head *contend);
14f8ab
+
14f8ab
 gf_boolean_t
14f8ab
 pl_does_monkey_want_stuck_lock();
14f8ab
 
14f8ab
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
14f8ab
 int
14f8ab
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
14f8ab
 
14f8ab
+gf_boolean_t
14f8ab
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
14f8ab
+
14f8ab
+int32_t
14f8ab
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
14f8ab
+                        pl_inode_t **ppl_inode, struct list_head *contend);
14f8ab
+
14f8ab
+int32_t
14f8ab
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
14f8ab
+                         struct list_head *contend);
14f8ab
+
14f8ab
+void
14f8ab
+pl_inode_remove_wake(struct list_head *list);
14f8ab
+
14f8ab
+void
14f8ab
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
14f8ab
+
14f8ab
+void
14f8ab
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
14f8ab
+                         struct list_head *list);
14f8ab
+
14f8ab
+int32_t
14f8ab
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
14f8ab
+
14f8ab
 #endif /* __COMMON_H__ */
14f8ab
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
14f8ab
index 93c649c..b97836f 100644
14f8ab
--- a/xlators/features/locks/src/entrylk.c
14f8ab
+++ b/xlators/features/locks/src/entrylk.c
14f8ab
@@ -197,9 +197,9 @@ out:
14f8ab
     return revoke_lock;
14f8ab
 }
14f8ab
 
14f8ab
-static gf_boolean_t
14f8ab
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
14f8ab
-                                  struct timespec *now)
14f8ab
+void
14f8ab
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
14f8ab
+                                struct timespec *now, struct list_head *contend)
14f8ab
 {
14f8ab
     posix_locks_private_t *priv;
14f8ab
     int64_t elapsed;
14f8ab
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
14f8ab
     /* If this lock is in a list, it means that we are about to send a
14f8ab
      * notification for it, so no need to do anything else. */
14f8ab
     if (!list_empty(&lock->contend)) {
14f8ab
-        return _gf_false;
14f8ab
+        return;
14f8ab
     }
14f8ab
 
14f8ab
     elapsed = now->tv_sec;
14f8ab
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
14f8ab
         elapsed--;
14f8ab
     }
14f8ab
     if (elapsed < priv->notify_contention_delay) {
14f8ab
-        return _gf_false;
14f8ab
+        return;
14f8ab
     }
14f8ab
 
14f8ab
     /* All contention notifications will be sent outside of the locked
14f8ab
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
14f8ab
 
14f8ab
     lock->contention_time = *now;
14f8ab
 
14f8ab
-    return _gf_true;
14f8ab
+    list_add_tail(&lock->contend, contend);
14f8ab
 }
14f8ab
 
14f8ab
 void
14f8ab
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
14f8ab
                     break;
14f8ab
                 }
14f8ab
             }
14f8ab
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
14f8ab
-                list_add_tail(&tmp->contend, contend);
14f8ab
-            }
14f8ab
+            entrylk_contention_notify_check(this, tmp, now, contend);
14f8ab
         }
14f8ab
     }
14f8ab
 
14f8ab
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
14f8ab
 
14f8ab
         if (bl_ret == 0) {
14f8ab
-            list_add(&bl->blocked_locks, granted);
14f8ab
+            list_add_tail(&bl->blocked_locks, granted);
14f8ab
         }
14f8ab
     }
14f8ab
-    return;
14f8ab
 }
14f8ab
 
14f8ab
 /* Grants locks if possible which are blocked on a lock */
14f8ab
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
14f8ab
index 24dee49..1a07243 100644
14f8ab
--- a/xlators/features/locks/src/inodelk.c
14f8ab
+++ b/xlators/features/locks/src/inodelk.c
14f8ab
@@ -231,9 +231,9 @@ out:
14f8ab
     return revoke_lock;
14f8ab
 }
14f8ab
 
14f8ab
-static gf_boolean_t
14f8ab
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
14f8ab
-                                  struct timespec *now)
14f8ab
+void
14f8ab
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
14f8ab
+                                struct timespec *now, struct list_head *contend)
14f8ab
 {
14f8ab
     posix_locks_private_t *priv;
14f8ab
     int64_t elapsed;
14f8ab
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
14f8ab
     /* If this lock is in a list, it means that we are about to send a
14f8ab
      * notification for it, so no need to do anything else. */
14f8ab
     if (!list_empty(&lock->contend)) {
14f8ab
-        return _gf_false;
14f8ab
+        return;
14f8ab
     }
14f8ab
 
14f8ab
     elapsed = now->tv_sec;
14f8ab
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
14f8ab
         elapsed--;
14f8ab
     }
14f8ab
     if (elapsed < priv->notify_contention_delay) {
14f8ab
-        return _gf_false;
14f8ab
+        return;
14f8ab
     }
14f8ab
 
14f8ab
     /* All contention notifications will be sent outside of the locked
14f8ab
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
14f8ab
 
14f8ab
     lock->contention_time = *now;
14f8ab
 
14f8ab
-    return _gf_true;
14f8ab
+    list_add_tail(&lock->contend, contend);
14f8ab
 }
14f8ab
 
14f8ab
 void
14f8ab
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
14f8ab
                     break;
14f8ab
                 }
14f8ab
             }
14f8ab
-            if (__inodelk_needs_contention_notify(this, l, now)) {
14f8ab
-                list_add_tail(&l->contend, contend);
14f8ab
-            }
14f8ab
+            inodelk_contention_notify_check(this, l, now, contend);
14f8ab
         }
14f8ab
     }
14f8ab
 
14f8ab
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
14f8ab
                struct list_head *contend)
14f8ab
 {
14f8ab
     pl_inode_lock_t *conf = NULL;
14f8ab
-    int ret = -EINVAL;
14f8ab
+    int ret;
14f8ab
 
14f8ab
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
14f8ab
-    if (conf) {
14f8ab
-        ret = __lock_blocked_add(this, dom, lock, can_block);
14f8ab
-        goto out;
14f8ab
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
14f8ab
+    if (ret < 0) {
14f8ab
+        return ret;
14f8ab
+    }
14f8ab
+    if (ret == 0) {
14f8ab
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
14f8ab
+    }
14f8ab
+    if ((ret > 0) || (conf != NULL)) {
14f8ab
+        return __lock_blocked_add(this, dom, lock, can_block);
14f8ab
     }
14f8ab
 
14f8ab
     /* To prevent blocked locks starvation, check if there are any blocked
14f8ab
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
14f8ab
                    "starvation");
14f8ab
         }
14f8ab
 
14f8ab
-        ret = __lock_blocked_add(this, dom, lock, can_block);
14f8ab
-        goto out;
14f8ab
+        return __lock_blocked_add(this, dom, lock, can_block);
14f8ab
     }
14f8ab
     __pl_inodelk_ref(lock);
14f8ab
     gettimeofday(&lock->granted_time, NULL);
14f8ab
     list_add(&lock->list, &dom->inodelk_list);
14f8ab
 
14f8ab
-    ret = 0;
14f8ab
-
14f8ab
-out:
14f8ab
-    return ret;
14f8ab
+    return 0;
14f8ab
 }
14f8ab
 
14f8ab
 /* Return true if the two inodelks have exactly same lock boundaries */
14f8ab
@@ -529,12 +528,11 @@ out:
14f8ab
     return conf;
14f8ab
 }
14f8ab
 
14f8ab
-static void
14f8ab
+void
14f8ab
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
                             struct list_head *granted, pl_dom_list_t *dom,
14f8ab
                             struct timespec *now, struct list_head *contend)
14f8ab
 {
14f8ab
-    int bl_ret = 0;
14f8ab
     pl_inode_lock_t *bl = NULL;
14f8ab
     pl_inode_lock_t *tmp = NULL;
14f8ab
 
14f8ab
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
     {
14f8ab
         list_del_init(&bl->blocked_locks);
14f8ab
 
14f8ab
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
14f8ab
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
14f8ab
 
14f8ab
-        if (bl_ret == 0) {
14f8ab
-            list_add(&bl->blocked_locks, granted);
14f8ab
+        if (bl->status != -EAGAIN) {
14f8ab
+            list_add_tail(&bl->blocked_locks, granted);
14f8ab
         }
14f8ab
     }
14f8ab
-    return;
14f8ab
 }
14f8ab
 
14f8ab
-/* Grant all inodelks blocked on a lock */
14f8ab
 void
14f8ab
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
-                          pl_dom_list_t *dom, struct timespec *now,
14f8ab
-                          struct list_head *contend)
14f8ab
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
+                      struct list_head *granted)
14f8ab
 {
14f8ab
-    struct list_head granted;
14f8ab
     pl_inode_lock_t *lock;
14f8ab
     pl_inode_lock_t *tmp;
14f8ab
+    int32_t op_ret;
14f8ab
+    int32_t op_errno;
14f8ab
 
14f8ab
-    INIT_LIST_HEAD(&granted);
14f8ab
-
14f8ab
-    pthread_mutex_lock(&pl_inode->mutex);
14f8ab
-    {
14f8ab
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
14f8ab
-                                    contend);
14f8ab
-    }
14f8ab
-    pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
-
14f8ab
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
14f8ab
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
14f8ab
     {
14f8ab
-        gf_log(this->name, GF_LOG_TRACE,
14f8ab
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
14f8ab
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
14f8ab
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
14f8ab
-               lock->user_flock.l_len);
14f8ab
-
14f8ab
+        if (lock->status == 0) {
14f8ab
+            op_ret = 0;
14f8ab
+            op_errno = 0;
14f8ab
+            gf_log(this->name, GF_LOG_TRACE,
14f8ab
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
14f8ab
+                   " => Granted",
14f8ab
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
14f8ab
+                   lock->client_pid, lkowner_utoa(&lock->owner),
14f8ab
+                   lock->user_flock.l_start, lock->user_flock.l_len);
14f8ab
+        } else {
14f8ab
+            op_ret = -1;
14f8ab
+            op_errno = -lock->status;
14f8ab
+        }
14f8ab
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
14f8ab
-                     0, 0, lock->volume);
14f8ab
+                     op_ret, op_errno, lock->volume);
14f8ab
 
14f8ab
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
14f8ab
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
14f8ab
         lock->frame = NULL;
14f8ab
     }
14f8ab
 
14f8ab
     pthread_mutex_lock(&pl_inode->mutex);
14f8ab
     {
14f8ab
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
14f8ab
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
14f8ab
         {
14f8ab
             list_del_init(&lock->blocked_locks);
14f8ab
             __pl_inodelk_unref(lock);
14f8ab
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
     pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
 }
14f8ab
 
14f8ab
+/* Grant all inodelks blocked on a lock */
14f8ab
+void
14f8ab
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
14f8ab
+                          pl_dom_list_t *dom, struct timespec *now,
14f8ab
+                          struct list_head *contend)
14f8ab
+{
14f8ab
+    struct list_head granted;
14f8ab
+
14f8ab
+    INIT_LIST_HEAD(&granted);
14f8ab
+
14f8ab
+    pthread_mutex_lock(&pl_inode->mutex);
14f8ab
+    {
14f8ab
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
14f8ab
+                                    contend);
14f8ab
+    }
14f8ab
+    pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
+
14f8ab
+    unwind_granted_inodes(this, pl_inode, &granted);
14f8ab
+}
14f8ab
+
14f8ab
 static void
14f8ab
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
14f8ab
 {
14f8ab
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
14f8ab
                  * and blocked lists, then this means that a parallel
14f8ab
                  * unlock on another inodelk (L2 say) may have 'granted'
14f8ab
                  * L1 and added it to 'granted' list in
14f8ab
-                 * __grant_blocked_node_locks() (although using the
14f8ab
+                 * __grant_blocked_inode_locks() (although using the
14f8ab
                  * 'blocked_locks' member). In that case, the cleanup
14f8ab
                  * codepath must try and grant other overlapping
14f8ab
                  * blocked inodelks from other clients, now that L1 is
14f8ab
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
14f8ab
     gf_boolean_t need_inode_unref = _gf_false;
14f8ab
     struct list_head *pcontend = NULL;
14f8ab
     struct list_head contend;
14f8ab
+    struct list_head wake;
14f8ab
     struct timespec now = {};
14f8ab
     short fl_type;
14f8ab
 
14f8ab
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
14f8ab
         timespec_now(&now);
14f8ab
     }
14f8ab
 
14f8ab
+    INIT_LIST_HEAD(&wake);
14f8ab
+
14f8ab
     if (ctx)
14f8ab
         pthread_mutex_lock(&ctx->lock);
14f8ab
     pthread_mutex_lock(&pl_inode->mutex);
14f8ab
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
14f8ab
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
14f8ab
                        lock->client_pid, lkowner_utoa(&lock->owner),
14f8ab
                        lock->user_flock.l_start, lock->user_flock.l_len);
14f8ab
-                if (can_block)
14f8ab
+                if (can_block) {
14f8ab
                     unref = _gf_false;
14f8ab
-                /* For all but the case where a non-blocking
14f8ab
-                 * lock attempt fails, the extra ref taken at
14f8ab
-                 * the start of this function must be negated.
14f8ab
-                 */
14f8ab
-                else
14f8ab
-                    need_inode_unref = _gf_true;
14f8ab
+                }
14f8ab
             }
14f8ab
-
14f8ab
-            if (ctx && (!ret || can_block))
14f8ab
+            /* The extra ref taken at the start of this function must be
14f8ab
+             * released on every failure except one: a blocking request
14f8ab
+             * that returned -EAGAIN. */
14f8ab
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
14f8ab
+            if (ctx && !need_inode_unref) {
14f8ab
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
14f8ab
+            }
14f8ab
         } else {
14f8ab
             /* Irrespective of whether unlock succeeds or not,
14f8ab
              * the extra inode ref that was done at the start of
14f8ab
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
14f8ab
             list_del_init(&retlock->client_list);
14f8ab
             __pl_inodelk_unref(retlock);
14f8ab
 
14f8ab
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
14f8ab
+
14f8ab
             ret = 0;
14f8ab
         }
14f8ab
     out:
14f8ab
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
14f8ab
     if (ctx)
14f8ab
         pthread_mutex_unlock(&ctx->lock);
14f8ab
 
14f8ab
+    pl_inode_remove_wake(&wake);
14f8ab
+
14f8ab
     /* The following (extra) unref corresponds to the ref that
14f8ab
      * was done at the time the lock was granted.
14f8ab
      */
14f8ab
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
14f8ab
                                  inode);
14f8ab
 
14f8ab
             if (ret < 0) {
14f8ab
-                if ((can_block) && (F_UNLCK != lock_type)) {
14f8ab
-                    goto out;
14f8ab
+                if (ret == -EAGAIN) {
14f8ab
+                    if (can_block && (F_UNLCK != lock_type)) {
14f8ab
+                        goto out;
14f8ab
+                    }
14f8ab
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
14f8ab
+                } else {
14f8ab
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
14f8ab
                 }
14f8ab
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
14f8ab
                 op_errno = -ret;
14f8ab
                 goto unwind;
14f8ab
             }
14f8ab
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
14f8ab
index aa267de..6666feb 100644
14f8ab
--- a/xlators/features/locks/src/locks.h
14f8ab
+++ b/xlators/features/locks/src/locks.h
14f8ab
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
14f8ab
 
14f8ab
     struct list_head client_list; /* list of all locks from a client */
14f8ab
     short fl_type;
14f8ab
+
14f8ab
+    int32_t status; /* Error code when we try to grant a lock in blocked
14f8ab
+                       state */
14f8ab
 };
14f8ab
 typedef struct __pl_inode_lock pl_inode_lock_t;
14f8ab
 
14f8ab
@@ -164,13 +167,14 @@ struct __pl_inode {
14f8ab
     struct list_head rw_list;            /* list of waiting r/w requests */
14f8ab
     struct list_head reservelk_list;     /* list of reservelks */
14f8ab
     struct list_head blocked_reservelks; /* list of blocked reservelks */
14f8ab
-    struct list_head
14f8ab
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
14f8ab
-    struct list_head metalk_list; /* Meta lock list */
14f8ab
-                                  /* This is to store the incoming lock
14f8ab
-                                     requests while meta lock is enabled */
14f8ab
-    struct list_head queued_locks;
14f8ab
-    int mandatory; /* if mandatory locking is enabled */
14f8ab
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
14f8ab
+                                            reserve is held*/
14f8ab
+    struct list_head metalk_list;        /* Meta lock list */
14f8ab
+    struct list_head queued_locks;       /* This is to store the incoming lock
14f8ab
+                                            requests while meta lock is enabled */
14f8ab
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
14f8ab
+                                 the inode. */
14f8ab
+    int mandatory;            /* if mandatory locking is enabled */
14f8ab
 
14f8ab
     inode_t *refkeeper; /* hold refs on an inode while locks are
14f8ab
                            held to prevent pruning */
14f8ab
@@ -197,6 +201,11 @@ struct __pl_inode {
14f8ab
     */
14f8ab
     int fop_wind_count;
14f8ab
     pthread_cond_t check_fop_wind_count;
14f8ab
+
14f8ab
+    int32_t links;           /* Number of hard links the inode has. */
14f8ab
+    uint32_t remove_running; /* Number of remove operations running. */
14f8ab
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
14f8ab
+    gf_boolean_t removed;    /* The inode has been deleted. */
14f8ab
 };
14f8ab
 typedef struct __pl_inode pl_inode_t;
14f8ab
 
14f8ab
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
14f8ab
index 7887b82..5ae0125 100644
14f8ab
--- a/xlators/features/locks/src/posix.c
14f8ab
+++ b/xlators/features/locks/src/posix.c
14f8ab
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
14f8ab
         }                                                                      \
14f8ab
     } while (0)
14f8ab
 
14f8ab
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
14f8ab
+                        _args...)                                              \
14f8ab
+    ({                                                                         \
14f8ab
+        struct list_head contend;                                              \
14f8ab
+        pl_inode_t *__pl_inode;                                                \
14f8ab
+        call_stub_t *__stub;                                                   \
14f8ab
+        int32_t __error;                                                       \
14f8ab
+        INIT_LIST_HEAD(&contend);                                              \
14f8ab
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
14f8ab
+                                          &__pl_inode, &contend);              \
14f8ab
+        if (__error < 0) {                                                     \
14f8ab
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
14f8ab
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
14f8ab
+                                               &contend);                      \
14f8ab
+        } else if (__error == 0) {                                             \
14f8ab
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
14f8ab
+                                  _loc2);                                      \
14f8ab
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
14f8ab
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
14f8ab
+        }                                                                      \
14f8ab
+        __error;                                                               \
14f8ab
+    })
14f8ab
+
14f8ab
 gf_boolean_t
14f8ab
 pl_has_xdata_requests(dict_t *xdata)
14f8ab
 {
14f8ab
@@ -2969,11 +2992,85 @@ out:
14f8ab
     return ret;
14f8ab
 }
14f8ab
 
14f8ab
+static int32_t
14f8ab
+pl_request_link_count(dict_t **pxdata)
14f8ab
+{
14f8ab
+    dict_t *xdata;
14f8ab
+
14f8ab
+    xdata = *pxdata;
14f8ab
+    if (xdata == NULL) {
14f8ab
+        xdata = dict_new();
14f8ab
+        if (xdata == NULL) {
14f8ab
+            return ENOMEM;
14f8ab
+        }
14f8ab
+    } else {
14f8ab
+        dict_ref(xdata);
14f8ab
+    }
14f8ab
+
14f8ab
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
14f8ab
+        dict_unref(xdata);
14f8ab
+        return ENOMEM;
14f8ab
+    }
14f8ab
+
14f8ab
+    *pxdata = xdata;
14f8ab
+
14f8ab
+    return 0;
14f8ab
+}
14f8ab
+
14f8ab
+static int32_t
14f8ab
+pl_check_link_count(dict_t *xdata)
14f8ab
+{
14f8ab
+    int32_t count;
14f8ab
+
14f8ab
+    /* In case we are unable to read the link count from xdata, we take a
14f8ab
+     * conservative approach and return -2, which will prevent the inode from
14f8ab
+     * being considered deleted. In fact it will cause link tracking for this
14f8ab
+     * inode to be disabled completely to avoid races. */
14f8ab
+
14f8ab
+    if (xdata == NULL) {
14f8ab
+        return -2;
14f8ab
+    }
14f8ab
+
14f8ab
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
14f8ab
+        return -2;
14f8ab
+    }
14f8ab
+
14f8ab
+    return count;
14f8ab
+}
14f8ab
+
14f8ab
 int32_t
14f8ab
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
14f8ab
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
14f8ab
               struct iatt *postparent)
14f8ab
 {
14f8ab
+    pl_inode_t *pl_inode;
14f8ab
+
14f8ab
+    if (op_ret >= 0) {
14f8ab
+        pl_inode = pl_inode_get(this, inode, NULL);
14f8ab
+        if (pl_inode == NULL) {
14f8ab
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
14f8ab
+                            NULL);
14f8ab
+            return 0;
14f8ab
+        }
14f8ab
+
14f8ab
+        pthread_mutex_lock(&pl_inode->mutex);
14f8ab
+
14f8ab
+        /* We only update the link count if we previously didn't know it.
14f8ab
+         * Doing it always can lead to races since lookup is not executed
14f8ab
+         * atomically most of the time. */
14f8ab
+        if (pl_inode->links == -2) {
14f8ab
+            pl_inode->links = pl_check_link_count(xdata);
14f8ab
+            if (buf->ia_type == IA_IFDIR) {
14f8ab
+                /* Directories have at least 2 links. To avoid special handling
14f8ab
+                 * for directories, we simply decrement the value here to make
14f8ab
+                 * them equivalent to regular files. */
14f8ab
+                pl_inode->links--;
14f8ab
+            }
14f8ab
+        }
14f8ab
+
14f8ab
+        pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
+    }
14f8ab
+
14f8ab
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
14f8ab
                     postparent);
14f8ab
     return 0;
14f8ab
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
14f8ab
 int32_t
14f8ab
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
14f8ab
 {
14f8ab
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
14f8ab
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
14f8ab
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
14f8ab
+    int32_t error;
14f8ab
+
14f8ab
+    error = pl_request_link_count(&xdata);
14f8ab
+    if (error == 0) {
14f8ab
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
14f8ab
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
14f8ab
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
14f8ab
+        dict_unref(xdata);
14f8ab
+    } else {
14f8ab
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
14f8ab
+    }
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -3792,6 +3897,10 @@ unlock:
14f8ab
             gf_proc_dump_write("posixlk-count", "%d", count);
14f8ab
             __dump_posixlks(pl_inode);
14f8ab
         }
14f8ab
+
14f8ab
+        gf_proc_dump_write("links", "%d", pl_inode->links);
14f8ab
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
14f8ab
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
14f8ab
     }
14f8ab
     pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
 
14f8ab
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
14f8ab
               struct iatt *postoldparent, struct iatt *prenewparent,
14f8ab
               struct iatt *postnewparent, dict_t *xdata)
14f8ab
 {
14f8ab
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
14f8ab
+
14f8ab
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
14f8ab
                     postoldparent, prenewparent, postnewparent, xdata);
14f8ab
+
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -4146,10 +4258,15 @@ int32_t
14f8ab
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
14f8ab
           dict_t *xdata)
14f8ab
 {
14f8ab
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
14f8ab
+    int32_t error;
14f8ab
+
14f8ab
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
14f8ab
+                            pl_rename_cbk, oldloc, newloc, xdata);
14f8ab
+    if (error > 0) {
14f8ab
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
14f8ab
+                            NULL, NULL);
14f8ab
+    }
14f8ab
 
14f8ab
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
14f8ab
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
14f8ab
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
14f8ab
               dict_t *xdata)
14f8ab
 {
14f8ab
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
14f8ab
+
14f8ab
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
14f8ab
                     postparent, xdata);
14f8ab
+
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -4282,9 +4402,14 @@ int32_t
14f8ab
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
14f8ab
           dict_t *xdata)
14f8ab
 {
14f8ab
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
14f8ab
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
14f8ab
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
14f8ab
+    int32_t error;
14f8ab
+
14f8ab
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
14f8ab
+                            pl_unlink_cbk, loc, xflag, xdata);
14f8ab
+    if (error > 0) {
14f8ab
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
14f8ab
+    }
14f8ab
+
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
14f8ab
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
14f8ab
              dict_t *xdata)
14f8ab
 {
14f8ab
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
14f8ab
+
14f8ab
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
14f8ab
                                postparent, xdata);
14f8ab
+
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -4360,9 +4488,14 @@ int
14f8ab
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
14f8ab
          dict_t *xdata)
14f8ab
 {
14f8ab
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
14f8ab
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
14f8ab
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
14f8ab
+    int32_t error;
14f8ab
+
14f8ab
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
14f8ab
+                            pl_rmdir_cbk, loc, xflags, xdata);
14f8ab
+    if (error > 0) {
14f8ab
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
14f8ab
+    }
14f8ab
+
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
14f8ab
             int32_t op_errno, inode_t *inode, struct iatt *buf,
14f8ab
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
14f8ab
 {
14f8ab
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
14f8ab
+
14f8ab
+    if (op_ret >= 0) {
14f8ab
+        pthread_mutex_lock(&pl_inode->mutex);
14f8ab
+
14f8ab
+        /* TODO: can pl_inode->links == 0 happen here? */
14f8ab
+        if (pl_inode->links >= 0) {
14f8ab
+            pl_inode->links++;
14f8ab
+        }
14f8ab
+
14f8ab
+        pthread_mutex_unlock(&pl_inode->mutex);
14f8ab
+    }
14f8ab
+
14f8ab
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
14f8ab
                                preparent, postparent, xdata);
14f8ab
     return 0;
14f8ab
@@ -4401,9 +4547,18 @@ int
14f8ab
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
14f8ab
         dict_t *xdata)
14f8ab
 {
14f8ab
+    pl_inode_t *pl_inode;
14f8ab
+
14f8ab
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
14f8ab
+    if (pl_inode == NULL) {
14f8ab
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
14f8ab
+                            NULL);
14f8ab
+        return 0;
14f8ab
+    }
14f8ab
+
14f8ab
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
14f8ab
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
14f8ab
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
14f8ab
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
14f8ab
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
14f8ab
     return 0;
14f8ab
 }
14f8ab
 
14f8ab
-- 
14f8ab
1.8.3.1
14f8ab