Blob Blame History Raw
From 3f6ed4393dfa9ddf982e326065a3ea160bef90b6 Mon Sep 17 00:00:00 2001
From: Antonio Torres <antorres@redhat.com>
Date: Tue, 23 Feb 2021 16:11:59 +0100
Subject: [PATCH] Add check for IPA KRA Agent

Add check to validate KRA Agent in case KRA is installed, including
checking for the KRA Agent LDAP entry.

Fixes: https://bugzilla.redhat.com/show_bug.cgi?id=1894781
Signed-off-by: Antonio Torres <antorres@redhat.com>
---
 README.md                       |  16 ++-
 src/ipahealthcheck/ipa/certs.py | 167 +++++++++++++++++++-------------
 2 files changed, 112 insertions(+), 71 deletions(-)

diff --git a/README.md b/README.md
index b9c60a2..0f3ed6a 100644
--- a/README.md
+++ b/README.md
@@ -547,7 +547,21 @@ Verify the description and userCertificate values in uid=ipara,ou=People,o=ipaca
       "kw": {
         "expected": "2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
         "got": "2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
-        "msg": "RA agent description does not match 2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST in LDAP and expected 2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST"
+        "msg": "RA agent description does not match. Found 2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST in LDAP and expected 2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST"
+      }
+    }
+
+### IPAKRAAgent
+Verify the description and userCertificate values in uid=ipakra,ou=people,o=kra,o=ipaca.
+
+    {
+      "source": "ipahealthcheck.ipa.certs",
+      "check": "IPAKRAAgent",
+      "result": "ERROR",
+      "kw": {
+        "expected": "2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
+        "got": "2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST",
+        "msg": "KRA agent description does not match. Found 2;7;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST in LDAP and expected 2;125;CN=Certificate Authority,O=EXAMPLE.TEST;CN=IPA RA,O=EXAMPLE.TEST"
       }
     }
 
diff --git a/src/ipahealthcheck/ipa/certs.py b/src/ipahealthcheck/ipa/certs.py
index d3043d0..32c0d76 100644
--- a/src/ipahealthcheck/ipa/certs.py
+++ b/src/ipahealthcheck/ipa/certs.py
@@ -724,6 +724,83 @@ class IPAOpenSSLChainValidation(IPAPlugin):
                         self, constants.SUCCESS, key=cert)
 
 
+def check_agent(plugin, base_dn, agent_type):
+    """Check RA/KRA Agent"""
+
+    try:
+        cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
+    except Exception as e:
+        yield Result(plugin, constants.ERROR,
+                     error=str(e),
+                     msg='Unable to load RA cert: {error}')
+        return
+    serial_number = cert.serial_number
+    subject = DN(cert.subject)
+    issuer = DN(cert.issuer)
+    description = '2;%d;%s;%s' % (serial_number, issuer, subject)
+    logger.debug('%s agent description should be %s', agent_type, description)
+    db_filter = ldap2.ldap2.combine_filters(
+        [
+            ldap2.ldap2.make_filter({'objectClass': 'inetOrgPerson'}),
+            ldap2.ldap2.make_filter(
+                {'description': ';%s;%s' % (issuer, subject)},
+                exact=False, trailing_wildcard=False),
+        ],
+        ldap2.ldap2.MATCH_ALL)
+    try:
+        entries = plugin.conn.get_entries(base_dn,
+                                          plugin.conn.SCOPE_SUBTREE,
+                                          db_filter)
+    except errors.NotFound:
+        yield Result(plugin, constants.ERROR,
+                     description=description,
+                     msg='%s agent not found in LDAP' % agent_type)
+        return
+    except Exception as e:
+        yield Result(plugin, constants.ERROR,
+                     error=str(e),
+                     msg='Retrieving %s agent from LDAP failed {error}'
+                         % agent_type)
+        return
+    else:
+        logger.debug('%s agent description is %s', agent_type, description)
+        if len(entries) != 1:
+            yield Result(plugin, constants.ERROR,
+                         found=len(entries),
+                         msg='Too many %s agent entries found, {found}'
+                             % agent_type)
+            return
+        entry = entries[0]
+        raw_desc = entry.get('description')
+        if raw_desc is None:
+            yield Result(plugin, constants.ERROR,
+                         msg='%s agent is missing the description '
+                             'attribute or it is not readable' % agent_type)
+            return
+        ra_desc = raw_desc[0]
+        ra_certs = entry.get('usercertificate')
+        if ra_desc != description:
+            yield Result(plugin, constants.ERROR,
+                         expected=description,
+                         got=ra_desc,
+                         msg='%s agent description does not match. Found '
+                         '{got} in LDAP and expected {expected}' % agent_type)
+            return
+        found = False
+        for candidate in ra_certs:
+            if candidate == cert:
+                found = True
+                break
+        if not found:
+            yield Result(plugin, constants.ERROR,
+                         certfile=paths.RA_AGENT_PEM,
+                         dn=str(entry.dn),
+                         msg='%s agent certificate in {certfile} not '
+                             'found in LDAP userCertificate attribute '
+                             'for the entry {dn}' % agent_type)
+        yield Result(plugin, constants.SUCCESS)
+
+
 @registry
 class IPARAAgent(IPAPlugin):
     """Validate the RA Agent used to talk to the CA
@@ -739,82 +816,32 @@ class IPARAAgent(IPAPlugin):
             logger.debug('CA is not configured, skipping RA Agent check')
             return
 
-        try:
-            cert = x509.load_certificate_from_file(paths.RA_AGENT_PEM)
-        except Exception as e:
-            yield Result(self, constants.ERROR,
-                         error=str(e),
-                         msg='Unable to load RA cert: {error}')
-            return
+        base_dn = DN('uid=ipara,ou=people,o=ipaca')
+        yield from check_agent(self, base_dn, 'RA')
 
-        serial_number = cert.serial_number
-        subject = DN(cert.subject)
-        issuer = DN(cert.issuer)
-        description = '2;%d;%s;%s' % (serial_number, issuer, subject)
 
-        logger.debug('RA agent description should be %s', description)
+@registry
+class IPAKRAAgent(IPAPlugin):
+    """Validate the KRA Agent
 
-        db_filter = ldap2.ldap2.combine_filters(
-            [
-                ldap2.ldap2.make_filter({'objectClass': 'inetOrgPerson'}),
-                ldap2.ldap2.make_filter({'sn': 'ipara'}),
-                ldap2.ldap2.make_filter(
-                    {'description': ';%s;%s' % (issuer, subject)},
-                    exact=False, trailing_wildcard=False),
-            ],
-            ldap2.ldap2.MATCH_ALL)
+       Compare the description and usercertificate values.
+    """
 
-        base_dn = DN(('o', 'ipaca'))
-        try:
-            entries = self.conn.get_entries(base_dn,
-                                            self.conn.SCOPE_SUBTREE,
-                                            db_filter)
-        except errors.NotFound:
-            yield Result(self, constants.ERROR,
-                         description=description,
-                         msg='RA agent not found in LDAP')
+    requires = ('dirsrv',)
+
+    @duration
+    def check(self):
+        if not self.ca.is_configured():
+            logger.debug('CA is not configured, skipping KRA Agent check')
             return
-        except Exception as e:
-            yield Result(self, constants.ERROR,
-                         error=str(e),
-                         msg='Retrieving RA agent from LDAP failed {error}')
+
+        kra = krainstance.KRAInstance(api.env.realm)
+        if not kra.is_installed():
+            logger.debug('KRA is not installed, skipping KRA Agent check')
             return
-        else:
-            logger.debug('RA agent description is %s', description)
-            if len(entries) != 1:
-                yield Result(self, constants.ERROR,
-                             found=len(entries),
-                             msg='Too many RA agent entries found, {found}')
-                return
-            entry = entries[0]
-            raw_desc = entry.get('description')
-            if raw_desc is None:
-                yield Result(self, constants.ERROR,
-                             msg='RA agent is missing the description '
-                                 'attribute or it is not readable')
-                return
-            ra_desc = raw_desc[0]
-            ra_certs = entry.get('usercertificate')
-            if ra_desc != description:
-                yield Result(self, constants.ERROR,
-                             expected=description,
-                             got=ra_desc,
-                             msg='RA agent description does not match. Found '
-                             '{got} in LDAP and expected {expected}')
-                return
-            found = False
-            for candidate in ra_certs:
-                if candidate == cert:
-                    found = True
-                    break
-            if not found:
-                yield Result(self, constants.ERROR,
-                             certfile=paths.RA_AGENT_PEM,
-                             dn=str(entry.dn),
-                             msg='RA agent certificate in {certfile} not '
-                                 'found in LDAP userCertificate attribute '
-                                 'for the entry {dn}')
-            yield Result(self, constants.SUCCESS)
+
+        base_dn = DN('uid=ipakra,ou=people,o=kra,o=ipaca')
+        yield from check_agent(self, base_dn, 'KRA')
 
 
 @registry
-- 
2.26.2