9ae3a8
From 2a68d801c63137c3d1fe9fa96f0193eb2d1576f5 Mon Sep 17 00:00:00 2001
9ae3a8
From: Fam Zheng <famz@redhat.com>
9ae3a8
Date: Thu, 10 Mar 2016 04:00:52 +0100
9ae3a8
Subject: [PATCH 3/5] nbd-server: Coroutine based negotiation
9ae3a8
9ae3a8
RH-Author: Fam Zheng <famz@redhat.com>
9ae3a8
Message-id: <1457582453-13835-3-git-send-email-famz@redhat.com>
9ae3a8
Patchwork-id: 69758
9ae3a8
O-Subject: [RHEL-7.3 qemu-kvm PATCH v2 2/3] nbd-server: Coroutine based negotiation
9ae3a8
Bugzilla: 1285453
9ae3a8
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
9ae3a8
RH-Acked-by: Paolo Bonzini <pbonzini@redhat.com>
9ae3a8
RH-Acked-by: Laurent Vivier <lvivier@redhat.com>
9ae3a8
9ae3a8
Create a coroutine in nbd_client_new, so that nbd_send_negotiate doesn't
9ae3a8
need qemu_set_block().
9ae3a8
9ae3a8
Handlers need to be set temporarily for csock fd in case the coroutine
9ae3a8
yields during I/O.
9ae3a8
9ae3a8
With this, if the other end disappears in the middle of the negotiation,
9ae3a8
we don't block the whole event loop.
9ae3a8
9ae3a8
To make the code clearer, unify all function names that belong to
9ae3a8
negotiate, so they are less likely to be misused. This is important
9ae3a8
because we rely on negotiation staying in main loop, as commented in
9ae3a8
nbd_negotiate_read/write().
9ae3a8
9ae3a8
Signed-off-by: Fam Zheng <famz@redhat.com>
9ae3a8
Message-Id: <1452760863-25350-4-git-send-email-famz@redhat.com>
9ae3a8
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
9ae3a8
(cherry picked from commit 1a6245a5b0b4e8d822c739b403fc67c8a7bc8d12)
9ae3a8
Signed-off-by: Fam Zheng <famz@redhat.com>
9ae3a8
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
9ae3a8
9ae3a8
Conflicts:
9ae3a8
	nbd.c
9ae3a8
Downstream doesn't have new style protocol, and the code is not split.
9ae3a8
The patch is redone.
9ae3a8
---
9ae3a8
 nbd.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++---------------
9ae3a8
 1 file changed, 82 insertions(+), 24 deletions(-)
9ae3a8
9ae3a8
diff --git a/nbd.c b/nbd.c
9ae3a8
index ba97270..97aeecb 100644
9ae3a8
--- a/nbd.c
9ae3a8
+++ b/nbd.c
9ae3a8
@@ -167,6 +167,41 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
9ae3a8
     return offset;
9ae3a8
 }
9ae3a8
 
9ae3a8
+static void nbd_negotiate_continue(void *opaque)
9ae3a8
+{
9ae3a8
+    qemu_coroutine_enter(opaque, NULL);
9ae3a8
+}
9ae3a8
+
9ae3a8
+static ssize_t read_sync(int fd, void *buffer, size_t size);
9ae3a8
+static ssize_t write_sync(int fd, void *buffer, size_t size);
9ae3a8
+
9ae3a8
+static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
9ae3a8
+{
9ae3a8
+    ssize_t ret;
9ae3a8
+
9ae3a8
+    assert(qemu_in_coroutine());
9ae3a8
+    /* Negotiation are always in main loop. */
9ae3a8
+    qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
9ae3a8
+                        qemu_coroutine_self());
9ae3a8
+    ret = read_sync(fd, buffer, size);
9ae3a8
+    qemu_set_fd_handler(fd, NULL, NULL, NULL);
9ae3a8
+    return ret;
9ae3a8
+
9ae3a8
+}
9ae3a8
+
9ae3a8
+static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
9ae3a8
+{
9ae3a8
+    ssize_t ret;
9ae3a8
+
9ae3a8
+    assert(qemu_in_coroutine());
9ae3a8
+    /* Negotiation are always in main loop. */
9ae3a8
+    qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
9ae3a8
+                        qemu_coroutine_self());
9ae3a8
+    ret = write_sync(fd, buffer, size);
9ae3a8
+    qemu_set_fd_handler(fd, NULL, NULL, NULL);
9ae3a8
+    return ret;
9ae3a8
+}
9ae3a8
+
9ae3a8
 static ssize_t read_sync(int fd, void *buffer, size_t size)
9ae3a8
 {
9ae3a8
     /* Sockets are kept in blocking mode in the negotiation phase.  After
9ae3a8
@@ -280,7 +315,7 @@ int unix_socket_outgoing(const char *path)
9ae3a8
 
9ae3a8
 */
9ae3a8
 
9ae3a8
-static int nbd_receive_options(NBDClient *client)
9ae3a8
+static coroutine_fn int nbd_negotiate_receive_options(NBDClient *client)
9ae3a8
 {
9ae3a8
     int csock = client->sock;
9ae3a8
     char name[256];
9ae3a8
@@ -297,7 +332,7 @@ static int nbd_receive_options(NBDClient *client)
9ae3a8
      */
9ae3a8
 
9ae3a8
     rc = -EINVAL;
9ae3a8
-    if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
9ae3a8
+    if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
9ae3a8
         LOG("read failed");
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
@@ -307,7 +342,7 @@ static int nbd_receive_options(NBDClient *client)
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
 
9ae3a8
-    if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
9ae3a8
+    if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
9ae3a8
         LOG("read failed");
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
@@ -317,7 +352,7 @@ static int nbd_receive_options(NBDClient *client)
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
 
9ae3a8
-    if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
9ae3a8
+    if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
9ae3a8
         LOG("read failed");
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
@@ -327,7 +362,7 @@ static int nbd_receive_options(NBDClient *client)
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
 
9ae3a8
-    if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
9ae3a8
+    if (nbd_negotiate_read(csock, &length, sizeof(length)) != sizeof(length)) {
9ae3a8
         LOG("read failed");
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
@@ -337,7 +372,7 @@ static int nbd_receive_options(NBDClient *client)
9ae3a8
         LOG("Bad length received");
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
-    if (read_sync(csock, name, length) != length) {
9ae3a8
+    if (nbd_negotiate_read(csock, name, length) != length) {
9ae3a8
         LOG("read failed");
9ae3a8
         goto fail;
9ae3a8
     }
9ae3a8
@@ -358,8 +393,14 @@ fail:
9ae3a8
     return rc;
9ae3a8
 }
9ae3a8
 
9ae3a8
-static int nbd_send_negotiate(NBDClient *client)
9ae3a8
+typedef struct {
9ae3a8
+    NBDClient *client;
9ae3a8
+    Coroutine *co;
9ae3a8
+} NBDClientNewData;
9ae3a8
+
9ae3a8
+static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
9ae3a8
 {
9ae3a8
+    NBDClient *client = data->client;
9ae3a8
     int csock = client->sock;
9ae3a8
     char buf[8 + 8 + 8 + 128];
9ae3a8
     int rc;
9ae3a8
@@ -385,7 +426,6 @@ static int nbd_send_negotiate(NBDClient *client)
9ae3a8
         [28 .. 151]   reserved     (0)
9ae3a8
      */
9ae3a8
 
9ae3a8
-    qemu_set_block(csock);
9ae3a8
     rc = -EINVAL;
9ae3a8
 
9ae3a8
     TRACE("Beginning negotiation.");
9ae3a8
@@ -401,16 +441,16 @@ static int nbd_send_negotiate(NBDClient *client)
9ae3a8
     }
9ae3a8
 
9ae3a8
     if (client->exp) {
9ae3a8
-        if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
9ae3a8
+        if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
9ae3a8
             LOG("write failed");
9ae3a8
             goto fail;
9ae3a8
         }
9ae3a8
     } else {
9ae3a8
-        if (write_sync(csock, buf, 18) != 18) {
9ae3a8
+        if (nbd_negotiate_write(csock, buf, 18) != 18) {
9ae3a8
             LOG("write failed");
9ae3a8
             goto fail;
9ae3a8
         }
9ae3a8
-        rc = nbd_receive_options(client);
9ae3a8
+        rc = nbd_negotiate_receive_options(client);
9ae3a8
         if (rc < 0) {
9ae3a8
             LOG("option negotiation failed");
9ae3a8
             goto fail;
9ae3a8
@@ -419,7 +459,8 @@ static int nbd_send_negotiate(NBDClient *client)
9ae3a8
         assert ((client->exp->nbdflags & ~65535) == 0);
9ae3a8
         cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
9ae3a8
         cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
9ae3a8
-        if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
9ae3a8
+        if (nbd_negotiate_write(csock, buf + 18,
9ae3a8
+                                sizeof(buf) - 18) != sizeof(buf) - 18) {
9ae3a8
             LOG("write failed");
9ae3a8
             goto fail;
9ae3a8
         }
9ae3a8
@@ -428,7 +469,6 @@ static int nbd_send_negotiate(NBDClient *client)
9ae3a8
     TRACE("Negotiation succeeded.");
9ae3a8
     rc = 0;
9ae3a8
 fail:
9ae3a8
-    qemu_set_nonblock(csock);
9ae3a8
     return rc;
9ae3a8
 }
9ae3a8
 
9ae3a8
@@ -1232,24 +1272,42 @@ static void nbd_restart_write(void *opaque)
9ae3a8
     qemu_coroutine_enter(client->send_coroutine, NULL);
9ae3a8
 }
9ae3a8
 
9ae3a8
+static coroutine_fn void nbd_co_client_start(void *opaque)
9ae3a8
+{
9ae3a8
+    NBDClientNewData *data = opaque;
9ae3a8
+    NBDClient *client = data->client;
9ae3a8
+    NBDExport *exp = client->exp;
9ae3a8
+
9ae3a8
+    if (exp) {
9ae3a8
+        nbd_export_get(exp);
9ae3a8
+    }
9ae3a8
+    if (nbd_negotiate(data)) {
9ae3a8
+        shutdown(client->sock, 2);
9ae3a8
+        client->close(client);
9ae3a8
+        goto out;
9ae3a8
+    }
9ae3a8
+    qemu_co_mutex_init(&client->send_lock);
9ae3a8
+    qemu_set_fd_handler2(client->sock, nbd_can_read, nbd_read, NULL, client);
9ae3a8
+
9ae3a8
+    if (exp) {
9ae3a8
+        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
9ae3a8
+    }
9ae3a8
+out:
9ae3a8
+    g_free(data);
9ae3a8
+}
9ae3a8
+
9ae3a8
 void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
9ae3a8
 {
9ae3a8
     NBDClient *client;
9ae3a8
+    NBDClientNewData *data = g_new(NBDClientNewData, 1);
9ae3a8
+
9ae3a8
     client = g_malloc0(sizeof(NBDClient));
9ae3a8
     client->refcount = 1;
9ae3a8
     client->exp = exp;
9ae3a8
     client->sock = csock;
9ae3a8
-    if (nbd_send_negotiate(client) < 0) {
9ae3a8
-        shutdown(client->sock, 2);
9ae3a8
-        close_fn(client);
9ae3a8
-        return;
9ae3a8
-    }
9ae3a8
     client->close = close_fn;
9ae3a8
-    qemu_co_mutex_init(&client->send_lock);
9ae3a8
-    qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
9ae3a8
 
9ae3a8
-    if (exp) {
9ae3a8
-        QTAILQ_INSERT_TAIL(&exp->clients, client, next);
9ae3a8
-        nbd_export_get(exp);
9ae3a8
-    }
9ae3a8
+    data->client = client;
9ae3a8
+    data->co = qemu_coroutine_create(nbd_co_client_start);
9ae3a8
+    qemu_coroutine_enter(data->co, data);
9ae3a8
 }
9ae3a8
-- 
9ae3a8
1.8.3.1
9ae3a8