76daa3
From 689e11115170df5dedec420d78d375e780f3968e Mon Sep 17 00:00:00 2001
76daa3
From: Eric Blake <eblake@redhat.com>
76daa3
Date: Sun, 11 Jun 2017 03:30:07 +0200
76daa3
Subject: [PATCH 04/13] nbd: make it thread-safe, fix qcow2 over nbd
76daa3
76daa3
RH-Author: Eric Blake <eblake@redhat.com>
76daa3
Message-id: <20170611033007.399-1-eblake@redhat.com>
76daa3
Patchwork-id: 75581
76daa3
O-Subject: [RHEV-7.4 qemu-kvm-rhev PATCH] nbd: make it thread-safe, fix qcow2 over nbd
76daa3
Bugzilla: 1454582
76daa3
RH-Acked-by: Laurent Vivier <lvivier@redhat.com>
76daa3
RH-Acked-by: Max Reitz <mreitz@redhat.com>
76daa3
RH-Acked-by: Jeffrey Cody <jcody@redhat.com>
76daa3
76daa3
From: Paolo Bonzini <pbonzini@redhat.com>
76daa3
76daa3
NBD is not thread safe, because it accesses s->in_flight without
76daa3
a CoMutex.  Fixing this will be required for multiqueue.
76daa3
CoQueue doesn't have spurious wakeups but, when another coroutine can
76daa3
run between qemu_co_queue_next's wakeup and qemu_co_queue_wait's
76daa3
re-locking of the mutex, the wait condition can become false and
76daa3
a loop is necessary.
76daa3
76daa3
In fact, it turns out that the loop is necessary even without this
76daa3
multi-threaded scenario.  A particular sequence of coroutine wakeups
76daa3
is happening ~80% of the time when starting a guest with qcow2 image
76daa3
served over NBD (i.e. qemu-nbd --format=raw, and QEMU's -drive option
76daa3
has -format=qcow2).  This patch fixes that issue too.
76daa3
76daa3
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
76daa3
(cherry picked from commit 6bdcc018a6ed760b9dfe43539124e420aed83092)
76daa3
Signed-off-by: Eric Blake <eblake@redhat.com>
76daa3
Upstream-status: v6 pull request https://lists.gnu.org/archive/html/qemu-devel/2017-06/msg01841.html
76daa3
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
76daa3
---
76daa3
 block/nbd-client.c | 30 +++++++++---------------------
76daa3
 1 file changed, 9 insertions(+), 21 deletions(-)
76daa3
76daa3
diff --git a/block/nbd-client.c b/block/nbd-client.c
76daa3
index 1e2952f..43e0292 100644
76daa3
--- a/block/nbd-client.c
76daa3
+++ b/block/nbd-client.c
76daa3
@@ -114,6 +114,10 @@ static int nbd_co_send_request(BlockDriverState *bs,
76daa3
     int rc, ret, i;
76daa3
 
76daa3
     qemu_co_mutex_lock(&s->send_mutex);
76daa3
+    while (s->in_flight == MAX_NBD_REQUESTS) {
76daa3
+        qemu_co_queue_wait(&s->free_sema, &s->send_mutex);
76daa3
+    }
76daa3
+    s->in_flight++;
76daa3
 
76daa3
     for (i = 0; i < MAX_NBD_REQUESTS; i++) {
76daa3
         if (s->recv_coroutine[i] == NULL) {
76daa3
@@ -176,20 +180,6 @@ static void nbd_co_receive_reply(NBDClientSession *s,
76daa3
     }
76daa3
 }
76daa3
 
76daa3
-static void nbd_coroutine_start(NBDClientSession *s,
76daa3
-                                NBDRequest *request)
76daa3
-{
76daa3
-    /* Poor man semaphore.  The free_sema is locked when no other request
76daa3
-     * can be accepted, and unlocked after receiving one reply.  */
76daa3
-    if (s->in_flight == MAX_NBD_REQUESTS) {
76daa3
-        qemu_co_queue_wait(&s->free_sema, NULL);
76daa3
-        assert(s->in_flight < MAX_NBD_REQUESTS);
76daa3
-    }
76daa3
-    s->in_flight++;
76daa3
-
76daa3
-    /* s->recv_coroutine[i] is set as soon as we get the send_lock.  */
76daa3
-}
76daa3
-
76daa3
 static void nbd_coroutine_end(BlockDriverState *bs,
76daa3
                               NBDRequest *request)
76daa3
 {
76daa3
@@ -197,13 +187,16 @@ static void nbd_coroutine_end(BlockDriverState *bs,
76daa3
     int i = HANDLE_TO_INDEX(s, request->handle);
76daa3
 
76daa3
     s->recv_coroutine[i] = NULL;
76daa3
-    s->in_flight--;
76daa3
-    qemu_co_queue_next(&s->free_sema);
76daa3
 
76daa3
     /* Kick the read_reply_co to get the next reply.  */
76daa3
     if (s->read_reply_co) {
76daa3
         aio_co_wake(s->read_reply_co);
76daa3
     }
76daa3
+
76daa3
+    qemu_co_mutex_lock(&s->send_mutex);
76daa3
+    s->in_flight--;
76daa3
+    qemu_co_queue_next(&s->free_sema);
76daa3
+    qemu_co_mutex_unlock(&s->send_mutex);
76daa3
 }
76daa3
 
76daa3
 int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
76daa3
@@ -221,7 +214,6 @@ int nbd_client_co_preadv(BlockDriverState *bs, uint64_t offset,
76daa3
     assert(bytes <= NBD_MAX_BUFFER_SIZE);
76daa3
     assert(!flags);
76daa3
 
76daa3
-    nbd_coroutine_start(client, &request);
76daa3
     ret = nbd_co_send_request(bs, &request, NULL);
76daa3
     if (ret < 0) {
76daa3
         reply.error = -ret;
76daa3
@@ -251,7 +243,6 @@ int nbd_client_co_pwritev(BlockDriverState *bs, uint64_t offset,
76daa3
 
76daa3
     assert(bytes <= NBD_MAX_BUFFER_SIZE);
76daa3
 
76daa3
-    nbd_coroutine_start(client, &request);
76daa3
     ret = nbd_co_send_request(bs, &request, qiov);
76daa3
     if (ret < 0) {
76daa3
         reply.error = -ret;
76daa3
@@ -286,7 +277,6 @@ int nbd_client_co_pwrite_zeroes(BlockDriverState *bs, int64_t offset,
76daa3
         request.flags |= NBD_CMD_FLAG_NO_HOLE;
76daa3
     }
76daa3
 
76daa3
-    nbd_coroutine_start(client, &request);
76daa3
     ret = nbd_co_send_request(bs, &request, NULL);
76daa3
     if (ret < 0) {
76daa3
         reply.error = -ret;
76daa3
@@ -311,7 +301,6 @@ int nbd_client_co_flush(BlockDriverState *bs)
76daa3
     request.from = 0;
76daa3
     request.len = 0;
76daa3
 
76daa3
-    nbd_coroutine_start(client, &request);
76daa3
     ret = nbd_co_send_request(bs, &request, NULL);
76daa3
     if (ret < 0) {
76daa3
         reply.error = -ret;
76daa3
@@ -337,7 +326,6 @@ int nbd_client_co_pdiscard(BlockDriverState *bs, int64_t offset, int count)
76daa3
         return 0;
76daa3
     }
76daa3
 
76daa3
-    nbd_coroutine_start(client, &request);
76daa3
     ret = nbd_co_send_request(bs, &request, NULL);
76daa3
     if (ret < 0) {
76daa3
         reply.error = -ret;
76daa3
-- 
76daa3
1.8.3.1
76daa3