Blame SOURCES/0095-Ticket-48766-Replication-changelog-can-incorrectly-s.patch

c16027
From a39e2b7cba91b9f13fe54123b7e8b510bf5bcee8 Mon Sep 17 00:00:00 2001
c16027
From: Ludwig Krispenz <lkrispen@redhat.com>
c16027
Date: Wed, 8 Jun 2016 11:28:07 +0200
c16027
Subject: [PATCH 95/99] Ticket 48766 - Replication changelog can incorrectly
c16027
 skip over updates
c16027
c16027
Bug Description:
c16027
      The changelog iterator uses a buffer to load and send changes, when the buffer is empty
c16027
      there were scenarios when the starting point for reloading the buffer was incorrectly set
c16027
      and changes were skipped
c16027
c16027
Fix Description: reworked clcache buffer code following design at
c16027
      http://www.port389.org/docs/389ds/design/changelog-processing-in-repl-state-sending-updates.html
c16027
c16027
https://fedorahosted.org/389/ticket/48766
c16027
c16027
Reviewed by: Mark and Thierry, thanks
c16027
c16027
(cherry picked from commit b08df71aa9eb18572f58e55e8d6b9ef7fe181773)
c16027
(cherry picked from commit ec15a75ccdba713e4d74dcd760e3244ba43b6191)
c16027
---
c16027
 ldap/servers/plugins/replication/cl5_api.c     | 171 +++------------
c16027
 ldap/servers/plugins/replication/cl5_clcache.c | 292 +++++++++++++++----------
c16027
 ldap/servers/plugins/replication/cl5_clcache.h |   2 +-
c16027
 3 files changed, 214 insertions(+), 251 deletions(-)
c16027
c16027
diff --git a/ldap/servers/plugins/replication/cl5_api.c b/ldap/servers/plugins/replication/cl5_api.c
c16027
index ae23353..3adaf86 100644
c16027
--- a/ldap/servers/plugins/replication/cl5_api.c
c16027
+++ b/ldap/servers/plugins/replication/cl5_api.c
c16027
@@ -5489,18 +5489,13 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
c16027
 {
c16027
 	CLC_Buffer *clcache = NULL;
c16027
 	CL5DBFile *file;
c16027
-    int i;
c16027
-    CSN **csns = NULL;
c16027
     CSN *startCSN = NULL;
c16027
-    CSN *minCSN = NULL;
c16027
     char csnStr [CSN_STRSIZE];
c16027
     int rc = CL5_SUCCESS;
c16027
     Object *supplierRuvObj = NULL;
c16027
     RUV *supplierRuv = NULL;
c16027
-    PRBool newReplica;
c16027
     PRBool haveChanges = PR_FALSE;
c16027
 	char *agmt_name;
c16027
-	ReplicaId rid;
c16027
 
c16027
     PR_ASSERT (consumerRuv && replica && fileObj && iterator);
c16027
 	csnStr[0] = '\0';
c16027
@@ -5528,111 +5523,32 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
c16027
 		ruv_dump (supplierRuv, agmt_name, NULL);
c16027
 	}
c16027
    
c16027
-	/*
c16027
-	 * get the sorted list of SupplierMinCSN (if no ConsumerMaxCSN)
c16027
-	 * and ConsumerMaxCSN for those RIDs where consumer is not
c16027
-	 * up-to-date.
c16027
-	 */
c16027
-    csns = cl5BuildCSNList (consumerRuv, supplierRuv);
c16027
-    if (csns == NULL)
c16027
-    {
c16027
-        rc = CL5_NOTFOUND;
c16027
-        goto done;
c16027
-    }
c16027
 
c16027
-	/* iterate over elements of consumer's (and/or supplier's) ruv */
c16027
-    for (i = 0; csns[i]; i++)
c16027
-    {
c16027
-        CSN *consumerMaxCSN = NULL;
c16027
-
c16027
-		rid = csn_get_replicaid(csns[i]);
c16027
-
c16027
-		/*
c16027
-		 * Skip CSN that is originated from the consumer.
c16027
-		 * If RID==65535, the CSN is originated from a
c16027
-		 * legacy consumer. In this case the supplier
c16027
-		 * and the consumer may have the same RID.
c16027
-		 */
c16027
-		if ((rid == consumerRID && rid != MAX_REPLICA_ID) || (is_cleaned_rid(rid)) )
c16027
-			continue;
c16027
+	/* initialize the changelog buffer and do the initial load */
c16027
 
c16027
-        startCSN = csns[i];
c16027
+	rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
c16027
+	if ( rc != 0 ) goto done;
c16027
 
c16027
-		rc = clcache_get_buffer ( &clcache, file->db, consumerRID, consumerRuv, supplierRuv );
c16027
-		if ( rc != 0 ) goto done;
c16027
-
c16027
-	    /* This is the first loading of this iteration. For replicas
c16027
-		 * already known to the consumer, we exclude the last entry
c16027
-		 * sent to the consumer by using DB_NEXT. However, for
c16027
-		 * replicas new to the consumer, we include the first change
c16027
-		 * ever generated by that replica.
c16027
-		 */
c16027
-		newReplica = ruv_get_largest_csn_for_replica (consumerRuv, rid, &consumerMaxCSN);
c16027
-		csn_free(&consumerMaxCSN);
c16027
-		rc = clcache_load_buffer (clcache, startCSN, (newReplica ? DB_SET : DB_NEXT));
c16027
-
c16027
-		/* there is a special case which can occur just after migration - in this case,
c16027
-		the consumer RUV will contain the last state of the supplier before migration,
c16027
-		but the supplier will have an empty changelog, or the supplier changelog will
c16027
-		not contain any entries within the consumer min and max CSN - also, since
c16027
-		the purge RUV contains no CSNs, the changelog has never been purged
c16027
-		ASSUMPTIONS - it is assumed that the supplier had no pending changes to send
c16027
-		to any consumers; that is, we can assume that no changes were lost due to
c16027
-		either changelog purging or database reload - bug# 603061 - richm@netscape.com
c16027
-		*/
c16027
-        if ((rc == DB_NOTFOUND) && !ruv_has_csns(file->purgeRUV))
c16027
-        {
c16027
-            char mincsnStr[CSN_STRSIZE];
c16027
-
c16027
-            /* use the supplier min csn for the buffer start csn - we know
c16027
-               this csn is in our changelog */
c16027
-            if ((RUV_SUCCESS == ruv_get_min_csn_ext(supplierRuv, &minCSN, 1 /* ignore cleaned rids */)) &&
c16027
-                minCSN)
c16027
-            { /* must now free startCSN */
c16027
-                if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
-                    csn_as_string(startCSN, PR_FALSE, csnStr);
c16027
-                    csn_as_string(minCSN, PR_FALSE, mincsnStr);
c16027
-                    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
c16027
-                                    "%s: CSN %s not found and no purging, probably a reinit\n",
c16027
-                                    agmt_name, csnStr);
c16027
-                    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
c16027
-                                    "%s: Will try to use supplier min CSN %s to load changelog\n",
c16027
-                                    agmt_name, mincsnStr);
c16027
-                }
c16027
-                startCSN = minCSN;
c16027
-                rc = clcache_load_buffer (clcache, startCSN, DB_SET);
c16027
-            }
c16027
-            else
c16027
-            {
c16027
-                if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
-                    csn_as_string(startCSN, PR_FALSE, csnStr); 
c16027
-                    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl,
c16027
-                                    "%s: CSN %s not found and no purging, probably a reinit\n",
c16027
-                                    agmt_name, csnStr);
c16027
-                    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
c16027
-                                    "%s: Could not get the min csn from the supplier RUV\n",
c16027
-                                    agmt_name);
c16027
-                }
c16027
-                rc = CL5_RUV_ERROR;
c16027
-                goto done;
c16027
-            }
c16027
-        }
c16027
+	rc = clcache_load_buffer (clcache, &startCSN);
c16027
 
c16027
         if (rc == 0) {
c16027
-            haveChanges = PR_TRUE;
c16027
-            rc = CL5_SUCCESS;
c16027
-            if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
-                csn_as_string(startCSN, PR_FALSE, csnStr); 
c16027
-                slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl, 
c16027
-                                "%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
c16027
-            }
c16027
-            if (startCSN != csns[i]) {
c16027
-                csn_free(&startCSN);
c16027
-            }
c16027
-            break;
c16027
+		haveChanges = PR_TRUE;
c16027
+		rc = CL5_SUCCESS;
c16027
+		if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
+			csn_as_string(startCSN, PR_FALSE, csnStr);
c16027
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name_cl,
c16027
+				"%s: CSN %s found, position set for replay\n", agmt_name, csnStr);
c16027
+		}
c16027
         }
c16027
-        else if (rc == DB_NOTFOUND)  /* entry not found */
c16027
-        {
c16027
+        else if (rc == DB_NOTFOUND)   {
c16027
+	    /* buffer not loaded.
c16027
+	     * either because no changes have to be sent ==> startCSN is NULL
c16027
+	     * or the calculated startCSN cannot be found in the changelog
c16027
+	     */
c16027
+	    if (startCSN == NULL) {
c16027
+		rc = CL5_NOTFOUND;
c16027
+		goto done;
c16027
+	    }
c16027
             /* check whether this csn should be present */
c16027
             rc = _cl5CheckMissingCSN (startCSN, supplierRuv, file);
c16027
             if (rc == CL5_MISSING_DATA)  /* we should have had the change but we don't */
c16027
@@ -5650,17 +5566,6 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
c16027
                                 "%s: CSN %s not found, we aren't as up to date, or we purged\n", 
c16027
                                 agmt_name, csnStr);
c16027
             }
c16027
-            if (startCSN != csns[i]) {
c16027
-                csn_free(&startCSN);
c16027
-            }
c16027
-            if (rc == CL5_MISSING_DATA)  /* we should have had the change but we don't */
c16027
-            {
c16027
-                break;
c16027
-            }
c16027
-            else /* we are not as up to date or we purged */
c16027
-            {
c16027
-                continue;
c16027
-            } 
c16027
         }
c16027
         else
c16027
         {
c16027
@@ -5669,34 +5574,29 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
c16027
 			slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
c16027
                             "%s: Failed to retrieve change with CSN %s; db error - %d %s\n", 
c16027
                             agmt_name, csnStr, rc, db_strerror(rc));
c16027
-            if (startCSN != csns[i]) {
c16027
-                csn_free(&startCSN);
c16027
-            }
c16027
 
c16027
             rc = CL5_DB_ERROR;
c16027
-            break;
c16027
-        }
c16027
+    }
c16027
 
c16027
-    } /* end for */
c16027
 
c16027
     /* setup the iterator */
c16027
     if (haveChanges)
c16027
     {
c16027
-	    *iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
c16027
+	*iterator = (CL5ReplayIterator*) slapi_ch_calloc (1, sizeof (CL5ReplayIterator));
c16027
 
c16027
-	    if (*iterator == NULL)
c16027
-	    {
c16027
-		    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
c16027
+        if (*iterator == NULL)
c16027
+	{
c16027
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name_cl, 
c16027
 						"%s: _cl5PositionCursorForReplay: failed to allocate iterator\n", agmt_name);
c16027
-		    rc = CL5_MEMORY_ERROR;
c16027
-		    goto done;
c16027
-	    }
c16027
+            rc = CL5_MEMORY_ERROR;
c16027
+	    goto done;
c16027
+	}
c16027
 
c16027
         /* ONREPL - should we make a copy of both RUVs here ?*/
c16027
-		(*iterator)->fileObj = fileObj;
c16027
-		(*iterator)->clcache = clcache; clcache = NULL;
c16027
-		(*iterator)->consumerRID = consumerRID;
c16027
-	    (*iterator)->consumerRuv = consumerRuv;
c16027
+	(*iterator)->fileObj = fileObj;
c16027
+	(*iterator)->clcache = clcache; clcache = NULL;
c16027
+	(*iterator)->consumerRID = consumerRID;
c16027
+	(*iterator)->consumerRuv = consumerRuv;
c16027
         (*iterator)->supplierRuvObj = supplierRuvObj;
c16027
     }
c16027
     else if (rc == CL5_SUCCESS)
c16027
@@ -5706,11 +5606,8 @@ static int _cl5PositionCursorForReplay (ReplicaId consumerRID, const RUV *consum
c16027
     }
c16027
 
c16027
 done:
c16027
-	if ( clcache )
c16027
-		clcache_return_buffer ( &clcache );
c16027
-
c16027
-    if (csns)
c16027
-        cl5DestroyCSNList (&csns);
c16027
+    if ( clcache )
c16027
+	clcache_return_buffer ( &clcache );
c16027
 
c16027
     if (rc != CL5_SUCCESS)
c16027
     {
c16027
diff --git a/ldap/servers/plugins/replication/cl5_clcache.c b/ldap/servers/plugins/replication/cl5_clcache.c
c16027
index b53d7c0..2d3bb28 100644
c16027
--- a/ldap/servers/plugins/replication/cl5_clcache.c
c16027
+++ b/ldap/servers/plugins/replication/cl5_clcache.c
c16027
@@ -39,6 +39,7 @@
c16027
 #define DEFAULT_CLC_BUFFER_COUNT_MAX		0
c16027
 #define DEFAULT_CLC_BUFFER_PAGE_COUNT		32
c16027
 #define DEFAULT_CLC_BUFFER_PAGE_SIZE		1024
c16027
+#define WORK_CLC_BUFFER_PAGE_SIZE 8*DEFAULT_CLC_BUFFER_PAGE_SIZE
c16027
 
c16027
 enum {
c16027
 	CLC_STATE_READY = 0,		/* ready to iterate */
c16027
@@ -56,8 +57,9 @@ struct csn_seq_ctrl_block {
c16027
 	ReplicaId	rid;				/* RID this block serves */
c16027
 	CSN			*consumer_maxcsn;	/* Don't send CSN <= this */
c16027
 	CSN			*local_maxcsn;		/* Don't send CSN > this */
c16027
-	CSN			*prev_local_maxcsn;	/* */
c16027
-	int			state;				/* CLC_STATE_* */
c16027
+	CSN			*prev_local_maxcsn;	/* Copy of last state at buffer loading */
c16027
+	CSN			*local_mincsn;		/* Used to determin anchor csn*/
c16027
+	int			state;			/* CLC_STATE_* */
c16027
 };
c16027
 
c16027
 /*
c16027
@@ -70,6 +72,8 @@ struct clc_buffer {
c16027
 	ReplicaId	 buf_consumer_rid;	/* help checking threshold csn */
c16027
 	const RUV	*buf_consumer_ruv;	/* used to skip change */
c16027
 	const RUV	*buf_local_ruv;		/* used to refresh local_maxcsn */
c16027
+	int		buf_ignoreConsumerRID;	/* how to handle updates from consumer */
c16027
+	int	 	buf_load_cnt;		/* number of loads for session */
c16027
 
c16027
 	/*
c16027
 	 * fields for retriving data from DB
c16027
@@ -90,7 +94,6 @@ struct clc_buffer {
c16027
 	int			 buf_max_cscbs;
c16027
 
c16027
 	/* fields for debugging stat */
c16027
-	int		 	 buf_load_cnt;		/* number of loads for session */
c16027
 	int		 	 buf_record_cnt;	/* number of changes for session */
c16027
 	int		 	 buf_record_skipped;	/* number of changes skipped */
c16027
 	int		 	 buf_skipped_new_rid;	/* number of changes skipped due to new_rid */
c16027
@@ -133,7 +136,8 @@ struct clc_pool {
c16027
 static struct clc_pool *_pool = NULL;	/* process's buffer pool */
c16027
 
c16027
 /* static prototypes */
c16027
-static int	clcache_adjust_anchorcsn ( CLC_Buffer *buf );
c16027
+static int	clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag );
c16027
+static int	clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag );
c16027
 static void	clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf );
c16027
 static int	clcache_refresh_local_maxcsns ( CLC_Buffer *buf );
c16027
 static int	clcache_skip_change ( CLC_Buffer *buf );
c16027
@@ -251,8 +255,23 @@ clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV
c16027
 	}
c16027
 
c16027
 	if ( NULL != *buf ) {
c16027
+		CSN *c_csn = NULL;
c16027
+		CSN *l_csn = NULL;
c16027
 		(*buf)->buf_consumer_ruv = consumer_ruv;
c16027
 		(*buf)->buf_local_ruv = local_ruv;
c16027
+		(*buf)->buf_load_flag = DB_MULTIPLE_KEY;
c16027
+		ruv_get_largest_csn_for_replica (consumer_ruv, consumer_rid, &c_csn);
c16027
+		ruv_get_largest_csn_for_replica (local_ruv, consumer_rid, &l_csn);
c16027
+		if (l_csn && csn_compare(l_csn, c_csn) > 0) {
c16027
+			/* the supplier has updates for the consumer RID and
c16027
+			 * these updates are newer than on the consumer
c16027
+			 */
c16027
+			(*buf)->buf_ignoreConsumerRID = 0;
c16027
+		} else {
c16027
+			(*buf)->buf_ignoreConsumerRID = 1;
c16027
+		}
c16027
+		csn_free(&c_csn);
c16027
+		csn_free(&l_csn);
c16027
 	}
c16027
 	else {
c16027
 		slapi_log_error ( SLAPI_LOG_FATAL, get_thread_private_agmtname(),
c16027
@@ -305,36 +324,25 @@ clcache_return_buffer ( CLC_Buffer **buf )
c16027
  *		       historic reason.
c16027
  */
c16027
 int
c16027
-clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
c16027
+clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN )
c16027
 {
c16027
 	int rc = 0;
c16027
+        int flag = DB_NEXT;
c16027
 
c16027
+	if (anchorCSN) *anchorCSN = NULL;
c16027
 	clcache_refresh_local_maxcsns ( buf );
c16027
 
c16027
-	/* Set the loading key */
c16027
-	if ( anchorcsn ) {
c16027
+	if (buf->buf_load_cnt == 0 ) {
c16027
 		clcache_refresh_consumer_maxcsns ( buf );
c16027
-		buf->buf_load_flag = DB_MULTIPLE_KEY;
c16027
-		csn_as_string ( anchorcsn, 0, (char*)buf->buf_key.data );
c16027
-		slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
c16027
-				"session start: anchorcsn=%s\n", (char*)buf->buf_key.data );
c16027
-	}
c16027
-	else if ( csn_get_time(buf->buf_current_csn) == 0 ) {
c16027
-		/* time == 0 means this csn has never been set */
c16027
-		rc = DB_NOTFOUND;
c16027
-	}
c16027
-	else if ( clcache_adjust_anchorcsn ( buf ) != 0 ) {
c16027
-		rc = DB_NOTFOUND;
c16027
-	}
c16027
-	else {
c16027
-		csn_as_string ( buf->buf_current_csn, 0, (char*)buf->buf_key.data );
c16027
-		slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
c16027
-				"load next: anchorcsn=%s\n", (char*)buf->buf_key.data );
c16027
+		rc = clcache_initial_anchorcsn ( buf, &flag );
c16027
+        } else {
c16027
+		rc = clcache_adjust_anchorcsn ( buf, &flag );
c16027
 	}
c16027
 
c16027
 	if ( rc == 0 ) {
c16027
 
c16027
 		buf->buf_state = CLC_STATE_READY;
c16027
+		if (anchorCSN) *anchorCSN = buf->buf_current_csn;
c16027
 		rc = clcache_load_buffer_bulk ( buf, flag );
c16027
 
c16027
 		/* Reset some flag variables */
c16027
@@ -344,21 +352,15 @@ clcache_load_buffer ( CLC_Buffer *buf, CSN *anchorcsn, int flag )
c16027
 				buf->buf_cscbs[i]->state = CLC_STATE_READY;
c16027
 			}
c16027
 		}
c16027
-		else if ( anchorcsn ) {
c16027
-			/* Report error only when the missing is persistent */
c16027
-			if ( buf->buf_missing_csn && csn_compare (buf->buf_missing_csn, anchorcsn) == 0 ) {
c16027
-				if (!buf->buf_prev_missing_csn || csn_compare (buf->buf_prev_missing_csn, anchorcsn)) {
c16027
-					slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
c16027
-						"Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
c16027
-						(char*)buf->buf_key.data, rc );
c16027
-					csn_dup_or_init_by_csn (&buf->buf_prev_missing_csn, anchorcsn);
c16027
-				}
c16027
-			}
c16027
-			else {
c16027
-				csn_dup_or_init_by_csn (&buf->buf_missing_csn, anchorcsn);
c16027
-			}
c16027
+		else {
c16027
+			slapi_log_error ( SLAPI_LOG_FATAL, buf->buf_agmt_name,
c16027
+					"Can't locate CSN %s in the changelog (DB rc=%d). If replication stops, the consumer may need to be reinitialized.\n",
c16027
+					(char*)buf->buf_key.data, rc );
c16027
 		}
c16027
+	} else if (rc == CLC_STATE_DONE) {
c16027
+		rc = DB_NOTFOUND;
c16027
 	}
c16027
+
c16027
 	if ( rc != 0 ) {
c16027
 		slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
c16027
 				"clcache_load_buffer: rc=%d\n", rc );
c16027
@@ -483,7 +485,7 @@ clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **da
c16027
 		 * We're done with the current buffer. Now load the next chunk.
c16027
 		 */
c16027
 		if ( NULL == *key && CLC_STATE_READY == buf->buf_state ) {
c16027
-			rc = clcache_load_buffer ( buf, NULL, DB_NEXT );
c16027
+			rc = clcache_load_buffer ( buf, NULL );
c16027
 			if ( 0 == rc && buf->buf_record_ptr ) {
c16027
 				DB_MULTIPLE_KEY_NEXT ( buf->buf_record_ptr, &buf->buf_data,
c16027
 								   *key, *keylen, *data, *datalen );
c16027
@@ -521,7 +523,6 @@ clcache_refresh_consumer_maxcsns ( CLC_Buffer *buf )
c16027
 	int i;
c16027
 
c16027
 	for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
c16027
-		csn_free(&buf->buf_cscbs[i]->consumer_maxcsn);
c16027
 		ruv_get_largest_csn_for_replica (
c16027
 				buf->buf_consumer_ruv,
c16027
 				buf->buf_cscbs[i]->rid,
c16027
@@ -538,14 +539,11 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
c16027
 	int i;
c16027
 
c16027
 	rid = csn_get_replicaid ( rid_data->csn );
c16027
-
c16027
-	/*
c16027
-	 * No need to create cscb for consumer's RID.
c16027
-	 * If RID==65535, the CSN is originated from a
c16027
-	 * legacy consumer. In this case the supplier
c16027
-	 * and the consumer may have the same RID.
c16027
+	/* we do not handle updates originated at the consumer if not required
c16027
+	 * and we ignore RID which have been cleaned
c16027
 	 */
c16027
-	if ( rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID )
c16027
+	if ( (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID) ||
c16027
+		is_cleaned_rid(rid) )
c16027
 		return rc;
c16027
 
c16027
 	for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
c16027
@@ -564,9 +562,20 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
c16027
 		}
c16027
 		buf->buf_cscbs[i]->rid = rid;
c16027
 		buf->buf_num_cscbs++;
c16027
+		/* this is the first time we have a local change for the RID
c16027
+		 * we need to check what the consumer knows about it.
c16027
+		 */
c16027
+		ruv_get_largest_csn_for_replica (
c16027
+				buf->buf_consumer_ruv,
c16027
+				buf->buf_cscbs[i]->rid,
c16027
+				&buf->buf_cscbs[i]->consumer_maxcsn );
c16027
 	}
c16027
 
c16027
+	if (buf->buf_cscbs[i]->local_maxcsn)
c16027
+	    csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn, buf->buf_cscbs[i]->local_maxcsn );
c16027
+
c16027
 	csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_maxcsn, rid_data->csn );
c16027
+	csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->local_mincsn, rid_data->min_csn );
c16027
 
c16027
 	if ( buf->buf_cscbs[i]->consumer_maxcsn &&
c16027
 		 csn_compare (buf->buf_cscbs[i]->consumer_maxcsn, rid_data->csn) >= 0 ) {
c16027
@@ -580,88 +589,147 @@ clcache_refresh_local_maxcsn ( const ruv_enum_data *rid_data, void *data )
c16027
 static int
c16027
 clcache_refresh_local_maxcsns ( CLC_Buffer *buf )
c16027
 {
c16027
-	int i;
c16027
 
c16027
-	for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
c16027
-		csn_dup_or_init_by_csn ( &buf->buf_cscbs[i]->prev_local_maxcsn,
c16027
-								  buf->buf_cscbs[i]->local_maxcsn );
c16027
-	}
c16027
 	return ruv_enumerate_elements ( buf->buf_local_ruv, clcache_refresh_local_maxcsn, buf );
c16027
 }
c16027
 
c16027
 /*
c16027
  * Algorithm:
c16027
  *
c16027
- *	1. Snapshot local RUVs;
c16027
- *	2. Load buffer;
c16027
- *	3. Send to the consumer only those CSNs that are covered
c16027
- *	   by the RUVs snapshot taken in the first step;
c16027
- *	   All CSNs that are covered by the RUVs snapshot taken in the
c16027
- *	   first step are guaranteed in consecutive order for the respected
c16027
- *	   RIDs because of the the CSN pending list control;
c16027
- *	   A CSN that is not covered by the RUVs snapshot may be out of order
c16027
- *	   since it is possible that a smaller CSN might not have committed 
c16027
- *	   yet by the time the buffer was loaded.
c16027
- *	4. Determine anchorcsn for each RID:
c16027
- *
c16027
- *	   Case|  Local vs. Buffer | New Local |       Next
c16027
- *	       | MaxCSN     MaxCSN |    MaxCSN | Anchor-CSN
c16027
- *	   ----+-------------------+-----------+----------------
c16027
- *       1 |   Cl    >=   Cb   |     *     | Cb
c16027
- *       2 |   Cl    <    Cb   |     Cl    | Cb
c16027
- *       3 |   Cl    <    Cb   |     Cl2   | Cl 
c16027
- *
c16027
- *	5. Determine anchorcsn for next load:
c16027
+ *	1. Determine anchorcsn for each RID:
c16027
+ *	2. Determine anchorcsn for next load:
c16027
  *	   Anchor-CSN = min { all Next-Anchor-CSN, Buffer-MaxCSN }
c16027
  */
c16027
 static int
c16027
-clcache_adjust_anchorcsn ( CLC_Buffer *buf )
c16027
+clcache_initial_anchorcsn ( CLC_Buffer *buf, int *flag )
c16027
 {
c16027
 	PRBool hasChange = PR_FALSE;
c16027
 	struct csn_seq_ctrl_block *cscb;
c16027
 	int i;
c16027
+	CSN *anchorcsn = NULL;
c16027
 
c16027
 	if ( buf->buf_state == CLC_STATE_READY ) {
c16027
 		for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
c16027
+			CSN *rid_anchor = NULL;
c16027
+			int rid_flag = DB_NEXT;
c16027
 			cscb = buf->buf_cscbs[i];
c16027
 
c16027
-			if ( cscb->state == CLC_STATE_UP_TO_DATE )
c16027
-				continue;
c16027
+			if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
+				char prevmax[CSN_STRSIZE];
c16027
+				char local[CSN_STRSIZE];
c16027
+				char curr[CSN_STRSIZE];
c16027
+				char conmaxcsn[CSN_STRSIZE];
c16027
+				csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
c16027
+				csn_as_string(cscb->local_maxcsn, 0, local);
c16027
+				csn_as_string(buf->buf_current_csn, 0, curr);
c16027
+				csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
c16027
+				slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn" ,
c16027
+								"%s - (cscb %d - state %d) - csnPrevMax (%s) "
c16027
+								"csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
c16027
+								buf->buf_agmt_name, i, cscb->state, prevmax, local,
c16027
+								curr, conmaxcsn);
c16027
+			}
c16027
 
c16027
-			/*
c16027
-			 * Case 3 unsafe ruv change: next buffer load should start
c16027
-			 * from where the maxcsn in the old ruv was. Since each
c16027
-			 * cscb has remembered the maxcsn sent to the consumer,
c16027
-			 * CSNs that may be loaded again could easily be skipped.
c16027
-			 */
c16027
-			if ( cscb->prev_local_maxcsn &&
c16027
-				 csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) < 0 &&
c16027
-				 csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) != 0 ) {
c16027
+			if (cscb->consumer_maxcsn == NULL) {
c16027
+				/* the consumer hasn't seen changes for this RID */
c16027
+				rid_anchor = cscb->local_mincsn;
c16027
+				rid_flag = DB_SET;
c16027
+			} else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
c16027
+				rid_anchor = cscb->consumer_maxcsn;
c16027
+			}
c16027
+
c16027
+			if (rid_anchor && (anchorcsn == NULL ||
c16027
+			    ( csn_compare(rid_anchor, anchorcsn) < 0))) {
c16027
+				anchorcsn = rid_anchor;
c16027
+				*flag = rid_flag;
c16027
 				hasChange = PR_TRUE;
c16027
-				cscb->state = CLC_STATE_READY;
c16027
-				csn_init_by_csn ( buf->buf_current_csn, cscb->prev_local_maxcsn );
c16027
-				csn_as_string ( cscb->prev_local_maxcsn, 0, (char*)buf->buf_key.data );
c16027
-				slapi_log_error ( SLAPI_LOG_REPL, buf->buf_agmt_name,
c16027
-						"adjust anchor csn upon %s\n",
c16027
-						( cscb->state == CLC_STATE_CSN_GT_RUV ? "out of sequence csn" : "unsafe ruv change") );
c16027
-				continue;
c16027
 			}
c16027
 
c16027
-			/*
c16027
-			 * check if there are still changes to send for this RID
c16027
-			 * Assume we had compared the local maxcsn and the consumer
c16027
-			 * max csn before this function was called and hence the
c16027
-			 * cscb->state had been set accordingly.
c16027
-			 */ 
c16027
-			if ( hasChange == PR_FALSE &&
c16027
-				 csn_compare (cscb->local_maxcsn, buf->buf_current_csn) > 0 ) {
c16027
+
c16027
+		}
c16027
+	}
c16027
+
c16027
+	if ( !hasChange ) {
c16027
+		buf->buf_state = CLC_STATE_DONE;
c16027
+	} else {
c16027
+		csn_init_by_csn(buf->buf_current_csn, anchorcsn);
c16027
+		csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
c16027
+		slapi_log_error(SLAPI_LOG_REPL, "clcache_initial_anchorcsn",
c16027
+						"anchor is now: %s\n", (char *)buf->buf_key.data);
c16027
+	}
c16027
+
c16027
+	return buf->buf_state;
c16027
+}
c16027
+
c16027
+static int
c16027
+clcache_adjust_anchorcsn ( CLC_Buffer *buf, int *flag )
c16027
+{
c16027
+	PRBool hasChange = PR_FALSE;
c16027
+	struct csn_seq_ctrl_block *cscb;
c16027
+	int i;
c16027
+	CSN *anchorcsn = NULL;
c16027
+
c16027
+	if ( buf->buf_state == CLC_STATE_READY ) {
c16027
+		for ( i = 0; i < buf->buf_num_cscbs; i++ ) {
c16027
+			CSN *rid_anchor = NULL;
c16027
+			int rid_flag = DB_NEXT;
c16027
+			cscb = buf->buf_cscbs[i];
c16027
+
c16027
+			if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
+				char prevmax[CSN_STRSIZE];
c16027
+				char local[CSN_STRSIZE];
c16027
+				char curr[CSN_STRSIZE];
c16027
+				char conmaxcsn[CSN_STRSIZE];
c16027
+				csn_as_string(cscb->prev_local_maxcsn, 0, prevmax);
c16027
+				csn_as_string(cscb->local_maxcsn, 0, local);
c16027
+				csn_as_string(buf->buf_current_csn, 0, curr);
c16027
+				csn_as_string(cscb->consumer_maxcsn, 0, conmaxcsn);
c16027
+				slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn" ,
c16027
+								"%s - (cscb %d - state %d) - csnPrevMax (%s) "
c16027
+								"csnMax (%s) csnBuf (%s) csnConsumerMax (%s)\n",
c16027
+								buf->buf_agmt_name, i, cscb->state, prevmax, local,
c16027
+								curr, conmaxcsn);
c16027
+			}
c16027
+
c16027
+			if (csn_compare (cscb->local_maxcsn, cscb->prev_local_maxcsn) == 0 ||
c16027
+			    csn_compare (cscb->prev_local_maxcsn, buf->buf_current_csn) > 0 ) {
c16027
+				if (csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
c16027
+					rid_anchor = buf->buf_current_csn;
c16027
+				}
c16027
+			} else {
c16027
+				/* prev local max csn < csnBuffer AND different from local maxcsn */
c16027
+				if (cscb->prev_local_maxcsn == NULL) {
c16027
+					if (cscb->consumer_maxcsn == NULL) {
c16027
+						/* the consumer hasn't seen changes for this RID */
c16027
+						rid_anchor = cscb->local_mincsn;
c16027
+						rid_flag = DB_SET;
c16027
+					} else if ( csn_compare (cscb->local_maxcsn, cscb->consumer_maxcsn) > 0 ) {
c16027
+						rid_anchor = cscb->consumer_maxcsn;
c16027
+					}
c16027
+				} else {
c16027
+					/* csnPrevMaxSup > 0 */
c16027
+					rid_anchor = cscb->consumer_maxcsn;
c16027
+				}
c16027
+			}
c16027
+
c16027
+			if (rid_anchor && (anchorcsn == NULL ||
c16027
+			    ( csn_compare(rid_anchor, anchorcsn) < 0))) {
c16027
+				anchorcsn = rid_anchor;
c16027
+				*flag = rid_flag;
c16027
 				hasChange = PR_TRUE;
c16027
 			}
c16027
+
c16027
+
c16027
 		}
c16027
 	}
c16027
 
c16027
 	if ( !hasChange ) {
c16027
 		buf->buf_state = CLC_STATE_DONE;
c16027
+	} else {
c16027
+		csn_init_by_csn(buf->buf_current_csn, anchorcsn);
c16027
+		csn_as_string(buf->buf_current_csn, 0, (char *)buf->buf_key.data);
c16027
+		slapi_log_error(SLAPI_LOG_REPL, "clcache_adjust_anchorcsn",
c16027
+						"anchor is now: %s\n", (char *)buf->buf_key.data);
c16027
 	}
c16027
 
c16027
 	return buf->buf_state;
c16027
@@ -675,7 +743,6 @@ clcache_skip_change ( CLC_Buffer *buf )
c16027
 	int skip = 1;
c16027
 	int i;
c16027
 	char buf_cur_csn_str[CSN_STRSIZE];
c16027
-	char oth_csn_str[CSN_STRSIZE];
c16027
 
c16027
 	do {
c16027
 
c16027
@@ -688,25 +755,14 @@ clcache_skip_change ( CLC_Buffer *buf )
c16027
 		 * legacy consumer. In this case the supplier
c16027
 		 * and the consumer may have the same RID.
c16027
 		 */
c16027
-		if (rid == buf->buf_consumer_rid && rid != MAX_REPLICA_ID){
c16027
-			CSN *cons_maxcsn = NULL;
c16027
-
c16027
-			ruv_get_max_csn(buf->buf_consumer_ruv, &cons_maxcsn);
c16027
-			if ( csn_compare ( buf->buf_current_csn, cons_maxcsn) > 0 ) {
c16027
-				/*
c16027
-				 *  The consumer must have been "restored" and needs this newer update.
c16027
-				 */
c16027
-				skip = 0;
c16027
-			} else if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
+		if (rid == buf->buf_consumer_rid && buf->buf_ignoreConsumerRID){
c16027
+			if (slapi_is_loglevel_set(SLAPI_LOG_REPL)) {
c16027
 				csn_as_string(buf->buf_current_csn, 0, buf_cur_csn_str);
c16027
-				csn_as_string(cons_maxcsn, 0, oth_csn_str);
c16027
 				slapi_log_error(SLAPI_LOG_REPL, buf->buf_agmt_name,
c16027
-					"Skipping update because the changelog buffer current csn [%s] is "
c16027
-				        "less than or equal to the consumer max csn [%s]\n",
c16027
-				        buf_cur_csn_str, oth_csn_str);
c16027
+					"Skipping update because the consumer with Rid: [%d] is "
c16027
+				        "ignored\n", rid);
c16027
 				buf->buf_skipped_csn_gt_cons_maxcsn++;
c16027
 			}
c16027
-			csn_free(&cons_maxcsn);
c16027
 			break;
c16027
 		}
c16027
 
c16027
@@ -821,6 +877,7 @@ clcache_free_cscb ( struct csn_seq_ctrl_block ** cscb )
c16027
 	csn_free ( & (*cscb)->consumer_maxcsn );
c16027
 	csn_free ( & (*cscb)->local_maxcsn );
c16027
 	csn_free ( & (*cscb)->prev_local_maxcsn );
c16027
+	csn_free ( & (*cscb)->local_mincsn );
c16027
 	slapi_ch_free ( (void **) cscb );
c16027
 }
c16027
 
c16027
@@ -1003,6 +1060,15 @@ clcache_cursor_get ( DBC *cursor, CLC_Buffer *buf, int flag )
c16027
 {
c16027
 	int rc;
c16027
 
c16027
+	if (buf->buf_data.ulen > WORK_CLC_BUFFER_PAGE_SIZE) {
c16027
+		/*
c16027
+		 * The buffer size had to be increased,
c16027
+		 * reset it to a smaller working size,
c16027
+		 * if not sufficient it will be increased again
c16027
+		 */
c16027
+		buf->buf_data.ulen = WORK_CLC_BUFFER_PAGE_SIZE;
c16027
+	}
c16027
+
c16027
 	rc = cursor->c_get ( cursor,
c16027
 						 & buf->buf_key,
c16027
 						 & buf->buf_data,
c16027
diff --git a/ldap/servers/plugins/replication/cl5_clcache.h b/ldap/servers/plugins/replication/cl5_clcache.h
c16027
index 4c459ab..75b2817 100644
c16027
--- a/ldap/servers/plugins/replication/cl5_clcache.h
c16027
+++ b/ldap/servers/plugins/replication/cl5_clcache.h
c16027
@@ -23,7 +23,7 @@ typedef struct clc_buffer CLC_Buffer;
c16027
 int	 clcache_init ( DB_ENV **dbenv );
c16027
 void clcache_set_config ();
c16027
 int	 clcache_get_buffer ( CLC_Buffer **buf, DB *db, ReplicaId consumer_rid, const RUV *consumer_ruv, const RUV *local_ruv );
c16027
-int	 clcache_load_buffer ( CLC_Buffer *buf, CSN *startCSN, int flag );
c16027
+int	 clcache_load_buffer ( CLC_Buffer *buf, CSN **anchorCSN );
c16027
 void clcache_return_buffer ( CLC_Buffer **buf );
c16027
 int	 clcache_get_next_change ( CLC_Buffer *buf, void **key, size_t *keylen, void **data, size_t *datalen, CSN **csn );
c16027
 void clcache_destroy ();
c16027
-- 
c16027
2.4.11
c16027