Source code for csle_common.dao.emulation_action.defender.emulation_defender_action

from typing import Optional, List, Dict, Any
import time
from csle_common.dao.emulation_action.defender.emulation_defender_action_type import EmulationDefenderActionType
from csle_common.dao.emulation_action.defender.emulation_defender_action_id import EmulationDefenderActionId
from csle_common.dao.emulation_action.defender.emulation_defender_action_outcome import EmulationDefenderActionOutcome
from csle_base.json_serializable import JSONSerializable


[docs]class EmulationDefenderAction(JSONSerializable): """ Class representing an action of the defender in the environment """ def __init__(self, id: EmulationDefenderActionId, name: str, cmds: List[str], type: EmulationDefenderActionType, descr: str, ips: List[str], index: int, action_outcome: EmulationDefenderActionOutcome = EmulationDefenderActionOutcome.GAME_END, alt_cmds: Optional[List[str]] = None, execution_time: float = 0.0, ts: float = 0.0): """ Class constructor :param id: id of the action :param name: name of the action :param cmds: command-line commands to apply the action on the emulation :param type: type of the action :param descr: description of the action (documentation) :param ips: ip of the machine to apply the action to :param index: index of the machine to apply the action to :param action_outcome: type of the outcome of the action :param alt_cmds: alternative command if the first command does not work :param execution_time: the time it took to run the action :param ts: the timestamp the action was run """ self.id = id self.name = name self.cmds = cmds self.type = type self.descr = descr self.ips = ips self.action_outcome = action_outcome self.alt_cmds = alt_cmds if self.alt_cmds is None: self.alt_cmds = [] self.index = index self.ts = ts self.execution_time = round(execution_time, 3) def __str__(self) -> str: """ :return: a string representation of the object """ return "id:{},name:{},ips:{},index:{}".format(self.id, self.name, self.ips, self.index)
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "EmulationDefenderAction": """ Converts a dict representation to an instance :param d: the dict to convert :return: the instance """ obj = EmulationDefenderAction( id=d["id"], name=d["name"], cmds=d["cmds"], type=d["type"], descr=d["descr"], ips=d["ips"], index=d["index"], action_outcome=d["action_outcome"], alt_cmds=d["alt_cmds"], ts=d["ts"], execution_time=d["execution_time"]) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ :return: a dicr representation of the object """ d: Dict[str, Any] = {} d["id"] = self.id d["name"] = self.name d["cmds"] = list(self.cmds) d["type"] = self.type d["descr"] = self.descr d["ips"] = list(self.ips) d["index"] = self.index d["action_outcome"] = self.action_outcome d["alt_cmds"] = list(self.alt_cmds) if self.alt_cmds is not None else [] d["execution_time"] = self.execution_time d["ts"] = self.ts return d
[docs] def ips_match(self, ips: List[str]) -> bool: """ Checks if a list of ips overlap with the ips of this host :param ips: the list of ips to check :return: True if they match, False otherwise """ for ip in self.ips: if ip in ips: return True return False
[docs] def to_kafka_record(self) -> str: """ Converts the instance into a kafka record format :param total_time: the total time of execution :return: the kafka record """ ts = time.time() record = f"{ts},{self.id},{self.descr},{self.index},{self.name}," \ f"{self.execution_time},{'_'.join(self.ips)},{'_'.join(self.cmds)},{self.type}," \ f"{self.action_outcome},{'_'.join(self.alt_cmds) if self.alt_cmds is not None else []}" return record
[docs] @staticmethod def from_kafka_record(record: str) -> "EmulationDefenderAction": """ Converts a kafka record into an instance :param record: the record to convert :return: the created instance """ parts = record.split(",") obj = EmulationDefenderAction(id=EmulationDefenderActionId(int(parts[1])), ts=float(parts[0]), descr=parts[2], index=int(parts[3]), name=parts[4], execution_time=float(parts[5]), ips=parts[6].split("_"), cmds=parts[7].split("_"), type=EmulationDefenderActionType(int(parts[8])), action_outcome=EmulationDefenderActionOutcome(int(parts[9])), alt_cmds=parts[10].split("_")) return obj
[docs] @staticmethod def from_json_file(json_file_path: str) -> "EmulationDefenderAction": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return EmulationDefenderAction.from_dict(json.loads(json_str))
[docs] def copy(self) -> "EmulationDefenderAction": """ :return: a copy of the DTO """ return EmulationDefenderAction.from_dict(self.to_dict())
[docs] def num_attributes(self) -> int: """ :return: The number of attributes of the DTO """ return 11
[docs] @staticmethod def schema() -> "EmulationDefenderAction": """ :return: get the schema of the DTO """ return EmulationDefenderAction(id=EmulationDefenderActionId.STOP, name="", cmds=[""], type=EmulationDefenderActionType.STOP, descr="", ips=[""], index=-1, action_outcome=EmulationDefenderActionOutcome.GAME_END, alt_cmds=[""], execution_time=0.0, ts=0.0)