Blame SOURCES/0057-Ticket-48266-Fractional-replication-evaluates-severa.patch

a2f18f
From 05e127c89281cece8bc1fa79bac6b95cc23dcca9 Mon Sep 17 00:00:00 2001
a2f18f
From: Thierry Bordaz <tbordaz@redhat.com>
a2f18f
Date: Fri, 11 Sep 2015 18:56:53 +0200
a2f18f
Subject: [PATCH 57/61] Ticket 48266: Fractional replication evaluates several
a2f18f
 times the same CSN
a2f18f
a2f18f
Bug Description:
a2f18f
	In fractional replication if there are only skipped updates and many of them, the supplier
a2f18f
	acquires the replica for a long time. At the end of the session, the RUV is not updated
a2f18f
	so the next session will restart evaluating the same skipped updates
a2f18f
a2f18f
Fix Description:
a2f18f
	The fix introduces subentries under the suffix: 'cn=repl keep alive <rid>,$SUFFIX'
a2f18f
	During an incremental replication session, if the session only contains skipped updates
a2f18f
	and their number exceeds a threshold (100), it triggers an update on that subentry.
a2f18f
a2f18f
	This update will eventually be replicated, moving forward the RUV
a2f18f
a2f18f
https://fedorahosted.org/389/ticket/48266
a2f18f
a2f18f
Reviewed by: Noriko Hosoi, Rich Megginson, Simon Pichugin
a2f18f
a2f18f
Platforms tested: <plat>
a2f18f
a2f18f
Flag Day: no
a2f18f
a2f18f
Doc impact: no
a2f18f
a2f18f
(cherry picked from commit 6343e4cba17802e19daa5c971120fa352ff80ad4)
a2f18f
---
a2f18f
 ldap/servers/plugins/replication/repl5.h           |   2 +
a2f18f
 .../plugins/replication/repl5_inc_protocol.c       |  39 ++++++
a2f18f
 ldap/servers/plugins/replication/repl5_replica.c   | 156 +++++++++++++++++++++
a2f18f
 .../plugins/replication/repl5_tot_protocol.c       |  13 +-
a2f18f
 4 files changed, 209 insertions(+), 1 deletion(-)
a2f18f
a2f18f
diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
a2f18f
index 0b0f26b..17282bb 100644
a2f18f
--- a/ldap/servers/plugins/replication/repl5.h
a2f18f
+++ b/ldap/servers/plugins/replication/repl5.h
a2f18f
@@ -523,6 +523,8 @@ Replica *windows_replica_new(const Slapi_DN *root);
a2f18f
    during addition of the replica over LDAP */
a2f18f
 Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation);
a2f18f
 void replica_destroy(void **arg);
a2f18f
+int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid);
a2f18f
+int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid);
a2f18f
 PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
a2f18f
 									const char *locking_purl,
a2f18f
 									char **current_purl);
a2f18f
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
a2f18f
index 216de3c..e0599e5 100644
a2f18f
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
a2f18f
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
a2f18f
@@ -1672,6 +1672,11 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
a2f18f
 		int finished = 0;
a2f18f
 		ConnResult replay_crc;
a2f18f
 		char csn_str[CSN_STRSIZE];
a2f18f
+		PRBool subentry_update_sent = PR_FALSE;
a2f18f
+		PRBool subentry_update_needed = PR_FALSE;
a2f18f
+		int skipped_updates = 0;
a2f18f
+		int fractional_repl;
a2f18f
+#define FRACTIONAL_SKIPPED_THRESHOLD 100
a2f18f
 
a2f18f
 		/* Start the results reading thread */
a2f18f
 		rd = repl5_inc_rd_new(prp);
a2f18f
@@ -1688,6 +1693,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
a2f18f
 
a2f18f
 		memset ( (void*)&op, 0, sizeof (op) );
a2f18f
 		entry.op = &op;
a2f18f
+		fractional_repl = agmt_is_fractional(prp->agmt);
a2f18f
 		do {
a2f18f
 			cl5_operation_parameters_done ( entry.op );
a2f18f
 			memset ( (void*)entry.op, 0, sizeof (op) );
a2f18f
@@ -1783,6 +1789,15 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
a2f18f
 					csn_as_string(entry.op->csn, PR_FALSE, csn_str);
a2f18f
 					replica_id = csn_get_replicaid(entry.op->csn);
a2f18f
 					uniqueid = entry.op->target_address.uniqueid;
a2f18f
+                    
a2f18f
+					if (fractional_repl && message_id) 
a2f18f
+					{
a2f18f
+						/* This update was sent no need to update the subentry
a2f18f
+						 * and restart counting the skipped updates
a2f18f
+						 */
a2f18f
+						subentry_update_needed = PR_FALSE;
a2f18f
+						skipped_updates = 0;
a2f18f
+					}
a2f18f
 
a2f18f
 					if (prp->repl50consumer && message_id) 
a2f18f
 					{
a2f18f
@@ -1813,6 +1828,16 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
a2f18f
 							agmt_get_long_name(prp->agmt),
a2f18f
 							entry.op->target_address.uniqueid, csn_str);
a2f18f
 						agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/);
a2f18f
+						if (fractional_repl) 
a2f18f
+						{
a2f18f
+							skipped_updates++;
a2f18f
+							if (skipped_updates > FRACTIONAL_SKIPPED_THRESHOLD) {
a2f18f
+								slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
a2f18f
+										"%s: skipped updates is too high (%d) if no other update is sent we will update the subentry\n",
a2f18f
+										agmt_get_long_name(prp->agmt), skipped_updates);
a2f18f
+								subentry_update_needed = PR_TRUE;
a2f18f
+							}
a2f18f
+						}
a2f18f
 					}
a2f18f
 				}
a2f18f
 				break;
a2f18f
@@ -1878,6 +1903,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
a2f18f
 			PR_Unlock(rd->lock);
a2f18f
 		} while (!finished);
a2f18f
 
a2f18f
+		if (fractional_repl && subentry_update_needed)
a2f18f
+		{
a2f18f
+			Replica *replica;
a2f18f
+			ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
a2f18f
+			replica = (Replica*) object_get_data(prp->replica_object);
a2f18f
+			if (replica)
a2f18f
+			{
a2f18f
+				rid = replica_get_rid(replica);
a2f18f
+			}
a2f18f
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
a2f18f
+					"%s: skipped updates was definitely too high (%d) update the subentry now\n",
a2f18f
+					agmt_get_long_name(prp->agmt), skipped_updates);
a2f18f
+			replica_subentry_update(agmt_get_replarea(prp->agmt), rid);
a2f18f
+		}
a2f18f
 		/* Terminate the results reading thread */
a2f18f
 		if (!prp->repl50consumer) 
a2f18f
 		{
a2f18f
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
a2f18f
index 92b4e96..6ac28c1 100644
a2f18f
--- a/ldap/servers/plugins/replication/repl5_replica.c
a2f18f
+++ b/ldap/servers/plugins/replication/repl5_replica.c
a2f18f
@@ -414,6 +414,161 @@ replica_destroy(void **arg)
a2f18f
 	slapi_ch_free((void **)arg);
a2f18f
 }
a2f18f
 
a2f18f
+#define KEEP_ALIVE_ATTR "keepalivetimestamp"
a2f18f
+#define KEEP_ALIVE_ENTRY "repl keep alive"
a2f18f
+#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s"
a2f18f
+
a2f18f
+
a2f18f
+static int
a2f18f
+replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid) 
a2f18f
+{
a2f18f
+    char *entry_string = NULL;
a2f18f
+    Slapi_Entry *e = NULL;
a2f18f
+    Slapi_PBlock *pb = NULL;
a2f18f
+    int return_value;
a2f18f
+    int rc = 0;
a2f18f
+
a2f18f
+    entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d",
a2f18f
+            KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid);
a2f18f
+    if (entry_string == NULL) {
a2f18f
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
a2f18f
+            "replica_subentry_create add failed in slapi_ch_smprintf\n");
a2f18f
+        rc = -1;
a2f18f
+        goto done;
a2f18f
+    }
a2f18f
+
a2f18f
+    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "add %s\n", entry_string);
a2f18f
+    e = slapi_str2entry(entry_string, 0);
a2f18f
+
a2f18f
+    /* create the entry */
a2f18f
+    pb = slapi_pblock_new();
a2f18f
+
a2f18f
+
a2f18f
+    slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */
a2f18f
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */);
a2f18f
+    slapi_add_internal_pb(pb);
a2f18f
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value);
a2f18f
+    if (return_value != LDAP_SUCCESS && return_value != LDAP_ALREADY_EXISTS) 
a2f18f
+    {
a2f18f
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
a2f18f
+                "create replication keep alive entry %s: %s\n", slapi_entry_get_dn_const(e),
a2f18f
+                ldap_err2string(return_value));
a2f18f
+        rc = -1;
a2f18f
+        slapi_entry_free(e); /* The entry was not consumed */
a2f18f
+        goto done;
a2f18f
+    }
a2f18f
+
a2f18f
+done:
a2f18f
+
a2f18f
+    slapi_pblock_destroy(pb);
a2f18f
+    slapi_ch_free_string(&entry_string);
a2f18f
+    return rc;
a2f18f
+
a2f18f
+}
a2f18f
+
a2f18f
+int
a2f18f
+replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid)
a2f18f
+{
a2f18f
+    Slapi_PBlock *pb;
a2f18f
+    char *filter = NULL;
a2f18f
+    Slapi_Entry **entries = NULL;
a2f18f
+    int res;
a2f18f
+    int rc = 0;
a2f18f
+
a2f18f
+    pb = slapi_pblock_new();
a2f18f
+    filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid);
a2f18f
+    slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL,
a2f18f
+            filter, NULL, 0, NULL, NULL,
a2f18f
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
a2f18f
+    slapi_search_internal_pb(pb);
a2f18f
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res);
a2f18f
+    if (res == LDAP_SUCCESS)
a2f18f
+    {
a2f18f
+        slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
a2f18f
+        if (entries && (entries[0] == NULL))
a2f18f
+        {
a2f18f
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
a2f18f
+                    "Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
a2f18f
+            rc = replica_subentry_create(repl_root, rid);
a2f18f
+        } else {
a2f18f
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
a2f18f
+                    "replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
a2f18f
+            rc = 0;
a2f18f
+        }
a2f18f
+    } else {
a2f18f
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
a2f18f
+                "Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n",
a2f18f
+                KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res);
a2f18f
+        /* The status of the entry is not clear, do not attempt to create it */
a2f18f
+        rc = 1;
a2f18f
+    }
a2f18f
+    slapi_free_search_results_internal(pb);
a2f18f
+
a2f18f
+    slapi_pblock_destroy(pb);
a2f18f
+    slapi_ch_free_string(&filter);
a2f18f
+    return rc;
a2f18f
+}
a2f18f
+
a2f18f
+int
a2f18f
+replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) 
a2f18f
+{
a2f18f
+    int ldrc;
a2f18f
+    int rc = LDAP_SUCCESS; /* Optimistic default */
a2f18f
+    LDAPMod * mods[2];
a2f18f
+    LDAPMod mod;
a2f18f
+    struct berval * vals[2];
a2f18f
+    char buf[20];
a2f18f
+    time_t curtime;
a2f18f
+    struct tm ltm;
a2f18f
+    struct berval val;
a2f18f
+    Slapi_PBlock *modpb = NULL;
a2f18f
+    char *dn;
a2f18f
+
a2f18f
+    replica_subentry_check(repl_root, rid);
a2f18f
+    curtime = current_time();
a2f18f
+    gmtime_r(&curtime, &ltm);
a2f18f
+    strftime(buf, sizeof (buf), "%Y%m%d%H%M%SZ", &ltm);
a2f18f
+
a2f18f
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf);
a2f18f
+
a2f18f
+
a2f18f
+    val.bv_val = buf;
a2f18f
+    val.bv_len = strlen(val.bv_val);
a2f18f
+
a2f18f
+    vals [0] = &val;
a2f18f
+    vals [1] = NULL;
a2f18f
+
a2f18f
+    mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES;
a2f18f
+    mod.mod_type = KEEP_ALIVE_ATTR;
a2f18f
+    mod.mod_bvalues = vals;
a2f18f
+
a2f18f
+    mods[0] = &mod;
a2f18f
+    mods[1] = NULL;
a2f18f
+
a2f18f
+    modpb = slapi_pblock_new();
a2f18f
+    dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
a2f18f
+
a2f18f
+    slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL,
a2f18f
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
a2f18f
+    slapi_modify_internal_pb(modpb);
a2f18f
+
a2f18f
+    slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc);
a2f18f
+
a2f18f
+    if (ldrc != LDAP_SUCCESS)
a2f18f
+    {
a2f18f
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
a2f18f
+                "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf);
a2f18f
+        rc = ldrc;
a2f18f
+    } else {
a2f18f
+        slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name,
a2f18f
+                "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf);
a2f18f
+    }
a2f18f
+
a2f18f
+    slapi_pblock_destroy(modpb);
a2f18f
+    slapi_ch_free_string(&dn);
a2f18f
+    return rc;
a2f18f
+
a2f18f
+}
a2f18f
 /*
a2f18f
  * Attempt to obtain exclusive access to replica (advisory only)
a2f18f
  *
a2f18f
@@ -3816,6 +3971,7 @@ replica_enable_replication (Replica *r)
a2f18f
         /* What to do ? */
a2f18f
     }
a2f18f
 
a2f18f
+    replica_subentry_check(r->repl_root, replica_get_rid(r));
a2f18f
     /* Replica came back online, Check if the total update was terminated.
a2f18f
        If flag is still set, it was not terminated, therefore the data is
a2f18f
        very likely to be incorrect, and we should not restart Replication threads...
a2f18f
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
a2f18f
index d9401cf..e004af4 100644
a2f18f
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
a2f18f
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
a2f18f
@@ -318,6 +318,9 @@ repl5_tot_run(Private_Repl_Protocol *prp)
a2f18f
 	int portnum = 0;
a2f18f
 	Slapi_DN *area_sdn = NULL;
a2f18f
 	CSN *remote_schema_csn = NULL;
a2f18f
+	int init_retry = 0;
a2f18f
+	Replica *replica;
a2f18f
+	ReplicaId rid = 0; /* Used to create the replica keep alive subentry */
a2f18f
 	
a2f18f
 	PR_ASSERT(NULL != prp);
a2f18f
 
a2f18f
@@ -395,7 +398,15 @@ repl5_tot_run(Private_Repl_Protocol *prp)
a2f18f
     ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
a2f18f
     ctrls[0] = create_managedsait_control ();
a2f18f
     ctrls[1] = create_backend_control(area_sdn);
a2f18f
-	
a2f18f
+
a2f18f
+	/* Time to make sure it exists a keep alive subentry for that replica */
a2f18f
+	replica = (Replica*) object_get_data(prp->replica_object);
a2f18f
+	if (replica)
a2f18f
+	{
a2f18f
+		rid = replica_get_rid(replica);
a2f18f
+	}
a2f18f
+	replica_subentry_check(area_sdn, rid);
a2f18f
+
a2f18f
     slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn), 
a2f18f
                                   LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, 
a2f18f
                                   repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
a2f18f
-- 
a2f18f
1.9.3
a2f18f