[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [RFC PATCH v3 11/30] migration/multifd: Allow multifd without packet
From: |
Fabiano Rosas |
Subject: |
Re: [RFC PATCH v3 11/30] migration/multifd: Allow multifd without packets |
Date: |
Mon, 15 Jan 2024 15:39:29 -0300 |
Peter Xu <peterx@redhat.com> writes:
> On Mon, Nov 27, 2023 at 05:25:53PM -0300, Fabiano Rosas wrote:
>> For the upcoming support to the new 'fixed-ram' migration stream
>> format, we cannot use multifd packets because each write into the
>> ramblock section in the migration file is expected to contain only the
>> guest pages. They are written at their respective offsets relative to
>> the ramblock section header.
>>
>> There is no space for the packet information and the expected gains
>> from the new approach come partly from being able to write the pages
>> sequentially without extraneous data in between.
>>
>> The new format also doesn't need the packets and all necessary
>> information can be taken from the standard migration headers with some
>> (future) changes to multifd code.
>>
>> Use the presence of the fixed-ram capability to decide whether to send
>> packets. For now this has no effect as fixed-ram cannot yet be enabled
>> with multifd.
>>
>> Signed-off-by: Fabiano Rosas <farosas@suse.de>
>> ---
>> - moved more of the packet code under use_packets
>> ---
>> migration/multifd.c | 138 +++++++++++++++++++++++++++-----------------
>> migration/options.c | 5 ++
>> migration/options.h | 1 +
>> 3 files changed, 91 insertions(+), 53 deletions(-)
>>
>> diff --git a/migration/multifd.c b/migration/multifd.c
>> index ec58c58082..9625640d61 100644
>> --- a/migration/multifd.c
>> +++ b/migration/multifd.c
>> @@ -654,18 +654,22 @@ static void *multifd_send_thread(void *opaque)
>> Error *local_err = NULL;
>> int ret = 0;
>> bool use_zero_copy_send = migrate_zero_copy_send();
>> + bool use_packets = migrate_multifd_packets();
>>
>> thread = migration_threads_add(p->name, qemu_get_thread_id());
>>
>> trace_multifd_send_thread_start(p->id);
>> rcu_register_thread();
>>
>> - if (multifd_send_initial_packet(p, &local_err) < 0) {
>> - ret = -1;
>> - goto out;
>> + if (use_packets) {
>> + if (multifd_send_initial_packet(p, &local_err) < 0) {
>> + ret = -1;
>> + goto out;
>> + }
>> +
>> + /* initial packet */
>> + p->num_packets = 1;
>> }
>> - /* initial packet */
>> - p->num_packets = 1;
>>
>> while (true) {
>> qemu_sem_post(&multifd_send_state->channels_ready);
>> @@ -677,11 +681,10 @@ static void *multifd_send_thread(void *opaque)
>> qemu_mutex_lock(&p->mutex);
>>
>> if (p->pending_job) {
>> - uint64_t packet_num = p->packet_num;
>> uint32_t flags;
>> p->normal_num = 0;
>>
>> - if (use_zero_copy_send) {
>> + if (!use_packets || use_zero_copy_send) {
>> p->iovs_num = 0;
>> } else {
>> p->iovs_num = 1;
>> @@ -699,16 +702,20 @@ static void *multifd_send_thread(void *opaque)
>> break;
>> }
>> }
>> - multifd_send_fill_packet(p);
>> +
>> + if (use_packets) {
>> + multifd_send_fill_packet(p);
>> + p->num_packets++;
>> + }
>> +
>> flags = p->flags;
>> p->flags = 0;
>> - p->num_packets++;
>> p->total_normal_pages += p->normal_num;
>> p->pages->num = 0;
>> p->pages->block = NULL;
>> qemu_mutex_unlock(&p->mutex);
>>
>> - trace_multifd_send(p->id, packet_num, p->normal_num, flags,
>> + trace_multifd_send(p->id, p->packet_num, p->normal_num, flags,
>> p->next_packet_size);
>>
>> if (use_zero_copy_send) {
>> @@ -718,7 +725,7 @@ static void *multifd_send_thread(void *opaque)
>> if (ret != 0) {
>> break;
>> }
>> - } else {
>> + } else if (use_packets) {
>> /* Send header using the same writev call */
>> p->iov[0].iov_len = p->packet_len;
>> p->iov[0].iov_base = p->packet;
>> @@ -904,6 +911,7 @@ int multifd_save_setup(Error **errp)
>> {
>> int thread_count;
>> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>> + bool use_packets = migrate_multifd_packets();
>> uint8_t i;
>>
>> if (!migrate_multifd()) {
>> @@ -928,14 +936,20 @@ int multifd_save_setup(Error **errp)
>> p->pending_job = 0;
>> p->id = i;
>> p->pages = multifd_pages_init(page_count);
>> - p->packet_len = sizeof(MultiFDPacket_t)
>> - + sizeof(uint64_t) * page_count;
>> - p->packet = g_malloc0(p->packet_len);
>> - p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>> - p->packet->version = cpu_to_be32(MULTIFD_VERSION);
>> +
>> + if (use_packets) {
>> + p->packet_len = sizeof(MultiFDPacket_t)
>> + + sizeof(uint64_t) * page_count;
>> + p->packet = g_malloc0(p->packet_len);
>> + p->packet->magic = cpu_to_be32(MULTIFD_MAGIC);
>> + p->packet->version = cpu_to_be32(MULTIFD_VERSION);
>> +
>> + /* We need one extra place for the packet header */
>> + p->iov = g_new0(struct iovec, page_count + 1);
>> + } else {
>> + p->iov = g_new0(struct iovec, page_count);
>> + }
>> p->name = g_strdup_printf("multifdsend_%d", i);
>> - /* We need one extra place for the packet header */
>> - p->iov = g_new0(struct iovec, page_count + 1);
>> p->normal = g_new0(ram_addr_t, page_count);
>> p->page_size = qemu_target_page_size();
>> p->page_count = page_count;
>> @@ -1067,7 +1081,7 @@ void multifd_recv_sync_main(void)
>> {
>> int i;
>>
>> - if (!migrate_multifd()) {
>> + if (!migrate_multifd() || !migrate_multifd_packets()) {
>> return;
>> }
>> for (i = 0; i < migrate_multifd_channels(); i++) {
>
> This noops the recv sync when use_packets=1, makes sense.
>
> How about multifd_send_sync_main()? Should we do the same?
>
It seems it got lost during rebase.
>> @@ -1094,38 +1108,44 @@ static void *multifd_recv_thread(void *opaque)
>> {
>> MultiFDRecvParams *p = opaque;
>> Error *local_err = NULL;
>> + bool use_packets = migrate_multifd_packets();
>> int ret;
>>
>> trace_multifd_recv_thread_start(p->id);
>> rcu_register_thread();
>>
>> while (true) {
>> - uint32_t flags;
>> + uint32_t flags = 0;
>> + p->normal_num = 0;
>>
>> if (p->quit) {
>> break;
>> }
>>
>> - ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>> - p->packet_len, &local_err);
>> - if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
>> - break;
>> - }
>> + if (use_packets) {
>> + ret = qio_channel_read_all_eof(p->c, (void *)p->packet,
>> + p->packet_len, &local_err);
>> + if (ret == 0 || ret == -1) { /* 0: EOF -1: Error */
>> + break;
>> + }
>> +
>> + qemu_mutex_lock(&p->mutex);
>> + ret = multifd_recv_unfill_packet(p, &local_err);
>> + if (ret) {
>> + qemu_mutex_unlock(&p->mutex);
>> + break;
>> + }
>> + p->num_packets++;
>> +
>> + flags = p->flags;
>> + /* recv methods don't know how to handle the SYNC flag */
>> + p->flags &= ~MULTIFD_FLAG_SYNC;
>> + trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
>> + p->next_packet_size);
>>
>> - qemu_mutex_lock(&p->mutex);
>> - ret = multifd_recv_unfill_packet(p, &local_err);
>> - if (ret) {
>> - qemu_mutex_unlock(&p->mutex);
>> - break;
>> + p->total_normal_pages += p->normal_num;
>> }
>>
>> - flags = p->flags;
>> - /* recv methods don't know how to handle the SYNC flag */
>> - p->flags &= ~MULTIFD_FLAG_SYNC;
>> - trace_multifd_recv(p->id, p->packet_num, p->normal_num, flags,
>> - p->next_packet_size);
>> - p->num_packets++;
>> - p->total_normal_pages += p->normal_num;
>> qemu_mutex_unlock(&p->mutex);
>>
>> if (p->normal_num) {
>> @@ -1135,7 +1155,7 @@ static void *multifd_recv_thread(void *opaque)
>> }
>> }
>>
>> - if (flags & MULTIFD_FLAG_SYNC) {
>> + if (use_packets && (flags & MULTIFD_FLAG_SYNC)) {
>> qemu_sem_post(&multifd_recv_state->sem_sync);
>> qemu_sem_wait(&p->sem_sync);
>> }
>> @@ -1159,6 +1179,7 @@ int multifd_load_setup(Error **errp)
>> {
>> int thread_count;
>> uint32_t page_count = MULTIFD_PACKET_SIZE / qemu_target_page_size();
>> + bool use_packets = migrate_multifd_packets();
>> uint8_t i;
>>
>> /*
>> @@ -1183,9 +1204,12 @@ int multifd_load_setup(Error **errp)
>> qemu_sem_init(&p->sem_sync, 0);
>> p->quit = false;
>> p->id = i;
>> - p->packet_len = sizeof(MultiFDPacket_t)
>> - + sizeof(uint64_t) * page_count;
>> - p->packet = g_malloc0(p->packet_len);
>> +
>> + if (use_packets) {
>> + p->packet_len = sizeof(MultiFDPacket_t)
>> + + sizeof(uint64_t) * page_count;
>> + p->packet = g_malloc0(p->packet_len);
>> + }
>> p->name = g_strdup_printf("multifdrecv_%d", i);
>> p->iov = g_new0(struct iovec, page_count);
>> p->normal = g_new0(ram_addr_t, page_count);
>> @@ -1231,18 +1255,27 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error
>> **errp)
>> {
>> MultiFDRecvParams *p;
>> Error *local_err = NULL;
>> - int id;
>> + bool use_packets = migrate_multifd_packets();
>> + int id, num_packets = 0;
>>
>> - id = multifd_recv_initial_packet(ioc, &local_err);
>> - if (id < 0) {
>> - multifd_recv_terminate_threads(local_err);
>> - error_propagate_prepend(errp, local_err,
>> - "failed to receive packet"
>> - " via multifd channel %d: ",
>> - qatomic_read(&multifd_recv_state->count));
>> - return;
>> + if (use_packets) {
>> + id = multifd_recv_initial_packet(ioc, &local_err);
>> + if (id < 0) {
>> + multifd_recv_terminate_threads(local_err);
>> + error_propagate_prepend(errp, local_err,
>> + "failed to receive packet"
>> + " via multifd channel %d: ",
>> +
>> qatomic_read(&multifd_recv_state->count));
>> + return;
>> + }
>> + trace_multifd_recv_new_channel(id);
>> +
>> + /* initial packet */
>> + num_packets = 1;
>> + } else {
>> + /* next patch gives this a meaningful value */
>> + id = 0;
>> }
>> - trace_multifd_recv_new_channel(id);
>>
>> p = &multifd_recv_state->params[id];
>> if (p->c != NULL) {
>> @@ -1253,9 +1286,8 @@ void multifd_recv_new_channel(QIOChannel *ioc, Error
>> **errp)
>> return;
>> }
>> p->c = ioc;
>> + p->num_packets = num_packets;
>> object_ref(OBJECT(ioc));
>> - /* initial packet */
>> - p->num_packets = 1;
>>
>> p->running = true;
>> qemu_thread_create(&p->thread, p->name, multifd_recv_thread, p,
>> diff --git a/migration/options.c b/migration/options.c
>> index 775428a8a5..10730b13ba 100644
>> --- a/migration/options.c
>> +++ b/migration/options.c
>> @@ -385,6 +385,11 @@ bool migrate_multifd_flush_after_each_section(void)
>> return s->multifd_flush_after_each_section;
>> }
>>
>> +bool migrate_multifd_packets(void)
>
> Maybe multifd_use_packets()? Dropping the migrate_ prefix as this is not a
> global API but multifd-only. Then if multifd_packets() reads too weird and
> unclear, "add" makes it clear.
>
We removed all the instances of migrate_use_* from the migration code
recently. Not sure we should introduce them back, it seems like a step
back.
We're setting 'use_packets = migrate_multifd_packets()' in most places,
so I guess 'use_packets = multifd_packets()' wouldn't be too bad.
>> +{
>> + return !migrate_fixed_ram();
>> +}
>> +
>> bool migrate_postcopy(void)
>> {
>> return migrate_postcopy_ram() || migrate_dirty_bitmaps();
>> diff --git a/migration/options.h b/migration/options.h
>> index 8680a10b79..8a19d6939c 100644
>> --- a/migration/options.h
>> +++ b/migration/options.h
>> @@ -56,6 +56,7 @@ bool migrate_zero_copy_send(void);
>> */
>>
>> bool migrate_multifd_flush_after_each_section(void);
>> +bool migrate_multifd_packets(void);
>> bool migrate_postcopy(void);
>> bool migrate_rdma(void);
>> bool migrate_tls(void);
>> --
>> 2.35.3
>>