andykimpe / rpms / 389-ds-base

Forked from rpms/389-ds-base 4 months ago
Clone
Blob Blame History Raw
From 29ef94c5991621bc1c59cbbdedcfcbd0d04ba18f Mon Sep 17 00:00:00 2001
From: Thierry Bordaz <tbordaz@redhat.com>
Date: Fri, 11 Sep 2015 18:56:53 +0200
Subject: [PATCH 338/342] Ticket 48266: Fractional replication evaluates
 several times the same CSN

Bug Description:
	In fractional replication if there are only skipped updates and many of them, the supplier
	acquire the replica for a long time. At the end of the session, RUV is not updated
	so the next session will restart evaluating the same skipped updates

Fix Description:
	The fix introduces subentries under the suffix: 'cn=repl keep alive <rid>,$SUFFIX'
	During an incremental replication session, if the session only contains skipped updates
	and the number of them overpass a threshold (100), it triggers an update on that subentry.

	This update will eventually be replicated, moving forward the RUV

https://fedorahosted.org/389/ticket/48266

Reviewed by: Noriko Hosoi, Rich Megginson, Simon Pichugin

Platforms tested: <plat>

Flag Day: no

Doc impact: no

(cherry picked from commit f04f4c0140c1a970314735cb69b827230136b346)
---
 ldap/servers/plugins/replication/repl5.h           |   2 +
 .../plugins/replication/repl5_inc_protocol.c       |  39 ++++++
 ldap/servers/plugins/replication/repl5_replica.c   | 156 +++++++++++++++++++++
 .../plugins/replication/repl5_tot_protocol.c       |  13 +-
 4 files changed, 209 insertions(+), 1 deletion(-)

diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h
index 6cec248..66006f6 100644
--- a/ldap/servers/plugins/replication/repl5.h
+++ b/ldap/servers/plugins/replication/repl5.h
@@ -521,6 +521,8 @@ Replica *windows_replica_new(const Slapi_DN *root);
    during addition of the replica over LDAP */
 Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation);
 void replica_destroy(void **arg);
+int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid);
+int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid);
 PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid,
 									const char *locking_purl,
 									char **current_purl);
diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c
index f5516a3..0dc9f30 100644
--- a/ldap/servers/plugins/replication/repl5_inc_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_inc_protocol.c
@@ -1688,6 +1688,11 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 		int finished = 0;
 		ConnResult replay_crc;
 		char csn_str[CSN_STRSIZE];
+		PRBool subentry_update_sent = PR_FALSE;
+		PRBool subentry_update_needed = PR_FALSE;
+		int skipped_updates = 0;
+		int fractional_repl;
+#define FRACTIONAL_SKIPPED_THRESHOLD 100
 
 		/* Start the results reading thread */
 		rd = repl5_inc_rd_new(prp);
@@ -1704,6 +1709,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 
 		memset ( (void*)&op, 0, sizeof (op) );
 		entry.op = &op;
+		fractional_repl = agmt_is_fractional(prp->agmt);
 		do {
 			cl5_operation_parameters_done ( entry.op );
 			memset ( (void*)entry.op, 0, sizeof (op) );
@@ -1799,6 +1805,15 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 					csn_as_string(entry.op->csn, PR_FALSE, csn_str);
 					replica_id = csn_get_replicaid(entry.op->csn);
 					uniqueid = entry.op->target_address.uniqueid;
+                    
+					if (fractional_repl && message_id) 
+					{
+						/* This update was sent no need to update the subentry
+						 * and restart counting the skipped updates
+						 */
+						subentry_update_needed = PR_FALSE;
+						skipped_updates = 0;
+					}
 
 					if (prp->repl50consumer && message_id) 
 					{
@@ -1829,6 +1844,16 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 							agmt_get_long_name(prp->agmt),
 							entry.op->target_address.uniqueid, csn_str);
 						agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/);
+						if (fractional_repl) 
+						{
+							skipped_updates++;
+							if (skipped_updates > FRACTIONAL_SKIPPED_THRESHOLD) {
+								slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+										"%s: skipped updates is too high (%d) if no other update is sent we will update the subentry\n",
+										agmt_get_long_name(prp->agmt), skipped_updates);
+								subentry_update_needed = PR_TRUE;
+							}
+						}
 					}
 				}
 				break;
@@ -1894,6 +1919,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu
 			PR_Unlock(rd->lock);
 		} while (!finished);
 
+		if (fractional_repl && subentry_update_needed)
+		{
+			Replica *replica;
+			ReplicaId rid = -1; /* Used to create the replica keep alive subentry */
+			replica = (Replica*) object_get_data(prp->replica_object);
+			if (replica)
+			{
+				rid = replica_get_rid(replica);
+			}
+			slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+					"%s: skipped updates was definitely too high (%d) update the subentry now\n",
+					agmt_get_long_name(prp->agmt), skipped_updates);
+			replica_subentry_update(agmt_get_replarea(prp->agmt), rid);
+		}
 		/* Terminate the results reading thread */
 		if (!prp->repl50consumer) 
 		{
diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c
index f64e719..84539d2 100644
--- a/ldap/servers/plugins/replication/repl5_replica.c
+++ b/ldap/servers/plugins/replication/repl5_replica.c
@@ -398,6 +398,161 @@ replica_destroy(void **arg)
 	slapi_ch_free((void **)arg);
 }
 
+#define KEEP_ALIVE_ATTR "keepalivetimestamp"
+#define KEEP_ALIVE_ENTRY "repl keep alive"
+#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s"
+
+
+static int
+replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid) 
+{
+    char *entry_string = NULL;
+    Slapi_Entry *e = NULL;
+    Slapi_PBlock *pb = NULL;
+    int return_value;
+    int rc = 0;
+
+    entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d",
+            KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid);
+    if (entry_string == NULL) {
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+            "replica_subentry_create add failed in slapi_ch_smprintf\n");
+        rc = -1;
+        goto done;
+    }
+
+    slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "add %s\n", entry_string);
+    e = slapi_str2entry(entry_string, 0);
+
+    /* create the entry */
+    pb = slapi_pblock_new();
+
+
+    slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */);
+    slapi_add_internal_pb(pb);
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value);
+    if (return_value != LDAP_SUCCESS && return_value != LDAP_ALREADY_EXISTS) 
+    {
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to "
+                "create replication keep alive entry %s: %s\n", slapi_entry_get_dn_const(e),
+                ldap_err2string(return_value));
+        rc = -1;
+        slapi_entry_free(e); /* The entry was not consumed */
+        goto done;
+    }
+
+done:
+
+    slapi_pblock_destroy(pb);
+    slapi_ch_free_string(&entry_string);
+    return rc;
+
+}
+
+int
+replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid)
+{
+    Slapi_PBlock *pb;
+    char *filter = NULL;
+    Slapi_Entry **entries = NULL;
+    int res;
+    int rc = 0;
+
+    pb = slapi_pblock_new();
+    filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid);
+    slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL,
+            filter, NULL, 0, NULL, NULL,
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
+    slapi_search_internal_pb(pb);
+    slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res);
+    if (res == LDAP_SUCCESS)
+    {
+        slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries);
+        if (entries && (entries[0] == NULL))
+        {
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                    "Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
+            rc = replica_subentry_create(repl_root, rid);
+        } else {
+            slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                    "replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
+            rc = 0;
+        }
+    } else {
+        slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name,
+                "Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n",
+                KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res);
+        /* The status of the entry is not clear, do not attempt to create it */
+        rc = 1;
+    }
+    slapi_free_search_results_internal(pb);
+
+    slapi_pblock_destroy(pb);
+    slapi_ch_free_string(&filter);
+    return rc;
+}
+
+int
+replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) 
+{
+    int ldrc;
+    int rc = LDAP_SUCCESS; /* Optimistic default */
+    LDAPMod * mods[2];
+    LDAPMod mod;
+    struct berval * vals[2];
+    char buf[20];
+    time_t curtime;
+    struct tm ltm;
+    struct berval val;
+    Slapi_PBlock *modpb = NULL;
+    char *dn;
+
+    replica_subentry_check(repl_root, rid);
+    curtime = current_time();
+    gmtime_r(&curtime, &ltm);
+    strftime(buf, sizeof (buf), "%Y%m%d%H%M%SZ", &ltm);
+
+    slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf);
+
+
+    val.bv_val = buf;
+    val.bv_len = strlen(val.bv_val);
+
+    vals [0] = &val;
+    vals [1] = NULL;
+
+    mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES;
+    mod.mod_type = KEEP_ALIVE_ATTR;
+    mod.mod_bvalues = vals;
+
+    mods[0] = &mod;
+    mods[1] = NULL;
+
+    modpb = slapi_pblock_new();
+    dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root));
+
+    slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL,
+            repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0);
+    slapi_modify_internal_pb(modpb);
+
+    slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc);
+
+    if (ldrc != LDAP_SUCCESS)
+    {
+        slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name,
+                "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf);
+        rc = ldrc;
+    } else {
+        slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name,
+                "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf);
+    }
+
+    slapi_pblock_destroy(modpb);
+    slapi_ch_free_string(&dn);
+    return rc;
+
+}
 /*
  * Attempt to obtain exclusive access to replica (advisory only)
  *
@@ -3616,6 +3771,7 @@ replica_enable_replication (Replica *r)
         /* What to do ? */
     }
 
+    replica_subentry_check(r->repl_root, replica_get_rid(r));
     /* Replica came back online, Check if the total update was terminated.
        If flag is still set, it was not terminated, therefore the data is
        very likely to be incorrect, and we should not restart Replication threads...
diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c
index e514dc6..0143e19 100644
--- a/ldap/servers/plugins/replication/repl5_tot_protocol.c
+++ b/ldap/servers/plugins/replication/repl5_tot_protocol.c
@@ -335,6 +335,9 @@ repl5_tot_run(Private_Repl_Protocol *prp)
 	int portnum = 0;
 	Slapi_DN *area_sdn = NULL;
 	CSN *remote_schema_csn = NULL;
+	int init_retry = 0;
+	Replica *replica;
+	ReplicaId rid = 0; /* Used to create the replica keep alive subentry */
 	
 	PR_ASSERT(NULL != prp);
 
@@ -412,7 +415,15 @@ repl5_tot_run(Private_Repl_Protocol *prp)
     ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *));
     ctrls[0] = create_managedsait_control ();
     ctrls[1] = create_backend_control(area_sdn);
-	
+
+	/* Time to make sure it exists a keep alive subentry for that replica */
+	replica = (Replica*) object_get_data(prp->replica_object);
+	if (replica)
+	{
+		rid = replica_get_rid(replica);
+	}
+	replica_subentry_check(area_sdn, rid);
+
     slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn), 
                                   LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, 
                                   repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0);
-- 
1.9.3