Source code for csle_common.controllers.sdn_controller_manager

import logging
from typing import List
import grpc
import time
import csle_collector.ryu_manager.ryu_manager_pb2_grpc
import csle_collector.ryu_manager.ryu_manager_pb2
import csle_collector.ryu_manager.query_ryu_manager
import csle_collector.ryu_manager.ryu_manager_util
import csle_common.constants.constants as constants
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.ryu_managers_info import RyuManagersInfo
from csle_common.util.emulation_util import EmulationUtil


[docs]class SDNControllerManager: """ Class managing interaction with the SDN controller """
[docs] @staticmethod def start_ryu_manager(emulation_env_config: EmulationEnvConfig, logger: logging.Logger) -> None: """ Utility method for starting the Ryu manager :param emulation_env_config: the emulation env config :param logger: the logger to use for logging :return: None """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot start ryu manager for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") # Connect EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, create_producer=False) # Check if ryu_manager is already running cmd = (constants.COMMANDS.PS_AUX + " | " + constants.COMMANDS.GREP + constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.RYU_MANAGER_FILE_NAME) o, e, _ = EmulationUtil.execute_ssh_cmd( cmd=cmd, conn=emulation_env_config.get_connection( ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip)) logger.info(f"Ryu manager running?, cmd: {cmd}, out:{o.decode()}, err: {e.decode()}") if constants.COMMANDS.SEARCH_RYU_MANAGER not in str(o): logger.info( f"Starting ryu manager on node: " f"{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}") # Stop old background job if running cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL + constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.RYU_MANAGER_FILE_NAME) o, e, _ = EmulationUtil.execute_ssh_cmd( cmd=cmd, conn=emulation_env_config.get_connection( ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip)) # Start the ryu_manager cmd = constants.COMMANDS.START_RYU_MANAGER.format( emulation_env_config.sdn_controller_config.manager_port, emulation_env_config.sdn_controller_config.manager_log_dir, emulation_env_config.sdn_controller_config.manager_log_file, emulation_env_config.sdn_controller_config.manager_max_workers) o, e, _ = EmulationUtil.execute_ssh_cmd( cmd=cmd, conn=emulation_env_config.get_connection( ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip)) logger.info(f"Starting the Ryu manager, cmd: {cmd}, out:{o.decode()}, err: {e.decode()}") time.sleep(2)
[docs] @staticmethod def stop_ryu_manager(emulation_env_config: EmulationEnvConfig, logger: logging.Logger) -> None: """ Utility method for stopping the Ryu manager :param emulation_env_config: the emulation env config :param logger: the logger to use for logging :return: None """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot stop ryu manager for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") # Connect EmulationUtil.connect_admin(emulation_env_config=emulation_env_config, ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, create_producer=False) logger.info( f"Stopping ryu manager on node: " f"{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}") # Stop background job cmd = (constants.COMMANDS.SUDO + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PKILL + constants.COMMANDS.SPACE_DELIM + constants.TRAFFIC_COMMANDS.RYU_MANAGER_FILE_NAME) o, e, _ = EmulationUtil.execute_ssh_cmd( cmd=cmd, conn=emulation_env_config.get_connection( ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip)) logger.info(f"Stopping the Ryu manager, cmd: {cmd}, out:{o.decode()}, err: {e.decode()}") time.sleep(2)
[docs] @staticmethod def get_ryu_status(emulation_env_config: EmulationEnvConfig, logger: logging.Logger) -> \ csle_collector.ryu_manager.ryu_manager_pb2.RyuDTO: """ Method for querying the RyuManager about the status of the Ryu SDN controller :param emulation_env_config: the emulation config :param logger: the logger to use for logging :return: a RyuDTO with the status """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot get ryu status for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") SDNControllerManager.start_ryu_manager(emulation_env_config=emulation_env_config, logger=logger) ryu_dto = SDNControllerManager.get_ryu_status_by_port_and_ip( ip=emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip, port=emulation_env_config.sdn_controller_config.manager_port) return ryu_dto
[docs] @staticmethod def get_ryu_status_by_port_and_ip(ip: str, port: int) -> csle_collector.ryu_manager.ryu_manager_pb2.RyuDTO: """ Method for querying the RyuManager about the status of the Ryu SDN controller :param ip: the ip where the RyuManager is running :param port: the port the RyuManager is listening to :return: an RyuDTO with the status """ # Open a gRPC session with grpc.insecure_channel(f'{ip}:{port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.ryu_manager.ryu_manager_pb2_grpc.RyuManagerStub(channel) ryu_dto = csle_collector.ryu_manager.query_ryu_manager.get_ryu_status(stub) return ryu_dto
[docs] @staticmethod def stop_ryu(emulation_env_config: EmulationEnvConfig, logger: logging.Logger) \ -> csle_collector.ryu_manager.ryu_manager_pb2.RyuDTO: """ Method for requesting the RyuManager to stop the RYU SDN controller :param emulation_env_config: the emulation env config :return: a RyuDTO with the status of the server """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot stop Ryu for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") logger.info( f"Stopping RYU SDN controller on container: " f"{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}") SDNControllerManager.start_ryu_manager(emulation_env_config=emulation_env_config, logger=logger) # Open a gRPC session with grpc.insecure_channel( f'{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}:' f'{emulation_env_config.sdn_controller_config.manager_port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.ryu_manager.ryu_manager_pb2_grpc.RyuManagerStub(channel) ryu_dto = csle_collector.ryu_manager.query_ryu_manager.stop_ryu(stub) return ryu_dto
[docs] @staticmethod def start_ryu(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) \ -> csle_collector.ryu_manager.ryu_manager_pb2.RyuDTO: """ Method for requesting the RyuManager to start the Ryu SDN controller :param emulation_env_config: the emulation env config :param physical_server_ip: the ip of the physical server :param logger: the logger to use for logging :return: an RyuDTO with the status of the server """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot start Ryu for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") if emulation_env_config.sdn_controller_config.container.physical_host_ip != physical_server_ip: return logger.info( f"Starting Ryu SDN controller on container: " f"{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}") SDNControllerManager.start_ryu_manager(emulation_env_config=emulation_env_config, logger=logger) # Open a gRPC session with grpc.insecure_channel( f'{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}:' f'{emulation_env_config.sdn_controller_config.manager_port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.ryu_manager.ryu_manager_pb2_grpc.RyuManagerStub(channel) logger.info( f"Starting RYU, port: {emulation_env_config.sdn_controller_config.controller_port}, " f"web_port: {emulation_env_config.sdn_controller_config.controller_web_api_port}, " f"controller: {emulation_env_config.sdn_controller_config.controller_module_name}") ryu_dto = csle_collector.ryu_manager.query_ryu_manager.start_ryu( stub, port=emulation_env_config.sdn_controller_config.controller_port, web_port=emulation_env_config.sdn_controller_config.controller_web_api_port, controller=emulation_env_config.sdn_controller_config.controller_module_name) return ryu_dto
[docs] @staticmethod def start_ryu_monitor(emulation_env_config: EmulationEnvConfig, physical_server_ip: str, logger: logging.Logger) \ -> csle_collector.ryu_manager.ryu_manager_pb2.RyuDTO: """ Method for requesting the RyuManager to start the Ryu monitor :param emulation_env_config: the emulation env config :param physical_server_ip: ip of the physical server :param logger: the logger to use for logging :return: an RyuDTO with the status """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot start Ryu monitor for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") if emulation_env_config.sdn_controller_config.container.physical_host_ip != physical_server_ip: return logger.info( f"Starting the ryu monitor on container: " f"{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}") SDNControllerManager.start_ryu_manager(emulation_env_config=emulation_env_config, logger=logger) # Open a gRPC session with grpc.insecure_channel( f'{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}:' f'{emulation_env_config.sdn_controller_config.manager_port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.ryu_manager.ryu_manager_pb2_grpc.RyuManagerStub(channel) ryu_dto = csle_collector.ryu_manager.query_ryu_manager.start_ryu_monitor( stub, kafka_ip=emulation_env_config.kafka_config.container.get_ips()[0], kafka_port=emulation_env_config.kafka_config.kafka_port, time_step_len=emulation_env_config.sdn_controller_config.time_step_len_seconds) return ryu_dto
[docs] @staticmethod def stop_ryu_monitor(emulation_env_config: EmulationEnvConfig, logger: logging.Logger) -> \ csle_collector.ryu_manager.ryu_manager_pb2.RyuDTO: """ Method for requesting the RyuManager to stop the ryu monitor :param emulation_env_config: the emulation env config :param logger: the logger to use for logging :return: an RyuDTO with the status of the server """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot stop Ryu monitor for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") logger.info( f"Stopping Ryu monitor on container: " f"{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}") SDNControllerManager.start_ryu_manager(emulation_env_config=emulation_env_config, logger=logger) # Open a gRPC session with grpc.insecure_channel( f'{emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip}:' f'{emulation_env_config.sdn_controller_config.manager_port}', options=constants.GRPC_SERVERS.GRPC_OPTIONS) as channel: stub = csle_collector.ryu_manager.ryu_manager_pb2_grpc.RyuManagerStub(channel) ryu_dto = csle_collector.ryu_manager.query_ryu_manager.stop_ryu_monitor(stub) return ryu_dto
[docs] @staticmethod def get_ryu_managers_ips(emulation_env_config: EmulationEnvConfig) -> List[str]: """ A method that extracts the IPS of the Ryu managers in a given emulation :param emulation_env_config: the emulation env config :return: the list of IP addresses """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot get Ryu managers IPs for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") return [emulation_env_config.sdn_controller_config.container.docker_gw_bridge_ip]
[docs] @staticmethod def get_ryu_managers_ports(emulation_env_config: EmulationEnvConfig) -> List[int]: """ A method that extracts the ports of the Ryu managers in a given emulation :param emulation_env_config: the emulation env config :return: the list of IP addresses """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot get Ryu managers ports for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") return [emulation_env_config.sdn_controller_config.manager_port]
[docs] @staticmethod def get_ryu_managers_info(emulation_env_config: EmulationEnvConfig, active_ips: List[str], logger: logging.Logger, physical_server_ip: str) -> RyuManagersInfo: """ Extracts the information of the Ryu managers for a given emulation :param emulation_env_config: the configuration of the emulation :param active_ips: list of active IPs :param logger: the logger to use for logging :param physical_server_ip: the ip of the physical server :return: a DTO with the status of the Ryu managers """ if emulation_env_config.sdn_controller_config is None: raise ValueError(f"Cannot get ryu managers info for emulation without an SDN config. " f"Emulation: {emulation_env_config.name}") ryu_managers_ips = SDNControllerManager.get_ryu_managers_ips(emulation_env_config=emulation_env_config) ryu_managers_ports = SDNControllerManager.get_ryu_managers_ports(emulation_env_config=emulation_env_config) ryu_managers_statuses = [] ryu_managers_running = [] for ip in ryu_managers_ips: if ip not in active_ips or not EmulationUtil.physical_ip_match( emulation_env_config=emulation_env_config, ip=ip, physical_host_ip=physical_server_ip): continue status = None try: status = SDNControllerManager.get_ryu_status_by_port_and_ip( port=emulation_env_config.sdn_controller_config.manager_port, ip=ip) running = True except Exception as e: running = False logger.debug(f"Could not fetch Ryu manager status on IP:{ip}, error: {str(e)}, {repr(e)}") if status is not None: ryu_managers_statuses.append(status) else: ryu_managers_statuses.append(csle_collector.ryu_manager.ryu_manager_util.RyuManagerUtil.ryu_dto_empty()) ryu_managers_running.append(running) execution_id = emulation_env_config.execution_id emulation_name = emulation_env_config.name ryu_manager_info_dto = RyuManagersInfo( ryu_managers_running=ryu_managers_running, ips=ryu_managers_ips, execution_id=execution_id, emulation_name=emulation_name, ryu_managers_statuses=ryu_managers_statuses, ports=ryu_managers_ports, physical_server_ip=emulation_env_config.sdn_controller_config.container.physical_host_ip) return ryu_manager_info_dto