from typing import Union, Dict, Any
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_observation.attacker.emulation_attacker_observation_state \
import EmulationAttackerObservationState
from csle_common.dao.emulation_observation.defender.emulation_defender_observation_state \
import EmulationDefenderObservationState
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
import EmulationAttackerMachineObservationState
from csle_common.dao.emulation_observation.defender.emulation_defender_machine_observation_state \
import EmulationDefenderMachineObservationState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_config import EmulationAttackerActionConfig
from csle_common.dao.emulation_action.defender.emulation_defender_action_config import EmulationDefenderActionConfig
from csle_common.dao.emulation_observation.common.emulation_connection_observation_state import \
EmulationConnectionObservationState
from csle_common.dao.emulation_config.credential import Credential
from csle_base.json_serializable import JSONSerializable
from csle_collector.client_manager.client_population_metrics import ClientPopulationMetrics
from csle_collector.docker_stats_manager.dao.docker_stats import DockerStats
from csle_collector.snort_ids_manager.dao.snort_ids_alert_counters import SnortIdsAlertCounters
from csle_collector.snort_ids_manager.dao.snort_ids_rule_counters import SnortIdsRuleCounters
from csle_collector.ossec_ids_manager.dao.ossec_ids_alert_counters import OSSECIdsAlertCounters
from csle_collector.host_manager.dao.host_metrics import HostMetrics
[docs]class EmulationEnvState(JSONSerializable):
"""
Represents the combined state of the emulation environment,
including both the attacker's and the defender's states.
"""
def __init__(self, emulation_env_config: EmulationEnvConfig):
"""
Initializes the state
:param emulation_env_config: the environment configuration
:param attacker_action_config: the configuration of the attacker agent
:param defender_agent_config: the configuration of the defender agent
"""
self.emulation_env_config = emulation_env_config
self.attacker_action_config = EmulationAttackerActionConfig.all_actions_config(
num_nodes=len(self.emulation_env_config.containers_config.containers),
subnet_masks=self.emulation_env_config.topology_config.subnetwork_masks,
hacker_ip=self.emulation_env_config.containers_config.agent_ip)
self.defender_action_config = EmulationDefenderActionConfig.all_actions_config(
num_nodes=len(self.emulation_env_config.containers_config.containers),
subnet_masks=self.emulation_env_config.topology_config.subnetwork_masks)
self.vuln_lookup = constants.VULNERABILITIES.vuln_lookup
self.vuln_lookup_inv = {v: k for k, v in self.vuln_lookup.items()}
self.service_lookup = constants.SERVICES.service_lookup
self.service_lookup_inv = {v: k for k, v in self.service_lookup.items()}
self.os_lookup = constants.OS.os_lookup
self.os_lookup_inv = {v: k for k, v in self.os_lookup.items()}
self.attacker_obs_state: Union[EmulationAttackerObservationState, None] = None
self.defender_obs_state: Union[EmulationDefenderObservationState, None] = None
self.attacker_cached_ssh_connections: Dict[Any, Any] = {}
self.attacker_cached_telnet_connections: Dict[Any, EmulationConnectionObservationState] = {}
self.attacker_cached_ftp_connections: Dict[Any, EmulationConnectionObservationState] = {}
self.defender_cached_ssh_connections: Dict[Any, EmulationConnectionObservationState] = {}
self.attacker_cached_backdoor_credentials: Dict[Any, Credential] = {}
self.reset()
[docs] def initialize_defender_machines(self) -> None:
"""
Initializes the defender observation state based on the emulation configuration
:return: None
"""
defender_machines = []
for c in self.emulation_env_config.containers_config.containers:
defender_machines.append(EmulationDefenderMachineObservationState.from_container(
c, kafka_config=self.emulation_env_config.kafka_config))
if self.defender_obs_state is None:
raise ValueError("EmulationDefenderObservationState is None")
self.defender_obs_state.machines = defender_machines
self.defender_obs_state.start_monitoring_threads()
[docs] def reset(self) -> None:
"""
Resets the env state. Caches connections
:return: None
"""
agent_reachable = set()
if self.attacker_obs_state is not None:
agent_reachable = self.attacker_obs_state.agent_reachable
for m in self.attacker_obs_state.machines:
for c in m.ssh_connections:
self.attacker_cached_ssh_connections[(c.ip, c.credential.username, c.port)] = c
for c in m.telnet_connections:
self.attacker_cached_telnet_connections[(c.ip, c.credential.username, c.port)] = c
for c in m.ftp_connections:
self.attacker_cached_ftp_connections[(c.ip, c.credential.username, c.port)] = c
for cr in m.backdoor_credentials:
for ip in m.ips:
self.attacker_cached_backdoor_credentials[(ip, cr.username, cr.pw)] = cr
self.attacker_obs_state = EmulationAttackerObservationState(catched_flags=0, agent_reachable=agent_reachable)
self.attacker_obs_state.last_attacker_action = None
self.attacker_obs_state.undetected_intrusions_steps = 0
self.attacker_obs_state.all_flags = False
self.attacker_obs_state.catched_flags = 0
self.attacker_obs_state.step = 1
if self.defender_obs_state is not None:
for m in self.defender_obs_state.machines:
for c in m.ssh_connections:
self.defender_cached_ssh_connections[(c.ip, c.credential.username, c.port)] = c
else:
self.defender_obs_state = EmulationDefenderObservationState(
kafka_config=self.emulation_env_config.kafka_config,
client_population_metrics=ClientPopulationMetrics(),
docker_stats=DockerStats(), snort_ids_alert_counters=SnortIdsAlertCounters(),
snort_ids_rule_counters=SnortIdsRuleCounters(), ossec_ids_alert_counters=OSSECIdsAlertCounters(),
aggregated_host_metrics=HostMetrics(), defender_actions=[], attacker_actions=[])
[docs] def cleanup(self) -> None:
"""
Cleanup
:return: None
"""
for _, c in self.attacker_cached_ssh_connections.items():
c.cleanup()
for _, c in self.attacker_cached_ftp_connections.items():
c.cleanup()
for _, c in self.attacker_cached_telnet_connections.items():
c.cleanup()
if self.attacker_obs_state is None:
raise ValueError("EmualtionAttackerObservationState is None")
self.attacker_obs_state.cleanup()
for _, c in self.defender_cached_ssh_connections.items():
(ssh_conns, _) = c
for c2 in ssh_conns:
c2.cleanup()
if self.defender_obs_state is None:
raise ValueError("EmulationDefenderObservationState is None")
self.defender_obs_state.cleanup()
[docs] def get_attacker_machine(self, ip: str) -> Union[EmulationAttackerMachineObservationState, None]:
"""
Utility function for extracting the attacker machine from the attacker's observation
:param ip: the ip of the attacker machine
:return: the machine if is found, otherwise None
"""
if self.attacker_obs_state is None:
raise ValueError("EmulationAttackerObservationState is None")
for m in self.attacker_obs_state.machines:
for m_ip in m.ips:
if m_ip == ip:
return m
return None
[docs] def get_defender_machine(self, ip: str) -> Union[EmulationDefenderMachineObservationState, None]:
"""
Utility function for extracting the defender machine from the defender's observation given an IP
:param ip: the ip of the machine
:return: the machine if found otherwise None
"""
if self.defender_obs_state is None:
raise ValueError("EmulationDefenderObservationState is None")
for m in self.defender_obs_state.machines:
for m_ip in m.ips:
if m_ip == ip:
return m
return None
[docs] def copy(self) -> "EmulationEnvState":
"""
:return: a copy of the env state
"""
copy = EmulationEnvState(emulation_env_config=self.emulation_env_config)
if self.attacker_obs_state is None:
raise ValueError("EmulationAttackerObservationState is None")
if self.defender_obs_state is None:
raise ValueError("EmulationDefenderObservationState is None")
copy.attacker_obs_state = self.attacker_obs_state.copy()
copy.defender_obs_state = self.defender_obs_state.copy()
return copy
def __str__(self):
"""
:return: a string representation of the object
"""
return f"Attacker observation state: {self.attacker_obs_state}" \
f"Defender observation state: {self.defender_obs_state}"
[docs] @staticmethod
def from_dict(d: Dict[str, Any]) -> "EmulationEnvState":
"""
Converts a dict representation of the object into a an instance
:param d: the dict to convert
:return: the created instance
"""
obj = EmulationEnvState(emulation_env_config=EmulationEnvConfig.from_dict(d["emulation_env_config"]))
obj.attacker_action_config = EmulationAttackerActionConfig.from_dict(d["attacker_action_config"])
obj.defender_action_config = EmulationDefenderActionConfig.from_dict(d["defender_action_config"])
return obj
[docs] def to_dict(self) -> Dict[str, Any]:
"""
Converts the object to a dict representation
:return: a dict representation of the object
"""
d = {}
d["emulation_env_config"] = self.emulation_env_config.to_dict()
d["attacker_action_config"] = self.attacker_action_config.to_dict()
d["defender_action_config"] = self.defender_action_config.to_dict()
return d
[docs] @staticmethod
def from_json_file(json_file_path: str) -> "EmulationEnvState":
"""
Reads a json file and converts it to a DTO
:param json_file_path: the json file path
:return: the converted DTO
"""
import io
import json
with io.open(json_file_path, 'r') as f:
json_str = f.read()
return EmulationEnvState.from_dict(json.loads(json_str))