from typing import Tuple, List
import time
import logging
import subprocess
import os
from concurrent import futures
import grpc
import socket
import netifaces
import confluent_kafka
import confluent_kafka.admin
import csle_collector.kafka_manager.kafka_manager_pb2_grpc
import csle_collector.kafka_manager.kafka_manager_pb2
import csle_collector.constants.constants as constants
from csle_collector.kafka_manager.kafka_manager_util import KafkaManagerUtil


class KafkaManagerServicer(csle_collector.kafka_manager.kafka_manager_pb2_grpc.KafkaManagerServicer):
"""
    gRPC server for managing a Kafka server. Allows starting/stopping the Kafka server remotely, querying the
    state of the server, and creating/deleting topics.
"""

    def __init__(self) -> None:
"""
Initializes the server
"""
logging.basicConfig(filename=f"{constants.LOG_FILES.KAFKA_MANAGER_LOG_DIR}"
f"{constants.LOG_FILES.KAFKA_MANAGER_LOG_FILE}", level=logging.INFO)
self.hostname = socket.gethostname()
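        # Prefer the IPv4 address of eth0; if that lookup fails (e.g. the interface does not exist),
        # fall back to resolving the hostname via DNS.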
try:
self.ip = netifaces.ifaddresses(constants.INTERFACES.ETH0)[netifaces.AF_INET][0][constants.INTERFACES.ADDR]
except Exception:
self.ip = socket.gethostbyname(self.hostname)
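        # Client configuration for confluent_kafka; the constants are assumed to map to the
        # librdkafka properties 'bootstrap.servers' and 'client.id'.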
self.conf = {constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY: f"{self.ip}:{constants.KAFKA.PORT}",
constants.KAFKA.CLIENT_ID_PROPERTY: self.hostname}
logging.info(f"Setting up KafkaManager hostname: {self.hostname} ip: {self.ip}")

    def _get_kafka_status_and_topics(self) -> Tuple[bool, List[str]]:
"""
Utility method to get the status of Kafka and existing topics
:return: status and list of topics
"""
status_output = subprocess.run(constants.KAFKA.KAFKA_STATUS.split(" "), capture_output=True, text=True).stdout
running = not ("not" in status_output)
topics = []
if running:
client = confluent_kafka.admin.AdminClient(self.conf)
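            # list_topics() queries cluster metadata; a short timeout keeps the status call
            # responsive if the broker is unreachable.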
try:
cluster_metadata = client.list_topics(timeout=1)
                for topic_name in cluster_metadata.topics:
                    topics.append(topic_name)
except Exception as e:
logging.info(f"There was an exception listing the Kafka topics: {str(e)}, {repr(e)}")
return running, topics

    def getKafkaStatus(self, request: csle_collector.kafka_manager.kafka_manager_pb2.GetKafkaStatusMsg,
context: grpc.ServicerContext) \
-> csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO:
"""
        Gets the state of the Kafka server

        :param request: the gRPC request
        :param context: the gRPC context
        :return: a KafkaDTO with the state of the Kafka server
"""
running, topics = self._get_kafka_status_and_topics()
kafka_dto = csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO(running=running, topics=topics)
return kafka_dto

    def stopKafka(self, request: csle_collector.kafka_manager.kafka_manager_pb2.StopKafkaMsg,
                  context: grpc.ServicerContext) -> csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO:
"""
        Stops the Kafka server

        :param request: the gRPC request
        :param context: the gRPC context
        :return: a KafkaDTO with the state of the Kafka server
"""
logging.info("Stopping kafka")
os.system(constants.KAFKA.KAFKA_STOP)
return csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO(running=False, topics=[])

    def startKafka(self, request: csle_collector.kafka_manager.kafka_manager_pb2.StartKafkaMsg,
context: grpc.ServicerContext) -> csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO:
"""
        Starts the Kafka server

        :param request: the gRPC request
        :param context: the gRPC context
        :return: a KafkaDTO with the state of the Kafka server
"""
logging.info("Starting kafka")
os.system(constants.KAFKA.KAFKA_START)
kafka_dto = csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO(running=True, topics=[])
return kafka_dto

    def createTopic(self, request: csle_collector.kafka_manager.kafka_manager_pb2.CreateTopicMsg,
context: grpc.ServicerContext) -> csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO:
"""
        Creates a new Kafka topic

        :param request: the gRPC request
        :param context: the gRPC context
        :return: a KafkaDTO with the state of the Kafka server
"""
logging.info(f"Creating topic: {request.name}, partitions:{request.partitions}, replicas:{request.replicas}, "
f"retention hours: {request.retention_time_hours}")
running, topics = self._get_kafka_status_and_topics()
client = confluent_kafka.admin.AdminClient(self.conf)
config = {
constants.KAFKA.RETENTION_MS_CONFIG_PROPERTY: KafkaManagerUtil.hours_to_ms(request.retention_time_hours)}
new_topic = confluent_kafka.admin.NewTopic(
request.name, request.partitions, request.replicas,
config=config)
client.create_topics([new_topic])
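        # AdminClient.create_topics() is asynchronous and returns a dict of futures (one per topic);
        # here we simply sleep for a few seconds to give the broker time to complete the operation.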
time.sleep(5)
        kafka_dto = csle_collector.kafka_manager.kafka_manager_pb2.KafkaDTO(running=running,
                                                                            topics=topics + [request.name])
return kafka_dto


def serve(port: int = 50051, log_dir: str = "/", max_workers: int = 10,
log_file_name: str = "kafka_manager.log") -> None:
"""
    Starts the gRPC server for managing the Kafka server

    :param port: the port that the server will listen to
    :param log_dir: the directory to write the log file
    :param max_workers: the maximum number of gRPC workers
    :param log_file_name: the file name of the log
    :return: None
"""
constants.LOG_FILES.KAFKA_MANAGER_LOG_DIR = log_dir
constants.LOG_FILES.KAFKA_MANAGER_LOG_FILE = log_file_name
server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
csle_collector.kafka_manager.kafka_manager_pb2_grpc.add_KafkaManagerServicer_to_server(
KafkaManagerServicer(), server)
server.add_insecure_port(f'[::]:{port}')
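    # add_insecure_port binds without TLS; the '[::]' wildcard listens on all interfaces
    # (and on most systems covers IPv4 as well via dual-stack sockets).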
server.start()
logging.info(f"KafkaManager Server Started, Listening on port: {port}")
server.wait_for_termination()


# Program entrypoint
if __name__ == '__main__':
serve()
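
# A minimal client-side sketch (not part of the manager itself), assuming the generated
# stubs imported above and a manager listening on localhost port 50051:
#
#   with grpc.insecure_channel("127.0.0.1:50051") as channel:
#       stub = csle_collector.kafka_manager.kafka_manager_pb2_grpc.KafkaManagerStub(channel)
#       status = stub.getKafkaStatus(
#           csle_collector.kafka_manager.kafka_manager_pb2.GetKafkaStatusMsg())
#       print(status.running, list(status.topics))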