Source code for csle_common.dao.emulation_action.attacker.emulation_attacker_action

from typing import Optional, List, Tuple, Dict, Any
import time
import csle_common.constants.constants as constants
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_type import EmulationAttackerActionType
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_id import EmulationAttackerActionId
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_outcome import EmulationAttackerActionOutcome
from csle_common.util.general_util import GeneralUtil
from csle_base.json_serializable import JSONSerializable


[docs]class EmulationAttackerAction(JSONSerializable): """ Class representing an action of the attacker in the emulation """ def __init__(self, id: EmulationAttackerActionId, name: str, cmds: List[str], type: EmulationAttackerActionType, descr: str, ips: List[str], index: int, action_outcome: EmulationAttackerActionOutcome = EmulationAttackerActionOutcome.INFORMATION_GATHERING, vulnerability: Optional[str] = None, alt_cmds: Optional[List[str]] = None, backdoor: bool = False, execution_time: float = 0.0, ts: Optional[float] = None): """ Class constructor :param id: id of the action :param name: name of the action :param cmds: command-line commands to apply the action on the emulation :param type: type of the action :param descr: description of the action (documentation) :param ips: ips of the machines to apply the action to :param index: index of the machien to apply the action to :param action_outcome: type of the outcome of the action :param vulnerability: type of vulnerability that the action exploits (in case an exploit) :param alt_cmds: alternative command if the first command does not work :param backdoor: if the action also installs a backdoor (some exploits does this) :param execution_time: the time it took to run the action :param ts: the timestep the action was completed """ self.type = type self.id = id self.name = name self.cmds = cmds self.descr = descr self.index = index self.ips = ips self.vulnerability = vulnerability self.action_outcome = action_outcome self.backdoor = backdoor self.alt_cmds = alt_cmds if self.alt_cmds is None: self.alt_cmds = [] self.index = index self.backdoor = backdoor self.execution_time = round(execution_time, 3) self.ts = ts
[docs] def nmap_cmds(self, machine_ips: Optional[List[str]] = None) -> Tuple[List[str], List[str]]: """ Augments the original command of the action with extra flags for NMAP :param machine_ips: list of ips :return: the new command """ commands = [] file_names = [] for ip in self.ips: file_name = str(self.id) + "_" + str(self.index) + "_" + ip.replace("/", "_") file_name = file_name + ".xml " commands.append(self.cmds[0] + constants.NMAP.FILE_ARGS + " " + file_name + ip) file_names.append(file_name) if machine_ips is None: machine_ips = [] for ip in machine_ips: file_name = str(self.id) + "_" + str(self.index) + "_" + ip.replace("/", "_") file_name = file_name + ".xml " commands.append(self.cmds[0] + constants.NMAP.FILE_ARGS + " " + file_name + ip) file_names.append(file_name) return commands, file_names
[docs] def nikto_cmds(self) -> Tuple[List[str], List[str]]: """ Augments the original command of the action with extra flags for Nikto :return: the new command """ file_names = [] commands = [] for ip in self.ips: file_name = str(self.id) + "_" + str(self.index) + "_" + ip + ".xml " commands.append(self.cmds[0] + constants.NIKTO.HOST_ARG + ip + " " + constants.NIKTO.OUTPUT_ARG + file_name) file_names.append(file_name) return commands, file_names
[docs] def masscan_cmds(self) -> Tuple[List[str], List[str]]: """ Augments the original command of the action with extra flags for massscan :return: the new command """ commands = [] file_names = [] for ip in self.ips: file_name = str(self.id) + "_" + str(self.index) + "_" + ip + ".xml " if self.index == -1: file_name = str(self.id) + "_" + str(self.index) + ".xml " commands.append(self.cmds[0] + constants.MASSCAN.OUTPUT_ARG + " " + file_name + ip) file_names.append(file_name) return commands, file_names
def __str__(self): """ :return: a string representation of the object """ return "id:{},name:{},ips:{},index:{}".format(self.id, self.name, self.ips, self.index)
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "EmulationAttackerAction": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = EmulationAttackerAction( type=d["type"], id=d["id"], name=d["name"], cmds=d["cmds"], descr=d["descr"], index=d["index"], ips=d["ips"], vulnerability=d["vulnerability"], action_outcome=d["action_outcome"], backdoor=d["backdoor"], alt_cmds=d["alt_cmds"], execution_time=d["execution_time"], ts=d["ts"] ) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Any] = {} d["id"] = self.id d["name"] = self.name d["cmds"] = list(self.cmds) d["type"] = self.type d["descr"] = self.descr d["ips"] = list(self.ips) d["index"] = self.index d["action_outcome"] = self.action_outcome d["vulnerability"] = self.vulnerability d["alt_cmds"] = list(self.alt_cmds) if self.alt_cmds is not None else self.alt_cmds d["backdoor"] = self.backdoor d["execution_time"] = self.execution_time d["ts"] = self.ts return d
[docs] def ips_match(self, ips: List[str]) -> bool: """ Checks if a list of ips overlap with the ips of this host :param ips: the list of ips to check :return: True if they match, False otherwise """ for ip in self.ips: if ip in ips: return True return False
[docs] def to_kafka_record(self) -> str: """ Converts the instance into a kafka record format :param total_time: the total time of execution :return: the kafka record """ if self.vulnerability is None: vuln = "" else: vuln = self.vulnerability if self.alt_cmds is None: alt_cmds = [] else: alt_cmds = self.alt_cmds ts = time.time() if self.alt_cmds is None or len(self.alt_cmds) == 0: self.alt_cmds = ["-"] if self.cmds is None or len(self.cmds) == 0: self.cmds = ["-"] alt_cmds = list(map(lambda x: x.replace(",", "-"), alt_cmds)) vuln = vuln.replace(",", "-") cmds = list(map(lambda x: x.replace(",", "-"), self.cmds)) record = f"{ts},{self.id},{self.descr.replace(',', '')},{self.index},{self.name.replace(',', '-')}," \ f"{self.execution_time},{'_'.join(self.ips)},{'_'.join(cmds)},{self.type}," \ f"{self.action_outcome},{vuln}," \ f"{'_'.join(alt_cmds)},{self.backdoor}" return record
[docs] @staticmethod def from_kafka_record(record: str) -> "EmulationAttackerAction": """ Converts a kafka record into an instance :param record: the record to convert :return: the created instance """ parts = record.split(",") obj = EmulationAttackerAction(id=EmulationAttackerActionId(int(parts[1])), ts=float(parts[0]), descr=parts[2], index=int(parts[3]), name=parts[4], execution_time=float(parts[5]), ips=parts[6].split("_"), cmds=parts[7].split("_"), type=EmulationAttackerActionType(int(parts[8])), action_outcome=EmulationAttackerActionOutcome(int(parts[9])), vulnerability=parts[10], alt_cmds=parts[11].split("_"), backdoor=parts[12] == "True") return obj
[docs] @staticmethod def from_json_file(json_file_path: str) -> "EmulationAttackerAction": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return EmulationAttackerAction.from_dict(json.loads(json_str))
[docs] def copy(self) -> "EmulationAttackerAction": """ :return: a copy of the DTO """ return EmulationAttackerAction.from_dict(self.to_dict())
[docs] def create_execution_config(self, ip_first_octet: int) -> "EmulationAttackerAction": """ Creates a new config for an execution :param ip_first_octet: the first octet of the IP of the new execution :return: the new config """ config = self.copy() config.ips = list(map(lambda x: GeneralUtil.replace_first_octet_of_ip(ip=x, ip_first_octet=ip_first_octet), config.ips)) return config
[docs] def num_attributes(self) -> int: """ :return: The number of attributes of the DTO """ return 13
[docs] @staticmethod def schema() -> "EmulationAttackerAction": """ :return: get the schema of the DTO """ return EmulationAttackerAction(id=EmulationAttackerActionId.STOP, name="", cmds=[""], type=EmulationAttackerActionType.STOP, descr="", ips=[""], index=-1, action_outcome=EmulationAttackerActionOutcome.INFORMATION_GATHERING, vulnerability="", alt_cmds=[""], backdoor=False, execution_time=0.0, ts=0.0)