Blame SOURCES/0039-Ticket-48964-cleanAllRUV-changelog-purging-incorrect.patch

7c7f29
From e71e44e4393a803900ac79d26a91f96ad0068e59 Mon Sep 17 00:00:00 2001
7c7f29
From: Mark Reynolds <mreynolds@redhat.com>
7c7f29
Date: Tue, 23 Aug 2016 12:06:30 -0400
7c7f29
Subject: [PATCH 39/45] Ticket 48964 - cleanAllRUV changelog purging
7c7f29
 incorrectly processes all backends
7c7f29
7c7f29
Bug Description:  When the changelog was being purged of "cleaned" rids it was checking
7c7f29
                  all the backend changelogs, and not the one from which the
7c7f29
                  cleanAllRUV task originated.  This could corrupt a different
7c7f29
                  backend's changelog if both backends used the same RID.
7c7f29
7c7f29
Fix Description:  Purge the changelog associated with the backend that is specified in
7c7f29
                  the cleanAllRUV task.  Also moved the "purging" to its own function,
7c7f29
                  and fixed a few compiler warnings.
7c7f29
7c7f29
https://fedorahosted.org/389/ticket/48964
7c7f29
7c7f29
Reviewed by: nhosoi(Thanks!)
7c7f29
7c7f29
(cherry picked from commit fda00435a7536c1ded72bb78a975f3370d09a3be)
7c7f29
---
7c7f29
 ldap/servers/plugins/replication/cl5_api.c         | 162 +++++++++++++--------
7c7f29
 ldap/servers/plugins/replication/cl5_api.h         |   2 +-
7c7f29
 .../plugins/replication/repl5_replica_config.c     |   2 +-
7c7f29
 3 files changed, 106 insertions(+), 60 deletions(-)
7c7f29
7c7f29
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
7c7f29
index 3adaf86..6a09aea 100644
7c7f29
--- a/ldap/servers/plugins/replication/cl5_api.c
7c7f29
+++ b/ldap/servers/plugins/replication/cl5_api.c
7c7f29
@@ -317,7 +317,7 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB
7c7f29
 static int _cl5TrimInit ();
7c7f29
 static void _cl5TrimCleanup ();
7c7f29
 static int _cl5TrimMain (void *param);
7c7f29
-static void _cl5DoTrimming (ReplicaId rid);
7c7f29
+static void _cl5DoTrimming ();
7c7f29
 static void _cl5CompactDBs();
7c7f29
 static void _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid);
7c7f29
 static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
7c7f29
@@ -3447,43 +3447,37 @@ static int _cl5TrimMain (void *param)
7c7f29
     return 0;
7c7f29
 }
7c7f29
 
7c7f29
-/* We remove an entry if it has been replayed to all consumers and
7c7f29
-   and the number of entries in the changelog is larger than maxEntries 
7c7f29
-   or age of the entry is larger than maxAge. 
7c7f29
-   Also we can't purge entries which correspond to max csns in the
7c7f29
-   supplier's ruv. Here is a example where we can get into trouble:
7c7f29
-   The server is setup with time based trimming and no consumer's
7c7f29
-   At some point all the entries are trimmed from the changelog.
7c7f29
-   At a later point a consumer is added and initialized online
7c7f29
-   Then a change is made on the supplier.
7c7f29
-   To update the consumer, the supplier would attempt to locate
7c7f29
-   the last change sent to the consumer in the changelog and will
7c7f29
-   fail because the change was removed.
7c7f29
-    
7c7f29
+/*
7c7f29
+ * We remove an entry if it has been replayed to all consumers and the number
7c7f29
+ * of entries in the changelog is larger than maxEntries or age of the entry
7c7f29
+ * is larger than maxAge.  Also we can't purge entries which correspond to max
7c7f29
+ * csns in the supplier's ruv. Here is an example where we can get into trouble:
7c7f29
+ *
7c7f29
+ *   The server is set up with time-based trimming and no consumers.
7c7f29
+ *   At some point all the entries are trimmed from the changelog.
7c7f29
+ *   At a later point a consumer is added and initialized online.
7c7f29
+ *   Then a change is made on the supplier.
7c7f29
+ *   To update the consumer, the supplier would attempt to locate the last
7c7f29
+ *   change sent to the consumer in the changelog and will fail because the
7c7f29
+ *   change was removed.
7c7f29
  */
7c7f29
-
7c7f29
-static void _cl5DoTrimming (ReplicaId rid)
7c7f29
+static void _cl5DoTrimming ()
7c7f29
 {
7c7f29
 	Object *obj;
7c7f29
 	long numToTrim;
7c7f29
 
7c7f29
 	PR_Lock (s_cl5Desc.dbTrim.lock);
7c7f29
 
7c7f29
-	/* ONREPL We trim file by file which means that some files will be 
7c7f29
-	   trimmed more often than other. We might have to fix that by, for 
7c7f29
-	   example, randomizing starting point */
7c7f29
+	/*
7c7f29
+	 * We are trimming all the changelogs.  We trim file by file which
7c7f29
+	 * means that some files will be trimmed more often than others. We
7c7f29
+	 * might have to fix that by, for example, randomizing the starting
7c7f29
+	 * point.
7c7f29
+	 */
7c7f29
 	obj = objset_first_obj (s_cl5Desc.dbFiles);
7c7f29
-	while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
7c7f29
+	while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
7c7f29
 	{
7c7f29
-		if (rid){
7c7f29
-			/*
7c7f29
-			 * We are cleaning an invalid rid, and need to strip it
7c7f29
-			 * from the changelog.
7c7f29
-			 */
7c7f29
-			_cl5PurgeRID (obj, rid);
7c7f29
-		} else {
7c7f29
-			_cl5TrimFile (obj, &numToTrim);
7c7f29
-		}
7c7f29
+		_cl5TrimFile (obj, &numToTrim);
7c7f29
 		obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
7c7f29
 	}
7c7f29
 
7c7f29
@@ -3495,6 +3489,43 @@ static void _cl5DoTrimming (ReplicaId rid)
7c7f29
 	return;
7c7f29
 }
7c7f29
 
7c7f29
+/*
7c7f29
+ * We are purging a changelog after a cleanAllRUV task.  Find the specific
7c7f29
+ * changelog for the backend that is being cleaned, and purge all the records
7c7f29
+ * with the cleaned rid.
7c7f29
+ */
7c7f29
+static void _cl5DoPurging (Replica *replica)
7c7f29
+{
7c7f29
+	ReplicaId rid = replica_get_rid(replica);
7c7f29
+	const Slapi_DN *sdn = replica_get_root(replica);
7c7f29
+	const char *replName = replica_get_name(replica);
7c7f29
+	char *replGen = replica_get_generation(replica);
7c7f29
+	char *fileName;
7c7f29
+	Object *obj;
7c7f29
+
7c7f29
+	PR_Lock (s_cl5Desc.dbTrim.lock);
7c7f29
+	fileName = _cl5MakeFileName (replName, replGen);
7c7f29
+	obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
7c7f29
+	if (obj) {
7c7f29
+		/* We found our changelog, now purge it */
7c7f29
+		_cl5PurgeRID (obj, rid);
7c7f29
+		object_release (obj);
7c7f29
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
7c7f29
+			"Purged rid (%d) from suffix (%s)\n",
7c7f29
+			rid, slapi_sdn_get_dn(sdn));
7c7f29
+	} else {
7c7f29
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
7c7f29
+			"Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
7c7f29
+			rid, fileName, slapi_sdn_get_dn(sdn));
7c7f29
+	}
7c7f29
+	PR_Unlock (s_cl5Desc.dbTrim.lock);
7c7f29
+
7c7f29
+	slapi_ch_free_string(&replGen);
7c7f29
+	slapi_ch_free_string(&fileName);
7c7f29
+
7c7f29
+	return;
7c7f29
+}
7c7f29
+
7c7f29
 /* clear free page files to reduce changelog */
7c7f29
 static void
7c7f29
 _cl5CompactDBs()
7c7f29
@@ -4072,23 +4103,25 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
7c7f29
 {
7c7f29
 	*numToTrim = 0;
7c7f29
 
7c7f29
-    if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0)
7c7f29
+	if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
7c7f29
 		return PR_FALSE;
7c7f29
-
7c7f29
+	}
7c7f29
 	if (s_cl5Desc.dbTrim.maxAge == 0)
7c7f29
 	{
7c7f29
 		*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
7c7f29
 		return ( *numToTrim > 0 );
7c7f29
 	}
7c7f29
 
7c7f29
-    if (s_cl5Desc.dbTrim.maxEntries > 0 &&
7c7f29
-		(*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
7c7f29
-    	return PR_TRUE;
7c7f29
+	if (s_cl5Desc.dbTrim.maxEntries > 0 &&
7c7f29
+	    (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
7c7f29
+		return PR_TRUE;
7c7f29
+	}
7c7f29
 
7c7f29
-	if (time)
7c7f29
+	if (time) {
7c7f29
 		return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
7c7f29
-    else			
7c7f29
-	    return PR_TRUE;
7c7f29
+	} else {
7c7f29
+		return PR_TRUE;
7c7f29
+	}
7c7f29
 }  
7c7f29
 
7c7f29
 static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
7c7f29
@@ -4101,7 +4134,6 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
7c7f29
 	char *pos;
7c7f29
 	char *agmt_name;
7c7f29
 
7c7f29
-
7c7f29
 	PR_ASSERT (replGen && obj);
7c7f29
 
7c7f29
     file = (CL5DBFile*)object_get_data (obj);
7c7f29
@@ -4109,13 +4141,12 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
7c7f29
 
7c7f29
 	agmt_name = get_thread_private_agmtname();
7c7f29
 	
7c7f29
-    if (purge) /* read purge vector entry */
7c7f29
-	    key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
7c7f29
-    else /* read upper bound vector */
7c7f29
-        key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
7c7f29
-
7c7f29
+	if (purge) { /* read purge vector entry */
7c7f29
+		key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
7c7f29
+	} else { /* read upper bound vector */
7c7f29
+		key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
7c7f29
+	}
7c7f29
 	key.size = CSN_STRSIZE;
7c7f29
-
7c7f29
 	data.flags = DB_DBT_MALLOC;
7c7f29
 
7c7f29
 	rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
7c7f29
@@ -4125,13 +4156,13 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
7c7f29
 							rc = _cl5ReadBervals (&vals, &pos, data.size);
7c7f29
                             slapi_ch_free (&(data.data));
7c7f29
                             if (rc != CL5_SUCCESS)
7c7f29
-				goto done;
7c7f29
+                                goto done;
7c7f29
                             
7c7f29
-                            if (purge)
7c7f29
+                            if (purge) {
7c7f29
                                 rc = ruv_init_from_bervals(vals, &file->purgeRUV);							
7c7f29
-                            else
7c7f29
+                            } else {
7c7f29
                                 rc = ruv_init_from_bervals(vals, &file->maxRUV);	    
7c7f29
-
7c7f29
+                            }
7c7f29
                             if (rc != RUV_SUCCESS)
7c7f29
                             {
7c7f29
                                 slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
7c7f29
@@ -4139,7 +4170,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
7c7f29
                                     "RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc);
7c7f29
 						
7c7f29
                                 rc = CL5_RUV_ERROR;
7c7f29
-				goto done;
7c7f29
+                                goto done;
7c7f29
                             }
7c7f29
 
7c7f29
                             /* delete the entry; it is re-added when file
7c7f29
@@ -4151,7 +4182,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
7c7f29
 
7c7f29
 		case DB_NOTFOUND:	/* RUV is lost - need to construct */
7c7f29
                             rc = _cl5ConstructRUV (replGen, obj, purge);
7c7f29
-							goto done;
7c7f29
+                            goto done;
7c7f29
 		
7c7f29
 		default:			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
7c7f29
 								"%s: _cl5ReadRUV: failed to get purge RUV; "
7c7f29
@@ -6946,12 +6977,14 @@ cl5CleanRUV(ReplicaId rid){
7c7f29
     slapi_rwlock_unlock (s_cl5Desc.stLock);
7c7f29
 }
7c7f29
 
7c7f29
-void trigger_cl_purging(ReplicaId rid){
7c7f29
+/*
7c7f29
+ * Create a thread to purge a changelog of cleaned RIDs
7c7f29
+ */
7c7f29
+void trigger_cl_purging(Replica *replica){
7c7f29
     PRThread *trim_tid = NULL;
7c7f29
 
7c7f29
-    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
7c7f29
     trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
7c7f29
-                   (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
7c7f29
+                   (void *)replica, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
7c7f29
                    PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
7c7f29
     if (NULL == trim_tid){
7c7f29
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
7c7f29
@@ -6963,19 +6996,32 @@ void trigger_cl_purging(ReplicaId rid){
7c7f29
     }
7c7f29
 }
7c7f29
 
7c7f29
+/*
7c7f29
+ * Purge a changelog of entries that originated from a particular replica(rid)
7c7f29
+ */
7c7f29
 void
7c7f29
 trigger_cl_purging_thread(void *arg){
7c7f29
-    ReplicaId rid = *(ReplicaId *)arg;
7c7f29
+    Replica *replica = (Replica *)arg;
7c7f29
 
7c7f29
-    /* make sure we have a change log, and we aren't closing it */
7c7f29
-    if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
7c7f29
+    /* Make sure we have a change log, and we aren't closing it */
7c7f29
+    if (replica == NULL ||
7c7f29
+        s_cl5Desc.dbState == CL5_STATE_CLOSED ||
7c7f29
+        s_cl5Desc.dbState == CL5_STATE_CLOSING) {
7c7f29
         return;
7c7f29
     }
7c7f29
+
7c7f29
+    /* Bump the changelog thread count */
7c7f29
     if (CL5_SUCCESS != _cl5AddThread()) {
7c7f29
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
7c7f29
-            "trigger_cl_purging: failed to increment thread count "
7c7f29
+            "trigger_cl_purging: Abort - failed to increment thread count "
7c7f29
             "NSPR error - %d\n", PR_GetError ());
7c7f29
+        return;
7c7f29
     }
7c7f29
-    _cl5DoTrimming(rid);
7c7f29
+
7c7f29
+    /* Purge the changelog */
7c7f29
+    _cl5DoPurging(replica);
7c7f29
     _cl5RemoveThread();
7c7f29
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
7c7f29
+        "trigger_cl_purging: purged changelog for (%s) rid (%d)\n",
7c7f29
+        slapi_sdn_get_dn(replica_get_root(replica)), replica_get_rid(replica));
7c7f29
 }
7c7f29
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
7c7f29
index 4c3b8e8..1a1c2f5 100644
7c7f29
--- a/ldap/servers/plugins/replication/cl5_api.h
7c7f29
+++ b/ldap/servers/plugins/replication/cl5_api.h
7c7f29
@@ -467,6 +467,6 @@ int cl5WriteRUV();
7c7f29
 int cl5DeleteRUV();
7c7f29
 void cl5CleanRUV(ReplicaId rid);
7c7f29
 void cl5NotifyCleanup(int rid);
7c7f29
-void trigger_cl_purging(ReplicaId rid);
7c7f29
+void trigger_cl_purging(Replica *replica);
7c7f29
 
7c7f29
 #endif
7c7f29
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
7c7f29
index 59d3374..011e4ca 100644
7c7f29
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
7c7f29
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
7c7f29
@@ -1467,7 +1467,7 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
7c7f29
 	/*
7c7f29
 	 * Now purge the changelog
7c7f29
 	 */
7c7f29
-	trigger_cl_purging(rid);
7c7f29
+	trigger_cl_purging(replica);
7c7f29
 
7c7f29
 	if (rc != RUV_SUCCESS){
7c7f29
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc);
7c7f29
-- 
7c7f29
2.4.11
7c7f29