# -*- coding: utf-8 -*-
# utils.py - a module with support methods for centpkg
#
# Copyright (C) 2021 Red Hat Inc.
# Author(s): Ondrej Nosek <onosek@redhat.com>
#
# This program is free software; you can redistribute it and/or modify it
# under the terms of the GNU General Public License as published by the
# Free Software Foundation; either version 2 of the License, or (at your
# option) any later version.  See http://www.gnu.org/copyleft/gpl.html for
# the full text of the license.

import git
import json
import logging
import os
import pytz
import re
import requests
import sys
from datetime import date, datetime
from pyrpkg import rpkgError
from requests.exceptions import ConnectionError
from six.moves.configparser import NoOptionError, NoSectionError
from six.moves.urllib.parse import quote_plus, urlparse

import git as gitpython

# Module-level configuration object that get_canonical_repo_name() and
# get_repo_name() hand to config_get_safely().  It starts out as None and is
# never assigned within this module's visible code.
# NOTE(review): presumably the CLI sets this to a ConfigParser instance
# before those helpers run -- confirm against the caller.
dist_git_config = None

def do_fork(logger, base_url, token, repo_name, namespace, cli_name):
    """
    Creates a fork of the project.

    After a successful fork request the 'c9s' branch of the fork is
    unprotected and then protected again so that only access level 40 may
    push to it or merge into it.

    :param logger: A logger object
    :param base_url: a string of the URL repository
    :param token: a string of the API token that has rights to make a fork
    :param repo_name: a string of the repository name
    :param namespace: a string determines a type of the repository
    :param cli_name: string of the CLI's name (e.g. centpkg)
    :return: a tuple consisting of whether the fork needed to be created (bool)
        and the fork path (string)
    :raises rpkgError: when the API cannot be reached, or when it answers
        with an error other than "the fork already exists"
    """
    # The fork request always used a 60 second limit; apply the same limit to
    # every follow-up call so a stalled connection cannot hang the CLI.
    request_timeout = 60

    api_url = '{0}/api/v4'.format(base_url.rstrip('/'))
    project_id = quote_plus("redhat/centos-stream/{0}/{1}".format(namespace, repo_name))
    fork_url = '{0}/projects/{1}/fork'.format(api_url, project_id)

    headers = {
        'PRIVATE-TOKEN': token,
        'Accept': 'application/json',
        'Content-Type': 'application/json'
    }
    # define a new repository name/path to avoid collision with other projects
    safe_name = "centos_{0}_{1}".format(namespace, repo_name)
    payload = json.dumps({
        'name': safe_name,  # name of the project after forking
        'path': safe_name,
    })
    try:
        rv = requests.post(
            fork_url, headers=headers, data=payload, timeout=request_timeout)
    except ConnectionError as error:
        error_msg = ('The connection to API failed while trying to '
                     'create a new fork. The error was: {0}'.format(str(error)))
        raise rpkgError(error_msg)

    # Decode the fork response exactly once.  This is best-effort because an
    # error response is not guaranteed to carry a JSON body; fork_json stays
    # None in that case instead of being left undefined.
    fork_json = None
    try:
        fork_json = rv.json()
        logger.debug("GitLab API response: '{0}'".format(fork_json))
    except ValueError:
        logger.debug("GitLab API response was not valid JSON")

    if rv.ok:
        try:
            fork_id = fork_json['id']
        except (KeyError, TypeError):
            # Success status, but the body was not JSON or had no 'id'.
            raise rpkgError(
                'The fork request succeeded but the API response did not '
                'contain a project id: {0}'.format(rv.text))

        try:
            # Unprotect c9s in fork
            rv = requests.delete(
                '{0}/projects/{1}/protected_branches/{2}'.format(
                    api_url, fork_id, 'c9s'),
                headers=headers, timeout=request_timeout)
        except ConnectionError as error:
            error_msg = ('The connection to API failed while trying to '
                         'unprotect c9s branch in the fork. '
                         'The error was: {0}'.format(str(error)))
            raise rpkgError(error_msg)

        try:
            # Reprotect c9s to disable pushes
            # Only maintainers in gitlab are allowed to push with the following config
            # In CS, every pkg maintainer is considered as a developer in gitlab
            data = {'id': fork_id,
                    'name': 'c9s',
                    'allowed_to_push': [{'access_level': 40}],
                    'allowed_to_merge': [{'access_level': 40}],
                    }
            rv = requests.post(
                '{0}/projects/{1}/protected_branches'.format(api_url, fork_id),
                json=data, headers=headers, timeout=request_timeout)
        except ConnectionError as error:
            error_msg = ('The connection to API failed while trying to '
                         'reprotect c9s branch in the fork. '
                         'The error was: {0}'.format(str(error)))
            raise rpkgError(error_msg)

    # At this point rv is the most recent response: the fork request when it
    # failed, otherwise the re-protect request.
    # NOTE(review): a failed re-protect is therefore reported through the
    # fork error path below (including the 409 shortcut) -- confirm that this
    # is intended.
    base_error_msg = ('The following error occurred while creating a new fork: {0}')
    if not rv.ok:
        # fork was already created
        if rv.status_code == 409 or rv.reason == "Conflict":
            # When the repo already exists, the return doesn't contain the repo
            # path or username.  Make one more API call to get the username of
            # the token to construct the repo path.
            try:
                user_rv = requests.get(
                    '{0}/user'.format(api_url),
                    headers=headers, timeout=request_timeout)
            except ConnectionError as error:
                error_msg = ('The connection to API failed while trying to '
                             'look up the owner of the token. '
                             'The error was: {0}'.format(str(error)))
                raise rpkgError(error_msg)
            try:
                username = user_rv.json()['username']
            except (ValueError, KeyError, TypeError):
                raise rpkgError(
                    'Could not determine the username that owns the token '
                    'from the API response: {0}'.format(user_rv.text))
            return False, '{0}/{1}'.format(username, safe_name)
        # show hint for invalid, expired or revoked token
        elif rv.status_code == 401 or rv.reason == "Unauthorized":
            base_error_msg += '\nFor invalid or expired token refer to ' \
                '"{0} fork -h" to set a token in your user ' \
                'configuration.'.format(cli_name)
        raise rpkgError(base_error_msg.format(rv.text))

    # Only reachable after a successful fork, so fork_json is the parsed body
    # of the fork response.
    return True, fork_json['path_with_namespace']


def do_add_remote(base_url, remote_base_url, repo, repo_path, remote_name):
    """
    Registers a new remote on the repository unless the name is taken.

    The remote URL is assembled from the scheme and network location of
    ``remote_base_url`` followed by ``<repo_path>.git``.

    :param base_url: a string of the URL repository (not read by this
        function)
    :param remote_base_url: a string of the remote tracked repository
    :param repo: object, current project git repository
    :param repo_path: a string of the repository path
    :param remote_name: a string of the remote name
    :return: a bool; True if remote was created, False when already exists
    :raises rpkgError: when git refuses to create the remote
    """
    target = urlparse(remote_base_url)
    remote_url = '{0}://{1}/{2}.git'.format(
        target.scheme, target.netloc, repo_path)

    # Nothing to do when a remote of that name is already configured.
    if any(existing.name == remote_name for existing in repo.remotes):
        return False

    try:
        repo.create_remote(remote_name, url=remote_url)
    except git.exc.GitCommandError as e:
        raise rpkgError("During create remote:\n  {0}\n  {1}".format(
            " ".join(e.command), e.stderr))
    return True


def config_get_safely(config, section, option):
    """
    Reads one option from the user's configuration.

    A missing section or a missing option is reported as an rpkgError whose
    text names what is missing and appends a hint about merging an upgraded
    config file. Any other exception raised by the lookup is passed through
    untouched. Useful for options that were added recently and may be absent
    from a config that was not upgraded.

    :param config: ConfigParser object
    :param section: section name in the config
    :param option: name of the option
    :return: option value from the right section
    :rtype: str
    """

    hint = (
        "First (if possible), refer to the help of the current command "
        "(-h/--help).\n"
        "There also might be a new version of the config after upgrade.\n"
        "Hint: you can check if you have 'centpkg.conf.rpmnew' or "
        "'centpkg.conf.rpmsave' in the config directory. If yes, try to merge "
        "your changes to the config with the maintainer provided version "
        "(or replace centpkg.conf file with 'centpkg.conf.rpmnew')."
    )

    def _with_hint(problem):
        # Build the error that pairs the specific problem with the hint.
        return rpkgError("{0}\n{1}".format(problem, hint))

    try:
        value = config.get(section, option)
    except NoSectionError:
        raise _with_hint(
            "Missing section '{0}' in the config file.".format(section))
    except NoOptionError:
        raise _with_hint(
            "Missing option '{0}' in the section '{1}' of the config file.".format(
                option, section
            ))
    return value


def get_canonical_repo_name(config, repo_url):
    """
    Check whether the current repo is a fork and if so, retrieve the parent
    fork to get the proper name.

    The GitLab project addressed by the path of ``repo_url`` is looked up and
    the name of its ``forked_from_project`` is returned.  If the lookup fails
    for any reason other than a missing key in the response, the last path
    segment of ``repo_url`` is used instead.  A trailing ``.git`` is removed
    from the result.

    :param config: not read by this function; the module-level
        ``dist_git_config`` is used for every lookup.
        NOTE(review): confirm whether ``config`` was meant to be used.
    :param repo_url: URL of the repository to inspect
    :return: the canonical repository name
    :rtype: str
    :raises rpkgError: when the API response lacks the expected keys, or when
        'cli_name' / 'apibaseurl' are missing from the configuration
    """

    # Look up the repo and query for forked_from_project
    cli_name = config_get_safely(dist_git_config, '__default', 'cli_name')
    distgit_section = '{0}.distgit'.format(cli_name)
    distgit_api_base_url = config_get_safely(dist_git_config, distgit_section, "apibaseurl")

    # NOTE(review): only the path of repo_url is used; nothing here verifies
    # that repo_url points at the same GitLab instance as apibaseurl.
    parsed_repo_url = urlparse(repo_url)

    try:
        distgit_token = config_get_safely(dist_git_config, distgit_section, 'token')

        api_url = '{0}/api/v4'.format(distgit_api_base_url.rstrip('/'))
        project_url = '{0}/projects/{1}'.format(api_url, quote_plus(parsed_repo_url.path.lstrip('/')))

        headers = {
            'PRIVATE-TOKEN': distgit_token,
            'Accept': 'application/json',
            'Content-Type': 'application/json'
        }

        # Bounded wait; a timeout lands in the generic fallback below.
        rv = requests.get(project_url, headers=headers, timeout=60)
        rv.raise_for_status()

        canonical_repo_name = rv.json()['forked_from_project']['name']
    except KeyError:
        # There was no 'forked_from_project' key, likely meaning the
        # user lacked permissions to read the API. Usually this means
        # they haven't supplied a token or it is expired.
        raise rpkgError("Insufficient Gitlab API permissions. Missing token?")

    except Exception:
        # For any other exception, just fall back to using the last segment
        # of the URL path.
        canonical_repo_name = parsed_repo_url.path.split('/')[-1]

    # Chop off a trailing .git if any.  Only a true suffix is removed, so a
    # name that merely contains '.git' (e.g. 'foo.github') stays intact.
    suffix = '.git'
    if canonical_repo_name.endswith(suffix):
        canonical_repo_name = canonical_repo_name[:-len(suffix)]
    return canonical_repo_name

def get_repo_name(name, org='rpms'):
    """
    Return the repository name in ``$ORG/$REPO`` form.

    Parameters
    ----------
    name: str
        The repository name, or a git url of the repository.
        It is returned untouched when it already starts with ``org``.

    org: str
        The org to prepend when the name has to be looked up.

    Returns
    -------
    str
        ``name`` itself when it starts with ``org``; otherwise ``org`` joined
        by a slash with the result of get_canonical_repo_name() for ``name``.
    """
    if not name.startswith(org):
        # This is probably a renamed fork, so try to find the fork's parent
        parent_name = get_canonical_repo_name(dist_git_config, name)
        return '%s/%s' % (org, parent_name)

    return name

def stream_mapping(csname):
    """
    Translate a CentOS Stream name into the matching RHEL name.

    Parameters
    ----------
    csname: str
        The CentOS Stream name, spelled either ``cNs`` or ``csN``.

    Returns
    -------
    str
        Corresponding RHEL name, or None when the name is not recognised.
    """
    # Each row lists the accepted spellings of one stream and its RHEL name.
    known_streams = (
        (("c8s", "cs8"), "rhel-8"),
        (("c9s", "cs9"), "rhel-9"),
        (("c10s", "cs10"), "rhel-10"),
        (("c11s", "cs11"), "rhel-11"),
    )
    for spellings, rhel_name in known_streams:
        if csname in spellings:
            return rhel_name
    return None

def does_divergent_branch_exist(repo_name, rhel_version, rhel_dist_git, pp_api_url, namespace):
    """
    Tell whether the Y-1 branch exists for this repo.

    The branch name comes from determine_divergent_branch(); its presence is
    probed with ``git ls-remote --exit-code`` against
    ``<rhel_dist_git>/<namespace>/<repo_name>``.

    :param repo_name: name of the repository to probe
    :param rhel_version: passed through to determine_divergent_branch()
    :param rhel_dist_git: base location of the dist-git repositories
    :param pp_api_url: passed through to determine_divergent_branch()
    :param namespace: namespace of the repository
    :return: True when the branch exists on the remote, False otherwise
    :raises GitCommandError: when ls-remote fails with a status other than 2
    """
    log = logging.getLogger(__name__)

    # Look up the Y-1 branch name
    divergent_branch = determine_divergent_branch(
        rhel_version, pp_api_url, namespace)
    log.debug("Divergent branch: {}".format(divergent_branch))

    remote_location = os.path.join(rhel_dist_git, namespace, repo_name)
    git_cmd = gitpython.cmd.Git()
    try:
        git_cmd.ls_remote("--exit-code", remote_location, divergent_branch)
    except gitpython.GitCommandError as err:
        # `git ls-remote --exit-code` returns "2" if it cannot find the ref
        if err.status != 2:
            raise
        return False
    return True

def determine_divergent_branch(rhel_version, pp_api_url, namespace):
    """
    Work out the name of the divergent (Y-1) branch for a RHEL release.

    The releases endpoint under ``pp_api_url`` is queried for releases of
    ``rhel_version`` in phase 230.  The highest Y-stream number found in the
    returned shortnames is taken as the active Y-stream, and the branch name
    is built from the version before it.

    :param rhel_version: release group shortname, e.g. "rhel-9"
    :param pp_api_url: base URL of the API to query
    :param namespace: not read by this function
    :return: branch name in the form "<rhel_version>.<Y-1>.0"
    :rtype: str
    :raises RuntimeError: when no release is returned or a shortname does not
        have the expected form
    :raises requests.HTTPError: when the API answers with an error status
    """
    logger = logging.getLogger(__name__)

    # Query the "package pages" API for the current active Y-stream release
    # Phase 230 is "Planning / Development / Testing" (AKA DeveTestDoc)
    request_params = {
        "phase": 230,
        "product__shortname": "rhel",
        "relgroup__shortname": rhel_version,
        "format": "json",
    }

    res = requests.get(
        os.path.join(pp_api_url, "latest", "releases"),
        params=request_params,
        timeout=60,
    )
    res.raise_for_status()
    payload = json.loads(res.text)
    logger.debug(
        "Response from PP API: {}".format(json.dumps(payload, indent=2))
    )
    if not payload:
        raise RuntimeError("Received zero potential release matches")

    # The shortname is in the form rhel-9-1.0
    # Raw string so that \d and \. reach the regex engine untouched, and the
    # version is escaped so it is always matched literally.  Compiled once
    # because the pattern is the same for every entry.
    y_stream_re = re.compile(
        r"(?<={}-)\d+(?=\.0)".format(re.escape(rhel_version)))

    active_y_version = -1
    for entry in payload:
        # Extract the active Y-stream version
        m = y_stream_re.search(entry["shortname"])
        if not m:
            raise RuntimeError(
                "Could not determine active Y-stream version from shortname"
            )
        active_y_version = max(active_y_version, int(m.group(0)))

    # The divergent branch is Y-1
    return "{}.{}.0".format(rhel_version, active_y_version - 1)

def _datesplit(isodate):
    """Split a dash-separated date string into a list of ints."""
    return list(map(int, isodate.split('-')))


def determine_active_y_version(rhel_version, pp_api_url):
    """
    Determine the active Y-stream release and whether it is being enforced.

    The releases endpoint under ``pp_api_url`` is queried for releases of
    ``rhel_version`` in phase 230; the release with the highest Y-stream
    number in its shortname is the active one.  Its schedule tasks are then
    searched for the single task whose name matches
    "(Excep|Stabiliza)tion Phase", and today's UTC date is compared with that
    task's start date.

    :param rhel_version: release group shortname, e.g. "rhel-9"
    :param pp_api_url: base URL of the API to query
    Returns: A 2-tuple of the active Y-stream version(int) and whether we are
    in the Exception Phase(bool)
    :raises RuntimeError: when no release is returned or a shortname does not
        have the expected form
    :raises AssertionError: when the phase lookup does not return exactly one
        schedule task
    :raises requests.HTTPError: when the API answers with an error status
    """
    logger = logging.getLogger(__name__)

    # Query the "package pages" API for the current active Y-stream release
    # Phase 230 is "Planning / Development / Testing" (AKA DeveTestDoc)
    request_params = {
        "phase": 230,
        "product__shortname": "rhel",
        "relgroup__shortname": rhel_version,
        "format": "json",
    }

    res = requests.get(
        os.path.join(pp_api_url, "latest", "releases"),
        params=request_params,
        timeout=60,
    )
    res.raise_for_status()
    payload = json.loads(res.text)
    logger.debug(
        "Response from PP API: {}".format(json.dumps(payload, indent=2))
    )
    if not payload:
        raise RuntimeError("Received zero potential release matches")

    # The shortname is in the form rhel-9-1.0
    # Raw string so that \d and \. reach the regex engine untouched, and the
    # version is escaped so it is always matched literally.
    y_stream_re = re.compile(
        r"(?<={}-)\d+(?=\.0)".format(re.escape(rhel_version)))

    release_id = -1
    active_y_version = -1
    for entry in payload:
        # Extract the active Y-stream version
        m = y_stream_re.search(entry["shortname"])
        if not m:
            raise RuntimeError(
                "Could not determine active Y-stream version from shortname"
            )
        y_version = int(m.group(0))
        if y_version > active_y_version:
            active_y_version = y_version
            release_id = entry["id"]

    # Now look up whether we are in the Exception Phase for this Y-stream release
    request_params = {
        "name__regex": "(Excep|Stabiliza)tion Phase",
        "format": "json",
    }
    res = requests.get(
        os.path.join(
            pp_api_url, "latest", "releases", str(release_id), "schedule-tasks"),
        params=request_params,
        timeout=60,  # same limit as the releases query above
    )
    res.raise_for_status()
    payload = json.loads(res.text)
    logger.debug(
        "Response from phase lookup: {}".format(json.dumps(payload, indent=2))
    )

    # This lookup *must* return exactly one value or the Product Pages are
    # wrong and must be fixed.  Raised explicitly (rather than via an assert
    # statement) so the check still runs under `python -O`; the exception
    # type is the one a failed assert would have produced.
    if len(payload) != 1:
        raise AssertionError(
            "Expected exactly one schedule task from the phase lookup for "
            "release {}, got {}".format(release_id, len(payload))
        )

    # Determine if this Y-stream release is in the exception phase
    today = datetime.now(tz=pytz.utc).date()
    exception_start_date = date(*_datesplit(payload[0]["date_start"]))
    in_exception_phase = today >= exception_start_date

    logger.debug("Active Y-stream: {}, Enforcing: {}".format(active_y_version, in_exception_phase))

    return active_y_version, in_exception_phase