Blob Blame History Raw
From 3f6ff474db3934f43d9963dfe4dda7d201211e75 Mon Sep 17 00:00:00 2001
From: Xavi Hernandez <xhernandez@redhat.com>
Date: Fri, 12 Jun 2020 00:06:36 +0200
Subject: [PATCH 455/456] locks: prevent deletion of locked entries

To keep consistency inside transactions started by locking an entry or
an inode, this change delays the removal of entries that are currently
locked by one or more clients. Once all locks are released, the removal
is processed.

It has also been improved the detection of stale inodes in the locking
code of EC.

>Upstream patch - https://review.gluster.org/#/c/glusterfs/+/20025/
>Fixes: #990

Change-Id: Ic8ba23d9480f80c7f74e7a310bf8a15922320fd5
BUG: 1812789
Signed-off-by: Xavi Hernandez <xhernandez@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/206442
Tested-by: RHGS Build Bot <nigelb@redhat.com>
---
 xlators/cluster/ec/src/ec-locks.c    |  69 ++++++--
 xlators/features/locks/src/common.c  | 316 ++++++++++++++++++++++++++++++++++-
 xlators/features/locks/src/common.h  |  43 +++++
 xlators/features/locks/src/entrylk.c |  19 +--
 xlators/features/locks/src/inodelk.c | 150 ++++++++++-------
 xlators/features/locks/src/locks.h   |  23 ++-
 xlators/features/locks/src/posix.c   | 183 ++++++++++++++++++--
 7 files changed, 689 insertions(+), 114 deletions(-)

diff --git a/xlators/cluster/ec/src/ec-locks.c b/xlators/cluster/ec/src/ec-locks.c
index ffcac07..db86296 100644
--- a/xlators/cluster/ec/src/ec-locks.c
+++ b/xlators/cluster/ec/src/ec-locks.c
@@ -28,9 +28,36 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
     ec_t *ec = fop->xl->private;
     ec_cbk_data_t *ans = NULL;
     ec_cbk_data_t *cbk = NULL;
-    uintptr_t locked = 0, notlocked = 0;
+    uintptr_t locked = 0;
+    int32_t good = 0;
+    int32_t eagain = 0;
+    int32_t estale = 0;
     int32_t error = -1;
 
+    /* There are some errors that we'll handle in an special way while trying
+     * to acquire a lock.
+     *
+     *   EAGAIN:  If it's found during a parallel non-blocking lock request, we
+     *            consider that there's contention on the inode, so we consider
+     *            the acquisition a failure and try again with a sequential
+     *            blocking lock request. This will ensure that we get a lock on
+     *            as many bricks as possible (ignoring EAGAIN here would cause
+     *            unnecessary triggers of self-healing).
+     *
+     *            If it's found during a sequential blocking lock request, it's
+     *            considered an error. Lock will only succeed if there are
+     *            enough other bricks locked.
+     *
+     *   ESTALE:  This can appear during parallel or sequential lock request if
+     *            the inode has just been unlinked. We consider this error is
+     *            not recoverable, but we also don't consider it as fatal. So,
+     *            if it happens during parallel lock, we won't attempt a
+     *            sequential one unless there are EAGAIN errors on other
+     *            bricks (and are enough to form a quorum), but if we reach
+     *            quorum counting the ESTALE bricks, we consider the whole
+     *            result of the operation is ESTALE instead of EIO.
+     */
+
     list_for_each_entry(ans, &fop->cbk_list, list)
     {
         if (ans->op_ret >= 0) {
@@ -38,24 +65,23 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
                 error = EIO;
             }
             locked |= ans->mask;
+            good = ans->count;
             cbk = ans;
-        } else {
-            if (ans->op_errno == EAGAIN) {
-                switch (fop->uint32) {
-                    case EC_LOCK_MODE_NONE:
-                    case EC_LOCK_MODE_ALL:
-                        /* Goal is to treat non-blocking lock as failure
-                         * even if there is a single EAGAIN*/
-                        notlocked |= ans->mask;
-                        break;
-                }
-            }
+        } else if (ans->op_errno == ESTALE) {
+            estale += ans->count;
+        } else if ((ans->op_errno == EAGAIN) &&
+                   (fop->uint32 != EC_LOCK_MODE_INC)) {
+            eagain += ans->count;
         }
     }
 
     if (error == -1) {
-        if (gf_bits_count(locked | notlocked) >= ec->fragments) {
-            if (notlocked == 0) {
+        /* If we have enough quorum with succeeded and EAGAIN answers, we
+         * ignore for now any ESTALE answer. If there are EAGAIN answers,
+         * we retry with a sequential blocking lock request if needed.
+         * Otherwise we succeed. */
+        if ((good + eagain) >= ec->fragments) {
+            if (eagain == 0) {
                 if (fop->answer == NULL) {
                     fop->answer = cbk;
                 }
@@ -68,21 +94,28 @@ ec_lock_check(ec_fop_data_t *fop, uintptr_t *mask)
                     case EC_LOCK_MODE_NONE:
                         error = EAGAIN;
                         break;
-
                     case EC_LOCK_MODE_ALL:
                         fop->uint32 = EC_LOCK_MODE_INC;
                         break;
-
                     default:
+                        /* This shouldn't happen because eagain cannot be > 0
+                         * when fop->uint32 is EC_LOCK_MODE_INC. */
                         error = EIO;
                         break;
                 }
             }
         } else {
-            if (fop->answer && fop->answer->op_ret < 0)
+            /* We have been unable to find enough candidates that will be able
+             * to take the lock. If we have quorum on some answer, we return
+             * it. Otherwise we check if ESTALE answers allow us to reach
+             * quorum. If so, we return ESTALE. */
+            if (fop->answer && fop->answer->op_ret < 0) {
                 error = fop->answer->op_errno;
-            else
+            } else if ((good + eagain + estale) >= ec->fragments) {
+                error = ESTALE;
+            } else {
                 error = EIO;
+            }
         }
     }
 
diff --git a/xlators/features/locks/src/common.c b/xlators/features/locks/src/common.c
index 1406e70..0c52853 100644
--- a/xlators/features/locks/src/common.c
+++ b/xlators/features/locks/src/common.c
@@ -462,11 +462,16 @@ pl_inode_get(xlator_t *this, inode_t *inode, pl_local_t *local)
         INIT_LIST_HEAD(&pl_inode->blocked_calls);
         INIT_LIST_HEAD(&pl_inode->metalk_list);
         INIT_LIST_HEAD(&pl_inode->queued_locks);
+        INIT_LIST_HEAD(&pl_inode->waiting);
         gf_uuid_copy(pl_inode->gfid, inode->gfid);
 
         pl_inode->check_mlock_info = _gf_true;
         pl_inode->mlock_enforced = _gf_false;
 
+        /* -2 means never looked up. -1 means something went wrong and link
+         * tracking is disabled. */
+        pl_inode->links = -2;
+
         ret = __inode_ctx_put(inode, this, (uint64_t)(long)(pl_inode));
         if (ret) {
             pthread_mutex_destroy(&pl_inode->mutex);
@@ -1276,4 +1281,313 @@ pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd)
     }
 
     return 0;
-}
\ No newline at end of file
+}
+
+gf_boolean_t
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client)
+{
+    if (client && (client->opversion < GD_OP_VERSION_7_0)) {
+        return _gf_true;
+    }
+
+    if (is_lk_owner_null(owner)) {
+        return _gf_false;
+    }
+    return _gf_true;
+}
+
+static int32_t
+pl_inode_from_loc(loc_t *loc, inode_t **pinode)
+{
+    inode_t *inode = NULL;
+    int32_t error = 0;
+
+    if (loc->inode != NULL) {
+        inode = inode_ref(loc->inode);
+        goto done;
+    }
+
+    if (loc->parent == NULL) {
+        error = EINVAL;
+        goto done;
+    }
+
+    if (!gf_uuid_is_null(loc->gfid)) {
+        inode = inode_find(loc->parent->table, loc->gfid);
+        if (inode != NULL) {
+            goto done;
+        }
+    }
+
+    if (loc->name == NULL) {
+        error = EINVAL;
+        goto done;
+    }
+
+    inode = inode_grep(loc->parent->table, loc->parent, loc->name);
+    if (inode == NULL) {
+        /* We haven't found any inode. This means that the file doesn't exist
+         * or that even if it exists, we don't have any knowledge about it, so
+         * we don't have locks on it either, which is fine for our purposes. */
+        goto done;
+    }
+
+done:
+    *pinode = inode;
+
+    return error;
+}
+
+static gf_boolean_t
+pl_inode_has_owners(xlator_t *xl, client_t *client, pl_inode_t *pl_inode,
+                    struct timespec *now, struct list_head *contend)
+{
+    pl_dom_list_t *dom;
+    pl_inode_lock_t *lock;
+    gf_boolean_t has_owners = _gf_false;
+
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
+    {
+        list_for_each_entry(lock, &dom->inodelk_list, list)
+        {
+            /* If the lock belongs to the same client, we assume it's related
+             * to the same operation, so we allow the removal to continue. */
+            if (lock->client == client) {
+                continue;
+            }
+            /* If the lock belongs to an internal process, we don't block the
+             * removal. */
+            if (lock->client_pid < 0) {
+                continue;
+            }
+            if (contend == NULL) {
+                return _gf_true;
+            }
+            has_owners = _gf_true;
+            inodelk_contention_notify_check(xl, lock, now, contend);
+        }
+    }
+
+    return has_owners;
+}
+
+int32_t
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
+                        pl_inode_t **ppl_inode, struct list_head *contend)
+{
+    struct timespec now;
+    inode_t *inode;
+    pl_inode_t *pl_inode;
+    int32_t error;
+
+    pl_inode = NULL;
+
+    error = pl_inode_from_loc(loc, &inode);
+    if ((error != 0) || (inode == NULL)) {
+        goto done;
+    }
+
+    pl_inode = pl_inode_get(xl, inode, NULL);
+    if (pl_inode == NULL) {
+        inode_unref(inode);
+        error = ENOMEM;
+        goto done;
+    }
+
+    /* pl_inode_from_loc() already increments ref count for inode, so
+     * we only assign here our reference. */
+    pl_inode->inode = inode;
+
+    timespec_now(&now);
+
+    pthread_mutex_lock(&pl_inode->mutex);
+
+    if (pl_inode->removed) {
+        error = ESTALE;
+        goto unlock;
+    }
+
+    if (pl_inode_has_owners(xl, frame->root->client, pl_inode, &now, contend)) {
+        error = -1;
+        /* We skip the unlock here because the caller must create a stub when
+         * we return -1 and do a call to pl_inode_remove_complete(), which
+         * assumes the lock is still acquired and will release it once
+         * everything else is prepared. */
+        goto done;
+    }
+
+    pl_inode->is_locked = _gf_true;
+    pl_inode->remove_running++;
+
+unlock:
+    pthread_mutex_unlock(&pl_inode->mutex);
+
+done:
+    *ppl_inode = pl_inode;
+
+    return error;
+}
+
+int32_t
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
+                         struct list_head *contend)
+{
+    pl_inode_lock_t *lock;
+    int32_t error = -1;
+
+    if (stub != NULL) {
+        list_add_tail(&stub->list, &pl_inode->waiting);
+        pl_inode->is_locked = _gf_true;
+    } else {
+        error = ENOMEM;
+
+        while (!list_empty(contend)) {
+            lock = list_first_entry(contend, pl_inode_lock_t, list);
+            list_del_init(&lock->list);
+            __pl_inodelk_unref(lock);
+        }
+    }
+
+    pthread_mutex_unlock(&pl_inode->mutex);
+
+    if (error < 0) {
+        inodelk_contention_notify(xl, contend);
+    }
+
+    inode_unref(pl_inode->inode);
+
+    return error;
+}
+
+void
+pl_inode_remove_wake(struct list_head *list)
+{
+    call_stub_t *stub;
+
+    while (!list_empty(list)) {
+        stub = list_first_entry(list, call_stub_t, list);
+        list_del_init(&stub->list);
+
+        call_resume(stub);
+    }
+}
+
+void
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error)
+{
+    struct list_head contend, granted;
+    struct timespec now;
+    pl_dom_list_t *dom;
+
+    if (pl_inode == NULL) {
+        return;
+    }
+
+    INIT_LIST_HEAD(&contend);
+    INIT_LIST_HEAD(&granted);
+    timespec_now(&now);
+
+    pthread_mutex_lock(&pl_inode->mutex);
+
+    if (error == 0) {
+        if (pl_inode->links >= 0) {
+            pl_inode->links--;
+        }
+        if (pl_inode->links == 0) {
+            pl_inode->removed = _gf_true;
+        }
+    }
+
+    pl_inode->remove_running--;
+
+    if ((pl_inode->remove_running == 0) && list_empty(&pl_inode->waiting)) {
+        pl_inode->is_locked = _gf_false;
+
+        list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
+        {
+            __grant_blocked_inode_locks(xl, pl_inode, &granted, dom, &now,
+                                        &contend);
+        }
+    }
+
+    pthread_mutex_unlock(&pl_inode->mutex);
+
+    unwind_granted_inodes(xl, pl_inode, &granted);
+
+    inodelk_contention_notify(xl, &contend);
+
+    inode_unref(pl_inode->inode);
+}
+
+void
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
+                         struct list_head *list)
+{
+    call_stub_t *stub, *tmp;
+
+    if (!pl_inode->is_locked) {
+        return;
+    }
+
+    list_for_each_entry_safe(stub, tmp, &pl_inode->waiting, list)
+    {
+        if (!pl_inode_has_owners(xl, stub->frame->root->client, pl_inode, NULL,
+                                 NULL)) {
+            list_move_tail(&stub->list, list);
+        }
+    }
+}
+
+/* This function determines if an inodelk attempt can be done now or it needs
+ * to wait.
+ *
+ * Possible return values:
+ *   < 0: An error occurred. Currently only -ESTALE can be returned if the
+ *        inode has been deleted previously by unlink/rmdir/rename
+ *   = 0: The lock can be attempted.
+ *   > 0: The lock needs to wait because a conflicting remove operation is
+ *        ongoing.
+ */
+int32_t
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock)
+{
+    pl_dom_list_t *dom;
+    pl_inode_lock_t *ilock;
+
+    /* If the inode has been deleted, we won't allow any lock. */
+    if (pl_inode->removed) {
+        return -ESTALE;
+    }
+
+    /* We only synchronize with locks made for regular operations coming from
+     * the user. Locks done for internal purposes are hard to control and could
+     * lead to long delays or deadlocks quite easily. */
+    if (lock->client_pid < 0) {
+        return 0;
+    }
+    if (!pl_inode->is_locked) {
+        return 0;
+    }
+    if (pl_inode->remove_running > 0) {
+        return 1;
+    }
+
+    list_for_each_entry(dom, &pl_inode->dom_list, inode_list)
+    {
+        list_for_each_entry(ilock, &dom->inodelk_list, list)
+        {
+            /* If a lock from the same client is already granted, we allow this
+             * one to continue. This is necessary to prevent deadlocks when
+             * multiple locks are taken for the same operation.
+             *
+             * On the other side it's unlikely that the same client sends
+             * completely unrelated locks for the same inode.
+             */
+            if (ilock->client == lock->client) {
+                return 0;
+            }
+        }
+    }
+
+    return 1;
+}
diff --git a/xlators/features/locks/src/common.h b/xlators/features/locks/src/common.h
index ea86b96..6c81ac3 100644
--- a/xlators/features/locks/src/common.h
+++ b/xlators/features/locks/src/common.h
@@ -105,6 +105,15 @@ void
 __pl_inodelk_unref(pl_inode_lock_t *lock);
 
 void
+__grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
+                            struct list_head *granted, pl_dom_list_t *dom,
+                            struct timespec *now, struct list_head *contend);
+
+void
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
+                      struct list_head *granted);
+
+void
 grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
                           pl_dom_list_t *dom, struct timespec *now,
                           struct list_head *contend);
@@ -204,6 +213,16 @@ pl_metalock_is_active(pl_inode_t *pl_inode);
 void
 __pl_queue_lock(pl_inode_t *pl_inode, posix_lock_t *reqlock);
 
+void
+inodelk_contention_notify_check(xlator_t *xl, pl_inode_lock_t *lock,
+                                struct timespec *now,
+                                struct list_head *contend);
+
+void
+entrylk_contention_notify_check(xlator_t *xl, pl_entry_lock_t *lock,
+                                struct timespec *now,
+                                struct list_head *contend);
+
 gf_boolean_t
 pl_does_monkey_want_stuck_lock();
 
@@ -216,4 +235,28 @@ pl_clean_local(pl_local_t *local);
 int
 pl_local_init(call_frame_t *frame, xlator_t *this, loc_t *loc, fd_t *fd);
 
+gf_boolean_t
+pl_is_lk_owner_valid(gf_lkowner_t *owner, client_t *client);
+
+int32_t
+pl_inode_remove_prepare(xlator_t *xl, call_frame_t *frame, loc_t *loc,
+                        pl_inode_t **ppl_inode, struct list_head *contend);
+
+int32_t
+pl_inode_remove_complete(xlator_t *xl, pl_inode_t *pl_inode, call_stub_t *stub,
+                         struct list_head *contend);
+
+void
+pl_inode_remove_wake(struct list_head *list);
+
+void
+pl_inode_remove_cbk(xlator_t *xl, pl_inode_t *pl_inode, int32_t error);
+
+void
+pl_inode_remove_unlocked(xlator_t *xl, pl_inode_t *pl_inode,
+                         struct list_head *list);
+
+int32_t
+pl_inode_remove_inodelk(pl_inode_t *pl_inode, pl_inode_lock_t *lock);
+
 #endif /* __COMMON_H__ */
diff --git a/xlators/features/locks/src/entrylk.c b/xlators/features/locks/src/entrylk.c
index 93c649c..b97836f 100644
--- a/xlators/features/locks/src/entrylk.c
+++ b/xlators/features/locks/src/entrylk.c
@@ -197,9 +197,9 @@ out:
     return revoke_lock;
 }
 
-static gf_boolean_t
-__entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
-                                  struct timespec *now)
+void
+entrylk_contention_notify_check(xlator_t *this, pl_entry_lock_t *lock,
+                                struct timespec *now, struct list_head *contend)
 {
     posix_locks_private_t *priv;
     int64_t elapsed;
@@ -209,7 +209,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
     /* If this lock is in a list, it means that we are about to send a
      * notification for it, so no need to do anything else. */
     if (!list_empty(&lock->contend)) {
-        return _gf_false;
+        return;
     }
 
     elapsed = now->tv_sec;
@@ -218,7 +218,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
         elapsed--;
     }
     if (elapsed < priv->notify_contention_delay) {
-        return _gf_false;
+        return;
     }
 
     /* All contention notifications will be sent outside of the locked
@@ -231,7 +231,7 @@ __entrylk_needs_contention_notify(xlator_t *this, pl_entry_lock_t *lock,
 
     lock->contention_time = *now;
 
-    return _gf_true;
+    list_add_tail(&lock->contend, contend);
 }
 
 void
@@ -325,9 +325,7 @@ __entrylk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_entry_lock_t *lock,
                     break;
                 }
             }
-            if (__entrylk_needs_contention_notify(this, tmp, now)) {
-                list_add_tail(&tmp->contend, contend);
-            }
+            entrylk_contention_notify_check(this, tmp, now, contend);
         }
     }
 
@@ -690,10 +688,9 @@ __grant_blocked_entry_locks(xlator_t *this, pl_inode_t *pl_inode,
         bl_ret = __lock_entrylk(bl->this, pl_inode, bl, 0, dom, now, contend);
 
         if (bl_ret == 0) {
-            list_add(&bl->blocked_locks, granted);
+            list_add_tail(&bl->blocked_locks, granted);
         }
     }
-    return;
 }
 
 /* Grants locks if possible which are blocked on a lock */
diff --git a/xlators/features/locks/src/inodelk.c b/xlators/features/locks/src/inodelk.c
index 24dee49..1a07243 100644
--- a/xlators/features/locks/src/inodelk.c
+++ b/xlators/features/locks/src/inodelk.c
@@ -231,9 +231,9 @@ out:
     return revoke_lock;
 }
 
-static gf_boolean_t
-__inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
-                                  struct timespec *now)
+void
+inodelk_contention_notify_check(xlator_t *this, pl_inode_lock_t *lock,
+                                struct timespec *now, struct list_head *contend)
 {
     posix_locks_private_t *priv;
     int64_t elapsed;
@@ -243,7 +243,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
     /* If this lock is in a list, it means that we are about to send a
      * notification for it, so no need to do anything else. */
     if (!list_empty(&lock->contend)) {
-        return _gf_false;
+        return;
     }
 
     elapsed = now->tv_sec;
@@ -252,7 +252,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
         elapsed--;
     }
     if (elapsed < priv->notify_contention_delay) {
-        return _gf_false;
+        return;
     }
 
     /* All contention notifications will be sent outside of the locked
@@ -265,7 +265,7 @@ __inodelk_needs_contention_notify(xlator_t *this, pl_inode_lock_t *lock,
 
     lock->contention_time = *now;
 
-    return _gf_true;
+    list_add_tail(&lock->contend, contend);
 }
 
 void
@@ -353,9 +353,7 @@ __inodelk_grantable(xlator_t *this, pl_dom_list_t *dom, pl_inode_lock_t *lock,
                     break;
                 }
             }
-            if (__inodelk_needs_contention_notify(this, l, now)) {
-                list_add_tail(&l->contend, contend);
-            }
+            inodelk_contention_notify_check(this, l, now, contend);
         }
     }
 
@@ -435,12 +433,17 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
                struct list_head *contend)
 {
     pl_inode_lock_t *conf = NULL;
-    int ret = -EINVAL;
+    int ret;
 
-    conf = __inodelk_grantable(this, dom, lock, now, contend);
-    if (conf) {
-        ret = __lock_blocked_add(this, dom, lock, can_block);
-        goto out;
+    ret = pl_inode_remove_inodelk(pl_inode, lock);
+    if (ret < 0) {
+        return ret;
+    }
+    if (ret == 0) {
+        conf = __inodelk_grantable(this, dom, lock, now, contend);
+    }
+    if ((ret > 0) || (conf != NULL)) {
+        return __lock_blocked_add(this, dom, lock, can_block);
     }
 
     /* To prevent blocked locks starvation, check if there are any blocked
@@ -462,17 +465,13 @@ __lock_inodelk(xlator_t *this, pl_inode_t *pl_inode, pl_inode_lock_t *lock,
                    "starvation");
         }
 
-        ret = __lock_blocked_add(this, dom, lock, can_block);
-        goto out;
+        return __lock_blocked_add(this, dom, lock, can_block);
     }
     __pl_inodelk_ref(lock);
     gettimeofday(&lock->granted_time, NULL);
     list_add(&lock->list, &dom->inodelk_list);
 
-    ret = 0;
-
-out:
-    return ret;
+    return 0;
 }
 
 /* Return true if the two inodelks have exactly same lock boundaries */
@@ -529,12 +528,11 @@ out:
     return conf;
 }
 
-static void
+void
 __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
                             struct list_head *granted, pl_dom_list_t *dom,
                             struct timespec *now, struct list_head *contend)
 {
-    int bl_ret = 0;
     pl_inode_lock_t *bl = NULL;
     pl_inode_lock_t *tmp = NULL;
 
@@ -547,52 +545,48 @@ __grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
     {
         list_del_init(&bl->blocked_locks);
 
-        bl_ret = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
+        bl->status = __lock_inodelk(this, pl_inode, bl, 1, dom, now, contend);
 
-        if (bl_ret == 0) {
-            list_add(&bl->blocked_locks, granted);
+        if (bl->status != -EAGAIN) {
+            list_add_tail(&bl->blocked_locks, granted);
         }
     }
-    return;
 }
 
-/* Grant all inodelks blocked on a lock */
 void
-grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
-                          pl_dom_list_t *dom, struct timespec *now,
-                          struct list_head *contend)
+unwind_granted_inodes(xlator_t *this, pl_inode_t *pl_inode,
+                      struct list_head *granted)
 {
-    struct list_head granted;
     pl_inode_lock_t *lock;
     pl_inode_lock_t *tmp;
+    int32_t op_ret;
+    int32_t op_errno;
 
-    INIT_LIST_HEAD(&granted);
-
-    pthread_mutex_lock(&pl_inode->mutex);
-    {
-        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
-                                    contend);
-    }
-    pthread_mutex_unlock(&pl_inode->mutex);
-
-    list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
+    list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
     {
-        gf_log(this->name, GF_LOG_TRACE,
-               "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64 " => Granted",
-               lock->fl_type == F_UNLCK ? "Unlock" : "Lock", lock->client_pid,
-               lkowner_utoa(&lock->owner), lock->user_flock.l_start,
-               lock->user_flock.l_len);
-
+        if (lock->status == 0) {
+            op_ret = 0;
+            op_errno = 0;
+            gf_log(this->name, GF_LOG_TRACE,
+                   "%s (pid=%d) (lk-owner=%s) %" PRId64 " - %" PRId64
+                   " => Granted",
+                   lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
+                   lock->client_pid, lkowner_utoa(&lock->owner),
+                   lock->user_flock.l_start, lock->user_flock.l_len);
+        } else {
+            op_ret = -1;
+            op_errno = -lock->status;
+        }
         pl_trace_out(this, lock->frame, NULL, NULL, F_SETLKW, &lock->user_flock,
-                     0, 0, lock->volume);
+                     op_ret, op_errno, lock->volume);
 
-        STACK_UNWIND_STRICT(inodelk, lock->frame, 0, 0, NULL);
+        STACK_UNWIND_STRICT(inodelk, lock->frame, op_ret, op_errno, NULL);
         lock->frame = NULL;
     }
 
     pthread_mutex_lock(&pl_inode->mutex);
     {
-        list_for_each_entry_safe(lock, tmp, &granted, blocked_locks)
+        list_for_each_entry_safe(lock, tmp, granted, blocked_locks)
         {
             list_del_init(&lock->blocked_locks);
             __pl_inodelk_unref(lock);
@@ -601,6 +595,26 @@ grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
     pthread_mutex_unlock(&pl_inode->mutex);
 }
 
+/* Grant all inodelks blocked on a lock */
+void
+grant_blocked_inode_locks(xlator_t *this, pl_inode_t *pl_inode,
+                          pl_dom_list_t *dom, struct timespec *now,
+                          struct list_head *contend)
+{
+    struct list_head granted;
+
+    INIT_LIST_HEAD(&granted);
+
+    pthread_mutex_lock(&pl_inode->mutex);
+    {
+        __grant_blocked_inode_locks(this, pl_inode, &granted, dom, now,
+                                    contend);
+    }
+    pthread_mutex_unlock(&pl_inode->mutex);
+
+    unwind_granted_inodes(this, pl_inode, &granted);
+}
+
 static void
 pl_inodelk_log_cleanup(pl_inode_lock_t *lock)
 {
@@ -662,7 +676,7 @@ pl_inodelk_client_cleanup(xlator_t *this, pl_ctx_t *ctx)
                  * and blocked lists, then this means that a parallel
                  * unlock on another inodelk (L2 say) may have 'granted'
                  * L1 and added it to 'granted' list in
-                 * __grant_blocked_node_locks() (although using the
+                 * __grant_blocked_inode_locks() (although using the
                  * 'blocked_locks' member). In that case, the cleanup
                  * codepath must try and grant other overlapping
                  * blocked inodelks from other clients, now that L1 is
@@ -747,6 +761,7 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
     gf_boolean_t need_inode_unref = _gf_false;
     struct list_head *pcontend = NULL;
     struct list_head contend;
+    struct list_head wake;
     struct timespec now = {};
     short fl_type;
 
@@ -798,6 +813,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
         timespec_now(&now);
     }
 
+    INIT_LIST_HEAD(&wake);
+
     if (ctx)
         pthread_mutex_lock(&ctx->lock);
     pthread_mutex_lock(&pl_inode->mutex);
@@ -820,18 +837,17 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
                        lock->fl_type == F_UNLCK ? "Unlock" : "Lock",
                        lock->client_pid, lkowner_utoa(&lock->owner),
                        lock->user_flock.l_start, lock->user_flock.l_len);
-                if (can_block)
+                if (can_block) {
                     unref = _gf_false;
-                /* For all but the case where a non-blocking
-                 * lock attempt fails, the extra ref taken at
-                 * the start of this function must be negated.
-                 */
-                else
-                    need_inode_unref = _gf_true;
+                }
             }
-
-            if (ctx && (!ret || can_block))
+            /* For all but the case where a non-blocking lock attempt fails
+             * with -EAGAIN, the extra ref taken at the start of this function
+             * must be negated. */
+            need_inode_unref = (ret != 0) && ((ret != -EAGAIN) || !can_block);
+            if (ctx && !need_inode_unref) {
                 list_add_tail(&lock->client_list, &ctx->inodelk_lockers);
+            }
         } else {
             /* Irrespective of whether unlock succeeds or not,
              * the extra inode ref that was done at the start of
@@ -849,6 +865,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
             list_del_init(&retlock->client_list);
             __pl_inodelk_unref(retlock);
 
+            pl_inode_remove_unlocked(this, pl_inode, &wake);
+
             ret = 0;
         }
     out:
@@ -859,6 +877,8 @@ pl_inode_setlk(xlator_t *this, pl_ctx_t *ctx, pl_inode_t *pl_inode,
     if (ctx)
         pthread_mutex_unlock(&ctx->lock);
 
+    pl_inode_remove_wake(&wake);
+
     /* The following (extra) unref corresponds to the ref that
      * was done at the time the lock was granted.
      */
@@ -1033,10 +1053,14 @@ pl_common_inodelk(call_frame_t *frame, xlator_t *this, const char *volume,
                                  inode);
 
             if (ret < 0) {
-                if ((can_block) && (F_UNLCK != lock_type)) {
-                    goto out;
+                if (ret == -EAGAIN) {
+                    if (can_block && (F_UNLCK != lock_type)) {
+                        goto out;
+                    }
+                    gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
+                } else {
+                    gf_log(this->name, GF_LOG_TRACE, "returning %d", ret);
                 }
-                gf_log(this->name, GF_LOG_TRACE, "returning EAGAIN");
                 op_errno = -ret;
                 goto unwind;
             }
diff --git a/xlators/features/locks/src/locks.h b/xlators/features/locks/src/locks.h
index aa267de..6666feb 100644
--- a/xlators/features/locks/src/locks.h
+++ b/xlators/features/locks/src/locks.h
@@ -102,6 +102,9 @@ struct __pl_inode_lock {
 
     struct list_head client_list; /* list of all locks from a client */
     short fl_type;
+
+    int32_t status; /* Error code when we try to grant a lock in blocked
+                       state */
 };
 typedef struct __pl_inode_lock pl_inode_lock_t;
 
@@ -164,13 +167,14 @@ struct __pl_inode {
     struct list_head rw_list;            /* list of waiting r/w requests */
     struct list_head reservelk_list;     /* list of reservelks */
     struct list_head blocked_reservelks; /* list of blocked reservelks */
-    struct list_head
-        blocked_calls; /* List of blocked lock calls while a reserve is held*/
-    struct list_head metalk_list; /* Meta lock list */
-                                  /* This is to store the incoming lock
-                                     requests while meta lock is enabled */
-    struct list_head queued_locks;
-    int mandatory; /* if mandatory locking is enabled */
+    struct list_head blocked_calls;      /* List of blocked lock calls while a
+                                            reserve is held*/
+    struct list_head metalk_list;        /* Meta lock list */
+    struct list_head queued_locks;       /* This is to store the incoming lock
+                                            requests while meta lock is enabled */
+    struct list_head waiting; /* List of pending fops waiting to unlink/rmdir
+                                 the inode. */
+    int mandatory;            /* if mandatory locking is enabled */
 
     inode_t *refkeeper; /* hold refs on an inode while locks are
                            held to prevent pruning */
@@ -197,6 +201,11 @@ struct __pl_inode {
     */
     int fop_wind_count;
     pthread_cond_t check_fop_wind_count;
+
+    int32_t links;           /* Number of hard links the inode has. */
+    uint32_t remove_running; /* Number of remove operations running. */
+    gf_boolean_t is_locked;  /* Regular locks will be blocked. */
+    gf_boolean_t removed;    /* The inode has been deleted. */
 };
 typedef struct __pl_inode pl_inode_t;
 
diff --git a/xlators/features/locks/src/posix.c b/xlators/features/locks/src/posix.c
index 7887b82..5ae0125 100644
--- a/xlators/features/locks/src/posix.c
+++ b/xlators/features/locks/src/posix.c
@@ -147,6 +147,29 @@ fetch_pathinfo(xlator_t *, inode_t *, int32_t *, char **);
         }                                                                      \
     } while (0)
 
+#define PL_INODE_REMOVE(_fop, _frame, _xl, _loc1, _loc2, _cont, _cbk,          \
+                        _args...)                                              \
+    ({                                                                         \
+        struct list_head contend;                                              \
+        pl_inode_t *__pl_inode;                                                \
+        call_stub_t *__stub;                                                   \
+        int32_t __error;                                                       \
+        INIT_LIST_HEAD(&contend);                                              \
+        __error = pl_inode_remove_prepare(_xl, _frame, _loc2 ? _loc2 : _loc1,  \
+                                          &__pl_inode, &contend);              \
+        if (__error < 0) {                                                     \
+            __stub = fop_##_fop##_stub(_frame, _cont, ##_args);                \
+            __error = pl_inode_remove_complete(_xl, __pl_inode, __stub,        \
+                                               &contend);                      \
+        } else if (__error == 0) {                                             \
+            PL_LOCAL_GET_REQUESTS(_frame, _xl, xdata, ((fd_t *)NULL), _loc1,   \
+                                  _loc2);                                      \
+            STACK_WIND_COOKIE(_frame, _cbk, __pl_inode, FIRST_CHILD(_xl),      \
+                              FIRST_CHILD(_xl)->fops->_fop, ##_args);          \
+        }                                                                      \
+        __error;                                                               \
+    })
+
 gf_boolean_t
 pl_has_xdata_requests(dict_t *xdata)
 {
@@ -2969,11 +2992,85 @@ out:
     return ret;
 }
 
+static int32_t
+pl_request_link_count(dict_t **pxdata)
+{
+    dict_t *xdata;
+
+    xdata = *pxdata;
+    if (xdata == NULL) {
+        xdata = dict_new();
+        if (xdata == NULL) {
+            return ENOMEM;
+        }
+    } else {
+        dict_ref(xdata);
+    }
+
+    if (dict_set_uint32(xdata, GET_LINK_COUNT, 0) != 0) {
+        dict_unref(xdata);
+        return ENOMEM;
+    }
+
+    *pxdata = xdata;
+
+    return 0;
+}
+
+static int32_t
+pl_check_link_count(dict_t *xdata)
+{
+    int32_t count;
+
+    /* In case we are unable to read the link count from xdata, we take a
+     * conservative approach and return -2, which will prevent the inode from
+     * being considered deleted. In fact it will cause link tracking for this
+     * inode to be disabled completely to avoid races. */
+
+    if (xdata == NULL) {
+        return -2;
+    }
+
+    if (dict_get_int32(xdata, GET_LINK_COUNT, &count) != 0) {
+        return -2;
+    }
+
+    return count;
+}
+
 int32_t
 pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
               int32_t op_errno, inode_t *inode, struct iatt *buf, dict_t *xdata,
               struct iatt *postparent)
 {
+    pl_inode_t *pl_inode;
+
+    if (op_ret >= 0) {
+        pl_inode = pl_inode_get(this, inode, NULL);
+        if (pl_inode == NULL) {
+            PL_STACK_UNWIND(lookup, xdata, frame, -1, ENOMEM, NULL, NULL, NULL,
+                            NULL);
+            return 0;
+        }
+
+        pthread_mutex_lock(&pl_inode->mutex);
+
+        /* We only update the link count if we previously didn't know it.
+         * Doing it always can lead to races since lookup is not executed
+         * atomically most of the times. */
+        if (pl_inode->links == -2) {
+            pl_inode->links = pl_check_link_count(xdata);
+            if (buf->ia_type == IA_IFDIR) {
+                /* Directories have at least 2 links. To avoid special handling
+                 * for directories, we simply decrement the value here to make
+                 * them equivalent to regular files. */
+                pl_inode->links--;
+            }
+        }
+
+        pthread_mutex_unlock(&pl_inode->mutex);
+    }
+
     PL_STACK_UNWIND(lookup, xdata, frame, op_ret, op_errno, inode, buf, xdata,
                     postparent);
     return 0;
@@ -2982,9 +3079,17 @@ pl_lookup_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
 int32_t
 pl_lookup(call_frame_t *frame, xlator_t *this, loc_t *loc, dict_t *xdata)
 {
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
-    STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
-               FIRST_CHILD(this)->fops->lookup, loc, xdata);
+    int32_t error;
+
+    error = pl_request_link_count(&xdata);
+    if (error == 0) {
+        PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
+        STACK_WIND(frame, pl_lookup_cbk, FIRST_CHILD(this),
+                   FIRST_CHILD(this)->fops->lookup, loc, xdata);
+        dict_unref(xdata);
+    } else {
+        STACK_UNWIND_STRICT(lookup, frame, -1, error, NULL, NULL, NULL, NULL);
+    }
     return 0;
 }
 
@@ -3792,6 +3897,10 @@ unlock:
             gf_proc_dump_write("posixlk-count", "%d", count);
             __dump_posixlks(pl_inode);
         }
+
+        gf_proc_dump_write("links", "%d", pl_inode->links);
+        gf_proc_dump_write("removes_pending", "%u", pl_inode->remove_running);
+        gf_proc_dump_write("removed", "%u", pl_inode->removed);
     }
     pthread_mutex_unlock(&pl_inode->mutex);
 
@@ -4137,8 +4246,11 @@ pl_rename_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
               struct iatt *postoldparent, struct iatt *prenewparent,
               struct iatt *postnewparent, dict_t *xdata)
 {
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
+
     PL_STACK_UNWIND(rename, xdata, frame, op_ret, op_errno, buf, preoldparent,
                     postoldparent, prenewparent, postnewparent, xdata);
+
     return 0;
 }
 
@@ -4146,10 +4258,15 @@ int32_t
 pl_rename(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
           dict_t *xdata)
 {
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
+    int32_t error;
+
+    error = PL_INODE_REMOVE(rename, frame, this, oldloc, newloc, pl_rename,
+                            pl_rename_cbk, oldloc, newloc, xdata);
+    if (error > 0) {
+        STACK_UNWIND_STRICT(rename, frame, -1, error, NULL, NULL, NULL, NULL,
+                            NULL, NULL);
+    }
 
-    STACK_WIND(frame, pl_rename_cbk, FIRST_CHILD(this),
-               FIRST_CHILD(this)->fops->rename, oldloc, newloc, xdata);
     return 0;
 }
 
@@ -4273,8 +4390,11 @@ pl_unlink_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
               int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
               dict_t *xdata)
 {
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
+
     PL_STACK_UNWIND(unlink, xdata, frame, op_ret, op_errno, preparent,
                     postparent, xdata);
+
     return 0;
 }
 
@@ -4282,9 +4402,14 @@ int32_t
 pl_unlink(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflag,
           dict_t *xdata)
 {
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
-    STACK_WIND(frame, pl_unlink_cbk, FIRST_CHILD(this),
-               FIRST_CHILD(this)->fops->unlink, loc, xflag, xdata);
+    int32_t error;
+
+    error = PL_INODE_REMOVE(unlink, frame, this, loc, NULL, pl_unlink,
+                            pl_unlink_cbk, loc, xflag, xdata);
+    if (error > 0) {
+        STACK_UNWIND_STRICT(unlink, frame, -1, error, NULL, NULL, NULL);
+    }
+
     return 0;
 }
 
@@ -4351,8 +4476,11 @@ pl_rmdir_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
              int32_t op_errno, struct iatt *preparent, struct iatt *postparent,
              dict_t *xdata)
 {
+    pl_inode_remove_cbk(this, cookie, op_ret < 0 ? op_errno : 0);
+
     PL_STACK_UNWIND_FOR_CLIENT(rmdir, xdata, frame, op_ret, op_errno, preparent,
                                postparent, xdata);
+
     return 0;
 }
 
@@ -4360,9 +4488,14 @@ int
 pl_rmdir(call_frame_t *frame, xlator_t *this, loc_t *loc, int xflags,
          dict_t *xdata)
 {
-    PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), loc, NULL);
-    STACK_WIND(frame, pl_rmdir_cbk, FIRST_CHILD(this),
-               FIRST_CHILD(this)->fops->rmdir, loc, xflags, xdata);
+    int32_t error;
+
+    error = PL_INODE_REMOVE(rmdir, frame, this, loc, NULL, pl_rmdir,
+                            pl_rmdir_cbk, loc, xflags, xdata);
+    if (error > 0) {
+        STACK_UNWIND_STRICT(rmdir, frame, -1, error, NULL, NULL, NULL);
+    }
+
     return 0;
 }
 
@@ -4392,6 +4525,19 @@ pl_link_cbk(call_frame_t *frame, void *cookie, xlator_t *this, int32_t op_ret,
             int32_t op_errno, inode_t *inode, struct iatt *buf,
             struct iatt *preparent, struct iatt *postparent, dict_t *xdata)
 {
+    pl_inode_t *pl_inode = (pl_inode_t *)cookie;
+
+    if (op_ret >= 0) {
+        pthread_mutex_lock(&pl_inode->mutex);
+
+        /* TODO: can happen pl_inode->links == 0 ? */
+        if (pl_inode->links >= 0) {
+            pl_inode->links++;
+        }
+
+        pthread_mutex_unlock(&pl_inode->mutex);
+    }
+
     PL_STACK_UNWIND_FOR_CLIENT(link, xdata, frame, op_ret, op_errno, inode, buf,
                                preparent, postparent, xdata);
     return 0;
@@ -4401,9 +4547,18 @@ int
 pl_link(call_frame_t *frame, xlator_t *this, loc_t *oldloc, loc_t *newloc,
         dict_t *xdata)
 {
+    pl_inode_t *pl_inode;
+
+    pl_inode = pl_inode_get(this, oldloc->inode, NULL);
+    if (pl_inode == NULL) {
+        STACK_UNWIND_STRICT(link, frame, -1, ENOMEM, NULL, NULL, NULL, NULL,
+                            NULL);
+        return 0;
+    }
+
     PL_LOCAL_GET_REQUESTS(frame, this, xdata, ((fd_t *)NULL), oldloc, newloc);
-    STACK_WIND(frame, pl_link_cbk, FIRST_CHILD(this),
-               FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
+    STACK_WIND_COOKIE(frame, pl_link_cbk, pl_inode, FIRST_CHILD(this),
+                      FIRST_CHILD(this)->fops->link, oldloc, newloc, xdata);
     return 0;
 }
 
-- 
1.8.3.1