Blob Blame Raw
From 5b0283a5a5b12c9b2ccee049ddc611decaa07a09 Mon Sep 17 00:00:00 2001
From: "Thierry bordaz (tbordaz)" <tbordaz@redhat.com>
Date: Mon, 15 Dec 2014 15:12:35 +0100
Subject: [PATCH 32/53] Ticket 47942: DS hangs during online total update

Bug Description:
	During incremental or total update of a consumer the replica agreement thread may hang.
	For total update:
	The replica agreement thread that send the entries flowed the consumer that is not
	able to process fast enough the entries. So the TCP connection get full and
	the RA sender sleep on the connection to be able to write the next entries.

	Sleeping on the poll or write the RA.sender holds the connection lock.

	It prevents the replica agreement result thread to read the results from the
	network. So the consumer is also halted because is can no longer send the results.

	For incrementatl update:
	During incremental update, all updates are sent by the RA.sender.
	If many updates need to be send, the supplier may overflow the consumer
	that is very late. This flow of updates can fill the TCP connection
	so that the RA.sender hang when writing the next update.
	On the hang, it holds the connection lock preventing the RA.reader
	to receive the acks. And so the consumer can also hang trying to send the
	acks.

Fix Description:
	For total update there are two parts of the fix:

	To prevent the RA.sender to sleep too long on the poll, the fix (conn_is_available)
	splits the RA.timeout into 1s period.
	If unable to write for 1s, it releases the connection for a short period of time 100ms.

	To prevent the RA.sender to sleep on the write, the fix (check_flow_control_tot_init)
	checks how late is the consumer and if it is too late, it pauses (releasing the connection
	during that time). This second part of the fix is configurable and it may need to be
	tune according to the observed failures.

	For incremental update:
	The fix is to implement a flow control on the RA.sender.
	After each sent update, if the window (update.sent - update.acked) cross the limit
	The RA.sender pause during a configured delay.
	When the RA.sender pause it does not hold the connection lock

	Tuning can be done with nsds5ReplicaFlowControlWindow (how late is the consumer in terms of
	number of entries/updates acknowledged) and nsds5ReplicaFlowControlPause (how long the RA.sender will
	pause if the consumer is too late)

	Logging:
		For total update, the first time the flow control pauses, it logs a message (FATAL level).
		If flow control happened, then at the end of the total update, it also logs the number
		of flow control pauses (FATAL level).

		For incremental update, if flow control happened it logs the number of pause (REPL level).

https://fedorahosted.org/389/ticket/47942

Reviewed by: Mark Reynolds, Rich Megginson, Andrey Ivanov, Noriko Hosoi (many many thanks to all of you !)

Platforms tested: RHEL 7.0, Centos

Flag Day: no

Doc impact: no
---
 ldap/schema/01core389.ldif                         |   4 +-
 ldap/servers/plugins/replication/repl5.h           |  10 ++
 ldap/servers/plugins/replication/repl5_agmt.c      | 160 ++++++++++++++++++++
 ldap/servers/plugins/replication/repl5_agmtlist.c  |  26 ++++
 .../servers/plugins/replication/repl5_connection.c | 163 ++++++++++++++++++++-
 .../plugins/replication/repl5_inc_protocol.c       |  32 +++-
 .../plugins/replication/repl5_prot_private.h       |   2 +
 .../plugins/replication/repl5_tot_protocol.c       |  53 ++++++-
 ldap/servers/plugins/replication/repl_globals.c    |   2 +
 9 files changed, 446 insertions(+), 6 deletions(-)

diff --git a/ldap/schema/01core389.ldif b/ldap/schema/01core389.ldif
index c7aec70..c59d762 100644
--- a/ldap/schema/01core389.ldif
+++ b/ldap/schema/01core389.ldif
@@ -302,6 +302,8 @@ attributeTypes: ( 2.16.840.1.113730.3.1.2306 NAME 'nsslapd-return-default-opattr
 attributeTypes: ( 2.16.840.1.113730.3.1.2307 NAME 'nsslapd-allow-hashed-passwords' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
 attributeTypes: ( 2.16.840.1.113730.3.1.2308 NAME 'nstombstonecsn' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
 attributeTypes: ( 2.16.840.1.113730.3.1.2309 NAME 'nsds5ReplicaPreciseTombstonePurging' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.15 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2310 NAME 'nsds5ReplicaFlowControlWindow' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
+attributeTypes: ( 2.16.840.1.113730.3.1.2311 NAME 'nsds5ReplicaFlowControlPause' DESC 'Netscape defined attribute type' SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE X-ORIGIN 'Netscape Directory Server' )
 #
 # objectclasses
 #
@@ -313,7 +315,7 @@ objectClasses: ( 2.16.840.1.113730.3.2.110 NAME 'nsMappingTree' DESC 'Netscape d
 objectClasses: ( 2.16.840.1.113730.3.2.104 NAME 'nsContainer' DESC 'Netscape defined objectclass' SUP top  MUST ( CN ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.108 NAME 'nsDS5Replica' DESC 'Netscape defined objectclass' SUP top  MUST ( nsDS5ReplicaRoot $  nsDS5ReplicaId ) MAY (cn $ nsds5ReplicaPreciseTombstonePurging $ nsds5ReplicaCleanRUV $ nsds5ReplicaAbortCleanRUV $ nsDS5ReplicaType $ nsDS5ReplicaBindDN $ nsState $ nsDS5ReplicaName $ nsDS5Flags $ nsDS5Task $ nsDS5ReplicaReferral $ nsDS5ReplicaAutoReferral $ nsds5ReplicaPurgeDelay $ nsds5ReplicaTombstonePurgeInterval $ nsds5ReplicaChangeCount $ nsds5ReplicaLegacyConsumer $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaBackoffMin $ nsds5ReplicaBackoffMax ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.113 NAME 'nsTombstone' DESC 'Netscape defined objectclass' SUP top MAY ( nstombstonecsn $ nsParentUniqueId $ nscpEntryDN ) X-ORIGIN 'Netscape Directory Server' )
-objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout ) X-ORIGIN 'Netscape Directory Server' )
+objectClasses: ( 2.16.840.1.113730.3.2.103 NAME 'nsDS5ReplicationAgreement' DESC 'Netscape defined objectclass' SUP top MUST ( cn ) MAY ( nsds5ReplicaCleanRUVNotified $ nsDS5ReplicaHost $ nsDS5ReplicaPort $ nsDS5ReplicaTransportInfo $ nsDS5ReplicaBindDN $ nsDS5ReplicaCredentials $ nsDS5ReplicaBindMethod $ nsDS5ReplicaRoot $ nsDS5ReplicatedAttributeList $ nsDS5ReplicatedAttributeListTotal $ nsDS5ReplicaUpdateSchedule $ nsds5BeginReplicaRefresh $ description $ nsds50ruv $ nsruvReplicaLastModified $ nsds5ReplicaTimeout $ nsds5replicaChangesSentSinceStartup $ nsds5replicaLastUpdateEnd $ nsds5replicaLastUpdateStart $ nsds5replicaLastUpdateStatus $ nsds5replicaUpdateInProgress $ nsds5replicaLastInitEnd $ nsds5ReplicaEnabled $ nsds5replicaLastInitStart $ nsds5replicaLastInitStatus $ nsds5debugreplicatimeout $ nsds5replicaBusyWaitTime $ nsds5ReplicaStripAttrs $ nsds5replicaSessionPauseTime $ nsds5ReplicaProtocolTimeout $ nsds5ReplicaFlowControlWindow $ nsds5ReplicaFlowControlPause ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.39 NAME 'nsslapdConfig' DESC 'Netscape defined objectclass' SUP top MAY ( cn ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.317 NAME 'nsSaslMapping' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSaslMapRegexString $ nsSaslMapBaseDNTemplate $ nsSaslMapFilterTemplate ) MAY ( nsSaslMapPriority ) X-ORIGIN 'Netscape Directory Server' )
 objectClasses: ( 2.16.840.1.113730.3.2.43 NAME 'nsSNMP' DESC 'Netscape defined objectclass' SUP top MUST ( cn $ nsSNMPEnabled ) MAY ( nsSNMPOrganization $ nsSNMPLocation $ nsSNMPContact $ nsSNMPDescription $ nsSNMPName $ nsSNMPMasterHost $ nsSNMPMasterPort ) X-ORIGIN 'Netscape Directory Server' )
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 86c77ce..e2b6209 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -170,6 +170,8 @@ extern const char *type_nsds5ReplicaBusyWaitTime;
 extern const char *type_nsds5ReplicaSessionPauseTime;
 extern const char *type_nsds5ReplicaEnabled;
 extern const char *type_nsds5ReplicaStripAttrs;
+extern const char *type_nsds5ReplicaFlowControlWindow;
+extern const char *type_nsds5ReplicaFlowControlPause;
 extern const char *type_replicaProtocolTimeout;
 extern const char *type_replicaBackoffMin;
 extern const char *type_replicaBackoffMax;
@@ -332,6 +334,8 @@ int agmt_get_auto_initialize(const Repl_Agmt *ra);
 long agmt_get_timeout(const Repl_Agmt *ra);
 long agmt_get_busywaittime(const Repl_Agmt *ra);
 long agmt_get_pausetime(const Repl_Agmt *ra);
+long agmt_get_flowcontrolwindow(const Repl_Agmt *ra);
+long agmt_get_flowcontrolpause(const Repl_Agmt *ra);
 int agmt_start(Repl_Agmt *ra);
 int windows_agmt_start(Repl_Agmt *ra); 
 int agmt_stop(Repl_Agmt *ra);
@@ -352,6 +356,8 @@ int agmt_replarea_matches(const Repl_Agmt *ra, const Slapi_DN *name);
 int agmt_schedule_in_window_now(const Repl_Agmt *ra);
 int agmt_set_schedule_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
 int agmt_set_timeout_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
+int agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
+int agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e);
 int agmt_set_busywaittime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
 int agmt_set_pausetime_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
 int agmt_set_credentials_from_entry( Repl_Agmt *ra, const Slapi_Entry *e );
@@ -490,6 +496,10 @@ void conn_lock(Repl_Connection *conn);
 void conn_unlock(Repl_Connection *conn);
 void conn_delete_internal_ext(Repl_Connection *conn);
 const char* conn_get_bindmethod(Repl_Connection *conn);
+void conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data);
+void conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data);
+void conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data);
+void conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data);
 
 /* In repl5_protocol.c */
 typedef struct repl_protocol Repl_Protocol;
diff --git a/ldap/servers/plugins/replication/repl5_agmt.c b/ldap/servers/plugins/replication/repl5_agmt.c
index 7c5c37c..91be757 100644
--- a/ldap/servers/plugins/replication/repl5_agmt.c
+++ b/ldap/servers/plugins/replication/repl5_agmt.c
@@ -87,6 +87,8 @@
 #include "slapi-plugin.h"
 
 #define DEFAULT_TIMEOUT 600 /* (seconds) default outbound LDAP connection */
+#define DEFAULT_FLOWCONTROL_WINDOW 1000 /* #entries sent without acknowledgment */
+#define DEFAULT_FLOWCONTROL_PAUSE 2000 /* msec of pause when #entries sent witout acknowledgment */
 #define STATUS_LEN 1024
 
 struct changecounter {
@@ -145,6 +147,12 @@ typedef struct repl5agmt {
 	int agreement_type;
 	Slapi_Counter *protocol_timeout;
 	char *maxcsn; /* agmt max csn */
+	long flowControlWindow; /* This is the maximum number of entries 
+	                         * sent without acknowledgment
+	                         */
+	long flowControlPause; /* When nb of not acknowledged entries overpass totalUpdateWindow
+	                        * This is the duration (in msec) that the RA will pause before sending the next entry
+	                        */
 	Slapi_RWLock *attr_lock; /* RW lock for all the stripped attrs */
 } repl5agmt;
 
@@ -345,6 +353,28 @@ agmt_new_from_entry(Slapi_Entry *e)
 		}
 	}
 
+	/* flow control update window. */
+	ra->flowControlWindow = DEFAULT_FLOWCONTROL_WINDOW;
+	if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr) == 0)
+	{
+		Slapi_Value *sval;
+		if (slapi_attr_first_value(sattr, &sval) == 0)
+		{
+			ra->flowControlWindow = slapi_value_get_long(sval);
+		}
+	}
+
+	/* flow control update pause. */
+	ra->flowControlPause = DEFAULT_FLOWCONTROL_PAUSE;
+	if (slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr) == 0)
+	{
+		Slapi_Value *sval;
+		if (slapi_attr_first_value(sattr, &sval) == 0)
+		{
+			ra->flowControlPause = slapi_value_get_long(sval);
+		}
+	}
+
 	/* DN of entry at root of replicated area */
 	tmpstr = slapi_entry_attr_get_charptr(e, type_nsds5ReplicaRoot);
 	if (NULL != tmpstr)
@@ -1014,6 +1044,26 @@ agmt_get_pausetime(const Repl_Agmt *ra)
 	return return_value;
 }
 
+long
+agmt_get_flowcontrolwindow(const Repl_Agmt *ra)
+{
+	long return_value;
+	PR_ASSERT(NULL != ra);
+	PR_Lock(ra->lock);
+	return_value = ra->flowControlWindow;
+	PR_Unlock(ra->lock);
+	return return_value;
+}
+long
+agmt_get_flowcontrolpause(const Repl_Agmt *ra)
+{
+	long return_value;
+	PR_ASSERT(NULL != ra);
+	PR_Lock(ra->lock);
+	return_value = ra->flowControlPause;
+	PR_Unlock(ra->lock);
+	return return_value;
+}
 /*
  * Warning - reference to the long name of the agreement is returned.
  * The long name of an agreement is the DN of the agreement entry,
@@ -1775,6 +1825,90 @@ agmt_set_timeout_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
 	return return_value;
 }
 
+/*
+ * Set or reset the windows of entries sent without acknowledgment.
+ * The window is used during update to determine the number of
+ * entries will be send by the replica agreement without acknowledgment from the consumer
+ *
+ * Returns 0 if window set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolwindow_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+	Slapi_Attr *sattr = NULL;
+	int return_value = -1;
+
+	PR_ASSERT(NULL != ra);
+	PR_Lock(ra->lock);
+	if (ra->stop_in_progress)
+	{
+		PR_Unlock(ra->lock);
+		return return_value;
+	}
+
+	slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlWindow, &sattr);
+	if (NULL != sattr)
+	{
+		Slapi_Value *sval = NULL;
+		slapi_attr_first_value(sattr, &sval);
+		if (NULL != sval)
+		{
+			long tmpval = slapi_value_get_long(sval);
+			if (tmpval >= 0) {
+				ra->flowControlWindow = tmpval;
+				return_value = 0; /* success! */
+			}
+		}
+	}
+	PR_Unlock(ra->lock);
+	if (return_value == 0)
+	{
+		prot_notify_agmt_changed(ra->protocol, ra->long_name);
+	}
+	return return_value;
+}
+
+/*
+ * Set or reset the pause duration when #entries sent without acknowledgment overpass flow control window
+ *
+ * Returns 0 if pause set, or -1 if an error occurred.
+ */
+int
+agmt_set_flowcontrolpause_from_entry(Repl_Agmt *ra, const Slapi_Entry *e)
+{
+	Slapi_Attr *sattr = NULL;
+	int return_value = -1;
+
+	PR_ASSERT(NULL != ra);
+	PR_Lock(ra->lock);
+	if (ra->stop_in_progress)
+	{
+		PR_Unlock(ra->lock);
+		return return_value;
+	}
+
+	slapi_entry_attr_find(e, type_nsds5ReplicaFlowControlPause, &sattr);
+	if (NULL != sattr)
+	{
+		Slapi_Value *sval = NULL;
+		slapi_attr_first_value(sattr, &sval);
+		if (NULL != sval)
+		{
+			long tmpval = slapi_value_get_long(sval);
+			if (tmpval >= 0) {
+				ra->flowControlPause = tmpval;
+				return_value = 0; /* success! */
+			}
+		}
+	}
+	PR_Unlock(ra->lock);
+	if (return_value == 0)
+	{
+		prot_notify_agmt_changed(ra->protocol, ra->long_name);
+	}
+	return return_value;
+}
+
 int
 agmt_set_timeout(Repl_Agmt *ra, long timeout)
 {
@@ -1788,6 +1922,32 @@ agmt_set_timeout(Repl_Agmt *ra, long timeout)
 
     return 0;
 }
+int
+agmt_set_flowcontrolwindow(Repl_Agmt *ra, long window)
+{
+    PR_Lock(ra->lock);
+    if (ra->stop_in_progress){
+        PR_Unlock(ra->lock);
+        return -1;
+    }
+    ra->flowControlWindow = window;
+    PR_Unlock(ra->lock);
+
+    return 0;
+}
+int
+agmt_set_flowcontrolpause(Repl_Agmt *ra, long pause)
+{
+    PR_Lock(ra->lock);
+    if (ra->stop_in_progress){
+        PR_Unlock(ra->lock);
+        return -1;
+    }
+    ra->flowControlPause = pause;
+    PR_Unlock(ra->lock);
+
+    return 0;
+}
 
 /*
  * Set or reset the busywaittime
diff --git a/ldap/servers/plugins/replication/repl5_agmtlist.c b/ldap/servers/plugins/replication/repl5_agmtlist.c
index 8a70055..5eead07 100644
--- a/ldap/servers/plugins/replication/repl5_agmtlist.c
+++ b/ldap/servers/plugins/replication/repl5_agmtlist.c
@@ -330,6 +330,32 @@ agmtlist_modify_callback(Slapi_PBlock *pb, Slapi_Entry *entryBefore, Slapi_Entry
             }
 		}
 		else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+					type_nsds5ReplicaFlowControlWindow))
+		{
+			/* New replica timeout */
+			if (agmt_set_flowcontrolwindow_from_entry(agmt, e) != 0)
+			{
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " 
+						"failed to update the flow control window for agreement %s\n",
+						agmt_get_long_name(agmt));	
+				*returncode = LDAP_OPERATIONS_ERROR;
+				rc = SLAPI_DSE_CALLBACK_ERROR;
+			}
+		}
+		else if (slapi_attr_types_equivalent(mods[i]->mod_type,
+					type_nsds5ReplicaFlowControlPause))
+		{
+			/* New replica timeout */
+			if (agmt_set_flowcontrolpause_from_entry(agmt, e) != 0)
+			{
+				slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "agmtlist_modify_callback: " 
+						"failed to update the flow control pause for agreement %s\n",
+						agmt_get_long_name(agmt));	
+				*returncode = LDAP_OPERATIONS_ERROR;
+				rc = SLAPI_DSE_CALLBACK_ERROR;
+			}
+		}
+		else if (slapi_attr_types_equivalent(mods[i]->mod_type,
 					type_nsds5ReplicaBusyWaitTime))
 		{
 			/* New replica busywaittime */
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
index c004bfb..2971025 100644
--- a/ldap/servers/plugins/replication/repl5_connection.c
+++ b/ldap/servers/plugins/replication/repl5_connection.c
@@ -52,6 +52,7 @@ replica locked. Seems like right thing to do.
 */
 
 #include "repl5.h"
+#include "repl5_prot_private.h"
 #include "slapi-private.h"
 #if defined(USE_OPENLDAP)
 #include "ldap.h"
@@ -91,6 +92,7 @@ typedef struct repl_connection
 	struct timeval timeout;
 	int flag_agmt_changed;
 	char *plain;
+	void *tot_init_callback; /* Used during total update to do flow control */
 } repl_connection;
 
 /* #define DEFAULT_LINGER_TIME (5 * 60) */ /* 5 minutes */
@@ -274,6 +276,32 @@ conn_delete(Repl_Connection *conn)
 	PR_Unlock(conn->lock);
 }
 
+void
+conn_set_tot_update_cb_nolock(Repl_Connection *conn, void *cb_data)
+{
+    conn->tot_init_callback = (void *) cb_data;
+}
+void
+conn_set_tot_update_cb(Repl_Connection *conn, void *cb_data)
+{
+	PR_Lock(conn->lock);
+	conn_set_tot_update_cb_nolock(conn, cb_data);
+	PR_Unlock(conn->lock);
+}
+
+void
+conn_get_tot_update_cb_nolock(Repl_Connection *conn, void **cb_data)
+{
+    *cb_data = (void *) conn->tot_init_callback;
+}
+void
+conn_get_tot_update_cb(Repl_Connection *conn, void **cb_data)
+{
+	PR_Lock(conn->lock);
+	conn_get_tot_update_cb_nolock(conn, cb_data);
+	PR_Unlock(conn->lock);
+}
+
 /*
  * Return the last operation type processed by the connection
  * object, and the LDAP error encountered.
@@ -640,6 +668,131 @@ see_if_write_available(Repl_Connection *conn, PRIntervalTime timeout)
 }
 #endif /* ! USE_OPENLDAP */
 
+/* 
+ * During a total update, this function checks how much entries
+ * have been sent to the consumer without having received their acknowledgment.
+ * Basically it checks how late is the consumer.
+ * 
+ * If the consumer is too late, it pause the RA.sender (releasing the lock) to
+ * let the consumer to catch up and RA.reader to receive the acknowledgments.
+ * 
+ * Caller must hold conn->lock
+ */
+static void
+check_flow_control_tot_init(Repl_Connection *conn, int optype, const char *extop_oid, int sent_msgid)
+{
+    int rcv_msgid;
+    int once;
+    
+    if ((sent_msgid != 0) && (optype == CONN_EXTENDED_OPERATION) && (strcmp(extop_oid, REPL_NSDS50_REPLICATION_ENTRY_REQUEST_OID) == 0)) {
+        /* We are sending entries part of the total update of a consumer
+         * Wait a bit if the consumer needs to catchup from the current sent entries
+         */
+        rcv_msgid = repl5_tot_last_rcv_msgid(conn);
+        if (rcv_msgid == -1) {
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                            "%s: check_flow_control_tot_init no callback data [ msgid sent: %d]\n",
+                            agmt_get_long_name(conn->agmt),
+                            sent_msgid);
+        } else if (sent_msgid < rcv_msgid) {
+            slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+                            "%s: check_flow_control_tot_init invalid message ids [ msgid sent: %d, rcv: %d]\n",
+                            agmt_get_long_name(conn->agmt),
+                            sent_msgid,
+                            rcv_msgid);
+        } else if ((sent_msgid - rcv_msgid) > agmt_get_flowcontrolwindow(conn->agmt)) {
+            int totalUpdatePause;
+
+            totalUpdatePause = agmt_get_flowcontrolpause(conn->agmt);
+            if (totalUpdatePause) {
+                /* The consumer is late. Last sent entry compare to last acknowledged entry 
+                 * overpass the allowed limit (flowcontrolwindow)
+                 * Give some time to the consumer to catch up
+                 */
+                once = repl5_tot_flowcontrol_detection(conn, 1);
+                PR_Unlock(conn->lock);
+                if (once == 1) {
+                    /* This is the first time we hit total update flow control.
+                     * Log it at least once to inform administrator there is
+                     * a potential configuration issue here
+                     */
+                    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                            "%s: Total update flow control gives time (%d msec) to the consumer before sending more entries [ msgid sent: %d, rcv: %d])\n"
+                            "If total update fails you can try to increase %s and/or decrease %s in the replica agreement configuration\n",
+                            agmt_get_long_name(conn->agmt),
+                            totalUpdatePause,
+                            sent_msgid,
+                            rcv_msgid,
+                            type_nsds5ReplicaFlowControlPause,
+                            type_nsds5ReplicaFlowControlWindow);
+                }
+                DS_Sleep(PR_MillisecondsToInterval(totalUpdatePause));
+                PR_Lock(conn->lock);
+            }
+        }
+    }
+    
+}
+/* 
+ * Test if the connection is available to do a write.
+ * This function is doing a periodic polling of the connection.
+ * If the polling times out:
+ *  - it releases the connection lock (to let other thread ,i.e. 
+ *    replication result thread, the opportunity to use the connection)
+ *  - Sleeps for a short period (100ms)
+ *  - acquires the connection lock
+ * 
+ * It loops until
+ *  - it is available
+ *  - exceeds RA complete timeout
+ *  - server is shutdown
+ *  - connection is disconnected (Disable, stop, delete the RA
+ *    'terminate' the replication protocol and disconnect the connection)
+ * 
+ * Return:
+ *   - CONN_OPERATION_SUCCESS if the connection is available
+ *   - CONN_TIMEOUT if the overall polling/sleeping delay exceeds RA timeout
+ *   - CONN_NOT_CONNECTED if the replication connection state is disconnected
+ *   - other ConnResult
+ *
+ * Caller must hold conn->Lock. At the exit, conn->lock is held
+ */
+static ConnResult
+conn_is_available(Repl_Connection *conn)
+{
+    time_t poll_timeout_sec = 1; /* Polling for 1sec */
+    time_t yield_delay_msec = 100; /* Delay to wait */
+    time_t start_time = time( NULL );
+    time_t time_now;
+    ConnResult return_value = CONN_OPERATION_SUCCESS;
+    
+    while (!slapi_is_shutting_down() && (conn->state != STATE_DISCONNECTED)) {
+        return_value = see_if_write_available(conn, PR_SecondsToInterval(poll_timeout_sec));
+        if (return_value == CONN_TIMEOUT) {
+            /* in case of timeout we return CONN_TIMEOUT only
+             * if the RA.timeout is exceeded
+             */
+            time_now = time(NULL);
+            if (conn->timeout.tv_sec <= (time_now - start_time)) {
+                break;
+            } else {
+                /* Else give connection to others threads */
+                PR_Unlock(conn->lock);
+                slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+                        "%s: perform_operation transient timeout. retry)\n",
+                        agmt_get_long_name(conn->agmt));
+                DS_Sleep(PR_MillisecondsToInterval(yield_delay_msec));
+                PR_Lock(conn->lock);
+            }
+        } else {
+            break;
+        }
+    }
+    if (conn->state == STATE_DISCONNECTED) {
+        return_value = CONN_NOT_CONNECTED;
+    }
+    return return_value;
+}
 /*
  * Common code to send an LDAPv3 operation and collect the result.
  * Return values:
@@ -683,10 +836,13 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
 
 		Slapi_Eq_Context eqctx = repl5_start_debug_timeout(&setlevel);
 
-		return_value = see_if_write_available(
-			conn, PR_SecondsToInterval(conn->timeout.tv_sec));
+		return_value = conn_is_available(conn);
 		if (return_value != CONN_OPERATION_SUCCESS) {
 			PR_Unlock(conn->lock);
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+							"%s: perform_operation connection is not available (%d)\n",
+							agmt_get_long_name(conn->agmt),
+							return_value);
 			return return_value;
 		}
 		conn->last_operation = optype;
@@ -758,6 +914,9 @@ perform_operation(Repl_Connection *conn, int optype, const char *dn,
 		 */
 		return_value = CONN_NOT_CONNECTED;
 	}
+    
+	check_flow_control_tot_init(conn, optype, extop_oid, msgid);
+        
 	PR_Unlock(conn->lock); /* release the lock */
 	if (message_id)
 	{
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 3bb68e7..b867fc4 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -108,6 +108,7 @@ typedef struct result_data
 	int stop_result_thread; /* Flag used to tell the result thread to exit */
 	int last_message_id_sent;
 	int last_message_id_received;
+	int flowcontrol_detection;
 	int result; /* The UPDATE_TRANSIENT_ERROR etc */
 } result_data;
 
@@ -460,6 +461,23 @@ repl5_inc_destroy_async_result_thread(result_data *rd)
 	return retval;
 }
 
+/* The interest of this routine is to give time to the consumer
+ * to apply the sent updates and return the acks.
+ * So the caller should not hold the replication connection lock
+ * to let the RA.reader receives the acks.
+ */
+static void
+repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
+{
+    PR_Lock(rd->lock);
+    if ((rd->last_message_id_received <= rd->last_message_id_sent) &&
+        ((rd->last_message_id_sent - rd->last_message_id_received) >= agmt_get_flowcontrolwindow(agmt))) {
+        rd->flowcontrol_detection++;
+        DS_Sleep(PR_MillisecondsToInterval(agmt_get_flowcontrolpause(agmt)));
+    }
+    PR_Unlock(rd->lock);
+}
+
 static void
 repl5_inc_waitfor_async_results(result_data *rd)
 {
@@ -1683,7 +1701,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 	{
 		int finished = 0;
 		ConnResult replay_crc;
-        char csn_str[CSN_STRSIZE];
+		char csn_str[CSN_STRSIZE];
 
 		/* Start the results reading thread */
 		rd = repl5_inc_rd_new(prp);
@@ -1818,6 +1836,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 						sop->replica_id = replica_id;
 						PL_strncpyz(sop->uniqueid, uniqueid, sizeof(sop->uniqueid));
 						repl5_int_push_operation(rd,sop);
+						repl5_inc_flow_control_results(prp->agmt, rd);
 					} else {
 						slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
 							"%s: Skipping update operation with no message_id (uniqueid %s, CSN %s):\n",
@@ -1906,6 +1925,17 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 			}
 			*num_changes_sent = rd->num_changes_sent;
 		}
+		PR_Lock(rd->lock);
+		if (rd->flowcontrol_detection) {
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+					"%s: Incremental update flow control triggered %d times\n"
+					"You may increase %s and/or decrease %s in the replica agreement configuration\n",
+					agmt_get_long_name(prp->agmt),
+					rd->flowcontrol_detection,
+					type_nsds5ReplicaFlowControlPause,
+					type_nsds5ReplicaFlowControlWindow);             
+		}
+		PR_Unlock(rd->lock);
 		repl5_inc_rd_destroy(&rd);
 
 		cl5_operation_parameters_done ( entry.op );
diff --git a/ldap/servers/plugins/replication/repl5_prot_private.h b/ldap/servers/plugins/replication/repl5_prot_private.h
index 586e1eb..1b1c00b 100644
--- a/ldap/servers/plugins/replication/repl5_prot_private.h
+++ b/ldap/servers/plugins/replication/repl5_prot_private.h
@@ -79,6 +79,8 @@ typedef struct private_repl_protocol
 
 extern Private_Repl_Protocol *Repl_5_Inc_Protocol_new();
 extern Private_Repl_Protocol *Repl_5_Tot_Protocol_new();
+extern int repl5_tot_last_rcv_msgid(Repl_Connection *conn);
+extern int repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment);
 extern Private_Repl_Protocol *Windows_Inc_Protocol_new();
 extern Private_Repl_Protocol *Windows_Tot_Protocol_new();
 
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
index d4f0fcc..adadd44 100644
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
@@ -82,6 +82,7 @@ typedef struct callback_data
 	int stop_result_thread; /* Flag used to tell the result thread to exit */
 	int last_message_id_sent;
 	int last_message_id_received;
+	int flowcontrol_detection;
 } callback_data;
 
 /* 
@@ -428,13 +429,19 @@ repl5_tot_run(Private_Repl_Protocol *prp)
                                   LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, 
                                   repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
 
-    cb_data.prp = prp;
-    cb_data.rc = 0;
+	cb_data.prp = prp;
+	cb_data.rc = 0;
 	cb_data.num_entries = 0UL;
 	cb_data.sleep_on_busy = 0UL;
 	cb_data.last_busy = current_time ();
+	cb_data.flowcontrol_detection = 0;
 	cb_data.lock = PR_NewLock();
 
+	/* This allows during perform_operation to check the callback data
+	 * especially to do flow contol on delta send msgid / recv msgid
+	 */
+	conn_set_tot_update_cb(prp->conn, (void *) &cb_data);
+    
 	/* Before we get started on sending entries to the replica, we need to 
 	 * setup things for async propagation: 
 	 * 1. Create a thread that will read the LDAP results from the connection.
@@ -506,6 +513,17 @@ repl5_tot_run(Private_Repl_Protocol *prp)
 done:
 	slapi_sdn_free(&area_sdn);
 	slapi_ch_free_string(&hostname);
+	if (cb_data.flowcontrol_detection > 1)
+	{
+		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+			"%s: Total update flow control triggered %d times\n"
+			"You may increase %s and/or decrease %s in the replica agreement configuration\n",
+			agmt_get_long_name(prp->agmt),
+			cb_data.flowcontrol_detection,
+			type_nsds5ReplicaFlowControlPause,
+			type_nsds5ReplicaFlowControlWindow);
+	}
+	conn_set_tot_update_cb(prp->conn, NULL);
 	if (cb_data.lock) 
 	{
 		PR_DestroyLock(cb_data.lock);
@@ -645,6 +663,37 @@ void get_result (int rc, void *cb_data)
     ((callback_data*)cb_data)->rc = rc;
 }
 
+/* Call must hold the connection lock */
+int
+repl5_tot_last_rcv_msgid(Repl_Connection *conn) 
+{
+    struct callback_data *cb_data;
+    
+    conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
+    if (cb_data == NULL) {
+        return -1;
+    } else {
+        return cb_data->last_message_id_received;
+    }
+}
+
+/* Increase the flowcontrol counter
+ * Call must hold the connection lock
+ */
+int
+repl5_tot_flowcontrol_detection(Repl_Connection *conn, int increment) 
+{
+    struct callback_data *cb_data;
+    
+    conn_get_tot_update_cb_nolock(conn, (void **) &cb_data);
+    if (cb_data == NULL) {
+        return -1;
+    } else {
+        cb_data->flowcontrol_detection += increment;
+        return cb_data->flowcontrol_detection;
+    }
+}
+
 static 
 int send_entry (Slapi_Entry *e, void *cb_data)
 {
diff --git a/ldap/servers/plugins/replication/repl_globals.c b/ldap/servers/plugins/replication/repl_globals.c
index 5609def..e2157fa 100644
--- a/ldap/servers/plugins/replication/repl_globals.c
+++ b/ldap/servers/plugins/replication/repl_globals.c
@@ -139,6 +139,8 @@ const char *type_nsds5ReplicaBusyWaitTime = "nsds5ReplicaBusyWaitTime";
 const char *type_nsds5ReplicaSessionPauseTime = "nsds5ReplicaSessionPauseTime";
 const char *type_nsds5ReplicaEnabled = "nsds5ReplicaEnabled";
 const char *type_nsds5ReplicaStripAttrs = "nsds5ReplicaStripAttrs";
+const char* type_nsds5ReplicaFlowControlWindow = "nsds5ReplicaFlowControlWindow";
+const char* type_nsds5ReplicaFlowControlPause = "nsds5ReplicaFlowControlPause";
 
 /* windows sync specific attributes */
 const char *type_nsds7WindowsReplicaArea = "nsds7WindowsReplicaSubtree";
-- 
1.9.3