Source code for csle_collector.client_manager.client_manager

from typing import Union
import time
import logging
import grpc
from concurrent import futures
from csle_collector.client_manager.dao.client import Client
from csle_collector.client_manager.threads.arrival_thread import ArrivalThread
from csle_collector.client_manager.threads.producer_thread import ProducerThread
from csle_collector.client_manager.dao.workflows_config import WorkflowsConfig
import csle_collector.client_manager.client_manager_pb2_grpc
import csle_collector.client_manager.client_manager_pb2
import csle_collector.constants.constants as constants


[docs]class ClientManagerServicer(csle_collector.client_manager.client_manager_pb2_grpc.ClientManagerServicer): """ gRPC server for managing the running clients. Allows to start/stop clients remotely and also to query the state of the clients. """ def __init__(self) -> None: """ Initializes the server """ self.arrival_thread: Union[None, ArrivalThread] = None self.producer_thread: Union[None, ProducerThread] = None logging.basicConfig(filename=f"{constants.LOG_FILES.CLIENT_MANAGER_LOG_DIR}" f"{constants.LOG_FILES.CLIENT_MANAGER_LOG_FILE}", level=logging.INFO)
[docs] def get_arrival_thread(self) -> Union[None, ArrivalThread]: """ Gets the arrival thread :return: The arrival thread if it is initialized, otherwise None """ return self.arrival_thread
[docs] def get_producer_thread(self) -> Union[None, ProducerThread]: """ Gets the producer thread :return: The producer thread if it is initialized, otherwise None """ return self.producer_thread
[docs] def getClients(self, request: csle_collector.client_manager.client_manager_pb2.GetClientsMsg, context: grpc.ServicerContext) -> csle_collector.client_manager.client_manager_pb2.ClientsDTO: """ Gets the state of the clients :param request: the gRPC request :param context: the gRPC context :return: a clients DTO with the state of the clients """ logging.info("Getting client information") num_clients = 0 clients_time_step_len_seconds = 0.0 producer_time_step_len_seconds = 0.0 arrival_thread = self.get_arrival_thread() producer_thread = self.get_producer_thread() client_process_active = False if arrival_thread is not None: num_clients = len(arrival_thread.client_threads) client_process_active = True clients_time_step_len_seconds = arrival_thread.time_step_len_seconds producer_active = False if producer_thread is not None: producer_active = True producer_time_step_len_seconds = producer_thread.time_step_len_seconds clients_dto = csle_collector.client_manager.client_manager_pb2.ClientsDTO( num_clients=num_clients, client_process_active=client_process_active, producer_active=producer_active, clients_time_step_len_seconds=int(clients_time_step_len_seconds), producer_time_step_len_seconds=int(producer_time_step_len_seconds)) return clients_dto
[docs] def stopClients(self, request: csle_collector.client_manager.client_manager_pb2.StopClientsMsg, context: grpc.ServicerContext): """ Stops the Poisson-process that generates new clients :param request: the gRPC request :param context: the gRPC context :return: a clients DTO with the state of the clients """ logging.info("Stopping clients") clients_time_step_len_seconds = 0.0 producer_time_step_len_seconds = 0.0 producer_thread = self.get_producer_thread() arrival_thread = self.get_producer_thread() if arrival_thread is not None: clients_time_step_len_seconds = arrival_thread.time_step_len_seconds arrival_thread.stopped = True time.sleep(1) self.arrival_thread = None producer_active = False if producer_thread is not None: producer_active = True producer_time_step_len_seconds = producer_thread.time_step_len_seconds return csle_collector.client_manager.client_manager_pb2.ClientsDTO( num_clients=0, client_process_active=False, producer_active=producer_active, clients_time_step_len_seconds=int(clients_time_step_len_seconds), producer_time_step_len_seconds=int(producer_time_step_len_seconds))
[docs] def startClients(self, request: csle_collector.client_manager.client_manager_pb2.StartClientsMsg, context: grpc.ServicerContext) -> csle_collector.client_manager.client_manager_pb2.ClientsDTO: """ Starts/Restarts the Poisson process(es) that generate(s) clients :param request: the gRPC request :param context: the gRPC context :return: a clients DTO with the state of the clients """ clients = list(map(lambda x: Client.from_grpc_object(x), request.clients)) workflows_config = WorkflowsConfig.from_grpc_object(request.workflows_config) logging.info(f"Starting clients, num clients:{len(clients)}, " f"num workflows: {len(workflows_config.workflow_markov_chains)}, " f"num services: {len(workflows_config.workflow_services)}, " f"client types: {list(map(lambda x: str(x), clients))}," f"workflow markov chains: {list(map(lambda x: str(x), workflows_config.workflow_markov_chains))}," f"workflow services: {list(map(lambda x: str(x), workflows_config.workflow_services))}," f"\n commands: {workflows_config.commands()}") producer_time_step_len_seconds = 0 arrival_thread = self.get_arrival_thread() producer_thread = self.get_producer_thread() if arrival_thread is not None: arrival_thread.stopped = True time.sleep(1) if request.time_step_len_seconds <= 0: request.time_step_len_seconds = 1 arrival_thread = ArrivalThread(time_step_len_seconds=request.time_step_len_seconds, clients=clients, workflows_config=workflows_config) arrival_thread.start() self.arrival_thread = arrival_thread clients_time_step_len_seconds = request.time_step_len_seconds producer_active = False if producer_thread is not None: producer_active = True producer_time_step_len_seconds = producer_thread.time_step_len_seconds clients_dto = csle_collector.client_manager.client_manager_pb2.ClientsDTO( num_clients=0, client_process_active=True, producer_active=producer_active, clients_time_step_len_seconds=int(clients_time_step_len_seconds), producer_time_step_len_seconds=producer_time_step_len_seconds) return clients_dto
[docs] def startProducer(self, request: csle_collector.client_manager.client_manager_pb2.StartProducerMsg, context: grpc.ServicerContext) -> csle_collector.client_manager.client_manager_pb2.ClientsDTO: """ Starts/Restarts the producer thread that pushes data to Kafka :param request: the gRPC request :param context: the gRPC context :return: a clients DTO with the state of the clients """ clients_time_step_len_seconds = 0.0 producer_thread = self.get_producer_thread() arrival_thread = self.get_arrival_thread() logging.info(f"Starting producer, time-step len:{request.time_step_len_seconds}s, " f"arrival_thread: {arrival_thread}") time.sleep(5) if producer_thread is not None: producer_thread.stopped = True time.sleep(1) if request.time_step_len_seconds <= 0: request.time_step_len_seconds = 1 if arrival_thread is None: raise ValueError("Cannot start producer if the arrival thread is not started") producer_thread = ProducerThread(arrival_thread=arrival_thread, time_step_len_seconds=request.time_step_len_seconds, ip=request.ip, port=request.port) producer_thread.start() self.producer_thread = producer_thread client_process_active = False num_clients = 0 if arrival_thread is not None: num_clients = len(arrival_thread.client_threads) client_process_active = True clients_time_step_len_seconds = arrival_thread.time_step_len_seconds clients_dto = csle_collector.client_manager.client_manager_pb2.ClientsDTO( num_clients=num_clients, client_process_active=client_process_active, producer_active=True, clients_time_step_len_seconds=int(clients_time_step_len_seconds), producer_time_step_len_seconds=request.time_step_len_seconds) return clients_dto
[docs] def stopProducer(self, request: csle_collector.client_manager.client_manager_pb2.StopProducerMsg, context: grpc.ServicerContext) -> csle_collector.client_manager.client_manager_pb2.ClientsDTO: """ Stops the producer thread that pushes data to Kafka :param request: the gRPC request :param context: the gRPC context :return: a clients DTO with the state of the clients """ logging.info("Stopping producer") clients_time_step_len_seconds = 0.0 producer_thread = self.get_producer_thread() arrival_thread = self.get_arrival_thread() if producer_thread is not None: producer_thread.stopped = True time.sleep(1) self.producer_thread = None client_process_active = False num_clients = 0 if arrival_thread is not None: num_clients = len(arrival_thread.client_threads) client_process_active = True clients_time_step_len_seconds = arrival_thread.time_step_len_seconds clients_dto = csle_collector.client_manager.client_manager_pb2.ClientsDTO( num_clients=num_clients, client_process_active=client_process_active, producer_active=False, clients_time_step_len_seconds=int(clients_time_step_len_seconds), producer_time_step_len_seconds=0) return clients_dto
[docs]def serve(port: int = 50044, log_dir: str = "/", log_file_name: str = "client_manager.log", max_workers: int = 10) -> None: """ Starts the gRPC server for managing clients :param port: the port that the server will listen to :param log_dir: the directory to write the log file :param log_file_name: the file name of the log :param max_workers: the maximum number of GRPC workers :return: None """ constants.LOG_FILES.CLIENT_MANAGER_LOG_DIR = log_dir constants.LOG_FILES.CLIENT_MANAGER_LOG_FILE = log_file_name server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) csle_collector.client_manager.client_manager_pb2_grpc.add_ClientManagerServicer_to_server( ClientManagerServicer(), server) server.add_insecure_port(f'[::]:{port}') server.start() logging.info(f"ClientManager Server Started, Listening on port: {port}") server.wait_for_termination()
# Program entrypoint if __name__ == '__main__': serve()