Blob Blame History Raw
From f0e823dfc224c9c3c7a1179453ef6fe087171787 Mon Sep 17 00:00:00 2001
From: Pranith Kumar K <pkarampu@redhat.com>
Date: Wed, 15 Jul 2015 06:16:54 +0530
Subject: [PATCH 244/244] cluster/ec: Handle race between unlock-timer, new lock

        Backport of http://review.gluster.org/11670

Problem:
A new lock request could arrive while the timer is on its way to unlock. This
was leading to a crash in the timer thread, because the thread executing the
new lock can free up timer_link->fop, after which the timer thread tries to
access structures that have already been freed.

Fix:
If the timer event has already fired, set lock->release to true and wait for
the unlock to complete instead of cancelling the timer.

Thanks to Xavi and Bhaskar for helping in confirming that this race is the
root cause (RC).
Thanks to Kritika for pointing out and explaining how Avati's patch can be used
to fix this bug.

Change-Id: I45fa5470bbc1f03b5f3d133e26d1e0ab24303378
BUG: 1242423
Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/53061
---
 xlators/cluster/ec/src/ec-common.c |   33 ++++++++++++++-------------------
 xlators/cluster/ec/src/ec-common.h |    1 -
 xlators/cluster/ec/src/ec.c        |   33 +++------------------------------
 3 files changed, 17 insertions(+), 50 deletions(-)

diff --git a/xlators/cluster/ec/src/ec-common.c b/xlators/cluster/ec/src/ec-common.c
index b2d0348..ec6a4aa 100644
--- a/xlators/cluster/ec/src/ec-common.c
+++ b/xlators/cluster/ec/src/ec-common.c
@@ -1358,13 +1358,20 @@ void ec_lock(ec_fop_data_t *fop)
         if (lock->timer != NULL) {
             GF_ASSERT (lock->release == _gf_false);
             timer_link = lock->timer->data;
-            ec_trace("UNLOCK_CANCELLED", timer_link->fop, "lock=%p", lock);
-            gf_timer_call_cancel(fop->xl->ctx, lock->timer);
-            lock->timer = NULL;
-
-            lock->refs--;
-            /* There should remain at least 1 ref, the current one. */
-            GF_ASSERT(lock->refs > 0);
+            if (gf_timer_call_cancel(fop->xl->ctx, lock->timer) == 0) {
+                    ec_trace("UNLOCK_CANCELLED", timer_link->fop,
+                             "lock=%p", lock);
+                    lock->timer = NULL;
+                    lock->refs--;
+                    /* There should remain at least 1 ref, the current one. */
+                    GF_ASSERT(lock->refs > 0);
+            } else {
+                    /* The timer expired and is on its way to unlock.
+                     * Set lock->release to _gf_true so that this
+                     * lock will be put in the frozen list. */
+                    timer_link = NULL;
+                    lock->release = _gf_true;
+            }
         }
 
         GF_ASSERT(list_empty(&link->wait_list));
@@ -1815,18 +1822,6 @@ void ec_unlock(ec_fop_data_t *fop)
     }
 }
 
-void
-ec_unlock_force(ec_fop_data_t *fop)
-{
-        int32_t i;
-
-        for (i = 0; i < fop->lock_count; i++) {
-                ec_trace("UNLOCK_FORCED", fop, "lock=%p", &fop->locks[i]);
-
-                ec_unlock_timer_del(&fop->locks[i]);
-        }
-}
-
 void ec_flush_size_version(ec_fop_data_t *fop)
 {
     GF_ASSERT(fop->lock_count == 1);
diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h
index 4c7fe0c..a851fb3 100644
--- a/xlators/cluster/ec/src/ec-common.h
+++ b/xlators/cluster/ec/src/ec-common.h
@@ -91,7 +91,6 @@ void ec_lock_prepare_fd(ec_fop_data_t *fop, fd_t *fd, uint32_t flags);
 void ec_lock(ec_fop_data_t * fop);
 void ec_lock_reuse(ec_fop_data_t *fop);
 void ec_unlock(ec_fop_data_t * fop);
-void ec_unlock_force(ec_fop_data_t *fop);
 
 gf_boolean_t ec_get_inode_size(ec_fop_data_t *fop, inode_t *inode,
                                uint64_t *size);
diff --git a/xlators/cluster/ec/src/ec.c b/xlators/cluster/ec/src/ec.c
index 4673c11..2885b01 100644
--- a/xlators/cluster/ec/src/ec.c
+++ b/xlators/cluster/ec/src/ec.c
@@ -395,38 +395,11 @@ ec_handle_down (xlator_t *this, ec_t *ec, int32_t idx)
 }
 
 gf_boolean_t
-ec_force_unlocks(ec_t *ec)
+ec_disable_delays(ec_t *ec)
 {
-        struct list_head list;
-        ec_fop_data_t *fop;
-
-        if (list_empty(&ec->pending_fops)) {
-                return _gf_true;
-        }
-
-        INIT_LIST_HEAD(&list);
-
-        /* All pending fops when GF_EVENT_PARENT_DOWN is received should only
-         * be fops waiting for a delayed unlock. However the unlock can
-         * generate new fops. We don't want to trverse these new fops while
-         * forcing unlocks, so we move all fops to a temporal list. To process
-         * them without interferences.*/
-        list_splice_init(&ec->pending_fops, &list);
-
-        while (!list_empty(&list)) {
-                fop = list_entry(list.next, ec_fop_data_t, pending_list);
-                list_move_tail(&fop->pending_list, &ec->pending_fops);
-
-                UNLOCK(&ec->lock);
-
-                ec_unlock_force(fop);
-
-                LOCK(&ec->lock);
-        }
-
         ec->shutdown = _gf_true;
 
-        return list_empty(&ec->pending_fops);
+        return list_empty (&ec->pending_fops);
 }
 
 void
@@ -482,7 +455,7 @@ ec_notify (xlator_t *this, int32_t event, void *data, void *data2)
         } else if (event == GF_EVENT_PARENT_DOWN) {
                 /* If there aren't pending fops running after we have waken up
                  * them, we immediately propagate the notification. */
-                propagate = ec_force_unlocks(ec);
+                propagate = ec_disable_delays(ec);
                 goto unlock;
         }
 
-- 
1.7.1