Source code for csle_common.consumer_threads.aggregated_host_metrics_thread

from typing import List
import threading
import time
from csle_collector.host_manager.dao.host_metrics import HostMetrics
from csle_common.dao.emulation_observation.defender.emulation_defender_machine_observation_state import \
    EmulationDefenderMachineObservationState


[docs]class AggregatedHostMetricsThread(threading.Thread): """ Thread that polls the Kafka log to get the latest status of the docker statistics for a specific host """ def __init__(self, host_metrics: HostMetrics, machines: List[EmulationDefenderMachineObservationState], sleep_time: int) -> None: """ Initializes the thread :param host_metrics: the host metrics to update :param machines: the list of machines to update the host metrics with """ threading.Thread.__init__(self) self.machines = machines self.running = True self.host_metrics = host_metrics self.host_metrics_list: List[HostMetrics] = [] self.sleep_time = sleep_time
[docs] def run(self) -> None: """ Runs the thread :return: None """ while self.running: time.sleep(self.sleep_time) total_num_logged_in_users = 0 total_num_failed_login_attempts = 0 total_num_open_connections = 0 total_num_login_events = 0 total_num_processes = 0 total_num_users = 0 for m in self.machines: if m.host_metrics is None: raise ValueError("HostMetrics is None") total_num_logged_in_users += m.host_metrics.num_logged_in_users total_num_failed_login_attempts += m.host_metrics.num_failed_login_attempts total_num_open_connections += m.host_metrics.num_open_connections total_num_login_events += m.host_metrics.num_login_events total_num_processes += m.host_metrics.num_processes total_num_users += m.host_metrics.num_users self.host_metrics.num_logged_in_users = total_num_logged_in_users self.host_metrics.num_failed_login_attempts = total_num_failed_login_attempts self.host_metrics.num_open_connections = total_num_open_connections self.host_metrics.num_login_events = total_num_login_events self.host_metrics.num_processes = total_num_processes self.host_metrics.num_users = total_num_users self.host_metrics_list.append(self.host_metrics.copy())
[docs] def get_average_aggregated_host_metrics(self) -> HostMetrics: """ :return: average of the list of aggregated host metrics """ if len(self.host_metrics_list) == 0: return self.host_metrics.copy() if len(self.host_metrics_list) == 1: return self.host_metrics_list[0].copy() avg_host_metrics = HostMetrics() for i in range(0, len(self.host_metrics_list)): avg_host_metrics.num_logged_in_users = (avg_host_metrics.num_logged_in_users + self.host_metrics_list[i].num_logged_in_users) avg_host_metrics.num_failed_login_attempts = (avg_host_metrics.num_failed_login_attempts + self.host_metrics_list[i].num_failed_login_attempts) avg_host_metrics.num_open_connections = (avg_host_metrics.num_open_connections + self.host_metrics_list[i].num_open_connections) avg_host_metrics.num_login_events = (avg_host_metrics.num_login_events + self.host_metrics_list[i].num_login_events) avg_host_metrics.num_processes = avg_host_metrics.num_processes + self.host_metrics_list[i].num_processes avg_host_metrics.num_users = avg_host_metrics.num_users + self.host_metrics_list[i].num_users avg_host_metrics.num_logged_in_users = int(round(avg_host_metrics.num_logged_in_users / len(self.host_metrics_list))) avg_host_metrics.num_failed_login_attempts = int(round(avg_host_metrics.num_failed_login_attempts / len(self.host_metrics_list))) avg_host_metrics.num_open_connections = int(round(avg_host_metrics.num_open_connections / len(self.host_metrics_list))) avg_host_metrics.num_login_events = int(round(avg_host_metrics.num_login_events / len(self.host_metrics_list))) avg_host_metrics.num_processes = int(round(avg_host_metrics.num_processes / len(self.host_metrics_list))) avg_host_metrics.num_users = int(round(avg_host_metrics.num_users / len(self.host_metrics_list))) return avg_host_metrics