22c213
From b0db5e666aaa43eadff3e60a1ada704f33b03074 Mon Sep 17 00:00:00 2001
22c213
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
22c213
Date: Mon, 27 Jan 2020 19:02:19 +0100
22c213
Subject: [PATCH 108/116] virtiofsd: process requests in a thread pool
22c213
MIME-Version: 1.0
22c213
Content-Type: text/plain; charset=UTF-8
22c213
Content-Transfer-Encoding: 8bit
22c213
22c213
RH-Author: Dr. David Alan Gilbert <dgilbert@redhat.com>
22c213
Message-id: <20200127190227.40942-105-dgilbert@redhat.com>
22c213
Patchwork-id: 93554
22c213
O-Subject: [RHEL-AV-8.2 qemu-kvm PATCH 104/112] virtiofsd: process requests in a thread pool
22c213
Bugzilla: 1694164
22c213
RH-Acked-by: Philippe Mathieu-Daudé <philmd@redhat.com>
22c213
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
22c213
RH-Acked-by: Sergio Lopez Pascual <slp@redhat.com>
22c213
22c213
From: Stefan Hajnoczi <stefanha@redhat.com>
22c213
22c213
Introduce a thread pool so that fv_queue_thread() just pops
22c213
VuVirtqElements and hands them to the thread pool.  For the time being
22c213
only one worker thread is allowed since passthrough_ll.c is not
22c213
thread-safe yet.  Future patches will lift this restriction so that
22c213
multiple FUSE requests can be processed in parallel.
22c213
22c213
The main new concept is struct FVRequest, which contains both
22c213
VuVirtqElement and struct fuse_chan.  We now have fv_VuDev for a device,
22c213
fv_QueueInfo for a virtqueue, and FVRequest for a request.  Some of
22c213
fv_QueueInfo's fields are moved into FVRequest because they are
22c213
per-request.  The name FVRequest conforms to QEMU coding style and I
22c213
expect the struct fv_* types will be renamed in a future refactoring.
22c213
22c213
This patch series is not optimal.  fbuf reuse is dropped so each request
22c213
does malloc(se->bufsize), but there is no clean and cheap way to keep
22c213
this with a thread pool.  The vq_lock mutex is held for longer than
22c213
necessary, especially during the eventfd_write() syscall.  Performance
22c213
can be improved in the future.
22c213
22c213
prctl(2) had to be added to the seccomp whitelist because glib invokes
22c213
it.
22c213
22c213
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
22c213
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
22c213
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
22c213
(cherry picked from commit a3d756c5aecccc4c0e51060a7e2f1c87bf8f1180)
22c213
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
22c213
---
22c213
 tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++++++-------------------
22c213
 1 file changed, 201 insertions(+), 158 deletions(-)
22c213
22c213
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
22c213
index f6242f9..0dcf2ef 100644
22c213
--- a/tools/virtiofsd/fuse_virtio.c
22c213
+++ b/tools/virtiofsd/fuse_virtio.c
22c213
@@ -22,6 +22,7 @@
22c213
 
22c213
 #include <assert.h>
22c213
 #include <errno.h>
22c213
+#include <glib.h>
22c213
 #include <stdint.h>
22c213
 #include <stdio.h>
22c213
 #include <stdlib.h>
22c213
@@ -37,17 +38,28 @@
22c213
 struct fv_VuDev;
22c213
 struct fv_QueueInfo {
22c213
     pthread_t thread;
22c213
+    /*
22c213
+     * This lock protects the VuVirtq preventing races between
22c213
+     * fv_queue_thread() and fv_queue_worker().
22c213
+     */
22c213
+    pthread_mutex_t vq_lock;
22c213
+
22c213
     struct fv_VuDev *virtio_dev;
22c213
 
22c213
     /* Our queue index, corresponds to array position */
22c213
     int qidx;
22c213
     int kick_fd;
22c213
     int kill_fd; /* For killing the thread */
22c213
+};
22c213
 
22c213
-    /* The element for the command currently being processed */
22c213
-    VuVirtqElement *qe;
22c213
+/* A FUSE request */
22c213
+typedef struct {
22c213
+    VuVirtqElement elem;
22c213
+    struct fuse_chan ch;
22c213
+
22c213
+    /* Used to complete requests that involve no reply */
22c213
     bool reply_sent;
22c213
-};
22c213
+} FVRequest;
22c213
 
22c213
 /*
22c213
  * We pass the dev element into libvhost-user
22c213
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
22c213
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
22c213
                     struct iovec *iov, int count)
22c213
 {
22c213
-    VuVirtqElement *elem;
22c213
-    VuVirtq *q;
22c213
+    FVRequest *req = container_of(ch, FVRequest, ch);
22c213
+    struct fv_QueueInfo *qi = ch->qi;
22c213
+    VuDev *dev = &se->virtio_dev->dev;
22c213
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
22c213
+    VuVirtqElement *elem = &req->elem;
22c213
     int ret = 0;
22c213
 
22c213
     assert(count >= 1);
22c213
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
22c213
 
22c213
     /* unique == 0 is notification, which we don't support */
22c213
     assert(out->unique);
22c213
-    /* For virtio we always have ch */
22c213
-    assert(ch);
22c213
-    assert(!ch->qi->reply_sent);
22c213
-    elem = ch->qi->qe;
22c213
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
22c213
+    assert(!req->reply_sent);
22c213
 
22c213
     /* The 'in' part of the elem is to qemu */
22c213
     unsigned int in_num = elem->in_num;
22c213
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
22c213
     }
22c213
 
22c213
     copy_iov(iov, count, in_sg, in_num, tosend_len);
22c213
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
22c213
-    vu_queue_notify(&se->virtio_dev->dev, q);
22c213
-    ch->qi->reply_sent = true;
22c213
+
22c213
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
+    pthread_mutex_lock(&qi->vq_lock);
22c213
+    vu_queue_push(dev, q, elem, tosend_len);
22c213
+    vu_queue_notify(dev, q);
22c213
+    pthread_mutex_unlock(&qi->vq_lock);
22c213
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
+
22c213
+    req->reply_sent = true;
22c213
 
22c213
 err:
22c213
     return ret;
22c213
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
22c213
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
22c213
                          size_t len)
22c213
 {
22c213
+    FVRequest *req = container_of(ch, FVRequest, ch);
22c213
+    struct fv_QueueInfo *qi = ch->qi;
22c213
+    VuDev *dev = &se->virtio_dev->dev;
22c213
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
22c213
+    VuVirtqElement *elem = &req->elem;
22c213
     int ret = 0;
22c213
-    VuVirtqElement *elem;
22c213
-    VuVirtq *q;
22c213
 
22c213
     assert(count >= 1);
22c213
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
22c213
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
22c213
     /* unique == 0 is notification which we don't support */
22c213
     assert(out->unique);
22c213
 
22c213
-    /* For virtio we always have ch */
22c213
-    assert(ch);
22c213
-    assert(!ch->qi->reply_sent);
22c213
-    elem = ch->qi->qe;
22c213
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
22c213
+    assert(!req->reply_sent);
22c213
 
22c213
     /* The 'in' part of the elem is to qemu */
22c213
     unsigned int in_num = elem->in_num;
22c213
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
22c213
 
22c213
     ret = 0;
22c213
 
22c213
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
22c213
-    vu_queue_notify(&se->virtio_dev->dev, q);
22c213
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
+    pthread_mutex_lock(&qi->vq_lock);
22c213
+    vu_queue_push(dev, q, elem, tosend_len);
22c213
+    vu_queue_notify(dev, q);
22c213
+    pthread_mutex_unlock(&qi->vq_lock);
22c213
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
 
22c213
 err:
22c213
     if (ret == 0) {
22c213
-        ch->qi->reply_sent = true;
22c213
+        req->reply_sent = true;
22c213
     }
22c213
 
22c213
     return ret;
22c213
 }
22c213
 
22c213
+/* Process one FVRequest in a thread pool */
22c213
+static void fv_queue_worker(gpointer data, gpointer user_data)
22c213
+{
22c213
+    struct fv_QueueInfo *qi = user_data;
22c213
+    struct fuse_session *se = qi->virtio_dev->se;
22c213
+    struct VuDev *dev = &qi->virtio_dev->dev;
22c213
+    FVRequest *req = data;
22c213
+    VuVirtqElement *elem = &req->elem;
22c213
+    struct fuse_buf fbuf = {};
22c213
+    bool allocated_bufv = false;
22c213
+    struct fuse_bufvec bufv;
22c213
+    struct fuse_bufvec *pbufv;
22c213
+
22c213
+    assert(se->bufsize > sizeof(struct fuse_in_header));
22c213
+
22c213
+    /*
22c213
+     * An element contains one request and the space to send our response
22c213
+     * They're spread over multiple descriptors in a scatter/gather set
22c213
+     * and we can't trust the guest to keep them still; so copy in/out.
22c213
+     */
22c213
+    fbuf.mem = malloc(se->bufsize);
22c213
+    assert(fbuf.mem);
22c213
+
22c213
+    fuse_mutex_init(&req->ch.lock);
22c213
+    req->ch.fd = -1;
22c213
+    req->ch.qi = qi;
22c213
+
22c213
+    /* The 'out' part of the elem is from qemu */
22c213
+    unsigned int out_num = elem->out_num;
22c213
+    struct iovec *out_sg = elem->out_sg;
22c213
+    size_t out_len = iov_size(out_sg, out_num);
22c213
+    fuse_log(FUSE_LOG_DEBUG,
22c213
+             "%s: elem %d: with %d out desc of length %zd\n",
22c213
+             __func__, elem->index, out_num, out_len);
22c213
+
22c213
+    /*
22c213
+     * The elem should contain a 'fuse_in_header' (in to fuse)
22c213
+     * plus the data based on the len in the header.
22c213
+     */
22c213
+    if (out_len < sizeof(struct fuse_in_header)) {
22c213
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
22c213
+                 __func__, elem->index);
22c213
+        assert(0); /* TODO */
22c213
+    }
22c213
+    if (out_len > se->bufsize) {
22c213
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
22c213
+                 elem->index);
22c213
+        assert(0); /* TODO */
22c213
+    }
22c213
+    /* Copy just the first element and look at it */
22c213
+    copy_from_iov(&fbuf, 1, out_sg);
22c213
+
22c213
+    pbufv = NULL; /* Compiler thinks an unitialised path */
22c213
+    if (out_num > 2 &&
22c213
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
22c213
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
22c213
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
22c213
+        /*
22c213
+         * For a write we don't actually need to copy the
22c213
+         * data, we can just do it straight out of guest memory
22c213
+         * but we must still copy the headers in case the guest
22c213
+         * was nasty and changed them while we were using them.
22c213
+         */
22c213
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
22c213
+
22c213
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
22c213
+        fbuf.mem += out_sg->iov_len;
22c213
+        copy_from_iov(&fbuf, 1, out_sg + 1);
22c213
+        fbuf.mem -= out_sg->iov_len;
22c213
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
22c213
+
22c213
+        /* Allocate the bufv, with space for the rest of the iov */
22c213
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
22c213
+                       sizeof(struct fuse_buf) * (out_num - 2));
22c213
+        if (!pbufv) {
22c213
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
22c213
+                    __func__);
22c213
+            goto out;
22c213
+        }
22c213
+
22c213
+        allocated_bufv = true;
22c213
+        pbufv->count = 1;
22c213
+        pbufv->buf[0] = fbuf;
22c213
+
22c213
+        size_t iovindex, pbufvindex;
22c213
+        iovindex = 2; /* 2 headers, separate iovs */
22c213
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
22c213
+
22c213
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
22c213
+            pbufv->count++;
22c213
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
22c213
+            pbufv->buf[pbufvindex].flags = 0;
22c213
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
22c213
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
22c213
+        }
22c213
+    } else {
22c213
+        /* Normal (non fast write) path */
22c213
+
22c213
+        /* Copy the rest of the buffer */
22c213
+        fbuf.mem += out_sg->iov_len;
22c213
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
22c213
+        fbuf.mem -= out_sg->iov_len;
22c213
+        fbuf.size = out_len;
22c213
+
22c213
+        /* TODO! Endianness of header */
22c213
+
22c213
+        /* TODO: Add checks for fuse_session_exited */
22c213
+        bufv.buf[0] = fbuf;
22c213
+        bufv.count = 1;
22c213
+        pbufv = &buf;;
22c213
+    }
22c213
+    pbufv->idx = 0;
22c213
+    pbufv->off = 0;
22c213
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
22c213
+
22c213
+out:
22c213
+    if (allocated_bufv) {
22c213
+        free(pbufv);
22c213
+    }
22c213
+
22c213
+    /* If the request has no reply, still recycle the virtqueue element */
22c213
+    if (!req->reply_sent) {
22c213
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
22c213
+
22c213
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
22c213
+                 elem->index);
22c213
+
22c213
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
+        pthread_mutex_lock(&qi->vq_lock);
22c213
+        vu_queue_push(dev, q, elem, 0);
22c213
+        vu_queue_notify(dev, q);
22c213
+        pthread_mutex_unlock(&qi->vq_lock);
22c213
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
+    }
22c213
+
22c213
+    pthread_mutex_destroy(&req->ch.lock);
22c213
+    free(fbuf.mem);
22c213
+    free(req);
22c213
+}
22c213
+
22c213
 /* Thread function for individual queues, created when a queue is 'started' */
22c213
 static void *fv_queue_thread(void *opaque)
22c213
 {
22c213
     struct fv_QueueInfo *qi = opaque;
22c213
     struct VuDev *dev = &qi->virtio_dev->dev;
22c213
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
22c213
-    struct fuse_session *se = qi->virtio_dev->se;
22c213
-    struct fuse_chan ch;
22c213
-    struct fuse_buf fbuf;
22c213
+    GThreadPool *pool;
22c213
 
22c213
-    fbuf.mem = NULL;
22c213
-    fbuf.flags = 0;
22c213
-
22c213
-    fuse_mutex_init(&ch.lock);
22c213
-    ch.fd = (int)0xdaff0d111;
22c213
-    ch.qi = qi;
22c213
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
22c213
+                             TRUE, NULL);
22c213
+    if (!pool) {
22c213
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
22c213
+        return NULL;
22c213
+    }
22c213
 
22c213
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
22c213
              qi->qidx, qi->kick_fd);
22c213
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
22c213
         /* Mutual exclusion with virtio_loop() */
22c213
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
         assert(ret == 0); /* there is no possible error case */
22c213
+        pthread_mutex_lock(&qi->vq_lock);
22c213
         /* out is from guest, in is too guest */
22c213
         unsigned int in_bytes, out_bytes;
22c213
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
22c213
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
22c213
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
22c213
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
22c213
 
22c213
-
22c213
         while (1) {
22c213
-            bool allocated_bufv = false;
22c213
-            struct fuse_bufvec bufv;
22c213
-            struct fuse_bufvec *pbufv;
22c213
-
22c213
-            /*
22c213
-             * An element contains one request and the space to send our
22c213
-             * response They're spread over multiple descriptors in a
22c213
-             * scatter/gather set and we can't trust the guest to keep them
22c213
-             * still; so copy in/out.
22c213
-             */
22c213
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
22c213
-            if (!elem) {
22c213
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
22c213
+            if (!req) {
22c213
                 break;
22c213
             }
22c213
 
22c213
-            qi->qe = elem;
22c213
-            qi->reply_sent = false;
22c213
+            req->reply_sent = false;
22c213
 
22c213
-            if (!fbuf.mem) {
22c213
-                fbuf.mem = malloc(se->bufsize);
22c213
-                assert(fbuf.mem);
22c213
-                assert(se->bufsize > sizeof(struct fuse_in_header));
22c213
-            }
22c213
-            /* The 'out' part of the elem is from qemu */
22c213
-            unsigned int out_num = elem->out_num;
22c213
-            struct iovec *out_sg = elem->out_sg;
22c213
-            size_t out_len = iov_size(out_sg, out_num);
22c213
-            fuse_log(FUSE_LOG_DEBUG,
22c213
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
22c213
-                     elem->index, out_num, out_len);
22c213
-
22c213
-            /*
22c213
-             * The elem should contain a 'fuse_in_header' (in to fuse)
22c213
-             * plus the data based on the len in the header.
22c213
-             */
22c213
-            if (out_len < sizeof(struct fuse_in_header)) {
22c213
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
22c213
-                         __func__, elem->index);
22c213
-                assert(0); /* TODO */
22c213
-            }
22c213
-            if (out_len > se->bufsize) {
22c213
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
22c213
-                         __func__, elem->index);
22c213
-                assert(0); /* TODO */
22c213
-            }
22c213
-            /* Copy just the first element and look at it */
22c213
-            copy_from_iov(&fbuf, 1, out_sg);
22c213
-
22c213
-            if (out_num > 2 &&
22c213
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
22c213
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
22c213
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
22c213
-                /*
22c213
-                 * For a write we don't actually need to copy the
22c213
-                 * data, we can just do it straight out of guest memory
22c213
-                 * but we must still copy the headers in case the guest
22c213
-                 * was nasty and changed them while we were using them.
22c213
-                 */
22c213
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
22c213
-
22c213
-                /* copy the fuse_write_in header after the fuse_in_header */
22c213
-                fbuf.mem += out_sg->iov_len;
22c213
-                copy_from_iov(&fbuf, 1, out_sg + 1);
22c213
-                fbuf.mem -= out_sg->iov_len;
22c213
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
22c213
-
22c213
-                /* Allocate the bufv, with space for the rest of the iov */
22c213
-                allocated_bufv = true;
22c213
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
22c213
-                               sizeof(struct fuse_buf) * (out_num - 2));
22c213
-                if (!pbufv) {
22c213
-                    vu_queue_unpop(dev, q, elem, 0);
22c213
-                    free(elem);
22c213
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
22c213
-                             __func__);
22c213
-                    goto out;
22c213
-                }
22c213
-
22c213
-                pbufv->count = 1;
22c213
-                pbufv->buf[0] = fbuf;
22c213
-
22c213
-                size_t iovindex, pbufvindex;
22c213
-                iovindex = 2; /* 2 headers, separate iovs */
22c213
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
22c213
-
22c213
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
22c213
-                    pbufv->count++;
22c213
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
22c213
-                    pbufv->buf[pbufvindex].flags = 0;
22c213
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
22c213
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
22c213
-                }
22c213
-            } else {
22c213
-                /* Normal (non fast write) path */
22c213
-
22c213
-                /* Copy the rest of the buffer */
22c213
-                fbuf.mem += out_sg->iov_len;
22c213
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
22c213
-                fbuf.mem -= out_sg->iov_len;
22c213
-                fbuf.size = out_len;
22c213
-
22c213
-                /* TODO! Endianness of header */
22c213
-
22c213
-                /* TODO: Add checks for fuse_session_exited */
22c213
-                bufv.buf[0] = fbuf;
22c213
-                bufv.count = 1;
22c213
-                pbufv = &buf;;
22c213
-            }
22c213
-            pbufv->idx = 0;
22c213
-            pbufv->off = 0;
22c213
-            fuse_session_process_buf_int(se, pbufv, &ch);
22c213
-
22c213
-            if (allocated_bufv) {
22c213
-                free(pbufv);
22c213
-            }
22c213
-
22c213
-            if (!qi->reply_sent) {
22c213
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
22c213
-                         __func__, elem->index);
22c213
-                /* I think we've still got to recycle the element */
22c213
-                vu_queue_push(dev, q, elem, 0);
22c213
-                vu_queue_notify(dev, q);
22c213
-            }
22c213
-            qi->qe = NULL;
22c213
-            free(elem);
22c213
-            elem = NULL;
22c213
+            g_thread_pool_push(pool, req, NULL);
22c213
         }
22c213
 
22c213
+        pthread_mutex_unlock(&qi->vq_lock);
22c213
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
22c213
     }
22c213
-out:
22c213
-    pthread_mutex_destroy(&ch.lock);
22c213
-    free(fbuf.mem);
22c213
+
22c213
+    g_thread_pool_free(pool, FALSE, TRUE);
22c213
 
22c213
     return NULL;
22c213
 }
22c213
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
22c213
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
22c213
                  __func__, qidx, ret);
22c213
     }
22c213
+    pthread_mutex_destroy(&ourqi->vq_lock);
22c213
     close(ourqi->kill_fd);
22c213
     ourqi->kick_fd = -1;
22c213
     free(vud->qi[qidx]);
22c213
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
22c213
 
22c213
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
22c213
         assert(ourqi->kill_fd != -1);
22c213
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
22c213
+
22c213
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
22c213
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
22c213
                      __func__, qidx);
22c213
-- 
22c213
1.8.3.1
22c213