Source code for gym_csle_stopping_game.util.stopping_game_util

from typing import Any
import numpy as np
import numpy.typing as npt
from scipy.stats import betabinom
from gym_csle_stopping_game.dao.stopping_game_config import StoppingGameConfig


class StoppingGameUtil:
    """
    Class with utility functions for the StoppingGame environment
    """

    @staticmethod
    def b1() -> npt.NDArray[np.float64]:
        """
        Gets the initial belief

        :return: the initial belief
        """
        return np.array([1.0, 0.0, 0.0])

    @staticmethod
    def state_space() -> npt.NDArray[np.int_]:
        """
        Gets the state space

        :return: the state space of the game
        """
        return np.array([0, 1, 2])

    @staticmethod
    def defender_actions() -> npt.NDArray[np.int_]:
        """
        Gets the action space of the defender

        :return: the action space of the defender
        """
        return np.array([0, 1])

    @staticmethod
    def attacker_actions() -> npt.NDArray[np.int_]:
        """
        Gets the action space of the attacker

        :return: the action space of the attacker
        """
        return np.array([0, 1])

    @staticmethod
    def observation_space(n: int) -> npt.NDArray[np.int_]:
        """
        Returns the observation space of size n

        :param n: the maximum observation
        :return: the observation space
        """
        return np.array(list(range(n + 1)))

    @staticmethod
    def reward_tensor(R_SLA: int, R_INT: int, R_COST: int, L: int, R_ST: int) -> npt.NDArray[Any]:
        """
        Gets the reward tensor

        :param R_SLA: the R_SLA constant
        :param R_INT: the R_INT constant
        :param R_COST: the R_COST constant
        :param L: the maximum number of stop actions
        :param R_ST: the R_ST constant
        :return: a |L|x|A1|x|A2|x|S| tensor
        """
        R_l = []
        for l in range(1, L + 1):
            R = [
                # Defender continues
                [
                    # Attacker continues
                    [R_SLA, R_SLA + R_INT, 0],
                    # Attacker stops
                    [R_SLA, R_SLA, 0]
                ],
                # Defender stops
                [
                    # Attacker continues
                    [R_COST / l, R_ST / l, 0],
                    # Attacker stops
                    [R_COST / l, R_SLA, 0]
                ]
            ]
            R_l.append(R)
        return np.array(R_l)
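
    # For example, with hypothetical constants, reward_tensor(R_SLA=1, R_INT=-5,
    # R_COST=-10, L=3, R_ST=20) returns a tensor of shape (3, 2, 2, 3): one
    # 2x2x3 reward matrix per number of stops remaining l = 1, ..., 3. Note that
    # the stop rewards R_COST and R_ST are scaled by 1/l.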

    @staticmethod
    def transition_tensor(L: int, p: float) -> npt.NDArray[Any]:
        """
        Gets the transition tensor

        :param L: the maximum number of stop actions
        :param p: probability parameter (not used by this tensor; the intrusion-continuation
                  probability is hard-coded as 1 - 1/(2l))
        :return: a |L|x|A1|x|A2|x|S|x|S| tensor
        """
        T_l = []
        for l in range(1, L + 1):
            if l == 1:
                T = [
                    # Defender continues
                    [
                        # Attacker continues
                        [
                            [1, 0, 0],  # No intrusion
                            [0, 1 - 1 / (2 * l), 1 / (2 * l)],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0, 1, 0],  # No intrusion
                            [0, 0, 1],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ]
                    ],
                    # Defender stops (last stop: the game ends)
                    [
                        # Attacker continues
                        [
                            [0, 0, 1],  # No intrusion
                            [0, 0, 1],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0, 0, 1],  # No intrusion
                            [0, 0, 1],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ]
                    ]
                ]
            else:
                T = [
                    # Defender continues
                    [
                        # Attacker continues
                        [
                            [1, 0, 0],  # No intrusion
                            [0, 1 - 1 / (2 * l), 1 / (2 * l)],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0, 1, 0],  # No intrusion
                            [0, 0, 1],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ]
                    ],
                    # Defender stops (stops remain: same dynamics as continue)
                    [
                        # Attacker continues
                        [
                            [1, 0, 0],  # No intrusion
                            [0, 1 - 1 / (2 * l), 1 / (2 * l)],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0, 1, 0],  # No intrusion
                            [0, 0, 1],  # Intrusion
                            [0, 0, 1]  # Terminal
                        ]
                    ]
                ]
            T_l.append(T)
        return np.array(T_l)

    @staticmethod
    def observation_tensor(n: int) -> npt.NDArray[Any]:
        """
        Gets the observation tensor

        :param n: the maximum observation
        :return: a |A1|x|A2|x|S|x|O| tensor
        """
        intrusion_dist = []
        no_intrusion_dist = []
        terminal_dist = np.zeros(n + 1)
        terminal_dist[-1] = 1
        intrusion_rv = betabinom(n=n, a=1, b=0.7)
        no_intrusion_rv = betabinom(n=n, a=0.7, b=3)
        for i in range(n + 1):
            intrusion_dist.append(intrusion_rv.pmf(i))
            no_intrusion_dist.append(no_intrusion_rv.pmf(i))
        Z = np.array(
            [
                [
                    [no_intrusion_dist, intrusion_dist, terminal_dist],
                    [no_intrusion_dist, intrusion_dist, terminal_dist],
                ],
                [
                    [no_intrusion_dist, intrusion_dist, terminal_dist],
                    [no_intrusion_dist, intrusion_dist, terminal_dist],
                ]
            ]
        )
        return Z
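
    # The two beta-binomial distributions above shift the observations towards
    # high values during an intrusion (a=1, b=0.7) and towards low values when
    # there is no intrusion (a=0.7, b=3); the terminal state deterministically
    # emits the last observation n.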

    @staticmethod
    def sample_next_state(T: npt.NDArray[Any], l: int, s: int, a1: int, a2: int,
                          S: npt.NDArray[np.int_]) -> int:
        """
        Samples the next state

        :param T: the transition operator
        :param l: the number of stops remaining
        :param s: the current state
        :param a1: the defender action
        :param a2: the attacker action
        :param S: the state space
        :return: s'
        """
        state_probs = []
        for s_prime in S:
            state_probs.append(T[l - 1][a1][a2][s][s_prime])
        return int(np.random.choice(np.arange(0, len(S)), p=state_probs))

    @staticmethod
    def sample_initial_state(b1: npt.NDArray[np.float64]) -> int:
        """
        Samples the initial state

        :param b1: the initial belief
        :return: s1
        """
        return int(np.random.choice(np.arange(0, len(b1)), p=b1))

    @staticmethod
    def sample_next_observation(Z: npt.NDArray[Any], s_prime: int, O: npt.NDArray[np.int_]) -> int:
        """
        Samples the next observation

        :param Z: the observation tensor, which includes the observation probabilities
        :param s_prime: the new state
        :param O: the observation space
        :return: o
        """
        observation_probs = []
        for o in O:
            if len(Z.shape) == 4:
                observation_probs.append(Z[0][0][s_prime][o])
            elif len(Z.shape) == 3:
                observation_probs.append(Z[0][s_prime][o])
            elif len(Z.shape) == 2:
                observation_probs.append(Z[s_prime][o])
        o = np.random.choice(np.arange(0, len(O)), p=observation_probs)
        return int(o)

    @staticmethod
    def bayes_filter(s_prime: int, o: int, a1: int, b: npt.NDArray[np.float64], pi2: npt.NDArray[Any],
                     l: int, config: StoppingGameConfig) -> float:
        """
        A Bayesian filter to compute the belief of player 1 of being in s_prime when observing o
        after taking action a1 in belief b, given that the opponent follows strategy pi2

        :param s_prime: the state to compute the belief of
        :param o: the observation
        :param a1: the action of player 1
        :param b: the current belief point
        :param pi2: the policy of player 2
        :param l: stops remaining
        :param config: the game config
        :return: b_prime(s_prime)
        """
        l = l - 1
        norm = 0
        for s in config.S:
            for a2 in config.A2:
                for s_prime_1 in config.S:
                    prob_1 = config.Z[a1][a2][s_prime_1][o]
                    norm += b[s] * prob_1 * config.T[l][a1][a2][s][s_prime_1] * pi2[s][a2]
        if norm == 0:
            return 0
        temp = 0
        for s in config.S:
            for a2 in config.A2:
                temp += config.Z[a1][a2][s_prime][o] * config.T[l][a1][a2][s][s_prime] * b[s] * pi2[s][a2]
        b_prime_s_prime = temp / norm
        if round(b_prime_s_prime, 2) > 1:
            print(f"b_prime_s_prime >= 1: {b_prime_s_prime}, a1:{a1}, s_prime:{s_prime}, l:{l}, o:{o}, pi2:{pi2}")
        assert round(b_prime_s_prime, 2) <= 1
        if s_prime == 2 and o != config.O[-1]:
            assert round(b_prime_s_prime, 2) <= 0.01
        return float(b_prime_s_prime)
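
    # In equation form, the filter above computes the standard POMDP belief update,
    # marginalized over the attacker strategy pi2:
    #
    #   b'(s') = sum_{s,a2} Z[a1][a2][s'][o] * T[l][a1][a2][s][s'] * b(s) * pi2[s][a2]
    #            --------------------------------------------------------------------
    #            sum_{s,a2,s''} Z[a1][a2][s''][o] * T[l][a1][a2][s][s''] * b(s) * pi2[s][a2]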

    @staticmethod
    def next_belief(o: int, a1: int, b: npt.NDArray[np.float64], pi2: npt.NDArray[Any],
                    config: StoppingGameConfig, l: int, a2: int = 0, s: int = 0) -> npt.NDArray[np.float64]:
        """
        Computes the next belief using a Bayesian filter

        :param o: the latest observation
        :param a1: the latest action of player 1
        :param b: the current belief
        :param pi2: the policy of player 2
        :param config: the game config
        :param l: stops remaining
        :param a2: the attacker action (for debugging, should be consistent with pi2)
        :param s: the true state (for debugging)
        :return: the new belief
        """
        b_prime = np.zeros(len(config.S))
        for s_prime in config.S:
            b_prime[s_prime] = StoppingGameUtil.bayes_filter(s_prime=s_prime, o=o, a1=a1, b=b, pi2=pi2,
                                                             config=config, l=l)
        if round(sum(b_prime), 2) != 1:
            print(f"error, b_prime:{b_prime}, o:{o}, a1:{a1}, b:{b}, pi2:{pi2}, a2: {a2}, s:{s}")
        assert round(sum(b_prime), 2) == 1
        return b_prime

    @staticmethod
    def sample_attacker_action(pi2: npt.NDArray[Any], s: int) -> int:
        """
        Samples the attacker action

        :param pi2: the attacker policy
        :param s: the game state
        :return: a2, the attacker action
        """
        return int(np.random.choice(np.arange(0, len(pi2[s])), p=pi2[s]))

    @staticmethod
    def pomdp_solver_file(config: StoppingGameConfig, discount_factor: float, pi2: npt.NDArray[Any]) -> str:
        """
        Gets the POMDP environment specification based on the format at http://www.pomdp.org/code/index.html,
        for the defender's local problem against a static attacker

        :param config: the POMDP config
        :param discount_factor: the discount factor
        :param pi2: the attacker strategy
        :return: the file content as a string
        """
        file_str = ""
        file_str = file_str + f"discount: {discount_factor}\n\n"
        file_str = file_str + "values: reward\n\n"
        file_str = file_str + f"states: {len(config.S)}\n\n"
        file_str = file_str + f"actions: {len(config.A1)}\n\n"
        file_str = file_str + f"observations: {len(config.O)}\n\n"
        initial_belief_str = " ".join(map(str, config.b1))
        file_str = file_str + f"start: {initial_belief_str}\n\n\n"
        # Transition probabilities, averaged over the static attacker strategy pi2
        for s in config.S:
            for a1 in config.A1:
                probs = []
                for s_prime in range(len(config.S)):
                    prob = 0
                    for a2 in config.A2:
                        prob += config.T[0][a1][a2][s][s_prime] * pi2[s][a2]
                    file_str = file_str + f"T: {a1} : {s} : {s_prime} {prob:.80f}\n"
                    probs.append(prob)
                assert round(sum(probs), 3) == 1
        file_str = file_str + "\n\n"
        # Observation probabilities
        for a1 in config.A1:
            for s_prime in config.S:
                probs = []
                for o in range(len(config.O)):
                    prob = config.Z[0][0][s_prime][o]
                    file_str = file_str + f"O : {a1} : {s_prime} : {o} {prob:.80f}\n"
                    probs.append(prob)
                assert round(sum(probs), 3) == 1
        file_str = file_str + "\n\n"
        # Rewards (independent of s' and o; a2=0 since the attacker is static)
        for s in config.S:
            for a1 in config.A1:
                for s_prime in config.S:
                    for o in config.O:
                        r = config.R[0][a1][0][s]
                        file_str = file_str + f"R: {a1} : {s} : {s_prime} : {o} {r:.80f}\n"
        return file_str
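

# A minimal usage sketch (not part of the library): it builds the game tensors
# and samples one step of the dynamics. The constants L=3, p=0.5, n=10 and the
# stationary attacker strategy pi2 below are hypothetical choices for illustration.
if __name__ == "__main__":
    L, p, n = 3, 0.5, 10
    T = StoppingGameUtil.transition_tensor(L=L, p=p)
    Z = StoppingGameUtil.observation_tensor(n=n)
    S = StoppingGameUtil.state_space()
    O = StoppingGameUtil.observation_space(n=n)
    b = StoppingGameUtil.b1()
    # Hypothetical attacker strategy: row s gives the distribution over a2 in state s
    pi2 = np.array([[0.9, 0.1], [0.8, 0.2], [0.5, 0.5]])
    s = StoppingGameUtil.sample_initial_state(b1=b)
    a2 = StoppingGameUtil.sample_attacker_action(pi2=pi2, s=s)
    a1 = 0  # defender continues
    s_prime = StoppingGameUtil.sample_next_state(T=T, l=L, s=s, a1=a1, a2=a2, S=S)
    o = StoppingGameUtil.sample_next_observation(Z=Z, s_prime=s_prime, O=O)
    print(f"s={s}, a2={a2}, s'={s_prime}, o={o}")
    # Updating the belief with next_belief requires a StoppingGameConfig instance
    # carrying S, A2, T, Z, and O; it is omitted here.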