Source code for csle_system_identification.environment_evaluations.stopping_game_emulation_eval

from typing import List
from csle_common.dao.emulation_config.emulation_trace import EmulationTrace
from csle_common.dao.simulation_config.simulation_trace import SimulationTrace
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.training.policy import Policy
from csle_common.dao.emulation_config.emulation_simulation_trace import EmulationSimulationTrace
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.defender.emulation_defender_stopping_actions import (
    EmulationDefenderStoppingActions)
from csle_common.dao.emulation_action.attacker.emulation_attacker_stopping_actions import (
    EmulationAttackerStoppingActions)
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.logging.log import Logger
from gym_csle_stopping_game.envs.stopping_game_env import StoppingGameEnv
from csle_system_identification.emulator import Emulator
from gym_csle_stopping_game.util.stopping_game_util import StoppingGameUtil


[docs]class StoppingGameEmulationEval: """ Utility class for performing emulation evaluations of the stopping game """
[docs] @staticmethod def emulation_evaluation(env: StoppingGameEnv, n_episodes: int, intrusion_seq: List[EmulationAttackerAction], defender_policy: Policy, attacker_policy: Policy, emulation_env_config: EmulationEnvConfig, simulation_env_config: SimulationEnvConfig) -> List[EmulationSimulationTrace]: """ Utility function for evaluating a strategy profile in the emulation environment :param env: the environment to use for evaluation :param n_episodes: the number of evaluation episodes :param intrusion_seq: the intrusion sequence for the evaluation (sequence of attacker actions) :param defender_policy: the defender policy for the evaluation :param attacker_policy: the attacker policy for the evaluation :param emulation_env_config: configuration of the emulation environment for the evaluation :param simulation_env_config: configuration of the simulation environment for the evaluation :return: traces with the evaluation results """ logger = Logger.__call__().get_logger() traces = [] s = EmulationEnvState(emulation_env_config=emulation_env_config) s.initialize_defender_machines() for i in range(n_episodes): done = False defender_obs_space = simulation_env_config.joint_observation_space_config.observation_spaces[0] b = env.state.b1 o, _ = env.reset() (d_obs, a_obs) = o t = 0 s.reset() emulation_trace = EmulationTrace(initial_attacker_observation_state=s.attacker_obs_state, initial_defender_observation_state=s.defender_obs_state, emulation_name=emulation_env_config.name) simulation_trace = SimulationTrace(simulation_env=env.config.env_name) while not done: a1 = defender_policy.action(d_obs) a2 = attacker_policy.action(a_obs) o, r, done, info, _ = env.step((a1, a2)) (d_obs, a_obs) = o r_1, r_2 = r logger.debug(f"a1:{a1}, a2:{a2}, d_obs:{d_obs}, a_obs:{a_obs}, r:{r}, done:{done}, info: {info}") if a1 == 0: defender_action = EmulationDefenderStoppingActions.CONTINUE(index=-1) else: defender_action = EmulationDefenderStoppingActions.CONTINUE(index=-1) if env.state.s == 1: if t >= len(intrusion_seq): t = 0 attacker_action = intrusion_seq[t] else: attacker_action = EmulationAttackerStoppingActions.CONTINUE(index=-1) emulation_trace, s = Emulator.run_actions( s=s, emulation_env_config=emulation_env_config, attacker_action=attacker_action, defender_action=defender_action, trace=emulation_trace, sleep_time=emulation_env_config.kafka_config.time_step_len_seconds) o_components = [s.defender_obs_state.snort_ids_alert_counters.severe_alerts, s.defender_obs_state.snort_ids_alert_counters.warning_alerts, s.defender_obs_state.aggregated_host_metrics.num_failed_login_attempts] o_components_str = ",".join(list(map(lambda x: str(x), o_components))) logger.debug(f"o_components:{o_components}") logger.debug(f"observation_id_to_observation_vector_inv:" f"{defender_obs_space.observation_id_to_observation_vector_inv}") logger.debug(f"observation_id_to_observation_vector_inv:" f"{o_components_str in defender_obs_space.observation_id_to_observation_vector_inv}") emulation_o = 0 if o_components_str in defender_obs_space.observation_id_to_observation_vector_inv: emulation_o = defender_obs_space.observation_id_to_observation_vector_inv[o_components_str] logger.debug(f"o:{emulation_o}") b = StoppingGameUtil.next_belief(o=emulation_o, a1=a1, b=b, pi2=a2, config=env.config, l=env.state.l, a2=a2) d_obs[1] = b[1] a_obs[1] = b[1] logger.debug(f"b:{b}") simulation_trace.defender_rewards.append(r_1) simulation_trace.attacker_rewards.append(r_2) simulation_trace.attacker_actions.append(a2) simulation_trace.defender_actions.append(a1) simulation_trace.infos.append(info) simulation_trace.states.append(s) simulation_trace.beliefs.append(b[1]) simulation_trace.infrastructure_metrics.append(emulation_o) em_sim_trace = EmulationSimulationTrace(emulation_trace=emulation_trace, simulation_trace=simulation_trace) MetastoreFacade.save_emulation_simulation_trace(em_sim_trace) traces.append(em_sim_trace) return traces