From 94377fba9dbcfc2fe47a32cc7cb85766813ad482 Mon Sep 17 00:00:00 2001 From: Mark Reynolds Date: Wed, 8 Jun 2016 13:06:46 -0400 Subject: [PATCH 98/99] Ticket 48636 - Improve replication convergence Bug Description: In a busy MMR environment where multiple masters are being updated at the same time the replica sessions stay open for a very long time. This causes other masters to wait to send their updates. This causes lop-sided convergence. Where entries added to the MMR environment, but on different masters, take a very different amount of time until they are each seen on all the replicas. Fix Description: A new configuratoin setting was added (nsds5ReplicaReleaseTimeout) to the replica configuration entry. So when replica A tries to acquire a replica B, replica B send a control back to the master(master C) that is updating replica B to abort the session. Master C will continue sending updates for the amount of time specified in the the "release timeout", then it will "yield" its current session so other replicas can acquire that replica. https://fedorahosted.org/389/ticket/48636 Reviewed by: lkrispen & nhosoi(Thanks!!) (cherry picked from commit a1545cdae48e4b4e1fc87a168e4d8f959626f112) (cherry picked from commit a085b0cd6b39fc85821777b7bcd2a8a2482a48bf) --- ldap/schema/01core389.ldif | 3 +- ldap/servers/plugins/replication/repl5.h | 14 ++- .../plugins/replication/repl5_inc_protocol.c | 102 ++++++++++++---- ldap/servers/plugins/replication/repl5_plugins.c | 3 +- ldap/servers/plugins/replication/repl5_replica.c | 135 +++++++++++++++++---- .../plugins/replication/repl5_replica_config.c | 22 ++++ ldap/servers/plugins/replication/repl_globals.c | 1 + 7 files changed, 229 insertions(+), 51 deletions(-) diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif index aebdb5a..14143ed 100644 --- a/ldap/schema/01core389.ldif +++ b/ldap/schema/01core389.ldif @@ -278,6 +278,7 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' attributeTypes: ( 2.16.840.1.113730.3.1.2313 NAME 'nsslapd-changelogtrim-interval' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) attributeTypes: ( 2.16.840.1.113730.3.1.2314 NAME 'nsslapd-changelogcompactdb-interval' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) attributeTypes: ( 2.16.840.1.113730.3.1.2315 NAME 'nsDS5ReplicaWaitForAsyncResults' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) +attributeTypes: ( 2.16.840.1.113730.3.1.2333 NAME 'nsds5ReplicaReleaseTimeout' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' ) # # objectclasses # @@ -287,7 +288,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.44 NAME 'nsIndex' DESC 'Netscape defined objectClasses: ( 2.16.840.1.113730.3.2.109 NAME 'nsBackendInstance' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top MUST ( CN ) X-ORIGIN 'Netscape Directory Server' ) -objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' ) +objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top MUST ( nsDS5ReplicaRoot $ nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax $ nsds5ReplicaReleaseTimeout ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause $ nsDS5ReplicaWaitForAsyncResults ) X-ORIGIN 'Netscape Directory Server' ) objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' ) diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index 307da82..6f6c81a 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -69,6 +69,10 @@ #define REPL_ABORT_CLEANRUV_OID "2.16.840.1.113730.3.6.6" #define REPL_CLEANRUV_GET_MAXCSN_OID "2.16.840.1.113730.3.6.7" #define REPL_CLEANRUV_CHECK_STATUS_OID "2.16.840.1.113730.3.6.8" +#define REPL_ABORT_SESSION_OID "2.16.840.1.113730.3.6.9" +#define SESSION_ACQUIRED 0 +#define ABORT_SESSION 1 +#define SESSION_ABORTED 2 #define CLEANRUV_ACCEPTED "accepted" #define CLEANRUV_REJECTED "rejected" @@ -141,6 +145,7 @@ extern const char *type_nsds5ReplicaStripAttrs; extern const char *type_nsds5ReplicaFlowControlWindow; extern const char *type_nsds5ReplicaFlowControlPause; extern const char *type_replicaProtocolTimeout; +extern const char *type_replicaReleaseTimeout; extern const char *type_replicaBackoffMin; extern const char *type_replicaBackoffMax; extern const char *type_replicaPrecisePurge; @@ -526,9 +531,9 @@ Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_ void replica_destroy(void **arg); int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid); int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid); -PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid, - const char *locking_purl, - char **current_purl); +PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, + int opid, const char *locking_purl, + char **current_purl); void replica_relinquish_exclusive_access(Replica *r, PRUint64 connid, int opid); PRBool replica_get_tombstone_reap_active(const Replica *r); const Slapi_DN *replica_get_root(const Replica *r); @@ -598,6 +603,8 @@ void replica_update_state (time_t when, void *arg); void replica_reset_csn_pl(Replica *r); PRUint64 replica_get_protocol_timeout(Replica *r); void replica_set_protocol_timeout(Replica *r, PRUint64 timeout); +PRUint64 replica_get_release_timeout(Replica *r); +void replica_set_release_timeout(Replica *r, PRUint64 timeout); void replica_set_groupdn_checkinterval(Replica *r, int timeout); PRUint64 replica_get_backoff_min(Replica *r); PRUint64 replica_get_backoff_max(Replica *r); @@ -609,6 +616,7 @@ void replica_decr_agmt_count(Replica *r); PRUint64 replica_get_precise_purging(Replica *r); void replica_set_precise_purging(Replica *r, PRUint64 on_off); PRBool ignore_error_and_keep_going(int error); +void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb); /* The functions below handles the state flag */ /* Current internal state flags */ diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c index 927f835..d6fb898 100644 --- a/ldap/servers/plugins/replication/repl5_inc_protocol.c +++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c @@ -36,6 +36,11 @@ Perhaps these events should be properties of the main protocol. #include "repl5_prot_private.h" #include "cl5_api.h" +#include "repl5.h" +#include "repl5_prot_private.h" +#include "cl5_api.h" +#include "slapi-plugin.h" + extern int slapi_log_urp; /*** from proto-slap.h ***/ @@ -82,6 +87,7 @@ typedef struct result_data int flowcontrol_detection; int result; /* The UPDATE_TRANSIENT_ERROR etc */ int WaitForAsyncResults; + time_t abort_time; } result_data; /* Various states the incremental protocol can pass through */ @@ -121,6 +127,7 @@ typedef struct result_data #define EXAMINE_RUV_PARAM_ERROR 405 #define MAX_CHANGES_PER_SESSION 10000 + /* * Maximum time to wait between replication sessions. If we * don't see any updates for a period equal to this interval, @@ -240,19 +247,21 @@ repl5_inc_result_threadmain(void *param) Repl_Connection *conn = rd->prp->conn; int finished = 0; int message_id = 0; + int yield_session = 0; slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain starting\n"); while (!finished) { + LDAPControl **returned_controls = NULL; repl5_inc_operation *op = NULL; - int connection_error = 0; + ReplicaId replica_id = 0; char *csn_str = NULL; char *uniqueid = NULL; - ReplicaId replica_id = 0; - int operation_code = 0; char *ldap_error_string = NULL; time_t time_now = 0; time_t start_time = time( NULL ); + int connection_error = 0; + int operation_code = 0; int backoff_time = 1; /* Read the next result */ @@ -264,7 +273,7 @@ repl5_inc_result_threadmain(void *param) while (!finished) { - conres = conn_read_result_ex(conn, NULL, NULL, NULL, LDAP_RES_ANY, &message_id, 0); + conres = conn_read_result_ex(conn, NULL, NULL, &returned_controls, LDAP_RES_ANY, &message_id, 0); slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: read result for message_id %d\n", message_id); /* Timeout here means that we didn't block, not a real timeout */ if (CONN_TIMEOUT == conres) @@ -292,9 +301,19 @@ repl5_inc_result_threadmain(void *param) finished = 1; } PR_Unlock(rd->lock); - } else - { - /* Something other than a timeout, so we exit the loop */ + } else { + /* + * Something other than a timeout, so we exit the loop. + * First check if we were told to abort the session + */; + Replica *r = (Replica*)object_get_data(rd->prp->replica_object); + if (replica_get_release_timeout(r) && + slapi_control_present(returned_controls, + REPL_ABORT_SESSION_OID, + NULL, NULL)) + { + yield_session = 1; + } break; } } @@ -318,21 +337,29 @@ repl5_inc_result_threadmain(void *param) } conn_get_error_ex(conn, &operation_code, &connection_error, &ldap_error_string); - slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n", operation_code,connection_error,conres,message_id,ldap_error_string); - return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, csn_str, uniqueid, replica_id, &should_finish, &(rd->num_changes_sent)); + slapi_log_error(SLAPI_LOG_REPL, NULL, + "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n", + operation_code,connection_error,conres,message_id,ldap_error_string); + return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, + csn_str, uniqueid, replica_id, &should_finish, + &(rd->num_changes_sent)); if (return_value || should_finish) { - slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: got op result %d should finish %d\n", return_value, should_finish); + slapi_log_error(SLAPI_LOG_REPL, NULL, + "repl5_inc_result_threadmain: got op result %d should finish %d\n", + return_value, should_finish); /* If so then we need to take steps to abort the update process */ PR_Lock(rd->lock); rd->result = return_value; - rd->abort = 1; + rd->abort = ABORT_SESSION; PR_Unlock(rd->lock); - /* We also need to log the error, including details stored from when the operation was sent */ - /* we cannot finish yet - we still need to waitfor the pending results, then - the main repl code will shut down this thread */ - /* we can finish if we have disconnected - in that case, there will be nothing - to read */ + /* + * We also need to log the error, including details stored from + * when the operation was sent. We cannot finish yet - we still + * need to wait for the pending results, then the main repl code + * will shut down this thread. We can finish if we have + * disconnected - in that case, there will be nothing to read + */ if (return_value == UPDATE_CONNECTION_LOST) { finished = 1; } @@ -341,8 +368,16 @@ repl5_inc_result_threadmain(void *param) rd->result = return_value; } } + /* Should we stop ? */ PR_Lock(rd->lock); + if (!finished && yield_session && rd->abort != SESSION_ABORTED && rd->abort_time == 0) { + rd->abort_time = time( NULL ); + rd->abort = SESSION_ABORTED; /* only set the abort time once */ + slapi_log_error(SLAPI_LOG_REPL, "repl5_inc_result_threadmain", + "Abort control detected, setting abort time...(%s)\n", + agmt_get_long_name(rd->prp->agmt)); + } if (rd->stop_result_thread) { finished = 1; @@ -468,7 +503,8 @@ repl5_inc_waitfor_async_results(result_data *rd) if (rd->last_message_id_received >= rd->last_message_id_sent) { /* If so then we're done */ done = 1; - } else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) { + } else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) + { done = 1; /* no connection == no more results */ } /* @@ -846,10 +882,10 @@ repl5_inc_run(Private_Repl_Protocol *prp) if (!busywaittime){ busywaittime = repl5_get_backoff_min(prp); } - prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime, busywaittime); + prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime , busywaittime); } else { prp_priv->backoff = backoff_new(BACKOFF_EXPONENTIAL, repl5_get_backoff_min(prp), - repl5_get_backoff_max(prp)); + repl5_get_backoff_max(prp)); } next_state = STATE_BACKOFF; backoff_reset(prp_priv->backoff, repl5_inc_backoff_expired, (void *)prp); @@ -1055,6 +1091,7 @@ repl5_inc_run(Private_Repl_Protocol *prp) } else if (rc == UPDATE_YIELD){ dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_YIELD -> STATE_BACKOFF_START"); agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update succeeded and yielded"); + use_busy_backoff_timer = PR_TRUE; next_state = STATE_BACKOFF_START; } else if (rc == UPDATE_TRANSIENT_ERROR){ dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_TRANSIENT_ERROR -> STATE_BACKOFF_START"); @@ -1099,6 +1136,7 @@ repl5_inc_run(Private_Repl_Protocol *prp) ruv_destroy(&ruv); ruv = NULL; } agmt_update_done(prp->agmt, 0); + /* If timed out, close the connection after released the replica */ release_replica(prp); if (rc == UPDATE_TIMEOUT) { @@ -1681,12 +1719,14 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu } else { - int finished = 0; ConnResult replay_crc; - char csn_str[CSN_STRSIZE]; + Replica *replica = (Replica*) object_get_data(prp->replica_object); PRBool subentry_update_needed = PR_FALSE; + PRUint64 release_timeout = replica_get_release_timeout(replica); + char csn_str[CSN_STRSIZE]; int skipped_updates = 0; int fractional_repl; + int finished = 0; #define FRACTIONAL_SKIPPED_THRESHOLD 100 /* Start the results reading thread */ @@ -1906,7 +1946,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu } PR_Lock(rd->lock); /* See if the result thread has hit a problem */ - if (!finished && rd->abort) + + if(!finished && rd->abort_time){ + time_t current_time = time ( NULL ); + if ((current_time - rd->abort_time) >= release_timeout){ + rd->result = UPDATE_YIELD; + return_value = UPDATE_YIELD; + finished = 1; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "Aborting send_updates...(%s)\n", + agmt_get_long_name(rd->prp->agmt)); + } + } + + if (!finished && rd->abort == ABORT_SESSION) { return_value = rd->result; finished = 1; @@ -1916,10 +1969,9 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu if (fractional_repl && subentry_update_needed) { - Replica *replica; ReplicaId rid = -1; /* Used to create the replica keep alive subentry */ Slapi_DN *replarea_sdn = NULL; - replica = (Replica*) object_get_data(prp->replica_object); + if (replica) { rid = replica_get_rid(replica); @@ -1945,7 +1997,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu * If we already have an error, there is no need to check the * async result thread anymore. */ - if (return_value == UPDATE_NO_MORE_UPDATES) + if (return_value == UPDATE_NO_MORE_UPDATES || return_value == UPDATE_YIELD) { /* * We need to double check that an error hasn't popped up from diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c index bb43b9b..9f38d05 100644 --- a/ldap/servers/plugins/replication/repl5_plugins.c +++ b/ldap/servers/plugins/replication/repl5_plugins.c @@ -1077,6 +1077,8 @@ write_changelog_and_ruv (Slapi_PBlock *pb) r = (Replica*)object_get_data (repl_obj); PR_ASSERT (r); + replica_check_release_timeout(r, pb); + if (replica_is_flag_set (r, REPLICA_LOG_CHANGES) && (cl5GetState () == CL5_STATE_OPEN)) { @@ -1365,7 +1367,6 @@ process_postop (Slapi_PBlock *pb) return rc; } - /* * Cancel an operation CSN. This removes it from any CSN pending lists. * This function is called when a previously-generated CSN will not diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c index c7cf25f..6d2452a 100644 --- a/ldap/servers/plugins/replication/repl5_replica.c +++ b/ldap/servers/plugins/replication/repl5_replica.c @@ -23,8 +23,8 @@ #define RUV_SAVE_INTERVAL (30 * 1000) /* 30 seconds */ -#define REPLICA_RDN "cn=replica" -#define CHANGELOG_RDN "cn=legacy changelog" +#define REPLICA_RDN "cn=replica" +#define CHANGELOG_RDN "cn=legacy changelog" /* * A replica is a locally-held copy of a portion of the DIT. @@ -68,6 +68,8 @@ struct replica { Slapi_Counter *backoff_max; /* backoff retry maximum */ Slapi_Counter *precise_purging; /* Enable precise tombstone purging */ PRUint64 agmt_count; /* Number of agmts */ + Slapi_Counter *release_timeout; /* The amount of time to wait before releasing active replica */ + PRUint64 abort_session; /* Abort the current replica session */ }; @@ -201,6 +203,7 @@ replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation /* init the slapi_counter/atomic settings */ r->protocol_timeout = slapi_counter_new(); + r->release_timeout = slapi_counter_new(); r->backoff_min = slapi_counter_new(); r->backoff_max = slapi_counter_new(); @@ -408,6 +411,7 @@ replica_destroy(void **arg) } slapi_counter_destroy(&r->protocol_timeout); + slapi_counter_destroy(&r->release_timeout); slapi_counter_destroy(&r->backoff_min); slapi_counter_destroy(&r->backoff_max); @@ -585,8 +589,7 @@ replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) */ PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid, - const char *locking_purl, - char **current_purl) + const char *locking_purl, char **current_purl) { PRBool rval = PR_TRUE; @@ -609,6 +612,15 @@ replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opi { *current_purl = slapi_ch_strdup(r->locking_purl); } + if (!(r->repl_state_flags & REPLICA_TOTAL_IN_PROGRESS) && + replica_get_release_timeout(r)) + { + /* + * We are not doing a total update, so abort the current session + * so other replicas can acquire this server. + */ + r->abort_session = ABORT_SESSION; + } } else { @@ -617,14 +629,17 @@ replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opi connid, opid, slapi_sdn_get_dn(r->repl_root)); r->repl_state_flags |= REPLICA_IN_USE; + r->abort_session = SESSION_ACQUIRED; if (isInc && *isInc) { r->repl_state_flags |= REPLICA_INCREMENTAL_IN_PROGRESS; } else { - /* if connid or opid != 0, it's a total update */ - /* Both set to 0 means we're disabling replication */ + /* + * If connid or opid != 0, it's a total update. + * Both set to 0 means we're disabling replication + */ if (connid || opid) { r->repl_state_flags |= REPLICA_TOTAL_IN_PROGRESS; @@ -652,13 +667,13 @@ replica_relinquish_exclusive_access(Replica *r, PRUint64 connid, int opid) /* check to see if the replica is in use and log a warning if not */ if (!(r->repl_state_flags & REPLICA_IN_USE)) { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "conn=%" NSPRIu64 " op=%d repl=\"%s\": " "Replica not in use\n", connid, opid, slapi_sdn_get_dn(r->repl_root)); } else { - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "conn=%" NSPRIu64 " op=%d repl=\"%s\": " "Released replica held by locking_purl=%s\n", connid, opid, @@ -970,6 +985,24 @@ replica_get_protocol_timeout(Replica *r) } } +PRUint64 +replica_get_release_timeout(Replica *r) +{ + if(r){ + return slapi_counter_get_value(r->release_timeout); + } else { + return 0; + } +} + +void +replica_set_release_timeout(Replica *r, PRUint64 limit) +{ + if(r){ + slapi_counter_set_value(r->release_timeout, limit); + } +} + void replica_set_protocol_timeout(Replica *r, PRUint64 timeout) { @@ -977,6 +1010,7 @@ replica_set_protocol_timeout(Replica *r, PRUint64 timeout) slapi_counter_set_value(r->protocol_timeout, timeout); } } + void replica_set_groupdn_checkinterval(Replica *r, int interval) { @@ -1064,11 +1098,7 @@ replica_get_legacy_purl (const Replica *r) char *purl; replica_lock(r->repl_lock); - - PR_ASSERT (r->legacy_consumer); - purl = slapi_ch_strdup(r->legacy_purl); - replica_unlock(r->repl_lock); return purl; @@ -1924,6 +1954,7 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext) int backoff_min; int backoff_max; int ptimeout = 0; + int release_timeout = 0; int rc; PR_ASSERT (r && e); @@ -2008,6 +2039,14 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext) slapi_counter_set_value(r->protocol_timeout, ptimeout); } + /* Get the release timeout */ + release_timeout = slapi_entry_attr_get_int(e, type_replicaReleaseTimeout); + if(release_timeout <= 0){ + slapi_counter_set_value(r->release_timeout, 0); + } else { + slapi_counter_set_value(r->release_timeout, release_timeout); + } + /* check for precise tombstone purging */ precise_purging = slapi_entry_attr_get_charptr(e, type_replicaPrecisePurge); if(precise_purging){ @@ -4029,21 +4068,21 @@ replica_disable_replication (Replica *r, Object *r_obj) ruv_get_first_id_and_purl(repl_ruv, &junkrid, &p_locking_purl); locking_purl = slapi_ch_strdup(p_locking_purl); p_locking_purl = NULL; - repl_ruv = NULL; - while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication", + repl_ruv = NULL; + while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication", ¤t_purl)) { - if (!isInc) /* already locked, but not by inc update - break */ - break; - isInc = PR_FALSE; - slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + if (!isInc) /* already locked, but not by inc update - break */ + break; + isInc = PR_FALSE; + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "replica_disable_replication: " "replica %s is already locked by (%s) for incoming " "incremental update; sleeping 100ms\n", - slapi_sdn_get_ndn (replica_get_root (r)), + slapi_sdn_get_ndn (replica_get_root (r)), current_purl ? current_purl : "unknown"); slapi_ch_free_string(¤t_purl); - DS_Sleep(PR_MillisecondsToInterval(100)); - } + DS_Sleep(PR_MillisecondsToInterval(100)); + } slapi_ch_free_string(¤t_purl); slapi_ch_free_string(&locking_purl); @@ -4281,3 +4320,57 @@ replica_decr_agmt_count(Replica *r) } } } + +/* + * Add the "Abort Replication Session" control to the pblock + */ +static void +replica_add_session_abort_control(Slapi_PBlock *pb) +{ + LDAPControl ctrl = {0}; + BerElement *ber; + struct berval *bvp; + int rc; + + /* Build the BER payload */ + if ( (ber = der_alloc()) == NULL ) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "add_session_abort_control: Failed to create ber\n"); + return; + } + rc = ber_printf( ber, "{}"); + if (rc != -1) { + rc = ber_flatten( ber, &bvp ); + } + ber_free( ber, 1 ); + if ( rc == -1 ) { + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "add_session_abort_control: Failed to flatten ber\n"); + return; + } + + ctrl.ldctl_oid = slapi_ch_strdup( REPL_ABORT_SESSION_OID ); + ctrl.ldctl_value = *bvp; + bvp->bv_val = NULL; + ber_bvfree( bvp ); + slapi_pblock_set(pb, SLAPI_ADD_RESCONTROL, &ctrl); + + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, + "add_session_abort_control: abort control successfully added to result\n"); +} + +/* + * Check if we have exceeded the failed replica acquire limit, + * if so, end the replication session. + */ +void +replica_check_release_timeout(Replica *r, Slapi_PBlock *pb) +{ + replica_lock(r->repl_lock); + if(r->abort_session == ABORT_SESSION){ + /* Need to abort this session (just send the control once) */ + replica_add_session_abort_control(pb); + r->abort_session = SESSION_ABORTED; + } + replica_unlock(r->repl_lock); +} diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c index 4d7135c..71b3c92 100644 --- a/ldap/servers/plugins/replication/repl5_replica_config.c +++ b/ldap/servers/plugins/replication/repl5_replica_config.c @@ -406,6 +406,11 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* if (apply_mods) replica_set_precise_purging(r, 0); } + else if (strcasecmp (config_attr, type_replicaReleaseTimeout) == 0 ) + { + if (apply_mods) + replica_set_release_timeout(r, 0); + } else { *returncode = LDAP_UNWILLING_TO_PERFORM; @@ -592,6 +597,23 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry* } } } + else if (strcasecmp (config_attr, type_replicaReleaseTimeout) == 0 ) + { + if (apply_mods) + { + PRUint64 val = atoll(config_attr_value); + + if(val < 0){ + *returncode = LDAP_UNWILLING_TO_PERFORM; + PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE, + "attribute %s value (%s) is invalid, must be a number zero or greater.\n", + config_attr, config_attr_value); + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", errortext); + break; + } + replica_set_release_timeout(r, val); + } + } else { *returncode = LDAP_UNWILLING_TO_PERFORM; diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c index 331f839..8b891fb 100644 --- a/ldap/servers/plugins/replication/repl_globals.c +++ b/ldap/servers/plugins/replication/repl_globals.c @@ -87,6 +87,7 @@ const char *type_ruvElementUpdatetime = "nsruvReplicaLastModified"; const char *type_replicaCleanRUV = "nsds5ReplicaCleanRUV"; const char *type_replicaAbortCleanRUV = "nsds5ReplicaAbortCleanRUV"; const char *type_replicaProtocolTimeout = "nsds5ReplicaProtocolTimeout"; +const char *type_replicaReleaseTimeout = "nsds5ReplicaReleaseTimeout"; const char *type_replicaBackoffMin = "nsds5ReplicaBackoffMin"; const char *type_replicaBackoffMax = "nsds5ReplicaBackoffMax"; const char *type_replicaPrecisePurge = "nsds5ReplicaPreciseTombstonePurging"; -- 2.4.11