qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v3 14/18] migration/rdma: register memory for multifd RDMA channe


From: Chuan Zheng
Subject: [PATCH v3 14/18] migration/rdma: register memory for multifd RDMA channels
Date: Sat, 17 Oct 2020 12:25:44 +0800

Signed-off-by: Zhimin Feng <fengzhimin1@huawei.com>
Signed-off-by: Chuan Zheng <zhengchuan@huawei.com>
---
 migration/multifd.c |  3 ++
 migration/rdma.c    | 94 +++++++++++++++++++++++++++++++++++++++++++++++++++--
 2 files changed, 95 insertions(+), 2 deletions(-)

diff --git a/migration/multifd.c b/migration/multifd.c
index 9439b3c..c4d90ef 100644
--- a/migration/multifd.c
+++ b/migration/multifd.c
@@ -534,6 +534,9 @@ void multifd_send_terminate_threads(Error *err)
         qemu_mutex_lock(&p->mutex);
         p->quit = true;
         qemu_sem_post(&p->sem);
+        if (migrate_use_rdma()) {
+            qemu_sem_post(&p->sem_sync);
+        }
         qemu_mutex_unlock(&p->mutex);
     }
 }
diff --git a/migration/rdma.c b/migration/rdma.c
index a366849..3210e6e 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -3837,6 +3837,19 @@ static int rdma_load_hook(QEMUFile *f, void *opaque, 
uint64_t flags, void *data)
         return rdma_block_notification_handle(opaque, data);
 
     case RAM_CONTROL_HOOK:
+        if (migrate_use_multifd()) {
+            int i;
+            MultiFDRecvParams *multifd_recv_param = NULL;
+            int thread_count = migrate_multifd_channels();
+            /* Inform dest recv_thread to poll */
+            for (i = 0; i < thread_count; i++) {
+                if (get_multifd_recv_param(i, &multifd_recv_param)) {
+                    return -1;
+                }
+                qemu_sem_post(&multifd_recv_param->sem_sync);
+            }
+        }
+
         return qemu_rdma_registration_handle(f, opaque);
 
     default:
@@ -3909,6 +3922,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void 
*opaque,
         head.type = RDMA_CONTROL_RAM_BLOCKS_REQUEST;
         trace_qemu_rdma_registration_stop_ram();
 
+        if (migrate_use_multifd()) {
+            /*
+             * Inform the multifd channels to register memory
+             */
+            int i;
+            int thread_count = migrate_multifd_channels();
+            MultiFDSendParams *multifd_send_param = NULL;
+            for (i = 0; i < thread_count; i++) {
+                ret = get_multifd_send_param(i, &multifd_send_param);
+                if (ret) {
+                    error_report("rdma: error getting multifd(%d)", i);
+                    return ret;
+                }
+
+                qemu_sem_post(&multifd_send_param->sem_sync);
+            }
+        }
+
         /*
          * Make sure that we parallelize the pinning on both sides.
          * For very large guests, doing this serially takes a really
@@ -3967,6 +3998,21 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void 
*opaque,
                     rdma->dest_blocks[i].remote_host_addr;
             local->block[i].remote_rkey = rdma->dest_blocks[i].remote_rkey;
         }
+        /* Wait for all multifd channels to complete registration */
+        if (migrate_use_multifd()) {
+            int i;
+            int thread_count = migrate_multifd_channels();
+            MultiFDSendParams *multifd_send_param = NULL;
+            for (i = 0; i < thread_count; i++) {
+                ret = get_multifd_send_param(i, &multifd_send_param);
+                if (ret) {
+                    error_report("rdma: error getting multifd(%d)", i);
+                    return ret;
+                }
+
+                qemu_sem_wait(&multifd_send_param->sem);
+            }
+        }
     }
 
     trace_qemu_rdma_registration_stop(flags);
@@ -3978,6 +4024,24 @@ static int qemu_rdma_registration_stop(QEMUFile *f, void 
*opaque,
         goto err;
     }
 
+    if (migrate_use_multifd()) {
+        /*
+         * Inform src send_thread to send FINISHED signal.
+         * Wait for multifd RDMA send threads to poll the CQE.
+         */
+        int i;
+        int thread_count = migrate_multifd_channels();
+        MultiFDSendParams *multifd_send_param = NULL;
+        for (i = 0; i < thread_count; i++) {
+            ret = get_multifd_send_param(i, &multifd_send_param);
+            if (ret < 0) {
+                goto err;
+            }
+
+            qemu_sem_post(&multifd_send_param->sem_sync);
+        }
+    }
+
     return 0;
 err:
     rdma->error_state = ret;
@@ -4355,20 +4419,39 @@ static void *multifd_rdma_send_thread(void *opaque)
 {
     MultiFDSendParams *p = opaque;
     Error *local_err = NULL;
+    int ret = 0;
+    RDMAControlHeader head = { .len = 0, .repeat = 1 };
 
     trace_multifd_send_thread_start(p->id);
     if (multifd_send_initial_packet(p, &local_err) < 0) {
         goto out;
     }
 
+    /* wait for semaphore notification to register memory */
+    qemu_sem_wait(&p->sem_sync);
+    if (qemu_rdma_registration(p->rdma) < 0) {
+        goto out;
+    }
+    /*
+     * Inform the main RDMA thread to run when multifd
+     * RDMA thread have completed registration.
+     */
+    qemu_sem_post(&p->sem);
     while (true) {
+        qemu_sem_wait(&p->sem_sync);
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+
+        /* Send FINISHED to the destination */
+        head.type = RDMA_CONTROL_REGISTER_FINISHED;
+        ret = qemu_rdma_exchange_send(p->rdma, &head, NULL, NULL, NULL, NULL);
+        if (ret < 0) {
+            return NULL;
+        }
     }
 
 out:
@@ -4406,15 +4489,22 @@ static void 
multifd_rdma_send_channel_setup(MultiFDSendParams *p)
 static void *multifd_rdma_recv_thread(void *opaque)
 {
     MultiFDRecvParams *p = opaque;
+    int ret = 0;
 
     while (true) {
+        qemu_sem_wait(&p->sem_sync);
+
         qemu_mutex_lock(&p->mutex);
         if (p->quit) {
             qemu_mutex_unlock(&p->mutex);
             break;
         }
         qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem_sync);
+        ret = qemu_rdma_registration_handle(p->file, p->c);
+        if (ret < 0) {
+            qemu_file_set_error(p->file, ret);
+            break;
+        }
     }
 
     qemu_mutex_lock(&p->mutex);
-- 
1.8.3.1




reply via email to

[Prev in Thread] Current Thread [Next in Thread]