From db8643c4489274faee0bba008846a63c2ab63f46 Mon Sep 17 00:00:00 2001 From: Tomas Jelinek Date: Wed, 15 Jun 2016 14:52:39 +0200 Subject: [PATCH] bz1158805-01-add support for qdevice-qnetd provided by corosync --- pcs/cli/common/lib_wrapper.py | 10 + pcs/cluster.py | 119 +- pcs/common/report_codes.py | 31 +- pcs/lib/commands/qdevice.py | 88 +- pcs/lib/commands/quorum.py | 217 +- pcs/lib/corosync/config_facade.py | 98 +- pcs/lib/corosync/live.py | 15 + pcs/lib/corosync/qdevice_client.py | 93 + pcs/lib/corosync/qdevice_net.py | 314 ++- pcs/lib/env.py | 11 +- pcs/lib/errors.py | 6 +- pcs/lib/external.py | 44 +- pcs/lib/nodes_task.py | 69 +- pcs/lib/reports.py | 225 +- pcs/pcs.8 | 27 +- pcs/qdevice.py | 71 + pcs/quorum.py | 34 +- pcs/settings_default.py | 6 +- pcs/test/resources/qdevice-certs/qnetd-cacert.crt | 1 + pcs/test/test_lib_commands_qdevice.py | 255 ++ pcs/test/test_lib_commands_quorum.py | 1109 ++++++++- pcs/test/test_lib_corosync_config_facade.py | 367 ++- pcs/test/test_lib_corosync_live.py | 62 +- pcs/test/test_lib_corosync_qdevice_client.py | 60 + pcs/test/test_lib_corosync_qdevice_net.py | 965 +++++++- pcs/test/test_lib_env.py | 142 +- pcs/test/test_lib_external.py | 126 +- pcs/test/test_lib_nodes_task.py | 168 +- pcs/test/test_quorum.py | 9 +- pcs/test/test_utils.py | 2628 +++++++++++---------- pcs/usage.py | 53 +- pcs/utils.py | 147 +- pcsd/pcs.rb | 17 + pcsd/remote.rb | 163 +- pcsd/settings.rb | 6 + pcsd/settings.rb.debian | 10 +- 36 files changed, 6170 insertions(+), 1596 deletions(-) create mode 100644 pcs/lib/corosync/qdevice_client.py create mode 100644 pcs/test/resources/qdevice-certs/qnetd-cacert.crt create mode 100644 pcs/test/test_lib_corosync_qdevice_client.py diff --git a/pcs/cli/common/lib_wrapper.py b/pcs/cli/common/lib_wrapper.py index 2ba5602..2dd5810 100644 --- a/pcs/cli/common/lib_wrapper.py +++ b/pcs/cli/common/lib_wrapper.py @@ -117,6 +117,8 @@ def load_module(env, middleware_factory, name): "get_config": quorum.get_config, 
"remove_device": quorum.remove_device, "set_options": quorum.set_options, + "status": quorum.status_text, + "status_device": quorum.status_device_text, "update_device": quorum.update_device, } ) @@ -125,6 +127,7 @@ def load_module(env, middleware_factory, name): env, middleware.build(), { + "status": qdevice.qdevice_status_text, "setup": qdevice.qdevice_setup, "destroy": qdevice.qdevice_destroy, "start": qdevice.qdevice_start, @@ -132,6 +135,13 @@ def load_module(env, middleware_factory, name): "kill": qdevice.qdevice_kill, "enable": qdevice.qdevice_enable, "disable": qdevice.qdevice_disable, + # following commands are internal use only, called from pcsd + "client_net_setup": qdevice.client_net_setup, + "client_net_import_certificate": + qdevice.client_net_import_certificate, + "client_net_destroy": qdevice.client_net_destroy, + "sign_net_cert_request": + qdevice.qdevice_net_sign_certificate_request, } ) if name == "sbd": diff --git a/pcs/cluster.py b/pcs/cluster.py index 002b5c5..988ab75 100644 --- a/pcs/cluster.py +++ b/pcs/cluster.py @@ -36,23 +36,29 @@ from pcs import ( ) from pcs.utils import parallel_for_nodes from pcs.common import report_codes +from pcs.cli.common.reports import process_library_reports from pcs.lib import ( pacemaker as lib_pacemaker, sbd as lib_sbd, reports as lib_reports, ) -from pcs.lib.tools import environment_file_to_dict +from pcs.lib.commands.quorum import _add_device_model_net +from pcs.lib.corosync import ( + config_parser as corosync_conf_utils, + qdevice_net, +) +from pcs.lib.corosync.config_facade import ConfigFacade as corosync_conf_facade +from pcs.lib.errors import ( + LibraryError, + ReportItemSeverity, +) from pcs.lib.external import ( disable_service, NodeCommunicationException, node_communicator_exception_to_report_item, ) from pcs.lib.node import NodeAddresses -from pcs.lib.errors import ( - LibraryError, - ReportItemSeverity, -) -from pcs.lib.corosync import config_parser as corosync_conf_utils +from pcs.lib.tools 
import environment_file_to_dict def cluster_cmd(argv): if len(argv) == 0: @@ -288,7 +294,7 @@ def cluster_setup(argv): ) if udpu_rrp and "rrp_mode" not in options["transport_options"]: options["transport_options"]["rrp_mode"] = "passive" - utils.process_library_reports(messages) + process_library_reports(messages) # prepare config file if is_rhel6: @@ -306,7 +312,7 @@ def cluster_setup(argv): options["totem_options"], options["quorum_options"] ) - utils.process_library_reports(messages) + process_library_reports(messages) # setup on the local node if "--local" in utils.pcs_options: @@ -870,6 +876,7 @@ def start_cluster(argv): return print("Starting Cluster...") + service_list = [] if utils.is_rhel6(): # Verify that CMAN_QUORUM_TIMEOUT is set, if not, then we set it to 0 retval, output = getstatusoutput('source /etc/sysconfig/cman ; [ -z "$CMAN_QUORUM_TIMEOUT" ]') @@ -882,14 +889,15 @@ def start_cluster(argv): print(output) utils.err("unable to start cman") else: - output, retval = utils.run(["service", "corosync","start"]) + service_list.append("corosync") + if utils.need_to_handle_qdevice_service(): + service_list.append("corosync-qdevice") + service_list.append("pacemaker") + for service in service_list: + output, retval = utils.run(["service", service, "start"]) if retval != 0: print(output) - utils.err("unable to start corosync") - output, retval = utils.run(["service", "pacemaker", "start"]) - if retval != 0: - print(output) - utils.err("unable to start pacemaker") + utils.err("unable to start {0}".format(service)) if wait: wait_for_nodes_started([], wait_timeout) @@ -1035,14 +1043,20 @@ def enable_cluster(argv): enable_cluster_nodes(argv) return - utils.enableServices() + try: + utils.enableServices() + except LibraryError as e: + process_library_reports(e.args) def disable_cluster(argv): if len(argv) > 0: disable_cluster_nodes(argv) return - utils.disableServices() + try: + utils.disableServices() + except LibraryError as e: + process_library_reports(e.args) 
def enable_cluster_all(): enable_cluster_nodes(utils.getNodesFromCorosyncConf()) @@ -1132,13 +1146,18 @@ def stop_cluster_corosync(): utils.err("unable to stop cman") else: print("Stopping Cluster (corosync)...") - output, retval = utils.run(["service", "corosync","stop"]) - if retval != 0: - print(output) - utils.err("unable to stop corosync") + service_list = [] + if utils.need_to_handle_qdevice_service(): + service_list.append("corosync-qdevice") + service_list.append("corosync") + for service in service_list: + output, retval = utils.run(["service", service, "stop"]) + if retval != 0: + print(output) + utils.err("unable to stop {0}".format(service)) def kill_cluster(argv): - daemons = ["crmd", "pengine", "attrd", "lrmd", "stonithd", "cib", "pacemakerd", "corosync"] + daemons = ["crmd", "pengine", "attrd", "lrmd", "stonithd", "cib", "pacemakerd", "corosync-qdevice", "corosync"] dummy_output, dummy_retval = utils.run(["killall", "-9"] + daemons) # if dummy_retval != 0: # print "Error: unable to execute killall -9" @@ -1321,19 +1340,16 @@ def cluster_node(argv): "cluster is not configured for RRP, " "you must not specify ring 1 address for the node" ) - utils.check_qdevice_algorithm_and_running_cluster( - utils.getCorosyncConf(), add=True - ) corosync_conf = None (canAdd, error) = utils.canAddNodeToCluster(node0) if not canAdd: utils.err("Unable to add '%s' to cluster: %s" % (node0, error)) + lib_env = utils.get_lib_env() + report_processor = lib_env.report_processor + node_communicator = lib_env.node_communicator() + node_addr = NodeAddresses(node0, node1) try: - node_addr = NodeAddresses(node0, node1) - lib_env = utils.get_lib_env() - report_processor = lib_env.report_processor - node_communicator = lib_env.node_communicator() if lib_sbd.is_sbd_enabled(utils.cmd_runner()): if "--watchdog" not in utils.pcs_options: watchdog = settings.sbd_watchdog_default @@ -1367,9 +1383,9 @@ def cluster_node(argv): report_processor, node_communicator, node_addr ) except 
LibraryError as e: - utils.process_library_reports(e.args) + process_library_reports(e.args) except NodeCommunicationException as e: - utils.process_library_reports( + process_library_reports( [node_communicator_exception_to_report_item(e)] ) @@ -1383,6 +1399,8 @@ def cluster_node(argv): else: print("%s: Corosync updated" % my_node) corosync_conf = output + # corosync.conf must be reloaded before the new node is started + output, retval = utils.reloadCorosync() if corosync_conf != None: # send local cluster pcsd configs to the new node # may be used for sending corosync config as well in future @@ -1406,6 +1424,25 @@ def cluster_node(argv): except: utils.err('Unable to communicate with pcsd') + # set qdevice-net certificates if needed + if not utils.is_rhel6(): + try: + conf_facade = corosync_conf_facade.from_string( + corosync_conf + ) + qdevice_model, qdevice_model_options, _ = conf_facade.get_quorum_device_settings() + if qdevice_model == "net": + _add_device_model_net( + lib_env, + qdevice_model_options["host"], + conf_facade.get_cluster_name(), + [node_addr], + skip_offline_nodes=False + ) + except LibraryError as e: + process_library_reports(e.args) + + print("Setting up corosync...") utils.setCorosyncConfig(node0, corosync_conf) if "--enable" in utils.pcs_options: retval, err = utils.enableCluster(node0) @@ -1421,7 +1458,6 @@ def cluster_node(argv): pcsd.pcsd_sync_certs([node0], exit_after_error=False) else: utils.err("Unable to update any nodes") - output, retval = utils.reloadCorosync() if utils.is_cman_with_udpu_transport(): print("Warning: Using udpu transport on a CMAN cluster, " + "cluster restart is required to apply node addition") @@ -1433,9 +1469,6 @@ def cluster_node(argv): utils.err( "node '%s' does not appear to exist in configuration" % node0 ) - utils.check_qdevice_algorithm_and_running_cluster( - utils.getCorosyncConf(), add=False - ) if "--force" not in utils.pcs_options: retval, data = utils.get_remote_quorumtool_output(node0) if retval != 
0: @@ -1697,10 +1730,18 @@ def cluster_destroy(argv): else: print("Shutting down pacemaker/corosync services...") os.system("service pacemaker stop") + # returns error if qdevice is not running, it is safe to ignore it + # since we want it not to be running + os.system("service corosync-qdevice stop") os.system("service corosync stop") print("Killing any remaining services...") - os.system("killall -q -9 corosync aisexec heartbeat pacemakerd ccm stonithd ha_logd lrmd crmd pengine attrd pingd mgmtd cib fenced dlm_controld gfs_controld") - utils.disableServices() + os.system("killall -q -9 corosync corosync-qdevice aisexec heartbeat pacemakerd ccm stonithd ha_logd lrmd crmd pengine attrd pingd mgmtd cib fenced dlm_controld gfs_controld") + try: + utils.disableServices() + except: + # previously errors were suppressed in here, let's keep it that way + # for now + pass try: disable_service(utils.cmd_runner(), "sbd") except: @@ -1716,6 +1757,12 @@ def cluster_destroy(argv): "pe*.bz2","cib.*"] for name in state_files: os.system("find /var/lib -name '"+name+"' -exec rm -f \{\} \;") + try: + qdevice_net.client_destroy() + except: + # errors from deleting other files are suppressed as well + # we do not want to fail if qdevice was not set up + pass def cluster_verify(argv): nofilename = True diff --git a/pcs/common/report_codes.py b/pcs/common/report_codes.py index bda982a..afe0554 100644 --- a/pcs/common/report_codes.py +++ b/pcs/common/report_codes.py @@ -45,6 +45,8 @@ COROSYNC_CONFIG_RELOAD_ERROR = "COROSYNC_CONFIG_RELOAD_ERROR" COROSYNC_NOT_RUNNING_CHECK_STARTED = "COROSYNC_NOT_RUNNING_CHECK_STARTED" COROSYNC_NOT_RUNNING_CHECK_NODE_ERROR = "COROSYNC_NOT_RUNNING_CHECK_NODE_ERROR" COROSYNC_NOT_RUNNING_ON_NODE = "COROSYNC_NOT_RUNNING_ON_NODE" +COROSYNC_OPTIONS_INCOMPATIBLE_WITH_QDEVICE = "COROSYNC_OPTIONS_INCOMPATIBLE_WITH_QDEVICE" +COROSYNC_QUORUM_GET_STATUS_ERROR = "COROSYNC_QUORUM_GET_STATUS_ERROR" COROSYNC_RUNNING_ON_NODE = "COROSYNC_RUNNING_ON_NODE" CRM_MON_ERROR = 
"CRM_MON_ERROR" DUPLICATE_CONSTRAINTS_EXIST = "DUPLICATE_CONSTRAINTS_EXIST" @@ -62,11 +64,11 @@ INVALID_SCORE = "INVALID_SCORE" INVALID_TIMEOUT_VALUE = "INVALID_TIMEOUT_VALUE" MULTIPLE_SCORE_OPTIONS = "MULTIPLE_SCORE_OPTIONS" NODE_COMMUNICATION_COMMAND_UNSUCCESSFUL = "NODE_COMMUNICATION_COMMAND_UNSUCCESSFUL" -NODE_COMMUNICATION_ERROR = "NODE_COMMUNICATION_ERROR", -NODE_COMMUNICATION_ERROR_NOT_AUTHORIZED = "NODE_COMMUNICATION_ERROR_NOT_AUTHORIZED", -NODE_COMMUNICATION_ERROR_PERMISSION_DENIED = "NODE_COMMUNICATION_ERROR_PERMISSION_DENIED", -NODE_COMMUNICATION_ERROR_UNABLE_TO_CONNECT = "NODE_COMMUNICATION_ERROR_UNABLE_TO_CONNECT", -NODE_COMMUNICATION_ERROR_UNSUPPORTED_COMMAND = "NODE_COMMUNICATION_ERROR_UNSUPPORTED_COMMAND", +NODE_COMMUNICATION_ERROR = "NODE_COMMUNICATION_ERROR" +NODE_COMMUNICATION_ERROR_NOT_AUTHORIZED = "NODE_COMMUNICATION_ERROR_NOT_AUTHORIZED" +NODE_COMMUNICATION_ERROR_PERMISSION_DENIED = "NODE_COMMUNICATION_ERROR_PERMISSION_DENIED" +NODE_COMMUNICATION_ERROR_UNABLE_TO_CONNECT = "NODE_COMMUNICATION_ERROR_UNABLE_TO_CONNECT" +NODE_COMMUNICATION_ERROR_UNSUPPORTED_COMMAND = "NODE_COMMUNICATION_ERROR_UNSUPPORTED_COMMAND" NODE_COMMUNICATION_FINISHED = "NODE_COMMUNICATION_FINISHED" NODE_COMMUNICATION_NOT_CONNECTED = "NODE_COMMUNICATION_NOT_CONNECTED" NODE_COMMUNICATION_STARTED = "NODE_COMMUNICATION_STARTED" @@ -74,16 +76,25 @@ NODE_NOT_FOUND = "NODE_NOT_FOUND" NON_UDP_TRANSPORT_ADDR_MISMATCH = 'NON_UDP_TRANSPORT_ADDR_MISMATCH' OMITTING_NODE = "OMITTING_NODE" PACEMAKER_LOCAL_NODE_NAME_NOT_FOUND = "PACEMAKER_LOCAL_NODE_NAME_NOT_FOUND" -PARSE_ERROR_COROSYNC_CONF_MISSING_CLOSING_BRACE = "PARSE_ERROR_COROSYNC_CONF_MISSING_CLOSING_BRACE", -PARSE_ERROR_COROSYNC_CONF = "PARSE_ERROR_COROSYNC_CONF", -PARSE_ERROR_COROSYNC_CONF_UNEXPECTED_CLOSING_BRACE = "PARSE_ERROR_COROSYNC_CONF_UNEXPECTED_CLOSING_BRACE", +PARSE_ERROR_COROSYNC_CONF_MISSING_CLOSING_BRACE = "PARSE_ERROR_COROSYNC_CONF_MISSING_CLOSING_BRACE" +PARSE_ERROR_COROSYNC_CONF = "PARSE_ERROR_COROSYNC_CONF" 
+PARSE_ERROR_COROSYNC_CONF_UNEXPECTED_CLOSING_BRACE = "PARSE_ERROR_COROSYNC_CONF_UNEXPECTED_CLOSING_BRACE" QDEVICE_ALREADY_DEFINED = "QDEVICE_ALREADY_DEFINED" QDEVICE_ALREADY_INITIALIZED = "QDEVICE_ALREADY_INITIALIZED" +QDEVICE_CERTIFICATE_ACCEPTED_BY_NODE = "QDEVICE_CERTIFICATE_ACCEPTED_BY_NODE" +QDEVICE_CERTIFICATE_DISTRIBUTION_STARTED = "QDEVICE_CERTIFICATE_DISTRIBUTION_STARTED" +QDEVICE_CERTIFICATE_REMOVAL_STARTED = "QDEVICE_CERTIFICATE_REMOVAL_STARTED" +QDEVICE_CERTIFICATE_REMOVED_FROM_NODE = "QDEVICE_CERTIFICATE_REMOVED_FROM_NODE" +QDEVICE_CERTIFICATE_IMPORT_ERROR = "QDEVICE_CERTIFICATE_IMPORT_ERROR" +QDEVICE_CERTIFICATE_SIGN_ERROR = "QDEVICE_CERTIFICATE_SIGN_ERROR" QDEVICE_DESTROY_ERROR = "QDEVICE_DESTROY_ERROR" QDEVICE_DESTROY_SUCCESS = "QDEVICE_DESTROY_SUCCESS" +QDEVICE_GET_STATUS_ERROR = "QDEVICE_GET_STATUS_ERROR" QDEVICE_INITIALIZATION_ERROR = "QDEVICE_INITIALIZATION_ERROR" QDEVICE_INITIALIZATION_SUCCESS = "QDEVICE_INITIALIZATION_SUCCESS" QDEVICE_NOT_DEFINED = "QDEVICE_NOT_DEFINED" +QDEVICE_NOT_INITIALIZED = "QDEVICE_NOT_INITIALIZED" +QDEVICE_CLIENT_RELOAD_STARTED = "QDEVICE_CLIENT_RELOAD_STARTED" QDEVICE_REMOVE_OR_CLUSTER_STOP_NEEDED = "QDEVICE_REMOVE_OR_CLUSTER_STOP_NEEDED" REQUIRED_OPTION_IS_MISSING = "REQUIRED_OPTION_IS_MISSING" RESOURCE_CLEANUP_ERROR = "RESOURCE_CLEANUP_ERROR" @@ -106,12 +117,16 @@ SBD_ENABLING_STARTED = "SBD_ENABLING_STARTED" SBD_NOT_INSTALLED = "SBD_NOT_INSTALLED" SBD_NOT_ENABLED = "SBD_NOT_ENABLED" SERVICE_DISABLE_ERROR = "SERVICE_DISABLE_ERROR" +SERVICE_DISABLE_STARTED = "SERVICE_DISABLE_STARTED" SERVICE_DISABLE_SUCCESS = "SERVICE_DISABLE_SUCCESS" SERVICE_ENABLE_ERROR = "SERVICE_ENABLE_ERROR" +SERVICE_ENABLE_STARTED = "SERVICE_ENABLE_STARTED" +SERVICE_ENABLE_SKIPPED = "SERVICE_ENABLE_SKIPPED" SERVICE_ENABLE_SUCCESS = "SERVICE_ENABLE_SUCCESS" SERVICE_KILL_ERROR = "SERVICE_KILL_ERROR" SERVICE_KILL_SUCCESS = "SERVICE_KILL_SUCCESS" SERVICE_START_ERROR = "SERVICE_START_ERROR" +SERVICE_START_SKIPPED = "SERVICE_START_SKIPPED" 
SERVICE_START_STARTED = "SERVICE_START_STARTED" SERVICE_START_SUCCESS = "SERVICE_START_SUCCESS" SERVICE_STOP_ERROR = "SERVICE_STOP_ERROR" diff --git a/pcs/lib/commands/qdevice.py b/pcs/lib/commands/qdevice.py index c300a4c..1d1d85f 100644 --- a/pcs/lib/commands/qdevice.py +++ b/pcs/lib/commands/qdevice.py @@ -5,6 +5,9 @@ from __future__ import ( unicode_literals, ) +import base64 +import binascii + from pcs.lib import external, reports from pcs.lib.corosync import qdevice_net from pcs.lib.errors import LibraryError @@ -31,7 +34,7 @@ def qdevice_setup(lib_env, model, enable, start): def qdevice_destroy(lib_env, model): """ Stop and disable qdevice on local host and remove its configuration - string model qdevice model to initialize + string model qdevice model to destroy """ _ensure_not_cman(lib_env) _check_model(model) @@ -40,6 +43,22 @@ def qdevice_destroy(lib_env, model): qdevice_net.qdevice_destroy() lib_env.report_processor.process(reports.qdevice_destroy_success(model)) +def qdevice_status_text(lib_env, model, verbose=False, cluster=None): + """ + Get runtime status of a quorum device in plain text + string model qdevice model to query + bool verbose get more detailed output + string cluster show information only about specified cluster + """ + _ensure_not_cman(lib_env) + _check_model(model) + runner = lib_env.cmd_runner() + return ( + qdevice_net.qdevice_status_generic_text(runner, verbose) + + + qdevice_net.qdevice_status_cluster_text(runner, cluster, verbose) + ) + def qdevice_enable(lib_env, model): """ make qdevice start automatically on boot on local host @@ -80,6 +99,73 @@ def qdevice_kill(lib_env, model): _check_model(model) _service_kill(lib_env, qdevice_net.qdevice_kill) +def qdevice_net_sign_certificate_request( + lib_env, certificate_request, cluster_name +): + """ + Sign node certificate request by qnetd CA + string certificate_request base64 encoded certificate request + string cluster_name name of the cluster to which qdevice is being added + 
""" + _ensure_not_cman(lib_env) + try: + certificate_request_data = base64.b64decode(certificate_request) + except (TypeError, binascii.Error): + raise LibraryError(reports.invalid_option_value( + "qnetd certificate request", + certificate_request, + ["base64 encoded certificate"] + )) + return base64.b64encode( + qdevice_net.qdevice_sign_certificate_request( + lib_env.cmd_runner(), + certificate_request_data, + cluster_name + ) + ) + +def client_net_setup(lib_env, ca_certificate): + """ + Intialize qdevice net client on local host + ca_certificate base64 encoded qnetd CA certificate + """ + _ensure_not_cman(lib_env) + try: + ca_certificate_data = base64.b64decode(ca_certificate) + except (TypeError, binascii.Error): + raise LibraryError(reports.invalid_option_value( + "qnetd CA certificate", + ca_certificate, + ["base64 encoded certificate"] + )) + qdevice_net.client_setup(lib_env.cmd_runner(), ca_certificate_data) + +def client_net_import_certificate(lib_env, certificate): + """ + Import qnetd client certificate to local node certificate storage + certificate base64 encoded qnetd client certificate + """ + _ensure_not_cman(lib_env) + try: + certificate_data = base64.b64decode(certificate) + except (TypeError, binascii.Error): + raise LibraryError(reports.invalid_option_value( + "qnetd client certificate", + certificate, + ["base64 encoded certificate"] + )) + qdevice_net.client_import_certificate_and_key( + lib_env.cmd_runner(), + certificate_data + ) + +def client_net_destroy(lib_env): + """ + delete qdevice client config files on local host + """ + _ensure_not_cman(lib_env) + qdevice_net.client_destroy() + def _ensure_not_cman(lib_env): if lib_env.is_cman_cluster: raise LibraryError(reports.cman_unsupported_command()) diff --git a/pcs/lib/commands/quorum.py b/pcs/lib/commands/quorum.py index 1ee5411..aa00bbd 100644 --- a/pcs/lib/commands/quorum.py +++ b/pcs/lib/commands/quorum.py @@ -5,9 +5,18 @@ from __future__ import ( unicode_literals, ) - from pcs.lib 
import reports from pcs.lib.errors import LibraryError +from pcs.lib.corosync import ( + live as corosync_live, + qdevice_net, + qdevice_client +) +from pcs.lib.external import ( + NodeCommunicationException, + node_communicator_exception_to_report_item, + parallel_nodes_communication_helper, +) def get_config(lib_env): @@ -42,6 +51,21 @@ def set_options(lib_env, options, skip_offline_nodes=False): cfg.set_quorum_options(lib_env.report_processor, options) lib_env.push_corosync_conf(cfg, skip_offline_nodes) +def status_text(lib_env): + """ + Get quorum runtime status in plain text + """ + __ensure_not_cman(lib_env) + return corosync_live.get_quorum_status_text(lib_env.cmd_runner()) + +def status_device_text(lib_env, verbose=False): + """ + Get quorum device client runtime status in plain text + bool verbose get more detailed output + """ + __ensure_not_cman(lib_env) + return qdevice_client.get_status_text(lib_env.cmd_runner(), verbose) + def add_device( lib_env, model, model_options, generic_options, force_model=False, force_options=False, skip_offline_nodes=False @@ -58,6 +82,8 @@ def add_device( __ensure_not_cman(lib_env) cfg = lib_env.get_corosync_conf() + # Try adding qdevice to corosync.conf. This validates all the options and + # makes sure qdevice is not defined in corosync.conf yet. cfg.add_quorum_device( lib_env.report_processor, model, @@ -66,9 +92,131 @@ def add_device( force_model, force_options ) - # TODO validation, verification, certificates, etc. + + # First setup certificates for qdevice, then send corosync.conf to nodes. + # If anything fails, nodes will not have corosync.conf with qdevice in it, + # so there is no effect on the cluster. 
+ if lib_env.is_corosync_conf_live: + # do model specific configuration + # if model is not known to pcs and was forced, do not configure antyhing + # else but corosync.conf, as we do not know what to do anyways + if model == "net": + _add_device_model_net( + lib_env, + # we are sure it's there, it was validated in add_quorum_device + model_options["host"], + cfg.get_cluster_name(), + cfg.get_nodes(), + skip_offline_nodes + ) + + lib_env.report_processor.process( + reports.service_enable_started("corosync-qdevice") + ) + communicator = lib_env.node_communicator() + parallel_nodes_communication_helper( + qdevice_client.remote_client_enable, + [ + [(lib_env.report_processor, communicator, node), {}] + for node in cfg.get_nodes() + ], + lib_env.report_processor, + skip_offline_nodes + ) + + # everything set up, it's safe to tell the nodes to use qdevice lib_env.push_corosync_conf(cfg, skip_offline_nodes) + # Now, when corosync.conf has been reloaded, we can start qdevice service. + if lib_env.is_corosync_conf_live: + lib_env.report_processor.process( + reports.service_start_started("corosync-qdevice") + ) + communicator = lib_env.node_communicator() + parallel_nodes_communication_helper( + qdevice_client.remote_client_start, + [ + [(lib_env.report_processor, communicator, node), {}] + for node in cfg.get_nodes() + ], + lib_env.report_processor, + skip_offline_nodes + ) + +def _add_device_model_net( + lib_env, qnetd_host, cluster_name, cluster_nodes, skip_offline_nodes +): + """ + setup cluster nodes for using qdevice model net + string qnetd_host address of qdevice provider (qnetd host) + string cluster_name name of the cluster to which qdevice is being added + NodeAddressesList cluster_nodes list of cluster nodes addresses + bool skip_offline_nodes continue even if not all nodes are accessible + """ + communicator = lib_env.node_communicator() + runner = lib_env.cmd_runner() + reporter = lib_env.report_processor + + reporter.process( + 
reports.qdevice_certificate_distribution_started() + ) + # get qnetd CA certificate + try: + qnetd_ca_cert = qdevice_net.remote_qdevice_get_ca_certificate( + communicator, + qnetd_host + ) + except NodeCommunicationException as e: + raise LibraryError( + node_communicator_exception_to_report_item(e) + ) + # init certificate storage on all nodes + parallel_nodes_communication_helper( + qdevice_net.remote_client_setup, + [ + ((communicator, node, qnetd_ca_cert), {}) + for node in cluster_nodes + ], + reporter, + skip_offline_nodes + ) + # create client certificate request + cert_request = qdevice_net.client_generate_certificate_request( + runner, + cluster_name + ) + # sign the request on qnetd host + try: + signed_certificate = qdevice_net.remote_sign_certificate_request( + communicator, + qnetd_host, + cert_request, + cluster_name + ) + except NodeCommunicationException as e: + raise LibraryError( + node_communicator_exception_to_report_item(e) + ) + # transform the signed certificate to pk12 format which can sent to nodes + pk12 = qdevice_net.client_cert_request_to_pk12(runner, signed_certificate) + # distribute final certificate to nodes + def do_and_report(reporter, communicator, node, pk12): + qdevice_net.remote_client_import_certificate_and_key( + communicator, node, pk12 + ) + reporter.process( + reports.qdevice_certificate_accepted_by_node(node.label) + ) + parallel_nodes_communication_helper( + do_and_report, + [ + ((reporter, communicator, node, pk12), {}) + for node in cluster_nodes + ], + reporter, + skip_offline_nodes + ) + def update_device( lib_env, model_options, generic_options, force_options=False, skip_offline_nodes=False @@ -98,9 +246,74 @@ def remove_device(lib_env, skip_offline_nodes=False): __ensure_not_cman(lib_env) cfg = lib_env.get_corosync_conf() + model, dummy_options, dummy_options = cfg.get_quorum_device_settings() cfg.remove_quorum_device() lib_env.push_corosync_conf(cfg, skip_offline_nodes) + if lib_env.is_corosync_conf_live: + # 
disable qdevice + lib_env.report_processor.process( + reports.service_disable_started("corosync-qdevice") + ) + communicator = lib_env.node_communicator() + parallel_nodes_communication_helper( + qdevice_client.remote_client_disable, + [ + [(lib_env.report_processor, communicator, node), {}] + for node in cfg.get_nodes() + ], + lib_env.report_processor, + skip_offline_nodes + ) + # stop qdevice + lib_env.report_processor.process( + reports.service_stop_started("corosync-qdevice") + ) + communicator = lib_env.node_communicator() + parallel_nodes_communication_helper( + qdevice_client.remote_client_stop, + [ + [(lib_env.report_processor, communicator, node), {}] + for node in cfg.get_nodes() + ], + lib_env.report_processor, + skip_offline_nodes + ) + # handle model specific configuration + if model == "net": + _remove_device_model_net( + lib_env, + cfg.get_nodes(), + skip_offline_nodes + ) + +def _remove_device_model_net(lib_env, cluster_nodes, skip_offline_nodes): + """ + remove configuration used by qdevice model net + NodeAddressesList cluster_nodes list of cluster nodes addresses + bool skip_offline_nodes continue even if not all nodes are accessible + """ + reporter = lib_env.report_processor + communicator = lib_env.node_communicator() + + reporter.process( + reports.qdevice_certificate_removal_started() + ) + def do_and_report(reporter, communicator, node): + qdevice_net.remote_client_destroy(communicator, node) + reporter.process( + reports.qdevice_certificate_removed_from_node(node.label) + ) + parallel_nodes_communication_helper( + do_and_report, + [ + [(reporter, communicator, node), {}] + for node in cluster_nodes + ], + lib_env.report_processor, + skip_offline_nodes + ) + def __ensure_not_cman(lib_env): if lib_env.is_corosync_conf_live and lib_env.is_cman_cluster: raise LibraryError(reports.cman_unsupported_command()) diff --git a/pcs/lib/corosync/config_facade.py b/pcs/lib/corosync/config_facade.py index 5a486ca..600a89b 100644 --- 
a/pcs/lib/corosync/config_facade.py +++ b/pcs/lib/corosync/config_facade.py @@ -22,6 +22,12 @@ class ConfigFacade(object): "last_man_standing_window", "wait_for_all", ) + QUORUM_OPTIONS_INCOMPATIBLE_WITH_QDEVICE = ( + "auto_tie_breaker", + "last_man_standing", + "last_man_standing_window", + ) + @classmethod def from_string(cls, config_string): @@ -52,6 +58,8 @@ class ConfigFacade(object): self._config = parsed_config # set to True if changes cannot be applied on running cluster self._need_stopped_cluster = False + # set to True if qdevice reload is required to apply changes + self._need_qdevice_reload = False @property def config(self): @@ -61,6 +69,17 @@ class ConfigFacade(object): def need_stopped_cluster(self): return self._need_stopped_cluster + @property + def need_qdevice_reload(self): + return self._need_qdevice_reload + + def get_cluster_name(self): + cluster_name = "" + for totem in self.config.get_sections("totem"): + for attrs in totem.get_attributes("cluster_name"): + cluster_name = attrs[1] + return cluster_name + def get_nodes(self): """ Get all defined nodes @@ -112,8 +131,9 @@ class ConfigFacade(object): def __validate_quorum_options(self, options): report_items = [] + has_qdevice = self.has_quorum_device() + qdevice_incompatible_options = [] for name, value in sorted(options.items()): - allowed_names = self.__class__.QUORUM_OPTIONS if name not in allowed_names: report_items.append( @@ -124,6 +144,13 @@ class ConfigFacade(object): if value == "": continue + if ( + has_qdevice + and + name in self.__class__.QUORUM_OPTIONS_INCOMPATIBLE_WITH_QDEVICE + ): + qdevice_incompatible_options.append(name) + if name == "last_man_standing_window": if not value.isdigit(): report_items.append(reports.invalid_option_value( @@ -137,6 +164,13 @@ class ConfigFacade(object): name, value, allowed_values )) + if qdevice_incompatible_options: + report_items.append( + reports.corosync_options_incompatible_with_qdevice( + qdevice_incompatible_options + ) + ) + return 
report_items def has_quorum_device(self): @@ -201,13 +235,13 @@ class ConfigFacade(object): force=force_options ) ) + # configuration cleanup - remove_need_stopped_cluster = { - "auto_tie_breaker": "", - "last_man_standing": "", - "last_man_standing_window": "", - } - need_stopped_cluster = False + remove_need_stopped_cluster = dict([ + (name, "") + for name in self.__class__.QUORUM_OPTIONS_INCOMPATIBLE_WITH_QDEVICE + ]) + # remove old device settings quorum_section_list = self.__ensure_section(self.config, "quorum") for quorum in quorum_section_list: for device in quorum.get_sections("device"): @@ -218,13 +252,19 @@ class ConfigFacade(object): and value not in ["", "0"] ): - need_stopped_cluster = True + self._need_stopped_cluster = True + # remove conflicting quorum options attrs_to_remove = { "allow_downscale": "", "two_node": "", } attrs_to_remove.update(remove_need_stopped_cluster) self.__set_section_options(quorum_section_list, attrs_to_remove) + # remove nodes' votes + for nodelist in self.config.get_sections("nodelist"): + for node in nodelist.get_sections("node"): + node.del_attributes_by_name("quorum_votes") + # add new configuration quorum = quorum_section_list[-1] new_device = config_parser.Section("device") @@ -234,12 +274,9 @@ class ConfigFacade(object): new_model = config_parser.Section(model) self.__set_section_options([new_model], model_options) new_device.add_section(new_model) + self.__update_qdevice_votes() self.__update_two_node() self.__remove_empty_sections(self.config) - # update_two_node sets self._need_stopped_cluster when changing an - # algorithm lms <-> 2nodelms. We don't care about that, it's not really - # a change, as there was no qdevice before. So we override it. 
- self._need_stopped_cluster = need_stopped_cluster def update_quorum_device( self, report_processor, model_options, generic_options, @@ -281,9 +318,10 @@ class ConfigFacade(object): model_sections.extend(device.get_sections(model)) self.__set_section_options(device_sections, generic_options) self.__set_section_options(model_sections, model_options) + self.__update_qdevice_votes() self.__update_two_node() self.__remove_empty_sections(self.config) - self._need_stopped_cluster = True + self._need_qdevice_reload = True def remove_quorum_device(self): """ @@ -369,7 +407,7 @@ class ConfigFacade(object): continue if name == "algorithm": - allowed_values = ("2nodelms", "ffsplit", "lms") + allowed_values = ("ffsplit", "lms") if value not in allowed_values: report_items.append(reports.invalid_option_value( name, value, allowed_values, severity, forceable @@ -461,19 +499,29 @@ class ConfigFacade(object): else: for quorum in self.config.get_sections("quorum"): quorum.del_attributes_by_name("two_node") - # update qdevice algorithm "lms" vs "2nodelms" + + def __update_qdevice_votes(self): + # ffsplit won't start if votes is missing or not set to 1 + # for other algorithms it's required not to put votes at all + model = None + algorithm = None + device_sections = [] for quorum in self.config.get_sections("quorum"): for device in quorum.get_sections("device"): - for net in device.get_sections("net"): - algorithm = None - for dummy_name, value in net.get_attributes("algorithm"): - algorithm = value - if algorithm == "lms" and has_two_nodes: - net.set_attribute("algorithm", "2nodelms") - self._need_stopped_cluster = True - elif algorithm == "2nodelms" and not has_two_nodes: - net.set_attribute("algorithm", "lms") - self._need_stopped_cluster = True + device_sections.append(device) + for dummy_name, value in device.get_attributes("model"): + model = value + for device in device_sections: + for model_section in device.get_sections(model): + for dummy_name, value in 
model_section.get_attributes( + "algorithm" + ): + algorithm = value + if model == "net": + if algorithm == "ffsplit": + self.__set_section_options(device_sections, {"votes": "1"}) + else: + self.__set_section_options(device_sections, {"votes": ""}) def __set_section_options(self, section_list, options): for section in section_list[:-1]: diff --git a/pcs/lib/corosync/live.py b/pcs/lib/corosync/live.py index 2446a46..4129aeb 100644 --- a/pcs/lib/corosync/live.py +++ b/pcs/lib/corosync/live.py @@ -47,3 +47,18 @@ def reload_config(runner): reports.corosync_config_reload_error(output.rstrip()) ) +def get_quorum_status_text(runner): + """ + Get runtime quorum status from the local node + """ + output, retval = runner.run([ + os.path.join(settings.corosync_binaries, "corosync-quorumtool"), + "-p" + ]) + # retval is 0 on success if node is not in partition with quorum + # retval is 1 on error OR on success if node has quorum + if retval not in [0, 1]: + raise LibraryError( + reports.corosync_quorum_get_status_error(output) + ) + return output diff --git a/pcs/lib/corosync/qdevice_client.py b/pcs/lib/corosync/qdevice_client.py new file mode 100644 index 0000000..98fbb0e --- /dev/null +++ b/pcs/lib/corosync/qdevice_client.py @@ -0,0 +1,93 @@ +from __future__ import ( + absolute_import, + division, + print_function, + unicode_literals, +) + +import os.path + +from pcs import settings +from pcs.lib import reports +from pcs.lib.errors import LibraryError + + +def get_status_text(runner, verbose=False): + """ + Get quorum device client runtime status in plain text + bool verbose get more detailed output + """ + cmd = [ + os.path.join(settings.corosync_binaries, "corosync-qdevice-tool"), + "-s" + ] + if verbose: + cmd.append("-v") + output, retval = runner.run(cmd) + if retval != 0: + raise LibraryError( + reports.corosync_quorum_get_status_error(output) + ) + return output + +def remote_client_enable(reporter, node_communicator, node): + """ + enable qdevice client service 
(corosync-qdevice) on a remote node + """ + response = node_communicator.call_node( + node, + "remote/qdevice_client_enable", + None + ) + if response == "corosync is not enabled, skipping": + reporter.process( + reports.service_enable_skipped( + "corosync-qdevice", + "corosync is not enabled", + node.label + ) + ) + else: + reporter.process( + reports.service_enable_success("corosync-qdevice", node.label) + ) + +def remote_client_disable(reporter, node_communicator, node): + """ + disable qdevice client service (corosync-qdevice) on a remote node + """ + node_communicator.call_node(node, "remote/qdevice_client_disable", None) + reporter.process( + reports.service_disable_success("corosync-qdevice", node.label) + ) + +def remote_client_start(reporter, node_communicator, node): + """ + start qdevice client service (corosync-qdevice) on a remote node + """ + response = node_communicator.call_node( + node, + "remote/qdevice_client_start", + None + ) + if response == "corosync is not running, skipping": + reporter.process( + reports.service_start_skipped( + "corosync-qdevice", + "corosync is not running", + node.label + ) + ) + else: + reporter.process( + reports.service_start_success("corosync-qdevice", node.label) + ) + +def remote_client_stop(reporter, node_communicator, node): + """ + stop qdevice client service (corosync-qdevice) on a remote node + """ + node_communicator.call_node(node, "remote/qdevice_client_stop", None) + reporter.process( + reports.service_stop_success("corosync-qdevice", node.label) + ) diff --git a/pcs/lib/corosync/qdevice_net.py b/pcs/lib/corosync/qdevice_net.py index 7479257..4054592 100644 --- a/pcs/lib/corosync/qdevice_net.py +++ b/pcs/lib/corosync/qdevice_net.py @@ -5,8 +5,14 @@ from __future__ import ( unicode_literals, ) +import base64 +import binascii +import functools +import os import os.path +import re import shutil +import tempfile from pcs import settings from pcs.lib import external, reports @@ -15,6 +21,18 @@ from 
pcs.lib.errors import LibraryError __model = "net" __service_name = "corosync-qnetd" +__qnetd_certutil = os.path.join( + settings.corosync_qnet_binaries, + "corosync-qnetd-certutil" +) +__qnetd_tool = os.path.join( + settings.corosync_qnet_binaries, + "corosync-qnetd-tool" +) +__qdevice_certutil = os.path.join( + settings.corosync_binaries, + "corosync-qdevice-net-certutil" +) def qdevice_setup(runner): """ @@ -24,25 +42,63 @@ def qdevice_setup(runner): raise LibraryError(reports.qdevice_already_initialized(__model)) output, retval = runner.run([ - os.path.join(settings.corosync_binaries, "corosync-qnetd-certutil"), - "-i" + __qnetd_certutil, "-i" ]) if retval != 0: raise LibraryError( reports.qdevice_initialization_error(__model, output.rstrip()) ) +def qdevice_initialized(): + """ + check if qdevice server certificate database has been initialized + """ + return os.path.exists(os.path.join( + settings.corosync_qdevice_net_server_certs_dir, + "cert8.db" + )) + def qdevice_destroy(): """ delete qdevice configuration on local host """ try: - shutil.rmtree(settings.corosync_qdevice_net_server_certs_dir) + if qdevice_initialized(): + shutil.rmtree(settings.corosync_qdevice_net_server_certs_dir) except EnvironmentError as e: raise LibraryError( reports.qdevice_destroy_error(__model, e.strerror) ) +def qdevice_status_generic_text(runner, verbose=False): + """ + get qdevice runtime status in plain text + bool verbose get more detailed output + """ + cmd = [__qnetd_tool, "-s"] + if verbose: + cmd.append("-v") + output, retval = runner.run(cmd) + if retval != 0: + raise LibraryError(reports.qdevice_get_status_error(__model, output)) + return output + +def qdevice_status_cluster_text(runner, cluster=None, verbose=False): + """ + get qdevice runtime status in plain text + bool verbose get more detailed output + string cluster show information only about specified cluster + """ + cmd = [__qnetd_tool, "-l"] + if verbose: + cmd.append("-v") + if cluster: + cmd.extend(["-c", 
cluster]) + output, retval = runner.run(cmd) + if retval != 0: + raise LibraryError(reports.qdevice_get_status_error(__model, output)) + return output + def qdevice_enable(runner): """ make qdevice start automatically on boot on local host @@ -72,3 +128,255 @@ def qdevice_kill(runner): kill qdevice now on local host """ external.kill_services(runner, [__service_name]) + +def qdevice_sign_certificate_request(runner, cert_request, cluster_name): + """ + sign client certificate request + cert_request certificate request data + string cluster_name name of the cluster to which qdevice is being added + """ + if not qdevice_initialized(): + raise LibraryError(reports.qdevice_not_initialized(__model)) + # save the certificate request, corosync tool only works with files + tmpfile = _store_to_tmpfile( + cert_request, + reports.qdevice_certificate_sign_error + ) + # sign the request + output, retval = runner.run([ + __qnetd_certutil, "-s", "-c", tmpfile.name, "-n", cluster_name + ]) + tmpfile.close() # temp file is deleted on close + if retval != 0: + raise LibraryError( + reports.qdevice_certificate_sign_error(output.strip()) + ) + # get signed certificate, corosync tool only works with files + return _get_output_certificate( + output, + reports.qdevice_certificate_sign_error + ) + +def client_setup(runner, ca_certificate): + """ + initialize qdevice client on local host + ca_certificate qnetd CA certificate + """ + client_destroy() + # save CA certificate, corosync tool only works with files + ca_file_path = os.path.join( + settings.corosync_qdevice_net_client_certs_dir, + settings.corosync_qdevice_net_client_ca_file_name + ) + try: + if not os.path.exists(ca_file_path): + os.makedirs( + settings.corosync_qdevice_net_client_certs_dir, + mode=0o700 + ) + with open(ca_file_path, "wb") as ca_file: + ca_file.write(ca_certificate) + except EnvironmentError as e: + raise LibraryError( + reports.qdevice_initialization_error(__model, e.strerror) + ) + # initialize client's 
certificate storage + output, retval = runner.run([ + __qdevice_certutil, "-i", "-c", ca_file_path + ]) + if retval != 0: + raise LibraryError( + reports.qdevice_initialization_error(__model, output.rstrip()) + ) + +def client_initialized(): + """ + check if qdevice net client certificate database has been initialized + """ + return os.path.exists(os.path.join( + settings.corosync_qdevice_net_client_certs_dir, + "cert8.db" + )) + +def client_destroy(): + """ + delete qdevice client config files on local host + """ + try: + if client_initialized(): + shutil.rmtree(settings.corosync_qdevice_net_client_certs_dir) + except EnvironmentError as e: + raise LibraryError( + reports.qdevice_destroy_error(__model, e.strerror) + ) + +def client_generate_certificate_request(runner, cluster_name): + """ + create a certificate request which can be signed by qnetd server + string cluster_name name of the cluster to which qdevice is being added + """ + if not client_initialized(): + raise LibraryError(reports.qdevice_not_initialized(__model)) + output, retval = runner.run([ + __qdevice_certutil, "-r", "-n", cluster_name + ]) + if retval != 0: + raise LibraryError( + reports.qdevice_initialization_error(__model, output.rstrip()) + ) + return _get_output_certificate( + output, + functools.partial(reports.qdevice_initialization_error, __model) + ) + +def client_cert_request_to_pk12(runner, cert_request): + """ + transform signed certificate request to pk12 certificate which can be + imported to nodes + cert_request signed certificate request + """ + if not client_initialized(): + raise LibraryError(reports.qdevice_not_initialized(__model)) + # save the signed certificate request, corosync tool only works with files + tmpfile = _store_to_tmpfile( + cert_request, + reports.qdevice_certificate_import_error + ) + # transform it + output, retval = runner.run([ + __qdevice_certutil, "-M", "-c", tmpfile.name + ]) + tmpfile.close() # temp file is deleted on close + if retval != 0: + raise 
LibraryError( + reports.qdevice_certificate_import_error(output) + ) + # get resulting pk12, corosync tool only works with files + return _get_output_certificate( + output, + reports.qdevice_certificate_import_error + ) + +def client_import_certificate_and_key(runner, pk12_certificate): + """ + import qdevice client certificate to the local node certificate storage + """ + if not client_initialized(): + raise LibraryError(reports.qdevice_not_initialized(__model)) + # save the certificate, corosync tool only works with files + tmpfile = _store_to_tmpfile( + pk12_certificate, + reports.qdevice_certificate_import_error + ) + output, retval = runner.run([ + __qdevice_certutil, "-m", "-c", tmpfile.name + ]) + tmpfile.close() # temp file is deleted on close + if retval != 0: + raise LibraryError( + reports.qdevice_certificate_import_error(output) + ) + +def remote_qdevice_get_ca_certificate(node_communicator, host): + """ + connect to a qnetd host and get qnetd CA certificate + string host address of the qnetd host + """ + try: + return base64.b64decode( + node_communicator.call_host( + host, + "remote/qdevice_net_get_ca_certificate", + None + ) + ) + except (TypeError, binascii.Error): + raise LibraryError(reports.invalid_response_format(host)) + +def remote_client_setup(node_communicator, node, qnetd_ca_certificate): + """ + connect to a remote node and initialize qdevice there + NodeAddresses node target node + qnetd_ca_certificate qnetd CA certificate + """ + return node_communicator.call_node( + node, + "remote/qdevice_net_client_init_certificate_storage", + external.NodeCommunicator.format_data_dict([ + ("ca_certificate", base64.b64encode(qnetd_ca_certificate)), + ]) + ) + +def remote_sign_certificate_request( + node_communicator, host, cert_request, cluster_name +): + """ + connect to a qdevice host and sign node certificate there + string host address of the qnetd host + cert_request certificate request to be signed + string cluster_name name of the cluster to 
which qdevice is being added + """ + try: + return base64.b64decode( + node_communicator.call_host( + host, + "remote/qdevice_net_sign_node_certificate", + external.NodeCommunicator.format_data_dict([ + ("certificate_request", base64.b64encode(cert_request)), + ("cluster_name", cluster_name), + ]) + ) + ) + except (TypeError, binascii.Error): + raise LibraryError(reports.invalid_response_format(host)) + +def remote_client_import_certificate_and_key(node_communicator, node, pk12): + """ + import pk12 certificate on a remote node + NodeAddresses node target node + pk12 certificate + """ + return node_communicator.call_node( + node, + "remote/qdevice_net_client_import_certificate", + external.NodeCommunicator.format_data_dict([ + ("certificate", base64.b64encode(pk12)), + ]) + ) + +def remote_client_destroy(node_communicator, node): + """ + delete qdevice client config files on a remote node + NodeAddresses node target node + """ + return node_communicator.call_node( + node, + "remote/qdevice_net_client_destroy", + None + ) + +def _store_to_tmpfile(data, report_func): + try: + tmpfile = tempfile.NamedTemporaryFile(mode="wb", suffix=".pcs") + tmpfile.write(data) + tmpfile.flush() + return tmpfile + except EnvironmentError as e: + raise LibraryError(report_func(e.strerror)) + +def _get_output_certificate(cert_tool_output, report_func): + regexp = re.compile(r"^Certificate( request)? 
stored in (?P<path>.+)$") + filename = None + for line in cert_tool_output.splitlines(): + match = regexp.search(line) + if match: + filename = match.group("path") + if not filename: + raise LibraryError(report_func(cert_tool_output)) + try: + with open(filename, "rb") as cert_file: + return cert_file.read() + except EnvironmentError as e: + raise LibraryError(report_func( + "{path}: {error}".format(path=filename, error=e.strerror) + )) diff --git a/pcs/lib/env.py b/pcs/lib/env.py index 1151891..24e4252 100644 --- a/pcs/lib/env.py +++ b/pcs/lib/env.py @@ -10,6 +10,7 @@ from lxml import etree from pcs.lib import reports from pcs.lib.external import ( is_cman_cluster, + is_service_running, CommandRunner, NodeCommunicator, ) @@ -21,6 +22,7 @@ from pcs.lib.corosync.live import ( from pcs.lib.nodes_task import ( distribute_corosync_conf, check_corosync_offline_on_nodes, + qdevice_reload_on_nodes, ) from pcs.lib.pacemaker import ( get_cib, @@ -152,11 +154,18 @@ class LibraryEnvironment(object): corosync_conf_data, skip_offline_nodes ) - if not corosync_conf_facade.need_stopped_cluster: + if is_service_running(self.cmd_runner(), "corosync"): reload_corosync_config(self.cmd_runner()) self.report_processor.process( reports.corosync_config_reloaded() ) + if corosync_conf_facade.need_qdevice_reload: + qdevice_reload_on_nodes( + self.node_communicator(), + self.report_processor, + node_list, + skip_offline_nodes + ) else: self._corosync_conf_data = corosync_conf_data diff --git a/pcs/lib/errors.py b/pcs/lib/errors.py index c0bd3d1..9cab5e9 100644 --- a/pcs/lib/errors.py +++ b/pcs/lib/errors.py @@ -42,4 +42,8 @@ class ReportItem(object): self.message = self.message_pattern.format(**self.info) def __repr__(self): - return self.code+": "+str(self.info) + return "{severity} {code}: {info}".format( + severity=self.severity, + code=self.code, + info=self.info + ) diff --git a/pcs/lib/external.py b/pcs/lib/external.py index 34426f9..c773e5a 100644 --- a/pcs/lib/external.py +++
b/pcs/lib/external.py @@ -49,7 +49,11 @@ except ImportError: from pcs.lib import reports from pcs.lib.errors import LibraryError, ReportItemSeverity -from pcs.common.tools import simple_cache +from pcs.common import report_codes +from pcs.common.tools import ( + simple_cache, + run_parallel as tools_run_parallel, +) from pcs import settings @@ -521,7 +525,7 @@ class NodeCommunicator(object): # text in response body with HTTP code 400 # we need to be backward compatible with that raise NodeCommandUnsuccessfulException( - host, request, response_data + host, request, response_data.rstrip() ) elif e.code == 401: raise NodeAuthenticationException( @@ -581,3 +585,39 @@ class NodeCommunicator(object): base64.b64encode(" ".join(self._groups).encode("utf-8")) )) return cookies + + +def parallel_nodes_communication_helper( + func, func_args_kwargs, reporter, skip_offline_nodes=False +): + """ + Help running node calls in parallel and handle communication exceptions. + Raise LibraryError on any failure. 
+ + function func function to be run, should be a function calling a node + iterable func_args_kwargs list of tuples: (*args, **kwargs) + bool skip_offline_nodes do not raise LibraryError if a node is unreachable + """ + failure_severity = ReportItemSeverity.ERROR + failure_forceable = report_codes.SKIP_OFFLINE_NODES + if skip_offline_nodes: + failure_severity = ReportItemSeverity.WARNING + failure_forceable = None + report_items = [] + + def _parallel(*args, **kwargs): + try: + func(*args, **kwargs) + except NodeCommunicationException as e: + report_items.append( + node_communicator_exception_to_report_item( + e, + failure_severity, + failure_forceable + ) + ) + except LibraryError as e: + report_items.extend(e.args) + + tools_run_parallel(_parallel, func_args_kwargs) + reporter.process_list(report_items) diff --git a/pcs/lib/nodes_task.py b/pcs/lib/nodes_task.py index b9a61f6..e94d327 100644 --- a/pcs/lib/nodes_task.py +++ b/pcs/lib/nodes_task.py @@ -8,14 +8,19 @@ from __future__ import ( import json from pcs.common import report_codes +from pcs.common.tools import run_parallel as tools_run_parallel from pcs.lib import reports -from pcs.lib.errors import ReportItemSeverity +from pcs.lib.errors import LibraryError, ReportItemSeverity from pcs.lib.external import ( NodeCommunicator, NodeCommunicationException, node_communicator_exception_to_report_item, + parallel_nodes_communication_helper, +) +from pcs.lib.corosync import ( + live as corosync_live, + qdevice_client, ) -from pcs.lib.corosync import live as corosync_live def distribute_corosync_conf( @@ -33,11 +38,9 @@ def distribute_corosync_conf( if skip_offline_nodes: failure_severity = ReportItemSeverity.WARNING failure_forceable = None - - reporter.process(reports.corosync_config_distribution_started()) report_items = [] - # TODO use parallel communication - for node in node_addr_list: + + def _parallel(node): try: corosync_live.set_remote_corosync_conf( node_communicator, @@ -62,6 +65,12 @@ def 
distribute_corosync_conf( failure_forceable ) ) + + reporter.process(reports.corosync_config_distribution_started()) + tools_run_parallel( + _parallel, + [((node, ), {}) for node in node_addr_list] + ) reporter.process_list(report_items) def check_corosync_offline_on_nodes( @@ -77,13 +86,11 @@ def check_corosync_offline_on_nodes( if skip_offline_nodes: failure_severity = ReportItemSeverity.WARNING failure_forceable = None - - reporter.process(reports.corosync_not_running_check_started()) report_items = [] - # TODO use parallel communication - for node in node_addr_list: + + def _parallel(node): try: - status = node_communicator.call_node(node, "remote/status", "") + status = node_communicator.call_node(node, "remote/status", None) if not json.loads(status)["corosync"]: reporter.process( reports.corosync_not_running_on_node_ok(node.label) @@ -115,8 +122,48 @@ def check_corosync_offline_on_nodes( failure_forceable ) ) + + reporter.process(reports.corosync_not_running_check_started()) + tools_run_parallel( + _parallel, + [((node, ), {}) for node in node_addr_list] + ) reporter.process_list(report_items) +def qdevice_reload_on_nodes( + node_communicator, reporter, node_addr_list, skip_offline_nodes=False +): + """ + Reload corosync-qdevice configuration on cluster nodes + NodeAddressesList node_addr_list nodes to reload config on + bool skip_offline_nodes don't raise an error on node communication errors + """ + reporter.process(reports.qdevice_client_reload_started()) + parallel_params = [ + [(reporter, node_communicator, node), {}] + for node in node_addr_list + ] + # catch an exception so we try to start qdevice on nodes where we stopped it + report_items = [] + try: + parallel_nodes_communication_helper( + qdevice_client.remote_client_stop, + parallel_params, + reporter, + skip_offline_nodes + ) + except LibraryError as e: + report_items.extend(e.args) + try: + parallel_nodes_communication_helper( + qdevice_client.remote_client_start, + parallel_params, + reporter, 
+ skip_offline_nodes + ) + except LibraryError as e: + report_items.extend(e.args) + reporter.process_list(report_items) def node_check_auth(communicator, node): """ diff --git a/pcs/lib/reports.py b/pcs/lib/reports.py index 490b4ff..d8f88cd 100644 --- a/pcs/lib/reports.py +++ b/pcs/lib/reports.py @@ -552,6 +552,19 @@ def corosync_running_on_node_fail(node): info={"node": node} ) +def corosync_quorum_get_status_error(reason): + """ + unable to get runtime status of quorum on local node + string reason an error message + """ + return ReportItem.error( + report_codes.COROSYNC_QUORUM_GET_STATUS_ERROR, + "Unable to get quorum status: {reason}", + info={ + "reason": reason, + } + ) + def corosync_config_reloaded(): """ corosync configuration has been reloaded @@ -614,6 +627,21 @@ def corosync_config_parser_other_error(): "Unable to parse corosync config" ) +def corosync_options_incompatible_with_qdevice(options): + """ + cannot set specified corosync options when qdevice is in use + iterable options incompatible options names + """ + return ReportItem.error( + report_codes.COROSYNC_OPTIONS_INCOMPATIBLE_WITH_QDEVICE, + "These options cannot be set when the cluster uses a quorum device: " + + "{options_names_str}", + info={ + "options_names": options, + "options_names_str": ", ".join(sorted(options)), + } + ) + def qdevice_already_defined(): """ qdevice is already set up in a cluster, when it was expected not to be @@ -641,6 +669,15 @@ def qdevice_remove_or_cluster_stop_needed(): "You need to stop the cluster or remove qdevice from cluster to continue" ) +def qdevice_client_reload_started(): + """ + qdevice client configuration is about to be reloaded on nodes + """ + return ReportItem.info( + report_codes.QDEVICE_CLIENT_RELOAD_STARTED, + "Reloading qdevice configuration on nodes..." 
+ ) + def qdevice_already_initialized(model): """ cannot create qdevice on local host, it has been already created @@ -654,6 +691,19 @@ def qdevice_already_initialized(model): } ) +def qdevice_not_initialized(model): + """ + cannot work with qdevice on local host, it has not been created yet + string model qdevice model + """ + return ReportItem.error( + report_codes.QDEVICE_NOT_INITIALIZED, + "Quorum device '{model}' has not been initialized yet", + info={ + "model": model, + } + ) + def qdevice_initialization_success(model): """ qdevice was successfully initialized on local host @@ -682,6 +732,72 @@ def qdevice_initialization_error(model, reason): } ) +def qdevice_certificate_distribution_started(): + """ + Qdevice certificates are about to be set up on nodes + """ + return ReportItem.info( + report_codes.QDEVICE_CERTIFICATE_DISTRIBUTION_STARTED, + "Setting up qdevice certificates on nodes..." + ) + +def qdevice_certificate_accepted_by_node(node): + """ + Qdevice certificates have been saved to a node + string node node on which certificates have been saved + """ + return ReportItem.info( + report_codes.QDEVICE_CERTIFICATE_ACCEPTED_BY_NODE, + "{node}: Succeeded", + info={"node": node} + ) + +def qdevice_certificate_removal_started(): + """ + Qdevice certificates are about to be removed from nodes + """ + return ReportItem.info( + report_codes.QDEVICE_CERTIFICATE_REMOVAL_STARTED, + "Removing qdevice certificates from nodes..." 
+ ) + +def qdevice_certificate_removed_from_node(node): + """ + Qdevice certificates have been removed from a node + string node node on which certificates have been deleted + """ + return ReportItem.info( + report_codes.QDEVICE_CERTIFICATE_REMOVED_FROM_NODE, + "{node}: Succeeded", + info={"node": node} + ) + +def qdevice_certificate_import_error(reason): + """ + an error occured when importing qdevice certificate to a node + string reason an error message + """ + return ReportItem.error( + report_codes.QDEVICE_CERTIFICATE_IMPORT_ERROR, + "Unable to import quorum device certificate: {reason}", + info={ + "reason": reason, + } + ) + +def qdevice_certificate_sign_error(reason): + """ + an error occured when signing qdevice certificate + string reason an error message + """ + return ReportItem.error( + report_codes.QDEVICE_CERTIFICATE_SIGN_ERROR, + "Unable to sign quorum device certificate: {reason}", + info={ + "reason": reason, + } + ) + def qdevice_destroy_success(model): """ qdevice configuration successfully removed from local host @@ -710,6 +826,21 @@ def qdevice_destroy_error(model, reason): } ) +def qdevice_get_status_error(model, reason): + """ + unable to get runtime status of qdevice + string model qdevice model + string reason an error message + """ + return ReportItem.error( + report_codes.QDEVICE_GET_STATUS_ERROR, + "Unable to get status of quorum device '{model}': {reason}", + info={ + "model": model, + "reason": reason, + } + ) + def cman_unsupported_command(): """ requested library command is not available as local cluster is CMAN based @@ -1022,31 +1153,55 @@ def service_start_started(service): } ) -def service_start_error(service, reason): +def service_start_error(service, reason, node=None): """ system service start failed string service service name or description string reason error message + string node node on which service has been requested to start """ + msg = "Unable to start {service}: {reason}" return ReportItem.error( 
report_codes.SERVICE_START_ERROR, - "Unable to start {service}: {reason}", + msg if node is None else "{node}: " + msg, info={ "service": service, "reason": reason, + "node": node, } ) -def service_start_success(service): +def service_start_success(service, node=None): """ system service was started successfully string service service name or description + string node node on which service has been requested to start """ + msg = "{service} started" return ReportItem.info( report_codes.SERVICE_START_SUCCESS, - "{service} started", + msg if node is None else "{node}: " + msg, info={ "service": service, + "node": node, + } + ) + +def service_start_skipped(service, reason, node=None): + """ + starting system service was skipped, no error occured + string service service name or description + string reason why the start has been skipped + string node node on which service has been requested to start + """ + msg = "not starting {service} - {reason}" + return ReportItem.info( + report_codes.SERVICE_START_SKIPPED, + msg if node is None else "{node}: " + msg, + info={ + "service": service, + "reason": reason, + "node": node, } ) @@ -1063,31 +1218,37 @@ def service_stop_started(service): } ) -def service_stop_error(service, reason): +def service_stop_error(service, reason, node=None): """ system service stop failed string service service name or description string reason error message + string node node on which service has been requested to stop """ + msg = "Unable to stop {service}: {reason}" return ReportItem.error( report_codes.SERVICE_STOP_ERROR, - "Unable to stop {service}: {reason}", + msg if node is None else "{node}: " + msg, info={ "service": service, "reason": reason, + "node": node, } ) -def service_stop_success(service): +def service_stop_success(service, node=None): """ system service was stopped successfully string service service name or description + string node node on which service has been requested to stop """ + msg = "{service} stopped" return 
ReportItem.info( report_codes.SERVICE_STOP_SUCCESS, - "{service} stopped", + msg if node is None else "{node}: " + msg, info={ "service": service, + "node": node, } ) @@ -1121,6 +1282,19 @@ def service_kill_success(services): } ) +def service_enable_started(service): + """ + system service is being enabled + string service service name or description + """ + return ReportItem.info( + report_codes.SERVICE_ENABLE_STARTED, + "Enabling {service}...", + info={ + "service": service, + } + ) + def service_enable_error(service, reason, node=None): """ system service enable failed @@ -1143,7 +1317,7 @@ def service_enable_success(service, node=None): """ system service was enabled successfully string service service name or description - string node node on which service was enabled + string node node on which service has been enabled """ msg = "{service} enabled" return ReportItem.info( @@ -1155,6 +1329,37 @@ def service_enable_success(service, node=None): } ) +def service_enable_skipped(service, reason, node=None): + """ + enabling system service was skipped, no error occured + string service service name or description + string reason why the enabling has been skipped + string node node on which service has been requested to enable + """ + msg = "not enabling {service} - {reason}" + return ReportItem.info( + report_codes.SERVICE_ENABLE_SKIPPED, + msg if node is None else "{node}: " + msg, + info={ + "service": service, + "reason": reason, + "node": node, + } + ) + +def service_disable_started(service): + """ + system service is being disabled + string service service name or description + """ + return ReportItem.info( + report_codes.SERVICE_DISABLE_STARTED, + "Disabling {service}...", + info={ + "service": service, + } + ) + def service_disable_error(service, reason, node=None): """ system service disable failed @@ -1189,7 +1394,6 @@ def service_disable_success(service, node=None): } ) - def invalid_metadata_format(severity=ReportItemSeverity.ERROR, forceable=None): """ 
Invalid format of metadata @@ -1201,7 +1405,6 @@ def invalid_metadata_format(severity=ReportItemSeverity.ERROR, forceable=None): forceable=forceable ) - def unable_to_get_agent_metadata( agent, reason, severity=ReportItemSeverity.ERROR, forceable=None ): diff --git a/pcs/pcs.8 b/pcs/pcs.8 index 425b613..a72a9bd 100644 --- a/pcs/pcs.8 +++ b/pcs/pcs.8 @@ -518,8 +518,11 @@ rule remove Remove a rule if a rule id is specified, if rule is last rule in its constraint, the constraint will be removed. .SS "qdevice" .TP +status [\fB\-\-full\fR] [] +Show runtime status of specified model of quorum device provider. Using \fB\-\-full\fR will give more detailed output. If is specified, only information about the specified cluster will be displayed. +.TP setup model [\fB\-\-enable\fR] [\fB\-\-start\fR] -Configure specified model of quorum device provider. Quorum device then may be added to clusters by "pcs quorum device add" command. \fB\-\-start\fR will also start the provider. \fB\-\-enable\fR will configure the provider to start on boot. +Configure specified model of quorum device provider. Quorum device then can be added to clusters by running "pcs quorum device add" command in a cluster. \fB\-\-start\fR will also start the provider. \fB\-\-enable\fR will configure the provider to start on boot. .TP destroy Disable and stop specified model of quorum device provider and delete its configuration files. @@ -531,7 +534,7 @@ stop Stop specified model of quorum device provider. .TP kill -Force specified model of quorum device provider to stop (performs kill -9). +Force specified model of quorum device provider to stop (performs kill \-9). Note that init system (e.g. systemd) can detect that the qdevice is not running and start it again. If you want to stop the qdevice, run "pcs qdevice stop" command. .TP enable Configure specified model of quorum device provider to start on boot. @@ -543,14 +546,22 @@ Configure specified model of quorum device provider to not start on boot. 
config Show quorum configuration. .TP -device add [generic options] model <device model> [model options] -Add quorum device to cluster. Quorum device needs to be created first by "pcs qdevice setup" command. +status +Show quorum runtime status. +.TP +device add [<generic options>] model <device model> [<model options>] +Add a quorum device to the cluster. Quorum device needs to be created first by "pcs qdevice setup" command. It is not possible to use more than one quorum device in a cluster simultaneously. Generic options, model and model options are all documented in corosync's corosync\-qdevice(8) man page. .TP device remove -Remove quorum device from cluster. +Remove a quorum device from the cluster. .TP -device update [generic options] [model <model options>] -Add/Change quorum device options. Requires cluster to be stopped. +device status [\fB\-\-full\fR] +Show quorum device runtime status. Using \fB\-\-full\fR will give more detailed output. +.TP +device update [<generic options>] [model <model options>] +Add/Change quorum device options. Generic options and model options are all documented in corosync's corosync\-qdevice(8) man page. Requires the cluster to be stopped. + +WARNING: If you want to change "host" option of qdevice model net, use "pcs quorum device remove" and "pcs quorum device add" commands to set up configuration properly unless old and new host is the same machine. .TP unblock [\fB\-\-force\fR] Cancel waiting for all nodes when establishing quorum. Useful in situations where you know the cluster is inquorate, but you are confident that the cluster should proceed with resource management regardless. This command should ONLY be used when nodes which the cluster is waiting for have been confirmed to be powered off and to have no access to shared resources. @@ -558,7 +569,7 @@ Cancel waiting for all nodes when establishing quorum. Useful in situations whe .B WARNING: If the nodes are not actually powered off or they do have access to shared resources, data corruption/cluster failure can occur.
To prevent accidental running of this command, \-\-force or interactive user response is required in order to proceed. .TP update [auto_tie_breaker=[0|1]] [last_man_standing=[0|1]] [last_man_standing_window=[