import logging
from typing import List, Tuple
import grpc
import time
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.host_managers_info import HostManagersInfo
from csle_common.dao.emulation_config.node_container_config import NodeContainerConfig
import csle_common.constants.constants as constants
import csle_collector.host_manager.host_manager_pb2_grpc
import csle_collector.host_manager.host_manager_pb2
import csle_collector.host_manager.query_host_manager
import csle_collector.host_manager.host_manager_util
from csle_common.util.emulation_util import EmulationUtil
from csle_common.logging.log import Logger
[docs]class HostController:
"""
Class controlling host managers and host specific configurations
"""
[docs] @staticmethod
def start_host_managers(emulation_env_config: EmulationEnvConfig, logger: logging.Logger) -> None:
"""
Utility method for checking if the host manager is running and starting it if it is not running
:param emulation_env_config: the emulation env config
:param logger: the logger to use for logging
:return: None
"""
# Start host managers on emulation containers
for c in emulation_env_config.containers_config.containers:
# Connect
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=c.docker_gw_bridge_ip,
logger=logger)
# Start host manager on kafka container
HostController.start_host_manager(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
logger=logger)
# Start host manager on ELK container
HostController.start_host_manager(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.sdn_controller_config is not None:
# Start host manager on SDN controller container
HostController.start_host_manager(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def start_host_manager(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
Utility method for starting the host manager on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:param logger: the logger to use for logging
:return: None
"""
# Connect
EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip)
# Check if host_manager is already running
cmd = (constants.COMMANDS.PS_AUX + " | " + constants.COMMANDS.GREP +
constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.HOST_MANAGER_FILE_NAME)
o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd,
conn=emulation_env_config.get_connection(ip=ip))
if constants.COMMANDS.SEARCH_HOST_MANAGER not in str(o):
logger.info(f"Host manager is not running on: {ip}, starting it. Output of {cmd} was: {str(o)}, "
f"err output was: {str(e)}")
# Stop old background job if running
cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL +
constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.HOST_MANAGER_FILE_NAME)
o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd,
conn=emulation_env_config.get_connection(ip=ip))
# Start the host_manager
cmd = constants.COMMANDS.START_HOST_MANAGER.format(
emulation_env_config.host_manager_config.host_manager_port,
emulation_env_config.host_manager_config.host_manager_log_dir,
emulation_env_config.host_manager_config.host_manager_log_file,
emulation_env_config.host_manager_config.host_manager_max_workers)
o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd,
conn=emulation_env_config.get_connection(ip=ip))
time.sleep(5)
else:
logger.info(f"Host manager is already running on: {ip}. Output of {cmd} was: {str(o)}, "
f"err output was: {str(e)}")
[docs] @staticmethod
def stop_host_managers(emulation_env_config: EmulationEnvConfig, physical_host_ip: str) -> None:
"""
Utility method for stopping host managers
:param emulation_env_config: the emulation env config
:param physical_host_ip: the ip of the physical host
:return: None
"""
# Stop host manager on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_host_ip:
HostController.stop_host_manager(emulation_env_config=emulation_env_config, ip=c.docker_gw_bridge_ip)
# Stop host manager on Kafka container
if emulation_env_config.kafka_config.container.physical_host_ip == physical_host_ip:
HostController.stop_host_manager(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip)
# Stop host manager on ELK container
if emulation_env_config.elk_config.container.physical_host_ip == physical_host_ip:
HostController.stop_host_manager(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip)
if emulation_env_config.sdn_controller_config is not None:
# Stop host manager on the SDN controller container
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_host_ip:
HostController.stop_host_manager(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip)
[docs] @staticmethod
def stop_host_manager(emulation_env_config: EmulationEnvConfig, ip: str) -> None:
"""
Utility method for stopping the host manager on a specific container
:param emulation_env_config: the emulation env config
:param ip: the ip of the container
:return: None
"""
# Connect
EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=ip)
Logger.__call__().get_logger().info(f"Stopping host manager on node {ip}")
# Stop old background job if running
cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL +
constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.HOST_MANAGER_FILE_NAME)
o, e, _ = EmulationUtil.execute_ssh_cmd(cmd=cmd,
conn=emulation_env_config.get_connection(ip=ip))
[docs] @staticmethod
def start_host_monitor_threads(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and the monitor thread
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Start host monitor on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.start_host_monitor_thread(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Start host monitor on the Kafka container
HostController.start_host_monitor_thread(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Start host monitor on the ELK container
HostController.start_host_monitor_thread(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip, logger=logger)
if emulation_env_config.sdn_controller_config is not None and \
emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Start host monitor on the SDN controller container
HostController.start_host_monitor_thread(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def start_filebeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger,
initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and filebeat
:param emulation_env_config: the emulation env config
:param initial_start: boolean indicating whether this method is called on emulation initialziation or not
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Start filebeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.start_filebeat(emulation_env_config=emulation_env_config,
ips=[c.docker_gw_bridge_ip] + c.get_ips(),
initial_start=initial_start, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Start filebeat on the Kafka container
HostController.start_filebeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.kafka_config.container.docker_gw_bridge_ip] +
emulation_env_config.kafka_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Start filebeat on the ELK container
HostController.start_filebeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.elk_config.container.docker_gw_bridge_ip] +
emulation_env_config.elk_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Start filebeat on the SDN controller container
HostController.start_filebeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip] +
emulation_env_config.sdn_controller_config.container.get_ips()),
initial_start=initial_start, logger=logger)
[docs] @staticmethod
def start_packetbeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger,
initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and packetbeat
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param initial_start: boolean indicating whether this method is called on emulation initialziation or not
:param logger: the logger to use for logging
:return: None
"""
# Start packetbeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.start_packetbeat(
emulation_env_config=emulation_env_config,
ips=[c.docker_gw_bridge_ip] + c.get_ips(),
initial_start=initial_start, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Start packetbeat on the Kafka container
HostController.start_packetbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.kafka_config.container.docker_gw_bridge_ip] +
emulation_env_config.kafka_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Start packetbeat on the ELK container
HostController.start_packetbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.elk_config.container.docker_gw_bridge_ip] +
emulation_env_config.elk_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Start packetbeat on the SDN controller container
HostController.start_packetbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip] +
emulation_env_config.sdn_controller_config.container.get_ips()),
initial_start=initial_start, logger=logger)
[docs] @staticmethod
def start_metricbeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger,
initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and metricbeat
:param emulation_env_config: the emulation env config
:param initial_start: boolean indicating whether this method is called on emulation initialization or not
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Start packetbeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.start_metricbeat(
emulation_env_config=emulation_env_config,
ips=[c.docker_gw_bridge_ip] + c.get_ips(),
initial_start=initial_start, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Start metricbeat on the Kafka container
HostController.start_metricbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.kafka_config.container.docker_gw_bridge_ip] +
emulation_env_config.kafka_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Start metricbeat on the ELK container
HostController.start_metricbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.elk_config.container.docker_gw_bridge_ip] +
emulation_env_config.elk_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Start metricbeat on the SDN controller container
HostController.start_metricbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip] +
emulation_env_config.sdn_controller_config.container.get_ips()),
initial_start=initial_start, logger=logger)
[docs] @staticmethod
def start_heartbeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger,
initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and heartbeat
:param emulation_env_config: the emulation env config
:param initial_start: boolean indicating whether this method is called on emulation initialization or not
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Start heartbeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.start_heartbeat(
emulation_env_config=emulation_env_config,
ips=[c.docker_gw_bridge_ip] + c.get_ips(),
initial_start=initial_start, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Start heartbeat on the Kafka container
HostController.start_heartbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.kafka_config.container.docker_gw_bridge_ip] +
emulation_env_config.kafka_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Start heartbeat on the ELK container
HostController.start_heartbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.elk_config.container.docker_gw_bridge_ip] +
emulation_env_config.elk_config.container.get_ips()),
initial_start=initial_start, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Start heartbeat on the SDN controller container
HostController.start_heartbeat(
emulation_env_config=emulation_env_config,
ips=([emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip] +
emulation_env_config.sdn_controller_config.container.get_ips()),
initial_start=initial_start, logger=logger)
[docs] @staticmethod
def stop_filebeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to stop filebeat
:param emulation_env_config: the emulation env config
:param physical_server_ip: the physical host ip
:param logger: the logger to use for logging
:return: None
"""
# Stop filebeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_server_ip:
HostController.stop_filebeat(emulation_env_config=emulation_env_config, ip=c.docker_gw_bridge_ip,
logger=logger)
# Stop filebeat on the kafka container
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
HostController.stop_filebeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
logger=logger)
# Stop filebeat on the ELK container
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
HostController.stop_filebeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.sdn_controller_config is not None:
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Stop filebeat on the SDN controller container
HostController.stop_filebeat(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip,
logger=logger)
[docs] @staticmethod
def stop_packetbeats(emulation_env_config: EmulationEnvConfig, logger: logging.Logger,
physical_server_ip: str) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to stop packetbeat
:param emulation_env_config: the emulation env config
:param logger: the logger to use for logging
:param physical_server_ip: the of of the physical host
:return: None
"""
# Stop packetbeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_server_ip:
HostController.stop_packetbeat(emulation_env_config=emulation_env_config, ip=c.docker_gw_bridge_ip,
logger=logger)
# Stop packetbeat on the kafka container
if physical_server_ip == emulation_env_config.kafka_config.container.physical_host_ip:
HostController.stop_packetbeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
logger=logger)
# Stop packetbeat on the ELK container
if physical_server_ip == emulation_env_config.elk_config.container.physical_host_ip:
HostController.stop_packetbeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.sdn_controller_config is not None:
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Stop packetbeat on the SDN controller container
HostController.stop_packetbeat(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip,
logger=logger)
[docs] @staticmethod
def stop_metricbeats(emulation_env_config: EmulationEnvConfig, logger: logging.Logger,
physical_server_ip: str) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to stop metricbeat
:param emulation_env_config: the emulation env config
:param logger: the logger to use for logging
:param physical_server_ip: the physical server ip
:return: None
"""
# Stop metricbeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_server_ip:
HostController.stop_metricbeat(emulation_env_config=emulation_env_config, ip=c.docker_gw_bridge_ip,
logger=logger)
# Stop metricbeat on the kafka container
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
HostController.stop_metricbeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
logger=logger)
# Stop metricbeat on the ELK container
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
HostController.stop_metricbeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.sdn_controller_config is not None:
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Stop metricbeat on the SDN controller container
HostController.stop_metricbeat(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def stop_heartbeats(emulation_env_config: EmulationEnvConfig, logger: logging.Logger,
physical_server_ip: str) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to stop heartbeat
:param emulation_env_config: the emulation env config
:param logger: the logger to use for logging
:param physical_server_ip: the physical server ip
:return: None
"""
# Stop heartbeat on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_server_ip:
HostController.stop_heartbeat(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
# Stop heartbeat on the kafka container
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
HostController.stop_heartbeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
logger=logger)
# Stop heartbeat on the ELK container
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
HostController.stop_heartbeat(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.sdn_controller_config is not None:
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Stop heartbeat on the SDN controller container
HostController.stop_heartbeat(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip,
logger=logger)
[docs] @staticmethod
def config_filebeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to setup the configuration of filebeat
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Configure filebeat on the emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.config_filebeat(emulation_env_config=emulation_env_config, container=c, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Configure filebeat on the kafka container
HostController.config_filebeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.kafka_config.container, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Configure filebeat on the ELK container
HostController.config_filebeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.elk_config.container, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Configure filebeat on the SDN controller container
HostController.config_filebeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.sdn_controller_config.container,
logger=logger)
[docs] @staticmethod
def config_packetbeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to setup the configuration of packetbeat
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Configure packetbeat on the emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.config_packetbeat(emulation_env_config=emulation_env_config, container=c, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Configure packetbeat on the kafka container
HostController.config_packetbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.kafka_config.container, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Configure packetbeat on the ELK container
HostController.config_packetbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.elk_config.container, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Configure packetbeat on the SDN controller container
HostController.config_packetbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.sdn_controller_config.container,
logger=logger)
[docs] @staticmethod
def config_metricbeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to setup the configuration of metricbeat
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Configure metricbeat on the emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.config_metricbeat(emulation_env_config=emulation_env_config, container=c, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Configure metricbeat on the kafka container
HostController.config_metricbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.kafka_config.container, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Configure metricbeat on the ELK container
HostController.config_metricbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.elk_config.container, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Configure metricbeat on the SDN controller container
HostController.config_metricbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.sdn_controller_config.container,
logger=logger)
[docs] @staticmethod
def config_heartbeats(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on every container
to start the Host manager and to setup the configuration of heartbeat
:param emulation_env_config: the emulation env config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: None
"""
# Configure heartbeat on the emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
HostController.config_heartbeat(emulation_env_config=emulation_env_config, container=c, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Configure heartbeat on the kafka container
HostController.config_heartbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.kafka_config.container, logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Configure heartbeat on the ELK container
HostController.config_heartbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.elk_config.container, logger=logger)
if emulation_env_config.sdn_controller_config is not None \
and emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Configure heartbeat on the SDN controller container
HostController.config_heartbeat(emulation_env_config=emulation_env_config,
container=emulation_env_config.sdn_controller_config.container,
logger=logger)
[docs] @staticmethod
def start_host_monitor_thread(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to start the Host manager and the monitor thread
:param emulation_env_config: the emulation env config
:param ip: IP of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=ip, port=emulation_env_config.host_manager_config.host_manager_port)
if not host_monitor_dto.monitor_running:
logger.info(f"Host monitor thread is not running on {ip}, starting it.")
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_host_monitor(
stub=stub, kafka_ip=emulation_env_config.kafka_config.container.get_ips()[0],
kafka_port=emulation_env_config.kafka_config.kafka_port,
time_step_len_seconds=emulation_env_config.kafka_config.time_step_len_seconds)
[docs] @staticmethod
def start_filebeat(emulation_env_config: EmulationEnvConfig, ips: List[str], logger: logging.Logger,
initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to start the Host manager and filebeat
:param emulation_env_config: the emulation env config
:param ips: IP of the container
:param initial_start: boolean indicating whether this method is called on emulation initialization or not
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ips[0], logger=logger)
if initial_start:
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=ips)
if node_beats_config is None or not node_beats_config.start_filebeat_automatically:
return
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=ips[0], port=emulation_env_config.host_manager_config.host_manager_port)
if not host_monitor_dto.filebeat_running:
logger.info(f"Filebeat is not running on {ips[0]}, starting it.")
# Open a gRPC session
with grpc.insecure_channel(
f'{ips[0]}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_filebeat(stub=stub)
[docs] @staticmethod
def start_packetbeat(emulation_env_config: EmulationEnvConfig, ips: List[str],
logger: logging.Logger, initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to start the Host manager and packetbeat
:param emulation_env_config: the emulation env config
:param ips: IP of the container
:param initial_start: boolean indicating whether this method is called on emulation initialization or not
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ips[0], logger=logger)
if initial_start:
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=ips)
if node_beats_config is None or not node_beats_config.start_packetbeat_automatically:
return
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=ips[0], port=emulation_env_config.host_manager_config.host_manager_port)
if not host_monitor_dto.packetbeat_running:
logger.info(
f"Packetbeat is not running on {ips[0]}, starting it.")
# Open a gRPC session
with grpc.insecure_channel(
f'{ips[0]}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_packetbeat(stub=stub)
[docs] @staticmethod
def start_metricbeat(emulation_env_config: EmulationEnvConfig, ips: List[str],
logger: logging.Logger, initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to start the Host manager and metricbeat
:param emulation_env_config: the emulation env config
:param ips: IP of the container
:param initial_start: boolean indicating whether this method is called on emulation initialization or not
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ips[0], logger=logger)
if initial_start:
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=ips)
if node_beats_config is None or not node_beats_config.start_metricbeat_automatically:
return
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=ips[0], port=emulation_env_config.host_manager_config.host_manager_port)
if not host_monitor_dto.metricbeat_running:
logger.info(
f"Metricbeat is not running on {ips[0]}, starting it.")
# Open a gRPC session
with grpc.insecure_channel(
f'{ips[0]}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_metricbeat(stub=stub)
[docs] @staticmethod
def start_heartbeat(emulation_env_config: EmulationEnvConfig, ips: List[str],
logger: logging.Logger,
initial_start: bool = False) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to start the Host manager and heartbeat
:param emulation_env_config: the emulation env config
:param ips: IPs of the container
:param initial_start: boolean indicating whether this method is called on emulation initialization or not
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ips[0], logger=logger)
if initial_start:
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=ips)
if node_beats_config is None or not node_beats_config.start_heartbeat_automatically:
return
host_monitor_dto = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=ips[0], port=emulation_env_config.host_manager_config.host_manager_port)
if not host_monitor_dto.heartbeat_running:
logger.info(
f"Heartbeat is not running on {ips[0]}, starting it.")
# Open a gRPC session
with grpc.insecure_channel(
f'{ips[0]}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_heartbeat(stub=stub)
[docs] @staticmethod
def start_sparks(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) -> None:
"""
Utility function for starting Spark on compute nodes
:param emulation_config: the emulation env configuration
:param physical_server_ip: the ip of the phsyical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.SPARK_IMAGES:
if ids_image in c.name:
logger.info(f"Starting Spark on IP: {c.docker_gw_bridge_ip}")
HostController.start_spark(emulation_env_config=emulation_env_config, ips=[c.docker_gw_bridge_ip],
logger=logger)
[docs] @staticmethod
def stop_sparks(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) -> None:
"""
Utility function for stopping Spark on compute nodes
:param emulation_config: the emulation env configuration
:param physical_server_ip: the ip of the phsyical server
:param logger: the logger to use for logging
:return: None
"""
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip != physical_server_ip:
continue
for ids_image in constants.CONTAINER_IMAGES.SPARK_IMAGES:
if ids_image in c.name:
logger.info(f"Stopping Spark on IP: {c.docker_gw_bridge_ip}")
HostController.stop_spark(emulation_env_config=emulation_env_config,
ips=[c.docker_gw_bridge_ip], logger=logger)
[docs] @staticmethod
def start_spark(emulation_env_config: EmulationEnvConfig, ips: List[str], logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to start the Host manager and spark
:param emulation_env_config: the emulation env config
:param ips: IPs of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ips[0], logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ips[0]}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.start_spark(stub=stub)
[docs] @staticmethod
def stop_spark(emulation_env_config: EmulationEnvConfig, ips: List[str], logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific IP
to stop spark
:param emulation_env_config: the emulation env config
:param ips: IPs of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ips[0], logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ips[0]}:{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
csle_collector.host_manager.query_host_manager.stop_spark(stub=stub)
[docs] @staticmethod
def config_filebeat(emulation_env_config: EmulationEnvConfig, container: NodeContainerConfig,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container
to setup the filebeat configuration
:param emulation_env_config: the emulation env config
:param container: the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=container.docker_gw_bridge_ip,
logger=logger)
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=container.get_ips())
if node_beats_config is None:
return
kafka_topics = list(map(lambda topic: topic.name, emulation_env_config.kafka_config.topics))
# Open a gRPC session
with grpc.insecure_channel(
f'{container.docker_gw_bridge_ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Configuring filebeat on {container.docker_gw_bridge_ip}, {container.get_full_name()}, "
f"ips: {container.get_ips()}")
csle_collector.host_manager.query_host_manager.config_filebeat(
stub=stub, log_files_paths=node_beats_config.log_files_paths,
kibana_ip=emulation_env_config.elk_config.container.get_ips()[0],
kibana_port=emulation_env_config.elk_config.kibana_port,
elastic_ip=emulation_env_config.elk_config.container.get_ips()[0],
elastic_port=emulation_env_config.elk_config.elastic_port,
num_elastic_shards=emulation_env_config.beats_config.num_elastic_shards, kafka_topics=kafka_topics,
kafka_ip=emulation_env_config.kafka_config.container.get_ips()[0],
kafka_port=emulation_env_config.kafka_config.kafka_port,
filebeat_modules=node_beats_config.filebeat_modules,
reload_enabled=emulation_env_config.beats_config.reload_enabled,
kafka=node_beats_config.kafka_input)
[docs] @staticmethod
def config_packetbeat(emulation_env_config: EmulationEnvConfig, container: NodeContainerConfig,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container
to setup the packetbeat configuration
:param emulation_env_config: the emulation env config
:param container: the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=container.docker_gw_bridge_ip,
logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{container.docker_gw_bridge_ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Configuring packetbeat on {container.docker_gw_bridge_ip}, {container.get_full_name()}, "
f"ips: {container.get_ips()}")
csle_collector.host_manager.query_host_manager.config_packetbeat(
stub=stub, kibana_ip=emulation_env_config.elk_config.container.get_ips()[0],
kibana_port=emulation_env_config.elk_config.kibana_port,
elastic_ip=emulation_env_config.elk_config.container.get_ips()[0],
elastic_port=emulation_env_config.elk_config.elastic_port,
num_elastic_shards=emulation_env_config.beats_config.num_elastic_shards)
[docs] @staticmethod
def config_metricbeat(emulation_env_config: EmulationEnvConfig, container: NodeContainerConfig,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container
to setup the metricbeat configuration
:param emulation_env_config: the emulation env config
:param container: the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=container.docker_gw_bridge_ip,
logger=logger)
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=container.get_ips())
if node_beats_config is None:
return
# Open a gRPC session
with grpc.insecure_channel(
f'{container.docker_gw_bridge_ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Configuring metricbeat on {container.docker_gw_bridge_ip}, {container.get_full_name()}, "
f"ips: {container.get_ips()}")
csle_collector.host_manager.query_host_manager.config_metricbeat(
stub=stub, kibana_ip=emulation_env_config.elk_config.container.get_ips()[0],
kibana_port=emulation_env_config.elk_config.kibana_port,
elastic_ip=emulation_env_config.elk_config.container.get_ips()[0],
elastic_port=emulation_env_config.elk_config.elastic_port,
num_elastic_shards=emulation_env_config.beats_config.num_elastic_shards,
kafka_ip=emulation_env_config.kafka_config.container.get_ips()[0],
kafka_port=emulation_env_config.kafka_config.kafka_port,
metricbeat_modules=node_beats_config.metricbeat_modules,
reload_enabled=emulation_env_config.beats_config.reload_enabled)
[docs] @staticmethod
def config_heartbeat(emulation_env_config: EmulationEnvConfig, container: NodeContainerConfig,
logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container
to setup the heartbeat configuration
:param emulation_env_config: the emulation env config
:param container: the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=container.docker_gw_bridge_ip,
logger=logger)
node_beats_config = emulation_env_config.beats_config.get_node_beats_config_by_ips(ips=container.get_ips())
if node_beats_config is None:
return
# Open a gRPC session
with grpc.insecure_channel(
f'{container.docker_gw_bridge_ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Configuring heartbeat on {container.docker_gw_bridge_ip}, {container.get_full_name()}, "
f"ips: {container.get_ips()}")
csle_collector.host_manager.query_host_manager.config_heartbeat(
stub=stub, kibana_ip=emulation_env_config.elk_config.container.get_ips()[0],
kibana_port=emulation_env_config.elk_config.kibana_port,
elastic_ip=emulation_env_config.elk_config.container.get_ips()[0],
elastic_port=emulation_env_config.elk_config.elastic_port,
num_elastic_shards=emulation_env_config.beats_config.num_elastic_shards,
hosts_to_monitor=node_beats_config.heartbeat_hosts_to_monitor)
[docs] @staticmethod
def stop_host_monitor_threads(emulation_env_config: EmulationEnvConfig, logger: logging.Logger,
physical_host_ip: str) -> None:
"""
A method that sends a request to the HostManager on every container to stop the monitor threads
:param emulation_env_config: the emulation env config
:param physical_host_ip: the IP of the physical host
:param logger: the logger to use for logging
:return: None
"""
# Stop host monitor threads on emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_host_ip:
HostController.stop_host_monitor_thread(emulation_env_config=emulation_env_config,
ip=c.docker_gw_bridge_ip, logger=logger)
if emulation_env_config.kafka_config.container.physical_host_ip == physical_host_ip:
# Stop host monitor threads on the kafka container
HostController.stop_host_monitor_thread(emulation_env_config=emulation_env_config,
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.elk_config.container.physical_host_ip == physical_host_ip:
# Stop host monitor threads on the ELK container
HostController.stop_host_monitor_thread(emulation_env_config=emulation_env_config,
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
logger=logger)
if emulation_env_config.sdn_controller_config is not None:
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_host_ip:
# Stop host monitor threads on the SDN controller container
HostController.stop_host_monitor_thread(
emulation_env_config=emulation_env_config,
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, logger=logger)
[docs] @staticmethod
def stop_host_monitor_thread(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container to stop the monitor threads
:param emulation_env_config: the emulation env config
:param ip: the IP of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Stopping the Host monitor thread on {ip}.")
csle_collector.host_manager.query_host_manager.stop_host_monitor(stub=stub)
[docs] @staticmethod
def stop_filebeat(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container to stop filebeat
:param emulation_env_config: the emulation env config
:param ip: the IP of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Stopping filebeat on {ip}.")
csle_collector.host_manager.query_host_manager.stop_filebeat(stub=stub)
[docs] @staticmethod
def stop_packetbeat(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container to stop packetbeat
:param emulation_env_config: the emulation env config
:param ip: the IP of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Stopping packetbeat on {ip}.")
csle_collector.host_manager.query_host_manager.stop_packetbeat(stub=stub)
[docs] @staticmethod
def stop_metricbeat(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container to stop metricbeat
:param emulation_env_config: the emulation env config
:param ip: the IP of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Stopping metricbeat on {ip}.")
csle_collector.host_manager.query_host_manager.stop_metricbeat(stub=stub)
[docs] @staticmethod
def stop_heartbeat(emulation_env_config: EmulationEnvConfig, ip: str, logger: logging.Logger) -> None:
"""
A method that sends a request to the HostManager on a specific container to stop heartbeat
:param emulation_env_config: the emulation env config
:param ip: the IP of the container
:param logger: the logger to use for logging
:return: None
"""
HostController.start_host_manager(emulation_env_config=emulation_env_config, ip=ip, logger=logger)
# Open a gRPC session
with grpc.insecure_channel(
f'{ip}:'
f'{emulation_env_config.host_manager_config.host_manager_port}',
options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
logger.info(f"Stopping heartbeat on {ip}.")
csle_collector.host_manager.query_host_manager.stop_heartbeat(stub=stub)
[docs] @staticmethod
def get_host_monitor_threads_statuses(emulation_env_config: EmulationEnvConfig, physical_server_ip: str,
logger: logging.Logger) -> \
List[Tuple[csle_collector.host_manager.host_manager_pb2.HostStatusDTO, str]]:
"""
A method that sends a request to the HostManager on every container to get the status of the Host monitor thread
:param emulation_env_config: the emulation config
:param physical_server_ip: the ip of the physical server
:param logger: the logger to use for logging
:return: List of monitor thread statuses
"""
statuses = []
HostController.start_host_managers(emulation_env_config=emulation_env_config, logger=logger)
# Get statuses of emulation containers
for c in emulation_env_config.containers_config.containers:
if c.physical_host_ip == physical_server_ip:
status = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=c.docker_gw_bridge_ip, port=emulation_env_config.host_manager_config.host_manager_port)
statuses.append((status, c.docker_gw_bridge_ip))
if emulation_env_config.kafka_config.container.physical_host_ip == physical_server_ip:
# Get status of kafka container
status = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=emulation_env_config.kafka_config.container.docker_gw_bridge_ip,
port=emulation_env_config.host_manager_config.host_manager_port)
statuses.append((status, emulation_env_config.kafka_config.container.docker_gw_bridge_ip))
if emulation_env_config.elk_config.container.physical_host_ip == physical_server_ip:
# Get status of ELK container
status = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=emulation_env_config.elk_config.container.docker_gw_bridge_ip,
port=emulation_env_config.host_manager_config.host_manager_port)
statuses.append((status, emulation_env_config.elk_config.container.docker_gw_bridge_ip))
if emulation_env_config.sdn_controller_config is not None:
if emulation_env_config.sdn_controller_config.container.physical_host_ip == physical_server_ip:
# Get status of SDN controller container
status = HostController.get_host_monitor_thread_status_by_port_and_ip(
ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip,
port=emulation_env_config.host_manager_config.host_manager_port)
statuses.append((status, emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip))
return statuses
[docs] @staticmethod
def get_host_monitor_thread_status_by_port_and_ip(ip: str, port: int) -> \
csle_collector.host_manager.host_manager_pb2.HostStatusDTO:
"""
A method that sends a request to the HostManager on a specific container
to get the status of the Host monitor thread
:param ip: the ip of the container
:param port: the port of the host manager
:return: the status of the host manager
"""
# Open a gRPC session
with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel:
stub = csle_collector.host_manager.host_manager_pb2_grpc.HostManagerStub(channel)
status = csle_collector.host_manager.query_host_manager.get_host_status(stub=stub)
return status
[docs] @staticmethod
def get_host_managers_ips(emulation_env_config: EmulationEnvConfig) -> List[str]:
"""
A method that extracts the ips of the Host managers in a given emulation
:param emulation_env_config: the emulation env config
:return: the list of IP addresses
"""
ips = []
# Get ips of emulation containers
for c in emulation_env_config.containers_config.containers:
ips.append(c.docker_gw_bridge_ip)
# Get ip of Kafka container
ips.append(emulation_env_config.kafka_config.container.docker_gw_bridge_ip)
# Get ip of ELK container
ips.append(emulation_env_config.elk_config.container.docker_gw_bridge_ip)
if emulation_env_config.sdn_controller_config is not None:
# Get ip of SDN controller container
ips.append(emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip)
return ips
[docs] @staticmethod
def get_host_managers_ports(emulation_env_config: EmulationEnvConfig) -> List[int]:
"""
A method that extracts the ports of the Host managers in a given emulation
:param emulation_env_config: the emulation env config
:return: the list of ports
"""
ports = []
# Get port of emulation containers
for c in emulation_env_config.containers_config.containers:
ports.append(emulation_env_config.host_manager_config.host_manager_port)
# Get port of kafka container
ports.append(emulation_env_config.host_manager_config.host_manager_port)
# Get port of ELK container
ports.append(emulation_env_config.host_manager_config.host_manager_port)
if emulation_env_config.sdn_controller_config is not None:
# Get port of SDN controller container
ports.append(emulation_env_config.host_manager_config.host_manager_port)
return ports
[docs] @staticmethod
def get_host_managers_info(emulation_env_config: EmulationEnvConfig, active_ips: List[str],
logger: logging.Logger, physical_host_ip: str) -> HostManagersInfo:
"""
Extracts the information of the Host managers for a given emulation
:param emulation_env_config: the configuration of the emulation
:param active_ips: list of active IPs
:param logger: the logger to use for logging
:param physical_host_ip: the ip of the physical host
:return: a DTO with the status of the Host managers
"""
host_managers_ips = HostController.get_host_managers_ips(emulation_env_config=emulation_env_config)
host_managers_ports = HostController.get_host_managers_ports(emulation_env_config=emulation_env_config)
host_managers_statuses = []
host_managers_running = []
for ip in host_managers_ips:
if ip not in active_ips or not EmulationUtil.physical_ip_match(
emulation_env_config=emulation_env_config, ip=ip, physical_host_ip=physical_host_ip):
continue
status = None
running = False
try:
status = HostController.get_host_monitor_thread_status_by_port_and_ip(
port=emulation_env_config.host_manager_config.host_manager_port, ip=ip)
running = True
except Exception as e:
logger.info(f"Could not fetch Host manager status on IP:{ip}, error: {str(e)}, {repr(e)}")
if status is not None:
host_managers_statuses.append((status, ip))
else:
host_managers_statuses.append(
(csle_collector.host_manager.host_manager_util.HostManagerUtil.host_monitor_dto_empty(), ip))
host_managers_running.append(running)
execution_id = emulation_env_config.execution_id
emulation_name = emulation_env_config.name
host_manager_info_dto = HostManagersInfo(host_managers_running=host_managers_running, ips=host_managers_ips,
execution_id=execution_id, emulation_name=emulation_name,
host_managers_statuses=host_managers_statuses,
ports=host_managers_ports)
return host_manager_info_dto