Source code for csle_common.dao.training.dqn_policy

from typing import List, Dict, Union, Any
import numpy as np
from numpy.typing import NDArray
import torch
from stable_baselines3 import DQN
from csle_common.dao.training.policy import Policy
from csle_common.dao.training.agent_type import AgentType
from csle_common.dao.training.player_type import PlayerType
from csle_common.dao.simulation_config.state import State
from csle_common.dao.simulation_config.state_type import StateType
from csle_common.dao.simulation_config.action import Action
from csle_common.dao.training.experiment_config import ExperimentConfig
from csle_common.dao.training.policy_type import PolicyType
from csle_common.models.q_network import QNetwork
from csle_common.logging.log import Logger


class DQNPolicy(Policy):
    """
    A neural network policy learned with DQN
    """

    def __init__(self, model: Union[None, DQN, QNetwork], simulation_name: str, save_path: str,
                 player_type: PlayerType, states: List[State], actions: List[Action],
                 experiment_config: ExperimentConfig, avg_R: float):
        """
        Initializes the policy

        :param model: the DQN model
        :param simulation_name: the simulation name
        :param save_path: the path to save the model to
        :param player_type: the player type
        :param states: list of states (required for computing stage policies)
        :param actions: list of actions
        :param experiment_config: the experiment configuration for training the policy
        :param avg_R: the average reward of the policy when evaluated in the simulation
        """
        super(DQNPolicy, self).__init__(agent_type=AgentType.DQN, player_type=player_type)
        self.model = model
        self.id = -1
        self.simulation_name = simulation_name
        self.save_path = save_path
        if self.model is None and save_path != "":
            # Lazily load the model from disk; try the stable-baselines3 format first,
            # then fall back to the QNetwork format
            try:
                self.model = DQN.load(path=self.save_path)
            except Exception as e:
                try:
                    self.model = QNetwork.load(path=self.save_path)
                except Exception as e2:
                    Logger.__call__().get_logger().warning(
                        f"There was an exception loading the model from path: {self.save_path}, "
                        f"exception: {str(e)}, {repr(e)}, {str(e2)}, {repr(e2)}")
        self.states = states
        self.actions = actions
        self.experiment_config = experiment_config
        self.avg_R = avg_R
        self.policy_type = PolicyType.DQN

    def action(self, o: List[float], deterministic: bool = True) -> NDArray[Any]:
        """
        Selects the next action given the latest observation

        :param o: the current observation
        :param deterministic: boolean flag indicating whether the action selection should be deterministic
        :return: the selected action
        """
        if self.model is None:
            raise ValueError("The model is None")
        if isinstance(self.model, DQN):
            a = self.model.predict(np.array(o), deterministic=deterministic)[0]
        elif isinstance(self.model, QNetwork):
            # Detach the forward-pass output from the computation graph before converting to numpy
            a = self.model.forward(x=torch.Tensor(o)).detach().numpy()
        else:
            raise ValueError(f"Invalid model: {self.model}")
        return a

    def probability(self, o: List[float], a: int) -> int:
        """
        Returns the probability of taking action a given observation o

        :param o: the current observation
        :param a: the action
        :return: 1 if the policy selects action a given o, otherwise 0 (the policy is deterministic)
        """
        if self.model is None:
            raise ValueError("The model is None")
        if isinstance(self.model, DQN):
            actions = self.model.policy.forward(obs=torch.tensor(o).to(self.model.device))
            action = actions[0]
            if action == a:
                return 1
            else:
                return 0
        else:
            raise ValueError(f"This function is not supported for the current model: {self.model}")

    def to_dict(self) -> Dict[str, Any]:
        """
        :return: a dict representation of the policy
        """
        d: Dict[str, Any] = {}
        d["id"] = self.id
        d["simulation_name"] = self.simulation_name
        d["save_path"] = self.save_path
        d["states"] = list(map(lambda x: x.to_dict(), self.states))
        d["player_type"] = self.player_type
        d["actions"] = list(map(lambda x: x.to_dict(), self.actions))
        d["experiment_config"] = self.experiment_config.to_dict()
        d["agent_type"] = self.agent_type
        d["avg_R"] = self.avg_R
        d["policy_type"] = self.policy_type
        return d

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "DQNPolicy":
        """
        Converts a dict representation of the object to an instance

        :param d: the dict to convert
        :return: the created instance
        """
        obj = DQNPolicy(model=None, simulation_name=d["simulation_name"], save_path=d["save_path"],
                        states=list(map(lambda x: State.from_dict(x), d["states"])),
                        player_type=d["player_type"],
                        actions=list(map(lambda x: Action.from_dict(x), d["actions"])),
                        experiment_config=ExperimentConfig.from_dict(d["experiment_config"]),
                        avg_R=d["avg_R"])
        obj.id = d["id"]
        return obj

    def stage_policy(self, o: Union[List[int], List[float]]) -> List[List[float]]:
        """
        Gets the stage policy, i.e. a |S|x|A| policy

        :param o: the latest observation
        :return: the |S|x|A| stage policy
        """
        b1 = o[1]
        l = int(o[0])
        if not self.player_type == PlayerType.ATTACKER:
            stage_policy = []
            for _ in self.states:
                stage_policy.append(self._get_attacker_stopping_dist(obs=o))
            return stage_policy
        else:
            stage_policy = []
            for s in self.states:
                if s.state_type != StateType.TERMINAL:
                    o = [l, b1, s.id]
                    stage_policy.append(self._get_attacker_stopping_dist(obs=o))
                else:
                    stage_policy.append([0.5, 0.5])
            return stage_policy

    def _get_attacker_stopping_dist(self, obs) -> List[float]:
        """
        Utility function for getting the stopping action distribution conditioned on a given observation

        :param obs: the observation to condition on
        :return: the conditional action distribution
        """
        if self.model is None:
            raise ValueError("The model is None")
        obs = np.array([obs])
        if isinstance(self.model, DQN):
            actions = self.model.policy.forward(obs=torch.tensor(obs).to(self.model.device))
        else:
            raise ValueError(f"Stopping dist not supported for model: {self.model}")
        action = actions[0]
        if action == 1:
            stop_prob = 1
        else:
            stop_prob = 0
        return [1 - stop_prob, stop_prob]

    def __str__(self) -> str:
        """
        :return: a string representation of the object
        """
        return f"model: {self.model}, id: {self.id}, simulation_name: {self.simulation_name}, " \
               f"save path: {self.save_path}, states: {self.states}, " \
               f"experiment_config: {self.experiment_config}, " \
               f"avg_R: {self.avg_R}, policy_type: {self.policy_type}"

    @staticmethod
    def from_json_file(json_file_path: str) -> "DQNPolicy":
        """
        Reads a json file and converts it to a DTO

        :param json_file_path: the json file path
        :return: the converted DTO
        """
        import io
        import json
        with io.open(json_file_path, 'r') as f:
            json_str = f.read()
        return DQNPolicy.from_dict(json.loads(json_str))

    def copy(self) -> "DQNPolicy":
        """
        :return: a copy of the DTO
        """
        return self.from_dict(self.to_dict())
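
A short usage sketch (not part of the original module): it shows how a policy can be restored from a saved model file and queried for actions. The path "./dqn_model.zip", the simulation name, the observation vector, and the states, actions and experiment_config objects are illustrative assumptions, not values from the library.

# Hypothetical usage example; states, actions and experiment_config are assumed
# to have been constructed elsewhere for the target simulation.
policy = DQNPolicy(model=None, simulation_name="example-simulation",
                   save_path="./dqn_model.zip", player_type=PlayerType.DEFENDER,
                   states=states, actions=actions,
                   experiment_config=experiment_config, avg_R=0.0)

# Greedy action selection for a given observation vector
a = policy.action(o=[1.0, 0.5, 0.0], deterministic=True)

# Round-trip through the dict representation (the same mechanism copy() uses)
policy2 = DQNPolicy.from_dict(policy.to_dict())
assert policy2.save_path == policy.save_path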