# -*- coding: utf-8 -*-
# utils.py - a module with support methods for centpkg
#
# Copyright (C) 2021 Red Hat Inc.
# Author(s): Ondrej Nosek
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
# the full text of the license.

import git
import json
import logging
import os
import re
import requests
import sys
from collections import namedtuple
from datetime import date, datetime
from http import HTTPStatus
from pyrpkg import rpkgError
from requests.exceptions import ConnectionError, HTTPError
from configparser import NoOptionError, NoSectionError
from urllib.parse import quote_plus, urlparse

import yaml

import git as gitpython

dist_git_config = None

# RHEL Product Pages Phase Identifiers
pp_phase_name_lookup = dict()
# Phase 230 is "Planning / Development / Testing" (AKA DevTestDoc)
pp_phase_devtestdoc = 230
pp_phase_name_lookup[pp_phase_devtestdoc] = "DevTestDoc"

# Phase 450 is "Stabilization" (AKA Exception Phase)
pp_phase_stabilization = 450
pp_phase_name_lookup[pp_phase_stabilization] = "Stabilization"

# Phase 500 is "Launch"
pp_phase_launch = 500
pp_phase_name_lookup[pp_phase_launch] = "Launch"

# Phase 600 is "Maintenance" (AKA Z-stream Phase)
pp_phase_maintenance = 600
pp_phase_name_lookup[pp_phase_maintenance] = "Maintenance"

# Phase 1000 is "Unsupported" (AKA, end-of-life)
pp_phase_unsupported = 1000
pp_phase_name_lookup[pp_phase_unsupported] = "Unsupported"

# Default lookup location for unsynced packages
default_distrobaker_config = "https://gitlab.cee.redhat.com/osci/distrobaker_config/-/raw/rhel9/distrobaker.yaml?ref_type=heads"

rhel_state_nt = namedtuple(
    "RHELState",
    [
        "latest_version",
        "target_version",
        "rule_branch",
        "phase",
        "rhel_target_default",
        "enforcing",
        "synced",
    ],
)


# Super-class for errors related to internal RHEL infrastructure
class RHELError(Exception):
    pass


logger = logging.getLogger(__name__)


def do_fork(logger, base_url, token, repo_name, namespace, cli_name):
    """
    Creates a fork of the project.

    After a successful fork request, the "c9s" branch of the fork is
    unprotected and then protected again so that only access level 40 may
    push or merge.

    :param logger: A logger object
    :param base_url: a string of the URL repository
    :param token: a string of the API token that has rights to make a fork
    :param repo_name: a string of the repository name
    :param namespace: a string determines a type of the repository
    :param cli_name: string of the CLI's name (e.g. centpkg)
    :return: a tuple consisting of whether the fork needed to be created (bool)
        and the fork path (string)
    :raises rpkgError: when the API cannot be reached or the last API
        response reports a failure other than "already exists"
    """
    api_url = "{0}/api/v4".format(base_url.rstrip("/"))
    project_id = quote_plus("redhat/centos-stream/{0}/{1}".format(namespace, repo_name))
    fork_url = "{0}/projects/{1}/fork".format(api_url, project_id)

    headers = {
        "PRIVATE-TOKEN": token,
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    # define a new repository name/path to avoid collision with other projects
    safe_name = "centos_{0}_{1}".format(namespace, repo_name)
    payload = json.dumps(
        {
            "name": safe_name,  # name of the project after forking
            "path": safe_name,
        }
    )
    try:
        rv = requests.post(fork_url, headers=headers, data=payload, timeout=60)
    except ConnectionError as error:
        error_msg = (
            "The connection to API failed while trying to "
            "create a new fork. The error was: {0}".format(str(error))
        )
        raise rpkgError(error_msg) from error

    try:
        # Extract response json for debugging
        rv_json = rv.json()
        logger.debug("GitLab API response: '{0}'".format(rv_json))
    except Exception:
        # Deliberately best-effort: a body that cannot be decoded must not
        # hide the status handling below.
        pass

    if rv.ok:
        fork_id = rv.json()["id"]
        try:
            # Unprotect c9s in fork
            rv = requests.delete(
                "{0}/projects/{1}/protected_branches/{2}".format(
                    api_url, fork_id, "c9s"
                ),
                headers=headers,
                timeout=60,
            )
        except ConnectionError as error:
            error_msg = (
                "The connection to API failed while trying to unprotect c9s branch "
                "in the fork. The error was: {0}".format(str(error))
            )
            raise rpkgError(error_msg) from error

        try:
            # Reprotect c9s to disable pushes
            # Only maintainers in gitlab are allowed to push with the following config
            # In CS, every pkg maintainer is considered as a developer in gitlab
            data = {
                "id": fork_id,
                "name": "c9s",
                "allowed_to_push": [{"access_level": 40}],
                "allowed_to_merge": [{"access_level": 40}],
            }
            rv = requests.post(
                "{0}/projects/{1}/protected_branches".format(api_url, fork_id),
                json=data,
                headers=headers,
                timeout=60,
            )
        except ConnectionError as error:
            error_msg = (
                "The connection to API failed while trying to reprotect c9s branch "
                "in the fork. The error was: {0}".format(str(error))
            )
            raise rpkgError(error_msg) from error

    # NOTE: `rv` is reused above, so from here on it holds the response of the
    # last request that was made (the fork request if it failed, otherwise
    # the re-protect request).
    base_error_msg = "The following error occurred while creating a new fork: {0}"
    if not rv.ok:
        # fork was already created
        if rv.status_code == 409 or rv.reason == "Conflict":
            # When the repo already exists, the return doesn't contain the repo
            # path or username.  Make one more API call to get the username of
            # the token to construct the repo path.
            rv = requests.get(
                "{0}/user".format(api_url), headers=headers, timeout=60
            )
            username = rv.json()["username"]
            return False, "{0}/{1}".format(username, safe_name)
        # show hint for invalid, expired or revoked token
        elif rv.status_code == 401 or rv.reason == "Unauthorized":
            base_error_msg += (
                "\nFor invalid or expired token refer to "
                '"{0} fork -h" to set a token in your user '
                "configuration.".format(cli_name)
            )
        raise rpkgError(base_error_msg.format(rv.text))

    return True, rv_json["path_with_namespace"]


def do_add_remote(base_url, remote_base_url, repo, repo_path, remote_name):
    """
    Adds remote tracked repository

    :param base_url: a string of the URL repository
    :param remote_base_url: a string of the remote tracked repository
    :param repo: object, current project git repository
    :param repo_path: a string of the repository path
    :param remote_name: a string of the remote name
    :return: a bool; True if remote was created, False when already exists
    :raises rpkgError: when git fails to create the remote
    """
    parsed_url = urlparse(remote_base_url)
    remote_url = "{0}://{1}/{2}.git".format(
        parsed_url.scheme,
        parsed_url.netloc,
        repo_path,
    )

    # check already existing remote
    if any(remote.name == remote_name for remote in repo.remotes):
        return False

    try:
        repo.create_remote(remote_name, url=remote_url)
    except git.exc.GitCommandError as e:
        error_msg = "During create remote:\n  {0}\n  {1}".format(
            " ".join(e.command), e.stderr
        )
        raise rpkgError(error_msg) from e
    return True


def config_get_safely(config, section, option):
    """
    Returns option from the user's configuration file. In case of missing
    section or option method throws an exception with a human-readable
    warning and a possible hint.
    The method should be used especially in situations when there are newly
    added sections/options into the config. In this case, there is a risk that
    the user's config wasn't properly upgraded.

    :param config: ConfigParser object
    :param section: section name in the config
    :param option: name of the option
    :return: option value from the right section
    :rtype: str
    :raises rpkgError: when the section or the option is missing
    """

    hint = (
        "First (if possible), refer to the help of the current command "
        "(-h/--help).\n"
        "There also might be a new version of the config after upgrade.\n"
        "Hint: you can check if you have 'centpkg.conf.rpmnew' or "
        "'centpkg.conf.rpmsave' in the config directory. If yes, try to merge "
        "your changes to the config with the maintainer provided version "
        "(or replace centpkg.conf file with 'centpkg.conf.rpmnew')."
    )

    # Any other exception simply propagates to the caller unchanged.
    try:
        return config.get(section, option)
    except NoSectionError:
        msg = "Missing section '{0}' in the config file.".format(section)
        raise rpkgError("{0}\n{1}".format(msg, hint))
    except NoOptionError:
        msg = "Missing option '{0}' in the section '{1}' of the config file.".format(
            option, section
        )
        raise rpkgError("{0}\n{1}".format(msg, hint))


def get_canonical_repo_name(config, repo_url):
    """
    Check whether the current repo is a fork and if so, retrieve
    the parent fork to get the proper name.

    :param config: not read by this function; the module-level
        `dist_git_config` is used for all lookups
    :param repo_url: URL of the repository, either with a scheme or in the
        scheme-less `git@host:path` form
    :return: the `name` of the project this repository was forked from, with
        a trailing ".git" removed
    :raises rpkgError: on an HTTP 403 response or when the response carries
        no `forked_from_project` information
    """

    # Look up the repo and query for forked_from_project
    cli_name = config_get_safely(dist_git_config, "__default", "cli_name")
    distgit_section = "{0}.distgit".format(cli_name)
    distgit_api_base_url = config_get_safely(
        dist_git_config, distgit_section, "apibaseurl"
    )

    parsed_repo_url = urlparse(repo_url)
    if not parsed_repo_url.scheme and repo_url.startswith("git@"):
        # Some git checkouts are in the form of git@gitlab.com/...
        # If it's missing the scheme, it will treat the entire URL as the path
        # so we'll fake up the scheme for this situation
        # https://www.git-scm.com/book/en/v2/Git-Basics-Getting-a-Git-Repository
        # implies that no scheme is equivalent to git+ssh://
        # When making that conversion, we also have to replace the leading ':'
        # with a slash.
        faked_url = "git+ssh://{0}".format(repo_url.replace(":", "/", 1))
        parsed_repo_url = urlparse(faked_url)

    try:
        distgit_token = config_get_safely(dist_git_config, distgit_section, "token")

        api_url = "{0}/api/v4".format(distgit_api_base_url.rstrip("/"))
        project_url = "{0}/projects/{1}".format(
            api_url, quote_plus(parsed_repo_url.path.lstrip("/"))
        )

        headers = {
            "PRIVATE-TOKEN": distgit_token,
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

        rv = requests.get(project_url, headers=headers, timeout=60)
        rv.raise_for_status()

        # Extract response json for debugging
        rv_json = rv.json()

        canonical_repo_name = rv_json["forked_from_project"]["name"]

    except HTTPError as e:
        # We got a 4xx or 5xx error code from the URL lookup
        if e.response.status_code == HTTPStatus.FORBIDDEN:
            raise rpkgError("Insufficient Gitlab API permissions. Missing token?")

        # Other errors are unexpected, so re-raise them
        raise

    except KeyError:
        # There was no 'forked_from_project' key, likely meaning the
        # user lacked permissions to read the API. Usually this means
        # they haven't supplied a token or it is expired.
        raise rpkgError("Insufficient Gitlab API permissions. Missing token?")

    # Chop off a trailing .git if any.  Only a real suffix is removed, so a
    # name that merely contains ".git" (e.g. "foo.gitlab") is left intact.
    if canonical_repo_name.endswith(".git"):
        canonical_repo_name = canonical_repo_name[: -len(".git")]
    return canonical_repo_name


def get_repo_name(name, org="rpms"):
    """
    Return the repository name prefixed with its org.

    Parameters
    ----------
    name: str
        The repository name, including the org name, or the URL of a
        (possibly renamed) fork.

    org: str
        The org to prepend when the name has to be looked up.

    Returns
    -------
    str
        `name` unchanged when it already starts with `org`; otherwise
        `$ORG/$REPO`, where `$REPO` is the name of the fork's parent
        as reported by `get_canonical_repo_name`.
    """
    if name.startswith(org):
        return name

    # This is probably a renamed fork, so try to find the fork's parent
    repo_name = get_canonical_repo_name(dist_git_config, name)

    return "%s/%s" % (org, repo_name)


class StreamMappingError(RHELError):
    pass


def stream_mapping(csname):
    """
    Given a CentOS Stream name, map it to the corresponding RHEL name.

    Parameters
    ----------
    csname: str
        The CentOS Stream name.

    Returns
    -------
    tuple
        The RHEL major version (int) and the corresponding RHEL name (str),
        e.g. `(9, "rhel-9")`.

    Raises
    ------
    StreamMappingError
        If `csname` is not one of the known Stream branch names.
    """
    if csname in ("c8s", "cs8"):
        return 8, "rhel-8"
    if csname in ("c9s", "cs9"):
        return 9, "rhel-9"
    if csname in ("c10s", "cs10"):
        return 10, "rhel-10"
    if csname in ("c11s", "cs11"):
        return 11, "rhel-11"
    raise StreamMappingError(f"{csname} is not a Stream branch")


def does_branch_exist(rhel_dist_git, namespace, repo_name, branch):
    """
    Determine if the given branch (typically the Y-1 branch) exists for this
    repo by running `git ls-remote --exit-code` against the remote.

    :return: True when the ref is found, False when git reports exit status 2
    :raises gitpython.GitCommandError: for any other git failure
    """
    g = gitpython.cmd.Git()
    try:
        g.ls_remote(
            "--exit-code",
            os.path.join(rhel_dist_git, namespace, repo_name),
            branch,
        )
    except gitpython.GitCommandError as e:
        # `git ls-remote --exit-code` returns "2" if it cannot find the ref
        if e.status == 2:
            return False
        raise
    return True


def _datesplit(isodate):
    """Split a dash-separated date string into a list of ints."""
    date_string_tuple = isodate.split("-")
    return [int(x) for x in date_string_tuple]


# Certain packages are not synced to RHEL, and will always use the 'cXs' branch
# rules. This list is maintained in the distrobaker configuration:
def get_unsynced_projects(distrobaker_config, namespace):
    """
    Download the YAML document at `distrobaker_config` and return its
    `configuration.control.exclude` entry for `namespace`.
    """
    res = requests.get(distrobaker_config, timeout=60)
    res.raise_for_status()
    payload = yaml.safe_load(res.content.decode("utf-8"))
    return payload["configuration"]["control"]["exclude"][namespace]


def parse_rhel_shortname(shortname):
    """
    Parse a release shortname into (major, minor, extra).

    `extra` is the text following a third "." (e.g. "beta") or None.

    :raises RuntimeError: when the shortname does not match
    """
    # The shortname is in the form rhel-9-1.0 or rhel-10.0[.beta]
    m = re.match(
        r"rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[.](?P<extra>.*))?",
        shortname,
    )
    if not m:
        raise RuntimeError("Could not parse version from {}".format(shortname))

    major_version = int(m.group("major"))
    minor_version = int(m.group("minor"))
    extra_version = m.group("extra") or None

    return major_version, minor_version, extra_version


def parse_rhel_branchname(shortname):
    """
    Parse a branch name into (major, minor, extra).

    `extra` is the text following a "-" after the minor version
    (e.g. "beta") or None.

    :raises RuntimeError: when the branch name does not match
    """
    # The branchname is in the form rhel-9-1.0 or rhel-10.0[-beta]
    m = re.match(
        r"rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[-](?P<extra>.*))?",
        shortname,
    )
    if not m:
        raise RuntimeError("Could not parse version from {}".format(shortname))

    major_version = int(m.group("major"))
    minor_version = int(m.group("minor"))
    extra_version = m.group("extra") or None

    return major_version, minor_version, extra_version


def query_package_pages(api_url, request_params):
    """
    api_url: A URL to the API endpoint of the Product Pages (e.g.
             "https://example.com/pp/api/")
    request_params: A set of python-requests-compatible URL parameters to focus
                    the query.

    Returns the decoded JSON payload of the "latest/releases" endpoint.
    """
    res = requests.get(
        os.path.join(api_url, "latest", "releases"),
        params=request_params,
        timeout=60,
    )
    res.raise_for_status()
    payload = json.loads(res.text)
    logger.debug("Response from PP API: {}".format(json.dumps(payload, indent=2)))
    return payload


def format_branch(x_version, y_version, is_beta):
    """Build the RHEL dist-git branch name for release X.Y (optionally Beta)."""
    if x_version <= 9:
        # 9.x and older releases include an excess .0 in the branch name
        if is_beta:
            branch = "rhel-{}.{}.0-beta".format(x_version, y_version)
        else:
            branch = "rhel-{}.{}.0".format(x_version, y_version)
    else:
        # Starting with RHEL 10, the branch names have dropped the extra .0
        if is_beta:
            branch = "rhel-{}.{}-beta".format(x_version, y_version)
        else:
            branch = "rhel-{}.{}".format(x_version, y_version)
    return branch


def determine_rhel_state(
    rhel_dist_git, namespace, repo_name, cs_branch, pp_api_url, distrobaker_config
):
    """
    Arguments:
    * rhel_dist_git: an https URL to the RHEL dist-git. Used for determining
      the presence of the prior release's Z-stream branch.
    * namespace: The dist-git namespace (rpms, containers, modules, etc.).
      Used for determining the presence of the prior release's Z-stream
      branch.
    * repo_name: The name of the repo in the namespace from which we will
      determine status. Used for determining the presence of the prior
      release's Z-stream branch.
    * cs_branch: The CentOS Stream branch for this repo. Used to determine the
      RHEL major release.
    * pp_api_url: The URL to the RHEL Product Pages API. Used for determining
      the current development phase.
    * distrobaker_config: The URL to the DistroBaker configuration. Used for
      identifying packages that are not synced to RHEL.

    Returns: a namedtuple containing key information about the RHEL release
    associated with this CentOS Stream branch. It has the following members:

    * latest_version: The most recent major and minor release of RHEL. This
      is a presentation string and its format is not guaranteed.
    * target_version: The major and minor release of RHEL that is currently
      targeted by this CentOS Stream branch. This is a presentation string
      and its format is not guaranteed.
    * rule_branch: The branch to be used for check-tickets rules (str)
    * phase: The Product Pages phase identifier (int) of the targeted release
    * rhel_target_default: The default `--rhel-target` (str) or
      None (NoneType). The possible values if not None are "latest",
      "zstream" or "none" (distinctive from NoneType)
    * enforcing: Whether ticket approvals should be enforced. (bool)
    * synced: Whether this package is synced to RHEL. False means it is a
      CentOS Stream-only package. (bool)

    Raises RHELError when the DistroBaker configuration, the Product Pages or
    the RHEL dist-git cannot be reached.
    """

    # First, check if this package has a RHEL counterpart or is CentOS Stream
    # only.
    try:
        if repo_name in get_unsynced_projects(distrobaker_config, namespace):
            # We don't need to do any looking up, because it will always use the
            # stream version and never enforce tickets. It will return
            # rhel_target_default="none" to instruct distrobaker not to attempt
            # to build on RHEL.
            return rhel_state_nt(
                latest_version=cs_branch,
                target_version=cs_branch,
                rule_branch=cs_branch,
                phase=pp_phase_devtestdoc,
                rhel_target_default="none",
                enforcing=False,
                synced=False,
            )
    except (ConnectionError, HTTPError) as e:
        raise RHELError(
            "Could not retrieve distrobaker config. Are you on the VPN?"
        ) from e

    try:
        x_version, rhel_version = stream_mapping(cs_branch)
    except StreamMappingError:
        # This is not a standard branch name, so it's probably either a custom
        # branch or a module stream branch. Either way, we'll return it as-is
        # and always treat it as if it is in enforcing mode.
        return rhel_state_nt(
            latest_version=cs_branch,
            target_version=cs_branch,
            rule_branch=cs_branch,
            phase=pp_phase_maintenance,
            rhel_target_default="none",
            enforcing=True,
            synced=False,
        )

    # Query the "package pages" API for the current active Y-stream release
    request_params = {
        "phase__in": f"{pp_phase_devtestdoc},{pp_phase_stabilization},{pp_phase_launch},{pp_phase_maintenance},{pp_phase_unsupported}",
        "product__shortname": "rhel",
        "relgroup__shortname": rhel_version,
        "format": "json",
    }

    try:
        pp_response = query_package_pages(
            api_url=pp_api_url, request_params=request_params
        )
    except (ConnectionError, HTTPError) as e:
        raise RHELError("Could not contact Product Pages. Are you on the VPN?") from e

    if len(pp_response) < 1:
        # Received zero potential release matches
        logger.warning("Didn't match any active releases. Assuming pre-Beta.")

        # Fake up a Beta payload.  The shortname must use the ".beta" form
        # that parse_rhel_shortname() recognises; with any other separator the
        # entry would not be treated as a Beta release.
        pp_response = [
            {
                "shortname": "{}.0.beta".format(rhel_version),
                "phase": pp_phase_devtestdoc,
            }
        ]

    active_y_version = -1
    beta = False
    phase_lookup = dict()
    for entry in pp_response:
        shortname = entry["shortname"]

        # The shortname is in the form rhel-9-1.0 or rhel-10.0[.beta]
        # Extract the active Y-stream version
        x_version, y_version, extra_version = parse_rhel_shortname(shortname)
        entry_is_beta = bool(extra_version and "beta" in extra_version)

        # Enable looking up the phase later
        branch_name = format_branch(x_version, y_version, entry_is_beta)
        phase_lookup[branch_name] = entry["phase"]

        if y_version > active_y_version or (
            y_version == active_y_version and beta and not entry_is_beta
        ):
            # Replace the saved values with a higher Y version if we
            # see one. Also check whether we have the same Y version
            # but without the Beta indicator
            active_y_version = y_version
            beta = entry_is_beta

    if beta:
        latest_version = "{}.{} Beta".format(x_version, active_y_version)
    else:
        latest_version = "{}.{}".format(x_version, active_y_version)

    logger.debug("Latest version: {}".format(latest_version))

    # Next we need to find out if we're actually USING the latest version or
    # the previous one, by asking RHEL dist-git if the rhel-X.(Y-1).0 branch
    # exists. (Or rhel-X.Y.0-beta in the special case of Y=0)

    # If the latest release is the Beta, we can skip checking for a prior
    # release branch, since none can exist and we know it cannot be in
    # the Stabilization Phase yet. Thus, we return the CS branch and
    # --rhel-target=latest
    if beta:
        return rhel_state_nt(
            latest_version=latest_version,
            target_version=latest_version,
            rule_branch=cs_branch,
            phase=pp_phase_devtestdoc,
            rhel_target_default="latest",
            enforcing=False,
            synced=True,
        )

    # First, check if this is the special case of Y=0
    # Note: since this is being written during the 10.0 Beta timeframe, there
    # is no need to special case older formats like 9.0.0-beta. We can just
    # use rhel-X.0-beta instead.
    if active_y_version == 0:
        prior_release_branch = format_branch(x_version, active_y_version, is_beta=True)
    else:
        prior_release_branch = format_branch(
            x_version, active_y_version - 1, is_beta=False
        )

    current_release_branch = format_branch(x_version, active_y_version, is_beta=False)

    logger.debug("Prior release branch: {}".format(prior_release_branch))

    # Determine which phase the prior release is in:
    prior_release_phase = phase_lookup[prior_release_branch]

    # If the prior release is in the Unsupported Phase, it probably means
    # that we're dealing with an EOL CentOS Stream (like 8.10). We need
    # to use the stream rules in this case.
    prior_is_eol = prior_release_phase == pp_phase_unsupported

    # Only ask dist-git when the answer matters.
    branch_exists = False
    if not prior_is_eol:
        try:
            branch_exists = does_branch_exist(
                rhel_dist_git, namespace, repo_name, prior_release_branch
            )
        except gitpython.GitCommandError as e:
            raise RHELError(
                "Could not read from RHEL dist-git. Are you on the VPN?"
            ) from e

    if prior_is_eol or branch_exists:
        # The branch is there or the previous branch is EOL, so work on the
        # active Y-stream, which is always in either DevTestDoc Phase or
        # Maintenance Phase (in the case of an end-of-life CentOS Stream)
        # We'll catch the unexpected case of Unsupported Phase as well, just
        # to be safe.
        phase = phase_lookup[current_release_branch]
        check_tickets_branch = cs_branch
        rhel_target_default = "latest"
        target_version = latest_version
        enforcing = phase >= pp_phase_maintenance
    else:
        # The branch is not present, so we'll work on the prior Y-stream
        check_tickets_branch = prior_release_branch
        phase = prior_release_phase

        target_x, target_y, target_extra = parse_rhel_branchname(prior_release_branch)
        target_version = "{}.{}{}".format(
            target_x,
            target_y,
            " Beta" if target_extra and "beta" in target_extra else "",
        )

        # The prior Y-stream is always in either Stabilization, Launch or
        # Maintenance phase, so it always enforces.
        enforcing = True

        if phase == pp_phase_stabilization:
            # We're in the Stabilization phase, so we can't automatically determine
            # between the "zstream" and "exception" targets.
            rhel_target_default = None
        else:
            # We're in Launch or Maintenance phase, so the "exception" target
            # is no longer permitted.
            rhel_target_default = "zstream"

    return rhel_state_nt(
        latest_version=latest_version,
        target_version=target_version,
        rule_branch=check_tickets_branch,
        phase=phase,
        rhel_target_default=rhel_target_default,
        enforcing=enforcing,
        synced=True,
    )


def format_current_state_message(rhel_state):
    """
    Returns a human-readable string providing actionable information about the
    current state of this repository. Useful for `centpkg current-state` and
    the check-tickets function in merge requests

    :param rhel_state: a `rhel_state_nt` as returned by `determine_rhel_state`
    :return: the message (str)
    """
    if not rhel_state.synced:
        return "This component is not synced to RHEL"

    parts = [
        "Current RHEL status:\n",
        f"\tThe latest active Y-stream release is RHEL {rhel_state.latest_version}\n",
        f"\tThis project is targeting RHEL {rhel_state.target_version}\n",
    ]

    if rhel_state.latest_version != rhel_state.target_version:
        parts.extend(
            (
                "\t\tThe latest and targeted versions differ.\n",
                "\t\tIf this is not intentional, please see\n",
                "\t\thttps://one.redhat.com/rhel-development-guide/#proc_centos-stream-first_assembly_rhel-9-development\n",
                f"\t\tfor details on how to unlock Y-stream development by creating the {rhel_state.rule_branch} branch.\n",
            )
        )

    target_phase = pp_phase_name_lookup[rhel_state.phase]
    parts.append(
        f"\tThe {rhel_state.target_version} release is currently in {target_phase} phase\n"
    )

    if rhel_state.phase == pp_phase_stabilization:
        parts.append("\t\tThe --rhel-target argument must be used when building.\n")

    parts.append(
        f"\tTicket approvals are {'' if rhel_state.enforcing else 'not '}currently required for merge request approval."
    )

    return "".join(parts)