Source code for csle_collector.host_manager.host_manager

from typing import List, Union
import logging
import socket
import netifaces
import time
import grpc
import os
from concurrent import futures
import subprocess
import csle_collector.host_manager.host_manager_pb2_grpc
import csle_collector.host_manager.host_manager_pb2
from csle_collector.host_manager.host_manager_util import HostManagerUtil
from csle_collector.host_manager.threads.host_monitor_thread import HostMonitorThread
import csle_collector.constants.constants as constants


[docs]class HostManagerServicer(csle_collector.host_manager.host_manager_pb2_grpc.HostManagerServicer): """ gRPC server for collecting Host statistics. """ def __init__(self) -> None: """ Initializes the server """ logging.basicConfig(filename=f"{constants.LOG_FILES.HOST_MANAGER_LOG_DIR}" f"{constants.LOG_FILES.HOST_MANAGER_LOG_FILE}", level=logging.INFO) self.hostname = socket.gethostname() try: self.ip = netifaces.ifaddresses(constants.INTERFACES.ETH0)[netifaces.AF_INET][0][constants.INTERFACES.ADDR] except Exception: self.ip = socket.gethostbyname(self.hostname) self.conf = {constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY: f"{self.ip}:{constants.KAFKA.PORT}", constants.KAFKA.CLIENT_ID_PROPERTY: self.hostname} self.host_monitor_thread: Union[None, HostMonitorThread] = None logging.info(f"Starting the HostManager hostname: {self.hostname} ip: {self.ip}")
[docs] def getHostMetrics(self, request: csle_collector.host_manager.host_manager_pb2.GetHostMetricsMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostMetricsDTO: """ Gets the host metrics from given timestamps :param request: the gRPC request :param context: the gRPC context :return: a DTO with IDS statistics """ host_metrics = HostManagerUtil.read_host_metrics(failed_auth_last_ts=request.failed_auth_last_ts, login_last_ts=request.login_last_ts) host_metrics_dto = host_metrics.to_dto(ip=self.ip) return host_metrics_dto
[docs] def startHostMonitor(self, request: csle_collector.host_manager.host_manager_pb2.StartHostMonitorMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Starts the Host monitor thread :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host monitor thread """ logging.info(f"Starting the HostMonitor thread, timestep length: {request.time_step_len_seconds}, " f"kafka ip: {request.kafka_ip}, " f"kafka port: {request.kafka_port}") if self.host_monitor_thread is not None: self.host_monitor_thread.running = False self.host_monitor_thread = HostMonitorThread(kafka_ip=request.kafka_ip, kafka_port=request.kafka_port, ip=self.ip, hostname=self.hostname, time_step_len_seconds=request.time_step_len_seconds) self.host_monitor_thread.start() logging.info("Started the HostMonitor thread") filebeat_status = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=True, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def stopHostMonitor(self, request: csle_collector.host_manager.host_manager_pb2.StopHostMonitorMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Stops the Host monitor thread if it is running :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host monitor thread """ logging.info("Stopping the host monitor") if self.host_monitor_thread is not None: self.host_monitor_thread.running = False logging.info("Host monitor stopped") filebeat_running = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=False, filebeat_running=filebeat_running, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def startFilebeat(self, request: csle_collector.host_manager.host_manager_pb2.StartFilebeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Starts filebeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Starting filebeat") HostManagerServicer._start_filebeat() logging.info("Started filebeat") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=True, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def stopFilebeat(self, request: csle_collector.host_manager.host_manager_pb2.StopFilebeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Stops filebeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Stopping filebeat") HostManagerServicer._stop_filebeat() logging.info("Filebeat stopped") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=False, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def configFilebeat(self, request: csle_collector.host_manager.host_manager_pb2.ConfigFilebeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Updates the configuration of filebeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info(f"Updating the filebeat configuration, " f"log_files_paths: {request.log_files_paths}, " f"kibana_ip : {request.kibana_ip}, kibana_port: {request.kibana_port}, " f"elastic_ip: {request.elastic_ip}, elastic_port: {request.elastic_port}, " f"num_elastic_shards: {request.num_elastic_shards}, reload_enabled: {request.reload_enabled}, " f"kafka: {request.kafka}, kafka_ip: {request.kafka_ip}, kafka_port: {request.kafka_port}, " f"kafka_topics: {request.kafka_topics}, filebeat_modules: {request.filebeat_modules}") HostManagerServicer._set_filebeat_config( log_files_paths=list(request.log_files_paths), kibana_ip=request.kibana_ip, kibana_port=request.kibana_port, elastic_ip=request.elastic_ip, elastic_port=request.elastic_port, num_elastic_shards=request.num_elastic_shards, reload_enabled=request.reload_enabled, kafka=request.kafka, kafka_ip=request.kafka_ip, kafka_port=request.kafka_port, kafka_topics=list(request.kafka_topics), filebeat_modules=list(request.filebeat_modules)) logging.info("Filebeat configuration updated") monitor_running = self._is_monitor_running() filebeat_running = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_running, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def startMetricbeat(self, request: csle_collector.host_manager.host_manager_pb2.StartMetricbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Starts metricbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Starting metricbeat") HostManagerServicer._start_metricbeat() logging.info("Started metricbeat") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() filebeat_status = HostManagerServicer._get_filebeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=True, heartbeat_running=heartbeat_status)
[docs] def stopMetricbeat(self, request: csle_collector.host_manager.host_manager_pb2.StopMetricbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Stops metricbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Stopping metricbeat") HostManagerServicer._stop_metricbeat() logging.info("Metricbeat stopped") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() filebeat_status = HostManagerServicer._get_filebeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=False, heartbeat_running=heartbeat_status)
[docs] def configMetricbeat(self, request: csle_collector.host_manager.host_manager_pb2.ConfigMetricbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Updates the configuration of metricbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info(f"Updating the metricbeat configuration, " f"kibana_ip : {request.kibana_ip}, kibana_port: {request.kibana_port}, " f"elastic_ip: {request.elastic_ip}, elastic_port: {request.elastic_port}, " f"num_elastic_shards: {request.num_elastic_shards}, reload_enabled: {request.reload_enabled}, " f"kafka_ip: {request.kafka_ip}, kafka_port: {request.kafka_port}, " f"metricbeat_modules: {request.metricbeat_modules}") HostManagerServicer._set_metricbeat_config( kibana_ip=request.kibana_ip, kibana_port=request.kibana_port, elastic_ip=request.elastic_ip, elastic_port=request.elastic_port, num_elastic_shards=request.num_elastic_shards, reload_enabled=request.reload_enabled, kafka_ip=request.kafka_ip, kafka_port=request.kafka_port, metricbeat_modules=list(request.metricbeat_modules)) logging.info("Metricbeat configuration updated") monitor_running = self._is_monitor_running() filebeat_running = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_running, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def startPacketbeat(self, request: csle_collector.host_manager.host_manager_pb2.StartPacketbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Starts packetbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Starting packetbeat") HostManagerServicer._start_packetbeat() logging.info("Started packetbeat") monitor_running = self._is_monitor_running() filebeat_status = HostManagerServicer._get_filebeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=True, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def stopPacketbeat(self, request: csle_collector.host_manager.host_manager_pb2.StopPacketbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Stops packetbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Stopping packetbeat") HostManagerServicer._stop_packetbeat() logging.info("Packetbeat stopped") monitor_running = self._is_monitor_running() filebeat_status = HostManagerServicer._get_filebeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=False, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def configPacketbeat(self, request: csle_collector.host_manager.host_manager_pb2.ConfigPacketbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Updates the configuration of packetbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info(f"Updating the packetbeat configuration," f"kibana_ip : {request.kibana_ip}, kibana_port: {request.kibana_port}, " f"elastic_ip: {request.elastic_ip}, elastic_port: {request.elastic_port}, " f"num_elastic_shards: {request.num_elastic_shards}") HostManagerServicer._set_packetbeat_config( kibana_ip=request.kibana_ip, kibana_port=request.kibana_port, elastic_ip=request.elastic_ip, elastic_port=request.elastic_port, num_elastic_shards=request.num_elastic_shards) logging.info("Packetbeat configuration updated") monitor_running = self._is_monitor_running() filebeat_running = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_running, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def getHostStatus(self, request: csle_collector.host_manager.host_manager_pb2.GetHostStatusMsg, context: grpc.ServicerContext) \ -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Gets the status of the host :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the host """ monitor_running = self._is_monitor_running() filebeat_running = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_running, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def startHeartbeat(self, request: csle_collector.host_manager.host_manager_pb2.StartHeartbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Starts heartbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Starting heartbeat") HostManagerServicer._start_heartbeat() logging.info("Started heartbeat") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() filebeat_status = HostManagerServicer._get_filebeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=True)
[docs] def stopHeartbeat(self, request: csle_collector.host_manager.host_manager_pb2.StopHeartbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Stops heartbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Stopping heartbeat") HostManagerServicer._stop_heartbeat() logging.info("Heartbeat stopped") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() filebeat_status = HostManagerServicer._get_filebeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=False)
[docs] def configHeartbeat(self, request: csle_collector.host_manager.host_manager_pb2.ConfigHeartbeatMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Updates the configuration of heartbeat :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info(f"Updating the heartbeat configuration, " f"kibana_ip : {request.kibana_ip}, kibana_port: {request.kibana_port}, " f"elastic_ip: {request.elastic_ip}, elastic_port: {request.elastic_port}, " f"num_elastic_shards: {request.num_elastic_shards}, hosts_to_monitor: {request.hosts_to_monitor}") HostManagerServicer._set_heartbeat_config( kibana_ip=request.kibana_ip, kibana_port=request.kibana_port, elastic_ip=request.elastic_ip, elastic_port=request.elastic_port, num_elastic_shards=request.num_elastic_shards, hosts_to_monitor=list(request.hosts_to_monitor)) logging.info("Heartbeat configuration updated") monitor_running = self._is_monitor_running() filebeat_running = HostManagerServicer._get_filebeat_status() packetbeat_status = HostManagerServicer._get_packetbeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_running, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def startSpark(self, request: csle_collector.host_manager.host_manager_pb2.StartSparkMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Starts Spark :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Starting spark") HostManagerServicer._start_spark() logging.info("Started spark") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() filebeat_status = HostManagerServicer._get_filebeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
[docs] def stopSpark(self, request: csle_collector.host_manager.host_manager_pb2.StopSparkMsg, context: grpc.ServicerContext) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO: """ Stops Spark :param request: the gRPC request :param context: the gRPC context :return: a DTO with the status of the Host """ logging.info("Stopping spark") HostManagerServicer._stop_spark() logging.info("Stopped spark") monitor_running = self._is_monitor_running() packetbeat_status = HostManagerServicer._get_packetbeat_status() filebeat_status = HostManagerServicer._get_filebeat_status() metricbeat_status = HostManagerServicer._get_metricbeat_status() heartbeat_status = HostManagerServicer._get_heartbeat_status() return csle_collector.host_manager.host_manager_pb2.HostStatusDTO(monitor_running=monitor_running, filebeat_running=filebeat_status, packetbeat_running=packetbeat_status, metricbeat_running=metricbeat_status, heartbeat_running=heartbeat_status)
def _is_monitor_running(self) -> bool: """ Utility method to check if the monitor is running :return: True if running else false """ if self.host_monitor_thread is not None: return self.host_monitor_thread.running return False @staticmethod def _get_filebeat_status() -> bool: """ Utility method to get the status of filebeat :return: status of filebeat """ logging.info(f"Getting filebeat status with command: {constants.FILEBEAT.FILEBEAT_STATUS}") output = subprocess.run(constants.FILEBEAT.FILEBEAT_STATUS.split(" "), capture_output=True, text=True) filebeat_running = not ("not" in output.stdout) logging.info(f"Got filebeat status, output:{output.stdout}, err output: {output.stderr} ") return filebeat_running @staticmethod def _start_filebeat() -> None: """ Utility method to start filebeat :return: None """ logging.info(f"Starting filebeat with command: {constants.FILEBEAT.FILEBEAT_START}") output = subprocess.run(constants.FILEBEAT.FILEBEAT_START.split(" "), capture_output=True, text=True) logging.info(f"Started filebeat, stdout:{output.stdout}, stderr: {output.stderr}") @staticmethod def _stop_filebeat() -> None: """ Utility method to stop filebeat :return: None """ logging.info(f"Stopping filebeat with command: {constants.FILEBEAT.FILEBEAT_STOP}") output = subprocess.run(constants.FILEBEAT.FILEBEAT_STOP.split(" "), capture_output=True, text=True) logging.info(f"Stopped filebeat, output:{output.stdout}, err output: {output.stderr} ") @staticmethod def _set_filebeat_config(log_files_paths: List[str], kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int, num_elastic_shards: int, kafka_topics: List[str], kafka_ip: str, filebeat_modules: List[str], kafka_port: int, reload_enabled: bool = False, kafka: bool = False) -> None: """ Updates the filebeat configuration file :param log_files_paths: the list of log files that filebeat should monitor :param kibana_ip: the IP of Kibana where the data should be visualized :param kibana_port: the port of Kibana where the data should be visualized :param elastic_ip: the IP of elastic where the data should be shipped :param elastic_port: the port of elastic where the data should be shipped :param num_elastic_shards: the number of elastic shards :param reload_enabled: whether automatic reload of modules should be enabled :param kafka: whether kafka should be added as input :param kafka_topics: list of kafka topics to ingest :param kafka_port: the kafka server port :param kafka_ip: the kafka server ip :param filebeat_modules: list of filebeat modules to enable :return: None """ filebeat_config = HostManagerUtil.filebeat_config(log_files_paths=log_files_paths, kibana_ip=kibana_ip, kibana_port=kibana_port, elastic_ip=elastic_ip, elastic_port=elastic_port, num_elastic_shards=num_elastic_shards, reload_enabled=reload_enabled, kafka=kafka, kafka_port=kafka_port, kafka_ip=kafka_ip, kafka_topics=kafka_topics) for module in filebeat_modules: if module == constants.FILEBEAT.SYSTEM_MODULE: HostManagerServicer.set_filebeat_system_module_config() elif module == constants.FILEBEAT.SNORT_MODULE: HostManagerServicer.set_filebeat_snort_module_config() elif module == constants.FILEBEAT.KAFKA_MODULE: HostManagerServicer.set_filebeat_kafka_module_config() elif module == constants.FILEBEAT.KIBANA_MODULE: HostManagerServicer.set_filebeat_kibana_module_config() elif module == constants.FILEBEAT.ELASTICSEARCH_MODULE: HostManagerServicer.set_filebeat_elasticsearch_module_config() elif module == constants.FILEBEAT.LOGSTASH_MODULE: HostManagerServicer.set_filebeat_logstash_module_config() else: logging.warning(f"Filebeat module: {module} not recognized") logging.info(f"Updating filebeat config: \n{filebeat_config}") HostManagerUtil.write_yaml_config(config=filebeat_config, path=constants.FILEBEAT.CONFIG_FILE) logging.info(f"Running filebeat setup command: {constants.FILEBEAT.SETUP_CMD}") output = subprocess.run(constants.FILEBEAT.SETUP_CMD.split(" "), capture_output=True, text=True) logging.info(f"Stdout of the setup command: {output.stdout}, stderr of the setup command: {output.stderr}") @staticmethod def _get_metricbeat_status() -> bool: """ Utility method to get the status of metricbeat :return: status of metricbeat """ logging.info(f"Getting metricbeat status with command: {constants.METRICBEAT.METRICBEAT_STATUS}") output = subprocess.run(constants.METRICBEAT.METRICBEAT_STATUS.split(" "), capture_output=True, text=True) metricbeat_running = not ("not" in output.stdout) logging.info(f"Got metricbeat status, output:{output.stdout}, err output: {output.stderr} ") return metricbeat_running @staticmethod def _start_metricbeat() -> None: """ Utility method to start metricbeat :return: None """ logging.info(f"Starting metricbeat with command: {constants.METRICBEAT.METRICBEAT_START}") output = subprocess.run(constants.METRICBEAT.METRICBEAT_START.split(" "), capture_output=True, text=True) logging.info(f"Started metricbeat, stdout:{output.stdout}, stderr: {output.stderr}") @staticmethod def _stop_metricbeat() -> None: """ Utility method to stop metricbeat :return: None """ logging.info(f"Stopping metricbeat with command: {constants.METRICBEAT.METRICBEAT_STOP}") output = subprocess.run(constants.METRICBEAT.METRICBEAT_STOP.split(" "), capture_output=True, text=True) logging.info(f"Stopped metricbeat, output:{output.stdout}, err output: {output.stderr} ") @staticmethod def _set_metricbeat_config( kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int, num_elastic_shards: int, metricbeat_modules: List[str], kafka_ip: str, kafka_port: int, reload_enabled: bool = False) -> None: """ Updates the metricbeat configuration file :param kibana_ip: the IP of Kibana where the data should be visualized :param kibana_port: the port of Kibana where the data should be visualized :param elastic_ip: the IP of elastic where the data should be shipped :param elastic_port: the port of elastic where the data should be shipped :param num_elastic_shards: the number of elastic shards :param reload_enabled: whether automatic reload of modules should be enabled :param metricbeat_modules: list of metricbeat modules to enable :param kafka_ip: the ip of the kafka server :param kafka_port: the port of the kafka server :return: None """ metricbeat_config = HostManagerUtil.metricbeat_config( kibana_ip=kibana_ip, kibana_port=kibana_port, elastic_ip=elastic_ip, elastic_port=elastic_port, num_elastic_shards=num_elastic_shards, reload_enabled=reload_enabled) for module in metricbeat_modules: if module == constants.METRICBEAT.SYSTEM_MODULE: HostManagerServicer.set_metricbeat_system_module_config() elif module == constants.METRICBEAT.LINUX_MODULE: HostManagerServicer.set_metricbeat_linux_module_config() elif module == constants.METRICBEAT.KAFKA_MODULE: HostManagerServicer.set_metricbeat_kafka_module_config(kafka_ip=kafka_ip, kafka_port=kafka_port) elif module == constants.METRICBEAT.KIBANA_MODULE: HostManagerServicer.set_metricbeat_kibana_module_config(kibana_ip=kibana_ip, kibana_port=kibana_port) elif module == constants.METRICBEAT.ELASTICSEARCH_MODULE: HostManagerServicer.set_metricbeat_elasticsearch_module_config(elastic_ip=elastic_ip, elastic_port=elastic_port) elif module == constants.METRICBEAT.LOGSTASH_MODULE: HostManagerServicer.set_metricbeat_logstash_module_config(logstash_ip=elastic_ip, logstash_port=elastic_port) else: logging.warning(f"Metricbeat module: {module} not recognized") logging.info(f"Updating metricbeat config: \n{metricbeat_config}") HostManagerUtil.write_yaml_config(config=metricbeat_config, path=constants.METRICBEAT.CONFIG_FILE) logging.info(f"Running metricbeat setup command: {constants.METRICBEAT.SETUP_CMD}") output = subprocess.run(constants.METRICBEAT.SETUP_CMD.split(" "), capture_output=True, text=True) logging.info(f"Stdout of the setup command: {output.stdout}, stderr of the setup command: {output.stderr}")
[docs] @staticmethod def set_filebeat_snort_module_config() -> None: """ Updates the filebeat snort module configuration :return: None """ logging.info(f"Enabling module with command: " f"{constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.SNORT_MODULE)}") output = subprocess.run(constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.SNORT_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") snort_module_config = HostManagerUtil.filebeat_snort_module_config() logging.info(f"Updating filebeat snort module config: \n{snort_module_config}") HostManagerUtil.write_yaml_config(config=snort_module_config, path=f"{constants.FILEBEAT.MODULES_CONFIG_DIR}" f"{constants.FILEBEAT.SNORT_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_filebeat_elasticsearch_module_config() -> None: """ Updates the filebeat elasticsearch module configuration :return: None """ logging.info(f"Enabling module with command: " f"{constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.ELASTICSEARCH_MODULE)}") output = subprocess.run(constants.FILEBEAT.ENABLE_MODULE_CMD.format( constants.FILEBEAT.ELASTICSEARCH_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") elasticsearch_module_config = HostManagerUtil.filebeat_elasticsearch_module_config() logging.info(f"Updating filebeat elasticsearch module config: \n{elasticsearch_module_config}") HostManagerUtil.write_yaml_config(config=elasticsearch_module_config, path=f"{constants.FILEBEAT.MODULES_CONFIG_DIR}" f"{constants.FILEBEAT.ELASTICSEARCH_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_filebeat_logstash_module_config() -> None: """ Updates the filebeat logstash module configuration :return: None """ logging.info(f"Enabling module with command: " f"{constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.LOGSTASH_MODULE)}") output = subprocess.run(constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.LOGSTASH_MODULE).split( " "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") logstash_module_config = HostManagerUtil.filebeat_logstash_module_config() logging.info(f"Updating filebeat logstash module config: \n{logstash_module_config}") HostManagerUtil.write_yaml_config(config=logstash_module_config, path=f"{constants.FILEBEAT.MODULES_CONFIG_DIR}" f"{constants.FILEBEAT.LOGSTASH_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_filebeat_kibana_module_config() -> None: """ Updates the filebeat kibana module configuration :return: None """ logging.info(f"Enabling module with command: " f"{constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.KIBANA_MODULE)}") output = subprocess.run(constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.KIBANA_MODULE).split( " "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") kibana_module_config = HostManagerUtil.filebeat_kibana_module_config() logging.info(f"Updating filebeat kibana module config: \n{kibana_module_config}") HostManagerUtil.write_yaml_config(config=kibana_module_config, path=f"{constants.FILEBEAT.MODULES_CONFIG_DIR}" f"{constants.FILEBEAT.KIBANA_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_filebeat_system_module_config() -> None: """ Updates the filebeat system module configuration :return: None """ logging.info(f"Enabling module with command: " f"{constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.SYSTEM_MODULE)}") output = subprocess.run(constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.SYSTEM_MODULE).split( " "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") system_module_config = HostManagerUtil.filebeat_system_module_config() logging.info(f"Updating filebeat system module config: \n{system_module_config}") HostManagerUtil.write_yaml_config(config=system_module_config, path=f"{constants.FILEBEAT.MODULES_CONFIG_DIR}" f"{constants.FILEBEAT.SYSTEM_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_filebeat_kafka_module_config() -> None: """ Updates the filebeat kafka module configuration :return: None """ logging.info(f"Enabling module with command: " f"{constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.KAFKA_MODULE)}") output = subprocess.run(constants.FILEBEAT.ENABLE_MODULE_CMD.format(constants.FILEBEAT.KAFKA_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") kafka_module_config = HostManagerUtil.filebeat_kafka_module_config() logging.info(f"Updating filebeat kafka module config: \n{kafka_module_config}") HostManagerUtil.write_yaml_config(config=kafka_module_config, path=f"{constants.FILEBEAT.MODULES_CONFIG_DIR}" f"{constants.FILEBEAT.KAFKA_MODULE_CONFIG_FILE}")
@staticmethod def _get_packetbeat_status() -> bool: """ Utility method to get the status of packetbeat :return: status of filebeat """ logging.info(f"Getting packetbeat status with command: {constants.PACKETBEAT.PACKETBEAT_STATUS}") output = subprocess.run(constants.PACKETBEAT.PACKETBEAT_STATUS.split(" "), capture_output=True, text=True) packetbeat_running = not ("not" in output.stdout) logging.info(f"Got packetbeat status, output:{output.stdout}, err output: {output.stderr} ") return packetbeat_running @staticmethod def _start_packetbeat() -> None: """ Utility method to start packetbeat :return: None """ logging.info(f"Starting packetbeat with command: {constants.PACKETBEAT.PACKETBEAT_START}") output = subprocess.run(constants.PACKETBEAT.PACKETBEAT_START.split(" "), capture_output=True, text=True) logging.info(f"Started packetbeat, stdout:{output.stdout}, stderr: {output.stderr}") @staticmethod def _stop_packetbeat() -> None: """ Utility method to stop packetbeat :return: None """ logging.info(f"Stopping packetbeat with command: {constants.PACKETBEAT.PACKETBEAT_STOP}") output = subprocess.run(constants.PACKETBEAT.PACKETBEAT_STOP.split(" "), capture_output=True, text=True) logging.info(f"Stopped packetbeat, output:{output.stdout}, err output: {output.stderr} ") @staticmethod def _set_packetbeat_config(kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int, num_elastic_shards: int) -> None: """ Updates the packetbeat configuration file :param kibana_ip: the IP of Kibana where the data should be visualized :param kibana_port: the port of Kibana where the data should be visualized :param elastic_ip: the IP of elastic where the data should be shipped :param elastic_port: the port of elastic where the data should be shipped :param num_elastic_shards: the number of elastic shards :return: None """ packetbeat_config = HostManagerUtil.packetbeat_config( kibana_ip=kibana_ip, kibana_port=kibana_port, elastic_ip=elastic_ip, elastic_port=elastic_port, num_elastic_shards=num_elastic_shards) logging.info(f"Updating packetbeat config: \n{packetbeat_config}") HostManagerUtil.write_yaml_config(config=packetbeat_config, path=constants.PACKETBEAT.CONFIG_FILE) logging.info(f"Running packetbeat setup command: {constants.PACKETBEAT.SETUP_CMD}") output = subprocess.run(constants.PACKETBEAT.SETUP_CMD.split(" "), capture_output=True, text=True) logging.info(f"Stdout of the setup command: {output.stdout}, stderr of the setup command: {output.stderr}")
[docs] @staticmethod def set_metricbeat_system_module_config() -> None: """ Updates the metricbeat system module configuration :return: None """ logging.info(f"Enabling system module with command: " f"{constants.METRICBEAT.ENABLE_MODULE_CMD.format(constants.METRICBEAT.SYSTEM_MODULE)}") output = subprocess.run(constants.METRICBEAT.ENABLE_MODULE_CMD.format( constants.METRICBEAT.SYSTEM_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") system_module_config = HostManagerUtil.metricbeat_system_module_config() logging.info(f"Updating metricbeat system module config: \n{system_module_config}") HostManagerUtil.write_yaml_config(config=system_module_config, path=f"{constants.METRICBEAT.MODULES_CONFIG_DIR}" f"{constants.METRICBEAT.SYSTEM_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_metricbeat_linux_module_config() -> None: """ Updates the metricbeat system module configuration :return: None """ logging.info(f"Enabling linux module with command: " f"{constants.METRICBEAT.ENABLE_MODULE_CMD.format(constants.METRICBEAT.LINUX_MODULE)}") output = subprocess.run(constants.METRICBEAT.ENABLE_MODULE_CMD.format( constants.METRICBEAT.LINUX_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") linux_module_config = HostManagerUtil.metricbeat_linux_module_config() logging.info(f"Updating metricbeat linux module config: \n{linux_module_config}") HostManagerUtil.write_yaml_config(config=linux_module_config, path=f"{constants.METRICBEAT.MODULES_CONFIG_DIR}" f"{constants.METRICBEAT.LINUX_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_metricbeat_kafka_module_config(kafka_ip: str, kafka_port: int) -> None: """ Updates the metricbeat kafka module configuration :param kafka_ip: ip of the kafka server :param kafka_port: port of the kafka server :return: None """ logging.info(f"Enabling kafka module with command: " f"{constants.METRICBEAT.ENABLE_MODULE_CMD.format(constants.METRICBEAT.KAFKA_MODULE)}") output = subprocess.run(constants.METRICBEAT.ENABLE_MODULE_CMD.format( constants.METRICBEAT.KAFKA_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") kafka_module_config = HostManagerUtil.metricbeat_kafka_module_config(kafka_ip=kafka_ip, kafka_port=kafka_port) logging.info(f"Updating metricbeat kafka module config: \n{kafka_module_config}") HostManagerUtil.write_yaml_config(config=kafka_module_config, path=f"{constants.METRICBEAT.MODULES_CONFIG_DIR}" f"{constants.METRICBEAT.KAFKA_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_metricbeat_elasticsearch_module_config(elastic_ip: str, elastic_port: int) -> None: """ Updates the metricbeat elastic module configuration :param elastic_ip: ip of the elastic server :param elastic_port: port of the elastic server :return: None """ logging.info(f"Enabling elasticsearch module with command: " f"{constants.METRICBEAT.ENABLE_MODULE_CMD.format(constants.METRICBEAT.ELASTICSEARCH_MODULE)}") output = subprocess.run(constants.METRICBEAT.ENABLE_MODULE_CMD.format( constants.METRICBEAT.ELASTICSEARCH_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") elasticsearch_module_config = HostManagerUtil.metricbeat_elasticsearch_module_config(elastic_ip=elastic_ip, elastic_port=elastic_port) logging.info(f"Updating metricbeat elasticsearch module config: \n{elasticsearch_module_config}") HostManagerUtil.write_yaml_config(config=elasticsearch_module_config, path=f"{constants.METRICBEAT.MODULES_CONFIG_DIR}" f"{constants.METRICBEAT.ELASTICSEARCH_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_metricbeat_kibana_module_config(kibana_ip: str, kibana_port: int) -> None: """ Updates the metricbeat kibana module configuration :param kibana_ip: ip of the kibana server :param kibana_port: port of the kibana server :return: None """ logging.info(f"Enabling Kibana module with command: " f"{constants.METRICBEAT.ENABLE_MODULE_CMD.format(constants.METRICBEAT.KIBANA_MODULE)}") output = subprocess.run(constants.METRICBEAT.ENABLE_MODULE_CMD.format( constants.METRICBEAT.KIBANA_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") kibana_module_config = HostManagerUtil.metricbeat_kibana_module_config( kibana_ip=kibana_ip, kibana_port=kibana_port) logging.info(f"Updating metricbeat kibana module config: \n{kibana_module_config}") HostManagerUtil.write_yaml_config(config=kibana_module_config, path=f"{constants.METRICBEAT.MODULES_CONFIG_DIR}" f"{constants.METRICBEAT.KIBANA_MODULE_CONFIG_FILE}")
[docs] @staticmethod def set_metricbeat_logstash_module_config(logstash_ip: str, logstash_port: int) -> None: """ Updates the metricbeat logstas module configuration :param logstash_ip: ip of the logstash server :param logstash_port: port of the logstash server :return: None """ logging.info(f"Enabling Logstash module with command: " f"{constants.METRICBEAT.ENABLE_MODULE_CMD.format(constants.METRICBEAT.LOGSTASH_MODULE)}") output = subprocess.run(constants.METRICBEAT.ENABLE_MODULE_CMD.format( constants.METRICBEAT.LOGSTASH_MODULE).split(" "), capture_output=True, text=True) logging.info(f"Module enabled, output: {output.stdout}, err output: {output.stderr}") logstash_module_config = HostManagerUtil.metricbeat_logstash_module_config(logstash_ip=logstash_ip, logstash_port=logstash_port) logging.info(f"Updating metricbeat logstash module config: \n{logstash_module_config}") HostManagerUtil.write_yaml_config(config=logstash_module_config, path=f"{constants.METRICBEAT.MODULES_CONFIG_DIR}" f"{constants.METRICBEAT.LOGSTASH_MODULE_CONFIG_FILE}")
@staticmethod def _get_heartbeat_status() -> bool: """ Utility method to get the status of heartbeat :return: status of heartbeat """ logging.info(f"Getting heartbeat status with command: {constants.HEARTBEAT.HEARTBEAT_STATUS}") output = subprocess.run(constants.HEARTBEAT.HEARTBEAT_STATUS.split(" "), capture_output=True, text=True) heartbeat_running = not ("not" in output.stdout) logging.info(f"Got heartbeat status, output:{output.stdout}, err output: {output.stderr} ") return heartbeat_running @staticmethod def _start_heartbeat() -> None: """ Utility method to start heartbeat :return: None """ logging.info(f"Starting heartbeat with command: {constants.HEARTBEAT.HEARTBEAT_START}") output = subprocess.run(constants.HEARTBEAT.HEARTBEAT_START.split(" "), capture_output=True, text=True) logging.info(f"Started heartbeat, stdout:{output.stdout}, stderr: {output.stderr}") @staticmethod def _stop_heartbeat() -> None: """ Utility method to stop heartbeat :return: None """ logging.info(f"Stopping heartbeat with command: {constants.HEARTBEAT.HEARTBEAT_STOP}") output = subprocess.run(constants.HEARTBEAT.HEARTBEAT_STOP.split(" "), capture_output=True, text=True) logging.info(f"Stopped heartbeat, output:{output.stdout}, err output: {output.stderr} ") @staticmethod def _read_pid_file(path: str) -> int: """ Reads the PID from a pidfile :param path: the path to the file :return: the parsed pid, or -1 if the pidfile could not be read """ if os.path.exists(path): pid = int(open(path, "r").read()) return pid return -1 @staticmethod def _start_spark() -> None: """ Utility method to start spark :return: None """ logging.info(f"Starting spark master with command: {constants.SPARK.START_SPARK_MASTER}") output = subprocess.run(constants.SPARK.START_SPARK_MASTER.split(" "), capture_output=True, text=True) logging.info(f"Started spark master, stdout:{output.stdout}, stderr: {output.stderr}") time.sleep(5) logging.info(f"Starting spark worker with command: {constants.SPARK.START_SPARK_WORKER}") output = subprocess.run(constants.SPARK.START_SPARK_WORKER.split(" "), capture_output=True, text=True) logging.info(f"Started spark worker, stdout:{output.stdout}, stderr: {output.stderr}") @staticmethod def _stop_spark() -> None: """ Utility method to stop spark :return: None """ worker_pid = HostManagerServicer._read_pid_file(constants.SPARK.SPARK_WORKER_PID_FILE) logging.info(f"Stopping spark worker with command: {constants.SPARK.STOP_SPARK_WORKER.format(worker_pid)}") output = subprocess.run(constants.SPARK.STOP_SPARK_WORKER.format(worker_pid).split(" "), capture_output=True, text=True) logging.info(f"Stopped the spark worker, output:{output.stdout}, err output: {output.stderr} ") master_pid = HostManagerServicer._read_pid_file(constants.SPARK.SPARK_MASTER_PID_FILE) logging.info(f"Stopping spark master with command: {constants.SPARK.STOP_SPARK_MASTER.format(master_pid)}") output = subprocess.run(constants.SPARK.STOP_SPARK_MASTER.format(master_pid).split(" "), capture_output=True, text=True) logging.info(f"Stopped the spark master, output:{output.stdout}, err output: {output.stderr} ") @staticmethod def _set_heartbeat_config(kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int, num_elastic_shards: int, hosts_to_monitor: List[str]) -> None: """ Updates the heartbeat configuration file :param hosts_to_monitor: the list of hosts to monitor :param kibana_ip: the IP of Kibana where the data should be visualized :param kibana_port: the port of Kibana where the data should be visualized :param elastic_ip: the IP of elastic where the data should be shipped :param elastic_port: the port of elastic where the data should be shipped :param num_elastic_shards: the number of elastic shards :return: None """ heartbeat_config = HostManagerUtil.heartbeat_config( kibana_ip=kibana_ip, kibana_port=kibana_port, elastic_ip=elastic_ip, elastic_port=elastic_port, num_elastic_shards=num_elastic_shards, hosts_to_monitor=list(hosts_to_monitor)) logging.info(f"Updating heartbeat config: \n{heartbeat_config}") HostManagerUtil.write_yaml_config(config=heartbeat_config, path=constants.HEARTBEAT.CONFIG_FILE) logging.info(f"Running heartbeat setup command: {constants.HEARTBEAT.SETUP_CMD}") output = subprocess.run(constants.HEARTBEAT.SETUP_CMD.split(" "), capture_output=True, text=True) logging.info(f"Stdout of the setup command: {output.stdout}, stderr of the setup command: {output.stderr}")
[docs]def serve(port: int = 50049, log_dir: str = "/", max_workers: int = 10, log_file_name: str = "host_manager.log") -> None: """ Starts the gRPC server for managing clients :param port: the port that the server will listen to :param log_dir: the directory to write the log file :param log_file_name: the file name of the log :param max_workers: the maximum number of GRPC workers :return: None """ constants.LOG_FILES.HOST_MANAGER_LOG_DIR = log_dir constants.LOG_FILES.HOST_MANAGER_LOG_FILE = log_file_name server = grpc.server(futures.ThreadPoolExecutor(max_workers=max_workers)) csle_collector.host_manager.host_manager_pb2_grpc.add_HostManagerServicer_to_server( HostManagerServicer(), server) server.add_insecure_port(f'[::]:{port}') server.start() logging.info(f"HostManager Server Started, Listening on port: {port}") server.wait_for_termination()
# Program entrypoint if __name__ == '__main__': serve()