amoralej / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 years ago
Clone

Blame SOURCES/0010-Ticket-48208-CleanAllRUV-should-completely-purge-cha.patch

a2f18f
From 46cd28db8402517febf0c5db4f2f869c491c41c0 Mon Sep 17 00:00:00 2001
b161c9
From: Mark Reynolds <mreynolds@redhat.com>
b161c9
Date: Wed, 8 Jul 2015 11:48:27 -0400
a2f18f
Subject: [PATCH 10/20] Ticket 48208 - CleanAllRUV should completely purge
b161c9
 changelog
b161c9
b161c9
Bug Description:  After cleanAllRUV finishes, the changelog still
b161c9
                  contains entries from the cleaned rid.  Under certain
b161c9
                  conditions this can allow the RUV to get polluted
b161c9
                  again, and the ruv element will be missing the replica
b161c9
                  url.
b161c9
b161c9
Fix Description:  At the end of the cleaning task, fire of a thread to
b161c9
                  to completely purge the changelog of all entries
b161c9
                  containing the cleaned rid.
b161c9
b161c9
                  Also, improved the cleanAllRUV task when dealing
b161c9
                  with a server shutdown - previously if the timing is
b161c9
                  right the task can "delay/hang" the shutdown process.
b161c9
b161c9
https://fedorahosted.org/389/ticket/48208
b161c9
b161c9
Reviewed by: nhosoi(Thanks!)
b161c9
b161c9
(cherry picked from commit ff1c34538b0600259dba4801da2b2f0993fa5404)
b161c9
(cherry picked from commit 9e4cf12cfbfde0761325b75c3fd5a8b39223760a)
b161c9
---
b161c9
 ldap/servers/plugins/replication/cl5_api.c         | 447 ++++++++++++++++++---
b161c9
 ldap/servers/plugins/replication/cl5_api.h         |   5 +-
b161c9
 .../plugins/replication/repl5_replica_config.c     |  44 +-
b161c9
 3 files changed, 430 insertions(+), 66 deletions(-)
b161c9
b161c9
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
a2f18f
index a10c3ac..ae23353 100644
b161c9
--- a/ldap/servers/plugins/replication/cl5_api.c
b161c9
+++ b/ldap/servers/plugins/replication/cl5_api.c
a2f18f
@@ -319,14 +319,17 @@ static void _cl5TrimCleanup ();
b161c9
 static int _cl5TrimMain (void *param);
b161c9
 static void _cl5DoTrimming (ReplicaId rid);
b161c9
 static void _cl5CompactDBs();
b161c9
-static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid);
b161c9
+static void _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid);
b161c9
+static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key);
b161c9
+static int _cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key);
b161c9
+static void _cl5TrimFile (Object *obj, long *numToTrim);
b161c9
 static PRBool _cl5CanTrim (time_t time, long *numToTrim);
b161c9
 static int  _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge);
b161c9
 static int  _cl5WriteRUV (CL5DBFile *file, PRBool purge);
b161c9
 static int  _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge);
b161c9
 static int  _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge);
b161c9
 static int  _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv);
b161c9
-void trigger_cl_trimming_thread(void *rid);
b161c9
+void trigger_cl_purging_thread(void *rid);
b161c9
 
b161c9
 /* bakup/recovery, import/export */
b161c9
 static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op,
a2f18f
@@ -3470,9 +3473,17 @@ static void _cl5DoTrimming (ReplicaId rid)
b161c9
 	   trimmed more often than other. We might have to fix that by, for 
b161c9
 	   example, randomizing starting point */
b161c9
 	obj = objset_first_obj (s_cl5Desc.dbFiles);
b161c9
-	while (obj && _cl5CanTrim ((time_t)0, &numToTrim))
b161c9
+	while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid))
b161c9
 	{
b161c9
-		_cl5TrimFile (obj, &numToTrim, rid);
b161c9
+		if (rid){
b161c9
+			/*
b161c9
+			 * We are cleaning an invalid rid, and need to strip it
b161c9
+			 * from the changelog.
b161c9
+			 */
b161c9
+			_cl5PurgeRID (obj, rid);
b161c9
+		} else {
b161c9
+			_cl5TrimFile (obj, &numToTrim);
b161c9
+		}
b161c9
 		obj = objset_next_obj (s_cl5Desc.dbFiles, obj);
b161c9
 	}
b161c9
 
a2f18f
@@ -3549,12 +3560,351 @@ bail:
b161c9
 	return;
b161c9
 }
b161c9
 
b161c9
+/*
b161c9
+ * If the rid is not set it is the very first iteration of the changelog.
b161c9
+ * If the rid is set, we are doing another pass, and we have a key as our
b161c9
+ * starting point.
b161c9
+ */
b161c9
+static int
b161c9
+_cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key)
b161c9
+{
b161c9
+	DBC *cursor = NULL;
b161c9
+	DBT	data = {0};
b161c9
+	CL5Iterator *it;
b161c9
+	CL5DBFile *file;
b161c9
+	int rc;
b161c9
+
b161c9
+	file = (CL5DBFile*)object_get_data (obj);
b161c9
+
b161c9
+	/* create cursor */
b161c9
+	rc = file->db->cursor(file->db, txnid, &cursor, 0);
b161c9
+	if (rc != 0)
b161c9
+	{
b161c9
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+			"_cl5PurgeGetFirstEntry: failed to create cursor; db error - %d %s\n", rc, db_strerror(rc));
b161c9
+		rc = CL5_DB_ERROR;
b161c9
+		goto done;
b161c9
+	}
b161c9
+
b161c9
+	key->flags = DB_DBT_MALLOC;
b161c9
+	data.flags = DB_DBT_MALLOC;
b161c9
+	while ((rc = cursor->c_get(cursor, key, &data, rid?DB_SET:DB_NEXT)) == 0)
b161c9
+	{
b161c9
+		/* skip service entries on the first pass (rid == 0)*/
b161c9
+		if (!rid && cl5HelperEntry ((char*)key->data, NULL))
b161c9
+		{
b161c9
+			slapi_ch_free(&key->data);
b161c9
+			slapi_ch_free(&(data.data));
b161c9
+			continue;
b161c9
+		}
b161c9
+
b161c9
+		/* format entry */
b161c9
+		rc = cl5DBData2Entry(data.data, data.size, entry);
b161c9
+		slapi_ch_free(&(data.data));
b161c9
+		if (rc != 0)
b161c9
+		{
b161c9
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
b161c9
+				"_cl5PurgeGetFirstEntry: failed to format entry: %d\n", rc);
b161c9
+			goto done;
b161c9
+		}
b161c9
+
b161c9
+		it = (CL5Iterator*)slapi_ch_malloc(sizeof (CL5Iterator));
b161c9
+		it->cursor  = cursor;
b161c9
+		object_acquire (obj);
b161c9
+		it->file = obj;
b161c9
+		*(CL5Iterator**)iterator = it;
b161c9
+
b161c9
+		return CL5_SUCCESS;
b161c9
+	}
b161c9
+
b161c9
+	slapi_ch_free(&key->data);
b161c9
+	slapi_ch_free(&(data.data));
b161c9
+
b161c9
+	/* walked of the end of the file */
b161c9
+	if (rc == DB_NOTFOUND)
b161c9
+	{
b161c9
+		rc = CL5_NOTFOUND;
b161c9
+		goto done;
b161c9
+	}
b161c9
+
b161c9
+	/* db error occured while iterating */
b161c9
+	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+				"_cl5PurgeGetFirstEntry: failed to get entry; db error - %d %s\n",
b161c9
+				rc, db_strerror(rc));
b161c9
+	rc = CL5_DB_ERROR;
b161c9
+
b161c9
+done:
b161c9
+	/*
b161c9
+	 * We didn't success in assigning this cursor to the iterator,
b161c9
+	 * so we need to free the cursor here.
b161c9
+	 */
b161c9
+	if (cursor)
b161c9
+		cursor->c_close(cursor);
b161c9
+
b161c9
+	return rc;
b161c9
+}
b161c9
+
b161c9
+/*
b161c9
+ * Get the next entry.  If we get a lock error we will restart the process
b161c9
+ * starting at the current key.
b161c9
+ */
b161c9
+static int
b161c9
+_cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key)
b161c9
+{
b161c9
+	CL5Iterator *it;
b161c9
+	DBT data={0};
b161c9
+	int rc;
b161c9
+
b161c9
+	it = (CL5Iterator*) iterator;
b161c9
+
b161c9
+	key->flags = DB_DBT_MALLOC;
b161c9
+	data.flags = DB_DBT_MALLOC;
b161c9
+	while ((rc = it->cursor->c_get(it->cursor, key, &data, DB_NEXT)) == 0)
b161c9
+	{
b161c9
+		if (cl5HelperEntry ((char*)key->data, NULL))
b161c9
+		{
b161c9
+			slapi_ch_free(&key->data);
b161c9
+			slapi_ch_free(&(data.data));
b161c9
+			continue;
b161c9
+		}
b161c9
+
b161c9
+		/* format entry */
b161c9
+		rc = cl5DBData2Entry (data.data, data.size, entry);
b161c9
+		slapi_ch_free (&(data.data));
b161c9
+		if (rc != 0)
b161c9
+		{
b161c9
+			if (rc != CL5_DB_LOCK_ERROR){
b161c9
+				/* Not a lock error, free the key */
b161c9
+				slapi_ch_free(&key->data);
b161c9
+			}
b161c9
+			slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL,
b161c9
+				repl_plugin_name_cl,
b161c9
+				"_cl5PurgeGetNextEntry: failed to format entry: %d\n",
b161c9
+				rc);
b161c9
+
b161c9
+		}
b161c9
+
b161c9
+		return rc;
b161c9
+	}
b161c9
+	slapi_ch_free(&(data.data));
b161c9
+
b161c9
+	/* walked of the end of the file or entry is out of range */
b161c9
+	if (rc == 0 || rc == DB_NOTFOUND){
b161c9
+		slapi_ch_free(&key->data);
b161c9
+		return CL5_NOTFOUND;
b161c9
+	}
b161c9
+	if (rc != CL5_DB_LOCK_ERROR){
b161c9
+		/* Not a lock error, free the key */
b161c9
+		slapi_ch_free(&key->data);
b161c9
+	}
b161c9
+
b161c9
+	/* cursor operation failed */
b161c9
+	slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL,
b161c9
+		repl_plugin_name_cl,
b161c9
+		"_cl5PurgeGetNextEntry: failed to get entry; db error - %d %s\n",
b161c9
+		rc, db_strerror(rc));
b161c9
+
b161c9
+	return rc;
b161c9
+}
b161c9
+
b161c9
+#define MAX_RETRIES 10
b161c9
+/*
b161c9
+ *  _cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid)
b161c9
+ *
b161c9
+ *  Clean the entire changelog of updates from the "cleaned rid" via CLEANALLRUV
b161c9
+ *  Delete entries in batches so we don't consume too many db locks, and we don't
b161c9
+ *  lockup the changelog during the entire purging process using one transaction.
b161c9
+ *  We save the key from the last iteration so we don't have to start from the
b161c9
+ *  beginning for each new iteration.
b161c9
+ */
b161c9
+static void
b161c9
+_cl5PurgeRID(Object *obj,  ReplicaId cleaned_rid)
b161c9
+{
b161c9
+	slapi_operation_parameters op = {0};
b161c9
+	ReplicaId csn_rid;
b161c9
+	CL5Entry entry;
b161c9
+	DB_TXN *txnid = NULL;
b161c9
+	DBT key = {0};
b161c9
+	void *iterator = NULL;
b161c9
+	long totalTrimmed = 0;
b161c9
+	long trimmed = 0;
b161c9
+	char *starting_key = NULL;
b161c9
+	int batch_count = 0;
b161c9
+	int db_lock_retry_count = 0;
b161c9
+	int first_pass = 1;
b161c9
+	int finished = 0;
b161c9
+	int rc = 0;
b161c9
+
b161c9
+	PR_ASSERT (obj);
b161c9
+	entry.op = &op;
b161c9
+
b161c9
+	/*
b161c9
+	 * Keep processing the changelog until we are done, shutting down, or we
b161c9
+	 * maxed out on the db lock retries.
b161c9
+	 */
b161c9
+	while (!finished && db_lock_retry_count < MAX_RETRIES && !slapi_is_shutting_down()){
b161c9
+		trimmed = 0;
b161c9
+
b161c9
+		/*
b161c9
+		 * Sleep a bit to allow others to use the changelog - we can't hog the
b161c9
+		 * changelog for the entire purge.
b161c9
+		 */
b161c9
+		DS_Sleep(PR_MillisecondsToInterval(100));
b161c9
+
b161c9
+		rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0);
b161c9
+		if (rc != 0){
b161c9
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+				"_cl5PurgeRID: failed to begin transaction; db error - %d %s.  "
b161c9
+				"Changelog was not purged of rid(%d)\n",
b161c9
+				rc, db_strerror(rc), cleaned_rid);
b161c9
+			return;
b161c9
+		}
b161c9
+
b161c9
+		/*
b161c9
+		 * Check every changelog entry for the cleaned rid
b161c9
+		 */
b161c9
+		rc = _cl5PurgeGetFirstEntry(obj, &entry, &iterator, txnid, first_pass?0:cleaned_rid, &key);
b161c9
+		first_pass = 0;
b161c9
+		while (rc == CL5_SUCCESS && !slapi_is_shutting_down()) {
b161c9
+			/*
b161c9
+			 * Store the new starting key - we need this starting key in case
b161c9
+			 * we run out of locks and have to start the transaction over.
b161c9
+			 */
b161c9
+			slapi_ch_free_string(&starting_key);
b161c9
+			starting_key = slapi_ch_strdup((char*)key.data);
b161c9
+
b161c9
+			if(trimmed == 10000 || (batch_count && trimmed == batch_count)){
b161c9
+				/*
b161c9
+				 * Break out, and commit these deletes.  Do not free the key,
b161c9
+				 * we need it for the next pass.
b161c9
+				 */
b161c9
+				cl5_operation_parameters_done (&op);
b161c9
+				db_lock_retry_count = 0; /* reset the retry count */
b161c9
+				break;
b161c9
+			}
b161c9
+			if(op.csn){
b161c9
+				csn_rid = csn_get_replicaid (op.csn);
b161c9
+				if (csn_rid == cleaned_rid){
b161c9
+					rc = _cl5CurrentDeleteEntry (iterator);
b161c9
+					if (rc != CL5_SUCCESS){
b161c9
+						/* log error */
b161c9
+						cl5_operation_parameters_done (&op);
b161c9
+						if (rc == CL5_DB_LOCK_ERROR){
b161c9
+							/*
b161c9
+							 * Ran out of locks, need to restart the transaction.
b161c9
+							 * Reduce the the batch count and reset the key to
b161c9
+							 * the starting point
b161c9
+							 */
b161c9
+							slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
b161c9
+								"_cl5PurgeRID: Ran out of db locks deleting entry.  "
b161c9
+								"Reduce the batch value and restart.\n");
b161c9
+							batch_count = trimmed - 10;
b161c9
+							if (batch_count < 10){
b161c9
+								batch_count = 10;
b161c9
+							}
b161c9
+							trimmed = 0;
b161c9
+							slapi_ch_free(&(key.data));
b161c9
+							key.data = starting_key;
b161c9
+							starting_key = NULL;
b161c9
+							db_lock_retry_count++;
b161c9
+							break;
b161c9
+						} else {
b161c9
+							/* fatal error */
b161c9
+							slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+								"_cl5PurgeRID: fatal error (%d)\n", rc);
b161c9
+							slapi_ch_free(&(key.data));
b161c9
+							finished = 1;
b161c9
+							break;
b161c9
+						}
b161c9
+					}
b161c9
+					trimmed++;
b161c9
+				}
b161c9
+			}
b161c9
+			slapi_ch_free(&(key.data));
b161c9
+			cl5_operation_parameters_done (&op);
b161c9
+
b161c9
+			rc = _cl5PurgeGetNextEntry (&entry, iterator, &key);
b161c9
+			if (rc == CL5_DB_LOCK_ERROR){
b161c9
+				/*
b161c9
+				 * Ran out of locks, need to restart the transaction.
b161c9
+				 * Reduce the the batch count and reset the key to the starting
b161c9
+				 * point.
b161c9
+				 */
b161c9
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+					"_cl5PurgeRID: Ran out of db locks getting the next entry.  "
b161c9
+					"Reduce the batch value and restart.\n");
b161c9
+				batch_count = trimmed - 10;
b161c9
+				if (batch_count < 10){
b161c9
+					batch_count = 10;
b161c9
+				}
b161c9
+				trimmed = 0;
b161c9
+				cl5_operation_parameters_done (&op);
b161c9
+				slapi_ch_free(&(key.data));
b161c9
+				key.data = starting_key;
b161c9
+				starting_key = NULL;
b161c9
+				db_lock_retry_count++;
b161c9
+				break;
b161c9
+			}
b161c9
+		}
b161c9
+
b161c9
+		if (rc == CL5_NOTFOUND){
b161c9
+			/* Scanned the entire changelog, we're done */
b161c9
+			finished = 1;
b161c9
+		}
b161c9
+
b161c9
+		/* Destroy the iterator before we finish with the txn */
b161c9
+		cl5DestroyIterator (iterator);
b161c9
+
b161c9
+		/*
b161c9
+		 * Commit or abort the txn
b161c9
+		 */
b161c9
+		if (rc == CL5_SUCCESS || rc == CL5_NOTFOUND){
b161c9
+			rc = TXN_COMMIT (txnid, 0);
b161c9
+			if (rc != 0){
b161c9
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+					"_cl5PurgeRID: failed to commit transaction; db error - %d %s.  "
b161c9
+					"Changelog was not completely purged of rid (%d)\n",
b161c9
+					rc, db_strerror(rc), cleaned_rid);
b161c9
+				break;
b161c9
+			} else if (finished){
b161c9
+				/* We're done  */
b161c9
+				totalTrimmed += trimmed;
b161c9
+				break;
b161c9
+			} else {
b161c9
+				/* Not done yet */
b161c9
+				totalTrimmed += trimmed;
b161c9
+				trimmed = 0;
b161c9
+			}
b161c9
+		} else {
b161c9
+			rc = TXN_ABORT (txnid);
b161c9
+			if (rc != 0){
b161c9
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+					"_cl5PurgeRID: failed to abort transaction; db error - %d %s.  "
b161c9
+					"Changelog was not completely purged of rid (%d)\n",
b161c9
+					rc, db_strerror(rc), cleaned_rid);
b161c9
+			}
b161c9
+			if (batch_count == 0){
b161c9
+				/* This was not a retry.  Fatal error, break out */
b161c9
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+					"_cl5PurgeRID: Changelog was not purged of rid (%d)\n",
b161c9
+					cleaned_rid);
b161c9
+				break;
b161c9
+			}
b161c9
+		}
b161c9
+	}
b161c9
+	slapi_ch_free_string(&starting_key);
b161c9
+
b161c9
+	slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name_cl,
b161c9
+		"_cl5PurgeRID: Removed (%ld entries) that originated from rid (%d)\n",
b161c9
+		totalTrimmed, cleaned_rid);
b161c9
+}
b161c9
+
b161c9
 /* Note that each file contains changes for a single replicated area.
b161c9
    trimming algorithm:
b161c9
 */
b161c9
 #define CL5_TRIM_MAX_PER_TRANSACTION 10
b161c9
 
b161c9
-static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
+static void _cl5TrimFile (Object *obj, long *numToTrim)
b161c9
 {
b161c9
 	DB_TXN *txnid;
b161c9
 	RUV *ruv = NULL;
a2f18f
@@ -3577,7 +3927,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
 	}
b161c9
 
b161c9
 	entry.op = &op;
b161c9
-
b161c9
 	while ( !finished && !slapi_is_shutting_down() )
b161c9
 	{
b161c9
 		it = NULL;
a2f18f
@@ -3598,7 +3947,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
 		}
b161c9
 
b161c9
 		finished = _cl5GetFirstEntry (obj, &entry, &it, txnid);
b161c9
-		while ( !finished )
b161c9
+		while ( !finished && !slapi_is_shutting_down())
b161c9
 		{
b161c9
         	/*
b161c9
 			 * This change can be trimmed if it exceeds purge
a2f18f
@@ -3612,11 +3961,12 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
 				continue;
b161c9
 			}
b161c9
 			csn_rid = csn_get_replicaid (op.csn);
b161c9
+
b161c9
 			if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) &&
b161c9
 				 ruv_covers_csn_strict (ruv, op.csn) )
b161c9
 			{
b161c9
 				rc = _cl5CurrentDeleteEntry (it);
b161c9
-				if ( rc == CL5_SUCCESS && cleaned_rid != csn_rid)
b161c9
+				if ( rc == CL5_SUCCESS)
b161c9
 				{
b161c9
 					rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE);				
b161c9
 				}
a2f18f
@@ -3630,7 +3980,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
 					/* The above two functions have logged the error */
b161c9
 					abort = PR_TRUE;
b161c9
 				}
b161c9
-
b161c9
 			}
b161c9
 			else
b161c9
 			{
a2f18f
@@ -3687,7 +4036,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
 			rc = TXN_ABORT (txnid);
b161c9
 			if (rc != 0)
b161c9
 			{
b161c9
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
b161c9
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
 					"_cl5TrimFile: failed to abort transaction; db error - %d %s\n",
b161c9
 					rc, db_strerror(rc));	
b161c9
 			}
a2f18f
@@ -3698,7 +4047,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid)
b161c9
 			if (rc != 0)
b161c9
 			{
b161c9
 				finished = 1;
b161c9
-				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
b161c9
+				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
 					"_cl5TrimFile: failed to commit transaction; db error - %d %s\n",
b161c9
 					rc, db_strerror(rc));
b161c9
 			}
a2f18f
@@ -4722,9 +5071,9 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
b161c9
 				goto done;
b161c9
 			}
b161c9
 #endif
b161c9
-			/* back off */			
b161c9
+			/* back off */
b161c9
     		interval = PR_MillisecondsToInterval(slapi_rand() % 100);
b161c9
-    		DS_Sleep(interval);		
b161c9
+    		DS_Sleep(interval);
b161c9
 		}
b161c9
 #if USE_DB_TXN
b161c9
 		/* begin transaction */
a2f18f
@@ -4770,19 +5119,19 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
b161c9
 		}
b161c9
 		cnt ++;
b161c9
 	}
b161c9
-    
b161c9
+
b161c9
 	if (rc == 0) /* we successfully added entry */
b161c9
 	{
b161c9
 #if USE_DB_TXN
b161c9
 		rc = TXN_COMMIT (txnid, 0);
b161c9
 #endif
b161c9
 	}
b161c9
-	else	
b161c9
+	else
b161c9
 	{
b161c9
-		char s[CSN_STRSIZE];		
b161c9
-		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
b161c9
+		char s[CSN_STRSIZE];
b161c9
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
 						"_cl5WriteOperationTxn: failed to write entry with csn (%s); "
b161c9
-						"db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s), 
b161c9
+						"db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s),
b161c9
 						rc, db_strerror(rc));
b161c9
 #if USE_DB_TXN
b161c9
 		rc = TXN_ABORT (txnid);
a2f18f
@@ -4803,7 +5152,7 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen,
b161c9
     /* update purge vector if we have not seen any changes from this replica before */
b161c9
     _cl5UpdateRUV (file_obj, op->csn, PR_TRUE, PR_TRUE);
b161c9
 
b161c9
-	slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, 
b161c9
+	slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl,
b161c9
 			"cl5WriteOperationTxn: successfully written entry with csn (%s)\n", csnStr);
b161c9
 	rc = CL5_SUCCESS;
b161c9
 done:
a2f18f
@@ -4817,7 +5166,7 @@ done:
b161c9
 	return rc;
b161c9
 }
b161c9
 
b161c9
-static int _cl5WriteOperation(const char *replName, const char *replGen, 
b161c9
+static int _cl5WriteOperation(const char *replName, const char *replGen,
b161c9
                               const slapi_operation_parameters *op, PRBool local)
b161c9
 {
b161c9
     return _cl5WriteOperationTxn(replName, replGen, op, local, NULL);
a2f18f
@@ -4868,7 +5217,7 @@ static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_
b161c9
 			goto done;
b161c9
 		}
b161c9
 
b161c9
-		it = (CL5Iterator*)slapi_ch_malloc (sizeof (CL5Iterator));
b161c9
+		it = (CL5Iterator*)slapi_ch_malloc(sizeof (CL5Iterator));
b161c9
 		it->cursor  = cursor;
b161c9
 		object_acquire (obj);
b161c9
 		it->file = obj;
a2f18f
@@ -4943,7 +5292,7 @@ static int _cl5GetNextEntry (CL5Entry *entry, void *iterator)
b161c9
 		slapi_ch_free (&(data.data));
b161c9
 		if (rc != 0)
b161c9
 		{
b161c9
-			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
b161c9
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
 				"_cl5GetNextEntry: failed to format entry: %d\n", rc);
b161c9
 		}
b161c9
 
a2f18f
@@ -4972,38 +5321,42 @@ static int _cl5GetNextEntry (CL5Entry *entry, void *iterator)
b161c9
 	}
b161c9
 
b161c9
 	/* cursor operation failed */
b161c9
-	slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
b161c9
-			"_cl5GetNextEntry: failed to get entry; db error - %d %s\n", 
b161c9
-			rc, db_strerror(rc));
b161c9
+	slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL,
b161c9
+		repl_plugin_name_cl,
b161c9
+		"_cl5GetNextEntry: failed to get entry; db error - %d %s\n",
b161c9
+		rc, db_strerror(rc));
b161c9
 
b161c9
-	return CL5_DB_ERROR;
b161c9
+	return rc;
b161c9
 }
b161c9
 
b161c9
 static int _cl5CurrentDeleteEntry (void *iterator)
b161c9
 {
b161c9
 	int rc;
b161c9
 	CL5Iterator *it;
b161c9
-    CL5DBFile *file;
b161c9
+	CL5DBFile *file;
b161c9
 
b161c9
-    PR_ASSERT (iterator);
b161c9
+	PR_ASSERT (iterator);
b161c9
 
b161c9
 	it = (CL5Iterator*)iterator;
b161c9
 
b161c9
 	rc = it->cursor->c_del (it->cursor, 0);
b161c9
 
b161c9
 	if (rc == 0) {        
b161c9
-            /* decrement entry count */
b161c9
-            file = (CL5DBFile*)object_get_data (it->file);
b161c9
-            PR_AtomicDecrement (&file->entryCount);
b161c9
-            return CL5_SUCCESS;
b161c9
-        } else {
b161c9
-            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
b161c9
-                            "_cl5CurrentDeleteEntry failed, err=%d %s\n", 
b161c9
-                            rc, db_strerror(rc));
b161c9
-	    /* We don't free(close) the cursor here, as the caller will free it by a call to cl5DestroyIterator */
b161c9
-	    /* Freeing it here is a potential bug, as the cursor can't be referenced later once freed */
b161c9
-            return CL5_DB_ERROR;
b161c9
-        }
b161c9
+		/* decrement entry count */
b161c9
+		file = (CL5DBFile*)object_get_data (it->file);
b161c9
+		PR_AtomicDecrement (&file->entryCount);
b161c9
+		return CL5_SUCCESS;
b161c9
+	} else {
b161c9
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
+			"_cl5CurrentDeleteEntry failed, err=%d %s\n",
b161c9
+			rc, db_strerror(rc));
b161c9
+		/*
b161c9
+		 * We don't free(close) the cursor here, as the caller will free it by
b161c9
+		 * a call to cl5DestroyIterator.  Freeing it here is a potential bug,
b161c9
+		 * as the cursor can't be referenced later once freed.
b161c9
+		 */
b161c9
+		return rc;
b161c9
+	}
b161c9
 }
b161c9
 
b161c9
 static PRBool _cl5IsValidIterator (const CL5Iterator *iterator)
a2f18f
@@ -6275,7 +6628,7 @@ static int _cl5ExportFile (PRFileDesc *prFile, Object *obj)
b161c9
 	slapi_write_buffer (prFile, "\n", strlen("\n"));
b161c9
 
b161c9
 	entry.op = &op;
b161c9
-	rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); 
b161c9
+	rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL);
b161c9
 	while (rc == CL5_SUCCESS)
b161c9
 	{
b161c9
 		rc = _cl5Operation2LDIF (&op, file->replGen, &buff, &len;;
a2f18f
@@ -6696,16 +7049,16 @@ cl5CleanRUV(ReplicaId rid){
b161c9
     slapi_rwlock_unlock (s_cl5Desc.stLock);
b161c9
 }
b161c9
 
b161c9
-void trigger_cl_trimming(ReplicaId rid){
b161c9
+void trigger_cl_purging(ReplicaId rid){
b161c9
     PRThread *trim_tid = NULL;
b161c9
 
b161c9
-    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_trimming: rid (%d)\n",(int)rid);
b161c9
-    trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_trimming_thread,
b161c9
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid);
b161c9
+    trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread,
b161c9
                    (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD,
b161c9
                    PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE);
b161c9
     if (NULL == trim_tid){
b161c9
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
-            "trigger_cl_trimming: failed to create trimming "
b161c9
+            "trigger_cl_purging: failed to create trimming "
b161c9
             "thread; NSPR error - %d\n", PR_GetError ());
b161c9
     } else {
b161c9
         /* need a little time for the thread to get started */
a2f18f
@@ -6714,7 +7067,7 @@ void trigger_cl_trimming(ReplicaId rid){
b161c9
 }
b161c9
 
b161c9
 void
b161c9
-trigger_cl_trimming_thread(void *arg){
b161c9
+trigger_cl_purging_thread(void *arg){
b161c9
     ReplicaId rid = *(ReplicaId *)arg;
b161c9
 
b161c9
     /* make sure we have a change log, and we aren't closing it */
a2f18f
@@ -6723,7 +7076,7 @@ trigger_cl_trimming_thread(void *arg){
b161c9
     }
b161c9
     if (CL5_SUCCESS != _cl5AddThread()) {
b161c9
         slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
b161c9
-            "trigger_cl_trimming: failed to increment thread count "
b161c9
+            "trigger_cl_purging: failed to increment thread count "
b161c9
             "NSPR error - %d\n", PR_GetError ());
b161c9
     }
b161c9
     _cl5DoTrimming(rid);
b161c9
diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h
a2f18f
index 5809570..4c3b8e8 100644
b161c9
--- a/ldap/servers/plugins/replication/cl5_api.h
b161c9
+++ b/ldap/servers/plugins/replication/cl5_api.h
a2f18f
@@ -117,6 +117,9 @@ enum
b161c9
 	CL5_CSN_ERROR,		/* CSN API failed */
b161c9
 	CL5_RUV_ERROR,		/* RUV API failed */
b161c9
 	CL5_OBJSET_ERROR,	/* namedobjset api failed */
b161c9
+	CL5_DB_LOCK_ERROR,  /* bdb returns error 12 when the db runs out of locks,
b161c9
+	                       this var needs to be in slot 12 of the list.
b161c9
+	                       Do not re-order enum above! */
b161c9
 	CL5_PURGED_DATA,    /* requested data has been purged */
b161c9
 	CL5_MISSING_DATA,   /* data should be in the changelog, but is missing */
b161c9
 	CL5_UNKNOWN_ERROR,	/* unclassified error */
a2f18f
@@ -464,6 +467,6 @@ int cl5WriteRUV();
b161c9
 int cl5DeleteRUV();
b161c9
 void cl5CleanRUV(ReplicaId rid);
b161c9
 void cl5NotifyCleanup(int rid);
b161c9
-void trigger_cl_trimming(ReplicaId rid);
b161c9
+void trigger_cl_purging(ReplicaId rid);
b161c9
 
b161c9
 #endif
b161c9
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
a2f18f
index 660b134..faa86b8 100644
b161c9
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
b161c9
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
a2f18f
@@ -1439,6 +1439,11 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not
b161c9
 	 */
b161c9
 	cl5CleanRUV(rid);
b161c9
 
b161c9
+	/*
b161c9
+	 * Now purge the changelog
b161c9
+	 */
b161c9
+	trigger_cl_purging(rid);
b161c9
+
b161c9
 	if (rc != RUV_SUCCESS){
b161c9
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc);
b161c9
 		return LDAP_OPERATIONS_ERROR;
a2f18f
@@ -1837,7 +1842,7 @@ replica_cleanallruv_thread(void *arg)
b161c9
             /* no agmts, just clean this replica */
b161c9
             break;
b161c9
         }
b161c9
-        while (agmt_obj){
b161c9
+        while (agmt_obj && !slapi_is_shutting_down()){
b161c9
             agmt = (Repl_Agmt*)object_get_data (agmt_obj);
b161c9
             if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){
b161c9
                 agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj);
a2f18f
@@ -1919,13 +1924,15 @@ replica_cleanallruv_thread(void *arg)
b161c9
             break;
b161c9
         }
b161c9
         /*
b161c9
-         *  need to sleep between passes
b161c9
+         * Need to sleep between passes unless we are shutting down
b161c9
          */
a2f18f
-        cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, "Replicas have not been cleaned yet, "
b161c9
-            "retrying in %d seconds", interval);
b161c9
-        PR_Lock( notify_lock );
b161c9
-        PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
b161c9
-        PR_Unlock( notify_lock );
b161c9
+        if (!slapi_is_shutting_down()){
a2f18f
+            cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, "Replicas have not been cleaned yet, "
b161c9
+                "retrying in %d seconds", interval);
b161c9
+            PR_Lock( notify_lock );
b161c9
+            PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
b161c9
+            PR_Unlock( notify_lock );
b161c9
+        }
b161c9
 
b161c9
         if(interval < 14400){ /* 4 hour max */
b161c9
             interval = interval * 2;
a2f18f
@@ -1936,10 +1943,9 @@ replica_cleanallruv_thread(void *arg)
b161c9
 
b161c9
 done:
b161c9
     /*
b161c9
-     *  If the replicas are cleaned, release the rid, and trim the changelog
b161c9
+     *  If the replicas are cleaned, release the rid
b161c9
      */
b161c9
     if(!aborted){
b161c9
-        trigger_cl_trimming(data->rid);
b161c9
         delete_cleaned_rid_config(data);
b161c9
         /* make sure all the replicas have been "pre_cleaned" before finishing */
b161c9
         check_replicas_are_done_cleaning(data);
a2f18f
@@ -1949,7 +1955,7 @@ done:
b161c9
         /*
b161c9
          *  Shutdown or abort
b161c9
          */
b161c9
-        if(!is_task_aborted(data->rid)){
b161c9
+        if(!is_task_aborted(data->rid) || slapi_is_shutting_down()){
a2f18f
             cleanruv_log(data->task, data->rid, CLEANALLRUV_ID,"Server shutting down.  Process will resume at server startup");
b161c9
         } else {
a2f18f
             cleanruv_log(data->task, data->rid, CLEANALLRUV_ID,"Task aborted for rid(%d).",data->rid);
a2f18f
@@ -2184,7 +2190,7 @@ check_agmts_are_caught_up(cleanruv_data *data, char *maxcsn)
b161c9
             not_all_caughtup = 0;
b161c9
             break;
b161c9
         }
b161c9
-        while (agmt_obj){
b161c9
+        while (agmt_obj && !slapi_is_shutting_down()){
b161c9
             agmt = (Repl_Agmt*)object_get_data (agmt_obj);
b161c9
             if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){
b161c9
                 agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj);
a2f18f
@@ -2242,7 +2248,7 @@ check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task)
b161c9
             not_all_alive = 0;
b161c9
             break;
b161c9
         }
b161c9
-        while (agmt_obj){
b161c9
+        while (agmt_obj && !slapi_is_shutting_down()){
b161c9
             agmt = (Repl_Agmt*)object_get_data (agmt_obj);
b161c9
             if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){
b161c9
                 agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj);
a2f18f
@@ -3022,12 +3028,14 @@ replica_abort_task_thread(void *arg)
b161c9
             break;
b161c9
         }
b161c9
         /*
b161c9
-         *  need to sleep between passes
b161c9
+         *  Need to sleep between passes. unless we are shutting down
b161c9
          */
a2f18f
-        cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval);
b161c9
-        PR_Lock( notify_lock );
b161c9
-        PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
b161c9
-        PR_Unlock( notify_lock );
b161c9
+        if (!slapi_is_shutting_down()){
a2f18f
+            cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval);
b161c9
+            PR_Lock( notify_lock );
b161c9
+            PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) );
b161c9
+            PR_Unlock( notify_lock );
b161c9
+        }
b161c9
 
b161c9
         if(interval < 14400){ /* 4 hour max */
b161c9
             interval = interval * 2;
a2f18f
@@ -3045,7 +3053,7 @@ done:
b161c9
          *  Wait for this server to stop its cleanallruv task(which removes the rid from the cleaned list)
b161c9
          */
a2f18f
         cleanruv_log(data->task, data->rid, ABORT_CLEANALLRUV_ID, "Waiting for CleanAllRUV task to abort...");
b161c9
-        while(is_cleaned_rid(data->rid)){
b161c9
+        while(is_cleaned_rid(data->rid) && !slapi_is_shutting_down()){
b161c9
             DS_Sleep(PR_SecondsToInterval(1));
b161c9
             count++;
b161c9
             if(count == 60){ /* it should not take this long */
b161c9
-- 
b161c9
1.9.3
b161c9