from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
import EmulationAttackerMachineObservationState
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_attacker.emulation.util.nmap_util import NmapUtil
from csle_attacker.emulation.util.exploit_util import ExploitUtil
from csle_common.logging.log import Logger
[docs]class ExploitMiddleware:
"""
Class that implements functionality for executing exploits actions on the emulation
"""
[docs] @staticmethod
def execute_telnet_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a Telnet Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_ssh_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a SSH Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_ftp_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a FTP Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_cassandra_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a Cassandra Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_irc_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a IRC Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_mongo_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a Mongo Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_mysql_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a MySQL Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_smtp_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a SMTP Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_postgres_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a Postgres Dictionary Password Attack action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = NmapUtil.nmap_scan_action_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_sambacry(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a Sambacry Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ExploitUtil.sambacry_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_shellshock(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a ShellShock Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ExploitUtil.shellshock_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_dvwa_sql_injection(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a DVWA SQL Injection Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ExploitUtil.dvwa_sql_injection_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_cve_2015_3306_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a CVE-2015-3306 Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ExploitUtil.cve_2015_3306_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_cve_2015_1427_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a CVE-2015-1427 Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ExploitUtil.cve_2015_1427_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_cve_2016_10033_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a CVE-2016-10033 Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
s_prime = ExploitUtil.cve_2016_10033_helper(s=s, a=a)
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_cve_2010_0426_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a CVE-2010-0426 Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
new_machines_obs = []
total_cost = 0.0
for machine in s.attacker_obs_state.machines:
for ip in machine.ips:
try:
if ip in a.ips and machine.logged_in and not machine.root:
new_m_obs = EmulationAttackerMachineObservationState(ips=machine.ips)
new_m_obs.cve_2010_0426_tried = True
# Try to escalate privileges using existing connections
new_m_obs, m_cost, exploit_successful, root_credential, service = \
ExploitUtil.cve_2010_0426_helper(s=s, a=a, machine=machine, result=new_m_obs)
new_machines_obs.append(new_m_obs.copy())
# Total cost update
total_cost += m_cost
except Exception as e:
Logger.__call__().get_logger().warning(
f"There was an exception executing exploit CVE-2010-0426: {str(e)}, {repr(e)}")
attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
s.attacker_obs_state.machines, new_machines_obs, emulation_env_config=s.emulation_env_config, action=a)
s_prime = s
s_prime.attacker_obs_state.machines = attacker_machine_observations
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime
[docs] @staticmethod
def execute_cve_2015_5602_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
"""
Performs a CVE-2015-5602 Exploit action
:param s: the current state
:param a: the action to take
:return: s_prime
"""
new_machines_obs = []
total_cost = 0.0
for machine in s.attacker_obs_state.machines:
for ip in machine.ips:
try:
if ip in a.ips and machine.logged_in and not machine.root:
new_m_obs = EmulationAttackerMachineObservationState(ips=machine.ips)
new_m_obs.cve_2015_5602_tried = True
# Try to escalate privileges using existing connections
new_m_obs, m_cost, exploit_successful, root_credential, service = \
ExploitUtil.cve_2015_5602_helper(
s=s, a=a, machine=machine, result=new_m_obs)
new_machines_obs.append(new_m_obs.copy())
# Total cost update
total_cost += m_cost
except Exception as e:
Logger.__call__().get_logger().debug(f"{str(e)}, {repr(e)}")
attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
s.attacker_obs_state.machines, new_machines_obs, action=a, emulation_env_config=s.emulation_env_config)
s_prime = s
s_prime.attacker_obs_state.machines = attacker_machine_observations
s_prime.attacker_obs_state.sort_machines()
s_prime.defender_obs_state.sort_machines()
return s_prime