Source code for csle_agents.agents.dqn_clean.dqn_clean_agent

"""
MIT License

Copyright (c) 2019 CleanRL developers https://github.com/vwxyzjn/cleanrl
"""
from typing import List, Optional, Tuple, Any, Callable
import time
import torch
import torch.optim as optim
import gymnasium as gym
import os
import random
import torch.nn.functional as F
import numpy as np
from stable_baselines3.common.buffers import ReplayBuffer
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.training.experiment_config import ExperimentConfig
from gymnasium.wrappers.record_episode_statistics import RecordEpisodeStatistics
from csle_common.dao.training.experiment_execution import ExperimentExecution
from csle_common.models.q_network import QNetwork
from csle_common.dao.training.experiment_result import ExperimentResult
from csle_common.dao.training.agent_type import AgentType
from csle_common.util.experiment_util import ExperimentUtil
from csle_common.logging.log import Logger
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.dao.jobs.training_job_config import TrainingJobConfig
from csle_common.dao.training.dqn_policy import DQNPolicy
from csle_common.util.general_util import GeneralUtil
from csle_common.dao.simulation_config.base_env import BaseEnv
from csle_agents.agents.base.base_agent import BaseAgent
import csle_agents.constants.constants as agents_constants


class DQNCleanAgent(BaseAgent):
    """
    A DQN agent using the implementation from CleanRL
    """

    def __init__(self, simulation_env_config: SimulationEnvConfig,
                 emulation_env_config: Optional[EmulationEnvConfig],
                 experiment_config: ExperimentConfig, training_job: Optional[TrainingJobConfig] = None,
                 save_to_metastore: bool = True) -> None:
        """
        Initializes the agent

        :param simulation_env_config: the simulation environment configuration
        :param emulation_env_config: the emulation environment configuration
        :param experiment_config: the experiment configuration
        :param training_job: the training job configuration
        :param save_to_metastore: boolean flag indicating whether job information should be saved to the metastore
        """
        super(DQNCleanAgent, self).__init__(simulation_env_config=simulation_env_config,
                                            emulation_env_config=emulation_env_config,
                                            experiment_config=experiment_config)
        assert experiment_config.agent_type == AgentType.DQN_CLEAN
        self.training_job = training_job
        self.num_hl = self.experiment_config.hparams[constants.NEURAL_NETWORKS.NUM_HIDDEN_LAYERS].value
        self.num_hl_neur = self.experiment_config.hparams[
            constants.NEURAL_NETWORKS.NUM_NEURONS_PER_HIDDEN_LAYER].value
        self.config = self.simulation_env_config.simulation_env_input_config
        self.save_to_metastore = save_to_metastore
        self.exp_name: str = self.simulation_env_config.name
        self.env_id = self.simulation_env_config.gym_env_name
        self.torch_deterministic = True
        self.cuda: bool = True
        self.learning_rate = self.experiment_config.hparams[agents_constants.COMMON.LEARNING_RATE].value
        self.start_e = 1
        self.end_e = 0.05
        self.num_envs = self.experiment_config.hparams[agents_constants.COMMON.NUM_PARALLEL_ENVS].value
        self.total_timesteps = self.experiment_config.hparams[agents_constants.COMMON.NUM_TRAINING_TIMESTEPS].value
        self.batch_size = self.experiment_config.hparams[agents_constants.COMMON.BATCH_SIZE].value
        self.num_iterations = self.total_timesteps // self.batch_size
        self.gamma = self.experiment_config.hparams[agents_constants.COMMON.GAMMA].value
        self.tau = self.experiment_config.hparams[agents_constants.DQN_CLEAN.TAU].value
        self.exploration_fraction = self.experiment_config.hparams[agents_constants.DQN_CLEAN.EXP_FRAC].value
        self.learning_starts = self.experiment_config.hparams[agents_constants.DQN_CLEAN.LEARNING_STARTS].value
        self.train_frequency = self.experiment_config.hparams[agents_constants.DQN_CLEAN.TRAIN_FREQ].value
        self.target_network_frequency = self.experiment_config.hparams[agents_constants.DQN_CLEAN.T_N_FREQ].value
        self.buffer_size = self.experiment_config.hparams[agents_constants.DQN_CLEAN.BUFFER_SIZE].value
        self.save_model = self.experiment_config.hparams[agents_constants.DQN_CLEAN.SAVE_MODEL].value
        self.device = self.experiment_config.hparams[constants.NEURAL_NETWORKS.DEVICE].value
        self.orig_env: BaseEnv = gym.make(self.simulation_env_config.gym_env_name, config=self.config)
    def linear_schedule(self, duration: int, t: int) -> float:
        """
        Linear exploration rate decay schedule

        :param duration: the duration of training
        :param t: the current time
        :return: the new exploration rate
        """
        slope = (self.end_e - self.start_e) / duration
        return max(slope * t + self.start_e, self.end_e)
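    # Illustrative sketch (not part of the original module): with the defaults set in
    # __init__ (start_e=1, end_e=0.05), epsilon decays linearly over `duration` steps
    # and is then clamped at end_e. For example, assuming a hypothetical duration of
    # 1000 steps:
    #
    #   agent.linear_schedule(duration=1000, t=0)     # -> 1.0
    #   agent.linear_schedule(duration=1000, t=500)   # -> 0.525
    #   agent.linear_schedule(duration=1000, t=2000)  # -> 0.05 (clamped at end_e)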
    def make_env(self) -> Callable[[], RecordEpisodeStatistics]:
        """
        Helper function for creating an environment in training of the agent

        :return: environment creating function
        """

        def thunk() -> RecordEpisodeStatistics:
            """
            The environment creating function
            """
            orig_env: BaseEnv = gym.make(self.simulation_env_config.gym_env_name, config=self.config)
            env = RecordEpisodeStatistics(orig_env)
            return env

        return thunk
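    # Usage note (illustration only): the thunk returned by make_env is what
    # gym.vector.SyncVectorEnv expects; run_dqn below builds its vectorized
    # environment as:
    #
    #   envs = gym.vector.SyncVectorEnv([self.make_env() for _ in range(self.num_envs)])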
    def train(self) -> ExperimentExecution:
        """
        Implements the training logic of the DQN algorithm

        :return: the experiment result
        """
        pid = os.getpid()

        # Setup experiment metrics
        exp_result = ExperimentResult()
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_TIME_HORIZON)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_UPPER_BOUND_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RANDOM_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_HEURISTIC_RETURN)
        exp_result.plot_metrics.append(agents_constants.COMMON.RUNTIME)
        descr = f"Training of policies with Clean-DQN using " \
                f"simulation:{self.simulation_env_config.name}"

        # Setup training job
        if self.training_job is None:
            emulation_name = ""
            if self.emulation_env_config is not None:
                emulation_name = self.emulation_env_config.name
            self.training_job = TrainingJobConfig(
                simulation_env_name=self.simulation_env_config.name, experiment_config=self.experiment_config,
                progress_percentage=0, pid=pid, experiment_result=exp_result,
                emulation_env_name=emulation_name, simulation_traces=[],
                num_cached_traces=agents_constants.COMMON.NUM_CACHED_SIMULATION_TRACES,
                log_file_path=Logger.__call__().get_log_file_path(), descr=descr,
                physical_host_ip=GeneralUtil.get_host_ip())
            training_job_id = -1
            if self.save_to_metastore:
                training_job_id = MetastoreFacade.save_training_job(training_job=self.training_job)
            self.training_job.id = training_job_id
        else:
            self.training_job.pid = pid
            self.training_job.progress_percentage = 0
            self.training_job.experiment_result = exp_result
            if self.save_to_metastore:
                MetastoreFacade.update_training_job(training_job=self.training_job, id=self.training_job.id)

        # Setup experiment execution
        ts = time.time()
        simulation_name = self.simulation_env_config.name
        emulation_name = ""
        if self.emulation_env_config is not None:
            emulation_name = self.emulation_env_config.name
        self.exp_execution = ExperimentExecution(
            result=exp_result, config=self.experiment_config, timestamp=ts,
            emulation_name=emulation_name, simulation_name=simulation_name,
            descr=descr, log_file_path=self.training_job.log_file_path)
        exp_execution_id = -1
        if self.save_to_metastore:
            exp_execution_id = MetastoreFacade.save_experiment_execution(self.exp_execution)
        self.exp_execution.id = exp_execution_id

        # Training runs, one per seed
        for seed in self.experiment_config.random_seeds:
            assert self.num_envs == 1, "vectorized envs are not supported at the moment"

            # Train
            exp_result, env, model = self.run_dqn(exp_result=exp_result, seed=seed)

            # Save policy
            ts = time.time()
            save_path = f"{self.experiment_config.output_dir}/DQN_policy_seed_{seed}_{ts}.zip"
            model.save(save_path)
            action_space = \
                self.simulation_env_config.joint_action_space_config.action_spaces[
                    self.experiment_config.player_idx].actions
            policy = DQNPolicy(model=model, simulation_name=self.simulation_env_config.name, save_path=save_path,
                               player_type=self.experiment_config.player_type,
                               states=self.simulation_env_config.state_space_config.states,
                               actions=action_space, experiment_config=self.experiment_config,
                               avg_R=exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN][-1])
            if self.save_to_metastore:
                MetastoreFacade.save_dqn_policy(dqn_policy=policy)
                os.chmod(save_path, 0o777)

            # Save trace
            traces = env.get_traces()
            if len(traces) > 0 and self.save_to_metastore:
                MetastoreFacade.save_simulation_trace(traces[-1])
            env.reset_traces()

        # Calculate average and std metrics
        exp_result.avg_metrics = {}
        exp_result.std_metrics = {}
        for metric in exp_result.all_metrics[self.experiment_config.random_seeds[0]].keys():
            value_vectors = []
            for seed in self.experiment_config.random_seeds:
                value_vectors.append(exp_result.all_metrics[seed][metric])
            avg_metrics = []
            std_metrics = []
            for i in range(len(value_vectors[0])):
                seed_values = []
                for seed_idx in range(len(self.experiment_config.random_seeds)):
                    seed_values.append(value_vectors[seed_idx][i])
                avg_metrics.append(ExperimentUtil.mean_confidence_interval(
                    data=seed_values,
                    confidence=self.experiment_config.hparams[
                        agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[0])
                std_metrics.append(ExperimentUtil.mean_confidence_interval(
                    data=seed_values,
                    confidence=self.experiment_config.hparams[
                        agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[1])
            exp_result.avg_metrics[metric] = avg_metrics
            exp_result.std_metrics[metric] = std_metrics

        return self.exp_execution
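    # Illustrative sketch (assumption, not the csle implementation): the per-metric
    # averaging above uses ExperimentUtil.mean_confidence_interval, which is assumed
    # here to return a (mean, half-width) pair of a standard Student-t confidence
    # interval, roughly equivalent to:
    #
    #   import numpy as np
    #   import scipy.stats
    #
    #   def mean_confidence_interval(data, confidence=0.95):
    #       a = 1.0 * np.array(data)
    #       n = len(a)
    #       m, se = np.mean(a), scipy.stats.sem(a)
    #       h = se * scipy.stats.t.ppf((1 + confidence) / 2.0, n - 1)
    #       return m, h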
    def run_dqn(self, exp_result: ExperimentResult, seed: int) -> Tuple[ExperimentResult, BaseEnv, QNetwork]:
        """
        Runs DQN with a given seed

        :param exp_result: the object to save the experiment results
        :param seed: the random seed
        :return: the updated experiment results, the environment and the trained model
        """
        Logger.__call__().get_logger().info(f"[CleanDQN] Start training; seed: {seed}")
        envs = gym.vector.SyncVectorEnv([self.make_env() for _ in range(self.num_envs)])
        assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported"

        # Setup training metrics
        exp_result.all_metrics[seed] = {}
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_UPPER_BOUND_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RANDOM_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_HEURISTIC_RETURN] = []
        exp_result.all_metrics[seed][agents_constants.COMMON.RUNTIME] = []
        ExperimentUtil.set_seed(seed)
        cuda = False

        # Create neural network
        device = torch.device(agents_constants.DQN_CLEAN.CUDA if torch.cuda.is_available() and cuda
                              else self.experiment_config.hparams[constants.NEURAL_NETWORKS.DEVICE].value)
        input_dim = np.array(envs.single_observation_space.shape).prod()
        q_network = QNetwork(input_dim=input_dim, num_hidden_layers=self.num_hl,
                             hidden_layer_dim=self.num_hl_neur,
                             agent_type=self.experiment_config.agent_type).to(device)
        optimizer = optim.Adam(q_network.parameters(), lr=self.learning_rate)
        target_network = QNetwork(input_dim=input_dim, num_hidden_layers=self.num_hl,
                                  hidden_layer_dim=self.num_hl_neur,
                                  agent_type=self.experiment_config.agent_type).to(device)

        # Seeding
        random.seed(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.backends.cudnn.deterministic = self.torch_deterministic
        target_network.load_state_dict(q_network.state_dict())
        rb = ReplayBuffer(
            self.buffer_size,
            envs.single_observation_space,
            envs.single_action_space,
            device,
            handle_timeout_termination=False,
        )
        start_time = time.time()
        obs, _ = envs.reset(seed=seed)
        R: List[Any] = []
        T = []
        i = 0
        for global_step in range(self.total_timesteps):
            i += 1

            # ALGO LOGIC: put action logic here
            epsilon = self.linear_schedule(self.exploration_fraction * self.total_timesteps, global_step)
            if random.random() < epsilon:
                actions = np.array([envs.single_action_space.sample() for _ in range(envs.num_envs)])
            else:
                q_values = q_network(torch.Tensor(obs).to(device))
                actions = torch.argmax(q_values, dim=1).cpu().numpy()
            e_name = "csle-stopping-game-pomdp-defender-v1"
            if self.simulation_env_config.simulation_env_input_config.env_name == e_name:
                actions[0] = random.randrange(0, 2)

            # Take a step in the environment
            next_obs, rewards, terminations, truncations, infos = envs.step(actions)

            # Record rewards for logging purposes
            if "final_info" in infos:
                R_sum = 0
                T_sum = 0
                for info in infos["final_info"]:
                    R_sum += info["R"]
                    T_sum += info["T"]
                R.append(R_sum)
                T.append(T_sum)

            # Save data to replay buffer
            real_next_obs = next_obs.copy()
            for idx, trunc in enumerate(truncations):
                if trunc:
                    real_next_obs[idx] = infos["final_observation"][idx]
            rb.add(obs, real_next_obs, actions, rewards, terminations, infos)  # type: ignore

            # Move to the next observation
            obs = next_obs

            # Optimizing the neural network based on data from the replay buffer
            if global_step > self.learning_starts:
                if global_step % self.train_frequency == 0:
                    data = rb.sample(self.batch_size)
                    with torch.no_grad():
                        target_max, _ = target_network(data.next_observations.to(dtype=torch.float32)).max(dim=1)
                        td_target = data.rewards.flatten() + self.gamma * target_max * (1 - data.dones.flatten())
                    old_val = q_network(data.observations.to(dtype=torch.float32)).gather(1, data.actions).squeeze()
                    loss = F.mse_loss(td_target, old_val)

                    # optimize the model
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                # update target network
                if global_step % self.target_network_frequency == 0:
                    for target_network_param, q_network_param in zip(target_network.parameters(),
                                                                     q_network.parameters()):
                        target_network_param.data.copy_(
                            self.tau * q_network_param.data + (1.0 - self.tau) * target_network_param.data
                        )

            # Logging
            if global_step > 10 and global_step % self.experiment_config.log_every == 0:
                time_elapsed_minutes = round((time.time() - start_time) / 60, 3)
                exp_result.all_metrics[seed][agents_constants.COMMON.RUNTIME].append(time_elapsed_minutes)
                avg_R = round(float(np.mean(R)), 3)
                exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN].append(round(avg_R, 3))
                avg_T = round(float(np.mean(T)), 3)
                exp_result.all_metrics[seed][
                    agents_constants.COMMON.AVERAGE_TIME_HORIZON].append(round(avg_T, 3))
                running_avg_J = ExperimentUtil.running_average(
                    exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN],
                    self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
                running_avg_T = ExperimentUtil.running_average(
                    exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON],
                    self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
                exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN].append(
                    round(running_avg_J, 3))
                Logger.__call__().get_logger().info(
                    f"[CleanDQN] Iteration: {global_step}/{self.total_timesteps}, "
                    f"avg R: {avg_R}, "
                    f"R_avg_{self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}:"
                    f"{running_avg_J}, Avg T:{round(avg_T, 3)}, "
                    f"Running_avg_{self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}_T: "
                    f"{round(running_avg_T, 3)}, "
                    f"runtime: {time_elapsed_minutes} min")

        envs.close()
        base_env: BaseEnv = envs.envs[0].env.env.env  # type: ignore
        return exp_result, base_env, q_network
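    # Minimal self-contained sketch (illustration only, with made-up tensor values) of
    # the TD-target computation and the soft target-network update performed in the
    # training loop above:
    #
    #   import torch
    #   gamma, tau = 0.99, 1.0
    #   rewards = torch.tensor([1.0, 0.0])                # batch of rewards
    #   dones = torch.tensor([0.0, 1.0])                  # 1.0 where the episode terminated
    #   next_q = torch.tensor([[0.2, 0.5], [0.1, 0.3]])   # target-network Q-values for next states
    #   target_max, _ = next_q.max(dim=1)
    #   td_target = rewards + gamma * target_max * (1 - dones)   # -> tensor([1.4950, 0.0000])
    #   # Soft update: target <- tau * online + (1 - tau) * target (tau = 1.0 is a hard copy)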
    def hparam_names(self) -> List[str]:
        """
        Gets the hyperparameters

        :return: a list with the hyperparameter names
        """
        return [constants.NEURAL_NETWORKS.NUM_NEURONS_PER_HIDDEN_LAYER,
                constants.NEURAL_NETWORKS.NUM_HIDDEN_LAYERS,
                agents_constants.COMMON.LEARNING_RATE, agents_constants.COMMON.BATCH_SIZE,
                agents_constants.COMMON.GAMMA, agents_constants.COMMON.NUM_TRAINING_TIMESTEPS,
                agents_constants.COMMON.EVAL_EVERY, agents_constants.COMMON.EVAL_BATCH_SIZE,
                constants.NEURAL_NETWORKS.DEVICE, agents_constants.COMMON.SAVE_EVERY,
                agents_constants.DQN.EXPLORATION_INITIAL_EPS, agents_constants.DQN.EXPLORATION_FINAL_EPS,
                agents_constants.DQN.EXPLORATION_FRACTION, agents_constants.DQN.MLP_POLICY,
                agents_constants.DQN.MAX_GRAD_NORM, agents_constants.DQN.GRADIENT_STEPS,
                agents_constants.DQN.N_EPISODES_ROLLOUT, agents_constants.DQN.TARGET_UPDATE_INTERVAL,
                agents_constants.DQN.LEARNING_STARTS, agents_constants.DQN.BUFFER_SIZE,
                agents_constants.DQN.DQN_BATCH_SIZE]