Blame SOURCES/kvm-util-event-loop-base-Introduce-options-to-set-the-th.patch

29b115
From 7a6fa42d4a4263c94b9bf18290f9e7680ea9e7f4 Mon Sep 17 00:00:00 2001
29b115
From: Nicolas Saenz Julienne <nsaenzju@redhat.com>
29b115
Date: Mon, 25 Apr 2022 09:57:23 +0200
29b115
Subject: [PATCH 03/16] util/event-loop-base: Introduce options to set the
29b115
 thread pool size
29b115
29b115
RH-Author: Nicolas Saenz Julienne <nsaenzju@redhat.com>
29b115
RH-MergeRequest: 93: util/thread-pool: Expose minimum and maximum size
29b115
RH-Commit: [3/3] af78a88ff3c69701cbb5f9e980c3d6ebbd13ff98
29b115
RH-Bugzilla: 2031024
29b115
RH-Acked-by: Miroslav Rezanina <mrezanin@redhat.com>
29b115
RH-Acked-by: Stefano Garzarella <sgarzare@redhat.com>
29b115
RH-Acked-by: Stefan Hajnoczi <stefanha@redhat.com>
29b115
29b115
The thread pool regulates itself: when idle, it kills threads until
29b115
empty, when in demand, it creates new threads until full. This behaviour
29b115
doesn't play well with latency sensitive workloads where the price of
29b115
creating a new thread is too high. For example, when paired with qemu's
29b115
'-mlock', or using safety features like SafeStack, creating a new thread
29b115
has been measured to take multiple milliseconds.
29b115
29b115
In order to mitigate this let's introduce a new 'EventLoopBase'
29b115
property to set the thread pool size. The threads will be created during
29b115
the pool's initialization or upon updating the property's value, remain
29b115
available during its lifetime regardless of demand, and destroyed upon
29b115
freeing it. A properly characterized workload will then be able to
29b115
configure the pool to avoid any latency spikes.
29b115
29b115
Signed-off-by: Nicolas Saenz Julienne <nsaenzju@redhat.com>
29b115
Reviewed-by: Stefan Hajnoczi <stefanha@redhat.com>
29b115
Acked-by: Markus Armbruster <armbru@redhat.com>
29b115
Message-id: 20220425075723.20019-4-nsaenzju@redhat.com
29b115
Signed-off-by: Stefan Hajnoczi <stefanha@redhat.com>
29b115
(cherry picked from commit 71ad4713cc1d7fca24388b828ef31ae6cb38a31c)
29b115
---
29b115
 event-loop-base.c                | 23 +++++++++++++
29b115
 include/block/aio.h              | 10 ++++++
29b115
 include/block/thread-pool.h      |  3 ++
29b115
 include/sysemu/event-loop-base.h |  4 +++
29b115
 iothread.c                       |  3 ++
29b115
 qapi/qom.json                    | 10 +++++-
29b115
 util/aio-posix.c                 |  1 +
29b115
 util/async.c                     | 20 ++++++++++++
29b115
 util/main-loop.c                 |  9 ++++++
29b115
 util/thread-pool.c               | 55 +++++++++++++++++++++++++++++---
29b115
 10 files changed, 133 insertions(+), 5 deletions(-)
29b115
29b115
diff --git a/event-loop-base.c b/event-loop-base.c
29b115
index e7f99a6ec8..d5be4dc6fc 100644
29b115
--- a/event-loop-base.c
29b115
+++ b/event-loop-base.c
29b115
@@ -14,6 +14,7 @@
29b115
 #include "qemu/osdep.h"
29b115
 #include "qom/object_interfaces.h"
29b115
 #include "qapi/error.h"
29b115
+#include "block/thread-pool.h"
29b115
 #include "sysemu/event-loop-base.h"
29b115
 
29b115
 typedef struct {
29b115
@@ -21,9 +22,22 @@ typedef struct {
29b115
     ptrdiff_t offset; /* field's byte offset in EventLoopBase struct */
29b115
 } EventLoopBaseParamInfo;
29b115
 
29b115
+static void event_loop_base_instance_init(Object *obj)
29b115
+{
29b115
+    EventLoopBase *base = EVENT_LOOP_BASE(obj);
29b115
+
29b115
+    base->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
29b115
+}
29b115
+
29b115
 static EventLoopBaseParamInfo aio_max_batch_info = {
29b115
     "aio-max-batch", offsetof(EventLoopBase, aio_max_batch),
29b115
 };
29b115
+static EventLoopBaseParamInfo thread_pool_min_info = {
29b115
+    "thread-pool-min", offsetof(EventLoopBase, thread_pool_min),
29b115
+};
29b115
+static EventLoopBaseParamInfo thread_pool_max_info = {
29b115
+    "thread-pool-max", offsetof(EventLoopBase, thread_pool_max),
29b115
+};
29b115
 
29b115
 static void event_loop_base_get_param(Object *obj, Visitor *v,
29b115
         const char *name, void *opaque, Error **errp)
29b115
@@ -95,12 +109,21 @@ static void event_loop_base_class_init(ObjectClass *klass, void *class_data)
29b115
                               event_loop_base_get_param,
29b115
                               event_loop_base_set_param,
29b115
                               NULL, &aio_max_batch_info);
29b115
+    object_class_property_add(klass, "thread-pool-min", "int",
29b115
+                              event_loop_base_get_param,
29b115
+                              event_loop_base_set_param,
29b115
+                              NULL, &thread_pool_min_info);
29b115
+    object_class_property_add(klass, "thread-pool-max", "int",
29b115
+                              event_loop_base_get_param,
29b115
+                              event_loop_base_set_param,
29b115
+                              NULL, &thread_pool_max_info);
29b115
 }
29b115
 
29b115
 static const TypeInfo event_loop_base_info = {
29b115
     .name = TYPE_EVENT_LOOP_BASE,
29b115
     .parent = TYPE_OBJECT,
29b115
     .instance_size = sizeof(EventLoopBase),
29b115
+    .instance_init = event_loop_base_instance_init,
29b115
     .class_size = sizeof(EventLoopBaseClass),
29b115
     .class_init = event_loop_base_class_init,
29b115
     .abstract = true,
29b115
diff --git a/include/block/aio.h b/include/block/aio.h
29b115
index 5634173b12..d128558f1d 100644
29b115
--- a/include/block/aio.h
29b115
+++ b/include/block/aio.h
29b115
@@ -192,6 +192,8 @@ struct AioContext {
29b115
     QSLIST_HEAD(, Coroutine) scheduled_coroutines;
29b115
     QEMUBH *co_schedule_bh;
29b115
 
29b115
+    int thread_pool_min;
29b115
+    int thread_pool_max;
29b115
     /* Thread pool for performing work and receiving completion callbacks.
29b115
      * Has its own locking.
29b115
      */
29b115
@@ -769,4 +771,12 @@ void aio_context_set_poll_params(AioContext *ctx, int64_t max_ns,
29b115
 void aio_context_set_aio_params(AioContext *ctx, int64_t max_batch,
29b115
                                 Error **errp);
29b115
 
29b115
+/**
29b115
+ * aio_context_set_thread_pool_params:
29b115
+ * @ctx: the aio context
29b115
+ * @min: min number of threads to have readily available in the thread pool
29b115
+ * @max: max number of threads the thread pool can contain
29b115
+ */
29b115
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
29b115
+                                        int64_t max, Error **errp);
29b115
 #endif
29b115
diff --git a/include/block/thread-pool.h b/include/block/thread-pool.h
29b115
index 7dd7d730a0..2020bcc92d 100644
29b115
--- a/include/block/thread-pool.h
29b115
+++ b/include/block/thread-pool.h
29b115
@@ -20,6 +20,8 @@
29b115
 
29b115
 #include "block/block.h"
29b115
 
29b115
+#define THREAD_POOL_MAX_THREADS_DEFAULT         64
29b115
+
29b115
 typedef int ThreadPoolFunc(void *opaque);
29b115
 
29b115
 typedef struct ThreadPool ThreadPool;
29b115
@@ -33,5 +35,6 @@ BlockAIOCB *thread_pool_submit_aio(ThreadPool *pool,
29b115
 int coroutine_fn thread_pool_submit_co(ThreadPool *pool,
29b115
         ThreadPoolFunc *func, void *arg);
29b115
 void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg);
29b115
+void thread_pool_update_params(ThreadPool *pool, struct AioContext *ctx);
29b115
 
29b115
 #endif
29b115
diff --git a/include/sysemu/event-loop-base.h b/include/sysemu/event-loop-base.h
29b115
index fced4c9fea..2748bf6ae1 100644
29b115
--- a/include/sysemu/event-loop-base.h
29b115
+++ b/include/sysemu/event-loop-base.h
29b115
@@ -33,5 +33,9 @@ struct EventLoopBase {
29b115
 
29b115
     /* AioContext AIO engine parameters */
29b115
     int64_t aio_max_batch;
29b115
+
29b115
+    /* AioContext thread pool parameters */
29b115
+    int64_t thread_pool_min;
29b115
+    int64_t thread_pool_max;
29b115
 };
29b115
 #endif
29b115
diff --git a/iothread.c b/iothread.c
29b115
index 8fa2f3bfb8..529194a566 100644
29b115
--- a/iothread.c
29b115
+++ b/iothread.c
29b115
@@ -174,6 +174,9 @@ static void iothread_set_aio_context_params(EventLoopBase *base, Error **errp)
29b115
     aio_context_set_aio_params(iothread->ctx,
29b115
                                iothread->parent_obj.aio_max_batch,
29b115
                                errp);
29b115
+
29b115
+    aio_context_set_thread_pool_params(iothread->ctx, base->thread_pool_min,
29b115
+                                       base->thread_pool_max, errp);
29b115
 }
29b115
 
29b115
 
29b115
diff --git a/qapi/qom.json b/qapi/qom.json
29b115
index 7d4a2ac1b9..6a653c6636 100644
29b115
--- a/qapi/qom.json
29b115
+++ b/qapi/qom.json
29b115
@@ -508,10 +508,18 @@
29b115
 #                 0 means that the engine will use its default.
29b115
 #                 (default: 0)
29b115
 #
29b115
+# @thread-pool-min: minimum number of threads reserved in the thread pool
29b115
+#                   (default:0)
29b115
+#
29b115
+# @thread-pool-max: maximum number of threads the thread pool can contain
29b115
+#                   (default:64)
29b115
+#
29b115
 # Since: 7.1
29b115
 ##
29b115
 { 'struct': 'EventLoopBaseProperties',
29b115
-  'data': { '*aio-max-batch': 'int' } }
29b115
+  'data': { '*aio-max-batch': 'int',
29b115
+            '*thread-pool-min': 'int',
29b115
+            '*thread-pool-max': 'int' } }
29b115
 
29b115
 ##
29b115
 # @IothreadProperties:
29b115
diff --git a/util/aio-posix.c b/util/aio-posix.c
29b115
index be0182a3c6..731f3826c0 100644
29b115
--- a/util/aio-posix.c
29b115
+++ b/util/aio-posix.c
29b115
@@ -15,6 +15,7 @@
29b115
 
29b115
 #include "qemu/osdep.h"
29b115
 #include "block/block.h"
29b115
+#include "block/thread-pool.h"
29b115
 #include "qemu/main-loop.h"
29b115
 #include "qemu/rcu.h"
29b115
 #include "qemu/rcu_queue.h"
29b115
diff --git a/util/async.c b/util/async.c
29b115
index 2ea1172f3e..554ba70cca 100644
29b115
--- a/util/async.c
29b115
+++ b/util/async.c
29b115
@@ -563,6 +563,9 @@ AioContext *aio_context_new(Error **errp)
29b115
 
29b115
     ctx->aio_max_batch = 0;
29b115
 
29b115
+    ctx->thread_pool_min = 0;
29b115
+    ctx->thread_pool_max = THREAD_POOL_MAX_THREADS_DEFAULT;
29b115
+
29b115
     return ctx;
29b115
 fail:
29b115
     g_source_destroy(&ctx->source);
29b115
@@ -696,3 +699,20 @@ void qemu_set_current_aio_context(AioContext *ctx)
29b115
     assert(!get_my_aiocontext());
29b115
     set_my_aiocontext(ctx);
29b115
 }
29b115
+
29b115
+void aio_context_set_thread_pool_params(AioContext *ctx, int64_t min,
29b115
+                                        int64_t max, Error **errp)
29b115
+{
29b115
+
29b115
+    if (min > max || !max || min > INT_MAX || max > INT_MAX) {
29b115
+        error_setg(errp, "bad thread-pool-min/thread-pool-max values");
29b115
+        return;
29b115
+    }
29b115
+
29b115
+    ctx->thread_pool_min = min;
29b115
+    ctx->thread_pool_max = max;
29b115
+
29b115
+    if (ctx->thread_pool) {
29b115
+        thread_pool_update_params(ctx->thread_pool, ctx);
29b115
+    }
29b115
+}
29b115
diff --git a/util/main-loop.c b/util/main-loop.c
29b115
index 5b13f456fa..a0f48186ab 100644
29b115
--- a/util/main-loop.c
29b115
+++ b/util/main-loop.c
29b115
@@ -30,6 +30,7 @@
29b115
 #include "sysemu/replay.h"
29b115
 #include "qemu/main-loop.h"
29b115
 #include "block/aio.h"
29b115
+#include "block/thread-pool.h"
29b115
 #include "qemu/error-report.h"
29b115
 #include "qemu/queue.h"
29b115
 #include "qemu/compiler.h"
29b115
@@ -187,12 +188,20 @@ int qemu_init_main_loop(Error **errp)
29b115
 
29b115
 static void main_loop_update_params(EventLoopBase *base, Error **errp)
29b115
 {
29b115
+    ERRP_GUARD();
29b115
+
29b115
     if (!qemu_aio_context) {
29b115
         error_setg(errp, "qemu aio context not ready");
29b115
         return;
29b115
     }
29b115
 
29b115
     aio_context_set_aio_params(qemu_aio_context, base->aio_max_batch, errp);
29b115
+    if (*errp) {
29b115
+        return;
29b115
+    }
29b115
+
29b115
+    aio_context_set_thread_pool_params(qemu_aio_context, base->thread_pool_min,
29b115
+                                       base->thread_pool_max, errp);
29b115
 }
29b115
 
29b115
 MainLoop *mloop;
29b115
diff --git a/util/thread-pool.c b/util/thread-pool.c
29b115
index d763cea505..196835b4d3 100644
29b115
--- a/util/thread-pool.c
29b115
+++ b/util/thread-pool.c
29b115
@@ -58,7 +58,6 @@ struct ThreadPool {
29b115
     QemuMutex lock;
29b115
     QemuCond worker_stopped;
29b115
     QemuSemaphore sem;
29b115
-    int max_threads;
29b115
     QEMUBH *new_thread_bh;
29b115
 
29b115
     /* The following variables are only accessed from one AioContext. */
29b115
@@ -71,8 +70,27 @@ struct ThreadPool {
29b115
     int new_threads;     /* backlog of threads we need to create */
29b115
     int pending_threads; /* threads created but not running yet */
29b115
     bool stopping;
29b115
+    int min_threads;
29b115
+    int max_threads;
29b115
 };
29b115
 
29b115
+static inline bool back_to_sleep(ThreadPool *pool, int ret)
29b115
+{
29b115
+    /*
29b115
+     * The semaphore timed out, we should exit the loop except when:
29b115
+     *  - There is work to do, we raced with the signal.
29b115
+     *  - The max threads threshold just changed, we raced with the signal.
29b115
+     *  - The thread pool forces a minimum number of readily available threads.
29b115
+     */
29b115
+    if (ret == -1 && (!QTAILQ_EMPTY(&pool->request_list) ||
29b115
+            pool->cur_threads > pool->max_threads ||
29b115
+            pool->cur_threads <= pool->min_threads)) {
29b115
+            return true;
29b115
+    }
29b115
+
29b115
+    return false;
29b115
+}
29b115
+
29b115
 static void *worker_thread(void *opaque)
29b115
 {
29b115
     ThreadPool *pool = opaque;
29b115
@@ -91,8 +109,9 @@ static void *worker_thread(void *opaque)
29b115
             ret = qemu_sem_timedwait(&pool->sem, 10000);
29b115
             qemu_mutex_lock(&pool->lock);
29b115
             pool->idle_threads--;
29b115
-        } while (ret == -1 && !QTAILQ_EMPTY(&pool->request_list));
29b115
-        if (ret == -1 || pool->stopping) {
29b115
+        } while (back_to_sleep(pool, ret));
29b115
+        if (ret == -1 || pool->stopping ||
29b115
+            pool->cur_threads > pool->max_threads) {
29b115
             break;
29b115
         }
29b115
 
29b115
@@ -294,6 +313,33 @@ void thread_pool_submit(ThreadPool *pool, ThreadPoolFunc *func, void *arg)
29b115
     thread_pool_submit_aio(pool, func, arg, NULL, NULL);
29b115
 }
29b115
 
29b115
+void thread_pool_update_params(ThreadPool *pool, AioContext *ctx)
29b115
+{
29b115
+    qemu_mutex_lock(&pool->lock);
29b115
+
29b115
+    pool->min_threads = ctx->thread_pool_min;
29b115
+    pool->max_threads = ctx->thread_pool_max;
29b115
+
29b115
+    /*
29b115
+     * We either have to:
29b115
+     *  - Increase the number of available threads until over the min_threads
29b115
+     *    threshold.
29b115
+     *  - Decrease the number of available threads until under the max_threads
29b115
+     *    threshold.
29b115
+     *  - Do nothing. The current number of threads fall in between the min and
29b115
+     *    max thresholds. We'll let the pool manage itself.
29b115
+     */
29b115
+    for (int i = pool->cur_threads; i < pool->min_threads; i++) {
29b115
+        spawn_thread(pool);
29b115
+    }
29b115
+
29b115
+    for (int i = pool->cur_threads; i > pool->max_threads; i--) {
29b115
+        qemu_sem_post(&pool->sem);
29b115
+    }
29b115
+
29b115
+    qemu_mutex_unlock(&pool->lock);
29b115
+}
29b115
+
29b115
 static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
29b115
 {
29b115
     if (!ctx) {
29b115
@@ -306,11 +352,12 @@ static void thread_pool_init_one(ThreadPool *pool, AioContext *ctx)
29b115
     qemu_mutex_init(&pool->lock);
29b115
     qemu_cond_init(&pool->worker_stopped);
29b115
     qemu_sem_init(&pool->sem, 0);
29b115
-    pool->max_threads = 64;
29b115
     pool->new_thread_bh = aio_bh_new(ctx, spawn_thread_bh_fn, pool);
29b115
 
29b115
     QLIST_INIT(&pool->head);
29b115
     QTAILQ_INIT(&pool->request_list);
29b115
+
29b115
+    thread_pool_update_params(pool, ctx);
29b115
 }
29b115
 
29b115
 ThreadPool *thread_pool_new(AioContext *ctx)
29b115
-- 
29b115
2.31.1
29b115