"""
MIT License
Copyright (c) 2019 MCS developers https://github.com/vojha-code/Multilevel-Coordinate-Search
"""
import copy
import sys
import os
import time
import math
from numpy.typing import NDArray
from typing import Tuple, Union, List, Optional, Any, Dict
import gymnasium as gym
import numpy as np
import gym_csle_stopping_game.constants.constants as env_constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.training.experiment_config import ExperimentConfig
from csle_common.dao.training.experiment_execution import ExperimentExecution
from csle_common.dao.training.experiment_result import ExperimentResult
from csle_common.dao.training.agent_type import AgentType
from csle_common.dao.training.player_type import PlayerType
from csle_common.util.experiment_util import ExperimentUtil
from csle_common.logging.log import Logger
from csle_common.dao.training.multi_threshold_stopping_policy import MultiThresholdStoppingPolicy
from csle_common.dao.training.linear_threshold_stopping_policy import LinearThresholdStoppingPolicy
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.dao.jobs.training_job_config import TrainingJobConfig
from csle_common.util.general_util import GeneralUtil
from csle_common.dao.simulation_config.base_env import BaseEnv
from csle_common.dao.training.policy_type import PolicyType
from csle_agents.agents.base.base_agent import BaseAgent
import csle_agents.constants.constants as agents_constants
from csle_agents.agents.mcs.mcs_utils.mcs_fun import MCSUtils
from csle_agents.agents.mcs.mcs_utils.gls_utils import GLSUtils
from csle_agents.agents.mcs.mcs_utils.ls_utils import LSUtils
class MCSAgent(BaseAgent):
"""
Multi-Level Coordinate Search Agent
"""
def __init__(self, simulation_env_config: SimulationEnvConfig,
emulation_env_config: Union[None, EmulationEnvConfig], experiment_config: ExperimentConfig,
env: Optional[BaseEnv] = None, training_job: Optional[TrainingJobConfig] = None,
save_to_metastore: bool = True) -> None:
"""
Initializes the MCS Agent
:param simulation_env_config: the simulation env config
:param emulation_env_config: the emulation env config
:param experiment_config: the experiment config
:param env: (optional) the gym environment to use for simulation
:param training_job: (optional) a training job configuration
:param save_to_metastore: boolean flag that can be set to avoid saving results and progress to the metastore
"""
super().__init__(simulation_env_config=simulation_env_config, emulation_env_config=emulation_env_config,
experiment_config=experiment_config)
assert experiment_config.agent_type == AgentType.MCS
self.env = env
self.training_job = training_job
self.save_to_metastore = save_to_metastore
    def eval_theta(self, policy: Union[MultiThresholdStoppingPolicy, LinearThresholdStoppingPolicy],
max_steps: int = 200) -> Dict[str, Union[float, int]]:
"""
        Evaluates a given threshold policy by running Monte Carlo simulations
        :param policy: the policy to evaluate
        :param max_steps: the maximum number of steps per evaluation episode
        :return: the average metrics of the evaluation
"""
if self.env is None:
raise ValueError("Need to specify an environment to run policy evaluation")
eval_batch_size = self.experiment_config.hparams[agents_constants.COMMON.EVAL_BATCH_SIZE].value
metrics: Dict[str, Any] = {}
for j in range(eval_batch_size):
done = False
o, _ = self.env.reset()
l = int(o[0])
b1 = o[1]
t = 1
r = 0
a = 0
info: Dict[str, Any] = {}
while not done and t <= max_steps:
Logger.__call__().get_logger().debug(f"t:{t}, a: {a}, b1:{b1}, r:{r}, l:{l}, info:{info}")
if self.experiment_config.player_type == PlayerType.ATTACKER:
policy.opponent_strategy = self.env.static_defender_strategy
a = policy.action(o=o)
else:
a = policy.action(o=o)
o, r, done, _, info = self.env.step(a)
l = int(o[0])
b1 = o[1]
t += 1
metrics = MCSAgent.update_metrics(metrics=metrics, info=info)
avg_metrics = MCSAgent.compute_avg_metrics(metrics=metrics)
avg_metrics[env_constants.ENV_METRICS.RETURN] = -avg_metrics[env_constants.ENV_METRICS.RETURN]
return avg_metrics
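    # Illustrative usage sketch of eval_theta (not executed; assumes a configured agent with a gym
    # environment and the EVAL_BATCH_SIZE hyperparameter set):
    #
    #   policy = agent.get_policy(theta=[0.5, -1.2], L=2)
    #   avg_metrics = agent.eval_theta(policy=policy, max_steps=200)
    #   cost = avg_metrics[env_constants.ENV_METRICS.RETURN]  # negated average return, so MCS minimizes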
    @staticmethod
def update_metrics(metrics: Dict[str, List[Union[float, int]]], info: Dict[str, Union[float, int]]) \
-> Dict[str, List[Union[float, int]]]:
"""
Update a dict with aggregated metrics using new information from the environment
:param metrics: the dict with the aggregated metrics
:param info: the new information
:return: the updated dict of metrics
"""
for k, v in info.items():
if k in metrics:
metrics[k].append(round(v, 3))
else:
                metrics[k] = [round(v, 3)]
return metrics
    @staticmethod
def compute_avg_metrics(metrics: Dict[str, List[Union[float, int]]]) -> Dict[str, Union[float, int]]:
"""
Computes the average metrics of a dict with aggregated metrics
:param metrics: the dict with the aggregated metrics
:return: the average metrics
"""
avg_metrics = {}
for k, v in metrics.items():
avg = round(sum(v) / len(v), 2)
avg_metrics[k] = avg
return avg_metrics
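    # Illustrative sketch of the metrics aggregation above (not executed; the metric name "R" is a
    # hypothetical example):
    #
    #   metrics: Dict[str, List[Union[float, int]]] = {}
    #   metrics = MCSAgent.update_metrics(metrics=metrics, info={"R": 1.0})
    #   metrics = MCSAgent.update_metrics(metrics=metrics, info={"R": 3.0})
    #   MCSAgent.compute_avg_metrics(metrics=metrics)  # -> {"R": 2.0}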
    def hparam_names(self) -> List[str]:
"""
        Returns the hyperparameter names used by the agent
:return: a list with the hyperparameter names
"""
        return [agents_constants.MCS.STEP, agents_constants.MCS.STEP1, agents_constants.MCS.U, agents_constants.MCS.V,
                agents_constants.MCS.LOCAL, agents_constants.MCS.STOPPING_ACTIONS, agents_constants.MCS.GAMMA,
                agents_constants.MCS.EPSILON, agents_constants.MCS.IINIT, agents_constants.COMMON.CONFIDENCE_INTERVAL,
                agents_constants.COMMON.RUNNING_AVERAGE]
    def train(self) -> ExperimentExecution:
"""
        Initializes the parameters of the MCS algorithm and runs the training process
:return: The experiment execution
"""
pid = os.getpid()
u = self.experiment_config.hparams[agents_constants.MCS.U].value
v = self.experiment_config.hparams[agents_constants.MCS.V].value
iinit = self.experiment_config.hparams[agents_constants.MCS.IINIT].value
local = self.experiment_config.hparams[agents_constants.MCS.LOCAL].value
eps = self.experiment_config.hparams[agents_constants.MCS.EPSILON].value
gamma = self.experiment_config.hparams[agents_constants.MCS.GAMMA].value
# prt = self.experiment_config.hparams[agents_constants.MCS.PRT].value
# m = self.experiment_config.hparams[agents_constants.MCS.M].value
stopping_actions = self.experiment_config.hparams[agents_constants.MCS.STOPPING_ACTIONS].value
n = len(u)
smax = 5 * n + 10
nf = 50 * pow(n, 2)
stop: List[Union[float, int]] = [3 * n]
hess = np.ones((n, n))
stop.append(float("-inf"))
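        # Sizing example: with n = 2 threshold parameters the settings above give smax = 20 depth
        # levels, a budget of nf = 200 function evaluations, and stop = [6, -inf], i.e., the search
        # terminates early if the best value is not improved for 3 * n = 6 consecutive sweeps.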
exp_result = ExperimentResult()
exp_result.plot_metrics.append(agents_constants.COMMON.AVERAGE_RETURN)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_RETURN)
exp_result.plot_metrics.append(env_constants.ENV_METRICS.INTRUSION_LENGTH)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_INTRUSION_LENGTH)
exp_result.plot_metrics.append(env_constants.ENV_METRICS.INTRUSION_START)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_INTRUSION_START)
exp_result.plot_metrics.append(env_constants.ENV_METRICS.TIME_HORIZON)
exp_result.plot_metrics.append(agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON)
exp_result.plot_metrics.append(env_constants.ENV_METRICS.AVERAGE_UPPER_BOUND_RETURN)
exp_result.plot_metrics.append(env_constants.ENV_METRICS.AVERAGE_DEFENDER_BASELINE_STOP_ON_FIRST_ALERT_RETURN)
for l in range(1, self.experiment_config.hparams[agents_constants.MCS.STOPPING_ACTIONS].value + 1):
exp_result.plot_metrics.append(env_constants.ENV_METRICS.STOP + f"_{l}")
exp_result.plot_metrics.append(env_constants.ENV_METRICS.STOP + f"_running_average_{l}")
descr = f"Training of policies with the random search algorithm using " \
f"simulation:{self.simulation_env_config.name}"
for seed in self.experiment_config.random_seeds:
exp_result.all_metrics[seed] = {}
exp_result.all_metrics[seed][agents_constants.MCS.THETAS] = []
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN] = []
exp_result.all_metrics[seed][agents_constants.MCS.THRESHOLDS] = []
if self.experiment_config.player_type == PlayerType.DEFENDER:
for l in range(1, self.experiment_config.hparams[agents_constants.MCS.STOPPING_ACTIONS].value + 1):
exp_result.all_metrics[seed][
agents_constants.NELDER_MEAD.STOP_DISTRIBUTION_DEFENDER + f"_l={l}"] = []
else:
for s in self.simulation_env_config.state_space_config.states:
for l in range(1, self.experiment_config.hparams[agents_constants.MCS.STOPPING_ACTIONS].value + 1):
exp_result.all_metrics[seed][agents_constants.NELDER_MEAD.STOP_DISTRIBUTION_ATTACKER
+ f"_l={l}_s={s.id}"] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_INTRUSION_START] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON] = []
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_INTRUSION_LENGTH] = []
exp_result.all_metrics[seed][env_constants.ENV_METRICS.INTRUSION_START] = []
exp_result.all_metrics[seed][env_constants.ENV_METRICS.INTRUSION_LENGTH] = []
exp_result.all_metrics[seed][env_constants.ENV_METRICS.TIME_HORIZON] = []
exp_result.all_metrics[seed][env_constants.ENV_METRICS.AVERAGE_UPPER_BOUND_RETURN] = []
exp_result.all_metrics[seed][
env_constants.ENV_METRICS.AVERAGE_DEFENDER_BASELINE_STOP_ON_FIRST_ALERT_RETURN] = []
for l in range(1, self.experiment_config.hparams[agents_constants.MCS.STOPPING_ACTIONS].value + 1):
exp_result.all_metrics[seed][env_constants.ENV_METRICS.STOP + f"_{l}"] = []
exp_result.all_metrics[seed][env_constants.ENV_METRICS.STOP + f"_running_average_{l}"] = []
# Initialize training job
if self.training_job is None:
emulation_name = ""
if self.emulation_env_config is not None:
emulation_name = self.emulation_env_config.name
self.training_job = TrainingJobConfig(
simulation_env_name=self.simulation_env_config.name, experiment_config=self.experiment_config,
progress_percentage=0, pid=pid, experiment_result=exp_result,
emulation_env_name=emulation_name, simulation_traces=[],
num_cached_traces=agents_constants.COMMON.NUM_CACHED_SIMULATION_TRACES,
log_file_path=Logger.__call__().get_log_file_path(), descr=descr,
physical_host_ip=GeneralUtil.get_host_ip())
if self.save_to_metastore:
training_job_id = MetastoreFacade.save_training_job(training_job=self.training_job)
self.training_job.id = training_job_id
else:
self.training_job.pid = pid
self.training_job.progress_percentage = 0
self.training_job.experiment_result = exp_result
if self.save_to_metastore:
MetastoreFacade.update_training_job(training_job=self.training_job, id=self.training_job.id)
# Initialize execution result
ts = time.time()
emulation_name = ""
if self.emulation_env_config is not None:
emulation_name = self.emulation_env_config.name
simulation_name = self.simulation_env_config.name
self.exp_execution = ExperimentExecution(
result=exp_result, config=self.experiment_config, timestamp=ts, emulation_name=emulation_name,
simulation_name=simulation_name, descr=descr, log_file_path=self.training_job.log_file_path)
if self.save_to_metastore:
exp_execution_id = MetastoreFacade.save_experiment_execution(self.exp_execution)
self.exp_execution.id = exp_execution_id
config = self.simulation_env_config.simulation_env_input_config
if self.env is None:
self.env = gym.make(self.simulation_env_config.gym_env_name, config=config)
for seed in self.experiment_config.random_seeds:
# ExperimentUtil.set_seed(seed)
exp_result = self.MCS(exp_result=exp_result, seed=seed, random_seeds=self.experiment_config.random_seeds,
training_job=self.training_job, u=u, v=v, smax=smax, nf=nf, stop=stop, iinit=iinit,
local=local, gamma=gamma, hess=hess, stopping_actions=stopping_actions, eps=eps,
n=n)
if self.save_to_metastore:
MetastoreFacade.save_simulation_trace(self.env.get_traces()[-1])
self.env.reset_traces()
# Calculate average and std metrics
exp_result.avg_metrics = {}
exp_result.std_metrics = {}
for metric in exp_result.all_metrics[self.experiment_config.random_seeds[0]].keys():
value_vectors = []
for seed in self.experiment_config.random_seeds:
value_vectors.append(exp_result.all_metrics[seed][metric])
max_num_measurements = max(list(map(lambda x: len(x), value_vectors)))
value_vectors = list(filter(lambda x: len(x) == max_num_measurements, value_vectors))
avg_metrics = []
std_metrics = []
for i in range(len(value_vectors[0])):
if type(value_vectors[0][0]) is int or type(value_vectors[0][0]) is float \
or type(value_vectors[0][0]) is np.int64 or type(value_vectors[0][0]) is np.float64:
seed_values = []
for seed_idx in range(len(value_vectors)):
seed_values.append(value_vectors[seed_idx][i])
avg = ExperimentUtil.mean_confidence_interval(
data=seed_values,
confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[0]
if not math.isnan(avg):
avg_metrics.append(avg)
ci = ExperimentUtil.mean_confidence_interval(
data=seed_values,
confidence=self.experiment_config.hparams[agents_constants.COMMON.CONFIDENCE_INTERVAL].value)[1]
if not math.isnan(ci):
std_metrics.append(ci)
else:
std_metrics.append(-1)
else:
avg_metrics.append(-1)
std_metrics.append(-1)
exp_result.avg_metrics[metric] = avg_metrics
exp_result.std_metrics[metric] = std_metrics
traces = self.env.get_traces()
if len(traces) > 0 and self.save_to_metastore:
MetastoreFacade.save_simulation_trace(traces[-1])
ts = time.time()
self.exp_execution.timestamp = ts
self.exp_execution.result = exp_result
if self.save_to_metastore:
MetastoreFacade.update_experiment_execution(experiment_execution=self.exp_execution,
id=self.exp_execution.id)
return self.exp_execution
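    # Illustrative end-to-end usage sketch (not executed; the simulation name and the hyperparameter
    # dict are assumptions and must match the simulation and constants of the target deployment):
    #
    #   simulation_env_config = MetastoreFacade.get_simulation_by_name("csle-stopping-game-002")
    #   experiment_config = ExperimentConfig(
    #       output_dir="...", title="MCS test", random_seeds=[399], agent_type=AgentType.MCS,
    #       hparams={...}, log_every=1, player_type=PlayerType.DEFENDER, player_idx=0)
    #   agent = MCSAgent(simulation_env_config=simulation_env_config, emulation_env_config=None,
    #                    experiment_config=experiment_config, save_to_metastore=False)
    #   experiment_execution = agent.train()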
    def get_policy(self, theta: Union[List[Union[float, int]], NDArray[np.float64]], L: int) \
-> Union[MultiThresholdStoppingPolicy, LinearThresholdStoppingPolicy]:
"""
Gets the policy of a given parameter vector
:param theta: the parameter vector
        :param L: the number of stopping actions
:return: the policy
"""
if self.experiment_config.hparams[agents_constants.SIMULATED_ANNEALING.POLICY_TYPE].value \
== PolicyType.MULTI_THRESHOLD.value:
policy = MultiThresholdStoppingPolicy(
theta=list(theta), simulation_name=self.simulation_env_config.name,
states=self.simulation_env_config.state_space_config.states,
player_type=self.experiment_config.player_type, L=L,
actions=self.simulation_env_config.joint_action_space_config.action_spaces[
self.experiment_config.player_idx].actions, experiment_config=self.experiment_config, avg_R=-1,
agent_type=AgentType.SIMULATED_ANNEALING)
else:
policy = LinearThresholdStoppingPolicy(
theta=list(theta), simulation_name=self.simulation_env_config.name,
states=self.simulation_env_config.state_space_config.states,
player_type=self.experiment_config.player_type, L=L,
actions=self.simulation_env_config.joint_action_space_config.action_spaces[
self.experiment_config.player_idx].actions, experiment_config=self.experiment_config, avg_R=-1,
agent_type=AgentType.SIMULATED_ANNEALING)
return policy
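    # Note: a MultiThresholdStoppingPolicy maps each parameter theta[i] through a sigmoid to a
    # threshold in (0, 1) (cf. the "sigmoid(theta)" log entries in MCS below); e.g., theta = [0.0, 2.0]
    # corresponds approximately to the thresholds [0.5, 0.88].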
    def init_list(self, theta0: NDArray[np.int32], l: NDArray[np.int32], L: NDArray[np.int32],
stopping_actions: int, n: int, ncall: int = 0) \
            -> Tuple[NDArray[np.float64], NDArray[np.int32], int,
Union[MultiThresholdStoppingPolicy, LinearThresholdStoppingPolicy]]:
"""
Computes the function values corresponding to the initialization list
and the pointer istar to the final best point x^* of the init. list
:param theta0: theta0
        :param l: indication of the mid point
        :param L: indication of the end point (or total number of partitions of the value x in the i'th dimension)
        :param stopping_actions: the number of stopping actions for the eval_theta function
        :param n: dimension (should equal the number of stopping actions)
        :param ncall: the initial number of function calls
        :return: the function values of the initialization list, the pointer istar, the number of
            function calls, and the last evaluated policy
"""
theta = np.zeros(n)
for i in range(n):
theta[i] = theta0[i, l[i]]
policy = self.get_policy(theta, L=stopping_actions)
avg_metrics = self.eval_theta(
policy=policy, max_steps=self.experiment_config.hparams[agents_constants.COMMON.MAX_ENV_STEPS].value)
J1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall += 1
J0 = np.zeros((L[0] + 1, n))
J0[l[0], 0] = J1
istar = np.zeros(n).astype(int)
for i in range(n):
istar[i] = l[i]
for j in range(L[i] + 1):
if j == l[i]:
if i != 0:
J0[j, i] = J0[istar[i - 1], i - 1]
else:
theta[i] = theta0[i, j]
policy = self.get_policy(theta, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
J0[j, i] = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if J0[j, i] < J1:
J1 = J0[j, i]
istar[i] = j
theta[i] = theta0[i, istar[i]]
return J0, istar, ncall, policy # type: ignore
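    # Example of the initialization-list layout consumed above (a sketch; assumes the simple
    # initialization where each coordinate's list is [u_i, (u_i + v_i) / 2, v_i]):
    #
    #   with n = 2, u = [0, 0], v = [1, 1], l = [1, 1] and L = [2, 2]:
    #   theta0 = [[0.0, 0.5, 1.0],
    #             [0.0, 0.5, 1.0]]
    #   The starting point takes the midpoint theta0[i, l[i]] = 0.5 in every coordinate, and
    #   J0[j, i] holds the evaluation with coordinate i moved to theta0[i, j] while the earlier
    #   coordinates are fixed at their best values istar[0], ..., istar[i - 1].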
    def MCS(self, exp_result: ExperimentResult, seed: int, random_seeds: List[int], training_job: TrainingJobConfig,
u: List[int], v: List[int], smax: int, nf: int, stop: List[Union[float, int]], iinit: int, local: int,
gamma: float, hess: NDArray[np.float64], stopping_actions: int, eps: float, n: int, prt: int = 1) \
-> ExperimentResult:
"""
The Multilevel Coordinate Search algorithm
:param exp_result: the experiment result
:param seed: the seed
:param random_seeds: the list of random seeds
:param training_job: the configuration of the training job
:param u: the initial lower bound ("lower corner" in 3D)
:param v: the initial upper bound ("upper corner" in 3D)
:param smax: maximum level depth
:param nf: maximum number of function calls
:param stop: stopping test
:param iinit: the initial list
:param local: command for lsearch or no lsearch
:param gamma: acceptable relative accuracy for local search
        :param hess: the hessian of the multidimensional function
        :param stopping_actions: number of stopping actions
        :param eps: parameter value for the golden ratio
        :param n: the problem dimension (the number of threshold parameters)
:param prt: print option
:return: the experiment result
"""
progress = 0.0
if MCSUtils().check_box_bound(u, v):
sys.exit("Error MCS main: out of bound")
n = len(u)
ncall: int = 0
ncloc: int = 0
l = np.multiply(1, np.ones(n)).astype(int)
L = np.multiply(2, np.ones(n)).astype(int)
theta0 = MCSUtils().get_theta0(iinit, u, v, n) # type: ignore
if iinit != 3:
f0, istar, ncall1, policy = self.init_list(theta0, l, L, stopping_actions, n) # type: ignore
ncall = ncall + ncall1
theta = np.zeros(n)
for i in range(n):
theta[i] = theta0[i, l[i]]
v1 = np.zeros(n)
for i in range(n):
if abs(theta[i] - u[i]) > abs(theta[i] - v[i]):
v1[i] = u[i]
else:
v1[i] = v[i]
step = self.experiment_config.hparams[agents_constants.MCS.STEP].value
step1 = self.experiment_config.hparams[agents_constants.MCS.STEP1].value
dim = step1
isplit = np.zeros(step1).astype(int)
level = np.zeros(step1).astype(int)
ipar = np.zeros(step1).astype(int)
ichild = np.zeros(step1).astype(int)
nogain = np.zeros(step1).astype(int)
f = np.zeros((2, step1))
z = np.zeros((2, step1))
record: NDArray[Union[np.int32, np.float64]] = np.zeros(smax)
nboxes: int = 0
nbasket: int = -1
nbasket0: int = -1
nsweepbest: int = 0
nsweep: int = 0
m = n
record[0] = 1
nloc = 0
xloc: List[float] = []
flag = 1
ipar, level, ichild, f, isplit, p, xbest, fbest, nboxes = MCSUtils().initbox( # type: ignore
theta0, f0, l, L, istar, u, v, isplit, level, ipar, ichild, f, nboxes, prt) # type: ignore
f0min = fbest
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
s, record = MCSUtils().strtsw(smax, level, f[0, :], nboxes, record) # type: ignore
nsweep = nsweep + 1
xmin: List[Union[float, List[float], NDArray[np.float64]]] = []
fmi: List[float] = []
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN].append(f0min)
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN].append(f0min)
running_avg_J = ExperimentUtil.running_average(
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
avg_metrics: Optional[Dict[str, Union[float, int]]] = None
while s < smax and ncall + 1 <= nf:
if s % self.experiment_config.log_every == 0 and s > 0:
# Update training job
total_iterations = len(random_seeds) * smax
iterations_done = (random_seeds.index(seed)) * smax + s
progress = round(iterations_done / total_iterations, 2)
training_job.progress_percentage = progress
training_job.experiment_result = exp_result
if self.env is not None and len(self.env.get_traces()) > 0:
training_job.simulation_traces.append(self.env.get_traces()[-1])
if len(training_job.simulation_traces) > training_job.num_cached_traces:
training_job.simulation_traces = training_job.simulation_traces[1:]
if self.save_to_metastore:
MetastoreFacade.update_training_job(training_job=training_job, id=training_job.id)
# Update execution
ts = time.time()
self.exp_execution.timestamp = ts
self.exp_execution.result = exp_result
if self.save_to_metastore:
MetastoreFacade.update_experiment_execution(experiment_execution=self.exp_execution,
id=self.exp_execution.id)
Logger.__call__().get_logger().info(
f"[MCS] s: {s}, J:{-fbest}, "
f"J_avg_{-self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value}:"
f"{-running_avg_J}, "
f"sigmoid(theta):{policy.thresholds()}, progress: {round(progress * 100, 2)}%")
par = record[s]
n0, x, y, x1, x2, f1, f2 = MCSUtils().vertex(par, n, u, v, v1, theta0, f0, ipar, isplit, # type: ignore
ichild, z, f, l, L) # type: ignore
if s > 2 * n * (min(n0) + 1):
isplit[par], z[1, par] = MCSUtils().splrnk(n, n0, p, x, y)
splt = 1
else:
if nogain[par]:
splt = 0
else:
e, isplit[par], z[1, par] = MCSUtils().exgain(n, n0, l, L, x, y, x1, x2, f[0, par], f0, f1, f2)
fexp = f[0, par] + min(e)
if fexp < fbest:
splt = 1
else:
splt = 0
                        nogain[par] = 1
if splt == 1:
i = isplit[par]
level[par] = 0
                if z[1, par] == np.inf:
m = m + 1
z[1, par] = m
(xbest, fbest, policy, f01, xmin, fmi, ipar, level,
ichild, f, flag, ncall1, record, nboxes,
nbasket, nsweepbest, nsweep) = \
self.splinit(i, s, smax, par, theta0, n0, u, v, x, y, x1, x2, L, l, xmin, # type: ignore
fmi, ipar, level, # type: ignore
ichild, f, xbest, fbest, stop, prt, record, nboxes, nbasket, # type: ignore
nsweepbest, nsweep, # type: ignore
stopping_actions) # type: ignore
f01 = f01.reshape(len(f01), 1)
f0 = np.concatenate((f0, f01), axis=1)
ncall = ncall + ncall1
else:
z[0, par] = x[i]
(xbest, fbest, policy, xmin, fmi,
ipar, level, ichild, f,
flag, ncall1, record,
nboxes, nbasket, nsweepbest,
nsweep) = self.split(i, s, smax, par, n0, u, v, x, y, x1, x2, z[:, par], xmin, fmi, ipar, level,
ichild, f, xbest, fbest, stop, prt, record, nboxes, nbasket, nsweepbest,
nsweep, stopping_actions)
ncall = ncall + ncall1
if nboxes > dim:
isplit = np.concatenate((isplit, np.zeros(step)))
level = np.concatenate((level, np.zeros(step)))
ipar = np.concatenate((ipar, np.zeros(step)))
ichild = np.concatenate((ichild, np.zeros(step)))
nogain = np.concatenate((nogain, np.zeros(step)))
# J: NDArray[Union[np.float64, np.int32]] = np.concatenate((J, np.ones((2, step))), axis=1)
z = np.concatenate((z, np.ones((2, step))), axis=1)
dim = nboxes + step
if not flag:
break
else:
if s + 1 < smax:
level[par] = s + 1
record = MCSUtils().updtrec(par, s + 1, f[0, :], record) # type: ignore
else:
level[par] = 0
nbasket = nbasket + 1
if len(xmin) == nbasket:
xmin.append(copy.deepcopy(x))
fmi.append(f[0, par])
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f[0, par]
s = s + 1
while s < smax:
if record[s] == 0:
s = s + 1
else:
break
if s == smax:
if local:
fmiTemp = np.asarray(fmi[nbasket0 + 1: nbasket + 1])
xminTemp = xmin[nbasket0 + 1: nbasket + 1]
j = np.argsort(fmiTemp)
fmiTemp = np.sort(fmiTemp)
xminTemp = [copy.deepcopy(xminTemp[jInd]) for jInd in j]
fmi[nbasket0 + 1: nbasket + 1] = fmiTemp
xmin[nbasket0 + 1: nbasket + 1] = xminTemp
for j_iter in range(nbasket0 + 1, nbasket + 1):
x = copy.deepcopy(xmin[j_iter])
f1 = copy.deepcopy(fmi[j_iter])
loc = MCSUtils().chkloc(nloc, xloc, x)
if loc:
nloc, xloc = MCSUtils().addloc(nloc, xloc, x)
if not nbasket0 or nbasket0 == -1:
(xbest, fbest, policy, avg_metrics, xmin,
fmi, x, f1, loc, flag,
ncall1, nsweep,
nsweepbest) = \
self.basket(
x, f1, policy, avg_metrics, xmin, fmi,
xbest, fbest, stop,
nbasket0, nsweep,
nsweepbest,
stopping_actions)
else:
(xbest, fbest, policy, avg_metrics,
xmin, fmi, x, f1, loc, flag,
ncall1, nsweep, nsweepbest) = self.basket(x, f1, policy, avg_metrics, xmin, fmi, xbest,
fbest, stop, nbasket0, nsweep, nsweepbest,
stopping_actions)
ncall = ncall + ncall1
if not flag:
break
if loc:
xmin1, fmi1, nc, flag, nsweep, nsweepbest = self.lsearch(
x, f1, f0min, u, v, nf - ncall, stop, local, gamma, hess, nsweep, nsweepbest,
stopping_actions, eps)
ncall = ncall + nc
ncloc = ncloc + nc
if fmi1 < fbest:
xbest = copy.deepcopy(xmin1)
fbest = copy.deepcopy(fmi1)
nsweepbest = nsweep
if not flag:
nbasket0 = nbasket0 + 1
nbasket = copy.deepcopy(nbasket0)
if len(xmin) == nbasket:
xmin.append(copy.deepcopy(xmin1))
fmi.append(copy.deepcopy(fmi1))
else:
xmin[nbasket] = copy.deepcopy(xmin1)
fmi[nbasket] = copy.deepcopy(fmi1)
break
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return exp_result
if not nbasket0 or nbasket0 == -1:
(xbest, fbest, xmin, fmi, loc, flag, ncall1, nsweep, nsweepbest) = self.basket1(
np.array(xmin1), fmi1, xmin, fmi, xbest, fbest, stop, nbasket0, nsweep,
nsweepbest, stopping_actions)
else:
(xbest, fbest, policy, avg_metrics, xmin, fmi, loc, flag, ncall1, nsweep,
nsweepbest) = self.basket1(np.array(xmin1), fmi1, xmin, fmi, xbest, fbest, stop,
nbasket0, nsweep, nsweepbest, stopping_actions)
ncall = ncall + ncall1
if not flag:
break
if loc:
nbasket0 = nbasket0 + 1
if len(xmin) == nbasket0:
xmin.append(copy.deepcopy(xmin1))
fmi.append(copy.deepcopy(fmi1))
else:
xmin[nbasket0] = copy.deepcopy(xmin1)
fmi[nbasket0] = copy.deepcopy(fmi1)
fbest, xbest = MCSUtils().fbestloc(fmi, fbest, xmin, xbest, # type: ignore
nbasket0, stop) # type: ignore
if not flag:
break
nbasket = copy.deepcopy(nbasket0)
if not flag:
break
s, record = MCSUtils().strtsw(smax, list(level), list(f[0, :]), nboxes, record)
running_avg_J = ExperimentUtil.running_average(
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value)
exp_result.all_metrics[seed][agents_constants.COMMON.AVERAGE_RETURN].append(fbest)
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_RETURN].append(running_avg_J)
# Log thresholds
exp_result.all_metrics[seed][agents_constants.NELDER_MEAD.THETAS].append(
MCSAgent.round_vec(xbest))
exp_result.all_metrics[seed][agents_constants.NELDER_MEAD.THRESHOLDS].append(
MCSAgent.round_vec(policy.thresholds()))
# Log stop distribution
for k, v in policy.stop_distributions().items():
exp_result.all_metrics[seed][k].append(v)
if avg_metrics is not None:
# Log intrusion lengths
exp_result.all_metrics[seed][env_constants.ENV_METRICS.INTRUSION_LENGTH].append(
round(avg_metrics[env_constants.ENV_METRICS.INTRUSION_LENGTH], 3))
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_INTRUSION_LENGTH].append(
ExperimentUtil.running_average(
exp_result.all_metrics[seed][env_constants.ENV_METRICS.INTRUSION_LENGTH],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value))
# Log stopping times
exp_result.all_metrics[seed][env_constants.ENV_METRICS.INTRUSION_START].append(
round(avg_metrics[env_constants.ENV_METRICS.INTRUSION_START], 3))
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_INTRUSION_START].append(
ExperimentUtil.running_average(
exp_result.all_metrics[seed][env_constants.ENV_METRICS.INTRUSION_START],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value))
exp_result.all_metrics[seed][env_constants.ENV_METRICS.TIME_HORIZON].append(
round(avg_metrics[env_constants.ENV_METRICS.TIME_HORIZON], 3))
exp_result.all_metrics[seed][agents_constants.COMMON.RUNNING_AVERAGE_TIME_HORIZON].append(
ExperimentUtil.running_average(
exp_result.all_metrics[seed][env_constants.ENV_METRICS.TIME_HORIZON],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value))
for k in range(1, self.experiment_config.hparams[agents_constants.MCS.STOPPING_ACTIONS].value + 1):
exp_result.plot_metrics.append(env_constants.ENV_METRICS.STOP + f"_{k}")
exp_result.all_metrics[seed][env_constants.ENV_METRICS.STOP + f"_{k}"].append(
round(avg_metrics[env_constants.ENV_METRICS.STOP + f"_{k}"], 3))
exp_result.all_metrics[seed][env_constants.ENV_METRICS.STOP + f"_running_average_{k}"].append(
ExperimentUtil.running_average(
exp_result.all_metrics[seed][env_constants.ENV_METRICS.STOP + f"_{k}"],
self.experiment_config.hparams[agents_constants.COMMON.RUNNING_AVERAGE].value))
# Log baseline returns
exp_result.all_metrics[seed][env_constants.ENV_METRICS.AVERAGE_UPPER_BOUND_RETURN].append(
round(avg_metrics[env_constants.ENV_METRICS.AVERAGE_UPPER_BOUND_RETURN], 3))
exp_result.all_metrics[seed][
env_constants.ENV_METRICS.AVERAGE_DEFENDER_BASELINE_STOP_ON_FIRST_ALERT_RETURN].append(
round(avg_metrics[env_constants.ENV_METRICS.AVERAGE_DEFENDER_BASELINE_STOP_ON_FIRST_ALERT_RETURN],
3))
policy = self.get_policy(theta=list(xbest), L=stopping_actions)
exp_result.policies[seed] = policy
# Save policy
if self.save_to_metastore:
MetastoreFacade.save_multi_threshold_stopping_policy(multi_threshold_stopping_policy=policy)
if prt:
Logger.__call__().get_logger().info(
f"[MCS-summary-log]: "
f"nsweep: {nsweep}, minlevel: {s}, ncall: {ncall}, J:{-fbest}, "
f"theta_best: {xbest}, "
f"sigmoid(theta):{policy.thresholds()}, progress: {round(progress * 100, 2)}%")
if stop[0] > 1:
if nsweep - nsweepbest >= stop[0]:
return exp_result
return exp_result
    @staticmethod
def round_vec(vec) -> List[float]:
"""
Rounds a vector to 3 decimals
:param vec: the vector to round
:return: the rounded vector
"""
return list(map(lambda x: round(x, 3), vec))
    def splinit(self, i: int, s: int, smax: int, par: int, x0: NDArray[np.int32], n0: int, u: List[int], v: List[int],
x: NDArray[np.float64], y: NDArray[np.float64], x1: NDArray[np.float64], x2: NDArray[np.float64],
L: NDArray[np.int32], l: NDArray[np.int32],
xmin: List[Union[float, List[float], NDArray[np.float64]]],
fmi: List[float], ipar: NDArray[np.int32],
level: NDArray[np.int32], ichild: NDArray[np.int32],
f: NDArray[np.float64], xbest: NDArray[np.float64], fbest: NDArray[np.float64],
stop: List[Union[float, int]], prt: int, record: NDArray[Union[np.int32, np.float64]],
nboxes: int, nbasket: int, nsweepbest: int, nsweep: int, stopping_actions: int, ncall: int = 0,
nchild: int = 0):
"""
Splitting box at specified level s according to an initialization list
:param i: specified index
:param s: current depth level
:param smax: maximum depth level
        :param par: the label of the box to be split
        :param x0: initial position
        :param n0: indicates how often each coordinate has been split in the history of the box
        :param u: initial lower guess ("lower corner" in 3D)
        :param v: initial upper guess ("upper corner" in 3D)
        :param x: starting point
        :param y: the opposite vertex of the box
        :param x1: evaluation argument (position)
        :param x2: evaluation argument (position)
        :param L: indication of the end point of each coordinate's initialization list
        :param l: indication of the mid point of each coordinate's initialization list
        :param xmin: the points in the 'shopping basket'
        :param fmi: the function values of the points in the 'shopping basket'
        :param ipar: the labels of the parents of the boxes
        :param level: the levels of the boxes
        :param ichild: the numbers of the boxes among their siblings
:param f: function value
:param xbest: best evaluation argument (position)
:param fbest: best function value
:param stop: stopping test
        :param prt: print level - unused in this implementation so far
        :param record: record list; record[i] is the label of the best box at level i
        :param nboxes: counter for boxes not in the 'shopping basket'
        :param nbasket: counter for boxes in the 'shopping basket'
        :param nsweepbest: number of the sweep in which fbest was updated for the last time
        :param nsweep: sweep counter
        :param stopping_actions: number of stopping actions
        :return: a collection of parameters and metrics from the initial split
"""
f0 = np.zeros(max(L) + 1)
flag = 1
for j in range(L[i] + 1):
if j != l[i]:
x[i] = x0[i, j]
policy = self.get_policy(x, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f0[j] = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f0[j] < fbest:
fbest = f0[j]
xbest = copy.deepcopy(x)
nsweepbest = copy.deepcopy(nsweep)
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(float(fbest), stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(float(fbest), stop[2])
if not flag:
                        return (xbest, fbest, policy, f0, xmin, fmi, ipar, level, ichild, f, flag, ncall,
                                record, nboxes, nbasket, nsweepbest, nsweep)
else:
f0[j] = f[0, par]
if s + 1 < smax:
# nchild = 0
if u[i] < x0[i, 0]:
nchild = nchild + 1
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, s + 1, -nchild,
f0[0])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
for j in range(L[i]):
nchild = nchild + 1
if f0[j] <= f0[j + 1] or s + 2 < smax:
nboxes = nboxes + 1
if f0[j] <= f0[j + 1]:
level0 = s + 1
else:
level0 = s + 2
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(
par, level0, -nchild, f0[j])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
x[i] = x0[i, j]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(copy.deepcopy(x))
fmi.append(f0[j])
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f0[j]
nchild = nchild + 1
if f0[j + 1] < f0[j] or s + 2 < smax:
nboxes = nboxes + 1
if f0[j + 1] < f0[j]:
level0 = s + 1
else:
level0 = s + 2
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, level0,
-nchild, f0[j + 1])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
x[i] = x0[i, j + 1]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(copy.deepcopy(x))
fmi.append(f0[j + 1])
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f0[j + 1]
if x0[i, L[i]] < v[i]:
nchild = nchild + 1
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(
par, s + 1, -nchild, f0[L[i]])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
for j in range(L[i] + 1):
x[i] = x0[i, j]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(copy.deepcopy(x))
fmi.append(f0[j])
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f0[j]
return (xbest, fbest, policy, f0, xmin, fmi, ipar, level, ichild, f, flag, ncall,
record, nboxes, nbasket, nsweepbest, nsweep)
    def split(self, i: int, s: int, smax: int, par: int, n0: int, u: List[int], v: List[int],
x: NDArray[np.float64], y: NDArray[np.float64],
x1: NDArray[np.float64], x2: NDArray[np.float64], z: NDArray[np.float64],
xmin: List[Union[float, List[float], NDArray[np.float64]]], fmi: List[float],
ipar: NDArray[np.int32], level: NDArray[np.int32], ichild: NDArray[np.int32],
f: NDArray[np.float64], xbest: NDArray[np.float64],
fbest: NDArray[np.float64], stop: List[Union[float, int]], prt: int,
record: NDArray[Union[np.float64, np.int32]], nboxes: int, nbasket: int,
nsweepbest: int, nsweep: int, stopping_actions: int, ncall: int = 0, flag: int = 1):
"""
Function that performs a box split
        :param i: the splitting index (coordinate)
        :param s: current depth level
        :param smax: maximum depth level
        :param par: the label of the box to be split
        :param n0: indicates how often each coordinate has been split in the history of the box
        :param u: initial lower guess ("lower corner" in 3D)
        :param v: initial upper guess ("upper corner" in 3D)
        :param x: starting point
        :param y: the opposite vertex of the box
        :param x1: evaluation argument (position)
        :param x2: evaluation argument (position)
        :param z: the two splitting values in coordinate i
        :param xmin: minimum position
        :param fmi: the function values of the points in the 'shopping basket'
        :param ipar: the labels of the parents of the boxes
        :param level: the levels of the boxes
        :param ichild: the numbers of the boxes among their siblings
        :param f: function value
        :param xbest: currently best position
        :param fbest: current best function value
        :param stop: stopping test
        :param prt: print level - unused in this implementation so far
        :param record: record list; record[i] is the label of the best box at level i
        :param nboxes: counter for boxes not in the 'shopping basket'
        :param nbasket: counter for boxes in the 'shopping basket'
        :param nsweepbest: number of the sweep in which fbest was updated for the last time
        :param nsweep: sweep counter
        :param stopping_actions: the number of stopping actions
        :return: a collection of parameters and metrics after the split
"""
# ncall = 0
# flag = 1
x[i] = z[1]
policy = self.get_policy(x, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f[1, par] = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f[1, par] < fbest:
fbest = copy.deepcopy(f[1, par])
xbest = copy.deepcopy(x)
nsweepbest = copy.deepcopy(nsweep)
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(float(fbest), stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(float(fbest), stop[2])
if not flag:
            return (xbest, fbest, policy, xmin, fmi, ipar, level, ichild, f,
                    flag, ncall, record, nboxes, nbasket, nsweepbest, nsweep)
if s + 1 < smax:
if f[0, par] <= f[1, par]:
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, s + 1,
1, f[0, par])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
if s + 2 < smax:
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, s + 2,
2, f[1, par])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
x[i] = z[1]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(copy.deepcopy(x))
fmi.append(f[1, par])
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f[1, par]
else:
if s + 2 < smax:
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, s + 2,
1, f[0, par])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
x[i] = z[0]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(copy.deepcopy(x))
fmi.append(f[0, par])
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f[0, par]
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, s + 1,
2, f[1, par])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
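                # Golden-section rule: the part of the box between z[1] and the opposite vertex y[i]
                # keeps the favorable level s + 1 only if it is longer than the golden-section
                # fraction (3 - np.sqrt(5)) / 2 ~= 0.382 of the split length |z[1] - z[0]|;
                # otherwise it is demoted to level s + 2 or put in the 'shopping basket'.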
if z[1] != y[i]:
if abs(z[1] - y[i]) > abs(z[1] - z[0]) * (3 - np.sqrt(5)) * 0.5:
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(par, s + 1,
3, f[1, par])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
if s + 2 < smax:
nboxes = nboxes + 1
ipar[nboxes], level[nboxes], ichild[nboxes], f[0, nboxes] = MCSUtils().genbox(
par, s + 2, 3, f[1, par])
record = np.array(MCSUtils().updtrec(nboxes, level[nboxes], list(f[0, :]), list(record)))
else:
x[i] = z[1]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(copy.deepcopy(x))
fmi.append(copy.deepcopy(f[1, par]))
else:
xmin[nbasket] = copy.deepcopy(x)
fmi[nbasket] = f[1, par]
else:
xi1 = copy.deepcopy(x)
xi2 = copy.deepcopy(x)
xi1[i] = z[0]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(xi1)
fmi.append(f[0, par])
else:
xmin[nbasket] = xi1
fmi[nbasket] = f[0, par]
xi2[i] = z[1]
nbasket = nbasket + 1
if (len(xmin) == nbasket):
xmin.append(xi2)
fmi.append(f[1, par])
else:
xmin[nbasket] = xi2
fmi[nbasket] = f[1, par]
return (xbest, fbest, policy, xmin, fmi, ipar, level, ichild, f, flag,
ncall, record, nboxes, nbasket, nsweepbest, nsweep)
    def basket(self, x: List[float], f: float,
policy: Union[MultiThresholdStoppingPolicy, LinearThresholdStoppingPolicy],
avg_metrics: Optional[Dict[str, Union[float, int]]],
xmin: List[Union[float, List[float], NDArray[np.float64]]],
fmi: List[float], xbest: List[float], fbest: float, stop: List[Union[int, float]],
nbasket: int, nsweep: int, nsweepbest: int, stopping_actions: int, loc: int = 1,
flag: int = 1, ncall: Union[float, int] = 0):
"""
Function representing the basket functional
:param x: starting point
:param f: function value
:param policy: current policy
:param avg_metrics: current average metrics
        :param xmin: minimum evaluation argument (position)
        :param fmi: the function values of the points in the 'shopping basket'
:param xbest: current best position
:param fbest: current best function value
:param stop: stopping test
:param nbasket: counter for boxes in the 'shopping basket'
:param nsweep: sweep counter
        :param nsweepbest: number of the sweep in which fbest was updated for the last time
:param stopping_actions: number of stopping actions
:return: a collection of parameters and metrics after the basket functional
"""
if not nbasket:
return xbest, fbest, policy, avg_metrics, xmin, fmi, x, f, loc, flag, ncall, nsweep, nsweepbest
dist = np.zeros(nbasket + 1)
for k in range(len(dist)):
dist[k] = np.linalg.norm(np.subtract(x, xmin[k]))
ind = np.argsort(dist)
if nbasket == -1:
return xbest, fbest, policy, avg_metrics, xmin, fmi, x, f, loc, flag, ncall, nsweep, nsweepbest
else:
for k in range(nbasket + 1):
i = ind[k]
if fmi[i] <= f:
p = xmin[i] - x
y1 = x + 1 / 3 * p
policy = self.get_policy(y1, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f1 <= f:
y2 = x + 2 / 3 * p
policy = self.get_policy(y2, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f2 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f2 > max(f1, fmi[i]):
if f1 < f:
x = y1
f = f1
if f < fbest:
fbest = f
xbest = copy.deepcopy(x)
nsweepbest = nsweep
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return (
xbest, fbest, policy,
avg_metrics, xmin, fmi, x,
f, loc, flag,
ncall, nsweep,
nsweepbest,
)
else:
if f1 < min(f2, fmi[i]):
f = f1
x = copy.deepcopy(y1)
if f < fbest:
fbest = f
xbest = copy.deepcopy(x)
nsweepbest = nsweep
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return (
xbest, fbest, policy,
avg_metrics, xmin, fmi, x,
f, loc, flag,
ncall, nsweep,
nsweepbest,
)
elif f2 < min(f1, fmi[i]):
f = f2
x = copy.deepcopy(y2)
if f < fbest:
fbest = f
xbest = copy.deepcopy(x)
nsweepbest = nsweep
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return (
xbest, fbest, policy,
avg_metrics, xmin, fmi, x, f,
loc, flag, ncall,
nsweep, nsweepbest,
)
else:
loc = 0
break
return xbest, fbest, policy, avg_metrics, xmin, fmi, x, f, loc, flag, ncall, nsweep, nsweepbest
    def lsearch(self, x: List[Union[float, int]], f: float, f0: NDArray[np.float64], u: List[int], v: List[int],
nf: int, stop: List[Union[int, float]], maxstep: int, gamma: float,
hess: NDArray[np.float64], nsweep: int,
nsweepbest: int, stopping_actions: int, eps: float, ncall: Union[float, int] = 0,
flag: int = 1, eps0: float = 0.001, nloc: int = 1, small: float = 0.1,
smaxls: int = 15, diag: int = 0, nstep: int = 0):
"""
The local search algorithm
:param x: starting point
:param f: function value
:param f0: function value
:param u: lower initial guess ("lower corner" in 3D)
:param v: initial upper guess ("upper corner" in 3D)
        :param nf: the remaining budget of function evaluations
:param stop: stopping test
:param maxstep: maximum steps in the local search (mainly determined by the local command)
:param gamma: acceptable relative accuracy for local search
:param hess: the function Hessian
:param nsweep: sweep counter
        :param nsweepbest: number of the sweep in which fbest was updated for the last time
:param stopping_actions: number of stopping actions
:param eps: parameter value for the golden ratio
        :return: a collection of parameters and metrics after the local search
"""
n = len(x)
x0 = np.asarray([min(max(u[i], 0), v[i]) for i in range(len(u))])
xmin, fmi, g, G, nfcsearch = self.csearch(x, f, u, v, hess,
stopping_actions, eps)
xmin = [max(u[i], min(xmin[i], v[i])) for i in range(n)]
ncall = ncall + nfcsearch
xold = copy.deepcopy(xmin)
fold = copy.deepcopy(fmi)
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fmi, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fmi, stop[1])
if not flag:
return xmin, fmi, ncall, flag, nsweep, nsweepbest
d = np.asarray([min(min(xmin[i] - u[i], v[i] - xmin[i]), 0.25 * (1 + abs(x[i] - x0[i]))) for i in range(n)])
p, _, _ = LSUtils().minq(fmi, g, G, -d, d, 0, eps)
x = [max(u[i], min(xmin[i] + p[i], v[i])) for i in range(n)]
p = np.subtract(x, xmin)
if np.linalg.norm(p):
policy = self.get_policy(np.array(x), L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
alist: List[Union[float, int]] = [0, 1]
flist: List[Union[float, int]] = [fmi, f1]
fpred = fmi + np.dot(g.T, p) + np.dot(0.5, np.dot(p.T, np.dot(G, p)))
alist, flist, nfls = self.gls(u, v, xmin, p, alist,
flist, nloc, small, smaxls, stopping_actions)
ncall = ncall + nfls
i: Union[int, np.int64] = np.argmin(flist)
fminew = min(flist)
if fminew == fmi:
i = [k for k in range(len(alist)) if not alist[k]][0]
else:
fmi = copy.deepcopy(fminew)
xmin = xmin + np.dot(alist[i], p)
xmin = np.asarray([max(u[i], min(xmin[i], v[i])) for i in range(n)])
gain = f - fmi
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fmi, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fmi, stop[1])
if not flag:
return xmin, fmi, ncall, flag, nsweep, nsweepbest
if fold == fmi:
r: Union[int, float] = 0
elif fold == fpred:
r = 0.5
else:
r = (fold - fmi) / (fold - fpred)
else:
gain = f - fmi
r = 0
ind = [i for i in range(n) if (u[i] < xmin[i] and xmin[i] < v[i])]
b = np.dot(np.abs(g).T, [max(abs(xmin[i]), abs(xold[i])) for i in range(len(xmin))])
while (ncall < nf) and (nstep < maxstep) and ((diag or len(ind) < n) or
(stop[0] == 0 and fmi - gain <= stop[1]) or
(b >= gamma * (f0 - f) and gain > 0)):
nstep = nstep + 1
delta = [abs(xmin[i]) * eps ** (1 / 3) for i in range(len(xmin))]
            j: List[int] = [inx for inx in range(len(delta)) if (not delta[inx])]
if len(j) != 0:
for inx in j:
delta[inx] = eps ** (1 / 3) * 1
x1, x2 = MCSUtils().neighbor(xmin, delta, list(u), list(v))
f = copy.deepcopy(fmi)
if len(ind) < n and (b < gamma * (f0 - f) or (not gain)):
ind1 = [i for i in range(len(u)) if (xmin[i] == u[i] or xmin[i] == v[i])]
for k in range(len(ind1)):
i = ind1[k]
x = copy.deepcopy(xmin)
if xmin[i] == u[i]:
x[i] = x2[i]
else:
x[i] = x1[i]
policy = self.get_policy(np.asarray(x), L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f1 < fmi:
alist = [0, x[i], -xmin[i]]
flist = [fmi, f1]
p = np.zeros(n)
p[i] = 1
alist, flist, nfls = self.gls(u, v, xmin, p, alist,
flist, nloc, small, 6, stopping_actions)
ncall = ncall + nfls
l: Union[int, np.int32] = np.argmin(flist)
fminew = min(flist)
if fminew == fmi:
temp_list = [inx for inx in range(len(alist)) if (not alist[inx])]
# j = [inx for inx in range(len(alist)) if (not alist[inx])][0]
item = temp_list[0]
l = item
else:
fmi = fminew
xmin[i] = xmin[i] + alist[l]
else:
ind1[k] = -1
xmin = np.asarray([max(u[inx], min(xmin[inx], v[inx])) for inx in range(len(xmin))])
if not sum(ind1):
break
for inx in range(len(delta)):
delta[inx] = abs(xmin[inx]) * eps ** (1 / 3)
j = [inx for inx in range(len(delta)) if (not delta[inx])]
if len(j) != 0:
for inx in j:
delta[inx] = eps ** (1 / 3) * 1
x1, x2 = MCSUtils().neighbor(xmin, delta, list(u), list(v))
if abs(r - 1) > 0.25 or (not gain) or (b < gamma * (f0 - f)):
xmin, fmi, g, G, x1, x2, nftriple = self.triple(xmin, fmi, x1, x2, u, v, hess, 0,
stopping_actions, setG=True)
ncall = ncall + nftriple
diag = 0
else:
xmin, fmi, g, G, x1, x2, nftriple = self.triple(xmin, fmi, x1, x2, u, v,
hess, G, stopping_actions)
ncall = ncall + nftriple
diag = 1
xold = copy.deepcopy(xmin)
fold = copy.deepcopy(fmi)
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fmi, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fmi, stop[1])
if not flag:
return xmin, fmi, ncall, flag, nsweep, nsweepbest
if r < 0.25:
d = 0.5 * d
elif r > 0.75:
d = 2 * d
minusd = np.asarray([max(-d[jnx], u[jnx] - xmin[jnx]) for jnx in range(len(xmin))])
mind = np.asarray([min(d[jnx], v[jnx] - xmin[jnx]) for jnx in range(len(xmin))])
p, _, _ = LSUtils().minq(fmi, g, G, minusd, mind, 0, eps)
if not (np.linalg.norm(p)) and (not diag) and (len(ind) == n):
break
if np.linalg.norm(p):
fpred = fmi + np.dot(g.T, p) + np.dot(0.5, np.dot(p.T, np.dot(G, p)))
x = copy.deepcopy(xmin + p)
policy = self.get_policy(np.array(x), L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
alist = [0, 1]
flist = [fmi, f1]
alist, flist, nfls = self.gls(u, v, xmin, p, alist,
flist, nloc, small, smaxls, stopping_actions)
ncall = ncall + nfls
argmin = np.argmin(flist)
fmi = min(flist)
xmin = [xmin[jnx] + alist[argmin] * p[jnx] for jnx in range(len(xmin))]
xmin = np.asarray([max(u[jnx], min(xmin[jnx], v[jnx])) for jnx in range(len(xmin))])
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fmi, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fmi, stop[1])
if not flag:
return xmin, fmi, ncall, flag, nsweep, nsweepbest
gain = f - fmi
if fold == fmi:
r = 0
elif fold == fpred:
r = 0.5
else:
r = (fold - fmi) / (fold - fpred)
if fmi < fold:
fac = abs(1 - 1 / r)
eps0 = max(eps, min(fac * eps0, 0.001))
else:
eps0 = 0.001
else:
gain = f - fmi
if (not gain):
eps0 = 0.001
                    fac = np.inf
r = 0
ind = [inx for inx in range(len(u)) if (u[inx] < xmin[inx] and xmin[inx] < v[inx])]
b = np.dot(np.abs(g).T, [max(abs(xmin[inx]), abs(xold[inx])) for inx in range(len(xmin))])
return xmin, fmi, ncall, flag, nsweep, nsweepbest
    def basket1(self, x: NDArray[np.float64], f: float,
xmin: List[Union[float, List[float], NDArray[np.float64]]], fmi: List[float],
xbest: List[float], fbest: float, stop: List[Union[float, int]], nbasket: int,
nsweep: int, nsweepbest: int, stopping_actions: int, loc: int = 1,
flag: int = 1, ncall: int = 0):
"""
Basket 1
:param x: starting point
:param f: function value(s)
:param xmin: current minimum evaluation argument (position)
        :param fmi: the function values of the points in the 'shopping basket'
:param xbest: current best evaluation argument (position)
:param fbest: current best function value
:param stop: stopping test
:param nbasket: counter for boxes in the 'shopping basket'
:param nsweep: sweep counter
        :param nsweepbest: number of the sweep in which fbest was updated for the last time
:param stopping_actions: number of stopping actions
:return: the metrics and parameters from basket1
"""
if not nbasket:
return xbest, fbest, xmin, fmi, loc, flag, ncall, nsweep, nsweepbest
dist = np.zeros(nbasket + 1)
for k in range(len(dist)):
dist[k] = np.linalg.norm(np.subtract(x, xmin[k]))
ind = np.argsort(dist)
if nbasket == -1:
return xbest, fbest, xmin, fmi, loc, flag, ncall, nsweep, nsweepbest
else:
for k in range(nbasket + 1):
i = ind[k]
p = xmin[i] - x
y1 = x + 1 / 3 * p
policy = self.get_policy(y1, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f1 <= max(fmi[i], f):
y2 = x + 2 / 3 * p
policy = self.get_policy(y2, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f2 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
ncall = ncall + 1
if f2 <= max(f1, fmi[i]):
if f < min(min(f1, f2), fmi[i]):
fmi[i] = f
xmin[i] = copy.deepcopy(x)
if fmi[i] < fbest:
fbest = copy.deepcopy(fmi[i])
xbest = copy.deepcopy(xmin[i])
nsweepbest = nsweep
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return (
xbest, fbest,
policy, avg_metrics, xmin,
fmi, loc, flag, ncall,
nsweep, nsweepbest,
)
loc = 0
break
elif f1 < min(min(f, f2), fmi[i]): # type: ignore[call-overload]
fmi[i] = f1
xmin[i] = copy.deepcopy(y1)
if fmi[i] < fbest:
fbest = copy.deepcopy(fmi[i])
xbest = copy.deepcopy(xmin[i])
nsweepbest = copy.deepcopy(nsweep)
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return (
xbest, fbest,
policy, avg_metrics, xmin,
fmi, loc, flag, ncall,
nsweep, nsweepbest,
)
# end fmi[i] < fbest: elif
loc = 0
break
elif f2 < min(min(f, f1), fmi[i]): # type: ignore[call-overload]
fmi[i] = f2
xmin[i] = copy.deepcopy(y2)
if fmi[i] < fbest:
fbest = copy.deepcopy(fmi[i])
xbest = copy.deepcopy(xmin[i])
nsweepbest = nsweep
if stop[0] > 0 and stop[0] < 1:
flag = MCSUtils().chrelerr(fbest, stop)
elif stop[0] == 0:
flag = MCSUtils().chvtr(fbest, stop[1])
if not flag:
return (
xbest, fbest,
policy, avg_metrics, xmin,
fmi, loc,
flag, ncall,
nsweep, nsweepbest,
)
loc = 0
break
else:
loc = 0
break
return xbest, fbest, policy, avg_metrics, xmin, fmi, loc, flag, ncall, nsweep, nsweepbest
    def csearch(self, x: List[Union[float, int]], f: float, u: List[int], v: List[int], hess: NDArray[np.float64],
stopping_actions: int, eps: float):
"""
Performs the csearch algorithm
:param x: starting point
:param f: function value
:param u: lower initial guess ("Lower corner" in 3D)
:param v: upper initial guess ("upper corner" in 3D)
:param hess: the function Hessian
        :param stopping_actions: the number of stopping actions
        :param eps: parameter value for the golden ratio
        :return: a collection of parameters and metrics after doing the csearch
"""
n = len(x)
x = [min(v[i], max(x[i], u[i])) for i in range(len(x))]
nfcsearch = 0
smaxls = 6
small = 0.1
nloc = 1
hess = np.ones((n, n))
xmin = copy.deepcopy(x)
fmi = copy.deepcopy(f)
xminnew = copy.deepcopy(xmin)
fminew = copy.deepcopy(fmi)
g = np.zeros(n)
x1 = np.zeros(n)
x2 = np.zeros(n)
G = np.zeros((n, n))
for i in range(n):
p = np.zeros(n)
p[i] = 1
if xmin[i]:
delta = eps ** (1 / 3) * abs(xmin[i])
else:
delta = eps ** (1 / 3)
linesearch = True
if xmin[i] <= u[i]:
policy = self.get_policy(xmin + delta * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
nfcsearch = nfcsearch + 1
if f1 >= fmi:
policy = self.get_policy(xmin + 2 * delta * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f2 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
                    nfcsearch = nfcsearch + 1
x1[i] = xmin[i] + delta
x2[i] = xmin[i] + 2 * delta
if f2 >= fmi:
xminnew[i] = xmin[i]
fminew = fmi
else:
xminnew[i] = x2[i]
fminew = copy.deepcopy(f2)
linesearch = False
else:
alist: List[Union[float, int]] = [0, delta]
flist: List[Union[float, int]] = [fmi, f1]
elif xmin[i] >= v[i]:
policy = self.get_policy(xmin - delta * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
nfcsearch = nfcsearch + 1
if f1 >= fmi:
policy = self.get_policy(xmin - 2 * delta * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f2 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
nfcsearch = nfcsearch + 1
x1[i] = xmin[i] - delta
x2[i] = xmin[i] - 2 * delta
if f2 >= fmi:
xminnew[i] = xmin[i]
fminew = fmi
else:
xminnew[i] = x2[i]
fminew = f2
linesearch = False
else:
alist = [0, -delta]
flist = [fmi, f1]
else:
alist = [0]
flist = [fmi]
if linesearch:
alist, flist, nfls = self.gls(u, v, xmin, p, alist, flist, nloc,
small, smaxls, stopping_actions)
nfcsearch = nfcsearch + nfls
j: Union[int, np.int32] = np.argmin(flist)
fminew = min(flist)
if fminew == fmi:
j = [inx for inx in range(len(alist)) if not alist[inx]][0]
ind = [inx for inx in range(len(alist)) if abs(alist[inx] - alist[j]) < delta]
ind1 = [inx for inx in range(len(ind)) if ind[inx] == j]
for inx in ind1:
del ind[inx]
for inx in ind:
del alist[inx]
del flist[inx]
j = np.argmin(flist)
fminew = min(flist)
xminnew[i] = xmin[i] + alist[j]
if i == 0 or not alist[j]:
if j == 0:
x1[i] = xmin[i] + alist[1]
f1 = flist[1]
x2[i] = xmin[i] + alist[2]
f2 = flist[2]
elif j == len(alist) - 1:
x1[i] = xmin[i] + alist[j - 1]
f1 = flist[j - 1]
x2[i] = xmin[i] + alist[j - 2]
f2 = flist[j - 2]
else:
x1[i] = xmin[i] + alist[j - 1]
f1 = flist[j - 1]
x2[i] = xmin[i] + alist[j + 1]
f2 = flist[j + 1]
xmin[i] = xminnew[i]
fmi = copy.deepcopy(fminew)
else:
x1[i] = xminnew[i]
f1 = copy.deepcopy(fminew)
if xmin[i] < x1[i] and j < len(alist) - 1:
x2[i] = xmin[i] + alist[j + 1]
f2 = flist[j + 1]
elif j == 0:
if alist[j + 1]:
x2[i] = xmin[i] + alist[j + 1]
f2 = flist[j + 1]
else:
x2[i] = xmin[i] + alist[j + 2]
f2 = flist[j + 2]
elif alist[j - 1]:
x2[i] = xmin[i] + alist[j - 1]
f2 = flist[j - 1]
else:
x2[i] = xmin[i] + alist[j - 2]
f2 = flist[j - 2]
g[i], G[i, i] = MCSUtils().polint1([xmin[i], x1[i], x2[i]], [fmi, f1, f2])
x = copy.deepcopy(xmin)
k1 = -1
if f1 <= f2:
x[i] = x1[i]
else:
x[i] = x2[i]
for k in range(i):
if hess[i, k]:
q1 = fmi + g[k] * (x1[k] - xmin[k]) + 0.5 * G[k, k] * (x1[k] - xmin[k]) ** 2
q2 = fmi + g[k] * (x2[k] - xmin[k]) + 0.5 * G[k, k] * (x2[k] - xmin[k]) ** 2
if q1 <= q2:
x[k] = x1[k]
else:
x[k] = x2[k]
policy = self.get_policy(np.array(x), L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f12 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
nfcsearch = nfcsearch + 1
G[i, k] = MCSUtils().hessian(i, k, x, xmin, f12, fmi, g, G)
G[k, i] = G[i, k]
if f12 < fminew:
fminew = f12
xminnew = copy.deepcopy(x)
k1 = k
x[k] = xmin[k]
else:
G[i, k] = 0
G[k, i] = 0
if fminew <= fmi:
if x1[i] == xminnew[i]:
x1[i] = xmin[i]
elif x2[i] == xminnew[i]:
x2[i] = xmin[i]
if k1 > -1:
if xminnew[k1] == x1[k1]:
x1[k1] = xmin[k1]
elif xminnew[k1] == x2[k1]:
x2[k1] = xmin[k1]
for k in range(i + 1):
g[k] = g[k] + G[i, k] * (xminnew[i] - xmin[i])
if k1 > -1:
g[k] = g[k] + G[k1, k] * (xminnew[k1] - xmin[k1])
xmin = copy.deepcopy(xminnew)
fmi = copy.deepcopy(fminew)
return xmin, fmi, g, G, nfcsearch
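# Illustrative sketch (not part of the original MCS port): for each earlier
# coordinate k, csearch picks whichever of x1[k] and x2[k] has the smaller
# value under the local quadratic model
# q(a) = fmi + g[k] * (a - xmin[k]) + 0.5 * G[k, k] * (a - xmin[k]) ** 2.
# A hand-checkable instance of that model with hypothetical numbers:
# >>> fmi, gk, Gkk, xmink = 1.0, -2.0, 4.0, 0.0
# >>> q = lambda a: fmi + gk * (a - xmink) + 0.5 * Gkk * (a - xmink) ** 2
# >>> q(0.25), q(1.0)  # the model minimizer is a = 0.5, so q(0.25) < q(1.0)
# (0.625, 1.0)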
[docs] def gls(self, xl: List[int], xu: List[int], x: List[Union[float, int]], p: NDArray[Union[np.int32, np.float64]],
alist: List[Union[float, int]], flist: List[Union[int, float]],
nloc: int, small: Union[float, int], smax: int, stopping_actions: int, prt: int = 2,
short: float = 0.381966, bend: int = 0):
"""
Global line search main function
:param xl: lower bound
:param xu: upper bound
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param nloc: (for local ~= 0) counter of points that have been used as starting points for a local search
:param small: tolerance value
:param smax: search list size
:param stopping_actions: the number of stopping actions
:param prt: print level - unused in this implementation so far
:param short: the golden-section split fraction
:param bend: bending flag for the line search (0 in this implementation)
:return: the search list, the function values, and the number of function evaluations
"""
sinit = len(alist)
# bend = 0
xl, xu, x, p, amin, amax, scale = GLSUtils().lsrange(xl, xu, x, p, prt, bend)
alist, flist, alp, alp1, alp2, falp = self.lsinit(x, p, alist, flist, amin, amax, scale, stopping_actions)
alist, flist, abest, fbest, fmed, up, down, monotone, minima, nmin, unitlen, s = GLSUtils().lssort(alist, flist)
nf = s - sinit
while s < min(5, smax):
if nloc == 1:
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s, alp, fac) = self.lspar(nloc, small, sinit, short, x, p, alist, flist,
amin, amax, alp, abest, fbest, fmed, up, down,
monotone, minima, nmin, unitlen, s, stopping_actions)
if s > 3 and monotone and (abest == amin or abest == amax):
nf = s - sinit
return alist, flist, nf
else:
alist, flist, alp, fac = self.lsnew(nloc, small, sinit, short, x, p, s, alist, flist,
amin, amax, alp, abest, fmed, unitlen, stopping_actions)
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
saturated = 0
if nmin == 1:
if monotone and (abest == amin or abest == amax):
nf = s - sinit
return alist, flist, nf
if s == 5:
(alist, flist, amin, amax, alp, abest, fbest, fmed, up, down,
monotone, minima, nmin, unitlen, s, good, saturated) = self.lsquart(nloc, small, sinit,
short, np.array(x), p, alist,
flist, amin, amax, alp,
abest, fbest, fmed, up,
down, monotone, minima,
nmin, unitlen, s, saturated,
stopping_actions)
(alist, flist, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = self.lsdescent(x, p, alist, flist, alp,
abest, fbest, fmed, up, down,
monotone, minima, nmin, unitlen,
s, stopping_actions)
convex = GLSUtils().lsconvex(alist, flist, nmin, s)
if convex:
nf = s - sinit
return alist, flist, nf
sold = 0
while 1:
(alist, flist, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = self.lsdescent(x, p, alist, flist, alp, abest,
fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s, stopping_actions)
alp, saturated = GLSUtils().lssat(small, alist, flist, alp, amin, amax, s, saturated)
if saturated or s == sold or s >= smax:
break
sold = s
nminold = nmin
if not saturated and nloc > 1:
(alist, flist, amin, amax, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = self.lssep(nloc, small,
sinit, short, x,
p, alist, flist, amin,
amax, alp, abest, fbest,
fmed, up, down, monotone,
minima, nmin, unitlen, s,
stopping_actions)
(alist, flist, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s, saturated) = self.lslocal(nloc, small, sinit, short,
x, p, alist, flist, amin, amax,
alp, abest, fbest, fmed, up,
down, monotone, minima, nmin,
unitlen, s, saturated, stopping_actions)
if nmin > nminold:
saturated = 0
nf = s - sinit
return alist, flist, nf
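# Hedged usage sketch (hypothetical values, mirroring the call made from
# csearch above): the caller seeds alist/flist with the already-known steps
# and gls extends both lists in place, returning the number of new policy
# evaluations, which equals the growth of the list:
# >>> alist, flist = [0, delta], [fmi, f1]
# >>> alist, flist, nfls = agent.gls(u, v, xmin, p, alist, flist, nloc=1,
# ...                                small=0.1, smax=6, stopping_actions=1)
# >>> len(alist) == len(flist) and nfls == len(alist) - 2
# True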
[docs] def lsinit(self, x, p, alist, flist, amin, amax, scale, stopping_actions):
"""
Line search algorithm
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param amin: lower bound on the step
:param amax: upper bound on the step
:param scale: scale of the initial step
:param stopping_actions: number of stopping actions
:return: set of parameters obtained from performing the line search
"""
alp: Union[int, float] = 0
alp1: Union[int, float] = 0
alp2: Union[int, float] = 0
falp: Union[float, int] = 0
if len(alist) == 0:
# evaluate at absolutely smallest point
alp = 0
if amin > 0:
alp = amin
if amax < 0:
alp = amax
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
elif len(alist) == 1:
# evaluate at absolutely smallest point
alp = 0
if amin > 0:
alp = amin
if amax < 0:
alp = amax
if alist[0] != alp:
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
aamin = min(alist)
aamax = max(alist)
# if amin > aamin or amax < aamax:
# sys.exit('GLS Error: non-admissible step in alist') # TODO: investigate this
if aamax - aamin <= scale:
alp1 = max(amin, min(- scale, amax))
alp2 = max(amin, min(+ scale, amax))
alp = np.Inf
if aamin - alp1 >= alp2 - aamax:
alp = alp1
if alp2 - aamax >= aamin - alp1:
alp = alp2
if alp < aamin or alp > aamax:
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
if len(alist) == 1:
sys.exit('GLS Error: lsinit bug: no second point found')
return alist, flist, alp, alp1, alp2, falp
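# Illustrative note (assumed reading of the branches above): with an empty or
# one-element alist, lsinit first evaluates at the feasible step closest to
# zero, i.e. zero clamped into [amin, amax]:
# >>> clamp = lambda amin, amax: amin if amin > 0 else (amax if amax < 0 else 0)
# >>> clamp(0.2, 1.0), clamp(-1.0, -0.3), clamp(-1.0, 1.0)
# (0.2, -0.3, 0)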
[docs] def triple(self, x: List[Union[int, float]], f: float, x1: List[Union[int, float]],
x2: List[Union[int, float]], u: List[int], v: List[int],
hess, G, stopping_actions, setG=False):
"""
The triple function
:param x: starting point
:param f: function value
:param x1: evaluation argument (position)
:param x2: evaluation argument (position)
:param u: lower initial guess ("lower corner" in 3D)
:param v: upper initial guess ("upper corner" in 3D)
:param hess: the Hessian of the function
:param G: the quadratic-model Hessian approximation
:param stopping_actions: number of stopping actions
:param setG: whether to (re)initialize G to a zero matrix
:return: the set of parameters and metrics after performing the triple
"""
nf = 0
n = len(x)
g = np.zeros(n)
nargin = 10  # mimics MATLAB's nargin from the original MCS implementation
if setG:
nargin = 9
G = np.zeros((n, n))
ind = [i for i in range(n) if (u[i] < x[i] and x[i] < v[i])]
ind1 = [i for i in range(n) if (x[i] <= u[i] or x[i] >= v[i])]
for j in range(len(ind1)):
g[ind1[j]] = 0
for k in range(n):
G[ind1[j], k] = 0
G[k, ind1[j]] = 0
if len(ind) <= 1:
xtrip = copy.deepcopy(x)
ftrip = copy.deepcopy(f)
if len(ind) != 0:
for i in ind:
g[i] = 1
G[i, i] = 1
return xtrip, ftrip, g, G, x1, x2, nf
if setG:
G = np.zeros((n, n))
xtrip = copy.deepcopy(x)
ftrip = copy.deepcopy(f)
xtripnew = copy.deepcopy(x)
ftripnew = copy.deepcopy(f)
for j in range(len(ind)):
i = ind[j]
x = copy.deepcopy(xtrip)
f = copy.deepcopy(ftrip)
x[i] = x1[i]
policy = self.get_policy(x, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f1 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
x[i] = x2[i]
policy = self.get_policy(x, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f2 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
nf = nf + 2
g[i], G[i, i] = MCSUtils().polint1([xtrip[i], x1[i], x2[i]], [f, f1, f2])
if f1 <= f2:
if f1 < ftrip:
ftripnew = copy.deepcopy(f1)
xtripnew[i] = x1[i]
else:
if f2 < ftrip:
ftripnew = copy.deepcopy(f2)
xtripnew[i] = x2[i]
if nargin < 10:
k1 = -1
if f1 <= f2:
x[i] = x1[i]
else:
x[i] = x2[i]
for k in range(i):
if hess[i, k]:
if xtrip[k] > u[k] and xtrip[k] < v[k] and \
(len([m for m in range(len(ind)) if ind[m] == k]) != 0):
q1 = ftrip + g[k] * (x1[k] - xtrip[k]) + 0.5 * G[k, k] * (x1[k] - xtrip[k]) ** 2
q2 = ftrip + g[k] * (x2[k] - xtrip[k]) + 0.5 * G[k, k] * (x2[k] - xtrip[k]) ** 2
if q1 <= q2:
x[k] = x1[k]
else:
x[k] = x2[k]
policy = self.get_policy(x, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
f12 = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
nf = nf + 1
G[i, k] = MCSUtils().hessian(i, k, x, xtrip, f12, ftrip, g, G)
G[k, i] = G[i, k]
if f12 < ftripnew:
ftripnew = copy.deepcopy(f12)
xtripnew = copy.deepcopy(x)
k1 = k
x[k] = xtrip[k]
else:
G[i, k] = 0
G[k, i] = 0
if ftripnew < ftrip:
if x1[i] == xtripnew[i]:
x1[i] = xtrip[i]
else:
x2[i] = xtrip[i]
if nargin < 10 and k1 > -1:
if xtripnew[k1] == x1[k1]:
x1[k1] = xtrip[k1]
else:
x2[k1] = xtrip[k1]
for k in range(i + 1):
if (len([m for m in range(len(ind)) if ind[m] == k]) != 0):
g[k] = g[k] + G[i, k] * (xtripnew[i] - xtrip[i])
if nargin < 10 and k1 > -1:
g[k] = g[k] + G[k1, k] * (xtripnew[k1] - xtrip[k1])
xtrip = copy.deepcopy(xtripnew)
ftrip = copy.deepcopy(ftripnew)
return xtrip, ftrip, g, G, x1, x2, nf
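# Hedged sketch (semantics of MCSUtils().polint1 inferred from its use above):
# it fits a quadratic through three points and returns the slope at the first
# node together with the constant second derivative. Reproducing that with
# explicit divided differences for f(x) = x**2 sampled at x = 0, 1, 2:
# >>> x0, xa, xb, f0, fa, fb = 0, 1, 2, 0, 1, 4
# >>> d2 = (fa - f0) / (xa - x0)
# >>> d3 = ((fb - fa) / (xb - xa) - d2) / (xb - x0)
# >>> d2 + d3 * (x0 - xa), 2 * d3  # slope at x0 and second derivative
# (0.0, 2.0)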
[docs] def lspar(self, nloc: int, small: Union[float, int], sinit: int, short: float,
x: Union[List[Union[int, float]], NDArray[np.float64]], p: NDArray[Union[np.float64, np.int32]],
alist: List[Union[float, int]], flist: List[Union[float, int]], amin: float, amax: float,
alp: Union[int, float], abest: float, fbest: float,
fmed: float, up: List[float], down: List[float], monotone: int,
minima: List[int], nmin: int, unitlen: float, s: int, stopping_actions: int):
"""
The lspar function
:param nloc: (for local ~= 0) counter of points that have been used as starting points for a local search
:param small: tolerance value
:param sinit: the initial length of the list of known steps
:param short: the golden-section split fraction
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param amin: lower bound on the step
:param amax: upper bound on the step
:param alp:
:param abest: best step
:param fbest: best function value so far
:param fmed: median function value
:param up:
:param down:
:param monotone: if the function is monotone or not
:param minima:
:param nmin: number of local minima
:param unitlen:
:param s: the current depth level
:param stopping_actions: number of stopping actions
:return: the set of parameters and metrics after performing lspar
"""
cont = 1
fac = short
if s < 3:
alist, flist, alp, fac = self.lsnew(nloc, small, sinit, short, x, p, s,
alist, flist, amin, amax, alp,
abest, fmed, unitlen, stopping_actions)
cont = 0
if cont:
# fmin = min(flist)
i: Union[int, np.int32] = np.argmin(flist)
if i <= 1:
ind = [j for j in range(3)]
ii: Union[int, np.int32] = copy.deepcopy(i)
elif i >= s - 2:
ind = [j for j in range(s - 2 - 1, s)]
ii = i - (s - 1) + 2
else:
ind = [j for j in range(i - 1, i + 2)]
ii = 1
aa = [alist[j] for j in ind]
ff = [flist[j] for j in ind]
f12 = (ff[1] - ff[0]) / (aa[1] - aa[0])
f23 = (ff[2] - ff[1]) / (aa[2] - aa[1])
f123 = (f23 - f12) / (aa[2] - aa[0])
if not (f123 > 0):
alist, flist, alp, fac = self.lsnew(nloc, small, sinit, short, x, p, s, alist,
flist, amin, amax, alp, abest, fmed, unitlen,
stopping_actions)
# alist,flist,abest,fbest,fmed,up,down,monotone,minima,nmin,unitlen,s = GLSUtils().lssort(alist,flist)
cont = 0
if cont:
alp0 = 0.5 * (aa[1] + aa[2] - f23 / f123)
alp = LSUtils().lsguard(alp0, alist, amax, amin, small)
alptol = small * (aa[2] - aa[0])
if f123 == np.Inf or min([abs(i - alp) for i in alist]) <= alptol:
if ii == 0 or (ii == 1 and (aa[1] >= 0.5 * (aa[0] + aa[2]))):
alp = 0.5 * (aa[0] + aa[1])
else:
alp = 0.5 * (aa[1] + aa[2])
# else:
# np_print = alp0
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
return alist, flist, abest, fbest, fmed, up, down, monotone, minima, nmin, unitlen, s, alp, fac
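# Worked example (illustrative only): the step alp0 above is the minimizer of
# the parabola through (aa[0], ff[0]), (aa[1], ff[1]), (aa[2], ff[2]), written
# via divided differences. For f(a) = (a - 1) ** 2 sampled at a = 0, 2, 3:
# >>> aa, ff = [0, 2, 3], [1, 1, 4]
# >>> f12 = (ff[1] - ff[0]) / (aa[1] - aa[0])  # 0.0
# >>> f23 = (ff[2] - ff[1]) / (aa[2] - aa[1])  # 3.0
# >>> f123 = (f23 - f12) / (aa[2] - aa[0])     # 1.0
# >>> 0.5 * (aa[1] + aa[2] - f23 / f123)       # recovers the true minimizer
# 1.0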
[docs] def lsnew(self, nloc: int, small: Union[float, int], sinit: int, short: float,
x: Union[List[Union[int, float]], NDArray[np.float64]], p: NDArray[Union[np.float64, np.int32]],
s: int, alist: List[Union[float, int]], flist: List[Union[float, int]],
amin: float, amax: float, alp: Union[int, float], abest: float, fmed: float,
unitlen: float, stopping_actions: int):
"""
The lsnew function
:param nloc: (for local ~= 0) counter of points that have been used as starting points for a local search
:param small: tolerance value
:param sinit: the initial length of the list of known steps
:param short: the golden-section split fraction
:param x: starting point
:param p: search direction
:param s: current depth level
:param alist: list of known steps
:param flist: function values of known steps
:param amin: lower bound on the step
:param amax: upper bound on the step
:param alp:
:param abest: best step
:param fmed: median function value
:param unitlen:
:param stopping_actions: number of stopping actions
:return: set of parameters and metrics obtained after performing lsnew
"""
if alist[0] <= amin:
leftok = 0
elif flist[0] >= max(fmed, flist[1]):
leftok = (sinit == 1 or nloc > 1)
else:
leftok = 1
if alist[s - 1] >= amax:
rightok = 0
elif flist[s - 1] >= max(fmed, flist[s - 2]):
rightok = (sinit == 1 or nloc > 1)
else:
rightok = 1
if sinit == 1:
step = s - 1
else:
step = 1
fac = short
if leftok and (flist[0] < flist[s - 1] or (not rightok)):
# extra = 1
al = alist[0] - (alist[0 + step] - alist[0]) / small
alp = max(amin, al)
elif rightok:
# extra = 1
au = alist[s - 1] + (alist[s - 1] - alist[s - 1 - step]) / small
alp = min(au, amax)
else:
# extra = 0
lenth = [i - j for i, j in zip(alist[1: s], alist[0: s - 1])]
dist = [max(i, j, k) for i, j, k in zip([i - abest for i in alist[1: s]],
[abest - i for i in alist[0: s - 1]],
(unitlen * np.ones(s - 1)).tolist())]
wid = [lenth[i] / dist[i] for i in range(len(lenth))]
i = np.argmax(wid)
alp, fac = LSUtils().lssplit(int(i), alist, flist, short)
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
return alist, flist, alp, fac
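# Illustrative sketch (hypothetical numbers): in the interior case above,
# lsnew splits the interval that is widest relative to its distance from the
# best step, i.e. the one maximizing wid[i] = lenth[i] / dist[i]:
# >>> alist, abest, unitlen = [0.0, 0.4, 1.0], 0.0, 0.1
# >>> lenth = [0.4, 0.6]
# >>> dist = [max(0.4 - abest, abest - 0.0, unitlen),
# ...         max(1.0 - abest, abest - 0.4, unitlen)]
# >>> [le / d for le, d in zip(lenth, dist)]  # the first interval is split
# [1.0, 0.6]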
[docs] def lsdescent(self, x: Union[List[Union[int, float]], NDArray[np.float64]],
p: NDArray[Union[np.float64, np.int32]], alist: List[Union[float, int]],
flist: List[Union[float, int]], alp: Union[int, float],
abest: float, fbest: float, fmed: float, up: List[float],
down: List[float], monotone: int, minima: List[int],
nmin: int, unitlen: float, s: int, stopping_actions: int):
"""
The lsdescent algorithm
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param alp:
:param abest: best step
:param fbest: best function value so far
:param fmed: median function value
:param up:
:param down:
:param monotone: if the function is monotone or not
:param minima:
:param nmin: number of local minima
:param unitlen:
:param s: the current depth level
:param stopping_actions: number of stopping actions
:return: the set of parameters and metrics obtained from performing lsdescent
"""
cont: Union[bool, int] = max([i == 0 for i in alist])
if cont:
fbest = min(flist)
i = np.argmin(flist)
if alist[i] < 0:
if alist[i] >= 4 * alist[i + 1]:
cont = 0
elif alist[i] > 0:
if alist[i] < 4 * alist[i - 1]:
cont = 0
else:
if i == 0:
fbest = flist[1]
elif i == s - 1:
fbest = flist[s - 2]
else:
fbest = min(flist[i - 1], flist[i + 1])
if cont:
if alist[i] != 0:
alp = alist[i] / 3
elif i == s - 1:
alp = alist[s - 2] / 3
elif i == 0:
alp = alist[1] / 3
else:
if alist[i + 1] - alist[i] > alist[i] - alist[i - 1]:
alp = alist[i + 1] / 3
else:
alp = alist[i - 1] / 3
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
return (alist, flist, alp, abest, fbest, fmed, up, down,
monotone, minima, nmin, unitlen, s)
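# Hedged sketch (assumed reading of the branch above): when a zero step is
# present and descent is still plausible, lsdescent probes one third of the
# relevant neighbouring step, pulling the search back toward the current
# point:
# >>> def descent_trial(alist, i, s):  # hypothetical helper for illustration
# ...     if alist[i] != 0:
# ...         return alist[i] / 3
# ...     if i == s - 1:
# ...         return alist[s - 2] / 3
# ...     if i == 0:
# ...         return alist[1] / 3
# ...     if alist[i + 1] - alist[i] > alist[i] - alist[i - 1]:
# ...         return alist[i + 1] / 3
# ...     return alist[i - 1] / 3
# >>> descent_trial([-0.3, 0.0, 0.9], 1, 3)  # zero best step, wider gap right
# 0.3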
[docs] def lsquart(self, nloc: int, small: Union[float, int], sinit: int, short: float,
x: NDArray[np.float64], p: NDArray[Union[np.float64, np.int32]],
alist: List[Union[float, int]], flist: List[float], amin: float, amax: float,
alp: float, abest: float, fbest: float,
fmed: float, up: List[float], down: List[float],
monotone: int, minima: List[Union[int, float, bool]], nmin: int, unitlen: float, s: int,
saturated: int, stopping_actions: int):
"""
The lsquart function

:param nloc: (for local ~= 0) counter of points that have been used as starting points for a local search
:param small: tolerance value
:param sinit: the initial length of the list of known steps
:param short: the golden-section split fraction
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param amin: lower bound on the step
:param amax: upper bound on the step
:param alp:
:param abest: best step
:param fbest: best function value so far
:param fmed: median function value
:param up:
:param down:
:param monotone: if the function is monotone or not
:param minima:
:param nmin: number of local minima
:param unitlen:
:param s: the current depth level
:param saturated: whether the line search is saturated
:param stopping_actions: the number of stopping actions
:return: the parameters and metrics obtained from performing lsquart
"""
if alist[0] == alist[1]:
f12: Union[int, float] = 0
else:
f12 = (flist[1] - flist[0]) / (alist[1] - alist[0])
if alist[1] == alist[2]:
f23: Union[int, float] = 0
else:
f23 = (flist[2] - flist[1]) / (alist[2] - alist[1])
if alist[2] == alist[3]:
f34: Union[int, float] = 0
else:
f34 = (flist[3] - flist[2]) / (alist[3] - alist[2])
if alist[3] == alist[4]:
f45: Union[int, float] = 0
else:
f45 = (flist[4] - flist[3]) / (alist[4] - alist[3])
f123 = (f23 - f12) / (alist[2] - alist[0])
f234 = (f34 - f23) / (alist[3] - alist[1])
f345 = (f45 - f34) / (alist[4] - alist[2])
f1234 = (f234 - f123) / (alist[3] - alist[0])
f2345 = (f345 - f234) / (alist[4] - alist[1])
f12345 = (f2345 - f1234) / (alist[4] - alist[0])
good = np.Inf
if f12345 <= 0:
good = 0
(alist, flist, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s, saturated) = self.lslocal(nloc, small, sinit,
short, x, p, alist,
flist, amin, amax, alp,
abest, fbest, fmed, up,
down, monotone, minima, nmin,
unitlen, s, saturated, stopping_actions)
quart = 0
else:
quart = 1
if quart:
c = np.zeros(len(alist))
c[0] = f12345
c[1] = f1234 + c[0] * (alist[2] - alist[0])
c[2] = f234 + c[1] * (alist[2] - alist[3])
c[1] = c[1] + c[0] * (alist[2] - alist[3])
c[3] = f23 + c[2] * (alist[2] - alist[1])
c[2] = c[2] + c[1] * (alist[2] - alist[1])
c[1] = c[1] + c[0] * (alist[2] - alist[1])
c[4] = flist[2]
cmax = max(c)
c = np.divide(c, cmax)
hk = 4 * c[0]
compmat = [[0, 0, - c[3]], [hk, 0, - 2 * c[2]], [0, hk, - 3 * c[1]]]
ev = np.divide(np.linalg.eig(compmat)[0], hk)
i = np.where(ev.imag == 0)
if i[0].shape[0] == 1:
alp = alist[2] + ev[i[0][0]]
else:
ev = np.sort(ev)
alp1 = LSUtils().lsguard(alist[2] + ev[0], alist, amax, amin, small)
alp2 = LSUtils().lsguard(alist[2] + ev[2], alist, amax, amin, small)
f1 = cmax * LSUtils().quartic(c, alp1 - alist[2])
f2 = cmax * LSUtils().quartic(c, alp2 - alist[2])
if alp2 > alist[4] and f2 < max(flist):
alp = alp2
elif alp1 < alist[0] and f1 < max(flist):
alp = alp1
elif f2 <= f1:
alp = alp2
else:
alp = alp1
if max([i == alp for i in alist]):
quart = 0
if quart:
alp = LSUtils().lsguard(alp, alist, amax, amin, small)
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
return (alist, flist, amin, amax, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s, good, saturated)
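# Worked check (illustrative): the companion-matrix step above computes the
# critical points of the interpolating quartic, i.e. the roots of its
# derivative 4*c[0]*a**3 + 3*c[1]*a**2 + 2*c[2]*a + c[3]. For the quartic
# (a**2 - 1)**2 with c = [1, 0, -2, 0, 1], the critical points are -1, 0, 1:
# >>> c = [1, 0, -2, 0, 1]
# >>> hk = 4 * c[0]
# >>> compmat = [[0, 0, -c[3]], [hk, 0, -2 * c[2]], [0, hk, -3 * c[1]]]
# >>> sorted(np.round(np.divide(np.linalg.eig(compmat)[0], hk), 6).tolist())
# [-1.0, 0.0, 1.0]  (up to round-off and the sign of zero)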
[docs] def lssep(self, nloc: int, small: float, sinit: int, short: float,
x: Union[List[Union[float, int]], NDArray[np.float64]], p: NDArray[Union[np.float64, np.int32]],
alist: List[Union[float, int]], flist: List[float],
amin: float, amax: float, alp: float, abest: float, fbest: float,
fmed: float, up: List[float], down: List[float], monotone: int,
minima: List[int], nmin: int, unitlen: float,
s: int, stopping_actions: int):
"""
The lssep function
:param nloc: (for local ~= 0) counter of points that have been used as starting points for a local search
:param small: tolerance value
:param sinit: the initial length of the list of known steps
:param short: the golden-section split fraction
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param amin: lower bound on the step
:param amax: upper bound on the step
:param alp:
:param abest: best step
:param fbest: best function value so far
:param fmed: median function value
:param up:
:param down:
:param monotone: if the function is monotone or not
:param minima:
:param nmin: number of local minima
:param unitlen:
:param s: current depth level
:param stopping_actions: the number of stopping actions
:return: the parameters and metrics obtained from performing lssep
"""
nsep = 0
while nsep < nmin:
down = [i < j for i, j in zip(flist[1: s], flist[0: s - 1])]
sep = [i and j and k for i, j, k in zip([True, True] + down, [False] + up + [False], down + [True, True])]
temp_sep = [i and j and k for i, j, k in zip([True, True] + up,
[False] + down + [False], up + [True, True])]
sep = [i or j for i, j in zip(sep, temp_sep)]
ind = [i for i in range(len(sep)) if sep[i]]
if len(ind) == 0:
break
aa = [0.5 * (alist[i] + alist[i - 1]) for i in ind] # interval midpoints
if len(aa) > nloc:
# each candidate interval contributes the smaller of its two endpoint
# function values, mirroring the midpoint construction above
ff: List[Union[int, float]] = [min(flist[i], flist[i - 1]) for i in ind]
ind = list(np.argsort(ff))
ff.sort()
aa = [aa[ind[i]] for i in range(0, nloc)]
for alp in aa:
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
nsep = nsep + 1
if nsep >= nmin:
break
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
for _ in range(nmin - nsep):
alist, flist, alp, fac = self.lsnew(nloc, small, sinit, short, x, p, s, alist,
flist, amin, amax, alp, abest, fmed, unitlen, stopping_actions)
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
return (alist, flist, amin, amax, alp, abest, fbest, fmed,
up, down, monotone, minima, nmin, unitlen, s)
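# Illustrative sketch (hypothetical data): lssep evaluates at midpoints of
# intervals whose endpoints bracket distinct local minima so that lssort can
# tell the minima apart; with steps [0, 1, 2] and separating intervals 1 and
# 2, the candidate midpoints are:
# >>> alist = [0.0, 1.0, 2.0]
# >>> ind = [1, 2]
# >>> [0.5 * (alist[i] + alist[i - 1]) for i in ind]
# [0.5, 1.5]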
[docs] def lslocal(self, nloc: int, small: float, sinit: int, short: float,
x: Union[List[Union[int, float]], NDArray[np.float64]],
p: NDArray[Union[np.float64, np.int32]], alist: List[Union[float, int]],
flist: List[float], amin: float, amax: float, alp: float,
abest: float, fbest: float, fmed: float,
up: List[float], down: List[float], monotone: int,
minima: List[Union[int, float, bool]], nmin: int, unitlen: float, s: int,
saturated: int, stopping_actions: int):
"""
The lslocal function
:param nloc: (for local ~= 0) counter of points that have been used as starting points for a local search
:param small: tolerance value
:param sinit: the initial length of the list of known steps
:param short: the golden-section split fraction
:param x: starting point
:param p: search direction
:param alist: list of known steps
:param flist: function values of known steps
:param amin: lower bound on the step
:param amax: upper bound on the step
:param alp:
:param abest: best step
:param fbest: best function value so far
:param fmed: median function value
:param up:
:param down:
:param monotone: if the function is monotone or not
:param minima:
:param nmin: number of local minima
:param unitlen:
:param s: current depth level
:param saturated: whether the line search is saturated
:param stopping_actions: the number of stopping actions
:return: the parameters and metrics obtained from lslocal
"""
up = [i < j for i, j in zip(flist[0: s - 1], flist[1: s])]
down = [i <= j for i, j in zip(flist[1: s], flist[0: s - 1])]
down[s - 2] = (flist[s - 1] < flist[s - 2])
minima = [i and j for i, j in zip(up + [True], [True] + down)]
imin = [i for i in range(len(minima)) if minima[i]]
ff = [flist[i] for i in imin]
perm = np.argsort(ff)
ff.sort()
imin = [imin[i] for i in perm]
nind = min(nloc, len(imin))
imin = imin[nind - 1:: - 1]
nadd = 0
nsat = 0
for i in imin:
if i <= 1:
ind = [j for j in range(5)]
ii = i
elif i >= s - 2:
ind = [j for j in range(s - 5, s)]
ii = i - (s - 1) + 4
else:
ind = [j for j in range(i - 2, i + 3)]
ii = 2
aa = [alist[i] for i in ind]
ff = [flist[i] for i in ind]
f12 = (ff[1] - ff[0]) / (aa[1] - aa[0])
f23 = (ff[2] - ff[1]) / (aa[2] - aa[1])
f34 = (ff[3] - ff[2]) / (aa[3] - aa[2])
f45 = (ff[4] - ff[3]) / (aa[4] - aa[3])
f123 = (f23 - f12) / (aa[2] - aa[0])
f234 = (f34 - f23) / (aa[3] - aa[1])
f345 = (f45 - f34) / (aa[4] - aa[2])
if ii == 0:
cas = 0
if f123 > 0 and f123 < np.Inf:
alp = 0.5 * (aa[1] + aa[2] - f23 / f123)
if alp < amin:
cas = -1
else:
alp = -np.Inf
if alist[0] == amin and flist[1] < flist[2]:
cas = -1
alp = LSUtils().lsguard(alp, alist, amax, amin, small)
elif ii == 4:
cas = 0
if f345 > 0 and f345 < np.Inf:
alp = 0.5 * (aa[2] + aa[3] - f34 / f345)
if alp > amax:
cas = -1
else:
alp = np.Inf
if alist[s - 1] == amax and flist[s - 2] < flist[s - 3]:
cas = -1
alp = LSUtils().lsguard(alp, alist, amax, amin, small)
elif not (f234 > 0 and f234 < np.Inf):
cas = 0
if ii < 2:
alp = 0.5 * (aa[1] + aa[2] - f23 / f123)
else:
alp = 0.5 * (aa[2] + aa[3] - f34 / f345)
elif not (f123 > 0 and f123 < np.Inf):
if f345 > 0 and f345 < np.Inf:
cas = 5
else:
cas = 0
alp = 0.5 * (aa[2] + aa[3] - f34 / f234)
elif f345 > 0 and f345 < np.Inf and ff[1] > ff[3]:
cas = 5
else:
cas = 1
if cas == 0:
alp = max(amin, min(alp, amax))
elif cas == 1:
if ff[1] < ff[2]:
f13 = (ff[2] - ff[0]) / (aa[2] - aa[0])
f1x4 = (f34 - f13) / (aa[3] - aa[0])
else:
f24 = (ff[3] - ff[1]) / (aa[3] - aa[1])
f1x4 = (f24 - f12) / (aa[3] - aa[0])
alp = 0.5 * (aa[1] + aa[2] - f23 / (f123 + f234 - f1x4))
if alp <= min(aa) or alp >= max(aa):
cas = 0
alp = 0.5 * (aa[1] + aa[2] - f23 / max(f123, f234))
elif cas == 5:
if ff[2] < ff[3]:
f24 = (ff[3] - ff[1]) / (aa[3] - aa[1])
f2x5 = (f45 - f24) / (aa[4] - aa[1])
else:
f35 = (ff[4] - ff[2]) / (aa[4] - aa[2])
f2x5 = (f35 - f23) / (aa[4] - aa[1])
alp = 0.5 * (aa[2] + aa[3] - f34 / (f234 + f345 - f2x5))
if alp <= min(aa) or alp >= max(aa):
cas = 0
alp = 0.5 * (aa[2] + aa[3] - f34 / max(f234, f345))
if cas < 0 or flist[i] > fmed:
alptol: Union[float, int] = 0
elif cas >= 0:
if i == 0:
alptol = small * (alist[2] - alist[0])
elif i == s - 1:
alptol = small * (alist[s - 1] - alist[s - 3])
else:
alptol = small * (alist[i + 1] - alist[i - 1])
close = (min([abs(i - alp) for i in alist]) <= alptol)
if cas < 0 or close:
nsat = nsat + 1
saturated = (nsat == nind)
final = saturated and not max([i == alp for i in alist])
if cas >= 0 and (final or not close):
nadd = nadd + 1
policy = self.get_policy(x + alp * p, L=stopping_actions)
avg_metrics = self.eval_theta(policy=policy,
max_steps=self.experiment_config.hparams[
agents_constants.COMMON.MAX_ENV_STEPS].value)
falp = round(avg_metrics[env_constants.ENV_METRICS.RETURN], 3)
alist.append(alp)
flist.append(falp)
if nadd:
(alist, flist, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s) = GLSUtils().lssort(alist, flist)
return (alist, flist, alp, abest, fbest, fmed, up, down, monotone,
minima, nmin, unitlen, s, saturated)
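# Hedged note (assumed reading of cas == 0 above): accepted interior steps are
# always clamped into the admissible range [amin, amax] before evaluation:
# >>> clamp = lambda alp, amin, amax: max(amin, min(alp, amax))
# >>> clamp(1.7, 0.0, 1.0), clamp(-0.4, 0.0, 1.0), clamp(0.3, 0.0, 1.0)
# (1.0, 0.0, 0.3)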