34b321
From 2a68d801c63137c3d1fe9fa96f0193eb2d1576f5 Mon Sep 17 00:00:00 2001
34b321
From: Fam Zheng <famz@redhat.com>
34b321
Date: Thu, 10 Mar 2016 04:00:52 +0100
34b321
Subject: [PATCH 3/5] nbd-server: Coroutine based negotiation
34b321
34b321
RH-Author: Fam Zheng <famz@redhat.com>
34b321
Message-id: <1457582453-13835-3-git-send-email-famz@redhat.com>
34b321
Patchwork-id: 69758
34b321
O-Subject: [RHEL-7.3 qemu-kvm PATCH v2 2/3] nbd-server: Coroutine based negotiation
34b321
Bugzilla: 1285453
34b321
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
34b321
RH-Acked-by: Paolo Bonzini <pbonzini@redhat.com>
34b321
RH-Acked-by: Laurent Vivier <lvivier@redhat.com>
34b321
34b321
Create a coroutine in nbd_client_new, so that nbd_send_negotiate doesn't
34b321
need qemu_set_block().
34b321
34b321
Handlers need to be set temporarily for csock fd in case the coroutine
34b321
yields during I/O.
34b321
34b321
With this, if the other end disappears in the middle of the negotiation,
34b321
we don't block the whole event loop.
34b321
34b321
To make the code clearer, unify all function names that belong to
34b321
negotiate, so they are less likely to be misused. This is important
34b321
because we rely on negotiation staying in main loop, as commented in
34b321
nbd_negotiate_read/write().
34b321
34b321
Signed-off-by: Fam Zheng <famz@redhat.com>
34b321
Message-Id: <1452760863-25350-4-git-send-email-famz@redhat.com>
34b321
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
34b321
(cherry picked from commit 1a6245a5b0b4e8d822c739b403fc67c8a7bc8d12)
34b321
Signed-off-by: Fam Zheng <famz@redhat.com>
34b321
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
34b321
34b321
Conflicts:
34b321
	nbd.c
34b321
Downstream doesn't have new style protocol, and the code is not split.
34b321
The patch is redone.
34b321
---
34b321
 nbd.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++---------------
34b321
 1 file changed, 82 insertions(+), 24 deletions(-)
34b321
34b321
diff --git a/nbd.c b/nbd.c
34b321
index ba97270..97aeecb 100644
34b321
--- a/nbd.c
34b321
+++ b/nbd.c
34b321
@@ -167,6 +167,41 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
34b321
     return offset;
34b321
 }
34b321
 
34b321
+static void nbd_negotiate_continue(void *opaque)
34b321
+{
34b321
+    qemu_coroutine_enter(opaque, NULL);
34b321
+}
34b321
+
34b321
+static ssize_t read_sync(int fd, void *buffer, size_t size);
34b321
+static ssize_t write_sync(int fd, void *buffer, size_t size);
34b321
+
34b321
+static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
34b321
+{
34b321
+    ssize_t ret;
34b321
+
34b321
+    assert(qemu_in_coroutine());
34b321
+    /* Negotiation are always in main loop. */
34b321
+    qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
34b321
+                        qemu_coroutine_self());
34b321
+    ret = read_sync(fd, buffer, size);
34b321
+    qemu_set_fd_handler(fd, NULL, NULL, NULL);
34b321
+    return ret;
34b321
+
34b321
+}
34b321
+
34b321
+static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
34b321
+{
34b321
+    ssize_t ret;
34b321
+
34b321
+    assert(qemu_in_coroutine());
34b321
+    /* Negotiation are always in main loop. */
34b321
+    qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
34b321
+                        qemu_coroutine_self());
34b321
+    ret = write_sync(fd, buffer, size);
34b321
+    qemu_set_fd_handler(fd, NULL, NULL, NULL);
34b321
+    return ret;
34b321
+}
34b321
+
34b321
 static ssize_t read_sync(int fd, void *buffer, size_t size)
34b321
 {
34b321
     /* Sockets are kept in blocking mode in the negotiation phase.  After
34b321
@@ -280,7 +315,7 @@ int unix_socket_outgoing(const char *path)
34b321
 
34b321
 */
34b321
 
34b321
-static int nbd_receive_options(NBDClient *client)
34b321
+static coroutine_fn int nbd_negotiate_receive_options(NBDClient *client)
34b321
 {
34b321
     int csock = client->sock;
34b321
     char name[256];
34b321
@@ -297,7 +332,7 @@ static int nbd_receive_options(NBDClient *client)
34b321
      */
34b321
 
34b321
     rc = -EINVAL;
34b321
-    if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
34b321
+    if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
34b321
         LOG("read failed");
34b321
         goto fail;
34b321
     }
34b321
@@ -307,7 +342,7 @@ static int nbd_receive_options(NBDClient *client)
34b321
         goto fail;
34b321
     }
34b321
 
34b321
-    if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
34b321
+    if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
34b321
         LOG("read failed");
34b321
         goto fail;
34b321
     }
34b321
@@ -317,7 +352,7 @@ static int nbd_receive_options(NBDClient *client)
34b321
         goto fail;
34b321
     }
34b321
 
34b321
-    if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
34b321
+    if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
34b321
         LOG("read failed");
34b321
         goto fail;
34b321
     }
34b321
@@ -327,7 +362,7 @@ static int nbd_receive_options(NBDClient *client)
34b321
         goto fail;
34b321
     }
34b321
 
34b321
-    if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
34b321
+    if (nbd_negotiate_read(csock, &length, sizeof(length)) != sizeof(length)) {
34b321
         LOG("read failed");
34b321
         goto fail;
34b321
     }
34b321
@@ -337,7 +372,7 @@ static int nbd_receive_options(NBDClient *client)
34b321
         LOG("Bad length received");
34b321
         goto fail;
34b321
     }
34b321
-    if (read_sync(csock, name, length) != length) {
34b321
+    if (nbd_negotiate_read(csock, name, length) != length) {
34b321
         LOG("read failed");
34b321
         goto fail;
34b321
     }
34b321
@@ -358,8 +393,14 @@ fail:
34b321
     return rc;
34b321
 }
34b321
 
34b321
-static int nbd_send_negotiate(NBDClient *client)
34b321
+typedef struct {
34b321
+    NBDClient *client;
34b321
+    Coroutine *co;
34b321
+} NBDClientNewData;
34b321
+
34b321
+static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
34b321
 {
34b321
+    NBDClient *client = data->client;
34b321
     int csock = client->sock;
34b321
     char buf[8 + 8 + 8 + 128];
34b321
     int rc;
34b321
@@ -385,7 +426,6 @@ static int nbd_send_negotiate(NBDClient *client)
34b321
         [28 .. 151]   reserved     (0)
34b321
      */
34b321
 
34b321
-    qemu_set_block(csock);
34b321
     rc = -EINVAL;
34b321
 
34b321
     TRACE("Beginning negotiation.");
34b321
@@ -401,16 +441,16 @@ static int nbd_send_negotiate(NBDClient *client)
34b321
     }
34b321
 
34b321
     if (client->exp) {
34b321
-        if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
34b321
+        if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
34b321
             LOG("write failed");
34b321
             goto fail;
34b321
         }
34b321
     } else {
34b321
-        if (write_sync(csock, buf, 18) != 18) {
34b321
+        if (nbd_negotiate_write(csock, buf, 18) != 18) {
34b321
             LOG("write failed");
34b321
             goto fail;
34b321
         }
34b321
-        rc = nbd_receive_options(client);
34b321
+        rc = nbd_negotiate_receive_options(client);
34b321
         if (rc < 0) {
34b321
             LOG("option negotiation failed");
34b321
             goto fail;
34b321
@@ -419,7 +459,8 @@ static int nbd_send_negotiate(NBDClient *client)
34b321
         assert ((client->exp->nbdflags & ~65535) == 0);
34b321
         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
34b321
         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
34b321
-        if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
34b321
+        if (nbd_negotiate_write(csock, buf + 18,
34b321
+                                sizeof(buf) - 18) != sizeof(buf) - 18) {
34b321
             LOG("write failed");
34b321
             goto fail;
34b321
         }
34b321
@@ -428,7 +469,6 @@ static int nbd_send_negotiate(NBDClient *client)
34b321
     TRACE("Negotiation succeeded.");
34b321
     rc = 0;
34b321
 fail:
34b321
-    qemu_set_nonblock(csock);
34b321
     return rc;
34b321
 }
34b321
 
34b321
@@ -1232,24 +1272,42 @@ static void nbd_restart_write(void *opaque)
34b321
     qemu_coroutine_enter(client->send_coroutine, NULL);
34b321
 }
34b321
 
34b321
+static coroutine_fn void nbd_co_client_start(void *opaque)
34b321
+{
34b321
+    NBDClientNewData *data = opaque;
34b321
+    NBDClient *client = data->client;
34b321
+    NBDExport *exp = client->exp;
34b321
+
34b321
+    if (exp) {
34b321
+        nbd_export_get(exp);
34b321
+    }
34b321
+    if (nbd_negotiate(data)) {
34b321
+        shutdown(client->sock, 2);
34b321
+        client->close(client);
34b321
+        goto out;
34b321
+    }
34b321
+    qemu_co_mutex_init(&client->send_lock);
34b321
+    qemu_set_fd_handler2(client->sock, nbd_can_read, nbd_read, NULL, client);
34b321
+
34b321
+    if (exp) {
34b321
+        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
34b321
+    }
34b321
+out:
34b321
+    g_free(data);
34b321
+}
34b321
+
34b321
 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
34b321
 {
34b321
     NBDClient *client;
34b321
+    NBDClientNewData *data = g_new(NBDClientNewData, 1);
34b321
+
34b321
     client = g_malloc0(sizeof(NBDClient));
34b321
     client->refcount = 1;
34b321
     client->exp = exp;
34b321
     client->sock = csock;
34b321
-    if (nbd_send_negotiate(client) < 0) {
34b321
-        shutdown(client->sock, 2);
34b321
-        close_fn(client);
34b321
-        return;
34b321
-    }
34b321
     client->close = close_fn;
34b321
-    qemu_co_mutex_init(&client->send_lock);
34b321
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
34b321
 
34b321
-    if (exp) {
34b321
-        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
34b321
-        nbd_export_get(exp);
34b321
-    }
34b321
+    data->client = client;
34b321
+    data->co = qemu_coroutine_create(nbd_co_client_start);
34b321
+    qemu_coroutine_enter(data->co, data);
34b321
 }
34b321
-- 
34b321
1.8.3.1
34b321