From 9ba2b33ff1741cb09b40fd2f36b8656704b88a18 Mon Sep 17 00:00:00 2001 From: Mark Reynolds Date: Tue, 23 Aug 2016 12:06:30 -0400 Subject: [PATCH 401/404] Ticket 48964 - cleanAllRUV changelog purging incorrectly processes all backends Bug Description: When the changelog was being purged of "cleaned" rids it was checking all the backend changelogs, and not the one from which the cleanAllRUV task originated from. This could corrupt a different backend's changelog if both backends used the same RID. Fix Description: Purge the changelog associated with the backend that is specified in the cleanAllRUV task. Also moved the "purging" to its own function, and fixed a few compiler warnings. https://fedorahosted.org/389/ticket/48965 Reviewed by: nhosoi(Thanks!) (cherry picked from commit fda00435a7536c1ded72bb78a975f3370d09a3be) (cherry picked from commit 0d5afb2bbd6d639ec59e6bdf075bd61d3d72ef79) --- ldap/servers/plugins/replication/cl5_api.c | 163 +++++++++++++-------- ldap/servers/plugins/replication/cl5_api.h | 2 +- .../plugins/replication/repl5_replica_config.c | 2 +- 3 files changed, 106 insertions(+), 61 deletions(-) diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c index 259c054..d91574f 100644 --- a/ldap/servers/plugins/replication/cl5_api.c +++ b/ldap/servers/plugins/replication/cl5_api.c @@ -344,10 +344,9 @@ static int _cl5CheckMissingCSN (const CSN *minCsn, const RUV *supplierRUV, CL5DB static int _cl5TrimInit (); static void _cl5TrimCleanup (); static int _cl5TrimMain (void *param); -static void _cl5DoTrimming (ReplicaId rid); +static void _cl5DoTrimming (); static PRBool _cl5CanTrim (time_t time, long *numToTrim); static void _cl5TrimFile (Object *obj, long *numToTrim); - static void _cl5PurgeRID(Object *obj, ReplicaId cleaned_rid); static int _cl5PurgeGetFirstEntry (Object *obj, CL5Entry *entry, void **iterator, DB_TXN *txnid, int rid, DBT *key); static int _cl5PurgeGetNextEntry (CL5Entry *entry, void *iterator, DBT *key); @@ -3445,43 +3444,37 @@ static int _cl5TrimMain (void *param) return 0; } -/* We remove an entry if it has been replayed to all consumers and - and the number of entries in the changelog is larger than maxEntries - or age of the entry is larger than maxAge. - Also we can't purge entries which correspond to max csns in the - supplier's ruv. Here is a example where we can get into trouble: - The server is setup with time based trimming and no consumer's - At some point all the entries are trimmed from the changelog. - At a later point a consumer is added and initialized online - Then a change is made on the supplier. - To update the consumer, the supplier would attempt to locate - the last change sent to the consumer in the changelog and will - fail because the change was removed. - +/* + * We remove an entry if it has been replayed to all consumers and the number + * of entries in the changelog is larger than maxEntries or age of the entry + * is larger than maxAge. Also we can't purge entries which correspond to max + * csns in the supplier's ruv. Here is a example where we can get into trouble: + * + * The server is setup with time based trimming and no consumer's + * At some point all the entries are trimmed from the changelog. + * At a later point a consumer is added and initialized online. + * Then a change is made on the supplier. + * To update the consumer, the supplier would attempt to locate the last + * change sent to the consumer in the changelog and will fail because the + * change was removed. */ - -static void _cl5DoTrimming (ReplicaId rid) +static void _cl5DoTrimming () { Object *obj; long numToTrim; PR_Lock (s_cl5Desc.dbTrim.lock); - /* ONREPL We trim file by file which means that some files will be - trimmed more often than other. We might have to fix that by, for - example, randomizing starting point */ + /* + * We are trimming all the changelogs. We trim file by file which + * means that some files will be trimmed more often than other. We + * might have to fix that by, for example, randomizing the starting + * point. + */ obj = objset_first_obj (s_cl5Desc.dbFiles); - while (obj && (_cl5CanTrim ((time_t)0, &numToTrim) || rid)) + while (obj && _cl5CanTrim ((time_t)0, &numToTrim)) { - if (rid){ - /* - * We are cleaning an invalid rid, and need to strip it - * from the changelog. - */ - _cl5PurgeRID (obj, rid); - } else { - _cl5TrimFile (obj, &numToTrim); - } + _cl5TrimFile (obj, &numToTrim); obj = objset_next_obj (s_cl5Desc.dbFiles, obj); } @@ -3494,6 +3487,43 @@ static void _cl5DoTrimming (ReplicaId rid) } /* + * We are purging a changelog after a cleanAllRUV task. Find the specific + * changelog for the backend that is being cleaned, and purge all the records + * with the cleaned rid. + */ +static void _cl5DoPurging (Replica *replica) +{ + ReplicaId rid = replica_get_rid(replica); + const Slapi_DN *sdn = replica_get_root(replica); + const char *replName = replica_get_name(replica); + char *replGen = replica_get_generation(replica); + char *fileName; + Object *obj; + + PR_Lock (s_cl5Desc.dbTrim.lock); + fileName = _cl5MakeFileName (replName, replGen); + obj = objset_find(s_cl5Desc.dbFiles, _cl5CompareDBFile, fileName); + if (obj) { + /* We found our changelog, now purge it */ + _cl5PurgeRID (obj, rid); + object_release (obj); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "Purged rid (%d) from suffix (%s)\n", + rid, slapi_sdn_get_dn(sdn)); + } else { + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, + "Purge rid (%d) failed to find changelog file (%s) for suffix (%s)\n", + rid, fileName, slapi_sdn_get_dn(sdn)); + } + PR_Unlock (s_cl5Desc.dbTrim.lock); + + slapi_ch_free_string(&replGen); + slapi_ch_free_string(&fileName); + + return; +} + +/* * If the rid is not set it is the very first iteration of the changelog. * If the rid is set, we are doing another pass, and we have a key as our * starting point. @@ -4005,23 +4035,25 @@ static PRBool _cl5CanTrim (time_t time, long *numToTrim) { *numToTrim = 0; - if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) + if (s_cl5Desc.dbTrim.maxAge == 0 && s_cl5Desc.dbTrim.maxEntries == 0) { return PR_FALSE; - + } if (s_cl5Desc.dbTrim.maxAge == 0) { *numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries; return ( *numToTrim > 0 ); } - if (s_cl5Desc.dbTrim.maxEntries > 0 && - (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) - return PR_TRUE; + if (s_cl5Desc.dbTrim.maxEntries > 0 && + (*numToTrim = cl5GetOperationCount (NULL) - s_cl5Desc.dbTrim.maxEntries) > 0) { + return PR_TRUE; + } - if (time) + if (time) { return (current_time () - time > s_cl5Desc.dbTrim.maxAge); - else - return PR_TRUE; + } else { + return PR_TRUE; + } } static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) @@ -4034,7 +4066,6 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) char *pos; char *agmt_name; - PR_ASSERT (replGen && obj); file = (CL5DBFile*)object_get_data (obj); @@ -4042,13 +4073,12 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) agmt_name = get_thread_private_agmtname(); - if (purge) /* read purge vector entry */ - key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr); - else /* read upper bound vector */ - key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr); - + if (purge) { /* read purge vector entry */ + key.data = _cl5GetHelperEntryKey (PURGE_RUV_TIME, csnStr); + } else { /* read upper bound vector */ + key.data = _cl5GetHelperEntryKey (MAX_RUV_TIME, csnStr); + } key.size = CSN_STRSIZE; - data.flags = DB_DBT_MALLOC; rc = file->db->get(file->db, NULL/*txn*/, &key, &data, 0); @@ -4058,13 +4088,13 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) rc = _cl5ReadBervals (&vals, &pos, data.size); slapi_ch_free (&(data.data)); if (rc != CL5_SUCCESS) - goto done; + goto done; - if (purge) + if (purge) { rc = ruv_init_from_bervals(vals, &file->purgeRUV); - else + } else { rc = ruv_init_from_bervals(vals, &file->maxRUV); - + } if (rc != RUV_SUCCESS) { slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, @@ -4072,7 +4102,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) "RUV error %d\n", agmt_name, purge? "purge" : "upper bound", rc); rc = CL5_RUV_ERROR; - goto done; + goto done; } /* delete the entry; it is re-added when file @@ -4084,7 +4114,7 @@ static int _cl5ReadRUV (const char *replGen, Object *obj, PRBool purge) case DB_NOTFOUND: /* RUV is lost - need to construct */ rc = _cl5ConstructRUV (replGen, obj, purge); - goto done; + goto done; default: slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, "%s: _cl5ReadRUV: failed to get purge RUV; " @@ -6858,12 +6888,14 @@ cl5CleanRUV(ReplicaId rid){ slapi_rwlock_unlock (s_cl5Desc.stLock); } -void trigger_cl_purging(ReplicaId rid){ +/* + * Create a thread to purge a changelog of cleaned RIDs + */ +void trigger_cl_purging(Replica *replica){ PRThread *trim_tid = NULL; - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, "trigger_cl_purging: rid (%d)\n",(int)rid); trim_tid = PR_CreateThread(PR_USER_THREAD, (VFP)(void*)trigger_cl_purging_thread, - (void *)&rid, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, + (void *)replica, PR_PRIORITY_NORMAL, PR_GLOBAL_THREAD, PR_UNJOINABLE_THREAD, DEFAULT_THREAD_STACKSIZE); if (NULL == trim_tid){ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, @@ -6875,19 +6907,32 @@ void trigger_cl_purging(ReplicaId rid){ } } +/* + * Purge a changelog of entries that originated from a particular replica(rid) + */ void trigger_cl_purging_thread(void *arg){ - ReplicaId rid = *(ReplicaId *)arg; + Replica *replica = (Replica *)arg; - /* make sure we have a change log, and we aren't closing it */ - if(s_cl5Desc.dbState == CL5_STATE_CLOSED || s_cl5Desc.dbState == CL5_STATE_CLOSING){ + /* Make sure we have a change log, and we aren't closing it */ + if (replica == NULL || + s_cl5Desc.dbState == CL5_STATE_CLOSED || + s_cl5Desc.dbState == CL5_STATE_CLOSING) { return; } + + /* Bump the changelog thread count */ if (CL5_SUCCESS != _cl5AddThread()) { slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, - "trigger_cl_purging: failed to increment thread count " + "trigger_cl_purging: Abort - failed to increment thread count " "NSPR error - %d\n", PR_GetError ()); + return; } - _cl5DoTrimming(rid); + + /* Purge the changelog */ + _cl5DoPurging(replica); _cl5RemoveThread(); + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, + "trigger_cl_purging: purged changelog for (%s) rid (%d)\n", + slapi_sdn_get_dn(replica_get_root(replica)), replica_get_rid(replica)); } diff --git a/ldap/servers/plugins/replication/cl5_api.h b/ldap/servers/plugins/replication/cl5_api.h index b46a691..83683f5 100644 --- a/ldap/servers/plugins/replication/cl5_api.h +++ b/ldap/servers/plugins/replication/cl5_api.h @@ -493,6 +493,6 @@ int cl5WriteRUV(); int cl5DeleteRUV(); void cl5CleanRUV(ReplicaId rid); void cl5NotifyCleanup(int rid); -void trigger_cl_purging(ReplicaId rid); +void trigger_cl_purging(Replica *replica); #endif diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c index a1dceae..010e517 100644 --- a/ldap/servers/plugins/replication/repl5_replica_config.c +++ b/ldap/servers/plugins/replication/repl5_replica_config.c @@ -1244,7 +1244,7 @@ replica_execute_cleanruv_task (Object *r, ReplicaId rid, char *returntext /* not /* * Now purge the changelog */ - trigger_cl_purging(rid); + trigger_cl_purging(replica); if (rc != RUV_SUCCESS){ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "cleanruv_task: task failed(%d)\n",rc); -- 2.4.11