From bc438614f71d18e337a56b49a67627299658b649 Mon Sep 17 00:00:00 2001 From: Mark Reynolds Date: Wed, 7 Aug 2019 20:36:53 -0400 Subject: [PATCH 1/4] Issue 50538 - cleanAllRUV task limit is not enforced for replicated tasks Bug Description: There is a hard limit of 64 concurrent cleanAllRUV tasks, but this limit is only enforced when creating "new" tasks. It was not enforced when a task was received via an extended operation. There were also race conditions in the existing logic that allowed the array of cleaned rids to get corrupted . This allowed for a very large number of task threads to be created. Fix Description: Maintain a new counter to keep track of the number of clean and abort threads to make sure it never over runs the rid array buffers. relates: https://pagure.io/389-ds-base/issue/50538 Reviewed by: lkrispenz(Thanks!) --- .../suites/replication/cleanallruv_test.py | 144 +++++++++- ldap/servers/plugins/replication/repl5.h | 7 +- .../replication/repl5_replica_config.c | 247 ++++++++++-------- ldap/servers/plugins/replication/repl_extop.c | 19 +- 4 files changed, 299 insertions(+), 118 deletions(-) diff --git a/dirsrvtests/tests/suites/replication/cleanallruv_test.py b/dirsrvtests/tests/suites/replication/cleanallruv_test.py index 09805d6b2..4893b81fe 100644 --- a/dirsrvtests/tests/suites/replication/cleanallruv_test.py +++ b/dirsrvtests/tests/suites/replication/cleanallruv_test.py @@ -1,5 +1,5 @@ # --- BEGIN COPYRIGHT BLOCK --- -# Copyright (C) 2016 Red Hat, Inc. +# Copyright (C) 2019 Red Hat, Inc. # All rights reserved. # # License: GPL (version 3 or any later version). @@ -7,7 +7,6 @@ # --- END COPYRIGHT BLOCK --- # import threading - import pytest import random from lib389 import DirSrv @@ -721,6 +720,147 @@ def test_multiple_tasks_with_force(topology_m4, m4rid): log.fatal('test_abort: CleanAllRUV task was not aborted') assert False + +@pytest.mark.bz1466441 +@pytest.mark.ds50370 +def test_clean_shutdown_crash(topology_m2): + """Check that server didn't crash after shutdown when running CleanAllRUV task + + :id: c34d0b40-3c3e-4f53-8656-5e4c2a310aaf + :setup: Replication setup with two masters + :steps: + 1. Enable TLS on both masters + 2. Reconfigure both agreements to use TLS Client auth + 3. Stop master2 + 4. Run the CleanAllRUV task + 5. Restart master1 + 6. Check if master1 didn't crash + 7. Restart master1 again + 8. Check if master1 didn't crash + + :expectedresults: + 1. Success + 2. Success + 3. Success + 4. Success + 5. Success + 6. Success + 7. Success + 8. Success + """ + + m1 = topology_m2.ms["master1"] + m2 = topology_m2.ms["master2"] + + repl = ReplicationManager(DEFAULT_SUFFIX) + + cm_m1 = CertmapLegacy(m1) + cm_m2 = CertmapLegacy(m2) + + certmaps = cm_m1.list() + certmaps['default']['DNComps'] = None + certmaps['default']['CmapLdapAttr'] = 'nsCertSubjectDN' + + cm_m1.set(certmaps) + cm_m2.set(certmaps) + + log.info('Enabling TLS') + [i.enable_tls() for i in topology_m2] + + log.info('Creating replication dns') + services = ServiceAccounts(m1, DEFAULT_SUFFIX) + repl_m1 = services.get('%s:%s' % (m1.host, m1.sslport)) + repl_m1.set('nsCertSubjectDN', m1.get_server_tls_subject()) + + repl_m2 = services.get('%s:%s' % (m2.host, m2.sslport)) + repl_m2.set('nsCertSubjectDN', m2.get_server_tls_subject()) + + log.info('Changing auth type') + replica_m1 = Replicas(m1).get(DEFAULT_SUFFIX) + agmt_m1 = replica_m1.get_agreements().list()[0] + agmt_m1.replace_many( + ('nsDS5ReplicaBindMethod', 'SSLCLIENTAUTH'), + ('nsDS5ReplicaTransportInfo', 'SSL'), + ('nsDS5ReplicaPort', '%s' % m2.sslport), + ) + + agmt_m1.remove_all('nsDS5ReplicaBindDN') + + replica_m2 = Replicas(m2).get(DEFAULT_SUFFIX) + agmt_m2 = replica_m2.get_agreements().list()[0] + + agmt_m2.replace_many( + ('nsDS5ReplicaBindMethod', 'SSLCLIENTAUTH'), + ('nsDS5ReplicaTransportInfo', 'SSL'), + ('nsDS5ReplicaPort', '%s' % m1.sslport), + ) + agmt_m2.remove_all('nsDS5ReplicaBindDN') + + log.info('Stopping master2') + m2.stop() + + log.info('Run the cleanAllRUV task') + cruv_task = CleanAllRUVTask(m1) + cruv_task.create(properties={ + 'replica-id': repl.get_rid(m1), + 'replica-base-dn': DEFAULT_SUFFIX, + 'replica-force-cleaning': 'no', + 'replica-certify-all': 'yes' + }) + + m1.restart() + + log.info('Check if master1 crashed') + assert not m1.detectDisorderlyShutdown() + + log.info('Repeat') + m1.restart() + assert not m1.detectDisorderlyShutdown() + + +def test_max_tasks(topology_m4): + """Test we can not create more than 64 cleaning tasks + + :id: c34d0b40-3c3e-4f53-8656-5e4c2a310a1f + :setup: Replication setup with four masters + :steps: + 1. Stop masters 3 & 4 + 2. Create over 64 tasks between m1 and m2 + 3. Check logs to see if (>65) tasks were rejected + + :expectedresults: + 1. Success + 2. Success + 3. Success + """ + + # Stop masters 3 & 4 + m1 = topology_m4.ms["master1"] + m2 = topology_m4.ms["master2"] + m3 = topology_m4.ms["master3"] + m4 = topology_m4.ms["master4"] + m3.stop() + m4.stop() + + # Add over 64 tasks between master1 & 2 to try to exceed the 64 task limit + for i in range(1, 64): + cruv_task = CleanAllRUVTask(m1) + cruv_task.create(properties={ + 'replica-id': str(i), + 'replica-base-dn': DEFAULT_SUFFIX, + 'replica-force-cleaning': 'no', # This forces these tasks to stick around + }) + cruv_task = CleanAllRUVTask(m2) + cruv_task.create(properties={ + 'replica-id': "10" + str(i), + 'replica-base-dn': DEFAULT_SUFFIX, + 'replica-force-cleaning': 'yes', # This allows the tasks to propagate + }) + + # Check the errors log for our error message in master 1 + assert m1.searchErrorsLog('Exceeded maximum number of active CLEANALLRUV tasks') + + if __name__ == '__main__': # Run isolated # -s for DEBUG mode diff --git a/ldap/servers/plugins/replication/repl5.h b/ldap/servers/plugins/replication/repl5.h index 1801a333e..9d25f2305 100644 --- a/ldap/servers/plugins/replication/repl5.h +++ b/ldap/servers/plugins/replication/repl5.h @@ -80,6 +80,8 @@ #define CLEANRUV_FINISHED "finished" #define CLEANRUV_CLEANING "cleaning" #define CLEANRUV_NO_MAXCSN "no maxcsn" +#define CLEANALLRUV_ID "CleanAllRUV Task" +#define ABORT_CLEANALLRUV_ID "Abort CleanAllRUV Task" /* DS 5.0 replication protocol error codes */ #define NSDS50_REPL_REPLICA_READY 0x00 /* Replica ready, go ahead */ @@ -784,6 +786,7 @@ void multimaster_mtnode_construct_replicas(void); void multimaster_be_state_change(void *handle, char *be_name, int old_be_state, int new_be_state); #define CLEANRIDSIZ 64 /* maximum number for concurrent CLEANALLRUV tasks */ +#define CLEANRID_BUFSIZ 128 typedef struct _cleanruv_data { @@ -815,6 +818,8 @@ int get_replica_type(Replica *r); int replica_execute_cleanruv_task_ext(Object *r, ReplicaId rid); void add_cleaned_rid(cleanruv_data *data); int is_cleaned_rid(ReplicaId rid); +int32_t check_and_set_cleanruv_task_count(ReplicaId rid); +int32_t check_and_set_abort_cleanruv_task_count(void); int replica_cleanall_ruv_abort(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *eAfter, int *returncode, char *returntext, void *arg); void replica_cleanallruv_thread_ext(void *arg); void stop_ruv_cleaning(void); @@ -833,8 +838,6 @@ void set_cleaned_rid(ReplicaId rid); void cleanruv_log(Slapi_Task *task, int rid, char *task_type, int sev_level, char *fmt, ...); char *replica_cleanallruv_get_local_maxcsn(ReplicaId rid, char *base_dn); - - /* replutil.c */ LDAPControl *create_managedsait_control(void); LDAPControl *create_backend_control(Slapi_DN *sdn); diff --git a/ldap/servers/plugins/replication/repl5_replica_config.c b/ldap/servers/plugins/replication/repl5_replica_config.c index 749e90936..c66a1c81d 100644 --- a/ldap/servers/plugins/replication/repl5_replica_config.c +++ b/ldap/servers/plugins/replication/repl5_replica_config.c @@ -30,17 +30,18 @@ #define CLEANALLRUV "CLEANALLRUV" #define CLEANALLRUVLEN 11 #define REPLICA_RDN "cn=replica" -#define CLEANALLRUV_ID "CleanAllRUV Task" -#define ABORT_CLEANALLRUV_ID "Abort CleanAllRUV Task" int slapi_log_urp = SLAPI_LOG_REPL; -static ReplicaId cleaned_rids[CLEANRIDSIZ + 1] = {0}; -static ReplicaId pre_cleaned_rids[CLEANRIDSIZ + 1] = {0}; -static ReplicaId aborted_rids[CLEANRIDSIZ + 1] = {0}; -static Slapi_RWLock *rid_lock = NULL; -static Slapi_RWLock *abort_rid_lock = NULL; +static ReplicaId cleaned_rids[CLEANRID_BUFSIZ] = {0}; +static ReplicaId pre_cleaned_rids[CLEANRID_BUFSIZ] = {0}; +static ReplicaId aborted_rids[CLEANRID_BUFSIZ] = {0}; +static PRLock *rid_lock = NULL; +static PRLock *abort_rid_lock = NULL; static PRLock *notify_lock = NULL; static PRCondVar *notify_cvar = NULL; +static PRLock *task_count_lock = NULL; +static int32_t clean_task_count = 0; +static int32_t abort_task_count = 0; /* Forward Declartions */ static int replica_config_add(Slapi_PBlock *pb, Slapi_Entry *e, Slapi_Entry *entryAfter, int *returncode, char *returntext, void *arg); @@ -67,8 +68,6 @@ static int replica_cleanallruv_send_abort_extop(Repl_Agmt *ra, Slapi_Task *task, static int replica_cleanallruv_check_maxcsn(Repl_Agmt *agmt, char *basedn, char *rid_text, char *maxcsn, Slapi_Task *task); static int replica_cleanallruv_replica_alive(Repl_Agmt *agmt); static int replica_cleanallruv_check_ruv(char *repl_root, Repl_Agmt *ra, char *rid_text, Slapi_Task *task, char *force); -static int get_cleanruv_task_count(void); -static int get_abort_cleanruv_task_count(void); static int replica_cleanup_task(Object *r, const char *task_name, char *returntext, int apply_mods); static int replica_task_done(Replica *replica); static void delete_cleaned_rid_config(cleanruv_data *data); @@ -114,20 +113,27 @@ replica_config_init() PR_GetError()); return -1; } - rid_lock = slapi_new_rwlock(); + rid_lock = PR_NewLock(); if (rid_lock == NULL) { slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - " "Failed to create rid_lock; NSPR error - %d\n", PR_GetError()); return -1; } - abort_rid_lock = slapi_new_rwlock(); + abort_rid_lock = PR_NewLock(); if (abort_rid_lock == NULL) { slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - " "Failed to create abort_rid_lock; NSPR error - %d\n", PR_GetError()); return -1; } + task_count_lock = PR_NewLock(); + if (task_count_lock == NULL) { + slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - " + "Failed to create task_count_lock; NSPR error - %d\n", + PR_GetError()); + return -1; + } if ((notify_lock = PR_NewLock()) == NULL) { slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "replica_config_init - " "Failed to create notify lock; NSPR error - %d\n", @@ -1533,12 +1539,6 @@ replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, co cleanruv_log(pre_task, rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Initiating CleanAllRUV Task..."); - if (get_cleanruv_task_count() >= CLEANRIDSIZ) { - /* we are already running the maximum number of tasks */ - cleanruv_log(pre_task, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, - "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ); - return LDAP_UNWILLING_TO_PERFORM; - } /* * Grab the replica */ @@ -1590,6 +1590,13 @@ replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, co goto fail; } + if (check_and_set_cleanruv_task_count(rid) != LDAP_SUCCESS) { + cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, + "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ); + rc = LDAP_UNWILLING_TO_PERFORM; + goto fail; + } + /* * Launch the cleanallruv thread. Once all the replicas are cleaned it will release the rid */ @@ -1597,6 +1604,9 @@ replica_execute_cleanall_ruv_task(Object *r, ReplicaId rid, Slapi_Task *task, co if (data == NULL) { cleanruv_log(pre_task, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, "Failed to allocate cleanruv_data. Aborting task."); rc = -1; + PR_Lock(task_count_lock); + clean_task_count--; + PR_Unlock(task_count_lock); goto fail; } data->repl_obj = r; @@ -1679,13 +1689,13 @@ replica_cleanallruv_thread(void *arg) int aborted = 0; int rc = 0; - if (!data || slapi_is_shutting_down()) { - return; /* no data */ - } - /* Increase active thread count to prevent a race condition at server shutdown */ g_incr_active_threadcnt(); + if (!data || slapi_is_shutting_down()) { + goto done; + } + if (data->task) { slapi_task_inc_refcount(data->task); slapi_log_err(SLAPI_LOG_PLUGIN, repl_plugin_name, @@ -1732,16 +1742,13 @@ replica_cleanallruv_thread(void *arg) slapi_task_begin(data->task, 1); } /* - * Presetting the rid prevents duplicate thread creation, but allows the db and changelog to still - * process updates from the rid. - * set_cleaned_rid() blocks updates, so we don't want to do that... yet unless we are in force mode. - * If we are forcing a clean independent of state of other servers for this RID we can set_cleaned_rid() + * We have already preset this rid, but if we are forcing a clean independent of state + * of other servers for this RID we can set_cleaned_rid() */ if (data->force) { set_cleaned_rid(data->rid); - } else { - preset_cleaned_rid(data->rid); } + rid_text = slapi_ch_smprintf("%d", data->rid); csn_as_string(data->maxcsn, PR_FALSE, csnstr); /* @@ -1911,6 +1918,9 @@ done: /* * If the replicas are cleaned, release the rid */ + if (slapi_is_shutting_down()) { + stop_ruv_cleaning(); + } if (!aborted && !slapi_is_shutting_down()) { /* * Success - the rid has been cleaned! @@ -1929,10 +1939,9 @@ done: } else { cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Propagated task does not delete Keep alive entry (%d).", data->rid); } - clean_agmts(data); remove_cleaned_rid(data->rid); - cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Successfully cleaned rid(%d).", data->rid); + cleanruv_log(data->task, data->rid, CLEANALLRUV_ID, SLAPI_LOG_INFO, "Successfully cleaned rid(%d)", data->rid); } else { /* * Shutdown or abort @@ -1965,6 +1974,10 @@ done: slapi_ch_free_string(&data->force); slapi_ch_free_string(&rid_text); slapi_ch_free((void **)&data); + /* decrement task count */ + PR_Lock(task_count_lock); + clean_task_count--; + PR_Unlock(task_count_lock); g_decr_active_threadcnt(); } @@ -2462,16 +2475,14 @@ replica_send_cleanruv_task(Repl_Agmt *agmt, cleanruv_data *clean_data) int is_cleaned_rid(ReplicaId rid) { - int i; - - slapi_rwlock_rdlock(rid_lock); - for (i = 0; i < CLEANRIDSIZ && cleaned_rids[i] != 0; i++) { + PR_Lock(rid_lock); + for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) { if (rid == cleaned_rids[i]) { - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); return 1; } } - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); return 0; } @@ -2479,16 +2490,14 @@ is_cleaned_rid(ReplicaId rid) int is_pre_cleaned_rid(ReplicaId rid) { - int i; - - slapi_rwlock_rdlock(rid_lock); - for (i = 0; i < CLEANRIDSIZ && pre_cleaned_rids[i] != 0; i++) { + PR_Lock(rid_lock); + for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) { if (rid == pre_cleaned_rids[i]) { - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); return 1; } } - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); return 0; } @@ -2501,14 +2510,14 @@ is_task_aborted(ReplicaId rid) if (rid == 0) { return 0; } - slapi_rwlock_rdlock(abort_rid_lock); - for (i = 0; i < CLEANRIDSIZ && aborted_rids[i] != 0; i++) { + PR_Lock(abort_rid_lock); + for (i = 0; i < CLEANRID_BUFSIZ && aborted_rids[i] != 0; i++) { if (rid == aborted_rids[i]) { - slapi_rwlock_unlock(abort_rid_lock); + PR_Unlock(abort_rid_lock); return 1; } } - slapi_rwlock_unlock(abort_rid_lock); + PR_Unlock(abort_rid_lock); return 0; } @@ -2517,15 +2526,14 @@ preset_cleaned_rid(ReplicaId rid) { int i; - slapi_rwlock_wrlock(rid_lock); - for (i = 0; i < CLEANRIDSIZ; i++) { + PR_Lock(rid_lock); + for (i = 0; i < CLEANRID_BUFSIZ && pre_cleaned_rids[i] != rid; i++) { if (pre_cleaned_rids[i] == 0) { pre_cleaned_rids[i] = rid; - pre_cleaned_rids[i + 1] = 0; break; } } - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); } /* @@ -2538,14 +2546,13 @@ set_cleaned_rid(ReplicaId rid) { int i; - slapi_rwlock_wrlock(rid_lock); - for (i = 0; i < CLEANRIDSIZ; i++) { + PR_Lock(rid_lock); + for (i = 0; i < CLEANRID_BUFSIZ && cleaned_rids[i] != rid; i++) { if (cleaned_rids[i] == 0) { cleaned_rids[i] = rid; - cleaned_rids[i + 1] = 0; } } - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); } /* @@ -2621,15 +2628,14 @@ add_aborted_rid(ReplicaId rid, Replica *r, char *repl_root, char *certify_all, P int rc; int i; - slapi_rwlock_wrlock(abort_rid_lock); - for (i = 0; i < CLEANRIDSIZ; i++) { + PR_Lock(abort_rid_lock); + for (i = 0; i < CLEANRID_BUFSIZ; i++) { if (aborted_rids[i] == 0) { aborted_rids[i] = rid; - aborted_rids[i + 1] = 0; break; } } - slapi_rwlock_unlock(abort_rid_lock); + PR_Unlock(abort_rid_lock); /* * Write the rid to the config entry */ @@ -2672,21 +2678,24 @@ delete_aborted_rid(Replica *r, ReplicaId rid, char *repl_root, char *certify_all char *data; char *dn; int rc; - int i; if (r == NULL) return; if (skip) { /* skip the deleting of the config, and just remove the in memory rid */ - slapi_rwlock_wrlock(abort_rid_lock); - for (i = 0; i < CLEANRIDSIZ && aborted_rids[i] != rid; i++) - ; /* found rid, stop */ - for (; i < CLEANRIDSIZ; i++) { - /* rewrite entire array */ - aborted_rids[i] = aborted_rids[i + 1]; - } - slapi_rwlock_unlock(abort_rid_lock); + ReplicaId new_abort_rids[CLEANRID_BUFSIZ] = {0}; + int32_t idx = 0; + + PR_Lock(abort_rid_lock); + for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) { + if (aborted_rids[i] != rid) { + new_abort_rids[idx] = aborted_rids[i]; + idx++; + } + } + memcpy(aborted_rids, new_abort_rids, sizeof(new_abort_rids)); + PR_Unlock(abort_rid_lock); } else { /* only remove the config, leave the in-memory rid */ dn = replica_get_dn(r); @@ -2832,27 +2841,31 @@ bail: void remove_cleaned_rid(ReplicaId rid) { - int i; - /* - * Remove this rid, and optimize the array - */ - slapi_rwlock_wrlock(rid_lock); + ReplicaId new_cleaned_rids[CLEANRID_BUFSIZ] = {0}; + ReplicaId new_pre_cleaned_rids[CLEANRID_BUFSIZ] = {0}; + size_t idx = 0; + + PR_Lock(rid_lock); - for (i = 0; i < CLEANRIDSIZ && cleaned_rids[i] != rid; i++) - ; /* found rid, stop */ - for (; i < CLEANRIDSIZ; i++) { - /* rewrite entire array */ - cleaned_rids[i] = cleaned_rids[i + 1]; + for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) { + if (cleaned_rids[i] != rid) { + new_cleaned_rids[idx] = cleaned_rids[i]; + idx++; + } } + memcpy(cleaned_rids, new_cleaned_rids, sizeof(new_cleaned_rids)); + /* now do the preset cleaned rids */ - for (i = 0; i < CLEANRIDSIZ && pre_cleaned_rids[i] != rid; i++) - ; /* found rid, stop */ - for (; i < CLEANRIDSIZ; i++) { - /* rewrite entire array */ - pre_cleaned_rids[i] = pre_cleaned_rids[i + 1]; + idx = 0; + for (size_t i = 0; i < CLEANRID_BUFSIZ; i++) { + if (pre_cleaned_rids[i] != rid) { + new_pre_cleaned_rids[idx] = pre_cleaned_rids[i]; + idx++; + } } + memcpy(pre_cleaned_rids, new_pre_cleaned_rids, sizeof(new_pre_cleaned_rids)); - slapi_rwlock_unlock(rid_lock); + PR_Unlock(rid_lock); } /* @@ -2882,16 +2895,6 @@ replica_cleanall_ruv_abort(Slapi_PBlock *pb __attribute__((unused)), char *ridstr = NULL; int rc = SLAPI_DSE_CALLBACK_OK; - if (get_abort_cleanruv_task_count() >= CLEANRIDSIZ) { - /* we are already running the maximum number of tasks */ - PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, - "Exceeded maximum number of active ABORT CLEANALLRUV tasks(%d)", - CLEANRIDSIZ); - cleanruv_log(task, -1, ABORT_CLEANALLRUV_ID, SLAPI_LOG_ERR, "%s", returntext); - *returncode = LDAP_OPERATIONS_ERROR; - return SLAPI_DSE_CALLBACK_ERROR; - } - /* allocate new task now */ task = slapi_new_task(slapi_entry_get_ndn(e)); @@ -2976,6 +2979,16 @@ replica_cleanall_ruv_abort(Slapi_PBlock *pb __attribute__((unused)), */ certify_all = "no"; } + + if (check_and_set_abort_cleanruv_task_count() != LDAP_SUCCESS) { + /* we are already running the maximum number of tasks */ + PR_snprintf(returntext, SLAPI_DSE_RETURNTEXT_SIZE, + "Exceeded maximum number of active ABORT CLEANALLRUV tasks(%d)", + CLEANRIDSIZ); + cleanruv_log(task, -1, ABORT_CLEANALLRUV_ID, SLAPI_LOG_ERR, "%s", returntext); + *returncode = LDAP_UNWILLING_TO_PERFORM; + goto out; + } /* * Create payload */ @@ -3190,6 +3203,9 @@ done: slapi_ch_free_string(&data->certify); slapi_sdn_free(&data->sdn); slapi_ch_free((void **)&data); + PR_Lock(task_count_lock); + abort_task_count--; + PR_Unlock(task_count_lock); g_decr_active_threadcnt(); } @@ -3541,36 +3557,43 @@ replica_cleanallruv_check_ruv(char *repl_root, Repl_Agmt *agmt, char *rid_text, return rc; } -static int -get_cleanruv_task_count(void) +/* + * Before starting a cleanAllRUV task make sure there are not + * too many task threads already running. If everything is okay + * also pre-set the RID now so rebounding extended ops do not + * try to clean it over and over. + */ +int32_t +check_and_set_cleanruv_task_count(ReplicaId rid) { - int i, count = 0; + int32_t rc = 0; - slapi_rwlock_wrlock(rid_lock); - for (i = 0; i < CLEANRIDSIZ; i++) { - if (pre_cleaned_rids[i] != 0) { - count++; - } + PR_Lock(task_count_lock); + if (clean_task_count >= CLEANRIDSIZ) { + rc = -1; + } else { + clean_task_count++; + preset_cleaned_rid(rid); } - slapi_rwlock_unlock(rid_lock); + PR_Unlock(task_count_lock); - return count; + return rc; } -static int -get_abort_cleanruv_task_count(void) +int32_t +check_and_set_abort_cleanruv_task_count(void) { - int i, count = 0; + int32_t rc = 0; - slapi_rwlock_wrlock(rid_lock); - for (i = 0; i < CLEANRIDSIZ; i++) { - if (aborted_rids[i] != 0) { - count++; + PR_Lock(task_count_lock); + if (abort_task_count > CLEANRIDSIZ) { + rc = -1; + } else { + abort_task_count++; } - } - slapi_rwlock_unlock(rid_lock); + PR_Unlock(task_count_lock); - return count; + return rc; } /* diff --git a/ldap/servers/plugins/replication/repl_extop.c b/ldap/servers/plugins/replication/repl_extop.c index b49cb8cd5..5bed84958 100644 --- a/ldap/servers/plugins/replication/repl_extop.c +++ b/ldap/servers/plugins/replication/repl_extop.c @@ -1393,6 +1393,12 @@ multimaster_extop_abort_cleanruv(Slapi_PBlock *pb) rc = LDAP_OPERATIONS_ERROR; goto out; } + if (check_and_set_abort_cleanruv_task_count() != LDAP_SUCCESS) { + cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, + "Exceeded maximum number of active abort CLEANALLRUV tasks(%d)", CLEANRIDSIZ); + rc = LDAP_UNWILLING_TO_PERFORM; + goto out; + } /* * Prepare the abort data */ @@ -1499,6 +1505,7 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb) if (force == NULL) { force = "no"; } + maxcsn = csn_new(); csn_init_by_string(maxcsn, csnstr); /* @@ -1535,13 +1542,21 @@ multimaster_extop_cleanruv(Slapi_PBlock *pb) goto free_and_return; } + if (check_and_set_cleanruv_task_count((ReplicaId)rid) != LDAP_SUCCESS) { + cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, + "Exceeded maximum number of active CLEANALLRUV tasks(%d)", CLEANRIDSIZ); + rc = LDAP_UNWILLING_TO_PERFORM; + goto free_and_return; + } + if (replica_get_type(r) != REPLICA_TYPE_READONLY) { /* * Launch the cleanruv monitoring thread. Once all the replicas are cleaned it will release the rid * * This will also release mtnode_ext->replica */ - slapi_log_err(SLAPI_LOG_INFO, repl_plugin_name, "multimaster_extop_cleanruv - CleanAllRUV Task - Launching cleanAllRUV thread...\n"); + + cleanruv_log(NULL, rid, CLEANALLRUV_ID, SLAPI_LOG_ERR, "Launching cleanAllRUV thread...\n"); data = (cleanruv_data *)slapi_ch_calloc(1, sizeof(cleanruv_data)); if (data == NULL) { slapi_log_err(SLAPI_LOG_ERR, repl_plugin_name, "multimaster_extop_cleanruv - CleanAllRUV Task - Failed to allocate " @@ -1635,7 +1650,7 @@ free_and_return: ber_printf(resp_bere, "{s}", CLEANRUV_ACCEPTED); ber_flatten(resp_bere, &resp_bval); slapi_pblock_set(pb, SLAPI_EXT_OP_RET_VALUE, resp_bval); - slapi_send_ldap_result(pb, LDAP_SUCCESS, NULL, NULL, 0, NULL); + slapi_send_ldap_result(pb, rc, NULL, NULL, 0, NULL); /* resp_bere */ if (NULL != resp_bere) { ber_free(resp_bere, 1); -- 2.21.0