qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[Qemu-devel] [PATCH 3/3] multifd: Start of zlib compression code


From: Juan Quintela
Subject: [Qemu-devel] [PATCH 3/3] multifd: Start of zlib compression code
Date: Wed, 20 Feb 2019 13:57:55 +0100

This is still work in progress.

It is already faster than the normal compression code inside qemu.
It doesn't do any unnecessary copies.  And as packets are bigger, we get
better compression.

Signed-off-by: Juan Quintela <address@hidden>
---
 hmp.c                  |  6 ++-
 migration/ram.c        | 96 ++++++++++++++++++++++++++++++++++++++++--
 migration/trace-events |  2 +-
 3 files changed, 97 insertions(+), 7 deletions(-)

diff --git a/hmp.c b/hmp.c
index abbd49ec17..7c1ad2376d 100644
--- a/hmp.c
+++ b/hmp.c
@@ -1789,8 +1789,10 @@ void hmp_migrate_set_parameter(Monitor *mon, const QDict 
*qdict)
     case MIGRATION_PARAMETER_XBZRLE_CACHE_SIZE:
         p->has_xbzrle_cache_size = true;
         visit_type_size(v, param, &cache_size, &err);
-        if (err || cache_size > INT64_MAX
-            || (size_t)cache_size != cache_size) {
+        if (err) {
+            break;
+        }
+        if (cache_size > INT64_MAX || (size_t)cache_size != cache_size) {
             error_setg(&err, "Invalid size %s", valuestr);
             break;
         }
diff --git a/migration/ram.c b/migration/ram.c
index 7de27e1a35..feb857d395 100644
--- a/migration/ram.c
+++ b/migration/ram.c
@@ -519,6 +519,7 @@ exit:
 #define MULTIFD_VERSION 1
 
 #define MULTIFD_FLAG_SYNC (1 << 0)
+#define MULTIFD_FLAG_ZLIB (1 << 1)
 
 /* This value needs to be a multiple of qemu_target_page_size() */
 #define MULTIFD_PACKET_SIZE (512 * 1024)
@@ -1031,6 +1032,7 @@ static void *multifd_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    z_stream *zs = &p->zs;
     int ret;
 
     trace_multifd_send_thread_start(p->id);
@@ -1050,8 +1052,43 @@ static void *multifd_send_thread(void *opaque)
             uint32_t used = p->pages->used;
             uint64_t packet_num = p->packet_num;
             uint32_t flags = p->flags;
+            struct iovec *iov = p->pages->iov;
 
             p->next_packet_size = used * qemu_target_page_size();
+
+            if (used && migrate_use_multifd_zlib()) {
+                uint32_t in_size = used * qemu_target_page_size();
+                uint32_t out_size = 0;
+                int i;
+
+                for (i = 0; i < used; i++ ) {
+                    uint32_t available = p->zbuff_len - out_size;
+                    int flush = Z_NO_FLUSH;
+
+                    if (i == used  - 1) {
+                        flush = Z_SYNC_FLUSH;
+                    }
+
+                    zs->avail_in = iov[i].iov_len;
+                    zs->next_in = iov[i].iov_base;
+
+                    zs->avail_out = available;
+                    zs->next_out = p->zbuff + out_size;
+
+                    ret = deflate(zs, flush);
+                    if (ret != Z_OK) {
+                        printf("problem with deflate? %d\n", ret);
+                        qemu_mutex_unlock(&p->mutex);
+                        break;
+                    }
+                    out_size += available - zs->avail_out;
+                }
+                printf("%d deflate in %d out %d diff %d\n",
+                       p->id, in_size, out_size, in_size - out_size);
+
+                p->next_packet_size = out_size;
+            }
+
             multifd_send_fill_packet(p);
             p->flags = 0;
             p->num_packets++;
@@ -1069,8 +1106,13 @@ static void *multifd_send_thread(void *opaque)
             }
 
             if (used) {
-                ret = qio_channel_writev_all(p->c, p->pages->iov,
-                                             used, &local_err);
+                if (migrate_use_multifd_zlib()) {
+                    ret = qio_channel_write_all(p->c, (void *)p->zbuff,
+                                               p->next_packet_size, 
&local_err);
+                } else {
+                    ret = qio_channel_writev_all(p->c, p->pages->iov,
+                                                 used, &local_err);
+                }
                 if (ret != 0) {
                     break;
                 }
@@ -1283,6 +1325,7 @@ static void *multifd_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
     Error *local_err = NULL;
+    z_stream *zs = &p->zs;
     int ret;
 
     trace_multifd_recv_thread_start(p->id);
@@ -1317,8 +1360,53 @@ static void *multifd_recv_thread(void *opaque)
         qemu_mutex_unlock(&p->mutex);
 
         if (used) {
-            ret = qio_channel_readv_all(p->c, p->pages->iov,
-                                        used, &local_err);
+            uint32_t in_size = p->next_packet_size;
+            uint32_t out_size = 0;
+            uint32_t expected_size = used * qemu_target_page_size();
+            int i;
+
+            if (migrate_use_multifd_zlib()) {
+                ret = qio_channel_read_all(p->c, (void *)p->zbuff,
+                                           in_size, &local_err);
+
+                if (ret != 0) {
+                    break;
+                }
+
+                zs->avail_in = in_size;
+                zs->next_in = p->zbuff;
+
+                for (i = 0; i < used; i++ ) {
+                    struct iovec *iov = &p->pages->iov[i];
+                    int flush = Z_NO_FLUSH;
+
+                    if (i == used  - 1) {
+                        flush = Z_SYNC_FLUSH;
+                    }
+
+                    zs->avail_out = iov->iov_len;
+                    zs->next_out = iov->iov_base;
+
+                    ret = inflate(zs, flush);
+                    if (ret != Z_OK) {
+                        printf("%d: problem with inflate? %d\n", p->id, ret);
+                        qemu_mutex_unlock(&p->mutex);
+                        break;
+                    }
+                    out_size += iov->iov_len;
+                }
+
+                printf("%d: out_size = %d, in_size = %d, expected_size = %d 
avail_in: %d\n",
+                       p->id, out_size, in_size, expected_size, zs->avail_in);
+                if (out_size != expected_size) {
+                    printf("out size %d expected size %d\n",
+                           out_size, expected_size);
+                    break;
+                }
+            } else {
+                ret = qio_channel_readv_all(p->c, p->pages->iov,
+                                            used, &local_err);
+            }
             if (ret != 0) {
                 break;
             }
diff --git a/migration/trace-events b/migration/trace-events
index a11e66e1d9..8aaae32e67 100644
--- a/migration/trace-events
+++ b/migration/trace-events
@@ -77,7 +77,7 @@ get_queued_page_not_dirty(const char *block_name, uint64_t 
tmp_offset, unsigned
 migration_bitmap_sync_start(void) ""
 migration_bitmap_sync_end(uint64_t dirty_pages) "dirty_pages %" PRIu64
 migration_throttle(void) ""
-multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, 
uint32_t next_packet_size) "channel %d packet number %" PRIu64 " pages %d flags 
0x%x next packet size %d"
+multifd_recv(uint8_t id, uint64_t packet_num, uint32_t used, uint32_t flags, 
uint32_t next_packet_size) "channel %d packet_num %" PRIu64 " pages %d flags 
0x%x next packet size %d"
 multifd_recv_sync_main(long packet_num) "packet num %ld"
 multifd_recv_sync_main_signal(uint8_t id) "channel %d"
 multifd_recv_sync_main_wait(uint8_t id) "channel %d"
-- 
2.20.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]