qemu-devel
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH RFC 05/12] migration/rdma: Create the multiRDMA channels


From: Zhimin Feng
Subject: [PATCH RFC 05/12] migration/rdma: Create the multiRDMA channels
Date: Thu, 9 Jan 2020 12:59:15 +0800

From: fengzhimin <address@hidden>

Create the multiRDMA channels on both the source and the destination side.
Nothing is transmitted through them yet; this patch only establishes the
RDMA connections.

Signed-off-by: fengzhimin <address@hidden>
---
 migration/rdma.c | 307 +++++++++++++++++++++++++++++++++++++++++++++++-----
 1 file changed, 278 insertions(+), 29 deletions(-)

diff --git a/migration/rdma.c b/migration/rdma.c
index 992e5abfed..5b780bef36 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -403,6 +403,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* sem where to wait for more work */
     QemuSemaphore sem;
     /* this mutex protects the following parameters */
@@ -429,6 +433,10 @@ typedef struct {
     char *name;
     /* channel thread id */
     QemuThread thread;
+    /* RDMAContext channel */
+    RDMAContext *rdma;
+    /* communication channel */
+    QEMUFile *file;
     /* sem where to wait for more work */
     QemuSemaphore sem;
     /* this mutex protects the following parameters */
@@ -3417,6 +3425,27 @@ static int qemu_rdma_accept(RDMAContext *rdma)
         qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                             NULL,
                             (void *)(intptr_t)rdma->return_path);
+    } else if (migrate_use_multiRDMA()) {
+        int thread_count;
+        int i;
+        RDMAContext *multi_rdma = NULL;
+        thread_count = migrate_multiRDMA_channels();
+        /* create the multi Thread RDMA channels */
+        for (i = 0; i < thread_count; i++) {
+            if (multiRDMA_recv_state->params[i].rdma->cm_id == NULL) {
+                multi_rdma = multiRDMA_recv_state->params[i].rdma;
+                break;
+            }
+        }
+
+        if (multi_rdma) {
+            qemu_set_fd_handler(rdma->channel->fd,
+                                rdma_accept_incoming_migration,
+                                NULL, (void *)(intptr_t)multi_rdma);
+        } else {
+            qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+                                NULL, rdma);
+        }
     } else {
         qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
                             NULL, rdma);
@@ -4029,6 +4058,75 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
     return rioc->file;
 }
 
+static void *multiRDMA_recv_thread(void *opaque)
+{
+    MultiRDMARecvParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
+/*
+ * Attach @f as multiRDMA receive channel @id and start its thread.
+ * Ids outside [0, migrate_multiRDMA_channels()) and ids that are already
+ * set up are reported and ignored.
+ */
+static void multiRDMA_recv_new_channel(QEMUFile *f, int id)
+{
+    MultiRDMARecvParams *p;
+
+    if (id < 0 || id >= migrate_multiRDMA_channels()) {
+        error_report("multiRDMA: received unexpected channel id '%d'", id);
+        return;
+    }
+
+    p = &multiRDMA_recv_state->params[id];
+    if (p->file != NULL) {
+        error_report("multiRDMA: received id '%d' already set up", id);
+        return;
+    }
+    p->file = f;
+
+    /* p->running is set only once the thread really exists */
+    p->running = true;
+    qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread, p,
+                       QEMU_THREAD_JOINABLE);
+    atomic_inc(&multiRDMA_recv_state->count);
+}
+
+/*
+ * Dispatch a QEMUFile accepted on the destination.  While
+ * mis->from_src_file is unset, @f becomes the main migration stream;
+ * afterwards each connection is registered as multiRDMA receive channel
+ * number multiRDMA_recv_state->count.
+ */
+static void migration_multiRDMA_process_incoming(QEMUFile *f,
+                                                 RDMAContext *rdma)
+{
+    MigrationIncomingState *mis = migration_incoming_get_current();
+
+    if (!mis->from_src_file) {
+        rdma->migration_started_on_destination = 1;
+        migration_incoming_setup(f);
+        migration_incoming_process();
+    } else {
+        multiRDMA_recv_new_channel(f, multiRDMA_recv_state->count);
+    }
+}
+
 static void rdma_accept_incoming_migration(void *opaque)
 {
     RDMAContext *rdma = opaque;
@@ -4057,29 +4155,13 @@ static void rdma_accept_incoming_migration(void *opaque)
         return;
     }
 
-    rdma->migration_started_on_destination = 1;
-    migration_fd_process_incoming(f);
-}
-
-static void *multiRDMA_recv_thread(void *opaque)
-{
-    MultiRDMARecvParams *p = opaque;
-
-    while (true) {
-        qemu_mutex_lock(&p->mutex);
-        if (p->quit) {
-            qemu_mutex_unlock(&p->mutex);
-            break;
-        }
-        qemu_mutex_unlock(&p->mutex);
-        qemu_sem_wait(&p->sem);
+    if (migrate_use_multiRDMA()) {
+        /* build the multiRDMA channels */
+        migration_multiRDMA_process_incoming(f, rdma);
+    } else {
+        rdma->migration_started_on_destination = 1;
+        migration_fd_process_incoming(f);
     }
-
-    qemu_mutex_lock(&p->mutex);
-    p->running = false;
-    qemu_mutex_unlock(&p->mutex);
-
-    return NULL;
 }
 
 static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
@@ -4087,6 +4169,7 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
 {
     uint8_t i;
     int thread_count;
+    int idx;
 
     thread_count = migrate_multiRDMA_channels();
     if (multiRDMA_recv_state == NULL) {
@@ -4099,15 +4182,24 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
         for (i = 0; i < thread_count; i++) {
             MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
 
+            p->rdma = qemu_rdma_data_init(host_port, errp);
+            if (p->rdma == NULL) {
+                ERROR(errp, "init RDMA data failure for multi channel %d!", i);
+                return -1;
+            }
+            for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+                p->rdma->wr_data[idx].control_len = 0;
+                p->rdma->wr_data[idx].control_curr = NULL;
+            }
+            /* the CM event channel and listen id are shared with @rdma */
+            p->rdma->channel = rdma->channel;
+            p->rdma->listen_id = rdma->listen_id;
+
             qemu_mutex_init(&p->mutex);
             qemu_sem_init(&p->sem, 0);
             p->quit = false;
             p->id = i;
-            p->running = true;
             p->name = g_strdup_printf("multiRDMARecv_%d", i);
-            qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread,
-                               p, QEMU_THREAD_JOINABLE);
-            atomic_inc(&multiRDMA_recv_state->count);
         }
     }
 
@@ -4155,6 +4247,7 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
     }
 
+    /* initialize the RDMAContext for multiRDMA */
     if (migrate_use_multiRDMA()) {
         if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) {
             ERROR(errp, "init multiRDMA failure!");
@@ -4193,10 +4286,32 @@ static void *multiRDMA_send_thread(void *opaque)
     return NULL;
 }
 
+/*
+ * Attach @f as multiRDMA send channel @id and start its thread.
+ * An id that is already set up is reported and ignored.
+ */
+static void multiRDMA_send_new_channel(QEMUFile *f, int id)
+{
+    MultiRDMASendParams *p;
+
+    p = &multiRDMA_send_state->params[id];
+    if (p->file != NULL) {
+        error_report("multiRDMA: send id '%d' already set up", id);
+        return;
+    }
+    p->file = f;
+
+    qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread,
+                       p, QEMU_THREAD_JOINABLE);
+    atomic_inc(&multiRDMA_send_state->count);
+}
+
 static int multiRDMA_save_setup(const char *host_port, Error **errp)
 {
     int thread_count;
     uint8_t i;
+    int ret;
+    int j;
 
     thread_count = migrate_multiRDMA_channels();
     multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state));
@@ -4207,6 +4322,25 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
 
     for (i = 0; i < thread_count; i++) {
         MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+
+        p->rdma = qemu_rdma_data_init(host_port, errp);
+        if (p->rdma == NULL) {
+            ERROR(errp, "init RDMA data failure for multi channel %d!", i);
+            goto err;
+        }
+
+        ret = qemu_rdma_source_init(p->rdma, migrate_use_rdma_pin_all(), errp);
+        if (ret) {
+            ERROR(errp, "init RDMA source failure for multi channel %d!", i);
+            goto err;
+        }
+
+        ret = qemu_rdma_connect(p->rdma, errp);
+        if (ret) {
+            ERROR(errp, "connect multi channel %d failure!", i);
+            goto err;
+        }
+
         qemu_mutex_init(&p->mutex);
         qemu_sem_init(&p->sem, 0);
         p->quit = false;
@@ -4214,12 +4348,44 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
         p->running = true;
         p->name = g_strdup_printf("multiRDMASend_%d", i);
 
-        qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread, p,
-                           QEMU_THREAD_JOINABLE);
-        atomic_inc(&multiRDMA_send_state->count);
     }
 
+    /*
+     * Start the channel threads only after every connection succeeded, so
+     * the error path below never has to stop a running thread.
+     */
+    for (i = 0; i < thread_count; i++) {
+        multiRDMA_send_new_channel(
+            qemu_fopen_rdma(multiRDMA_send_state->params[i].rdma, "wb"), i);
+    }
+
     return 0;
+
+err:
+    /*
+     * No channel thread is running yet.  Channels with an index below i
+     * already had their mutex, semaphore and name initialised; any allocated
+     * RDMAContext may hold connection resources, so clean it up first.
+     */
+    for (j = 0; j < thread_count; j++) {
+        MultiRDMASendParams *p = &multiRDMA_send_state->params[j];
+
+        if (j < i) {
+            qemu_mutex_destroy(&p->mutex);
+            qemu_sem_destroy(&p->sem);
+            g_free(p->name);
+        }
+        if (p->rdma) {
+            qemu_rdma_cleanup(p->rdma);
+            g_free(p->rdma);
+        }
+    }
+
+    g_free(multiRDMA_send_state->params);
+    g_free(multiRDMA_send_state);
+    multiRDMA_send_state = NULL;
+
+    return -1;
 }
 
 static void multiRDMA_send_terminate_threads(void)
@@ -4268,6 +4434,8 @@ int multiRDMA_save_cleanup(void)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        qemu_rdma_cleanup(multiRDMA_send_state->params[i].rdma);
+        g_free(multiRDMA_send_state->params[i].rdma);
     }
 
     qemu_sem_destroy(&multiRDMA_send_state->sem_sync);
@@ -4292,6 +4460,80 @@ static void multiRDMA_recv_terminate_threads(void)
     }
 }
 
+/*
+ * Release the resources owned by one multiRDMA receive RDMAContext.  The
+ * CM event channel and listen id are shared with the main RDMAContext, so
+ * they are only detached here, never destroyed.  A NULL @rdma is ignored.
+ */
+static void qemu_multiRDMA_load_cleanup(RDMAContext *rdma)
+{
+    int idx;
+
+    if (rdma == NULL) {
+        return;
+    }
+
+    if (rdma->cm_id && rdma->connected) {
+        if ((rdma->error_state ||
+             migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+            !rdma->received_error) {
+            RDMAControlHeader head = { .len = 0,
+                                       .type = RDMA_CONTROL_ERROR,
+                                       .repeat = 1,
+                                     };
+            error_report("Early error. Sending error.");
+            qemu_rdma_post_send_control(rdma, NULL, &head);
+        }
+
+        rdma_disconnect(rdma->cm_id);
+        trace_qemu_rdma_cleanup_disconnect();
+        rdma->connected = false;
+    }
+
+    g_free(rdma->dest_blocks);
+    rdma->dest_blocks = NULL;
+
+    for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+        if (rdma->wr_data[idx].control_mr) {
+            rdma->total_registrations--;
+            ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+        }
+        rdma->wr_data[idx].control_mr = NULL;
+    }
+
+    if (rdma->local_ram_blocks.block) {
+        while (rdma->local_ram_blocks.nb_blocks) {
+            rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
+        }
+    }
+
+    if (rdma->qp) {
+        rdma_destroy_qp(rdma->cm_id);
+        rdma->qp = NULL;
+    }
+    if (rdma->cq) {
+        ibv_destroy_cq(rdma->cq);
+        rdma->cq = NULL;
+    }
+    if (rdma->comp_channel) {
+        ibv_destroy_comp_channel(rdma->comp_channel);
+        rdma->comp_channel = NULL;
+    }
+    if (rdma->pd) {
+        ibv_dealloc_pd(rdma->pd);
+        rdma->pd = NULL;
+    }
+    if (rdma->cm_id) {
+        rdma_destroy_id(rdma->cm_id);
+        rdma->cm_id = NULL;
+    }
+
+    rdma->listen_id = NULL;
+    rdma->channel = NULL;
+    g_free(rdma->host);
+    rdma->host = NULL;
+}
+
 int multiRDMA_load_cleanup(void)
 {
     int i;
@@ -4323,6 +4565,8 @@ int multiRDMA_load_cleanup(void)
         qemu_sem_destroy(&p->sem);
         g_free(p->name);
         p->name = NULL;
+        qemu_multiRDMA_load_cleanup(multiRDMA_recv_state->params[i].rdma);
+        g_free(multiRDMA_recv_state->params[i].rdma);
     }
 
     qemu_sem_destroy(&multiRDMA_recv_state->sem_sync);
@@ -4386,15 +4630,20 @@ void rdma_start_outgoing_migration(void *opaque,
 
     trace_rdma_start_outgoing_migration_after_rdma_connect();
 
+    /* create the multiRDMA channels */
     if (migrate_use_multiRDMA()) {
         if (multiRDMA_save_setup(host_port, errp) != 0) {
             ERROR(errp, "init multiRDMA channels failure!");
             goto err;
         }
     }
 
+    /*
+     * Open the main channel file only once the multiRDMA setup succeeded:
+     * the error path frees @rdma, which the file would otherwise reference.
+     */
     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
     migrate_fd_connect(s, NULL);
     return;
 err:
     g_free(rdma);
-- 
2.19.1





reply via email to

[Prev in Thread] Current Thread [Next in Thread]