Source code for csle_attacker.emulation.exploit_middleware

from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
    import EmulationAttackerMachineObservationState
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_attacker.emulation.util.nmap_util import NmapUtil
from csle_attacker.emulation.util.exploit_util import ExploitUtil
from csle_common.logging.log import Logger


class ExploitMiddleware:
    """
    Class that implements functionality for executing exploits actions on the emulation
    """

    @staticmethod
    def _sort_machines(s_prime: EmulationEnvState) -> EmulationEnvState:
        """
        Calls sort_machines() on the attacker and the defender observation state of the given state

        :param s_prime: the state whose observation states should be sorted
        :return: the same state object that was passed in
        """
        s_prime.attacker_obs_state.sort_machines()
        s_prime.defender_obs_state.sort_machines()
        return s_prime

    @staticmethod
    def _dictionary_attack(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Shared implementation of the same-user dictionary password attacks: delegates the action to
        NmapUtil.nmap_scan_action_helper and sorts the machines of the resulting state

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(NmapUtil.nmap_scan_action_helper(s=s, a=a))

    @staticmethod
    def _privilege_escalation(s: EmulationEnvState, a: EmulationAttackerAction, cve_name: str,
                              tried_attribute: str, exploit_helper) -> EmulationEnvState:
        """
        Shared implementation of the privilege escalation exploits. For every machine in the attacker's
        observation state that has an IP among the action's IPs, is logged in and is not yet root, the given
        exploit helper is invoked and its resulting machine observation is merged into the state.

        :param s: the current state
        :param a: the action to take
        :param cve_name: the name of the exploit, only used in the log message written on failure
        :param tried_attribute: name of the boolean attribute that is set to True on the new machine observation
                                before the exploit is attempted (e.g. "cve_2010_0426_tried")
        :param exploit_helper: callable invoked as exploit_helper(s=, a=, machine=, result=) that returns a 5-tuple
                               whose first element is the updated machine observation
        :return: s_prime (the same object as s, updated in place)
        """
        new_machines_obs = []
        for machine in s.attacker_obs_state.machines:
            for ip in machine.ips:
                try:
                    if ip in a.ips and machine.logged_in and not machine.root:
                        new_m_obs = EmulationAttackerMachineObservationState(ips=machine.ips)
                        setattr(new_m_obs, tried_attribute, True)

                        # Try to escalate privileges using existing connections.
                        # Only the machine observation is used; the remaining four elements are unpacked
                        # solely to keep the expected 5-tuple shape of the helper's return value.
                        new_m_obs, _, _, _, _ = exploit_helper(s=s, a=a, machine=machine, result=new_m_obs)
                        new_machines_obs.append(new_m_obs.copy())

                        # The helper is given the machine rather than the matching IP, so a second matching
                        # IP of the same machine would only repeat the identical attempt.
                        break
                except Exception as e:
                    # Best effort: a failure is logged and the next IP/machine is still processed
                    Logger.__call__().get_logger().warning(
                        f"There was an exception executing exploit {cve_name}: {str(e)}, {repr(e)}")

        attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
            s.attacker_obs_state.machines, new_machines_obs, emulation_env_config=s.emulation_env_config, action=a)
        # s_prime is deliberately the same object as s; the state is updated in place
        s_prime = s
        s_prime.attacker_obs_state.machines = attacker_machine_observations
        return ExploitMiddleware._sort_machines(s_prime)

    @staticmethod
    def execute_telnet_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a Telnet Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_ssh_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a SSH Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_ftp_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a FTP Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_cassandra_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a Cassandra Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_irc_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a IRC Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_mongo_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a Mongo Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_mysql_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a MySQL Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_smtp_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a SMTP Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_postgres_same_user_dictionary(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a Postgres Dictionary Password Attack action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._dictionary_attack(s=s, a=a)

    @staticmethod
    def execute_sambacry(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a Sambacry Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(ExploitUtil.sambacry_helper(s=s, a=a))

    @staticmethod
    def execute_shellshock(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a ShellShock Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(ExploitUtil.shellshock_helper(s=s, a=a))

    @staticmethod
    def execute_dvwa_sql_injection(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a DVWA SQL Injection Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(ExploitUtil.dvwa_sql_injection_helper(s=s, a=a))

    @staticmethod
    def execute_cve_2015_3306_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a CVE-2015-3306 Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(ExploitUtil.cve_2015_3306_helper(s=s, a=a))

    @staticmethod
    def execute_cve_2015_1427_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a CVE-2015-1427 Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(ExploitUtil.cve_2015_1427_helper(s=s, a=a))

    @staticmethod
    def execute_cve_2016_10033_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a CVE-2016-10033 Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._sort_machines(ExploitUtil.cve_2016_10033_helper(s=s, a=a))

    @staticmethod
    def execute_cve_2010_0426_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a CVE-2010-0426 Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._privilege_escalation(
            s=s, a=a, cve_name="CVE-2010-0426", tried_attribute="cve_2010_0426_tried",
            exploit_helper=ExploitUtil.cve_2010_0426_helper)

    @staticmethod
    def execute_cve_2015_5602_exploit(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Performs a CVE-2015-5602 Exploit action

        :param s: the current state
        :param a: the action to take
        :return: s_prime
        """
        return ExploitMiddleware._privilege_escalation(
            s=s, a=a, cve_name="CVE-2015-5602", tried_attribute="cve_2015_5602_tried",
            exploit_helper=ExploitUtil.cve_2015_5602_helper)