Source code for csle_common.dao.emulation_config.node_firewall_config

from typing import Set, List, Dict, Any, Tuple
from csle_common.dao.emulation_config.default_network_firewall_config import DefaultNetworkFirewallConfig
from csle_common.util.general_util import GeneralUtil
from csle_base.json_serializable import JSONSerializable


[docs]class NodeFirewallConfig(JSONSerializable): """ A DTO object representing a firewall configuration of a container in an emulation environment """ def __init__(self, ips_gw_default_policy_networks: List[DefaultNetworkFirewallConfig], hostname: str, output_accept: Set[str], input_accept: Set[str], forward_accept: Set[str], output_drop: Set[str], input_drop: Set[str], forward_drop: Set[str], routes: Set[Tuple[str, str]], docker_gw_bridge_ip: str = "", physical_host_ip: str = ""): """ Initializes the DTO :param ips_gw_default_policy_networks: List of ip,gw,default policy, network :param ip: the ip of the node :param hostname: the hostname of the node :param output_accept: the list of ips to accept output :param input_accept: the list of ips to accept input :param forward_accept: the list of ips to accept forward :param output_drop: the list of ips to drop output :param input_drop: the list of ips to drop input :param forward_drop: the list of ips to drop forward :param routes: the set of custom routes for the routing table :param docker_gw_bridge_ip: IP to reach the container from the host network :param physical_host_ip: IP of the physical host where the container is running """ self.ips_gw_default_policy_networks = ips_gw_default_policy_networks self.docker_gw_bridge_ip = docker_gw_bridge_ip self.hostname = hostname self.output_accept = output_accept self.input_accept = input_accept self.forward_accept = forward_accept self.output_drop = output_drop self.input_drop = input_drop self.forward_drop = forward_drop self.routes = routes self.physical_host_ip = physical_host_ip
[docs] def get_ips(self): """ :return: list of ip addresses """ return list(filter(lambda x: x is not None, map(lambda x: x.ip, self.ips_gw_default_policy_networks)))
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "NodeFirewallConfig": """ Converts a dict representation into an instance :param d: the dict to convert :return: the created instance """ obj = NodeFirewallConfig( hostname=d["hostname"], ips_gw_default_policy_networks=list(map(lambda x: DefaultNetworkFirewallConfig.from_dict(x), d["ips_gw_default_policy_networks"])), output_accept=set(d["output_accept"]), input_accept=set(d["input_accept"]), forward_accept=set(d["forward_accept"]), output_drop=set(d["output_drop"]), input_drop=set(d["input_drop"]), forward_drop=set(d["forward_drop"]), routes=set(list(map(lambda x: (str(x[0]), str(x[1])), d["routes"]))), docker_gw_bridge_ip=d["docker_gw_bridge_ip"], physical_host_ip=d["physical_host_ip"] ) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Any] = {} d["hostname"] = self.hostname d["ips_gw_default_policy_networks"] = list(map(lambda x: x.to_dict(), self.ips_gw_default_policy_networks)) d["output_accept"] = list(self.output_accept) d["input_accept"] = list(self.input_accept) d["forward_accept"] = list(self.forward_accept) d["output_drop"] = list(self.output_drop) d["input_drop"] = list(self.input_drop) d["forward_drop"] = list(self.forward_drop) d["routes"] = list(self.routes) d["docker_gw_bridge_ip"] = self.docker_gw_bridge_ip d["physical_host_ip"] = self.physical_host_ip return d
def __str__(self) -> str: """ :return: a string representation of the object """ return f"ips_gw_default_policy_networks:{list(map(lambda x: str(x), self.ips_gw_default_policy_networks))}, " \ f"output_accept:{self.output_accept}, " \ f"input_accept:{self.input_accept}, forward_accept:{self.forward_accept}, " \ f"output_drop:{self.output_drop}, " \ f"input_drop:{self.input_drop}, forward_drop:{self.forward_drop}, " \ f"routers:{self.routes}, hostname: {self.hostname}, docker_gw_bridge_ip:{self.docker_gw_bridge_ip}," \ f" physical_host_ip: {self.physical_host_ip}"
[docs] @staticmethod def from_json_file(json_file_path: str) -> "NodeFirewallConfig": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return NodeFirewallConfig.from_dict(json.loads(json_str))
[docs] def copy(self) -> "NodeFirewallConfig": """ :return: a copy of the DTO """ return NodeFirewallConfig.from_dict(self.to_dict())
[docs] def create_execution_config(self, ip_first_octet: int) -> "NodeFirewallConfig": """ Creates a new config for an execution :param ip_first_octet: the first octet of the IP of the new execution :return: the new config """ config = self.copy() config.output_accept = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), list(config.output_accept)))) config.input_accept = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), list(config.input_accept)))) config.forward_accept = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), list(config.forward_accept)))) config.output_drop = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), list(config.output_drop)))) config.input_drop = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), list(config.input_drop)))) config.forward_drop = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), list(config.forward_drop)))) config.routes = set(list(map( lambda x: GeneralUtil.replace_first_octet_of_ip_tuple( tuple_of_ips=x, ip_first_octet=ip_first_octet), list(config.routes)))) config.ips_gw_default_policy_networks = list(map(lambda x: x.create_execution_config( ip_first_octet=ip_first_octet), config.ips_gw_default_policy_networks)) return config
[docs] @staticmethod def schema() -> "NodeFirewallConfig": """ :return: get the schema of the DTO """ dto = NodeFirewallConfig(ips_gw_default_policy_networks=[DefaultNetworkFirewallConfig.schema()], hostname="", output_accept=set(), input_accept=set(), forward_accept=set(), output_drop=set(), input_drop=set(), forward_drop=set(), routes=set()) dto.output_accept.add("") dto.input_accept.add("") dto.forward_accept.add("") dto.output_drop.add("") dto.input_drop.add("") dto.forward_drop.add("") dto.routes.add(("", "")) return dto