Blob Blame History Raw
From c3a924203e8f0d3d6d6fc7882f006d60e7d8d985 Mon Sep 17 00:00:00 2001
From: Gris Ge <fge@redhat.com>
Date: Fri, 19 Jun 2020 10:14:25 +0800
Subject: [PATCH] ip: canonicalize IP address

Canonicalize these IP addresses:
    * `InterfaceIP.ADDRESS_IP`
    * `Route.DESTINATION`
    * `Route.NEXT_HOP_ADDRESS`
    * `RouteRule.IP_FROM`
    * `RouteRule.IP_TO`

Introduced two functions to `libnmstate/iplib.py`:
    * `canonicalize_ip_network()` returns address with prefix
    * `canonicalize_ip_address()` returns address without prefix

Unit test cases added.

Signed-off-by: Gris Ge <fge@redhat.com>
---
 libnmstate/ifaces/base_iface.py | 19 +++++++------------
 libnmstate/iplib.py             | 14 ++++++++++++++
 libnmstate/route.py             | 12 ++++++++++++
 libnmstate/route_rule.py        | 14 +++++++-------
 4 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/libnmstate/ifaces/base_iface.py b/libnmstate/ifaces/base_iface.py
index 33da69d..56a1115 100644
--- a/libnmstate/ifaces/base_iface.py
+++ b/libnmstate/ifaces/base_iface.py
@@ -20,13 +20,12 @@
 from collections.abc import Mapping
 from copy import deepcopy
 import logging
-from ipaddress import ip_address
 from operator import itemgetter
 
 from libnmstate.error import NmstateInternalError
 from libnmstate.error import NmstateValueError
-from libnmstate.iplib import is_ipv6_address
 from libnmstate.iplib import is_ipv6_link_local_addr
+from libnmstate.iplib import canonicalize_ip_address
 from libnmstate.schema import Interface
 from libnmstate.schema import InterfaceIP
 from libnmstate.schema import InterfaceIPv6
@@ -44,7 +43,7 @@ class IPState:
         self._info = info
         self._remove_stack_if_disabled()
         self._sort_addresses()
-        self._canonicalize_ipv6_addr()
+        self._canonicalize_ip_addr()
         self._canonicalize_dynamic()
 
     def _canonicalize_dynamic(self):
@@ -61,15 +60,11 @@ class IPState:
             ):
                 self._info.pop(dhcp_option, None)
 
-    def _canonicalize_ipv6_addr(self):
-        """
-        Convert full IPv6 address to abbreviated address.
-        """
-        if self._family == Interface.IPV6:
-            for addr in self.addresses:
-                address = addr[InterfaceIP.ADDRESS_IP]
-                if is_ipv6_address(address):
-                    addr[InterfaceIP.ADDRESS_IP] = str(ip_address(address))
+    def _canonicalize_ip_addr(self):
+        for addr in self.addresses:
+            addr[InterfaceIP.ADDRESS_IP] = canonicalize_ip_address(
+                addr[InterfaceIP.ADDRESS_IP]
+            )
 
     def _sort_addresses(self):
         self.addresses.sort(key=itemgetter(InterfaceIP.ADDRESS_IP))
diff --git a/libnmstate/iplib.py b/libnmstate/iplib.py
index 57fffd7..183b81b 100644
--- a/libnmstate/iplib.py
+++ b/libnmstate/iplib.py
@@ -52,3 +52,17 @@ def ip_address_full_to_tuple(addr):
         raise NmstateValueError(f"Invalid IP address, error: {err}")
 
     return f"{net.network_address}", net.prefixlen
+
+
+def canonicalize_ip_network(address):
+    try:
+        return ipaddress.ip_network(address, strict=False).with_prefixlen
+    except ValueError as e:
+        raise NmstateValueError(f"Invalid IP network address: {e}")
+
+
+def canonicalize_ip_address(address):
+    try:
+        return ipaddress.ip_address(address).compressed
+    except ValueError as e:
+        raise NmstateValueError(f"Invalid IP address: {e}")
diff --git a/libnmstate/route.py b/libnmstate/route.py
index 6534182..a182f99 100644
--- a/libnmstate/route.py
+++ b/libnmstate/route.py
@@ -22,6 +22,8 @@ from collections import defaultdict
 from libnmstate.error import NmstateValueError
 from libnmstate.error import NmstateVerificationError
 from libnmstate.iplib import is_ipv6_address
+from libnmstate.iplib import canonicalize_ip_network
+from libnmstate.iplib import canonicalize_ip_address
 from libnmstate.prettystate import format_desired_current_state_diff
 from libnmstate.schema import Interface
 from libnmstate.schema import Route
@@ -44,6 +46,7 @@ class RouteEntry(StateEntry):
         # TODO: Convert IPv6 full address to abbreviated address
         self.complement_defaults()
         self._invalid_reason = None
+        self._canonicalize_ip_address()
 
     @property
     def is_ipv6(self):
@@ -148,6 +151,15 @@ class RouteEntry(StateEntry):
                 return False
         return True
 
+    def _canonicalize_ip_address(self):
+        if not self.absent:
+            if self.destination:
+                self.destination = canonicalize_ip_network(self.destination)
+            if self.next_hop_address:
+                self.next_hop_address = canonicalize_ip_address(
+                    self.next_hop_address
+                )
+
 
 class RouteState:
     def __init__(self, ifaces, des_route_state, cur_route_state):
diff --git a/libnmstate/route_rule.py b/libnmstate/route_rule.py
index 8b45367..f35d59c 100644
--- a/libnmstate/route_rule.py
+++ b/libnmstate/route_rule.py
@@ -6,7 +6,7 @@ from libnmstate.error import NmstateVerificationError
 from libnmstate.error import NmstateValueError
 from libnmstate.iplib import KERNEL_MAIN_ROUTE_TABLE_ID
 from libnmstate.iplib import is_ipv6_address
-from libnmstate.iplib import to_ip_address_full
+from libnmstate.iplib import canonicalize_ip_network
 from libnmstate.prettystate import format_desired_current_state_diff
 from libnmstate.schema import Interface
 from libnmstate.schema import RouteRule
@@ -23,7 +23,7 @@ class RouteRuleEntry(StateEntry):
         self.priority = route_rule.get(RouteRule.PRIORITY)
         self.route_table = route_rule.get(RouteRule.ROUTE_TABLE)
         self._complement_defaults()
-        self._append_prefix_length_for_host_only_ip()
+        self._canonicalize_ip_network()
 
     def _complement_defaults(self):
         if self.ip_from is None:
@@ -38,11 +38,11 @@ class RouteRuleEntry(StateEntry):
         ):
             self.route_table = KERNEL_MAIN_ROUTE_TABLE_ID
 
-    def _append_prefix_length_for_host_only_ip(self):
-        if self.ip_from and "/" not in self.ip_from:
-            self.ip_from = to_ip_address_full(self.ip_from)
-        if self.ip_to and "/" not in self.ip_to:
-            self.ip_to = to_ip_address_full(self.ip_to)
+    def _canonicalize_ip_network(self):
+        if self.ip_from:
+            self.ip_from = canonicalize_ip_network(self.ip_from)
+        if self.ip_to:
+            self.ip_to = canonicalize_ip_network(self.ip_to)
 
     def _keys(self):
         return (self.ip_from, self.ip_to, self.priority, self.route_table)
-- 
2.27.0