from typing import Tuple
import time
import paramiko
import csle_common.constants.constants as constants
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_observation.common.emulation_connection_observation_state \
import EmulationConnectionObservationState
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
import EmulationAttackerMachineObservationState
from csle_common.dao.emulation_observation.common.emulation_vulnerability_observation_state \
import EmulationVulnerabilityObservationState
from csle_common.dao.emulation_config.transport_protocol import TransportProtocol
from csle_common.dao.emulation_config.credential import Credential
from csle_common.util.emulation_util import EmulationUtil
from csle_common.util.connection_util import ConnectionUtil
from csle_common.logging.log import Logger
[docs]class ExploitUtil:
"""
Class containing utility functions for the exploit-related functionality to the emulation
"""
[docs] @staticmethod
def sambacry_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
    """
    Helper for executing the SambaCry exploit action

    The exploit command a.cmds[0] (formatted with the target ip) is executed over the SSH connection of the
    jump host found for each ip of the action. Every ip is attempted up to
    constants.ENV_CONSTANTS.ATTACKER_RETRY_SAMBACRY times and the first success ends the search. On success
    the backdoor credential and the vulnerability are recorded on a fresh machine observation, which is then
    merged into the attacker's machine observations.

    :param s: the current state (its attacker observation state is updated in place)
    :param a: the SambaCry exploit action
    :return: s_prime (the same object as s, with the merged machine observations)
    """
    logger = Logger.__call__().get_logger()

    def backdoor_credential() -> Credential:
        # A new object per call so that the machine and the vulnerability do not share one instance
        return Credential(username=constants.SAMBA.BACKDOOR_USER, pw=constants.SAMBA.BACKDOOR_PW,
                          port=constants.SSH.DEFAULT_PORT, protocol=TransportProtocol.TCP,
                          service=constants.SSH.SERVICE_NAME)

    # Extract target machine
    target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
    total_time = 0.0
    exploit_successful = False
    for ip in target_machine.ips:
        try:
            jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            cmd = a.cmds[0].format(ip)
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_SAMBACRY):
                outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                    cmd=cmd, conn=jump_connection.conn)
                # Decode once and reuse for both the log record and the outcome check
                out = outdata.decode()
                err = errdata.decode()
                logger.warning(f"Samba command: {cmd}, out: {out}, err: {err}")
                # Success: the backdoor already exists, or authentication succeeded without any error marker
                exploit_successful = (constants.SAMBA.ALREADY_EXISTS in out or
                                      (constants.SAMBA.ERROR not in out and
                                       constants.SAMBA.ERROR not in err and
                                       constants.SAMBA.AUTH_OK in out))
                if exploit_successful:
                    break
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_SAMBACRY_SLEEP_RETRY)
            if exploit_successful:
                break
        except Exception as e:
            # Best effort: a failure against one ip must not prevent trying the remaining ips
            logger.warning(f"There was an error executing the sambacry exploit: {str(e)} {repr(e)}")

    # Parse Result
    if exploit_successful:
        logger.info(f"The SambaCry exploit against machine: {target_machine.ips[0]} "
                    f"was successful, cmd: {a.cmds[0].format(target_machine.ips[0])}")
        credential = backdoor_credential()
        vuln = EmulationVulnerabilityObservationState(
            name=constants.EXPLOIT_VULNERABILITES.SAMBACRY_EXPLOIT,
            cvss=constants.EXPLOIT_VULNERABILITES.SAMBACRY_CVSS,
            credentials=[backdoor_credential()], port=constants.SAMBA.PORT, protocol=TransportProtocol.TCP,
            service=constants.SAMBA.SERVICE_NAME)
        target_machine.shell_access = True
        target_machine.untried_credentials = True
        target_machine.shell_access_credentials.append(credential)
        target_machine.backdoor_credentials.append(credential)
        target_machine.backdoor_tried = True
        target_machine.backdoor_installed = True
        target_machine.cve_vulns.append(vuln)
        # NOTE(review): the other exploit helpers in this class log the measured time regardless of the
        # outcome; here it is only logged on success -- confirm that this is intended
        EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                               emulation_env_config=s.emulation_env_config)
    else:
        logger.info(f"The SambaCry exploit against machine: {target_machine.ips[0]} was unsuccessful")

    target_machine.sambacry_tried = True
    attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
        s.attacker_obs_state.machines, [target_machine.copy()],
        emulation_env_config=s.emulation_env_config, action=a)
    # s_prime aliases s: the state is updated in place, not copied
    s_prime = s
    s_prime.attacker_obs_state.machines = attacker_machine_observations
    return s_prime
[docs] @staticmethod
def shellshock_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
    """
    Helper for executing the ShellShock exploit action

    For each ip of the action, the two exploit commands a.cmds[0] and a.cmds[1] are executed over the SSH
    connection of the jump host, after which a login with the backdoor credentials is attempted through the
    jump host to decide whether the exploit worked. Every ip is attempted up to
    constants.ENV_CONSTANTS.ATTACKER_RETRY_SHELLSHOCK times and the first success ends the search.

    :param s: the current state (its attacker observation state is updated in place)
    :param a: the Shellshock exploit action
    :return: s_prime (the same object as s, with the merged machine observations)
    """
    logger = Logger.__call__().get_logger()

    def backdoor_credential() -> Credential:
        # A new object per call so that the machine and the vulnerability do not share one instance
        return Credential(username=constants.SHELLSHOCK.BACKDOOR_USER, pw=constants.SHELLSHOCK.BACKDOOR_PW,
                          port=constants.SSH.DEFAULT_PORT, protocol=TransportProtocol.TCP,
                          service=constants.SSH.SERVICE_NAME)

    target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
    exploit_successful = False
    total_time = 0.0
    for ip in target_machine.ips:
        try:
            jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            # Both commands and the proxy wrapper only depend on the ip, so build them once per ip
            first_cmd = a.cmds[0].format(constants.SHELLSHOCK.BACKDOOR_USER, constants.SHELLSHOCK.BACKDOOR_PW,
                                         constants.SHELLSHOCK.BACKDOOR_USER, ip)
            # Format with the ip currently being attacked; formatting with the list a.ips would insert the
            # textual representation of the whole list into the command
            second_cmd = a.cmds[1].format(constants.SHELLSHOCK.BACKDOOR_USER, constants.SHELLSHOCK.BACKDOOR_PW,
                                          ip)
            proxy_conn = EmulationConnectionObservationState(
                conn=jump_connection.conn, credential=jump_connection.credential,
                root=True, port=constants.SSH.DEFAULT_PORT, service=constants.SSH.SERVICE_NAME,
                proxy=None, ip=s.emulation_env_config.containers_config.agent_ip)
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_SHELLSHOCK):
                # Try execute exploit from jumphost
                outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                    cmd=first_cmd, conn=jump_connection.conn)
                logger.info(f"Shellshock cmd: {first_cmd}, out: {outdata.decode()}, "
                            f"err:{errdata.decode()}")
                # Only the duration of the second command is needed; its output is not inspected
                _, _, second_time = EmulationUtil.execute_ssh_cmd(cmd=second_cmd, conn=jump_connection.conn)
                total_time = total_time + second_time
                # Parse Result
                exploit_successful = ExploitUtil.check_if_rce_exploit_succeeded(
                    user=constants.SHELLSHOCK.BACKDOOR_USER, pw=constants.SHELLSHOCK.BACKDOOR_PW,
                    source_ip=jump_connection.ip, port=constants.SSH.DEFAULT_PORT,
                    target_ip=ip, proxy_conn=proxy_conn)
                if exploit_successful:
                    break
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_SHELLSHOCK_SLEEP_RETRY)
            if exploit_successful:
                break
        except Exception as e:
            # Best effort: a failure against one ip must not prevent trying the remaining ips
            logger.warning(f"There was an error executing the shellshock exploit: {str(e)}, {repr(e)}")

    if exploit_successful:
        logger.info(f"The Shellshock exploit against machine: {target_machine.ips[0]} was successful")
        credential = backdoor_credential()
        vuln = EmulationVulnerabilityObservationState(
            name=constants.EXPLOIT_VULNERABILITES.SHELLSHOCK_EXPLOIT,
            cvss=constants.EXPLOIT_VULNERABILITES.SHELLSHOCK_CVSS,
            credentials=[backdoor_credential()], port=constants.SHELLSHOCK.PORT,
            protocol=TransportProtocol.TCP, service=constants.SHELLSHOCK.SERVICE_NAME)
        target_machine.shell_access = True
        target_machine.untried_credentials = True
        target_machine.shell_access_credentials.append(credential)
        target_machine.backdoor_credentials.append(credential)
        target_machine.backdoor_tried = True
        target_machine.backdoor_installed = True
        target_machine.cve_vulns.append(vuln)
    else:
        logger.info(f"The Shellshock exploit against machine: {target_machine.ips[0]} was unsuccessful")

    # Record the measured time of the last executed attempt
    EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                           emulation_env_config=s.emulation_env_config)
    target_machine.shellshock_tried = True
    attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
        s.attacker_obs_state.machines, [target_machine.copy()],
        emulation_env_config=s.emulation_env_config, action=a)
    # s_prime aliases s: the state is updated in place, not copied
    s_prime = s
    s_prime.attacker_obs_state.machines = attacker_machine_observations
    return s_prime
[docs] @staticmethod
def check_if_rce_exploit_succeeded(user: str, pw: str, source_ip: str, port: int, target_ip: str,
                                   proxy_conn) -> bool:
    """
    Utility function for checking if a RCE exploit succeeded or not, i.e. if we got shell access

    The check opens a direct channel from the proxy's SSH transport to the target and attempts a password
    login over that channel. The login is only a probe: the connection is not returned to the caller, so
    it is closed again before returning.

    :param user: the user that should have access
    :param pw: the password
    :param source_ip: the ip of the source node of the attack
    :param port: the port on the target
    :param target_ip: the target ip
    :param proxy_conn: the proxy connection (jumphost); its conn attribute must provide get_transport()
    :return: True if the exploit succeeded, otherwise false
    """
    agent_addr = (source_ip, port)
    target_addr = (target_ip, port)
    agent_transport = proxy_conn.conn.get_transport()
    relay_channel = None
    target_conn = None
    try:
        relay_channel = agent_transport.open_channel(constants.SSH.DIRECT_CHANNEL, target_addr, agent_addr,
                                                     timeout=3)
        target_conn = paramiko.SSHClient()
        target_conn.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        target_conn.connect(target_ip, username=user, password=pw, sock=relay_channel)
        # A successful login is the proof that the backdoor account exists
        return True
    except Exception as e:
        Logger.__call__().get_logger().debug(
            f"There was an error reading the result of an RCE exploit: {str(e)}, {repr(e)}")
        return False
    finally:
        # Release the probe connection and its relay channel so that repeated checks do not leak sessions
        for resource in (target_conn, relay_channel):
            if resource is None:
                continue
            try:
                resource.close()
            except Exception as e:
                Logger.__call__().get_logger().debug(
                    f"Could not close a connection used for checking an RCE exploit: {str(e)}, {repr(e)}")
[docs] @staticmethod
def dvwa_sql_injection_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
    """
    Helper for executing the DVWA SQL Injection exploit action

    For each ip of the action, the command a.cmds[0] followed by the ip is executed over the SSH connection
    of the jump host. The result file written to the home directory of the jump host user is then read and
    the text between "pablo:" and the next "<" is extracted as the password. The exploit counts as
    successful when that text contains "0d107" (presumably the prefix of the expected password hash).
    Every ip is attempted up to constants.ENV_CONSTANTS.ATTACKER_RETRY_DVWA_SQL_INJECTION times and the
    first success ends the search.

    :param s: the current state (its attacker observation state is updated in place)
    :param a: the DVWA SQL Injection exploit action
    :return: s_prime (the same object as s, with the merged machine observations)
    """
    logger = Logger.__call__().get_logger()
    target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
    total_time = 0.0
    exploit_successful = False
    for ip in target_machine.ips:
        try:
            jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            cmd = a.cmds[0] + " " + ip
            result_dir = "/home/" + jump_connection.credential.username + "/"
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_DVWA_SQL_INJECTION):
                outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                    cmd=cmd, conn=jump_connection.conn)
                logger.info(f"Sql injeciton cmd: {cmd}, out: {outdata.decode()}, "
                            f"err:{errdata.decode()}")
                # Parse Result
                exploit_successful = False
                pw = "-"
                try:
                    exploit_result = ExploitUtil.read_dvwa_sql_injection_result(conn=jump_connection.conn,
                                                                                dir=result_dir)
                    # Require both delimiters: a missing marker must not be turned into a bogus slice
                    _, marker, tail = exploit_result.partition("pablo:")
                    candidate, terminator, _ = tail.partition("<")
                    if marker and terminator and "0d107" in candidate:
                        pw = candidate
                        exploit_successful = True
                except Exception as e:
                    logger.warning(
                        f"There was an error executing the SQL injection exploit: {str(e)}, {repr(e)}")
                if exploit_successful:
                    logger.info(f"SQL injection against: {target_machine.ips[0]} was successful")
                    # Remove the result file so that a later attempt cannot read a stale result
                    ExploitUtil.remove_dvwa_sql_injection_result(conn=jump_connection.conn, dir=result_dir)
                    credential = Credential(username=constants.DVWA_SQL_INJECTION.EXPLOIT_USER, pw=pw,
                                            port=constants.SSH.DEFAULT_PORT, protocol=TransportProtocol.TCP,
                                            service=constants.SSH.SERVICE_NAME)
                    vuln = EmulationVulnerabilityObservationState(
                        name=constants.EXPLOIT_VULNERABILITES.DVWA_SQL_INJECTION,
                        cvss=9.8,
                        credentials=[Credential(username=constants.DVWA_SQL_INJECTION.EXPLOIT_USER, pw=pw,
                                                port=constants.SSH.DEFAULT_PORT,
                                                protocol=TransportProtocol.TCP,
                                                service=constants.SSH.SERVICE_NAME)],
                        port=constants.DVWA_SQL_INJECTION.PORT, protocol=TransportProtocol.TCP,
                        service=constants.DVWA_SQL_INJECTION.SERVICE_NAME)
                    target_machine.shell_access = True
                    target_machine.untried_credentials = True
                    target_machine.shell_access_credentials.append(credential)
                    target_machine.backdoor_credentials.append(credential)
                    target_machine.backdoor_tried = True
                    target_machine.backdoor_installed = True
                    target_machine.cve_vulns.append(vuln)
                    break
                logger.info(f"SQL injection against: {target_machine.ips[0]} was not successful")
                # NOTE(review): this sleeps for the retry *count*; the other helpers use a dedicated
                # *_SLEEP_RETRY constant -- confirm whether such a constant exists for this exploit
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_RETRY_DVWA_SQL_INJECTION)
            if exploit_successful:
                break
        except Exception as e:
            # Best effort: a failure against one ip must not prevent trying the remaining ips
            logger.warning(f"There was an error executing the SQL injection exploit: {e}, {repr(e)}")

    # Record the measured time of the last executed attempt
    EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                           emulation_env_config=s.emulation_env_config)
    target_machine.dvwa_sql_injection_tried = True
    attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
        s.attacker_obs_state.machines, [target_machine.copy()],
        emulation_env_config=s.emulation_env_config, action=a)
    # s_prime aliases s: the state is updated in place, not copied
    s_prime = s
    s_prime.attacker_obs_state.machines = attacker_machine_observations
    return s_prime
[docs] @staticmethod
def read_dvwa_sql_injection_result(conn, dir: str) -> str:
    """
    Reads the result of a DVWA SQL Injection Attack

    Opens an SFTP session on the given connection, reads the whole result file and decodes it. Both the
    remote file and the SFTP session are closed before returning, also when reading fails.

    :param conn: connection to use for reading (must provide open_sftp())
    :param dir: the dir to check; it is concatenated directly with the file name, so it must end with the
                path separator
    :return: the read result
    """
    sftp_client = conn.open_sftp()
    try:
        remote_file = sftp_client.open(dir + constants.DVWA_SQL_INJECTION.EXPLOIT_OUTPUT_FILENAME)
        try:
            return remote_file.read().decode()
        finally:
            remote_file.close()
    finally:
        # One SFTP session is opened per call, so it must be released per call
        sftp_client.close()
[docs] @staticmethod
def remove_dvwa_sql_injection_result(conn, dir: str) -> None:
    """
    Remove the result of a DVWA SQL Injection Attack

    Opens an SFTP session on the given connection and deletes the result file. The SFTP session is closed
    before returning, also when the removal fails.

    :param conn: connection to use for the removal (must provide open_sftp())
    :param dir: the directory where the file is; it is concatenated directly with the file name, so it
                must end with the path separator
    :return: None
    """
    sftp_client = conn.open_sftp()
    try:
        sftp_client.remove(dir + constants.DVWA_SQL_INJECTION.EXPLOIT_OUTPUT_FILENAME)
    finally:
        # One SFTP session is opened per call, so it must be released per call
        sftp_client.close()
[docs] @staticmethod
def cve_2015_3306_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
    """
    Helper for executing the CVE-2015-3306 exploit action

    For each ip of the action, the command a.cmds[0] (formatted with the ip) is executed over the SSH
    connection of the jump host, after which a login with the backdoor credentials is attempted through the
    jump host to decide whether the exploit worked. Every ip is attempted up to
    constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_3306 times and the first success ends the search.

    :param s: the current state (its attacker observation state is updated in place)
    :param a: the CVE-2015-3306 exploit action
    :return: s_prime (the same object as s, with the merged machine observations)
    """
    logger = Logger.__call__().get_logger()

    def backdoor_credential() -> Credential:
        # A new object per call so that the machine and the vulnerability do not share one instance
        return Credential(username=constants.CVE_2015_3306.BACKDOOR_USER,
                          pw=constants.CVE_2015_3306.BACKDOOR_PW,
                          port=constants.SSH.DEFAULT_PORT, protocol=TransportProtocol.TCP,
                          service=constants.SSH.SERVICE_NAME)

    # Extract target machine
    target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
    exploit_successful = False
    total_time = 0.0
    for ip in target_machine.ips:
        try:
            jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            # Format with the ip currently being attacked; formatting with the list a.ips would insert the
            # textual representation of the whole list into the command
            cmd = a.cmds[0].format(ip)
            proxy_conn = EmulationConnectionObservationState(
                conn=jump_connection.conn, credential=jump_connection.credential,
                root=True, port=constants.SSH.DEFAULT_PORT, service=constants.SSH.SERVICE_NAME,
                proxy=None, ip=s.emulation_env_config.containers_config.agent_ip)
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_3306):
                outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                    cmd=cmd, conn=jump_connection.conn)
                logger.info(f"CVE-2015-3306, cmd: {cmd}, out: {outdata.decode()}, "
                            f"err: {errdata.decode()}")
                exploit_successful = ExploitUtil.check_if_rce_exploit_succeeded(
                    user=constants.CVE_2015_3306.BACKDOOR_USER, pw=constants.CVE_2015_3306.BACKDOOR_PW,
                    source_ip=jump_connection.ip, port=constants.SSH.DEFAULT_PORT, target_ip=ip,
                    proxy_conn=proxy_conn)
                if exploit_successful:
                    break
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_3306_SLEEP_RETRY)
            if exploit_successful:
                break
        except Exception as e:
            # Best effort: a failure against one ip must not prevent trying the remaining ips
            logger.warning(f"There was an error executing the CVE-2015-3306 exploit: {e}, {repr(e)}")

    if exploit_successful:
        logger.info(f"CVE-2015-3306 against target: {target_machine.ips[0]} was successful")
        credential = backdoor_credential()
        vuln = EmulationVulnerabilityObservationState(
            name=constants.EXPLOIT_VULNERABILITES.CVE_2015_3306,
            cvss=constants.EXPLOIT_VULNERABILITES.CVE_2015_3306_CVSS,
            credentials=[backdoor_credential()], port=constants.CVE_2015_3306.PORT,
            protocol=TransportProtocol.TCP, service=constants.CVE_2015_3306.SERVICE_NAME)
        target_machine.shell_access = True
        target_machine.untried_credentials = True
        target_machine.shell_access_credentials.append(credential)
        target_machine.backdoor_credentials.append(credential)
        target_machine.backdoor_tried = True
        target_machine.backdoor_installed = True
        target_machine.cve_vulns.append(vuln)
    else:
        logger.info(f"CVE-2015-3306 against target: {target_machine.ips[0]} was not successful")

    # Record the measured time of the last executed attempt
    EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                           emulation_env_config=s.emulation_env_config)
    target_machine.cve_2015_3306_tried = True
    attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
        s.attacker_obs_state.machines, [target_machine.copy()],
        emulation_env_config=s.emulation_env_config, action=a)
    # s_prime aliases s: the state is updated in place, not copied
    s_prime = s
    s_prime.attacker_obs_state.machines = attacker_machine_observations
    return s_prime
[docs] @staticmethod
def cve_2015_1427_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
    """
    Helper for executing the CVE-2015-1427 exploit action

    For each ip of the action, the command a.cmds[0] (formatted with the ip) is executed over the SSH
    connection of the jump host, after which a login with the backdoor credentials is attempted through the
    jump host to decide whether the exploit worked. Every ip is attempted up to
    constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_1427 times and the first success ends the search.

    :param s: the current state (its attacker observation state is updated in place)
    :param a: the CVE-2015-1427 exploit action
    :return: s_prime (the same object as s, with the merged machine observations)
    """
    logger = Logger.__call__().get_logger()

    def backdoor_credential() -> Credential:
        # A new object per call so that the machine and the vulnerability do not share one instance
        return Credential(username=constants.CVE_2015_1427.BACKDOOR_USER,
                          pw=constants.CVE_2015_1427.BACKDOOR_PW,
                          port=constants.SSH.DEFAULT_PORT, protocol=TransportProtocol.TCP,
                          service=constants.SSH.SERVICE_NAME)

    # Extract target machine
    target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
    total_time = 0.0
    exploit_successful = False
    for ip in target_machine.ips:
        try:
            jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            # Format with the ip currently being attacked; formatting with the list a.ips would insert the
            # textual representation of the whole list into the command
            cmd = a.cmds[0].format(ip)
            proxy_conn = EmulationConnectionObservationState(
                conn=jump_connection.conn, credential=jump_connection.credential,
                root=True, port=constants.SSH.DEFAULT_PORT, service=constants.SSH.SERVICE_NAME,
                proxy=None, ip=s.emulation_env_config.containers_config.agent_ip)
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_1427):
                outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                    cmd=cmd, conn=jump_connection.conn)
                logger.info(f"CVE-2015-1427 cmd:{cmd}, out:{outdata.decode()}, "
                            f"err: {errdata.decode()}")
                # Parse Result
                exploit_successful = ExploitUtil.check_if_rce_exploit_succeeded(
                    user=constants.CVE_2015_1427.BACKDOOR_USER, pw=constants.CVE_2015_1427.BACKDOOR_PW,
                    source_ip=jump_connection.ip, port=constants.SSH.DEFAULT_PORT, target_ip=ip,
                    proxy_conn=proxy_conn)
                if exploit_successful:
                    break
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_1427_SLEEP_RETRY)
            if exploit_successful:
                break
        except Exception as e:
            # Best effort: a failure against one ip must not prevent trying the remaining ips
            logger.warning(f"There was an error executing the CVE-2015-1427 exploit: {e}, {repr(e)}")

    if exploit_successful:
        logger.info(f"CVE-2015-1427 against target: {target_machine.ips[0]} was successful")
        credential = backdoor_credential()
        vuln = EmulationVulnerabilityObservationState(
            name=constants.EXPLOIT_VULNERABILITES.CVE_2015_1427,
            cvss=constants.EXPLOIT_VULNERABILITES.CVE_2015_1427_CVSS,
            credentials=[backdoor_credential()], port=constants.CVE_2015_1427.PORT,
            protocol=TransportProtocol.TCP, service=constants.CVE_2015_1427.SERVICE_NAME)
        target_machine.shell_access = True
        target_machine.untried_credentials = True
        target_machine.shell_access_credentials.append(credential)
        target_machine.backdoor_credentials.append(credential)
        target_machine.backdoor_tried = True
        target_machine.backdoor_installed = True
        target_machine.cve_vulns.append(vuln)
    else:
        logger.info(f"CVE-2015-1427 against target: {target_machine.ips[0]} was not successful")

    # Record the measured time of the last executed attempt
    EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                           emulation_env_config=s.emulation_env_config)
    target_machine.cve_2015_1427_tried = True
    attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
        s.attacker_obs_state.machines, [target_machine.copy()],
        emulation_env_config=s.emulation_env_config, action=a)
    # s_prime aliases s: the state is updated in place, not copied
    s_prime = s
    s_prime.attacker_obs_state.machines = attacker_machine_observations
    return s_prime
[docs] @staticmethod
def cve_2016_10033_helper(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState:
    """
    Helper for executing the CVE-2016-10033 exploit action

    For each ip of the action, the command a.cmds[0] (formatted with the ip) is executed over the SSH
    connection of the jump host, after which a login with the backdoor credentials is attempted through the
    jump host to decide whether the exploit worked. Every ip is attempted up to
    constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2016_10033 times and the first success ends the search.

    :param s: the current state (its attacker observation state is updated in place)
    :param a: the CVE-2016-10033 exploit action
    :return: s_prime (the same object as s, with the merged machine observations)
    """
    logger = Logger.__call__().get_logger()

    def backdoor_credential() -> Credential:
        # A new object per call so that the machine and the vulnerability do not share one instance
        return Credential(username=constants.CVE_2016_10033.BACKDOOR_USER,
                          pw=constants.CVE_2016_10033.BACKDOOR_PW,
                          port=constants.SSH.DEFAULT_PORT, protocol=TransportProtocol.TCP,
                          service=constants.SSH.SERVICE_NAME)

    # Extract target machine
    target_machine = EmulationAttackerMachineObservationState(ips=a.ips)
    total_time = 0.0
    exploit_successful = False
    for ip in target_machine.ips:
        try:
            # Called with (ip, s) like every other call site in this class; the state already carries the
            # emulation configuration
            jump_connection = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            # Format with the ip currently being attacked; formatting with the list a.ips would insert the
            # textual representation of the whole list into the command
            cmd = a.cmds[0].format(ip)
            proxy_conn = EmulationConnectionObservationState(
                conn=jump_connection.conn, credential=jump_connection.credential,
                root=True, port=constants.SSH.DEFAULT_PORT, service=constants.SSH.SERVICE_NAME,
                proxy=None, ip=s.emulation_env_config.containers_config.agent_ip)
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2016_10033):
                outdata, errdata, total_time = EmulationUtil.execute_ssh_cmd(
                    cmd=cmd, conn=jump_connection.conn)
                logger.info(f"CVE-2016-10033 cmd:{cmd}, out: {outdata.decode()}, "
                            f"err: {errdata.decode()}")
                # Parse Result
                exploit_successful = ExploitUtil.check_if_rce_exploit_succeeded(
                    user=constants.CVE_2016_10033.BACKDOOR_USER, pw=constants.CVE_2016_10033.BACKDOOR_PW,
                    source_ip=jump_connection.ip, port=constants.SSH.DEFAULT_PORT, target_ip=ip,
                    proxy_conn=proxy_conn)
                if exploit_successful:
                    break
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2016_10033_SLEEP_RETRY)
            if exploit_successful:
                break
        except Exception as e:
            # Best effort: a failure against one ip must not prevent trying the remaining ips
            logger.warning(f"There was an exception executing exploit CVE-2016-10033: {e}, {repr(e)}")

    if exploit_successful:
        logger.info(f"CVE-2016-10033 against {target_machine.ips[0]} was succcessful")
        credential = backdoor_credential()
        vuln = EmulationVulnerabilityObservationState(
            name=constants.EXPLOIT_VULNERABILITES.CVE_2016_10033,
            cvss=constants.EXPLOIT_VULNERABILITES.CVE_2016_10033_CVSS,
            credentials=[backdoor_credential()], port=constants.CVE_2016_10033.PORT,
            protocol=TransportProtocol.TCP, service=constants.CVE_2016_10033.SERVICE_NAME)
        target_machine.shell_access = True
        target_machine.untried_credentials = True
        target_machine.shell_access_credentials.append(credential)
        target_machine.backdoor_credentials.append(credential)
        target_machine.backdoor_tried = True
        target_machine.backdoor_installed = True
        target_machine.cve_vulns.append(vuln)
    else:
        logger.info(f"CVE-2016-10033 against {target_machine.ips[0]} was not succcessful")

    # Record the measured time of the last executed attempt
    EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                           emulation_env_config=s.emulation_env_config)
    target_machine.cve_2016_10033_tried = True
    attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
        s.attacker_obs_state.machines, [target_machine.copy()],
        emulation_env_config=s.emulation_env_config, action=a)
    # s_prime aliases s: the state is updated in place, not copied
    s_prime = s
    s_prime.attacker_obs_state.machines = attacker_machine_observations
    return s_prime
[docs] @staticmethod
def cve_2010_0426_helper(s: EmulationEnvState, a: EmulationAttackerAction,
                         machine: EmulationAttackerMachineObservationState,
                         result: EmulationAttackerMachineObservationState) \
        -> Tuple[EmulationAttackerMachineObservationState, float, bool, Credential, str]:
    """
    Helper for executing the CVE-2010-0426 privilege escalation action

    For every SSH connection of the machine and every ip of the action, an interactive shell is opened on
    the connection, the two commands of the action are executed in it and a login with the backdoor
    credentials is attempted through the jump host. The search stops at the first success, so the
    vulnerability, the credential and the root connection are recorded on the result exactly once.

    :param s: the current state
    :param a: the CVE-2010-0426 privilege escalation action
    :param machine: the machine observation whose SSH connections are used to run the exploit
    :param result: the machine observation to update with the outcome (modified in place)
    :return: new_machine_obs, cost, successful, root_credential (None if unsuccessful), service_name
    """
    logger = Logger.__call__().get_logger()
    cost = 0.0
    e_succ = False
    root_credential = None
    service = ""
    sleep_time = 0.5
    # Start with SSH connections
    for ssh_conn in machine.ssh_connections:
        for ip in a.ips:
            try:
                proxy_conn = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
                for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2010_0426):
                    # Setup interactive shell
                    # NOTE(review): the channel is never closed; whether it can be closed safely after
                    # the check depends on what the exploit commands leave running -- confirm
                    channel = ssh_conn.conn.invoke_shell()
                    # clear output
                    if channel.recv_ready():
                        channel.recv(constants.COMMON.DEFAULT_RECV_SIZE)
                    # Exploit
                    cmd = a.cmds[0].format(constants.CVE_2010_0426.EXPLOIT_FILE)
                    e_succ = False
                    try:
                        EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=channel)
                        time.sleep(sleep_time)
                        EmulationUtil.read_result_interactive_channel(
                            emulation_env_config=s.emulation_env_config, channel=channel)
                        cmd = a.cmds[1]
                        EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=channel)
                        time.sleep(sleep_time)
                        EmulationUtil.read_result_interactive_channel(
                            emulation_env_config=s.emulation_env_config, channel=channel)
                        e_succ = ExploitUtil.check_if_rce_exploit_succeeded(
                            user=constants.CVE_2010_0426.BACKDOOR_USER, pw=constants.CVE_2010_0426.BACKDOOR_PW,
                            source_ip=s.emulation_env_config.containers_config.agent_ip,
                            port=constants.SSH.DEFAULT_PORT,
                            target_ip=ip, proxy_conn=proxy_conn)
                    except Exception as e:
                        logger.warning("CVE-2010-0426 error:{}".format(str(e)))
                        e_succ = False
                    if e_succ:
                        logger.info("CVE-2010-0426 was succcessful")
                        root_credential = Credential(
                            username=constants.CVE_2010_0426.BACKDOOR_USER,
                            pw=constants.CVE_2010_0426.BACKDOOR_PW, port=constants.SSH.DEFAULT_PORT,
                            protocol=TransportProtocol.TCP, service=constants.SSH.SERVICE_NAME)
                        root_connection = EmulationUtil.setup_custom_connection(
                            user=constants.CVE_2010_0426.BACKDOOR_USER, pw=constants.CVE_2010_0426.BACKDOOR_PW,
                            source_ip=s.emulation_env_config.containers_config.agent_ip,
                            port=constants.SSH.DEFAULT_PORT,
                            target_ip=ip, proxy_conn=proxy_conn, root=True)
                        vuln = EmulationVulnerabilityObservationState(
                            name=constants.EXPLOIT_VULNERABILITES.CVE_2010_0426,
                            cvss=constants.EXPLOIT_VULNERABILITES.CVE_2010_0426_CVSS,
                            credentials=[root_credential], port=constants.SSH.DEFAULT_PORT,
                            protocol=TransportProtocol.TCP,
                            service=root_credential.service)
                        result.cve_vulns.append(vuln)
                        result.cve_2010_0426_tried = True
                        result.shell_access_credentials.append(root_credential)
                        result.backdoor_credentials.append(root_credential)
                        result.backdoor_tried = True
                        result.backdoor_installed = True
                        result.shell_access = True
                        result.logged_in = True
                        result.root = True
                        result.ssh_connections.append(root_connection)
                        service = constants.SSH.SERVICE_NAME
                        break
                    logger.info("CVE-2010-0426 against was not succcessful")
                    time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2010_0426_SLEEP_RETRY)
                if e_succ:
                    break
            except Exception as e:
                # Best effort: a failure against one ip must not prevent trying the remaining ips
                logger.warning(
                    f"Exception occurred during execution of exploit CVE-2010-0426, {str(e)}, {repr(e)}")
        if e_succ:
            # Stop after the first success: continuing with the next connection would run the exploit
            # again, record everything a second time and could overwrite the success flag
            break

    # Measure cost and alerts
    cost += float(sleep_time * 2)
    EmulationUtil.log_measured_action_time(total_time=cost, action=a, emulation_env_config=s.emulation_env_config)
    result.cve_2010_0426_tried = True
    return result, cost, e_succ, root_credential, service
[docs] @staticmethod
def cve_2015_5602_helper(s: EmulationEnvState, a: EmulationAttackerAction,
                         machine: EmulationAttackerMachineObservationState,
                         result: EmulationAttackerMachineObservationState) \
        -> Tuple[EmulationAttackerMachineObservationState, float, bool, Credential, str]:
    """
    Helper for executing the CVE-2015-5602 privilege escalation action

    For every SSH connection of the machine and every ip of the action, an interactive shell is opened on
    the connection, the first four commands of the action are executed in it and a login with the backdoor
    credentials is attempted through the jump host. The search stops at the first success, so the
    vulnerability, the credential and the root connection are recorded on the result exactly once.

    :param s: the current state
    :param a: the CVE-2015-5602 privilege escalation action
    :param machine: the machine observation whose SSH connections are used to run the exploit
    :param result: the machine observation to update with the outcome (modified in place)
    :return: new_machine_obs, cost, e_succ (flag), root_credential (None if unsuccessful), servicename
    """
    logger = Logger.__call__().get_logger()
    num_exploit_cmds = 4
    cost = 0.0
    e_succ = False
    root_credential = None
    service = ""
    sleep_time = 0.5
    # Start with SSH connections
    for ssh_conn in machine.ssh_connections:
        for ip in a.ips:
            proxy_conn = ConnectionUtil.find_jump_host_connection(ip=ip, s=s)
            for _ in range(constants.ENV_CONSTANTS.ATTACKER_RETRY_CVE_2015_5602):
                # Setup interactive shell
                channel = ssh_conn.conn.invoke_shell()
                # clear output
                if channel.recv_ready():
                    channel.recv(constants.COMMON.DEFAULT_RECV_SIZE)
                # Exploit: the first command is looked up outside the try so that an action without
                # commands is reported to the caller instead of being logged as a failed attempt
                cmd = a.cmds[0]
                try:
                    for next_cmd_idx in range(1, num_exploit_cmds + 1):
                        EmulationUtil.execute_cmd_interactive_channel(cmd=cmd, channel=channel)
                        time.sleep(sleep_time)
                        cost += float(sleep_time)
                        EmulationUtil.read_result_interactive_channel(
                            emulation_env_config=s.emulation_env_config, channel=channel)
                        if next_cmd_idx < num_exploit_cmds:
                            cmd = a.cmds[next_cmd_idx]
                    e_succ = ExploitUtil.check_if_rce_exploit_succeeded(
                        user=constants.CVE_2015_5602.BACKDOOR_USER, pw=constants.CVE_2015_5602.BACKDOOR_PW,
                        source_ip=s.emulation_env_config.containers_config.agent_ip,
                        port=constants.SSH.DEFAULT_PORT,
                        target_ip=ip, proxy_conn=proxy_conn)
                except Exception as e:
                    logger.warning(f"CVE-2015-5602 error:{str(e)}, {repr(e)}")
                    e_succ = False
                if e_succ:
                    logger.info("CVE-2015-5602 was succcessful")
                    root_credential = Credential(username=constants.CVE_2015_5602.BACKDOOR_USER,
                                                 pw=constants.CVE_2015_5602.BACKDOOR_PW,
                                                 port=constants.SSH.DEFAULT_PORT,
                                                 protocol=TransportProtocol.TCP,
                                                 service=constants.SSH.SERVICE_NAME)
                    root_connection = EmulationUtil.setup_custom_connection(
                        user=constants.CVE_2015_5602.BACKDOOR_USER, pw=constants.CVE_2015_5602.BACKDOOR_PW,
                        source_ip=s.emulation_env_config.containers_config.agent_ip,
                        port=constants.SSH.DEFAULT_PORT,
                        target_ip=ip, proxy_conn=proxy_conn, root=True)
                    vuln = EmulationVulnerabilityObservationState(
                        name=constants.EXPLOIT_VULNERABILITES.CVE_2015_5602,
                        cvss=constants.EXPLOIT_VULNERABILITES.CVE_2015_5602_CVSS,
                        credentials=[root_credential], port=constants.SSH.DEFAULT_PORT,
                        protocol=TransportProtocol.TCP, service=root_credential.service)
                    result.cve_vulns.append(vuln)
                    result.cve_2015_5602_tried = True
                    result.shell_access_credentials.append(root_credential)
                    result.backdoor_credentials.append(root_credential)
                    result.backdoor_tried = True
                    result.backdoor_installed = True
                    result.shell_access = True
                    result.logged_in = True
                    result.root = True
                    result.ssh_connections.append(root_connection)
                    service = constants.SSH.SERVICE_NAME
                    # Stop retrying: another attempt would record everything a second time and could
                    # overwrite the success flag
                    break
                time.sleep(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_5602_SLEEP_RETRY)
                cost += float(constants.ENV_CONSTANTS.ATTACKER_CVE_2015_5602_SLEEP_RETRY)
            if e_succ:
                break
        if e_succ:
            break

    # Measure cost and alerts
    # NOTE(review): every sleep above is already added to the cost, so this adds two further sleep
    # periods on top -- confirm whether that is intended
    cost += float(sleep_time * 2)
    EmulationUtil.log_measured_action_time(total_time=cost, action=a,
                                           emulation_env_config=s.emulation_env_config)
    result.cve_2015_5602_tried = True
    return result, cost, e_succ, root_credential, service