190130
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
190130
From: Xavi Hernandez <xhernandez@redhat.com>
190130
Date: Fri, 12 Jun 2020 00:06:36 +0200
190130
Subject: [PATCH 455/456] locks: prevent deletion of locked entries
190130
190130
To keep consistency inside transactions started by locking an entry or
190130
an inode, this change delays the removal of entries that are currently
190130
locked by one or more clients. Once all locks are released, the removal
190130
is processed.
190130
190130
The detection of stale inodes in the locking code of EC has also been
190130
improved.
190130
190130
>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
190130
>Fixes: #990
190130
190130
Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
190130
BUG: 1812789
190130
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
190130
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
190130
Tested-by: RHGS Build Bot <nigelb@redhat.com>
190130
---
190130
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
190130
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
190130
 xlators/features/locks/src/common.h  |  43 +++++
190130
 xlators/features/locks/src/entrylk.c |  19 +--
190130
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
190130
 xlators/features/locks/src/locks.h   |  23 ++-
190130
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
190130
 7 files changed, 689 insertions(+), 114 deletions(-)
190130
190130
diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
190130
index ffcac07..db86296 100644
190130
--- a/xlators/cluster/ec/src/ec-locks.c
190130
+++ b/xlators/cluster/ec/src/ec-locks.c
190130
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
190130
     ec_t *ec = fop->xl->private;
190130
     ec_cbk_data_t *ans = NULL;
190130
     ec_cbk_data_t *cbk = NULL;
190130
-    uintptr_t locked = 0, notlocked = 0;
190130
+    uintptr_t locked = 0;
190130
+    int32_t good = 0;
190130
+    int32_t eagain = 0;
190130
+    int32_t estale = 0;
190130
     int32_t error = -1;
190130
 
190130
+    /* There are some errors that we'll handle in a special way while trying
190130
+     * to acquire a lock.
190130
+     *
190130
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
190130
+     *            consider that there's contention on the inode, so we consider
190130
+     *            the acquisition a failure and try again with a sequential
190130
+     *            blocking lock request. This will ensure that we get a lock on
190130
+     *            as many bricks as possible (ignoring EAGAIN here would cause
190130
+     *            unnecessary triggers of self-healing).
190130
+     *
190130
+     *            If it's found during a sequential blocking lock request, it's
190130
+     *            considered an error. Lock will only succeed if there are
190130
+     *            enough other bricks locked.
190130
+     *
190130
+     *   ESTALE:  This can appear during parallel or sequential lock request if
190130
+     *            the inode has just been unlinked. We consider this error is
190130
+     *            not recoverable, but we also don't consider it as fatal. So,
190130
+     *            if it happens during parallel lock, we won't attempt a
190130
+     *            sequential one unless there are EAGAIN errors on other
190130
+     *            bricks (and are enough to form a quorum), but if we reach
190130
+     *            quorum counting the ESTALE bricks, we consider the whole
190130
+     *            result of the operation is ESTALE instead of EIO.
190130
+     */
190130
+
190130
     list_for_each_entry(ans, &fop->cbk_list, list)
190130
     {
190130
         if (ans->op_ret >= 0) {
190130
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
190130
                 error = EIO;
190130
             }
190130
             locked |= ans->mask;
190130
+            good = ans->count;
190130
             cbk = ans;
190130
-        } else {
190130
-            if (ans->op_errno == EAGAIN) {
190130
-                switch (fop->uint32) {
190130
-                    case EC_LOCK_MODE_NONE:
190130
-                    case EC_LOCK_MODE_ALL:
190130
-                        /* Goal is to treat non-blocking lock as failure
190130
-                         * even if there is a single EAGAIN*/
190130
-                        notlocked |= ans->mask;
190130
-                        break;
190130
-                }
190130
-            }
190130
+        } else if (ans->op_errno == ESTALE) {
190130
+            estale += ans->count;
190130
+        } else if ((ans->op_errno == EAGAIN) &&
190130
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
190130
+            eagain += ans->count;
190130
         }
190130
     }
190130
 
190130
     if (error == -1) {
190130
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
190130
-            if (notlocked == 0) {
190130
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
190130
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
190130
+         * we retry with a sequential blocking lock request if needed.
190130
+         * Otherwise we succeed. */
190130
+        if ((good + eagain) >= ec->fragments) {
190130
+            if (eagain == 0) {
190130
                 if (fop->answer == NULL) {
190130
                     fop->answer = cbk;
190130
                 }
190130
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
190130
                     case EC_LOCK_MODE_NONE:
190130
                         error = EAGAIN;
190130
                         break;
190130
-
190130
                     case EC_LOCK_MODE_ALL:
190130
                         fop->uint32 = EC_LOCK_MODE_INC;
190130
                         break;
190130
-
190130
                     default:
190130
+                        /* This shouldn't happen because eagain cannot be > 0
190130
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
190130
                         error = EIO;
190130
                         break;
190130
                 }
190130
             }
190130
         } else {
190130
-            if (fop->answer && fop->answer->op_ret < 0)
190130
+            /* We have been unable to find enough candidates that will be able
190130
+             * to take the lock. If we have quorum on some answer, we return
190130
+             * it. Otherwise we check if ESTALE answers allow us to reach
190130
+             * quorum. If so, we return ESTALE. */
190130
+            if (fop->answer && fop->answer->op_ret < 0) {
190130
                 error = fop->answer->op_errno;
190130
-            else
190130
+            } else if ((good + eagain + estale) >= ec->fragments) {
190130
+                error = ESTALE;
190130
+            } else {
190130
                 error = EIO;
190130
+            }
190130
         }
190130
     }
190130
 
190130
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
190130
index 1406e70..0c52853 100644
190130
--- a/xlators/features/locks/src/common.c
190130
+++ b/xlators/features/locks/src/common.c
190130
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
190130
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
190130
         INIT_LIST_HEAD(&pl_inode->metalk_list);
190130
         INIT_LIST_HEAD(&pl_inode->queued_locks);
190130
+        INIT_LIST_HEAD(&pl_inode->waiting);
190130
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
190130
 
190130
         pl_inode->check_mlock_info = _gf_true;
190130
         pl_inode->mlock_enforced = _gf_false;
190130
 
190130
+        /* -2 means never looked up. -1 means something went wrong and link
190130
+         * tracking is disabled. */
190130
+        pl_inode->links = -2;
190130
+
190130
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
190130
         if (ret) {
190130
             pthread_mutex_destroy(&pl_inode->mutex);
190130
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
190130
     }
190130
 
190130
     return 0;
190130
-}
190130
\ No newline at end of file
190130
+}
190130
+
190130
+gf_boolean_t
190130
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
190130
+{
190130
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
190130
+        return _gf_true;
190130
+    }
190130
+
190130
+    if (is_lk_owner_null(owner)) {
190130
+        return _gf_false;
190130
+    }
190130
+    return _gf_true;
190130
+}
190130
+
190130
+static int32_t
190130
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
190130
+{
190130
+    inode_t *inode = NULL;
190130
+    int32_t error = 0;
190130
+
190130
+    if (loc->inode != NULL) {
190130
+        inode = inode_ref(loc->inode);
190130
+        goto done;
190130
+    }
190130
+
190130
+    if (loc->parent == NULL) {
190130
+        error = EINVAL;
190130
+        goto done;
190130
+    }
190130
+
190130
+    if (!gf_uuid_is_null(loc->gfid)) {
190130
+        inode = inode_find(loc->parent->table, loc->gfid);
190130
+        if (inode != NULL) {
190130
+            goto done;
190130
+        }
190130
+    }
190130
+
190130
+    if (loc->name == NULL) {
190130
+        error = EINVAL;
190130
+        goto done;
190130
+    }
190130
+
190130
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
190130
+    if (inode == NULL) {
190130
+        /* We haven't found any inode. This means that the file doesn't exist
190130
+         * or that even if it exists, we don't have any knowledge about it, so
190130
+         * we don't have locks on it either, which is fine for our purposes. */
190130
+        goto done;
190130
+    }
190130
+
190130
+done:
190130
+    *pinode = inode;
190130
+
190130
+    return error;
190130
+}
190130
+
190130
+static gf_boolean_t
190130
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
190130
+                    struct timespec *now, struct list_head *contend)
190130
+{
190130
+    pl_dom_list_t *dom;
190130
+    pl_inode_lock_t *lock;
190130
+    gf_boolean_t has_owners = _gf_false;
190130
+
190130
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
190130
+    {
190130
+        list_for_each_entry(lock, &dom->inodelk_list, list)
190130
+        {
190130
+            /* If the lock belongs to the same client, we assume it's related
190130
+             * to the same operation, so we allow the removal to continue. */
190130
+            if (lock->client == client) {
190130
+                continue;
190130
+            }
190130
+            /* If the lock belongs to an internal process, we don't block the
190130
+             * removal. */
190130
+            if (lock->client_pid < 0) {
190130
+                continue;
190130
+            }
190130
+            if (contend == NULL) {
190130
+                return _gf_true;
190130
+            }
190130
+            has_owners = _gf_true;
190130
+            inodelk_contention_notify_check(xl, lock, now, contend);
190130
+        }
190130
+    }
190130
+
190130
+    return has_owners;
190130
+}
190130
+
190130
+int32_t
190130
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
190130
+                        pl_inode_t **ppl_inode, struct list_head *contend)
190130
+{
190130
+    struct timespec now;
190130
+    inode_t *inode;
190130
+    pl_inode_t *pl_inode;
190130
+    int32_t error;
190130
+
190130
+    pl_inode = NULL;
190130
+
190130
+    error = pl_inode_from_loc(loc, &inode);
190130
+    if ((error != 0) || (inode == NULL)) {
190130
+        goto done;
190130
+    }
190130
+
190130
+    pl_inode = pl_inode_get(xl, inode, NULL);
190130
+    if (pl_inode == NULL) {
190130
+        inode_unref(inode);
190130
+        error = ENOMEM;
190130
+        goto done;
190130
+    }
190130
+
190130
+    /* pl_inode_from_loc() already increments ref count for inode, so
190130
+     * we only assign here our reference. */
190130
+    pl_inode->inode = inode;
190130
+
190130
+    timespec_now(&now);
190130
+
190130
+    pthread_mutex_lock(&pl_inode->mutex);
190130
+
190130
+    if (pl_inode->removed) {
190130
+        error = ESTALE;
190130
+        goto unlock;
190130
+    }
190130
+
190130
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
190130
+        error = -1;
190130
+        /* We skip the unlock here because the caller must create a stub when
190130
+         * we return -1 and do a call to pl_inode_remove_complete(), which
190130
+         * assumes the lock is still acquired and will release it once
190130
+         * everything else is prepared. */
190130
+        goto done;
190130
+    }
190130
+
190130
+    pl_inode->is_locked = _gf_true;
190130
+    pl_inode->remove_running++;
190130
+
190130
+unlock:
190130
+    pthread_mutex_unlock(&pl_inode->mutex);
190130
+
190130
+done:
190130
+    *ppl_inode = pl_inode;
190130
+
190130
+    return error;
190130
+}
190130
+
190130
+int32_t
190130
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
190130
+                         struct list_head *contend)
190130
+{
190130
+    pl_inode_lock_t *lock;
190130
+    int32_t error = -1;
190130
+
190130
+    if (stub != NULL) {
190130
+        list_add_tail(&stub->list, &pl_inode->waiting);
190130
+        pl_inode->is_locked = _gf_true;
190130
+    } else {
190130
+        error = ENOMEM;
190130
+
190130
+        while (!list_empty(contend)) {
190130
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
190130
+            list_del_init(&lock->list);
190130
+            __pl_inodelk_unref(lock);
190130
+        }
190130
+    }
190130
+
190130
+    pthread_mutex_unlock(&pl_inode->mutex);
190130
+
190130
+    if (error < 0) {
190130
+        inodelk_contention_notify(xl, contend);
190130
+    }
190130
+
190130
+    inode_unref(pl_inode->inode);
190130
+
190130
+    return error;
190130
+}
190130
+
190130
+void
190130
+pl_inode_remove_wake(struct list_head *list)
190130
+{
190130
+    call_stub_t *stub;
190130
+
190130
+    while (!list_empty(list)) {
190130
+        stub = list_first_entry(list, call_stub_t, list);
190130
+        list_del_init(&stub->list);
190130
+
190130
+        call_resume(stub);
190130
+    }
190130
+}
190130
+
190130
+void
190130
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
190130
+{
190130
+    struct list_head contend, granted;
190130
+    struct timespec now;
190130
+    pl_dom_list_t *dom;
190130
+
190130
+    if (pl_inode == NULL) {
190130
+        return;
190130
+    }
190130
+
190130
+    INIT_LIST_HEAD(&contend);
190130
+    INIT_LIST_HEAD(&granted);
190130
+    timespec_now(&now);
190130
+
190130
+    pthread_mutex_lock(&pl_inode->mutex);
190130
+
190130
+    if (error == 0) {
190130
+        if (pl_inode->links >= 0) {
190130
+            pl_inode->links--;
190130
+        }
190130
+        if (pl_inode->links == 0) {
190130
+            pl_inode->removed = _gf_true;
190130
+        }
190130
+    }
190130
+
190130
+    pl_inode->remove_running--;
190130
+
190130
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
190130
+        pl_inode->is_locked = _gf_false;
190130
+
190130
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
190130
+        {
190130
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
190130
+                                        &contend);
190130
+        }
190130
+    }
190130
+
190130
+    pthread_mutex_unlock(&pl_inode->mutex);
190130
+
190130
+    unwind_granted_inodes(xl, pl_inode, &granted);
190130
+
190130
+    inodelk_contention_notify(xl, &contend);
190130
+
190130
+    inode_unref(pl_inode->inode);
190130
+}
190130
+
190130
+void
190130
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
190130
+                         struct list_head *list)
190130
+{
190130
+    call_stub_t *stub, *tmp;
190130
+
190130
+    if (!pl_inode->is_locked) {
190130
+        return;
190130
+    }
190130
+
190130
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
190130
+    {
190130
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
190130
+                                 NULL)) {
190130
+            list_move_tail(&stub->list, list);
190130
+        }
190130
+    }
190130
+}
190130
+
190130
+/* This function determines if an inodelk attempt can be done now or it needs
190130
+ * to wait.
190130
+ *
190130
+ * Possible return values:
190130
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
190130
+ *        inode has been deleted previously by unlink/rmdir/rename
190130
+ *   = 0: The lock can be attempted.
190130
+ *   > 0: The lock needs to wait because a conflicting remove operation is
190130
+ *        ongoing.
190130
+ */
190130
+int32_t
190130
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
190130
+{
190130
+    pl_dom_list_t *dom;
190130
+    pl_inode_lock_t *ilock;
190130
+
190130
+    /* If the inode has been deleted, we won't allow any lock. */
190130
+    if (pl_inode->removed) {
190130
+        return -ESTALE;
190130
+    }
190130
+
190130
+    /* We only synchronize with locks made for regular operations coming from
190130
+     * the user. Locks done for internal purposes are hard to control and could
190130
+     * lead to long delays or deadlocks quite easily. */
190130
+    if (lock->client_pid < 0) {
190130
+        return 0;
190130
+    }
190130
+    if (!pl_inode->is_locked) {
190130
+        return 0;
190130
+    }
190130
+    if (pl_inode->remove_running > 0) {
190130
+        return 1;
190130
+    }
190130
+
190130
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
190130
+    {
190130
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
190130
+        {
190130
+            /* If a lock from the same client is already granted, we allow this
190130
+             * one to continue. This is necessary to prevent deadlocks when
190130
+             * multiple locks are taken for the same operation.
190130
+             *
190130
+             * On the other side it's unlikely that the same client sends
190130
+             * completely unrelated locks for the same inode.
190130
+             */
190130
+            if (ilock->client == lock->client) {
190130
+                return 0;
190130
+            }
190130
+        }
190130
+    }
190130
+
190130
+    return 1;
190130
+}
190130
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
190130
index ea86b96..6c81ac3 100644
190130
--- a/xlators/features/locks/src/common.h
190130
+++ b/xlators/features/locks/src/common.h
190130
@@ -105,6 +105,15 @@ void
190130
 __pl_inodelk_unref(pl_inode_lock_t *lock);
190130
 
190130
 void
190130
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
+                            struct list_head *granted, pl_dom_list_t *dom,
190130
+                            struct timespec *now, struct list_head *contend);
190130
+
190130
+void
190130
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
190130
+                      struct list_head *granted);
190130
+
190130
+void
190130
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
                           pl_dom_list_t *dom, struct timespec *now,
190130
                           struct list_head *contend);
190130
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
190130
 void
190130
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
190130
 
190130
+void
190130
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
190130
+                                struct timespec *now,
190130
+                                struct list_head *contend);
190130
+
190130
+void
190130
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
190130
+                                struct timespec *now,
190130
+                                struct list_head *contend);
190130
+
190130
 gf_boolean_t
190130
 pl_does_monkey_want_stuck_lock();
190130
 
190130
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
190130
 int
190130
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
190130
 
190130
+gf_boolean_t
190130
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
190130
+
190130
+int32_t
190130
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
190130
+                        pl_inode_t **ppl_inode, struct list_head *contend);
190130
+
190130
+int32_t
190130
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
190130
+                         struct list_head *contend);
190130
+
190130
+void
190130
+pl_inode_remove_wake(struct list_head *list);
190130
+
190130
+void
190130
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
190130
+
190130
+void
190130
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
190130
+                         struct list_head *list);
190130
+
190130
+int32_t
190130
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
190130
+
190130
 #endif /* __COMMON_H__ */
190130
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
190130
index 93c649c..b97836f 100644
190130
--- a/xlators/features/locks/src/entrylk.c
190130
+++ b/xlators/features/locks/src/entrylk.c
190130
@@ -197,9 +197,9 @@ out:
190130
     return revoke_lock;
190130
 }
190130
 
190130
-static gf_boolean_t
190130
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
190130
-                                  struct timespec *now)
190130
+void
190130
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
190130
+                                struct timespec *now, struct list_head *contend)
190130
 {
190130
     posix_locks_private_t *priv;
190130
     int64_t elapsed;
190130
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
190130
     /* If this lock is in a list, it means that we are about to send a
190130
      * notification for it, so no need to do anything else. */
190130
     if (!list_empty(&lock->contend)) {
190130
-        return _gf_false;
190130
+        return;
190130
     }
190130
 
190130
     elapsed = now->tv_sec;
190130
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
190130
         elapsed--;
190130
     }
190130
     if (elapsed < priv->notify_contention_delay) {
190130
-        return _gf_false;
190130
+        return;
190130
     }
190130
 
190130
     /* All contention notifications will be sent outside of the locked
190130
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
190130
 
190130
     lock->contention_time = *now;
190130
 
190130
-    return _gf_true;
190130
+    list_add_tail(&lock->contend, contend);
190130
 }
190130
 
190130
 void
190130
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
190130
                     break;
190130
                 }
190130
             }
190130
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
190130
-                list_add_tail(&tmp->contend, contend);
190130
-            }
190130
+            entrylk_contention_notify_check(this, tmp, now, contend);
190130
         }
190130
     }
190130
 
190130
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
190130
 
190130
         if (bl_ret == 0) {
190130
-            list_add(&bl->blocked_locks, granted);
190130
+            list_add_tail(&bl->blocked_locks, granted);
190130
         }
190130
     }
190130
-    return;
190130
 }
190130
 
190130
 /* Grants locks if possible which are blocked on a lock */
190130
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
190130
index 24dee49..1a07243 100644
190130
--- a/xlators/features/locks/src/inodelk.c
190130
+++ b/xlators/features/locks/src/inodelk.c
190130
@@ -231,9 +231,9 @@ out:
190130
     return revoke_lock;
190130
 }
190130
 
190130
-static gf_boolean_t
190130
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
190130
-                                  struct timespec *now)
190130
+void
190130
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
190130
+                                struct timespec *now, struct list_head *contend)
190130
 {
190130
     posix_locks_private_t *priv;
190130
     int64_t elapsed;
190130
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
190130
     /* If this lock is in a list, it means that we are about to send a
190130
      * notification for it, so no need to do anything else. */
190130
     if (!list_empty(&lock->contend)) {
190130
-        return _gf_false;
190130
+        return;
190130
     }
190130
 
190130
     elapsed = now->tv_sec;
190130
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
190130
         elapsed--;
190130
     }
190130
     if (elapsed < priv->notify_contention_delay) {
190130
-        return _gf_false;
190130
+        return;
190130
     }
190130
 
190130
     /* All contention notifications will be sent outside of the locked
190130
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
190130
 
190130
     lock->contention_time = *now;
190130
 
190130
-    return _gf_true;
190130
+    list_add_tail(&lock->contend, contend);
190130
 }
190130
 
190130
 void
190130
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
190130
                     break;
190130
                 }
190130
             }
190130
-            if (__inodelk_needs_contention_notify(this, l, now)) {
190130
-                list_add_tail(&l->contend, contend);
190130
-            }
190130
+            inodelk_contention_notify_check(this, l, now, contend);
190130
         }
190130
     }
190130
 
190130
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
190130
                struct list_head *contend)
190130
 {
190130
     pl_inode_lock_t *conf = NULL;
190130
-    int ret = -EINVAL;
190130
+    int ret;
190130
 
190130
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
190130
-    if (conf) {
190130
-        ret = __lock_blocked_add(this, dom, lock, can_block);
190130
-        goto out;
190130
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
190130
+    if (ret < 0) {
190130
+        return ret;
190130
+    }
190130
+    if (ret == 0) {
190130
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
190130
+    }
190130
+    if ((ret > 0) || (conf != NULL)) {
190130
+        return __lock_blocked_add(this, dom, lock, can_block);
190130
     }
190130
 
190130
     /* To prevent blocked locks starvation, check if there are any blocked
190130
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
190130
                    "starvation");
190130
         }
190130
 
190130
-        ret = __lock_blocked_add(this, dom, lock, can_block);
190130
-        goto out;
190130
+        return __lock_blocked_add(this, dom, lock, can_block);
190130
     }
190130
     __pl_inodelk_ref(lock);
190130
     gettimeofday(&lock->granted_time, NULL);
190130
     list_add(&lock->list, &dom->inodelk_list);
190130
 
190130
-    ret = 0;
190130
-
190130
-out:
190130
-    return ret;
190130
+    return 0;
190130
 }
190130
 
190130
 /* Return true if the two inodelks have exactly same lock boundaries */
190130
@@ -529,12 +528,11 @@ out:
190130
     return conf;
190130
 }
190130
 
190130
-static void
190130
+void
190130
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
                             struct list_head *granted, pl_dom_list_t *dom,
190130
                             struct timespec *now, struct list_head *contend)
190130
 {
190130
-    int bl_ret = 0;
190130
     pl_inode_lock_t *bl = NULL;
190130
     pl_inode_lock_t *tmp = NULL;
190130
 
190130
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
     {
190130
         list_del_init(&bl->blocked_locks);
190130
 
190130
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
190130
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
190130
 
190130
-        if (bl_ret == 0) {
190130
-            list_add(&bl->blocked_locks, granted);
190130
+        if (bl->status != -EAGAIN) {
190130
+            list_add_tail(&bl->blocked_locks, granted);
190130
         }
190130
     }
190130
-    return;
190130
 }
190130
 
190130
-/* Grant all inodelks blocked on a lock */
190130
 void
190130
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
-                          pl_dom_list_t *dom, struct timespec *now,
190130
-                          struct list_head *contend)
190130
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
190130
+                      struct list_head *granted)
190130
 {
190130
-    struct list_head granted;
190130
     pl_inode_lock_t *lock;
190130
     pl_inode_lock_t *tmp;
190130
+    int32_t op_ret;
190130
+    int32_t op_errno;
190130
 
190130
-    INIT_LIST_HEAD(&granted);
190130
-
190130
-    pthread_mutex_lock(&pl_inode->mutex);
190130
-    {
190130
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
190130
-                                    contend);
190130
-    }
190130
-    pthread_mutex_unlock(&pl_inode->mutex);
190130
-
190130
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
190130
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
190130
     {
190130
-        gf_log(this->name, GF_LOG_TRACE,
190130
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
190130
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
190130
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
190130
-               lock->user_flock.l_len);
190130
-
190130
+        if (lock->status == 0) {
190130
+            op_ret = 0;
190130
+            op_errno = 0;
190130
+            gf_log(this->name, GF_LOG_TRACE,
190130
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
190130
+                   " => Granted",
190130
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
190130
+                   lock->client_pid, lkowner_utoa(&lock->owner),
190130
+                   lock->user_flock.l_start, lock->user_flock.l_len);
190130
+        } else {
190130
+            op_ret = -1;
190130
+            op_errno = -lock->status;
190130
+        }
190130
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
190130
-                     0, 0, lock->volume);
190130
+                     op_ret, op_errno, lock->volume);
190130
 
190130
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
190130
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
190130
         lock->frame = NULL;
190130
     }
190130
 
190130
     pthread_mutex_lock(&pl_inode->mutex);
190130
     {
190130
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
190130
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
190130
         {
190130
             list_del_init(&lock->blocked_locks);
190130
             __pl_inodelk_unref(lock);
190130
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
     pthread_mutex_unlock(&pl_inode->mutex);
190130
 }
190130
 
190130
+/* Grant all inodelks blocked on a lock */
190130
+void
190130
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
190130
+                          pl_dom_list_t *dom, struct timespec *now,
190130
+                          struct list_head *contend)
190130
+{
190130
+    struct list_head granted;
190130
+
190130
+    INIT_LIST_HEAD(&granted);
190130
+
190130
+    pthread_mutex_lock(&pl_inode->mutex);
190130
+    {
190130
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
190130
+                                    contend);
190130
+    }
190130
+    pthread_mutex_unlock(&pl_inode->mutex);
190130
+
190130
+    unwind_granted_inodes(this, pl_inode, &granted);
190130
+}
190130
+
190130
 static void
190130
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
190130
 {
190130
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
190130
                  * and blocked lists, then this means that a parallel
190130
                  * unlock on another inodelk (L2 say) may have 'granted'
190130
                  * L1 and added it to 'granted' list in
190130
-                 * __grant_blocked_node_locks() (although using the
190130
+                 * __grant_blocked_inode_locks() (although using the
190130
                  * 'blocked_locks' member). In that case, the cleanup
190130
                  * codepath must try and grant other overlapping
190130
                  * blocked inodelks from other clients, now that L1 is
190130
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
190130
     gf_boolean_t need_inode_unref = _gf_false;
190130
     struct list_head *pcontend = NULL;
190130
     struct list_head contend;
190130
+    struct list_head wake;
190130
     struct timespec now = {};
190130
     short fl_type;
190130
 
190130
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
190130
         timespec_now(&now);
190130
     }
190130
 
190130
+    INIT_LIST_HEAD(&wake);
190130
+
190130
     if (ctx)
190130
         pthread_mutex_lock(&ctx->lock);
190130
     pthread_mutex_lock(&pl_inode->mutex);
190130
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
190130
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
190130
                        lock->client_pid, lkowner_utoa(&lock->owner),
190130
                        lock->user_flock.l_start, lock->user_flock.l_len);
190130
-                if (can_block)
190130
+                if (can_block) {
190130
                     unref = _gf_false;
190130
-                /* For all but the case where a non-blocking
190130
-                 * lock attempt fails, the extra ref taken at
190130
-                 * the start of this function must be negated.
190130
-                 */
190130
-                else
190130
-                    need_inode_unref = _gf_true;
190130
+                }
190130
             }
190130
-
190130
-            if (ctx && (!ret || can_block))
190130
+            /* The extra ref taken at the start of this function must be
190130
+             * negated whenever the request fails, except when a blocking
190130
+             * request returns -EAGAIN. */
190130
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
190130
+            if (ctx && !need_inode_unref) {
190130
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
190130
+            }
190130
         } else {
190130
             /* Irrespective of whether unlock succeeds or not,
190130
              * the extra inode ref that was done at the start of
190130
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
190130
             list_del_init(&retlock->client_list);
190130
             __pl_inodelk_unref(retlock);
190130
 
190130
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
190130
+
190130
             ret = 0;
190130
         }
190130
     out:
190130
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
190130
     if (ctx)
190130
         pthread_mutex_unlock(&ctx->lock);
190130
 
190130
+    pl_inode_remove_wake(&wake);
190130
+
190130
     /* The following (extra) unref corresponds to the ref that
190130
      * was done at the time the lock was granted.
190130
      */
190130
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
190130
                                  inode);
190130
 
190130
             if (ret < 0) {
190130
-                if ((can_block) && (F_UNLCK != lock_type)) {
190130
-                    goto out;
190130
+                if (ret == -EAGAIN) {
190130
+                    if (can_block && (F_UNLCK != lock_type)) {
190130
+                        goto out;
190130
+                    }
190130
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
190130
+                } else {
190130
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
190130
                 }
190130
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
190130
                 op_errno = -ret;
190130
                 goto unwind;
190130
             }
190130
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
190130
index aa267de..6666feb 100644
190130
--- a/xlators/features/locks/src/locks.h
190130
+++ b/xlators/features/locks/src/locks.h
190130
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
190130
 
190130
     struct list_head client_list; /* list of all locks from a client */
190130
     short fl_type;
190130
+
190130
+    int32_t status; /* Error code when we try to grant a lock in blocked
190130
+                       state */
190130
 };
190130
 typedef struct __pl_inode_lock pl_inode_lock_t;
190130
 
190130
@@ -164,13 +167,14 @@ struct __pl_inode {
190130
     struct list_head rw_list;            /* list of waiting r/w requests */
190130
     struct list_head reservelk_list;     /* list of reservelks */
190130
     struct list_head blocked_reservelks; /* list of blocked reservelks */
190130
-    struct list_head
190130
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
190130
-    struct list_head metalk_list; /* Meta lock list */
190130
-                                  /* This is to store the incoming lock
190130
-                                     requests while meta lock is enabled */
190130
-    struct list_head queued_locks;
190130
-    int mandatory; /* if mandatory locking is enabled */
190130
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
190130
+                                            reserve is held*/
190130
+    struct list_head metalk_list;        /* Meta lock list */
190130
+    struct list_head queued_locks;       /* This is to store the incoming lock
190130
+                                            requests while meta lock is enabled */
190130
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
190130
+                                 the inode. */
190130
+    int mandatory;            /* if mandatory locking is enabled */
190130
 
190130
     inode_t *refkeeper; /* hold refs on an inode while locks are
190130
                            held to prevent pruning */
190130
@@ -197,6 +201,11 @@ struct __pl_inode {
190130
     */
190130
     int fop_wind_count;
190130
     pthread_cond_t check_fop_wind_count;
190130
+
190130
+    int32_t links;           /* Number of hard links the inode has. */
190130
+    uint32_t remove_running; /* Number of remove operations running. */
190130
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
190130
+    gf_boolean_t removed;    /* The inode has been deleted. */
190130
 };
190130
 typedef struct __pl_inode pl_inode_t;
190130
 
190130
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
190130
index 7887b82..5ae0125 100644
190130
--- a/xlators/features/locks/src/posix.c
190130
+++ b/xlators/features/locks/src/posix.c
190130
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
190130
         }                                                                      \
190130
     } while (0)
190130
 
190130
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
190130
+                        _args...)                                              \
190130
+    ({                                                                         \
190130
+        struct list_head contend;                                              \
190130
+        pl_inode_t *__pl_inode;                                                \
190130
+        call_stub_t *__stub;                                                   \
190130
+        int32_t __error;                                                       \
190130
+        INIT_LIST_HEAD(&contend);                                              \
190130
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
190130
+                                          &__pl_inode, &contend);              \
190130
+        if (__error < 0) {                                                     \
190130
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
190130
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
190130
+                                               &contend);                      \
190130
+        } else if (__error == 0) {                                             \
190130
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
190130
+                                  _loc2);                                      \
190130
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
190130
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
190130
+        }                                                                      \
190130
+        __error;                                                               \
190130
+    })
190130
+
190130
 gf_boolean_t
190130
 pl_has_xdata_requests(dict_t *xdata)
190130
 {
190130
@@ -2969,11 +2992,85 @@ out:
190130
     return ret;
190130
 }
190130
 
190130
+static int32_t
190130
+pl_request_link_count(dict_t **pxdata)
190130
+{
190130
+    dict_t *xdata;
190130
+
190130
+    xdata = *pxdata;
190130
+    if (xdata == NULL) {
190130
+        xdata = dict_new();
190130
+        if (xdata == NULL) {
190130
+            return ENOMEM;
190130
+        }
190130
+    } else {
190130
+        dict_ref(xdata);
190130
+    }
190130
+
190130
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
190130
+        dict_unref(xdata);
190130
+        return ENOMEM;
190130
+    }
190130
+
190130
+    *pxdata = xdata;
190130
+
190130
+    return 0;
190130
+}
190130
+
190130
+static int32_t
190130
+pl_check_link_count(dict_t *xdata)
190130
+{
190130
+    int32_t count;
190130
+
190130
+    /* In case we are unable to read the link count from xdata, we take a
190130
+     * conservative approach and return -2, which will prevent the inode from
190130
+     * being considered deleted. In fact it will cause link tracking for this
190130
+     * inode to be disabled completely to avoid races. */
190130
+
190130
+    if (xdata == NULL) {
190130
+        return -2;
190130
+    }
190130
+
190130
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
190130
+        return -2;
190130
+    }
190130
+
190130
+    return count;
190130
+}
190130
+
190130
 int32_t
190130
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
190130
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
190130
               struct iatt *postparent)
190130
 {
190130
+    pl_inode_t *pl_inode;
190130
+
190130
+    if (op_ret >= 0) {
190130
+        pl_inode = pl_inode_get(this, inode, NULL);
190130
+        if (pl_inode == NULL) {
190130
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
190130
+                            NULL);
190130
+            return 0;
190130
+        }
190130
+
190130
+        pthread_mutex_lock(&pl_inode->mutex);
190130
+
190130
+        /* We only update the link count if we previously didn't know it.
190130
+         * Doing it always can lead to races since lookup is not executed
190130
+         * atomically most of the time. */
190130
+        if (pl_inode->links == -2) {
190130
+            pl_inode->links = pl_check_link_count(xdata);
190130
+            if (buf->ia_type == IA_IFDIR) {
190130
+                /* Directories have at least 2 links. To avoid special handling
190130
+                 * for directories, we simply decrement the value here to make
190130
+                 * them equivalent to regular files. */
190130
+                pl_inode->links--;
190130
+            }
190130
+        }
190130
+
190130
+        pthread_mutex_unlock(&pl_inode->mutex);
190130
+    }
190130
+
190130
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
190130
                     postparent);
190130
     return 0;
190130
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
190130
 int32_t
190130
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
190130
 {
190130
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
190130
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
190130
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
190130
+    int32_t error;
190130
+
190130
+    error = pl_request_link_count(&xdata);
190130
+    if (error == 0) {
190130
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
190130
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
190130
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
190130
+        dict_unref(xdata);
190130
+    } else {
190130
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
190130
+    }
190130
     return 0;
190130
 }
190130
 
190130
@@ -3792,6 +3897,10 @@ unlock:
190130
             gf_proc_dump_write("posixlk-count", "%d", count);
190130
             __dump_posixlks(pl_inode);
190130
         }
190130
+
190130
+        gf_proc_dump_write("links", "%d", pl_inode->links);
190130
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
190130
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
190130
     }
190130
     pthread_mutex_unlock(&pl_inode->mutex);
190130
 
190130
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
190130
               struct iatt *postoldparent, struct iatt *prenewparent,
190130
               struct iatt *postnewparent, dict_t *xdata)
190130
 {
190130
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
190130
+
190130
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
190130
                     postoldparent, prenewparent, postnewparent, xdata);
190130
+
190130
     return 0;
190130
 }
190130
 
190130
@@ -4146,10 +4258,15 @@ int32_t
190130
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
190130
           dict_t *xdata)
190130
 {
190130
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
190130
+    int32_t error;
190130
+
190130
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
190130
+                            pl_rename_cbk, oldloc, newloc, xdata);
190130
+    if (error > 0) {
190130
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
190130
+                            NULL, NULL);
190130
+    }
190130
 
190130
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
190130
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
190130
     return 0;
190130
 }
190130
 
190130
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
190130
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
190130
               dict_t *xdata)
190130
 {
190130
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
190130
+
190130
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
190130
                     postparent, xdata);
190130
+
190130
     return 0;
190130
 }
190130
 
190130
@@ -4282,9 +4402,14 @@ int32_t
190130
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
190130
           dict_t *xdata)
190130
 {
190130
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
190130
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
190130
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
190130
+    int32_t error;
190130
+
190130
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
190130
+                            pl_unlink_cbk, loc, xflag, xdata);
190130
+    if (error > 0) {
190130
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
190130
+    }
190130
+
190130
     return 0;
190130
 }
190130
 
190130
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
190130
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
190130
              dict_t *xdata)
190130
 {
190130
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
190130
+
190130
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
190130
                                postparent, xdata);
190130
+
190130
     return 0;
190130
 }
190130
 
190130
@@ -4360,9 +4488,14 @@ int
190130
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
190130
          dict_t *xdata)
190130
 {
190130
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
190130
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
190130
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
190130
+    int32_t error;
190130
+
190130
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
190130
+                            pl_rmdir_cbk, loc, xflags, xdata);
190130
+    if (error > 0) {
190130
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
190130
+    }
190130
+
190130
     return 0;
190130
 }
190130
 
190130
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
190130
             int32_t op_errno, inode_t *inode, struct iatt *buf,
190130
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
190130
 {
190130
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
190130
+
190130
+    if (op_ret >= 0) {
190130
+        pthread_mutex_lock(&pl_inode->mutex);
190130
+
190130
+        /* TODO: can pl_inode->links == 0 happen here? */
190130
+        if (pl_inode->links >= 0) {
190130
+            pl_inode->links++;
190130
+        }
190130
+
190130
+        pthread_mutex_unlock(&pl_inode->mutex);
190130
+    }
190130
+
190130
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
190130
                                preparent, postparent, xdata);
190130
     return 0;
190130
@@ -4401,9 +4547,18 @@ int
190130
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
190130
         dict_t *xdata)
190130
 {
190130
+    pl_inode_t *pl_inode;
190130
+
190130
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
190130
+    if (pl_inode == NULL) {
190130
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
190130
+                            NULL);
190130
+        return 0;
190130
+    }
190130
+
190130
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
190130
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
190130
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
190130
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
190130
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
190130
     return 0;
190130
 }
190130
 
190130
-- 
190130
1.8.3.1
190130