Blame SOURCES/0035-KCM-Queue-requests-by-the-same-UID.patch

bb7cd1
From 688e8d8ffe331a1dd75a78002bf212277f2d7664 Mon Sep 17 00:00:00 2001
bb7cd1
From: Jakub Hrozek <jhrozek@redhat.com>
bb7cd1
Date: Tue, 21 Mar 2017 13:25:11 +0100
bb7cd1
Subject: [PATCH 35/36] KCM: Queue requests by the same UID
bb7cd1
MIME-Version: 1.0
bb7cd1
Content-Type: text/plain; charset=UTF-8
bb7cd1
Content-Transfer-Encoding: 8bit
bb7cd1
bb7cd1
In order to avoid race conditions, we queue requests towards the KCM
bb7cd1
responder coming from the same client UID.
bb7cd1
bb7cd1
Reviewed-by: Michal Židek <mzidek@redhat.com>
bb7cd1
Reviewed-by: Simo Sorce <simo@redhat.com>
bb7cd1
Reviewed-by: Lukáš Slebodník <lslebodn@redhat.com>
bb7cd1
---
bb7cd1
 Makefile.am                         |  21 ++-
bb7cd1
 src/responder/kcm/kcm.c             |   7 +
bb7cd1
 src/responder/kcm/kcmsrv_cmd.c      |  10 +-
bb7cd1
 src/responder/kcm/kcmsrv_op_queue.c | 264 ++++++++++++++++++++++++++
bb7cd1
 src/responder/kcm/kcmsrv_ops.c      |  44 ++++-
bb7cd1
 src/responder/kcm/kcmsrv_ops.h      |   1 +
bb7cd1
 src/responder/kcm/kcmsrv_pvt.h      |  20 ++
bb7cd1
 src/tests/cmocka/test_kcm_queue.c   | 365 ++++++++++++++++++++++++++++++++++++
bb7cd1
 8 files changed, 721 insertions(+), 11 deletions(-)
bb7cd1
 create mode 100644 src/responder/kcm/kcmsrv_op_queue.c
bb7cd1
 create mode 100644 src/tests/cmocka/test_kcm_queue.c
bb7cd1
bb7cd1
diff --git a/Makefile.am b/Makefile.am
bb7cd1
index e9eaa312c91e3aee40bcf13c90a0ad8c683045d5..91afdd669aa11a3cc316588d3b51d7e8e9c91cb8 100644
bb7cd1
--- a/Makefile.am
bb7cd1
+++ b/Makefile.am
bb7cd1
@@ -304,7 +304,10 @@ non_interactive_cmocka_based_tests += test_inotify
bb7cd1
 endif   # HAVE_INOTIFY
bb7cd1
 
bb7cd1
 if BUILD_KCM
bb7cd1
-non_interactive_cmocka_based_tests += test_kcm_json
bb7cd1
+non_interactive_cmocka_based_tests += \
bb7cd1
+	test_kcm_json \
bb7cd1
+	test_kcm_queue \
bb7cd1
+        $(NULL)
bb7cd1
 endif   # BUILD_KCM
bb7cd1
 
bb7cd1
 if BUILD_SAMBA
bb7cd1
@@ -1501,6 +1504,7 @@ sssd_kcm_SOURCES = \
bb7cd1
     src/responder/kcm/kcmsrv_ccache_json.c \
bb7cd1
     src/responder/kcm/kcmsrv_ccache_secrets.c \
bb7cd1
     src/responder/kcm/kcmsrv_ops.c \
bb7cd1
+    src/responder/kcm/kcmsrv_op_queue.c \
bb7cd1
     src/util/sss_sockets.c \
bb7cd1
     src/util/sss_krb5.c \
bb7cd1
     src/util/sss_iobuf.c \
bb7cd1
@@ -3402,6 +3406,21 @@ test_kcm_json_LDADD = \
bb7cd1
     $(SSSD_INTERNAL_LTLIBS) \
bb7cd1
     libsss_test_common.la \
bb7cd1
     $(NULL)
bb7cd1
+
bb7cd1
+test_kcm_queue_SOURCES = \
bb7cd1
+    src/tests/cmocka/test_kcm_queue.c \
bb7cd1
+    src/responder/kcm/kcmsrv_op_queue.c \
bb7cd1
+    $(NULL)
bb7cd1
+test_kcm_queue_CFLAGS = \
bb7cd1
+    $(AM_CFLAGS) \
bb7cd1
+    $(NULL)
bb7cd1
+test_kcm_queue_LDADD = \
bb7cd1
+    $(CMOCKA_LIBS) \
bb7cd1
+    $(SSSD_LIBS) \
bb7cd1
+    $(SSSD_INTERNAL_LTLIBS) \
bb7cd1
+    libsss_test_common.la \
bb7cd1
+    $(NULL)
bb7cd1
+
bb7cd1
 endif # BUILD_KCM
bb7cd1
 
bb7cd1
 endif # HAVE_CMOCKA
bb7cd1
diff --git a/src/responder/kcm/kcm.c b/src/responder/kcm/kcm.c
bb7cd1
index 063c27b915b4b92f6259496feee891aa94a498b6..3ee978066c589a5cc38b0ae358f741d389d00e7a 100644
bb7cd1
--- a/src/responder/kcm/kcm.c
bb7cd1
+++ b/src/responder/kcm/kcm.c
bb7cd1
@@ -133,6 +133,13 @@ static int kcm_get_config(struct kcm_ctx *kctx)
bb7cd1
         goto done;
bb7cd1
     }
bb7cd1
 
bb7cd1
+    kctx->qctx = kcm_ops_queue_create(kctx);
bb7cd1
+    if (kctx->qctx == NULL) {
bb7cd1
+        ret = ENOMEM;
bb7cd1
+        DEBUG(SSSDBG_OP_FAILURE,
bb7cd1
+              "Cannot create KCM request queue [%d]: %s\n", ret, strerror(ret));
bb7cd1
+        goto done;
bb7cd1
+    }
bb7cd1
     ret = EOK;
bb7cd1
 done:
bb7cd1
     return ret;
bb7cd1
diff --git a/src/responder/kcm/kcmsrv_cmd.c b/src/responder/kcm/kcmsrv_cmd.c
bb7cd1
index 537e88953fd1a190a9a73bcdd430d8e0db8f9291..81015de4a91617de3dca444cde95b636c8d5c0d1 100644
bb7cd1
--- a/src/responder/kcm/kcmsrv_cmd.c
bb7cd1
+++ b/src/responder/kcm/kcmsrv_cmd.c
bb7cd1
@@ -353,14 +353,18 @@ struct kcm_req_ctx {
bb7cd1
 
bb7cd1
 static void kcm_cmd_request_done(struct tevent_req *req);
bb7cd1
 
bb7cd1
-static errno_t kcm_cmd_dispatch(struct kcm_req_ctx *req_ctx)
bb7cd1
+static errno_t kcm_cmd_dispatch(struct kcm_ctx *kctx,
bb7cd1
+                                struct kcm_req_ctx *req_ctx)
bb7cd1
 {
bb7cd1
     struct tevent_req *req;
bb7cd1
     struct cli_ctx *cctx;
bb7cd1
 
bb7cd1
     cctx = req_ctx->cctx;
bb7cd1
 
bb7cd1
-    req = kcm_cmd_send(req_ctx, cctx->ev, req_ctx->kctx->kcm_data,
bb7cd1
+    req = kcm_cmd_send(req_ctx,
bb7cd1
+                       cctx->ev,
bb7cd1
+                       kctx->qctx,
bb7cd1
+                       req_ctx->kctx->kcm_data,
bb7cd1
                        req_ctx->cctx->creds,
bb7cd1
                        &req_ctx->op_io.request,
bb7cd1
                        req_ctx->op_io.op);
bb7cd1
@@ -505,7 +509,7 @@ static void kcm_recv(struct cli_ctx *cctx)
bb7cd1
     /* do not read anymore, client is done sending */
bb7cd1
     TEVENT_FD_NOT_READABLE(cctx->cfde);
bb7cd1
 
bb7cd1
-    ret = kcm_cmd_dispatch(req);
bb7cd1
+    ret = kcm_cmd_dispatch(kctx, req);
bb7cd1
     if (ret != EOK) {
bb7cd1
         DEBUG(SSSDBG_FATAL_FAILURE,
bb7cd1
               "Failed to dispatch KCM operation [%d]: %s\n",
bb7cd1
diff --git a/src/responder/kcm/kcmsrv_op_queue.c b/src/responder/kcm/kcmsrv_op_queue.c
bb7cd1
new file mode 100644
bb7cd1
index 0000000000000000000000000000000000000000..f6c425dd5b64877c8b7401e488dd6565157fc9b5
bb7cd1
--- /dev/null
bb7cd1
+++ b/src/responder/kcm/kcmsrv_op_queue.c
bb7cd1
@@ -0,0 +1,264 @@
bb7cd1
+/*
bb7cd1
+   SSSD
bb7cd1
+
bb7cd1
+   KCM Server - the KCM operations wait queue
bb7cd1
+
bb7cd1
+   Copyright (C) Red Hat, 2017
bb7cd1
+
bb7cd1
+   This program is free software; you can redistribute it and/or modify
bb7cd1
+   it under the terms of the GNU General Public License as published by
bb7cd1
+   the Free Software Foundation; either version 3 of the License, or
bb7cd1
+   (at your option) any later version.
bb7cd1
+
bb7cd1
+   This program is distributed in the hope that it will be useful,
bb7cd1
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
bb7cd1
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
bb7cd1
+   GNU General Public License for more details.
bb7cd1
+
bb7cd1
+   You should have received a copy of the GNU General Public License
bb7cd1
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
bb7cd1
+*/
bb7cd1
+
bb7cd1
+#include "util/util.h"
bb7cd1
+#include "util/util_creds.h"
bb7cd1
+#include "responder/kcm/kcmsrv_pvt.h"
bb7cd1
+
bb7cd1
+#define QUEUE_HASH_SIZE      32
bb7cd1
+
bb7cd1
+struct kcm_ops_queue_entry {
bb7cd1
+    struct tevent_req *req;
bb7cd1
+    uid_t uid;
bb7cd1
+
bb7cd1
+    hash_table_t *wait_queue_hash;
bb7cd1
+
bb7cd1
+    struct kcm_ops_queue_entry *head;
bb7cd1
+    struct kcm_ops_queue_entry *next;
bb7cd1
+    struct kcm_ops_queue_entry *prev;
bb7cd1
+};
bb7cd1
+
bb7cd1
+struct kcm_ops_queue_ctx {
bb7cd1
+    /* UID: dlist of kcm_ops_queue_entry */
bb7cd1
+    hash_table_t *wait_queue_hash;
bb7cd1
+};
bb7cd1
+
bb7cd1
+/*
bb7cd1
+ * Per-UID wait queue
bb7cd1
+ *
bb7cd1
+ * The key in the hash table is the UID of the peer. The value of each
bb7cd1
+ * hash table entry is a linked list of kcm_ops_queue_entry structures
bb7cd1
+ * which primarily hold the tevent request being queued.
bb7cd1
+ */
bb7cd1
+struct kcm_ops_queue_ctx *kcm_ops_queue_create(TALLOC_CTX *mem_ctx)
bb7cd1
+{
bb7cd1
+    errno_t ret;
bb7cd1
+    struct kcm_ops_queue_ctx *queue_ctx;
bb7cd1
+
bb7cd1
+    queue_ctx = talloc_zero(mem_ctx, struct kcm_ops_queue_ctx);
bb7cd1
+    if (queue_ctx == NULL) {
bb7cd1
+        return NULL;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    ret = sss_hash_create_ex(mem_ctx, QUEUE_HASH_SIZE,
bb7cd1
+                             &queue_ctx->wait_queue_hash, 0, 0, 0, 0,
bb7cd1
+                             NULL, NULL);
bb7cd1
+    if (ret != EOK) {
bb7cd1
+        DEBUG(SSSDBG_CRIT_FAILURE,
bb7cd1
+              "sss_hash_create failed [%d]: %s\n", ret, sss_strerror(ret));
bb7cd1
+        talloc_free(queue_ctx);
bb7cd1
+        return NULL;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    return queue_ctx;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static int kcm_op_queue_entry_destructor(struct kcm_ops_queue_entry *entry)
bb7cd1
+{
bb7cd1
+    int ret;
bb7cd1
+    struct kcm_ops_queue_entry *next_entry;
bb7cd1
+    hash_key_t key;
bb7cd1
+
bb7cd1
+    if (entry == NULL) {
bb7cd1
+        return 1;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    /* Take the next entry from the queue */
bb7cd1
+    next_entry = entry->next;
bb7cd1
+
bb7cd1
+    /* Remove the current entry from the queue */
bb7cd1
+    DLIST_REMOVE(entry->head, entry);
bb7cd1
+
bb7cd1
+    if (next_entry == NULL) {
bb7cd1
+        key.type = HASH_KEY_ULONG;
bb7cd1
+        key.ul = entry->uid;
bb7cd1
+
bb7cd1
+        /* If this was the last entry, remove the key (the UID) from the
bb7cd1
+         * hash table to signal the queue is empty
bb7cd1
+         */
bb7cd1
+        ret = hash_delete(entry->wait_queue_hash, &key);
bb7cd1
+        if (ret != HASH_SUCCESS) {
bb7cd1
+            DEBUG(SSSDBG_CRIT_FAILURE,
bb7cd1
+                  "Failed to remove wait queue for user %"SPRIuid"\n",
bb7cd1
+                  entry->uid);
bb7cd1
+            return 1;
bb7cd1
+        }
bb7cd1
+        return 0;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    /* Otherwise, mark the current head as done to run the next request */
bb7cd1
+    tevent_req_done(next_entry->req);
bb7cd1
+    return 0;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static errno_t kcm_op_queue_add(hash_table_t *wait_queue_hash,
bb7cd1
+                                struct kcm_ops_queue_entry *entry,
bb7cd1
+                                uid_t uid)
bb7cd1
+{
bb7cd1
+    errno_t ret;
bb7cd1
+    hash_key_t key;
bb7cd1
+    hash_value_t value;
bb7cd1
+    struct kcm_ops_queue_entry *head = NULL;
bb7cd1
+
bb7cd1
+    key.type = HASH_KEY_ULONG;
bb7cd1
+    key.ul = uid;
bb7cd1
+
bb7cd1
+    ret = hash_lookup(wait_queue_hash, &key, &value);
bb7cd1
+    switch (ret) {
bb7cd1
+    case HASH_SUCCESS:
bb7cd1
+        /* The key with this UID already exists. Its value is request queue
bb7cd1
+         * for the UID, so let's just add the current request to the end
bb7cd1
+         * of the queue and wait for the previous requests to finish
bb7cd1
+         */
bb7cd1
+        if (value.type != HASH_VALUE_PTR) {
bb7cd1
+            DEBUG(SSSDBG_CRIT_FAILURE, "Unexpected hash value type.\n");
bb7cd1
+            return EINVAL;
bb7cd1
+        }
bb7cd1
+
bb7cd1
+        head = talloc_get_type(value.ptr, struct kcm_ops_queue_entry);
bb7cd1
+        if (head == NULL) {
bb7cd1
+            DEBUG(SSSDBG_CRIT_FAILURE, "Invalid queue pointer\n");
bb7cd1
+            return EINVAL;
bb7cd1
+        }
bb7cd1
+
bb7cd1
+        entry->head = head;
bb7cd1
+        DLIST_ADD_END(head, entry, struct kcm_ops_queue_entry *);
bb7cd1
+
bb7cd1
+        DEBUG(SSSDBG_TRACE_LIBS, "Waiting in queue\n");
bb7cd1
+        ret = EAGAIN;
bb7cd1
+        break;
bb7cd1
+
bb7cd1
+    case HASH_ERROR_KEY_NOT_FOUND:
bb7cd1
+        /* No request for this UID yet. Enqueue this request in case
bb7cd1
+         * another one comes in and return EOK to run the current request
bb7cd1
+         * immediately
bb7cd1
+         */
bb7cd1
+        entry->head = entry;
bb7cd1
+
bb7cd1
+        value.type = HASH_VALUE_PTR;
bb7cd1
+        value.ptr = entry;
bb7cd1
+
bb7cd1
+        ret = hash_enter(wait_queue_hash, &key, &value);
bb7cd1
+        if (ret != HASH_SUCCESS) {
bb7cd1
+            DEBUG(SSSDBG_CRIT_FAILURE, "hash_enter failed.\n");
bb7cd1
+            return EIO;
bb7cd1
+        }
bb7cd1
+
bb7cd1
+        DEBUG(SSSDBG_TRACE_LIBS,
bb7cd1
+              "Added a first request to the queue, running immediately\n");
bb7cd1
+        ret = EOK;
bb7cd1
+        break;
bb7cd1
+
bb7cd1
+    default:
bb7cd1
+        DEBUG(SSSDBG_CRIT_FAILURE, "hash_lookup failed.\n");
bb7cd1
+        return EIO;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    talloc_steal(wait_queue_hash, entry);
bb7cd1
+    talloc_set_destructor(entry, kcm_op_queue_entry_destructor);
bb7cd1
+    return ret;
bb7cd1
+}
bb7cd1
+
bb7cd1
+struct kcm_op_queue_state {
bb7cd1
+    struct kcm_ops_queue_entry *entry;
bb7cd1
+};
bb7cd1
+
bb7cd1
+/*
bb7cd1
+ * Enqueue a request.
bb7cd1
+ *
bb7cd1
+ * If the request queue /for the given ID/ is empty, that is, if this
bb7cd1
+ * request is the first one in the queue, run the request immediately.
bb7cd1
+ *
bb7cd1
+ * Otherwise just add it to the queue and wait until the previous request
bb7cd1
+ * finishes and only at that point mark the current request as done, which
bb7cd1
+ * will trigger calling the recv function and allow the request to continue.
bb7cd1
+ */
bb7cd1
+struct tevent_req *kcm_op_queue_send(TALLOC_CTX *mem_ctx,
bb7cd1
+                                     struct tevent_context *ev,
bb7cd1
+                                     struct kcm_ops_queue_ctx *qctx,
bb7cd1
+                                     struct cli_creds *client)
bb7cd1
+{
bb7cd1
+    errno_t ret;
bb7cd1
+    struct tevent_req *req;
bb7cd1
+    struct kcm_op_queue_state *state;
bb7cd1
+    uid_t uid;
bb7cd1
+
bb7cd1
+    uid = cli_creds_get_uid(client);
bb7cd1
+
bb7cd1
+    req = tevent_req_create(mem_ctx, &state, struct kcm_op_queue_state);
bb7cd1
+    if (req == NULL) {
bb7cd1
+        return NULL;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    state->entry = talloc_zero(state, struct kcm_ops_queue_entry);
bb7cd1
+    if (state->entry == NULL) {
bb7cd1
+        ret = ENOMEM;
bb7cd1
+        goto immediate;
bb7cd1
+    }
bb7cd1
+    state->entry->req = req;
bb7cd1
+    state->entry->uid = uid;
bb7cd1
+    state->entry->wait_queue_hash = qctx->wait_queue_hash;
bb7cd1
+
bb7cd1
+    DEBUG(SSSDBG_FUNC_DATA,
bb7cd1
+          "Adding request by %"SPRIuid" to the wait queue\n", uid);
bb7cd1
+
bb7cd1
+    ret = kcm_op_queue_add(qctx->wait_queue_hash, state->entry, uid);
bb7cd1
+    if (ret == EOK) {
bb7cd1
+        DEBUG(SSSDBG_TRACE_LIBS,
bb7cd1
+              "Wait queue was empty, running immediately\n");
bb7cd1
+        goto immediate;
bb7cd1
+    } else if (ret != EAGAIN) {
bb7cd1
+        DEBUG(SSSDBG_OP_FAILURE,
bb7cd1
+              "Cannot enqueue request [%d]: %s\n", ret, sss_strerror(ret));
bb7cd1
+        goto immediate;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    DEBUG(SSSDBG_TRACE_LIBS, "Waiting our turn in the queue\n");
bb7cd1
+    return req;
bb7cd1
+
bb7cd1
+immediate:
bb7cd1
+    if (ret == EOK) {
bb7cd1
+        tevent_req_done(req);
bb7cd1
+    } else {
bb7cd1
+        tevent_req_error(req, ret);
bb7cd1
+    }
bb7cd1
+    tevent_req_post(req, ev);
bb7cd1
+    return req;
bb7cd1
+}
bb7cd1
+
bb7cd1
+/*
bb7cd1
+ * The queue recv function is called when this request is 'activated'. The queue
bb7cd1
+ * entry should be allocated on the same memory context as the enqueued request
bb7cd1
+ * to trigger freeing the kcm_ops_queue_entry structure destructor when the
bb7cd1
+ * parent request is done and its tevent_req freed. This would in turn unblock
bb7cd1
+ * the next request in the queue
bb7cd1
+ */
bb7cd1
+errno_t kcm_op_queue_recv(struct tevent_req *req,
bb7cd1
+                          TALLOC_CTX *mem_ctx,
bb7cd1
+                          struct kcm_ops_queue_entry **_entry)
bb7cd1
+{
bb7cd1
+    struct kcm_op_queue_state *state = tevent_req_data(req,
bb7cd1
+                                                struct kcm_op_queue_state);
bb7cd1
+
bb7cd1
+    TEVENT_REQ_RETURN_ON_ERROR(req);
bb7cd1
+    *_entry = talloc_steal(mem_ctx, state->entry);
bb7cd1
+    return EOK;
bb7cd1
+}
bb7cd1
diff --git a/src/responder/kcm/kcmsrv_ops.c b/src/responder/kcm/kcmsrv_ops.c
bb7cd1
index 50e8cc635424e15d53e3c8d122c5525044f59c8a..2feaf51f227ce9d90f706229ce7ac201b282dc6f 100644
bb7cd1
--- a/src/responder/kcm/kcmsrv_ops.c
bb7cd1
+++ b/src/responder/kcm/kcmsrv_ops.c
bb7cd1
@@ -67,17 +67,21 @@ struct kcm_op {
bb7cd1
 
bb7cd1
 struct kcm_cmd_state {
bb7cd1
     struct kcm_op *op;
bb7cd1
+    struct tevent_context *ev;
bb7cd1
 
bb7cd1
+    struct kcm_ops_queue_entry *queue_entry;
bb7cd1
     struct kcm_op_ctx *op_ctx;
bb7cd1
     struct sss_iobuf *reply;
bb7cd1
 
bb7cd1
     uint32_t op_ret;
bb7cd1
 };
bb7cd1
 
bb7cd1
+static void kcm_cmd_queue_done(struct tevent_req *subreq);
bb7cd1
 static void kcm_cmd_done(struct tevent_req *subreq);
bb7cd1
 
bb7cd1
 struct tevent_req *kcm_cmd_send(TALLOC_CTX *mem_ctx,
bb7cd1
                                 struct tevent_context *ev,
bb7cd1
+                                struct kcm_ops_queue_ctx *qctx,
bb7cd1
                                 struct kcm_resp_ctx *kcm_data,
bb7cd1
                                 struct cli_creds *client,
bb7cd1
                                 struct kcm_data *input,
bb7cd1
@@ -93,6 +97,7 @@ struct tevent_req *kcm_cmd_send(TALLOC_CTX *mem_ctx,
bb7cd1
         return NULL;
bb7cd1
     }
bb7cd1
     state->op = op;
bb7cd1
+    state->ev = ev;
bb7cd1
 
bb7cd1
     if (op == NULL) {
bb7cd1
         ret = EINVAL;
bb7cd1
@@ -154,18 +159,43 @@ struct tevent_req *kcm_cmd_send(TALLOC_CTX *mem_ctx,
bb7cd1
         goto immediate;
bb7cd1
     }
bb7cd1
 
bb7cd1
-    subreq = op->fn_send(state, ev, state->op_ctx);
bb7cd1
+    subreq = kcm_op_queue_send(state, ev, qctx, client);
bb7cd1
     if (subreq == NULL) {
bb7cd1
         ret = ENOMEM;
bb7cd1
         goto immediate;
bb7cd1
     }
bb7cd1
+    tevent_req_set_callback(subreq, kcm_cmd_queue_done, req);
bb7cd1
+    return req;
bb7cd1
+
bb7cd1
+immediate:
bb7cd1
+    tevent_req_error(req, ret);
bb7cd1
+    tevent_req_post(req, ev);
bb7cd1
+    return req;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static void kcm_cmd_queue_done(struct tevent_req *subreq)
bb7cd1
+{
bb7cd1
+    struct tevent_req *req = tevent_req_callback_data(subreq, struct tevent_req);
bb7cd1
+    struct kcm_cmd_state *state = tevent_req_data(req, struct kcm_cmd_state);
bb7cd1
+    errno_t ret;
bb7cd1
+
bb7cd1
+    /* When this request finishes, it frees the queue_entry which unblocks
bb7cd1
+     * other requests by the same UID
bb7cd1
+     */
bb7cd1
+    ret = kcm_op_queue_recv(subreq, state, &state->queue_entry);
bb7cd1
+    talloc_zfree(subreq);
bb7cd1
+    if (ret != EOK) {
bb7cd1
+        DEBUG(SSSDBG_CRIT_FAILURE, "Cannot acquire queue slot\n");
bb7cd1
+        tevent_req_error(req, ret);
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    subreq = state->op->fn_send(state, state->ev, state->op_ctx);
bb7cd1
+    if (subreq == NULL) {
bb7cd1
+        tevent_req_error(req, ENOMEM);
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
     tevent_req_set_callback(subreq, kcm_cmd_done, req);
bb7cd1
-    return req;
bb7cd1
-
bb7cd1
-immediate:
bb7cd1
-    tevent_req_error(req, ret);
bb7cd1
-    tevent_req_post(req, ev);
bb7cd1
-    return req;
bb7cd1
 }
bb7cd1
 
bb7cd1
 static void kcm_cmd_done(struct tevent_req *subreq)
bb7cd1
diff --git a/src/responder/kcm/kcmsrv_ops.h b/src/responder/kcm/kcmsrv_ops.h
bb7cd1
index 8e6feaf56a10b73c8b6375aea9ef26c392b5b492..67d9f86026bf949548471f2280c130ebefd2f865 100644
bb7cd1
--- a/src/responder/kcm/kcmsrv_ops.h
bb7cd1
+++ b/src/responder/kcm/kcmsrv_ops.h
bb7cd1
@@ -34,6 +34,7 @@ const char *kcm_opt_name(struct kcm_op *op);
bb7cd1
 
bb7cd1
 struct tevent_req *kcm_cmd_send(TALLOC_CTX *mem_ctx,
bb7cd1
                                 struct tevent_context *ev,
bb7cd1
+                                struct kcm_ops_queue_ctx *qctx,
bb7cd1
                                 struct kcm_resp_ctx *kcm_data,
bb7cd1
                                 struct cli_creds *client,
bb7cd1
                                 struct kcm_data *input,
bb7cd1
diff --git a/src/responder/kcm/kcmsrv_pvt.h b/src/responder/kcm/kcmsrv_pvt.h
bb7cd1
index 74f30c00014105ed533744779b02c5d42523722d..f081a6bf0c6e40d2f8a83b07f9bbc2abacff359d 100644
bb7cd1
--- a/src/responder/kcm/kcmsrv_pvt.h
bb7cd1
+++ b/src/responder/kcm/kcmsrv_pvt.h
bb7cd1
@@ -25,6 +25,7 @@
bb7cd1
 #include "config.h"
bb7cd1
 
bb7cd1
 #include <sys/types.h>
bb7cd1
+#include <krb5/krb5.h>
bb7cd1
 #include "responder/common/responder.h"
bb7cd1
 
bb7cd1
 /*
bb7cd1
@@ -65,6 +66,7 @@ struct kcm_ctx {
bb7cd1
     int fd_limit;
bb7cd1
     char *socket_path;
bb7cd1
     enum kcm_ccdb_be cc_be;
bb7cd1
+    struct kcm_ops_queue_ctx *qctx;
bb7cd1
 
bb7cd1
     struct kcm_resp_ctx *kcm_data;
bb7cd1
 };
bb7cd1
@@ -78,4 +80,22 @@ int kcm_connection_setup(struct cli_ctx *cctx);
bb7cd1
  */
bb7cd1
 krb5_error_code sss2krb5_error(errno_t err);
bb7cd1
 
bb7cd1
+/* We enqueue all requests by the same UID to avoid concurrency issues
bb7cd1
+ * especially when performing multiple round-trips to sssd-secrets. In
bb7cd1
+ * future, we should relax the queue to allow multiple read-only operations
bb7cd1
+ * if no write operations are in progress.
bb7cd1
+ */
bb7cd1
+struct kcm_ops_queue_entry;
bb7cd1
+
bb7cd1
+struct kcm_ops_queue_ctx *kcm_ops_queue_create(TALLOC_CTX *mem_ctx);
bb7cd1
+
bb7cd1
+struct tevent_req *kcm_op_queue_send(TALLOC_CTX *mem_ctx,
bb7cd1
+                                     struct tevent_context *ev,
bb7cd1
+                                     struct kcm_ops_queue_ctx *qctx,
bb7cd1
+                                     struct cli_creds *client);
bb7cd1
+
bb7cd1
+errno_t kcm_op_queue_recv(struct tevent_req *req,
bb7cd1
+                          TALLOC_CTX *mem_ctx,
bb7cd1
+                          struct kcm_ops_queue_entry **_entry);
bb7cd1
+
bb7cd1
 #endif /* __KCMSRV_PVT_H__ */
bb7cd1
diff --git a/src/tests/cmocka/test_kcm_queue.c b/src/tests/cmocka/test_kcm_queue.c
bb7cd1
new file mode 100644
bb7cd1
index 0000000000000000000000000000000000000000..ba0d2405629960df5c623848f3207b7c80fa948d
bb7cd1
--- /dev/null
bb7cd1
+++ b/src/tests/cmocka/test_kcm_queue.c
bb7cd1
@@ -0,0 +1,365 @@
bb7cd1
+/*
bb7cd1
+    Copyright (C) 2017 Red Hat
bb7cd1
+
bb7cd1
+    SSSD tests: Test KCM wait queue
bb7cd1
+
bb7cd1
+    This program is free software; you can redistribute it and/or modify
bb7cd1
+    it under the terms of the GNU General Public License as published by
bb7cd1
+    the Free Software Foundation; either version 3 of the License, or
bb7cd1
+    (at your option) any later version.
bb7cd1
+
bb7cd1
+    This program is distributed in the hope that it will be useful,
bb7cd1
+    but WITHOUT ANY WARRANTY; without even the implied warranty of
bb7cd1
+    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
bb7cd1
+    GNU General Public License for more details.
bb7cd1
+
bb7cd1
+    You should have received a copy of the GNU General Public License
bb7cd1
+    along with this program.  If not, see <http://www.gnu.org/licenses/>.
bb7cd1
+*/
bb7cd1
+
bb7cd1
+#include "config.h"
bb7cd1
+
bb7cd1
+#include <stdio.h>
bb7cd1
+#include <popt.h>
bb7cd1
+
bb7cd1
+#include "util/util.h"
bb7cd1
+#include "util/util_creds.h"
bb7cd1
+#include "tests/cmocka/common_mock.h"
bb7cd1
+#include "responder/kcm/kcmsrv_pvt.h"
bb7cd1
+
bb7cd1
+#define INVALID_ID      -1
bb7cd1
+#define FAST_REQ_ID     0
bb7cd1
+#define SLOW_REQ_ID     1
bb7cd1
+
bb7cd1
+#define FAST_REQ_DELAY  1
bb7cd1
+#define SLOW_REQ_DELAY  2
bb7cd1
+
bb7cd1
+struct timed_request_state {
bb7cd1
+    struct tevent_context *ev;
bb7cd1
+    struct kcm_ops_queue_ctx *qctx;
bb7cd1
+    struct cli_creds *client;
bb7cd1
+    int delay;
bb7cd1
+    int req_id;
bb7cd1
+
bb7cd1
+    struct kcm_ops_queue_entry *queue_entry;
bb7cd1
+};
bb7cd1
+
bb7cd1
+static void timed_request_start(struct tevent_req *subreq);
bb7cd1
+static void timed_request_done(struct tevent_context *ev,
bb7cd1
+                               struct tevent_timer *te,
bb7cd1
+                               struct timeval current_time,
bb7cd1
+                               void *pvt);
bb7cd1
+
bb7cd1
+static struct tevent_req *timed_request_send(TALLOC_CTX *mem_ctx,
bb7cd1
+                                             struct tevent_context *ev,
bb7cd1
+                                             struct kcm_ops_queue_ctx *qctx,
bb7cd1
+                                             struct cli_creds *client,
bb7cd1
+                                             int delay,
bb7cd1
+                                             int req_id)
bb7cd1
+{
bb7cd1
+    struct tevent_req *req;
bb7cd1
+    struct tevent_req *subreq;
bb7cd1
+    struct timed_request_state *state;
bb7cd1
+
bb7cd1
+    req = tevent_req_create(mem_ctx, &state, struct timed_request_state);
bb7cd1
+    if (req == NULL) {
bb7cd1
+        return NULL;
bb7cd1
+    }
bb7cd1
+    state->ev = ev;
bb7cd1
+    state->qctx = qctx;
bb7cd1
+    state->client = client;
bb7cd1
+    state->delay = delay;
bb7cd1
+    state->req_id = req_id;
bb7cd1
+
bb7cd1
+    DEBUG(SSSDBG_TRACE_ALL, "Request %p with delay %d\n", req, delay);
bb7cd1
+
bb7cd1
+    subreq = kcm_op_queue_send(state, ev, qctx, client);
bb7cd1
+    if (subreq == NULL) {
bb7cd1
+        return NULL;
bb7cd1
+    }
bb7cd1
+    tevent_req_set_callback(subreq, timed_request_start, req);
bb7cd1
+
bb7cd1
+    return req;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static void timed_request_start(struct tevent_req *subreq)
bb7cd1
+{
bb7cd1
+    struct timeval tv;
bb7cd1
+    struct tevent_timer *timeout = NULL;
bb7cd1
+    struct tevent_req *req = tevent_req_callback_data(subreq,
bb7cd1
+                                                      struct tevent_req);
bb7cd1
+    struct timed_request_state *state = tevent_req_data(req,
bb7cd1
+                                                struct timed_request_state);
bb7cd1
+    errno_t ret;
bb7cd1
+
bb7cd1
+    ret = kcm_op_queue_recv(subreq, state, &state->queue_entry);
bb7cd1
+    talloc_zfree(subreq);
bb7cd1
+    if (ret != EOK) {
bb7cd1
+        tevent_req_error(req, ret);
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    tv = tevent_timeval_current_ofs(state->delay, 0);
bb7cd1
+    timeout = tevent_add_timer(state->ev, state, tv, timed_request_done, req);
bb7cd1
+    if (timeout == NULL) {
bb7cd1
+        tevent_req_error(req, ENOMEM);
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    return;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static void timed_request_done(struct tevent_context *ev,
bb7cd1
+                               struct tevent_timer *te,
bb7cd1
+                               struct timeval current_time,
bb7cd1
+                               void *pvt)
bb7cd1
+{
bb7cd1
+    struct tevent_req *req = talloc_get_type(pvt, struct tevent_req);
bb7cd1
+    DEBUG(SSSDBG_TRACE_ALL, "Request %p done\n", req);
bb7cd1
+    tevent_req_done(req);
bb7cd1
+}
bb7cd1
+
bb7cd1
+static errno_t timed_request_recv(struct tevent_req *req,
bb7cd1
+                                  int *req_id)
bb7cd1
+{
bb7cd1
+    struct timed_request_state *state = tevent_req_data(req,
bb7cd1
+                                                struct timed_request_state);
bb7cd1
+
bb7cd1
+    TEVENT_REQ_RETURN_ON_ERROR(req);
bb7cd1
+    *req_id = state->req_id;
bb7cd1
+    return EOK;
bb7cd1
+}
bb7cd1
+
bb7cd1
+struct test_ctx {
bb7cd1
+    struct kcm_ops_queue_ctx *qctx;
bb7cd1
+    struct tevent_context *ev;
bb7cd1
+
bb7cd1
+    int *req_ids;
bb7cd1
+
bb7cd1
+    int num_requests;
bb7cd1
+    int finished_requests;
bb7cd1
+    bool done;
bb7cd1
+    errno_t error;
bb7cd1
+};
bb7cd1
+
bb7cd1
+static int setup_kcm_queue(void **state)
bb7cd1
+{
bb7cd1
+    struct test_ctx *tctx;
bb7cd1
+
bb7cd1
+    tctx = talloc_zero(NULL, struct test_ctx);
bb7cd1
+    assert_non_null(tctx);
bb7cd1
+
bb7cd1
+    tctx->ev = tevent_context_init(tctx);
bb7cd1
+    assert_non_null(tctx->ev);
bb7cd1
+
bb7cd1
+    tctx->qctx = kcm_ops_queue_create(tctx);
bb7cd1
+    assert_non_null(tctx->qctx);
bb7cd1
+
bb7cd1
+    *state = tctx;
bb7cd1
+    return 0;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static int teardown_kcm_queue(void **state)
bb7cd1
+{
bb7cd1
+    struct test_ctx *tctx = talloc_get_type(*state, struct test_ctx);
bb7cd1
+    talloc_free(tctx);
bb7cd1
+    return 0;
bb7cd1
+}
bb7cd1
+
bb7cd1
+static void test_kcm_queue_done(struct tevent_req *req)
bb7cd1
+{
bb7cd1
+    struct test_ctx *test_ctx = tevent_req_callback_data(req,
bb7cd1
+                                                struct test_ctx);
bb7cd1
+    int req_id = INVALID_ID;
bb7cd1
+
bb7cd1
+    test_ctx->error = timed_request_recv(req, &req_id);
bb7cd1
+    talloc_zfree(req);
bb7cd1
+    if (test_ctx->error != EOK) {
bb7cd1
+        test_ctx->done = true;
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    if (test_ctx->req_ids[test_ctx->finished_requests] != req_id) {
bb7cd1
+        DEBUG(SSSDBG_CRIT_FAILURE,
bb7cd1
+              "Request %d finished, expected %d\n",
bb7cd1
+              req_id, test_ctx->req_ids[test_ctx->finished_requests]);
bb7cd1
+        test_ctx->error = EIO;
bb7cd1
+        test_ctx->done = true;
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
+
bb7cd1
+    test_ctx->finished_requests++;
bb7cd1
+    if (test_ctx->finished_requests == test_ctx->num_requests) {
bb7cd1
+        test_ctx->done = true;
bb7cd1
+        return;
bb7cd1
+    }
bb7cd1
+}
bb7cd1
+
bb7cd1
+/*
bb7cd1
+ * Just make sure that a single pass through the queue works
bb7cd1
+ */
bb7cd1
+static void test_kcm_queue_single(void **state)
bb7cd1
+{
bb7cd1
+    struct test_ctx *test_ctx = talloc_get_type(*state, struct test_ctx);
bb7cd1
+    struct tevent_req *req;
bb7cd1
+    struct cli_creds client;
bb7cd1
+    static int req_ids[] = { 0 };
bb7cd1
+
bb7cd1
+    client.ucred.uid = getuid();
bb7cd1
+    client.ucred.gid = getgid();
bb7cd1
+
bb7cd1
+    req = timed_request_send(test_ctx,
bb7cd1
+                             test_ctx->ev,
bb7cd1
+                             test_ctx->qctx,
bb7cd1
+                             &client, 1, 0);
bb7cd1
+    assert_non_null(req);
bb7cd1
+    tevent_req_set_callback(req, test_kcm_queue_done, test_ctx);
bb7cd1
+
bb7cd1
+    test_ctx->num_requests = 1;
bb7cd1
+    test_ctx->req_ids = req_ids;
bb7cd1
+
bb7cd1
+    while (test_ctx->done == false) {
bb7cd1
+        tevent_loop_once(test_ctx->ev);
bb7cd1
+    }
bb7cd1
+    assert_int_equal(test_ctx->error, EOK);
bb7cd1
+}
bb7cd1
+
bb7cd1
+/*
bb7cd1
+ * Test that multiple requests from the same ID wait for one another
bb7cd1
+ */
bb7cd1
+static void test_kcm_queue_multi_same_id(void **state)
bb7cd1
+{
bb7cd1
+    struct test_ctx *test_ctx = talloc_get_type(*state, struct test_ctx);
bb7cd1
+    struct tevent_req *req;
bb7cd1
+    struct cli_creds client;
bb7cd1
+    /* The slow request will finish first because request from
bb7cd1
+     * the same ID are serialized
bb7cd1
+     */
bb7cd1
+    static int req_ids[] = { SLOW_REQ_ID, FAST_REQ_ID };
bb7cd1
+
bb7cd1
+    client.ucred.uid = getuid();
bb7cd1
+    client.ucred.gid = getgid();
bb7cd1
+
bb7cd1
+    req = timed_request_send(test_ctx,
bb7cd1
+                             test_ctx->ev,
bb7cd1
+                             test_ctx->qctx,
bb7cd1
+                             &client,
bb7cd1
+                             SLOW_REQ_DELAY,
bb7cd1
+                             SLOW_REQ_ID);
bb7cd1
+    assert_non_null(req);
bb7cd1
+    tevent_req_set_callback(req, test_kcm_queue_done, test_ctx);
bb7cd1
+
bb7cd1
+    req = timed_request_send(test_ctx,
bb7cd1
+                             test_ctx->ev,
bb7cd1
+                             test_ctx->qctx,
bb7cd1
+                             &client,
bb7cd1
+                             FAST_REQ_DELAY,
bb7cd1
+                             FAST_REQ_ID);
bb7cd1
+    assert_non_null(req);
bb7cd1
+    tevent_req_set_callback(req, test_kcm_queue_done, test_ctx);
bb7cd1
+
bb7cd1
+    test_ctx->num_requests = 2;
bb7cd1
+    test_ctx->req_ids = req_ids;
bb7cd1
+
bb7cd1
+    while (test_ctx->done == false) {
bb7cd1
+        tevent_loop_once(test_ctx->ev);
bb7cd1
+    }
bb7cd1
+    assert_int_equal(test_ctx->error, EOK);
bb7cd1
+}
bb7cd1
+
bb7cd1
+/*
bb7cd1
+ * Test that multiple requests from different IDs don't wait for one
bb7cd1
+ * another and can run concurrently
bb7cd1
+ */
bb7cd1
+static void test_kcm_queue_multi_different_id(void **state)
bb7cd1
+{
bb7cd1
+    struct test_ctx *test_ctx = talloc_get_type(*state, struct test_ctx);
bb7cd1
+    struct tevent_req *req;
bb7cd1
+    struct cli_creds client;
bb7cd1
+    /* In this test, the fast request will finish sooner because
bb7cd1
+     * both requests are from different IDs, allowing them to run
bb7cd1
+     * concurrently
bb7cd1
+     */
bb7cd1
+    static int req_ids[] = { FAST_REQ_ID, SLOW_REQ_ID };
bb7cd1
+
bb7cd1
+    client.ucred.uid = getuid();
bb7cd1
+    client.ucred.gid = getgid();
bb7cd1
+
bb7cd1
+    req = timed_request_send(test_ctx,
bb7cd1
+                             test_ctx->ev,
bb7cd1
+                             test_ctx->qctx,
bb7cd1
+                             &client,
bb7cd1
+                             SLOW_REQ_DELAY,
bb7cd1
+                             SLOW_REQ_ID);
bb7cd1
+    assert_non_null(req);
bb7cd1
+    tevent_req_set_callback(req, test_kcm_queue_done, test_ctx);
bb7cd1
+
bb7cd1
+    client.ucred.uid = getuid() + 1;
bb7cd1
+    client.ucred.gid = getgid() + 1;
bb7cd1
+
bb7cd1
+    req = timed_request_send(test_ctx,
bb7cd1
+                             test_ctx->ev,
bb7cd1
+                             test_ctx->qctx,
bb7cd1
+                             &client,
bb7cd1
+                             FAST_REQ_DELAY,
bb7cd1
+                             FAST_REQ_ID);
bb7cd1
+    assert_non_null(req);
bb7cd1
+    tevent_req_set_callback(req, test_kcm_queue_done, test_ctx);
bb7cd1
+
bb7cd1
+    test_ctx->num_requests = 2;
bb7cd1
+    test_ctx->req_ids = req_ids;
bb7cd1
+
bb7cd1
+    while (test_ctx->done == false) {
bb7cd1
+        tevent_loop_once(test_ctx->ev);
bb7cd1
+    }
bb7cd1
+    assert_int_equal(test_ctx->error, EOK);
bb7cd1
+}
bb7cd1
+
bb7cd1
+int main(int argc, const char *argv[])
bb7cd1
+{
bb7cd1
+    poptContext pc;
bb7cd1
+    int opt;
bb7cd1
+    int rv;
bb7cd1
+    struct poptOption long_options[] = {
bb7cd1
+        POPT_AUTOHELP
bb7cd1
+        SSSD_DEBUG_OPTS
bb7cd1
+        POPT_TABLEEND
bb7cd1
+    };
bb7cd1
+
bb7cd1
+    const struct CMUnitTest tests[] = {
bb7cd1
+        cmocka_unit_test_setup_teardown(test_kcm_queue_single,
bb7cd1
+                                        setup_kcm_queue,
bb7cd1
+                                        teardown_kcm_queue),
bb7cd1
+        cmocka_unit_test_setup_teardown(test_kcm_queue_multi_same_id,
bb7cd1
+                                        setup_kcm_queue,
bb7cd1
+                                        teardown_kcm_queue),
bb7cd1
+        cmocka_unit_test_setup_teardown(test_kcm_queue_multi_different_id,
bb7cd1
+                                        setup_kcm_queue,
bb7cd1
+                                        teardown_kcm_queue),
bb7cd1
+    };
bb7cd1
+
bb7cd1
+    /* Set debug level to invalid value so we can decide if -d 0 was used. */
bb7cd1
+    debug_level = SSSDBG_INVALID;
bb7cd1
+
bb7cd1
+    pc = poptGetContext(argv[0], argc, argv, long_options, 0);
bb7cd1
+    while((opt = poptGetNextOpt(pc)) != -1) {
bb7cd1
+        switch(opt) {
bb7cd1
+        default:
bb7cd1
+            fprintf(stderr, "\nInvalid option %s: %s\n\n",
bb7cd1
+                    poptBadOption(pc, 0), poptStrerror(opt));
bb7cd1
+            poptPrintUsage(pc, stderr, 0);
bb7cd1
+            return 1;
bb7cd1
+        }
bb7cd1
+    }
bb7cd1
+    poptFreeContext(pc);
bb7cd1
+
bb7cd1
+    DEBUG_CLI_INIT(debug_level);
bb7cd1
+
bb7cd1
+    /* Even though normally the tests should clean up after themselves
bb7cd1
+     * they might not after a failed run. Remove the old db to be sure */
bb7cd1
+    tests_set_cwd();
bb7cd1
+
bb7cd1
+    rv = cmocka_run_group_tests(tests, NULL, NULL);
bb7cd1
+
bb7cd1
+    return rv;
bb7cd1
+}
bb7cd1
-- 
bb7cd1
2.9.3
bb7cd1