From 016aa2bb9553a9a64ec6645db40ef95dd8de7041 Mon Sep 17 00:00:00 2001 From: Tomas Jelinek Date: Tue, 19 Feb 2019 17:53:17 +0100 Subject: [PATCH 3/3] lower load created by config files syncing in pcsd * make the sync less frequent (10 minutes instead of 1 minute) by default * if previous attempt for syncing was unable to connect to other nodes, try again sooner (in 1 minute by default) --- pcsd/cfgsync.rb | 60 ++++++++++++++++---- pcsd/pcsd.8 | 9 ++- pcsd/pcsd.rb | 24 ++++++-- pcsd/test/test_cfgsync.rb | 114 ++++++++++++++++++++++++++++++-------- 4 files changed, 167 insertions(+), 40 deletions(-) diff --git a/pcsd/cfgsync.rb b/pcsd/cfgsync.rb index 9acd8d0f..44e6d853 100644 --- a/pcsd/cfgsync.rb +++ b/pcsd/cfgsync.rb @@ -313,8 +313,11 @@ module Cfgsync class ConfigSyncControl - @thread_interval_default = 60 - @thread_interval_minimum = 20 + # intervals in seconds + @thread_interval_default = 600 + @thread_interval_minimum = 60 + @thread_interval_previous_not_connected_default = 60 + @thread_interval_previous_not_connected_minimum = 20 @file_backup_count_default = 50 @file_backup_count_minimum = 0 @@ -349,6 +352,20 @@ module Cfgsync return self.save(data) end + def self.sync_thread_interval_previous_not_connected() + return self.get_integer_value( + self.load()['thread_interval_previous_not_connected'], + @thread_interval_previous_not_connected_default, + @thread_interval_previous_not_connected_minimum + ) + end + + def self.sync_thread_interval_previous_not_connected=(seconds) + data = self.load() + data['thread_interval_previous_not_connected'] = seconds + return self.save(data) + end + def self.sync_thread_pause(semaphore_cfgsync, seconds=300) # wait for the thread to finish current run and disable it semaphore_cfgsync.synchronize { @@ -585,14 +602,17 @@ module Cfgsync end def fetch_all() - return self.filter_configs_cluster( - self.get_configs_cluster(@nodes, @cluster_name), - @config_classes + node_configs, node_connected = self.get_configs_cluster( + @nodes, @cluster_name ) + filtered_configs = self.filter_configs_cluster( + node_configs, @config_classes + ) + return filtered_configs, node_connected end def fetch() - configs_cluster = self.fetch_all() + configs_cluster, node_connected = self.fetch_all() newest_configs_cluster = {} configs_cluster.each { |name, cfgs| @@ -613,7 +633,7 @@ module Cfgsync end end } - return to_update_locally, to_update_in_cluster + return to_update_locally, to_update_in_cluster, node_connected end protected @@ -630,12 +650,15 @@ module Cfgsync $logger.debug 'Fetching configs from the cluster' threads = [] node_configs = {} + connected_to = {} nodes.each { |node| threads << Thread.new { code, out = send_request_with_token( @auth_user, node, 'get_configs', false, data ) + connected_to[node] = false if 200 == code + connected_to[node] = true begin parsed = JSON::parse(out) if 'ok' == parsed['status'] and cluster_name == parsed['cluster_name'] @@ -647,7 +670,24 @@ module Cfgsync } } threads.each { |t| t.join } - return node_configs + + node_connected = false + if connected_to.empty?() + node_connected = true # no nodes to connect to => no connection errors + else + connected_count = 0 + connected_to.each { |node, connected| + if connected + connected_count += 1 + end + } + # If we only connected to one node, consider it a fail and continue as + # if we could not connect anywhere. The one node is probably the local + # node. + node_connected = connected_count > 1 + end + + return node_configs, node_connected end def filter_configs_cluster(node_configs, wanted_configs_classes) @@ -752,7 +792,7 @@ module Cfgsync fetcher = ConfigFetcher.new( PCSAuth.getSuperuserAuth(), [config.class], nodes, cluster_name ) - cfgs_to_save, _ = fetcher.fetch() + cfgs_to_save, _, _ = fetcher.fetch() cfgs_to_save.each { |cfg_to_save| cfg_to_save.save() if cfg_to_save.class == config.class } @@ -812,7 +852,7 @@ module Cfgsync fetcher = ConfigFetcher.new( PCSAuth.getSuperuserAuth(), [config_new.class], nodes, cluster_name ) - fetched_tokens = fetcher.fetch_all()[config_new.class.name] + fetched_tokens, _ = fetcher.fetch_all()[config_new.class.name] config_new = Cfgsync::merge_tokens_files( config, fetched_tokens, new_tokens, new_ports ) diff --git a/pcsd/pcsd.8 b/pcsd/pcsd.8 index e58b7ff6..bd405043 100644 --- a/pcsd/pcsd.8 +++ b/pcsd/pcsd.8 @@ -63,9 +63,11 @@ Example: .br "thread_disabled": false, .br - "thread_interval": 60, + "thread_interval": 600, .br - "thread_paused_until": 1487780453, + "thread_interval_previous_not_connected": 60, +.br + "thread_paused_until": 1487780453 .br } @@ -79,6 +81,9 @@ Set this to \fBtrue\fR to completely disable the synchronization. .B thread_interval How often in seconds should pcsd ask other nodes if the synchronized files have changed. .TP +.B thread_interval_previous_not_connected +How often in seconds should pcsd ask other nodes if the synchronized files have changed if during the previous attempt pcsd was unable to connect to at least two nodes. +.TP .B thread_paused_until Disable the synchronization until the set unix timestamp. diff --git a/pcsd/pcsd.rb b/pcsd/pcsd.rb index 9f9bd091..6e5e27e0 100644 --- a/pcsd/pcsd.rb +++ b/pcsd/pcsd.rb @@ -132,14 +132,15 @@ set :run, false $thread_cfgsync = Thread.new { while true + node_connected = true $semaphore_cfgsync.synchronize { - $logger.debug('Config files sync thread started') if Cfgsync::ConfigSyncControl.sync_thread_allowed?() + $logger.info('Config files sync thread started') begin # do not sync if this host is not in a cluster cluster_name = get_cluster_name() cluster_nodes = get_corosync_nodes() - if cluster_name and !cluster_name.empty?() and cluster_nodes and !cluster_nodes.empty? + if cluster_name and !cluster_name.empty?() and cluster_nodes and cluster_nodes.count > 1 $logger.debug('Config files sync thread fetching') fetcher = Cfgsync::ConfigFetcher.new( PCSAuth.getSuperuserAuth(), @@ -147,18 +148,31 @@ $thread_cfgsync = Thread.new { cluster_nodes, cluster_name ) - cfgs_to_save, _ = fetcher.fetch() + cfgs_to_save, _, node_connected = fetcher.fetch() cfgs_to_save.each { |cfg_to_save| cfg_to_save.save() } + $logger.info('Config files sync thread finished') + else + $logger.info( + 'Config files sync skipped, this host does not seem to be in ' + + 'a cluster of at least 2 nodes' + ) end rescue => e $logger.warn("Config files sync thread exception: #{e}") end + else + $logger.info('Config files sync is disabled or paused, skipping') end - $logger.debug('Config files sync thread finished') } - sleep(Cfgsync::ConfigSyncControl.sync_thread_interval()) + if node_connected + sleep(Cfgsync::ConfigSyncControl.sync_thread_interval()) + else + sleep( + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + end end } diff --git a/pcsd/test/test_cfgsync.rb b/pcsd/test/test_cfgsync.rb index 9b0317ce..b49c44d2 100644 --- a/pcsd/test/test_cfgsync.rb +++ b/pcsd/test/test_cfgsync.rb @@ -287,8 +287,10 @@ class TestConfigSyncControll < Test::Unit::TestCase file = File.open(CFG_SYNC_CONTROL, 'w') file.write(JSON.pretty_generate({})) file.close() - @thread_interval_default = 60 - @thread_interval_minimum = 20 + @thread_interval_default = 600 + @thread_interval_minimum = 60 + @thread_interval_previous_not_connected_default = 60 + @thread_interval_previous_not_connected_minimum = 20 @file_backup_count_default = 50 @file_backup_count_minimum = 0 end @@ -441,6 +443,65 @@ class TestConfigSyncControll < Test::Unit::TestCase ) end + def test_interval_previous_not_connected() + assert_equal( + @thread_interval_previous_not_connected_default, + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + + interval = ( + @thread_interval_previous_not_connected_default + + @thread_interval_previous_not_connected_minimum + ) + assert( + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=( + interval + ) + ) + assert_equal( + interval, + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + + assert( + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=( + @thread_interval_previous_not_connected_minimum / 2 + ) + ) + assert_equal( + @thread_interval_previous_not_connected_minimum, + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + + assert( + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(0) + ) + assert_equal( + @thread_interval_previous_not_connected_minimum, + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + + assert( + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=( + -100 + ) + ) + assert_equal( + @thread_interval_previous_not_connected_minimum, + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + + assert( + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=( + 'abcd' + ) + ) + assert_equal( + @thread_interval_previous_not_connected_default, + Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected() + ) + end + def test_file_backup_count() assert_equal( @file_backup_count_default, @@ -495,11 +556,12 @@ class TestConfigFetcher < Test::Unit::TestCase end def get_configs_cluster(nodes, cluster_name) - return @configs_cluster + return @configs_cluster, @node_connected end - def set_configs_cluster(configs) + def set_configs_cluster(configs, node_connected=true) @configs_cluster = configs + @node_connected = node_connected return self end @@ -569,31 +631,37 @@ class TestConfigFetcher < Test::Unit::TestCase cfg_name = Cfgsync::ClusterConf.name fetcher = ConfigFetcherMock.new({}, [Cfgsync::ClusterConf], nil, nil) + # unable to connect to any nodes + fetcher.set_configs_local({cfg_name => cfg1}) + + fetcher.set_configs_cluster({}, false) + assert_equal([[], [], false], fetcher.fetch()) + # local config is synced fetcher.set_configs_local({cfg_name => cfg1}) fetcher.set_configs_cluster({ 'node1' => {'configs' => {cfg_name => cfg1}}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {'configs' => {cfg_name => cfg2}}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {'configs' => {cfg_name => cfg1}}, 'node2' => {'configs' => {cfg_name => cfg2}}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {'configs' => {cfg_name => cfg1}}, 'node2' => {'configs' => {cfg_name => cfg2}}, 'node3' => {'configs' => {cfg_name => cfg2}}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) # local config is older fetcher.set_configs_local({cfg_name => cfg1}) @@ -601,20 +669,20 @@ class TestConfigFetcher < Test::Unit::TestCase fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, }) - assert_equal([[cfg3], []], fetcher.fetch()) + assert_equal([[cfg3], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, }) - assert_equal([[cfg4], []], fetcher.fetch()) + assert_equal([[cfg4], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, 'node3' => {cfg_name => cfg3}, }) - assert_equal([[cfg3], []], fetcher.fetch()) + assert_equal([[cfg3], [], true], fetcher.fetch()) # local config is newer fetcher.set_configs_local({cfg_name => cfg3}) @@ -622,13 +690,13 @@ class TestConfigFetcher < Test::Unit::TestCase fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg1}, }) - assert_equal([[], [cfg3]], fetcher.fetch()) + assert_equal([[], [cfg3], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg1}, 'node2' => {cfg_name => cfg1}, }) - assert_equal([[], [cfg3]], fetcher.fetch()) + assert_equal([[], [cfg3], true], fetcher.fetch()) # local config is the same version fetcher.set_configs_local({cfg_name => cfg3}) @@ -636,32 +704,32 @@ class TestConfigFetcher < Test::Unit::TestCase fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg4}, }) - assert_equal([[cfg4], []], fetcher.fetch()) + assert_equal([[cfg4], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, }) - assert_equal([[cfg4], []], fetcher.fetch()) + assert_equal([[cfg4], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, 'node3' => {cfg_name => cfg3}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, 'node3' => {cfg_name => cfg4}, }) - assert_equal([[cfg4], []], fetcher.fetch()) + assert_equal([[cfg4], [], true], fetcher.fetch()) # local config is the same version fetcher.set_configs_local({cfg_name => cfg4}) @@ -669,32 +737,32 @@ class TestConfigFetcher < Test::Unit::TestCase fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, }) - assert_equal([[cfg3], []], fetcher.fetch()) + assert_equal([[cfg3], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg4}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, 'node3' => {cfg_name => cfg3}, }) - assert_equal([[cfg3], []], fetcher.fetch()) + assert_equal([[cfg3], [], true], fetcher.fetch()) fetcher.set_configs_cluster({ 'node1' => {cfg_name => cfg3}, 'node2' => {cfg_name => cfg4}, 'node3' => {cfg_name => cfg4}, }) - assert_equal([[], []], fetcher.fetch()) + assert_equal([[], [], true], fetcher.fetch()) end end -- 2.17.0