Source code for csle_common.util.emulation_util

from typing import Tuple, List, Union
import time
import paramiko
import psutil
import csle_collector.constants.constants as collector_constants
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_observation.common.emulation_connection_observation_state \
    import EmulationConnectionObservationState
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_id import EmulationAttackerActionId
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_type import EmulationAttackerActionType
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_outcome import EmulationAttackerActionOutcome
from csle_common.dao.emulation_config.credential import Credential
from csle_common.dao.emulation_config.transport_protocol import TransportProtocol
from csle_common.util.ssh_util import SSHUtil
from csle_common.logging.log import Logger


[docs]class EmulationUtil: """ Class containing utility functions for the emulation-middleware """
[docs] @staticmethod def execute_ssh_cmds(cmds: List[str], conn, wait_for_completion: bool = True) -> List[Tuple[bytes, bytes, float]]: """ Executes a list of commands over an ssh connection to the emulation :param cmds: the list of commands :param conn: the ssh connection :param wait_for_completion: whether to wait for completion of the commands or not :return: List of tuples (outdata, errdata, total_time) """ return SSHUtil.execute_ssh_cmds(cmds=cmds, conn=conn, wait_for_completion=wait_for_completion)
[docs] @staticmethod def execute_ssh_cmd(cmd: str, conn, wait_for_completion: bool = True) -> Tuple[bytes, bytes, float]: """ Executes an action on the emulation over a ssh connection, this is a synchronous operation that waits for the completion of the action before returning :param cmd: the command to execute :param conn: the ssh connection :param wait_for_completion: boolean flag whether to wait for completion or not :return: outdata, errdata, total_time """ return SSHUtil.execute_ssh_cmd(cmd=cmd, conn=conn, wait_for_completion=wait_for_completion)
[docs] @staticmethod def log_measured_action_time(total_time, action: EmulationAttackerAction, emulation_env_config: EmulationEnvConfig, create_producer: bool = False) -> None: """ Logs the measured time of an action to Kafka :param total_time: the total time of executing the action :param action: the action :param emulation_env_config: the environment config :param create_producer: boolean flag whether to create a producer from the csle-admin :return: None """ action.execution_time = total_time record = action.to_kafka_record() if emulation_env_config.producer is None and create_producer: emulation_env_config.create_producer() if emulation_env_config.producer is not None: emulation_env_config.producer.produce(collector_constants.KAFKA_CONFIG.ATTACKER_ACTIONS_TOPIC_NAME, record) emulation_env_config.producer.poll(0) emulation_env_config.producer.flush()
@staticmethod def _check_if_ssh_server_is_running(conn, telnet: bool = False) -> bool: """ Checks if an ssh server is running on the machine :param conn: the connection to use for the command :param telnet: whether the connection is a telnet connection :return: True if server is running, else false """ cmd = "service ssh status" if not telnet: outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=conn) return "is running" in outdata.decode() or "is running" in errdata.decode() else: cmd = cmd + "\n" conn.write(cmd.encode()) response = conn.read_until(constants.TELNET.PROMPT, timeout=5) return "is running" in response.decode() @staticmethod def _list_all_users(c: EmulationConnectionObservationState, emulation_env_config: EmulationEnvConfig, telnet: bool = False) -> List[str]: """ List all users on a machine :param c: the connection to user for the command :param telnet: whether it is a telnet connection :param emulation_env_config: emulation env config :return: list of users """ cmd = constants.SHELL.LIST_ALL_USERS users = [] for i in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_FIND_USERS): if not telnet: outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=c.conn) outdata_1 = outdata.decode() users = outdata_1.split("\n") users = list(filter(lambda x: x != '', users)) else: cmd = cmd + "\n" c.conn.write(cmd.encode()) response = c.conn.read_until(constants.TELNET.PROMPT, timeout=5) response = response.decode() users = response.split("\n") users = list(map(lambda x: x.replace("\r", ""), users)) users = list(filter(lambda x: x != '', users)) if len(users) == 0: continue else: break if len(users) == 0: raise ValueError("users empty, ip:{}, telnet:{}, root:{}, username:{}".format(c.ip, telnet, c.root, c.username)) return users
[docs] @staticmethod def is_connection_active(conn): """ Checks if a given connection is active or not :param conn: the connection to check :return: True if active, otherwise False """ if conn is not None and conn.get_transport() is not None: return conn.get_transport().is_active() else: return False
[docs] @staticmethod def setup_custom_connection(user: str, pw: str, source_ip: str, port: int, target_ip: str, proxy_conn, root: bool) \ -> Union[EmulationConnectionObservationState, None]: """ Utility function for setting up a custom SSH connection given credentials and a proxy connection :param user: the username of the new connection :param pw: the pw of the new connection :param source_ip: the ip of the proxy :param port: the port of the new connection :param target_ip: the ip to connect to :param proxy_conn: the proxy connection :param root: whether it is a root connection or not :return: the new connection """ cr = Credential(username=user, pw=pw, root=root, protocol=TransportProtocol.TCP, service=constants.SSH.SERVICE_NAME, port=constants.SSH.DEFAULT_PORT) agent_addr = (source_ip, port) target_addr = (target_ip, port) agent_transport = proxy_conn.conn.get_transport() try: relay_channel = agent_transport.open_channel(constants.SSH.DIRECT_CHANNEL, target_addr, agent_addr, timeout=3) target_conn = paramiko.SSHClient() target_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy()) target_conn.connect(target_ip, username=user, password=pw, sock=relay_channel) transport = target_conn.get_transport() if transport is not None: transport.set_keepalive(5) else: raise ValueError(f"Connection failed, ip: {target_ip}, user: {user}, pw: {pw}") connection_dto = EmulationConnectionObservationState( conn=target_conn, credential=cr, root=root, service=constants.SSH.SERVICE_NAME, port=constants.SSH.DEFAULT_PORT, proxy=proxy_conn, ip=target_ip) return connection_dto except Exception as e: Logger.__call__().get_logger().warning(f"Custom connection setup failed: {str(e)}, {repr(e)}") return None
[docs] @staticmethod def write_remote_file(conn, file_name: str, contents: str, write_mode: str = "w") -> None: """ Utility function for writing contents to a file :param conn: the SSH connection to use for writing :param file_name: the file name :param contents: the contents of the file :param write_mode: the write mode :return: None """ sftp_client = conn.open_sftp() remote_file = sftp_client.file(file_name, mode=write_mode) try: remote_file.write(contents) except Exception as e: Logger.__call__().get_logger().warning(f"Exception writing file: {str(e)}, {repr(e)}") finally: remote_file.close()
[docs] @staticmethod def connect_admin(emulation_env_config: EmulationEnvConfig, ip: str, create_producer: bool = False) -> None: """ Connects the admin agent :param emulation_env_config: the configuration of the emulation to connect to :param ip: the ip of the container to connect to :param create_producer: boolean flag whether the producer thread to Kafka should be created also :return: None """ emulation_env_config.agent_ip = ip emulation_env_config.connect(ip=ip, username=constants.CSLE_ADMIN.SSH_USER, pw=constants.CSLE_ADMIN.SSH_PW, create_producer=create_producer)
[docs] @staticmethod def disconnect_admin(emulation_env_config: EmulationEnvConfig) -> None: """ Disconnects the admin agent :param emulation_env_config: the configuration of the emulation to disconnect the admin of :return: None """ emulation_env_config.close_all_connections()
[docs] @staticmethod def execute_cmd_interactive_channel(cmd: str, channel) -> None: """ Executes an action on the emulation using an interactive shell (non synchronous) :param a: action to execute :param env_config: environment config :param channel: the channel to use :return: None """ channel.send(cmd + "\n")
[docs] @staticmethod def read_result_interactive(emulation_env_config: EmulationEnvConfig, channel) -> str: """ Reads the result of an action executed in interactive mode :param emulation_env_config: the emulation environment config :param the channel :return: the result """ return EmulationUtil.read_result_interactive_channel( emulation_env_config=emulation_env_config, channel=channel)
[docs] @staticmethod def read_result_interactive_channel(emulation_env_config: EmulationEnvConfig, channel) -> str: """ Reads the result of an action executed in interactive mode :param emulation_env_config: the emulation environment config :param channel: the channel to use :return: the result """ while not channel.recv_ready(): time.sleep(constants.ENV_CONSTANTS.SHELL_READ_WAIT) output = channel.recv(constants.COMMON.LARGE_RECV_SIZE) output_str = output.decode("utf-8") output_str = constants.NMAP.SHELL_ESCAPE.sub("", output_str) return str(output_str)
[docs] @staticmethod def check_pid(pid: int) -> bool: """ Check if a given pid is running on the host :param pid: the pid to check :return: True if it is running otherwise false. """ try: return bool(psutil.Process(pid).is_running()) except Exception: return False
[docs] @staticmethod def physical_ip_match(emulation_env_config: EmulationEnvConfig, ip, physical_host_ip: str) -> bool: """ Utility method for checking if a container ip matches a physical ip :param emulation_env_config: the emulation config :param ip: the ip to check :param physical_host_ip: the physical ip :return: True or False """ if ip == emulation_env_config.kafka_config.container.docker_gw_bridge_ip and \ emulation_env_config.kafka_config.container.physical_host_ip == physical_host_ip: return True if ip == emulation_env_config.elk_config.container.docker_gw_bridge_ip and \ emulation_env_config.elk_config.container.physical_host_ip == physical_host_ip: return True if emulation_env_config.sdn_controller_config is not None and \ ip == emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip and \ emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_host_ip: return True node_config = emulation_env_config.containers_config.get_container_from_ip(ip=ip) if node_config is not None and node_config.physical_host_ip == physical_host_ip: return True return False