Source code for csle_collector.host_manager.dao.host_metrics

from typing import Dict, Any, Tuple, List, Union
import time
from csle_base.json_serializable import JSONSerializable
import csle_collector.host_manager.host_manager_pb2


[docs]class HostMetrics(JSONSerializable): """ DTO class containing host metrics """ def __init__(self, num_logged_in_users: int = 0, num_failed_login_attempts: int = 0, num_open_connections: int = 0, num_login_events: int = 0, num_processes: int = 0, num_users: int = 0, ip: Union[None, str] = None, ts: Union[float, None] = None) -> None: """ Initializes the DTO :param num_logged_in_users: the number of logged in users :param num_failed_login_attempts: the number of failed login attempts :param num_open_connections: the number of open connections :param num_login_events: the number of login events :param num_processes: the number of processes :param num_users: the number of users """ self.num_logged_in_users = num_logged_in_users self.num_failed_login_attempts = num_failed_login_attempts self.num_open_connections = num_open_connections self.num_login_events = num_login_events self.num_processes = num_processes self.num_users = num_users self.ts = ts self.ip = ip
[docs] def to_dto(self, ip: str) -> csle_collector.host_manager.host_manager_pb2.HostMetricsDTO: """ Converts the object into a gRPC DTO for serialization :param ip: the ip to add to the DTO in addition to the metrics :return: a csle_collector.host_manager.host_manager_pb2.HostMetricsDTO """ ts = time.time() return csle_collector.host_manager.host_manager_pb2.HostMetricsDTO( num_logged_in_users=self.num_logged_in_users, num_failed_login_attempts=self.num_failed_login_attempts, num_open_connections=self.num_open_connections, num_login_events=self.num_login_events, num_processes=self.num_processes, num_users=self.num_users, ip=ip, timestamp=ts)
[docs] def to_kafka_record(self, ip: str) -> str: """ Converts the DTO into a Kafka record string :param ip: the IP to add to the record in addition to the metrics :return: a comma separated string representing the kafka record """ ts = time.time() record_str = f"{ts},{ip},{self.num_logged_in_users},{self.num_failed_login_attempts}," \ f"{self.num_open_connections},{self.num_login_events},{self.num_processes},{self.num_users}" return record_str
[docs] @staticmethod def from_kafka_record(record: str) -> "HostMetrics": """ Converts the Kafka record string to a DTO :param record: the kafka record :return: the created DTO """ parts = record.split(",") obj = HostMetrics( ip=parts[1], ts=float(parts[0]), num_logged_in_users=int(parts[2]), num_failed_login_attempts=int(parts[3]), num_open_connections=int(parts[4]), num_login_events=int(parts[5]), num_processes=int(parts[6]), num_users=int(parts[7])) return obj
[docs] def update_with_kafka_record(self, record: str, ip: str) -> None: """ Updates the DTO based on a kafka record :param record: the kafka record :param ip: the host ip :return: None """ parts = record.split(",") if parts[1] == ip: self.ip = parts[1] self.ts = float(parts[0]) self.num_logged_in_users = int(parts[2]) self.num_failed_login_attempts = int(parts[3]) self.num_open_connections = int(parts[4]) self.num_login_events = int(parts[5]) self.num_processes = int(parts[6]) self.num_users = int(parts[7])
def __str__(self) -> str: """ :return: a string representation of the object """ return f"num_logged_in_users:{self.num_logged_in_users}, " \ f"num_failed_login_attempts: {self.num_failed_login_attempts}, " \ f"num_open_connections:{self.num_open_connections}, " \ f"num_login_events:{self.num_login_events}, num_processes: {self.num_processes}," \ f"num_users: {self.num_users}"
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "HostMetrics": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = HostMetrics( num_logged_in_users=d["num_logged_in_users"], num_failed_login_attempts=d["num_failed_login_attempts"], num_open_connections=d["num_open_connections"], num_login_events=d["num_login_events"], num_processes=d["num_processes"], num_users=d["num_users"], ip=d["ip"], ts=d["ts"], ) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ :return: a dict representation of the instance """ d: Dict[str, Any] = {} d["num_logged_in_users"] = self.num_logged_in_users d["num_failed_login_attempts"] = self.num_failed_login_attempts d["num_open_connections"] = self.num_open_connections d["num_login_events"] = self.num_login_events d["num_processes"] = self.num_processes d["num_users"] = self.num_users d["ts"] = self.ts d["ip"] = self.ip return d
[docs] def copy(self) -> "HostMetrics": """ :return: a copy of the object """ c = HostMetrics( num_logged_in_users=self.num_logged_in_users, num_failed_login_attempts=self.num_failed_login_attempts, num_open_connections=self.num_open_connections, num_login_events=self.num_login_events, num_processes=self.num_processes, num_users=self.num_users, ip=self.ip, ts=self.ts ) return c
[docs] def get_deltas(self, stats_prime: "HostMetrics") -> Tuple[List[int], List[str]]: """ Get the deltas between two stats objects :param stats_prime: the stats object to compare with :param max_counter: the maximum counter_value :return: the deltas and the labels """ deltas = [ int(stats_prime.num_logged_in_users - self.num_logged_in_users), int(stats_prime.num_failed_login_attempts - self.num_failed_login_attempts), int(stats_prime.num_open_connections - self.num_open_connections), int(stats_prime.num_login_events - self.num_login_events), int(stats_prime.num_processes - self.num_processes), int(stats_prime.num_users - self.num_users) ] labels = ["num_logged_in_users", "num_failed_login_attempts", "num_open_connections", "num_login_events", "num_processes", "num_users"] return deltas, labels
[docs] def get_values(self) -> Tuple[List[int], List[str]]: """ Get the current values :return: the values and the labels """ deltas = [ int(self.num_logged_in_users), int(self.num_failed_login_attempts), int(self.num_open_connections), int(self.num_login_events), int(self.num_processes), int(self.num_users) ] labels = ["num_logged_in_users", "num_failed_login_attempts", "num_open_connections", "num_login_events", "num_processes", "num_users"] return deltas, labels
[docs] def num_attributes(self) -> int: """ :return: The number of attributes of the DTO """ return 8
[docs] @staticmethod def schema() -> "HostMetrics": """ :return: get the schema of the DTO """ return HostMetrics()
[docs] @staticmethod def from_json_file(json_file_path: str) -> "HostMetrics": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return HostMetrics.from_dict(json.loads(json_str))