Blob Blame History Raw
From 5bca57b52069508a55b36fafe3729b7d1243743b Mon Sep 17 00:00:00 2001
From: tbordaz <tbordaz@redhat.com>
Date: Wed, 27 Jan 2021 11:58:38 +0100
Subject: [PATCH 2/3] Issue 4526 - sync_repl: when completing an operation in
 the pending list, it can select the wrong operation (#4553)

Bug description:
	When an operation completed, it was retrieved from the pending list using
	the address of the Operation structure. In case of POST OP nested operations
	the same address can be reused, so when completing an operation there could be
	confusion about which operation actually completed.
	A second problem is that if an update hits DB_DEADLOCK, the BETXN_PREOP can
	be called several times. During a retry, the operation is already in the pending
	list.

Fix description:
	The fix defines a new operation extension (sync_persist_extension_type).
	This operation extension contains an index (idx_pl) of the op_pl in
	the pending list.

	An additional safety fix is to dump the pending list in case it becomes large (>10).
	The pending list is dumped with SLAPI_LOG_PLUGIN.

	When there is a retry (operation extension exists) the call to sync_update_persist_betxn_pre_op
	becomes a NOOP: the operation is not added again in the pending list.

relates: https://github.com/389ds/389-ds-base/issues/4526

Reviewed by: William Brown (Thanks !!)
---
 ldap/servers/plugins/sync/sync.h         |   9 ++
 ldap/servers/plugins/sync/sync_init.c    |  64 +++++++-
 ldap/servers/plugins/sync/sync_persist.c | 194 ++++++++++++++++-------
 3 files changed, 208 insertions(+), 59 deletions(-)

diff --git a/ldap/servers/plugins/sync/sync.h b/ldap/servers/plugins/sync/sync.h
index 7241fddbf..2fdf24476 100644
--- a/ldap/servers/plugins/sync/sync.h
+++ b/ldap/servers/plugins/sync/sync.h
@@ -82,6 +82,12 @@ typedef enum _pl_flags {
     OPERATION_PL_IGNORED = 5
 } pl_flags_t;
 
+typedef struct op_ext_ident
+{
+    uint32_t idx_pl;   /* To uniquely identify an operation in PL, the operation extension
+                        * contains the index of that operation in the pending list
+                        */
+} op_ext_ident_t;
 /* Pending list operations.
  * it contains a list ('next') of nested operations. The
  * order the same order that the server applied the operation
@@ -90,6 +96,7 @@ typedef enum _pl_flags {
 typedef struct OPERATION_PL_CTX
 {
     Operation *op;      /* Pending operation, should not be freed as it belongs to the pblock */
+    uint32_t idx_pl;    /* index of the operation in the pending list */
     pl_flags_t flags;  /* operation is completed (set to TRUE in POST) */
     Slapi_Entry *entry; /* entry to be store in the enqueued node. 1st arg sync_queue_change */
     Slapi_Entry *eprev; /* pre-entry to be stored in the enqueued node. 2nd arg sync_queue_change */
@@ -99,6 +106,8 @@ typedef struct OPERATION_PL_CTX
 
 OPERATION_PL_CTX_T * get_thread_primary_op(void);
 void set_thread_primary_op(OPERATION_PL_CTX_T *op);
+const op_ext_ident_t * sync_persist_get_operation_extension(Slapi_PBlock *pb);
+void sync_persist_set_operation_extension(Slapi_PBlock *pb, op_ext_ident_t *op_ident);
 
 int sync_register_operation_extension(void);
 int sync_unregister_operation_entension(void);
diff --git a/ldap/servers/plugins/sync/sync_init.c b/ldap/servers/plugins/sync/sync_init.c
index 74af14512..9e6a12000 100644
--- a/ldap/servers/plugins/sync/sync_init.c
+++ b/ldap/servers/plugins/sync/sync_init.c
@@ -16,6 +16,7 @@ static int sync_preop_init(Slapi_PBlock *pb);
 static int sync_postop_init(Slapi_PBlock *pb);
 static int sync_be_postop_init(Slapi_PBlock *pb);
 static int sync_betxn_preop_init(Slapi_PBlock *pb);
+static int sync_persist_register_operation_extension(void);
 
 static PRUintn thread_primary_op;
 
@@ -43,7 +44,8 @@ sync_init(Slapi_PBlock *pb)
         slapi_pblock_set(pb, SLAPI_PLUGIN_CLOSE_FN,
                          (void *)sync_close) != 0 ||
         slapi_pblock_set(pb, SLAPI_PLUGIN_DESCRIPTION,
-                         (void *)&pdesc) != 0) {
+                         (void *)&pdesc) != 0 ||
+        sync_persist_register_operation_extension()) {
         slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM,
                       "sync_init - Failed to register plugin\n");
         rc = 1;
@@ -242,4 +244,64 @@ set_thread_primary_op(OPERATION_PL_CTX_T *op)
         PR_SetThreadPrivate(thread_primary_op, (void *) head);
     }
     head->next = op;
+}
+
+/* The following definitions are used for the operation pending list
+ * (used by sync_repl). To retrieve a specific operation in the pending
+ * list, the operation extension contains the index of the operation in
+ * the pending list
+ */
+static int sync_persist_extension_type;   /* initialized in sync_persist_register_operation_extension */
+static int sync_persist_extension_handle; /* initialized in sync_persist_register_operation_extension */
+
+const op_ext_ident_t *
+sync_persist_get_operation_extension(Slapi_PBlock *pb)
+{
+    Slapi_Operation *op;
+    op_ext_ident_t *ident;
+
+    slapi_pblock_get(pb, SLAPI_OPERATION, &op);
+    ident = slapi_get_object_extension(sync_persist_extension_type, op,
+                                       sync_persist_extension_handle);
+    slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_get_operation_extension operation (op=0x%lx) -> %d\n",
+                    (ulong) op, ident ? ident->idx_pl : -1);
+    return (const op_ext_ident_t *) ident;
+
+}
+
+void
+sync_persist_set_operation_extension(Slapi_PBlock *pb, op_ext_ident_t *op_ident)
+{
+    Slapi_Operation *op;
+
+    slapi_pblock_get(pb, SLAPI_OPERATION, &op);
+    slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_persist_set_operation_extension operation (op=0x%lx) -> %d\n",
+                    (ulong) op, op_ident ? op_ident->idx_pl : -1);
+    slapi_set_object_extension(sync_persist_extension_type, op,
+                               sync_persist_extension_handle, (void *)op_ident);
+}
+/* operation extension constructor */
+static void *
+sync_persist_operation_extension_constructor(void *object __attribute__((unused)), void *parent __attribute__((unused)))
+{
+    /* we only set the extension value explicitly in sync_update_persist_betxn_pre_op */
+    return NULL; /* we don't set anything in the ctor */
+}
+
+/* consumer operation extension destructor */
+static void
+sync_persist_operation_extension_destructor(void *ext, void *object __attribute__((unused)), void *parent __attribute__((unused)))
+{
+    op_ext_ident_t *op_ident = (op_ext_ident_t *)ext;
+    slapi_ch_free((void **)&op_ident);
+}
+static int
+sync_persist_register_operation_extension(void)
+{
+    return slapi_register_object_extension(SYNC_PLUGIN_SUBSYSTEM,
+                                           SLAPI_EXT_OPERATION,
+                                           sync_persist_operation_extension_constructor,
+                                           sync_persist_operation_extension_destructor,
+                                           &sync_persist_extension_type,
+                                           &sync_persist_extension_handle);
 }
\ No newline at end of file
diff --git a/ldap/servers/plugins/sync/sync_persist.c b/ldap/servers/plugins/sync/sync_persist.c
index d13f142b0..e93a8fa83 100644
--- a/ldap/servers/plugins/sync/sync_persist.c
+++ b/ldap/servers/plugins/sync/sync_persist.c
@@ -47,6 +47,9 @@ static int sync_release_connection(Slapi_PBlock *pb, Slapi_Connection *conn, Sla
  * per thread pending list of nested operation..
  * being a betxn_preop the pending list has the same order
  * that the server received the operation
+ *
+ * In case of DB_RETRY, this callback can be called several times
+ * The detection of the DB_RETRY is done via the operation extension
  */
 int
 sync_update_persist_betxn_pre_op(Slapi_PBlock *pb)
@@ -54,64 +57,128 @@ sync_update_persist_betxn_pre_op(Slapi_PBlock *pb)
     OPERATION_PL_CTX_T *prim_op;
     OPERATION_PL_CTX_T *new_op;
     Slapi_DN *sdn;
+    uint32_t idx_pl = 0;
+    op_ext_ident_t *op_ident;
+    Operation *op;
 
     if (!SYNC_IS_INITIALIZED()) {
         /* not initialized if sync plugin is not started */
         return 0;
     }
 
+    prim_op = get_thread_primary_op();
+    op_ident = sync_persist_get_operation_extension(pb);
+    slapi_pblock_get(pb, SLAPI_OPERATION, &op);
+    slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);
+
+    /* Check if we are in a DB retry case */
+    if (op_ident && prim_op) {
+        OPERATION_PL_CTX_T *current_op;
+
+        /* This callback is called (with the same operation) because of a DB_RETRY */
+
+        /* The operation extension already holds the index of the operation in the pending list */
+        for (idx_pl = 0, current_op = prim_op; current_op->next; idx_pl++, current_op = current_op->next) {
+            if (op_ident->idx_pl == idx_pl) {
+                break;
+            }
+        }
+
+        /* The retrieved operation in the pending list is at the right
+         * index and state. Just return making this callback a noop
+         */
+        PR_ASSERT(current_op);
+        PR_ASSERT(current_op->op == op);
+        PR_ASSERT(current_op->flags == OPERATION_PL_PENDING);
+        slapi_log_err(SLAPI_LOG_WARNING, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - DB retried operation targets "
+                      "\"%s\" (op=0x%lx idx_pl=%d) => op not changed in PL\n",
+                      slapi_sdn_get_dn(sdn), (ulong) op, idx_pl);
+        return 0;
+    }
+
     /* Create a new pending operation node */
     new_op = (OPERATION_PL_CTX_T *)slapi_ch_calloc(1, sizeof(OPERATION_PL_CTX_T));
     new_op->flags = OPERATION_PL_PENDING;
-    slapi_pblock_get(pb, SLAPI_OPERATION, &new_op->op);
-    slapi_pblock_get(pb, SLAPI_TARGET_SDN, &sdn);
+    new_op->op = op;
 
-    prim_op = get_thread_primary_op();
     if (prim_op) {
         /* It already exists a primary operation, so the current
          * operation is a nested one that we need to register at the end
          * of the pending nested operations
+         * Also computes the idx_pl that will be the identifier (index) of the operation
+         * in the pending list
          */
         OPERATION_PL_CTX_T *current_op;
-        for (current_op = prim_op; current_op->next; current_op = current_op->next);
+        for (idx_pl = 0, current_op = prim_op; current_op->next; idx_pl++, current_op = current_op->next);
         current_op->next = new_op;
+        idx_pl++; /* idx_pl is currently the index of the last op
+                   * as we are adding a new op we need to increase that index
+                   */
         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - nested operation targets "
-                      "\"%s\" (0x%lx)\n",
-                      slapi_sdn_get_dn(sdn), (ulong) new_op->op);
+                      "\"%s\" (op=0x%lx idx_pl=%d)\n",
+                      slapi_sdn_get_dn(sdn), (ulong) new_op->op, idx_pl);
     } else {
         /* The current operation is the first/primary one in the txn
          * registers it directly in the thread private data (head)
          */
         set_thread_primary_op(new_op);
+        idx_pl = 0; /* as primary operation, its index in the pending list is 0 */
         slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "sync_update_persist_betxn_pre_op - primary operation targets "
                       "\"%s\" (0x%lx)\n",
                       slapi_sdn_get_dn(sdn), (ulong) new_op->op);
     }
+
+    /* records, in the operation extension AND in the pending list, the identifier (index) of
+     * this operation into the pending list
+     */
+    op_ident = (op_ext_ident_t *) slapi_ch_calloc(1, sizeof (op_ext_ident_t));
+    op_ident->idx_pl = idx_pl;
+    new_op->idx_pl   = idx_pl;
+    sync_persist_set_operation_extension(pb, op_ident);
     return 0;
 }
 
-/* This operation can not be proceed by sync_repl listener because
- * of internal problem. For example, POST entry does not exist
+/* This operation failed or skipped (e.g. no MODs).
+ * In such case POST entry does not exist
  */
 static void
-ignore_op_pl(Operation *op)
+ignore_op_pl(Slapi_PBlock *pb)
 {
     OPERATION_PL_CTX_T *prim_op, *curr_op;
+    op_ext_ident_t *ident;
+    Operation *op;
+
+    slapi_pblock_get(pb, SLAPI_OPERATION, &op);
+
+    /* prim_op is set if betxn was called
+     * In case of invalid update (schema violation) the
+     * operation skip betxn and prim_op is not set.
+     * This is the same for ident
+     */
     prim_op = get_thread_primary_op();
+    ident = sync_persist_get_operation_extension(pb);
 
-    for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
-        if ((curr_op->op == op) && 
-            (curr_op->flags == OPERATION_PL_PENDING)) {  /* If by any "chance" a same operation structure was reused in consecutive updates
-                                                         * we can not only rely on 'op' value
-                                                         */
-            slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl operation (0x%lx) from the pending list\n",
-                    (ulong) op);
-            curr_op->flags = OPERATION_PL_IGNORED;
-            return;
+    if (ident) {
+        /* The BETXN_PREOP was called, so the operation is
+         * registered in the pending list
+         */
+        for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
+            if (curr_op->idx_pl == ident->idx_pl) {
+                /* The operation extension (ident) refers this operation (currop in the pending list).
+                 * This is called during sync_repl postop. At this moment
+                 * the operation in the pending list (identified by idx_pl in the operation extension)
+                 * should be pending
+                 */
+                PR_ASSERT(curr_op->flags == OPERATION_PL_PENDING);
+                slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl operation (op=0x%lx, idx_pl=%d) from the pending list\n",
+                        (ulong) op, ident->idx_pl);
+                curr_op->flags = OPERATION_PL_IGNORED;
+                return;
+            }
         }
     }
-    slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl can not retrieve an operation (0x%lx) in pending list\n",
-                    (ulong) op);
+    slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "ignore_op_pl failing operation (op=0x%lx, idx_pl=%d) was not in the pending list\n",
+                    (ulong) op, ident ? ident->idx_pl : -1);
 }
 
 /* This is a generic function that is called by betxn_post of this plugin.
@@ -126,7 +193,9 @@ sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber
 {
     OPERATION_PL_CTX_T *prim_op = NULL, *curr_op;
     Operation *pb_op;
+    op_ext_ident_t *ident;
     Slapi_DN *sdn;
+    uint32_t count; /* use for diagnostic of the lenght of the pending list */
     int32_t rc;
 
     if (!SYNC_IS_INITIALIZED()) {
@@ -138,7 +207,7 @@ sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber
 
     if (NULL == e) {
         /* Ignore this operation (for example case of failure of the operation) */
-        ignore_op_pl(pb_op);
+        ignore_op_pl(pb);
         return;
     }
     
@@ -161,16 +230,21 @@ sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber
 
 
     prim_op = get_thread_primary_op();
+    ident = sync_persist_get_operation_extension(pb);
     PR_ASSERT(prim_op);
+    PR_ASSERT(ident);
     /* First mark the operation as completed/failed
      * the param to be used once the operation will be pushed
      * on the listeners queue
      */
     for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
-        if ((curr_op->op == pb_op) &&
-            (curr_op->flags == OPERATION_PL_PENDING)) {  /* If by any "chance" a same operation structure was reused in consecutive updates
-                                                         * we can not only rely on 'op' value
-                                                         */
+        if (curr_op->idx_pl == ident->idx_pl) {
+            /* The operation extension (ident) refers this operation (currop in the pending list)
+             * This is called during sync_repl postop. At this moment
+             * the operation in the pending list (identified by idx_pl in the operation extension)
+             * should be pending
+             */
+            PR_ASSERT(curr_op->flags == OPERATION_PL_PENDING);
             if (rc == LDAP_SUCCESS) {
                 curr_op->flags = OPERATION_PL_SUCCEEDED;
                 curr_op->entry = e ? slapi_entry_dup(e) : NULL;
@@ -183,46 +257,50 @@ sync_update_persist_op(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eprev, ber
         }
     }
     if (!curr_op) {
-        slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "%s - operation not found on the pendling list\n", label);
+        slapi_log_err(SLAPI_LOG_ERR, SYNC_PLUGIN_SUBSYSTEM, "%s - operation (op=0x%lx, idx_pl=%d) not found on the pendling list\n", 
+                      label, (ulong) pb_op, ident->idx_pl);
         PR_ASSERT(curr_op);
     }
     
-#if DEBUG
-    /* dump the pending queue */
-    for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
-        char *flags_str;
-        char * entry_str;
+    /* for diagnostic of the pending list, dump its content if it is too long */
+    for (count = 0, curr_op = prim_op; curr_op; count++, curr_op = curr_op->next);
+    if (loglevel_is_set(SLAPI_LOG_PLUGIN) && (count > 10)) {
 
-        if (curr_op->entry) {
-            entry_str = slapi_entry_get_dn(curr_op->entry);
-        } else if (curr_op->eprev){
-            entry_str = slapi_entry_get_dn(curr_op->eprev);
-        } else {
-            entry_str = "unknown";
-        }
-        switch (curr_op->flags) {
-            case OPERATION_PL_SUCCEEDED:
-                flags_str = "succeeded";
-                break;
-            case OPERATION_PL_FAILED:
-                flags_str = "failed";
-                break;
-            case OPERATION_PL_IGNORED:
-                flags_str = "ignored";
-                break;
-            case OPERATION_PL_PENDING:
-                flags_str = "pending";
-                break;
-            default:
-                flags_str = "unknown";
-                break;
-                        
+        /* if pending list looks abnormally too long, dump the pending list */
+        for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
+            char *flags_str;
+            char * entry_str;
 
-        }
-        slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "dump pending list(0x%lx) %s %s\n",
+            if (curr_op->entry) {
+                entry_str = slapi_entry_get_dn(curr_op->entry);
+            } else if (curr_op->eprev) {
+                entry_str = slapi_entry_get_dn(curr_op->eprev);
+            } else {
+                entry_str = "unknown";
+            }
+            switch (curr_op->flags) {
+                case OPERATION_PL_SUCCEEDED:
+                    flags_str = "succeeded";
+                    break;
+                case OPERATION_PL_FAILED:
+                    flags_str = "failed";
+                    break;
+                case OPERATION_PL_IGNORED:
+                    flags_str = "ignored";
+                    break;
+                case OPERATION_PL_PENDING:
+                    flags_str = "pending";
+                    break;
+                default:
+                    flags_str = "unknown";
+                    break;
+
+
+            }
+            slapi_log_err(SLAPI_LOG_PLUGIN, SYNC_PLUGIN_SUBSYSTEM, "dump pending list(0x%lx) %s %s\n",
                     (ulong) curr_op->op, entry_str, flags_str);
+        }
     }
-#endif
 
     /* Second check if it remains a pending operation in the pending list */
     for (curr_op = prim_op; curr_op; curr_op = curr_op->next) {
-- 
2.26.2