Source code for csle_common.util.docker_util

from typing import List, Union
import docker
from docker.models.containers import Container
import os
import json
from csle_common.dao.docker.docker_container_metadata import DockerContainerMetadata
from csle_common.dao.docker.docker_env_metadata import DockerEnvMetadata
from csle_common.metastore.metastore_facade import MetastoreFacade
import csle_common.constants.constants as constants


[docs]class DockerUtil: """ Utility class for extracting information from running emulation environments """
[docs] @staticmethod def parse_runnning_emulation_infos() -> List[DockerEnvMetadata]: """ Queries docker to get a list of all running emulation environments :return: a list of environment DTOs """ client_1: docker.DockerClient = docker.from_env() client_2: docker.APIClient = docker.APIClient(base_url=constants.DOCKER.UNIX_DOCKER_SOCK_URL) parsed_containers = DockerUtil.parse_running_containers(client_1=client_1, client_2=client_2) emulations: List[str] = list(set(list(map(lambda x: x.emulation, filter(lambda x: x is not None, parsed_containers))))) parsed_envs = DockerUtil.parse_running_emulation_envs(emulations=emulations, containers=parsed_containers) return parsed_envs
[docs] @staticmethod def get_container_hex_id(name: str) -> Union[str, None]: """ Queries the docker engine for the id of a container with a given name :return: the id """ client_1: docker.DockerClient = docker.from_env() client_2: docker.APIClient = docker.APIClient(base_url=constants.DOCKER.UNIX_DOCKER_SOCK_URL) containers: List[DockerContainerMetadata] = DockerUtil.parse_running_containers(client_1=client_1, client_2=client_2) for container in containers: if container.name == name: return container.id return None
[docs] @staticmethod def parse_running_containers(client_1: docker.DockerClient, client_2: docker.APIClient) \ -> List[DockerContainerMetadata]: """ Queries docker to get a list of all running containers :param client_1: docker client 1 :param client_2: docker client 2 :return: list of parsed running containers """ containers: List[Container] = client_1.containers.list() parsed_containers = DockerUtil.parse_containers(containers=containers, client2=client_2) return parsed_containers
[docs] @staticmethod def parse_stopped_containers(client_1: docker.DockerClient, client2: docker.APIClient) \ -> List[DockerContainerMetadata]: """ Queries docker to get a list of all stopped csle containers :param client_1: docker client 1 :param client2: docker client 2 :return: list of parsed containers """ containers: List[docker.models.containers.Container] = client_1.containers.list(all=True) stopped_containers: List[docker.models.containers.Container] = list(filter( lambda x: (x.status == constants.DOCKER.CONTAINER_EXIT_STATUS or x.status == constants.DOCKER.CONTAINER_CREATED_STATUS), containers)) parsed_containers = DockerUtil.parse_containers(containers=stopped_containers, client2=client2) return parsed_containers
[docs] @staticmethod def parse_running_emulation_envs(emulations: List[str], containers: List[DockerContainerMetadata]) \ -> List[DockerEnvMetadata]: """ Queries docker to get a list of all active emulation environments :param emulations: list of csle emulations :param containers: list of running csle containers :return: list of parsed emulation environments """ parsed_envs = [] for em in emulations: em_containers = list(filter(lambda x: x.emulation == em, containers)) subnet_prefix = constants.COMMANDS.DOT_DELIM.join(em_containers[0].ip.rsplit( constants.COMMANDS.DOT_DELIM)[0:-1]) subnet_mask = subnet_prefix + constants.COMMANDS.SLASH_DELIM + str(em_containers[0].ip_prefix_len) config = None em_record = MetastoreFacade.get_emulation_by_name(name=em) if em_record is not None: config = em_record p_env = DockerEnvMetadata(containers=em_containers, name=em, subnet_prefix=subnet_mask, subnet_mask=subnet_mask, level=em_containers[0].level, config=config, kafka_config=None) parsed_envs.append(p_env) return parsed_envs
[docs] @staticmethod def parse_containers(containers: List[docker.models.containers.Container], client2: docker.APIClient) \ -> List[DockerContainerMetadata]: """ Queries docker to get a list of running or stopped csle containers :param containers: list of containers to parse :param client2: docker client :return: List of parsed container DTOs """ parsed_containers = [] for c in containers: if constants.CONTAINER_IMAGES.CSLE_PREFIX in c.name: name_parts = c.name.split("-") container_name_2 = name_parts[0] level = name_parts[1] inspect_info = client2.inspect_container(c.id) net = None if len(list(inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS].keys())) > 0: net = list(inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS].keys())[0] labels = c.labels config_path = "" dir_path = "" emulation = "" kafka_config = "" if constants.DOCKER.CFG in labels: config_path = labels[constants.DOCKER.CFG] if constants.DOCKER.CONTAINER_CONFIG_DIR in labels: dir_path = labels[constants.DOCKER.CONTAINER_CONFIG_DIR] if constants.DOCKER.EMULATION in labels: emulation = labels[constants.DOCKER.EMULATION] if constants.DOCKER.KAFKA_CONFIG in labels: kafka_config = labels[constants.DOCKER.KAFKA_CONFIG] ip = "" network_id = -1 gateway = "" mac = "" ip_prefix_len = 0 if net is not None: ip = inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS][net][ constants.DOCKER.IP_ADDRESS_INFO] network_id = inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS][net][ constants.DOCKER.NETWORK_ID_INFO] gateway = inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS][net][ constants.DOCKER.GATEWAY_INFO] mac = inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS][net][ constants.DOCKER.MAC_ADDRESS_INFO] ip_prefix_len = inspect_info[constants.DOCKER.NETWORK_SETTINGS][constants.DOCKER.NETWORKS][net][ constants.DOCKER.IP_PREFIX_LEN_INFO] parsed_c = DockerContainerMetadata( name=c.name, status=c.status, short_id=c.short_id, image_short_id=c.image.short_id, image_tags=c.image.tags, id=c.id, created=inspect_info[constants.DOCKER.CREATED_INFO], ip=ip, network_id=network_id, gateway=gateway, mac=mac, ip_prefix_len=ip_prefix_len, name2=container_name_2, level=level, hostname=inspect_info[constants.DOCKER.CONFIG][constants.DOCKER.HOSTNAME_INFO], image_name=inspect_info[constants.DOCKER.CONFIG][constants.DOCKER.IMAGE], net=net, dir=dir_path, config_path=config_path, container_handle=c, emulation=emulation, kafka_container=kafka_config) parsed_containers.append(parsed_c) return parsed_containers
[docs] @staticmethod def get_docker_gw_bridge_ip(container_id: str) -> str: """ Gets the docker gw bridge ip of a container :param container_id: the id of the container :return: the ip in the gw bridge network """ cmd = constants.DOCKER.INSPECT_DOCKER_GWBRIDGE stream = os.popen(cmd) json_output = stream.read() docker_gw_bridge_info = json.loads(json_output)[0] containers = docker_gw_bridge_info[constants.DOCKER.CONTAINERS_KEY] if container_id in containers: container = containers[container_id] ip = container[constants.DOCKER.IPV4_KEY] if isinstance(ip, str): ip = ip.split("/")[0] return ip raise ValueError(f"The container with id:{container_id} does not have an IP in the docker gw bridge network. " f"Containers: {containers}")