From e8fad05507f04b517d464bbf4c559e9de4bad53f Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Sun, 17 Jan 2016 19:33:44 -0500
Subject: [PATCH 375/375] Ticket 47788 - Supplier can skip a failing update,
although it should retry
Bug Description: If a replicated update fails on the consumer,
the update is never tried. This is due to the
replication async result thread missing the failure
before another update is replicated and it succeeds.
This second update that succeeds updates the consumer
RUV. This makes it appear that the consumer is caught
up, and the supplier never resends that original
failed update.
Fix Description: When a replicated update fails, and its an error we can
not ignore, the connection is closed. Which stops the
replication session, and prevents any further updates
coming in and updating the consumer RUV. This allows
the supplier to correctly retry the operation that
failed on the next replication session.
https://fedorahosted.org/389/ticket/47788
Reviewed by: nhosoi, wibrown, and rmeggins (Thanks!!!)
(cherry picked from commit 80c68e202ff2b50a60f35b6683ef26b41609bc56)
(cherry picked from commit 96b3a5b76d9e9f3aa77627198c50741796fbe44c)
---
ldap/servers/plugins/replication/repl5.h | 1 +
.../servers/plugins/replication/repl5_connection.c | 19 +--
.../plugins/replication/repl5_inc_protocol.c | 180 ++++++++++++---------
ldap/servers/plugins/replication/repl5_plugins.c | 60 ++++++-
ldap/servers/plugins/replication/urp.c | 2 +-
5 files changed, 167 insertions(+), 95 deletions(-)
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 66006f6..ee161ef 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -589,6 +589,7 @@ void replica_set_ruv_dirty (Replica *r);
int replica_write_ruv (Replica *r);
char *replica_get_dn(Replica *r);
void replica_check_for_tasks(Replica*r, Slapi_Entry *e);
+PRBool ignore_error_and_keep_going(int error);
/* The functions below handles the state flag */
/* Current internal state flags */
diff --git a/ldap/servers/plugins/replication/repl5_connection.c b/ldap/servers/plugins/replication/repl5_connection.c
index e080a3f..0360f07 100644
--- a/ldap/servers/plugins/replication/repl5_connection.c
+++ b/ldap/servers/plugins/replication/repl5_connection.c
@@ -467,17 +467,17 @@ conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retda
conn->last_ldap_error = rc;
close_connection_internal(conn); /* we already have the lock */
return_value = CONN_NOT_CONNECTED;
+ goto done;
}
else if (IS_DISCONNECT_ERROR(err))
{
conn->last_ldap_error = err;
close_connection_internal(conn); /* we already have the lock */
return_value = CONN_NOT_CONNECTED;
+ goto done;
}
/* Got a result */
- if ((rc == LDAP_SUCCESS) && (err == LDAP_BUSY))
- return_value = CONN_BUSY;
- else if (retoidp)
+ if (retoidp /* total update */)
{
if (!((rc == LDAP_SUCCESS) && (err == LDAP_BUSY)))
{
@@ -506,16 +506,11 @@ conn_read_result_ex(Repl_Connection *conn, char **retoidp, struct berval **retda
}
return_value = LDAP_SUCCESS == conn->last_ldap_error ? CONN_OPERATION_SUCCESS : CONN_OPERATION_FAILED;
}
- /*
- * XXXggood do I need to free matched, referrals,
- * anything else? Or can I pass NULL for the args
- * I'm not interested in?
- */
- /* Good question! Meanwhile, as RTM aproaches, let's free them... */
- slapi_ch_free((void **) &errmsg);
- slapi_ch_free((void **) &matched);
- charray_free(referrals);
conn->status = STATUS_CONNECTED;
+done:
+ slapi_ch_free_string(&errmsg);
+ slapi_ch_free_string(&matched);
+ charray_free(referrals);
}
if (res) ldap_msgfree(res);
PR_Unlock(conn->lock); /* release the conn lock */
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 3268bfd..02b54b3 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -174,7 +174,6 @@ static void protocol_sleep(Private_Repl_Protocol *prp, PRIntervalTime duration);
static int send_updates(Private_Repl_Protocol *prp, RUV *ruv, PRUint32 *num_changes_sent);
static void repl5_inc_backoff_expired(time_t timer_fire_time, void *arg);
static int examine_update_vector(Private_Repl_Protocol *prp, RUV *ruv);
-static PRBool ignore_error_and_keep_going(int error);
static const char* state2name (int state);
static const char* event2name (int event);
static const char* op2string (int op);
@@ -478,11 +477,13 @@ repl5_inc_flow_control_results(Repl_Agmt *agmt, result_data *rd)
PR_Unlock(rd->lock);
}
-static void
+static int
repl5_inc_waitfor_async_results(result_data *rd)
{
int done = 0;
int loops = 0;
+ int rc = UPDATE_NO_MORE_UPDATES;
+
/* Keep pulling results off the LDAP connection until we catch up to the last message id stored in the rd */
while (!done && !slapi_is_shutting_down())
{
@@ -501,6 +502,10 @@ repl5_inc_waitfor_async_results(result_data *rd)
{
done = 1; /* no connection == no more results */
}
+ /*
+ * Return the last operation result
+ */
+ rc = rd->result;
PR_Unlock(rd->lock);
/* If not then sleep a bit */
DS_Sleep(PR_SecondsToInterval(1));
@@ -516,6 +521,7 @@ repl5_inc_waitfor_async_results(result_data *rd)
done = 1;
}
}
+ return rc;
}
/*
@@ -1483,78 +1489,84 @@ static int
repl5_inc_update_from_op_result(Private_Repl_Protocol *prp, ConnResult replay_crc, int connection_error, char *csn_str, char *uniqueid, ReplicaId replica_id, int* finished, PRUint32 *num_changes_sent)
{
int return_value = 0;
-
- /* Indentation is wrong here so we can get a sensible cvs diff */
- if (CONN_OPERATION_SUCCESS != replay_crc)
- {
- /* Figure out what to do next */
- if (CONN_OPERATION_FAILED == replay_crc)
- {
- /* Map ldap error code to return value */
- if (!ignore_error_and_keep_going(connection_error))
- {
- return_value = UPDATE_TRANSIENT_ERROR;
- *finished = 1;
- }
- else
- {
- agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
- }
- slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
- "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
- agmt_get_long_name(prp->agmt),
- uniqueid, csn_str,
- ldap_err2string(connection_error), connection_error,
- *finished ? "Will retry later" : "Skipping");
- }
- else if (CONN_NOT_CONNECTED == replay_crc)
- {
- /* We lost the connection - enter backoff state */
- return_value = UPDATE_CONNECTION_LOST;
- *finished = 1;
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
- "%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
- "%s(%d). Will retry later.\n",
- agmt_get_long_name(prp->agmt),
- uniqueid, csn_str,
- connection_error ? ldap_err2string(connection_error) : "Connection lost",
- connection_error);
- }
- else if (CONN_TIMEOUT == replay_crc)
- {
- return_value = UPDATE_TIMEOUT;
- *finished = 1;
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
- "%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
- "%s.\n",
- agmt_get_long_name(prp->agmt),
- uniqueid, csn_str,
- connection_error ? ldap_err2string(connection_error) : "Timeout");
- }
- else if (CONN_LOCAL_ERROR == replay_crc)
- {
- /*
- * Something bad happened on the local server - enter
- * backoff state.
- */
- return_value = UPDATE_TRANSIENT_ERROR;
- *finished = 1;
- slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
- "%s: Failed to replay change (uniqueid %s, CSN %s): "
- "Local error. Will retry later.\n",
- agmt_get_long_name(prp->agmt),
- uniqueid, csn_str);
- }
-
- }
- else
- {
- /* Positive response received */
- (*num_changes_sent)++;
- agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
- }
- return return_value;
+ if (CONN_OPERATION_SUCCESS != replay_crc)
+ {
+ /* Figure out what to do next */
+ if (CONN_OPERATION_FAILED == replay_crc)
+ {
+ /* Map ldap error code to return value */
+ if (!ignore_error_and_keep_going(connection_error))
+ {
+ return_value = UPDATE_TRANSIENT_ERROR;
+ *finished = 1;
+ }
+ else
+ {
+ agmt_inc_last_update_changecount (prp->agmt, replica_id, 1 /*skipped*/);
+ }
+ slapi_log_error(*finished ? SLAPI_LOG_FATAL : slapi_log_urp, repl_plugin_name,
+ "%s: Consumer failed to replay change (uniqueid %s, CSN %s): %s (%d). %s.\n",
+ agmt_get_long_name(prp->agmt),
+ uniqueid, csn_str,
+ ldap_err2string(connection_error), connection_error,
+ *finished ? "Will retry later" : "Skipping");
+ }
+ else if (CONN_NOT_CONNECTED == replay_crc)
+ {
+ /* We lost the connection - enter backoff state */
+
+ return_value = UPDATE_CONNECTION_LOST;
+ *finished = 1;
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Consumer failed to replay change (uniqueid %s, CSN %s): "
+ "%s(%d). Will retry later.\n",
+ agmt_get_long_name(prp->agmt),
+ uniqueid, csn_str,
+ connection_error ? ldap_err2string(connection_error) : "Connection lost",
+ connection_error);
+ }
+ else if (CONN_TIMEOUT == replay_crc)
+ {
+ return_value = UPDATE_TIMEOUT;
+ *finished = 1;
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Consumer timed out to replay change (uniqueid %s, CSN %s): "
+ "%s.\n",
+ agmt_get_long_name(prp->agmt),
+ uniqueid, csn_str,
+ connection_error ? ldap_err2string(connection_error) : "Timeout");
+ }
+ else if (CONN_LOCAL_ERROR == replay_crc)
+ {
+ /*
+ * Something bad happened on the local server - enter
+ * backoff state.
+ */
+ return_value = UPDATE_TRANSIENT_ERROR;
+ *finished = 1;
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "%s: Failed to replay change (uniqueid %s, CSN %s): "
+ "Local error. Will retry later.\n",
+ agmt_get_long_name(prp->agmt),
+ uniqueid, csn_str);
+ }
+ if (*finished){
+ /*
+ * A serious error has occurred, the consumer might have closed
+ * the connection already, but we need to close the conn on the
+ * supplier side to properly set the conn structure as closed.
+ */
+ conn_disconnect(prp->conn);
+ }
+ }
+ else
+ {
+ /* Positive response received */
+ (*num_changes_sent)++;
+ agmt_inc_last_update_changecount (prp->agmt, replica_id, 0 /*replayed*/);
+ }
+ return return_value;
}
/*
@@ -1572,7 +1584,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
{
CL5Entry entry;
slapi_operation_parameters op;
- int return_value;
+ int return_value = 0;
int rc;
CL5ReplayIterator *changelog_iterator;
int message_id = 0;
@@ -1937,8 +1949,22 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
{
/* We need to ensure that we wait until all the responses have been received from our operations */
if (return_value != UPDATE_CONNECTION_LOST) {
- /* if connection was lost/closed, there will be nothing to read */
- repl5_inc_waitfor_async_results(rd);
+ /*
+ * If we already have an error, there is no need to check the
+ * async result thread anymore.
+ */
+ if (return_value == UPDATE_NO_MORE_UPDATES)
+ {
+ /*
+ * We need to double check that an error hasn't popped up from
+ * the async result thread since our last check.
+ */
+ int final_result;
+
+ if((final_result = repl5_inc_waitfor_async_results(rd))){
+ return_value = final_result;
+ }
+ }
}
rc = repl5_inc_destroy_async_result_thread(rd);
@@ -2228,7 +2254,7 @@ examine_update_vector(Private_Repl_Protocol *prp, RUV *remote_ruv)
* We stop if there's some indication that the server just completely
* failed to process the operation, e.g. LDAP_OPERATIONS_ERROR.
*/
-static PRBool
+PRBool
ignore_error_and_keep_going(int error)
{
int return_value = PR_FALSE;
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
index dddbf15..d49a666 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -1161,12 +1161,13 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
static int
process_postop (Slapi_PBlock *pb)
{
- int rc = LDAP_SUCCESS;
- Slapi_Operation *op;
+ Slapi_Operation *op;
Slapi_Backend *be;
- int is_replicated_operation = 0;
+ int is_replicated_operation = 0;
CSN *opcsn = NULL;
char sessionid[REPL_SESSION_ID_SIZE];
+ int retval = LDAP_SUCCESS;
+ int rc = 0;
/* we just let fixup operations through */
slapi_pblock_get( pb, SLAPI_OPERATION, &op );
@@ -1190,8 +1191,8 @@ process_postop (Slapi_PBlock *pb)
get_repl_session_id (pb, sessionid, &opcsn);
- slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
- if (rc == LDAP_SUCCESS)
+ slapi_pblock_get(pb, SLAPI_RESULT_CODE, &retval);
+ if (retval == LDAP_SUCCESS)
{
agmtlist_notify_all(pb);
}
@@ -1233,6 +1234,55 @@ process_postop (Slapi_PBlock *pb)
slapi_ch_free((void **) &op_params->p.p_modrdn.modrdn_newsuperior_address.uniqueid);
}
}
+ if (!ignore_error_and_keep_going(retval)){
+ /*
+ * We have an error we can't ignore. Release the replica and close
+ * the connection to stop the replication session.
+ */
+ consumer_connection_extension *connext = NULL;
+ Slapi_Connection *conn = NULL;
+ char csn_str[CSN_STRSIZE] = {'\0'};
+ PRUint64 connid = 0;
+ int opid = 0;
+
+ slapi_pblock_get(pb, SLAPI_CONNECTION, &conn);
+ slapi_pblock_get(pb, SLAPI_OPERATION_ID, &opid);
+ slapi_pblock_get(pb, SLAPI_CONN_ID, &connid);
+ if (conn)
+ {
+ slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+ "process_postop: Failed to apply update (%s) error (%d). "
+ "Aborting replication session(conn=%" NSPRIu64 " op=%d)\n",
+ csn_as_string(opcsn, PR_FALSE, csn_str), retval,
+ (long long unsigned int)connid, opid);
+ /*
+ * Release this replica so new sessions can begin
+ */
+ connext = consumer_connection_extension_acquire_exclusive_access(conn, connid, opid);
+ if (connext && connext->replica_acquired)
+ {
+ int zero = 0;
+ Replica *r = (Replica*)object_get_data ((Object*)connext->replica_acquired);
+
+ replica_relinquish_exclusive_access(r, connid, opid);
+ object_release ((Object*)connext->replica_acquired);
+ connext->replica_acquired = NULL;
+ connext->isreplicationsession = 0;
+ slapi_pblock_set( pb, SLAPI_CONN_IS_REPLICATION_SESSION, &zero );
+ }
+ if (connext){
+ consumer_connection_extension_relinquish_exclusive_access(conn, connid, opid, PR_FALSE);
+ }
+
+ /*
+ * Close the connection to end the current session with the
+ * supplier. This prevents new updates from coming in and
+ * updating the consumer RUV - which would cause this failed
+ * update to be never be replayed.
+ */
+ slapi_disconnect_server(conn);
+ }
+ }
if (NULL == opcsn)
opcsn = operation_get_csn(op);
if (opcsn)
diff --git a/ldap/servers/plugins/replication/urp.c b/ldap/servers/plugins/replication/urp.c
index 0182af3..527995c 100644
--- a/ldap/servers/plugins/replication/urp.c
+++ b/ldap/servers/plugins/replication/urp.c
@@ -146,7 +146,7 @@ urp_add_operation( Slapi_PBlock *pb )
slapi_log_error(slapi_log_urp, sessionid,
"urp_add (%s): an entry with this uniqueid already exists.\n",
slapi_entry_get_dn_const(existing_uniqueid_entry));
- op_result= LDAP_UNWILLING_TO_PERFORM;
+ op_result= LDAP_ALREADY_EXISTS;
slapi_pblock_set(pb, SLAPI_RESULT_CODE, &op_result);
rc= -1; /* Ignore this Operation */
PROFILE_POINT; /* Add Conflict; UniqueID Exists; Ignore */
--
2.4.3