From c3a924203e8f0d3d6d6fc7882f006d60e7d8d985 Mon Sep 17 00:00:00 2001 From: Gris Ge Date: Fri, 19 Jun 2020 10:14:25 +0800 Subject: [PATCH] ip: canonicalize IP address Canonicalize these IP addresses: * `InterfaceIP.ADDRESS_IP` * `Route.DESTINATION` * `Route.NEXT_HOP_ADDRESS` * `RouteRule.IP_FROM` * `RouteRule.IP_TO` Introduced two functions to `libnmstate/iplib.py`: * `canonicalize_ip_network()` returns address with prefix * `canonicalize_ip_address()` returns address without prefix Unit test cases added. Signed-off-by: Gris Ge --- libnmstate/ifaces/base_iface.py | 19 +++++++------------ libnmstate/iplib.py | 14 ++++++++++++++ libnmstate/route.py | 12 ++++++++++++ libnmstate/route_rule.py | 14 +++++++------- 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/libnmstate/ifaces/base_iface.py b/libnmstate/ifaces/base_iface.py index 33da69d..56a1115 100644 --- a/libnmstate/ifaces/base_iface.py +++ b/libnmstate/ifaces/base_iface.py @@ -20,13 +20,12 @@ from collections.abc import Mapping from copy import deepcopy import logging -from ipaddress import ip_address from operator import itemgetter from libnmstate.error import NmstateInternalError from libnmstate.error import NmstateValueError -from libnmstate.iplib import is_ipv6_address from libnmstate.iplib import is_ipv6_link_local_addr +from libnmstate.iplib import canonicalize_ip_address from libnmstate.schema import Interface from libnmstate.schema import InterfaceIP from libnmstate.schema import InterfaceIPv6 @@ -44,7 +43,7 @@ class IPState: self._info = info self._remove_stack_if_disabled() self._sort_addresses() - self._canonicalize_ipv6_addr() + self._canonicalize_ip_addr() self._canonicalize_dynamic() def _canonicalize_dynamic(self): @@ -61,15 +60,11 @@ class IPState: ): self._info.pop(dhcp_option, None) - def _canonicalize_ipv6_addr(self): - """ - Convert full IPv6 address to abbreviated address. - """ - if self._family == Interface.IPV6: - for addr in self.addresses: - address = addr[InterfaceIP.ADDRESS_IP] - if is_ipv6_address(address): - addr[InterfaceIP.ADDRESS_IP] = str(ip_address(address)) + def _canonicalize_ip_addr(self): + for addr in self.addresses: + addr[InterfaceIP.ADDRESS_IP] = canonicalize_ip_address( + addr[InterfaceIP.ADDRESS_IP] + ) def _sort_addresses(self): self.addresses.sort(key=itemgetter(InterfaceIP.ADDRESS_IP)) diff --git a/libnmstate/iplib.py b/libnmstate/iplib.py index 57fffd7..183b81b 100644 --- a/libnmstate/iplib.py +++ b/libnmstate/iplib.py @@ -52,3 +52,17 @@ def ip_address_full_to_tuple(addr): raise NmstateValueError(f"Invalid IP address, error: {err}") return f"{net.network_address}", net.prefixlen + + +def canonicalize_ip_network(address): + try: + return ipaddress.ip_network(address, strict=False).with_prefixlen + except ValueError as e: + raise NmstateValueError(f"Invalid IP network address: {e}") + + +def canonicalize_ip_address(address): + try: + return ipaddress.ip_address(address).compressed + except ValueError as e: + raise NmstateValueError(f"Invalid IP address: {e}") diff --git a/libnmstate/route.py b/libnmstate/route.py index 6534182..a182f99 100644 --- a/libnmstate/route.py +++ b/libnmstate/route.py @@ -22,6 +22,8 @@ from collections import defaultdict from libnmstate.error import NmstateValueError from libnmstate.error import NmstateVerificationError from libnmstate.iplib import is_ipv6_address +from libnmstate.iplib import canonicalize_ip_network +from libnmstate.iplib import canonicalize_ip_address from libnmstate.prettystate import format_desired_current_state_diff from libnmstate.schema import Interface from libnmstate.schema import Route @@ -44,6 +46,7 @@ class RouteEntry(StateEntry): # TODO: Convert IPv6 full address to abbreviated address self.complement_defaults() self._invalid_reason = None + self._canonicalize_ip_address() @property def is_ipv6(self): @@ -148,6 +151,15 @@ class RouteEntry(StateEntry): return False return True + def _canonicalize_ip_address(self): + if not self.absent: + if self.destination: + self.destination = canonicalize_ip_network(self.destination) + if self.next_hop_address: + self.next_hop_address = canonicalize_ip_address( + self.next_hop_address + ) + class RouteState: def __init__(self, ifaces, des_route_state, cur_route_state): diff --git a/libnmstate/route_rule.py b/libnmstate/route_rule.py index 8b45367..f35d59c 100644 --- a/libnmstate/route_rule.py +++ b/libnmstate/route_rule.py @@ -6,7 +6,7 @@ from libnmstate.error import NmstateVerificationError from libnmstate.error import NmstateValueError from libnmstate.iplib import KERNEL_MAIN_ROUTE_TABLE_ID from libnmstate.iplib import is_ipv6_address -from libnmstate.iplib import to_ip_address_full +from libnmstate.iplib import canonicalize_ip_network from libnmstate.prettystate import format_desired_current_state_diff from libnmstate.schema import Interface from libnmstate.schema import RouteRule @@ -23,7 +23,7 @@ class RouteRuleEntry(StateEntry): self.priority = route_rule.get(RouteRule.PRIORITY) self.route_table = route_rule.get(RouteRule.ROUTE_TABLE) self._complement_defaults() - self._append_prefix_length_for_host_only_ip() + self._canonicalize_ip_network() def _complement_defaults(self): if self.ip_from is None: @@ -38,11 +38,11 @@ class RouteRuleEntry(StateEntry): ): self.route_table = KERNEL_MAIN_ROUTE_TABLE_ID - def _append_prefix_length_for_host_only_ip(self): - if self.ip_from and "/" not in self.ip_from: - self.ip_from = to_ip_address_full(self.ip_from) - if self.ip_to and "/" not in self.ip_to: - self.ip_to = to_ip_address_full(self.ip_to) + def _canonicalize_ip_network(self): + if self.ip_from: + self.ip_from = canonicalize_ip_network(self.ip_from) + if self.ip_to: + self.ip_to = canonicalize_ip_network(self.ip_to) def _keys(self): return (self.ip_from, self.ip_to, self.priority, self.route_table) -- 2.27.0