Blob Blame History Raw
From 6243c5621a6b76a4652ea68437f37d3b15bc93c7 Mon Sep 17 00:00:00 2001
From: Matej Tyc <matyc@redhat.com>
Date: Fri, 14 Jan 2022 17:24:00 +0100
Subject: [PATCH] Add support for the firstboot remediation

OpenSCAP can support a new remediation type -
a classical scan + remediate that is executed during the first boot.
This PR adds support for this functionality,
and allows to control it via a remediate kickstart property.
---
 org_fedora_oscap/common.py   | 41 ++++++++++++++++++++++++++++++
 org_fedora_oscap/ks/oscap.py | 48 ++++++++++++++++++++++++++++--------
 2 files changed, 79 insertions(+), 10 deletions(-)

diff --git a/org_fedora_oscap/common.py b/org_fedora_oscap/common.py
index 05829ce..663c526 100644
--- a/org_fedora_oscap/common.py
+++ b/org_fedora_oscap/common.py
@@ -29,6 +29,7 @@
 import subprocess
 import zipfile
 import tarfile
+import textwrap
 
 import cpioarchive
 import re
@@ -309,6 +310,46 @@ def run_oscap_remediate(profile, fpath, ds_id="", xccdf_id="", tailoring="",
     return proc.stdout
 
 
+def _schedule_firstboot_remediation(
+        chroot, profile, ds_path, results_path, report_path, ds_id, xccdf_id, tailoring_path):
+    config = textwrap.dedent(f"""\
+    OSCAP_REMEDIATE_DS='{ds_path}'
+    OSCAP_REMEDIATE_PROFILE_ID='{profile}'
+    OSCAP_REMEDIATE_ARF_RESULT='{results_path}'
+    OSCAP_REMEDIATE_HTML_REPORT='{report_path}'
+    """)
+    if ds_id:
+        config += "OSCAP_REMEDIATE_DATASTREAM_ID='{ds_id}'\n"
+    if xccdf_id:
+        config += "OSCAP_REMEDIATE_XCCDF_ID='{xccdf_id}'\n"
+    if tailoring_path:
+        config += "OSCAP_REMEDIATE_TAILORING='{tailoring_path}'\n"
+
+    relative_filename = "var/tmp/oscap-remediate-offline.conf.sh"
+    local_config_filename = f"/{relative_filename}"
+    chroot_config_filename = os.path.join(chroot, relative_filename)
+    with open(chroot_config_filename, "w") as f:
+        f.write(config)
+    os.symlink(local_config_filename,
+               os.path.join(chroot, "system-update"))
+
+
+def schedule_firstboot_remediation(chroot, profile, fpath, ds_id="", xccdf_id="", tailoring=""):
+    if not profile:
+        return ""
+
+    # make sure the directory for the results exists
+    results_dir = os.path.dirname(RESULTS_PATH)
+    results_dir = os.path.normpath(chroot + "/" + results_dir)
+    utils.ensure_dir_exists(results_dir)
+
+    log.info("OSCAP addon: Scheduling firstboot remediation")
+    _schedule_firstboot_remediation(
+        chroot, profile, fpath, RESULTS_PATH, REPORT_PATH, ds_id, xccdf_id, tailoring)
+
+    return ""
+
+
 def extract_data(archive, out_dir, ensure_has_files=None):
     """
     Fuction that extracts the given archive to the given output directory. It
diff --git a/org_fedora_oscap/ks/oscap.py b/org_fedora_oscap/ks/oscap.py
index ef34448..8fef2aa 100644
--- a/org_fedora_oscap/ks/oscap.py
+++ b/org_fedora_oscap/ks/oscap.py
@@ -54,7 +54,7 @@
                           # LABEL:?, hdaX:?,
                           )
 
-REQUIRED_PACKAGES = ("openscap", "openscap-scanner", )
+REQUIRED_PACKAGES = ("openscap", "openscap-scanner", "openscap-utils",)
 
 FINGERPRINT_REGEX = re.compile(r'^[a-z0-9]+$')
 
@@ -100,6 +100,9 @@ def __init__(self, name, just_clear=False):
         # certificate to verify HTTPS connection or signed data
         self.certificates = ""
 
+        # What remediation(s) to execute
+        self.remediate = ""
+
         # internal values
         self.rule_data = rule_handling.RuleData()
         self.dry_run = False
@@ -145,6 +148,9 @@ def key_value_pair(key, value, indent=4):
         if self.certificates:
             ret += "\n%s" % key_value_pair("certificates", self.certificates)
 
+        if self.remediate:
+            ret += "\n%s" % key_value_pair("remediate", self.remediate)
+
         ret += "\n%end\n\n"
         return ret
 
@@ -203,6 +209,10 @@ def _parse_fingerprint(self, value):
     def _parse_certificates(self, value):
         self.certificates = value
 
+    def _parse_remediate(self, value):
+        assert value in ("none", "post", "firstboot", "both")
+        self.remediate = value
+
     def handle_line(self, line):
         """
         The handle_line method that is called with every line from this addon's
@@ -224,6 +234,7 @@ def handle_line(self, line):
                    "tailoring-path": self._parse_tailoring_path,
                    "fingerprint": self._parse_fingerprint,
                    "certificates": self._parse_certificates,
+                   "remediate": self._parse_remediate,
                    }
 
         line = line.strip()
@@ -527,15 +538,32 @@ def execute(self, storage, ksdata, users, payload):
         if os.path.exists(self.preinst_tailoring_path):
             shutil.copy2(self.preinst_tailoring_path, target_content_dir)
 
-        try:
-            common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
-                                       self.datastream_id, self.xccdf_id,
-                                       self.postinst_tailoring_path,
-                                       chroot=conf.target.system_root)
-        except Exception as exc:
-            msg = _(f"Something went wrong during the final hardening: {str(exc)}.")
-            self._terminate(msg)
-            return
+        if self.remediate in ("", "post", "both"):
+            try:
+                common.run_oscap_remediate(self.profile_id, self.postinst_content_path,
+                                           self.datastream_id, self.xccdf_id,
+                                           self.postinst_tailoring_path,
+                                           chroot=conf.target.system_root)
+            except Exception as exc:
+                msg = _(f"Something went wrong during the final hardening: {str(exc)}.")
+                self._terminate(msg)
+                return
+
+        if self.remediate in ("", "firstboot", "both"):
+            try:
+                common.schedule_firstboot_remediation(
+                    conf.target.system_root,
+                    self.profile_id,
+                    self.postinst_content_path,
+                    self.datastream_id,
+                    self.xccdf_id,
+                    self.postinst_tailoring_path,
+                )
+            except Exception as exc:
+                msg = _("Something went wrong when scheduling the first-boot remediation: {exc}."
+                        .format(exc=str(exc)))
+                terminate(msg)
+                return
 
     def clear_all(self):
         """Clear all the stored values."""