[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH v4 04/33] thread-pool: Implement generic (non-AIO) pool support
From: Maciej S. Szmigiero
Subject: [PATCH v4 04/33] thread-pool: Implement generic (non-AIO) pool support
Date: Thu, 30 Jan 2025 11:08:25 +0100
From: "Maciej S. Szmigiero" <maciej.szmigiero@oracle.com>
Migration code wants to manage device data sending threads in one place.
QEMU has an existing thread pool implementation, however it is limited
to queuing AIO operations only and essentially has a 1:1 mapping between
the current AioContext and the AIO ThreadPool in use.
Implement generic (non-AIO) ThreadPool by essentially wrapping Glib's
GThreadPool.
This brings a few new operations on a pool:
* thread_pool_wait() operation waits until all the submitted work requests
have finished.
* thread_pool_set_max_threads() explicitly sets the maximum thread count
in the pool.
* thread_pool_adjust_max_threads_to_work() adjusts the maximum thread count
in the pool to equal the number of work items still waiting in the queue or unfinished.
Reviewed-by: Fabiano Rosas <farosas@suse.de>
Reviewed-by: Peter Xu <peterx@redhat.com>
Signed-off-by: Maciej S. Szmigiero <maciej.szmigiero@oracle.com>
---
include/block/thread-pool.h | 51 ++++++++++++++++
util/thread-pool.c | 119 ++++++++++++++++++++++++++++++++++++
2 files changed, 170 insertions(+)
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
index 6f27eb085b45..dd48cf07e85f 100644
--- a/include/block/thread-pool.h
+++ b/include/block/thread-pool.h
@@ -38,5 +38,56 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPoolFunc *func,
void *arg,
int coroutine_fn thread_pool_submit_co(ThreadPoolFunc *func, void *arg);
void thread_pool_update_params(ThreadPoolAio *pool, struct AioContext *ctx);
+/* ------------------------------------------- */
+/* Generic thread pool types and methods below */
+typedef struct ThreadPool ThreadPool;
+
+/* Create a new thread pool. Never returns NULL. */
+ThreadPool *thread_pool_new(void);
+
+/*
+ * Free the thread pool.
+ * Waits for all the previously submitted work to complete before performing
+ * the actual freeing operation.
+ */
+void thread_pool_free(ThreadPool *pool);
+
+/*
+ * Submit a new work (task) for the pool.
+ *
+ * @opaque_destroy is an optional GDestroyNotify for the @opaque argument
+ * to the work function at @func.
+ */
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+ void *opaque, GDestroyNotify opaque_destroy);
+
+/*
+ * Submit a new work (task) for the pool, making sure it starts getting
+ * processed immediately, launching a new thread for it if necessary.
+ *
+ * @opaque_destroy is an optional GDestroyNotify for the @opaque argument
+ * to the work function at @func.
+ */
+void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
+ void *opaque, GDestroyNotify opaque_destroy);
+
+/*
+ * Wait for all previously submitted work to complete before returning.
+ *
+ * Can be used as a barrier between two sets of tasks executed on a thread
+ * pool without destroying it or in a performance sensitive path where the
+ * caller just wants to wait for all tasks to complete while deferring the
+ * pool free operation for later, less performance sensitive time.
+ */
+void thread_pool_wait(ThreadPool *pool);
+
+/* Set the maximum number of threads in the pool. */
+bool thread_pool_set_max_threads(ThreadPool *pool, int max_threads);
+
+/*
+ * Adjust the maximum number of threads in the pool to give each task its
+ * own thread (exactly one thread per task).
+ */
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool);
#endif
diff --git a/util/thread-pool.c b/util/thread-pool.c
index 908194dc070f..d2ead6b72857 100644
--- a/util/thread-pool.c
+++ b/util/thread-pool.c
@@ -374,3 +374,122 @@ void thread_pool_free_aio(ThreadPoolAio *pool)
qemu_mutex_destroy(&pool->lock);
g_free(pool);
}
+
+/*
+ * Generic (non-AIO) thread pool: a thin wrapper around GLib's GThreadPool
+ * plus book-keeping of in-flight work so callers can wait for completion.
+ */
+struct ThreadPool {
+ GThreadPool *t;
+ size_t cur_work; /* work items submitted but not yet finished */
+ QemuMutex cur_work_lock; /* protects cur_work */
+ QemuCond all_finished_cond; /* signalled when cur_work drops to zero */
+};
+
+/* One queued work item: the function to run and its opaque argument. */
+typedef struct {
+ ThreadPoolFunc *func;
+ void *opaque;
+ GDestroyNotify opaque_destroy; /* optional destructor for @opaque */
+} ThreadPoolElement;
+
+/*
+ * Worker callback run by GLib for each queued ThreadPoolElement.
+ *
+ * Executes the work function, destroys its argument if a destructor was
+ * provided, then decrements the pool's in-flight counter, waking any
+ * thread_pool_wait() waiter once the counter reaches zero.
+ * The element itself is freed on return via g_autofree.
+ */
+static void thread_pool_func(gpointer data, gpointer user_data)
+{
+ ThreadPool *pool = user_data;
+ g_autofree ThreadPoolElement *el = data;
+
+ el->func(el->opaque);
+
+ if (el->opaque_destroy) {
+ el->opaque_destroy(el->opaque);
+ }
+
+ /* Held until the end of the function, covering the cond signal too */
+ QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+ assert(pool->cur_work > 0);
+ pool->cur_work--;
+
+ if (pool->cur_work == 0) {
+ qemu_cond_signal(&pool->all_finished_cond);
+ }
+}
+
+/*
+ * Create a new generic thread pool. Never returns NULL.
+ *
+ * NOTE(review): the pool is created exclusive (TRUE) with max_threads = 0,
+ * so per GLib semantics no worker threads exist until the limit is raised
+ * via thread_pool_set_max_threads() / thread_pool_adjust_max_threads_to_work()
+ * — confirm callers always do one of these after plain thread_pool_submit().
+ */
+ThreadPool *thread_pool_new(void)
+{
+ ThreadPool *pool = g_new(ThreadPool, 1);
+
+ pool->cur_work = 0;
+ qemu_mutex_init(&pool->cur_work_lock);
+ qemu_cond_init(&pool->all_finished_cond);
+
+ pool->t = g_thread_pool_new(thread_pool_func, pool, 0, TRUE, NULL);
+ /*
+ * g_thread_pool_new() can only return errors if initial thread(s)
+ * creation fails but we ask for 0 initial threads above.
+ */
+ assert(pool->t);
+
+ return pool;
+}
+
+/*
+ * Free the thread pool and all its resources.
+ *
+ * Waits for all previously submitted work to complete before freeing:
+ * g_thread_pool_free() is called with immediate = FALSE and wait_ = TRUE.
+ */
+void thread_pool_free(ThreadPool *pool)
+{
+ /*
+ * With _wait = TRUE this effectively waits for all
+ * previously submitted work to complete first.
+ */
+ g_thread_pool_free(pool->t, FALSE, TRUE);
+
+ qemu_cond_destroy(&pool->all_finished_cond);
+ qemu_mutex_destroy(&pool->cur_work_lock);
+
+ g_free(pool);
+}
+
+/*
+ * Submit a new work item (task) to the pool.
+ *
+ * @opaque_destroy is an optional GDestroyNotify for @opaque, run by the
+ * worker after @func returns.
+ *
+ * The in-flight counter is incremented *before* the push so that a
+ * concurrent thread_pool_wait() can never observe the queued-but-uncounted
+ * window and return early.
+ */
+void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func,
+ void *opaque, GDestroyNotify opaque_destroy)
+{
+ ThreadPoolElement *el = g_new(ThreadPoolElement, 1);
+
+ el->func = func;
+ el->opaque = opaque;
+ el->opaque_destroy = opaque_destroy;
+
+ WITH_QEMU_LOCK_GUARD(&pool->cur_work_lock) {
+ pool->cur_work++;
+ }
+
+ /*
+ * Ignore the return value since this function can only return errors
+ * if creation of an additional thread fails but even in this case the
+ * provided work is still getting queued (just for the existing threads).
+ */
+ g_thread_pool_push(pool->t, el, NULL);
+}
+
+/*
+ * Submit a new work item and make sure it starts getting processed
+ * immediately: after queuing it, the pool's thread limit is raised to
+ * match the outstanding work count, launching a new thread if necessary.
+ */
+void thread_pool_submit_immediate(ThreadPool *pool, ThreadPoolFunc *func,
+ void *opaque, GDestroyNotify opaque_destroy)
+{
+ thread_pool_submit(pool, func, opaque, opaque_destroy);
+ thread_pool_adjust_max_threads_to_work(pool);
+}
+
+/*
+ * Block until all previously submitted work has completed.
+ *
+ * Sleeps on all_finished_cond, which the last finishing worker signals
+ * when the in-flight counter reaches zero (see thread_pool_func()).
+ */
+void thread_pool_wait(ThreadPool *pool)
+{
+ QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+ /* Loop to cope with spurious wakeups of the condition variable */
+ while (pool->cur_work > 0) {
+ qemu_cond_wait(&pool->all_finished_cond,
+ &pool->cur_work_lock);
+ }
+}
+
+/*
+ * Set the maximum number of threads in the pool.
+ *
+ * @max_threads must be positive (asserted).
+ * Returns the result of g_thread_pool_set_max_threads(): TRUE on success.
+ */
+bool thread_pool_set_max_threads(ThreadPool *pool,
+ int max_threads)
+{
+ assert(max_threads > 0);
+
+ return g_thread_pool_set_max_threads(pool->t, max_threads, NULL);
+}
+
+/*
+ * Adjust the pool's maximum thread count to equal the number of work items
+ * still queued or unfinished (one thread per outstanding task).
+ *
+ * NOTE(review): if called with no outstanding work, cur_work == 0 trips the
+ * max_threads > 0 assertion in thread_pool_set_max_threads() — confirm all
+ * callers submit work first (thread_pool_submit_immediate() does).
+ * NOTE(review): size_t cur_work is implicitly narrowed to int here.
+ */
+bool thread_pool_adjust_max_threads_to_work(ThreadPool *pool)
+{
+ QEMU_LOCK_GUARD(&pool->cur_work_lock);
+
+ return thread_pool_set_max_threads(pool, pool->cur_work);
+}
- [PATCH v4 00/33] Multifd 🔀 device state transfer support with VFIO consumer, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 01/33] migration: Clarify that {load, save}_cleanup handlers can run without setup, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 03/33] thread-pool: Rename AIO pool functions to *_aio() and data types to *Aio, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 02/33] thread-pool: Remove thread_pool_submit() function, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 04/33] thread-pool: Implement generic (non-AIO) pool support,
Maciej S. Szmigiero <=
- [PATCH v4 05/33] migration: Add MIG_CMD_SWITCHOVER_START and its load handler, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 06/33] migration: Add qemu_loadvm_load_state_buffer() and its handler, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 07/33] io: tls: Allow terminating the TLS session gracefully with EOF, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 08/33] migration/multifd: Allow premature EOF on TLS incoming channels, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 09/33] migration: postcopy_ram_listen_thread() needs to take BQL for some calls, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 10/33] error: define g_autoptr() cleanup function for the Error type, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 11/33] migration: Add thread pool of optional load threads, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 12/33] migration/multifd: Split packet into header and RAM data, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 13/33] migration/multifd: Device state transfer support - receive side, Maciej S. Szmigiero, 2025/01/30
- [PATCH v4 14/33] migration/multifd: Make multifd_send() thread safe, Maciej S. Szmigiero, 2025/01/30