0727d3
From 3b57c876e1eaca34fb5bd9067553de945013d4be Mon Sep 17 00:00:00 2001
0727d3
From: Juan Quintela <quintela@redhat.com>
0727d3
Date: Wed, 18 May 2022 02:52:24 -0300
0727d3
Subject: [PATCH 16/37] multifd: Use normal pages array on the send side
0727d3
MIME-Version: 1.0
0727d3
Content-Type: text/plain; charset=UTF-8
0727d3
Content-Transfer-Encoding: 8bit
0727d3
0727d3
RH-Author: Leonardo Brás <leobras@redhat.com>
0727d3
RH-MergeRequest: 191: MSG_ZEROCOPY + Multifd @ rhel8.7
0727d3
RH-Commit: [16/26] 1c48806474daf48fe93920ac361311af95c6a6f3
0727d3
RH-Bugzilla: 2072049
0727d3
RH-Acked-by: Peter Xu <peterx@redhat.com>
0727d3
RH-Acked-by: Daniel P. Berrangé <berrange@redhat.com>
0727d3
RH-Acked-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
0727d3
0727d3
We are only sending normal pages through multifd channels.
0727d3
Later in this series, we are also going to send zero pages.
0727d3
We are going to detect if a page is zero or non zero in the multifd
0727d3
channel thread, not on the main thread.
0727d3
0727d3
So we receive an array of pages: pages->offset[N].
0727d3
0727d3
And we will end up with:
0727d3
0727d3
p->normal[N - zero_pages]
0727d3
p->zero[zero_pages].
0727d3
0727d3
In this patch, we just copy all the pages in offset to normal.
0727d3
0727d3
for (i = 0; i < pages->num; i++) {
0727d3
    p->normal[p->normal_num] = pages->offset[i];
0727d3
    p->normal_num++;
0727d3
}
0727d3
0727d3
Later in the series this becomes:
0727d3
0727d3
for (i = 0; i < pages->num; i++) {
0727d3
    if (buffer_is_zero(pages->offset[i])) {
0727d3
        p->zero[p->zero_num] = pages->offset[i];
0727d3
        p->zero_num++;
0727d3
    } else {
0727d3
        p->normal[p->normal_num] = pages->offset[i];
0727d3
        p->normal_num++;
0727d3
    }
0727d3
}
0727d3
0727d3
Signed-off-by: Juan Quintela <quintela@redhat.com>
0727d3
Reviewed-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
0727d3
0727d3
---
0727d3
0727d3
Improving comment (dave)
0727d3
Renaming num_normal_pages to total_normal_pages (peter)
0727d3
0727d3
(cherry picked from commit 815956f03902980c771da64b17f7f791c1cb57b0)
0727d3
Signed-off-by: Leonardo Bras <leobras@redhat.com>
0727d3
---
0727d3
 migration/multifd-zlib.c |  6 +++---
0727d3
 migration/multifd-zstd.c |  6 +++---
0727d3
 migration/multifd.c      | 30 +++++++++++++++++++-----------
0727d3
 migration/multifd.h      |  8 ++++++--
0727d3
 migration/trace-events   |  4 ++--
0727d3
 5 files changed, 33 insertions(+), 21 deletions(-)
0727d3
0727d3
diff --git a/migration/multifd-zlib.c b/migration/multifd-zlib.c
0727d3
index 8ed29b9633..8508f26adf 100644
0727d3
--- a/migration/multifd-zlib.c
0727d3
+++ b/migration/multifd-zlib.c
0727d3
@@ -108,16 +108,16 @@ static int zlib_send_prepare(MultiFDSendParams *p, Error **errp)
0727d3
     int ret;
0727d3
     uint32_t i;
0727d3
 
0727d3
-    for (i = 0; i < p->pages->num; i++) {
0727d3
+    for (i = 0; i < p->normal_num; i++) {
0727d3
         uint32_t available = z->zbuff_len - out_size;
0727d3
         int flush = Z_NO_FLUSH;
0727d3
 
0727d3
-        if (i == p->pages->num - 1) {
0727d3
+        if (i == p->normal_num - 1) {
0727d3
             flush = Z_SYNC_FLUSH;
0727d3
         }
0727d3
 
0727d3
         zs->avail_in = page_size;
0727d3
-        zs->next_in = p->pages->block->host + p->pages->offset[i];
0727d3
+        zs->next_in = p->pages->block->host + p->normal[i];
0727d3
 
0727d3
         zs->avail_out = available;
0727d3
         zs->next_out = z->zbuff + out_size;
0727d3
diff --git a/migration/multifd-zstd.c b/migration/multifd-zstd.c
0727d3
index 25e1f517b5..693af3a140 100644
0727d3
--- a/migration/multifd-zstd.c
0727d3
+++ b/migration/multifd-zstd.c
0727d3
@@ -123,13 +123,13 @@ static int zstd_send_prepare(MultiFDSendParams *p, Error **errp)
0727d3
     z->out.size = z->zbuff_len;
0727d3
     z->out.pos = 0;
0727d3
 
0727d3
-    for (i = 0; i < p->pages->num; i++) {
0727d3
+    for (i = 0; i < p->normal_num; i++) {
0727d3
         ZSTD_EndDirective flush = ZSTD_e_continue;
0727d3
 
0727d3
-        if (i == p->pages->num - 1) {
0727d3
+        if (i == p->normal_num - 1) {
0727d3
             flush = ZSTD_e_flush;
0727d3
         }
0727d3
-        z->in.src = p->pages->block->host + p->pages->offset[i];
0727d3
+        z->in.src = p->pages->block->host + p->normal[i];
0727d3
         z->in.size = page_size;
0727d3
         z->in.pos = 0;
0727d3
 
0727d3
diff --git a/migration/multifd.c b/migration/multifd.c
0727d3
index d0f86542b1..3725226400 100644
0727d3
--- a/migration/multifd.c
0727d3
+++ b/migration/multifd.c
0727d3
@@ -89,13 +89,13 @@ static int nocomp_send_prepare(MultiFDSendParams *p, Error **errp)
0727d3
     MultiFDPages_t *pages = p->pages;
0727d3
     size_t page_size = qemu_target_page_size();
0727d3
 
0727d3
-    for (int i = 0; i < p->pages->num; i++) {
0727d3
-        p->iov[p->iovs_num].iov_base = pages->block->host + pages->offset[i];
0727d3
+    for (int i = 0; i < p->normal_num; i++) {
0727d3
+        p->iov[p->iovs_num].iov_base = pages->block->host + p->normal[i];
0727d3
         p->iov[p->iovs_num].iov_len = page_size;
0727d3
         p->iovs_num++;
0727d3
     }
0727d3
 
0727d3
-    p->next_packet_size = p->pages->num * page_size;
0727d3
+    p->next_packet_size = p->normal_num * page_size;
0727d3
     p->flags |= MULTIFD_FLAG_NOCOMP;
0727d3
     return 0;
0727d3
 }
0727d3
@@ -262,7 +262,7 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
0727d3
 
0727d3
     packet->flags = cpu_to_be32(p->flags);
0727d3
     packet->pages_alloc = cpu_to_be32(p->pages->allocated);
0727d3
-    packet->pages_used = cpu_to_be32(p->pages->num);
0727d3
+    packet->pages_used = cpu_to_be32(p->normal_num);
0727d3
     packet->next_packet_size = cpu_to_be32(p->next_packet_size);
0727d3
     packet->packet_num = cpu_to_be64(p->packet_num);
0727d3
 
0727d3
@@ -270,9 +270,9 @@ static void multifd_send_fill_packet(MultiFDSendParams *p)
0727d3
         strncpy(packet->ramblock, p->pages->block->idstr, 256);
0727d3
     }
0727d3
 
0727d3
-    for (i = 0; i < p->pages->num; i++) {
0727d3
+    for (i = 0; i < p->normal_num; i++) {
0727d3
         /* there are architectures where ram_addr_t is 32 bit */
0727d3
-        uint64_t temp = p->pages->offset[i];
0727d3
+        uint64_t temp = p->normal[i];
0727d3
 
0727d3
         packet->offset[i] = cpu_to_be64(temp);
0727d3
     }
0727d3
@@ -556,6 +556,8 @@ void multifd_save_cleanup(void)
0727d3
         p->packet = NULL;
0727d3
         g_free(p->iov);
0727d3
         p->iov = NULL;
0727d3
+        g_free(p->normal);
0727d3
+        p->normal = NULL;
0727d3
         multifd_send_state->ops->send_cleanup(p, &local_err);
0727d3
         if (local_err) {
0727d3
             migrate_set_error(migrate_get_current(), local_err);
0727d3
@@ -640,12 +642,17 @@ static void *multifd_send_thread(void *opaque)
0727d3
         qemu_mutex_lock(&p->mutex);
0727d3
 
0727d3
         if (p->pending_job) {
0727d3
-            uint32_t used = p->pages->num;
0727d3
             uint64_t packet_num = p->packet_num;
0727d3
             uint32_t flags = p->flags;
0727d3
             p->iovs_num = 1;
0727d3
+            p->normal_num = 0;
0727d3
+
0727d3
+            for (int i = 0; i < p->pages->num; i++) {
0727d3
+                p->normal[p->normal_num] = p->pages->offset[i];
0727d3
+                p->normal_num++;
0727d3
+            }
0727d3
 
0727d3
-            if (used) {
0727d3
+            if (p->normal_num) {
0727d3
                 ret = multifd_send_state->ops->send_prepare(p, &local_err);
0727d3
                 if (ret != 0) {
0727d3
                     qemu_mutex_unlock(&p->mutex);
0727d3
@@ -655,12 +662,12 @@ static void *multifd_send_thread(void *opaque)
0727d3
             multifd_send_fill_packet(p);
0727d3
             p->flags = 0;
0727d3
             p->num_packets++;
0727d3
-            p->num_pages += used;
0727d3
+            p->total_normal_pages += p->normal_num;
0727d3
             p->pages->num = 0;
0727d3
             p->pages->block = NULL;
0727d3
             qemu_mutex_unlock(&p->mutex);
0727d3
 
0727d3
-            trace_multifd_send(p->id, packet_num, used, flags,
0727d3
+            trace_multifd_send(p->id, packet_num, p->normal_num, flags,
0727d3
                                p->next_packet_size);
0727d3
 
0727d3
             p->iov[0].iov_len = p->packet_len;
0727d3
@@ -710,7 +717,7 @@ out:
0727d3
     qemu_mutex_unlock(&p->mutex);
0727d3
 
0727d3
     rcu_unregister_thread();
0727d3
-    trace_multifd_send_thread_end(p->id, p->num_packets, p->num_pages);
0727d3
+    trace_multifd_send_thread_end(p->id, p->num_packets, p->total_normal_pages);
0727d3
 
0727d3
     return NULL;
0727d3
 }
0727d3
@@ -910,6 +917,7 @@ int multifd_save_setup(Error **errp)
0727d3
         p->tls_hostname = g_strdup(s->hostname);
0727d3
         /* We need one extra place for the packet header */
0727d3
         p->iov = g_new0(struct iovec, page_count + 1);
0727d3
+        p->normal = g_new0(ram_addr_t, page_count);
0727d3
         socket_send_channel_create(multifd_new_send_channel_async, p);
0727d3
     }
0727d3
 
0727d3
diff --git a/migration/multifd.h b/migration/multifd.h
0727d3
index 7496f951a7..7823199dbe 100644
0727d3
--- a/migration/multifd.h
0727d3
+++ b/migration/multifd.h
0727d3
@@ -104,14 +104,18 @@ typedef struct {
0727d3
     /* thread local variables */
0727d3
     /* packets sent through this channel */
0727d3
     uint64_t num_packets;
0727d3
-    /* pages sent through this channel */
0727d3
-    uint64_t num_pages;
0727d3
+    /* non zero pages sent through this channel */
0727d3
+    uint64_t total_normal_pages;
0727d3
     /* syncs main thread and channels */
0727d3
     QemuSemaphore sem_sync;
0727d3
     /* buffers to send */
0727d3
     struct iovec *iov;
0727d3
     /* number of iovs used */
0727d3
     uint32_t iovs_num;
0727d3
+    /* Pages that are not zero */
0727d3
+    ram_addr_t *normal;
0727d3
+    /* num of non zero pages */
0727d3
+    uint32_t normal_num;
0727d3
     /* used for compression methods */
0727d3
     void *data;
0727d3
 }  MultiFDSendParams;
0727d3
diff --git a/migration/trace-events b/migration/trace-events
0727d3
index 5172cb3b3d..171a83a55d 100644
0727d3
--- a/migration/trace-events
0727d3
+++ b/migration/trace-events
0727d3
@@ -124,13 +124,13 @@ multifd_recv_sync_main_wait(uint8_t id) "channel %u"
0727d3
 multifd_recv_terminate_threads(bool error) "error %d"
0727d3
 multifd_recv_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %" PRIu64
0727d3
 multifd_recv_thread_start(uint8_t id) "%u"
0727d3
-multifd_send(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " pages %u flags 0x%x next packet size %u"
0727d3
+multifd_send(uint8_t id, uint64_t packet_num, uint32_t normal, uint32_t flags, uint32_t next_packet_size) "channel %u packet_num %" PRIu64 " normal pages %u flags 0x%x next packet size %u"
0727d3
 multifd_send_error(uint8_t id) "channel %u"
0727d3
 multifd_send_sync_main(long packet_num) "packet num %ld"
0727d3
 multifd_send_sync_main_signal(uint8_t id) "channel %u"
0727d3
 multifd_send_sync_main_wait(uint8_t id) "channel %u"
0727d3
 multifd_send_terminate_threads(bool error) "error %d"
0727d3
-multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t pages) "channel %u packets %" PRIu64 " pages %"  PRIu64
0727d3
+multifd_send_thread_end(uint8_t id, uint64_t packets, uint64_t normal_pages) "channel %u packets %" PRIu64 " normal pages %"  PRIu64
0727d3
 multifd_send_thread_start(uint8_t id) "%u"
0727d3
 multifd_tls_outgoing_handshake_start(void *ioc, void *tioc, const char *hostname) "ioc=%p tioc=%p hostname=%s"
0727d3
 multifd_tls_outgoing_handshake_error(void *ioc, const char *err) "ioc=%p err=%s"
0727d3
-- 
0727d3
2.35.3
0727d3