from typing import List, Optional, Tuple, Any
import math
import time
import os
import numpy as np
import numpy.typing as npt
import gymnasium as gym
import pulp
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.training.experiment_config import ExperimentConfig
from csle_common.dao.training.experiment_result import ExperimentResult
from csle_common.dao.training.agent_type import AgentType
from csle_common.util.experiment_util import ExperimentUtil
from csle_common.logging.log import Logger
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.dao.jobs.training_job_config import TrainingJobConfig
from csle_common.dao.training.experiment_execution import ExperimentExecution
from csle_common.dao.training.alpha_vectors_policy import AlphaVectorsPolicy
from csle_common.util.general_util import GeneralUtil
from csle_common.dao.simulation_config.base_env import BaseEnv
from csle_agents.agents.base.base_agent import BaseAgent
import csle_agents.constants.constants as agents_constants
import csle_agents.common.pruning as pruning
class HSVIAgent(BaseAgent):
"""
Heuristic-Search Value Iteration for POMDPs (Trey Smith and Reid Simmons, 2004) Agent
"""
def __init__(self, simulation_env_config: SimulationEnvConfig,
experiment_config: ExperimentConfig,
training_job: Optional[TrainingJobConfig] = None, save_to_metastore: bool = True,
env: Optional[BaseEnv] = None):
"""
Initializes the HSVI agent
:param simulation_env_config: configuration of the simulation environment
:param experiment_config: the experiment configuration
:param training_job: an existing training job to use (optional)
:param save_to_metastore: boolean flag whether to save the execution to the metastore
:param env: the gym environment for training
"""
super().__init__(simulation_env_config=simulation_env_config, emulation_env_config=None,
experiment_config=experiment_config)
assert experiment_config.agent_type == AgentType.HSVI
self.training_job = training_job
self.save_to_metastore = save_to_metastore
self.env = env
def train(self) -> ExperimentExecution:
"""
Runs the HSVI algorithm to compute V*
:return: the results
"""
pid = os.getpid()
# Initialize metrics
exp_result = ExperimentResult()
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_RETURN)
exp_result.plot_metrics.append(agents_constants.HSVI.WIDTHS)
exp_result.plot_metrics.append(agents_constants.HSVI.LB_SIZES)
exp_result.plot_metrics.append(agents_constants.HSVI.UB_SIZES)
exp_result.plot_metrics.append(agents_constants.HSVI.INITIAL_BELIEF_VALUES)
descr = f"Computation of V* with the HSVI algorithm using " \
f"simulation:{self.simulation_env_config.name}"
for seed in self.experiment_config.random_seeds:
exp_result.all_metrics[seed] = {}
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN] = []
exp_result.all_metrics[seed][agents_constants.HSVI.WIDTHS] = []
exp_result.all_metrics[seed][agents_constants.HSVI.UB_SIZES] = []
exp_result.all_metrics[seed][agents_constants.HSVI.LB_SIZES] = []
exp_result.all_metrics[seed][agents_constants.HSVI.INITIAL_BELIEF_VALUES] = []
if self.env is None:
self.env = gym.make(self.simulation_env_config.gym_env_name,
config=self.simulation_env_config.simulation_env_input_config)
# Initialize training job
if self.training_job is None:
self.training_job = TrainingJobConfig(
simulation_env_name=self.simulation_env_config.name, experiment_config=self.experiment_config,
progress_percentage=0, pid=pid, experiment_result=exp_result,
emulation_env_name=None, simulation_traces=[],
num_cached_traces=0,
log_file_path=Logger.__call__().get_log_file_path(), descr=descr,
physical_host_ip=GeneralUtil.get_host_ip())
if self.save_to_metastore:
training_job_id = MetastoreFacade.save_training_job(training_job=self.training_job)
self.training_job.id = training_job_id
else:
self.training_job.pid = pid
self.training_job.progress_percentage = 0
self.training_job.experiment_result = exp_result
if self.save_to_metastore:
MetastoreFacade.update_training_job(training_job=self.training_job, id=self.training_job.id)
# Initialize execution result
ts = time.time()
emulation_name = None
if self.emulation_env_config is not None:
emulation_name = self.emulation_env_config.name
simulation_name = self.simulation_env_config.name
self.exp_execution = ExperimentExecution(result=exp_result, config=self.experiment_config, timestamp=ts,
emulation_name=emulation_name, simulation_name=simulation_name,
descr=descr, log_file_path=self.training_job.log_file_path)
if self.save_to_metastore:
exp_execution_id = MetastoreFacade.save_experiment_execution(self.exp_execution)
self.exp_execution.id = exp_execution_id
for seed in self.experiment_config.random_seeds:
ExperimentUtil.set_seed(seed)
exp_result = self.hsvi_algorithm(exp_result=exp_result, seed=seed)
# Calculate average and std metrics
exp_result.avg_metrics = {}
exp_result.std_metrics = {}
for metric in exp_result.all_metrics[self.experiment_config.random_seeds[0]].keys():
value_vectors = []
for seed in self.experiment_config.random_seeds:
value_vectors.append(exp_result.all_metrics[seed][metric])
avg_metrics = []
std_metrics = []
for i in range(len(value_vectors[0])):
if type(value_vectors[0][0]) is int or type(value_vectors[0][0]) is float \
or type(value_vectors[0][0]) is np.int64 or type(value_vectors[0][0]) is np.float64:
seed_values = []
for seed_idx in range(len(self.experiment_config.random_seeds)):
seed_values.append(value_vectors[seed_idx][i])
avg = ExperimentUtil.mean_confidence_interval(
data=seed_values,
confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[0]
if not math.isnan(avg):
avg_metrics.append(avg)
ci = ExperimentUtil.mean_confidence_interval(
data=seed_values,
confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[1]
if not math.isnan(ci):
std_metrics.append(ci)
else:
std_metrics.append(-1)
else:
avg_metrics.append(-1)
std_metrics.append(-1)
exp_result.avg_metrics[metric] = avg_metrics
exp_result.std_metrics[metric] = std_metrics
ts = time.time()
self.exp_execution.timestamp = ts
self.exp_execution.result = exp_result
self.training_job.experiment_result = exp_result
if self.save_to_metastore:
MetastoreFacade.update_experiment_execution(experiment_execution=self.exp_execution,
id=self.exp_execution.id)
MetastoreFacade.update_training_job(training_job=self.training_job, id=self.training_job.id)
return self.exp_execution
def hparam_names(self) -> List[str]:
"""
:return: a list with the hyperparameter names
"""
return [agents_constants.COMMON.EVAL_BATCH_SIZE, agents_constants.COMMON.CONFIDENCE_INTERVAL,
agents_constants.COMMON.RUNNING_AVERAGE, agents_constants.COMMON.GAMMA,
agents_constants.HSVI.TRANSITION_TENSOR,
agents_constants.HSVI.REWARD_TENSOR, agents_constants.HSVI.OBSERVATION_TENSOR,
agents_constants.HSVI.OBSERVATION_SPACE, agents_constants.HSVI.STATE_SPACE,
agents_constants.HSVI.ACTION_SPACE, agents_constants.HSVI.EPSILON,
agents_constants.HSVI.INITIAL_BELIEF, agents_constants.HSVI.USE_LP,
agents_constants.HSVI.PRUNE_FREQUENCY, agents_constants.HSVI.SIMULATION_FREQUENCY,
agents_constants.HSVI.SIMULATE_HORIZON, agents_constants.HSVI.NUMBER_OF_SIMULATIONS]
def hsvi_algorithm(self, exp_result: ExperimentResult, seed: int) -> ExperimentResult:
"""
Runs the HSVI algorithm for the given random seed and records the results
:param exp_result: the experiment result object
:param seed: the random seed
:return: the updated experiment result
"""
discount_factor = self.experiment_config.hparams[agents_constants.COMMON.GAMMA].value
T = self.experiment_config.hparams[agents_constants.HSVI.TRANSITION_TENSOR].value
R = self.experiment_config.hparams[agents_constants.HSVI.REWARD_TENSOR].value
Z = self.experiment_config.hparams[agents_constants.HSVI.OBSERVATION_TENSOR].value
O = self.experiment_config.hparams[agents_constants.HSVI.OBSERVATION_SPACE].value
S = self.experiment_config.hparams[agents_constants.HSVI.STATE_SPACE].value
A = self.experiment_config.hparams[agents_constants.HSVI.ACTION_SPACE].value
epsilon = self.experiment_config.hparams[agents_constants.HSVI.EPSILON].value
b0 = self.experiment_config.hparams[agents_constants.HSVI.INITIAL_BELIEF].value
lp = self.experiment_config.hparams[agents_constants.HSVI.USE_LP].value
prune_frequency = self.experiment_config.hparams[agents_constants.HSVI.PRUNE_FREQUENCY].value
simulation_frequency = self.experiment_config.hparams[agents_constants.HSVI.SIMULATION_FREQUENCY].value
simulate_horizon = self.experiment_config.hparams[agents_constants.HSVI.SIMULATE_HORIZON].value
number_of_simulations = self.experiment_config.hparams[agents_constants.HSVI.NUMBER_OF_SIMULATIONS].value
Logger.__call__().get_logger().info(f"Starting the HSVI algorithm, epsilon:{epsilon}, "
f"discount_factor: {discount_factor}, lp_nf: {lp}, "
f"prune_frequency: {prune_frequency}, "
f"simulation_frequency: {simulation_frequency}, "
f"number_of_simulations: {number_of_simulations}, "
f"simulate_horizon: {simulate_horizon}, b0: {b0}")
alpha_vectors, avg_returns, running_avg_returns, widths, ub_sizes, lb_sizes, initial_belief_values = self.hsvi(
T=np.array(T), R=np.array(R), O=np.array(O), S=np.array(S), Z=np.array(Z), A=np.array(A),
gamma=discount_factor, b0=b0, epsilon=epsilon, lp=lp, prune_frequency=prune_frequency,
simulation_frequency=simulation_frequency, simulate_horizon=simulate_horizon,
number_of_simulations=number_of_simulations)
exp_result.all_metrics[seed][agents_constants.HSVI.WIDTHS] = widths
exp_result.all_metrics[seed][agents_constants.HSVI.LB_SIZES] = lb_sizes
exp_result.all_metrics[seed][agents_constants.HSVI.UB_SIZES] = ub_sizes
exp_result.all_metrics[seed][agents_constants.HSVI.INITIAL_BELIEF_VALUES] = initial_belief_values
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN] = avg_returns
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN] = running_avg_returns
alpha_vec_policy = AlphaVectorsPolicy(
player_type=self.experiment_config.player_type,
actions=self.simulation_env_config.joint_action_space_config.action_spaces[
self.experiment_config.player_idx].actions,
states=self.simulation_env_config.state_space_config.states,
alpha_vectors=alpha_vectors, agent_type=self.experiment_config.agent_type, avg_R=avg_returns[-1],
simulation_name=self.simulation_env_config.name, transition_tensor=T, reward_tensor=R)
exp_result.policies[seed] = alpha_vec_policy
return exp_result
def hsvi(self, O: npt.NDArray[Any], Z: npt.NDArray[Any], R: npt.NDArray[Any], T: npt.NDArray[Any],
A: npt.NDArray[Any], S: npt.NDArray[Any], gamma: float, b0: npt.NDArray[Any],
epsilon: float, lp: bool = False, prune_frequency: int = 10,
simulation_frequency: int = 10, simulate_horizon: int = 10, number_of_simulations: int = 10) \
-> Tuple[List[List[float]], List[float], List[float], List[float], List[int], List[int], List[float]]:
"""
Heuristic Search Value Iteration for POMDPs (Trey Smith and Reid Simmons, 2004)
:param O: set of observations of the POMDP
:param Z: observation tensor of the POMDP
:param R: reward tensor of the POMDP
:param T: transition tensor of the POMDP
:param A: action set of the POMDP
:param S: state set of the POMDP
:param gamma: discount factor
:param b0: initial belief point
:param epsilon: accuracy parameter
:param lp: whether to use LP or the sawtooth approximation to compute upper bound values
:param prune_frequency: how often to prune the upper and lower bounds
:param simulation_frequency: how frequently to simulate the POMDP to compute rewards of the current policy
:param simulate_horizon: length of simulations to compute rewards
:param number_of_simulations: number of simulations to estimate reward
:return: the lower bound alpha-vectors, the average returns, the running average returns, the bound widths, the upper bound sizes, the lower bound sizes, and the initial-belief values
"""
# Initialize upper and lower bound
lower_bound = self.initialize_lower_bound(R=R, S=S, A=A, gamma=gamma)
upper_bound = self.initialize_upper_bound(T=T, A=A, S=S, gamma=gamma, R=R)
# Initialize state
w = self.width(lower_bound=lower_bound, upper_bound=upper_bound, b=b0, S=S, lp=lp)
iteration = 0
cumulative_r = 0.0
average_rewards: List[float] = []
widths = []
ub_sizes = []
lb_sizes = []
average_running_rewards: List[float] = []
initial_belief_value = []
# HSVI iterations
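# Each iteration performs one depth-first exploration from the initial belief b0 and then
# re-evaluates the bound gap at b0. The loop terminates when
# width(b0) = V_upper(b0) - V_lower(b0) <= epsilon, at which point the greedy policy of the
# lower bound is epsilon-optimal at b0 (Smith & Simmons, 2004).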
while w > epsilon:
lower_bound, upper_bound = self.explore(
b=b0, epsilon=epsilon, t=0, lower_bound=lower_bound, upper_bound=upper_bound,
gamma=gamma, S=S, O=O, R=R, T=T, A=A, Z=Z, lp=lp)
w = self.width(lower_bound=lower_bound, upper_bound=upper_bound, b=b0, S=S, lp=lp)
if iteration % simulation_frequency == 0:
r = 0.0
for i in range(number_of_simulations):
r += self.simulate(horizon=simulate_horizon, b0=b0, lower_bound=np.array(lower_bound), Z=Z,
R=R, gamma=gamma, T=T, A=A, O=O, S=S)
cumulative_r = r / number_of_simulations
if iteration > 1 and iteration % prune_frequency == 0:
size_before_lower_bound = len(lower_bound)
size_before_upper_bound = len(upper_bound[0])
lower_bound = pruning.prune_lower_bound(lower_bound=np.array(lower_bound), S=S)
upper_bound = self.prune_upper_bound(upper_bound=upper_bound, S=S, lp=lp)
Logger.__call__().get_logger().debug(
f"[HSVI] Pruning, LB before:{size_before_lower_bound}, after:{len(lower_bound)}, "
f"UB before: {size_before_upper_bound},after:{len(upper_bound[0])}")
initial_belief_V_star_upper = self.upper_bound_value(upper_bound=upper_bound, b=b0, S=S, lp=lp)
initial_belief_V_star_lower = self.lower_bound_value(lower_bound=lower_bound, b=b0, S=S)
iteration += 1
if iteration % self.experiment_config.log_every == 0 and iteration > 0:
Logger.__call__().get_logger().info(f"[HSVI] iteration: {iteration}, width: {w}, epsilon: {epsilon}, "
f"R: {cumulative_r}, "
f"UB size:{len(upper_bound[0])}, LB size:{len(lower_bound)},"
f"Upper V*[b0]: {initial_belief_V_star_upper},"
f"Lower V*[b0]:{initial_belief_V_star_lower}")
average_rewards.append(cumulative_r)
running_avg_J = ExperimentUtil.running_average(
average_rewards, self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
average_running_rewards.append(running_avg_J)
widths.append(w)
ub_sizes.append(len(upper_bound[0]))
lb_sizes.append(len(lower_bound))
initial_belief_value.append(initial_belief_V_star_lower)
return (list(lower_bound), average_rewards, average_running_rewards, widths, ub_sizes, lb_sizes,
initial_belief_value)
def explore(self, b: npt.NDArray[Any], epsilon: float, t: int, lower_bound: List[int],
upper_bound: Tuple[List[int], List[int]],
gamma: float, S: npt.NDArray[Any], O: npt.NDArray[Any], Z: npt.NDArray[Any], R: npt.NDArray[Any],
T: npt.NDArray[Any], A: npt.NDArray[Any], lp: bool) -> Tuple[List[int], Tuple[List[int], List[int]]]:
"""
Explores the POMDP tree
:param b: current belief
:param epsilon: accuracy parameter
:param t: the current depth of the exploration
:param lower_bound: the lower bound on the value function
:param upper_bound: the upper bound on the value function
:param gamma: discount factor
:param S: set of states
:param O: set of observations
:param Z: observation tensor
:param R: reward tensor
:param T: transition tensor
:param A: set of actions
:param lp: whether to use LP to compute upper bound values
:return: new lower and upper bounds
"""
w = self.width(lower_bound=lower_bound, upper_bound=upper_bound, b=b, S=S, lp=lp)
print(f"explore, w:{w}, ep:{epsilon * math.pow(gamma, -t)}")
if (gamma > 0 and w <= epsilon * math.pow(gamma, -t)) or (w <= epsilon):
return lower_bound, upper_bound
# Determine a*
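# The exploration action a* is selected greedily with respect to the upper-bound Q-values
# (optimism), following the HSVI forward-exploration heuristic.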
a_Q_vals = []
for a in A:
upper_Q = self.q(b=b, a=a, lower_bound=lower_bound, upper_bound=upper_bound, S=S, O=O, Z=Z,
R=R, gamma=gamma, T=T, upper=True, lp=lp)
a_Q_vals.append(upper_Q)
a_star = int(np.argmax(np.array(a_Q_vals)))
# Determine o*
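# The exploration observation o* maximizes P[o|b, a*] weighted by the excess uncertainty of the
# resulting successor belief, i.e., the search descends into the successor belief whose bound gap
# most exceeds the allowed slack epsilon * gamma^(-(t+1)).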
o_vals = []
for o in O:
if self.observation_possible(o=o, b=b, Z=Z, T=T, S=S, a=a_star):
new_belief = self.next_belief(o=o, a=a_star, b=b, S=S, Z=Z, T=T)
o_val = (self.p_o_given_b_a(o=o, b=b, a=a_star, S=S, Z=Z, T=T) *
self.excess(lower_bound=lower_bound, upper_bound=upper_bound, b=new_belief, S=S,
epsilon=epsilon, gamma=gamma, t=(t + 1), lp=lp))
o_vals.append(o_val)
else:
o_vals.append(-np.inf)
o_star = int(np.argmax(np.array(o_vals)))
b_prime = self.next_belief(o=o_star, a=a_star, b=b, S=S, Z=Z, T=T)
lower_bound, upper_bound = self.explore(b=b_prime, epsilon=epsilon, t=t + 1, lower_bound=lower_bound,
upper_bound=upper_bound, gamma=gamma, S=S, O=O, R=R, T=T, A=A,
Z=Z, lp=lp)
lower_bound, upper_bound = \
self.local_updates(lower_bound=lower_bound, upper_bound=upper_bound, b=b, A=A, S=S, Z=Z, O=O, R=R,
T=T, gamma=gamma, lp=lp)
return lower_bound, upper_bound
def initialize_lower_bound(self, R: npt.NDArray[Any], S: npt.NDArray[Any], A: npt.NDArray[Any],
gamma: float) -> List[Any]:
"""
Initializes the lower bound
:param R: reward tensor
:param S: set of states
:param A: set of actions
:param gamma: discount factor
:return: the initialized lower bound
"""
vals_1 = []
for a in A:
vals_2 = []
for s in S:
vals_2.append(R[a][s] / (1 - gamma))
vals_1.append(min(vals_2))
R_underbar = max(vals_1)
alpha_vector = np.zeros(len(S))
alpha_vector.fill(R_underbar)
lower_bound = []
lower_bound.append(alpha_vector)
return lower_bound
def initialize_upper_bound(self, T: npt.NDArray[Any], A: npt.NDArray[Any], S: npt.NDArray[Any], gamma: float,
R: npt.NDArray[Any]) -> Tuple[List[Any], List[Any]]:
"""
Initializes the upper bound
:param T: the transition tensor
:param A: the set of actions
:param S: the set of states
:param R: the reward tensor
:param gamma: the discount factor
:return: the initialized upper bound
"""
V, pi = self.vi(T=T, num_states=len(S), num_actions=len(A), R=R, theta=0.0001, discount_factor=gamma)
point_set = []
for s in S:
b = self.generate_corner_belief(s=s, S=S)
point_set.append([b, V[s]])
return (point_set, point_set.copy())
def generate_corner_belief(self, s: int, S: npt.NDArray[Any]) -> npt.NDArray[Any]:
"""
Generate the corner of the simplex that corresponds to being in some state with probability 1
:param s: the state
:param S: the set of states
:return: the corner belief corresponding to state s
"""
b = np.zeros(len(S))
b[s] = 1
return b
def local_updates(self, lower_bound: List[Any], upper_bound: Tuple[List[Any], List[Any]], b: npt.NDArray[Any],
A: npt.NDArray[Any], S: npt.NDArray[Any], O: npt.NDArray[Any], R: npt.NDArray[Any],
T: npt.NDArray[Any], gamma: float,
Z: npt.NDArray[Any], lp: bool) -> Tuple[List[Any], Tuple[List[Any], List[Any]]]:
"""
Perform local updates to the upper and lower bounds for the given belief in the heuristic-search-exploration
:param lower_bound: the lower bound on V
:param upper_bound: the upper bound on V
:param b: the current belief point
:param A: the set of actions
:param S: the set of states
:param O: the set of observations
:param R: the reward tensor
:param T: the transition tensor
:param gamma: the discount factor
:param Z: the observation tensor
:param lp: a boolean flag whether to use LP to compute upper bound beliefs
:return: The updated lower and upper bounds
"""
new_upper_bound = self.local_upper_bound_update(upper_bound=upper_bound, b=b, A=A, S=S, O=O, R=R, T=T,
gamma=gamma, Z=Z, lp=lp)
new_lower_bound = self.local_lower_bound_update(lower_bound=lower_bound, b=b, Z=Z, A=A, O=O, S=S, T=T,
R=R, gamma=gamma)
return new_lower_bound, new_upper_bound
def local_upper_bound_update(self, upper_bound: Tuple[List[Any], List[Any]], b: npt.NDArray[Any],
A: npt.NDArray[Any], S: npt.NDArray[Any], O: npt.NDArray[Any], R: npt.NDArray[Any],
T: npt.NDArray[Any], gamma: float, Z: npt.NDArray[Any], lp: bool) \
-> Tuple[List[Any], List[Any]]:
"""
Performs a local update to the upper bound during the heuristic-search exploration
:param upper_bound: the upper bound to update
:param b: the current belief point
:param A: the set of actions
:param S: the set of states
:param O: the set of observations
:param R: the reward tensor
:param T: the transition tensor
:param gamma: the discount factor
:param Z: the observation tensor
:param lp: whether or not to use LP to compute upper bound beliefs
:return: the updated upper bound
"""
b, new_val = self.upper_bound_backup(upper_bound=upper_bound, b=b, A=A, S=S, Z=Z, O=O, R=R, T=T,
gamma=gamma, lp=lp)
upper_bound_point_set, corner_points = upper_bound
upper_bound_point_set.append([b, new_val])
new_corner_points = self.update_corner_points(corner_points=corner_points, new_point=(b, new_val))
upper_bound = (upper_bound_point_set, new_corner_points)
return upper_bound
def update_corner_points(self, corner_points: List[Any], new_point: Tuple[npt.NDArray[Any], float]) -> List[Any]:
"""
(Maybe) update the corner points of the upper bound
:param corner_points: the current set of corner points
:param new_point: the new point to add to the upper bound
:return: the new set of corner points
"""
new_corner_points = []
for cp in corner_points:
corner_match = True
for i in range(len(cp[0])):
if cp[0][i] != new_point[0][i]:
corner_match = False
if corner_match:
new_corner_points.append((cp[0], new_point[1]))
else:
new_corner_points.append(cp)
return new_corner_points
def local_lower_bound_update(self, lower_bound: List[Any], b: npt.NDArray[Any], A: npt.NDArray[Any],
O: npt.NDArray[Any], Z: npt.NDArray[Any], S: npt.NDArray[Any], T: npt.NDArray[Any],
R: npt.NDArray[Any], gamma: float) -> List[Any]:
"""
Performs a local update to the lower bound given a belief point in the heuristic search
:param lower_bound: the current lower bound
:param b: the current belief point
:param A: the set of actions
:param O: the set of observations
:param Z: the observation tensor
:param S: the set of states
:param T: the transition tensor
:param R: the reward tensor
:param gamma: the discount factor
:return: the updated lower bound
"""
beta = self.lower_bound_backup(lower_bound=lower_bound, b=b, A=A, Z=Z, O=O, S=S, T=T, R=R, gamma=gamma)
if not pruning.check_duplicate(np.array(lower_bound), beta):
lower_bound.append(beta)
return lower_bound
def lower_bound_backup(self, lower_bound: List[Any], b: npt.NDArray[Any], A: npt.NDArray[Any], O: npt.NDArray[Any],
Z: npt.NDArray[Any], S: npt.NDArray[Any], T: npt.NDArray[Any], R: npt.NDArray[Any],
gamma: float) -> npt.NDArray[Any]:
"""
Generates a new alpha-vector for the lower bound
:param lower_bound: the current lower bound
:param b: the current belief point
:param A: the set of actions
:param O: the set of observations
:param Z: the observation tensor
:param S: the set of states
:param T: the transition tensor
:param R: the reward tensor
:param gamma: the discount factor
:return: the new alpha vector
"""
max_beta_a_o_alpha_vecs = []
for a in A:
max_beta_a_vecs = []
for o in O:
if self.observation_possible(o=o, b=b, Z=Z, T=T, S=S, a=a):
new_belief = np.array(self.next_belief(o=o, a=a, b=b, S=S, Z=Z, T=T))
max_alpha_vec = lower_bound[0]
max_alpha_val = float("-inf")
for alpha_vec in lower_bound:
alpha_val = new_belief.dot(alpha_vec)
if alpha_val > max_alpha_val:
max_alpha_val = alpha_val
max_alpha_vec = alpha_vec
max_beta_a_vecs.append(max_alpha_vec)
max_beta_a_o_alpha_vecs.append(max_beta_a_vecs)
beta_a_vecs = []
for a in A:
beta_a_vec = []
for s in S:
o_idx = 0
beta_a_s_val = 0.0
beta_a_s_val += R[a][s]
expected_future_vals = 0.0
for o in O:
if self.observation_possible(o=o, b=b, Z=Z, T=T, S=S, a=a):
for s_prime in S:
expected_future_vals += (max_beta_a_o_alpha_vecs[a][o_idx][s_prime] * Z[a][s_prime][o] *
T[a][s][s_prime])
o_idx += 1
beta_a_s_val += gamma * expected_future_vals
beta_a_vec.append(beta_a_s_val)
beta_a_vecs.append(beta_a_vec)
beta = beta_a_vecs[0]
max_val = float("-inf")
for beta_a_vec in beta_a_vecs:
val = np.array(beta_a_vec).dot(np.array(b))
if val > max_val:
max_val = val
beta = beta_a_vec
return np.array(beta)
def upper_bound_backup(self, upper_bound: Tuple[List[Any], List[Any]], b: npt.NDArray[Any], A: npt.NDArray[Any],
S: npt.NDArray[Any], O: npt.NDArray[Any], Z: npt.NDArray[Any], R: npt.NDArray[Any],
T: npt.NDArray[Any], gamma: float, lp: bool) -> Tuple[npt.NDArray[Any], float]:
"""
Adds a point to the upper bound
:param upper_bound: the current upper bound
:param b: the current belief point
:param A: the set of actions
:param S: the set of states
:param O: the set of observations
:param Z: the observation tensor
:param R: the reward tensor
:param T: the transition tensor
:param gamma: the discount factor
:param lp: a boolean flag whether to use LP to compute the upper bound belief
:return: the new point
"""
q_vals = []
for a in A:
v = 0
for s in S:
immediate_r = b[s] * R[a][s]
expected_future_rew = 0
for o in O:
if self.observation_possible(o=o, b=b, Z=Z, T=T, S=S, a=a):
new_belief = self.next_belief(o=o, a=a, b=b, S=S, Z=Z, T=T)
for s_prime in S:
expected_future_rew += \
b[s] * T[a][s][s_prime] * Z[a][s_prime][o] * self.upper_bound_value(
upper_bound=upper_bound, b=new_belief, S=S, lp=lp)
v += immediate_r + gamma * expected_future_rew
q_vals.append(v)
new_val = max(q_vals)
return b, new_val
def lp_convex_hull_projection_lp(self, upper_bound: Tuple[List[Any], List[Any]], b: npt.NDArray[Any],
S: npt.NDArray[Any]) -> float:
"""
Reference: (Hauskrecht 2000)
Computes the upper bound belief by performing a projection onto the convex hull of the upper bound,
it is computed by solving an LP
:param upper_bound: the upper bound
:param b: the belief point to compute the value for
:param S: the set of states
:return: the upper bound value of the belief point
"""
upper_bound_point_set, corner_points = upper_bound
problem = pulp.LpProblem("ConvexHullProjection", pulp.LpMinimize)
# Convex hull coefficients
lamb = []
for i in range(len(upper_bound_point_set)):
lamb_i = pulp.LpVariable("lambda_" + str(i), lowBound=0, upBound=1, cat=pulp.LpContinuous)
lamb.append(lamb_i)
# The objective function
objective = 0
for i, point in enumerate(upper_bound_point_set):
objective += lamb[i] * point[1]
problem += objective, "Convex hull projection"
# --- The constraints ---
# Belief probability constraint
for j in range(len(S)):
belief_sum = 0
for i, point in enumerate(upper_bound_point_set):
belief_sum += lamb[i] * point[0][j]
problem += belief_sum == b[j], "BeliefVectorConstraint_" + str(j)
# Convex Hull constraint
lambda_weights_sum = 0
for i in range(len(lamb)):
lambda_weights_sum += lamb[i]
problem += lambda_weights_sum == 1, "ConvexHullWeightsSumConstraint"
# Solve
problem.solve(pulp.PULP_CBC_CMD(msg=0))
# Obtain solution
projected_lamb_coefficients = []
belief_value = 0
for i in range(len(upper_bound_point_set)):
projected_lamb_coefficients.append(lamb[i].varValue)
belief_value += projected_lamb_coefficients[i] * upper_bound_point_set[i][1]
return belief_value
def approximate_projection_sawtooth(self, upper_bound: Tuple[List[Any], List[Any]], b: npt.NDArray[Any],
S: npt.NDArray[Any]) -> float:
"""
Reference: (Hauskrecht 2000)
Performs an approximate projection of the belief onto the convex hull of the upper bound
to compute the upper bound value of the belief
:param upper_bound: the upper bound
:param b: the belief point
:param S: the set of states
:return: the value of the belief point
"""
upper_bound_point_set, corner_points = upper_bound
alpha_corner = np.array(list(map(lambda x: x[1], corner_points)))
interior_belief_values = []
for point in upper_bound_point_set:
interior_belief_values.append(self.interior_point_belief_val(interior_point=point, b=b,
alpha_corner=alpha_corner, S=S))
return min(interior_belief_values)
def interior_point_belief_val(self, interior_point: Tuple[npt.NDArray[Any], float], b: npt.NDArray[Any],
alpha_corner: npt.NDArray[Any], S: npt.NDArray[Any]) -> float:
"""
Computes the upper-bound value that a given interior point of the convex hull induces on the belief point b
:param interior_point: the interior point
:param b: the belief point
:param alpha_corner: the alpha vector corresponding to the corners of the belief simplex
:param S: the set of states
:return: the value of the belief point induced by the interior point
"""
min_ratios = []
for s in S:
if interior_point[0][s] > 0:
min_ratio = b[s] / interior_point[0][s]
min_ratios.append(min_ratio)
else:
min_ratios.append(float("inf"))
min_ratio = min(min_ratios)
if min_ratio > 1:
min_ratio = 1
interior_alpha_corner_dot = alpha_corner.dot(interior_point[0])
return float(interior_alpha_corner_dot + min_ratio * (interior_point[1] - interior_alpha_corner_dot))
def upper_bound_value(self, upper_bound: Tuple[List[Any], List[Any]], b: npt.NDArray[Any], S: npt.NDArray[Any],
lp: bool = False) -> float:
"""
Computes the upper bound value of a given belief point
:param upper_bound: the upper bound
:param b: the belief point
:param S: the set of states
:param lp: boolean flag that decides whether to use LP to compute the upper bound value or not
:return: the upper bound value
"""
if lp:
return self.lp_convex_hull_projection_lp(upper_bound=upper_bound, b=b, S=S)
else:
return self.approximate_projection_sawtooth(upper_bound=upper_bound, b=b, S=S)
def lower_bound_value(self, lower_bound: List[Any], b: npt.NDArray[Any], S: npt.NDArray[Any]) -> float:
"""
Computes the lower bound value of a given belief point
:param lower_bound: the lower bound
:param b: the belief point
:param S: the set of states
:return: the lower bound value
"""
alpha_vals = []
for alpha_vec in lower_bound:
val = 0.0
for s in S:
val += b[s] * alpha_vec[s]
alpha_vals.append(val)
return max(alpha_vals)
def next_belief(self, o: int, a: int, b: npt.NDArray[Any], S: npt.NDArray[Any], Z: npt.NDArray[Any],
T: npt.NDArray[Any]) -> npt.NDArray[Any]:
"""
Computes the next belief using a Bayesian filter
:param o: the latest observation
:param a: the latest action
:param b: the current belief
:param S: the set of states
:param Z: the observation tensor
:param T: the transition tensor
:return: the new belief
"""
b_prime = np.zeros(len(S))
for s_prime in S:
b_prime[s_prime] = self.bayes_filter(s_prime=s_prime, o=o, a=a, b=b, S=S, Z=Z, T=T)
assert round(sum(b_prime), 5) == 1
return b_prime
def bayes_filter(self, s_prime: int, o: int, a: int, b: npt.NDArray[Any], S: npt.NDArray[Any], Z: npt.NDArray[Any],
T: npt.NDArray[Any]) -> float:
"""
A Bayesian filter to compute the belief of being in s_prime when observing o after taking action a in belief b
:param s_prime: the state to compute the belief of
:param o: the observation
:param a: the action
:param b: the current belief point
:param S: the set of states
:param Z: the observation tensor
:param T: the transition tensor
:return: b_prime(s_prime)
"""
norm = 0
for s in S:
for s_prime_1 in S:
prob_1 = Z[a][s_prime_1][o]
norm += b[s] * prob_1 * T[a][s][s_prime_1]
if norm == 0:
return 0
temp = 0
for s in S:
temp += b[s] * T[a][s][s_prime] * Z[a][s_prime][o]
b_prime = (temp) / norm
assert b_prime <= 1
return b_prime
def p_o_given_b_a(self, o: int, b: npt.NDArray[Any], a: int, S: npt.NDArray[Any], Z: npt.NDArray[Any],
T: npt.NDArray[Any]) -> float:
"""
Computes P[o|a,b]
:param o: the observation
:param b: the belief point
:param a: the action
:param S: the set of states
:param Z: the observation tensor
:param T: the transition tensor
:return: the probability of observing o when taking action a in belief point b
"""
prob = 0
for s in S:
for s_prime in S:
prob += b[s] * T[a][s][s_prime] * Z[a][s_prime][o]
prob = round(prob, 3)
assert round(prob, 3) <= 1
return prob
def excess(self, lower_bound: List[Any], upper_bound: Tuple[List[Any], List[Any]],
b: npt.NDArray[Any], S: npt.NDArray[Any],
epsilon: float, gamma: float, t: int, lp: bool) -> float:
"""
Computes the excess uncertainty (Trey Smith and Reid Simmons, 2004)
:param lower_bound: the lower bound
:param upper_bound: the upper bound
:param b: the current belief point
:param S: the set of states
:param epsilon: the epsilon accuracy parameter
:param gamma: the discount factor
:param t: the current exploration depth
:param lp: whether to use LP or not to compute upper bound belief values
:return: the excess uncertainty
"""
w = self.width(lower_bound=lower_bound, upper_bound=upper_bound, b=b, S=S, lp=lp)
if gamma == 0:
return w
else:
return w - epsilon * math.pow(gamma, -(t))
def width(self, lower_bound: List[Any], upper_bound: Tuple[List[Any], List[Any]],
b: npt.NDArray[Any], S: npt.NDArray[Any],
lp: bool) -> float:
"""
Computes the bounds width (Trey Smith and Reid Simmons, 2004)
:param lower_bound: the current lower bound
:param upper_bound: the current upper bound
:param b: the current belief point
:param S: the set of states
:param lp: boolean flag that decides whether to use LP to compute upper bound belief values
:return: the width of the bounds
"""
ub = self.upper_bound_value(upper_bound=upper_bound, b=b, S=S, lp=lp)
lb = self.lower_bound_value(lower_bound=lower_bound, b=b, S=S)
return ub - lb
def q_hat_interval(self, b: npt.NDArray[Any], a: int, S: npt.NDArray[Any], O: npt.NDArray[Any],
Z: npt.NDArray[Any], R: npt.NDArray[Any], T: npt.NDArray[Any], gamma: float,
lower_bound: List[int], upper_bound: Tuple[List[Any], List[Any]], lp: bool) -> List[float]:
"""
Computes the interval (Trey Smith and Reid Simmons, 2004)
:param b: the current belief point
:param a: the action
:param S: the set of states
:param O: the set of observations
:param Z: the observation tensor
:param R: the reward tensor
:param T: the transition tensor
:param gamma: the discount factor
:param lower_bound: the lower bound
:param upper_bound: the upper bound
:param lp: boolean flag that decides whether to use LP to compute upper bound belief values
:return: the interval
"""
upper_Q = self.q(b=b, a=a, lower_bound=lower_bound, upper_bound=upper_bound, S=S, O=O, Z=Z,
R=R, gamma=gamma, T=T, upper=True, lp=lp)
lower_Q = self.q(b=b, a=a, lower_bound=lower_bound, upper_bound=upper_bound, S=S, O=O, Z=Z,
R=R, gamma=gamma, T=T, upper=False, lp=lp)
return [lower_Q, upper_Q]
def q(self, b: npt.NDArray[Any], a: int, lower_bound: List[int], upper_bound: Tuple[List[int], List[int]],
S: npt.NDArray[Any], O: npt.NDArray[Any], Z: npt.NDArray[Any], R: npt.NDArray[Any], gamma: float,
T: npt.NDArray[Any], upper: bool = True, lp: bool = False) -> float:
"""
Applies the Bellman equation to compute Q values
:param b: the belief point
:param a: the action
:param lower_bound: the lower bound
:param upper_bound: the upper bound
:param S: the set of states
:param O: the set of observations
:param Z: the observation tensor
:param R: the reward tensor
:param gamma: the discount factor
:param T: the transition tensor
:param upper: boolean flag that decides whether to use the upper bound or lower bound on V to
compute the Q-value
:param lp: boolean flag that decides whether to use LP to compute upper bound belief values
:return: the Q-value
"""
Q_val = 0
for s in S:
immediate_r = R[a][s] * b[s]
expected_future_rew = 0
for o in O:
if self.observation_possible(o=o, b=b, Z=Z, T=T, S=S, a=a):
new_belief = self.next_belief(o=o, a=a, b=b, S=S, Z=Z, T=T)
for s_prime in S:
if upper:
future_val = self.upper_bound_value(upper_bound=upper_bound, b=new_belief, S=S, lp=lp)
else:
future_val = self.lower_bound_value(lower_bound=lower_bound, b=new_belief, S=S)
expected_future_rew += \
b[s] * T[a][s][s_prime] * Z[a][s_prime][o] * future_val
Q_val += immediate_r + expected_future_rew * gamma
return Q_val
def prune_upper_bound(self, upper_bound: Tuple[List[Any], List[Any]], S: npt.NDArray[Any], lp: bool) \
-> Tuple[List[Any], List[Any]]:
"""
Prunes the points in the upper bound
:param upper_bound: the current upper bound
:param S: the set of states
:param lp: boolean flag that decides whether to use LP to compute upper bound belief values
:return: the pruned upper bound
"""
upper_bound_point_set, corner_points = upper_bound
pruned_upper_bound_point_set = []
for point in upper_bound_point_set:
true_val = self.upper_bound_value(upper_bound=upper_bound, S=S, b=point[0], lp=lp)
if not (point[1] > true_val):
pruned_upper_bound_point_set.append(point)
return pruned_upper_bound_point_set, corner_points
def simulate(self, horizon: int, b0: npt.NDArray[Any], lower_bound: npt.NDArray[Any],
Z: npt.NDArray[Any], R: npt.NDArray[Any], gamma: float,
T: npt.NDArray[Any], A: npt.NDArray[Any], O: npt.NDArray[Any], S: npt.NDArray[Any]) -> float:
"""
Simulates the POMDP to estimate the reward of the greedy policy with respect to the value function represented
by the lower bound
:param horizon: the horizon for the simulation
:param b0: the initial belief
:param lower_bound: the lower bound which represents the value function
:param Z: the observation tensor
:param R: the reward tensor
:param gamma: the discount factor
:param T: the transition operator
:param A: the action set
:param O: the observation set
:param S: the set of states
:return: the cumulative discounted reward
"""
t = 0
b = b0
cumulative_r = 0.0
while t < horizon:
q_values = list(map(lambda a: self.q(
b=b, a=a, lower_bound=list(lower_bound.tolist()), upper_bound=([], []), S=S, O=O, Z=Z,
R=R, gamma=gamma, T=T, upper=False, lp=False), A))
a = int(np.argmax(np.array(q_values)))
r = 0
for s in S:
r += R[a][s] * b[s]
cumulative_r += math.pow(gamma, t) * r
observation_probabilities = []
for o in O:
p = 0
for s in S:
for s_prime in S:
p += b[s] * T[a][s][s_prime] * Z[a][s_prime][o]
observation_probabilities.append(p)
o = np.random.choice(np.arange(0, len(O)), p=observation_probabilities)
b = self.next_belief(o=o, a=a, b=b, S=S, Z=Z, T=T)
t += 1
return cumulative_r
def one_step_lookahead(self, state, V, num_actions, num_states, T, discount_factor, R) -> npt.NDArray[Any]:
"""
Performs a one-step lookahead for value iteration
:param state: the current state
:param V: the current value function
:param num_actions: the number of actions
:param num_states: the number of states
:param T: the transition kernel
:param discount_factor: the discount factor
:param R: the table with rewards
:return: an array with lookahead values
"""
A = np.zeros(num_actions)
for a in range(num_actions):
reward = R[a][state]
for next_state in range(num_states):
prob = T[a][state][next_state]
A[a] += prob * (reward + discount_factor * V[next_state])
return A
def vi(self, T: npt.NDArray[Any], num_states: int, num_actions: int, R: npt.NDArray[Any],
theta=0.0001, discount_factor=1.0) -> Tuple[npt.NDArray[Any], npt.NDArray[Any]]:
"""
An implementation of the Value Iteration algorithm
:param T: the transition kernel T
:param num_states: the number of states
:param num_actions: the number of actions
:param R: the table with rewards
:param theta: convergence threshold
:param discount_factor: the discount factor
:return: (value function, greedy policy)
"""
V = np.zeros(num_states)
while True:
# Stopping condition
delta = 0
# Update each state...
for s in range(num_states):
# Do a one-step lookahead to find the best action
A = self.one_step_lookahead(s, V, num_actions, num_states, T, discount_factor, R)
best_action_value = np.max(A)
# Calculate delta across all states seen so far
delta = max(delta, np.abs(best_action_value - V[s]))
# Update the value function. Ref: Sutton book eq. 4.10.
V[s] = best_action_value
# Check if we can stop
if delta < theta:
break
# Create a deterministic policy using the optimal value function
policy = np.zeros([num_states, num_actions])
for s in range(num_states):
# One step lookahead to find the best action for this state
A = self.one_step_lookahead(s, V, num_actions, num_states, T, discount_factor, R)
best_action = np.argmax(A)
# Always take the best action
policy[s, best_action] = 1.0
return V, policy
def observation_possible(self, o, b, Z, T, S, a) -> bool:
"""
Checks if a given observation is possible when taking a given action at a given belief
:param o: the observation to check
:param b: the belief to check
:param Z: the observation tensor
:param T: the transition tensor
:param S: the state space
:param a: the action to check
:return: True if the observation is possible, otherwise False
"""
prob = 0
for s in S:
for s_prime in S:
prob += b[s] * T[a][s][s_prime] * Z[a][s_prime][o]
return prob > 0