diff --git a/README.md b/README.md
index c16fee6..ebf8de1 100644
--- a/README.md
+++ b/README.md
@@ -28,7 +28,7 @@ that at least version 1.2 of NetworkManager's API is available. For
`initscripts`, it requires the legacy network service as commonly available on
Fedora/RHEL.
-For each host a list of networking profiles can be configure via the
+For each host a list of networking profiles can be configured via the
`network_connections` variable.
- For initscripts, profiles correspond to ifcfg files in `/etc/sysconfig/network-scripts/ifcfg-*`.
@@ -42,6 +42,24 @@ profile with a certain IP configuration without activating the profile. To
apply the configuration to the actual networking interface, a command like
`nmcli` needs to be used on the target system.
+### Warning
+
+The role updates or creates all connection profiles on the target system as
+specified in the `network_connections` variable. Therefore, the role will
+remove settings from the specified profiles if the settings are only present on
+the system but not in the `network_connections` variable. The following
+exceptions apply:
+
+* For profiles that only contain a `state` setting, the role will only activate
+ or deactivate the connection without changing its configuration.
+
+* The `route_append_only` setting allows to only add new routes to the
+ existing routes on the system.
+
+* The `rule_append_only` setting allows to preserve the current routing rules.
+ There is no support to specify routing rules at the moment.
+
+See also [Limitations](#limitations).
Variables
---------
@@ -64,18 +82,37 @@ for NetworkManager, a connection can only be active at one device at a time.
this role cannot handle a duplicate `name`. Specifying a `name` multiple
times refers to the same connection profile.
-* For initscripts, the name determines the ifcfg file name `/etc/sysconfig/network-scripts/-ifcfg-$NAME`.
+* For initscripts, the name determines the ifcfg file name `/etc/sysconfig/network-scripts/ifcfg-$NAME`.
Note that here too the name doesn't specify the `DEVICE` but a filename. As a consequence
`'/'` is not a valid character for the name.
-### `state`
+### `state` and `persistent_state`
+
+Each connection profile can have a runtime state, represented by the `state`
+setting and a persistent state, represtented by the `persistent_state` setting.
+
+The optional `state` setting supports the following values:
+
+- `up`
+- `down`
+
+It defines whether the profile is activated (`up`) or deactivated (`down`). If
+it is unset, the profile's runtime state will not be changed.
+
+The `persistent_state` setting is either `present` (default) or `absent`. If
+the `persistent_state` setting is `present` and the connection profile contains
+a `type` setting, the profile will be created or updated. If the profile is
+incomplete (lacks the `type` setting) and `persistent_state` is `present`,
+the behavior is undefined. The value `absent` makes the role ensure
+that the profile is not present on the target host.
+
#### Example
```yaml
network_connections:
- - name: "eth0"
- state: "absent"
+ - name: eth0
+ persistent_state: absent
```
Above example ensures the absence of a connection profile. If a profile with `name` `eth0`
@@ -90,45 +127,36 @@ exists, it will be deleted.
* For initscripts it results in the deletion of the ifcfg file. Usually that
has no side-effect, unless some component is watching the sysconfig directory.
-We already saw that state `absent` before. There are more states:
-
- - `absent`
- - `present`
- - `up`
- - `down`
-
-If the `state` variable is omitted, the default is `up` -- unless a `type` is specified,
-in which case the default is `present`.
-
#### Example
```yaml
network_connections:
- - name: "eth0"
- #state: present # default, as a type is present
- type: "ethernet"
+ - name: eth0
+ #persistent_state: present # default
+ type: ethernet
autoconnect: yes
- mac: "00:00:5e:00:53:5d"
+ mac: 00:00:5e:00:53:5d
ip:
dhcp4: yes
```
Above example creates a new connection profile or ensures that it is present
-with the given configuration. It has implicitly `state` `present`, due to the
-presence of `type`. On the other hand, the `present` state requires at least a `type`
-variable.
+with the given configuration. It implies the `persistent_state` setting to be
+`present`.
Valid values for `type` are:
+ - `bond`
+ - `bridge`
- `ethernet`
- `infiniband`
- - `bridge`
- - `bond`
+ - `macvlan`
- `team`
- `vlan`
-`state` `present` does not directly result in a change in the network configuration.
-That is, the profile is only created or modified, not activated.
+The value `present` for the `persistent_state` setting does not directly
+result in a change in the network configuration. That is, without `state`
+set to `up`, the profile is only created or modified, not activated.
- For NetworkManager, note the new connection profile is created with
`autoconnect` turned on by default. Thus, NetworkManager may very well decide
@@ -156,11 +184,11 @@ profile.
### `interface_name`
-For type `ethernet` and `infiniband`, this option restricts the profile to the
-given interface by name. This argument is optional and by default the profile
-name is used unless a mac address is specified using the `mac` key. Specifying
-an empty string (`""`) allows to specify that the profile is not restricted to
-a network interface.
+For the types `ethernet` and `infiniband`, this option restricts the profile to
+the given interface by name. This argument is optional and by default the
+profile name is used unless a mac address is specified using the `mac` key.
+Specifying an empty string (`""`) allows to specify that the profile is not
+restricted to a network interface.
**Note:** With [persistent interface naming](https://access.redhat.com/documentation/en-US/Red_Hat_Enterprise_Linux/7/html/Networking_Guide/ch-Consistent_Network_Device_Naming.html),
@@ -185,30 +213,27 @@ Slaves to bridge/bond/team devices cannot specify a zone.
```yaml
network_connections:
- - name: "eth0"
- #state: up # implicit default, as there is no type specified
- wait: 0
+ - name: eth0
+ state: up
```
-The above example defaults to `state=up` and requires an existing profile to activate.
-Note that if neither `type` nor `state` is specifed, `up` is implied. Thus in above
-example the `state` is redundant.
+The above example requires an existing profile to activate.
- For NetworkManager this results in `nmcli connection id {{name}} up`.
- For initscripts it is the same as `ifup {{name}}`.
-`up` also supports an optional integer argument `wait`. `wait=0` will only initiate
-the activation but not wait until the device is fully connected. Connection will complete
-in the background, for example after a DHCP lease was received.
-`wait: <SECONDS>` is a timeout for how long we give the device
-to activate. The default is `wait=-1` which uses a suitable timeout. Note that this
-argument only makes sense for NetworkManager.
+State `up` also supports an optional integer setting `wait`. `wait: 0` will
+only initiate the activation but not wait until the device is fully connected.
+Connection will complete in the background, for example after a DHCP lease was
+received. `wait: <SECONDS>` is a timeout for how long we give the device to
+activate. The default is using a suitable timeout. Note that this setting is
+only supported by NetworkManager.
**TODO** `wait` different from zero is not yet implemented.
-Note that `up` always re-activates the profile and possibly changes the networking
-configuration, even if the profile was already active before. As such, it always
-changes the system.
+Note that state `up` always re-activates the profile and possibly changes the
+networking configuration, even if the profile was already active before. As
+such, it always changes the system.
### `state: down`
@@ -218,7 +243,6 @@ changes the system.
network_connections:
- name: eth0
state: down
- wait: 0
```
Another `state` is `down`.
@@ -228,8 +252,8 @@ Another `state` is `down`.
- For initscripts this means to call `ifdown {{name}}`.
This is the opposite of the `up` state. It also will always issue the command
-to deactivate the profile, even it if seemingly is currently not active. As such,
-`down` always changes the system.
+to deactivate the profile, even it if seemingly is currently not active. As
+such, `down` always changes the system.
For NetworkManager, a `wait` argument is supported like for `up` state.
@@ -239,18 +263,19 @@ For NetworkManager, a `wait` argument is supported like for `up` state.
```yaml
network_connections:
- - name: "Wired0"
- type: "ethernet"
- interface_name: "eth0"
+ - name: Wired0
+ type: ethernet
+ interface_name: eth0
ip:
dhcp4: yes
- - name: "Wired0"
+ - name: Wired0
+ state: up
```
-As said, the `name` identifies a unique profile. However, you can refer to the same
-profile multiple times. Thus above example makes perfectly sense to create a profile and
-activate it within the same play.
+As said, the `name` identifies a unique profile. However, you can refer to the
+same profile multiple times. Therefore it is possible to create a profile and
+activate it separately.
### `ip`
@@ -258,8 +283,8 @@ The IP configuration supports the following options:
```yaml
network_connections:
- - name: "eth0"
- type: "ethernet"
+ - name: eth0
+ type: ethernet
ip:
route_metric4: 100
dhcp4: no
@@ -341,8 +366,8 @@ integer giving the speed in Mb/s, the valid values of `duplex` are `half` and `f
```yaml
network_connections:
- - name: "eth0"
- type: "ethernet"
+ - name: eth0
+ type: ethernet
ethernet:
autoneg: no
@@ -356,9 +381,9 @@ Device types like `bridge`, `bond`, `team` work similar:
```yaml
network_connections:
- - name: "br0"
+ - name: br0
type: bridge
- #interface_name: br0 # defaults to the connection name
+ #interface_name: br0 # defaults to the connection name
```
Note that `team` is not supported on RHEL6 kernels.
@@ -368,7 +393,8 @@ For slaves of these virtual types, the special properites `slave_type` and
```yaml
network_connections:
- - name: br0
+ - name: internal-br0
+ interface_name: br0
type: bridge
ip:
dhcp4: no
@@ -377,7 +403,7 @@ network_connections:
- name: br0-bond0
type: bond
interface_name: bond0
- master: br0
+ master: internal-br0
slave_type: bridge
- name: br0-bond0-eth1
@@ -450,8 +476,8 @@ network_connections:
parent: eth0-profile
macvlan:
mode: bridge
- promiscuous: True
- tap: False
+ promiscuous: yes
+ tap: no
ip:
address:
- 192.168.1.1/24
@@ -473,7 +499,7 @@ operating system. This is usually `nm` except for RHEL 6 or CentOS 6 systems.
```yaml
network_provider: nm
network_connections:
- - name: "eth0"
+ - name: eth0
#...
```
@@ -511,7 +537,7 @@ so that the host is connected to a management LAN or VLAN. It strongly depends o
- It seems difficult to change networking of the target host in a way that breaks the current
SSH connection of ansible. If you want to do that, ansible-pull might be a solution.
Alternatively, a combination of `async`/`poll` with changing the `ansible_host` midway
- of the play.
+ of the play.
**TODO** The current role doesn't yet support to easily split the
play in a pre-configure step, and a second step to activate the new configuration.
diff --git a/examples/eth-with-vlan.yml b/examples/eth-with-vlan.yml
index 63c7432..69da673 100644
--- a/examples/eth-with-vlan.yml
+++ b/examples/eth-with-vlan.yml
@@ -8,6 +8,7 @@
- name: prod2
type: ethernet
autoconnect: no
+ state: up
interface_name: "{{ network_interface_name2 }}"
ip:
dhcp4: no
diff --git a/examples/infiniband.yml b/examples/infiniband.yml
index e7197fe..22603d9 100644
--- a/examples/infiniband.yml
+++ b/examples/infiniband.yml
@@ -15,6 +15,7 @@
autoconnect: yes
infiniband_p_key: 10
parent: ib0
+ state: up
ip:
dhcp4: no
auto6: no
diff --git a/examples/macvlan.yml b/examples/macvlan.yml
index 0e6ba1b..90cd09d 100644
--- a/examples/macvlan.yml
+++ b/examples/macvlan.yml
@@ -6,6 +6,7 @@
- name: eth0
type: ethernet
+ state: up
interface_name: eth0
ip:
address:
@@ -14,6 +15,7 @@
# Create a virtual ethernet card bound to eth0
- name: veth0
type: macvlan
+ state: up
parent: eth0
macvlan:
mode: bridge
diff --git a/library/network_connections.py b/library/network_connections.py
index 4da3ae8..b8c5ec7 100644
--- a/library/network_connections.py
+++ b/library/network_connections.py
@@ -5,9 +5,21 @@
import functools
import os
import socket
-import sys
import traceback
+# pylint: disable=import-error, no-name-in-module
+from ansible.module_utils.network_lsr import MyError
+
+# pylint: disable=import-error
+from ansible.module_utils.network_lsr.argument_validator import (
+ ArgUtil,
+ ArgValidator_ListConnections,
+ ValidationError,
+)
+
+# pylint: disable=import-error
+from ansible.module_utils.network_lsr.utils import Util
+
DOCUMENTATION = """
---
module: network_connections
@@ -46,21 +58,6 @@ def fmt(level):
return "<%-6s" % (str(level) + ">")
-class MyError(Exception):
- pass
-
-
-class ValidationError(MyError):
- def __init__(self, name, message):
- Exception.__init__(self, name + ": " + message)
- self.error_message = message
- self.name = name
-
- @staticmethod
- def from_connection(idx, message):
- return ValidationError("connection[" + str(idx) + "]", message)
-
-
# cmp() is not available in python 3 anymore
if "cmp" not in dir(__builtins__):
@@ -76,281 +73,6 @@ def cmp(x, y):
return (x > y) - (x < y)
-class Util:
-
- PY3 = sys.version_info[0] == 3
-
- STRING_TYPE = str if PY3 else basestring # noqa:F821
-
- @staticmethod
- def first(iterable, default=None, pred=None):
- for v in iterable:
- if pred is None or pred(v):
- return v
- return default
-
- @staticmethod
- def check_output(argv):
- # subprocess.check_output is python 2.7.
- with open("/dev/null", "wb") as DEVNULL:
- import subprocess
-
- env = os.environ.copy()
- env["LANG"] = "C"
- p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=DEVNULL, env=env)
- # FIXME: Can we assume this to always be UTF-8?
- out = p.communicate()[0].decode("UTF-8")
- if p.returncode != 0:
- raise MyError("failure calling %s: exit with %s" % (argv, p.returncode))
- return out
-
- @classmethod
- def create_uuid(cls):
- cls.NM()
- return str(cls._uuid.uuid4())
-
- @classmethod
- def NM(cls):
- n = getattr(cls, "_NM", None)
- if n is None:
- # Installing pygobject in a tox virtualenv does not work out of the
- # box
- # pylint: disable=import-error
- import gi
-
- gi.require_version("NM", "1.0")
- from gi.repository import NM, GLib, Gio, GObject
-
- cls._NM = NM
- cls._GLib = GLib
- cls._Gio = Gio
- cls._GObject = GObject
- n = NM
- import uuid
-
- cls._uuid = uuid
- return n
-
- @classmethod
- def GLib(cls):
- cls.NM()
- return cls._GLib
-
- @classmethod
- def Gio(cls):
- cls.NM()
- return cls._Gio
-
- @classmethod
- def GObject(cls):
- cls.NM()
- return cls._GObject
-
- @classmethod
- def Timestamp(cls):
- return cls.GLib().get_monotonic_time()
-
- @classmethod
- def GMainLoop(cls):
- gmainloop = getattr(cls, "_GMainLoop", None)
- if gmainloop is None:
- gmainloop = cls.GLib().MainLoop()
- cls._GMainLoop = gmainloop
- return gmainloop
-
- @classmethod
- def GMainLoop_run(cls, timeout=None):
- if timeout is None:
- cls.GMainLoop().run()
- return True
-
- GLib = cls.GLib()
- result = []
- loop = cls.GMainLoop()
-
- def _timeout_cb(unused):
- result.append(1)
- loop.quit()
- return False
-
- timeout_id = GLib.timeout_add(int(timeout * 1000), _timeout_cb, None)
- loop.run()
- if result:
- return False
- GLib.source_remove(timeout_id)
- return True
-
- @classmethod
- def GMainLoop_iterate(cls, may_block=False):
- return cls.GMainLoop().get_context().iteration(may_block)
-
- @classmethod
- def GMainLoop_iterate_all(cls):
- c = 0
- while cls.GMainLoop_iterate():
- c += 1
- return c
-
- @classmethod
- def create_cancellable(cls):
- return cls.Gio().Cancellable.new()
-
- @classmethod
- def error_is_cancelled(cls, e):
- GLib = cls.GLib()
- if isinstance(e, GLib.GError):
- if (
- e.domain == "g-io-error-quark"
- and e.code == cls.Gio().IOErrorEnum.CANCELLED
- ):
- return True
- return False
-
- @staticmethod
- def ifname_valid(ifname):
- # see dev_valid_name() in kernel's net/core/dev.c
- if not ifname:
- return False
- if ifname in [".", ".."]:
- return False
- if len(ifname) >= 16:
- return False
- if any([c == "/" or c == ":" or c.isspace() for c in ifname]):
- return False
- # FIXME: encoding issues regarding python unicode string
- return True
-
- @staticmethod
- def mac_aton(mac_str, force_len=None):
- # we also accept None and '' for convenience.
- # - None yiels None
- # - '' yields []
- if mac_str is None:
- return mac_str
- i = 0
- b = []
- for c in mac_str:
- if i == 2:
- if c != ":":
- raise MyError("not a valid MAC address: '%s'" % (mac_str))
- i = 0
- continue
- try:
- if i == 0:
- n = int(c, 16) * 16
- i = 1
- else:
- assert i == 1
- n = n + int(c, 16)
- i = 2
- b.append(n)
- except Exception:
- raise MyError("not a valid MAC address: '%s'" % (mac_str))
- if i == 1:
- raise MyError("not a valid MAC address: '%s'" % (mac_str))
- if force_len is not None:
- if force_len != len(b):
- raise MyError(
- "not a valid MAC address of length %s: '%s'" % (force_len, mac_str)
- )
- return b
-
- @staticmethod
- def mac_ntoa(mac):
- if mac is None:
- return None
- return ":".join(["%02x" % c for c in mac])
-
- @staticmethod
- def mac_norm(mac_str, force_len=None):
- return Util.mac_ntoa(Util.mac_aton(mac_str, force_len))
-
- @staticmethod
- def boolean(arg):
- if arg is None or isinstance(arg, bool):
- return arg
- arg0 = arg
- if isinstance(arg, Util.STRING_TYPE):
- arg = arg.lower()
-
- if arg in ["y", "yes", "on", "1", "true", 1, True]:
- return True
- if arg in ["n", "no", "off", "0", "false", 0, False]:
- return False
-
- raise MyError("value '%s' is not a boolean" % (arg0))
-
- @staticmethod
- def parse_ip(addr, family=None):
- if addr is None:
- return (None, None)
- if family is not None:
- Util.addr_family_check(family)
- a = socket.inet_pton(family, addr)
- else:
- a = None
- family = None
- try:
- a = socket.inet_pton(socket.AF_INET, addr)
- family = socket.AF_INET
- except Exception:
- a = socket.inet_pton(socket.AF_INET6, addr)
- family = socket.AF_INET6
- return (socket.inet_ntop(family, a), family)
-
- @staticmethod
- def addr_family_check(family):
- if family != socket.AF_INET and family != socket.AF_INET6:
- raise MyError("invalid address family %s" % (family))
-
- @staticmethod
- def addr_family_to_v(family):
- if family is None:
- return ""
- if family == socket.AF_INET:
- return "v4"
- if family == socket.AF_INET6:
- return "v6"
- raise MyError("invalid address family '%s'" % (family))
-
- @staticmethod
- def addr_family_default_prefix(family):
- Util.addr_family_check(family)
- if family == socket.AF_INET:
- return 24
- else:
- return 64
-
- @staticmethod
- def addr_family_valid_prefix(family, prefix):
- Util.addr_family_check(family)
- if family == socket.AF_INET:
- m = 32
- else:
- m = 128
- return prefix >= 0 and prefix <= m
-
- @staticmethod
- def parse_address(address, family=None):
- try:
- parts = address.split()
- addr_parts = parts[0].split("/")
- if len(addr_parts) != 2:
- raise MyError("expect two addr-parts: ADDR/PLEN")
- a, family = Util.parse_ip(addr_parts[0], family)
- prefix = int(addr_parts[1])
- if not Util.addr_family_valid_prefix(family, prefix):
- raise MyError("invalid prefix %s" % (prefix))
- if len(parts) > 1:
- raise MyError("too many parts")
- return {"address": a, "family": family, "prefix": prefix}
- except Exception:
- raise MyError("invalid address '%s'" % (address))
-
-
-###############################################################################
-
-
class SysUtil:
@staticmethod
def _sysctl_read(filename):
@@ -456,1058 +178,6 @@ def link_info_find(cls, refresh=False, mac=None, ifname=None):
###############################################################################
-class ArgUtil:
- @staticmethod
- def connection_find_by_name(name, connections, n_connections=None):
- if not name:
- raise ValueError("missing name argument")
- c = None
- for idx, connection in enumerate(connections):
- if n_connections is not None and idx >= n_connections:
- break
- if "name" not in connection or name != connection["name"]:
- continue
-
- if connection["state"] == "absent":
- c = None
- elif "type" in connection:
- assert connection["state"] in ["up", "present"]
- c = connection
- return c
-
- @staticmethod
- def connection_find_master(name, connections, n_connections=None):
- c = ArgUtil.connection_find_by_name(name, connections, n_connections)
- if not c:
- raise MyError("invalid master/parent '%s'" % (name))
- if c["interface_name"] is None:
- raise MyError(
- "invalid master/parent '%s' which needs an 'interface_name'" % (name)
- )
- if not Util.ifname_valid(c["interface_name"]):
- raise MyError(
- 'invalid master/parent \'%s\' with invalid "interface_name" ("%s")'
- % (name, c["interface_name"])
- )
- return c["interface_name"]
-
- @staticmethod
- def connection_find_master_uuid(name, connections, n_connections=None):
- c = ArgUtil.connection_find_by_name(name, connections, n_connections)
- if not c:
- raise MyError("invalid master/parent '%s'" % (name))
- return c["nm.uuid"]
-
- @staticmethod
- def connection_get_non_absent_names(connections):
- # @idx is the index with state['absent']. This will
- # return the names of all explicitly mentioned profiles.
- # That is, the names of profiles that should not be deleted.
- result = set()
- for connection in connections:
- if "name" not in connection:
- continue
- if not connection["name"]:
- continue
- result.add(connection["name"])
- return result
-
-
-class ArgValidator:
- MISSING = object()
- DEFAULT_SENTINEL = object()
-
- def __init__(self, name=None, required=False, default_value=None):
- self.name = name
- self.required = required
- self.default_value = default_value
-
- def get_default_value(self):
- try:
- return self.default_value()
- except Exception: # pylint: disable=broad-except
- return self.default_value
-
- def _validate(self, value, name):
- raise NotImplementedError()
-
- def validate(self, value, name=None):
- name = name or self.name or ""
- validated = self._validate(value, name)
- return self._validate_post(value, name, validated)
-
- # pylint: disable=unused-argument,no-self-use
- def _validate_post(self, value, name, result):
- return result
-
-
-class ArgValidatorStr(ArgValidator):
- def __init__( # pylint: disable=too-many-arguments
- self,
- name,
- required=False,
- default_value=None,
- enum_values=None,
- allow_empty=False,
- ):
- ArgValidator.__init__(self, name, required, default_value)
- self.enum_values = enum_values
- self.allow_empty = allow_empty
-
- def _validate(self, value, name):
- if not isinstance(value, Util.STRING_TYPE):
- raise ValidationError(name, "must be a string but is '%s'" % (value))
- value = str(value)
- if self.enum_values is not None and value not in self.enum_values:
- raise ValidationError(
- name,
- "is '%s' but must be one of '%s'"
- % (value, "' '".join(sorted(self.enum_values))),
- )
- if not self.allow_empty and not value:
- raise ValidationError(name, "cannot be empty")
- return value
-
-
-class ArgValidatorNum(ArgValidator):
- def __init__( # pylint: disable=too-many-arguments
- self,
- name,
- required=False,
- val_min=None,
- val_max=None,
- default_value=ArgValidator.DEFAULT_SENTINEL,
- numeric_type=int,
- ):
- ArgValidator.__init__(
- self,
- name,
- required,
- numeric_type(0)
- if default_value is ArgValidator.DEFAULT_SENTINEL
- else default_value,
- )
- self.val_min = val_min
- self.val_max = val_max
- self.numeric_type = numeric_type
-
- def _validate(self, value, name):
- v = None
- try:
- if isinstance(value, self.numeric_type):
- v = value
- else:
- v2 = self.numeric_type(value)
- if isinstance(value, Util.STRING_TYPE) or v2 == value:
- v = v2
- except Exception:
- pass
- if v is None:
- raise ValidationError(
- name, "must be an integer number but is '%s'" % (value)
- )
- if self.val_min is not None and v < self.val_min:
- raise ValidationError(
- name, "value is %s but cannot be less then %s" % (value, self.val_min)
- )
- if self.val_max is not None and v > self.val_max:
- raise ValidationError(
- name,
- "value is %s but cannot be greater then %s" % (value, self.val_max),
- )
- return v
-
-
-class ArgValidatorBool(ArgValidator):
- def __init__(self, name, required=False, default_value=False):
- ArgValidator.__init__(self, name, required, default_value)
-
- def _validate(self, value, name):
- try:
- if isinstance(value, bool):
- return value
- if isinstance(value, Util.STRING_TYPE) or isinstance(value, int):
- return Util.boolean(value)
- except Exception:
- pass
- raise ValidationError(name, "must be an boolean but is '%s'" % (value))
-
-
-class ArgValidatorDict(ArgValidator):
- def __init__(
- self,
- name=None,
- required=False,
- nested=None,
- default_value=None,
- all_missing_during_validate=False,
- ):
- ArgValidator.__init__(self, name, required, default_value)
- if nested is not None:
- self.nested = dict([(v.name, v) for v in nested])
- else:
- self.nested = {}
- self.all_missing_during_validate = all_missing_during_validate
-
- def _validate(self, value, name):
- result = {}
- seen_keys = set()
- try:
- items = list(value.items())
- except AttributeError:
- raise ValidationError(name, "invalid content is not a dictionary")
- for (k, v) in items:
- if k in seen_keys:
- raise ValidationError(name, "duplicate key '%s'" % (k))
- seen_keys.add(k)
- validator = self.nested.get(k, None)
- if validator is None:
- raise ValidationError(name, "invalid key '%s'" % (k))
- try:
- vv = validator.validate(v, name + "." + k)
- except ValidationError as e:
- raise ValidationError(e.name, e.error_message)
- result[k] = vv
- for (k, v) in self.nested.items():
- if k in seen_keys:
- continue
- if v.required:
- raise ValidationError(name, "missing required key '%s'" % (k))
- vv = v.get_default_value()
- if not self.all_missing_during_validate and vv is not ArgValidator.MISSING:
- result[k] = vv
- return result
-
-
-class ArgValidatorList(ArgValidator):
- def __init__(self, name, nested, default_value=None):
- ArgValidator.__init__(self, name, required=False, default_value=default_value)
- self.nested = nested
-
- def _validate(self, value, name):
-
- if isinstance(value, Util.STRING_TYPE):
- # we expect a list. However, for convenience allow to
- # specify a string, separated by space. Escaping is
- # not supported. If you need that, define a proper list.
- value = [s for s in value.split(" ") if s]
-
- result = []
- for (idx, v) in enumerate(value):
- try:
- vv = self.nested.validate(v, name + "[" + str(idx) + "]")
- except ValidationError as e:
- raise ValidationError(e.name, e.error_message)
- result.append(vv)
- return result
-
-
-class ArgValidatorIP(ArgValidatorStr):
- def __init__(
- self, name, family=None, required=False, default_value=None, plain_address=True
- ):
- ArgValidatorStr.__init__(self, name, required, default_value, None)
- self.family = family
- self.plain_address = plain_address
-
- def _validate(self, value, name):
- v = ArgValidatorStr._validate(self, value, name)
- try:
- addr, family = Util.parse_ip(v, self.family)
- except Exception:
- raise ValidationError(
- name,
- "value '%s' is not a valid IP%s address"
- % (value, Util.addr_family_to_v(self.family)),
- )
- if self.plain_address:
- return addr
- return {"family": family, "address": addr}
-
-
-class ArgValidatorMac(ArgValidatorStr):
- def __init__(self, name, force_len=None, required=False, default_value=None):
- ArgValidatorStr.__init__(self, name, required, default_value, None)
- self.force_len = force_len
-
- def _validate(self, value, name):
- v = ArgValidatorStr._validate(self, value, name)
- try:
- addr = Util.mac_aton(v, self.force_len)
- except MyError:
- raise ValidationError(
- name, "value '%s' is not a valid MAC address" % (value)
- )
- if not addr:
- raise ValidationError(
- name, "value '%s' is not a valid MAC address" % (value)
- )
- return Util.mac_ntoa(addr)
-
-
-class ArgValidatorIPAddr(ArgValidatorDict):
- def __init__(self, name, family=None, required=False, default_value=None):
- ArgValidatorDict.__init__(
- self,
- name,
- required,
- nested=[
- ArgValidatorIP(
- "address", family=family, required=True, plain_address=False
- ),
- ArgValidatorNum("prefix", default_value=None, val_min=0),
- ],
- )
- self.family = family
-
- def _validate(self, value, name):
- if isinstance(value, Util.STRING_TYPE):
- v = str(value)
- if not v:
- raise ValidationError(name, "cannot be empty")
- try:
- return Util.parse_address(v, self.family)
- except Exception:
- raise ValidationError(
- name,
- "value '%s' is not a valid IP%s address with prefix length"
- % (value, Util.addr_family_to_v(self.family)),
- )
- v = ArgValidatorDict._validate(self, value, name)
- return {
- "address": v["address"]["address"],
- "family": v["address"]["family"],
- "prefix": v["prefix"],
- }
-
- def _validate_post(self, value, name, result):
- family = result["family"]
- prefix = result["prefix"]
- if prefix is None:
- prefix = Util.addr_family_default_prefix(family)
- result["prefix"] = prefix
- elif not Util.addr_family_valid_prefix(family, prefix):
- raise ValidationError(name, "invalid prefix %s in '%s'" % (prefix, value))
- return result
-
-
-class ArgValidatorIPRoute(ArgValidatorDict):
- def __init__(self, name, family=None, required=False, default_value=None):
- ArgValidatorDict.__init__(
- self,
- name,
- required,
- nested=[
- ArgValidatorIP(
- "network", family=family, required=True, plain_address=False
- ),
- ArgValidatorNum("prefix", default_value=None, val_min=0),
- ArgValidatorIP(
- "gateway", family=family, default_value=None, plain_address=False
- ),
- ArgValidatorNum(
- "metric", default_value=-1, val_min=-1, val_max=0xFFFFFFFF
- ),
- ],
- )
- self.family = family
-
- def _validate_post(self, value, name, result):
- network = result["network"]
-
- family = network["family"]
- result["network"] = network["address"]
- result["family"] = family
-
- gateway = result["gateway"]
- if gateway is not None:
- if family != gateway["family"]:
- raise ValidationError(
- name,
- "conflicting address family between network and gateway '%s'"
- % (gateway["address"]),
- )
- result["gateway"] = gateway["address"]
-
- prefix = result["prefix"]
- if prefix is None:
- prefix = Util.addr_family_default_prefix(family)
- result["prefix"] = prefix
- elif not Util.addr_family_valid_prefix(family, prefix):
- raise ValidationError(name, "invalid prefix %s in '%s'" % (prefix, value))
-
- return result
-
-
-class ArgValidator_DictIP(ArgValidatorDict):
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="ip",
- nested=[
- ArgValidatorBool("dhcp4", default_value=None),
- ArgValidatorBool("dhcp4_send_hostname", default_value=None),
- ArgValidatorIP("gateway4", family=socket.AF_INET),
- ArgValidatorNum(
- "route_metric4", val_min=-1, val_max=0xFFFFFFFF, default_value=None
- ),
- ArgValidatorBool("auto6", default_value=None),
- ArgValidatorIP("gateway6", family=socket.AF_INET6),
- ArgValidatorNum(
- "route_metric6", val_min=-1, val_max=0xFFFFFFFF, default_value=None
- ),
- ArgValidatorList(
- "address",
- nested=ArgValidatorIPAddr("address[?]"),
- default_value=list,
- ),
- ArgValidatorList(
- "route", nested=ArgValidatorIPRoute("route[?]"), default_value=list
- ),
- ArgValidatorBool("route_append_only"),
- ArgValidatorBool("rule_append_only"),
- ArgValidatorList(
- "dns",
- nested=ArgValidatorIP("dns[?]", plain_address=False),
- default_value=list,
- ),
- ArgValidatorList(
- "dns_search",
- nested=ArgValidatorStr("dns_search[?]"),
- default_value=list,
- ),
- ],
- default_value=lambda: {
- "dhcp4": True,
- "dhcp4_send_hostname": None,
- "gateway4": None,
- "route_metric4": None,
- "auto6": True,
- "gateway6": None,
- "route_metric6": None,
- "address": [],
- "route": [],
- "route_append_only": False,
- "rule_append_only": False,
- "dns": [],
- "dns_search": [],
- },
- )
-
- def _validate_post(self, value, name, result):
- if result["dhcp4"] is None:
- result["dhcp4"] = result["dhcp4_send_hostname"] is not None or not any(
- [a for a in result["address"] if a["family"] == socket.AF_INET]
- )
- if result["auto6"] is None:
- result["auto6"] = not any(
- [a for a in result["address"] if a["family"] == socket.AF_INET6]
- )
- if result["dhcp4_send_hostname"] is not None:
- if not result["dhcp4"]:
- raise ValidationError(
- name, "'dhcp4_send_hostname' is only valid if 'dhcp4' is enabled"
- )
- return result
-
-
-class ArgValidator_DictEthernet(ArgValidatorDict):
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="ethernet",
- nested=[
- ArgValidatorBool("autoneg", default_value=None),
- ArgValidatorNum(
- "speed", val_min=0, val_max=0xFFFFFFFF, default_value=0
- ),
- ArgValidatorStr("duplex", enum_values=["half", "full"]),
- ],
- default_value=ArgValidator.MISSING,
- )
-
- def get_default_ethernet(self):
- return {"autoneg": None, "speed": 0, "duplex": None}
-
- def _validate_post(self, value, name, result):
- has_speed_or_duplex = result["speed"] != 0 or result["duplex"] is not None
- if result["autoneg"] is None:
- if has_speed_or_duplex:
- result["autoneg"] = False
- elif result["autoneg"]:
- if has_speed_or_duplex:
- raise ValidationError(
- name,
- "cannot specify '%s' with 'autoneg' enabled"
- % ("duplex" if result["duplex"] is not None else "speed"),
- )
- else:
- if not has_speed_or_duplex:
- raise ValidationError(
- name,
- 'need to specify \'duplex\' and "speed" with "autoneg" enabled',
- )
- if has_speed_or_duplex and (result["speed"] == 0 or result["duplex"] is None):
- raise ValidationError(
- name,
- 'need to specify both \'speed\' and "duplex" with "autoneg" disabled',
- )
- return result
-
-
-class ArgValidator_DictBond(ArgValidatorDict):
-
- VALID_MODES = [
- "balance-rr",
- "active-backup",
- "balance-xor",
- "broadcast",
- "802.3ad",
- "balance-tlb",
- "balance-alb",
- ]
-
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="bond",
- nested=[
- ArgValidatorStr("mode", enum_values=ArgValidator_DictBond.VALID_MODES),
- ArgValidatorNum(
- "miimon", val_min=0, val_max=1000000, default_value=None
- ),
- ],
- default_value=ArgValidator.MISSING,
- )
-
- def get_default_bond(self):
- return {"mode": ArgValidator_DictBond.VALID_MODES[0], "miimon": None}
-
-
-class ArgValidator_DictInfiniband(ArgValidatorDict):
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="infiniband",
- nested=[
- ArgValidatorStr(
- "transport_mode", enum_values=["datagram", "connected"]
- ),
- ArgValidatorNum("p_key", val_min=-1, val_max=0xFFFF, default_value=-1),
- ],
- default_value=ArgValidator.MISSING,
- )
-
- def get_default_infiniband(self):
- return {"transport_mode": "datagram", "p_key": -1}
-
-
-class ArgValidator_DictVlan(ArgValidatorDict):
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="vlan",
- nested=[ArgValidatorNum("id", val_min=0, val_max=4094, required=True)],
- default_value=ArgValidator.MISSING,
- )
-
- def get_default_vlan(self):
- return {"id": None}
-
-
-class ArgValidator_DictMacvlan(ArgValidatorDict):
-
- VALID_MODES = ["vepa", "bridge", "private", "passthru", "source"]
-
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="macvlan",
- nested=[
- ArgValidatorStr(
- "mode",
- enum_values=ArgValidator_DictMacvlan.VALID_MODES,
- default_value="bridge",
- ),
- ArgValidatorBool("promiscuous", default_value=True),
- ArgValidatorBool("tap", default_value=False),
- ],
- default_value=ArgValidator.MISSING,
- )
-
- def get_default_macvlan(self):
- return {"mode": "bridge", "promiscuous": True, "tap": False}
-
- def _validate_post(self, value, name, result):
- if result["promiscuous"] is False and result["mode"] != "passthru":
- raise ValidationError(
- name, "non promiscuous operation is allowed only in passthru mode"
- )
- return result
-
-
-class ArgValidator_DictConnection(ArgValidatorDict):
-
- VALID_STATES = ["up", "down", "present", "absent", "wait"]
- VALID_TYPES = [
- "ethernet",
- "infiniband",
- "bridge",
- "team",
- "bond",
- "vlan",
- "macvlan",
- ]
- VALID_SLAVE_TYPES = ["bridge", "bond", "team"]
-
- def __init__(self):
- ArgValidatorDict.__init__(
- self,
- name="connections[?]",
- nested=[
- ArgValidatorStr("name"),
- ArgValidatorStr(
- "state", enum_values=ArgValidator_DictConnection.VALID_STATES
- ),
- ArgValidatorBool("force_state_change", default_value=None),
- ArgValidatorNum("wait", val_min=0, val_max=3600, numeric_type=float),
- ArgValidatorStr(
- "type", enum_values=ArgValidator_DictConnection.VALID_TYPES
- ),
- ArgValidatorBool("autoconnect", default_value=True),
- ArgValidatorStr(
- "slave_type",
- enum_values=ArgValidator_DictConnection.VALID_SLAVE_TYPES,
- ),
- ArgValidatorStr("master"),
- ArgValidatorStr("interface_name", allow_empty=True),
- ArgValidatorMac("mac"),
- ArgValidatorNum(
- "mtu", val_min=0, val_max=0xFFFFFFFF, default_value=None
- ),
- ArgValidatorStr("zone"),
- ArgValidatorBool("check_iface_exists", default_value=True),
- ArgValidatorStr("parent"),
- ArgValidatorBool("ignore_errors", default_value=None),
- ArgValidator_DictIP(),
- ArgValidator_DictEthernet(),
- ArgValidator_DictBond(),
- ArgValidator_DictInfiniband(),
- ArgValidator_DictVlan(),
- ArgValidator_DictMacvlan(),
- # deprecated options:
- ArgValidatorStr(
- "infiniband_transport_mode",
- enum_values=["datagram", "connected"],
- default_value=ArgValidator.MISSING,
- ),
- ArgValidatorNum(
- "infiniband_p_key",
- val_min=-1,
- val_max=0xFFFF,
- default_value=ArgValidator.MISSING,
- ),
- ArgValidatorNum(
- "vlan_id",
- val_min=0,
- val_max=4094,
- default_value=ArgValidator.MISSING,
- ),
- ],
- default_value=dict,
- all_missing_during_validate=True,
- )
-
- def _validate_post(self, value, name, result):
- if "state" not in result:
- if "type" in result:
- result["state"] = "present"
- elif list(result.keys()) == ["wait"]:
- result["state"] = "wait"
- else:
- result["state"] = "up"
-
- if result["state"] == "present" or (
- result["state"] == "up" and "type" in result
- ):
- VALID_FIELDS = list(self.nested.keys())
- if result["state"] == "present":
- VALID_FIELDS.remove("wait")
- VALID_FIELDS.remove("force_state_change")
- elif result["state"] in ["up", "down"]:
- VALID_FIELDS = [
- "name",
- "state",
- "wait",
- "ignore_errors",
- "force_state_change",
- ]
- elif result["state"] == "absent":
- VALID_FIELDS = ["name", "state", "ignore_errors"]
- elif result["state"] == "wait":
- VALID_FIELDS = ["state", "wait"]
- else:
- assert False
-
- VALID_FIELDS = set(VALID_FIELDS)
- for k in result:
- if k not in VALID_FIELDS:
- raise ValidationError(
- name + "." + k,
- "property is not allowed for state '%s'" % (result["state"]),
- )
-
- if result["state"] != "wait":
- if result["state"] == "absent":
- if "name" not in result:
- result[
- "name"
- ] = "" # set to empty string to mean *absent all others*
- else:
- if "name" not in result:
- raise ValidationError(name, "missing 'name'")
-
- if result["state"] in ["wait", "up", "down"]:
- if "wait" not in result:
- result["wait"] = None
- else:
- if "wait" in result:
- raise ValidationError(
- name + ".wait",
- "'wait' is not allowed for state '%s'" % (result["state"]),
- )
-
- if result["state"] == "present" and "type" not in result:
- raise ValidationError(
- name + ".state", '"present" state requires a "type" argument'
- )
-
- if "type" in result:
-
- if "master" in result:
- if "slave_type" not in result:
- result["slave_type"] = None
- if result["master"] == result["name"]:
- raise ValidationError(
- name + ".master", '"master" cannot refer to itself'
- )
- else:
- if "slave_type" in result:
- raise ValidationError(
- name + ".slave_type",
- "'slave_type' requires a 'master' property",
- )
-
- if "ip" in result:
- if "master" in result:
- raise ValidationError(
- name + ".ip", 'a slave cannot have an "ip" property'
- )
- else:
- if "master" not in result:
- result["ip"] = self.nested["ip"].get_default_value()
-
- if "zone" in result:
- if "master" in result:
- raise ValidationError(
- name + ".zone", '"zone" cannot be configured for slave types'
- )
- else:
- result["zone"] = None
-
- if "mac" in result:
- if result["type"] not in ["ethernet", "infiniband"]:
- raise ValidationError(
- name + ".mac",
- "a 'mac' address is only allowed for type 'ethernet' "
- "or 'infiniband'",
- )
- maclen = len(Util.mac_aton(result["mac"]))
- if result["type"] == "ethernet" and maclen != 6:
- raise ValidationError(
- name + ".mac",
- "a 'mac' address for type ethernet requires 6 octets "
- "but is '%s'" % result["mac"],
- )
- if result["type"] == "infiniband" and maclen != 20:
- raise ValidationError(
- name + ".mac",
- "a 'mac' address for type ethernet requires 20 octets "
- "but is '%s'" % result["mac"],
- )
-
- if result["type"] == "infiniband":
- if "infiniband" not in result:
- result["infiniband"] = self.nested[
- "infiniband"
- ].get_default_infiniband()
- if "infiniband_transport_mode" in result:
- result["infiniband"]["transport_mode"] = result[
- "infiniband_transport_mode"
- ]
- del result["infiniband_transport_mode"]
- if "infiniband_p_key" in result:
- result["infiniband"]["p_key"] = result["infiniband_p_key"]
- del result["infiniband_p_key"]
- else:
- if "infiniband_transport_mode" in result:
- raise ValidationError(
- name + ".infiniband_transport_mode",
- "cannot mix deprecated 'infiniband_transport_mode' "
- "property with 'infiniband' settings",
- )
- if "infiniband_p_key" in result:
- raise ValidationError(
- name + ".infiniband_p_key",
- "cannot mix deprecated 'infiniband_p_key' property "
- "with 'infiniband' settings",
- )
- if result["infiniband"]["transport_mode"] is None:
- result["infiniband"]["transport_mode"] = "datagram"
- if result["infiniband"]["p_key"] != -1:
- if "mac" not in result and "parent" not in result:
- raise ValidationError(
- name + ".infiniband.p_key",
- "a infiniband device with 'infiniband.p_key' "
- "property also needs 'mac' or 'parent' property",
- )
- else:
- if "infiniband" in result:
- raise ValidationError(
- name + ".infiniband",
- "'infiniband' settings are only allowed for type 'infiniband'",
- )
- if "infiniband_transport_mode" in result:
- raise ValidationError(
- name + ".infiniband_transport_mode",
- "a 'infiniband_transport_mode' property is only "
- "allowed for type 'infiniband'",
- )
- if "infiniband_p_key" in result:
- raise ValidationError(
- name + ".infiniband_p_key",
- "a 'infiniband_p_key' property is only allowed for "
- "type 'infiniband'",
- )
-
- if "interface_name" in result:
- # Ignore empty interface_name
- if result["interface_name"] == "":
- del result["interface_name"]
- elif not Util.ifname_valid(result["interface_name"]):
- raise ValidationError(
- name + ".interface_name",
- "invalid 'interface_name' '%s'" % (result["interface_name"]),
- )
- else:
- if not result.get("mac"):
- if not Util.ifname_valid(result["name"]):
- raise ValidationError(
- name + ".interface_name",
- '\'interface_name\' as "name" "%s" is not valid'
- % (result["name"]),
- )
- result["interface_name"] = result["name"]
-
- if "interface_name" not in result and result["type"] in [
- "bond",
- "bridge",
- "macvlan",
- "team",
- "vlan",
- ]:
- raise ValidationError(
- name + ".interface_name",
- "type '%s' requires 'interface_name'" % (result["type"]),
- )
-
- if result["type"] == "vlan":
- if "vlan" not in result:
- if "vlan_id" not in result:
- raise ValidationError(
- name + ".vlan", 'missing "vlan" settings for "type" "vlan"'
- )
- result["vlan"] = self.nested["vlan"].get_default_vlan()
- result["vlan"]["id"] = result["vlan_id"]
- del result["vlan_id"]
- else:
- if "vlan_id" in result:
- raise ValidationError(
- name + ".vlan_id",
- "don't use the deprecated 'vlan_id' together with the "
- "'vlan' settings'",
- )
- if "parent" not in result:
- raise ValidationError(
- name + ".parent", 'missing "parent" for "type" "vlan"'
- )
- else:
- if "vlan" in result:
- raise ValidationError(
- name + ".vlan", '"vlan" is only allowed for "type" "vlan"'
- )
- if "vlan_id" in result:
- raise ValidationError(
- name + ".vlan_id", '"vlan_id" is only allowed for "type" "vlan"'
- )
-
- if "parent" in result:
- if result["type"] not in ["vlan", "macvlan", "infiniband"]:
- raise ValidationError(
- name + ".parent",
- '\'parent\' is only allowed for type "vlan", "macvlan" or '
- "'infiniband'",
- )
- if result["parent"] == result["name"]:
- raise ValidationError(
- name + ".parent", '"parent" cannot refer to itself'
- )
-
- if result["type"] == "bond":
- if "bond" not in result:
- result["bond"] = self.nested["bond"].get_default_bond()
- else:
- if "bond" in result:
- raise ValidationError(
- name + ".bond",
- '\'bond\' settings are not allowed for "type" "%s"'
- % (result["type"]),
- )
-
- if result["type"] in ["ethernet", "vlan", "bridge", "bond", "team"]:
- if "ethernet" not in result:
- result["ethernet"] = self.nested["ethernet"].get_default_ethernet()
- else:
- if "ethernet" in result:
- raise ValidationError(
- name + ".ethernet",
- '\'ethernet\' settings are not allowed for "type" "%s"'
- % (result["type"]),
- )
-
- if result["type"] == "macvlan":
- if "macvlan" not in result:
- result["macvlan"] = self.nested["macvlan"].get_default_macvlan()
- else:
- if "macvlan" in result:
- raise ValidationError(
- name + ".macvlan",
- '\'macvlan\' settings are not allowed for "type" "%s"'
- % (result["type"]),
- )
-
- for k in VALID_FIELDS:
- if k in result:
- continue
- v = self.nested[k]
- vv = v.get_default_value()
- if vv is not ArgValidator.MISSING:
- result[k] = vv
-
- return result
-
-
-class ArgValidator_ListConnections(ArgValidatorList):
- def __init__(self):
- ArgValidatorList.__init__(
- self,
- name="connections",
- nested=ArgValidator_DictConnection(),
- default_value=list,
- )
-
- def _validate_post(self, value, name, result):
- for idx, connection in enumerate(result):
- if connection["state"] in ["up"]:
- if connection["state"] == "up" and "type" in connection:
- pass
- elif not ArgUtil.connection_find_by_name(
- connection["name"], result, idx
- ):
- raise ValidationError(
- name + "[" + str(idx) + "].name",
- "state '%s' references non-existing connection '%s'"
- % (connection["state"], connection["name"]),
- )
- if "type" in connection:
- if connection["master"]:
- c = ArgUtil.connection_find_by_name(
- connection["master"], result, idx
- )
- if not c:
- raise ValidationError(
- name + "[" + str(idx) + "].master",
- "references non-existing 'master' connection '%s'"
- % (connection["master"]),
- )
- if c["type"] not in ArgValidator_DictConnection.VALID_SLAVE_TYPES:
- raise ValidationError(
- name + "[" + str(idx) + "].master",
- "references 'master' connection '%s' which is not a master "
- "type by '%s'" % (connection["master"], c["type"]),
- )
- if connection["slave_type"] is None:
- connection["slave_type"] = c["type"]
- elif connection["slave_type"] != c["type"]:
- raise ValidationError(
- name + "[" + str(idx) + "].master",
- "references 'master' connection '%s' which is of type '%s' "
- "instead of slave_type '%s'"
- % (
- connection["master"],
- c["type"],
- connection["slave_type"],
- ),
- )
- if connection["parent"]:
- if not ArgUtil.connection_find_by_name(
- connection["parent"], result, idx
- ):
- raise ValidationError(
- name + "[" + str(idx) + "].parent",
- "references non-existing 'parent' connection '%s'"
- % (connection["parent"]),
- )
- return result
-
- VALIDATE_ONE_MODE_NM = "nm"
- VALIDATE_ONE_MODE_INITSCRIPTS = "initscripts"
-
- def validate_connection_one(self, mode, connections, idx):
- connection = connections[idx]
- if "type" not in connection:
- return
-
- if (connection["parent"]) and (
- (
- (mode == self.VALIDATE_ONE_MODE_INITSCRIPTS)
- and (connection["type"] == "vlan")
- )
- or (
- (connection["type"] == "infiniband")
- and (connection["infiniband"]["p_key"] != -1)
- )
- ):
- try:
- ArgUtil.connection_find_master(connection["parent"], connections, idx)
- except MyError:
- raise ValidationError.from_connection(
- idx,
- "profile references a parent '%s' which has 'interface_name' "
- "missing" % (connection["parent"]),
- )
-
- if (connection["master"]) and (mode == self.VALIDATE_ONE_MODE_INITSCRIPTS):
- try:
- ArgUtil.connection_find_master(connection["master"], connections, idx)
- except MyError:
- raise ValidationError.from_connection(
- idx,
- "profile references a master '%s' which has 'interface_name' "
- "missing" % (connection["master"]),
- )
-
-
###############################################################################
@@ -1876,6 +546,9 @@ def content_to_dict(cls, content, file_type=None):
@classmethod
def content_from_file(cls, name, file_type=None):
+ """
+ Return dictionary with all file contents for an initscripts profile
+ """
content = {}
for file_type in cls._file_types(file_type):
path = cls.ifcfg_path(name, file_type)
@@ -2698,9 +1371,12 @@ def _complete_kwargs_loglines(self, rr, connections, idx):
prefix = "#"
else:
c = connections[idx]
- prefix = "#%s, state:%s" % (idx, c["state"])
- if c["state"] != "wait":
- prefix = prefix + (", '%s'" % (c["name"]))
+ prefix = "#%s, state:%s persistent_state:%s" % (
+ idx,
+ c["state"],
+ c["persistent_state"],
+ )
+ prefix = prefix + (", '%s'" % (c["name"]))
for r in rr["log"]:
yield (r[2], "[%03d] %s %s: %s" % (r[2], LogLevel.fmt(r[0]), prefix, r[1]))
@@ -2895,13 +1571,14 @@ def connection_modified_earlier(self, idx):
continue
c_state = c["state"]
+ c_pstate = c["persistent_state"]
if c_state == "up" and "type" not in c:
pass
elif c_state == "down":
return True
- elif c_state == "absent":
+ elif c_pstate == "absent":
return True
- elif c_state in ["present", "up"]:
+ elif c_state == "up" or c_pstate == "present":
if self.connections_data[idx]["changed"]:
return True
@@ -2941,28 +1618,17 @@ def run(self):
while self.check_mode_next() != CheckMode.DONE:
for idx, connection in enumerate(self.connections):
try:
- state = connection["state"]
- if state == "wait":
- w = connection["wait"]
- if w is None:
- w = 10
- self.log_info(idx, "wait for %s seconds" % (w))
- if self.check_mode == CheckMode.REAL_RUN:
- import time
-
- time.sleep(w)
- elif state == "absent":
- self.run_state_absent(idx)
- elif state == "present":
- self.run_state_present(idx)
- elif state == "up":
- if "type" in connection:
- self.run_state_present(idx)
- self.run_state_up(idx)
- elif state == "down":
- self.run_state_down(idx)
- else:
- assert False
+ for action in connection["actions"]:
+ if action == "absent":
+ self.run_action_absent(idx)
+ elif action == "present":
+ self.run_action_present(idx)
+ elif action == "up":
+ self.run_action_up(idx)
+ elif action == "down":
+ self.run_action_down(idx)
+ else:
+ assert False
except Exception as e:
self.log_warn(
idx, "failure: %s [[%s]]" % (e, traceback.format_exc())
@@ -3017,16 +1683,16 @@ def run_prepare(self):
% (connection["interface_name"], connection["mac"]),
)
- def run_state_absent(self, idx):
+ def run_action_absent(self, idx):
raise NotImplementedError()
- def run_state_present(self, idx):
+ def run_action_present(self, idx):
raise NotImplementedError()
- def run_state_down(self, idx):
+ def run_action_down(self, idx):
raise NotImplementedError()
- def run_state_up(self, idx):
+ def run_action_up(self, idx):
raise NotImplementedError()
@@ -3053,11 +1719,9 @@ def run_prepare(self):
Cmd.run_prepare(self)
names = {}
for connection in self.connections:
- if connection["state"] not in ["up", "down", "present", "absent"]:
- continue
name = connection["name"]
if not name:
- assert connection["state"] == "absent"
+ assert connection["persistent_state"] == "absent"
continue
if name in names:
exists = names[name]["nm.exists"]
@@ -3074,7 +1738,7 @@ def run_prepare(self):
connection["nm.exists"] = exists
connection["nm.uuid"] = uuid
- def run_state_absent(self, idx):
+ def run_action_absent(self, idx):
seen = set()
name = self.connections[idx]["name"]
black_list_names = None
@@ -3099,13 +1763,23 @@ def run_state_absent(self, idx):
if not seen:
self.log_info(idx, "no connection '%s'" % (name))
- def run_state_present(self, idx):
+ def run_action_present(self, idx):
connection = self.connections[idx]
con_cur = Util.first(
self.nmutil.connection_list(
name=connection["name"], uuid=connection["nm.uuid"]
)
)
+
+ if not connection.get("type"):
+ # if the type is not specified, just check that the connection was
+ # found
+ if not con_cur:
+ self.log_error(
+ idx, "Connection not found on system and 'type' not present"
+ )
+ return
+
con_new = self.nmutil.connection_create(self.connections, idx, con_cur)
if con_cur is None:
self.log_info(
@@ -3159,7 +1833,7 @@ def run_state_present(self, idx):
self.log_error(idx, "delete duplicate connection failed: %s" % (e))
seen.add(c)
- def run_state_up(self, idx):
+ def run_action_up(self, idx):
connection = self.connections[idx]
con = Util.first(
@@ -3223,7 +1897,7 @@ def run_state_up(self, idx):
except MyError as e:
self.log_error(idx, "up connection failed while waiting: %s" % (e))
- def run_state_down(self, idx):
+ def run_action_down(self, idx):
connection = self.connections[idx]
cons = self.nmutil.connection_list(name=connection["name"])
@@ -3300,7 +1974,7 @@ def check_name(self, idx, name=None):
return None
return f
- def run_state_absent(self, idx):
+ def run_action_absent(self, idx):
n = self.connections[idx]["name"]
name = n
if not name:
@@ -3343,7 +2017,7 @@ def run_state_absent(self, idx):
% ("'" + n + "'" if n else "*"),
)
- def run_state_present(self, idx):
+ def run_action_present(self, idx):
if not self.check_name(idx):
return
@@ -3352,6 +2026,15 @@ def run_state_present(self, idx):
old_content = IfcfgUtil.content_from_file(name)
+ if not connection.get("type"):
+ # if the type is not specified, just check that the connection was
+ # found
+ if not old_content.get("ifcfg"):
+ self.log_error(
+ idx, "Connection not found on system and 'type' not present"
+ )
+ return
+
ifcfg_all = IfcfgUtil.ifcfg_create(
self.connections, idx, lambda msg: self.log_warn(idx, msg), old_content
)
@@ -3377,7 +2060,7 @@ def run_state_present(self, idx):
idx, "%s ifcfg-rh profile '%s' failed: %s" % (op, name, e)
)
- def _run_state_updown(self, idx, do_up):
+ def _run_action_updown(self, idx, do_up):
if not self.check_name(idx):
return
@@ -3449,11 +2132,11 @@ def _run_state_updown(self, idx, do_up):
idx, "call '%s %s' failed with exit status %d" % (cmd, name, rc)
)
- def run_state_up(self, idx):
- self._run_state_updown(idx, True)
+ def run_action_up(self, idx):
+ self._run_action_updown(idx, True)
- def run_state_down(self, idx):
- self._run_state_updown(idx, False)
+ def run_action_down(self, idx):
+ self._run_action_updown(idx, False)
###############################################################################
diff --git a/module_utils/network_lsr/__init__.py b/module_utils/network_lsr/__init__.py
new file mode 100644
index 0000000..22c717c
--- /dev/null
+++ b/module_utils/network_lsr/__init__.py
@@ -0,0 +1,7 @@
+#!/usr/bin/python3 -tt
+# vim: fileencoding=utf8
+# SPDX-License-Identifier: BSD-3-Clause
+
+
+class MyError(Exception):
+ pass
diff --git a/module_utils/network_lsr/argument_validator.py b/module_utils/network_lsr/argument_validator.py
new file mode 100644
index 0000000..98b584a
--- /dev/null
+++ b/module_utils/network_lsr/argument_validator.py
@@ -0,0 +1,1119 @@
+#!/usr/bin/python3 -tt
+# vim: fileencoding=utf8
+# SPDX-License-Identifier: BSD-3-Clause
+
+import socket
+
+# pylint: disable=import-error, no-name-in-module
+from ansible.module_utils.network_lsr import MyError
+from ansible.module_utils.network_lsr.utils import Util
+
+
+class ArgUtil:
+ @staticmethod
+ def connection_find_by_name(name, connections, n_connections=None):
+ if not name:
+ raise ValueError("missing name argument")
+ c = None
+ for idx, connection in enumerate(connections):
+ if n_connections is not None and idx >= n_connections:
+ break
+ if "name" not in connection or name != connection["name"]:
+ continue
+
+ if connection["persistent_state"] == "absent":
+ c = None
+ elif connection["persistent_state"] == "present":
+ c = connection
+ return c
+
+ @staticmethod
+ def connection_find_master(name, connections, n_connections=None):
+ c = ArgUtil.connection_find_by_name(name, connections, n_connections)
+ if not c:
+ raise MyError("invalid master/parent '%s'" % (name))
+ if c["interface_name"] is None:
+ raise MyError(
+ "invalid master/parent '%s' which needs an 'interface_name'" % (name)
+ )
+ if not Util.ifname_valid(c["interface_name"]):
+ raise MyError(
+ "invalid master/parent '%s' with invalid 'interface_name' ('%s')"
+ % (name, c["interface_name"])
+ )
+ return c["interface_name"]
+
+ @staticmethod
+ def connection_find_master_uuid(name, connections, n_connections=None):
+ c = ArgUtil.connection_find_by_name(name, connections, n_connections)
+ if not c:
+ raise MyError("invalid master/parent '%s'" % (name))
+ return c["nm.uuid"]
+
+ @staticmethod
+ def connection_get_non_absent_names(connections):
+ # @idx is the index with state['absent']. This will
+ # return the names of all explicitly mentioned profiles.
+ # That is, the names of profiles that should not be deleted.
+ result = set()
+ for connection in connections:
+ if "name" not in connection:
+ continue
+ if not connection["name"]:
+ continue
+ result.add(connection["name"])
+ return result
+
+
+class ValidationError(MyError):
+ def __init__(self, name, message):
+ Exception.__init__(self, name + ": " + message)
+ self.error_message = message
+ self.name = name
+
+ @staticmethod
+ def from_connection(idx, message):
+ return ValidationError("connection[" + str(idx) + "]", message)
+
+
+class ArgValidator:
+ MISSING = object()
+ DEFAULT_SENTINEL = object()
+
+ def __init__(self, name=None, required=False, default_value=None):
+ self.name = name
+ self.required = required
+ self.default_value = default_value
+
+ def get_default_value(self):
+ try:
+ return self.default_value()
+ except Exception: # pylint: disable=broad-except
+ return self.default_value
+
+ def _validate(self, value, name):
+ raise NotImplementedError()
+
+ def validate(self, value, name=None):
+ name = name or self.name or ""
+ validated = self._validate(value, name)
+ return self._validate_post(value, name, validated)
+
+ # pylint: disable=unused-argument,no-self-use
+ def _validate_post(self, value, name, result):
+ return result
+
+
+class ArgValidatorStr(ArgValidator):
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ name,
+ required=False,
+ default_value=None,
+ enum_values=None,
+ allow_empty=False,
+ ):
+ ArgValidator.__init__(self, name, required, default_value)
+ self.enum_values = enum_values
+ self.allow_empty = allow_empty
+
+ def _validate(self, value, name):
+ if not isinstance(value, Util.STRING_TYPE):
+ raise ValidationError(name, "must be a string but is '%s'" % (value))
+ value = str(value)
+ if self.enum_values is not None and value not in self.enum_values:
+ raise ValidationError(
+ name,
+ "is '%s' but must be one of '%s'"
+ % (value, "' '".join(sorted(self.enum_values))),
+ )
+ if not self.allow_empty and not value:
+ raise ValidationError(name, "cannot be empty")
+ return value
+
+
+class ArgValidatorNum(ArgValidator):
+ def __init__( # pylint: disable=too-many-arguments
+ self,
+ name,
+ required=False,
+ val_min=None,
+ val_max=None,
+ default_value=ArgValidator.DEFAULT_SENTINEL,
+ numeric_type=int,
+ ):
+ ArgValidator.__init__(
+ self,
+ name,
+ required,
+ numeric_type(0)
+ if default_value is ArgValidator.DEFAULT_SENTINEL
+ else default_value,
+ )
+ self.val_min = val_min
+ self.val_max = val_max
+ self.numeric_type = numeric_type
+
+ def _validate(self, value, name):
+ v = None
+ try:
+ if isinstance(value, self.numeric_type):
+ v = value
+ else:
+ v2 = self.numeric_type(value)
+ if isinstance(value, Util.STRING_TYPE) or v2 == value:
+ v = v2
+ except Exception:
+ pass
+ if v is None:
+ raise ValidationError(
+ name, "must be an integer number but is '%s'" % (value)
+ )
+ if self.val_min is not None and v < self.val_min:
+ raise ValidationError(
+ name, "value is %s but cannot be less then %s" % (value, self.val_min)
+ )
+ if self.val_max is not None and v > self.val_max:
+ raise ValidationError(
+ name,
+ "value is %s but cannot be greater then %s" % (value, self.val_max),
+ )
+ return v
+
+
+class ArgValidatorBool(ArgValidator):
+ def __init__(self, name, required=False, default_value=False):
+ ArgValidator.__init__(self, name, required, default_value)
+
+ def _validate(self, value, name):
+ try:
+ if isinstance(value, bool):
+ return value
+ if isinstance(value, Util.STRING_TYPE) or isinstance(value, int):
+ return Util.boolean(value)
+ except Exception:
+ pass
+ raise ValidationError(name, "must be an boolean but is '%s'" % (value))
+
+
+class ArgValidatorDict(ArgValidator):
+ def __init__(
+ self,
+ name=None,
+ required=False,
+ nested=None,
+ default_value=None,
+ all_missing_during_validate=False,
+ ):
+ ArgValidator.__init__(self, name, required, default_value)
+ if nested is not None:
+ self.nested = dict([(v.name, v) for v in nested])
+ else:
+ self.nested = {}
+ self.all_missing_during_validate = all_missing_during_validate
+
+ def _validate(self, value, name):
+ result = {}
+ seen_keys = set()
+ try:
+ items = list(value.items())
+ except AttributeError:
+ raise ValidationError(name, "invalid content is not a dictionary")
+ for (k, v) in items:
+ if k in seen_keys:
+ raise ValidationError(name, "duplicate key '%s'" % (k))
+ seen_keys.add(k)
+ validator = self.nested.get(k, None)
+ if validator is None:
+ raise ValidationError(name, "invalid key '%s'" % (k))
+ try:
+ vv = validator.validate(v, name + "." + k)
+ except ValidationError as e:
+ raise ValidationError(e.name, e.error_message)
+ result[k] = vv
+ for (k, v) in self.nested.items():
+ if k in seen_keys:
+ continue
+ if v.required:
+ raise ValidationError(name, "missing required key '%s'" % (k))
+ vv = v.get_default_value()
+ if not self.all_missing_during_validate and vv is not ArgValidator.MISSING:
+ result[k] = vv
+ return result
+
+
+class ArgValidatorList(ArgValidator):
+ def __init__(self, name, nested, default_value=None):
+ ArgValidator.__init__(self, name, required=False, default_value=default_value)
+ self.nested = nested
+
+ def _validate(self, value, name):
+
+ if isinstance(value, Util.STRING_TYPE):
+ # we expect a list. However, for convenience allow to
+ # specify a string, separated by space. Escaping is
+ # not supported. If you need that, define a proper list.
+ value = [s for s in value.split(" ") if s]
+
+ result = []
+ for (idx, v) in enumerate(value):
+ try:
+ vv = self.nested.validate(v, name + "[" + str(idx) + "]")
+ except ValidationError as e:
+ raise ValidationError(e.name, e.error_message)
+ result.append(vv)
+ return result
+
+
+class ArgValidatorIP(ArgValidatorStr):
+ def __init__(
+ self, name, family=None, required=False, default_value=None, plain_address=True
+ ):
+ ArgValidatorStr.__init__(self, name, required, default_value, None)
+ self.family = family
+ self.plain_address = plain_address
+
+ def _validate(self, value, name):
+ v = ArgValidatorStr._validate(self, value, name)
+ try:
+ addr, family = Util.parse_ip(v, self.family)
+ except Exception:
+ raise ValidationError(
+ name,
+ "value '%s' is not a valid IP%s address"
+ % (value, Util.addr_family_to_v(self.family)),
+ )
+ if self.plain_address:
+ return addr
+ return {"family": family, "address": addr}
+
+
+class ArgValidatorMac(ArgValidatorStr):
+ def __init__(self, name, force_len=None, required=False, default_value=None):
+ ArgValidatorStr.__init__(self, name, required, default_value, None)
+ self.force_len = force_len
+
+ def _validate(self, value, name):
+ v = ArgValidatorStr._validate(self, value, name)
+ try:
+ addr = Util.mac_aton(v, self.force_len)
+ except MyError:
+ raise ValidationError(
+ name, "value '%s' is not a valid MAC address" % (value)
+ )
+ if not addr:
+ raise ValidationError(
+ name, "value '%s' is not a valid MAC address" % (value)
+ )
+ return Util.mac_ntoa(addr)
+
+
+class ArgValidatorIPAddr(ArgValidatorDict):
+ def __init__(self, name, family=None, required=False, default_value=None):
+ ArgValidatorDict.__init__(
+ self,
+ name,
+ required,
+ nested=[
+ ArgValidatorIP(
+ "address", family=family, required=True, plain_address=False
+ ),
+ ArgValidatorNum("prefix", default_value=None, val_min=0),
+ ],
+ )
+ self.family = family
+
+ def _validate(self, value, name):
+ if isinstance(value, Util.STRING_TYPE):
+ v = str(value)
+ if not v:
+ raise ValidationError(name, "cannot be empty")
+ try:
+ return Util.parse_address(v, self.family)
+ except Exception:
+ raise ValidationError(
+ name,
+ "value '%s' is not a valid IP%s address with prefix length"
+ % (value, Util.addr_family_to_v(self.family)),
+ )
+ v = ArgValidatorDict._validate(self, value, name)
+ return {
+ "address": v["address"]["address"],
+ "family": v["address"]["family"],
+ "prefix": v["prefix"],
+ }
+
+ def _validate_post(self, value, name, result):
+ family = result["family"]
+ prefix = result["prefix"]
+ if prefix is None:
+ prefix = Util.addr_family_default_prefix(family)
+ result["prefix"] = prefix
+ elif not Util.addr_family_valid_prefix(family, prefix):
+ raise ValidationError(name, "invalid prefix %s in '%s'" % (prefix, value))
+ return result
+
+
+class ArgValidatorIPRoute(ArgValidatorDict):
+ def __init__(self, name, family=None, required=False, default_value=None):
+ ArgValidatorDict.__init__(
+ self,
+ name,
+ required,
+ nested=[
+ ArgValidatorIP(
+ "network", family=family, required=True, plain_address=False
+ ),
+ ArgValidatorNum("prefix", default_value=None, val_min=0),
+ ArgValidatorIP(
+ "gateway", family=family, default_value=None, plain_address=False
+ ),
+ ArgValidatorNum(
+ "metric", default_value=-1, val_min=-1, val_max=0xFFFFFFFF
+ ),
+ ],
+ )
+ self.family = family
+
+ def _validate_post(self, value, name, result):
+ network = result["network"]
+
+ family = network["family"]
+ result["network"] = network["address"]
+ result["family"] = family
+
+ gateway = result["gateway"]
+ if gateway is not None:
+ if family != gateway["family"]:
+ raise ValidationError(
+ name,
+ "conflicting address family between network and gateway '%s'"
+ % (gateway["address"]),
+ )
+ result["gateway"] = gateway["address"]
+
+ prefix = result["prefix"]
+ if prefix is None:
+ prefix = Util.addr_family_default_prefix(family)
+ result["prefix"] = prefix
+ elif not Util.addr_family_valid_prefix(family, prefix):
+ raise ValidationError(name, "invalid prefix %s in '%s'" % (prefix, value))
+
+ return result
+
+
+class ArgValidator_DictIP(ArgValidatorDict):
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="ip",
+ nested=[
+ ArgValidatorBool("dhcp4", default_value=None),
+ ArgValidatorBool("dhcp4_send_hostname", default_value=None),
+ ArgValidatorIP("gateway4", family=socket.AF_INET),
+ ArgValidatorNum(
+ "route_metric4", val_min=-1, val_max=0xFFFFFFFF, default_value=None
+ ),
+ ArgValidatorBool("auto6", default_value=None),
+ ArgValidatorIP("gateway6", family=socket.AF_INET6),
+ ArgValidatorNum(
+ "route_metric6", val_min=-1, val_max=0xFFFFFFFF, default_value=None
+ ),
+ ArgValidatorList(
+ "address",
+ nested=ArgValidatorIPAddr("address[?]"),
+ default_value=list,
+ ),
+ ArgValidatorList(
+ "route", nested=ArgValidatorIPRoute("route[?]"), default_value=list
+ ),
+ ArgValidatorBool("route_append_only"),
+ ArgValidatorBool("rule_append_only"),
+ ArgValidatorList(
+ "dns",
+ nested=ArgValidatorIP("dns[?]", plain_address=False),
+ default_value=list,
+ ),
+ ArgValidatorList(
+ "dns_search",
+ nested=ArgValidatorStr("dns_search[?]"),
+ default_value=list,
+ ),
+ ],
+ default_value=lambda: {
+ "dhcp4": True,
+ "dhcp4_send_hostname": None,
+ "gateway4": None,
+ "route_metric4": None,
+ "auto6": True,
+ "gateway6": None,
+ "route_metric6": None,
+ "address": [],
+ "route": [],
+ "route_append_only": False,
+ "rule_append_only": False,
+ "dns": [],
+ "dns_search": [],
+ },
+ )
+
+ def _validate_post(self, value, name, result):
+ if result["dhcp4"] is None:
+ result["dhcp4"] = result["dhcp4_send_hostname"] is not None or not any(
+ [a for a in result["address"] if a["family"] == socket.AF_INET]
+ )
+ if result["auto6"] is None:
+ result["auto6"] = not any(
+ [a for a in result["address"] if a["family"] == socket.AF_INET6]
+ )
+ if result["dhcp4_send_hostname"] is not None:
+ if not result["dhcp4"]:
+ raise ValidationError(
+ name, "'dhcp4_send_hostname' is only valid if 'dhcp4' is enabled"
+ )
+ return result
+
+
+class ArgValidator_DictEthernet(ArgValidatorDict):
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="ethernet",
+ nested=[
+ ArgValidatorBool("autoneg", default_value=None),
+ ArgValidatorNum(
+ "speed", val_min=0, val_max=0xFFFFFFFF, default_value=0
+ ),
+ ArgValidatorStr("duplex", enum_values=["half", "full"]),
+ ],
+ default_value=ArgValidator.MISSING,
+ )
+
+ def get_default_ethernet(self):
+ return {"autoneg": None, "speed": 0, "duplex": None}
+
+ def _validate_post(self, value, name, result):
+ has_speed_or_duplex = result["speed"] != 0 or result["duplex"] is not None
+ if result["autoneg"] is None:
+ if has_speed_or_duplex:
+ result["autoneg"] = False
+ elif result["autoneg"]:
+ if has_speed_or_duplex:
+ raise ValidationError(
+ name,
+ "cannot specify '%s' with 'autoneg' enabled"
+ % ("duplex" if result["duplex"] is not None else "speed"),
+ )
+ else:
+ if not has_speed_or_duplex:
+ raise ValidationError(
+ name, "need to specify 'duplex' and 'speed' with 'autoneg' enabled"
+ )
+ if has_speed_or_duplex and (result["speed"] == 0 or result["duplex"] is None):
+ raise ValidationError(
+ name,
+ "need to specify both 'speed' and 'duplex' with 'autoneg' disabled",
+ )
+ return result
+
+
+class ArgValidator_DictBond(ArgValidatorDict):
+
+ VALID_MODES = [
+ "balance-rr",
+ "active-backup",
+ "balance-xor",
+ "broadcast",
+ "802.3ad",
+ "balance-tlb",
+ "balance-alb",
+ ]
+
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="bond",
+ nested=[
+ ArgValidatorStr("mode", enum_values=ArgValidator_DictBond.VALID_MODES),
+ ArgValidatorNum(
+ "miimon", val_min=0, val_max=1000000, default_value=None
+ ),
+ ],
+ default_value=ArgValidator.MISSING,
+ )
+
+ def get_default_bond(self):
+ return {"mode": ArgValidator_DictBond.VALID_MODES[0], "miimon": None}
+
+
+class ArgValidator_DictInfiniband(ArgValidatorDict):
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="infiniband",
+ nested=[
+ ArgValidatorStr(
+ "transport_mode", enum_values=["datagram", "connected"]
+ ),
+ ArgValidatorNum("p_key", val_min=-1, val_max=0xFFFF, default_value=-1),
+ ],
+ default_value=ArgValidator.MISSING,
+ )
+
+ def get_default_infiniband(self):
+ return {"transport_mode": "datagram", "p_key": -1}
+
+
+class ArgValidator_DictVlan(ArgValidatorDict):
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="vlan",
+ nested=[ArgValidatorNum("id", val_min=0, val_max=4094, required=True)],
+ default_value=ArgValidator.MISSING,
+ )
+
+ def get_default_vlan(self):
+ return {"id": None}
+
+
+class ArgValidator_DictMacvlan(ArgValidatorDict):
+
+ VALID_MODES = ["vepa", "bridge", "private", "passthru", "source"]
+
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="macvlan",
+ nested=[
+ ArgValidatorStr(
+ "mode",
+ enum_values=ArgValidator_DictMacvlan.VALID_MODES,
+ default_value="bridge",
+ ),
+ ArgValidatorBool("promiscuous", default_value=True),
+ ArgValidatorBool("tap", default_value=False),
+ ],
+ default_value=ArgValidator.MISSING,
+ )
+
+ def get_default_macvlan(self):
+ return {"mode": "bridge", "promiscuous": True, "tap": False}
+
+ def _validate_post(self, value, name, result):
+ if result["promiscuous"] is False and result["mode"] != "passthru":
+ raise ValidationError(
+ name, "non promiscuous operation is allowed only in passthru mode"
+ )
+ return result
+
+
+class ArgValidator_DictConnection(ArgValidatorDict):
+
+ VALID_PERSISTENT_STATES = ["absent", "present"]
+ VALID_STATES = VALID_PERSISTENT_STATES + ["up", "down"]
+ VALID_TYPES = [
+ "ethernet",
+ "infiniband",
+ "bridge",
+ "team",
+ "bond",
+ "vlan",
+ "macvlan",
+ ]
+ VALID_SLAVE_TYPES = ["bridge", "bond", "team"]
+
+ def __init__(self):
+ ArgValidatorDict.__init__(
+ self,
+ name="connections[?]",
+ nested=[
+ ArgValidatorStr("name"),
+ ArgValidatorStr(
+ "state", enum_values=ArgValidator_DictConnection.VALID_STATES
+ ),
+ ArgValidatorStr(
+ "persistent_state",
+ enum_values=ArgValidator_DictConnection.VALID_PERSISTENT_STATES,
+ ),
+ ArgValidatorBool("force_state_change", default_value=None),
+ ArgValidatorNum(
+ "wait",
+ val_min=0,
+ val_max=3600,
+ numeric_type=float,
+ default_value=None,
+ ),
+ ArgValidatorStr(
+ "type", enum_values=ArgValidator_DictConnection.VALID_TYPES
+ ),
+ ArgValidatorBool("autoconnect", default_value=True),
+ ArgValidatorStr(
+ "slave_type",
+ enum_values=ArgValidator_DictConnection.VALID_SLAVE_TYPES,
+ ),
+ ArgValidatorStr("master"),
+ ArgValidatorStr("interface_name", allow_empty=True),
+ ArgValidatorMac("mac"),
+ ArgValidatorNum(
+ "mtu", val_min=0, val_max=0xFFFFFFFF, default_value=None
+ ),
+ ArgValidatorStr("zone"),
+ ArgValidatorBool("check_iface_exists", default_value=True),
+ ArgValidatorStr("parent"),
+ ArgValidatorBool("ignore_errors", default_value=None),
+ ArgValidator_DictIP(),
+ ArgValidator_DictEthernet(),
+ ArgValidator_DictBond(),
+ ArgValidator_DictInfiniband(),
+ ArgValidator_DictVlan(),
+ ArgValidator_DictMacvlan(),
+ # deprecated options:
+ ArgValidatorStr(
+ "infiniband_transport_mode",
+ enum_values=["datagram", "connected"],
+ default_value=ArgValidator.MISSING,
+ ),
+ ArgValidatorNum(
+ "infiniband_p_key",
+ val_min=-1,
+ val_max=0xFFFF,
+ default_value=ArgValidator.MISSING,
+ ),
+ ArgValidatorNum(
+ "vlan_id",
+ val_min=0,
+ val_max=4094,
+ default_value=ArgValidator.MISSING,
+ ),
+ ],
+ default_value=dict,
+ all_missing_during_validate=True,
+ )
+
+ # valid field based on specified state, used to set defaults and reject
+ # bad values
+ self.VALID_FIELDS = []
+
+ def _validate_post_state(self, value, name, result):
+ """
+ Validate state definitions and create a corresponding list of actions.
+ """
+ actions = []
+ state = result.get("state")
+ if state in self.VALID_PERSISTENT_STATES:
+ del result["state"]
+ persistent_state_default = state
+ state = None
+ else:
+ persistent_state_default = None
+
+ persistent_state = result.get("persistent_state", persistent_state_default)
+
+ # default persistent_state to present (not done via default_value in the
+ # ArgValidatorStr, the value will only be set at the end of
+ # _validate_post()
+ if not persistent_state:
+ persistent_state = "present"
+
+ # If the profile is present, it should be ensured first
+ if persistent_state == "present":
+ actions.append(persistent_state)
+
+ # If the profile should be absent at the end, it needs to be present in
+ # the meantime to allow to (de)activate it
+ if persistent_state == "absent" and state:
+ actions.append("present")
+
+ # Change the runtime state if necessary
+ if state:
+ actions.append(state)
+
+ # Remove the profile in the end if requested
+ if persistent_state == "absent":
+ actions.append(persistent_state)
+
+ result["state"] = state
+ result["persistent_state"] = persistent_state
+ result["actions"] = actions
+
+ return result
+
+ def _validate_post_fields(self, value, name, result):
+ """
+ Validate the allowed fields (settings depending on the requested state).
+ FIXME: Maybe it should check whether "up"/"down" is present in the
+ actions instead of checking the runtime state from "state" to switch
+ from state to actions after the state parsing is done.
+ """
+ state = result.get("state")
+ persistent_state = result.get("persistent_state")
+
+ # minimal settings not related to runtime changes
+ valid_fields = ["actions", "ignore_errors", "name", "persistent_state", "state"]
+
+ # when type is present, a profile is completely specified (using
+ # defaults or other settings)
+ if "type" in result:
+ valid_fields += list(self.nested.keys())
+
+ # If there are no runtime changes, "wait" and "force_state_change" do
+ # not make sense
+ # FIXME: Maybe this restriction can be removed. Need to make sure that
+ # defaults for wait or force_state_change do not interfer
+ if not state:
+ while "wait" in valid_fields:
+ valid_fields.remove("wait")
+ while "force_state_change" in valid_fields:
+ valid_fields.remove("force_state_change")
+ else:
+ valid_fields += ["force_state_change", "wait"]
+
+ # FIXME: Maybe just accept all values, even if they are not
+ # needed/meaningful in the respective context
+ valid_fields = set(valid_fields)
+ for k in result:
+ if k not in valid_fields:
+ raise ValidationError(
+ name + "." + k,
+ "property is not allowed for state '%s' and persistent_state '%s'"
+ % (state, persistent_state),
+ )
+
+ if "name" not in result:
+ if persistent_state == "absent":
+ result["name"] = "" # set to empty string to mean *absent all others*
+ else:
+ raise ValidationError(name, "missing 'name'")
+
+ # FIXME: Seems to be a duplicate check since "wait" will be removed from
+ # valid_keys when state is considered to be not True
+ if "wait" in result and not state:
+ raise ValidationError(
+ name + ".wait",
+ "'wait' is not allowed for state '%s'" % (result["state"]),
+ )
+
+ result["state"] = state
+ result["persistent_state"] = persistent_state
+
+ self.VALID_FIELDS = valid_fields
+ return result
+
+ def _validate_post(self, value, name, result):
+ result = self._validate_post_state(value, name, result)
+ result = self._validate_post_fields(value, name, result)
+
+ if "type" in result:
+
+ if "master" in result:
+ if "slave_type" not in result:
+ result["slave_type"] = None
+ if result["master"] == result["name"]:
+ raise ValidationError(
+ name + ".master", '"master" cannot refer to itself'
+ )
+ else:
+ if "slave_type" in result:
+ raise ValidationError(
+ name + ".slave_type",
+ "'slave_type' requires a 'master' property",
+ )
+
+ if "ip" in result:
+ if "master" in result:
+ raise ValidationError(
+ name + ".ip", 'a slave cannot have an "ip" property'
+ )
+ else:
+ if "master" not in result:
+ result["ip"] = self.nested["ip"].get_default_value()
+
+ if "zone" in result:
+ if "master" in result:
+ raise ValidationError(
+ name + ".zone", '"zone" cannot be configured for slave types'
+ )
+ else:
+ result["zone"] = None
+
+ if "mac" in result:
+ if result["type"] not in ["ethernet", "infiniband"]:
+ raise ValidationError(
+ name + ".mac",
+ "a 'mac' address is only allowed for type 'ethernet' "
+ "or 'infiniband'",
+ )
+ maclen = len(Util.mac_aton(result["mac"]))
+ if result["type"] == "ethernet" and maclen != 6:
+ raise ValidationError(
+ name + ".mac",
+ "a 'mac' address for type ethernet requires 6 octets "
+ "but is '%s'" % result["mac"],
+ )
+ if result["type"] == "infiniband" and maclen != 20:
+ raise ValidationError(
+ name + ".mac",
+ "a 'mac' address for type ethernet requires 20 octets "
+ "but is '%s'" % result["mac"],
+ )
+
+ if result["type"] == "infiniband":
+ if "infiniband" not in result:
+ result["infiniband"] = self.nested[
+ "infiniband"
+ ].get_default_infiniband()
+ if "infiniband_transport_mode" in result:
+ result["infiniband"]["transport_mode"] = result[
+ "infiniband_transport_mode"
+ ]
+ del result["infiniband_transport_mode"]
+ if "infiniband_p_key" in result:
+ result["infiniband"]["p_key"] = result["infiniband_p_key"]
+ del result["infiniband_p_key"]
+ else:
+ if "infiniband_transport_mode" in result:
+ raise ValidationError(
+ name + ".infiniband_transport_mode",
+ "cannot mix deprecated 'infiniband_transport_mode' "
+ "property with 'infiniband' settings",
+ )
+ if "infiniband_p_key" in result:
+ raise ValidationError(
+ name + ".infiniband_p_key",
+ "cannot mix deprecated 'infiniband_p_key' property "
+ "with 'infiniband' settings",
+ )
+ if result["infiniband"]["transport_mode"] is None:
+ result["infiniband"]["transport_mode"] = "datagram"
+ if result["infiniband"]["p_key"] != -1:
+ if "mac" not in result and "parent" not in result:
+ raise ValidationError(
+ name + ".infiniband.p_key",
+ "a infiniband device with 'infiniband.p_key' "
+ "property also needs 'mac' or 'parent' property",
+ )
+ else:
+ if "infiniband" in result:
+ raise ValidationError(
+ name + ".infiniband",
+ "'infiniband' settings are only allowed for type 'infiniband'",
+ )
+ if "infiniband_transport_mode" in result:
+ raise ValidationError(
+ name + ".infiniband_transport_mode",
+ "a 'infiniband_transport_mode' property is only "
+ "allowed for type 'infiniband'",
+ )
+ if "infiniband_p_key" in result:
+ raise ValidationError(
+ name + ".infiniband_p_key",
+ "a 'infiniband_p_key' property is only allowed for "
+ "type 'infiniband'",
+ )
+
+ if "interface_name" in result:
+ # Ignore empty interface_name
+ if result["interface_name"] == "":
+ del result["interface_name"]
+ elif not Util.ifname_valid(result["interface_name"]):
+ raise ValidationError(
+ name + ".interface_name",
+ "invalid 'interface_name' '%s'" % (result["interface_name"]),
+ )
+ else:
+ if not result.get("mac"):
+ if not Util.ifname_valid(result["name"]):
+ raise ValidationError(
+ name + ".interface_name",
+ "'interface_name' as 'name' '%s' is not valid"
+ % (result["name"]),
+ )
+ result["interface_name"] = result["name"]
+
+ if "interface_name" not in result and result["type"] in [
+ "bond",
+ "bridge",
+ "macvlan",
+ "team",
+ "vlan",
+ ]:
+ raise ValidationError(
+ name + ".interface_name",
+ "type '%s' requires 'interface_name'" % (result["type"]),
+ )
+
+ if result["type"] == "vlan":
+ if "vlan" not in result:
+ if "vlan_id" not in result:
+ raise ValidationError(
+ name + ".vlan", 'missing "vlan" settings for "type" "vlan"'
+ )
+ result["vlan"] = self.nested["vlan"].get_default_vlan()
+ result["vlan"]["id"] = result["vlan_id"]
+ del result["vlan_id"]
+ else:
+ if "vlan_id" in result:
+ raise ValidationError(
+ name + ".vlan_id",
+ "don't use the deprecated 'vlan_id' together with the "
+ "'vlan' settings'",
+ )
+ if "parent" not in result:
+ raise ValidationError(
+ name + ".parent", 'missing "parent" for "type" "vlan"'
+ )
+ else:
+ if "vlan" in result:
+ raise ValidationError(
+ name + ".vlan", '"vlan" is only allowed for "type" "vlan"'
+ )
+ if "vlan_id" in result:
+ raise ValidationError(
+ name + ".vlan_id", '"vlan_id" is only allowed for "type" "vlan"'
+ )
+
+ if "parent" in result:
+ if result["type"] not in ["vlan", "macvlan", "infiniband"]:
+ raise ValidationError(
+ name + ".parent",
+ "'parent' is only allowed for type 'vlan', 'macvlan' or "
+ "'infiniband'",
+ )
+ if result["parent"] == result["name"]:
+ raise ValidationError(
+ name + ".parent", '"parent" cannot refer to itself'
+ )
+
+ if result["type"] == "bond":
+ if "bond" not in result:
+ result["bond"] = self.nested["bond"].get_default_bond()
+ else:
+ if "bond" in result:
+ raise ValidationError(
+ name + ".bond",
+ "'bond' settings are not allowed for 'type' '%s'"
+ % (result["type"]),
+ )
+
+ if result["type"] in ["ethernet", "vlan", "bridge", "bond", "team"]:
+ if "ethernet" not in result:
+ result["ethernet"] = self.nested["ethernet"].get_default_ethernet()
+ else:
+ if "ethernet" in result:
+ raise ValidationError(
+ name + ".ethernet",
+ "'ethernet' settings are not allowed for 'type' '%s'"
+ % (result["type"]),
+ )
+
+ if result["type"] == "macvlan":
+ if "macvlan" not in result:
+ result["macvlan"] = self.nested["macvlan"].get_default_macvlan()
+ else:
+ if "macvlan" in result:
+ raise ValidationError(
+ name + ".macvlan",
+ "'macvlan' settings are not allowed for 'type' '%s'"
+ % (result["type"]),
+ )
+
+ for k in self.VALID_FIELDS:
+ if k in result:
+ continue
+ v = self.nested[k]
+ vv = v.get_default_value()
+ if vv is not ArgValidator.MISSING:
+ result[k] = vv
+
+ return result
+
+
+class ArgValidator_ListConnections(ArgValidatorList):
+ def __init__(self):
+ ArgValidatorList.__init__(
+ self,
+ name="connections",
+ nested=ArgValidator_DictConnection(),
+ default_value=list,
+ )
+
+ def _validate_post(self, value, name, result):
+ for idx, connection in enumerate(result):
+ if "type" in connection:
+ if connection["master"]:
+ c = ArgUtil.connection_find_by_name(
+ connection["master"], result, idx
+ )
+ if not c:
+ raise ValidationError(
+ name + "[" + str(idx) + "].master",
+ "references non-existing 'master' connection '%s'"
+ % (connection["master"]),
+ )
+ if c["type"] not in ArgValidator_DictConnection.VALID_SLAVE_TYPES:
+ raise ValidationError(
+ name + "[" + str(idx) + "].master",
+ "references 'master' connection '%s' which is not a master "
+ "type by '%s'" % (connection["master"], c["type"]),
+ )
+ if connection["slave_type"] is None:
+ connection["slave_type"] = c["type"]
+ elif connection["slave_type"] != c["type"]:
+ raise ValidationError(
+ name + "[" + str(idx) + "].master",
+ "references 'master' connection '%s' which is of type '%s' "
+ "instead of slave_type '%s'"
+ % (
+ connection["master"],
+ c["type"],
+ connection["slave_type"],
+ ),
+ )
+ if connection["parent"]:
+ if not ArgUtil.connection_find_by_name(
+ connection["parent"], result, idx
+ ):
+ raise ValidationError(
+ name + "[" + str(idx) + "].parent",
+ "references non-existing 'parent' connection '%s'"
+ % (connection["parent"]),
+ )
+ return result
+
+ VALIDATE_ONE_MODE_NM = "nm"
+ VALIDATE_ONE_MODE_INITSCRIPTS = "initscripts"
+
+ def validate_connection_one(self, mode, connections, idx):
+ connection = connections[idx]
+ if "type" not in connection:
+ return
+
+ if (connection["parent"]) and (
+ (
+ (mode == self.VALIDATE_ONE_MODE_INITSCRIPTS)
+ and (connection["type"] == "vlan")
+ )
+ or (
+ (connection["type"] == "infiniband")
+ and (connection["infiniband"]["p_key"] != -1)
+ )
+ ):
+ try:
+ ArgUtil.connection_find_master(connection["parent"], connections, idx)
+ except MyError:
+ raise ValidationError.from_connection(
+ idx,
+ "profile references a parent '%s' which has 'interface_name' "
+ "missing" % (connection["parent"]),
+ )
+
+ if (connection["master"]) and (mode == self.VALIDATE_ONE_MODE_INITSCRIPTS):
+ try:
+ ArgUtil.connection_find_master(connection["master"], connections, idx)
+ except MyError:
+ raise ValidationError.from_connection(
+ idx,
+ "profile references a master '%s' which has 'interface_name' "
+ "missing" % (connection["master"]),
+ )
diff --git a/module_utils/network_lsr/utils.py b/module_utils/network_lsr/utils.py
new file mode 100644
index 0000000..ff16bfd
--- /dev/null
+++ b/module_utils/network_lsr/utils.py
@@ -0,0 +1,282 @@
+#!/usr/bin/python3 -tt
+# SPDX-License-Identifier: BSD-3-Clause
+# vim: fileencoding=utf8
+
+import os
+import socket
+import sys
+
+# pylint: disable=import-error, no-name-in-module
+from ansible.module_utils.network_lsr import MyError
+
+
+class Util:
+
+ PY3 = sys.version_info[0] == 3
+
+ STRING_TYPE = str if PY3 else basestring # noqa:F821
+
+ @staticmethod
+ def first(iterable, default=None, pred=None):
+ for v in iterable:
+ if pred is None or pred(v):
+ return v
+ return default
+
+ @staticmethod
+ def check_output(argv):
+ # subprocess.check_output is python 2.7.
+ with open("/dev/null", "wb") as DEVNULL:
+ import subprocess
+
+ env = os.environ.copy()
+ env["LANG"] = "C"
+ p = subprocess.Popen(argv, stdout=subprocess.PIPE, stderr=DEVNULL, env=env)
+ # FIXME: Can we assume this to always be UTF-8?
+ out = p.communicate()[0].decode("UTF-8")
+ if p.returncode != 0:
+ raise MyError("failure calling %s: exit with %s" % (argv, p.returncode))
+ return out
+
+ @classmethod
+ def create_uuid(cls):
+ cls.NM()
+ return str(cls._uuid.uuid4())
+
+ @classmethod
+ def NM(cls):
+ n = getattr(cls, "_NM", None)
+ if n is None:
+ # Installing pygobject in a tox virtualenv does not work out of the
+ # box
+ # pylint: disable=import-error
+ import gi
+
+ gi.require_version("NM", "1.0")
+ from gi.repository import NM, GLib, Gio, GObject
+
+ cls._NM = NM
+ cls._GLib = GLib
+ cls._Gio = Gio
+ cls._GObject = GObject
+ n = NM
+ import uuid
+
+ cls._uuid = uuid
+ return n
+
+ @classmethod
+ def GLib(cls):
+ cls.NM()
+ return cls._GLib
+
+ @classmethod
+ def Gio(cls):
+ cls.NM()
+ return cls._Gio
+
+ @classmethod
+ def GObject(cls):
+ cls.NM()
+ return cls._GObject
+
+ @classmethod
+ def Timestamp(cls):
+ return cls.GLib().get_monotonic_time()
+
+ @classmethod
+ def GMainLoop(cls):
+ gmainloop = getattr(cls, "_GMainLoop", None)
+ if gmainloop is None:
+ gmainloop = cls.GLib().MainLoop()
+ cls._GMainLoop = gmainloop
+ return gmainloop
+
+ @classmethod
+ def GMainLoop_run(cls, timeout=None):
+ if timeout is None:
+ cls.GMainLoop().run()
+ return True
+
+ GLib = cls.GLib()
+ result = []
+ loop = cls.GMainLoop()
+
+ def _timeout_cb(unused):
+ result.append(1)
+ loop.quit()
+ return False
+
+ timeout_id = GLib.timeout_add(int(timeout * 1000), _timeout_cb, None)
+ loop.run()
+ if result:
+ return False
+ GLib.source_remove(timeout_id)
+ return True
+
+ @classmethod
+ def GMainLoop_iterate(cls, may_block=False):
+ return cls.GMainLoop().get_context().iteration(may_block)
+
+ @classmethod
+ def GMainLoop_iterate_all(cls):
+ c = 0
+ while cls.GMainLoop_iterate():
+ c += 1
+ return c
+
+ @classmethod
+ def create_cancellable(cls):
+ return cls.Gio().Cancellable.new()
+
+ @classmethod
+ def error_is_cancelled(cls, e):
+ GLib = cls.GLib()
+ if isinstance(e, GLib.GError):
+ if (
+ e.domain == "g-io-error-quark"
+ and e.code == cls.Gio().IOErrorEnum.CANCELLED
+ ):
+ return True
+ return False
+
+ @staticmethod
+ def ifname_valid(ifname):
+ # see dev_valid_name() in kernel's net/core/dev.c
+ if not ifname:
+ return False
+ if ifname in [".", ".."]:
+ return False
+ if len(ifname) >= 16:
+ return False
+ if any([c == "/" or c == ":" or c.isspace() for c in ifname]):
+ return False
+ # FIXME: encoding issues regarding python unicode string
+ return True
+
+ @staticmethod
+ def mac_aton(mac_str, force_len=None):
+ # we also accept None and '' for convenience.
+ # - None yiels None
+ # - '' yields []
+ if mac_str is None:
+ return mac_str
+ i = 0
+ b = []
+ for c in mac_str:
+ if i == 2:
+ if c != ":":
+ raise MyError("not a valid MAC address: '%s'" % (mac_str))
+ i = 0
+ continue
+ try:
+ if i == 0:
+ n = int(c, 16) * 16
+ i = 1
+ else:
+ assert i == 1
+ n = n + int(c, 16)
+ i = 2
+ b.append(n)
+ except Exception:
+ raise MyError("not a valid MAC address: '%s'" % (mac_str))
+ if i == 1:
+ raise MyError("not a valid MAC address: '%s'" % (mac_str))
+ if force_len is not None:
+ if force_len != len(b):
+ raise MyError(
+ "not a valid MAC address of length %s: '%s'" % (force_len, mac_str)
+ )
+ return b
+
+ @staticmethod
+ def mac_ntoa(mac):
+ if mac is None:
+ return None
+ return ":".join(["%02x" % c for c in mac])
+
+ @staticmethod
+ def mac_norm(mac_str, force_len=None):
+ return Util.mac_ntoa(Util.mac_aton(mac_str, force_len))
+
+ @staticmethod
+ def boolean(arg):
+ if arg is None or isinstance(arg, bool):
+ return arg
+ arg0 = arg
+ if isinstance(arg, Util.STRING_TYPE):
+ arg = arg.lower()
+
+ if arg in ["y", "yes", "on", "1", "true", 1, True]:
+ return True
+ if arg in ["n", "no", "off", "0", "false", 0, False]:
+ return False
+
+ raise MyError("value '%s' is not a boolean" % (arg0))
+
+ @staticmethod
+ def parse_ip(addr, family=None):
+ if addr is None:
+ return (None, None)
+ if family is not None:
+ Util.addr_family_check(family)
+ a = socket.inet_pton(family, addr)
+ else:
+ a = None
+ family = None
+ try:
+ a = socket.inet_pton(socket.AF_INET, addr)
+ family = socket.AF_INET
+ except Exception:
+ a = socket.inet_pton(socket.AF_INET6, addr)
+ family = socket.AF_INET6
+ return (socket.inet_ntop(family, a), family)
+
+ @staticmethod
+ def addr_family_check(family):
+ if family != socket.AF_INET and family != socket.AF_INET6:
+ raise MyError("invalid address family %s" % (family))
+
+ @staticmethod
+ def addr_family_to_v(family):
+ if family is None:
+ return ""
+ if family == socket.AF_INET:
+ return "v4"
+ if family == socket.AF_INET6:
+ return "v6"
+ raise MyError("invalid address family '%s'" % (family))
+
+ @staticmethod
+ def addr_family_default_prefix(family):
+ Util.addr_family_check(family)
+ if family == socket.AF_INET:
+ return 24
+ else:
+ return 64
+
+ @staticmethod
+ def addr_family_valid_prefix(family, prefix):
+ Util.addr_family_check(family)
+ if family == socket.AF_INET:
+ m = 32
+ else:
+ m = 128
+ return prefix >= 0 and prefix <= m
+
+ @staticmethod
+ def parse_address(address, family=None):
+ try:
+ parts = address.split()
+ addr_parts = parts[0].split("/")
+ if len(addr_parts) != 2:
+ raise MyError("expect two addr-parts: ADDR/PLEN")
+ a, family = Util.parse_ip(addr_parts[0], family)
+ prefix = int(addr_parts[1])
+ if not Util.addr_family_valid_prefix(family, prefix):
+ raise MyError("invalid prefix %s" % (prefix))
+ if len(parts) > 1:
+ raise MyError("too many parts")
+ return {"address": a, "family": family, "prefix": prefix}
+ except Exception:
+ raise MyError("invalid address '%s'" % (address))
diff --git a/pylintrc b/pylintrc
index 407924c..2f07798 100644
--- a/pylintrc
+++ b/pylintrc
@@ -16,7 +16,7 @@ ignore-patterns=
# Python code to execute, usually for sys.path manipulation such as
# pygtk.require().
-init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()) + '/library')"
+init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()) + '/library'); sys.path.append(os.path.dirname(find_pylintrc()) + '/module_utils'); sys.path.append(os.path.dirname(find_pylintrc()) + '/tests')"
# Use multiple processes to speed up Pylint.
diff --git a/tasks/main.yml b/tasks/main.yml
index 8d162b3..5f33ce9 100644
--- a/tasks/main.yml
+++ b/tasks/main.yml
@@ -6,6 +6,7 @@
# needed for ansible_facts.packages
- name: Check which packages are installed
package_facts:
+ no_log: true
# Depending on the plugins, checking installed packages might be slow
# for example subscription manager might slow this down
diff --git a/tests/remove-profile.yml b/tests/remove-profile.yml
index 95496aa..a50e848 100644
--- a/tests/remove-profile.yml
+++ b/tests/remove-profile.yml
@@ -5,6 +5,6 @@
vars:
network_connections:
- name: "{{ profile }}"
- state: absent
+ persistent_state: absent
roles:
- linux-system-roles.network
diff --git a/tests/roles/linux-system-roles.network/module_utils b/tests/roles/linux-system-roles.network/module_utils
new file mode 120000
index 0000000..ad35115
--- /dev/null
+++ b/tests/roles/linux-system-roles.network/module_utils
@@ -0,0 +1 @@
+../../../module_utils/
\ No newline at end of file
diff --git a/tests/test_network_connections.py b/tests/test_network_connections.py
index a386044..b420ced 100755
--- a/tests/test_network_connections.py
+++ b/tests/test_network_connections.py
@@ -11,7 +11,20 @@
TESTS_BASEDIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(1, os.path.join(TESTS_BASEDIR, "..", "library"))
+sys.path.insert(1, os.path.join(TESTS_BASEDIR, "..", "module_utils"))
+try:
+ from unittest import mock
+except ImportError: # py2
+ import mock
+
+sys.modules["ansible"] = mock.Mock()
+sys.modules["ansible.module_utils.basic"] = mock.Mock()
+sys.modules["ansible.module_utils"] = mock.Mock()
+sys.modules["ansible.module_utils.network_lsr"] = __import__("network_lsr")
+
+# pylint: disable=import-error
+import network_lsr
import network_connections as n
from network_connections import SysUtil
@@ -51,15 +64,47 @@ def pprint(msg, obj):
obj.dump()
-ARGS_CONNECTIONS = n.ArgValidator_ListConnections()
+ARGS_CONNECTIONS = network_lsr.argument_validator.ArgValidator_ListConnections()
+VALIDATE_ONE_MODE_INITSCRIPTS = ARGS_CONNECTIONS.VALIDATE_ONE_MODE_INITSCRIPTS
class TestValidator(unittest.TestCase):
+ def setUp(self):
+ # default values when "type" is specified and state is not
+ self.default_connection_settings = {
+ "autoconnect": True,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "ignore_errors": None,
+ "ip": {
+ "gateway6": None,
+ "gateway4": None,
+ "route_metric4": None,
+ "auto6": True,
+ "dhcp4": True,
+ "address": [],
+ "route_append_only": False,
+ "rule_append_only": False,
+ "route": [],
+ "route_metric6": None,
+ "dhcp4_send_hostname": None,
+ "dns": [],
+ "dns_search": [],
+ },
+ "mac": None,
+ "master": None,
+ "mtu": None,
+ "name": "5",
+ "parent": None,
+ "slave_type": None,
+ "zone": None,
+ }
+
def assertValidationError(self, v, value):
self.assertRaises(n.ValidationError, v.validate, value)
def assert_nm_connection_routes_expected(self, connection, route_list_expected):
- parser = n.ArgValidatorIPRoute("route[?]")
+ parser = network_lsr.argument_validator.ArgValidatorIPRoute("route[?]")
route_list_exp = [parser.validate(r) for r in route_list_expected]
route_list_new = itertools.chain(
nmutil.setting_ip_config_get_routes(
@@ -92,7 +137,7 @@ def do_connections_validate_nm(self, input_connections, **kwargs):
if "type" in connection:
connection["nm.exists"] = False
connection["nm.uuid"] = n.Util.create_uuid()
- mode = n.ArgValidator_ListConnections.VALIDATE_ONE_MODE_INITSCRIPTS
+ mode = VALIDATE_ONE_MODE_INITSCRIPTS
for idx, connection in enumerate(connections):
try:
ARGS_CONNECTIONS.validate_connection_one(mode, connections, idx)
@@ -103,7 +148,9 @@ def do_connections_validate_nm(self, input_connections, **kwargs):
self.assertTrue(con_new)
self.assertTrue(con_new.verify())
if "nm_route_list_current" in kwargs:
- parser = n.ArgValidatorIPRoute("route[?]")
+ parser = network_lsr.argument_validator.ArgValidatorIPRoute(
+ "route[?]"
+ )
s4 = con_new.get_setting(NM.SettingIP4Config)
s6 = con_new.get_setting(NM.SettingIP6Config)
s4.clear_routes()
@@ -132,7 +179,7 @@ def do_connections_validate_nm(self, input_connections, **kwargs):
)
def do_connections_validate_ifcfg(self, input_connections, **kwargs):
- mode = n.ArgValidator_ListConnections.VALIDATE_ONE_MODE_INITSCRIPTS
+ mode = VALIDATE_ONE_MODE_INITSCRIPTS
connections = ARGS_CONNECTIONS.validate(input_connections)
for idx, connection in enumerate(connections):
try:
@@ -165,24 +212,26 @@ def do_connections_validate(
def test_validate_str(self):
- v = n.ArgValidatorStr("state")
+ v = network_lsr.argument_validator.ArgValidatorStr("state")
self.assertEqual("a", v.validate("a"))
self.assertValidationError(v, 1)
self.assertValidationError(v, None)
- v = n.ArgValidatorStr("state", required=True)
+ v = network_lsr.argument_validator.ArgValidatorStr("state", required=True)
self.assertValidationError(v, None)
def test_validate_int(self):
- v = n.ArgValidatorNum("state", default_value=None, numeric_type=float)
+ v = network_lsr.argument_validator.ArgValidatorNum(
+ "state", default_value=None, numeric_type=float
+ )
self.assertEqual(1, v.validate(1))
self.assertEqual(1.5, v.validate(1.5))
self.assertEqual(1.5, v.validate("1.5"))
self.assertValidationError(v, None)
self.assertValidationError(v, "1a")
- v = n.ArgValidatorNum("state", default_value=None)
+ v = network_lsr.argument_validator.ArgValidatorNum("state", default_value=None)
self.assertEqual(1, v.validate(1))
self.assertEqual(1, v.validate(1.0))
self.assertEqual(1, v.validate("1"))
@@ -191,12 +240,12 @@ def test_validate_int(self):
self.assertValidationError(v, 1.5)
self.assertValidationError(v, "1.5")
- v = n.ArgValidatorNum("state", required=True)
+ v = network_lsr.argument_validator.ArgValidatorNum("state", required=True)
self.assertValidationError(v, None)
def test_validate_bool(self):
- v = n.ArgValidatorBool("state")
+ v = network_lsr.argument_validator.ArgValidatorBool("state")
self.assertEqual(True, v.validate("yes"))
self.assertEqual(True, v.validate("yeS"))
self.assertEqual(True, v.validate("Y"))
@@ -218,18 +267,22 @@ def test_validate_bool(self):
self.assertValidationError(v, "Ye")
self.assertValidationError(v, "")
self.assertValidationError(v, None)
- v = n.ArgValidatorBool("state", required=True)
+ v = network_lsr.argument_validator.ArgValidatorBool("state", required=True)
self.assertValidationError(v, None)
def test_validate_dict(self):
- v = n.ArgValidatorDict(
+ v = network_lsr.argument_validator.ArgValidatorDict(
"dict",
nested=[
- n.ArgValidatorNum("i", required=True),
- n.ArgValidatorStr("s", required=False, default_value="s_default"),
- n.ArgValidatorStr(
- "l", required=False, default_value=n.ArgValidator.MISSING
+ network_lsr.argument_validator.ArgValidatorNum("i", required=True),
+ network_lsr.argument_validator.ArgValidatorStr(
+ "s", required=False, default_value="s_default"
+ ),
+ network_lsr.argument_validator.ArgValidatorStr(
+ "l",
+ required=False,
+ default_value=network_lsr.argument_validator.ArgValidator.MISSING,
),
],
)
@@ -242,24 +295,27 @@ def test_validate_dict(self):
def test_validate_list(self):
- v = n.ArgValidatorList("list", nested=n.ArgValidatorNum("i"))
+ v = network_lsr.argument_validator.ArgValidatorList(
+ "list", nested=network_lsr.argument_validator.ArgValidatorNum("i")
+ )
self.assertEqual([1, 5], v.validate(["1", 5]))
self.assertValidationError(v, [1, "s"])
- def test_1(self):
-
+ def test_empty(self):
self.maxDiff = None
-
self.do_connections_validate([], [])
+ def test_ethernet_two_defaults(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
- "name": "5",
- "state": "present",
- "type": "ethernet",
+ "actions": ["present"],
"autoconnect": True,
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "ignore_errors": None,
+ "interface_name": "5",
"ip": {
"gateway6": None,
"gateway4": None,
@@ -275,34 +331,40 @@ def test_1(self):
"dns": [],
"dns_search": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": "5",
- "check_iface_exists": True,
+ "mtu": None,
+ "name": "5",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": None,
+ "type": "ethernet",
+ "zone": None,
},
{
- "name": "5",
- "state": "up",
- "force_state_change": None,
- "wait": None,
+ "actions": ["present"],
"ignore_errors": None,
+ "name": "5",
+ "persistent_state": "present",
+ "state": None,
},
],
[{"name": "5", "type": "ethernet"}, {"name": "5"}],
)
+
+ def test_up_ethernet(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
- "name": "5",
- "state": "up",
- "type": "ethernet",
+ "actions": ["present", "up"],
"autoconnect": True,
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "5",
"ip": {
"gateway6": None,
"gateway4": None,
@@ -318,29 +380,34 @@ def test_1(self):
"route_metric6": None,
"dhcp4_send_hostname": None,
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": "5",
- "check_iface_exists": True,
- "force_state_change": None,
+ "mtu": None,
+ "name": "5",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
}
],
[{"name": "5", "state": "up", "type": "ethernet"}],
)
+
+ def test_up_ethernet_no_autoconnect(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
- "name": "5",
- "state": "up",
- "type": "ethernet",
+ "actions": ["present", "up"],
"autoconnect": False,
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "5",
"ip": {
"gateway6": None,
"gateway4": None,
@@ -356,17 +423,17 @@ def test_1(self):
"route_metric6": None,
"dhcp4_send_hostname": None,
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": "5",
- "check_iface_exists": True,
- "force_state_change": None,
+ "mtu": None,
+ "name": "5",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
}
],
[{"name": "5", "state": "up", "type": "ethernet", "autoconnect": "no"}],
@@ -390,19 +457,37 @@ def test_1(self):
],
)
+ def test_invalid_autoconnect(self):
+ self.maxDiff = None
self.do_connections_check_invalid([{"name": "a", "autoconnect": True}])
+ def test_absent(self):
+ self.maxDiff = None
self.do_connections_validate(
- [{"name": "5", "state": "absent", "ignore_errors": None}],
- [{"name": "5", "state": "absent"}],
+ [
+ {
+ "actions": ["absent"],
+ "ignore_errors": None,
+ "name": "5",
+ "persistent_state": "absent",
+ "state": None,
+ }
+ ],
+ [{"name": "5", "persistent_state": "absent"}],
)
+ def test_up_ethernet_mac_mtu_static_ip(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod1",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": None,
"ip": {
"dhcp4": False,
"route_metric6": None,
@@ -424,19 +509,17 @@ def test_1(self):
"rule_append_only": False,
"route": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
- "state": "up",
- "mtu": 1450,
- "check_iface_exists": True,
- "force_state_change": None,
"mac": "52:54:00:44:9f:ba",
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": None,
- "type": "ethernet",
+ "mtu": 1450,
+ "name": "prod1",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
}
],
[
@@ -452,13 +535,19 @@ def test_1(self):
],
)
+ def test_up_single_v4_dns(self):
+ self.maxDiff = None
# set single IPv4 DNS server
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod1",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "prod1",
"ip": {
"dhcp4": False,
"route_metric6": None,
@@ -480,19 +569,17 @@ def test_1(self):
"rule_append_only": False,
"route": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
- "state": "up",
- "check_iface_exists": True,
- "force_state_change": None,
- "zone": None,
"mac": None,
"master": None,
"mtu": None,
- "ignore_errors": None,
- "interface_name": "prod1",
- "type": "ethernet",
+ "name": "prod1",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
}
],
[
@@ -505,12 +592,19 @@ def test_1(self):
}
],
)
+
+ def test_routes(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod1",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": None,
"ip": {
"dhcp4": False,
"auto6": True,
@@ -537,24 +631,26 @@ def test_1(self):
"gateway4": None,
"dns": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
- "state": "up",
- "mtu": 1450,
- "check_iface_exists": True,
- "force_state_change": None,
"mac": "52:54:00:44:9f:ba",
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": None,
- "type": "ethernet",
+ "mtu": 1450,
+ "name": "prod1",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
},
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod.100",
- "parent": "prod1",
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "prod.100",
"ip": {
"dhcp4": False,
"route_metric6": None,
@@ -589,20 +685,18 @@ def test_1(self):
}
],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
- "state": "up",
"master": None,
+ "mtu": None,
+ "name": "prod.100",
+ "parent": "prod1",
+ "persistent_state": "present",
"slave_type": None,
- "ignore_errors": None,
- "interface_name": "prod.100",
+ "state": "up",
"type": "vlan",
"vlan": {"id": 100},
"wait": None,
+ "zone": None,
},
],
[
@@ -634,12 +728,18 @@ def test_1(self):
],
)
+ def test_vlan(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod1",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": None,
"ip": {
"dhcp4": False,
"auto6": True,
@@ -666,24 +766,26 @@ def test_1(self):
"gateway4": None,
"dns": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
- "state": "up",
- "mtu": 1450,
- "check_iface_exists": True,
- "force_state_change": None,
"mac": "52:54:00:44:9f:ba",
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": None,
- "type": "ethernet",
+ "mtu": 1450,
+ "name": "prod1",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
},
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod.100",
- "parent": "prod1",
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "prod.100",
"ip": {
"dhcp4": False,
"route_metric6": None,
@@ -718,20 +820,18 @@ def test_1(self):
}
],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
- "state": "up",
"master": None,
+ "mtu": None,
+ "name": "prod.100",
+ "parent": "prod1",
+ "persistent_state": "present",
"slave_type": None,
- "ignore_errors": None,
- "interface_name": "prod.100",
+ "state": "up",
"type": "vlan",
"vlan": {"id": 101},
"wait": None,
+ "zone": None,
},
],
[
@@ -763,12 +863,18 @@ def test_1(self):
],
)
+ def test_macvlan(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "eth0-parent",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "eth0",
"ip": {
"dhcp4": False,
"auto6": False,
@@ -790,24 +896,25 @@ def test_1(self):
"gateway4": None,
"dns": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
- "state": "up",
- "mtu": 1450,
- "check_iface_exists": True,
- "force_state_change": None,
"mac": "33:24:10:24:2f:b9",
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": "eth0",
- "type": "ethernet",
+ "mtu": 1450,
+ "name": "eth0-parent",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
},
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "veth0.0",
- "parent": "eth0-parent",
+ "check_iface_exists": True,
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "veth0",
"ip": {
"dhcp4": False,
"route_metric6": None,
@@ -838,23 +945,25 @@ def test_1(self):
],
},
"mac": None,
- "mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
- "state": "up",
+ "macvlan": {"mode": "bridge", "promiscuous": True, "tap": False},
"master": None,
+ "mtu": None,
+ "name": "veth0.0",
+ "parent": "eth0-parent",
+ "persistent_state": "present",
"slave_type": None,
- "ignore_errors": None,
- "interface_name": "veth0",
+ "state": "up",
"type": "macvlan",
- "macvlan": {"mode": "bridge", "promiscuous": True, "tap": False},
"wait": None,
+ "zone": None,
},
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "veth0.1",
- "parent": "eth0-parent",
+ "check_iface_exists": True,
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "veth1",
"ip": {
"dhcp4": False,
"route_metric6": None,
@@ -885,18 +994,17 @@ def test_1(self):
],
},
"mac": None,
- "mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
- "state": "up",
+ "macvlan": {"mode": "passthru", "promiscuous": False, "tap": True},
"master": None,
+ "mtu": None,
+ "name": "veth0.1",
+ "parent": "eth0-parent",
+ "persistent_state": "present",
"slave_type": None,
- "ignore_errors": None,
- "interface_name": "veth1",
+ "state": "up",
"type": "macvlan",
- "macvlan": {"mode": "passthru", "promiscuous": False, "tap": True},
"wait": None,
+ "zone": None,
},
],
[
@@ -943,73 +1051,79 @@ def test_1(self):
],
)
+ def test_bridge_no_dhcp4_auto6(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod2",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "bridge2",
"ip": {
+ "address": [],
+ "auto6": False,
"dhcp4": False,
- "route_metric6": None,
- "route_metric4": None,
- "dns_search": [],
"dhcp4_send_hostname": None,
- "gateway6": None,
- "gateway4": None,
- "auto6": False,
"dns": [],
- "address": [],
+ "dns_search": [],
+ "gateway4": None,
+ "gateway6": None,
+ "route": [],
"route_append_only": False,
+ "route_metric4": None,
+ "route_metric6": None,
"rule_append_only": False,
- "route": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
+ "master": None,
"mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
+ "name": "prod2",
+ "parent": None,
+ "persistent_state": "present",
+ "slave_type": None,
"state": "up",
- "master": None,
- "ignore_errors": None,
- "interface_name": "bridge2",
"type": "bridge",
- "slave_type": None,
"wait": None,
+ "zone": None,
},
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "prod2-slave1",
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "eth1",
"ip": {
- "dhcp4": True,
- "auto6": True,
"address": [],
- "route_append_only": False,
- "rule_append_only": False,
- "route": [],
- "route_metric6": None,
- "route_metric4": None,
- "dns_search": [],
+ "auto6": True,
+ "dhcp4": True,
"dhcp4_send_hostname": None,
- "gateway6": None,
- "gateway4": None,
"dns": [],
+ "dns_search": [],
+ "gateway4": None,
+ "gateway6": None,
+ "route": [],
+ "route_append_only": False,
+ "route_metric4": None,
+ "route_metric6": None,
+ "rule_append_only": False,
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
+ "master": "prod2",
"mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
+ "name": "prod2-slave1",
+ "parent": None,
+ "persistent_state": "present",
+ "slave_type": "bridge",
"state": "up",
- "master": "prod2",
- "ignore_errors": None,
- "interface_name": "eth1",
"type": "ethernet",
- "slave_type": "bridge",
"wait": None,
+ "zone": None,
},
],
[
@@ -1030,12 +1144,19 @@ def test_1(self):
],
)
+ def test_bond(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "bond1",
- "parent": None,
+ "bond": {"mode": "balance-rr", "miimon": None},
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "bond1",
"ip": {
"dhcp4": True,
"route_metric6": None,
@@ -1051,31 +1172,35 @@ def test_1(self):
"rule_append_only": False,
"route": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
+ "master": None,
"mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
+ "name": "bond1",
+ "parent": None,
+ "persistent_state": "present",
+ "slave_type": None,
"state": "up",
- "master": None,
- "ignore_errors": None,
- "interface_name": "bond1",
"type": "bond",
- "slave_type": None,
- "bond": {"mode": "balance-rr", "miimon": None},
"wait": None,
+ "zone": None,
}
],
[{"name": "bond1", "state": "up", "type": "bond"}],
)
+ def test_bond_active_backup(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
- "name": "bond1",
- "parent": None,
+ "bond": {"mode": "active-backup", "miimon": None},
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "bond1",
"ip": {
"dhcp4": True,
"route_metric6": None,
@@ -1091,20 +1216,17 @@ def test_1(self):
"rule_append_only": False,
"route": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
+ "master": None,
"mtu": None,
- "zone": None,
- "check_iface_exists": True,
- "force_state_change": None,
+ "name": "bond1",
+ "parent": None,
+ "persistent_state": "present",
+ "slave_type": None,
"state": "up",
- "master": None,
- "ignore_errors": None,
- "interface_name": "bond1",
"type": "bond",
- "slave_type": None,
- "bond": {"mode": "active-backup", "miimon": None},
"wait": None,
+ "zone": None,
}
],
[
@@ -1117,13 +1239,21 @@ def test_1(self):
],
)
+ def test_invalid_values(self):
+ self.maxDiff = None
self.do_connections_check_invalid([{}])
self.do_connections_check_invalid([{"name": "b", "xxx": 5}])
+ def test_ethernet_mac_address(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present"],
"autoconnect": True,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "ignore_errors": None,
"interface_name": None,
"ip": {
"address": [],
@@ -1140,70 +1270,33 @@ def test_1(self):
"dns": [],
"dns_search": [],
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": "aa:bb:cc:dd:ee:ff",
- "mtu": None,
- "zone": None,
"master": None,
- "check_iface_exists": True,
+ "mtu": None,
"name": "5",
"parent": None,
- "ignore_errors": None,
+ "persistent_state": "present",
"slave_type": None,
- "state": "present",
+ "state": None,
"type": "ethernet",
+ "zone": None,
}
],
[{"name": "5", "type": "ethernet", "mac": "AA:bb:cC:DD:ee:FF"}],
)
+ def test_ethernet_speed_settings(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
- "name": "5",
- "state": "up",
- "type": "ethernet",
+ "actions": ["present", "up"],
"autoconnect": True,
- "parent": None,
- "ip": {
- "gateway6": None,
- "gateway4": None,
- "route_metric4": None,
- "auto6": True,
- "dhcp4": True,
- "address": [],
- "route_append_only": False,
- "rule_append_only": False,
- "route": [],
- "dns": [],
- "dns_search": [],
- "route_metric6": None,
- "dhcp4_send_hostname": None,
- },
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
- "mac": None,
- "mtu": None,
- "zone": None,
- "master": None,
- "ignore_errors": None,
- "interface_name": "5",
"check_iface_exists": True,
+ "ethernet": {"autoneg": False, "duplex": "half", "speed": 400},
"force_state_change": None,
- "slave_type": None,
- "wait": None,
- }
- ],
- [{"name": "5", "state": "up", "type": "ethernet"}],
- )
-
- self.do_connections_validate(
- [
- {
- "name": "5",
- "state": "up",
- "type": "ethernet",
- "autoconnect": True,
- "parent": None,
+ "ignore_errors": None,
+ "interface_name": "5",
"ip": {
"gateway6": None,
"gateway4": None,
@@ -1219,17 +1312,17 @@ def test_1(self):
"route_metric6": None,
"dhcp4_send_hostname": None,
},
- "ethernet": {"autoneg": False, "duplex": "half", "speed": 400},
"mac": None,
- "mtu": None,
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": "5",
- "check_iface_exists": True,
- "force_state_change": None,
+ "mtu": None,
+ "name": "5",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
}
],
[
@@ -1262,9 +1355,12 @@ def test_1(self):
],
)
+ def test_bridge2(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
"check_iface_exists": True,
"ethernet": {"autoneg": None, "duplex": None, "speed": 0},
@@ -1291,6 +1387,7 @@ def test_1(self):
"mtu": None,
"name": "6643-master",
"parent": None,
+ "persistent_state": "present",
"slave_type": None,
"state": "up",
"type": "bridge",
@@ -1298,6 +1395,7 @@ def test_1(self):
"zone": None,
},
{
+ "actions": ["present", "up"],
"autoconnect": True,
"check_iface_exists": True,
"ethernet": {"autoneg": None, "duplex": None, "speed": 0},
@@ -1307,8 +1405,8 @@ def test_1(self):
"ip": {
"address": [],
"auto6": True,
- "dhcp4": True,
"dhcp4_send_hostname": None,
+ "dhcp4": True,
"dns": [],
"dns_search": [],
"gateway4": None,
@@ -1324,6 +1422,7 @@ def test_1(self):
"mtu": None,
"name": "6643",
"parent": None,
+ "persistent_state": "present",
"slave_type": "bridge",
"state": "up",
"type": "ethernet",
@@ -1342,9 +1441,12 @@ def test_1(self):
],
)
+ def test_infiniband(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
"check_iface_exists": True,
"force_state_change": None,
@@ -1371,6 +1473,7 @@ def test_1(self):
"mtu": None,
"name": "infiniband.1",
"parent": None,
+ "persistent_state": "present",
"slave_type": None,
"state": "up",
"type": "infiniband",
@@ -1406,9 +1509,12 @@ def test_1(self):
],
)
+ def test_infiniband2(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
+ "actions": ["present", "up"],
"autoconnect": True,
"check_iface_exists": True,
"force_state_change": None,
@@ -1436,6 +1542,7 @@ def test_1(self):
"mtu": None,
"name": "infiniband.2",
"parent": None,
+ "persistent_state": "present",
"slave_type": None,
"state": "up",
"type": "infiniband",
@@ -1477,14 +1584,18 @@ def test_1(self):
],
)
+ def test_route_metric_prefix(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
- "name": "555",
- "state": "up",
- "type": "ethernet",
+ "actions": ["present", "up"],
"autoconnect": True,
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "555",
"ip": {
"gateway6": None,
"gateway4": None,
@@ -1515,17 +1626,17 @@ def test_1(self):
"route_metric6": None,
"dhcp4_send_hostname": None,
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": None,
"master": None,
- "ignore_errors": None,
- "interface_name": "555",
- "check_iface_exists": True,
- "force_state_change": None,
+ "mtu": None,
+ "name": "555",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": None,
}
],
[
@@ -1563,14 +1674,18 @@ def test_1(self):
],
)
+ def test_route_v6(self):
+ self.maxDiff = None
self.do_connections_validate(
[
{
- "name": "e556",
- "state": "up",
- "type": "ethernet",
+ "actions": ["present", "up"],
"autoconnect": True,
- "parent": None,
+ "check_iface_exists": True,
+ "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
+ "force_state_change": None,
+ "ignore_errors": None,
+ "interface_name": "e556",
"ip": {
"gateway6": None,
"gateway4": None,
@@ -1608,17 +1723,17 @@ def test_1(self):
"route_metric6": None,
"dhcp4_send_hostname": None,
},
- "ethernet": {"autoneg": None, "duplex": None, "speed": 0},
"mac": None,
- "mtu": None,
- "zone": "external",
"master": None,
- "ignore_errors": None,
- "interface_name": "e556",
- "check_iface_exists": True,
- "force_state_change": None,
+ "mtu": None,
+ "name": "e556",
+ "parent": None,
+ "persistent_state": "present",
"slave_type": None,
+ "state": "up",
+ "type": "ethernet",
"wait": None,
+ "zone": "external",
}
],
[
@@ -1688,6 +1803,8 @@ def test_1(self):
],
)
+ def test_invalid_mac(self):
+ self.maxDiff = None
self.do_connections_check_invalid(
[{"name": "b", "type": "ethernet", "mac": "aa:b"}]
)
@@ -1716,7 +1833,7 @@ def test_interface_name_ethernet_empty(self):
self.assertTrue(connections[0]["interface_name"] is None)
def test_interface_name_ethernet_None(self):
- """ Check that inerface_name cannot be None """
+ """ Check that interface_name cannot be None """
network_connections = [
{"name": "internal_network", "type": "ethernet", "interface_name": None}
]
@@ -1730,7 +1847,7 @@ def test_interface_name_ethernet_explicit(self):
{"name": "internal", "type": "ethernet", "interface_name": "eth0"}
]
connections = ARGS_CONNECTIONS.validate(network_connections)
- self.assertEquals(connections[0]["interface_name"], "eth0")
+ self.assertEqual(connections[0]["interface_name"], "eth0")
def test_interface_name_ethernet_invalid_profile(self):
""" Require explicit interface_name when the profile name is not a
@@ -1764,7 +1881,235 @@ def test_interface_name_bond_empty_interface_name(self):
def test_interface_name_bond_profile_as_interface_name(self):
network_connections = [{"name": "internal", "type": "bond"}]
connections = ARGS_CONNECTIONS.validate(network_connections)
- self.assertEquals(connections[0]["interface_name"], "internal")
+ self.assertEqual(connections[0]["interface_name"], "internal")
+
+ def check_connection(self, connection, expected):
+ reduced_connection = {}
+ for setting in expected:
+ reduced_connection[setting] = connection[setting]
+ self.assertEqual(reduced_connection, expected)
+
+ def check_partial_connection_zero(self, network_config, expected):
+ connections = ARGS_CONNECTIONS.validate([network_config])
+ self.check_connection(connections[0], expected)
+
+ def check_one_connection_with_defaults(
+ self, network_config, expected_changed_settings
+ ):
+ self.maxDiff = None
+ expected = self.default_connection_settings
+ expected.update(expected_changed_settings)
+
+ self.do_connections_validate([expected], [network_config])
+
+ def test_default_states(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0"},
+ {"actions": ["present"], "persistent_state": "present", "state": None},
+ )
+
+ def test_invalid_persistent_state_up(self):
+ network_connections = [{"name": "internal", "persistent_state": "up"}]
+ self.assertRaises(
+ n.ValidationError, ARGS_CONNECTIONS.validate, network_connections
+ )
+
+ def test_invalid_persistent_state_down(self):
+ network_connections = [{"name": "internal", "persistent_state": "down"}]
+ self.assertRaises(
+ n.ValidationError, ARGS_CONNECTIONS.validate, network_connections
+ )
+
+ def test_invalid_state_test(self):
+ network_connections = [{"name": "internal", "state": "test"}]
+ self.assertRaises(
+ n.ValidationError, ARGS_CONNECTIONS.validate, network_connections
+ )
+
+ def test_default_states_type(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "type": "ethernet"},
+ {"actions": ["present"], "persistent_state": "present", "state": None},
+ )
+
+ def test_persistent_state_present(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "persistent_state": "present", "type": "ethernet"},
+ {"actions": ["present"], "persistent_state": "present", "state": None},
+ )
+
+ def test_state_present(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "state": "present", "type": "ethernet"},
+ {"actions": ["present"], "persistent_state": "present", "state": None},
+ )
+
+ def test_state_absent(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "state": "absent"},
+ {"actions": ["absent"], "persistent_state": "absent", "state": None},
+ )
+
+ def test_persistent_state_absent(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "persistent_state": "absent"},
+ {"actions": ["absent"], "persistent_state": "absent", "state": None},
+ )
+
+ def test_state_present_up(self):
+ self.check_partial_connection_zero(
+ {
+ "name": "eth0",
+ "persistent_state": "present",
+ "state": "up",
+ "type": "ethernet",
+ },
+ {
+ "actions": ["present", "up"],
+ "persistent_state": "present",
+ "state": "up",
+ },
+ )
+
+ def test_state_present_down(self):
+ self.check_partial_connection_zero(
+ {
+ "name": "eth0",
+ "persistent_state": "present",
+ "state": "down",
+ "type": "ethernet",
+ },
+ {
+ "actions": ["present", "down"],
+ "persistent_state": "present",
+ "state": "down",
+ },
+ )
+
+ def test_state_absent_up_no_type(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "persistent_state": "absent", "state": "up"},
+ {
+ "actions": ["present", "up", "absent"],
+ "persistent_state": "absent",
+ "state": "up",
+ },
+ )
+
+ def test_state_absent_up_type(self):
+ # if type is specified, present should happen, too
+ self.check_partial_connection_zero(
+ {
+ "name": "eth0",
+ "persistent_state": "absent",
+ "state": "up",
+ "type": "ethernet",
+ },
+ {
+ "actions": ["present", "up", "absent"],
+ "persistent_state": "absent",
+ "state": "up",
+ },
+ )
+
+ def test_state_absent_down(self):
+ # if type is specified, present should happen, too
+ self.check_partial_connection_zero(
+ {"name": "eth0", "persistent_state": "absent", "state": "down"},
+ {
+ "actions": ["present", "down", "absent"],
+ "persistent_state": "absent",
+ "state": "down",
+ },
+ )
+
+ def test_state_up_no_type(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "state": "up"},
+ {
+ "actions": ["present", "up"],
+ "persistent_state": "present",
+ "state": "up",
+ },
+ )
+
+ def test_state_up_type(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "state": "up", "type": "ethernet"},
+ {
+ "actions": ["present", "up"],
+ "persistent_state": "present",
+ "state": "up",
+ },
+ )
+
+ def test_state_down_no_type(self):
+ self.check_partial_connection_zero(
+ {"name": "eth0", "state": "down"},
+ {
+ "actions": ["present", "down"],
+ "persistent_state": "present",
+ "state": "down",
+ },
+ )
+
+ def test_full_state_present_no_type(self):
+ self.maxDiff = None
+ self.do_connections_validate(
+ [
+ {
+ "actions": ["present"],
+ "ignore_errors": None,
+ "name": "eth0",
+ "state": None,
+ "persistent_state": "present",
+ }
+ ],
+ [{"name": "eth0", "persistent_state": "present"}],
+ )
+
+ def test_full_state_present_type_defaults(self):
+ self.check_one_connection_with_defaults(
+ {"name": "eth0", "type": "ethernet", "persistent_state": "present"},
+ {
+ "actions": ["present"],
+ "interface_name": "eth0",
+ "name": "eth0",
+ "persistent_state": "present",
+ "state": None,
+ "type": "ethernet",
+ },
+ )
+
+ def test_full_state_absent_no_type(self):
+ self.maxDiff = None
+ self.do_connections_validate(
+ [
+ {
+ "actions": ["absent"],
+ "ignore_errors": None,
+ "name": "eth0",
+ "state": None,
+ "persistent_state": "absent",
+ }
+ ],
+ [{"name": "eth0", "persistent_state": "absent"}],
+ )
+
+ def test_full_state_absent_defaults(self):
+ self.maxDiff = None
+ self.check_one_connection_with_defaults(
+ {"name": "eth0", "persistent_state": "absent", "type": "ethernet"},
+ {
+ "actions": ["absent"],
+ "ignore_errors": None,
+ "name": "eth0",
+ "state": None,
+ "persistent_state": "absent",
+ "type": "ethernet",
+ "interface_name": "eth0",
+ },
+ )
@my_test_skipIf(nmutil is None, "no support for NM (libnm via pygobject)")
diff --git a/tests/tests_unit.yml b/tests/tests_unit.yml
index 203b98b..48e478b 100644
--- a/tests/tests_unit.yml
+++ b/tests/tests_unit.yml
@@ -3,27 +3,33 @@
- hosts: all
name: Setup for test running
tasks:
- # FIXME: change with_items to loop when test-harness is updated
+ - name: Install EPEL on enterprise Linux for python2-mock
+ command: yum install -y https://dl.fedoraproject.org/pub/epel/epel-release-latest-{{ ansible_distribution_major_version }}.noarch.rpm
+ when:
+ - ansible_distribution in ['RedHat', 'CentOS']
+ - ansible_distribution_major_version in ['6', '7']
+
- package:
name: "{{ item }}"
state: present
ignore_errors: true
- with_items:
+ loop:
- NetworkManager-libnm
- python2-gobject-base
- python3-gobject-base
- python-gobject-base
+ - python2-mock
- hosts: all
name: execute python unit tests
tasks:
- copy:
- src: ../library/network_connections.py
- dest: /tmp/test-unit-1/
-
- - copy:
- src: test_network_connections.py
+ src: "{{ item }}"
dest: /tmp/test-unit-1/
+ loop:
+ - ../library/network_connections.py
+ - test_network_connections.py
+ - ../module_utils/network_lsr
- file:
state: directory
diff --git a/tox.ini b/tox.ini
index 369f890..1c8a0a6 100644
--- a/tox.ini
+++ b/tox.ini
@@ -8,15 +8,16 @@ basepython = python2.7
deps =
py{26,27,36,37}: pytest-cov
py{27,36,37}: pytest>=3.5.1
+ py{26,27}: mock
py26: pytest
[base]
passenv = *
setenv =
- PYTHONPATH = {toxinidir}/library
+ PYTHONPATH = {toxinidir}/library:{toxinidir}/module_utils
LC_ALL = C
changedir = {toxinidir}/tests
-covtarget = network_connections
+covtarget = {toxinidir}/library --cov {toxinidir}/module_utils
pytesttarget = .
[testenv:black]
@@ -93,6 +94,7 @@ commands =
--errors-only \
{posargs} \
library/network_connections.py \
+ module_utils/network_lsr \
tests/test_network_connections.py
[testenv:flake8]