From 2a68d801c63137c3d1fe9fa96f0193eb2d1576f5 Mon Sep 17 00:00:00 2001
From: Fam Zheng <famz@redhat.com>
Date: Thu, 10 Mar 2016 04:00:52 +0100
Subject: [PATCH 3/5] nbd-server: Coroutine based negotiation
RH-Author: Fam Zheng <famz@redhat.com>
Message-id: <1457582453-13835-3-git-send-email-famz@redhat.com>
Patchwork-id: 69758
O-Subject: [RHEL-7.3 qemu-kvm PATCH v2 2/3] nbd-server: Coroutine based negotiation
Bugzilla: 1285453
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
RH-Acked-by: Paolo Bonzini <pbonzini@redhat.com>
RH-Acked-by: Laurent Vivier <lvivier@redhat.com>
Create a coroutine in nbd_client_new, so that nbd_send_negotiate doesn't
need qemu_set_block().
Handlers need to be set temporarily for csock fd in case the coroutine
yields during I/O.
With this, if the other end disappears in the middle of the negotiation,
we don't block the whole event loop.
To make the code clearer, unify all function names that belong to
negotiate, so they are less likely to be misused. This is important
because we rely on negotiation staying in main loop, as commented in
nbd_negotiate_read/write().
Signed-off-by: Fam Zheng <famz@redhat.com>
Message-Id: <1452760863-25350-4-git-send-email-famz@redhat.com>
Signed-off-by: Paolo Bonzini <pbonzini@redhat.com>
(cherry picked from commit 1a6245a5b0b4e8d822c739b403fc67c8a7bc8d12)
Signed-off-by: Fam Zheng <famz@redhat.com>
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
Conflicts:
nbd.c
Downstream doesn't have new style protocol, and the code is not split.
The patch is redone.
---
nbd.c | 106 +++++++++++++++++++++++++++++++++++++++++++++++++++---------------
1 file changed, 82 insertions(+), 24 deletions(-)
diff --git a/nbd.c b/nbd.c
index ba97270..97aeecb 100644
--- a/nbd.c
+++ b/nbd.c
@@ -167,6 +167,41 @@ ssize_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
return offset;
}
+static void nbd_negotiate_continue(void *opaque)
+{
+ qemu_coroutine_enter(opaque, NULL);
+}
+
+static ssize_t read_sync(int fd, void *buffer, size_t size);
+static ssize_t write_sync(int fd, void *buffer, size_t size);
+
+static ssize_t nbd_negotiate_read(int fd, void *buffer, size_t size)
+{
+ ssize_t ret;
+
+ assert(qemu_in_coroutine());
+ /* Negotiation are always in main loop. */
+ qemu_set_fd_handler(fd, nbd_negotiate_continue, NULL,
+ qemu_coroutine_self());
+ ret = read_sync(fd, buffer, size);
+ qemu_set_fd_handler(fd, NULL, NULL, NULL);
+ return ret;
+
+}
+
+static ssize_t nbd_negotiate_write(int fd, void *buffer, size_t size)
+{
+ ssize_t ret;
+
+ assert(qemu_in_coroutine());
+ /* Negotiation are always in main loop. */
+ qemu_set_fd_handler(fd, NULL, nbd_negotiate_continue,
+ qemu_coroutine_self());
+ ret = write_sync(fd, buffer, size);
+ qemu_set_fd_handler(fd, NULL, NULL, NULL);
+ return ret;
+}
+
static ssize_t read_sync(int fd, void *buffer, size_t size)
{
/* Sockets are kept in blocking mode in the negotiation phase. After
@@ -280,7 +315,7 @@ int unix_socket_outgoing(const char *path)
*/
-static int nbd_receive_options(NBDClient *client)
+static coroutine_fn int nbd_negotiate_receive_options(NBDClient *client)
{
int csock = client->sock;
char name[256];
@@ -297,7 +332,7 @@ static int nbd_receive_options(NBDClient *client)
*/
rc = -EINVAL;
- if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed");
goto fail;
}
@@ -307,7 +342,7 @@ static int nbd_receive_options(NBDClient *client)
goto fail;
}
- if (read_sync(csock, &magic, sizeof(magic)) != sizeof(magic)) {
+ if (nbd_negotiate_read(csock, &magic, sizeof(magic)) != sizeof(magic)) {
LOG("read failed");
goto fail;
}
@@ -317,7 +352,7 @@ static int nbd_receive_options(NBDClient *client)
goto fail;
}
- if (read_sync(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
+ if (nbd_negotiate_read(csock, &tmp, sizeof(tmp)) != sizeof(tmp)) {
LOG("read failed");
goto fail;
}
@@ -327,7 +362,7 @@ static int nbd_receive_options(NBDClient *client)
goto fail;
}
- if (read_sync(csock, &length, sizeof(length)) != sizeof(length)) {
+ if (nbd_negotiate_read(csock, &length, sizeof(length)) != sizeof(length)) {
LOG("read failed");
goto fail;
}
@@ -337,7 +372,7 @@ static int nbd_receive_options(NBDClient *client)
LOG("Bad length received");
goto fail;
}
- if (read_sync(csock, name, length) != length) {
+ if (nbd_negotiate_read(csock, name, length) != length) {
LOG("read failed");
goto fail;
}
@@ -358,8 +393,14 @@ fail:
return rc;
}
-static int nbd_send_negotiate(NBDClient *client)
+typedef struct {
+ NBDClient *client;
+ Coroutine *co;
+} NBDClientNewData;
+
+static coroutine_fn int nbd_negotiate(NBDClientNewData *data)
{
+ NBDClient *client = data->client;
int csock = client->sock;
char buf[8 + 8 + 8 + 128];
int rc;
@@ -385,7 +426,6 @@ static int nbd_send_negotiate(NBDClient *client)
[28 .. 151] reserved (0)
*/
- qemu_set_block(csock);
rc = -EINVAL;
TRACE("Beginning negotiation.");
@@ -401,16 +441,16 @@ static int nbd_send_negotiate(NBDClient *client)
}
if (client->exp) {
- if (write_sync(csock, buf, sizeof(buf)) != sizeof(buf)) {
+ if (nbd_negotiate_write(csock, buf, sizeof(buf)) != sizeof(buf)) {
LOG("write failed");
goto fail;
}
} else {
- if (write_sync(csock, buf, 18) != 18) {
+ if (nbd_negotiate_write(csock, buf, 18) != 18) {
LOG("write failed");
goto fail;
}
- rc = nbd_receive_options(client);
+ rc = nbd_negotiate_receive_options(client);
if (rc < 0) {
LOG("option negotiation failed");
goto fail;
@@ -419,7 +459,8 @@ static int nbd_send_negotiate(NBDClient *client)
assert ((client->exp->nbdflags & ~65535) == 0);
cpu_to_be64w((uint64_t*)(buf + 18), client->exp->size);
cpu_to_be16w((uint16_t*)(buf + 26), client->exp->nbdflags | myflags);
- if (write_sync(csock, buf + 18, sizeof(buf) - 18) != sizeof(buf) - 18) {
+ if (nbd_negotiate_write(csock, buf + 18,
+ sizeof(buf) - 18) != sizeof(buf) - 18) {
LOG("write failed");
goto fail;
}
@@ -428,7 +469,6 @@ static int nbd_send_negotiate(NBDClient *client)
TRACE("Negotiation succeeded.");
rc = 0;
fail:
- qemu_set_nonblock(csock);
return rc;
}
@@ -1232,24 +1272,42 @@ static void nbd_restart_write(void *opaque)
qemu_coroutine_enter(client->send_coroutine, NULL);
}
+static coroutine_fn void nbd_co_client_start(void *opaque)
+{
+ NBDClientNewData *data = opaque;
+ NBDClient *client = data->client;
+ NBDExport *exp = client->exp;
+
+ if (exp) {
+ nbd_export_get(exp);
+ }
+ if (nbd_negotiate(data)) {
+ shutdown(client->sock, 2);
+ client->close(client);
+ goto out;
+ }
+ qemu_co_mutex_init(&client->send_lock);
+ qemu_set_fd_handler2(client->sock, nbd_can_read, nbd_read, NULL, client);
+
+ if (exp) {
+ QTAILQ_INSERT_TAIL(&exp->clients, client, next);
+ }
+out:
+ g_free(data);
+}
+
void nbd_client_new(NBDExport *exp, int csock, void (*close_fn)(NBDClient *))
{
NBDClient *client;
+ NBDClientNewData *data = g_new(NBDClientNewData, 1);
+
client = g_malloc0(sizeof(NBDClient));
client->refcount = 1;
client->exp = exp;
client->sock = csock;
- if (nbd_send_negotiate(client) < 0) {
- shutdown(client->sock, 2);
- close_fn(client);
- return;
- }
client->close = close_fn;
- qemu_co_mutex_init(&client->send_lock);
- qemu_set_fd_handler2(csock, nbd_can_read, nbd_read, NULL, client);
- if (exp) {
- QTAILQ_INSERT_TAIL(&exp->clients, client, next);
- nbd_export_get(exp);
- }
+ data->client = client;
+ data->co = qemu_coroutine_create(nbd_co_client_start);
+ qemu_coroutine_enter(data->co, data);
}
--
1.8.3.1