# Source code for csle_ryu.dao.port_statistic

from typing import Dict, Any
from csle_base.json_serializable import JSONSerializable


class PortStatistic(JSONSerializable):
    """
    DTO containing data with port statistics from an OpenFlow switch
    """

    # Integer-valued attributes, listed in the order they appear in a kafka record
    # (columns 2..16). Keeping the order in one place guarantees that parsing,
    # serialization and the dict/string representations cannot drift apart.
    _INT_FIELDS = (
        "port",
        "num_received_packets",
        "num_received_bytes",
        "num_received_errors",
        "num_transmitted_packets",
        "num_transmitted_bytes",
        "num_transmitted_errors",
        "num_received_dropped",
        "num_transmitted_dropped",
        "num_received_frame_errors",
        "num_received_overrun_errors",
        "num_received_crc_errors",
        "num_collisions",
        "duration_nanoseconds",
        "duration_seconds",
    )
    # All attributes in kafka-record column order: timestamp (float) and
    # datapath_id (str) come first, followed by the integer counters.
    _FIELDS = ("timestamp", "datapath_id") + _INT_FIELDS

    def __init__(self, timestamp: float, datapath_id: str, port: int, num_received_packets: int,
                 num_received_bytes: int, num_received_errors: int, num_transmitted_packets: int,
                 num_transmitted_bytes: int, num_transmitted_errors: int, num_received_dropped: int,
                 num_transmitted_dropped: int, num_received_frame_errors: int,
                 num_received_overrun_errors: int, num_received_crc_errors: int, num_collisions: int,
                 duration_nanoseconds: int, duration_seconds: int):
        """
        Initializes the DTO

        :param timestamp: the timestamp the statistic was received
        :param datapath_id: the datapath id
        :param port: the port
        :param num_received_packets: the number of received packets on the port
        :param num_received_bytes: the number of received bytes on the port
        :param num_received_errors: the number of received errors on the port
        :param num_transmitted_packets: the number of transmitted packets on the port
        :param num_transmitted_bytes: the number of transmitted bytes on the port
        :param num_transmitted_errors: the number of transmitted errors on the port
        :param num_received_dropped: the number of received packets dropped on the port
        :param num_transmitted_dropped: the number of transmitted packets dropped on the port
        :param num_received_frame_errors: the number of received frame errors on the port
        :param num_received_overrun_errors: the number of overrun errors on the port
        :param num_received_crc_errors: the number of received crc errors on the port
        :param num_collisions: the number of collisions on the port
        :param duration_nanoseconds: the duration the port has been up in nanoseconds
        :param duration_seconds: the duration the port has been up in seconds
        """
        self.timestamp = timestamp
        self.datapath_id = datapath_id
        self.port = port
        self.num_received_packets = num_received_packets
        self.num_received_bytes = num_received_bytes
        self.num_received_errors = num_received_errors
        self.num_transmitted_packets = num_transmitted_packets
        self.num_transmitted_bytes = num_transmitted_bytes
        self.num_transmitted_errors = num_transmitted_errors
        self.num_received_dropped = num_received_dropped
        self.num_transmitted_dropped = num_transmitted_dropped
        self.num_received_frame_errors = num_received_frame_errors
        self.num_received_overrun_errors = num_received_overrun_errors
        self.num_received_crc_errors = num_received_crc_errors
        self.num_collisions = num_collisions
        self.duration_nanoseconds = duration_nanoseconds
        self.duration_seconds = duration_seconds

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "PortStatistic":
        """
        Converts a dict representation to an instance

        :param d: the dict to convert; must contain a key for every attribute of the DTO
                  (extra keys are ignored)
        :return: the created instance
        :raises KeyError: if a required key is missing from the dict
        """
        return PortStatistic(**{name: d[name] for name in PortStatistic._FIELDS})

    def to_dict(self) -> Dict[str, Any]:
        """
        Converts the object to a dict representation

        :return: a dict representation of the object (one key per attribute)
        """
        return {name: getattr(self, name) for name in self._FIELDS}

    def __str__(self) -> str:
        """
        Gets a string representation of the DTO

        :return: a string representation of the object, formatted as comma-separated "name: value" pairs
        """
        return ", ".join(f"{name}: {getattr(self, name)}" for name in self._FIELDS)

    @staticmethod
    def from_json_file(json_file_path: str) -> "PortStatistic":
        """
        Reads a json file and converts it to a DTO

        :param json_file_path: the json file path
        :return: the converted DTO
        """
        import json
        # Decode explicitly as UTF-8 (the standard JSON interchange encoding) rather than
        # relying on the platform's locale-dependent default encoding.
        with open(json_file_path, "r", encoding="utf-8") as f:
            return PortStatistic.from_dict(json.load(f))

    def copy(self) -> "PortStatistic":
        """
        :return: a copy of the DTO
        """
        return PortStatistic.from_dict(self.to_dict())

    @staticmethod
    def _parse_kafka_record(record: str) -> Dict[str, Any]:
        """
        Parses a comma-separated kafka record into a dict of attribute values

        :param record: the kafka record
        :return: a dict mapping attribute names to their parsed values
        :raises IndexError: if the record has fewer fields than the DTO has attributes
        :raises ValueError: if a numeric field cannot be parsed
        """
        parts = record.split(",")
        parsed: Dict[str, Any] = {"timestamp": float(parts[0]), "datapath_id": parts[1]}
        # The integer counters start at column 2; columns beyond the last attribute are ignored.
        for idx, name in enumerate(PortStatistic._INT_FIELDS, start=2):
            parsed[name] = int(parts[idx])
        return parsed

    @staticmethod
    def from_kafka_record(record: str) -> "PortStatistic":
        """
        Converts a kafka record to a DTO

        :param record: the kafka record
        :return: the DTO
        :raises IndexError: if the record has too few fields
        :raises ValueError: if a numeric field cannot be parsed
        """
        return PortStatistic(**PortStatistic._parse_kafka_record(record))

    def to_kafka_record(self) -> str:
        """
        Converts the DTO into a kafka record

        :return: the kafka record (attribute values joined by commas, in the same column order
                 that from_kafka_record expects)
        """
        return ",".join(f"{getattr(self, name)}" for name in self._FIELDS)

    def update_with_kafka_record(self, record: str) -> None:
        """
        Updates the DTO with a new kafka record

        :param record: the kafka record
        :return: None
        :raises IndexError: if the record has too few fields
        :raises ValueError: if a numeric field cannot be parsed
        """
        # Parse the complete record before touching any attribute, so that a malformed
        # record raises without leaving the DTO in a partially updated state.
        parsed = PortStatistic._parse_kafka_record(record)
        for name, value in parsed.items():
            setattr(self, name, value)