# Source code for csle_tolerance.util.intrusion_response_cmdp_util

from typing import List
import numpy as np
import math


class IntrusionResponseCmdpUtil:
    """
    Class with utility functions for the intrusion-response CMDP
    """

    @staticmethod
    def state_space(s_max: int) -> List[int]:
        """
        Gets the state space of the CMDP

        :param s_max: the maximum state (inclusive)
        :return: a list with the states 0, 1, ..., s_max
        """
        return list(range(s_max + 1))

    @staticmethod
    def action_space() -> List[int]:
        """
        Gets the action space of the CMDP

        :return: a list with the actions
        """
        return [0, 1]

    @staticmethod
    def cost_function(s: int, negate: bool = False) -> float:
        """
        Cost function of the CMDP

        :param s: the state
        :param negate: boolean flag, if true then return negated version of the cost (the reward)
        :return: the cost, float(s), or float(-s) if negate is true
        """
        return float(-s) if negate else float(s)

    @staticmethod
    def cost_tensor(states: List[int], negate: bool = False) -> List[float]:
        """
        Creates a |S| tensor with the costs (or rewards) of the CMDP

        :param states: the list of states
        :param negate: a boolean flag indicating whether the cost should be negated as a reward or not
        :return: A tensor with the costs (or rewards), one entry per state in the order given
        """
        return [IntrusionResponseCmdpUtil.cost_function(s=s, negate=negate) for s in states]

    @staticmethod
    def constraint_cost_function(s: int, f: int) -> float:
        """
        Constraint cost function of the CMDP

        :param s: the state
        :param f: the tolerance threshold
        :return: 1.0 if s >= f + 1, otherwise 0.0
        """
        return 1.0 if s >= f + 1 else 0.0

    @staticmethod
    def constraint_cost_tensor(states: List[int], f: int) -> List[float]:
        """
        Creates a |S| tensor with the constraint costs of the CMDP

        :param states: the list of states
        :param f: the tolerance threshold
        :return: A tensor with the constraint costs, one entry per state in the order given
        """
        return [IntrusionResponseCmdpUtil.constraint_cost_function(s=s, f=f) for s in states]

    @staticmethod
    def delta_function(s: int, p_a: float, p_c: float, p_u: float, delta: int, s_max: int) -> float:
        """
        The delta function that gives the (unnormalized) weight of a change of delta in the state.

        The weight is the sum, over every pair (failures, recoveries) with both values in
        [0, s_max] and recoveries - failures == delta, of

            p_u^recoveries * (1 - p_u)^(s - recoveries) * (p_a + p_c)^failures * (1 - p_a - p_c)^(s - failures)

        The result is 0.0 whenever s + delta falls outside [0, s_max].

        :param s: the current state
        :param p_a: the intrusion probability
        :param p_c: the crash probability
        :param p_u: the recovery probability
        :param delta: the delta value
        :param s_max: the maximum number of nodes
        :return: the summed weight for the given delta
        """
        # The bounds check on the resulting state does not depend on the loop variables.
        if not 0 <= s + delta <= s_max:
            return 0.0
        prob = 0.0
        # For a fixed number of failures there is exactly one matching number of recoveries,
        # so a single loop visits the same pairs (in the same order) as scanning the full grid.
        for failures in range(s_max + 1):
            recoveries = failures + delta
            if not 0 <= recoveries <= s_max:
                continue
            # NOTE: the exponents below are negative when recoveries or failures exceed s;
            # math.pow raises ValueError for a zero base with a negative exponent.
            non_recoveries = s - recoveries
            non_failures = s - failures
            prob += (math.pow(p_u, recoveries) * math.pow(1 - p_u, non_recoveries)
                     * math.pow(p_a + p_c, failures) * math.pow(1 - p_a - p_c, non_failures))
        return prob

    @staticmethod
    def transition_function(s: int, s_prime: int, a: int, p_a: float, p_c: float, p_u: float, s_max: int) -> float:
        """
        The (unnormalized) transition function of the CMDP

        :param s: the state
        :param s_prime: the next state
        :param a: the action
        :param p_a: the intrusion probability
        :param p_c: the crash probability
        :param p_u: the recovery probability
        :param s_max: the maximum state
        :return: the delta-function weight for delta = s_prime - a - s
        """
        # The action contributes directly to the next state; the remainder is the delta.
        delta = s_prime - a - s
        return IntrusionResponseCmdpUtil.delta_function(s=s, p_a=p_a, p_c=p_c, p_u=p_u, delta=delta, s_max=s_max)

    @staticmethod
    def transition_tensor(states: List[int], actions: List[int], p_a: float, p_c: float, p_u: float, s_max: int) \
            -> List[List[List[float]]]:
        """
        Creates a |A|x|S|x|S| tensor with the transition probabilities of the CMDP

        Each row (fixed action and state) is built from the transition function and then
        divided by its sum so that it adds up to one.

        :param states: the list of states
        :param actions: the list of actions
        :param p_a: the intrusion probability
        :param p_c: the failure probability
        :param p_u: the upgrade probability
        :param s_max: the maximum state
        :return: the transition tensor
        :raises ZeroDivisionError: if all weights of a row are zero, so the row cannot be normalized
        """
        transition_tensor = []
        for a in actions:
            a_transitions = []
            for s in states:
                weights = [
                    IntrusionResponseCmdpUtil.transition_function(
                        s=s, s_prime=s_prime, a=a, p_a=p_a, p_c=p_c, p_u=p_u, s_max=s_max)
                    for s_prime in states
                ]
                normalizing_constant = sum(weights)
                if normalizing_constant == 0:
                    # Same exception type as the bare division would raise, but with context.
                    raise ZeroDivisionError(
                        f"cannot normalize transition row for s={s}, a={a}: all transition weights are zero "
                        f"(p_a={p_a}, p_c={p_c}, p_u={p_u}, s_max={s_max})")
                s_a_transitions = list(np.array(weights) * (1 / normalizing_constant))
                # Diagnostic only: report rows that do not sum to one after normalization.
                if not round(sum(s_a_transitions), 2) == 1:
                    print(f"{sum(s_a_transitions)}, s:{s}, a:{a}, normalizing constant: {normalizing_constant}, "
                          f"s_a_transitions: {s_a_transitions}")
                a_transitions.append(s_a_transitions)
            transition_tensor.append(a_transitions)
        return transition_tensor