diff --git a/SOURCES/cephadm b/SOURCES/cephadm index 40da060..abe5986 100644 --- a/SOURCES/cephadm +++ b/SOURCES/cephadm @@ -1,7 +1,14 @@ #!/usr/bin/python3 -DEFAULT_IMAGE = 'docker.io/ceph/daemon-base:latest-master-devel' -DEFAULT_IMAGE_IS_MASTER = True +# Default container images ----------------------------------------------------- +DEFAULT_IMAGE = 'docker.io/ceph/daemon-base:latest-pacific-devel' +DEFAULT_IMAGE_IS_MASTER = False +DEFAULT_PROMETHEUS_IMAGE = "docker.io/prom/prometheus:v2.18.1" +DEFAULT_NODE_EXPORTER_IMAGE = "docker.io/prom/node-exporter:v0.18.1" +DEFAULT_GRAFANA_IMAGE = "docker.io/ceph/ceph-grafana:6.7.4" +DEFAULT_ALERT_MANAGER_IMAGE = "docker.io/prom/alertmanager:v0.20.0" +# ------------------------------------------------------------------------------ + LATEST_STABLE_RELEASE = 'pacific' DATA_DIR = '/var/lib/ceph' LOG_DIR = '/var/log/ceph' @@ -11,9 +18,10 @@ UNIT_DIR = '/etc/systemd/system' LOG_DIR_MODE = 0o770 DATA_DIR_MODE = 0o700 CONTAINER_PREFERENCE = ['podman', 'docker'] # prefer podman to docker +MIN_PODMAN_VERSION = (2, 0, 2) CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ ' DEFAULT_TIMEOUT = None # in seconds -DEFAULT_RETRY = 10 +DEFAULT_RETRY = 15 SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf' SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring' @@ -37,6 +45,8 @@ You can invoke cephadm in two ways: injected_stdin = '...' """ +import asyncio +import asyncio.subprocess import argparse import datetime import fcntl @@ -73,28 +83,13 @@ from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, import re import uuid +from configparser import ConfigParser from functools import wraps from glob import glob +from io import StringIO from threading import Thread, RLock - -if sys.version_info >= (3, 0): - from io import StringIO -else: - from StringIO import StringIO - -if sys.version_info >= (3, 2): - from configparser import ConfigParser -else: - from ConfigParser import SafeConfigParser - -if sys.version_info >= (3, 0): - from urllib.request import urlopen - from urllib.error import HTTPError -else: - from urllib2 import urlopen, HTTPError - -if sys.version_info > (3, 0): - unicode = str +from urllib.error import HTTPError +from urllib.request import urlopen cached_stdin = None @@ -145,7 +140,7 @@ class CephadmContext: return "func" in self._args - def has(self, name: str) -> bool: + def __contains__(self, name: str) -> bool: return hasattr(self, name) @@ -237,7 +232,7 @@ class Monitoring(object): components = { "prometheus": { - "image": "docker.io/prom/prometheus:v2.18.1", + "image": DEFAULT_PROMETHEUS_IMAGE, "cpus": '2', "memory": '4GB', "args": [ @@ -250,7 +245,7 @@ class Monitoring(object): ], }, "node-exporter": { - "image": "docker.io/prom/node-exporter:v0.18.1", + "image": DEFAULT_NODE_EXPORTER_IMAGE, "cpus": "1", "memory": "1GB", "args": [ @@ -258,7 +253,7 @@ class Monitoring(object): ], }, "grafana": { - "image": "docker.io/ceph/ceph-grafana:6.7.4", + "image": DEFAULT_GRAFANA_IMAGE, "cpus": "2", "memory": "4GB", "args": [], @@ -270,7 +265,7 @@ class Monitoring(object): ], }, "alertmanager": { - "image": "docker.io/prom/alertmanager:v0.20.0", + "image": DEFAULT_ALERT_MANAGER_IMAGE, "cpus": "2", "memory": "2GB", "args": [ @@ -1184,6 +1179,124 @@ class CallVerbosity(Enum): VERBOSE = 3 +if sys.version_info < (3, 8): + import itertools + import threading + import warnings + from asyncio import events + + class ThreadedChildWatcher(asyncio.AbstractChildWatcher): + """Threaded child watcher implementation. 
+        The watcher uses a thread per process
+        to wait for the process to finish.
+        It doesn't require subscription on POSIX signal
+        but a thread creation is not free.
+        The watcher has O(1) complexity, its performance doesn't depend
+        on the number of spawned processes.
+        """
+
+        def __init__(self):
+            self._pid_counter = itertools.count(0)
+            self._threads = {}
+
+        def is_active(self):
+            return True
+
+        def close(self):
+            self._join_threads()
+
+        def _join_threads(self):
+            """Internal: Join all non-daemon threads"""
+            threads = [thread for thread in list(self._threads.values())
+                       if thread.is_alive() and not thread.daemon]
+            for thread in threads:
+                thread.join()
+
+        def __enter__(self):
+            return self
+
+        def __exit__(self, exc_type, exc_val, exc_tb):
+            pass
+
+        def __del__(self, _warn=warnings.warn):
+            threads = [thread for thread in list(self._threads.values())
+                       if thread.is_alive()]
+            if threads:
+                _warn(f"{self.__class__} has registered but not finished child processes",
+                      ResourceWarning,
+                      source=self)
+
+        def add_child_handler(self, pid, callback, *args):
+            loop = events.get_event_loop()
+            thread = threading.Thread(target=self._do_waitpid,
+                                      name=f"waitpid-{next(self._pid_counter)}",
+                                      args=(loop, pid, callback, args),
+                                      daemon=True)
+            self._threads[pid] = thread
+            thread.start()
+
+        def remove_child_handler(self, pid):
+            # asyncio never calls remove_child_handler() !!!
+            # The method is a no-op but is implemented because
+            # the abstract base class requires it
+            return True
+
+        def attach_loop(self, loop):
+            pass
+
+        def _do_waitpid(self, loop, expected_pid, callback, args):
+            assert expected_pid > 0
+
+            try:
+                pid, status = os.waitpid(expected_pid, 0)
+            except ChildProcessError:
+                # The child process is already reaped
+                # (may happen if waitpid() is called elsewhere).
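+                # Fall back to a synthetic exit status so the callback below
+                # still runs exactly once for this pid.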
+ pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + if os.WIFEXITED(status): + returncode = os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + returncode = -os.WTERMSIG(status) + else: + raise ValueError(f'unknown wait status {status}') + if loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is closed", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + self._threads.pop(expected_pid) + + # unlike SafeChildWatcher which handles SIGCHLD in the main thread, + # ThreadedChildWatcher runs in a separated thread, hence allows us to + # run create_subprocess_exec() in non-main thread, see + # https://bugs.python.org/issue35621 + asyncio.set_child_watcher(ThreadedChildWatcher()) + + +try: + from asyncio import run as async_run # type: ignore[attr-defined] +except ImportError: + def async_run(coro): # type: ignore + loop = asyncio.new_event_loop() + try: + asyncio.set_event_loop(loop) + return loop.run_until_complete(coro) + finally: + try: + loop.run_until_complete(loop.shutdown_asyncgens()) + finally: + asyncio.set_event_loop(None) + loop.close() + def call(ctx: CephadmContext, command: List[str], desc: Optional[str] = None, @@ -1200,119 +1313,50 @@ def call(ctx: CephadmContext, :param timeout: timeout in seconds """ - if desc is None: - desc = command[0] - if desc: - desc += ': ' + prefix = command[0] if desc is None else desc + if prefix: + prefix += ': ' timeout = timeout or ctx.timeout logger.debug("Running command: %s" % ' '.join(command)) - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - close_fds=True, - **kwargs - ) - # get current p.stdout flags, add O_NONBLOCK - assert process.stdout is not None - assert process.stderr is not None - stdout_flags = fcntl.fcntl(process.stdout, fcntl.F_GETFL) - stderr_flags = fcntl.fcntl(process.stderr, fcntl.F_GETFL) - fcntl.fcntl(process.stdout, fcntl.F_SETFL, stdout_flags | os.O_NONBLOCK) - fcntl.fcntl(process.stderr, fcntl.F_SETFL, stderr_flags | os.O_NONBLOCK) - - out = '' - err = '' - reads = None - stop = False - out_buffer = '' # partial line (no newline yet) - err_buffer = '' # partial line (no newline yet) - start_time = time.time() - end_time = None - if timeout: - end_time = start_time + timeout - while not stop: - if end_time and (time.time() >= end_time): - stop = True - if process.poll() is None: - logger.info(desc + 'timeout after %s seconds' % timeout) - process.kill() - if reads and process.poll() is not None: - # we want to stop, but first read off anything remaining - # on stdout/stderr - stop = True + + async def tee(reader: asyncio.StreamReader) -> str: + collected = StringIO() + async for line in reader: + message = line.decode('utf-8') + collected.write(message) + if verbosity == CallVerbosity.VERBOSE: + logger.info(prefix + message.rstrip()) + elif verbosity != CallVerbosity.SILENT: + logger.debug(prefix + message.rstrip()) + return collected.getvalue() + + async def run_with_timeout() -> Tuple[str, str, int]: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE) + assert process.stdout + assert process.stderr + try: + stdout, stderr = await asyncio.gather(tee(process.stdout), + tee(process.stderr)) + returncode = await asyncio.wait_for(process.wait(), timeout) + except 
asyncio.TimeoutError: + logger.info(prefix + f'timeout after {timeout} seconds') + return '', '', 124 else: - reads, _, _ = select.select( - [process.stdout.fileno(), process.stderr.fileno()], - [], [], timeout - ) - for fd in reads: - try: - message = str() - message_b = os.read(fd, 1024) - if isinstance(message_b, bytes): - message = message_b.decode('utf-8') - elif isinstance(message_b, str): - message = message_b - else: - assert False - if stop and message: - # process has terminated, but have more to read still, so not stopping yet - # (os.read returns '' when it encounters EOF) - stop = False - if not message: - continue - if fd == process.stdout.fileno(): - out += message - message = out_buffer + message - lines = message.split('\n') - out_buffer = lines.pop() - for line in lines: - if verbosity == CallVerbosity.VERBOSE: - logger.info(desc + 'stdout ' + line) - elif verbosity != CallVerbosity.SILENT: - logger.debug(desc + 'stdout ' + line) - elif fd == process.stderr.fileno(): - err += message - message = err_buffer + message - lines = message.split('\n') - err_buffer = lines.pop() - for line in lines: - if verbosity == CallVerbosity.VERBOSE: - logger.info(desc + 'stderr ' + line) - elif verbosity != CallVerbosity.SILENT: - logger.debug(desc + 'stderr ' + line) - else: - assert False - except (IOError, OSError): - pass - if verbosity == CallVerbosity.VERBOSE: - logger.debug(desc + 'profile rt=%s, stop=%s, exit=%s, reads=%s' - % (time.time()-start_time, stop, process.poll(), reads)) - - returncode = process.wait() - - if out_buffer != '': - if verbosity == CallVerbosity.VERBOSE: - logger.info(desc + 'stdout ' + out_buffer) - elif verbosity != CallVerbosity.SILENT: - logger.debug(desc + 'stdout ' + out_buffer) - if err_buffer != '': - if verbosity == CallVerbosity.VERBOSE: - logger.info(desc + 'stderr ' + err_buffer) - elif verbosity != CallVerbosity.SILENT: - logger.debug(desc + 'stderr ' + err_buffer) + return stdout, stderr, returncode + stdout, stderr, returncode = async_run(run_with_timeout()) if returncode != 0 and verbosity == CallVerbosity.VERBOSE_ON_FAILURE: - # dump stdout + stderr - logger.info('Non-zero exit code %d from %s' % (returncode, ' '.join(command))) - for line in out.splitlines(): - logger.info(desc + 'stdout ' + line) - for line in err.splitlines(): - logger.info(desc + 'stderr ' + line) - - return out, err, returncode + logger.info('Non-zero exit code %d from %s', + returncode, ' '.join(command)) + for line in stdout.splitlines(): + logger.info(prefix + 'stdout ' + line) + for line in stderr.splitlines(): + logger.info(prefix + 'stderr ' + line) + return stdout, stderr, returncode def call_throws( @@ -1339,32 +1383,10 @@ def call_timeout(ctx, command, timeout): logger.debug(msg) raise TimeoutExpired(msg) - def call_timeout_py2(command, timeout): - # type: (List[str], int) -> int - proc = subprocess.Popen(command) - thread = Thread(target=proc.wait) - thread.start() - thread.join(timeout) - if thread.is_alive(): - proc.kill() - thread.join() - raise_timeout(command, timeout) - return proc.returncode - - def call_timeout_py3(command, timeout): - # type: (List[str], int) -> int - try: - return subprocess.call(command, timeout=timeout) - except subprocess.TimeoutExpired as e: - raise_timeout(command, timeout) - - ret = 1 - if sys.version_info >= (3, 3): - ret = call_timeout_py3(command, timeout) - else: - # py2 subprocess has no timeout arg - ret = call_timeout_py2(command, timeout) - return ret + try: + return subprocess.call(command, timeout=timeout) + except 
subprocess.TimeoutExpired as e: + raise_timeout(command, timeout) ################################## @@ -1393,29 +1415,14 @@ def is_available(ctx, what, func): % (what, num, retry)) num += 1 - time.sleep(1) + time.sleep(2) def read_config(fn): # type: (Optional[str]) -> ConfigParser - # bend over backwards here because py2's ConfigParser doesn't like - # whitespace before config option names (e.g., '\n foo = bar\n'). - # Yeesh! - if sys.version_info >= (3, 2): - cp = ConfigParser() - else: - cp = SafeConfigParser() - + cp = ConfigParser() if fn: - with open(fn, 'r') as f: - raw_conf = f.read() - nice_conf = re.sub(r'\n(\s)+', r'\n', raw_conf) - s_io = StringIO(nice_conf) - if sys.version_info >= (3, 2): - cp.read_file(s_io) - else: - cp.readfp(s_io) - + cp.read(fn) return cp @@ -1569,7 +1576,7 @@ def infer_fsid(func): if not is_fsid(daemon['fsid']): # 'unknown' fsid continue - elif ctx.has("name") or not ctx.name: + elif "name" not in ctx or not ctx.name: # ctx.name not specified fsids_set.add(daemon['fsid']) elif daemon['name'] == ctx.name: @@ -1654,7 +1661,7 @@ def default_image(func): @wraps(func) def _default_image(ctx: CephadmContext): if not ctx.image: - if ctx.has("name") and ctx.name: + if "name" in ctx and ctx.name: type_ = ctx.name.split('.', 1)[0] if type_ in Monitoring.components: ctx.image = Monitoring.components[type_]['image'] @@ -1875,6 +1882,29 @@ def find_program(filename): return name +def find_container_engine(ctx): + # type: (CephadmContext) -> str + if ctx.docker: + return find_program('docker') + else: + for i in CONTAINER_PREFERENCE: + try: + return find_program(i) + except Exception as e: + logger.debug('Could not locate %s: %s' % (i, e)) + return '' + + +def check_container_engine(ctx): + # type: (CephadmContext) -> None + engine = os.path.basename(ctx.container_path) if ctx.container_path else None + if engine not in CONTAINER_PREFERENCE: + raise Error('Unable to locate any of %s' % CONTAINER_PREFERENCE) + elif engine == 'podman': + if get_podman_version(ctx, ctx.container_path) < MIN_PODMAN_VERSION: + raise Error('podman version %d.%d.%d or later is required' % MIN_PODMAN_VERSION) + + def get_unit_name(fsid, daemon_type, daemon_id=None): # type: (str, str, Optional[Union[int, str]]) -> str # accept either name or type + id @@ -1947,6 +1977,13 @@ def check_units(ctx, units, enabler=None): return False +def is_container_running(ctx: CephadmContext, name: str) -> bool: + out, err, ret = call_throws(ctx, [ + ctx.container_path, 'ps', + '--format', '{{.Names}}']) + return name in out + + def get_legacy_config_fsid(cluster, legacy_dir=None): # type: (str, Optional[str]) -> Optional[str] config_file = '/etc/ceph/%s.conf' % cluster @@ -2140,18 +2177,18 @@ def get_config_and_keyring(ctx): config = None keyring = None - if ctx.has("config_json") and ctx.config_json: + if "config_json" in ctx and ctx.config_json: d = get_parm(ctx.config_json) config = d.get('config') keyring = d.get('keyring') - if ctx.has("config") and ctx.config: + if "config" in ctx and ctx.config: with open(ctx.config, 'r') as f: config = f.read() - if ctx.has("key") and ctx.key: + if "key" in ctx and ctx.key: keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key) - elif ctx.has("keyring") and ctx.keyring: + elif "keyring" in ctx and ctx.keyring: with open(ctx.keyring, 'r') as f: keyring = f.read() @@ -2208,6 +2245,12 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, mounts['/run/udev'] = '/run/udev' if daemon_type == 'osd': mounts['/sys'] = '/sys' # for numa.cc, pick_address, cgroups, ... 
+ # selinux-policy in the container may not match the host. + if HostFacts(ctx).selinux_enabled: + selinux_folder = '/var/lib/ceph/%s/selinux' % fsid + if not os.path.exists(selinux_folder): + os.makedirs(selinux_folder, mode=0o755) + mounts[selinux_folder] = '/sys/fs/selinux:ro' mounts['/run/lvm'] = '/run/lvm' mounts['/run/lock/lvm'] = '/run/lock/lvm' @@ -2251,7 +2294,7 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, if daemon_type == HAproxy.daemon_type: assert daemon_id - data_dir = get_data_dir(fsid, daemon_type, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) mounts.update(HAproxy.get_container_mounts(data_dir)) if daemon_type == CephIscsi.daemon_type: @@ -2262,7 +2305,7 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id, if daemon_type == Keepalived.daemon_type: assert daemon_id - data_dir = get_data_dir(fsid, daemon_type, daemon_type, daemon_id) + data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id) mounts.update(Keepalived.get_container_mounts(data_dir)) if daemon_type == CustomContainer.daemon_type: @@ -2345,11 +2388,13 @@ def get_container(ctx: CephadmContext, # so service can have Type=Forking if 'podman' in ctx.container_path: runtime_dir = '/run' - container_args.extend(['-d', + container_args.extend([ + '-d', '--log-driver', 'journald', '--conmon-pidfile', runtime_dir + '/ceph-%s@%s.%s.service-pid' % (fsid, daemon_type, daemon_id), '--cidfile', - runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id)]) + runtime_dir + '/ceph-%s@%s.%s.service-cid' % (fsid, daemon_type, daemon_id), + ]) return CephContainer( ctx, @@ -3082,7 +3127,7 @@ def command_inspect_image(ctx): # type: (CephadmContext) -> int out, err, ret = call_throws(ctx, [ ctx.container_path, 'inspect', - '--format', '{{.ID}},{{json .RepoDigests}}', + '--format', '{{.ID}},{{.RepoDigests}}', ctx.image]) if ret: return errno.ENOENT @@ -3096,22 +3141,44 @@ def command_inspect_image(ctx): def get_image_info_from_inspect(out, image): - # type: (str, str) -> Dict[str, str] + # type: (str, str) -> Dict[str, Union[str,List[str]]] image_id, digests = out.split(',', 1) if not out: raise Error('inspect {}: empty result'.format(image)) r = { 'image_id': normalize_container_id(image_id) - } + } # type: Dict[str, Union[str,List[str]]] if digests: - json_digests = json.loads(digests) - if json_digests: - r['repo_digest'] = json_digests[0] + r['repo_digests'] = digests[1:-1].split(' ') return r ################################## +def check_subnet(subnets:str) -> Tuple[int, List[int], str]: + """Determine whether the given string is a valid subnet + + :param subnets: subnet string, a single definition or comma separated list of CIDR subnets + :returns: return code, IP version list of the subnets and msg describing any errors validation errors + """ + rc = 0 + versions = set() + errors = [] + subnet_list = subnets.split(',') + for subnet in subnet_list: + # ensure the format of the string is as expected address/netmask + if not re.search(r'\/\d+$', subnet): + rc = 1 + errors.append(f"{subnet} is not in CIDR format (address/netmask)") + continue + try: + v = ipaddress.ip_network(subnet).version + versions.add(v) + except ValueError as e: + rc = 1 + errors.append(f"{subnet} invalid: {str(e)}") + + return rc, list(versions), ", ".join(errors) def unwrap_ipv6(address): # type: (str) -> str @@ -3127,7 +3194,7 @@ def wrap_ipv6(address): # it's already wrapped it'll not pass (like if it's a hostname) and trigger # the ValueError try: - 
if ipaddress.ip_address(unicode(address)).version == 6: + if ipaddress.ip_address(address).version == 6: return f"[{address}]" except ValueError: pass @@ -3139,7 +3206,7 @@ def is_ipv6(address): # type: (str) -> bool address = unwrap_ipv6(address) try: - return ipaddress.ip_address(unicode(address)).version == 6 + return ipaddress.ip_address(address).version == 6 except ValueError: logger.warning("Address: {} isn't a valid IP address".format(address)) return False @@ -3199,8 +3266,8 @@ def prepare_mon_addresses( # make sure IP is configured locally, and then figure out the # CIDR network for net, ips in list_networks(ctx).items(): - if ipaddress.ip_address(unicode(unwrap_ipv6(base_ip))) in \ - [ipaddress.ip_address(unicode(ip)) for ip in ips]: + if ipaddress.ip_address(unwrap_ipv6(base_ip)) in \ + [ipaddress.ip_address(ip) for ip in ips]: mon_network = net logger.info('Mon IP %s is in CIDR network %s' % (base_ip, mon_network)) @@ -3208,10 +3275,29 @@ def prepare_mon_addresses( if not mon_network: raise Error('Failed to infer CIDR network for mon ip %s; pass ' '--skip-mon-network to configure it later' % base_ip) - + return (addr_arg, ipv6, mon_network) +def prepare_cluster_network(ctx: CephadmContext) -> Tuple[str, bool]: + cluster_network = "" + ipv6_cluster_network = False + # the cluster network may not exist on this node, so all we can do is + # validate that the address given is valid ipv4 or ipv6 subnet + if ctx.cluster_network: + rc, versions, err_msg = check_subnet(ctx.cluster_network) + if rc: + raise Error(f"Invalid --cluster-network parameter: {err_msg}") + cluster_network = ctx.cluster_network + ipv6_cluster_network = True if 6 in versions else False + else: + logger.info("- internal network (--cluster-network) has not " + "been provided, OSD replication will default to " + "the public_network") + + return cluster_network, ipv6_cluster_network + + def create_initial_keys( ctx: CephadmContext, uid: int, gid: int, @@ -3473,10 +3559,17 @@ def prepare_ssh( except RuntimeError as e: raise Error('Failed to add host <%s>: %s' % (host, e)) - if not ctx.orphan_initial_daemons: - for t in ['mon', 'mgr', 'crash']: + for t in ['mon', 'mgr', 'crash']: + if ctx.orphan_initial_daemons: logger.info('Deploying %s service with default placement...' % t) - cli(['orch', 'apply', t]) + cli(['orch', 'apply', t, '--unmanaged']) + else: + logger.info('Deploying unmanaged %s service...' 
% t) + cli(['orch', 'apply', t, '--unmanaged']) + + if not ctx.orphan_initial_daemons: + logger.info('Deploying crash service with default placement...') + cli(['orch', 'apply', 'crash']) if not ctx.skip_monitoring_stack: logger.info('Enabling mgr prometheus module...') @@ -3570,7 +3663,8 @@ def finish_bootstrap_config( config: str, mon_id: str, mon_dir: str, mon_network: Optional[str], ipv6: bool, - cli: Callable + cli: Callable, + cluster_network: Optional[str], ipv6_cluster_network: bool ) -> None: if not ctx.no_minimize_config: @@ -3599,11 +3693,15 @@ def finish_bootstrap_config( ]) if mon_network: - logger.info('Setting mon public_network...') + logger.info(f"Setting mon public_network to {mon_network}") cli(['config', 'set', 'mon', 'public_network', mon_network]) + + if cluster_network: + logger.info(f"Setting cluster_network to {cluster_network}") + cli(['config', 'set', 'global', 'cluster_network', cluster_network]) - if ipv6: - logger.info('Enabling IPv6 (ms_bind_ipv6)') + if ipv6 or ipv6_cluster_network: + logger.info('Enabling IPv6 (ms_bind_ipv6) binding') cli(['config', 'set', 'global', 'ms_bind_ipv6', 'true']) @@ -3663,6 +3761,8 @@ def command_bootstrap(ctx): l.acquire() (addr_arg, ipv6, mon_network) = prepare_mon_addresses(ctx) + cluster_network, ipv6_cluster_network = prepare_cluster_network(ctx) + config = prepare_bootstrap_config(ctx, fsid, addr_arg, ctx.image) logger.info('Extracting ceph user uid/gid from container image...') @@ -3712,7 +3812,8 @@ def command_bootstrap(ctx): wait_for_mon(ctx, mon_id, mon_dir, admin_keyring.name, tmp_config.name) finish_bootstrap_config(ctx, fsid, config, mon_id, mon_dir, - mon_network, ipv6, cli) + mon_network, ipv6, cli, + cluster_network, ipv6_cluster_network) # output files with open(ctx.output_keyring, 'w') as f: @@ -3726,8 +3827,13 @@ def command_bootstrap(ctx): # wait for mgr to restart (after enabling a module) def wait_for_mgr_restart(): - # first get latest mgrmap epoch from the mon - out = cli(['mgr', 'dump']) + # first get latest mgrmap epoch from the mon. 
try newer 'mgr + # stat' command first, then fall back to 'mgr dump' if + # necessary + try: + out = cli(['mgr', 'stat']) + except Exception as e: + out = cli(['mgr', 'dump']) j = json.loads(out) epoch = j['epoch'] # wait for mgr to have it @@ -3812,7 +3918,7 @@ def command_bootstrap(ctx): logger.info('Please consider enabling telemetry to help improve Ceph:\n\n' '\tceph telemetry on\n\n' 'For more information see:\n\n' - '\thttps://docs.ceph.com/docs/master/mgr/telemetry/\n') + '\thttps://docs.ceph.com/docs/pacific/mgr/telemetry/\n') logger.info('Bootstrap complete.') return 0 @@ -3890,8 +3996,9 @@ def command_deploy(ctx): redeploy = False unit_name = get_unit_name(ctx.fsid, daemon_type, daemon_id) + container_name = 'ceph-%s-%s.%s' % (ctx.fsid, daemon_type, daemon_id) (_, state, _) = check_unit(ctx, unit_name) - if state == 'running': + if state == 'running' or is_container_running(ctx, container_name): redeploy = True if ctx.reconfig: @@ -3903,8 +4010,12 @@ def command_deploy(ctx): # Get and check ports explicitly required to be opened daemon_ports = [] # type: List[int] - if ctx.tcp_ports: - daemon_ports = list(map(int, ctx.tcp_ports.split())) + + # only check port in use if not reconfig or redeploy since service + # we are redeploying/reconfiguring will already be using the port + if not ctx.reconfig and not redeploy: + if ctx.tcp_ports: + daemon_ports = list(map(int, ctx.tcp_ports.split())) if daemon_type in Ceph.daemons: config, keyring = get_config_and_keyring(ctx) @@ -4287,7 +4398,7 @@ def _parse_ipv6_route(routes, ips): ip = m[0][0] # find the network it belongs to net = [n for n in r.keys() - if ipaddress.ip_address(unicode(ip)) in ipaddress.ip_network(unicode(n))] + if ipaddress.ip_address(ip) in ipaddress.ip_network(n)] if net: r[net[0]].append(ip) @@ -4322,6 +4433,9 @@ def list_daemons(ctx, detail=True, legacy_dir=None): # keep track of ceph versions we see seen_versions = {} # type: Dict[str, Optional[str]] + # keep track of image digests + seen_digests = {} # type: Dict[str, List[str]] + # /var/lib/ceph if os.path.exists(data_dir): for i in os.listdir(data_dir): @@ -4378,19 +4492,14 @@ def list_daemons(ctx, detail=True, legacy_dir=None): container_id = None image_name = None image_id = None + image_digests = None version = None start_stamp = None - if 'podman' in container_path and \ - get_podman_version(ctx, container_path) < (1, 6, 2): - image_field = '.ImageID' - else: - image_field = '.Image' - out, err, code = call(ctx, [ container_path, 'inspect', - '--format', '{{.Id}},{{.Config.Image}},{{%s}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}' % image_field, + '--format', '{{.Id}},{{.Config.Image}},{{.Image}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}', 'ceph-%s-%s' % (fsid, j) ], verbosity=CallVerbosity.DEBUG) @@ -4400,6 +4509,22 @@ def list_daemons(ctx, detail=True, legacy_dir=None): image_id = normalize_container_id(image_id) daemon_type = name.split('.', 1)[0] start_stamp = try_convert_datetime(start) + + # collect digests for this image id + image_digests = seen_digests.get(image_id) + if not image_digests: + out, err, code = call( + ctx, + [ + container_path, 'image', 'inspect', image_name, + '--format', '{{.RepoDigests}}', + ], + verbosity=CallVerbosity.DEBUG) + if not code: + image_digests = out.strip()[1:-1].split(' ') + seen_digests[image_id] = image_digests + + # identify software version inside the container (if we can) if not version or '.' 
not in version: version = seen_versions.get(image_id, None) if daemon_type == NFSGanesha.daemon_type: @@ -4467,6 +4592,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None): val['container_id'] = container_id val['container_image_name'] = image_name val['container_image_id'] = image_id + val['container_image_digests'] = image_digests val['version'] = version val['started'] = start_stamp val['created'] = get_file_timestamp( @@ -5014,19 +5140,11 @@ def command_check_host(ctx: CephadmContext) -> None: errors = [] commands = ['systemctl', 'lvcreate'] - if ctx.docker: - container_path = find_program('docker') - else: - for i in CONTAINER_PREFERENCE: - try: - container_path = find_program(i) - break - except Exception as e: - logger.debug('Could not locate %s: %s' % (i, e)) - if not container_path: - errors.append('ERROR: Unable to locate a supported container engine ({})'.format(' or '.join(CONTAINER_PREFERENCE))) - else: - logger.info('podman|docker (%s) is present' % container_path) + try: + check_container_engine(ctx) + logger.info('podman|docker (%s) is present' % container_path) + except Error as e: + errors.append(str(e)) for command in commands: try: @@ -5039,7 +5157,7 @@ def command_check_host(ctx: CephadmContext) -> None: if not check_time_sync(ctx): errors.append('ERROR: No time synchronization is active') - if ctx.has("expect_hostname") and ctx.expect_hostname: + if "expect_hostname" in ctx and ctx.expect_hostname: if get_hostname().lower() != ctx.expect_hostname.lower(): errors.append('ERROR: hostname "%s" does not match expected hostname "%s"' % ( get_hostname(), ctx.expect_hostname)) @@ -5059,7 +5177,10 @@ def command_prepare_host(ctx: CephadmContext) -> None: logger.info('Verifying podman|docker is present...') pkg = None - if not container_path: + try: + check_container_engine(ctx) + except Error as e: + logger.warning(str(e)) if not pkg: pkg = create_packager(ctx) pkg.install_podman() @@ -5079,7 +5200,7 @@ def command_prepare_host(ctx: CephadmContext) -> None: # the service check_time_sync(ctx, enabler=pkg) - if ctx.has("expect_hostname") and ctx.expect_hostname and ctx.expect_hostname != get_hostname(): + if "expect_hostname" in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname(): logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), ctx.expect_hostname)) call_throws(ctx, ['hostname', ctx.expect_hostname]) with open('/etc/hostname', 'w') as f: @@ -5267,13 +5388,13 @@ class Apt(Packager): def install(self, ls): logger.info('Installing packages %s...' 
% ls) - call_throws(self.ctx, ['apt', 'install', '-y'] + ls) + call_throws(self.ctx, ['apt-get', 'install', '-y'] + ls) def install_podman(self): if self.distro == 'ubuntu': logger.info('Setting up repo for podman...') self.add_kubic_repo() - call_throws(self.ctx, ['apt', 'update']) + call_throws(self.ctx, ['apt-get', 'update']) logger.info('Attempting podman install...') try: @@ -6056,10 +6177,11 @@ class HostFacts(): up_secs, _ = raw_time.split() return float(up_secs) + @property def kernel_security(self): - # type: () -> Optional[Dict[str, str]] + # type: () -> Dict[str, str] """Determine the security features enabled in the kernel - SELinux, AppArmor""" - def _fetch_selinux() -> Optional[Dict[str, str]]: + def _fetch_selinux() -> Dict[str, str]: """Read the selinux config file to determine state""" security = {} for selinux_path in HostFacts._selinux_path_list: @@ -6076,9 +6198,9 @@ class HostFacts(): else: security['description'] = "SELinux: Enabled({}, {})".format(security['SELINUX'], security['SELINUXTYPE']) return security - return None + return {} - def _fetch_apparmor() -> Optional[Dict[str, str]]: + def _fetch_apparmor() -> Dict[str, str]: """Read the apparmor profiles directly, returning an overview of AppArmor status""" security = {} for apparmor_path in HostFacts._apparmor_path_list: @@ -6103,9 +6225,9 @@ class HostFacts(): security['description'] += "({})".format(summary_str) return security - return None + return {} - ret = None + ret = {} if os.path.exists('/sys/kernel/security/lsm'): lsm = read_file(['/sys/kernel/security/lsm']).strip() if 'selinux' in lsm: @@ -6118,7 +6240,7 @@ class HostFacts(): "description": "Linux Security Module framework is active, but is not using SELinux or AppArmor" } - if ret is not None: + if ret: return ret return { @@ -6127,6 +6249,11 @@ class HostFacts(): } @property + def selinux_enabled(self): + return (self.kernel_security["type"] == "SELinux") and \ + (self.kernel_security["description"] != "SELinux: Disabled") + + @property def kernel_parameters(self): # type: () -> Dict[str, str] """Get kernel parameters required/used in Ceph clusters""" @@ -7290,7 +7417,7 @@ def _get_parser(): parser_bootstrap.add_argument( '--orphan-initial-daemons', action='store_true', - help='Do not create initial mon, mgr, and crash service specs') + help='Set mon and mgr service to `unmanaged`, Do not create the crash service') parser_bootstrap.add_argument( '--skip-monitoring-stack', action='store_true', @@ -7328,6 +7455,9 @@ def _get_parser(): '--exporter-config', action=CustomValidation, help=f'Exporter configuration information in JSON format (providing: {", ".join(CephadmDaemon.config_requirements)}, port information)') + parser_bootstrap.add_argument( + '--cluster-network', + help='subnet to use for cluster replication, recovery and heartbeats (in CIDR notation network/mask)') parser_deploy = subparsers.add_parser( 'deploy', help='deploy a daemon') @@ -7528,23 +7658,6 @@ def cephadm_init(args: List[str]) -> Optional[CephadmContext]: sys.stderr.write("No command specified; pass -h or --help for usage\n") return None - ctx.container_path = "" - if ctx.func != command_check_host: - if ctx.docker: - ctx.container_path = find_program("docker") - else: - for i in CONTAINER_PREFERENCE: - try: - ctx.container_path = find_program(i) - break - except Exception as e: - logger.debug("Could not locate %s: %s" % (i, e)) - if not ctx.container_path and ctx.func != command_prepare_host\ - and ctx.func != command_add_repo: - sys.stderr.write("Unable to locate any of %s\n" 
% - CONTAINER_PREFERENCE) - return None - return ctx @@ -7566,11 +7679,17 @@ def main(): sys.exit(1) try: + # podman or docker? + ctx.container_path = find_container_engine(ctx) + if ctx.func not in \ + [command_check_host, command_prepare_host, command_add_repo]: + check_container_engine(ctx) + # command handler r = ctx.func(ctx) except Error as e: if ctx.verbose: raise - sys.stderr.write('ERROR: %s\n' % e) + logger.error('ERROR: %s' % e) sys.exit(1) if not r: r = 0 diff --git a/SPECS/cephadm.spec b/SPECS/cephadm.spec index 6db8521..29b92a5 100644 --- a/SPECS/cephadm.spec +++ b/SPECS/cephadm.spec @@ -1,14 +1,14 @@ # Upstream ceph commit upon which this package is based: -# patches_base=74275226ac79999bfd40e683dc9a1309e76033bf +# patches_base=ca0faa0a628c041f5b5313060344006830812e25 Name: cephadm Epoch: 2 Version: 16.1.0 -Release: 100%{?dist} +Release: 568%{?dist} Summary: Utility to bootstrap Ceph clusters License: LGPL-2.1 URL: https://ceph.io -Source0: https://github.com/ceph/ceph/raw/74275226ac79999bfd40e683dc9a1309e76033bf/src/cephadm/cephadm +Source0: https://github.com/ceph/ceph/raw/ca0faa0a628c041f5b5313060344006830812e25/src/cephadm/cephadm Source1: COPYING-LGPL2.1 BuildArch: noarch @@ -53,6 +53,3 @@ exit 0 %attr(0600,cephadm,cephadm) %{_sharedstatedir}/cephadm/.ssh/authorized_keys %changelog -* Wed Feb 10 2021 Ken Dreyer - 16.1.0-100 -- initial package -
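
Note on the new --cluster-network bootstrap option added above: bootstrap now validates the value with check_subnet(), which accepts a comma-separated list of CIDR subnets and reports which IP versions are present; prepare_cluster_network() stores the result, and finish_bootstrap_config() sets cluster_network (and enables ms_bind_ipv6 when an IPv6 subnet is included). The snippet below is a minimal, standalone sketch of that validation behaviour; it mirrors the helper using only the standard library rather than importing the cephadm script itself, and the example subnets are illustrative.

    import ipaddress
    import re

    def check_subnet(subnets: str):
        """Mirror of the patched helper: returns (rc, ip_versions, error_msg)."""
        rc = 0
        versions = set()
        errors = []
        for subnet in subnets.split(','):
            # require an explicit prefix length, e.g. 10.90.90.0/24
            if not re.search(r'\/\d+$', subnet):
                rc = 1
                errors.append(f"{subnet} is not in CIDR format (address/netmask)")
                continue
            try:
                versions.add(ipaddress.ip_network(subnet).version)
            except ValueError as e:
                rc = 1
                errors.append(f"{subnet} invalid: {e}")
        return rc, list(versions), ", ".join(errors)

    print(check_subnet("192.168.0.0/24,fd00::/64"))  # (0, [4, 6], '') -> IPv6 present, so ms_bind_ipv6 is set
    print(check_subnet("192.168.0.0"))               # (1, [], '192.168.0.0 is not in CIDR format (address/netmask)')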