Source code for csle_common.dao.emulation_config.kafka_topic

from typing import List, Dict, Any, Union
from csle_base.json_serializable import JSONSerializable


[docs]class KafkaTopic(JSONSerializable): """ DTO representing a kafka topic (Records are assumed to be comma-separated strings) where attributes define the list of columns in the csv. """ def __init__(self, name: str, num_partitions: int, num_replicas: int, attributes: List[str], retention_time_hours: int): """ Initializes the DTO :param name: the name of the topic :param num_partitions: the number of partitions :param num_replicas: the number of replicas :param attributes: the attributes of the topic :param retention_time_hours: the retention time of the topic (how long to store the data) """ self.name = name self.num_partitions = num_partitions self.num_replicas = num_replicas self.attributes = attributes self.retention_time_hours = retention_time_hours
[docs] @staticmethod def from_dict(d: Dict[str, Any]) -> "KafkaTopic": """ Converts a dict representation to an instance :param d: the dict to convert :return: the created instance """ obj = KafkaTopic(name=d["name"], num_replicas=d["num_replicas"], num_partitions=d["num_partitions"], attributes=d["attributes"], retention_time_hours=d["retention_time_hours"]) return obj
[docs] def to_dict(self) -> Dict[str, Any]: """ Converts the object to a dict representation :return: a dict representation of the object """ d: Dict[str, Union[str, int, List[str]]] = {} d["name"] = self.name d["num_partitions"] = self.num_partitions d["num_replicas"] = self.num_replicas d["attributes"] = self.attributes d["retention_time_hours"] = self.retention_time_hours return d
def __str__(self) -> str: """ :return: a string representation of the object """ return f"name:{self.name}, num_partitions: {self.num_partitions}, " \ f"num_replicas:{self.num_replicas}, attributes: {','.join(self.attributes)}, " \ f"retention_time_hours:{self.retention_time_hours}"
[docs] @staticmethod def from_json_file(json_file_path: str) -> "KafkaTopic": """ Reads a json file and converts it to a DTO :param json_file_path: the json file path :return: the converted DTO """ import io import json with io.open(json_file_path, 'r') as f: json_str = f.read() return KafkaTopic.from_dict(json.loads(json_str))
[docs] def copy(self) -> "KafkaTopic": """ :return: a copy of the DTO """ return KafkaTopic.from_dict(self.to_dict())
[docs] @staticmethod def schema() -> "KafkaTopic": """ :return: get the schema of the DTO """ return KafkaTopic(name="", num_partitions=1, num_replicas=1, attributes=[""], retention_time_hours=1)