Blob Blame History Raw
From 57c67e2b359e9544ecd5a0ba264adf7aa7c67991 Mon Sep 17 00:00:00 2001
From: rpm-build <rpm-build>
Date: Mon, 14 Nov 2022 21:47:40 -0300
Subject: [PATCH 3/3] Backport upsteam PR#1156

From: https://github.com/keylime/keylime/pull/1156

We had partially addressed the issue we encountered when parsing some
certificates with python-cryptography by writing cert_utils module that
provided a single helper to parse the pubkey from a certificate.

However, we still were doing the EK validation in tpm_main using
python-cryptography, which means we would fall into the same parsing
problem again, since during the validation it would read all the certs
in the tpm cert store to check if it is signing the presented EK.

We address this issue by moving the EK cert verification to cert_utils:
we use the same approach as before, i.e. we parse the cert with pyasn1
when python-cryptography fails, and then use python-cryptography to do
the actual signature verification, as before.

By moving the method to cert_utils, it also becomes simpler to test it,
so in this commit we more tests to verify the methods work as expected.

Additionally, the updated EK verification is also capable of handling
ECDSA signatures
---
 keylime.conf                |   2 +
 keylime/cert_utils.py       | 144 ++++++++++++++++++++++++++++++++++--
 keylime/registrar_common.py |   7 +-
 keylime/tenant.py           |  21 ++----
 keylime/tpm/tpm_main.py     |  47 +-----------
 keylime/tpm_ek_ca.py        |  10 +--
 scripts/ek-openssl-verify   |  98 ++++++++++++++++++++++++
 test/run_tests.sh           |   8 +-
 test/test_cert_utils.py     | 142 ++++++++++++++++++++++++++++++-----
 9 files changed, 380 insertions(+), 99 deletions(-)
 create mode 100755 scripts/ek-openssl-verify

diff --git a/keylime.conf b/keylime.conf
index 331e57a..d896f9f 100644
--- a/keylime.conf
+++ b/keylime.conf
@@ -501,6 +501,8 @@ require_ek_cert = True
 # PROVKEYS - contains a json document containing EK, EKcert, and AIK from the
 # provider.  EK and AIK are in PEM format.  The EKcert is in base64 encoded
 # DER format.
+# TPM_CERT_STORE - contains the path to the TPM certificates store, e.g.:
+# "/var/lib/keylime/tpm_cert_store".
 #
 # Set to blank to disable this check.  See warning above if require_ek_cert
 # is "False".
diff --git a/keylime/cert_utils.py b/keylime/cert_utils.py
index d014aed..d2fc54d 100644
--- a/keylime/cert_utils.py
+++ b/keylime/cert_utils.py
@@ -1,13 +1,143 @@
-from cryptography.hazmat.primitives.serialization import load_der_public_key
+import io
+import os.path
+import subprocess
+import sys
+
+from cryptography import exceptions as crypto_exceptions
+from cryptography import x509
+from cryptography.hazmat.backends import default_backend
+from cryptography.hazmat.primitives.asymmetric import ec, padding
+from cryptography.hazmat.primitives.asymmetric.ec import EllipticCurvePublicKey
+from cryptography.hazmat.primitives.asymmetric.rsa import RSAPublicKey
 from pyasn1.codec.der import decoder, encoder
-from pyasn1_modules import rfc2459
+from pyasn1_modules import pem, rfc2459
 
+from keylime import config, keylime_logging, tpm_ek_ca
 
 # Issue #944 -- python-cryptography won't parse malformed certs,
 # such as some Nuvoton ones we have encountered in the field.
 # Unfortunately, we still have to deal with such certs anyway.
-# Let's read the EK cert with pyasn1 instead of python-cryptography.
-def read_x509_der_cert_pubkey(der_cert_data):
-    """Returns the public key of a DER-encoded X.509 certificate"""
-    der509 = decoder.decode(der_cert_data, asn1Spec=rfc2459.Certificate())[0]
-    return load_der_public_key(encoder.encode(der509["tbsCertificate"]["subjectPublicKeyInfo"]))
+
+# Here we provide some helpers that use pyasn1 to parse the certificates
+# when parsing them with python-cryptography fails, and in this case, we
+# try to read the parsed certificate again into python-cryptograhy.
+
+logger = keylime_logging.init_logging("cert_utils")
+
+
+def x509_der_cert(der_cert_data: bytes):
+    """Load an x509 certificate provided in DER format
+    :param der_cert_data: the DER bytes of the certificate
+    :type der_cert_data: bytes
+    :returns: cryptography.x509.Certificate
+    """
+    try:
+        return x509.load_der_x509_certificate(data=der_cert_data, backend=default_backend())
+    except Exception as e:
+        logger.warning("Failed to parse DER data with python-cryptography: %s", e)
+        pyasn1_cert = decoder.decode(der_cert_data, asn1Spec=rfc2459.Certificate())[0]
+        return x509.load_der_x509_certificate(data=encoder.encode(pyasn1_cert), backend=default_backend())
+
+
+def x509_pem_cert(pem_cert_data: str):
+    """Load an x509 certificate provided in PEM format
+    :param pem_cert_data: the base-64 encoded PEM certificate
+    :type pem_cert_data: str
+    :returns: cryptography.x509.Certificate
+    """
+    try:
+        return x509.load_pem_x509_certificate(data=pem_cert_data.encode("utf-8"), backend=default_backend())
+    except Exception as e:
+        logger.warning("Failed to parse PEM data with python-cryptography: %s", e)
+        # Let's read the DER bytes from the base-64 PEM.
+        der_data = pem.readPemFromFile(io.StringIO(pem_cert_data))
+        # Now we can load it as we do in x509_der_cert().
+        pyasn1_cert = decoder.decode(der_data, asn1Spec=rfc2459.Certificate())[0]
+        return x509.load_der_x509_certificate(data=encoder.encode(pyasn1_cert), backend=default_backend())
+
+
+def verify_ek(ekcert, tpm_cert_store=config.get("tenant", "tpm_cert_store")):
+    """Verify that the provided EK certificate is signed by a trusted root
+    :param ekcert: The Endorsement Key certificate in DER format
+    :returns: True if the certificate can be verified, False otherwise
+    """
+    try:
+        trusted_certs = tpm_ek_ca.cert_loader(tpm_cert_store)
+    except Exception as e:
+        logger.warning("Error loading trusted certificates from the TPM cert store: %s", e)
+        return False
+
+    try:
+        ek509 = x509_der_cert(ekcert)
+        for cert_file, pem_cert in trusted_certs.items():
+            signcert = x509_pem_cert(pem_cert)
+            if ek509.issuer != signcert.subject:
+                continue
+
+            signcert_pubkey = signcert.public_key()
+            try:
+                if isinstance(signcert_pubkey, RSAPublicKey):
+                    signcert_pubkey.verify(
+                        ek509.signature,
+                        ek509.tbs_certificate_bytes,
+                        padding.PKCS1v15(),
+                        ek509.signature_hash_algorithm,
+                    )
+                elif isinstance(signcert_pubkey, EllipticCurvePublicKey):
+                    signcert_pubkey.verify(
+                        ek509.signature,
+                        ek509.tbs_certificate_bytes,
+                        ec.ECDSA(ek509.signature_hash_algorithm),
+                    )
+                else:
+                    logger.warning("Unsupported public key type: %s", type(signcert_pubkey))
+                    continue
+            except crypto_exceptions.InvalidSignature:
+                continue
+
+            logger.debug("EK cert matched cert: %s", cert_file)
+            return True
+    except Exception as e:
+        # Log the exception so we don't lose the raw message
+        logger.exception(e)
+        raise Exception("Error processing ek/ekcert. Does this TPM have a valid EK?").with_traceback(sys.exc_info()[2])
+
+    logger.error("No Root CA matched EK Certificate")
+    return False
+
+
+def verify_ek_script(script, env, cwd):
+    if script is None:
+        logger.warning("External check script (%s) not specified", script)
+        return False
+
+    script_path = os.path.abspath(script)
+    if not os.path.isfile(script_path):
+        if cwd is None or not os.path.isfile(os.path.abspath(os.path.join(cwd, script))):
+            logger.warning("External check script (%s) not found; please make sure its path is correct", script)
+            return False
+        script_path = os.path.abspath(os.path.join(cwd, script))
+
+    try:
+        proc = subprocess.run(
+            [script_path],
+            env=env,
+            shell=False,
+            cwd=cwd,
+            stderr=subprocess.STDOUT,
+            stdout=subprocess.PIPE,
+            check=False,
+        )
+        if proc.returncode != 0:
+            errmsg = ""
+            if proc.stdout is not None:
+                errmsg = proc.stdout.decode("utf-8")
+            logger.error("External check script failed to validate EK: %s", errmsg)
+            return False
+        logger.debug("External check script successfully to validated EK")
+        if proc.stdout is not None:
+            logger.info("ek_check output: %s", proc.stdout.decode("utf-8"))
+    except subprocess.CalledProcessError as e:
+        logger.error("Error while trying to run external check script to validate EK: %s", e)
+        return False
+    return True
diff --git a/keylime/registrar_common.py b/keylime/registrar_common.py
index 2c32d19..fb37e5b 100644
--- a/keylime/registrar_common.py
+++ b/keylime/registrar_common.py
@@ -261,11 +261,8 @@ class UnprotectedHandler(BaseHTTPRequestHandler, SessionManager):
                 # Note, we don't validate the EKCert here, other than the implicit
                 #  "is it a valid x509 cert" check. So it's still untrusted.
                 # This will be validated by the tenant.
-                ek_tpm = base64.b64encode(
-                    tpm2_objects.ek_low_tpm2b_public_from_pubkey(
-                        cert_utils.read_x509_der_cert_pubkey(base64.b64decode(ekcert))
-                    )
-                ).decode()
+                cert = cert_utils.x509_der_cert(base64.b64decode(ekcert))
+                ek_tpm = base64.b64encode(tpm2_objects.ek_low_tpm2b_public_from_pubkey(cert.public_key())).decode()
 
             aik_attrs = tpm2_objects.get_tpm2b_public_object_attributes(
                 base64.b64decode(aik_tpm),
diff --git a/keylime/tenant.py b/keylime/tenant.py
index cc53623..dd9c09c 100644
--- a/keylime/tenant.py
+++ b/keylime/tenant.py
@@ -5,7 +5,6 @@ import io
 import json
 import logging
 import os
-import subprocess
 import sys
 import tempfile
 import time
@@ -15,7 +14,7 @@ import requests
 from cryptography.hazmat.primitives import serialization as crypto_serialization
 
 from keylime import api_version as keylime_api_version
-from keylime import ca_util, config, crypto, keylime_logging, registrar_client, signing, web_util
+from keylime import ca_util, cert_utils, config, crypto, keylime_logging, registrar_client, signing, web_util
 from keylime.agentstates import AgentAttestState
 from keylime.cli import options, policies
 from keylime.cmd import user_data_encrypt
@@ -516,19 +515,11 @@ class Tenant:
             env["EK_CERT"] = ""
 
         env["PROVKEYS"] = json.dumps(self.registrar_data.get("provider_keys", {}))
-        with subprocess.Popen(
-            script, env=env, shell=True, cwd=config.WORK_DIR, stdout=subprocess.PIPE, stderr=subprocess.STDOUT
-        ) as proc:
-            retval = proc.wait()
-            if retval != 0:
-                raise UserError("External check script failed to validate EK")
-            logger.debug("External check script successfully to validated EK")
-            while True:
-                line = proc.stdout.readline().decode()
-                if line == "":
-                    break
-                logger.debug("ek_check output: %s", line.strip())
-        return True
+
+        # Define the TPM cert store for the external script.
+        env["TPM_CERT_STORE"] = config.get("tenant", "tpm_cert_store")
+
+        return cert_utils.verify_ek_script(script, env, config.WORK_DIR)
 
     def do_cv(self):
         """Initiate v, agent_id and ip and initiate the cloudinit sequence"""
diff --git a/keylime/tpm/tpm_main.py b/keylime/tpm/tpm_main.py
index 7bab4fd..35f0a2f 100644
--- a/keylime/tpm/tpm_main.py
+++ b/keylime/tpm/tpm_main.py
@@ -12,14 +12,10 @@ import time
 import typing
 import zlib
 
-from cryptography import exceptions as crypto_exceptions
-from cryptography import x509
-from cryptography.hazmat.backends import default_backend
 from cryptography.hazmat.primitives import serialization as crypto_serialization
-from cryptography.hazmat.primitives.asymmetric import padding
 from packaging.version import Version
 
-from keylime import cmd_exec, config, keylime_logging, secure_mount, tpm_ek_ca
+from keylime import cert_utils, cmd_exec, config, keylime_logging, secure_mount
 from keylime.agentstates import TPMClockInfo
 from keylime.common import algorithms, retry
 from keylime.failure import Component, Failure
@@ -785,46 +781,7 @@ class tpm(tpm_abstract.AbstractTPM):
         :param ekcert: The Endorsement Key certificate in DER format
         :returns: True if the certificate can be verified, false otherwise
         """
-        # openssl x509 -inform der -in certificate.cer -out certificate.pem
-        try:
-            tpm_ek_ca.check_tpm_cert_store()
-
-            ek509 = x509.load_der_x509_certificate(
-                data=ekcert,
-                backend=default_backend(),
-            )
-
-            trusted_certs = tpm_ek_ca.cert_loader()
-            for cert in trusted_certs:
-                signcert = x509.load_pem_x509_certificate(
-                    data=cert.encode(),
-                    backend=default_backend(),
-                )
-
-                if ek509.issuer.rfc4514_string() != signcert.subject.rfc4514_string():
-                    continue
-
-                try:
-                    signcert.public_key().verify(
-                        ek509.signature,
-                        ek509.tbs_certificate_bytes,
-                        padding.PKCS1v15(),
-                        ek509.signature_hash_algorithm,
-                    )
-                except crypto_exceptions.InvalidSignature:
-                    continue
-
-                logger.debug("EK cert matched cert: %s", cert)
-                return True
-        except Exception as e:
-            # Log the exception so we don't lose the raw message
-            logger.exception(e)
-            raise Exception("Error processing ek/ekcert. Does this TPM have a valid EK?").with_traceback(
-                sys.exc_info()[2]
-            )
-
-        logger.error("No Root CA matched EK Certificate")
-        return False
+        return cert_utils.verify_ek(ekcert)
 
     def get_tpm_manufacturer(self, output=None):
         vendorStr = None
diff --git a/keylime/tpm_ek_ca.py b/keylime/tpm_ek_ca.py
index 3695f0b..fb66c07 100644
--- a/keylime/tpm_ek_ca.py
+++ b/keylime/tpm_ek_ca.py
@@ -7,8 +7,7 @@ logger = keylime_logging.init_logging("tpm_ek_ca")
 trusted_certs = {}
 
 
-def check_tpm_cert_store():
-    tpm_cert_store = config.get("tenant", "tpm_cert_store")
+def check_tpm_cert_store(tpm_cert_store=config.get("tenant", "tpm_cert_store")):
     if not os.path.isdir(tpm_cert_store):
         logger.error("The directory %s does not exist.", tpm_cert_store)
         raise Exception(f"The directory {tpm_cert_store} does not exist.")
@@ -21,11 +20,10 @@ def check_tpm_cert_store():
         raise Exception(f"The directory {tpm_cert_store} does not contain " f"any .pem files")
 
 
-def cert_loader():
-    tpm_cert_store = config.get("tenant", "tpm_cert_store")
+def cert_loader(tpm_cert_store=config.get("tenant", "tpm_cert_store")):
     file_list = glob.glob(os.path.join(tpm_cert_store, "*.pem"))
-    my_trusted_certs = []
+    my_trusted_certs = {}
     for file_path in file_list:
         with open(file_path, encoding="utf-8") as f_input:
-            my_trusted_certs.append(f_input.read())
+            my_trusted_certs[file_path] = f_input.read()
     return my_trusted_certs
diff --git a/scripts/ek-openssl-verify b/scripts/ek-openssl-verify
new file mode 100755
index 0000000..f91e9b5
--- /dev/null
+++ b/scripts/ek-openssl-verify
@@ -0,0 +1,98 @@
+#!/bin/sh
+
+# This script can be used as the `ek_check_script' (tenant configuration),
+# to attempt to verify a provided EK_CERT via env var using openssl.
+
+# EK             - contains a PEM-encoded version of the public EK
+# EK_CERT        - contains a base64 DER-encoded EK certificate if one is
+#                  available.
+# PROVKEYS       - contains a json document containing EK, EKcert, and AIK
+#                  from the provider. EK and AIK are in PEM format. The
+#                  EKcert is in base64-encoded DER format
+# TPM_CERT_STORE - contains the path of the TPM certificate store.
+EK=${EK:-}
+EK_CERT=${EK_CERT:-}
+PROVKEYS=${PROVKEYS:-}
+
+EK_VERIFICATION_LOG=${EK_VERIFICATION_LOG:-/var/log/keylime/ek-verification.log}
+LOG="${EK_VERIFICATION_LOG}"
+
+# Setting log fallback in case we cannot write to the specified file.
+touch "${LOG}" 2>/dev/null || LOG=/dev/stderr
+
+log() {
+  _stderr=${2:-}
+  echo "[$(date)] ${1}" >&2 >> "${LOG}"
+  [ -n "${_stderr}" ] && [ "${LOG}" != '/dev/stderr' ] && echo "${1}" >&2
+}
+
+die() {
+  log "ERROR: ${1}" _
+  exit 1
+}
+
+command -v openssl >/dev/null \
+  || die "openssl CLI was not found in the PATH; please make sure it is installed"
+
+[ -n "${EK_CERT}" ] || die "EK_CERT was not provided as an env var"
+
+# Cert store directory. If one is not provided via TPM_CERT_STORE env var,
+# we start by attempting to read tenant.conf.
+CERT_STORE=${TPM_CERT_STORE:-}
+
+if [ -z "${CERT_STORE}" ]; then
+  KEYLIME_CONFIG_DIR=${KEYLIME_CONFIG_DIR:-/etc/keylime}
+  [ -d "${KEYLIME_CONFIG_DIR}" ] \
+    || die "KEYLIME_CONFIG_DIR (${KEYLIME_CONFIG_DIR}) does not seem to exist"
+
+  if [ -r "${KEYLIME_CONFIG_DIR}"/tenant.conf ]; then
+    CERT_STORE="$(grep -w ^tpm_cert_store "${KEYLIME_CONFIG_DIR}"/tenant.conf \
+                  | tail -1 | cut -d'=' -f2 | tr -d "[:blank:]")"
+  fi
+
+  # Next we try to read any snippets in tenant.conf.d/
+  if [ -d "${KEYLIME_CONFIG_DIR}"/tenant.conf.d ]; then
+    for _s in "${KEYLIME_CONFIG_DIR}"/tenant.conf.d/*.conf; do
+      [ -e "${_s}" ] || continue
+      _store="$(grep -w ^tpm_cert_store "${_s}" \
+                | tail -1 | cut -d'=' -f2 | tr -d "[:blank:]")"
+      [ -n "${_store}" ] && CERT_STORE="${_store}"
+    done
+  fi
+fi
+
+[ -n "${CERT_STORE}" ] \
+  || die "It was not possible to determine the TPM cert store dir from tenant.conf or tenant.conf.d/ snippets"
+[ -d "${CERT_STORE}" ] \
+  || die "TPM cert store is not a valid directory (${CERT_STORE})"
+
+EK_VERIFICATION_TMPDIR=
+ek_verification_cleanup() {
+  [ -d "${EK_VERIFICATION_TMPDIR}" ] || return 0
+  rm -rf "${EK_VERIFICATION_TMPDIR}"
+}
+trap ek_verification_cleanup EXIT
+
+mkdir -p "${TMPDIR:-/tmp}"
+EK_VERIFICATION_TMPDIR="$(mktemp -d)" || \
+  die "Creating a temp dir for EK verification failed"
+
+EK_CERT_FILE_DER="${EK_VERIFICATION_TMPDIR}"/ek.der
+EK_CERT_FILE_PEM="${EK_VERIFICATION_TMPDIR}"/ek.pem
+
+printf '%s' "${EK_CERT}" > "${EK_CERT_FILE_DER}".b64
+base64 -d "${EK_CERT_FILE_DER}".b64 > "${EK_CERT_FILE_DER}"
+openssl x509 -inform der -in "${EK_CERT_FILE_DER}" > "${EK_CERT_FILE_PEM}"
+
+for c in "${CERT_STORE}"/*.pem; do
+  [ -e "${c}" ] || continue
+  log "Checking if ${c} is the issuer of EK cert..."
+  if openssl verify -partial_chain -CAfile "${c}" "${EK_CERT_FILE_PEM}" \
+                                           >>"${LOG}" 2>>"${LOG}"; then
+    log "${EK_CERT} successfully verified by $(basename "${c}")" _
+    exit 0
+  fi
+done
+
+die "EK signature did not match certificates from TPM cert store"
+# vim:set ts=2 sw=2 et:
diff --git a/test/run_tests.sh b/test/run_tests.sh
index fc43113..81a6dff 100755
--- a/test/run_tests.sh
+++ b/test/run_tests.sh
@@ -107,19 +107,19 @@ fi
 # Set correct dependencies
 # Fedora
 if [ $PACKAGE_MGR = "dnf" ]; then
-    PYTHON_PREIN="python3"
+    PYTHON_PREIN="python3 openssl"
     PYTHON_DEPS="python3-pip python3-dbus"
 # RHEL / CentOS etc
 elif [ $PACKAGE_MGR = "yum" ]; then
-    PYTHON_PREIN="epel-release python36"
+    PYTHON_PREIN="epel-release python36 openssl"
     PYTHON_DEPS="python36-pip python36-dbus"
 # Ubuntu / Debian
 elif [ $PACKAGE_MGR = "apt-get" ]; then
-    PYTHON_PREIN="python3"
+    PYTHON_PREIN="python3 openssl"
     PYTHON_DEPS="python3-pip python3-dbus"
 # SUSE
 elif [ $PACKAGE_MGR = "zypper" ]; then
-    PYTHON_PREIN="python3"
+    PYTHON_PREIN="python3 openssl"
     PYTHON_DEPS="python3-pip python3-dbus"
 else
     echo "No recognized package manager found on this system!" 1>&2
diff --git a/test/test_cert_utils.py b/test/test_cert_utils.py
index 4666c0f..bdf6090 100644
--- a/test/test_cert_utils.py
+++ b/test/test_cert_utils.py
@@ -4,17 +4,19 @@ Copyright 2022 Red Hat, Inc.
 """
 
 import base64
+import os
 import unittest
 
-from keylime import cert_utils
+import cryptography
 
+from keylime import cert_utils, tpm_ek_ca
 
-class Cert_Utils_Test(unittest.TestCase):
-    def test_read_x509_der_cert_pubkey(self):
-        # The certificate listed in issue #944, from Nuvoton. It fails to
-        # be parsed by python-cryptography with the following error:
-        # ValueError: error parsing asn1 value: ParseError { kind: InvalidSetOrdering, location: ["RawCertificate::tbs_cert", "TbsCertificate::issuer", "0", "2"] }
-        nuvoton_ecdsa_sha256_der = """\
+CERT_STORE_DIR = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "tpm_cert_store"))
+
+# The certificate listed in issue #944, from Nuvoton. It fails to
+# be parsed by python-cryptography with the following error:
+# ValueError: error parsing asn1 value: ParseError { kind: InvalidSetOrdering, location: ["RawCertificate::tbs_cert", "TbsCertificate::issuer", "0", "2"] }
+nuvoton_ecdsa_sha256_der = """\
 MIICBjCCAaygAwIBAgIIP5MvnZk8FrswCgYIKoZIzj0EAwIwVTFTMB8GA1UEAxMYTnV2b3RvbiBU
 UE0gUm9vdCBDQSAyMTEwMCUGA1UEChMeTnV2b3RvbiBUZWNobm9sb2d5IENvcnBvcmF0aW9uMAkG
 A1UEBhMCVFcwHhcNMTUxMDE5MDQzMjAwWhcNMzUxMDE1MDQzMjAwWjBVMVMwHwYDVQQDExhOdXZv
@@ -26,10 +28,10 @@ ajW+9zAfBgNVHSMEGDAWgBSfu3mqD1JieL7RUJKacXHpajW+9zAKBggqhkjOPQQDAgNIADBFAiEA
 /jiywhOKpiMOUnTfDmXsXfDFokhKVNTXB6Xtqm7J8L4CICjT3/Y+rrSnf8zrBXqWeHDh8Wi41+w2
 ppq6Ev9orZFI
 """
-        # This cert from STMicroelectronics presents a different issue when
-        # parsed by python-cryptography:
-        # ValueError: error parsing asn1 value: ParseError { kind: ExtraData }
-        st_sha256_with_rsa_der = """\
+# This cert from STMicroelectronics presents a different issue when
+# parsed by python-cryptography:
+# ValueError: error parsing asn1 value: ParseError { kind: ExtraData }
+st_sha256_with_rsa_der = """\
 MIIEjTCCA3WgAwIBAgIUTL0P5h7nYu2yjVCyaPw1hv89XoIwDQYJKoZIhvcNAQELBQAwVTELMAkG
 A1UEBhMCQ0gxHjAcBgNVBAoTFVNUTWljcm9lbGVjdHJvbmljcyBOVjEmMCQGA1UEAxMdU1RNIFRQ
 TSBFSyBJbnRlcm1lZGlhdGUgQ0EgMDUwHhcNMTgwNzExMDAwMDAwWhcNMjgwNzExMDAwMDAwWjAA
@@ -60,10 +62,116 @@ rTJ1x4NA2ZtQMYyT29Yy1UlkjocAaXL5u0m3Hvz/////////////////////////////////////
 ////////////////////////////////////////////////////////////////////////////
 /////w==
 """
-        certs = [nuvoton_ecdsa_sha256_der, st_sha256_with_rsa_der]
-        for c in certs:
+
+st_ecdsa_sha256_der = """\
+MIIDAzCCAqmgAwIBAgIUIymn2ai+UaVx1bM26/wU7I+sJd8wCgYIKoZIzj0EAwIwVjELMAkGA1UE
+BhMCQ0gxHjAcBgNVBAoTFVNUTWljcm9lbGVjdHJvbmljcyBOVjEnMCUGA1UEAxMeU1RNIFRQTSBF
+Q0MgSW50ZXJtZWRpYXRlIENBIDAxMB4XDTE4MDcyNjAwMDAwMFoXDTI4MDcyNjAwMDAwMFowADBZ
+MBMGByqGSM49AgEGCCqGSM49AwEHA0IABBsTz5y2cedVZxG/GsbXQ9bL6EQylWNjx1b/SSp2EHlN
+aJjtn43iz2zb+qot2UOhQIwPxS5hMCXhasw4XsFXgnijggGpMIIBpTAfBgNVHSMEGDAWgBR+uDbO
++9+KY3H/czP5utcUYWyWyzBCBgNVHSAEOzA5MDcGBFUdIAAwLzAtBggrBgEFBQcCARYhaHR0cDov
+L3d3dy5zdC5jb20vVFBNL3JlcG9zaXRvcnkvMFkGA1UdEQEB/wRPME2kSzBJMRYwFAYFZ4EFAgEM
+C2lkOjUzNTQ0RDIwMRcwFQYFZ4EFAgIMDFNUMzNIVFBIQUhCNDEWMBQGBWeBBQIDDAtpZDowMDQ5
+MDAwNDBmBgNVHQkEXzBdMBYGBWeBBQIQMQ0wCwwDMi4wAgEAAgF0MEMGBWeBBQISMTowOAIBAAEB
+/6ADCgEBoQMKAQCiAwoBAKMQMA4WAzMuMQoBBAoBAgEB/6QPMA0WBTE0MC0yCgECAQEAMAwGA1Ud
+EwEB/wQCMAAwEAYDVR0lBAkwBwYFZ4EFCAEwDgYDVR0PAQH/BAQDAgMIMEsGCCsGAQUFBwEBBD8w
+PTA7BggrBgEFBQcwAoYvaHR0cDovL3NlY3VyZS5nbG9iYWxzaWduLmNvbS9zdG10cG1lY2NpbnQw
+MS5jcnQwCgYIKoZIzj0EAwIDSAAwRQIgcNiZkn7poyk6J8Y1Cnwz4nV7YGPb5pBesBg6bk9n6KIC
+IQCE/jkHb/aPP/T3GtfLNHAdHL4JnofAbsDEuLQxAseeZA==
+"""
+
+
+def has_strict_x509_parsing():
+    """Indicates whether python-cryptography has strict x509 parsing."""
+
+    # Major release where python-cryptography started being strict
+    # when parsing x509 certificates.
+    PYCRYPTO_STRICT_X509_MAJOR = 35
+    return int(cryptography.__version__.split(".", maxsplit=1)[0]) >= PYCRYPTO_STRICT_X509_MAJOR
+
+
+def expectedFailureIf(condition):
+    """The test is marked as an expectedFailure if the condition is satisfied."""
+
+    def wrapper(func):
+        if condition:
+            return unittest.expectedFailure(func)
+        return func
+
+    return wrapper
+
+
+class Cert_Utils_Test(unittest.TestCase):
+    def test_tpm_cert_store(self):
+        tpm_ek_ca.check_tpm_cert_store(CERT_STORE_DIR)
+        my_trusted_certs = tpm_ek_ca.cert_loader(CERT_STORE_DIR)
+
+        self.assertNotEqual(len(my_trusted_certs), 0)
+
+    def test_cert_store_certs(self):
+        my_trusted_certs = tpm_ek_ca.cert_loader(CERT_STORE_DIR)
+        for fname, pem_cert in my_trusted_certs.items():
             try:
-                pubkey = cert_utils.read_x509_der_cert_pubkey(base64.b64decode(c))
-            except Exception:
-                self.fail("read_x509_der_cert_pubkey() is not expected to raise an exception here")
-            self.assertIsNotNone(pubkey)
+                cert = cert_utils.x509_pem_cert(pem_cert)
+            except Exception as e:
+                self.fail(f"Failed to load certificate {fname}: {e}")
+            self.assertIsNotNone(cert)
+
+    def test_verify_ek(self):
+        tests = [
+            {"cert": st_sha256_with_rsa_der, "expected": True},  # RSA, signed by STM_RSA_05I.pem.
+            {"cert": st_ecdsa_sha256_der, "expected": True},  # ECC, signed by STM_ECC_01I.pem.
+        ]
+        for t in tests:
+            self.assertEqual(
+                cert_utils.verify_ek(base64.b64decode(t["cert"]), CERT_STORE_DIR),
+                t["expected"],
+                msg=f"Test failed for cert {t['cert']}; expected: {t['expected']}",
+            )
+
+    @expectedFailureIf(has_strict_x509_parsing())
+    def test_verify_ek_expected_failures(self):
+        # The following certificates are not compliant, and will fail the
+        # signature verification with python-cryptography, even though they
+        # should validate. Marking as expected failure for now.
+        tests = [
+            {"cert": nuvoton_ecdsa_sha256_der, "expected": True},  # ECC, signed by NUVO_2110.pem.
+        ]
+        for t in tests:
+            self.assertEqual(
+                cert_utils.verify_ek(base64.b64decode(t["cert"]), CERT_STORE_DIR),
+                t["expected"],
+                msg=f"Test failed for cert {t['cert']}; expected: {t['expected']}",
+            )
+
+    def test_verify_ek_script(self):
+        # We will be using `nuvoton_ecdsa_sha256_der', which is signed by
+        # NUVO_2110.pem but fails verification when using python-cryptography
+        # as it is a malformed cert -- it is the same one we use in
+        # test_verify_ek_expected_failures().
+        # With an external script `ek_script_check' that uses openssl, the
+        # validation works.
+        cert = nuvoton_ecdsa_sha256_der.replace("\n", "")
+
+        self.assertFalse(cert_utils.verify_ek_script(None, None, None))
+        self.assertFalse(cert_utils.verify_ek_script("/foo/bar", None, None))
+
+        script = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "scripts", "ek-openssl-verify"))
+        # Testing ek-openssl-verify script, but without specifying the
+        # EK_CERT env var.
+        self.assertFalse(cert_utils.verify_ek_script(script, None, None))
+
+        # Now let's specify the EK_CERT.
+        env = os.environ.copy()
+        env["EK_CERT"] = cert
+        env["TPM_CERT_STORE"] = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "tpm_cert_store"))
+        self.assertTrue(cert_utils.verify_ek_script(script, env, None))
+
+        # Now, let us specify the ek_check_script with a relative path.
+        script = "ek-openssl-verify"
+        cwd = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "scripts"))
+        self.assertTrue(cert_utils.verify_ek_script(script, env, cwd))
+
+        # And now we try a bad TPM cert store.
+        env["TPM_CERT_STORE"] = "/some/bad/directory"
+        self.assertFalse(cert_utils.verify_ek_script(script, env, cwd))
-- 
2.38.1