[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v5 6/7] migration/multifd: implement qpl compression and decompre
From: |
Yuan Liu |
Subject: |
[PATCH v5 6/7] migration/multifd: implement qpl compression and decompression |
Date: |
Wed, 20 Mar 2024 00:45:26 +0800 |
each qpl job is used to (de)compress a normal page and it can
be processed independently by the IAA hardware. All qpl jobs
are submitted to the hardware at once, and wait for all jobs
completion.
Signed-off-by: Yuan Liu <yuan1.liu@intel.com>
Reviewed-by: Nanhai Zou <nanhai.zou@intel.com>
---
migration/multifd-qpl.c | 229 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 225 insertions(+), 4 deletions(-)
diff --git a/migration/multifd-qpl.c b/migration/multifd-qpl.c
index 6de65e9da7..479b051b24 100644
--- a/migration/multifd-qpl.c
+++ b/migration/multifd-qpl.c
@@ -13,6 +13,7 @@
#include "qemu/osdep.h"
#include "qemu/module.h"
#include "qapi/error.h"
+#include "exec/ramblock.h"
#include "migration.h"
#include "multifd.h"
#include "qpl/qpl.h"
@@ -171,6 +172,112 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error
**errp)
p->compress_data = NULL;
}
+static inline void prepare_job(qpl_job *job, uint8_t *input, uint32_t
input_len,
+ uint8_t *output, uint32_t output_len,
+ bool is_compression)
+{
+ job->op = is_compression ? qpl_op_compress : qpl_op_decompress;
+ job->next_in_ptr = input;
+ job->next_out_ptr = output;
+ job->available_in = input_len;
+ job->available_out = output_len;
+ job->flags = QPL_FLAG_FIRST | QPL_FLAG_LAST | QPL_FLAG_OMIT_VERIFY;
+ /* only supports one compression level */
+ job->level = 1;
+}
+
+/**
+ * set_raw_data_hdr: set the length of raw data
+ *
+ * If the length of the compressed output data is greater than or equal to
+ * the page size, then set the compressed data length to the data size and
+ * send raw data directly.
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the compression job header
+ */
+static inline void set_raw_data_hdr(QplData *qpl, uint32_t index)
+{
+ assert(index < qpl->job_num);
+ qpl->zbuf_hdr[index] = cpu_to_be32(qpl->data_size);
+}
+
+/**
+ * is_raw_data: check if the data is raw data
+ *
+ * The raw data length is always equal to data size, which is the
+ * size of one page.
+ *
+ * Returns true if the data is raw data, otherwise false
+ *
+ * @qpl: pointer to the QplData structure
+ * @index: the index of the decompressed job header
+ */
+static inline bool is_raw_data(QplData *qpl, uint32_t index)
+{
+ assert(index < qpl->job_num);
+ return qpl->zbuf_hdr[index] == qpl->data_size;
+}
+
+static int run_comp_jobs(MultiFDSendParams *p, Error **errp)
+{
+ qpl_status status;
+ QplData *qpl = p->compress_data;
+ MultiFDPages_t *pages = p->pages;
+ uint32_t job_num = pages->normal_num;
+ qpl_job *job = NULL;
+ uint32_t off = 0;
+
+ assert(job_num <= qpl->job_num);
+ /* submit all compression jobs */
+ for (int i = 0; i < job_num; i++) {
+ job = qpl->job_array[i];
+ /* the compressed data size should be less than one page */
+ prepare_job(job, pages->block->host + pages->offset[i], qpl->data_size,
+ qpl->zbuf + off, qpl->data_size - 1, true);
+retry:
+ status = qpl_submit_job(job);
+ if (status == QPL_STS_OK) {
+ off += qpl->data_size;
+ } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+ goto retry;
+ } else {
+ error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+ p->id, status);
+ return -1;
+ }
+ }
+
+ /* wait all jobs to complete */
+ for (int i = 0; i < job_num; i++) {
+ job = qpl->job_array[i];
+ status = qpl_wait_job(job);
+ if (status == QPL_STS_OK) {
+ qpl->zbuf_hdr[i] = cpu_to_be32(job->total_out);
+ p->iov[p->iovs_num].iov_len = job->total_out;
+ p->iov[p->iovs_num].iov_base = qpl->zbuf + (qpl->data_size * i);
+ p->next_packet_size += job->total_out;
+ } else if (status == QPL_STS_MORE_OUTPUT_NEEDED) {
+ /*
+ * the compression job does not fail, the output data
+ * size is larger than the provided memory size. In this
+ * case, raw data is sent directly to the destination.
+ */
+ set_raw_data_hdr(qpl, i);
+ p->iov[p->iovs_num].iov_len = qpl->data_size;
+ p->iov[p->iovs_num].iov_base = pages->block->host +
+ pages->offset[i];
+ p->next_packet_size += qpl->data_size;
+ } else {
+ error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+ p->id, status);
+ return -1;
+ }
+ p->iovs_num++;
+ }
+ return 0;
+}
+
/**
* qpl_send_prepare: prepare data to be able to send
*
@@ -184,8 +291,28 @@ static void qpl_send_cleanup(MultiFDSendParams *p, Error
**errp)
*/
static int qpl_send_prepare(MultiFDSendParams *p, Error **errp)
{
- /* Implement in next patch */
- return -1;
+ QplData *qpl = p->compress_data;
+ uint32_t hdr_size;
+
+ if (!multifd_send_prepare_common(p)) {
+ goto out;
+ }
+
+ assert(p->pages->normal_num <= qpl->job_num);
+ hdr_size = p->pages->normal_num * sizeof(uint32_t);
+ /* prepare the header that stores the lengths of all compressed data */
+ p->iov[1].iov_base = (uint8_t *) qpl->zbuf_hdr;
+ p->iov[1].iov_len = hdr_size;
+ p->iovs_num++;
+ p->next_packet_size += hdr_size;
+ if (run_comp_jobs(p, errp) != 0) {
+ return -1;
+ }
+
+out:
+ p->flags |= MULTIFD_FLAG_QPL;
+ multifd_send_fill_packet(p);
+ return 0;
}
/**
@@ -227,6 +354,60 @@ static void qpl_recv_cleanup(MultiFDRecvParams *p)
p->compress_data = NULL;
}
+static int run_decomp_jobs(MultiFDRecvParams *p, Error **errp)
+{
+ qpl_status status;
+ qpl_job *job;
+ QplData *qpl = p->compress_data;
+ uint32_t off = 0;
+ uint32_t job_num = p->normal_num;
+
+ assert(job_num <= qpl->job_num);
+ /* submit all decompression jobs */
+ for (int i = 0; i < job_num; i++) {
+ /* for the raw data, load it directly */
+ if (is_raw_data(qpl, i)) {
+ memcpy(p->host + p->normal[i], qpl->zbuf + off, qpl->data_size);
+ off += qpl->data_size;
+ continue;
+ }
+ job = qpl->job_array[i];
+ prepare_job(job, qpl->zbuf + off, qpl->zbuf_hdr[i],
+ p->host + p->normal[i], qpl->data_size, false);
+retry:
+ status = qpl_submit_job(job);
+ if (status == QPL_STS_OK) {
+ off += qpl->zbuf_hdr[i];
+ } else if (status == QPL_STS_QUEUES_ARE_BUSY_ERR) {
+ goto retry;
+ } else {
+ error_setg(errp, "multifd %u: qpl_submit_job failed with error %d",
+ p->id, status);
+ return -1;
+ }
+ }
+
+ /* wait all jobs to complete */
+ for (int i = 0; i < job_num; i++) {
+ if (is_raw_data(qpl, i)) {
+ continue;
+ }
+ job = qpl->job_array[i];
+ status = qpl_wait_job(job);
+ if (status != QPL_STS_OK) {
+ error_setg(errp, "multifd %u: qpl_wait_job failed with error %d",
+ p->id, status);
+ return -1;
+ }
+ if (job->total_out != qpl->data_size) {
+ error_setg(errp, "multifd %u: decompressed len %u, expected len
%u",
+ p->id, job->total_out, qpl->data_size);
+ return -1;
+ }
+ }
+ return 0;
+}
+
/**
* qpl_recv: read the data from the channel into actual pages
*
@@ -240,8 +421,48 @@ static void qpl_recv_cleanup(MultiFDRecvParams *p)
*/
static int qpl_recv(MultiFDRecvParams *p, Error **errp)
{
- /* Implement in next patch */
- return -1;
+ QplData *qpl = p->compress_data;
+ uint32_t in_size = p->next_packet_size;
+ uint32_t flags = p->flags & MULTIFD_FLAG_COMPRESSION_MASK;
+ uint32_t hdr_len = p->normal_num * sizeof(uint32_t);
+ uint32_t data_len = 0;
+ int ret;
+
+ if (flags != MULTIFD_FLAG_QPL) {
+ error_setg(errp, "multifd %u: flags received %x flags expected %x",
+ p->id, flags, MULTIFD_FLAG_QPL);
+ return -1;
+ }
+ multifd_recv_zero_page_process(p);
+ if (!p->normal_num) {
+ assert(in_size == 0);
+ return 0;
+ }
+
+ /* read compressed data lengths */
+ assert(hdr_len < in_size);
+ ret = qio_channel_read_all(p->c, (void *) qpl->zbuf_hdr, hdr_len, errp);
+ if (ret != 0) {
+ return ret;
+ }
+ assert(p->normal_num <= qpl->job_num);
+ for (int i = 0; i < p->normal_num; i++) {
+ qpl->zbuf_hdr[i] = be32_to_cpu(qpl->zbuf_hdr[i]);
+ data_len += qpl->zbuf_hdr[i];
+ assert(qpl->zbuf_hdr[i] <= qpl->data_size);
+ }
+
+ /* read compressed data */
+ assert(in_size == hdr_len + data_len);
+ ret = qio_channel_read_all(p->c, (void *) qpl->zbuf, data_len, errp);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (run_decomp_jobs(p, errp) != 0) {
+ return -1;
+ }
+ return 0;
}
static MultiFDMethods multifd_qpl_ops = {
--
2.39.3
Re: [PATCH v5 0/7] Live Migration With IAA, Peter Xu, 2024/03/26