Blame SOURCES/0032-Ticket-47942-DS-hangs-during-online-total-update.patch

f92ce9
From 5b0283a5a5b12c9b2ccee049ddc611decaa07a09 Mon Sep 17 00:00:00 2001
f92ce9
From: "Thierry bordaz (tbordaz)" <tbordaz@redhat.com>
f92ce9
Date: Mon, 15 Dec 2014 15:12:35 +0100
f92ce9
Subject: [PATCH 32/53] Ticket 47942: DS hangs during online total update
f92ce9
f92ce9
Bug Description:
f92ce9
	During incremental or total update of a consumer the replica agreement thread may hang.
f92ce9
	For total update:
f92ce9
	The replica agreement thread that sends the entries floods the consumer, which is not
f92ce9
	able to process the entries fast enough. So the TCP connection gets full and
f92ce9
	the RA sender sleeps on the connection until it is able to write the next entries.
f92ce9
f92ce9
	While sleeping on the poll or the write, the RA.sender holds the connection lock.
f92ce9
f92ce9
	This prevents the replica agreement result thread from reading the results from the
f92ce9
	network. So the consumer is also halted because it can no longer send the results.
f92ce9
f92ce9
	For incremental update:
f92ce9
	During incremental update, all updates are sent by the RA.sender.
f92ce9
	If many updates need to be sent, the supplier may overflow a consumer
f92ce9
	that is lagging far behind. This flow of updates can fill the TCP connection
f92ce9
	so that the RA.sender hangs when writing the next update.
f92ce9
	While hung, it holds the connection lock, preventing the RA.reader
f92ce9
	from receiving the acks. And so the consumer can also hang trying to send the
f92ce9
	acks.
f92ce9
f92ce9
Fix Description:
f92ce9
	For total update there are two parts of the fix:
f92ce9
f92ce9
	To prevent the RA.sender from sleeping too long on the poll, the fix (conn_is_available)
f92ce9
	splits the RA.timeout into 1s periods.
f92ce9
	If unable to write for 1s, it releases the connection for a short period of time (100ms).
f92ce9
f92ce9
	To prevent the RA.sender from sleeping on the write, the fix (check_flow_control_tot_init)
f92ce9
	checks how far behind the consumer is and, if it is too far behind, pauses (releasing the connection
f92ce9
	during that time). This second part of the fix is configurable and it may need to be
f92ce9
	tuned according to the observed failures.
f92ce9
f92ce9
	For incremental update:
f92ce9
	The fix is to implement a flow control on the RA.sender.
f92ce9
	After each sent update, if the window (update.sent - update.acked) crosses the limit,
f92ce9
	the RA.sender pauses for a configured delay.
f92ce9
	While the RA.sender pauses, it does not hold the connection lock.
f92ce9
f92ce9
	Tuning can be done with nsds5ReplicaFlowControlWindow (how far behind the consumer may be, in terms of
f92ce9
	number of entries/updates sent but not yet acknowledged) and nsds5ReplicaFlowControlPause (how long the RA.sender will
f92ce9
	pause if the consumer is too far behind).
f92ce9
f92ce9
	Logging:
f92ce9
		For total update, the first time the flow control pauses, it logs a message (FATAL level).
f92ce9
		If flow control happened, then at the end of the total update, it also logs the number
f92ce9
		of flow control pauses (FATAL level).
f92ce9
f92ce9
		For incremental update, if flow control happened it logs the number of pauses (REPL level).
f92ce9
f92ce9
https://fedorahosted.org/389/ticket/47942
f92ce9
f92ce9
Reviewed by: Mark Reynolds, Rich Megginson, Andrey Ivanov, Noriko Hosoi (many many thanks to all of you !)
f92ce9
f92ce9
Platforms tested: RHEL 7.0, Centos
f92ce9
f92ce9
Flag Day: no
f92ce9
f92ce9
Doc impact: no
f92ce9
---
f92ce9
 ldap/schema/01core389.ldif                         |   4 +-
f92ce9
 ldap/servers/plugins/replication/repl5.h           |  10 ++
f92ce9
 ldap/servers/plugins/replication/repl5_agmt.c      | 160 ++++++++++++++++++++
f92ce9
 ldap/servers/plugins/replication/repl5_agmtlist.c  |  26 ++++
f92ce9
 .../servers/plugins/replication/repl5_connection.c | 163 ++++++++++++++++++++-
f92ce9
 .../plugins/replication/repl5_inc_protocol.c       |  32 +++-
f92ce9
 .../plugins/replication/repl5_prot_private.h       |   2 +
f92ce9
 .../plugins/replication/repl5_tot_protocol.c       |  53 ++++++-
f92ce9
 ldap/servers/plugins/replication/repl_globals.c    |   2 +
f92ce9
 9 files changed, 446 insertions(+), 6 deletions(-)
f92ce9
f92ce9
diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
f92ce9
index c7aec70..c59d762 100644
f92ce9
--- a/ldap/schema/01core389.ldif
f92ce9
+++ b/ldap/schema/01core389.ldif
f92ce9
@@ -302,6 +302,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2306 NAME 'nsslapd-return-default-opattr
f92ce9
 attributeTypes: ( 2.16.840.1.113730.3.1.2307 NAME 'nsslapd-allow-hashed-passwords' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
f92ce9
 attributeTypes: ( 2.16.840.1.113730.3.1.2308 NAME 'nstombstonecsn' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
f92ce9
 attributeTypes: ( 2.16.840.1.113730.3.1.2309 NAME 'nsds5ReplicaPreciseTombstonePurging' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
f92ce9
+attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
f92ce9
+attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
f92ce9
 #
f92ce9
 # objectclasses
f92ce9
 #
f92ce9
@@ -313,7 +315,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d
f92ce9
 objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
 objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top  MUST ( nsDS5ReplicaRoot $  nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
 objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
 objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
 objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) MAY ( nsSaslMapPriority ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
 objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' )
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
f92ce9
index 86c77ce..e2b6209 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5.h
f92ce9
+++ b/ldap/servers/plugins/replication/repl5.h
f92ce9
@@ -170,6 +170,8 @@ extern const char *type_nsds5ReplicaBusyWaitTime;
f92ce9
 extern const char *type_nsds5ReplicaSessionPauseTime;
f92ce9
 extern const char *type_nsds5ReplicaEnabled;
f92ce9
 extern const char *type_nsds5ReplicaStripAttrs;
f92ce9
+extern const char *type_nsds5ReplicaFlowControlWindow;
f92ce9
+extern const char *type_nsds5ReplicaFlowControlPause;
f92ce9
 extern const char *type_replicaProtocolTimeout;
f92ce9
 extern const char *type_replicaBackoffMin;
f92ce9
 extern const char *type_replicaBackoffMax;
f92ce9
@@ -332,6 +334,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra);
f92ce9
 long agmt_get_timeout(const Repl_Agmt *ra);
f92ce9
 long agmt_get_busywaittime(const Repl_Agmt *ra);
f92ce9
 long agmt_get_pausetime(const Repl_Agmt *ra);
f92ce9
+long agmt_get_flowcontrolwindow(const Repl_Agmt *ra);
f92ce9
+long agmt_get_flowcontrolpause(const Repl_Agmt *ra);
f92ce9
 int agmt_start(Repl_Agmt *ra);
f92ce9
 int windows_agmt_start(Repl_Agmt *ra); 
f92ce9
 int agmt_stop(Repl_Agmt *ra);
f92ce9
@@ -352,6 +356,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name);
f92ce9
 int agmt_schedule_in_window_now(const Repl_Agmt *ra);
f92ce9
 int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
f92ce9
 int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
f92ce9
+int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
f92ce9
+int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
f92ce9
 int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
f92ce9
 int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
f92ce9
 int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
f92ce9
@@ -490,6 +496,10 @@ void conn_lock(Repl_Connection *conn);
f92ce9
 void conn_unlock(Repl_Connection *conn);
f92ce9
 void conn_delete_internal_ext(Repl_Connection *conn);
f92ce9
 const char* conn_get_bindmethod(Repl_Connection *conn);
f92ce9
+void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data);
f92ce9
+void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data);
f92ce9
+void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data);
f92ce9
+void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data);
f92ce9
 
f92ce9
 /* In repl5_protocol.c */
f92ce9
 typedef struct repl_protocol Repl_Protocol;
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
f92ce9
index 7c5c37c..91be757 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5_agmt.c
f92ce9
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
f92ce9
@@ -87,6 +87,8 @@
f92ce9
 #include "slapi-plugin.h"
f92ce9
 
f92ce9
 #define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
f92ce9
+#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */
f92ce9
+#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */
f92ce9
 #define STATUS_LEN 1024
f92ce9
 
f92ce9
 struct changecounter {
f92ce9
@@ -145,6 +147,12 @@ typedef struct repl5agmt {
f92ce9
 	int agreement_type;
f92ce9
 	Slapi_Counter *protocol_timeout;
f92ce9
 	char *maxcsn; /* agmt max csn */
f92ce9
+	long flowControlWindow; /* This is the maximum number of entries 
f92ce9
+	                         * sent without acknowledgment
f92ce9
+	                         */
f92ce9
+	long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow
f92ce9
+	                        * This is the duration (in msec) that the RA will pause before sending the next entry
f92ce9
+	                        */
f92ce9
 	Slapi_RWLock *attr_lock; /* RW lock for all the stripped attrs */
f92ce9
 } repl5agmt;
f92ce9
 
f92ce9
@@ -345,6 +353,28 @@ agmt_new_from_entry(Slapi_Entry *e)
f92ce9
 		}
f92ce9
 	}
f92ce9
 
f92ce9
+	/* flow control update window. */
f92ce9
+	ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW;
f92ce9
+	if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0)
f92ce9
+	{
f92ce9
+		Slapi_Value *sval;
f92ce9
+		if (slapi_attr_first_value(sattr, &sval) == 0)
f92ce9
+		{
f92ce9
+			ra->flowControlWindow = slapi_value_get_long(sval);
f92ce9
+		}
f92ce9
+	}
f92ce9
+
f92ce9
+	/* flow control update pause. */
f92ce9
+	ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE;
f92ce9
+	if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0)
f92ce9
+	{
f92ce9
+		Slapi_Value *sval;
f92ce9
+		if (slapi_attr_first_value(sattr, &sval) == 0)
f92ce9
+		{
f92ce9
+			ra->flowControlPause = slapi_value_get_long(sval);
f92ce9
+		}
f92ce9
+	}
f92ce9
+
f92ce9
 	/* DN of entry at root of replicated area */
f92ce9
 	tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
f92ce9
 	if (NULL != tmpstr)
f92ce9
@@ -1014,6 +1044,26 @@ agmt_get_pausetime(const Repl_Agmt *ra)
f92ce9
 	return return_value;
f92ce9
 }
f92ce9
 
f92ce9
+long
f92ce9
+agmt_get_flowcontrolwindow(const Repl_Agmt *ra)
f92ce9
+{
f92ce9
+	long return_value;
f92ce9
+	PR_ASSERT(NULL != ra);
f92ce9
+	PR_Lock(ra->lock);
f92ce9
+	return_value = ra->flowControlWindow;
f92ce9
+	PR_Unlock(ra->lock);
f92ce9
+	return return_value;
f92ce9
+}
f92ce9
+long
f92ce9
+agmt_get_flowcontrolpause(const Repl_Agmt *ra)
f92ce9
+{
f92ce9
+	long return_value;
f92ce9
+	PR_ASSERT(NULL != ra);
f92ce9
+	PR_Lock(ra->lock);
f92ce9
+	return_value = ra->flowControlPause;
f92ce9
+	PR_Unlock(ra->lock);
f92ce9
+	return return_value;
f92ce9
+}
f92ce9
 /*
f92ce9
  * Warning - reference to the long name of the agreement is returned.
f92ce9
  * The long name of an agreement is the DN of the agreement entry,
f92ce9
@@ -1775,6 +1825,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
f92ce9
 	return return_value;
f92ce9
 }
f92ce9
 
f92ce9
+/*
f92ce9
+ * Set or reset the window of entries sent without acknowledgment.
f92ce9
+ * The window is used during update to determine the number of
f92ce9
+ * entries that will be sent by the replica agreement without acknowledgment from the consumer
f92ce9
+ *
f92ce9
+ * Returns 0 if window set, or -1 if an error occurred.
f92ce9
+ */
f92ce9
+int
f92ce9
+agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
f92ce9
+{
f92ce9
+	Slapi_Attr *sattr = NULL;
f92ce9
+	int return_value = -1;
f92ce9
+
f92ce9
+	PR_ASSERT(NULL != ra);
f92ce9
+	PR_Lock(ra->lock);
f92ce9
+	if (ra->stop_in_progress)
f92ce9
+	{
f92ce9
+		PR_Unlock(ra->lock);
f92ce9
+		return return_value;
f92ce9
+	}
f92ce9
+
f92ce9
+	slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr);
f92ce9
+	if (NULL != sattr)
f92ce9
+	{
f92ce9
+		Slapi_Value *sval = NULL;
f92ce9
+		slapi_attr_first_value(sattr, &sval);
f92ce9
+		if (NULL != sval)
f92ce9
+		{
f92ce9
+			long tmpval = slapi_value_get_long(sval);
f92ce9
+			if (tmpval >= 0) {
f92ce9
+				ra->flowControlWindow = tmpval;
f92ce9
+				return_value = 0; /* success! */
f92ce9
+			}
f92ce9
+		}
f92ce9
+	}
f92ce9
+	PR_Unlock(ra->lock);
f92ce9
+	if (return_value == 0)
f92ce9
+	{
f92ce9
+		prot_notify_agmt_changed(ra->protocol, ra->long_name);
f92ce9
+	}
f92ce9
+	return return_value;
f92ce9
+}
f92ce9
+
f92ce9
+/*
f92ce9
+ * Set or reset the pause duration applied when #entries sent without acknowledgment exceeds the flow control window
f92ce9
+ *
f92ce9
+ * Returns 0 if pause set, or -1 if an error occurred.
f92ce9
+ */
f92ce9
+int
f92ce9
+agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
f92ce9
+{
f92ce9
+	Slapi_Attr *sattr = NULL;
f92ce9
+	int return_value = -1;
f92ce9
+
f92ce9
+	PR_ASSERT(NULL != ra);
f92ce9
+	PR_Lock(ra->lock);
f92ce9
+	if (ra->stop_in_progress)
f92ce9
+	{
f92ce9
+		PR_Unlock(ra->lock);
f92ce9
+		return return_value;
f92ce9
+	}
f92ce9
+
f92ce9
+	slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr);
f92ce9
+	if (NULL != sattr)
f92ce9
+	{
f92ce9
+		Slapi_Value *sval = NULL;
f92ce9
+		slapi_attr_first_value(sattr, &sval);
f92ce9
+		if (NULL != sval)
f92ce9
+		{
f92ce9
+			long tmpval = slapi_value_get_long(sval);
f92ce9
+			if (tmpval >= 0) {
f92ce9
+				ra->flowControlPause = tmpval;
f92ce9
+				return_value = 0; /* success! */
f92ce9
+			}
f92ce9
+		}
f92ce9
+	}
f92ce9
+	PR_Unlock(ra->lock);
f92ce9
+	if (return_value == 0)
f92ce9
+	{
f92ce9
+		prot_notify_agmt_changed(ra->protocol, ra->long_name);
f92ce9
+	}
f92ce9
+	return return_value;
f92ce9
+}
f92ce9
+
f92ce9
 int
f92ce9
 agmt_set_timeout(Repl_Agmt *ra, long timeout)
f92ce9
 {
f92ce9
@@ -1788,6 +1922,32 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout)
f92ce9
 
f92ce9
     return 0;
f92ce9
 }
f92ce9
+int
f92ce9
+agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window)
f92ce9
+{
f92ce9
+    PR_Lock(ra->lock);
f92ce9
+    if (ra->stop_in_progress){
f92ce9
+        PR_Unlock(ra->lock);
f92ce9
+        return -1;
f92ce9
+    }
f92ce9
+    ra->flowControlWindow = window;
f92ce9
+    PR_Unlock(ra->lock);
f92ce9
+
f92ce9
+    return 0;
f92ce9
+}
f92ce9
+int
f92ce9
+agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause)
f92ce9
+{
f92ce9
+    PR_Lock(ra->lock);
f92ce9
+    if (ra->stop_in_progress){
f92ce9
+        PR_Unlock(ra->lock);
f92ce9
+        return -1;
f92ce9
+    }
f92ce9
+    ra->flowControlPause = pause;
f92ce9
+    PR_Unlock(ra->lock);
f92ce9
+
f92ce9
+    return 0;
f92ce9
+}
f92ce9
 
f92ce9
 /*
f92ce9
  * Set or reset the busywaittime
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c
f92ce9
index 8a70055..5eead07 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5_agmtlist.c
f92ce9
+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c
f92ce9
@@ -330,6 +330,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry
f92ce9
             }
f92ce9
 		}
f92ce9
 		else if (slapi_attr_types_equivalent(mods[i]->mod_type,
f92ce9
+					type_nsds5ReplicaFlowControlWindow))
f92ce9
+		{
f92ce9
+			/* New flow control window */
f92ce9
+			if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0)
f92ce9
+			{
f92ce9
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " 
f92ce9
+						"failed to update the flow control window for agreement %s\n",
f92ce9
+						agmt_get_long_name(agmt));	
f92ce9
+				*returncode = LDAP_OPERATIONS_ERROR;
f92ce9
+				rc = SLAPI_DSE_CALLBACK_ERROR;
f92ce9
+			}
f92ce9
+		}
f92ce9
+		else if (slapi_attr_types_equivalent(mods[i]->mod_type,
f92ce9
+					type_nsds5ReplicaFlowControlPause))
f92ce9
+		{
f92ce9
+			/* New flow control pause */
f92ce9
+			if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0)
f92ce9
+			{
f92ce9
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " 
f92ce9
+						"failed to update the flow control pause for agreement %s\n",
f92ce9
+						agmt_get_long_name(agmt));	
f92ce9
+				*returncode = LDAP_OPERATIONS_ERROR;
f92ce9
+				rc = SLAPI_DSE_CALLBACK_ERROR;
f92ce9
+			}
f92ce9
+		}
f92ce9
+		else if (slapi_attr_types_equivalent(mods[i]->mod_type,
f92ce9
 					type_nsds5ReplicaBusyWaitTime))
f92ce9
 		{
f92ce9
 			/* New replica busywaittime */
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
f92ce9
index c004bfb..2971025 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5_connection.c
f92ce9
+++ b/ldap/servers/plugins/replication/repl5_connection.c
f92ce9
@@ -52,6 +52,7 @@ replica locked. Seems like right thing to do.
f92ce9
 */
f92ce9
 
f92ce9
 #include "repl5.h"
f92ce9
+#include "repl5_prot_private.h"
f92ce9
 #include "slapi-private.h"
f92ce9
 #if defined(USE_OPENLDAP)
f92ce9
 #include "ldap.h"
f92ce9
@@ -91,6 +92,7 @@ typedef struct repl_connection
f92ce9
 	struct timeval timeout;
f92ce9
 	int flag_agmt_changed;
f92ce9
 	char *plain;
f92ce9
+	void *tot_init_callback; /* Used during total update to do flow control */
f92ce9
 } repl_connection;
f92ce9
 
f92ce9
 /* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */
f92ce9
@@ -274,6 +276,32 @@ conn_delete(Repl_Connection *conn)
f92ce9
 	PR_Unlock(conn->lock);
f92ce9
 }
f92ce9
 
f92ce9
+void
f92ce9
+conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data)
f92ce9
+{
f92ce9
+    conn->tot_init_callback = (void *) cb_data;
f92ce9
+}
f92ce9
+void
f92ce9
+conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data)
f92ce9
+{
f92ce9
+	PR_Lock(conn->lock);
f92ce9
+	conn_set_tot_update_cb_nolock(conn, cb_data);
f92ce9
+	PR_Unlock(conn->lock);
f92ce9
+}
f92ce9
+
f92ce9
+void
f92ce9
+conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data)
f92ce9
+{
f92ce9
+    *cb_data = (void *) conn->tot_init_callback;
f92ce9
+}
f92ce9
+void
f92ce9
+conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data)
f92ce9
+{
f92ce9
+	PR_Lock(conn->lock);
f92ce9
+	conn_get_tot_update_cb_nolock(conn, cb_data);
f92ce9
+	PR_Unlock(conn->lock);
f92ce9
+}
f92ce9
+
f92ce9
 /*
f92ce9
  * Return the last operation type processed by the connection
f92ce9
  * object, and the LDAP error encountered.
f92ce9
@@ -640,6 +668,131 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout)
f92ce9
 }
f92ce9
 #endif /* ! USE_OPENLDAP */
f92ce9
 
f92ce9
+/* 
f92ce9
+ * During a total update, this function checks how many entries
f92ce9
+ * have been sent to the consumer without having received their acknowledgment.
f92ce9
+ * Basically it checks how far behind the consumer is.
f92ce9
+ * 
f92ce9
+ * If the consumer is too far behind, it pauses the RA.sender (releasing the lock) to
f92ce9
+ * let the consumer catch up and the RA.reader receive the acknowledgments.
f92ce9
+ * 
f92ce9
+ * Caller must hold conn->lock
f92ce9
+ */
f92ce9
+static void
f92ce9
+check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid)
f92ce9
+{
f92ce9
+    int rcv_msgid;
f92ce9
+    int once;
f92ce9
+    
f92ce9
+    if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) {
f92ce9
+        /* We are sending entries part of the total update of a consumer
f92ce9
+         * Wait a bit if the consumer needs to catchup from the current sent entries
f92ce9
+         */
f92ce9
+        rcv_msgid = repl5_tot_last_rcv_msgid(conn);
f92ce9
+        if (rcv_msgid == -1) {
f92ce9
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
f92ce9
+                            "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n",
f92ce9
+                            agmt_get_long_name(conn->agmt),
f92ce9
+                            sent_msgid);
f92ce9
+        } else if (sent_msgid < rcv_msgid) {
f92ce9
+            slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
f92ce9
+                            "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n",
f92ce9
+                            agmt_get_long_name(conn->agmt),
f92ce9
+                            sent_msgid,
f92ce9
+                            rcv_msgid);
f92ce9
+        } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) {
f92ce9
+            int totalUpdatePause;
f92ce9
+
f92ce9
+            totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt);
f92ce9
+            if (totalUpdatePause) {
f92ce9
+                /* The consumer is late. Last sent entry compare to last acknowledged entry 
f92ce9
+                 * overpass the allowed limit (flowcontrolwindow)
f92ce9
+                 * Give some time to the consumer to catch up
f92ce9
+                 */
f92ce9
+                once = repl5_tot_flowcontrol_detection(conn, 1);
f92ce9
+                PR_Unlock(conn->lock);
f92ce9
+                if (once == 1) {
f92ce9
+                    /* This is the first time we hit total update flow control.
f92ce9
+                     * Log it at least once to inform administrator there is
f92ce9
+                     * a potential configuration issue here
f92ce9
+                     */
f92ce9
+                    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
f92ce9
+                            "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n"
f92ce9
+                            "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n",
f92ce9
+                            agmt_get_long_name(conn->agmt),
f92ce9
+                            totalUpdatePause,
f92ce9
+                            sent_msgid,
f92ce9
+                            rcv_msgid,
f92ce9
+                            type_nsds5ReplicaFlowControlPause,
f92ce9
+                            type_nsds5ReplicaFlowControlWindow);
f92ce9
+                }
f92ce9
+                DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause));
f92ce9
+                PR_Lock(conn->lock);
f92ce9
+            }
f92ce9
+        }
f92ce9
+    }
f92ce9
+    
f92ce9
+}
f92ce9
+/* 
f92ce9
+ * Test if the connection is available to do a write.
f92ce9
+ * This function is doing a periodic polling of the connection.
f92ce9
+ * If the polling times out:
f92ce9
+ *  - it releases the connection lock (to let other thread ,i.e. 
f92ce9
+ *    replication result thread, the opportunity to use the connection)
f92ce9
+ *  - Sleeps for a short period (100ms)
f92ce9
+ *  - acquires the connection lock
f92ce9
+ * 
f92ce9
+ * It loops until
f92ce9
+ *  - it is available
f92ce9
+ *  - exceeds RA complete timeout
f92ce9
+ *  - server is shutdown
f92ce9
+ *  - connection is disconnected (Disable, stop, delete the RA
f92ce9
+ *    'terminate' the replication protocol and disconnect the connection)
f92ce9
+ * 
f92ce9
+ * Return:
f92ce9
+ *   - CONN_OPERATION_SUCCESS if the connection is available
f92ce9
+ *   - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout
f92ce9
+ *   - CONN_NOT_CONNECTED if the replication connection state is disconnected
f92ce9
+ *   - other ConnResult
f92ce9
+ *
f92ce9
+ * Caller must hold conn->Lock. At the exit, conn->lock is held
f92ce9
+ */
f92ce9
+static ConnResult
f92ce9
+conn_is_available(Repl_Connection *conn)
f92ce9
+{
f92ce9
+    time_t poll_timeout_sec = 1; /* Polling for 1sec */
f92ce9
+    time_t yield_delay_msec = 100; /* Delay to wait */
f92ce9
+    time_t start_time = time( NULL );
f92ce9
+    time_t time_now;
f92ce9
+    ConnResult return_value = CONN_OPERATION_SUCCESS;
f92ce9
+    
f92ce9
+    while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) {
f92ce9
+        return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec));
f92ce9
+        if (return_value == CONN_TIMEOUT) {
f92ce9
+            /* in case of timeout we return CONN_TIMEOUT only
f92ce9
+             * if the RA.timeout is exceeded
f92ce9
+             */
f92ce9
+            time_now = time(NULL);
f92ce9
+            if (conn->timeout.tv_sec <= (time_now - start_time)) {
f92ce9
+                break;
f92ce9
+            } else {
f92ce9
+                /* Else give connection to others threads */
f92ce9
+                PR_Unlock(conn->lock);
f92ce9
+                slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
f92ce9
+                        "%s: perform_operation transient timeout. retry)\n",
f92ce9
+                        agmt_get_long_name(conn->agmt));
f92ce9
+                DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec));
f92ce9
+                PR_Lock(conn->lock);
f92ce9
+            }
f92ce9
+        } else {
f92ce9
+            break;
f92ce9
+        }
f92ce9
+    }
f92ce9
+    if (conn->state == STATE_DISCONNECTED) {
f92ce9
+        return_value = CONN_NOT_CONNECTED;
f92ce9
+    }
f92ce9
+    return return_value;
f92ce9
+}
f92ce9
 /*
f92ce9
  * Common code to send an LDAPv3 operation and collect the result.
f92ce9
  * Return values:
f92ce9
@@ -683,10 +836,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
f92ce9
 
f92ce9
 		Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel);
f92ce9
 
f92ce9
-		return_value = see_if_write_available(
f92ce9
-			conn, PR_SecondsToInterval(conn->timeout.tv_sec));
f92ce9
+		return_value = conn_is_available(conn);
f92ce9
 		if (return_value != CONN_OPERATION_SUCCESS) {
f92ce9
 			PR_Unlock(conn->lock);
f92ce9
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
f92ce9
+							"%s: perform_operation connection is not available (%d)\n",
f92ce9
+							agmt_get_long_name(conn->agmt),
f92ce9
+							return_value);
f92ce9
 			return return_value;
f92ce9
 		}
f92ce9
 		conn->last_operation = optype;
f92ce9
@@ -758,6 +914,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
f92ce9
 		 */
f92ce9
 		return_value = CONN_NOT_CONNECTED;
f92ce9
 	}
f92ce9
+    
f92ce9
+	check_flow_control_tot_init(conn, optype, extop_oid, msgid);
f92ce9
+        
f92ce9
 	PR_Unlock(conn->lock); /* release the lock */
f92ce9
 	if (message_id)
f92ce9
 	{
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
f92ce9
index 3bb68e7..b867fc4 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
f92ce9
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
f92ce9
@@ -108,6 +108,7 @@ typedef struct result_data
f92ce9
 	int stop_result_thread; /* Flag used to tell the result thread to exit */
f92ce9
 	int last_message_id_sent;
f92ce9
 	int last_message_id_received;
f92ce9
+	int flowcontrol_detection;
f92ce9
 	int result; /* The UPDATE_TRANSIENT_ERROR etc */
f92ce9
 } result_data;
f92ce9
 
f92ce9
@@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd)
f92ce9
 	return retval;
f92ce9
 }
f92ce9
 
f92ce9
+/* The interest of this routine is to give time to the consumer
f92ce9
+ * to apply the sent updates and return the acks.
f92ce9
+ * So the caller should not hold the replication connection lock
f92ce9
+ * to let the RA.reader receive the acks.
f92ce9
+ */
f92ce9
+static void
f92ce9
+repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
f92ce9
+{
f92ce9
+    PR_Lock(rd->lock);
f92ce9
+    if ((rd->last_message_id_received <= rd->last_message_id_sent) &&
f92ce9
+        ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) {
f92ce9
+        rd->flowcontrol_detection++;
f92ce9
+        DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt)));
f92ce9
+    }
f92ce9
+    PR_Unlock(rd->lock);
f92ce9
+}
f92ce9
+
f92ce9
 static void
f92ce9
 repl5_inc_waitfor_async_results(result_data *rd)
f92ce9
 {
f92ce9
@@ -1683,7 +1701,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
f92ce9
 	{
f92ce9
 		int finished = 0;
f92ce9
 		ConnResult replay_crc;
f92ce9
-        char csn_str[CSN_STRSIZE];
f92ce9
+		char csn_str[CSN_STRSIZE];
f92ce9
 
f92ce9
 		/* Start the results reading thread */
f92ce9
 		rd = repl5_inc_rd_new(prp);
f92ce9
@@ -1818,6 +1836,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
f92ce9
 						sop->replica_id = replica_id;
f92ce9
 						PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid));
f92ce9
 						repl5_int_push_operation(rd,sop);
f92ce9
+						repl5_inc_flow_control_results(prp->agmt, rd);
f92ce9
 					} else {
f92ce9
 						slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
f92ce9
 							"%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n",
f92ce9
@@ -1906,6 +1925,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
f92ce9
 			}
f92ce9
 			*num_changes_sent = rd->num_changes_sent;
f92ce9
 		}
f92ce9
+		PR_Lock(rd->lock);
f92ce9
+		if (rd->flowcontrol_detection) {
f92ce9
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
f92ce9
+					"%s: Incremental update flow control triggered %d times\n"
f92ce9
+					"You may increase %s and/or decrease %s in the replica agreement configuration\n",
f92ce9
+					agmt_get_long_name(prp->agmt),
f92ce9
+					rd->flowcontrol_detection,
f92ce9
+					type_nsds5ReplicaFlowControlPause,
f92ce9
+					type_nsds5ReplicaFlowControlWindow);             
f92ce9
+		}
f92ce9
+		PR_Unlock(rd->lock);
f92ce9
 		repl5_inc_rd_destroy(&rd);
f92ce9
 
f92ce9
 		cl5_operation_parameters_done ( entry.op );
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h
f92ce9
index 586e1eb..1b1c00b 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5_prot_private.h
f92ce9
+++ b/ldap/servers/plugins/replication/repl5_prot_private.h
f92ce9
@@ -79,6 +79,8 @@ typedef struct private_repl_protocol
f92ce9
 
f92ce9
 extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new();
f92ce9
 extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new();
f92ce9
+extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn);
f92ce9
+extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment);
f92ce9
 extern Private_Repl_Protocol *Windows_Inc_Protocol_new();
f92ce9
 extern Private_Repl_Protocol *Windows_Tot_Protocol_new();
f92ce9
 
f92ce9
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
f92ce9
index d4f0fcc..adadd44 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
f92ce9
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
f92ce9
@@ -82,6 +82,7 @@ typedef struct callback_data
f92ce9
 	int stop_result_thread; /* Flag used to tell the result thread to exit */
f92ce9
 	int last_message_id_sent;
f92ce9
 	int last_message_id_received;
f92ce9
+	int flowcontrol_detection;
f92ce9
 } callback_data;
f92ce9
 
f92ce9
 /* 
f92ce9
@@ -428,13 +429,19 @@ repl5_tot_run(Private_Repl_Protocol *prp)
f92ce9
                                   LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, 
f92ce9
                                   repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
f92ce9
 
f92ce9
-    cb_data.prp = prp;
f92ce9
-    cb_data.rc = 0;
f92ce9
+	cb_data.prp = prp;
f92ce9
+	cb_data.rc = 0;
f92ce9
 	cb_data.num_entries = 0UL;
f92ce9
 	cb_data.sleep_on_busy = 0UL;
f92ce9
 	cb_data.last_busy = current_time ();
f92ce9
+	cb_data.flowcontrol_detection = 0;
f92ce9
 	cb_data.lock = PR_NewLock();
f92ce9
 
f92ce9
+	/* This allows perform_operation to check the callback data,
f92ce9
+	 * especially to do flow control on delta send msgid / recv msgid
f92ce9
+	 */
f92ce9
+	conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
f92ce9
+    
f92ce9
 	/* Before we get started on sending entries to the replica, we need to 
f92ce9
 	 * setup things for async propagation: 
f92ce9
 	 * 1. Create a thread that will read the LDAP results from the connection.
f92ce9
@@ -506,6 +513,17 @@ repl5_tot_run(Private_Repl_Protocol *prp)
f92ce9
 done:
f92ce9
 	slapi_sdn_free(&area_sdn);
f92ce9
 	slapi_ch_free_string(&hostname);
f92ce9
+	if (cb_data.flowcontrol_detection > 1)
f92ce9
+	{
f92ce9
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
f92ce9
+			"%s: Total update flow control triggered %d times\n"
f92ce9
+			"You may increase %s and/or decrease %s in the replica agreement configuration\n",
f92ce9
+			agmt_get_long_name(prp->agmt),
f92ce9
+			cb_data.flowcontrol_detection,
f92ce9
+			type_nsds5ReplicaFlowControlPause,
f92ce9
+			type_nsds5ReplicaFlowControlWindow);
f92ce9
+	}
f92ce9
+	conn_set_tot_update_cb(prp->conn, NULL);
f92ce9
 	if (cb_data.lock) 
f92ce9
 	{
f92ce9
 		PR_DestroyLock(cb_data.lock);
f92ce9
@@ -645,6 +663,37 @@ void get_result (int rc, void *cb_data)
f92ce9
     ((callback_data*)cb_data)->rc = rc;
f92ce9
 }
f92ce9
 
f92ce9
+/* Return last_message_id_received from the total update callback data obtained for conn, or -1 when there is none. Caller must hold the connection lock */
f92ce9
+int
f92ce9
+repl5_tot_last_rcv_msgid(Repl_Connection *conn) 
f92ce9
+{
f92ce9
+    struct callback_data *cb_data; /* NOTE(review): left uninitialized; relies on the getter below always storing a value - confirm */
f92ce9
+    
f92ce9
+    conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
f92ce9
+    if (cb_data == NULL) {
f92ce9
+        return -1; /* no callback data available for this connection */
f92ce9
+    } else {
f92ce9
+        return cb_data->last_message_id_received;
f92ce9
+    }
f92ce9
+}
f92ce9
+
f92ce9
+/* Add 'increment' to the total update flowcontrol counter and return its new value (-1 when no callback data is available)
f92ce9
+ * Caller must hold the connection lock
f92ce9
+ */
f92ce9
+int
f92ce9
+repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment) 
f92ce9
+{
f92ce9
+    struct callback_data *cb_data; /* NOTE(review): left uninitialized; relies on the getter below always storing a value - confirm */
f92ce9
+    
f92ce9
+    conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
f92ce9
+    if (cb_data == NULL) {
f92ce9
+        return -1; /* no callback data available for this connection */
f92ce9
+    } else {
f92ce9
+        cb_data->flowcontrol_detection += increment; /* an increment of 0 just reads the current count */
f92ce9
+        return cb_data->flowcontrol_detection;
f92ce9
+    }
f92ce9
+}
f92ce9
+
f92ce9
 static 
f92ce9
 int send_entry (Slapi_Entry *e, void *cb_data)
f92ce9
 {
f92ce9
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
f92ce9
index 5609def..e2157fa 100644
f92ce9
--- a/ldap/servers/plugins/replication/repl_globals.c
f92ce9
+++ b/ldap/servers/plugins/replication/repl_globals.c
f92ce9
@@ -139,6 +139,8 @@ const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime";
f92ce9
 const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime";
f92ce9
 const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled";
f92ce9
 const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs";
f92ce9
+const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow";
f92ce9
+const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause";
f92ce9
 
f92ce9
 /* windows sync specific attributes */
f92ce9
 const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree";
f92ce9
-- 
f92ce9
1.9.3
f92ce9