from typing import Dict, Any
from csle_base.json_serializable import JSONSerializable
class FlowStatistic(JSONSerializable):
    """
    DTO containing data of a flow statistic returned by an OpenFlow switch

    The DTO can be converted to and from three representations: a dict (to_dict/from_dict),
    a JSON file (from_json_file) and a comma-separated kafka record
    (to_kafka_record/from_kafka_record/update_with_kafka_record).
    """

    def __init__(self, timestamp: float, datapath_id: str, in_port: str, out_port: str, dst_mac_address: str,
                 num_packets: int, num_bytes: int, duration_nanoseconds: int, duration_seconds: int,
                 hard_timeout: int, idle_timeout: int, priority: int, cookie: int) -> None:
        """
        Initializes the DTO

        :param timestamp: the timestamp the data was received
        :param datapath_id: the datapath ID
        :param in_port: the input port of the flow
        :param out_port: the output port of the flow
        :param dst_mac_address: the destination MAC address of the flow
        :param num_packets: the number of packets of the flow
        :param num_bytes: the number of bytes of the flow
        :param duration_nanoseconds: the duration of the flow in nanoseconds
        :param duration_seconds: the duration of the flow in seconds
        :param hard_timeout: the hard timeout of the flow
        :param idle_timeout: the idle timeout of the flow
        :param priority: the priority of the flow
        :param cookie: the cookie of the flow
        """
        self.timestamp = timestamp
        self.datapath_id = datapath_id
        self.in_port = in_port
        self.out_port = out_port
        self.dst_mac_address = dst_mac_address
        self.num_packets = num_packets
        self.num_bytes = num_bytes
        self.duration_nanoseconds = duration_nanoseconds
        self.duration_seconds = duration_seconds
        self.hard_timeout = hard_timeout
        self.idle_timeout = idle_timeout
        self.priority = priority
        self.cookie = cookie

    @staticmethod
    def from_dict(d: Dict[str, Any]) -> "FlowStatistic":
        """
        Converts a dict representation to an instance

        :param d: the dict to convert; must contain one entry per constructor parameter
        :return: the created instance
        :raises KeyError: if any of the expected keys is missing from the dict
        """
        obj = FlowStatistic(
            timestamp=d["timestamp"], datapath_id=d["datapath_id"], in_port=d["in_port"], out_port=d["out_port"],
            dst_mac_address=d["dst_mac_address"], num_packets=d["num_packets"], num_bytes=d["num_bytes"],
            duration_nanoseconds=d["duration_nanoseconds"], duration_seconds=d["duration_seconds"],
            hard_timeout=d["hard_timeout"], idle_timeout=d["idle_timeout"], priority=d["priority"],
            cookie=d["cookie"]
        )
        return obj

    def to_dict(self) -> Dict[str, Any]:
        """
        Converts the object to a dict representation

        :return: a dict representation of the object (same keys as accepted by from_dict)
        """
        d: Dict[str, Any] = {}
        d["timestamp"] = self.timestamp
        d["datapath_id"] = self.datapath_id
        d["in_port"] = self.in_port
        d["out_port"] = self.out_port
        d["dst_mac_address"] = self.dst_mac_address
        d["num_packets"] = self.num_packets
        d["num_bytes"] = self.num_bytes
        d["duration_nanoseconds"] = self.duration_nanoseconds
        d["duration_seconds"] = self.duration_seconds
        d["hard_timeout"] = self.hard_timeout
        d["idle_timeout"] = self.idle_timeout
        d["priority"] = self.priority
        d["cookie"] = self.cookie
        return d

    def __str__(self) -> str:
        """
        :return: a string representation of the object, listing every field as "name: value"
        """
        # All thirteen fields are listed (duration_seconds included) with a uniform ", " separator.
        return f"timestamp: {self.timestamp}, datapath_id: {self.datapath_id}, in_port: {self.in_port}, " \
               f"out_port: {self.out_port}, dst_mac_address: {self.dst_mac_address}, " \
               f"num_packets: {self.num_packets}, num_bytes: {self.num_bytes}, " \
               f"duration_nanoseconds: {self.duration_nanoseconds}, duration_seconds: {self.duration_seconds}, " \
               f"hard_timeout: {self.hard_timeout}, idle_timeout: {self.idle_timeout}, " \
               f"priority: {self.priority}, cookie: {self.cookie}"

    @staticmethod
    def from_json_file(json_file_path: str) -> "FlowStatistic":
        """
        Reads a json file and converts it to a DTO

        :param json_file_path: the json file path
        :return: the converted DTO
        """
        import json
        # Read as UTF-8 explicitly so that the result does not depend on the platform's default encoding.
        with open(json_file_path, "r", encoding="utf-8") as f:
            return FlowStatistic.from_dict(json.load(f))

    def copy(self) -> "FlowStatistic":
        """
        :return: a copy of the DTO, created through a to_dict/from_dict round trip
        """
        return FlowStatistic.from_dict(self.to_dict())

    @staticmethod
    def _parse_kafka_record(record: str) -> Dict[str, Any]:
        """
        Parses a kafka record into the constructor arguments of the DTO

        The record is a comma-separated string whose first thirteen fields follow the order
        produced by to_kafka_record; any additional trailing fields are ignored.

        :param record: the kafka record
        :return: a dict mapping constructor parameter names to the parsed values
        :raises IndexError: if the record has fewer than thirteen fields
        :raises ValueError: if a numeric field cannot be converted
        """
        parts = record.split(",")
        return {
            "timestamp": float(parts[0]),
            "datapath_id": parts[1],
            "in_port": parts[2],
            "out_port": parts[3],
            "dst_mac_address": parts[4],
            "num_packets": int(parts[5]),
            "num_bytes": int(parts[6]),
            "duration_nanoseconds": int(parts[7]),
            "duration_seconds": int(parts[8]),
            "hard_timeout": int(parts[9]),
            "idle_timeout": int(parts[10]),
            "priority": int(parts[11]),
            "cookie": int(parts[12]),
        }

    @staticmethod
    def from_kafka_record(record: str) -> "FlowStatistic":
        """
        Converts a kafka record to a DTO

        :param record: the kafka record
        :return: the DTO
        :raises IndexError: if the record has fewer than thirteen fields
        :raises ValueError: if a numeric field cannot be converted
        """
        return FlowStatistic(**FlowStatistic._parse_kafka_record(record))

    def to_kafka_record(self) -> str:
        """
        Converts the DTO into a kafka record

        :return: the kafka record (the thirteen fields joined by commas)
        """
        return f"{self.timestamp},{self.datapath_id},{self.in_port},{self.out_port},{self.dst_mac_address}," \
               f"{self.num_packets},{self.num_bytes},{self.duration_nanoseconds},{self.duration_seconds}," \
               f"{self.hard_timeout},{self.idle_timeout},{self.priority},{self.cookie}"

    def update_with_kafka_record(self, record: str) -> None:
        """
        Updates the DTO with a new kafka record

        :param record: the kafka record
        :return: None
        :raises IndexError: if the record has fewer than thirteen fields
        :raises ValueError: if a numeric field cannot be converted
        """
        # Parse the complete record before touching any attribute, so that a malformed record
        # raises without leaving the DTO partially updated.
        fields = FlowStatistic._parse_kafka_record(record)
        self.timestamp = fields["timestamp"]
        self.datapath_id = fields["datapath_id"]
        self.in_port = fields["in_port"]
        self.out_port = fields["out_port"]
        self.dst_mac_address = fields["dst_mac_address"]
        self.num_packets = fields["num_packets"]
        self.num_bytes = fields["num_bytes"]
        self.duration_nanoseconds = fields["duration_nanoseconds"]
        self.duration_seconds = fields["duration_seconds"]
        self.hard_timeout = fields["hard_timeout"]
        self.idle_timeout = fields["idle_timeout"]
        self.priority = fields["priority"]
        self.cookie = fields["cookie"]