From 18dc684896dd4c0e15434b06b53f2afa901adb8a Mon Sep 17 00:00:00 2001
From: Tomas Jelinek <tojeline@redhat.com>
Date: Mon, 12 Jan 2015 15:30:33 +0100
Subject: [PATCH] stop cluster nodes in parallel

---
 pcs/cluster.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++++---------
 pcs/pcs.py     | 19 +++++++++++++-
 pcs/utils.py   | 40 ++++++++++++++++++++++++------
 pcsd/pcs.rb    |  4 +--
 pcsd/pcsd.rb   |  2 +-
 pcsd/remote.rb | 19 +++++++++++---
 6 files changed, 137 insertions(+), 25 deletions(-)
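
The core of this patch is a thread-per-node fan-out: pcs/cluster.py gains small
threading.Thread subclasses (NodeStartThread, NodeStopPacemakerThread,
NodeStopCorosyncThread, NodeDestroyThread) that each record (retval, output)
for one node, and utils.run_node_threads() starts them all and reaps them with
a short join() timeout so each node's result is printed shortly after that node
finishes. A minimal standalone sketch of the same pattern, using hypothetical
names (NodeWorker, run_workers) rather than the real pcs helpers:

    import threading

    class NodeWorker(threading.Thread):          # mirrors NodeActionThread
        def __init__(self, node, action):
            super(NodeWorker, self).__init__()
            self.node = node
            self.action = action                 # callable: node -> (retval, output)
            self.retval = 0
            self.output = ""

        def run(self):
            self.retval, self.output = self.action(self.node)

    def run_workers(workers):                    # mirrors utils.run_node_threads
        errors = []
        for worker in workers.values():
            worker.daemon = True
            worker.start()
        while workers:
            for node in list(workers.keys()):    # copy: entries are deleted below
                worker = workers[node]
                worker.join(1)                   # short poll, keeps output prompt
                if worker.is_alive():
                    continue
                line = node + ": " + worker.output.strip()
                print(line)
                if worker.retval != 0:
                    errors.append(line)
                del workers[node]
        return errors

The 1-second join() keeps the reaping loop responsive without busy-waiting, and
marking the workers as daemon threads means an interrupted pcs run does not hang
waiting on remote nodes.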

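The stop path is also split into two cluster-wide phases: stop_cluster_nodes()
now stops pacemaker on every node before any corosync (or cman on RHEL 6)
instance goes down, driven by the new --pacemaker/--corosync options and the
"component" field that the cluster_stop handler in pcsd/remote.rb translates
back into those flags. Stopping pacemaker everywhere first presumably avoids
taking quorum away from nodes whose pacemaker is still running. A compact
sketch of that ordering, with stop_component standing in as a hypothetical
placeholder for the per-node remote/cluster_stop request that
utils.stopCluster() sends:

    def stop_nodes_two_phase(nodes, stop_component):
        # stop_component(node, component) -> (retval, output); hypothetical
        # stand-in for the per-node HTTP request utils.stopCluster() makes.
        for component in ("pacemaker", "corosync"):  # pacemaker first, everywhere
            errors = []
            for node in nodes:                       # pcs runs this loop threaded
                retval, output = stop_component(node, component)
                print(node + ": " + output.strip())
                if retval != 0:
                    errors.append(node)
            if errors:                               # abort before the next phase
                raise RuntimeError(
                    "unable to stop all nodes: " + ", ".join(errors)
                )
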
diff --git a/pcs/cluster.py b/pcs/cluster.py
index 9730e55..5e94389 100644
--- a/pcs/cluster.py
+++ b/pcs/cluster.py
@@ -18,6 +18,7 @@ import datetime
 import commands
 import json
 import xml.dom.minidom
+import threading
 
 pcs_dir = os.path.dirname(os.path.realpath(__file__))
 COROSYNC_CONFIG_TEMPLATE = pcs_dir + "/corosync.conf.template"
@@ -125,9 +126,7 @@ def sync_start(partial_argv, nodes):
     for node in nodes:
         utils.setCorosyncConfig(node,config)
     print "Starting cluster on nodes: " + ", ".join(nodes) + "..."
-
-    for node in nodes:
-        utils.startCluster(node)
+    start_cluster_nodes(nodes)
 
 def sync(partial_argv,nodes):
     argv = partial_argv[:]
@@ -536,16 +535,29 @@ def start_cluster_all():
     start_cluster_nodes(utils.getNodesFromCorosyncConf())
 
 def start_cluster_nodes(nodes):
-    error_list = utils.map_for_error_list(utils.startCluster, nodes)
-    if len(error_list) > 0:
+    threads = dict()
+    for node in nodes:
+        threads[node] = NodeStartThread(node)
+    error_list = utils.run_node_threads(threads)
+    if error_list:
         utils.err("unable to start all nodes\n" + "\n".join(error_list))
 
 def stop_cluster_all():
     stop_cluster_nodes(utils.getNodesFromCorosyncConf())
 
 def stop_cluster_nodes(nodes):
-    error_list = utils.map_for_error_list(utils.stopCluster, nodes)
-    if len(error_list) > 0:
+    threads = dict()
+    for node in nodes:
+        threads[node] = NodeStopPacemakerThread(node)
+    error_list = utils.run_node_threads(threads)
+    if error_list:
+        utils.err("unable to stop all nodes\n" + "\n".join(error_list))
+
+    threads = dict()
+    for node in nodes:
+        threads[node] = NodeStopCorosyncThread(node)
+    error_list = utils.run_node_threads(threads)
+    if error_list:
         utils.err("unable to stop all nodes\n" + "\n".join(error_list))
 
 def node_standby(argv,standby=True):
@@ -610,27 +622,44 @@ def disable_cluster_nodes(nodes):
 
 def destroy_cluster(argv):
     if len(argv) > 0:
-        error_list = utils.map_for_error_list(utils.destroyCluster, argv)
-        if len(error_list) > 0:
+        threads = dict()
+        for node in argv:
+            threads[node] = NodeDestroyThread(node)
+        error_list = utils.run_node_threads(threads)
+        if error_list:
             utils.err("unable to destroy cluster\n" + "\n".join(error_list))
-        return
 
 def stop_cluster(argv):
     if len(argv) > 0:
         stop_cluster_nodes(argv)
         return
 
-    print "Stopping Cluster..."
+    stop_all = (
+        "--pacemaker" not in utils.pcs_options
+        and
+        "--corosync" not in utils.pcs_options
+    )
+    if stop_all or "--pacemaker" in utils.pcs_options:
+        stop_cluster_pacemaker()
+    if stop_all or "--corosync" in utils.pcs_options:
+        stop_cluster_corosync()
+
+def stop_cluster_pacemaker():
+    print "Stopping Cluster (pacemaker)...",
     output, retval = utils.run(["service", "pacemaker","stop"])
     if retval != 0:
         print output,
         utils.err("unable to stop pacemaker")
+
+def stop_cluster_corosync():
     if utils.is_rhel6():
+        print "Stopping Cluster (cman)...",
         output, retval = utils.run(["service", "cman","stop"])
         if retval != 0:
             print output,
             utils.err("unable to stop cman")
     else:
+        print "Stopping Cluster (corosync)...",
         output, retval = utils.run(["service", "corosync","stop"])
         if retval != 0:
             print output,
@@ -1200,3 +1229,30 @@ def cluster_quorum_unblock(argv):
     utils.set_cib_property("startup-fencing", startup_fencing)
     print "Waiting for nodes cancelled"
 
+class NodeActionThread(threading.Thread):
+    def __init__(self, node):
+        super(NodeActionThread, self).__init__()
+        self.node = node
+        self.retval = 0
+        self.output = ""
+
+class NodeStartThread(NodeActionThread):
+    def run(self):
+        self.retval, self.output = utils.startCluster(self.node, quiet=True)
+
+class NodeStopPacemakerThread(NodeActionThread):
+    def run(self):
+        self.retval, self.output = utils.stopCluster(
+            self.node, quiet=True, pacemaker=True, corosync=False
+        )
+
+class NodeStopCorosyncThread(NodeActionThread):
+    def run(self):
+        self.retval, self.output = utils.stopCluster(
+            self.node, quiet=True, pacemaker=False, corosync=True
+        )
+
+class NodeDestroyThread(NodeActionThread):
+    def run(self):
+        self.retval, self.output = utils.destroyCluster(self.node, quiet=True)
+
diff --git a/pcs/pcs.py b/pcs/pcs.py
index b2c3f4b..a0c0df5 100755
--- a/pcs/pcs.py
+++ b/pcs/pcs.py
@@ -54,7 +54,24 @@ def main(argv):
                 pcs_short_options_with_args.append(prev_char)
             prev_char = c
 
-        pcs_long_options = ["local","start","all","clone","master","force","corosync_conf=", "defaults","debug","version","help","fullhelp","off","from=","to=", "name=", "wait", "group=","groups","full","enable","node=","nodesc","transport=", "addr0=","addr1=","bcast0=","bcast1=","mcast0=","mcast1=","mcastport0=","mcastport1=","ttl0=","ttl1=","rrpmode=", "broadcast0", "broadcast1","wait_for_all=","auto_tie_breaker=","last_man_standing=", "last_man_standing_window=","no-default-ops","ipv6","token=", "token_coefficient=", "consensus=", "miss_count_const=", "fail_recv_const=","join=", "disabled", "after=", "before=", "autocorrect", "interactive", "autodelete"]
+        pcs_long_options = [
+            "debug", "version", "help", "fullhelp",
+            "force", "autocorrect", "interactive", "autodelete",
+            "all", "full", "groups", "local", "wait", "config",
+            "start", "enable", "disabled", "off",
+            "pacemaker", "corosync",
+            "no-default-ops", "defaults", "nodesc",
+            "clone", "master", "name=", "group=", "node=",
+            "from=", "to=", "after=", "before=",
+            "transport=", "rrpmode=", "ipv6",
+            "addr0=", "bcast0=", "mcast0=", "mcastport0=", "ttl0=", "broadcast0",
+            "addr1=", "bcast1=", "mcast1=", "mcastport1=", "ttl1=", "broadcast1",
+            "wait_for_all=", "auto_tie_breaker=", "last_man_standing=",
+            "last_man_standing_window=",
+            "token=", "token_coefficient=", "consensus=", "join=",
+            "miss_count_const=", "fail_recv_const=",
+            "corosync_conf=", "cluster_conf=",
+        ]
         # pull out negative number arguments and add them back after getopt
         prev_arg = ""
         for arg in argv:
diff --git a/pcs/utils.py b/pcs/utils.py
index 724519a..1f41ae0 100644
--- a/pcs/utils.py
+++ b/pcs/utils.py
@@ -208,11 +208,19 @@ def setCorosyncConfig(node,config):
         if status != 0:
             err("Unable to set corosync config")
 
-def startCluster(node):
-    return sendHTTPRequest(node, 'remote/cluster_start', None, False, True)
-
-def stopCluster(node):
-    return sendHTTPRequest(node, 'remote/cluster_stop', None, False, True)
+def startCluster(node, quiet=False):
+    return sendHTTPRequest(node, 'remote/cluster_start', None, False, not quiet)
+
+def stopCluster(node, quiet=False, pacemaker=True, corosync=True):
+    if (pacemaker and corosync) or (not pacemaker and not corosync):
+        data = None
+    elif pacemaker:
+        data = {"component": "pacemaker"}
+    elif corosync:
+        data = {"component": "corosync"}
+    if data:
+        data = urllib.urlencode(data)
+    return sendHTTPRequest(node, 'remote/cluster_stop', data, False, not quiet)
 
 def enableCluster(node):
     return sendHTTPRequest(node, 'remote/cluster_enable', None, False, True)
@@ -220,8 +228,8 @@ def enableCluster(node):
 def disableCluster(node):
     return sendHTTPRequest(node, 'remote/cluster_disable', None, False, True)
 
-def destroyCluster(node):
-    return sendHTTPRequest(node, 'remote/cluster_destroy')
+def destroyCluster(node, quiet=False):
+    return sendHTTPRequest(node, 'remote/cluster_destroy', None, not quiet, not quiet)
 
 def restoreConfig(node, tarball_data):
     data = urllib.urlencode({"tarball": tarball_data})
@@ -730,6 +738,24 @@ def map_for_error_list(callab, iterab):
             error_list.append(err)
     return error_list
 
+def run_node_threads(node_threads):
+    error_list = []
+    for node, thread in node_threads.items():
+        thread.daemon = True
+        thread.start()
+    while node_threads:
+        for node in node_threads.keys():
+            thread = node_threads[node]
+            thread.join(1)
+            if thread.is_alive():
+                continue
+            output = node + ": " + thread.output.strip()
+            print output
+            if thread.retval != 0:
+                error_list.append(output)
+            del node_threads[node]
+    return error_list
+
 # Check is something exists in the CIB, if it does return it, if not, return
 #  an empty string
 def does_exist(xpath_query):
diff --git a/pcsd/pcs.rb b/pcsd/pcs.rb
index a1acfdc..3fad833 100644
--- a/pcsd/pcs.rb
+++ b/pcsd/pcs.rb
@@ -283,7 +283,7 @@ def send_cluster_request_with_token(cluster_name, request, post=false, data={},
   return code,out
 end
 
-def send_request_with_token(node,request, post=false, data={}, remote=true, raw_data = nil)
+def send_request_with_token(node, request, post=false, data={}, remote=true, raw_data=nil, timeout=30)
   start = Time.now
   begin
     retval, token = get_node_token(node)
@@ -312,7 +312,7 @@ def send_request_with_token(node,request, post=false, data={}, remote=true, raw_
     myhttp.use_ssl = true
     myhttp.verify_mode = OpenSSL::SSL::VERIFY_NONE
     res = myhttp.start do |http|
-      http.read_timeout = 30 
+      http.read_timeout = timeout
       http.request(req)
     end
     return res.code.to_i, res.body
diff --git a/pcsd/pcsd.rb b/pcsd/pcsd.rb
index c653ae2..94fdae2 100644
--- a/pcsd/pcsd.rb
+++ b/pcsd/pcsd.rb
@@ -428,7 +428,7 @@ if not DISABLE_GUI
     }
 
     $logger.info("Sending setup cluster request for: " + @cluster_name + " to: " + @nodes[0])
-    code,out = send_request_with_token(@nodes[0], "setup_cluster", true, {:clustername => @cluster_name, :nodes => @nodes_rrp.join(';'), :options => options.to_json})
+    code,out = send_request_with_token(@nodes[0], "setup_cluster", true, {:clustername => @cluster_name, :nodes => @nodes_rrp.join(';'), :options => options.to_json}, true, nil, 60)
 
     if code == 200
       pcs_config.clusters << Cluster.new(@cluster_name, @nodes)
diff --git a/pcsd/remote.rb b/pcsd/remote.rb
index 9709941..2245d47 100644
--- a/pcsd/remote.rb
+++ b/pcsd/remote.rb
@@ -151,10 +151,23 @@ end
 
 def cluster_stop(params)
   if params[:name]
-    code, response = send_request_with_token(params[:name], 'cluster_stop', true)
+    params_without_name = params.reject {|key, value|
+      key == "name" or key == :name
+    }
+    code, response = send_request_with_token(
+      params[:name], 'cluster_stop', true, params_without_name
+    )
   else
-    $logger.info "Starting Daemons"
-    output =  `#{PCS} cluster stop`
+    options = ""
+    if params.has_key?("component")
+      if params["component"].downcase == "pacemaker"
+        options = "--pacemaker"
+      elsif params["component"].downcase == "corosync"
+        options = "--corosync"
+      end
+    end
+    $logger.info "Stopping Daemons #{options}"
+    output =  `#{PCS} cluster stop #{options}`
     $logger.debug output
     return output
   end
-- 
1.9.1