Source code for csle_common.dao.system_identification.empirical_system_model

from typing import List, Dict, Any
import math
from scipy.special import rel_entr
from csle_common.dao.system_identification.empirical_conditional import EmpiricalConditional
from csle_common.dao.system_identification.system_model import SystemModel
from csle_common.dao.system_identification.system_model_type import SystemModelType


[docs]class EmpiricalSystemModel(SystemModel): """ A system model (list of conditional distributions) made up of empirical distributions """ def __init__(self, emulation_env_name: str, emulation_statistic_id: int, conditional_metric_distributions: List[List[EmpiricalConditional]], descr: str): """ Initializes the object :param emulation: the emulation that this system model is for :param emulation_statistic_id: the emulation statistic that this model was built from :param conditional_metric_distributions: the list of conditional distributions :param descr: description of the model """ super(EmpiricalSystemModel, self).__init__(descr=descr, model_type=SystemModelType.EMPIRICAL_DISTRIBUTION) self.conditional_metric_distributions = conditional_metric_distributions self.emulation_env_name = emulation_env_name self.emulation_statistic_id = emulation_statistic_id self.id = -1 self.conditionals_kl_divergences: Dict[str, Dict[str, Dict[str, float]]] = {} self.compute_kl_divergences()
[docs] def compute_kl_divergences(self) -> None: """ Computes the KL-divergences betwen different conditional distributions :return: None """ for metric_distributions_condition_1 in self.conditional_metric_distributions: self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name] = {} for metric_distributions_condition_2 in self.conditional_metric_distributions: self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name] = {} for i, metric_dist in enumerate(metric_distributions_condition_1): self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name][metric_dist.metric_name] = float( round(sum(rel_entr( metric_dist.probabilities, metric_distributions_condition_2[i].probabilities)), 3)) if math.isinf( self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name][metric_dist.metric_name]): self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name][metric_dist.metric_name] = math.inf
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "EmpiricalSystemModel": """ Converts a dict representation of the DTO into an instance :param d: the dict to convert :return: the converted instance """ dto = EmpiricalSystemModel( conditional_metric_distributions=list(map( lambda x: list(map(lambda y: EmpiricalConditional.from_dict(y), x)), d["conditional_metric_distributions"])), emulation_env_name=d["emulation_env_name"], emulation_statistic_id=d["emulation_statistic_id"], descr=d["descr"] ) if "id" in d: dto.id = d["id"] return dto
[docs] def to_dict(self) -> Dict[str, Any]: """ :return: a dict representation of the DTO """ d: Dict[str, Any] = {} d["conditional_metric_distributions"] = list(map(lambda x: list(map(lambda y: y.to_dict(), x)), self.conditional_metric_distributions)) d["emulation_env_name"] = self.emulation_env_name d["emulation_statistic_id"] = self.emulation_statistic_id d["descr"] = self.descr d["id"] = self.id d["conditionals_kl_divergences"] = self.conditionals_kl_divergences d["model_type"] = self.model_type return d
def __str__(self): """ :return: a string representation of the DTO """ return f"conditional_distributions: {self.conditional_metric_distributions}, " \ f"emulation_env_name: {self.emulation_env_name}, " \ f"emulation_statistic_id: {self.emulation_statistic_id}," \ f"descr: {self.descr}, conditionals_kl_divergences: {self.conditionals_kl_divergences}, " \ f"model_type: {self.model_type}"
[docs] def copy(self) -> "EmpiricalSystemModel": """ :return: a copy of the DTO """ return self.from_dict(self.to_dict())
[docs] @staticmethod def from_json_file(json_file_path: str) -> "EmpiricalSystemModel": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return EmpiricalSystemModel.from_dict(json.loads(json_str))