Source code for csle_common.controllers.container_controller

from typing import List, Tuple, Union
import logging
import subprocess
import time
import docker
import re
import os
import grpc
import csle_collector.docker_stats_manager.docker_stats_manager_pb2_grpc
import csle_collector.docker_stats_manager.docker_stats_manager_pb2
import csle_collector.docker_stats_manager.query_docker_stats_manager
import csle_collector.docker_stats_manager.docker_stats_util
from csle_common.util.docker_util import DockerUtil
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.containers_config import ContainersConfig
from csle_common.dao.emulation_config.container_network import ContainerNetwork
from csle_common.dao.emulation_config.docker_stats_manager_config import DockerStatsManagerConfig
import csle_common.constants.constants as constants
from csle_common.logging.log import Logger
from csle_common.dao.emulation_config.emulation_execution import EmulationExecution
from csle_common.dao.emulation_config.docker_stats_managers_info import DockerStatsManagersInfo
from csle_common.dao.emulation_config.node_container_config import NodeContainerConfig
from csle_common.controllers.management_system_controller import ManagementSystemController
from csle_common.dao.emulation_config.config import Config
from csle_common.util.general_util import GeneralUtil


[docs]class ContainerController: """ A class for managing Docker containers and virtual networks """
[docs] @staticmethod def stop_all_running_containers() -> None: """ Utility function for stopping all running containers :return: None """ client_1 = docker.from_env() containers = client_1.containers.list() containers = list(filter(lambda x: constants.CSLE.NAME in x.name, containers)) for c in containers: Logger.__call__().get_logger().info(f"Stopping container: {c.name}") c.stop()
[docs] @staticmethod def stop_container(name: str) -> bool: """ Utility function for stopping a specific container :param name: the name of the container to stop :return: True if stopped, False otherwise """ client_1 = docker.from_env() containers = client_1.containers.list() containers = list(filter(lambda x: constants.CSLE.NAME in x.name, containers)) for c in containers: if c.name == name: c.stop() return True return False
[docs] @staticmethod def rm_all_stopped_containers() -> None: """ A utility function for removing all stopped containers :return: None """ client_1 = docker.from_env() containers = client_1.containers.list(all=True) containers = list(filter(lambda x: (constants.CSLE.NAME in x.name and x.status == constants.DOCKER.CONTAINER_EXIT_STATUS or x.status == constants.DOCKER.CONTAINER_CREATED_STATUS), containers)) for c in containers: Logger.__call__().get_logger().info(f"Removing container: {c.name}") c.remove()
[docs] @staticmethod def rm_container(container_name: str) -> bool: """ Remove a specific container :param container_name: the container to remove :return: True if the container was removed and False otherwise """ client_1 = docker.from_env() containers = client_1.containers.list(all=True) for c in containers: if c.name == container_name: c.remove() return True return False
[docs] @staticmethod def rm_all_images() -> None: """ A utility function for removing all csle images :return: None """ client_1 = docker.from_env() images = client_1.images.list() images = list(filter(lambda x: constants.CSLE.NAME in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]), images)) non_base_images = list( filter(lambda x: (constants.DOCKER.BASE_CONTAINER_TYPE not in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), images)) base_images = list(filter(lambda x: (constants.DOCKER.BASE_CONTAINER_TYPE in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), images)) non_os_base_images = list(filter( lambda x: not (constants.OS.UBUNTU in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]) or constants.OS.KALI in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), base_images)) os_base_images = list(filter(lambda x: (constants.OS.UBUNTU in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]) or constants.OS.KALI in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), base_images)) for img in non_base_images: Logger.__call__().get_logger().info("Removing image: {}".format(img.attrs[constants.DOCKER.REPO_TAGS])) client_1.images.remove(image=img.attrs[constants.DOCKER.REPO_TAGS][0], force=True) for img in non_os_base_images: Logger.__call__().get_logger().info("Removing image: {}".format(img.attrs[constants.DOCKER.REPO_TAGS])) client_1.images.remove(image=img.attrs[constants.DOCKER.REPO_TAGS][0], force=True) for img in os_base_images: Logger.__call__().get_logger().info("Removing image: {}".format(img.attrs[constants.DOCKER.REPO_TAGS])) client_1.images.remove(image=img.attrs[constants.DOCKER.REPO_TAGS][0], force=True)
[docs] @staticmethod def rm_image(name) -> bool: """ A utility function for removing a specific image :param name: the name of the image to remove :return: True if the image was removed and False otherwise """ client_1 = docker.from_env() images = client_1.images.list() images = list(filter(lambda x: constants.CSLE.NAME in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]), images)) non_base_images = list(filter( lambda x: (constants.DOCKER.BASE_CONTAINER_TYPE not in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), images)) base_images = list(filter(lambda x: (constants.DOCKER.BASE_CONTAINER_TYPE in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), images)) non_os_base_images = list( filter(lambda x: not (constants.OS.UBUNTU in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]) or constants.OS.KALI in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), base_images)) os_base_images = list( filter(lambda x: (constants.OS.UBUNTU in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]) or constants.OS.KALI in ",".join(x.attrs[constants.DOCKER.REPO_TAGS])), base_images)) for img in non_base_images: if img.attrs[constants.DOCKER.REPO_TAGS][0] == name: client_1.images.remove(image=img.attrs[constants.DOCKER.REPO_TAGS][0], force=True) return True for img in non_os_base_images: if img.attrs[constants.DOCKER.REPO_TAGS][0] == name: client_1.images.remove(image=img.attrs[constants.DOCKER.REPO_TAGS][0], force=True) return True for img in os_base_images: if img.attrs[constants.DOCKER.REPO_TAGS][0] == name: client_1.images.remove(image=img.attrs[constants.DOCKER.REPO_TAGS][0], force=True) return True return False
[docs] @staticmethod def list_all_images() -> List[Tuple[str, str, str, str, int]]: """ A utility function for listing all csle images :return: a list of the csle images """ client_1 = docker.from_env() images = client_1.images.list() images = list(filter(lambda x: constants.CSLE.NAME in ",".join(x.attrs[constants.DOCKER.REPO_TAGS]), images)) images_names_created_os_architecture_size = list( map(lambda x: (x.attrs[constants.DOCKER.REPO_TAGS][0], x.attrs[constants.DOCKER.IMAGE_CREATED], x.attrs[constants.DOCKER.IMAGE_OS], x.attrs[constants.DOCKER.IMAGE_ARCHITECTURE], x.attrs[constants.DOCKER.IMAGE_SIZE]), images)) return images_names_created_os_architecture_size
[docs] @staticmethod def list_docker_networks() -> Tuple[List[str], List[int]]: """ Lists the csle docker networks :return: (network names, network ids) """ cmd = constants.DOCKER.LIST_NETWORKS_CMD stream = os.popen(cmd) networks_str: str = stream.read() networks: List[str] = networks_str.split("\n") networks_list: List[List[str]] = list(map(lambda x: x.split(), networks)) networks_list = list(filter(lambda x: len(x) > 1, networks_list)) networks_ids_str: List[str] = list(map(lambda x: x[1], networks_list)) networks_ids_str = list(filter(lambda x: re.match( r"{}\d".format(constants.CSLE.CSLE_NETWORK_PREFIX), x), networks_ids_str)) network_ids: List[int] = list(map(lambda x: int(x.replace(constants.CSLE.CSLE_NETWORK_PREFIX, "")), networks_ids_str)) return networks_ids_str, network_ids
[docs] @staticmethod def list_all_networks() -> List[str]: """ A utility function for listing all csle networks :return: a list of the networks """ networks, network_ids = ContainerController.list_docker_networks() return networks
[docs] @staticmethod def start_all_stopped_containers() -> None: """ Starts all stopped csle containers :return: None """ client_1 = docker.from_env() containers = client_1.containers.list(all=True) containers = list(filter(lambda x: (constants.CSLE.NAME in x.name and x.status == constants.DOCKER.CONTAINER_EXIT_STATUS or x.status == constants.DOCKER.CONTAINER_CREATED_STATUS), containers)) for c in containers: Logger.__call__().get_logger().info("Starting container: {}".format(c.name)) c.start()
[docs] @staticmethod def start_container(name: str) -> bool: """ Starts a stopped container with a specific name :param name: the name of the stopped container to start :return: True if started, False otherrwise """ client_1 = docker.from_env() containers = client_1.containers.list(all=True) containers = list(filter(lambda x: (constants.CSLE.NAME in x.name and x.status == constants.DOCKER.CONTAINER_EXIT_STATUS or x.status == constants.DOCKER.CONTAINER_CREATED_STATUS), containers)) for c in containers: if c.name == name: c.start() return True return False
[docs] @staticmethod def list_all_running_containers() -> List[Tuple[str, str, str]]: """ Lists all running csle containers :return: a list of the names of the running containers """ parsed_envs = DockerUtil.parse_runnning_emulation_infos() container_name_image_ip: List[Tuple[str, str, str]] = [] for env in parsed_envs: container_name_image_ip = (container_name_image_ip + list(map(lambda x: (x.name, x.image_name, x.ip), env.containers))) return container_name_image_ip
[docs] @staticmethod def list_all_running_containers_in_emulation(emulation_env_config: EmulationEnvConfig) \ -> Tuple[List[NodeContainerConfig], List[NodeContainerConfig]]: """ Extracts the running containers and the stopped containers of a given emulation :param emulation_env_config: the emulation configuration :return: the list of running containers and the list of stopped containers """ running_emulation_containers = [] stopped_emulation_containers = [] running_containers = ContainerController.list_all_running_containers() running_containers_names = list(map(lambda x: x[0], running_containers)) for c in emulation_env_config.containers_config.containers: if c.full_name_str in running_containers_names: running_emulation_containers.append(c) else: stopped_emulation_containers.append(c) if emulation_env_config.kafka_config.container.full_name_str in running_containers_names: running_emulation_containers.append(emulation_env_config.kafka_config.container) else: stopped_emulation_containers.append(emulation_env_config.kafka_config.container) if emulation_env_config.elk_config.container.full_name_str in running_containers_names: running_emulation_containers.append(emulation_env_config.elk_config.container) else: stopped_emulation_containers.append(emulation_env_config.elk_config.container) if emulation_env_config.sdn_controller_config is not None: if emulation_env_config.sdn_controller_config.container.full_name_str in running_containers_names: running_emulation_containers.append(emulation_env_config.sdn_controller_config.container) else: stopped_emulation_containers.append(emulation_env_config.sdn_controller_config.container) return running_emulation_containers, stopped_emulation_containers
[docs] @staticmethod def list_all_active_networks_for_emulation(emulation_env_config: EmulationEnvConfig) \ -> Tuple[List[ContainerNetwork], List[ContainerNetwork]]: """ Extracts the active networks of a given emulation :param emulation_env_config: the emulation configuration :return: the list of active networks and inactive networks of an emulation """ active_emulation_networks = [] inactive_emulation_networks = [] active_networks_names = ContainerController.list_all_networks() for net in emulation_env_config.containers_config.networks: if net.name in active_networks_names: active_emulation_networks.append(net) else: inactive_emulation_networks.append(net) return active_emulation_networks, inactive_emulation_networks
[docs] @staticmethod def is_emulation_running(emulation_env_config: EmulationEnvConfig) -> bool: """ Checks if a given emulation config is running or not :param emulation_env_config: the emulation environment configuration :return: True if running otherwise False """ running_emulations = ContainerController.list_running_emulations() return emulation_env_config.name in running_emulations
[docs] @staticmethod def list_running_emulations() -> List[str]: """ :return: A list of names of running emulations """ parsed_envs = DockerUtil.parse_runnning_emulation_infos() emulation_names = set() for env in parsed_envs: emulation_names.add(env.name) return list(emulation_names)
[docs] @staticmethod def list_all_stopped_containers() -> List[Tuple[str, str, str]]: """ Lists all stopped csle containers :return: a list of the stopped containers """ client_1 = docker.from_env() client2 = docker.APIClient(base_url=constants.DOCKER.UNIX_DOCKER_SOCK_URL) parsed_stopped_containers = DockerUtil.parse_stopped_containers(client_1=client_1, client2=client2) container_name_image_ips = list(map(lambda x: (x.name, x.image_name, x.ip), parsed_stopped_containers)) return container_name_image_ips
[docs] @staticmethod def get_network_references(): """ :return: a list of Docker network references """ client_1 = docker.from_env() networks = client_1.networks.list() return networks
[docs] @staticmethod def create_networks(containers_config: ContainersConfig, logger: logging.Logger) -> None: """ Creates docker networks for a given containers configuration :param logger: the logger to use for logging :param containers_config: the containers configuration :return: None """ for c in containers_config.containers: for ip_net in c.ips_and_networks: existing_networks = ContainerController.get_network_references() existing_networks = list(map(lambda x: x.name, existing_networks)) ip, net = ip_net if net.name != "": ContainerController.create_network_from_dto(network_dto=net, existing_network_names=existing_networks, logger=logger)
[docs] @staticmethod def connect_containers_to_networks(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) -> None: """ Connects running containers to networks :param emulation_env_config: the emulation config :param physical_server_ip: the ip of the physical server where the operation is executed :param logger: the logger to use for logging :return: None """ for c in emulation_env_config.containers_config.containers: if c.physical_host_ip != physical_server_ip: continue container_name = c.get_full_name() # Disconnect from none cmd = f"docker network disconnect none {container_name}" subprocess.Popen(cmd, stdout=subprocess.DEVNULL, shell=True) # Wait a few seconds before connecting time.sleep(2) for ip_net in c.ips_and_networks: time.sleep(2) ip, net = ip_net cmd = f"{constants.DOCKER.NETWORK_CONNECT} --ip {ip} {net.name} " \ f"{container_name}" logger.info(f"Connecting container:{container_name} to network:{net.name} with ip: {ip}, cmd: {cmd}") subprocess.Popen(cmd, stdout=subprocess.DEVNULL, shell=True) if c.docker_gw_bridge_ip == "" or c.docker_gw_bridge_ip is None: # Wait to make sure docker networks are updated time.sleep(2) # Extract the docker bridge gateway IP container_id = DockerUtil.get_container_hex_id(name=container_name) if container_id is None: raise ValueError(f"Could not parse the container id with container name: {container_name}") docker_gw_bridge_ip = DockerUtil.get_docker_gw_bridge_ip(container_id=container_id) c.docker_gw_bridge_ip = docker_gw_bridge_ip if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip: ContainerController.connect_container_to_network(container=emulation_env_config.kafka_config.container, logger=logger) if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip: ContainerController.connect_container_to_network(container=emulation_env_config.elk_config.container, logger=logger) if emulation_env_config.sdn_controller_config is not None and \ emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip: # Connect controller ContainerController.connect_container_to_network( container=emulation_env_config.sdn_controller_config.container, logger=logger) # Update IPs of OVS switches for ovs_sw in emulation_env_config.ovs_config.switch_configs: node_container_config = emulation_env_config.containers_config.get_container_from_ip(ovs_sw.ip) if node_container_config is None: raise ValueError(f"Could not find node container config for IP: {ovs_sw.ip}") ovs_sw.docker_gw_bridge_ip = node_container_config.docker_gw_bridge_ip
[docs] @staticmethod def connect_container_to_network(container: NodeContainerConfig, logger: logging.Logger) -> None: """ Connect a running container to networks :param container: the container to connect :param logger: the logger to use for logging :return: None """ container_name = container.get_full_name() # Disconnect from none cmd = f"docker network disconnect none {container_name}" subprocess.Popen(cmd, stdout=subprocess.DEVNULL, shell=True) # Wait a few seconds before connecting time.sleep(2) for ip_net in container.ips_and_networks: time.sleep(2) ip, net = ip_net cmd = f"{constants.DOCKER.NETWORK_CONNECT} --ip {ip} {net.name} " \ f"{container_name}" logger.info(f"Connecting container:{container_name} to network:{net.name} with ip: {ip}, cmd: {cmd}") subprocess.Popen(cmd, stdout=subprocess.DEVNULL, shell=True) if container.docker_gw_bridge_ip == "" or container.docker_gw_bridge_ip is None: # Wait to make sure docker networks are updated time.sleep(2) # Extract the docker bridge gateway IP container_id = DockerUtil.get_container_hex_id(name=container_name) if container_id is None: raise ValueError(f"Could not parse the container id for container name: {container_name}") docker_gw_bridge_ip = DockerUtil.get_docker_gw_bridge_ip(container_id=container_id) container.docker_gw_bridge_ip = docker_gw_bridge_ip
[docs] @staticmethod def start_docker_stats_thread(execution: EmulationExecution, physical_server_ip: str, logger: logging.Logger) -> None: """ Sends a request to the docker stats manager on the docker host for starting a docker stats monitor thread :param execution: the emulation execution :param physical_server_ip: the ip of the physical server :param logger: the logger to use for logging :return: None """ if not ManagementSystemController.is_statsmanager_running(): ManagementSystemController.start_docker_statsmanager( logger=logger, port=execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_port, log_dir=execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_log_dir, log_file=execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_log_file, max_workers=execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_max_workers ) time.sleep(5) ip = physical_server_ip logger.info( f"connecting to: {ip}:" f"{execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_port}") with grpc.insecure_channel( f'{ip}:' f'{execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.docker_stats_manager.docker_stats_manager_pb2_grpc.DockerStatsManagerStub(channel) container_ip_dtos = [] for c in execution.emulation_env_config.containers_config.containers: if c.physical_host_ip == physical_server_ip: name = c.get_full_name() ip = c.get_ips()[0] container_ip_dtos.append(csle_collector.docker_stats_manager.docker_stats_manager_pb2.ContainerIp( ip=ip, container=name)) logger.info("connected") csle_collector.docker_stats_manager.query_docker_stats_manager.start_docker_stats_monitor( stub=stub, emulation=execution.emulation_name, kafka_ip=execution.emulation_env_config.kafka_config.container.docker_gw_bridge_ip, stats_queue_maxsize=1000, time_step_len_seconds=execution.emulation_env_config.docker_stats_manager_config.time_step_len_seconds, kafka_port=execution.emulation_env_config.kafka_config.kafka_port_external, containers=container_ip_dtos, execution_first_ip_octet=execution.ip_first_octet)
[docs] @staticmethod def stop_docker_stats_thread(execution: EmulationExecution, physical_server_ip: str, logger: logging.Logger) -> None: """ Sends a request to the docker stats manager on the docker host for stopping a docker stats monitor thread :param logger: the logger to use for logging :param execution: the execution of the emulation for which the monitor should be stopped :param physical_server_ip: the ip of the physical server :return: None """ if not ManagementSystemController.is_statsmanager_running(): ManagementSystemController.stop_docker_statsmanager(logger=logger) time.sleep(5) ip = physical_server_ip with grpc.insecure_channel( f'{ip}:' f'{execution.emulation_env_config.docker_stats_manager_config.docker_stats_manager_port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.docker_stats_manager.docker_stats_manager_pb2_grpc.DockerStatsManagerStub(channel) csle_collector.docker_stats_manager.query_docker_stats_manager.stop_docker_stats_monitor( stub=stub, emulation=execution.emulation_name, execution_first_ip_octet=execution.ip_first_octet)
[docs] @staticmethod def get_docker_stats_manager_status(docker_stats_manager_config: DockerStatsManagerConfig) \ -> csle_collector.docker_stats_manager.docker_stats_manager_pb2.DockerStatsMonitorDTO: """ Sends a request to get the status of the docker stats manager :param docker_stats_manager_config: the docker stats manager configuration :return: None """ ip = GeneralUtil.get_host_ip() docker_stats_monitor_dto = ContainerController.get_docker_stats_manager_status_by_ip_and_port( ip=ip, port=docker_stats_manager_config.docker_stats_manager_port) return docker_stats_monitor_dto
[docs] @staticmethod def get_docker_stats_manager_status_by_ip_and_port(ip: str, port: int) \ -> csle_collector.docker_stats_manager.docker_stats_manager_pb2.DockerStatsMonitorDTO: """ Sends a request to get the status of the docker stats manager :param ip: ip of the host where the stats manager is running :param port: port that the host manager is listening to :return: None """ try: with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.docker_stats_manager.docker_stats_manager_pb2_grpc.DockerStatsManagerStub(channel) docker_stats_monitor_dto = \ csle_collector.docker_stats_manager.query_docker_stats_manager.get_docker_stats_manager_status( stub=stub) return docker_stats_monitor_dto except Exception: return None
[docs] @staticmethod def create_network_from_dto(network_dto: ContainerNetwork, logger: logging.Logger, existing_network_names=None) -> None: """ Creates a network from a given DTO representing the network :param network_dto: details of the network to create :param existing_network_names: list of network names, if not None, check if network exists befeore creating :param logger: the logger to use for logging :return: None """ config = Config.get_current_config() if config is None: raise ValueError("Could not parse the CSLE config") driver = constants.DOCKER.BRIDGE_NETWORK_DRIVER if len(config.cluster_config.cluster_nodes) > 0: driver = constants.DOCKER.OVERLAY_NETWORK_DRIVER ContainerController.create_network(name=network_dto.name, subnetmask=network_dto.subnet_mask, existing_network_names=existing_network_names, driver=driver, logger=logger)
[docs] @staticmethod def create_network(name: str, subnetmask: str, logger: logging.Logger, driver: str = "bridge", existing_network_names: Union[None, List[str]] = None) -> None: """ Creates a network :param name: the name of the network to create :param subnetmask: the subnetmask of the network to create :param logger: the logger to use for logging :param driver: the driver of the network to create :param existing_network_names: list of network names, if not None, check if network exists befeore creating :return: None """ client_1 = docker.from_env() ipam_pool = docker.types.IPAMPool(subnet=subnetmask) ipam_config = docker.types.IPAMConfig(pool_configs=[ipam_pool]) network_names = [] if existing_network_names is not None: network_names = existing_network_names if name not in network_names: logger.info(f"Creating network: {name}, subnetmask: {subnetmask}, driver: {driver}") client_1.networks.create( name, driver=driver, ipam=ipam_config, attachable=True )
[docs] @staticmethod def remove_network(name: str, logger: logging.Logger) -> None: """ Removes a network :param name: the name of the network to remove :param logger: the logger to use for logging :return: None """ client_1 = docker.from_env() networks = client_1.networks.list() for net in networks: if net.name == name: logger.info(f"Removing network: {net.name}") try: net.remove() except Exception: pass
[docs] @staticmethod def remove_networks(names: List[str], logger: logging.Logger) -> bool: """ Removes a network :param name: the name of the network to remove :param logger: the logger to use for logging :return: True if at least one of the networks were successfully removed """ client_1 = docker.from_env() networks = client_1.networks.list() network_removed = False for net in networks: if net.name in names: logger.info(f"Removing network: {net.name}") try: net.remove() network_removed = True except Exception: pass return network_removed
[docs] @staticmethod def rm_all_networks(logger: logging.Logger) -> None: """ A utility function for removing all csle networks :param logger: the logger to use for logging :return: None """ client_1 = docker.from_env() networks = client_1.networks.list() networks = list(filter(lambda x: constants.CSLE.NAME in x.name, networks)) for net in networks: logger.info(f"Removing network:{net.name}") ContainerController.remove_network(name=net.name, logger=logger)
[docs] @staticmethod def rm_network(name, logger: logging.Logger) -> bool: """ A utility function for removing a network with a specific name :param name: the name of the network to remove :param logger: the logger to use for logging :return: True if it was removed or False otherwise """ client_1 = docker.from_env() networks = client_1.networks.list() networks_names = [] for net in networks: if constants.CSLE.NAME in net.name: net.name = constants.CSLE.NAME networks_names.append(net) networks = list(filter(lambda x: constants.CSLE.NAME in x.name, networks)) for net in networks_names: if net.name == name: ContainerController.remove_network(name=net.name, logger=logger) return True return False
[docs] @staticmethod def run_command(cmd: str) -> None: """ Runs a container management command :param cmd: the command to run :return: None """ if cmd == constants.MANAGEMENT.LIST_STOPPED: stopped_container_names = ContainerController.list_all_stopped_containers() Logger.__call__().get_logger().info(stopped_container_names) elif cmd == constants.MANAGEMENT.LIST_RUNNING: running_container_names = ContainerController.list_all_running_containers() Logger.__call__().get_logger().info(running_container_names) elif cmd == constants.MANAGEMENT.LIST_IMAGES: images_names = ContainerController.list_all_images() Logger.__call__().get_logger().info(images_names) elif cmd == constants.MANAGEMENT.STOP_RUNNING: ContainerController.stop_all_running_containers() elif cmd == constants.MANAGEMENT.RM_STOPPED: ContainerController.rm_all_stopped_containers() elif cmd == constants.MANAGEMENT.RM_IMAGES: ContainerController.rm_all_images() elif cmd == constants.MANAGEMENT.START_STOPPED: ContainerController.start_all_stopped_containers() elif cmd == constants.MANAGEMENT.LIST_NETWORKS: networks = ContainerController.list_all_networks() Logger.__call__().get_logger().info(networks) elif cmd == constants.MANAGEMENT.RM_NETWORKS: ContainerController.rm_all_networks(logger=Logger.__call__().get_logger()) else: raise ValueError("Command: {} not recognized".format(cmd))
[docs] @staticmethod def get_docker_stats_managers_ips(emulation_env_config: EmulationEnvConfig) -> List[str]: """ A method that extracts the IPS of the Docker stats managers in a given emulation :param emulation_env_config: the emulation env config :return: the list of IP addresses """ return [GeneralUtil.get_host_ip()]
[docs] @staticmethod def get_docker_stats_managers_ports(emulation_env_config: EmulationEnvConfig) -> List[int]: """ A method that extracts the ports of the Docker stats managers in a given emulation :param emulation_env_config: the emulation env config :return: the list of ports """ return [emulation_env_config.docker_stats_manager_config.docker_stats_manager_port]
[docs] @staticmethod def get_docker_stats_managers_info(emulation_env_config: EmulationEnvConfig, active_ips: List[str], physical_host_ip: str, logger: logging.Logger) \ -> DockerStatsManagersInfo: """ Extracts the information of the Docker stats managers for a given emulation :param emulation_env_config: the configuration of the emulation :param active_ips: list of active IPs :param physical_host_ip: the ip of the physical host :param logger: the logger to use for logging :return: a DTO with the status of the Docker stats managers """ docker_stats_managers_ips = ContainerController.get_docker_stats_managers_ips( emulation_env_config=emulation_env_config) docker_stats_managers_ports = ContainerController.get_docker_stats_managers_ports( emulation_env_config=emulation_env_config) docker_stats_managers_statuses = [] docker_stats_managers_running = [] for ip in docker_stats_managers_ips: if ip not in active_ips or ip != physical_host_ip: continue running = False status = None try: status = ContainerController.get_docker_stats_manager_status_by_ip_and_port( port=emulation_env_config.docker_stats_manager_config.docker_stats_manager_port, ip=ip) running = True except Exception as e: logger.debug( f"Could not fetch Docker stats manager status on IP:{ip}, error: {str(e)}, {repr(e)}") if status is not None: docker_stats_managers_statuses.append(status) else: stats_util = csle_collector.docker_stats_manager.docker_stats_util.DockerStatsUtil docker_stats_managers_statuses.append(stats_util.docker_stats_monitor_dto_empty()) docker_stats_managers_running.append(running) execution_id = emulation_env_config.execution_id emulation_name = emulation_env_config.name docker_stats_manager_info_dto = DockerStatsManagersInfo( docker_stats_managers_running=docker_stats_managers_running, ips=docker_stats_managers_ips, execution_id=execution_id, emulation_name=emulation_name, docker_stats_managers_statuses=docker_stats_managers_statuses, ports=docker_stats_managers_ports) return docker_stats_manager_info_dto