from typing import Tuple
import logging
import os
import subprocess
from concurrent import futures
import grpc
import socket
import netifaces
import csle_collector.elk_manager.elk_manager_pb2_grpc
import csle_collector.elk_manager.elk_manager_pb2
import csle_collector.constants.constants as constants
class ElkManagerServicer(csle_collector.elk_manager.elk_manager_pb2_grpc.ElkManagerServicer):
    """
    gRPC server for managing an ELK stack. Allows to start/stop the ELK stack remotely and also to query the
    state of the ELK stack.
    """

    def __init__(self) -> None:
        """
        Initializes the server: configures file logging and resolves the hostname and IP of this host
        """
        # os.path.join inserts the path separator when the configured directory lacks a trailing one
        # (plain concatenation would turn "/var/log" + "elk_manager.log" into "/var/logelk_manager.log")
        log_path = os.path.join(constants.LOG_FILES.ELK_MANAGER_LOG_DIR, constants.LOG_FILES.ELK_MANAGER_LOG_FILE)
        logging.basicConfig(filename=log_path, level=logging.INFO)
        self.hostname = socket.gethostname()
        try:
            self.ip = netifaces.ifaddresses(constants.INTERFACES.ETH0)[netifaces.AF_INET][0][constants.INTERFACES.ADDR]
        except Exception:
            # Deliberate best-effort fallback: if the interface lookup fails for any reason,
            # resolve the IP from the hostname instead
            self.ip = socket.gethostbyname(self.hostname)
        logging.info(f"Setting up ElkManager hostname: {self.hostname} ip: {self.ip}")

    @staticmethod
    def _is_service_running(status_command: str) -> bool:
        """
        Utility method that executes a status command and interprets its output

        :param status_command: the status command to execute; it is split on single spaces and run without a shell
        :return: False if the stdout of the command contains the substring "not", True otherwise
        """
        # Only stdout is inspected; stderr and the exit code of the command are ignored
        result = subprocess.run(status_command.split(" "), capture_output=True, text=True)
        return "not" not in result.stdout

    @staticmethod
    def _elk_dto(elastic_running: bool, kibana_running: bool, logstash_running: bool) \
            -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Utility method to create an ElkDTO from three status flags

        :param elastic_running: value of the elasticRunning field
        :param kibana_running: value of the kibanaRunning field
        :param logstash_running: value of the logstashRunning field
        :return: the created ElkDTO
        """
        return csle_collector.elk_manager.elk_manager_pb2.ElkDTO(
            elasticRunning=elastic_running, kibanaRunning=kibana_running, logstashRunning=logstash_running)

    def _get_elk_status(self) -> Tuple[bool, bool, bool]:
        """
        Utility method to get the status of the ELK stack

        :return: status of elastic, status of kibana, status of logstash
        """
        elasticsearch_running = self._is_service_running(constants.ELK.ELASTICSEARCH_STATUS)
        kibana_running = self._is_service_running(constants.ELK.KIBANA_STATUS)
        logstash_running = self._is_service_running(constants.ELK.LOGSTASH_STATUS)
        return elasticsearch_running, kibana_running, logstash_running

    def getElkStatus(self, request: csle_collector.elk_manager.elk_manager_pb2.GetElkStatusMsg,
                     context: grpc.ServicerContext) \
            -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Gets the state of the ELK server

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK stack server
        """
        elasticsearch_running, kibana_running, logstash_running = self._get_elk_status()
        return self._elk_dto(elasticsearch_running, kibana_running, logstash_running)

    def stopElk(self, request: csle_collector.elk_manager.elk_manager_pb2.StopElkMsg,
                context: grpc.ServicerContext) \
            -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Stops the ELK stack (kibana first, then logstash, then elasticsearch)

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with all three services reported as stopped
        """
        logging.info("Stopping the ELK stack")
        logging.info("Stopping kibana")
        os.system(constants.ELK.KIBANA_STOP)
        logging.info("Stopping logstash")
        os.system(constants.ELK.LOGSTASH_STOP)
        logging.info("Stopping elasticsearch")
        os.system(constants.ELK.ELASTICSEARCH_STOP)
        # The returned flags are hard-coded; the exit codes of the stop commands are not checked
        return self._elk_dto(False, False, False)

    def startElk(self, request: csle_collector.elk_manager.elk_manager_pb2.StartElkMsg,
                 context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Starts the ELK stack

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with all three services reported as running
        """
        logging.info("Starting ELK")
        os.system(constants.ELK.ELK_START)
        # The returned flags are hard-coded; the exit code of the start command is not checked
        return self._elk_dto(True, True, True)

    def startElastic(self, request: csle_collector.elk_manager.elk_manager_pb2.StartElasticMsg,
                     context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Starts elasticsearch

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK server
        """
        logging.info("Starting Elasticsearch")
        os.system(constants.ELK.ELASTICSEARCH_START)
        # Elasticsearch is reported as running unconditionally; only the other two flags are queried
        _, kibana_running, logstash_running = self._get_elk_status()
        return self._elk_dto(True, kibana_running, logstash_running)

    def startKibana(self, request: csle_collector.elk_manager.elk_manager_pb2.StartKibanaMsg,
                    context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Starts Kibana

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK server
        """
        logging.info("Starting Kibana")
        os.system(constants.ELK.KIBANA_START)
        # Kibana is reported as running unconditionally; only the other two flags are queried
        elasticsearch_running, _, logstash_running = self._get_elk_status()
        return self._elk_dto(elasticsearch_running, True, logstash_running)

    def startLogstash(self, request: csle_collector.elk_manager.elk_manager_pb2.StartLogstashMsg,
                      context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Starts Logstash

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK server
        """
        logging.info("Starting Logstash")
        os.system(constants.ELK.LOGSTASH_START)
        # Logstash is reported as running unconditionally; only the other two flags are queried
        elasticsearch_running, kibana_running, _ = self._get_elk_status()
        return self._elk_dto(elasticsearch_running, kibana_running, True)

    # NOTE(review): the request is annotated with StartElasticMsg although this handler stops elasticsearch.
    # This looks like a copy-paste slip; confirm that a StopElasticMsg exists in the generated module
    # before changing the annotation.
    def stopElastic(self, request: csle_collector.elk_manager.elk_manager_pb2.StartElasticMsg,
                    context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Stops elasticsearch

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK server
        """
        logging.info("Stops Elasticsearch")
        os.system(constants.ELK.ELASTICSEARCH_STOP)
        # Elasticsearch is reported as stopped unconditionally; only the other two flags are queried
        _, kibana_running, logstash_running = self._get_elk_status()
        return self._elk_dto(False, kibana_running, logstash_running)

    def stopKibana(self, request: csle_collector.elk_manager.elk_manager_pb2.StopKibanaMsg,
                   context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Stops Kibana

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK server
        """
        logging.info("Stops Kibana")
        os.system(constants.ELK.KIBANA_STOP)
        # Kibana is reported as stopped unconditionally; only the other two flags are queried
        elasticsearch_running, _, logstash_running = self._get_elk_status()
        return self._elk_dto(elasticsearch_running, False, logstash_running)

    def stopLogstash(self, request: csle_collector.elk_manager.elk_manager_pb2.StopLogstashMsg,
                     context: grpc.ServicerContext) -> csle_collector.elk_manager.elk_manager_pb2.ElkDTO:
        """
        Stops Logstash

        :param request: the gRPC request
        :param context: the gRPC context
        :return: an ElkDTO with the state of the ELK server
        """
        logging.info("Stopping Logstash")
        os.system(constants.ELK.LOGSTASH_STOP)
        # Logstash is reported as stopped unconditionally; only the other two flags are queried
        elasticsearch_running, kibana_running, _ = self._get_elk_status()
        return self._elk_dto(elasticsearch_running, kibana_running, False)
def serve(port: int = 50045, log_dir: str = "/", max_workers: int = 10,
          log_file_name: str = "elk_manager.log") -> None:
    """
    Starts the gRPC server for managing the ELK stack and blocks until the server terminates

    :param port: the port that the server will listen to
    :param log_dir: the directory to write the log file
    :param log_file_name: the file name of the log
    :param max_workers: the maximum number of GRPC workers
    :return: None
    """
    # The log location must be stored in the constants before the servicer is created,
    # since the servicer reads these constants when it configures logging in its constructor
    constants.LOG_FILES.ELK_MANAGER_LOG_DIR = log_dir
    constants.LOG_FILES.ELK_MANAGER_LOG_FILE = log_file_name
    server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers))
    csle_collector.elk_manager.elk_manager_pb2_grpc.add_ElkManagerServicer_to_server(
        ElkManagerServicer(), server)
    # Listens on all interfaces without transport security
    server.add_insecure_port(f'[::]:{port}')
    server.start()
    logging.info(f"ElkManager Server Started, Listening on port: {port}")
    server.wait_for_termination()
# Program entrypoint: when executed as a script, start the ELK manager gRPC server with its default arguments
if __name__ == "__main__":
    serve()