#!/usr/bin/env python3
"""
filter kmods into groups for packaging, see filtermods.adoc
"""
import argparse
import os
import re
import subprocess
import sys
import yaml
import unittest
from logging import getLogger, DEBUG, INFO, WARN, ERROR, CRITICAL, NOTSET, FileHandler, StreamHandler, Formatter, Logger
from typing import Optional
log = getLogger('filtermods')
def get_td(filename):
script_dir = os.path.dirname(os.path.realpath(__file__))
return os.path.join(script_dir, 'filtermods-testdata', filename)
def run_command(cmd, cwddir=None):
p = subprocess.Popen(cmd, cwd=cwddir, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True)
out, err = p.communicate()
out_str = out.decode('utf-8')
err_str = err.decode('utf-8')
return p.returncode, out_str, err_str
def safe_run_command(cmd, cwddir=None):
log.info('%s', cmd)
retcode, out, err = run_command(cmd, cwddir)
if retcode != 0:
log.warning('Command failed: %s, ret_code: %d', cmd, retcode)
log.warning(out)
log.warning(err)
raise Exception(err)
log.info(' ^^[OK]')
return retcode, out, err
def setup_logging(log_filename, stdout_log_level):
log_format = '%(asctime)s %(levelname)7.7s %(funcName)20.20s:%(lineno)4s %(message)s'
log = getLogger('filtermods')
log.setLevel(DEBUG)
handler = StreamHandler(sys.stdout)
formatter = Formatter(log_format, '%H:%M:%S')
handler.setFormatter(formatter)
handler.setLevel(stdout_log_level)
log.addHandler(handler)
log.debug('stdout logging on')
if log_filename:
file_handler = FileHandler(log_filename, 'w')
file_handler.setFormatter(formatter)
file_handler.setLevel(DEBUG)
log.addHandler(file_handler)
log.info('file logging on: %s', log_filename)
return log
def canon_modname(kmod_pathname: str) -> str:
name = os.path.basename(kmod_pathname)
if name.endswith('.xz'):
name = name[:-3]
return name
class HierarchyObject:
def __init__(self):
self.depends_on = set()
def get_topo_order(obj_list: list[HierarchyObject], func_get_linked_objs=lambda x: x.depends_on) -> list[HierarchyObject]:
topo_order = []
objs_to_sort = set(obj_list)
objs_sorted = set()
while len(objs_to_sort) > 0:
no_deps = set()
for obj in objs_to_sort:
linked = func_get_linked_objs(obj)
if not linked:
no_deps.add(obj)
else:
all_deps_sorted = True
for dep in linked:
if dep not in objs_sorted:
all_deps_sorted = False
break
if all_deps_sorted:
no_deps.add(obj)
for obj in no_deps:
topo_order.append(obj)
objs_sorted.add(obj)
objs_to_sort.remove(obj)
return topo_order
class KMod(HierarchyObject):
def __init__(self, kmod_pathname: str) -> None:
super(KMod, self).__init__()
self.name: str = canon_modname(kmod_pathname)
self.kmod_pathname: str = kmod_pathname
self.is_dependency_for: set[KMod] = set()
self.assigned_to_pkg: Optional[KModPackage] = None
self.preferred_pkg: Optional[KModPackage] = None
self.rule_specifity: int = 0
self.allowed_list: Optional[set[KModPackage]] = None
self.err = 0
def __str__(self):
depends_on = ''
for kmod in self.depends_on:
depends_on = depends_on + ' ' + kmod.name
return '%s {%s}' % (self.name, depends_on)
class KModList():
def __init__(self) -> None:
self.name_to_kmod_map: dict[str, KMod] = {}
self.topo_order: Optional[list[KMod]] = None
def get(self, kmod_pathname, create_if_missing=False):
kmod_name = canon_modname(kmod_pathname)
if kmod_name in self.name_to_kmod_map:
return self.name_to_kmod_map[kmod_name]
if not create_if_missing:
return None
kmod = KMod(kmod_pathname)
# log.debug('Adding kmod %s (%s) to list', kmod.name, kmod.kmod_pathname)
if kmod.kmod_pathname != kmod_pathname:
raise Exception('Already have %s, but path changed? %s', kmod_name, kmod_pathname)
if not kmod.name:
raise Exception('Each kmod needs a name')
self.name_to_kmod_map[kmod_name] = kmod
return kmod
def process_depmod_line(self, line):
tmp = line.split(':')
if len(tmp) != 2:
raise Exception('Depmod line has unexpected format: %s', line)
kmod_pathname = tmp[0].strip()
dependencies_pathnames = tmp[1].strip()
kmod = self.get(kmod_pathname, create_if_missing=True)
if dependencies_pathnames:
for dep_pathname in dependencies_pathnames.split(' '):
dep_kmod = self.get(dep_pathname, create_if_missing=True)
kmod.depends_on.add(dep_kmod)
dep_kmod.is_dependency_for.add(kmod)
def load_depmod_file(self, filepath):
with open(filepath) as f:
lines = f.readlines()
for line in lines:
if not line or line.startswith('#'):
continue
self.process_depmod_line(line)
log.info('depmod %s loaded, number of kmods: %s', filepath, len(self.name_to_kmod_map))
def dump(self):
for kmod in self.name_to_kmod_map.values():
print(kmod)
def get_topo_order(self):
if self.topo_order is None:
self.topo_order = get_topo_order(self.name_to_kmod_map.values())
# TODO: what if we add something after?
return self.topo_order
def get_alphabetical_order(self):
kmods = list(self.name_to_kmod_map.values())
kmods.sort(key=lambda k: k.kmod_pathname)
return kmods
def load_kmods_from_dir(self, topdir):
ret = []
for root, dirs, files in os.walk(topdir):
for filename in files:
if filename.endswith('.xz'):
filename = filename[:-3]
if filename.endswith('.ko'):
kmod_pathname = os.path.join(root, filename)
ret.append(kmod_pathname)
return ret
def check_depmod_has_all_kmods(self, dirpath):
ret = self.load_kmods_from_dir(dirpath)
for kmod_pathname in ret:
kmod = self.get(kmod_pathname)
if not kmod:
raise Exception('Could not find kmod %s in depmod', kmod_pathname)
log.debug('OK: all (%s) kmods from %s are known', len(ret), dirpath)
class KModPackage(HierarchyObject):
def _get_depends_on(pkg):
return pkg.depends_on
def _get_deps_for(pkg):
return pkg.is_dependency_for
def __init__(self, name: str, depends_on=[]) -> None:
self.name: str = name
self.depends_on: set[KModPackage] = set(depends_on)
self.is_dependency_for: set[KModPackage] = set()
for pkg in self.depends_on:
pkg.is_dependency_for.add(self)
self.all_depends_on_list: list[KModPackage] = self._get_all_linked(KModPackage._get_depends_on)
self.all_depends_on: set[KModPackage] = set(self.all_depends_on_list)
self.all_deps_for: Optional[set[KModPackage]] = None
self.default = False
log.debug('KModPackage created %s, depends_on: %s', name, [pkg.name for pkg in depends_on])
def __repr__(self):
return self.name
def get_all_deps_for(self):
if self.all_deps_for is None:
self.all_deps_for = set(self._get_all_linked(KModPackage._get_deps_for))
return self.all_deps_for
def _get_all_linked(self, func_get_links):
ret = []
explore = func_get_links(self)
while len(explore) > 0:
new_explore = set()
for pkg in explore:
if pkg not in ret:
ret.append(pkg)
for dep in func_get_links(pkg):
new_explore.add(dep)
explore = new_explore
return ret
class KModPackageList(HierarchyObject):
def __init__(self) -> None:
self.name_to_obj: dict[str, KModPackage] = {}
self.kmod_pkg_list: list[KModPackage] = []
self.rules: list[tuple[str, str, str]] = []
def get(self, pkgname):
if pkgname in self.name_to_obj:
return self.name_to_obj[pkgname]
return None
def add_kmod_pkg(self, pkg):
self.name_to_obj[pkg.name] = pkg
self.kmod_pkg_list.append(pkg)
def __iter__(self):
return iter(self.kmod_pkg_list)
def get_kmods_matching_re(kmod_list: KModList, param_re: str) -> list[KMod]:
ret = []
# first subdir can be anything - this is because during build everything
# goes to kernel, but subpackages can move it (e.g. to extra)
param_re = '[^/]+/' + param_re
pattern = re.compile(param_re)
for kmod in kmod_list.get_topo_order():
m = pattern.match(kmod.kmod_pathname)
if m:
ret.append(kmod)
return ret
def walk_kmod_chain(kmod, myfunc):
visited = set()
def visit_kmod(kmod, parent_kmod, func_to_call):
func_to_call(kmod, parent_kmod)
visited.add(kmod)
for dep in kmod.depends_on:
if dep not in visited:
visit_kmod(dep, kmod, func_to_call)
visit_kmod(kmod, None, myfunc)
return visited
# is pkg a parent to any pkg from "alist"
def is_pkg_parent_to_any(pkg: KModPackage, alist: set[KModPackage]) -> bool:
if pkg in alist:
return True
for some_pkg in alist:
if some_pkg in pkg.all_depends_on:
return True
return False
# is pkg a child to any pkg from "alist"
def is_pkg_child_to_any(pkg: KModPackage, alist: set[KModPackage]) -> bool:
if pkg in alist:
return True
for some_pkg in alist:
if pkg in some_pkg.all_depends_on:
return True
return False
def update_allowed(kmod: KMod, visited: set[KMod], update_linked: bool = False) -> int:
num_updated = 0
init = False
to_remove = set()
if kmod in visited:
return num_updated
visited.add(kmod)
# if we have nothing, try to initialise based on parents and children
if kmod.allowed_list is None:
init_allowed_list: set[KModPackage] = set()
# init from children
for kmod_dep in kmod.depends_on:
if kmod_dep.allowed_list:
init_allowed_list.update(kmod_dep.allowed_list)
init = True
if init:
# also add any pkgs that pkgs from list could depend on
deps_for = set()
for pkg in init_allowed_list:
deps_for.update(pkg.get_all_deps_for())
init_allowed_list.update(deps_for)
# init from parents
if not init:
for kmod_par in kmod.is_dependency_for:
if kmod_par.allowed_list:
init_allowed_list.update(kmod_par.allowed_list)
# also add any pkgs that depend on pkgs from list
for pkg in kmod_par.allowed_list:
init_allowed_list.update(pkg.all_depends_on)
init = True
if init:
kmod.allowed_list = init_allowed_list
log.debug('%s: init to %s', kmod.name, [x.name for x in kmod.allowed_list])
kmod_allowed_list = kmod.allowed_list or set()
# log.debug('%s: update to %s', kmod.name, [x.name for x in kmod_allowed_list])
# each allowed is parent to at least one child allowed [for _all_ children]
for pkg in kmod_allowed_list:
for kmod_dep in kmod.depends_on:
if kmod_dep.allowed_list is None or kmod_dep.err:
continue
if not is_pkg_parent_to_any(pkg, kmod_dep.allowed_list):
to_remove.add(pkg)
log.debug('%s: remove %s from allowed, child: %s [%s]',
kmod.name, [pkg.name], kmod_dep.name, [x.name for x in kmod_dep.allowed_list])
# each allowed is child to at least one parent allowed [for _all_ parents]
for pkg in kmod_allowed_list:
for kmod_par in kmod.is_dependency_for:
if kmod_par.allowed_list is None or kmod_par.err:
continue
if not is_pkg_child_to_any(pkg, kmod_par.allowed_list):
to_remove.add(pkg)
log.debug('%s: remove %s from allowed, parent: %s %s',
kmod.name, [pkg.name], kmod_par.name, [x.name for x in kmod_par.allowed_list])
for pkg in to_remove:
kmod_allowed_list.remove(pkg)
num_updated = num_updated + 1
if len(kmod_allowed_list) == 0:
log.error('%s: cleared entire allow list', kmod.name)
kmod.err = 1
if init or to_remove or update_linked:
if to_remove:
log.debug('%s: updated to %s', kmod.name, [x.name for x in kmod_allowed_list])
for kmod_dep in kmod.depends_on:
num_updated = num_updated + update_allowed(kmod_dep, visited)
for kmod_dep in kmod.is_dependency_for:
num_updated = num_updated + update_allowed(kmod_dep, visited)
return num_updated
def apply_initial_labels(pkg_list: KModPackageList, kmod_list: KModList, treat_default_as_wants=False):
log.debug('')
for cur_rule in ['needs', 'wants', 'default']:
for package_name, rule_type, rule in pkg_list.rules:
pkg_obj = pkg_list.get(package_name)
if not pkg_obj:
log.error('no package with name %s', package_name)
if cur_rule != rule_type:
continue
if rule_type == 'default' and treat_default_as_wants:
rule_type = 'wants'
if 'needs' == rule_type:
# kmod_matching is already in topo_order
kmod_matching = get_kmods_matching_re(kmod_list, rule)
for kmod in kmod_matching:
if kmod.assigned_to_pkg and kmod.assigned_to_pkg != pkg_obj:
log.error('%s: can not be required by 2 pkgs %s %s', kmod.name, kmod.assigned_to_pkg, pkg_obj.name)
else:
kmod.assigned_to_pkg = pkg_obj
kmod.allowed_list = set([pkg_obj])
kmod.rule_specifity = len(kmod_matching)
log.debug('%s: needed by %s', kmod.name, [pkg_obj.name])
if 'wants' == rule_type:
# kmod_matching is already in topo_order
kmod_matching = get_kmods_matching_re(kmod_list, rule)
for kmod in kmod_matching:
if kmod.allowed_list is None:
kmod.allowed_list = set(pkg_obj.all_depends_on)
kmod.allowed_list.add(pkg_obj)
kmod.preferred_pkg = pkg_obj
kmod.rule_specifity = len(kmod_matching)
log.debug('%s: wanted by %s, init allowed to %s', kmod.name, [pkg_obj.name], [pkg.name for pkg in kmod.allowed_list])
else:
if kmod.assigned_to_pkg:
log.debug('%s: ignoring wants by %s, already assigned to %s', kmod.name, pkg_obj.name, kmod.assigned_to_pkg.name)
else:
# rule specifity may not be good idea, so just log it
# e.g. .*test.* may not be more specific than arch/x86/.*
log.debug('already have wants for %s %s, new rule: %s', kmod.name, kmod.preferred_pkg, rule)
if 'default' == rule_type:
pkg_obj.default = True
def settle(kmod_list: KModList) -> None:
kmod_topo_order = list(kmod_list.get_topo_order())
for i in range(0, 25):
log.debug('settle start %s', i)
ret = 0
for kmod in kmod_topo_order:
visited: set[KMod] = set()
ret = ret + update_allowed(kmod, visited)
log.debug('settle %s updated nodes: %s', i, ret)
if ret == 0:
break
kmod_topo_order.reverse()
# phase 1 - propagate initial labels
def propagate_labels_1(pkg_list: KModPackageList, kmod_list: KModList):
log.info('')
settle(kmod_list)
def pick_closest_to_preffered(preferred_pkg: KModPackage, allowed_set: set[KModPackage]):
for child in preferred_pkg.all_depends_on_list:
if child in allowed_set:
return child
return None
# phase 2 - if some kmods allow more than one pkg, pick wanted package
def propagate_labels_2(pkg_list: KModPackageList, kmod_list: KModList):
log.info('')
ret = 0
for kmod in kmod_list.get_topo_order():
update_linked = False
if kmod.allowed_list is None and kmod.preferred_pkg:
log.error('%s: has no allowed list but has preferred_pkg %s', kmod.name, kmod.preferred_pkg.name)
kmod.err = 1
if kmod.allowed_list and kmod.preferred_pkg:
chosen_pkg = None
if kmod.preferred_pkg in kmod.allowed_list:
chosen_pkg = kmod.preferred_pkg
else:
chosen_pkg = pick_closest_to_preffered(kmod.preferred_pkg, kmod.allowed_list)
if chosen_pkg is not None:
kmod.allowed_list = set([chosen_pkg])
log.debug('%s: making to prefer %s (preffered is %s), allowed: %s', kmod.name, chosen_pkg.name,
kmod.preferred_pkg.name, [pkg.name for pkg in kmod.allowed_list])
update_linked = True
visited: set[KMod] = set()
ret = ret + update_allowed(kmod, visited, update_linked)
log.debug('updated nodes: %s', ret)
settle(kmod_list)
# Is this the best pick? ¯\_(ツ)_/¯
def pick_topmost_allowed(allowed_set: set[KModPackage]) -> KModPackage:
topmost = next(iter(allowed_set))
for pkg in allowed_set:
if len(pkg.all_depends_on) > len(topmost.all_depends_on):
topmost = pkg
return topmost
# phase 3 - assign everything else that remained
def propagate_labels_3(pkg_list: KModPackageList, kmod_list: KModList):
log.info('')
ret = 0
kmod_topo_order = list(kmod_list.get_topo_order())
# do reverse topo order to cover children faster
kmod_topo_order.reverse()
default_pkg = None
default_name = ''
for pkg_obj in pkg_list:
if pkg_obj.default:
if default_pkg:
log.error('Already have default pkg: %s / %s', default_pkg.name, pkg_obj.name)
else:
default_pkg = pkg_obj
default_name = default_pkg.name
for kmod in kmod_topo_order:
update_linked = False
chosen_pkg = None
if kmod.allowed_list is None:
if default_pkg:
chosen_pkg = default_pkg
else:
log.error('%s not assigned and there is no default', kmod.name)
elif len(kmod.allowed_list) > 1:
if default_pkg:
if default_pkg in kmod.allowed_list:
chosen_pkg = default_pkg
else:
chosen_pkg = pick_closest_to_preffered(default_pkg, kmod.allowed_list)
if chosen_pkg:
log.debug('closest is %s', chosen_pkg.name)
if not chosen_pkg:
# multiple pkgs are allowed, but none is preferred or default
chosen_pkg = pick_topmost_allowed(kmod.allowed_list)
log.debug('topmost is %s', chosen_pkg.name)
if chosen_pkg:
kmod.allowed_list = set([chosen_pkg])
log.debug('%s: making to prefer %s (default: %s)', kmod.name, [chosen_pkg.name], default_name)
update_linked = True
visited: set[KMod] = set()
ret = ret + update_allowed(kmod, visited, update_linked)
log.debug('updated nodes: %s', ret)
settle(kmod_list)
def load_config(config_pathname: str, kmod_list: KModList, variants=[]):
kmod_pkg_list = KModPackageList()
with open(config_pathname, 'r') as file:
yobj = yaml.safe_load(file)
for pkg_dict in yobj['packages']:
pkg_name = pkg_dict['name']
depends_on = pkg_dict.get('depends-on', [])
if_variant_in = pkg_dict.get('if_variant_in')
if if_variant_in is not None:
if not (set(variants) & set(if_variant_in)):
log.debug('Skipping %s for variants %s', pkg_name, variants)
continue
pkg_dep_list = []
for pkg_dep_name in depends_on:
pkg_dep = kmod_pkg_list.get(pkg_dep_name)
pkg_dep_list.append(pkg_dep)
pkg_obj = kmod_pkg_list.get(pkg_name)
if not pkg_obj:
pkg_obj = KModPackage(pkg_name, pkg_dep_list)
kmod_pkg_list.add_kmod_pkg(pkg_obj)
else:
log.error('package %s already exists?', pkg_name)
rules_list = yobj.get('rules', [])
for rule_dict in rules_list:
if_variant_in = rule_dict.get('if_variant_in')
exact_pkg = rule_dict.get('exact_pkg')
for key, value in rule_dict.items():
if key in ['if_variant_in', 'exact_pkg']:
continue
if if_variant_in is not None:
if not (set(variants) & set(if_variant_in)):
continue
rule = key
package_name = value
if not kmod_pkg_list.get(package_name):
raise Exception('Unknown package ' + package_name)
rule_type = 'wants'
if exact_pkg is True:
rule_type = 'needs'
elif key == 'default':
rule_type = 'default'
rule = '.*'
log.debug('found rule: %s', (package_name, rule_type, rule))
kmod_pkg_list.rules.append((package_name, rule_type, rule))
log.info('loaded config, rules: %s', len(kmod_pkg_list.rules))
return kmod_pkg_list
def make_pictures(pkg_list: KModPackageList, kmod_list: KModList, filename: str, print_allowed=True):
f = open(filename + '.dot', 'w')
f.write('digraph {\n')
f.write('node [style=filled fillcolor="#f8f8f8"]\n')
f.write(' subgraph kmods {\n')
f.write(' "Legend" [shape=note label="kmod name\\n{desired package}\\nresulting package(s)"]\n')
for kmod in kmod_list.get_topo_order():
pkg_name = ''
attr = ''
if kmod.assigned_to_pkg:
attr = 'fillcolor="#eddad5" color="#b22800"'
pkg_name = kmod.assigned_to_pkg.name + "!"
if kmod.preferred_pkg:
attr = 'fillcolor="#ddddf5" color="#b268fe"'
pkg_name = kmod.preferred_pkg.name + "?"
allowed = ''
if kmod.allowed_list and print_allowed:
allowed = '=' + ' '.join([pkg.name for pkg in kmod.allowed_list])
f.write(' "%s" [label="%s\\n%s\\n%s" shape=box %s] \n' % (kmod.name, kmod.name, pkg_name, allowed, attr))
for kmod in kmod_list.get_topo_order():
for kmod_dep in kmod.depends_on:
f.write(' "%s" -> "%s";\n' % (kmod.name, kmod_dep.name))
f.write(' }\n')
f.write(' subgraph packages {\n')
for pkg in pkg_list:
desc = ''
if pkg.default:
desc = '/default'
f.write(' "%s" [label="%s\\n%s"] \n' % (pkg.name, pkg.name, desc))
for pkg_dep in pkg.depends_on:
f.write(' "%s" -> "%s";\n' % (pkg.name, pkg_dep.name))
f.write(' }\n')
f.write('}\n')
f.close()
# safe_run_command('dot -Tpng -Gdpi=150 %s.dot > %s.png' % (filename, filename))
safe_run_command('dot -Tsvg %s.dot > %s.svg' % (filename, filename))
def sort_kmods(depmod_pathname: str, config_str: str, variants=[], do_pictures=''):
log.info('%s %s', depmod_pathname, config_str)
kmod_list = KModList()
kmod_list.load_depmod_file(depmod_pathname)
pkg_list = load_config(config_str, kmod_list, variants)
basename = os.path.splitext(config_str)[0]
apply_initial_labels(pkg_list, kmod_list)
if '0' in do_pictures:
make_pictures(pkg_list, kmod_list, basename + "_0", print_allowed=False)
try:
propagate_labels_1(pkg_list, kmod_list)
if '1' in do_pictures:
make_pictures(pkg_list, kmod_list, basename + "_1")
propagate_labels_2(pkg_list, kmod_list)
propagate_labels_3(pkg_list, kmod_list)
finally:
if 'f' in do_pictures:
make_pictures(pkg_list, kmod_list, basename + "_f")
return pkg_list, kmod_list
def abbrev_list_for_report(alist: list[KMod]) -> str:
tmp_str = []
for kmod in alist:
if kmod.allowed_list:
tmp_str.append('%s(%s)' % (kmod.name, ' '.join([x.name for x in kmod.allowed_list])))
ret = ', '.join(tmp_str)
return ret
def print_report(pkg_list: KModPackageList, kmod_list: KModList):
log.info('*'*26 + ' REPORT ' + '*'*26)
kmods_err = 0
kmods_moved = 0
kmods_good = 0
for kmod in kmod_list.get_topo_order():
if not kmod.allowed_list:
log.error('%s: not assigned to any package! Please check the full log for details', kmod.name)
kmods_err = kmods_err + 1
continue
if len(kmod.allowed_list) > 1:
log.error('%s: assigned to more than one package! Please check the full log for details', kmod.name)
kmods_err = kmods_err + 1
continue
if not kmod.preferred_pkg:
# config doesn't care where it ended up
kmods_good = kmods_good + 1
continue
if kmod.preferred_pkg in kmod.allowed_list:
# it ended up where it needs to be
kmods_good = kmods_good + 1
continue
bad_parent_list = []
for kmod_parent in kmod.is_dependency_for:
if not is_pkg_child_to_any(kmod.preferred_pkg, kmod_parent.allowed_list):
bad_parent_list.append(kmod_parent)
bad_child_list = []
for kmod_child in kmod.depends_on:
if not is_pkg_parent_to_any(kmod.preferred_pkg, kmod_child.allowed_list):
bad_child_list.append(kmod_parent)
log.info('%s: wanted by %s but ended up in %s', kmod.name, [kmod.preferred_pkg.name], [pkg.name for pkg in kmod.allowed_list])
if bad_parent_list:
log.info('\thas conflicting parent: %s', abbrev_list_for_report(bad_parent_list))
if bad_child_list:
log.info('\thas conflicting children: %s', abbrev_list_for_report(bad_child_list))
kmods_moved = kmods_moved + 1
log.info('No. of kmod(s) assigned to preferred package: %s', kmods_good)
log.info('No. of kmod(s) moved to a related package: %s', kmods_moved)
log.info('No. of kmod(s) which could not be assigned: %s', kmods_err)
log.info('*'*60)
return kmods_err
def write_modules_lists(path_prefix: str, pkg_list: KModPackageList, kmod_list: KModList):
kmod_list_alphabetical = sorted(kmod_list.get_topo_order(), key=lambda x: x.kmod_pathname)
for pkg in pkg_list:
output_path = os.path.join(path_prefix, pkg.name + '.list')
i = 0
with open(output_path, "w") as file:
for kmod in kmod_list_alphabetical:
if kmod.allowed_list and pkg in kmod.allowed_list:
file.write(kmod.kmod_pathname)
file.write('\n')
i = i + 1
log.info('Module list %s created with %s kmods', output_path, i)
class FiltermodTests(unittest.TestCase):
do_pictures = ''
def setUp(self):
self.pkg_list = None
self.kmod_list = None
def _is_kmod_pkg(self, kmodname, pkgnames):
self.assertIsNotNone(self.pkg_list)
self.assertIsNotNone(self.kmod_list)
if type(pkgnames) is str:
pkgnames = [pkgnames]
expected_pkgs = []
for pkgname in pkgnames:
pkg = self.pkg_list.get(pkgname)
self.assertIsNotNone(pkg)
expected_pkgs.append(pkg)
kmod = self.kmod_list.get(kmodname)
self.assertIsNotNone(kmod)
if expected_pkgs:
self.assertTrue(len(kmod.allowed_list) == 1)
self.assertIn(next(iter(kmod.allowed_list)), expected_pkgs)
else:
self.assertEqual(kmod.allowed_list, set())
def test1a(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test1.dep'), get_td('test1.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._is_kmod_pkg('kmod1', 'modules-core')
self._is_kmod_pkg('kmod2', 'modules-core')
self._is_kmod_pkg('kmod3', 'modules')
self._is_kmod_pkg('kmod4', 'modules')
def test1b(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test1.dep'), get_td('test1.yaml'),
do_pictures=FiltermodTests.do_pictures,
variants=['rt'])
self.assertIsNotNone(self.pkg_list.get('rt-kvm'))
self._is_kmod_pkg('kmod1', 'modules-core')
self._is_kmod_pkg('kmod2', 'modules-core')
self._is_kmod_pkg('kmod3', 'modules')
self._is_kmod_pkg('kmod4', 'rt-kvm')
def test2(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test2.dep'), get_td('test2.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._is_kmod_pkg('kmod1', 'modules-extra')
self._is_kmod_pkg('kmod2', 'modules')
self._is_kmod_pkg('kmod3', 'modules-core')
self._is_kmod_pkg('kmod4', 'modules-core')
self._is_kmod_pkg('kmod5', 'modules-core')
self._is_kmod_pkg('kmod6', 'modules-extra')
self._is_kmod_pkg('kmod8', 'modules')
def test3(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test3.dep'), get_td('test3.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._is_kmod_pkg('kmod2', ['modules-core', 'modules'])
self._is_kmod_pkg('kmod4', ['modules-core', 'modules-extra'])
self._is_kmod_pkg('kmod5', 'modules-core')
self._is_kmod_pkg('kmod6', 'modules-core')
def test4(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test4.dep'), get_td('test4.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._is_kmod_pkg('kmod0', 'modules')
self._is_kmod_pkg('kmod1', 'modules')
self._is_kmod_pkg('kmod2', 'modules')
self._is_kmod_pkg('kmod3', 'modules')
self._is_kmod_pkg('kmod4', 'modules')
self._is_kmod_pkg('kmod5', 'modules')
self._is_kmod_pkg('kmod6', 'modules')
self._is_kmod_pkg('kmod7', 'modules-partner2')
self._is_kmod_pkg('kmod8', 'modules-partner')
self._is_kmod_pkg('kmod9', 'modules-partner')
def _check_preffered_pkg(self, kmodname, pkgname):
kmod = self.kmod_list.get(kmodname)
self.assertIsNotNone(kmod)
self.assertEqual(kmod.preferred_pkg.name, pkgname)
def test5(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test5.dep'), get_td('test5.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._check_preffered_pkg('kmod2', 'modules')
self._check_preffered_pkg('kmod3', 'modules-partner')
self._check_preffered_pkg('kmod4', 'modules-partner')
def test6(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test6.dep'), get_td('test6.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._is_kmod_pkg('kmod2', 'modules-core')
self._is_kmod_pkg('kmod3', 'modules')
self._is_kmod_pkg('kmod4', 'modules')
self._is_kmod_pkg('kmod1', [])
def test7(self):
self.pkg_list, self.kmod_list = sort_kmods(get_td('test7.dep'), get_td('test7.yaml'),
do_pictures=FiltermodTests.do_pictures)
self._is_kmod_pkg('kmod1', 'modules-core')
self._is_kmod_pkg('kmod2', 'modules-core')
self._is_kmod_pkg('kmod3', 'modules-other')
self._is_kmod_pkg('kmod4', 'modules')
def do_rpm_mapping_test(config_pathname, kmod_rpms):
kmod_dict = {}
def get_kmods_matching_re(pkgname, param_re):
matched = []
param_re = '^kernel/' + param_re
pattern = re.compile(param_re)
for kmod_pathname, kmod_rec in kmod_dict.items():
m = pattern.match(kmod_pathname)
if m:
matched.append(kmod_pathname)
return matched
for kmod_rpm in kmod_rpms.split():
filename = os.path.basename(kmod_rpm)
m = re.match(r'.*-modules-([^-]+)', filename)
if not m:
raise Exception('Unrecognized rpm ' + kmod_rpm + ', expected a kernel-modules* rpm')
pkgname = 'modules-' + m.group(1)
m = re.match(r'modules-([0-9.]+)', pkgname)
if m:
pkgname = 'modules'
tmpdir = os.path.join('tmp.filtermods', filename, pkgname)
if not os.path.exists(tmpdir):
log.info('creating tmp dir %s', tmpdir)
os.makedirs(tmpdir)
safe_run_command('rpm2cpio %s | cpio -id' % (os.path.abspath(kmod_rpm)), cwddir=tmpdir)
else:
log.info('using cached content of tmp dir: %s', tmpdir)
for path, subdirs, files in os.walk(tmpdir):
for name in files:
ret = re.match(r'.*/'+pkgname+'/lib/modules/[^/]+/[^/]+/(.*)', os.path.join(path, name))
if not ret:
continue
kmod_pathname = 'kernel/' + ret.group(1)
if not kmod_pathname.endswith('.xz') and not kmod_pathname.endswith('.ko'):
continue
if kmod_pathname in kmod_dict:
if pkgname not in kmod_dict[kmod_pathname]['target_pkgs']:
kmod_dict[kmod_pathname]['target_pkgs'].append(pkgname)
else:
kmod_dict[kmod_pathname] = {}
kmod_dict[kmod_pathname]['target_pkgs'] = [pkgname]
kmod_dict[kmod_pathname]['pkg'] = None
kmod_dict[kmod_pathname]['matched'] = False
kmod_pkg_list = load_config(config_pathname, None)
for package_name, rule_type, rule in kmod_pkg_list.rules:
kmod_names = get_kmods_matching_re(package_name, rule)
for kmod_pathname in kmod_names:
kmod_rec = kmod_dict[kmod_pathname]
if not kmod_rec['matched']:
kmod_rec['matched'] = True
kmod_rec['pkg'] = package_name
for kmod_pathname, kmod_rec in kmod_dict.items():
if kmod_rec['pkg'] not in kmod_rec['target_pkgs']:
log.warning('kmod %s wanted by config in %s, in tree it is: %s', kmod_pathname, [kmod_rec['pkg']], kmod_rec['target_pkgs'])
elif len(kmod_rec['target_pkgs']) > 1:
# if set(kmod_rec['target_pkgs']) != set(['modules', 'modules-core']):
log.warning('kmod %s multiple matches in tree: %s/%s', kmod_pathname, [kmod_rec['pkg']], kmod_rec['target_pkgs'])
def cmd_sort(options):
do_pictures = ''
if options.graphviz:
do_pictures = '0f'
pkg_list, kmod_list = sort_kmods(options.depmod, options.config,
options.variants, do_pictures)
ret = print_report(pkg_list, kmod_list)
if options.output:
write_modules_lists(options.output, pkg_list, kmod_list)
return ret
def cmd_print_rule_map(options):
kmod_list = KModList()
kmod_list.load_depmod_file(options.depmod)
pkg_list = load_config(options.config, kmod_list, options.variants)
apply_initial_labels(pkg_list, kmod_list, treat_default_as_wants=True)
for kmod in kmod_list.get_alphabetical_order():
print('%-20s %s' % (kmod.preferred_pkg, kmod.kmod_pathname))
def cmd_selftest(options):
if options.graphviz:
FiltermodTests.do_pictures = '0f'
for arg in ['selftest', '-g', '--graphviz']:
if arg in sys.argv:
sys.argv.remove(arg)
unittest.main()
sys.exit(0)
def cmd_cmp2rpm(options):
do_rpm_mapping_test(options.config, options.kmod_rpms)
def main():
global log
parser = argparse.ArgumentParser()
parser.add_argument('-v', '--verbose', dest='verbose',
help='be more verbose', action='count', default=4)
parser.add_argument('-q', '--quiet', dest='quiet',
help='be more quiet', action='count', default=0)
parser.add_argument('-l', '--log-filename', dest='log_filename',
help='log filename', default='filtermods.log')
subparsers = parser.add_subparsers(dest='cmd')
def add_graphviz_arg(p):
p.add_argument('-g', '--graphviz', dest='graphviz',
help='generate graphviz visualizations',
action='store_true', default=False)
def add_config_arg(p):
p.add_argument('-c', '--config', dest='config', required=True,
help='path to yaml config with rules')
def add_depmod_arg(p):
p.add_argument('-d', '--depmod', dest='depmod', required=True,
help='path to modules.dep file')
def add_output_arg(p):
p.add_argument('-o', '--output', dest='output', default=None,
help='output $module_name.list files to directory specified by this parameter')
def add_variants_arg(p):
p.add_argument('-r', '--variants', dest='variants', action='append', default=[],
help='variants to enable in config')
def add_kmod_rpms_arg(p):
p.add_argument('-k', '--kmod-rpms', dest='kmod_rpms', required=True,
help='compare content of specified rpm(s) against yaml config rules')
parser_sort = subparsers.add_parser('sort', help='assign kmods specified by modules.dep using rules from yaml config')
add_config_arg(parser_sort)
add_depmod_arg(parser_sort)
add_output_arg(parser_sort)
add_variants_arg(parser_sort)
add_graphviz_arg(parser_sort)
parser_rule_map = subparsers.add_parser('rulemap', help='print how yaml config maps to kmods')
add_config_arg(parser_rule_map)
add_depmod_arg(parser_rule_map)
add_variants_arg(parser_rule_map)
parser_test = subparsers.add_parser('selftest', help='runs a self-test')
add_graphviz_arg(parser_test)
parser_cmp2rpm = subparsers.add_parser('cmp2rpm', help='compare ruleset against RPM(s)')
add_config_arg(parser_cmp2rpm)
add_kmod_rpms_arg(parser_cmp2rpm)
options = parser.parse_args()
if options.cmd == "selftest":
options.verbose = options.verbose - 2
options.verbose = max(options.verbose - options.quiet, 0)
levels = [NOTSET, CRITICAL, ERROR, WARN, INFO, DEBUG]
stdout_log_level = levels[min(options.verbose, len(levels) - 1)]
log = setup_logging(options.log_filename, stdout_log_level)
ret = 0
if options.cmd == "sort":
ret = cmd_sort(options)
elif options.cmd == "rulemap":
cmd_print_rule_map(options)
elif options.cmd == "selftest":
cmd_selftest(options)
elif options.cmd == "cmp2rpm":
cmd_cmp2rpm(options)
else:
parser.print_help()
return ret
if __name__ == '__main__':
# import profile
# profile.run('main()', sort=1)
sys.exit(main())