andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 months ago
Clone
dc8c34
From 9ba2b33ff1741cb09b40fd2f36b8656704b88a18 Mon Sep 17 00:00:00 2001
dc8c34
From: Mark Reynolds <mreynolds@redhat.com>
dc8c34
Date: Tue, 23 Aug 2016 12:06:30 -0400
dc8c34
Subject: [PATCH 401/404] Ticket 48964 - cleanAllRUV changelog purging
dc8c34
 incorrectly  processes all backends
dc8c34
dc8c34
Bug Description:  When the changelog was being purged of "cleaned" rids it was checking
dc8c34
                  all the backend changelogs, and not the one from which the
dc8c34
                  cleanAllRUV task originated from.  This could corrupt a different
dc8c34
                  backend's changelog if both backends used the same RID.
dc8c34
dc8c34
Fix Description:  Purge the changelog associated with the backend that is specified in
dc8c34
                  the cleanAllRUV task.  Also moved the "purging" to its own function,
dc8c34
                  and fixed a few compiler warnings.
dc8c34
dc8c34
https://fedorahosted.org/389/ticket/48965
dc8c34
dc8c34
Reviewed by: nhosoi(Thanks!)
dc8c34
dc8c34
(cherry picked from commit fda00435a7536c1ded72bb78a975f3370d09a3be)
dc8c34
(cherry picked from commit 0d5afb2bbd6d639ec59e6bdf075bd61d3d72ef79)
dc8c34
---
dc8c34
 ldap/servers/plugins/replication/cl5_api.c         | 163 +++++++++++++--------
dc8c34
 ldap/servers/plugins/replication/cl5_api.h         |   2 +-
dc8c34
 .../plugins/replication/repl5_replica_config.c     |   2 +-
dc8c34
 3 files changed, 106 insertions(+), 61 deletions(-)
dc8c34
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
dc8c34
index 259c054..d91574f 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_api.c
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_api.c
dc8c34
@@ -344,10 +344,9 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB
dc8c34
 static int _cl5TrimInit ();
dc8c34
 static void _cl5TrimCleanup ();
dc8c34
 static int _cl5TrimMain (void *param);
dc8c34
-static void _cl5DoTrimming (ReplicaId rid);
dc8c34
+static void _cl5DoTrimming ();
dc8c34
 static PRBool _cl5CanTrim (time_t time, long *numToTrim);
dc8c34
 static void _cl5TrimFile (Object *obj, long *numToTrim);
dc8c34
-
dc8c34
 static void _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid);
dc8c34
 static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
dc8c34
 static int _cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key);
dc8c34
@@ -3445,43 +3444,37 @@ static int _cl5TrimMain (void *param)
dc8c34
     return 0;
dc8c34
 }
dc8c34
 
dc8c34
-/* We remove an entry if it has been replayed to all consumers and
dc8c34
-   and the number of entries in the changelog is larger than maxEntries 
dc8c34
-   or age of the entry is larger than maxAge. 
dc8c34
-   Also we can't purge entries which correspond to max csns in the
dc8c34
-   supplier's ruv. Here is a example where we can get into trouble:
dc8c34
-   The server is setup with time based trimming and no consumer's
dc8c34
-   At some point all the entries are trimmed from the changelog.
dc8c34
-   At a later point a consumer is added and initialized online
dc8c34
-   Then a change is made on the supplier.
dc8c34
-   To update the consumer, the supplier would attempt to locate
dc8c34
-   the last change sent to the consumer in the changelog and will
dc8c34
-   fail because the change was removed.
dc8c34
-    
dc8c34
+/*
dc8c34
+ * We remove an entry if it has been replayed to all consumers and the number
dc8c34
+ * of entries in the changelog is larger than maxEntries or age of the entry
dc8c34
+ * is larger than maxAge.  Also we can't purge entries which correspond to max
dc8c34
+ * csns in the supplier's ruv. Here is a example where we can get into trouble:
dc8c34
+ *
dc8c34
+ *   The server is setup with time based trimming and no consumer's
dc8c34
+ *   At some point all the entries are trimmed from the changelog.
dc8c34
+ *   At a later point a consumer is added and initialized online.
dc8c34
+ *   Then a change is made on the supplier.
dc8c34
+ *   To update the consumer, the supplier would attempt to locate the last
dc8c34
+ *   change sent to the consumer in the changelog and will fail because the
dc8c34
+ *   change was removed.
dc8c34
  */
dc8c34
-
dc8c34
-static void _cl5DoTrimming (ReplicaId rid)
dc8c34
+static void _cl5DoTrimming ()
dc8c34
 {
dc8c34
 	Object *obj;
dc8c34
 	long numToTrim;
dc8c34
 
dc8c34
 	PR_Lock (s_cl5Desc.dbTrim.lock);
dc8c34
 
dc8c34
-	/* ONREPL We trim file by file which means that some files will be 
dc8c34
-	   trimmed more often than other. We might have to fix that by, for 
dc8c34
-	   example, randomizing starting point */
dc8c34
+	/*
dc8c34
+	 * We are trimming all the changelogs.  We trim file by file which
dc8c34
+	 * means that some files will be trimmed more often than other. We
dc8c34
+	 * might have to fix that by, for example, randomizing the starting
dc8c34
+	 * point.
dc8c34
+	 */
dc8c34
 	obj = objset_first_obj (s_cl5Desc.dbFiles);
dc8c34
-	while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
dc8c34
+	while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
dc8c34
 	{
dc8c34
-		if (rid){
dc8c34
-			/*
dc8c34
-			 * We are cleaning an invalid rid, and need to strip it
dc8c34
-			 * from the changelog.
dc8c34
-			 */
dc8c34
-			_cl5PurgeRID (obj, rid);
dc8c34
-		} else {
dc8c34
-			_cl5TrimFile (obj, &numToTrim);
dc8c34
-		}
dc8c34
+		_cl5TrimFile (obj, &numToTrim);
dc8c34
 		obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
dc8c34
 	}
dc8c34
 
dc8c34
@@ -3494,6 +3487,43 @@ static void _cl5DoTrimming (ReplicaId rid)
dc8c34
 }
dc8c34
 
dc8c34
 /*
dc8c34
+ * We are purging a changelog after a cleanAllRUV task.  Find the specific
dc8c34
+ * changelog for the backend that is being cleaned, and purge all the records
dc8c34
+ * with the cleaned rid.
dc8c34
+ */
dc8c34
+static void _cl5DoPurging (Replica *replica)
dc8c34
+{
dc8c34
+	ReplicaId rid = replica_get_rid(replica);
dc8c34
+	const Slapi_DN *sdn = replica_get_root(replica);
dc8c34
+	const char *replName = replica_get_name(replica);
dc8c34
+	char *replGen = replica_get_generation(replica);
dc8c34
+	char *fileName;
dc8c34
+	Object *obj;
dc8c34
+
dc8c34
+	PR_Lock (s_cl5Desc.dbTrim.lock);
dc8c34
+	fileName = _cl5MakeFileName (replName, replGen);
dc8c34
+	obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName);
dc8c34
+	if (obj) {
dc8c34
+		/* We found our changelog, now purge it */
dc8c34
+		_cl5PurgeRID (obj, rid);
dc8c34
+		object_release (obj);
dc8c34
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
dc8c34
+			"Purged rid (%d) from suffix (%s)\n",
dc8c34
+			rid, slapi_sdn_get_dn(sdn));
dc8c34
+	} else {
dc8c34
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
+			"Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n",
dc8c34
+			rid, fileName, slapi_sdn_get_dn(sdn));
dc8c34
+	}
dc8c34
+	PR_Unlock (s_cl5Desc.dbTrim.lock);
dc8c34
+
dc8c34
+	slapi_ch_free_string(&replGen);
dc8c34
+	slapi_ch_free_string(&fileName);
dc8c34
+
dc8c34
+	return;
dc8c34
+}
dc8c34
+
dc8c34
+/*
dc8c34
  * If the rid is not set it is the very first iteration of the changelog.
dc8c34
  * If the rid is set, we are doing another pass, and we have a key as our
dc8c34
  * starting point.
dc8c34
@@ -4005,23 +4035,25 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim)
dc8c34
 {
dc8c34
 	*numToTrim = 0;
dc8c34
 
dc8c34
-    if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0)
dc8c34
+	if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) {
dc8c34
 		return PR_FALSE;
dc8c34
-
dc8c34
+	}
dc8c34
 	if (s_cl5Desc.dbTrim.maxAge == 0)
dc8c34
 	{
dc8c34
 		*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries;
dc8c34
 		return ( *numToTrim > 0 );
dc8c34
 	}
dc8c34
 
dc8c34
-    if (s_cl5Desc.dbTrim.maxEntries > 0 &&
dc8c34
-		(*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0)
dc8c34
-    	return PR_TRUE;
dc8c34
+	if (s_cl5Desc.dbTrim.maxEntries > 0 &&
dc8c34
+	    (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) {
dc8c34
+		return PR_TRUE;
dc8c34
+	}
dc8c34
 
dc8c34
-	if (time)
dc8c34
+	if (time) {
dc8c34
 		return (current_time () - time > s_cl5Desc.dbTrim.maxAge);
dc8c34
-    else			
dc8c34
-	    return PR_TRUE;
dc8c34
+	} else {
dc8c34
+		return PR_TRUE;
dc8c34
+	}
dc8c34
 }  
dc8c34
 
dc8c34
 static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
dc8c34
@@ -4034,7 +4066,6 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
dc8c34
 	char *pos;
dc8c34
 	char *agmt_name;
dc8c34
 
dc8c34
-
dc8c34
 	PR_ASSERT (replGen && obj);
dc8c34
 
dc8c34
     file = (CL5DBFile*)object_get_data (obj);
dc8c34
@@ -4042,13 +4073,12 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
dc8c34
 
dc8c34
 	agmt_name = get_thread_private_agmtname();
dc8c34
 	
dc8c34
-    if (purge) /* read purge vector entry */
dc8c34
-	    key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
dc8c34
-    else /* read upper bound vector */
dc8c34
-        key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
dc8c34
-
dc8c34
+	if (purge) { /* read purge vector entry */
dc8c34
+		key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr);
dc8c34
+	} else { /* read upper bound vector */
dc8c34
+		key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr);
dc8c34
+	}
dc8c34
 	key.size = CSN_STRSIZE;
dc8c34
-
dc8c34
 	data.flags = DB_DBT_MALLOC;
dc8c34
 
dc8c34
 	rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0);
dc8c34
@@ -4058,13 +4088,13 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
dc8c34
 							rc = _cl5ReadBervals (&vals, &pos, data.size);
dc8c34
                             slapi_ch_free (&(data.data));
dc8c34
                             if (rc != CL5_SUCCESS)
dc8c34
-				goto done;
dc8c34
+                                goto done;
dc8c34
                             
dc8c34
-                            if (purge)
dc8c34
+                            if (purge) {
dc8c34
                                 rc = ruv_init_from_bervals(vals, &file->purgeRUV);							
dc8c34
-                            else
dc8c34
+                            } else {
dc8c34
                                 rc = ruv_init_from_bervals(vals, &file->maxRUV);	    
dc8c34
-
dc8c34
+                            }
dc8c34
                             if (rc != RUV_SUCCESS)
dc8c34
                             {
dc8c34
                                 slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
dc8c34
@@ -4072,7 +4102,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
dc8c34
                                     "RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc);
dc8c34
 						
dc8c34
                                 rc = CL5_RUV_ERROR;
dc8c34
-				goto done;
dc8c34
+                                goto done;
dc8c34
                             }
dc8c34
 
dc8c34
                             /* delete the entry; it is re-added when file
dc8c34
@@ -4084,7 +4114,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge)
dc8c34
 
dc8c34
 		case DB_NOTFOUND:	/* RUV is lost - need to construct */
dc8c34
                             rc = _cl5ConstructRUV (replGen, obj, purge);
dc8c34
-							goto done;
dc8c34
+                            goto done;
dc8c34
 		
dc8c34
 		default:			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
dc8c34
 								"%s: _cl5ReadRUV: failed to get purge RUV; "
dc8c34
@@ -6858,12 +6888,14 @@ cl5CleanRUV(ReplicaId rid){
dc8c34
     slapi_rwlock_unlock (s_cl5Desc.stLock);
dc8c34
 }
dc8c34
 
dc8c34
-void trigger_cl_purging(ReplicaId rid){
dc8c34
+/*
dc8c34
+ * Create a thread to purge a changelog of cleaned RIDs
dc8c34
+ */
dc8c34
+void trigger_cl_purging(Replica *replica){
dc8c34
     PRThread *trim_tid = NULL;
dc8c34
 
dc8c34
-    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
dc8c34
     trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
dc8c34
-                   (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
dc8c34
+                   (void *)replica, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
dc8c34
                    PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
dc8c34
     if (NULL == trim_tid){
dc8c34
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
@@ -6875,19 +6907,32 @@ void trigger_cl_purging(ReplicaId rid){
dc8c34
     }
dc8c34
 }
dc8c34
 
dc8c34
+/*
dc8c34
+ * Purge a changelog of entries that originated from a particular replica(rid)
dc8c34
+ */
dc8c34
 void
dc8c34
 trigger_cl_purging_thread(void *arg){
dc8c34
-    ReplicaId rid = *(ReplicaId *)arg;
dc8c34
+    Replica *replica = (Replica *)arg;
dc8c34
 
dc8c34
-    /* make sure we have a change log, and we aren't closing it */
dc8c34
-    if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){
dc8c34
+    /* Make sure we have a change log, and we aren't closing it */
dc8c34
+    if (replica == NULL ||
dc8c34
+        s_cl5Desc.dbState == CL5_STATE_CLOSED ||
dc8c34
+        s_cl5Desc.dbState == CL5_STATE_CLOSING) {
dc8c34
         return;
dc8c34
     }
dc8c34
+
dc8c34
+    /* Bump the changelog thread count */
dc8c34
     if (CL5_SUCCESS != _cl5AddThread()) {
dc8c34
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
dc8c34
-            "trigger_cl_purging: failed to increment thread count "
dc8c34
+            "trigger_cl_purging: Abort - failed to increment thread count "
dc8c34
             "NSPR error - %d\n", PR_GetError ());
dc8c34
+        return;
dc8c34
     }
dc8c34
-    _cl5DoTrimming(rid);
dc8c34
+
dc8c34
+    /* Purge the changelog */
dc8c34
+    _cl5DoPurging(replica);
dc8c34
     _cl5RemoveThread();
dc8c34
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
dc8c34
+        "trigger_cl_purging: purged changelog for (%s) rid (%d)\n",
dc8c34
+        slapi_sdn_get_dn(replica_get_root(replica)), replica_get_rid(replica));
dc8c34
 }
dc8c34
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
dc8c34
index b46a691..83683f5 100644
dc8c34
--- a/ldap/servers/plugins/replication/cl5_api.h
dc8c34
+++ b/ldap/servers/plugins/replication/cl5_api.h
dc8c34
@@ -493,6 +493,6 @@ int cl5WriteRUV();
dc8c34
 int cl5DeleteRUV();
dc8c34
 void cl5CleanRUV(ReplicaId rid);
dc8c34
 void cl5NotifyCleanup(int rid);
dc8c34
-void trigger_cl_purging(ReplicaId rid);
dc8c34
+void trigger_cl_purging(Replica *replica);
dc8c34
 
dc8c34
 #endif
dc8c34
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
dc8c34
index a1dceae..010e517 100644
dc8c34
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
dc8c34
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
dc8c34
@@ -1244,7 +1244,7 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
dc8c34
 	/*
dc8c34
 	 * Now purge the changelog
dc8c34
 	 */
dc8c34
-	trigger_cl_purging(rid);
dc8c34
+	trigger_cl_purging(replica);
dc8c34
 
dc8c34
 	if (rc != RUV_SUCCESS){
dc8c34
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc);
dc8c34
-- 
dc8c34
2.4.11
dc8c34