Blob Blame History Raw
From 3b57c876e1eaca34fb5bd9067553de945013d4be Mon Sep 17 00:00:00 2001
From: Juan Quintela <quintela@redhat.com>
Date: Wed, 18 May 2022 02:52:24 -0300
Subject: [PATCH 16/37] multifd: Use normal pages array on the send side
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

RH-Author: Leonardo Brás <leobras@redhat.com>
RH-MergeRequest: 191: MSG_ZEROCOPY + Multifd @ rhel8.7
RH-Commit: [16/26] 1c48806474daf48fe93920ac361311af95c6a6f3
RH-Bugzilla: 2072049
RH-Acked-by: Peter Xu <peterx@redhat.com>
RH-Acked-by: Daniel P. Berrangé <berrange@redhat.com>
RH-Acked-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

We are only sending normal pages through multifd channels.
Later on this series, we are going to also send zero pages.
We are going to detect if a page is zero or non zero in the multifd
channel thread, not on the main thread.

So we receive an array of pages page->offset[N]

And we will end with:

p->normal[N - zero_pages]
p->zero[zero_pages].

In this patch, we just copy all the pages in offset to normal.

for (i = 0; i < pages->num; i++) {
    p->narmal[p->normal_num] = pages->offset[i];
    p->normal_num++:
}

Later in the series this becomes:

for (i = 0; i < pages->num; i++) {
    if (buffer_is_zero(page->offset[i])) {
        p->zerol[p->zero_num] = pages->offset[i];
        p->zero_num++:
    } else {
        p->narmal[p->normal_num] = pages->offset[i];
        p->normal_num++:
    }
}

Signed-off-by: Juan Quintela <quintela@redhat.com>
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>

---

Improving comment (dave)
Renaming num_normal_pages to total_normal_pages (peter)

(cherry picked from commit 815956f03902980c771da64b17f7f791c1cb57b0)
Signed-off-by: Leonardo Bras <leobras@redhat.com>
---
 migration/multifd-zlib.c |  6 +++---
 migration/multifd-zstd.c |  6 +++---
 migration/multifd.c      | 30 +++++++++++++++++++-----------
 migration/multifd.h      |  8 ++++++--
 migration/trace-events   |  4 ++--
 5 files changed, 33 insertions(+), 21 deletions(-)

diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
index 8ed29b9633..8508f26adf 100644
--- a/migration/multifd-zlib.c
+++ b/migration/multifd-zlib.c
@@ -108,16 +108,16 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
     int ret;
     uint32_t i;
 
-    for (i = 0; i < p->pages->num; i++) {
+    for (i = 0; i < p->normal_num; i++) {
         uint32_t available = z->zbuff_len - out_size;
         int flush = Z_NO_FLUSH;
 
-        if (i == p->pages->num - 1) {
+        if (i == p->normal_num - 1) {
             flush = Z_SYNC_FLUSH;
         }
 
         zs->avail_in = page_size;
-        zs->next_in = p->pages->block->host + p->pages->offset[i];
+        zs->next_in = p->pages->block->host + p->normal[i];
 
         zs->avail_out = available;
         zs->next_out = z->zbuff + out_size;
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
index 25e1f517b5..693af3a140 100644
--- a/migration/multifd-zstd.c
+++ b/migration/multifd-zstd.c
@@ -123,13 +123,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
     z->out.size = z->zbuff_len;
     z->out.pos = 0;
 
-    for (i = 0; i < p->pages->num; i++) {
+    for (i = 0; i < p->normal_num; i++) {
         ZSTD_EndDirective flush = ZSTD_e_continue;
 
-        if (i == p->pages->num - 1) {
+        if (i == p->normal_num - 1) {
             flush = ZSTD_e_flush;
         }
-        z->in.src = p->pages->block->host + p->pages->offset[i];
+        z->in.src = p->pages->block->host + p->normal[i];
         z->in.size = page_size;
         z->in.pos = 0;
 
diff --git a/migration/multifd.c b/migration/multifd.c
index d0f86542b1..3725226400 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -89,13 +89,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
     MultiFDPages_t *pages = p->pages;
     size_t page_size = qemu_target_page_size();
 
-    for (int i = 0; i < p->pages->num; i++) {
-        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
+    for (int i = 0; i < p->normal_num; i++) {
+        p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
         p->iov[p->iovs_num].iov_len = page_size;
         p->iovs_num++;
     }
 
-    p->next_packet_size = p->pages->num * page_size;
+    p->next_packet_size = p->normal_num * page_size;
     p->flags |= MULTIFD_FLAG_NOCOMP;
     return 0;
 }
@@ -262,7 +262,7 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
 
     packet->flags = cpu_to_be32(p->flags);
     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
-    packet->pages_used = cpu_to_be32(p->pages->num);
+    packet->pages_used = cpu_to_be32(p->normal_num);
     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
     packet->packet_num = cpu_to_be64(p->packet_num);
 
@@ -270,9 +270,9 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
         strncpy(packet->ramblock, p->pages->block->idstr, 256);
     }
 
-    for (i = 0; i < p->pages->num; i++) {
+    for (i = 0; i < p->normal_num; i++) {
         /* there are architectures where ram_addr_t is 32 bit */
-        uint64_t temp = p->pages->offset[i];
+        uint64_t temp = p->normal[i];
 
         packet->offset[i] = cpu_to_be64(temp);
     }
@@ -556,6 +556,8 @@ void multifd_save_cleanup(void)
         p->packet = NULL;
         g_free(p->iov);
         p->iov = NULL;
+        g_free(p->normal);
+        p->normal = NULL;
         multifd_send_state->ops->send_cleanup(p, &local_err);
         if (local_err) {
             migrate_set_error(migrate_get_current(), local_err);
@@ -640,12 +642,17 @@ static void *multifd_send_thread(void *opaque)
         qemu_mutex_lock(&p->mutex);
 
         if (p->pending_job) {
-            uint32_t used = p->pages->num;
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
             p->iovs_num = 1;
+            p->normal_num = 0;
+
+            for (int i = 0; i < p->pages->num; i++) {
+                p->normal[p->normal_num] = p->pages->offset[i];
+                p->normal_num++;
+            }
 
-            if (used) {
+            if (p->normal_num) {
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
                 if (ret != 0) {
                     qemu_mutex_unlock(&p->mutex);
@@ -655,12 +662,12 @@ static void *multifd_send_thread(void *opaque)
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
-            p->num_pages += used;
+            p->total_normal_pages += p->normal_num;
             p->pages->num = 0;
             p->pages->block = NULL;
             qemu_mutex_unlock(&p->mutex);
 
-            trace_multifd_send(p->id, packet_num, used, flags,
+            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
                                p->next_packet_size);
 
             p->iov[0].iov_len = p->packet_len;
@@ -710,7 +717,7 @@ out:
     qemu_mutex_unlock(&p->mutex);
 
     rcu_unregister_thread();
-    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
 
     return NULL;
 }
@@ -910,6 +917,7 @@ int multifd_save_setup(Error **errp)
         p->tls_hostname = g_strdup(s->hostname);
         /* We need one extra place for the packet header */
         p->iov = g_new0(struct iovec, page_count + 1);
+        p->normal = g_new0(ram_addr_t, page_count);
         socket_send_channel_create(multifd_new_send_channel_async, p);
     }
 
diff --git a/migration/multifd.h b/migration/multifd.h
index 7496f951a7..7823199dbe 100644
--- a/migration/multifd.h
+++ b/migration/multifd.h
@@ -104,14 +104,18 @@ typedef struct {
     /* thread local variables */
     /* packets sent through this channel */
     uint64_t num_packets;
-    /* pages sent through this channel */
-    uint64_t num_pages;
+    /* non zero pages sent through this channel */
+    uint64_t total_normal_pages;
     /* syncs main thread and channels */
     QemuSemaphore sem_sync;
     /* buffers to send */
     struct iovec *iov;
     /* number of iovs used */
     uint32_t iovs_num;
+    /* Pages that are not zero */
+    ram_addr_t *normal;
+    /* num of non zero pages */
+    uint32_t normal_num;
     /* used for compression methods */
     void *data;
 }  MultiFDSendParams;
diff --git a/migration/trace-events b/migration/trace-events
index 5172cb3b3d..171a83a55d 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -124,13 +124,13 @@ multifd_recv_sync_main_wait(uint8_t id) "channel %u"
 multifd_recv_terminate_threads(bool error) "error %d"
 multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
 multifd_recv_thread_start(uint8_t id) "%u"
-multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u"
+multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u flags 0x%x next packet size %u"
 multifd_send_error(uint8_t id) "channel %u"
 multifd_send_sync_main(long packet_num) "packet num %ld"
 multifd_send_sync_main_signal(uint8_t id) "channel %u"
 multifd_send_sync_main_wait(uint8_t id) "channel %u"
 multifd_send_terminate_threads(bool error) "error %d"
-multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %"  PRIu64
+multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %"  PRIu64
 multifd_send_thread_start(uint8_t id) "%u"
 multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
 multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"
-- 
2.35.3