Blob Blame Raw
From 5b7d67bdef7810c661ae4ba1fdfa620c86985661 Mon Sep 17 00:00:00 2001
From: Mark Reynolds <mreynolds@redhat.com>
Date: Fri, 27 Apr 2018 08:34:51 -0400
Subject: [PATCH] Ticket 49652 - DENY aci's are not handled properly

Bug Description:  There are really two issues here.  One, when a resource
                  is denied by a DENY aci the cached results for that resource
                  are not proprely set, and on the same connection if the same
                  operation repeated it will be allowed instead of denied because
                  the cache result was not proprely updated.

                  Two, if there are no ALLOW aci's on a resource, then we don't
                  check the deny rules, and resources that are restricted are
                  returned to the client.

Fix Description:  For issue one, when an entry is denied access reset all the
                  attributes' cache results to DENIED as it's possible previously
                  evaluated aci's granted access to some of these attributes which
                  are still present in the acl result cache.

                  For issue two, if there are no ALLOW aci's on a resource but
                  there are DENY aci's, then set the aclpb state flags to
                  process DENY aci's

https://pagure.io/389-ds-base/issue/49652

Reviewed by: tbordaz & lkrispenz(Thanks!!)

(cherry picked from commit d77c7f0754f67022b42784c05be8a493a00f2ec5)
---
 dirsrvtests/tests/suites/acl/acl_deny_test.py | 198 ++++++++++++++++++
 ldap/servers/plugins/acl/acl.c                |  24 ++-
 2 files changed, 220 insertions(+), 2 deletions(-)
 create mode 100644 dirsrvtests/tests/suites/acl/acl_deny_test.py

diff --git a/dirsrvtests/tests/suites/acl/acl_deny_test.py b/dirsrvtests/tests/suites/acl/acl_deny_test.py
new file mode 100644
index 000000000..285664150
--- /dev/null
+++ b/dirsrvtests/tests/suites/acl/acl_deny_test.py
@@ -0,0 +1,198 @@
+import logging
+import pytest
+import os
+import ldap
+import time
+from lib389._constants import *
+from lib389.topologies import topology_st as topo
+from lib389.idm.user import UserAccount, UserAccounts, TEST_USER_PROPERTIES
+from lib389.idm.domain import Domain
+
+DEBUGGING = os.getenv("DEBUGGING", default=False)
+if DEBUGGING:
+    logging.getLogger(__name__).setLevel(logging.DEBUG)
+else:
+    logging.getLogger(__name__).setLevel(logging.INFO)
+log = logging.getLogger(__name__)
+
+BIND_DN2 = 'uid=tuser,ou=People,dc=example,dc=com'
+BIND_RDN2 = 'tuser'
+BIND_DN = 'uid=tuser1,ou=People,dc=example,dc=com'
+BIND_RDN = 'tuser1'
+SRCH_FILTER = "uid=tuser1"
+SRCH_FILTER2 = "uid=tuser"
+
+aci_list_A = ['(targetattr != "userPassword") (version 3.0; acl "Anonymous access"; allow (read, search, compare)userdn = "ldap:///anyone";)',
+              '(targetattr = "*") (version 3.0;acl "allow tuser";allow (all)(userdn = "ldap:///uid=tuser5,ou=People,dc=example,dc=com");)',
+              '(targetattr != "uid || mail") (version 3.0; acl "deny-attrs"; deny (all) (userdn = "ldap:///anyone");)',
+              '(targetfilter = "(inetUserStatus=1)") ( version 3.0; acl "deny-specific-entry"; deny(all) (userdn = "ldap:///anyone");)']
+
+aci_list_B = ['(targetattr != "userPassword") (version 3.0; acl "Anonymous access"; allow (read, search, compare)userdn = "ldap:///anyone";)',
+              '(targetattr != "uid || mail") (version 3.0; acl "deny-attrs"; deny (all) (userdn = "ldap:///anyone");)',
+              '(targetfilter = "(inetUserStatus=1)") ( version 3.0; acl "deny-specific-entry"; deny(all) (userdn = "ldap:///anyone");)']
+
+
+@pytest.fixture(scope="module")
+def aci_setup(topo):
+    topo.standalone.log.info("Add {}".format(BIND_DN))
+    user = UserAccount(topo.standalone, BIND_DN)
+    user_props = TEST_USER_PROPERTIES.copy()
+    user_props.update({'sn': BIND_RDN,
+                       'cn': BIND_RDN,
+                       'uid': BIND_RDN,
+                       'inetUserStatus': '1',
+                       'objectclass': 'extensibleObject',
+                       'userpassword': PASSWORD})
+    user.create(properties=user_props, basedn=SUFFIX)
+
+    topo.standalone.log.info("Add {}".format(BIND_DN2))
+    user2 = UserAccount(topo.standalone, BIND_DN2)
+    user_props = TEST_USER_PROPERTIES.copy()
+    user_props.update({'sn': BIND_RDN2,
+                       'cn': BIND_RDN2,
+                       'uid': BIND_RDN2,
+                       'userpassword': PASSWORD})
+    user2.create(properties=user_props, basedn=SUFFIX)
+
+
+def test_multi_deny_aci(topo, aci_setup):
+    """Test that mutliple deny rules work, and that they the cache properly
+    stores the result
+
+    :id: 294c366d-850e-459e-b5a0-3cc828ec3aca
+    :setup: Standalone Instance
+    :steps:
+        1. Add aci_list_A aci's and verify two searches on the same connection
+           behave the same
+        2. Add aci_list_B aci's and verify search fails as expected
+    :expectedresults:
+        1. Both searches do not return any entries
+        2. Seaches do not return any entries
+    """
+
+    if DEBUGGING:
+        # Maybe add aci logging?
+        pass
+
+    suffix = Domain(topo.standalone, DEFAULT_SUFFIX)
+
+    for run in range(2):
+        topo.standalone.log.info("Pass " + str(run + 1))
+
+        # Test ACI List A
+        topo.standalone.log.info("Testing two searches behave the same...")
+        topo.standalone.simple_bind_s(DN_DM, PASSWORD)
+        suffix.set('aci', aci_list_A, ldap.MOD_REPLACE)
+        time.sleep(1)
+
+        topo.standalone.simple_bind_s(BIND_DN, PASSWORD)
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as good user")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as good user")
+            assert False
+
+        # Bind a different user who has rights
+        topo.standalone.simple_bind_s(BIND_DN2, PASSWORD)
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as good user")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as good user (2)")
+            assert False
+
+        if run > 0:
+            # Second pass
+            topo.standalone.restart()
+
+        # Reset ACI's and do the second test
+        topo.standalone.log.info("Testing search does not return any entries...")
+        topo.standalone.simple_bind_s(DN_DM, PASSWORD)
+        suffix.set('aci', aci_list_B, ldap.MOD_REPLACE)
+        time.sleep(1)
+
+        topo.standalone.simple_bind_s(BIND_DN, PASSWORD)
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
+            assert False
+
+        if run > 0:
+            # Second pass
+            topo.standalone.restart()
+
+        # Bind as different user who has rights
+        topo.standalone.simple_bind_s(BIND_DN2, PASSWORD)
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as good user")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as good user (2)")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
+            assert False
+
+        # back to user 1
+        topo.standalone.simple_bind_s(BIND_DN, PASSWORD)
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as user1")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER2)
+        if entries is None or len(entries) == 0:
+            topo.standalone.log.fatal("Failed to get entry as user1 (2)")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 1")
+            assert False
+
+        entries = topo.standalone.search_s(DEFAULT_SUFFIX, ldap.SCOPE_SUBTREE, SRCH_FILTER)
+        if entries and entries[0]:
+            topo.standalone.log.fatal("Incorrectly got an entry returned from search 2")
+            assert False
+
+    topo.standalone.log.info("Test PASSED")
+
+
+if __name__ == '__main__':
+    # Run isolated
+    # -s for DEBUG mode
+    CURRENT_FILE = os.path.realpath(__file__)
+    pytest.main(["-s", CURRENT_FILE])
+
diff --git a/ldap/servers/plugins/acl/acl.c b/ldap/servers/plugins/acl/acl.c
index bc154c78f..6d105f4fa 100644
--- a/ldap/servers/plugins/acl/acl.c
+++ b/ldap/servers/plugins/acl/acl.c
@@ -1088,9 +1088,23 @@ acl_read_access_allowed_on_entry(
                     ** a DENY rule, then we don't have access to
                     ** the entry ( nice trick to get in )
                     */
-                    if (aclpb->aclpb_state &
-                        ACLPB_EXECUTING_DENY_HANDLES)
+                    if (aclpb->aclpb_state & ACLPB_EXECUTING_DENY_HANDLES) {
+                        aclEvalContext *c_ContextEval = &aclpb->aclpb_curr_entryEval_context;
+                        AclAttrEval *c_attrEval = NULL;
+                        /*
+                         * The entire entry is blocked, but previously evaluated allow aci's might
+                         * show some of the attributes as readable in the acl cache, so reset all
+                         * the cached attributes' status to FAIL.
+                         */
+                        for (size_t j = 0; j < c_ContextEval->acle_numof_attrs; j++) {
+                            c_attrEval = &c_ContextEval->acle_attrEval[j];
+                            c_attrEval->attrEval_r_status &= ~ACL_ATTREVAL_SUCCESS;
+                            c_attrEval->attrEval_r_status |= ACL_ATTREVAL_FAIL;
+                            c_attrEval->attrEval_s_status &= ~ACL_ATTREVAL_SUCCESS;
+                            c_attrEval->attrEval_s_status |= ACL_ATTREVAL_FAIL;
+                        }
                         return LDAP_INSUFFICIENT_ACCESS;
+                    }
 
                     /* The other case is I don't have an
                     ** explicit allow rule -- which is fine.
@@ -2908,6 +2922,12 @@ acl__TestRights(Acl_PBlock *aclpb, int access, const char **right, const char **
         result_reason->deciding_aci = NULL;
         result_reason->reason = ACL_REASON_NO_MATCHED_RESOURCE_ALLOWS;
 
+        /* If we have deny handles we should process them */
+        if (aclpb->aclpb_num_deny_handles > 0) {
+            aclpb->aclpb_state &= ~ACLPB_EXECUTING_ALLOW_HANDLES;
+            aclpb->aclpb_state |= ACLPB_EXECUTING_DENY_HANDLES;
+        }
+
         TNF_PROBE_1_DEBUG(acl__TestRights_end, "ACL", "",
                           tnf_string, no_allows, "");
 
-- 
2.17.0