amoralej / rpms / 389-ds-base

Forked from rpms/389-ds-base 5 years ago
Clone
Blob Blame History Raw
From 6b5aa0e288f1ea5553d4dd5d220d4e5daf50a247 Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Mon, 31 Jul 2017 14:45:50 -0400
Subject: [PATCH] Ticket 49287 - v3 extend csnpl handling to multiple backends

        The csn pending list mechanism failed if internal operation affected multiple backends

        This fix is an extension to the fix in ticket 49008, the thread local data now also contains
        a list of all affected replicas.

        http://www.port389.org/docs/389ds/design/csn-pending-lists-and-ruv-update.html

        Reviewed by: William, Thierry - thanks
---
 ldap/servers/plugins/replication/csnpl.c         |  85 ++++++++--
 ldap/servers/plugins/replication/csnpl.h         |   8 +-
 ldap/servers/plugins/replication/repl5.h         |  22 ++-
 ldap/servers/plugins/replication/repl5_init.c    |  48 +++++-
 ldap/servers/plugins/replication/repl5_plugins.c |  16 +-
 ldap/servers/plugins/replication/repl5_replica.c |  18 ++-
 ldap/servers/plugins/replication/repl5_ruv.c     | 191 ++++++++++++++---------
 ldap/servers/plugins/replication/repl5_ruv.h     |   6 +-
 ldap/servers/slapd/slapi-private.h               |   2 +-
 9 files changed, 283 insertions(+), 113 deletions(-)

diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
index 4a0f5f5..12a0bb8 100644
--- a/ldap/servers/plugins/replication/csnpl.c
+++ b/ldap/servers/plugins/replication/csnpl.c
@@ -14,7 +14,6 @@
 
 #include "csnpl.h"
 #include "llist.h"
-#include "repl_shared.h"
 
 struct csnpl 
 {
@@ -22,13 +21,17 @@ struct csnpl
 	Slapi_RWLock*	csnLock;	/* lock to serialize access to PL */
 };	
 
+
 typedef struct _csnpldata
 {
 	PRBool	committed;  /* True if CSN committed */
 	CSN	*csn;       /* The actual CSN */
+	Replica * prim_replica; /* The replica where the prom csn was generated */
 	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
 } csnpldata;
 
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx);
+
 /* forward declarations */
 #ifdef DEBUG
 static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller);
@@ -104,7 +107,7 @@ void csnplFree (CSNPL **csnpl)
  *          1 if the csn has already been seen
  *         -1 for any other kind of errors
  */
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn)
 {
 	int rc;
 	csnpldata *csnplnode;
@@ -129,10 +132,13 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
         return 1;
     }
 
-	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
+	csnplnode = (csnpldata *)slapi_ch_calloc(1, sizeof(csnpldata));
 	csnplnode->committed = PR_FALSE;
 	csnplnode->csn = csn_dup(csn);
-	csnplnode->prim_csn = prim_csn;
+	if (prim_csn) {
+		csnplnode->prim_csn = prim_csn->prim_csn;
+		csnplnode->prim_replica =  prim_csn->prim_repl;
+	}
 	csn_as_string(csn, PR_FALSE, csn_str);
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
 
@@ -187,8 +193,58 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
 
 	return 0;
 }
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx)
+{
+    if (csn_ctx == NULL)
+        return PR_FALSE;
+    
+    if (replica != csn_ctx->prim_repl) {
+        /* The CSNs are not from the same replication topology
+         * so even if the csn values are equal they are not related
+         * to the same operation
+         */
+        return PR_FALSE;
+    }
+    
+    /* Here the two CSNs belong to the same replication topology */
+    
+    /* check if the CSN identifies the primary update */
+    if (csn_is_equal(csn, csn_ctx->prim_csn)) {
+        return PR_TRUE;
+    }
+    
+    return PR_FALSE;
+}
+
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx)
+{
+    if ((csn_data == NULL) || (csn_ctx == NULL))
+        return PR_FALSE;
+    
+    if (csn_data->prim_replica != csn_ctx->prim_repl) {
+        /* The CSNs are not from the same replication topology
+         * so even if the csn values are equal they are not related
+         * to the same operation
+         */
+        return PR_FALSE;
+    }
+    
+    /* Here the two CSNs belong to the same replication topology */
+    
+    /* First check if the CSN identifies the primary update */
+    if (csn_is_equal(csn_data->csn, csn_ctx->prim_csn)) {
+        return PR_TRUE;
+    }
+    
+    /* Second check if the CSN identifies a nested update */
+    if (csn_is_equal(csn_data->prim_csn, csn_ctx->prim_csn)) {
+        return PR_TRUE;
+    }
+    
+    return PR_FALSE;
+}
 
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
 {
 	csnpldata *data;
 	void *iterator;
@@ -197,8 +253,7 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
 	while (NULL != data)
 	{
-		if (csn_is_equal(data->csn, csn) ||
-		    csn_is_equal(data->prim_csn, csn)) {
+		if (csn_primary_or_nested(data, csn_ctx)) {
 			csnpldata_free(&data);
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
 		} else {
@@ -213,13 +268,13 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
 }
 
 
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
 {
 	csnpldata *data;
 	void *iterator;
 	char csn_str[CSN_STRSIZE];
 
-	csn_as_string(csn, PR_FALSE, csn_str);
+	csn_as_string(csn_ctx->prim_csn, PR_FALSE, csn_str);
 	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
 		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
 	slapi_rwlock_wrlock (csnpl->csnLock);
@@ -229,8 +284,7 @@ int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
 		csn_as_string(data->csn, PR_FALSE, csn_str);
 		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
 				"csnplCommitALL: processing data csn %s\n", csn_str);
-		if (csn_is_equal(data->csn, csn) ||
-		    csn_is_equal(data->prim_csn, csn)) {
+                if (csn_primary_or_nested(data, csn_ctx)) {
 			data->committed = PR_TRUE;
 		}
 		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
@@ -395,7 +449,12 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
 
 /* wrapper around csn_free, to satisfy NSPR thread context API */
 void
-csnplFreeCSN (void *arg)
+csnplFreeCSNPL_CTX (void *arg)
 {
-	csn_free((CSN **)&arg);
+	CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)arg;
+	csn_free(&csnpl_ctx->prim_csn);
+	if (csnpl_ctx->sec_repl) {
+		slapi_ch_free((void **)&csnpl_ctx->sec_repl);
+	}
+	slapi_ch_free((void **)&csnpl_ctx);
 }
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
index 594c8f2..1036c62 100644
--- a/ldap/servers/plugins/replication/csnpl.h
+++ b/ldap/servers/plugins/replication/csnpl.h
@@ -17,15 +17,17 @@
 #define CSNPL_H
 
 #include "slapi-private.h"
+#include "repl5.h"
 
 typedef struct csnpl CSNPL;
 
 CSNPL* csnplNew(void);
 void csnplFree (CSNPL **csnpl);
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn);
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx);
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 1d8989c..718f64e 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -228,12 +228,27 @@ int multimaster_be_betxnpostop_delete (Slapi_PBlock *pb);
 int multimaster_be_betxnpostop_add (Slapi_PBlock *pb);
 int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
 
+/* In repl5_replica.c */
+typedef struct replica Replica;
+
+/* csn pending lists */
+#define CSNPL_CTX_REPLCNT 4
+typedef struct CSNPL_CTX
+{
+	CSN *prim_csn;
+	size_t repl_alloc; /* max number of replicas  */
+	size_t repl_cnt; /* number of replicas affected by operation */
+	Replica *prim_repl; /* pirmary replica */
+	Replica **sec_repl; /* additional replicas affected */
+} CSNPL_CTX;
+
 /* In repl5_init.c */
 extern int repl5_is_betxn;
 char* get_thread_private_agmtname(void);
 void  set_thread_private_agmtname (const char *agmtname);
-void  set_thread_primary_csn (const CSN *prim_csn);
-CSN*  get_thread_primary_csn(void);
+void  set_thread_primary_csn (const CSN *prim_csn, Replica *repl);
+void  add_replica_to_primcsn(CSNPL_CTX *prim_csn, Replica *repl);
+CSNPL_CTX*  get_thread_primary_csn(void);
 void* get_thread_private_cache(void);
 void  set_thread_private_cache (void *buf);
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
@@ -302,7 +317,6 @@ typedef struct repl_bos Repl_Bos;
 
 /* In repl5_agmt.c */
 typedef struct repl5agmt Repl_Agmt;
-typedef struct replica Replica;
 
 #define TRANSPORT_FLAG_SSL 1
 #define TRANSPORT_FLAG_TLS 2
@@ -629,6 +643,8 @@ PRUint64 replica_get_precise_purging(Replica *r);
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
 PRBool ignore_error_and_keep_going(int error);
 void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
+void replica_lock_replica(Replica *r);
+void replica_unlock_replica(Replica *r);
 
 /* The functions below handles the state flag */
 /* Current internal state flags */
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
index edffb84..b0bc515 100644
--- a/ldap/servers/plugins/replication/repl5_init.c
+++ b/ldap/servers/plugins/replication/repl5_init.c
@@ -154,26 +154,62 @@ set_thread_private_agmtname(const char *agmtname)
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
 }
 
-CSN*
+CSNPL_CTX*
 get_thread_primary_csn(void)
 {
-	CSN *prim_csn = NULL;
+	CSNPL_CTX *prim_csn = NULL;
 	if (thread_primary_csn)
-		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
+		prim_csn = (CSNPL_CTX *)PR_GetThreadPrivate(thread_primary_csn);
+
 	return prim_csn;
 }
 void
-set_thread_primary_csn(const CSN *prim_csn)
+set_thread_primary_csn (const CSN *prim_csn, Replica *repl)
 {
 	if (thread_primary_csn) {
 		if (prim_csn) {
-			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
+			CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)slapi_ch_calloc(1,sizeof(CSNPL_CTX));
+			csnpl_ctx->prim_csn = csn_dup(prim_csn);
+			/* repl_alloc, repl_cnt and sec_repl are 0 by calloc */
+			csnpl_ctx->prim_repl = repl;
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csnpl_ctx);
 		} else {
 			PR_SetThreadPrivate(thread_primary_csn, NULL);
 		}
 	}
 }
 
+void
+add_replica_to_primcsn(CSNPL_CTX *csnpl_ctx, Replica *repl)
+{
+	size_t found = 0;
+	size_t it = 0;
+
+	if (repl == csnpl_ctx->prim_repl) return;
+
+	while (it < csnpl_ctx->repl_cnt) {
+		if (csnpl_ctx->sec_repl[it] == repl) {
+			found = 1;
+			break;
+		}
+		it++;
+	}
+	if (found) return;
+
+	if (csnpl_ctx->repl_cnt < csnpl_ctx->repl_alloc) {
+		csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
+		return;
+	}
+	csnpl_ctx->repl_alloc += CSNPL_CTX_REPLCNT;
+	if (csnpl_ctx->repl_cnt == 0) {
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_calloc(csnpl_ctx->repl_alloc, sizeof(Replica *));
+	} else {
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_realloc((char *)csnpl_ctx->sec_repl, csnpl_ctx->repl_alloc * sizeof(Replica *));
+	}
+	csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
+	return;
+}
+
 void*
 get_thread_private_cache ()
 {
@@ -740,7 +776,7 @@ multimaster_start( Slapi_PBlock *pb )
 		/* Initialize thread private data for logging. Ignore if fails */
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
-		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSNPL_CTX);
 
 		/* Decode the command line args to see if we're dumping to LDIF */
 		is_ldif_dump = check_for_ldif_dump(pb);
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
index 9ef06af..c31d9d5 100644
--- a/ldap/servers/plugins/replication/repl5_plugins.c
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
@@ -45,6 +45,7 @@
 #include "repl.h"
 #include "cl5_api.h"
 #include "urp.h"
+#include "csnpl.h"
 
 static char *local_purl = NULL;
 static char *purl_attrs[] = {"nsslapd-localhost", "nsslapd-port", "nsslapd-secureport", NULL};
@@ -1034,7 +1035,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 {
 	Slapi_Operation *op = NULL;
 	CSN *opcsn;
-	CSN *prim_csn;
+	CSNPL_CTX *prim_csn;
 	int rc;
 	slapi_operation_parameters *op_params = NULL;
 	Object *repl_obj = NULL;
@@ -1070,14 +1071,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 	if (repl_obj == NULL)
 		return return_value;
 
+	r = (Replica*)object_get_data (repl_obj);
+	PR_ASSERT (r);
+
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
 	if (rc) { /* op failed - just return */
 		cancel_opcsn(pb);
 		goto common_return;
 	}
 
-	r = (Replica*)object_get_data (repl_obj);
-	PR_ASSERT (r);
 
 	replica_check_release_timeout(r, pb);
 
@@ -1223,12 +1225,12 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
 common_return:
 	opcsn = operation_get_csn(op);
 	prim_csn = get_thread_primary_csn();
-	if (csn_is_equal(opcsn, prim_csn)) {
+	if (csn_primary(r, opcsn, prim_csn)) {
 		if (return_value == 0) {
 			/* the primary csn was succesfully committed
 			 * unset it in the thread local data
 			 */
-			set_thread_primary_csn(NULL);
+			set_thread_primary_csn(NULL, NULL);
 		}
 	}
 	if (repl_obj) {
@@ -1430,7 +1432,7 @@ cancel_opcsn (Slapi_PBlock *pb)
 
             ruv_obj = replica_get_ruv (r);
             PR_ASSERT (ruv_obj);
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
+            ruv_cancel_csn_inprogress (r, (RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
             object_release (ruv_obj);
         }
 
@@ -1491,7 +1493,7 @@ process_operation (Slapi_PBlock *pb, const CSN *csn)
     ruv = (RUV*)object_get_data (ruv_obj);
     PR_ASSERT (ruv);
  
-    rc = ruv_add_csn_inprogress (ruv, csn);
+    rc = ruv_add_csn_inprogress (r, ruv, csn);
 
     object_release (ruv_obj);
     object_release (r_obj);
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
index 1bdc138..7927ac3 100644
--- a/ldap/servers/plugins/replication/repl5_replica.c
+++ b/ldap/servers/plugins/replication/repl5_replica.c
@@ -923,7 +923,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
 					}
 				}
 				/* Update max csn for local and remote replicas */
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r, r->repl_rid);
 				if (RUV_COVERS_CSN == rc)
 				{
 					slapi_log_err(SLAPI_LOG_REPL,
@@ -3663,7 +3663,7 @@ assign_csn_callback(const CSN *csn, void *data)
         }
     }
 
-    ruv_add_csn_inprogress (ruv, csn);
+    ruv_add_csn_inprogress (r, ruv, csn);
 
     replica_unlock(r->repl_lock);
 
@@ -3692,13 +3692,13 @@ abort_csn_callback(const CSN *csn, void *data)
     {
         int rc = csnplRemove(r->min_csn_pl, csn);
         if (rc) {
-            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed");
+            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed\n");
             replica_unlock(r->repl_lock);
             return;
         }
     }
 
-    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
+    ruv_cancel_csn_inprogress (r, ruv, csn, replica_get_rid(r));
     replica_unlock(r->repl_lock);
 
     object_release (ruv_obj);
@@ -4489,3 +4489,13 @@ replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
 	}
 	replica_unlock(r->repl_lock);
 }
+void
+replica_lock_replica(Replica *r)
+{
+	replica_lock(r->repl_lock);
+}
+void
+replica_unlock_replica(Replica *r)
+{
+	replica_unlock(r->repl_lock);
+}
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
index d59e6d2..39449b6 100644
--- a/ldap/servers/plugins/replication/repl5_ruv.c
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
@@ -77,7 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
 static const char * const prefix_replicageneration = "{replicageneration}";
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
 
-static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal);
 
 /* API implementation */
 
@@ -1599,13 +1599,13 @@ ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile)
 
 /* this function notifies the ruv that there are operations in progress so that
    they can be added to the pending list for the appropriate client. */
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn)
 {
     RUVElement* replica;
     char csn_str[CSN_STRSIZE];
     int rc = RUV_SUCCESS;
     int rid = csn_get_replicaid (csn);
-    CSN *prim_csn;
+    CSNPL_CTX *prim_csn;
 
     PR_ASSERT (ruv && csn);
 
@@ -1645,8 +1645,13 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
     }
     prim_csn = get_thread_primary_csn();
     if (prim_csn == NULL) {
-        set_thread_primary_csn(csn);
+        set_thread_primary_csn(csn, (Replica *)repl);
         prim_csn = get_thread_primary_csn();
+    } else {
+	/* the prim csn data already exist, need to check if
+	 * current replica is already present
+	 */
+	add_replica_to_primcsn(prim_csn, (Replica *)repl);
     }
     rc = csnplInsert (replica->csnpl, csn, prim_csn);
     if (rc == 1)    /* we already seen this csn */
@@ -1656,7 +1661,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
                             "The csn %s has already be seen - ignoring\n",
                             csn_as_string (csn, PR_FALSE, csn_str));
         }
-        set_thread_primary_csn(NULL);
+        set_thread_primary_csn(NULL, NULL);
         rc = RUV_COVERS_CSN;    
     }
     else if(rc != 0)
@@ -1681,11 +1686,13 @@ done:
     return rc;
 }
 
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId local_rid)
 {
-    RUVElement* replica;
+    RUVElement* repl_ruv;
     int rc = RUV_SUCCESS;
-    CSN *prim_csn = NULL;
+    CSNPL_CTX *prim_csn = NULL;
+    Replica *repl_it;
+    size_t it;
 
 
     PR_ASSERT (ruv && csn);
@@ -1693,29 +1700,53 @@ int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
     prim_csn = get_thread_primary_csn();
     /* locate ruvElement */
     slapi_rwlock_wrlock (ruv->lock);
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
-    if (replica == NULL) {
+    repl_ruv = ruvGetReplica (ruv, csn_get_replicaid (csn));
+    if (repl_ruv == NULL) {
         /* ONREPL - log error */
 	rc = RUV_NOTFOUND;
 	goto done;
     }
-    if (csn_is_equal(csn, prim_csn)) {
-	/* the prim csn is cancelled, lets remove all dependent csns */
-	ReplicaId prim_rid = csn_get_replicaid (csn);
-	replica = ruvGetReplica (ruv, prim_rid);
-	rc = csnplRemoveAll (replica->csnpl, prim_csn);
-	if (prim_rid != local_rid) {
-		if( local_rid != READ_ONLY_REPLICA_ID) {
-			replica = ruvGetReplica (ruv, local_rid);
-			if (replica) {
-				rc = csnplRemoveAll (replica->csnpl, prim_csn);
-			} else {
-				rc = RUV_NOTFOUND;
-			}
-		}
-	}
+    if (csn_primary(repl, csn, prim_csn)) {
+        /* the prim csn is cancelled, lets remove all dependent csns */
+        /* for the primary replica we can have modifications for two RIDS:
+         * - the local RID for direct or internal operations
+         * - a remote RID if the primary csn is for a replciated op.
+         */
+        ReplicaId prim_rid = csn_get_replicaid(csn);
+        repl_ruv = ruvGetReplica(ruv, prim_rid);
+        if (!repl_ruv) {
+            rc = RUV_NOTFOUND;
+            goto done;
+        }
+        rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
+
+        if (prim_rid != local_rid && local_rid != READ_ONLY_REPLICA_ID) {
+            repl_ruv = ruvGetReplica(ruv, local_rid);
+            if (!repl_ruv) {
+                rc = RUV_NOTFOUND;
+                goto done;
+            }
+            rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
+        }
+
+        for (it = 0; it < prim_csn->repl_cnt; it++) {
+            repl_it = prim_csn->sec_repl[it];
+            replica_lock_replica(repl_it);
+            local_rid = replica_get_rid(repl_it);
+            if (local_rid != READ_ONLY_REPLICA_ID) {
+                Object *ruv_obj = replica_get_ruv(repl_it);
+                RUV *ruv_it = object_get_data(ruv_obj);
+                repl_ruv = ruvGetReplica(ruv_it, local_rid);
+                if (repl_ruv) {
+                    rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
+                } else {
+                    rc = RUV_NOTFOUND;
+                }
+            }
+            replica_unlock_replica(repl_it);
+        }
     } else {
-	rc = csnplRemove (replica->csnpl, csn);
+	rc = csnplRemove (repl_ruv->csnpl, csn);
     }
     if (rc != 0)
         rc = RUV_NOTFOUND;
@@ -1727,86 +1758,100 @@ done:
     return rc;
 }
 
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid)
 {
     int rc=RUV_SUCCESS;
-    RUVElement *replica;
+    RUVElement *repl_ruv;
     ReplicaId prim_rid;
+    Replica *repl_it = NULL;
+    size_t it = 0;
 
-    CSN *prim_csn = get_thread_primary_csn();
+    CSNPL_CTX *prim_csn = get_thread_primary_csn();
 
-    if (! csn_is_equal(csn, prim_csn)) {
+    if (! csn_primary(replica, csn, prim_csn)) {
 	/* not a primary csn, nothing to do */
 	return rc;
     }
-    slapi_rwlock_wrlock (ruv->lock);
+
+    /* first handle primary replica 
+     * there can be two ruv elements affected
+     */
     prim_rid = csn_get_replicaid (csn);
-    replica = ruvGetReplica (ruv, local_rid);
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
-    if ( rc || local_rid == prim_rid) goto done;
-    replica = ruvGetReplica (ruv, prim_rid);
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
-done:
+    slapi_rwlock_wrlock (ruv->lock);
+    if ( local_rid != prim_rid) {
+	repl_ruv = ruvGetReplica (ruv, prim_rid);
+	rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_FALSE);
+    }
+    repl_ruv = ruvGetReplica (ruv, local_rid);
+    rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_TRUE);
     slapi_rwlock_unlock (ruv->lock);
+    if (rc) return rc;
+
+    /* now handle secondary replicas */
+    for (it=0; it<prim_csn->repl_cnt; it++) {
+	repl_it = prim_csn->sec_repl[it];
+	replica_lock_replica(repl_it);
+	Object *ruv_obj = replica_get_ruv (repl_it);
+	RUV *ruv_it = object_get_data (ruv_obj);
+	slapi_rwlock_wrlock (ruv_it->lock);
+	repl_ruv = ruvGetReplica (ruv_it, replica_get_rid(repl_it));
+	rc = ruv_update_ruv_element(ruv_it, repl_ruv, prim_csn, replica_purl, PR_TRUE);
+	slapi_rwlock_unlock (ruv_it->lock);
+	replica_unlock_replica(repl_it);
+	if (rc) break;
+    }
     return rc;
 }
+
 static int
-ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal)
 {
     int rc=RUV_SUCCESS;
     char csn_str[CSN_STRSIZE];
     CSN *max_csn;
     CSN *first_csn = NULL;
     
-    if (replica == NULL)
-    {
+    if (replica == NULL) {
         /* we should have a ruv element at this point because it would have
            been added by ruv_add_inprogress function */
         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
-			            "Can't locate RUV element for replica %d\n", csn_get_replicaid (csn)); 
+                        "Can't locate RUV element for replica %d\n", csn_get_replicaid (prim_csn->prim_csn));
         goto done;
     } 
 
-	if (csnplCommitAll(replica->csnpl, csn) != 0)
-	{
-		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
-			            csn_as_string(csn, PR_FALSE, csn_str));
+    if (csnplCommitAll(replica->csnpl, prim_csn) != 0) {
+        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
+                        csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
         rc = RUV_CSNPL_ERROR;
         goto done;
-	}
-    else
-    {
+    } else {
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
             slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
-                            "Successfully committed csn %s\n", csn_as_string(csn, PR_FALSE, csn_str));
+                            "Successfully committed csn %s\n", csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
         }
     }
 
-	if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL)
-	{
-#ifdef DEBUG
-		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
-			            csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
-#endif
+    if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL) {
+        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
+                        csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
         /* replica object sets min csn for local replica */
-		if (!isLocal && replica->min_csn == NULL) {
-		  /* bug 559223 - it seems that, under huge stress, a server might pass
-		   * through this code when more than 1 change has already been sent and commited into
-		   * the pending lists... Therefore, as we are trying to set the min_csn ever 
-		   * generated by this replica, we need to set the first_csn as the min csn in the
-		   * ruv */
-		  set_min_csn_nolock(ruv, first_csn, replica_purl);
-		}
-		/* only update the max_csn in the RUV if it is greater than the existing one */
-		rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
-		/* It is possible that first_csn points to max_csn.
-		   We need to free it once */
-		if (max_csn != first_csn) {
-			csn_free(&first_csn); 
-		}
-		csn_free(&max_csn);
-	}
-
+        if (!isLocal && replica->min_csn == NULL) {
+            /* bug 559223 - it seems that, under huge stress, a server might pass
+             * through this code when more than 1 change has already been sent and commited into
+             * the pending lists... Therefore, as we are trying to set the min_csn ever
+             * generated by this replica, we need to set the first_csn as the min csn in the
+             * ruv */
+        set_min_csn_nolock(ruv, first_csn, replica_purl);
+        }
+        /* only update the max_csn in the RUV if it is greater than the existing one */
+        rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
+        /* It is possible that first_csn points to max_csn.
+           We need to free it once */
+        if (max_csn != first_csn) {
+            csn_free(&first_csn);
+        }
+        csn_free(&max_csn);
+    }
 done:
 
     return rc;
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
index c8960fd..f3cd38b 100644
--- a/ldap/servers/plugins/replication/repl5_ruv.h
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
@@ -108,9 +108,9 @@ int ruv_to_bervals(const RUV *ruv, struct berval ***bvals);
 PRInt32 ruv_replica_count (const RUV *ruv);
 char **ruv_get_referrals(const RUV *ruv);
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn);
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId rid);
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid);
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
index 0836d66..3910dbe 100644
--- a/ldap/servers/slapd/slapi-private.h
+++ b/ldap/servers/slapd/slapi-private.h
@@ -193,7 +193,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
    a csn from the set.*/
 int csn_increment_subsequence (CSN *csn);
 
-void csnplFreeCSN (void *arg);
+void csnplFreeCSNPL_CTX (void *arg);
 /*
  * csnset.c
  */
-- 
2.9.4