Source code for csle_ryu.dao.avg_port_statistic

from typing import Dict, Any, List
from csle_ryu.dao.port_statistic import PortStatistic
from csle_base.json_serializable import JSONSerializable


class AvgPortStatistic(JSONSerializable):
    """
    DTO containing data with average port statistics from an OpenFlow switch
    """

    def __init__(self, timestamp: float, datapath_id: str, total_num_received_packets: int,
                 total_num_received_bytes: int, total_num_received_errors: int,
                 total_num_transmitted_packets: int, total_num_transmitted_bytes: int,
                 total_num_transmitted_errors: int, total_num_received_dropped: int,
                 total_num_transmitted_dropped: int, total_num_received_frame_errors: int,
                 total_num_received_overrun_errors: int, total_num_received_crc_errors: int,
                 total_num_collisions: int, avg_duration_nanoseconds: int,
                 avg_duration_seconds: int) -> None:
        """
        Initializes the DTO

        :param timestamp: the timestamp the statistic was received
        :param datapath_id: the datapath id
        :param total_num_received_packets: the total number of received packets on the port
        :param total_num_received_bytes: the total number of received bytes on the port
        :param total_num_received_errors: the total number of received errors on the port
        :param total_num_transmitted_packets: the total number of transmitted packets on the port
        :param total_num_transmitted_bytes: the total number of transmitted bytes on the port
        :param total_num_transmitted_errors: the total number of transmitted errors on the port
        :param total_num_received_dropped: the total number of received packets dropped on the port
        :param total_num_transmitted_dropped: the total number of transmitted packets dropped on the port
        :param total_num_received_frame_errors: the total number of received frame errors on the port
        :param total_num_received_overrun_errors: the total number of overrun errors on the port
        :param total_num_received_crc_errors: the total number of received crc errors on the port
        :param total_num_collisions: the total number of collisions on the port
        :param avg_duration_nanoseconds: the avg duration the port has been up in nanoseconds
        :param avg_duration_seconds: the avg duration the port has been up in seconds
        """
        self.timestamp = timestamp
        self.datapath_id = datapath_id
        self.total_num_received_packets = total_num_received_packets
        self.total_num_received_bytes = total_num_received_bytes
        self.total_num_received_errors = total_num_received_errors
        self.total_num_transmitted_packets = total_num_transmitted_packets
        self.total_num_transmitted_bytes = total_num_transmitted_bytes
        self.total_num_transmitted_errors = total_num_transmitted_errors
        self.total_num_received_dropped = total_num_received_dropped
        self.total_num_transmitted_dropped = total_num_transmitted_dropped
        self.total_num_received_frame_errors = total_num_received_frame_errors
        self.total_num_received_overrun_errors = total_num_received_overrun_errors
        self.total_num_received_crc_errors = total_num_received_crc_errors
        self.total_num_collisions = total_num_collisions
        self.avg_duration_nanoseconds = avg_duration_nanoseconds
        self.avg_duration_seconds = avg_duration_seconds

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "AvgPortStatistic":
        """
        Converts a dict representation to an instance

        :param d: the dict to convert
        :return: the created instance
        :raises KeyError: if a required key is missing from the dict
        """
        return AvgPortStatistic(
            timestamp=d["timestamp"],
            datapath_id=d["datapath_id"],
            total_num_received_packets=d["total_num_received_packets"],
            total_num_received_bytes=d["total_num_received_bytes"],
            total_num_received_errors=d["total_num_received_errors"],
            total_num_transmitted_packets=d["total_num_transmitted_packets"],
            total_num_transmitted_bytes=d["total_num_transmitted_bytes"],
            total_num_transmitted_errors=d["total_num_transmitted_errors"],
            total_num_received_dropped=d["total_num_received_dropped"],
            total_num_transmitted_dropped=d["total_num_transmitted_dropped"],
            total_num_received_frame_errors=d["total_num_received_frame_errors"],
            total_num_received_overrun_errors=d["total_num_received_overrun_errors"],
            total_num_received_crc_errors=d["total_num_received_crc_errors"],
            total_num_collisions=d["total_num_collisions"],
            avg_duration_nanoseconds=d["avg_duration_nanoseconds"],
            avg_duration_seconds=d["avg_duration_seconds"],
        )

    def to_dict(self) -> Dict[str, Any]:
        """
        Converts the object to a dict representation

        :return: a dict representation of the object
        """
        return {
            "timestamp": self.timestamp,
            "datapath_id": self.datapath_id,
            "total_num_received_packets": self.total_num_received_packets,
            "total_num_received_bytes": self.total_num_received_bytes,
            "total_num_received_errors": self.total_num_received_errors,
            "total_num_transmitted_packets": self.total_num_transmitted_packets,
            "total_num_transmitted_bytes": self.total_num_transmitted_bytes,
            "total_num_transmitted_errors": self.total_num_transmitted_errors,
            "total_num_received_dropped": self.total_num_received_dropped,
            "total_num_transmitted_dropped": self.total_num_transmitted_dropped,
            "total_num_received_frame_errors": self.total_num_received_frame_errors,
            "total_num_received_overrun_errors": self.total_num_received_overrun_errors,
            "total_num_received_crc_errors": self.total_num_received_crc_errors,
            "total_num_collisions": self.total_num_collisions,
            "avg_duration_nanoseconds": self.avg_duration_nanoseconds,
            "avg_duration_seconds": self.avg_duration_seconds,
        }

    def __str__(self) -> str:
        """
        :return: a string representation of the object
        """
        return f"timestamp: {self.timestamp}, datapath_id: {self.datapath_id}, " \
               f"total_num_received_packets: {self.total_num_received_packets}, " \
               f"total_num_received_bytes: {self.total_num_received_bytes}," \
               f" total_num_received_errors: {self.total_num_received_errors}, " \
               f"total_num_transmitted_packets: {self.total_num_transmitted_packets}, " \
               f"total_num_transmitted_bytes: {self.total_num_transmitted_bytes}," \
               f" total_num_transmitted_errors: {self.total_num_transmitted_errors}, " \
               f"total_num_received_dropped: {self.total_num_received_dropped}, " \
               f"total_num_transmitted_dropped: {self.total_num_transmitted_dropped}, " \
               f"total_num_received_frame_errors: {self.total_num_received_frame_errors}, " \
               f"total_num_received_overrun_errors: {self.total_num_received_overrun_errors}, " \
               f"total_num_received_crc_errors: {self.total_num_received_crc_errors}, " \
               f"total_num_collisions: {self.total_num_collisions}, " \
               f"avg_duration_nanoseconds: {self.avg_duration_nanoseconds}, " \
               f"avg_duration_seconds: {self.avg_duration_seconds}"

    @staticmethod
    def from_json_file(json_file_path: str) -> "AvgPortStatistic":
        """
        Reads a json file and converts it to a DTO

        :param json_file_path: the json file path
        :return: the converted DTO
        """
        import io
        import json
        with io.open(json_file_path, 'r') as f:
            json_str = f.read()
        return AvgPortStatistic.from_dict(json.loads(json_str))

    def copy(self) -> "AvgPortStatistic":
        """
        :return: a copy of the DTO
        """
        # The copy must be of this class: the dict produced by to_dict() uses the
        # total_num_*/avg_* keys of AvgPortStatistic, not those of PortStatistic.
        return AvgPortStatistic.from_dict(self.to_dict())

    @staticmethod
    def _parse_kafka_record(record: str) -> Dict[str, Any]:
        """
        Parses a comma-separated kafka record into constructor keyword arguments

        :param record: the kafka record
        :return: a dict mapping attribute names to their parsed values
        :raises IndexError: if the record has fewer than 16 comma-separated fields
        :raises ValueError: if a numeric field cannot be converted
        """
        parts = record.split(",")
        # int()/float() ignore surrounding whitespace, so the ", " separators emitted
        # by to_kafka_record are parsed correctly.
        return {
            "timestamp": float(parts[0]),
            "datapath_id": parts[1],
            "total_num_received_packets": int(parts[2]),
            "total_num_received_bytes": int(parts[3]),
            "total_num_received_errors": int(parts[4]),
            "total_num_transmitted_packets": int(parts[5]),
            "total_num_transmitted_bytes": int(parts[6]),
            "total_num_transmitted_errors": int(parts[7]),
            "total_num_received_dropped": int(parts[8]),
            "total_num_transmitted_dropped": int(parts[9]),
            "total_num_received_frame_errors": int(parts[10]),
            "total_num_received_overrun_errors": int(parts[11]),
            "total_num_received_crc_errors": int(parts[12]),
            "total_num_collisions": int(parts[13]),
            "avg_duration_nanoseconds": int(parts[14]),
            "avg_duration_seconds": int(parts[15]),
        }

    @staticmethod
    def from_kafka_record(record: str) -> "AvgPortStatistic":
        """
        Converts a kafka record to a DTO

        :param record: the kafka record
        :return: the DTO
        """
        return AvgPortStatistic(**AvgPortStatistic._parse_kafka_record(record))

    def to_kafka_record(self) -> str:
        """
        Converts the DTO into a kafka record

        :return: the kafka record
        """
        # NOTE: the record format (including the space after some commas) is kept
        # exactly as-is, since consumers outside this class may depend on it.
        return f"{self.timestamp},{self.datapath_id},{self.total_num_received_packets}, " \
               f"{self.total_num_received_bytes}," \
               f"{self.total_num_received_errors},{self.total_num_transmitted_packets}, " \
               f"{self.total_num_transmitted_bytes}," \
               f"{self.total_num_transmitted_errors},{self.total_num_received_dropped}, " \
               f"{self.total_num_transmitted_dropped}," \
               f"{self.total_num_received_frame_errors},{self.total_num_received_overrun_errors}, " \
               f"{self.total_num_received_crc_errors}," \
               f"{self.total_num_collisions},{self.avg_duration_nanoseconds}, " \
               f"{self.avg_duration_seconds}"

    def update_with_kafka_record(self, record: str) -> None:
        """
        Updates the DTO with a new kafka record

        :param record: the kafka record
        :return: None
        """
        # Parse the whole record first so that a malformed record raises before any
        # attribute is modified (no partially-updated DTO).
        parsed = AvgPortStatistic._parse_kafka_record(record)
        for attribute_name, value in parsed.items():
            setattr(self, attribute_name, value)

    @staticmethod
    def average_port_statistics(timestamp: float, datapath_id: str,
                                port_statistics: List[PortStatistic]) -> "AvgPortStatistic":
        """
        Computes aggregated metrics from a list of port statistics: counters are summed
        over all ports and the up-durations are averaged over the ports

        :param timestamp: the timestamp to assign to the aggregated statistic
        :param datapath_id: the datapath id to assign to the aggregated statistic
        :param port_statistics: the list of port statistics to aggregate
        :return: the aggregated statistics (average durations are 0 if the list is empty)
        """
        num_ports = len(port_statistics)
        total_duration_nanoseconds = sum(port.duration_nanoseconds for port in port_statistics)
        total_duration_seconds = sum(port.duration_seconds for port in port_statistics)
        if num_ports > 0:
            avg_duration_nanoseconds = int(total_duration_nanoseconds / num_ports)
            avg_duration_seconds = int(total_duration_seconds / num_ports)
        else:
            # No ports reported: avoid dividing by zero and report zero averages
            avg_duration_nanoseconds = 0
            avg_duration_seconds = 0
        return AvgPortStatistic(
            timestamp=timestamp,
            datapath_id=datapath_id,
            total_num_received_packets=sum(port.num_received_packets for port in port_statistics),
            total_num_received_bytes=sum(port.num_received_bytes for port in port_statistics),
            total_num_received_errors=sum(port.num_received_errors for port in port_statistics),
            total_num_transmitted_packets=sum(port.num_transmitted_packets for port in port_statistics),
            total_num_transmitted_bytes=sum(port.num_transmitted_bytes for port in port_statistics),
            total_num_transmitted_errors=sum(port.num_transmitted_errors for port in port_statistics),
            total_num_received_dropped=sum(port.num_received_dropped for port in port_statistics),
            total_num_transmitted_dropped=sum(port.num_transmitted_dropped for port in port_statistics),
            total_num_received_frame_errors=sum(port.num_received_frame_errors for port in port_statistics),
            total_num_received_overrun_errors=sum(
                port.num_received_overrun_errors for port in port_statistics),
            total_num_received_crc_errors=sum(port.num_received_crc_errors for port in port_statistics),
            total_num_collisions=sum(port.num_collisions for port in port_statistics),
            avg_duration_nanoseconds=avg_duration_nanoseconds,
            avg_duration_seconds=avg_duration_seconds,
        )