[Date Prev][Date Next][Thread Prev][Thread Next][Date Index][Thread Index]
[PATCH 12/26] vfio-user: define socket receive functions
From: |
John Levon |
Subject: |
[PATCH 12/26] vfio-user: define socket receive functions |
Date: |
Wed, 8 Jan 2025 11:50:18 +0000 |
From: Jagannathan Raman <jag.raman@oracle.com>
Add the infrastructure needed to receive incoming messages.
Originally-by: John Johnson <john.g.johnson@oracle.com>
Signed-off-by: Elena Ufimtseva <elena.ufimtseva@oracle.com>
Signed-off-by: Jagannathan Raman <jag.raman@oracle.com>
Signed-off-by: John Levon <john.levon@nutanix.com>
---
hw/vfio/trace-events | 5 +
hw/vfio/user-pci.c | 11 ++
hw/vfio/user-protocol.h | 54 ++++++
hw/vfio/user.c | 408 ++++++++++++++++++++++++++++++++++++++++
hw/vfio/user.h | 10 +
5 files changed, 488 insertions(+)
create mode 100644 hw/vfio/user-protocol.h
diff --git a/hw/vfio/trace-events b/hw/vfio/trace-events
index cab1cf1de0..0e3e7be10c 100644
--- a/hw/vfio/trace-events
+++ b/hw/vfio/trace-events
@@ -180,3 +180,8 @@ iommufd_cdev_fail_attach_existing_container(const char
*msg) " %s"
iommufd_cdev_alloc_ioas(int iommufd, int ioas_id) " [iommufd=%d] new IOMMUFD
container with ioasid=%d"
iommufd_cdev_device_info(char *name, int devfd, int num_irqs, int num_regions,
int flags) " %s (%d) num_irqs=%d num_regions=%d flags=%d"
iommufd_cdev_pci_hot_reset_dep_devices(int domain, int bus, int slot, int
function, int dev_id) "\t%04x:%02x:%02x.%x devid %d"
+
+# user.c
+vfio_user_recv_hdr(const char *name, uint16_t id, uint16_t cmd, uint32_t size,
uint32_t flags) " (%s) id 0x%x cmd 0x%x size 0x%x flags 0x%x"
+vfio_user_recv_read(uint16_t id, int read) " id 0x%x read 0x%x"
+vfio_user_recv_request(uint16_t cmd) " command 0x%x"
diff --git a/hw/vfio/user-pci.c b/hw/vfio/user-pci.c
index 7610b47163..b62fd4edef 100644
--- a/hw/vfio/user-pci.c
+++ b/hw/vfio/user-pci.c
@@ -41,6 +41,16 @@ struct VFIOUserPCIDevice {
char *sock_name;
};
+/*
+ * Incoming request message callback, registered with the proxy through
+ * vfio_user_set_handler() in vfio_user_pci_realize().
+ *
+ * Runs off main loop, so BQL held.
+ *
+ * Currently a stub: the body is empty, so 'opaque' and 'msg' are ignored.
+ * NOTE(review): vfio_user_request() documents that the callback is
+ * responsible for disposing of the message buffer; this stub does not do
+ * so yet - confirm no request can arrive before a real handler exists.
+ */
+static void vfio_user_pci_process_req(void *opaque, VFIOUserMsg *msg)
+{
+
+}
+
/*
* Emulated devices don't use host hot reset
*/
@@ -86,6 +96,7 @@ static void vfio_user_pci_realize(PCIDevice *pdev, Error
**errp)
return;
}
vbasedev->proxy = proxy;
+ vfio_user_set_handler(vbasedev, vfio_user_pci_process_req, vdev);
vbasedev->name = g_strdup_printf("VFIO user <%s>", udev->sock_name);
vbasedev->ops = &vfio_user_pci_ops;
diff --git a/hw/vfio/user-protocol.h b/hw/vfio/user-protocol.h
new file mode 100644
index 0000000000..d23877c958
--- /dev/null
+++ b/hw/vfio/user-protocol.h
@@ -0,0 +1,54 @@
+#ifndef VFIO_USER_PROTOCOL_H
+#define VFIO_USER_PROTOCOL_H
+
+/*
+ * vfio protocol over a UNIX socket.
+ *
+ * Copyright © 2018, 2021 Oracle and/or its affiliates.
+ *
+ * This work is licensed under the terms of the GNU GPL, version 2. See
+ * the COPYING file in the top-level directory.
+ *
+ * Each message has a standard header that describes the command
+ * being sent, which is almost always a VFIO ioctl().
+ *
+ * The header may be followed by command-specific data, such as the
+ * region and offset info for read and write commands.
+ */
+
+typedef struct {
+ uint16_t id; /* replies are matched to a pending request by this id */
+ uint16_t command; /* one of enum vfio_user_command */
+ uint32_t size; /* total message size in bytes, header included */
+ uint32_t flags; /* VFIO_USER_* flag bits; low nibble is the message type */
+ uint32_t error_reply; /* error number, set together with VFIO_USER_ERROR */
+} VFIOUserHdr;
+
+/*
+ * VFIOUserHdr commands: the values carried in VFIOUserHdr.command.
+ * VFIO_USER_MAX is one past the highest defined command.
+ */
+enum vfio_user_command {
+ VFIO_USER_VERSION = 1,
+ VFIO_USER_DMA_MAP = 2,
+ VFIO_USER_DMA_UNMAP = 3,
+ VFIO_USER_DEVICE_GET_INFO = 4,
+ VFIO_USER_DEVICE_GET_REGION_INFO = 5,
+ VFIO_USER_DEVICE_GET_REGION_IO_FDS = 6,
+ VFIO_USER_DEVICE_GET_IRQ_INFO = 7,
+ VFIO_USER_DEVICE_SET_IRQS = 8,
+ VFIO_USER_REGION_READ = 9,
+ VFIO_USER_REGION_WRITE = 10,
+ VFIO_USER_DMA_READ = 11,
+ VFIO_USER_DMA_WRITE = 12,
+ VFIO_USER_DEVICE_RESET = 13,
+ VFIO_USER_DIRTY_PAGES = 14,
+ VFIO_USER_MAX,
+};
+
+/* VFIOUserHdr flags */
+#define VFIO_USER_REQUEST 0x0 /* message type: request */
+#define VFIO_USER_REPLY 0x1 /* message type: reply */
+#define VFIO_USER_TYPE 0xF /* mask selecting the message type bits */
+
+#define VFIO_USER_NO_REPLY 0x10 /* presumably "no reply wanted"; unused here - TODO confirm */
+#define VFIO_USER_ERROR 0x20 /* error_reply in the header holds an error number */
+
+#endif /* VFIO_USER_PROTOCOL_H */
diff --git a/hw/vfio/user.c b/hw/vfio/user.c
index 1c79fb1cb9..1ab8e10739 100644
--- a/hw/vfio/user.c
+++ b/hw/vfio/user.c
@@ -24,11 +24,27 @@
#include "io/channel-util.h"
#include "system/iothread.h"
#include "user.h"
+#include "trace.h"
static IOThread *vfio_user_iothread;
static void vfio_user_shutdown(VFIOUserProxy *proxy);
+static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
+ VFIOUserFDs *fds);
+static VFIOUserFDs *vfio_user_getfds(int numfds);
+static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg);
+static void vfio_user_recv(void *opaque);
+static int vfio_user_recv_one(VFIOUserProxy *proxy);
+static void vfio_user_cb(void *opaque);
+
+static void vfio_user_request(void *opaque);
+
+/*
+ * Turn a message header into an error reply: record the error number
+ * 'err' and raise the VFIO_USER_ERROR flag.
+ */
+static inline void vfio_user_set_error(VFIOUserHdr *hdr, uint32_t err)
+{
+    hdr->error_reply = err;
+    hdr->flags |= VFIO_USER_ERROR;
+}
/*
* Functions called by main, CPU, or iothread threads
@@ -41,10 +57,340 @@ static void vfio_user_shutdown(VFIOUserProxy *proxy)
proxy->ctx, NULL, NULL);
}
+/*
+ * Obtain a message list entry bound to 'hdr' and 'fds'.
+ *
+ * An entry is taken from the proxy's free list when one is available;
+ * otherwise a zeroed entry is allocated and its condition variable is
+ * initialised.
+ */
+static VFIOUserMsg *vfio_user_getmsg(VFIOUserProxy *proxy, VFIOUserHdr *hdr,
+                                     VFIOUserFDs *fds)
+{
+    VFIOUserMsg *entry = QTAILQ_FIRST(&proxy->free);
+
+    if (entry == NULL) {
+        entry = g_malloc0(sizeof(*entry));
+        qemu_cond_init(&entry->cv);
+    } else {
+        QTAILQ_REMOVE(&proxy->free, entry, next);
+    }
+
+    entry->hdr = hdr;
+    entry->fds = fds;
+    return entry;
+}
+
+/*
+ * Return a message list entry to the proxy's free list.
+ *
+ * An entry that is already free (VFIO_MSG_NONE) is reported and left
+ * untouched.  For VFIO_MSG_NOWAIT and VFIO_MSG_ASYNC entries the header
+ * and FD buffers are released here, because no one is waiting to consume
+ * the reply; for every other type the buffers are left alone.
+ */
+static void vfio_user_recycle(VFIOUserProxy *proxy, VFIOUserMsg *msg)
+{
+    switch (msg->type) {
+    case VFIO_MSG_NONE:
+        error_printf("vfio_user_recycle - freeing free msg\n");
+        return;
+    case VFIO_MSG_NOWAIT:
+    case VFIO_MSG_ASYNC:
+        g_free(msg->hdr);
+        /* g_free() accepts NULL, so no guard is needed */
+        g_free(msg->fds);
+        break;
+    default:
+        break;
+    }
+
+    /* reset the entry before it goes back on the free list */
+    msg->complete = false;
+    msg->fds = NULL;
+    msg->hdr = NULL;
+    msg->type = VFIO_MSG_NONE;
+    QTAILQ_INSERT_HEAD(&proxy->free, msg, next);
+}
+
+/*
+ * Allocate a zeroed VFIOUserFDs with room for 'numfds' descriptors in the
+ * same allocation; fds->fds points at that trailing array.
+ */
+static VFIOUserFDs *vfio_user_getfds(int numfds)
+{
+    size_t array_bytes = numfds * sizeof(int);
+    VFIOUserFDs *fds = g_malloc0(sizeof(*fds) + array_bytes);
+
+    /* the descriptor array starts right after the struct itself */
+    fds->fds = (int *)(fds + 1);
+
+    return fds;
+}
+
/*
* Functions only called by iothread
*/
+/*
+ * Dispatch a fully received message.
+ *
+ * A request is appended to the proxy's incoming queue and the request BH
+ * is scheduled.  A reply is marked complete; if a thread is waiting for it
+ * (VFIO_MSG_WAIT) that thread is signalled, otherwise an error reply is
+ * reported and the message entry is recycled.
+ */
+static void vfio_user_process(VFIOUserProxy *proxy, VFIOUserMsg *msg,
+                              bool isreply)
+{
+    VFIOUserHdr *hdr;
+
+    if (!isreply) {
+        /* requests are handed over to the BH */
+        QTAILQ_INSERT_TAIL(&proxy->incoming, msg, next);
+        qemu_bh_schedule(proxy->req_bh);
+        return;
+    }
+
+    msg->complete = true;
+    if (msg->type == VFIO_MSG_WAIT) {
+        /* the waiter consumes the reply itself */
+        qemu_cond_signal(&msg->cv);
+        return;
+    }
+
+    /* nobody is waiting: check for errors, then release the entry */
+    hdr = msg->hdr;
+    if (hdr->flags & VFIO_USER_ERROR) {
+        error_printf("vfio_user_process: error reply on async ");
+        error_printf("request command %x error %s\n",
+                     hdr->command, strerror(hdr->error_reply));
+    }
+
+    /* youngest nowait msg has been ack'd */
+    if (proxy->last_nowait == msg) {
+        proxy->last_nowait = NULL;
+    }
+    vfio_user_recycle(proxy, msg);
+}
+
+/*
+ * Resume reading the message recorded in proxy->part_recv, of which
+ * proxy->recv_left bytes are still outstanding.
+ *
+ * Returns 1 once the message is complete and has been processed.
+ * Otherwise returns the non-positive result of qio_channel_read(); when
+ * that is QIO_CHANNEL_ERR_BLOCK the progress made so far is saved so the
+ * read can be continued on the next iteration.
+ */
+static int vfio_user_complete(VFIOUserProxy *proxy, Error **errp)
+{
+    VFIOUserMsg *msg = proxy->part_recv;
+    VFIOUserHdr *hdr = msg->hdr;
+    size_t remaining = proxy->recv_left;
+    char *cursor = (char *)hdr + (hdr->size - remaining);
+
+    while (remaining > 0) {
+        int nread = qio_channel_read(proxy->ioc, cursor, remaining, errp);
+
+        if (nread <= 0) {
+            /* error, EOF, or would block */
+            if (nread == QIO_CHANNEL_ERR_BLOCK) {
+                /* try for the rest on the next iteration */
+                proxy->recv_left = remaining;
+            }
+            return nread;
+        }
+        trace_vfio_user_recv_read(hdr->id, nread);
+
+        cursor += nread;
+        remaining -= nread;
+    }
+
+    /* the whole message has arrived: clear the partial state, process it */
+    proxy->recv_left = 0;
+    proxy->part_recv = NULL;
+    vfio_user_process(proxy, msg,
+                      (hdr->flags & VFIO_USER_TYPE) == VFIO_USER_REPLY);
+
+    /* return positive value */
+    return 1;
+}
+
+/*
+ * Socket read handler: with the proxy lock held, keep receiving messages
+ * for as long as vfio_user_recv_one() returns 0.  Does nothing unless the
+ * proxy is in the VFIO_PROXY_CONNECTED state.
+ */
+static void vfio_user_recv(void *opaque)
+{
+    VFIOUserProxy *proxy = opaque;
+
+    QEMU_LOCK_GUARD(&proxy->lock);
+
+    if (proxy->state != VFIO_PROXY_CONNECTED) {
+        return;
+    }
+
+    for (;;) {
+        if (vfio_user_recv_one(proxy) != 0) {
+            break;
+        }
+    }
+}
+
+/*
+ * Receive and process one incoming message.
+ *
+ * For replies, find matching outgoing request and wake any waiters.
+ * For requests, queue in incoming list and run request BH.
+ *
+ * Returns 0 when a complete message was received and processed,
+ * QIO_CHANNEL_ERR_BLOCK when the socket has no more data for now (a
+ * partially read message is remembered in proxy->part_recv), and -1 on
+ * error.
+ *
+ * Called from vfio_user_recv() with proxy->lock held.
+ */
+static int vfio_user_recv_one(VFIOUserProxy *proxy)
+{
+    VFIOUserMsg *msg = NULL;
+    g_autofree int *fdp = NULL;
+    VFIOUserFDs *reqfds = NULL;
+    VFIOUserHdr hdr;
+    struct iovec iov = {
+        .iov_base = &hdr,
+        .iov_len = sizeof(hdr),
+    };
+    bool isreply = false;
+    int ret;
+    size_t i, msgleft, numfds = 0;
+    char *data = NULL;
+    char *buf = NULL;
+    Error *local_err = NULL;
+
+    /*
+     * Complete any partial reads
+     */
+    if (proxy->part_recv != NULL) {
+        ret = vfio_user_complete(proxy, &local_err);
+
+        /* still not complete, try later */
+        if (ret == QIO_CHANNEL_ERR_BLOCK) {
+            return ret;
+        }
+
+        if (ret <= 0) {
+            goto fatal;
+        }
+        /* else fall into reading another msg */
+    }
+
+    /*
+     * Read header
+     */
+    ret = qio_channel_readv_full(proxy->ioc, &iov, 1, &fdp, &numfds, 0,
+                                 &local_err);
+    if (ret == QIO_CHANNEL_ERR_BLOCK) {
+        return ret;
+    }
+
+    /* read error or other side closed connection */
+    if (ret <= 0) {
+        goto fatal;
+    }
+
+    if (ret < sizeof(hdr)) {
+        error_setg(&local_err, "short read of header");
+        goto fatal;
+    }
+
+    /*
+     * Validate header
+     */
+    if (hdr.size < sizeof(VFIOUserHdr)) {
+        error_setg(&local_err, "bad header size");
+        goto fatal;
+    }
+    switch (hdr.flags & VFIO_USER_TYPE) {
+    case VFIO_USER_REQUEST:
+        isreply = false;
+        break;
+    case VFIO_USER_REPLY:
+        isreply = true;
+        break;
+    default:
+        error_setg(&local_err, "unknown message type");
+        goto fatal;
+    }
+    trace_vfio_user_recv_hdr(proxy->sockname, hdr.id, hdr.command, hdr.size,
+                             hdr.flags);
+
+    /*
+     * For replies, find the matching pending request.
+     * For requests, reap incoming FDs.
+     */
+    if (isreply) {
+        QTAILQ_FOREACH(msg, &proxy->pending, next) {
+            if (hdr.id == msg->id) {
+                break;
+            }
+        }
+        if (msg == NULL) {
+            error_setg(&local_err, "unexpected reply");
+            goto err;
+        }
+        QTAILQ_REMOVE(&proxy->pending, msg, next);
+
+        /*
+         * Process any received FDs
+         */
+        if (numfds != 0) {
+            if (msg->fds == NULL || msg->fds->recv_fds < numfds) {
+                error_setg(&local_err, "unexpected FDs");
+                goto err;
+            }
+            msg->fds->recv_fds = numfds;
+            memcpy(msg->fds->fds, fdp, numfds * sizeof(int));
+        }
+    } else {
+        if (numfds != 0) {
+            reqfds = vfio_user_getfds(numfds);
+            memcpy(reqfds->fds, fdp, numfds * sizeof(int));
+        } else {
+            reqfds = NULL;
+        }
+    }
+
+    /*
+     * Put the whole message into a single buffer.
+     */
+    if (isreply) {
+        if (hdr.size > msg->rsize) {
+            error_setg(&local_err, "reply larger than recv buffer");
+            goto err;
+        }
+        *msg->hdr = hdr;
+        data = (char *)msg->hdr + sizeof(hdr);
+    } else {
+        buf = g_malloc0(hdr.size);
+        memcpy(buf, &hdr, sizeof(hdr));
+        data = buf + sizeof(hdr);
+        msg = vfio_user_getmsg(proxy, (VFIOUserHdr *)buf, reqfds);
+        msg->type = VFIO_MSG_REQ;
+    }
+
+    /*
+     * Read rest of message.
+     */
+    msgleft = hdr.size - sizeof(hdr);
+    while (msgleft > 0) {
+        ret = qio_channel_read(proxy->ioc, data, msgleft, &local_err);
+
+        /* prepare to complete read on next iteration */
+        if (ret == QIO_CHANNEL_ERR_BLOCK) {
+            proxy->part_recv = msg;
+            proxy->recv_left = msgleft;
+            return ret;
+        }
+
+        if (ret <= 0) {
+            goto fatal;
+        }
+        trace_vfio_user_recv_read(hdr.id, ret);
+
+        msgleft -= ret;
+        data += ret;
+    }
+
+    vfio_user_process(proxy, msg, isreply);
+    return 0;
+
+    /*
+     * fatal means the other side closed or we don't trust the stream
+     * err means this message is corrupt
+     */
+fatal:
+    vfio_user_shutdown(proxy);
+    proxy->state = VFIO_PROXY_ERROR;
+
+    /* set error if server side closed */
+    if (ret == 0) {
+        error_setg(&local_err, "server closed socket");
+    }
+
+err:
+    for (i = 0; i < numfds; i++) {
+        close(fdp[i]);
+    }
+    if (isreply && msg != NULL) {
+        /* force an error to keep sending thread from hanging */
+        vfio_user_set_error(msg->hdr, EINVAL);
+        msg->complete = true;
+        qemu_cond_signal(&msg->cv);
+    }
+    if (!isreply) {
+        /*
+         * A request that failed part-way through is not on any list, so
+         * nothing else would release it: free its FD array and message
+         * buffer here and put the list entry back on the free list.
+         * Both pointers are NULL if they were never allocated.
+         */
+        g_free(reqfds);
+        g_free(buf);
+        if (msg != NULL) {
+            vfio_user_recycle(proxy, msg);
+        }
+    }
+    error_prepend(&local_err, "vfio_user_recv_one: ");
+    error_report_err(local_err);
+    return -1;
+}
+
static void vfio_user_cb(void *opaque)
{
VFIOUserProxy *proxy = opaque;
@@ -60,6 +406,53 @@ static void vfio_user_cb(void *opaque)
* Functions called by main or CPU threads
*/
+/*
+ * Bottom half that processes incoming requests.
+ *
+ * All queued requests are moved off proxy->incoming under the proxy lock,
+ * then each one is passed, without the lock held, to the bus-specific
+ * callback:
+ *     request(opaque, msg)
+ * where 'opaque' was specified in vfio_user_set_handler
+ * and 'msg' is the inbound message.  Finally the message list entries are
+ * recycled under the lock.
+ *
+ * The callback is responsible for disposing of the message buffer,
+ * usually by re-using it when calling vfio_send_reply or vfio_send_error,
+ * both of which free their message buffer when the reply is sent.
+ *
+ * If the callback uses a new buffer, it needs to free the old one.
+ */
+static void vfio_user_request(void *opaque)
+{
+    VFIOUserProxy *proxy = opaque;
+    VFIOUserMsgQ batch, done;
+    VFIOUserMsg *msg;
+
+    QTAILQ_INIT(&batch);
+    QTAILQ_INIT(&done);
+
+    /* reap all incoming */
+    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+        while ((msg = QTAILQ_FIRST(&proxy->incoming)) != NULL) {
+            QTAILQ_REMOVE(&proxy->incoming, msg, next);
+            QTAILQ_INSERT_TAIL(&batch, msg, next);
+        }
+    }
+
+    /* process list */
+    while ((msg = QTAILQ_FIRST(&batch)) != NULL) {
+        QTAILQ_REMOVE(&batch, msg, next);
+        trace_vfio_user_recv_request(msg->hdr->command);
+        proxy->request(proxy->req_arg, msg);
+        QTAILQ_INSERT_HEAD(&done, msg, next);
+    }
+
+    /* free list */
+    WITH_QEMU_LOCK_GUARD(&proxy->lock) {
+        while ((msg = QTAILQ_FIRST(&done)) != NULL) {
+            QTAILQ_REMOVE(&done, msg, next);
+            vfio_user_recycle(proxy, msg);
+        }
+    }
+}
+
+
static QLIST_HEAD(, VFIOUserProxy) vfio_user_sockets =
QLIST_HEAD_INITIALIZER(vfio_user_sockets);
@@ -98,6 +491,7 @@ VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr,
Error **errp)
}
proxy->ctx = iothread_get_aio_context(vfio_user_iothread);
+ proxy->req_bh = qemu_bh_new(vfio_user_request, proxy);
QTAILQ_INIT(&proxy->outgoing);
QTAILQ_INIT(&proxy->incoming);
@@ -108,6 +502,18 @@ VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr,
Error **errp)
return proxy;
}
+/*
+ * Register the callback used for incoming requests on the device's proxy
+ * and install vfio_user_recv() as the read handler for the proxy's
+ * channel in the proxy's AioContext.
+ *
+ * 'handler' is later invoked as handler(req_arg, msg).
+ */
+void vfio_user_set_handler(VFIODevice *vbasedev,
+                           void (*handler)(void *opaque, VFIOUserMsg *msg),
+                           void *req_arg)
+{
+    VFIOUserProxy *p = vbasedev->proxy;
+
+    p->req_arg = req_arg;
+    p->request = handler;
+    qio_channel_set_aio_fd_handler(p->ioc, p->ctx,
+                                   vfio_user_recv, NULL, NULL, p);
+}
+
void vfio_user_disconnect(VFIOUserProxy *proxy)
{
VFIOUserMsg *r1, *r2;
@@ -123,6 +529,8 @@ void vfio_user_disconnect(VFIOUserProxy *proxy)
}
object_unref(OBJECT(proxy->ioc));
proxy->ioc = NULL;
+ qemu_bh_delete(proxy->req_bh);
+ proxy->req_bh = NULL;
proxy->state = VFIO_PROXY_CLOSING;
QTAILQ_FOREACH_SAFE(r1, &proxy->outgoing, next, r2) {
diff --git a/hw/vfio/user.h b/hw/vfio/user.h
index ac7d15dfa8..30cf35d3e4 100644
--- a/hw/vfio/user.h
+++ b/hw/vfio/user.h
@@ -11,6 +11,8 @@
*
*/
+#include "user-protocol.h"
+
typedef struct {
int send_fds;
int recv_fds;
@@ -27,6 +29,7 @@ enum msg_type {
typedef struct VFIOUserMsg {
QTAILQ_ENTRY(VFIOUserMsg) next;
+ VFIOUserHdr *hdr;
VFIOUserFDs *fds;
uint32_t rsize;
uint32_t id;
@@ -66,13 +69,20 @@ typedef struct VFIOUserProxy {
VFIOUserMsgQ incoming;
VFIOUserMsgQ outgoing;
VFIOUserMsg *last_nowait;
+ VFIOUserMsg *part_recv;
+ size_t recv_left;
enum proxy_state state;
} VFIOUserProxy;
/* VFIOProxy flags */
#define VFIO_PROXY_CLIENT 0x1
+typedef struct VFIODevice VFIODevice;
+
VFIOUserProxy *vfio_user_connect_dev(SocketAddress *addr, Error **errp);
void vfio_user_disconnect(VFIOUserProxy *proxy);
+void vfio_user_set_handler(VFIODevice *vbasedev,
+ void (*handler)(void *opaque, VFIOUserMsg *msg),
+ void *reqarg);
#endif /* VFIO_USER_H */
--
2.34.1
- [v7 00/26] vfio-user client, John Levon, 2025/01/08
- [PATCH 10/26] vfio-user: add vfio-user class and container, John Levon, 2025/01/08
- [PATCH 02/26] vfio/container: pass listener_begin/commit callbacks, John Levon, 2025/01/08
- [PATCH 08/26] vfio: add device IO ops vector, John Levon, 2025/01/08
- [PATCH 04/26] vfio: add vfio_attach_device_by_iommu_type(), John Levon, 2025/01/08
- [PATCH 05/26] vfio: add vfio_prepare_device(), John Levon, 2025/01/08
- [PATCH 09/26] vfio-user: introduce vfio-user protocol specification, John Levon, 2025/01/08
- [PATCH 11/26] vfio-user: connect vfio proxy to remote server, John Levon, 2025/01/08
- [PATCH 14/26] vfio-user: get device info, John Levon, 2025/01/08
- [PATCH 12/26] vfio-user: define socket receive functions,
John Levon <=
- [PATCH 07/26] vfio: add VFIO base abstract class, John Levon, 2025/01/08
- [PATCH 13/26] vfio-user: define socket send functions, John Levon, 2025/01/08
- [PATCH 17/26] vfio-user: pci_user_realize PCI setup, John Levon, 2025/01/08
- [PATCH 15/26] vfio-user: get region info, John Levon, 2025/01/08
- [PATCH 20/26] vfio-user: proxy container connect/disconnect, John Levon, 2025/01/08
- [PATCH 22/26] vfio-user: no-mmap DMA support, John Levon, 2025/01/08
- [PATCH 19/26] vfio-user: forward msix BAR accesses to server, John Levon, 2025/01/08
- [PATCH 06/26] vfio: add region cache, John Levon, 2025/01/08
- [PATCH 01/26] vfio/container: pass MemoryRegion to DMA operations, John Levon, 2025/01/08
- [PATCH 24/26] vfio-user: pci reset, John Levon, 2025/01/08