Source code for csle_collector.client_manager.query_clients

from typing import List
import csle_collector.client_manager.client_manager_pb2_grpc
import csle_collector.client_manager.client_manager_pb2
from csle_collector.client_manager.dao.workflows_config import WorkflowsConfig
from csle_collector.client_manager.dao.client import Client
import csle_collector.constants.constants as constants


[docs]def get_clients(stub: csle_collector.client_manager.client_manager_pb2_grpc.ClientManagerStub, timeout=constants.GRPC.TIMEOUT_SECONDS) \ -> csle_collector.client_manager.client_manager_pb2.ClientsDTO: """ Queries the server for the client state :param stub: the stub to send the remote gRPC to the server :param timeout: the GRPC timeout (seconds) :return: a clientsDTO describing the state of the clients """ get_clients_dto_msg = csle_collector.client_manager.client_manager_pb2.GetClientsMsg() clients_dto: csle_collector.client_manager.client_manager_pb2.ClientsDTO = \ stub.getClients(get_clients_dto_msg, timeout=timeout) return clients_dto
[docs]def stop_clients(stub: csle_collector.client_manager.client_manager_pb2_grpc.ClientManagerStub, timeout=constants.GRPC.TIMEOUT_SECONDS): """ Stops the client arrival process :param stub: the stub to the gRPC server :param timeout: the GRPC timeout (seconds) :return: a clientsDTO describing the state of the clients """ stop_clients_msg = csle_collector.client_manager.client_manager_pb2.StopClientsMsg() clients_dto: csle_collector.client_manager.client_manager_pb2.ClientsDTO = \ stub.stopClients(stop_clients_msg, timeout=timeout) return clients_dto
[docs]def start_clients(stub: csle_collector.client_manager.client_manager_pb2_grpc.ClientManagerStub, time_step_len_seconds: int, workflows_config: WorkflowsConfig, clients: List[Client], timeout=constants.GRPC.TIMEOUT_SECONDS) \ -> csle_collector.client_manager.client_manager_pb2.ClientsDTO: """ Starts the client arrival process :param stub: the stub to the gRPC server :param time_step_len_seconds: length of a time-step in the emulation :param workflows_config: configuration of the workflows :param clients: list of client profiles :param timeout: the timeout for sending a request to the GRPC server :return: a clients DTO describing the state of the clients """ clients_grpcs = list(map(lambda x: x.to_grpc_object(), clients)) workflows_config_grpc = workflows_config.to_grpc_object() start_clients_msg = csle_collector.client_manager.client_manager_pb2.StartClientsMsg( time_step_len_seconds=time_step_len_seconds, clients=clients_grpcs, workflows_config=workflows_config_grpc) clients_dto: csle_collector.client_manager.client_manager_pb2.ClientsDTO = \ stub.startClients(start_clients_msg, timeout=timeout) return clients_dto
[docs]def stop_producer(stub: csle_collector.client_manager.client_manager_pb2_grpc.ClientManagerStub, timeout=constants.GRPC.TIMEOUT_SECONDS): """ Stops the producer process :param stub: the stub to the gRPC server :param timeout: the GRPC timeout (seconds) :return: a clientsDTO describing the state of the clients """ stop_producer_msg = csle_collector.client_manager.client_manager_pb2.StopProducerMsg() clients_dto = stub.stopProducer(stop_producer_msg, timeout=timeout) return clients_dto
[docs]def start_producer(stub: csle_collector.client_manager.client_manager_pb2_grpc.ClientManagerStub, ip: str, port: int, time_step_len_seconds: int, timeout=constants.GRPC.TIMEOUT_SECONDS): """ Stops the producer process :param stub: the stub to the gRPC server :param ip: ip of the kafka server to produce to :param port: port of the kafka server to produce to :param timeout: the GRPC timeout (seconds) :return: a clientsDTO describing the state of the clients """ start_producer_msg = csle_collector.client_manager.client_manager_pb2.StartProducerMsg( ip=ip, port=port, time_step_len_seconds=time_step_len_seconds ) clients_dto = stub.startProducer(start_producer_msg, timeout=timeout) return clients_dto