The Identity, Policy and Audit system
CentOS Sources
2016-11-03 403b09ab980c02ef36095973349a13e0181c794a
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
From c9428fe0a1230fb9ea9c18c895c0834678e94da8 Mon Sep 17 00:00:00 2001
From: Fraser Tweedale <ftweedal@redhat.com>
Date: Fri, 26 Aug 2016 15:31:13 +1000
Subject: [PATCH] Make host/service cert revocation aware of lightweight CAs
 
Revocation of host/service certs on host/service deletion or other
operations is broken when cert is issued by a lightweight (sub)CA,
causing the delete operation to be aborted.  Look up the issuing CA
and pass it to 'cert_revoke' to fix the issue.
 
Fixes: https://fedorahosted.org/freeipa/ticket/6221
Reviewed-By: Jan Cholasta <jcholast@redhat.com>
---
 ipaserver/plugins/host.py    | 20 +++++++---------
 ipaserver/plugins/service.py | 56 ++++++++++++++++++++++----------------------
 2 files changed, 37 insertions(+), 39 deletions(-)
 
diff --git a/ipaserver/plugins/host.py b/ipaserver/plugins/host.py
index 03c64c637cbba0aee1b6569f3b5dbe200953bff8..2362b6247af87b4ce63c21083e6bc8ac39db0804 100644
--- a/ipaserver/plugins/host.py
+++ b/ipaserver/plugins/host.py
@@ -843,12 +843,8 @@ class host_del(LDAPDelete):
                 )
 
         if self.api.Command.ca_is_enabled()['result']:
-            try:
-                entry_attrs = ldap.get_entry(dn, ['usercertificate'])
-            except errors.NotFound:
-                self.obj.handle_not_found(*keys)
-
-            revoke_certs(entry_attrs.get('usercertificate', []), self.log)
+            certs = self.api.Command.cert_find(host=keys)['result']
+            revoke_certs(certs)
 
         return dn
 
@@ -910,7 +906,9 @@ class host_mod(LDAPUpdate):
             old_certs = entry_attrs_old.get('usercertificate', [])
             old_certs_der = [x509.normalize_certificate(c) for c in old_certs]
             removed_certs_der = set(old_certs_der) - set(certs_der)
-            revoke_certs(removed_certs_der, self.log)
+            for der in removed_certs_der:
+                rm_certs = api.Command.cert_find(certificate=der)['result']
+                revoke_certs(rm_certs)
 
         if certs:
             entry_attrs['usercertificate'] = certs_der
@@ -1196,10 +1194,10 @@ class host_disable(LDAPQuery):
         except errors.NotFound:
             self.obj.handle_not_found(*keys)
         if self.api.Command.ca_is_enabled()['result']:
-            certs = entry_attrs.get('usercertificate', [])
+            certs = self.api.Command.cert_find(host=keys)['result']
 
             if certs:
-                revoke_certs(certs, self.log)
+                revoke_certs(certs)
                 # Remove the usercertificate altogether
                 entry_attrs['usercertificate'] = None
                 ldap.update_entry(entry_attrs)
@@ -1341,8 +1339,8 @@ class host_remove_cert(LDAPRemoveAttributeViaOption):
     def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
         assert isinstance(dn, DN)
 
-        if 'usercertificate' in options:
-            revoke_certs(options['usercertificate'], self.log)
+        for cert in options.get('usercertificate', []):
+            revoke_certs(api.Command.cert_find(certificate=cert)['result'])
 
         return dn
 
diff --git a/ipaserver/plugins/service.py b/ipaserver/plugins/service.py
index 04d1916fe989a8651bcc4d44f1914c460be1081c..093525f2e7cb84b18f0658dcb5d7c786e45c6ab6 100644
--- a/ipaserver/plugins/service.py
+++ b/ipaserver/plugins/service.py
@@ -220,37 +220,38 @@ def validate_certificate(ugettext, cert):
         x509.validate_certificate(cert, datatype=x509.DER)
 
 
-def revoke_certs(certs, logger=None):
+def revoke_certs(certs):
     """
     revoke the certificates removed from host/service entry
-    """
-    for cert in certs:
-        try:
-            cert = x509.normalize_certificate(cert)
-        except errors.CertificateFormatError as e:
-            if logger is not None:
-                logger.info("Problem decoding certificate: %s" % e)
 
-        serial = unicode(x509.get_serial_number(cert, x509.DER))
+    :param certs: Output of a 'cert_find' command.
 
-        try:
-            result = api.Command['cert_show'](unicode(serial))['result']
-        except errors.CertificateOperationError:
-            continue
-        if 'revocation_reason' in result:
+    """
+    for cert in certs:
+        if 'cacn' not in cert:
+            # Cert is known to IPA, but has no associated CA.
+            # If it was issued by 3rd-party CA, we can't revoke it.
+            # If it was issued by a Dogtag lightweight CA that was
+            # subsequently deleted, we can't revoke it via IPA.
+            # We could go directly to Dogtag to revoke it, but the
+            # issuer's cert should have been revoked so never mind.
             continue
-        if x509.normalize_certificate(result['certificate']) != cert:
+
+        if cert['revoked']:
+            # cert is already revoked
             continue
 
         try:
-            api.Command['cert_revoke'](unicode(serial),
-                                       revocation_reason=4)
+            api.Command['cert_revoke'](
+                cert['serial_number'],
+                cacn=cert['cacn'],
+                revocation_reason=4,
+            )
         except errors.NotImplementedError:
             # some CA's might not implement revoke
             pass
 
 
-
 def set_certificate_attrs(entry_attrs):
     """
     Set individual attributes from some values from a certificate.
@@ -674,11 +675,8 @@ class service_del(LDAPDelete):
         # custom services allow them to manage them.
         check_required_principal(ldap, keys[-1])
         if self.api.Command.ca_is_enabled()['result']:
-            try:
-                entry_attrs = ldap.get_entry(dn, ['usercertificate'])
-            except errors.NotFound:
-                self.obj.handle_not_found(*keys)
-            revoke_certs(entry_attrs.get('usercertificate', []), self.log)
+            certs = self.api.Command.cert_find(service=keys)['result']
+            revoke_certs(certs)
 
         return dn
 
@@ -711,7 +709,9 @@ class service_mod(LDAPUpdate):
             old_certs = entry_attrs_old.get('usercertificate', [])
             old_certs_der = [x509.normalize_certificate(c) for c in old_certs]
             removed_certs_der = set(old_certs_der) - set(certs_der)
-            revoke_certs(removed_certs_der, self.log)
+            for der in removed_certs_der:
+                rm_certs = api.Command.cert_find(certificate=der)['result']
+                revoke_certs(rm_certs)
 
         if certs:
             entry_attrs['usercertificate'] = certs_der
@@ -950,10 +950,10 @@ class service_disable(LDAPQuery):
         done_work = False
 
         if self.api.Command.ca_is_enabled()['result']:
-            certs = entry_attrs.get('usercertificate', [])
+            certs = self.api.Command.cert_find(service=keys)['result']
 
             if len(certs) > 0:
-                revoke_certs(certs, self.log)
+                revoke_certs(certs)
                 # Remove the usercertificate altogether
                 entry_attrs['usercertificate'] = None
                 ldap.update_entry(entry_attrs)
@@ -989,8 +989,8 @@ class service_remove_cert(LDAPRemoveAttributeViaOption):
     def post_callback(self, ldap, dn, entry_attrs, *keys, **options):
         assert isinstance(dn, DN)
 
-        if 'usercertificate' in options:
-            revoke_certs(options['usercertificate'], self.log)
+        for cert in options.get('usercertificate', []):
+            revoke_certs(api.Command.cert_find(certificate=cert)['result'])
 
         return dn
 
-- 
2.7.4