Source code for gym_csle_stopping_game.util.stopping_game_util

from typing import Any, Tuple
import itertools
import numpy as np
import numpy.typing as npt
from scipy.stats import betabinom
from gym_csle_stopping_game.dao.stopping_game_config import StoppingGameConfig
from csle_common.dao.training.policy import Policy


class StoppingGameUtil:
    """
    Class with utility functions for the StoppingGame Environment
    """

    @staticmethod
    def b1() -> npt.NDArray[np.float64]:
        """
        Gets the initial belief

        :return: the initial belief
        """
        return np.array([1.0, 0.0, 0.0])

    @staticmethod
    def state_space() -> npt.NDArray[np.int_]:
        """
        Gets the state space

        :return: the state space of the game
        """
        return np.array([0, 1, 2])

    @staticmethod
    def defender_actions() -> npt.NDArray[np.int_]:
        """
        Gets the action space of the defender

        :return: the action space of the defender
        """
        return np.array([0, 1])

    @staticmethod
    def attacker_actions() -> npt.NDArray[np.int_]:
        """
        Gets the action space of the attacker

        :return: the action space of the attacker
        """
        return np.array([0, 1])

    @staticmethod
    def observation_space(n: int) -> npt.NDArray[np.int_]:
        """
        Returns the observation space of size n

        :param n: the maximum observation
        :return: the observation space
        """
        return np.array(list(range(n + 1)))

    @staticmethod
    def reward_tensor(R_SLA: int, R_INT: int, R_COST: int, L: int, R_ST: int) -> npt.NDArray[Any]:
        """
        Gets the reward tensor

        :param R_SLA: the R_SLA constant
        :param R_INT: the R_INT constant
        :param R_COST: the R_COST constant
        :param L: the maximum number of stop actions
        :param R_ST: the R_ST constant
        :return: a |L|x|A1|x|A2|x|S| tensor
        """
        R_l = []
        for l in range(1, L + 1):
            R = [
                # Defender continues
                [
                    # Attacker continues
                    [R_SLA, R_SLA + R_INT, 0],
                    # Attacker stops
                    [R_SLA, R_SLA, 0]
                ],
                # Defender stops
                [
                    # Attacker continues
                    [R_COST / l, R_ST / l, 0],
                    # Attacker stops
                    [R_COST / l, R_SLA, 0]
                ]
            ]
            R_l.append(R)
        return np.array(R_l)

    @staticmethod
    def transition_tensor(L: int) -> npt.NDArray[Any]:
        """
        Gets the transition tensor

        :param L: the maximum number of stop actions
        :return: a |L|x|A1|x|A2|x|S|x|S| tensor
        """
        T_l = []
        for l in range(1, L + 1):
            if l == 1:
                T = [
                    # Defender continues
                    [
                        # Attacker continues
                        [
                            [1.0, 0.0, 0.0],  # No intrusion
                            [0.0, 1.0, 0.0],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0.0, 1.0, 0.0],  # No intrusion
                            [0.0, 0.0, 1.0],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ]
                    ],
                    # Defender stops
                    [
                        # Attacker continues
                        [
                            [0.0, 0.0, 1.0],  # No intrusion
                            [0.0, 0.0, 1.0],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0.0, 0.0, 1.0],  # No intrusion
                            [0.0, 0.0, 1.0],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ]
                    ]
                ]
            else:
                T = [
                    # Defender continues
                    [
                        # Attacker continues
                        [
                            [1.0, 0.0, 0.0],  # No intrusion
                            [0.0, 1.0 - 1.0 / (2.0 * l), 1.0 / (2.0 * l)],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0.0, 1.0, 0.0],  # No intrusion
                            [0.0, 0.0, 1.0],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ]
                    ],
                    # Defender stops
                    [
                        # Attacker continues
                        [
                            [1.0, 0.0, 0.0],  # No intrusion
                            [0.0, 1.0 - 1.0 / (2.0 * l), 1.0 / (2.0 * l)],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ],
                        # Attacker stops
                        [
                            [0.0, 1.0, 0.0],  # No intrusion
                            [0.0, 0.0, 1.0],  # Intrusion
                            [0.0, 0.0, 1.0]  # Terminal
                        ]
                    ]
                ]
            T_l.append(T)
        return np.array(T_l)

    @staticmethod
    def observation_tensor(n: int) -> npt.NDArray[Any]:
        """
        Gets the observation tensor

        :param n: the maximum observation
        :return: a |A1|x|A2|x|S|x|O| tensor
        """
        intrusion_dist = []
        no_intrusion_dist = []
        terminal_dist = np.zeros(n + 1)
        terminal_dist[-1] = 1
        intrusion_rv = betabinom(n=n, a=1, b=0.7)
        no_intrusion_rv = betabinom(n=n, a=0.7, b=3)
        for i in range(n + 1):
            intrusion_dist.append(intrusion_rv.pmf(i))
            no_intrusion_dist.append(no_intrusion_rv.pmf(i))
        Z = np.array(
            [
                [
                    [
                        no_intrusion_dist,
                        intrusion_dist,
                        terminal_dist
                    ],
                    [
                        no_intrusion_dist,
                        intrusion_dist,
                        terminal_dist
                    ],
                ],
                [
                    [
                        no_intrusion_dist,
                        intrusion_dist,
                        terminal_dist
                    ],
                    [
                        no_intrusion_dist,
                        intrusion_dist,
                        terminal_dist
                    ],
                ]
            ]
        )
        return Z
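
    # Illustrative sanity check (a minimal sketch; the values L=3 and n=10 are arbitrary examples):
    # both the transition tensor and the observation tensor are expected to be row-stochastic,
    # i.e., every conditional distribution over next states / observations sums to one.
    #
    #   T = StoppingGameUtil.transition_tensor(L=3)
    #   Z = StoppingGameUtil.observation_tensor(n=10)
    #   assert np.allclose(T.sum(axis=-1), 1.0)
    #   assert np.allclose(Z.sum(axis=-1), 1.0)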

    @staticmethod
    def sample_next_state(T: npt.NDArray[Any], l: int, s: int, a1: int, a2: int, S: npt.NDArray[np.int_]) -> int:
        """
        Samples the next state

        :param T: the transition operator
        :param l: the number of stops remaining
        :param s: the current state
        :param a1: the defender action
        :param a2: the attacker action
        :param S: the state space
        :return: s'
        """
        state_probs = []
        for s_prime in S:
            state_probs.append(T[l - 1][a1][a2][s][s_prime])
        return int(np.random.choice(np.arange(0, len(S)), p=state_probs))

    @staticmethod
    def sample_initial_state(b1: npt.NDArray[np.float64]) -> int:
        """
        Samples the initial state

        :param b1: the initial belief
        :return: s1
        """
        return int(np.random.choice(np.arange(0, len(b1)), p=b1))

    @staticmethod
    def sample_next_observation(Z: npt.NDArray[Any], s_prime: int, O: npt.NDArray[np.int_]) -> int:
        """
        Samples the next observation

        :param Z: the observation tensor, which includes the observation probabilities
        :param s_prime: the new state
        :param O: the observation space
        :return: o
        """
        observation_probs = []
        for o in O:
            if len(Z.shape) == 4:
                observation_probs.append(Z[0][0][s_prime][o])
            elif len(Z.shape) == 3:
                observation_probs.append(Z[0][s_prime][o])
            elif len(Z.shape) == 2:
                observation_probs.append(Z[s_prime][o])
        o = np.random.choice(np.arange(0, len(O)), p=observation_probs)
        return int(o)

    @staticmethod
    def bayes_filter(s_prime: int, o: int, a1: int, b: npt.NDArray[np.float64], pi2: npt.NDArray[Any],
                     l: int, config: StoppingGameConfig) -> float:
        """
        A Bayesian filter to compute the belief of player 1 of being in s_prime when observing o after
        taking action a1 in belief b, given that the opponent follows strategy pi2

        :param s_prime: the state to compute the belief of
        :param o: the observation
        :param a1: the action of player 1
        :param b: the current belief point
        :param pi2: the policy of player 2
        :param l: stops remaining
        :param config: the game config
        :return: b_prime(s_prime)
        """
        l = l - 1
        norm = 0
        for s in config.S:
            for a2 in config.A2:
                for s_prime_1 in config.S:
                    prob_1 = config.Z[a1][a2][s_prime_1][o]
                    norm += b[s] * prob_1 * config.T[l][a1][a2][s][s_prime_1] * pi2[s][a2]
        if norm == 0:
            return 0
        temp = 0
        for s in config.S:
            for a2 in config.A2:
                temp += config.Z[a1][a2][s_prime][o] * config.T[l][a1][a2][s][s_prime] * b[s] * pi2[s][a2]
        b_prime_s_prime = temp / norm
        if round(b_prime_s_prime, 2) > 1:
            print(f"b_prime_s_prime >= 1: {b_prime_s_prime}, a1:{a1}, s_prime:{s_prime}, l:{l}, o:{o}, pi2:{pi2}")
        assert round(b_prime_s_prime, 2) <= 1
        if s_prime == 2 and o != config.O[-1]:
            assert round(b_prime_s_prime, 2) <= 0.01
        return float(b_prime_s_prime)

    @staticmethod
    def next_belief(o: int, a1: int, b: npt.NDArray[np.float64], pi2: npt.NDArray[Any],
                    config: StoppingGameConfig, l: int, a2: int = 0, s: int = 0) -> npt.NDArray[np.float64]:
        """
        Computes the next belief using a Bayesian filter

        :param o: the latest observation
        :param a1: the latest action of player 1
        :param b: the current belief
        :param pi2: the policy of player 2
        :param config: the game config
        :param l: stops remaining
        :param a2: the attacker action (for debugging, should be consistent with pi2)
        :param s: the true state (for debugging)
        :return: the new belief
        """
        b_prime = np.zeros(len(config.S))
        for s_prime in config.S:
            b_prime[s_prime] = StoppingGameUtil.bayes_filter(s_prime=s_prime, o=o, a1=a1, b=b, pi2=pi2,
                                                             config=config, l=l)
        if round(sum(b_prime), 2) != 1:
            print(f"error, b_prime:{b_prime}, o:{o}, a1:{a1}, b:{b}, pi2:{pi2}, a2: {a2}, s:{s}")
        assert round(sum(b_prime), 2) == 1
        return b_prime
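
    # Illustrative usage sketch (hedged; the config construction and the observation o=5 are
    # assumptions, not prescribed values): updating the defender belief when the attacker follows
    # a uniform stationary strategy pi2 over its two actions in each state.
    #
    #   config = StoppingGameConfig(...)  # some valid stopping-game config
    #   pi2 = np.array([[0.5, 0.5], [0.5, 0.5], [0.5, 0.5]])
    #   b = StoppingGameUtil.b1()
    #   b = StoppingGameUtil.next_belief(o=5, a1=0, b=b, pi2=pi2, config=config, l=1)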

    @staticmethod
    def sample_attacker_action(pi2: npt.NDArray[Any], s: int) -> int:
        """
        Samples the attacker action

        :param pi2: the attacker policy
        :param s: the game state
        :return: a2, the sampled attacker action
        """
        return int(np.random.choice(np.arange(0, len(pi2[s])), p=pi2[s]))

    @staticmethod
    def pomdp_solver_file(config: StoppingGameConfig, discount_factor: float, pi2: npt.NDArray[Any]) -> str:
        """
        Gets the POMDP environment specification based on the format at http://www.pomdp.org/code/index.html,
        for the defender's local problem against a static attacker

        :param config: the POMDP config
        :param discount_factor: the discount factor
        :param pi2: the attacker strategy
        :return: the file content as a string
        """
        file_str = ""
        file_str = file_str + f"discount: {discount_factor}\n\n"
        file_str = file_str + "values: reward\n\n"
        file_str = file_str + f"states: {len(config.S)}\n\n"
        file_str = file_str + f"actions: {len(config.A1)}\n\n"
        file_str = file_str + f"observations: {len(config.O)}\n\n"
        initial_belief_str = " ".join(list(map(lambda x: str(x), config.b1)))
        file_str = file_str + f"start: {initial_belief_str}\n\n\n"
        num_transitions = 0
        for s in config.S:
            for a1 in config.A1:
                probs = []
                for s_prime in range(len(config.S)):
                    num_transitions += 1
                    prob = 0
                    for a2 in config.A2:
                        prob += config.T[0][a1][a2][s][s_prime] * pi2[s][a2]
                    file_str = file_str + f"T: {a1} : {s} : {s_prime} {prob:.80f}\n"
                    probs.append(prob)
                assert round(sum(probs), 3) == 1
        file_str = file_str + "\n\n"
        for a1 in config.A1:
            for s_prime in config.S:
                probs = []
                for o in range(len(config.O)):
                    prob = config.Z[0][0][s_prime][o]
                    file_str = file_str + f"O : {a1} : {s_prime} : {o} {prob:.80f}\n"
                    probs.append(prob)
                assert round(sum(probs), 3) == 1
        file_str = file_str + "\n\n"
        for s in config.S:
            for a1 in config.A1:
                for s_prime in config.S:
                    for o in config.O:
                        r = config.R[0][a1][0][s]
                        file_str = file_str + f"R: {a1} : {s} : {s_prime} : {o} {r:.80f}\n"
        return file_str
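
    # Illustrative usage sketch (the file name "stopping_game.pomdp" is an arbitrary example):
    # the generated specification string can be written to disk and passed to an external POMDP
    # solver that accepts the pomdp.org format.
    #
    #   pomdp_spec = StoppingGameUtil.pomdp_solver_file(config=config, discount_factor=0.99, pi2=pi2)
    #   with open("stopping_game.pomdp", "w") as f:
    #       f.write(pomdp_spec)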

    @staticmethod
    def reduce_T_attacker(T: npt.NDArray[np.float_], strategy: Policy) -> npt.NDArray[np.float_]:
        """
        Reduces the transition tensor based on a given attacker strategy

        :param T: the tensor to reduce
        :param strategy: the strategy to use for the reduction
        :return: the reduced tensor (|A1|x|S|x|S|)
        """
        if len(T.shape) == 5:
            T = T[0]
        reduced_T = np.zeros((T.shape[0], T.shape[2], T.shape[3]))
        for i in range(T.shape[0]):
            for j in range(T.shape[2]):
                for k in range(T.shape[3]):
                    reduced_T[i][j][k] = (T[i][0][j][k] * strategy.probability(j, 0)
                                          + T[i][1][j][k] * strategy.probability(j, 1))
                    # if j == 0:
                    #     reduced_T[i][j][k] = (T[i][0][j][k] * strategy.probability(j, 0)
                    #                           + T[i][1][j][k] * strategy.probability(j, 1))
                    # else:
                    #     reduced_T[i][j][k] = (T[i][0][j][k] * (1 - strategy.probability(j, 0))
                    #                           + T[i][1][j][k] * strategy.probability(j, 1))
        return reduced_T

    @staticmethod
    def reduce_R_attacker(R: npt.NDArray[np.float_], strategy: Policy) -> npt.NDArray[np.float_]:
        """
        Reduces the reward tensor based on a given attacker strategy

        :param R: the reward tensor to reduce
        :param strategy: the strategy to use for the reduction
        :return: the reduced reward tensor (|A1|x|S|)
        """
        if len(R.shape) == 4:
            R = R[0]
        reduced_R = np.zeros((R.shape[0], R.shape[2]))
        for i in range(R.shape[0]):
            for j in range(R.shape[2]):
                reduced_R[i][j] = (R[i][0][j] * strategy.probability(j, 0)
                                   + R[i][1][j] * strategy.probability(j, 1))
        return reduced_R

    @staticmethod
    def reduce_Z_attacker(Z: npt.NDArray[np.float_], strategy: Policy) -> npt.NDArray[np.float_]:
        """
        Reduces the observation tensor based on a given attacker strategy

        :param Z: the observation tensor to reduce
        :param strategy: the strategy to use for the reduction
        :return: the reduced observation tensor (|A1|x|S|x|O|)
        """
        reduced_Z = np.zeros((Z.shape[0], Z.shape[2], Z.shape[3]))
        for i in range(Z.shape[0]):
            for j in range(Z.shape[2]):
                for k in range(Z.shape[3]):
                    reduced_Z[i][j][k] = (Z[i][0][j][k] * strategy.probability(j, 0)
                                          + Z[i][1][j][k] * strategy.probability(j, 1))
        return reduced_Z

    @staticmethod
    def reduce_T_defender(T: npt.NDArray[np.float_], strategy: Policy) -> npt.NDArray[np.float_]:
        """
        Reduces the transition tensor based on a given defender strategy

        :param T: the tensor to reduce
        :param strategy: the strategy to use for the reduction
        :return: the reduced tensor (|A2|x|S|x|S|)
        """
        if len(T.shape) == 5:
            T = T[0]
        reduced_T = np.zeros((T.shape[1], T.shape[2], T.shape[3]))
        for i in range(T.shape[1]):
            for j in range(T.shape[2]):
                for k in range(T.shape[3]):
                    reduced_T[i][j][k] = (T[0][i][j][k] * strategy.probability(j, 0)
                                          + T[1][i][j][k] * strategy.probability(j, 1))
        return reduced_T

    @staticmethod
    def reduce_R_defender(R: npt.NDArray[np.float_], strategy: Policy) -> npt.NDArray[np.float_]:
        """
        Reduces the reward tensor based on a given defender strategy

        :param R: the reward tensor to reduce
        :param strategy: the strategy to use for the reduction
        :return: the reduced reward tensor (|A2|x|S|)
        """
        if len(R.shape) == 4:
            R = R[0]
        reduced_R = np.zeros((R.shape[1], R.shape[2]))
        for i in range(R.shape[1]):
            for j in range(R.shape[2]):
                reduced_R[i][j] = (R[0][i][j] * strategy.probability(j, 0)
                                   + R[1][i][j] * strategy.probability(j, 1))
        return reduced_R

    @staticmethod
    def aggregate_belief_mdp_defender(aggregation_resolution: int, T: npt.NDArray[np.float_],
                                      R: npt.NDArray[np.float_], Z: npt.NDArray[np.float_],
                                      S: npt.NDArray[np.int_], A: npt.NDArray[np.int_],
                                      O: npt.NDArray[np.int_]) \
            -> Tuple[npt.NDArray[np.float_], npt.NDArray[np.int_], npt.NDArray[np.float_], npt.NDArray[np.float_]]:
        """
        Generates an aggregate belief MDP from a given POMDP specification and aggregation resolution

        :param aggregation_resolution: the belief aggregation resolution
        :param T: the transition tensor of the POMDP
        :param R: the reward tensor of the POMDP
        :param Z: the observation tensor of the POMDP
        :param S: the state space of the POMDP
        :param A: the action space of the POMDP
        :param O: the observation space of the POMDP
        :return: the aggregate belief space, the action space, the transition operator, and the reward tensor
                 of the belief MDP
        """
        aggregate_belief_space = StoppingGameUtil.generate_aggregate_belief_space(
            n=aggregation_resolution, belief_space_dimension=len(S))
        belief_T = StoppingGameUtil.generate_aggregate_belief_transition_operator(
            aggregate_belief_space=aggregate_belief_space, S=S, A=A, O=O, T=T, Z=Z)
        belief_R = StoppingGameUtil.generate_aggregate_belief_reward_tensor(
            aggregate_belief_space=aggregate_belief_space, S=S, A=A, R=R)
        return aggregate_belief_space, A, belief_T, belief_R
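
    # Illustrative sketch (an assumption-laden example, not the canonical workflow): the POMDP
    # tensors passed to aggregate_belief_mdp_defender are expected to be attacker-marginalized
    # (|A1|x|S|x|S|, |A1|x|S|, |A1|x|S|x|O|). Here they are reduced by hand with a fixed attacker
    # action distribution instead of via the reduce_*_attacker helpers and a Policy object.
    #
    #   T = StoppingGameUtil.transition_tensor(L=1)[0]   # |A1|x|A2|x|S|x|S|
    #   Z = StoppingGameUtil.observation_tensor(n=10)    # |A1|x|A2|x|S|x|O|
    #   R = StoppingGameUtil.reward_tensor(R_SLA=1, R_INT=-5, R_COST=-5, L=1, R_ST=5)[0]
    #   attacker_dist = np.array([0.5, 0.5])
    #   T_red = np.einsum("abij,b->aij", T, attacker_dist)
    #   R_red = np.einsum("abi,b->ai", R, attacker_dist)
    #   Z_red = Z[:, 0, :, :]   # Z is identical across attacker actions in this model
    #   B_n, A, belief_T, belief_R = StoppingGameUtil.aggregate_belief_mdp_defender(
    #       aggregation_resolution=10, T=T_red, R=R_red, Z=Z_red,
    #       S=StoppingGameUtil.state_space(), A=StoppingGameUtil.defender_actions(),
    #       O=StoppingGameUtil.observation_space(n=10))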

    @staticmethod
    def generate_aggregate_belief_space(n: int, belief_space_dimension: int) -> npt.NDArray[np.float_]:
        """
        Generates an aggregate belief space B_n

        :param n: the aggregation resolution
        :param belief_space_dimension: the belief space dimension
        :return: the aggregate belief space
        """
        # Generate all combinations of integer allocations k_i such that sum(k_i) = n
        combinations = [k for k in itertools.product(range(n + 1), repeat=belief_space_dimension) if sum(k) == n]
        # Convert integer allocations to belief points by dividing each k_i by n
        belief_points = [list(k_i / n for k_i in k) for k in combinations]
        # Remove all beliefs that violate the stopping dynamics
        belief_points = list(filter(lambda x: x[-1] == 1.0 or x[-1] == 0.0, belief_points))
        return np.array(belief_points)
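
    # Example output (a small illustrative case with n=2 and three states): the grid points have
    # spacing 1/n and the terminal-state mass is restricted to 0 or 1 by the filtering step.
    #
    #   StoppingGameUtil.generate_aggregate_belief_space(n=2, belief_space_dimension=3)
    #   # array([[0. , 0. , 1. ],
    #   #        [0. , 1. , 0. ],
    #   #        [0.5, 0.5, 0. ],
    #   #        [1. , 0. , 0. ]])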

    @staticmethod
    def generate_aggregate_belief_reward_tensor(
            aggregate_belief_space: npt.NDArray[np.float_], S: npt.NDArray[np.int_], A: npt.NDArray[np.int_],
            R: npt.NDArray[np.float_]) -> npt.NDArray[np.float_]:
        """
        Generates an aggregate reward tensor for the aggregate belief MDP

        :param aggregate_belief_space: the aggregate belief space
        :param S: the state space of the POMDP
        :param A: the action space of the POMDP
        :param R: the reward tensor of the POMDP
        :return: the reward tensor of the aggregate belief MDP
        """
        belief_R = np.zeros((len(A), len(aggregate_belief_space)))
        belief_space_list = aggregate_belief_space.tolist()
        for a in A:
            for b in aggregate_belief_space:
                expected_reward = 0
                for s in S:
                    expected_reward += R[a][s] * b[s]
                belief_R[a][belief_space_list.index(b.tolist())] = expected_reward
        return belief_R

    @staticmethod
    def generate_aggregate_belief_transition_operator(
            aggregate_belief_space: npt.NDArray[np.float_], S: npt.NDArray[np.int_], A: npt.NDArray[np.int_],
            O: npt.NDArray[np.int_], T: npt.NDArray[np.float_],
            Z: npt.NDArray[np.float_]) -> npt.NDArray[np.float_]:
        """
        Generates an aggregate belief space transition operator

        :param aggregate_belief_space: the aggregate belief space
        :param S: the state space of the POMDP
        :param A: the action space of the POMDP
        :param O: the observation space of the POMDP
        :param T: the transition operator of the POMDP
        :param Z: the observation tensor of the POMDP
        :return: the aggregate belief space transition operator
        """
        belief_space_list = aggregate_belief_space.tolist()
        belief_T = np.zeros((len(A), len(aggregate_belief_space), len(aggregate_belief_space)))
        for a in A:
            for b1 in aggregate_belief_space:
                for b2 in aggregate_belief_space:
                    belief_T[a][belief_space_list.index(b1.tolist())][belief_space_list.index(b2.tolist())] = \
                        StoppingGameUtil.aggregate_belief_transition_probability(
                            b1=b1, b2=b2, a=a, S=S, O=O, T=T, Z=Z,
                            aggregate_belief_space=aggregate_belief_space, A=A)
        return belief_T

    @staticmethod
    def aggregate_belief_transition_probability(b1: npt.NDArray[np.float_], b2: npt.NDArray[np.float_], a: int,
                                                S: npt.NDArray[np.int_], O: npt.NDArray[np.int_],
                                                A: npt.NDArray[np.int_], T: npt.NDArray[np.float_],
                                                Z: npt.NDArray[np.float_],
                                                aggregate_belief_space: npt.NDArray[np.float_]) -> float:
        """
        Calculates the probability of transitioning from belief b1 to belief b2 when taking action a

        :param b1: the source belief
        :param b2: the target belief
        :param a: the action
        :param S: the state space of the POMDP
        :param O: the observation space of the POMDP
        :param A: the action space of the POMDP
        :param T: the transition operator
        :param Z: the observation tensor
        :param aggregate_belief_space: the aggregate belief space
        :return: the probability P(b2 | b1, a)
        """
        prob = 0
        for o in O:
            if sum([Z[a][s_prime][o] * b1[s] * T[a][s][s_prime] for s in S for s_prime in S]) == 0:
                continue
            b_prime = StoppingGameUtil.pomdp_next_belief(
                o=o, a=a, b=b1, states=S, observations=O, observation_tensor=Z, transition_tensor=T)
            nearest_neighbor = StoppingGameUtil.find_nearest_neighbor_belief(
                belief_space=aggregate_belief_space, target_belief=b_prime)
            if np.array_equal(nearest_neighbor, b2):
                for s in S:
                    for s_prime in S:
                        prob += Z[a][s_prime][o] * b1[s] * T[a][s][s_prime]
        return prob

    @staticmethod
    def pomdp_next_belief(o: int, a: int, b: npt.NDArray[np.float64], states: npt.NDArray[np.int_],
                          observations: npt.NDArray[np.int_], observation_tensor: npt.NDArray[np.float_],
                          transition_tensor: npt.NDArray[np.float_]) -> npt.NDArray[np.float64]:
        """
        Computes the next belief of the POMDP using a Bayesian filter

        :param o: the latest observation
        :param a: the latest action of player 1
        :param b: the current belief
        :param states: the list of states
        :param observations: the list of observations
        :param observation_tensor: the observation tensor
        :param transition_tensor: the transition tensor
        :return: the new belief
        """
        b_prime = [0.0] * len(states)
        for s_prime in states:
            b_prime[s_prime] = StoppingGameUtil.pomdp_bayes_filter(
                s_prime=s_prime, o=o, a=a, b=b, states=states, observations=observations,
                transition_tensor=transition_tensor, observation_tensor=observation_tensor)
        if round(sum(b_prime), 2) != 1:
            print(f"error, b_prime:{b_prime}, o:{o}, a:{a}, b:{b}")
        assert round(sum(b_prime), 2) == 1
        return np.array(b_prime)

    @staticmethod
    def pomdp_bayes_filter(s_prime: int, o: int, a: int, b: npt.NDArray[np.float64], states: npt.NDArray[np.int_],
                           observations: npt.NDArray[np.int_], observation_tensor: npt.NDArray[np.float_],
                           transition_tensor: npt.NDArray[np.float_]) -> float:
        """
        A Bayesian filter to compute b[s_prime] of the POMDP

        :param s_prime: the state to compute the belief for
        :param o: the latest observation
        :param a: the latest action
        :param b: the current belief
        :param states: the list of states
        :param observations: the list of observations
        :param observation_tensor: the observation tensor
        :param transition_tensor: the transition tensor of the POMDP
        :return: b[s_prime]
        """
        norm = 0.0
        for s in states:
            for s_prime_1 in states:
                prob_1 = observation_tensor[a][s_prime_1][o]
                norm += b[s] * prob_1 * transition_tensor[a][s][s_prime_1]
        if norm == 0.0:
            print(f"zero norm, a: {a}, b: {b}, o: {o}")
            return 0.0
        temp = 0.0
        for s in states:
            temp += observation_tensor[a][s_prime][o] * transition_tensor[a][s][s_prime] * b[s]
        b_prime_s_prime = temp / norm
        if round(b_prime_s_prime, 2) > 1:
            print(f"b_prime_s_prime >= 1: {b_prime_s_prime}, a1:{a}, s_prime:{s_prime}")
        assert round(b_prime_s_prime, 2) <= 1
        if s_prime == 2 and o != observations[-1]:
            assert round(b_prime_s_prime, 2) <= 0.01
        return b_prime_s_prime

    @staticmethod
    def find_nearest_neighbor_belief(belief_space: npt.NDArray[np.float_],
                                     target_belief: npt.NDArray[np.float_]) -> npt.NDArray[np.float_]:
        """
        Finds the nearest neighbor (in the Euclidean sense) of a given belief in a certain belief space

        :param belief_space: the belief space to search in
        :param target_belief: the belief to find the nearest neighbor of
        :return: the nearest neighbor belief from the belief space
        """
        # Compute Euclidean distances between the target belief and all points in the belief space
        distances = np.linalg.norm(belief_space - target_belief, axis=1)
        # Find the index of the minimum distance (break ties consistently by choosing the smallest index)
        nearest_index = int(np.argmin(distances))
        return np.array(belief_space[nearest_index])
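

# Minimal usage sketch (illustrative only, not part of the original module): projects an arbitrary
# belief onto an aggregate belief space and samples a single state transition of the game.
if __name__ == '__main__':
    belief_space = StoppingGameUtil.generate_aggregate_belief_space(n=10, belief_space_dimension=3)
    b = np.array([0.32, 0.68, 0.0])
    nearest = StoppingGameUtil.find_nearest_neighbor_belief(belief_space=belief_space, target_belief=b)
    print(f"nearest aggregate belief to {b}: {nearest}")
    T = StoppingGameUtil.transition_tensor(L=1)
    s = StoppingGameUtil.sample_initial_state(b1=StoppingGameUtil.b1())
    s_prime = StoppingGameUtil.sample_next_state(T=T, l=1, s=s, a1=0, a2=1, S=StoppingGameUtil.state_space())
    print(f"sampled transition: s={s} -> s'={s_prime}")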