Blame SOURCES/0098-Ticket-48636-Improve-replication-convergence.patch

c16027
From 94377fba9dbcfc2fe47a32cc7cb85766813ad482 Mon Sep 17 00:00:00 2001
c16027
From: Mark Reynolds <mreynolds@redhat.com>
c16027
Date: Wed, 8 Jun 2016 13:06:46 -0400
c16027
Subject: [PATCH 98/99] Ticket 48636 - Improve replication convergence
c16027
c16027
Bug Description:  In a busy MMR environment where multiple masters are being
c16027
                  updated at the same time the replica sessions stay open for
c16027
                  a very long time.  This causes other masters to wait to send
c16027
                  their updates.  This causes lop-sided convergence.  Where
c16027
                  entries added to the MMR environment, but on different masters,
c16027
                  take a very different amount of time until they are each seen
c16027
                  on all the replicas.
c16027
c16027
Fix Description:  A new configuratoin setting was added (nsds5ReplicaReleaseTimeout)
c16027
                  to the replica configuration entry.  So when replica A tries
c16027
                  to acquire a replica B, replica B send a control back to the
c16027
                  master(master C) that is updating replica B to abort the session.
c16027
                  Master C will continue sending updates for the amount of time
c16027
                  specified in the the "release timeout", then it will "yield" its
c16027
                  current session so other replicas can acquire that replica.
c16027
c16027
https://fedorahosted.org/389/ticket/48636
c16027
c16027
Reviewed by: lkrispen & nhosoi(Thanks!!)
c16027
c16027
(cherry picked from commit a1545cdae48e4b4e1fc87a168e4d8f959626f112)
c16027
(cherry picked from commit a085b0cd6b39fc85821777b7bcd2a8a2482a48bf)
c16027
---
c16027
 ldap/schema/01core389.ldif                         |   3 +-
c16027
 ldap/servers/plugins/replication/repl5.h           |  14 ++-
c16027
 .../plugins/replication/repl5_inc_protocol.c       | 102 ++++++++++++----
c16027
 ldap/servers/plugins/replication/repl5_plugins.c   |   3 +-
c16027
 ldap/servers/plugins/replication/repl5_replica.c   | 135 +++++++++++++++++----
c16027
 .../plugins/replication/repl5_replica_config.c     |  22 ++++
c16027
 ldap/servers/plugins/replication/repl_globals.c    |   1 +
c16027
 7 files changed, 229 insertions(+), 51 deletions(-)
c16027
c16027
diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
c16027
index aebdb5a..14143ed 100644
c16027
--- a/ldap/schema/01core389.ldif
c16027
+++ b/ldap/schema/01core389.ldif
c16027
@@ -278,6 +278,7 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause'
c16027
 attributeTypes: ( 2.16.840.1.113730.3.1.2313 NAME 'nsslapd-changelogtrim-interval' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
c16027
 attributeTypes: ( 2.16.840.1.113730.3.1.2314 NAME 'nsslapd-changelogcompactdb-interval' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
c16027
 attributeTypes: ( 2.16.840.1.113730.3.1.2315 NAME 'nsDS5ReplicaWaitForAsyncResults' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
c16027
+attributeTypes: ( 2.16.840.1.113730.3.1.2333 NAME 'nsds5ReplicaReleaseTimeout' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
c16027
 #
c16027
 # objectclasses
c16027
 #
c16027
@@ -287,7 +288,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.44 NAME 'nsIndex' DESC 'Netscape defined
c16027
 objectClasses: ( 2.16.840.1.113730.3.2.109 NAME 'nsBackendInstance' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
c16027
 objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
c16027
 objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
c16027
-objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top  MUST ( nsDS5ReplicaRoot $  nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
c16027
+objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top  MUST ( nsDS5ReplicaRoot $  nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax $ nsds5ReplicaReleaseTimeout ) X-ORIGIN 'Netscape Directory Server' )
c16027
 objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
c16027
 objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause $ nsDS5ReplicaWaitForAsyncResults ) X-ORIGIN 'Netscape Directory Server' )
c16027
 objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
c16027
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
c16027
index 307da82..6f6c81a 100644
c16027
--- a/ldap/servers/plugins/replication/repl5.h
c16027
+++ b/ldap/servers/plugins/replication/repl5.h
c16027
@@ -69,6 +69,10 @@
c16027
 #define REPL_ABORT_CLEANRUV_OID "2.16.840.1.113730.3.6.6"
c16027
 #define REPL_CLEANRUV_GET_MAXCSN_OID "2.16.840.1.113730.3.6.7"
c16027
 #define REPL_CLEANRUV_CHECK_STATUS_OID "2.16.840.1.113730.3.6.8"
c16027
+#define REPL_ABORT_SESSION_OID "2.16.840.1.113730.3.6.9"
c16027
+#define SESSION_ACQUIRED 0
c16027
+#define ABORT_SESSION 1
c16027
+#define SESSION_ABORTED 2
c16027
 
c16027
 #define CLEANRUV_ACCEPTED "accepted"
c16027
 #define CLEANRUV_REJECTED "rejected"
c16027
@@ -141,6 +145,7 @@ extern const char *type_nsds5ReplicaStripAttrs;
c16027
 extern const char *type_nsds5ReplicaFlowControlWindow;
c16027
 extern const char *type_nsds5ReplicaFlowControlPause;
c16027
 extern const char *type_replicaProtocolTimeout;
c16027
+extern const char *type_replicaReleaseTimeout;
c16027
 extern const char *type_replicaBackoffMin;
c16027
 extern const char *type_replicaBackoffMax;
c16027
 extern const char *type_replicaPrecisePurge;
c16027
@@ -526,9 +531,9 @@ Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_
c16027
 void replica_destroy(void **arg);
c16027
 int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid);
c16027
 int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid);
c16027
-PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
c16027
-									const char *locking_purl,
c16027
-									char **current_purl);
c16027
+PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid,
c16027
+                                    int opid, const char *locking_purl,
c16027
+                                    char **current_purl);
c16027
 void replica_relinquish_exclusive_access(Replica *r, PRUint64 connid, int opid);
c16027
 PRBool replica_get_tombstone_reap_active(const Replica *r);
c16027
 const Slapi_DN *replica_get_root(const Replica *r);
c16027
@@ -598,6 +603,8 @@ void replica_update_state (time_t when, void *arg);
c16027
 void replica_reset_csn_pl(Replica *r);
c16027
 PRUint64 replica_get_protocol_timeout(Replica *r);
c16027
 void replica_set_protocol_timeout(Replica *r, PRUint64 timeout);
c16027
+PRUint64 replica_get_release_timeout(Replica *r);
c16027
+void replica_set_release_timeout(Replica *r, PRUint64 timeout);
c16027
 void replica_set_groupdn_checkinterval(Replica *r, int timeout);
c16027
 PRUint64 replica_get_backoff_min(Replica *r);
c16027
 PRUint64 replica_get_backoff_max(Replica *r);
c16027
@@ -609,6 +616,7 @@ void replica_decr_agmt_count(Replica *r);
c16027
 PRUint64 replica_get_precise_purging(Replica *r);
c16027
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
c16027
 PRBool ignore_error_and_keep_going(int error);
c16027
+void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
c16027
 
c16027
 /* The functions below handles the state flag */
c16027
 /* Current internal state flags */
c16027
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
c16027
index 927f835..d6fb898 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
c16027
@@ -36,6 +36,11 @@ Perhaps these events should be properties of the main protocol.
c16027
 #include "repl5_prot_private.h"
c16027
 #include "cl5_api.h"
c16027
 
c16027
+#include "repl5.h"
c16027
+#include "repl5_prot_private.h"
c16027
+#include "cl5_api.h"
c16027
+#include "slapi-plugin.h"
c16027
+
c16027
 extern int slapi_log_urp;
c16027
 
c16027
 /*** from proto-slap.h ***/
c16027
@@ -82,6 +87,7 @@ typedef struct result_data
c16027
 	int flowcontrol_detection;
c16027
 	int result; /* The UPDATE_TRANSIENT_ERROR etc */
c16027
 	int WaitForAsyncResults;
c16027
+	time_t abort_time;
c16027
 } result_data;
c16027
 
c16027
 /* Various states the incremental protocol can pass through */
c16027
@@ -121,6 +127,7 @@ typedef struct result_data
c16027
 #define EXAMINE_RUV_PARAM_ERROR 405
c16027
 
c16027
 #define MAX_CHANGES_PER_SESSION	10000
c16027
+
c16027
 /*
c16027
  * Maximum time to wait between replication sessions. If we
c16027
  * don't see any updates for a period equal to this interval,
c16027
@@ -240,19 +247,21 @@ repl5_inc_result_threadmain(void *param)
c16027
 	Repl_Connection *conn = rd->prp->conn;
c16027
 	int finished = 0;
c16027
 	int message_id = 0;
c16027
+	int yield_session = 0;
c16027
 
c16027
 	slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain starting\n");
c16027
 	while (!finished) 
c16027
 	{
c16027
+		LDAPControl **returned_controls = NULL;
c16027
 		repl5_inc_operation *op = NULL;
c16027
-		int connection_error = 0;
c16027
+		ReplicaId replica_id = 0;
c16027
 		char *csn_str = NULL; 
c16027
 		char *uniqueid = NULL;
c16027
-		ReplicaId replica_id = 0;
c16027
-		int operation_code = 0;
c16027
 		char *ldap_error_string = NULL;
c16027
 		time_t time_now = 0;
c16027
 		time_t start_time = time( NULL );
c16027
+		int connection_error = 0;
c16027
+		int operation_code = 0;
c16027
 		int backoff_time = 1;
c16027
 
c16027
 		/* Read the next result */
c16027
@@ -264,7 +273,7 @@ repl5_inc_result_threadmain(void *param)
c16027
 
c16027
 		while (!finished)
c16027
 		{
c16027
-			conres = conn_read_result_ex(conn, NULL, NULL, NULL, LDAP_RES_ANY, &message_id, 0);
c16027
+			conres = conn_read_result_ex(conn, NULL, NULL, &returned_controls, LDAP_RES_ANY, &message_id, 0);
c16027
 			slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: read result for message_id %d\n", message_id);
c16027
 			/* Timeout here means that we didn't block, not a real timeout */
c16027
 			if (CONN_TIMEOUT == conres)
c16027
@@ -292,9 +301,19 @@ repl5_inc_result_threadmain(void *param)
c16027
 					finished = 1;
c16027
 				}
c16027
 				PR_Unlock(rd->lock);
c16027
-			} else
c16027
-			{
c16027
-				/* Something other than a timeout, so we exit the loop */
c16027
+			} else {
c16027
+				/*
c16027
+				 * Something other than a timeout, so we exit the loop.
c16027
+				 * First check if we were told to abort the session
c16027
+				 */;
c16027
+				Replica *r = (Replica*)object_get_data(rd->prp->replica_object);
c16027
+				if (replica_get_release_timeout(r) &&
c16027
+				    slapi_control_present(returned_controls,
c16027
+				                          REPL_ABORT_SESSION_OID,
c16027
+				                          NULL, NULL))
c16027
+				{
c16027
+					yield_session = 1;
c16027
+				}
c16027
 				break;
c16027
 			}
c16027
 		}
c16027
@@ -318,21 +337,29 @@ repl5_inc_result_threadmain(void *param)
c16027
 			}
c16027
 
c16027
 			conn_get_error_ex(conn, &operation_code, &connection_error, &ldap_error_string);
c16027
-			slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n", operation_code,connection_error,conres,message_id,ldap_error_string);
c16027
-			return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error, csn_str, uniqueid, replica_id, &should_finish, &(rd->num_changes_sent));
c16027
+			slapi_log_error(SLAPI_LOG_REPL, NULL,
c16027
+			                "repl5_inc_result_threadmain: result %d, %d, %d, %d, %s\n",
c16027
+			                operation_code,connection_error,conres,message_id,ldap_error_string);
c16027
+			return_value = repl5_inc_update_from_op_result(rd->prp, conres, connection_error,
c16027
+			                                               csn_str, uniqueid, replica_id, &should_finish,
c16027
+			                                               &(rd->num_changes_sent));
c16027
 			if (return_value || should_finish)
c16027
 			{
c16027
-				slapi_log_error(SLAPI_LOG_REPL, NULL, "repl5_inc_result_threadmain: got op result %d should finish %d\n", return_value, should_finish);
c16027
+				slapi_log_error(SLAPI_LOG_REPL, NULL,
c16027
+				                "repl5_inc_result_threadmain: got op result %d should finish %d\n",
c16027
+				                return_value, should_finish);
c16027
 				/* If so then we need to take steps to abort the update process */
c16027
 				PR_Lock(rd->lock);
c16027
 				rd->result = return_value;
c16027
-				rd->abort = 1;
c16027
+				rd->abort = ABORT_SESSION;
c16027
 				PR_Unlock(rd->lock);
c16027
-				/* We also need to log the error, including details stored from when the operation was sent */
c16027
-				/* we cannot finish yet - we still need to waitfor the pending results, then
c16027
-				   the main repl code will shut down this thread */
c16027
-				/* we can finish if we have disconnected - in that case, there will be nothing
c16027
-				   to read */
c16027
+				/*
c16027
+				 * We also need to log the error, including details stored from
c16027
+				 * when the operation was sent.  We cannot finish yet - we still
c16027
+				 * need to wait for the pending results, then the main repl code
c16027
+				 * will shut down this thread.  We can finish if we have
c16027
+				 * disconnected - in that case, there will be nothing to read
c16027
+				 */
c16027
 				if (return_value == UPDATE_CONNECTION_LOST) {
c16027
 					finished = 1;
c16027
 				}
c16027
@@ -341,8 +368,16 @@ repl5_inc_result_threadmain(void *param)
c16027
 				rd->result = return_value;
c16027
 			}
c16027
 		}
c16027
+
c16027
 		/* Should we stop ? */
c16027
 		PR_Lock(rd->lock);
c16027
+		if (!finished && yield_session && rd->abort != SESSION_ABORTED && rd->abort_time == 0) {
c16027
+			rd->abort_time = time( NULL );
c16027
+			rd->abort = SESSION_ABORTED;  /* only set the abort time once */
c16027
+			slapi_log_error(SLAPI_LOG_REPL, "repl5_inc_result_threadmain",
c16027
+			                "Abort control detected, setting abort time...(%s)\n",
c16027
+			                agmt_get_long_name(rd->prp->agmt));
c16027
+		}
c16027
 		if (rd->stop_result_thread) 
c16027
 		{
c16027
 			finished = 1;
c16027
@@ -468,7 +503,8 @@ repl5_inc_waitfor_async_results(result_data *rd)
c16027
 		if (rd->last_message_id_received >= rd->last_message_id_sent) {
c16027
 			/* If so then we're done */
c16027
 			done = 1;
c16027
-		} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) {
c16027
+		} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST))
c16027
+		{
c16027
 			done = 1; /* no connection == no more results */
c16027
 		}
c16027
 		/*
c16027
@@ -846,10 +882,10 @@ repl5_inc_run(Private_Repl_Protocol *prp)
c16027
                   if (!busywaittime){
c16027
                       busywaittime = repl5_get_backoff_min(prp);
c16027
                   }
c16027
-                  prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime, busywaittime);
c16027
+                  prp_priv->backoff = backoff_new(BACKOFF_FIXED, busywaittime , busywaittime);
c16027
               } else {
c16027
                   prp_priv->backoff = backoff_new(BACKOFF_EXPONENTIAL, repl5_get_backoff_min(prp),
c16027
-                		  repl5_get_backoff_max(prp));
c16027
+                                                  repl5_get_backoff_max(prp));
c16027
               }
c16027
               next_state = STATE_BACKOFF;
c16027
               backoff_reset(prp_priv->backoff, repl5_inc_backoff_expired, (void *)prp);
c16027
@@ -1055,6 +1091,7 @@ repl5_inc_run(Private_Repl_Protocol *prp)
c16027
                       } else if (rc == UPDATE_YIELD){
c16027
                           dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_YIELD -> STATE_BACKOFF_START");
c16027
                           agmt_set_last_update_status(prp->agmt, 0, 0, "Incremental update succeeded and yielded");
c16027
+                          use_busy_backoff_timer = PR_TRUE;
c16027
                           next_state = STATE_BACKOFF_START;
c16027
                       } else if (rc == UPDATE_TRANSIENT_ERROR){
c16027
                           dev_debug("repl5_inc_run(STATE_SENDING_UPDATES) -> send_updates = UPDATE_TRANSIENT_ERROR -> STATE_BACKOFF_START");
c16027
@@ -1099,6 +1136,7 @@ repl5_inc_run(Private_Repl_Protocol *prp)
c16027
               ruv_destroy(&ruv;; ruv = NULL;
c16027
           }
c16027
           agmt_update_done(prp->agmt, 0);
c16027
+
c16027
           /* If timed out, close the connection after released the replica */
c16027
           release_replica(prp);
c16027
           if (rc == UPDATE_TIMEOUT) {
c16027
@@ -1681,12 +1719,14 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
c16027
 	}
c16027
 	else
c16027
 	{
c16027
-		int finished = 0;
c16027
 		ConnResult replay_crc;
c16027
-		char csn_str[CSN_STRSIZE];
c16027
+		Replica *replica = (Replica*) object_get_data(prp->replica_object);
c16027
 		PRBool subentry_update_needed = PR_FALSE;
c16027
+		PRUint64 release_timeout = replica_get_release_timeout(replica);
c16027
+		char csn_str[CSN_STRSIZE];
c16027
 		int skipped_updates = 0;
c16027
 		int fractional_repl;
c16027
+		int finished = 0;
c16027
 #define FRACTIONAL_SKIPPED_THRESHOLD 100
c16027
 
c16027
 		/* Start the results reading thread */
c16027
@@ -1906,7 +1946,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
c16027
 			}
c16027
 			PR_Lock(rd->lock);
c16027
 			/* See if the result thread has hit a problem */
c16027
-			if (!finished && rd->abort)
c16027
+
c16027
+			if(!finished && rd->abort_time){
c16027
+				time_t current_time = time ( NULL );
c16027
+				if ((current_time - rd->abort_time) >= release_timeout){
c16027
+					rd->result = UPDATE_YIELD;
c16027
+					return_value = UPDATE_YIELD;
c16027
+					finished = 1;
c16027
+					slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
+					                "Aborting send_updates...(%s)\n",
c16027
+					                agmt_get_long_name(rd->prp->agmt));
c16027
+				}
c16027
+			}
c16027
+
c16027
+			if (!finished && rd->abort == ABORT_SESSION)
c16027
 			{
c16027
 				return_value = rd->result;
c16027
 				finished = 1;
c16027
@@ -1916,10 +1969,9 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
c16027
 
c16027
 		if (fractional_repl && subentry_update_needed)
c16027
 		{
c16027
-			Replica *replica;
c16027
 			ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
c16027
 			Slapi_DN *replarea_sdn = NULL;
c16027
-			replica = (Replica*) object_get_data(prp->replica_object);
c16027
+
c16027
 			if (replica)
c16027
 			{
c16027
 				rid = replica_get_rid(replica);
c16027
@@ -1945,7 +1997,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
c16027
 				 * If we already have an error, there is no need to check the
c16027
 				 * async result thread anymore.
c16027
 				 */
c16027
-				if (return_value == UPDATE_NO_MORE_UPDATES)
c16027
+				if (return_value == UPDATE_NO_MORE_UPDATES || return_value == UPDATE_YIELD)
c16027
 				{
c16027
 					/*
c16027
 					 * We need to double check that an error hasn't popped up from
c16027
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
c16027
index bb43b9b..9f38d05 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_plugins.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
c16027
@@ -1077,6 +1077,8 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
c16027
 	r = (Replica*)object_get_data (repl_obj);
c16027
 	PR_ASSERT (r);
c16027
 
c16027
+	replica_check_release_timeout(r, pb);
c16027
+
c16027
 	if (replica_is_flag_set (r, REPLICA_LOG_CHANGES) &&
c16027
 		(cl5GetState () == CL5_STATE_OPEN))
c16027
 	{
c16027
@@ -1365,7 +1367,6 @@ process_postop (Slapi_PBlock *pb)
c16027
 	return rc;
c16027
 }
c16027
 
c16027
-
c16027
 /*
c16027
  * Cancel an operation CSN. This removes it from any CSN pending lists.
c16027
  * This function is called when a previously-generated CSN will not
c16027
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
c16027
index c7cf25f..6d2452a 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_replica.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_replica.c
c16027
@@ -23,8 +23,8 @@
c16027
 
c16027
 #define RUV_SAVE_INTERVAL (30 * 1000) /* 30 seconds */
c16027
 
c16027
-#define REPLICA_RDN				 "cn=replica"
c16027
-#define CHANGELOG_RDN            "cn=legacy changelog"
c16027
+#define REPLICA_RDN "cn=replica"
c16027
+#define CHANGELOG_RDN "cn=legacy changelog"
c16027
 
c16027
 /*
c16027
  * A replica is a locally-held copy of a portion of the DIT.
c16027
@@ -68,6 +68,8 @@ struct replica {
c16027
 	Slapi_Counter *backoff_max;		/* backoff retry maximum */
c16027
 	Slapi_Counter *precise_purging;		/* Enable precise tombstone purging */
c16027
 	PRUint64 agmt_count;			/* Number of agmts */
c16027
+	Slapi_Counter *release_timeout;		/* The amount of time to wait before releasing active replica */
c16027
+	PRUint64 abort_session;			/* Abort the current replica session */
c16027
 };
c16027
 
c16027
 
c16027
@@ -201,6 +203,7 @@ replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation
c16027
 
c16027
 	/* init the slapi_counter/atomic settings */
c16027
 	r->protocol_timeout = slapi_counter_new();
c16027
+	r->release_timeout = slapi_counter_new();
c16027
 	r->backoff_min = slapi_counter_new();
c16027
 	r->backoff_max = slapi_counter_new();
c16027
 
c16027
@@ -408,6 +411,7 @@ replica_destroy(void **arg)
c16027
 	}
c16027
 
c16027
 	slapi_counter_destroy(&r->protocol_timeout);
c16027
+	slapi_counter_destroy(&r->release_timeout);
c16027
 	slapi_counter_destroy(&r->backoff_min);
c16027
 	slapi_counter_destroy(&r->backoff_max);
c16027
 
c16027
@@ -585,8 +589,7 @@ replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid)
c16027
  */
c16027
 PRBool
c16027
 replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
c16027
-							 const char *locking_purl,
c16027
-							 char **current_purl)
c16027
+							 const char *locking_purl, char **current_purl)
c16027
 {
c16027
 	PRBool rval = PR_TRUE;
c16027
 
c16027
@@ -609,6 +612,15 @@ replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opi
c16027
 		{
c16027
 			*current_purl = slapi_ch_strdup(r->locking_purl);
c16027
 		}
c16027
+		if (!(r->repl_state_flags & REPLICA_TOTAL_IN_PROGRESS) &&
c16027
+		    replica_get_release_timeout(r))
c16027
+		{
c16027
+			/*
c16027
+			 * We are not doing a total update, so abort the current session
c16027
+			 * so other replicas can acquire this server.
c16027
+			 */
c16027
+			r->abort_session = ABORT_SESSION;
c16027
+		}
c16027
 	}
c16027
 	else
c16027
 	{
c16027
@@ -617,14 +629,17 @@ replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opi
c16027
 						connid, opid,
c16027
 						slapi_sdn_get_dn(r->repl_root));
c16027
 		r->repl_state_flags |= REPLICA_IN_USE;
c16027
+		r->abort_session = SESSION_ACQUIRED;
c16027
 		if (isInc && *isInc) 
c16027
 		{
c16027
 			r->repl_state_flags |= REPLICA_INCREMENTAL_IN_PROGRESS;
c16027
 		}
c16027
 		else
c16027
 		{ 
c16027
-			/* if connid or opid != 0, it's a total update */
c16027
-			/* Both set to 0 means we're disabling replication */
c16027
+			/*
c16027
+			 * If connid or opid != 0, it's a total update.
c16027
+			 * Both set to 0 means we're disabling replication
c16027
+			 */
c16027
 			if (connid || opid)
c16027
 			{
c16027
 				r->repl_state_flags |= REPLICA_TOTAL_IN_PROGRESS;
c16027
@@ -652,13 +667,13 @@ replica_relinquish_exclusive_access(Replica *r, PRUint64 connid, int opid)
c16027
 	/* check to see if the replica is in use and log a warning if not */
c16027
 	if (!(r->repl_state_flags & REPLICA_IN_USE))
c16027
 	{
c16027
-        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, 
c16027
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
 					"conn=%" NSPRIu64 " op=%d repl=\"%s\": "
c16027
 					"Replica not in use\n",
c16027
 					connid, opid,
c16027
 					slapi_sdn_get_dn(r->repl_root));
c16027
 	} else {
c16027
-		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, 
c16027
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
 					"conn=%" NSPRIu64 " op=%d repl=\"%s\": "
c16027
 					"Released replica held by locking_purl=%s\n",
c16027
 					connid, opid,
c16027
@@ -970,6 +985,24 @@ replica_get_protocol_timeout(Replica *r)
c16027
 	}
c16027
 }
c16027
 
c16027
+PRUint64
c16027
+replica_get_release_timeout(Replica *r)
c16027
+{
c16027
+	if(r){
c16027
+		return slapi_counter_get_value(r->release_timeout);
c16027
+	} else {
c16027
+		return 0;
c16027
+	}
c16027
+}
c16027
+
c16027
+void
c16027
+replica_set_release_timeout(Replica *r, PRUint64 limit)
c16027
+{
c16027
+	if(r){
c16027
+		slapi_counter_set_value(r->release_timeout, limit);
c16027
+	}
c16027
+}
c16027
+
c16027
 void
c16027
 replica_set_protocol_timeout(Replica *r, PRUint64 timeout)
c16027
 {
c16027
@@ -977,6 +1010,7 @@ replica_set_protocol_timeout(Replica *r, PRUint64 timeout)
c16027
 		slapi_counter_set_value(r->protocol_timeout, timeout);
c16027
 	}
c16027
 }
c16027
+
c16027
 void
c16027
 replica_set_groupdn_checkinterval(Replica *r, int interval)
c16027
 {
c16027
@@ -1064,11 +1098,7 @@ replica_get_legacy_purl (const Replica *r)
c16027
     char *purl;
c16027
 
c16027
     replica_lock(r->repl_lock);
c16027
-
c16027
-    PR_ASSERT (r->legacy_consumer);
c16027
-
c16027
     purl = slapi_ch_strdup(r->legacy_purl);
c16027
-
c16027
     replica_unlock(r->repl_lock);
c16027
 
c16027
     return purl;
c16027
@@ -1924,6 +1954,7 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext)
c16027
     int backoff_min;
c16027
     int backoff_max;
c16027
     int ptimeout = 0;
c16027
+    int release_timeout = 0;
c16027
     int rc;
c16027
 
c16027
     PR_ASSERT (r && e);
c16027
@@ -2008,6 +2039,14 @@ _replica_init_from_config (Replica *r, Slapi_Entry *e, char *errortext)
c16027
         slapi_counter_set_value(r->protocol_timeout, ptimeout);
c16027
     }
c16027
 
c16027
+    /* Get the release timeout */
c16027
+    release_timeout = slapi_entry_attr_get_int(e, type_replicaReleaseTimeout);
c16027
+    if(release_timeout <= 0){
c16027
+        slapi_counter_set_value(r->release_timeout, 0);
c16027
+    } else {
c16027
+        slapi_counter_set_value(r->release_timeout, release_timeout);
c16027
+    }
c16027
+
c16027
     /* check for precise tombstone purging */
c16027
     precise_purging = slapi_entry_attr_get_charptr(e, type_replicaPrecisePurge);
c16027
     if(precise_purging){
c16027
@@ -4029,21 +4068,21 @@ replica_disable_replication (Replica *r, Object *r_obj)
c16027
 	ruv_get_first_id_and_purl(repl_ruv, &junkrid, &p_locking_purl);
c16027
 	locking_purl = slapi_ch_strdup(p_locking_purl);
c16027
 	p_locking_purl = NULL;
c16027
-	repl_ruv = NULL;	
c16027
-    while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication",
c16027
+	repl_ruv = NULL;
c16027
+	while (!replica_get_exclusive_access(r, &isInc, 0, 0, "replica_disable_replication",
c16027
 										 &current_purl)) {
c16027
-        if (!isInc) /* already locked, but not by inc update - break */
c16027
-            break;
c16027
-        isInc = PR_FALSE;
c16027
-        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
+		if (!isInc) /* already locked, but not by inc update - break */
c16027
+			break;
c16027
+		isInc = PR_FALSE;
c16027
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
 						"replica_disable_replication: "
c16027
 						"replica %s is already locked by (%s) for incoming "
c16027
 						"incremental update; sleeping 100ms\n",
c16027
-                        slapi_sdn_get_ndn (replica_get_root (r)),
c16027
+						slapi_sdn_get_ndn (replica_get_root (r)),
c16027
 						current_purl ? current_purl : "unknown");
c16027
 		slapi_ch_free_string(&current_purl);
c16027
-        DS_Sleep(PR_MillisecondsToInterval(100));
c16027
-    }
c16027
+		DS_Sleep(PR_MillisecondsToInterval(100));
c16027
+	}
c16027
 
c16027
 	slapi_ch_free_string(&current_purl);
c16027
 	slapi_ch_free_string(&locking_purl);
c16027
@@ -4281,3 +4320,57 @@ replica_decr_agmt_count(Replica *r)
c16027
 		}
c16027
 	}
c16027
 }
c16027
+
c16027
+/*
c16027
+ * Add the "Abort Replication Session" control to the pblock
c16027
+ */
c16027
+static void
c16027
+replica_add_session_abort_control(Slapi_PBlock *pb)
c16027
+{
c16027
+	LDAPControl ctrl = {0};
c16027
+	BerElement *ber;
c16027
+	struct berval *bvp;
c16027
+	int rc;
c16027
+
c16027
+	/* Build the BER payload */
c16027
+	if ( (ber = der_alloc()) == NULL ) {
c16027
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
+			"add_session_abort_control: Failed to create ber\n");
c16027
+		return;
c16027
+	}
c16027
+	rc = ber_printf( ber, "{}");
c16027
+	if (rc != -1) {
c16027
+		rc = ber_flatten( ber, &bvp );
c16027
+	}
c16027
+	ber_free( ber, 1 );
c16027
+	if ( rc == -1 ) {
c16027
+		slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
+			"add_session_abort_control: Failed to flatten ber\n");
c16027
+		return;
c16027
+	}
c16027
+
c16027
+	ctrl.ldctl_oid = slapi_ch_strdup( REPL_ABORT_SESSION_OID );
c16027
+	ctrl.ldctl_value = *bvp;
c16027
+	bvp->bv_val = NULL;
c16027
+	ber_bvfree( bvp );
c16027
+	slapi_pblock_set(pb, SLAPI_ADD_RESCONTROL, &ctrl);
c16027
+
c16027
+	slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
c16027
+		"add_session_abort_control: abort control successfully added to result\n");
c16027
+}
c16027
+
c16027
+/*
c16027
+ * Check if we have exceeded the failed replica acquire limit,
c16027
+ * if so, end the replication session.
c16027
+ */
c16027
+void
c16027
+replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
c16027
+{
c16027
+	replica_lock(r->repl_lock);
c16027
+	if(r->abort_session  == ABORT_SESSION){
c16027
+		/* Need to abort this session (just send the control once) */
c16027
+		replica_add_session_abort_control(pb);
c16027
+		r->abort_session = SESSION_ABORTED;
c16027
+	}
c16027
+	replica_unlock(r->repl_lock);
c16027
+}
c16027
diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c
c16027
index 4d7135c..71b3c92 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_replica_config.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_replica_config.c
c16027
@@ -406,6 +406,11 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry*
c16027
                     if (apply_mods)
c16027
                         replica_set_precise_purging(r, 0);
c16027
                     }
c16027
+                else if (strcasecmp (config_attr, type_replicaReleaseTimeout) == 0 )
c16027
+                {
c16027
+                    if (apply_mods)
c16027
+                        replica_set_release_timeout(r, 0);
c16027
+                    }
c16027
                 else
c16027
                 {
c16027
                     *returncode = LDAP_UNWILLING_TO_PERFORM;
c16027
@@ -592,6 +597,23 @@ replica_config_modify (Slapi_PBlock *pb, Slapi_Entry* entryBefore, Slapi_Entry*
c16027
                         }
c16027
                     }
c16027
                 }
c16027
+                else if (strcasecmp (config_attr, type_replicaReleaseTimeout) == 0 )
c16027
+                {
c16027
+                    if (apply_mods)
c16027
+                    {
c16027
+                        PRUint64 val = atoll(config_attr_value);
c16027
+
c16027
+                        if(val < 0){
c16027
+                            *returncode = LDAP_UNWILLING_TO_PERFORM;
c16027
+                            PR_snprintf (errortext, SLAPI_DSE_RETURNTEXT_SIZE,
c16027
+                                    "attribute %s value (%s) is invalid, must be a number zero or greater.\n",
c16027
+                                    config_attr, config_attr_value);
c16027
+                            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "replica_config_modify: %s\n", errortext);
c16027
+                            break;
c16027
+                        }
c16027
+                        replica_set_release_timeout(r, val);
c16027
+                    }
c16027
+                }
c16027
                 else
c16027
                 {
c16027
                     *returncode = LDAP_UNWILLING_TO_PERFORM;
c16027
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
c16027
index 331f839..8b891fb 100644
c16027
--- a/ldap/servers/plugins/replication/repl_globals.c
c16027
+++ b/ldap/servers/plugins/replication/repl_globals.c
c16027
@@ -87,6 +87,7 @@ const char *type_ruvElementUpdatetime = "nsruvReplicaLastModified";
c16027
 const char *type_replicaCleanRUV = "nsds5ReplicaCleanRUV";
c16027
 const char *type_replicaAbortCleanRUV = "nsds5ReplicaAbortCleanRUV";
c16027
 const char *type_replicaProtocolTimeout = "nsds5ReplicaProtocolTimeout";
c16027
+const char *type_replicaReleaseTimeout = "nsds5ReplicaReleaseTimeout";
c16027
 const char *type_replicaBackoffMin = "nsds5ReplicaBackoffMin";
c16027
 const char *type_replicaBackoffMax = "nsds5ReplicaBackoffMax";
c16027
 const char *type_replicaPrecisePurge = "nsds5ReplicaPreciseTombstonePurging";
c16027
-- 
c16027
2.4.11
c16027