From aae797abf3bfcfda124f111d8b4e805e77bee691 Mon Sep 17 00:00:00 2001
From: Lukas Slebodnik <lslebodn@redhat.com>
Date: Tue, 4 Aug 2015 12:47:58 +0200
Subject: [PATCH 47/47] test_memory_cache: Test invalidation with sss_cache
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Reviewed-by: Michal Židek <mzidek@redhat.com>
---
src/tests/intg/test_memory_cache.py | 176 ++++++++++++++++++++++++++++++++++++
1 file changed, 176 insertions(+)
diff --git a/src/tests/intg/test_memory_cache.py b/src/tests/intg/test_memory_cache.py
index c809a4b6daacfd04834db46d21bfb97ad025ada6..1fd577e652d278c35211b55c871797a3dee98b13 100644
--- a/src/tests/intg/test_memory_cache.py
+++ b/src/tests/intg/test_memory_cache.py
@@ -20,6 +20,7 @@ import os
import stat
import ent
import grp
+import pwd
import config
import signal
import subprocess
@@ -570,3 +571,178 @@ def test_initgroups_without_change_in_membership(ldap_conn, sanity_rfc2307):
# everything should be in memory cache
run_simple_test_with_initgroups()
+
+
+def assert_mc_records_for_user1():
+ ent.assert_passwd_by_name(
+ 'user1',
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+ ent.assert_passwd_by_uid(
+ 1001,
+ dict(name='user1', passwd='*', uid=1001, gid=2001,
+ gecos='1001', shell='/bin/bash'))
+
+ ent.assert_group_by_name(
+ "group1",
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_gid(
+ 2001,
+ dict(mem=ent.contains_only("user1", "user11", "user21")))
+ ent.assert_group_by_name(
+ "group0x",
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+ ent.assert_group_by_gid(
+ 2000,
+ dict(mem=ent.contains_only("user1", "user2", "user3")))
+
+ assert_initgroups_equal("user1", 2001, [2000, 2001])
+
+
+def assert_missing_mc_records_for_user1():
+ with pytest.raises(KeyError):
+ pwd.getpwnam("user1")
+ with pytest.raises(KeyError):
+ pwd.getpwuid(1001)
+
+ for gid in [2000, 2001]:
+ with pytest.raises(KeyError):
+ grp.getgrgid(gid)
+ for group in ["group0x", "group1"]:
+ with pytest.raises(KeyError):
+ grp.getgrnam(group)
+
+ (res, err, _) = sssd_id.call_sssd_initgroups("user1", 2001)
+ assert res == sssd_id.NssReturnCode.UNAVAIL, \
+ "Initgroups should not find anything after invalidation of mc.\n" \
+ "User %s, errno:%d" % (user, err)
+
+
+def test_invalidate_user_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-u", "user1"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_user_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-u", "user1"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_users_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-U"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_users_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-U"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_group_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-g", "group1"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_group_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-g", "group1"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_groups_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-G"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_groups_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-G"])
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_everything_before_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ subprocess.call(["sss_cache", "-E"])
+ stop_sssd()
+
+ assert_missing_mc_records_for_user1()
+
+
+def test_invalidate_everything_after_stop(ldap_conn, sanity_rfc2307):
+ # initialize cache with full ID
+ (res, errno, _) = sssd_id.get_user_groups("user1")
+ assert res == sssd_id.NssReturnCode.SUCCESS, \
+ "Could not find groups for user1 %s, %d" % errno
+ assert_mc_records_for_user1()
+
+ stop_sssd()
+ subprocess.call(["sss_cache", "-E"])
+
+ assert_missing_mc_records_for_user1()
--
2.4.3