From 016aa2bb9553a9a64ec6645db40ef95dd8de7041 Mon Sep 17 00:00:00 2001
From: Tomas Jelinek <tojeline@redhat.com>
Date: Tue, 19 Feb 2019 17:53:17 +0100
Subject: [PATCH 3/3] lower load created by config files syncing in pcsd
* make the sync less frequent (10 minutes instead of 1 minute) by
default
* if previous attempt for syncing was unable to connect to other nodes,
try again sooner (in 1 minute by default)
---
pcsd/cfgsync.rb | 60 ++++++++++++++++----
pcsd/pcsd.8 | 9 ++-
pcsd/pcsd.rb | 24 ++++++--
pcsd/test/test_cfgsync.rb | 114 ++++++++++++++++++++++++++++++--------
4 files changed, 167 insertions(+), 40 deletions(-)
diff --git a/pcsd/cfgsync.rb b/pcsd/cfgsync.rb
index 9acd8d0f..44e6d853 100644
--- a/pcsd/cfgsync.rb
+++ b/pcsd/cfgsync.rb
@@ -313,8 +313,11 @@ module Cfgsync
class ConfigSyncControl
- @thread_interval_default = 60
- @thread_interval_minimum = 20
+ # intervals in seconds
+ @thread_interval_default = 600
+ @thread_interval_minimum = 60
+ @thread_interval_previous_not_connected_default = 60
+ @thread_interval_previous_not_connected_minimum = 20
@file_backup_count_default = 50
@file_backup_count_minimum = 0
@@ -349,6 +352,20 @@ module Cfgsync
return self.save(data)
end
+ def self.sync_thread_interval_previous_not_connected()
+ return self.get_integer_value(
+ self.load()['thread_interval_previous_not_connected'],
+ @thread_interval_previous_not_connected_default,
+ @thread_interval_previous_not_connected_minimum
+ )
+ end
+
+ def self.sync_thread_interval_previous_not_connected=(seconds)
+ data = self.load()
+ data['thread_interval_previous_not_connected'] = seconds
+ return self.save(data)
+ end
+
def self.sync_thread_pause(semaphore_cfgsync, seconds=300)
# wait for the thread to finish current run and disable it
semaphore_cfgsync.synchronize {
@@ -585,14 +602,17 @@ module Cfgsync
end
def fetch_all()
- return self.filter_configs_cluster(
- self.get_configs_cluster(@nodes, @cluster_name),
- @config_classes
+ node_configs, node_connected = self.get_configs_cluster(
+ @nodes, @cluster_name
)
+ filtered_configs = self.filter_configs_cluster(
+ node_configs, @config_classes
+ )
+ return filtered_configs, node_connected
end
def fetch()
- configs_cluster = self.fetch_all()
+ configs_cluster, node_connected = self.fetch_all()
newest_configs_cluster = {}
configs_cluster.each { |name, cfgs|
@@ -613,7 +633,7 @@ module Cfgsync
end
end
}
- return to_update_locally, to_update_in_cluster
+ return to_update_locally, to_update_in_cluster, node_connected
end
protected
@@ -630,12 +650,15 @@ module Cfgsync
$logger.debug 'Fetching configs from the cluster'
threads = []
node_configs = {}
+ connected_to = {}
nodes.each { |node|
threads << Thread.new {
code, out = send_request_with_token(
@auth_user, node, 'get_configs', false, data
)
+ connected_to[node] = false
if 200 == code
+ connected_to[node] = true
begin
parsed = JSON::parse(out)
if 'ok' == parsed['status'] and cluster_name == parsed['cluster_name']
@@ -647,7 +670,24 @@ module Cfgsync
}
}
threads.each { |t| t.join }
- return node_configs
+
+ node_connected = false
+ if connected_to.empty?()
+ node_connected = true # no nodes to connect to => no connection errors
+ else
+ connected_count = 0
+ connected_to.each { |node, connected|
+ if connected
+ connected_count += 1
+ end
+ }
+ # If we only connected to one node, consider it a fail and continue as
+ # if we could not connect anywhere. The one node is probably the local
+ # node.
+ node_connected = connected_count > 1
+ end
+
+ return node_configs, node_connected
end
def filter_configs_cluster(node_configs, wanted_configs_classes)
@@ -752,7 +792,7 @@ module Cfgsync
fetcher = ConfigFetcher.new(
PCSAuth.getSuperuserAuth(), [config.class], nodes, cluster_name
)
- cfgs_to_save, _ = fetcher.fetch()
+ cfgs_to_save, _, _ = fetcher.fetch()
cfgs_to_save.each { |cfg_to_save|
cfg_to_save.save() if cfg_to_save.class == config.class
}
@@ -812,7 +852,7 @@ module Cfgsync
fetcher = ConfigFetcher.new(
PCSAuth.getSuperuserAuth(), [config_new.class], nodes, cluster_name
)
- fetched_tokens = fetcher.fetch_all()[config_new.class.name]
+ fetched_tokens, _ = fetcher.fetch_all()[config_new.class.name]
config_new = Cfgsync::merge_tokens_files(
config, fetched_tokens, new_tokens, new_ports
)
diff --git a/pcsd/pcsd.8 b/pcsd/pcsd.8
index e58b7ff6..bd405043 100644
--- a/pcsd/pcsd.8
+++ b/pcsd/pcsd.8
@@ -63,9 +63,11 @@ Example:
.br
"thread_disabled": false,
.br
- "thread_interval": 60,
+ "thread_interval": 600,
.br
- "thread_paused_until": 1487780453,
+ "thread_interval_previous_not_connected": 60,
+.br
+ "thread_paused_until": 1487780453
.br
}
@@ -79,6 +81,9 @@ Set this to \fBtrue\fR to completely disable the synchronization.
.B thread_interval
How often in seconds should pcsd ask other nodes if the synchronized files have changed.
.TP
+.B thread_interval_previous_not_connected
+How often in seconds should pcsd ask other nodes if the synchronized files have changed if during the previous attempt pcsd was unable to connect to at least two nodes.
+.TP
.B thread_paused_until
Disable the synchronization until the set unix timestamp.
diff --git a/pcsd/pcsd.rb b/pcsd/pcsd.rb
index 9f9bd091..6e5e27e0 100644
--- a/pcsd/pcsd.rb
+++ b/pcsd/pcsd.rb
@@ -132,14 +132,15 @@ set :run, false
$thread_cfgsync = Thread.new {
while true
+ node_connected = true
$semaphore_cfgsync.synchronize {
- $logger.debug('Config files sync thread started')
if Cfgsync::ConfigSyncControl.sync_thread_allowed?()
+ $logger.info('Config files sync thread started')
begin
# do not sync if this host is not in a cluster
cluster_name = get_cluster_name()
cluster_nodes = get_corosync_nodes()
- if cluster_name and !cluster_name.empty?() and cluster_nodes and !cluster_nodes.empty?
+ if cluster_name and !cluster_name.empty?() and cluster_nodes and cluster_nodes.count > 1
$logger.debug('Config files sync thread fetching')
fetcher = Cfgsync::ConfigFetcher.new(
PCSAuth.getSuperuserAuth(),
@@ -147,18 +148,31 @@ $thread_cfgsync = Thread.new {
cluster_nodes,
cluster_name
)
- cfgs_to_save, _ = fetcher.fetch()
+ cfgs_to_save, _, node_connected = fetcher.fetch()
cfgs_to_save.each { |cfg_to_save|
cfg_to_save.save()
}
+ $logger.info('Config files sync thread finished')
+ else
+ $logger.info(
+ 'Config files sync skipped, this host does not seem to be in ' +
+ 'a cluster of at least 2 nodes'
+ )
end
rescue => e
$logger.warn("Config files sync thread exception: #{e}")
end
+ else
+ $logger.info('Config files sync is disabled or paused, skipping')
end
- $logger.debug('Config files sync thread finished')
}
- sleep(Cfgsync::ConfigSyncControl.sync_thread_interval())
+ if node_connected
+ sleep(Cfgsync::ConfigSyncControl.sync_thread_interval())
+ else
+ sleep(
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+ end
end
}
diff --git a/pcsd/test/test_cfgsync.rb b/pcsd/test/test_cfgsync.rb
index 9b0317ce..b49c44d2 100644
--- a/pcsd/test/test_cfgsync.rb
+++ b/pcsd/test/test_cfgsync.rb
@@ -287,8 +287,10 @@ class TestConfigSyncControll < Test::Unit::TestCase
file = File.open(CFG_SYNC_CONTROL, 'w')
file.write(JSON.pretty_generate({}))
file.close()
- @thread_interval_default = 60
- @thread_interval_minimum = 20
+ @thread_interval_default = 600
+ @thread_interval_minimum = 60
+ @thread_interval_previous_not_connected_default = 60
+ @thread_interval_previous_not_connected_minimum = 20
@file_backup_count_default = 50
@file_backup_count_minimum = 0
end
@@ -441,6 +443,65 @@ class TestConfigSyncControll < Test::Unit::TestCase
)
end
+ def test_interval_previous_not_connected()
+ assert_equal(
+ @thread_interval_previous_not_connected_default,
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+
+ interval = (
+ @thread_interval_previous_not_connected_default +
+ @thread_interval_previous_not_connected_minimum
+ )
+ assert(
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+ interval
+ )
+ )
+ assert_equal(
+ interval,
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+
+ assert(
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+ @thread_interval_previous_not_connected_minimum / 2
+ )
+ )
+ assert_equal(
+ @thread_interval_previous_not_connected_minimum,
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+
+ assert(
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(0)
+ )
+ assert_equal(
+ @thread_interval_previous_not_connected_minimum,
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+
+ assert(
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+ -100
+ )
+ )
+ assert_equal(
+ @thread_interval_previous_not_connected_minimum,
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+
+ assert(
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+ 'abcd'
+ )
+ )
+ assert_equal(
+ @thread_interval_previous_not_connected_default,
+ Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+ )
+ end
+
def test_file_backup_count()
assert_equal(
@file_backup_count_default,
@@ -495,11 +556,12 @@ class TestConfigFetcher < Test::Unit::TestCase
end
def get_configs_cluster(nodes, cluster_name)
- return @configs_cluster
+ return @configs_cluster, @node_connected
end
- def set_configs_cluster(configs)
+ def set_configs_cluster(configs, node_connected=true)
@configs_cluster = configs
+ @node_connected = node_connected
return self
end
@@ -569,31 +631,37 @@ class TestConfigFetcher < Test::Unit::TestCase
cfg_name = Cfgsync::ClusterConf.name
fetcher = ConfigFetcherMock.new({}, [Cfgsync::ClusterConf], nil, nil)
+ # unable to connect to any nodes
+ fetcher.set_configs_local({cfg_name => cfg1})
+
+ fetcher.set_configs_cluster({}, false)
+ assert_equal([[], [], false], fetcher.fetch())
+
# local config is synced
fetcher.set_configs_local({cfg_name => cfg1})
fetcher.set_configs_cluster({
'node1' => {'configs' => {cfg_name => cfg1}},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {'configs' => {cfg_name => cfg2}},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {'configs' => {cfg_name => cfg1}},
'node2' => {'configs' => {cfg_name => cfg2}},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {'configs' => {cfg_name => cfg1}},
'node2' => {'configs' => {cfg_name => cfg2}},
'node3' => {'configs' => {cfg_name => cfg2}},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
# local config is older
fetcher.set_configs_local({cfg_name => cfg1})
@@ -601,20 +669,20 @@ class TestConfigFetcher < Test::Unit::TestCase
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
})
- assert_equal([[cfg3], []], fetcher.fetch())
+ assert_equal([[cfg3], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
})
- assert_equal([[cfg4], []], fetcher.fetch())
+ assert_equal([[cfg4], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
'node3' => {cfg_name => cfg3},
})
- assert_equal([[cfg3], []], fetcher.fetch())
+ assert_equal([[cfg3], [], true], fetcher.fetch())
# local config is newer
fetcher.set_configs_local({cfg_name => cfg3})
@@ -622,13 +690,13 @@ class TestConfigFetcher < Test::Unit::TestCase
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg1},
})
- assert_equal([[], [cfg3]], fetcher.fetch())
+ assert_equal([[], [cfg3], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg1},
'node2' => {cfg_name => cfg1},
})
- assert_equal([[], [cfg3]], fetcher.fetch())
+ assert_equal([[], [cfg3], true], fetcher.fetch())
# local config is the same version
fetcher.set_configs_local({cfg_name => cfg3})
@@ -636,32 +704,32 @@ class TestConfigFetcher < Test::Unit::TestCase
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg4},
})
- assert_equal([[cfg4], []], fetcher.fetch())
+ assert_equal([[cfg4], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
})
- assert_equal([[cfg4], []], fetcher.fetch())
+ assert_equal([[cfg4], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
'node3' => {cfg_name => cfg3},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
'node3' => {cfg_name => cfg4},
})
- assert_equal([[cfg4], []], fetcher.fetch())
+ assert_equal([[cfg4], [], true], fetcher.fetch())
# local config is the same version
fetcher.set_configs_local({cfg_name => cfg4})
@@ -669,32 +737,32 @@ class TestConfigFetcher < Test::Unit::TestCase
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
})
- assert_equal([[cfg3], []], fetcher.fetch())
+ assert_equal([[cfg3], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg4},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
'node3' => {cfg_name => cfg3},
})
- assert_equal([[cfg3], []], fetcher.fetch())
+ assert_equal([[cfg3], [], true], fetcher.fetch())
fetcher.set_configs_cluster({
'node1' => {cfg_name => cfg3},
'node2' => {cfg_name => cfg4},
'node3' => {cfg_name => cfg4},
})
- assert_equal([[], []], fetcher.fetch())
+ assert_equal([[], [], true], fetcher.fetch())
end
end
--
2.17.0