17b94a
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
17b94a
From: Xavi Hernandez <xhernandez@redhat.com>
17b94a
Date: Fri, 12 Jun 2020 00:06:36 +0200
17b94a
Subject: [PATCH 455/456] locks: prevent deletion of locked entries
17b94a
17b94a
To keep consistency inside transactions started by locking an entry or
17b94a
an inode, this change delays the removal of entries that are currently
17b94a
locked by one or more clients. Once all locks are released, the removal
17b94a
is processed.
17b94a
17b94a
The detection of stale inodes has also been improved in the locking
17b94a
code of EC.
17b94a
17b94a
>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
17b94a
>Fixes: #990
17b94a
17b94a
Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
17b94a
BUG: 1812789
17b94a
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
17b94a
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
17b94a
Tested-by: RHGS Build Bot <nigelb@redhat.com>
17b94a
---
17b94a
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
17b94a
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
17b94a
 xlators/features/locks/src/common.h  |  43 +++++
17b94a
 xlators/features/locks/src/entrylk.c |  19 +--
17b94a
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
17b94a
 xlators/features/locks/src/locks.h   |  23 ++-
17b94a
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
17b94a
 7 files changed, 689 insertions(+), 114 deletions(-)
17b94a
17b94a
diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
17b94a
index ffcac07..db86296 100644
17b94a
--- a/xlators/cluster/ec/src/ec-locks.c
17b94a
+++ b/xlators/cluster/ec/src/ec-locks.c
17b94a
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
17b94a
     ec_t *ec = fop->xl->private;
17b94a
     ec_cbk_data_t *ans = NULL;
17b94a
     ec_cbk_data_t *cbk = NULL;
17b94a
-    uintptr_t locked = 0, notlocked = 0;
17b94a
+    uintptr_t locked = 0;
17b94a
+    int32_t good = 0;
17b94a
+    int32_t eagain = 0;
17b94a
+    int32_t estale = 0;
17b94a
     int32_t error = -1;
17b94a
 
17b94a
+    /* There are some errors that we'll handle in a special way while trying
17b94a
+     * to acquire a lock.
17b94a
+     *
17b94a
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
17b94a
+     *            consider that there's contention on the inode, so we consider
17b94a
+     *            the acquisition a failure and try again with a sequential
17b94a
+     *            blocking lock request. This will ensure that we get a lock on
17b94a
+     *            as many bricks as possible (ignoring EAGAIN here would cause
17b94a
+     *            unnecessary triggers of self-healing).
17b94a
+     *
17b94a
+     *            If it's found during a sequential blocking lock request, it's
17b94a
+     *            considered an error. Lock will only succeed if there are
17b94a
+     *            enough other bricks locked.
17b94a
+     *
17b94a
+     *   ESTALE:  This can appear during parallel or sequential lock request if
17b94a
+     *            the inode has just been unlinked. We consider this error is
17b94a
+     *            not recoverable, but we also don't consider it as fatal. So,
17b94a
+     *            if it happens during parallel lock, we won't attempt a
17b94a
+     *            sequential one unless there are EAGAIN errors on other
17b94a
+     *            bricks (and are enough to form a quorum), but if we reach
17b94a
+     *            quorum counting the ESTALE bricks, we consider the whole
17b94a
+     *            result of the operation is ESTALE instead of EIO.
17b94a
+     */
17b94a
+
17b94a
     list_for_each_entry(ans, &fop->cbk_list, list)
17b94a
     {
17b94a
         if (ans->op_ret >= 0) {
17b94a
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
17b94a
                 error = EIO;
17b94a
             }
17b94a
             locked |= ans->mask;
17b94a
+            good = ans->count;
17b94a
             cbk = ans;
17b94a
-        } else {
17b94a
-            if (ans->op_errno == EAGAIN) {
17b94a
-                switch (fop->uint32) {
17b94a
-                    case EC_LOCK_MODE_NONE:
17b94a
-                    case EC_LOCK_MODE_ALL:
17b94a
-                        /* Goal is to treat non-blocking lock as failure
17b94a
-                         * even if there is a single EAGAIN*/
17b94a
-                        notlocked |= ans->mask;
17b94a
-                        break;
17b94a
-                }
17b94a
-            }
17b94a
+        } else if (ans->op_errno == ESTALE) {
17b94a
+            estale += ans->count;
17b94a
+        } else if ((ans->op_errno == EAGAIN) &&
17b94a
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
17b94a
+            eagain += ans->count;
17b94a
         }
17b94a
     }
17b94a
 
17b94a
     if (error == -1) {
17b94a
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
17b94a
-            if (notlocked == 0) {
17b94a
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
17b94a
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
17b94a
+         * we retry with a sequential blocking lock request if needed.
17b94a
+         * Otherwise we succeed. */
17b94a
+        if ((good + eagain) >= ec->fragments) {
17b94a
+            if (eagain == 0) {
17b94a
                 if (fop->answer == NULL) {
17b94a
                     fop->answer = cbk;
17b94a
                 }
17b94a
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
17b94a
                     case EC_LOCK_MODE_NONE:
17b94a
                         error = EAGAIN;
17b94a
                         break;
17b94a
-
17b94a
                     case EC_LOCK_MODE_ALL:
17b94a
                         fop->uint32 = EC_LOCK_MODE_INC;
17b94a
                         break;
17b94a
-
17b94a
                     default:
17b94a
+                        /* This shouldn't happen because eagain cannot be > 0
17b94a
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
17b94a
                         error = EIO;
17b94a
                         break;
17b94a
                 }
17b94a
             }
17b94a
         } else {
17b94a
-            if (fop->answer && fop->answer->op_ret < 0)
17b94a
+            /* We have been unable to find enough candidates that will be able
17b94a
+             * to take the lock. If we have quorum on some answer, we return
17b94a
+             * it. Otherwise we check if ESTALE answers allow us to reach
17b94a
+             * quorum. If so, we return ESTALE. */
17b94a
+            if (fop->answer && fop->answer->op_ret < 0) {
17b94a
                 error = fop->answer->op_errno;
17b94a
-            else
17b94a
+            } else if ((good + eagain + estale) >= ec->fragments) {
17b94a
+                error = ESTALE;
17b94a
+            } else {
17b94a
                 error = EIO;
17b94a
+            }
17b94a
         }
17b94a
     }
17b94a
 
17b94a
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
17b94a
index 1406e70..0c52853 100644
17b94a
--- a/xlators/features/locks/src/common.c
17b94a
+++ b/xlators/features/locks/src/common.c
17b94a
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
17b94a
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
17b94a
         INIT_LIST_HEAD(&pl_inode->metalk_list);
17b94a
         INIT_LIST_HEAD(&pl_inode->queued_locks);
17b94a
+        INIT_LIST_HEAD(&pl_inode->waiting);
17b94a
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
17b94a
 
17b94a
         pl_inode->check_mlock_info = _gf_true;
17b94a
         pl_inode->mlock_enforced = _gf_false;
17b94a
 
17b94a
+        /* -2 means never looked up. -1 means something went wrong and link
17b94a
+         * tracking is disabled. */
17b94a
+        pl_inode->links = -2;
17b94a
+
17b94a
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
17b94a
         if (ret) {
17b94a
             pthread_mutex_destroy(&pl_inode->mutex);
17b94a
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
17b94a
     }
17b94a
 
17b94a
     return 0;
17b94a
-}
17b94a
\ No newline at end of file
17b94a
+}
17b94a
+
17b94a
+gf_boolean_t
17b94a
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
17b94a
+{
17b94a
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
17b94a
+        return _gf_true;
17b94a
+    }
17b94a
+
17b94a
+    if (is_lk_owner_null(owner)) {
17b94a
+        return _gf_false;
17b94a
+    }
17b94a
+    return _gf_true;
17b94a
+}
17b94a
+
17b94a
+static int32_t
17b94a
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
17b94a
+{
17b94a
+    inode_t *inode = NULL;
17b94a
+    int32_t error = 0;
17b94a
+
17b94a
+    if (loc->inode != NULL) {
17b94a
+        inode = inode_ref(loc->inode);
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+    if (loc->parent == NULL) {
17b94a
+        error = EINVAL;
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+    if (!gf_uuid_is_null(loc->gfid)) {
17b94a
+        inode = inode_find(loc->parent->table, loc->gfid);
17b94a
+        if (inode != NULL) {
17b94a
+            goto done;
17b94a
+        }
17b94a
+    }
17b94a
+
17b94a
+    if (loc->name == NULL) {
17b94a
+        error = EINVAL;
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
17b94a
+    if (inode == NULL) {
17b94a
+        /* We haven't found any inode. This means that the file doesn't exist
17b94a
+         * or that even if it exists, we don't have any knowledge about it, so
17b94a
+         * we don't have locks on it either, which is fine for our purposes. */
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+done:
17b94a
+    *pinode = inode;
17b94a
+
17b94a
+    return error;
17b94a
+}
17b94a
+
17b94a
+static gf_boolean_t
17b94a
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
17b94a
+                    struct timespec *now, struct list_head *contend)
17b94a
+{
17b94a
+    pl_dom_list_t *dom;
17b94a
+    pl_inode_lock_t *lock;
17b94a
+    gf_boolean_t has_owners = _gf_false;
17b94a
+
17b94a
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
17b94a
+    {
17b94a
+        list_for_each_entry(lock, &dom->inodelk_list, list)
17b94a
+        {
17b94a
+            /* If the lock belongs to the same client, we assume it's related
17b94a
+             * to the same operation, so we allow the removal to continue. */
17b94a
+            if (lock->client == client) {
17b94a
+                continue;
17b94a
+            }
17b94a
+            /* If the lock belongs to an internal process, we don't block the
17b94a
+             * removal. */
17b94a
+            if (lock->client_pid < 0) {
17b94a
+                continue;
17b94a
+            }
17b94a
+            if (contend == NULL) {
17b94a
+                return _gf_true;
17b94a
+            }
17b94a
+            has_owners = _gf_true;
17b94a
+            inodelk_contention_notify_check(xl, lock, now, contend);
17b94a
+        }
17b94a
+    }
17b94a
+
17b94a
+    return has_owners;
17b94a
+}
17b94a
+
17b94a
+int32_t
17b94a
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
17b94a
+                        pl_inode_t **ppl_inode, struct list_head *contend)
17b94a
+{
17b94a
+    struct timespec now;
17b94a
+    inode_t *inode;
17b94a
+    pl_inode_t *pl_inode;
17b94a
+    int32_t error;
17b94a
+
17b94a
+    pl_inode = NULL;
17b94a
+
17b94a
+    error = pl_inode_from_loc(loc, &inode);
17b94a
+    if ((error != 0) || (inode == NULL)) {
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+    pl_inode = pl_inode_get(xl, inode, NULL);
17b94a
+    if (pl_inode == NULL) {
17b94a
+        inode_unref(inode);
17b94a
+        error = ENOMEM;
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+    /* pl_inode_from_loc() already increments ref count for inode, so
17b94a
+     * we only assign here our reference. */
17b94a
+    pl_inode->inode = inode;
17b94a
+
17b94a
+    timespec_now(&now);
17b94a
+
17b94a
+    pthread_mutex_lock(&pl_inode->mutex);
17b94a
+
17b94a
+    if (pl_inode->removed) {
17b94a
+        error = ESTALE;
17b94a
+        goto unlock;
17b94a
+    }
17b94a
+
17b94a
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
17b94a
+        error = -1;
17b94a
+        /* We skip the unlock here because the caller must create a stub when
17b94a
+         * we return -1 and do a call to pl_inode_remove_complete(), which
17b94a
+         * assumes the lock is still acquired and will release it once
17b94a
+         * everything else is prepared. */
17b94a
+        goto done;
17b94a
+    }
17b94a
+
17b94a
+    pl_inode->is_locked = _gf_true;
17b94a
+    pl_inode->remove_running++;
17b94a
+
17b94a
+unlock:
17b94a
+    pthread_mutex_unlock(&pl_inode->mutex);
17b94a
+
17b94a
+done:
17b94a
+    *ppl_inode = pl_inode;
17b94a
+
17b94a
+    return error;
17b94a
+}
17b94a
+
17b94a
+int32_t
17b94a
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
17b94a
+                         struct list_head *contend)
17b94a
+{
17b94a
+    pl_inode_lock_t *lock;
17b94a
+    int32_t error = -1;
17b94a
+
17b94a
+    if (stub != NULL) {
17b94a
+        list_add_tail(&stub->list, &pl_inode->waiting);
17b94a
+        pl_inode->is_locked = _gf_true;
17b94a
+    } else {
17b94a
+        error = ENOMEM;
17b94a
+
17b94a
+        while (!list_empty(contend)) {
17b94a
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
17b94a
+            list_del_init(&lock->list);
17b94a
+            __pl_inodelk_unref(lock);
17b94a
+        }
17b94a
+    }
17b94a
+
17b94a
+    pthread_mutex_unlock(&pl_inode->mutex);
17b94a
+
17b94a
+    if (error < 0) {
17b94a
+        inodelk_contention_notify(xl, contend);
17b94a
+    }
17b94a
+
17b94a
+    inode_unref(pl_inode->inode);
17b94a
+
17b94a
+    return error;
17b94a
+}
17b94a
+
17b94a
+void
17b94a
+pl_inode_remove_wake(struct list_head *list)
17b94a
+{
17b94a
+    call_stub_t *stub;
17b94a
+
17b94a
+    while (!list_empty(list)) {
17b94a
+        stub = list_first_entry(list, call_stub_t, list);
17b94a
+        list_del_init(&stub->list);
17b94a
+
17b94a
+        call_resume(stub);
17b94a
+    }
17b94a
+}
17b94a
+
17b94a
+void
17b94a
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
17b94a
+{
17b94a
+    struct list_head contend, granted;
17b94a
+    struct timespec now;
17b94a
+    pl_dom_list_t *dom;
17b94a
+
17b94a
+    if (pl_inode == NULL) {
17b94a
+        return;
17b94a
+    }
17b94a
+
17b94a
+    INIT_LIST_HEAD(&contend);
17b94a
+    INIT_LIST_HEAD(&granted);
17b94a
+    timespec_now(&now);
17b94a
+
17b94a
+    pthread_mutex_lock(&pl_inode->mutex);
17b94a
+
17b94a
+    if (error == 0) {
17b94a
+        if (pl_inode->links >= 0) {
17b94a
+            pl_inode->links--;
17b94a
+        }
17b94a
+        if (pl_inode->links == 0) {
17b94a
+            pl_inode->removed = _gf_true;
17b94a
+        }
17b94a
+    }
17b94a
+
17b94a
+    pl_inode->remove_running--;
17b94a
+
17b94a
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
17b94a
+        pl_inode->is_locked = _gf_false;
17b94a
+
17b94a
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
17b94a
+        {
17b94a
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
17b94a
+                                        &contend);
17b94a
+        }
17b94a
+    }
17b94a
+
17b94a
+    pthread_mutex_unlock(&pl_inode->mutex);
17b94a
+
17b94a
+    unwind_granted_inodes(xl, pl_inode, &granted);
17b94a
+
17b94a
+    inodelk_contention_notify(xl, &contend);
17b94a
+
17b94a
+    inode_unref(pl_inode->inode);
17b94a
+}
17b94a
+
17b94a
+void
17b94a
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
17b94a
+                         struct list_head *list)
17b94a
+{
17b94a
+    call_stub_t *stub, *tmp;
17b94a
+
17b94a
+    if (!pl_inode->is_locked) {
17b94a
+        return;
17b94a
+    }
17b94a
+
17b94a
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
17b94a
+    {
17b94a
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
17b94a
+                                 NULL)) {
17b94a
+            list_move_tail(&stub->list, list);
17b94a
+        }
17b94a
+    }
17b94a
+}
17b94a
+
17b94a
+/* This function determines if an inodelk attempt can be done now or it needs
17b94a
+ * to wait.
17b94a
+ *
17b94a
+ * Possible return values:
17b94a
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
17b94a
+ *        inode has been deleted previously by unlink/rmdir/rename
17b94a
+ *   = 0: The lock can be attempted.
17b94a
+ *   > 0: The lock needs to wait because a conflicting remove operation is
17b94a
+ *        ongoing.
17b94a
+ */
17b94a
+int32_t
17b94a
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
17b94a
+{
17b94a
+    pl_dom_list_t *dom;
17b94a
+    pl_inode_lock_t *ilock;
17b94a
+
17b94a
+    /* If the inode has been deleted, we won't allow any lock. */
17b94a
+    if (pl_inode->removed) {
17b94a
+        return -ESTALE;
17b94a
+    }
17b94a
+
17b94a
+    /* We only synchronize with locks made for regular operations coming from
17b94a
+     * the user. Locks done for internal purposes are hard to control and could
17b94a
+     * lead to long delays or deadlocks quite easily. */
17b94a
+    if (lock->client_pid < 0) {
17b94a
+        return 0;
17b94a
+    }
17b94a
+    if (!pl_inode->is_locked) {
17b94a
+        return 0;
17b94a
+    }
17b94a
+    if (pl_inode->remove_running > 0) {
17b94a
+        return 1;
17b94a
+    }
17b94a
+
17b94a
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
17b94a
+    {
17b94a
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
17b94a
+        {
17b94a
+            /* If a lock from the same client is already granted, we allow this
17b94a
+             * one to continue. This is necessary to prevent deadlocks when
17b94a
+             * multiple locks are taken for the same operation.
17b94a
+             *
17b94a
+             * On the other hand it's unlikely that the same client sends
17b94a
+             * completely unrelated locks for the same inode.
17b94a
+             */
17b94a
+            if (ilock->client == lock->client) {
17b94a
+                return 0;
17b94a
+            }
17b94a
+        }
17b94a
+    }
17b94a
+
17b94a
+    return 1;
17b94a
+}
17b94a
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
17b94a
index ea86b96..6c81ac3 100644
17b94a
--- a/xlators/features/locks/src/common.h
17b94a
+++ b/xlators/features/locks/src/common.h
17b94a
@@ -105,6 +105,15 @@ void
17b94a
 __pl_inodelk_unref(pl_inode_lock_t *lock);
17b94a
 
17b94a
 void
17b94a
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
+                            struct list_head *granted, pl_dom_list_t *dom,
17b94a
+                            struct timespec *now, struct list_head *contend);
17b94a
+
17b94a
+void
17b94a
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
17b94a
+                      struct list_head *granted);
17b94a
+
17b94a
+void
17b94a
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
                           pl_dom_list_t *dom, struct timespec *now,
17b94a
                           struct list_head *contend);
17b94a
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
17b94a
 void
17b94a
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
17b94a
 
17b94a
+void
17b94a
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
17b94a
+                                struct timespec *now,
17b94a
+                                struct list_head *contend);
17b94a
+
17b94a
+void
17b94a
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
17b94a
+                                struct timespec *now,
17b94a
+                                struct list_head *contend);
17b94a
+
17b94a
 gf_boolean_t
17b94a
 pl_does_monkey_want_stuck_lock();
17b94a
 
17b94a
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
17b94a
 int
17b94a
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
17b94a
 
17b94a
+gf_boolean_t
17b94a
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
17b94a
+
17b94a
+int32_t
17b94a
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
17b94a
+                        pl_inode_t **ppl_inode, struct list_head *contend);
17b94a
+
17b94a
+int32_t
17b94a
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
17b94a
+                         struct list_head *contend);
17b94a
+
17b94a
+void
17b94a
+pl_inode_remove_wake(struct list_head *list);
17b94a
+
17b94a
+void
17b94a
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
17b94a
+
17b94a
+void
17b94a
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
17b94a
+                         struct list_head *list);
17b94a
+
17b94a
+int32_t
17b94a
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
17b94a
+
17b94a
 #endif /* __COMMON_H__ */
17b94a
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
17b94a
index 93c649c..b97836f 100644
17b94a
--- a/xlators/features/locks/src/entrylk.c
17b94a
+++ b/xlators/features/locks/src/entrylk.c
17b94a
@@ -197,9 +197,9 @@ out:
17b94a
     return revoke_lock;
17b94a
 }
17b94a
 
17b94a
-static gf_boolean_t
17b94a
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
17b94a
-                                  struct timespec *now)
17b94a
+void
17b94a
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
17b94a
+                                struct timespec *now, struct list_head *contend)
17b94a
 {
17b94a
     posix_locks_private_t *priv;
17b94a
     int64_t elapsed;
17b94a
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
17b94a
     /* If this lock is in a list, it means that we are about to send a
17b94a
      * notification for it, so no need to do anything else. */
17b94a
     if (!list_empty(&lock->contend)) {
17b94a
-        return _gf_false;
17b94a
+        return;
17b94a
     }
17b94a
 
17b94a
     elapsed = now->tv_sec;
17b94a
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
17b94a
         elapsed--;
17b94a
     }
17b94a
     if (elapsed < priv->notify_contention_delay) {
17b94a
-        return _gf_false;
17b94a
+        return;
17b94a
     }
17b94a
 
17b94a
     /* All contention notifications will be sent outside of the locked
17b94a
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
17b94a
 
17b94a
     lock->contention_time = *now;
17b94a
 
17b94a
-    return _gf_true;
17b94a
+    list_add_tail(&lock->contend, contend);
17b94a
 }
17b94a
 
17b94a
 void
17b94a
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
17b94a
                     break;
17b94a
                 }
17b94a
             }
17b94a
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
17b94a
-                list_add_tail(&tmp->contend, contend);
17b94a
-            }
17b94a
+            entrylk_contention_notify_check(this, tmp, now, contend);
17b94a
         }
17b94a
     }
17b94a
 
17b94a
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
17b94a
 
17b94a
         if (bl_ret == 0) {
17b94a
-            list_add(&bl->blocked_locks, granted);
17b94a
+            list_add_tail(&bl->blocked_locks, granted);
17b94a
         }
17b94a
     }
17b94a
-    return;
17b94a
 }
17b94a
 
17b94a
 /* Grants locks if possible which are blocked on a lock */
17b94a
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
17b94a
index 24dee49..1a07243 100644
17b94a
--- a/xlators/features/locks/src/inodelk.c
17b94a
+++ b/xlators/features/locks/src/inodelk.c
17b94a
@@ -231,9 +231,9 @@ out:
17b94a
     return revoke_lock;
17b94a
 }
17b94a
 
17b94a
-static gf_boolean_t
17b94a
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
17b94a
-                                  struct timespec *now)
17b94a
+void
17b94a
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
17b94a
+                                struct timespec *now, struct list_head *contend)
17b94a
 {
17b94a
     posix_locks_private_t *priv;
17b94a
     int64_t elapsed;
17b94a
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
17b94a
     /* If this lock is in a list, it means that we are about to send a
17b94a
      * notification for it, so no need to do anything else. */
17b94a
     if (!list_empty(&lock->contend)) {
17b94a
-        return _gf_false;
17b94a
+        return;
17b94a
     }
17b94a
 
17b94a
     elapsed = now->tv_sec;
17b94a
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
17b94a
         elapsed--;
17b94a
     }
17b94a
     if (elapsed < priv->notify_contention_delay) {
17b94a
-        return _gf_false;
17b94a
+        return;
17b94a
     }
17b94a
 
17b94a
     /* All contention notifications will be sent outside of the locked
17b94a
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
17b94a
 
17b94a
     lock->contention_time = *now;
17b94a
 
17b94a
-    return _gf_true;
17b94a
+    list_add_tail(&lock->contend, contend);
17b94a
 }
17b94a
 
17b94a
 void
17b94a
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
17b94a
                     break;
17b94a
                 }
17b94a
             }
17b94a
-            if (__inodelk_needs_contention_notify(this, l, now)) {
17b94a
-                list_add_tail(&l->contend, contend);
17b94a
-            }
17b94a
+            inodelk_contention_notify_check(this, l, now, contend);
17b94a
         }
17b94a
     }
17b94a
 
17b94a
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
17b94a
                struct list_head *contend)
17b94a
 {
17b94a
     pl_inode_lock_t *conf = NULL;
17b94a
-    int ret = -EINVAL;
17b94a
+    int ret;
17b94a
 
17b94a
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
17b94a
-    if (conf) {
17b94a
-        ret = __lock_blocked_add(this, dom, lock, can_block);
17b94a
-        goto out;
17b94a
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
17b94a
+    if (ret < 0) {
17b94a
+        return ret;
17b94a
+    }
17b94a
+    if (ret == 0) {
17b94a
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
17b94a
+    }
17b94a
+    if ((ret > 0) || (conf != NULL)) {
17b94a
+        return __lock_blocked_add(this, dom, lock, can_block);
17b94a
     }
17b94a
 
17b94a
     /* To prevent blocked locks starvation, check if there are any blocked
17b94a
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
17b94a
                    "starvation");
17b94a
         }
17b94a
 
17b94a
-        ret = __lock_blocked_add(this, dom, lock, can_block);
17b94a
-        goto out;
17b94a
+        return __lock_blocked_add(this, dom, lock, can_block);
17b94a
     }
17b94a
     __pl_inodelk_ref(lock);
17b94a
     gettimeofday(&lock->granted_time, NULL);
17b94a
     list_add(&lock->list, &dom->inodelk_list);
17b94a
 
17b94a
-    ret = 0;
17b94a
-
17b94a
-out:
17b94a
-    return ret;
17b94a
+    return 0;
17b94a
 }
17b94a
 
17b94a
 /* Return true if the two inodelks have exactly same lock boundaries */
17b94a
@@ -529,12 +528,11 @@ out:
17b94a
     return conf;
17b94a
 }
17b94a
 
17b94a
-static void
17b94a
+void
17b94a
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
                             struct list_head *granted, pl_dom_list_t *dom,
17b94a
                             struct timespec *now, struct list_head *contend)
17b94a
 {
17b94a
-    int bl_ret = 0;
17b94a
     pl_inode_lock_t *bl = NULL;
17b94a
     pl_inode_lock_t *tmp = NULL;
17b94a
 
17b94a
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
     {
17b94a
         list_del_init(&bl->blocked_locks);
17b94a
 
17b94a
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
17b94a
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
17b94a
 
17b94a
-        if (bl_ret == 0) {
17b94a
-            list_add(&bl->blocked_locks, granted);
17b94a
+        if (bl->status != -EAGAIN) {
17b94a
+            list_add_tail(&bl->blocked_locks, granted);
17b94a
         }
17b94a
     }
17b94a
-    return;
17b94a
 }
17b94a
 
17b94a
-/* Grant all inodelks blocked on a lock */
17b94a
 void
17b94a
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
-                          pl_dom_list_t *dom, struct timespec *now,
17b94a
-                          struct list_head *contend)
17b94a
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
17b94a
+                      struct list_head *granted)
17b94a
 {
17b94a
-    struct list_head granted;
17b94a
     pl_inode_lock_t *lock;
17b94a
     pl_inode_lock_t *tmp;
17b94a
+    int32_t op_ret;
17b94a
+    int32_t op_errno;
17b94a
 
17b94a
-    INIT_LIST_HEAD(&granted);
17b94a
-
17b94a
-    pthread_mutex_lock(&pl_inode->mutex);
17b94a
-    {
17b94a
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
17b94a
-                                    contend);
17b94a
-    }
17b94a
-    pthread_mutex_unlock(&pl_inode->mutex);
17b94a
-
17b94a
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
17b94a
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
17b94a
     {
17b94a
-        gf_log(this->name, GF_LOG_TRACE,
17b94a
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
17b94a
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
17b94a
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
17b94a
-               lock->user_flock.l_len);
17b94a
-
17b94a
+        if (lock->status == 0) {
17b94a
+            op_ret = 0;
17b94a
+            op_errno = 0;
17b94a
+            gf_log(this->name, GF_LOG_TRACE,
17b94a
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
17b94a
+                   " => Granted",
17b94a
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
17b94a
+                   lock->client_pid, lkowner_utoa(&lock->owner),
17b94a
+                   lock->user_flock.l_start, lock->user_flock.l_len);
17b94a
+        } else {
17b94a
+            op_ret = -1;
17b94a
+            op_errno = -lock->status;
17b94a
+        }
17b94a
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
17b94a
-                     0, 0, lock->volume);
17b94a
+                     op_ret, op_errno, lock->volume);
17b94a
 
17b94a
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
17b94a
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
17b94a
         lock->frame = NULL;
17b94a
     }
17b94a
 
17b94a
     pthread_mutex_lock(&pl_inode->mutex);
17b94a
     {
17b94a
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
17b94a
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
17b94a
         {
17b94a
             list_del_init(&lock->blocked_locks);
17b94a
             __pl_inodelk_unref(lock);
17b94a
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
     pthread_mutex_unlock(&pl_inode->mutex);
17b94a
 }
17b94a
 
17b94a
+/* Grant all inodelks blocked on a lock */
17b94a
+void
17b94a
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
17b94a
+                          pl_dom_list_t *dom, struct timespec *now,
17b94a
+                          struct list_head *contend)
17b94a
+{
17b94a
+    struct list_head granted;
17b94a
+
17b94a
+    INIT_LIST_HEAD(&granted);
17b94a
+
17b94a
+    pthread_mutex_lock(&pl_inode->mutex);
17b94a
+    {
17b94a
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
17b94a
+                                    contend);
17b94a
+    }
17b94a
+    pthread_mutex_unlock(&pl_inode->mutex);
17b94a
+
17b94a
+    unwind_granted_inodes(this, pl_inode, &granted);
17b94a
+}
17b94a
+
17b94a
 static void
17b94a
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
17b94a
 {
17b94a
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
17b94a
                  * and blocked lists, then this means that a parallel
17b94a
                  * unlock on another inodelk (L2 say) may have 'granted'
17b94a
                  * L1 and added it to 'granted' list in
17b94a
-                 * __grant_blocked_node_locks() (although using the
17b94a
+                 * __grant_blocked_inode_locks() (although using the
17b94a
                  * 'blocked_locks' member). In that case, the cleanup
17b94a
                  * codepath must try and grant other overlapping
17b94a
                  * blocked inodelks from other clients, now that L1 is
17b94a
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
17b94a
     gf_boolean_t need_inode_unref = _gf_false;
17b94a
     struct list_head *pcontend = NULL;
17b94a
     struct list_head contend;
17b94a
+    struct list_head wake;
17b94a
     struct timespec now = {};
17b94a
     short fl_type;
17b94a
 
17b94a
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
17b94a
         timespec_now(&now);
17b94a
     }
17b94a
 
17b94a
+    INIT_LIST_HEAD(&wake);
17b94a
+
17b94a
     if (ctx)
17b94a
         pthread_mutex_lock(&ctx->lock);
17b94a
     pthread_mutex_lock(&pl_inode->mutex);
17b94a
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
17b94a
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
17b94a
                        lock->client_pid, lkowner_utoa(&lock->owner),
17b94a
                        lock->user_flock.l_start, lock->user_flock.l_len);
17b94a
-                if (can_block)
17b94a
+                if (can_block) {
17b94a
                     unref = _gf_false;
17b94a
-                /* For all but the case where a non-blocking
17b94a
-                 * lock attempt fails, the extra ref taken at
17b94a
-                 * the start of this function must be negated.
17b94a
-                 */
17b94a
-                else
17b94a
-                    need_inode_unref = _gf_true;
17b94a
+                }
17b94a
             }
17b94a
-
17b94a
-            if (ctx && (!ret || can_block))
17b94a
+            /* For all but the case where a non-blocking lock attempt fails
17b94a
+             * with -EAGAIN, the extra ref taken at the start of this function
17b94a
+             * must be negated. */
17b94a
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
17b94a
+            if (ctx && !need_inode_unref) {
17b94a
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
17b94a
+            }
17b94a
         } else {
17b94a
             /* Irrespective of whether unlock succeeds or not,
17b94a
              * the extra inode ref that was done at the start of
17b94a
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
17b94a
             list_del_init(&retlock->client_list);
17b94a
             __pl_inodelk_unref(retlock);
17b94a
 
17b94a
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
17b94a
+
17b94a
             ret = 0;
17b94a
         }
17b94a
     out:
17b94a
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
17b94a
     if (ctx)
17b94a
         pthread_mutex_unlock(&ctx->lock);
17b94a
 
17b94a
+    pl_inode_remove_wake(&wake);
17b94a
+
17b94a
     /* The following (extra) unref corresponds to the ref that
17b94a
      * was done at the time the lock was granted.
17b94a
      */
17b94a
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
17b94a
                                  inode);
17b94a
 
17b94a
             if (ret < 0) {
17b94a
-                if ((can_block) && (F_UNLCK != lock_type)) {
17b94a
-                    goto out;
17b94a
+                if (ret == -EAGAIN) {
17b94a
+                    if (can_block && (F_UNLCK != lock_type)) {
17b94a
+                        goto out;
17b94a
+                    }
17b94a
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
17b94a
+                } else {
17b94a
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
17b94a
                 }
17b94a
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
17b94a
                 op_errno = -ret;
17b94a
                 goto unwind;
17b94a
             }
17b94a
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
17b94a
index aa267de..6666feb 100644
17b94a
--- a/xlators/features/locks/src/locks.h
17b94a
+++ b/xlators/features/locks/src/locks.h
17b94a
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
17b94a
 
17b94a
     struct list_head client_list; /* list of all locks from a client */
17b94a
     short fl_type;
17b94a
+
17b94a
+    int32_t status; /* Error code when we try to grant a lock in blocked
17b94a
+                       state */
17b94a
 };
17b94a
 typedef struct __pl_inode_lock pl_inode_lock_t;
17b94a
 
17b94a
@@ -164,13 +167,14 @@ struct __pl_inode {
17b94a
     struct list_head rw_list;            /* list of waiting r/w requests */
17b94a
     struct list_head reservelk_list;     /* list of reservelks */
17b94a
     struct list_head blocked_reservelks; /* list of blocked reservelks */
17b94a
-    struct list_head
17b94a
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
17b94a
-    struct list_head metalk_list; /* Meta lock list */
17b94a
-                                  /* This is to store the incoming lock
17b94a
-                                     requests while meta lock is enabled */
17b94a
-    struct list_head queued_locks;
17b94a
-    int mandatory; /* if mandatory locking is enabled */
17b94a
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
17b94a
+                                            reserve is held*/
17b94a
+    struct list_head metalk_list;        /* Meta lock list */
17b94a
+    struct list_head queued_locks;       /* This is to store the incoming lock
17b94a
+                                            requests while meta lock is enabled */
17b94a
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
17b94a
+                                 the inode. */
17b94a
+    int mandatory;            /* if mandatory locking is enabled */
17b94a
 
17b94a
     inode_t *refkeeper; /* hold refs on an inode while locks are
17b94a
                            held to prevent pruning */
17b94a
@@ -197,6 +201,11 @@ struct __pl_inode {
17b94a
     */
17b94a
     int fop_wind_count;
17b94a
     pthread_cond_t check_fop_wind_count;
17b94a
+
17b94a
+    int32_t links;           /* Number of hard links the inode has. */
17b94a
+    uint32_t remove_running; /* Number of remove operations running. */
17b94a
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
17b94a
+    gf_boolean_t removed;    /* The inode has been deleted. */
17b94a
 };
17b94a
 typedef struct __pl_inode pl_inode_t;
17b94a
 
17b94a
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
17b94a
index 7887b82..5ae0125 100644
17b94a
--- a/xlators/features/locks/src/posix.c
17b94a
+++ b/xlators/features/locks/src/posix.c
17b94a
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
17b94a
         }                                                                      \
17b94a
     } while (0)
17b94a
 
17b94a
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
17b94a
+                        _args...)                                              \
17b94a
+    ({                                                                         \
17b94a
+        struct list_head contend;                                              \
17b94a
+        pl_inode_t *__pl_inode;                                                \
17b94a
+        call_stub_t *__stub;                                                   \
17b94a
+        int32_t __error;                                                       \
17b94a
+        INIT_LIST_HEAD(&contend);                                              \
17b94a
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
17b94a
+                                          &__pl_inode, &contend);              \
17b94a
+        if (__error < 0) {                                                     \
17b94a
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
17b94a
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
17b94a
+                                               &contend);                      \
17b94a
+        } else if (__error == 0) {                                             \
17b94a
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
17b94a
+                                  _loc2);                                      \
17b94a
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
17b94a
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
17b94a
+        }                                                                      \
17b94a
+        __error;                                                               \
17b94a
+    })
17b94a
+
17b94a
 gf_boolean_t
17b94a
 pl_has_xdata_requests(dict_t *xdata)
17b94a
 {
17b94a
@@ -2969,11 +2992,85 @@ out:
17b94a
     return ret;
17b94a
 }
17b94a
 
17b94a
+static int32_t
17b94a
+pl_request_link_count(dict_t **pxdata)
17b94a
+{
17b94a
+    dict_t *xdata;
17b94a
+
17b94a
+    xdata = *pxdata;
17b94a
+    if (xdata == NULL) {
17b94a
+        xdata = dict_new();
17b94a
+        if (xdata == NULL) {
17b94a
+            return ENOMEM;
17b94a
+        }
17b94a
+    } else {
17b94a
+        dict_ref(xdata);
17b94a
+    }
17b94a
+
17b94a
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
17b94a
+        dict_unref(xdata);
17b94a
+        return ENOMEM;
17b94a
+    }
17b94a
+
17b94a
+    *pxdata = xdata;
17b94a
+
17b94a
+    return 0;
17b94a
+}
17b94a
+
17b94a
+static int32_t
17b94a
+pl_check_link_count(dict_t *xdata)
17b94a
+{
17b94a
+    int32_t count;
17b94a
+
17b94a
+    /* In case we are unable to read the link count from xdata, we take a
17b94a
+     * conservative approach and return -2, which will prevent the inode from
17b94a
+     * being considered deleted. In fact it will cause link tracking for this
17b94a
+     * inode to be disabled completely to avoid races. */
17b94a
+
17b94a
+    if (xdata == NULL) {
17b94a
+        return -2;
17b94a
+    }
17b94a
+
17b94a
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
17b94a
+        return -2;
17b94a
+    }
17b94a
+
17b94a
+    return count;
17b94a
+}
17b94a
+
17b94a
 int32_t
17b94a
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
17b94a
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
17b94a
               struct iatt *postparent)
17b94a
 {
17b94a
+    pl_inode_t *pl_inode;
17b94a
+
17b94a
+    if (op_ret >= 0) {
17b94a
+        pl_inode = pl_inode_get(this, inode, NULL);
17b94a
+        if (pl_inode == NULL) {
17b94a
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
17b94a
+                            NULL);
17b94a
+            return 0;
17b94a
+        }
17b94a
+
17b94a
+        pthread_mutex_lock(&pl_inode->mutex);
17b94a
+
17b94a
+        /* We only update the link count if we previously didn't know it.
17b94a
+         * Doing it always can lead to races since lookup is not executed
17b94a
+         * atomically most of the time. */
17b94a
+        if (pl_inode->links == -2) {
17b94a
+            pl_inode->links = pl_check_link_count(xdata);
17b94a
+            if (buf->ia_type == IA_IFDIR) {
17b94a
+                /* Directories have at least 2 links. To avoid special handling
17b94a
+                 * for directories, we simply decrement the value here to make
17b94a
+                 * them equivalent to regular files. */
17b94a
+                pl_inode->links--;
17b94a
+            }
17b94a
+        }
17b94a
+
17b94a
+        pthread_mutex_unlock(&pl_inode->mutex);
17b94a
+    }
17b94a
+
17b94a
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
17b94a
                     postparent);
17b94a
     return 0;
17b94a
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
17b94a
 int32_t
17b94a
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
17b94a
 {
17b94a
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
17b94a
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
17b94a
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
17b94a
+    int32_t error;
17b94a
+
17b94a
+    error = pl_request_link_count(&xdata);
17b94a
+    if (error == 0) {
17b94a
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
17b94a
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
17b94a
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
17b94a
+        dict_unref(xdata);
17b94a
+    } else {
17b94a
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
17b94a
+    }
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -3792,6 +3897,10 @@ unlock:
17b94a
             gf_proc_dump_write("posixlk-count", "%d", count);
17b94a
             __dump_posixlks(pl_inode);
17b94a
         }
17b94a
+
17b94a
+        gf_proc_dump_write("links", "%d", pl_inode->links);
17b94a
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
17b94a
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
17b94a
     }
17b94a
     pthread_mutex_unlock(&pl_inode->mutex);
17b94a
 
17b94a
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
17b94a
               struct iatt *postoldparent, struct iatt *prenewparent,
17b94a
               struct iatt *postnewparent, dict_t *xdata)
17b94a
 {
17b94a
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
17b94a
+
17b94a
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
17b94a
                     postoldparent, prenewparent, postnewparent, xdata);
17b94a
+
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -4146,10 +4258,15 @@ int32_t
17b94a
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
17b94a
           dict_t *xdata)
17b94a
 {
17b94a
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
17b94a
+    int32_t error;
17b94a
+
17b94a
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
17b94a
+                            pl_rename_cbk, oldloc, newloc, xdata);
17b94a
+    if (error > 0) {
17b94a
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
17b94a
+                            NULL, NULL);
17b94a
+    }
17b94a
 
17b94a
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
17b94a
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
17b94a
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
17b94a
               dict_t *xdata)
17b94a
 {
17b94a
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
17b94a
+
17b94a
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
17b94a
                     postparent, xdata);
17b94a
+
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -4282,9 +4402,14 @@ int32_t
17b94a
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
17b94a
           dict_t *xdata)
17b94a
 {
17b94a
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
17b94a
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
17b94a
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
17b94a
+    int32_t error;
17b94a
+
17b94a
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
17b94a
+                            pl_unlink_cbk, loc, xflag, xdata);
17b94a
+    if (error > 0) {
17b94a
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
17b94a
+    }
17b94a
+
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
17b94a
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
17b94a
              dict_t *xdata)
17b94a
 {
17b94a
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
17b94a
+
17b94a
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
17b94a
                                postparent, xdata);
17b94a
+
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -4360,9 +4488,14 @@ int
17b94a
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
17b94a
          dict_t *xdata)
17b94a
 {
17b94a
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
17b94a
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
17b94a
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
17b94a
+    int32_t error;
17b94a
+
17b94a
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
17b94a
+                            pl_rmdir_cbk, loc, xflags, xdata);
17b94a
+    if (error > 0) {
17b94a
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
17b94a
+    }
17b94a
+
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
17b94a
             int32_t op_errno, inode_t *inode, struct iatt *buf,
17b94a
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
17b94a
 {
17b94a
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
17b94a
+
17b94a
+    if (op_ret >= 0) {
17b94a
+        pthread_mutex_lock(&pl_inode->mutex);
17b94a
+
17b94a
+        /* TODO: can pl_inode->links == 0 happen here? */
17b94a
+        if (pl_inode->links >= 0) {
17b94a
+            pl_inode->links++;
17b94a
+        }
17b94a
+
17b94a
+        pthread_mutex_unlock(&pl_inode->mutex);
17b94a
+    }
17b94a
+
17b94a
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
17b94a
                                preparent, postparent, xdata);
17b94a
     return 0;
17b94a
@@ -4401,9 +4547,18 @@ int
17b94a
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
17b94a
         dict_t *xdata)
17b94a
 {
17b94a
+    pl_inode_t *pl_inode;
17b94a
+
17b94a
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
17b94a
+    if (pl_inode == NULL) {
17b94a
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
17b94a
+                            NULL);
17b94a
+        return 0;
17b94a
+    }
17b94a
+
17b94a
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
17b94a
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
17b94a
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
17b94a
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
17b94a
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
17b94a
     return 0;
17b94a
 }
17b94a
 
17b94a
-- 
17b94a
1.8.3.1
17b94a