Source code for csle_agents.agents.c51_clean.c51_clean_agent

"""
MIT License

Copyright (c) 2019 CleanRL developers https://github.com/vwxyzjn/cleanrl
"""
from typing import List, Optional, Tuple, Any
import time
import torch
import torch.optim as optim
import gymnasium as gym
import os
import random
import numpy as np
from stable_baselines3.common.buffers import ReplayBuffer
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.training.experiment_config import ExperimentConfig
from gymnasium.wrappers.record_episode_statistics import RecordEpisodeStatistics
from csle_common.dao.training.experiment_execution import ExperimentExecution
from csle_common.models.q_network import QNetwork
from csle_common.dao.training.experiment_result import ExperimentResult
from csle_common.dao.training.agent_type import AgentType
from csle_common.util.experiment_util import ExperimentUtil
from csle_common.logging.log import Logger
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.dao.jobs.training_job_config import TrainingJobConfig
from csle_common.dao.training.dqn_policy import DQNPolicy
from csle_common.util.general_util import GeneralUtil
from csle_common.dao.simulation_config.base_env import BaseEnv
from csle_agents.agents.base.base_agent import BaseAgent
import csle_agents.constants.constants as agents_constants


class C51CleanAgent(BaseAgent):
    """
    A C51 agent based on the implementation from CleanRL
    """

    def __init__(self, simulation_env_config: SimulationEnvConfig,
                 emulation_env_config: Optional[EmulationEnvConfig],
                 experiment_config: ExperimentConfig, training_job: Optional[TrainingJobConfig] = None,
                 save_to_metastore: bool = True) -> None:
        """
        Initializes the agent

        :param simulation_env_config: the simulation environment configuration
        :param emulation_env_config: the emulation environment configuration
        :param experiment_config: the experiment configuration
        :param training_job: the training job configuration
        :param save_to_metastore: boolean flag indicating whether job information should be saved to the metastore
        """
        super(C51CleanAgent, self).__init__(simulation_env_config=simulation_env_config,
                                            emulation_env_config=emulation_env_config,
                                            experiment_config=experiment_config)
        assert experiment_config.agent_type == AgentType.C51_CLEAN
        self.training_job = training_job
        self.num_hl = self.experiment_config.hparams[constants.NEURAL_NETWORKS.NUM_HIDDEN_LAYERS].value
        self.num_hl_neur = self.experiment_config.hparams[constants.NEURAL_NETWORKS.NUM_NEURONS_PER_HIDDEN_LAYER].value
        self.config = self.simulation_env_config.simulation_env_input_config
        self.save_to_metastore = save_to_metastore
        self.exp_name: str = self.simulation_env_config.name
        self.env_id = self.simulation_env_config.gym_env_name
        self.torch_deterministic = True
        self.cuda: bool = True
        self.learning_rate = self.experiment_config.hparams[agents_constants.COMMON.LEARNING_RATE].value
        self.num_envs = self.experiment_config.hparams[agents_constants.COMMON.NUM_PARALLEL_ENVS].value
        self.total_timesteps = self.experiment_config.hparams[
            agents_constants.COMMON.NUM_TRAINING_TIMESTEPS].value
        self.batch_size = self.experiment_config.hparams[agents_constants.COMMON.BATCH_SIZE].value
        self.num_iterations = self.total_timesteps // self.batch_size
        self.gamma = self.experiment_config.hparams[agents_constants.COMMON.GAMMA].value
        self.tau = self.experiment_config.hparams[agents_constants.C51_CLEAN.TAU].value
        self.exploration_fraction = self.experiment_config.hparams[agents_constants.C51_CLEAN.EXP_FRAC].value
        self.learning_starts = self.experiment_config.hparams[agents_constants.C51_CLEAN.LEARNING_STARTS].value
        self.train_frequency = self.experiment_config.hparams[agents_constants.C51_CLEAN.TRAIN_FREQ].value
        self.target_network_frequency = self.experiment_config.hparams[agents_constants.C51_CLEAN.T_N_FREQ].value
        self.buffer_size = self.experiment_config.hparams[agents_constants.C51_CLEAN.BUFFER_SIZE].value
        self.save_model = self.experiment_config.hparams[agents_constants.C51_CLEAN.SAVE_MODEL].value
        self.device = self.experiment_config.hparams[constants.NEURAL_NETWORKS.DEVICE].value
        self.start_exploration_rate = (
            self.experiment_config.hparams[agents_constants.C51_CLEAN.START_EXPLORATION_RATE].value)
        self.end_exploration_rate = (
            self.experiment_config.hparams[agents_constants.C51_CLEAN.END_EXPLORATION_RATE].value)
        self.n_atoms = self.experiment_config.hparams[agents_constants.C51_CLEAN.N_ATOMS].value
        self.v_min = self.experiment_config.hparams[agents_constants.C51_CLEAN.V_MIN].value
        self.v_max = self.experiment_config.hparams[agents_constants.C51_CLEAN.V_MAX].value
        self.orig_env: BaseEnv = gym.make(self.simulation_env_config.gym_env_name, config=self.config)
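    # Usage sketch (illustrative only; the simulation_env_config and experiment_config objects
    # are assumed to be available, with experiment_config.agent_type == AgentType.C51_CLEAN and
    # hparams covering the keys listed in hparam_names() below):
    #
    #     agent = C51CleanAgent(simulation_env_config=simulation_env_config,
    #                           emulation_env_config=None,
    #                           experiment_config=experiment_config,
    #                           save_to_metastore=False)
    #     experiment_execution = agent.train()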
    def linear_schedule(self, duration: int, t: int) -> float:
        """
        Linear exploration rate schedule

        :param duration: the number of time steps over which the exploration rate is annealed
        :param t: the current time step of the training
        :return: the updated exploration rate
        """
        slope = (self.end_exploration_rate - self.start_exploration_rate) / duration
        return float(max(slope * t + self.start_exploration_rate, self.end_exploration_rate))
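    # Note on the schedule above: epsilon decays linearly from start_exploration_rate to
    # end_exploration_rate over `duration` steps and then stays at end_exploration_rate.
    # Illustrative values only (the real values come from the experiment config): with
    # start_exploration_rate=1.0, end_exploration_rate=0.05 and duration=500, the schedule
    # yields epsilon=1.0 at t=0, epsilon=0.525 at t=250, and epsilon=0.05 for all t >= 500.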
    def make_env(self):
        """
        Helper function for creating an environment during training of the agent

        :return: the environment-creating function
        """

        def thunk():
            """
            The environment-creating function
            """
            orig_env: BaseEnv = gym.make(self.simulation_env_config.gym_env_name, config=self.config)
            env = RecordEpisodeStatistics(orig_env)
            return env

        return thunk
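    # Usage note: run_c51 below consumes this factory through a vectorized environment,
    # e.g. envs = gym.vector.SyncVectorEnv([self.make_env() for _ in range(self.num_envs)]),
    # so the same creation logic (gym.make + RecordEpisodeStatistics) is reused per sub-environment.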
    def train(self) -> ExperimentExecution:
        """
        Implements the training logic of the C51 algorithm

        :return: the experiment execution with the training results
        """
        pid = os.getpid()

        # Setup experiment metrics
        exp_result = ExperimentResult()
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_TIME_HORIZON)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_UPPER_BOUND_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RANDOM_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_HEURISTIC_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.RUNTIME)
        descr = f"Training of policies with C51 using " \
                f"simulation:{self.simulation_env_config.name}"

        # Setup training job
        if self.training_job is None:
            emulation_name = ""
            if self.emulation_env_config is not None:
                emulation_name = self.emulation_env_config.name
            self.training_job = TrainingJobConfig(
                simulation_env_name=self.simulation_env_config.name, experiment_config=self.experiment_config,
                progress_percentage=0, pid=pid, experiment_result=exp_result,
                emulation_env_name=emulation_name, simulation_traces=[],
                num_cached_traces=agents_constants.COMMON.NUM_CACHED_SIMULATION_TRACES,
                log_file_path=Logger.__call__().get_log_file_path(), descr=descr,
                physical_host_ip=GeneralUtil.get_host_ip())
            training_job_id = -1
            if self.save_to_metastore:
                training_job_id = MetastoreFacade.save_training_job(training_job=self.training_job)
            self.training_job.id = training_job_id
        else:
            self.training_job.pid = pid
            self.training_job.progress_percentage = 0
            self.training_job.experiment_result = exp_result
            if self.save_to_metastore:
                MetastoreFacade.update_training_job(training_job=self.training_job, id=self.training_job.id)

        # Setup experiment execution
        ts = time.time()
        simulation_name = self.simulation_env_config.name
        emulation_name = ""
        if self.emulation_env_config is not None:
            emulation_name = self.emulation_env_config.name
        self.exp_execution = ExperimentExecution(
            result=exp_result, config=self.experiment_config, timestamp=ts,
            emulation_name=emulation_name, simulation_name=simulation_name,
            descr=descr, log_file_path=self.training_job.log_file_path)
        exp_execution_id = -1
        if self.save_to_metastore:
            exp_execution_id = MetastoreFacade.save_experiment_execution(self.exp_execution)
        self.exp_execution.id = exp_execution_id

        # Training runs, one per seed
        for seed in self.experiment_config.random_seeds:
            assert self.num_envs == 1, "vectorized envs are not supported at the moment"

            # Train
            exp_result, env, model = self.run_c51(exp_result=exp_result, seed=seed)

            # Save policy
            ts = time.time()
            save_path = f"{self.experiment_config.output_dir}/C51_policy_seed_{seed}_{ts}.zip"
            model.save(save_path)
            action_space = \
                self.simulation_env_config.joint_action_space_config.action_spaces[
                    self.experiment_config.player_idx].actions
            policy = DQNPolicy(model=model, simulation_name=self.simulation_env_config.name, save_path=save_path,
                               player_type=self.experiment_config.player_type,
                               states=self.simulation_env_config.state_space_config.states,
                               actions=action_space, experiment_config=self.experiment_config,
                               avg_R=exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN][-1])
            if self.save_to_metastore:
                MetastoreFacade.save_dqn_policy(dqn_policy=policy)
            os.chmod(save_path, 0o777)

            # Save trace
            traces = env.get_traces()
            if len(traces) > 0 and self.save_to_metastore:
                MetastoreFacade.save_simulation_trace(traces[-1])
            env.reset_traces()

        # Calculate average and std metrics
        exp_result.avg_metrics = {}
        exp_result.std_metrics = {}
        for metric in exp_result.all_metrics[self.experiment_config.random_seeds[0]].keys():
            value_vectors = []
            for seed in self.experiment_config.random_seeds:
                value_vectors.append(exp_result.all_metrics[seed][metric])
            avg_metrics = []
            std_metrics = []
            for i in range(len(value_vectors[0])):
                seed_values = []
                for seed_idx in range(len(self.experiment_config.random_seeds)):
                    seed_values.append(value_vectors[seed_idx][i])
                avg_metrics.append(ExperimentUtil.mean_confidence_interval(
                    data=seed_values,
                    confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[0])
                std_metrics.append(ExperimentUtil.mean_confidence_interval(
                    data=seed_values,
                    confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[1])
            exp_result.avg_metrics[metric] = avg_metrics
            exp_result.std_metrics[metric] = std_metrics

        return self.exp_execution
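    # Note on the metric aggregation above: ExperimentUtil.mean_confidence_interval is assumed to
    # return a pair whose first element is the mean across seeds and whose second element is the
    # confidence-interval term; the first is stored in avg_metrics and the second in std_metrics.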
    def run_c51(self, exp_result: ExperimentResult, seed: int) -> Tuple[ExperimentResult, BaseEnv, QNetwork]:
        """
        Runs C51 with the given seed

        :param exp_result: the object to save the experiment results
        :param seed: the random seed
        :return: the updated experiment results, the environment, and the trained model
        """
        Logger.__call__().get_logger().info(f"[CleanC51] Start training; seed: {seed}")
        envs = gym.vector.SyncVectorEnv([self.make_env() for _ in range(self.num_envs)])
        assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported"

        # Setup training metrics
        exp_result.all_metrics[seed] = {}
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_UPPER_BOUND_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RANDOM_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_HEURISTIC_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.RUNTIME] = []
        ExperimentUtil.set_seed(seed)
        cuda = False

        # Create neural network
        device = torch.device(agents_constants.C51_CLEAN.CUDA if torch.cuda.is_available() and cuda
                              else self.experiment_config.hparams[constants.NEURAL_NETWORKS.DEVICE].value)
        input_dim = np.array(envs.single_observation_space.shape).prod()
        q_network = QNetwork(input_dim, num_hidden_layers=self.num_hl,
                             hidden_layer_dim=self.num_hl_neur, agent_type=self.experiment_config.agent_type,
                             n_atoms=self.n_atoms, action_space_dim=envs.single_action_space.n).to(device)
        optimizer = optim.Adam(q_network.parameters(), lr=self.learning_rate, eps=0.01 / self.batch_size)
        target_network = QNetwork(
            input_dim, num_hidden_layers=self.num_hl, hidden_layer_dim=self.num_hl_neur,
            agent_type=self.experiment_config.agent_type, n_atoms=self.n_atoms,
            action_space_dim=envs.single_action_space.n).to(device)

        # Seeding
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.backends.cudnn.deterministic = self.torch_deterministic
        target_network.load_state_dict(q_network.state_dict())
        rb = ReplayBuffer(self.buffer_size, envs.single_observation_space, envs.single_action_space, device,
                          handle_timeout_termination=False)
        start_time = time.time()
        obs, _ = envs.reset(seed=seed)
        R: List[Any] = []
        T = []
        i = 0
        for global_step in range(self.total_timesteps):
            i += 1
            epsilon = self.linear_schedule(duration=self.exploration_fraction * self.total_timesteps, t=global_step)
            if random.random() < epsilon:
                actions = np.array([envs.single_action_space.sample() for _ in range(envs.num_envs)])
            else:
                actions_tensor, pmf = q_network.get_action(torch.Tensor(obs).to(device))
                actions = actions_tensor.cpu().numpy()
            next_obs, rewards, terminations, truncations, infos = envs.step(actions)
            if "final_info" in infos:
                R_sum = 0
                T_sum = 0
                for info in infos["final_info"]:
                    R_sum += info["R"]
                    T_sum += info["T"]
                R.append(R_sum)
                T.append(T_sum)
            real_next_obs = next_obs.copy()
            for idx, trunc in enumerate(truncations):
                if trunc:
                    real_next_obs[idx] = infos["final_observation"][idx]
            rb.add(obs, real_next_obs, actions, rewards, terminations, infos)  # type: ignore
            obs = next_obs
            if global_step > self.learning_starts:
                if global_step % self.train_frequency == 0:
                    data = rb.sample(self.batch_size)
                    with torch.no_grad():
                        _, next_pmfs = target_network.get_action(data.next_observations.to(dtype=torch.float32))
                        next_atoms = data.rewards + self.gamma * target_network.atoms * (1 - data.dones)

                        # projection
                        delta_z = target_network.atoms[1] - target_network.atoms[0]
                        tz = next_atoms.clamp(self.v_min, self.v_max)
                        b = (tz - self.v_min) / delta_z
                        l = b.floor().clamp(0, self.n_atoms - 1)
                        u = b.ceil().clamp(0, self.n_atoms - 1)
                        d_m_l = (u + (l == u).float() - b) * next_pmfs
                        d_m_u = (b - l) * next_pmfs
                        target_pmfs = torch.zeros_like(next_pmfs)
                        for i in range(target_pmfs.size(0)):
                            target_pmfs[i].index_add_(0, l[i].long(), d_m_l[i])
                            target_pmfs[i].index_add_(0, u[i].long(), d_m_u[i])
                    _, old_pmfs = q_network.get_action(data.observations.to(dtype=torch.float32))
                    loss = (-(target_pmfs * old_pmfs.clamp(min=1e-5, max=1 - 1e-5).log()).sum(-1)).mean()

                    # optimize the model
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                # update target network
                if global_step % self.target_network_frequency == 0:
                    target_network.load_state_dict(q_network.state_dict())

            # Logging
            if global_step > 10 and global_step % self.experiment_config.log_every == 0:
                time_elapsed_minutes = round((time.time() - start_time) / 60, 3)
                exp_result.all_metrics[seed][agents_constants.COMMON.RUNTIME].append(time_elapsed_minutes)
                avg_R = round(float(np.mean(R)), 3)
                exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN].append(round(avg_R, 3))
                avg_T = round(float(np.mean(T)), 3)
                exp_result.all_metrics[seed][
                    agents_constants.COMMON.AVERAGE_TIME_HORIZON].append(round(avg_T, 3))
                running_avg_J = ExperimentUtil.running_average(
                    exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN],
                    self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
                running_avg_T = ExperimentUtil.running_average(
                    exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON],
                    self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
                exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN].append(
                    round(running_avg_J, 3))
                Logger.__call__().get_logger().info(
                    f"[C51Clean] Iteration: {global_step}/{self.total_timesteps}, "
                    f"avg R: {avg_R}, "
                    f"R_avg_{self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}:"
                    f"{running_avg_J}, Avg T:{round(avg_T, 3)}, "
                    f"Running_avg_{self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}_T: "
                    f"{round(running_avg_T, 3)}, "
                    f"runtime: {time_elapsed_minutes} min")

        envs.close()
        base_env: BaseEnv = envs.envs[0].env.env.env  # type: ignore
        return exp_result, base_env, q_network
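    # Background on the projection step in run_c51 (Bellemare et al., 2017, "A Distributional
    # Perspective on Reinforcement Learning"): the distributional Bellman target moves each atom
    # z_j of the fixed support to Tz_j = clip(r + gamma * z_j, v_min, v_max), and because Tz_j
    # generally falls between support atoms, its probability mass next_pmfs[j] is split between
    # the two nearest atoms. With b = (Tz_j - v_min) / delta_z, l = floor(b) and u = ceil(b),
    # atom l receives mass proportional to (u - b) and atom u mass proportional to (b - l),
    # which is what the index_add_ calls accumulate into target_pmfs. The loss is then the
    # cross-entropy between target_pmfs and the online network's predicted distribution.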
    def hparam_names(self) -> List[str]:
        """
        Gets the hyperparameters

        :return: a list with the hyperparameter names
        """
        return [constants.NEURAL_NETWORKS.NUM_NEURONS_PER_HIDDEN_LAYER,
                constants.NEURAL_NETWORKS.NUM_HIDDEN_LAYERS,
                agents_constants.COMMON.LEARNING_RATE, agents_constants.COMMON.BATCH_SIZE,
                agents_constants.COMMON.GAMMA, agents_constants.COMMON.NUM_TRAINING_TIMESTEPS,
                agents_constants.COMMON.EVAL_EVERY, agents_constants.COMMON.EVAL_BATCH_SIZE,
                constants.NEURAL_NETWORKS.DEVICE,
                agents_constants.COMMON.SAVE_EVERY, agents_constants.C51_CLEAN.LEARNING_STARTS,
                agents_constants.C51_CLEAN.BUFFER_SIZE, agents_constants.C51_CLEAN.N_ATOMS,
                agents_constants.C51_CLEAN.V_MIN, agents_constants.C51_CLEAN.V_MAX,
                agents_constants.COMMON.L, agents_constants.C51_CLEAN.NORM_ADV,
                agents_constants.C51_CLEAN.START_EXPLORATION_RATE,
                agents_constants.C51_CLEAN.END_EXPLORATION_RATE]