Source code for csle_attacker.emulation.emulated_attacker

from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_type import EmulationAttackerActionType
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_id import EmulationAttackerActionId
from csle_attacker.emulation.recon_middleware import ReconMiddleware
from csle_attacker.emulation.exploit_middleware import ExploitMiddleware
from csle_attacker.emulation.attacker_stopping_middleware import AttackerStoppingMiddleware
from csle_attacker.emulation.post_exploit_middleware import PostExploitMiddleware
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction


[docs]class EmulatedAttacker: """ Represents an emulated attacker agent """
[docs] @staticmethod def attacker_transition(s: EmulationEnvState, attacker_action: EmulationAttackerAction) -> EmulationEnvState: """ Implements the transition operator T: (s,a) -> s' :param s: the current state :param attacker_action: the attacker action :param emulation_env_config: the emulation environment configuration :return: s' """ if attacker_action.type == EmulationAttackerActionType.RECON: EnvDynamicsUtil.cache_attacker_action(a=attacker_action, s=s) return EmulatedAttacker.attacker_recon_action(s=s, a=attacker_action) elif attacker_action.type == EmulationAttackerActionType.EXPLOIT \ or attacker_action.type == EmulationAttackerActionType.PRIVILEGE_ESCALATION: if attacker_action.index == -1: EnvDynamicsUtil.cache_attacker_action(a=attacker_action, s=s) return EmulatedAttacker.attacker_exploit_action(s=s, a=attacker_action) elif attacker_action.type == EmulationAttackerActionType.POST_EXPLOIT: return EmulatedAttacker.attacker_post_exploit_action(s=s, a=attacker_action) elif attacker_action.type == EmulationAttackerActionType.STOP \ or attacker_action.type == EmulationAttackerActionType.CONTINUE: return EmulatedAttacker.attacker_stopping_action(s=s, a=attacker_action) else: raise ValueError("Action type not recognized")
[docs] @staticmethod def attacker_recon_action(s: EmulationEnvState, a: EmulationAttackerAction) \ -> EmulationEnvState: """ Implements the transition of a reconnaissance action :param s: the current state :param a: the action :return: s' """ if a.id == EmulationAttackerActionId.TCP_SYN_STEALTH_SCAN_HOST \ or a.id == EmulationAttackerActionId.TCP_SYN_STEALTH_SCAN_ALL: return ReconMiddleware.execute_tcp_syn_stealth_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.PING_SCAN_HOST or a.id == EmulationAttackerActionId.PING_SCAN_ALL: return ReconMiddleware.execute_ping_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.UDP_PORT_SCAN_HOST \ or a.id == EmulationAttackerActionId.UDP_PORT_SCAN_ALL: return ReconMiddleware.execute_udp_port_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.TCP_CON_NON_STEALTH_SCAN_HOST \ or a.id == EmulationAttackerActionId.TCP_CON_NON_STEALTH_SCAN_ALL: return ReconMiddleware.execute_tcp_con_stealth_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.TCP_FIN_SCAN_HOST or a.id == EmulationAttackerActionId.TCP_FIN_SCAN_ALL: return ReconMiddleware.execute_tcp_fin_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.TCP_NULL_SCAN_HOST \ or a.id == EmulationAttackerActionId.TCP_NULL_SCAN_ALL: return ReconMiddleware.execute_tcp_null_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.TCP_XMAS_TREE_SCAN_HOST \ or a.id == EmulationAttackerActionId.TCP_XMAS_TREE_SCAN_ALL: return ReconMiddleware.execute_tcp_xmas_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.OS_DETECTION_SCAN_HOST \ or a.id == EmulationAttackerActionId.OS_DETECTION_SCAN_ALL: return ReconMiddleware.execute_os_detection_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.VULSCAN_HOST \ or a.id == EmulationAttackerActionId.VULSCAN_ALL: return ReconMiddleware.execute_vulscan(s=s, a=a) elif a.id == EmulationAttackerActionId.NMAP_VULNERS_HOST \ or a.id == EmulationAttackerActionId.NMAP_VULNERS_ALL: return ReconMiddleware.execute_nmap_vulners(s=s, a=a) elif a.id == EmulationAttackerActionId.NIKTO_WEB_HOST_SCAN: return ReconMiddleware.execute_nikto_web_host_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.MASSCAN_HOST_SCAN or a.id == EmulationAttackerActionId.MASSCAN_ALL_SCAN: return ReconMiddleware.execute_masscan_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.FIREWALK_HOST \ or a.id == EmulationAttackerActionId.FIREWALK_ALL: return ReconMiddleware.execute_firewalk_scan(s=s, a=a) elif a.id == EmulationAttackerActionId.HTTP_ENUM_HOST \ or a.id == EmulationAttackerActionId.HTTP_ENUM_ALL: return ReconMiddleware.execute_http_enum(s=s, a=a) elif a.id == EmulationAttackerActionId.HTTP_GREP_HOST \ or a.id == EmulationAttackerActionId.HTTP_GREP_ALL: return ReconMiddleware.execute_http_grep(s=s, a=a) elif a.id == EmulationAttackerActionId.FINGER_HOST \ or a.id == EmulationAttackerActionId.FINGER_ALL: return ReconMiddleware.execute_finger(s=s, a=a) else: raise ValueError("Recon action id:{},name:{} not recognized".format(a.id, a.name))
[docs] @staticmethod def attacker_exploit_action(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState: """ Implements transition of an exploit action :param s: the current state :param a: the action :return: s' """ if a.id == EmulationAttackerActionId.TELNET_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.TELNET_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_telnet_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.SSH_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.SSH_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_ssh_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.FTP_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.FTP_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_ftp_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.CASSANDRA_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.CASSANDRA_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_cassandra_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.IRC_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.IRC_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_irc_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.MONGO_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.MONGO_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_mongo_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.MYSQL_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.MYSQL_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_mysql_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.SMTP_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.SMTP_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_smtp_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.POSTGRES_SAME_USER_PASS_DICTIONARY_HOST \ or a.id == EmulationAttackerActionId.POSTGRES_SAME_USER_PASS_DICTIONARY_ALL: return ExploitMiddleware.execute_postgres_same_user_dictionary(s=s, a=a) elif a.id == EmulationAttackerActionId.SAMBACRY_EXPLOIT: return ExploitMiddleware.execute_sambacry(s=s, a=a) elif a.id == EmulationAttackerActionId.SHELLSHOCK_EXPLOIT: return ExploitMiddleware.execute_shellshock(s=s, a=a) elif a.id == EmulationAttackerActionId.DVWA_SQL_INJECTION: return ExploitMiddleware.execute_dvwa_sql_injection(s=s, a=a) elif a.id == EmulationAttackerActionId.CVE_2015_3306_EXPLOIT: return ExploitMiddleware.execute_cve_2015_3306_exploit(s=s, a=a) elif a.id == EmulationAttackerActionId.CVE_2015_1427_EXPLOIT: return ExploitMiddleware.execute_cve_2015_1427_exploit(s=s, a=a) elif a.id == EmulationAttackerActionId.CVE_2016_10033_EXPLOIT: return ExploitMiddleware.execute_cve_2016_10033_exploit(s=s, a=a) elif a.id == EmulationAttackerActionId.CVE_2010_0426_PRIV_ESC: return ExploitMiddleware.execute_cve_2010_0426_exploit(s=s, a=a) elif a.id == EmulationAttackerActionId.CVE_2015_5602_PRIV_ESC: return ExploitMiddleware.execute_cve_2015_5602_exploit(s=s, a=a) else: raise ValueError("Exploit action id:{},name:{} not recognized".format(a.id, a.name))
[docs] @staticmethod def attacker_post_exploit_action(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState: """ Implements the transition of a post-exploit action :param s: the current state :param a: the action :return: s' """ if a.id == EmulationAttackerActionId.NETWORK_SERVICE_LOGIN: return PostExploitMiddleware.execute_service_login(s=s, a=a) if a.id == EmulationAttackerActionId.FIND_FLAG: return PostExploitMiddleware.execute_bash_find_flag(s=s, a=a) if a.id == EmulationAttackerActionId.INSTALL_TOOLS: return PostExploitMiddleware.execute_install_tools(s=s, a=a) if a.id == EmulationAttackerActionId.SSH_BACKDOOR: return PostExploitMiddleware.execute_ssh_backdoor(s=s, a=a) else: raise ValueError("Post-expoit action id:{},name:{} not recognized".format(a.id, a.name))
[docs] @staticmethod def attacker_stopping_action(s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState: """ Implements transition of a stopping action of the attacker :param s: the current state :param a: the action :return: s' """ if a.id == EmulationAttackerActionId.STOP: return AttackerStoppingMiddleware.stop_intrusion(s=s, a=a) elif a.id == EmulationAttackerActionId.CONTINUE: return AttackerStoppingMiddleware.continue_intrusion(s=s, a=a) else: raise ValueError("Stopping action id:{},name:{} not recognized".format(a.id, a.name))