Source code for csle_common.dao.system_identification.gp_system_model

from typing import List, Dict, Any, Set
import math
from scipy.special import rel_entr
from csle_common.dao.system_identification.gp_conditional import GPConditional
from csle_common.dao.system_identification.system_model import SystemModel
from csle_common.dao.system_identification.system_model_type import SystemModelType
from csle_common.dao.encoding.np_encoder import NpEncoder


[docs]class GPSystemModel(SystemModel): """ A system model (list of conditional distributions) made up of Gaussian Processes """ def __init__(self, emulation_env_name: str, emulation_statistic_id: int, conditional_metric_distributions: List[List[GPConditional]], descr: str): """ Initializes the object :param emulation: the emulation that this system model is for :param emulation_statistic_id: the emulation statistic that this model was built from :param conditional_metric_distributions: the list of conditional distributions :param descr: description of the model """ super(GPSystemModel, self).__init__(descr=descr, model_type=SystemModelType.GAUSSIAN_PROCESS) self.conditional_metric_distributions = conditional_metric_distributions complete_sample_space: Set[int] = set() for conds in self.conditional_metric_distributions: for cond in conds: complete_sample_space = complete_sample_space.union(set(cond.sample_space)) for conds in self.conditional_metric_distributions: for cond in conds: cond.sample_space = list(complete_sample_space) cond.generate_distribution() self.emulation_env_name = emulation_env_name self.emulation_statistic_id = emulation_statistic_id self.id = -1 self.conditionals_kl_divergences: Dict[str, Dict[str, Dict[str, float]]] = {} self.compute_kl_divergences()
[docs] def compute_kl_divergences(self) -> None: """ Computes the KL-divergences betwen different conditional distributions :return: None """ for metric_distributions_condition_1 in self.conditional_metric_distributions: self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name] = {} for metric_distributions_condition_2 in self.conditional_metric_distributions: self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name] = {} for i, metric_dist in enumerate(metric_distributions_condition_1): self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name][metric_dist.metric_name] = float( round(sum(rel_entr(metric_dist.distribution, metric_distributions_condition_2[i].distribution)), 3)) if math.isinf( self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name][metric_dist.metric_name]): self.conditionals_kl_divergences[metric_distributions_condition_1[0].conditional_name][ metric_distributions_condition_2[0].conditional_name][ metric_dist.metric_name] = math.inf
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "GPSystemModel": """ Converts a dict representation of the DTO into an instance :param d: the dict to convert :return: the converted instance """ dto = GPSystemModel( conditional_metric_distributions=list(map( lambda x: list(map(lambda y: GPConditional.from_dict(y), x)), d["conditional_metric_distributions"])), emulation_env_name=d["emulation_env_name"], emulation_statistic_id=d["emulation_statistic_id"], descr=d["descr"] ) if "id" in d: dto.id = d["id"] return dto
[docs] def to_dict(self) -> Dict[str, Any]: """ :return: a dict representation of the DTO """ d: Dict[str, Any] = {} d["conditional_metric_distributions"] = list(map(lambda x: list(map(lambda y: y.to_dict(), x)), self.conditional_metric_distributions)) d["emulation_env_name"] = self.emulation_env_name d["emulation_statistic_id"] = self.emulation_statistic_id d["descr"] = self.descr d["id"] = self.id d["conditionals_kl_divergences"] = self.conditionals_kl_divergences d["model_type"] = self.model_type return d
def __str__(self): """ :return: a string representation of the DTO """ return f"conditional_distributions: {self.conditional_metric_distributions}, " \ f"emulation_env_name: {self.emulation_env_name}, " \ f"emulation_statistic_id: {self.emulation_statistic_id}," \ f"descr: {self.descr}, conditionals_kl_divergences: {self.conditionals_kl_divergences}, " \ f"model_type: {self.model_type}"
[docs] def to_json_str(self) -> str: """ Converts the DTO into a json string :return: the json string representation of the DTO """ import json json_str = json.dumps(self.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) return json_str
[docs] def to_json_file(self, json_file_path: str) -> None: """ Saves the DTO to a json file :param json_file_path: the json file path to save the DTO to :return: None """ import io json_str = self.to_json_str() with io.open(json_file_path, 'w', encoding='utf-8') as f: f.write(json_str)
[docs] @staticmethod def from_json_file(json_file_path: str) -> "GPSystemModel": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return GPSystemModel.from_dict(json.loads(json_str))
[docs] def copy(self) -> "GPSystemModel": """ :return: a copy of the DTO """ return self.from_dict(self.to_dict())