| From 05e127c89281cece8bc1fa79bac6b95cc23dcca9 Mon Sep 17 00:00:00 2001 |
| From: Thierry Bordaz <tbordaz@redhat.com> |
| Date: Fri, 11 Sep 2015 18:56:53 +0200 |
| Subject: [PATCH 57/61] Ticket 48266: Fractional replication evaluates several |
| times the same CSN |
| |
| Bug Description: |
| In fractional replication if there are only skipped updates and many of them, the supplier |
| acquire the replica for a long time. At the end of the session, RUV is not updated |
| so the next session will restart evaluating the same skipped updates |
| |
| Fix Description: |
| The fix introduces subentries under the suffix: 'cn=repl keep alive <rid>,$SUFFIX' |
| During an incremental replication session, if the session only contains skipped updates |
| and the number of them overpass a threshold (100), it triggers an update on that subentry. |
| |
| This update will eventually be replicated, moving forward the RUV |
| |
| https://fedorahosted.org/389/ticket/48266 |
| |
| Reviewed by: Noriko Hosoi, Rich Megginson, Simon Pichugin |
| |
| Platforms tested: <plat> |
| |
| Flag Day: no |
| |
| Doc impact: no |
| |
| (cherry picked from commit 6343e4cba17802e19daa5c971120fa352ff80ad4) |
| |
| ldap/servers/plugins/replication/repl5.h | 2 + |
| .../plugins/replication/repl5_inc_protocol.c | 39 ++++++ |
| ldap/servers/plugins/replication/repl5_replica.c | 156 +++++++++++++++++++++ |
| .../plugins/replication/repl5_tot_protocol.c | 13 +- |
| 4 files changed, 209 insertions(+), 1 deletion(-) |
| |
| diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h |
| index 0b0f26b..17282bb 100644 |
| |
| |
| @@ -523,6 +523,8 @@ Replica *windows_replica_new(const Slapi_DN *root); |
| during addition of the replica over LDAP */ |
| Replica *replica_new_from_entry (Slapi_Entry *e, char *errortext, PRBool is_add_operation); |
| void replica_destroy(void **arg); |
| +int replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid); |
| +int replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid); |
| PRBool replica_get_exclusive_access(Replica *r, PRBool *isInc, PRUint64 connid, int opid, |
| const char *locking_purl, |
| char **current_purl); |
| diff --git a/ldap/servers/plugins/replication/repl5_inc_protocol.c b/ldap/servers/plugins/replication/repl5_inc_protocol.c |
| index 216de3c..e0599e5 100644 |
| |
| |
| @@ -1672,6 +1672,11 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu |
| int finished = 0; |
| ConnResult replay_crc; |
| char csn_str[CSN_STRSIZE]; |
| + PRBool subentry_update_sent = PR_FALSE; |
| + PRBool subentry_update_needed = PR_FALSE; |
| + int skipped_updates = 0; |
| + int fractional_repl; |
| +#define FRACTIONAL_SKIPPED_THRESHOLD 100 |
| |
| /* Start the results reading thread */ |
| rd = repl5_inc_rd_new(prp); |
| @@ -1688,6 +1693,7 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu |
| |
| memset ( (void*)&op, 0, sizeof (op) ); |
| entry.op = &op; |
| + fractional_repl = agmt_is_fractional(prp->agmt); |
| do { |
| cl5_operation_parameters_done ( entry.op ); |
| memset ( (void*)entry.op, 0, sizeof (op) ); |
| @@ -1783,6 +1789,15 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu |
| csn_as_string(entry.op->csn, PR_FALSE, csn_str); |
| replica_id = csn_get_replicaid(entry.op->csn); |
| uniqueid = entry.op->target_address.uniqueid; |
| + |
| + if (fractional_repl && message_id) |
| + { |
| + /* This update was sent no need to update the subentry |
| + * and restart counting the skipped updates |
| + */ |
| + subentry_update_needed = PR_FALSE; |
| + skipped_updates = 0; |
| + } |
| |
| if (prp->repl50consumer && message_id) |
| { |
| @@ -1813,6 +1828,16 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu |
| agmt_get_long_name(prp->agmt), |
| entry.op->target_address.uniqueid, csn_str); |
| agmt_inc_last_update_changecount (prp->agmt, csn_get_replicaid(entry.op->csn), 1 /*skipped*/); |
| + if (fractional_repl) |
| + { |
| + skipped_updates++; |
| + if (skipped_updates > FRACTIONAL_SKIPPED_THRESHOLD) { |
| + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, |
| + "%s: skipped updates is too high (%d) if no other update is sent we will update the subentry\n", |
| + agmt_get_long_name(prp->agmt), skipped_updates); |
| + subentry_update_needed = PR_TRUE; |
| + } |
| + } |
| } |
| } |
| break; |
| @@ -1878,6 +1903,20 @@ send_updates(Private_Repl_Protocol *prp, RUV *remote_update_vector, PRUint32 *nu |
| PR_Unlock(rd->lock); |
| } while (!finished); |
| |
| + if (fractional_repl && subentry_update_needed) |
| + { |
| + Replica *replica; |
| + ReplicaId rid = -1; /* Used to create the replica keep alive subentry */ |
| + replica = (Replica*) object_get_data(prp->replica_object); |
| + if (replica) |
| + { |
| + rid = replica_get_rid(replica); |
| + } |
| + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, |
| + "%s: skipped updates was definitely too high (%d) update the subentry now\n", |
| + agmt_get_long_name(prp->agmt), skipped_updates); |
| + replica_subentry_update(agmt_get_replarea(prp->agmt), rid); |
| + } |
| /* Terminate the results reading thread */ |
| if (!prp->repl50consumer) |
| { |
| diff --git a/ldap/servers/plugins/replication/repl5_replica.c b/ldap/servers/plugins/replication/repl5_replica.c |
| index 92b4e96..6ac28c1 100644 |
| |
| |
| @@ -414,6 +414,161 @@ replica_destroy(void **arg) |
| slapi_ch_free((void **)arg); |
| } |
| |
| +#define KEEP_ALIVE_ATTR "keepalivetimestamp" |
| +#define KEEP_ALIVE_ENTRY "repl keep alive" |
| +#define KEEP_ALIVE_DN_FORMAT "cn=%s %d,%s" |
| + |
| + |
| +static int |
| +replica_subentry_create(Slapi_DN *repl_root, ReplicaId rid) |
| +{ |
| + char *entry_string = NULL; |
| + Slapi_Entry *e = NULL; |
| + Slapi_PBlock *pb = NULL; |
| + int return_value; |
| + int rc = 0; |
| + |
| + entry_string = slapi_ch_smprintf("dn: cn=%s %d,%s\nobjectclass: top\nobjectclass: ldapsubentry\nobjectclass: extensibleObject\ncn: %s %d", |
| + KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), KEEP_ALIVE_ENTRY, rid); |
| + if (entry_string == NULL) { |
| + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, |
| + "replica_subentry_create add failed in slapi_ch_smprintf\n"); |
| + rc = -1; |
| + goto done; |
| + } |
| + |
| + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "add %s\n", entry_string); |
| + e = slapi_str2entry(entry_string, 0); |
| + |
| + /* create the entry */ |
| + pb = slapi_pblock_new(); |
| + |
| + |
| + slapi_add_entry_internal_set_pb(pb, e, NULL, /* controls */ |
| + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0 /* flags */); |
| + slapi_add_internal_pb(pb); |
| + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &return_value); |
| + if (return_value != LDAP_SUCCESS && return_value != LDAP_ALREADY_EXISTS) |
| + { |
| + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, "Warning: unable to " |
| + "create replication keep alive entry %s: %s\n", slapi_entry_get_dn_const(e), |
| + ldap_err2string(return_value)); |
| + rc = -1; |
| + slapi_entry_free(e); /* The entry was not consumed */ |
| + goto done; |
| + } |
| + |
| +done: |
| + |
| + slapi_pblock_destroy(pb); |
| + slapi_ch_free_string(&entry_string); |
| + return rc; |
| + |
| +} |
| + |
| +int |
| +replica_subentry_check(Slapi_DN *repl_root, ReplicaId rid) |
| +{ |
| + Slapi_PBlock *pb; |
| + char *filter = NULL; |
| + Slapi_Entry **entries = NULL; |
| + int res; |
| + int rc = 0; |
| + |
| + pb = slapi_pblock_new(); |
| + filter = slapi_ch_smprintf("(&(objectclass=ldapsubentry)(cn=%s %d))", KEEP_ALIVE_ENTRY, rid); |
| + slapi_search_internal_set_pb(pb, slapi_sdn_get_dn(repl_root), LDAP_SCOPE_ONELEVEL, |
| + filter, NULL, 0, NULL, NULL, |
| + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); |
| + slapi_search_internal_pb(pb); |
| + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_RESULT, &res); |
| + if (res == LDAP_SUCCESS) |
| + { |
| + slapi_pblock_get(pb, SLAPI_PLUGIN_INTOP_SEARCH_ENTRIES, &entries); |
| + if (entries && (entries[0] == NULL)) |
| + { |
| + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, |
| + "Need to create replication keep alive entry <cn=%s %d,%s>\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root)); |
| + rc = replica_subentry_create(repl_root, rid); |
| + } else { |
| + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, |
| + "replication keep alive entry <cn=%s %d,%s> already exists\n", KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root)); |
| + rc = 0; |
| + } |
| + } else { |
| + slapi_log_error(SLAPI_LOG_FATAL, repl_plugin_name, |
| + "Error accessing replication keep alive entry <cn=%s %d,%s> res=%d\n", |
| + KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root), res); |
| + /* The status of the entry is not clear, do not attempt to create it */ |
| + rc = 1; |
| + } |
| + slapi_free_search_results_internal(pb); |
| + |
| + slapi_pblock_destroy(pb); |
| + slapi_ch_free_string(&filter); |
| + return rc; |
| +} |
| + |
| +int |
| +replica_subentry_update(Slapi_DN *repl_root, ReplicaId rid) |
| +{ |
| + int ldrc; |
| + int rc = LDAP_SUCCESS; /* Optimistic default */ |
| + LDAPMod * mods[2]; |
| + LDAPMod mod; |
| + struct berval * vals[2]; |
| + char buf[20]; |
| + time_t curtime; |
| + struct tm ltm; |
| + struct berval val; |
| + Slapi_PBlock *modpb = NULL; |
| + char *dn; |
| + |
| + replica_subentry_check(repl_root, rid); |
| + curtime = current_time(); |
| + gmtime_r(&curtime, <m); |
| + strftime(buf, sizeof (buf), "%Y%m%d%H%M%SZ", <m); |
| + |
| + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, "subentry_update called at %s\n", buf); |
| + |
| + |
| + val.bv_val = buf; |
| + val.bv_len = strlen(val.bv_val); |
| + |
| + vals [0] = &val; |
| + vals [1] = NULL; |
| + |
| + mod.mod_op = LDAP_MOD_REPLACE | LDAP_MOD_BVALUES; |
| + mod.mod_type = KEEP_ALIVE_ATTR; |
| + mod.mod_bvalues = vals; |
| + |
| + mods[0] = &mod; |
| + mods[1] = NULL; |
| + |
| + modpb = slapi_pblock_new(); |
| + dn = slapi_ch_smprintf(KEEP_ALIVE_DN_FORMAT, KEEP_ALIVE_ENTRY, rid, slapi_sdn_get_dn(repl_root)); |
| + |
| + slapi_modify_internal_set_pb(modpb, dn, mods, NULL, NULL, |
| + repl_get_plugin_identity(PLUGIN_MULTIMASTER_REPLICATION), 0); |
| + slapi_modify_internal_pb(modpb); |
| + |
| + slapi_pblock_get(modpb, SLAPI_PLUGIN_INTOP_RESULT, &ldrc); |
| + |
| + if (ldrc != LDAP_SUCCESS) |
| + { |
| + slapi_log_error(SLAPI_LOG_REPL, repl_plugin_name, |
| + "Failure (%d) to update replication keep alive entry \"%s: %s\"\n", ldrc, KEEP_ALIVE_ATTR, buf); |
| + rc = ldrc; |
| + } else { |
| + slapi_log_error(SLAPI_LOG_PLUGIN, repl_plugin_name, |
| + "Successful update of replication keep alive entry \"%s: %s\"\n", KEEP_ALIVE_ATTR, buf); |
| + } |
| + |
| + slapi_pblock_destroy(modpb); |
| + slapi_ch_free_string(&dn); |
| + return rc; |
| + |
| +} |
| /* |
| * Attempt to obtain exclusive access to replica (advisory only) |
| * |
| @@ -3816,6 +3971,7 @@ replica_enable_replication (Replica *r) |
| /* What to do ? */ |
| } |
| |
| + replica_subentry_check(r->repl_root, replica_get_rid(r)); |
| /* Replica came back online, Check if the total update was terminated. |
| If flag is still set, it was not terminated, therefore the data is |
| very likely to be incorrect, and we should not restart Replication threads... |
| diff --git a/ldap/servers/plugins/replication/repl5_tot_protocol.c b/ldap/servers/plugins/replication/repl5_tot_protocol.c |
| index d9401cf..e004af4 100644 |
| |
| |
| @@ -318,6 +318,9 @@ repl5_tot_run(Private_Repl_Protocol *prp) |
| int portnum = 0; |
| Slapi_DN *area_sdn = NULL; |
| CSN *remote_schema_csn = NULL; |
| + int init_retry = 0; |
| + Replica *replica; |
| + ReplicaId rid = 0; /* Used to create the replica keep alive subentry */ |
| |
| PR_ASSERT(NULL != prp); |
| |
| @@ -395,7 +398,15 @@ repl5_tot_run(Private_Repl_Protocol *prp) |
| ctrls = (LDAPControl **)slapi_ch_calloc (3, sizeof (LDAPControl *)); |
| ctrls[0] = create_managedsait_control (); |
| ctrls[1] = create_backend_control(area_sdn); |
| - |
| + |
| + /* Time to make sure it exists a keep alive subentry for that replica */ |
| + replica = (Replica*) object_get_data(prp->replica_object); |
| + if (replica) |
| + { |
| + rid = replica_get_rid(replica); |
| + } |
| + replica_subentry_check(area_sdn, rid); |
| + |
| slapi_search_internal_set_pb (pb, slapi_sdn_get_dn (area_sdn), |
| LDAP_SCOPE_SUBTREE, "(|(objectclass=ldapsubentry)(objectclass=nstombstone)(nsuniqueid=*))", NULL, 0, ctrls, NULL, |
| repl_get_plugin_identity (PLUGIN_MULTIMASTER_REPLICATION), 0); |
| -- |
| 1.9.3 |
| |