0a122b
From 5d7481d76a57e533f521a5d99ba8d35b5d69625c Mon Sep 17 00:00:00 2001
0a122b
Message-Id: <5d7481d76a57e533f521a5d99ba8d35b5d69625c.1387382496.git.minovotn@redhat.com>
0a122b
In-Reply-To: <c5386144fbf09f628148101bc674e2421cdd16e3.1387382496.git.minovotn@redhat.com>
0a122b
References: <c5386144fbf09f628148101bc674e2421cdd16e3.1387382496.git.minovotn@redhat.com>
0a122b
From: Nigel Croxon <ncroxon@redhat.com>
0a122b
Date: Thu, 14 Nov 2013 22:52:51 +0100
0a122b
Subject: [PATCH 15/46] rdma: core logic
0a122b
0a122b
RH-Author: Nigel Croxon <ncroxon@redhat.com>
0a122b
Message-id: <1384469598-13137-16-git-send-email-ncroxon@redhat.com>
0a122b
Patchwork-id: 55697
0a122b
O-Subject: [RHEL7.0 PATCH 15/42] rdma: core logic
0a122b
Bugzilla: 1011720
0a122b
RH-Acked-by: Orit Wasserman <owasserm@redhat.com>
0a122b
RH-Acked-by: Amit Shah <amit.shah@redhat.com>
0a122b
RH-Acked-by: Paolo Bonzini <pbonzini@redhat.com>
0a122b
0a122b
Bugzilla: 1011720
0a122b
https://bugzilla.redhat.com/show_bug.cgi?id=1011720
0a122b
0a122b
>From commit ID:
0a122b
commit 2da776db4846eadcb808598a5d3484d149773c05
0a122b
Author: Michael R. Hines <mrhines@us.ibm.com>
0a122b
Date:   Mon Jul 22 10:01:54 2013 -0400
0a122b
0a122b
    rdma: core logic
0a122b
0a122b
    Code that does need to be visible is kept
0a122b
    well contained inside this file and this is the only
0a122b
    new additional file to the entire patch.
0a122b
0a122b
    This file includes the entire protocol and interfaces
0a122b
    required to perform RDMA migration.
0a122b
0a122b
    Also, the configure and Makefile modifications to link
0a122b
    this file are included.
0a122b
0a122b
    Full documentation is in docs/rdma.txt
0a122b
0a122b
    Reviewed-by: Paolo Bonzini <pbonzini@redhat.com>
0a122b
    Reviewed-by: Chegu Vinod <chegu_vinod@hp.com>
0a122b
    Tested-by: Chegu Vinod <chegu_vinod@hp.com>
0a122b
    Tested-by: Michael R. Hines <mrhines@us.ibm.com>
0a122b
    Signed-off-by: Michael R. Hines <mrhines@us.ibm.com>
0a122b
    Signed-off-by: Juan Quintela <quintela@redhat.com>
0a122b
---
0a122b
 Makefile.objs                 |    1 +
0a122b
 configure                     |   40 +
0a122b
 include/migration/migration.h |    4 +
0a122b
 migration-rdma.c              | 3249 +++++++++++++++++++++++++++++++++++++++++
0a122b
 migration.c                   |    8 +
0a122b
 5 files changed, 3302 insertions(+), 0 deletions(-)
0a122b
 create mode 100644 migration-rdma.c
0a122b
0a122b
Signed-off-by: Michal Novotny <minovotn@redhat.com>
0a122b
---
0a122b
 Makefile.objs                 |    1 +
0a122b
 configure                     |   40 +
0a122b
 include/migration/migration.h |    4 +
0a122b
 migration-rdma.c              | 3249 +++++++++++++++++++++++++++++++++++++++++
0a122b
 migration.c                   |    8 +
0a122b
 5 files changed, 3302 insertions(+)
0a122b
 create mode 100644 migration-rdma.c
0a122b
0a122b
diff --git a/Makefile.objs b/Makefile.objs
0a122b
index 286ce06..67b4a28 100644
0a122b
--- a/Makefile.objs
0a122b
+++ b/Makefile.objs
0a122b
@@ -50,6 +50,7 @@ common-obj-$(CONFIG_POSIX) += os-posix.o
0a122b
 common-obj-$(CONFIG_LINUX) += fsdev/
0a122b
 
0a122b
 common-obj-y += migration.o migration-tcp.o
0a122b
+common-obj-$(CONFIG_RDMA) += migration-rdma.o
0a122b
 common-obj-y += qemu-char.o #aio.o
0a122b
 common-obj-y += block-migration.o
0a122b
 common-obj-y += page_cache.o xbzrle.o
0a122b
diff --git a/configure b/configure
0a122b
index 0a729ac..33235c4 100755
0a122b
--- a/configure
0a122b
+++ b/configure
0a122b
@@ -181,6 +181,7 @@ xfs=""
0a122b
 vhost_net="no"
0a122b
 vhost_scsi="no"
0a122b
 kvm="no"
0a122b
+rdma=""
0a122b
 gprof="no"
0a122b
 debug_tcg="no"
0a122b
 debug="no"
0a122b
@@ -925,6 +926,10 @@ for opt do
0a122b
   ;;
0a122b
   --enable-gtk) gtk="yes"
0a122b
   ;;
0a122b
+  --enable-rdma) rdma="yes"
0a122b
+  ;;
0a122b
+  --disable-rdma) rdma="no"
0a122b
+  ;;
0a122b
   --with-gtkabi=*) gtkabi="$optarg"
0a122b
   ;;
0a122b
   --enable-tpm) tpm="yes"
0a122b
@@ -1151,6 +1156,8 @@ echo "  --enable-bluez           enable bluez stack connectivity"
0a122b
 echo "  --disable-slirp          disable SLIRP userspace network connectivity"
0a122b
 echo "  --disable-kvm            disable KVM acceleration support"
0a122b
 echo "  --enable-kvm             enable KVM acceleration support"
0a122b
+echo "  --disable-rdma           disable RDMA-based migration support"
0a122b
+echo "  --enable-rdma            enable RDMA-based migration support"
0a122b
 echo "  --enable-tcg-interpreter enable TCG with bytecode interpreter (TCI)"
0a122b
 echo "  --disable-nptl           disable usermode NPTL support"
0a122b
 echo "  --enable-nptl            enable usermode NPTL support"
0a122b
@@ -1846,6 +1853,30 @@ EOF
0a122b
 fi
0a122b
 
0a122b
 ##########################################
0a122b
+# RDMA needs OpenFabrics libraries
0a122b
+if test "$rdma" != "no" ; then
0a122b
+  cat > $TMPC <<EOF
0a122b
+#include <rdma/rdma_cma.h>
0a122b
+int main(void) { return 0; }
0a122b
+EOF
0a122b
+  rdma_libs="-lrdmacm -libverbs"
0a122b
+  if compile_prog "" "$rdma_libs" ; then
0a122b
+    rdma="yes"
0a122b
+    libs_softmmu="$libs_softmmu $rdma_libs"
0a122b
+  else
0a122b
+    if test "$rdma" = "yes" ; then
0a122b
+        error_exit \
0a122b
+            " OpenFabrics librdmacm/libibverbs not present." \
0a122b
+            " Your options:" \
0a122b
+            "  (1) Fast: Install infiniband packages from your distro." \
0a122b
+            "  (2) Cleanest: Install libraries from www.openfabrics.org" \
0a122b
+            "  (3) Also: Install softiwarp if you don't have RDMA hardware"
0a122b
+    fi
0a122b
+    rdma="no"
0a122b
+  fi
0a122b
+fi
0a122b
+
0a122b
+##########################################
0a122b
 # VNC TLS/WS detection
0a122b
 if test "$vnc" = "yes" -a \( "$vnc_tls" != "no" -o "$vnc_ws" != "no" \) ; then
0a122b
   cat > $TMPC <<EOF
0a122b
@@ -3550,6 +3581,7 @@ echo "Linux AIO support $linux_aio"
0a122b
 echo "ATTR/XATTR support $attr"
0a122b
 echo "Install blobs     $blobs"
0a122b
 echo "KVM support       $kvm"
0a122b
+echo "RDMA support      $rdma"
0a122b
 echo "TCG interpreter   $tcg_interpreter"
0a122b
 echo "fdt support       $fdt"
0a122b
 echo "preadv support    $preadv"
0a122b
@@ -4032,6 +4064,10 @@ if test "$trace_default" = "yes"; then
0a122b
   echo "CONFIG_TRACE_DEFAULT=y" >> $config_host_mak
0a122b
 fi
0a122b
 
0a122b
+if test "$rdma" = "yes" ; then
0a122b
+  echo "CONFIG_RDMA=y" >> $config_host_mak
0a122b
+fi
0a122b
+
0a122b
 if test "$tcg_interpreter" = "yes"; then
0a122b
   QEMU_INCLUDES="-I\$(SRC_PATH)/tcg/tci $QEMU_INCLUDES"
0a122b
 elif test "$ARCH" = "sparc64" ; then
0a122b
@@ -4506,6 +4542,10 @@ if [ "$pixman" = "internal" ]; then
0a122b
   echo "config-host.h: subdir-pixman" >> $config_host_mak
0a122b
 fi
0a122b
 
0a122b
+if test "$rdma" = "yes" ; then
0a122b
+echo "CONFIG_RDMA=y" >> $config_host_mak
0a122b
+fi
0a122b
+
0a122b
 if [ "$dtc_internal" = "yes" ]; then
0a122b
   echo "config-host.h: subdir-dtc" >> $config_host_mak
0a122b
 fi
0a122b
diff --git a/include/migration/migration.h b/include/migration/migration.h
0a122b
index 90b5021..13a9629 100644
0a122b
--- a/include/migration/migration.h
0a122b
+++ b/include/migration/migration.h
0a122b
@@ -77,6 +77,10 @@ void fd_start_incoming_migration(const char *path, Error **errp);
0a122b
 
0a122b
 void fd_start_outgoing_migration(MigrationState *s, const char *fdname, Error **errp);
0a122b
 
0a122b
+void rdma_start_outgoing_migration(void *opaque, const char *host_port, Error **errp);
0a122b
+
0a122b
+void rdma_start_incoming_migration(const char *host_port, Error **errp);
0a122b
+
0a122b
 void migrate_fd_error(MigrationState *s);
0a122b
 
0a122b
 void migrate_fd_connect(MigrationState *s);
0a122b
diff --git a/migration-rdma.c b/migration-rdma.c
0a122b
new file mode 100644
0a122b
index 0000000..d044830
0a122b
--- /dev/null
0a122b
+++ b/migration-rdma.c
0a122b
@@ -0,0 +1,3249 @@
0a122b
+/*
0a122b
+ * RDMA protocol and interfaces
0a122b
+ *
0a122b
+ * Copyright IBM, Corp. 2010-2013
0a122b
+ *
0a122b
+ * Authors:
0a122b
+ *  Michael R. Hines <mrhines@us.ibm.com>
0a122b
+ *  Jiuxing Liu <jl@us.ibm.com>
0a122b
+ *
0a122b
+ * This work is licensed under the terms of the GNU GPL, version 2 or
0a122b
+ * later.  See the COPYING file in the top-level directory.
0a122b
+ *
0a122b
+ */
0a122b
+#include "qemu-common.h"
0a122b
+#include "migration/migration.h"
0a122b
+#include "migration/qemu-file.h"
0a122b
+#include "exec/cpu-common.h"
0a122b
+#include "qemu/main-loop.h"
0a122b
+#include "qemu/sockets.h"
0a122b
+#include "qemu/bitmap.h"
0a122b
+#include "block/coroutine.h"
0a122b
+#include <stdio.h>
0a122b
+#include <sys/types.h>
0a122b
+#include <sys/socket.h>
0a122b
+#include <netdb.h>
0a122b
+#include <arpa/inet.h>
0a122b
+#include <string.h>
0a122b
+#include <rdma/rdma_cma.h>
0a122b
+
0a122b
+#define DEBUG_RDMA
0a122b
+//#define DEBUG_RDMA_VERBOSE
0a122b
+//#define DEBUG_RDMA_REALLY_VERBOSE
0a122b
+
0a122b
+#ifdef DEBUG_RDMA
0a122b
+#define DPRINTF(fmt, ...) \
0a122b
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
0a122b
+#else
0a122b
+#define DPRINTF(fmt, ...) \
0a122b
+    do { } while (0)
0a122b
+#endif
0a122b
+
0a122b
+#ifdef DEBUG_RDMA_VERBOSE
0a122b
+#define DDPRINTF(fmt, ...) \
0a122b
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
0a122b
+#else
0a122b
+#define DDPRINTF(fmt, ...) \
0a122b
+    do { } while (0)
0a122b
+#endif
0a122b
+
0a122b
+#ifdef DEBUG_RDMA_REALLY_VERBOSE
0a122b
+#define DDDPRINTF(fmt, ...) \
0a122b
+    do { printf("rdma: " fmt, ## __VA_ARGS__); } while (0)
0a122b
+#else
0a122b
+#define DDDPRINTF(fmt, ...) \
0a122b
+    do { } while (0)
0a122b
+#endif
0a122b
+
0a122b
+/*
0a122b
+ * Print an error on both the Monitor and the Log file.
0a122b
+ */
0a122b
+#define ERROR(errp, fmt, ...) \
0a122b
+    do { \
0a122b
+        fprintf(stderr, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
0a122b
+        if (errp && (*(errp) == NULL)) { \
0a122b
+            error_setg(errp, "RDMA ERROR: " fmt, ## __VA_ARGS__); \
0a122b
+        } \
0a122b
+    } while (0)
0a122b
+
0a122b
+#define RDMA_RESOLVE_TIMEOUT_MS 10000
0a122b
+
0a122b
+/* Do not merge data if larger than this. */
0a122b
+#define RDMA_MERGE_MAX (2 * 1024 * 1024)
0a122b
+#define RDMA_SIGNALED_SEND_MAX (RDMA_MERGE_MAX / 4096)
0a122b
+
0a122b
+#define RDMA_REG_CHUNK_SHIFT 20 /* 1 MB */
0a122b
+
0a122b
+/*
0a122b
+ * This is only for non-live state being migrated.
0a122b
+ * Instead of RDMA_WRITE messages, we use RDMA_SEND
0a122b
+ * messages for that state, which requires a different
0a122b
+ * delivery design than main memory.
0a122b
+ */
0a122b
+#define RDMA_SEND_INCREMENT 32768
0a122b
+
0a122b
+/*
0a122b
+ * Maximum size infiniband SEND message
0a122b
+ */
0a122b
+#define RDMA_CONTROL_MAX_BUFFER (512 * 1024)
0a122b
+#define RDMA_CONTROL_MAX_COMMANDS_PER_MESSAGE 4096
0a122b
+
0a122b
+#define RDMA_CONTROL_VERSION_CURRENT 1
0a122b
+/*
0a122b
+ * Capabilities for negotiation.
0a122b
+ */
0a122b
+#define RDMA_CAPABILITY_PIN_ALL 0x01
0a122b
+
0a122b
+/*
0a122b
+ * Add the other flags above to this list of known capabilities
0a122b
+ * as they are introduced.
0a122b
+ */
0a122b
+static uint32_t known_capabilities = RDMA_CAPABILITY_PIN_ALL;
0a122b
+
0a122b
+#define CHECK_ERROR_STATE() \
0a122b
+    do { \
0a122b
+        if (rdma->error_state) { \
0a122b
+            if (!rdma->error_reported) { \
0a122b
+                fprintf(stderr, "RDMA is in an error state waiting migration" \
0a122b
+                                " to abort!\n"); \
0a122b
+                rdma->error_reported = 1; \
0a122b
+            } \
0a122b
+            return rdma->error_state; \
0a122b
+        } \
0a122b
+    } while (0)
0a122b
+
0a122b
+/*
0a122b
+ * A work request ID is 64-bits and we split up these bits
0a122b
+ * into 3 parts:
0a122b
+ *
0a122b
+ * bits 0-15 : type of control message, 2^16
0a122b
+ * bits 16-29: ram block index, 2^14
0a122b
+ * bits 30-63: ram block chunk number, 2^34
0a122b
+ *
0a122b
+ * The last two bit ranges are only used for RDMA writes,
0a122b
+ * in order to track their completion and potentially
0a122b
+ * also track unregistration status of the message.
0a122b
+ */
0a122b
+#define RDMA_WRID_TYPE_SHIFT  0UL
0a122b
+#define RDMA_WRID_BLOCK_SHIFT 16UL
0a122b
+#define RDMA_WRID_CHUNK_SHIFT 30UL
0a122b
+
0a122b
+#define RDMA_WRID_TYPE_MASK \
0a122b
+    ((1UL << RDMA_WRID_BLOCK_SHIFT) - 1UL)
0a122b
+
0a122b
+#define RDMA_WRID_BLOCK_MASK \
0a122b
+    (~RDMA_WRID_TYPE_MASK & ((1UL << RDMA_WRID_CHUNK_SHIFT) - 1UL))
0a122b
+
0a122b
+#define RDMA_WRID_CHUNK_MASK (~RDMA_WRID_BLOCK_MASK & ~RDMA_WRID_TYPE_MASK)
0a122b
+
0a122b
+/*
0a122b
+ * RDMA migration protocol:
0a122b
+ * 1. RDMA Writes (data messages, i.e. RAM)
0a122b
+ * 2. IB Send/Recv (control channel messages)
0a122b
+ */
0a122b
+enum {
0a122b
+    RDMA_WRID_NONE = 0,
0a122b
+    RDMA_WRID_RDMA_WRITE = 1,
0a122b
+    RDMA_WRID_SEND_CONTROL = 2000,
0a122b
+    RDMA_WRID_RECV_CONTROL = 4000,
0a122b
+};
0a122b
+
0a122b
+const char *wrid_desc[] = {
0a122b
+    [RDMA_WRID_NONE] = "NONE",
0a122b
+    [RDMA_WRID_RDMA_WRITE] = "WRITE RDMA",
0a122b
+    [RDMA_WRID_SEND_CONTROL] = "CONTROL SEND",
0a122b
+    [RDMA_WRID_RECV_CONTROL] = "CONTROL RECV",
0a122b
+};
0a122b
+
0a122b
+/*
0a122b
+ * Work request IDs for IB SEND messages only (not RDMA writes).
0a122b
+ * This is used by the migration protocol to transmit
0a122b
+ * control messages (such as device state and registration commands)
0a122b
+ *
0a122b
+ * We could use more WRs, but we have enough for now.
0a122b
+ */
0a122b
+enum {
0a122b
+    RDMA_WRID_READY = 0,
0a122b
+    RDMA_WRID_DATA,
0a122b
+    RDMA_WRID_CONTROL,
0a122b
+    RDMA_WRID_MAX,
0a122b
+};
0a122b
+
0a122b
+/*
0a122b
+ * SEND/RECV IB Control Messages.
0a122b
+ */
0a122b
+enum {
0a122b
+    RDMA_CONTROL_NONE = 0,
0a122b
+    RDMA_CONTROL_ERROR,
0a122b
+    RDMA_CONTROL_READY,               /* ready to receive */
0a122b
+    RDMA_CONTROL_QEMU_FILE,           /* QEMUFile-transmitted bytes */
0a122b
+    RDMA_CONTROL_RAM_BLOCKS_REQUEST,  /* RAMBlock synchronization */
0a122b
+    RDMA_CONTROL_RAM_BLOCKS_RESULT,   /* RAMBlock synchronization */
0a122b
+    RDMA_CONTROL_COMPRESS,            /* page contains repeat values */
0a122b
+    RDMA_CONTROL_REGISTER_REQUEST,    /* dynamic page registration */
0a122b
+    RDMA_CONTROL_REGISTER_RESULT,     /* key to use after registration */
0a122b
+    RDMA_CONTROL_REGISTER_FINISHED,   /* current iteration finished */
0a122b
+    RDMA_CONTROL_UNREGISTER_REQUEST,  /* dynamic UN-registration */
0a122b
+    RDMA_CONTROL_UNREGISTER_FINISHED, /* unpinning finished */
0a122b
+};
0a122b
+
0a122b
+const char *control_desc[] = {
0a122b
+    [RDMA_CONTROL_NONE] = "NONE",
0a122b
+    [RDMA_CONTROL_ERROR] = "ERROR",
0a122b
+    [RDMA_CONTROL_READY] = "READY",
0a122b
+    [RDMA_CONTROL_QEMU_FILE] = "QEMU FILE",
0a122b
+    [RDMA_CONTROL_RAM_BLOCKS_REQUEST] = "RAM BLOCKS REQUEST",
0a122b
+    [RDMA_CONTROL_RAM_BLOCKS_RESULT] = "RAM BLOCKS RESULT",
0a122b
+    [RDMA_CONTROL_COMPRESS] = "COMPRESS",
0a122b
+    [RDMA_CONTROL_REGISTER_REQUEST] = "REGISTER REQUEST",
0a122b
+    [RDMA_CONTROL_REGISTER_RESULT] = "REGISTER RESULT",
0a122b
+    [RDMA_CONTROL_REGISTER_FINISHED] = "REGISTER FINISHED",
0a122b
+    [RDMA_CONTROL_UNREGISTER_REQUEST] = "UNREGISTER REQUEST",
0a122b
+    [RDMA_CONTROL_UNREGISTER_FINISHED] = "UNREGISTER FINISHED",
0a122b
+};
0a122b
+
0a122b
+/*
0a122b
+ * Memory and MR structures used to represent an IB Send/Recv work request.
0a122b
+ * This is *not* used for RDMA writes, only IB Send/Recv.
0a122b
+ */
0a122b
+typedef struct {
0a122b
+    uint8_t  control[RDMA_CONTROL_MAX_BUFFER]; /* actual buffer to register */
0a122b
+    struct   ibv_mr *control_mr;               /* registration metadata */
0a122b
+    size_t   control_len;                      /* length of the message */
0a122b
+    uint8_t *control_curr;                     /* start of unconsumed bytes */
0a122b
+} RDMAWorkRequestData;
0a122b
+
0a122b
+/*
0a122b
+ * Negotiate RDMA capabilities during connection-setup time.
0a122b
+ */
0a122b
+typedef struct {
0a122b
+    uint32_t version;
0a122b
+    uint32_t flags;
0a122b
+} RDMACapabilities;
0a122b
+
0a122b
+static void caps_to_network(RDMACapabilities *cap)
0a122b
+{
0a122b
+    cap->version = htonl(cap->version);
0a122b
+    cap->flags = htonl(cap->flags);
0a122b
+}
0a122b
+
0a122b
+static void network_to_caps(RDMACapabilities *cap)
0a122b
+{
0a122b
+    cap->version = ntohl(cap->version);
0a122b
+    cap->flags = ntohl(cap->flags);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Representation of a RAMBlock from an RDMA perspective.
0a122b
+ * This is not transmitted, only local.
0a122b
+ * This and subsequent structures cannot be linked lists
0a122b
+ * because we're using a single IB message to transmit
0a122b
+ * the information. It's small anyway, so a list is overkill.
0a122b
+ */
0a122b
+typedef struct RDMALocalBlock {
0a122b
+    uint8_t  *local_host_addr; /* local virtual address */
0a122b
+    uint64_t remote_host_addr; /* remote virtual address */
0a122b
+    uint64_t offset;
0a122b
+    uint64_t length;
0a122b
+    struct   ibv_mr **pmr;     /* MRs for chunk-level registration */
0a122b
+    struct   ibv_mr *mr;       /* MR for non-chunk-level registration */
0a122b
+    uint32_t *remote_keys;     /* rkeys for chunk-level registration */
0a122b
+    uint32_t remote_rkey;      /* rkeys for non-chunk-level registration */
0a122b
+    int      index;            /* which block are we */
0a122b
+    bool     is_ram_block;
0a122b
+    int      nb_chunks;
0a122b
+    unsigned long *transit_bitmap;
0a122b
+    unsigned long *unregister_bitmap;
0a122b
+} RDMALocalBlock;
0a122b
+
0a122b
+/*
0a122b
+ * Also represents a RAMblock, but only on the dest.
0a122b
+ * This gets transmitted by the dest during connection-time
0a122b
+ * to the source VM and then is used to populate the
0a122b
+ * corresponding RDMALocalBlock with
0a122b
+ * the information needed to perform the actual RDMA.
0a122b
+ */
0a122b
+typedef struct QEMU_PACKED RDMARemoteBlock {
0a122b
+    uint64_t remote_host_addr;
0a122b
+    uint64_t offset;
0a122b
+    uint64_t length;
0a122b
+    uint32_t remote_rkey;
0a122b
+    uint32_t padding;
0a122b
+} RDMARemoteBlock;
0a122b
+
0a122b
+static uint64_t htonll(uint64_t v)
0a122b
+{
0a122b
+    union { uint32_t lv[2]; uint64_t llv; } u;
0a122b
+    u.lv[0] = htonl(v >> 32);
0a122b
+    u.lv[1] = htonl(v & 0xFFFFFFFFULL);
0a122b
+    return u.llv;
0a122b
+}
0a122b
+
0a122b
+static uint64_t ntohll(uint64_t v) {
0a122b
+    union { uint32_t lv[2]; uint64_t llv; } u;
0a122b
+    u.llv = v;
0a122b
+    return ((uint64_t)ntohl(u.lv[0]) << 32) | (uint64_t) ntohl(u.lv[1]);
0a122b
+}
0a122b
+
0a122b
+static void remote_block_to_network(RDMARemoteBlock *rb)
0a122b
+{
0a122b
+    rb->remote_host_addr = htonll(rb->remote_host_addr);
0a122b
+    rb->offset = htonll(rb->offset);
0a122b
+    rb->length = htonll(rb->length);
0a122b
+    rb->remote_rkey = htonl(rb->remote_rkey);
0a122b
+}
0a122b
+
0a122b
+static void network_to_remote_block(RDMARemoteBlock *rb)
0a122b
+{
0a122b
+    rb->remote_host_addr = ntohll(rb->remote_host_addr);
0a122b
+    rb->offset = ntohll(rb->offset);
0a122b
+    rb->length = ntohll(rb->length);
0a122b
+    rb->remote_rkey = ntohl(rb->remote_rkey);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Virtual address of the above structures used for transmitting
0a122b
+ * the RAMBlock descriptions at connection-time.
0a122b
+ * This structure is *not* transmitted.
0a122b
+ */
0a122b
+typedef struct RDMALocalBlocks {
0a122b
+    int nb_blocks;
0a122b
+    bool     init;             /* main memory init complete */
0a122b
+    RDMALocalBlock *block;
0a122b
+} RDMALocalBlocks;
0a122b
+
0a122b
+/*
0a122b
+ * Main data structure for RDMA state.
0a122b
+ * While there is only one copy of this structure being allocated right now,
0a122b
+ * this is the place where one would start if you wanted to consider
0a122b
+ * having more than one RDMA connection open at the same time.
0a122b
+ */
0a122b
+typedef struct RDMAContext {
0a122b
+    char *host;
0a122b
+    int port;
0a122b
+
0a122b
+    RDMAWorkRequestData wr_data[RDMA_WRID_MAX + 1];
0a122b
+
0a122b
+    /*
0a122b
+     * This is used by *_exchange_send() to figure out whether or not
0a122b
+     * the initial "READY" message has already been received or not.
0a122b
+     * This is because other functions may potentially poll() and detect
0a122b
+     * the READY message before send() does, in which case we need to
0a122b
+     * know if it completed.
0a122b
+     */
0a122b
+    int control_ready_expected;
0a122b
+
0a122b
+    /* number of outstanding writes */
0a122b
+    int nb_sent;
0a122b
+
0a122b
+    /* store info about current buffer so that we can
0a122b
+       merge it with future sends */
0a122b
+    uint64_t current_addr;
0a122b
+    uint64_t current_length;
0a122b
+    /* index of ram block the current buffer belongs to */
0a122b
+    int current_index;
0a122b
+    /* index of the chunk in the current ram block */
0a122b
+    int current_chunk;
0a122b
+
0a122b
+    bool pin_all;
0a122b
+
0a122b
+    /*
0a122b
+     * infiniband-specific variables for opening the device
0a122b
+     * and maintaining connection state and so forth.
0a122b
+     *
0a122b
+     * cm_id also has ibv_context, rdma_event_channel, and ibv_qp in
0a122b
+     * cm_id->verbs, cm_id->channel, and cm_id->qp.
0a122b
+     */
0a122b
+    struct rdma_cm_id *cm_id;               /* connection manager ID */
0a122b
+    struct rdma_cm_id *listen_id;
0a122b
+
0a122b
+    struct ibv_context          *verbs;
0a122b
+    struct rdma_event_channel   *channel;
0a122b
+    struct ibv_qp *qp;                      /* queue pair */
0a122b
+    struct ibv_comp_channel *comp_channel;  /* completion channel */
0a122b
+    struct ibv_pd *pd;                      /* protection domain */
0a122b
+    struct ibv_cq *cq;                      /* completion queue */
0a122b
+
0a122b
+    /*
0a122b
+     * If a previous write failed (perhaps because of a failed
0a122b
+     * memory registration), then do not attempt any future work
0a122b
+     * and remember the error state.
0a122b
+     */
0a122b
+    int error_state;
0a122b
+    int error_reported;
0a122b
+
0a122b
+    /*
0a122b
+     * Description of ram blocks used throughout the code.
0a122b
+     */
0a122b
+    RDMALocalBlocks local_ram_blocks;
0a122b
+    RDMARemoteBlock *block;
0a122b
+
0a122b
+    /*
0a122b
+     * Migration on *destination* started.
0a122b
+     * Then use coroutine yield function.
0a122b
+     * Source runs in a thread, so we don't care.
0a122b
+     */
0a122b
+    int migration_started_on_destination;
0a122b
+
0a122b
+    int total_registrations;
0a122b
+    int total_writes;
0a122b
+
0a122b
+    int unregister_current, unregister_next;
0a122b
+    uint64_t unregistrations[RDMA_SIGNALED_SEND_MAX];
0a122b
+
0a122b
+    GHashTable *blockmap;
0a122b
+} RDMAContext;
0a122b
+
0a122b
+/*
0a122b
+ * Interface to the rest of the migration call stack.
0a122b
+ */
0a122b
+typedef struct QEMUFileRDMA {
0a122b
+    RDMAContext *rdma;
0a122b
+    size_t len;
0a122b
+    void *file;
0a122b
+} QEMUFileRDMA;
0a122b
+
0a122b
+/*
0a122b
+ * Main structure for IB Send/Recv control messages.
0a122b
+ * This gets prepended at the beginning of every Send/Recv.
0a122b
+ */
0a122b
+typedef struct QEMU_PACKED {
0a122b
+    uint32_t len;     /* Total length of data portion */
0a122b
+    uint32_t type;    /* which control command to perform */
0a122b
+    uint32_t repeat;  /* number of commands in data portion of same type */
0a122b
+    uint32_t padding;
0a122b
+} RDMAControlHeader;
0a122b
+
0a122b
+static void control_to_network(RDMAControlHeader *control)
0a122b
+{
0a122b
+    control->type = htonl(control->type);
0a122b
+    control->len = htonl(control->len);
0a122b
+    control->repeat = htonl(control->repeat);
0a122b
+}
0a122b
+
0a122b
+static void network_to_control(RDMAControlHeader *control)
0a122b
+{
0a122b
+    control->type = ntohl(control->type);
0a122b
+    control->len = ntohl(control->len);
0a122b
+    control->repeat = ntohl(control->repeat);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Register a single Chunk.
0a122b
+ * Information sent by the source VM to inform the dest
0a122b
+ * to register a single chunk of memory before we can perform
0a122b
+ * the actual RDMA operation.
0a122b
+ */
0a122b
+typedef struct QEMU_PACKED {
0a122b
+    union QEMU_PACKED {
0a122b
+        uint64_t current_addr;  /* offset into the ramblock of the chunk */
0a122b
+        uint64_t chunk;         /* chunk to lookup if unregistering */
0a122b
+    } key;
0a122b
+    uint32_t current_index; /* which ramblock the chunk belongs to */
0a122b
+    uint32_t padding;
0a122b
+    uint64_t chunks;            /* how many sequential chunks to register */
0a122b
+} RDMARegister;
0a122b
+
0a122b
+static void register_to_network(RDMARegister *reg)
0a122b
+{
0a122b
+    reg->key.current_addr = htonll(reg->key.current_addr);
0a122b
+    reg->current_index = htonl(reg->current_index);
0a122b
+    reg->chunks = htonll(reg->chunks);
0a122b
+}
0a122b
+
0a122b
+static void network_to_register(RDMARegister *reg)
0a122b
+{
0a122b
+    reg->key.current_addr = ntohll(reg->key.current_addr);
0a122b
+    reg->current_index = ntohl(reg->current_index);
0a122b
+    reg->chunks = ntohll(reg->chunks);
0a122b
+}
0a122b
+
0a122b
+typedef struct QEMU_PACKED {
0a122b
+    uint32_t value;     /* if zero, we will madvise() */
0a122b
+    uint32_t block_idx; /* which ram block index */
0a122b
+    uint64_t offset;    /* where in the remote ramblock this chunk */
0a122b
+    uint64_t length;    /* length of the chunk */
0a122b
+} RDMACompress;
0a122b
+
0a122b
+static void compress_to_network(RDMACompress *comp)
0a122b
+{
0a122b
+    comp->value = htonl(comp->value);
0a122b
+    comp->block_idx = htonl(comp->block_idx);
0a122b
+    comp->offset = htonll(comp->offset);
0a122b
+    comp->length = htonll(comp->length);
0a122b
+}
0a122b
+
0a122b
+static void network_to_compress(RDMACompress *comp)
0a122b
+{
0a122b
+    comp->value = ntohl(comp->value);
0a122b
+    comp->block_idx = ntohl(comp->block_idx);
0a122b
+    comp->offset = ntohll(comp->offset);
0a122b
+    comp->length = ntohll(comp->length);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * The result of the dest's memory registration produces an "rkey"
0a122b
+ * which the source VM must reference in order to perform
0a122b
+ * the RDMA operation.
0a122b
+ */
0a122b
+typedef struct QEMU_PACKED {
0a122b
+    uint32_t rkey;
0a122b
+    uint32_t padding;
0a122b
+    uint64_t host_addr;
0a122b
+} RDMARegisterResult;
0a122b
+
0a122b
+static void result_to_network(RDMARegisterResult *result)
0a122b
+{
0a122b
+    result->rkey = htonl(result->rkey);
0a122b
+    result->host_addr = htonll(result->host_addr);
0a122b
+}
0a122b
+
0a122b
+static void network_to_result(RDMARegisterResult *result)
0a122b
+{
0a122b
+    result->rkey = ntohl(result->rkey);
0a122b
+    result->host_addr = ntohll(result->host_addr);
0a122b
+}
0a122b
+
0a122b
+const char *print_wrid(int wrid);
0a122b
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
0a122b
+                                   uint8_t *data, RDMAControlHeader *resp,
0a122b
+                                   int *resp_idx,
0a122b
+                                   int (*callback)(RDMAContext *rdma));
0a122b
+
0a122b
+static inline uint64_t ram_chunk_index(uint8_t *start, uint8_t *host)
0a122b
+{
0a122b
+    return ((uintptr_t) host - (uintptr_t) start) >> RDMA_REG_CHUNK_SHIFT;
0a122b
+}
0a122b
+
0a122b
+static inline uint8_t *ram_chunk_start(RDMALocalBlock *rdma_ram_block,
0a122b
+                                       uint64_t i)
0a122b
+{
0a122b
+    return (uint8_t *) (((uintptr_t) rdma_ram_block->local_host_addr)
0a122b
+                                    + (i << RDMA_REG_CHUNK_SHIFT));
0a122b
+}
0a122b
+
0a122b
+static inline uint8_t *ram_chunk_end(RDMALocalBlock *rdma_ram_block, uint64_t i)
0a122b
+{
0a122b
+    uint8_t *result = ram_chunk_start(rdma_ram_block, i) +
0a122b
+                                         (1UL << RDMA_REG_CHUNK_SHIFT);
0a122b
+
0a122b
+    if (result > (rdma_ram_block->local_host_addr + rdma_ram_block->length)) {
0a122b
+        result = rdma_ram_block->local_host_addr + rdma_ram_block->length;
0a122b
+    }
0a122b
+
0a122b
+    return result;
0a122b
+}
0a122b
+
0a122b
+static int __qemu_rdma_add_block(RDMAContext *rdma, void *host_addr,
0a122b
+                         ram_addr_t block_offset, uint64_t length)
0a122b
+{
0a122b
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
0a122b
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
0a122b
+        (void *) block_offset);
0a122b
+    RDMALocalBlock *old = local->block;
0a122b
+
0a122b
+    assert(block == NULL);
0a122b
+
0a122b
+    local->block = g_malloc0(sizeof(RDMALocalBlock) * (local->nb_blocks + 1));
0a122b
+
0a122b
+    if (local->nb_blocks) {
0a122b
+        int x;
0a122b
+
0a122b
+        for (x = 0; x < local->nb_blocks; x++) {
0a122b
+            g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
0a122b
+            g_hash_table_insert(rdma->blockmap, (void *)old[x].offset,
0a122b
+                                                &local->block[x]);
0a122b
+        }
0a122b
+        memcpy(local->block, old, sizeof(RDMALocalBlock) * local->nb_blocks);
0a122b
+        g_free(old);
0a122b
+    }
0a122b
+
0a122b
+    block = &local->block[local->nb_blocks];
0a122b
+
0a122b
+    block->local_host_addr = host_addr;
0a122b
+    block->offset = block_offset;
0a122b
+    block->length = length;
0a122b
+    block->index = local->nb_blocks;
0a122b
+    block->nb_chunks = ram_chunk_index(host_addr, host_addr + length) + 1UL;
0a122b
+    block->transit_bitmap = bitmap_new(block->nb_chunks);
0a122b
+    bitmap_clear(block->transit_bitmap, 0, block->nb_chunks);
0a122b
+    block->unregister_bitmap = bitmap_new(block->nb_chunks);
0a122b
+    bitmap_clear(block->unregister_bitmap, 0, block->nb_chunks);
0a122b
+    block->remote_keys = g_malloc0(block->nb_chunks * sizeof(uint32_t));
0a122b
+
0a122b
+    block->is_ram_block = local->init ? false : true;
0a122b
+
0a122b
+    g_hash_table_insert(rdma->blockmap, (void *) block_offset, block);
0a122b
+
0a122b
+    DDPRINTF("Added Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
0a122b
+           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
0a122b
+            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
0a122b
+            block->length, (uint64_t) (block->local_host_addr + block->length),
0a122b
+                BITS_TO_LONGS(block->nb_chunks) *
0a122b
+                    sizeof(unsigned long) * 8, block->nb_chunks);
0a122b
+
0a122b
+    local->nb_blocks++;
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Memory regions need to be registered with the device and queue pairs setup
0a122b
+ * in advance before the migration starts. This tells us where the RAM blocks
0a122b
+ * are so that we can register them individually.
0a122b
+ */
0a122b
+static void qemu_rdma_init_one_block(void *host_addr,
0a122b
+    ram_addr_t block_offset, ram_addr_t length, void *opaque)
0a122b
+{
0a122b
+    __qemu_rdma_add_block(opaque, host_addr, block_offset, length);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Identify the RAMBlocks and their quantity. They will be referenced to
0a122b
+ * identify chunk boundaries inside each RAMBlock and also be referenced
0a122b
+ * during dynamic page registration.
0a122b
+ */
0a122b
+static int qemu_rdma_init_ram_blocks(RDMAContext *rdma)
0a122b
+{
0a122b
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
0a122b
+
0a122b
+    assert(rdma->blockmap == NULL);
0a122b
+    rdma->blockmap = g_hash_table_new(g_direct_hash, g_direct_equal);
0a122b
+    memset(local, 0, sizeof *local);
0a122b
+    qemu_ram_foreach_block(qemu_rdma_init_one_block, rdma);
0a122b
+    DPRINTF("Allocated %d local ram block structures\n", local->nb_blocks);
0a122b
+    rdma->block = (RDMARemoteBlock *) g_malloc0(sizeof(RDMARemoteBlock) *
0a122b
+                        rdma->local_ram_blocks.nb_blocks);
0a122b
+    local->init = true;
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+static int __qemu_rdma_delete_block(RDMAContext *rdma, ram_addr_t block_offset)
0a122b
+{
0a122b
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
0a122b
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
0a122b
+        (void *) block_offset);
0a122b
+    RDMALocalBlock *old = local->block;
0a122b
+    int x;
0a122b
+
0a122b
+    assert(block);
0a122b
+
0a122b
+    if (block->pmr) {
0a122b
+        int j;
0a122b
+
0a122b
+        for (j = 0; j < block->nb_chunks; j++) {
0a122b
+            if (!block->pmr[j]) {
0a122b
+                continue;
0a122b
+            }
0a122b
+            ibv_dereg_mr(block->pmr[j]);
0a122b
+            rdma->total_registrations--;
0a122b
+        }
0a122b
+        g_free(block->pmr);
0a122b
+        block->pmr = NULL;
0a122b
+    }
0a122b
+
0a122b
+    if (block->mr) {
0a122b
+        ibv_dereg_mr(block->mr);
0a122b
+        rdma->total_registrations--;
0a122b
+        block->mr = NULL;
0a122b
+    }
0a122b
+
0a122b
+    g_free(block->transit_bitmap);
0a122b
+    block->transit_bitmap = NULL;
0a122b
+
0a122b
+    g_free(block->unregister_bitmap);
0a122b
+    block->unregister_bitmap = NULL;
0a122b
+
0a122b
+    g_free(block->remote_keys);
0a122b
+    block->remote_keys = NULL;
0a122b
+
0a122b
+    for (x = 0; x < local->nb_blocks; x++) {
0a122b
+        g_hash_table_remove(rdma->blockmap, (void *)old[x].offset);
0a122b
+    }
0a122b
+
0a122b
+    if (local->nb_blocks > 1) {
0a122b
+
0a122b
+        local->block = g_malloc0(sizeof(RDMALocalBlock) *
0a122b
+                                    (local->nb_blocks - 1));
0a122b
+
0a122b
+        if (block->index) {
0a122b
+            memcpy(local->block, old, sizeof(RDMALocalBlock) * block->index);
0a122b
+        }
0a122b
+
0a122b
+        if (block->index < (local->nb_blocks - 1)) {
0a122b
+            memcpy(local->block + block->index, old + (block->index + 1),
0a122b
+                sizeof(RDMALocalBlock) *
0a122b
+                    (local->nb_blocks - (block->index + 1)));
0a122b
+        }
0a122b
+    } else {
0a122b
+        assert(block == local->block);
0a122b
+        local->block = NULL;
0a122b
+    }
0a122b
+
0a122b
+    DDPRINTF("Deleted Block: %d, addr: %" PRIu64 ", offset: %" PRIu64
0a122b
+           " length: %" PRIu64 " end: %" PRIu64 " bits %" PRIu64 " chunks %d\n",
0a122b
+            local->nb_blocks, (uint64_t) block->local_host_addr, block->offset,
0a122b
+            block->length, (uint64_t) (block->local_host_addr + block->length),
0a122b
+                BITS_TO_LONGS(block->nb_chunks) *
0a122b
+                    sizeof(unsigned long) * 8, block->nb_chunks);
0a122b
+
0a122b
+    g_free(old);
0a122b
+
0a122b
+    local->nb_blocks--;
0a122b
+
0a122b
+    if (local->nb_blocks) {
0a122b
+        for (x = 0; x < local->nb_blocks; x++) {
0a122b
+            g_hash_table_insert(rdma->blockmap, (void *)local->block[x].offset,
0a122b
+                                                &local->block[x]);
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Put in the log file which RDMA device was opened and the details
0a122b
+ * associated with that device.
0a122b
+ */
0a122b
+static void qemu_rdma_dump_id(const char *who, struct ibv_context *verbs)
0a122b
+{
0a122b
+    printf("%s RDMA Device opened: kernel name %s "
0a122b
+           "uverbs device name %s, "
0a122b
+           "infiniband_verbs class device path %s,"
0a122b
+           " infiniband class device path %s\n",
0a122b
+                who,
0a122b
+                verbs->device->name,
0a122b
+                verbs->device->dev_name,
0a122b
+                verbs->device->dev_path,
0a122b
+                verbs->device->ibdev_path);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Put in the log file the RDMA gid addressing information,
0a122b
+ * useful for folks who have trouble understanding the
0a122b
+ * RDMA device hierarchy in the kernel.
0a122b
+ */
0a122b
+static void qemu_rdma_dump_gid(const char *who, struct rdma_cm_id *id)
0a122b
+{
0a122b
+    char sgid[33];
0a122b
+    char dgid[33];
0a122b
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.sgid, sgid, sizeof sgid);
0a122b
+    inet_ntop(AF_INET6, &id->route.addr.addr.ibaddr.dgid, dgid, sizeof dgid);
0a122b
+    DPRINTF("%s Source GID: %s, Dest GID: %s\n", who, sgid, dgid);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Figure out which RDMA device corresponds to the requested IP hostname
0a122b
+ * Also create the initial connection manager identifiers for opening
0a122b
+ * the connection.
0a122b
+ */
0a122b
+static int qemu_rdma_resolve_host(RDMAContext *rdma, Error **errp)
0a122b
+{
0a122b
+    int ret;
0a122b
+    struct addrinfo *res;
0a122b
+    char port_str[16];
0a122b
+    struct rdma_cm_event *cm_event;
0a122b
+    char ip[40] = "unknown";
0a122b
+
0a122b
+    if (rdma->host == NULL || !strcmp(rdma->host, "")) {
0a122b
+        ERROR(errp, "RDMA hostname has not been set\n");
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    /* create CM channel */
0a122b
+    rdma->channel = rdma_create_event_channel();
0a122b
+    if (!rdma->channel) {
0a122b
+        ERROR(errp, "could not create CM channel\n");
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    /* create CM id */
0a122b
+    ret = rdma_create_id(rdma->channel, &rdma->cm_id, NULL, RDMA_PS_TCP);
0a122b
+    if (ret) {
0a122b
+        ERROR(errp, "could not create channel id\n");
0a122b
+        goto err_resolve_create_id;
0a122b
+    }
0a122b
+
0a122b
+    snprintf(port_str, 16, "%d", rdma->port);
0a122b
+    port_str[15] = '\0';
0a122b
+
0a122b
+    ret = getaddrinfo(rdma->host, port_str, NULL, &res);
0a122b
+    if (ret < 0) {
0a122b
+        ERROR(errp, "could not getaddrinfo address %s\n", rdma->host);
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+
0a122b
+    inet_ntop(AF_INET, &((struct sockaddr_in *) res->ai_addr)->sin_addr,
0a122b
+                                ip, sizeof ip);
0a122b
+    DPRINTF("%s => %s\n", rdma->host, ip);
0a122b
+
0a122b
+    /* resolve the first address */
0a122b
+    ret = rdma_resolve_addr(rdma->cm_id, NULL, res->ai_addr,
0a122b
+            RDMA_RESOLVE_TIMEOUT_MS);
0a122b
+    if (ret) {
0a122b
+        ERROR(errp, "could not resolve address %s\n", rdma->host);
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+
0a122b
+    qemu_rdma_dump_gid("source_resolve_addr", rdma->cm_id);
0a122b
+
0a122b
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
0a122b
+    if (ret) {
0a122b
+        ERROR(errp, "could not perform event_addr_resolved\n");
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+
0a122b
+    if (cm_event->event != RDMA_CM_EVENT_ADDR_RESOLVED) {
0a122b
+        ERROR(errp, "result not equal to event_addr_resolved %s\n",
0a122b
+                rdma_event_str(cm_event->event));
0a122b
+        perror("rdma_resolve_addr");
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+    rdma_ack_cm_event(cm_event);
0a122b
+
0a122b
+    /* resolve route */
0a122b
+    ret = rdma_resolve_route(rdma->cm_id, RDMA_RESOLVE_TIMEOUT_MS);
0a122b
+    if (ret) {
0a122b
+        ERROR(errp, "could not resolve rdma route\n");
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+
0a122b
+    ret = rdma_get_cm_event(rdma->channel, &cm_event);
0a122b
+    if (ret) {
0a122b
+        ERROR(errp, "could not perform event_route_resolved\n");
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+    if (cm_event->event != RDMA_CM_EVENT_ROUTE_RESOLVED) {
0a122b
+        ERROR(errp, "result not equal to event_route_resolved: %s\n",
0a122b
+                        rdma_event_str(cm_event->event));
0a122b
+        rdma_ack_cm_event(cm_event);
0a122b
+        goto err_resolve_get_addr;
0a122b
+    }
0a122b
+    rdma_ack_cm_event(cm_event);
0a122b
+    rdma->verbs = rdma->cm_id->verbs;
0a122b
+    qemu_rdma_dump_id("source_resolve_host", rdma->cm_id->verbs);
0a122b
+    qemu_rdma_dump_gid("source_resolve_host", rdma->cm_id);
0a122b
+    return 0;
0a122b
+
0a122b
+err_resolve_get_addr:
0a122b
+    rdma_destroy_id(rdma->cm_id);
0a122b
+    rdma->cm_id = NULL;
0a122b
+err_resolve_create_id:
0a122b
+    rdma_destroy_event_channel(rdma->channel);
0a122b
+    rdma->channel = NULL;
0a122b
+
0a122b
+    return -1;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Create protection domain and completion queues
0a122b
+ */
0a122b
+static int qemu_rdma_alloc_pd_cq(RDMAContext *rdma)
0a122b
+{
0a122b
+    /* allocate pd */
0a122b
+    rdma->pd = ibv_alloc_pd(rdma->verbs);
0a122b
+    if (!rdma->pd) {
0a122b
+        fprintf(stderr, "failed to allocate protection domain\n");
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    /* create completion channel */
0a122b
+    rdma->comp_channel = ibv_create_comp_channel(rdma->verbs);
0a122b
+    if (!rdma->comp_channel) {
0a122b
+        fprintf(stderr, "failed to allocate completion channel\n");
0a122b
+        goto err_alloc_pd_cq;
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * Completion queue can be filled by both read and write work requests,
0a122b
+     * so must reflect the sum of both possible queue sizes.
0a122b
+     */
0a122b
+    rdma->cq = ibv_create_cq(rdma->verbs, (RDMA_SIGNALED_SEND_MAX * 3),
0a122b
+            NULL, rdma->comp_channel, 0);
0a122b
+    if (!rdma->cq) {
0a122b
+        fprintf(stderr, "failed to allocate completion queue\n");
0a122b
+        goto err_alloc_pd_cq;
0a122b
+    }
0a122b
+
0a122b
+    return 0;
0a122b
+
0a122b
+err_alloc_pd_cq:
0a122b
+    if (rdma->pd) {
0a122b
+        ibv_dealloc_pd(rdma->pd);
0a122b
+    }
0a122b
+    if (rdma->comp_channel) {
0a122b
+        ibv_destroy_comp_channel(rdma->comp_channel);
0a122b
+    }
0a122b
+    rdma->pd = NULL;
0a122b
+    rdma->comp_channel = NULL;
0a122b
+    return -1;
0a122b
+
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Create queue pairs.
0a122b
+ */
0a122b
+static int qemu_rdma_alloc_qp(RDMAContext *rdma)
0a122b
+{
0a122b
+    struct ibv_qp_init_attr attr = { 0 };
0a122b
+    int ret;
0a122b
+
0a122b
+    attr.cap.max_send_wr = RDMA_SIGNALED_SEND_MAX;
0a122b
+    attr.cap.max_recv_wr = 3;
0a122b
+    attr.cap.max_send_sge = 1;
0a122b
+    attr.cap.max_recv_sge = 1;
0a122b
+    attr.send_cq = rdma->cq;
0a122b
+    attr.recv_cq = rdma->cq;
0a122b
+    attr.qp_type = IBV_QPT_RC;
0a122b
+
0a122b
+    ret = rdma_create_qp(rdma->cm_id, rdma->pd, &attr);
0a122b
+    if (ret) {
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    rdma->qp = rdma->cm_id->qp;
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+static int qemu_rdma_reg_whole_ram_blocks(RDMAContext *rdma)
0a122b
+{
0a122b
+    int i;
0a122b
+    RDMALocalBlocks *local = &rdma->local_ram_blocks;
0a122b
+
0a122b
+    for (i = 0; i < local->nb_blocks; i++) {
0a122b
+        local->block[i].mr =
0a122b
+            ibv_reg_mr(rdma->pd,
0a122b
+                    local->block[i].local_host_addr,
0a122b
+                    local->block[i].length,
0a122b
+                    IBV_ACCESS_LOCAL_WRITE |
0a122b
+                    IBV_ACCESS_REMOTE_WRITE
0a122b
+                    );
0a122b
+        if (!local->block[i].mr) {
0a122b
+            perror("Failed to register local dest ram block!\n");
0a122b
+            break;
0a122b
+        }
0a122b
+        rdma->total_registrations++;
0a122b
+    }
0a122b
+
0a122b
+    if (i >= local->nb_blocks) {
0a122b
+        return 0;
0a122b
+    }
0a122b
+
0a122b
+    for (i--; i >= 0; i--) {
0a122b
+        ibv_dereg_mr(local->block[i].mr);
0a122b
+        rdma->total_registrations--;
0a122b
+    }
0a122b
+
0a122b
+    return -1;
0a122b
+
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Find the ram block that corresponds to the page requested to be
0a122b
+ * transmitted by QEMU.
0a122b
+ *
0a122b
+ * Once the block is found, also identify which 'chunk' within that
0a122b
+ * block that the page belongs to.
0a122b
+ *
0a122b
+ * This search cannot fail or the migration will fail.
0a122b
+ */
0a122b
+static int qemu_rdma_search_ram_block(RDMAContext *rdma,
0a122b
+                                      uint64_t block_offset,
0a122b
+                                      uint64_t offset,
0a122b
+                                      uint64_t length,
0a122b
+                                      uint64_t *block_index,
0a122b
+                                      uint64_t *chunk_index)
0a122b
+{
0a122b
+    uint64_t current_addr = block_offset + offset;
0a122b
+    RDMALocalBlock *block = g_hash_table_lookup(rdma->blockmap,
0a122b
+                                                (void *) block_offset);
0a122b
+    assert(block);
0a122b
+    assert(current_addr >= block->offset);
0a122b
+    assert((current_addr + length) <= (block->offset + block->length));
0a122b
+
0a122b
+    *block_index = block->index;
0a122b
+    *chunk_index = ram_chunk_index(block->local_host_addr,
0a122b
+                block->local_host_addr + (current_addr - block->offset));
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Register a chunk with IB. If the chunk was already registered
0a122b
+ * previously, then skip.
0a122b
+ *
0a122b
+ * Also return the keys associated with the registration needed
0a122b
+ * to perform the actual RDMA operation.
0a122b
+ */
0a122b
+static int qemu_rdma_register_and_get_keys(RDMAContext *rdma,
0a122b
+        RDMALocalBlock *block, uint8_t *host_addr,
0a122b
+        uint32_t *lkey, uint32_t *rkey, int chunk,
0a122b
+        uint8_t *chunk_start, uint8_t *chunk_end)
0a122b
+{
0a122b
+    if (block->mr) {
0a122b
+        if (lkey) {
0a122b
+            *lkey = block->mr->lkey;
0a122b
+        }
0a122b
+        if (rkey) {
0a122b
+            *rkey = block->mr->rkey;
0a122b
+        }
0a122b
+        return 0;
0a122b
+    }
0a122b
+
0a122b
+    /* allocate memory to store chunk MRs */
0a122b
+    if (!block->pmr) {
0a122b
+        block->pmr = g_malloc0(block->nb_chunks * sizeof(struct ibv_mr *));
0a122b
+        if (!block->pmr) {
0a122b
+            return -1;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * If 'rkey', then we're the destination, so grant access to the source.
0a122b
+     *
0a122b
+     * If 'lkey', then we're the source VM, so grant access only to ourselves.
0a122b
+     */
0a122b
+    if (!block->pmr[chunk]) {
0a122b
+        uint64_t len = chunk_end - chunk_start;
0a122b
+
0a122b
+        DDPRINTF("Registering %" PRIu64 " bytes @ %p\n",
0a122b
+                 len, chunk_start);
0a122b
+
0a122b
+        block->pmr[chunk] = ibv_reg_mr(rdma->pd,
0a122b
+                chunk_start, len,
0a122b
+                (rkey ? (IBV_ACCESS_LOCAL_WRITE |
0a122b
+                        IBV_ACCESS_REMOTE_WRITE) : 0));
0a122b
+
0a122b
+        if (!block->pmr[chunk]) {
0a122b
+            perror("Failed to register chunk!");
0a122b
+            fprintf(stderr, "Chunk details: block: %d chunk index %d"
0a122b
+                            " start %" PRIu64 " end %" PRIu64 " host %" PRIu64
0a122b
+                            " local %" PRIu64 " registrations: %d\n",
0a122b
+                            block->index, chunk, (uint64_t) chunk_start,
0a122b
+                            (uint64_t) chunk_end, (uint64_t) host_addr,
0a122b
+                            (uint64_t) block->local_host_addr,
0a122b
+                            rdma->total_registrations);
0a122b
+            return -1;
0a122b
+        }
0a122b
+        rdma->total_registrations++;
0a122b
+    }
0a122b
+
0a122b
+    if (lkey) {
0a122b
+        *lkey = block->pmr[chunk]->lkey;
0a122b
+    }
0a122b
+    if (rkey) {
0a122b
+        *rkey = block->pmr[chunk]->rkey;
0a122b
+    }
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Register (at connection time) the memory used for control
0a122b
+ * channel messages.
0a122b
+ */
0a122b
+static int qemu_rdma_reg_control(RDMAContext *rdma, int idx)
0a122b
+{
0a122b
+    rdma->wr_data[idx].control_mr = ibv_reg_mr(rdma->pd,
0a122b
+            rdma->wr_data[idx].control, RDMA_CONTROL_MAX_BUFFER,
0a122b
+            IBV_ACCESS_LOCAL_WRITE | IBV_ACCESS_REMOTE_WRITE);
0a122b
+    if (rdma->wr_data[idx].control_mr) {
0a122b
+        rdma->total_registrations++;
0a122b
+        return 0;
0a122b
+    }
0a122b
+    fprintf(stderr, "qemu_rdma_reg_control failed!\n");
0a122b
+    return -1;
0a122b
+}
0a122b
+
0a122b
+const char *print_wrid(int wrid)
0a122b
+{
0a122b
+    if (wrid >= RDMA_WRID_RECV_CONTROL) {
0a122b
+        return wrid_desc[RDMA_WRID_RECV_CONTROL];
0a122b
+    }
0a122b
+    return wrid_desc[wrid];
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * RDMA requires memory registration (mlock/pinning), but this is not good for
0a122b
+ * overcommitment.
0a122b
+ *
0a122b
+ * In preparation for the future where LRU information or workload-specific
0a122b
+ * writable working set memory access behavior is available to QEMU
0a122b
+ * it would be nice to have in place the ability to UN-register/UN-pin
0a122b
+ * particular memory regions from the RDMA hardware when it is determined that
0a122b
+ * those regions of memory will likely not be accessed again in the near future.
0a122b
+ *
0a122b
+ * While we do not yet have such information right now, the following
0a122b
+ * compile-time option allows us to perform a non-optimized version of this
0a122b
+ * behavior.
0a122b
+ *
0a122b
+ * By uncommenting this option, you will cause *all* RDMA transfers to be
0a122b
+ * unregistered immediately after the transfer completes on both sides of the
0a122b
+ * connection. This has no effect in 'rdma-pin-all' mode, only regular mode.
0a122b
+ *
0a122b
+ * This will have a terrible impact on migration performance, so until future
0a122b
+ * workload information or LRU information is available, do not attempt to use
0a122b
+ * this feature except for basic testing.
0a122b
+ */
0a122b
+//#define RDMA_UNREGISTRATION_EXAMPLE
0a122b
+
0a122b
+/*
0a122b
+ * Perform a non-optimized memory unregistration after every transfer
0a122b
+ * for demonstration purposes, only if pin-all is not requested.
0a122b
+ *
0a122b
+ * Potential optimizations:
0a122b
+ * 1. Start a new thread to run this function continuously
0a122b
+        - for bit clearing
0a122b
+        - and for receipt of unregister messages
0a122b
+ * 2. Use an LRU.
0a122b
+ * 3. Use workload hints.
0a122b
+ */
0a122b
+static int qemu_rdma_unregister_waiting(RDMAContext *rdma)
0a122b
+{
0a122b
+    while (rdma->unregistrations[rdma->unregister_current]) {
0a122b
+        int ret;
0a122b
+        uint64_t wr_id = rdma->unregistrations[rdma->unregister_current];
0a122b
+        uint64_t chunk =
0a122b
+            (wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
0a122b
+        uint64_t index =
0a122b
+            (wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
0a122b
+        RDMALocalBlock *block =
0a122b
+            &(rdma->local_ram_blocks.block[index]);
0a122b
+        RDMARegister reg = { .current_index = index };
0a122b
+        RDMAControlHeader resp = { .type = RDMA_CONTROL_UNREGISTER_FINISHED,
0a122b
+                                 };
0a122b
+        RDMAControlHeader head = { .len = sizeof(RDMARegister),
0a122b
+                                   .type = RDMA_CONTROL_UNREGISTER_REQUEST,
0a122b
+                                   .repeat = 1,
0a122b
+                                 };
0a122b
+
0a122b
+        DDPRINTF("Processing unregister for chunk: %" PRIu64
0a122b
+                 " at position %d\n", chunk, rdma->unregister_current);
0a122b
+
0a122b
+        rdma->unregistrations[rdma->unregister_current] = 0;
0a122b
+        rdma->unregister_current++;
0a122b
+
0a122b
+        if (rdma->unregister_current == RDMA_SIGNALED_SEND_MAX) {
0a122b
+            rdma->unregister_current = 0;
0a122b
+        }
0a122b
+
0a122b
+
0a122b
+        /*
0a122b
+         * Unregistration is speculative (because migration is single-threaded
0a122b
+         * and we cannot break the protocol's infiniband message ordering).
0a122b
+         * Thus, if the memory is currently being used for transmission,
0a122b
+         * then abort the attempt to unregister and try again
0a122b
+         * later the next time a completion is received for this memory.
0a122b
+         */
0a122b
+        clear_bit(chunk, block->unregister_bitmap);
0a122b
+
0a122b
+        if (test_bit(chunk, block->transit_bitmap)) {
0a122b
+            DDPRINTF("Cannot unregister inflight chunk: %" PRIu64 "\n", chunk);
0a122b
+            continue;
0a122b
+        }
0a122b
+
0a122b
+        DDPRINTF("Sending unregister for chunk: %" PRIu64 "\n", chunk);
0a122b
+
0a122b
+        ret = ibv_dereg_mr(block->pmr[chunk]);
0a122b
+        block->pmr[chunk] = NULL;
0a122b
+        block->remote_keys[chunk] = 0;
0a122b
+
0a122b
+        if (ret != 0) {
0a122b
+            perror("unregistration chunk failed");
0a122b
+            return -ret;
0a122b
+        }
0a122b
+        rdma->total_registrations--;
0a122b
+
0a122b
+        reg.key.chunk = chunk;
0a122b
+        register_to_network(®);
0a122b
+        ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) ®,
0a122b
+                                &resp, NULL, NULL);
0a122b
+        if (ret < 0) {
0a122b
+            return ret;
0a122b
+        }
0a122b
+
0a122b
+        DDPRINTF("Unregister for chunk: %" PRIu64 " complete.\n", chunk);
0a122b
+    }
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+static uint64_t qemu_rdma_make_wrid(uint64_t wr_id, uint64_t index,
0a122b
+                                         uint64_t chunk)
0a122b
+{
0a122b
+    uint64_t result = wr_id & RDMA_WRID_TYPE_MASK;
0a122b
+
0a122b
+    result |= (index << RDMA_WRID_BLOCK_SHIFT);
0a122b
+    result |= (chunk << RDMA_WRID_CHUNK_SHIFT);
0a122b
+
0a122b
+    return result;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Set bit for unregistration in the next iteration.
0a122b
+ * We cannot transmit right here, but will unpin later.
0a122b
+ */
0a122b
+static void qemu_rdma_signal_unregister(RDMAContext *rdma, uint64_t index,
0a122b
+                                        uint64_t chunk, uint64_t wr_id)
0a122b
+{
0a122b
+    if (rdma->unregistrations[rdma->unregister_next] != 0) {
0a122b
+        fprintf(stderr, "rdma migration: queue is full!\n");
0a122b
+    } else {
0a122b
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
0a122b
+
0a122b
+        if (!test_and_set_bit(chunk, block->unregister_bitmap)) {
0a122b
+            DDPRINTF("Appending unregister chunk %" PRIu64
0a122b
+                    " at position %d\n", chunk, rdma->unregister_next);
0a122b
+
0a122b
+            rdma->unregistrations[rdma->unregister_next++] =
0a122b
+                    qemu_rdma_make_wrid(wr_id, index, chunk);
0a122b
+
0a122b
+            if (rdma->unregister_next == RDMA_SIGNALED_SEND_MAX) {
0a122b
+                rdma->unregister_next = 0;
0a122b
+            }
0a122b
+        } else {
0a122b
+            DDPRINTF("Unregister chunk %" PRIu64 " already in queue.\n",
0a122b
+                    chunk);
0a122b
+        }
0a122b
+    }
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Poll the completion queue to see if a work request
0a122b
+ * (of any kind) has completed.
0a122b
+ * Return the work request ID that completed.
0a122b
+ */
0a122b
+static uint64_t qemu_rdma_poll(RDMAContext *rdma, uint64_t *wr_id_out)
0a122b
+{
0a122b
+    int ret;
0a122b
+    struct ibv_wc wc;
0a122b
+    uint64_t wr_id;
0a122b
+
0a122b
+    ret = ibv_poll_cq(rdma->cq, 1, &wc);
0a122b
+
0a122b
+    if (!ret) {
0a122b
+        *wr_id_out = RDMA_WRID_NONE;
0a122b
+        return 0;
0a122b
+    }
0a122b
+
0a122b
+    if (ret < 0) {
0a122b
+        fprintf(stderr, "ibv_poll_cq return %d!\n", ret);
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    wr_id = wc.wr_id & RDMA_WRID_TYPE_MASK;
0a122b
+
0a122b
+    if (wc.status != IBV_WC_SUCCESS) {
0a122b
+        fprintf(stderr, "ibv_poll_cq wc.status=%d %s!\n",
0a122b
+                        wc.status, ibv_wc_status_str(wc.status));
0a122b
+        fprintf(stderr, "ibv_poll_cq wrid=%s!\n", wrid_desc[wr_id]);
0a122b
+
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    if (rdma->control_ready_expected &&
0a122b
+        (wr_id >= RDMA_WRID_RECV_CONTROL)) {
0a122b
+        DDDPRINTF("completion %s #%" PRId64 " received (%" PRId64 ")"
0a122b
+                  " left %d\n", wrid_desc[RDMA_WRID_RECV_CONTROL],
0a122b
+                  wr_id - RDMA_WRID_RECV_CONTROL, wr_id, rdma->nb_sent);
0a122b
+        rdma->control_ready_expected = 0;
0a122b
+    }
0a122b
+
0a122b
+    if (wr_id == RDMA_WRID_RDMA_WRITE) {
0a122b
+        uint64_t chunk =
0a122b
+            (wc.wr_id & RDMA_WRID_CHUNK_MASK) >> RDMA_WRID_CHUNK_SHIFT;
0a122b
+        uint64_t index =
0a122b
+            (wc.wr_id & RDMA_WRID_BLOCK_MASK) >> RDMA_WRID_BLOCK_SHIFT;
0a122b
+        RDMALocalBlock *block = &(rdma->local_ram_blocks.block[index]);
0a122b
+
0a122b
+        DDDPRINTF("completions %s (%" PRId64 ") left %d, "
0a122b
+                 "block %" PRIu64 ", chunk: %" PRIu64 " %p %p\n",
0a122b
+                 print_wrid(wr_id), wr_id, rdma->nb_sent, index, chunk,
0a122b
+                 block->local_host_addr, (void *)block->remote_host_addr);
0a122b
+
0a122b
+        clear_bit(chunk, block->transit_bitmap);
0a122b
+
0a122b
+        if (rdma->nb_sent > 0) {
0a122b
+            rdma->nb_sent--;
0a122b
+        }
0a122b
+
0a122b
+        if (!rdma->pin_all) {
0a122b
+            /*
0a122b
+             * FYI: If one wanted to signal a specific chunk to be unregistered
0a122b
+             * using LRU or workload-specific information, this is the function
0a122b
+             * you would call to do so. That chunk would then get asynchronously
0a122b
+             * unregistered later.
0a122b
+             */
0a122b
+#ifdef RDMA_UNREGISTRATION_EXAMPLE
0a122b
+            qemu_rdma_signal_unregister(rdma, index, chunk, wc.wr_id);
0a122b
+#endif
0a122b
+        }
0a122b
+    } else {
0a122b
+        DDDPRINTF("other completion %s (%" PRId64 ") received left %d\n",
0a122b
+            print_wrid(wr_id), wr_id, rdma->nb_sent);
0a122b
+    }
0a122b
+
0a122b
+    *wr_id_out = wc.wr_id;
0a122b
+
0a122b
+    return  0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Block until the next work request has completed.
0a122b
+ *
0a122b
+ * First poll to see if a work request has already completed,
0a122b
+ * otherwise block.
0a122b
+ *
0a122b
+ * If we encounter completed work requests for IDs other than
0a122b
+ * the one we're interested in, then that's generally an error.
0a122b
+ *
0a122b
+ * The only exception is actual RDMA Write completions. These
0a122b
+ * completions only need to be recorded, but do not actually
0a122b
+ * need further processing.
0a122b
+ */
0a122b
+static int qemu_rdma_block_for_wrid(RDMAContext *rdma, int wrid_requested)
0a122b
+{
0a122b
+    int num_cq_events = 0, ret = 0;
0a122b
+    struct ibv_cq *cq;
0a122b
+    void *cq_ctx;
0a122b
+    uint64_t wr_id = RDMA_WRID_NONE, wr_id_in;
0a122b
+
0a122b
+    if (ibv_req_notify_cq(rdma->cq, 0)) {
0a122b
+        return -1;
0a122b
+    }
0a122b
+    /* poll cq first */
0a122b
+    while (wr_id != wrid_requested) {
0a122b
+        ret = qemu_rdma_poll(rdma, &wr_id_in);
0a122b
+        if (ret < 0) {
0a122b
+            return ret;
0a122b
+        }
0a122b
+
0a122b
+        wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
0a122b
+
0a122b
+        if (wr_id == RDMA_WRID_NONE) {
0a122b
+            break;
0a122b
+        }
0a122b
+        if (wr_id != wrid_requested) {
0a122b
+            DDDPRINTF("A Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
0a122b
+                print_wrid(wrid_requested),
0a122b
+                wrid_requested, print_wrid(wr_id), wr_id);
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    if (wr_id == wrid_requested) {
0a122b
+        return 0;
0a122b
+    }
0a122b
+
0a122b
+    while (1) {
0a122b
+        /*
0a122b
+         * Coroutine doesn't start until process_incoming_migration()
0a122b
+         * so don't yield unless we know we're running inside of a coroutine.
0a122b
+         */
0a122b
+        if (rdma->migration_started_on_destination) {
0a122b
+            yield_until_fd_readable(rdma->comp_channel->fd);
0a122b
+        }
0a122b
+
0a122b
+        if (ibv_get_cq_event(rdma->comp_channel, &cq, &cq_ctx)) {
0a122b
+            perror("ibv_get_cq_event");
0a122b
+            goto err_block_for_wrid;
0a122b
+        }
0a122b
+
0a122b
+        num_cq_events++;
0a122b
+
0a122b
+        if (ibv_req_notify_cq(cq, 0)) {
0a122b
+            goto err_block_for_wrid;
0a122b
+        }
0a122b
+
0a122b
+        while (wr_id != wrid_requested) {
0a122b
+            ret = qemu_rdma_poll(rdma, &wr_id_in);
0a122b
+            if (ret < 0) {
0a122b
+                goto err_block_for_wrid;
0a122b
+            }
0a122b
+
0a122b
+            wr_id = wr_id_in & RDMA_WRID_TYPE_MASK;
0a122b
+
0a122b
+            if (wr_id == RDMA_WRID_NONE) {
0a122b
+                break;
0a122b
+            }
0a122b
+            if (wr_id != wrid_requested) {
0a122b
+                DDDPRINTF("B Wanted wrid %s (%d) but got %s (%" PRIu64 ")\n",
0a122b
+                    print_wrid(wrid_requested), wrid_requested,
0a122b
+                    print_wrid(wr_id), wr_id);
0a122b
+            }
0a122b
+        }
0a122b
+
0a122b
+        if (wr_id == wrid_requested) {
0a122b
+            goto success_block_for_wrid;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+success_block_for_wrid:
0a122b
+    if (num_cq_events) {
0a122b
+        ibv_ack_cq_events(cq, num_cq_events);
0a122b
+    }
0a122b
+    return 0;
0a122b
+
0a122b
+err_block_for_wrid:
0a122b
+    if (num_cq_events) {
0a122b
+        ibv_ack_cq_events(cq, num_cq_events);
0a122b
+    }
0a122b
+    return ret;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Post a SEND message work request for the control channel
0a122b
+ * containing some data and block until the post completes.
0a122b
+ */
0a122b
+static int qemu_rdma_post_send_control(RDMAContext *rdma, uint8_t *buf,
0a122b
+                                       RDMAControlHeader *head)
0a122b
+{
0a122b
+    int ret = 0;
0a122b
+    RDMAWorkRequestData *wr = &rdma->wr_data[RDMA_WRID_MAX];
0a122b
+    struct ibv_send_wr *bad_wr;
0a122b
+    struct ibv_sge sge = {
0a122b
+                           .addr = (uint64_t)(wr->control),
0a122b
+                           .length = head->len + sizeof(RDMAControlHeader),
0a122b
+                           .lkey = wr->control_mr->lkey,
0a122b
+                         };
0a122b
+    struct ibv_send_wr send_wr = {
0a122b
+                                   .wr_id = RDMA_WRID_SEND_CONTROL,
0a122b
+                                   .opcode = IBV_WR_SEND,
0a122b
+                                   .send_flags = IBV_SEND_SIGNALED,
0a122b
+                                   .sg_list = &sge,
0a122b
+                                   .num_sge = 1,
0a122b
+                                };
0a122b
+
0a122b
+    DDDPRINTF("CONTROL: sending %s..\n", control_desc[head->type]);
0a122b
+
0a122b
+    /*
0a122b
+     * We don't actually need to do a memcpy() in here if we used
0a122b
+     * the "sge" properly, but since we're only sending control messages
0a122b
+     * (not RAM in a performance-critical path), then it's OK for now.
0a122b
+     *
0a122b
+     * The copy makes the RDMAControlHeader simpler to manipulate
0a122b
+     * for the time being.
0a122b
+     */
0a122b
+    memcpy(wr->control, head, sizeof(RDMAControlHeader));
0a122b
+    control_to_network((void *) wr->control);
0a122b
+
0a122b
+    if (buf) {
0a122b
+        memcpy(wr->control + sizeof(RDMAControlHeader), buf, head->len);
0a122b
+    }
0a122b
+
0a122b
+
0a122b
+    if (ibv_post_send(rdma->qp, &send_wr, &bad_wr)) {
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    if (ret < 0) {
0a122b
+        fprintf(stderr, "Failed to use post IB SEND for control!\n");
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_SEND_CONTROL);
0a122b
+    if (ret < 0) {
0a122b
+        fprintf(stderr, "rdma migration: send polling control error!\n");
0a122b
+    }
0a122b
+
0a122b
+    return ret;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Post a RECV work request in anticipation of some future receipt
0a122b
+ * of data on the control channel.
0a122b
+ */
0a122b
+static int qemu_rdma_post_recv_control(RDMAContext *rdma, int idx)
0a122b
+{
0a122b
+    struct ibv_recv_wr *bad_wr;
0a122b
+    struct ibv_sge sge = {
0a122b
+                            .addr = (uint64_t)(rdma->wr_data[idx].control),
0a122b
+                            .length = RDMA_CONTROL_MAX_BUFFER,
0a122b
+                            .lkey = rdma->wr_data[idx].control_mr->lkey,
0a122b
+                         };
0a122b
+
0a122b
+    struct ibv_recv_wr recv_wr = {
0a122b
+                                    .wr_id = RDMA_WRID_RECV_CONTROL + idx,
0a122b
+                                    .sg_list = &sge,
0a122b
+                                    .num_sge = 1,
0a122b
+                                 };
0a122b
+
0a122b
+
0a122b
+    if (ibv_post_recv(rdma->qp, &recv_wr, &bad_wr)) {
0a122b
+        return -1;
0a122b
+    }
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Block and wait for a RECV control channel message to arrive.
0a122b
+ */
0a122b
+static int qemu_rdma_exchange_get_response(RDMAContext *rdma,
0a122b
+                RDMAControlHeader *head, int expecting, int idx)
0a122b
+{
0a122b
+    int ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RECV_CONTROL + idx);
0a122b
+
0a122b
+    if (ret < 0) {
0a122b
+        fprintf(stderr, "rdma migration: recv polling control error!\n");
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    network_to_control((void *) rdma->wr_data[idx].control);
0a122b
+    memcpy(head, rdma->wr_data[idx].control, sizeof(RDMAControlHeader));
0a122b
+
0a122b
+    DDDPRINTF("CONTROL: %s receiving...\n", control_desc[expecting]);
0a122b
+
0a122b
+    if (expecting == RDMA_CONTROL_NONE) {
0a122b
+        DDDPRINTF("Surprise: got %s (%d)\n",
0a122b
+                  control_desc[head->type], head->type);
0a122b
+    } else if (head->type != expecting || head->type == RDMA_CONTROL_ERROR) {
0a122b
+        fprintf(stderr, "Was expecting a %s (%d) control message"
0a122b
+                ", but got: %s (%d), length: %d\n",
0a122b
+                control_desc[expecting], expecting,
0a122b
+                control_desc[head->type], head->type, head->len);
0a122b
+        return -EIO;
0a122b
+    }
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * When a RECV work request has completed, the work request's
0a122b
+ * buffer is pointed at the header.
0a122b
+ *
0a122b
+ * This will advance the pointer to the data portion
0a122b
+ * of the control message of the work request's buffer that
0a122b
+ * was populated after the work request finished.
0a122b
+ */
0a122b
+static void qemu_rdma_move_header(RDMAContext *rdma, int idx,
0a122b
+                                  RDMAControlHeader *head)
0a122b
+{
0a122b
+    rdma->wr_data[idx].control_len = head->len;
0a122b
+    rdma->wr_data[idx].control_curr =
0a122b
+        rdma->wr_data[idx].control + sizeof(RDMAControlHeader);
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * This is an 'atomic' high-level operation to deliver a single, unified
0a122b
+ * control-channel message.
0a122b
+ *
0a122b
+ * Additionally, if the user is expecting some kind of reply to this message,
0a122b
+ * they can request a 'resp' response message be filled in by posting an
0a122b
+ * additional work request on behalf of the user and waiting for an additional
0a122b
+ * completion.
0a122b
+ *
0a122b
+ * The extra (optional) response is used during registration to save us from
0a122b
+ * having to perform an *additional* exchange of messages just to provide a
0a122b
+ * response, by instead piggy-backing on the acknowledgement.
0a122b
+ */
0a122b
+static int qemu_rdma_exchange_send(RDMAContext *rdma, RDMAControlHeader *head,
0a122b
+                                   uint8_t *data, RDMAControlHeader *resp,
0a122b
+                                   int *resp_idx,
0a122b
+                                   int (*callback)(RDMAContext *rdma))
0a122b
+{
0a122b
+    int ret = 0;
0a122b
+
0a122b
+    /*
0a122b
+     * Wait until the dest is ready before attempting to deliver the message
0a122b
+     * by waiting for a READY message.
0a122b
+     */
0a122b
+    if (rdma->control_ready_expected) {
0a122b
+        RDMAControlHeader resp;
0a122b
+        ret = qemu_rdma_exchange_get_response(rdma,
0a122b
+                                    &resp, RDMA_CONTROL_READY, RDMA_WRID_READY);
0a122b
+        if (ret < 0) {
0a122b
+            return ret;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * If the user is expecting a response, post a WR in anticipation of it.
0a122b
+     */
0a122b
+    if (resp) {
0a122b
+        ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_DATA);
0a122b
+        if (ret) {
0a122b
+            fprintf(stderr, "rdma migration: error posting"
0a122b
+                    " extra control recv for anticipated result!");
0a122b
+            return ret;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * Post a WR to replace the one we just consumed for the READY message.
0a122b
+     */
0a122b
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
0a122b
+    if (ret) {
0a122b
+        fprintf(stderr, "rdma migration: error posting first control recv!");
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * Deliver the control message that was requested.
0a122b
+     */
0a122b
+    ret = qemu_rdma_post_send_control(rdma, data, head);
0a122b
+
0a122b
+    if (ret < 0) {
0a122b
+        fprintf(stderr, "Failed to send control buffer!\n");
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * If we're expecting a response, block and wait for it.
0a122b
+     */
0a122b
+    if (resp) {
0a122b
+        if (callback) {
0a122b
+            DDPRINTF("Issuing callback before receiving response...\n");
0a122b
+            ret = callback(rdma);
0a122b
+            if (ret < 0) {
0a122b
+                return ret;
0a122b
+            }
0a122b
+        }
0a122b
+
0a122b
+        DDPRINTF("Waiting for response %s\n", control_desc[resp->type]);
0a122b
+        ret = qemu_rdma_exchange_get_response(rdma, resp,
0a122b
+                                              resp->type, RDMA_WRID_DATA);
0a122b
+
0a122b
+        if (ret < 0) {
0a122b
+            return ret;
0a122b
+        }
0a122b
+
0a122b
+        qemu_rdma_move_header(rdma, RDMA_WRID_DATA, resp);
0a122b
+        if (resp_idx) {
0a122b
+            *resp_idx = RDMA_WRID_DATA;
0a122b
+        }
0a122b
+        DDPRINTF("Response %s received.\n", control_desc[resp->type]);
0a122b
+    }
0a122b
+
0a122b
+    rdma->control_ready_expected = 1;
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * This is an 'atomic' high-level operation to receive a single, unified
0a122b
+ * control-channel message.
0a122b
+ */
0a122b
+static int qemu_rdma_exchange_recv(RDMAContext *rdma, RDMAControlHeader *head,
0a122b
+                                int expecting)
0a122b
+{
0a122b
+    RDMAControlHeader ready = {
0a122b
+                                .len = 0,
0a122b
+                                .type = RDMA_CONTROL_READY,
0a122b
+                                .repeat = 1,
0a122b
+                              };
0a122b
+    int ret;
0a122b
+
0a122b
+    /*
0a122b
+     * Inform the source that we're ready to receive a message.
0a122b
+     */
0a122b
+    ret = qemu_rdma_post_send_control(rdma, NULL, &ready);
0a122b
+
0a122b
+    if (ret < 0) {
0a122b
+        fprintf(stderr, "Failed to send control buffer!\n");
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * Block and wait for the message.
0a122b
+     */
0a122b
+    ret = qemu_rdma_exchange_get_response(rdma, head,
0a122b
+                                          expecting, RDMA_WRID_READY);
0a122b
+
0a122b
+    if (ret < 0) {
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    qemu_rdma_move_header(rdma, RDMA_WRID_READY, head);
0a122b
+
0a122b
+    /*
0a122b
+     * Post a new RECV work request to replace the one we just consumed.
0a122b
+     */
0a122b
+    ret = qemu_rdma_post_recv_control(rdma, RDMA_WRID_READY);
0a122b
+    if (ret) {
0a122b
+        fprintf(stderr, "rdma migration: error posting second control recv!");
0a122b
+        return ret;
0a122b
+    }
0a122b
+
0a122b
+    return 0;
0a122b
+}
0a122b
+
0a122b
+/*
0a122b
+ * Write an actual chunk of memory using RDMA.
0a122b
+ *
0a122b
+ * If we're using dynamic registration on the dest-side, we have to
0a122b
+ * send a registration command first.
0a122b
+ */
0a122b
+static int qemu_rdma_write_one(QEMUFile *f, RDMAContext *rdma,
0a122b
+                               int current_index, uint64_t current_addr,
0a122b
+                               uint64_t length)
0a122b
+{
0a122b
+    struct ibv_sge sge;
0a122b
+    struct ibv_send_wr send_wr = { 0 };
0a122b
+    struct ibv_send_wr *bad_wr;
0a122b
+    int reg_result_idx, ret, count = 0;
0a122b
+    uint64_t chunk, chunks;
0a122b
+    uint8_t *chunk_start, *chunk_end;
0a122b
+    RDMALocalBlock *block = &(rdma->local_ram_blocks.block[current_index]);
0a122b
+    RDMARegister reg;
0a122b
+    RDMARegisterResult *reg_result;
0a122b
+    RDMAControlHeader resp = { .type = RDMA_CONTROL_REGISTER_RESULT };
0a122b
+    RDMAControlHeader head = { .len = sizeof(RDMARegister),
0a122b
+                               .type = RDMA_CONTROL_REGISTER_REQUEST,
0a122b
+                               .repeat = 1,
0a122b
+                             };
0a122b
+
0a122b
+retry:
0a122b
+    sge.addr = (uint64_t)(block->local_host_addr +
0a122b
+                            (current_addr - block->offset));
0a122b
+    sge.length = length;
0a122b
+
0a122b
+    chunk = ram_chunk_index(block->local_host_addr, (uint8_t *) sge.addr);
0a122b
+    chunk_start = ram_chunk_start(block, chunk);
0a122b
+
0a122b
+    if (block->is_ram_block) {
0a122b
+        chunks = length / (1UL << RDMA_REG_CHUNK_SHIFT);
0a122b
+
0a122b
+        if (chunks && ((length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
0a122b
+            chunks--;
0a122b
+        }
0a122b
+    } else {
0a122b
+        chunks = block->length / (1UL << RDMA_REG_CHUNK_SHIFT);
0a122b
+
0a122b
+        if (chunks && ((block->length % (1UL << RDMA_REG_CHUNK_SHIFT)) == 0)) {
0a122b
+            chunks--;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    DDPRINTF("Writing %" PRIu64 " chunks, (%" PRIu64 " MB)\n",
0a122b
+        chunks + 1, (chunks + 1) * (1UL << RDMA_REG_CHUNK_SHIFT) / 1024 / 1024);
0a122b
+
0a122b
+    chunk_end = ram_chunk_end(block, chunk + chunks);
0a122b
+
0a122b
+    if (!rdma->pin_all) {
0a122b
+#ifdef RDMA_UNREGISTRATION_EXAMPLE
0a122b
+        qemu_rdma_unregister_waiting(rdma);
0a122b
+#endif
0a122b
+    }
0a122b
+
0a122b
+    while (test_bit(chunk, block->transit_bitmap)) {
0a122b
+        (void)count;
0a122b
+        DDPRINTF("(%d) Not clobbering: block: %d chunk %" PRIu64
0a122b
+                " current %" PRIu64 " len %" PRIu64 " %d %d\n",
0a122b
+                count++, current_index, chunk,
0a122b
+                sge.addr, length, rdma->nb_sent, block->nb_chunks);
0a122b
+
0a122b
+        ret = qemu_rdma_block_for_wrid(rdma, RDMA_WRID_RDMA_WRITE);
0a122b
+
0a122b
+        if (ret < 0) {
0a122b
+            fprintf(stderr, "Failed to Wait for previous write to complete "
0a122b
+                    "block %d chunk %" PRIu64
0a122b
+                    " current %" PRIu64 " len %" PRIu64 " %d\n",
0a122b
+                    current_index, chunk, sge.addr, length, rdma->nb_sent);
0a122b
+            return ret;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    if (!rdma->pin_all || !block->is_ram_block) {
0a122b
+        if (!block->remote_keys[chunk]) {
0a122b
+            /*
0a122b
+             * This chunk has not yet been registered, so first check to see
0a122b
+             * if the entire chunk is zero. If so, tell the other side to
0a122b
+             * memset() + madvise() the entire chunk without RDMA.
0a122b
+             */
0a122b
+
0a122b
+            if (can_use_buffer_find_nonzero_offset((void *)sge.addr, length)
0a122b
+                   && buffer_find_nonzero_offset((void *)sge.addr,
0a122b
+                                                    length) == length) {
0a122b
+                RDMACompress comp = {
0a122b
+                                        .offset = current_addr,
0a122b
+                                        .value = 0,
0a122b
+                                        .block_idx = current_index,
0a122b
+                                        .length = length,
0a122b
+                                    };
0a122b
+
0a122b
+                head.len = sizeof(comp);
0a122b
+                head.type = RDMA_CONTROL_COMPRESS;
0a122b
+
0a122b
+                DDPRINTF("Entire chunk is zero, sending compress: %"
0a122b
+                    PRIu64 " for %d "
0a122b
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
0a122b
+                    chunk, sge.length, current_index, current_addr);
0a122b
+
0a122b
+                compress_to_network(&comp);
0a122b
+                ret = qemu_rdma_exchange_send(rdma, &head,
0a122b
+                                (uint8_t *) &comp, NULL, NULL, NULL);
0a122b
+
0a122b
+                if (ret < 0) {
0a122b
+                    return -EIO;
0a122b
+                }
0a122b
+
0a122b
+                acct_update_position(f, sge.length, true);
0a122b
+
0a122b
+                return 1;
0a122b
+            }
0a122b
+
0a122b
+            /*
0a122b
+             * Otherwise, tell other side to register.
0a122b
+             */
0a122b
+            reg.current_index = current_index;
0a122b
+            if (block->is_ram_block) {
0a122b
+                reg.key.current_addr = current_addr;
0a122b
+            } else {
0a122b
+                reg.key.chunk = chunk;
0a122b
+            }
0a122b
+            reg.chunks = chunks;
0a122b
+
0a122b
+            DDPRINTF("Sending registration request chunk %" PRIu64 " for %d "
0a122b
+                    "bytes, index: %d, offset: %" PRId64 "...\n",
0a122b
+                    chunk, sge.length, current_index, current_addr);
0a122b
+
0a122b
+            register_to_network(&reg);
0a122b
+            ret = qemu_rdma_exchange_send(rdma, &head, (uint8_t *) &reg,
0a122b
+                                    &resp, &reg_result_idx, NULL);
0a122b
+            if (ret < 0) {
0a122b
+                return ret;
0a122b
+            }
0a122b
+
0a122b
+            /* try to overlap this single registration with the one we sent. */
0a122b
+            if (qemu_rdma_register_and_get_keys(rdma, block,
0a122b
+                                                (uint8_t *) sge.addr,
0a122b
+                                                &sge.lkey, NULL, chunk,
0a122b
+                                                chunk_start, chunk_end)) {
0a122b
+                fprintf(stderr, "cannot get lkey!\n");
0a122b
+                return -EINVAL;
0a122b
+            }
0a122b
+
0a122b
+            reg_result = (RDMARegisterResult *)
0a122b
+                    rdma->wr_data[reg_result_idx].control_curr;
0a122b
+
0a122b
+            network_to_result(reg_result);
0a122b
+
0a122b
+            DDPRINTF("Received registration result:"
0a122b
+                    " my key: %x their key %x, chunk %" PRIu64 "\n",
0a122b
+                    block->remote_keys[chunk], reg_result->rkey, chunk);
0a122b
+
0a122b
+            block->remote_keys[chunk] = reg_result->rkey;
0a122b
+            block->remote_host_addr = reg_result->host_addr;
0a122b
+        } else {
0a122b
+            /* already registered before */
0a122b
+            if (qemu_rdma_register_and_get_keys(rdma, block,
0a122b
+                                                (uint8_t *)sge.addr,
0a122b
+                                                &sge.lkey, NULL, chunk,
0a122b
+                                                chunk_start, chunk_end)) {
0a122b
+                fprintf(stderr, "cannot get lkey!\n");
0a122b
+                return -EINVAL;
0a122b
+            }
0a122b
+        }
0a122b
+
0a122b
+        send_wr.wr.rdma.rkey = block->remote_keys[chunk];
0a122b
+    } else {
0a122b
+        send_wr.wr.rdma.rkey = block->remote_rkey;
0a122b
+
0a122b
+        if (qemu_rdma_register_and_get_keys(rdma, block, (uint8_t *)sge.addr,
0a122b
+                                                     &sge.lkey, NULL, chunk,
0a122b
+                                                     chunk_start, chunk_end)) {
0a122b
+            fprintf(stderr, "cannot get lkey!\n");
0a122b
+            return -EINVAL;
0a122b
+        }
0a122b
+    }
0a122b
+
0a122b
+    /*
0a122b
+     * Encode the ram block index and chunk within this wrid.
0a122b
+     * We will use this information at the time of completion
0a122b
+     * to figure out which bitmap to check against and then which
0a122b
+     * chunk in the bitmap to look for.
0a122b
+     */
0a122b
+    send_wr.wr_id = qemu_rdma_make_wrid(RDMA_WRID_RDMA_WRITE,
0a122b
+                                        current_index, chunk);
0a122b
+
0a122b
+    send_wr.opcode = IBV_WR_RDMA_WRITE;
0a122b
+    send_wr.send_flags = IBV_SEND_SIGNALED;
0a122b
+    send_wr.sg_list = &sge;
0a122b
+    send_wr.num_sge = 1;