csle_common.util package

Submodules

csle_common.util.cluster_util module

class csle_common.util.cluster_util.ClusterUtil[source]

Bases: object

Class with utility related to cluster management

static am_i_leader(ip: str, config: csle_common.dao.emulation_config.config.Config) bool[source]

Checks if a given IP is leader or not

Parameters
  • ip – the ip to check

  • config – the cluster configuration

Returns

True if leader, False otherwise

static get_config() csle_common.dao.emulation_config.config.Config[source]

Gets the current cluster config from the metastore or from disk depending on if it is the leader node or not :return: the cluster config

static set_config_parameters_from_config_file() None[source]

Reads the config file from $CSLE_HOME/config.json and initializes certain config parameters

Returns

None

csle_common.util.connection_util module

class csle_common.util.connection_util.ConnectionUtil[source]

Bases: object

Class containing utility functions for the connection-related functionality to the emulation

static find_jump_host_connection(ip, s: csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState) csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState[source]

Utility function for finding a jump-host from the set of compromised machines to reach a target IP

Parameters
  • ip – the ip to reach

  • s – the current state

Returns

a connection DTO

static login_service_helper(s: csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState, a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction, alive_check, service_name: str) Tuple[csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState, bool][source]

Helper function for logging in to a network service in the emulation

Parameters
  • s – the current state

  • a – the action of the login

  • alive_check – the function to check whether current connections are alive or not

  • service_name – name of the service to login to

Returns

s_prime, new connection (bool)

static reconnect_ssh(c: csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState) csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState[source]

Reconnects the given SSH connection if it has died for some reason

Parameters

c – the connection to reconnect

Returns

the new connection

static reconnect_telnet(c: csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState, forward_port: int = 9000) csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState[source]

Reconnects the given Telnet connection if it has died for some reason

Parameters
  • c – the connection to reconnect

  • forward_port – the port for port-forwarding

Returns

the new connection

static test_connection(c: csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState) bool[source]

Utility function for testing if a connection is alive or not

Parameters

c – the connection to thest

Returns

True if the connection is alive, otherwise false

csle_common.util.docker_util module

class csle_common.util.docker_util.DockerUtil[source]

Bases: object

Utility class for extracting information from running emulation environments

static get_container_hex_id(name: str) Optional[str][source]

Queries the docker engine for the id of a container with a given name

Returns

the id

static get_docker_gw_bridge_ip(container_id: str) str[source]

Gets the docker gw bridge ip of a container

Parameters

container_id – the id of the container

Returns

the ip in the gw bridge network

static parse_containers(containers: List[docker.models.containers.Container], client2: docker.api.client.APIClient) List[csle_common.dao.docker.docker_container_metadata.DockerContainerMetadata][source]

Queries docker to get a list of running or stopped csle containers

Parameters
  • containers – list of containers to parse

  • client2 – docker client

Returns

List of parsed container DTOs

static parse_running_containers(client_1: docker.client.DockerClient, client_2: docker.api.client.APIClient) List[csle_common.dao.docker.docker_container_metadata.DockerContainerMetadata][source]

Queries docker to get a list of all running containers

Parameters
  • client_1 – docker client 1

  • client_2 – docker client 2

Returns

list of parsed running containers

static parse_running_emulation_envs(emulations: List[str], containers: List[csle_common.dao.docker.docker_container_metadata.DockerContainerMetadata]) List[csle_common.dao.docker.docker_env_metadata.DockerEnvMetadata][source]

Queries docker to get a list of all active emulation environments

Parameters
  • emulations – list of csle emulations

  • containers – list of running csle containers

Returns

list of parsed emulation environments

static parse_runnning_emulation_infos() List[csle_common.dao.docker.docker_env_metadata.DockerEnvMetadata][source]

Queries docker to get a list of all running emulation environments

Returns

a list of environment DTOs

static parse_stopped_containers(client_1: docker.client.DockerClient, client2: docker.api.client.APIClient) List[csle_common.dao.docker.docker_container_metadata.DockerContainerMetadata][source]

Queries docker to get a list of all stopped csle containers

Parameters
  • client_1 – docker client 1

  • client2 – docker client 2

Returns

list of parsed containers

csle_common.util.emulation_util module

class csle_common.util.emulation_util.EmulationUtil[source]

Bases: object

Class containing utility functions for the emulation-middleware

static check_pid(pid: int) bool[source]

Check if a given pid is running on the host

Parameters

pid – the pid to check

Returns

True if it is running otherwise false.

static connect_admin(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, ip: str, create_producer: bool = False) None[source]

Connects the admin agent

Parameters
  • emulation_env_config – the configuration of the emulation to connect to

  • ip – the ip of the container to connect to

  • create_producer – boolean flag whether the producer thread to Kafka should be created also

Returns

None

static disconnect_admin(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig) None[source]

Disconnects the admin agent

Parameters

emulation_env_config – the configuration of the emulation to disconnect the admin of

Returns

None

static execute_cmd_interactive_channel(cmd: str, channel) None[source]

Executes an action on the emulation using an interactive shell (non synchronous) :param a: action to execute :param env_config: environment config :param channel: the channel to use :return: None

static execute_ssh_cmd(cmd: str, conn, wait_for_completion: bool = True) Tuple[bytes, bytes, float][source]

Executes an action on the emulation over a ssh connection, this is a synchronous operation that waits for the completion of the action before returning

Parameters
  • cmd – the command to execute

  • conn – the ssh connection

  • wait_for_completion – boolean flag whether to wait for completion or not

Returns

outdata, errdata, total_time

static execute_ssh_cmds(cmds: List[str], conn, wait_for_completion: bool = True) List[Tuple[bytes, bytes, float]][source]

Executes a list of commands over an ssh connection to the emulation

Parameters
  • cmds – the list of commands

  • conn – the ssh connection

  • wait_for_completion – whether to wait for completion of the commands or not

Returns

List of tuples (outdata, errdata, total_time)

static is_connection_active(conn)[source]

Checks if a given connection is active or not

Parameters

conn – the connection to check

Returns

True if active, otherwise False

Checks if a given attack action is legal in the current state of the environment

Parameters
  • attack_action_id – the id of the action to check

  • env_config – the environment config

  • env_state – the environment state

Returns

True if legal, else false

Checks if a given defense action is legal in the current state of the environment

Parameters
  • defense_action_id – the id of the action to check

  • env_config – the environment config

  • env_state – the environment state

  • attacker_action – the id of the previous attack action

Returns

True if legal, else false

static log_measured_action_time(total_time, action: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction, emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, create_producer: bool = False) None[source]

Logs the measured time of an action to Kafka

Parameters
  • total_time – the total time of executing the action

  • action – the action

  • emulation_env_config – the environment config

  • create_producer – boolean flag whether to create a producer from the csle-admin

Returns

None

static physical_ip_match(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, ip, physical_host_ip: str) bool[source]

Utility method for checking if a container ip matches a physical ip

Parameters
  • emulation_env_config – the emulation config

  • ip – the ip to check

  • physical_host_ip – the physical ip

Returns

True or False

static read_result_interactive(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, channel) str[source]

Reads the result of an action executed in interactive mode

Parameters

emulation_env_config – the emulation environment config

:param the channel :return: the result

static read_result_interactive_channel(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, channel) str[source]

Reads the result of an action executed in interactive mode :param emulation_env_config: the emulation environment config :param channel: the channel to use :return: the result

static setup_custom_connection(user: str, pw: str, source_ip: str, port: int, target_ip: str, proxy_conn, root: bool) Optional[csle_common.dao.emulation_observation.common.emulation_connection_observation_state.EmulationConnectionObservationState][source]

Utility function for setting up a custom SSH connection given credentials and a proxy connection :param user: the username of the new connection :param pw: the pw of the new connection :param source_ip: the ip of the proxy :param port: the port of the new connection :param target_ip: the ip to connect to :param proxy_conn: the proxy connection :param root: whether it is a root connection or not :return: the new connection

static write_remote_file(conn, file_name: str, contents: str, write_mode: str = 'w') None[source]

Utility function for writing contents to a file

Parameters
  • conn – the SSH connection to use for writing

  • file_name – the file name

  • contents – the contents of the file

  • write_mode – the write mode

Returns

None

csle_common.util.env_dynamics_util module

class csle_common.util.env_dynamics_util.EnvDynamicsUtil[source]

Bases: object

Class containing common utilities that are used both in simulation-mode and in emulation-mode.

static cache_attacker_action(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction, s: csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState) None[source]

Utility function for caching an attacker action

Parameters
  • a – the attacker action to cache

  • s – the current state

Returns

None

static cache_defender_action(a: csle_common.dao.emulation_action.defender.emulation_defender_action.EmulationDefenderAction, s: csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState) None[source]

Utility function for caching a defender action

Parameters
  • a – the defender action to cache

  • s – the current state

Returns

None

static check_if_ftp_connection_is_alive(conn) bool[source]

Utilitty function to check whether a FTP connection is alive or not

Parameters

conn – the connection to check

Returns

true or false

static check_if_telnet_connection_is_alive(conn) bool[source]

Utility function to check whether a Telnet connection is alive or not

Parameters

conn – the connection to check

Returns

true or false

static exploit_get_service_name(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction) str[source]

Utility function to get the name of the exploited service of a particular exploit action

Parameters

a – the action

Returns

the name of the service

static exploit_get_vuln_cvss(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction) float[source]

Utility function for getting the CVSS of an exploit action

Parameters

a – the action

Returns

the CVSS

static exploit_get_vuln_name(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction) str[source]

Utiltiy function to get the vulnerability name of a particular exploit action

Parameters

a – the action

Returns

the name of hte corresponding vulnerability

static exploit_tried_flags(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction, m_obs: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState)[source]

Utility function for updating exploit-tried flags

Parameters
  • a – the action

  • m_obs – the observation to update

Returns

the updated observation

static install_tools_tried(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction, m_obs: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState)[source]

Utility function for updating install-tools flag

Parameters
  • a – the action

  • m_obs – the observation to update

Returns

the updated observation

static is_all_flags_collected(s: csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState, emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig) bool[source]

Checks if all flags are collected (then episode is done)

Parameters
  • s – current state

  • emulation_env_config – environment config

Returns

True if all flags are collected otherwise false

static logged_in_ips_str(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, s: csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState) str[source]

Utility function to getting a string-id of the attacker state (Q) of logged in machines

Parameters
  • emulation_env_config – the environment config

  • s – the current state

Returns

the string id

static merge_backdoor_credentials(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation backdoor credentials nodes with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated backdoor credentials nodes

static merge_backdoor_installed(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) Tuple[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, int][source]

Helper function for merging an old machine observation “backdoor installed” flag with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated backdoor_installed flag, num_new_backdoor_installed

static merge_backdoor_tried(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation “backdoor tried” flag with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated backdoor-tried flag

static merge_complete_obs_state(old_obs_state: csle_common.dao.emulation_observation.attacker.emulation_attacker_observation_state.EmulationAttackerObservationState, new_obs_state: csle_common.dao.emulation_observation.attacker.emulation_attacker_observation_state.EmulationAttackerObservationState, emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig) csle_common.dao.emulation_observation.attacker.emulation_attacker_observation_state.EmulationAttackerObservationState[source]

Merges an old observation state with a new one

Parameters
  • old_obs_state – the old observation state

  • new_obs_state – the new observation state

  • emulation_env_config – the emulation environment configuration

Returns

the merged observation state

static merge_connections(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation shell-connections with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated connections

static merge_duplicate_machines(machines: List[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState], action: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction) List[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState][source]

Utility function for merging machines that are duplicates

Parameters
  • machines – list of machines (possibly with duplicates)

  • action – the action that generated the new machines

Returns

the merged set of machines

static merge_exploit_tried(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation tried_brute_flags with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated brute-tried flags

static merge_filesystem_scanned(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, new_root: int) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation file-system-scanned-flag with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

  • new_root – number of new root connections

Returns

the merged machine observation with updated filesystem-scanned flag

static merge_flags(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) Tuple[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, int][source]

Helper function for merging an old machine observation flags with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated flags, number of new flag points

static merge_logged_in(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) Tuple[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, int][source]

Helper function for merging an old machine observation logged in with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated logged-in flag, num_new_login

static merge_new_machine_obs_with_old_machine_obs(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, action: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation with new information collected

Parameters
  • o_m – old machine observation

  • n_m – newly collected machine information

Returns

the merged attacker machine observation state

static merge_new_obs_with_old(old_machines_obs: List[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState], new_machines_obs: List[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState], emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, action: Optional[csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction]) List[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState][source]

Helper function for merging an old network observation with new information collected

Parameters
  • old_machines_obs – the list of old machine observations

  • new_machines_obs – the list of newly collected information

  • emulation_env_config – environment config

  • action – the action

Returns

the merged machine observations

static merge_os(o_os: str, n_os: str) Tuple[str, int][source]

Helper function for merging an old machine observation OS with new information collected

Parameters
  • o_os – the old OS

  • n_os – the newly observed OS

Returns

the merged os, 1 if it was a newly detected OS, otherwise 0

static merge_ports(o_ports: List[csle_common.dao.emulation_observation.common.emulation_port_observation_state.EmulationPortObservationState], n_ports: List[csle_common.dao.emulation_observation.common.emulation_port_observation_state.EmulationPortObservationState], acc: bool = True) Tuple[List[csle_common.dao.emulation_observation.common.emulation_port_observation_state.EmulationPortObservationState], int][source]

Helper function for merging two port lists

Parameters
  • o_ports – old list of ports

  • n_ports – new list of ports

  • acc – if true, accumulate port observations rather than overwrite

Returns

the merged port list, number of new ports found

static merge_reachable(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation reachable nodes with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated reachable nodes

static merge_root(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) Tuple[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, int][source]

Helper function for merging an old machine observation root with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated root flag, 1 if new root otherwise 0

static merge_shell_access(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) Tuple[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, int][source]

Helper function for merging an old machine observation shell access with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with update shell-access parameters, 1 if new access otherwise 0

static merge_tools_installed(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) Tuple[csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, int][source]

Helper function for merging an old machine observation “tools installed flag” with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated tools_installed flag, num_new_tools_installed

static merge_tools_tried(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation “tools tried” flag with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated tools-installed-tried flag

static merge_trace(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation trace with new information collected

Parameters
  • o_os – the old machine observation

  • n_os – the new machine observation

Returns

the merged machine observation with updated trace

static merge_untried_credentials(o_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, n_m: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState, action: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction) csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState[source]

Helper function for merging an old machine observation untried-credentials-flag with new information collected

Parameters
  • o_m – the old machine observation

  • n_m – the new machine observation

  • action – the action that was done to get n_m

Returns

the merged machine observation with updated untried-credentials flag

static merge_vulnerabilities(o_vuln: List[csle_common.dao.emulation_observation.common.emulation_vulnerability_observation_state.EmulationVulnerabilityObservationState], n_vuln: List[csle_common.dao.emulation_observation.common.emulation_vulnerability_observation_state.EmulationVulnerabilityObservationState], acc: bool = True) Tuple[List[csle_common.dao.emulation_observation.common.emulation_vulnerability_observation_state.EmulationVulnerabilityObservationState], int][source]

Helper function for merging two vulnerability lists lists

Parameters
  • o_vuln – old list of vulnerabilities

  • n_vuln – new list of vulnerabilities

  • acc – if true, accumulate vulnerability observations rather than overwrite

Returns

the merged vulnerability list, number of new vulnerabilities detected

static ssh_backdoor_tried_flags(a: csle_common.dao.emulation_action.attacker.emulation_attacker_action.EmulationAttackerAction, m_obs: csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state.EmulationAttackerMachineObservationState)[source]

Utility function for updating install-backdoor flag

Parameters
  • a – the action

  • m_obs – the observation to update

Returns

the updated observation

csle_common.util.experiment_util module

Utility functions for training with the csle environments

class csle_common.util.experiment_util.ExperimentUtil[source]

Bases: object

Class with utility functions related to training

static create_artifact_dirs(output_dir: str, random_seed: int) None[source]

Creates artifact directories if they do not already exist

Parameters
  • output_dir – the base directory

  • random_seed – the random seed of the experiment

Returns

None

static default_containers_folders_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to container folders

static default_emulation_config_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to emulation config file

static default_emulation_picture_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to emulation img file

static default_makefile_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to makefile tempalte

static default_makefile_template_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to makefile tempalte

static default_output_dir() str[source]
Returns

the default output dir

static default_simulation_config_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to simulatio config file

static default_simulation_picture_path(out_dir: Optional[str] = None) str[source]
Parameters

out_dir – directory to write

Returns

the default path to simulatio img file

static get_script_path()[source]
Returns

the script path

static get_subdir(output_dir: str, results_dir: str, subdir: str, seed: int) str[source]

Utility function to construct the subdir string from a given results dir, subdir, and random seed

Parameters
  • output_dir – the base output directory

  • results_dir – the base results dir

  • subdir – the subdirectory (e.g. logs, data, tensorboard, etc)

  • seed – the random seed

Returns

the directory path

static mean_confidence_interval(data, confidence=0.95)[source]

Compute confidence intervals

Parameters
  • data – the data

  • confidence – the interval confidence

Returns

the mean, the lower confidence interval, the upper confidence interval

static parse_args(default_config_path)[source]

Parses the commandline arguments with argparse

Parameters

default_config_path – default path to config file

static read_containers_config(containers_config_path) csle_common.dao.emulation_config.containers_config.ContainersConfig[source]

Reads containers_config of the experiment from a json file

Parameters

containers_config_path – the path to the containers_config file

Returns

the configuration

static read_emulation_env_config(emulation_env_config_path) csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig[source]

Reads emulation env config from a json file

Parameters

emulation_env_config_path – the path to the emulation env config file

Returns

the emulation env configuration

static read_env_picture(env_picture_path) bytes[source]

Reads the environment topology picture from a file

Parameters

env_picture_path – the path to picture

Returns

the emulation env configuration

static read_simulation_env_config(simulation_env_config_path) csle_common.dao.simulation_config.simulation_env_config.SimulationEnvConfig[source]

Reads simulation env config from a json file

Parameters

simulation_env_config_path – the path to the simulation env config file

Returns

the simulation env configuration

static regress_lists(lists: List[List[float]]) List[List[float]][source]

Regress sublists by interpolating their values to fit the length of the first sublist (reference list).

Parameters

lists – the lists ro regress

Returns

the regressed lists

static running_average(x, N)[source]

Function used to compute the running average of the last N elements of a vector x

Parameters
  • x – the vector to take the running average

  • N – the lenght of the running average

Returns

the averaged vector

static running_average_list(x, N)[source]

Function used to compute the running average of the last N elements of a vector x

Parameters
  • x – the vector to take the running average of

  • N – the running average length

Returns

a list with running averages

static set_seed(seed: int) None[source]

Sets the random seed

Parameters

seed – the seed to set

Returns

None

static setup_experiment_logger(name: str, logdir: str, time_str: Optional[str] = None)[source]

Configures the logger for writing log-data of training

Parameters
  • name – name of the logger

  • logdir – directory to save log files

  • time_str – time string for file names

Returns

None

static write_emulation_config_file(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, path: str) None[source]

Writes a config object to a config file

Parameters
  • emulation_env_config – the emulation env config object

  • path – the path to write the file

Returns

None

static write_simulation_config_file(simulation_env_config: csle_common.dao.simulation_config.simulation_env_config.SimulationEnvConfig, path: str) None[source]

Writes a config object to a config file

Parameters
  • simulation_env_config – the simulation env config object

  • path – the path to write the file

Returns

None

csle_common.util.export_util module

class csle_common.util.export_util.ExportUtil[source]

Bases: object

Class with utility functions for exporting data from the metastore

static export_emulation_statistics_to_disk_json(output_dir: str, zip_file_output: str, statistics_id: int, added_by: str = 'unknown') None[source]

Exports emulation statistics from the metastore to disk

Parameters
  • output_dir – the output directory

  • zip_file_output – the compressed zip file path

  • added_by – the person who added the dataset

  • statistics_id – the id of the statistics to fetch

Returns

None

static export_emulation_traces_to_disk_csv(num_traces_per_file: int, output_dir: str, zip_file_output: str, max_num_traces: int, max_time_steps: int, max_nodes: int, max_ports: int, max_vulns: int, null_value: int = - 1, added_by: str = 'unknown') None[source]

Exports emulation traces from the metastore to disk

Parameters
  • num_traces_per_file – the number of traces per file in the output directory

  • output_dir – the output directory

  • zip_file_output – the compressed zip file path

  • max_num_traces – maximum number of traces

  • added_by – the person who added the dataset

  • max_time_steps – the maximum number of time-steps to include in a csv row

  • max_nodes – the maximum number of nodes to include metrics from

  • max_ports – the maximum number of ports to include metrics from

  • max_vulns – the maximum number of vulnerabilities to include metrics from

  • null_value – the default null value if a metric is missing

Returns

None

static export_emulation_traces_to_disk_json(num_traces_per_file: int, output_dir: str, zip_file_output: str, max_num_traces: int, added_by: str = 'unknown', offset: int = 0, file_start_id: int = 1, emulation_traces_ids=None) None[source]

Exports emulation traces from the metastore to disk

Parameters
  • num_traces_per_file – the number of traces per file in the output directory

  • output_dir – the output directory

  • zip_file_output – the compressed zip file path

  • max_num_traces – maximum number of traces

  • added_by – the person who added the dataset

  • offset – the trace id offset

  • file_start_id – the id of the first file to write

  • emulation_traces_ids – the ids of the traces to export

Returns

None

static extract_emulation_statistics_dataset_metadata(dir_path: str, zip_file_path: str) Tuple[int, float, float, str, str, int, int, str, str, int][source]

Extracts metadata of a traces dataset stored on disk

Parameters
  • dir_path – the path to the directory where the traces dataset is stored

  • zip_file_path – the path to the compressed zipfile of the dataset

Returns

num_files, dir_size_uncompressed_gb, size_compressed_gb, file_format, added_by, num_measurements, num_metrics, metrics, conditions, num_conditions

static extract_emulation_traces_dataset_metadata(dir_path: str, zip_file_path: str) Tuple[int, float, float, str, int, str, int, int, str, str][source]

Extracts metadata of a traces dataset stored on disk

Parameters
  • dir_path – the path to the directory where the traces dataset is stored

  • zip_file_path – the path to the compressed zipfile of the dataset

Returns

num_files, dir_size_uncompressed_gb, file_format, num_traces, schema, num_traces_per_file, num_attributes_per_time_step, added_by, columns

static get_dir_size_gb(dir_path: str = '.') float[source]

Utility method to calculate the size of a file directory in gb

Parameters

dir_path – the path to the directory

Returns

the size of the directory in GB

static zipdir(dir_path: str, file_path: str) None[source]

Creates a zip file of a given directory

Parameters
  • dir_path – the path to the directory to zip

  • file_path – the full path of the resulting zip file

Returns

None

csle_common.util.general_util module

class csle_common.util.general_util.GeneralUtil[source]

Bases: object

Class with general utility functions

static get_host_ip() str[source]

Utility method for getting the ip of the host

Returns

the ip of the host

static get_latest_table_id(cur, table_name: str) int[source]

Gets the next ID for a table with a serial column primary key

Parameters
  • cur – the postgres connection cursor

  • table_name – the table name

Returns

the next id

static replace_first_octet_of_ip(ip: str, ip_first_octet: int) str[source]

Utility function for changing the first octet in an IP address

Parameters
  • ip – the IP to modify

  • ip_first_octet – the first octet to insert

Returns

the new IP

static replace_first_octet_of_ip_tuple(tuple_of_ips: Tuple[str, str], ip_first_octet: int) Tuple[str, str][source]

Utility function for changing the first octet in an IP address

Parameters
  • ip – the IP to modify

  • ip_first_octet – the first octet to insert

Returns

the new IP

csle_common.util.grpc_util module

class csle_common.util.grpc_util.GrpcUtil[source]

Bases: object

Class with utility functions for GRPC functionality in CSLE

static grpc_server_on(channel) bool[source]

Utility function to test if a given gRPC channel is working or not

Parameters

channel – the channel to test

Returns

True if working, False if timeout

csle_common.util.import_util module

class csle_common.util.import_util.ImportUtil[source]

Bases: object

Class with utility functions for importing data to the metastore

static import_emulation_statistics_from_disk_json(input_file: str, emulation_name: Optional[str] = None) None[source]

Imports emulation statistics from disk to the metastore

Parameters
  • file (input) – the input file

  • emulation_name – the emulation_name (optional)

Returns

None

static import_emulation_traces_from_disk_json(input_file: str, emulation_name: Optional[str] = None) None[source]

Imports emulation traces from disk to the metastore

Parameters
  • file (input) – the input file

  • emulation_name – the emulation_name (optional)

Returns

None

csle_common.util.management_util module

class csle_common.util.management_util.ManagementUtil[source]

Bases: object

Class with utility functions for management of CSLE

static create_default_management_admin_account() None[source]

Creates the default management admin account

Returns

None

static create_default_management_guest_account() None[source]

Creates the default management guest account

Returns

None

csle_common.util.multiprocessing_util module

class csle_common.util.multiprocessing_util.NestablePool(*args, **kwargs)[source]

Bases: multiprocessing.pool.Pool

Multiprocessing pool that allows nested multiprocessing

class csle_common.util.multiprocessing_util.NoDaemonContext[source]

Bases: multiprocessing.context.ForkContext

Context of a non-daemon process

Process

alias of csle_common.util.multiprocessing_util.NoDaemonProcess

class csle_common.util.multiprocessing_util.NoDaemonProcess(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)[source]

Bases: multiprocessing.context.Process

A process with the daemon property set to false

property daemon: bool

The deamon property

Returns

True if the process is a Deamon, else false

csle_common.util.plotting_util module

class csle_common.util.plotting_util.PlottingUtil[source]

Bases: object

Class with utility functions related to plotting

static mean_confidence_interval(data: numpy.ndarray[Any, numpy.dtype[Any]], confidence=0.95) Tuple[float, float][source]

Compute confidence intervals

Parameters
  • data – the data

  • confidence – the interval confidence

Returns

the mean, the lower confidence interval, the upper confidence interval

static min_max_norm(vec: numpy.ndarray[Any, numpy.dtype[Any]], max_val: float, min_val: float) numpy.ndarray[Any, numpy.dtype[Any]][source]

Min-max normalization of a vector

Parameters
  • vec – the vector to normalize

  • max_val – the maximum value for the normalization

  • min_val – the minimuim value for the normalization

Returns

the normalized vector

static running_average(x: numpy.ndarray[Any, numpy.dtype[Any]], N: int) numpy.ndarray[Any, numpy.dtype[Any]][source]

Function used to compute the running average of the last N elements of a vector x

Parameters
  • x – the vector to average

  • N – the length of the running average

Returns

the averaged vector

static running_average_list(x: numpy.ndarray[Any, numpy.dtype[Any]], N: int) numpy.ndarray[Any, numpy.dtype[Any]][source]

Function used to compute the running average of the last N elements of a vector x

Parameters
  • x – the vector to average

  • N – the length of the running average

Returns

csle_common.util.read_emulation_statistics_util module

class csle_common.util.read_emulation_statistics_util.ReadEmulationStatisticsUtil[source]

Bases: object

Utility class for reading emulation statistics

static average_host_metrics(host_metrics: List[csle_collector.host_manager.dao.host_metrics.HostMetrics]) csle_collector.host_manager.dao.host_metrics.HostMetrics[source]

Computes the average metrics from a list of host metrics

Parameters

host_metrics – the list of host metrics to average

Returns

the computed averages

static average_ossec_metrics(ossec_metrics: List[csle_collector.ossec_ids_manager.dao.ossec_ids_alert_counters.OSSECIdsAlertCounters]) csle_collector.ossec_ids_manager.dao.ossec_ids_alert_counters.OSSECIdsAlertCounters[source]

Computes the average metrics from a list of OSSEC metrics

Parameters

ossec_metrics – the list of OSSEC metrics

Returns

the computed averages

static average_snort_metrics(snort_metrics: List[csle_collector.snort_ids_manager.dao.snort_ids_alert_counters.SnortIdsAlertCounters]) csle_collector.snort_ids_manager.dao.snort_ids_alert_counters.SnortIdsAlertCounters[source]

Computes the average metrics from a list of Snort metrics

Parameters

snort_metrics – the list of Snort metrics

Returns

the computed averages

static average_snort_rule_metrics(snort_metrics: List[csle_collector.snort_ids_manager.dao.snort_ids_rule_counters.SnortIdsRuleCounters]) csle_collector.snort_ids_manager.dao.snort_ids_rule_counters.SnortIdsRuleCounters[source]

Computes the average metrics from a list of Snort rule metrics

Parameters

snort_metrics – the list of Snort rule metrics

Returns

the computed averages

static read_all(emulation_env_config: csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig, logger: logging.Logger, time_window_minutes: int = 100) csle_common.dao.emulation_config.emulation_metrics_time_series.EmulationMetricsTimeSeries[source]

Reads all time series data from the kafka log

Parameters

emulation_env_config – the configuration of the emulation environment

:param time_window_minutes : the length of the time window in minutes to consume (now-time_windows_minutes, now) :return: the collected time series data

csle_common.util.ssh_util module

class csle_common.util.ssh_util.SSHUtil[source]

Bases: object

Class containing utility functions for SSH connections

static execute_ssh_cmd(cmd: str, conn, wait_for_completion: bool = True, retries: int = 2) Tuple[bytes, bytes, float][source]

Executes an action on the emulation over a ssh connection, this is a synchronous operation that waits for the completion of the action before returning

Parameters
  • cmd – the command to execute

  • conn – the ssh connection

  • wait_for_completion – boolean flag whether to wait for completion or not

  • retries – number of retries

Returns

outdata, errdata, total_time

static execute_ssh_cmds(cmds: List[str], conn, wait_for_completion: bool = True) List[Tuple[bytes, bytes, float]][source]

Executes a list of commands over an ssh connection to the emulation

Parameters
  • cmds – the list of commands

  • conn – the ssh connection

  • wait_for_completion – whether to wait for completion of the commands or not

Returns

List of tuples (outdata, errdata, total_time)

Module contents