Blob Blame History Raw
From 1c94a3d3bd4ac1237d497f03a899ba29590fc37f Mon Sep 17 00:00:00 2001
From: Pranith Kumar K <pkarampu@redhat.com>
Date: Mon, 15 Jun 2015 16:32:06 +0530
Subject: [PATCH 139/190] cluster/ec: wind fops on good subvols for access/readdir[p]

        Backport of http://review.gluster.org/11246

Change-Id: I1e629a6adc803c4b7164a5a7a81ee5cb1d0e139c
BUG: 1228525
Signed-off-by: Pranith Kumar K <pkarampu@redhat.com>
Reviewed-on: https://code.engineering.redhat.com/gerrit/51322
---
 tests/basic/ec/ec-readdir.t            |   30 +++++-
 xlators/cluster/ec/src/ec-combine.c    |   12 +--
 xlators/cluster/ec/src/ec-common.c     |   28 ++++-
 xlators/cluster/ec/src/ec-common.h     |    3 +-
 xlators/cluster/ec/src/ec-data.c       |    2 +
 xlators/cluster/ec/src/ec-data.h       |    1 +
 xlators/cluster/ec/src/ec-dir-read.c   |  207 +++++++++++++++++---------------
 xlators/cluster/ec/src/ec-inode-read.c |   89 +++++++++++----
 8 files changed, 232 insertions(+), 140 deletions(-)

diff --git a/tests/basic/ec/ec-readdir.t b/tests/basic/ec/ec-readdir.t
index bbfa2dd..fad101b 100644
--- a/tests/basic/ec/ec-readdir.t
+++ b/tests/basic/ec/ec-readdir.t
@@ -9,12 +9,38 @@ cleanup
 TEST glusterd
 TEST pidof glusterd
 TEST $CLI volume create $V0 disperse 3 redundancy 1 $H0:$B0/${V0}{0..5}
+TEST $CLI volume heal $V0 disable
 TEST $CLI volume start $V0
 
 TEST $GFS --volfile-id=/$V0 --volfile-server=$H0 $M0;
 EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 0
 EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 1
 
-TEST touch $M0/{1..100}
-EXPECT "100" echo $(ls $M0/* | wc -l)
+TEST mkdir $M0/d
+TEST touch $M0/d/{1..100}
+EXPECT "100" echo $(ls $M0/d/* | wc -l)
+TEST kill_brick $V0 $H0 $B0/${V0}0
+TEST kill_brick $V0 $H0 $B0/${V0}3
+TEST rm -rf $M0/d/{1..100}
+TEST $CLI volume start $V0 force
+EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 0
+EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 1
+#Do it 3 times so that even with all load balancing, readdir never falls
+#on stale bricks
+EXPECT "0" echo $(ls $M0/d/ | wc -l)
+EXPECT "0" echo $(ls $M0/d/ | wc -l)
+EXPECT "0" echo $(ls $M0/d/ | wc -l)
+#Do the same test above for creation of entries
+TEST mkdir $M0/d1
+TEST kill_brick $V0 $H0 $B0/${V0}0
+TEST kill_brick $V0 $H0 $B0/${V0}3
+TEST touch $M0/d1/{1..100}
+TEST $CLI volume start $V0 force
+EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 0
+EXPECT_WITHIN $CHILD_UP_TIMEOUT "3" ec_child_up_count $V0 1
+#Do it 3 times so that even with all load balancing, readdir never falls
+#on stale bricks
+EXPECT "100" echo $(ls $M0/d1/ | wc -l)
+EXPECT "100" echo $(ls $M0/d1/ | wc -l)
+EXPECT "100" echo $(ls $M0/d1/ | wc -l)
 cleanup
diff --git a/xlators/cluster/ec/src/ec-combine.c b/xlators/cluster/ec/src/ec-combine.c
index a8cfabc..bf698d9 100644
--- a/xlators/cluster/ec/src/ec-combine.c
+++ b/xlators/cluster/ec/src/ec-combine.c
@@ -875,7 +875,7 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
     ec_fop_data_t *fop = newcbk->fop;
     ec_cbk_data_t *cbk = NULL, *tmp = NULL;
     struct list_head *item = NULL;
-    int32_t needed = 0, resume = 0;
+    int32_t needed = 0;
     char str[32];
 
     LOCK(&fop->lock);
@@ -912,12 +912,6 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
     ec_trace("ANSWER", fop, "combine=%s[%d]",
              ec_bin(str, sizeof(str), newcbk->mask, 0), newcbk->count);
 
-    if ((newcbk->count == fop->expected) && (fop->answer == NULL)) {
-        fop->answer = newcbk;
-
-        resume = 1;
-    }
-
     cbk = list_entry(fop->cbk_list.next, ec_cbk_data_t, list);
     if ((fop->mask ^ fop->remaining) == fop->received) {
         needed = fop->minimum - cbk->count;
@@ -927,9 +921,5 @@ void ec_combine (ec_cbk_data_t *newcbk, ec_combine_f combine)
 
     if (needed > 0) {
         ec_dispatch_next(fop, newcbk->idx);
-    } else if (resume) {
-        ec_update_bad(fop, newcbk->mask);
-
-        ec_resume(fop, 0);
     }
 }
diff --git a/xlators/cluster/ec/src/ec-common.c b/xlators/cluster/ec/src/ec-common.c
index f69234e..4dbbe28 100644
--- a/xlators/cluster/ec/src/ec-common.c
+++ b/xlators/cluster/ec/src/ec-common.c
@@ -186,6 +186,10 @@ void ec_update_bad(ec_fop_data_t * fop, uintptr_t good)
     ec_t *ec = fop->xl->private;
     uintptr_t bad;
 
+    /*Don't let fops that do dispatch_one() to update bad*/
+    if (fop->expected == 1)
+            return;
+
     bad = ec->xl_up & ~(fop->remaining | good);
     fop->bad |= bad;
     fop->good |= good;
@@ -314,6 +318,20 @@ void ec_resume_parent(ec_fop_data_t * fop, int32_t error)
     }
 }
 
+gf_boolean_t
+ec_is_recoverable_error (int32_t op_errno)
+{
+        switch (op_errno) {
+        case ENOTCONN:
+        case ESTALE:
+        case ENOENT:
+        case EBADFD:/*Opened fd but brick is disconnected*/
+        case EIO:/*Backend-fs crash like XFS/ext4 etc*/
+                return _gf_true;
+        }
+        return _gf_false;
+}
+
 void ec_complete(ec_fop_data_t * fop)
 {
     ec_cbk_data_t * cbk = NULL;
@@ -329,14 +347,12 @@ void ec_complete(ec_fop_data_t * fop)
             if (!list_empty(&fop->cbk_list)) {
                 cbk = list_entry(fop->cbk_list.next, ec_cbk_data_t, list);
                 healing_count = ec_bits_count (cbk->mask & fop->healing);
+                    /* fop shouldn't be treated as success if it is not
+                     * successful on at least fop->minimum good copies*/
                 if ((cbk->count - healing_count) >= fop->minimum) {
-                        /* fop shouldn't be treated as success if it is not
-                         * successful on at least fop->minimum good copies*/
-                    if ((cbk->op_ret >= 0) || (cbk->op_errno != ENOTCONN)) {
-                        fop->answer = cbk;
+                    fop->answer = cbk;
 
-                        update = 1;
-                    }
+                    update = 1;
                 }
             }
 
diff --git a/xlators/cluster/ec/src/ec-common.h b/xlators/cluster/ec/src/ec-common.h
index e3f01ca..3334a7b 100644
--- a/xlators/cluster/ec/src/ec-common.h
+++ b/xlators/cluster/ec/src/ec-common.h
@@ -75,7 +75,7 @@ typedef enum {
 #define EC_STATE_HEAL_POST_INODELK_UNLOCK   217
 #define EC_STATE_HEAL_DISPATCH              218
 
-int32_t ec_dispatch_one_retry(ec_fop_data_t *fop, int32_t idx, int32_t op_ret);
+int32_t ec_dispatch_one_retry (ec_fop_data_t *fop, int32_t idx, int32_t op_ret);
 int32_t ec_dispatch_next(ec_fop_data_t * fop, int32_t idx);
 
 void ec_complete(ec_fop_data_t * fop);
@@ -111,5 +111,6 @@ void ec_resume(ec_fop_data_t * fop, int32_t error);
 void ec_resume_parent(ec_fop_data_t * fop, int32_t error);
 
 void ec_manager(ec_fop_data_t * fop, int32_t error);
+gf_boolean_t ec_is_recoverable_error (int32_t op_errno);
 
 #endif /* __EC_COMMON_H__ */
diff --git a/xlators/cluster/ec/src/ec-data.c b/xlators/cluster/ec/src/ec-data.c
index 047ccd5..72f3b0b 100644
--- a/xlators/cluster/ec/src/ec-data.c
+++ b/xlators/cluster/ec/src/ec-data.c
@@ -59,6 +59,7 @@ ec_cbk_data_t * ec_cbk_data_allocate(call_frame_t * frame, xlator_t * this,
     cbk->count = 1;
     cbk->op_ret = op_ret;
     cbk->op_errno = op_errno;
+    INIT_LIST_HEAD (&cbk->entries.list);
 
     LOCK(&fop->lock);
 
@@ -92,6 +93,7 @@ void ec_cbk_data_destroy(ec_cbk_data_t * cbk)
         iobref_unref(cbk->buffers);
     }
     GF_FREE(cbk->vector);
+    gf_dirent_free (&cbk->entries);
 
     mem_put(cbk);
 }
diff --git a/xlators/cluster/ec/src/ec-data.h b/xlators/cluster/ec/src/ec-data.h
index 8f6d5de..670b3b8 100644
--- a/xlators/cluster/ec/src/ec-data.h
+++ b/xlators/cluster/ec/src/ec-data.h
@@ -266,6 +266,7 @@ struct _ec_cbk_data
     struct iovec *   vector;
     struct iobref *  buffers;
     uint64_t         dirty[2];
+    gf_dirent_t      entries;
 };
 
 struct _ec_heal
diff --git a/xlators/cluster/ec/src/ec-dir-read.c b/xlators/cluster/ec/src/ec-dir-read.c
index 7821878..69df4d2 100644
--- a/xlators/cluster/ec/src/ec-dir-read.c
+++ b/xlators/cluster/ec/src/ec-dir-read.c
@@ -316,34 +316,36 @@ out:
 
 /* FOP: readdir */
 
-void ec_adjust_readdir(ec_t * ec, int32_t idx, gf_dirent_t * entries)
+void ec_adjust_readdirp (ec_t *ec, int32_t idx, gf_dirent_t *entries)
 {
     gf_dirent_t * entry;
 
     list_for_each_entry(entry, &entries->list, list)
     {
+        if (!entry->inode)
+                continue;
+
         if (entry->d_stat.ia_type == IA_IFREG)
         {
             if ((entry->dict == NULL) ||
                 (ec_dict_del_number(entry->dict, EC_XATTR_SIZE,
-                                    &entry->d_stat.ia_size) != 0))
-            {
-                gf_log(ec->xl->name, GF_LOG_WARNING, "Unable to get exact "
-                                                     "file size.");
-
-                entry->d_stat.ia_size *= ec->fragments;
+                                    &entry->d_stat.ia_size) != 0)) {
+                    inode_unref (entry->inode);
+                    entry->inode = NULL;
+            } else {
+                ec_iatt_rebuild(ec, &entry->d_stat, 1, 1);
             }
-
-            ec_iatt_rebuild(ec, &entry->d_stat, 1, 1);
         }
     }
 }
 
-int32_t ec_readdir_cbk(call_frame_t * frame, void * cookie, xlator_t * this,
-                       int32_t op_ret, int32_t op_errno, gf_dirent_t * entries,
-                       dict_t * xdata)
+int32_t
+ec_common_readdir_cbk (call_frame_t *frame, void *cookie, xlator_t *this,
+                       int32_t op_ret, int32_t op_errno,
+                       gf_dirent_t *entries, dict_t *xdata)
 {
-    ec_fop_data_t * fop = NULL;
+    ec_fop_data_t *fop = NULL;
+    ec_cbk_data_t *cbk = NULL;
     int32_t idx = (int32_t)(uintptr_t)cookie;
 
     VALIDATE_OR_GOTO(this, out);
@@ -356,18 +358,15 @@ int32_t ec_readdir_cbk(call_frame_t * frame, void * cookie, xlator_t * this,
     ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx,
              frame, op_ret, op_errno);
 
-    if (op_ret > 0)
-    {
-        ec_adjust_readdir(fop->xl->private, idx, entries);
-    }
-
-    if (!ec_dispatch_one_retry(fop, idx, op_ret))
-    {
-        if (fop->cbks.readdir != NULL)
-        {
-            fop->cbks.readdir(fop->req_frame, fop, this, op_ret, op_errno,
-                              entries, xdata);
-        }
+    cbk = ec_cbk_data_allocate (frame, this, fop, fop->id,
+                                idx, op_ret, op_errno);
+    if (cbk) {
+        if (xdata)
+                cbk->xdata = dict_ref (xdata);
+        if (cbk->op_ret >= 0)
+                list_splice_init (&entries->list,
+                                  &cbk->entries.list);
+        ec_combine (cbk, NULL);
     }
 
 out:
@@ -383,14 +382,15 @@ void ec_wind_readdir(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
 {
     ec_trace("WIND", fop, "idx=%d", idx);
 
-    STACK_WIND_COOKIE(fop->frame, ec_readdir_cbk, (void *)(uintptr_t)idx,
+    STACK_WIND_COOKIE(fop->frame, ec_common_readdir_cbk, (void *)(uintptr_t)idx,
                       ec->xl_list[idx], ec->xl_list[idx]->fops->readdir,
                       fop->fd, fop->size, fop->offset, fop->xdata);
 }
 
 int32_t ec_manager_readdir(ec_fop_data_t * fop, int32_t state)
 {
-    ec_fd_t *ctx;
+    ec_fd_t *ctx = NULL;
+    ec_cbk_data_t *cbk = NULL;
 
     switch (state)
     {
@@ -404,27 +404,21 @@ int32_t ec_manager_readdir(ec_fop_data_t * fop, int32_t state)
                 return EC_STATE_REPORT;
             }
 
-            if (fop->xdata == NULL)
-            {
-                fop->xdata = dict_new();
-                if (fop->xdata == NULL)
-                {
-                    gf_log(fop->xl->name, GF_LOG_ERROR, "Unable to prepare "
-                                                        "readdirp request");
-
-                    fop->error = EIO;
+            if (fop->id == GF_FOP_READDIRP) {
+                    if (fop->xdata == NULL) {
+                        fop->xdata = dict_new();
+                        if (fop->xdata == NULL) {
+                            fop->error = EIO;
 
-                    return EC_STATE_REPORT;
-                }
-            }
-            if (dict_set_uint64(fop->xdata, EC_XATTR_SIZE, 0) != 0)
-            {
-                gf_log(fop->xl->name, GF_LOG_ERROR, "Unable to prepare "
-                                                    "readdirp request");
+                            return EC_STATE_REPORT;
+                        }
+                    }
 
-                fop->error = EIO;
+                    if (dict_set_uint64(fop->xdata, EC_XATTR_SIZE, 0)) {
+                        fop->error = EIO;
 
-                return EC_STATE_REPORT;
+                        return EC_STATE_REPORT;
+                    }
             }
 
             if (fop->offset != 0)
@@ -440,37 +434,92 @@ int32_t ec_manager_readdir(ec_fop_data_t * fop, int32_t state)
                         return EC_STATE_REPORT;
                 }
                 fop->mask &= 1ULL << idx;
+            } else {
+                    ec_lock_prepare_fd(fop, fop->fd, EC_QUERY_INFO);
+                    ec_lock(fop);
             }
 
-        /* Fall through */
+            return EC_STATE_DISPATCH;
 
         case EC_STATE_DISPATCH:
             ec_dispatch_one(fop);
 
+            return EC_STATE_PREPARE_ANSWER;
+
+        case EC_STATE_PREPARE_ANSWER:
+            cbk = fop->answer;
+            if (cbk) {
+                if ((cbk->op_ret < 0) &&
+                    ec_is_recoverable_error (cbk->op_errno)) {
+                    GF_ASSERT (fop->mask & (1ULL<<cbk->idx));
+                    fop->mask ^= (1ULL << cbk->idx);
+                    if (fop->mask == 0)
+                            return EC_STATE_REPORT;
+                    return EC_STATE_DISPATCH;
+                }
+                if ((cbk->op_ret > 0) && (fop->id == GF_FOP_READDIRP)) {
+                    ec_adjust_readdirp (fop->xl->private, cbk->idx,
+                                        &cbk->entries);
+                }
+            } else {
+                ec_fop_set_error(fop, EIO);
+            }
             return EC_STATE_REPORT;
 
+        case EC_STATE_REPORT:
+            cbk = fop->answer;
+            GF_ASSERT (cbk);
+            if (fop->id == GF_FOP_READDIR) {
+                if (fop->cbks.readdir != NULL) {
+                    fop->cbks.readdir(fop->req_frame, fop, fop->xl, cbk->op_ret,
+                                      cbk->op_errno, &cbk->entries, cbk->xdata);
+                }
+            } else {
+                if (fop->cbks.readdirp != NULL) {
+                    fop->cbks.readdirp(fop->req_frame, fop, fop->xl,
+                                       cbk->op_ret, cbk->op_errno,
+                                       &cbk->entries, cbk->xdata);
+                }
+            }
+            if (fop->offset == 0)
+                    return EC_STATE_LOCK_REUSE;
+            else
+                    return EC_STATE_END;
+
         case -EC_STATE_INIT:
-            if (fop->id == GF_FOP_READDIR)
-            {
-                if (fop->cbks.readdir != NULL)
-                {
+        case -EC_STATE_LOCK:
+        case -EC_STATE_DISPATCH:
+        case -EC_STATE_PREPARE_ANSWER:
+        case -EC_STATE_REPORT:
+            if (fop->id == GF_FOP_READDIR) {
+                if (fop->cbks.readdir != NULL) {
                     fop->cbks.readdir(fop->req_frame, fop, fop->xl, -1,
                                       fop->error, NULL, NULL);
                 }
-            }
-            else
-            {
-                if (fop->cbks.readdirp != NULL)
-                {
+            } else {
+                if (fop->cbks.readdirp != NULL) {
                     fop->cbks.readdirp(fop->req_frame, fop, fop->xl, -1,
                                        fop->error, NULL, NULL);
                 }
             }
+            if (fop->offset == 0)
+                    return EC_STATE_LOCK_REUSE;
+            else
+                    return EC_STATE_END;
 
-        case EC_STATE_REPORT:
-        case -EC_STATE_REPORT:
-            return EC_STATE_END;
+        case -EC_STATE_LOCK_REUSE:
+        case EC_STATE_LOCK_REUSE:
+            GF_ASSERT (fop->offset == 0);
+            ec_lock_reuse(fop);
+
+            return EC_STATE_UNLOCK;
 
+        case -EC_STATE_UNLOCK:
+        case EC_STATE_UNLOCK:
+            GF_ASSERT (fop->offset == 0);
+            ec_unlock(fop);
+
+            return EC_STATE_END;
         default:
             gf_log(fop->xl->name, GF_LOG_ERROR, "Unhandled state %d for %s",
                    state, ec_fop_name(fop->id));
@@ -544,51 +593,11 @@ out:
 
 /* FOP: readdirp */
 
-int32_t ec_readdirp_cbk(call_frame_t * frame, void * cookie, xlator_t * this,
-                        int32_t op_ret, int32_t op_errno,
-                        gf_dirent_t * entries, dict_t * xdata)
-{
-    ec_fop_data_t * fop = NULL;
-    int32_t idx = (int32_t)(uintptr_t)cookie;
-
-    VALIDATE_OR_GOTO(this, out);
-    GF_VALIDATE_OR_GOTO(this->name, frame, out);
-    GF_VALIDATE_OR_GOTO(this->name, frame->local, out);
-    GF_VALIDATE_OR_GOTO(this->name, this->private, out);
-
-    fop = frame->local;
-
-    ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx,
-             frame, op_ret, op_errno);
-
-    if (op_ret > 0)
-    {
-        ec_adjust_readdir(fop->xl->private, idx, entries);
-    }
-
-    if (!ec_dispatch_one_retry(fop, idx, op_ret))
-    {
-        if (fop->cbks.readdirp != NULL)
-        {
-            fop->cbks.readdirp(fop->req_frame, fop, this, op_ret, op_errno,
-                               entries, xdata);
-        }
-    }
-
-out:
-    if (fop != NULL)
-    {
-        ec_complete(fop);
-    }
-
-    return 0;
-}
-
 void ec_wind_readdirp(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
 {
     ec_trace("WIND", fop, "idx=%d", idx);
 
-    STACK_WIND_COOKIE(fop->frame, ec_readdirp_cbk, (void *)(uintptr_t)idx,
+    STACK_WIND_COOKIE(fop->frame, ec_common_readdir_cbk, (void *)(uintptr_t)idx,
                       ec->xl_list[idx], ec->xl_list[idx]->fops->readdirp,
                       fop->fd, fop->size, fop->offset, fop->xdata);
 }
diff --git a/xlators/cluster/ec/src/ec-inode-read.c b/xlators/cluster/ec/src/ec-inode-read.c
index ef2170f..1f91391 100644
--- a/xlators/cluster/ec/src/ec-inode-read.c
+++ b/xlators/cluster/ec/src/ec-inode-read.c
@@ -22,7 +22,8 @@
 int32_t ec_access_cbk(call_frame_t * frame, void * cookie, xlator_t * this,
                       int32_t op_ret, int32_t op_errno, dict_t * xdata)
 {
-    ec_fop_data_t * fop = NULL;
+    ec_fop_data_t *fop = NULL;
+    ec_cbk_data_t *cbk = NULL;
     int32_t idx = (int32_t)(uintptr_t)cookie;
 
     VALIDATE_OR_GOTO(this, out);
@@ -35,19 +36,18 @@ int32_t ec_access_cbk(call_frame_t * frame, void * cookie, xlator_t * this,
     ec_trace("CBK", fop, "idx=%d, frame=%p, op_ret=%d, op_errno=%d", idx,
              frame, op_ret, op_errno);
 
-    if (!ec_dispatch_one_retry(fop, idx, op_ret))
-    {
-        if (fop->cbks.access != NULL)
-        {
-            fop->cbks.access(fop->req_frame, fop, this, op_ret, op_errno,
-                             xdata);
-        }
+    cbk = ec_cbk_data_allocate (frame, this, fop, GF_FOP_ACCESS,
+                                idx, op_ret, op_errno);
+    if (cbk) {
+        if (xdata)
+               cbk->xdata = dict_ref (xdata);
+        ec_combine (cbk, NULL);
     }
 
 out:
     if (fop != NULL)
     {
-        ec_complete(fop);
+        ec_complete (fop);
     }
 
     return 0;
@@ -62,25 +62,72 @@ void ec_wind_access(ec_t * ec, ec_fop_data_t * fop, int32_t idx)
                       &fop->loc[0], fop->int32, fop->xdata);
 }
 
-int32_t ec_manager_access(ec_fop_data_t * fop, int32_t state)
+int32_t
+ec_manager_access(ec_fop_data_t *fop, int32_t state)
 {
-    switch (state)
-    {
+        ec_cbk_data_t *cbk = NULL;
+
+        switch (state) {
         case EC_STATE_INIT:
+        case EC_STATE_LOCK:
+            ec_lock_prepare_inode (fop, &fop->loc[0], EC_QUERY_INFO);
+            ec_lock (fop);
+
+            return EC_STATE_DISPATCH;
+
         case EC_STATE_DISPATCH:
-            ec_dispatch_one(fop);
+            ec_dispatch_one (fop);
 
+            return EC_STATE_PREPARE_ANSWER;
+
+        case EC_STATE_PREPARE_ANSWER:
+            cbk = fop->answer;
+            if (cbk) {
+                if ((cbk->op_ret < 0) && ec_is_recoverable_error (cbk->op_errno)) {
+                    GF_ASSERT (fop->mask & (1ULL<<cbk->idx));
+                    fop->mask ^= (1ULL << cbk->idx);
+                    if (fop->mask == 0)
+                            return EC_STATE_REPORT;
+                    return EC_STATE_DISPATCH;
+                }
+            } else {
+                ec_fop_set_error(fop, EIO);
+            }
             return EC_STATE_REPORT;
 
-        case -EC_STATE_INIT:
-            if (fop->cbks.access != NULL)
-            {
-                fop->cbks.access(fop->req_frame, fop, fop->xl, -1, fop->error,
-                                 NULL);
+        case EC_STATE_REPORT:
+            cbk = fop->answer;
+            GF_ASSERT (cbk);
+            if (fop->cbks.access != NULL) {
+                if (cbk) {
+                    fop->cbks.access(fop->req_frame, fop, fop->xl,
+                                     cbk->op_ret, cbk->op_errno,
+                                     cbk->xdata);
+                }
             }
+            return EC_STATE_LOCK_REUSE;
 
+        case -EC_STATE_INIT:
+        case -EC_STATE_LOCK:
+        case -EC_STATE_DISPATCH:
+        case -EC_STATE_PREPARE_ANSWER:
         case -EC_STATE_REPORT:
-        case EC_STATE_REPORT:
+            if (fop->cbks.access != NULL) {
+                fop->cbks.access(fop->req_frame, fop, fop->xl, -1,
+                                 fop->error, NULL);
+            }
+            return -EC_STATE_LOCK_REUSE;
+
+        case -EC_STATE_LOCK_REUSE:
+        case EC_STATE_LOCK_REUSE:
+            ec_lock_reuse(fop);
+
+            return EC_STATE_UNLOCK;
+
+        case -EC_STATE_UNLOCK:
+        case EC_STATE_UNLOCK:
+            ec_unlock(fop);
+
             return EC_STATE_END;
 
         default:
@@ -88,7 +135,7 @@ int32_t ec_manager_access(ec_fop_data_t * fop, int32_t state)
                    state, ec_fop_name(fop->id));
 
             return EC_STATE_END;
-    }
+        }
 }
 
 void ec_access(call_frame_t * frame, xlator_t * this, uintptr_t target,
-- 
1.7.1