From 18dc684896dd4c0e15434b06b53f2afa901adb8a Mon Sep 17 00:00:00 2001 From: Tomas Jelinek Date: Mon, 12 Jan 2015 15:30:33 +0100 Subject: [PATCH] stop cluster nodes in parallel --- pcs/cluster.py | 78 +++++++++++++++++++++++++++++++++++++++++++++++++--------- pcs/pcs.py | 19 +++++++++++++- pcs/utils.py | 40 ++++++++++++++++++++++++------ pcsd/pcs.rb | 4 +-- pcsd/pcsd.rb | 2 +- pcsd/remote.rb | 19 +++++++++++--- 6 files changed, 137 insertions(+), 25 deletions(-) diff --git a/pcs/cluster.py b/pcs/cluster.py index 9730e55..5e94389 100644 --- a/pcs/cluster.py +++ b/pcs/cluster.py @@ -18,6 +18,7 @@ import datetime import commands import json import xml.dom.minidom +import threading pcs_dir = os.path.dirname(os.path.realpath(__file__)) COROSYNC_CONFIG_TEMPLATE = pcs_dir + "/corosync.conf.template" @@ -125,9 +126,7 @@ def sync_start(partial_argv, nodes): for node in nodes: utils.setCorosyncConfig(node,config) print "Starting cluster on nodes: " + ", ".join(nodes) + "..." - - for node in nodes: - utils.startCluster(node) + start_cluster_nodes(nodes) def sync(partial_argv,nodes): argv = partial_argv[:] @@ -536,16 +535,29 @@ def start_cluster_all(): start_cluster_nodes(utils.getNodesFromCorosyncConf()) def start_cluster_nodes(nodes): - error_list = utils.map_for_error_list(utils.startCluster, nodes) - if len(error_list) > 0: + threads = dict() + for node in nodes: + threads[node] = NodeStartThread(node) + error_list = utils.run_node_threads(threads) + if error_list: utils.err("unable to start all nodes\n" + "\n".join(error_list)) def stop_cluster_all(): stop_cluster_nodes(utils.getNodesFromCorosyncConf()) def stop_cluster_nodes(nodes): - error_list = utils.map_for_error_list(utils.stopCluster, nodes) - if len(error_list) > 0: + threads = dict() + for node in nodes: + threads[node] = NodeStopPacemakerThread(node) + error_list = utils.run_node_threads(threads) + if error_list: + utils.err("unable to stop all nodes\n" + "\n".join(error_list)) + + threads = dict() + for node in nodes: + threads[node] = NodeStopCorosyncThread(node) + error_list = utils.run_node_threads(threads) + if error_list: utils.err("unable to stop all nodes\n" + "\n".join(error_list)) def node_standby(argv,standby=True): @@ -610,27 +622,44 @@ def disable_cluster_nodes(nodes): def destroy_cluster(argv): if len(argv) > 0: - error_list = utils.map_for_error_list(utils.destroyCluster, argv) - if len(error_list) > 0: + threads = dict() + for node in argv: + threads[node] = NodeDestroyThread(node) + error_list = utils.run_node_threads(threads) + if error_list: utils.err("unable to destroy cluster\n" + "\n".join(error_list)) - return def stop_cluster(argv): if len(argv) > 0: stop_cluster_nodes(argv) return - print "Stopping Cluster..." + stop_all = ( + "--pacemaker" not in utils.pcs_options + and + "--corosync" not in utils.pcs_options + ) + if stop_all or "--pacemaker" in utils.pcs_options: + stop_cluster_pacemaker() + if stop_all or "--corosync" in utils.pcs_options: + stop_cluster_corosync() + +def stop_cluster_pacemaker(): + print "Stopping Cluster (pacemaker)...", output, retval = utils.run(["service", "pacemaker","stop"]) if retval != 0: print output, utils.err("unable to stop pacemaker") + +def stop_cluster_corosync(): if utils.is_rhel6(): + print "Stopping Cluster (cman)...", output, retval = utils.run(["service", "cman","stop"]) if retval != 0: print output, utils.err("unable to stop cman") else: + print "Stopping Cluster (corosync)...", output, retval = utils.run(["service", "corosync","stop"]) if retval != 0: print output, @@ -1200,3 +1229,30 @@ def cluster_quorum_unblock(argv): utils.set_cib_property("startup-fencing", startup_fencing) print "Waiting for nodes cancelled" +class NodeActionThread(threading.Thread): + def __init__(self, node): + super(NodeActionThread, self).__init__() + self.node = node + self.retval = 0 + self.output = "" + +class NodeStartThread(NodeActionThread): + def run(self): + self.retval, self.output = utils.startCluster(self.node, quiet=True) + +class NodeStopPacemakerThread(NodeActionThread): + def run(self): + self.retval, self.output = utils.stopCluster( + self.node, quiet=True, pacemaker=True, corosync=False + ) + +class NodeStopCorosyncThread(NodeActionThread): + def run(self): + self.retval, self.output = utils.stopCluster( + self.node, quiet=True, pacemaker=False, corosync=True + ) + +class NodeDestroyThread(NodeActionThread): + def run(self): + self.retval, self.output = utils.destroyCluster(self.node, quiet=True) + diff --git a/pcs/pcs.py b/pcs/pcs.py index b2c3f4b..a0c0df5 100755 --- a/pcs/pcs.py +++ b/pcs/pcs.py @@ -54,7 +54,24 @@ def main(argv): pcs_short_options_with_args.append(prev_char) prev_char = c - pcs_long_options = ["local","start","all","clone","master","force","corosync_conf=", "defaults","debug","version","help","fullhelp","off","from=","to=", "name=", "wait", "group=","groups","full","enable","node=","nodesc","transport=", "addr0=","addr1=","bcast0=","bcast1=","mcast0=","mcast1=","mcastport0=","mcastport1=","ttl0=","ttl1=","rrpmode=", "broadcast0", "broadcast1","wait_for_all=","auto_tie_breaker=","last_man_standing=", "last_man_standing_window=","no-default-ops","ipv6","token=", "token_coefficient=", "consensus=", "miss_count_const=", "fail_recv_const=","join=", "disabled", "after=", "before=", "autocorrect", "interactive", "autodelete"] + pcs_long_options = [ + "debug", "version", "help", "fullhelp", + "force", "autocorrect", "interactive", "autodelete", + "all", "full", "groups", "local", "wait", "config", + "start", "enable", "disabled", "off", + "pacemaker", "corosync", + "no-default-ops", "defaults", "nodesc", + "clone", "master", "name=", "group=", "node=", + "from=", "to=", "after=", "before=", + "transport=", "rrpmode=", "ipv6", + "addr0=", "bcast0=", "mcast0=", "mcastport0=", "ttl0=", "broadcast0", + "addr1=", "bcast1=", "mcast1=", "mcastport1=", "ttl1=", "broadcast1", + "wait_for_all=", "auto_tie_breaker=", "last_man_standing=", + "last_man_standing_window=", + "token=", "token_coefficient=", "consensus=", "join=", + "miss_count_const=", "fail_recv_const=", + "corosync_conf=", "cluster_conf=", + ] # pull out negative number arguments and add them back after getopt prev_arg = "" for arg in argv: diff --git a/pcs/utils.py b/pcs/utils.py index 724519a..1f41ae0 100644 --- a/pcs/utils.py +++ b/pcs/utils.py @@ -208,11 +208,19 @@ def setCorosyncConfig(node,config): if status != 0: err("Unable to set corosync config") -def startCluster(node): - return sendHTTPRequest(node, 'remote/cluster_start', None, False, True) - -def stopCluster(node): - return sendHTTPRequest(node, 'remote/cluster_stop', None, False, True) +def startCluster(node, quiet=False): + return sendHTTPRequest(node, 'remote/cluster_start', None, False, not quiet) + +def stopCluster(node, quiet=False, pacemaker=True, corosync=True): + if (pacemaker and corosync) or (not pacemaker and not corosync): + data = None + elif pacemaker: + data = {"component": "pacemaker"} + elif corosync: + data = {"component": "corosync"} + if data: + data = urllib.urlencode(data) + return sendHTTPRequest(node, 'remote/cluster_stop', data, False, not quiet) def enableCluster(node): return sendHTTPRequest(node, 'remote/cluster_enable', None, False, True) @@ -220,8 +228,8 @@ def enableCluster(node): def disableCluster(node): return sendHTTPRequest(node, 'remote/cluster_disable', None, False, True) -def destroyCluster(node): - return sendHTTPRequest(node, 'remote/cluster_destroy') +def destroyCluster(node, quiet=False): + return sendHTTPRequest(node, 'remote/cluster_destroy', None, not quiet, not quiet) def restoreConfig(node, tarball_data): data = urllib.urlencode({"tarball": tarball_data}) @@ -730,6 +738,24 @@ def map_for_error_list(callab, iterab): error_list.append(err) return error_list +def run_node_threads(node_threads): + error_list = [] + for node, thread in node_threads.items(): + thread.daemon = True + thread.start() + while node_threads: + for node in node_threads.keys(): + thread = node_threads[node] + thread.join(1) + if thread.is_alive(): + continue + output = node + ": " + thread.output.strip() + print output + if thread.retval != 0: + error_list.append(output) + del node_threads[node] + return error_list + # Check is something exists in the CIB, if it does return it, if not, return # an empty string def does_exist(xpath_query): diff --git a/pcsd/pcs.rb b/pcsd/pcs.rb index a1acfdc..3fad833 100644 --- a/pcsd/pcs.rb +++ b/pcsd/pcs.rb @@ -283,7 +283,7 @@ def send_cluster_request_with_token(cluster_name, request, post=false, data={}, return code,out end -def send_request_with_token(node,request, post=false, data={}, remote=true, raw_data = nil) +def send_request_with_token(node, request, post=false, data={}, remote=true, raw_data=nil, timeout=30) start = Time.now begin retval, token = get_node_token(node) @@ -312,7 +312,7 @@ def send_request_with_token(node,request, post=false, data={}, remote=true, raw_ myhttp.use_ssl = true myhttp.verify_mode = OpenSSL::SSL::VERIFY_NONE res = myhttp.start do |http| - http.read_timeout = 30 + http.read_timeout = timeout http.request(req) end return res.code.to_i, res.body diff --git a/pcsd/pcsd.rb b/pcsd/pcsd.rb index c653ae2..94fdae2 100644 --- a/pcsd/pcsd.rb +++ b/pcsd/pcsd.rb @@ -428,7 +428,7 @@ if not DISABLE_GUI } $logger.info("Sending setup cluster request for: " + @cluster_name + " to: " + @nodes[0]) - code,out = send_request_with_token(@nodes[0], "setup_cluster", true, {:clustername => @cluster_name, :nodes => @nodes_rrp.join(';'), :options => options.to_json}) + code,out = send_request_with_token(@nodes[0], "setup_cluster", true, {:clustername => @cluster_name, :nodes => @nodes_rrp.join(';'), :options => options.to_json}, true, nil, 60) if code == 200 pcs_config.clusters << Cluster.new(@cluster_name, @nodes) diff --git a/pcsd/remote.rb b/pcsd/remote.rb index 9709941..2245d47 100644 --- a/pcsd/remote.rb +++ b/pcsd/remote.rb @@ -151,10 +151,23 @@ end def cluster_stop(params) if params[:name] - code, response = send_request_with_token(params[:name], 'cluster_stop', true) + params_without_name = params.reject {|key, value| + key == "name" or key == :name + } + code, response = send_request_with_token( + params[:name], 'cluster_stop', true, params_without_name + ) else - $logger.info "Starting Daemons" - output = `#{PCS} cluster stop` + options = "" + if params.has_key?("component") + if params["component"].downcase == "pacemaker" + options = "--pacemaker" + elsif params["component"].downcase == "corosync" + options = "--corosync" + end + end + $logger.info "Stopping Daemons #{options}" + output = `#{PCS} cluster stop #{options}` $logger.debug output return output end -- 1.9.1