"""
MIT License
Copyright (c) 2019 CleanRL developers https://github.com/vwxyzjn/cleanrl
"""
from typing import Union, List, Optional, Callable, Tuple
import random
import time
import gymnasium as gym
from gymnasium.wrappers.record_episode_statistics import RecordEpisodeStatistics
from gymnasium.spaces.discrete import Discrete
import os
import numpy as np
import torch
import torch.nn.utils.clip_grad as clip_grad
import torch.optim as optim
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.training.experiment_config import ExperimentConfig
from csle_common.dao.training.experiment_execution import ExperimentExecution
from csle_common.dao.training.experiment_result import ExperimentResult
from csle_common.dao.training.agent_type import AgentType
from csle_common.util.experiment_util import ExperimentUtil
from csle_common.logging.log import Logger
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.dao.jobs.training_job_config import TrainingJobConfig
from csle_common.util.general_util import GeneralUtil
from csle_common.dao.simulation_config.base_env import BaseEnv
from csle_common.dao.training.ppo_policy import PPOPolicy
from csle_agents.agents.base.base_agent import BaseAgent
from csle_common.models.ppo_network import PPONetwork
import csle_agents.constants.constants as agents_constants
class PPOCleanAgent(BaseAgent):
"""
A PPO agent using the implementation from CleanRL
"""
def __init__(self, simulation_env_config: SimulationEnvConfig,
emulation_env_config: Union[None, EmulationEnvConfig], experiment_config: ExperimentConfig,
training_job: Optional[TrainingJobConfig] = None, save_to_metastore: bool = True) -> None:
"""
Initializes the agent and sets the hyperparameters as attributes of the class.
:param simulation_env_config: the simulation environment configuration
:param emulation_env_config: the emulation environment configuration
:param experiment_config: the experiment configuration
:param training_job: the training job
:param save_to_metastore: boolean flag indicating whether the results should be saved to the metastore or not
"""
super(PPOCleanAgent, self).__init__(simulation_env_config=simulation_env_config,
emulation_env_config=emulation_env_config,
experiment_config=experiment_config)
assert experiment_config.agent_type == AgentType.PPO_CLEAN
self.training_job = training_job
self.save_to_metastore = save_to_metastore
self.num_steps = self.experiment_config.hparams[agents_constants.PPO_CLEAN.NUM_STEPS].value
self.num_envs = self.experiment_config.hparams[agents_constants.COMMON.NUM_PARALLEL_ENVS].value
self.batch_size = self.experiment_config.hparams[agents_constants.COMMON.BATCH_SIZE].value
self.total_timesteps = self.experiment_config.hparams[
agents_constants.COMMON.NUM_TRAINING_TIMESTEPS].value
self.num_minibatches = self.experiment_config.hparams[agents_constants.PPO_CLEAN.NUM_MINIBATCHES].value
self.minibatch_size = int(self.batch_size // self.num_minibatches)
self.num_iterations = self.total_timesteps // self.batch_size
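# Note: the flattened rollout buffers used during optimization contain num_steps * num_envs
# samples per iteration, so BATCH_SIZE is assumed to equal num_steps * num_envs; each update
# epoch then shuffles this batch into num_minibatches minibatches of size
# batch_size // num_minibatches, and training runs for total_timesteps // batch_size iterations.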
self.update_epochs = self.experiment_config.hparams[agents_constants.PPO_CLEAN.UPDATE_EPOCHS].value
self.clip_coef = self.experiment_config.hparams[agents_constants.PPO_CLEAN.CLIP_RANGE].value
self.clip_vloss = self.experiment_config.hparams[agents_constants.PPO_CLEAN.CLIP_VLOSS].value
self.norm_adv = self.experiment_config.hparams[agents_constants.PPO_CLEAN.NORM_ADV].value
self.vf_coef = self.experiment_config.hparams[agents_constants.PPO_CLEAN.VF_COEF].value
self.ent_coef = self.experiment_config.hparams[agents_constants.PPO_CLEAN.ENT_COEF].value
self.max_grad_norm = self.experiment_config.hparams[agents_constants.PPO_CLEAN.MAX_GRAD_NORM].value
self.target_kl = None
if self.experiment_config.hparams[agents_constants.PPO_CLEAN.TARGET_KL].value != -1:
self.target_kl = self.experiment_config.hparams[agents_constants.PPO_CLEAN.TARGET_KL].value
self.anneal_lr = self.experiment_config.hparams[agents_constants.PPO_CLEAN.ANNEAL_LR].value
self.gamma = self.experiment_config.hparams[agents_constants.COMMON.GAMMA].value
self.gae_lambda = self.experiment_config.hparams[agents_constants.PPO_CLEAN.GAE_LAMBDA].value
self.learning_rate = self.experiment_config.hparams[agents_constants.COMMON.LEARNING_RATE].value
config = self.simulation_env_config.simulation_env_input_config
self.orig_env: BaseEnv = gym.make(self.simulation_env_config.gym_env_name, config=config)
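# Example usage (a minimal sketch; it assumes that a SimulationEnvConfig and an ExperimentConfig
# with agent_type=AgentType.PPO_CLEAN and the hyperparameters listed in hparam_names() have
# already been constructed, e.g., loaded from the metastore):
#
#   agent = PPOCleanAgent(simulation_env_config=simulation_env_config, emulation_env_config=None,
#                         experiment_config=experiment_config)
#   experiment_execution = agent.train()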
def train(self) -> ExperimentExecution:
"""
Runs the training process
:return: the results
"""
pid = os.getpid()
# Setup experiment metrics
exp_result = ExperimentResult()
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON)
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_TIME_HORIZON)
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_UPPER_BOUND_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RANDOM_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_HEURISTIC_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNTIME)
descr = f"Training of policies with Clean-PPO using " \
f"simulation:{self.simulation_env_config.name}"
# Setup training job
if self.training_job is None:
emulation_name = ""
if self.emulation_env_config is not None:
emulation_name = self.emulation_env_config.name
self.training_job = TrainingJobConfig(
simulation_env_name=self.simulation_env_config.name, experiment_config=self.experiment_config,
progress_percentage=0, pid=pid, experiment_result=exp_result,
emulation_env_name=emulation_name, simulation_traces=[],
num_cached_traces=agents_constants.COMMON.NUM_CACHED_SIMULATION_TRACES,
log_file_path=Logger.__call__().get_log_file_path(), descr=descr,
physical_host_ip=GeneralUtil.get_host_ip())
training_job_id = -1
if self.save_to_metastore:
training_job_id = MetastoreFacade.save_training_job(training_job=self.training_job)
self.training_job.id = training_job_id
else:
self.training_job.pid = pid
self.training_job.progress_percentage = 0
self.training_job.experiment_result = exp_result
if self.save_to_metastore:
MetastoreFacade.update_training_job(training_job=self.training_job, id=self.training_job.id)
# Setup experiment execution
ts = time.time()
emulation_name = ""
if self.emulation_env_config is not None:
emulation_name = self.emulation_env_config.name
simulation_name = self.simulation_env_config.name
self.exp_execution = ExperimentExecution(
result=exp_result, config=self.experiment_config, timestamp=ts,
emulation_name=emulation_name, simulation_name=simulation_name, descr=descr,
log_file_path=self.training_job.log_file_path)
exp_execution_id = -1
if self.save_to_metastore:
exp_execution_id = MetastoreFacade.save_experiment_execution(self.exp_execution)
self.exp_execution.id = exp_execution_id
# Training runs, one per seed
for seed in self.experiment_config.random_seeds:
# Train
exp_result, env, model = self.run_ppo(exp_result=exp_result, seed=seed)
# Save policy
ts = time.time()
save_path = f"{self.experiment_config.output_dir}/ppo_policy_seed_{seed}_{ts}.zip"
model.save(save_path)
policy = PPOPolicy(
model=model, simulation_name=self.simulation_env_config.name, save_path=save_path,
states=self.simulation_env_config.state_space_config.states,
actions=self.simulation_env_config.joint_action_space_config.action_spaces[
self.experiment_config.player_idx].actions, player_type=self.experiment_config.player_type,
experiment_config=self.experiment_config,
avg_R=exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN][-1])
exp_result.policies[seed] = policy
# Save policy metadata
if self.save_to_metastore:
MetastoreFacade.save_ppo_policy(ppo_policy=policy)
os.chmod(save_path, 0o777)
# Save trace
traces = env.get_traces()
if len(traces) > 0 and self.save_to_metastore:
MetastoreFacade.save_simulation_trace(traces[-1])
env.reset_traces()
# Calculate average and std metrics
exp_result.avg_metrics = {}
exp_result.std_metrics = {}
for metric in exp_result.all_metrics[self.experiment_config.random_seeds[0]].keys():
value_vectors = []
for seed in self.experiment_config.random_seeds:
value_vectors.append(exp_result.all_metrics[seed][metric])
avg_metrics = []
std_metrics = []
for i in range(len(value_vectors[0])):
seed_values = []
for seed_idx in range(len(self.experiment_config.random_seeds)):
seed_values.append(value_vectors[seed_idx][i])
avg_metrics.append(ExperimentUtil.mean_confidence_interval(
data=seed_values,
confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[0])
std_metrics.append(ExperimentUtil.mean_confidence_interval(
data=seed_values,
confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[1])
exp_result.avg_metrics[metric] = avg_metrics
exp_result.std_metrics[metric] = std_metrics
return self.exp_execution
def run_ppo(self, exp_result: ExperimentResult, seed: int) -> Tuple[ExperimentResult, BaseEnv, PPONetwork]:
"""
Runs PPO with a given seed
:param exp_result: the object to save the experiment results
:param seed: the random seed
:return: the updated experiment results, the environment, and the trained model
"""
Logger.__call__().get_logger().info(f"[CleanPPO] Start training; seed: {seed}")
envs = gym.vector.SyncVectorEnv([self.make_env() for _ in range(self.num_envs)])
# Setup training metrics
exp_result.all_metrics[seed] = {}
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON] = []
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON] = []
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_UPPER_BOUND_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RANDOM_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_HEURISTIC_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNTIME] = []
ExperimentUtil.set_seed(seed)
cuda = False
# Create neural network
device = torch.device(agents_constants.PPO_CLEAN.CUDA if torch.cuda.is_available() and cuda else
self.experiment_config.hparams[constants.NEURAL_NETWORKS.DEVICE].value)
num_hidden_layers = self.experiment_config.hparams[constants.NEURAL_NETWORKS.NUM_HIDDEN_LAYERS].value
hidden_layer_dim = self.experiment_config.hparams[constants.NEURAL_NETWORKS.NUM_NEURONS_PER_HIDDEN_LAYER].value
input_dim = int(np.array(envs.single_observation_space.shape).prod())
env: BaseEnv = self.orig_env
action_space: Discrete = env.action_space
action_dim = int(action_space.n)
model = PPONetwork(input_dim=input_dim, output_dim_critic=1, output_dim_action=action_dim,
num_hidden_layers=num_hidden_layers, hidden_layer_dim=hidden_layer_dim).to(device)
# seeding
random.seed(seed)
np.random.seed(seed)
torch.manual_seed(seed)
torch.backends.cudnn.deterministic = True
# Setup gym environment
obs_shape: Tuple[int, ...] = (1,)
action_shape: Tuple[int, ...] = (1,)
if envs.single_observation_space.shape is not None:
obs_shape = envs.single_observation_space.shape
if envs.single_action_space.shape is not None:
action_shape = envs.single_action_space.shape
obs = torch.zeros((self.num_steps, self.num_envs) + obs_shape).to(device)
actions = torch.zeros((self.num_steps, self.num_envs) + action_shape).to(device)
logprobs = torch.zeros((self.num_steps, self.num_envs)).to(device)
rewards = torch.zeros((self.num_steps, self.num_envs)).to(device)
dones = torch.zeros((self.num_steps, self.num_envs)).to(device)
values = torch.zeros((self.num_steps, self.num_envs)).to(device)
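# Rollout buffers with one row per (rollout step, parallel environment) pair; observations and
# actions keep their original shapes, while log-probabilities, rewards, dones, and value
# estimates are scalar per step and environment.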
horizons: List[int] = []
# Initialize the environment and optimizers
global_step = 0
start_time = time.time()
next_obs, _ = envs.reset(seed=seed)
next_obs = torch.Tensor(next_obs).to(device)
next_done = torch.zeros(self.num_envs).to(device)
optimizer = optim.Adam(model.parameters(), lr=self.learning_rate, eps=1e-5)
# Training
for iteration in range(1, self.num_iterations + 1):
# Annealing the rate if instructed to do so.
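# Linear schedule: lr_t = lr_0 * (1 - (iteration - 1) / num_iterations), i.e., the learning
# rate decays linearly from its initial value towards zero over the training iterations.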
if self.anneal_lr:
frac = 1.0 - (iteration - 1.0) / self.num_iterations
lrnow = frac * self.learning_rate
optimizer.param_groups[0]["lr"] = lrnow
next_obs, next_done, global_step, horizons = \
self.update_trajectory_buffers(global_step=global_step, envs=envs, obs=obs, dones=dones,
actions=actions, rewards=rewards, device=device,
logprobs=logprobs, values=values, model=model, next_obs=next_obs,
next_done=next_done, horizons=horizons)
returns, advantages = self.generalized_advantage_estimation(model=model, next_obs=next_obs, rewards=rewards,
device=device, next_done=next_done,
dones=dones, values=values)
# flatten the batch
b_obs = obs.reshape((-1,) + obs_shape)
b_logprobs = logprobs.reshape(-1)
b_actions = actions.reshape((-1,) + action_shape)
b_advantages = advantages.reshape(-1)
b_returns = returns.reshape(-1)
b_values = values.reshape(-1)
# Optimizing the policy and value network
b_inds = np.arange(self.batch_size)
clipfracs = []
for epoch in range(self.update_epochs):
np.random.shuffle(b_inds)
for start in range(0, self.batch_size, self.minibatch_size):
end = start + self.minibatch_size
mb_inds = list(b_inds[start:end])
_, newlogprob, entropy, newvalue = \
model.get_action_and_value(b_obs[mb_inds], b_actions.long()[mb_inds])
logratio = newlogprob - b_logprobs[mb_inds]
ratio = logratio.exp()
with torch.no_grad():
# calculate approx_kl http://joschu.net/blog/kl-approx.html
approx_kl = ((ratio - 1) - logratio).mean()
clipfracs += [((ratio - 1.0).abs() > self.clip_coef).float().mean().item()]
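# approx_kl uses the estimator E[(r - 1) - log r] with r = pi_new / pi_old, an unbiased and
# non-negative estimate of KL(pi_old || pi_new) (the "k3" estimator in the blog post
# referenced above); clipfracs records the fraction of samples whose ratio was clipped.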
mb_advantages = b_advantages[mb_inds]
if self.norm_adv:
mb_advantages = (mb_advantages - mb_advantages.mean()) / (mb_advantages.std() + 1e-8)
# Policy loss
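# Clipped surrogate objective from the PPO paper:
# L^CLIP = E_t[min(r_t * A_t, clip(r_t, 1 - eps, 1 + eps) * A_t)] with eps = clip_coef.
# The two terms are negated and combined with an elementwise max because the optimizer
# minimizes the loss, which is equivalent to maximizing the clipped objective.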
pg_loss1 = -mb_advantages * ratio
pg_loss2 = -mb_advantages * torch.clamp(ratio, 1 - self.clip_coef, 1 + self.clip_coef)
pg_loss = torch.max(pg_loss1, pg_loss2).mean()
# Value loss
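# If clip_vloss is set, the clipped value loss from the original PPO implementation is used:
# the new value prediction may deviate at most +/- clip_coef from the value recorded during
# the rollout, and the elementwise maximum of the clipped and unclipped squared errors
# against the empirical returns is minimized.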
newvalue = newvalue.view(-1)
if self.clip_vloss:
v_loss_unclipped = (newvalue - b_returns[mb_inds]) ** 2
v_clipped = b_values[mb_inds] + torch.clamp(
newvalue - b_values[mb_inds],
-self.clip_coef,
self.clip_coef,
)
v_loss_clipped = (v_clipped - b_returns[mb_inds]) ** 2
v_loss_max = torch.max(v_loss_unclipped, v_loss_clipped)
v_loss = 0.5 * v_loss_max.mean()
else:
v_loss = 0.5 * ((newvalue - b_returns[mb_inds]) ** 2).mean()
# Entropy loss
entropy_loss = entropy.mean()
# Total loss
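# loss = pg_loss - ent_coef * entropy + vf_coef * v_loss; minimizing it maximizes the clipped
# surrogate and the policy entropy while fitting the value function to the returns.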
loss = pg_loss - self.ent_coef * entropy_loss + v_loss * self.vf_coef
# Backpropagation and update weights
optimizer.zero_grad()
loss.backward()
clip_grad.clip_grad_norm_(model.parameters(), self.max_grad_norm)
optimizer.step()
if self.target_kl is not None and approx_kl > self.target_kl:
break
# Logging
time_elapsed_minutes = round((time.time() - start_time) / 60, 3)
exp_result.all_metrics[seed][agents_constants.COMMON.RUNTIME].append(time_elapsed_minutes)
avg_R = round(float(np.mean(returns.flatten().cpu().numpy())), 3)
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN].append(avg_R)
avg_T = float(np.mean(horizons))
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON].append(
round(avg_T, 3))
running_avg_J = ExperimentUtil.running_average(
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN].append(
round(running_avg_J, 3))
running_avg_T = ExperimentUtil.running_average(
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_TIME_HORIZON],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON].append(
round(running_avg_T, 3))
Logger.__call__().get_logger().info(
f"[CleanPPO] Iteration: {iteration}/{self.num_iterations}, "
f"avg R: {avg_R}, "
f"R_avg_{self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}:"
f"{running_avg_J}, Avg T:{round(avg_T, 3)}, "
f"Running_avg_{self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}_T: "
f"{round(running_avg_T, 3)}, "
f"runtime: {time_elapsed_minutes} min")
envs.close()
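# Unwrap the first vectorized environment to recover the underlying BaseEnv; the chain of
# .env attributes strips the RecordEpisodeStatistics wrapper and the wrappers added by
# gym.make (the exact nesting depth depends on the gymnasium version).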
base_env: BaseEnv = envs.envs[0].env.env.env # type: ignore
return exp_result, base_env, model
def make_env(self) -> Callable[[], RecordEpisodeStatistics]:
"""
Helper function for creating the environment to use for training
:return: a function that creates the environment
"""
def thunk() -> RecordEpisodeStatistics:
"""
Function for creating a new environment
:return: the created environment
"""
config = self.simulation_env_config.simulation_env_input_config
orig_env: BaseEnv = gym.make(self.simulation_env_config.gym_env_name, config=config)
env = RecordEpisodeStatistics(orig_env)
return env
return thunk
def update_trajectory_buffers(self, global_step: int, envs: gym.vector.SyncVectorEnv, obs: torch.Tensor,
dones: torch.Tensor, actions: torch.Tensor, rewards: torch.Tensor,
device: torch.device, logprobs: torch.Tensor, values: torch.Tensor,
model: PPONetwork, next_obs: torch.Tensor, next_done: torch.Tensor,
horizons: List[int]) \
-> Tuple[torch.Tensor, torch.Tensor, int, List[int]]:
"""
Updates the buffers of trajectories collected from the environment
:param global_step: the global step in the iteration
:param envs: the vectorized gym environments
:param obs: tensor buffer of observations
:param dones: tensor buffer of done flags
:param actions: tensor buffer of actions taken
:param rewards: tensor buffer of received rewards
:param device: the device to run computations on (CPU or GPU)
:param logprobs: tensor buffer of log-probabilities of the sampled actions
:param horizons: list of observed time horizons (episode lengths)
:param values: tensor buffer of value estimates
:param model: the neural network model
:param next_obs: the next observation
:param next_done: the logical or of terminations and truncations
:return: the next observation, the next done flags, the updated global step, and the updated list of horizons
"""
step_dones = [0] * self.num_envs
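# step_dones[i] holds the rollout step at which environment i last terminated, so the
# difference computed below records the episode lengths (time horizons) observed in this rollout.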
for step in range(0, self.num_steps):
global_step += self.num_envs
obs[step] = next_obs
dones[step] = next_done
for i, d in enumerate(next_done.cpu().numpy()):
if float(d) == 1.0:
horizons.append(step - step_dones[i])
step_dones[i] = step
with torch.no_grad():
action, logprob, _, value = model.get_action_and_value(next_obs)
values[step] = value.flatten()
actions[step] = action
logprobs[step] = logprob
# Step the environment
next_obs, reward, terminations, truncations, infos = envs.step(action.cpu().numpy())
next_done_np = np.logical_or(terminations, truncations)
rewards[step] = torch.tensor(reward).to(device).view(-1)
next_obs, next_done = torch.Tensor(next_obs).to(device), torch.Tensor(next_done_np).to(device)
return next_obs, next_done, global_step, horizons
def generalized_advantage_estimation(self, model: PPONetwork, next_obs: torch.Tensor, rewards: torch.Tensor,
device: torch.device, next_done: torch.Tensor, dones: torch.Tensor,
values: torch.Tensor) -> Tuple[torch.Tensor, torch.Tensor]:
"""
Computes the generalized advantage estimation (i.e., exponentially weighted average of n-step returns)
See Schulman et al., "High-Dimensional Continuous Control Using Generalized Advantage Estimation", ICLR 2016.
:param device: the device to run computations on (CPU or GPU)
:param values: tensor buffer of value estimates
:param model: the neural network model
:param rewards: tensor buffer of received rewards
:param next_obs: the next observation
:param dones: tensor buffer of done flags
:param next_done: the logical or of terminations and truncations
:return: returns, advantages
"""
with torch.no_grad():
next_value = model.get_value(next_obs).reshape(1, -1)
advantages = torch.zeros_like(rewards).to(device)
lastgaelam = 0
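# GAE recursion (Schulman et al., 2016), computed backwards over the rollout:
#   delta_t = r_t + gamma * V(s_{t+1}) * (1 - done_{t+1}) - V(s_t)
#   A_t = delta_t + gamma * lambda * (1 - done_{t+1}) * A_{t+1}
# and the value targets are the returns R_t = A_t + V(s_t).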
for t in reversed(range(self.num_steps)):
if t == self.num_steps - 1:
nextnonterminal = 1.0 - next_done
nextvalues = next_value
else:
nextnonterminal = 1.0 - dones[t + 1]
nextvalues = values[t + 1]
delta = rewards[t] + self.gamma * nextvalues * nextnonterminal - values[t]
advantages[t] = lastgaelam = delta + self.gamma * self.gae_lambda * nextnonterminal * lastgaelam
returns = advantages + values
return returns, advantages
def hparam_names(self) -> List[str]:
"""
:return: a list with the hyperparameter names
"""
return [constants.NEURAL_NETWORKS.NUM_NEURONS_PER_HIDDEN_LAYER,
constants.NEURAL_NETWORKS.NUM_HIDDEN_LAYERS,
agents_constants.PPO_CLEAN.NUM_MINIBATCHES,
agents_constants.COMMON.NUM_PARALLEL_ENVS, agents_constants.COMMON.BATCH_SIZE,
agents_constants.COMMON.NUM_TRAINING_TIMESTEPS, agents_constants.PPO_CLEAN.GAE_LAMBDA,
agents_constants.PPO_CLEAN.CLIP_RANGE, agents_constants.COMMON.LEARNING_RATE,
agents_constants.PPO_CLEAN.NORM_ADV, agents_constants.COMMON.GAMMA,
agents_constants.PPO_CLEAN.CLIP_RANGE_VF, agents_constants.PPO_CLEAN.ENT_COEF,
agents_constants.PPO_CLEAN.VF_COEF, agents_constants.PPO_CLEAN.UPDATE_EPOCHS,
agents_constants.PPO_CLEAN.TARGET_KL, agents_constants.PPO_CLEAN.MAX_GRAD_NORM,
agents_constants.COMMON.EVAL_EVERY, constants.NEURAL_NETWORKS.DEVICE,
agents_constants.PPO_CLEAN.CLIP_VLOSS, agents_constants.PPO_CLEAN.ANNEAL_LR,
agents_constants.COMMON.SAVE_EVERY]