yeahuh / rpms / qemu-kvm

Forked from rpms/qemu-kvm 2 years ago
Clone

Blame SOURCES/kvm-virtiofsd-process-requests-in-a-thread-pool.patch

902636
From b0db5e666aaa43eadff3e60a1ada704f33b03074 Mon Sep 17 00:00:00 2001
902636
From: "Dr. David Alan Gilbert" <dgilbert@redhat.com>
902636
Date: Mon, 27 Jan 2020 19:02:19 +0100
902636
Subject: [PATCH 108/116] virtiofsd: process requests in a thread pool
902636
MIME-Version: 1.0
902636
Content-Type: text/plain; charset=UTF-8
902636
Content-Transfer-Encoding: 8bit
902636
902636
RH-Author: Dr. David Alan Gilbert <dgilbert@redhat.com>
902636
Message-id: <20200127190227.40942-105-dgilbert@redhat.com>
902636
Patchwork-id: 93554
902636
O-Subject: [RHEL-AV-8.2 qemu-kvm PATCH 104/112] virtiofsd: process requests in a thread pool
902636
Bugzilla: 1694164
902636
RH-Acked-by: Philippe Mathieu-Daudé <philmd@redhat.com>
902636
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
902636
RH-Acked-by: Sergio Lopez Pascual <slp@redhat.com>
902636
902636
From: Stefan Hajnoczi <stefanha@redhat.com>
902636
902636
Introduce a thread pool so that fv_queue_thread() just pops
902636
VuVirtqElements and hands them to the thread pool.  For the time being
902636
only one worker thread is allowed since passthrough_ll.c is not
902636
thread-safe yet.  Future patches will lift this restriction so that
902636
multiple FUSE requests can be processed in parallel.
902636
902636
The main new concept is struct FVRequest, which contains both
902636
VuVirtqElement and struct fuse_chan.  We now have fv_VuDev for a device,
902636
fv_QueueInfo for a virtqueue, and FVRequest for a request.  Some of
902636
fv_QueueInfo's fields are moved into FVRequest because they are
902636
per-request.  The name FVRequest conforms to QEMU coding style and I
902636
expect the struct fv_* types will be renamed in a future refactoring.
902636
902636
This patch series is not optimal.  fbuf reuse is dropped so each request
902636
does malloc(se->bufsize), but there is no clean and cheap way to keep
902636
this with a thread pool.  The vq_lock mutex is held for longer than
902636
necessary, especially during the eventfd_write() syscall.  Performance
902636
can be improved in the future.
902636
902636
prctl(2) had to be added to the seccomp whitelist because glib invokes
902636
it.
902636
902636
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
902636
Reviewed-by: Misono Tomohiro <misono.tomohiro@jp.fujitsu.com>
902636
Signed-off-by: Dr. David Alan Gilbert <dgilbert@redhat.com>
902636
(cherry picked from commit a3d756c5aecccc4c0e51060a7e2f1c87bf8f1180)
902636
Signed-off-by: Miroslav Rezanina <mrezanin@redhat.com>
902636
---
902636
 tools/virtiofsd/fuse_virtio.c | 359 +++++++++++++++++++++++-------------------
902636
 1 file changed, 201 insertions(+), 158 deletions(-)
902636
902636
diff --git a/tools/virtiofsd/fuse_virtio.c b/tools/virtiofsd/fuse_virtio.c
902636
index f6242f9..0dcf2ef 100644
902636
--- a/tools/virtiofsd/fuse_virtio.c
902636
+++ b/tools/virtiofsd/fuse_virtio.c
902636
@@ -22,6 +22,7 @@
902636
 
902636
 #include <assert.h>
902636
 #include <errno.h>
902636
+#include <glib.h>
902636
 #include <stdint.h>
902636
 #include <stdio.h>
902636
 #include <stdlib.h>
902636
@@ -37,17 +38,28 @@
902636
 struct fv_VuDev;
902636
 struct fv_QueueInfo {
902636
     pthread_t thread;
902636
+    /*
902636
+     * This lock protects the VuVirtq preventing races between
902636
+     * fv_queue_thread() and fv_queue_worker().
902636
+     */
902636
+    pthread_mutex_t vq_lock;
902636
+
902636
     struct fv_VuDev *virtio_dev;
902636
 
902636
     /* Our queue index, corresponds to array position */
902636
     int qidx;
902636
     int kick_fd;
902636
     int kill_fd; /* For killing the thread */
902636
+};
902636
 
902636
-    /* The element for the command currently being processed */
902636
-    VuVirtqElement *qe;
902636
+/* A FUSE request */
902636
+typedef struct {
902636
+    VuVirtqElement elem;
902636
+    struct fuse_chan ch;
902636
+
902636
+    /* Used to complete requests that involve no reply */
902636
     bool reply_sent;
902636
-};
902636
+} FVRequest;
902636
 
902636
 /*
902636
  * We pass the dev element into libvhost-user
902636
@@ -191,8 +203,11 @@ static void copy_iov(struct iovec *src_iov, int src_count,
902636
 int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
902636
                     struct iovec *iov, int count)
902636
 {
902636
-    VuVirtqElement *elem;
902636
-    VuVirtq *q;
902636
+    FVRequest *req = container_of(ch, FVRequest, ch);
902636
+    struct fv_QueueInfo *qi = ch->qi;
902636
+    VuDev *dev = &se->virtio_dev->dev;
902636
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
902636
+    VuVirtqElement *elem = &req->elem;
902636
     int ret = 0;
902636
 
902636
     assert(count >= 1);
902636
@@ -205,11 +220,7 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
902636
 
902636
     /* unique == 0 is notification, which we don't support */
902636
     assert(out->unique);
902636
-    /* For virtio we always have ch */
902636
-    assert(ch);
902636
-    assert(!ch->qi->reply_sent);
902636
-    elem = ch->qi->qe;
902636
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
902636
+    assert(!req->reply_sent);
902636
 
902636
     /* The 'in' part of the elem is to qemu */
902636
     unsigned int in_num = elem->in_num;
902636
@@ -236,9 +247,15 @@ int virtio_send_msg(struct fuse_session *se, struct fuse_chan *ch,
902636
     }
902636
 
902636
     copy_iov(iov, count, in_sg, in_num, tosend_len);
902636
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
902636
-    vu_queue_notify(&se->virtio_dev->dev, q);
902636
-    ch->qi->reply_sent = true;
902636
+
902636
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
+    pthread_mutex_lock(&qi->vq_lock);
902636
+    vu_queue_push(dev, q, elem, tosend_len);
902636
+    vu_queue_notify(dev, q);
902636
+    pthread_mutex_unlock(&qi->vq_lock);
902636
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
+
902636
+    req->reply_sent = true;
902636
 
902636
 err:
902636
     return ret;
902636
@@ -254,9 +271,12 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
902636
                          struct iovec *iov, int count, struct fuse_bufvec *buf,
902636
                          size_t len)
902636
 {
902636
+    FVRequest *req = container_of(ch, FVRequest, ch);
902636
+    struct fv_QueueInfo *qi = ch->qi;
902636
+    VuDev *dev = &se->virtio_dev->dev;
902636
+    VuVirtq *q = vu_get_queue(dev, qi->qidx);
902636
+    VuVirtqElement *elem = &req->elem;
902636
     int ret = 0;
902636
-    VuVirtqElement *elem;
902636
-    VuVirtq *q;
902636
 
902636
     assert(count >= 1);
902636
     assert(iov[0].iov_len >= sizeof(struct fuse_out_header));
902636
@@ -275,11 +295,7 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
902636
     /* unique == 0 is notification which we don't support */
902636
     assert(out->unique);
902636
 
902636
-    /* For virtio we always have ch */
902636
-    assert(ch);
902636
-    assert(!ch->qi->reply_sent);
902636
-    elem = ch->qi->qe;
902636
-    q = &ch->qi->virtio_dev->dev.vq[ch->qi->qidx];
902636
+    assert(!req->reply_sent);
902636
 
902636
     /* The 'in' part of the elem is to qemu */
902636
     unsigned int in_num = elem->in_num;
902636
@@ -395,33 +411,175 @@ int virtio_send_data_iov(struct fuse_session *se, struct fuse_chan *ch,
902636
 
902636
     ret = 0;
902636
 
902636
-    vu_queue_push(&se->virtio_dev->dev, q, elem, tosend_len);
902636
-    vu_queue_notify(&se->virtio_dev->dev, q);
902636
+    pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
+    pthread_mutex_lock(&qi->vq_lock);
902636
+    vu_queue_push(dev, q, elem, tosend_len);
902636
+    vu_queue_notify(dev, q);
902636
+    pthread_mutex_unlock(&qi->vq_lock);
902636
+    pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
 
902636
 err:
902636
     if (ret == 0) {
902636
-        ch->qi->reply_sent = true;
902636
+        req->reply_sent = true;
902636
     }
902636
 
902636
     return ret;
902636
 }
902636
 
902636
+/* Process one FVRequest in a thread pool */
902636
+static void fv_queue_worker(gpointer data, gpointer user_data)
902636
+{
902636
+    struct fv_QueueInfo *qi = user_data;
902636
+    struct fuse_session *se = qi->virtio_dev->se;
902636
+    struct VuDev *dev = &qi->virtio_dev->dev;
902636
+    FVRequest *req = data;
902636
+    VuVirtqElement *elem = &req->elem;
902636
+    struct fuse_buf fbuf = {};
902636
+    bool allocated_bufv = false;
902636
+    struct fuse_bufvec bufv;
902636
+    struct fuse_bufvec *pbufv;
902636
+
902636
+    assert(se->bufsize > sizeof(struct fuse_in_header));
902636
+
902636
+    /*
902636
+     * An element contains one request and the space to send our response
902636
+     * They're spread over multiple descriptors in a scatter/gather set
902636
+     * and we can't trust the guest to keep them still; so copy in/out.
902636
+     */
902636
+    fbuf.mem = malloc(se->bufsize);
902636
+    assert(fbuf.mem);
902636
+
902636
+    fuse_mutex_init(&req->ch.lock);
902636
+    req->ch.fd = -1;
902636
+    req->ch.qi = qi;
902636
+
902636
+    /* The 'out' part of the elem is from qemu */
902636
+    unsigned int out_num = elem->out_num;
902636
+    struct iovec *out_sg = elem->out_sg;
902636
+    size_t out_len = iov_size(out_sg, out_num);
902636
+    fuse_log(FUSE_LOG_DEBUG,
902636
+             "%s: elem %d: with %d out desc of length %zd\n",
902636
+             __func__, elem->index, out_num, out_len);
902636
+
902636
+    /*
902636
+     * The elem should contain a 'fuse_in_header' (in to fuse)
902636
+     * plus the data based on the len in the header.
902636
+     */
902636
+    if (out_len < sizeof(struct fuse_in_header)) {
902636
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
902636
+                 __func__, elem->index);
902636
+        assert(0); /* TODO */
902636
+    }
902636
+    if (out_len > se->bufsize) {
902636
+        fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n", __func__,
902636
+                 elem->index);
902636
+        assert(0); /* TODO */
902636
+    }
902636
+    /* Copy just the first element and look at it */
902636
+    copy_from_iov(&fbuf, 1, out_sg);
902636
+
902636
+    pbufv = NULL; /* Compiler thinks an unitialised path */
902636
+    if (out_num > 2 &&
902636
+        out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
902636
+        ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
902636
+        out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
902636
+        /*
902636
+         * For a write we don't actually need to copy the
902636
+         * data, we can just do it straight out of guest memory
902636
+         * but we must still copy the headers in case the guest
902636
+         * was nasty and changed them while we were using them.
902636
+         */
902636
+        fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
902636
+
902636
+        /* copy the fuse_write_in header afte rthe fuse_in_header */
902636
+        fbuf.mem += out_sg->iov_len;
902636
+        copy_from_iov(&fbuf, 1, out_sg + 1);
902636
+        fbuf.mem -= out_sg->iov_len;
902636
+        fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
902636
+
902636
+        /* Allocate the bufv, with space for the rest of the iov */
902636
+        pbufv = malloc(sizeof(struct fuse_bufvec) +
902636
+                       sizeof(struct fuse_buf) * (out_num - 2));
902636
+        if (!pbufv) {
902636
+            fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
902636
+                    __func__);
902636
+            goto out;
902636
+        }
902636
+
902636
+        allocated_bufv = true;
902636
+        pbufv->count = 1;
902636
+        pbufv->buf[0] = fbuf;
902636
+
902636
+        size_t iovindex, pbufvindex;
902636
+        iovindex = 2; /* 2 headers, separate iovs */
902636
+        pbufvindex = 1; /* 2 headers, 1 fusebuf */
902636
+
902636
+        for (; iovindex < out_num; iovindex++, pbufvindex++) {
902636
+            pbufv->count++;
902636
+            pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
902636
+            pbufv->buf[pbufvindex].flags = 0;
902636
+            pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
902636
+            pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
902636
+        }
902636
+    } else {
902636
+        /* Normal (non fast write) path */
902636
+
902636
+        /* Copy the rest of the buffer */
902636
+        fbuf.mem += out_sg->iov_len;
902636
+        copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
902636
+        fbuf.mem -= out_sg->iov_len;
902636
+        fbuf.size = out_len;
902636
+
902636
+        /* TODO! Endianness of header */
902636
+
902636
+        /* TODO: Add checks for fuse_session_exited */
902636
+        bufv.buf[0] = fbuf;
902636
+        bufv.count = 1;
902636
+        pbufv = &buf;;
902636
+    }
902636
+    pbufv->idx = 0;
902636
+    pbufv->off = 0;
902636
+    fuse_session_process_buf_int(se, pbufv, &req->ch);
902636
+
902636
+out:
902636
+    if (allocated_bufv) {
902636
+        free(pbufv);
902636
+    }
902636
+
902636
+    /* If the request has no reply, still recycle the virtqueue element */
902636
+    if (!req->reply_sent) {
902636
+        struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
902636
+
902636
+        fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n", __func__,
902636
+                 elem->index);
902636
+
902636
+        pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
+        pthread_mutex_lock(&qi->vq_lock);
902636
+        vu_queue_push(dev, q, elem, 0);
902636
+        vu_queue_notify(dev, q);
902636
+        pthread_mutex_unlock(&qi->vq_lock);
902636
+        pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
+    }
902636
+
902636
+    pthread_mutex_destroy(&req->ch.lock);
902636
+    free(fbuf.mem);
902636
+    free(req);
902636
+}
902636
+
902636
 /* Thread function for individual queues, created when a queue is 'started' */
902636
 static void *fv_queue_thread(void *opaque)
902636
 {
902636
     struct fv_QueueInfo *qi = opaque;
902636
     struct VuDev *dev = &qi->virtio_dev->dev;
902636
     struct VuVirtq *q = vu_get_queue(dev, qi->qidx);
902636
-    struct fuse_session *se = qi->virtio_dev->se;
902636
-    struct fuse_chan ch;
902636
-    struct fuse_buf fbuf;
902636
+    GThreadPool *pool;
902636
 
902636
-    fbuf.mem = NULL;
902636
-    fbuf.flags = 0;
902636
-
902636
-    fuse_mutex_init(&ch.lock);
902636
-    ch.fd = (int)0xdaff0d111;
902636
-    ch.qi = qi;
902636
+    pool = g_thread_pool_new(fv_queue_worker, qi, 1 /* TODO max_threads */,
902636
+                             TRUE, NULL);
902636
+    if (!pool) {
902636
+        fuse_log(FUSE_LOG_ERR, "%s: g_thread_pool_new failed\n", __func__);
902636
+        return NULL;
902636
+    }
902636
 
902636
     fuse_log(FUSE_LOG_INFO, "%s: Start for queue %d kick_fd %d\n", __func__,
902636
              qi->qidx, qi->kick_fd);
902636
@@ -478,6 +636,7 @@ static void *fv_queue_thread(void *opaque)
902636
         /* Mutual exclusion with virtio_loop() */
902636
         ret = pthread_rwlock_rdlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
         assert(ret == 0); /* there is no possible error case */
902636
+        pthread_mutex_lock(&qi->vq_lock);
902636
         /* out is from guest, in is too guest */
902636
         unsigned int in_bytes, out_bytes;
902636
         vu_queue_get_avail_bytes(dev, q, &in_bytes, &out_bytes, ~0, ~0);
902636
@@ -486,141 +645,22 @@ static void *fv_queue_thread(void *opaque)
902636
                  "%s: Queue %d gave evalue: %zx available: in: %u out: %u\n",
902636
                  __func__, qi->qidx, (size_t)evalue, in_bytes, out_bytes);
902636
 
902636
-
902636
         while (1) {
902636
-            bool allocated_bufv = false;
902636
-            struct fuse_bufvec bufv;
902636
-            struct fuse_bufvec *pbufv;
902636
-
902636
-            /*
902636
-             * An element contains one request and the space to send our
902636
-             * response They're spread over multiple descriptors in a
902636
-             * scatter/gather set and we can't trust the guest to keep them
902636
-             * still; so copy in/out.
902636
-             */
902636
-            VuVirtqElement *elem = vu_queue_pop(dev, q, sizeof(VuVirtqElement));
902636
-            if (!elem) {
902636
+            FVRequest *req = vu_queue_pop(dev, q, sizeof(FVRequest));
902636
+            if (!req) {
902636
                 break;
902636
             }
902636
 
902636
-            qi->qe = elem;
902636
-            qi->reply_sent = false;
902636
+            req->reply_sent = false;
902636
 
902636
-            if (!fbuf.mem) {
902636
-                fbuf.mem = malloc(se->bufsize);
902636
-                assert(fbuf.mem);
902636
-                assert(se->bufsize > sizeof(struct fuse_in_header));
902636
-            }
902636
-            /* The 'out' part of the elem is from qemu */
902636
-            unsigned int out_num = elem->out_num;
902636
-            struct iovec *out_sg = elem->out_sg;
902636
-            size_t out_len = iov_size(out_sg, out_num);
902636
-            fuse_log(FUSE_LOG_DEBUG,
902636
-                     "%s: elem %d: with %d out desc of length %zd\n", __func__,
902636
-                     elem->index, out_num, out_len);
902636
-
902636
-            /*
902636
-             * The elem should contain a 'fuse_in_header' (in to fuse)
902636
-             * plus the data based on the len in the header.
902636
-             */
902636
-            if (out_len < sizeof(struct fuse_in_header)) {
902636
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too short for in_header\n",
902636
-                         __func__, elem->index);
902636
-                assert(0); /* TODO */
902636
-            }
902636
-            if (out_len > se->bufsize) {
902636
-                fuse_log(FUSE_LOG_ERR, "%s: elem %d too large for buffer\n",
902636
-                         __func__, elem->index);
902636
-                assert(0); /* TODO */
902636
-            }
902636
-            /* Copy just the first element and look at it */
902636
-            copy_from_iov(&fbuf, 1, out_sg);
902636
-
902636
-            if (out_num > 2 &&
902636
-                out_sg[0].iov_len == sizeof(struct fuse_in_header) &&
902636
-                ((struct fuse_in_header *)fbuf.mem)->opcode == FUSE_WRITE &&
902636
-                out_sg[1].iov_len == sizeof(struct fuse_write_in)) {
902636
-                /*
902636
-                 * For a write we don't actually need to copy the
902636
-                 * data, we can just do it straight out of guest memory
902636
-                 * but we must still copy the headers in case the guest
902636
-                 * was nasty and changed them while we were using them.
902636
-                 */
902636
-                fuse_log(FUSE_LOG_DEBUG, "%s: Write special case\n", __func__);
902636
-
902636
-                /* copy the fuse_write_in header after the fuse_in_header */
902636
-                fbuf.mem += out_sg->iov_len;
902636
-                copy_from_iov(&fbuf, 1, out_sg + 1);
902636
-                fbuf.mem -= out_sg->iov_len;
902636
-                fbuf.size = out_sg[0].iov_len + out_sg[1].iov_len;
902636
-
902636
-                /* Allocate the bufv, with space for the rest of the iov */
902636
-                allocated_bufv = true;
902636
-                pbufv = malloc(sizeof(struct fuse_bufvec) +
902636
-                               sizeof(struct fuse_buf) * (out_num - 2));
902636
-                if (!pbufv) {
902636
-                    vu_queue_unpop(dev, q, elem, 0);
902636
-                    free(elem);
902636
-                    fuse_log(FUSE_LOG_ERR, "%s: pbufv malloc failed\n",
902636
-                             __func__);
902636
-                    goto out;
902636
-                }
902636
-
902636
-                pbufv->count = 1;
902636
-                pbufv->buf[0] = fbuf;
902636
-
902636
-                size_t iovindex, pbufvindex;
902636
-                iovindex = 2; /* 2 headers, separate iovs */
902636
-                pbufvindex = 1; /* 2 headers, 1 fusebuf */
902636
-
902636
-                for (; iovindex < out_num; iovindex++, pbufvindex++) {
902636
-                    pbufv->count++;
902636
-                    pbufv->buf[pbufvindex].pos = ~0; /* Dummy */
902636
-                    pbufv->buf[pbufvindex].flags = 0;
902636
-                    pbufv->buf[pbufvindex].mem = out_sg[iovindex].iov_base;
902636
-                    pbufv->buf[pbufvindex].size = out_sg[iovindex].iov_len;
902636
-                }
902636
-            } else {
902636
-                /* Normal (non fast write) path */
902636
-
902636
-                /* Copy the rest of the buffer */
902636
-                fbuf.mem += out_sg->iov_len;
902636
-                copy_from_iov(&fbuf, out_num - 1, out_sg + 1);
902636
-                fbuf.mem -= out_sg->iov_len;
902636
-                fbuf.size = out_len;
902636
-
902636
-                /* TODO! Endianness of header */
902636
-
902636
-                /* TODO: Add checks for fuse_session_exited */
902636
-                bufv.buf[0] = fbuf;
902636
-                bufv.count = 1;
902636
-                pbufv = &buf;;
902636
-            }
902636
-            pbufv->idx = 0;
902636
-            pbufv->off = 0;
902636
-            fuse_session_process_buf_int(se, pbufv, &ch);
902636
-
902636
-            if (allocated_bufv) {
902636
-                free(pbufv);
902636
-            }
902636
-
902636
-            if (!qi->reply_sent) {
902636
-                fuse_log(FUSE_LOG_DEBUG, "%s: elem %d no reply sent\n",
902636
-                         __func__, elem->index);
902636
-                /* I think we've still got to recycle the element */
902636
-                vu_queue_push(dev, q, elem, 0);
902636
-                vu_queue_notify(dev, q);
902636
-            }
902636
-            qi->qe = NULL;
902636
-            free(elem);
902636
-            elem = NULL;
902636
+            g_thread_pool_push(pool, req, NULL);
902636
         }
902636
 
902636
+        pthread_mutex_unlock(&qi->vq_lock);
902636
         pthread_rwlock_unlock(&qi->virtio_dev->vu_dispatch_rwlock);
902636
     }
902636
-out:
902636
-    pthread_mutex_destroy(&ch.lock);
902636
-    free(fbuf.mem);
902636
+
902636
+    g_thread_pool_free(pool, FALSE, TRUE);
902636
 
902636
     return NULL;
902636
 }
902636
@@ -643,6 +683,7 @@ static void fv_queue_cleanup_thread(struct fv_VuDev *vud, int qidx)
902636
         fuse_log(FUSE_LOG_ERR, "%s: Failed to join thread idx %d err %d\n",
902636
                  __func__, qidx, ret);
902636
     }
902636
+    pthread_mutex_destroy(&ourqi->vq_lock);
902636
     close(ourqi->kill_fd);
902636
     ourqi->kick_fd = -1;
902636
     free(vud->qi[qidx]);
902636
@@ -696,6 +737,8 @@ static void fv_queue_set_started(VuDev *dev, int qidx, bool started)
902636
 
902636
         ourqi->kill_fd = eventfd(0, EFD_CLOEXEC | EFD_SEMAPHORE);
902636
         assert(ourqi->kill_fd != -1);
902636
+        pthread_mutex_init(&ourqi->vq_lock, NULL);
902636
+
902636
         if (pthread_create(&ourqi->thread, NULL, fv_queue_thread, ourqi)) {
902636
             fuse_log(FUSE_LOG_ERR, "%s: Failed to create thread for queue %d\n",
902636
                      __func__, qidx);
902636
-- 
902636
1.8.3.1
902636