[PATCH RFC 05/12] migration/rdma: Create the multiRDMA channels

From: Zhimin Feng
Subject: [PATCH RFC 05/12] migration/rdma: Create the multiRDMA channels
Date: Thu, 9 Jan 2020 12:59:15 +0800
From: fengzhimin <address@hidden>
Create the multiRDMA channels on both sides. We still don't transmit
anything through them; we only establish the RDMA connections.
Signed-off-by: fengzhimin <address@hidden>
---
migration/rdma.c | 253 +++++++++++++++++++++++++++++++++++++++++------
1 file changed, 223 insertions(+), 30 deletions(-)
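
Every channel in this series uses the same thread lifecycle: a
mutex-protected quit flag plus a semaphore the thread parks on until it
is told to exit. As a rough standalone model of that lifecycle (plain
pthreads and illustrative names such as ChannelParams/channel_thread,
not the QEMU wrappers the patch itself uses):

    #include <pthread.h>
    #include <semaphore.h>
    #include <stdbool.h>
    #include <stdio.h>

    /* Illustrative stand-in for MultiRDMARecvParams/MultiRDMASendParams. */
    typedef struct {
        pthread_mutex_t mutex;   /* protects quit and running */
        sem_t sem;               /* thread parks here until woken */
        bool quit;
        bool running;
    } ChannelParams;

    /* Same shape as multiRDMA_recv_thread()/multiRDMA_send_thread() below:
     * loop until quit is set, sleeping on the semaphore in between. */
    static void *channel_thread(void *opaque)
    {
        ChannelParams *p = opaque;

        while (true) {
            pthread_mutex_lock(&p->mutex);
            if (p->quit) {
                pthread_mutex_unlock(&p->mutex);
                break;
            }
            pthread_mutex_unlock(&p->mutex);
            sem_wait(&p->sem);   /* woken on termination (or, later, work) */
        }

        pthread_mutex_lock(&p->mutex);
        p->running = false;
        pthread_mutex_unlock(&p->mutex);
        return NULL;
    }

    int main(void)
    {
        ChannelParams p = { .quit = false, .running = true };
        pthread_t t;

        pthread_mutex_init(&p.mutex, NULL);
        sem_init(&p.sem, 0, 0);
        pthread_create(&t, NULL, channel_thread, &p);

        /* Terminate: set quit under the lock, then post the semaphore,
         * mirroring what the terminate-threads helpers in the patch
         * presumably do. */
        pthread_mutex_lock(&p.mutex);
        p.quit = true;
        pthread_mutex_unlock(&p.mutex);
        sem_post(&p.sem);

        pthread_join(t, NULL);
        printf("running=%d\n", p.running);   /* prints running=0 */
        return 0;
    }

On Linux hosts the QemuThread/QemuSemaphore calls in the patch are thin
wrappers over the same POSIX primitives, so the shutdown ordering shown
here (set quit under the lock, then post) carries over directly.
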
diff --git a/migration/rdma.c b/migration/rdma.c
index 992e5abfed..5b780bef36 100644
--- a/migration/rdma.c
+++ b/migration/rdma.c
@@ -403,6 +403,10 @@ typedef struct {
char *name;
/* channel thread id */
QemuThread thread;
+ /* RDMAContext channel */
+ RDMAContext *rdma;
+ /* communication channel */
+ QEMUFile *file;
/* sem where to wait for more work */
QemuSemaphore sem;
/* this mutex protects the following parameters */
@@ -429,6 +433,10 @@ typedef struct {
char *name;
/* channel thread id */
QemuThread thread;
+ /* RDMAContext channel */
+ RDMAContext *rdma;
+ /* communication channel */
+ QEMUFile *file;
/* sem where to wait for more work */
QemuSemaphore sem;
/* this mutex protects the following parameters */
@@ -3417,6 +3425,27 @@ static int qemu_rdma_accept(RDMAContext *rdma)
qemu_set_fd_handler(rdma->channel->fd, rdma_accept_incoming_migration,
NULL,
(void *)(intptr_t)rdma->return_path);
+ } else if (migrate_use_multiRDMA()) {
+ int thread_count;
+ int i;
+ RDMAContext *multi_rdma = NULL;
+ thread_count = migrate_multiRDMA_channels();
+ /* create the multi Thread RDMA channels */
+ for (i = 0; i < thread_count; i++) {
+ if (multiRDMA_recv_state->params[i].rdma->cm_id == NULL) {
+ multi_rdma = multiRDMA_recv_state->params[i].rdma;
+ break;
+ }
+ }
+
+ if (multi_rdma) {
+ qemu_set_fd_handler(rdma->channel->fd,
+ rdma_accept_incoming_migration,
+ NULL, (void *)(intptr_t)multi_rdma);
+ } else {
+ qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
+ NULL, rdma);
+ }
} else {
qemu_set_fd_handler(rdma->channel->fd, rdma_cm_poll_handler,
NULL, rdma);
@@ -4029,6 +4058,58 @@ static QEMUFile *qemu_fopen_rdma(RDMAContext *rdma, const char *mode)
return rioc->file;
}
+static void *multiRDMA_recv_thread(void *opaque)
+{
+ MultiRDMARecvParams *p = opaque;
+
+ while (true) {
+ qemu_mutex_lock(&p->mutex);
+ if (p->quit) {
+ qemu_mutex_unlock(&p->mutex);
+ break;
+ }
+ qemu_mutex_unlock(&p->mutex);
+ qemu_sem_wait(&p->sem);
+ }
+
+ qemu_mutex_lock(&p->mutex);
+ p->running = false;
+ qemu_mutex_unlock(&p->mutex);
+
+ return NULL;
+}
+
+static void multiRDMA_recv_new_channel(QEMUFile *f, int id)
+{
+ MultiRDMARecvParams *p;
+ Error *local_err = NULL;
+
+ p = &multiRDMA_recv_state->params[id];
+ if (p->file != NULL) {
+ error_setg(&local_err,
+ "multiRDMA: received id '%d' already setup'", id);
+ return ;
+ }
+ p->file = f;
+
+ qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread, p,
+ QEMU_THREAD_JOINABLE);
+ atomic_inc(&multiRDMA_recv_state->count);
+}
+
+static void migration_multiRDMA_process_incoming(QEMUFile *f, RDMAContext *rdma)
+{
+ MigrationIncomingState *mis = migration_incoming_get_current();
+
+ if (!mis->from_src_file) {
+ rdma->migration_started_on_destination = 1;
+ migration_incoming_setup(f);
+ migration_incoming_process();
+ } else {
+ multiRDMA_recv_new_channel(f, multiRDMA_recv_state->count);
+ }
+}
+
static void rdma_accept_incoming_migration(void *opaque)
{
RDMAContext *rdma = opaque;
@@ -4057,29 +4138,13 @@ static void rdma_accept_incoming_migration(void *opaque)
return;
}
- rdma->migration_started_on_destination = 1;
- migration_fd_process_incoming(f);
-}
-
-static void *multiRDMA_recv_thread(void *opaque)
-{
- MultiRDMARecvParams *p = opaque;
-
- while (true) {
- qemu_mutex_lock(&p->mutex);
- if (p->quit) {
- qemu_mutex_unlock(&p->mutex);
- break;
- }
- qemu_mutex_unlock(&p->mutex);
- qemu_sem_wait(&p->sem);
+ if (migrate_use_multiRDMA()) {
+ /* build the multiRDMA channels */
+ migration_multiRDMA_process_incoming(f, rdma);
+ } else {
+ rdma->migration_started_on_destination = 1;
+ migration_fd_process_incoming(f);
}
-
- qemu_mutex_lock(&p->mutex);
- p->running = false;
- qemu_mutex_unlock(&p->mutex);
-
- return NULL;
}
static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
@@ -4087,6 +4152,7 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
{
uint8_t i;
int thread_count;
+ int idx;
thread_count = migrate_multiRDMA_channels();
if (multiRDMA_recv_state == NULL) {
@@ -4099,15 +4165,21 @@ static int multiRDMA_load_setup(const char *host_port, RDMAContext *rdma,
for (i = 0; i < thread_count; i++) {
MultiRDMARecvParams *p = &multiRDMA_recv_state->params[i];
+ p->rdma = qemu_rdma_data_init(host_port, errp);
+ for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+ p->rdma->wr_data[idx].control_len = 0;
+ p->rdma->wr_data[idx].control_curr = NULL;
+ }
+ /* the CM channel and CM id is shared */
+ p->rdma->channel = rdma->channel;
+ p->rdma->listen_id = rdma->listen_id;
+
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
p->quit = false;
p->id = i;
p->running = true;
p->name = g_strdup_printf("multiRDMARecv_%d", i);
- qemu_thread_create(&p->thread, p->name, multiRDMA_recv_thread,
- p, QEMU_THREAD_JOINABLE);
- atomic_inc(&multiRDMA_recv_state->count);
}
}
@@ -4155,6 +4227,7 @@ void rdma_start_incoming_migration(const char *host_port, Error **errp)
qemu_rdma_return_path_dest_init(rdma_return_path, rdma);
}
+ /* initialize the RDMAContext for multiRDMA */
if (migrate_use_multiRDMA()) {
if (multiRDMA_load_setup(host_port, rdma, &local_err) != 0) {
ERROR(errp, "init multiRDMA failure!");
@@ -4193,10 +4266,29 @@ static void *multiRDMA_send_thread(void *opaque)
return NULL;
}
+static void multiRDMA_send_new_channel(QEMUFile *f, int id)
+{
+ MultiRDMASendParams *p;
+ Error *local_err = NULL;
+
+ p = &multiRDMA_send_state->params[id];
+ if (p->file != NULL) {
+ error_setg(&local_err,
+ "multiRDMA: send id '%d' already setup'", id);
+ return ;
+ }
+ p->file = f;
+
+ qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread,
+ p, QEMU_THREAD_JOINABLE);
+ atomic_inc(&multiRDMA_send_state->count);
+}
+
static int multiRDMA_save_setup(const char *host_port, Error **errp)
{
int thread_count;
uint8_t i;
+ int ret;
thread_count = migrate_multiRDMA_channels();
multiRDMA_send_state = g_malloc0(sizeof(*multiRDMA_send_state));
@@ -4207,6 +4299,27 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
for (i = 0; i < thread_count; i++) {
MultiRDMASendParams *p = &multiRDMA_send_state->params[i];
+ QEMUFile *f = NULL;
+
+ p->rdma = qemu_rdma_data_init(host_port, errp);
+ if (p->rdma == NULL) {
+ ERROR(errp, "init RDMA data failure for multi channel %d!", i);
+ goto err;
+ }
+
+ ret = qemu_rdma_source_init(p->rdma, migrate_use_rdma_pin_all(), errp);
+ if (ret) {
+ ERROR(errp, "init RDMA source failure for multi channel %d!", i);
+ goto err;
+ }
+
+ ret = qemu_rdma_connect(p->rdma, errp);
+ if (ret) {
+ ERROR(errp, "connect multi channel %d failure!", i);
+ goto err;
+ }
+
+ f = qemu_fopen_rdma(multiRDMA_send_state->params[i].rdma, "wb");
qemu_mutex_init(&p->mutex);
qemu_sem_init(&p->sem, 0);
p->quit = false;
@@ -4214,12 +4327,20 @@ static int multiRDMA_save_setup(const char *host_port, Error **errp)
p->running = true;
p->name = g_strdup_printf("multiRDMASend_%d", i);
- qemu_thread_create(&p->thread, p->name, multiRDMA_send_thread, p,
- QEMU_THREAD_JOINABLE);
- atomic_inc(&multiRDMA_send_state->count);
+ multiRDMA_send_new_channel(f, i);
}
return 0;
+
+err:
+ for (i = 0; i < thread_count; i++) {
+ g_free(multiRDMA_send_state->params[i].rdma);
+ }
+
+ g_free(multiRDMA_send_state->params);
+ g_free(multiRDMA_send_state);
+
+ return -1;
}
static void multiRDMA_send_terminate_threads(void)
@@ -4268,6 +4389,8 @@ int multiRDMA_save_cleanup(void)
qemu_sem_destroy(&p->sem);
g_free(p->name);
p->name = NULL;
+ qemu_rdma_cleanup(multiRDMA_send_state->params[i].rdma);
+ g_free(multiRDMA_send_state->params[i].rdma);
}
qemu_sem_destroy(&multiRDMA_send_state->sem_sync);
@@ -4292,6 +4415,71 @@ static void multiRDMA_recv_terminate_threads(void)
}
}
+static void qemu_multiRDMA_load_cleanup(RDMAContext *rdma)
+{
+ int idx;
+
+ if (rdma->cm_id && rdma->connected) {
+ if ((rdma->error_state ||
+ migrate_get_current()->state == MIGRATION_STATUS_CANCELLING) &&
+ !rdma->received_error) {
+ RDMAControlHeader head = { .len = 0,
+ .type = RDMA_CONTROL_ERROR,
+ .repeat = 1,
+ };
+ error_report("Early error. Sending error.");
+ qemu_rdma_post_send_control(rdma, NULL, &head);
+ }
+
+ rdma_disconnect(rdma->cm_id);
+ trace_qemu_rdma_cleanup_disconnect();
+ rdma->connected = false;
+ }
+
+ g_free(rdma->dest_blocks);
+ rdma->dest_blocks = NULL;
+
+ for (idx = 0; idx < RDMA_WRID_MAX; idx++) {
+ if (rdma->wr_data[idx].control_mr) {
+ rdma->total_registrations--;
+ ibv_dereg_mr(rdma->wr_data[idx].control_mr);
+ }
+ rdma->wr_data[idx].control_mr = NULL;
+ }
+
+ if (rdma->local_ram_blocks.block) {
+ while (rdma->local_ram_blocks.nb_blocks) {
+ rdma_delete_block(rdma, &rdma->local_ram_blocks.block[0]);
+ }
+ }
+
+ if (rdma->qp) {
+ rdma_destroy_qp(rdma->cm_id);
+ rdma->qp = NULL;
+ }
+ if (rdma->cq) {
+ ibv_destroy_cq(rdma->cq);
+ rdma->cq = NULL;
+ }
+ if (rdma->comp_channel) {
+ ibv_destroy_comp_channel(rdma->comp_channel);
+ rdma->comp_channel = NULL;
+ }
+ if (rdma->pd) {
+ ibv_dealloc_pd(rdma->pd);
+ rdma->pd = NULL;
+ }
+ if (rdma->cm_id) {
+ rdma_destroy_id(rdma->cm_id);
+ rdma->cm_id = NULL;
+ }
+
+ rdma->listen_id = NULL;
+ rdma->channel = NULL;
+ g_free(rdma->host);
+ rdma->host = NULL;
+}
+
int multiRDMA_load_cleanup(void)
{
int i;
@@ -4323,6 +4511,8 @@ int multiRDMA_load_cleanup(void)
qemu_sem_destroy(&p->sem);
g_free(p->name);
p->name = NULL;
+ qemu_multiRDMA_load_cleanup(multiRDMA_recv_state->params[i].rdma);
+ g_free(multiRDMA_recv_state->params[i].rdma);
}
qemu_sem_destroy(&multiRDMA_recv_state->sem_sync);
@@ -4386,15 +4576,18 @@ void rdma_start_outgoing_migration(void *opaque,
trace_rdma_start_outgoing_migration_after_rdma_connect();
+ s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
+ /* create multiRDMA channel */
if (migrate_use_multiRDMA()) {
if (multiRDMA_save_setup(host_port, errp) != 0) {
ERROR(errp, "init multiRDMA channels failure!");
goto err;
}
+ migrate_fd_connect(s, NULL);
+ } else {
+ migrate_fd_connect(s, NULL);
}
- s->to_dst_file = qemu_fopen_rdma(rdma, "wb");
- migrate_fd_connect(s, NULL);
return;
err:
g_free(rdma);
--
2.19.1
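
One design point worth calling out from the qemu_rdma_accept() hunk
above: the per-channel RDMAContexts share the listener's CM event
channel and listen_id, and each incoming connect is handed to the first
channel whose cm_id is still unset; once all channels are connected, the
handler falls back to rdma_cm_poll_handler. A rough standalone sketch of
that selection step (Channel and next_unconnected are illustrative
names, not the patch's types):

    #include <stddef.h>
    #include <stdio.h>

    /* Stand-in for the slice of RDMAContext that the dispatch looks at. */
    typedef struct {
        void *cm_id;   /* NULL until this channel has accepted a connect */
    } Channel;

    /* Mirrors the loop added to qemu_rdma_accept(): pick the first channel
     * that has no cm_id yet; NULL means all channels are connected and the
     * listener falls back to the normal poll handler. */
    static Channel *next_unconnected(Channel *channels, int n)
    {
        for (int i = 0; i < n; i++) {
            if (channels[i].cm_id == NULL) {
                return &channels[i];
            }
        }
        return NULL;
    }

    int main(void)
    {
        int dummy;
        Channel chans[3] = { { &dummy }, { NULL }, { NULL } };
        Channel *next = next_unconnected(chans, 3);

        printf("next unconnected channel: %td\n", next - chans); /* 1 */
        return 0;
    }
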