Source code for csle_common.dao.emulation_config.containers_config

from typing import List, Union, Dict, Any
from csle_common.dao.emulation_config.node_container_config import NodeContainerConfig
from csle_common.dao.emulation_config.container_network import ContainerNetwork
from csle_common.util.general_util import GeneralUtil
from csle_base.json_serializable import JSONSerializable


[docs]class ContainersConfig(JSONSerializable): """ A DTO representing the configuration of the containers that make up an emulation environment """ def __init__(self, containers: List[NodeContainerConfig], agent_ip: str, router_ip: str, networks: List[ContainerNetwork], ids_enabled: bool, vulnerable_nodes=None, agent_reachable_nodes=None): """ Initializes the DTO :param containers: the list of containers :param agent_ip: the ip of the agent :param router_ip: the ip of the router :param ids_enabled: whether the IDS is enabled or nt :param vulnerable_nodes: the list of vulnerable nodes :param networks: list of subnetworks :param agent_reachable_nodes: nodes directly reachable by the attacker """ self.containers = containers self.agent_ip = agent_ip self.router_ip = router_ip self.ids_enabled = ids_enabled self.vulnerable_nodes = vulnerable_nodes self.networks = networks self.agent_reachable_nodes = agent_reachable_nodes
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "ContainersConfig": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = ContainersConfig( containers=list(map(lambda x: NodeContainerConfig.from_dict(x), d["containers"])), agent_ip=d["agent_ip"], router_ip=d["router_ip"], networks=list(map(lambda x: ContainerNetwork.from_dict(x), d["networks"])), ids_enabled=d["ids_enabled"], vulnerable_nodes=d["vulnerable_nodes"], agent_reachable_nodes=d["agent_reachable_nodes"] ) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Any] = {} d["agent_ip"] = self.agent_ip d["router_ip"] = self.router_ip d["networks"] = list(map(lambda x: x.to_dict(), self.networks)) d["ids_enabled"] = self.ids_enabled d["vulnerable_nodes"] = self.vulnerable_nodes d["containers"] = list(map(lambda x: x.to_dict(), self.containers)) d["agent_reachable_nodes"] = self.agent_reachable_nodes return d
def __str__(self) -> str: """ :return: a string representation of the object """ return f"containers:{self.containers},networks:{self.networks},agent_ip:{self.agent_ip}, " \ f"router_ip:{self.router_ip}" \ f"ids_enabled:{self.ids_enabled},vulnerable_nodes:{self.vulnerable_nodes}, " \ f"agent_reachable_nodes: {self.agent_reachable_nodes}"
[docs] def get_reachable_ips(self, container: NodeContainerConfig) -> List[str]: """ Get list of IP addresses reachable from a given container :param container: the container to get reachable IPs from :return: """ reachable_ips = [] for c in self.containers: for ip_net in c.ips_and_networks: ip, net = ip_net for container_ip_net in container.ips_and_networks: container_ip, container_net = container_ip_net if net.name == container_net.name: reachable_ips.append(ip) return reachable_ips
[docs] def get_agent_container(self) -> Union[NodeContainerConfig, None]: """ :return: get container of the attacker agent """ for container in self.containers: if self.agent_ip in container.get_ips(): return container return None
[docs] def get_agent_reachable_ips(self) -> List[str]: """ :return: list of ips reachable for the attacker agent """ agent_container = self.get_agent_container() if agent_container is None: raise ValueError("Agent container is None") return self.get_reachable_ips(container=agent_container)
[docs] def get_container_from_ip(self, ip: str) -> Union[NodeContainerConfig, None]: """ Utility function for getting the container :param ip: the ip of the container :return: the container with the given ip or None """ for c in self.containers: if ip in c.get_ips() or ip == c.docker_gw_bridge_ip: return c return None
[docs] def get_container_from_full_name(self, name: str) -> Union[NodeContainerConfig, None]: """ Utility function for getting the container :param name: the full name of the container :return: the container with the given ip or None """ for c in self.containers: if name == c.full_name_str: return c return None
[docs] @staticmethod def from_json_file(json_file_path: str) -> "ContainersConfig": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return ContainersConfig.from_dict(json.loads(json_str))
[docs] def copy(self) -> "ContainersConfig": """ :return: a copy of the DTO """ return ContainersConfig.from_dict(self.to_dict())
[docs] def create_execution_config(self, ip_first_octet: int, physical_servers: List[str]) -> "ContainersConfig": """ Creates a new config for an execution :param ip_first_octet: the first octet of the IP of the new execution :param physical_servers: the physical servers where the execution will be deployed :return: the new config """ config = self.copy() config.containers = list(map(lambda x: x.create_execution_config(ip_first_octet=ip_first_octet, physical_servers=physical_servers), config.containers)) config.networks = list(map(lambda x: x.create_execution_config(ip_first_octet=ip_first_octet), config.networks)) config.agent_ip = GeneralUtil.replace_first_octet_of_ip(ip=config.agent_ip, ip_first_octet=ip_first_octet) config.router_ip = GeneralUtil.replace_first_octet_of_ip(ip=config.router_ip, ip_first_octet=ip_first_octet) config.vulnerable_nodes = list(map(lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), config.vulnerable_nodes)) config.agent_reachable_nodes = list(map(lambda x: GeneralUtil.replace_first_octet_of_ip( ip=x, ip_first_octet=ip_first_octet), config.agent_reachable_nodes)) return config