From: Zhimin Feng
Subject: [PATCH RFC 04/12] migration/rdma: Create multiRDMA migration threads
Date: Thu, 9 Jan 2020 12:59:14 +0800

From: fengzhimin <address@hidden>

Create the multiRDMA migration threads; they do nothing yet.

Signed-off-by: fengzhimin <address@hidden>
---
 migration/migration.c |   1 +
 migration/migration.h |   2 +
 migration/rdma.c      | 283 ++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 286 insertions(+)
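
The send and recv threads below share one lifecycle: create a joinable
thread, park it on a per-channel semaphore, and to stop it set ->quit
under the mutex, post the semaphore, then join and tear down. As a
reviewer aid (not part of the patch), here is a minimal standalone
sketch of that pattern using plain POSIX threads instead of QEMU's
qemu_thread/qemu_sem wrappers so it compiles on its own; the names
(ChannelParams, channel_thread) are illustrative only:

    #include <pthread.h>
    #include <semaphore.h>
    #include <stdbool.h>
    #include <stdio.h>

    typedef struct {
        pthread_t thread;
        pthread_mutex_t mutex;  /* protects 'quit' */
        sem_t sem;              /* where the thread waits for more work */
        bool quit;              /* should this thread finish */
    } ChannelParams;

    static void *channel_thread(void *opaque)
    {
        ChannelParams *p = opaque;

        while (true) {
            pthread_mutex_lock(&p->mutex);
            if (p->quit) {
                /* cleanup asked us to exit */
                pthread_mutex_unlock(&p->mutex);
                break;
            }
            pthread_mutex_unlock(&p->mutex);
            sem_wait(&p->sem);  /* park until given work or told to quit */
        }
        return NULL;
    }

    int main(void)
    {
        ChannelParams p = { .quit = false };

        pthread_mutex_init(&p.mutex, NULL);
        sem_init(&p.sem, 0, 0);
        pthread_create(&p.thread, NULL, channel_thread, &p);

        /* terminate: set quit under the mutex, then wake the thread */
        pthread_mutex_lock(&p.mutex);
        p.quit = true;
        pthread_mutex_unlock(&p.mutex);
        sem_post(&p.sem);

        pthread_join(&p.thread, NULL);  /* then tear down the primitives */
        sem_destroy(&p.sem);
        pthread_mutex_destroy(&p.mutex);
        printf("channel thread exited cleanly\n");
        return 0;
    }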

diff --git a/migration/migration.c b/migration/migration.c
index 5756a4806e..f8d4eb657e 100644
--- a/migration/migration.c
+++ b/migration/migration.c
@@ -1546,6 +1546,7 @@ static void migrate_fd_cleanup(MigrationState *s)
         qemu_mutex_lock_iothread();
 
         multifd_save_cleanup();
+        multiRDMA_save_cleanup();
         qemu_mutex_lock(&s->qemu_file_lock);
         tmp = s->to_dst_file;
         s->to_dst_file = NULL;
diff --git a/migration/migration.h b/migration/migration.h
index 4192c22d8c..d69a3fe4e9 100644
--- a/migration/migration.h
+++ b/migration/migration.h
@@ -272,6 +272,8 @@ void migration_incoming_process(void);
 
 bool  migration_has_all_channels(void);
 int migrate_multiRDMA_channels(void);
+int multiRDMA_save_cleanup(void);
+int multiRDMA_load_cleanup(void);
 
 uint64_t migrate_max_downtime(void);
 
diff --git a/migration/rdma.c b/migration/rdma.c
index e241dcb992..992e5abfed 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -395,6 +395,58 @@ typedef struct RDMAContext {
     bool is_return_path;
 } RDMAContext;
 
+typedef struct {
+    /* these fields are not changed once the thread is created */
+    /* channel number */
+    uint8_t id;
+    /* channel thread name */
+    char *name;
+    /* channel thread id */
+    QemuThread thread;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
+    /* this mutex protects the following parameters */
+    QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
+    bool quit;
+} MultiRDMASendParams;
+
+struct {
+    MultiRDMASendParams *params;
+    /* number of created threads */
+    int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+} *multiRDMA_send_state;
+
+typedef struct {
+    /* these fields are not changed once the thread is created */
+    /* channel number */
+    uint8_t id;
+    /* channel thread name */
+    char *name;
+    /* channel thread id */
+    QemuThread thread;
+    /* sem where to wait for more work */
+    QemuSemaphore sem;
+    /* this mutex protects the following parameters */
+    QemuMutex mutex;
+    /* is this channel thread running */
+    bool running;
+    /* should this thread finish */
+    bool quit;
+} MultiRDMARecvParams;
+
+struct {
+    MultiRDMARecvParams *params;
+    /* number of created threads */
+    int count;
+    /* syncs main thread and channels */
+    QemuSemaphore sem_sync;
+} *multiRDMA_recv_state;
+
 #define TYPE_QIO_CHANNEL_RDMA "qio-channel-rdma"
 #define QIO_CHANNEL_RDMA(obj)                                     \
     OBJECT_CHECK(QIOChannelRDMA, (obj), TYPE_QIO_CHANNEL_RDMA)
@@ -3018,6 +3070,7 @@ static void qio_channel_rdma_close_rcu(struct rdma_close_rcu *rcu)
     if (rcu->rdmaout) {
         qemu_rdma_cleanup(rcu->rdmaout);
     }
+    multiRDMA_load_cleanup();
 
     g_free(rcu->rdmain);
     g_free(rcu->rdmaout);
@@ -3919,6 +3972,7 @@ static void qio_channel_rdma_finalize(Object *obj)
         g_free(rioc->rdmaout);
         rioc->rdmaout = NULL;
     }
+    multiRDMA_load_cleanup();
 }
 
 static void qio_channel_rdma_class_init(ObjectClass *klass,
@@ -4007,6 +4061,59 @@ static void rdma_accept_incoming_migration(void *opaque)
     migration_fd_process_incoming(f);
 }
 
+static void *multiRDMA_recv_thread(void *opaque)
+{
+    MultiRDMARecvParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
+static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
+                                      Error **errp)
+{
+    uint8_t i;
+    int thread_count;
+
+    thread_count = migrate_multiRDMA_channels();
+    if (multiRDMA_recv_state == NULL) {
+        multiRDMA_recv_state = g_malloc0(sizeof(*multiRDMA_recv_state));
+        multiRDMA_recv_state->params = g_new0(MultiRDMARecvParams,
+                                              thread_count);
+        atomic_set(&multiRDMA_recv_state->count, 0);
+        qemu_sem_init(&multiRDMA_recv_state->sem_sync, 0);
+
+        for (i = 0; i < thread_count; i++) {
+            MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+
+            qemu_mutex_init(&p->mutex);
+            qemu_sem_init(&p->sem, 0);
+            p->quit = false;
+            p->id = i;
+            p->running = true;
+            p->name = g_strdup_printf("multiRDMARecv_%d", i);
+            qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread,
+                               p, QEMU_THREAD_JOINABLE);
+            atomic_inc(&multiRDMA_recv_state->count);
+        }
+    }
+
+    return 0;
+}
+
 void rdma_start_incoming_migration(const char *host_port, Error **errp)
 {
     int ret;
@@ -4048,6 +4155,13 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
         qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
     }
 
+    if (migrate_use_multiRDMA()) {
+        if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) {
+            ERROR(errp, "init multiRDMA failure!");
+            goto err;
+        }
+    }
+
     qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
                         NULL, (void *)(intptr_t)rdma);
     return;
@@ -4055,6 +4169,167 @@ err:
     error_propagate(errp, local_err);
     g_free(rdma);
     g_free(rdma_return_path);
+    multiRDMA_load_cleanup();
+}
+
+static void *multiRDMA_send_thread(void *opaque)
+{
+    MultiRDMASendParams *p = opaque;
+
+    while (true) {
+        qemu_mutex_lock(&p->mutex);
+        if (p->quit) {
+            qemu_mutex_unlock(&p->mutex);
+            break;
+        }
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_wait(&p->sem);
+    }
+
+    qemu_mutex_lock(&p->mutex);
+    p->running = false;
+    qemu_mutex_unlock(&p->mutex);
+
+    return NULL;
+}
+
+static int multiRDMA_save_setup(const char *host_port, Error **errp)
+{
+    int thread_count;
+    uint8_t i;
+
+    thread_count = migrate_multiRDMA_channels();
+    multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state));
+    multiRDMA_send_state->params = g_new0(MultiRDMASendParams,
+                                          thread_count);
+    atomic_set(&multiRDMA_send_state->count, 0);
+    qemu_sem_init(&multiRDMA_send_state->sem_sync, 0);
+
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+        qemu_mutex_init(&p->mutex);
+        qemu_sem_init(&p->sem, 0);
+        p->quit = false;
+        p->id = i;
+        p->running = true;
+        p->name = g_strdup_printf("multiRDMASend_%d", i);
+
+        qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread, p,
+                           QEMU_THREAD_JOINABLE);
+        atomic_inc(&multiRDMA_send_state->count);
+    }
+
+    return 0;
+}
+
+static void multiRDMA_send_terminate_threads(void)
+{
+    int i;
+    int thread_count = migrate_multiRDMA_channels();
+
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        p->quit = true;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+}
+
+int multiRDMA_save_cleanup(void)
+{
+    int i;
+    int ret = 0;
+    int thread_count = migrate_multiRDMA_channels();
+
+    if (!migrate_use_multiRDMA()) {
+        return 0;
+    }
+
+    /* prevent double free */
+    if (multiRDMA_send_state == NULL) {
+        return 0;
+    }
+
+    /* notify the multiRDMA send threads to exit */
+    multiRDMA_send_terminate_threads();
+
+    /* wait for the multiRDMA send threads to exit */
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+
+        qemu_thread_join(&p->thread);
+    }
+
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+        qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
+        g_free(p->name);
+        p->name = NULL;
+    }
+
+    qemu_sem_destroy(&multiRDMA_send_state->sem_sync);
+    g_free(multiRDMA_send_state);
+    multiRDMA_send_state = NULL;
+
+    return ret;
+}
+
+static void multiRDMA_recv_terminate_threads(void)
+{
+    int i;
+    int thread_count = migrate_multiRDMA_channels();
+
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+
+        qemu_mutex_lock(&p->mutex);
+        p->quit = true;
+        qemu_mutex_unlock(&p->mutex);
+        qemu_sem_post(&p->sem);
+    }
+}
+
+int multiRDMA_load_cleanup(void)
+{
+    int i;
+    int ret = 0;
+    int thread_count = migrate_multiRDMA_channels();
+
+    if (!migrate_use_multiRDMA()) {
+        return 0;
+    }
+
+    /* prevent double free */
+    if (multiRDMA_recv_state == NULL) {
+        return 0;
+    }
+
+    /* notify the multiRDMA recv threads to exit */
+    multiRDMA_recv_terminate_threads();
+
+    /* wait for the multiRDMA recv threads to exit */
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+
+        qemu_thread_join(&p->thread);
+    }
+
+    for (i = 0; i < thread_count; i++) {
+        MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+        qemu_mutex_destroy(&p->mutex);
+        qemu_sem_destroy(&p->sem);
+        g_free(p->name);
+        p->name = NULL;
+    }
+
+    qemu_sem_destroy(&multiRDMA_recv_state->sem_sync);
+    g_free(multiRDMA_recv_state);
+    multiRDMA_recv_state = NULL;
+
+    return ret;
 }
 
 void rdma_start_outgoing_migration(void *opaque,
@@ -4111,10 +4386,18 @@ void rdma_start_outgoing_migration(void *opaque,
 
     trace_rdma_start_outgoing_migration_after_rdma_connect();
 
+    if (migrate_use_multiRDMA()) {
+        if (multiRDMA_save_setup(host_port, errp) != 0) {
+            ERROR(errp, "init multiRDMA channels failure!");
+            goto err;
+        }
+    }
+
     s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
     migrate_fd_connect(s, NULL);
     return;
 err:
     g_free(rdma);
     g_free(rdma_return_path);
+    multiRDMA_save_cleanup();
 }
-- 
2.19.1




