dcavalca / rpms / qemu

Forked from rpms/qemu 11 months ago
Clone

Blame 0110-virtiofsd-process-requests-in-a-thread-pool.patch

1d442b
From: Stefan Hajnoczi <stefanha@redhat.com>
1d442b
Date: Mon, 27 Jan 2020 19:02:19 +0000
1d442b
Subject: [PATCH] virtiofsd: process requests in a thread pool
1d442b
1d442b
Introduce a thread pool so that fv_queue_thread() just pops
1d442b
VuVirtqElements and hands them to the thread pool.  For the time being
1d442b
only one worker thread is allowed since passthrough_ll.c is not
1d442b
thread-safe yet.  Future patches will lift this restriction so that
1d442b
multiple FUSE requests can be processed in parallel.
1d442b
1d442b
The main new concept is struct FVRequest, which contains both
1d442b
VuVirtqElement and struct fuse_chan.  We now have fv_VuDev for a device,
1d442b
fv_QueueInfo for a virtqueue, and FVRequest for a request.  Some of
1d442b
fv_QueueInfo's fields are moved into FVRequest because they are
1d442b
per-request.  The name FVRequest conforms to QEMU coding style and I
1d442b
expect the struct fv_* types will be renamed in a future refactoring.
1d442b
1d442b
This patch series is not optimal.  fbuf reuse is dropped so each request
1d442b
does malloc(se->bufsize), but there is no clean and cheap way to keep
1d442b
this with a thread pool.  The vq_lock mutex is held for longer than
1d442b
necessary, especially during the eventfd_write() syscall.  Performance
1d442b
can be improved in the future.
1d442b
1d442b
prctl(2) had to be added to the seccomp whitelist because glib invokes
1d442b
it.
1d442b
1d442b
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
1d442b
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
1d442b
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
1d442b
(cherry picked from commit a3d756c5aecccc4c0e51060a7e2f1c87bf8f1180)
1d442b
---
1d442b
 tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++---------------
1d442b
 1 file changed, 201 insertions(+), 158 deletions(-)
1d442b
1d442b
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
1d442b
index f6242f9338..0dcf2ef57a 100644
1d442b
--- a/tools/virtiofsd/fuse_virtio.c
1d442b
+++ b/tools/virtiofsd/fuse_virtio.c
1d442b
@@ -22,6 +22,7 @@
1d442b
 
1d442b
 #include <assert.h>
1d442b
 #include <errno.h>
1d442b
+#include <glib.h>
1d442b
 #include <stdint.h>
1d442b
 #include <stdio.h>
1d442b
 #include <stdlib.h>
1d442b
@@ -37,17 +38,28 @@
1d442b
 struct fv_VuDev;
1d442b
 struct fv_QueueInfo {
1d442b
     pthread_t thread;
1d442b
+    /*
1d442b
+     * This lock protects the VuVirtq preventing races between
1d442b
+     * fv_queue_thread() and fv_queue_worker().
1d442b
+     */
1d442b
+    pthread_mutex_t vq_lock;
1d442b
+
1d442b
     struct fv_VuDev *virtio_dev;
1d442b
 
1d442b
     /* Our queue index, corresponds to array position */
1d442b
     int qidx;
1d442b
     int kick_fd;
1d442b
     int kill_fd; /* For killing the thread */
1d442b
+};
1d442b
 
1d442b
-    /* The element for the command currently being processed */
1d442b
-    VuVirtqElement *qe;
1d442b
+/* A FUSE request */
1d442b
+typedef struct {
1d442b
+    VuVirtqElement elem;
1d442b
+    struct fuse_chan ch;
1d442b
+
1d442b
+    /* Used to complete requests that involve no reply */
1d442b
     bool reply_sent;
1d442b
-};
1d442b
+} FVRequest;
1d442b
 
1d442b
 /*
1d442b
  * We pass the dev element into libvhost-user
1d442b
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
1d442b
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
1d442b
                     struct iovec *iov, int count)
1d442b
 {
1d442b
-    VuVirtqElement *elem;
1d442b
-    VuVirtq *q;
1d442b
+    FVRequest *req = container_of(ch, FVRequest, ch);
1d442b
+    struct fv_QueueInfo *qi = ch->qi;
1d442b
+    VuDev *dev = &se->virtio_dev->dev;
1d442b
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
1d442b
+    VuVirtqElement *elem = &req->elem;
1d442b
     int ret = 0;
1d442b
 
1d442b
     assert(count >= 1);
1d442b
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
1d442b
 
1d442b
     /* unique == 0 is notification, which we don't support */
1d442b
     assert(out->unique);
1d442b
-    /* For virtio we always have ch */
1d442b
-    assert(ch);
1d442b
-    assert(!ch->qi->reply_sent);
1d442b
-    elem = ch->qi->qe;
1d442b
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
1d442b
+    assert(!req->reply_sent);
1d442b
 
1d442b
     /* The 'in' part of the elem is to qemu */
1d442b
     unsigned int in_num = elem->in_num;
1d442b
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
1d442b
     }
1d442b
 
1d442b
     copy_iov(iov, count, in_sg, in_num, tosend_len);
1d442b
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
1d442b
-    vu_queue_notify(&se->virtio_dev->dev, q);
1d442b
-    ch->qi->reply_sent = true;
1d442b
+
1d442b
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
+    pthread_mutex_lock(&qi->vq_lock);
1d442b
+    vu_queue_push(dev, q, elem, tosend_len);
1d442b
+    vu_queue_notify(dev, q);
1d442b
+    pthread_mutex_unlock(&qi->vq_lock);
1d442b
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
+
1d442b
+    req->reply_sent = true;
1d442b
 
1d442b
 err:
1d442b
     return ret;
1d442b
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
1d442b
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
1d442b
                          size_t len)
1d442b
 {
1d442b
+    FVRequest *req = container_of(ch, FVRequest, ch);
1d442b
+    struct fv_QueueInfo *qi = ch->qi;
1d442b
+    VuDev *dev = &se->virtio_dev->dev;
1d442b
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
1d442b
+    VuVirtqElement *elem = &req->elem;
1d442b
     int ret = 0;
1d442b
-    VuVirtqElement *elem;
1d442b
-    VuVirtq *q;
1d442b
 
1d442b
     assert(count >= 1);
1d442b
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
1d442b
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
1d442b
     /* unique == 0 is notification which we don't support */
1d442b
     assert(out->unique);
1d442b
 
1d442b
-    /* For virtio we always have ch */
1d442b
-    assert(ch);
1d442b
-    assert(!ch->qi->reply_sent);
1d442b
-    elem = ch->qi->qe;
1d442b
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
1d442b
+    assert(!req->reply_sent);
1d442b
 
1d442b
     /* The 'in' part of the elem is to qemu */
1d442b
     unsigned int in_num = elem->in_num;
1d442b
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
1d442b
 
1d442b
     ret = 0;
1d442b
 
1d442b
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
1d442b
-    vu_queue_notify(&se->virtio_dev->dev, q);
1d442b
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
+    pthread_mutex_lock(&qi->vq_lock);
1d442b
+    vu_queue_push(dev, q, elem, tosend_len);
1d442b
+    vu_queue_notify(dev, q);
1d442b
+    pthread_mutex_unlock(&qi->vq_lock);
1d442b
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
 
1d442b
 err:
1d442b
     if (ret == 0) {
1d442b
-        ch->qi->reply_sent = true;
1d442b
+        req->reply_sent = true;
1d442b
     }
1d442b
 
1d442b
     return ret;
1d442b
 }
1d442b
 
1d442b
+/* Process one FVRequest in a thread pool */
1d442b
+static void fv_queue_worker(gpointer data, gpointer user_data)
1d442b
+{
1d442b
+    struct fv_QueueInfo *qi = user_data;
1d442b
+    struct fuse_session *se = qi->virtio_dev->se;
1d442b
+    struct VuDev *dev = &qi->virtio_dev->dev;
1d442b
+    FVRequest *req = data;
1d442b
+    VuVirtqElement *elem = &req->elem;
1d442b
+    struct fuse_buf fbuf = {};
1d442b
+    bool allocated_bufv = false;
1d442b
+    struct fuse_bufvec bufv;
1d442b
+    struct fuse_bufvec *pbufv;
1d442b
+
1d442b
+    assert(se->bufsize > sizeof(struct fuse_in_header));
1d442b
+
1d442b
+    /*
1d442b
+     * An element contains one request and the space to send our response
1d442b
+     * They're spread over multiple descriptors in a scatter/gather set
1d442b
+     * and we can't trust the guest to keep them still; so copy in/out.
1d442b
+     */
1d442b
+    fbuf.mem = malloc(se->bufsize);
1d442b
+    assert(fbuf.mem);
1d442b
+
1d442b
+    fuse_mutex_init(&req->ch.lock);
1d442b
+    req->ch.fd = -1;
1d442b
+    req->ch.qi = qi;
1d442b
+
1d442b
+    /* The 'out' part of the elem is from qemu */
1d442b
+    unsigned int out_num = elem->out_num;
1d442b
+    struct iovec *out_sg = elem->out_sg;
1d442b
+    size_t out_len = iov_size(out_sg, out_num);
1d442b
+    fuse_log(FUSE_LOG_DEBUG,
1d442b
+             "%s: elem %d: with %d out desc of length %zd\n",
1d442b
+             __func__, elem->index, out_num, out_len);
1d442b
+
1d442b
+    /*
1d442b
+     * The elem should contain a 'fuse_in_header' (in to fuse)
1d442b
+     * plus the data based on the len in the header.
1d442b
+     */
1d442b
+    if (out_len < sizeof(struct fuse_in_header)) {
1d442b
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
1d442b
+                 __func__, elem->index);
1d442b
+        assert(0); /* TODO */
1d442b
+    }
1d442b
+    if (out_len > se->bufsize) {
1d442b
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
1d442b
+                 elem->index);
1d442b
+        assert(0); /* TODO */
1d442b
+    }
1d442b
+    /* Copy just the first element and look at it */
1d442b
+    copy_from_iov(&fbuf, 1, out_sg);
1d442b
+
1d442b
+    pbufv = NULL; /* Compiler thinks an unitialised path */
1d442b
+    if (out_num > 2 &&
1d442b
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
1d442b
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
1d442b
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
1d442b
+        /*
1d442b
+         * For a write we don't actually need to copy the
1d442b
+         * data, we can just do it straight out of guest memory
1d442b
+         * but we must still copy the headers in case the guest
1d442b
+         * was nasty and changed them while we were using them.
1d442b
+         */
1d442b
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
1d442b
+
1d442b
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
1d442b
+        fbuf.mem += out_sg->iov_len;
1d442b
+        copy_from_iov(&fbuf, 1, out_sg + 1);
1d442b
+        fbuf.mem -= out_sg->iov_len;
1d442b
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
1d442b
+
1d442b
+        /* Allocate the bufv, with space for the rest of the iov */
1d442b
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
1d442b
+                       sizeof(struct fuse_buf) * (out_num - 2));
1d442b
+        if (!pbufv) {
1d442b
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
1d442b
+                    __func__);
1d442b
+            goto out;
1d442b
+        }
1d442b
+
1d442b
+        allocated_bufv = true;
1d442b
+        pbufv->count = 1;
1d442b
+        pbufv->buf[0] = fbuf;
1d442b
+
1d442b
+        size_t iovindex, pbufvindex;
1d442b
+        iovindex = 2; /* 2 headers, separate iovs */
1d442b
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
1d442b
+
1d442b
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
1d442b
+            pbufv->count++;
1d442b
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
1d442b
+            pbufv->buf[pbufvindex].flags = 0;
1d442b
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
1d442b
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
1d442b
+        }
1d442b
+    } else {
1d442b
+        /* Normal (non fast write) path */
1d442b
+
1d442b
+        /* Copy the rest of the buffer */
1d442b
+        fbuf.mem += out_sg->iov_len;
1d442b
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
1d442b
+        fbuf.mem -= out_sg->iov_len;
1d442b
+        fbuf.size = out_len;
1d442b
+
1d442b
+        /* TODO! Endianness of header */
1d442b
+
1d442b
+        /* TODO: Add checks for fuse_session_exited */
1d442b
+        bufv.buf[0] = fbuf;
1d442b
+        bufv.count = 1;
1d442b
+        pbufv = &buf;;
1d442b
+    }
1d442b
+    pbufv->idx = 0;
1d442b
+    pbufv->off = 0;
1d442b
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
1d442b
+
1d442b
+out:
1d442b
+    if (allocated_bufv) {
1d442b
+        free(pbufv);
1d442b
+    }
1d442b
+
1d442b
+    /* If the request has no reply, still recycle the virtqueue element */
1d442b
+    if (!req->reply_sent) {
1d442b
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
1d442b
+
1d442b
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
1d442b
+                 elem->index);
1d442b
+
1d442b
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
+        pthread_mutex_lock(&qi->vq_lock);
1d442b
+        vu_queue_push(dev, q, elem, 0);
1d442b
+        vu_queue_notify(dev, q);
1d442b
+        pthread_mutex_unlock(&qi->vq_lock);
1d442b
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
+    }
1d442b
+
1d442b
+    pthread_mutex_destroy(&req->ch.lock);
1d442b
+    free(fbuf.mem);
1d442b
+    free(req);
1d442b
+}
1d442b
+
1d442b
 /* Thread function for individual queues, created when a queue is 'started' */
1d442b
 static void *fv_queue_thread(void *opaque)
1d442b
 {
1d442b
     struct fv_QueueInfo *qi = opaque;
1d442b
     struct VuDev *dev = &qi->virtio_dev->dev;
1d442b
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
1d442b
-    struct fuse_session *se = qi->virtio_dev->se;
1d442b
-    struct fuse_chan ch;
1d442b
-    struct fuse_buf fbuf;
1d442b
+    GThreadPool *pool;
1d442b
 
1d442b
-    fbuf.mem = NULL;
1d442b
-    fbuf.flags = 0;
1d442b
-
1d442b
-    fuse_mutex_init(&ch.lock);
1d442b
-    ch.fd = (int)0xdaff0d111;
1d442b
-    ch.qi = qi;
1d442b
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
1d442b
+                             TRUE, NULL);
1d442b
+    if (!pool) {
1d442b
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
1d442b
+        return NULL;
1d442b
+    }
1d442b
 
1d442b
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
1d442b
              qi->qidx, qi->kick_fd);
1d442b
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
1d442b
         /* Mutual exclusion with virtio_loop() */
1d442b
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
         assert(ret == 0); /* there is no possible error case */
1d442b
+        pthread_mutex_lock(&qi->vq_lock);
1d442b
         /* out is from guest, in is too guest */
1d442b
         unsigned int in_bytes, out_bytes;
1d442b
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
1d442b
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
1d442b
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
1d442b
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
1d442b
 
1d442b
-
1d442b
         while (1) {
1d442b
-            bool allocated_bufv = false;
1d442b
-            struct fuse_bufvec bufv;
1d442b
-            struct fuse_bufvec *pbufv;
1d442b
-
1d442b
-            /*
1d442b
-             * An element contains one request and the space to send our
1d442b
-             * response They're spread over multiple descriptors in a
1d442b
-             * scatter/gather set and we can't trust the guest to keep them
1d442b
-             * still; so copy in/out.
1d442b
-             */
1d442b
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
1d442b
-            if (!elem) {
1d442b
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
1d442b
+            if (!req) {
1d442b
                 break;
1d442b
             }
1d442b
 
1d442b
-            qi->qe = elem;
1d442b
-            qi->reply_sent = false;
1d442b
+            req->reply_sent = false;
1d442b
 
1d442b
-            if (!fbuf.mem) {
1d442b
-                fbuf.mem = malloc(se->bufsize);
1d442b
-                assert(fbuf.mem);
1d442b
-                assert(se->bufsize > sizeof(struct fuse_in_header));
1d442b
-            }
1d442b
-            /* The 'out' part of the elem is from qemu */
1d442b
-            unsigned int out_num = elem->out_num;
1d442b
-            struct iovec *out_sg = elem->out_sg;
1d442b
-            size_t out_len = iov_size(out_sg, out_num);
1d442b
-            fuse_log(FUSE_LOG_DEBUG,
1d442b
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
1d442b
-                     elem->index, out_num, out_len);
1d442b
-
1d442b
-            /*
1d442b
-             * The elem should contain a 'fuse_in_header' (in to fuse)
1d442b
-             * plus the data based on the len in the header.
1d442b
-             */
1d442b
-            if (out_len < sizeof(struct fuse_in_header)) {
1d442b
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
1d442b
-                         __func__, elem->index);
1d442b
-                assert(0); /* TODO */
1d442b
-            }
1d442b
-            if (out_len > se->bufsize) {
1d442b
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
1d442b
-                         __func__, elem->index);
1d442b
-                assert(0); /* TODO */
1d442b
-            }
1d442b
-            /* Copy just the first element and look at it */
1d442b
-            copy_from_iov(&fbuf, 1, out_sg);
1d442b
-
1d442b
-            if (out_num > 2 &&
1d442b
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
1d442b
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
1d442b
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
1d442b
-                /*
1d442b
-                 * For a write we don't actually need to copy the
1d442b
-                 * data, we can just do it straight out of guest memory
1d442b
-                 * but we must still copy the headers in case the guest
1d442b
-                 * was nasty and changed them while we were using them.
1d442b
-                 */
1d442b
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
1d442b
-
1d442b
-                /* copy the fuse_write_in header after the fuse_in_header */
1d442b
-                fbuf.mem += out_sg->iov_len;
1d442b
-                copy_from_iov(&fbuf, 1, out_sg + 1);
1d442b
-                fbuf.mem -= out_sg->iov_len;
1d442b
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
1d442b
-
1d442b
-                /* Allocate the bufv, with space for the rest of the iov */
1d442b
-                allocated_bufv = true;
1d442b
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
1d442b
-                               sizeof(struct fuse_buf) * (out_num - 2));
1d442b
-                if (!pbufv) {
1d442b
-                    vu_queue_unpop(dev, q, elem, 0);
1d442b
-                    free(elem);
1d442b
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
1d442b
-                             __func__);
1d442b
-                    goto out;
1d442b
-                }
1d442b
-
1d442b
-                pbufv->count = 1;
1d442b
-                pbufv->buf[0] = fbuf;
1d442b
-
1d442b
-                size_t iovindex, pbufvindex;
1d442b
-                iovindex = 2; /* 2 headers, separate iovs */
1d442b
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
1d442b
-
1d442b
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
1d442b
-                    pbufv->count++;
1d442b
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
1d442b
-                    pbufv->buf[pbufvindex].flags = 0;
1d442b
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
1d442b
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
1d442b
-                }
1d442b
-            } else {
1d442b
-                /* Normal (non fast write) path */
1d442b
-
1d442b
-                /* Copy the rest of the buffer */
1d442b
-                fbuf.mem += out_sg->iov_len;
1d442b
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
1d442b
-                fbuf.mem -= out_sg->iov_len;
1d442b
-                fbuf.size = out_len;
1d442b
-
1d442b
-                /* TODO! Endianness of header */
1d442b
-
1d442b
-                /* TODO: Add checks for fuse_session_exited */
1d442b
-                bufv.buf[0] = fbuf;
1d442b
-                bufv.count = 1;
1d442b
-                pbufv = &buf;;
1d442b
-            }
1d442b
-            pbufv->idx = 0;
1d442b
-            pbufv->off = 0;
1d442b
-            fuse_session_process_buf_int(se, pbufv, &ch);
1d442b
-
1d442b
-            if (allocated_bufv) {
1d442b
-                free(pbufv);
1d442b
-            }
1d442b
-
1d442b
-            if (!qi->reply_sent) {
1d442b
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
1d442b
-                         __func__, elem->index);
1d442b
-                /* I think we've still got to recycle the element */
1d442b
-                vu_queue_push(dev, q, elem, 0);
1d442b
-                vu_queue_notify(dev, q);
1d442b
-            }
1d442b
-            qi->qe = NULL;
1d442b
-            free(elem);
1d442b
-            elem = NULL;
1d442b
+            g_thread_pool_push(pool, req, NULL);
1d442b
         }
1d442b
 
1d442b
+        pthread_mutex_unlock(&qi->vq_lock);
1d442b
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
1d442b
     }
1d442b
-out:
1d442b
-    pthread_mutex_destroy(&ch.lock);
1d442b
-    free(fbuf.mem);
1d442b
+
1d442b
+    g_thread_pool_free(pool, FALSE, TRUE);
1d442b
 
1d442b
     return NULL;
1d442b
 }
1d442b
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
1d442b
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
1d442b
                  __func__, qidx, ret);
1d442b
     }
1d442b
+    pthread_mutex_destroy(&ourqi->vq_lock);
1d442b
     close(ourqi->kill_fd);
1d442b
     ourqi->kick_fd = -1;
1d442b
     free(vud->qi[qidx]);
1d442b
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
1d442b
 
1d442b
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
1d442b
         assert(ourqi->kill_fd != -1);
1d442b
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
1d442b
+
1d442b
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
1d442b
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
1d442b
                      __func__, qidx);