A framework for building self-learning security systems.

What is CSLE?

CSLE is a framework for evaluating and developing reinforcement learning agents for control problems in cyber security. Everything in CSLE, from network emulation to simulation and learning, has been co-designed to provide an environment where it is possible to train and evaluate reinforcement learning agents on practical cyber security tasks. CSLE is an opinionated framework based on a specific method for learning and evaluating security strategies for a given IT infrastructure (see Fig. 1).

Figure 1: The method used to automatically find effective security strategies in CSLE.

This method includes two systems: an emulation system and a simulation system. The emulation system closely approximates the functionality of the target infrastructure and is used to run attack scenarios and defender responses. Such runs produce system measurements and logs, from which infrastructure statistics are estimated; these statistics are then used to instantiate Markov decision processes (MDPs).

The simulation system is used to simulate the instantiated MDPs and to learn security strategies through reinforcement learning. Learned strategies are extracted from the simulation system and evaluated in the emulation system.

Three benefits of this method are: (i) that the emulation system provides a realistic environment to evaluate strategies; (ii) that the emulation system allows evaluating strategies without affecting operational workflows on the target infrastructure; and (iii) that the simulation system enables efficient and rapid learning of strategies.


Why CSLE?

As the ubiquity and evolving nature of cyber attacks are of growing concern to society, automation of security processes and functions has been recognized as an important part of the response to this threat. A promising approach to achieving this automation is reinforcement learning, which has proven effective in finding near-optimal solutions to control problems in several domains (e.g., robotics and industrial automation) and is actively investigated as an approach to automated security (see survey). While encouraging results have been obtained in this line of research, key challenges remain. Chief among them is narrowing the gap between the environment in which the reinforcement learning agents are evaluated and a scenario playing out in a real system. Most of the results obtained so far are limited to simulation environments, and it is not clear how they generalize to practical IT infrastructures. Another limitation of prior research is the lack of common benchmarks and toolsets.

CSLE was developed to address precisely the above limitations. By using high-fidelity emulations, it narrows the gap between the evaluation environment and a real system, and by being open-source, it provides a foundation for further research to build on.

Recently, efforts to build frameworks similar to CSLE have started (see survey). Most notable are CyberBattleSim by Microsoft, CybORG by the Australian Department of Defence, Yawning Titan by the UK Defence Science and Technology Laboratory (DSTL), and FARLAND, which is developed at the US National Security Agency (NSA). Some of these frameworks only include simulation components and some of them include both simulation and emulation components. In contrast to these frameworks, CSLE is fully open-source, includes both a simulation component and an emulation component, and has demonstrated the capability to learn near-optimal defender strategies on specific use cases (see publications).


Installation & Usage

CSLE is available for download on GitHub. The Python APIs are available on PyPI and can be installed using pip. The Docker containers are available on Docker Hub. Detailed installation instructions can be found in the documentation.

Examples of the four main usages of CSLE are given below: (1) strategy training; (2) data collection; (3) strategy evaluation; and (4) system identification.

Additional examples can be found in the documentation, and a video demonstration is available on YouTube.
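The first example shows strategy training: a PPO agent is set up with an emulation environment (used for evaluation) and a simulation environment (used for training), and the learned defender policies are saved to the metastore together with the experiment results.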

                # Imports
                import csle_common.constants.constants as constants
                from csle_common.dao.training.experiment_config import ExperimentConfig
                from csle_common.metastore.metastore_facade import MetastoreFacade
                from csle_common.dao.training.agent_type import AgentType
                from csle_common.dao.training.hparam import HParam
                from csle_common.dao.training.player_type import PlayerType
                from csle_agents.agents.ppo.ppo_agent import PPOAgent
                import csle_agents.constants.constants as agents_constants
                from csle_common.dao.training.tabular_policy import TabularPolicy

                # Select emulation configuration from the metastore
                emulation_env_config = MetastoreFacade.get_emulation_by_name("csle-level9-010")

                # Select simulation configuration from the metastore
                simulation_env_config = MetastoreFacade.get_simulation_by_name(
                                                   "csle-stopping-pomdp-defender-010")

                # Setup the reinforcement learning experiment
                experiment_config = ExperimentConfig(
                                  output_dir=f"{constants.LOGGING.DEFAULT_LOG_DIR}ppo_test",
                                  title="PPO test",
                                  random_seeds=[399, 98912, 999], agent_type=AgentType.PPO,
                                  log_every=1, hparams={..},
                                  player_type=PlayerType.DEFENDER, player_idx=0)
                agent = PPOAgent(emulation_env_config=emulation_env_config,
                                 simulation_env_config=simulation_env_config,
                                 experiment_config=experiment_config)

                # Run the PPO algorithm to learn defender policies
                experiment_execution = agent.train()

                # Save the experiment results and the learned policies
                MetastoreFacade.save_experiment_execution(experiment_execution)
                for policy in experiment_execution.result.policies.values():
                    MetastoreFacade.save_ppo_policy(ppo_policy=policy)
                
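The second example shows data collection: attacker and defender action sequences are executed repeatedly in a running emulation, and the resulting traces and statistics are recorded in the metastore.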
                # Imports
                import csle_common.constants.constants as constants
                from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
                from csle_common.dao.emulation_action.defender.emulation_defender_action import EmulationDefenderAction
                from csle_common.dao.emulation_action.attacker.emulation_attacker_stopping_actions \
                    import EmulationAttackerStoppingActions
                from csle_common.dao.emulation_action.defender.emulation_defender_stopping_actions \
                    import EmulationDefenderStoppingActions
                from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
                from csle_common.metastore.metastore_facade import MetastoreFacade
                from csle_common.controllers.container_controller import ContainerController
                from csle_system_identification.emulator import Emulator

                # Select an emulation execution
                executions = MetastoreFacade.list_emulation_executions_for_a_given_emulation(
                                            emulation_name="csle-level9-030")
                emulation_env_config = executions[0].emulation_env_config

                # Define attacker and defender sequences for the traces
                attacker_sequence = ...
                defender_sequence = ...

                # Run the sequences
                Emulator.run_action_sequences(
                        emulation_env_config=emulation_env_config,
                        attacker_sequence=attacker_sequence,
                        defender_sequence=defender_sequence, repeat_times=5000,
                        sleep_time=15,
                        descr="Intrusion data collected against novice attacker",
                        save_emulation_traces_every=1,
                        intrusion_start_p=0.2,
                        intrusion_continue=1)

                # Extract recorded traces and statistics
                statistics = MetastoreFacade.list_emulation_statistics()
                traces = MetastoreFacade.list_emulation_traces()
                
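The third example shows strategy evaluation: a threshold-based defender policy is evaluated in the emulation environment through the gym interface of the stopping-game POMDP.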
                # Imports
                import gymnasium as gym
                import csle_common.constants.constants as constants
                from csle_common.metastore.metastore_facade import MetastoreFacade
                from csle_common.dao.training.multi_threshold_stopping_policy import MultiThresholdStoppingPolicy
                from gym_csle_stopping_game.envs.stopping_game_pomdp_defender_env import StoppingGamePomdpDefenderEnv
                from csle_common.dao.training.player_type import PlayerType
                from csle_common.dao.training.agent_type import AgentType

                # Select emulation to be used as evaluation environment
                emulation_env_config = MetastoreFacade.get_emulation_by_name("..")

                # Select simulation environment
                simulation_env_config = MetastoreFacade.get_simulation_by_name("..")
                config = simulation_env_config.simulation_env_input_config
                env = gym.make(simulation_env_config.gym_env_name, config=config)

                # Define the security policy to evaluate
                tspsa_policy = MultiThresholdStoppingPolicy(..)

                # Perform the evaluation
                StoppingGamePomdpDefenderEnv.emulation_evaluation(
                           env=env, n_episodes=10, intrusion_seq=[..],
                           defender_policy=tspsa_policy,
                           emulation_env_config=emulation_env_config,
                           simulation_env_config=simulation_env_config)
                
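The fourth example shows system identification: a Gaussian mixture model is fitted to an emulation statistic with the expectation-maximization algorithm, yielding a system model that can be used to instantiate simulations.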
                # Imports
                import csle_common.constants.constants as constants
                from csle_common.dao.system_identification.system_identification_config import SystemIdentificationConfig
                from csle_common.metastore.metastore_facade import MetastoreFacade
                from csle_common.dao.system_identification.system_model_type import SystemModelType
                from csle_common.dao.training.hparam import HParam
                from csle_system_identification.expectation_maximization.expectation_maximization_algorithm \
                    import ExpectationMaximizationAlgorithm
                import csle_system_identification.constants.constants as system_identification_constants

                # Select emulation configuration from metastore
                emulation_env_config = MetastoreFacade.get_emulation_by_name("csle-level9-030")

                # Select emulation statistic (input data) from metastore
                emulation_statistic = MetastoreFacade.get_emulation_statistic(id=1)

                # Setup the system identification algorithm
                system_identification_config = SystemIdentificationConfig(
                                                 output_dir="..",
                                                 title="Expectation-Maximization level 9 test",
                                                 model_type=SystemModelType.GAUSSIAN_MIXTURE,
                                                 log_every=1,
                                                 hparams={..})
                algorithm = ExpectationMaximizationAlgorithm(
                                emulation_env_config=emulation_env_config,
                                emulation_statistics=emulation_statistic,
                                system_identification_config=system_identification_config)

                # Run the algorithm
                system_model = algorithm.fit()

                # Save the result to the metastore
                MetastoreFacade.save_gaussian_mixture_system_model(gaussian_mixture_system_model=system_model)
                

Management System

The management system is the central component of CSLE and manages the overall execution of the framework. It is a distributed system that consists of N >= 1 physical servers connected through an IP network. One of the servers is designated the "leader" and the other servers are "workers". The management system can be used to monitor emulations in real time, to start or stop services, to monitor reinforcement learning workloads, to access terminals of emulated components, and to examine security policies.
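The state of the framework can also be inspected programmatically. The sketch below reuses the metastore API from the examples above to list executions, statistics, and traces; it is illustrative only (the emulation name is reused from the examples) and is not the management system's internal implementation.

                # Sketch: programmatic monitoring through the metastore API
                # (illustrative only; the emulation name is reused from the
                # examples above)
                from csle_common.metastore.metastore_facade import MetastoreFacade

                # List the executions of a given emulation
                executions = MetastoreFacade.list_emulation_executions_for_a_given_emulation(
                                            emulation_name="csle-level9-030")
                print(f"number of executions: {len(executions)}")

                # List the recorded statistics and traces
                statistics = MetastoreFacade.list_emulation_statistics()
                traces = MetastoreFacade.list_emulation_traces()
                print(f"statistics: {len(statistics)}, traces: {len(traces)}")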

Emulation System

The emulation system allows emulating large-scale IT infrastructures and network traffic, i.e., client traffic, cyber attacks, and automated defenses. It executes on a cluster of machines that run a virtualization layer provided by Docker containers and virtual links. It implements network isolation and traffic shaping on the containers using network namespaces and the NetEm module of the Linux kernel. Resource constraints of the containers, e.g., CPU and memory constraints, are enforced using cgroups.
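To make these mechanisms concrete, the sketch below shows how a container can be started with cgroup-enforced resource limits and how NetEm can shape its traffic. This is an illustrative example based on the Docker SDK for Python, not CSLE's internal code; the image name and shaping parameters are hypothetical.

                # Illustrative sketch of the mechanisms described above
                # (not CSLE's internal code); requires the Docker SDK for
                # Python (pip install docker). The image name and shaping
                # parameters are hypothetical.
                import docker

                client = docker.from_env()

                # Start a container with cgroup-enforced resource constraints
                container = client.containers.run(
                    "ubuntu:22.04", command="sleep infinity", detach=True,
                    mem_limit="512m",         # memory limit (cgroups)
                    nano_cpus=500_000_000,    # 0.5 CPUs (cgroups)
                    cap_add=["NET_ADMIN"])    # required for traffic shaping

                # Emulate link delay and packet loss with the NetEm module
                # (assumes the tc tool is installed in the container)
                container.exec_run("tc qdisc add dev eth0 root netem delay 50ms loss 0.1%")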

Simulation System

The simulation system of CSLE allows running reinforcement learning and optimization algorithms to learn security strategies. Formally, we model the interaction between an attacker and a defender as a Markov game. We then use simulations of self-play where autonomous agents interact and continuously update their strategies based on experience from previously played games. To automatically update strategies in the game, several methods can be used, including computational game theory, dynamic programming, evolutionary algorithms, and reinforcement learning.
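As a minimal sketch of how the simulation system is driven, the example below rolls out episodes in one of the gym environments registered by CSLE (the simulation name is reused from the examples above; the random policy is just a stand-in for a learned strategy).

                # Minimal sketch: rolling out episodes in a CSLE simulation
                # environment (a random policy stands in for a learned strategy)
                import gymnasium as gym
                from csle_common.metastore.metastore_facade import MetastoreFacade

                simulation_env_config = MetastoreFacade.get_simulation_by_name(
                                                   "csle-stopping-pomdp-defender-010")
                env = gym.make(simulation_env_config.gym_env_name,
                               config=simulation_env_config.simulation_env_input_config)

                for episode in range(10):
                    obs, info = env.reset()
                    done = False
                    cumulative_reward = 0.0
                    while not done:
                        action = env.action_space.sample()  # stand-in policy
                        obs, reward, terminated, truncated, info = env.step(action)
                        cumulative_reward += reward
                        done = terminated or truncated
                    print(f"episode: {episode}, cumulative reward: {cumulative_reward}")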