From d32a172a4bc927a5eb72acecfe07ba7fa8ea3a55 Mon Sep 17 00:00:00 2001 From: Mark Reynolds Date: Wed, 8 Jul 2015 11:48:27 -0400 Subject: [PATCH 80/84] Ticket 48208 - CleanAllRUV should completely purge changelog Bug Description: After cleanAllRUV finishes, the changelog still contains entries from the cleaned rid. Under certain conditions this can allow the RUV to get polluted again, and the ruv element will be missing the replica url. Fix Description: At the end of the cleaning task, fire of a thread to to completely purge the changelog of all entries containing the cleaned rid. Also, improved the cleanAllRUV task when dealing with a server shutdown - previously if the timing is right the task can "delay/hang" the shutdown process. https://fedorahosted.org/389/ticket/48208 Reviewed by: nhosoi(Thanks!) (cherry picked from commit ff1c34538b0600259dba4801da2b2f0993fa5404) (cherry picked from commit 9e4cf12cfbfde0761325b75c3fd5a8b39223760a) (cherry picked from commit 46cd28db8402517febf0c5db4f2f869c491c41c0) --- ldap/servers/plugins/replication/cl5_api.c | 447 ++++++++++++++++++--- ldap/servers/plugins/replication/cl5_api.h | 5 +- .../plugins/replication/repl5_replica_config.c | 44 +- 3 files changed, 430 insertions(+), 66 deletions(-) diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c index 42e52ae..c5840b5 100644 --- a/ldap/servers/plugins/replication/cl5_api.c +++ b/ldap/servers/plugins/replication/cl5_api.c @@ -353,14 +353,17 @@ static void _cl5TrimCleanup (); static int _cl5TrimMain (void *param); static void _cl5DoTrimming (ReplicaId rid); static void _cl5CompactDBs(); -static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid); +static void _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid); +static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key); +static int _cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key); +static void _cl5TrimFile (Object *obj, long *numToTrim); static PRBool _cl5CanTrim (time_t time, long *numToTrim); static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge); static int _cl5WriteRUV (CL5DBFile *file, PRBool purge); static int _cl5ConstructRUV (const char *replGen, Object *obj, PRBool purge); static int _cl5UpdateRUV (Object *obj, CSN *csn, PRBool newReplica, PRBool purge); static int _cl5GetRUV2Purge2 (Object *fileObj, RUV **ruv); -void trigger_cl_trimming_thread(void *rid); +void trigger_cl_purging_thread(void *rid); /* bakup/recovery, import/export */ static int _cl5LDIF2Operation (char *ldifEntry, slapi_operation_parameters *op, @@ -3499,9 +3502,17 @@ static void _cl5DoTrimming (ReplicaId rid) trimmed more often than other. We might have to fix that by, for example, randomizing starting point */ obj = objset_first_obj (s_cl5Desc.dbFiles); - while (obj && _cl5CanTrim ((time_t)0, &numToTrim)) + while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid)) { - _cl5TrimFile (obj, &numToTrim, rid); + if (rid){ + /* + * We are cleaning an invalid rid, and need to strip it + * from the changelog. + */ + _cl5PurgeRID (obj, rid); + } else { + _cl5TrimFile (obj, &numToTrim); + } obj = objset_next_obj (s_cl5Desc.dbFiles, obj); } @@ -3578,12 +3589,351 @@ bail: return; } +/* + * If the rid is not set it is the very first iteration of the changelog. + * If the rid is set, we are doing another pass, and we have a key as our + * starting point. + */ +static int +_cl5PurgeGetFirstEntry(Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key) +{ + DBC *cursor = NULL; + DBT data = {0}; + CL5Iterator *it; + CL5DBFile *file; + int rc; + + file = (CL5DBFile*)object_get_data (obj); + + /* create cursor */ + rc = file->db->cursor(file->db, txnid, &cursor, 0); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeGetFirstEntry: failed to create cursor; db error - %d %s\n", rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + goto done; + } + + key->flags = DB_DBT_MALLOC; + data.flags = DB_DBT_MALLOC; + while ((rc = cursor->c_get(cursor, key, &data, rid?DB_SET:DB_NEXT)) == 0) + { + /* skip service entries on the first pass (rid == 0)*/ + if (!rid && cl5HelperEntry ((char*)key->data, NULL)) + { + slapi_ch_free(&key->data); + slapi_ch_free(&(data.data)); + continue; + } + + /* format entry */ + rc = cl5DBData2Entry(data.data, data.size, entry); + slapi_ch_free(&(data.data)); + if (rc != 0) + { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5PurgeGetFirstEntry: failed to format entry: %d\n", rc); + goto done; + } + + it = (CL5Iterator*)slapi_ch_malloc(sizeof (CL5Iterator)); + it->cursor = cursor; + object_acquire (obj); + it->file = obj; + *(CL5Iterator**)iterator = it; + + return CL5_SUCCESS; + } + + slapi_ch_free(&key->data); + slapi_ch_free(&(data.data)); + + /* walked of the end of the file */ + if (rc == DB_NOTFOUND) + { + rc = CL5_NOTFOUND; + goto done; + } + + /* db error occured while iterating */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeGetFirstEntry: failed to get entry; db error - %d %s\n", + rc, db_strerror(rc)); + rc = CL5_DB_ERROR; + +done: + /* + * We didn't success in assigning this cursor to the iterator, + * so we need to free the cursor here. + */ + if (cursor) + cursor->c_close(cursor); + + return rc; +} + +/* + * Get the next entry. If we get a lock error we will restart the process + * starting at the current key. + */ +static int +_cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key) +{ + CL5Iterator *it; + DBT data={0}; + int rc; + + it = (CL5Iterator*) iterator; + + key->flags = DB_DBT_MALLOC; + data.flags = DB_DBT_MALLOC; + while ((rc = it->cursor->c_get(it->cursor, key, &data, DB_NEXT)) == 0) + { + if (cl5HelperEntry ((char*)key->data, NULL)) + { + slapi_ch_free(&key->data); + slapi_ch_free(&(data.data)); + continue; + } + + /* format entry */ + rc = cl5DBData2Entry (data.data, data.size, entry); + slapi_ch_free (&(data.data)); + if (rc != 0) + { + if (rc != CL5_DB_LOCK_ERROR){ + /* Not a lock error, free the key */ + slapi_ch_free(&key->data); + } + slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL, + repl_plugin_name_cl, + "_cl5PurgeGetNextEntry: failed to format entry: %d\n", + rc); + + } + + return rc; + } + slapi_ch_free(&(data.data)); + + /* walked of the end of the file or entry is out of range */ + if (rc == 0 || rc == DB_NOTFOUND){ + slapi_ch_free(&key->data); + return CL5_NOTFOUND; + } + if (rc != CL5_DB_LOCK_ERROR){ + /* Not a lock error, free the key */ + slapi_ch_free(&key->data); + } + + /* cursor operation failed */ + slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL, + repl_plugin_name_cl, + "_cl5PurgeGetNextEntry: failed to get entry; db error - %d %s\n", + rc, db_strerror(rc)); + + return rc; +} + +#define MAX_RETRIES 10 +/* + * _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid) + * + * Clean the entire changelog of updates from the "cleaned rid" via CLEANALLRUV + * Delete entries in batches so we don't consume too many db locks, and we don't + * lockup the changelog during the entire purging process using one transaction. + * We save the key from the last iteration so we don't have to start from the + * beginning for each new iteration. + */ +static void +_cl5PurgeRID(Object *obj, ReplicaId cleaned_rid) +{ + slapi_operation_parameters op = {0}; + ReplicaId csn_rid; + CL5Entry entry; + DB_TXN *txnid = NULL; + DBT key = {0}; + void *iterator = NULL; + long totalTrimmed = 0; + long trimmed = 0; + char *starting_key = NULL; + int batch_count = 0; + int db_lock_retry_count = 0; + int first_pass = 1; + int finished = 0; + int rc = 0; + + PR_ASSERT (obj); + entry.op = &op; + + /* + * Keep processing the changelog until we are done, shutting down, or we + * maxed out on the db lock retries. + */ + while (!finished && db_lock_retry_count < MAX_RETRIES && !slapi_is_shutting_down()){ + trimmed = 0; + + /* + * Sleep a bit to allow others to use the changelog - we can't hog the + * changelog for the entire purge. + */ + DS_Sleep(PR_MillisecondsToInterval(100)); + + rc = TXN_BEGIN(s_cl5Desc.dbEnv, NULL, &txnid, 0); + if (rc != 0){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeRID: failed to begin transaction; db error - %d %s. " + "Changelog was not purged of rid(%d)\n", + rc, db_strerror(rc), cleaned_rid); + return; + } + + /* + * Check every changelog entry for the cleaned rid + */ + rc = _cl5PurgeGetFirstEntry(obj, &entry, &iterator, txnid, first_pass?0:cleaned_rid, &key); + first_pass = 0; + while (rc == CL5_SUCCESS && !slapi_is_shutting_down()) { + /* + * Store the new starting key - we need this starting key in case + * we run out of locks and have to start the transaction over. + */ + slapi_ch_free_string(&starting_key); + starting_key = slapi_ch_strdup((char*)key.data); + + if(trimmed == 10000 || (batch_count && trimmed == batch_count)){ + /* + * Break out, and commit these deletes. Do not free the key, + * we need it for the next pass. + */ + cl5_operation_parameters_done (&op); + db_lock_retry_count = 0; /* reset the retry count */ + break; + } + if(op.csn){ + csn_rid = csn_get_replicaid (op.csn); + if (csn_rid == cleaned_rid){ + rc = _cl5CurrentDeleteEntry (iterator); + if (rc != CL5_SUCCESS){ + /* log error */ + cl5_operation_parameters_done (&op); + if (rc == CL5_DB_LOCK_ERROR){ + /* + * Ran out of locks, need to restart the transaction. + * Reduce the the batch count and reset the key to + * the starting point + */ + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5PurgeRID: Ran out of db locks deleting entry. " + "Reduce the batch value and restart.\n"); + batch_count = trimmed - 10; + if (batch_count < 10){ + batch_count = 10; + } + trimmed = 0; + slapi_ch_free(&(key.data)); + key.data = starting_key; + starting_key = NULL; + db_lock_retry_count++; + break; + } else { + /* fatal error */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeRID: fatal error (%d)\n", rc); + slapi_ch_free(&(key.data)); + finished = 1; + break; + } + } + trimmed++; + } + } + slapi_ch_free(&(key.data)); + cl5_operation_parameters_done (&op); + + rc = _cl5PurgeGetNextEntry (&entry, iterator, &key); + if (rc == CL5_DB_LOCK_ERROR){ + /* + * Ran out of locks, need to restart the transaction. + * Reduce the the batch count and reset the key to the starting + * point. + */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeRID: Ran out of db locks getting the next entry. " + "Reduce the batch value and restart.\n"); + batch_count = trimmed - 10; + if (batch_count < 10){ + batch_count = 10; + } + trimmed = 0; + cl5_operation_parameters_done (&op); + slapi_ch_free(&(key.data)); + key.data = starting_key; + starting_key = NULL; + db_lock_retry_count++; + break; + } + } + + if (rc == CL5_NOTFOUND){ + /* Scanned the entire changelog, we're done */ + finished = 1; + } + + /* Destroy the iterator before we finish with the txn */ + cl5DestroyIterator (iterator); + + /* + * Commit or abort the txn + */ + if (rc == CL5_SUCCESS || rc == CL5_NOTFOUND){ + rc = TXN_COMMIT (txnid, 0); + if (rc != 0){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeRID: failed to commit transaction; db error - %d %s. " + "Changelog was not completely purged of rid (%d)\n", + rc, db_strerror(rc), cleaned_rid); + break; + } else if (finished){ + /* We're done */ + totalTrimmed += trimmed; + break; + } else { + /* Not done yet */ + totalTrimmed += trimmed; + trimmed = 0; + } + } else { + rc = TXN_ABORT (txnid); + if (rc != 0){ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeRID: failed to abort transaction; db error - %d %s. " + "Changelog was not completely purged of rid (%d)\n", + rc, db_strerror(rc), cleaned_rid); + } + if (batch_count == 0){ + /* This was not a retry. Fatal error, break out */ + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5PurgeRID: Changelog was not purged of rid (%d)\n", + cleaned_rid); + break; + } + } + } + slapi_ch_free_string(&starting_key); + + slapi_log_error (SLAPI_LOG_REPL, repl_plugin_name_cl, + "_cl5PurgeRID: Removed (%ld entries) that originated from rid (%d)\n", + totalTrimmed, cleaned_rid); +} + /* Note that each file contains changes for a single replicated area. trimming algorithm: */ #define CL5_TRIM_MAX_PER_TRANSACTION 10 -static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) +static void _cl5TrimFile (Object *obj, long *numToTrim) { DB_TXN *txnid; RUV *ruv = NULL; @@ -3606,7 +3956,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) } entry.op = &op; - while ( !finished && !slapi_is_shutting_down() ) { it = NULL; @@ -3627,7 +3976,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) } finished = _cl5GetFirstEntry (obj, &entry, &it, txnid); - while ( !finished ) + while ( !finished && !slapi_is_shutting_down()) { /* * This change can be trimmed if it exceeds purge @@ -3641,11 +3990,12 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) continue; } csn_rid = csn_get_replicaid (op.csn); + if ( (*numToTrim > 0 || _cl5CanTrim (entry.time, numToTrim)) && ruv_covers_csn_strict (ruv, op.csn) ) { rc = _cl5CurrentDeleteEntry (it); - if ( rc == CL5_SUCCESS && cleaned_rid != csn_rid) + if ( rc == CL5_SUCCESS) { rc = _cl5UpdateRUV (obj, op.csn, PR_FALSE, PR_TRUE); } @@ -3659,7 +4009,6 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) /* The above two functions have logged the error */ abort = PR_TRUE; } - } else { @@ -3716,7 +4065,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) rc = TXN_ABORT (txnid); if (rc != 0) { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5TrimFile: failed to abort transaction; db error - %d %s\n", rc, db_strerror(rc)); } @@ -3727,7 +4076,7 @@ static void _cl5TrimFile (Object *obj, long *numToTrim, ReplicaId cleaned_rid) if (rc != 0) { finished = 1; - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5TrimFile: failed to commit transaction; db error - %d %s\n", rc, db_strerror(rc)); } @@ -4751,9 +5100,9 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen, goto done; } #endif - /* back off */ + /* back off */ interval = PR_MillisecondsToInterval(slapi_rand() % 100); - DS_Sleep(interval); + DS_Sleep(interval); } #if USE_DB_TXN /* begin transaction */ @@ -4799,19 +5148,19 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen, } cnt ++; } - + if (rc == 0) /* we successfully added entry */ { #if USE_DB_TXN rc = TXN_COMMIT (txnid, 0); #endif } - else + else { - char s[CSN_STRSIZE]; - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + char s[CSN_STRSIZE]; + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5WriteOperationTxn: failed to write entry with csn (%s); " - "db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s), + "db error - %d %s\n", csn_as_string(op->csn,PR_FALSE,s), rc, db_strerror(rc)); #if USE_DB_TXN rc = TXN_ABORT (txnid); @@ -4832,7 +5181,7 @@ static int _cl5WriteOperationTxn(const char *replName, const char *replGen, /* update purge vector if we have not seen any changes from this replica before */ _cl5UpdateRUV (file_obj, op->csn, PR_TRUE, PR_TRUE); - slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name_cl, "cl5WriteOperationTxn: successfully written entry with csn (%s)\n", csnStr); rc = CL5_SUCCESS; done: @@ -4846,7 +5195,7 @@ done: return rc; } -static int _cl5WriteOperation(const char *replName, const char *replGen, +static int _cl5WriteOperation(const char *replName, const char *replGen, const slapi_operation_parameters *op, PRBool local) { return _cl5WriteOperationTxn(replName, replGen, op, local, NULL); @@ -4897,7 +5246,7 @@ static int _cl5GetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_ goto done; } - it = (CL5Iterator*)slapi_ch_malloc (sizeof (CL5Iterator)); + it = (CL5Iterator*)slapi_ch_malloc(sizeof (CL5Iterator)); it->cursor = cursor; object_acquire (obj); it->file = obj; @@ -4972,7 +5321,7 @@ static int _cl5GetNextEntry (CL5Entry *entry, void *iterator) slapi_ch_free (&(data.data)); if (rc != 0) { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "_cl5GetNextEntry: failed to format entry: %d\n", rc); } @@ -5001,38 +5350,42 @@ static int _cl5GetNextEntry (CL5Entry *entry, void *iterator) } /* cursor operation failed */ - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, - "_cl5GetNextEntry: failed to get entry; db error - %d %s\n", - rc, db_strerror(rc)); + slapi_log_error(rc == CL5_DB_LOCK_ERROR?SLAPI_LOG_REPL:SLAPI_LOG_FATAL, + repl_plugin_name_cl, + "_cl5GetNextEntry: failed to get entry; db error - %d %s\n", + rc, db_strerror(rc)); - return CL5_DB_ERROR; + return rc; } static int _cl5CurrentDeleteEntry (void *iterator) { int rc; CL5Iterator *it; - CL5DBFile *file; + CL5DBFile *file; - PR_ASSERT (iterator); + PR_ASSERT (iterator); it = (CL5Iterator*)iterator; rc = it->cursor->c_del (it->cursor, 0); if (rc == 0) { - /* decrement entry count */ - file = (CL5DBFile*)object_get_data (it->file); - PR_AtomicDecrement (&file->entryCount); - return CL5_SUCCESS; - } else { - slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, - "_cl5CurrentDeleteEntry failed, err=%d %s\n", - rc, db_strerror(rc)); - /* We don't free(close) the cursor here, as the caller will free it by a call to cl5DestroyIterator */ - /* Freeing it here is a potential bug, as the cursor can't be referenced later once freed */ - return CL5_DB_ERROR; - } + /* decrement entry count */ + file = (CL5DBFile*)object_get_data (it->file); + PR_AtomicDecrement (&file->entryCount); + return CL5_SUCCESS; + } else { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "_cl5CurrentDeleteEntry failed, err=%d %s\n", + rc, db_strerror(rc)); + /* + * We don't free(close) the cursor here, as the caller will free it by + * a call to cl5DestroyIterator. Freeing it here is a potential bug, + * as the cursor can't be referenced later once freed. + */ + return rc; + } } static PRBool _cl5IsValidIterator (const CL5Iterator *iterator) @@ -6304,7 +6657,7 @@ static int _cl5ExportFile (PRFileDesc *prFile, Object *obj) slapi_write_buffer (prFile, "\n", strlen("\n")); entry.op = &op; - rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); + rc = _cl5GetFirstEntry (obj, &entry, &iterator, NULL); while (rc == CL5_SUCCESS) { rc = _cl5Operation2LDIF (&op, file->replGen, &buff, &len); @@ -6725,16 +7078,16 @@ cl5CleanRUV(ReplicaId rid){ slapi_rwlock_unlock (s_cl5Desc.stLock); } -void trigger_cl_trimming(ReplicaId rid){ +void trigger_cl_purging(ReplicaId rid){ PRThread *trim_tid = NULL; - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_trimming: rid (%d)\n",(int)rid); - trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_trimming_thread, + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid); + trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread, (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE); if (NULL == trim_tid){ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, - "trigger_cl_trimming: failed to create trimming " + "trigger_cl_purging: failed to create trimming " "thread; NSPR error - %d\n", PR_GetError ()); } else { /* need a little time for the thread to get started */ @@ -6743,7 +7096,7 @@ void trigger_cl_trimming(ReplicaId rid){ } void -trigger_cl_trimming_thread(void *arg){ +trigger_cl_purging_thread(void *arg){ ReplicaId rid = *(ReplicaId *)arg; /* make sure we have a change log, and we aren't closing it */ @@ -6752,7 +7105,7 @@ trigger_cl_trimming_thread(void *arg){ } if (CL5_SUCCESS != _cl5AddThread()) { slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, - "trigger_cl_trimming: failed to increment thread count " + "trigger_cl_purging: failed to increment thread count " "NSPR error - %d\n", PR_GetError ()); } _cl5DoTrimming(rid); diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h index ba9eb32..5dcc8e2 100644 --- a/ldap/servers/plugins/replication/cl5_api.h +++ b/ldap/servers/plugins/replication/cl5_api.h @@ -145,6 +145,9 @@ enum CL5_CSN_ERROR, /* CSN API failed */ CL5_RUV_ERROR, /* RUV API failed */ CL5_OBJSET_ERROR, /* namedobjset api failed */ + CL5_DB_LOCK_ERROR, /* bdb returns error 12 when the db runs out of locks, + this var needs to be in slot 12 of the list. + Do not re-order enum above! */ CL5_PURGED_DATA, /* requested data has been purged */ CL5_MISSING_DATA, /* data should be in the changelog, but is missing */ CL5_UNKNOWN_ERROR, /* unclassified error */ @@ -492,6 +495,6 @@ int cl5WriteRUV(); int cl5DeleteRUV(); void cl5CleanRUV(ReplicaId rid); void cl5NotifyCleanup(int rid); -void trigger_cl_trimming(ReplicaId rid); +void trigger_cl_purging(ReplicaId rid); #endif diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c index 1570ba7..974778c 100644 --- a/ldap/servers/plugins/replication/repl5_replica_config.c +++ b/ldap/servers/plugins/replication/repl5_replica_config.c @@ -1468,6 +1468,11 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not */ cl5CleanRUV(rid); + /* + * Now purge the changelog + */ + trigger_cl_purging(rid); + if (rc != RUV_SUCCESS){ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc); return LDAP_OPERATIONS_ERROR; @@ -1867,7 +1872,7 @@ replica_cleanallruv_thread(void *arg) /* no agmts, just clean this replica */ break; } - while (agmt_obj){ + while (agmt_obj && !slapi_is_shutting_down()){ agmt = (Repl_Agmt*)object_get_data (agmt_obj); if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); @@ -1947,13 +1952,15 @@ replica_cleanallruv_thread(void *arg) break; } /* - * need to sleep between passes + * Need to sleep between passes unless we are shutting down */ - cleanruv_log(data->task, CLEANALLRUV_ID, "Replicas have not been cleaned yet, " - "retrying in %d seconds", interval); - PR_Lock( notify_lock ); - PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); - PR_Unlock( notify_lock ); + if (!slapi_is_shutting_down()){ + cleanruv_log(data->task, CLEANALLRUV_ID, "Replicas have not been cleaned yet, " + "retrying in %d seconds", interval); + PR_Lock( notify_lock ); + PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); + PR_Unlock( notify_lock ); + } if(interval < 14400){ /* 4 hour max */ interval = interval * 2; @@ -1964,10 +1971,9 @@ replica_cleanallruv_thread(void *arg) done: /* - * If the replicas are cleaned, release the rid, and trim the changelog + * If the replicas are cleaned, release the rid */ if(!aborted){ - trigger_cl_trimming(data->rid); delete_cleaned_rid_config(data); /* make sure all the replicas have been "pre_cleaned" before finishing */ check_replicas_are_done_cleaning(data); @@ -1977,7 +1983,7 @@ done: /* * Shutdown or abort */ - if(!is_task_aborted(data->rid)){ + if(!is_task_aborted(data->rid) || slapi_is_shutting_down()){ cleanruv_log(data->task, CLEANALLRUV_ID,"Server shutting down. Process will resume at server startup"); } else { cleanruv_log(data->task, CLEANALLRUV_ID,"Task aborted for rid(%d).",data->rid); @@ -2212,7 +2218,7 @@ check_agmts_are_caught_up(cleanruv_data *data, char *maxcsn) not_all_caughtup = 0; break; } - while (agmt_obj){ + while (agmt_obj && !slapi_is_shutting_down()){ agmt = (Repl_Agmt*)object_get_data (agmt_obj); if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ agmt_obj = agmtlist_get_next_agreement_for_replica (data->replica, agmt_obj); @@ -2269,7 +2275,7 @@ check_agmts_are_alive(Replica *replica, ReplicaId rid, Slapi_Task *task) not_all_alive = 0; break; } - while (agmt_obj){ + while (agmt_obj && !slapi_is_shutting_down()){ agmt = (Repl_Agmt*)object_get_data (agmt_obj); if(!agmt_is_enabled(agmt) || get_agmt_agreement_type(agmt) == REPLICA_TYPE_WINDOWS){ agmt_obj = agmtlist_get_next_agreement_for_replica (replica, agmt_obj); @@ -3034,12 +3040,14 @@ replica_abort_task_thread(void *arg) break; } /* - * need to sleep between passes + * Need to sleep between passes. unless we are shutting down */ - cleanruv_log(data->task, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval); - PR_Lock( notify_lock ); - PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); - PR_Unlock( notify_lock ); + if (!slapi_is_shutting_down()){ + cleanruv_log(data->task, ABORT_CLEANALLRUV_ID,"Retrying in %d seconds",interval); + PR_Lock( notify_lock ); + PR_WaitCondVar( notify_cvar, PR_SecondsToInterval(interval) ); + PR_Unlock( notify_lock ); + } if(interval < 14400){ /* 4 hour max */ interval = interval * 2; @@ -3057,7 +3065,7 @@ done: * Wait for this server to stop its cleanallruv task(which removes the rid from the cleaned list) */ cleanruv_log(data->task, ABORT_CLEANALLRUV_ID, "Waiting for CleanAllRUV task to abort..."); - while(is_cleaned_rid(data->rid)){ + while(is_cleaned_rid(data->rid) && !slapi_is_shutting_down()){ DS_Sleep(PR_SecondsToInterval(1)); count++; if(count == 60){ /* it should not take this long */ -- 1.9.3