From e71e44e4393a803900ac79d26a91f96ad0068e59 Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Tue, 23 Aug 2016 12:06:30 -0400
Subject: [PATCH 39/45] Ticket 48964 - cleanAllRUV changelog purging
 incorrectly processes all backends

Bug Description:  When the changelog was being purged of "cleaned" rids, it was checking
                  all the backend changelogs, and not just the one from which the
                  cleanAllRUV task originated.  This could corrupt a different
                  backend's changelog if both backends used the same RID.

Fix Description:  Purge the changelog associated with the backend that is specified in
                  the cleanAllRUV task.  Also moved the "purging" to its own function,
                  and fixed a few compiler warnings.

https://fedorahosted.org/389/ticket/48964

Reviewed by: nhosoi(Thanks!)

(cherry picked from commit fda00435a7536c1ded72bb78a975f3370d09a3be)
---
 ldap/servers/plugins/replication/cl5_api.c         | 162 +++++++++++++--------
 ldap/servers/plugins/replication/cl5_api.h         |   2 +-
 .../plugins/replication/repl5_replica_config.c     |   2 +-
 3 files changed, 106 insertions(+), 60 deletions(-)

diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
index 3adaf86..6a09aea 100644
--- a/ldap/servers/plugins/replication/cl5_api.c
+++ b/ldap/servers/plugins/replication/cl5_api.c
@@ -317,7 +317,7 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB
 static int _cl5TrimInit ();
 static void _cl5TrimCleanup ();
 static int _cl5TrimMain (void *param);
-static void _cl5DoTrimming (ReplicaId rid);
+static void _cl5DoTrimming ();
 static void _cl5CompactDBs();
 static void _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid);
 static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
@@ -3447,43 +3447,37 @@ static int _cl5TrimMain (void *param)
     return 0;
 }
 
-/* We remove an entry if it has been replayed to all consumers and
-   and the number of entries in the changelog is larger than maxEntries 
-   or age of the entry is larger than maxAge. 
-   Also we can't purge entries which correspond to max csns in the
-   supplier's ruv. Here is a example where we can get into trouble:
-   The server is setup with time based trimming and no consumer's
-   At some point all the entries are trimmed from the changelog.
-   At a later point a consumer is added and initialized online
-   Then a change is made on the supplier.
-   To update the consumer, the supplier would attempt to locate
-   the last change sent to the consumer in the changelog and will
-   fail because the change was removed.
-    
+/*
+ * We remove an entry if it has been replayed to all consumers and the number
+ * of entries in the changelog is larger than maxEntries or age of the entry
+ * is larger than maxAge.  Also we can't purge entries which correspond to max
+ * csns in the supplier's ruv. Here is a example where we can get into trouble:
+ *
+ *   The server is setup with time based trimming and no consumer's
+ *   At some point all the entries are trimmed from the changelog.
+ *   At a later point a consumer is added and initialized online.
+ *   Then a change is made on the supplier.
+ *   To update the consumer, the supplier would attempt to locate the last
+ *   change sent to the consumer in the changelog and will fail because the
+ *   change was removed.
  */
-
-static void _cl5DoTrimming (ReplicaId rid)
+static void _cl5DoTrimming ()
 {
 	Object *obj;
 	long numToTrim;
 
 	PR_Lock (s_cl5Desc.dbTrim.lock);
 
-	/* ONREPL We trim file by file which means that some files will be 
-	   trimmed more often than other. We might have to fix that by, for 
-	   example, randomizing starting point */
+	/*
+	 * We are trimming all the changelogs.  We trim file by file which
+	 * means that some files will be trimmed more often than other. We
+	 * might have to fix that by, for example, randomizing the starting
+	 * point.
+	 */
 	obj = objset_first_obj (s_cl5Desc.dbFiles);
-	while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
+	while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
 	{
-		if (rid){
-			/*
-			 * We are cleaning an invalid rid, and need to strip it
-			 * from the changelog.
-			 */
-			_cl5PurgeRID (obj, rid);
-		} else {
-			_cl5TrimFile (obj, &numToTrim);
-		}
+		_cl5TrimFile (obj, &numToTrim);
 		obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
 	}
 
@@ -3495,6 +3489,43 @@ static void _cl5DoTrimming (ReplicaId rid)
 	return;
 }
 
+/*
+ * We are purging a changelog after a cleanAllRUV task.  Find the specific
+ * changelog for the backend that is being cleaned, and purge all the records
+ * with the cleaned rid.
+ */
+static void _cl5DoPurging (Replica *replica)
+{
+	ReplicaId rid = replica_get_rid(replica);
+	const Slapi_DN *sdn = replica_get_root(replica);
+	const char *replName = replica_get_name(replica);
+	char *replGen = replica_get_generation(replica);
+	char *fileName;
+	Object *obj;
+
+	PR_Lock (s_cl5Desc.dbTrim.lock);
+	fileName = _cl5MakeFileName (replName, replGen);
+	obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
+	if (obj) {
+		/* We found our changelog, now purge it */
+		_cl5PurgeRID (obj, rid);
+		object_release (obj);
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+			"Purged rid (%d) from suffix (%s)\n",
+			rid, slapi_sdn_get_dn(sdn));
+	} else {
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
+			"Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
+			rid, fileName, slapi_sdn_get_dn(sdn));
+	}
+	PR_Unlock (s_cl5Desc.dbTrim.lock);
+
+	slapi_ch_free_string(&replGen);
+	slapi_ch_free_string(&fileName);
+
+	return;
+}
+
 /* clear free page files to reduce changelog */
 static void
 _cl5CompactDBs()
@@ -4072,23 +4103,25 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
 {
 	*numToTrim = 0;
 
-    if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0)
+	if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
 		return PR_FALSE;
-
+	}
 	if (s_cl5Desc.dbTrim.maxAge == 0)
 	{
 		*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
 		return ( *numToTrim > 0 );
 	}
 
-    if (s_cl5Desc.dbTrim.maxEntries > 0 &&
-		(*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
-    	return PR_TRUE;
+	if (s_cl5Desc.dbTrim.maxEntries > 0 &&
+	    (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
+		return PR_TRUE;
+	}
 
-	if (time)
+	if (time) {
 		return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
-    else			
-	    return PR_TRUE;
+	} else {
+		return PR_TRUE;
+	}
 }  
 
 static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
@@ -4101,7 +4134,6 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
 	char *pos;
 	char *agmt_name;
 
-
 	PR_ASSERT (replGen && obj);
 
     file = (CL5DBFile*)object_get_data (obj);
@@ -4109,13 +4141,12 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
 
 	agmt_name = get_thread_private_agmtname();
 	
-    if (purge) /* read purge vector entry */
-	    key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
-    else /* read upper bound vector */
-        key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
-
+	if (purge) { /* read purge vector entry */
+		key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
+	} else { /* read upper bound vector */
+		key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
+	}
 	key.size = CSN_STRSIZE;
-
 	data.flags = DB_DBT_MALLOC;
 
 	rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
@@ -4125,13 +4156,13 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
 							rc = _cl5ReadBervals (&vals, &pos, data.size);
                             slapi_ch_free (&(data.data));
                             if (rc != CL5_SUCCESS)
-				goto done;
+                                goto done;
                             
-                            if (purge)
+                            if (purge) {
                                 rc = ruv_init_from_bervals(vals, &file->purgeRUV);							
-                            else
+                            } else {
                                 rc = ruv_init_from_bervals(vals, &file->maxRUV);	    
-
+                            }
                             if (rc != RUV_SUCCESS)
                             {
                                 slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
@@ -4139,7 +4170,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
                                     "RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc);
 						
                                 rc = CL5_RUV_ERROR;
-				goto done;
+                                goto done;
                             }
 
                             /* delete the entry; it is re-added when file
@@ -4151,7 +4182,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
 
 		case DB_NOTFOUND:	/* RUV is lost - need to construct */
                             rc = _cl5ConstructRUV (replGen, obj, purge);
-							goto done;
+                            goto done;
 		
 		default:			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
 								"%s: _cl5ReadRUV: failed to get purge RUV; "
@@ -6946,12 +6977,14 @@ cl5CleanRUV(ReplicaId rid){
     slapi_rwlock_unlock (s_cl5Desc.stLock);
 }
 
-void trigger_cl_purging(ReplicaId rid){
+/*
+ * Create a thread to purge a changelog of cleaned RIDs
+ */
+void trigger_cl_purging(Replica *replica){
     PRThread *trim_tid = NULL;
 
-    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
     trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
-                   (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
+                   (void *)replica, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
                    PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
     if (NULL == trim_tid){
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
@@ -6963,19 +6996,32 @@ void trigger_cl_purging(ReplicaId rid){
     }
 }
 
+/*
+ * Purge a changelog of entries that originated from a particular replica(rid)
+ */
 void
 trigger_cl_purging_thread(void *arg){
-    ReplicaId rid = *(ReplicaId *)arg;
+    Replica *replica = (Replica *)arg;
 
-    /* make sure we have a change log, and we aren't closing it */
-    if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
+    /* Make sure we have a change log, and we aren't closing it */
+    if (replica == NULL ||
+        s_cl5Desc.dbState == CL5_STATE_CLOSED ||
+        s_cl5Desc.dbState == CL5_STATE_CLOSING) {
         return;
     }
+
+    /* Bump the changelog thread count */
     if (CL5_SUCCESS != _cl5AddThread()) {
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
-            "trigger_cl_purging: failed to increment thread count "
+            "trigger_cl_purging: Abort - failed to increment thread count "
             "NSPR error - %d\n", PR_GetError ());
+        return;
     }
-    _cl5DoTrimming(rid);
+
+    /* Purge the changelog */
+    _cl5DoPurging(replica);
     _cl5RemoveThread();
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
+        "trigger_cl_purging: purged changelog for (%s) rid (%d)\n",
+        slapi_sdn_get_dn(replica_get_root(replica)), replica_get_rid(replica));
 }
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
index 4c3b8e8..1a1c2f5 100644
--- a/ldap/servers/plugins/replication/cl5_api.h
+++ b/ldap/servers/plugins/replication/cl5_api.h
@@ -467,6 +467,6 @@ int cl5WriteRUV();
 int cl5DeleteRUV();
 void cl5CleanRUV(ReplicaId rid);
 void cl5NotifyCleanup(int rid);
-void trigger_cl_purging(ReplicaId rid);
+void trigger_cl_purging(Replica *replica);
 
 #endif
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
index 59d3374..011e4ca 100644
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
@@ -1467,7 +1467,7 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
 	/*
 	 * Now purge the changelog
 	 */
-	trigger_cl_purging(rid);
+	trigger_cl_purging(replica);
 
 	if (rc != RUV_SUCCESS){
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc);
-- 
2.4.11