Blame SOURCES/0004-Vendor-custodia.ipa.patch

af3cbb
From a38601968ccd8c8dfdce60c8d66b220eefb344b0 Mon Sep 17 00:00:00 2001
af3cbb
From: Christian Heimes <cheimes@redhat.com>
af3cbb
Date: Tue, 28 Mar 2017 18:41:08 +0200
af3cbb
Subject: [PATCH 4/4] Vendor custodia.ipa
af3cbb
af3cbb
---
af3cbb
 README.custodia.ipa      | 137 ++++++++++++++++++++++
af3cbb
 custodia/ipa/__init__.py |   1 +
af3cbb
 custodia/ipa/vault.py    | 291 +++++++++++++++++++++++++++++++++++++++++++++++
af3cbb
 setup.py                 |   8 +-
af3cbb
 tests/test_ipa.py        | 195 +++++++++++++++++++++++++++++++
af3cbb
 6 files changed, 638 insertions(+), 2 deletions(-)
af3cbb
 create mode 100644 README.custodia.ipa
af3cbb
 create mode 100644 custodia/ipa/__init__.py
af3cbb
 create mode 100644 custodia/ipa/vault.py
af3cbb
 create mode 100644 tests/test_ipa.py
af3cbb
af3cbb
diff --git a/README.custodia.ipa b/README.custodia.ipa
af3cbb
new file mode 100644
af3cbb
index 0000000..a952ef8
af3cbb
--- /dev/null
af3cbb
+++ b/README.custodia.ipa
af3cbb
@@ -0,0 +1,137 @@
af3cbb
+.. WARNING: AUTO-GENERATED FILE. DO NOT EDIT.
af3cbb
+
af3cbb
+custodia.ipa — IPA vault plugin for Custodia
af3cbb
+============================================
af3cbb
+
af3cbb
+**WARNING** *custodia.ipa is a tech preview with a provisional API.*
af3cbb
+
af3cbb
+custodia.ipa is a storage plugin for
af3cbb
+`Custodia <https://custodia.readthedocs.io/>`__. It provides integration
af3cbb
+with `FreeIPA <http://www.freeipa.org>`__'s
af3cbb
+`vault <https://www.freeipa.org/page/V4/Password_Vault>`__ facility.
af3cbb
+Secrets are encrypted and stored in
af3cbb
+`Dogtag <http://www.dogtagpki.org>`__'s Key Recovery Agent.
af3cbb
+
af3cbb
+Requirements
af3cbb
+------------
af3cbb
+
af3cbb
+Installation
af3cbb
+~~~~~~~~~~~~
af3cbb
+
af3cbb
+-  pip
af3cbb
+-  setuptools >= 18.0
af3cbb
+
af3cbb
+Runtime
af3cbb
+~~~~~~~
af3cbb
+
af3cbb
+-  custodia >= 0.3.1
af3cbb
+-  ipalib >= 4.5.0
af3cbb
+-  ipaclient >= 4.5.0
af3cbb
+-  Python 2.7 (Python 3 support in IPA vault is unstable.)
af3cbb
+
af3cbb
+custodia.ipa requires an IPA-enrolled host and a Kerberos TGT for
af3cbb
+authentication. It is recommended to provide credentials with a keytab
af3cbb
+file or GSS-Proxy.
af3cbb
+
af3cbb
+Testing and development
af3cbb
+~~~~~~~~~~~~~~~~~~~~~~~
af3cbb
+
af3cbb
+-  wheel
af3cbb
+-  tox
af3cbb
+
af3cbb
+virtualenv requirements
af3cbb
+~~~~~~~~~~~~~~~~~~~~~~~
af3cbb
+
af3cbb
+custodia.ipa depends on several binary extensions and shared libraries
af3cbb
+for e.g. python-cryptography, python-gssapi, python-ldap, and
af3cbb
+python-nss. For installation in a virtual environment, a C compiler and
af3cbb
+several development packages are required.
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    $ virtualenv venv
af3cbb
+    $ venv/bin/pip install --upgrade custodia.ipa
af3cbb
+
af3cbb
+Fedora
af3cbb
+^^^^^^
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    $ sudo dnf install python2 python-pip python-virtualenv python-devel \
af3cbb
+        gcc redhat-rpm-config krb5-workstation krb5-devel libffi-devel \
af3cbb
+        nss-devel openldap-devel cyrus-sasl-devel openssl-devel
af3cbb
+
af3cbb
+Debian / Ubuntu
af3cbb
+^^^^^^^^^^^^^^^
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    $ sudo apt-get update
af3cbb
+    $ sudo apt-get install -y python2.7 python-pip python-virtualenv python-dev \
af3cbb
+        gcc krb5-user libkrb5-dev libffi-dev libnss3-dev libldap2-dev \
af3cbb
+        libsasl2-dev libssl-dev
af3cbb
+
af3cbb
+--------------
af3cbb
+
af3cbb
+Example configuration
af3cbb
+---------------------
af3cbb
+
af3cbb
+Create directories
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    $ sudo mkdir /etc/custodia /var/lib/custodia /var/log/custodia /var/run/custodia
af3cbb
+    $ sudo chown USER:GROUP /var/lib/custodia /var/log/custodia /var/run/custodia
af3cbb
+    $ sudo chmod 750 /var/lib/custodia /var/log/custodia
af3cbb
+
af3cbb
+Create service account and keytab
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    $ kinit admin
af3cbb
+    $ ipa service-add custodia/client1.ipa.example
af3cbb
+    $ ipa service-allow-create-keytab custodia/client1.ipa.example --users=admin
af3cbb
+    $ mkdir -p /etc/custodia
af3cbb
+    $ ipa-getkeytab -p custodia/client1.ipa.example -k /etc/custodia/custodia.keytab
af3cbb
+
af3cbb
+Create ``/etc/custodia/custodia.conf``
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    [DEFAULT]
af3cbb
+    confdir = /etc/custodia
af3cbb
+    libdir = /var/lib/custodia
af3cbb
+    logdir = /var/log/custodia
af3cbb
+    rundir = /var/run/custodia
af3cbb
+
af3cbb
+    [global]
af3cbb
+    debug = true
af3cbb
+    server_socket = ${rundir}/custodia.sock
af3cbb
+    auditlog = ${logdir}/audit.log
af3cbb
+
af3cbb
+    [store:vault]
af3cbb
+    handler = IPAVault
af3cbb
+    keytab = {confdir}/custodia.keytab
af3cbb
+    ccache = FILE:{rundir}/ccache
af3cbb
+
af3cbb
+    [auth:creds]
af3cbb
+    handler = SimpleCredsAuth
af3cbb
+    uid = root
af3cbb
+    gid = root
af3cbb
+
af3cbb
+    [authz:paths]
af3cbb
+    handler = SimplePathAuthz
af3cbb
+    paths = /. /secrets
af3cbb
+
af3cbb
+    [/]
af3cbb
+    handler = Root
af3cbb
+
af3cbb
+    [/secrets]
af3cbb
+    handler = Secrets
af3cbb
+    store = vault
af3cbb
+
af3cbb
+Run Custodia server
af3cbb
+
af3cbb
+::
af3cbb
+
af3cbb
+    $ custodia /etc/custodia/custodia.conf
af3cbb
diff --git a/custodia/ipa/__init__.py b/custodia/ipa/__init__.py
af3cbb
new file mode 100644
af3cbb
index 0000000..ef1bb9d
af3cbb
--- /dev/null
af3cbb
+++ b/custodia/ipa/__init__.py
af3cbb
@@ -0,0 +1 @@
af3cbb
+# Copyright (C) 2016  Custodia Project Contributors - see LICENSE file
af3cbb
diff --git a/custodia/ipa/vault.py b/custodia/ipa/vault.py
af3cbb
new file mode 100644
af3cbb
index 0000000..f681c54
af3cbb
--- /dev/null
af3cbb
+++ b/custodia/ipa/vault.py
af3cbb
@@ -0,0 +1,291 @@
af3cbb
+# Copyright (C) 2016  Custodia Project Contributors - see LICENSE file
af3cbb
+"""FreeIPA vault store (PoC)
af3cbb
+"""
af3cbb
+import os
af3cbb
+
af3cbb
+import ipalib
af3cbb
+from ipalib.errors import DuplicateEntry, NotFound
af3cbb
+from ipalib.krb_utils import get_principal
af3cbb
+
af3cbb
+import six
af3cbb
+
af3cbb
+from custodia.plugin import CSStore, CSStoreError, CSStoreExists, PluginOption
af3cbb
+
af3cbb
+
af3cbb
+def krb5_unparse_principal_name(name):
af3cbb
+    """Split a Kerberos principal name into parts
af3cbb
+
af3cbb
+    Returns:
af3cbb
+       * ('host', hostname, realm) for a host principal
af3cbb
+       * (servicename, hostname, realm) for a service principal
af3cbb
+       * (None, username, realm) for a user principal
af3cbb
+
af3cbb
+    :param text name: Kerberos principal name
af3cbb
+    :return: (service, host, realm) or (None, username, realm)
af3cbb
+    """
af3cbb
+    prefix, realm = name.split(u'@')
af3cbb
+    if u'/' in prefix:
af3cbb
+        service, host = prefix.rsplit(u'/', 1)
af3cbb
+        return service, host, realm
af3cbb
+    else:
af3cbb
+        return None, prefix, realm
af3cbb
+
af3cbb
+
af3cbb
+class FreeIPA(object):
af3cbb
+    """FreeIPA wrapper
af3cbb
+
af3cbb
+    Custodia uses a forking server model. We can bootstrap FreeIPA API in
af3cbb
+    the main process. Connections must be created in the client process.
af3cbb
+    """
af3cbb
+    def __init__(self, api=None, ipa_context='cli', ipa_confdir=None,
af3cbb
+                 ipa_debug=False):
af3cbb
+        if api is None:
af3cbb
+            self._api = ipalib.api
af3cbb
+        else:
af3cbb
+            self._api = api
af3cbb
+        self._ipa_config = dict(
af3cbb
+            context=ipa_context,
af3cbb
+            debug=ipa_debug,
af3cbb
+            log=None,  # disable logging to file
af3cbb
+        )
af3cbb
+        if ipa_confdir is not None:
af3cbb
+            self._ipa_config['confdir'] = ipa_confdir
af3cbb
+        self._bootstrap()
af3cbb
+
af3cbb
+    @property
af3cbb
+    def Command(self):
af3cbb
+        return self._api.Command  # pylint: disable=no-member
af3cbb
+
af3cbb
+    @property
af3cbb
+    def env(self):
af3cbb
+        return self._api.env  # pylint: disable=no-member
af3cbb
+
af3cbb
+    def _bootstrap(self):
af3cbb
+        if not self._api.isdone('bootstrap'):
af3cbb
+            # TODO: bandaid for "A PKCS #11 module returned CKR_DEVICE_ERROR"
af3cbb
+            # https://github.com/avocado-framework/avocado/issues/1112#issuecomment-206999400
af3cbb
+            os.environ['NSS_STRICT_NOFORK'] = 'DISABLED'
af3cbb
+            self._api.bootstrap(**self._ipa_config)
af3cbb
+            self._api.finalize()
af3cbb
+
af3cbb
+    def __enter__(self):
af3cbb
+        # pylint: disable=no-member
af3cbb
+        if not self._api.Backend.rpcclient.isconnected():
af3cbb
+            self._api.Backend.rpcclient.connect()
af3cbb
+        # pylint: enable=no-member
af3cbb
+        return self
af3cbb
+
af3cbb
+    def __exit__(self, exc_type, exc_val, exc_tb):
af3cbb
+        # pylint: disable=no-member
af3cbb
+        if self._api.Backend.rpcclient.isconnected():
af3cbb
+            self._api.Backend.rpcclient.disconnect()
af3cbb
+        # pylint: enable=no-member
af3cbb
+
af3cbb
+
af3cbb
+class IPAVault(CSStore):
af3cbb
+    # vault arguments
af3cbb
+    principal = PluginOption(
af3cbb
+        str, None,
af3cbb
+        "Service principal for service vault (auto-discovered from GSSAPI)"
af3cbb
+    )
af3cbb
+    user = PluginOption(
af3cbb
+        str, None,
af3cbb
+        "User name for user vault (auto-discovered from GSSAPI)"
af3cbb
+    )
af3cbb
+    vault_type = PluginOption(
af3cbb
+        str, None,
af3cbb
+        "vault type, one of 'user', 'service', 'shared', or "
af3cbb
+        "auto-discovered from GSSAPI"
af3cbb
+    )
af3cbb
+
af3cbb
+    # Kerberos flags
af3cbb
+    krb5config = PluginOption(str, None, "Kerberos krb5.conf override")
af3cbb
+    keytab = PluginOption(str, None, "Kerberos keytab for auth")
af3cbb
+    ccache = PluginOption(
af3cbb
+        str, None, "Kerberos ccache, e,g. FILE:/path/to/ccache")
af3cbb
+
af3cbb
+    # ipalib.api arguments
af3cbb
+    ipa_confdir = PluginOption(str, None, "IPA confdir override")
af3cbb
+    ipa_context = PluginOption(str, "cli", "IPA bootstrap context")
af3cbb
+    ipa_debug = PluginOption(bool, False, "debug mode for ipalib")
af3cbb
+
af3cbb
+    def __init__(self, config, section=None):
af3cbb
+        super(IPAVault, self).__init__(config, section)
af3cbb
+        # configure Kerberos / GSSAPI and acquire principal
af3cbb
+        gssapi_principal = self._gssapi()
af3cbb
+        self.logger.info(u"Vault uses Kerberos principal '%s'",
af3cbb
+                         gssapi_principal)
af3cbb
+
af3cbb
+        # bootstrap FreeIPA API
af3cbb
+        self.ipa = FreeIPA(
af3cbb
+            ipa_confdir=self.ipa_confdir,
af3cbb
+            ipa_debug=self.ipa_debug,
af3cbb
+            ipa_context=self.ipa_context,
af3cbb
+        )
af3cbb
+        # connect
af3cbb
+        with self.ipa:
af3cbb
+            self.logger.info("IPA server '%s': %s",
af3cbb
+                             self.ipa.env.server,
af3cbb
+                             self.ipa.Command.ping()[u'summary'])
af3cbb
+            # retrieve and cache KRA transport cert
af3cbb
+            response = self.ipa.Command.vaultconfig_show()
af3cbb
+            servers = response[u'result'][u'kra_server_server']
af3cbb
+            self.logger.info("KRA server(s) %s", ', '.join(servers))
af3cbb
+
af3cbb
+        service, user_host, realm = krb5_unparse_principal_name(
af3cbb
+            gssapi_principal)
af3cbb
+        self._init_vault_args(service, user_host, realm)
af3cbb
+
af3cbb
+    def _gssapi(self):
af3cbb
+        # set client keytab env var for authentication
af3cbb
+        if self.keytab is not None:
af3cbb
+            os.environ['KRB5_CLIENT_KTNAME'] = self.keytab
af3cbb
+        if self.ccache is not None:
af3cbb
+            os.environ['KRB5CCNAME'] = self.ccache
af3cbb
+        if self.krb5config is not None:
af3cbb
+            os.environ['KRB5_CONFIG'] = self.krb5config
af3cbb
+        try:
af3cbb
+            return get_principal()
af3cbb
+        except Exception:
af3cbb
+            self.logger.error(
af3cbb
+                "Unable to get principal from GSSAPI. Are you missing a "
af3cbb
+                "TGT or valid Kerberos keytab?"
af3cbb
+            )
af3cbb
+            raise
af3cbb
+
af3cbb
+    def _init_vault_args(self, service, user_host, realm):
af3cbb
+        if self.vault_type is None:
af3cbb
+            self.vault_type = 'user' if service is None else 'service'
af3cbb
+            self.logger.info("Setting vault type to '%s' from Kerberos",
af3cbb
+                             self.vault_type)
af3cbb
+
af3cbb
+        if self.vault_type == 'shared':
af3cbb
+            self._vault_args = {'shared': True}
af3cbb
+        elif self.vault_type == 'user':
af3cbb
+            if self.user is None:
af3cbb
+                if service is not None:
af3cbb
+                    msg = "{!r}: User vault requires 'user' parameter"
af3cbb
+                    raise ValueError(msg.format(self))
af3cbb
+                else:
af3cbb
+                    self.user = user_host
af3cbb
+                    self.logger.info(u"Setting username '%s' from Kerberos",
af3cbb
+                                     self.user)
af3cbb
+            if six.PY2 and isinstance(self.user, str):
af3cbb
+                self.user = self.user.decode('utf-8')
af3cbb
+            self._vault_args = {'username': self.user}
af3cbb
+        elif self.vault_type == 'service':
af3cbb
+            if self.principal is None:
af3cbb
+                if service is None:
af3cbb
+                    msg = "{!r}: Service vault requires 'principal' parameter"
af3cbb
+                    raise ValueError(msg.format(self))
af3cbb
+                else:
af3cbb
+                    self.principal = u'/'.join((service, user_host))
af3cbb
+                    self.logger.info(u"Setting principal '%s' from Kerberos",
af3cbb
+                                     self.principal)
af3cbb
+            if six.PY2 and isinstance(self.principal, str):
af3cbb
+                self.principal = self.principal.decode('utf-8')
af3cbb
+            self._vault_args = {'service': self.principal}
af3cbb
+        else:
af3cbb
+            msg = '{!r}: Invalid vault type {}'
af3cbb
+            raise ValueError(msg.format(self, self.vault_type))
af3cbb
+
af3cbb
+    def _mangle_key(self, key):
af3cbb
+        if '__' in key:
af3cbb
+            raise ValueError
af3cbb
+        key = key.replace('/', '__')
af3cbb
+        if isinstance(key, bytes):
af3cbb
+            key = key.decode('utf-8')
af3cbb
+        return key
af3cbb
+
af3cbb
+    def get(self, key):
af3cbb
+        key = self._mangle_key(key)
af3cbb
+        with self.ipa as ipa:
af3cbb
+            try:
af3cbb
+                result = ipa.Command.vault_retrieve(
af3cbb
+                    key, **self._vault_args)
af3cbb
+            except NotFound as e:
af3cbb
+                self.logger.info(str(e))
af3cbb
+                return None
af3cbb
+            except Exception:
af3cbb
+                msg = "Failed to retrieve entry {}".format(key)
af3cbb
+                self.logger.exception(msg)
af3cbb
+                raise CSStoreError(msg)
af3cbb
+            else:
af3cbb
+                return result[u'result'][u'data']
af3cbb
+
af3cbb
+    def set(self, key, value, replace=False):
af3cbb
+        key = self._mangle_key(key)
af3cbb
+        if not isinstance(value, bytes):
af3cbb
+            value = value.encode('utf-8')
af3cbb
+        with self.ipa as ipa:
af3cbb
+            try:
af3cbb
+                ipa.Command.vault_add(
af3cbb
+                    key, ipavaulttype=u"standard", **self._vault_args)
af3cbb
+            except DuplicateEntry:
af3cbb
+                if not replace:
af3cbb
+                    raise CSStoreExists(key)
af3cbb
+            except Exception:
af3cbb
+                msg = "Failed to add entry {}".format(key)
af3cbb
+                self.logger.exception(msg)
af3cbb
+                raise CSStoreError(msg)
af3cbb
+            try:
af3cbb
+                ipa.Command.vault_archive(
af3cbb
+                    key, data=value, **self._vault_args)
af3cbb
+            except Exception:
af3cbb
+                msg = "Failed to archive entry {}".format(key)
af3cbb
+                self.logger.exception(msg)
af3cbb
+                raise CSStoreError(msg)
af3cbb
+
af3cbb
+    def span(self, key):
af3cbb
+        raise CSStoreError("span is not implemented")
af3cbb
+
af3cbb
+    def list(self, keyfilter=None):
af3cbb
+        with self.ipa as ipa:
af3cbb
+            try:
af3cbb
+                result = ipa.Command.vault_find(
af3cbb
+                    ipavaulttype=u"standard", **self._vault_args)
af3cbb
+            except Exception:
af3cbb
+                msg = "Failed to list entries"
af3cbb
+                self.logger.exception(msg)
af3cbb
+                raise CSStoreError(msg)
af3cbb
+
af3cbb
+        names = []
af3cbb
+        for entry in result[u'result']:
af3cbb
+            cn = entry[u'cn'][0]
af3cbb
+            key = cn.replace('__', '/')
af3cbb
+            if keyfilter is not None and not key.startswith(keyfilter):
af3cbb
+                continue
af3cbb
+            names.append(key.rsplit('/', 1)[-1])
af3cbb
+        return names
af3cbb
+
af3cbb
+    def cut(self, key):
af3cbb
+        key = self._mangle_key(key)
af3cbb
+        with self.ipa as ipa:
af3cbb
+            try:
af3cbb
+                ipa.Command.vault_del(key, **self._vault_args)
af3cbb
+            except NotFound:
af3cbb
+                return False
af3cbb
+            except Exception:
af3cbb
+                msg = "Failed to delete entry {}".format(key)
af3cbb
+                self.logger.exception(msg)
af3cbb
+                raise CSStoreError(msg)
af3cbb
+            else:
af3cbb
+                return True
af3cbb
+
af3cbb
+
af3cbb
+if __name__ == '__main__':
af3cbb
+    from custodia.compat import configparser
af3cbb
+
af3cbb
+    parser = configparser.ConfigParser(
af3cbb
+        interpolation=configparser.ExtendedInterpolation()
af3cbb
+    )
af3cbb
+    parser.read_string(u"""
af3cbb
+    [store:ipa_vault]
af3cbb
+    """)
af3cbb
+
af3cbb
+    v = IPAVault(parser, "store:ipa_vault")
af3cbb
+    v.set('foo', 'bar', replace=True)
af3cbb
+    print(v.get('foo'))
af3cbb
+    print(v.list())
af3cbb
+    v.cut('foo')
af3cbb
+    print(v.list())
af3cbb
diff --git a/setup.py b/setup.py
af3cbb
index c8f270d..4bf096c 100755
af3cbb
--- a/setup.py
af3cbb
+++ b/setup.py
af3cbb
@@ -15,10 +15,14 @@ requirements = [
af3cbb
     'requests'
af3cbb
 ]
af3cbb
 
af3cbb
+# extra requirements
af3cbb
+ipa_requires = ['ipalib >= 4.5.0', 'ipaclient >= 4.5.0']
af3cbb
+
af3cbb
 # test requirements
af3cbb
-test_requires = ['coverage', 'pytest']
af3cbb
+test_requires = ['coverage', 'pytest', 'mock'] + ipa_requires
af3cbb
 
af3cbb
 extras_require = {
af3cbb
+    'ipa': ipa_requires,
af3cbb
     'test': test_requires,
af3cbb
     'test_docs': ['docutils', 'markdown'],
af3cbb
     'test_pep8': ['flake8', 'flake8-import-order', 'pep8-naming'],
af3cbb
@@ -66,6 +70,7 @@ custodia_consumers = [
af3cbb
 custodia_stores = [
af3cbb
     'EncryptedOverlay = custodia.store.encgen:EncryptedOverlay',
af3cbb
     'EncryptedStore = custodia.store.enclite:EncryptedStore',
af3cbb
+    'IPAVault = custodia.ipa.vault:IPAVault',
af3cbb
     'SqliteStore = custodia.store.sqlite:SqliteStore',
af3cbb
 ]
af3cbb
 
af3cbb
@@ -84,6 +89,7 @@ setup(
af3cbb
         'custodia',
af3cbb
         'custodia.cli',
af3cbb
         'custodia.httpd',
af3cbb
+        'custodia.ipa',
af3cbb
         'custodia.message',
af3cbb
         'custodia.server',
af3cbb
         'custodia.store',
af3cbb
diff --git a/tests/test_ipa.py b/tests/test_ipa.py
af3cbb
new file mode 100644
af3cbb
index 0000000..eb4d7fa
af3cbb
--- /dev/null
af3cbb
+++ b/tests/test_ipa.py
af3cbb
@@ -0,0 +1,195 @@
af3cbb
+# Copyright (C) 2017  Custodia project Contributors, for licensee see COPYING
af3cbb
+
af3cbb
+import os
af3cbb
+
af3cbb
+import ipalib
af3cbb
+
af3cbb
+import mock
af3cbb
+
af3cbb
+import pytest
af3cbb
+
af3cbb
+from custodia.compat import configparser
af3cbb
+from custodia.ipa.vault import FreeIPA, IPAVault, krb5_unparse_principal_name
af3cbb
+
af3cbb
+
af3cbb
+CONFIG = u"""
af3cbb
+[store:ipa_service]
af3cbb
+vault_type = service
af3cbb
+principal = custodia/ipa.example
af3cbb
+
af3cbb
+[store:ipa_user]
af3cbb
+vault_type = user
af3cbb
+user = john
af3cbb
+
af3cbb
+[store:ipa_shared]
af3cbb
+vault_type = shared
af3cbb
+
af3cbb
+[store:ipa_invalid]
af3cbb
+vault_type = invalid
af3cbb
+
af3cbb
+[store:ipa_autodiscover]
af3cbb
+
af3cbb
+[store:ipa_environ]
af3cbb
+krb5config = /path/to/krb5.conf
af3cbb
+keytab = /path/to/custodia.keytab
af3cbb
+ccache = FILE:/path/to/ccache
af3cbb
+"""
af3cbb
+
af3cbb
+vault_parametrize = pytest.mark.parametrize(
af3cbb
+    'plugin,vault_type,vault_args',
af3cbb
+    [
af3cbb
+        ('store:ipa_service', 'service', {'service': 'custodia/ipa.example'}),
af3cbb
+        ('store:ipa_user', 'user', {'username': 'john'}),
af3cbb
+        ('store:ipa_shared', 'shared', {'shared': True}),
af3cbb
+    ]
af3cbb
+)
af3cbb
+
af3cbb
+
af3cbb
+class TestCustodiaIPA:
af3cbb
+    def setup_method(self, method):
af3cbb
+        self.parser = configparser.ConfigParser(
af3cbb
+            interpolation=configparser.ExtendedInterpolation(),
af3cbb
+        )
af3cbb
+        self.parser.read_string(CONFIG)
af3cbb
+        # mocked ipalib.api
af3cbb
+        self.p_api = mock.patch('ipalib.api', autospec=ipalib.api)
af3cbb
+        self.m_api = self.p_api.start()
af3cbb
+        self.m_api.isdone.return_value = False
af3cbb
+        self.m_api.env = mock.Mock()
af3cbb
+        self.m_api.env.server = 'server.ipa.example'
af3cbb
+        self.m_api.Backend = mock.Mock()
af3cbb
+        self.m_api.Command = mock.Mock()
af3cbb
+        self.m_api.Command.ping.return_value = {
af3cbb
+            u'summary': u'IPA server version 4.4.3. API version 2.215',
af3cbb
+        }
af3cbb
+        self.m_api.Command.vaultconfig_show.return_value = {
af3cbb
+            u'result': {
af3cbb
+                u'kra_server_server': [u'ipa.example'],
af3cbb
+            }
af3cbb
+        }
af3cbb
+        # mocked get_principal
af3cbb
+        self.p_get_principal = mock.patch('custodia.ipa.vault.get_principal')
af3cbb
+        self.m_get_principal = self.p_get_principal.start()
af3cbb
+        self.m_get_principal.return_value = 'custodia/ipa.example@IPA.EXAMPLE'
af3cbb
+        # mocked environ (empty dict)
af3cbb
+        self.p_env = mock.patch.dict('os.environ', clear=True)
af3cbb
+        self.p_env.start()
af3cbb
+
af3cbb
+    def teardown_method(self, method):
af3cbb
+        self.p_api.stop()
af3cbb
+        self.p_get_principal.stop()
af3cbb
+        self.p_env.stop()
af3cbb
+
af3cbb
+    def test_api_init(self):
af3cbb
+        m_api = self.m_api
af3cbb
+        freeipa = FreeIPA(api=m_api)
af3cbb
+        m_api.isdone.assert_called_once_with('bootstrap')
af3cbb
+        m_api.bootstrap.assert_called_once_with(
af3cbb
+            context='cli',
af3cbb
+            debug=False,
af3cbb
+            log=None,
af3cbb
+        )
af3cbb
+
af3cbb
+        m_api.Backend.rpcclient.isconnected.return_value = False
af3cbb
+        with freeipa:
af3cbb
+            m_api.Backend.rpcclient.connect.assert_called_once()
af3cbb
+            m_api.Backend.rpcclient.isconnected.return_value = True
af3cbb
+        m_api.Backend.rpcclient.disconnect.assert_called_once()
af3cbb
+
af3cbb
+        assert os.environ == dict(NSS_STRICT_NOFORK='DISABLED')
af3cbb
+
af3cbb
+    def test_api_environ(self):
af3cbb
+        assert os.environ == {}
af3cbb
+        IPAVault(self.parser, 'store:ipa_environ')
af3cbb
+        assert os.environ == dict(
af3cbb
+            NSS_STRICT_NOFORK='DISABLED',
af3cbb
+            KRB5_CONFIG='/path/to/krb5.conf',
af3cbb
+            KRB5_CLIENT_KTNAME='/path/to/custodia.keytab',
af3cbb
+            KRB5CCNAME='FILE:/path/to/ccache',
af3cbb
+        )
af3cbb
+
af3cbb
+    def test_invalid_vault_type(self):
af3cbb
+        pytest.raises(ValueError, IPAVault, self.parser, 'store:ipa_invalid')
af3cbb
+
af3cbb
+    def test_vault_autodiscover_service(self):
af3cbb
+        self.m_get_principal.return_value = 'custodia/ipa.example@IPA.EXAMPLE'
af3cbb
+        ipa = IPAVault(self.parser, 'store:ipa_autodiscover')
af3cbb
+        assert ipa.vault_type == 'service'
af3cbb
+        assert ipa.principal == 'custodia/ipa.example'
af3cbb
+        assert ipa.user is None
af3cbb
+
af3cbb
+    def test_vault_autodiscover_user(self):
af3cbb
+        self.m_get_principal.return_value = 'john@IPA.EXAMPLE'
af3cbb
+        ipa = IPAVault(self.parser, 'store:ipa_autodiscover')
af3cbb
+        assert ipa.vault_type == 'user'
af3cbb
+        assert ipa.principal is None
af3cbb
+        assert ipa.user == 'john'
af3cbb
+
af3cbb
+    @vault_parametrize
af3cbb
+    def test_vault_set(self, plugin, vault_type, vault_args):
af3cbb
+        ipa = IPAVault(self.parser, plugin)
af3cbb
+        assert ipa.vault_type == vault_type
af3cbb
+        self.m_api.Command.ping.assert_called_once()
af3cbb
+        ipa.set('directory/testkey', 'testvalue')
af3cbb
+        self.m_api.Command.vault_add.assert_called_once_with(
af3cbb
+            'directory__testkey',
af3cbb
+            ipavaulttype=u'standard',
af3cbb
+            **vault_args
af3cbb
+        )
af3cbb
+        self.m_api.Command.vault_archive.assert_called_once_with(
af3cbb
+            'directory__testkey',
af3cbb
+            data=b'testvalue',
af3cbb
+            **vault_args
af3cbb
+        )
af3cbb
+
af3cbb
+    @vault_parametrize
af3cbb
+    def test_vault_get(self, plugin, vault_type, vault_args):
af3cbb
+        ipa = IPAVault(self.parser, plugin)
af3cbb
+        assert ipa.vault_type == vault_type
af3cbb
+        self.m_api.Command.vault_retrieve.return_value = {
af3cbb
+            u'result': {
af3cbb
+                u'data': b'testvalue',
af3cbb
+            }
af3cbb
+        }
af3cbb
+        assert ipa.get('directory/testkey') == b'testvalue'
af3cbb
+        self.m_api.Command.vault_retrieve.assert_called_once_with(
af3cbb
+            'directory__testkey',
af3cbb
+            **vault_args
af3cbb
+        )
af3cbb
+
af3cbb
+    @vault_parametrize
af3cbb
+    def test_vault_list(self, plugin, vault_type, vault_args):
af3cbb
+        ipa = IPAVault(self.parser, plugin)
af3cbb
+        assert ipa.vault_type == vault_type
af3cbb
+        self.m_api.Command.vault_find.return_value = {
af3cbb
+            u'result': [{'cn': [u'directory__testkey']}]
af3cbb
+        }
af3cbb
+        assert ipa.list('directory') == ['testkey']
af3cbb
+        self.m_api.Command.vault_find.assert_called_once_with(
af3cbb
+            ipavaulttype=u'standard',
af3cbb
+            **vault_args
af3cbb
+        )
af3cbb
+
af3cbb
+    @vault_parametrize
af3cbb
+    def test_vault_cut(self, plugin, vault_type, vault_args):
af3cbb
+        ipa = IPAVault(self.parser, plugin)
af3cbb
+        assert ipa.vault_type == vault_type
af3cbb
+        ipa.cut('directory/testkey')
af3cbb
+        self.m_api.Command.vault_del.assert_called_once_with(
af3cbb
+            'directory__testkey',
af3cbb
+            **vault_args
af3cbb
+        )
af3cbb
+
af3cbb
+
af3cbb
+@pytest.mark.parametrize('principal,result', [
af3cbb
+    ('john@IPA.EXAMPLE',
af3cbb
+     (None, 'john', 'IPA.EXAMPLE')),
af3cbb
+    ('host/host.invalid@IPA.EXAMPLE',
af3cbb
+     ('host', 'host.invalid', 'IPA.EXAMPLE')),
af3cbb
+    ('custodia/host.invalid@IPA.EXAMPLE',
af3cbb
+     ('custodia', 'host.invalid', 'IPA.EXAMPLE')),
af3cbb
+    ('whatever/custodia/host.invalid@IPA.EXAMPLE',
af3cbb
+     ('whatever/custodia', 'host.invalid', 'IPA.EXAMPLE')),
af3cbb
+])
af3cbb
+def test_unparse(principal, result):
af3cbb
+    assert krb5_unparse_principal_name(principal) == result
af3cbb
-- 
af3cbb
2.9.3
af3cbb