[Qemu-devel] [RFC PATCH 05/14] rcu: add call_rcu
From: Mike Day
Subject: [Qemu-devel] [RFC PATCH 05/14] rcu: add call_rcu
Date: Wed, 14 Aug 2013 11:50:41 -0400
From: Paolo Bonzini <address@hidden>
Signed-off-by: Paolo Bonzini <address@hidden>
Reviewed-by: Mike Day <address@hidden>
---
docs/rcu.txt | 108 +++++++++++++++++++++++++++++++++++++++++++++--
include/qemu/rcu.h | 22 ++++++++++
util/rcu.c | 120 +++++++++++++++++++++++++++++++++++++++++++++++++++++
3 files changed, 246 insertions(+), 4 deletions(-)
diff --git a/docs/rcu.txt b/docs/rcu.txt
index 5736676..d7c4f0b 100644
--- a/docs/rcu.txt
+++ b/docs/rcu.txt
@@ -82,7 +82,50 @@ The core RCU API is small:
Note that it would be valid for another update to come while
synchronize_rcu is running. Because of this, it is better that
the updater releases any locks it may hold before calling
- synchronize_rcu.
+ synchronize_rcu. If this is not possible (for example, because
+ the updater is protected by the BQL), you can use call_rcu.
+
+     void call_rcu1(struct rcu_head * head,
+                    void (*func)(struct rcu_head *head));
+
+ This function invokes func(head) after all pre-existing RCU
+ read-side critical sections on all threads have completed. This
+ marks the end of the removal phase, with func taking care
+ asynchronously of the reclamation phase.
+
+ The foo struct needs to have an rcu_head structure added,
+ perhaps as follows:
+
+        struct foo {
+            struct rcu_head rcu;
+            int a;
+            char b;
+            long c;
+        };
+
+ so that the reclaimer function can fetch the struct foo address
+ and free it:
+
+        void foo_reclaim(struct rcu_head *rp)
+        {
+            struct foo *fp = container_of(rp, struct foo, rcu);
+            g_free(fp);
+        }
+
+        call_rcu1(&foo.rcu, foo_reclaim);
+
+ For the common case where the rcu_head member is the first field of the
+ struct, you can use the following macro.
+
+     void call_rcu(T *p,
+                   void (*func)(T *p),
+                   field-name);
+
+ call_rcu1 is typically used through this macro, in the common case
+ where the "struct rcu_head" is the first field in the struct. In
+ the above case, one could have written simply:
+
+        call_rcu(&foo, g_free, rcu);
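+
+ Given the call_rcu macro in include/qemu/rcu.h (and ignoring its
+ compile-time type checks), the call above expands to roughly:
+
+        call_rcu1(&foo.rcu, (RCUCBFunc *)g_free);
+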
      typeof(*p) rcu_dereference(p);
      typeof(p) rcu_assign_pointer(p, typeof(p) v);
@@ -173,6 +216,11 @@ DIFFERENCES WITH LINUX
- rcu_dereference takes a _pointer_ to the variable being accessed.
Wrong usage will be detected by the compiler.
+- call_rcu is a macro that has an extra argument (the name of the first
+ field in the struct, which must be a struct rcu_head), and expects the
+ type of the callback's argument to be the type of the first argument.
+ call_rcu1 is the same as Linux's call_rcu.
+
- Quiescent points must be marked explicitly unless the thread uses
condvars/semaphores/events for synchronization.
@@ -229,7 +277,47 @@ The write side looks simply like this (with appropriate
locking):
     synchronize_rcu();
     free(old);
-Note that the same idiom would be possible with reader/writer
+If the processing cannot be done purely within the critical section, it
+is possible to combine this idiom with a "real" reference count:
+
+    rcu_read_lock();
+    p = rcu_dereference(&foo);
+    foo_ref(p);
+    rcu_read_unlock();
+    /* do something with p. */
+    foo_unref(p);
+
+The write side can be like this:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    rcu_assign_pointer(foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    synchronize_rcu();
+    foo_unref(old);
+
+or with call_rcu:
+
+    qemu_mutex_lock(&foo_mutex);
+    old = foo;
+    rcu_assign_pointer(foo, new);
+    qemu_mutex_unlock(&foo_mutex);
+    call_rcu(old, foo_unref, rcu);
+
+In both cases, the write side only performs removal. Reclamation
+happens when the last reference to a "foo" object is dropped.
+Using synchronize_rcu() is undesirably expensive, because the
+last reference may be dropped on the read side. Hence you can
+use call_rcu() instead:
+
+    void foo_unref(struct foo *p) {
+        if (atomic_dec(&p->refcount) == 0) {
+            call_rcu(p, foo_destroy, rcu);
+        }
+    }
+
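+The foo_ref, foo_destroy and refcount names used above are not defined in
+this document; assuming that struct foo also carries an int refcount field,
+minimal versions could look like this sketch:
+
+    void foo_ref(struct foo *p)
+    {
+        atomic_inc(&p->refcount);
+    }
+
+    void foo_destroy(struct foo *p)
+    {
+        g_free(p);
+    }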
+
+Note that the same idioms would be possible with reader/writer
locks:
     read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
@@ -239,13 +327,25 @@ locks:
                                     write_mutex_unlock(&foo_rwlock);
                                     free(p);
+    ------------------------------------------------------------------
+
+    read_lock(&foo_rwlock);         write_mutex_lock(&foo_rwlock);
+    p = foo;                        old = foo;
+    foo_ref(p);                     foo = new;
+    read_unlock(&foo_rwlock);       write_mutex_unlock(&foo_rwlock);
+    /* do something with p. */      foo_unref(old);
+    foo_unref(p);
+
+foo_unref could use a mechanism such as bottom halves to move deallocation
+out of hot paths.
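+
+As a sketch only (the free list, its lock, the bottom half and the
+free_next field are illustrative, not part of this series), the
+deallocation could be deferred like this:
+
+    static QemuMutex foo_free_lock;   /* initialized at startup */
+    static struct foo *foo_free_list;
+    static QEMUBH *foo_free_bh;       /* qemu_bh_new(foo_free_bh_cb, NULL) */
+
+    static void foo_free_bh_cb(void *opaque)
+    {
+        struct foo *p, *next;
+
+        /* Detach the list under the lock, then free outside of it. */
+        qemu_mutex_lock(&foo_free_lock);
+        p = foo_free_list;
+        foo_free_list = NULL;
+        qemu_mutex_unlock(&foo_free_lock);
+
+        for (; p; p = next) {
+            next = p->free_next;
+            g_free(p);
+        }
+    }
+
+    /* Called instead of g_free when the reference count drops to zero. */
+    static void foo_schedule_free(struct foo *p)
+    {
+        qemu_mutex_lock(&foo_free_lock);
+        p->free_next = foo_free_list;
+        foo_free_list = p;
+        qemu_mutex_unlock(&foo_free_lock);
+        qemu_bh_schedule(foo_free_bh);
+    }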
+
RCU resizable arrays
--------------------
Resizable arrays can be used with RCU. The expensive RCU synchronization
-only needs to take place when the array is resized. The two items to
-take care of are:
+(or call_rcu) only needs to take place when the array is resized.
+The two items to take care of are:
- ensuring that the old version of the array is available between removal
and reclamation;
diff --git a/include/qemu/rcu.h b/include/qemu/rcu.h
index b875593..e43b912 100644
--- a/include/qemu/rcu.h
+++ b/include/qemu/rcu.h
@@ -161,6 +161,28 @@ extern void synchronize_rcu(void);
extern void rcu_register_thread(void);
extern void rcu_unregister_thread(void);
+struct rcu_head;
+typedef void RCUCBFunc(struct rcu_head *head);
+
+struct rcu_head {
+    struct rcu_head *next;
+    RCUCBFunc *func;
+};
+
+extern void call_rcu1(struct rcu_head *head, RCUCBFunc *func);
+
+/* The operands of the minus operator must have the same type,
+ * which must be the one that we specify in the cast.
+ */
+#define call_rcu(head, func, field)                                      \
+    call_rcu1(({                                                         \
+         char __attribute__((unused))                                    \
+            offset_must_be_zero[-offsetof(typeof(*(head)), field)],      \
+            func_type_invalid = (func) - (void (*)(typeof(head)))(func); \
+         &(head)->field;                                                 \
+      }),                                                                \
+      (RCUCBFunc *)(func))
+
#ifdef __cplusplus
}
#endif
diff --git a/util/rcu.c b/util/rcu.c
index 48686a3..27fda86 100644
--- a/util/rcu.c
+++ b/util/rcu.c
@@ -26,6 +26,7 @@
* IBM's contributions to this file may be relicensed under LGPLv2 or later.
*/
+#include "qemu-common.h"
#include <stdio.h>
#include <assert.h>
#include <stdlib.h>
@@ -33,6 +34,7 @@
#include <errno.h>
#include "qemu/rcu.h"
#include "qemu/atomic.h"
+#include "qemu/thread.h"
/*
* Global grace period counter. Bit 0 is one if the thread is online.
@@ -174,6 +176,119 @@ void synchronize_rcu(void)
}
}
+
+#define RCU_CALL_MIN_SIZE 30
+
+/* Multi-producer, single-consumer queue based on urcu/static/wfqueue.h
+ * from liburcu. Note that head is only used by the consumer.
+ */
+static struct rcu_head dummy;
+static struct rcu_head *head = &dummy, **tail = &dummy.next;
+static int rcu_call_count;
+static QemuEvent rcu_call_ready_event;
+
+static void enqueue(struct rcu_head *node)
+{
+    struct rcu_head **old_tail;
+
+    node->next = NULL;
+    old_tail = atomic_xchg(&tail, &node->next);
+    atomic_mb_set(old_tail, node);
+}
+
+static struct rcu_head *try_dequeue(void)
+{
+    struct rcu_head *node, *next;
+
+retry:
+    /* Test for an empty list, which we do not expect.  Note that for
+     * the consumer head and tail are always consistent.  The head
+     * is consistent because only the consumer reads/writes it.
+     * The tail, because it is the first step in the enqueuing.
+     * It is only the next pointers that might be inconsistent.
+     */
+    if (head == &dummy && atomic_mb_read(&tail) == &dummy.next) {
+        abort();
+    }
+
+    /* If the head node has NULL in its next pointer, the value is
+     * wrong and we need to wait until its enqueuer finishes the update.
+     */
+    node = head;
+    next = atomic_mb_read(&head->next);
+    if (!next) {
+        return NULL;
+    }
+
+    /* Since we are the sole consumer, and the empty case is excluded
+     * above, the queue will always have at least two nodes: the
+     * dummy node, and the one being removed.  So we do not need to update
+     * the tail pointer.
+     */
+    head = next;
+
+    /* If we dequeued the dummy node, add it back at the end and retry. */
+    if (node == &dummy) {
+        enqueue(node);
+        goto retry;
+    }
+
+    return node;
+}
+
+static void *call_rcu_thread(void *opaque)
+{
+    struct rcu_head *node;
+
+    /* This thread is just a writer. */
+    rcu_thread_offline();
+
+    for (;;) {
+        int tries = 0;
+        int n = atomic_read(&rcu_call_count);
+
+        /* Heuristically wait for a decent number of callbacks to pile up.
+         * Fetch rcu_call_count now; we must only process elements that
+         * were added before synchronize_rcu() starts.
+         */
+        while (n < RCU_CALL_MIN_SIZE && ++tries <= 5) {
+            g_usleep(100000);
+            qemu_event_reset(&rcu_call_ready_event);
+            n = atomic_read(&rcu_call_count);
+            if (n < RCU_CALL_MIN_SIZE) {
+                qemu_event_wait(&rcu_call_ready_event);
+                n = atomic_read(&rcu_call_count);
+            }
+        }
+
+        atomic_sub(&rcu_call_count, n);
+        synchronize_rcu();
+        while (n > 0) {
+            node = try_dequeue();
+            while (!node) {
+                qemu_event_reset(&rcu_call_ready_event);
+                node = try_dequeue();
+                if (!node) {
+                    qemu_event_wait(&rcu_call_ready_event);
+                    node = try_dequeue();
+                }
+            }
+
+            n--;
+            node->func(node);
+        }
+    }
+    abort();
+}
+
+void call_rcu1(struct rcu_head *node, void (*func)(struct rcu_head *node))
+{
+    node->func = func;
+    enqueue(node);
+    atomic_inc(&rcu_call_count);
+    qemu_event_set(&rcu_call_ready_event);
+}
+
void rcu_register_thread(void)
{
if (!get_rcu_reader()) {
@@ -197,7 +312,12 @@ void rcu_unregister_thread(void)
static void __attribute__((__constructor__)) rcu_init(void)
{
+    QemuThread thread;
+
     qemu_mutex_init(&rcu_gp_lock);
     qemu_event_init(&rcu_gp_event, true);
+
+    qemu_event_init(&rcu_call_ready_event, false);
+    qemu_thread_create(&thread, call_rcu_thread, NULL, QEMU_THREAD_DETACHED);
     rcu_register_thread();
}
--
1.8.3.1