Blame SOURCES/0057-Ticket-49287-v3-extend-csnpl-handling-to-multiple-ba.patch

61f723
From 6b5aa0e288f1ea5553d4dd5d220d4e5daf50a247 Mon Sep 17 00:00:00 2001
61f723
From: Mark Reynolds <mreynolds@redhat.com>
61f723
Date: Mon, 31 Jul 2017 14:45:50 -0400
61f723
Subject: [PATCH] Ticket 49287 - v3 extend csnpl handling to multiple backends
61f723
61f723
        The csn pending list mechanism failed if internal operation affected multiple backends
61f723
61f723
        This fix is an extension to the fix in ticket 49008, the thread local data now also contains
61f723
        a list of all affected replicas.
61f723
61f723
        http://www.port389.org/docs/389ds/design/csn-pending-lists-and-ruv-update.html
61f723
61f723
        Reviewed by: William, Thierry - thanks
61f723
---
61f723
 ldap/servers/plugins/replication/csnpl.c         |  85 ++++++++--
61f723
 ldap/servers/plugins/replication/csnpl.h         |   8 +-
61f723
 ldap/servers/plugins/replication/repl5.h         |  22 ++-
61f723
 ldap/servers/plugins/replication/repl5_init.c    |  48 +++++-
61f723
 ldap/servers/plugins/replication/repl5_plugins.c |  16 +-
61f723
 ldap/servers/plugins/replication/repl5_replica.c |  18 ++-
61f723
 ldap/servers/plugins/replication/repl5_ruv.c     | 191 ++++++++++++++---------
61f723
 ldap/servers/plugins/replication/repl5_ruv.h     |   6 +-
61f723
 ldap/servers/slapd/slapi-private.h               |   2 +-
61f723
 9 files changed, 283 insertions(+), 113 deletions(-)
61f723
61f723
diff --git a/ldap/servers/plugins/replication/csnpl.c b/ldap/servers/plugins/replication/csnpl.c
61f723
index 4a0f5f5..12a0bb8 100644
61f723
--- a/ldap/servers/plugins/replication/csnpl.c
61f723
+++ b/ldap/servers/plugins/replication/csnpl.c
61f723
@@ -14,7 +14,6 @@
61f723
 
61f723
 #include "csnpl.h"
61f723
 #include "llist.h"
61f723
-#include "repl_shared.h"
61f723
 
61f723
 struct csnpl 
61f723
 {
61f723
@@ -22,13 +21,17 @@ struct csnpl
61f723
 	Slapi_RWLock*	csnLock;	/* lock to serialize access to PL */
61f723
 };	
61f723
 
61f723
+
61f723
 typedef struct _csnpldata
61f723
 {
61f723
 	PRBool	committed;  /* True if CSN committed */
61f723
 	CSN	*csn;       /* The actual CSN */
61f723
+	Replica * prim_replica; /* The replica where the prom csn was generated */
61f723
 	const CSN *prim_csn;  /* The primary CSN of an operation consising of multiple sub ops*/
61f723
 } csnpldata;
61f723
 
61f723
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx);
61f723
+
61f723
 /* forward declarations */
61f723
 #ifdef DEBUG
61f723
 static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller);
61f723
@@ -104,7 +107,7 @@ void csnplFree (CSNPL **csnpl)
61f723
  *          1 if the csn has already been seen
61f723
  *         -1 for any other kind of errors
61f723
  */
61f723
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
61f723
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn)
61f723
 {
61f723
 	int rc;
61f723
 	csnpldata *csnplnode;
61f723
@@ -129,10 +132,13 @@ int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn)
61f723
         return 1;
61f723
     }
61f723
 
61f723
-	csnplnode = (csnpldata *)slapi_ch_malloc(sizeof(csnpldata));
61f723
+	csnplnode = (csnpldata *)slapi_ch_calloc(1, sizeof(csnpldata));
61f723
 	csnplnode->committed = PR_FALSE;
61f723
 	csnplnode->csn = csn_dup(csn);
61f723
-	csnplnode->prim_csn = prim_csn;
61f723
+	if (prim_csn) {
61f723
+		csnplnode->prim_csn = prim_csn->prim_csn;
61f723
+		csnplnode->prim_replica =  prim_csn->prim_repl;
61f723
+	}
61f723
 	csn_as_string(csn, PR_FALSE, csn_str);
61f723
 	rc = llistInsertTail (csnpl->csnList, csn_str, csnplnode);
61f723
 
61f723
@@ -187,8 +193,58 @@ int csnplRemove (CSNPL *csnpl, const CSN *csn)
61f723
 
61f723
 	return 0;
61f723
 }
61f723
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx)
61f723
+{
61f723
+    if (csn_ctx == NULL)
61f723
+        return PR_FALSE;
61f723
+    
61f723
+    if (replica != csn_ctx->prim_repl) {
61f723
+        /* The CSNs are not from the same replication topology
61f723
+         * so even if the csn values are equal they are not related
61f723
+         * to the same operation
61f723
+         */
61f723
+        return PR_FALSE;
61f723
+    }
61f723
+    
61f723
+    /* Here the two CSNs belong to the same replication topology */
61f723
+    
61f723
+    /* check if the CSN identifies the primary update */
61f723
+    if (csn_is_equal(csn, csn_ctx->prim_csn)) {
61f723
+        return PR_TRUE;
61f723
+    }
61f723
+    
61f723
+    return PR_FALSE;
61f723
+}
61f723
+
61f723
+static PRBool csn_primary_or_nested(csnpldata *csn_data,  const CSNPL_CTX *csn_ctx)
61f723
+{
61f723
+    if ((csn_data == NULL) || (csn_ctx == NULL))
61f723
+        return PR_FALSE;
61f723
+    
61f723
+    if (csn_data->prim_replica != csn_ctx->prim_repl) {
61f723
+        /* The CSNs are not from the same replication topology
61f723
+         * so even if the csn values are equal they are not related
61f723
+         * to the same operation
61f723
+         */
61f723
+        return PR_FALSE;
61f723
+    }
61f723
+    
61f723
+    /* Here the two CSNs belong to the same replication topology */
61f723
+    
61f723
+    /* First check if the CSN identifies the primary update */
61f723
+    if (csn_is_equal(csn_data->csn, csn_ctx->prim_csn)) {
61f723
+        return PR_TRUE;
61f723
+    }
61f723
+    
61f723
+    /* Second check if the CSN identifies a nested update */
61f723
+    if (csn_is_equal(csn_data->prim_csn, csn_ctx->prim_csn)) {
61f723
+        return PR_TRUE;
61f723
+    }
61f723
+    
61f723
+    return PR_FALSE;
61f723
+}
61f723
 
61f723
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
61f723
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
61f723
 {
61f723
 	csnpldata *data;
61f723
 	void *iterator;
61f723
@@ -197,8 +253,7 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
61f723
 	data = (csnpldata *)llistGetFirst(csnpl->csnList, &iterator);
61f723
 	while (NULL != data)
61f723
 	{
61f723
-		if (csn_is_equal(data->csn, csn) ||
61f723
-		    csn_is_equal(data->prim_csn, csn)) {
61f723
+		if (csn_primary_or_nested(data, csn_ctx)) {
61f723
 			csnpldata_free(&data);
61f723
 			data = (csnpldata *)llistRemoveCurrentAndGetNext(csnpl->csnList, &iterator);
61f723
 		} else {
61f723
@@ -213,13 +268,13 @@ int csnplRemoveAll (CSNPL *csnpl, const CSN *csn)
61f723
 }
61f723
 
61f723
 
61f723
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
61f723
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx)
61f723
 {
61f723
 	csnpldata *data;
61f723
 	void *iterator;
61f723
 	char csn_str[CSN_STRSIZE];
61f723
 
61f723
-	csn_as_string(csn, PR_FALSE, csn_str);
61f723
+	csn_as_string(csn_ctx->prim_csn, PR_FALSE, csn_str);
61f723
 	slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
61f723
 		            "csnplCommitALL: committing all csns for csn %s\n", csn_str);
61f723
 	slapi_rwlock_wrlock (csnpl->csnLock);
61f723
@@ -229,8 +284,7 @@ int csnplCommitAll (CSNPL *csnpl, const CSN *csn)
61f723
 		csn_as_string(data->csn, PR_FALSE, csn_str);
61f723
 		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name,
61f723
 				"csnplCommitALL: processing data csn %s\n", csn_str);
61f723
-		if (csn_is_equal(data->csn, csn) ||
61f723
-		    csn_is_equal(data->prim_csn, csn)) {
61f723
+                if (csn_primary_or_nested(data, csn_ctx)) {
61f723
 			data->committed = PR_TRUE;
61f723
 		}
61f723
 		data = (csnpldata *)llistGetNext (csnpl->csnList, &iterator);
61f723
@@ -395,7 +449,12 @@ static void _csnplDumpContentNoLock(CSNPL *csnpl, const char *caller)
61f723
 
61f723
 /* wrapper around csn_free, to satisfy NSPR thread context API */
61f723
 void
61f723
-csnplFreeCSN (void *arg)
61f723
+csnplFreeCSNPL_CTX (void *arg)
61f723
 {
61f723
-	csn_free((CSN **)&arg;;
61f723
+	CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)arg;
61f723
+	csn_free(&csnpl_ctx->prim_csn);
61f723
+	if (csnpl_ctx->sec_repl) {
61f723
+		slapi_ch_free((void **)&csnpl_ctx->sec_repl);
61f723
+	}
61f723
+	slapi_ch_free((void **)&csnpl_ctx);
61f723
 }
61f723
diff --git a/ldap/servers/plugins/replication/csnpl.h b/ldap/servers/plugins/replication/csnpl.h
61f723
index 594c8f2..1036c62 100644
61f723
--- a/ldap/servers/plugins/replication/csnpl.h
61f723
+++ b/ldap/servers/plugins/replication/csnpl.h
61f723
@@ -17,15 +17,17 @@
61f723
 #define CSNPL_H
61f723
 
61f723
 #include "slapi-private.h"
61f723
+#include "repl5.h"
61f723
 
61f723
 typedef struct csnpl CSNPL;
61f723
 
61f723
 CSNPL* csnplNew(void);
61f723
 void csnplFree (CSNPL **csnpl);
61f723
-int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSN *prim_csn);
61f723
+int csnplInsert (CSNPL *csnpl, const CSN *csn, const CSNPL_CTX *prim_csn);
61f723
 int csnplRemove (CSNPL *csnpl, const CSN *csn);
61f723
-int csnplRemoveAll (CSNPL *csnpl, const CSN *csn);
61f723
-int csnplCommitAll (CSNPL *csnpl, const CSN *csn);
61f723
+int csnplRemoveAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
61f723
+int csnplCommitAll (CSNPL *csnpl, const CSNPL_CTX *csn_ctx);
61f723
+PRBool csn_primary(Replica *replica, const CSN *csn,  const CSNPL_CTX *csn_ctx);
61f723
 CSN* csnplGetMinCSN (CSNPL *csnpl, PRBool *committed);
61f723
 int csnplCommit (CSNPL *csnpl, const CSN *csn);
61f723
 CSN *csnplRollUp(CSNPL *csnpl, CSN ** first);
61f723
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
61f723
index 1d8989c..718f64e 100644
61f723
--- a/ldap/servers/plugins/replication/repl5.h
61f723
+++ b/ldap/servers/plugins/replication/repl5.h
61f723
@@ -228,12 +228,27 @@ int multimaster_be_betxnpostop_delete (Slapi_PBlock *pb);
61f723
 int multimaster_be_betxnpostop_add (Slapi_PBlock *pb);
61f723
 int multimaster_be_betxnpostop_modify (Slapi_PBlock *pb);
61f723
 
61f723
+/* In repl5_replica.c */
61f723
+typedef struct replica Replica;
61f723
+
61f723
+/* csn pending lists */
61f723
+#define CSNPL_CTX_REPLCNT 4
61f723
+typedef struct CSNPL_CTX
61f723
+{
61f723
+	CSN *prim_csn;
61f723
+	size_t repl_alloc; /* max number of replicas  */
61f723
+	size_t repl_cnt; /* number of replicas affected by operation */
61f723
+	Replica *prim_repl; /* pirmary replica */
61f723
+	Replica **sec_repl; /* additional replicas affected */
61f723
+} CSNPL_CTX;
61f723
+
61f723
 /* In repl5_init.c */
61f723
 extern int repl5_is_betxn;
61f723
 char* get_thread_private_agmtname(void);
61f723
 void  set_thread_private_agmtname (const char *agmtname);
61f723
-void  set_thread_primary_csn (const CSN *prim_csn);
61f723
-CSN*  get_thread_primary_csn(void);
61f723
+void  set_thread_primary_csn (const CSN *prim_csn, Replica *repl);
61f723
+void  add_replica_to_primcsn(CSNPL_CTX *prim_csn, Replica *repl);
61f723
+CSNPL_CTX*  get_thread_primary_csn(void);
61f723
 void* get_thread_private_cache(void);
61f723
 void  set_thread_private_cache (void *buf);
61f723
 char* get_repl_session_id (Slapi_PBlock *pb, char *id, CSN **opcsn);
61f723
@@ -302,7 +317,6 @@ typedef struct repl_bos Repl_Bos;
61f723
 
61f723
 /* In repl5_agmt.c */
61f723
 typedef struct repl5agmt Repl_Agmt;
61f723
-typedef struct replica Replica;
61f723
 
61f723
 #define TRANSPORT_FLAG_SSL 1
61f723
 #define TRANSPORT_FLAG_TLS 2
61f723
@@ -629,6 +643,8 @@ PRUint64 replica_get_precise_purging(Replica *r);
61f723
 void replica_set_precise_purging(Replica *r, PRUint64 on_off);
61f723
 PRBool ignore_error_and_keep_going(int error);
61f723
 void replica_check_release_timeout(Replica *r, Slapi_PBlock *pb);
61f723
+void replica_lock_replica(Replica *r);
61f723
+void replica_unlock_replica(Replica *r);
61f723
 
61f723
 /* The functions below handles the state flag */
61f723
 /* Current internal state flags */
61f723
diff --git a/ldap/servers/plugins/replication/repl5_init.c b/ldap/servers/plugins/replication/repl5_init.c
61f723
index edffb84..b0bc515 100644
61f723
--- a/ldap/servers/plugins/replication/repl5_init.c
61f723
+++ b/ldap/servers/plugins/replication/repl5_init.c
61f723
@@ -154,26 +154,62 @@ set_thread_private_agmtname(const char *agmtname)
61f723
 		PR_SetThreadPrivate(thread_private_agmtname, (void *)agmtname);
61f723
 }
61f723
 
61f723
-CSN*
61f723
+CSNPL_CTX*
61f723
 get_thread_primary_csn(void)
61f723
 {
61f723
-	CSN *prim_csn = NULL;
61f723
+	CSNPL_CTX *prim_csn = NULL;
61f723
 	if (thread_primary_csn)
61f723
-		prim_csn = (CSN *)PR_GetThreadPrivate(thread_primary_csn);
61f723
+		prim_csn = (CSNPL_CTX *)PR_GetThreadPrivate(thread_primary_csn);
61f723
+
61f723
 	return prim_csn;
61f723
 }
61f723
 void
61f723
-set_thread_primary_csn(const CSN *prim_csn)
61f723
+set_thread_primary_csn (const CSN *prim_csn, Replica *repl)
61f723
 {
61f723
 	if (thread_primary_csn) {
61f723
 		if (prim_csn) {
61f723
-			PR_SetThreadPrivate(thread_primary_csn, (void *)csn_dup(prim_csn));
61f723
+			CSNPL_CTX *csnpl_ctx = (CSNPL_CTX *)slapi_ch_calloc(1,sizeof(CSNPL_CTX));
61f723
+			csnpl_ctx->prim_csn = csn_dup(prim_csn);
61f723
+			/* repl_alloc, repl_cnt and sec_repl are 0 by calloc */
61f723
+			csnpl_ctx->prim_repl = repl;
61f723
+			PR_SetThreadPrivate(thread_primary_csn, (void *)csnpl_ctx);
61f723
 		} else {
61f723
 			PR_SetThreadPrivate(thread_primary_csn, NULL);
61f723
 		}
61f723
 	}
61f723
 }
61f723
 
61f723
+void
61f723
+add_replica_to_primcsn(CSNPL_CTX *csnpl_ctx, Replica *repl)
61f723
+{
61f723
+	size_t found = 0;
61f723
+	size_t it = 0;
61f723
+
61f723
+	if (repl == csnpl_ctx->prim_repl) return;
61f723
+
61f723
+	while (it < csnpl_ctx->repl_cnt) {
61f723
+		if (csnpl_ctx->sec_repl[it] == repl) {
61f723
+			found = 1;
61f723
+			break;
61f723
+		}
61f723
+		it++;
61f723
+	}
61f723
+	if (found) return;
61f723
+
61f723
+	if (csnpl_ctx->repl_cnt < csnpl_ctx->repl_alloc) {
61f723
+		csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
61f723
+		return;
61f723
+	}
61f723
+	csnpl_ctx->repl_alloc += CSNPL_CTX_REPLCNT;
61f723
+	if (csnpl_ctx->repl_cnt == 0) {
61f723
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_calloc(csnpl_ctx->repl_alloc, sizeof(Replica *));
61f723
+	} else {
61f723
+		csnpl_ctx->sec_repl = (Replica **)slapi_ch_realloc((char *)csnpl_ctx->sec_repl, csnpl_ctx->repl_alloc * sizeof(Replica *));
61f723
+	}
61f723
+	csnpl_ctx->sec_repl[csnpl_ctx->repl_cnt++] = repl;
61f723
+	return;
61f723
+}
61f723
+
61f723
 void*
61f723
 get_thread_private_cache ()
61f723
 {
61f723
@@ -740,7 +776,7 @@ multimaster_start( Slapi_PBlock *pb )
61f723
 		/* Initialize thread private data for logging. Ignore if fails */
61f723
 		PR_NewThreadPrivateIndex (&thread_private_agmtname, NULL);
61f723
 		PR_NewThreadPrivateIndex (&thread_private_cache, NULL);
61f723
-		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSN);
61f723
+		PR_NewThreadPrivateIndex (&thread_primary_csn, csnplFreeCSNPL_CTX);
61f723
 
61f723
 		/* Decode the command line args to see if we're dumping to LDIF */
61f723
 		is_ldif_dump = check_for_ldif_dump(pb);
61f723
diff --git a/ldap/servers/plugins/replication/repl5_plugins.c b/ldap/servers/plugins/replication/repl5_plugins.c
61f723
index 9ef06af..c31d9d5 100644
61f723
--- a/ldap/servers/plugins/replication/repl5_plugins.c
61f723
+++ b/ldap/servers/plugins/replication/repl5_plugins.c
61f723
@@ -45,6 +45,7 @@
61f723
 #include "repl.h"
61f723
 #include "cl5_api.h"
61f723
 #include "urp.h"
61f723
+#include "csnpl.h"
61f723
 
61f723
 static char *local_purl = NULL;
61f723
 static char *purl_attrs[] = {"nsslapd-localhost", "nsslapd-port", "nsslapd-secureport", NULL};
61f723
@@ -1034,7 +1035,7 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
61f723
 {
61f723
 	Slapi_Operation *op = NULL;
61f723
 	CSN *opcsn;
61f723
-	CSN *prim_csn;
61f723
+	CSNPL_CTX *prim_csn;
61f723
 	int rc;
61f723
 	slapi_operation_parameters *op_params = NULL;
61f723
 	Object *repl_obj = NULL;
61f723
@@ -1070,14 +1071,15 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
61f723
 	if (repl_obj == NULL)
61f723
 		return return_value;
61f723
 
61f723
+	r = (Replica*)object_get_data (repl_obj);
61f723
+	PR_ASSERT (r);
61f723
+
61f723
 	slapi_pblock_get(pb, SLAPI_RESULT_CODE, &rc);
61f723
 	if (rc) { /* op failed - just return */
61f723
 		cancel_opcsn(pb);
61f723
 		goto common_return;
61f723
 	}
61f723
 
61f723
-	r = (Replica*)object_get_data (repl_obj);
61f723
-	PR_ASSERT (r);
61f723
 
61f723
 	replica_check_release_timeout(r, pb);
61f723
 
61f723
@@ -1223,12 +1225,12 @@ write_changelog_and_ruv (Slapi_PBlock *pb)
61f723
 common_return:
61f723
 	opcsn = operation_get_csn(op);
61f723
 	prim_csn = get_thread_primary_csn();
61f723
-	if (csn_is_equal(opcsn, prim_csn)) {
61f723
+	if (csn_primary(r, opcsn, prim_csn)) {
61f723
 		if (return_value == 0) {
61f723
 			/* the primary csn was succesfully committed
61f723
 			 * unset it in the thread local data
61f723
 			 */
61f723
-			set_thread_primary_csn(NULL);
61f723
+			set_thread_primary_csn(NULL, NULL);
61f723
 		}
61f723
 	}
61f723
 	if (repl_obj) {
61f723
@@ -1430,7 +1432,7 @@ cancel_opcsn (Slapi_PBlock *pb)
61f723
 
61f723
             ruv_obj = replica_get_ruv (r);
61f723
             PR_ASSERT (ruv_obj);
61f723
-            ruv_cancel_csn_inprogress ((RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
61f723
+            ruv_cancel_csn_inprogress (r, (RUV*)object_get_data (ruv_obj), opcsn, replica_get_rid(r));
61f723
             object_release (ruv_obj);
61f723
         }
61f723
 
61f723
@@ -1491,7 +1493,7 @@ process_operation (Slapi_PBlock *pb, const CSN *csn)
61f723
     ruv = (RUV*)object_get_data (ruv_obj);
61f723
     PR_ASSERT (ruv);
61f723
  
61f723
-    rc = ruv_add_csn_inprogress (ruv, csn);
61f723
+    rc = ruv_add_csn_inprogress (r, ruv, csn);
61f723
 
61f723
     object_release (ruv_obj);
61f723
     object_release (r_obj);
61f723
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
61f723
index 1bdc138..7927ac3 100644
61f723
--- a/ldap/servers/plugins/replication/repl5_replica.c
61f723
+++ b/ldap/servers/plugins/replication/repl5_replica.c
61f723
@@ -923,7 +923,7 @@ replica_update_ruv(Replica *r, const CSN *updated_csn, const char *replica_purl)
61f723
 					}
61f723
 				}
61f723
 				/* Update max csn for local and remote replicas */
61f723
-				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r->repl_rid);
61f723
+				rc = ruv_update_ruv (ruv, updated_csn, replica_purl, r, r->repl_rid);
61f723
 				if (RUV_COVERS_CSN == rc)
61f723
 				{
61f723
 					slapi_log_err(SLAPI_LOG_REPL,
61f723
@@ -3663,7 +3663,7 @@ assign_csn_callback(const CSN *csn, void *data)
61f723
         }
61f723
     }
61f723
 
61f723
-    ruv_add_csn_inprogress (ruv, csn);
61f723
+    ruv_add_csn_inprogress (r, ruv, csn);
61f723
 
61f723
     replica_unlock(r->repl_lock);
61f723
 
61f723
@@ -3692,13 +3692,13 @@ abort_csn_callback(const CSN *csn, void *data)
61f723
     {
61f723
         int rc = csnplRemove(r->min_csn_pl, csn);
61f723
         if (rc) {
61f723
-            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed");
61f723
+            slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "abort_csn_callback - csnplRemove failed\n");
61f723
             replica_unlock(r->repl_lock);
61f723
             return;
61f723
         }
61f723
     }
61f723
 
61f723
-    ruv_cancel_csn_inprogress (ruv, csn, replica_get_rid(r));
61f723
+    ruv_cancel_csn_inprogress (r, ruv, csn, replica_get_rid(r));
61f723
     replica_unlock(r->repl_lock);
61f723
 
61f723
     object_release (ruv_obj);
61f723
@@ -4489,3 +4489,13 @@ replica_check_release_timeout(Replica *r, Slapi_PBlock *pb)
61f723
 	}
61f723
 	replica_unlock(r->repl_lock);
61f723
 }
61f723
+void
61f723
+replica_lock_replica(Replica *r)
61f723
+{
61f723
+	replica_lock(r->repl_lock);
61f723
+}
61f723
+void
61f723
+replica_unlock_replica(Replica *r)
61f723
+{
61f723
+	replica_unlock(r->repl_lock);
61f723
+}
61f723
diff --git a/ldap/servers/plugins/replication/repl5_ruv.c b/ldap/servers/plugins/replication/repl5_ruv.c
61f723
index d59e6d2..39449b6 100644
61f723
--- a/ldap/servers/plugins/replication/repl5_ruv.c
61f723
+++ b/ldap/servers/plugins/replication/repl5_ruv.c
61f723
@@ -77,7 +77,7 @@ static char *get_replgen_from_berval(const struct berval *bval);
61f723
 static const char * const prefix_replicageneration = "{replicageneration}";
61f723
 static const char * const prefix_ruvcsn = "{replica "; /* intentionally missing '}' */
61f723
 
61f723
-static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal);
61f723
+static int ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal);
61f723
 
61f723
 /* API implementation */
61f723
 
61f723
@@ -1599,13 +1599,13 @@ ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile)
61f723
 
61f723
 /* this function notifies the ruv that there are operations in progress so that
61f723
    they can be added to the pending list for the appropriate client. */
61f723
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
61f723
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn)
61f723
 {
61f723
     RUVElement* replica;
61f723
     char csn_str[CSN_STRSIZE];
61f723
     int rc = RUV_SUCCESS;
61f723
     int rid = csn_get_replicaid (csn);
61f723
-    CSN *prim_csn;
61f723
+    CSNPL_CTX *prim_csn;
61f723
 
61f723
     PR_ASSERT (ruv && csn);
61f723
 
61f723
@@ -1645,8 +1645,13 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
61f723
     }
61f723
     prim_csn = get_thread_primary_csn();
61f723
     if (prim_csn == NULL) {
61f723
-        set_thread_primary_csn(csn);
61f723
+        set_thread_primary_csn(csn, (Replica *)repl);
61f723
         prim_csn = get_thread_primary_csn();
61f723
+    } else {
61f723
+	/* the prim csn data already exist, need to check if
61f723
+	 * current replica is already present
61f723
+	 */
61f723
+	add_replica_to_primcsn(prim_csn, (Replica *)repl);
61f723
     }
61f723
     rc = csnplInsert (replica->csnpl, csn, prim_csn);
61f723
     if (rc == 1)    /* we already seen this csn */
61f723
@@ -1656,7 +1661,7 @@ int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn)
61f723
                             "The csn %s has already be seen - ignoring\n",
61f723
                             csn_as_string (csn, PR_FALSE, csn_str));
61f723
         }
61f723
-        set_thread_primary_csn(NULL);
61f723
+        set_thread_primary_csn(NULL, NULL);
61f723
         rc = RUV_COVERS_CSN;    
61f723
     }
61f723
     else if(rc != 0)
61f723
@@ -1681,11 +1686,13 @@ done:
61f723
     return rc;
61f723
 }
61f723
 
61f723
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
61f723
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId local_rid)
61f723
 {
61f723
-    RUVElement* replica;
61f723
+    RUVElement* repl_ruv;
61f723
     int rc = RUV_SUCCESS;
61f723
-    CSN *prim_csn = NULL;
61f723
+    CSNPL_CTX *prim_csn = NULL;
61f723
+    Replica *repl_it;
61f723
+    size_t it;
61f723
 
61f723
 
61f723
     PR_ASSERT (ruv && csn);
61f723
@@ -1693,29 +1700,53 @@ int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId local_rid)
61f723
     prim_csn = get_thread_primary_csn();
61f723
     /* locate ruvElement */
61f723
     slapi_rwlock_wrlock (ruv->lock);
61f723
-    replica = ruvGetReplica (ruv, csn_get_replicaid (csn));
61f723
-    if (replica == NULL) {
61f723
+    repl_ruv = ruvGetReplica (ruv, csn_get_replicaid (csn));
61f723
+    if (repl_ruv == NULL) {
61f723
         /* ONREPL - log error */
61f723
 	rc = RUV_NOTFOUND;
61f723
 	goto done;
61f723
     }
61f723
-    if (csn_is_equal(csn, prim_csn)) {
61f723
-	/* the prim csn is cancelled, lets remove all dependent csns */
61f723
-	ReplicaId prim_rid = csn_get_replicaid (csn);
61f723
-	replica = ruvGetReplica (ruv, prim_rid);
61f723
-	rc = csnplRemoveAll (replica->csnpl, prim_csn);
61f723
-	if (prim_rid != local_rid) {
61f723
-		if( local_rid != READ_ONLY_REPLICA_ID) {
61f723
-			replica = ruvGetReplica (ruv, local_rid);
61f723
-			if (replica) {
61f723
-				rc = csnplRemoveAll (replica->csnpl, prim_csn);
61f723
-			} else {
61f723
-				rc = RUV_NOTFOUND;
61f723
-			}
61f723
-		}
61f723
-	}
61f723
+    if (csn_primary(repl, csn, prim_csn)) {
61f723
+        /* the prim csn is cancelled, lets remove all dependent csns */
61f723
+        /* for the primary replica we can have modifications for two RIDS:
61f723
+         * - the local RID for direct or internal operations
61f723
+         * - a remote RID if the primary csn is for a replciated op.
61f723
+         */
61f723
+        ReplicaId prim_rid = csn_get_replicaid(csn);
61f723
+        repl_ruv = ruvGetReplica(ruv, prim_rid);
61f723
+        if (!repl_ruv) {
61f723
+            rc = RUV_NOTFOUND;
61f723
+            goto done;
61f723
+        }
61f723
+        rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
61f723
+
61f723
+        if (prim_rid != local_rid && local_rid != READ_ONLY_REPLICA_ID) {
61f723
+            repl_ruv = ruvGetReplica(ruv, local_rid);
61f723
+            if (!repl_ruv) {
61f723
+                rc = RUV_NOTFOUND;
61f723
+                goto done;
61f723
+            }
61f723
+            rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
61f723
+        }
61f723
+
61f723
+        for (it = 0; it < prim_csn->repl_cnt; it++) {
61f723
+            repl_it = prim_csn->sec_repl[it];
61f723
+            replica_lock_replica(repl_it);
61f723
+            local_rid = replica_get_rid(repl_it);
61f723
+            if (local_rid != READ_ONLY_REPLICA_ID) {
61f723
+                Object *ruv_obj = replica_get_ruv(repl_it);
61f723
+                RUV *ruv_it = object_get_data(ruv_obj);
61f723
+                repl_ruv = ruvGetReplica(ruv_it, local_rid);
61f723
+                if (repl_ruv) {
61f723
+                    rc = csnplRemoveAll(repl_ruv->csnpl, prim_csn);
61f723
+                } else {
61f723
+                    rc = RUV_NOTFOUND;
61f723
+                }
61f723
+            }
61f723
+            replica_unlock_replica(repl_it);
61f723
+        }
61f723
     } else {
61f723
-	rc = csnplRemove (replica->csnpl, csn);
61f723
+	rc = csnplRemove (repl_ruv->csnpl, csn);
61f723
     }
61f723
     if (rc != 0)
61f723
         rc = RUV_NOTFOUND;
61f723
@@ -1727,86 +1758,100 @@ done:
61f723
     return rc;
61f723
 }
61f723
 
61f723
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid)
61f723
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid)
61f723
 {
61f723
     int rc=RUV_SUCCESS;
61f723
-    RUVElement *replica;
61f723
+    RUVElement *repl_ruv;
61f723
     ReplicaId prim_rid;
61f723
+    Replica *repl_it = NULL;
61f723
+    size_t it = 0;
61f723
 
61f723
-    CSN *prim_csn = get_thread_primary_csn();
61f723
+    CSNPL_CTX *prim_csn = get_thread_primary_csn();
61f723
 
61f723
-    if (! csn_is_equal(csn, prim_csn)) {
61f723
+    if (! csn_primary(replica, csn, prim_csn)) {
61f723
 	/* not a primary csn, nothing to do */
61f723
 	return rc;
61f723
     }
61f723
-    slapi_rwlock_wrlock (ruv->lock);
61f723
+
61f723
+    /* first handle primary replica 
61f723
+     * there can be two ruv elements affected
61f723
+     */
61f723
     prim_rid = csn_get_replicaid (csn);
61f723
-    replica = ruvGetReplica (ruv, local_rid);
61f723
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_TRUE);
61f723
-    if ( rc || local_rid == prim_rid) goto done;
61f723
-    replica = ruvGetReplica (ruv, prim_rid);
61f723
-    rc = ruv_update_ruv_element(ruv, replica, csn, replica_purl, PR_FALSE);
61f723
-done:
61f723
+    slapi_rwlock_wrlock (ruv->lock);
61f723
+    if ( local_rid != prim_rid) {
61f723
+	repl_ruv = ruvGetReplica (ruv, prim_rid);
61f723
+	rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_FALSE);
61f723
+    }
61f723
+    repl_ruv = ruvGetReplica (ruv, local_rid);
61f723
+    rc = ruv_update_ruv_element(ruv, repl_ruv, prim_csn, replica_purl, PR_TRUE);
61f723
     slapi_rwlock_unlock (ruv->lock);
61f723
+    if (rc) return rc;
61f723
+
61f723
+    /* now handle secondary replicas */
61f723
+    for (it=0; it<prim_csn->repl_cnt; it++) {
61f723
+	repl_it = prim_csn->sec_repl[it];
61f723
+	replica_lock_replica(repl_it);
61f723
+	Object *ruv_obj = replica_get_ruv (repl_it);
61f723
+	RUV *ruv_it = object_get_data (ruv_obj);
61f723
+	slapi_rwlock_wrlock (ruv_it->lock);
61f723
+	repl_ruv = ruvGetReplica (ruv_it, replica_get_rid(repl_it));
61f723
+	rc = ruv_update_ruv_element(ruv_it, repl_ruv, prim_csn, replica_purl, PR_TRUE);
61f723
+	slapi_rwlock_unlock (ruv_it->lock);
61f723
+	replica_unlock_replica(repl_it);
61f723
+	if (rc) break;
61f723
+    }
61f723
     return rc;
61f723
 }
61f723
+
61f723
 static int
61f723
-ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSN *csn, const char *replica_purl, PRBool isLocal)
61f723
+ruv_update_ruv_element (RUV *ruv, RUVElement *replica, const CSNPL_CTX *prim_csn, const char *replica_purl, PRBool isLocal)
61f723
 {
61f723
     int rc=RUV_SUCCESS;
61f723
     char csn_str[CSN_STRSIZE];
61f723
     CSN *max_csn;
61f723
     CSN *first_csn = NULL;
61f723
     
61f723
-    if (replica == NULL)
61f723
-    {
61f723
+    if (replica == NULL) {
61f723
         /* we should have a ruv element at this point because it would have
61f723
            been added by ruv_add_inprogress function */
61f723
         slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
61f723
-			            "Can't locate RUV element for replica %d\n", csn_get_replicaid (csn)); 
61f723
+                        "Can't locate RUV element for replica %d\n", csn_get_replicaid (prim_csn->prim_csn));
61f723
         goto done;
61f723
     } 
61f723
 
61f723
-	if (csnplCommitAll(replica->csnpl, csn) != 0)
61f723
-	{
61f723
-		slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
61f723
-			            csn_as_string(csn, PR_FALSE, csn_str));
61f723
+    if (csnplCommitAll(replica->csnpl, prim_csn) != 0) {
61f723
+        slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "ruv_update_ruv - Cannot commit csn %s\n",
61f723
+                        csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
61f723
         rc = RUV_CSNPL_ERROR;
61f723
         goto done;
61f723
-	}
61f723
-    else
61f723
-    {
61f723
+    } else {
61f723
         if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
61f723
             slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - "
61f723
-                            "Successfully committed csn %s\n", csn_as_string(csn, PR_FALSE, csn_str));
61f723
+                            "Successfully committed csn %s\n", csn_as_string(prim_csn->prim_csn, PR_FALSE, csn_str));
61f723
         }
61f723
     }
61f723
 
61f723
-	if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL)
61f723
-	{
61f723
-#ifdef DEBUG
61f723
-		slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
61f723
-			            csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
61f723
-#endif
61f723
+    if ((max_csn = csnplRollUp(replica->csnpl, &first_csn)) != NULL) {
61f723
+        slapi_log_err(SLAPI_LOG_REPL, repl_plugin_name, "ruv_update_ruv - Rolled up to csn %s\n",
61f723
+                        csn_as_string(max_csn, PR_FALSE, csn_str)); /* XXXggood remove debugging */
61f723
         /* replica object sets min csn for local replica */
61f723
-		if (!isLocal && replica->min_csn == NULL) {
61f723
-		  /* bug 559223 - it seems that, under huge stress, a server might pass
61f723
-		   * through this code when more than 1 change has already been sent and commited into
61f723
-		   * the pending lists... Therefore, as we are trying to set the min_csn ever 
61f723
-		   * generated by this replica, we need to set the first_csn as the min csn in the
61f723
-		   * ruv */
61f723
-		  set_min_csn_nolock(ruv, first_csn, replica_purl);
61f723
-		}
61f723
-		/* only update the max_csn in the RUV if it is greater than the existing one */
61f723
-		rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
61f723
-		/* It is possible that first_csn points to max_csn.
61f723
-		   We need to free it once */
61f723
-		if (max_csn != first_csn) {
61f723
-			csn_free(&first_csn); 
61f723
-		}
61f723
-		csn_free(&max_csn);
61f723
-	}
61f723
-
61f723
+        if (!isLocal && replica->min_csn == NULL) {
61f723
+            /* bug 559223 - it seems that, under huge stress, a server might pass
61f723
+             * through this code when more than 1 change has already been sent and commited into
61f723
+             * the pending lists... Therefore, as we are trying to set the min_csn ever
61f723
+             * generated by this replica, we need to set the first_csn as the min csn in the
61f723
+             * ruv */
61f723
+        set_min_csn_nolock(ruv, first_csn, replica_purl);
61f723
+        }
61f723
+        /* only update the max_csn in the RUV if it is greater than the existing one */
61f723
+        rc = set_max_csn_nolock_ext(ruv, max_csn, replica_purl, PR_TRUE /* must be greater */);
61f723
+        /* It is possible that first_csn points to max_csn.
61f723
+           We need to free it once */
61f723
+        if (max_csn != first_csn) {
61f723
+            csn_free(&first_csn);
61f723
+        }
61f723
+        csn_free(&max_csn);
61f723
+    }
61f723
 done:
61f723
 
61f723
     return rc;
61f723
diff --git a/ldap/servers/plugins/replication/repl5_ruv.h b/ldap/servers/plugins/replication/repl5_ruv.h
61f723
index c8960fd..f3cd38b 100644
61f723
--- a/ldap/servers/plugins/replication/repl5_ruv.h
61f723
+++ b/ldap/servers/plugins/replication/repl5_ruv.h
61f723
@@ -108,9 +108,9 @@ int ruv_to_bervals(const RUV *ruv, struct berval ***bvals);
61f723
 PRInt32 ruv_replica_count (const RUV *ruv);
61f723
 char **ruv_get_referrals(const RUV *ruv);
61f723
 void ruv_dump(const RUV *ruv, char *ruv_name, PRFileDesc *prFile);
61f723
-int ruv_add_csn_inprogress (RUV *ruv, const CSN *csn);
61f723
-int ruv_cancel_csn_inprogress (RUV *ruv, const CSN *csn, ReplicaId rid);
61f723
-int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, ReplicaId local_rid);
61f723
+int ruv_add_csn_inprogress (void *repl, RUV *ruv, const CSN *csn);
61f723
+int ruv_cancel_csn_inprogress (void *repl, RUV *ruv, const CSN *csn, ReplicaId rid);
61f723
+int ruv_update_ruv (RUV *ruv, const CSN *csn, const char *replica_purl, void *replica, ReplicaId local_rid);
61f723
 int ruv_move_local_supplier_to_first(RUV *ruv, ReplicaId rid);
61f723
 int ruv_get_first_id_and_purl(RUV *ruv, ReplicaId *rid, char **replica_purl );
61f723
 int ruv_local_contains_supplier(RUV *ruv, ReplicaId rid);
61f723
diff --git a/ldap/servers/slapd/slapi-private.h b/ldap/servers/slapd/slapi-private.h
61f723
index 0836d66..3910dbe 100644
61f723
--- a/ldap/servers/slapd/slapi-private.h
61f723
+++ b/ldap/servers/slapd/slapi-private.h
61f723
@@ -193,7 +193,7 @@ const CSN *csn_max(const CSN *csn1,const CSN *csn2);
61f723
    a csn from the set.*/
61f723
 int csn_increment_subsequence (CSN *csn);
61f723
 
61f723
-void csnplFreeCSN (void *arg);
61f723
+void csnplFreeCSNPL_CTX (void *arg);
61f723
 /*
61f723
  * csnset.c
61f723
  */
61f723
-- 
61f723
2.9.4
61f723