Source code for csle_collector.kafka_manager.kafka_manager_util

from typing import Dict, Any
import csle_collector.kafka_manager.kafka_manager_pb2


[docs]class KafkaManagerUtil: """ Class with utility functions related to the Kafka Manager """
[docs] @staticmethod def kafka_dto_to_dict(kafka_dto: csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO) -> Dict[str, Any]: """ Converts a KafkaDTO to a dict :param kafka_dto: the dto to convert :return: a dict representation of the DTO """ d: Dict[str, Any] = {} d["running"] = kafka_dto.running topics = [] for top in kafka_dto.topics: topics.append(str(top)) d["topics"] = topics return d
[docs] @staticmethod def kafka_dto_from_dict(d: Dict[str, Any]) -> csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO: """ Converts a dict representation of a KafkaDTO to a DTO :param d: the dict to convert :return: the converted DTO """ kafka_dto = csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO(running=d["running"], topics=d["topics"]) return kafka_dto
[docs] @staticmethod def kafka_dto_empty() -> csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO: """ :return: an empty KafkaDTO """ kafka_dto = csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO() kafka_dto.running = False return kafka_dto
[docs] @staticmethod def hours_to_ms(hours: float) -> float: """ Convert hours to ms :param hours: the hours to convert :return: the ms """ return int((((hours * 1000) * 60) * 60))