Blame SOURCES/0096-Ticket-47788-Supplier-can-skip-a-failing-update-alth.patch

c16027
From 09cff2c4c01bbcaf45df553869d0b6cb8acfad2b Mon Sep 17 00:00:00 2001
c16027
From: Mark Reynolds <mreynolds@redhat.com>
c16027
Date: Sun, 17 Jan 2016 18:25:43 -0500
c16027
Subject: [PATCH 96/99] Ticket 47788 - Supplier can skip a failing update,
c16027
 although  it should retry
c16027
c16027
Bug Description:  If a replicated update fails on the consumer,
c16027
                  the update is never tried.  This is due to the
c16027
                  replication async result thread missing the failure
c16027
                  before another update is replicated and it succeeds.
c16027
c16027
                  This second update that succeeds updates the consumer
c16027
                  RUV.  This makes it appear that the consumer is caught
c16027
                  up, and the supplier never resends that original
c16027
                  failed update.
c16027
c16027
Fix Description:  When a replicated update fails, and its an error we can
c16027
                  not ignore, the connection is closed.  Which stops the
c16027
                  replication session, and prevents any further updates
c16027
                  coming in and updating the consumer RUV.  This allows
c16027
                  the supplier to correctly retry the operation that
c16027
                  failed on the next replication session.
c16027
c16027
https://fedorahosted.org/389/ticket/47788
c16027
c16027
Reviewed by: nhosoi, wibrown, and rmeggins (Thanks!!!)
c16027
c16027
(cherry picked from commit ab6501a963c94b2b6b5fa8d1924519ef1c26b0bd)
c16027
(cherry picked from commit 407c545f07c06520f8378649fc0ac8fe20748dc7)
c16027
---
c16027
 ldap/servers/plugins/replication/repl5.h           |   1 +
c16027
 .../servers/plugins/replication/repl5_connection.c |  19 +--
c16027
 .../plugins/replication/repl5_inc_protocol.c       | 182 ++++++++++++---------
c16027
 ldap/servers/plugins/replication/repl5_plugins.c   |  60 ++++++-
c16027
 ldap/servers/plugins/replication/urp.c             |   2 +-
c16027
 5 files changed, 168 insertions(+), 96 deletions(-)
c16027
c16027
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
c16027
index df92ca0..307da82 100644
c16027
--- a/ldap/servers/plugins/replication/repl5.h
c16027
+++ b/ldap/servers/plugins/replication/repl5.h
c16027
@@ -608,6 +608,7 @@ void replica_incr_agmt_count(Replica *r);
c16027
 void replica_decr_agmt_count(Replica *r);
c16027
 PRUint64 replica_get_precise_purging(Replica *r);
c16027
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
c16027
+PRBool ignore_error_and_keep_going(int error);
c16027
 
c16027
 /* The functions below handles the state flag */
c16027
 /* Current internal state flags */
c16027
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
c16027
index 1515ca1..d193938 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_connection.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_connection.c
c16027
@@ -480,17 +480,17 @@ conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retda
c16027
 					conn->last_ldap_error = rc;
c16027
 					close_connection_internal(conn); /* we already have the lock */
c16027
 					return_value = CONN_NOT_CONNECTED;
c16027
+					goto done;
c16027
 				}
c16027
 				else if (IS_DISCONNECT_ERROR(err))
c16027
 				{
c16027
 					conn->last_ldap_error = err;
c16027
 					close_connection_internal(conn); /* we already have the lock */
c16027
 					return_value = CONN_NOT_CONNECTED;
c16027
+					goto done;
c16027
 				}
c16027
 				/* Got a result */
c16027
-				if ((rc == LDAP_SUCCESS) && (err == LDAP_BUSY))
c16027
-				    	return_value = CONN_BUSY;
c16027
-				else if (retoidp)
c16027
+				if (retoidp /* total update */)
c16027
 				{
c16027
 					if (!((rc == LDAP_SUCCESS) && (err == LDAP_BUSY)))
c16027
 					{
c16027
@@ -519,16 +519,11 @@ conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retda
c16027
 					}
c16027
 					return_value = LDAP_SUCCESS == conn->last_ldap_error ? CONN_OPERATION_SUCCESS : CONN_OPERATION_FAILED;
c16027
 				}
c16027
-				/*
c16027
-				 * XXXggood do I need to free matched, referrals,
c16027
-				 * anything else? Or can I pass NULL for the args
c16027
-				 * I'm not interested in?
c16027
-				 */
c16027
-				/* Good question! Meanwhile, as RTM aproaches, let's free them... */
c16027
-				slapi_ch_free((void **) &errmsg);
c16027
-				slapi_ch_free((void **) &matched);
c16027
-				charray_free(referrals);
c16027
 				conn->status = STATUS_CONNECTED;
c16027
+done:
c16027
+				slapi_ch_free_string(&errmsg);
c16027
+				slapi_ch_free_string(&matched);
c16027
+				charray_free(referrals);
c16027
 			}
c16027
 			if (res) ldap_msgfree(res);
c16027
 			PR_Unlock(conn->lock); /* release the conn lock */
c16027
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
c16027
index 244bbb2..927f835 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
c16027
@@ -146,7 +146,6 @@ static void protocol_sleep(Private_Repl_Protocol *prp, PRIntervalTime duration);
c16027
 static int send_updates(Private_Repl_Protocol *prp, RUV *ruv, PRUint32 *num_changes_sent);
c16027
 static void repl5_inc_backoff_expired(time_t timer_fire_time, void *arg);
c16027
 static int examine_update_vector(Private_Repl_Protocol *prp, RUV *ruv);
c16027
-static PRBool ignore_error_and_keep_going(int error);
c16027
 static const char* state2name (int state);
c16027
 static const char* event2name (int event);
c16027
 static const char* op2string (int op);
c16027
@@ -450,11 +449,13 @@ repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
c16027
     PR_Unlock(rd->lock);
c16027
 }
c16027
 
c16027
-static void
c16027
+static int
c16027
 repl5_inc_waitfor_async_results(result_data *rd)
c16027
 {
c16027
 	int done = 0;
c16027
 	int loops = 0;
c16027
+	int rc = UPDATE_NO_MORE_UPDATES;
c16027
+
c16027
 	/* Keep pulling results off the LDAP connection until we catch up to the last message id stored in the rd */
c16027
 	while (!done && !slapi_is_shutting_down())
c16027
 	{
c16027
@@ -470,6 +471,10 @@ repl5_inc_waitfor_async_results(result_data *rd)
c16027
 		} else if (rd->abort && (rd->result == UPDATE_CONNECTION_LOST)) {
c16027
 			done = 1; /* no connection == no more results */
c16027
 		}
c16027
+		/*
c16027
+		 * Return the last operation result
c16027
+		 */
c16027
+		rc = rd->result;
c16027
 		PR_Unlock(rd->lock);
c16027
 		if (!done) {
c16027
 			/* If not then sleep a bit */
c16027
@@ -487,6 +492,7 @@ repl5_inc_waitfor_async_results(result_data *rd)
c16027
 			done = 1;
c16027
 		}
c16027
 	}
c16027
+	return rc;
c16027
 }
c16027
 
c16027
 /*
c16027
@@ -1467,78 +1473,84 @@ static int
c16027
 repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_crc, int connection_error, char *csn_str, char *uniqueid, ReplicaId replica_id, int* finished, PRUint32 *num_changes_sent)
c16027
 {
c16027
 	int return_value = 0;
c16027
-	
c16027
-	/* Indentation is wrong here so we can get a sensible cvs diff */
c16027
-				if (CONN_OPERATION_SUCCESS != replay_crc)
c16027
-				{
c16027
-					/* Figure out what to do next */
c16027
-					if (CONN_OPERATION_FAILED == replay_crc)
c16027
-					{
c16027
-						/* Map ldap error code to return value */
c16027
-						if (!ignore_error_and_keep_going(connection_error))
c16027
-						{
c16027
-							return_value = UPDATE_TRANSIENT_ERROR;
c16027
-							*finished = 1;
c16027
-						}
c16027
-						else
c16027
-						{
c16027
-							agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
c16027
-						}
c16027
-						slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
c16027
-							"%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
c16027
-							agmt_get_long_name(prp->agmt),
c16027
-							uniqueid, csn_str,
c16027
-							ldap_err2string(connection_error), connection_error,
c16027
-							*finished ? "Will retry later" : "Skipping");
c16027
-					}
c16027
-					else if (CONN_NOT_CONNECTED == replay_crc)
c16027
-					{
c16027
-						/* We lost the connection - enter backoff state */
c16027
 
c16027
-						return_value = UPDATE_CONNECTION_LOST;
c16027
-						*finished = 1;
c16027
-						slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
-							"%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
c16027
-							"%s(%d). Will retry later.\n",
c16027
-							agmt_get_long_name(prp->agmt),
c16027
-							uniqueid, csn_str,
c16027
-							connection_error ? ldap_err2string(connection_error) : "Connection lost",
c16027
-							connection_error);
c16027
-					}
c16027
-					else if (CONN_TIMEOUT == replay_crc)
c16027
-					{
c16027
-						return_value = UPDATE_TIMEOUT;
c16027
-						*finished = 1;
c16027
-						slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
-							"%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
c16027
-							"%s.\n",
c16027
-							agmt_get_long_name(prp->agmt),
c16027
-							uniqueid, csn_str,
c16027
-							connection_error ? ldap_err2string(connection_error) : "Timeout");
c16027
-					}
c16027
-					else if (CONN_LOCAL_ERROR == replay_crc)
c16027
-					{
c16027
-						/*
c16027
-						 * Something bad happened on the local server - enter 
c16027
-						 * backoff state.
c16027
-						 */
c16027
-						return_value = UPDATE_TRANSIENT_ERROR;
c16027
-						*finished = 1;
c16027
-						slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
-							"%s: Failed to replay change (uniqueid %s, CSN %s): "
c16027
-							"Local error. Will retry later.\n",
c16027
-							agmt_get_long_name(prp->agmt),
c16027
-							uniqueid, csn_str);
c16027
-					}
c16027
-						
c16027
-				}
c16027
-				else
c16027
-				{
c16027
-					/* Positive response received */
c16027
-					(*num_changes_sent)++;
c16027
-					agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
c16027
-				}
c16027
-				return return_value;
c16027
+	if (CONN_OPERATION_SUCCESS != replay_crc)
c16027
+	{
c16027
+		/* Figure out what to do next */
c16027
+		if (CONN_OPERATION_FAILED == replay_crc)
c16027
+		{
c16027
+			/* Map ldap error code to return value */
c16027
+			if (!ignore_error_and_keep_going(connection_error))
c16027
+			{
c16027
+				return_value = UPDATE_TRANSIENT_ERROR;
c16027
+				*finished = 1;
c16027
+			}
c16027
+			else
c16027
+			{
c16027
+				agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
c16027
+			}
c16027
+			slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
c16027
+				"%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
c16027
+				agmt_get_long_name(prp->agmt),
c16027
+				uniqueid, csn_str,
c16027
+				ldap_err2string(connection_error), connection_error,
c16027
+				*finished ? "Will retry later" : "Skipping");
c16027
+		}
c16027
+		else if (CONN_NOT_CONNECTED == replay_crc)
c16027
+		{
c16027
+			/* We lost the connection - enter backoff state */
c16027
+
c16027
+			return_value = UPDATE_CONNECTION_LOST;
c16027
+			*finished = 1;
c16027
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
+				"%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
c16027
+				"%s(%d). Will retry later.\n",
c16027
+				agmt_get_long_name(prp->agmt),
c16027
+				uniqueid, csn_str,
c16027
+				connection_error ? ldap_err2string(connection_error) : "Connection lost",
c16027
+				connection_error);
c16027
+		}
c16027
+		else if (CONN_TIMEOUT == replay_crc)
c16027
+		{
c16027
+			return_value = UPDATE_TIMEOUT;
c16027
+			*finished = 1;
c16027
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
+				"%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
c16027
+				"%s.\n",
c16027
+				agmt_get_long_name(prp->agmt),
c16027
+				uniqueid, csn_str,
c16027
+				connection_error ? ldap_err2string(connection_error) : "Timeout");
c16027
+		}
c16027
+		else if (CONN_LOCAL_ERROR == replay_crc)
c16027
+		{
c16027
+			/*
c16027
+			 * Something bad happened on the local server - enter
c16027
+			 * backoff state.
c16027
+			 */
c16027
+			return_value = UPDATE_TRANSIENT_ERROR;
c16027
+			*finished = 1;
c16027
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
+				"%s: Failed to replay change (uniqueid %s, CSN %s): "
c16027
+				"Local error. Will retry later.\n",
c16027
+				agmt_get_long_name(prp->agmt),
c16027
+				uniqueid, csn_str);
c16027
+		}
c16027
+		if (*finished){
c16027
+			/*
c16027
+			 * A serious error has occurred, the consumer might have closed
c16027
+			 * the connection already, but we need to close the conn on the
c16027
+			 * supplier side to properly set the conn structure as closed.
c16027
+			 */
c16027
+			conn_disconnect(prp->conn);
c16027
+		}
c16027
+	}
c16027
+	else
c16027
+	{
c16027
+		/* Positive response received */
c16027
+		(*num_changes_sent)++;
c16027
+		agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
c16027
+	}
c16027
+	return return_value;
c16027
 }
c16027
 
c16027
 /*
c16027
@@ -1556,7 +1568,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
c16027
 {
c16027
 	CL5Entry entry;
c16027
 	slapi_operation_parameters op;
c16027
-	int return_value;
c16027
+	int return_value = 0;
c16027
 	int rc;
c16027
 	CL5ReplayIterator *changelog_iterator;
c16027
 	int message_id = 0;
c16027
@@ -1929,9 +1941,23 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
c16027
 		{
c16027
 			/* We need to ensure that we wait until all the responses have been received from our operations */
c16027
 			if (return_value != UPDATE_CONNECTION_LOST) {
c16027
-				rd->WaitForAsyncResults = agmt_get_WaitForAsyncResults(prp->agmt);
c16027
-				/* if connection was lost/closed, there will be nothing to read */
c16027
-				repl5_inc_waitfor_async_results(rd);
c16027
+				/*
c16027
+				 * If we already have an error, there is no need to check the
c16027
+				 * async result thread anymore.
c16027
+				 */
c16027
+				if (return_value == UPDATE_NO_MORE_UPDATES)
c16027
+				{
c16027
+					/*
c16027
+					 * We need to double check that an error hasn't popped up from
c16027
+					 * the async result thread since our last check.
c16027
+					 */
c16027
+					int final_result;
c16027
+
c16027
+					rd->WaitForAsyncResults = agmt_get_WaitForAsyncResults(prp->agmt);
c16027
+					if((final_result = repl5_inc_waitfor_async_results(rd))){
c16027
+						return_value = final_result;
c16027
+					}
c16027
+				}
c16027
 			}
c16027
 
c16027
 			rc = repl5_inc_destroy_async_result_thread(rd);
c16027
@@ -2220,7 +2246,7 @@ examine_update_vector(Private_Repl_Protocol *prp, RUV *remote_ruv)
c16027
  * We stop if there's some indication that the server just completely
c16027
  * failed to process the operation, e.g. LDAP_OPERATIONS_ERROR.
c16027
  */
c16027
-static PRBool
c16027
+PRBool
c16027
 ignore_error_and_keep_going(int error)
c16027
 {
c16027
 	int return_value = PR_FALSE;
c16027
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
c16027
index 8992055..c2fa214 100644
c16027
--- a/ldap/servers/plugins/replication/repl5_plugins.c
c16027
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
c16027
@@ -1231,12 +1231,13 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
c16027
 static int
c16027
 process_postop (Slapi_PBlock *pb)
c16027
 {
c16027
-    int rc = LDAP_SUCCESS;
c16027
-    Slapi_Operation *op;
c16027
+	Slapi_Operation *op;
c16027
 	Slapi_Backend *be;
c16027
-    int is_replicated_operation = 0;
c16027
+	int is_replicated_operation = 0;
c16027
 	CSN *opcsn = NULL;
c16027
 	char sessionid[REPL_SESSION_ID_SIZE];
c16027
+	int retval = LDAP_SUCCESS;
c16027
+	int rc = 0;
c16027
 
c16027
     /* we just let fixup operations through */
c16027
     slapi_pblock_get( pb, SLAPI_OPERATION, &op );
c16027
@@ -1260,8 +1261,8 @@ process_postop (Slapi_PBlock *pb)
c16027
 
c16027
 	get_repl_session_id (pb, sessionid, &opcsn);
c16027
 
c16027
-	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
c16027
-	if (rc == LDAP_SUCCESS)
c16027
+	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &retval);
c16027
+	if (retval == LDAP_SUCCESS)
c16027
 	{
c16027
 		agmtlist_notify_all(pb);
c16027
 		rc = SLAPI_PLUGIN_SUCCESS;
c16027
@@ -1306,6 +1307,55 @@ process_postop (Slapi_PBlock *pb)
c16027
 			slapi_ch_free((void **) &op_params->p.p_modrdn.modrdn_newsuperior_address.uniqueid);
c16027
 		}
c16027
 	}
c16027
+	if (!ignore_error_and_keep_going(retval)){
c16027
+		/*
c16027
+		 * We have an error we can't ignore.  Release the replica and close
c16027
+		 * the connection to stop the replication session.
c16027
+		 */
c16027
+		consumer_connection_extension *connext = NULL;
c16027
+		Slapi_Connection *conn = NULL;
c16027
+		char csn_str[CSN_STRSIZE] = {'\0'};
c16027
+		PRUint64 connid = 0;
c16027
+		int opid = 0;
c16027
+
c16027
+		slapi_pblock_get(pb, SLAPI_CONNECTION, &conn;;
c16027
+		slapi_pblock_get(pb, SLAPI_OPERATION_ID, &opid);
c16027
+		slapi_pblock_get(pb, SLAPI_CONN_ID, &connid);
c16027
+		if (conn)
c16027
+		{
c16027
+			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
c16027
+				"process_postop: Failed to apply update (%s) error (%d).  "
c16027
+				"Aborting replication session(conn=%" NSPRIu64 " op=%d)\n",
c16027
+				csn_as_string(opcsn, PR_FALSE, csn_str), retval,
c16027
+				connid, opid);
c16027
+			/*
c16027
+			 * Release this replica so new sessions can begin
c16027
+			 */
c16027
+			connext = consumer_connection_extension_acquire_exclusive_access(conn, connid, opid);
c16027
+			if (connext && connext->replica_acquired)
c16027
+			{
c16027
+				int zero = 0;
c16027
+				Replica *r = (Replica*)object_get_data ((Object*)connext->replica_acquired);
c16027
+
c16027
+				replica_relinquish_exclusive_access(r, connid, opid);
c16027
+				object_release ((Object*)connext->replica_acquired);
c16027
+				connext->replica_acquired = NULL;
c16027
+				connext->isreplicationsession = 0;
c16027
+				slapi_pblock_set( pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero );
c16027
+			}
c16027
+			if (connext){
c16027
+				consumer_connection_extension_relinquish_exclusive_access(conn, connid, opid, PR_FALSE);
c16027
+			}
c16027
+
c16027
+			/*
c16027
+			 * Close the connection to end the current session with the
c16027
+			 * supplier.  This prevents new updates from coming in and
c16027
+			 * updating the consumer RUV - which would cause this failed
c16027
+			 * update to be never be replayed.
c16027
+			 */
c16027
+			slapi_disconnect_server(conn);
c16027
+		}
c16027
+	}
c16027
 	if (NULL == opcsn)
c16027
 		opcsn = operation_get_csn(op);
c16027
 	if (opcsn)
c16027
diff --git a/ldap/servers/plugins/replication/urp.c b/ldap/servers/plugins/replication/urp.c
c16027
index 5fe6f55..8d0d735 100644
c16027
--- a/ldap/servers/plugins/replication/urp.c
c16027
+++ b/ldap/servers/plugins/replication/urp.c
c16027
@@ -122,7 +122,7 @@ urp_add_operation( Slapi_PBlock *pb )
c16027
 		slapi_log_error(slapi_log_urp, sessionid,
c16027
 		          "urp_add (%s): an entry with this uniqueid already exists.\n",
c16027
 		          slapi_entry_get_dn_const(existing_uniqueid_entry));
c16027
-		op_result= LDAP_UNWILLING_TO_PERFORM;
c16027
+		op_result= LDAP_ALREADY_EXISTS;
c16027
 		slapi_pblock_set(pb, SLAPI_RESULT_CODE, &op_result);
c16027
 		rc = SLAPI_PLUGIN_NOOP; /* Ignore this Operation */
c16027
 		PROFILE_POINT; /* Add Conflict; UniqueID Exists;  Ignore */
c16027
-- 
c16027
2.4.11
c16027