neil / rpms / udisks2

Forked from rpms/udisks2 a year ago
Clone
Blob Blame History Raw
From dd6ef8393a8f27fefad66cce136e52f13350f0df Mon Sep 17 00:00:00 2001
From: Tomas Bzatek <tbzatek@redhat.com>
Date: Fri, 1 Oct 2021 18:23:10 +0200
Subject: [PATCH] tests: Add LVM2 teardown object existence checks

This adds a check for the created stack objects presence after the teardown.

Due to the nature of the global lvm2 module update some objects may still
hang around after the org.freedesktop.UDisks2.VolumeGroup.Delete(options='tear-down')
method call has returned. Until this is properly fixed an explicit timeout
had to be added.
---
 src/tests/dbus-tests/test_20_LVM.py | 80 +++++++++++++++++------------
 1 file changed, 46 insertions(+), 34 deletions(-)

diff --git a/src/tests/dbus-tests/test_20_LVM.py b/src/tests/dbus-tests/test_20_LVM.py
index 56915e580..7fbff0434 100644
--- a/src/tests/dbus-tests/test_20_LVM.py
+++ b/src/tests/dbus-tests/test_20_LVM.py
@@ -3,6 +3,7 @@
 import re
 import time
 import unittest
+import six
 
 from distutils.version import LooseVersion
 
@@ -637,7 +638,8 @@ def _remove_luks(self, device, name, close=True):
                 device.Lock(self.no_options, dbus_interface=self.iface_prefix + '.Encrypted')
             except dbus.exceptions.DBusException as e:
                 # ignore when luks is actually already locked
-                if not str(e).endswith('is not unlocked') and not 'No such interface' in str(e):
+                if not str(e).endswith('is not unlocked') and not 'No such interface' in str(e) and \
+                   not 'Object does not exist at path' in str(e):
                     raise e
 
         try:
@@ -645,7 +647,7 @@ def _remove_luks(self, device, name, close=True):
             d['erase'] = True
             device.Format('empty', d, dbus_interface=self.iface_prefix + '.Block')
         except dbus.exceptions.DBusException as e:
-            if not 'No such interface' in str(e):
+            if not 'No such interface' in str(e) and not 'Object does not exist at path' in str(e):
                 raise e
 
     def _init_stack(self, name):
@@ -663,18 +665,19 @@ def _init_stack(self, name):
         self.assertIsNotNone(self.pv)
 
         self.vg = self._create_vg(vgname, dbus.Array([self.pv]))
+        self.vg_path = self.vg.object_path
         self.addCleanup(self._remove_vg, self.vg, tear_down=True, ignore_removed=True)
 
         # create an LV on it
-        lv_path = self.vg.CreatePlainVolume(lvname, dbus.UInt64(200 * 1024**2), self.no_options,
-                                            dbus_interface=self.iface_prefix + '.VolumeGroup')
-        self.lv = self.bus.get_object(self.iface_prefix, lv_path)
+        self.lv_path = self.vg.CreatePlainVolume(lvname, dbus.UInt64(200 * 1024**2), self.no_options,
+                                                 dbus_interface=self.iface_prefix + '.VolumeGroup')
+        self.lv = self.bus.get_object(self.iface_prefix, self.lv_path)
         self.assertIsNotNone(self.lv)
 
-        lv_block_path = self.lv.Activate(self.no_options, dbus_interface=self.iface_prefix + '.LogicalVolume')
-        self.assertIsNotNone(lv_block_path)
+        self.lv_block_path = self.lv.Activate(self.no_options, dbus_interface=self.iface_prefix + '.LogicalVolume')
+        self.assertIsNotNone(self.lv_block_path)
 
-        self.lv_block = self.get_object(lv_block_path)
+        self.lv_block = self.get_object(self.lv_block_path)
         self.assertIsNotNone(self.lv_block)
 
         # create LUKS on the LV
@@ -702,10 +705,10 @@ def _init_stack(self, name):
         self.addCleanup(self._remove_luks, self.lv_block, vgname)
         self.luks_uuid = self.get_property_raw(self.lv_block, '.Block', 'IdUUID')
 
-        luks_block_path = self.get_property(self.lv_block, '.Encrypted', 'CleartextDevice')
-        self.luks_block = self.get_object(luks_block_path.value)
-        self.assertIsNotNone(self.luks_block)
-        self.fs_uuid = self.get_property_raw(self.luks_block, '.Block', 'IdUUID')
+        self.luks_block_path = self.get_property_raw(self.lv_block, '.Encrypted', 'CleartextDevice')
+        luks_block = self.get_object(self.luks_block_path)
+        self.assertIsNotNone(luks_block)
+        self.fs_uuid = self.get_property_raw(luks_block, '.Block', 'IdUUID')
 
         # check for present crypttab configuration item
         conf = self.get_property(self.lv_block, '.Block', 'Configuration')
@@ -713,7 +716,7 @@ def _init_stack(self, name):
         self.assertEqual(conf.value[0][0], 'crypttab')
 
         # check for present fstab configuration item on a cleartext block device
-        conf = self.get_property(self.luks_block, '.Block', 'Configuration')
+        conf = self.get_property(luks_block, '.Block', 'Configuration')
         conf.assertTrue()
         self.assertEqual(conf.value[0][0], 'fstab')
 
@@ -730,6 +733,32 @@ def _init_stack(self, name):
         self.assertIn(vgname, fstab)
         self.assertIn(self.fs_uuid, fstab)
 
+    def _check_torn_down_stack(self, name):
+        # check that all created objects don't exist anymore
+        msg = r'Object does not exist at path|No such interface'
+        with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
+            luks_block = self.get_object(self.luks_block_path)
+            self.get_property_raw(luks_block, '.Block', 'DeviceNumber')
+        with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
+            lv_block = self.get_object(self.lv_block_path)
+            self.get_property_raw(lv_block, '.Block', 'DeviceNumber')
+        with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
+            # the lvm2 udisks module is not fully synchronous, see https://github.com/storaged-project/udisks/pull/814
+            time.sleep(2)
+            lv = self.get_object(self.lv_path)
+            self.get_property_raw(lv, '.LogicalVolume', 'Name')
+        with six.assertRaisesRegex(self, dbus.exceptions.DBusException, msg):
+            vg = self.get_object(self.vg_path)
+            self.get_property_raw(vg, '.VolumeGroup', 'Name')
+
+        # check that fstab and crypttab records have been removed
+        crypttab = self.read_file('/etc/crypttab')
+        self.assertNotIn(name, crypttab)
+        self.assertNotIn(self.luks_uuid, crypttab)
+        fstab = self.read_file('/etc/fstab')
+        self.assertNotIn(name, fstab)
+        self.assertNotIn(self.fs_uuid, fstab)
+
 
     @udiskstestcase.tag_test(udiskstestcase.TestTags.UNSAFE)
     def test_teardown_active_vg_unlocked(self):
@@ -741,13 +770,7 @@ def test_teardown_active_vg_unlocked(self):
 
         self._remove_vg(self.vg, tear_down=True, ignore_removed=False)
 
-        # check that fstab and crypttab records have been removed
-        crypttab = self.read_file('/etc/crypttab')
-        self.assertNotIn(name, crypttab)
-        self.assertNotIn(self.luks_uuid, crypttab)
-        fstab = self.read_file('/etc/fstab')
-        self.assertNotIn(name, fstab)
-        self.assertNotIn(self.fs_uuid, fstab)
+        self._check_torn_down_stack(name)
 
     @udiskstestcase.tag_test(udiskstestcase.TestTags.UNSAFE)
     def test_teardown_active_vg_locked(self):
@@ -760,13 +783,7 @@ def test_teardown_active_vg_locked(self):
         self.lv_block.Lock(self.no_options, dbus_interface=self.iface_prefix + '.Encrypted')
         self._remove_vg(self.vg, tear_down=True, ignore_removed=False)
 
-        # check that fstab and crypttab records have been removed
-        crypttab = self.read_file('/etc/crypttab')
-        self.assertNotIn(name, crypttab)
-        self.assertNotIn(self.luks_uuid, crypttab)
-        fstab = self.read_file('/etc/fstab')
-        self.assertNotIn(name, fstab)
-        self.assertNotIn(self.fs_uuid, fstab)
+        self._check_torn_down_stack(name)
 
     @udiskstestcase.tag_test(udiskstestcase.TestTags.UNSAFE)
     def test_teardown_inactive_vg_locked(self):
@@ -780,13 +797,7 @@ def test_teardown_inactive_vg_locked(self):
         self.lv.Deactivate(self.no_options, dbus_interface=self.iface_prefix + '.LogicalVolume')
         self._remove_vg(self.vg, tear_down=True, ignore_removed=False)
 
-        # check that fstab and crypttab records have been removed
-        crypttab = self.read_file('/etc/crypttab')
-        self.assertNotIn(name, crypttab)
-        self.assertNotIn(self.luks_uuid, crypttab)
-        fstab = self.read_file('/etc/fstab')
-        self.assertNotIn(name, fstab)
-        self.assertNotIn(self.fs_uuid, fstab)
+        self._check_torn_down_stack(name)
 
     @udiskstestcase.tag_test(udiskstestcase.TestTags.UNSAFE)
     def test_reformat_inactive_vg_locked(self):
@@ -812,6 +823,7 @@ def test_reformat_inactive_vg_locked(self):
 
         # check that fstab and crypttab records have been removed
         # TODO: these checks are the opposite - record shouldn't be present, once this is fixed
+        # self._check_torn_down_stack(name)
         crypttab = self.read_file('/etc/crypttab')
         self.assertIn(name, crypttab)
         self.assertIn(self.luks_uuid, crypttab)