Source code for csle_system_identification.gp.gp_regression_algorithm

from typing import List, Optional
import os
import torch
import gpytorch
from csle_system_identification.base.base_system_identification_algorithm import BaseSystemIdentificationAlgorithm
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.system_identification.emulation_statistics import EmulationStatistics
from csle_common.dao.system_identification.system_identification_config import SystemIdentificationConfig
from csle_common.dao.system_identification.gp_system_model import GPSystemModel
from csle_common.dao.system_identification.gp_conditional import GPConditional
from csle_common.dao.jobs.system_identification_job_config import SystemIdentificationJobConfig
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.logging.log import Logger
from csle_common.util.general_util import GeneralUtil
import csle_system_identification.constants.constants as system_identification_constants
from csle_common.dao.system_identification.gp_regression_model_with_gauissan_noise import \
    GPRegressionModelWithGaussianNoise
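

# Illustrative sketch (an assumption, not the csle_common implementation): GPRegressionModelWithGaussianNoise
# is expected to be an exact GP regression model in the style of gpytorch's ExactGP, with a constant mean and
# a scaled RBF kernel, so that the attributes `covar_module.base_kernel.lengthscale` and `likelihood.noise`
# accessed in fit() below are defined. A minimal example of such a model:
class ExampleGPRegressionModel(gpytorch.models.ExactGP):

    def __init__(self, train_x: torch.Tensor, train_y: torch.Tensor,
                 likelihood: gpytorch.likelihoods.GaussianLikelihood) -> None:
        super().__init__(train_x, train_y, likelihood)
        # Constant prior mean and an RBF kernel wrapped in a scale kernel (output scale + lengthscale)
        self.mean_module = gpytorch.means.ConstantMean()
        self.covar_module = gpytorch.kernels.ScaleKernel(gpytorch.kernels.RBFKernel())

    def forward(self, x: torch.Tensor) -> gpytorch.distributions.MultivariateNormal:
        # The GP prior at the inputs x: a multivariate normal defined by the mean and covariance modules
        mean_x = self.mean_module(x)
        covar_x = self.covar_module(x)
        return gpytorch.distributions.MultivariateNormal(mean_x, covar_x)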


class GPRegressionAlgorithm(BaseSystemIdentificationAlgorithm):
    """
    Class that implements the system identification procedure using Gaussian process regression
    """

    def __init__(self, emulation_env_config: EmulationEnvConfig, emulation_statistics: EmulationStatistics,
                 system_identification_config: SystemIdentificationConfig,
                 system_identification_job: Optional[SystemIdentificationJobConfig] = None):
        """
        Initializes the algorithm

        :param emulation_env_config: the configuration of the emulation environment
        :param emulation_statistics: the statistics to fit
        :param system_identification_config: configuration of the GP regression algorithm
        :param system_identification_job: system identification job config (optional)
        """
        super(GPRegressionAlgorithm, self).__init__(
            emulation_env_config=emulation_env_config, emulation_statistics=emulation_statistics,
            system_identification_config=system_identification_config
        )
        self.system_identification_job = system_identification_job
    def fit(self) -> GPSystemModel:
        """
        Fits a Gaussian process for each conditional and metric using the GP regression algorithm

        :return: the fitted model
        """
        if self.emulation_env_config is None:
            raise ValueError("Emulation config cannot be None")

        # Setup system identification job
        pid = os.getpid()
        descr = f"System identification through Gaussian Process Regression, " \
                f"emulation:{self.emulation_env_config.name}, statistic id: {self.emulation_statistics.id}"
        if self.system_identification_job is None:
            self.system_identification_job = SystemIdentificationJobConfig(
                emulation_env_name=self.emulation_env_config.name,
                emulation_statistics_id=self.emulation_statistics.id, pid=pid, progress_percentage=0,
                log_file_path=Logger.__call__().get_log_file_path(), descr=descr, system_model=None,
                system_identification_config=self.system_identification_config,
                physical_host_ip=GeneralUtil.get_host_ip())
            system_identification_job_id = MetastoreFacade.save_system_identification_job(
                system_identification_job=self.system_identification_job)
            self.system_identification_job.id = system_identification_job_id
        else:
            self.system_identification_job.pid = pid
            self.system_identification_job.progress_percentage = 0
            self.system_identification_job.system_model = None
            MetastoreFacade.update_system_identification_job(
                system_identification_job=self.system_identification_job, id=self.system_identification_job.id)

        # Run the GP regression algorithm for each conditional and metric
        conditionals = self.system_identification_config.hparams[
            system_identification_constants.SYSTEM_IDENTIFICATION.CONDITIONAL_DISTRIBUTIONS].value
        metrics = self.system_identification_config.hparams[
            system_identification_constants.SYSTEM_IDENTIFICATION.METRICS].value
        Logger.__call__().get_logger().info(f"Starting execution of the Gaussian process regression algorithm. "
                                            f"Emulation env name: {self.emulation_env_config.name}, "
                                            f"emulation_statistic_id: {self.emulation_statistics.id}, "
                                            f"conditionals: {conditionals}, metrics: {metrics}")
        gp_conditionals = []
        max_val = 0
        for i, conditional in enumerate(conditionals):
            for j, metric in enumerate(metrics):
                counts = self.emulation_statistics.conditionals_counts[conditional][metric]
                for val, count in counts.items():
                    if val > max_val:
                        max_val = val
        sample_space = list(range(0, max_val))
        self.emulation_statistics.compute_descriptive_statistics_and_distributions()
        for i, conditional in enumerate(conditionals):
            gp_conditionals_metrics = []
            for j, metric in enumerate(metrics):
                observed_x = []
                observed_y = []
                for val, prob in self.emulation_statistics.conditionals_probs[conditional][metric].items():
                    observed_x.append(val)
                    observed_y.append(prob)
                observed_x_tensor = torch.tensor(observed_x)
                observed_y_tensor = torch.tensor(observed_y)

                # Initialize likelihood and model; the Gaussian likelihood assumes that the observed data points
                # have zero-mean Gaussian noise
                likelihood = gpytorch.likelihoods.GaussianLikelihood()
                model = GPRegressionModelWithGaussianNoise(observed_x_tensor, observed_y_tensor, likelihood)

                # Get into train mode
                model.train()
                likelihood.train()

                # Setup the optimizer; model.parameters() includes the GaussianLikelihood parameters
                lr = self.system_identification_config.hparams[
                    system_identification_constants.GAUSSIAN_PROCESS_REGRESSION.LEARNING_RATE].value
                optimizer = torch.optim.Adam(model.parameters(), lr=lr)

                # "Loss" for GPs - the marginal log likelihood
                mll = gpytorch.mlls.ExactMarginalLogLikelihood(likelihood, model)
                training_iter = self.system_identification_config.hparams[
                    system_identification_constants.GAUSSIAN_PROCESS_REGRESSION.TRAINING_ITERATIONS].value

                # Find optimal model hyperparameters by minimizing the negative marginal likelihood loss
                # through gradient descent.
                for iteration in range(training_iter):
                    # Zero gradients from previous iteration
                    optimizer.zero_grad()
                    # Output from model
                    output = model(observed_x_tensor)
                    # Calculate loss and backpropagate gradients
                    loss = -mll(output, observed_y_tensor)
                    loss.backward()
                    Logger.__call__().get_logger().info(
                        f"[GP-Regression] iter:{iteration + 1}/{training_iter}, loss:{loss.item()}, "
                        f"learned lengthscale param: {model.covar_module.base_kernel.lengthscale.item()}, "
                        f"learned likelihood noise: {model.likelihood.noise.item()}")
                    # Gradient descent step
                    optimizer.step()

                gp_conditionals_metrics.append(GPConditional(
                    conditional_name=conditional, metric_name=metric, sample_space=sample_space,
                    observed_x=observed_x, observed_y=observed_y,
                    scale_parameter=model.covar_module.base_kernel.lengthscale.item(),
                    noise_parameter=model.likelihood.noise.item()))
            gp_conditionals.append(gp_conditionals_metrics)

        model_descr = f"Model fitted through GP regression, " \
                      f"emulation:{self.emulation_env_config.name}, statistic id: {self.emulation_statistics.id}"
        model = GPSystemModel(
            emulation_env_name=self.emulation_env_config.name, emulation_statistic_id=self.emulation_statistics.id,
            conditional_metric_distributions=gp_conditionals, descr=model_descr)
        self.system_identification_job.system_model = model
        self.system_identification_job.progress_percentage = 100
        MetastoreFacade.update_system_identification_job(
            system_identification_job=self.system_identification_job, id=self.system_identification_job.id)
        Logger.__call__().get_logger().info(f"Execution of the Gaussian process regression algorithm complete. "
                                            f"Emulation env name: {self.emulation_env_config.name}, "
                                            f"emulation_statistic_id: {self.emulation_statistics.id}, "
                                            f"conditionals: {conditionals}, metrics: {metrics}")
        return model
    def hparam_names(self) -> List[str]:
        """
        :return: the names of the necessary hyperparameters
        """
        return [system_identification_constants.SYSTEM_IDENTIFICATION.CONDITIONAL_DISTRIBUTIONS,
                system_identification_constants.SYSTEM_IDENTIFICATION.METRICS,
                system_identification_constants.GAUSSIAN_PROCESS_REGRESSION.LEARNING_RATE,
                system_identification_constants.GAUSSIAN_PROCESS_REGRESSION.TRAINING_ITERATIONS]
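

# Illustrative sketch (an assumption, not part of the module): fit() looks up the hyperparameters returned
# by hparam_names() in system_identification_config.hparams, where each entry exposes a .value attribute.
# Conceptually, the expected values have the following shape (the concrete conditional and metric names
# below are placeholders):
example_hparam_values = {
    system_identification_constants.SYSTEM_IDENTIFICATION.CONDITIONAL_DISTRIBUTIONS: ["no_intrusion"],
    system_identification_constants.SYSTEM_IDENTIFICATION.METRICS: ["alerts_weighted_by_priority"],
    system_identification_constants.GAUSSIAN_PROCESS_REGRESSION.LEARNING_RATE: 0.1,
    system_identification_constants.GAUSSIAN_PROCESS_REGRESSION.TRAINING_ITERATIONS: 100
}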