csle_common.dao.emulation_config package

Submodules

csle_common.dao.emulation_config.beats_config module

class csle_common.dao.emulation_config.beats_config.BeatsConfig(node_beats_configs: List[NodeBeatsConfig], num_elastic_shards: int, reload_enabled: bool)[source]

Bases: JSONSerializable

A DTO object representing the beats configuration of an emulation environment

copy() BeatsConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) BeatsConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) BeatsConfig[source]

Converts a dict representation of the object into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) BeatsConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_node_beats_config_by_ips(ips: List[str]) Optional[NodeBeatsConfig][source]

Gets a node beats config which matches a list of ips

Parameters

ips – the ips

Returns

the node beats config or None

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.client_managers_info module

class csle_common.dao.emulation_config.client_managers_info.ClientManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, client_managers_statuses: List[ClientsDTO], client_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the Client managers for a given emulation execution

static from_dict(d: Dict[str, Any]) ClientManagersInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) ClientManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.client_population_config module

class csle_common.dao.emulation_config.client_population_config.ClientPopulationConfig(ip: str, networks: List[ContainerNetwork], client_manager_port: int, client_manager_log_file: str, client_manager_log_dir: str, client_manager_max_workers: int, clients: List[Client], workflows_config: WorkflowsConfig, client_time_step_len_seconds: int = 1, docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing the configuration of the client population of an emulation

copy() ClientPopulationConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) ClientPopulationConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) ClientPopulationConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) ClientPopulationConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

no_clients() ClientPopulationConfig[source]
Returns

A version of the config with no clients

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.cluster_config module

class csle_common.dao.emulation_config.cluster_config.ClusterConfig(cluster_nodes: List[ClusterNode])[source]

Bases: JSONSerializable

A DTO Class representing the config of a CSLE cluster

copy() ClusterConfig[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) ClusterConfig[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) ClusterConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() ClusterConfig[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.cluster_node module

class csle_common.dao.emulation_config.cluster_node.ClusterNode(ip: str, leader: bool, cpus: int, gpus: int, RAM: int)[source]

Bases: JSONSerializable

A DTO Class representing a node in a CSLE cluster

copy() ClusterNode[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) ClusterNode[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) ClusterNode[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() ClusterNode[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Union[str, bool, int]][source]

Converts the object to a dict representation

Returns

a dict representation of the object
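
The to_dict, from_dict and from_json_file methods listed for ClusterConfig and ClusterNode suggest a plain dict/JSON round trip. The following is a minimal sketch of that pattern, not the csle_common implementation. ClusterNodeSketch and ClusterConfigSketch are hypothetical stand-ins that mirror only the documented constructor signatures. The dict key names, the file name, and the round_trip_via_file helper are assumptions made for illustration.

```python
import json
import os
import tempfile
from dataclasses import asdict, dataclass
from typing import Any, Dict, List


@dataclass
class ClusterNodeSketch:
    """Hypothetical stand-in mirroring the documented ClusterNode constructor."""
    ip: str
    leader: bool
    cpus: int
    gpus: int
    RAM: int

    def to_dict(self) -> Dict[str, Any]:
        # Assumption: the dict keys equal the constructor parameter names
        return asdict(self)

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "ClusterNodeSketch":
        return ClusterNodeSketch(ip=d["ip"], leader=d["leader"], cpus=d["cpus"],
                                 gpus=d["gpus"], RAM=d["RAM"])


@dataclass
class ClusterConfigSketch:
    """Hypothetical stand-in mirroring the documented ClusterConfig constructor."""
    cluster_nodes: List[ClusterNodeSketch]

    def to_dict(self) -> Dict[str, Any]:
        # Nested DTOs are converted recursively
        return {"cluster_nodes": [node.to_dict() for node in self.cluster_nodes]}

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "ClusterConfigSketch":
        nodes = [ClusterNodeSketch.from_dict(n) for n in d["cluster_nodes"]]
        return ClusterConfigSketch(cluster_nodes=nodes)

    @staticmethod
    def from_json_file(json_file_path: str) -> "ClusterConfigSketch":
        # Read the JSON file and delegate to from_dict
        with open(json_file_path, "r") as f:
            return ClusterConfigSketch.from_dict(json.load(f))


def round_trip_via_file(config: ClusterConfigSketch) -> ClusterConfigSketch:
    """Hypothetical helper: writes the config to a temporary file and reads it back."""
    with tempfile.TemporaryDirectory() as tmp_dir:
        path = os.path.join(tmp_dir, "cluster_config.json")
        with open(path, "w") as f:
            json.dump(config.to_dict(), f)
        return ClusterConfigSketch.from_json_file(path)


original = ClusterConfigSketch(cluster_nodes=[
    ClusterNodeSketch(ip="10.0.0.1", leader=True, cpus=8, gpus=0, RAM=16)
])
restored = round_trip_via_file(original)
```

With the real classes, the equivalent calls would be ClusterConfig.from_json_file(path) and config.to_dict(); the key names used by the real to_dict should be checked against the source.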

csle_common.dao.emulation_config.config module

class csle_common.dao.emulation_config.config.Config(management_admin_username_default: str, management_admin_password_default: str, management_admin_first_name_default: str, management_admin_last_name_default: str, management_admin_email_default: str, management_admin_organization_default: str, management_guest_username_default: str, management_guest_password_default: str, management_guest_first_name_default: str, management_guest_last_name_default: str, management_guest_email_default: str, management_guest_organization_default: str, ssh_admin_username: str, ssh_admin_password: str, ssh_agent_username: str, ssh_agent_password: str, metastore_user: str, metastore_password: str, metastore_database_name: str, metastore_ip: str, node_exporter_port: int, grafana_port: int, management_system_port: int, cadvisor_port: int, prometheus_port: int, node_exporter_pid_file: str, pgadmin_port: int, csle_mgmt_webapp_pid_file: str, docker_stats_manager_log_file: str, docker_stats_manager_log_dir: str, docker_stats_manager_port: int, docker_stats_manager_max_workers: int, docker_stats_manager_outfile: str, docker_stats_manager_pidfile: str, prometheus_pid_file: str, prometheus_log_file: str, prometheus_config_file: str, default_log_dir: str, cluster_config: ClusterConfig, node_exporter_log_file: str, allow_registration: bool, grafana_username: str, grafana_password: str, pgadmin_username: str, pgadmin_password: str, postgresql_log_dir: str, nginx_log_dir: str, flask_log_file: str, cluster_manager_log_file: str, version: str, localhost: bool)[source]

Bases: JSONSerializable

A DTO Class representing the CSLE configuration

copy() ClusterConfig[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) Config[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) Config[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static from_param_dict(d2: Dict[str, Any]) Config[source]

Convert a param-dict representation to a DTO representation

Returns

a dto representation of the object

static get_current_config() Optional[Config][source]
Returns

The currently parsed config

static get_std_param_names()[source]
Returns

The names that ought to be included in a config class

static read_config_file() Config[source]

Reads and parses the config file

Returns

the parsed config file

static save_config_file(config: Dict[str, Any]) None[source]

Saves the config file to disk

Returns

None

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

to_param_dict() Dict[str, Any][source]
Returns

a param-dict representation of the object

csle_common.dao.emulation_config.connection_setup_dto module

class csle_common.dao.emulation_config.connection_setup_dto.ConnectionSetupDTO(connected: bool, credentials: List[Credential], target_connections: List[Any], tunnel_threads: List[ForwardTunnelThread], forward_ports: List[int], ports: List[int], interactive_shells: List[Any], non_failed_credentials: List[Credential], proxies: List[EmulationConnectionObservationState], ip: str, total_time: float = 0.0)[source]

Bases: JSONSerializable

DTO class containing information for setting up connections in the emulation

copy() ConnectionSetupDTO[source]
Returns

a copy of the object

static from_dict(d: Dict[str, Any]) ConnectionSetupDTO[source]

Converts a dict representation of the object into an object instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) ConnectionSetupDTO[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.container_network module

class csle_common.dao.emulation_config.container_network.ContainerNetwork(name: str, subnet_mask: str, bitmask: str, subnet_prefix: str, interface: str = 'eth0')[source]

Bases: JSONSerializable

DTO representing an IP network of virtual containers

copy() ContainerNetwork[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) ContainerNetwork[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) ContainerNetwork[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) ContainerNetwork[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() ContainerNetwork[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object
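
Several classes in this package expose create_execution_config(ip_first_octet), described as taking "the first octet of the IP of the new execution". One plausible reading is that addresses in the base config get their first octet replaced to produce the addresses of a concrete execution. The sketch below illustrates only that idea on plain dotted-quad strings. The helper name replace_first_octet and the sample addresses are hypothetical, and the actual transformation applied to ContainerNetwork fields is not specified in this reference.

```python
from typing import List


def replace_first_octet(ip: str, ip_first_octet: int) -> str:
    """Hypothetical helper: returns `ip` with its first octet replaced."""
    if not 0 <= ip_first_octet <= 255:
        raise ValueError(f"invalid octet: {ip_first_octet}")
    octets = ip.split(".")
    if len(octets) != 4:
        raise ValueError(f"not a dotted-quad address: {ip}")
    # Keep the last three octets, swap in the new first octet
    return ".".join([str(ip_first_octet)] + octets[1:])


# Assumed base-config addresses, for illustration only
base_ips: List[str] = ["55.1.2.3", "55.1.2.10"]
execution_ips = [replace_first_octet(ip, 15) for ip in base_ips]
```

Under this reading, two executions of the same emulation with different first octets would get disjoint address ranges.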

csle_common.dao.emulation_config.containers_config module

class csle_common.dao.emulation_config.containers_config.ContainersConfig(containers: List[NodeContainerConfig], agent_ip: str, router_ip: str, networks: List[ContainerNetwork], ids_enabled: bool, vulnerable_nodes=None, agent_reachable_nodes=None)[source]

Bases: JSONSerializable

A DTO representing the configuration of the containers that make up an emulation environment

copy() ContainersConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) ContainersConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the physical servers where the execution will be deployed

Returns

the new config

static from_dict(d: Dict[str, Any]) ContainersConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) ContainersConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_agent_container() Optional[NodeContainerConfig][source]
Returns

the container of the attacker agent

get_agent_reachable_ips() List[str][source]
Returns

list of ips reachable for the attacker agent

get_container_from_full_name(name: str) Optional[NodeContainerConfig][source]

Utility function for getting the container

Parameters

name – the full name of the container

Returns

the container with the given name or None

get_container_from_ip(ip: str) Optional[NodeContainerConfig][source]

Utility function for getting the container

Parameters

ip – the ip of the container

Returns

the container with the given ip or None

get_reachable_ips(container: NodeContainerConfig) List[str][source]

Get list of IP addresses reachable from a given container

Parameters

container – the container to get reachable IPs from

Returns

the list of IP addresses reachable from the container

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object
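
The lookup methods get_container_from_ip and get_container_from_full_name are documented as returning Optional[NodeContainerConfig], so callers need to handle the None case. The following sketch shows that calling pattern with a hypothetical stand-in type. ContainerSketch, its name and ips fields, and the describe helper are assumptions for illustration and do not reflect the real NodeContainerConfig attributes.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ContainerSketch:
    """Hypothetical stand-in for a container config with a name and IPs."""
    name: str
    ips: List[str]


def get_container_from_ip(containers: List[ContainerSketch], ip: str) -> Optional[ContainerSketch]:
    """Returns the first container that owns `ip`, or None if there is no match."""
    for container in containers:
        if ip in container.ips:
            return container
    return None


def describe(containers: List[ContainerSketch], ip: str) -> str:
    """Shows how a caller can branch on the Optional result."""
    container = get_container_from_ip(containers, ip)
    if container is None:
        return f"no container with ip {ip}"
    return f"{ip} belongs to {container.name}"


containers = [
    ContainerSketch(name="router", ips=["15.1.1.10", "15.1.2.10"]),
    ContainerSketch(name="client", ips=["15.1.1.254"]),
]
found = describe(containers, "15.1.2.10")
missing = describe(containers, "15.1.9.9")
```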

csle_common.dao.emulation_config.credential module

class csle_common.dao.emulation_config.credential.Credential(username: str, pw: str, port: int = -1, protocol: Optional[TransportProtocol] = None, service: Optional[str] = None, root: bool = False)[source]

Bases: JSONSerializable

A DTO Class to represent a credential to a service of some component in the infrastructure

copy() Credential[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) Credential[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) Credential[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

num_attributes() int[source]
Returns

The number of attributes of the DTO

static schema() Credential[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.default_network_firewall_config module

class csle_common.dao.emulation_config.default_network_firewall_config.DefaultNetworkFirewallConfig(ip: Optional[str], default_gw: Optional[str], default_input: str, default_output: str, default_forward: str, network: ContainerNetwork)[source]

Bases: JSONSerializable

DTO representing a default firewall configuration

copy() DefaultNetworkFirewallConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) DefaultNetworkFirewallConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) DefaultNetworkFirewallConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) DefaultNetworkFirewallConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() DefaultNetworkFirewallConfig[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.docker_stats_manager_config module

class csle_common.dao.emulation_config.docker_stats_manager_config.DockerStatsManagerConfig(docker_stats_manager_log_file: str, docker_stats_manager_log_dir: str, docker_stats_manager_max_workers: int, time_step_len_seconds: int = 15, docker_stats_manager_port: int = 50046, version: str = '0.0.1')[source]

Bases: JSONSerializable

Represents the configuration of the docker stats managers in a CSLE emulation

copy() DockerStatsManagerConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) DockerStatsManagerConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) DockerStatsManagerConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) DockerStatsManagerConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() DockerStatsManagerConfig[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.docker_stats_managers_info module

class csle_common.dao.emulation_config.docker_stats_managers_info.DockerStatsManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, docker_stats_managers_statuses: List[DockerStatsMonitorDTO], docker_stats_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the docker stats managers for a given emulation execution

static from_dict(d: Dict[str, Any]) DockerStatsManagersInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) DockerStatsManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.elk_config module

class csle_common.dao.emulation_config.elk_config.ElkConfig(container: NodeContainerConfig, resources: NodeResourcesConfig, firewall_config: NodeFirewallConfig, elk_manager_log_file: str, elk_manager_log_dir: str, elk_manager_max_workers: int, elastic_port: int = 9200, kibana_port: int = 5601, logstash_port: int = 5044, time_step_len_seconds: int = 15, elk_manager_port: int = 50045, version: str = '0.0.1')[source]

Bases: JSONSerializable

Represents the configuration of an ELK node in CSLE

copy() ElkConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) ElkConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the physical servers where the execution is deployed

Returns

the new config

static from_dict(d: Dict[str, Any]) ElkConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) ElkConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() ElkConfig[source]
Returns

get the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.elk_managers_info module

class csle_common.dao.emulation_config.elk_managers_info.ELKManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, elk_managers_statuses: List[ElkDTO], elk_managers_running: List[bool], local_kibana_port: int = -1, physical_server_ip='')[source]

Bases: JSONSerializable

DTO containing the status of the ELK managers for a given emulation execution

static from_dict(d: Dict[str, Any]) ELKManagersInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) ELKManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.emulation_env_config module

class csle_common.dao.emulation_config.emulation_env_config.EmulationEnvConfig(name: str, containers_config: ContainersConfig, users_config: UsersConfig, flags_config: FlagsConfig, vuln_config: VulnerabilitiesConfig, topology_config: TopologyConfig, traffic_config: TrafficConfig, resources_config: ResourcesConfig, kafka_config: KafkaConfig, services_config: ServicesConfig, descr: str, static_attacker_sequences: Dict[str, List[EmulationAttackerAction]], ovs_config: OVSConfig, sdn_controller_config: Optional[SDNControllerConfig], host_manager_config: HostManagerConfig, snort_ids_manager_config: SnortIDSManagerConfig, ossec_ids_manager_config: OSSECIDSManagerConfig, docker_stats_manager_config: DockerStatsManagerConfig, elk_config: ElkConfig, beats_config: BeatsConfig, level: int, version: str, execution_id: int, csle_collector_version: str = 'latest', csle_ryu_version: str = 'latest')[source]

Bases: JSONSerializable

Class representing the configuration of an emulation

static check_if_ssh_connection_is_alive(conn: SSHClient) bool[source]

Utility function to check whether an SSH connection is alive

Parameters

conn – the connection to check

Returns

True if the connection is alive, otherwise False

cleanup() None[source]

Cleans up old connections

Returns

None

close_all_connections() None[source]

Closes the emulation connection

Returns

None

connect(ip: str = '', username: str = '', pw: str = '', create_producer: bool = False) SSHClient[source]

Connects to the agent’s host with SSH, either directly or through a jumphost

Parameters
  • ip – the ip to connect to

  • username – the username to connect with

  • pw – the password to connect with

  • create_producer – whether the producer should be created if it has not already been created

Returns

the created conn

copy() EmulationEnvConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) EmulationEnvConfig[source]

Creates an execution config from the base config

Parameters
  • ip_first_octet – the id of the execution

  • physical_servers – the list of physical servers

Returns

the created execution config

create_producer() None[source]

Creates a Kafka producer

Returns

None

static from_dict(d: Dict[str, Any]) EmulationEnvConfig[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) EmulationEnvConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_all_ips() List[str][source]
Returns

a list of all ip addresses in the emulation

get_connection(ip: str) SSHClient[source]

Gets a connection to a given IP address

Parameters

ip – the ip address to get the connection for

Returns

the connection

get_container_from_ip(ip: str) Optional[NodeContainerConfig][source]

Utility function for getting a container with a specific IP

Parameters

ip – the ip of the container

Returns

the container with the given ip or None

get_hacker_connection() SSHClient[source]

Gets an SSH connection to the hacker agent, creates one if it does not exist

Returns

SSH connection to the hacker

get_network_by_name(net_name: str) Optional[ContainerNetwork][source]

Gets the network by name from the config

Parameters

net_name – the name of the network

Returns

the network with the given name

get_port_forward_port() int[source]
Returns

the next port to use for forwarding

ids() bool[source]

Check if the configuration includes an IDS

Returns

True if it includes an IDS, otherwise False

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.emulation_env_state module

class csle_common.dao.emulation_config.emulation_env_state.EmulationEnvState(emulation_env_config: EmulationEnvConfig)[source]

Bases: JSONSerializable

Represents the combined state of the emulation environment, including both the attacker’s and the defender’s states.

cleanup() None[source]

Cleanup

Returns

None

copy() EmulationEnvState[source]
Returns

a copy of the env state

static from_dict(d: Dict[str, Any]) EmulationEnvState[source]

Converts a dict representation of the object into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) EmulationEnvState[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_attacker_machine(ip: str) Optional[EmulationAttackerMachineObservationState][source]

Utility function for extracting the attacker machine from the attacker’s observation

Parameters

ip – the ip of the attacker machine

Returns

the machine if it is found, otherwise None

get_defender_machine(ip: str) Optional[EmulationDefenderMachineObservationState][source]

Utility function for extracting the defender machine from the defender’s observation given an IP

Parameters

ip – the ip of the machine

Returns

the machine if it is found, otherwise None

initialize_defender_machines() None[source]

Initializes the defender observation state based on the emulation configuration

Returns

None

reset() None[source]

Resets the env state. Caches connections

Returns

None

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.emulation_execution module

class csle_common.dao.emulation_config.emulation_execution.EmulationExecution(emulation_name: str, timestamp: float, ip_first_octet: int, emulation_env_config: EmulationEnvConfig, physical_servers: List[str])[source]

Bases: JSONSerializable

A DTO representing an execution of an emulation

static from_dict(d: Dict[str, Any]) EmulationExecution[source]

Converts a dict representation to a DTO

Parameters

d – the dict to convert

Returns

the DTO

static from_json_file(json_file_path: str) EmulationExecution[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.emulation_execution_info module

class csle_common.dao.emulation_config.emulation_execution_info.EmulationExecutionInfo(emulation_name: str, execution_id: int, snort_ids_managers_info: SnortIdsManagersInfo, ossec_ids_managers_info: OSSECIDSManagersInfo, kafka_managers_info: KafkaManagersInfo, host_managers_info: HostManagersInfo, client_managers_info: ClientManagersInfo, docker_stats_managers_info: DockerStatsManagersInfo, running_containers: List[NodeContainerConfig], stopped_containers: List[NodeContainerConfig], traffic_managers_info: TrafficManagersInfo, active_networks: List[ContainerNetwork], inactive_networks: List[ContainerNetwork], elk_managers_info: ELKManagersInfo, ryu_managers_info: Union[None, RyuManagersInfo])[source]

Bases: JSONSerializable

DTO containing the runtime status of an emulation execution

static from_dict(d: Dict[str, Any]) EmulationExecutionInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) EmulationExecutionInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dictionary representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.emulation_metrics_time_series module

class csle_common.dao.emulation_config.emulation_metrics_time_series.EmulationMetricsTimeSeries(client_metrics: List[ClientPopulationMetrics], aggregated_docker_stats: List[DockerStats], docker_host_stats: Dict[str, List[DockerStats]], host_metrics: Dict[str, List[HostMetrics]], aggregated_host_metrics: List[HostMetrics], defender_actions: List[EmulationDefenderAction], attacker_actions: List[EmulationAttackerAction], agg_snort_ids_metrics: List[SnortIdsAlertCounters], emulation_env_config: EmulationEnvConfig, ossec_host_alert_counters: Dict[str, List[OSSECIdsAlertCounters]], aggregated_ossec_host_alert_counters: List[OSSECIdsAlertCounters], openflow_flow_stats: List[FlowStatistic], openflow_port_stats: List[PortStatistic], avg_openflow_flow_stats: List[AvgFlowStatistic], avg_openflow_port_stats: List[AvgPortStatistic], openflow_flow_metrics_per_switch: Dict[str, List[FlowStatistic]], openflow_port_metrics_per_switch: Dict[str, List[PortStatistic]], openflow_flow_avg_metrics_per_switch: Dict[str, List[AvgFlowStatistic]], openflow_port_avg_metrics_per_switch: Dict[str, List[AvgPortStatistic]], agg_openflow_flow_metrics_per_switch: Dict[str, List[AggFlowStatistic]], agg_openflow_flow_stats: List[AggFlowStatistic], snort_ids_ip_metrics: Dict[str, List[SnortIdsIPAlertCounters]], agg_snort_ids_rule_metrics: List[SnortIdsRuleCounters], snort_alert_metrics_per_ids: Dict[str, List[SnortIdsAlertCounters]], snort_rule_metrics_per_ids: Dict[str, List[SnortIdsRuleCounters]])[source]

Bases: JSONSerializable

DTO containing time series data from the emulation

static from_dict(d: Dict[str, Any]) EmulationMetricsTimeSeries[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the converted instance

static from_json_file(json_file_path: str) EmulationMetricsTimeSeries[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.emulation_simulation_trace module

class csle_common.dao.emulation_config.emulation_simulation_trace.EmulationSimulationTrace(emulation_trace: EmulationTrace, simulation_trace: SimulationTrace)[source]

Bases: JSONSerializable

DTO class representing a combined emulation and simulation trace

static from_dict(d: Dict[str, Any]) EmulationSimulationTrace[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) EmulationSimulationTrace[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]
Returns

a dict representation of the DTO

csle_common.dao.emulation_config.emulation_statistics_windowed module

class csle_common.dao.emulation_config.emulation_statistics_windowed.EmulationStatisticsWindowed(window_size: int, emulation_name: str, descr: str)[source]

Bases: JSONSerializable

Windowed emulation statistic. The statistic is updated with the last <window size> samples

add_initial_state(s: EmulationEnvState) None[source]

Adds an initial state sample

Parameters

s – the initial state

Returns

None

add_state_transition(s: EmulationEnvState, s_prime: EmulationEnvState, a1: EmulationDefenderAction, a2: EmulationAttackerAction) None[source]

Adds a state transition sample

Parameters
  • s – the state

  • s_prime – the next state

  • a1 – the defender action

  • a2 – the attacker action

Returns

None

static from_dict(d: Dict[str, Any]) EmulationStatisticsWindowed[source]

Converts a dict representation of the object into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) EmulationStatisticsWindowed[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

update_emulation_statistics() None[source]

Updates the emulation statistic using the current window of samples

Returns

None
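
EmulationStatisticsWindowed is described as a statistic that is updated with the last <window size> samples. A common way to keep such a window is a bounded deque that drops the oldest sample once it is full. The sketch below shows that technique for a simple running mean. It is not the csle_common implementation: WindowedMeanSketch, add_sample, and update are hypothetical names, and the real class tracks emulation states and actions rather than floats.

```python
from collections import deque
from typing import Deque


class WindowedMeanSketch:
    """Hypothetical windowed statistic: the mean of the last `window_size` samples."""

    def __init__(self, window_size: int) -> None:
        if window_size <= 0:
            raise ValueError("window_size must be positive")
        self.window_size = window_size
        # A deque with maxlen discards the oldest element when a new one is appended
        self.samples: Deque[float] = deque(maxlen=window_size)
        self.mean = 0.0

    def add_sample(self, x: float) -> None:
        """Adds a sample to the window without recomputing the statistic."""
        self.samples.append(x)

    def update(self) -> None:
        """Recomputes the statistic from the current window of samples."""
        if self.samples:
            self.mean = sum(self.samples) / len(self.samples)


stat = WindowedMeanSketch(window_size=3)
for x in [1.0, 2.0, 3.0, 10.0]:
    stat.add_sample(x)
stat.update()
# The window now holds the last three samples, so the first one no longer contributes
```

Separating add_sample from update mirrors the documented split between the add_* methods and update_emulation_statistics, so the statistic can be recomputed once after a batch of samples.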

csle_common.dao.emulation_config.emulation_trace module

class csle_common.dao.emulation_config.emulation_trace.EmulationTrace(initial_attacker_observation_state: EmulationAttackerObservationState, initial_defender_observation_state: EmulationDefenderObservationState, emulation_name: str)[source]

Bases: JSONSerializable

DTO class representing a trace in the emulation system

static from_dict(d: Dict[str, Any]) EmulationTrace[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) EmulationTrace[source]

Reads a json file and converts it into a DTO

Parameters

json_file_path – the path of the json file to read

Returns

the converted DTO

static from_json_str(json_str: str) EmulationTrace[source]

Converts a json string into a DTO

Parameters

json_str – the json string representation

Returns

the DTO instance

static load_traces_from_disk(traces_file: str) List[EmulationTrace][source]

Utility function for loading and parsing a list of traces from a json file

Parameters

traces_file – the name of the traces file to load

Returns

a list of the loaded traces

num_attributes_per_time_step() int[source]
Returns

the approximate number of attributes recorded per time-step of the trace

static save_traces_to_disk(traces_save_dir, traces: List[EmulationTrace], traces_file: Optional[str] = None) None[source]

Utility function for saving a list of traces to a json file

Parameters
  • traces_save_dir – the directory where to save the traces

  • traces – the traces to save

  • traces_file – the filename of the traces file

Returns

None
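A save and load round trip of this kind might look roughly like the sketch below. It works on plain dicts rather than EmulationTrace objects, and the helper names, the default file name "traces.json" and the top-level "traces" key are all assumptions made for illustration, not the layout csle_common writes.

```python
import json
import os
import tempfile
from typing import Any, Dict, List, Optional


def save_dicts(save_dir: str, records: List[Dict[str, Any]],
               file_name: Optional[str] = None) -> str:
    # Fall back to a default file name when none is given (assumed default)
    name = file_name if file_name is not None else "traces.json"
    path = os.path.join(save_dir, name)
    with open(path, "w") as f:
        json.dump({"traces": records}, f)
    return path


def load_dicts(path: str) -> List[Dict[str, Any]]:
    with open(path, "r") as f:
        return json.load(f)["traces"]


with tempfile.TemporaryDirectory() as tmp_dir:
    saved_path = save_dicts(tmp_dir, [{"emulation_name": "demo", "steps": 2}])
    loaded = load_dicts(saved_path)
```

Storing all traces under a single key keeps the file a valid JSON object, which leaves room for adding metadata later without changing the loader.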

static schema()[source]
Returns

the schema of the DTO

to_csv_record(max_time_steps: int, max_nodes: int, max_ports: int, max_vulns: int, null_value: int = -1) Tuple[List[Union[str, int, float]], List[str]][source]

Converts the trace into a csv row

Parameters
  • max_time_steps – the maximum number of time-steps to include in the row

  • max_nodes – the maximum number of nodes to include metrics from

  • max_ports – the maximum number of ports to include metrics from

  • max_vulns – the maximum number of vulnerabilities to include metrics from

  • null_value – the default null value if a metric is missing

Returns

a tuple (values, labels) with the values and the column labels of the csv row
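The role of the max_* bounds and null_value can be sketched as fixed-width padding: every row gets the same number of columns, and missing entries are filled with the null value. The function below is a hypothetical, much simplified illustration for a single per-step metric; the column names are invented and the real method covers many more attributes.

```python
from typing import List, Tuple, Union


def to_padded_row(metric_per_step: List[int], max_time_steps: int,
                  null_value: int = -1
                  ) -> Tuple[List[Union[str, int, float]], List[str]]:
    values: List[Union[str, int, float]] = []
    labels: List[str] = []
    for t in range(max_time_steps):
        labels.append(f"metric_t{t}")
        # Use the recorded value if step t exists, otherwise the null value
        if t < len(metric_per_step):
            values.append(metric_per_step[t])
        else:
            values.append(null_value)
    return values, labels


values, labels = to_padded_row([5, 7], max_time_steps=4)
```

Because every trace is padded to the same width, rows from traces of different lengths can be written under one shared CSV header.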

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.flag module

class csle_common.dao.emulation_config.flag.Flag(name: str, dir: str, id: int, path: str, requires_root: bool = False, score: int = 1)[source]

Bases: JSONSerializable

Class that represents a flag in the environment

copy() Flag[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) Flag[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) Flag[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() Flag[source]
Returns

the schema of the DTO

to_dict() Dict[str, Union[str, bool, int]][source]

Converts the object to a dict representation

Returns

a dict representation of the object
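The to_dict and from_dict pair is meant to round-trip an object through a plain dict. A minimal sketch of that pattern is shown below using a stand-in dataclass, FlagSketch, whose fields mirror the Flag constructor parameters. It is assumed, not confirmed, that the real dict uses the same key names.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class FlagSketch:
    """Hypothetical stand-in mirroring the Flag constructor parameters."""
    name: str
    dir: str
    id: int
    path: str
    requires_root: bool = False
    score: int = 1

    def to_dict(self) -> Dict[str, Any]:
        # Key names equal the field names (an assumption about the format)
        return asdict(self)

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "FlagSketch":
        return FlagSketch(**d)


flag = FlagSketch(name="flag1", dir="/tmp/", id=1, path="/tmp/flag1.txt")
restored = FlagSketch.from_dict(flag.to_dict())
```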

csle_common.dao.emulation_config.flags_config module

class csle_common.dao.emulation_config.flags_config.FlagsConfig(node_flag_configs: List[NodeFlagsConfig])[source]

Bases: JSONSerializable

A DTO representing the set of flags in an emulation environment

copy() FlagsConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) FlagsConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config
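One plausible reading of ip_first_octet is that each execution gets its own address range by swapping the first octet of every configured IP. The helper below sketches that idea only; with_first_octet is a hypothetical name and the example addresses are made up, so the actual rewriting logic in csle_common may differ.

```python
def with_first_octet(ip: str, ip_first_octet: int) -> str:
    octets = ip.split(".")
    if len(octets) != 4:
        raise ValueError(f"not a dotted-quad IPv4 address: {ip}")
    # Keep the last three octets and swap in the execution-specific first octet
    return ".".join([str(ip_first_octet)] + octets[1:])


execution_ips = [with_first_octet(ip, 15) for ip in ["55.1.2.10", "55.1.3.21"]]
```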

static from_dict(d: Dict[str, Any]) FlagsConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) FlagsConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_flags_for_ips(ips: List[str]) List[Flag][source]

Get all flags for a list of ip addresses

Parameters

ips – the list of ip addresses to get flags for

Returns

the list of flags
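A lookup of this kind can be sketched as collecting the flags of every node whose IP appears in the query. The data and the function flags_for_ips below are hypothetical; flags are represented by their names only, and how the real method treats unknown IPs is an assumption (they are skipped here).

```python
from typing import Dict, List

# Hypothetical per-node data: node IP -> names of the flags stored on that node
node_flags: Dict[str, List[str]] = {
    "10.0.0.2": ["flag1"],
    "10.0.0.3": ["flag2", "flag3"],
}


def flags_for_ips(ips: List[str]) -> List[str]:
    found: List[str] = []
    for ip in ips:
        # IPs without a configured node contribute no flags
        found.extend(node_flags.get(ip, []))
    return found


result = flags_for_ips(["10.0.0.3", "10.0.0.9"])
```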

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.host_manager_config module

class csle_common.dao.emulation_config.host_manager_config.HostManagerConfig(host_manager_log_file: str, host_manager_log_dir: str, host_manager_max_workers: int, time_step_len_seconds: int = 15, host_manager_port: int = 50049, version: str = '0.0.1')[source]

Bases: JSONSerializable

Represents the configuration of the Host managers in a CSLE emulation

copy() HostManagerConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) HostManagerConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) HostManagerConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) HostManagerConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() HostManagerConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Union[str, int]][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.host_managers_info module

class csle_common.dao.emulation_config.host_managers_info.HostManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, host_managers_statuses: List[HostStatusDTO], host_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the Host managers for a given emulation execution

static from_dict(d: Dict[str, Any]) HostManagersInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) HostManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.kafka_config module

class csle_common.dao.emulation_config.kafka_config.KafkaConfig(container: NodeContainerConfig, resources: NodeResourcesConfig, firewall_config: NodeFirewallConfig, topics: List[KafkaTopic], kafka_manager_log_file: str, kafka_manager_log_dir: str, kafka_manager_max_workers: int, kafka_port: int = 9092, kafka_port_external: int = 9292, time_step_len_seconds: int = 15, kafka_manager_port: int = 50051, version: str = '0.0.1')[source]

Bases: JSONSerializable

Represents the configuration of the Kafka node in a CSLE emulation

copy() KafkaConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) KafkaConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the physical servers of the execution

Returns

the new config

static from_dict(d: Dict[str, Any]) KafkaConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) KafkaConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() KafkaConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.kafka_managers_info module

class csle_common.dao.emulation_config.kafka_managers_info.KafkaManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, kafka_managers_statuses: List[KafkaDTO], kafka_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the Kafka managers for a given emulation execution

static from_dict(d: Dict[str, Any]) KafkaManagersInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) KafkaManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.kafka_topic module

class csle_common.dao.emulation_config.kafka_topic.KafkaTopic(name: str, num_partitions: int, num_replicas: int, attributes: List[str], retention_time_hours: int)[source]

Bases: JSONSerializable

DTO representing a Kafka topic. Records are assumed to be comma-separated strings, and attributes defines the list of columns of each record.

copy() KafkaTopic[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) KafkaTopic[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) KafkaTopic[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() KafkaTopic[source]
Returns

the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object
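Since topic records are assumed to be comma-separated strings whose columns are named by attributes, a consumer could map a record back to named fields roughly as follows. parse_record and the example attribute names are hypothetical and are not part of csle_common.

```python
from typing import Dict, List


def parse_record(record: str, attributes: List[str]) -> Dict[str, str]:
    fields = record.split(",")
    # A record must provide exactly one field per attribute
    if len(fields) != len(attributes):
        raise ValueError("record does not match the topic attributes")
    return dict(zip(attributes, fields))


row = parse_record("1700000000,10.0.0.2,3", ["timestamp", "ip", "num_logins"])
```

All fields come back as strings; converting them to numeric types is left to the consumer in this sketch.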

csle_common.dao.emulation_config.network_service module

class csle_common.dao.emulation_config.network_service.NetworkService(protocol: TransportProtocol, port: int, name: str, credentials: Optional[List[Credential]] = None)[source]

Bases: JSONSerializable

DTO Class representing a service in the network

copy() NetworkService[source]
Returns

a copy of the DTO

static from_credential(credential: Credential) NetworkService[source]

Converts a credential into a network service representation

Parameters

credential – the credential to convert

Returns

the network service representation

static from_dict(d: Dict[str, Any]) NetworkService[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) NetworkService[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static pw_vuln_services()[source]
Returns

a list of all vulnerabilities that involve weak passwords

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_beats_config module

class csle_common.dao.emulation_config.node_beats_config.NodeBeatsConfig(ip: str, log_files_paths: List[str], filebeat_modules: List[str], metricbeat_modules: List[str], heartbeat_hosts_to_monitor: List[str], kafka_input: bool = False, start_filebeat_automatically: bool = False, start_packetbeat_automatically: bool = False, start_metricbeat_automatically: bool = False, start_heartbeat_automatically: bool = False)[source]

Bases: JSONSerializable

A DTO object representing the beats configuration of an individual container in an emulation

copy() NodeBeatsConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeBeatsConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeBeatsConfig[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeBeatsConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_container_config module

class csle_common.dao.emulation_config.node_container_config.NodeContainerConfig(name: str, ips_and_networks: List[Tuple[str, ContainerNetwork]], version: str, level: str, restart_policy: str, suffix: str, os: str, execution_ip_first_octet: int = -1, docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing an individual container in an emulation environment

copy() NodeContainerConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) NodeContainerConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the list of physical servers of the execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeContainerConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeContainerConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_full_name() str[source]
Returns

the full name

get_ips() List[str][source]
Returns

a list of ips that this container has

get_readable_name() str[source]
Returns

the readable name

reachable(reachable_ips: List[str]) bool[source]

Check if container is reachable given a list of reachable ips

Parameters

reachable_ips – the list of reachable ips

Returns

True if the container is reachable, False otherwise
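A reachability check like this can be sketched as a set intersection between the container's own IPs and the reachable IPs. The function is_reachable below is a hypothetical stand-alone version; treating a single matching IP as sufficient is an assumption about the semantics.

```python
from typing import List


def is_reachable(container_ips: List[str], reachable_ips: List[str]) -> bool:
    # Reachable if at least one of the container's IPs is in the reachable list
    return not set(container_ips).isdisjoint(reachable_ips)


multi_homed = ["10.0.1.5", "10.0.2.5"]
reachable_now = is_reachable(multi_homed, ["10.0.2.5", "10.0.2.6"])
unreachable_now = is_reachable(multi_homed, ["10.0.3.1"])
```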

static schema() NodeContainerConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_firewall_config module

class csle_common.dao.emulation_config.node_firewall_config.NodeFirewallConfig(ips_gw_default_policy_networks: List[DefaultNetworkFirewallConfig], hostname: str, output_accept: Set[str], input_accept: Set[str], forward_accept: Set[str], output_drop: Set[str], input_drop: Set[str], forward_drop: Set[str], routes: Set[Tuple[str, str]], docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing a firewall configuration of a container in an emulation environment

copy() NodeFirewallConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeFirewallConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeFirewallConfig[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeFirewallConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_ips()[source]
Returns

list of ip addresses

static schema() NodeFirewallConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_flags_config module

class csle_common.dao.emulation_config.node_flags_config.NodeFlagsConfig(ip: str, flags: List[Flag], docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing the set of flags at a specific container in an emulation environment

copy() NodeFlagsConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeFlagsConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeFlagsConfig[source]

Converts a dict representation of the object to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeFlagsConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_network_config module

Network configuration of a container in the emulation

class csle_common.dao.emulation_config.node_network_config.NodeNetworkConfig(interface: str = 'eth0', limit_packets_queue: int = 30000, packet_delay_ms: float = 0.1, packet_delay_jitter_ms: float = 0.025, packet_delay_correlation_percentage: float = 25, packet_delay_distribution: PacketDelayDistributionType = PacketDelayDistributionType.PARETO, packet_loss_type: PacketLossType = PacketLossType.GEMODEL, packet_loss_rate_random_percentage: float = 2, packet_loss_random_correlation_percentage: float = 25, loss_state_markov_chain_p13: float = 0.1, loss_state_markov_chain_p31: float = 0.1, loss_state_markov_chain_p32: float = 0.1, loss_state_markov_chain_p23: float = 0.1, loss_state_markov_chain_p14: float = 0.1, loss_gemodel_p: float = 0.0001, loss_gemodel_r: float = 0.999, loss_gemodel_h: float = 0.0001, loss_gemodel_k: float = 0.9999, packet_corrupt_percentage: float = 1e-05, packet_corrupt_correlation_percentage: float = 25, packet_duplicate_percentage: float = 1e-05, packet_duplicate_correlation_percentage: float = 25, packet_reorder_percentage: float = 0.0025, packet_reorder_correlation_percentage: float = 25, packet_reorder_gap: int = 5, rate_limit_mbit: float = 100, packet_overhead_bytes: int = 0, cell_overhead_bytes: int = 0)[source]

Bases: JSONSerializable

A DTO object representing the network configuration of a specific container in an emulation environment

copy() NodeNetworkConfig[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) NodeNetworkConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the converted instance

static from_json_file(json_file_path: str) NodeNetworkConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() NodeNetworkConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_resources_config module

class csle_common.dao.emulation_config.node_resources_config.NodeResourcesConfig(container_name: str, num_cpus: int, available_memory_gb: int, ips_and_network_configs: List[Tuple[str, NodeNetworkConfig]], docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing the resources of a specific container in an emulation environment

copy() NodeResourcesConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeResourcesConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeResourcesConfig[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeResourcesConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_ips() List[str][source]
Returns

a list of ips

static schema() NodeResourcesConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_services_config module

class csle_common.dao.emulation_config.node_services_config.NodeServicesConfig(ip: str, services: List[NetworkService])[source]

Bases: JSONSerializable

A DTO object representing the services configuration of a node in an emulation environment

copy() NodeServicesConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeServicesConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeServicesConfig[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) NodeServicesConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_traffic_config module

class csle_common.dao.emulation_config.node_traffic_config.NodeTrafficConfig(ip: str, commands: List[str], traffic_manager_log_file: str, traffic_manager_log_dir: str, traffic_manager_max_workers: int, traffic_manager_port: int = 50043, docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing the traffic configuration of an individual container in an emulation

copy() NodeTrafficConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeTrafficConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeTrafficConfig[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeTrafficConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_users_config module

class csle_common.dao.emulation_config.node_users_config.NodeUsersConfig(ip: str, users: List[User], docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing the users of a container in an emulation environment

copy() NodeUsersConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeUsersConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeUsersConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) NodeUsersConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.node_vulnerability_config module

class csle_common.dao.emulation_config.node_vulnerability_config.NodeVulnerabilityConfig(ip: str, vuln_type: VulnType, name: str, port: Optional[int], protocol: TransportProtocol, credentials: Optional[List[Credential]] = None, cvss: float = 2.0, cve: Optional[str] = None, service: Optional[str] = None, root: bool = False, docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

A DTO object representing a vulnerability of a container in the emulation environment

copy() NodeVulnerabilityConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) NodeVulnerabilityConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) NodeVulnerabilityConfig[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) NodeVulnerabilityConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.ossec_ids_manager_config module

class csle_common.dao.emulation_config.ossec_ids_manager_config.OSSECIDSManagerConfig(ossec_ids_manager_log_file: str, ossec_ids_manager_log_dir: str, ossec_ids_manager_max_workers: int, time_step_len_seconds: int = 15, ossec_ids_manager_port: int = 50047, version: str = '0.0.1')[source]

Bases: JSONSerializable

Represents the configuration of the OSSEC IDS managers in a CSLE emulation

copy() OSSECIDSManagerConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) OSSECIDSManagerConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) OSSECIDSManagerConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) OSSECIDSManagerConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() OSSECIDSManagerConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Union[str, int]][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.ossec_managers_info module

class csle_common.dao.emulation_config.ossec_managers_info.OSSECIDSManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, ossec_ids_managers_statuses: List[OSSECIdsMonitorDTO], ossec_ids_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the OSSEC IDS managers for a given emulation execution

static from_dict(d: Dict[str, Any]) OSSECIDSManagersInfo[source]

Convert a dict representation to a DTO representation

Returns

a dto representation of the object

static from_json_file(json_file_path: str) OSSECIDSManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.ovs_config module

class csle_common.dao.emulation_config.ovs_config.OVSConfig(switch_configs: List[OvsSwitchConfig])[source]

Bases: JSONSerializable

DTO containing the configuration of OVS in an emulation

copy() OVSConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) OVSConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the list of physical servers of the execution

Returns

the new config

static from_dict(d: Dict[str, Any]) OVSConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) OVSConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.ovs_switch_config module

class csle_common.dao.emulation_config.ovs_switch_config.OvsSwitchConfig(container_name: str, ip: str, openflow_protocols: List[str], controller_ip: str, controller_port: int, controller_transport_protocol: str, docker_gw_bridge_ip: str = '', physical_host_ip: str = '')[source]

Bases: JSONSerializable

DTO containing the configuration of an OVS Switch

copy() OvsSwitchConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) OvsSwitchConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the list of physical servers of the execution

Returns

the new config

static from_dict(d: Dict[str, Any]) OvsSwitchConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) OvsSwitchConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.packet_delay_distribution_type module

Type of delay distributions on a container’s network interface

class csle_common.dao.emulation_config.packet_delay_distribution_type.PacketDelayDistributionType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum representing the different types of delay distributions to emulate on a container

NORMAL = 1
PARETO = 2
PARETONORMAL = 3
UNIFORM = 0
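Because the enum derives from IntEnum, its members convert to and from plain integers, which is convenient when a value is stored in a dict or JSON file. The snippet below uses a local copy of the values listed above so that it runs without csle_common installed; only standard enum behaviour is shown.

```python
from enum import IntEnum


class PacketDelayDistributionType(IntEnum):
    # Local copy of the documented values, for illustration only
    UNIFORM = 0
    NORMAL = 1
    PARETO = 2
    PARETONORMAL = 3


from_int = PacketDelayDistributionType(2)          # lookup by integer value
serialized = int(PacketDelayDistributionType.PARETO)  # back to a plain int
by_name = PacketDelayDistributionType["NORMAL"]    # lookup by member name
```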

csle_common.dao.emulation_config.packet_loss_type module

Type of packet losses on a container’s network interface

class csle_common.dao.emulation_config.packet_loss_type.PacketLossType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum representing the different types of packet losses to emulate on a container

GEMODEL = 2
RANDOM = 0
STATE = 1

csle_common.dao.emulation_config.resources_config module

class csle_common.dao.emulation_config.resources_config.ResourcesConfig(node_resources_configurations: List[NodeResourcesConfig])[source]

Bases: JSONSerializable

A DTO representing the resources assigned to the containers in an emulation environment

copy() ResourcesConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) ResourcesConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) ResourcesConfig[source]

Converts a dict representation into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) ResourcesConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.ryu_managers_info module

class csle_common.dao.emulation_config.ryu_managers_info.RyuManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, ryu_managers_statuses: List[RyuDTO], ryu_managers_running: List[bool], local_controller_web_port: int = -1, physical_server_ip: str = '')[source]

Bases: JSONSerializable

DTO containing the status of the Ryu managers for a given emulation execution

static from_dict(d: Dict[str, Any]) RyuManagersInfo[source]

Converts a dict representation to a DTO representation

Returns

a DTO representation of the object

static from_json_file(json_file_path: str) RyuManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.sdn_controller_config module

class csle_common.dao.emulation_config.sdn_controller_config.SDNControllerConfig(container: NodeContainerConfig, resources: NodeResourcesConfig, firewall_config: NodeFirewallConfig, controller_port: int, controller_type: SDNControllerType, controller_module_name: str, controller_web_api_port: int, manager_log_file: str, manager_log_dir: str, manager_max_workers: int, time_step_len_seconds: int = 15, version: str = '0.0.1', manager_port: int = 50042)[source]

Bases: JSONSerializable

DTO containing configuration for the SDN controller

copy() SDNControllerConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int, physical_servers: List[str]) SDNControllerConfig[source]

Creates a new config for an execution

Parameters
  • ip_first_octet – the first octet of the IP of the new execution

  • physical_servers – the physical servers of the execution

Returns

the new config

static from_dict(d: Dict[str, Any]) SDNControllerConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) SDNControllerConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.sdn_controller_type module

Type of SDN controllers in CSLE

class csle_common.dao.emulation_config.sdn_controller_type.SDNControllerType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum representing the different SDN controller types supported in CSLE

RYU = 0

csle_common.dao.emulation_config.services_config module

class csle_common.dao.emulation_config.services_config.ServicesConfig(services_configs: List[NodeServicesConfig])[source]

Bases: JSONSerializable

A DTO object representing the services configuration of an emulation environment

copy() ServicesConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) ServicesConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) ServicesConfig[source]

Converts a dict representation to a DTO representation

Returns

a DTO representation of the object

static from_json_file(json_file_path: str) ServicesConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_services_for_ips(ips: List[str]) List[NetworkService][source]

Gets all services for a list of IP addresses

Parameters

ips – the list of ips

Returns

the list of services

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.snort_ids_manager_config module

class csle_common.dao.emulation_config.snort_ids_manager_config.SnortIDSManagerConfig(snort_ids_manager_log_file: str, snort_ids_manager_log_dir: str, snort_ids_manager_max_workers: int, time_step_len_seconds: int = 15, snort_ids_manager_port: int = 50048, version: str = '0.0.1')[source]

Bases: JSONSerializable

Represents the configuration of the Snort IDS managers in a CSLE emulation

copy() SnortIDSManagerConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) SnortIDSManagerConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) SnortIDSManagerConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) SnortIDSManagerConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static schema() SnortIDSManagerConfig[source]
Returns

the schema of the DTO

to_dict() Dict[str, Union[str, int]][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.snort_managers_info module

class csle_common.dao.emulation_config.snort_managers_info.SnortIdsManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, snort_ids_managers_statuses: List[SnortIdsMonitorDTO], snort_ids_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the snort managers for a given emulation execution

static from_dict(d: Dict[str, Any]) SnortIdsManagersInfo[source]

Converts a dict representation to a DTO representation

Returns

a DTO representation of the object

static from_json_file(json_file_path: str) SnortIdsManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

static get_empty_dto() SnortIdsManagersInfo[source]
Returns

an empty version of the DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.static_emulation_attacker_type module

Type of static attackers in an emulation

class csle_common.dao.emulation_config.static_emulation_attacker_type.StaticEmulationAttackerType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum representing the different static emulation attacker types

EXPERIENCED = 1
EXPERT = 2
NOVICE = 0

csle_common.dao.emulation_config.topology_config module

class csle_common.dao.emulation_config.topology_config.TopologyConfig(node_configs: List[NodeFirewallConfig], subnetwork_masks: List[str])[source]

Bases: JSONSerializable

A DTO representing the topology configuration of an emulation environment

copy() TopologyConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) TopologyConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) TopologyConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) TopologyConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.traffic_config module

class csle_common.dao.emulation_config.traffic_config.TrafficConfig(node_traffic_configs: List[NodeTrafficConfig], client_population_config: ClientPopulationConfig)[source]

Bases: JSONSerializable

A DTO object representing the traffic configuration of an emulation environment

copy() TrafficConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) TrafficConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) TrafficConfig[source]

Converts a dict representation of the object into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) TrafficConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_node_traffic_config_by_ip(ip: str) Optional[NodeTrafficConfig][source]

Gets a node traffic config with a specific IP

Parameters

ip – the ip

Returns

the node traffic config or None
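
Because the return type is Optional[NodeTrafficConfig], callers need to handle the case where no node matches. The sketch below shows that lookup pattern with stand-in types; NodeTrafficSketch, its fields, and find_by_ip are hypothetical and only assume that each node config carries an IP address.

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class NodeTrafficSketch:
    """Stand-in for a per-node traffic config; the fields are assumptions."""
    ip: str
    commands: List[str] = field(default_factory=list)


def find_by_ip(configs: List[NodeTrafficSketch], ip: str) -> Optional[NodeTrafficSketch]:
    """Return the first config whose ip matches, or None if there is none."""
    for config in configs:
        if config.ip == ip:
            return config
    return None


configs = [NodeTrafficSketch("15.9.2.10"), NodeTrafficSketch("15.9.2.11")]
match = find_by_ip(configs, "15.9.2.11")
missing = find_by_ip(configs, "15.9.2.99")

# Check for None explicitly before touching attributes of the result
match_ip = match.ip if match is not None else None
```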

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.traffic_managers_info module

class csle_common.dao.emulation_config.traffic_managers_info.TrafficManagersInfo(ips: List[str], ports: List[int], emulation_name: str, execution_id: int, traffic_managers_statuses: List[TrafficDTO], traffic_managers_running: List[bool])[source]

Bases: JSONSerializable

DTO containing the status of the Traffic managers for a given emulation execution

static from_dict(d: Dict[str, Any]) TrafficManagersInfo[source]

Converts a dict representation to a DTO representation

Returns

a DTO representation of the object

static from_json_file(json_file_path: str) TrafficManagersInfo[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.transport_protocol module

Transport protocols in CSLE

class csle_common.dao.emulation_config.transport_protocol.TransportProtocol(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum representing the different transport protocols in the network.

TCP = 0
UDP = 1

csle_common.dao.emulation_config.user module

class csle_common.dao.emulation_config.user.User(username: str, pw: str, root: bool)[source]

Bases: JSONSerializable

DTO class representing a user in an emulation

copy() User[source]
Returns

a copy of the DTO

static from_dict(d: Dict[str, Any]) User[source]

Converts a dict representation of the object into an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) User[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

to_dict() Dict[str, Union[str, bool]][source]

Converts the object to a dict representation

Returns

a dict representation of the object
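
The to_dict, from_dict and from_json_file methods together form a serialization round trip. The sketch below illustrates that round trip with a stand-in class, UserSketch, so that it runs without csle_common installed. The dict keys are assumed to mirror the constructor parameters (username, pw, root); the real class may use different keys.

```python
import json
import os
import tempfile
from dataclasses import dataclass
from typing import Dict, Union


@dataclass
class UserSketch:
    """Stand-in for the documented User DTO; dict keys are an assumption."""
    username: str
    pw: str
    root: bool

    def to_dict(self) -> Dict[str, Union[str, bool]]:
        return {"username": self.username, "pw": self.pw, "root": self.root}

    @staticmethod
    def from_dict(d: Dict[str, Union[str, bool]]) -> "UserSketch":
        return UserSketch(username=str(d["username"]), pw=str(d["pw"]), root=bool(d["root"]))

    @staticmethod
    def from_json_file(json_file_path: str) -> "UserSketch":
        with open(json_file_path, "r") as f:
            return UserSketch.from_dict(json.load(f))


original = UserSketch(username="admin", pw="example-pw", root=True)

# Write the dict form to a temporary JSON file and read it back
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "user.json")
    with open(path, "w") as f:
        json.dump(original.to_dict(), f)
    restored = UserSketch.from_json_file(path)
```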

csle_common.dao.emulation_config.users_config module

class csle_common.dao.emulation_config.users_config.UsersConfig(users_configs: List[NodeUsersConfig])[source]

Bases: JSONSerializable

A DTO object representing the users configuration of an emulation environment

copy() UsersConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) UsersConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) UsersConfig[source]

Converts a dict representation to an instance

Parameters

d – the dict to convert

Returns

the created instance

static from_json_file(json_file_path: str) UsersConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_root_usernames(ips: List[str]) List[str][source]

Gets the root usernames for a list of ips

Parameters

ips – the list of ips to get the root usernames for

Returns

the list of root usernames
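
As an illustration of what such a lookup might involve, the sketch below filters per-node user lists by IP and by the root flag documented on User. The stand-in types (UserSketch, NodeUsersSketch) and the assumption that each node config holds one ip and a list of users are hypothetical; the actual NodeUsersConfig layout is not described in this section.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class UserSketch:
    username: str
    pw: str
    root: bool


@dataclass
class NodeUsersSketch:
    """Stand-in for a per-node users config; the layout is an assumption."""
    ip: str
    users: List[UserSketch]


def root_usernames(node_configs: List[NodeUsersSketch], ips: List[str]) -> List[str]:
    """Collect usernames of root users on nodes whose ip is in `ips`."""
    wanted = set(ips)
    return [
        user.username
        for node in node_configs
        if node.ip in wanted
        for user in node.users
        if user.root
    ]


nodes = [
    NodeUsersSketch("15.9.2.10", [UserSketch("admin", "pw1", True), UserSketch("guest", "pw2", False)]),
    NodeUsersSketch("15.9.2.11", [UserSketch("ops", "pw3", True)]),
]
roots = root_usernames(nodes, ["15.9.2.10"])
```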

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.vulnerabilities_config module

class csle_common.dao.emulation_config.vulnerabilities_config.VulnerabilitiesConfig(node_vulnerability_configs: List[NodeVulnerabilityConfig])[source]

Bases: JSONSerializable

A DTO class representing the vulnerabilities configuration of an emulation environment

copy() VulnerabilitiesConfig[source]
Returns

a copy of the DTO

create_execution_config(ip_first_octet: int) VulnerabilitiesConfig[source]

Creates a new config for an execution

Parameters

ip_first_octet – the first octet of the IP of the new execution

Returns

the new config

static from_dict(d: Dict[str, Any]) VulnerabilitiesConfig[source]

Converts a dict representation of the object to a DTO representation

Returns

a DTO representation of the object

static from_json_file(json_file_path: str) VulnerabilitiesConfig[source]

Reads a json file and converts it to a DTO

Parameters

json_file_path – the json file path

Returns

the converted DTO

get_vulnerabilities(ips: List[str]) List[NodeVulnerabilityConfig][source]

Gets a list of vulnerabilities for a list of ip addresses

Parameters

ips – the list of ip addresses

Returns

the list of vulnerabilities corresponding to the list of ip addresses

to_dict() Dict[str, Any][source]

Converts the object to a dict representation

Returns

a dict representation of the object

csle_common.dao.emulation_config.vulnerability_type module

Type of vulnerabilities in an emulation

class csle_common.dao.emulation_config.vulnerability_type.VulnType(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]

Bases: IntEnum

Enum representing the different vulnerability types for generated containers

PRIVILEGE_ESCALATION = 3
RCE = 1
SQL_INJECTION = 2
WEAK_PW = 0
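
Since the enum is an IntEnum, a member can be stored as its integer value in a dict or JSON document and restored by calling the enum with that value. The sketch below shows this with a local mirror of the documented members; VulnTypeSketch and the "vuln_type" key are hypothetical and not taken from csle_common.

```python
import json
from enum import IntEnum


class VulnTypeSketch(IntEnum):
    """Local mirror of the documented members; not the csle_common class."""
    WEAK_PW = 0
    RCE = 1
    SQL_INJECTION = 2
    PRIVILEGE_ESCALATION = 3


# Serialize the integer value explicitly, then restore the member from it
encoded = json.dumps({"vuln_type": VulnTypeSketch.RCE.value})
decoded = VulnTypeSketch(json.loads(encoded)["vuln_type"])
```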

Module contents