[Top][All Lists]
[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
Re: [Qemu-devel] [PATCH 11/12] nbd: switch to asynchronous operation
From: Nicholas Thomas
Subject: Re: [Qemu-devel] [PATCH 11/12] nbd: switch to asynchronous operation
Date: Fri, 09 Sep 2011 15:52:08 +0100
User-agent: Mozilla/5.0 (X11; U; Linux x86_64; en-US; rv:1.9.2.21) Gecko/20110831 Thunderbird/3.1.13
On 08/09/11 16:25, Paolo Bonzini wrote:
> Signed-off-by: Paolo Bonzini <address@hidden>
> ---
> block/nbd.c | 167 ++++++++++++++++++++++++++++++++++++++--------------------
> nbd.c | 8 +++
> 2 files changed, 117 insertions(+), 58 deletions(-)
>
> diff --git a/block/nbd.c b/block/nbd.c
> index 964caa8..5a75263 100644
> --- a/block/nbd.c
> +++ b/block/nbd.c
> @@ -52,6 +52,9 @@ typedef struct BDRVNBDState {
> size_t blocksize;
> char *export_name; /* An NBD server may export several devices */
>
> + CoMutex mutex;
> + Coroutine *coroutine;
> +
> /* If it begins with '/', this is a UNIX domain socket. Otherwise,
> * it's a string of the form <hostname|ip4|\[ip6\]>:port
> */
> @@ -104,6 +107,37 @@ out:
> return err;
> }
>
> +static void nbd_coroutine_start(BDRVNBDState *s)
> +{
> + qemu_co_mutex_lock(&s->mutex);
> + s->coroutine = qemu_coroutine_self();
> +}
> +
> +static void nbd_coroutine_enter(void *opaque)
> +{
> + BDRVNBDState *s = opaque;
> + qemu_coroutine_enter(s->coroutine, NULL);
> +}
> +
> +static int nbd_co_send_request(BDRVNBDState *s, struct nbd_request *request)
> +{
> +    qemu_aio_set_fd_handler(s->sock, NULL, nbd_coroutine_enter, NULL, NULL, s);
> + return nbd_send_request(s->sock, request);
> +}
> +
> +static int nbd_co_receive_reply(BDRVNBDState *s, struct nbd_reply *reply)
> +{
> +    qemu_aio_set_fd_handler(s->sock, nbd_coroutine_enter, NULL, NULL, NULL, s);
> + return nbd_receive_reply(s->sock, reply);
> +}
> +
> +static void nbd_coroutine_end(BDRVNBDState *s)
> +{
> + qemu_aio_set_fd_handler(s->sock, NULL, NULL, NULL, NULL, s);
> + s->coroutine = NULL;
> + qemu_co_mutex_unlock(&s->mutex);
> +}
> +
> static int nbd_establish_connection(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> @@ -163,6 +197,8 @@ static int nbd_open(BlockDriverState *bs, const char*
> filename, int flags)
> BDRVNBDState *s = bs->opaque;
> int result;
>
> + qemu_co_mutex_init(&s->mutex);
> +
> /* Pop the config into our state object. Exit if invalid. */
> result = nbd_config(s, filename, flags);
> if (result != 0) {
> @@ -177,8 +213,8 @@ static int nbd_open(BlockDriverState *bs, const char*
> filename, int flags)
> return result;
> }
>
> -static int nbd_read(BlockDriverState *bs, int64_t sector_num,
> - uint8_t *buf, int nb_sectors)
> +static int nbd_co_readv(BlockDriverState *bs, int64_t sector_num,
> + int nb_sectors, QEMUIOVector *qiov)
> {
> BDRVNBDState *s = bs->opaque;
> struct nbd_request request;
> @@ -189,30 +225,39 @@ static int nbd_read(BlockDriverState *bs, int64_t
> sector_num,
> request.from = sector_num * 512;
> request.len = nb_sectors * 512;
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> -
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> -
> - if (reply.error !=0)
> - return -reply.error;
> -
> - if (reply.handle != request.handle)
> - return -EIO;
> + nbd_coroutine_start(s);
> + if (nbd_co_send_request(s, &request) == -1) {
> + reply.error = errno;
> + goto done;
> + }
> + if (nbd_co_receive_reply(s, &reply) == -1) {
> + reply.error = errno;
> + goto done;
> + }
> + if (reply.error != 0) {
> + goto done;
> + }
> + if (reply.handle != request.handle) {
> + reply.error = EIO;
> + goto done;
> + }
> + if (qemu_co_recvv(s->sock, qiov->iov, request.len, 0) != request.len) {
> + reply.error = EIO;
> + }
>
> - if (nbd_wr_sync(s->sock, buf, request.len, 1) != request.len)
> - return -EIO;
> +done:
> + nbd_coroutine_end(s);
> + return -reply.error;
>
> - return 0;
> }
I'm a bit unsure here, actually. So you lock a mutex, send a request,
wait for a response, then unlock the mutex. Surely this code doesn't
allow more than one request to be in flight at a time?
My approach was to write the request to the socket as soon as possible.
IIRC, QEMU can have up to 16 IOs outstanding, so this gave us "some"
speedup, although I don't have formal before/after benchmarks. For
testing, speed isn't too important, but we're using NBD in production to
run VMs on a 10GigE network with SAS and SSD storage, so we're somewhat
interested in performance :).
If this *is* letting > 1 request be on the wire at a time, then
request.handle needs to be unique to the request. I don't think:
request.handle = (uint64_t)(intptr_t)bs;
is.
> -static int nbd_write(BlockDriverState *bs, int64_t sector_num,
> - const uint8_t *buf, int nb_sectors)
> +static int nbd_co_writev(BlockDriverState *bs, int64_t sector_num,
> + int nb_sectors, QEMUIOVector *qiov)
> {
> BDRVNBDState *s = bs->opaque;
> struct nbd_request request;
> struct nbd_reply reply;
> + int ret;
>
> request.type = NBD_CMD_WRITE;
> if (!bdrv_enable_write_cache(bs) && (s->nbdflags & NBD_FLAG_SEND_FUA)) {
> @@ -223,25 +268,30 @@ static int nbd_write(BlockDriverState *bs, int64_t
> sector_num,
> request.from = sector_num * 512;
> request.len = nb_sectors * 512;
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> -
> - if (nbd_wr_sync(s->sock, (uint8_t*)buf, request.len, 0) != request.len)
> - return -EIO;
> -
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> -
> - if (reply.error !=0)
> - return -reply.error;
> -
> - if (reply.handle != request.handle)
> - return -EIO;
> + nbd_coroutine_start(s);
> + if (nbd_co_send_request(s, &request) == -1) {
> + reply.error = errno;
> + goto done;
> + }
> + ret = qemu_co_sendv(s->sock, qiov->iov, request.len, 0);
> + if (ret != request.len) {
> + reply.error = EIO;
> + goto done;
> + }
> + if (nbd_co_receive_reply(s, &reply) == -1) {
> + reply.error = errno;
> + goto done;
> + }
> + if (reply.handle != request.handle) {
> + reply.error = EIO;
> + }
>
> - return 0;
> +done:
> + nbd_coroutine_end(s);
> + return -reply.error;
> }
>
> -static int nbd_flush(BlockDriverState *bs)
> +static int nbd_co_flush(BlockDriverState *bs)
> {
> BDRVNBDState *s = bs->opaque;
> struct nbd_request request;
> @@ -260,19 +310,22 @@ static int nbd_flush(BlockDriverState *bs)
> request.from = 0;
> request.len = 0;
>
> - if (nbd_send_request(s->sock, &request) == -1)
> - return -errno;
> -
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> - return -errno;
> -
> - if (reply.error !=0)
> - return -reply.error;
> -
> - if (reply.handle != request.handle)
> - return -EIO;
> + nbd_coroutine_start(s);
> + if (nbd_co_send_request(s, &request) == -1) {
> + reply.error = errno;
> + goto done;
> + }
> + if (nbd_co_receive_reply(s, &reply) == -1) {
> + reply.error = errno;
> + goto done;
> + }
> + if (reply.error == 0 && reply.handle != request.handle) {
> + reply.error = EIO;
> + }
>
> - return 0;
> +done:
> + nbd_coroutine_end(s);
> + return -reply.error;
> }
>
> static int nbd_discard(BlockDriverState *bs, int64_t sector_num,
> @@ -290,19 +343,17 @@ static int nbd_discard(BlockDriverState *bs, int64_t
> sector_num,
> request.from = sector_num * 512;
> request.len = nb_sectors * 512;
>
> - if (nbd_send_request(s->sock, &request) == -1)
> + if (nbd_send_request(s->sock, &request) == -1) {
> return -errno;
> -
> - if (nbd_receive_reply(s->sock, &reply) == -1)
> + }
> + if (nbd_receive_reply(s->sock, &reply) == -1) {
> return -errno;
> -
> - if (reply.error !=0)
> - return -reply.error;
> -
> - if (reply.handle != request.handle)
> + }
> + if (reply.error == 0 && reply.handle != request.handle) {
> return -EIO;
> + }
>
> - return 0;
> + return -reply.error;
> }
>
> static void nbd_close(BlockDriverState *bs)
> @@ -325,10 +376,10 @@ static BlockDriver bdrv_nbd = {
> .format_name = "nbd",
> .instance_size = sizeof(BDRVNBDState),
> .bdrv_file_open = nbd_open,
> - .bdrv_read = nbd_read,
> - .bdrv_write = nbd_write,
> + .bdrv_co_readv = nbd_co_readv,
> + .bdrv_co_writev = nbd_co_writev,
> .bdrv_close = nbd_close,
> - .bdrv_flush = nbd_flush,
> + .bdrv_co_flush = nbd_co_flush,
> .bdrv_discard = nbd_discard,
> .bdrv_getlength = nbd_getlength,
> .protocol_name = "nbd",
> diff --git a/nbd.c b/nbd.c
> index f089904..2f4c6b3 100644
> --- a/nbd.c
> +++ b/nbd.c
> @@ -80,6 +80,14 @@ size_t nbd_wr_sync(int fd, void *buffer, size_t size, bool
> do_read)
> {
> size_t offset = 0;
>
> + if (qemu_in_coroutine()) {
> + if (do_read) {
> + return qemu_co_recv(fd, buffer, size);
> + } else {
> + return qemu_co_send(fd, buffer, size);
> + }
> + }
> +
> while (offset < size) {
> ssize_t len;
>