Source code for csle_collector.traffic_manager.traffic_manager

from typing import List
import logging
import os
import subprocess
from concurrent import futures
import grpc
import socket
import netifaces
import io
import csle_collector.traffic_manager.traffic_manager_pb2_grpc
import csle_collector.traffic_manager.traffic_manager_pb2
import csle_collector.constants.constants as constants


[docs]class TrafficManagerServicer(csle_collector.traffic_manager.traffic_manager_pb2_grpc.TrafficManagerServicer): """ gRPC server for managing a traffic generator. Allows to start/stop the script and also to query the state of the script. """ def __init__(self) -> None: """ Initializes the server """ logging.basicConfig(filename=f"{constants.LOG_FILES.TRAFFIC_MANAGER_LOG_DIR}" f"{constants.LOG_FILES.TRAFFIC_MANAGER_LOG_FILE}", level=logging.INFO) self.hostname = socket.gethostname() try: self.ip = netifaces.ifaddresses(constants.INTERFACES.ETH0)[netifaces.AF_INET][0][constants.INTERFACES.ADDR] except Exception: self.ip = socket.gethostbyname(self.hostname) logging.info(f"Setting up TrafficManager hostname: {self.hostname} ip: {self.ip}") @staticmethod def _get_traffic_status() -> bool: """ Utility method to get the status of the traffic generator script :return: status of the traffic generator """ cmd = constants.TRAFFIC_GENERATOR.CHECK_IF_TRAFFIC_GENERATOR_IS_RUNNING output = subprocess.run(cmd.split(" "), capture_output=True, text=True).stdout running = constants.TRAFFIC_GENERATOR.TRAFFIC_GENERATOR_FILE_NAME in output return running @staticmethod def _read_traffic_script() -> str: """ :return: the traffic generator script file """ try: with io.open(f"/{constants.TRAFFIC_GENERATOR.TRAFFIC_GENERATOR_FILE_NAME}", 'r', encoding='utf-8') as f: script_file_str = f.read() return script_file_str except Exception as e: logging.info(f"Could not read the script file: {str(e)}, {repr(e)}") return "" @staticmethod def _create_traffic_script(commands: List[str], sleep_time: int) -> None: """ Utility method to create the traffic script based on the list of commands :param commands: the list of commands :param sleep_time: the sleep time :return: None """ # File contents script_file = "" script_file = script_file + "#!/bin/bash\n" script_file = script_file + "while [ 1 ]\n" script_file = script_file + "do\n" script_file = script_file + " sleep {}\n".format(sleep_time) for cmd in commands: script_file = script_file + " " + cmd + "\n" script_file = script_file + " sleep {}\n".format(sleep_time) script_file = script_file + "done\n" # Remove old file if exists cmd = constants.TRAFFIC_GENERATOR.REMOVE_OLD_TRAFFIC_GENERATOR_FILE result = subprocess.run(cmd.split(" "), capture_output=True, text=True) logging.info(f"Removed old file, stdout: {result.stdout}, stderr: {result.stderr}") # Create file cmd = constants.TRAFFIC_GENERATOR.CREATE_TRAFFIC_GENERATOR_FILE result = subprocess.run(cmd.split(" "), capture_output=True, text=True) logging.info(f"Created new file, stdout: {result.stdout}, stderr: {result.stderr}") # Make executable cmd = constants.TRAFFIC_GENERATOR.MAKE_TRAFFIC_GENERATOR_FILE_EXECUTABLE result = subprocess.run(cmd.split(" "), capture_output=True, text=True) logging.info(f"Changed permissions, stdout: {result.stdout}, stderr: {result.stderr}") # Write traffic generation script file with io.open(f"/{constants.TRAFFIC_GENERATOR.TRAFFIC_GENERATOR_FILE_NAME}", 'w', encoding='utf-8') as f: f.write(script_file)
[docs] def getTrafficStatus(self, request: csle_collector.traffic_manager.traffic_manager_pb2.GetTrafficStatusMsg, context: grpc.ServicerContext) \ -> csle_collector.traffic_manager.traffic_manager_pb2.TrafficDTO: """ Gets the state of the traffic manager :param request: the gRPC request :param context: the gRPC context :return: a TrafficDTO with the state of the traffic manager """ logging.info("Getting traffic manager status") running = TrafficManagerServicer._get_traffic_status() script_file_str = TrafficManagerServicer._read_traffic_script() logging.info("Returning traffic manager status") traffic_dto = csle_collector.traffic_manager.traffic_manager_pb2.TrafficDTO(running=running, script=script_file_str) return traffic_dto
[docs] def stopTraffic(self, request: csle_collector.traffic_manager.traffic_manager_pb2.StopTrafficMsg, context: grpc.ServicerContext): """ Stops the traffic generator :param request: the gRPC request :param context: the gRPC context :return: a traffic DTO with the state of the traffic generator """ logging.info("Stopping the traffic generator script") cmd = "sudo pkill -f traffic_generator.sh" os.system(cmd) script_file_str = TrafficManagerServicer._read_traffic_script() logging.info("Traffic generator stopped") return csle_collector.traffic_manager.traffic_manager_pb2.TrafficDTO(running=False, script=script_file_str)
[docs] def startTraffic(self, request: csle_collector.traffic_manager.traffic_manager_pb2.StartTrafficMsg, context: grpc.ServicerContext) -> csle_collector.traffic_manager.traffic_manager_pb2.TrafficDTO: """ Starts the traffic generator :param request: the gRPC request :param context: the gRPC context :return: a TrafficDTO with the state of the traffic generator """ logging.info(f"Starting the traffic generator, \n sleep_time: {request.sleepTime}, " f"commands:{request.commands}") commands = list(request.commands) sleep_time = request.sleepTime TrafficManagerServicer._create_traffic_script(commands=commands, sleep_time=sleep_time) cmd = constants.TRAFFIC_GENERATOR.START_TRAFFIC_GENERATOR_CMD os.system(cmd) script_file_str = TrafficManagerServicer._read_traffic_script() traffic_dto = csle_collector.traffic_manager.traffic_manager_pb2.TrafficDTO( running=True, script=script_file_str) logging.info("Traffic generator started") return traffic_dto
[docs]def serve(port: int = 50043, log_dir: str = "/", max_workers: int = 10, log_file_name: str = "traffic_manager.log") -> None: """ Starts the gRPC server for managing traffic scripts :param port: the port that the server will listen to :param log_dir: the directory to write the log file :param log_file_name: the file name of the log :param max_workers: the maximum number of GRPC workers :return: None """ constants.LOG_FILES.TRAFFIC_MANAGER_LOG_DIR = log_dir constants.LOG_FILES.TRAFFIC_MANAGER_LOG_FILE = log_file_name server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) csle_collector.traffic_manager.traffic_manager_pb2_grpc.add_TrafficManagerServicer_to_server( TrafficManagerServicer(), server) server.add_insecure_port(f'[::]:{port}') server.start() logging.info(f"TrafficManager Server Started, Listening on port: {port}") server.wait_for_termination()
# Program entrypoint if __name__ == '__main__': serve()