Re: [PATCH 6/6] aiopool: protect with a mutex


From: Vladimir Sementsov-Ogievskiy
Subject: Re: [PATCH 6/6] aiopool: protect with a mutex
Date: Mon, 10 May 2021 14:56:33 +0300
User-agent: Mozilla/5.0 (X11; Linux x86_64; rv:78.0) Gecko/20100101 Thunderbird/78.10.1

10.05.2021 11:59, Emanuele Giuseppe Esposito wrote:
Divide the fields in AioTaskPool into IN and Status, and
introduce a CoQueue instead of .waiting to take care of suspending
and resuming the calling coroutine, and a lock to protect the
busy_tasks counter accesses and the AioTask .ret field.

"and" in commit message is almost always a sign, that there should be several 
commits :)

Please at least do the refactoring to drop "main_co" separately from adding the mutex.

Hmm, actually, that was done in Denis's series "[PATCH v8 0/6] block: seriously 
improve savevm/loadvm performance". 
(https://patchew.org/QEMU/20200709132644.28470-1-den@openvz.org/)

Probably you could reuse patches 01 and 02 of it.

(add Den to cc)


Signed-off-by: Emanuele Giuseppe Esposito <eesposit@redhat.com>
---
  block/aio_task.c         | 63 ++++++++++++++++++++++++----------------
  include/block/aio_task.h |  2 +-
  2 files changed, 39 insertions(+), 26 deletions(-)

diff --git a/block/aio_task.c b/block/aio_task.c
index 88989fa248..7ac6b5dd72 100644
--- a/block/aio_task.c
+++ b/block/aio_task.c
@@ -27,62 +27,70 @@
  #include "block/aio_task.h"
struct AioTaskPool {
-    Coroutine *main_co;
-    int status;
+    /* IN: just set in aio_task_pool_new and never modified */
      int max_busy_tasks;
+
+    /* Status: either atomic or protected by the lock */
+    int status;
      int busy_tasks;
-    bool waiting;
+    CoQueue queue;
+    CoMutex lock;
  };
static void coroutine_fn aio_task_co(void *opaque)
  {
+    int ret;
      AioTask *task = opaque;
      AioTaskPool *pool = task->pool;

-    assert(pool->busy_tasks < pool->max_busy_tasks);
-    pool->busy_tasks++;
+    WITH_QEMU_LOCK_GUARD(&pool->lock) {
+        assert(pool->busy_tasks < pool->max_busy_tasks);
+        pool->busy_tasks++;

-    task->ret = task->func(task);
+        ret = task->func(task);
+        task->ret = ret;

-    pool->busy_tasks--;
+        pool->busy_tasks--;
+    }

-    if (task->ret < 0 && pool->status == 0) {
-        pool->status = task->ret;
+    if (ret < 0) {
+        qatomic_cmpxchg(&pool->status, 0, ret);
      }

Can we just do it inside the critical section above and avoid the extra cmpxchg?
We would only need qatomic_set() as a pair to the qatomic_read().
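
Something like this (an untested sketch of the idea only, reusing the
WITH_QEMU_LOCK_GUARD() block from the patch):

    WITH_QEMU_LOCK_GUARD(&pool->lock) {
        assert(pool->busy_tasks < pool->max_busy_tasks);
        pool->busy_tasks++;

        ret = task->func(task);
        task->ret = ret;

        pool->busy_tasks--;

        /* pool->lock already serializes the writers, so a plain qatomic_set()
         * pairing with the qatomic_read() in aio_task_pool_status() is enough. */
        if (ret < 0 && pool->status == 0) {
            qatomic_set(&pool->status, ret);
        }
    }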

      g_free(task);

-    if (pool->waiting) {
-        pool->waiting = false;
-        aio_co_wake(pool->main_co);
-    }
+    qemu_co_queue_next(&pool->queue);

This call doesn't need mutex protection? Then we should modify the comment inside
the AioTaskPool structure.

Anyway, I think it's simpler to just have one QEMU_LOCK_GUARD() for the whole
function.
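
I.e. roughly like this (an untested sketch; QEMU_LOCK_GUARD() should work on a
CoMutex through QemuLockable, and this way qemu_co_queue_next() also runs with
the lock held, which answers the question above):

static void coroutine_fn aio_task_co(void *opaque)
{
    int ret;
    AioTask *task = opaque;
    AioTaskPool *pool = task->pool;

    /* One guard for the whole function, released automatically on return. */
    QEMU_LOCK_GUARD(&pool->lock);

    assert(pool->busy_tasks < pool->max_busy_tasks);
    pool->busy_tasks++;

    ret = task->func(task);
    task->ret = ret;

    pool->busy_tasks--;

    if (ret < 0 && pool->status == 0) {
        /* Pairs with the qatomic_read() in aio_task_pool_status(). */
        qatomic_set(&pool->status, ret);
    }

    g_free(task);

    /* The wake-up of the next waiter now also happens under the lock. */
    qemu_co_queue_next(&pool->queue);
}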

  }
-void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool)
+/* Called with lock held */

Again, such things are usually called _locked().

+static void coroutine_fn aio_task_pool_wait_one_unlocked(AioTaskPool *pool)
  {
      assert(pool->busy_tasks > 0);
-    assert(qemu_coroutine_self() == pool->main_co);
-
-    pool->waiting = true;
-    qemu_coroutine_yield();
-
-    assert(!pool->waiting);
+    qemu_co_queue_wait(&pool->queue, &pool->lock);
      assert(pool->busy_tasks < pool->max_busy_tasks);
  }
+void coroutine_fn aio_task_pool_wait_one(AioTaskPool *pool)
+{
+    QEMU_LOCK_GUARD(&pool->lock);
+    aio_task_pool_wait_one_unlocked(pool);
+}
+
  void coroutine_fn aio_task_pool_wait_slot(AioTaskPool *pool)
  {
+    QEMU_LOCK_GUARD(&pool->lock);
      if (pool->busy_tasks < pool->max_busy_tasks) {
          return;
      }

-    aio_task_pool_wait_one(pool);
+    aio_task_pool_wait_one_unlocked(pool);
  }
void coroutine_fn aio_task_pool_wait_all(AioTaskPool *pool)
  {
+    QEMU_LOCK_GUARD(&pool->lock);
      while (pool->busy_tasks > 0) {
-        aio_task_pool_wait_one(pool);
+        aio_task_pool_wait_one_unlocked(pool);
      }
  }
@@ -98,8 +106,8 @@ AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks)
  {
      AioTaskPool *pool = g_new0(AioTaskPool, 1);

-    pool->main_co = qemu_coroutine_self();
      pool->max_busy_tasks = max_busy_tasks;
+    qemu_co_queue_init(&pool->queue);
return pool;
  }
@@ -115,10 +123,15 @@ int aio_task_pool_status(AioTaskPool *pool)
          return 0; /* Sugar for lazy allocation of aio pool */
      }

-    return pool->status;
+    return qatomic_read(&pool->status);
  }
bool aio_task_pool_empty(AioTaskPool *pool)
  {
-    return pool->busy_tasks == 0;
+    int tasks;
+
+    qemu_co_mutex_lock(&pool->lock);
+    tasks = pool->busy_tasks;
+    qemu_co_mutex_unlock(&pool->lock);
+    return tasks == 0;
  }
diff --git a/include/block/aio_task.h b/include/block/aio_task.h
index 50bc1e1817..b22a4310aa 100644
--- a/include/block/aio_task.h
+++ b/include/block/aio_task.h
@@ -33,7 +33,7 @@ typedef int coroutine_fn (*AioTaskFunc)(AioTask *task);
  struct AioTask {
      AioTaskPool *pool;
      AioTaskFunc func;
-    int ret;
+    int ret; /* atomic */
  };
AioTaskPool *coroutine_fn aio_task_pool_new(int max_busy_tasks);



--
Best regards,
Vladimir


