Blob Blame History Raw
From 09cff2c4c01bbcaf45df553869d0b6cb8acfad2b Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Sun, 17 Jan 2016 18:25:43 -0500
Subject: [PATCH 96/99] Ticket 47788 - Supplier can skip a failing update,
 although  it should retry

Bug Description:  If a replicated update fails on the consumer,
                  the update is never tried.  This is due to the
                  replication async result thread missing the failure
                  before another update is replicated and it succeeds.

                  This second update that succeeds updates the consumer
                  RUV.  This makes it appear that the consumer is caught
                  up, and the supplier never resends that original
                  failed update.

Fix Description:  When a replicated update fails, and its an error we can
                  not ignore, the connection is closed.  Which stops the
                  replication session, and prevents any further updates
                  coming in and updating the consumer RUV.  This allows
                  the supplier to correctly retry the operation that
                  failed on the next replication session.

https://fedorahosted.org/389/ticket/47788

Reviewed by: nhosoi, wibrown, and rmeggins (Thanks!!!)

(cherry picked from commit ab6501a963c94b2b6b5fa8d1924519ef1c26b0bd)
(cherry picked from commit 407c545f07c06520f8378649fc0ac8fe20748dc7)
---
 ldap/servers/plugins/replication/repl5.h           |   1 +
 .../servers/plugins/replication/repl5_connection.c |  19 +--
 .../plugins/replication/repl5_inc_protocol.c       | 182 ++++++++++++---------
 ldap/servers/plugins/replication/repl5_plugins.c   |  60 ++++++-
 ldap/servers/plugins/replication/urp.c             |   2 +-
 5 files changed, 168 insertions(+), 96 deletions(-)

diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index df92ca0..307da82 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -608,6 +608,7 @@ void replica_incr_agmt_count(Replica *r);
 void replica_decr_agmt_count(Replica *r);
 PRUint64 replica_get_precise_purging(Replica *r);
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
+PRBool ignore_error_and_keep_going(int error);
 
 /* The functions below handles the state flag */
 /* Current internal state flags */
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
index 1515ca1..d193938 100644
--- a/ldap/servers/plugins/replication/repl5_connection.c
+++ b/ldap/servers/plugins/replication/repl5_connection.c
@@ -480,17 +480,17 @@ conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retda
 					conn->last_ldap_error = rc;
 					close_connection_internal(conn); /* we already have the lock */
 					return_value = CONN_NOT_CONNECTED;
+					goto done;
 				}
 				else if (IS_DISCONNECT_ERROR(err))
 				{
 					conn->last_ldap_error = err;
 					close_connection_internal(conn); /* we already have the lock */
 					return_value = CONN_NOT_CONNECTED;
+					goto done;
 				}
 				/* Got a result */
-				if ((rc == LDAP_SUCCESS) && (err == LDAP_BUSY))
-				    	return_value = CONN_BUSY;
-				else if (retoidp)
+				if (retoidp /* total update */)
 				{
 					if (!((rc == LDAP_SUCCESS) && (err == LDAP_BUSY)))
 					{
@@ -519,16 +519,11 @@ conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retda
 					}
 					return_value = LDAP_SUCCESS == conn->last_ldap_error ? CONN_OPERATION_SUCCESS : CONN_OPERATION_FAILED;
 				}
-				/*
-				 * XXXggood do I need to free matched, referrals,
-				 * anything else? Or can I pass NULL for the args
-				 * I'm not interested in?
-				 */
-				/* Good question! Meanwhile, as RTM aproaches, let's free them... */
-				slapi_ch_free((void **) &errmsg);
-				slapi_ch_free((void **) &matched);
-				charray_free(referrals);
 				conn->status = STATUS_CONNECTED;
+done:
+				slapi_ch_free_string(&errmsg);
+				slapi_ch_free_string(&matched);
+				charray_free(referrals);
 			}
 			if (res) ldap_msgfree(res);
 			PR_Unlock(conn->lock); /* release the conn lock */
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 244bbb2..927f835 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -146,7 +146,6 @@ static void protocol_sleep(Private_Repl_Protocol *prp, PRIntervalTime duration);
 static int send_updates(Private_Repl_Protocol *prp, RUV *ruv, PRUint32 *num_changes_sent);
 static void repl5_inc_backoff_expired(time_t timer_fire_time, void *arg);
 static int examine_update_vector(Private_Repl_Protocol *prp, RUV *ruv);
-static PRBool ignore_error_and_keep_going(int error);
 static const char* state2name (int state);
 static const char* event2name (int event);
 static const char* op2string (int op);
@@ -450,11 +449,13 @@ repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
     PR_Unlock(rd->lock);
 }
 
-static void
+static int
 repl5_inc_waitfor_async_results(result_data *rd)
 {
 	int done = 0;
 	int loops = 0;
+	int rc = UPDATE_NO_MORE_UPDATES;
+
 	/* Keep pulling results off the LDAP connection until we catch up to the last message id stored in the rd */
 	while (!done && !slapi_is_shutting_down())
 	{
@@ -470,6 +471,10 @@ repl5_inc_waitfor_async_results(result_data *rd)
 		} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) {
 			done = 1; /* no connection == no more results */
 		}
+		/*
+		 * Return the last operation result
+		 */
+		rc = rd->result;
 		PR_Unlock(rd->lock);
 		if (!done) {
 			/* If not then sleep a bit */
@@ -487,6 +492,7 @@ repl5_inc_waitfor_async_results(result_data *rd)
 			done = 1;
 		}
 	}
+	return rc;
 }
 
 /*
@@ -1467,78 +1473,84 @@ static int
 repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_crc, int connection_error, char *csn_str, char *uniqueid, ReplicaId replica_id, int* finished, PRUint32 *num_changes_sent)
 {
 	int return_value = 0;
-	
-	/* Indentation is wrong here so we can get a sensible cvs diff */
-				if (CONN_OPERATION_SUCCESS != replay_crc)
-				{
-					/* Figure out what to do next */
-					if (CONN_OPERATION_FAILED == replay_crc)
-					{
-						/* Map ldap error code to return value */
-						if (!ignore_error_and_keep_going(connection_error))
-						{
-							return_value = UPDATE_TRANSIENT_ERROR;
-							*finished = 1;
-						}
-						else
-						{
-							agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
-						}
-						slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
-							"%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
-							agmt_get_long_name(prp->agmt),
-							uniqueid, csn_str,
-							ldap_err2string(connection_error), connection_error,
-							*finished ? "Will retry later" : "Skipping");
-					}
-					else if (CONN_NOT_CONNECTED == replay_crc)
-					{
-						/* We lost the connection - enter backoff state */
 
-						return_value = UPDATE_CONNECTION_LOST;
-						*finished = 1;
-						slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
-							"%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
-							"%s(%d). Will retry later.\n",
-							agmt_get_long_name(prp->agmt),
-							uniqueid, csn_str,
-							connection_error ? ldap_err2string(connection_error) : "Connection lost",
-							connection_error);
-					}
-					else if (CONN_TIMEOUT == replay_crc)
-					{
-						return_value = UPDATE_TIMEOUT;
-						*finished = 1;
-						slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
-							"%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
-							"%s.\n",
-							agmt_get_long_name(prp->agmt),
-							uniqueid, csn_str,
-							connection_error ? ldap_err2string(connection_error) : "Timeout");
-					}
-					else if (CONN_LOCAL_ERROR == replay_crc)
-					{
-						/*
-						 * Something bad happened on the local server - enter 
-						 * backoff state.
-						 */
-						return_value = UPDATE_TRANSIENT_ERROR;
-						*finished = 1;
-						slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
-							"%s: Failed to replay change (uniqueid %s, CSN %s): "
-							"Local error. Will retry later.\n",
-							agmt_get_long_name(prp->agmt),
-							uniqueid, csn_str);
-					}
-						
-				}
-				else
-				{
-					/* Positive response received */
-					(*num_changes_sent)++;
-					agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
-				}
-				return return_value;
+	if (CONN_OPERATION_SUCCESS != replay_crc)
+	{
+		/* Figure out what to do next */
+		if (CONN_OPERATION_FAILED == replay_crc)
+		{
+			/* Map ldap error code to return value */
+			if (!ignore_error_and_keep_going(connection_error))
+			{
+				return_value = UPDATE_TRANSIENT_ERROR;
+				*finished = 1;
+			}
+			else
+			{
+				agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
+			}
+			slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
+				"%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
+				agmt_get_long_name(prp->agmt),
+				uniqueid, csn_str,
+				ldap_err2string(connection_error), connection_error,
+				*finished ? "Will retry later" : "Skipping");
+		}
+		else if (CONN_NOT_CONNECTED == replay_crc)
+		{
+			/* We lost the connection - enter backoff state */
+
+			return_value = UPDATE_CONNECTION_LOST;
+			*finished = 1;
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+				"%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
+				"%s(%d). Will retry later.\n",
+				agmt_get_long_name(prp->agmt),
+				uniqueid, csn_str,
+				connection_error ? ldap_err2string(connection_error) : "Connection lost",
+				connection_error);
+		}
+		else if (CONN_TIMEOUT == replay_crc)
+		{
+			return_value = UPDATE_TIMEOUT;
+			*finished = 1;
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+				"%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
+				"%s.\n",
+				agmt_get_long_name(prp->agmt),
+				uniqueid, csn_str,
+				connection_error ? ldap_err2string(connection_error) : "Timeout");
+		}
+		else if (CONN_LOCAL_ERROR == replay_crc)
+		{
+			/*
+			 * Something bad happened on the local server - enter
+			 * backoff state.
+			 */
+			return_value = UPDATE_TRANSIENT_ERROR;
+			*finished = 1;
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+				"%s: Failed to replay change (uniqueid %s, CSN %s): "
+				"Local error. Will retry later.\n",
+				agmt_get_long_name(prp->agmt),
+				uniqueid, csn_str);
+		}
+		if (*finished){
+			/*
+			 * A serious error has occurred, the consumer might have closed
+			 * the connection already, but we need to close the conn on the
+			 * supplier side to properly set the conn structure as closed.
+			 */
+			conn_disconnect(prp->conn);
+		}
+	}
+	else
+	{
+		/* Positive response received */
+		(*num_changes_sent)++;
+		agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
+	}
+	return return_value;
 }
 
 /*
@@ -1556,7 +1568,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 {
 	CL5Entry entry;
 	slapi_operation_parameters op;
-	int return_value;
+	int return_value = 0;
 	int rc;
 	CL5ReplayIterator *changelog_iterator;
 	int message_id = 0;
@@ -1929,9 +1941,23 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 		{
 			/* We need to ensure that we wait until all the responses have been received from our operations */
 			if (return_value != UPDATE_CONNECTION_LOST) {
-				rd->WaitForAsyncResults = agmt_get_WaitForAsyncResults(prp->agmt);
-				/* if connection was lost/closed, there will be nothing to read */
-				repl5_inc_waitfor_async_results(rd);
+				/*
+				 * If we already have an error, there is no need to check the
+				 * async result thread anymore.
+				 */
+				if (return_value == UPDATE_NO_MORE_UPDATES)
+				{
+					/*
+					 * We need to double check that an error hasn't popped up from
+					 * the async result thread since our last check.
+					 */
+					int final_result;
+
+					rd->WaitForAsyncResults = agmt_get_WaitForAsyncResults(prp->agmt);
+					if((final_result = repl5_inc_waitfor_async_results(rd))){
+						return_value = final_result;
+					}
+				}
 			}
 
 			rc = repl5_inc_destroy_async_result_thread(rd);
@@ -2220,7 +2246,7 @@ examine_update_vector(Private_Repl_Protocol *prp, RUV *remote_ruv)
  * We stop if there's some indication that the server just completely
  * failed to process the operation, e.g. LDAP_OPERATIONS_ERROR.
  */
-static PRBool
+PRBool
 ignore_error_and_keep_going(int error)
 {
 	int return_value = PR_FALSE;
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
index 8992055..c2fa214 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -1231,12 +1231,13 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 static int
 process_postop (Slapi_PBlock *pb)
 {
-    int rc = LDAP_SUCCESS;
-    Slapi_Operation *op;
+	Slapi_Operation *op;
 	Slapi_Backend *be;
-    int is_replicated_operation = 0;
+	int is_replicated_operation = 0;
 	CSN *opcsn = NULL;
 	char sessionid[REPL_SESSION_ID_SIZE];
+	int retval = LDAP_SUCCESS;
+	int rc = 0;
 
     /* we just let fixup operations through */
     slapi_pblock_get( pb, SLAPI_OPERATION, &op );
@@ -1260,8 +1261,8 @@ process_postop (Slapi_PBlock *pb)
 
 	get_repl_session_id (pb, sessionid, &opcsn);
 
-	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
-	if (rc == LDAP_SUCCESS)
+	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &retval);
+	if (retval == LDAP_SUCCESS)
 	{
 		agmtlist_notify_all(pb);
 		rc = SLAPI_PLUGIN_SUCCESS;
@@ -1306,6 +1307,55 @@ process_postop (Slapi_PBlock *pb)
 			slapi_ch_free((void **) &op_params->p.p_modrdn.modrdn_newsuperior_address.uniqueid);
 		}
 	}
+	if (!ignore_error_and_keep_going(retval)){
+		/*
+		 * We have an error we can't ignore.  Release the replica and close
+		 * the connection to stop the replication session.
+		 */
+		consumer_connection_extension *connext = NULL;
+		Slapi_Connection *conn = NULL;
+		char csn_str[CSN_STRSIZE] = {'\0'};
+		PRUint64 connid = 0;
+		int opid = 0;
+
+		slapi_pblock_get(pb, SLAPI_CONNECTION, &conn);
+		slapi_pblock_get(pb, SLAPI_OPERATION_ID, &opid);
+		slapi_pblock_get(pb, SLAPI_CONN_ID, &connid);
+		if (conn)
+		{
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+				"process_postop: Failed to apply update (%s) error (%d).  "
+				"Aborting replication session(conn=%" NSPRIu64 " op=%d)\n",
+				csn_as_string(opcsn, PR_FALSE, csn_str), retval,
+				connid, opid);
+			/*
+			 * Release this replica so new sessions can begin
+			 */
+			connext = consumer_connection_extension_acquire_exclusive_access(conn, connid, opid);
+			if (connext && connext->replica_acquired)
+			{
+				int zero = 0;
+				Replica *r = (Replica*)object_get_data ((Object*)connext->replica_acquired);
+
+				replica_relinquish_exclusive_access(r, connid, opid);
+				object_release ((Object*)connext->replica_acquired);
+				connext->replica_acquired = NULL;
+				connext->isreplicationsession = 0;
+				slapi_pblock_set( pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero );
+			}
+			if (connext){
+				consumer_connection_extension_relinquish_exclusive_access(conn, connid, opid, PR_FALSE);
+			}
+
+			/*
+			 * Close the connection to end the current session with the
+			 * supplier.  This prevents new updates from coming in and
+			 * updating the consumer RUV - which would cause this failed
+			 * update to be never be replayed.
+			 */
+			slapi_disconnect_server(conn);
+		}
+	}
 	if (NULL == opcsn)
 		opcsn = operation_get_csn(op);
 	if (opcsn)
diff --git a/ldap/servers/plugins/replication/urp.c b/ldap/servers/plugins/replication/urp.c
index 5fe6f55..8d0d735 100644
--- a/ldap/servers/plugins/replication/urp.c
+++ b/ldap/servers/plugins/replication/urp.c
@@ -122,7 +122,7 @@ urp_add_operation( Slapi_PBlock *pb )
 		slapi_log_error(slapi_log_urp, sessionid,
 		          "urp_add (%s): an entry with this uniqueid already exists.\n",
 		          slapi_entry_get_dn_const(existing_uniqueid_entry));
-		op_result= LDAP_UNWILLING_TO_PERFORM;
+		op_result= LDAP_ALREADY_EXISTS;
 		slapi_pblock_set(pb, SLAPI_RESULT_CODE, &op_result);
 		rc = SLAPI_PLUGIN_NOOP; /* Ignore this Operation */
 		PROFILE_POINT; /* Add Conflict; UniqueID Exists;  Ignore */
-- 
2.4.11