Blob Blame History Raw
From b31f75f44a9e1dc0521ec73176f89e05db4973ba Mon Sep 17 00:00:00 2001
From: Jakub Hrozek <jhrozek@redhat.com>
Date: Thu, 11 May 2017 16:24:24 +0200
Subject: [PATCH 136/138] KCM: Fix the per-client serialization queue
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Resolves:
    https://pagure.io/SSSD/sssd/issue/3372

Fixes a race condition between one client request adding an operation to
the hash table value, which was previously a linked list of operations,
while another concurrent operation would remove the last remaining
linked list element through its callback.

Instead, the hash table value is now a separate 'queue head' structure
which is only changed in a tevent request to make sure is is not
processes concurrently with adding to the queue (which is also a tevent
request).

Reviewed-by: Pavel Březina <pbrezina@redhat.com>
(cherry picked from commit fb51bb68e62de7bb8542f5d224994eb7143040a6)
---
 src/responder/kcm/kcmsrv_op_queue.c | 182 ++++++++++++++++++++++++------------
 1 file changed, 122 insertions(+), 60 deletions(-)

diff --git a/src/responder/kcm/kcmsrv_op_queue.c b/src/responder/kcm/kcmsrv_op_queue.c
index f6c425dd5b64877c8b7401e488dd6565157fc9b5..55c8b65d94f70979fe56fcc4d8747547a9cc9d33 100644
--- a/src/responder/kcm/kcmsrv_op_queue.c
+++ b/src/responder/kcm/kcmsrv_op_queue.c
@@ -27,17 +27,23 @@
 
 struct kcm_ops_queue_entry {
     struct tevent_req *req;
-    uid_t uid;
 
-    hash_table_t *wait_queue_hash;
+    struct kcm_ops_queue *queue;
 
-    struct kcm_ops_queue_entry *head;
     struct kcm_ops_queue_entry *next;
     struct kcm_ops_queue_entry *prev;
 };
 
+struct kcm_ops_queue {
+    uid_t uid;
+    struct tevent_context *ev;
+    struct kcm_ops_queue_ctx *qctx;
+
+    struct kcm_ops_queue_entry *head;
+};
+
 struct kcm_ops_queue_ctx {
-    /* UID: dlist of kcm_ops_queue_entry */
+    /* UID:kcm_ops_queue */
     hash_table_t *wait_queue_hash;
 };
 
@@ -45,8 +51,9 @@ struct kcm_ops_queue_ctx {
  * Per-UID wait queue
  *
  * They key in the hash table is the UID of the peer. The value of each
- * hash table entry is a linked list of kcm_ops_queue_entry structures
- * which primarily hold the tevent request being queued.
+ * hash table entry is kcm_ops_queue structure which in turn contains a
+ * linked list of kcm_ops_queue_entry structures * which primarily hold the
+ * tevent request being queued.
  */
 struct kcm_ops_queue_ctx *kcm_ops_queue_create(TALLOC_CTX *mem_ctx)
 {
@@ -71,11 +78,45 @@ struct kcm_ops_queue_ctx *kcm_ops_queue_create(TALLOC_CTX *mem_ctx)
     return queue_ctx;
 }
 
-static int kcm_op_queue_entry_destructor(struct kcm_ops_queue_entry *entry)
+void queue_removal_cb(struct tevent_context *ctx,
+                      struct tevent_immediate *imm,
+                      void *private_data)
 {
+    struct kcm_ops_queue *kq = talloc_get_type(private_data,
+                                               struct kcm_ops_queue);
     int ret;
+    hash_key_t key;
+
+    talloc_free(imm);
+
+    if (kq->head != NULL) {
+        DEBUG(SSSDBG_TRACE_LIBS, "The queue is no longer empty\n");
+        return;
+    }
+
+    key.type = HASH_KEY_ULONG;
+    key.ul = kq->uid;
+
+    /* If this was the last entry, remove the key (the UID) from the
+     * hash table to signal the queue is empty
+     */
+    ret = hash_delete(kq->qctx->wait_queue_hash, &key);
+    if (ret != HASH_SUCCESS) {
+        DEBUG(SSSDBG_CRIT_FAILURE,
+              "Failed to remove wait queue for user %"SPRIuid"\n",
+              kq->uid);
+        return;
+    }
+
+    DEBUG(SSSDBG_FUNC_DATA,
+          "Removed queue for %"SPRIuid" \n", kq->uid);
+    talloc_free(kq);
+}
+
+static int kcm_op_queue_entry_destructor(struct kcm_ops_queue_entry *entry)
+{
     struct kcm_ops_queue_entry *next_entry;
-    hash_key_t key;
+    struct tevent_immediate *imm;
 
     if (entry == NULL) {
         return 1;
@@ -85,22 +126,19 @@ static int kcm_op_queue_entry_destructor(struct kcm_ops_queue_entry *entry)
     next_entry = entry->next;
 
     /* Remove the current entry from the queue */
-    DLIST_REMOVE(entry->head, entry);
+    DLIST_REMOVE(entry->queue->head, entry);
 
     if (next_entry == NULL) {
-        key.type = HASH_KEY_ULONG;
-        key.ul = entry->uid;
-
-        /* If this was the last entry, remove the key (the UID) from the
-         * hash table to signal the queue is empty
+        /* If there was no other entry, schedule removal of the queue. Do it
+         * in another tevent tick to avoid issues with callbacks invoking
+         * the descructor while another request is touching the queue
          */
-        ret = hash_delete(entry->wait_queue_hash, &key);
-        if (ret != HASH_SUCCESS) {
-            DEBUG(SSSDBG_CRIT_FAILURE,
-                  "Failed to remove wait queue for user %"SPRIuid"\n",
-                  entry->uid);
+        imm = tevent_create_immediate(entry->queue);
+        if (imm == NULL) {
             return 1;
         }
+
+        tevent_schedule_immediate(imm, entry->queue->ev, queue_removal_cb, entry->queue);
         return 0;
     }
 
@@ -109,41 +147,33 @@ static int kcm_op_queue_entry_destructor(struct kcm_ops_queue_entry *entry)
     return 0;
 }
 
-static errno_t kcm_op_queue_add(hash_table_t *wait_queue_hash,
-                                struct kcm_ops_queue_entry *entry,
-                                uid_t uid)
+static struct kcm_ops_queue *kcm_op_queue_get(struct kcm_ops_queue_ctx *qctx,
+                                              struct tevent_context *ev,
+                                              uid_t uid)
 {
     errno_t ret;
     hash_key_t key;
     hash_value_t value;
-    struct kcm_ops_queue_entry *head = NULL;
+    struct kcm_ops_queue *kq;
 
     key.type = HASH_KEY_ULONG;
     key.ul = uid;
 
-    ret = hash_lookup(wait_queue_hash, &key, &value);
+    ret = hash_lookup(qctx->wait_queue_hash, &key, &value);
     switch (ret) {
     case HASH_SUCCESS:
-        /* The key with this UID already exists. Its value is request queue
-         * for the UID, so let's just add the current request to the end
-         * of the queue and wait for the previous requests to finish
-         */
         if (value.type != HASH_VALUE_PTR) {
             DEBUG(SSSDBG_CRIT_FAILURE, "Unexpected hash value type.\n");
-            return EINVAL;
+            return NULL;
         }
 
-        head = talloc_get_type(value.ptr, struct kcm_ops_queue_entry);
-        if (head == NULL) {
+        kq = talloc_get_type(value.ptr, struct kcm_ops_queue);
+        if (kq == NULL) {
             DEBUG(SSSDBG_CRIT_FAILURE, "Invalid queue pointer\n");
-            return EINVAL;
+            return NULL;
         }
 
-        entry->head = head;
-        DLIST_ADD_END(head, entry, struct kcm_ops_queue_entry *);
-
-        DEBUG(SSSDBG_TRACE_LIBS, "Waiting in queue\n");
-        ret = EAGAIN;
+        DEBUG(SSSDBG_TRACE_LIBS, "Found existing queue for this ID\n");
         break;
 
     case HASH_ERROR_KEY_NOT_FOUND:
@@ -151,36 +181,41 @@ static errno_t kcm_op_queue_add(hash_table_t *wait_queue_hash,
          * another one comes in and return EOK to run the current request
          * immediatelly
          */
-        entry->head = entry;
+        DEBUG(SSSDBG_TRACE_LIBS, "No existing queue for this ID\n");
+
+        kq = talloc_zero(qctx->wait_queue_hash, struct kcm_ops_queue);
+        if (kq == NULL) {
+            return NULL;
+        }
+        kq->uid = uid;
+        kq->qctx = qctx;
+        kq->ev = ev;
 
         value.type = HASH_VALUE_PTR;
-        value.ptr = entry;
+        value.ptr = kq;
 
-        ret = hash_enter(wait_queue_hash, &key, &value);
+        ret = hash_enter(qctx->wait_queue_hash, &key, &value);
         if (ret != HASH_SUCCESS) {
             DEBUG(SSSDBG_CRIT_FAILURE, "hash_enter failed.\n");
-            return EIO;
+            return NULL;
         }
-
-        DEBUG(SSSDBG_TRACE_LIBS,
-              "Added a first request to the queue, running immediately\n");
-        ret = EOK;
         break;
 
     default:
         DEBUG(SSSDBG_CRIT_FAILURE, "hash_lookup failed.\n");
-        return EIO;
+        return NULL;
     }
 
-    talloc_steal(wait_queue_hash, entry);
-    talloc_set_destructor(entry, kcm_op_queue_entry_destructor);
-    return ret;
+    return kq;
 }
 
 struct kcm_op_queue_state {
     struct kcm_ops_queue_entry *entry;
 };
 
+static errno_t kcm_op_queue_add_req(struct kcm_ops_queue *kq,
+                                    struct tevent_req *req);
+
 /*
  * Enqueue a request.
  *
@@ -198,6 +233,7 @@ struct tevent_req *kcm_op_queue_send(TALLOC_CTX *mem_ctx,
 {
     errno_t ret;
     struct tevent_req *req;
+    struct kcm_ops_queue *kq;
     struct kcm_op_queue_state *state;
     uid_t uid;
 
@@ -208,22 +244,21 @@ struct tevent_req *kcm_op_queue_send(TALLOC_CTX *mem_ctx,
         return NULL;
     }
 
-    state->entry = talloc_zero(state, struct kcm_ops_queue_entry);
-    if (state->entry == NULL) {
-        ret = ENOMEM;
-        goto immediate;
-    }
-    state->entry->req = req;
-    state->entry->uid = uid;
-    state->entry->wait_queue_hash = qctx->wait_queue_hash;
-
     DEBUG(SSSDBG_FUNC_DATA,
           "Adding request by %"SPRIuid" to the wait queue\n", uid);
 
-    ret = kcm_op_queue_add(qctx->wait_queue_hash, state->entry, uid);
+    kq = kcm_op_queue_get(qctx, ev, uid);
+    if (kq == NULL) {
+        ret = EIO;
+        DEBUG(SSSDBG_OP_FAILURE,
+              "Cannot get queue [%d]: %s\n", ret, sss_strerror(ret));
+        goto immediate;
+    }
+
+    ret = kcm_op_queue_add_req(kq, req);
     if (ret == EOK) {
         DEBUG(SSSDBG_TRACE_LIBS,
-              "Wait queue was empty, running immediately\n");
+              "Queue was empty, running the request immediately\n");
         goto immediate;
     } else if (ret != EAGAIN) {
         DEBUG(SSSDBG_OP_FAILURE,
@@ -244,6 +279,33 @@ immediate:
     return req;
 }
 
+static errno_t kcm_op_queue_add_req(struct kcm_ops_queue *kq,
+                                    struct tevent_req *req)
+{
+    errno_t ret;
+    struct kcm_op_queue_state *state = tevent_req_data(req,
+                                                struct kcm_op_queue_state);
+
+    state->entry = talloc_zero(kq->qctx->wait_queue_hash, struct kcm_ops_queue_entry);
+    if (state->entry == NULL) {
+        return ENOMEM;
+    }
+    state->entry->req = req;
+    state->entry->queue = kq;
+    talloc_set_destructor(state->entry, kcm_op_queue_entry_destructor);
+
+    if (kq->head == NULL) {
+        /* First entry, will run callback at once */
+        ret = EOK;
+    } else {
+        /* Will wait for the previous callbacks to finish */
+        ret = EAGAIN;
+    }
+
+    DLIST_ADD_END(kq->head, state->entry, struct kcm_ops_queue_entry *);
+    return ret;
+}
+
 /*
  * The queue recv function is called when this request is 'activated'. The queue
  * entry should be allocated on the same memory context as the enqueued request
-- 
2.9.4