[Qemu-devel] [PATCH v2 13/15] nbd: switch to asynchronous operation
From: Paolo Bonzini
Subject: [Qemu-devel] [PATCH v2 13/15] nbd: switch to asynchronous operation
Date: Fri, 16 Sep 2011 16:25:50 +0200
Signed-off-by: Paolo Bonzini <address@hidden>
---
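A standalone sketch of the control flow this patch introduces, for reference: it assumes only POSIX ucontext rather than QEMU's coroutine and AIO layers, and the helper names and the simulated wakeup are invented for the example. Only the yield/re-enter pattern corresponds to nbd_co_send_request(), nbd_co_receive_reply() and nbd_reply_ready() below.

/* Illustration only (POSIX ucontext assumed; not QEMU code): a "request
 * coroutine" yields after sending its request and is re-entered by the
 * reply handler once a reply header is available. */
#include <stdio.h>
#include <ucontext.h>

static ucontext_t main_ctx, co_ctx;

/* stand-in for qemu_coroutine_yield() */
static void yield_to_main(void)
{
    swapcontext(&co_ctx, &main_ctx);
}

/* stand-in for qemu_coroutine_enter() */
static void enter_coroutine(void)
{
    swapcontext(&main_ctx, &co_ctx);
}

/* plays the role of nbd_co_readv(): send, yield, then consume the reply */
static void request_coroutine(void)
{
    printf("coroutine: request sent, yielding until the reply arrives\n");
    yield_to_main();
    printf("coroutine: re-entered, reading the reply payload\n");
}

int main(void)
{
    static char stack[64 * 1024];

    getcontext(&co_ctx);
    co_ctx.uc_stack.ss_sp = stack;
    co_ctx.uc_stack.ss_size = sizeof(stack);
    co_ctx.uc_link = &main_ctx;          /* resume main when it finishes */
    makecontext(&co_ctx, request_coroutine, 0);

    enter_coroutine();                   /* submit the request */
    printf("main: reply header ready (the nbd_reply_ready() moment)\n");
    enter_coroutine();                   /* wake the waiting coroutine */
    printf("main: request completed\n");
    return 0;
}

In the driver itself the wakeup comes from the read handler registered with qemu_aio_set_fd_handler() noticing that the socket is readable, and qemu_coroutine_enter()/qemu_coroutine_yield() take the place of the swapcontext() calls above.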
block/nbd.c | 217 +++++++++++++++++++++++++++++++++++++++--------------------
nbd.c | 8 ++
2 files changed, 151 insertions(+), 74 deletions(-)
diff --git a/block/nbd.c b/block/nbd.c
index 35c15c8..f6efd7b 100644
--- a/block/nbd.c
+++ b/block/nbd.c
@@ -53,6 +53,11 @@ typedef struct BDRVNBDState {
size_t blocksize;
char *export_name; /* An NBD server may export several devices */
+ CoMutex mutex;
+ Coroutine *coroutine;
+
+ struct nbd_reply reply;
+
/* If it begins with '/', this is a UNIX domain socket. Otherwise,
* it's a string of the form <hostname|ip4|\[ip6\]>:port
*/
@@ -105,6 +110,95 @@ out:
return err;
}
+static void nbd_coroutine_start(BDRVNBDState *s, struct nbd_request *request)
+{
+ qemu_co_mutex_lock(&s->mutex);
+ s->coroutine = qemu_coroutine_self();
+ request->handle = (uint64_t)(intptr_t)s;
+}
+
+static int nbd_have_request(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+
+ return !!s->coroutine;
+}
+
+static void nbd_reply_ready(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+
+ if (s->reply.handle == 0) {
+ /* No reply already in flight. Fetch a header. */
+ if (nbd_receive_reply(s->sock, &s->reply) < 0) {
+ s->reply.handle = 0;
+ }
+ }
+
+ /* There's no need for a mutex on the receive side, because the
+ * handler acts as a synchronization point and ensures that only
+ * one coroutine is called until the reply finishes. */
+ if (s->coroutine) {
+ qemu_coroutine_enter(s->coroutine, NULL);
+ }
+}
+
+static void nbd_restart_write(void *opaque)
+{
+ BDRVNBDState *s = opaque;
+ qemu_coroutine_enter(s->coroutine, NULL);
+}
+
+static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request,
+ struct iovec *iov, int offset)
+{
+ int rc, ret;
+
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, nbd_restart_write,
+ nbd_have_request, NULL, s);
+ rc = nbd_send_request(s->sock, request);
+ if (rc != -1 && iov) {
+ ret = qemu_co_sendv(s->sock, iov, request->len, offset);
+ if (ret != request->len) {
+ errno = -EIO;
+ rc = -1;
+ }
+ }
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+ nbd_have_request, NULL, s);
+ return rc;
+}
+
+static void nbd_co_receive_reply(BDRVNBDState *s, struct nbd_request *request,
+ struct nbd_reply *reply,
+ struct iovec *iov, int offset)
+{
+ int ret;
+
+ /* Wait until we're woken up by the read handler. */
+ qemu_coroutine_yield();
+ *reply = s->reply;
+ if (reply->handle != request->handle) {
+ reply->error = EIO;
+ } else {
+ if (iov && reply->error == 0) {
+ ret = qemu_co_recvv(s->sock, iov, request->len, offset);
+ if (ret != request->len) {
+ reply->error = EIO;
+ }
+ }
+
+ /* Tell the read handler to read another header. */
+ s->reply.handle = 0;
+ }
+}
+
+static void nbd_coroutine_end(BDRVNBDState *s, struct nbd_request *request)
+{
+ s->coroutine = NULL;
+ qemu_co_mutex_unlock(&s->mutex);
+}
+
static int nbd_establish_connection(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
@@ -134,8 +228,11 @@ static int nbd_establish_connection(BlockDriverState *bs)
return -errno;
}
- /* Now that we're connected, set the socket to be non-blocking */
+ /* Now that we're connected, set the socket to be non-blocking and
+ * kick the reply mechanism. */
socket_set_nonblock(sock);
+ qemu_aio_set_fd_handler(s->sock, nbd_reply_ready, NULL,
+ nbd_have_request, NULL, s);
s->sock = sock;
s->size = size;
@@ -151,7 +248,6 @@ static void nbd_teardown_connection(BlockDriverState *bs)
struct nbd_request request;
request.type = NBD_CMD_DISC;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = 0;
request.len = 0;
nbd_send_request(s->sock, &request);
@@ -164,6 +260,8 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
BDRVNBDState *s = bs->opaque;
int result;
+ qemu_co_mutex_init(&s->mutex);
+
/* Pop the config into our state object. Exit if invalid. */
result = nbd_config(s, filename, flags);
if (result != 0) {
@@ -178,38 +276,30 @@ static int nbd_open(BlockDriverState *bs, const char* filename, int flags)
return result;
}
-static int nbd_read(BlockDriverState *bs, int64_t sector_num,
- uint8_t *buf, int nb_sectors)
+static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
struct nbd_reply reply;
request.type = NBD_CMD_READ;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;;
request.len = nb_sectors * 512;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
-
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
-
- if (reply.error !=0)
- return -reply.error;
-
- if (reply.handle != request.handle)
- return -EIO;
-
- if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
- return -EIO;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, qiov->iov, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
- return 0;
}
-static int nbd_write(BlockDriverState *bs, int64_t sector_num,
- const uint8_t *buf, int nb_sectors)
+static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors, QEMUIOVector *qiov)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
@@ -220,29 +310,20 @@ static int nbd_write(BlockDriverState *bs, int64_t sector_num,
request.type |= NBD_CMD_FLAG_FUA;
}
- request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;;
request.len = nb_sectors * 512;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
-
- if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
- return -EIO;
-
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
-
- if (reply.error !=0)
- return -reply.error;
-
- if (reply.handle != request.handle)
- return -EIO;
-
- return 0;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, qiov->iov, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
}
-static int nbd_flush(BlockDriverState *bs)
+static int nbd_co_flush(BlockDriverState *bs)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
@@ -257,27 +338,21 @@ static int nbd_flush(BlockDriverState *bs)
request.type |= NBD_CMD_FLAG_FUA;
}
- request.handle = (uint64_t)(intptr_t)bs;
request.from = 0;
request.len = 0;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
-
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
-
- if (reply.error != 0)
- return -reply.error;
-
- if (reply.handle != request.handle)
- return -EIO;
-
- return 0;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
}
-static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
- int nb_sectors)
+static int nbd_co_discard(BlockDriverState *bs, int64_t sector_num,
+ int nb_sectors)
{
BDRVNBDState *s = bs->opaque;
struct nbd_request request;
@@ -287,23 +362,17 @@ static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
return 0;
}
request.type = NBD_CMD_TRIM;
- request.handle = (uint64_t)(intptr_t)bs;
request.from = sector_num * 512;;
request.len = nb_sectors * 512;
- if (nbd_send_request(s->sock, &request) == -1)
- return -errno;
-
- if (nbd_receive_reply(s->sock, &reply) == -1)
- return -errno;
-
- if (reply.error !=0)
- return -reply.error;
-
- if (reply.handle != request.handle)
- return -EIO;
-
- return 0;
+ nbd_coroutine_start(s, &request);
+ if (nbd_co_send_request(s, &request, NULL, 0) == -1) {
+ reply.error = errno;
+ } else {
+ nbd_co_receive_reply(s, &request, &reply, NULL, 0);
+ }
+ nbd_coroutine_end(s, &request);
+ return -reply.error;
}
static void nbd_close(BlockDriverState *bs)
@@ -326,11 +395,11 @@ static BlockDriver bdrv_nbd = {
.format_name = "nbd",
.instance_size = sizeof(BDRVNBDState),
.bdrv_file_open = nbd_open,
- .bdrv_read = nbd_read,
- .bdrv_write = nbd_write,
+ .bdrv_co_readv = nbd_co_readv,
+ .bdrv_co_writev = nbd_co_writev,
.bdrv_close = nbd_close,
- .bdrv_flush = nbd_flush,
- .bdrv_discard = nbd_discard,
+ .bdrv_co_flush = nbd_co_flush,
+ .bdrv_co_discard = nbd_co_discard,
.bdrv_getlength = nbd_getlength,
.protocol_name = "nbd",
};
diff --git a/nbd.c b/nbd.c
index 5a618c5..40c76d9 100644
--- a/nbd.c
+++ b/nbd.c
@@ -81,6 +81,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool do_read)
{
size_t offset = 0;
+ if (qemu_in_coroutine()) {
+ if (do_read) {
+ return qemu_co_recv(fd, buffer, size);
+ } else {
+ return qemu_co_send(fd, buffer, size);
+ }
+ }
+
while (offset < size) {
ssize_t len;
--
1.7.6