# Source code for csle_common.dao.emulation_action.attacker.emulation_attacker_action_config

from typing import Optional, List, Dict, Any
import gymnasium as gym
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_id import EmulationAttackerActionId
from csle_common.dao.emulation_action.attacker.emulation_attacker_nmap_actions import EmulationAttackerNMAPActions
from csle_common.dao.emulation_action.attacker.emulation_attacker_nikto_actions import EmulationAttackerNIKTOActions
from csle_common.dao.emulation_action.attacker.emulation_attacker_masscan_actions \
    import EmulationAttackerMasscanActions
from csle_common.dao.emulation_action.attacker.emulation_attacker_network_service_actions \
    import EmulationAttackerNetworkServiceActions
from csle_common.dao.emulation_action.attacker.emulation_attacker_shell_actions import EmulationAttackerShellActions
from csle_common.dao.emulation_action.attacker.emulation_attacker_stopping_actions \
    import EmulationAttackerStoppingActions
from csle_base.json_serializable import JSONSerializable


class EmulationAttackerActionConfig(JSONSerializable):
    """
    Configuration of the action space for the attacker in the emulation
    """

    def __init__(self, num_indices: int, actions: Optional[List[EmulationAttackerAction]] = None,
                 nmap_action_ids: Optional[List[EmulationAttackerActionId]] = None,
                 network_service_action_ids: Optional[List[EmulationAttackerActionId]] = None,
                 shell_action_ids: Optional[List[EmulationAttackerActionId]] = None,
                 nikto_action_ids: Optional[List[EmulationAttackerActionId]] = None,
                 masscan_action_ids: Optional[List[EmulationAttackerActionId]] = None,
                 stopping_action_ids: Optional[List[EmulationAttackerActionId]] = None):
        """
        Class constructor

        :param num_indices: max num machine indexes allowed
        :param actions: list of actions in the action space
        :param nmap_action_ids: list of ids of the actions that are NMAP actions
        :param network_service_action_ids: list of ids of the actions that are network service actions
        :param shell_action_ids: list of ids of the actions that are shell actions
        :param nikto_action_ids: list of ids of the actions that are Nikto actions
        :param masscan_action_ids: list of ids of the actions that are Masscan actions
        :param stopping_action_ids: List of ids of the actions that are actions related to optimal stopping
        :raises ValueError: if actions or any of the id lists is None, or if the id of an action does not
                            occur in the concatenation of the id lists
        """
        if actions is None:
            raise ValueError("self.num_actions can not be defined")
        self.num_actions = len(actions)
        self.actions = actions
        self.num_indices = num_indices
        self.action_space = gym.spaces.Discrete(self.num_actions)

        # Two lookup tables keyed by (action id, action index); both are populated identically
        self.action_lookup_d = {}
        self.action_lookup_d_val = {}
        for action in actions:
            self.action_lookup_d[(action.id, action.index)] = action
            self.action_lookup_d_val[(action.id, action.index)] = action

        self.nmap_action_ids = nmap_action_ids
        self.network_service_action_ids = network_service_action_ids
        self.shell_action_ids = shell_action_ids
        self.nikto_action_ids = nikto_action_ids
        self.masscan_action_ids = masscan_action_ids
        self.stopping_action_ids = stopping_action_ids
        if self.nmap_action_ids is None or self.network_service_action_ids is None or \
                self.shell_action_ids is None or self.nikto_action_ids is None or \
                self.masscan_action_ids is None or self.stopping_action_ids is None:
            raise ValueError("At least one of the actions is None")

        # The position of an id in this concatenated list is the second component of the converter keys
        self.action_ids = (self.nmap_action_ids + self.network_service_action_ids + self.shell_action_ids
                           + self.nikto_action_ids + self.masscan_action_ids + self.stopping_action_ids)
        self.num_node_specific_actions = len(self.action_ids)
        self.m_action_space = gym.spaces.Discrete(self.num_node_specific_actions)

        # Resolve the position of every action id once instead of calling list.index (a linear scan)
        # inside each of the passes below. setdefault keeps the first occurrence of a repeated id,
        # which is the position list.index would return.
        position_of_id = {}
        for position, action_id in enumerate(self.action_ids):
            position_of_id.setdefault(action_id, position)
        id_positions = []
        for a in self.actions:
            if a.id in position_of_id:
                id_positions.append(position_of_id[a.id])
            else:
                # Unknown id: defer to list.index so that the raised ValueError is unchanged
                id_positions.append(self.action_ids.index(a.id))

        self.ar_action_converter = {}

        # Pass 1: register every action under (num_indices, id position); when several actions share
        # an id, the last one in self.actions wins
        for j, position in enumerate(id_positions):
            self.ar_action_converter[(num_indices, position)] = j

        # Pass 2: subnet actions (index == num_indices) override the entries written in pass 1
        subnet_entries = [(j, position) for j, (a, position) in enumerate(zip(self.actions, id_positions))
                          if a.index == self.num_indices]
        for j, position in subnet_entries:
            self.ar_action_converter[(num_indices, position)] = j

        # Pass 3: subnet actions are additionally registered under every machine index
        for i in range(num_indices):
            for j, position in subnet_entries:
                self.ar_action_converter[(i, position)] = j

        # Pass 4: all remaining actions are registered under their own index
        for j, (a, position) in enumerate(zip(self.actions, id_positions)):
            if a.index != self.num_indices:
                self.ar_action_converter[(a.index, position)] = j
[docs] def print_actions(self) -> None: """ Utility function for printing the list of actions :return: None """ print("Attacker Actions:") for i, action in enumerate(self.actions): tag = "-" if not action.index == -1: if action.index is not None: tag = str(action.index) else: tag = "*" print(str(i) + ":" + action.name + "[" + tag + "]")
[docs] @staticmethod def dict_brute_same_user_ids(): """ :return: list of brute-force attack ids for teh attacker """ return [ EmulationAttackerActionId.TELNET_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.SSH_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.FTP_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.CASSANDRA_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.IRC_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.MONGO_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.MYSQL_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.SMTP_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.POSTGRES_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.TELNET_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.SSH_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.FTP_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.CASSANDRA_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.IRC_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.MONGO_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.MYSQL_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.SMTP_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.POSTGRES_SAME_USER_PASS_DICTIONARY_ALL ]
[docs] def get_continue_action_idx(self) -> int: """ :return: the id of the continue-action """ for i in range(len(self.actions)): if self.actions[i].id == EmulationAttackerActionId.CONTINUE: return i raise ValueError("No Continue Action in the action space")
[docs] def get_action_by_id(self, action_id: EmulationAttackerActionId) -> EmulationAttackerAction: """ Gets the action of a given id :param action_id: the action id :return: the action of the id """ for a in self.actions: if a.id == action_id: return a raise ValueError("action id not found: {}".format(action_id))
[docs] @staticmethod def all_actions_config(num_nodes: int, subnet_masks: List[str], hacker_ip: str) -> "EmulationAttackerActionConfig": """ Gets the default action config for a given environment, which includes all actions :param num_nodes: the number of nodes in the environment :param subnet_masks: the subnet masks of the environment :param hacker_ip: the hacker ip :return: """ attacker_actions = [] # Host actions for idx in range(num_nodes): attacker_actions.append(EmulationAttackerNMAPActions.TCP_SYN_STEALTH_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.PING_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.UDP_PORT_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.TCP_CON_NON_STEALTH_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.TCP_FIN_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.TCP_NULL_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.TCP_XMAS_TREE_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.OS_DETECTION_SCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.NMAP_VULNERS(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.TELNET_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.SSH_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.FTP_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.CASSANDRA_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.IRC_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.MONGO_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.MYSQL_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.SMTP_SAME_USER_PASS_DICTIONARY(index=idx)) 
attacker_actions.append(EmulationAttackerNMAPActions.POSTGRES_SAME_USER_PASS_DICTIONARY(index=idx)) attacker_actions.append(EmulationAttackerNIKTOActions.NIKTO_WEB_HOST_SCAN(index=idx)) attacker_actions.append( EmulationAttackerMasscanActions.MASSCAN_HOST_SCAN(index=idx, host_ip=hacker_ip)) attacker_actions.append(EmulationAttackerNMAPActions.FIREWALK(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.HTTP_ENUM(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.HTTP_GREP(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.VULSCAN(index=idx)) attacker_actions.append(EmulationAttackerNMAPActions.FINGER(index=idx)) attacker_actions.append(EmulationAttackerShellActions.SAMBACRY_EXPLOIT(index=idx)) attacker_actions.append(EmulationAttackerShellActions.SHELLSHOCK_EXPLOIT(index=idx)) attacker_actions.append(EmulationAttackerShellActions.DVWA_SQL_INJECTION(index=idx)) attacker_actions.append(EmulationAttackerShellActions.CVE_2015_3306_EXPLOIT(index=idx)) attacker_actions.append(EmulationAttackerShellActions.CVE_2015_1427_EXPLOIT(index=idx)) attacker_actions.append(EmulationAttackerShellActions.CVE_2016_10033_EXPLOIT(index=idx)) attacker_actions.append(EmulationAttackerShellActions.CVE_2010_0426_PRIV_ESC(index=idx)) attacker_actions.append(EmulationAttackerShellActions.CVE_2015_5602_PRIV_ESC(index=idx)) # Subnet actions attacker_actions.append( EmulationAttackerNMAPActions.TCP_SYN_STEALTH_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.PING_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.UDP_PORT_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append( EmulationAttackerNMAPActions.TCP_CON_NON_STEALTH_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.TCP_FIN_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.TCP_NULL_SCAN(index=-1, ips=subnet_masks)) 
attacker_actions.append(EmulationAttackerNMAPActions.TCP_XMAS_TREE_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.OS_DETECTION_SCAN(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.NMAP_VULNERS(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.TELNET_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.SSH_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.FTP_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.CASSANDRA_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.IRC_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append( EmulationAttackerNMAPActions.MONGO_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append( EmulationAttackerNMAPActions.MYSQL_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append( EmulationAttackerNMAPActions.SMTP_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append( EmulationAttackerNMAPActions.POSTGRES_SAME_USER_PASS_DICTIONARY(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerShellActions.FIND_FLAG(index=-1)) attacker_actions.append(EmulationAttackerNetworkServiceActions.SERVICE_LOGIN(index=-1)) attacker_actions.append(EmulationAttackerMasscanActions.MASSCAN_HOST_SCAN(index=-1, host_ip=hacker_ip, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.FIREWALK(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.HTTP_ENUM(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.HTTP_GREP(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerNMAPActions.VULSCAN(index=-1, ips=subnet_masks)) 
attacker_actions.append(EmulationAttackerNMAPActions.FINGER(index=-1, ips=subnet_masks)) attacker_actions.append(EmulationAttackerShellActions.INSTALL_TOOLS(index=-1)) attacker_actions.append(EmulationAttackerShellActions.SSH_BACKDOOR(index=-1)) attacker_actions.append(EmulationAttackerStoppingActions.STOP(index=-1)) attacker_actions.append(EmulationAttackerStoppingActions.CONTINUE(index=-1)) attacker_actions = sorted(attacker_actions, key=lambda x: (x.id, x.index)) nmap_action_ids = [ EmulationAttackerActionId.TCP_SYN_STEALTH_SCAN_HOST, EmulationAttackerActionId.TCP_SYN_STEALTH_SCAN_ALL, EmulationAttackerActionId.PING_SCAN_HOST, EmulationAttackerActionId.PING_SCAN_ALL, EmulationAttackerActionId.UDP_PORT_SCAN_HOST, EmulationAttackerActionId.UDP_PORT_SCAN_ALL, EmulationAttackerActionId.TCP_CON_NON_STEALTH_SCAN_HOST, EmulationAttackerActionId.TCP_CON_NON_STEALTH_SCAN_ALL, EmulationAttackerActionId.TCP_FIN_SCAN_HOST, EmulationAttackerActionId.TCP_FIN_SCAN_ALL, EmulationAttackerActionId.TCP_NULL_SCAN_HOST, EmulationAttackerActionId.TCP_NULL_SCAN_ALL, EmulationAttackerActionId.TCP_XMAS_TREE_SCAN_HOST, EmulationAttackerActionId.TCP_XMAS_TREE_SCAN_ALL, EmulationAttackerActionId.OS_DETECTION_SCAN_HOST, EmulationAttackerActionId.OS_DETECTION_SCAN_ALL, EmulationAttackerActionId.NMAP_VULNERS_HOST, EmulationAttackerActionId.NMAP_VULNERS_ALL, EmulationAttackerActionId.TELNET_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.TELNET_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.SSH_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.SSH_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.FTP_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.FTP_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.CASSANDRA_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.CASSANDRA_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.IRC_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.IRC_SAME_USER_PASS_DICTIONARY_ALL, 
EmulationAttackerActionId.MONGO_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.MONGO_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.MYSQL_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.MYSQL_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.SMTP_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.SMTP_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.POSTGRES_SAME_USER_PASS_DICTIONARY_HOST, EmulationAttackerActionId.POSTGRES_SAME_USER_PASS_DICTIONARY_ALL, EmulationAttackerActionId.FIREWALK_HOST, EmulationAttackerActionId.FIREWALK_ALL, EmulationAttackerActionId.HTTP_ENUM_HOST, EmulationAttackerActionId.HTTP_ENUM_ALL, EmulationAttackerActionId.HTTP_GREP_HOST, EmulationAttackerActionId.HTTP_GREP_ALL, EmulationAttackerActionId.VULSCAN_HOST, EmulationAttackerActionId.VULSCAN_ALL, EmulationAttackerActionId.FINGER_HOST, EmulationAttackerActionId.FINGER_ALL ] network_service_action_ids = [EmulationAttackerActionId.NETWORK_SERVICE_LOGIN] shell_action_ids = [EmulationAttackerActionId.FIND_FLAG, EmulationAttackerActionId.INSTALL_TOOLS, EmulationAttackerActionId.SSH_BACKDOOR, EmulationAttackerActionId.SAMBACRY_EXPLOIT, EmulationAttackerActionId.SHELLSHOCK_EXPLOIT, EmulationAttackerActionId.DVWA_SQL_INJECTION, EmulationAttackerActionId.CVE_2015_3306_EXPLOIT, EmulationAttackerActionId.CVE_2015_1427_EXPLOIT, EmulationAttackerActionId.CVE_2016_10033_EXPLOIT, EmulationAttackerActionId.CVE_2010_0426_PRIV_ESC, EmulationAttackerActionId.CVE_2015_5602_PRIV_ESC ] nikto_action_ids = [EmulationAttackerActionId.NIKTO_WEB_HOST_SCAN] masscan_action_ids = [EmulationAttackerActionId.MASSCAN_HOST_SCAN, EmulationAttackerActionId.MASSCAN_ALL_SCAN] stopping_action_ids = [EmulationAttackerActionId.STOP, EmulationAttackerActionId.CONTINUE] attacker_action_config = EmulationAttackerActionConfig(num_indices=num_nodes + 1, actions=attacker_actions, nmap_action_ids=nmap_action_ids, network_service_action_ids=network_service_action_ids, 
shell_action_ids=shell_action_ids, nikto_action_ids=nikto_action_ids, masscan_action_ids=masscan_action_ids, stopping_action_ids=stopping_action_ids) return attacker_action_config
def __str__(self) -> str: """ :return: a string representation of the object """ return f"num_indices: {self.num_indices}, actions: {self.actions}, nmap_action_ids: {self.nmap_action_ids}, " \ f"network_service_action_ids: {self.network_service_action_ids}, " \ f"shell_action_ids: {self.shell_action_ids}, nikto_action_ids: {self.nikto_action_ids}, " \ f"masscan_action_ids: {self.masscan_action_ids}, stopping_action_ids: {self.stopping_action_ids}"
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Any] = {} d["num_indices"] = self.num_indices d["actions"] = self.actions d["nmap_action_ids"] = self.nmap_action_ids d["network_service_action_ids"] = self.network_service_action_ids d["shell_action_ids"] = self.shell_action_ids d["nikto_action_ids"] = self.nikto_action_ids d["masscan_action_ids"] = self.masscan_action_ids d["stopping_action_ids"] = self.stopping_action_ids return d
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "EmulationAttackerActionConfig": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = EmulationAttackerActionConfig( num_indices=d["num_indices"], actions=d["actions"], nmap_action_ids=d["nmap_action_ids"], network_service_action_ids=d["network_service_action_ids"], shell_action_ids=d["shell_action_ids"], nikto_action_ids=d["nikto_action_ids"], masscan_action_ids=d["masscan_action_ids"], stopping_action_ids=d["stopping_action_ids"] ) return obj
[docs] @staticmethod def from_json_file(json_file_path: str) -> "EmulationAttackerActionConfig": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return EmulationAttackerActionConfig.from_dict(json.loads(json_str))