import logging
from typing import List
import grpc
import time
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.snort_managers_info import SnortIdsManagersInfo
import csle_common.constants.constants as constants
import csle_collector.constants.constants as csle_collector_constants
import csle_collector.snort_ids_manager.snort_ids_manager_pb2_grpc
import csle_collector.snort_ids_manager.snort_ids_manager_pb2
import csle_collector.snort_ids_manager.query_snort_ids_manager
import csle_collector.snort_ids_manager.snort_ids_manager_util
from csle_common.util.emulation_util import EmulationUtil
class SnortIDSController:
    """
    Class managing operations related to Snort IDS Managers

    All methods are static. The Snort IDS containers of an emulation are identified by checking whether any
    entry of constants.CONTAINER_IMAGES.SNORT_IDS_IMAGES occurs in the container name. The managers are
    reached over SSH (to start/stop the manager process) and over gRPC (to control and query the IDS and
    its monitor thread).
    """

    @staticmethod
    def _snort_ids_ips_on_host(emulation_env_config: EmulationEnvConfig, physical_server_ip: str) -> List[str]:
        """
        Collects the docker gateway bridge IPs of the Snort IDS containers hosted on a given physical server

        The IPs are returned in container order. An IP is emitted once per entry of
        constants.CONTAINER_IMAGES.SNORT_IDS_IMAGES that occurs in the container name, i.e. a container
        whose name matches several entries is listed several times.

        :param emulation_env_config: the emulation env configuration
        :param physical_server_ip: the ip of the physical server
        :return: the list of IPs
        """
        ips = []
        for c in emulation_env_config.containers_config.containers:
            if c.physical_host_ip != physical_server_ip:
                continue
            for ids_image in constants.CONTAINER_IMAGES.SNORT_IDS_IMAGES:
                if ids_image in c.name:
                    ips.append(c.docker_gw_bridge_ip)
        return ips

    @staticmethod
    def start_snort_idses(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                          logger: logging.Logger) -> None:
        """
        Utility function for starting the Snort IDSes

        :param emulation_env_config: the emulation env configuration
        :param physical_server_ip: the ip of the physical server
        :param logger: the logger to use for logging
        :return: None
        """
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            logger.info(f"Starting the Snort IDS on IP: {ip}")
            SnortIDSController.start_snort_ids(emulation_env_config=emulation_env_config, ip=ip, logger=logger)

    @staticmethod
    def stop_snort_idses(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                         logger: logging.Logger) -> None:
        """
        Utility function for stopping the Snort IDSes

        :param emulation_env_config: the emulation env configuration
        :param physical_server_ip: the ip of the physical server
        :param logger: the logger to use for logging
        :return: None
        """
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            SnortIDSController.stop_snort_ids(emulation_env_config=emulation_env_config, ip=ip, logger=logger)

    @staticmethod
    def start_snort_ids(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
        """
        Utility function for starting the Snort IDS on a specific IP

        Makes sure the Snort IDS manager is running on the node, queries its status, and sends the start
        request only if the manager reports that the IDS is not running.

        :param emulation_env_config: the emulation env configuration
        :param ip: the ip of the container
        :param logger: the logger to use for logging
        :return: None
        """
        SnortIDSController.start_snort_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
        port = emulation_env_config.snort_ids_manager_config.snort_ids_manager_port
        ids_monitor_dto = SnortIDSController.get_snort_idses_monitor_threads_statuses_by_ip_and_port(
            port=port, ip=ip)
        if ids_monitor_dto.snort_ids_running:
            return
        logger.info(f"Snort IDS is not running on {ip}, starting it.")
        subnetmask = f"{emulation_env_config.execution_id}.{emulation_env_config.level}" \
                     f"{constants.CSLE.CSLE_LEVEL_SUBNETMASK_SUFFIX}"
        ingress_interface = constants.NETWORKING.ETH2
        egress_interface = constants.NETWORKING.ETH0
        logger.info(f"Subnetmask: {subnetmask}, ingress interface: {ingress_interface}, "
                    f"egress interface: {egress_interface}")
        # Open a gRPC session
        with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
            stub = csle_collector.snort_ids_manager.snort_ids_manager_pb2_grpc.SnortIdsManagerStub(channel)
            csle_collector.snort_ids_manager.query_snort_ids_manager.start_snort_ids(
                stub=stub, ingress_interface=ingress_interface, egress_interface=egress_interface,
                subnetmask=subnetmask)

    @staticmethod
    def stop_snort_ids(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
        """
        Utility function for stopping the Snort IDS on a specific IP

        The Snort IDS manager is started first (if it is not already running) since the stop request is
        sent through it. The request is only sent if the manager reports that the IDS is running.

        :param emulation_env_config: the emulation env configuration
        :param ip: the ip of the container
        :param logger: the logger to use for logging
        :return: None
        """
        SnortIDSController.start_snort_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
        port = emulation_env_config.snort_ids_manager_config.snort_ids_manager_port
        ids_monitor_dto = SnortIDSController.get_snort_idses_monitor_threads_statuses_by_ip_and_port(
            port=port, ip=ip)
        if not ids_monitor_dto.snort_ids_running:
            return
        logger.info(f"Snort IDS is running on {ip}, stopping it.")
        # Open a gRPC session
        with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
            stub = csle_collector.snort_ids_manager.snort_ids_manager_pb2_grpc.SnortIdsManagerStub(channel)
            csle_collector.snort_ids_manager.query_snort_ids_manager.stop_snort_ids(stub=stub)

    @staticmethod
    def start_snort_managers(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                             logger: logging.Logger) -> None:
        """
        Utility function for starting snort IDS managers

        :param emulation_env_config: the emulation env config
        :param physical_server_ip: the physical server ip
        :param logger: the logger to use for logging
        :return: None
        """
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            SnortIDSController.start_snort_manager(emulation_env_config=emulation_env_config, ip=ip,
                                                   logger=logger)

    @staticmethod
    def start_snort_manager(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
        """
        Utility function for starting the snort IDS manager on a specific IP

        Connects to the node, lists the processes matching the manager file name and, unless
        constants.COMMANDS.SEARCH_SNORT_IDS_MANAGER is found in the output, kills any old manager job,
        launches a new one, and sleeps for 2 seconds.

        :param emulation_env_config: the emulation env config
        :param ip: IP of the container
        :param logger: the logger to use for logging
        :return: None
        """
        # Connect
        EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip)

        # Check if ids_manager is already running
        cmd = (constants.COMMANDS.PS_AUX + " | " + constants.COMMANDS.GREP + constants.COMMANDS.SPACE_DELIM +
               constants.TRAFFIC_COMMANDS.SNORT_IDS_MANAGER_FILE_NAME)
        o, _, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))
        if constants.COMMANDS.SEARCH_SNORT_IDS_MANAGER in str(o):
            return

        logger.info(f"Starting Snort IDS manager on node {ip}")

        # Stop old background job if running
        cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL +
               constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.SNORT_IDS_MANAGER_FILE_NAME)
        EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))

        # Start the ids_manager
        manager_config = emulation_env_config.snort_ids_manager_config
        cmd = constants.COMMANDS.START_SNORT_IDS_MANAGER.format(
            manager_config.snort_ids_manager_port,
            manager_config.snort_ids_manager_log_dir,
            manager_config.snort_ids_manager_log_file,
            manager_config.snort_ids_manager_max_workers)
        EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))
        # Pause after launching; presumably to let the manager come up before it is queried
        time.sleep(2)

    @staticmethod
    def stop_snort_managers(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                            logger: logging.Logger) -> None:
        """
        Utility function for stopping snort IDS managers

        :param emulation_env_config: the emulation env config
        :param physical_server_ip: the physical server ip
        :param logger: the logger to use for logging
        :return: None
        """
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            SnortIDSController.stop_snort_manager(emulation_env_config=emulation_env_config, ip=ip,
                                                  logger=logger)

    @staticmethod
    def stop_snort_manager(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
        """
        Utility function for stopping the snort IDS manager on a specific IP

        Connects to the node, kills the manager process with pkill, and sleeps for 2 seconds.

        :param emulation_env_config: the emulation env config
        :param ip: the IP of the container
        :param logger: the logger to use for logging
        :return: None
        """
        # Connect
        EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip)

        logger.info(f"Stopping Snort IDS manager on node {ip}")
        cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL +
               constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.SNORT_IDS_MANAGER_FILE_NAME)
        EmulationUtil.execute_ssh_cmd(cmd=cmd, conn=emulation_env_config.get_connection(ip=ip))
        time.sleep(2)

    @staticmethod
    def start_snort_idses_monitor_threads(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                                          logger: logging.Logger) -> None:
        """
        A method that sends a request to the SnortIDSManager on every container that runs
        an IDS to start the IDS manager and the monitor thread

        :param emulation_env_config: the emulation env config
        :param physical_server_ip: the ip of the physical server
        :param logger: the logger to use for logging
        :return: None
        """
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            logger.info(f"Starting Snort IDS monitor thread on IP: {ip}")
            SnortIDSController.start_snort_idses_monitor_thread(emulation_env_config=emulation_env_config,
                                                                ip=ip, logger=logger)

    @staticmethod
    def start_snort_idses_monitor_thread(emulation_env_config: EmulationEnvConfig, ip: str,
                                         logger: logging.Logger) -> None:
        """
        A method that sends a request to the SnortIDSManager on a specific container that runs
        an IDS to start the IDS manager and the monitor thread

        The start request is only sent if the manager reports that the monitor thread is not running.

        :param emulation_env_config: the emulation env config
        :param ip: the ip of the container
        :param logger: the logger to use for logging
        :return: None
        """
        SnortIDSController.start_snort_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
        port = emulation_env_config.snort_ids_manager_config.snort_ids_manager_port
        ids_monitor_dto = SnortIDSController.get_snort_idses_monitor_threads_statuses_by_ip_and_port(
            port=port, ip=ip)
        if ids_monitor_dto.monitor_running:
            return
        logger.info(f"Snort IDS monitor thread is not running on {ip}, starting it.")
        # Open a gRPC session
        with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
            stub = csle_collector.snort_ids_manager.snort_ids_manager_pb2_grpc.SnortIdsManagerStub(channel)
            csle_collector.snort_ids_manager.query_snort_ids_manager.start_snort_ids_monitor(
                stub=stub, kafka_ip=emulation_env_config.kafka_config.container.get_ips()[0],
                kafka_port=emulation_env_config.kafka_config.kafka_port,
                log_file_path=csle_collector_constants.SNORT_IDS_ROUTER.SNORT_FAST_LOG_FILE,
                time_step_len_seconds=emulation_env_config.snort_ids_manager_config.time_step_len_seconds)

    @staticmethod
    def stop_snort_idses_monitor_threads(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                                         logger: logging.Logger) -> None:
        """
        A method that sends a request to the SnortIDSManager on every container that runs
        an IDS to stop the monitor threads

        :param emulation_env_config: the emulation env config
        :param physical_server_ip: the physical server ip
        :param logger: the logger to use for logging
        :return: None
        """
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            SnortIDSController.stop_snort_idses_monitor_thread(emulation_env_config=emulation_env_config,
                                                               ip=ip, logger=logger)

    @staticmethod
    def stop_snort_idses_monitor_thread(emulation_env_config: EmulationEnvConfig, ip: str,
                                        logger: logging.Logger) -> None:
        """
        A method that sends a request to the SnortIDSManager on a specific container that runs
        an IDS to stop the monitor threads

        Unlike the start counterpart, the stop request is sent without first checking the monitor status.

        :param emulation_env_config: the emulation env config
        :param ip: the IP of the container
        :param logger: the logger to use for logging
        :return: None
        """
        SnortIDSController.start_snort_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
        port = emulation_env_config.snort_ids_manager_config.snort_ids_manager_port
        # Open a gRPC session
        with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
            stub = csle_collector.snort_ids_manager.snort_ids_manager_pb2_grpc.SnortIdsManagerStub(channel)
            logger.info(f"Stopping the Snort IDS monitor thread on {ip}.")
            csle_collector.snort_ids_manager.query_snort_ids_manager.stop_snort_ids_monitor(stub=stub)

    @staticmethod
    def get_snort_idses_monitor_threads_statuses(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
                                                 logger: logging.Logger, start_if_stopped: bool = True) -> \
            List[csle_collector.snort_ids_manager.snort_ids_manager_pb2.SnortIdsMonitorDTO]:
        """
        A method that sends a request to the SnortIDSManager on every container to get the status of the
        IDS monitor thread

        :param emulation_env_config: the emulation config
        :param physical_server_ip: the physical server IP
        :param logger: the logger to use for logging
        :param start_if_stopped: whether to start the Snort IDS managers (if they are not running) before
                                 querying them
        :return: List of monitor thread statuses
        """
        if start_if_stopped:
            SnortIDSController.start_snort_managers(emulation_env_config=emulation_env_config,
                                                    physical_server_ip=physical_server_ip,
                                                    logger=logger)
        statuses = []
        for ip in SnortIDSController._snort_ids_ips_on_host(emulation_env_config=emulation_env_config,
                                                            physical_server_ip=physical_server_ip):
            status = SnortIDSController.get_snort_idses_monitor_threads_statuses_by_ip_and_port(
                port=emulation_env_config.snort_ids_manager_config.snort_ids_manager_port, ip=ip)
            statuses.append(status)
        return statuses

    @staticmethod
    def get_snort_idses_monitor_threads_statuses_by_ip_and_port(port: int, ip: str) \
            -> csle_collector.snort_ids_manager.snort_ids_manager_pb2.SnortIdsMonitorDTO:
        """
        A method that sends a request to the SnortIDSManager with a specific port and ip
        to get the status of the IDS monitor thread

        :param port: the port of the SnortIDSManager
        :param ip: the ip of the SnortIDSManager
        :return: the status of the SnortIDSManager
        """
        with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
            stub = csle_collector.snort_ids_manager.snort_ids_manager_pb2_grpc.SnortIdsManagerStub(channel)
            return csle_collector.snort_ids_manager.query_snort_ids_manager.get_snort_ids_monitor_status(
                stub=stub)

    @staticmethod
    def get_snort_ids_managers_ips(emulation_env_config: EmulationEnvConfig) -> List[str]:
        """
        A method that extracts the IPs of the snort IDS managers in a given emulation

        Containers on all physical servers are included.

        :param emulation_env_config: the emulation env config
        :return: the list of IP addresses
        """
        ips = []
        for c in emulation_env_config.containers_config.containers:
            for ids_image in constants.CONTAINER_IMAGES.SNORT_IDS_IMAGES:
                if ids_image in c.name:
                    ips.append(c.docker_gw_bridge_ip)
        return ips

    @staticmethod
    def get_snort_idses_managers_ports(emulation_env_config: EmulationEnvConfig) -> List[int]:
        """
        A method that extracts the ports of the snort IDS managers in a given emulation

        Every manager uses the same configured port, so the result holds that port once per entry
        returned by get_snort_ids_managers_ips.

        :param emulation_env_config: the emulation env config
        :return: the list of ports
        """
        # The port is read per entry (not hoisted) so that the manager config is not touched when the
        # emulation has no Snort IDS containers.
        return [emulation_env_config.snort_ids_manager_config.snort_ids_manager_port
                for _ in SnortIDSController.get_snort_ids_managers_ips(emulation_env_config=emulation_env_config)]

    @staticmethod
    def get_snort_managers_info(emulation_env_config: EmulationEnvConfig, active_ips: List[str],
                                logger: logging.Logger, physical_server_ip: str) -> SnortIdsManagersInfo:
        """
        Extracts the information of the Snort managers for a given emulation

        A manager is queried only if its IP is in active_ips and belongs to the given physical server.
        A manager that cannot be reached is reported as not running, with an empty status DTO.

        NOTE(review): the returned ips and ports cover every Snort IDS container of the emulation, whereas
        the statuses and running flags only cover the queried ones, so the lists can differ in length.
        Confirm that consumers of SnortIdsManagersInfo expect this.

        :param emulation_env_config: the configuration of the emulation
        :param active_ips: list of active IPs
        :param logger: the logger to use for logging
        :param physical_server_ip: the IP of the physical server
        :return: a DTO with the status of the Snort managers
        """
        snort_ids_managers_ips = SnortIDSController.get_snort_ids_managers_ips(
            emulation_env_config=emulation_env_config)
        snort_ids_managers_ports = \
            SnortIDSController.get_snort_idses_managers_ports(emulation_env_config=emulation_env_config)
        snort_managers_statuses = []
        snort_managers_running = []
        for ip in snort_ids_managers_ips:
            if ip not in active_ips or not EmulationUtil.physical_ip_match(
                    emulation_env_config=emulation_env_config, ip=ip, physical_host_ip=physical_server_ip):
                continue
            running = False
            status = None
            try:
                status = SnortIDSController.get_snort_idses_monitor_threads_statuses_by_ip_and_port(
                    port=emulation_env_config.snort_ids_manager_config.snort_ids_manager_port, ip=ip)
                running = True
            except Exception as e:
                # Best effort: an unreachable manager is reported as not running instead of failing the
                # whole info request.
                logger.debug(
                    f"Could not fetch Snort IDS manager status on IP:{ip}, error: {str(e)}, {repr(e)}")
            if status is None:
                util = csle_collector.snort_ids_manager.snort_ids_manager_util.SnortIdsManagerUtil
                status = util.snort_ids_monitor_dto_empty()
            snort_managers_statuses.append(status)
            snort_managers_running.append(running)
        return SnortIdsManagersInfo(
            snort_ids_managers_running=snort_managers_running, ips=snort_ids_managers_ips,
            ports=snort_ids_managers_ports, execution_id=emulation_env_config.execution_id,
            emulation_name=emulation_env_config.name,
            snort_ids_managers_statuses=snort_managers_statuses)