From 4011007b445e8f8da9b0cc45eccd793b94f6b5ce Mon Sep 17 00:00:00 2001 From: Sergio Correia Date: Thu, 29 Jul 2021 19:25:43 -0300 Subject: [PATCH] Add ausysrulevalidate --- contrib/ausysrulevalidate | 198 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 198 insertions(+) create mode 100755 contrib/ausysrulevalidate diff --git a/contrib/ausysrulevalidate b/contrib/ausysrulevalidate new file mode 100755 index 0000000..a251b2c --- /dev/null +++ b/contrib/ausysrulevalidate @@ -0,0 +1,198 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +# ausysrulevalidate - A program that lets you validate the syscalls +# in audit rules. +# Copyright (c) 2021 Red Hat Inc., Durham, North Carolina. +# All Rights Reserved. +# +# This software may be freely redistributed and/or modified under the +# terms of the GNU General Public License as published by the Free +# Software Foundation; either version 2, or (at your option) any +# later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; see the file COPYING. If not, write to the +# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor +# Boston, MA 02110-1335, USA. +# +# Authors: +# Sergio Correia + +""" This program lets you validate syscalls in audit rules. """ + +import argparse +import os.path +import sys + +import audit + + +class AuSyscallRuleValidate: + """AuSyscallRuleValidate validates syscalls in audit rules.""" + + def __init__(self): + self.syscalls_table = {} + self.invalid_syscalls = {} + self.machines = { + "b32": audit.audit_determine_machine("b32"), + "b64": audit.audit_determine_machine("b64"), + } + + if self.machines["b32"] == -1 or self.machines["b64"] == -1: + sys.stderr.write("ERROR: Unable to determine machine type\n") + sys.exit(1) + + def validate_syscall(self, arch, syscall): + """Validates a single syscall.""" + + if syscall == "all": + return True + + lookup = "{0}:{1}".format(arch, syscall) + if lookup in self.syscalls_table: + return self.syscalls_table[lookup] + + ret = audit.audit_name_to_syscall(syscall, self.machines[arch]) + self.syscalls_table[lookup] = ret != -1 + if not self.syscalls_table[lookup]: + self.invalid_syscalls[lookup] = lookup + + return self.syscalls_table[lookup] + + def process_syscalls(self, arch, syscalls): + """Processes a group of syscalls, validating them individually.""" + + scalls = syscalls.split(",") + processed = [] + for syscall in scalls: + if self.validate_syscall(arch, syscall): + processed.append(syscall) + return ",".join(processed) + + def parse_line(self, line): + """Processes a single line from the audit rules file, and returns the + same line adjusted, if required, by removing invalid syscalls, or even + removing the rule altogether, if no valid syscall remain after + validation.""" + + if line.lstrip().startswith("#") or "-S" not in line: + return line + + # We do have a rule specifying syscalls, so let's validate them. + tokens = line.split() + processed = [] + is_syscall = False + arch = None + + for val in tokens: + if not is_syscall: + processed.append(val) + + if val.startswith("arch="): + archs = val.split("=") + if len(archs) == 2: + arch = val.split("=")[1] + if arch not in self.machines: + sys.stderr.write("ERROR: unexpected arch '{0}'\n".format(arch)) + continue + + if val == "-S": + is_syscall = True + continue + + if is_syscall: + is_syscall = False + scalls = self.process_syscalls(arch, val) + + if len(scalls) == 0: + processed = processed[:-1] + continue + processed.append(scalls) + + if "-S" not in processed: + # Removing rule altogether, as we have no valid syscalls remaining. + return None + return " ".join(processed) + + def process_rules(self, rules_file): + """Reads a file with audit rules and returns the rules after + validation of syscalls/architecture. Invalid syscalls will be removed + and, if there are no valid remaining syscalls, the rule itself is + removed.""" + + if not os.path.isfile(rules_file): + sys.stderr.write("ERROR: rules file '{0}' not found\n".format(rules_file)) + sys.exit(1) + + with open(rules_file) as rules: + content = rules.readlines() + + processed = [] + changed = False + for line in content: + validated = self.parse_line(line) + if validated is None: + changed = True + continue + + if validated.rstrip("\r\n") != line.rstrip("\r\n"): + changed = True + processed.append(validated.rstrip("\r\n")) + + invalid_syscalls = [] + for invalid in self.invalid_syscalls: + invalid_syscalls.append(invalid) + + return (processed, changed, invalid_syscalls) + + def update_rules(self, rules_file): + """Reads a file with audit rules and updates it after validation of + syscalls/architecture. Invalid syscalls will be removed and, if + there are no valid remaining syscalls, the rule itself is removed.""" + + new_rules, changed, invalid_syscalls = self.process_rules(rules_file) + if changed: + with open(rules_file, "w") as rules: + for line in new_rules: + rules.write("{0}\n".format(line)) + + return (new_rules, changed, invalid_syscalls) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="ausysrulevalidate") + parser.add_argument( + "-u", "--update", help="Update rules file if required", action="store_true" + ) + parser.add_argument( + "-v", "--verbose", help="Show the resulting rules file", action="store_true" + ) + required_named = parser.add_argument_group("required named arguments") + required_named.add_argument( + "-r", "--rules-file", help="Rules file name", required=True + ) + args = parser.parse_args() + + validator = AuSyscallRuleValidate() + + action = validator.process_rules + if args.update: + action = validator.update_rules + + data, changed, invalid = action(args.rules_file) + if changed: + verb = "require" + if args.update: + verb += "d" + sys.stderr.write("Rules in '{0}' {1} changes\n".format(args.rules_file, verb)) + if len(invalid) > 0: + sys.stderr.write("Invalid syscalls: {0}\n".format(", ".join(invalid))) + + if args.verbose: + print(*data, sep="\n") -- 2.31.1