amoralej / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 years ago
Clone

Blame SOURCES/0064-Ticket-49008-backport-1.3.5-aborted-operation-can-le.patch

723150
From d1477e4a03d85aec79b497db4531d9e484029139 Mon Sep 17 00:00:00 2001
723150
From: Ludwig Krispenz <lkrispen@redhat.com>
723150
Date: Tue, 24 Jan 2017 15:07:19 +0100
723150
Subject: [PATCH 64/67]     Ticket 49008 backport 1.3.5 : aborted operation can
723150
 leave RUV in incorrect state
723150
723150
        Bug description:
723150
        If a plugin operation succeeded, but the operation itself fails and is aborted the RUV is in an incorrect state (rolled up to the succesful plugin op)
723150
723150
        Fix Decription:
723150
        Introduce a "primary_csn", this is the csn of the main operation, either a client operation or a replicated operation.
723150
        csns generated by internal operations, eg by plugins are secondary csn.
723150
723150
        Maintain the primary csn in thread local data, like it is used for the agreement name (or txn stack): prim_csn.
723150
723150
        Extend the data structure of the pending list to keep prim_csn for each inserted csn
723150
723150
        If a csn is created or received check prim_csn: if it exists use it, if it doesn't exist set it
723150
723150
        when inserting a csn to the pending list pass the prim_csn
723150
723150
        when cancelling a csn, if it is the prim_csn also cancell all secondary csns
723150
723150
        when committing a csn,
723150
723150
        if it is not the primary csn, do nothing
723150
723150
        if it is the prim_csn trigger the pending list rollup, stop at the first not committed csn
723150
723150
        if the RID of the prim_csn is not the local RID also rollup the pending list for the local RID.
723150
723150
        Reviewed by:  Thierry, Thanks
723150
723150
(cherry picked from commit 79a3deafe943a3ce5c31c50272939146d17bd7ac)
723150
---
723150
 ldap/servers/plugins/replication/csnpl.c         | 75 +++++++++++++++++++++---
723150
 ldap/servers/plugins/replication/csnpl.h         |  5 +-
723150
 ldap/servers/plugins/replication/repl5.h         |  2 +
723150
 ldap/servers/plugins/replication/repl5_init.c    | 22 +++++++
723150
 ldap/servers/plugins/replication/repl5_plugins.c | 40 ++++++++-----
723150
 ldap/servers/plugins/replication/repl5_replica.c |  6 +-
723150
 ldap/servers/plugins/replication/repl5_ruv.c     | 74 +++++++++++++++++------
723150
 ldap/servers/plugins/replication/repl5_ruv.h     |  4 +-
723150
 ldap/servers/slapd/csn.c                         | 15 +++++
723150
 ldap/servers/slapd/slapi-private.h               |  2 +
723150
 10 files changed, 195 insertions(+), 50 deletions(-)
723150
723150
diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
723150
index acd38d0..db1ae13 100644
723150
--- a/ldap/servers/plugins/replication/csnpl.c
723150
+++ b/ldap/servers/plugins/replication/csnpl.c
723150
@@ -24,8 +24,9 @@ struct csnpl
723150
 
723150
 typedef struct _csnpldata
723150
 {
723150
-	PRBool		committed;  /* True if CSN committed */
723150
-	CSN			*csn;       /* The actual CSN */
723150
+	PRBool	committed;  /* True if CSN committed */
723150
+	CSN	*csn;       /* The actual CSN */
723150
+	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
723150
 } csnpldata;
723150
 
723150
 /* forward declarations */
723150
@@ -103,7 +104,7 @@ void csnplFree (CSNPL **csnpl)
723150
  *          1 if the csn has already been seen
723150
  *         -1 for any other kind of errors
723150
  */
723150
-int csnplInsert (CSNPL *csnpl, const CSN *csn)
723150
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
723150
 {
723150
 	int rc;
723150
 	csnpldata *csnplnode;
723150
@@ -131,6 +132,7 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn)
723150
 	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
723150
 	csnplnode->committed = PR_FALSE;
723150
 	csnplnode->csn = csn_dup(csn);
723150
+	csnplnode->prim_csn = prim_csn;
723150
 	csn_as_string(csn, PR_FALSE, csn_str);
723150
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
723150
 
723150
@@ -186,6 +188,57 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
723150
 	return 0;
723150
 }
723150
 
723150
+int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
723150
+{
723150
+	csnpldata *data;
723150
+	void *iterator;
723150
+
723150
+	slapi_rwlock_wrlock (csnpl->csnLock);
723150
+	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
723150
+	while (NULL != data)
723150
+	{
723150
+		if (csn_is_equal(data->csn, csn) ||
723150
+		    csn_is_equal(data->prim_csn, csn)) {
723150
+			csnpldata_free(&data);
723150
+			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
723150
+		} else {
723150
+			data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
723150
+		}
723150
+	}
723150
+#ifdef DEBUG
723150
+    _csnplDumpContentNoLock(csnpl, "csnplRemoveAll");
723150
+#endif
723150
+	slapi_rwlock_unlock (csnpl->csnLock);
723150
+	return 0;
723150
+}
723150
+
723150
+
723150
+int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
723150
+{
723150
+	csnpldata *data;
723150
+	void *iterator;
723150
+	char csn_str[CSN_STRSIZE];
723150
+
723150
+	csn_as_string(csn, PR_FALSE, csn_str);
723150
+	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
723150
+		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
723150
+	slapi_rwlock_wrlock (csnpl->csnLock);
723150
+	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
723150
+	while (NULL != data)
723150
+	{
723150
+		csn_as_string(data->csn, PR_FALSE, csn_str);
723150
+		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
723150
+				"csnplCommitALL: processing data csn %s\n", csn_str);
723150
+		if (csn_is_equal(data->csn, csn) ||
723150
+		    csn_is_equal(data->prim_csn, csn)) {
723150
+			data->committed = PR_TRUE;
723150
+		}
723150
+		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
723150
+	}
723150
+	slapi_rwlock_unlock (csnpl->csnLock);
723150
+	return 0;
723150
+}
723150
+
723150
 int csnplCommit (CSNPL *csnpl, const CSN *csn)
723150
 {
723150
 	csnpldata *data;
723150
@@ -276,13 +329,12 @@ csnplRollUp(CSNPL *csnpl, CSN **first_commited)
723150
 	  *first_commited = NULL;
723150
 	}
723150
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
723150
-	while (NULL != data)
723150
+	while (NULL != data && data->committed)
723150
 	{
723150
 		if (NULL != largest_committed_csn && freeit)
723150
 		{
723150
 			csn_free(&largest_committed_csn);
723150
 		}
723150
-		if (data->committed) {
723150
 			freeit = PR_TRUE;
723150
 			largest_committed_csn = data->csn; /* Save it */
723150
 			if (first_commited && (*first_commited == NULL)) {
723150
@@ -294,9 +346,6 @@ csnplRollUp(CSNPL *csnpl, CSN **first_commited)
723150
 			data->csn = NULL;
723150
 			csnpldata_free(&data);
723150
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
723150
-		} else {
723150
-			data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
723150
-		}
723150
 	} 
723150
 
723150
 #ifdef DEBUG
723150
@@ -326,6 +375,7 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
723150
     csnpldata *data;
723150
     void *iterator;
723150
     char csn_str[CSN_STRSIZE];
723150
+    char primcsn_str[CSN_STRSIZE];
723150
     
723150
     data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
723150
 	if (data) {
723150
@@ -334,11 +384,18 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
723150
 	}
723150
     while (data)
723150
     {
723150
-        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s, %s\n",                        
723150
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "%s,(prim %s), %s\n",
723150
                         csn_as_string(data->csn, PR_FALSE, csn_str),
723150
+			data->prim_csn ? csn_as_string(data->prim_csn, PR_FALSE, primcsn_str) : " ",
723150
                         data->committed ? "committed" : "not committed");
723150
         data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
723150
     }
723150
 }
723150
 #endif
723150
 
723150
+/* wrapper around csn_free, to satisfy NSPR thread context API */
723150
+void
723150
+csnplFreeCSN (void *arg)
723150
+{
723150
+	csn_free((CSN **)&arg;;
723150
+}
723150
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
723150
index 32e3ff7..f5c28f5 100644
723150
--- a/ldap/servers/plugins/replication/csnpl.h
723150
+++ b/ldap/servers/plugins/replication/csnpl.h
723150
@@ -22,10 +22,13 @@ typedef struct csnpl CSNPL;
723150
 
723150
 CSNPL* csnplNew ();
723150
 void csnplFree (CSNPL **csnpl);
723150
-int csnplInsert (CSNPL *csnpl, const CSN *csn);
723150
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
723150
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
723150
+int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
723150
+int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
723150
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
723150
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
723150
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
723150
 void csnplDumpContent(CSNPL *csnpl, const char *caller); 
723150
+
723150
 #endif
723150
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
723150
index 4ab2355..27ad416 100644
723150
--- a/ldap/servers/plugins/replication/repl5.h
723150
+++ b/ldap/servers/plugins/replication/repl5.h
723150
@@ -232,6 +232,8 @@ int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
723150
 extern int repl5_is_betxn;
723150
 char* get_thread_private_agmtname ();
723150
 void  set_thread_private_agmtname (const char *agmtname);
723150
+void  set_thread_primary_csn (const CSN *prim_csn);
723150
+CSN*  get_thread_primary_csn(void);
723150
 void* get_thread_private_cache ();
723150
 void  set_thread_private_cache (void *buf);
723150
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
723150
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
723150
index 0304ed5..1570655 100644
723150
--- a/ldap/servers/plugins/replication/repl5_init.c
723150
+++ b/ldap/servers/plugins/replication/repl5_init.c
723150
@@ -136,6 +136,7 @@ static int multimaster_started_flag = 0;
723150
 /* Thread private data and interface */
723150
 static PRUintn thread_private_agmtname;	/* thread private index for logging*/
723150
 static PRUintn thread_private_cache;
723150
+static PRUintn thread_primary_csn;
723150
 
723150
 char*
723150
 get_thread_private_agmtname()
723150
@@ -153,6 +154,26 @@ set_thread_private_agmtname(const char *agmtname)
723150
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
723150
 }
723150
 
723150
+CSN*
723150
+get_thread_primary_csn(void)
723150
+{
723150
+	CSN *prim_csn = NULL;
723150
+	if (thread_primary_csn)
723150
+		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
723150
+	return prim_csn;
723150
+}
723150
+void
723150
+set_thread_primary_csn(const CSN *prim_csn)
723150
+{
723150
+	if (thread_primary_csn) {
723150
+		if (prim_csn) {
723150
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
723150
+		} else {
723150
+			PR_SetThreadPrivate(thread_primary_csn, NULL);
723150
+		}
723150
+	}
723150
+}
723150
+
723150
 void*
723150
 get_thread_private_cache ()
723150
 {
723150
@@ -721,6 +742,7 @@ multimaster_start( Slapi_PBlock *pb )
723150
 		/* Initialize thread private data for logging. Ignore if fails */
723150
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
723150
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
723150
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
723150
 
723150
 		/* Decode the command line args to see if we're dumping to LDIF */
723150
 		is_ldif_dump = check_for_ldif_dump(pb);
723150
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
723150
index b331c81..84624e9 100644
723150
--- a/ldap/servers/plugins/replication/repl5_plugins.c
723150
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
723150
@@ -1033,9 +1033,11 @@ static int
723150
 write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 {
723150
 	Slapi_Operation *op = NULL;
723150
+	CSN *opcsn;
723150
+	CSN *prim_csn;
723150
 	int rc;
723150
 	slapi_operation_parameters *op_params = NULL;
723150
-	Object *repl_obj;
723150
+	Object *repl_obj = NULL;
723150
 	int return_value = SLAPI_PLUGIN_SUCCESS;
723150
 	Replica *r;
723150
 	Slapi_Backend *be;
723150
@@ -1063,17 +1065,17 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 	{
723150
 		return return_value;
723150
 	}
723150
+	/* we only log changes for operations applied to a replica */
723150
+	repl_obj = replica_get_replica_for_op (pb);
723150
+	if (repl_obj == NULL)
723150
+		return return_value;
723150
 
723150
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
723150
 	if (rc) { /* op failed - just return */
723150
-		return return_value;
723150
+		cancel_opcsn(pb);
723150
+		goto common_return;
723150
 	}
723150
 
723150
-	/* we only log changes for operations applied to a replica */
723150
-	repl_obj = replica_get_replica_for_op (pb);
723150
-	if (repl_obj == NULL)
723150
-		return return_value;
723150
- 
723150
 	r = (Replica*)object_get_data (repl_obj);
723150
 	PR_ASSERT (r);
723150
 
723150
@@ -1108,7 +1110,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 
723150
 			slapi_pblock_get (pb, SLAPI_OPERATION_PARAMETERS, &op_params);
723150
 			if (NULL == op_params) {
723150
-				return return_value;
723150
+				goto common_return;
723150
 			}
723150
 
723150
 			/* need to set uniqueid operation parameter */
723150
@@ -1127,19 +1129,18 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 				slapi_pblock_get (pb, SLAPI_ENTRY_PRE_OP, &e);
723150
 			}
723150
 			if (NULL == e) {
723150
-				return return_value;
723150
+				goto common_return;
723150
 			}
723150
 			uniqueid = slapi_entry_get_uniqueid (e);
723150
 			if (NULL == uniqueid) {
723150
-				return return_value;
723150
+				goto common_return;
723150
 			}
723150
 			op_params->target_address.uniqueid = slapi_ch_strdup (uniqueid);
723150
 		} 
723150
 
723150
 		if( op_params->csn && is_cleaned_rid(csn_get_replicaid(op_params->csn))){
723150
 			/* this RID has been cleaned */
723150
-			object_release (repl_obj);
723150
-			return return_value;
723150
+			goto common_return;
723150
 		}
723150
 
723150
 		/* we might have stripped all the mods - in that case we do not
723150
@@ -1152,7 +1153,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 			{
723150
 				slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
723150
 								"write_changelog_and_ruv: Skipped due to DISKFULL\n");
723150
-				return return_value;
723150
+				goto common_return;
723150
 			}
723150
 			slapi_pblock_get(pb, SLAPI_TXN, &txn);
723150
 			rc = cl5WriteOperationTxn(repl_name, repl_gen, op_params, 
723150
@@ -1188,7 +1189,6 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 	*/
723150
 	if (0 == return_value) {
723150
 		char csn_str[CSN_STRSIZE] = {'\0'};
723150
-		CSN *opcsn;
723150
 		int rc;
723150
 		const char *dn = op_params ? REPL_GET_DN(&op_params->target_address) : "unknown";
723150
 		Slapi_DN *sdn = op_params ? (&op_params->target_address)->sdn : NULL;
723150
@@ -1220,7 +1220,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
723150
 		}
723150
 	}
723150
 
723150
-	object_release (repl_obj);
723150
+common_return:
723150
+	opcsn = operation_get_csn(op);
723150
+	prim_csn = get_thread_primary_csn();
723150
+	if (csn_is_equal(opcsn, prim_csn)) {
723150
+		set_thread_primary_csn(NULL);
723150
+	}
723150
+	if (repl_obj) {
723150
+		object_release (repl_obj);
723150
+	}
723150
 	return return_value;
723150
 }
723150
 
723150
@@ -1417,7 +1425,7 @@ cancel_opcsn (Slapi_PBlock *pb)
723150
 
723150
             ruv_obj = replica_get_ruv (r);
723150
             PR_ASSERT (ruv_obj);
723150
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn);
723150
+            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
723150
             object_release (ruv_obj);
723150
         }
723150
 
723150
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
723150
index 7360d97..602653a 100644
723150
--- a/ldap/servers/plugins/replication/repl5_replica.c
723150
+++ b/ldap/servers/plugins/replication/repl5_replica.c
723150
@@ -903,7 +903,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
723150
 					}
723150
 				}
723150
 				/* Update max csn for local and remote replicas */
723150
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, rid == r->repl_rid);
723150
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
723150
 				if (RUV_COVERS_CSN == rc)
723150
 				{
723150
 					slapi_log_error(SLAPI_LOG_REPL,
723150
@@ -3626,7 +3626,7 @@ assign_csn_callback(const CSN *csn, void *data)
723150
 	
723150
     if (NULL != r->min_csn_pl)
723150
     {
723150
-        if (csnplInsert(r->min_csn_pl, csn) != 0)
723150
+        if (csnplInsert(r->min_csn_pl, csn, NULL) != 0)
723150
         {
723150
             char csn_str[CSN_STRSIZE]; /* For logging only */
723150
             /* Ack, we can't keep track of min csn. Punt. */
723150
@@ -3674,7 +3674,7 @@ abort_csn_callback(const CSN *csn, void *data)
723150
         }
723150
     }
723150
 
723150
-    ruv_cancel_csn_inprogress (ruv, csn);
723150
+    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
723150
     replica_unlock(r->repl_lock);
723150
 
723150
     object_release (ruv_obj);
723150
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
723150
index 5d6e1c3..c2d3bb4 100644
723150
--- a/ldap/servers/plugins/replication/repl5_ruv.c
723150
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
723150
@@ -77,6 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
723150
 static const char * const prefix_replicageneration = "{replicageneration}";
723150
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
723150
 
723150
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
723150
 
723150
 /* API implementation */
723150
 
723150
@@ -1602,6 +1603,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
723150
     char csn_str[CSN_STRSIZE];
723150
     int rc = RUV_SUCCESS;
723150
     int rid = csn_get_replicaid (csn);
723150
+    CSN *prim_csn;
723150
 
723150
     PR_ASSERT (ruv && csn);
723150
 
723150
@@ -1639,8 +1641,12 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
723150
         rc = RUV_COVERS_CSN;
723150
         goto done;
723150
     }
723150
-
723150
-    rc = csnplInsert (replica->csnpl, csn);
723150
+    prim_csn = get_thread_primary_csn();
723150
+    if (prim_csn == NULL) {
723150
+        set_thread_primary_csn(csn);
723150
+        prim_csn = get_thread_primary_csn();
723150
+    }
723150
+    rc = csnplInsert (replica->csnpl, csn, prim_csn);
723150
     if (rc == 1)    /* we already seen this csn */
723150
     {
723150
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
723150
@@ -1648,6 +1654,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
723150
                             "the csn %s has already be seen - ignoring\n",
723150
                             csn_as_string (csn, PR_FALSE, csn_str));
723150
         }
723150
+        set_thread_primary_csn(NULL);
723150
         rc = RUV_COVERS_CSN;    
723150
     }
723150
     else if(rc != 0)
723150
@@ -1672,24 +1679,36 @@ done:
723150
     return rc;
723150
 }
723150
 
723150
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn)
723150
+int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
723150
 {
723150
     RUVElement* replica;
723150
     int rc = RUV_SUCCESS;
723150
+    CSN *prim_csn = NULL;
723150
+
723150
 
723150
     PR_ASSERT (ruv && csn);
723150
 
723150
+    prim_csn = get_thread_primary_csn();
723150
     /* locate ruvElement */
723150
     slapi_rwlock_wrlock (ruv->lock);
723150
     replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
723150
-    if (replica == NULL)
723150
-    {
723150
+    if (replica == NULL) {
723150
         /* ONREPL - log error */
723150
-        rc = RUV_NOTFOUND;
723150
-        goto done;
723150
-    } 
723150
-
723150
-    rc = csnplRemove (replica->csnpl, csn);
723150
+	rc = RUV_NOTFOUND;
723150
+	goto done;
723150
+    }
723150
+    if (csn_is_equal(csn, prim_csn)) {
723150
+	/* the prim csn is cancelled, lets remove all dependent csns */
723150
+	ReplicaId prim_rid = csn_get_replicaid (csn);
723150
+	replica = ruvGetReplica (ruv, prim_rid);
723150
+	rc = csnplRemoveAll (replica->csnpl, prim_csn);
723150
+	if (prim_rid != local_rid) {
723150
+		replica = ruvGetReplica (ruv, local_rid);
723150
+		rc = csnplRemoveAll (replica->csnpl, prim_csn);
723150
+	}
723150
+    } else {
723150
+	rc = csnplRemove (replica->csnpl, csn);
723150
+    }
723150
     if (rc != 0)
723150
         rc = RUV_NOTFOUND;
723150
     else
723150
@@ -1700,19 +1719,37 @@ done:
723150
     return rc;
723150
 }
723150
 
723150
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool isLocal)
723150
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
723150
+{
723150
+    int rc=RUV_SUCCESS;
723150
+    RUVElement *replica;
723150
+    ReplicaId prim_rid;
723150
+
723150
+    CSN *prim_csn = get_thread_primary_csn();
723150
+
723150
+    if (! csn_is_equal(csn, prim_csn)) {
723150
+	/* not a primary csn, nothing to do */
723150
+	return rc;
723150
+    }
723150
+    slapi_rwlock_wrlock (ruv->lock);
723150
+    prim_rid = csn_get_replicaid (csn);
723150
+    replica = ruvGetReplica (ruv, local_rid);
723150
+    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
723150
+    if ( rc || local_rid == prim_rid) goto done;
723150
+    replica = ruvGetReplica (ruv, prim_rid);
723150
+    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
723150
+done:
723150
+    slapi_rwlock_unlock (ruv->lock);
723150
+    return rc;
723150
+}
723150
+static int
723150
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
723150
 {
723150
     int rc=RUV_SUCCESS;
723150
     char csn_str[CSN_STRSIZE];
723150
     CSN *max_csn;
723150
     CSN *first_csn = NULL;
723150
-    RUVElement *replica;
723150
     
723150
-    PR_ASSERT (ruv && csn);
723150
-
723150
-    slapi_rwlock_wrlock (ruv->lock);
723150
-
723150
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
723150
     if (replica == NULL)
723150
     {
723150
         /* we should have a ruv element at this point because it would have
723150
@@ -1722,7 +1759,7 @@ int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool i
723150
         goto done;
723150
     } 
723150
 
723150
-	if (csnplCommit(replica->csnpl, csn) != 0)
723150
+	if (csnplCommitAll(replica->csnpl, csn) != 0)
723150
 	{
723150
 		slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "ruv_update_ruv: cannot commit csn %s\n",
723150
 			            csn_as_string(csn, PR_FALSE, csn_str));
723150
@@ -1763,7 +1800,6 @@ int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool i
723150
 	}
723150
 
723150
 done:
723150
-    slapi_rwlock_unlock (ruv->lock);
723150
 
723150
     return rc;
723150
 }
723150
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
723150
index e9eff5a..c8960fd 100644
723150
--- a/ldap/servers/plugins/replication/repl5_ruv.h
723150
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
723150
@@ -109,8 +109,8 @@ PRInt32 ruv_replica_count (const RUV *ruv);
723150
 char **ruv_get_referrals(const RUV *ruv);
723150
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
723150
 int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
723150
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn);
723150
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, PRBool isLocal);
723150
+int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
723150
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
723150
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
723150
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
723150
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
723150
diff --git a/ldap/servers/slapd/csn.c b/ldap/servers/slapd/csn.c
723150
index a3f4815..175f82a 100644
723150
--- a/ldap/servers/slapd/csn.c
723150
+++ b/ldap/servers/slapd/csn.c
723150
@@ -268,6 +268,21 @@ csn_as_attr_option_string(CSNType t,const CSN *csn,char *ss)
723150
 	return s;
723150
 }
723150
 
723150
+int
723150
+csn_is_equal(const CSN *csn1, const CSN *csn2)
723150
+{
723150
+	int retval = 0;
723150
+	if ((csn1 == NULL && csn2 == NULL) ||
723150
+		(csn1 && csn2 &&
723150
+		 csn1->tstamp == csn2->tstamp &&
723150
+		 csn1->seqnum == csn2->seqnum &&
723150
+		 csn1->rid == csn2->rid &&
723150
+		 csn1->subseqnum == csn2->subseqnum)) {
723150
+		retval = 1;
723150
+	}
723150
+	return retval;
723150
+}
723150
+
723150
 int 
723150
 csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags)
723150
 {
723150
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
723150
index 52d1c4a..e909e9c 100644
723150
--- a/ldap/servers/slapd/slapi-private.h
723150
+++ b/ldap/servers/slapd/slapi-private.h
723150
@@ -166,6 +166,7 @@ time_t csn_get_time(const CSN *csn);
723150
 PRUint16 csn_get_seqnum(const CSN *csn);
723150
 PRUint16 csn_get_subseqnum(const CSN *csn);
723150
 char *csn_as_string(const CSN *csn, PRBool replicaIdOrder, char *ss); /* WARNING: ss must be CSN_STRSIZE bytes, or NULL. */
723150
+int csn_is_equal(const CSN *csn1, const CSN *csn2);
723150
 int csn_compare(const CSN *csn1, const CSN *csn2);
723150
 int csn_compare_ext(const CSN *csn1, const CSN *csn2, unsigned int flags);
723150
 #define CSN_COMPARE_SKIP_SUBSEQ 0x1
723150
@@ -181,6 +182,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
723150
    a csn from the set.*/
723150
 int csn_increment_subsequence (CSN *csn);
723150
 
723150
+void csnplFreeCSN (void *arg);
723150
 /*
723150
  * csnset.c
723150
  */
723150
-- 
723150
2.9.3
723150