Source code for csle_attacker.emulation.post_exploit_middleware

from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
    import EmulationAttackerMachineObservationState
from csle_attacker.emulation.util.shell_util import ShellUtil


class PostExploitMiddleware:
    """
    Class that implements functionality for executing post-exploit actions on the emulation
    """

    @staticmethod
    def execute_service_login(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Executes a service login on the emulation using previously found credentials

        The login itself is delegated to ShellUtil.execute_service_login_helper; this method
        only re-sorts the attacker and defender machine lists of the resulting state.

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        s_prime = ShellUtil.execute_service_login_helper(s=s, a=a)
        s_prime.attacker_obs_state.sort_machines()
        s_prime.defender_obs_state.sort_machines()
        return s_prime

    @staticmethod
    def execute_bash_find_flag(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Searches the file system for all servers where the agent is currently logged in to find flags

        For every logged-in machine the search is attempted over SSH, then telnet, then FTP.
        The attempts stop at the first helper whose third return value is truthy (named
        ``*_root`` by the original code; presumably it signals that the search ran as
        root -- confirm against ShellUtil). The cost value returned by each helper is not
        used here.

        Note that ``s`` is updated in place: the returned state is the same object as ``s``.

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        # Ordered list of search helpers; the order defines the fallback chain.
        flag_finders = (
            ShellUtil._find_flag_using_ssh,
            ShellUtil._find_flag_using_telnet,
            ShellUtil._find_flag_using_ftp,
        )

        new_machines_obs = []
        for machine in s.attacker_obs_state.machines:
            if not machine.logged_in:
                continue

            new_m_obs = EmulationAttackerMachineObservationState(ips=machine.ips)
            for find_flag in flag_finders:
                new_m_obs, _cost, root_scan = find_flag(
                    machine=machine, emulation_env_config=s.emulation_env_config, a=a, new_m_obs=new_m_obs)
                new_m_obs.filesystem_searched = True
                if root_scan:
                    # No need to fall back to the remaining connection types
                    break
            new_machines_obs.append(new_m_obs)

        attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
            s.attacker_obs_state.machines, new_machines_obs,
            emulation_env_config=s.emulation_env_config, action=a)

        # The input state is reused (not copied) as the successor state
        s_prime = s
        s_prime.attacker_obs_state.machines = attacker_machine_observations
        s_prime.attacker_obs_state.sort_machines()
        s_prime.defender_obs_state.sort_machines()
        return s_prime

    @staticmethod
    def execute_install_tools(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Uses compromised machines with root access to install tools

        The installation is delegated to ShellUtil.install_tools_helper; this method
        only re-sorts the attacker and defender machine lists of the resulting state.

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        s_prime = ShellUtil.install_tools_helper(s=s, a=a)
        s_prime.attacker_obs_state.sort_machines()
        s_prime.defender_obs_state.sort_machines()
        return s_prime

    @staticmethod
    def execute_ssh_backdoor(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Uses compromised machines with root access to setup SSH backdoor

        The backdoor setup is delegated to ShellUtil.execute_ssh_backdoor_helper; this method
        only re-sorts the attacker and defender machine lists of the resulting state.

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        s_prime = ShellUtil.execute_ssh_backdoor_helper(s=s, a=a)
        s_prime.attacker_obs_state.sort_machines()
        s_prime.defender_obs_state.sort_machines()
        return s_prime