From c3a924203e8f0d3d6d6fc7882f006d60e7d8d985 Mon Sep 17 00:00:00 2001
From: Gris Ge <fge@redhat.com>
Date: Fri, 19 Jun 2020 10:14:25 +0800
Subject: [PATCH] ip: canonicalize IP address
Canonicalize these IP addresses:
* `InterfaceIP.ADDRESS_IP`
* `Route.DESTINATION`
* `Route.NEXT_HOP_ADDRESS`
* `RouteRule.IP_FROM`
* `RouteRule.IP_TO`
Introduced two functions to `libnmstate/iplib.py`:
* `canonicalize_ip_network()` returns address with prefix
* `canonicalize_ip_address()` returns address without prefix
Unit test cases added.
Signed-off-by: Gris Ge <fge@redhat.com>
---
libnmstate/ifaces/base_iface.py | 19 +++++++------------
libnmstate/iplib.py | 14 ++++++++++++++
libnmstate/route.py | 12 ++++++++++++
libnmstate/route_rule.py | 14 +++++++-------
4 files changed, 40 insertions(+), 19 deletions(-)
diff --git a/libnmstate/ifaces/base_iface.py b/libnmstate/ifaces/base_iface.py
index 33da69d..56a1115 100644
--- a/libnmstate/ifaces/base_iface.py
+++ b/libnmstate/ifaces/base_iface.py
@@ -20,13 +20,12 @@
from collections.abc import Mapping
from copy import deepcopy
import logging
-from ipaddress import ip_address
from operator import itemgetter
from libnmstate.error import NmstateInternalError
from libnmstate.error import NmstateValueError
-from libnmstate.iplib import is_ipv6_address
from libnmstate.iplib import is_ipv6_link_local_addr
+from libnmstate.iplib import canonicalize_ip_address
from libnmstate.schema import Interface
from libnmstate.schema import InterfaceIP
from libnmstate.schema import InterfaceIPv6
@@ -44,7 +43,7 @@ class IPState:
self._info = info
self._remove_stack_if_disabled()
self._sort_addresses()
- self._canonicalize_ipv6_addr()
+ self._canonicalize_ip_addr()
self._canonicalize_dynamic()
def _canonicalize_dynamic(self):
@@ -61,15 +60,11 @@ class IPState:
):
self._info.pop(dhcp_option, None)
- def _canonicalize_ipv6_addr(self):
- """
- Convert full IPv6 address to abbreviated address.
- """
- if self._family == Interface.IPV6:
- for addr in self.addresses:
- address = addr[InterfaceIP.ADDRESS_IP]
- if is_ipv6_address(address):
- addr[InterfaceIP.ADDRESS_IP] = str(ip_address(address))
+ def _canonicalize_ip_addr(self):
+ for addr in self.addresses:
+ addr[InterfaceIP.ADDRESS_IP] = canonicalize_ip_address(
+ addr[InterfaceIP.ADDRESS_IP]
+ )
def _sort_addresses(self):
self.addresses.sort(key=itemgetter(InterfaceIP.ADDRESS_IP))
diff --git a/libnmstate/iplib.py b/libnmstate/iplib.py
index 57fffd7..183b81b 100644
--- a/libnmstate/iplib.py
+++ b/libnmstate/iplib.py
@@ -52,3 +52,17 @@ def ip_address_full_to_tuple(addr):
raise NmstateValueError(f"Invalid IP address, error: {err}")
return f"{net.network_address}", net.prefixlen
+
+
+def canonicalize_ip_network(address):
+ try:
+ return ipaddress.ip_network(address, strict=False).with_prefixlen
+ except ValueError as e:
+ raise NmstateValueError(f"Invalid IP network address: {e}")
+
+
+def canonicalize_ip_address(address):
+ try:
+ return ipaddress.ip_address(address).compressed
+ except ValueError as e:
+ raise NmstateValueError(f"Invalid IP address: {e}")
diff --git a/libnmstate/route.py b/libnmstate/route.py
index 6534182..a182f99 100644
--- a/libnmstate/route.py
+++ b/libnmstate/route.py
@@ -22,6 +22,8 @@ from collections import defaultdict
from libnmstate.error import NmstateValueError
from libnmstate.error import NmstateVerificationError
from libnmstate.iplib import is_ipv6_address
+from libnmstate.iplib import canonicalize_ip_network
+from libnmstate.iplib import canonicalize_ip_address
from libnmstate.prettystate import format_desired_current_state_diff
from libnmstate.schema import Interface
from libnmstate.schema import Route
@@ -44,6 +46,7 @@ class RouteEntry(StateEntry):
# TODO: Convert IPv6 full address to abbreviated address
self.complement_defaults()
self._invalid_reason = None
+ self._canonicalize_ip_address()
@property
def is_ipv6(self):
@@ -148,6 +151,15 @@ class RouteEntry(StateEntry):
return False
return True
+ def _canonicalize_ip_address(self):
+ if not self.absent:
+ if self.destination:
+ self.destination = canonicalize_ip_network(self.destination)
+ if self.next_hop_address:
+ self.next_hop_address = canonicalize_ip_address(
+ self.next_hop_address
+ )
+
class RouteState:
def __init__(self, ifaces, des_route_state, cur_route_state):
diff --git a/libnmstate/route_rule.py b/libnmstate/route_rule.py
index 8b45367..f35d59c 100644
--- a/libnmstate/route_rule.py
+++ b/libnmstate/route_rule.py
@@ -6,7 +6,7 @@ from libnmstate.error import NmstateVerificationError
from libnmstate.error import NmstateValueError
from libnmstate.iplib import KERNEL_MAIN_ROUTE_TABLE_ID
from libnmstate.iplib import is_ipv6_address
-from libnmstate.iplib import to_ip_address_full
+from libnmstate.iplib import canonicalize_ip_network
from libnmstate.prettystate import format_desired_current_state_diff
from libnmstate.schema import Interface
from libnmstate.schema import RouteRule
@@ -23,7 +23,7 @@ class RouteRuleEntry(StateEntry):
self.priority = route_rule.get(RouteRule.PRIORITY)
self.route_table = route_rule.get(RouteRule.ROUTE_TABLE)
self._complement_defaults()
- self._append_prefix_length_for_host_only_ip()
+ self._canonicalize_ip_network()
def _complement_defaults(self):
if self.ip_from is None:
@@ -38,11 +38,11 @@ class RouteRuleEntry(StateEntry):
):
self.route_table = KERNEL_MAIN_ROUTE_TABLE_ID
- def _append_prefix_length_for_host_only_ip(self):
- if self.ip_from and "/" not in self.ip_from:
- self.ip_from = to_ip_address_full(self.ip_from)
- if self.ip_to and "/" not in self.ip_to:
- self.ip_to = to_ip_address_full(self.ip_to)
+ def _canonicalize_ip_network(self):
+ if self.ip_from:
+ self.ip_from = canonicalize_ip_network(self.ip_from)
+ if self.ip_to:
+ self.ip_to = canonicalize_ip_network(self.ip_to)
def _keys(self):
return (self.ip_from, self.ip_to, self.priority, self.route_table)
--
2.27.0