Source code for gym_csle_apt_game.util.rollout_util

from typing import Any, List, Tuple, Union
import numpy as np
import random
from multiprocessing import Pool
from csle_common.util.multiprocessing_util import NestablePool
from gym_csle_apt_game.util.apt_game_util import AptGameUtil
from gym_csle_apt_game.dao.apt_game_config import AptGameConfig
from gym_csle_apt_game.envs.apt_game_env import AptGameEnv
import numpy.typing as npt


class RolloutUtil:
    """
    Class with utility functions for rollout
    """
    @staticmethod
    def eval_attacker_base(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, horizon: int,
                           s: Union[int, None], b: npt.NDArray[Any], id: int) -> float:
        """
        Function for evaluating a base threshold strategy of the attacker

        :param alpha: the defender's threshold
        :param pi2: the attacker's base strategy
        :param config: the game configuration
        :param horizon: the horizon for the Monte-Carlo sampling
        :param id: the id of the parallel processor
        :param s: the state
        :param b: the belief
        :return: the cumulative cost of the sampled episode
        """
        # Seed the generators with the processor id so that parallel samples differ
        np.random.seed(100 * id + 378 * id + 23 + id)
        random.seed(100 * id + 378 * id + 23 + id)
        env = AptGameEnv(config=config)
        env.reset()
        cumulative_cost = 0.0
        env.state.b = b
        if s is None:
            s = AptGameUtil.sample_initial_state(b1=b)
        env.state.s = s
        for j in range(horizon):
            b = env.state.b
            s = env.state.s
            # Defender follows the threshold base strategy: stop when the intrusion belief mass exceeds alpha
            a1 = 0
            if sum(b[1:]) >= alpha:
                a1 = 1
            # Attacker action is sampled from the base strategy pi2
            a2 = (pi2, AptGameUtil.sample_attacker_action(pi2=pi2, s=s))
            action_profile = (a1, a2)
            o, costs, done, _, info = env.step(action_profile)
            c = costs[1]
            cumulative_cost += c
        return cumulative_cost
    @staticmethod
    def eval_attacker_base_parallel(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, num_samples: int,
                                    horizon: int, s: Union[int, None], b: List[float]) -> float:
        """
        Starts a pool of parallel processors for evaluating a threshold base strategy of the attacker

        :param alpha: the threshold of the defender
        :param pi2: the base strategy of the attacker
        :param config: the game configuration
        :param num_samples: the number of Monte-Carlo samples
        :param horizon: the horizon of the Monte-Carlo sampling
        :param s: the state
        :param b: the belief
        :return: the average cost-to-go of the base strategy
        """
        p = Pool(num_samples)
        args = []
        for i in range(num_samples):
            args.append((alpha, pi2, config, horizon, s, b, i))
        cumulative_costs = p.starmap(RolloutUtil.eval_attacker_base, args)
        return float(np.mean(cumulative_costs))
    @staticmethod
    def eval_defender_base(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, horizon: int,
                           s: Union[int, None], b: npt.NDArray[Any], id: int) -> float:
        """
        Function for evaluating a base threshold strategy of the defender

        :param alpha: the defender's threshold
        :param pi2: the attacker's strategy
        :param config: the game configuration
        :param horizon: the horizon for the Monte-Carlo sampling
        :param id: the id of the parallel processor
        :param s: the state
        :param b: the belief
        :return: the cumulative cost of the sampled episode
        """
        # Seed the generators with the processor id so that parallel samples differ
        np.random.seed(100 * id + 378 * id + 23 + id)
        random.seed(100 * id + 378 * id + 23 + id)
        env = AptGameEnv(config=config)
        env.reset()
        cumulative_cost = 0.0
        env.state.b = b
        if s is None:
            s = AptGameUtil.sample_initial_state(b1=b)
        env.state.s = s
        for j in range(horizon):
            b = env.state.b
            s = env.state.s
            # Defender follows the threshold base strategy: stop when the intrusion belief mass exceeds alpha
            a1 = 0
            if sum(b[1:]) >= alpha:
                a1 = 1
            # Attacker action is sampled from the strategy pi2
            a2 = (pi2, AptGameUtil.sample_attacker_action(pi2=pi2, s=s))
            action_profile = (a1, a2)
            o, costs, done, _, info = env.step(action_profile)
            c = costs[0]
            cumulative_cost += c
        return cumulative_cost
    @staticmethod
    def eval_defender_base_parallel(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, num_samples: int,
                                    horizon: int, s: Union[None, int], b: List[float]) -> float:
        """
        Starts a pool of parallel processors for evaluating a threshold base strategy of the defender

        :param alpha: the threshold of the defender
        :param pi2: the strategy of the attacker
        :param config: the game configuration
        :param num_samples: the number of Monte-Carlo samples
        :param horizon: the horizon of the Monte-Carlo sampling
        :param s: the state
        :param b: the belief
        :return: the average cost-to-go of the base strategy
        """
        p = Pool(num_samples)
        args = []
        for i in range(num_samples):
            args.append((alpha, pi2, config, horizon, s, b, i))
        cumulative_costs = p.starmap(RolloutUtil.eval_defender_base, args)
        return float(np.mean(cumulative_costs))
    @staticmethod
    def exact_defender_rollout(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, num_samples: int,
                               horizon: int, ell: int, b: List[float]) -> Tuple[int, float]:
        """
        Performs exact rollout of the defender against a fixed attacker strategy and with a threshold base strategy

        :param alpha: the threshold of the base strategy
        :param pi2: the strategy of the attacker
        :param config: the game configuration
        :param num_samples: the number of Monte-Carlo samples
        :param horizon: the horizon for the Monte-Carlo sampling
        :param ell: the lookahead length
        :param b: the belief state
        :return: The rollout action and the corresponding Q-factor
        """
        if ell == 0:
            # Base case: evaluate the threshold base strategy
            return 0, RolloutUtil.eval_defender_base_parallel(alpha=alpha, pi2=pi2, config=config,
                                                              num_samples=num_samples, horizon=horizon, s=0, b=b)
        else:
            A_costs = []
            for a1 in config.A1:
                expected_immediate_cost = AptGameUtil.expected_cost(C=list(config.C), S=list(config.S), b=b, a1=a1)
                expected_future_cost = 0.0
                for a2 in config.A2:
                    for i, o in enumerate(config.O):
                        # Belief update given defender action a1, attacker action a2, and observation o
                        b_prime = AptGameUtil.next_belief(o=i, a1=a1, b=np.array(b), pi2=pi2, config=config, a2=a2)
                        _, cost = RolloutUtil.exact_defender_rollout(
                            alpha=alpha, pi2=pi2, config=config, num_samples=num_samples, horizon=horizon,
                            ell=ell - 1, b=list(b_prime))
                        # Probabilities of the attacker action and the observation under the current belief
                        obs_prob = 0.0
                        action_prob = 0.0
                        for s in config.S:
                            action_prob += b[s] * pi2[s][a2]
                            for s_prime in config.S:
                                obs_prob += b[s] * config.T[a1][a2][s][s_prime] * config.Z[s_prime][i]
                        expected_future_cost += action_prob * obs_prob * cost
                A_costs.append(expected_immediate_cost + config.gamma * expected_future_cost)
            best_action = np.argmin(A_costs)
            return int(best_action), float(A_costs[best_action])
    @staticmethod
    def monte_carlo_defender_rollout(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, num_samples: int,
                                     horizon: int, ell: int, b: List[float], a2: Union[None, int] = None,
                                     s: Union[None, int] = None) -> Tuple[int, float]:
        """
        Monte-Carlo-based rollout of the defender with a threshold base strategy

        :param alpha: the threshold of the base strategy
        :param pi2: the attacker strategy
        :param config: the game configuration
        :param num_samples: the number of Monte-Carlo samples
        :param horizon: the horizon for the Monte-Carlo sampling
        :param ell: the lookahead length
        :param b: the belief state
        :param a2: the action of the attacker
        :param s: the state
        :return: The rollout action and the corresponding Q-factor
        """
        if ell == 0:
            # Base case: evaluate the threshold base strategy
            return 0, RolloutUtil.eval_defender_base_parallel(alpha=alpha, pi2=pi2, config=config,
                                                              num_samples=num_samples, horizon=horizon, s=None,
                                                              b=b.copy())
        else:
            A_costs = []
            for a1 in config.A1:
                if s is None:
                    expected_immediate_cost = AptGameUtil.expected_cost(C=list(config.C), S=list(config.S), b=b,
                                                                        a1=a1)
                else:
                    expected_immediate_cost = config.C[a1][s]
                p = NestablePool(num_samples)
                args = []
                for i in range(num_samples):
                    # Sample state, attacker action, next state, and observation to generate a sampled next belief
                    if s is None:
                        s = AptGameUtil.sample_initial_state(b1=np.array(b))
                    if a2 is None:
                        a2 = AptGameUtil.sample_attacker_action(pi2=pi2, s=s)
                    s_prime = AptGameUtil.sample_next_state(T=config.T, s=s, a1=a1, a2=int(a2), S=config.S)
                    o = AptGameUtil.sample_next_observation(Z=config.Z, s_prime=s_prime, O=config.O)
                    o_idx = list(config.O).index(o)
                    b_prime = AptGameUtil.next_belief(o=o_idx, a1=a1, b=np.array(b), pi2=pi2, config=config,
                                                      a2=int(a2))
                    args.append((alpha, pi2, config, num_samples, horizon, ell - 1, b_prime.copy(), None, s))
                # Recursively evaluate the sampled next beliefs in parallel
                cumulative_costs = p.starmap(RolloutUtil.monte_carlo_defender_rollout, args)
                expected_future_cost = np.mean(list(map(lambda x: x[1], cumulative_costs)))
                A_costs.append(expected_immediate_cost + config.gamma * expected_future_cost)
            best_action = np.argmin(A_costs)
            return int(best_action), float(A_costs[best_action])
    @staticmethod
    def monte_carlo_attacker_rollout(alpha: float, pi2: npt.NDArray[Any], config: AptGameConfig, num_samples: int,
                                     horizon: int, ell: int, b: List[float], a1: Union[None, int] = None,
                                     s: Union[None, int] = None) -> Tuple[int, float]:
        """
        Monte-Carlo-based rollout of the attacker with a threshold base strategy

        :param alpha: the threshold of the defender
        :param pi2: the base strategy of the attacker
        :param config: the game configuration
        :param num_samples: the number of Monte-Carlo samples
        :param horizon: the horizon for the Monte-Carlo sampling
        :param ell: the lookahead length
        :param b: the belief state
        :param a1: the action of the defender
        :param s: the state
        :return: The rollout action and the corresponding Q-factor
        """
        if ell == 0:
            # Base case: evaluate the attacker's base strategy
            return 0, RolloutUtil.eval_attacker_base_parallel(alpha=alpha, pi2=pi2, config=config,
                                                              num_samples=num_samples, horizon=horizon, s=s, b=b)
        else:
            A_costs = []
            if a1 is None:
                # Defender follows the threshold base strategy
                a1 = 0
                if sum(b[1:]) >= alpha:
                    a1 = 1
            expected_immediate_cost = AptGameUtil.expected_cost(C=list(config.C), S=list(config.S), b=b, a1=a1)
            for a2 in config.A2:
                p = NestablePool(num_samples)
                args = []
                for i in range(num_samples):
                    # Sample state, next state, and observation to generate a sampled next belief
                    s = AptGameUtil.sample_initial_state(b1=np.array(b))
                    s_prime = AptGameUtil.sample_next_state(T=config.T, s=s, a1=a1, a2=a2, S=config.S)
                    o = AptGameUtil.sample_next_observation(Z=config.Z, s_prime=s_prime, O=config.O)
                    o_idx = list(config.O).index(o)
                    b_prime = AptGameUtil.next_belief(o=o_idx, a1=a1, b=np.array(b), pi2=pi2, config=config, a2=a2)
                    args.append((alpha, pi2, config, num_samples, horizon, ell - 1, b_prime))
                # Recursively evaluate the sampled next beliefs in parallel
                cumulative_costs = p.starmap(RolloutUtil.monte_carlo_attacker_rollout, args)
                expected_future_cost = np.mean(list(map(lambda x: x[1], cumulative_costs)))
                A_costs.append(expected_immediate_cost + config.gamma * expected_future_cost)
            # The attacker maximizes the cost
            best_action = np.argmax(A_costs)
            return int(best_action), float(A_costs[best_action])
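

# Minimal usage sketch (not part of the module above). It assumes that an AptGameConfig instance `config`,
# an attacker strategy `pi2` (a stochastic matrix over states and attacker actions), and an initial belief
# `b0` have been constructed elsewhere; the threshold, sample count, horizon, and lookahead values below are
# illustrative placeholders, not recommended settings.
#
#   from gym_csle_apt_game.util.rollout_util import RolloutUtil
#
#   # One-step lookahead Monte-Carlo rollout for the defender with a threshold base strategy
#   a1, q_value = RolloutUtil.monte_carlo_defender_rollout(alpha=0.75, pi2=pi2, config=config, num_samples=10,
#                                                          horizon=50, ell=1, b=b0)
#
#   # Average cost-to-go of the attacker's base strategy pi2 against the same threshold defender
#   avg_cost = RolloutUtil.eval_attacker_base_parallel(alpha=0.75, pi2=pi2, config=config, num_samples=10,
#                                                      horizon=50, s=None, b=b0)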