Source code for csle_collector.client_manager.dao.client

import random
from typing import List, Dict, Any
import numpy as np
from scipy.stats import expon
from csle_collector.client_manager.dao.arrival_config import ArrivalConfig
from csle_collector.client_manager.dao.eptmp_arrival_config import EPTMPArrivalConfig
from csle_collector.client_manager.dao.spiking_arrival_config import SpikingArrivalConfig
from csle_collector.client_manager.dao.sine_arrival_config import SineArrivalConfig
from csle_collector.client_manager.dao.piece_wise_constant_arrival_config import PieceWiseConstantArrivalConfig
from csle_collector.client_manager.dao.constant_arrival_config import ConstantArrivalConfig
from csle_collector.client_manager.dao.client_arrival_type import ClientArrivalType
from csle_collector.client_manager.dao.workflows_config import WorkflowsConfig
import csle_collector.client_manager.client_manager_pb2
from csle_base.json_serializable import JSONSerializable
from csle_base.grpc_serializable import GRPCSerializable


[docs]class Client(JSONSerializable, GRPCSerializable): """ A client, which is characterized by its arrival process and its workflow distribution. """ def __init__(self, id: int, workflow_distribution: List[float], arrival_config: ArrivalConfig, mu: float = 4, exponential_service_time: bool = False) -> None: """ Initializes the object :param id: the client id :param mu: the mean service time if exponential service times are used (otherwise MC is used) :param exponential_service_time: boolean flag indicating whether exponential service times should be used :param arrival_config: the arrival process configuration of the client :param workflow_distribution: the workflow distribution of the client """ self.id = id self.mu = mu self.exponential_service_time = exponential_service_time self.arrival_config = arrival_config assert round(sum(workflow_distribution), 2) == 1 assert self.arrival_config is not None self.workflow_distribution = workflow_distribution
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "Client": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ if d["arrival_config"]["client_arrival_type"] == ClientArrivalType.CONSTANT.value: arrival_config = ConstantArrivalConfig.from_dict(d["arrival_config"]) elif d["arrival_config"]["client_arrival_type"] == ClientArrivalType.EPTMP.value: arrival_config = EPTMPArrivalConfig.from_dict(d["arrival_config"]) elif d["arrival_config"]["client_arrival_type"] == ClientArrivalType.SPIKING.value: arrival_config = SpikingArrivalConfig.from_dict(d["arrival_config"]) elif d["arrival_config"]["client_arrival_type"] == ClientArrivalType.SINE_MODULATED.value: arrival_config = SineArrivalConfig.from_dict(d["arrival_config"]) elif d["arrival_config"]["client_arrival_type"] == ClientArrivalType.PIECE_WISE_CONSTANT.value: arrival_config = PieceWiseConstantArrivalConfig.from_dict(d["arrival_config"]) else: raise ValueError("Arrival config not recognized") obj = Client( arrival_config=arrival_config, id=d["id"], mu=d["mu"], exponential_service_time=d["exponential_service_time"], workflow_distribution=d["workflow_distribution"] ) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Any] = {} d["arrival_config"] = self.arrival_config.to_dict() d["id"] = self.id d["mu"] = self.mu d["exponential_service_time"] = self.exponential_service_time d["workflow_distribution"] = self.workflow_distribution return d
[docs] @staticmethod def from_json_file(json_file_path: str) -> "Client": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return Client.from_dict(json.loads(json_str))
[docs] def copy(self) -> "Client": """ :return: a copy of the DTO """ return Client.from_dict(self.to_dict())
[docs] def to_grpc_object(self) -> csle_collector.client_manager.client_manager_pb2.ClientDTO: """ :return: a GRPC serializable version of the object """ constant_arrival_config = None sine_arrival_config = None spiking_arrival_config = None piece_wise_constant_arrival_config = None eptmp_arrival_config = None if self.arrival_config.client_arrival_type == ClientArrivalType.CONSTANT: constant_arrival_config = self.arrival_config.to_grpc_object() elif self.arrival_config.client_arrival_type == ClientArrivalType.SINE_MODULATED: sine_arrival_config = self.arrival_config.to_grpc_object() elif self.arrival_config.client_arrival_type == ClientArrivalType.SPIKING: spiking_arrival_config = self.arrival_config.to_grpc_object() elif self.arrival_config.client_arrival_type == ClientArrivalType.PIECE_WISE_CONSTANT: piece_wise_constant_arrival_config = self.arrival_config.to_grpc_object() elif self.arrival_config.client_arrival_type == ClientArrivalType.EPTMP: eptmp_arrival_config = self.arrival_config.to_grpc_object() else: raise ValueError(f"Client arrival type: {self.arrival_config.client_arrival_type} not recognized") return csle_collector.client_manager.client_manager_pb2.ClientDTO( id=self.id, workflow_distribution=self.workflow_distribution, mu=self.mu, exponential_service_time=self.exponential_service_time, constant_arrival_config=constant_arrival_config, sine_arrival_config=sine_arrival_config, spiking_arrival_config=spiking_arrival_config, piece_wise_constant_arrival_config=piece_wise_constant_arrival_config, eptmp_arrival_config=eptmp_arrival_config, arrival_type=self.arrival_config.client_arrival_type.value)
[docs] @staticmethod def from_grpc_object(obj: csle_collector.client_manager.client_manager_pb2.ClientDTO) -> "Client": """ Instantiates the object from a GRPC DTO :param obj: the object to instantiate from :return: the instantiated object """ arrival_type = ClientArrivalType(obj.arrival_type) if arrival_type.value == ClientArrivalType.EPTMP.value: arrival_config = EPTMPArrivalConfig.from_grpc_object(obj.eptmp_arrival_config) elif arrival_type.value == ClientArrivalType.SINE_MODULATED.value: arrival_config = SineArrivalConfig.from_grpc_object(obj.sine_arrival_config) elif arrival_type.value == ClientArrivalType.SPIKING.value: arrival_config = SpikingArrivalConfig.from_grpc_object(obj.spiking_arrival_config) elif arrival_type.value == ClientArrivalType.PIECE_WISE_CONSTANT.value: arrival_config = PieceWiseConstantArrivalConfig.from_grpc_object(obj.piece_wise_constant_arrival_config) elif arrival_type.value == ClientArrivalType.CONSTANT.value: arrival_config = ConstantArrivalConfig.from_grpc_object(obj.constant_arrival_config) else: raise ValueError(f"Client arrival type: {arrival_type} not recognized") workflow_distribution = list(map(lambda x: float(x), obj.workflow_distribution)) return Client(id=obj.id, workflow_distribution=workflow_distribution, mu=obj.mu, exponential_service_time=obj.exponential_service_time, arrival_config=arrival_config)
[docs] def generate_commands(self, workflows_config: WorkflowsConfig) -> List[str]: """ Generate the commands for the client :param workflows_config: the workflows configuration :return: sampled list of commands for the client """ commands: List[str] = [] w = np.random.choice(np.arange(0, len(workflows_config.workflow_services)), p=self.workflow_distribution) mc = workflows_config.get_workflow_mc(id=w) if mc is None: raise ValueError(f"Workflow not recognized: {w}") s = mc.initial_state service_time = 0 if self.exponential_service_time: service_time = expon.rvs(scale=(self.mu), loc=0, size=1)[0] done = False while not done: if not self.exponential_service_time: done = s == len(workflows_config.workflow_services) else: done = len(commands) > service_time if not done: service = workflows_config.get_workflow_service(id=s) if service is None: raise ValueError(f"Service with id: {s} not found, available service ids: " f"{list(map(lambda x: x.id, workflows_config.workflow_services))}") service_cmds = service.get_commands() commands.append(random.choice(service_cmds)) s_prime = mc.step_forward() if not self.exponential_service_time or s_prime != len(workflows_config.workflow_services): s = s_prime mc.reset() return commands
def __str__(self) -> str: """ :return: a string representation of the object """ return f"id: {self.id}, workflow distribution: {self.workflow_distribution}, mu: {self.mu}, " \ f"exponential service time: {self.exponential_service_time}, arrival config: {str(self.arrival_config)}"