Source code for csle_system_identification.job_controllers.data_collection_job_manager

import subprocess
import csle_common.constants.constants as constants
from csle_common.dao.jobs.data_collection_job_config import DataCollectionJobConfig
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_common.controllers.container_controller import ContainerController
from csle_system_identification.emulator import Emulator


[docs]class DataCollectionJobManager: """ Class that manages data collection jobs in CSLE """
[docs] @staticmethod def run_data_collection_job(job_config: DataCollectionJobConfig) -> None: """ Runs a given data collection job :param job_config: the configuration of the job :return: None """ emulation_env_config = MetastoreFacade.get_emulation_by_name("csle-level9-030") assert emulation_env_config is not None assert ContainerController.is_emulation_running(emulation_env_config=emulation_env_config) is True em_statistic = MetastoreFacade.get_emulation_statistic(id=job_config.emulation_statistic_id) Emulator.run_action_sequences( emulation_env_config=emulation_env_config, attacker_sequence=job_config.attacker_sequence, defender_sequence=job_config.defender_sequence, repeat_times=job_config.repeat_times, sleep_time=emulation_env_config.kafka_config.time_step_len_seconds, descr=job_config.descr, emulation_statistics=em_statistic, data_collection_job=job_config, save_emulation_traces_every=job_config.save_emulation_traces_every, emulation_traces_to_save_with_data_collection_job=job_config.num_cached_traces)
[docs] @staticmethod def start_data_collection_job_in_background(data_collection_job: DataCollectionJobConfig) \ -> None: """ Starts a system identification job with a given configuration in the background :param data_collection_job: the job configuration :return: None """ cmd = constants.COMMANDS.START_SYSTEM_IDENTIFICATION_JOB.format(data_collection_job.id) p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, shell=True) p.communicate()