From: Nicolas Saenz Julienne
Subject: Re: [PATCH v2 2/3] thread-pool: replace semaphore with condition variable
Date: Fri, 13 May 2022 13:56:39 +0200
User-agent: Evolution 3.44.1 (3.44.1-1.fc36)
Hi Paolo,
On Thu, 2022-05-12 at 12:43 +0200, Paolo Bonzini wrote:
[...]
> diff --git a/util/thread-pool.c b/util/thread-pool.c
> index 4979f30ca3..da189d9338 100644
> --- a/util/thread-pool.c
> +++ b/util/thread-pool.c
> @@ -57,7 +57,7 @@ struct ThreadPool {
> QEMUBH *completion_bh;
> QemuMutex lock;
> QemuCond worker_stopped;
> - QemuSemaphore sem;
> + QemuCond request_cond;
> QEMUBH *new_thread_bh;
>
> /* The following variables are only accessed from one AioContext. */
> @@ -74,23 +74,6 @@ struct ThreadPool {
> int max_threads;
> };
>
> -static inline bool back_to_sleep(ThreadPool *pool, int ret)
> -{
> - /*
> - * The semaphore timed out, we should exit the loop except when:
> - * - There is work to do, we raced with the signal.
> - * - The max threads threshold just changed, we raced with the signal.
> - * - The thread pool forces a minimum number of readily available threads.
> - */
> - if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
> - pool->cur_threads > pool->max_threads ||
> - pool->cur_threads <= pool->min_threads)) {
> - return true;
> - }
> -
> - return false;
> -}
> -
> static void *worker_thread(void *opaque)
> {
> ThreadPool *pool = opaque;
> @@ -99,20 +82,27 @@ static void *worker_thread(void *opaque)
> pool->pending_threads--;
> do_spawn_thread(pool);
>
> - while (!pool->stopping) {
> + while (!pool->stopping && pool->cur_threads <= pool->max_threads) {
> ThreadPoolElement *req;
> int ret;
>
> - do {
> + if (QTAILQ_EMPTY(&pool->request_list)) {
> pool->idle_threads++;
> - qemu_mutex_unlock(&pool->lock);
> - ret = qemu_sem_timedwait(&pool->sem, 10000);
> - qemu_mutex_lock(&pool->lock);
> + ret = qemu_cond_timedwait(&pool->request_cond, &pool->lock, 10000);
> pool->idle_threads--;
> - } while (back_to_sleep(pool, ret));
> - if (ret == -1 || pool->stopping ||
I think you need to check 'pool->stopping' after returning from
qemu_cond_timedwait(). Otherwise the worker will blindly try to dequeue a
request from a list that may be empty.
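To illustrate the kind of re-check I mean, here is a rough sketch using plain
pthreads rather than QEMU's wrappers. Everything in it (struct demo_pool,
demo_worker(), demo_stop_while_idle(), the 'pending' counter standing in for
request_list) is made up for the example, and it uses an untimed wait for
brevity, so take it as an illustration under those assumptions and not as the
code I'd expect in thread-pool.c:

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for ThreadPool; not the QEMU structure. */
struct demo_pool {
    pthread_mutex_t lock;
    pthread_cond_t request_cond;
    int pending;    /* stands in for the length of request_list */
    int processed;  /* number of requests the worker picked up */
    bool stopping;
};

static void *demo_worker(void *opaque)
{
    struct demo_pool *pool = opaque;

    pthread_mutex_lock(&pool->lock);
    while (!pool->stopping) {
        if (pool->pending == 0) {
            pthread_cond_wait(&pool->request_cond, &pool->lock);
            /*
             * A wakeup proves nothing by itself: loop again so that both
             * 'stopping' and the queue are re-tested before dequeuing.
             */
            continue;
        }
        /* The invariant a blind dequeue after the wait would violate. */
        assert(pool->pending > 0);
        pool->pending--;
        pool->processed++;
    }
    pthread_mutex_unlock(&pool->lock);
    return NULL;
}

/*
 * Start one worker, then stop the pool while nothing is queued.
 * Returns how many requests the worker believes it processed.
 */
static int demo_stop_while_idle(void)
{
    struct demo_pool pool = { .pending = 0, .processed = 0, .stopping = false };
    pthread_t tid;

    pthread_mutex_init(&pool.lock, NULL);
    pthread_cond_init(&pool.request_cond, NULL);
    pthread_create(&tid, NULL, demo_worker, &pool);

    pthread_mutex_lock(&pool.lock);
    pool.stopping = true;
    pthread_cond_broadcast(&pool.request_cond);
    pthread_mutex_unlock(&pool.lock);

    pthread_join(tid, NULL);
    pthread_cond_destroy(&pool.request_cond);
    pthread_mutex_destroy(&pool.lock);
    return pool.processed;
}
```

Calling demo_stop_while_idle() should return 0: the worker is woken only to be
told to stop, and since it re-tests its conditions it never touches the empty
queue.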
BTW, I see there is no thread_pool_free() unit test.
> - pool->cur_threads > pool->max_threads) {
> - break;
> + if (ret == 0) {
> + if (QTAILQ_EMPTY(&pool->request_list) &&
> + pool->cur_threads > pool->min_threads) {
> + /* Timed out + no work to do + no need for warm threads = exit. */
> + break;
> + } else {
> + /*
> + * Even if there is some work to do, check if there aren't
> + * too many worker threads before picking it up.
> + */
> + continue;
> + }
> + }
> }
>
> req = QTAILQ_FIRST(&pool->request_list);
> @@ -229,13 +219,7 @@ static void thread_pool_cancel(BlockAIOCB *acb)
> trace_thread_pool_cancel(elem, elem->common.opaque);
>
> QEMU_LOCK_GUARD(&pool->lock);
> - if (elem->state == THREAD_QUEUED &&
> - /* No thread has yet started working on elem. we can try to "steal"
> - * the item from the worker if we can get a signal from the
> - * semaphore. Because this is non-blocking, we can do it with
> - * the lock taken and ensure that elem will remain THREAD_QUEUED.
> - */
> - qemu_sem_timedwait(&pool->sem, 0) == 0) {
> + if (elem->state == THREAD_QUEUED) {
> QTAILQ_REMOVE(&pool->request_list, elem, reqs);
> qemu_bh_schedule(pool->completion_bh);
The 'thread-pool cancel' unit test fails with this change.
I think that's because worker_thread() assumes that a wakeup always comes with
a pending request. thread_pool_cancel() now 'steals' the work request without
also 'stealing' the wakeup, which is what qemu_sem_timedwait(sem, 0) used to
achieve.
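To make the bookkeeping concrete, here is a small single-threaded model of the
old semaphore scheme, written against POSIX semaphores. The names (struct
demo_queue, demo_submit(), demo_cancel(), demo_stale_wakeups()) are
hypothetical, 'queued' merely stands in for the length of request_list, and
the real code would of course do all of this under pool->lock:

```c
#include <assert.h>
#include <semaphore.h>
#include <stdbool.h>

/* Hypothetical model of the pool's request accounting; not QEMU code. */
struct demo_queue {
    sem_t wakeups;  /* one post per queued request */
    int queued;     /* stands in for the length of request_list */
};

static void demo_submit(struct demo_queue *q)
{
    q->queued++;
    sem_post(&q->wakeups);
}

/*
 * Old-style cancel: only remove the request if a wakeup can be consumed
 * as well. sem_trywait() never blocks, like a timed wait with timeout 0.
 */
static bool demo_cancel(struct demo_queue *q)
{
    if (q->queued > 0 && sem_trywait(&q->wakeups) == 0) {
        q->queued--;
        return true;
    }
    return false;
}

/*
 * Submit one request and cancel it, either taking the wakeup along or not.
 * Returns the number of wakeups left with no request behind them.
 */
static int demo_stale_wakeups(bool steal_wakeup)
{
    struct demo_queue q = { .queued = 0 };
    int value = 0;

    sem_init(&q.wakeups, 0, 0);
    demo_submit(&q);
    if (steal_wakeup) {
        demo_cancel(&q);
    } else {
        q.queued--;  /* remove the request but leave the wakeup behind */
    }
    sem_getvalue(&q.wakeups, &value);
    sem_destroy(&q.wakeups);
    return value - q.queued;
}
```

In this model demo_stale_wakeups(true) returns 0 and demo_stale_wakeups(false)
returns 1. That leftover wakeup is what a worker would consume before finding
an empty list, so with the condition variable the worker has to re-check the
list itself after waking up.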
Regards,
--
Nicolás Sáenz