amoralej / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 years ago
Clone
Blob Blame History Raw
From 94377fba9dbcfc2fe47a32cc7cb85766813ad482 Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Wed, 8 Jun 2016 13:06:46 -0400
Subject: [PATCH 98/99] Ticket 48636 - Improve replication convergence

Bug Description:  In a busy MMR environment where multiple masters are being
                  updated at the same time, the replica sessions stay open for
                  a very long time.  This causes other masters to wait to send
                  their updates, which results in lopsided convergence:
                  entries added to the MMR environment on different masters
                  take very different amounts of time to be seen on all the
                  replicas.

Fix Description:  A new configuration setting was added (nsds5ReplicaReleaseTimeout)
                  to the replica configuration entry.  So when replica A tries
                  to acquire a replica B, replica B sends a control back to the
                  master (master C) that is updating replica B to abort the session.
                  Master C will continue sending updates for the amount of time
                  specified in the "release timeout", then it will "yield" its
                  current session so other replicas can acquire that replica.

https://fedorahosted.org/389/ticket/48636

Reviewed by: lkrispen & nhosoi(Thanks!!)

(cherry picked from commit a1545cdae48e4b4e1fc87a168e4d8f959626f112)
(cherry picked from commit a085b0cd6b39fc85821777b7bcd2a8a2482a48bf)
---
 ldap/schema/01core389.ldif                         |   3 +-
 ldap/servers/plugins/replication/repl5.h           |  14 ++-
 .../plugins/replication/repl5_inc_protocol.c       | 102 ++++++++++++----
 ldap/servers/plugins/replication/repl5_plugins.c   |   3 +-
 ldap/servers/plugins/replication/repl5_replica.c   | 135 +++++++++++++++++----
 .../plugins/replication/repl5_replica_config.c     |  22 ++++
 ldap/servers/plugins/replication/repl_globals.c    |   1 +
 7 files changed, 229 insertions(+), 51 deletions(-)

diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
index aebdb5a..14143ed 100644
--- a/ldap/schema/01core389.ldif
+++ b/ldap/schema/01core389.ldif
@@ -278,6 +278,7 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause'
 attributeTypes: ( 2.16.840.1.113730.3.1.2313 NAME 'nsslapd-changelogtrim-interval' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
 attributeTypes: ( 2.16.840.1.113730.3.1.2314 NAME 'nsslapd-changelogcompactdb-interval' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
 attributeTypes: ( 2.16.840.1.113730.3.1.2315 NAME 'nsDS5ReplicaWaitForAsyncResults' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2333 NAME 'nsds5ReplicaReleaseTimeout' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
 #
 # objectclasses
 #
@@ -287,7 +288,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.44 NAME 'nsIndex' DESC 'Netscape defined
 objectClasses: ( 2.16.840.1.113730.3.2.109 NAME 'nsBackendInstance' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
-objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top  MUST ( nsDS5ReplicaRoot $  nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
+objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top  MUST ( nsDS5ReplicaRoot $  nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax $ nsds5ReplicaReleaseTimeout ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause $ nsDS5ReplicaWaitForAsyncResults ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 307da82..6f6c81a 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -69,6 +69,10 @@
 #define REPL_ABORT_CLEANRUV_OID "2.16.840.1.113730.3.6.6"
 #define REPL_CLEANRUV_GET_MAXCSN_OID "2.16.840.1.113730.3.6.7"
 #define REPL_CLEANRUV_CHECK_STATUS_OID "2.16.840.1.113730.3.6.8"
+#define REPL_ABORT_SESSION_OID "2.16.840.1.113730.3.6.9"
+#define SESSION_ACQUIRED 0
+#define ABORT_SESSION 1
+#define SESSION_ABORTED 2
 
 #define CLEANRUV_ACCEPTED "accepted"
 #define CLEANRUV_REJECTED "rejected"
@@ -141,6 +145,7 @@ extern const char *type_nsds5ReplicaStripAttrs;
 extern const char *type_nsds5ReplicaFlowControlWindow;
 extern const char *type_nsds5ReplicaFlowControlPause;
 extern const char *type_replicaProtocolTimeout;
+extern const char *type_replicaReleaseTimeout;
 extern const char *type_replicaBackoffMin;
 extern const char *type_replicaBackoffMax;
 extern const char *type_replicaPrecisePurge;
@@ -526,9 +531,9 @@ Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_
 void replica_destroy(void **arg);
 int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid);
 int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid);
-PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
-									const char *locking_purl,
-									char **current_purl);
+PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid,
+                                    int opid, const char *locking_purl,
+                                    char **current_purl);
 void replica_relinquish_exclusive_access(Replica *r, PRUint64 connid, int opid);
 PRBool replica_get_tombstone_reap_active(const Replica *r);
 const Slapi_DN *replica_get_root(const Replica *r);
@@ -598,6 +603,8 @@ void replica_update_state (time_t when, void *arg);
 void replica_reset_csn_pl(Replica *r);
 PRUint64 replica_get_protocol_timeout(Replica *r);
 void replica_set_protocol_timeout(Replica *r, PRUint64 timeout);
+PRUint64 replica_get_release_timeout(Replica *r);
+void replica_set_release_timeout(Replica *r, PRUint64 timeout);
 void replica_set_groupdn_checkinterval(Replica *r, int timeout);
 PRUint64 replica_get_backoff_min(Replica *r);
 PRUint64 replica_get_backoff_max(Replica *r);
@@ -609,6 +616,7 @@ void replica_decr_agmt_count(Replica *r);
 PRUint64 replica_get_precise_purging(Replica *r);
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
 PRBool ignore_error_and_keep_going(int error);
+void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
 
 /* The functions below handles the state flag */
 /* Current internal state flags */
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 927f835..d6fb898 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -36,6 +36,11 @@ Perhaps these events should be properties of the main protocol.
 #include "repl5_prot_private.h"
 #include "cl5_api.h"
 
+#include "repl5.h"
+#include "repl5_prot_private.h"
+#include "cl5_api.h"
+#include "slapi-plugin.h"
+
 extern int slapi_log_urp;
 
 /*** from proto-slap.h ***/
@@ -82,6 +87,7 @@ typedef struct result_data
 	int flowcontrol_detection;
 	int result; /* The UPDATE_TRANSIENT_ERROR etc */
 	int WaitForAsyncResults;
+	time_t abort_time;
 } result_data;
 
 /* Various states the incremental protocol can pass through */
@@ -121,6 +127,7 @@ typedef struct result_data
 #define EXAMINE_RUV_PARAM_ERROR 405
 
 #define MAX_CHANGES_PER_SESSION	10000
+
 /*
  * Maximum time to wait between replication sessions. If we
  * don't see any updates for a period equal to this interval,
@@ -240,19 +247,21 @@ repl5_inc_result_threadmain(void *param)
 	Repl_Connection *conn = rd->prp->conn;
 	int finished = 0;
 	int message_id = 0;
+	int yield_session = 0;
 
 	slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain starting\n");
 	while (!finished) 
 	{
+		LDAPControl **returned_controls = NULL;
 		repl5_inc_operation *op = NULL;
-		int connection_error = 0;
+		ReplicaId replica_id = 0;
 		char *csn_str = NULL; 
 		char *uniqueid = NULL;
-		ReplicaId replica_id = 0;
-		int operation_code = 0;
 		char *ldap_error_string = NULL;
 		time_t time_now = 0;
 		time_t start_time = time( NULL );
+		int connection_error = 0;
+		int operation_code = 0;
 		int backoff_time = 1;
 
 		/* Read the next result */
@@ -264,7 +273,7 @@ repl5_inc_result_threadmain(void *param)
 
 		while (!finished)
 		{
-			conres = conn_read_result_ex(conn, NULL, NULL, NULL, LDAP_RES_ANY, &message_id, 0);
+			conres = conn_read_result_ex(conn, NULL, NULL, &returned_controls, LDAP_RES_ANY, &message_id, 0);
 			slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: read result for message_id %d\n", message_id);
 			/* Timeout here means that we didn't block, not a real timeout */
 			if (CONN_TIMEOUT == conres)
@@ -292,9 +301,19 @@ repl5_inc_result_threadmain(void *param)
 					finished = 1;
 				}
 				PR_Unlock(rd->lock);
-			} else
-			{
-				/* Something other than a timeout, so we exit the loop */
+			} else {
+				/*
+				 * Something other than a timeout, so we exit the loop.
+				 * First check if we were told to abort the session
+				 */;
+				Replica *r = (Replica*)object_get_data(rd->prp->replica_object);
+				if (replica_get_release_timeout(r) &&
+				    slapi_control_present(returned_controls,
+				                          REPL_ABORT_SESSION_OID,
+				                          NULL, NULL))
+				{
+					yield_session = 1;
+				}
 				break;
 			}
 		}
@@ -318,21 +337,29 @@ repl5_inc_result_threadmain(void *param)
 			}
 
 			conn_get_error_ex(conn, &operation_code, &connection_error, &ldap_error_string);
-			slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n", operation_code,connection_error,conres,message_id,ldap_error_string);
-			return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, csn_str, uniqueid, replica_id, &should_finish, &(rd->num_changes_sent));
+			slapi_log_error(SLAPI_LOG_REPL, NULL,
+			                "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n",
+			                operation_code,connection_error,conres,message_id,ldap_error_string);
+			return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error,
+			                                               csn_str, uniqueid, replica_id, &should_finish,
+			                                               &(rd->num_changes_sent));
 			if (return_value || should_finish)
 			{
-				slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: got op result %d should finish %d\n", return_value, should_finish);
+				slapi_log_error(SLAPI_LOG_REPL, NULL,
+				                "repl5_inc_result_threadmain: got op result %d should finish %d\n",
+				                return_value, should_finish);
 				/* If so then we need to take steps to abort the update process */
 				PR_Lock(rd->lock);
 				rd->result = return_value;
-				rd->abort = 1;
+				rd->abort = ABORT_SESSION;
 				PR_Unlock(rd->lock);
-				/* We also need to log the error, including details stored from when the operation was sent */
-				/* we cannot finish yet - we still need to waitfor the pending results, then
-				   the main repl code will shut down this thread */
-				/* we can finish if we have disconnected - in that case, there will be nothing
-				   to read */
+				/*
+				 * We also need to log the error, including details stored from
+				 * when the operation was sent.  We cannot finish yet - we still
+				 * need to wait for the pending results, then the main repl code
+				 * will shut down this thread.  We can finish if we have
+				 * disconnected - in that case, there will be nothing to read
+				 */
 				if (return_value == UPDATE_CONNECTION_LOST) {
 					finished = 1;
 				}
@@ -341,8 +368,16 @@ repl5_inc_result_threadmain(void *param)
 				rd->result = return_value;
 			}
 		}
+
 		/* Should we stop ? */
 		PR_Lock(rd->lock);
+		if (!finished && yield_session && rd->abort != SESSION_ABORTED && rd->abort_time == 0) {
+			rd->abort_time = time( NULL );
+			rd->abort = SESSION_ABORTED;  /* only set the abort time once */
+			slapi_log_error(SLAPI_LOG_REPL, "repl5_inc_result_threadmain",
+			                "Abort control detected, setting abort time...(%s)\n",
+			                agmt_get_long_name(rd->prp->agmt));
+		}
 		if (rd->stop_result_thread) 
 		{
 			finished = 1;
@@ -468,7 +503,8 @@ repl5_inc_waitfor_async_results(result_data *rd)
 		if (rd->last_message_id_received >= rd->last_message_id_sent) {
 			/* If so then we're done */
 			done = 1;
-		} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) {
+		} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST))
+		{
 			done = 1; /* no connection == no more results */
 		}
 		/*
@@ -846,10 +882,10 @@ repl5_inc_run(Private_Repl_Protocol *prp)
                   if (!busywaittime){
                       busywaittime = repl5_get_backoff_min(prp);
                   }
-                  prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime, busywaittime);
+                  prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime , busywaittime);
               } else {
                   prp_priv->backoff = backoff_new(BACKOFF_EXPONENTIAL, repl5_get_backoff_min(prp),
-                		  repl5_get_backoff_max(prp));
+                                                  repl5_get_backoff_max(prp));
               }
               next_state = STATE_BACKOFF;
               backoff_reset(prp_priv->backoff, repl5_inc_backoff_expired, (void *)prp);
@@ -1055,6 +1091,7 @@ repl5_inc_run(Private_Repl_Protocol *prp)
                       } else if (rc == UPDATE_YIELD){
                           dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_YIELD -> STATE_BACKOFF_START");
                           agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update succeeded and yielded");
+                          use_busy_backoff_timer = PR_TRUE;
                           next_state = STATE_BACKOFF_START;
                       } else if (rc == UPDATE_TRANSIENT_ERROR){
                           dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_TRANSIENT_ERROR -> STATE_BACKOFF_START");
@@ -1099,6 +1136,7 @@ repl5_inc_run(Private_Repl_Protocol *prp)
               ruv_destroy(&ruv); ruv = NULL;
           }
           agmt_update_done(prp->agmt, 0);
+
           /* If timed out, close the connection after released the replica */
           release_replica(prp);
           if (rc == UPDATE_TIMEOUT) {
@@ -1681,12 +1719,14 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 	}
 	else
 	{
-		int finished = 0;
 		ConnResult replay_crc;
-		char csn_str[CSN_STRSIZE];
+		Replica *replica = (Replica*) object_get_data(prp->replica_object);
 		PRBool subentry_update_needed = PR_FALSE;
+		PRUint64 release_timeout = replica_get_release_timeout(replica);
+		char csn_str[CSN_STRSIZE];
 		int skipped_updates = 0;
 		int fractional_repl;
+		int finished = 0;
 #define FRACTIONAL_SKIPPED_THRESHOLD 100
 
 		/* Start the results reading thread */
@@ -1906,7 +1946,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 			}
 			PR_Lock(rd->lock);
 			/* See if the result thread has hit a problem */
-			if (!finished && rd->abort)
+
+			if(!finished && rd->abort_time){
+				time_t current_time = time ( NULL );
+				if ((current_time - rd->abort_time) >= release_timeout){
+					rd->result = UPDATE_YIELD;
+					return_value = UPDATE_YIELD;
+					finished = 1;
+					slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+					                "Aborting send_updates...(%s)\n",
+					                agmt_get_long_name(rd->prp->agmt));
+				}
+			}
+
+			if (!finished && rd->abort == ABORT_SESSION)
 			{
 				return_value = rd->result;
 				finished = 1;
@@ -1916,10 +1969,9 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 
 		if (fractional_repl && subentry_update_needed)
 		{
-			Replica *replica;
 			ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
 			Slapi_DN *replarea_sdn = NULL;
-			replica = (Replica*) object_get_data(prp->replica_object);
+
 			if (replica)
 			{
 				rid = replica_get_rid(replica);
@@ -1945,7 +1997,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 				 * If we already have an error, there is no need to check the
 				 * async result thread anymore.
 				 */
-				if (return_value == UPDATE_NO_MORE_UPDATES)
+				if (return_value == UPDATE_NO_MORE_UPDATES || return_value == UPDATE_YIELD)
 				{
 					/*
 					 * We need to double check that an error hasn't popped up from
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
index bb43b9b..9f38d05 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -1077,6 +1077,8 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	r = (Replica*)object_get_data (repl_obj);
 	PR_ASSERT (r);
 
+	replica_check_release_timeout(r, pb);
+
 	if (replica_is_flag_set (r, REPLICA_LOG_CHANGES) &&
 		(cl5GetState () == CL5_STATE_OPEN))
 	{
@@ -1365,7 +1367,6 @@ process_postop (Slapi_PBlock *pb)
 	return rc;
 }
 
-
 /*
  * Cancel an operation CSN. This removes it from any CSN pending lists.
  * This function is called when a previously-generated CSN will not
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
index c7cf25f..6d2452a 100644
--- a/ldap/servers/plugins/replication/repl5_replica.c
+++ b/ldap/servers/plugins/replication/repl5_replica.c
@@ -23,8 +23,8 @@
 
 #define RUV_SAVE_INTERVAL (30 * 1000) /* 30 seconds */
 
-#define REPLICA_RDN				 "cn=replica"
-#define CHANGELOG_RDN            "cn=legacy changelog"
+#define REPLICA_RDN "cn=replica"
+#define CHANGELOG_RDN "cn=legacy changelog"
 
 /*
  * A replica is a locally-held copy of a portion of the DIT.
@@ -68,6 +68,8 @@ struct replica {
 	Slapi_Counter *backoff_max;		/* backoff retry maximum */
 	Slapi_Counter *precise_purging;		/* Enable precise tombstone purging */
 	PRUint64 agmt_count;			/* Number of agmts */
+	Slapi_Counter *release_timeout;		/* The amount of time to wait before releasing active replica */
+	PRUint64 abort_session;			/* Abort the current replica session */
 };
 
 
@@ -201,6 +203,7 @@ replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation
 
 	/* init the slapi_counter/atomic settings */
 	r->protocol_timeout = slapi_counter_new();
+	r->release_timeout = slapi_counter_new();
 	r->backoff_min = slapi_counter_new();
 	r->backoff_max = slapi_counter_new();
 
@@ -408,6 +411,7 @@ replica_destroy(void **arg)
 	}
 
 	slapi_counter_destroy(&r->protocol_timeout);
+	slapi_counter_destroy(&r->release_timeout);
 	slapi_counter_destroy(&r->backoff_min);
 	slapi_counter_destroy(&r->backoff_max);
 
@@ -585,8 +589,7 @@ replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid)
  */
 PRBool
 replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
-							 const char *locking_purl,
-							 char **current_purl)
+							 const char *locking_purl, char **current_purl)
 {
 	PRBool rval = PR_TRUE;
 
@@ -609,6 +612,15 @@ replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opi
 		{
 			*current_purl = slapi_ch_strdup(r->locking_purl);
 		}
+		if (!(r->repl_state_flags & REPLICA_TOTAL_IN_PROGRESS) &&
+		    replica_get_release_timeout(r))
+		{
+			/*
+			 * We are not doing a total update, so abort the current session
+			 * so other replicas can acquire this server.
+			 */
+			r->abort_session = ABORT_SESSION;
+		}
 	}
 	else
 	{
@@ -617,14 +629,17 @@ replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opi
 						connid, opid,
 						slapi_sdn_get_dn(r->repl_root));
 		r->repl_state_flags |= REPLICA_IN_USE;
+		r->abort_session = SESSION_ACQUIRED;
 		if (isInc && *isInc) 
 		{
 			r->repl_state_flags |= REPLICA_INCREMENTAL_IN_PROGRESS;
 		}
 		else
 		{ 
-			/* if connid or opid != 0, it's a total update */
-			/* Both set to 0 means we're disabling replication */
+			/*
+			 * If connid or opid != 0, it's a total update.
+			 * Both set to 0 means we're disabling replication
+			 */
 			if (connid || opid)
 			{
 				r->repl_state_flags |= REPLICA_TOTAL_IN_PROGRESS;
@@ -652,13 +667,13 @@ replica_relinquish_exclusive_access(Replica *r, PRUint64 connid, int opid)
 	/* check to see if the replica is in use and log a warning if not */
 	if (!(r->repl_state_flags & REPLICA_IN_USE))
 	{
-        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, 
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
 					"conn=%" NSPRIu64 " op=%d repl=\"%s\": "
 					"Replica not in use\n",
 					connid, opid,
 					slapi_sdn_get_dn(r->repl_root));
 	} else {
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, 
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
 					"conn=%" NSPRIu64 " op=%d repl=\"%s\": "
 					"Released replica held by locking_purl=%s\n",
 					connid, opid,
@@ -970,6 +985,24 @@ replica_get_protocol_timeout(Replica *r)
 	}
 }
 
+PRUint64
+replica_get_release_timeout(Replica *r)
+{
+	if(r){
+		return slapi_counter_get_value(r->release_timeout);
+	} else {
+		return 0;
+	}
+}
+
+void
+replica_set_release_timeout(Replica *r, PRUint64 limit)
+{
+	if(r){
+		slapi_counter_set_value(r->release_timeout, limit);
+	}
+}
+
 void
 replica_set_protocol_timeout(Replica *r, PRUint64 timeout)
 {
@@ -977,6 +1010,7 @@ replica_set_protocol_timeout(Replica *r, PRUint64 timeout)
 		slapi_counter_set_value(r->protocol_timeout, timeout);
 	}
 }
+
 void
 replica_set_groupdn_checkinterval(Replica *r, int interval)
 {
@@ -1064,11 +1098,7 @@ replica_get_legacy_purl (const Replica *r)
     char *purl;
 
     replica_lock(r->repl_lock);
-
-    PR_ASSERT (r->legacy_consumer);
-
     purl = slapi_ch_strdup(r->legacy_purl);
-
     replica_unlock(r->repl_lock);
 
     return purl;
@@ -1924,6 +1954,7 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext)
     int backoff_min;
     int backoff_max;
     int ptimeout = 0;
+    int release_timeout = 0;
     int rc;
 
     PR_ASSERT (r && e);
@@ -2008,6 +2039,14 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext)
         slapi_counter_set_value(r->protocol_timeout, ptimeout);
     }
 
+    /* Get the release timeout */
+    release_timeout = slapi_entry_attr_get_int(e, type_replicaReleaseTimeout);
+    if(release_timeout <= 0){
+        slapi_counter_set_value(r->release_timeout, 0);
+    } else {
+        slapi_counter_set_value(r->release_timeout, release_timeout);
+    }
+
     /* check for precise tombstone purging */
     precise_purging = slapi_entry_attr_get_charptr(e, type_replicaPrecisePurge);
     if(precise_purging){
@@ -4029,21 +4068,21 @@ replica_disable_replication (Replica *r, Object *r_obj)
 	ruv_get_first_id_and_purl(repl_ruv, &junkrid, &p_locking_purl);
 	locking_purl = slapi_ch_strdup(p_locking_purl);
 	p_locking_purl = NULL;
-	repl_ruv = NULL;	
-    while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication",
+	repl_ruv = NULL;
+	while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication",
 										 &current_purl)) {
-        if (!isInc) /* already locked, but not by inc update - break */
-            break;
-        isInc = PR_FALSE;
-        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+		if (!isInc) /* already locked, but not by inc update - break */
+			break;
+		isInc = PR_FALSE;
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
 						"replica_disable_replication: "
 						"replica %s is already locked by (%s) for incoming "
 						"incremental update; sleeping 100ms\n",
-                        slapi_sdn_get_ndn (replica_get_root (r)),
+						slapi_sdn_get_ndn (replica_get_root (r)),
 						current_purl ? current_purl : "unknown");
 		slapi_ch_free_string(&current_purl);
-        DS_Sleep(PR_MillisecondsToInterval(100));
-    }
+		DS_Sleep(PR_MillisecondsToInterval(100));
+	}
 
 	slapi_ch_free_string(&current_purl);
 	slapi_ch_free_string(&locking_purl);
@@ -4281,3 +4320,57 @@ replica_decr_agmt_count(Replica *r)
 		}
 	}
 }
+
+/*
+ * Add the "Abort Replication Session" control to the pblock
+ */
+static void
+replica_add_session_abort_control(Slapi_PBlock *pb)
+{
+	LDAPControl ctrl = {0};
+	BerElement *ber;
+	struct berval *bvp;
+	int rc;
+
+	/* Build the BER payload */
+	if ( (ber = der_alloc()) == NULL ) {
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+			"add_session_abort_control: Failed to create ber\n");
+		return;
+	}
+	rc = ber_printf( ber, "{}");
+	if (rc != -1) {
+		rc = ber_flatten( ber, &bvp );
+	}
+	ber_free( ber, 1 );
+	if ( rc == -1 ) {
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+			"add_session_abort_control: Failed to flatten ber\n");
+		return;
+	}
+
+	ctrl.ldctl_oid = slapi_ch_strdup( REPL_ABORT_SESSION_OID );
+	ctrl.ldctl_value = *bvp;
+	bvp->bv_val = NULL;
+	ber_bvfree( bvp );
+	slapi_pblock_set(pb, SLAPI_ADD_RESCONTROL, &ctrl);
+
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+		"add_session_abort_control: abort control successfully added to result\n");
+}
+
+/*
+ * If a failed acquire attempt flagged this replica (ABORT_SESSION), add the
+ * abort session control to this operation's result.  It is only sent once.
+ */
+void
+replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
+{
+	replica_lock(r->repl_lock);
+	if(r->abort_session  == ABORT_SESSION){
+		/* Need to abort this session (just send the control once) */
+		replica_add_session_abort_control(pb);
+		r->abort_session = SESSION_ABORTED;
+	}
+	replica_unlock(r->repl_lock);
+}
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
index 4d7135c..71b3c92 100644
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
@@ -406,6 +406,11 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry*
                     if (apply_mods)
                         replica_set_precise_purging(r, 0);
                     }
+                else if (strcasecmp (config_attr, type_replicaReleaseTimeout) == 0 )
+                {
+                    if (apply_mods)
+                        replica_set_release_timeout(r, 0);
+                    }
                 else
                 {
                     *returncode = LDAP_UNWILLING_TO_PERFORM;
@@ -592,6 +597,23 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry*
                         }
                     }
                 }
+                else if (strcasecmp (config_attr, type_replicaReleaseTimeout) == 0 )
+                {
+                    if (apply_mods)
+                    {
+                        PRInt64 val = atoll(config_attr_value); /* signed: an unsigned value can never be < 0 */
+
+                        if(val < 0){
+                            *returncode = LDAP_UNWILLING_TO_PERFORM;
+                            PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE,
+                                    "attribute %s value (%s) is invalid, must be a number zero or greater.\n",
+                                    config_attr, config_attr_value);
+                            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", errortext);
+                            break;
+                        }
+                        replica_set_release_timeout(r, val);
+                    }
+                }
                 else
                 {
                     *returncode = LDAP_UNWILLING_TO_PERFORM;
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
index 331f839..8b891fb 100644
--- a/ldap/servers/plugins/replication/repl_globals.c
+++ b/ldap/servers/plugins/replication/repl_globals.c
@@ -87,6 +87,7 @@ const char *type_ruvElementUpdatetime = "nsruvReplicaLastModified";
 const char *type_replicaCleanRUV = "nsds5ReplicaCleanRUV";
 const char *type_replicaAbortCleanRUV = "nsds5ReplicaAbortCleanRUV";
 const char *type_replicaProtocolTimeout = "nsds5ReplicaProtocolTimeout";
+const char *type_replicaReleaseTimeout = "nsds5ReplicaReleaseTimeout";
 const char *type_replicaBackoffMin = "nsds5ReplicaBackoffMin";
 const char *type_replicaBackoffMax = "nsds5ReplicaBackoffMax";
 const char *type_replicaPrecisePurge = "nsds5ReplicaPreciseTombstonePurging";
-- 
2.4.11