Source code for csle_common.dao.emulation_config.ossec_ids_manager_config

from typing import Dict, Any, Union
from csle_base.json_serializable import JSONSerializable


[docs]class OSSECIDSManagerConfig(JSONSerializable): """ Represents the configuration of the OSSEC IDS managers in a CSLE emulation """ def __init__(self, ossec_ids_manager_log_file: str, ossec_ids_manager_log_dir: str, ossec_ids_manager_max_workers: int, time_step_len_seconds: int = 15, ossec_ids_manager_port: int = 50047, version: str = "0.0.1") -> None: """ Initializes the DTO :param time_step_len_seconds: the length of a time-step (period for logging) :param version: the version :param ossec_ids_manager_port: the GRPC port of the OSSEC IDS manager :param ossec_ids_manager_log_file: Log file of the OSSEC IDS manager :param ossec_ids_manager_log_dir: Log dir of the OSSEC IDS manager :param ossec_ids_manager_max_workers: Max GRPC workers of the OSSEC IDS manager """ self.time_step_len_seconds = time_step_len_seconds self.version = version self.ossec_ids_manager_port = ossec_ids_manager_port self.ossec_ids_manager_log_file = ossec_ids_manager_log_file self.ossec_ids_manager_log_dir = ossec_ids_manager_log_dir self.ossec_ids_manager_max_workers = ossec_ids_manager_max_workers
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "OSSECIDSManagerConfig": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = OSSECIDSManagerConfig(time_step_len_seconds=d["time_step_len_seconds"], version=d["version"], ossec_ids_manager_port=d["ossec_ids_manager_port"], ossec_ids_manager_log_dir=d["ossec_ids_manager_log_dir"], ossec_ids_manager_log_file=d["ossec_ids_manager_log_file"], ossec_ids_manager_max_workers=d["ossec_ids_manager_max_workers"]) return obj
[docs] def to_dict(self) -> Dict[str, Union[str, int]]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Union[str, int]] = {} d["ossec_ids_manager_port"] = self.ossec_ids_manager_port d["time_step_len_seconds"] = self.time_step_len_seconds d["version"] = self.version d["ossec_ids_manager_log_file"] = self.ossec_ids_manager_log_file d["ossec_ids_manager_log_dir"] = self.ossec_ids_manager_log_dir d["ossec_ids_manager_max_workers"] = self.ossec_ids_manager_max_workers return d
def __str__(self) -> str: """ :return: a string representation of the object """ return f"ossec_ids_manager_port: {self.ossec_ids_manager_port}, " \ f"time_step_len_seconds: {self.time_step_len_seconds}," \ f" version: {self.version}, ossec_ids_manager_log_file: {self.ossec_ids_manager_log_file}, " \ f"ossec_ids_manager_log_dir: {self.ossec_ids_manager_log_dir}, " \ f"ossec_ids_manager_max_workers: {self.ossec_ids_manager_max_workers}"
[docs] @staticmethod def from_json_file(json_file_path: str) -> "OSSECIDSManagerConfig": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return OSSECIDSManagerConfig.from_dict(json.loads(json_str))
[docs] def copy(self) -> "OSSECIDSManagerConfig": """ :return: a copy of the DTO """ return OSSECIDSManagerConfig.from_dict(self.to_dict())
[docs] def create_execution_config(self, ip_first_octet: int) -> "OSSECIDSManagerConfig": """ Creates a new config for an execution :param ip_first_octet: the first octet of the IP of the new execution :return: the new config """ config = self.copy() return config
[docs] @staticmethod def schema() -> "OSSECIDSManagerConfig": """ :return: get the schema of the DTO """ return OSSECIDSManagerConfig(ossec_ids_manager_log_file="ossec_ids_manager.log", ossec_ids_manager_log_dir="/", ossec_ids_manager_max_workers=10)