diff --git a/SOURCES/cephadm b/SOURCES/cephadm
index abe5986..beb9bf0 100644
--- a/SOURCES/cephadm
+++ b/SOURCES/cephadm
@@ -1,50 +1,5 @@
 #!/usr/bin/python3
 
-# Default container images -----------------------------------------------------
-DEFAULT_IMAGE = 'docker.io/ceph/daemon-base:latest-pacific-devel'
-DEFAULT_IMAGE_IS_MASTER = False
-DEFAULT_PROMETHEUS_IMAGE = "docker.io/prom/prometheus:v2.18.1"
-DEFAULT_NODE_EXPORTER_IMAGE = "docker.io/prom/node-exporter:v0.18.1"
-DEFAULT_GRAFANA_IMAGE = "docker.io/ceph/ceph-grafana:6.7.4"
-DEFAULT_ALERT_MANAGER_IMAGE = "docker.io/prom/alertmanager:v0.20.0"
-# ------------------------------------------------------------------------------
-
-LATEST_STABLE_RELEASE = 'pacific'
-DATA_DIR = '/var/lib/ceph'
-LOG_DIR = '/var/log/ceph'
-LOCK_DIR = '/run/cephadm'
-LOGROTATE_DIR = '/etc/logrotate.d'
-UNIT_DIR = '/etc/systemd/system'
-LOG_DIR_MODE = 0o770
-DATA_DIR_MODE = 0o700
-CONTAINER_PREFERENCE = ['podman', 'docker']  # prefer podman to docker
-MIN_PODMAN_VERSION = (2, 0, 2)
-CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
-DEFAULT_TIMEOUT = None  # in seconds
-DEFAULT_RETRY = 15
-SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
-SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
-
-"""
-You can invoke cephadm in two ways:
-
-1. The normal way, at the command line.
-
-2. By piping the script to the python3 binary. In this latter case, you should
-   prepend one or more lines to the beginning of the script.
-
-   For arguments,
-
-       injected_argv = [...]
-
-   e.g.,
-
-       injected_argv = ['ls']
-
-   For reading stdin from the '--config-json -' argument,
-
-       injected_stdin = '...'
-"""
 import asyncio
 import asyncio.subprocess
 import argparse
@@ -54,12 +9,11 @@ import ipaddress
 import json
 import logging
 from logging.config import dictConfig
-from operator import truediv
 import os
 import platform
 import pwd
 import random
-import select
+import shlex
 import shutil
 import socket
 import string
@@ -77,7 +31,6 @@ from contextlib import redirect_stdout
 import ssl
 from enum import Enum
 
-
 from typing import Dict, List, Tuple, Optional, Union, Any, NoReturn, Callable, IO
 
 import re
@@ -91,19 +44,64 @@ from threading import Thread, RLock
 from urllib.error import HTTPError
 from urllib.request import urlopen
 
-cached_stdin = None
+# Default container images -----------------------------------------------------
+DEFAULT_IMAGE = 'docker.io/ceph/daemon-base:latest-pacific-devel'
+DEFAULT_IMAGE_IS_MASTER = False
+DEFAULT_PROMETHEUS_IMAGE = 'docker.io/prom/prometheus:v2.18.1'
+DEFAULT_NODE_EXPORTER_IMAGE = 'docker.io/prom/node-exporter:v0.18.1'
+DEFAULT_GRAFANA_IMAGE = 'docker.io/ceph/ceph-grafana:6.7.4'
+DEFAULT_ALERT_MANAGER_IMAGE = 'docker.io/prom/alertmanager:v0.20.0'
+# ------------------------------------------------------------------------------
+LATEST_STABLE_RELEASE = 'pacific'
+DATA_DIR = '/var/lib/ceph'
+LOG_DIR = '/var/log/ceph'
+LOCK_DIR = '/run/cephadm'
+LOGROTATE_DIR = '/etc/logrotate.d'
+UNIT_DIR = '/etc/systemd/system'
+LOG_DIR_MODE = 0o770
+DATA_DIR_MODE = 0o700
+CONTAINER_INIT = True
+CONTAINER_PREFERENCE = ['podman', 'docker']  # prefer podman to docker
+MIN_PODMAN_VERSION = (2, 0, 2)
+CUSTOM_PS1 = r'[ceph: \u@\h \W]\$ '
+DEFAULT_TIMEOUT = None  # in seconds
+DEFAULT_RETRY = 15
+SHELL_DEFAULT_CONF = '/etc/ceph/ceph.conf'
+SHELL_DEFAULT_KEYRING = '/etc/ceph/ceph.client.admin.keyring'
 
 DATEFMT = '%Y-%m-%dT%H:%M:%S.%fZ'
 
+logger: logging.Logger = None  # type: ignore
+
+"""
+You can invoke cephadm in two ways:
+
+1. The normal way, at the command line.
+
+2. By piping the script to the python3 binary. In this latter case, you should
+   prepend one or more lines to the beginning of the script.
+
+   For arguments,
+
+       injected_argv = [...]
+
+   e.g.,
+
-logger: logging.Logger = None # type: ignore
+       injected_argv = ['ls']
+
+   For reading stdin from the '--config-json -' argument,
+
+       injected_stdin = '...'
+"""
+cached_stdin = None
 
 ##################
 
+
 class BaseConfig:
 
     def __init__(self):
-        self.image: str = ""
+        self.image: str = ''
         self.docker: bool = False
         self.data_dir: str = DATA_DIR
        self.log_dir: str = LOG_DIR
@@ -113,8 +111,11 @@ class BaseConfig:
         self.timeout: Optional[int] = DEFAULT_TIMEOUT
         self.retry: int = DEFAULT_RETRY
         self.env: List[str] = []
+        self.memory_request: Optional[int] = None
+        self.memory_limit: Optional[int] = None
 
-        self.container_path: str = ""
+        self.container_init: bool = CONTAINER_INIT
+        self.container_path: str = ''
 
     def set_from_args(self, args: argparse.Namespace):
         argdict: Dict[str, Any] = vars(args)
@@ -126,30 +127,23 @@ class BaseConfig:
 
 class CephadmContext:
 
     def __init__(self):
-
-        self.__dict__["_args"] = None
-        self.__dict__["_conf"] = BaseConfig()
-
+        self.__dict__['_args'] = None
+        self.__dict__['_conf'] = BaseConfig()
 
     def set_args(self, args: argparse.Namespace) -> None:
         self._conf.set_from_args(args)
         self._args = args
 
-    def has_function(self) -> bool:
-        return "func" in self._args
-
+    def has_function(self) -> bool:
+        return 'func' in self._args
 
     def __contains__(self, name: str) -> bool:
         return hasattr(self, name)
 
-    def __getattr__(self, name: str) -> Any:
-        if "_conf" in self.__dict__ and \
-           hasattr(self._conf, name):
+    def __getattr__(self, name: str) -> Any:
+        if '_conf' in self.__dict__ and hasattr(self._conf, name):
             return getattr(self._conf, name)
-        elif "_args" in self.__dict__ and \
-             hasattr(self._args, name):
+        elif '_args' in self.__dict__ and hasattr(self._args, name):
             return getattr(self._args, name)
         else:
             return super().__getattribute__(name)
@@ -163,9 +157,6 @@ class CephadmContext:
             super().__setattr__(name, value)
 
 
-##################
-
-
 # Log and console output config
 logging_config = {
     'version': 1,
@@ -176,9 +167,9 @@ logging_config = {
         },
     },
     'handlers': {
-        'console':{
-            'level':'INFO',
-            'class':'logging.StreamHandler',
+        'console': {
+            'level': 'INFO',
+            'class': 'logging.StreamHandler',
         },
         'log_file': {
             'level': 'DEBUG',
@@ -197,6 +188,7 @@ logging_config = {
     }
 }
 
+
 class termcolor:
     yellow = '\033[93m'
     red = '\033[31m'
@@ -215,7 +207,7 @@ class TimeoutExpired(Error):
 
 class Ceph(object):
     daemons = ('mon', 'mgr', 'mds', 'osd', 'rgw', 'rbd-mirror',
-               'crash')
+               'crash', 'cephfs-mirror')
 
 ##################
@@ -224,64 +216,95 @@ class Monitoring(object):
     """Define the configs for the monitoring containers"""
 
     port_map = {
-        "prometheus": [9095],  # Avoid default 9090, due to conflict with cockpit UI
-        "node-exporter": [9100],
-        "grafana": [3000],
-        "alertmanager": [9093, 9094],
+        'prometheus': [9095],  # Avoid default 9090, due to conflict with cockpit UI
+        'node-exporter': [9100],
+        'grafana': [3000],
+        'alertmanager': [9093, 9094],
     }
 
     components = {
-        "prometheus": {
-            "image": DEFAULT_PROMETHEUS_IMAGE,
-            "cpus": '2',
-            "memory": '4GB',
-            "args": [
-                "--config.file=/etc/prometheus/prometheus.yml",
-                "--storage.tsdb.path=/prometheus",
-                "--web.listen-address=:{}".format(port_map['prometheus'][0]),
+        'prometheus': {
+            'image': DEFAULT_PROMETHEUS_IMAGE,
+            'cpus': '2',
+            'memory': '4GB',
+            'args': [
+                '--config.file=/etc/prometheus/prometheus.yml',
+                '--storage.tsdb.path=/prometheus',
+                '--web.listen-address=:{}'.format(port_map['prometheus'][0]),
             ],
-            "config-json-files": [
-                "prometheus.yml",
+            'config-json-files': [
+                'prometheus.yml',
             ],
         },
-        "node-exporter": {
-            "image": DEFAULT_NODE_EXPORTER_IMAGE,
-            "cpus": "1",
-            "memory": "1GB",
-            "args": [
-                "--no-collector.timex",
+        'node-exporter': {
+            'image': DEFAULT_NODE_EXPORTER_IMAGE,
+            'cpus': '1',
+            'memory': '1GB',
+            'args': [
+                '--no-collector.timex',
            ],
        },
-        "grafana": {
-            "image": DEFAULT_GRAFANA_IMAGE,
-            "cpus": "2",
-            "memory": "4GB",
-            "args": [],
-            "config-json-files": [
-                "grafana.ini",
-                "provisioning/datasources/ceph-dashboard.yml",
-                "certs/cert_file",
-                "certs/cert_key",
+        'grafana': {
+            'image': DEFAULT_GRAFANA_IMAGE,
+            'cpus': '2',
+            'memory': '4GB',
+            'args': [],
+            'config-json-files': [
+                'grafana.ini',
+                'provisioning/datasources/ceph-dashboard.yml',
+                'certs/cert_file',
+                'certs/cert_key',
             ],
         },
-        "alertmanager": {
-            "image": DEFAULT_ALERT_MANAGER_IMAGE,
-            "cpus": "2",
-            "memory": "2GB",
-            "args": [
-                "--web.listen-address=:{}".format(port_map['alertmanager'][0]),
-                "--cluster.listen-address=:{}".format(port_map['alertmanager'][1]),
+        'alertmanager': {
+            'image': DEFAULT_ALERT_MANAGER_IMAGE,
+            'cpus': '2',
+            'memory': '2GB',
+            'args': [
+                '--web.listen-address=:{}'.format(port_map['alertmanager'][0]),
+                '--cluster.listen-address=:{}'.format(port_map['alertmanager'][1]),
             ],
-            "config-json-files": [
-                "alertmanager.yml",
+            'config-json-files': [
+                'alertmanager.yml',
             ],
-            "config-json-args": [
-                "peers",
+            'config-json-args': [
+                'peers',
             ],
         },
     }  # type: ignore
 
+    @staticmethod
+    def get_version(ctx, container_id, daemon_type):
+        # type: (CephadmContext, str, str) -> str
+        """
+        :param: daemon_type Either "prometheus", "alertmanager" or "node-exporter"
+        """
+        assert daemon_type in ('prometheus', 'alertmanager', 'node-exporter')
+        cmd = daemon_type.replace('-', '_')
+        code = -1
+        err = ''
+        version = ''
+        if daemon_type == 'alertmanager':
+            for cmd in ['alertmanager', 'prometheus-alertmanager']:
+                _, err, code = call(ctx, [
+                    ctx.container_path, 'exec', container_id, cmd,
+                    '--version'
+                ], verbosity=CallVerbosity.DEBUG)
+                if code == 0:
+                    break
+            cmd = 'alertmanager'  # reset cmd for version extraction
+        else:
+            _, err, code = call(ctx, [
+                ctx.container_path, 'exec', container_id, cmd, '--version'
+            ], verbosity=CallVerbosity.DEBUG)
+        if code == 0 and \
+                err.startswith('%s, version ' % cmd):
+            version = err.split(' ')[2]
+        return version
+
 ##################
 
+
 def populate_files(config_dir, config_files, uid, gid):
     # type: (str, Dict, int, int) -> None
     """create config files for different services"""
@@ -294,6 +317,7 @@ def populate_files(config_dir, config_files, uid, gid):
             os.fchmod(f.fileno(), 0o600)
             f.write(config_content)
 
+
 class NFSGanesha(object):
     """Defines a NFS-Ganesha container"""
 
@@ -304,7 +328,7 @@ class NFSGanesha(object):
     required_files = ['ganesha.conf']
 
     port_map = {
-        "nfs" : 2049,
+        'nfs': 2049,
     }
 
     def __init__(self,
@@ -333,8 +357,7 @@ class NFSGanesha(object):
     @classmethod
     def init(cls, ctx, fsid, daemon_id):
         # type: (CephadmContext, str, Union[int, str]) -> NFSGanesha
-        return cls(ctx, fsid, daemon_id, get_parm(ctx.config_json),
-                   ctx.image)
+        return cls(ctx, fsid, daemon_id, get_parm(ctx.config_json), ctx.image)
 
     def get_container_mounts(self, data_dir):
         # type: (str) -> Dict[str, str]
@@ -346,7 +369,7 @@ class NFSGanesha(object):
             cluster = self.rgw.get('cluster', 'ceph')
             rgw_user = self.rgw.get('user', 'admin')
             mounts[os.path.join(data_dir, 'keyring.rgw')] = \
-                 '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user)
+                '/var/lib/ceph/radosgw/%s-%s/keyring:z' % (cluster, rgw_user)
         return mounts
 
     @staticmethod
@@ -362,8 +385,9 @@ class NFSGanesha(object):
         # type: (CephadmContext, str) -> Optional[str]
         version = None
         out, err, code = call(ctx,
-                [ctx.container_path, 'exec', container_id,
-                 NFSGanesha.entrypoint, '-v'])
+                              [ctx.container_path, 'exec', container_id,
+                               NFSGanesha.entrypoint, '-v'],
+                              verbosity=CallVerbosity.DEBUG)
         if code == 0:
             match = re.search(r'NFS-Ganesha Release\s*=\s*[V]*([\d.]+)', out)
             if match:
@@ -436,7 +460,7 @@ class NFSGanesha(object):
         entrypoint = '/usr/bin/ganesha-rados-grace'
 
         assert self.pool
-        args=['--pool', self.pool]
+        args = ['--pool', self.pool]
         if self.namespace:
             args += ['--ns', self.namespace]
         if self.userid:
@@ -523,8 +547,9 @@ class CephIscsi(object):
         # type: (CephadmContext, str) -> Optional[str]
         version = None
         out, err, code = call(ctx,
-                [ctx.container_path, 'exec', container_id,
-                 '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"])
+                              [ctx.container_path, 'exec', container_id,
+                               '/usr/bin/python3', '-c', "import pkg_resources; print(pkg_resources.require('ceph_iscsi')[0].version)"],
+                              verbosity=CallVerbosity.DEBUG)
         if code == 0:
             version = out.strip()
         return version
@@ -573,17 +598,17 @@ class CephIscsi(object):
         # type: (str, bool) -> List[str]
         mount_path = os.path.join(data_dir, 'configfs')
         if mount:
-            cmd = "if ! grep -qs {0} /proc/mounts; then " \
-                  "mount -t configfs none {0}; fi".format(mount_path)
+            cmd = 'if ! grep -qs {0} /proc/mounts; then ' \
+                  'mount -t configfs none {0}; fi'.format(mount_path)
         else:
-            cmd = "if grep -qs {0} /proc/mounts; then " \
-                  "umount {0}; fi".format(mount_path)
+            cmd = 'if grep -qs {0} /proc/mounts; then ' \
+                  'umount {0}; fi'.format(mount_path)
         return cmd.split()
 
     def get_tcmu_runner_container(self):
         # type: () -> CephContainer
         tcmu_container = get_container(self.ctx, self.fsid, self.daemon_type, self.daemon_id)
-        tcmu_container.entrypoint = "/usr/bin/tcmu-runner"
+        tcmu_container.entrypoint = '/usr/bin/tcmu-runner'
         tcmu_container.cname = self.get_container_name(desc='tcmu')
         # remove extra container args for tcmu container.
        # extra args could cause issue with forking service type
@@ -592,6 +617,7 @@ class CephIscsi(object):
 
 ##################
 
+
 class HAproxy(object):
     """Defines an HAproxy container"""
     daemon_type = 'haproxy'
@@ -666,7 +692,7 @@ class HAproxy(object):
     @staticmethod
     def get_container_mounts(data_dir: str) -> Dict[str, str]:
         mounts = dict()
-        mounts[os.path.join(data_dir,'haproxy')] = '/var/lib/haproxy'
+        mounts[os.path.join(data_dir, 'haproxy')] = '/var/lib/haproxy'
         return mounts
 
 ##################
@@ -754,7 +780,7 @@ class Keepalived(object):
     @staticmethod
     def get_container_mounts(data_dir: str) -> Dict[str, str]:
         mounts = dict()
-        mounts[os.path.join(data_dir,'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
+        mounts[os.path.join(data_dir, 'keepalived.conf')] = '/etc/keepalived/keepalived.conf'
         return mounts
 
 ##################
@@ -892,7 +918,7 @@ def dict_get(d: Dict, key: str, default: Any = None, require: bool = False) -> A
     """
     if require and key not in d.keys():
         raise Error('{} missing from dict'.format(key))
-    return d.get(key, default) # type: ignore
+    return d.get(key, default)  # type: ignore
 
 ##################
 
@@ -931,18 +957,22 @@ def get_supported_daemons():
 
 ##################
 
 
+class PortOccupiedError(Error):
+    pass
+
+
 def attempt_bind(ctx, s, address, port):
     # type: (CephadmContext, socket.socket, str, int) -> None
     try:
         s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
         s.bind((address, port))
     except (socket.error, OSError) as e:  # py2 and py3
-        msg = 'Cannot bind to IP %s port %d: %s' % (address, port, e)
-        logger.warning(msg)
         if e.errno == errno.EADDRINUSE:
-            raise OSError(msg)
-        elif e.errno == errno.EADDRNOTAVAIL:
-            pass
+            msg = 'Cannot bind to IP %s port %d: %s' % (address, port, e)
+            logger.warning(msg)
+            raise PortOccupiedError(msg)
+        else:
+            raise e
     finally:
         s.close()
 
@@ -951,16 +981,26 @@ def port_in_use(ctx, port_num):
     # type: (CephadmContext, int) -> bool
     """Detect whether a port is in use on the local machine - IPv4 and IPv6"""
     logger.info('Verifying port %d ...' % port_num)
 
-    try:
-        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        attempt_bind(ctx, s, '0.0.0.0', port_num)
 
-        s = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
-        attempt_bind(ctx, s, '::', port_num)
-    except OSError:
-        return True
-    else:
+    def _port_in_use(af: socket.AddressFamily, address: str) -> bool:
+        try:
+            s = socket.socket(af, socket.SOCK_STREAM)
+            attempt_bind(ctx, s, address, port_num)
+        except PortOccupiedError:
+            return True
+        except OSError as e:
+            if e.errno in (errno.EAFNOSUPPORT, errno.EADDRNOTAVAIL):
+                # Ignore EAFNOSUPPORT and EADDRNOTAVAIL as two interfaces are
+                # being tested here and one might be intentionally be disabled.
+                # In that case no error should be raised.
+                return False
+            else:
+                raise e
         return False
 
+    return any(_port_in_use(af, address) for af, address in (
+        (socket.AF_INET, '0.0.0.0'),
+        (socket.AF_INET6, '::')
+    ))
+
 
 def check_ip_port(ctx, ip, port):
@@ -972,13 +1012,11 @@ def check_ip_port(ctx, ip, port):
             ip = unwrap_ipv6(ip)
         else:
             s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
-        try:
-            attempt_bind(ctx, s, ip, port)
-        except OSError as e:
-            raise Error(e)
+        attempt_bind(ctx, s, ip, port)
 
 ##################
 
+
 # this is an abbreviated version of
 # https://github.com/benediktschmitt/py-filelock/blob/master/filelock.py
 # that drops all of the compatibility (this is Unix/Linux only).
@@ -1103,7 +1141,7 @@ class FileLock(object):
                 self._lock_counter = max(0, self._lock_counter - 1)
                 raise
 
-        return _Acquire_ReturnProxy(lock = self)
+        return _Acquire_ReturnProxy(lock=self)
 
     def release(self, force=False):
         """
@@ -1200,7 +1238,7 @@ if sys.version_info < (3, 8):
             self._threads = {}
 
         def is_active(self):
-            return True 
+            return True
 
         def close(self):
             self._join_threads()
@@ -1222,14 +1260,14 @@ if sys.version_info < (3, 8):
             threads = [thread for thread in list(self._threads.values())
                        if thread.is_alive()]
             if threads:
-                _warn(f"{self.__class__} has registered but not finished child processes",
+                _warn(f'{self.__class__} has registered but not finished child processes',
                       ResourceWarning,
                       source=self)
 
         def add_child_handler(self, pid, callback, *args):
             loop = events.get_event_loop()
             thread = threading.Thread(target=self._do_waitpid,
-                                      name=f"waitpid-{next(self._pid_counter)}",
+                                      name=f'waitpid-{next(self._pid_counter)}',
                                       args=(loop, pid, callback, args),
                                       daemon=True)
             self._threads[pid] = thread
@@ -1255,7 +1293,7 @@ if sys.version_info < (3, 8):
                     pid = expected_pid
                     returncode = 255
                     logger.warning(
-                        "Unknown child process pid %d, will report returncode 255",
+                        'Unknown child process pid %d, will report returncode 255',
                         pid)
                 else:
                     if os.WIFEXITED(status):
@@ -1269,7 +1307,7 @@ if sys.version_info < (3, 8):
                                 expected_pid, returncode)
 
                 if loop.is_closed():
-                    logger.warning("Loop %r that handles pid %r is closed", loop, pid)
+                    logger.warning('Loop %r that handles pid %r is closed', loop, pid)
                 else:
                     loop.call_soon_threadsafe(callback, pid, returncode, *args)
 
@@ -1297,6 +1335,7 @@ except ImportError:
         asyncio.set_event_loop(None)
         loop.close()
 
+
 def call(ctx: CephadmContext,
          command: List[str],
          desc: Optional[str] = None,
@@ -1318,7 +1357,7 @@ def call(ctx: CephadmContext,
         prefix += ': '
     timeout = timeout or ctx.timeout
 
-    logger.debug("Running command: %s" % ' '.join(command))
+    logger.debug('Running command: %s' % ' '.join(command))
 
     async def tee(reader: asyncio.StreamReader) -> str:
         collected = StringIO()
@@ -1360,12 +1399,12 @@ def call(ctx: CephadmContext,
 
 
 def call_throws(
-    ctx: CephadmContext,
-    command: List[str],
-    desc: Optional[str] = None,
-    verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
-    timeout: Optional[int] = DEFAULT_TIMEOUT,
-    **kwargs) -> Tuple[str, str, int]:
+        ctx: CephadmContext,
+        command: List[str],
+        desc: Optional[str] = None,
+        verbosity: CallVerbosity = CallVerbosity.VERBOSE_ON_FAILURE,
+        timeout: Optional[int] = DEFAULT_TIMEOUT,
+        **kwargs) -> Tuple[str, str, int]:
     out, err, ret = call(ctx, command, desc, verbosity, timeout, **kwargs)
     if ret:
         raise RuntimeError('Failed command: %s' % ' '.join(command))
@@ -1375,17 +1414,17 @@ def call_throws(
 def call_timeout(ctx, command, timeout):
     # type: (CephadmContext, List[str], int) -> int
     logger.debug('Running command (timeout=%s): %s'
-            % (timeout, ' '.join(command)))
+                 % (timeout, ' '.join(command)))
 
     def raise_timeout(command, timeout):
         # type: (List[str], int) -> NoReturn
-        msg = 'Command \'%s\' timed out after %s seconds' % (command, timeout)
+        msg = 'Command `%s` timed out after %s seconds' % (command, timeout)
         logger.debug(msg)
         raise TimeoutExpired(msg)
 
     try:
         return subprocess.call(command, timeout=timeout)
-    except subprocess.TimeoutExpired as e:
+    except subprocess.TimeoutExpired:
         raise_timeout(command, timeout)
 
 ##################
@@ -1409,10 +1448,10 @@ def is_available(ctx, what, func):
             break
         elif num > retry:
             raise Error('%s not available after %s tries'
-                    % (what, retry))
+                        % (what, retry))
 
         logger.info('%s not available, waiting (%s/%s)...'
-                % (what, num, retry))
+                    % (what, num, retry))
 
         num += 1
         time.sleep(2)
@@ -1576,7 +1615,7 @@ def infer_fsid(func):
             if not is_fsid(daemon['fsid']):
                 # 'unknown' fsid
                 continue
-            elif "name" not in ctx or not ctx.name:
+            elif 'name' not in ctx or not ctx.name:
                 # ctx.name not specified
                 fsids_set.add(daemon['fsid'])
             elif daemon['name'] == ctx.name:
@@ -1631,10 +1670,10 @@ def infer_config(func):
 
 def _get_default_image(ctx: CephadmContext):
     if DEFAULT_IMAGE_IS_MASTER:
-        warn = '''This is a development version of cephadm.
+        warn = """This is a development version of cephadm.
 For information regarding the latest stable release:
     https://docs.ceph.com/docs/{}/cephadm/install
-'''.format(LATEST_STABLE_RELEASE)
+""".format(LATEST_STABLE_RELEASE)
         for line in warn.splitlines():
             logger.warning('{}{}{}'.format(termcolor.yellow, line, termcolor.end))
     return DEFAULT_IMAGE
@@ -1661,7 +1700,7 @@ def default_image(func):
     @wraps(func)
     def _default_image(ctx: CephadmContext):
         if not ctx.image:
-            if "name" in ctx and ctx.name:
+            if 'name' in ctx and ctx.name:
                 type_ = ctx.name.split('.', 1)[0]
                 if type_ in Monitoring.components:
                     ctx.image = Monitoring.components[type_]['image']
@@ -1684,10 +1723,10 @@ def get_last_local_ceph_image(ctx: CephadmContext, container_path: str):
     :return: The most recent local ceph image (already pulled)
     """
     out, _, _ = call_throws(ctx,
-            [container_path, 'images',
-             '--filter', 'label=ceph=True',
-             '--filter', 'dangling=false',
-             '--format', '{{.Repository}}@{{.Digest}}'])
+                            [container_path, 'images',
+                             '--filter', 'label=ceph=True',
+                             '--filter', 'dangling=false',
+                             '--format', '{{.Repository}}@{{.Digest}}'])
     return _filter_last_local_ceph_image(out)
 
 
@@ -1763,7 +1802,7 @@ def make_log_dir(ctx, fsid, uid=None, gid=None):
 def make_var_run(ctx, fsid, uid, gid):
     # type: (CephadmContext, str, int, int) -> None
     call_throws(ctx, ['install', '-d', '-m0770', '-o', str(uid), '-g', str(gid),
-                 '/var/run/ceph/%s' % fsid])
+                      '/var/run/ceph/%s' % fsid])
 
 
 def copy_tree(ctx, src, dst, uid=None, gid=None):
@@ -1779,15 +1818,15 @@ def copy_tree(ctx, src, dst, uid=None, gid=None):
         if os.path.isdir(dst):
             dst_dir = os.path.join(dst, os.path.basename(src_dir))
 
-        logger.debug('copy directory \'%s\' -> \'%s\'' % (src_dir, dst_dir))
+        logger.debug('copy directory `%s` -> `%s`' % (src_dir, dst_dir))
         shutil.rmtree(dst_dir, ignore_errors=True)
-        shutil.copytree(src_dir, dst_dir) # dirs_exist_ok needs python 3.8
+        shutil.copytree(src_dir, dst_dir)  # dirs_exist_ok needs python 3.8
 
         for dirpath, dirnames, filenames in os.walk(dst_dir):
-            logger.debug('chown %s:%s \'%s\'' % (uid, gid, dirpath))
+            logger.debug('chown %s:%s `%s`' % (uid, gid, dirpath))
             os.chown(dirpath, uid, gid)
             for filename in filenames:
-                logger.debug('chown %s:%s \'%s\'' % (uid, gid, filename))
+                logger.debug('chown %s:%s `%s`' % (uid, gid, filename))
                 os.chown(os.path.join(dirpath, filename), uid, gid)
 
 
@@ -1804,10 +1843,10 @@ def copy_files(ctx, src, dst, uid=None, gid=None):
         if os.path.isdir(dst):
             dst_file = os.path.join(dst, os.path.basename(src_file))
 
-        logger.debug('copy file \'%s\' -> \'%s\'' % (src_file, dst_file))
+        logger.debug('copy file `%s` -> `%s`' % (src_file, dst_file))
         shutil.copyfile(src_file, dst_file)
 
-        logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
+        logger.debug('chown %s:%s `%s`' % (uid, gid, dst_file))
         os.chown(dst_file, uid, gid)
 
 
@@ -1833,11 +1872,11 @@ def move_files(ctx, src, dst, uid=None, gid=None):
         else:
            logger.debug("move file '%s' -> '%s'" % (src_file, dst_file))
             shutil.move(src_file, dst_file)
-            logger.debug('chown %s:%s \'%s\'' % (uid, gid, dst_file))
+            logger.debug('chown %s:%s `%s`' % (uid, gid, dst_file))
             os.chown(dst_file, uid, gid)
 
 
-## copied from distutils ##
+# copied from distutils
 def find_executable(executable, path=None):
     """Tries to find 'executable' in the directories listed in 'path'.
     A string listing directories separated by 'os.pathsep'; defaults to
@@ -1854,7 +1893,7 @@ def find_executable(executable, path=None):
         path = os.environ.get('PATH', None)
         if path is None:
             try:
-                path = os.confstr("CS_PATH")
+                path = os.confstr('CS_PATH')
             except (AttributeError, ValueError):
                 # os.confstr() or CS_PATH is not available
                 path = os.defpath
@@ -1937,7 +1976,7 @@ def check_unit(ctx, unit_name):
         if code == 0:
             enabled = True
             installed = True
-        elif "disabled" in out:
+        elif 'disabled' in out:
             installed = True
     except Exception as e:
         logger.warning('unable to run systemctl: %s' % e)
@@ -2028,7 +2067,7 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
             '--setgroup', 'ceph',
             '--default-log-to-file=false',
             '--default-log-to-stderr=true',
-            '--default-log-stderr-prefix="debug "',
+            '--default-log-stderr-prefix=debug ',
         ]
         if daemon_type == 'mon':
             r += [
@@ -2042,9 +2081,9 @@ def get_daemon_args(ctx, fsid, daemon_type, daemon_id):
             config = get_parm(ctx.config_json)
             peers = config.get('peers', list())  # type: ignore
             for peer in peers:
-                r += ["--cluster.peer={}".format(peer)]
+                r += ['--cluster.peer={}'.format(peer)]
             # some alertmanager, by default, look elsewhere for a config
-            r += ["--config.file=/etc/alertmanager/alertmanager.yml"]
+            r += ['--config.file=/etc/alertmanager/alertmanager.yml']
     elif daemon_type == NFSGanesha.daemon_type:
         nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
         r += nfs_ganesha.get_daemon_args()
@@ -2148,11 +2187,8 @@ def get_parm(option):
         if cached_stdin is not None:
             j = cached_stdin
         else:
-            try:
-                j = injected_stdin  # type: ignore
-            except NameError:
-                j = sys.stdin.read()
-                cached_stdin = j
+            j = sys.stdin.read()
+            cached_stdin = j
     else:
         # inline json string
         if option[0] == '{' and option[-1] == '}':
@@ -2162,12 +2198,12 @@ def get_parm(option):
             with open(option, 'r') as f:
                 j = f.read()
         else:
-            raise Error("Config file {} not found".format(option))
+            raise Error('Config file {} not found'.format(option))
 
     try:
         js = json.loads(j)
     except ValueError as e:
-        raise Error("Invalid JSON in {}: {}".format(option, e))
+        raise Error('Invalid JSON in {}: {}'.format(option, e))
     else:
         return js
 
@@ -2177,18 +2213,18 @@ def get_config_and_keyring(ctx):
     config = None
     keyring = None
 
-    if "config_json" in ctx and ctx.config_json:
+    if 'config_json' in ctx and ctx.config_json:
         d = get_parm(ctx.config_json)
         config = d.get('config')
         keyring = d.get('keyring')
 
-    if "config" in ctx and ctx.config:
+    if 'config' in ctx and ctx.config:
         with open(ctx.config, 'r') as f:
             config = f.read()
 
-    if "key" in ctx and ctx.key:
+    if 'key' in ctx and ctx.key:
         keyring = '[%s]\n\tkey = %s\n' % (ctx.name, ctx.key)
-    elif "keyring" in ctx and ctx.keyring:
+    elif 'keyring' in ctx and ctx.keyring:
         with open(ctx.keyring, 'r') as f:
             keyring = f.read()
 
@@ -2217,7 +2253,7 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
 
     if daemon_type in Ceph.daemons:
         if fsid:
-            run_path = os.path.join('/var/run/ceph', fsid);
+            run_path = os.path.join('/var/run/ceph', fsid)
             if os.path.exists(run_path):
                 mounts[run_path] = '/var/run/ceph:z'
         log_dir = get_log_dir(fsid, ctx.log_dir)
@@ -2236,7 +2272,7 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
             mounts[data_dir] = cdata_dir + ':z'
             if not no_config:
                 mounts[data_dir + '/config'] = '/etc/ceph/ceph.conf:z'
-            if daemon_type == 'rbd-mirror' or daemon_type == 'crash':
+            if daemon_type in ['rbd-mirror', 'cephfs-mirror', 'crash']:
                 # these do not search for their keyrings in a data directory
                 mounts[data_dir + '/keyring'] = '/etc/ceph/ceph.client.%s.%s.keyring' % (daemon_type, daemon_id)
 
@@ -2265,8 +2301,8 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
                 mounts[ceph_folder + '/monitoring/prometheus/alerts'] = '/etc/prometheus/ceph'
             else:
                 logger.error('{}{}{}'.format(termcolor.red,
-                    'Ceph shared source folder does not exist.',
-                    termcolor.end))
+                                             'Ceph shared source folder does not exist.',
+                                             termcolor.end))
         except AttributeError:
             pass
 
@@ -2279,7 +2315,7 @@ def get_container_mounts(ctx, fsid, daemon_type, daemon_id,
         mounts['/proc'] = '/host/proc:ro'
         mounts['/sys'] = '/host/sys:ro'
         mounts['/'] = '/rootfs:ro'
-    elif daemon_type == "grafana":
+    elif daemon_type == 'grafana':
         mounts[os.path.join(data_dir, 'etc/grafana/grafana.ini')] = '/etc/grafana/grafana.ini:Z'
         mounts[os.path.join(data_dir, 'etc/grafana/provisioning/datasources')] = '/etc/grafana/provisioning/datasources:Z'
         mounts[os.path.join(data_dir, 'etc/grafana/certs')] = '/etc/grafana/certs:Z'
@@ -2339,6 +2375,9 @@ def get_container(ctx: CephadmContext,
     elif daemon_type == 'rbd-mirror':
         entrypoint = '/usr/bin/rbd-mirror'
         name = 'client.rbd-mirror.%s' % daemon_id
+    elif daemon_type == 'cephfs-mirror':
+        entrypoint = '/usr/bin/cephfs-mirror'
+        name = 'client.cephfs-mirror.%s' % daemon_id
     elif daemon_type == 'crash':
         entrypoint = '/usr/bin/ceph-crash'
         name = 'client.crash.%s' % daemon_id
@@ -2408,7 +2447,6 @@ def get_container(ctx: CephadmContext,
         envs=envs,
         privileged=privileged,
         ptrace=ptrace,
-        init=ctx.container_init,
        host_network=host_network,
     )
 
@@ -2448,7 +2486,7 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
     ports = ports or []
 
     if any([port_in_use(ctx, port) for port in ports]):
-        raise Error("TCP Port(s) '{}' required for {} already in use".format(",".join(map(str, ports)), daemon_type))
+        raise Error("TCP Port(s) '{}' required for {} already in use".format(','.join(map(str, ports)), daemon_type))
 
     data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
     if reconfig and not os.path.exists(data_dir):
@@ -2466,15 +2504,16 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
             create_daemon_dirs(ctx, fsid, daemon_type, daemon_id, uid, gid)
             mon_dir = get_data_dir(fsid, ctx.data_dir, 'mon', daemon_id)
             log_dir = get_log_dir(fsid, ctx.log_dir)
-            out = CephContainer(
+            CephContainer(
                 ctx,
                 image=ctx.image,
                 entrypoint='/usr/bin/ceph-mon',
-                args=['--mkfs',
-                      '-i', str(daemon_id),
-                      '--fsid', fsid,
-                      '-c', '/tmp/config',
-                      '--keyring', '/tmp/keyring',
+                args=[
+                    '--mkfs',
+                    '-i', str(daemon_id),
+                    '--fsid', fsid,
+                    '-c', '/tmp/config',
+                    '--keyring', '/tmp/keyring',
                 ] + get_daemon_args(ctx, fsid, 'mon', daemon_id),
                 volume_mounts={
                     log_dir: '/var/log/ceph:z',
@@ -2512,9 +2551,9 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
     else:
         if c:
             deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id,
-                                c, osd_fsid=osd_fsid)
+                                c, osd_fsid=osd_fsid, ports=ports)
         else:
-            raise RuntimeError("attempting to deploy a daemon without a container image")
+            raise RuntimeError('attempting to deploy a daemon without a container image')
 
     if not os.path.exists(data_dir + '/unit.created'):
        with open(data_dir + '/unit.created', 'w') as f:
@@ -2539,9 +2578,10 @@ def deploy_daemon(ctx, fsid, daemon_type, daemon_id, c, uid, gid,
         # ceph daemons do not need a restart; others (presumably) do to pick
         # up the new config
         call_throws(ctx, ['systemctl', 'reset-failed',
-                     get_unit_name(fsid, daemon_type, daemon_id)])
+                          get_unit_name(fsid, daemon_type, daemon_id)])
         call_throws(ctx, ['systemctl', 'restart',
-                     get_unit_name(fsid, daemon_type, daemon_id)])
+                          get_unit_name(fsid, daemon_type, daemon_id)])
+
 
 def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, background=False):
     # type: (CephadmContext, IO[str], CephContainer, Optional[str], Optional[bool]) -> None
@@ -2550,22 +2590,37 @@ def _write_container_cmd_to_bash(ctx, file_obj, container, comment=None, backgro
         # unit file, makes it easier to read and grok.
         file_obj.write('# ' + comment + '\n')
     # Sometimes, adding `--rm` to a run_cmd doesn't work. Let's remove the container manually
-    file_obj.write('! '+ ' '.join(container.rm_cmd()) + ' 2> /dev/null\n')
+    file_obj.write('! ' + ' '.join(container.rm_cmd()) + ' 2> /dev/null\n')
     # Sometimes, `podman rm` doesn't find the container. Then you'll have to add `--storage`
     if 'podman' in ctx.container_path:
-        file_obj.write('! '+ ' '.join(container.rm_cmd(storage=True)) + ' 2> /dev/null\n')
+        file_obj.write(
+            '! '
+            + ' '.join([shlex.quote(a) for a in container.rm_cmd(storage=True)])
+            + ' 2> /dev/null\n')
 
     # container run command
-    file_obj.write(' '.join(container.run_cmd()) + (' &' if background else '') + '\n')
+    file_obj.write(
+        ' '.join([shlex.quote(a) for a in container.run_cmd()])
+        + (' &' if background else '') + '\n')
 
 
-def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
-                        enable=True, start=True,
-                        osd_fsid=None):
-    # type: (CephadmContext, str, int, int, str, Union[int, str], CephContainer, bool, bool, Optional[str]) -> None
+def deploy_daemon_units(
    ctx: CephadmContext,
+    fsid: str,
+    uid: int,
+    gid: int,
+    daemon_type: str,
+    daemon_id: Union[int, str],
+    c: 'CephContainer',
+    enable: bool = True,
+    start: bool = True,
+    osd_fsid: Optional[str] = None,
+    ports: Optional[List[int]] = None,
+) -> None:
     # cmd
     data_dir = get_data_dir(fsid, ctx.data_dir, daemon_type, daemon_id)
-    with open(data_dir + '/unit.run.new', 'w') as f:
+    with open(data_dir + '/unit.run.new', 'w') as f, \
+            open(data_dir + '/unit.meta.new', 'w') as metaf:
         f.write('set -e\n')
 
        if daemon_type in Ceph.daemons:
@@ -2597,13 +2652,15 @@ def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
                 volume_mounts=get_container_mounts(ctx, fsid, daemon_type, daemon_id),
                 bind_mounts=get_container_binds(ctx, fsid, daemon_type, daemon_id),
                 cname='ceph-%s-%s.%s-activate' % (fsid, daemon_type, daemon_id),
+                memory_request=ctx.memory_request,
+                memory_limit=ctx.memory_limit,
             )
             _write_container_cmd_to_bash(ctx, f, prestart, 'LVM OSDs use ceph-volume lvm activate')
         elif daemon_type == NFSGanesha.daemon_type:
             # add nfs to the rados grace db
             nfs_ganesha = NFSGanesha.init(ctx, fsid, daemon_id)
             prestart = nfs_ganesha.get_rados_grace_container('add')
-            _write_container_cmd_to_bash(ctx, f, prestart,  'add daemon to rados grace')
+            _write_container_cmd_to_bash(ctx, f, prestart, 'add daemon to rados grace')
         elif daemon_type == CephIscsi.daemon_type:
             f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=True)) + '\n')
             ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
@@ -2611,9 +2668,25 @@ def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
             _write_container_cmd_to_bash(ctx, f, tcmu_container, 'iscsi tcmu-runnter container', background=True)
 
         _write_container_cmd_to_bash(ctx, f, c, '%s.%s' % (daemon_type, str(daemon_id)))
+
+        # some metadata about the deploy
+        meta: Dict[str, Any] = {}
+        if 'meta_json' in ctx and ctx.meta_json:
+            meta = json.loads(ctx.meta_json) or {}
+        meta.update({
+            'memory_request': int(ctx.memory_request) if ctx.memory_request else None,
+            'memory_limit': int(ctx.memory_limit) if ctx.memory_limit else None,
+        })
+        if not meta.get('ports'):
+            meta['ports'] = ports
+        metaf.write(json.dumps(meta, indent=4) + '\n')
+
         os.fchmod(f.fileno(), 0o600)
+        os.fchmod(metaf.fileno(), 0o600)
         os.rename(data_dir + '/unit.run.new',
                   data_dir + '/unit.run')
+        os.rename(data_dir + '/unit.meta.new',
+                  data_dir + '/unit.meta')
 
     # post-stop command(s)
    with open(data_dir + '/unit.poststop.new', 'w') as f:
@@ -2643,7 +2716,7 @@ def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
                 # make sure we also stop the tcmu container
                 ceph_iscsi = CephIscsi.init(ctx, fsid, daemon_id)
                 tcmu_container = ceph_iscsi.get_tcmu_runner_container()
-                f.write('! '+ ' '.join(tcmu_container.stop_cmd()) + '\n')
+                f.write('! ' + ' '.join(tcmu_container.stop_cmd()) + '\n')
                 f.write(' '.join(CephIscsi.configfs_mount_umount(data_dir, mount=False)) + '\n')
         os.fchmod(f.fileno(), 0o600)
         os.rename(data_dir + '/unit.poststop.new',
@@ -2654,7 +2727,7 @@ def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
         f.write(c.image + '\n')
         os.fchmod(f.fileno(), 0o600)
         os.rename(data_dir + '/unit.image.new',
-                 data_dir + '/unit.image')
+                  data_dir + '/unit.image')
 
     # systemd
     install_base_units(ctx, fsid)
@@ -2677,7 +2750,6 @@ def deploy_daemon_units(ctx, fsid, uid, gid, daemon_type, daemon_id, c,
             call_throws(ctx, ['systemctl', 'start', unit_name])
 
 
-
 class Firewalld(object):
     def __init__(self, ctx):
         # type: (CephadmContext) -> None
@@ -2694,11 +2766,11 @@ class Firewalld(object):
         if not enabled:
             logger.debug('firewalld.service is not enabled')
             return False
-        if state != "running":
+        if state != 'running':
             logger.debug('firewalld.service is not running')
             return False
 
-        logger.info("firewalld ready")
+        logger.info('firewalld ready')
         return True
 
     def enable_service_for(self, daemon_type):
@@ -2717,7 +2789,7 @@ class Firewalld(object):
             return
 
         if not self.cmd:
-            raise RuntimeError("command not defined")
+            raise RuntimeError('command not defined')
 
         out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--query-service', svc], verbosity=CallVerbosity.DEBUG)
         if ret:
@@ -2736,7 +2808,7 @@ class Firewalld(object):
             return
 
         if not self.cmd:
-            raise RuntimeError("command not defined")
+            raise RuntimeError('command not defined')
 
         for port in fw_ports:
             tcp_port = str(port) + '/tcp'
@@ -2746,7 +2818,7 @@ class Firewalld(object):
                 out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--add-port', tcp_port])
                 if ret:
                     raise RuntimeError('unable to add port %s to current zone: %s' %
-                                      (tcp_port, err))
+                                       (tcp_port, err))
             else:
                 logger.debug('firewalld port %s is enabled in current zone' % tcp_port)
 
@@ -2757,7 +2829,7 @@ class Firewalld(object):
             return
 
         if not self.cmd:
-            raise RuntimeError("command not defined")
+            raise RuntimeError('command not defined')
 
         for port in fw_ports:
             tcp_port = str(port) + '/tcp'
@@ -2767,11 +2839,11 @@ class Firewalld(object):
                 out, err, ret = call(self.ctx, [self.cmd, '--permanent', '--remove-port', tcp_port])
                 if ret:
                     raise RuntimeError('unable to remove port %s from current zone: %s' %
-                                      (tcp_port, err))
+                                       (tcp_port, err))
                 else:
-                    logger.info(f"Port {tcp_port} disabled")
+                    logger.info(f'Port {tcp_port} disabled')
             else:
-                logger.info(f"firewalld port {tcp_port} already closed")
+                logger.info(f'firewalld port {tcp_port} already closed')
 
     def apply_rules(self):
         # type: () -> None
@@ -2779,7 +2851,7 @@ class Firewalld(object):
             return
 
         if not self.cmd:
-            raise RuntimeError("command not defined")
+            raise RuntimeError('command not defined')
 
         call_throws(self.ctx, [self.cmd, '--reload'])
 
@@ -2798,6 +2870,7 @@ def update_firewalld(ctx, daemon_type):
         firewall.open_ports(fw_ports)
     firewall.apply_rules()
 
+
 def install_base_units(ctx, fsid):
     # type: (CephadmContext, str) -> None
     """
@@ -2826,14 +2899,15 @@ def install_base_units(ctx, fsid):
     # cluster unit
     existed = os.path.exists(ctx.unit_dir + '/ceph-%s.target' % fsid)
     with open(ctx.unit_dir + '/ceph-%s.target.new' % fsid, 'w') as f:
-        f.write('[Unit]\n'
-                'Description=Ceph cluster {fsid}\n'
-                'PartOf=ceph.target\n'
-                'Before=ceph.target\n'
-                '\n'
-                '[Install]\n'
-                'WantedBy=multi-user.target ceph.target\n'.format(
-                    fsid=fsid)
+        f.write(
+            '[Unit]\n'
+            'Description=Ceph cluster {fsid}\n'
+            'PartOf=ceph.target\n'
+            'Before=ceph.target\n'
+            '\n'
+            '[Install]\n'
+            'WantedBy=multi-user.target ceph.target\n'.format(
+                fsid=fsid)
         )
     os.rename(ctx.unit_dir + '/ceph-%s.target.new' % fsid,
               ctx.unit_dir + '/ceph-%s.target' % fsid)
@@ -2858,7 +2932,7 @@ def install_base_units(ctx, fsid):
   compress
   sharedscripts
   postrotate
-    killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror || pkill -1 -x "ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror" || true
+    killall -q -1 ceph-mon ceph-mgr ceph-mds ceph-osd ceph-fuse radosgw rbd-mirror cephfs-mirror || pkill -1 -x 'ceph-mon|ceph-mgr|ceph-mds|ceph-osd|ceph-fuse|radosgw|rbd-mirror|cephfs-mirror' || true
   endscript
   missingok
   notifempty
@@ -2872,10 +2946,11 @@ def get_unit_file(ctx, fsid):
     extra_args = ''
     if 'podman' in ctx.container_path:
         extra_args = ('ExecStartPre=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
-                'ExecStopPost=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
-                'Type=forking\n'
-                'PIDFile=/%t/%n-pid\n')
+                      'ExecStopPost=-/bin/rm -f /%t/%n-pid /%t/%n-cid\n'
+                      'Type=forking\n'
+                      'PIDFile=/%t/%n-pid\n')
 
+    docker = 'docker' in ctx.container_path
     u = """# generated by cephadm
 [Unit]
 Description=Ceph %i for {fsid}
@@ -2884,8 +2959,9 @@ Description=Ceph %i for {fsid}
 # http://www.freedesktop.org/wiki/Software/systemd/NetworkTarget
 # these can be removed once ceph-mon will dynamically change network
 # configuration.
-After=network-online.target local-fs.target time-sync.target
+After=network-online.target local-fs.target time-sync.target{docker_after}
 Wants=network-online.target local-fs.target time-sync.target
+{docker_requires}
 PartOf=ceph-{fsid}.target
 Before=ceph-{fsid}.target
@@ -2907,11 +2983,13 @@ StartLimitBurst=5
 {extra_args}
 [Install]
 WantedBy=ceph-{fsid}.target
-""".format(
-    container_path=ctx.container_path,
-    fsid=fsid,
-    data_dir=ctx.data_dir,
-    extra_args=extra_args)
+""".format(container_path=ctx.container_path,
+           fsid=fsid,
+           data_dir=ctx.data_dir,
+           extra_args=extra_args,
+           # if docker, we depend on docker.service
+           docker_after=' docker.service' if docker else '',
+           docker_requires='Requires=docker.service\n' if docker else '')
 
    return u
@@ -2931,8 +3009,10 @@ class CephContainer:
                  privileged: bool = False,
                  ptrace: bool = False,
                  bind_mounts: Optional[List[List[str]]] = None,
-                 init: bool = False,
+                 init: Optional[bool] = None,
                  host_network: bool = True,
+                 memory_request: Optional[str] = None,
+                 memory_limit: Optional[str] = None,
                  ) -> None:
         self.ctx = ctx
         self.image = image
@@ -2945,8 +3025,10 @@ class CephContainer:
         self.privileged = privileged
         self.ptrace = ptrace
         self.bind_mounts = bind_mounts if bind_mounts else []
-        self.init = init
+        self.init = init if init else ctx.container_init
         self.host_network = host_network
+        self.memory_request = memory_request
+        self.memory_limit = memory_limit
 
     def run_cmd(self) -> List[str]:
         cmd_args: List[str] = [
@@ -2967,6 +3049,12 @@ class CephContainer:
         vols: List[str] = []
         binds: List[str] = []
 
+        if self.memory_request:
+            cmd_args.extend(['-e', 'POD_MEMORY_REQUEST', str(self.memory_request)])
+        if self.memory_limit:
+            cmd_args.extend(['-e', 'POD_MEMORY_LIMIT', str(self.memory_limit)])
+            cmd_args.extend(['--memory', str(self.memory_limit)])
+
         if self.host_network:
             cmd_args.append('--net=host')
         if self.entrypoint:
@@ -2983,6 +3071,7 @@ class CephContainer:
             cmd_args.append('--cap-add=SYS_PTRACE')
         if self.init:
             cmd_args.append('--init')
+            envs += ['-e', 'CEPH_USE_RANDOM_NONCE=1']
         if self.cname:
             cmd_args.extend(['--name', self.cname])
         if self.envs:
@@ -2995,9 +3084,10 @@ class CephContainer:
             binds = sum([['--mount', '{}'.format(','.join(bind))]
                          for bind in self.bind_mounts], [])
 
-        return cmd_args + self.container_args + envs + vols + binds + [
-            self.image,
-        ] + self.args  # type: ignore
+        return \
+            cmd_args + self.container_args + \
+            envs + vols + binds + \
+            [self.image] + self.args  # type: ignore
 
     def shell_cmd(self, cmd: List[str]) -> List[str]:
         cmd_args: List[str] = [
@@ -3021,6 +3111,9 @@ class CephContainer:
             # let OSD etc read block devs that haven't been chowned
             '--group-add=disk',
         ])
+        if self.init:
+            cmd_args.append('--init')
+            envs += ['-e', 'CEPH_USE_RANDOM_NONCE=1']
         if self.envs:
             for env in self.envs:
                 envs.extend(['-e', env])
@@ -3066,9 +3159,8 @@ class CephContainer:
 
     def run(self, timeout=DEFAULT_TIMEOUT):
         # type: (Optional[int]) -> str
-        out, _, _ = call_throws(
-            self.ctx,
-            self.run_cmd(), desc=self.entrypoint, timeout=timeout)
+        out, _, _ = call_throws(self.ctx, self.run_cmd(),
+                                desc=self.entrypoint, timeout=timeout)
         return out
 
 ##################
@@ -3077,9 +3169,11 @@ class CephContainer:
 
 @infer_image
 def command_version(ctx):
     # type: (CephadmContext) -> int
-    out = CephContainer(ctx, ctx.image, 'ceph', ['--version']).run()
-    print(out.strip())
-    return 0
+    c = CephContainer(ctx, ctx.image, 'ceph', ['--version'])
+    out, err, ret = call(ctx, c.run_cmd(), desc=c.entrypoint)
+    if not ret:
+        print(out.strip())
+    return ret
 
 ##################
 
@@ -3097,14 +3191,14 @@ def _pull_image(ctx, image):
     logger.info('Pulling container image %s...' % image)
 
     ignorelist = [
-        "error creating read-write layer with ID",
-        "net/http: TLS handshake timeout",
-        "Digest did not match, expected",
+        'error creating read-write layer with ID',
+        'net/http: TLS handshake timeout',
+        'Digest did not match, expected',
     ]
 
     cmd = [ctx.container_path, 'pull', image]
     if 'podman' in ctx.container_path and os.path.exists('/etc/ceph/podman-auth.json'):
-       cmd.append('--authfile=/etc/ceph/podman-auth.json')
+        cmd.append('--authfile=/etc/ceph/podman-auth.json')
     cmd_str = ' '.join(cmd)
 
     for sleep_secs in [1, 4, 25]:
@@ -3115,10 +3209,11 @@ def _pull_image(ctx, image):
         if not any(pattern in err for pattern in ignorelist):
             raise RuntimeError('Failed command: %s' % cmd_str)
 
-        logger.info('"%s failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
+        logger.info('`%s` failed transiently. Retrying. waiting %s seconds...' % (cmd_str, sleep_secs))
         time.sleep(sleep_secs)
 
     raise RuntimeError('Failed command: %s: maximum retries reached' % cmd_str)
 
+
 ##################
 
@@ -3147,14 +3242,15 @@ def get_image_info_from_inspect(out, image):
         raise Error('inspect {}: empty result'.format(image))
     r = {
         'image_id': normalize_container_id(image_id)
-    } # type: Dict[str, Union[str,List[str]]]
+    }  # type: Dict[str, Union[str,List[str]]]
     if digests:
         r['repo_digests'] = digests[1:-1].split(' ')
     return r
 
-##################
 
-def check_subnet(subnets:str) -> Tuple[int, List[int], str]:
+##################
+
+
+def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
     """Determine whether the given string is a valid subnet
 
     :param subnets: subnet string, a single definition or comma separated list of CIDR subnets
 
     """
 
     rc = 0
-    versions = set()
+    versions = set()
     errors = []
     subnet_list = subnets.split(',')
@@ -3162,23 +3258,24 @@ def check_subnet(subnets: str) -> Tuple[int, List[int], str]:
     for subnet in subnet_list:
         # ensure the format of the string is as expected address/netmask
         if not re.search(r'\/\d+$', subnet):
             rc = 1
-            errors.append(f"{subnet} is not in CIDR format (address/netmask)")
+            errors.append(f'{subnet} is not in CIDR format (address/netmask)')
             continue
         try:
             v = ipaddress.ip_network(subnet).version
             versions.add(v)
         except ValueError as e:
             rc = 1
-            errors.append(f"{subnet} invalid: {str(e)}")
-
-    return rc, list(versions), ", ".join(errors)
+            errors.append(f'{subnet} invalid: {str(e)}')
+
+    return rc, list(versions), ', '.join(errors)
+
 
 def unwrap_ipv6(address):
    # type: (str) -> str
@@ -3195,7 +3292,7 @@ def wrap_ipv6(address):
     # the ValueError
     try:
         if ipaddress.ip_address(address).version == 6:
-            return f"[{address}]"
+            return f'[{address}]'
     except ValueError:
         pass
 
@@ -3208,7 +3305,7 @@ def is_ipv6(address):
     try:
         return ipaddress.ip_address(address).version == 6
     except ValueError:
-        logger.warning("Address: {} isn't a valid IP address".format(address))
+        logger.warning('Address: {} is not a valid IP address'.format(address))
         return False
 
 
@@ -3216,7 +3313,7 @@ def prepare_mon_addresses(
     ctx: CephadmContext
 ) -> Tuple[str, bool, Optional[str]]:
     r = re.compile(r':(\d+)$')
-    base_ip = ""
+    base_ip = ''
     ipv6 = False
 
     if ctx.mon_ip:
@@ -3234,7 +3331,7 @@ def prepare_mon_addresses(
                 logger.warning('Using msgr2 protocol for unrecognized port %d' %
                                port)
                 addr_arg = '[v2:%s]' % ctx.mon_ip
-            base_ip = ctx.mon_ip[0:-(len(str(port)))-1]
+            base_ip = ctx.mon_ip[0:-(len(str(port))) - 1]
             check_ip_port(ctx, base_ip, port)
         else:
            base_ip = ctx.mon_ip
@@ -3255,7 +3352,7 @@ def prepare_mon_addresses(
             port = int(hasport[0])
             # strip off v1: or v2: prefix
             addr = re.sub(r'^\w+:', '', addr)
-            base_ip = addr[0:-(len(str(port)))-1]
+            base_ip = addr[0:-(len(str(port))) - 1]
             check_ip_port(ctx, base_ip, port)
     else:
         raise Error('must specify --mon-ip or --mon-addrv')
@@ -3275,25 +3372,25 @@ def prepare_mon_addresses(
         if not mon_network:
             raise Error('Failed to infer CIDR network for mon ip %s; pass '
                         '--skip-mon-network to configure it later' % base_ip)
-    
+
     return (addr_arg, ipv6, mon_network)
 
 
 def prepare_cluster_network(ctx: CephadmContext) -> Tuple[str, bool]:
-    cluster_network = ""
+    cluster_network = ''
     ipv6_cluster_network = False
-    # the cluster network may not exist on this node, so all we can do is 
+    # the cluster network may not exist on this node, so all we can do is
     # validate that the address given is valid ipv4 or ipv6 subnet
     if ctx.cluster_network:
         rc, versions, err_msg = check_subnet(ctx.cluster_network)
         if rc:
-            raise Error(f"Invalid --cluster-network parameter: {err_msg}")
+            raise Error(f'Invalid --cluster-network parameter: {err_msg}')
         cluster_network = ctx.cluster_network
         ipv6_cluster_network = True if 6 in versions else False
     else:
-        logger.info("- internal network (--cluster-network) has not "
-                    "been provided, OSD replication will default to "
-                    "the public_network")
+        logger.info('- internal network (--cluster-network) has not '
+                    'been provided, OSD replication will default to '
+                    'the public_network')
 
     return cluster_network, ipv6_cluster_network
 
@@ -3302,7 +3399,7 @@ def create_initial_keys(
     ctx: CephadmContext,
     uid: int, gid: int,
     mgr_id: str
-) -> Tuple[str, str, str, Any, Any]: # type: ignore
+) -> Tuple[str, str, str, Any, Any]:  # type: ignore
 
     _image = ctx.image
 
@@ -3344,8 +3441,8 @@ def create_initial_keys(
                % (mon_key, admin_key, mgr_id, mgr_key))
 
     admin_keyring = write_tmp('[client.admin]\n'
-            '\tkey = ' + admin_key + '\n',
-            uid, gid)
+                              '\tkey = ' + admin_key + '\n',
+                              uid, gid)
 
     # tmp keyring file
     bootstrap_keyring = write_tmp(keyring, uid, gid)
@@ -3365,17 +3462,18 @@ def create_initial_monmap(
         ctx,
         image=ctx.image,
         entrypoint='/usr/bin/monmaptool',
-        args=['--create',
-              '--clobber',
-              '--fsid', fsid,
-              '--addv', mon_id, mon_addr,
-              '/tmp/monmap'
+        args=[
+            '--create',
+            '--clobber',
+            '--fsid', fsid,
+            '--addv', mon_id, mon_addr,
+            '/tmp/monmap'
         ],
         volume_mounts={
             monmap.name: '/tmp/monmap:z',
         },
     ).run()
-    logger.debug(f"monmaptool for {mon_id} {mon_addr} on {out}")
+    logger.debug(f'monmaptool for {mon_id} {mon_addr} on {out}')
 
     # pass monmap file to ceph user for use by ceph-mon --mkfs below
     os.fchown(monmap.fileno(), uid, gid)
@@ -3397,12 +3495,13 @@ def prepare_create_mon(
         ctx,
         image=ctx.image,
         entrypoint='/usr/bin/ceph-mon',
-        args=['--mkfs',
-              '-i', mon_id,
-              '--fsid', fsid,
-              '-c', '/dev/null',
-              '--monmap', '/tmp/monmap',
-              '--keyring', '/tmp/keyring',
+        args=[
+            '--mkfs',
+            '-i', mon_id,
+            '--fsid', fsid,
+            '-c', '/dev/null',
+            '--monmap', '/tmp/monmap',
+            '--keyring', '/tmp/keyring',
         ] + get_daemon_args(ctx, fsid, 'mon', mon_id),
         volume_mounts={
             log_dir: '/var/log/ceph:z',
@@ -3411,7 +3510,7 @@ def prepare_create_mon(
             monmap_path: '/tmp/monmap:z',
         },
     ).run()
-    logger.debug(f"create mon.{mon_id} on {out}")
+    logger.debug(f'create mon.{mon_id} on {out}')
     return (mon_dir, log_dir)
 
 
@@ -3421,6 +3520,7 @@ def create_mon(
     fsid: str, mon_id: str
 ) -> None:
     mon_c = get_container(ctx, fsid, 'mon', mon_id)
+    ctx.meta_json = json.dumps({'service_name': 'mon'})
     deploy_daemon(ctx, fsid, 'mon', mon_id, mon_c, uid, gid,
                   config=None, keyring=None)
 
@@ -3447,7 +3547,7 @@ def wait_for_mon(
     # wait for the service to become available
     def is_mon_available():
         # type: () -> bool
-        timeout=ctx.timeout if ctx.timeout else 60 # seconds
+        timeout = ctx.timeout if ctx.timeout else 60  # seconds
         out, err, ret = call(ctx, c.run_cmd(),
                              desc=c.entrypoint,
                              timeout=timeout)
@@ -3466,14 +3566,16 @@ def create_mgr(
     mgr_keyring = '[mgr.%s]\n\tkey = %s\n' % (mgr_id, mgr_key)
     mgr_c = get_container(ctx, fsid, 'mgr', mgr_id)
     # Note:the default port used by the Prometheus node exporter is opened in fw
+    ctx.meta_json = json.dumps({'service_name': 'mgr'})
     deploy_daemon(ctx, fsid, 'mgr', mgr_id, mgr_c, uid, gid,
                   config=config, keyring=mgr_keyring, ports=[9283])
 
     # wait for the service to become available
     logger.info('Waiting for mgr to start...')
+
     def is_mgr_available():
         # type: () -> bool
-        timeout=ctx.timeout if ctx.timeout else 60 # seconds
+        timeout = ctx.timeout if ctx.timeout else 60  # seconds
         try:
             out = clifunc(['status', '-f', 'json-pretty'], timeout=timeout)
             j = json.loads(out)
@@ -3520,12 +3622,12 @@ def prepare_ssh(
     with open(ctx.output_pub_ssh_key, 'w') as f:
         f.write(ssh_pub)
-    logger.info('Wrote public SSH key to to %s' % ctx.output_pub_ssh_key)
+    logger.info('Wrote public SSH key to %s' % ctx.output_pub_ssh_key)
 
-    logger.info('Adding key to %s@localhost\'s authorized_keys...' % ctx.ssh_user)
+    logger.info('Adding key to %s@localhost authorized_keys...' % ctx.ssh_user)
     try:
         s_pwd = pwd.getpwnam(ctx.ssh_user)
-    except KeyError as e:
+    except KeyError:
         raise Error('Cannot find uid/gid for ssh-user: %s' % (ctx.ssh_user))
     ssh_uid = s_pwd.pw_uid
     ssh_gid = s_pwd.pw_gid
@@ -3541,12 +3643,12 @@ def prepare_ssh(
     with open(auth_keys_file, 'r') as f:
         f.seek(0, os.SEEK_END)
         if f.tell() > 0:
-            f.seek(f.tell()-1, os.SEEK_SET) # go to last char
+            f.seek(f.tell() - 1, os.SEEK_SET)  # go to last char
             if f.read() != '\n':
                 add_newline = True
 
     with open(auth_keys_file, 'a') as f:
-        os.fchown(f.fileno(), ssh_uid, ssh_gid) # just in case we created it
+        os.fchown(f.fileno(), ssh_uid, ssh_gid)  # just in case we created it
         os.fchmod(f.fileno(), 0o600)  # just in case we created it
         if add_newline:
             f.write('\n')
@@ -3559,10 +3661,10 @@ def prepare_ssh(
     except RuntimeError as e:
         raise Error('Failed to add host <%s>: %s' % (host, e))
 
-    for t in ['mon', 'mgr', 'crash']:
-        if ctx.orphan_initial_daemons:
+    for t in ['mon', 'mgr']:
+        if not ctx.orphan_initial_daemons:
             logger.info('Deploying %s service with default placement...' % t)
-            cli(['orch', 'apply', t, '--unmanaged'])
+            cli(['orch', 'apply', t])
         else:
             logger.info('Deploying unmanaged %s service...' % t)
            cli(['orch', 'apply', t, '--unmanaged'])
@@ -3587,7 +3689,7 @@ def prepare_dashboard(
 
     # Configure SSL port (cephadm only allows to configure dashboard SSL port)
     # if the user does not want to use SSL he can change this setting once the cluster is up
-    cli(["config", "set", "mgr", "mgr/dashboard/ssl_server_port" , str(ctx.ssl_dashboard_port)])
+    cli(['config', 'set', 'mgr', 'mgr/dashboard/ssl_server_port', str(ctx.ssl_dashboard_port)])
 
     # configuring dashboard parameters
     logger.info('Enabling the dashboard module...')
@@ -3693,18 +3795,17 @@ def finish_bootstrap_config(
         ])
 
     if mon_network:
-        logger.info(f"Setting mon public_network to {mon_network}")
+        logger.info(f'Setting mon public_network to {mon_network}')
         cli(['config', 'set', 'mon', 'public_network', mon_network])
-        
+
     if cluster_network:
-        logger.info(f"Setting cluster_network to {cluster_network}")
+        logger.info(f'Setting cluster_network to {cluster_network}')
         cli(['config', 'set', 'global', 'cluster_network', cluster_network])
 
     if ipv6 or ipv6_cluster_network:
         logger.info('Enabling IPv6 (ms_bind_ipv6) binding')
         cli(['config', 'set', 'global', 'ms_bind_ipv6', 'true'])
 
-
     with open(ctx.output_config, 'w') as f:
         f.write(config)
     logger.info('Wrote config to %s' % ctx.output_config)
@@ -3721,7 +3822,7 @@ def command_bootstrap(ctx):
         ctx.output_config = os.path.join(ctx.output_dir, 'ceph.conf')
     if not ctx.output_keyring:
         ctx.output_keyring = os.path.join(ctx.output_dir,
-                                           'ceph.client.admin.keyring')
+                                          'ceph.client.admin.keyring')
     if not ctx.output_pub_ssh_key:
         ctx.output_pub_ssh_key = os.path.join(ctx.output_dir, 'ceph.pub')
 
@@ -3731,17 +3832,16 @@ def command_bootstrap(ctx):
         if not ctx.allow_overwrite:
             if os.path.exists(f):
                 raise Error('%s already exists; delete or pass '
-                            '--allow-overwrite to overwrite' % f)
+                            '--allow-overwrite to overwrite' % f)
         dirname = os.path.dirname(f)
         if dirname and not os.path.exists(dirname):
             fname = os.path.basename(f)
-            logger.info(f"Creating directory {dirname} for {fname}")
+            logger.info(f'Creating directory {dirname} for {fname}')
             try:
                 # use makedirs to create intermediate missing dirs
                 os.makedirs(dirname, 0o755)
             except PermissionError:
-                raise Error(f"Unable to create {dirname} due to permissions failure. Retry with root, or sudo or preallocate the directory.")
-
+                raise Error(f'Unable to create {dirname} due to permissions failure. Retry with root, or sudo or preallocate the directory.')
 
     if not ctx.skip_prepare_host:
        command_prepare_host(ctx)
@@ -3757,8 +3857,8 @@ def command_bootstrap(ctx):
     mgr_id = ctx.mgr_id or generate_service_id()
     logger.info('Cluster fsid: %s' % fsid)
 
-    l = FileLock(ctx, fsid)
-    l.acquire()
+    lock = FileLock(ctx, fsid)
+    lock.acquire()
 
     (addr_arg, ipv6, mon_network) = prepare_mon_addresses(ctx)
     cluster_network, ipv6_cluster_network = prepare_cluster_network(ctx)
@@ -3769,15 +3869,13 @@ def command_bootstrap(ctx):
     (uid, gid) = extract_uid_gid(ctx)
 
     # create some initial keys
-    (mon_key, mgr_key, admin_key,
-     bootstrap_keyring, admin_keyring
-    ) = \
+    (mon_key, mgr_key, admin_key, bootstrap_keyring, admin_keyring) = \
         create_initial_keys(ctx, uid, gid, mgr_id)
 
     monmap = create_initial_monmap(ctx, uid, gid, fsid, mon_id, addr_arg)
     (mon_dir, log_dir) = \
         prepare_create_mon(ctx, uid, gid, fsid, mon_id,
-                bootstrap_keyring.name, monmap.name)
+                           bootstrap_keyring.name, monmap.name)
 
     with open(mon_dir + '/config', 'w') as f:
         os.fchown(f.fileno(), uid, gid)
@@ -3832,12 +3930,14 @@ def command_bootstrap(ctx):
     # necessary
     try:
         out = cli(['mgr', 'stat'])
-    except Exception as e:
+    except Exception:
         out = cli(['mgr', 'dump'])
     j = json.loads(out)
     epoch = j['epoch']
+
     # wait for mgr to have it
     logger.info('Waiting for the mgr to restart...')
+
     def mgr_has_latest_epoch():
         # type: () -> bool
         try:
@@ -3859,31 +3959,29 @@ def command_bootstrap(ctx):
         cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_username', ctx.registry_username, '--force'])
         cli(['config', 'set', 'mgr', 'mgr/cephadm/registry_password', ctx.registry_password, '--force'])
 
-    if ctx.container_init:
-        cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(ctx.container_init), '--force'])
+    cli(['config', 'set', 'mgr', 'mgr/cephadm/container_init', str(ctx.container_init), '--force'])
 
     if ctx.with_exporter:
         cli(['config-key', 'set', 'mgr/cephadm/exporter_enabled', 'true'])
         if ctx.exporter_config:
-            logger.info("Applying custom cephadm exporter settings")
+            logger.info('Applying custom cephadm exporter settings')
             # validated within the parser, so we can just apply to the store
             with tempfile.NamedTemporaryFile(buffering=0) as tmp:
                 tmp.write(json.dumps(ctx.exporter_config).encode('utf-8'))
                 mounts = {
-                    tmp.name: "/tmp/exporter-config.json:z"
+                    tmp.name: '/tmp/exporter-config.json:z'
                 }
-                cli(["cephadm", "set-exporter-config", "-i", "/tmp/exporter-config.json"], extra_mounts=mounts)
-                logger.info("-> Use ceph orch apply cephadm-exporter to deploy")
+                cli(['cephadm', 'set-exporter-config', '-i', '/tmp/exporter-config.json'], extra_mounts=mounts)
+                logger.info('-> Use ceph orch apply cephadm-exporter to deploy')
         else:
             # generate a default SSL configuration for the exporter(s)
-            logger.info("Generating a default cephadm exporter configuration (self-signed)")
+            logger.info('Generating a default cephadm exporter configuration (self-signed)')
             cli(['cephadm', 'generate-exporter-config'])
         #
         # deploy the service (commented out until the cephadm changes are in the ceph container build)
         logger.info('Deploying cephadm exporter service with default placement...')
         cli(['orch', 'apply', 'cephadm-exporter'])
 
-
     if not ctx.skip_dashboard:
         prepare_dashboard(ctx, uid, gid, cli, wait_for_mgr_restart)
 
@@ -3924,9 +4022,10 @@ def command_bootstrap(ctx):
 
 ##################
 
+
 def command_registry_login(ctx: CephadmContext):
     if ctx.registry_json:
-        logger.info("Pulling custom registry login info from %s." % ctx.registry_json)
+       logger.info('Pulling custom registry login info from %s.' % ctx.registry_json)
% ctx.registry_json) + logger.info('Pulling custom registry login info from %s.' % ctx.registry_json) d = get_parm(ctx.registry_json) if d.get('url') and d.get('username') and d.get('password'): ctx.registry_url = d.get('url') @@ -3934,23 +4033,24 @@ def command_registry_login(ctx: CephadmContext): ctx.registry_password = d.get('password') registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password) else: - raise Error("json provided for custom registry login did not include all necessary fields. " - "Please setup json file as\n" - "{\n" - " \"url\": \"REGISTRY_URL\",\n" - " \"username\": \"REGISTRY_USERNAME\",\n" - " \"password\": \"REGISTRY_PASSWORD\"\n" - "}\n") + raise Error('json provided for custom registry login did not include all necessary fields. ' + 'Please setup json file as\n' + '{\n' + ' "url": "REGISTRY_URL",\n' + ' "username": "REGISTRY_USERNAME",\n' + ' "password": "REGISTRY_PASSWORD"\n' + '}\n') elif ctx.registry_url and ctx.registry_username and ctx.registry_password: registry_login(ctx, ctx.registry_url, ctx.registry_username, ctx.registry_password) else: - raise Error("Invalid custom registry arguments received. To login to a custom registry include " - "--registry-url, --registry-username and --registry-password " - "options or --registry-json option") + raise Error('Invalid custom registry arguments received. To login to a custom registry include ' + '--registry-url, --registry-username and --registry-password ' + 'options or --registry-json option') return 0 + def registry_login(ctx: CephadmContext, url, username, password): - logger.info("Logging into custom registry.") + logger.info('Logging into custom registry.') try: container_path = ctx.container_path cmd = [container_path, 'login', @@ -3961,8 +4061,8 @@ def registry_login(ctx: CephadmContext, url, username, password): out, _, _ = call_throws(ctx, cmd) if 'podman' in container_path: os.chmod('/etc/ceph/podman-auth.json', 0o600) - except: - raise Error("Failed to login to custom registry @ %s as %s with given password" % (ctx.registry_url, ctx.registry_username)) + except Exception: + raise Error('Failed to login to custom registry @ %s as %s with given password' % (ctx.registry_url, ctx.registry_username)) ################################## @@ -3979,7 +4079,7 @@ def extract_uid_gid_monitoring(ctx, daemon_type): elif daemon_type == 'alertmanager': uid, gid = extract_uid_gid(ctx, file_path=['/etc/alertmanager', '/etc/prometheus']) else: - raise Error("{} not implemented yet".format(daemon_type)) + raise Error('{} not implemented yet'.format(daemon_type)) return uid, gid @@ -3988,8 +4088,8 @@ def command_deploy(ctx): # type: (CephadmContext) -> None daemon_type, daemon_id = ctx.name.split('.', 1) - l = FileLock(ctx, ctx.fsid) - l.acquire() + lock = FileLock(ctx, ctx.fsid) + lock.acquire() if daemon_type not in get_supported_daemons(): raise Error('daemon type %s not recognized' % daemon_type) @@ -4009,7 +4109,7 @@ def command_deploy(ctx): logger.info('%s daemon %s ...' 
% ('Deploy', ctx.name)) # Get and check ports explicitly required to be opened - daemon_ports = [] # type: List[int] + daemon_ports = [] # type: List[int] # only check port in use if not reconfig or redeploy since service # we are redeploying/reconfiguring will already be using the port @@ -4037,17 +4137,17 @@ def command_deploy(ctx): daemon_ports.extend(Monitoring.port_map[daemon_type]) # make sure provided config-json is sufficient - config = get_parm(ctx.config_json) # type: ignore + config = get_parm(ctx.config_json) # type: ignore required_files = Monitoring.components[daemon_type].get('config-json-files', list()) required_args = Monitoring.components[daemon_type].get('config-json-args', list()) if required_files: if not config or not all(c in config.get('files', {}).keys() for c in required_files): # type: ignore - raise Error("{} deployment requires config-json which must " - "contain file content for {}".format(daemon_type.capitalize(), ', '.join(required_files))) + raise Error('{} deployment requires config-json which must ' + 'contain file content for {}'.format(daemon_type.capitalize(), ', '.join(required_files))) if required_args: if not config or not all(c in config.keys() for c in required_args): # type: ignore - raise Error("{} deployment requires config-json which must " - "contain arg for {}".format(daemon_type.capitalize(), ', '.join(required_args))) + raise Error('{} deployment requires config-json which must ' + 'contain arg for {}'.format(daemon_type.capitalize(), ', '.join(required_args))) uid, gid = extract_uid_gid_monitoring(ctx, daemon_type) c = get_container(ctx, ctx.fsid, daemon_type, daemon_id) @@ -4111,8 +4211,8 @@ def command_deploy(ctx): gid = os.getgid() config_js = get_parm(ctx.config_json) # type: Dict[str, str] if not daemon_ports: - logger.info("cephadm-exporter will use default port ({})".format(CephadmDaemon.default_port)) - daemon_ports =[CephadmDaemon.default_port] + logger.info('cephadm-exporter will use default port ({})'.format(CephadmDaemon.default_port)) + daemon_ports = [CephadmDaemon.default_port] CephadmDaemon.validate_config(config_js) @@ -4163,7 +4263,7 @@ def command_shell(ctx): if not ctx.keyring and os.path.exists(SHELL_DEFAULT_KEYRING): ctx.keyring = SHELL_DEFAULT_KEYRING - container_args = [] # type: List[str] + container_args = [] # type: List[str] mounts = get_container_mounts(ctx, ctx.fsid, daemon_type, daemon_id, no_config=True if ctx.config else False) binds = get_container_binds(ctx, ctx.fsid, daemon_type, daemon_id) @@ -4188,7 +4288,7 @@ def command_shell(ctx): container_args += [ '-it', '-e', 'LANG=C', - '-e', "PS1=%s" % CUSTOM_PS1, + '-e', 'PS1=%s' % CUSTOM_PS1, ] if ctx.fsid: home = os.path.join(ctx.data_dir, ctx.fsid, 'home') @@ -4225,7 +4325,7 @@ def command_enter(ctx): if not ctx.fsid: raise Error('must pass --fsid to specify cluster') (daemon_type, daemon_id) = ctx.name.split('.', 1) - container_args = [] # type: List[str] + container_args = [] # type: List[str] if ctx.command: command = ctx.command else: @@ -4233,7 +4333,7 @@ def command_enter(ctx): container_args += [ '-it', '-e', 'LANG=C', - '-e', "PS1=%s" % CUSTOM_PS1, + '-e', 'PS1=%s' % CUSTOM_PS1, ] c = CephContainer( ctx, @@ -4255,10 +4355,10 @@ def command_ceph_volume(ctx): if ctx.fsid: make_log_dir(ctx, ctx.fsid) - l = FileLock(ctx, ctx.fsid) - l.acquire() + lock = FileLock(ctx, ctx.fsid) + lock.acquire() - (uid, gid) = (0, 0) # ceph-volume runs as root + (uid, gid) = (0, 0) # ceph-volume runs as root mounts = get_container_mounts(ctx, ctx.fsid, 'osd', None) 
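# ------------------------------------------------------------------
# A minimal, self-contained sketch (illustrative only, not part of this
# patch) of the config-json validation performed in command_deploy above:
# every name listed in the component's 'config-json-files' must appear
# under config['files'], and every 'config-json-args' key must be a
# top-level key. The function name below is an assumption for
# illustration, and ValueError stands in for cephadm's Error class.
def validate_monitoring_config(daemon_type, config, required_files, required_args):
    # type: (str, dict, list, list) -> None
    # mirrors: all(c in config.get('files', {}).keys() for c in required_files)
    if required_files:
        if not config or not all(f in config.get('files', {}) for f in required_files):
            raise ValueError('{} deployment requires config-json which must '
                             'contain file content for {}'.format(
                                 daemon_type.capitalize(), ', '.join(required_files)))
    # mirrors: all(c in config.keys() for c in required_args)
    if required_args:
        if not config or not all(a in config for a in required_args):
            raise ValueError('{} deployment requires config-json which must '
                             'contain arg for {}'.format(
                                 daemon_type.capitalize(), ', '.join(required_args)))

# e.g. validate_monitoring_config('grafana', {'files': {}}, ['grafana.ini'], [])
# raises, since 'grafana.ini' is absent from config['files'].
# ------------------------------------------------------------------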
tmp_config = None @@ -4327,8 +4427,8 @@ def command_logs(ctx): # call this directly, without our wrapper, so that we get an unmolested # stdout with logger prefixing. - logger.debug("Running command: %s" % ' '.join(cmd)) - subprocess.call(cmd) # type: ignore + logger.debug('Running command: %s' % ' '.join(cmd)) + subprocess.call(cmd) # type: ignore ################################## @@ -4336,11 +4436,12 @@ def command_logs(ctx): def list_networks(ctx): # type: (CephadmContext) -> Dict[str,List[str]] - ## sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag, - ## so we'll need to use a regex to parse 'ip' command output. - #out, _, _ = call_throws(['ip', '-j', 'route', 'ls']) - #j = json.loads(out) - #for x in j: + # sadly, 18.04's iproute2 4.15.0-2ubun doesn't support the -j flag, + # so we'll need to use a regex to parse 'ip' command output. + # + # out, _, _ = call_throws(['ip', '-j', 'route', 'ls']) + # j = json.loads(out) + # for x in j: res = _list_ipv4_networks(ctx) res.update(_list_ipv6_networks(ctx)) @@ -4420,6 +4521,27 @@ def command_ls(ctx): print(json.dumps(ls, indent=4)) +def with_units_to_int(v: str) -> int: + if v.endswith('iB'): + v = v[:-2] + elif v.endswith('B'): + v = v[:-1] + mult = 1 + if v[-1].upper() == 'K': + mult = 1024 + v = v[:-1] + elif v[-1].upper() == 'M': + mult = 1024 * 1024 + v = v[:-1] + elif v[-1].upper() == 'G': + mult = 1024 * 1024 * 1024 + v = v[:-1] + elif v[-1].upper() == 'T': + mult = 1024 * 1024 * 1024 * 1024 + v = v[:-1] + return int(float(v) * mult) + + def list_daemons(ctx, detail=True, legacy_dir=None): # type: (CephadmContext, bool, Optional[str]) -> List[Dict[str, str]] host_version: Optional[str] = None @@ -4436,6 +4558,22 @@ def list_daemons(ctx, detail=True, legacy_dir=None): # keep track of image digests seen_digests = {} # type: Dict[str, List[str]] + # keep track of memory usage we've seen + seen_memusage = {} # type: Dict[str, int] + out, err, code = call( + ctx, + [container_path, 'stats', '--format', '{{.ID}},{{.MemUsage}}', '--no-stream'], + verbosity=CallVerbosity.DEBUG + ) + seen_memusage_cid_len = 0 + if not code: + for line in out.splitlines(): + (cid, usage) = line.split(',') + (used, limit) = usage.split(' / ') + seen_memusage[cid] = with_units_to_int(used) + if not seen_memusage_cid_len: + seen_memusage_cid_len = len(cid) + # /var/lib/ceph if os.path.exists(data_dir): for i in os.listdir(data_dir): @@ -4445,10 +4583,9 @@ def list_daemons(ctx, detail=True, legacy_dir=None): if '-' not in j: continue (cluster, daemon_id) = j.split('-', 1) - fsid = get_legacy_daemon_fsid( - ctx, - cluster, daemon_type, daemon_id, - legacy_dir=legacy_dir) + fsid = get_legacy_daemon_fsid(ctx, + cluster, daemon_type, daemon_id, + legacy_dir=legacy_dir) legacy_unit_name = 'ceph-%s@%s' % (daemon_type, daemon_id) val: Dict[str, Any] = { 'style': 'legacy', @@ -4461,7 +4598,9 @@ def list_daemons(ctx, detail=True, legacy_dir=None): check_unit(ctx, legacy_unit_name) if not host_version: try: - out, err, code = call(ctx, ['ceph', '-v']) + out, err, code = call(ctx, + ['ceph', '-v'], + verbosity=CallVerbosity.DEBUG) if not code and out.startswith('ceph version '): host_version = out.split(' ')[2] except Exception: @@ -4496,13 +4635,12 @@ def list_daemons(ctx, detail=True, legacy_dir=None): version = None start_stamp = None - out, err, code = call(ctx, - [ - container_path, 'inspect', - '--format', '{{.Id}},{{.Config.Image}},{{.Image}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}', - 'ceph-%s-%s' % (fsid, j) - ], - 
verbosity=CallVerbosity.DEBUG) + cmd = [ + container_path, 'inspect', + '--format', '{{.Id}},{{.Config.Image}},{{.Image}},{{.Created}},{{index .Config.Labels "io.ceph.version"}}', + 'ceph-%s-%s' % (fsid, j) + ] + out, err, code = call(ctx, cmd, verbosity=CallVerbosity.DEBUG) if not code: (container_id, image_name, image_id, start, version) = out.strip().split(',') @@ -4516,7 +4654,7 @@ def list_daemons(ctx, detail=True, legacy_dir=None): out, err, code = call( ctx, [ - container_path, 'image', 'inspect', image_name, + container_path, 'image', 'inspect', image_id, '--format', '{{.RepoDigests}}', ], verbosity=CallVerbosity.DEBUG) @@ -4528,22 +4666,24 @@ def list_daemons(ctx, detail=True, legacy_dir=None): if not version or '.' not in version: version = seen_versions.get(image_id, None) if daemon_type == NFSGanesha.daemon_type: - version = NFSGanesha.get_version(ctx,container_id) + version = NFSGanesha.get_version(ctx, container_id) if daemon_type == CephIscsi.daemon_type: - version = CephIscsi.get_version(ctx,container_id) + version = CephIscsi.get_version(ctx, container_id) elif not version: if daemon_type in Ceph.daemons: out, err, code = call(ctx, - [container_path, 'exec', container_id, - 'ceph', '-v']) + [container_path, 'exec', container_id, + 'ceph', '-v'], + verbosity=CallVerbosity.DEBUG) if not code and \ out.startswith('ceph version '): version = out.split(' ')[2] seen_versions[image_id] = version elif daemon_type == 'grafana': out, err, code = call(ctx, - [container_path, 'exec', container_id, - 'grafana-server', '-v']) + [container_path, 'exec', container_id, + 'grafana-server', '-v'], + verbosity=CallVerbosity.DEBUG) if not code and \ out.startswith('Version '): version = out.split(' ')[1] @@ -4551,26 +4691,22 @@ def list_daemons(ctx, detail=True, legacy_dir=None): elif daemon_type in ['prometheus', 'alertmanager', 'node-exporter']: - cmd = daemon_type.replace('-', '_') - out, err, code = call(ctx, - [container_path, 'exec', container_id, - cmd, '--version']) - if not code and \ - err.startswith('%s, version ' % cmd): - version = err.split(' ')[2] - seen_versions[image_id] = version + version = Monitoring.get_version(ctx, container_id, daemon_type) + seen_versions[image_id] = version elif daemon_type == 'haproxy': out, err, code = call(ctx, - [container_path, 'exec', container_id, - 'haproxy', '-v']) + [container_path, 'exec', container_id, + 'haproxy', '-v'], + verbosity=CallVerbosity.DEBUG) if not code and \ out.startswith('HA-Proxy version '): version = out.split(' ')[2] seen_versions[image_id] = version elif daemon_type == 'keepalived': out, err, code = call(ctx, - [container_path, 'exec', container_id, - 'keepalived', '--version']) + [container_path, 'exec', container_id, + 'keepalived', '--version'], + verbosity=CallVerbosity.DEBUG) if not code and \ err.startswith('Keepalived '): version = err.split(' ')[1] @@ -4583,16 +4719,28 @@ def list_daemons(ctx, detail=True, legacy_dir=None): else: logger.warning('version for unknown daemon type %s' % daemon_type) else: - vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore + vfile = os.path.join(data_dir, fsid, j, 'unit.image') # type: ignore try: with open(vfile, 'r') as f: image_name = f.read().strip() or None except IOError: pass + + # unit.meta? 
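+            # (unit.meta, when present, is assumed to be a small JSON dict of
+            # deploy-time metadata written alongside unit.run -- e.g. the
+            # memory_request/memory_limit values introduced by the new
+            # --memory-* deploy flags -- and is merged wholesale into the
+            # daemon's 'ls' entry below.)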
+ mfile = os.path.join(data_dir, fsid, j, 'unit.meta') # type: ignore + try: + with open(mfile, 'r') as f: + meta = json.loads(f.read()) + val.update(meta) + except IOError: + pass + val['container_id'] = container_id val['container_image_name'] = image_name val['container_image_id'] = image_id val['container_image_digests'] = image_digests + if container_id: + val['memory_usage'] = seen_memusage.get(container_id[0:seen_memusage_cid_len]) val['version'] = version val['started'] = start_stamp val['created'] = get_file_timestamp( @@ -4619,9 +4767,9 @@ def get_daemon_description(ctx, fsid, name, detail=False, legacy_dir=None): return d raise Error('Daemon not found: {}. See `cephadm ls`'.format(name)) - ################################## + @default_image def command_adopt(ctx): # type: (CephadmContext) -> None @@ -4643,12 +4791,12 @@ def command_adopt(ctx): legacy_dir=ctx.legacy_dir) if not fsid: raise Error('could not detect legacy fsid; set fsid in ceph.conf') - l = FileLock(ctx, fsid) - l.acquire() + lock = FileLock(ctx, fsid) + lock.acquire() # call correct adoption if daemon_type in Ceph.daemons: - command_adopt_ceph(ctx, daemon_type, daemon_id, fsid); + command_adopt_ceph(ctx, daemon_type, daemon_id, fsid) elif daemon_type == 'prometheus': command_adopt_prometheus(ctx, daemon_id, fsid) elif daemon_type == 'grafana': @@ -4677,7 +4825,7 @@ class AdoptOsd(object): try: with open(path, 'r') as f: osd_fsid = f.read().strip() - logger.info("Found online OSD at %s" % path) + logger.info('Found online OSD at %s' % path) except IOError: logger.info('Unable to read OSD fsid from %s' % path) if os.path.exists(os.path.join(self.osd_data_dir, 'type')): @@ -4704,7 +4852,7 @@ class AdoptOsd(object): try: js = json.loads(out) if self.osd_id in js: - logger.info("Found offline LVM OSD {}".format(self.osd_id)) + logger.info('Found offline LVM OSD {}'.format(self.osd_id)) osd_fsid = js[self.osd_id][0]['tags']['ceph.osd_fsid'] for device in js[self.osd_id]: if device['tags']['ceph.type'] == 'block': @@ -4714,7 +4862,7 @@ class AdoptOsd(object): osd_type = 'filestore' break except ValueError as e: - logger.info("Invalid JSON in ceph-volume lvm list: {}".format(e)) + logger.info('Invalid JSON in ceph-volume lvm list: {}'.format(e)) return osd_fsid, osd_type @@ -4722,20 +4870,20 @@ class AdoptOsd(object): # type: () -> Tuple[Optional[str], Optional[str]] osd_fsid, osd_type = None, None - osd_file = glob("/etc/ceph/osd/{}-[a-f0-9-]*.json".format(self.osd_id)) + osd_file = glob('/etc/ceph/osd/{}-[a-f0-9-]*.json'.format(self.osd_id)) if len(osd_file) == 1: with open(osd_file[0], 'r') as f: try: js = json.loads(f.read()) - logger.info("Found offline simple OSD {}".format(self.osd_id)) - osd_fsid = js["fsid"] - osd_type = js["type"] - if osd_type != "filestore": + logger.info('Found offline simple OSD {}'.format(self.osd_id)) + osd_fsid = js['fsid'] + osd_type = js['type'] + if osd_type != 'filestore': # need this to be mounted for the adopt to work, as it # needs to move files from this directory - call_throws(self.ctx, ['mount', js["data"]["path"], self.osd_data_dir]) + call_throws(self.ctx, ['mount', js['data']['path'], self.osd_data_dir]) except ValueError as e: - logger.info("Invalid JSON in {}: {}".format(osd_file, e)) + logger.info('Invalid JSON in {}: {}'.format(osd_file, e)) return osd_fsid, osd_type @@ -4751,8 +4899,8 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): if not os.path.exists(data_dir_src): raise Error("{}.{} data directory '{}' does not exist. 
" - "Incorrect ID specified, or daemon alrady adopted?".format( - daemon_type, daemon_id, data_dir_src)) + 'Incorrect ID specified, or daemon already adopted?'.format( + daemon_type, daemon_id, data_dir_src)) osd_fsid = None if daemon_type == 'osd': @@ -4788,7 +4936,7 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): move_files(ctx, glob(os.path.join(data_dir_src, '*')), data_dir_dst, uid=uid, gid=gid) - logger.debug('Remove dir \'%s\'' % (data_dir_src)) + logger.debug('Remove dir `%s`' % (data_dir_src)) if os.path.ismount(data_dir_src): call_throws(ctx, ['umount', data_dir_src]) os.rmdir(data_dir_src) @@ -4826,13 +4974,13 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): os.rename(simple_fn, new_fn) logger.info('Disabling host unit ceph-volume@ simple unit...') call(ctx, ['systemctl', 'disable', - 'ceph-volume@simple-%s-%s.service' % (daemon_id, osd_fsid)]) + 'ceph-volume@simple-%s-%s.service' % (daemon_id, osd_fsid)]) else: # assume this is an 'lvm' c-v for now, but don't error # out if it's not. logger.info('Disabling host unit ceph-volume@ lvm unit...') call(ctx, ['systemctl', 'disable', - 'ceph-volume@lvm-%s-%s.service' % (daemon_id, osd_fsid)]) + 'ceph-volume@lvm-%s-%s.service' % (daemon_id, osd_fsid)]) # config config_src = '/etc/ceph/%s.conf' % (ctx.cluster) @@ -4843,7 +4991,7 @@ def command_adopt_ceph(ctx, daemon_type, daemon_id, fsid): # logs logger.info('Moving logs...') log_dir_src = ('/var/log/ceph/%s-%s.%s.log*' % - (ctx.cluster, daemon_type, daemon_id)) + (ctx.cluster, daemon_type, daemon_id)) log_dir_src = os.path.abspath(ctx.legacy_dir + log_dir_src) log_dir_dst = make_log_dir(ctx, fsid, uid=uid, gid=gid) move_files(ctx, glob(log_dir_src), @@ -4868,7 +5016,7 @@ def command_adopt_prometheus(ctx, daemon_id, fsid): _stop_and_disable(ctx, 'prometheus') data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + uid=uid, gid=gid) # config config_src = '/etc/prometheus/prometheus.yml' @@ -4898,7 +5046,7 @@ def command_adopt_grafana(ctx, daemon_id, fsid): _stop_and_disable(ctx, 'grafana-server') data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + uid=uid, gid=gid) # config config_src = '/etc/grafana/grafana.ini' @@ -4929,7 +5077,7 @@ def command_adopt_grafana(ctx, daemon_id, fsid): _adjust_grafana_ini(os.path.join(config_dst, 'grafana.ini')) else: - logger.debug("Skipping ssl, missing cert {} or key {}".format(cert, key)) + logger.debug('Skipping ssl, missing cert {} or key {}'.format(cert, key)) # data - possible custom dashboards/plugins data_src = '/var/lib/grafana/' @@ -4952,7 +5100,7 @@ def command_adopt_alertmanager(ctx, daemon_id, fsid): _stop_and_disable(ctx, 'prometheus-alertmanager') data_dir_dst = make_data_dir(ctx, fsid, daemon_type, daemon_id, - uid=uid, gid=gid) + uid=uid, gid=gid) # config config_src = '/etc/prometheus/alertmanager.yml' @@ -4979,24 +5127,24 @@ def _adjust_grafana_ini(filename): # Update cert_file, cert_key pathnames in server section # ConfigParser does not preserve comments try: - with open(filename, "r") as grafana_ini: + with open(filename, 'r') as grafana_ini: lines = grafana_ini.readlines() - with open("{}.new".format(filename), "w") as grafana_ini: - server_section=False + with open('{}.new'.format(filename), 'w') as grafana_ini: + server_section = False for line in lines: if line.startswith('['): - server_section=False + server_section = False if line.startswith('[server]'): - server_section=True + server_section = True if server_section: line = 
re.sub(r'^cert_file.*', - 'cert_file = /etc/grafana/certs/cert_file', line) + 'cert_file = /etc/grafana/certs/cert_file', line) line = re.sub(r'^cert_key.*', - 'cert_key = /etc/grafana/certs/cert_key', line) + 'cert_key = /etc/grafana/certs/cert_key', line) grafana_ini.write(line) - os.rename("{}.new".format(filename), filename) + os.rename('{}.new'.format(filename), filename) except OSError as err: - raise Error("Cannot update {}: {}".format(filename, err)) + raise Error('Cannot update {}: {}'.format(filename, err)) def _stop_and_disable(ctx, unit_name): @@ -5010,19 +5158,20 @@ def _stop_and_disable(ctx, unit_name): logger.info('Disabling old systemd unit %s...' % unit_name) call_throws(ctx, ['systemctl', 'disable', unit_name]) - ################################## + def command_rm_daemon(ctx): # type: (CephadmContext) -> None - l = FileLock(ctx, ctx.fsid) - l.acquire() + lock = FileLock(ctx, ctx.fsid) + lock.acquire() + (daemon_type, daemon_id) = ctx.name.split('.', 1) unit_name = get_unit_name_by_daemon_name(ctx, ctx.fsid, ctx.name) if daemon_type in ['mon', 'osd'] and not ctx.force: raise Error('must pass --force to proceed: ' - 'this command may destroy precious data!') + 'this command may destroy precious data!') call(ctx, ['systemctl', 'stop', unit_name], verbosity=CallVerbosity.DEBUG) @@ -5053,10 +5202,10 @@ def command_rm_cluster(ctx): # type: (CephadmContext) -> None if not ctx.force: raise Error('must pass --force to proceed: ' - 'this command may destroy precious data!') + 'this command may destroy precious data!') - l = FileLock(ctx, ctx.fsid) - l.acquire() + lock = FileLock(ctx, ctx.fsid) + lock.acquire() # stop + disable individual daemon units for d in list_daemons(ctx, detail=False): @@ -5081,24 +5230,23 @@ def command_rm_cluster(ctx): call(ctx, ['systemctl', 'disable', unit_name], verbosity=CallVerbosity.DEBUG) - slice_name = 'system-%s.slice' % (('ceph-%s' % ctx.fsid).replace('-', - '\\x2d')) + slice_name = 'system-%s.slice' % (('ceph-%s' % ctx.fsid).replace('-', '\\x2d')) call(ctx, ['systemctl', 'stop', slice_name], verbosity=CallVerbosity.DEBUG) # rm units - call_throws(ctx, ['rm', '-f', ctx.unit_dir + - '/ceph-%s@.service' % ctx.fsid]) - call_throws(ctx, ['rm', '-f', ctx.unit_dir + - '/ceph-%s.target' % ctx.fsid]) + call_throws(ctx, ['rm', '-f', ctx.unit_dir + # noqa: W504 + '/ceph-%s@.service' % ctx.fsid]) + call_throws(ctx, ['rm', '-f', ctx.unit_dir + # noqa: W504 + '/ceph-%s.target' % ctx.fsid]) call_throws(ctx, ['rm', '-rf', - ctx.unit_dir + '/ceph-%s.target.wants' % ctx.fsid]) + ctx.unit_dir + '/ceph-%s.target.wants' % ctx.fsid]) # rm data call_throws(ctx, ['rm', '-rf', ctx.data_dir + '/' + ctx.fsid]) # rm logs call_throws(ctx, ['rm', '-rf', ctx.log_dir + '/' + ctx.fsid]) - call_throws(ctx, ['rm', '-rf', ctx.log_dir + - '/*.wants/ceph-%s@*' % ctx.fsid]) + call_throws(ctx, ['rm', '-rf', ctx.log_dir + # noqa: W504 + '/*.wants/ceph-%s@*' % ctx.fsid]) # rm logrotate config call_throws(ctx, ['rm', '-f', ctx.logrotate_dir + '/ceph-%s' % ctx.fsid]) @@ -5115,16 +5263,16 @@ def command_rm_cluster(ctx): if os.path.exists(files[n]): os.remove(files[n]) - ################################## + def check_time_sync(ctx, enabler=None): # type: (CephadmContext, Optional[Packager]) -> bool units = [ 'chrony.service', # 18.04 (at least) - 'chronyd.service', # el / opensuse + 'chronyd.service', # el / opensuse 'systemd-timesyncd.service', - 'ntpd.service', # el7 (at least) + 'ntpd.service', # el7 (at least) 'ntp.service', # 18.04 (at least) 'ntpsec.service', # 20.04 (at least) / 
buster ] @@ -5151,21 +5299,21 @@ def command_check_host(ctx: CephadmContext) -> None: find_program(command) logger.info('%s is present' % command) except ValueError: - errors.append('ERROR: %s binary does not appear to be installed' % command) + errors.append('%s binary does not appear to be installed' % command) # check for configured+running chronyd or ntp if not check_time_sync(ctx): - errors.append('ERROR: No time synchronization is active') + errors.append('No time synchronization is active') - if "expect_hostname" in ctx and ctx.expect_hostname: + if 'expect_hostname' in ctx and ctx.expect_hostname: if get_hostname().lower() != ctx.expect_hostname.lower(): - errors.append('ERROR: hostname "%s" does not match expected hostname "%s"' % ( + errors.append('hostname "%s" does not match expected hostname "%s"' % ( get_hostname(), ctx.expect_hostname)) logger.info('Hostname "%s" matches what is expected.', ctx.expect_hostname) if errors: - raise Error('\n'.join(errors)) + raise Error('\nERROR: '.join(errors)) logger.info('Host looks OK') @@ -5173,8 +5321,6 @@ def command_check_host(ctx: CephadmContext) -> None: def command_prepare_host(ctx: CephadmContext) -> None: - container_path = ctx.container_path - logger.info('Verifying podman|docker is present...') pkg = None try: @@ -5200,7 +5346,7 @@ def command_prepare_host(ctx: CephadmContext) -> None: # the service check_time_sync(ctx, enabler=pkg) - if "expect_hostname" in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname(): + if 'expect_hostname' in ctx and ctx.expect_hostname and ctx.expect_hostname != get_hostname(): logger.warning('Adjusting hostname from %s -> %s...' % (get_hostname(), ctx.expect_hostname)) call_throws(ctx, ['hostname', ctx.expect_hostname]) with open('/etc/hostname', 'w') as f: @@ -5219,16 +5365,16 @@ class CustomValidation(argparse.Action): (daemon_type, daemon_id) = values.split('.', 1) except ValueError: raise argparse.ArgumentError(self, - "must be of the format .. For example, osd.1 or prometheus.myhost.com") + 'must be of the format .. For example, osd.1 or prometheus.myhost.com') daemons = get_supported_daemons() if daemon_type not in daemons: raise argparse.ArgumentError(self, - "name must declare the type of daemon e.g. " - "{}".format(', '.join(daemons))) + 'name must declare the type of daemon e.g. ' + '{}'.format(', '.join(daemons))) def __call__(self, parser, namespace, values, option_string=None): - if self.dest == "name": + if self.dest == 'name': self._check_name(values) setattr(namespace, self.dest, values) elif self.dest == 'exporter_config': @@ -5399,7 +5545,7 @@ class Apt(Packager): logger.info('Attempting podman install...') try: self.install(['podman']) - except Error as e: + except Error: logger.info('Podman did not work. 
Falling back to docker...') self.install(['docker.io']) @@ -5682,7 +5828,7 @@ def create_packager(ctx: CephadmContext, if distro in YumDnf.DISTRO_NAMES: return YumDnf(ctx, stable=stable, version=version, branch=branch, commit=commit, - distro=distro, distro_version=distro_version) + distro=distro, distro_version=distro_version) elif distro in Apt.DISTRO_NAMES: return Apt(ctx, stable=stable, version=version, branch=branch, commit=commit, @@ -5703,7 +5849,7 @@ def command_add_repo(ctx: CephadmContext): if ctx.version: try: (x, y, z) = ctx.version.split('.') - except Exception as e: + except Exception: raise Error('version must be in the form x.y.z (e.g., 15.2.0)') pkg = create_packager(ctx, stable=ctx.release, @@ -5724,16 +5870,17 @@ def command_install(ctx: CephadmContext): ################################## + def get_ipv4_address(ifname): # type: (str) -> str def _extract(sock, offset): return socket.inet_ntop( - socket.AF_INET, - fcntl.ioctl( - sock.fileno(), - offset, - struct.pack('256s', bytes(ifname[:15], 'utf-8')) - )[20:24]) + socket.AF_INET, + fcntl.ioctl( + sock.fileno(), + offset, + struct.pack('256s', bytes(ifname[:15], 'utf-8')) + )[20:24]) s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) try: @@ -5761,10 +5908,10 @@ def get_ipv6_address(ifname): field = iface_setting.split() if field[-1] == ifname: ipv6_raw = field[0] - ipv6_fmtd = ":".join([ipv6_raw[_p:_p+4] for _p in range(0, len(field[0]),4)]) + ipv6_fmtd = ':'.join([ipv6_raw[_p:_p + 4] for _p in range(0, len(field[0]), 4)]) # apply naming rules using ipaddress module ipv6 = ipaddress.ip_address(ipv6_fmtd) - return "{}/{}".format(str(ipv6), int('0x{}'.format(field[2]), 16)) + return '{}/{}'.format(str(ipv6), int('0x{}'.format(field[2]), 16)) return '' @@ -5778,18 +5925,18 @@ def bytes_to_human(num, mode='decimal'): """ unit_list = ['', 'KB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB'] divisor = 1000.0 - yotta = "YB" + yotta = 'YB' if mode == 'binary': unit_list = ['', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB'] divisor = 1024.0 - yotta = "YiB" + yotta = 'YiB' for unit in unit_list: if abs(num) < divisor: - return "%3.1f%s" % (num, unit) + return '%3.1f%s' % (num, unit) num /= divisor - return "%.1f%s" % (num, yotta) + return '%.1f%s' % (num, yotta) def read_file(path_list, file_name=''): @@ -5812,20 +5959,21 @@ def read_file(path_list, file_name=''): except OSError: # sysfs may populate the file, but for devices like # virtio reads can fail - return "Unknown" + return 'Unknown' else: return content - return "Unknown" - + return 'Unknown' ################################## + + class HostFacts(): _dmi_path_list = ['/sys/class/dmi/id'] _nic_path_list = ['/sys/class/net'] _selinux_path_list = ['/etc/selinux/config'] _apparmor_path_list = ['/etc/apparmor'] _disk_vendor_workarounds = { - "0x1af4": "Virtio Block Device" + '0x1af4': 'Virtio Block Device' } def __init__(self, ctx: CephadmContext): @@ -5850,14 +5998,14 @@ class HostFacts(): cpu_set = set() for line in output: - field = [l.strip() for l in line.split(':')] - if "model name" in line: + field = [f.strip() for f in line.split(':')] + if 'model name' in line: self.cpu_model = field[1] - if "physical id" in line: + if 'physical id' in line: cpu_set.add(field[1]) - if "siblings" in line: + if 'siblings' in line: self.cpu_threads = int(field[1].strip()) - if "cpu cores" in line: + if 'cpu cores' in line: self.cpu_cores = int(field[1].strip()) pass self.cpu_count = len(cpu_set) @@ -5889,14 +6037,14 @@ class HostFacts(): rel_dict = dict() for line in os_release: - if "=" in 
line: + if '=' in line: var_name, var_value = line.split('=') rel_dict[var_name] = var_value.strip('"') # Would normally use PRETTY_NAME, but NAME and VERSION are more # consistent - if all(_v in rel_dict for _v in ["NAME", "VERSION"]): - rel_str = "{} {}".format(rel_dict['NAME'], rel_dict['VERSION']) + if all(_v in rel_dict for _v in ['NAME', 'VERSION']): + rel_str = '{} {}'.format(rel_dict['NAME'], rel_dict['VERSION']) return rel_str @property @@ -5916,15 +6064,15 @@ class HostFacts(): if os.path.exists(entitlements_dir): pems = glob('{}/*.pem'.format(entitlements_dir)) if len(pems) >= 2: - return "Yes" + return 'Yes' - return "No" + return 'No' os_name = self.operating_system - if os_name.upper().startswith("RED HAT"): + if os_name.upper().startswith('RED HAT'): return _red_hat() - return "Unknown" + return 'Unknown' @property def hdd_count(self): @@ -5963,15 +6111,14 @@ class HostFacts(): disk_vendor = HostFacts._disk_vendor_workarounds.get(vendor, vendor) disk_size_bytes = self._get_capacity(dev) disk_list.append({ - "description": "{} {} ({})".format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)), - "vendor": disk_vendor, - "model": disk_model, - "rev": disk_rev, - "wwid": disk_wwid, - "dev_name": dev, - "disk_size_bytes": disk_size_bytes, - } - ) + 'description': '{} {} ({})'.format(disk_vendor, disk_model, bytes_to_human(disk_size_bytes)), + 'vendor': disk_vendor, + 'model': disk_model, + 'rev': disk_rev, + 'wwid': disk_wwid, + 'dev_name': dev, + 'disk_size_bytes': disk_size_bytes, + }) return disk_list @property @@ -6007,9 +6154,9 @@ class HostFacts(): raw = read_file(['/proc/loadavg']).strip() data = raw.split() return { - "1min": float(data[0]), - "5min": float(data[1]), - "15min": float(data[2]), + '1min': float(data[0]), + '5min': float(data[1]), + '15min': float(data[2]), } @property @@ -6035,9 +6182,9 @@ class HostFacts(): """Look at the NIC devices and extract network related metadata""" # from https://github.com/torvalds/linux/blob/master/include/uapi/linux/if_arp.h hw_lookup = { - "1": "ethernet", - "32": "infiniband", - "772": "loopback", + '1': 'ethernet', + '32': 'infiniband', + '772': 'loopback', } for nic_path in HostFacts._nic_path_list: @@ -6045,8 +6192,8 @@ class HostFacts(): continue for iface in os.listdir(nic_path): - lower_devs_list = [os.path.basename(link.replace("lower_", "")) for link in glob(os.path.join(nic_path, iface, "lower_*"))] - upper_devs_list = [os.path.basename(link.replace("upper_", "")) for link in glob(os.path.join(nic_path, iface, "upper_*"))] + lower_devs_list = [os.path.basename(link.replace('lower_', '')) for link in glob(os.path.join(nic_path, iface, 'lower_*'))] + upper_devs_list = [os.path.basename(link.replace('upper_', '')) for link in glob(os.path.join(nic_path, iface, 'upper_*'))] try: mtu = int(read_file([os.path.join(nic_path, iface, 'mtu')])) @@ -6064,19 +6211,18 @@ class HostFacts(): speed = -1 if os.path.exists(os.path.join(nic_path, iface, 'bridge')): - nic_type = "bridge" + nic_type = 'bridge' elif os.path.exists(os.path.join(nic_path, iface, 'bonding')): - nic_type = "bonding" + nic_type = 'bonding' else: - nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), "Unknown") + nic_type = hw_lookup.get(read_file([os.path.join(nic_path, iface, 'type')]), 'Unknown') dev_link = os.path.join(nic_path, iface, 'device') if os.path.exists(dev_link): iftype = 'physical' driver_path = os.path.join(dev_link, 'driver') if os.path.exists(driver_path): - driver = os.path.basename( - 
os.path.realpath(driver_path)) + driver = os.path.basename(os.path.realpath(driver_path)) else: driver = 'Unknown' @@ -6085,16 +6231,16 @@ class HostFacts(): driver = '' self.interfaces[iface] = { - "mtu": mtu, - "upper_devs_list": upper_devs_list, - "lower_devs_list": lower_devs_list, - "operstate": operstate, - "iftype": iftype, - "nic_type": nic_type, - "driver": driver, - "speed": speed, - "ipv4_address": get_ipv4_address(iface), - "ipv6_address": get_ipv6_address(iface), + 'mtu': mtu, + 'upper_devs_list': upper_devs_list, + 'lower_devs_list': lower_devs_list, + 'operstate': operstate, + 'iftype': iftype, + 'nic_type': nic_type, + 'driver': driver, + 'speed': speed, + 'ipv4_address': get_ipv4_address(iface), + 'ipv6_address': get_ipv6_address(iface), } @property @@ -6103,11 +6249,10 @@ class HostFacts(): """Return a total count of all physical NICs detected in the host""" phys_devs = [] for iface in self.interfaces: - if self.interfaces[iface]["iftype"] == 'physical': + if self.interfaces[iface]['iftype'] == 'physical': phys_devs.append(iface) return len(phys_devs) - def _get_mem_data(self, field_name): # type: (str) -> int for line in self._meminfo: @@ -6138,30 +6283,30 @@ class HostFacts(): def vendor(self): # type: () -> str """Determine server vendor from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, "sys_vendor") + return read_file(HostFacts._dmi_path_list, 'sys_vendor') @property def model(self): # type: () -> str """Determine server model information from DMI data in sysfs""" - family = read_file(HostFacts._dmi_path_list, "product_family") - product = read_file(HostFacts._dmi_path_list, "product_name") + family = read_file(HostFacts._dmi_path_list, 'product_family') + product = read_file(HostFacts._dmi_path_list, 'product_name') if family == 'Unknown' and product: - return "{}".format(product) + return '{}'.format(product) - return "{} ({})".format(family, product) + return '{} ({})'.format(family, product) @property def bios_version(self): # type: () -> str """Determine server BIOS version from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, "bios_version") + return read_file(HostFacts._dmi_path_list, 'bios_version') @property def bios_date(self): # type: () -> str """Determine server BIOS date from DMI data in sysfs""" - return read_file(HostFacts._dmi_path_list, "bios_date") + return read_file(HostFacts._dmi_path_list, 'bios_date') @property def timestamp(self): @@ -6193,10 +6338,10 @@ class HostFacts(): continue k, v = line.split('=') security[k] = v - if security['SELINUX'].lower() == "disabled": - security['description'] = "SELinux: Disabled" + if security['SELINUX'].lower() == 'disabled': + security['description'] = 'SELinux: Disabled' else: - security['description'] = "SELinux: Enabled({}, {})".format(security['SELINUX'], security['SELINUXTYPE']) + security['description'] = 'SELinux: Enabled({}, {})'.format(security['SELINUX'], security['SELINUXTYPE']) return security return {} @@ -6205,8 +6350,8 @@ class HostFacts(): security = {} for apparmor_path in HostFacts._apparmor_path_list: if os.path.exists(apparmor_path): - security['type'] = "AppArmor" - security['description'] = "AppArmor: Enabled" + security['type'] = 'AppArmor' + security['description'] = 'AppArmor: Enabled' try: profiles = read_file(['/sys/kernel/security/apparmor/profiles']) except OSError: @@ -6215,14 +6360,14 @@ class HostFacts(): summary = {} # type: Dict[str, int] for line in profiles.split('\n'): item, mode = line.split(' ') - mode= mode.strip('()') + mode = 
mode.strip('()') if mode in summary: summary[mode] += 1 else: summary[mode] = 0 - summary_str = ",".join(["{} {}".format(v, k) for k, v in summary.items()]) - security = {**security, **summary} # type: ignore - security['description'] += "({})".format(summary_str) + summary_str = ','.join(['{} {}'.format(v, k) for k, v in summary.items()]) + security = {**security, **summary} # type: ignore + security['description'] += '({})'.format(summary_str) return security return {} @@ -6236,22 +6381,22 @@ class HostFacts(): ret = _fetch_apparmor() else: return { - "type": "Unknown", - "description": "Linux Security Module framework is active, but is not using SELinux or AppArmor" + 'type': 'Unknown', + 'description': 'Linux Security Module framework is active, but is not using SELinux or AppArmor' } if ret: return ret return { - "type": "None", - "description": "Linux Security Module framework is not available" + 'type': 'None', + 'description': 'Linux Security Module framework is not available' } @property def selinux_enabled(self): - return (self.kernel_security["type"] == "SELinux") and \ - (self.kernel_security["description"] != "SELinux: Disabled") + return (self.kernel_security['type'] == 'SELinux') and \ + (self.kernel_security['description'] != 'SELinux: Disabled') @property def kernel_parameters(self): @@ -6262,7 +6407,7 @@ class HostFacts(): out, _, _ = call_throws(self.ctx, ['sysctl', '-a'], verbosity=CallVerbosity.SILENT) if out: param_list = out.split('\n') - param_dict = { param.split(" = ")[0]:param.split(" = ")[-1] for param in param_list} + param_dict = {param.split(' = ')[0]: param.split(' = ')[-1] for param in param_list} # return only desired parameters if 'net.ipv4.ip_nonlocal_bind' in param_dict: @@ -6273,29 +6418,30 @@ class HostFacts(): def dump(self): # type: () -> str """Return the attributes of this HostFacts object as json""" - data = {k: getattr(self, k) for k in dir(self) - if not k.startswith('_') and - isinstance(getattr(self, k), - (float, int, str, list, dict, tuple)) + data = { + k: getattr(self, k) for k in dir(self) + if not k.startswith('_') + and isinstance(getattr(self, k), (float, int, str, list, dict, tuple)) } return json.dumps(data, indent=2, sort_keys=True) ################################## + def command_gather_facts(ctx: CephadmContext): """gather_facts is intended to provide host releated metadata to the caller""" host = HostFacts(ctx) print(host.dump()) - ################################## + def command_verify_prereqs(ctx: CephadmContext): if ctx.service_type == 'haproxy' or ctx.service_type == 'keepalived': out, err, code = call( ctx, ['sysctl', '-n', 'net.ipv4.ip_nonlocal_bind'] ) - if out.strip() != "1": + if out.strip() != '1': raise Error('net.ipv4.ip_nonlocal_bind not set to 1') ################################## @@ -6307,10 +6453,10 @@ class CephadmCache: def __init__(self): self.started_epoch_secs = time.time() self.tasks = { - "daemons": "inactive", - "disks": "inactive", - "host": "inactive", - "http_server": "inactive", + 'daemons': 'inactive', + 'disks': 'inactive', + 'host': 'inactive', + 'http_server': 'inactive', } self.errors = [] self.disks = {} @@ -6321,17 +6467,17 @@ class CephadmCache: @property def health(self): return { - "started_epoch_secs": self.started_epoch_secs, - "tasks": self.tasks, - "errors": self.errors, + 'started_epoch_secs': self.started_epoch_secs, + 'tasks': self.tasks, + 'errors': self.errors, } def to_json(self): return { - "health": self.health, - "host": self.host, - "daemons": self.daemons, - "disks": 
self.disks, + 'health': self.health, + 'host': self.host, + 'daemons': self.daemons, + 'disks': self.disks, } def update_health(self, task_type, task_status, error_msg=None): @@ -6358,6 +6504,7 @@ class CephadmHTTPServer(ThreadingMixIn, HTTPServer): cephadm_cache: CephadmCache token: str + class CephadmDaemonHandler(BaseHTTPRequestHandler): server: CephadmHTTPServer api_version = 'v1' @@ -6378,8 +6525,8 @@ class CephadmDaemonHandler(BaseHTTPRequestHandler): ensure we only respond to callers who know our token i.e. mgr """ def wrapper(self, *args, **kwargs): - auth = self.headers.get("Authorization", None) - if auth != "Bearer " + self.server.token: + auth = self.headers.get('Authorization', None) + if auth != 'Bearer ' + self.server.token: self.send_error(401) return f(self, *args, **kwargs) @@ -6475,30 +6622,29 @@ td,th {{ data = json.dumps(self.server.cephadm_cache.health) self.send_response(status_code) - self.send_header('Content-type','application/json') + self.send_header('Content-type', 'application/json') self.end_headers() self.wfile.write(data.encode('utf-8')) else: # Invalid GET URL - bad_request_msg = "Valid URLs are: {}".format(', '.join(CephadmDaemonHandler.valid_routes)) + bad_request_msg = 'Valid URLs are: {}'.format(', '.join(CephadmDaemonHandler.valid_routes)) self.send_response(404, message=bad_request_msg) # reason - self.send_header('Content-type','application/json') + self.send_header('Content-type', 'application/json') self.end_headers() - self.wfile.write(json.dumps({"message": bad_request_msg}).encode('utf-8')) + self.wfile.write(json.dumps({'message': bad_request_msg}).encode('utf-8')) def log_message(self, format, *args): - rqst = " ".join(str(a) for a in args) - logger.info(f"client:{self.address_string()} [{self.log_date_time_string()}] {rqst}") + rqst = ' '.join(str(a) for a in args) + logger.info(f'client:{self.address_string()} [{self.log_date_time_string()}] {rqst}') class CephadmDaemon(): - daemon_type = "cephadm-exporter" + daemon_type = 'cephadm-exporter' default_port = 9443 - bin_name = 'cephadm' - key_name = "key" - crt_name = "crt" - token_name = "token" + key_name = 'key' + crt_name = 'crt' + token_name = 'token' config_requirements = [ key_name, crt_name, @@ -6524,23 +6670,23 @@ class CephadmDaemon(): @classmethod def validate_config(cls, config): - reqs = ", ".join(CephadmDaemon.config_requirements) + reqs = ', '.join(CephadmDaemon.config_requirements) errors = [] if not config or not all([k_name in config for k_name in CephadmDaemon.config_requirements]): - raise Error(f"config must contain the following fields : {reqs}") + raise Error(f'config must contain the following fields : {reqs}') if not all([isinstance(config[k_name], str) for k_name in CephadmDaemon.config_requirements]): - errors.append(f"the following fields must be strings: {reqs}") + errors.append(f'the following fields must be strings: {reqs}') crt = config[CephadmDaemon.crt_name] key = config[CephadmDaemon.key_name] token = config[CephadmDaemon.token_name] if not crt.startswith('-----BEGIN CERTIFICATE-----') or not crt.endswith('-----END CERTIFICATE-----\n'): - errors.append("crt field is not a valid SSL certificate") + errors.append('crt field is not a valid SSL certificate') if not key.startswith('-----BEGIN PRIVATE KEY-----') or not key.endswith('-----END PRIVATE KEY-----\n'): - errors.append("key is not a valid SSL private key") + errors.append('key is not a valid SSL private key') if len(token) < 8: errors.append("'token' must be more than 8 characters long") @@ -6550,10 
+6696,10 @@ class CephadmDaemon(): if p <= 1024: raise ValueError except (TypeError, ValueError): - errors.append("port must be an integer > 1024") + errors.append('port must be an integer > 1024') if errors: - raise Error("Parameter errors : {}".format(", ".join(errors))) + raise Error('Parameter errors : {}'.format(', '.join(errors))) @property def port_active(self): @@ -6563,18 +6709,18 @@ class CephadmDaemon(): def can_run(self): # if port is in use if self.port_active: - self.errors.append(f"TCP port {self.port} already in use, unable to bind") + self.errors.append(f'TCP port {self.port} already in use, unable to bind') if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.key_name)): self.errors.append(f"Key file '{CephadmDaemon.key_name}' is missing from {self.daemon_path}") if not os.path.exists(os.path.join(self.daemon_path, CephadmDaemon.crt_name)): self.errors.append(f"Certificate file '{CephadmDaemon.crt_name}' is missing from {self.daemon_path}") - if self.token == "Unknown": + if self.token == 'Unknown': self.errors.append(f"Authentication token '{CephadmDaemon.token_name}' is missing from {self.daemon_path}") return len(self.errors) == 0 @staticmethod def _unit_name(fsid, daemon_id): - return "{}.service".format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id)) + return '{}.service'.format(get_unit_name(fsid, CephadmDaemon.daemon_type, daemon_id)) @property def unit_name(self): @@ -6590,14 +6736,12 @@ class CephadmDaemon(): @property def binary_path(self): - return os.path.join( - self.ctx.data_dir, - self.fsid, - CephadmDaemon.bin_name - ) + path = os.path.realpath(__file__) + assert os.path.isfile(path) + return path def _handle_thread_exception(self, exc, thread_type): - e_msg = f"{exc.__class__.__name__} exception: {str(exc)}" + e_msg = f'{exc.__class__.__name__} exception: {str(exc)}' thread_info = getattr(self.cephadm_cache, thread_type) errors = thread_info.get('scrape_errors', []) errors.append(e_msg) @@ -6605,10 +6749,10 @@ class CephadmDaemon(): logger.exception(exc) self.cephadm_cache.update_task( thread_type, - { - "scrape_errors": errors, - "data": None, - } + { + 'scrape_errors': errors, + 'data': None, + } ) def _scrape_host_facts(self, refresh_interval=10): @@ -6622,7 +6766,7 @@ class CephadmDaemon(): if ctr >= refresh_interval: ctr = 0 - logger.debug("executing host-facts scrape") + logger.debug('executing host-facts scrape') errors = [] s_time = time.time() @@ -6636,28 +6780,28 @@ class CephadmDaemon(): try: data = json.loads(facts.dump()) except json.decoder.JSONDecodeError: - errors.append("host-facts provided invalid JSON") + errors.append('host-facts provided invalid JSON') logger.warning(errors[-1]) data = {} self.cephadm_cache.update_task( 'host', { - "scrape_timestamp": s_time, - "scrape_duration_secs": elapsed, - "scrape_errors": errors, - "data": data, + 'scrape_timestamp': s_time, + 'scrape_duration_secs': elapsed, + 'scrape_errors': errors, + 'data': data, } ) - logger.debug(f"completed host-facts scrape - {elapsed}s") + logger.debug(f'completed host-facts scrape - {elapsed}s') time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay - logger.info("host-facts thread stopped") + logger.info('host-facts thread stopped') def _scrape_ceph_volume(self, refresh_interval=15): # we're invoking the ceph_volume command, so we need to set the args that it # expects to use - self.ctx.command = "inventory --format=json".split() + self.ctx.command = 'inventory --format=json'.split() self.ctx.fsid = self.fsid 
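        # (command_ceph_volume reads its arguments from ctx, so the scrape
        # thread primes ctx here as if 'cephadm ceph-volume inventory
        # --format=json' had been invoked on the command line; its stdout is
        # captured via redirect_stdout below and parsed as JSON.)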
self.ctx.log_output = False @@ -6670,7 +6814,7 @@ class CephadmDaemon(): if ctr >= refresh_interval: ctr = 0 - logger.debug("executing ceph-volume scrape") + logger.debug('executing ceph-volume scrape') errors = [] s_time = time.time() stream = io.StringIO() @@ -6692,27 +6836,27 @@ class CephadmDaemon(): try: data = json.loads(stdout) except json.decoder.JSONDecodeError: - errors.append("ceph-volume thread provided bad json data") + errors.append('ceph-volume thread provided bad json data') logger.warning(errors[-1]) else: - errors.append("ceph-volume didn't return any data") + errors.append('ceph-volume did not return any data') logger.warning(errors[-1]) self.cephadm_cache.update_task( 'disks', { - "scrape_timestamp": s_time, - "scrape_duration_secs": elapsed, - "scrape_errors": errors, - "data": data, + 'scrape_timestamp': s_time, + 'scrape_duration_secs': elapsed, + 'scrape_errors': errors, + 'data': data, } ) - logger.debug(f"completed ceph-volume scrape - {elapsed}s") + logger.debug(f'completed ceph-volume scrape - {elapsed}s') time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay - logger.info("ceph-volume thread stopped") + logger.info('ceph-volume thread stopped') def _scrape_list_daemons(self, refresh_interval=20): ctr = 0 @@ -6723,7 +6867,7 @@ class CephadmDaemon(): if ctr >= refresh_interval: ctr = 0 - logger.debug("executing list-daemons scrape") + logger.debug('executing list-daemons scrape') errors = [] s_time = time.time() @@ -6735,24 +6879,24 @@ class CephadmDaemon(): exception_encountered = True else: if not isinstance(data, list): - errors.append("list-daemons didn't supply a list?") + errors.append('list-daemons did not supply a list?') logger.warning(errors[-1]) data = [] elapsed = time.time() - s_time self.cephadm_cache.update_task( 'daemons', { - "scrape_timestamp": s_time, - "scrape_duration_secs": elapsed, - "scrape_errors": errors, - "data": data, + 'scrape_timestamp': s_time, + 'scrape_duration_secs': elapsed, + 'scrape_errors': errors, + 'data': data, } ) - logger.debug(f"completed list-daemons scrape - {elapsed}s") + logger.debug(f'completed list-daemons scrape - {elapsed}s') time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay - logger.info("list-daemons thread stopped") + logger.info('list-daemons thread stopped') def _create_thread(self, target, name, refresh_interval=None): if refresh_interval: @@ -6761,14 +6905,14 @@ class CephadmDaemon(): t = Thread(target=target) t.daemon = True t.name = name - self.cephadm_cache.update_health(name, "active") + self.cephadm_cache.update_health(name, 'active') t.start() - start_msg = f"Started {name} thread" + start_msg = f'Started {name} thread' if refresh_interval: - logger.info(f"{start_msg}, with a refresh interval of {refresh_interval}s") + logger.info(f'{start_msg}, with a refresh interval of {refresh_interval}s') else: - logger.info(f"{start_msg}") + logger.info(f'{start_msg}') return t def reload(self, *args): @@ -6777,17 +6921,17 @@ class CephadmDaemon(): This is a placeholder function only, and serves to provide the hook that could be exploited later if the exporter evolves to incorporate a config file """ - logger.info("Reload request received - ignoring, no action needed") + logger.info('Reload request received - ignoring, no action needed') def shutdown(self, *args): - logger.info("Shutdown request received") + logger.info('Shutdown request received') self.stop = True self.http_server.shutdown() def run(self): logger.info(f"cephadm exporter starting for FSID '{self.fsid}'") if 
not self.can_run: - logger.error("Unable to start the exporter daemon") + logger.error('Unable to start the exporter daemon') for e in self.errors: logger.error(e) return @@ -6796,7 +6940,7 @@ class CephadmDaemon(): signal.signal(signal.SIGTERM, self.shutdown) signal.signal(signal.SIGINT, self.shutdown) signal.signal(signal.SIGHUP, self.reload) - logger.debug("Signal handlers attached") + logger.debug('Signal handlers attached') host_facts = self._create_thread(self._scrape_host_facts, 'host', 5) self.workers.append(host_facts) @@ -6816,7 +6960,7 @@ class CephadmDaemon(): self.http_server.cephadm_cache = self.cephadm_cache self.http_server.token = self.token server_thread = self._create_thread(self.http_server.serve_forever, 'http_server') - logger.info(f"https server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}") + logger.info(f'https server listening on {self.http_server.server_address[0]}:{self.http_server.server_port}') ctr = 0 while server_thread.is_alive(): @@ -6829,21 +6973,21 @@ class CephadmDaemon(): if self.cephadm_cache.tasks[worker.name] == 'inactive': continue if not worker.is_alive(): - logger.warning(f"{worker.name} thread not running") - stop_time = datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S") - self.cephadm_cache.update_health(worker.name, "inactive", f"{worker.name} stopped at {stop_time}") + logger.warning(f'{worker.name} thread not running') + stop_time = datetime.datetime.now().strftime('%Y/%m/%d %H:%M:%S') + self.cephadm_cache.update_health(worker.name, 'inactive', f'{worker.name} stopped at {stop_time}') time.sleep(CephadmDaemon.loop_delay) ctr += CephadmDaemon.loop_delay - logger.info("Main http server thread stopped") + logger.info('Main http server thread stopped') @property def unit_run(self): return """set -e {py3} {bin_path} exporter --fsid {fsid} --id {daemon_id} --port {port} &""".format( - py3 = shutil.which('python3'), + py3=shutil.which('python3'), bin_path=self.binary_path, fsid=self.fsid, daemon_id=self.daemon_id, @@ -6852,11 +6996,13 @@ class CephadmDaemon(): @property def unit_file(self): + docker = 'docker' in self.ctx.container_path return """#generated by cephadm [Unit] Description=cephadm exporter service for cluster {fsid} -After=network-online.target +After=network-online.target{docker_after} Wants=network-online.target +{docker_requires} PartOf=ceph-{fsid}.target Before=ceph-{fsid}.target @@ -6870,10 +7016,11 @@ RestartSec=10s [Install] WantedBy=ceph-{fsid}.target -""".format( - fsid=self.fsid, - daemon_path=self.daemon_path -) +""".format(fsid=self.fsid, + daemon_path=self.daemon_path, + # if docker, we depend on docker.service + docker_after=' docker.service' if docker else '', + docker_requires='Requires=docker.service\n' if docker else '') def deploy_daemon_unit(self, config=None): """deploy a specific unit file for cephadm @@ -6883,12 +7030,12 @@ WantedBy=ceph-{fsid}.target simple service definition and add it to the fsid's target """ if not config: - raise Error("Attempting to deploy cephadm daemon without a config") + raise Error('Attempting to deploy cephadm daemon without a config') assert isinstance(config, dict) # Create the required config files in the daemons dir, with restricted permissions for filename in config: - with open(os.open(os.path.join(self.daemon_path, filename), os.O_CREAT | os.O_WRONLY, mode=0o600), "w") as f: + with open(os.open(os.path.join(self.daemon_path, filename), os.O_CREAT | os.O_WRONLY, mode=0o600), 'w') as f: f.write(config[filename]) # When __file__ is we're 
being invoked over remoto via the orchestrator, so @@ -6898,36 +7045,37 @@ WantedBy=ceph-{fsid}.target shutil.copy(__file__, self.binary_path) - with open(os.path.join(self.daemon_path, 'unit.run'), "w") as f: + with open(os.path.join(self.daemon_path, 'unit.run'), 'w') as f: f.write(self.unit_run) - with open(os.path.join(self.ctx.unit_dir, - f"{self.unit_name}.new"), - "w" + with open( + os.path.join(self.ctx.unit_dir, + f'{self.unit_name}.new'), + 'w' ) as f: f.write(self.unit_file) os.rename( - os.path.join(self.ctx.unit_dir, f"{self.unit_name}.new"), + os.path.join(self.ctx.unit_dir, f'{self.unit_name}.new'), os.path.join(self.ctx.unit_dir, self.unit_name)) call_throws(self.ctx, ['systemctl', 'daemon-reload']) call(self.ctx, ['systemctl', 'stop', self.unit_name], - verbosity=CallVerbosity.DEBUG) + verbosity=CallVerbosity.DEBUG) call(self.ctx, ['systemctl', 'reset-failed', self.unit_name], - verbosity=CallVerbosity.DEBUG) + verbosity=CallVerbosity.DEBUG) call_throws(self.ctx, ['systemctl', 'enable', '--now', self.unit_name]) @classmethod def uninstall(cls, ctx: CephadmContext, fsid, daemon_type, daemon_id): unit_name = CephadmDaemon._unit_name(fsid, daemon_id) unit_path = os.path.join(ctx.unit_dir, unit_name) - unit_run = os.path.join(ctx.data_dir, fsid, f"{daemon_type}.{daemon_id}", "unit.run") + unit_run = os.path.join(ctx.data_dir, fsid, f'{daemon_type}.{daemon_id}', 'unit.run') port = None try: - with open(unit_run, "r") as u: - contents = u.read().strip(" &") + with open(unit_run, 'r') as u: + contents = u.read().strip(' &') except OSError: - logger.warning(f"Unable to access the unit.run file @ {unit_run}") + logger.warning(f'Unable to access the unit.run file @ {unit_run}') return port = None @@ -6936,8 +7084,8 @@ WantedBy=ceph-{fsid}.target try: port = int(line.split('--port ')[-1]) except ValueError: - logger.warning("Unexpected format in unit.run file: port is not numeric") - logger.warning("Unable to remove the systemd file and close the port") + logger.warning('Unexpected format in unit.run file: port is not numeric') + logger.warning('Unable to remove the systemd file and close the port') return break @@ -6946,14 +7094,14 @@ WantedBy=ceph-{fsid}.target try: fw.close_ports([port]) except RuntimeError: - logger.error(f"Unable to close port {port}") + logger.error(f'Unable to close port {port}') - stdout, stderr, rc = call(ctx, ["rm", "-f", unit_path]) + stdout, stderr, rc = call(ctx, ['rm', '-f', unit_path]) if rc: - logger.error(f"Unable to remove the systemd file @ {unit_path}") + logger.error(f'Unable to remove the systemd file @ {unit_path}') else: - logger.info(f"removed systemd unit file @ {unit_path}") - stdout, stderr, rc = call(ctx, ["systemctl", "daemon-reload"]) + logger.info(f'removed systemd unit file @ {unit_path}') + stdout, stderr, rc = call(ctx, ['systemctl', 'daemon-reload']) def command_exporter(ctx: CephadmContext): @@ -6964,15 +7112,15 @@ def command_exporter(ctx: CephadmContext): exporter.run() - ################################## + def systemd_target_state(target_name: str, subsystem: str = 'ceph') -> bool: # TODO: UNITTEST return os.path.exists( os.path.join( UNIT_DIR, - f"{subsystem}.target.wants", + f'{subsystem}.target.wants', target_name ) ) @@ -6983,59 +7131,55 @@ def command_maintenance(ctx: CephadmContext): if not ctx.fsid: raise Error('must pass --fsid to specify cluster') - target = f"ceph-{ctx.fsid}.target" + target = f'ceph-{ctx.fsid}.target' if ctx.maintenance_action.lower() == 'enter': - logger.info("Requested to place host into 
maintenance") + logger.info('Requested to place host into maintenance') if systemd_target_state(target): _out, _err, code = call(ctx, - ['systemctl', 'disable', target], - verbosity=CallVerbosity.DEBUG - ) + ['systemctl', 'disable', target], + verbosity=CallVerbosity.DEBUG) if code: - logger.error(f"Failed to disable the {target} target") - return "failed - to disable the target" + logger.error(f'Failed to disable the {target} target') + return 'failed - to disable the target' else: # stopping a target waits by default _out, _err, code = call(ctx, - ['systemctl', 'stop', target], - verbosity=CallVerbosity.DEBUG - ) + ['systemctl', 'stop', target], + verbosity=CallVerbosity.DEBUG) if code: - logger.error(f"Failed to stop the {target} target") - return "failed - to disable the target" + logger.error(f'Failed to stop the {target} target') + return 'failed - to disable the target' else: - return f"success - systemd target {target} disabled" + return f'success - systemd target {target} disabled' else: - return "skipped - target already disabled" + return 'skipped - target already disabled' else: - logger.info("Requested to exit maintenance state") + logger.info('Requested to exit maintenance state') # exit maintenance request if not systemd_target_state(target): _out, _err, code = call(ctx, - ['systemctl', 'enable', target], - verbosity=CallVerbosity.DEBUG - ) + ['systemctl', 'enable', target], + verbosity=CallVerbosity.DEBUG) if code: - logger.error(f"Failed to enable the {target} target") - return "failed - unable to enable the target" + logger.error(f'Failed to enable the {target} target') + return 'failed - unable to enable the target' else: # starting a target waits by default _out, _err, code = call(ctx, - ['systemctl', 'start', target], - verbosity=CallVerbosity.DEBUG - ) + ['systemctl', 'start', target], + verbosity=CallVerbosity.DEBUG) if code: - logger.error(f"Failed to start the {target} target") - return "failed - unable to start the target" + logger.error(f'Failed to start the {target} target') + return 'failed - unable to start the target' else: - return f"success - systemd target {target} enabled and started" - + return f'success - systemd target {target} enabled and started' ################################## + def _get_parser(): # type: () -> argparse.ArgumentParser parser = argparse.ArgumentParser( @@ -7084,6 +7228,11 @@ def _get_parser(): action='append', default=[], help='set environment variable') + parser.add_argument( + '--no-container-init', + action='store_true', + default=not CONTAINER_INIT, + help='Do not run podman/docker with `--init`') subparsers = parser.add_subparsers(help='sub-command') @@ -7148,11 +7297,12 @@ def _get_parser(): parser_adopt.add_argument( '--force-start', action='store_true', - help="start newly adoped daemon, even if it wasn't running previously") + help='start newly adoped daemon, even if it was not running previously') parser_adopt.add_argument( '--container-init', action='store_true', - help='Run podman/docker with `--init`') + default=CONTAINER_INIT, + help=argparse.SUPPRESS) parser_rm_daemon = subparsers.add_parser( 'rm-daemon', help='remove daemon instance') @@ -7216,11 +7366,11 @@ def _get_parser(): help='ceph.keyring to pass through to the container') parser_shell.add_argument( '--mount', '-m', - help=("mount a file or directory in the container. " - "Support multiple mounts. " - "ie: `--mount /foo /bar:/bar`. " - "When no destination is passed, default is /mnt"), - nargs='+') + help=('mount a file or directory in the container. 
+              'Support multiple mounts. '
+              'ie: `--mount /foo /bar:/bar`. '
+              'When no destination is passed, default is /mnt'),
+        nargs='+')
     parser_shell.add_argument(
         '--env', '-e',
         action='append',
@@ -7269,7 +7419,7 @@ def _get_parser():
         help='command')
 
     parser_unit = subparsers.add_parser(
-        'unit', help='operate on the daemon\'s systemd unit')
+        'unit', help="operate on the daemon's systemd unit")
     parser_unit.set_defaults(func=command_unit)
     parser_unit.add_argument(
         'command',
@@ -7331,7 +7481,7 @@ def _get_parser():
         help='location to write conf file to connect to new cluster')
     parser_bootstrap.add_argument(
         '--output-pub-ssh-key',
-        help='location to write the cluster\'s public SSH key')
+        help="location to write the cluster's public SSH key")
     parser_bootstrap.add_argument(
         '--skip-ssh',
         action='store_true',
@@ -7346,7 +7496,7 @@ def _get_parser():
     parser_bootstrap.add_argument(
         '--ssl-dashboard-port',
         type=int,
-        default = 8443,
+        default=8443,
         help='Port number used to connect with dashboard using SSL')
     parser_bootstrap.add_argument(
         '--dashboard-key',
@@ -7446,7 +7596,8 @@ def _get_parser():
     parser_bootstrap.add_argument(
         '--container-init',
         action='store_true',
-        help='Run podman/docker with `--init`')
+        default=CONTAINER_INIT,
+        help=argparse.SUPPRESS)
     parser_bootstrap.add_argument(
         '--with-exporter',
         action='store_true',
@@ -7504,7 +7655,20 @@ def _get_parser():
     parser_deploy.add_argument(
         '--container-init',
         action='store_true',
-        help='Run podman/docker with `--init`')
+        default=CONTAINER_INIT,
+        help=argparse.SUPPRESS)
+    parser_deploy.add_argument(
+        '--memory-request',
+        help='Container memory request/target'
+    )
+    parser_deploy.add_argument(
+        '--memory-limit',
+        help='Container memory hard limit'
+    )
+    parser_deploy.add_argument(
+        '--meta-json',
+        help='JSON dict of additional metadata'
+    )
 
     parser_check_host = subparsers.add_parser(
         'check-host', help='check host configuration')
@@ -7604,10 +7768,10 @@ def _get_parser():
         '--fsid',
         help='cluster FSID')
     parser_maintenance.add_argument(
-        "maintenance_action",
+        'maintenance_action',
         type=str,
         choices=['enter', 'exit'],
-        help="Maintenance action - enter maintenance, or exit maintenance")
+        help='Maintenance action - enter maintenance, or exit maintenance')
     parser_maintenance.set_defaults(func=command_maintenance)
 
     parser_verify_prereqs = subparsers.add_parser(
@@ -7624,9 +7788,22 @@ def _get_parser():
 
 def _parse_args(av):
     parser = _get_parser()
+
     args = parser.parse_args(av)
-    if 'command' in args and args.command and args.command[0] == "--":
+    if 'command' in args and args.command and args.command[0] == '--':
         args.command.pop(0)
+
+    # workaround argparse to deprecate the subparser `--container-init` flag
+    # container_init and no_container_init must always be mutually exclusive
+    container_init_args = ('--container-init', '--no-container-init')
+    if set(container_init_args).issubset(av):
+        parser.error('argument %s: not allowed with argument %s' % (container_init_args))
+    elif '--container-init' in av:
+        args.no_container_init = not args.container_init
+    else:
+        args.container_init = not args.no_container_init
+    assert args.container_init is not args.no_container_init
+
     return args
 
@@ -7651,11 +7828,11 @@ def cephadm_init(args: List[str]) -> Optional[CephadmContext]:
 
     if ctx.verbose:
         for handler in logger.handlers:
-            if handler.name == "console":
+            if handler.name == 'console':
                 handler.setLevel(logging.DEBUG)
 
     if not ctx.has_function():
-        sys.stderr.write("No command specified; pass -h or --help for usage\n")
+        sys.stderr.write('No command specified; pass -h or --help for usage\n')
         return None
 
     return ctx
 
@@ -7669,13 +7846,10 @@ def main():
         sys.exit(1)
 
     av: List[str] = []
-    try:
-        av = injected_argv  # type: ignore
-    except NameError:
-        av = sys.argv[1:]
+    av = sys.argv[1:]
 
     ctx = cephadm_init(av)
-    if not ctx: # error, exit
+    if not ctx:  # error, exit
         sys.exit(1)
 
     try:
@@ -7695,5 +7869,6 @@ def main():
         r = 0
     sys.exit(r)
 
-if __name__ == "__main__":
+
+if __name__ == '__main__':
     main()
diff --git a/SPECS/cephadm.spec.in b/SPECS/cephadm.spec.in
index 29b92a5..ade3989 100644
--- a/SPECS/cephadm.spec.in
+++ b/SPECS/cephadm.spec.in
@@ -1,14 +1,14 @@
 # Upstream ceph commit upon which this package is based:
-# patches_base=ca0faa0a628c041f5b5313060344006830812e25
+# patches_base=3eb70cf622aace689e45749e8a92fce033d3d55c
 
 Name:           cephadm
 Epoch:          2
 Version:        16.1.0
-Release:        568%{?dist}
+Release:        3%{?dist}
 Summary:        Utility to bootstrap Ceph clusters
 License:        LGPL-2.1
 URL:            https://ceph.io
-Source0:        https://github.com/ceph/ceph/raw/ca0faa0a628c041f5b5313060344006830812e25/src/cephadm/cephadm
+Source0:        https://github.com/ceph/ceph/raw/3eb70cf622aace689e45749e8a92fce033d3d55c/src/cephadm/cephadm
 Source1:        COPYING-LGPL2.1
 
 BuildArch:      noarch
@@ -53,3 +53,6 @@ exit 0
 %attr(0600,cephadm,cephadm) %{_sharedstatedir}/cephadm/.ssh/authorized_keys
 
 %changelog
+
+* Thu Mar 25 2021 Francesco Pantano <fpantano@redhat.com> - 2:16.1.0-2
+- 16.1.0-2 GA
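The `_parse_args` hunk above deserves a closer look, since argparse cannot natively express "a deprecated, hidden flag plus its documented negation, ending up with opposite values". What follows is a reviewer's sketch, not part of the patch: a standalone `parse` helper (hypothetical name) that exercises the same reconciliation logic, with `CONTAINER_INIT = True` assumed to match the patched script's module-level default.

import argparse

CONTAINER_INIT = True  # assumed: mirrors the new module-level default


def parse(av):
    p = argparse.ArgumentParser()
    # deprecated, hidden flag kept for backward compatibility
    p.add_argument('--container-init', action='store_true',
                   default=CONTAINER_INIT, help=argparse.SUPPRESS)
    # the new, documented way to disable `--init`
    p.add_argument('--no-container-init', action='store_true',
                   default=not CONTAINER_INIT,
                   help='Do not run podman/docker with `--init`')
    args = p.parse_args(av)
    # same reconciliation as the patched _parse_args(): passing both flags
    # is an error; otherwise the two attributes end up with opposite values
    flags = ('--container-init', '--no-container-init')
    if set(flags).issubset(av):
        p.error('argument %s: not allowed with argument %s' % flags)
    elif '--container-init' in av:
        args.no_container_init = not args.container_init
    else:
        args.container_init = not args.no_container_init
    assert args.container_init is not args.no_container_init
    return args


print(parse([]).container_init)                       # True: init by default
print(parse(['--no-container-init']).container_init)  # False
print(parse(['--container-init']).container_init)     # True (deprecated spelling)

Passing both flags on one command line hits `p.error(...)`, which prints the same "not allowed with" message the patched script emits and exits with status 2.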