Source code for csle_common.metastore.metastore_facade

from typing import List, Union, Any, Tuple
import psycopg
import json
import time
import zlib
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.simulation_config.simulation_env_config import SimulationEnvConfig
from csle_common.dao.emulation_config.emulation_trace import EmulationTrace
from csle_common.dao.simulation_config.simulation_trace import SimulationTrace
from csle_common.dao.emulation_config.emulation_simulation_trace import EmulationSimulationTrace
from csle_common.logging.log import Logger
from csle_common.dao.system_identification.emulation_statistics import EmulationStatistics
from csle_common.dao.training.experiment_execution import ExperimentExecution
from csle_common.dao.training.multi_threshold_stopping_policy import MultiThresholdStoppingPolicy
from csle_common.dao.training.linear_threshold_stopping_policy import LinearThresholdStoppingPolicy
from csle_common.dao.jobs.training_job_config import TrainingJobConfig
from csle_common.dao.jobs.data_collection_job_config import DataCollectionJobConfig
from csle_common.dao.jobs.system_identification_job_config import SystemIdentificationJobConfig
from csle_common.dao.system_identification.gaussian_mixture_system_model import GaussianMixtureSystemModel
from csle_common.dao.system_identification.empirical_system_model import EmpiricalSystemModel
from csle_common.dao.system_identification.mcmc_system_model import MCMCSystemModel
from csle_common.dao.system_identification.gp_system_model import GPSystemModel
from csle_common.dao.encoding.np_encoder import NpEncoder
from csle_common.dao.training.ppo_policy import PPOPolicy
from csle_common.dao.training.tabular_policy import TabularPolicy
from csle_common.dao.training.alpha_vectors_policy import AlphaVectorsPolicy
from csle_common.dao.training.dqn_policy import DQNPolicy
from csle_common.dao.training.fnn_with_softmax_policy import FNNWithSoftmaxPolicy
from csle_common.dao.training.vector_policy import VectorPolicy
from csle_common.dao.emulation_config.emulation_execution import EmulationExecution
from csle_common.dao.management.management_user import ManagementUser
from csle_common.dao.management.session_token import SessionToken
from csle_common.dao.datasets.traces_dataset import TracesDataset
from csle_common.dao.datasets.statistics_dataset import StatisticsDataset
from csle_common.dao.emulation_config.config import Config
from csle_common.util.general_util import GeneralUtil


[docs]class MetastoreFacade: """ Facade for the metastore, contains methods for querying the metastore """
[docs] @staticmethod def list_emulations() -> List[EmulationEnvConfig]: """ :return: A list of emulations in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_record_to_dto(x), records))
[docs] @staticmethod def list_emulations_ids() -> List[Tuple[int, str]]: """ :return: A list of emulation ids and names in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,name FROM {constants.METADATA_STORE.EMULATIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def get_emulation_by_name(name: str) -> Union[None, EmulationEnvConfig]: """ Function for extracting the metadata of an emulation with a given name :param name: the name of the emulation :return: The emulation config or None if the emulation was not found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATIONS_TABLE} WHERE name = %s", (name,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_record_to_dto(emulation_record=record) return record
[docs] @staticmethod def get_emulation(id: int) -> Union[None, EmulationEnvConfig]: """ Function for extracting the metadata of an emulation with a id :param id: the id of the emulation :return: The emulation config or None if the emulation was not found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATIONS_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_record_to_dto(emulation_record=record) return record
[docs] @staticmethod def list_simulations() -> List[SimulationEnvConfig]: """ :return: A list of simulations in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_simulation_record_to_dto(x), records))
[docs] @staticmethod def list_simulation_ids() -> List[Tuple[int, str]]: """ :return: A list of simulation ids and names in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,name FROM {constants.METADATA_STORE.SIMULATIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def get_simulation_by_name(name: str) -> Union[None, SimulationEnvConfig]: """ Function for extracting the metadata of a simulation with a given name :param name: the name of the simulation :return: The simulation config or None if the simulation was not found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATIONS_TABLE} WHERE name = %s", (name,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_simulation_record_to_dto(simulation_record=record) return record
[docs] @staticmethod def get_simulation(id: int) -> Union[None, SimulationEnvConfig]: """ Function for extracting the metadata of a simulation with a given id :param id: the id of the simulation :return: The simulation config or None if the simulation was not found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATIONS_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_simulation_record_to_dto(simulation_record=record) return record
@staticmethod def _convert_emulation_record_to_dto(emulation_record) -> EmulationEnvConfig: """ Converts an emulation record fetched from the metastore into a DTO :param emulation_record: the record to convert :return: the DTO representing the record """ emulation_config_json_str = json.dumps(emulation_record[2], indent=4, sort_keys=True, cls=NpEncoder) emulation_env_config: EmulationEnvConfig = EmulationEnvConfig.from_dict(json.loads(emulation_config_json_str)) emulation_env_config.id = emulation_record[0] return emulation_env_config @staticmethod def _convert_simulation_record_to_dto(simulation_record) -> SimulationEnvConfig: """ Converts an simulation record fetched from the metastore into a DTO :param simulation_record: the record to convert :return: the DTO representing the record """ simulation_config_json_str = json.dumps(simulation_record[2], indent=4, sort_keys=True, cls=NpEncoder) simulation_env_config: SimulationEnvConfig = SimulationEnvConfig.from_dict( json.loads(simulation_config_json_str)) simulation_env_config.id = simulation_record[0] return simulation_env_config @staticmethod def _convert_emulation_trace_record_to_dto(emulation_trace_record) -> EmulationTrace: """ Converts an emulation trace record fetched from the metastore into a DTO :param emulation_trace_record: the record to convert :return: the DTO representing the record """ emulation_trace_json_str = zlib.decompress(emulation_trace_record[2]).decode() emulation_trace: EmulationTrace = EmulationTrace.from_dict(json.loads(emulation_trace_json_str)) emulation_trace.id = emulation_trace_record[0] return emulation_trace @staticmethod def _convert_emulation_simulation_trace_record_to_dto(emulation_simulation_trace_record) \ -> EmulationSimulationTrace: """ Converts an emulation-simulkation trace record fetched from the metastore into a DTO :param emulation_simulation_trace_record: the record to convert :return: the DTO representing the record """ id = emulation_simulation_trace_record[0] emulation_trace_id = emulation_simulation_trace_record[1] simulation_trace_id = emulation_simulation_trace_record[2] emulation_trace = MetastoreFacade.get_emulation_trace(id=emulation_trace_id) simulation_trace = MetastoreFacade.get_simulation_trace(id=simulation_trace_id) if emulation_trace is None: raise ValueError(f"Could not find an emulation trace with id: {emulation_trace_id}") if simulation_trace is None: raise ValueError(f"Could not find a simulation trace with id: {simulation_trace_id}") emulation_simulation_trace = EmulationSimulationTrace(emulation_trace=emulation_trace, simulation_trace=simulation_trace) emulation_simulation_trace.id = id return emulation_simulation_trace @staticmethod def _convert_simulation_trace_record_to_dto(simulation_trace_record) -> SimulationTrace: """ Converts an emulation trace record fetched from the metastore into a DTO :param simulation_trace_record: the record to convert :return: the DTO representing the record """ simulation_trace_json_str = json.dumps(simulation_trace_record[2], indent=4, sort_keys=True, cls=NpEncoder) simulation_trace: SimulationTrace = SimulationTrace.from_dict(json.loads(simulation_trace_json_str)) simulation_trace.id = simulation_trace_record[0] return simulation_trace @staticmethod def _convert_emulation_statistics_record_to_dto(emulation_statistics_record) -> EmulationStatistics: """ Converts an emulation statistics record fetched from the metastore into a DTO :param emulation_statistics_record: the record to convert :return: the DTO representing the record """ emulation_statistics_json_str = zlib.decompress(emulation_statistics_record[2]).decode() emulation_statistics: EmulationStatistics = EmulationStatistics.from_dict( json.loads(emulation_statistics_json_str, object_hook=lambda d: {int(k.split(".", 1)[0]) if k.split(".", 1)[0].lstrip('-').isdigit() else k: v for k, v in d.items()})) emulation_statistics.id = emulation_statistics_record[0] return emulation_statistics @staticmethod def _convert_emulation_image_record_to_tuple(emulation_image_record) -> Tuple[str, bytes]: """ Converts an emulation image record fetched from the metastore into bytes :param emulation_image_record: the record to convert :return: a tuple (emulation name, image bytes) """ emulation_name = emulation_image_record[1] image_bytes = emulation_image_record[2] return emulation_name, image_bytes @staticmethod def _convert_simulation_image_record_to_tuple(simulation_image_record) -> Tuple[str, bytes]: """ Converts a simulation image record fetched from the metastore into bytes :param simulation_image_record: the record to convert :return: a tuple (emulation name, image bytes) """ emulation_name = simulation_image_record[1] image_bytes = simulation_image_record[2] return emulation_name, image_bytes
[docs] @staticmethod def install_emulation(config: EmulationEnvConfig) -> Union[int, None]: """ Installs the emulation configuration in the metastore :param config: the config to install :return: id of the created row or None if the installation failed """ Logger.__call__().get_logger().debug(f"Installing emulation:{config.name} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: try: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.EMULATIONS_TABLE) config_json_str = json.dumps(config.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.EMULATIONS_TABLE} (id, name, config) " f"VALUES (%s, %s, %s) RETURNING id", (id, config.name, config_json_str)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Emulation {config.name} installed successfully") return id_of_new_row except psycopg.errors.UniqueViolation as e: Logger.__call__().get_logger().debug(f"Emulation {config.name} is already installed " f"{str(e), repr(e)}") return None
[docs] @staticmethod def uninstall_emulation(config: EmulationEnvConfig) -> None: """ Uninstalls the emulation configuration in the metastore :param config: the config to uninstall :return: None """ Logger.__call__().get_logger().debug(f"Uninstalling emulation:{config.name} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EMULATIONS_TABLE} WHERE name = %s", (config.name,)) conn.commit() Logger.__call__().get_logger().debug(f"Emulation {config.name} uninstalled successfully") return None
[docs] @staticmethod def install_simulation(config: SimulationEnvConfig) -> Union[None, int]: """ Installs the simulation configuration in the metastore :param config: the config to install :return: id of the created row """ Logger.__call__().get_logger().debug(f"Installing simulation:{config.name} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: try: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.SIMULATIONS_TABLE) config_json_str = config.to_json_str() cur.execute(f"INSERT INTO {constants.METADATA_STORE.SIMULATIONS_TABLE} " f"(id, name, config) " f"VALUES (%s, %s, %s) RETURNING id", (id, config.name, config_json_str)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Simulation {config.name} installed successfully") return id_of_new_row except psycopg.errors.UniqueViolation as e: Logger.__call__().get_logger().debug(f"Simulation {config.name} is already installed, " f"{str(e), repr(e)}") return None
[docs] @staticmethod def uninstall_simulation(config: SimulationEnvConfig) -> None: """ Uninstalls the simulation configuration in the metastore :param config: the config to uninstall :return: None """ Logger.__call__().get_logger().debug(f"Uninstalling simulation:{config.name} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.SIMULATIONS_TABLE} WHERE name = %s", (config.name,)) conn.commit() Logger.__call__().get_logger().debug(f"Simulation {config.name} uninstalled successfully")
[docs] @staticmethod def save_emulation_trace(emulation_trace: EmulationTrace) -> Union[Any, int]: """ Saves a trace from the emulation :param emulation_trace: the emulation trace to save :return: id of the created record """ Logger.__call__().get_logger().debug(f"Installing emulation trace for " f"emulation:{emulation_trace.emulation_name} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.EMULATION_TRACES_TABLE) config_json_str = json.dumps(emulation_trace.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) # Need to compress due to postgres size limits compressed_json_str = zlib.compress(config_json_str.encode()) cur.execute(f"INSERT INTO {constants.METADATA_STORE.EMULATION_TRACES_TABLE} " f"(id, emulation_name, trace) " f"VALUES (%s, %s, %s) RETURNING id", (id, emulation_trace.emulation_name, compressed_json_str)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Emulation trace for " f"emulation {emulation_trace.emulation_name} saved successfully") return id_of_new_row
[docs] @staticmethod def save_emulation_statistic(emulation_statistics: EmulationStatistics) -> Union[Any, int]: """ Saves a DTO with emulation statistics to the metastore :param emulation_statistics: the emulation statistics to save :return: id of the created record """ Logger.__call__().get_logger().debug(f"Installing statistics " f"for emulation:{emulation_statistics.emulation_name} in the metastore") emulation_statistics.compute_descriptive_statistics_and_distributions() with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.EMULATION_STATISTICS_TABLE) config_json_str = json.dumps(emulation_statistics.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) # Need to compress due to postgres size limits compressed_json_str = zlib.compress(config_json_str.encode()) cur.execute(f"INSERT INTO " f"{constants.METADATA_STORE.EMULATION_STATISTICS_TABLE} " f"(id, emulation_name, statistics) " f"VALUES (%s, %s, %s) RETURNING id", (id, emulation_statistics.emulation_name, compressed_json_str)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Statistics for emulation " f"{emulation_statistics.emulation_name} saved successfully") return id_of_new_row
[docs] @staticmethod def update_emulation_statistic(emulation_statistics: EmulationStatistics, id: int) -> None: """ Updates a row with emulation statistic in the metastore :param emulation_statistics: the emulation statistics to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Installing statistics " f"for emulation:{emulation_statistics.emulation_name} in the metastore") emulation_statistics.compute_descriptive_statistics_and_distributions() with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(emulation_statistics.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) compressed_json_str = zlib.compress(config_json_str.encode()) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.EMULATION_STATISTICS_TABLE} " f" SET statistics=%s " f"WHERE {constants.METADATA_STORE.EMULATION_STATISTICS_TABLE}.id = %s", (compressed_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Statistics for emulation " f"{emulation_statistics.emulation_name} with id {id} " f"updated successfully")
[docs] @staticmethod def list_emulation_statistics() -> List[EmulationStatistics]: """ :return: A list of emulation statistics in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_STATISTICS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_statistics_record_to_dto(x), records))
[docs] @staticmethod def list_emulation_statistics_ids() -> List[Tuple[int, str]]: """ :return: A list of emulation statistics ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name FROM {constants.METADATA_STORE.EMULATION_STATISTICS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def list_emulation_traces() -> List[EmulationTrace]: """ :return: A list of emulation traces in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_TRACES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_trace_record_to_dto(x), records))
[docs] @staticmethod def list_emulation_traces_ids() -> List[Tuple[int, str]]: """ :return: A list of emulation traces ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name FROM {constants.METADATA_STORE.EMULATION_TRACES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def list_emulation_simulation_traces_ids() -> List[int]: """ :return: A list of emulation-simulation traces ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id FROM {constants.METADATA_STORE.EMULATION_SIMULATION_TRACES_TABLE}") records = cur.fetchall() return list(map(lambda x: int(x[0]), records))
[docs] @staticmethod def list_simulation_traces_ids() -> List[Tuple[int, str]]: """ :return: A list of simulation traces ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,gym_env FROM {constants.METADATA_STORE.SIMULATION_TRACES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def list_simulation_traces() -> List[SimulationTrace]: """ :return: A list of simulation traces in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATION_TRACES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_simulation_trace_record_to_dto(x), records))
[docs] @staticmethod def remove_simulation_trace(simulation_trace: SimulationTrace) -> None: """ Removes a simulation trace from the metastore :param simulation_trace: the simulation trace to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing simulation traec with " f"id:{simulation_trace.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.SIMULATION_TRACES_TABLE} WHERE id = %s", (simulation_trace.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Simulation trace " f"with id {simulation_trace.id} deleted successfully")
[docs] @staticmethod def get_emulation_trace(id: int) -> Union[None, EmulationTrace]: """ Function for fetching an emulation trace with a given id from the metastore :param id: the id of the emulation trace :return: The emulation trace or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_TRACES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_trace_record_to_dto(emulation_trace_record=record) return record
[docs] @staticmethod def remove_emulation_trace(emulation_trace: EmulationTrace) -> None: """ Removes an emulation trace from the metastore :param emulation_trace: the emulation trace to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing simulation traec with " f"id:{emulation_trace.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EMULATION_TRACES_TABLE} WHERE id = %s", (emulation_trace.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Emulation trace " f"with id {emulation_trace.id} deleted successfully")
[docs] @staticmethod def remove_emulation_simulation_trace(emulation_simulation_trace: EmulationSimulationTrace) -> None: """ Removes an emulation-simulation trace from the metastore :param emulation_simulation_trace: the emulation-simulation trace to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing an emulation-simulation trace with " f"id:{emulation_simulation_trace.id} " f"from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EMULATION_SIMULATION_TRACES_TABLE} WHERE id = %s", (emulation_simulation_trace.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Emulation-Simulation trace " f"with id {emulation_simulation_trace.id} deleted successfully")
[docs] @staticmethod def get_emulation_statistic(id: int) -> Union[None, EmulationStatistics]: """ Function for fetching an emulation satistic with a given id from the metastore :param id: the id of the statistics :return: The emulation statistic or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_STATISTICS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_statistics_record_to_dto( emulation_statistics_record=record) return record
[docs] @staticmethod def remove_emulation_statistic(emulation_statistic: EmulationStatistics) -> None: """ Removes an emulation statistic from the metastore :param emulation_statistic: the emulation statistic to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing emulation statistic with " f"id:{emulation_statistic.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EMULATION_STATISTICS_TABLE} WHERE id = %s", (emulation_statistic.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Emulation statistic " f"with id {emulation_statistic.id} deleted successfully")
[docs] @staticmethod def get_simulation_trace(id: int) -> Union[None, SimulationTrace]: """ Function for fetching a simulation trace with a given id from the metastore :param id: the id of the simulation trace :return: The simulation trace or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATION_TRACES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_simulation_trace_record_to_dto(simulation_trace_record=record) return record
[docs] @staticmethod def save_simulation_trace(simulation_trace: SimulationTrace) -> Union[Any, int]: """ Saves a trace from the simulation :param simulation_trace: the simulation trace to save :return: id of the created record """ Logger.__call__().get_logger().debug(f"Installing simulation trace " f"for simulation env:{simulation_trace.simulation_env} " f"in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.SIMULATION_TRACES_TABLE) config_json_str = json.dumps(simulation_trace.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.SIMULATION_TRACES_TABLE} (id, gym_env, trace) " f"VALUES (%s, %s, %s) RETURNING id", (id, simulation_trace.simulation_env, config_json_str)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Simulation trace for " f"env {simulation_trace.simulation_env} " f"saved successfully") return id_of_new_row
[docs] @staticmethod def save_emulation_simulation_trace(emulation_simulation_trace: EmulationSimulationTrace) -> Union[Any, int]: """ Saves an emulation_simulation trace :param emulation_simulation_trace: the emulation trace to save :return: id of the new row """ Logger.__call__().get_logger().debug(f"Installing emulation-simulation trace for " f"emulation:{emulation_simulation_trace.emulation_trace.emulation_name} " f"and simulation environment: " f"{emulation_simulation_trace.simulation_trace.simulation_env} in the " f"metastore") emulation_trace_id = MetastoreFacade.save_emulation_trace( emulation_trace=emulation_simulation_trace.emulation_trace) simulation_trace_id = MetastoreFacade.save_simulation_trace( simulation_trace=emulation_simulation_trace.simulation_trace) with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.EMULATION_SIMULATION_TRACES_TABLE) cur.execute(f"INSERT INTO " f"{constants.METADATA_STORE.EMULATION_SIMULATION_TRACES_TABLE} " f"(id, emulation_trace, simulation_trace) " f"VALUES (%s, %s, %s) RETURNING id", (id, emulation_trace_id, simulation_trace_id)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug( f"Emulation-Simulation trace for " f"emulation {emulation_simulation_trace.emulation_trace.emulation_name} " f"and simulation: {emulation_simulation_trace.simulation_trace.simulation_env} saved successfully") return id_of_new_row
[docs] @staticmethod def list_emulation_simulation_traces() -> List[EmulationSimulationTrace]: """ :return: A list of emulation-simulation traces in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_SIMULATION_TRACES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_simulation_trace_record_to_dto(x), records))
[docs] @staticmethod def get_emulation_simulation_trace(id: int) -> Union[None, EmulationSimulationTrace]: """ Function for fetching a simulation trace with a given id from the metastore :param id: the id of the emulation-simulation trace :return: The emulation-simulation trace or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_SIMULATION_TRACES_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_simulation_trace_record_to_dto( emulation_simulation_trace_record=record) return record
[docs] @staticmethod def save_emulation_image(img: bytes, emulation_name: str) -> Any: """ Saves the image of an emulation in the metastore :param img: the image data to save :param emulation_name: the name of the emulation :return: id of the created row """ Logger.__call__().get_logger().debug(f"Saving image for emulation:{emulation_name} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: try: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.EMULATION_IMAGES_TABLE) cur.execute(f"INSERT INTO {constants.METADATA_STORE.EMULATION_IMAGES_TABLE} " f"(id, emulation_name, image) VALUES (%s, %s, %s) RETURNING id", (id, emulation_name, img)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Saved image for emulation {emulation_name} successfully") return id_of_new_row except Exception as e: Logger.__call__().get_logger().warning(f"There was an error saving an image " f"for emulation {emulation_name}, {str(e), repr(e)}")
[docs] @staticmethod def list_emulation_images() -> List[Tuple[str, bytes]]: """ :return: A list of emulation names and images in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_IMAGES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_image_record_to_tuple(x), records))
[docs] @staticmethod def get_emulation_image(emulation_name: str) -> Union[None, Tuple[str, bytes]]: """ Function for fetching the image of a given emulation :param emulation_name: the name of the emulatin to fetch the image for :return: The simulation trace or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_IMAGES_TABLE} " f"WHERE emulation_name = %s", (emulation_name,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_image_record_to_tuple(emulation_image_record=record) return record
[docs] @staticmethod def delete_all(table: str) -> None: """ Deletes all rows in the metastore from a given table :param table: the table to delete from :return: None """ Logger.__call__().get_logger().debug(f"Deleting all traces from table " f"{table}") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {table}") conn.commit() Logger.__call__().get_logger().debug(f"All traces deleted from table " f"{table} successfully")
[docs] @staticmethod def save_simulation_image(img: bytes, simulation_name: str) -> Union[Any, int]: """ Saves the image of a simulation in the metastore :param img: the image data to save :param simulation_name: the name of the simulation :return: id of the created row """ Logger.__call__().get_logger().debug(f"Saving image for simulation:{simulation_name} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: try: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.SIMULATION_IMAGES_TABLE) cur.execute(f"INSERT INTO {constants.METADATA_STORE.SIMULATION_IMAGES_TABLE} " f"(id, simulation_name, image) VALUES (%s, %s, %s) RETURNING id", (id, simulation_name, img)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Saved image for simulation {simulation_name} successfully") return id_of_new_row except Exception as e: Logger.__call__().get_logger().warning(f"There was an error saving an image " f"for simulation {simulation_name}, {str(e), repr(e)}") return None
[docs] @staticmethod def list_simulation_images() -> List[Tuple[str, bytes]]: """ :return: A list of simulation names and images in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATION_IMAGES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_simulation_image_record_to_tuple(x), records))
[docs] @staticmethod def get_simulation_image(simulation_name: str) -> Union[None, Tuple[str, bytes]]: """ Function for fetching the image of a given simulation :param simulation_name: the name of the simulation to fetch the image for :return: The simulation trace or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SIMULATION_IMAGES_TABLE} " f"WHERE simulation_name = %s", (simulation_name,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_simulation_image_record_to_tuple(simulation_image_record=record) return record
[docs] @staticmethod def save_experiment_execution(experiment_execution: ExperimentExecution) -> Union[Any, int]: """ Saves an experiment execution to the metastore :param experiment_execution: the experiment execution to save :return: id of the created record """ Logger.__call__().get_logger().debug(f"Installing experiment exection for emulation: " f"{experiment_execution.emulation_name} " f"and simulation: {experiment_execution.simulation_name} " f"in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE) config_json_str = json.dumps(experiment_execution.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE} " f"(id, execution, simulation_name, emulation_name) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, config_json_str, experiment_execution.simulation_name, experiment_execution.emulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug(f"Experiment execution for " f"emulation {experiment_execution.emulation_name} " f"and simulation {experiment_execution.simulation_name} " f"saved successfully") return id_of_new_row
[docs] @staticmethod def list_experiment_executions() -> List[ExperimentExecution]: """ :return: A list of emulation traces in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_experiment_execution_record_to_dto(x), records))
[docs] @staticmethod def list_experiment_executions_ids() -> List[Tuple[int, str, str]]: """ :return: A list of experiment execution ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name,emulation_name FROM " f"{constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), str(x[2])), records))
@staticmethod def _convert_experiment_execution_record_to_dto(experiment_execution_record) -> ExperimentExecution: """ Converts an experiment execution record fetched from the metastore into a DTO :param experiment_execution_record: the record to convert :return: the DTO representing the record """ experiment_execution_json = json.dumps(experiment_execution_record[1], indent=4, sort_keys=True, cls=NpEncoder) experiment_execution: ExperimentExecution = ExperimentExecution.from_dict(json.loads(experiment_execution_json)) experiment_execution.id = experiment_execution_record[0] return experiment_execution
[docs] @staticmethod def get_experiment_execution(id: int) -> Union[None, ExperimentExecution]: """ Function for fetching an experiment execution with a given id from the metastore :param id: the id of the emulation trace :return: The emulation trace or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_experiment_execution_record_to_dto( experiment_execution_record=record) return record
[docs] @staticmethod def remove_experiment_execution(experiment_execution: ExperimentExecution) -> None: """ Removes an experiment execution from the metastore :param experiment_execution: the experiment execution to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing experiment execution with " f"id:{experiment_execution.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE} WHERE id = %s", (experiment_execution.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Experiment execution " f"with id {experiment_execution.id} deleted successfully")
[docs] @staticmethod def list_multi_threshold_stopping_policies() -> List[MultiThresholdStoppingPolicy]: """ :return: A list of Multi-threshold stopping policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MULTI_THRESHOLD_STOPPING_POLICIES_TABLE}") records = cur.fetchall() return list(map( lambda x: MetastoreFacade._convert_multi_threshold_stopping_policy_record_to_dto(x), records))
[docs] @staticmethod def list_multi_threshold_stopping_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of Multi-threshold stopping policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM " f"{constants.METADATA_STORE.MULTI_THRESHOLD_STOPPING_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_multi_threshold_stopping_policy_record_to_dto(multi_threshold_stopping_policy_record) \ -> MultiThresholdStoppingPolicy: """ Converts a Multi-threshold stopping policy record fetched from the metastore into a DTO :param multi_threshold_stopping_policy_record: the record to convert :return: the DTO representing the record """ multi_threshold_stopping_policy_json = json.dumps(multi_threshold_stopping_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) multi_threshold_stopping_policy: MultiThresholdStoppingPolicy = MultiThresholdStoppingPolicy.from_dict( json.loads(multi_threshold_stopping_policy_json)) multi_threshold_stopping_policy.id = multi_threshold_stopping_policy_record[0] return multi_threshold_stopping_policy
[docs] @staticmethod def get_multi_threshold_stopping_policy(id: int) -> Union[None, MultiThresholdStoppingPolicy]: """ Function for fetching a mult-threshold policy with a given id from the metastore :param id: the id of the multi-threshold policy :return: The mult-threshold policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MULTI_THRESHOLD_STOPPING_POLICIES_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_multi_threshold_stopping_policy_record_to_dto( multi_threshold_stopping_policy_record=record) return record
[docs] @staticmethod def remove_multi_threshold_stopping_policy(multi_threshold_stopping_policy: MultiThresholdStoppingPolicy) -> None: """ Removes a multi-threshold stopping policy from the metastore :param multi_threshold_stopping_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing Multi-threshold stopping policy with " f"id:{multi_threshold_stopping_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.MULTI_THRESHOLD_STOPPING_POLICIES_TABLE} " f"WHERE id = %s", (multi_threshold_stopping_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Multi-threshold stopping policy " f"with id {multi_threshold_stopping_policy.id} " f"deleted successfully")
[docs] @staticmethod def save_multi_threshold_stopping_policy(multi_threshold_stopping_policy: MultiThresholdStoppingPolicy) \ -> Union[Any, int]: """ Saves a multi-threshold stopping policy to the metastore :param multi_threshold_stopping_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing a multi-threshold stopping policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.MULTI_THRESHOLD_STOPPING_POLICIES_TABLE) policy_json_str = json.dumps(multi_threshold_stopping_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.MULTI_THRESHOLD_STOPPING_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, multi_threshold_stopping_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("Multi-threshold policy saved successfully") return id_of_new_row
@staticmethod def _convert_training_job_record_to_dto(training_job_record) -> TrainingJobConfig: """ Converts a training job record fetched from the metastore into a DTO :param training_job_record: the record to convert :return: the DTO representing the record """ tranining_job_config_json = json.dumps(training_job_record[1], indent=4, sort_keys=True, cls=NpEncoder) training_job_config: TrainingJobConfig = TrainingJobConfig.from_dict(json.loads(tranining_job_config_json)) training_job_config.id = training_job_record[0] return training_job_config
[docs] @staticmethod def list_training_jobs() -> List[TrainingJobConfig]: """ :return: A list of training jobs in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TRAINING_JOBS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_training_job_record_to_dto(x), records))
[docs] @staticmethod def list_training_jobs_ids() -> List[Tuple[int, str, str, int]]: """ :return: A list of training job ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name,emulation_name,pid FROM " f"{constants.METADATA_STORE.TRAINING_JOBS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), str(x[2]), int(x[3])), records))
[docs] @staticmethod def get_training_job_config(id: int) -> Union[None, TrainingJobConfig]: """ Function for fetching a training job config with a given id from the metastore :param id: the id of the training job config :return: The trainign job config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TRAINING_JOBS_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_training_job_record_to_dto(training_job_record=record) return record
[docs] @staticmethod def save_training_job(training_job: TrainingJobConfig) -> Union[Any, int]: """ Saves a training job to the metastore :param training_job: the training job to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a training job in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.TRAINING_JOBS_TABLE) training_job_str = json.dumps(training_job.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) emulation_name: Union[str, None] = training_job.emulation_env_name if emulation_name == "": emulation_name = None cur.execute(f"INSERT INTO {constants.METADATA_STORE.TRAINING_JOBS_TABLE} " f"(id, config, simulation_name, emulation_name, pid) " f"VALUES (%s, %s, %s, %s, %s) RETURNING id", (id, training_job_str, training_job.simulation_env_name, emulation_name, training_job.pid)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("Training job saved successfully") return id_of_new_row
@staticmethod def _convert_data_collection_job_record_to_dto(data_collection_job_record) -> \ DataCollectionJobConfig: """ Converts a data collection job record fetched from the metastore into a DTO :param data_collection_job_record: the record to convert :return: the DTO representing the record """ data_collection_job_config_json = json.dumps(data_collection_job_record[1], indent=4, sort_keys=True, cls=NpEncoder) data_collection_job_config: DataCollectionJobConfig = \ DataCollectionJobConfig.from_dict(json.loads(data_collection_job_config_json)) data_collection_job_config.id = data_collection_job_record[0] return data_collection_job_config
[docs] @staticmethod def list_data_collection_jobs() -> List[DataCollectionJobConfig]: """ :return: A list of data collection jobs in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_data_collection_job_record_to_dto(x), records))
[docs] @staticmethod def list_data_collection_jobs_ids() -> List[Tuple[int, str, int]]: """ :return: A list of data collection job ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name,pid FROM {constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), int(x[2])), records))
[docs] @staticmethod def get_data_collection_job_config(id: int) -> Union[None, DataCollectionJobConfig]: """ Function for fetching a data collection job config with a given id from the metastore :param id: the id of the data collection job config :return: The data collection job config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_data_collection_job_record_to_dto( data_collection_job_record=record) return record
[docs] @staticmethod def save_data_collection_job(data_collection_job: DataCollectionJobConfig) -> Union[Any, int]: """ Saves a data collection job to the metastore :param data_collection_job: the data collection job to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a data collection job in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE) data_collection_job_json = json.dumps(data_collection_job.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE} " f"(id, config, emulation_name, pid) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, data_collection_job_json, data_collection_job.emulation_env_name, data_collection_job.pid)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("Data collection job saved successfully") return id_of_new_row
[docs] @staticmethod def update_training_job(training_job: TrainingJobConfig, id: int) -> None: """ Updates a training job in the metastore :param training_job: the training job to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating training job with id: {id} in the metastore") for i in range(constants.METADATA_STORE.NUM_RETRIES_UPDATE_TRAINING_JOB): try: with psycopg.connect( f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(training_job.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.TRAINING_JOBS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.TRAINING_JOBS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Training job with id: {id} updated successfully") break except Exception: pass
[docs] @staticmethod def update_experiment_execution(experiment_execution: ExperimentExecution, id: int) -> None: """ Updates an experiment execution in the metastore :param experiment_execution: the experiment execution to update :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating experiment execution with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(experiment_execution.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE} " f" SET execution=%s " f"WHERE {constants.METADATA_STORE.EXPERIMENT_EXECUTIONS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Experiment execution with id: {id} updated successfully")
[docs] @staticmethod def update_data_collection_job(data_collection_job: DataCollectionJobConfig, id: int) -> None: """ Updates a data collection job in the metastore :param training_job: the training job to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating data collection job with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(data_collection_job.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Data collection job with id: {id} updated successfully")
[docs] @staticmethod def remove_training_job(training_job: TrainingJobConfig) -> None: """ Removes a training job from the metastore :param config: the config to uninstall :return: None """ Logger.__call__().get_logger().debug(f"Removing training job with id:{training_job.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.TRAINING_JOBS_TABLE} WHERE id = %s", (training_job.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Training job with id {training_job.id} deleted successfully")
[docs] @staticmethod def remove_data_collection_job(data_collection_job: DataCollectionJobConfig) -> None: """ Removes a data collection job from the metastore :param config: the config to uninstall :return: None """ Logger.__call__().get_logger().debug(f"Removing data collection job with " f"id:{data_collection_job.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.DATA_COLLECTION_JOBS_TABLE} WHERE id = %s", (data_collection_job.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Training job with " f"id {data_collection_job.id} deleted successfully")
[docs] @staticmethod def list_ppo_policies() -> List[PPOPolicy]: """ :return: A list of PPO policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.PPO_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_ppo_policy_record_to_dto(x), records))
[docs] @staticmethod def list_ppo_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of PPO policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM {constants.METADATA_STORE.PPO_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_ppo_policy_record_to_dto(ppo_policy_record) -> PPOPolicy: """ Converts a PPO policy record fetched from the metastore into a DTO :param ppo_policy_record: the record to convert :return: the DTO representing the record """ ppo_policy_json = json.dumps(ppo_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) ppo_policy: PPOPolicy = PPOPolicy.from_dict(json.loads(ppo_policy_json)) ppo_policy.id = ppo_policy_record[0] return ppo_policy
[docs] @staticmethod def get_ppo_policy(id: int) -> Union[None, PPOPolicy]: """ Function for fetching a PPO policy with a given id from the metastore :param id: the id of the PPO policy :return: The PPO policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.PPO_POLICIES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_ppo_policy_record_to_dto(ppo_policy_record=record) return record
[docs] @staticmethod def remove_ppo_policy(ppo_policy: PPOPolicy) -> None: """ Removes a PPO policy from the metastore :param ppo_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing ppo policy with " f"id:{ppo_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.PPO_POLICIES_TABLE} WHERE id = %s", (ppo_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"PPO policy " f"with id {ppo_policy.id} deleted successfully")
[docs] @staticmethod def save_ppo_policy(ppo_policy: PPOPolicy) -> Union[Any, int]: """ Saves a PPO policy to the metastore :param ppo_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing PPO policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id(cur=cur, table_name=constants.METADATA_STORE.PPO_POLICIES_TABLE) policy_json_str = json.dumps(ppo_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.PPO_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, ppo_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("PPO policy saved successfully") return id_of_new_row
@staticmethod def _convert_system_identification_job_record_to_dto(system_identification_job_record) -> \ SystemIdentificationJobConfig: """ Converts a system identification job record fetched from the metastore into a DTO :param system_identification_job_record: the record to convert :return: the DTO representing the record """ system_identification_job_config_json = json.dumps(system_identification_job_record[1], indent=4, sort_keys=True, cls=NpEncoder) system_identification_job_config: SystemIdentificationJobConfig = \ SystemIdentificationJobConfig.from_dict(json.loads(system_identification_job_config_json)) system_identification_job_config.id = system_identification_job_record[0] return system_identification_job_config
[docs] @staticmethod def list_system_identification_jobs() -> List[SystemIdentificationJobConfig]: """ :return: A list of system identification jobs in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_system_identification_job_record_to_dto(x), records))
[docs] @staticmethod def list_system_identification_jobs_ids() -> List[Tuple[int, str, int]]: """ :return: A list of system identification job ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name,pid FROM " f"{constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), int(x[2])), records))
[docs] @staticmethod def get_system_identification_job_config(id: int) -> Union[None, SystemIdentificationJobConfig]: """ Function for fetching a system identification job config with a given id from the metastore :param id: the id of the system identification job config :return: The system identification job config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM " f"{constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_system_identification_job_record_to_dto( system_identification_job_record=record) return record
[docs] @staticmethod def save_system_identification_job(system_identification_job: SystemIdentificationJobConfig) -> Union[Any, int]: """ Saves a system_identification job to the metastore :param system_identification_job: the system identification job to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a system identification job in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE) system_identification_job_json = json.dumps(system_identification_job.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE} " f"(id, config, emulation_name, pid) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, system_identification_job_json, system_identification_job.emulation_env_name, system_identification_job.pid)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("System identification job saved successfully") return id_of_new_row
[docs] @staticmethod def update_system_identification_job(system_identification_job: SystemIdentificationJobConfig, id: int) -> None: """ Updates a system identification job in the metastore :param system_identification_job: the system identification job to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating system identification job with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(system_identification_job.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"System identification job with id: {id} updated successfully")
[docs] @staticmethod def remove_system_identification_job(system_identification_job: SystemIdentificationJobConfig) -> None: """ Removes a system identification job from the metastore :param system_identification_job: the job to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing system identification job with " f"id:{system_identification_job.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.SYSTEM_IDENTIFICATION_JOBS_TABLE} WHERE id = %s", (system_identification_job.id,)) conn.commit() Logger.__call__().get_logger().debug(f"System identification job with " f"id {system_identification_job.id} deleted successfully")
@staticmethod def _convert_gaussian_mixture_system_model_record_to_dto(gaussian_mixture_system_model_record) -> \ GaussianMixtureSystemModel: """ Converts a gaussian mixture system model record fetched from the metastore into a DTO :param gaussian_mixture_system_model_record: the record to convert :return: the DTO representing the record """ gaussian_mixture_system_model_config_json = json.dumps(gaussian_mixture_system_model_record[1], indent=4, sort_keys=True, cls=NpEncoder) gaussian_mixture_system_model_config: GaussianMixtureSystemModel = \ GaussianMixtureSystemModel.from_dict(json.loads(gaussian_mixture_system_model_config_json)) gaussian_mixture_system_model_config.id = gaussian_mixture_system_model_record[0] return gaussian_mixture_system_model_config
[docs] @staticmethod def list_gaussian_mixture_system_models() -> List[GaussianMixtureSystemModel]: """ :return: A list of gaussian mixture system models in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map( lambda x: MetastoreFacade._convert_gaussian_mixture_system_model_record_to_dto(x), records))
[docs] @staticmethod def list_gaussian_mixture_system_models_ids() -> List[Tuple[int, str, int]]: """ :return: A list of gaussian mixture system model ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name,emulation_statistic_id FROM " f"{constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), int(x[2])), records))
[docs] @staticmethod def get_gaussian_mixture_system_model_config(id: int) -> Union[None, GaussianMixtureSystemModel]: """ Function for fetching a gaussian mixture system model config with a given id from the metastore :param id: the id of the gaussian mixture system model config :return: The gaussian mixture system model config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_gaussian_mixture_system_model_record_to_dto( gaussian_mixture_system_model_record=record) return record
[docs] @staticmethod def save_gaussian_mixture_system_model(gaussian_mixture_system_model: GaussianMixtureSystemModel) \ -> Union[Any, int]: """ Saves a gaussian mixture system model to the metastore :param gaussian_mixture_system_model: the gaussian mixture system model to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a gaussian mixture system model in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE) gaussian_mixture_system_model_json = json.dumps(gaussian_mixture_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE} " f"(id, model, emulation_name, emulation_statistic_id) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, gaussian_mixture_system_model_json, gaussian_mixture_system_model.emulation_env_name, gaussian_mixture_system_model.emulation_statistic_id)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("Gaussian mixture model saved successfully") return id_of_new_row
[docs] @staticmethod def update_gaussian_mixture_system_model(gaussian_mixture_system_model: GaussianMixtureSystemModel, id: int) \ -> None: """ Updates a gaussian mixture system model in the metastore :param gaussian_mixture_system_model: the gaussian mixture system model to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating gaussian mixture system model with id: {id} " f"in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(gaussian_mixture_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Gaussian mixture system model with id: {id} " f"updated successfully")
[docs] @staticmethod def remove_gaussian_mixture_system_model(gaussian_mixture_system_model: GaussianMixtureSystemModel) -> None: """ Removes a gaussian mixture system model from the metastore :param gaussian_mixture_system_model: the to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing gaussian mixture system model with " f"id:{gaussian_mixture_system_model.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.GAUSSIAN_MIXTURE_SYSTEM_MODELS_TABLE} " f"WHERE id = %s", (gaussian_mixture_system_model.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Gaussian mixture system model with " f"id {gaussian_mixture_system_model.id} deleted successfully")
[docs] @staticmethod def list_tabular_policies() -> List[TabularPolicy]: """ :return: A list of Tabular policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TABULAR_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_tabular_policy_record_to_dto(x), records))
[docs] @staticmethod def list_tabular_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of Tabular policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM {constants.METADATA_STORE.TABULAR_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_tabular_policy_record_to_dto(tabular_policy_record) -> TabularPolicy: """ Converts a Tabular policy record fetched from the metastore into a DTO :param tabular_policy_record: the record to convert :return: the DTO representing the record """ tabular_policy_json = json.dumps(tabular_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) tabular_policy: TabularPolicy = TabularPolicy.from_dict(json.loads(tabular_policy_json)) tabular_policy.id = tabular_policy_record[0] return tabular_policy
[docs] @staticmethod def get_tabular_policy(id: int) -> Union[None, TabularPolicy]: """ Function for fetching a Tabular policy with a given id from the metastore :param id: the id of the Tabular policy :return: The Tabular policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TABULAR_POLICIES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_tabular_policy_record_to_dto(tabular_policy_record=record) return record
[docs] @staticmethod def remove_tabular_policy(tabular_policy: TabularPolicy) -> None: """ Removes a Tabular policy from the metastore :param tabular_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing tabular policy with " f"id:{tabular_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.TABULAR_POLICIES_TABLE} WHERE id = %s", (tabular_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Tabular policy " f"with id {tabular_policy.id} deleted successfully")
[docs] @staticmethod def save_tabular_policy(tabular_policy: TabularPolicy) -> Union[Any, int]: """ Saves a Tabular policy to the metastore :param tabular_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing Tabular policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.TABULAR_POLICIES_TABLE) policy_json_str = json.dumps(tabular_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.TABULAR_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, tabular_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("Tabular policy saved successfully") return id_of_new_row
[docs] @staticmethod def list_alpha_vec_policies() -> List[AlphaVectorsPolicy]: """ :return: A list of AlphaVec policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.ALPHA_VEC_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_alpha_vec_policy_record_to_dto(x), records))
[docs] @staticmethod def list_alpha_vec_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of AlphaVec policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM {constants.METADATA_STORE.ALPHA_VEC_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_alpha_vec_policy_record_to_dto(alpha_vec_policy_record) -> AlphaVectorsPolicy: """ Converts a AlphaVec policy record fetched from the metastore into a DTO :param alpha_vec_policy_record: the record to convert :return: the DTO representing the record """ alpha_vec_policy_json = json.dumps(alpha_vec_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) alpha_vec_policy: AlphaVectorsPolicy = AlphaVectorsPolicy.from_dict(json.loads(alpha_vec_policy_json)) alpha_vec_policy.id = alpha_vec_policy_record[0] return alpha_vec_policy
[docs] @staticmethod def get_alpha_vec_policy(id: int) -> Union[None, AlphaVectorsPolicy]: """ Function for fetching a AlphaVec policy with a given id from the metastore :param id: the id of the AlphaVec policy :return: The AlphaVec policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.ALPHA_VEC_POLICIES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_alpha_vec_policy_record_to_dto(alpha_vec_policy_record=record) return record
[docs] @staticmethod def remove_alpha_vec_policy(alpha_vec_policy: AlphaVectorsPolicy) -> None: """ Removes a AlphaVec policy from the metastore :param alpha_vec_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing alpha_vec policy with " f"id:{alpha_vec_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.ALPHA_VEC_POLICIES_TABLE} WHERE id = %s", (alpha_vec_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"AlphaVec policy " f"with id {alpha_vec_policy.id} deleted successfully")
[docs] @staticmethod def save_alpha_vec_policy(alpha_vec_policy: AlphaVectorsPolicy) -> Union[Any, int]: """ Saves a AlphaVec policy to the metastore :param alpha_vec_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing AlphaVec policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.ALPHA_VEC_POLICIES_TABLE) policy_json_str = json.dumps(alpha_vec_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.ALPHA_VEC_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, alpha_vec_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("AlphaVec policy saved successfully") return id_of_new_row
[docs] @staticmethod def list_dqn_policies() -> List[DQNPolicy]: """ :return: A list of DQN policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.DQN_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_dqn_policy_record_to_dto(x), records))
[docs] @staticmethod def list_dqn_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of DQN policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM {constants.METADATA_STORE.DQN_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_dqn_policy_record_to_dto(dqn_policy_record) -> DQNPolicy: """ Converts a DQN policy record fetched from the metastore into a DTO :param dqn_policy_record: the record to convert :return: the DTO representing the record """ dqn_policy_json = json.dumps(dqn_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) dqn_policy: DQNPolicy = DQNPolicy.from_dict(json.loads(dqn_policy_json)) dqn_policy.id = dqn_policy_record[0] return dqn_policy
[docs] @staticmethod def get_dqn_policy(id: int) -> Union[None, DQNPolicy]: """ Function for fetching a DQN policy with a given id from the metastore :param id: the id of the DQN policy :return: The DQN policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.DQN_POLICIES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_dqn_policy_record_to_dto(dqn_policy_record=record) return record
[docs] @staticmethod def remove_dqn_policy(dqn_policy: DQNPolicy) -> None: """ Removes a DQN policy from the metastore :param dqn_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing dqn policy with " f"id:{dqn_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.DQN_POLICIES_TABLE} WHERE id = %s", (dqn_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"DQN policy " f"with id {dqn_policy.id} deleted successfully")
[docs] @staticmethod def save_dqn_policy(dqn_policy: DQNPolicy) -> Union[Any, int]: """ Saves a DQN policy to the metastore :param dqn_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing DQN policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.DQN_POLICIES_TABLE) policy_json_str = json.dumps(dqn_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.DQN_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, dqn_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("DQN policy saved successfully") return id_of_new_row
[docs] @staticmethod def list_fnn_w_softmax_policies() -> List[FNNWithSoftmaxPolicy]: """ :return: A list of FNN with softmax policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.FNN_W_SOFTMAX_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_fnn_w_softmax_policy_record_to_dto(x), records))
[docs] @staticmethod def list_fnn_w_softmax_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of FNN with softmax policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM {constants.METADATA_STORE.FNN_W_SOFTMAX_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_fnn_w_softmax_policy_record_to_dto(fnn_w_softmax_policy_record) -> FNNWithSoftmaxPolicy: """ Converts a FNN with softmax policy record fetched from the metastore into a DTO :param fnn_w_softmax_policy_record: the record to convert :return: the DTO representing the record """ fnn_w_softmax_policy_json = json.dumps(fnn_w_softmax_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) fnn_w_softmax_policy: FNNWithSoftmaxPolicy = FNNWithSoftmaxPolicy.from_dict(json.loads( fnn_w_softmax_policy_json)) fnn_w_softmax_policy.id = fnn_w_softmax_policy_record[0] return fnn_w_softmax_policy
[docs] @staticmethod def get_fnn_w_softmax_policy(id: int) -> Union[None, FNNWithSoftmaxPolicy]: """ Function for fetching a FNN with softmax policy with a given id from the metastore :param id: the id of the FNN with softmax policy :return: The FNN with softmax policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.FNN_W_SOFTMAX_POLICIES_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_fnn_w_softmax_policy_record_to_dto( fnn_w_softmax_policy_record=record) return record
[docs] @staticmethod def remove_fnn_w_softmax_policy(fnn_w_softmax_policy: FNNWithSoftmaxPolicy) -> None: """ Removes a FNN with softmax policy from the metastore :param fnn_w_softmax_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing fnn_w_softmax policy with " f"id:{fnn_w_softmax_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.FNN_W_SOFTMAX_POLICIES_TABLE} WHERE id = %s", (fnn_w_softmax_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"FNN with softmax policy " f"with id {fnn_w_softmax_policy.id} deleted successfully")
[docs] @staticmethod def save_fnn_w_softmax_policy(fnn_w_softmax_policy: FNNWithSoftmaxPolicy) -> Union[Any, int]: """ Saves a FNN with softmax policy to the metastore :param fnn_w_softmax_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing FNN with softmax policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.FNN_W_SOFTMAX_POLICIES_TABLE) policy_json_str = json.dumps(fnn_w_softmax_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.FNN_W_SOFTMAX_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, fnn_w_softmax_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("FNN with softmax policy saved successfully") return id_of_new_row
[docs] @staticmethod def list_vector_policies() -> List[VectorPolicy]: """ :return: A list of vector policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.VECTOR_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_vector_policy_record_to_dto(x), records))
[docs] @staticmethod def list_vector_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of vector policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM {constants.METADATA_STORE.VECTOR_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_vector_policy_record_to_dto(vector_policy_record) -> VectorPolicy: """ Converts a vector policy record fetched from the metastore into a DTO :param vector_policy_record: the record to convert :return: the DTO representing the record """ vector_policy_json = json.dumps(vector_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) vector_policy: VectorPolicy = VectorPolicy.from_dict(json.loads(vector_policy_json)) vector_policy.id = vector_policy_record[0] return vector_policy
[docs] @staticmethod def get_vector_policy(id: int) -> Union[None, VectorPolicy]: """ Function for fetching a vector policy with a given id from the metastore :param id: the id of the vector policy :return: The vector policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.VECTOR_POLICIES_TABLE} WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_vector_policy_record_to_dto(vector_policy_record=record) return record
[docs] @staticmethod def remove_vector_policy(vector_policy: VectorPolicy) -> None: """ Removes a vector policy from the metastore :param vector_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing vector policy with " f"id:{vector_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.VECTOR_POLICIES_TABLE} WHERE id = %s", (vector_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"vector policy " f"with id {vector_policy.id} deleted successfully")
[docs] @staticmethod def save_vector_policy(vector_policy: VectorPolicy) -> Union[Any, int]: """ Saves a vector policy to the metastore :param vector_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing vector policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.VECTOR_POLICIES_TABLE) policy_json_str = json.dumps(vector_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.VECTOR_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, vector_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("vector policy saved successfully") return id_of_new_row
[docs] @staticmethod def list_emulation_execution_ids() -> List[Tuple[int, str]]: """ :return: A list of emulation executions in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT ip_first_octet,emulation_name FROM " f"{constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def list_emulation_executions() -> List[EmulationExecution]: """ :return: A list of emulation executions in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_execution_record_to_dto(x), records))
[docs] @staticmethod def list_emulation_executions_for_a_given_emulation(emulation_name: str) -> List[EmulationExecution]: """ :param emulation_name: the name of the emulation :return: A list of emulation executions in the metastore for a given emulation """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE} " f"WHERE emulation_name = %s", (emulation_name,)) records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_execution_record_to_dto(x), records))
[docs] @staticmethod def list_emulation_executions_by_id(id: int) -> List[EmulationExecution]: """ :param id: the first IP octet of the execution :return: A list of emulation executions in the metastore for a given emulation """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE} " f"WHERE ip_first_octet = %s", (id,)) records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_emulation_execution_record_to_dto(x), records))
@staticmethod def _convert_emulation_execution_record_to_dto(emulation_execution_record) -> EmulationExecution: """ Converts a emulation execution record fetched from the metastore into a DTO :param emulation_execution_record: the record to convert :return: the DTO representing the record """ emulation_execution_json = json.dumps(emulation_execution_record[2], indent=4, sort_keys=True, cls=NpEncoder) emulation_execution: EmulationExecution = EmulationExecution.from_dict(json.loads(emulation_execution_json)) return emulation_execution
[docs] @staticmethod def get_emulation_execution(ip_first_octet: int, emulation_name: str) -> Union[None, EmulationExecution]: """ Function for fetching a emulation execution with a given id from the metastore :param ip_first_octet: the id of the emulation execution :param emulation_name: the name of the emulation :return: The emulation execution or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE} " f"WHERE ip_first_octet = %s AND emulation_name=%s", (ip_first_octet, emulation_name)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_emulation_execution_record_to_dto( emulation_execution_record=record) return record
[docs] @staticmethod def remove_emulation_execution(emulation_execution: EmulationExecution) -> None: """ Removes a emulation execution from the metastore :param emulation_execution: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing emulation execution with " f"ip_first_octet:{emulation_execution.ip_first_octet} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE} " f"WHERE ip_first_octet = %s AND emulation_name = %s", (emulation_execution.ip_first_octet, emulation_execution.emulation_name)) conn.commit() Logger.__call__().get_logger().debug(f"emulation execution " f"with ip_first_octet {emulation_execution.ip_first_octet} " f"deleted successfully")
[docs] @staticmethod def save_emulation_execution(emulation_execution: EmulationExecution) -> Union[Any, int]: """ Saves a emulation execution to the metastore :param emulation_execution: the policy to save :return: None """ Logger.__call__().get_logger().debug("Installing emulation execution in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: emulation_execution_str = \ json.dumps(emulation_execution.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE} " f"(ip_first_octet, emulation_name, info) " f"VALUES (%s, %s, %s) RETURNING ip_first_octet", ( emulation_execution.ip_first_octet, emulation_execution.emulation_name, emulation_execution_str)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("emulation execution saved successfully") return id_of_new_row
[docs] @staticmethod def update_emulation_execution(emulation_execution: EmulationExecution, ip_first_octet: int, emulation: str) -> None: """ Updates an emulation execution in the metastore :param emulation_execution: the emulation execution to update :param ip_first_octet: the first octet of the ip of the execution :param emulation: the emulation of the execution :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating emulation execution with ip first octet: {ip_first_octet} " f"and emulation: {emulation} " f"in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(emulation_execution.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE} " f" SET info=%s " f"WHERE {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE}.ip_first_octet = %s " f"AND {constants.METADATA_STORE.EMULATION_EXECUTIONS_TABLE}.emulation_name = %s", (config_json_str, ip_first_octet, emulation)) conn.commit() Logger.__call__().get_logger().debug(f"Emulation execution with ip first octet: {ip_first_octet} " f"and emulation: {emulation} " f"updated successfully")
@staticmethod def _convert_empirical_system_model_record_to_dto(empirical_system_model_record) -> \ EmpiricalSystemModel: """ Converts a empirical system model record fetched from the metastore into a DTO :param empirical_system_model_record: the record to convert :return: the DTO representing the record """ empirical_system_model_config_json = json.dumps(empirical_system_model_record[1], indent=4, sort_keys=True, cls=NpEncoder) empirical_system_model_config: EmpiricalSystemModel = \ EmpiricalSystemModel.from_dict(json.loads(empirical_system_model_config_json)) empirical_system_model_config.id = empirical_system_model_record[0] return empirical_system_model_config
[docs] @staticmethod def list_empirical_system_models() -> List[EmpiricalSystemModel]: """ :return: A list of empirical system models in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_empirical_system_model_record_to_dto(x), records))
[docs] @staticmethod def list_empirical_system_models_ids() -> List[Tuple[int, str, int]]: """ :return: A list of empirical system model ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name,emulation_statistic_id FROM " f"{constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), int(x[2])), records))
[docs] @staticmethod def get_empirical_system_model_config(id: int) -> Union[None, EmpiricalSystemModel]: """ Function for fetching an empirical system model config with a given id from the metastore :param id: the id of the empirical system model config :return: The empirical system model config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_empirical_system_model_record_to_dto( empirical_system_model_record=record) return record
[docs] @staticmethod def save_empirical_system_model(empirical_system_model: EmpiricalSystemModel) -> Union[Any, int]: """ Saves a empirical system model to the metastore :param empirical_system_model: the empirical system model to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a empirical system model in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE) empirical_system_model_json = json.dumps(empirical_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE} " f"(id, model, emulation_name, emulation_statistic_id) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, empirical_system_model_json, empirical_system_model.emulation_env_name, empirical_system_model.emulation_statistic_id)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("empirical system model saved successfully") return id_of_new_row
[docs] @staticmethod def update_empirical_system_model(empirical_system_model: EmpiricalSystemModel, id: int) -> None: """ Updates a empirical system model in the metastore :param empirical_system_model: the empirical system model to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating empirical system model with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(empirical_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Empirical system model with id: {id} updated successfully")
[docs] @staticmethod def remove_empirical_system_model(empirical_system_model: EmpiricalSystemModel) -> None: """ Removes a empirical system model from the metastore :param empirical_system_model: the to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing empirical system model with " f"id:{empirical_system_model.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.EMPIRICAL_SYSTEM_MODELS_TABLE} WHERE id = %s", (empirical_system_model.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Empirical system model with " f"id {empirical_system_model.id} deleted successfully")
@staticmethod def _convert_mcmc_system_model_record_to_dto(mcmc_system_model_record) -> MCMCSystemModel: """ Converts a MCMC system model record fetched from the metastore into a DTO :param mcmc_system_model_record: the record to convert :return: the DTO representing the record """ mcmc_system_model_config_json = json.dumps(mcmc_system_model_record[1], indent=4, sort_keys=True, cls=NpEncoder) mcmc_system_model_config: MCMCSystemModel = MCMCSystemModel.from_dict(json.loads(mcmc_system_model_config_json)) mcmc_system_model_config.id = mcmc_system_model_record[0] return mcmc_system_model_config
[docs] @staticmethod def list_mcmc_system_models() -> List[MCMCSystemModel]: """ :return: A list of MCMC system models in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_mcmc_system_model_record_to_dto(x), records))
[docs] @staticmethod def list_mcmc_system_models_ids() -> List[Tuple[int, str, int]]: """ :return: A list of MCMC system model ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name,emulation_statistic_id FROM " f"{constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), int(x[2])), records))
[docs] @staticmethod def get_mcmc_system_model_config(id: int) -> Union[None, MCMCSystemModel]: """ Function for fetching a mcmc system model config with a given id from the metastore :param id: the id of the mcmc system model config :return: The mcmc system model config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_mcmc_system_model_record_to_dto(mcmc_system_model_record=record) return record
[docs] @staticmethod def save_mcmc_system_model(mcmc_system_model: MCMCSystemModel) -> Union[Any, int]: """ Saves a mcmc system model to the metastore :param mcmc_system_model: the mcmc system model to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a mcmc system model in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE) mcmc_system_model_json = json.dumps(mcmc_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE} " f"(id, model, emulation_name, emulation_statistic_id) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, mcmc_system_model_json, mcmc_system_model.emulation_env_name, mcmc_system_model.emulation_statistic_id)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("mcmc system model saved successfully") return id_of_new_row
[docs] @staticmethod def update_mcmc_system_model(mcmc_system_model: MCMCSystemModel, id: int) -> None: """ Updates a mcmc system model in the metastore :param mcmc_system_model: the mcmc system model to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating mcmc system model with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(mcmc_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"MCMC system model with id: {id} updated successfully")
[docs] @staticmethod def remove_mcmc_system_model(mcmc_system_model: MCMCSystemModel) -> None: """ Removes a MCMC system model from the metastore :param mcmc_system_model: the system model to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing MCMC system model with " f"id:{mcmc_system_model.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.MCMC_SYSTEM_MODELS_TABLE} WHERE id = %s", (mcmc_system_model.id,)) conn.commit() Logger.__call__().get_logger().debug(f"MCMC system model with " f"id {mcmc_system_model.id} deleted successfully")
@staticmethod def _convert_gp_system_model_record_to_dto(gp_system_model_record) -> GPSystemModel: """ Converts a gp system model record fetched from the metastore into a DTO :param gp_system_model_record: the record to convert :return: the DTO representing the record """ gp_system_model_config_json = json.dumps(gp_system_model_record[1], indent=4, sort_keys=True, cls=NpEncoder) gp_system_model_config: GPSystemModel = \ GPSystemModel.from_dict(json.loads(gp_system_model_config_json)) gp_system_model_config.id = gp_system_model_record[0] return gp_system_model_config
[docs] @staticmethod def list_gp_system_models() -> List[GPSystemModel]: """ :return: A list of gp system models in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_gp_system_model_record_to_dto(x), records))
[docs] @staticmethod def list_gp_system_models_ids() -> List[Tuple[int, str, int]]: """ :return: A list of gp system model ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,emulation_name,emulation_statistic_id FROM " f"{constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1]), int(x[2])), records))
[docs] @staticmethod def get_gp_system_model_config(id: int) -> Union[None, GPSystemModel]: """ Function for fetching a gp system model config with a given id from the metastore :param id: the id of the gp system model config :return: The gp system model config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_gp_system_model_record_to_dto( gp_system_model_record=record) return record
[docs] @staticmethod def save_gp_system_model(gp_system_model: GPSystemModel) -> Union[Any, int]: """ Saves a gp system model to the metastore :param gp_system_model: the gp system model to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a gp system model in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE) gp_system_model_json = json.dumps(gp_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE} " f"(id, model, emulation_name, emulation_statistic_id) " f"VALUES (%s, %s, %s, %s) RETURNING id", (id, gp_system_model_json, gp_system_model.emulation_env_name, gp_system_model.emulation_statistic_id)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("gp system model saved successfully") return id_of_new_row
[docs] @staticmethod def update_gp_system_model(gp_system_model: GPSystemModel, id: int) -> None: """ Updates a gp system model in the metastore :param gp_system_model: the gp system model to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating gp system model with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(gp_system_model.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"GP system model with id: {id} updated successfully")
[docs] @staticmethod def remove_gp_system_model(gp_system_model: GPSystemModel) -> None: """ Removes a gp system model from the metastore :param gp_system_model: the model to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing gp system model with " f"id:{gp_system_model.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.GP_SYSTEM_MODELS_TABLE} WHERE id = %s", (gp_system_model.id,)) conn.commit() Logger.__call__().get_logger().debug(f"GP system model with " f"id {gp_system_model.id} deleted successfully")
@staticmethod def _convert_management_user_record_to_dto(management_user_record) -> ManagementUser: """ Converts a management user record fetched from the metastore into a DTO :param management_user_record: the record to convert :return: the DTO representing the record """ management_user = ManagementUser(username=management_user_record[1], password=management_user_record[2], email=management_user_record[3], first_name=management_user_record[4], last_name=management_user_record[5], organization=management_user_record[6], admin=management_user_record[7], salt=management_user_record[8]) management_user.id = management_user_record[0] return management_user
[docs] @staticmethod def list_management_users() -> List[ManagementUser]: """ :return: A list of management users in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MANAGEMENT_USERS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_management_user_record_to_dto(x), records))
[docs] @staticmethod def list_management_users_ids() -> List[int]: """ :return: A list of management user ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id FROM " f"{constants.METADATA_STORE.MANAGEMENT_USERS_TABLE}") records = cur.fetchall() return list(map(lambda x: int(x[0]), records))
[docs] @staticmethod def get_management_user_config(id: int) -> Union[None, ManagementUser]: """ Function for fetching a management user with a given id from the metastore :param id: the id of the management user :return: The management user or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MANAGEMENT_USERS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_management_user_record_to_dto( management_user_record=record) return record
[docs] @staticmethod def save_management_user(management_user: ManagementUser) -> Union[None, int]: """ Saves a management user to the metastore :param management_user: the management user to save :return: id of the created record """ Logger.__call__().get_logger().debug("Saving a management user in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.MANAGEMENT_USERS_TABLE) cur.execute(f"INSERT INTO {constants.METADATA_STORE.MANAGEMENT_USERS_TABLE} " f"(id, username, password, email, first_name, last_name, organization, admin, salt) " f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", (id, management_user.username, management_user.password, management_user.email, management_user.first_name, management_user.last_name, management_user.organization, management_user.admin, management_user.salt)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("management user saved successfully") return id_of_new_row
[docs] @staticmethod def update_management_user(management_user: ManagementUser, id: int) -> None: """ Updates a management user in the metastore :param management_user: the management user to update :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating management user with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"UPDATE " f"{constants.METADATA_STORE.MANAGEMENT_USERS_TABLE} " f" SET username=%s, password=%s, email=%s, first_name=%s, last_name=%s, organization=%s, " f"admin=%s, salt=%s WHERE {constants.METADATA_STORE.MANAGEMENT_USERS_TABLE}.id = %s", (management_user.username, management_user.password, management_user.email, management_user.first_name, management_user.last_name, management_user.organization, management_user.admin, management_user.salt, id)) conn.commit() Logger.__call__().get_logger().debug(f"Management user with id: {id} updated successfully")
[docs] @staticmethod def remove_management_user(management_user: ManagementUser) -> None: """ Removes a management user from the metastore :param management_user: the user to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing management user with " f"id:{management_user.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.MANAGEMENT_USERS_TABLE} WHERE id = %s", (management_user.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Management user with " f"id {management_user.id} deleted successfully")
[docs] @staticmethod def get_management_user_by_username(username: str) -> Union[None, ManagementUser]: """ Function for extracting a management user account based on the username :param username: the username of the user :return: The management user or None if no user with the given username was found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.MANAGEMENT_USERS_TABLE} " f"WHERE username = %s", (username,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_management_user_record_to_dto(management_user_record=record) return record
@staticmethod def _convert_session_token_record_to_dto(session_token_record) -> SessionToken: """ Converts a session token record fetched from the metastore into a DTO :param session_token_record: the record to convert :return: the DTO representing the record """ session_token = SessionToken(token=session_token_record[0], timestamp=session_token_record[1], username=session_token_record[2]) return session_token
[docs] @staticmethod def list_session_tokens() -> List[SessionToken]: """ :return: A list of session tokens in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SESSION_TOKENS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_session_token_record_to_dto(x), records))
[docs] @staticmethod def get_session_token_metadata(token: str) -> Union[None, SessionToken]: """ Function for fetching the metadata of a given session token :param token: the token to lookup the metadata dor :return: The session token and its metadata or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SESSION_TOKENS_TABLE} " f"WHERE token = %s", (token,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_session_token_record_to_dto( session_token_record=record) return record
[docs] @staticmethod def save_session_token(session_token: SessionToken) -> Union[None, str]: """ Saves a session token to the metastore :param session_token: the session token to save :return: token of the created record """ ts = time.time() Logger.__call__().get_logger().debug("Saving a session token in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: try: cur.execute(f"INSERT INTO {constants.METADATA_STORE.SESSION_TOKENS_TABLE} " f"(token, timestamp, username) " f"VALUES (%s, %s, %s) RETURNING token", (session_token.token, ts, session_token.username)) record = cur.fetchone() token_of_new_row = None if record is not None: token_of_new_row = record[0] conn.commit() Logger.__call__().get_logger().debug("Session token saved successfully") return token_of_new_row except Exception as e: Logger.__call__().get_logger().error(f"Could not save session token to database, error: {str(e)}, " f"{repr(e)}") return None
[docs] @staticmethod def update_session_token(session_token: SessionToken, token: str) -> None: """ Updates a session token in the metastore :param session_token: the session token to update :param token: the token of the row to update :return: None """ Logger.__call__().get_logger().debug(f"Updating session token with token: {token} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"UPDATE " f"{constants.METADATA_STORE.SESSION_TOKENS_TABLE} " f" SET token=%s, timestamp=%s, username=%s " f"WHERE {constants.METADATA_STORE.SESSION_TOKENS_TABLE}.token = %s", (session_token.token, session_token.timestamp, session_token.username, token)) conn.commit() Logger.__call__().get_logger().debug(f"Session token {token} updated successfully")
[docs] @staticmethod def remove_session_token(session_token: SessionToken) -> None: """ Removes a session token from the metastore :param session_token: the session token to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing session token with " f"token:{session_token.token} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.SESSION_TOKENS_TABLE} WHERE token = %s", (session_token.token,)) conn.commit() Logger.__call__().get_logger().debug(f"Session token with " f"token {session_token.token} deleted successfully")
[docs] @staticmethod def get_session_token_by_username(username: str) -> Union[None, SessionToken]: """ Function for extracting a session token account based on the username :param username: the username of the user :return: The session token or None if no user with the given username was found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.SESSION_TOKENS_TABLE} " f"WHERE username = %s", (username,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_session_token_record_to_dto(session_token_record=record) return record
@staticmethod def _convert_traces_dataset_record_to_dto(traces_dataset_record) -> TracesDataset: """ Converts a traces dataset record fetched from the metastore into a DTO :param traces_dataset_record: the record to convert :return: the DTO representing the record """ data_schema = {} if traces_dataset_record[3] is not None and traces_dataset_record[3] != "": data_schema_json_str = json.dumps(traces_dataset_record[3], indent=4, sort_keys=True, cls=NpEncoder) data_schema = json.loads(data_schema_json_str) traces_dataset = TracesDataset(name=traces_dataset_record[1], description=traces_dataset_record[2], data_schema=data_schema, download_count=traces_dataset_record[4], file_path=traces_dataset_record[5], url=traces_dataset_record[6], date_added=traces_dataset_record[7], num_traces=traces_dataset_record[8], num_attributes_per_time_step=traces_dataset_record[9], size_in_gb=traces_dataset_record[10], compressed_size_in_gb=traces_dataset_record[11], citation=traces_dataset_record[12], num_files=traces_dataset_record[13], file_format=traces_dataset_record[14], added_by=traces_dataset_record[15], columns=traces_dataset_record[16]) traces_dataset.id = traces_dataset_record[0] return traces_dataset
[docs] @staticmethod def list_traces_datasets_ids() -> List[Tuple[int, str]]: """ :return: A list of traces datasets ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,name FROM {constants.METADATA_STORE.TRACES_DATASETS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def list_traces_datasets() -> List[TracesDataset]: """ :return: A list of traces datasets in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TRACES_DATASETS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_traces_dataset_record_to_dto(x), records))
[docs] @staticmethod def get_traces_dataset_metadata(id: int) -> Union[None, TracesDataset]: """ Function for fetching the metadata of a given dataset name :param id: the id of the dataset to get the metadata of :return: The traces dataset and its metadata or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TRACES_DATASETS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_traces_dataset_record_to_dto( traces_dataset_record=record) return record
[docs] @staticmethod def get_traces_dataset_metadata_by_name(dataset_name: str) -> Union[None, TracesDataset]: """ Function for fetching the metadata of a given dataset name :param dataset_name: the dataset name to lookup the metadata for :return: The traces dataset and its metadata or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.TRACES_DATASETS_TABLE} " f"WHERE name = %s", (dataset_name,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_traces_dataset_record_to_dto( traces_dataset_record=record) return record
[docs] @staticmethod def save_traces_dataset(traces_dataset: TracesDataset) -> Union[Any, int]: """ Saves a traces dataset to the metastore :param traces_dataset: the traces dataset to save :return: idg of the created record """ Logger.__call__().get_logger().debug("Saving a traces dataset in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: schema_json_str = "" if traces_dataset.data_schema is not None: schema_json_str = json.dumps(traces_dataset.data_schema, indent=4, sort_keys=True, cls=NpEncoder) # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.TRACES_DATASETS_TABLE) cur.execute(f"INSERT INTO {constants.METADATA_STORE.TRACES_DATASETS_TABLE} " f"(id, name, description, data_schema, download_count, " f"file_path, url, date_added, num_traces, " f"num_attributes_per_time_step, size_in_gb, compressed_size_in_gb, citation, num_files, " f"file_format, added_by, columns) " f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", (id, traces_dataset.name, traces_dataset.description, schema_json_str, traces_dataset.download_count, traces_dataset.file_path, traces_dataset.url, traces_dataset.date_added, traces_dataset.num_traces, traces_dataset.num_attributes_per_time_step, traces_dataset.size_in_gb, traces_dataset.compressed_size_in_gb, traces_dataset.citation, traces_dataset.num_files, traces_dataset.file_format, traces_dataset.added_by, traces_dataset.columns)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("traces dataset saved successfully") return id_of_new_row
[docs] @staticmethod def update_traces_dataset(traces_dataset: TracesDataset, id: int) -> None: """ Updates a traces dataset in the metastore :param traces_dataset: the traces dataset to update :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating traces dataset with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: schema_json_str = "" if traces_dataset.data_schema is not None: schema_json_str = json.dumps(traces_dataset.data_schema, indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.TRACES_DATASETS_TABLE} " f" SET name=%s, description=%s, data_schema=%s, download_count=%s, file_path=%s, " f"url=%s, date_added=%s, num_traces=%s, num_attributes_per_time_step=%s, size_in_gb=%s, " f"compressed_size_in_gb=%s, citation=%s, num_files=%s, file_format=%s, added_by=%s, " f"columns=%s " f"WHERE {constants.METADATA_STORE.TRACES_DATASETS_TABLE}.id = %s", (traces_dataset.name, traces_dataset.description, schema_json_str, traces_dataset.download_count, traces_dataset.file_path, traces_dataset.url, traces_dataset.date_added, traces_dataset.num_traces, traces_dataset.num_attributes_per_time_step, traces_dataset.size_in_gb, traces_dataset.compressed_size_in_gb, traces_dataset.citation, traces_dataset.num_files, traces_dataset.file_format, traces_dataset.added_by, traces_dataset.columns, id)) conn.commit() Logger.__call__().get_logger().debug(f"Traces dataset with {id} updated successfully")
[docs] @staticmethod def remove_traces_dataset(traces_dataset: TracesDataset) -> None: """ Removes a traces dataset from the metastore :param traces_dataset: the traces dataset to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing traces dataset with " f"id:{traces_dataset.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.TRACES_DATASETS_TABLE} WHERE id = %s", (traces_dataset.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Traces dataset with " f"id {traces_dataset.id} deleted successfully")
@staticmethod def _convert_statistics_dataset_record_to_dto(statistics_dataset_record) -> StatisticsDataset: """ Converts a statistics dataset record fetched from the metastore into a DTO :param statistics_dataset_record: the record to convert :return: the DTO representing the record """ statistics_dataset = StatisticsDataset( name=statistics_dataset_record[1], description=statistics_dataset_record[2], download_count=statistics_dataset_record[3], file_path=statistics_dataset_record[4], url=statistics_dataset_record[5], date_added=statistics_dataset_record[6], num_measurements=statistics_dataset_record[7], num_metrics=statistics_dataset_record[8], size_in_gb=statistics_dataset_record[9], compressed_size_in_gb=statistics_dataset_record[10], citation=statistics_dataset_record[11], num_files=statistics_dataset_record[12], file_format=statistics_dataset_record[13], added_by=statistics_dataset_record[14], conditions=statistics_dataset_record[15], metrics=statistics_dataset_record[16], num_conditions=statistics_dataset_record[17]) statistics_dataset.id = statistics_dataset_record[0] return statistics_dataset
[docs] @staticmethod def list_statistics_datasets_ids() -> List[Tuple[int, str]]: """ :return: A list of statistics datasets ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,name FROM {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
[docs] @staticmethod def list_statistics_datasets() -> List[StatisticsDataset]: """ :return: A list of statistics datasets in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_statistics_dataset_record_to_dto(x), records))
[docs] @staticmethod def get_statistics_dataset_metadata(id: int) -> Union[None, StatisticsDataset]: """ Function for fetching the metadata of a given dataset name :param id: the id of the dataset to get the metadata of :return: The statistics dataset and its metadata or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_statistics_dataset_record_to_dto( statistics_dataset_record=record) return record
[docs] @staticmethod def get_statistics_dataset_metadata_by_name(dataset_name: str) -> Union[None, StatisticsDataset]: """ Function for fetching the metadata of a given dataset name :param dataset_name: the dataset name to lookup the metadata for :return: The statistics dataset and its metadata or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE} " f"WHERE name = %s", (dataset_name,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_statistics_dataset_record_to_dto( statistics_dataset_record=record) return record
[docs] @staticmethod def save_statistics_dataset(statistics_dataset: StatisticsDataset) -> Union[Any, int]: """ Saves a statistics dataset to the metastore :param statistics_dataset: the statistics dataset to save :return: idg of the created record """ Logger.__call__().get_logger().debug("Saving a statistics dataset in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.STATISTICS_DATASETS_TABLE) cur.execute(f"INSERT INTO {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE} " f"(id, name, description, download_count, file_path, url, date_added, num_measurements, " f"num_metrics, size_in_gb, compressed_size_in_gb, citation, num_files, " f"file_format, added_by, conditions, metrics, num_conditions) " f"VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) " f"RETURNING id", (id, statistics_dataset.name, statistics_dataset.description, statistics_dataset.download_count, statistics_dataset.file_path, statistics_dataset.url, statistics_dataset.date_added, statistics_dataset.num_measurements, statistics_dataset.num_metrics, statistics_dataset.size_in_gb, statistics_dataset.compressed_size_in_gb, statistics_dataset.citation, statistics_dataset.num_files, statistics_dataset.file_format, statistics_dataset.added_by, statistics_dataset.conditions, statistics_dataset.metrics, statistics_dataset.num_conditions)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("statistics dataset saved successfully") return id_of_new_row
[docs] @staticmethod def update_statistics_dataset(statistics_dataset: StatisticsDataset, id: int) -> None: """ Updates a statistics dataset in the metastore :param statistics_dataset: the statistics dataset to update :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating statistics dataset with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"UPDATE " f"{constants.METADATA_STORE.STATISTICS_DATASETS_TABLE} " f" SET name=%s, description=%s, download_count=%s, file_path=%s, " f"url=%s, date_added=%s, num_measurements=%s, num_metrics=%s, size_in_gb=%s, " f"compressed_size_in_gb=%s, citation=%s, num_files=%s, file_format=%s, added_by=%s, " f"conditions=%s, metrics=%s, num_conditions=%s " f"WHERE {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE}.id = %s", (statistics_dataset.name, statistics_dataset.description, statistics_dataset.download_count, statistics_dataset.file_path, statistics_dataset.url, statistics_dataset.date_added, statistics_dataset.num_measurements, statistics_dataset.num_metrics, statistics_dataset.size_in_gb, statistics_dataset.compressed_size_in_gb, statistics_dataset.citation, statistics_dataset.num_files, statistics_dataset.file_format, statistics_dataset.added_by, statistics_dataset.conditions, statistics_dataset.metrics, statistics_dataset.num_conditions, id)) conn.commit() Logger.__call__().get_logger().debug(f"Statistics dataset with {id} updated successfully")
[docs] @staticmethod def remove_statistics_dataset(statistics_dataset: StatisticsDataset) -> None: """ Removes a statistics dataset from the metastore :param statistics_dataset: the statistic dataset to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing statistics dataset with " f"id:{statistics_dataset.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.STATISTICS_DATASETS_TABLE} WHERE id = %s", (statistics_dataset.id,)) conn.commit() Logger.__call__().get_logger().debug("Statistics dataset with " f"id {statistics_dataset.id} deleted successfully")
@staticmethod def _convert_config_record_to_dto(config_record) -> Config: """ Converts a config record fetched from the metastore into a DTO :param config_record: the record to convert :return: the DTO representing the record """ config_json = json.dumps(config_record[1], indent=4, sort_keys=True, cls=NpEncoder) config: Config = Config.from_dict(json.loads(config_json)) config.id = config_record[0] return config
[docs] @staticmethod def list_configs() -> List[Config]: """ :return: A list of configs in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.CONFIG_TABLE}") records = cur.fetchall() return list(map(lambda x: MetastoreFacade._convert_config_record_to_dto(x), records))
[docs] @staticmethod def list_config_ids() -> List[int]: """ :return: A list of config ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id FROM {constants.METADATA_STORE.CONFIG_TABLE}") records = cur.fetchall() return list(map(lambda x: int(x[0]), records))
[docs] @staticmethod def get_config(id: int) -> Union[None, Config]: """ Function for fetching the config with a given id from the metastore :param id: the id of the config :return: The config or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.CONFIG_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_config_record_to_dto(config_record=record) return record
[docs] @staticmethod def save_config(config: Config) -> Union[Any, int]: """ Saves a config to the metastore :param config: the config to save :return: id of the config """ Logger.__call__().get_logger().info("Updating the configuration in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json = json.dumps(config.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.CONFIG_TABLE} " f"(id, config) " f"VALUES (%s, %s) RETURNING id", (1, config_json,)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("config saved successfully") return id_of_new_row
[docs] @staticmethod def update_config(config: Config, id: int) -> None: """ Updates a config in the metastore :param config: the config to save :param id: the id of the row to update :return: id of the created record """ Logger.__call__().get_logger().debug(f"Updating config with id: {id} in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: config_json_str = json.dumps(config.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"UPDATE " f"{constants.METADATA_STORE.CONFIG_TABLE} " f" SET config=%s " f"WHERE {constants.METADATA_STORE.CONFIG_TABLE}.id = %s", (config_json_str, id)) conn.commit() Logger.__call__().get_logger().debug(f"Config with id: {id} updated successfully")
[docs] @staticmethod def remove_config(config: Config) -> None: """ Removes a config from the metastore :param config: the config to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing config with " f"id:{config.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.CONFIG_TABLE} WHERE id = %s", (config.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Config with " f"id {config.id} deleted successfully")
[docs] @staticmethod def list_linear_threshold_stopping_policies() -> List[LinearThresholdStoppingPolicy]: """ :return: A list of Linear-threshold stopping policies in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.LINEAR_THRESHOLD_STOPPING_POLICIES_TABLE}") records = cur.fetchall() return list(map( lambda x: MetastoreFacade._convert_linear_threshold_stopping_policy_record_to_dto(x), records))
[docs] @staticmethod def list_linear_threshold_stopping_policies_ids() -> List[Tuple[int, str]]: """ :return: A list of Linear-threshold stopping policies ids in the metastore """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT id,simulation_name FROM " f"{constants.METADATA_STORE.LINEAR_THRESHOLD_STOPPING_POLICIES_TABLE}") records = cur.fetchall() return list(map(lambda x: (int(x[0]), str(x[1])), records))
@staticmethod def _convert_linear_threshold_stopping_policy_record_to_dto(linear_threshold_stopping_policy_record) \ -> LinearThresholdStoppingPolicy: """ Converts a Linear-threshold stopping policy record fetched from the metastore into a DTO :param linear_threshold_stopping_policy_record: the record to convert :return: the DTO representing the record """ linear_threshold_stopping_policy_json = json.dumps(linear_threshold_stopping_policy_record[1], indent=4, sort_keys=True, cls=NpEncoder) linear_threshold_stopping_policy: LinearThresholdStoppingPolicy = LinearThresholdStoppingPolicy.from_dict( json.loads(linear_threshold_stopping_policy_json)) linear_threshold_stopping_policy.id = linear_threshold_stopping_policy_record[0] return linear_threshold_stopping_policy
[docs] @staticmethod def get_linear_threshold_stopping_policy(id: int) -> Union[None, LinearThresholdStoppingPolicy]: """ Function for fetching a mult-threshold policy with a given id from the metastore :param id: the id of the linear-threshold policy :return: The mult-threshold policy or None if it could not be found """ with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"SELECT * FROM {constants.METADATA_STORE.LINEAR_THRESHOLD_STOPPING_POLICIES_TABLE} " f"WHERE id = %s", (id,)) record = cur.fetchone() if record is not None: record = MetastoreFacade._convert_linear_threshold_stopping_policy_record_to_dto( linear_threshold_stopping_policy_record=record) return record
[docs] @staticmethod def remove_linear_threshold_stopping_policy( linear_threshold_stopping_policy: LinearThresholdStoppingPolicy) -> None: """ Removes a linear-threshold stopping policy from the metastore :param linear_threshold_stopping_policy: the policy to remove :return: None """ Logger.__call__().get_logger().debug(f"Removing Linear-threshold stopping policy with " f"id:{linear_threshold_stopping_policy.id} from the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: cur.execute(f"DELETE FROM {constants.METADATA_STORE.LINEAR_THRESHOLD_STOPPING_POLICIES_TABLE} " f"WHERE id = %s", (linear_threshold_stopping_policy.id,)) conn.commit() Logger.__call__().get_logger().debug(f"Linear-threshold stopping policy " f"with id {linear_threshold_stopping_policy.id} " f"deleted successfully")
[docs] @staticmethod def save_linear_threshold_stopping_policy(linear_threshold_stopping_policy: LinearThresholdStoppingPolicy) \ -> Union[Any, int]: """ Saves a linear-threshold stopping policy to the metastore :param linear_threshold_stopping_policy: the policy to save :return: id of the created record """ Logger.__call__().get_logger().debug("Installing a linear-threshold stopping policy in the metastore") with psycopg.connect(f"{constants.METADATA_STORE.DB_NAME_PROPERTY}={constants.METADATA_STORE.DBNAME} " f"{constants.METADATA_STORE.USER_PROPERTY}={constants.METADATA_STORE.USER} " f"{constants.METADATA_STORE.PW_PROPERTY}={constants.METADATA_STORE.PASSWORD} " f"{constants.METADATA_STORE.HOST_PROPERTY}={constants.METADATA_STORE.HOST}") as conn: with conn.cursor() as cur: # Need to manually set the ID since CITUS does not handle serial columns # on distributed tables properly id = GeneralUtil.get_latest_table_id( cur=cur, table_name=constants.METADATA_STORE.LINEAR_THRESHOLD_STOPPING_POLICIES_TABLE) policy_json_str = json.dumps(linear_threshold_stopping_policy.to_dict(), indent=4, sort_keys=True, cls=NpEncoder) cur.execute(f"INSERT INTO {constants.METADATA_STORE.LINEAR_THRESHOLD_STOPPING_POLICIES_TABLE} " f"(id, policy, simulation_name) " f"VALUES (%s, %s, %s) RETURNING id", (id, policy_json_str, linear_threshold_stopping_policy.simulation_name)) record = cur.fetchone() id_of_new_row = None if record is not None: id_of_new_row = int(record[0]) conn.commit() Logger.__call__().get_logger().debug("Linear-threshold policy saved successfully") return id_of_new_row