# (Web viewer navigation residue "Blob Blame History Raw" - not part of the module.)
# -*- coding: utf-8 -*-
# utils.py - a module with support methods for centpkg
#
# Copyright (C) 2021 Red Hat Inc.
# Author(s): Ondrej Nosek <onosek@redhat.com>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
# the full text of the license.

import git
import json
import logging
import os
import re
import requests
import sys
from collections import namedtuple
from datetime import date, datetime
from http import HTTPStatus
from pyrpkg import rpkgError
from requests.exceptions import ConnectionError, HTTPError
from configparser import NoOptionError, NoSectionError
from urllib.parse import quote_plus, urlparse
import yaml

import git as gitpython

# Dist-git configuration object read by get_canonical_repo_name() and
# get_repo_name(). It is None at import time.
dist_git_config = None

# RHEL Product Pages Phase Identifiers
# Phase 230 is "Planning / Development / Testing" (AKA DevTestDoc)
pp_phase_devtestdoc = 230
# Phase 450 is "Stabilization" (AKA Exception Phase)
pp_phase_stabilization = 450
# Phase 500 is "Launch"
pp_phase_launch = 500
# Phase 600 is "Maintenance" (AKA Z-stream Phase)
pp_phase_maintenance = 600
# Phase 1000 is "Unsupported" (AKA, end-of-life)
pp_phase_unsupported = 1000

# Maps each phase identifier to its presentation name
pp_phase_name_lookup = {
    pp_phase_devtestdoc: "DevTestDoc",
    pp_phase_stabilization: "Stabilization",
    pp_phase_launch: "Launch",
    pp_phase_maintenance: "Maintenance",
    pp_phase_unsupported: "Unsupported",
}

# Default lookup location for unsynced packages
default_distrobaker_config = "https://gitlab.cee.redhat.com/osci/distrobaker_config/-/raw/rhel9/distrobaker.yaml?ref_type=heads"

# Record type returned by determine_rhel_state(); the field order below is
# part of the tuple's positional layout.
rhel_state_nt = namedtuple(
    "RHELState",
    (
        "latest_version",
        "target_version",
        "rule_branch",
        "phase",
        "rhel_target_default",
        "enforcing",
        "synced",
    ),
)


class RHELError(Exception):
    """Super-class for errors related to internal RHEL infrastructure."""


# Module-level logger named after this module. It is used by the helpers in
# this file that do not receive a logger argument of their own.
logger = logging.getLogger(__name__)


def _gitlab_api_call(method, url, action, timeout=60, **kwargs):
    """
    Issues a single GitLab API request and converts transport failures
    (connection errors and timeouts) into rpkgError.
    :param method: the requests callable to use (e.g. requests.post)
    :param url: a string of the API endpoint URL
    :param action: a string describing what was being attempted; it is
        embedded into the error message
    :param timeout: number of seconds to wait for the server
    :param kwargs: passed through unchanged to the requests callable
    :return: the response object returned by the requests callable
    :raises rpkgError: when the server could not be reached in time
    """
    try:
        return method(url, timeout=timeout, **kwargs)
    except (ConnectionError, requests.exceptions.Timeout) as error:
        error_msg = (
            "The connection to API failed while trying to "
            "{0}. The error was: {1}".format(action, str(error))
        )
        raise rpkgError(error_msg)


def do_fork(logger, base_url, token, repo_name, namespace, cli_name):
    """
    Creates a fork of the project.
    :param logger: A logger object
    :param base_url: a string of the URL repository
    :param token: a string of the API token that has rights to make a fork
    :param repo_name: a string of the repository name
    :param namespace: a string determines a type of the repository
    :param cli_name: string of the CLI's name (e.g. centpkg)
    :return: a tuple consisting of whether the fork needed to be created (bool)
        and the fork path (string)
    :raises rpkgError: when the API cannot be reached, when the fork request
        is rejected, or when the c9s branch of a new fork cannot be reprotected
    """
    api_url = "{0}/api/v4".format(base_url.rstrip("/"))
    project_id = quote_plus("redhat/centos-stream/{0}/{1}".format(namespace, repo_name))
    fork_url = "{0}/projects/{1}/fork".format(api_url, project_id)

    headers = {
        "PRIVATE-TOKEN": token,
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    # define a new repository name/path to avoid collision with other projects
    safe_name = "centos_{0}_{1}".format(namespace, repo_name)
    payload = json.dumps(
        {
            "name": safe_name,  # name of the project after forking
            "path": safe_name,
        }
    )

    # The fork response is kept in its own variable: the follow-up
    # branch-protection calls must not influence how the fork result is
    # interpreted.
    fork_rv = _gitlab_api_call(
        requests.post, fork_url, "create a new fork", headers=headers, data=payload
    )

    try:
        # Extract response json for debugging
        fork_json = fork_rv.json()
    except ValueError:
        # The body was not JSON; this only matters if the fork succeeded.
        fork_json = None
    else:
        logger.debug("GitLab API response: '{0}'".format(fork_json))

    if not fork_rv.ok:
        base_error_msg = "The following error occurred while creating a new fork: {0}"
        # fork was already created
        if fork_rv.status_code == 409 or fork_rv.reason == "Conflict":
            # When the repo already exists, the return doesn't contain the repo
            # path or username.  Make one more API call to get the username of
            # the token to construct the repo path.
            user_rv = _gitlab_api_call(
                requests.get,
                "{0}/user".format(api_url),
                "look up the owner of the existing fork",
                headers=headers,
            )
            if not user_rv.ok:
                raise rpkgError(
                    "The fork already exists, but the user owning the token "
                    "could not be determined: {0}".format(user_rv.text)
                )
            username = user_rv.json()["username"]
            return False, "{0}/{1}".format(username, safe_name)
        # show hint for invalid, expired or revoked token
        elif fork_rv.status_code == 401 or fork_rv.reason == "Unauthorized":
            base_error_msg += (
                "\nFor invalid or expired token refer to "
                '"{0} fork -h" to set a token in your user '
                "configuration.".format(cli_name)
            )
        raise rpkgError(base_error_msg.format(fork_rv.text))

    try:
        fork_id = fork_json["id"]
        fork_path = fork_json["path_with_namespace"]
    except (KeyError, TypeError):
        raise rpkgError(
            "The fork was created, but the API returned an unexpected "
            "response: {0}".format(fork_rv.text)
        )

    protected_branches_url = "{0}/projects/{1}/protected_branches".format(
        api_url, fork_id
    )

    # Unprotect c9s in fork. The outcome is only logged: the reprotect call
    # below decides whether the fork ends up in the desired state.
    unprotect_rv = _gitlab_api_call(
        requests.delete,
        "{0}/{1}".format(protected_branches_url, "c9s"),
        "unprotect c9s branch in the fork",
        headers=headers,
    )
    if not unprotect_rv.ok:
        logger.debug(
            "Unprotecting c9s branch in the fork returned HTTP {0}: {1}".format(
                unprotect_rv.status_code, unprotect_rv.text
            )
        )

    # Reprotect c9s to disable pushes
    # Only maintainers in gitlab are allowed to push with the following config
    # In CS, every pkg maintainer is considered as a developer in gitlab
    data = {
        "id": fork_id,
        "name": "c9s",
        "allowed_to_push": [{"access_level": 40}],
        "allowed_to_merge": [{"access_level": 40}],
    }
    reprotect_rv = _gitlab_api_call(
        requests.post,
        protected_branches_url,
        "reprotect c9s branch in the fork",
        json=data,
        headers=headers,
    )
    if not reprotect_rv.ok:
        if reprotect_rv.status_code == 409 or reprotect_rv.reason == "Conflict":
            # The branch is still protected (the unprotect step did not take
            # effect). The fork itself exists, so do not fail the operation.
            logger.warning(
                "The c9s branch in the fork is already protected; its "
                "protection settings were left unchanged."
            )
        else:
            raise rpkgError(
                "The fork was created, but the following error occurred "
                "while reprotecting c9s branch in the fork: {0}".format(
                    reprotect_rv.text
                )
            )

    return True, fork_path


def do_add_remote(base_url, remote_base_url, repo, repo_path, remote_name):
    """
    Registers a new git remote pointing at the given repository path.
    :param base_url: a string of the URL repository (not used by this
        function)
    :param remote_base_url: a string of the remote tracked repository; only
        its scheme and host are used
    :param repo: object, current project git repository
    :param repo_path: a string of the repository path
    :param remote_name: a string of the remote name
    :return: a bool; True if remote was created, False when already exists
    """
    url_parts = urlparse(remote_base_url)
    remote_url = "{0}://{1}/{2}.git".format(
        url_parts.scheme, url_parts.netloc, repo_path
    )

    # Nothing to do when a remote of that name is already configured
    if any(existing.name == remote_name for existing in repo.remotes):
        return False

    try:
        repo.create_remote(remote_name, url=remote_url)
    except git.exc.GitCommandError as err:
        failed_command = " ".join(err.command)
        raise rpkgError(
            "During create remote:\n  {0}\n  {1}".format(failed_command, err.stderr)
        )
    return True


def config_get_safely(config, section, option):
    """
    Reads one option from the user's configuration and turns a missing
    section or option into an error with a human-readable explanation and a
    hint on how to repair the config.
    This is meant for newly introduced sections/options, where the user's
    config file may not have been upgraded yet.

    :param config: ConfigParser object
    :param section: section name in the config
    :param option: name of the option
    :return: option value from the right section
    :rtype: str
    """

    hint = (
        "First (if possible), refer to the help of the current command "
        "(-h/--help).\n"
        "There also might be a new version of the config after upgrade.\n"
        "Hint: you can check if you have 'centpkg.conf.rpmnew' or "
        "'centpkg.conf.rpmsave' in the config directory. If yes, try to merge "
        "your changes to the config with the maintainer provided version "
        "(or replace centpkg.conf file with 'centpkg.conf.rpmnew')."
    )

    def _config_error(problem):
        # Combine the specific problem with the generic repair hint
        return rpkgError("{0}\n{1}".format(problem, hint))

    try:
        return config.get(section, option)
    except NoSectionError:
        raise _config_error(
            "Missing section '{0}' in the config file.".format(section)
        )
    except NoOptionError:
        raise _config_error(
            "Missing option '{0}' in the section '{1}' of the config file.".format(
                option, section
            )
        )


def get_canonical_repo_name(config, repo_url):
    """
    Check whether the current repo is a fork and if so, retrieve the parent
    fork to get the proper name.

    :param config: not read by this function; the module-level
        ``dist_git_config`` is used for every lookup
        (NOTE(review): confirm whether ``config`` was meant to be used)
    :param repo_url: a string of the git URL of the repo, either with a scheme
        or in the ``git@host:path`` form
    :return: the name of the project this repo was forked from, without a
        trailing ".git"
    :rtype: str
    :raises rpkgError: when the API answers 403 or the response carries no
        'forked_from_project' information
    :raises requests.exceptions.HTTPError: for any other 4xx/5xx answer
    """

    # Look up the repo and query for forked_from_project
    cli_name = config_get_safely(dist_git_config, "__default", "cli_name")
    distgit_section = "{0}.distgit".format(cli_name)
    distgit_api_base_url = config_get_safely(
        dist_git_config, distgit_section, "apibaseurl"
    )

    parsed_repo_url = urlparse(repo_url)
    if not parsed_repo_url.scheme and repo_url.startswith("git@"):
        # Some git checkouts are in the form of git@gitlab.com/...
        # If it's missing the scheme, it will treat the entire URL as the path
        # so we'll fake up the scheme for this situation
        # https://www.git-scm.com/book/en/v2/Git-Basics-Getting-a-Git-Repository
        # implies that no scheme is equivalent to git+ssh://
        # When making that conversion, we also have to replace the leading ':'
        # with a slash.
        faked_url = "git+ssh://{0}".format(repo_url.replace(":", "/", 1))
        parsed_repo_url = urlparse(faked_url)

    try:
        distgit_token = config_get_safely(dist_git_config, distgit_section, "token")

        api_url = "{0}/api/v4".format(distgit_api_base_url.rstrip("/"))
        project_url = "{0}/projects/{1}".format(
            api_url, quote_plus(parsed_repo_url.path.lstrip("/"))
        )

        headers = {
            "PRIVATE-TOKEN": distgit_token,
            "Accept": "application/json",
            "Content-Type": "application/json",
        }

        # Bound the wait like the other HTTP helpers in this module do, so an
        # unresponsive server cannot hang the command forever.
        rv = requests.get(project_url, headers=headers, timeout=60)
        rv.raise_for_status()

        # Extract response json for debugging
        rv_json = rv.json()

        canonical_repo_name = rv_json["forked_from_project"]["name"]

    except HTTPError as e:
        # We got a 4xx or 5xx error code from the URL lookup
        if e.response.status_code == HTTPStatus.FORBIDDEN:
            raise rpkgError("Insufficient Gitlab API permissions. Missing token?")

        # Other errors are unexpected, so re-raise them
        raise

    except KeyError:
        # There was no 'forked_from_project' key, likely meaning the
        # user lacked permissions to read the API. Usually this means
        # they haven't supplied a token or it is expired.
        raise rpkgError("Insufficient Gitlab API permissions. Missing token?")

    # Chop off a trailing .git if any. Only a real suffix is removed, so a
    # name that merely contains ".git" (e.g. "foo.github") stays intact.
    if canonical_repo_name.endswith(".git"):
        canonical_repo_name = canonical_repo_name[: -len(".git")]
    return canonical_repo_name


def get_repo_name(name, org="rpms"):
    """
    Return the repository name qualified with its org.

    Parameters
    ----------
    name: str
        The repository name or git url. When it already starts with `org` it
        is returned unchanged.

    org: str
        The org to prepend when the name has to be looked up.

    Returns
    -------
    str
        A string containing the repository name: `$ORG/$REPO`. When `name`
        does not start with `org`, `$REPO` is whatever
        get_canonical_repo_name() reports for it.
    """
    if not name.startswith(org):
        # This is probably a renamed fork, so try to find the fork's parent
        parent_name = get_canonical_repo_name(dist_git_config, name)
        name = "{0}/{1}".format(org, parent_name)

    return name


class StreamMappingError(RHELError):
    """Raised by stream_mapping() for a name that is not a Stream branch."""


def stream_mapping(csname):
    """
    Given a CentOS Stream name, map it to the corresponding RHEL release.

    Parameters
    ----------
    csname: str
        The CentOS Stream name, spelled either "cNs" or "csN".

    Returns
    -------
    tuple
        The RHEL major version (int) and the corresponding RHEL name (str),
        e.g. (9, "rhel-9").

    Raises
    ------
    StreamMappingError
        If the name is not one of the known Stream branches (8 through 11).
    """
    for major in (8, 9, 10, 11):
        accepted_spellings = ("c{}s".format(major), "cs{}".format(major))
        if csname in accepted_spellings:
            return major, "rhel-{}".format(major)
    raise StreamMappingError(f"{csname} is not a Stream branch")


def does_branch_exist(rhel_dist_git, namespace, repo_name, branch):
    """
    Ask the remote repository whether it has the given branch.

    :param rhel_dist_git: base location of the dist-git server
    :param namespace: namespace component of the repository location
    :param repo_name: name of the repository
    :param branch: the ref to look for
    :return: True when `git ls-remote` finds the ref, False when it does not
    :raises gitpython.GitCommandError: when `git ls-remote` fails for any
        reason other than a missing ref
    """
    git_cmd = gitpython.cmd.Git()
    try:
        git_cmd.ls_remote(
            "--exit-code",
            os.path.join(rhel_dist_git, namespace, repo_name),
            branch,
        )
    except gitpython.GitCommandError as err:
        # `git ls-remote --exit-code` returns "2" if it cannot find the ref
        if err.status != 2:
            raise
        return False
    return True


def _datesplit(isodate):
    date_string_tuple = isodate.split("-")
    return [int(x) for x in date_string_tuple]


# Certain packages are not synced to RHEL, and will always use the 'cXs' branch
# rules. This list is maintained in the distrobaker configuration:
def get_unsynced_projects(distrobaker_config, namespace):
    """
    Download the DistroBaker configuration and return the projects it
    excludes for one namespace.

    :param distrobaker_config: URL of the DistroBaker YAML configuration
    :param namespace: key to look up under configuration/control/exclude
    :return: the excluded entries for the namespace; an empty list when the
        namespace is not listed or is listed without entries
    :raises requests.exceptions.HTTPError: when the download answers with a
        4xx/5xx status
    """
    res = requests.get(distrobaker_config, timeout=60)
    res.raise_for_status()
    payload = yaml.safe_load(res.content.decode("utf-8"))
    excluded = payload["configuration"]["control"]["exclude"] or {}
    # A namespace without an entry simply has nothing excluded; callers test
    # membership on the result, so hand back an empty list instead of raising.
    return excluded.get(namespace) or []


def parse_rhel_shortname(shortname):
    """
    Split a release shortname into its version components.

    The shortname is in the form rhel-9-1.0 or rhel-10.0[.beta]

    :param shortname: the shortname string to parse
    :return: a tuple (major, minor, extra) where major and minor are ints and
        extra is the text after the second dot, or None when there is none
    :raises RuntimeError: when the shortname does not match the expected form
    """
    pattern = "rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)([.]0|[.](?P<extra>.*))?"
    parsed = re.match(pattern, shortname)
    if parsed is None:
        raise RuntimeError(f"Could not parse version from {shortname}")

    fields = parsed.groupdict()
    # An absent or empty suffix is reported as None
    extra_version = fields["extra"] if fields["extra"] else None
    return int(fields["major"]), int(fields["minor"]), extra_version


def parse_rhel_branchname(shortname):
    """
    Split a RHEL branch name into its version components.

    The branchname is in the form rhel-9-1.0, rhel-9.1.0[-beta] or
    rhel-10.0[-beta]. The optional ".0" and the optional "-<extra>" suffix
    are matched independently, so a branch that carries both (as
    format_branch() produces for 9.x and older betas) keeps its suffix.

    :param shortname: the branch name string to parse
    :return: a tuple (major, minor, extra) where major and minor are ints and
        extra is the text after the dash suffix, or None when there is none
    :raises RuntimeError: when the branch name does not match the expected form
    """
    m = re.match(
        r"rhel-(?P<major>[0-9]+)[.-](?P<minor>[0-9]+)(?:[.]0)?(?:-(?P<extra>.*))?",
        shortname,
    )
    if not m:
        raise RuntimeError("Could not parse version from {}".format(shortname))

    major_version = int(m.group("major"))
    minor_version = int(m.group("minor"))
    extra_version = m.group("extra") or None

    return major_version, minor_version, extra_version


def query_package_pages(api_url, request_params):
    """
    Fetch the "latest/releases" listing from the Product Pages API.

    api_url: A URL to the API endpoint of the Product Pages (e.g.
        "https://example.com/pp/api/")
    request_params: A set of python-requests-compatible URL parameters to
        focus the query.

    Returns the decoded JSON payload. An HTTP error status raises
    requests.exceptions.HTTPError.
    """
    releases_url = os.path.join(api_url, "latest", "releases")
    response = requests.get(releases_url, params=request_params, timeout=60)
    response.raise_for_status()

    releases = json.loads(response.text)
    pretty_payload = json.dumps(releases, indent=2)
    logger.debug("Response from PP API: {}".format(pretty_payload))
    return releases


def format_branch(x_version, y_version, is_beta):
    """
    Build the RHEL branch name for a major/minor release.

    :param x_version: the major version (compared against 9)
    :param y_version: the minor version
    :param is_beta: whether to append the "-beta" suffix
    :return: the branch name string
    """
    # 9.x and older releases include an excess .0 in the branch name;
    # starting with RHEL 10, the branch names have dropped the extra .0
    legacy_suffix = ".0" if x_version <= 9 else ""
    beta_suffix = "-beta" if is_beta else ""
    return "rhel-{}.{}{}{}".format(x_version, y_version, legacy_suffix, beta_suffix)


def determine_rhel_state(
    rhel_dist_git, namespace, repo_name, cs_branch, pp_api_url, distrobaker_config
):
    """
    Arguments:
    * rhel_dist_git: an https URL to the RHEL dist-git. Used for determining
      the presence of the prior release's Z-stream branch.
    * namespace: The dist-git namespace (rpms, containers, modules, etc.).
      Used for determining the presence of the prior release's Z-stream
      branch.
    * repo_name: The name of the repo in the namespace from which we will
      determine status. Used for determining the presence of the prior
      release's Z-stream branch.
    * cs_branch: The CentOS Stream branch for this repo. Used to determine the
      RHEL major release.
    * pp_api_url: The URL to the RHEL Product Pages API. Used for determining
      the current development phase.
    * distrobaker_config: The URL to the DistroBaker configuration. Used for
      identifying packages that are not synced to RHEL.

    Returns: a namedtuple containing key information about the RHEL release
    associated with this CentOS Stream branch. It has the following members:

    * latest_version: The most recent major and minor release of RHEL. This
      is a presentation string and its format is not guaranteed.
    * target_version: The major and minor release of RHEL that is currently
      targeted by this CentOS Stream branch. This is a presentation string
      and its format is not guaranteed.
    * rule_branch: The branch to be used for check-tickets rules (str)
    * phase: The Product Pages phase identifier of the targeted release
      (one of the pp_phase_* values for the short-circuit cases, otherwise
      the value reported by Product Pages)
    * rhel_target_default: The default `--rhel-target` (str) or
      None (NoneType). The possible values if not None are "latest",
      "zstream" or "none" (distinctive from NoneType)
    * enforcing: Whether ticket approvals should be enforced. (bool)
    * synced: Whether this package is synced to RHEL. False means it is a
      CentOS Stream-only package. (bool)

    Raises RHELError when the DistroBaker configuration, Product Pages or
    RHEL dist-git cannot be reached.
    """

    # First, check if this package has a RHEL counterpart or is CentOS Stream
    # only.
    try:
        if repo_name in get_unsynced_projects(distrobaker_config, namespace):
            # We don't need to do any looking up, because it will always use the
            # stream version and never enforce tickets. It will return
            # rhel_target_default="none" to instruct distrobaker not to attempt
            # to build on RHEL.
            return rhel_state_nt(
                latest_version=cs_branch,
                target_version=cs_branch,
                rule_branch=cs_branch,
                phase=pp_phase_devtestdoc,
                rhel_target_default="none",
                enforcing=False,
                synced=False,
            )
    except (ConnectionError, HTTPError) as e:
        raise RHELError(
            "Could not retrieve distrobaker config. Are you on the VPN?"
        ) from e

    try:
        x_version, rhel_version = stream_mapping(cs_branch)
    except StreamMappingError:
        # This is not a standard branch name, so it's probably either a custom
        # branch or a module stream branch. Either way, we'll return it as-is
        # and always treat it as if it is in enforcing mode.
        return rhel_state_nt(
            latest_version=cs_branch,
            target_version=cs_branch,
            rule_branch=cs_branch,
            phase=pp_phase_maintenance,
            rhel_target_default="none",
            enforcing=True,
            synced=False,
        )

    # Query the "package pages" API for the current active Y-stream release
    request_params = {
        "phase__in": f"{pp_phase_devtestdoc},{pp_phase_stabilization},{pp_phase_launch},{pp_phase_maintenance},{pp_phase_unsupported}",
        "product__shortname": "rhel",
        "relgroup__shortname": rhel_version,
        "format": "json",
    }

    try:
        pp_response = query_package_pages(
            api_url=pp_api_url, request_params=request_params
        )
    except (ConnectionError, HTTPError) as e:
        raise RHELError("Could not contact Product Pages. Are you on the VPN?") from e

    if len(pp_response) < 1:
        # Received zero potential release matches
        logger.warning("Didn't match any active releases. Assuming pre-Beta.")

        # Fake up a Beta payload. The suffix has to be ".beta" (with a dot):
        # that is the only spelling parse_rhel_shortname() recognises as an
        # extra version, and without it the entry would be treated as a
        # regular X.0 release whose prior (Beta) branch is then missing from
        # the phase lookup.
        pp_response = [
            {
                "shortname": "{}.0.beta".format(rhel_version),
                "phase": pp_phase_devtestdoc,
            }
        ]

    active_y_version = -1
    beta = False
    phase_lookup = dict()
    for entry in pp_response:
        shortname = entry["shortname"]

        # The shortname is in the form rhel-9-1.0 or rhel-10.0[.beta]
        # Extract the active Y-stream version
        x_version, y_version, extra_version = parse_rhel_shortname(shortname)
        entry_is_beta = bool(extra_version and "beta" in extra_version)

        # Enable looking up the phase later
        branch_name = format_branch(x_version, y_version, entry_is_beta)
        phase_lookup[branch_name] = entry["phase"]

        if y_version > active_y_version or (
            y_version == active_y_version and beta and not entry_is_beta
        ):
            # Replace the saved values with a higher Y version if we
            # see one. Also check whether we have the same Y version
            # but without the Beta indicator
            active_y_version = y_version
            beta = entry_is_beta

    if beta:
        latest_version = "{}.{} Beta".format(x_version, active_y_version)
    else:
        latest_version = "{}.{}".format(x_version, active_y_version)

    logger.debug("Latest version: {}".format(latest_version))

    # Next we need to find out if we're actually USING the latest version or
    # the previous one, by asking RHEL dist-git if the rhel-X.(Y-1).0 branch
    # exists. (Or rhel-X.Y.0-beta in the special case of Y=0)

    # If the latest release is the Beta, we can skip checking for a prior
    # release branch, since none can exist and we know it cannot be in
    # the Stabilization Phase yet. Thus, we return the CS branch and
    # --rhel-target=latest
    if beta:
        return rhel_state_nt(
            latest_version=latest_version,
            target_version=latest_version,
            rule_branch=cs_branch,
            phase=pp_phase_devtestdoc,
            rhel_target_default="latest",
            enforcing=False,
            synced=True,
        )

    # First, check if this is the special case of Y=0
    # Note: since this is being written during the 10.0 Beta timeframe, there
    # is no need to special case older formats like 9.0.0-beta. We can just
    # use rhel-X.0-beta instead.
    if active_y_version == 0:
        prior_release_branch = format_branch(x_version, active_y_version, is_beta=True)
    else:
        prior_release_branch = format_branch(
            x_version, active_y_version - 1, is_beta=False
        )
    current_release_branch = format_branch(x_version, active_y_version, is_beta=False)

    logger.debug("Prior release branch: {}".format(prior_release_branch))

    # Determine which phase the prior release is in:
    prior_release_phase = phase_lookup[prior_release_branch]

    # If the prior release is in the Unsupported Phase, it either means
    # that we're dealing with an EOL CentOS Stream (like 8.10) or else
    # the prior release was a Beta which is past the Launch Phase. We
    # need to use the stream rules in this case.
    prior_is_eol = prior_release_phase == pp_phase_unsupported

    # Only ask dist-git when the answer matters; an EOL prior release always
    # takes the "active Y-stream" path below.
    branch_exists = False
    if not prior_is_eol:
        try:
            branch_exists = does_branch_exist(
                rhel_dist_git, namespace, repo_name, prior_release_branch
            )
        except gitpython.GitCommandError as e:
            raise RHELError(
                "Could not read from RHEL dist-git. Are you on the VPN?"
            ) from e

    if prior_is_eol or branch_exists:
        # The branch is there or the previous branch is EOL, so work on the
        # active Y-stream, which is always in either DevTestDoc Phase or
        # Maintenance Phase (in the case of an end-of-life CentOS Stream)
        # We'll catch the unexpected case of Unsupported Phase as well, just
        # to be safe.
        phase = phase_lookup[current_release_branch]
        check_tickets_branch = cs_branch
        rhel_target_default = "latest"
        target_version = latest_version
        enforcing = bool(phase >= pp_phase_maintenance)
    else:
        # The branch is not present, so we'll work on the prior Y-stream
        check_tickets_branch = prior_release_branch
        phase = prior_release_phase

        target_x, target_y, target_extra = parse_rhel_branchname(prior_release_branch)
        target_version = "{}.{}{}".format(
            target_x,
            target_y,
            " Beta" if target_extra and "beta" in target_extra else "",
        )

        # The prior Y-stream is always in either Stabilization, Launch or
        # Maintenance phase, so it always enforces.
        enforcing = True

        if phase == pp_phase_stabilization:
            # We're in the Stabilization phase, so we can't automatically determine
            # between the "zstream" and "exception" targets.
            rhel_target_default = None
        else:
            # We're in Launch or Maintenance phase, so the "exception" target
            # is no longer permitted.
            rhel_target_default = "zstream"

    return rhel_state_nt(
        latest_version=latest_version,
        target_version=target_version,
        rule_branch=check_tickets_branch,
        phase=phase,
        rhel_target_default=rhel_target_default,
        enforcing=enforcing,
        synced=True,
    )


def format_current_state_message(rhel_state):
    """
    Render the given RHEL state as a human-readable, actionable status text
    for this repository. Useful for `centpkg current-state` and the
    check-tickets function in merge requests.

    :param rhel_state: an object with the members of the RHELState namedtuple
    :return: the message string
    """

    if not rhel_state.synced:
        return "This component is not synced to RHEL"

    latest = rhel_state.latest_version
    target = rhel_state.target_version

    parts = [
        "Current RHEL status:\n",
        f"\tThe latest active Y-stream release is RHEL {latest}\n",
        f"\tThis project is targeting RHEL {target}\n",
    ]

    if latest != target:
        # Explain how to get back onto the Y-stream
        parts.extend(
            [
                "\t\tThe latest and targeted versions differ.\n",
                "\t\tIf this is not intentional, please see\n",
                "\t\thttps://one.redhat.com/rhel-development-guide/#proc_centos-stream-first_assembly_rhel-9-development\n",
                f"\t\tfor details on how to unlock Y-stream development by creating the {rhel_state.rule_branch} branch.\n",
            ]
        )

    target_phase = pp_phase_name_lookup[rhel_state.phase]
    parts.append(f"\tThe {target} release is currently in {target_phase} phase\n")

    if rhel_state.phase == pp_phase_stabilization:
        parts.append("\t\tThe --rhel-target argument must be used when building.\n")

    negation = "" if rhel_state.enforcing else "not "
    parts.append(
        f"\tTicket approvals are {negation}currently required for merge request approval."
    )

    return "".join(parts)