Source code for csle_common.controllers.emulation_env_controller

from typing import List, Dict, Any, Union
import logging
import time
import subprocess
import random
import paramiko
import csle_collector.constants.constants as collector_constants
import csle_ryu.constants.constants as ryu_constants
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.node_resources_config import NodeResourcesConfig
from csle_common.controllers.container_controller import ContainerController
from csle_common.controllers.snort_ids_controller import SnortIDSController
from csle_common.controllers.ossec_ids_controller import OSSECIDSController
from csle_common.controllers.host_controller import HostController
from csle_common.controllers.kafka_controller import KafkaController
from csle_common.controllers.elk_controller import ELKController
from csle_common.controllers.sdn_controller_manager import SDNControllerManager
from csle_common.controllers.traffic_controller import TrafficController
from csle_common.util.emulation_util import EmulationUtil
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.util.experiment_util import ExperimentUtil
from csle_common.logging.log import Logger
from csle_common.dao.emulation_config.emulation_execution import EmulationExecution
from csle_common.dao.emulation_config.emulation_execution_info import EmulationExecutionInfo
from csle_common.dao.emulation_config.config import Config
from csle_common.tunneling.forward_tunnel_thread import ForwardTunnelThread
from csle_common.util.cluster_util import ClusterUtil


[docs]class EmulationEnvController: """ Class managing emulation environments """
[docs] @staticmethod def stop_all_executions_of_emulation(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) -> None: """ Stops all executions of a given emulation :param emulation_env_config: the emulation for which executions should be stopped :param physical_server_ip: ip of the physical server :param logger: the logger to use for logging :return: None """ executions = MetastoreFacade.list_emulation_executions_for_a_given_emulation( emulation_name=emulation_env_config.name) for exec in executions: EmulationEnvController.stop_containers(execution=exec, physical_server_ip=physical_server_ip, logger=logger) ContainerController.stop_docker_stats_thread(execution=exec, physical_server_ip=physical_server_ip, logger=logger)
[docs] @staticmethod def stop_execution_of_emulation(emulation_env_config: EmulationEnvConfig, execution_id: int, physical_server_ip: str, logger: logging.Logger) -> None: """ Stops an execution of a given emulation :param emulation_env_config: the emulation for which executions should be stopped :param execution_id: the id of the execution to stop :param physical_server_ip: ip of the physical server :param logger: the logger to use for logging :return: None """ execution = MetastoreFacade.get_emulation_execution(emulation_name=emulation_env_config.name, ip_first_octet=execution_id) if execution is None: raise ValueError(f"Could not find any execution with id: {execution_id}, " f"emulation: {emulation_env_config.name}") EmulationEnvController.stop_containers(execution=execution, physical_server_ip=physical_server_ip, logger=logger) ContainerController.stop_docker_stats_thread(execution=execution, physical_server_ip=physical_server_ip, logger=logger)
[docs] @staticmethod def stop_all_executions(physical_server_ip: str, logger: logging.Logger) -> None: """ Stops all emulation executions :param physical_server_ip: ip of the physical server :param logger: the logger to use for logging :return: None """ executions = MetastoreFacade.list_emulation_executions() for exec in executions: EmulationEnvController.stop_containers(execution=exec, physical_server_ip=physical_server_ip, logger=logger) ContainerController.stop_docker_stats_thread(execution=exec, physical_server_ip=physical_server_ip, logger=logger)
[docs] @staticmethod def install_csle_collector_and_ryu_libraries(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) \ -> None: """ Installs the latest csle-collector and csle-ryu libraries on all nodes of a given emulation :param emulation_env_config: the emulation configuration :param physical_server_ip: the IP of the physical servers where the containers are :param logger: the logger to use for logging :return: None """ containers = list(filter(lambda x: x.physical_host_ip == physical_server_ip, emulation_env_config.containers_config.containers)) ips = list(map(lambda x: x.docker_gw_bridge_ip, containers)) if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip: ips.append(emulation_env_config.kafka_config.container.docker_gw_bridge_ip) if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip: ips.append(emulation_env_config.elk_config.container.docker_gw_bridge_ip) if emulation_env_config.sdn_controller_config is not None: ips.append(emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip) for ip in ips: logger.info(f"Installing csle-collector version " f"{emulation_env_config.csle_collector_version} on node: {ip}") EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip) cmd = collector_constants.INSTALL if emulation_env_config.csle_collector_version != collector_constants.LATEST_VERSION: cmd = cmd + f"=={emulation_env_config.csle_collector_version}" o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip)) time.sleep(2) o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip)) time.sleep(2) logger.info(f"Installing csle-ryu version " f"{emulation_env_config.csle_ryu_version} on node: {ip}") EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip) cmd = ryu_constants.INSTALL if emulation_env_config.csle_ryu_version != ryu_constants.LATEST_VERSION: cmd = cmd + f"=={emulation_env_config.csle_ryu_version}" o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip)) time.sleep(2) o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip)) EmulationUtil.disconnect_admin(emulation_env_config=emulation_env_config)
[docs] @staticmethod def update_execution_config_w_docker_gw_bridge_ip(execution: EmulationExecution) -> EmulationExecution: """ Updates the execution configuration with the IP of the docker gw of the docker swarm :param execution: the execution to update :return: the updated execution """ emulation_env_config = execution.emulation_env_config emulation_env_config.kafka_config.resources.docker_gw_bridge_ip = \ emulation_env_config.kafka_config.container.docker_gw_bridge_ip emulation_env_config.kafka_config.resources.physical_host_ip = \ emulation_env_config.kafka_config.container.physical_host_ip emulation_env_config.elk_config.resources.docker_gw_bridge_ip = \ emulation_env_config.elk_config.container.docker_gw_bridge_ip emulation_env_config.elk_config.resources.physical_host_ip = \ emulation_env_config.elk_config.container.physical_host_ip if emulation_env_config.sdn_controller_config is not None: emulation_env_config.sdn_controller_config.resources.docker_gw_bridge_ip = \ emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip emulation_env_config.sdn_controller_config.resources.physical_host_ip = \ emulation_env_config.sdn_controller_config.container.physical_host_ip for resource_config in emulation_env_config.resources_config.node_resources_configurations: for container in emulation_env_config.containers_config.containers: if container.get_readable_name() == resource_config.container_name: resource_config.docker_gw_bridge_ip = container.docker_gw_bridge_ip resource_config.physical_host_ip = container.physical_host_ip for ovs_switch in emulation_env_config.ovs_config.switch_configs: for container in emulation_env_config.containers_config.containers: if container.get_readable_name() == ovs_switch.container_name: ovs_switch.docker_gw_bridge_ip = container.docker_gw_bridge_ip ovs_switch.physical_host_ip = container.physical_host_ip for user_config in emulation_env_config.users_config.users_configs: for container in emulation_env_config.containers_config.containers: if user_config.ip in container.get_ips(): user_config.docker_gw_bridge_ip = container.docker_gw_bridge_ip user_config.physical_host_ip = container.physical_host_ip for vuln_config in emulation_env_config.vuln_config.node_vulnerability_configs: for container in emulation_env_config.containers_config.containers: if vuln_config.ip in container.get_ips(): vuln_config.docker_gw_bridge_ip = container.docker_gw_bridge_ip vuln_config.physical_host_ip = container.physical_host_ip for flags_config in emulation_env_config.flags_config.node_flag_configs: for container in emulation_env_config.containers_config.containers: if flags_config.ip in container.get_ips(): flags_config.docker_gw_bridge_ip = container.docker_gw_bridge_ip flags_config.physical_host_ip = container.physical_host_ip for node_fw_config in emulation_env_config.topology_config.node_configs: for container in emulation_env_config.containers_config.containers: for ip in container.get_ips(): if ip in node_fw_config.get_ips(): node_fw_config.docker_gw_bridge_ip = container.docker_gw_bridge_ip node_fw_config.physical_host_ip = container.physical_host_ip break emulation_env_config.kafka_config.firewall_config.docker_gw_bridge_ip = \ emulation_env_config.kafka_config.container.docker_gw_bridge_ip emulation_env_config.kafka_config.firewall_config.physical_host_ip = \ emulation_env_config.kafka_config.container.physical_host_ip emulation_env_config.elk_config.firewall_config.docker_gw_bridge_ip = \ emulation_env_config.elk_config.container.docker_gw_bridge_ip emulation_env_config.elk_config.firewall_config.physical_host_ip = \ emulation_env_config.elk_config.container.physical_host_ip if emulation_env_config.sdn_controller_config is not None: emulation_env_config.sdn_controller_config.firewall_config.docker_gw_bridge_ip = \ emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip emulation_env_config.sdn_controller_config.firewall_config.physical_host_ip = \ emulation_env_config.sdn_controller_config.container.physical_host_ip for node_traffic_config in emulation_env_config.traffic_config.node_traffic_configs: for container in emulation_env_config.containers_config.containers: if node_traffic_config.ip in container.get_ips(): node_traffic_config.docker_gw_bridge_ip = container.docker_gw_bridge_ip node_traffic_config.physical_host_ip = container.physical_host_ip for container in emulation_env_config.containers_config.containers: if emulation_env_config.traffic_config.client_population_config.ip in container.get_ips(): emulation_env_config.traffic_config.client_population_config.docker_gw_bridge_ip = \ container.docker_gw_bridge_ip emulation_env_config.traffic_config.client_population_config.physical_host_ip = \ container.physical_host_ip execution.emulation_env_config = emulation_env_config MetastoreFacade.update_emulation_execution(emulation_execution=execution, ip_first_octet=execution.ip_first_octet, emulation=execution.emulation_name) return execution
[docs] @staticmethod def apply_kafka_config(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) -> None: """ Applies the kafka config :param emulation_env_config: the emulation env config :param physical_server_ip: ip of the physical server :param logger: the logger to use for logging :return: None """ if emulation_env_config.kafka_config.container.physical_host_ip != physical_server_ip: return steps = 3 current_step = 1 logger.info("-- Configuring the kafka container --") logger.info( f"-- Kafka configuration step {current_step}/{steps}: Configuring the IP addresses of the kafka brokers --") KafkaController.configure_broker_ips(emulation_env_config=emulation_env_config, logger=logger) current_step += 1 logger.info( f"-- Kafka configuration step {current_step}/{steps}: Restarting the Kafka server --") KafkaController.stop_kafka_server(emulation_env_config=emulation_env_config, logger=logger) time.sleep(20) KafkaController.start_kafka_server(emulation_env_config=emulation_env_config, logger=logger) time.sleep(20) current_step += 1 logger.info(f"-- Kafka configuration step {current_step}/{steps}: Create topics --") KafkaController.create_topics(emulation_env_config=emulation_env_config, logger=logger)
[docs] @staticmethod def start_custom_traffic(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, no_traffic: bool = True) -> None: """ Utility function for starting traffic generators and client population on a given emulation :param emulation_env_config: the configuration of the emulation :param no_traffic boolean flag whether the internal traffic generators should be skipped. :param physical_server_ip: ip of the physical servern :return: None """ if not no_traffic: TrafficController.start_internal_traffic_generators(emulation_env_config=emulation_env_config, physical_server_ip=physical_server_ip, logger=Logger.__call__().get_logger()) TrafficController.stop_client_producer(emulation_env_config=emulation_env_config, physical_server_ip=physical_server_ip, logger=Logger.__call__().get_logger()) TrafficController.start_client_population(emulation_env_config=emulation_env_config, physical_server_ip=physical_server_ip, logger=Logger.__call__().get_logger()) TrafficController.start_client_producer(emulation_env_config=emulation_env_config, physical_server_ip=physical_server_ip, logger=Logger.__call__().get_logger())
[docs] @staticmethod def delete_networks_of_emulation_env_config(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger, leader: bool = False) -> None: """ Deletes the docker networks :param emulation_env_config: the emulation env config :param physical_server_ip: the ip of the physical server to remove the networks :param leader: boolean flag indicating whether this node is the leader in the Swarm cluster :param logger: the logger to use for logging :return: None """ for c in emulation_env_config.containers_config.containers: if c.physical_host_ip == physical_server_ip or leader: for ip_net in c.ips_and_networks: ip, net = ip_net ContainerController.remove_network(name=net.name, logger=logger) c = emulation_env_config.kafka_config.container if c.physical_host_ip == physical_server_ip or leader: for ip_net in c.ips_and_networks: ip, net = ip_net ContainerController.remove_network(name=net.name, logger=logger)
[docs] @staticmethod def create_execution(emulation_env_config: EmulationEnvConfig, physical_servers: List[str], logger: logging.Logger, id: int = -1) -> EmulationExecution: """ Creates a new emulation execution :param emulation_env_config: the emulation configuration :param physical_servers: the physical servers to deploy the containers on :param id: the id of the execution (if not specified the next available id will be used) :param logger: the logger to use for logging :return: a DTO representing the execution """ timestamp = float(time.time()) total_subnets = constants.CSLE.LIST_OF_IP_SUBNETS used_subnets = list(map(lambda x: x.ip_first_octet, MetastoreFacade.list_emulation_executions_for_a_given_emulation( emulation_name=emulation_env_config.name))) available_subnets = list(filter(lambda x: x not in used_subnets, total_subnets)) ip_first_octet = available_subnets[0] if id != -1 and id not in available_subnets: logger.warning(f"The specified execution ID: {id} is not valid or is already taken. " f"Using ID: {ip_first_octet} instead") elif id != -1 and id in available_subnets: ip_first_octet = id em_config = emulation_env_config.create_execution_config(ip_first_octet=ip_first_octet, physical_servers=physical_servers) emulation_execution = EmulationExecution(emulation_name=emulation_env_config.name, timestamp=timestamp, ip_first_octet=ip_first_octet, emulation_env_config=em_config, physical_servers=physical_servers) MetastoreFacade.save_emulation_execution(emulation_execution=emulation_execution) return emulation_execution
[docs] @staticmethod def run_containers(emulation_execution: EmulationExecution, physical_host_ip: str, logger: logging.Logger) -> None: """ Run containers in the emulation env config :param emulation_execution: the execution DTO :param physical_host_ip: the ip of the physical host where the containers should be started :param logger: the logger to use for logging :return: None """ path = ExperimentUtil.default_output_dir() emulation_env_config = emulation_execution.emulation_env_config # Start regular containers for c in emulation_env_config.containers_config.containers: if c.physical_host_ip != physical_host_ip: continue ips = c.get_ips() container_resources: Union[None, NodeResourcesConfig] = None for r in emulation_env_config.resources_config.node_resources_configurations: for ip_net_resources in r.ips_and_network_configs: ip, net_resources = ip_net_resources if ip in ips: container_resources = r break if container_resources is None: raise ValueError(f"Container resources not found for container with ips:{ips}, " f"resources:{emulation_env_config.resources_config}") name = c.get_full_name() cmd = f"docker container run -dt --name {name} " \ f"--hostname={c.name}{c.suffix} --label dir={path} " \ f"--label cfg={path + constants.DOCKER.EMULATION_ENV_CFG_PATH} " \ f"-e TZ=Europe/Stockholm " \ f"--label emulation={emulation_env_config.name} --network=none --publish-all=true " \ f"--memory={container_resources.available_memory_gb}G --cpus={container_resources.num_cpus} " \ f"--restart={c.restart_policy} --cap-add NET_ADMIN --cap-add=SYS_NICE --privileged " \ f"{constants.CONTAINER_IMAGES.DOCKERHUB_USERNAME}/{c.name}:{c.version}" logger.info(f"Starting container:{name} with cmd: {cmd}") subprocess.call(cmd, shell=True) if emulation_env_config.kafka_config.container.physical_host_ip == physical_host_ip: # Start the kafka container c = emulation_env_config.kafka_config.container container_resources = emulation_env_config.kafka_config.resources name = c.get_full_name() cmd = f"docker container run -dt --name {name} " \ f"--hostname={c.name}{c.suffix} --label dir={path} " \ f"--label cfg={path + constants.DOCKER.EMULATION_ENV_CFG_PATH} " \ f"-e TZ=Europe/Stockholm " \ f"--label emulation={emulation_env_config.name} --network=none --publish-all=true " \ f"--memory={container_resources.available_memory_gb}G --cpus={container_resources.num_cpus} " \ f"--restart={c.restart_policy} --cap-add NET_ADMIN --cap-add=SYS_NICE --privileged " \ f"{constants.CONTAINER_IMAGES.DOCKERHUB_USERNAME}/{c.name}:{c.version}" logger.info(f"Starting container:{name}, cmd: {cmd}") subprocess.call(cmd, shell=True) if emulation_env_config.elk_config.container.physical_host_ip == physical_host_ip: # Start the ELK container c = emulation_env_config.elk_config.container container_resources = emulation_env_config.elk_config.resources name = c.get_full_name() cmd = f"docker container run -dt --name {name} " \ f"--hostname={c.name}{c.suffix} --label dir={path} " \ f"--label cfg={path + constants.DOCKER.EMULATION_ENV_CFG_PATH} " \ f"-e TZ=Europe/Stockholm " \ f"--label emulation={emulation_env_config.name} --network=none --publish-all=true " \ f"--memory={container_resources.available_memory_gb}G --cpus={container_resources.num_cpus} " \ f"--restart={c.restart_policy} --cap-add NET_ADMIN --cap-add=SYS_NICE --privileged " \ f"{constants.CONTAINER_IMAGES.DOCKERHUB_USERNAME}/{c.name}:{c.version}" logger.info(f"Starting container:{name}, cmd: {cmd}") subprocess.call(cmd, shell=True) if emulation_env_config.sdn_controller_config is not None \ and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_host_ip: # Start the SDN controller container c = emulation_env_config.sdn_controller_config.container container_resources = emulation_env_config.sdn_controller_config.resources name = c.get_full_name() cmd = f"docker container run -dt --name {name} " \ f"--hostname={c.name}{c.suffix} --label dir={path} " \ f"--label cfg={path + constants.DOCKER.EMULATION_ENV_CFG_PATH} " \ f"-e TZ=Europe/Stockholm " \ f"--label emulation={emulation_env_config.name} --network=none --publish-all=true " \ f"--memory={container_resources.available_memory_gb}G --cpus={container_resources.num_cpus} " \ f"--restart={c.restart_policy} --cap-add NET_ADMIN --cap-add=SYS_NICE --privileged " \ f"{constants.CONTAINER_IMAGES.DOCKERHUB_USERNAME}/{c.name}:{c.version}" logger.info(f"Starting container:{name}, cmd: {cmd}") subprocess.call(cmd, shell=True)
[docs] @staticmethod def start_containers_of_execution(emulation_execution: EmulationExecution, physical_host_ip: str) -> None: """ Starts stopped containers in a given emulation execution :param emulation_execution: the execution DTO :param physical_host_ip: the ip of the physical host :return: None """ emulation_env_config = emulation_execution.emulation_env_config # Start regular containers for c in emulation_env_config.containers_config.containers: if c.physical_host_ip == physical_host_ip: ContainerController.start_container(name=c.get_full_name()) # Start the kafka container c = emulation_env_config.kafka_config.container if c.physical_host_ip == physical_host_ip: ContainerController.start_container(name=c.get_full_name()) # Start the ELK container c = emulation_env_config.elk_config.container if c.physical_host_ip == physical_host_ip: ContainerController.start_container(name=c.get_full_name()) if emulation_env_config.sdn_controller_config is not None: # Start the SDN controller container c = emulation_env_config.sdn_controller_config.container if c.physical_host_ip == physical_host_ip: ContainerController.start_container(name=c.get_full_name())
[docs] @staticmethod def run_container(image: str, name: str, logger: logging.Logger, memory: int = 4, num_cpus: int = 1, create_network: bool = True, version: str = "0.0.1") -> None: """ Runs a given container :param image: image of the container :param name: name of the container :param memory: memory in GB :param num_cpus: number of CPUs to allocate :param create_network: whether to create a virtual network or not :param version: the version tag :param logger: the logger to use for logging :return: None """ logger.info(f"Starting container with image:{image} and name:csle_{name}-{version.replace('.', '')}") if create_network: net_id = random.randint(128, 254) sub_net_id = random.randint(2, 254) host_id = random.randint(2, 254) net_name = f"csle_custom_net_{name}_{net_id}" ip = f"55.{net_id}.{sub_net_id}.{host_id}" ContainerController.create_network( name=net_name, subnetmask=f"55.{net_id}.0.0/16", existing_network_names=[], logger=logger) cmd = f"docker container run -dt --name csle_{name}-{version.replace('.', '')} " \ f"--hostname={name} " \ f"-e TZ=Europe/Stockholm " \ f"--network={net_name} --ip {ip} --publish-all=true " \ f"--memory={memory}G --cpus={num_cpus} " \ f"--restart={constants.DOCKER.ON_FAILURE_3} --cap-add NET_ADMIN --privileged " \ f"--cap-add=SYS_NICE {image}" else: cmd = f"docker container run -dt --name csle-{name}-{version.replace('.', '')} " \ f"--hostname={name} " \ f"-e TZ=Europe/Stockholm --net=none " \ f"--publish-all=true " \ f"--memory={memory}G --cpus={num_cpus} " \ f"--restart={constants.DOCKER.ON_FAILURE_3} --cap-add NET_ADMIN --privileged " \ f"--cap-add=SYS_NICE {image}" subprocess.call(cmd, shell=True)
[docs] @staticmethod def stop_containers(execution: EmulationExecution, physical_server_ip: str, logger: logging.Logger) -> None: """ Stop containers in the emulation env config :param execution: the execution to stop :param physical_server_ip: the ip of the physical server :param logger: the logger to use for logging :return: None """ emulation_env_config = execution.emulation_env_config # Stop regular containers for c in emulation_env_config.containers_config.containers: if c.physical_host_ip != physical_server_ip: continue name = c.get_full_name() logger.info(f"Stopping container:{name}") cmd = f"docker stop {name}" subprocess.call(cmd, shell=True) # Stop the Kafka container c = emulation_env_config.kafka_config.container if c.physical_host_ip == physical_server_ip: name = c.get_full_name() logger.info(f"Stopping container:{name}") cmd = f"docker stop {name}" subprocess.call(cmd, shell=True) # Stop the ELK container c = emulation_env_config.elk_config.container if c.physical_host_ip == physical_server_ip: name = c.get_full_name() logger.info(f"Stopping container:{name}") cmd = f"docker stop {name}" subprocess.call(cmd, shell=True) if emulation_env_config.sdn_controller_config is not None: # Stop the SDN controller container c = emulation_env_config.sdn_controller_config.container if c.physical_host_ip == physical_server_ip: name = c.get_full_name() logger.info(f"Stopping container:{name}") cmd = f"docker stop {name}" subprocess.call(cmd, shell=True)
[docs] @staticmethod def clean_all_emulation_executions(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger, leader: bool = False) -> None: """ Cleans an emulation :param emulation_env_config: the config of the emulation to clean :param physical_server_ip: the ip of the physical server to clean the emulation executions :param leader: boolean flag indicating whether this node is the leader in the Swarm cluster or not :param logger: the logger to use for logging :return: None """ executions = MetastoreFacade.list_emulation_executions_for_a_given_emulation( emulation_name=emulation_env_config.name) for exec in executions: EmulationEnvController.stop_containers(execution=exec, physical_server_ip=physical_server_ip, logger=logger) EmulationEnvController.rm_containers(execution=exec, physical_server_ip=physical_server_ip, logger=logger) try: ContainerController.stop_docker_stats_thread(execution=exec, physical_server_ip=physical_server_ip, logger=logger) except Exception: pass EmulationEnvController.delete_networks_of_emulation_env_config( emulation_env_config=exec.emulation_env_config, physical_server_ip=physical_server_ip, logger=logger, leader=leader)
[docs] @staticmethod def clean_emulation_execution(emulation_env_config: EmulationEnvConfig, execution_id: int, physical_server_ip: str, logger: logging.Logger, leader: bool = False) -> None: """ Cleans an emulation execution :param execution_id: the id of the execution to clean :param emulation_env_config: the config of the emulation to clean :param physical_server_ip: the ip of the physical server to clean the execution :param leader: boolean flag indicating whether this node is the leader or not :param logger: the logger to use for logging :return: None """ execution = MetastoreFacade.get_emulation_execution(ip_first_octet=execution_id, emulation_name=emulation_env_config.name) if execution is None: raise ValueError(f"Could not find any execution with id: {execution_id}, " f"emulation: {emulation_env_config.name}") EmulationEnvController.stop_containers(execution=execution, physical_server_ip=physical_server_ip, logger=logger) EmulationEnvController.rm_containers(execution=execution, physical_server_ip=physical_server_ip, logger=logger) try: ContainerController.stop_docker_stats_thread(execution=execution, physical_server_ip=physical_server_ip, logger=logger) except Exception: pass EmulationEnvController.delete_networks_of_emulation_env_config( emulation_env_config=execution.emulation_env_config, physical_server_ip=physical_server_ip, logger=logger, leader=leader)
[docs] @staticmethod def clean_all_executions(physical_server_ip: str, logger: logging.Logger, leader: bool = False) -> None: """ Cleans all executions of a given emulation on a given physical server :param physical_server_ip: the ip of the physical server to clean the executions :param logger: the logger to use for logging :param leader: boolean flag indicating whether this node is the leader or not :return: None """ executions = MetastoreFacade.list_emulation_executions() for exec in executions: EmulationEnvController.stop_containers(execution=exec, physical_server_ip=physical_server_ip, logger=logger) EmulationEnvController.rm_containers(execution=exec, physical_server_ip=physical_server_ip, logger=logger) try: ContainerController.stop_docker_stats_thread(execution=exec, physical_server_ip=physical_server_ip, logger=logger) except Exception: pass EmulationEnvController.delete_networks_of_emulation_env_config( emulation_env_config=exec.emulation_env_config, physical_server_ip=physical_server_ip, logger=logger, leader=leader) MetastoreFacade.remove_emulation_execution(emulation_execution=exec)
[docs] @staticmethod def rm_containers(execution: EmulationExecution, physical_server_ip: str, logger: logging.Logger) -> None: """ Remove containers in the emulation env config for a given execution :param execution: the execution to remove :param physical_server_ip: the ip of the physical server to remove the containers :param logger: the logger to use for logging :return: None """ # Remove regular containers for c in execution.emulation_env_config.containers_config.containers: if c.physical_host_ip != physical_server_ip: continue name = c.get_full_name() logger.info(f"Removing container:{name}") cmd = f"docker rm {name}" subprocess.call(cmd, shell=True) # Remove the kafka container c = execution.emulation_env_config.kafka_config.container if c.physical_host_ip == physical_server_ip: name = c.get_full_name() logger.info(f"Removing container:{name}") cmd = f"docker rm {name}" subprocess.call(cmd, shell=True) # Remove the elk container c = execution.emulation_env_config.elk_config.container if c.physical_host_ip == physical_server_ip: name = c.get_full_name() logger.info(f"Removing container:{name}") cmd = f"docker rm {name}" subprocess.call(cmd, shell=True) if execution.emulation_env_config.sdn_controller_config is not None: # Remove the SDN controller container c = execution.emulation_env_config.sdn_controller_config.container if c.physical_host_ip == physical_server_ip: name = c.get_full_name() logger.info(f"Removing container:{name}") cmd = f"docker rm {name}" subprocess.call(cmd, shell=True)
[docs] @staticmethod def install_emulation(config: EmulationEnvConfig) -> None: """ Installs the emulation configuration in the metastore :param config: the config to install :return: None """ MetastoreFacade.install_emulation(config=config)
[docs] @staticmethod def save_emulation_image(img: bytes, emulation_name: str) -> None: """ Saves the emulation image :param image: the image data :param emulation_name: the name of the emulation :return: None """ MetastoreFacade.save_emulation_image(img=img, emulation_name=emulation_name)
[docs] @staticmethod def uninstall_emulation(config: EmulationEnvConfig) -> None: """ Uninstalls the emulation configuration in the metastore :param config: the config to uninstall :return: None """ MetastoreFacade.uninstall_emulation(config=config)
[docs] @staticmethod def ping_all(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) -> None: """ Tests the connections between all the containers using ping :param emulation_env_config: the emulation config :param physical_server_ip: the ip of the physical server :param logger: the logger to use for logging :return: None """ if emulation_env_config.sdn_controller_config is not None \ and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip: # Ping controller-switches for ovs_sw in emulation_env_config.ovs_config.switch_configs: logger.info(f"Ping {ovs_sw.controller_ip} to {ovs_sw.ip}") cmd = f"{constants.COMMANDS.DOCKER_EXEC_COMMAND} " \ f"{emulation_env_config.sdn_controller_config.container.get_full_name()} " \ f"{constants.COMMANDS.PING} " \ f"{ovs_sw.ip} -c 5 &" subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) logger.info(f"Ping {ovs_sw.ip} to {ovs_sw.controller_ip}") cmd = f"{constants.COMMANDS.DOCKER_EXEC_COMMAND} {ovs_sw.container_name} {constants.COMMANDS.PING} " \ f"{ovs_sw.controller_ip} -c 5 &" subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True) # Ping containers to switches for c1 in emulation_env_config.containers_config.containers: if c1.physical_host_ip != physical_server_ip: continue for c2 in emulation_env_config.containers_config.containers: for ip in c2.get_ips(): logger.info(f"Ping {c1.get_ips()[0]} to {ip}") cmd = f"{constants.COMMANDS.DOCKER_EXEC_COMMAND} {c1.get_full_name()} {constants.COMMANDS.PING} " \ f"{ip} -c 5 &" subprocess.Popen(cmd, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, shell=True)
[docs] @staticmethod def get_execution_info(execution: EmulationExecution, logger: logging.Logger, physical_server_ip: str) \ -> EmulationExecutionInfo: """ Gets runtime information about an execution :param execution: the emulation execution to get the information for :param logger: the logger to use for logging :param physical_server_ip: the IP of the physical server :return: execution information """ running_containers, stopped_containers = ContainerController.list_all_running_containers_in_emulation( emulation_env_config=execution.emulation_env_config) active_ips: List[str] = [] for container in running_containers: active_ips = active_ips + container.get_ips() active_ips.append(container.docker_gw_bridge_ip) active_ips.append(constants.COMMON.LOCALHOST) active_ips.append(constants.COMMON.LOCALHOST_127_0_0_1) active_ips.append(constants.COMMON.LOCALHOST_127_0_1_1) config = Config.get_current_config() if config is None: raise ValueError("Could not cluster read configuration") for node in config.cluster_config.cluster_nodes: active_ips.append(node.ip) emulation_name = execution.emulation_name execution_id = execution.ip_first_octet logger.info("Getting the Snort IDS Managers info") snort_ids_managers_info = \ SnortIDSController.get_snort_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_server_ip=physical_server_ip) logger.info("Getting the OSSEC IDS Managers info") ossec_ids_managers_info = \ OSSECIDSController.get_ossec_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_host_ip=physical_server_ip) logger.info("Getting the Kafka Managers info") kafka_managers_info = \ KafkaController.get_kafka_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_host_ip=physical_server_ip) logger.info("Getting the Host Managers info") host_managers_info = \ HostController.get_host_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_host_ip=physical_server_ip) logger.info("Getting the Client Managers info") client_managers_info = \ TrafficController.get_client_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger) logger.info("Getting the Traffic Managers info") traffic_managers_info = \ TrafficController.get_traffic_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_host_ip=physical_server_ip) logger.info("Getting the Docker Stats Managers info") docker_stats_managers_info = \ ContainerController.get_docker_stats_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_host_ip=physical_server_ip) logger.info("Getting the Elk Managers info") elk_managers_info = \ ELKController.get_elk_managers_info(emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_host_ip=physical_server_ip) active_networks, inactive_networks = ContainerController.list_all_active_networks_for_emulation( emulation_env_config=execution.emulation_env_config) ryu_managers_info = None if execution.emulation_env_config.sdn_controller_config is not None: ryu_managers_info = SDNControllerManager.get_ryu_managers_info( emulation_env_config=execution.emulation_env_config, active_ips=active_ips, logger=logger, physical_server_ip=physical_server_ip) execution_info = EmulationExecutionInfo(emulation_name=emulation_name, execution_id=execution_id, snort_ids_managers_info=snort_ids_managers_info, ossec_ids_managers_info=ossec_ids_managers_info, kafka_managers_info=kafka_managers_info, host_managers_info=host_managers_info, client_managers_info=client_managers_info, docker_stats_managers_info=docker_stats_managers_info, running_containers=running_containers, stopped_containers=stopped_containers, active_networks=active_networks, inactive_networks=inactive_networks, elk_managers_info=elk_managers_info, traffic_managers_info=traffic_managers_info, ryu_managers_info=ryu_managers_info) return execution_info
[docs] @staticmethod def create_ssh_tunnel(tunnels_dict: Dict[str, Any], local_port: int, remote_port: int, remote_ip: str, emulation: str, execution_id: int) -> None: """ Creates an SSH tunnel to forward the port of a container :param execution: the emulation execution :param tunnels_dict: a dict with existing tunnels :param local_port: the local port to forward :param remote_port: the remote port to forward :param remote_ip: the remote ip to forward :param emulation: the name of the emulation :param execution_id: the id of the execution :return: None """ config = Config.get_current_config() if config is None: ClusterUtil.set_config_parameters_from_config_file() config = Config.get_current_config() if config is None: raise ValueError("Could not read the CSLE configuration") conn = paramiko.SSHClient() if conn is None: raise ValueError("Could not create paramiko SSH client") conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) conn.connect(remote_ip, username=config.ssh_admin_username, password=config.ssh_admin_password) agent_transport = conn.get_transport() if agent_transport is None: raise ValueError(f"Error opening SSH connection to {remote_ip}") agent_transport.set_keepalive(5) tunnel_thread = ForwardTunnelThread( local_port=local_port, remote_host=remote_ip, remote_port=remote_port, transport=agent_transport, tunnels_dict=tunnels_dict) tunnel_thread.start() tunnel_thread_dict: Dict[str, Union[ForwardTunnelThread, int, str]] = {} tunnel_thread_dict[constants.GENERAL.THREAD_PROPERTY] = tunnel_thread tunnel_thread_dict[constants.GENERAL.PORT_PROPERTY] = local_port tunnel_thread_dict[constants.GENERAL.EMULATION_PROPERTY] = emulation tunnel_thread_dict[constants.GENERAL.EXECUTION_ID_PROPERTY] = execution_id tunnels_dict[remote_ip] = tunnel_thread_dict