from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
import EmulationAttackerMachineObservationState
from csle_attacker.emulation.util.shell_util import ShellUtil
[docs]class PostExploitMiddleware:
"""
Class that implements functionality for executing post-exploits actions on the emulation
"""
[docs] @staticmethod
def execute_service_login(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Executes a service login on the emulation using previously found credentials
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ShellUtil.execute_service_login_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_bash_find_flag(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Searches the file system for all servers where the agent is currently logged in to find flags
:param s: the current state
:param a: the action to take
:return: s_prime, reward, done
"""
new_machines_obs = []
total_cost = 0.0
for machine in s.attacker_obs_state.machines:
if machine.logged_in:
new_m_obs = EmulationAttackerMachineObservationState(ips=machine.ips)
root_scan = False
# Start with ssh connections
new_m_obs, ssh_cost, ssh_root = ShellUtil._find_flag_using_ssh(
machine=machine, emulation_env_config=s.emulation_env_config, a=a, new_m_obs=new_m_obs)
new_m_obs.filesystem_searched = True
total_cost += ssh_cost
if ssh_root:
root_scan = True
if root_scan:
new_machines_obs.append(new_m_obs)
continue
# If root scan not tried, try telnet connections
new_m_obs, telnet_cost, telnet_root = ShellUtil._find_flag_using_telnet(
machine=machine, emulation_env_config=s.emulation_env_config, a=a, new_m_obs=new_m_obs)
new_m_obs.filesystem_searched = True
total_cost += telnet_cost
if telnet_root:
root_scan = True
if root_scan:
new_machines_obs.append(new_m_obs)
continue
# If root scan not tried, try ftp connections
new_m_obs, ftp_cost, ftp_root = ShellUtil._find_flag_using_ftp(
machine=machine, emulation_env_config=s.emulation_env_config, a=a, new_m_obs=new_m_obs)
new_m_obs.filesystem_searched = True
total_cost += ftp_cost
if ftp_root:
root_scan = True
new_machines_obs.append(new_m_obs)
attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
s.attacker_obs_state.machines, new_machines_obs, emulation_env_config=s.emulation_env_config, action=a)
s_prime = s
s_prime.attacker_obs_state.machines = attacker_machine_observations
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_install_tools(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Uses compromised machines with root access to install tools
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ShellUtil.install_tools_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_ssh_backdoor(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Uses compromised machines with root access to setup SSH backdoor
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ShellUtil.execute_ssh_backdoor_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime