Source code for csle_tolerance.util.intrusion_recovery_pomdp_util

from typing import List
from scipy.stats import betabinom
import numpy as np
from csle_tolerance.dao.intrusion_recovery_pomdp_config import (
    IntrusionRecoveryPomdpConfig,
)
from csle_tolerance.dao.intrusion_recovery_game_config import (
    IntrusionRecoveryGameConfig,
)


[docs]class IntrusionRecoveryPomdpUtil: """ Class with utility functions for the intrusion-recovery POMDP """
[docs] @staticmethod def state_space() -> List[int]: """ Gets the state space of the POMDP :return: a list with the states """ return [0, 1, 2]
[docs] @staticmethod def initial_belief(p_a: float) -> List[float]: """ Gets the initial belief state of the POMDP :param p_a: the attack probability :return: the initial belief state """ return [1, 0, 0]
[docs] @staticmethod def action_space() -> List[int]: """ Gets the action space of the POMDP :return: a list with the actions """ return [0, 1]
[docs] @staticmethod def observation_space(num_observations: int) -> List[int]: """ Gets the observation space of the POMDP :param num_observations: the number of observations :return: a list with the actions """ return list(range(num_observations))
[docs] @staticmethod def cost_function(s: int, a: int, eta: float, negate: bool = False) -> float: """ Cost function of the POMDP :param s: the state :param a: the action :param eta: the scaling factor :param negate: boolean flag, if true then return negated version of the cost (the reward) :return: the cost or reward """ if s == 2: return 0 cost = eta * s - eta * a * s + a if negate: return -cost else: return cost
[docs] @staticmethod def cost_tensor( eta: float, states: List[int], actions: List[int], negate: bool = False ) -> List[List[float]]: """ Creates a |A|x|S| tensor with the costs (or rewards) of the POMDP :param eta: the cost scaling factor :param states: the list of states :param actions: the list of actions :param negate: a boolean flag indicating whether the cost should be negated as a reward or not :return: A tensor with the costs (or rewards) """ cost_tensor = [] for a in actions: a_costs = [] for s in states: a_costs.append( IntrusionRecoveryPomdpUtil.cost_function( s=s, a=a, eta=eta, negate=negate ) ) cost_tensor.append(a_costs) return cost_tensor
[docs] @staticmethod def observation_function(s: int, o: int, num_observations: int) -> float: """ The observation function of the POMDP :param s: the state :param o: the observation :param num_observations: the total number of observations :return: the probability P(o | s) """ if s == 0: no_intrusion_rv = betabinom(n=num_observations - 1, a=0.7, b=3) return float(no_intrusion_rv.pmf(o)) elif s == 1: intrusion_rv = betabinom(n=num_observations - 1, a=1, b=0.7) return float(intrusion_rv.pmf(o)) else: if o == num_observations - 1: return 1.0 else: return 0.0
[docs] @staticmethod def observation_tensor( states: List[int], observations: List[int] ) -> List[List[float]]: """ Creates a |S|x|O| tensor with the observation probabilities :param states: the list of states :param observations: the list of observations :return: the observation tensor """ observation_tensor = [] num_observations = len(observations) for s in states: s_observations = [] for o in observations: s_observations.append( IntrusionRecoveryPomdpUtil.observation_function( s=s, o=o, num_observations=num_observations ) ) observation_tensor.append(s_observations) return observation_tensor
[docs] @staticmethod def transition_function( s: int, s_prime: int, a: int, p_a: float, p_c_1: float, p_u: float, p_c_2: float ) -> float: """ The transition function of the POMDP :param s: the state :param s_prime: the next state :param a: the action :param p_a: the intrusion probability :param p_c_1: the crash probability in the healthy state :param p_c_2: the crash probability in the compromised state :param p_u: the upgrade probability :return: P(s_prime | s, a) """ if s == 2 and s_prime == 2: return 1.0 elif s_prime == 2 and s == 0: return p_c_1 elif s_prime == 2 and s == 1: return p_c_2 elif s_prime == 0 and a == 1 and s == 0: return (1.0 - p_a) * (1.0 - p_c_1) elif s_prime == 0 and a == 1 and s == 1: return (1.0 - p_a) * (1.0 - p_c_2) elif s_prime == 1 and a == 1 and s == 0: return p_a * (1.0 - p_c_1) elif s_prime == 1 and a == 1 and s == 1: return p_a * (1.0 - p_c_2) elif s_prime == 0 and s == 1 and a == 0: return (1.0 - p_c_2) * p_u elif s_prime == 0 and s == 0 and a == 0: return (1.0 - p_c_1) * (1.0 - p_a) elif s_prime == 1 and s == 0 and a == 0: return (1.0 - p_c_1) * p_a elif s_prime == 1 and s == 1 and a == 0: return (1 - p_c_2) * (1 - p_u) else: return 0
[docs] @staticmethod def transition_function_game( s: int, s_prime: int, a1: int, a2: int, p_a: float, p_c_1: float ) -> float: """ The transition function of the POSG :param s: the state :param s_prime: the next state :param a1: the defender action :param a2: the attacker action :param p_a: the intrusion probability :param p_c_1: the crash probability :return: P(s_prime | s, a1, a2) """ if s == 2 and s_prime == 2: return 1.0 elif s_prime == 2 and s in [0, 1]: return p_c_1 elif s_prime == 0 and a1 == 0 and a2 == 1 and s == 0: return (1 - p_a) * (1 - p_c_1) elif ( (s_prime == 0 and a2 == 0 and s == 0) or (s_prime == 0 and s == 1 and a1 == 1) or (s_prime == 1 and s == 1 and a1 == 0) ): return 1 - p_c_1 elif s_prime == 1 and s == 0 and a2 == 1: return (1 - p_c_1) * p_a else: return 0
[docs] @staticmethod def transition_tensor( states: List[int], actions: List[int], p_a: float, p_c_1: float, p_c_2: float, p_u: float, ) -> List[List[List[float]]]: """ Creates a |A|x|S|x|S| tensor with the transition probabilities of the POMDP :param states: the list of states :param actions: the list of actions :param p_a: the intrusion probability :param p_c_1: the crash probability in the healthy state :param p_c_2: the crash probability in the compromised state :param p_u: the upgrade probability :return: the transition tensor """ assert states == [0, 1, 2] transition_tensor = [] for a in actions: a_transitions = [] for s in states: s_a_transitions = [] for s_prime in states: s_a_transitions.append( IntrusionRecoveryPomdpUtil.transition_function( s=s, s_prime=s_prime, a=a, p_a=p_a, p_c_1=p_c_1, p_c_2=p_c_2, p_u=p_u, ) ) a_transitions.append(s_a_transitions) transition_tensor.append(a_transitions) return transition_tensor
[docs] @staticmethod def transition_tensor_game( states: List[int], defender_actions: List[int], attacker_actions: List[int], p_a: float, p_c_1: float, ) -> List[List[List[List[float]]]]: """ Creates a |A|x|A|x|S|x|S| tensor with the transition probabilities of the POSG :param states: the list of states :param defender_actions: the list of defender actions :param attacker_actions: the list of attacker actions :param p_a: the intrusion probability :param p_c_1: the crash probability :return: the transition tensor """ transition_tensor = [] for a1 in defender_actions: a1_transitions = [] for a2 in attacker_actions: a2_transitions = [] for s in states: s_a1_a2_transitions = [] for s_prime in states: s_a1_a2_transitions.append( IntrusionRecoveryPomdpUtil.transition_function_game( s=s, s_prime=s_prime, a1=a1, a2=a2, p_a=p_a, p_c_1=p_c_1 ) ) a2_transitions.append(s_a1_a2_transitions) a1_transitions.append(a2_transitions) transition_tensor.append(a1_transitions) return transition_tensor
[docs] @staticmethod def sample_initial_state(b1: List[float]) -> int: """ Samples the initial state :param b1: the initial belief :return: the initial state """ return int(np.random.choice(np.arange(0, len(b1)), p=b1))
[docs] @staticmethod def sample_next_observation( observation_tensor: List[List[float]], s_prime: int, observations: List[int] ) -> int: """ Samples the next observation :param s_prime: the new state :param observations: the observation space :param observation_tensor: the observation tensor :return: the next observation o """ observation_probs = [] for o in observations: observation_probs.append(observation_tensor[s_prime][o]) o = np.random.choice(np.arange(0, len(observations)), p=observation_probs) return int(o)
[docs] @staticmethod def sample_next_state_game( transition_tensor: List[List[List[List[float]]]], s: int, a1: int, a2: int ) -> int: """ Samples the next observation :param s: the current state :param a1: the defender action :param a2: the attacker action :param transition_tensor: the transition tensor :return: the next state a """ s_prime = np.random.choice( np.arange(0, len(transition_tensor[a1][a2][s])), p=transition_tensor[a1][a2][s], ) return int(s_prime)
[docs] @staticmethod def bayes_filter( s_prime: int, o: int, a: int, b: List[float], states: List[int], observations: List[int], observation_tensor: List[List[float]], transition_tensor: List[List[List[float]]], ) -> float: """ A Bayesian filter to compute b[s_prime] of the POMDP :param s_prime: the state to compute the belief for :param o: the latest observation :param a: the latest action :param b: the current belief :param states: the list of states :param observations: the list of observations :param observation_tensor: the observation tensor :param transition_tensor: the transition tensor of the POMDP :return: b[s_prime] """ norm = 0.0 for s in states: for s_prime_1 in states: prob_1 = observation_tensor[s_prime_1][o] norm += b[s] * prob_1 * transition_tensor[a][s][s_prime_1] if norm == 0.0: return 0.0 temp = 0.0 for s in states: temp += ( observation_tensor[s_prime][o] * transition_tensor[a][s][s_prime] * b[s] ) b_prime_s_prime = temp / norm if round(b_prime_s_prime, 2) > 1: print(f"b_prime_s_prime >= 1: {b_prime_s_prime}, a1:{a}, s_prime:{s_prime}") assert round(b_prime_s_prime, 2) <= 1 if s_prime == 2 and o != observations[-1]: assert round(b_prime_s_prime, 2) <= 0.01 return b_prime_s_prime
[docs] @staticmethod def p_o_given_b_a1_a2( o: int, b: List[float], a: int, states: List[int], transition_tensor: List[List[List[float]]], observation_tensor: List[List[float]], ) -> float: """ Computes P[o|a,b] of the POMDP :param o: the observation :param b: the belief point :param a: the action :param states: the list of states :param transition_tensor: the transition tensor :param observation_tensor: the observation tensor :return: the probability of observing o when taking action a in belief point b """ prob = 0.0 for s in states: for s_prime in states: prob += ( b[s] * transition_tensor[a][s][s_prime] * observation_tensor[s_prime][o] ) assert prob < 1 return prob
[docs] @staticmethod def next_belief( o: int, a: int, b: List[float], states: List[int], observations: List[int], observation_tensor: List[List[float]], transition_tensor: List[List[List[float]]], ) -> List[float]: """ Computes the next belief using a Bayesian filter :param o: the latest observation :param a: the latest action of player 1 :param b: the current belief :param states: the list of states :param observations: the list of observations :param observation_tensor: the observation tensor :param transition_tensor: the transition tensor :return: the new belief """ b_prime = [0.0] * len(states) for s_prime in states: b_prime[s_prime] = IntrusionRecoveryPomdpUtil.bayes_filter( s_prime=s_prime, o=o, a=a, b=b, states=states, observations=observations, transition_tensor=transition_tensor, observation_tensor=observation_tensor, ) if round(sum(b_prime), 2) != 1: print(f"error, b_prime:{b_prime}, o:{o}, a:{a}, b:{b}") assert round(sum(b_prime), 2) == 1 return b_prime
[docs] @staticmethod def pomdp_solver_file(config: IntrusionRecoveryPomdpConfig) -> str: """ Gets the POMDP environment specification based on the format at http://www.pomdp.org/code/index.html, for the defender's local problem against a static attacker :param config: the POMDP config :return: the file content as a string """ file_str = "" file_str = file_str + f"discount: {config.discount_factor}\n\n" file_str = file_str + "values: cost\n\n" file_str = file_str + f"states: {len(config.states)}\n\n" file_str = file_str + f"actions: {len(config.actions)}\n\n" file_str = file_str + f"observations: {len(config.observations)}\n\n" initial_belief_str = " ".join(list(map(lambda x: str(x), config.b1))) file_str = file_str + f"start: {initial_belief_str}\n\n\n" num_transitions = 0 for s in config.states: for a in config.actions: probs = [] for s_prime in range(len(config.states)): num_transitions += 1 prob = config.transition_tensor[a][s][s_prime] file_str = file_str + f"T: {a} : {s} : {s_prime} {prob:.80f}\n" probs.append(prob) assert round(sum(probs), 3) == 1 file_str = file_str + "\n\n" for a in config.actions: for s_prime in config.states: probs = [] for o in range(len(config.observations)): prob = config.observation_tensor[s_prime][o] file_str = file_str + f"O : {a} : {s_prime} : {o} {prob:.80f}\n" probs.append(prob) assert round(sum(probs), 3) == 1 file_str = file_str + "\n\n" for s in config.states: for a in config.actions: for s_prime in config.states: for o in config.observations: c = config.cost_tensor[a][s] file_str = ( file_str + f"R: {a} : {s} : {s_prime} : {o} {c:.80f}\n" ) return file_str
[docs] @staticmethod def generate_transitions(game_config: IntrusionRecoveryGameConfig) -> List[str]: """ Generates the transition rows of the POSG config file of HSVI :param game_config: the game configuration :return: list of transition rows """ transitions = [] for s in game_config.states: for a1 in game_config.actions: for a2 in game_config.actions: for s_prime in game_config.states: for i, _ in enumerate(game_config.observations): tr_prob = game_config.transition_tensor[a1][a2][s][s_prime] obs_prob = game_config.observation_tensor[a2][i] prob = tr_prob * obs_prob if prob > 0: transition = f"{s} {a1} {a2} {i} {s_prime} {prob}" transitions.append(transition) return transitions
[docs] @staticmethod def generate_rewards(game_config: IntrusionRecoveryGameConfig) -> List[str]: """ Generates the reward rows of the POSG config file of HSVI :param game_config: the game configuration :return: list of reward rows """ rewards = [] for s in game_config.states: for a1 in game_config.actions: for a2 in game_config.actions: r = -game_config.cost_tensor[a1][s] if r != 0: rew = f"{s} {a1} {a2} {r}" rewards.append(rew) return rewards
[docs] @staticmethod def generate_os_posg_game_file(game_config: IntrusionRecoveryGameConfig) -> str: """ Generates the POSG game file for HSVI :param game_config: the game configuration :return: a string with the contents of the config file """ num_partitions = 1 transitions = IntrusionRecoveryPomdpUtil.generate_transitions( game_config=game_config ) rewards = IntrusionRecoveryPomdpUtil.generate_rewards(game_config=game_config) game_description = ( f"{len(game_config.states)} {num_partitions} {len(game_config.actions)} " f"{len(game_config.actions)} " f"{len(game_config.observations)} {len(transitions)} " f"{len(rewards)} {game_config.discount_factor}" ) state_desriptions = [] for s in game_config.states: state_desriptions.append(f"{s} {0}") player_1_actions = ["WAIT", "RECOVER"] player_2_actions = ["FALSEALARM", "ATTACK"] player_2_legal_actions = [] for _ in game_config.states: player_2_legal_actions.append( " ".join(list(map(lambda x: str(x), game_config.actions))) ) player_1_legal_actions = [] player_1_legal_actions.append( " ".join(list(map(lambda x: str(x), game_config.actions))) ) obs_desriptions = [] for i, o in enumerate(game_config.observations): obs_desriptions.append(f"o_{o}") initial_belief_str = ( f"{0} {' '.join(list(map(lambda x: str(x), game_config.b1)))}" ) game_file_str = "" game_file_str = game_file_str + game_description + "\n" game_file_str = game_file_str + "\n".join(state_desriptions) + "\n" game_file_str = game_file_str + "\n".join(player_1_actions) + "\n" game_file_str = game_file_str + "\n".join(player_2_actions) + "\n" game_file_str = game_file_str + "\n".join(obs_desriptions) + "\n" game_file_str = game_file_str + "\n".join(player_2_legal_actions) + "\n" game_file_str = game_file_str + "\n".join(player_1_legal_actions) + "\n" game_file_str = game_file_str + "\n".join(transitions) + "\n" game_file_str = game_file_str + "\n".join(rewards) + "\n" game_file_str = game_file_str + initial_belief_str with open("recovery_game.txt", "w") as f: f.write(game_file_str) return game_file_str