Blob Blame Raw
From 05e127c89281cece8bc1fa79bac6b95cc23dcca9 Mon Sep 17 00:00:00 2001
From: Thierry Bordaz <tbordaz@redhat.com>
Date: Fri, 11 Sep 2015 18:56:53 +0200
Subject: [PATCH 57/61] Ticket 48266: Fractional replication evaluates several
 times the same CSN

Bug Description:
	In fractional replication if there are only skipped updates and many of them, the supplier
	acquire the replica for a long time. At the end of the session, RUV is not updated
	so the next session will restart evaluating the same skipped updates

Fix Description:
	The fix introduces subentries under the suffix: 'cn=repl keep alive <rid>,$SUFFIX'
	During an incremental replication session, if the session only contains skipped updates
	and the number of them overpass a threshold (100), it triggers an update on that subentry.

	This update will eventually be replicated, moving forward the RUV

https://fedorahosted.org/389/ticket/48266

Reviewed by: Noriko Hosoi, Rich Megginson, Simon Pichugin

Platforms tested: <plat>

Flag Day: no

Doc impact: no

(cherry picked from commit 6343e4cba17802e19daa5c971120fa352ff80ad4)
---
 ldap/servers/plugins/replication/repl5.h           |   2 +
 .../plugins/replication/repl5_inc_protocol.c       |  39 ++++++
 ldap/servers/plugins/replication/repl5_replica.c   | 156 +++++++++++++++++++++
 .../plugins/replication/repl5_tot_protocol.c       |  13 +-
 4 files changed, 209 insertions(+), 1 deletion(-)

diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 0b0f26b..17282bb 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -523,6 +523,8 @@ Replica *windows_replica_new(const Slapi_DN *root);
    during addition of the replica over LDAP */
 Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation);
 void replica_destroy(void **arg);
+int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid);
+int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid);
 PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
 									const char *locking_purl,
 									char **current_purl);
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index 216de3c..e0599e5 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -1672,6 +1672,11 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 		int finished = 0;
 		ConnResult replay_crc;
 		char csn_str[CSN_STRSIZE];
+		PRBool subentry_update_sent = PR_FALSE;
+		PRBool subentry_update_needed = PR_FALSE;
+		int skipped_updates = 0;
+		int fractional_repl;
+#define FRACTIONAL_SKIPPED_THRESHOLD 100
 
 		/* Start the results reading thread */
 		rd = repl5_inc_rd_new(prp);
@@ -1688,6 +1693,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 
 		memset ( (void*)&op, 0, sizeof (op) );
 		entry.op = &op;
+		fractional_repl = agmt_is_fractional(prp->agmt);
 		do {
 			cl5_operation_parameters_done ( entry.op );
 			memset ( (void*)entry.op, 0, sizeof (op) );
@@ -1783,6 +1789,15 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 					csn_as_string(entry.op->csn, PR_FALSE, csn_str);
 					replica_id = csn_get_replicaid(entry.op->csn);
 					uniqueid = entry.op->target_address.uniqueid;
+                    
+					if (fractional_repl && message_id) 
+					{
+						/* This update was sent no need to update the subentry
+						 * and restart counting the skipped updates
+						 */
+						subentry_update_needed = PR_FALSE;
+						skipped_updates = 0;
+					}
 
 					if (prp->repl50consumer && message_id) 
 					{
@@ -1813,6 +1828,16 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 							agmt_get_long_name(prp->agmt),
 							entry.op->target_address.uniqueid, csn_str);
 						agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/);
+						if (fractional_repl) 
+						{
+							skipped_updates++;
+							if (skipped_updates > FRACTIONAL_SKIPPED_THRESHOLD) {
+								slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+										"%s: skipped updates is too high (%d) if no other update is sent we will update the subentry\n",
+										agmt_get_long_name(prp->agmt), skipped_updates);
+								subentry_update_needed = PR_TRUE;
+							}
+						}
 					}
 				}
 				break;
@@ -1878,6 +1903,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 			PR_Unlock(rd->lock);
 		} while (!finished);
 
+		if (fractional_repl && subentry_update_needed)
+		{
+			Replica *replica;
+			ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
+			replica = (Replica*) object_get_data(prp->replica_object);
+			if (replica)
+			{
+				rid = replica_get_rid(replica);
+			}
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+					"%s: skipped updates was definitely too high (%d) update the subentry now\n",
+					agmt_get_long_name(prp->agmt), skipped_updates);
+			replica_subentry_update(agmt_get_replarea(prp->agmt), rid);
+		}
 		/* Terminate the results reading thread */
 		if (!prp->repl50consumer) 
 		{
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
index 92b4e96..6ac28c1 100644
--- a/ldap/servers/plugins/replication/repl5_replica.c
+++ b/ldap/servers/plugins/replication/repl5_replica.c
@@ -414,6 +414,161 @@ replica_destroy(void **arg)
 	slapi_ch_free((void **)arg);
 }
 
+#define KEEP_ALIVE_ATTR "keepalivetimestamp"
+#define KEEP_ALIVE_ENTRY "repl keep alive"
+#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s"
+
+
+static int
+replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid) 
+{
+    char *entry_string = NULL;
+    Slapi_Entry *e = NULL;
+    Slapi_PBlock *pb = NULL;
+    int return_value;
+    int rc = 0;
+
+    entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d",
+            KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid);
+    if (entry_string == NULL) {
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+            "replica_subentry_create add failed in slapi_ch_smprintf\n");
+        rc = -1;
+        goto done;
+    }
+
+    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "add %s\n", entry_string);
+    e = slapi_str2entry(entry_string, 0);
+
+    /* create the entry */
+    pb = slapi_pblock_new();
+
+
+    slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */);
+    slapi_add_internal_pb(pb);
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value);
+    if (return_value != LDAP_SUCCESS && return_value != LDAP_ALREADY_EXISTS) 
+    {
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
+                "create replication keep alive entry %s: %s\n", slapi_entry_get_dn_const(e),
+                ldap_err2string(return_value));
+        rc = -1;
+        slapi_entry_free(e); /* The entry was not consumed */
+        goto done;
+    }
+
+done:
+
+    slapi_pblock_destroy(pb);
+    slapi_ch_free_string(&entry_string);
+    return rc;
+
+}
+
+int
+replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid)
+{
+    Slapi_PBlock *pb;
+    char *filter = NULL;
+    Slapi_Entry **entries = NULL;
+    int res;
+    int rc = 0;
+
+    pb = slapi_pblock_new();
+    filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid);
+    slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL,
+            filter, NULL, 0, NULL, NULL,
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
+    slapi_search_internal_pb(pb);
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res);
+    if (res == LDAP_SUCCESS)
+    {
+        slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
+        if (entries && (entries[0] == NULL))
+        {
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                    "Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
+            rc = replica_subentry_create(repl_root, rid);
+        } else {
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                    "replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
+            rc = 0;
+        }
+    } else {
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                "Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n",
+                KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res);
+        /* The status of the entry is not clear, do not attempt to create it */
+        rc = 1;
+    }
+    slapi_free_search_results_internal(pb);
+
+    slapi_pblock_destroy(pb);
+    slapi_ch_free_string(&filter);
+    return rc;
+}
+
+int
+replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) 
+{
+    int ldrc;
+    int rc = LDAP_SUCCESS; /* Optimistic default */
+    LDAPMod * mods[2];
+    LDAPMod mod;
+    struct berval * vals[2];
+    char buf[20];
+    time_t curtime;
+    struct tm ltm;
+    struct berval val;
+    Slapi_PBlock *modpb = NULL;
+    char *dn;
+
+    replica_subentry_check(repl_root, rid);
+    curtime = current_time();
+    gmtime_r(&curtime, &ltm);
+    strftime(buf, sizeof (buf), "%Y%m%d%H%M%SZ", &ltm);
+
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf);
+
+
+    val.bv_val = buf;
+    val.bv_len = strlen(val.bv_val);
+
+    vals [0] = &val;
+    vals [1] = NULL;
+
+    mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES;
+    mod.mod_type = KEEP_ALIVE_ATTR;
+    mod.mod_bvalues = vals;
+
+    mods[0] = &mod;
+    mods[1] = NULL;
+
+    modpb = slapi_pblock_new();
+    dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
+
+    slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL,
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
+    slapi_modify_internal_pb(modpb);
+
+    slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc);
+
+    if (ldrc != LDAP_SUCCESS)
+    {
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+                "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf);
+        rc = ldrc;
+    } else {
+        slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name,
+                "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf);
+    }
+
+    slapi_pblock_destroy(modpb);
+    slapi_ch_free_string(&dn);
+    return rc;
+
+}
 /*
  * Attempt to obtain exclusive access to replica (advisory only)
  *
@@ -3816,6 +3971,7 @@ replica_enable_replication (Replica *r)
         /* What to do ? */
     }
 
+    replica_subentry_check(r->repl_root, replica_get_rid(r));
     /* Replica came back online, Check if the total update was terminated.
        If flag is still set, it was not terminated, therefore the data is
        very likely to be incorrect, and we should not restart Replication threads...
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
index d9401cf..e004af4 100644
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
@@ -318,6 +318,9 @@ repl5_tot_run(Private_Repl_Protocol *prp)
 	int portnum = 0;
 	Slapi_DN *area_sdn = NULL;
 	CSN *remote_schema_csn = NULL;
+	int init_retry = 0;
+	Replica *replica;
+	ReplicaId rid = 0; /* Used to create the replica keep alive subentry */
 	
 	PR_ASSERT(NULL != prp);
 
@@ -395,7 +398,15 @@ repl5_tot_run(Private_Repl_Protocol *prp)
     ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
     ctrls[0] = create_managedsait_control ();
     ctrls[1] = create_backend_control(area_sdn);
-	
+
+	/* Time to make sure it exists a keep alive subentry for that replica */
+	replica = (Replica*) object_get_data(prp->replica_object);
+	if (replica)
+	{
+		rid = replica_get_rid(replica);
+	}
+	replica_subentry_check(area_sdn, rid);
+
     slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn), 
                                   LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, 
                                   repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
-- 
1.9.3