| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| import git |
| import json |
| import logging |
| import os |
| import re |
| import requests |
| import sys |
| from collections import namedtuple |
| from datetime import date, datetime |
| from http import HTTPStatus |
| from pyrpkg import rpkgError |
| from requests.exceptions import ConnectionError, HTTPError |
| from configparser import NoOptionError, NoSectionError |
| from urllib.parse import quote_plus, urlparse |
| import yaml |
| |
| import git as gitpython |
| |
| dist_git_config = None |
| |
| |
| pp_phase_name_lookup = dict() |
| |
| pp_phase_devtestdoc = 230 |
| pp_phase_name_lookup[pp_phase_devtestdoc] = "DevTestDoc" |
| |
| |
| pp_phase_stabilization = 450 |
| pp_phase_name_lookup[pp_phase_stabilization] = "Stabilization" |
| |
| |
| pp_phase_launch = 500 |
| pp_phase_name_lookup[pp_phase_launch] = "Launch" |
| |
| |
| pp_phase_maintenance = 600 |
| pp_phase_name_lookup[pp_phase_maintenance] = "Maintenance" |
| |
| |
| pp_phase_unsupported = 1000 |
| pp_phase_name_lookup[pp_phase_unsupported] = "Unsupported" |
| |
| |
| default_distrobaker_config = "https://gitlab.cee.redhat.com/osci/distrobaker_config/-/raw/rhel9/distrobaker.yaml?ref_type=heads" |
| |
| rhel_state_nt = namedtuple( |
| "RHELState", |
| [ |
| "latest_version", |
| "target_version", |
| "rule_branch", |
| "phase", |
| "rhel_target_default", |
| "enforcing", |
| "synced", |
| ], |
| ) |
| |
| |
| |
| class RHELError(Exception): |
| pass |
| |
| |
| logger = logging.getLogger(__name__) |
| |
| |
| def do_fork(logger, base_url, token, repo_name, namespace, cli_name): |
| """ |
| Creates a fork of the project. |
| :param logger: A logger object |
| :param base_url: a string of the URL repository |
| :param token: a string of the API token that has rights to make a fork |
| :param repo_name: a string of the repository name |
| :param namespace: a string determines a type of the repository |
| :param cli_name: string of the CLI's name (e.g. centpkg) |
| :return: a tuple consisting of whether the fork needed to be created (bool) |
| and the fork path (string) |
| """ |
| api_url = "{0}/api/v4".format(base_url.rstrip("/")) |
| project_id = quote_plus("redhat/centos-stream/{0}/{1}".format(namespace, repo_name)) |
| fork_url = "{0}/projects/{1}/fork".format(api_url, project_id) |
| |
| headers = { |
| "PRIVATE-TOKEN": token, |
| "Accept": "application/json", |
| "Content-Type": "application/json", |
| } |
| |
| safe_name = "centos_{0}_{1}".format(namespace, repo_name) |
| payload = json.dumps( |
| { |
| "name": safe_name, |
| "path": safe_name, |
| } |
| ) |
| try: |
| rv = requests.post(fork_url, headers=headers, data=payload, timeout=60) |
| except ConnectionError as error: |
| error_msg = ( |
| "The connection to API failed while trying to " |
| "create a new fork. The error was: {0}".format(str(error)) |
| ) |
| raise rpkgError(error_msg) |
| |
| try: |
| |
| rv_json = rv.json() |
| logger.debug("GitLab API response: '{0}'".format(rv_json)) |
| except Exception: |
| pass |
| |
| if rv.ok: |
| fork_id = rv.json()["id"] |
| try: |
| |
| rv = requests.delete( |
| "{0}/projects/{1}/protected_branches/{2}".format( |
| api_url, fork_id, "c9s" |
| ), |
| headers=headers, |
| ) |
| except ConnectionError as error: |
| error_msg = ( |
| "The connection to API failed while trying to unprotect c9s branch" |
| "in the fork. The error was: {0}".format(str(error)) |
| ) |
| raise rpkgError(error_msg) |
| |
| try: |
| |
| |
| |
| data = { |
| "id": fork_id, |
| "name": "c9s", |
| "allowed_to_push": [{"access_level": 40}], |
| "allowed_to_merge": [{"access_level": 40}], |
| } |
| rv = requests.post( |
| "{0}/projects/{1}/protected_branches".format(api_url, fork_id), |
| json=data, |
| headers=headers, |
| ) |
| except ConnectionError as error: |
| error_msg = ( |
| "The connection to API failed while trying to reprotect c9s branch" |
| "in the fork fork. The error was: {0}".format(str(error)) |
| ) |
| raise rpkgError(error_msg) |
| |
| base_error_msg = "The following error occurred while creating a new fork: {0}" |
| if not rv.ok: |
| |
| if rv.status_code == 409 or rv.reason == "Conflict": |
| |
| |
| |
| rv = requests.get("{0}/user".format(api_url), headers=headers) |
| username = rv.json()["username"] |
| return False, "{0}/{1}".format(username, safe_name) |
| |
| elif rv.status_code == 401 or rv.reason == "Unauthorized": |
| base_error_msg += ( |
| "\nFor invalid or expired token refer to " |
| '"{0} fork -h" to set a token in your user ' |
| "configuration.".format(cli_name) |
| ) |
| raise rpkgError(base_error_msg.format(rv.text)) |
| |
| return True, rv_json["path_with_namespace"] |
| |
| |
| def do_add_remote(base_url, remote_base_url, repo, repo_path, remote_name): |
| """ |
| Adds remote tracked repository |
| :param base_url: a string of the URL repository |
| :param remote_base_url: a string of the remote tracked repository |
| :param repo: object, current project git repository |
| :param repo_path: a string of the repository path |
| :param remote_name: a string of the remote name |
| :return: a bool; True if remote was created, False when already exists |
| """ |
| parsed_url = urlparse(remote_base_url) |
| remote_url = "{0}://{1}/{2}.git".format( |
| parsed_url.scheme, |
| parsed_url.netloc, |
| repo_path, |
| ) |
| |
| |
| for remote in repo.remotes: |
| if remote.name == remote_name: |
| return False |
| |
| try: |
| repo.create_remote(remote_name, url=remote_url) |
| except git.exc.GitCommandError as e: |
| error_msg = "During create remote:\n {0}\n {1}".format( |
| " ".join(e.command), e.stderr |
| ) |
| raise rpkgError(error_msg) |
| return True |
| |
| |
| def config_get_safely(config, section, option): |
| """ |
| Returns option from the user's configuration file. In case of missing |
| section or option method throws an exception with a human-readable |
| warning and a possible hint. |
| The method should be used especially in situations when there are newly |
| added sections/options into the config. In this case, there is a risk that |
| the user's config wasn't properly upgraded. |
| |
| :param config: ConfigParser object |
| :param section: section name in the config |
| :param option: name of the option |
| :return: option value from the right section |
| :rtype: str |
| """ |
| |
| hint = ( |
| "First (if possible), refer to the help of the current command " |
| "(-h/--help).\n" |
| "There also might be a new version of the config after upgrade.\n" |
| "Hint: you can check if you have 'centpkg.conf.rpmnew' or " |
| "'centpkg.conf.rpmsave' in the config directory. If yes, try to merge " |
| "your changes to the config with the maintainer provided version " |
| "(or replace centpkg.conf file with 'centpkg.conf.rpmnew')." |
| ) |
| |
| try: |
| return config.get(section, option) |
| except NoSectionError: |
| msg = "Missing section '{0}' in the config file.".format(section) |
| raise rpkgError("{0}\n{1}".format(msg, hint)) |
| except NoOptionError: |
| msg = "Missing option '{0}' in the section '{1}' of the config file.".format( |
| option, section |
| ) |
| raise rpkgError("{0}\n{1}".format(msg, hint)) |
| except Exception: |
| raise |
| |
| |
| def get_canonical_repo_name(config, repo_url): |
| """ |
| Check whether the current repo is a fork and if so, retrieve the parent |
| fork to get the proper name. |
| """ |
| |
| |
| cli_name = config_get_safely(dist_git_config, "__default", "cli_name") |
| distgit_section = "{0}.distgit".format(cli_name) |
| distgit_api_base_url = config_get_safely( |
| dist_git_config, distgit_section, "apibaseurl" |
| ) |
| |
| parsed_repo_url = urlparse(repo_url) |
| if not parsed_repo_url.scheme and repo_url.startswith("git@"): |
| |
| |
| |
| |
| |
| |
| |
| faked_url = "git+ssh://{0}".format(repo_url.replace(":", "/", 1)) |
| parsed_repo_url = urlparse(faked_url) |
| |
| try: |
| distgit_token = config_get_safely(dist_git_config, distgit_section, "token") |
| |
| api_url = "{0}/api/v4".format(distgit_api_base_url.rstrip("/")) |
| project_url = "{0}/projects/{1}".format( |
| api_url, quote_plus(parsed_repo_url.path.lstrip("/")) |
| ) |
| |
| headers = { |
| "PRIVATE-TOKEN": distgit_token, |
| "Accept": "application/json", |
| "Content-Type": "application/json", |
| } |
| |
| rv = requests.get(project_url, headers=headers) |
| rv.raise_for_status() |
| |
| |
| rv_json = rv.json() |
| |
| canonical_repo_name = rv_json["forked_from_project"]["name"] |
| |
| except HTTPError as e: |
| |
| if e.response.status_code == HTTPStatus.FORBIDDEN: |
| raise rpkgError("Insufficient Gitlab API permissions. Missing token?") |
| |
| |
| raise |
| |
| except KeyError as e: |
| |
| |
| |
| raise rpkgError("Insufficient Gitlab API permissions. Missing token?") |
| |
| |
| return canonical_repo_name.rsplit(".git", 1)[0] |
| |
| |
| def get_repo_name(name, org="rpms"): |
| """ |
| Try to parse the repository name in case it is a git url. |
| |
| Parameters |
| ---------- |
| name: str |
| The repository name, including the org name. |
| It will try to retrieve both repository name and org in case "name" is an url. |
| |
| org: str |
| The org to use in case name parsing is needed. |
| |
| Returns |
| ------- |
| str |
| A string containing the repository name: $ORG/$REPO`. |
| It will return the original `name` parameter in case of regex match failure. |
| """ |
| if name.startswith(org): |
| return name |
| |
| |
| repo_name = get_canonical_repo_name(dist_git_config, name) |
| |
| return "%s/%s" % (org, repo_name) |
| |
| |
| class StreamMappingError(RHELError): |
| pass |
| |
| |
| def stream_mapping(csname): |
| """ |
| Given a CentOS Stream name, map it to the corresponding RHEL name. |
| |
| Parameters |
| ---------- |
| csname: str |
| The CentOS Stream name. |
| |
| Returns |
| ------- |
| str |
| Corresponding RHEL name. |
| """ |
| if csname == "c8s" or csname == "cs8": |
| return 8, "rhel-8" |
| if csname == "c9s" or csname == "cs9": |
| return 9, "rhel-9" |
| if csname == "c10s" or csname == "cs10": |
| return 10, "rhel-10" |
| if csname == "c11s" or csname == "cs11": |
| return 11, "rhel-11" |
| raise StreamMappingError(f"{csname} is not a Stream branch") |
| |
| |
| def does_branch_exist(rhel_dist_git, namespace, repo_name, branch): |
| |
| g = gitpython.cmd.Git() |
| try: |
| g.ls_remote( |
| "--exit-code", |
| os.path.join(rhel_dist_git, namespace, repo_name), |
| branch, |
| ) |
| branch_exists = True |
| except gitpython.GitCommandError as e: |
| t, v, tb = sys.exc_info() |
| |
| if e.status == 2: |
| branch_exists = False |
| else: |
| raise |
| return branch_exists |
| |
| |
| def _datesplit(isodate): |
| date_string_tuple = isodate.split("-") |
| return [int(x) for x in date_string_tuple] |
| |
| |
| |
| |
| def get_unsynced_projects(distrobaker_config, namespace): |
| res = requests.get(distrobaker_config, timeout=60) |
| res.raise_for_status() |
| payload = yaml.safe_load(res.content.decode("utf-8")) |
| return payload["configuration"]["control"]["exclude"][namespace] |
| |
| |
| def parse_rhel_shortname(shortname): |
| |
| m = re.match( |
| "rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[.](?P<extra>.*))?", shortname |
| ) |
| if not m: |
| raise RuntimeError("Could not parse version from {}".format(shortname)) |
| |
| major_version = int(m.group("major")) |
| minor_version = int(m.group("minor")) |
| extra_version = m.group("extra") or None |
| |
| return major_version, minor_version, extra_version |
| |
| |
| def parse_rhel_branchname(shortname): |
| |
| m = re.match( |
| "rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[-](?P<extra>.*))?", shortname |
| ) |
| if not m: |
| raise RuntimeError("Could not parse version from {}".format(shortname)) |
| |
| major_version = int(m.group("major")) |
| minor_version = int(m.group("minor")) |
| extra_version = m.group("extra") or None |
| |
| return major_version, minor_version, extra_version |
| |
| |
| def query_package_pages(api_url, request_params): |
| """ |
| api_url: A URL to the API endpoing of the Product Pages (e.g. |
| "https://example.com/pp/api/") |
| request_params: A set of python-requests-compatible URL parameters to |
| focus the query. |
| """ |
| res = requests.get( |
| os.path.join(api_url, "latest", "releases"), |
| params=request_params, |
| timeout=60, |
| ) |
| res.raise_for_status() |
| payload = json.loads(res.text) |
| logger.debug("Response from PP API: {}".format(json.dumps(payload, indent=2))) |
| |
| return payload |
| |
| |
| def format_branch(x_version, y_version, is_beta): |
| if x_version <= 9: |
| |
| if is_beta: |
| branch = "rhel-{}.{}.0-beta".format(x_version, y_version) |
| else: |
| branch = "rhel-{}.{}.0".format(x_version, y_version) |
| else: |
| |
| if is_beta: |
| branch = "rhel-{}.{}-beta".format(x_version, y_version) |
| else: |
| branch = "rhel-{}.{}".format(x_version, y_version) |
| return branch |
| |
| |
| def determine_rhel_state( |
| rhel_dist_git, namespace, repo_name, cs_branch, pp_api_url, distrobaker_config |
| ): |
| """ |
| Arguments: |
| * rhel_dist_git: an https URL to the RHEL dist-git. Used for determining |
| the presence of the prior release's Z-stream branch. |
| * namespace: The dist-git namespace (rpms, containers, modules, etc.). |
| Used for determining the presence of the prior release's Z-stream |
| branch. |
| * repo_name: The name of the repo in the namespace from which we will |
| determine status. Used for determining the presence of the prior |
| release's Z-stream branch. |
| * cs_branch: The CentOS Stream branch for this repo. Used to determine the |
| RHEL major release. |
| * pp_api_url: The URL to the RHEL Product Pages API. Used for determining |
| the current development phase. |
| * distrobaker_config: The URL to the DistroBaker configuration. Used for |
| identifying packages that are not synced to RHEL. |
| |
| Returns: a namedtuple containing key information about the RHEL release |
| associated with this CentOS Stream branch. It has the following members: |
| |
| * latest_version: The most recent major and minor release of RHEL. This |
| is a presentation string and its format is not guaranteed. |
| * target_version: The major and minor release of RHEL that is currently |
| targeted by this CentOS Stream branch. This is a presentation string |
| and its format is not guaranteed. |
| * rule_branch: The branch to be used for check-tickets rules (str) |
| * rhel_target_default: The default `--rhel-target` (str) or |
| None (NoneType). The possible values if not None are "latest", |
| "zstream" or "none" (distinctive from NoneType) |
| * enforcing: Whether ticket approvals should be enforced. (bool) |
| * synced: Whether this package is synced to RHEL. False means it is a |
| CentOS Stream-only package. (bool) |
| """ |
| |
| |
| |
| try: |
| if not cs_branch.startswith("rhel-") and repo_name in get_unsynced_projects( |
| distrobaker_config, namespace |
| ): |
| |
| |
| |
| |
| return rhel_state_nt( |
| latest_version=cs_branch, |
| target_version=cs_branch, |
| rule_branch=cs_branch, |
| phase=pp_phase_devtestdoc, |
| rhel_target_default="none", |
| enforcing=False, |
| synced=False, |
| ) |
| except (ConnectionError, HTTPError) as e: |
| raise RHELError("Could not retrieve distrobaker config. Are you on the VPN?") |
| |
| try: |
| x_version, rhel_version = stream_mapping(cs_branch) |
| except StreamMappingError as e: |
| |
| |
| |
| |
| return rhel_state_nt( |
| latest_version=cs_branch, |
| target_version=cs_branch, |
| rule_branch=cs_branch, |
| phase=pp_phase_maintenance, |
| rhel_target_default="none", |
| enforcing=True, |
| synced=False, |
| ) |
| |
| |
| request_params = { |
| "phase__in": f"{pp_phase_devtestdoc},{pp_phase_stabilization},{pp_phase_launch},{pp_phase_maintenance},{pp_phase_unsupported}", |
| "product__shortname": "rhel", |
| "relgroup__shortname": rhel_version, |
| "format": "json", |
| } |
| |
| try: |
| pp_response = query_package_pages( |
| api_url=pp_api_url, request_params=request_params |
| ) |
| except (ConnectionError, HTTPError) as e: |
| raise RHELError("Could not contact Product Pages. Are you on the VPN?") |
| |
| if len(pp_response) < 1: |
| |
| logger.warning("Didn't match any active releases. Assuming pre-Beta.") |
| |
| |
| pp_response = [ |
| { |
| "shortname": "{}.0-beta".format(rhel_version), |
| "phase": pp_phase_devtestdoc, |
| } |
| ] |
| |
| active_y_version = -1 |
| beta = False |
| phase_lookup = dict() |
| for entry in pp_response: |
| shortname = entry["shortname"] |
| |
| |
| |
| x_version, y_version, extra_version = parse_rhel_shortname(shortname) |
| entry_is_beta = bool(extra_version and "beta" in extra_version) |
| |
| |
| branch_name = format_branch(x_version, y_version, entry_is_beta) |
| phase_lookup[branch_name] = entry["phase"] |
| |
| if y_version > active_y_version or ( |
| y_version == active_y_version and beta and not entry_is_beta |
| ): |
| |
| |
| |
| active_y_version = y_version |
| beta = entry_is_beta |
| |
| if beta: |
| latest_version = "{}.{} Beta".format(x_version, active_y_version) |
| else: |
| latest_version = "{}.{}".format(x_version, active_y_version) |
| |
| logger.debug("Latest version: {}".format(latest_version)) |
| |
| |
| |
| |
| |
| |
| |
| |
| |
| if beta: |
| return rhel_state_nt( |
| latest_version=latest_version, |
| target_version=latest_version, |
| rule_branch=cs_branch, |
| phase=pp_phase_devtestdoc, |
| rhel_target_default="latest", |
| enforcing=False, |
| synced=True, |
| ) |
| |
| |
| |
| |
| |
| if active_y_version == 0: |
| prior_release_branch = format_branch(x_version, active_y_version, is_beta=True) |
| else: |
| prior_release_branch = format_branch( |
| x_version, active_y_version - 1, is_beta=False |
| ) |
| current_release_branch = format_branch(x_version, active_y_version, is_beta=False) |
| |
| logger.debug("Prior release branch: {}".format(prior_release_branch)) |
| |
| |
| prior_release_phase = phase_lookup[prior_release_branch] |
| |
| |
| |
| |
| |
| prior_is_eol = bool(prior_release_phase == pp_phase_unsupported) |
| |
| if not prior_is_eol: |
| try: |
| branch_exists = does_branch_exist( |
| rhel_dist_git, namespace, repo_name, prior_release_branch |
| ) |
| except gitpython.GitCommandError as e: |
| raise RHELError("Could not read from RHEL dist-git. Are you on the VPN?") |
| |
| if prior_is_eol or branch_exists: |
| |
| |
| |
| |
| |
| phase = phase_lookup[current_release_branch] |
| check_tickets_branch = cs_branch |
| rhel_target_default = "latest" |
| target_version = latest_version |
| if phase >= pp_phase_maintenance: |
| enforcing = True |
| else: |
| enforcing = False |
| else: |
| |
| check_tickets_branch = prior_release_branch |
| phase = prior_release_phase |
| |
| target_x, target_y, target_extra = parse_rhel_branchname(prior_release_branch) |
| target_version = "{}.{}{}".format( |
| target_x, |
| target_y, |
| " Beta" if target_extra and "beta" in target_extra else "", |
| ) |
| |
| |
| |
| enforcing = True |
| |
| if phase == pp_phase_stabilization: |
| |
| |
| rhel_target_default = None |
| else: |
| |
| |
| rhel_target_default = "zstream" |
| |
| return rhel_state_nt( |
| latest_version=latest_version, |
| target_version=target_version, |
| rule_branch=check_tickets_branch, |
| phase=phase, |
| rhel_target_default=rhel_target_default, |
| enforcing=enforcing, |
| synced=True, |
| ) |
| |
| |
| def format_current_state_message(rhel_state): |
| """ |
| Returns a human-readable string providing actionable information about the |
| current state of this repository. Useful for `centpkg current-state` and |
| the check-tickets function in merge requests |
| """ |
| |
| if not rhel_state.synced: |
| return f"This component is not synced to RHEL" |
| |
| message = ( |
| f"Current RHEL status:\n" |
| f"\tThe latest active Y-stream release is RHEL {rhel_state.latest_version}\n" |
| f"\tThis project is targeting RHEL {rhel_state.target_version}\n" |
| ) |
| |
| if rhel_state.latest_version != rhel_state.target_version: |
| zstream_active_msg = ( |
| f"\t\tThe latest and targeted versions differ.\n" |
| f"\t\tIf this is not intentional, please see\n" |
| f"\t\thttps://one.redhat.com/rhel-development-guide/#proc_centos-stream-first_assembly_rhel-9-development\n" |
| f"\t\tfor details on how to unlock Y-stream development by creating the {rhel_state.rule_branch} branch.\n" |
| ) |
| message = "".join((message, zstream_active_msg)) |
| |
| target_phase = pp_phase_name_lookup[rhel_state.phase] |
| message = "".join( |
| ( |
| message, |
| f"\tThe {rhel_state.target_version} release is currently in {target_phase} phase\n", |
| ) |
| ) |
| |
| if rhel_state.phase == pp_phase_stabilization: |
| message = "".join( |
| (message, f"\t\tThe --rhel-target argument must be used when building.\n") |
| ) |
| |
| message = "".join( |
| ( |
| message, |
| f"\tTicket approvals are {'' if rhel_state.enforcing else 'not '}currently required for merge request approval.", |
| ) |
| ) |
| |
| return message |