Blob Blame History Raw
From 016aa2bb9553a9a64ec6645db40ef95dd8de7041 Mon Sep 17 00:00:00 2001
From: Tomas Jelinek <tojeline@redhat.com>
Date: Tue, 19 Feb 2019 17:53:17 +0100
Subject: [PATCH 3/3] lower load created by config files syncing in pcsd

* make the sync less frequent (10 minutes instead of 1 minute) by
  default
* if previous attempt for syncing was unable to connect to other nodes,
  try again sooner (in 1 minute by default)
---
 pcsd/cfgsync.rb           |  60 ++++++++++++++++----
 pcsd/pcsd.8               |   9 ++-
 pcsd/pcsd.rb              |  24 ++++++--
 pcsd/test/test_cfgsync.rb | 114 ++++++++++++++++++++++++++++++--------
 4 files changed, 167 insertions(+), 40 deletions(-)

diff --git a/pcsd/cfgsync.rb b/pcsd/cfgsync.rb
index 9acd8d0f..44e6d853 100644
--- a/pcsd/cfgsync.rb
+++ b/pcsd/cfgsync.rb
@@ -313,8 +313,11 @@ module Cfgsync
 
 
   class ConfigSyncControl
-    @thread_interval_default = 60
-    @thread_interval_minimum = 20
+    # intervals in seconds
+    @thread_interval_default = 600
+    @thread_interval_minimum = 60
+    @thread_interval_previous_not_connected_default = 60
+    @thread_interval_previous_not_connected_minimum = 20
     @file_backup_count_default = 50
     @file_backup_count_minimum = 0
 
@@ -349,6 +352,20 @@ module Cfgsync
       return self.save(data)
     end
 
+    def self.sync_thread_interval_previous_not_connected()
+      return self.get_integer_value(
+        self.load()['thread_interval_previous_not_connected'],
+        @thread_interval_previous_not_connected_default,
+        @thread_interval_previous_not_connected_minimum
+      )
+    end
+
+    def self.sync_thread_interval_previous_not_connected=(seconds)
+      data = self.load()
+      data['thread_interval_previous_not_connected'] = seconds
+      return self.save(data)
+    end
+
     def self.sync_thread_pause(semaphore_cfgsync, seconds=300)
       # wait for the thread to finish current run and disable it
       semaphore_cfgsync.synchronize {
@@ -585,14 +602,17 @@ module Cfgsync
     end
 
     def fetch_all()
-      return self.filter_configs_cluster(
-        self.get_configs_cluster(@nodes, @cluster_name),
-        @config_classes
+      node_configs, node_connected = self.get_configs_cluster(
+        @nodes, @cluster_name
       )
+      filtered_configs = self.filter_configs_cluster(
+        node_configs, @config_classes
+      )
+      return filtered_configs, node_connected
     end
 
     def fetch()
-      configs_cluster = self.fetch_all()
+      configs_cluster, node_connected = self.fetch_all()
 
       newest_configs_cluster = {}
       configs_cluster.each { |name, cfgs|
@@ -613,7 +633,7 @@ module Cfgsync
           end
         end
       }
-      return to_update_locally, to_update_in_cluster
+      return to_update_locally, to_update_in_cluster, node_connected
     end
 
     protected
@@ -630,12 +650,15 @@ module Cfgsync
       $logger.debug 'Fetching configs from the cluster'
       threads = []
       node_configs = {}
+      connected_to = {}
       nodes.each { |node|
         threads << Thread.new {
           code, out = send_request_with_token(
             @auth_user, node, 'get_configs', false, data
           )
+          connected_to[node] = false
           if 200 == code
+            connected_to[node] = true
             begin
               parsed = JSON::parse(out)
               if 'ok' == parsed['status'] and cluster_name == parsed['cluster_name']
@@ -647,7 +670,24 @@ module Cfgsync
         }
       }
       threads.each { |t| t.join }
-      return node_configs
+
+      node_connected = false
+      if connected_to.empty?()
+        node_connected = true # no nodes to connect to => no connection errors
+      else
+        connected_count = 0
+        connected_to.each { |node, connected|
+          if connected
+            connected_count += 1
+          end
+        }
+        # If we only connected to one node, consider it a fail and continue as
+        # if we could not connect anywhere. The one node is probably the local
+        # node.
+        node_connected = connected_count > 1
+      end
+
+      return node_configs, node_connected
     end
 
     def filter_configs_cluster(node_configs, wanted_configs_classes)
@@ -752,7 +792,7 @@ module Cfgsync
           fetcher = ConfigFetcher.new(
             PCSAuth.getSuperuserAuth(), [config.class], nodes, cluster_name
           )
-          cfgs_to_save, _ = fetcher.fetch()
+          cfgs_to_save, _, _ = fetcher.fetch()
           cfgs_to_save.each { |cfg_to_save|
             cfg_to_save.save() if cfg_to_save.class == config.class
           }
@@ -812,7 +852,7 @@ module Cfgsync
     fetcher = ConfigFetcher.new(
       PCSAuth.getSuperuserAuth(), [config_new.class], nodes, cluster_name
     )
-    fetched_tokens = fetcher.fetch_all()[config_new.class.name]
+    fetched_tokens, _ = fetcher.fetch_all()[config_new.class.name]
     config_new = Cfgsync::merge_tokens_files(
       config, fetched_tokens, new_tokens, new_ports
     )
diff --git a/pcsd/pcsd.8 b/pcsd/pcsd.8
index e58b7ff6..bd405043 100644
--- a/pcsd/pcsd.8
+++ b/pcsd/pcsd.8
@@ -63,9 +63,11 @@ Example:
 .br
   "thread_disabled": false,
 .br
-  "thread_interval": 60,
+  "thread_interval": 600,
 .br
-  "thread_paused_until": 1487780453,
+  "thread_interval_previous_not_connected": 60,
+.br
+  "thread_paused_until": 1487780453
 .br
 }
 
@@ -79,6 +81,9 @@ Set this to \fBtrue\fR to completely disable the synchronization.
 .B thread_interval
 How often in seconds should pcsd ask other nodes if the synchronized files have changed.
 .TP
+.B thread_interval_previous_not_connected
+How often in seconds should pcsd ask other nodes if the synchronized files have changed if during the previous attempt pcsd was unable to connect to at least two nodes.
+.TP
 .B thread_paused_until
 Disable the synchronization until the set unix timestamp.
 
diff --git a/pcsd/pcsd.rb b/pcsd/pcsd.rb
index 9f9bd091..6e5e27e0 100644
--- a/pcsd/pcsd.rb
+++ b/pcsd/pcsd.rb
@@ -132,14 +132,15 @@ set :run, false
 
 $thread_cfgsync = Thread.new {
   while true
+    node_connected = true
     $semaphore_cfgsync.synchronize {
-      $logger.debug('Config files sync thread started')
       if Cfgsync::ConfigSyncControl.sync_thread_allowed?()
+        $logger.info('Config files sync thread started')
         begin
           # do not sync if this host is not in a cluster
           cluster_name = get_cluster_name()
           cluster_nodes = get_corosync_nodes()
-          if cluster_name and !cluster_name.empty?() and cluster_nodes and !cluster_nodes.empty?
+          if cluster_name and !cluster_name.empty?() and cluster_nodes and cluster_nodes.count > 1
             $logger.debug('Config files sync thread fetching')
             fetcher = Cfgsync::ConfigFetcher.new(
               PCSAuth.getSuperuserAuth(),
@@ -147,18 +148,31 @@ $thread_cfgsync = Thread.new {
               cluster_nodes,
               cluster_name
             )
-            cfgs_to_save, _ = fetcher.fetch()
+            cfgs_to_save, _, node_connected = fetcher.fetch()
             cfgs_to_save.each { |cfg_to_save|
               cfg_to_save.save()
             }
+            $logger.info('Config files sync thread finished')
+          else
+            $logger.info(
+              'Config files sync skipped, this host does not seem to be in ' +
+              'a cluster of at least 2 nodes'
+            )
           end
         rescue => e
           $logger.warn("Config files sync thread exception: #{e}")
         end
+      else
+        $logger.info('Config files sync is disabled or paused, skipping')
       end
-      $logger.debug('Config files sync thread finished')
     }
-    sleep(Cfgsync::ConfigSyncControl.sync_thread_interval())
+    if node_connected
+      sleep(Cfgsync::ConfigSyncControl.sync_thread_interval())
+    else
+      sleep(
+        Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+      )
+    end
   end
 }
 
diff --git a/pcsd/test/test_cfgsync.rb b/pcsd/test/test_cfgsync.rb
index 9b0317ce..b49c44d2 100644
--- a/pcsd/test/test_cfgsync.rb
+++ b/pcsd/test/test_cfgsync.rb
@@ -287,8 +287,10 @@ class TestConfigSyncControll < Test::Unit::TestCase
     file = File.open(CFG_SYNC_CONTROL, 'w')
     file.write(JSON.pretty_generate({}))
     file.close()
-    @thread_interval_default = 60
-    @thread_interval_minimum = 20
+    @thread_interval_default = 600
+    @thread_interval_minimum = 60
+    @thread_interval_previous_not_connected_default = 60
+    @thread_interval_previous_not_connected_minimum = 20
     @file_backup_count_default = 50
     @file_backup_count_minimum = 0
   end
@@ -441,6 +443,65 @@ class TestConfigSyncControll < Test::Unit::TestCase
     )
   end
 
+  def test_interval_previous_not_connected()
+    assert_equal(
+      @thread_interval_previous_not_connected_default,
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+    )
+
+    interval = (
+      @thread_interval_previous_not_connected_default +
+      @thread_interval_previous_not_connected_minimum
+    )
+    assert(
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+        interval
+      )
+    )
+    assert_equal(
+      interval,
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+    )
+
+    assert(
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+        @thread_interval_previous_not_connected_minimum / 2
+      )
+    )
+    assert_equal(
+      @thread_interval_previous_not_connected_minimum,
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+    )
+
+    assert(
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(0)
+    )
+    assert_equal(
+      @thread_interval_previous_not_connected_minimum,
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+    )
+
+    assert(
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+        -100
+      )
+    )
+    assert_equal(
+      @thread_interval_previous_not_connected_minimum,
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+    )
+
+    assert(
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected=(
+        'abcd'
+      )
+    )
+    assert_equal(
+      @thread_interval_previous_not_connected_default,
+      Cfgsync::ConfigSyncControl.sync_thread_interval_previous_not_connected()
+    )
+  end
+
   def test_file_backup_count()
     assert_equal(
       @file_backup_count_default,
@@ -495,11 +556,12 @@ class TestConfigFetcher < Test::Unit::TestCase
     end
 
     def get_configs_cluster(nodes, cluster_name)
-      return @configs_cluster
+      return @configs_cluster, @node_connected
     end
 
-    def set_configs_cluster(configs)
+    def set_configs_cluster(configs, node_connected=true)
       @configs_cluster = configs
+      @node_connected = node_connected
       return self
     end
 
@@ -569,31 +631,37 @@ class TestConfigFetcher < Test::Unit::TestCase
     cfg_name = Cfgsync::ClusterConf.name
     fetcher = ConfigFetcherMock.new({}, [Cfgsync::ClusterConf], nil, nil)
 
+    # unable to connect to any nodes
+    fetcher.set_configs_local({cfg_name => cfg1})
+
+    fetcher.set_configs_cluster({}, false)
+    assert_equal([[], [], false], fetcher.fetch())
+
     # local config is synced
     fetcher.set_configs_local({cfg_name => cfg1})
 
     fetcher.set_configs_cluster({
       'node1' => {'configs' => {cfg_name => cfg1}},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {'configs' => {cfg_name => cfg2}},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {'configs' => {cfg_name => cfg1}},
       'node2' => {'configs' => {cfg_name => cfg2}},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {'configs' => {cfg_name => cfg1}},
       'node2' => {'configs' => {cfg_name => cfg2}},
       'node3' => {'configs' => {cfg_name => cfg2}},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     # local config is older
     fetcher.set_configs_local({cfg_name => cfg1})
@@ -601,20 +669,20 @@ class TestConfigFetcher < Test::Unit::TestCase
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
     })
-    assert_equal([[cfg3], []], fetcher.fetch())
+    assert_equal([[cfg3], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
     })
-    assert_equal([[cfg4], []], fetcher.fetch())
+    assert_equal([[cfg4], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
       'node3' => {cfg_name => cfg3},
     })
-    assert_equal([[cfg3], []], fetcher.fetch())
+    assert_equal([[cfg3], [], true], fetcher.fetch())
 
     # local config is newer
     fetcher.set_configs_local({cfg_name => cfg3})
@@ -622,13 +690,13 @@ class TestConfigFetcher < Test::Unit::TestCase
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg1},
     })
-    assert_equal([[], [cfg3]], fetcher.fetch())
+    assert_equal([[], [cfg3], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg1},
       'node2' => {cfg_name => cfg1},
     })
-    assert_equal([[], [cfg3]], fetcher.fetch())
+    assert_equal([[], [cfg3], true], fetcher.fetch())
 
     # local config is the same version
     fetcher.set_configs_local({cfg_name => cfg3})
@@ -636,32 +704,32 @@ class TestConfigFetcher < Test::Unit::TestCase
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg4},
     })
-    assert_equal([[cfg4], []], fetcher.fetch())
+    assert_equal([[cfg4], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
     })
-    assert_equal([[cfg4], []], fetcher.fetch())
+    assert_equal([[cfg4], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
       'node3' => {cfg_name => cfg3},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
       'node3' => {cfg_name => cfg4},
     })
-    assert_equal([[cfg4], []], fetcher.fetch())
+    assert_equal([[cfg4], [], true], fetcher.fetch())
 
     # local config is the same version
     fetcher.set_configs_local({cfg_name => cfg4})
@@ -669,32 +737,32 @@ class TestConfigFetcher < Test::Unit::TestCase
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
     })
-    assert_equal([[cfg3], []], fetcher.fetch())
+    assert_equal([[cfg3], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg4},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
       'node3' => {cfg_name => cfg3},
     })
-    assert_equal([[cfg3], []], fetcher.fetch())
+    assert_equal([[cfg3], [], true], fetcher.fetch())
 
     fetcher.set_configs_cluster({
       'node1' => {cfg_name => cfg3},
       'node2' => {cfg_name => cfg4},
       'node3' => {cfg_name => cfg4},
     })
-    assert_equal([[], []], fetcher.fetch())
+    assert_equal([[], [], true], fetcher.fetch())
   end
 end
 
-- 
2.17.0