qemu-block
[Top][All Lists]
Advanced

[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]

[PATCH v3 4/6] util: implement seqcache


From: Vladimir Sementsov-Ogievskiy
Subject: [PATCH v3 4/6] util: implement seqcache
Date: Fri, 5 Mar 2021 20:35:05 +0300

Implement cache for small sequential unaligned writes, so that they may
be cached until we get a complete cluster and then write it.

The cache is intended to be used for backup to qcow2 compressed target
opened in O_DIRECT mode, but can be reused for any similar (even not
block-layer related) task.

Signed-off-by: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
---
 include/qemu/seqcache.h |  42 +++++
 util/seqcache.c         | 361 ++++++++++++++++++++++++++++++++++++++++
 MAINTAINERS             |   6 +
 util/meson.build        |   1 +
 4 files changed, 410 insertions(+)
 create mode 100644 include/qemu/seqcache.h
 create mode 100644 util/seqcache.c

diff --git a/include/qemu/seqcache.h b/include/qemu/seqcache.h
new file mode 100644
index 0000000000..a308d54b01
--- /dev/null
+++ b/include/qemu/seqcache.h
@@ -0,0 +1,42 @@
+/*
+ * Cache for small sequential write requests.
+ *
+ * Copyright (c) 2021 Virtuozzo International GmbH.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ */
+
+#ifndef QEMU_SEQCACHE_H
+#define QEMU_SEQCACHE_H
+
+/* Opaque cache handle; the structure is defined in util/seqcache.c. */
+typedef struct SeqCache SeqCache;
+
+/* Create an empty cache whose unit of caching is @cluster_size bytes. */
+SeqCache *seqcache_new(int64_t cluster_size);
+
+/* Free the cache. NULL is accepted; a non-NULL cache must already be empty. */
+void seqcache_free(SeqCache *s);
+
+/* Cache @bytes bytes of @buf at @offset, splitting the request by clusters. */
+void seqcache_write(SeqCache *s, int64_t offset, int64_t bytes, uint8_t *buf);
+
+/*
+ * Copy cached data starting at @offset into @buf, at most @bytes bytes.
+ * Returns the number of bytes copied, 0 if @offset is not cached.
+ */
+int64_t seqcache_read(SeqCache *s, int64_t offset, int64_t bytes, uint8_t *buf);
+
+/*
+ * Fetch the next cached region that should be flushed. Returns false when
+ * there is nothing to flush. Nothing is removed from the cache.
+ */
+bool seqcache_get_next_flush(SeqCache *s, int64_t *offset, int64_t *bytes,
+                             uint8_t **buf, bool *unfinished);
+
+/* Drop the cached cluster that contains @offset, if there is one. */
+void seqcache_discard_cluster(SeqCache *s, int64_t offset);
+
+/* Number of clusters currently cached, including the unfinished one. */
+uint64_t seqcache_nb_clusters(SeqCache *s);
+
+#endif /* QEMU_SEQCACHE_H */
diff --git a/util/seqcache.c b/util/seqcache.c
new file mode 100644
index 0000000000..d923560eab
--- /dev/null
+++ b/util/seqcache.c
@@ -0,0 +1,361 @@
+/*
+ * Cache for small sequential write requests.
+ *
+ * Copyright (c) 2021 Virtuozzo International GmbH.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in
+ * all copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
+ * THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+ * THE SOFTWARE.
+ *
+ *
+ * = Description =
+ *
+ * SeqCache is an abbreviation for Sequential Cache.
+ *
+ * The cache is intended to improve performance of small unaligned sequential
+ * writes. It has a cluster_size parameter and the unit of caching is an aligned
+ * cluster. The cache keeps a list of cached clusters: several "finished" ones
+ * and at most one "unfinished". The "unfinished" cluster is the cluster where
+ * the last byte of the last write operation is cached and there is free space
+ * after that byte up to the end of the cluster. "finished" clusters are just
+ * stored in the cache to be read or flushed and don't allow any writes to them.
+ *
+ * If write to the cache intersects cluster bounds, it's split into several
+ * requests by cluster bounds. So, consider a write that doesn't intersect
+ * cluster bounds to describe the whole picture:
+ *
+ * There are two cases allowed:
+ *
+ * 1. Sequential write to "unfinished" cluster, i.e. a write that starts exactly
+ *    where the previous write ended. The data goes to the "unfinished" cluster.
+ *    If "unfinished" is filled up to the cluster bound it becomes "finished".
+ *
+ * 2. Write to new cluster, not existing in the cache. It may be sequential to
+ *    previous write or not. Current "unfinished" cluster (if exists) becomes
+ *    "finished" and new "unfinished" cluster is started. Note also that offset
+ *    of the write to new cluster is not required to be aligned.
+ *
+ *    Any other write operation (non-sequential write to "unfinished" cluster
+ *    or write to any of "finished" clusters) will crash.
+ */
+
+#include "qemu/osdep.h"
+
+#include "qemu/queue.h"
+#include "qemu/seqcache.h"
+
+/*
+ * Cluster
+ *
+ * Representation of one cached cluster, aligned to SeqCache::cluster_size.
+ * Caches only one subregion of the cluster, started at @offset (may be
+ * unaligned to cluster_size) and of @bytes length (may be unaligned as well).
+ * The whole subregion always lies in one aligned cluster:
+ *
+ *      QEMU_ALIGN_DOWN(offset, cluster_size) ==
+ *          QEMU_ALIGN_DOWN(offset + bytes - 1, cluster_size)
+ *
+ * @buf is allocated to be able to fill the cluster up to the cluster end, i.e.
+ * allocated @buf length is at least:
+ *
+ *      cluster_size - offset % cluster_size
+ *
+ * @entry links the cluster into the SeqCache::all list.
+ */
+typedef struct Cluster {
+    int64_t offset;
+    int64_t bytes;
+    uint8_t *buf;
+    QSIMPLEQ_ENTRY(Cluster) entry;
+} Cluster;
+
+/*
+ * SeqCache caches small sequential writes into "unfinished" @cur_write cluster,
+ * until entire cluster (of @cluster_size bytes) is filled by seqcache_write()
+ * calls.
+ *
+ * @cur_write->offset may be unaligned to @cluster_size if first write offset is
+ * not aligned (for example, if there was a flush request and cache was flushed,
+ * then we continue from the middle of the cluster with an empty cache).
+ *
+ * @cur_write may be NULL, which means we don't have current cluster and next
+ * seqcache_write() will start a new one.
+ *
+ * @all is a list of all clusters cached in the cache, some "finished" and one
+ * "unfinished" @cur_write (if exists). If @cur_write is not NULL it is the last
+ * one in the list.
+ *
+ * @nb_clusters is the number of elements in @all list.
+ *
+ * @next_flush is an iterator for flushing. SeqCache knows nothing about how
+ * data should be flushed, so to flush the user calls seqcache_get_next_flush()
+ * (which moves @next_flush iterator) and when data is flushed somehow and cache
+ * is not needed anymore, user can call seqcache_discard_cluster().
+ */
+typedef struct SeqCache {
+    int64_t cluster_size;
+    int64_t nb_clusters;
+    Cluster *cur_write;
+    Cluster *next_flush;
+    QSIMPLEQ_HEAD(, Cluster) all;
+} SeqCache;
+
+/* Release @req together with its data buffer. A NULL @req is ignored. */
+static void cluster_free(Cluster *req)
+{
+    if (!req) {
+        return;
+    }
+
+    g_free(req->buf);
+    g_free(req);
+}
+
+/*
+ * Allocate an empty cache with caching unit @cluster_size: no clusters, no
+ * current write cluster and nothing pending for flush.
+ */
+SeqCache *seqcache_new(int64_t cluster_size)
+{
+    SeqCache *cache = g_new(SeqCache, 1);
+
+    cache->cluster_size = cluster_size;
+    cache->nb_clusters = 0;
+    cache->cur_write = NULL;
+    cache->next_flush = NULL;
+    QSIMPLEQ_INIT(&cache->all);
+
+    return cache;
+}
+
+/*
+ * Free the cache itself. The cache must not hold any cluster at this point
+ * (asserted): the user is expected to drain it beforehand by calling
+ * seqcache_get_next_flush() and seqcache_discard_cluster() sequentially.
+ * A NULL @s is ignored.
+ */
+void seqcache_free(SeqCache *s)
+{
+    if (!s) {
+        return;
+    }
+
+    assert(QSIMPLEQ_EMPTY(&s->all));
+    g_free(s);
+}
+
+/* Offset of the first byte after the region cached in @cl */
+static int64_t cached_end(Cluster *cl)
+{
+    int64_t first = cl->offset;
+
+    return first + cl->bytes;
+}
+
+/* Round @offset down to the start of the aligned cluster that contains it */
+static int64_t cluster_start(SeqCache *s, int64_t offset)
+{
+    int64_t aligned = QEMU_ALIGN_DOWN(offset, s->cluster_size);
+
+    return aligned;
+}
+
+/* Offset right after the aligned cluster that contains @offset */
+static int64_t cluster_end(SeqCache *s, int64_t offset)
+{
+    int64_t start = cluster_start(s, offset);
+
+    return start + s->cluster_size;
+}
+
+/*
+ * Look up the cached cluster that covers the aligned cluster containing
+ * @offset. @offset itself does not have to be cached. Returns NULL if no such
+ * cluster is in the cache.
+ */
+static Cluster *seqcache_find_cluster(SeqCache *s, int64_t offset)
+{
+    int64_t wanted = cluster_start(s, offset);
+    Cluster *it;
+
+    QSIMPLEQ_FOREACH(it, &s->all, entry) {
+        if (cluster_start(s, it->offset) != wanted) {
+            continue;
+        }
+        return it;
+    }
+
+    return NULL;
+}
+
+/*
+ * Turn the current "unfinished" cluster (if any) into a "finished" one. If the
+ * flush iterator is not set, it is pointed at that cluster.
+ */
+static void seqcache_finalize_current_cluster(SeqCache *s)
+{
+    Cluster *done = s->cur_write;
+
+    s->cur_write = NULL;
+
+    if (done && !s->next_flush) {
+        s->next_flush = done;
+    }
+}
+
+/*
+ * Write an @offset, @bytes, @buf request into the cache. The request MUST NOT
+ * intersect cluster bounds.
+ *
+ * The request either continues the "unfinished" cluster (when @offset is
+ * exactly the end of its cached region) or starts a new cluster, which must
+ * not be present in the cache yet (asserted).
+ *
+ * In both cases, a cluster that gets filled up to its cluster bound becomes
+ * "finished" immediately. This guarantees that the "unfinished" cluster always
+ * has free space after its cached region, so a write that starts at the next
+ * cluster bound is never mistaken for a continuation of a full cluster (which
+ * would copy data beyond the cluster and could overflow its buffer).
+ */
+static void seqcache_write_one(SeqCache *s, int64_t offset, int64_t bytes,
+                               uint8_t *buf)
+{
+    assert(bytes > 0);
+    assert(cluster_start(s, offset) == cluster_start(s, offset + bytes - 1));
+
+    if (s->cur_write && offset == cached_end(s->cur_write)) {
+        /* Continue sequential process */
+        memcpy(s->cur_write->buf + s->cur_write->bytes, buf, bytes);
+        s->cur_write->bytes += bytes;
+    } else {
+        /* We are starting a new cluster. Check that it's really new */
+        assert(!seqcache_find_cluster(s, offset));
+
+        seqcache_finalize_current_cluster(s);
+
+        s->cur_write = g_new(Cluster, 1);
+        *s->cur_write = (Cluster) {
+            .offset = offset,
+            .bytes = bytes,
+            .buf = g_malloc(s->cluster_size),
+        };
+
+        memcpy(s->cur_write->buf, buf, bytes);
+        QSIMPLEQ_INSERT_TAIL(&s->all, s->cur_write, entry);
+        s->nb_clusters++;
+    }
+
+    /*
+     * The cluster may be complete after a continuation as well as right after
+     * its very first write (e.g. a whole-cluster request).
+     */
+    if (cached_end(s->cur_write) == cluster_end(s, s->cur_write->offset)) {
+        seqcache_finalize_current_cluster(s);
+    }
+}
+
+/*
+ * Write an @offset, @bytes, @buf request into the cache. The request is cut
+ * at cluster bounds and every piece is cached by seqcache_write_one().
+ */
+void seqcache_write(SeqCache *s, int64_t offset, int64_t bytes, uint8_t *buf)
+{
+    int64_t room, part;
+
+    for (; bytes != 0; offset += part, bytes -= part, buf += part) {
+        /* Space left from @offset to the end of its cluster */
+        room = cluster_end(s, offset) - offset;
+        part = MIN(bytes, room);
+
+        seqcache_write_one(s, offset, part, buf);
+    }
+}
+
+/*
+ * Read from the cache.
+ *
+ * Returns 0 if @offset is not cached.
+ *
+ * Otherwise copies to @buf the data which is cached contiguously starting at
+ * @offset, but not more than @bytes bytes, and returns the number of bytes
+ * copied. The result may be less than @bytes if a hole is reached earlier.
+ */
+int64_t seqcache_read(SeqCache *s, int64_t offset, int64_t bytes, uint8_t *buf)
+{
+    int64_t copied = 0;
+
+    while (bytes != 0) {
+        Cluster *cl = seqcache_find_cluster(s, offset);
+        int64_t avail, part;
+
+        if (cl == NULL) {
+            break;
+        }
+        if (offset < cl->offset || offset >= cached_end(cl)) {
+            /* The cluster is cached, but not the byte at @offset */
+            break;
+        }
+
+        avail = cached_end(cl) - offset;
+        part = MIN(bytes, avail);
+        memcpy(buf + copied, cl->buf + (offset - cl->offset), part);
+
+        copied += part;
+        offset += part;
+        bytes -= part;
+
+        if (!QEMU_IS_ALIGNED(offset, s->cluster_size)) {
+            /* Stopped in the middle of a cluster: nothing more to read */
+            break;
+        }
+    }
+
+    return copied;
+}
+
+/*
+ * Get next region for flushing. @offset, @bytes and @buf are out-parameters
+ * to return the region.
+ *
+ * @unfinished is an in-out argument which tells whether the user is interested
+ * in flushing the unfinished cluster too:
+ *
+ * If the flush iterator points to a "finished" cluster, that cluster is
+ * returned, the iterator is advanced and *@unfinished is set to false,
+ * independently of its original value.
+ *
+ * If the flush iterator is not set but the "unfinished" cluster exists (i.e.
+ * s->cur_write != NULL), then *@unfinished value remains the same and the
+ * following logic works:
+ *
+ *    If *@unfinished:
+ *       return s->cur_write unfinished cluster for flushing
+ *    Else
+ *       return nothing
+ *
+ *
+ * Returns true and sets @offset, @bytes, @buf and @unfinished if there is
+ * something to flush (according to @unfinished value), returns false
+ * otherwise.
+ *
+ * Nothing is removed from the cache.
+ */
+bool seqcache_get_next_flush(SeqCache *s, int64_t *offset, int64_t *bytes,
+                             uint8_t **buf, bool *unfinished)
+{
+    Cluster *req = s->next_flush;
+
+    if (!req) {
+        /* No finished cluster to return; consider the unfinished one */
+        if (!s->cur_write || !*unfinished) {
+            return false;
+        }
+        req = s->cur_write;
+    } else {
+        Cluster *next = QSIMPLEQ_NEXT(req, entry);
+
+        *unfinished = false;
+        /* The iterator is reset instead of pointing at s->cur_write */
+        s->next_flush = (next == s->cur_write) ? NULL : next;
+    }
+
+    *offset = req->offset;
+    *bytes = req->bytes;
+    *buf = req->buf;
+
+    return true;
+}
+
+/*
+ * Find the cluster containing @offset and drop it from the cache. It does not
+ * matter whether the byte at @offset itself is cached or not. Does nothing if
+ * the cluster is not in the cache.
+ */
+void seqcache_discard_cluster(SeqCache *s, int64_t offset)
+{
+    Cluster *victim = seqcache_find_cluster(s, offset);
+
+    if (victim == NULL) {
+        return;
+    }
+
+    if (victim == s->cur_write) {
+        assert(victim != s->next_flush);
+        s->cur_write = NULL;
+    } else if (victim == s->next_flush) {
+        Cluster *succ;
+
+        assert(victim != s->cur_write);
+        /* Move the flush iterator off the cluster that is going away */
+        succ = QSIMPLEQ_NEXT(victim, entry);
+        s->next_flush = (succ == s->cur_write) ? NULL : succ;
+    }
+
+    QSIMPLEQ_REMOVE(&s->all, victim, Cluster, entry);
+    s->nb_clusters--;
+    cluster_free(victim);
+}
+
+/* Number of clusters currently in the cache, the unfinished one included */
+uint64_t seqcache_nb_clusters(SeqCache *s)
+{
+    int64_t count = s->nb_clusters;
+
+    return count;
+}
diff --git a/MAINTAINERS b/MAINTAINERS
index 9b2aa18e1f..5e263c3343 100644
--- a/MAINTAINERS
+++ b/MAINTAINERS
@@ -3347,3 +3347,9 @@ Performance Tools and Tests
 M: Ahmed Karaman <ahmedkhaledkaraman@gmail.com>
 S: Maintained
 F: scripts/performance/
+
+Sequential Cache
+M: Vladimir Sementsov-Ogievskiy <vsementsov@virtuozzo.com>
+S: Supported
+F: util/seqcache.c
+F: include/qemu/seqcache.h
diff --git a/util/meson.build b/util/meson.build
index 984fba965f..6682e463ac 100644
--- a/util/meson.build
+++ b/util/meson.build
@@ -75,6 +75,7 @@ if have_block
   util_ss.add(files('block-helpers.c'))
   util_ss.add(files('qemu-coroutine-sleep.c'))
   util_ss.add(files('qemu-co-shared-resource.c'))
+  util_ss.add(files('seqcache.c'))
   util_ss.add(files('thread-pool.c', 'qemu-timer.c'))
   util_ss.add(files('readline.c'))
   util_ss.add(files('throttle.c'))
-- 
2.29.2




reply via email to

[Prev in Thread] Current Thread [Next in Thread]