Blob Blame History Raw
From aae797abf3bfcfda124f111d8b4e805e77bee691 Mon Sep 17 00:00:00 2001
From: Lukas Slebodnik <lslebodn@redhat.com>
Date: Tue, 4 Aug 2015 12:47:58 +0200
Subject: [PATCH 47/47] test_memory_cache: Test invalidation with sss_cache
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Reviewed-by: Michal Židek <mzidek@redhat.com>
---
 src/tests/intg/test_memory_cache.py | 176 ++++++++++++++++++++++++++++++++++++
 1 file changed, 176 insertions(+)

diff --git a/src/tests/intg/test_memory_cache.py b/src/tests/intg/test_memory_cache.py
index c809a4b6daacfd04834db46d21bfb97ad025ada6..1fd577e652d278c35211b55c871797a3dee98b13 100644
--- a/src/tests/intg/test_memory_cache.py
+++ b/src/tests/intg/test_memory_cache.py
@@ -20,6 +20,7 @@ import os
 import stat
 import ent
 import grp
+import pwd
 import config
 import signal
 import subprocess
@@ -570,3 +571,178 @@ def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307):
 
     # everything should be in memory cache
     run_simple_test_with_initgroups()
+
+
+def assert_mc_records_for_user1():
+    ent.assert_passwd_by_name(
+        'user1',
+        dict(name='user1', passwd='*', uid=1001, gid=2001,
+             gecos='1001', shell='/bin/bash'))
+    ent.assert_passwd_by_uid(
+        1001,
+        dict(name='user1', passwd='*', uid=1001, gid=2001,
+             gecos='1001', shell='/bin/bash'))
+
+    ent.assert_group_by_name(
+        "group1",
+        dict(mem=ent.contains_only("user1", "user11", "user21")))
+    ent.assert_group_by_gid(
+        2001,
+        dict(mem=ent.contains_only("user1", "user11", "user21")))
+    ent.assert_group_by_name(
+        "group0x",
+        dict(mem=ent.contains_only("user1", "user2", "user3")))
+    ent.assert_group_by_gid(
+        2000,
+        dict(mem=ent.contains_only("user1", "user2", "user3")))
+
+    assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+
+def assert_missing_mc_records_for_user1():
+    with pytest.raises(KeyError):
+        pwd.getpwnam("user1")
+    with pytest.raises(KeyError):
+        pwd.getpwuid(1001)
+
+    for gid in [2000, 2001]:
+        with pytest.raises(KeyError):
+            grp.getgrgid(gid)
+    for group in ["group0x", "group1"]:
+        with pytest.raises(KeyError):
+            grp.getgrnam(group)
+
+    (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001)
+    assert res == sssd_id.NssReturnCode.UNAVAIL, \
+        "Initgroups should not find anything after invalidation of mc.\n" \
+        "User %s, errno:%d" % (user, err)
+
+
+def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    subprocess.call(["sss_cache", "-u", "user1"])
+    stop_sssd()
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    stop_sssd()
+    subprocess.call(["sss_cache", "-u", "user1"])
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    subprocess.call(["sss_cache", "-U"])
+    stop_sssd()
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    stop_sssd()
+    subprocess.call(["sss_cache", "-U"])
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    subprocess.call(["sss_cache", "-g", "group1"])
+    stop_sssd()
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    stop_sssd()
+    subprocess.call(["sss_cache", "-g", "group1"])
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    subprocess.call(["sss_cache", "-G"])
+    stop_sssd()
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    stop_sssd()
+    subprocess.call(["sss_cache", "-G"])
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    subprocess.call(["sss_cache", "-E"])
+    stop_sssd()
+
+    assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307):
+    # initialize cache with full ID
+    (res, errno, _) = sssd_id.get_user_groups("user1")
+    assert res == sssd_id.NssReturnCode.SUCCESS, \
+        "Could not find groups for user1 %s, %d" % errno
+    assert_mc_records_for_user1()
+
+    stop_sssd()
+    subprocess.call(["sss_cache", "-E"])
+
+    assert_missing_mc_records_for_user1()
-- 
2.4.3