Blob Blame History Raw
From f762f8f5e2f9b6d66d786b426d4d2fe40c994192 Mon Sep 17 00:00:00 2001
From: Antonio Torres <antorres@redhat.com>
Date: Fri, 23 Apr 2021 17:42:21 +0200
Subject: [PATCH] Add checks to detect mismatch of certificates

Add checks to detect mismatch of certificates between LDAP
and NSS databases. Check for existance of entries as well as
ensure the certificates match between the different databases.

Related: https://bugzilla.redhat.com/show_bug.cgi?id=1886770
Signed-off-by: Antonio Torres <antorres@redhat.com>
---
 README.md                       |  24 ++++
 src/ipahealthcheck/ipa/certs.py | 219 ++++++++++++++++++++++++++++++++
 2 files changed, 243 insertions(+)

diff --git a/README.md b/README.md
index 0f3ed6a..11e0e88 100644
--- a/README.md
+++ b/README.md
@@ -507,6 +507,30 @@ The trust for certificates stored in NSS databases is compared against a known g
       }
     }
 
+### IPACertMatchCheck
+Ensure CA certificate entries in LDAP and NSS databases match.
+
+    {
+      "source": "ipahealthcheck.ipa.certs",
+      "check": "IPACertMatchCheck",
+      "result": "ERROR",
+      "kw": {
+        "msg": "CA Certificate from /etc/ipa/nssdb does not match /etc/ipa/ca.crt"
+      }
+    }
+
+### IPADogtagCertsMatchCheck
+Check if Dogtag certificates present in both NSS DB and LDAP match.
+
+    {
+      "source": "ipahealthcheck.ipa.certs",
+      "check": "IPADogtagCertsMatchCheck",
+      "result": "ERROR",
+      "kw": {
+        "msg": "'subsystemCert cert-pki-ca' certificate in NSS DB does not match entry in LDAP"
+      }
+    }
+
 ### IPANSSChainValidation
 Validate the certificate chain of the NSS certificates. This executes: certutil -V -u V -e -d [dbdir] -n [nickname].
 
diff --git a/src/ipahealthcheck/ipa/certs.py b/src/ipahealthcheck/ipa/certs.py
index c668093..82435f3 100644
--- a/src/ipahealthcheck/ipa/certs.py
+++ b/src/ipahealthcheck/ipa/certs.py
@@ -29,6 +29,7 @@ from ipaserver.plugins import ldap2
 from ipapython import certdb
 from ipapython import ipautil
 from ipapython.dn import DN
+from ipapython.ipaldap import realm_to_serverid
 
 logger = logging.getLogger()
 DAY = 60 * 60 * 24
@@ -587,6 +588,224 @@ class IPACertNSSTrust(IPAPlugin):
                     'verifying trust')
 
 
+@registry
+class IPACertMatchCheck(IPAPlugin):
+    """
+    Ensure certificates match between LDAP and NSS databases
+    """
+
+    requires = ('dirsrv',)
+
+    def get_cert_list_from_db(self, nssdb, nickname):
+        """
+        Retrieve all certificates from an NSS database for nickname.
+        """
+        try:
+            args = ["-L", "-n", nickname, "-a"]
+            result = nssdb.run_certutil(args, capture_output=True)
+            return x509.load_certificate_list(result.raw_output)
+        except ipautil.CalledProcessError:
+            return []
+
+    @duration
+    def check(self):
+        if not self.ca.is_configured():
+            logger.debug("No CA configured, skipping certificate match check")
+            return
+
+        # Ensure /etc/ipa/ca.crt matches the NSS DB CA certificates
+        def match_cacert_and_db(plugin, cacerts, dbpath):
+            db = certs.CertDB(api.env.realm, dbpath)
+            nickname = '%s IPA CA' % api.env.realm
+            try:
+                dbcacerts = self.get_cert_list_from_db(db, nickname)
+            except Exception as e:
+                yield Result(plugin, constants.ERROR,
+                             error=str(e),
+                             msg='Unable to load CA cert: {error}')
+                return False
+
+            ok = True
+            for cert in dbcacerts:
+                if cert not in cacerts:
+                    ok = False
+                    yield Result(plugin, constants.ERROR,
+                                 nickname=nickname,
+                                 serial_number=cert.serial_number,
+                                 dbdir=dbpath,
+                                 certdir=paths.IPA_CA_CRT,
+                                 msg=('CA Certificate nickname {nickname} '
+                                      'with serial number {serial} '
+                                      'is in {dbdir} but is not in'
+                                      '%s' % paths.IPA_CA_CRT))
+            return ok
+
+        try:
+            cacerts = x509.load_certificate_list_from_file(paths.IPA_CA_CRT)
+        except Exception:
+            yield Result(self, constants.ERROR,
+                         path=paths.IPA_CA_CRT,
+                         msg='Unable to load CA cert file {path}: {error}')
+            return
+
+        # Ensure CA cert entry from LDAP matches /etc/ipa/ca.crt
+        dn = DN('cn=%s IPA CA' % api.env.realm,
+                'cn=certificates,cn=ipa,cn=etc',
+                api.env.basedn)
+        try:
+            entry = self.conn.get_entry(dn)
+        except errors.NotFound:
+            yield Result(self, constants.ERROR,
+                         dn=str(dn),
+                         msg='CA Certificate entry \'{dn}\' '
+                             'not found in LDAP')
+            return
+
+        cacerts_ok = True
+        # Are all the certs in LDAP for the IPA CA in /etc/ipa/ca.crt
+        for cert in entry['CACertificate']:
+            if cert not in cacerts:
+                cacerts_ok = False
+                yield Result(self, constants.ERROR,
+                             dn=str(dn),
+                             serial_number=cert.serial_number,
+                             msg=('CA Certificate serial number {serial} is '
+                                  'in LDAP \'{dn}\' but is not in '
+                                  '%s' % paths.IPA_CA_CRT))
+
+        # Ensure NSS DBs have matching CA certs for /etc/ipa/ca.crt
+        serverid = realm_to_serverid(api.env.realm)
+        dspath = paths.ETC_DIRSRV_SLAPD_INSTANCE_TEMPLATE % serverid
+
+        cacertds_ok = yield from match_cacert_and_db(self, cacerts, dspath)
+        cacertnss_ok = yield from match_cacert_and_db(self, cacerts,
+                                                      paths.IPA_NSSDB_DIR)
+        if cacerts_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=paths.IPA_CA_CRT)
+        if cacertds_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=dspath)
+        if cacertnss_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=paths.IPA_NSSDB_DIR)
+
+
+@registry
+class IPADogtagCertsMatchCheck(IPAPlugin):
+    """
+    Check if dogtag certs present in both NSS DB and LDAP match
+    """
+    requires = ('dirsrv',)
+
+    @duration
+    def check(self):
+        if not self.ca.is_configured():
+            logger.debug('CA is not configured, skipping connectivity check')
+            return
+
+        def match_ldap_nss_cert(plugin, ldap, db, cert_dn, attr, cert_nick):
+            try:
+                entry = ldap.get_entry(cert_dn)
+            except errors.NotFound:
+                yield Result(plugin, constants.ERROR,
+                             msg='%s entry not found in LDAP' % cert_dn)
+                return False
+            try:
+                nsscert = db.get_cert_from_db(cert_nick)
+            except Exception as e:
+                yield Result(plugin, constants.ERROR,
+                             error=str(e),
+                             msg=('Unable to load %s certificate:'
+                                  '{error}' % cert_nick))
+                return False
+            cert_matched = any([cert == nsscert for cert in entry[attr]])
+            if not cert_matched:
+                yield Result(plugin, constants.ERROR,
+                             key=cert_nick,
+                             nickname=cert_nick,
+                             dbdir=db.secdir,
+                             msg=('{nickname} certificate in NSS DB {dbdir} '
+                                  'does not match entry in LDAP'))
+                return False
+            return True
+
+        def match_ldap_nss_certs_by_subject(plugin, ldap, db, dn,
+                                            expected_nicks_subjects):
+            entries = ldap.get_entries(dn)
+            all_ok = True
+            for nick, subject in expected_nicks_subjects.items():
+                cert = db.get_cert_from_db(nick)
+                ok = any([cert in entry['userCertificate'] and
+                          subject == entry['subjectName'][0]
+                          for entry in entries
+                          if 'userCertificate' in entry])
+                if not ok:
+                    all_ok = False
+                    yield Result(plugin, constants.ERROR,
+                                 key=nick,
+                                 nickname=nick,
+                                 dbdir=db.secdir,
+                                 msg=('{nickname} certificate in NSS DB '
+                                      '{dbdir} does not match entry in LDAP'))
+            return all_ok
+
+        db = certs.CertDB(api.env.realm, paths.PKI_TOMCAT_ALIAS_DIR)
+        dn = DN('uid=pkidbuser,ou=people,o=ipaca')
+        subsystem_nick = 'subsystemCert cert-pki-ca'
+        subsystem_ok = yield from match_ldap_nss_cert(self, self.conn,
+                                                      db, dn,
+                                                      'userCertificate',
+                                                      subsystem_nick)
+        dn = DN('cn=%s IPA CA' % api.env.realm,
+                'cn=certificates,cn=ipa,cn=etc',
+                api.env.basedn)
+        casigning_nick = 'caSigningCert cert-pki-ca'
+        casigning_ok = yield from match_ldap_nss_cert(self, self.conn,
+                                                      db, dn, 'CACertificate',
+                                                      casigning_nick)
+
+        expected_nicks_subjects = {
+            'ocspSigningCert cert-pki-ca':
+                'CN=OCSP Subsystem,O=%s' % api.env.realm,
+            'subsystemCert cert-pki-ca':
+                'CN=CA Subsystem,O=%s' % api.env.realm,
+            'auditSigningCert cert-pki-ca':
+                'CN=CA Audit,O=%s' % api.env.realm,
+            'Server-Cert cert-pki-ca':
+                'CN=%s,O=%s' % (api.env.host, api.env.realm),
+        }
+
+        kra = krainstance.KRAInstance(api.env.realm)
+        if kra.is_installed():
+            kra_expected_nicks_subjects = {
+                'transportCert cert-pki-kra':
+                    'CN=KRA Transport Certificate,O=%s' % api.env.realm,
+                'storageCert cert-pki-kra':
+                    'CN=KRA Storage Certificate,O=%s' % api.env.realm,
+                'auditSigningCert cert-pki-kra':
+                    'CN=KRA Audit,O=%s' % api.env.realm,
+            }
+            expected_nicks_subjects.update(kra_expected_nicks_subjects)
+
+        ipaca_basedn = DN('ou=certificateRepository,ou=ca,o=ipaca')
+        ipaca_certs_ok = yield from match_ldap_nss_certs_by_subject(
+                                    self, self.conn, db,
+                                    ipaca_basedn,
+                                    expected_nicks_subjects
+                                )
+
+        if subsystem_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=subsystem_nick)
+        if casigning_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=casigning_nick)
+        if ipaca_certs_ok:
+            yield Result(self, constants.SUCCESS,
+                         key=str(ipaca_basedn))
+
+
 @registry
 class IPANSSChainValidation(IPAPlugin):
     """Validate the certificate chain of the certs."""
-- 
2.26.3