ddf19c
From b0db5e666aaa43eadff3e60a1ada704f33b03074 Mon Sep 17 00:00:00 2001
ddf19c
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
ddf19c
Date: Mon, 27 Jan 2020 19:02:19 +0100
ddf19c
Subject: [PATCH 108/116] virtiofsd: process requests in a thread pool
ddf19c
MIME-Version: 1.0
ddf19c
Content-Type: text/plain; charset=UTF-8
ddf19c
Content-Transfer-Encoding: 8bit
ddf19c
ddf19c
RH-Author: Dr. David Alan Gilbert <dgilbert@redhat.com>
ddf19c
Message-id: <20200127190227.40942-105-dgilbert@redhat.com>
ddf19c
Patchwork-id: 93554
ddf19c
O-Subject: [RHEL-AV-8.2 qemu-kvm PATCH 104/112] virtiofsd: process requests in a thread pool
ddf19c
Bugzilla: 1694164
ddf19c
RH-Acked-by: Philippe Mathieu-Daudé <philmd@redhat.com>
ddf19c
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
ddf19c
RH-Acked-by: Sergio Lopez Pascual <slp@redhat.com>
ddf19c
ddf19c
From: Stefan Hajnoczi <stefanha@redhat.com>
ddf19c
ddf19c
Introduce a thread pool so that fv_queue_thread() just pops
ddf19c
VuVirtqElements and hands them to the thread pool.  For the time being
ddf19c
only one worker thread is allowed since passthrough_ll.c is not
ddf19c
thread-safe yet.  Future patches will lift this restriction so that
ddf19c
multiple FUSE requests can be processed in parallel.
ddf19c
ddf19c
The main new concept is struct FVRequest, which contains both
ddf19c
VuVirtqElement and struct fuse_chan.  We now have fv_VuDev for a device,
ddf19c
fv_QueueInfo for a virtqueue, and FVRequest for a request.  Some of
ddf19c
fv_QueueInfo's fields are moved into FVRequest because they are
ddf19c
per-request.  The name FVRequest conforms to QEMU coding style and I
ddf19c
expect the struct fv_* types will be renamed in a future refactoring.
ddf19c
ddf19c
This patch series is not optimal.  fbuf reuse is dropped so each request
ddf19c
does malloc(se->bufsize), but there is no clean and cheap way to keep
ddf19c
this with a thread pool.  The vq_lock mutex is held for longer than
ddf19c
necessary, especially during the eventfd_write() syscall.  Performance
ddf19c
can be improved in the future.
ddf19c
ddf19c
prctl(2) had to be added to the seccomp whitelist because glib invokes
ddf19c
it.
ddf19c
ddf19c
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
ddf19c
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
ddf19c
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
ddf19c
(cherry picked from commit a3d756c5aecccc4c0e51060a7e2f1c87bf8f1180)
ddf19c
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
ddf19c
---
ddf19c
 tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++++++-------------------
ddf19c
 1 file changed, 201 insertions(+), 158 deletions(-)
ddf19c
ddf19c
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
ddf19c
index f6242f9..0dcf2ef 100644
ddf19c
--- a/tools/virtiofsd/fuse_virtio.c
ddf19c
+++ b/tools/virtiofsd/fuse_virtio.c
ddf19c
@@ -22,6 +22,7 @@
ddf19c
 
ddf19c
 #include <assert.h>
ddf19c
 #include <errno.h>
ddf19c
+#include <glib.h>
ddf19c
 #include <stdint.h>
ddf19c
 #include <stdio.h>
ddf19c
 #include <stdlib.h>
ddf19c
@@ -37,17 +38,28 @@
ddf19c
 struct fv_VuDev;
ddf19c
 struct fv_QueueInfo {
ddf19c
     pthread_t thread;
ddf19c
+    /*
ddf19c
+     * This lock protects the VuVirtq preventing races between
ddf19c
+     * fv_queue_thread() and fv_queue_worker().
ddf19c
+     */
ddf19c
+    pthread_mutex_t vq_lock;
ddf19c
+
ddf19c
     struct fv_VuDev *virtio_dev;
ddf19c
 
ddf19c
     /* Our queue index, corresponds to array position */
ddf19c
     int qidx;
ddf19c
     int kick_fd;
ddf19c
     int kill_fd; /* For killing the thread */
ddf19c
+};
ddf19c
 
ddf19c
-    /* The element for the command currently being processed */
ddf19c
-    VuVirtqElement *qe;
ddf19c
+/* A FUSE request */
ddf19c
+typedef struct {
ddf19c
+    VuVirtqElement elem;
ddf19c
+    struct fuse_chan ch;
ddf19c
+
ddf19c
+    /* Used to complete requests that involve no reply */
ddf19c
     bool reply_sent;
ddf19c
-};
ddf19c
+} FVRequest;
ddf19c
 
ddf19c
 /*
ddf19c
  * We pass the dev element into libvhost-user
ddf19c
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
ddf19c
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
ddf19c
                     struct iovec *iov, int count)
ddf19c
 {
ddf19c
-    VuVirtqElement *elem;
ddf19c
-    VuVirtq *q;
ddf19c
+    FVRequest *req = container_of(ch, FVRequest, ch);
ddf19c
+    struct fv_QueueInfo *qi = ch->qi;
ddf19c
+    VuDev *dev = &se->virtio_dev->dev;
ddf19c
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
ddf19c
+    VuVirtqElement *elem = &req->elem;
ddf19c
     int ret = 0;
ddf19c
 
ddf19c
     assert(count >= 1);
ddf19c
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
ddf19c
 
ddf19c
     /* unique == 0 is notification, which we don't support */
ddf19c
     assert(out->unique);
ddf19c
-    /* For virtio we always have ch */
ddf19c
-    assert(ch);
ddf19c
-    assert(!ch->qi->reply_sent);
ddf19c
-    elem = ch->qi->qe;
ddf19c
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
ddf19c
+    assert(!req->reply_sent);
ddf19c
 
ddf19c
     /* The 'in' part of the elem is to qemu */
ddf19c
     unsigned int in_num = elem->in_num;
ddf19c
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
ddf19c
     }
ddf19c
 
ddf19c
     copy_iov(iov, count, in_sg, in_num, tosend_len);
ddf19c
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
ddf19c
-    vu_queue_notify(&se->virtio_dev->dev, q);
ddf19c
-    ch->qi->reply_sent = true;
ddf19c
+
ddf19c
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
+    pthread_mutex_lock(&qi->vq_lock);
ddf19c
+    vu_queue_push(dev, q, elem, tosend_len);
ddf19c
+    vu_queue_notify(dev, q);
ddf19c
+    pthread_mutex_unlock(&qi->vq_lock);
ddf19c
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
+
ddf19c
+    req->reply_sent = true;
ddf19c
 
ddf19c
 err:
ddf19c
     return ret;
ddf19c
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
ddf19c
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
ddf19c
                          size_t len)
ddf19c
 {
ddf19c
+    FVRequest *req = container_of(ch, FVRequest, ch);
ddf19c
+    struct fv_QueueInfo *qi = ch->qi;
ddf19c
+    VuDev *dev = &se->virtio_dev->dev;
ddf19c
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
ddf19c
+    VuVirtqElement *elem = &req->elem;
ddf19c
     int ret = 0;
ddf19c
-    VuVirtqElement *elem;
ddf19c
-    VuVirtq *q;
ddf19c
 
ddf19c
     assert(count >= 1);
ddf19c
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
ddf19c
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
ddf19c
     /* unique == 0 is notification which we don't support */
ddf19c
     assert(out->unique);
ddf19c
 
ddf19c
-    /* For virtio we always have ch */
ddf19c
-    assert(ch);
ddf19c
-    assert(!ch->qi->reply_sent);
ddf19c
-    elem = ch->qi->qe;
ddf19c
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
ddf19c
+    assert(!req->reply_sent);
ddf19c
 
ddf19c
     /* The 'in' part of the elem is to qemu */
ddf19c
     unsigned int in_num = elem->in_num;
ddf19c
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
ddf19c
 
ddf19c
     ret = 0;
ddf19c
 
ddf19c
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
ddf19c
-    vu_queue_notify(&se->virtio_dev->dev, q);
ddf19c
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
+    pthread_mutex_lock(&qi->vq_lock);
ddf19c
+    vu_queue_push(dev, q, elem, tosend_len);
ddf19c
+    vu_queue_notify(dev, q);
ddf19c
+    pthread_mutex_unlock(&qi->vq_lock);
ddf19c
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
 
ddf19c
 err:
ddf19c
     if (ret == 0) {
ddf19c
-        ch->qi->reply_sent = true;
ddf19c
+        req->reply_sent = true;
ddf19c
     }
ddf19c
 
ddf19c
     return ret;
ddf19c
 }
ddf19c
 
ddf19c
+/* Process one FVRequest in a thread pool */
ddf19c
+static void fv_queue_worker(gpointer data, gpointer user_data)
ddf19c
+{
ddf19c
+    struct fv_QueueInfo *qi = user_data;
ddf19c
+    struct fuse_session *se = qi->virtio_dev->se;
ddf19c
+    struct VuDev *dev = &qi->virtio_dev->dev;
ddf19c
+    FVRequest *req = data;
ddf19c
+    VuVirtqElement *elem = &req->elem;
ddf19c
+    struct fuse_buf fbuf = {};
ddf19c
+    bool allocated_bufv = false;
ddf19c
+    struct fuse_bufvec bufv;
ddf19c
+    struct fuse_bufvec *pbufv;
ddf19c
+
ddf19c
+    assert(se->bufsize > sizeof(struct fuse_in_header));
ddf19c
+
ddf19c
+    /*
ddf19c
+     * An element contains one request and the space to send our response
ddf19c
+     * They're spread over multiple descriptors in a scatter/gather set
ddf19c
+     * and we can't trust the guest to keep them still; so copy in/out.
ddf19c
+     */
ddf19c
+    fbuf.mem = malloc(se->bufsize);
ddf19c
+    assert(fbuf.mem);
ddf19c
+
ddf19c
+    fuse_mutex_init(&req->ch.lock);
ddf19c
+    req->ch.fd = -1;
ddf19c
+    req->ch.qi = qi;
ddf19c
+
ddf19c
+    /* The 'out' part of the elem is from qemu */
ddf19c
+    unsigned int out_num = elem->out_num;
ddf19c
+    struct iovec *out_sg = elem->out_sg;
ddf19c
+    size_t out_len = iov_size(out_sg, out_num);
ddf19c
+    fuse_log(FUSE_LOG_DEBUG,
ddf19c
+             "%s: elem %d: with %d out desc of length %zd\n",
ddf19c
+             __func__, elem->index, out_num, out_len);
ddf19c
+
ddf19c
+    /*
ddf19c
+     * The elem should contain a 'fuse_in_header' (in to fuse)
ddf19c
+     * plus the data based on the len in the header.
ddf19c
+     */
ddf19c
+    if (out_len < sizeof(struct fuse_in_header)) {
ddf19c
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
ddf19c
+                 __func__, elem->index);
ddf19c
+        assert(0); /* TODO */
ddf19c
+    }
ddf19c
+    if (out_len > se->bufsize) {
ddf19c
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
ddf19c
+                 elem->index);
ddf19c
+        assert(0); /* TODO */
ddf19c
+    }
ddf19c
+    /* Copy just the first element and look at it */
ddf19c
+    copy_from_iov(&fbuf, 1, out_sg);
ddf19c
+
ddf19c
+    pbufv = NULL; /* Compiler thinks an unitialised path */
ddf19c
+    if (out_num > 2 &&
ddf19c
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
ddf19c
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
ddf19c
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
ddf19c
+        /*
ddf19c
+         * For a write we don't actually need to copy the
ddf19c
+         * data, we can just do it straight out of guest memory
ddf19c
+         * but we must still copy the headers in case the guest
ddf19c
+         * was nasty and changed them while we were using them.
ddf19c
+         */
ddf19c
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
ddf19c
+
ddf19c
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
ddf19c
+        fbuf.mem += out_sg->iov_len;
ddf19c
+        copy_from_iov(&fbuf, 1, out_sg + 1);
ddf19c
+        fbuf.mem -= out_sg->iov_len;
ddf19c
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
ddf19c
+
ddf19c
+        /* Allocate the bufv, with space for the rest of the iov */
ddf19c
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
ddf19c
+                       sizeof(struct fuse_buf) * (out_num - 2));
ddf19c
+        if (!pbufv) {
ddf19c
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
ddf19c
+                    __func__);
ddf19c
+            goto out;
ddf19c
+        }
ddf19c
+
ddf19c
+        allocated_bufv = true;
ddf19c
+        pbufv->count = 1;
ddf19c
+        pbufv->buf[0] = fbuf;
ddf19c
+
ddf19c
+        size_t iovindex, pbufvindex;
ddf19c
+        iovindex = 2; /* 2 headers, separate iovs */
ddf19c
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
ddf19c
+
ddf19c
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
ddf19c
+            pbufv->count++;
ddf19c
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
ddf19c
+            pbufv->buf[pbufvindex].flags = 0;
ddf19c
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
ddf19c
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
ddf19c
+        }
ddf19c
+    } else {
ddf19c
+        /* Normal (non fast write) path */
ddf19c
+
ddf19c
+        /* Copy the rest of the buffer */
ddf19c
+        fbuf.mem += out_sg->iov_len;
ddf19c
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
ddf19c
+        fbuf.mem -= out_sg->iov_len;
ddf19c
+        fbuf.size = out_len;
ddf19c
+
ddf19c
+        /* TODO! Endianness of header */
ddf19c
+
ddf19c
+        /* TODO: Add checks for fuse_session_exited */
ddf19c
+        bufv.buf[0] = fbuf;
ddf19c
+        bufv.count = 1;
ddf19c
+        pbufv = &buf;;
ddf19c
+    }
ddf19c
+    pbufv->idx = 0;
ddf19c
+    pbufv->off = 0;
ddf19c
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
ddf19c
+
ddf19c
+out:
ddf19c
+    if (allocated_bufv) {
ddf19c
+        free(pbufv);
ddf19c
+    }
ddf19c
+
ddf19c
+    /* If the request has no reply, still recycle the virtqueue element */
ddf19c
+    if (!req->reply_sent) {
ddf19c
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
ddf19c
+
ddf19c
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
ddf19c
+                 elem->index);
ddf19c
+
ddf19c
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
+        pthread_mutex_lock(&qi->vq_lock);
ddf19c
+        vu_queue_push(dev, q, elem, 0);
ddf19c
+        vu_queue_notify(dev, q);
ddf19c
+        pthread_mutex_unlock(&qi->vq_lock);
ddf19c
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
+    }
ddf19c
+
ddf19c
+    pthread_mutex_destroy(&req->ch.lock);
ddf19c
+    free(fbuf.mem);
ddf19c
+    free(req);
ddf19c
+}
ddf19c
+
ddf19c
 /* Thread function for individual queues, created when a queue is 'started' */
ddf19c
 static void *fv_queue_thread(void *opaque)
ddf19c
 {
ddf19c
     struct fv_QueueInfo *qi = opaque;
ddf19c
     struct VuDev *dev = &qi->virtio_dev->dev;
ddf19c
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
ddf19c
-    struct fuse_session *se = qi->virtio_dev->se;
ddf19c
-    struct fuse_chan ch;
ddf19c
-    struct fuse_buf fbuf;
ddf19c
+    GThreadPool *pool;
ddf19c
 
ddf19c
-    fbuf.mem = NULL;
ddf19c
-    fbuf.flags = 0;
ddf19c
-
ddf19c
-    fuse_mutex_init(&ch.lock);
ddf19c
-    ch.fd = (int)0xdaff0d111;
ddf19c
-    ch.qi = qi;
ddf19c
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
ddf19c
+                             TRUE, NULL);
ddf19c
+    if (!pool) {
ddf19c
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
ddf19c
+        return NULL;
ddf19c
+    }
ddf19c
 
ddf19c
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
ddf19c
              qi->qidx, qi->kick_fd);
ddf19c
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
ddf19c
         /* Mutual exclusion with virtio_loop() */
ddf19c
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
         assert(ret == 0); /* there is no possible error case */
ddf19c
+        pthread_mutex_lock(&qi->vq_lock);
ddf19c
         /* out is from guest, in is too guest */
ddf19c
         unsigned int in_bytes, out_bytes;
ddf19c
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
ddf19c
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
ddf19c
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
ddf19c
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
ddf19c
 
ddf19c
-
ddf19c
         while (1) {
ddf19c
-            bool allocated_bufv = false;
ddf19c
-            struct fuse_bufvec bufv;
ddf19c
-            struct fuse_bufvec *pbufv;
ddf19c
-
ddf19c
-            /*
ddf19c
-             * An element contains one request and the space to send our
ddf19c
-             * response They're spread over multiple descriptors in a
ddf19c
-             * scatter/gather set and we can't trust the guest to keep them
ddf19c
-             * still; so copy in/out.
ddf19c
-             */
ddf19c
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
ddf19c
-            if (!elem) {
ddf19c
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
ddf19c
+            if (!req) {
ddf19c
                 break;
ddf19c
             }
ddf19c
 
ddf19c
-            qi->qe = elem;
ddf19c
-            qi->reply_sent = false;
ddf19c
+            req->reply_sent = false;
ddf19c
 
ddf19c
-            if (!fbuf.mem) {
ddf19c
-                fbuf.mem = malloc(se->bufsize);
ddf19c
-                assert(fbuf.mem);
ddf19c
-                assert(se->bufsize > sizeof(struct fuse_in_header));
ddf19c
-            }
ddf19c
-            /* The 'out' part of the elem is from qemu */
ddf19c
-            unsigned int out_num = elem->out_num;
ddf19c
-            struct iovec *out_sg = elem->out_sg;
ddf19c
-            size_t out_len = iov_size(out_sg, out_num);
ddf19c
-            fuse_log(FUSE_LOG_DEBUG,
ddf19c
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
ddf19c
-                     elem->index, out_num, out_len);
ddf19c
-
ddf19c
-            /*
ddf19c
-             * The elem should contain a 'fuse_in_header' (in to fuse)
ddf19c
-             * plus the data based on the len in the header.
ddf19c
-             */
ddf19c
-            if (out_len < sizeof(struct fuse_in_header)) {
ddf19c
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
ddf19c
-                         __func__, elem->index);
ddf19c
-                assert(0); /* TODO */
ddf19c
-            }
ddf19c
-            if (out_len > se->bufsize) {
ddf19c
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
ddf19c
-                         __func__, elem->index);
ddf19c
-                assert(0); /* TODO */
ddf19c
-            }
ddf19c
-            /* Copy just the first element and look at it */
ddf19c
-            copy_from_iov(&fbuf, 1, out_sg);
ddf19c
-
ddf19c
-            if (out_num > 2 &&
ddf19c
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
ddf19c
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
ddf19c
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
ddf19c
-                /*
ddf19c
-                 * For a write we don't actually need to copy the
ddf19c
-                 * data, we can just do it straight out of guest memory
ddf19c
-                 * but we must still copy the headers in case the guest
ddf19c
-                 * was nasty and changed them while we were using them.
ddf19c
-                 */
ddf19c
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
ddf19c
-
ddf19c
-                /* copy the fuse_write_in header after the fuse_in_header */
ddf19c
-                fbuf.mem += out_sg->iov_len;
ddf19c
-                copy_from_iov(&fbuf, 1, out_sg + 1);
ddf19c
-                fbuf.mem -= out_sg->iov_len;
ddf19c
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
ddf19c
-
ddf19c
-                /* Allocate the bufv, with space for the rest of the iov */
ddf19c
-                allocated_bufv = true;
ddf19c
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
ddf19c
-                               sizeof(struct fuse_buf) * (out_num - 2));
ddf19c
-                if (!pbufv) {
ddf19c
-                    vu_queue_unpop(dev, q, elem, 0);
ddf19c
-                    free(elem);
ddf19c
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
ddf19c
-                             __func__);
ddf19c
-                    goto out;
ddf19c
-                }
ddf19c
-
ddf19c
-                pbufv->count = 1;
ddf19c
-                pbufv->buf[0] = fbuf;
ddf19c
-
ddf19c
-                size_t iovindex, pbufvindex;
ddf19c
-                iovindex = 2; /* 2 headers, separate iovs */
ddf19c
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
ddf19c
-
ddf19c
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
ddf19c
-                    pbufv->count++;
ddf19c
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
ddf19c
-                    pbufv->buf[pbufvindex].flags = 0;
ddf19c
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
ddf19c
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
ddf19c
-                }
ddf19c
-            } else {
ddf19c
-                /* Normal (non fast write) path */
ddf19c
-
ddf19c
-                /* Copy the rest of the buffer */
ddf19c
-                fbuf.mem += out_sg->iov_len;
ddf19c
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
ddf19c
-                fbuf.mem -= out_sg->iov_len;
ddf19c
-                fbuf.size = out_len;
ddf19c
-
ddf19c
-                /* TODO! Endianness of header */
ddf19c
-
ddf19c
-                /* TODO: Add checks for fuse_session_exited */
ddf19c
-                bufv.buf[0] = fbuf;
ddf19c
-                bufv.count = 1;
ddf19c
-                pbufv = &buf;;
ddf19c
-            }
ddf19c
-            pbufv->idx = 0;
ddf19c
-            pbufv->off = 0;
ddf19c
-            fuse_session_process_buf_int(se, pbufv, &ch);
ddf19c
-
ddf19c
-            if (allocated_bufv) {
ddf19c
-                free(pbufv);
ddf19c
-            }
ddf19c
-
ddf19c
-            if (!qi->reply_sent) {
ddf19c
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
ddf19c
-                         __func__, elem->index);
ddf19c
-                /* I think we've still got to recycle the element */
ddf19c
-                vu_queue_push(dev, q, elem, 0);
ddf19c
-                vu_queue_notify(dev, q);
ddf19c
-            }
ddf19c
-            qi->qe = NULL;
ddf19c
-            free(elem);
ddf19c
-            elem = NULL;
ddf19c
+            g_thread_pool_push(pool, req, NULL);
ddf19c
         }
ddf19c
 
ddf19c
+        pthread_mutex_unlock(&qi->vq_lock);
ddf19c
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
ddf19c
     }
ddf19c
-out:
ddf19c
-    pthread_mutex_destroy(&ch.lock);
ddf19c
-    free(fbuf.mem);
ddf19c
+
ddf19c
+    g_thread_pool_free(pool, FALSE, TRUE);
ddf19c
 
ddf19c
     return NULL;
ddf19c
 }
ddf19c
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
ddf19c
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
ddf19c
                  __func__, qidx, ret);
ddf19c
     }
ddf19c
+    pthread_mutex_destroy(&ourqi->vq_lock);
ddf19c
     close(ourqi->kill_fd);
ddf19c
     ourqi->kick_fd = -1;
ddf19c
     free(vud->qi[qidx]);
ddf19c
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
ddf19c
 
ddf19c
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
ddf19c
         assert(ourqi->kill_fd != -1);
ddf19c
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
ddf19c
+
ddf19c
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
ddf19c
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
ddf19c
                      __func__, qidx);
ddf19c
-- 
ddf19c
1.8.3.1
ddf19c