Source code for csle_attacker.emulation.util.exploit_util

from typing import Tuple
import time
import paramiko
import csle_common.constants.constants as constants
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_observation.common.emulation_connection_observation_state \
    import EmulationConnectionObservationState
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
    import EmulationAttackerMachineObservationState
from csle_common.dao.emulation_observation.common.emulation_vulnerability_observation_state \
    import EmulationVulnerabilityObservationState
from csle_common.dao.emulation_config.transport_protocol import TransportProtocol
from csle_common.dao.emulation_config.credential import Credential
from csle_common.util.emulation_util import EmulationUtil
from csle_common.util.connection_util import ConnectionUtil
from csle_common.logging.log import Logger


class ExploitUtil:
    """
    Class containing utility functions for the exploit-related functionality to the emulation
    """

    @staticmethod
    def _close_quietly(resource) -> None:
        """
        Best-effort close of a connection-like object; a failing close is logged at debug level and ignored

        :param resource: an object exposing close(), or None (in which case nothing happens)
        :return: None
        """
        if resource is None:
            return
        try:
            resource.close()
        except Exception as e:
            Logger.__call__().get_logger().debug(
                f"Failed to close {type(resource).__name__}: {str(e)}, {repr(e)}")

    @staticmethod
    def _ssh_credential(username: str, pw: str) -> Credential:
        """
        Creates a TCP credential for the default SSH port and SSH service

        :param username: the username of the credential
        :param pw: the password of the credential
        :return: the credential
        """
        return Credential(username=username, pw=pw, port=constants.SSH.DEFAULT_PORT,
                          protocol=TransportProtocol.TCP, service=constants.SSH.SERVICE_NAME)

    @staticmethod
    def _record_backdoor(target_machine: EmulationAttackerMachineObservationState, username: str, pw: str,
                         vuln_name, cvss, vuln_port, vuln_service) -> None:
        """
        Records a successful exploit on the target machine observation: marks shell access and the backdoor as
        installed, and appends the backdoor credential and the exploited vulnerability

        :param target_machine: the machine observation to update (mutated in place)
        :param username: the username of the backdoor account
        :param pw: the password of the backdoor account
        :param vuln_name: the name of the exploited vulnerability
        :param cvss: the CVSS score of the vulnerability
        :param vuln_port: the port of the vulnerable service
        :param vuln_service: the name of the vulnerable service
        :return: None
        """
        credential = ExploitUtil._ssh_credential(username=username, pw=pw)
        # The vulnerability gets its own Credential instance so that it does not alias the one stored
        # in the machine's credential lists
        vuln = EmulationVulnerabilityObservationState(
            name=vuln_name, cvss=cvss, credentials=[ExploitUtil._ssh_credential(username=username, pw=pw)],
            port=vuln_port, protocol=TransportProtocol.TCP, service=vuln_service)
        target_machine.shell_access = True
        target_machine.untried_credentials = True
        target_machine.shell_access_credentials.append(credential)
        target_machine.backdoor_credentials.append(credential)
        target_machine.backdoor_tried = True
        target_machine.backdoor_installed = True
        target_machine.cve_vulns.append(vuln)

    @staticmethod
    def _record_root_backdoor(result: EmulationAttackerMachineObservationState, root_credential: Credential,
                              root_connection, vuln_name, cvss) -> None:
        """
        Records a successful privilege escalation on the machine observation: appends the vulnerability, the
        root credential and the root connection, and marks the machine as rooted

        :param result: the machine observation to update (mutated in place)
        :param root_credential: the credential of the root backdoor account
        :param root_connection: the connection established with the root credential
        :param vuln_name: the name of the exploited vulnerability
        :param cvss: the CVSS score of the vulnerability
        :return: None
        """
        vuln = EmulationVulnerabilityObservationState(
            name=vuln_name, cvss=cvss, credentials=[root_credential], port=constants.SSH.DEFAULT_PORT,
            protocol=TransportProtocol.TCP, service=root_credential.service)
        result.cve_vulns.append(vuln)
        result.shell_access_credentials.append(root_credential)
        result.backdoor_credentials.append(root_credential)
        result.backdoor_tried = True
        result.backdoor_installed = True
        result.shell_access = True
        result.logged_in = True
        result.root = True
        result.ssh_connections.append(root_connection)

    @staticmethod
    def _backdoor_reachable(s: EmulationEnvState, jump_connection, ip: str, user: str, pw: str) -> bool:
        """
        Checks whether the backdoor account can log in to the target over SSH, tunnelled through the jump host

        :param s: the current state (provides the agent ip)
        :param jump_connection: the object returned by ConnectionUtil.find_jump_host_connection
        :param ip: the ip of the target
        :param user: the username of the backdoor account
        :param pw: the password of the backdoor account
        :return: True if the login succeeded, otherwise False
        """
        proxy_conn = EmulationConnectionObservationState(
            conn=jump_connection.conn, credential=jump_connection.credential, root=True,
            port=constants.SSH.DEFAULT_PORT, service=constants.SSH.SERVICE_NAME, proxy=None,
            ip=s.emulation_env_config.containers_config.agent_ip)
        return ExploitUtil.check_if_rce_exploit_succeeded(
            user=user, pw=pw, source_ip=jump_connection.ip, port=constants.SSH.DEFAULT_PORT,
            target_ip=ip, proxy_conn=proxy_conn)

    @staticmethod
    def _log_time_and_merge(s: EmulationEnvState, a: EmulationAttackerAction,
                            target_machine: EmulationAttackerMachineObservationState,
                            total_time: float) -> EmulationEnvState:
        """
        Logs the measured action time and merges the target machine observation into the attacker's state

        :param s: the current state (its attacker observation state is updated in place)
        :param a: the executed action
        :param target_machine: the new observation of the target machine
        :param total_time: the measured execution time of the action
        :return: s_prime (the same object as s)
        """
        EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                               emulation_env_config=s.emulation_env_config)
        attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
            s.attacker_obs_state.machines, [target_machine.copy()],
            emulation_env_config=s.emulation_env_config, action=a)
        s_prime = s
        s_prime.attacker_obs_state.machines = attacker_machine_observations
        return s_prime

    @staticmethod
    def _parse_dvwa_password(exploit_result: str) -> Tuple[bool, str]:
        """
        Extracts the password hash that follows the "pablo:" marker (up to the next "<") from the exploit output

        :param exploit_result: the raw output of the SQL injection exploit
        :return: (True, hash) if the extracted text contains the expected hash prefix, otherwise (False, "-")
        """
        marker = "pablo:"
        start = exploit_result.find(marker) + len(marker)
        # If no "<" follows, find() yields -1 and the slice below is empty, which fails the check
        end = start + exploit_result[start:].find("<")
        pw = exploit_result[start:end]
        if "0d107" in pw:
            return True, pw
        return False, "-"

    @staticmethod
    def sambacry_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Helper for executing the SambaCry exploit action

        The exploit command (a.cmds[0] formatted with the target ip) is executed from the jump host and is retried
        until its output indicates success. The ips of the action are tried in order until one succeeds.

        :param s: the current state
        :param a: the SambaCry exploit action
        :return: s_prime
        """
        logger = Logger.__call__().get_logger()
        target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
        total_time = 0.0
        exploit_successful = False
        for ip in target_machine.ips:
            try:
                jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_SAMBACRY):
                    cmd = a.cmds[0].format(ip)
                    outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                        cmd=cmd, conn=jump_connection.conn)
                    out = outdata.decode()
                    err = errdata.decode()
                    logger.warning(f"Samba command: {cmd}, out: {out}, err: {err}")

                    # Success: the backdoor already exists, or authentication succeeded without any error output
                    exploit_successful = (constants.SAMBA.ALREADY_EXISTS in out
                                          or (constants.SAMBA.ERROR not in out
                                              and constants.SAMBA.ERROR not in err
                                              and constants.SAMBA.AUTH_OK in out))
                    if exploit_successful:
                        break
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_SAMBACRY_SLEEP_RETRY)
                if exploit_successful:
                    break
            except Exception as e:
                logger.warning(f"There was an error executing the sambacry exploit: {str(e)} {repr(e)}")

        if exploit_successful:
            logger.info(f"The SambaCry exploit against machine: {target_machine.ips[0]} "
                        f"was successful, cmd: {a.cmds[0].format(target_machine.ips[0])}")
            ExploitUtil._record_backdoor(
                target_machine=target_machine, username=constants.SAMBA.BACKDOOR_USER,
                pw=constants.SAMBA.BACKDOOR_PW, vuln_name=constants.EXPLOIT_VULNERABILITES.SAMBACRY_EXPLOIT,
                cvss=constants.EXPLOIT_VULNERABILITES.SAMBACRY_CVSS, vuln_port=constants.SAMBA.PORT,
                vuln_service=constants.SAMBA.SERVICE_NAME)
        else:
            logger.info(f"The SambaCry exploit against machine: {target_machine.ips[0]} was unsuccessful")

        target_machine.sambacry_tried = True
        # The action time is logged for failed attempts too, like in the other exploit helpers
        return ExploitUtil._log_time_and_merge(s=s, a=a, target_machine=target_machine, total_time=total_time)

    @staticmethod
    def shellshock_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Helper for executing the ShellShock exploit action

        Both exploit commands (a.cmds[0] and a.cmds[1]) are executed from the jump host, after which an SSH login
        with the backdoor account is attempted to verify the outcome.

        :param s: the current state
        :param a: the Shellshock exploit action
        :return: s_prime
        """
        logger = Logger.__call__().get_logger()
        target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
        exploit_successful = False
        total_time = 0.0
        for ip in target_machine.ips:
            try:
                jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_SHELLSHOCK):
                    # Try execute exploit from jumphost
                    cmd = a.cmds[0].format(constants.SHELLSHOCK.BACKDOOR_USER, constants.SHELLSHOCK.BACKDOOR_PW,
                                           constants.SHELLSHOCK.BACKDOOR_USER, ip)
                    outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                        cmd=cmd, conn=jump_connection.conn)
                    logger.info(f"Shellshock cmd: {cmd}, out: {outdata.decode()}, err:{errdata.decode()}")

                    # Format with the ip of this iteration (not the whole list a.ips)
                    cmd = a.cmds[1].format(constants.SHELLSHOCK.BACKDOOR_USER, constants.SHELLSHOCK.BACKDOOR_PW, ip)
                    _, _, total_time1 = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=jump_connection.conn)
                    total_time = total_time + total_time1

                    exploit_successful = ExploitUtil._backdoor_reachable(
                        s=s, jump_connection=jump_connection, ip=ip, user=constants.SHELLSHOCK.BACKDOOR_USER,
                        pw=constants.SHELLSHOCK.BACKDOOR_PW)
                    if exploit_successful:
                        break
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_SHELLSHOCK_SLEEP_RETRY)
                if exploit_successful:
                    break
            except Exception as e:
                logger.warning(f"There was an error executing the shellshock exploit: {str(e)}, {repr(e)}")

        if exploit_successful:
            logger.info(f"The Shellshock exploit against machine: {target_machine.ips[0]} was successful")
            ExploitUtil._record_backdoor(
                target_machine=target_machine, username=constants.SHELLSHOCK.BACKDOOR_USER,
                pw=constants.SHELLSHOCK.BACKDOOR_PW, vuln_name=constants.EXPLOIT_VULNERABILITES.SHELLSHOCK_EXPLOIT,
                cvss=constants.EXPLOIT_VULNERABILITES.SHELLSHOCK_CVSS, vuln_port=constants.SHELLSHOCK.PORT,
                vuln_service=constants.SHELLSHOCK.SERVICE_NAME)
        else:
            logger.info(f"The Shellshock exploit against machine: {target_machine.ips[0]} was unsuccessful")

        target_machine.shellshock_tried = True
        return ExploitUtil._log_time_and_merge(s=s, a=a, target_machine=target_machine, total_time=total_time)

    @staticmethod
    def check_if_rce_exploit_succeeded(user: str, pw: str, source_ip: str, port: int, target_ip: str,
                                       proxy_conn) -> bool:
        """
        Utility function for checking if a RCE exploit succeeded or not, i.e. if we got shell access

        The check opens a direct channel from the proxy's SSH transport to the target and attempts a password
        login over it. The probe connection is closed again before returning.

        :param user: the user that should have access
        :param pw: the password
        :param source_ip: the ip of the source node of the attack
        :param port: the port on the target
        :param target_ip: the target ip
        :param proxy_conn: proxy connection (jumphost); must expose a connected SSH client as .conn
        :return: True if the exploit succeeded, otherwise false
        """
        agent_addr = (source_ip, port)
        target_addr = (target_ip, port)
        agent_transport = proxy_conn.conn.get_transport()
        relay_channel = None
        target_conn = None
        try:
            relay_channel = agent_transport.open_channel(constants.SSH.DIRECT_CHANNEL, target_addr, agent_addr,
                                                         timeout=3)
            target_conn = paramiko.SSHClient()
            target_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
            target_conn.connect(target_ip, username=user, password=pw, sock=relay_channel)
            return True
        except Exception as e:
            Logger.__call__().get_logger().debug(
                f"There was an error reading the result of an RCE exploit: {str(e)}, {repr(e)}")
            return False
        finally:
            # The connection only serves as a probe; release it so that repeated checks do not pile up
            # open sessions on the target and open channels on the jump host
            ExploitUtil._close_quietly(target_conn)
            ExploitUtil._close_quietly(relay_channel)

    @staticmethod
    def dvwa_sql_injection_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Helper for executing the DVWA SQL Injection exploit action

        The exploit command (a.cmds[0] followed by the target ip) is executed from the jump host, after which the
        exploit output file is read from the home directory of the jump-host user and parsed for a password hash.

        :param s: the current state
        :param a: the DVWA SQL Injection exploit action
        :return: s_prime
        """
        logger = Logger.__call__().get_logger()
        target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
        total_time = 0.0
        exploit_successful = False
        for ip in target_machine.ips:
            try:
                jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_DVWA_SQL_INJECTION):
                    cmd = a.cmds[0] + " " + ip
                    outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                        cmd=cmd, conn=jump_connection.conn)
                    logger.info(f"Sql injeciton cmd: {cmd}, out: {outdata.decode()}, err:{errdata.decode()}")

                    # Parse Result
                    home_dir = "/home/" + jump_connection.credential.username + "/"
                    try:
                        exploit_result = ExploitUtil.read_dvwa_sql_injection_result(
                            conn=jump_connection.conn, dir=home_dir)
                        exploit_successful, pw = ExploitUtil._parse_dvwa_password(exploit_result)
                    except Exception as e:
                        logger.warning(
                            f"There was an error executing the SQL injection exploit: {str(e)}, {repr(e)}")
                        exploit_successful = False
                        pw = "-"

                    if exploit_successful:
                        logger.info(f"SQL injection against: {target_machine.ips[0]} was successful")
                        ExploitUtil.remove_dvwa_sql_injection_result(conn=jump_connection.conn, dir=home_dir)
                        ExploitUtil._record_backdoor(
                            target_machine=target_machine, username=constants.DVWA_SQL_INJECTION.EXPLOIT_USER,
                            pw=pw, vuln_name=constants.EXPLOIT_VULNERABILITES.DVWA_SQL_INJECTION, cvss=9.8,
                            vuln_port=constants.DVWA_SQL_INJECTION.PORT,
                            vuln_service=constants.DVWA_SQL_INJECTION.SERVICE_NAME)
                        break
                    logger.info(f"SQL injection against: {target_machine.ips[0]} was not successful")
                    # NOTE(review): this sleeps for the retry *count* constant; the other helpers sleep for a
                    # dedicated *_SLEEP_RETRY constant. Confirm whether such a constant exists for DVWA.
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_RETRY_DVWA_SQL_INJECTION)
                if exploit_successful:
                    break
            except Exception as e:
                logger.warning(f"There was an error executing the SQL injection exploit: {e}, {repr(e)}")

        target_machine.dvwa_sql_injection_tried = True
        return ExploitUtil._log_time_and_merge(s=s, a=a, target_machine=target_machine, total_time=total_time)

    @staticmethod
    def read_dvwa_sql_injection_result(conn, dir: str) -> str:
        """
        Reads the result of a DVWA SQL Injection Attack

        :param conn: connection to use for reading
        :param dir: the dir to check
        :return: the read result
        """
        sftp_client = conn.open_sftp()
        try:
            remote_file = sftp_client.open(dir + constants.DVWA_SQL_INJECTION.EXPLOIT_OUTPUT_FILENAME)
            try:
                result = remote_file.read().decode()
            finally:
                remote_file.close()
        finally:
            # Release the SFTP session as well, not just the file handle
            ExploitUtil._close_quietly(sftp_client)
        return str(result)

    @staticmethod
    def remove_dvwa_sql_injection_result(conn, dir: str) -> None:
        """
        Remove the result of a DVWA SQL Injection Attack

        :param conn: connection to use for the removal
        :param dir: the directory where the file is
        :return: None
        """
        sftp_client = conn.open_sftp()
        try:
            sftp_client.remove(dir + constants.DVWA_SQL_INJECTION.EXPLOIT_OUTPUT_FILENAME)
        finally:
            ExploitUtil._close_quietly(sftp_client)

    @staticmethod
    def cve_2015_3306_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Helper for executing the CVE-2015-3306 exploit action

        The exploit command (a.cmds[0] formatted with the target ip) is executed from the jump host, after which
        an SSH login with the backdoor account is attempted to verify the outcome.

        :param s: the current state
        :param a: the CVE-2015-3306 exploit action
        :return: s_prime
        """
        logger = Logger.__call__().get_logger()
        target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
        exploit_successful = False
        total_time = 0.0
        for ip in target_machine.ips:
            try:
                jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_3306):
                    # Format with the ip of this iteration (not the whole list a.ips)
                    cmd = a.cmds[0].format(ip)
                    outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                        cmd=cmd, conn=jump_connection.conn)
                    logger.info(f"CVE-2015-3306, cmd: {cmd}, out: {outdata.decode()}, err: {errdata.decode()}")
                    exploit_successful = ExploitUtil._backdoor_reachable(
                        s=s, jump_connection=jump_connection, ip=ip, user=constants.CVE_2015_3306.BACKDOOR_USER,
                        pw=constants.CVE_2015_3306.BACKDOOR_PW)
                    if exploit_successful:
                        break
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_3306_SLEEP_RETRY)
                if exploit_successful:
                    break
            except Exception as e:
                logger.warning(f"There was an error executing the CVE-2015-3306 exploit: {e}, {repr(e)}")

        if exploit_successful:
            logger.info(f"CVE-2015-3306 against target: {target_machine.ips[0]} was successful")
            ExploitUtil._record_backdoor(
                target_machine=target_machine, username=constants.CVE_2015_3306.BACKDOOR_USER,
                pw=constants.CVE_2015_3306.BACKDOOR_PW, vuln_name=constants.EXPLOIT_VULNERABILITES.CVE_2015_3306,
                cvss=constants.EXPLOIT_VULNERABILITES.CVE_2015_3306_CVSS, vuln_port=constants.CVE_2015_3306.PORT,
                vuln_service=constants.CVE_2015_3306.SERVICE_NAME)
        else:
            logger.info(f"CVE-2015-3306 against target: {target_machine.ips[0]} was not successful")

        target_machine.cve_2015_3306_tried = True
        return ExploitUtil._log_time_and_merge(s=s, a=a, target_machine=target_machine, total_time=total_time)

    @staticmethod
    def cve_2015_1427_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Helper for executing the CVE-2015-1427 exploit action

        The exploit command (a.cmds[0] formatted with the target ip) is executed from the jump host, after which
        an SSH login with the backdoor account is attempted to verify the outcome.

        :param s: the current state
        :param a: the CVE-2015-1427 exploit action
        :return: s_prime
        """
        logger = Logger.__call__().get_logger()
        target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
        total_time = 0.0
        exploit_successful = False
        for ip in target_machine.ips:
            try:
                jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_1427):
                    # Format with the ip of this iteration (not the whole list a.ips)
                    cmd = a.cmds[0].format(ip)
                    outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                        cmd=cmd, conn=jump_connection.conn)
                    logger.info(f"CVE-2015-1427 cmd:{cmd}, out:{outdata.decode()}, err: {errdata.decode()}")
                    exploit_successful = ExploitUtil._backdoor_reachable(
                        s=s, jump_connection=jump_connection, ip=ip, user=constants.CVE_2015_1427.BACKDOOR_USER,
                        pw=constants.CVE_2015_1427.BACKDOOR_PW)
                    if exploit_successful:
                        break
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_1427_SLEEP_RETRY)
                if exploit_successful:
                    break
            except Exception as e:
                logger.warning(f"There was an error executing the CVE-2015-1427 exploit: {e}, {repr(e)}")

        if exploit_successful:
            logger.info(f"CVE-2015-1427 against target: {target_machine.ips[0]} was successful")
            ExploitUtil._record_backdoor(
                target_machine=target_machine, username=constants.CVE_2015_1427.BACKDOOR_USER,
                pw=constants.CVE_2015_1427.BACKDOOR_PW, vuln_name=constants.EXPLOIT_VULNERABILITES.CVE_2015_1427,
                cvss=constants.EXPLOIT_VULNERABILITES.CVE_2015_1427_CVSS, vuln_port=constants.CVE_2015_1427.PORT,
                vuln_service=constants.CVE_2015_1427.SERVICE_NAME)
        else:
            logger.info(f"CVE-2015-1427 against target: {target_machine.ips[0]} was not successful")

        target_machine.cve_2015_1427_tried = True
        return ExploitUtil._log_time_and_merge(s=s, a=a, target_machine=target_machine, total_time=total_time)

    @staticmethod
    def cve_2016_10033_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Helper for executing the CVE-2016-10033 exploit action

        The exploit command (a.cmds[0] formatted with the target ip) is executed from the jump host, after which
        an SSH login with the backdoor account is attempted to verify the outcome.

        :param s: the current state
        :param a: the CVE-2016-10033 exploit action
        :return: s_prime
        """
        logger = Logger.__call__().get_logger()
        target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
        total_time = 0.0
        exploit_successful = False
        for ip in target_machine.ips:
            try:
                # Called with the same arguments as in every other helper of this class
                jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2016_10033):
                    # Format with the ip of this iteration (not the whole list a.ips)
                    cmd = a.cmds[0].format(ip)
                    outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                        cmd=cmd, conn=jump_connection.conn)
                    logger.info(f"CVE-2016-10033 cmd:{cmd}, out: {outdata.decode()}, err: {errdata.decode()}")
                    exploit_successful = ExploitUtil._backdoor_reachable(
                        s=s, jump_connection=jump_connection, ip=ip, user=constants.CVE_2016_10033.BACKDOOR_USER,
                        pw=constants.CVE_2016_10033.BACKDOOR_PW)
                    if exploit_successful:
                        break
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2016_10033_SLEEP_RETRY)
                if exploit_successful:
                    break
            except Exception as e:
                logger.warning(f"There was an exception executing exploit CVE-2016-10033: {e}, {repr(e)}")

        if exploit_successful:
            logger.info(f"CVE-2016-10033 against {target_machine.ips[0]} was succcessful")
            ExploitUtil._record_backdoor(
                target_machine=target_machine, username=constants.CVE_2016_10033.BACKDOOR_USER,
                pw=constants.CVE_2016_10033.BACKDOOR_PW, vuln_name=constants.EXPLOIT_VULNERABILITES.CVE_2016_10033,
                cvss=constants.EXPLOIT_VULNERABILITES.CVE_2016_10033_CVSS, vuln_port=constants.CVE_2016_10033.PORT,
                vuln_service=constants.CVE_2016_10033.SERVICE_NAME)
        else:
            logger.info(f"CVE-2016-10033 against {target_machine.ips[0]} was not succcessful")

        target_machine.cve_2016_10033_tried = True
        return ExploitUtil._log_time_and_merge(s=s, a=a, target_machine=target_machine, total_time=total_time)

    @staticmethod
    def cve_2010_0426_helper(s: EmulationEnvState, a: EmulationAttackerAction,
                             machine: EmulationAttackerMachineObservationState,
                             result: EmulationAttackerMachineObservationState) \
            -> Tuple[EmulationAttackerMachineObservationState, float, bool, Credential, str]:
        """
        Helper for executing the CVE-2010-0426 privilege escalation action

        The two exploit commands are executed in an interactive shell on an existing SSH connection to the
        machine, after which an SSH login with the backdoor account is attempted to verify the outcome.
        The search stops at the first successful attempt.

        :param s: the current state
        :param a: the CVE-2010-0426 privilege escalation action
        :param machine: the machine observation whose SSH connections are used for running the exploit
        :param result: the machine observation to update with the outcome (mutated in place)
        :return: new_machine_obs, cost, successful, root_credential (None on failure), service_name
        """
        logger = Logger.__call__().get_logger()
        cost = 0.0
        e_succ = False
        root_credential = None
        service = ""
        sleep_time = 0.5
        agent_ip = s.emulation_env_config.containers_config.agent_ip

        # Start with SSH connections
        for ssh_conn in machine.ssh_connections:
            for ip in a.ips:
                try:
                    proxy_conn = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                    for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2010_0426):
                        # Setup interactive shell
                        channel = ssh_conn.conn.invoke_shell()
                        try:
                            # clear output
                            if channel.recv_ready():
                                channel.recv(constants.COMMON.DEFAULT_RECV_SIZE)

                            # Exploit
                            cmd = a.cmds[0].format(constants.CVE_2010_0426.EXPLOIT_FILE)
                            e_succ = False
                            try:
                                EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=channel)
                                time.sleep(sleep_time)
                                EmulationUtil.read_result_interactive_channel(
                                    emulation_env_config=s.emulation_env_config, channel=channel)
                                cmd = a.cmds[1]
                                EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=channel)
                                time.sleep(sleep_time)
                                EmulationUtil.read_result_interactive_channel(
                                    emulation_env_config=s.emulation_env_config, channel=channel)
                                e_succ = ExploitUtil.check_if_rce_exploit_succeeded(
                                    user=constants.CVE_2010_0426.BACKDOOR_USER,
                                    pw=constants.CVE_2010_0426.BACKDOOR_PW, source_ip=agent_ip,
                                    port=constants.SSH.DEFAULT_PORT, target_ip=ip, proxy_conn=proxy_conn)
                            except Exception as e:
                                logger.warning("CVE-2010-0426 error:{}".format(str(e)))
                                e_succ = False
                        finally:
                            # A new shell is opened for every attempt, so release this one
                            ExploitUtil._close_quietly(channel)

                        if e_succ:
                            logger.info("CVE-2010-0426 was succcessful")
                            root_credential = ExploitUtil._ssh_credential(
                                username=constants.CVE_2010_0426.BACKDOOR_USER,
                                pw=constants.CVE_2010_0426.BACKDOOR_PW)
                            root_connection = EmulationUtil.setup_custom_connection(
                                user=constants.CVE_2010_0426.BACKDOOR_USER, pw=constants.CVE_2010_0426.BACKDOOR_PW,
                                source_ip=agent_ip, port=constants.SSH.DEFAULT_PORT, target_ip=ip,
                                proxy_conn=proxy_conn, root=True)
                            ExploitUtil._record_root_backdoor(
                                result=result, root_credential=root_credential, root_connection=root_connection,
                                vuln_name=constants.EXPLOIT_VULNERABILITES.CVE_2010_0426,
                                cvss=constants.EXPLOIT_VULNERABILITES.CVE_2010_0426_CVSS)
                            service = constants.SSH.SERVICE_NAME
                            break
                        logger.info("CVE-2010-0426 against was not succcessful")
                        time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2010_0426_SLEEP_RETRY)
                    if e_succ:
                        break
                except Exception as e:
                    logger.warning(
                        f"Exception occurred during execution of exploit CVE-2010-0426, {str(e)}, {repr(e)}")
            if e_succ:
                # Stop here: trying the remaining connections would repeat the exploit and could reset e_succ
                break

        # Measure cost and alerts
        cost += float(sleep_time * 2)
        EmulationUtil.log_measured_action_time(total_time=cost, action=a,
                                               emulation_env_config=s.emulation_env_config)
        result.cve_2010_0426_tried = True
        return result, cost, e_succ, root_credential, service

    @staticmethod
    def cve_2015_5602_helper(s: EmulationEnvState, a: EmulationAttackerAction,
                             machine: EmulationAttackerMachineObservationState,
                             result: EmulationAttackerMachineObservationState) \
            -> Tuple[EmulationAttackerMachineObservationState, float, bool, Credential, str]:
        """
        Helper for executing the CVE-2015-5602 privilege escalation action

        The four exploit commands are executed in an interactive shell on an existing SSH connection to the
        machine, after which an SSH login with the backdoor account is attempted to verify the outcome.
        The search stops at the first successful attempt. Unlike cve_2010_0426_helper, errors raised outside
        the command execution (e.g. when locating the jump host) are not caught here.

        :param s: the current state
        :param a: the CVE-2015-5602 privilege escalation action
        :param machine: the machine observation whose SSH connections are used for running the exploit
        :param result: the machine observation to update with the outcome (mutated in place)
        :return: new_machine_obs, cost, e_succ (flag), root_credential (None on failure), service_name
        """
        logger = Logger.__call__().get_logger()
        cost = 0.0
        e_succ = False
        root_credential = None
        service = ""
        sleep_time = 0.5
        num_cmds = 4
        agent_ip = s.emulation_env_config.containers_config.agent_ip

        # Start with SSH connections
        for ssh_conn in machine.ssh_connections:
            for ip in a.ips:
                proxy_conn = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_5602):
                    # Setup interactive shell
                    channel = ssh_conn.conn.invoke_shell()
                    try:
                        # clear output
                        if channel.recv_ready():
                            channel.recv(constants.COMMON.DEFAULT_RECV_SIZE)

                        # Exploit
                        cmd = a.cmds[0]
                        try:
                            for cmd_idx in range(num_cmds):
                                if cmd_idx > 0:
                                    cmd = a.cmds[cmd_idx]
                                EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=channel)
                                time.sleep(sleep_time)
                                cost += float(sleep_time)
                                EmulationUtil.read_result_interactive_channel(
                                    emulation_env_config=s.emulation_env_config, channel=channel)
                            e_succ = ExploitUtil.check_if_rce_exploit_succeeded(
                                user=constants.CVE_2015_5602.BACKDOOR_USER, pw=constants.CVE_2015_5602.BACKDOOR_PW,
                                source_ip=agent_ip, port=constants.SSH.DEFAULT_PORT, target_ip=ip,
                                proxy_conn=proxy_conn)
                        except Exception as e:
                            logger.warning(f"CVE-2015-5602 error:{str(e)}, {repr(e)}")
                            e_succ = False
                    finally:
                        # A new shell is opened for every attempt, so release this one
                        ExploitUtil._close_quietly(channel)

                    if e_succ:
                        logger.info("CVE-2015-5602 was succcessful")
                        root_credential = ExploitUtil._ssh_credential(
                            username=constants.CVE_2015_5602.BACKDOOR_USER, pw=constants.CVE_2015_5602.BACKDOOR_PW)
                        root_connection = EmulationUtil.setup_custom_connection(
                            user=constants.CVE_2015_5602.BACKDOOR_USER, pw=constants.CVE_2015_5602.BACKDOOR_PW,
                            source_ip=agent_ip, port=constants.SSH.DEFAULT_PORT, target_ip=ip,
                            proxy_conn=proxy_conn, root=True)
                        ExploitUtil._record_root_backdoor(
                            result=result, root_credential=root_credential, root_connection=root_connection,
                            vuln_name=constants.EXPLOIT_VULNERABILITES.CVE_2015_5602,
                            cvss=constants.EXPLOIT_VULNERABILITES.CVE_2015_5602_CVSS)
                        service = constants.SSH.SERVICE_NAME
                        # Stop retrying: another round would re-run the exploit, register duplicate
                        # credentials/connections and could reset e_succ to False
                        break
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_5602_SLEEP_RETRY)
                    cost += float(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_5602_SLEEP_RETRY)
                if e_succ:
                    break
            if e_succ:
                break

        # Measure cost and alerts
        cost += float(sleep_time * 2)
        EmulationUtil.log_measured_action_time(total_time=cost, action=a,
                                               emulation_env_config=s.emulation_env_config)
        result.cve_2015_5602_tried = True
        return result, cost, e_succ, root_credential, service