Source code for csle_ryu.dao.agg_flow_statistic

from typing import Dict, Any
from csle_base.json_serializable import JSONSerializable


[docs]class AggFlowStatistic(JSONSerializable): """ DTO containing aggregated flow statistics of an OpenFlow switch """ def __init__(self, timestamp: float, datapath_id: str, total_num_packets: int, total_num_bytes: int, total_num_flows: int): """ Initializes the DTO :param timestamp: the timestamp the data was received :param datapath_id: the datapath ID :param total_num_packets: the total number of packets :param total_num_bytes: the total number of bytes :param total_num_flows: the total number of flows :param avg_duration_nanoseconds: the avg duration of the flow in nanoseconds """ self.timestamp = timestamp self.datapath_id = datapath_id self.total_num_packets = total_num_packets self.total_num_bytes = total_num_bytes self.total_num_flows = total_num_flows
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "AggFlowStatistic": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = AggFlowStatistic( timestamp=d["timestamp"], datapath_id=d["datapath_id"], total_num_packets=d["total_num_packets"], total_num_flows=d["total_num_flows"], total_num_bytes=d["total_num_bytes"]) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Any] = {} d["timestamp"] = self.timestamp d["datapath_id"] = self.datapath_id d["total_num_packets"] = self.total_num_packets d["total_num_bytes"] = self.total_num_bytes d["total_num_flows"] = self.total_num_flows return d
def __str__(self) -> str: """ :return: a string representation of the object """ return f"timestamp: {self.timestamp}, datapath_id: {self.datapath_id}, " \ f"total_num_packets: {self.total_num_packets}," \ f"total_num_bytes: {self.total_num_bytes}, total_num_flows: {self.total_num_flows}"
[docs] @staticmethod def from_json_file(json_file_path: str) -> "AggFlowStatistic": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return AggFlowStatistic.from_dict(json.loads(json_str))
[docs] def copy(self) -> "AggFlowStatistic": """ Copies the object :return: a copy of the DTO """ return AggFlowStatistic.from_dict(self.to_dict())
[docs] @staticmethod def from_kafka_record(record: str) -> "AggFlowStatistic": """ Converts a kafka record to a DTO :param record: the kafka record :return: the DTO """ parts = record.split(",") obj = AggFlowStatistic(timestamp=float(parts[0]), datapath_id=parts[1], total_num_packets=int(parts[2]), total_num_bytes=int(parts[3]), total_num_flows=int(parts[4])) return obj
[docs] def to_kafka_record(self) -> str: """ Converts the DTO into a kafka record :return: the kafka record """ return f"{self.timestamp},{self.datapath_id}," \ f"{self.total_num_packets},{self.total_num_bytes},{self.total_num_flows}"
[docs] def update_with_kafka_record(self, record: str) -> None: """ Updates the DTO with a new kafka record :param record: the kafka record :return: None """ parts = record.split(",") self.timestamp = float(parts[0]) self.datapath_id = parts[1] self.total_num_packets = int(parts[2]) self.total_num_bytes = int(parts[3]) self.total_num_flows = int(parts[4])