Source code for csle_common.controllers.management_system_controller

import logging
from typing import Any, Tuple
import subprocess
import docker
import psutil
import os
import csle_common.constants.constants as constants
import sys
import shutil


class ManagementSystemController:
    """
    Controller managing monitoring tools

    All methods are static. Processes started through shell commands are tracked via pidfiles, host services are
    queried and controlled through the status/start/stop commands defined in the constants module, and
    containerized tools are managed through the Docker SDK.
    """

    # Substrings of a service status command's output that are treated as "the service is up"
    _ACTIVE_STATES = ("active (running)", "active (exited)")

    # ---------------------------------------------------------------- private helpers

    @staticmethod
    def _run_shell_command(cmd: str) -> None:
        """
        Runs a command through the shell, discards its stdout and blocks until it has exited

        :param cmd: the shell command line to run
        :return: None
        """
        p = subprocess.Popen(cmd, stdout=subprocess.DEVNULL, shell=True)
        p.communicate()

    @staticmethod
    def _is_process_listed(pid: int, grep_pattern: str, marker: str) -> bool:
        """
        Runs the PS_AUX command piped through GREP and checks whether the given process shows up in the output

        :param pid: the pid that is expected to occur in the listing
        :param grep_pattern: the pattern handed to grep
        :param marker: the substring identifying the process in the listing
        :return: True if both the marker and the pid occur in the listing, False otherwise
        """
        cmd = (constants.COMMANDS.PS_AUX + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.PIPE_DELIM
               + constants.COMMANDS.SPACE_DELIM + constants.COMMANDS.GREP + constants.COMMANDS.SPACE_DELIM
               + grep_pattern)
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        (output, _) = p.communicate()
        # The raw bytes are stringified as a whole; only substring checks are performed on the result
        listing = str(output)
        # NOTE(review): both checks are plain substring matches over the entire listing, so e.g. pid 12 also
        # matches a line containing 1234 -- confirm whether a stricter per-line match is wanted
        return marker in listing and str(pid) in listing

    @staticmethod
    def _is_service_active(status_cmd: str) -> bool:
        """
        Runs a service status command and checks whether its stdout reports an active state

        :param status_cmd: the status command; it is split on single spaces and executed without a shell
        :return: True if stdout contains "active (running)" or "active (exited)", False otherwise
        """
        output = subprocess.run(status_cmd.split(" "), capture_output=True, text=True)
        return any(state in output.stdout for state in ManagementSystemController._ACTIVE_STATES)

    @staticmethod
    def _run_service_command(cmd: str) -> Tuple[bool, Any, Any]:
        """
        Runs a service start/stop command and captures its output

        :param cmd: the command; it is split on single spaces and executed without a shell
        :return: (True, stdout, stderr) of the executed command
        """
        output = subprocess.run(cmd.split(" "), capture_output=True, text=True)
        return True, output.stdout, output.stderr

    @staticmethod
    def _is_container_running(name_fragment: str) -> bool:
        """
        Checks whether a running container exists whose name contains the given fragment

        :param name_fragment: the substring to look for in the container names
        :return: True if such a running container exists, False otherwise
        """
        client_1 = docker.from_env()
        try:
            # containers.list() without all=True only returns running containers
            return any(name_fragment in c.name for c in client_1.containers.list())
        finally:
            # Release the connection(s) held by the client
            client_1.close()

    @staticmethod
    def _stop_container(name_fragment: str) -> bool:
        """
        Stops the first running container whose name contains the given fragment

        :param name_fragment: the substring to look for in the container names
        :return: True if a container was stopped, False if no matching running container was found
        """
        client_1 = docker.from_env()
        try:
            for c in client_1.containers.list():
                if name_fragment in c.name:
                    c.stop()
                    return True
            return False
        finally:
            client_1.close()

    @staticmethod
    def _start_existing_container(name_fragment: str) -> bool:
        """
        Starts the first existing, non-running container whose name contains the given fragment

        :param name_fragment: the substring to look for in the container names
        :return: True if an existing container was started, False if there was none to start
        """
        client_1 = docker.from_env()
        try:
            # all=True also includes containers that are not running
            for c in client_1.containers.list(all=True):
                if name_fragment in c.name and c.attrs['State']['Status'] != "running":
                    c.start()
                    return True
            return False
        finally:
            client_1.close()

    # ---------------------------------------------------------------- pidfile handling

    @staticmethod
    def read_pid_file(path: str) -> int:
        """
        Reads the PID from a pidfile

        :param path: the path to the file
        :return: the parsed pid, or -1 if the pidfile does not exist, cannot be read or does not contain an integer
        """
        try:
            # The context manager guarantees that the file handle is closed again
            with open(path, "r") as pid_file:
                return int(pid_file.read())
        except (OSError, ValueError):
            # Missing/unreadable file (OSError) or empty/garbled content (ValueError)
            return -1

    # ---------------------------------------------------------------- status checks

    @staticmethod
    def is_prometheus_running() -> bool:
        """
        Checks if prometheus is running on the host

        The process listing is filtered with the literal pattern "prometheus" and then searched for
        SEARCH_PROMETHEUS and for the pid stored in the Prometheus pidfile.

        :return: True if it is running, false otherwise
        """
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.PROMETHEUS_PID_FILE)
        if pid == -1:
            return False
        return ManagementSystemController._is_process_listed(
            pid=pid, grep_pattern="prometheus", marker=constants.COMMANDS.SEARCH_PROMETHEUS)

    @staticmethod
    def is_node_exporter_running() -> bool:
        """
        Checks if node_exporter is running on the host

        :return: True if it is running, false otherwise
        """
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.NODE_EXPORTER_PID_FILE)
        if pid == -1:
            return False
        return ManagementSystemController._is_process_listed(
            pid=pid, grep_pattern=constants.COMMANDS.SEARCH_NODE_EXPORTER,
            marker=constants.COMMANDS.SEARCH_NODE_EXPORTER)

    @staticmethod
    def is_nginx_running() -> bool:
        """
        Checks if Nginx is running on the host

        :return: True if nginx is running, false otherwise
        """
        return ManagementSystemController._is_service_active(constants.COMMANDS.NGINX_STATUS)

    @staticmethod
    def is_postgresql_running() -> bool:
        """
        Checks if PostgreSQL is running on the host

        The POSTGRESQL_STATUS command is tried first; POSTGRESQL_STATUS_VERSION is only run if the first command
        does not report an active service.

        :return: True if PostgreSQL is running, false otherwise
        """
        return (ManagementSystemController._is_service_active(constants.COMMANDS.POSTGRESQL_STATUS)
                or ManagementSystemController._is_service_active(constants.COMMANDS.POSTGRESQL_STATUS_VERSION))

    @staticmethod
    def is_docker_engine_running() -> bool:
        """
        Checks if Docker engine is running on the host

        :return: True if Docker engine is running, false otherwise
        """
        return ManagementSystemController._is_service_active(constants.COMMANDS.DOCKER_ENGINE_STATUS)

    @staticmethod
    def is_flask_running() -> bool:
        """
        Checks if the flask web server is running on the host

        :return: True if it is running, false otherwise
        """
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.CSLE_MGMT_WEBAPP_PID_FILE)
        if pid == -1:
            return False
        return ManagementSystemController._is_process_listed(
            pid=pid, grep_pattern=constants.COMMANDS.SEARCH_MONITOR, marker=constants.COMMANDS.SEARCH_MONITOR)

    # ---------------------------------------------------------------- node exporter / flask / prometheus

    @staticmethod
    def start_node_exporter(logger: logging.Logger) -> bool:
        """
        Starts the node exporter

        :param logger: the logger to use for logging
        :return: True if the start command was run, False if the node exporter was already running
        """
        if ManagementSystemController.is_node_exporter_running():
            logger.info("Node exporter is already running")
            return False
        cmd = constants.COMMANDS.START_NODE_EXPORTER
        logger.info(f"Starting node exporter by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def start_flask(logger: logging.Logger) -> bool:
        """
        Starts the Flask REST API Server

        The web app is first built (the build output is echoed to stdout), then the server process is spawned
        and its pid is stored in the web app pidfile.

        :param logger: the logger to use for logging
        :return: True if it was started, False if it was already running
        :raises ValueError: if the stdout of the build process cannot be read
        :raises KeyError: if the CSLE home environment variable is not set
        """
        if ManagementSystemController.is_flask_running():
            logger.info("Flask is already running")
            return False

        # Step 1: build the web app and stream the build output
        cmd = constants.COMMANDS.BUILD_CSLE_MGMT_WEBAPP
        logger.info(f"Building the web app with the command: {cmd}")
        p = subprocess.Popen(cmd, stdout=subprocess.PIPE, shell=True)
        if p.stdout is None:
            raise ValueError("Cannot read due to NoneType")
        # Reading line by line keeps multi-byte UTF-8 characters intact and drains the pipe completely
        for line in p.stdout:
            sys.stdout.write(line.decode("utf-8", errors="replace"))
            sys.stdout.flush()
        p.wait()

        # Step 2: spawn the server with an absolute interpreter path and an expanded CSLE home directory
        cmd = constants.COMMANDS.START_CSLE_MGMT_WEBAPP
        # Fall back to the current interpreter if no "python" executable is found on the PATH
        python_path = shutil.which("python") or sys.executable
        cmd = cmd.replace("python", str(python_path))
        cmd = cmd.replace(f"${constants.CONFIG_FILE.CSLE_HOME_ENV_PARAM}",
                          os.environ[constants.CONFIG_FILE.CSLE_HOME_ENV_PARAM])
        logger.info(f"Starting flask with the command: {cmd}")
        # Not waited for: the server keeps running in the background
        p = subprocess.Popen(cmd.split(" "), stdout=subprocess.DEVNULL, shell=False)

        # Step 3: record the pid of the server so that it can be checked and stopped later
        cmd = constants.COMMANDS.SAVE_PID.format(p.pid, constants.COMMANDS.CSLE_MGMT_WEBAPP_PID_FILE)
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def stop_node_exporter(logger: logging.Logger) -> bool:
        """
        Stops the node exporter

        :param logger: the logger to use for logging
        :return: True if the kill command was run, False if the node exporter was not running
        """
        if not ManagementSystemController.is_node_exporter_running():
            logger.info("Node exporter is not running")
            return False
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.NODE_EXPORTER_PID_FILE)
        cmd = constants.COMMANDS.KILL_PROCESS.format(pid)
        logger.info(f"Stopping node exporter by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def stop_flask(logger: logging.Logger) -> bool:
        """
        Stops the flask REST API

        :param logger: the logger to use for logging
        :return: True if the kill command was run, False if flask was not running
        """
        if not ManagementSystemController.is_flask_running():
            logger.info("Flask is not running")
            return False
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.CSLE_MGMT_WEBAPP_PID_FILE)
        cmd = constants.COMMANDS.KILL_PROCESS.format(pid)
        logger.info(f"Stopping flask by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def start_prometheus(logger: logging.Logger) -> bool:
        """
        Starts Prometheus

        :param logger: the logger to use for logging
        :return: True if the start command was run, False if Prometheus was already running
        """
        if ManagementSystemController.is_prometheus_running():
            logger.info("Prometheus is already running")
            return False
        cmd = constants.COMMANDS.START_PROMETHEUS
        logger.info(f"Starting Prometheus by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def stop_prometheus(logger: logging.Logger) -> bool:
        """
        Stops Prometheus

        :param logger: the logger to use for logging
        :return: True if the kill command was run, False if Prometheus was not running
        """
        if not ManagementSystemController.is_prometheus_running():
            logger.info("Prometheus is not running")
            return False
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.PROMETHEUS_PID_FILE)
        cmd = constants.COMMANDS.KILL_PROCESS.format(pid)
        logger.info(f"Stopping Prometheus by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    # ---------------------------------------------------------------- cAdvisor / pgAdmin containers

    @staticmethod
    def is_cadvisor_running() -> bool:
        """
        Checks if a running container exists whose name contains the cAdvisor image name

        :return: True if cadvisor is running, otherwise False
        """
        return ManagementSystemController._is_container_running(constants.CONTAINER_IMAGES.CADVISOR)

    @staticmethod
    def is_pgadmin_running() -> bool:
        """
        Checks if a running container exists whose name contains the pgAdmin image name

        :return: True if pgadmin is running, otherwise False
        """
        return ManagementSystemController._is_container_running(constants.CONTAINER_IMAGES.PGADMIN)

    @staticmethod
    def stop_cadvisor() -> bool:
        """
        Stops the running cAdvisor container, if any

        :return: True if cadvisor was stopped, otherwise False
        """
        return ManagementSystemController._stop_container(constants.CONTAINER_IMAGES.CADVISOR)

    @staticmethod
    def stop_pgadmin() -> bool:
        """
        Stops the running pgAdmin container, if any

        :return: True if pgadmin was stopped, otherwise False
        """
        return ManagementSystemController._stop_container(constants.CONTAINER_IMAGES.PGADMIN)

    @staticmethod
    def start_cadvisor(logger: logging.Logger) -> bool:
        """
        Starts cAdvisor

        An existing, non-running cAdvisor container is restarted if there is one; otherwise the START_CADVISOR
        command is run.

        :param logger: the logger to use for logging
        :return: True if cadvisor was started, otherwise False
        """
        if ManagementSystemController.is_cadvisor_running():
            logger.info("cAdvisor is already running")
            return False
        if ManagementSystemController._start_existing_container(constants.CONTAINER_IMAGES.CADVISOR):
            return True
        cmd = constants.COMMANDS.START_CADVISOR
        logger.info(f"Starting cAdvisor by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    # ---------------------------------------------------------------- host services

    @staticmethod
    def start_postgresql(logger: logging.Logger) -> Tuple[bool, Any, Any]:
        """
        Starts PostgreSQL

        :param logger: the logger to use for logging
        :return: (True, stdout, stderr) of the start command, or (False, None, None) if PostgreSQL was
                 already running
        """
        if ManagementSystemController.is_postgresql_running():
            logger.info("PostgreSQL is already running")
            return False, None, None
        cmd = constants.COMMANDS.POSTGRESQL_START
        logger.info(f"Starting PostgreSQL by running the command: {cmd}")
        return ManagementSystemController._run_service_command(cmd)

    @staticmethod
    def stop_postgresql(logger: logging.Logger) -> Tuple[bool, Any, Any]:
        """
        Stops PostgreSQL

        :param logger: the logger to use for logging
        :return: (True, stdout, stderr) of the stop command, or (False, None, None) if PostgreSQL was not running
        """
        if not ManagementSystemController.is_postgresql_running():
            logger.info("PostgreSQL is not running")
            return False, None, None
        cmd = constants.COMMANDS.POSTGRESQL_STOP
        logger.info(f"Stopping PostgreSQL by running the command: {cmd}")
        return ManagementSystemController._run_service_command(cmd)

    @staticmethod
    def start_nginx(logger: logging.Logger) -> Tuple[bool, Any, Any]:
        """
        Starts Nginx

        :param logger: the logger to use for logging
        :return: (True, stdout, stderr) of the start command, or (False, None, None) if Nginx was already running
        """
        if ManagementSystemController.is_nginx_running():
            logger.info("Nginx is already running")
            return False, None, None
        cmd = constants.COMMANDS.NGINX_START
        logger.info(f"Starting Nginx by running the command: {cmd}")
        return ManagementSystemController._run_service_command(cmd)

    @staticmethod
    def stop_nginx(logger: logging.Logger) -> Tuple[bool, Any, Any]:
        """
        Stops Nginx

        :param logger: the logger to use for logging
        :return: (True, stdout, stderr) of the stop command, or (False, None, None) if Nginx was not running
        """
        if not ManagementSystemController.is_nginx_running():
            logger.info("Nginx is not running")
            return False, None, None
        cmd = constants.COMMANDS.NGINX_STOP
        logger.info(f"Stopping Nginx by running the command: {cmd}")
        return ManagementSystemController._run_service_command(cmd)

    @staticmethod
    def start_docker_engine(logger: logging.Logger) -> Tuple[bool, Any, Any]:
        """
        Starts the Docker engine

        :param logger: the logger to use for logging
        :return: (True, stdout, stderr) of the start command, or (False, None, None) if the Docker engine was
                 already running
        """
        if ManagementSystemController.is_docker_engine_running():
            logger.info("The Docker engine is already running")
            return False, None, None
        cmd = constants.COMMANDS.DOCKER_ENGINE_START
        logger.info(f"Starting the Docker engine by running the command: {cmd}")
        return ManagementSystemController._run_service_command(cmd)

    @staticmethod
    def stop_docker_engine(logger: logging.Logger) -> Tuple[bool, Any, Any]:
        """
        Stops the Docker engine

        :param logger: the logger to use for logging
        :return: (True, stdout, stderr) of the stop command, or (False, None, None) if the Docker engine was
                 not running
        """
        if not ManagementSystemController.is_docker_engine_running():
            logger.info("The Docker engine is not running")
            return False, None, None
        cmd = constants.COMMANDS.DOCKER_ENGINE_STOP
        logger.info(f"Stopping the Docker engine by running the command: {cmd}")
        return ManagementSystemController._run_service_command(cmd)

    # ---------------------------------------------------------------- pgAdmin / Grafana containers

    @staticmethod
    def start_pgadmin(logger: logging.Logger) -> bool:
        """
        Starts pgAdmin

        An existing, non-running pgAdmin container is restarted if there is one; otherwise the START_PGADMIN
        command is run.

        :param logger: the logger to use for logging
        :return: True if pgadmin was started, otherwise False
        """
        if ManagementSystemController.is_pgadmin_running():
            logger.info("pgAdmin is already running")
            return False
        if ManagementSystemController._start_existing_container(constants.CONTAINER_IMAGES.PGADMIN):
            return True
        cmd = constants.COMMANDS.START_PGADMIN
        logger.info(f"Starting pgAdmin by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def stop_grafana() -> bool:
        """
        Stops Grafana

        :return: True if grafana was stopped, otherwise False
        """
        return ManagementSystemController._stop_container(constants.CONTAINER_IMAGES.GRAFANA)

    @staticmethod
    def start_grafana(logger: logging.Logger) -> bool:
        """
        Starts Grafana

        An existing, non-running Grafana container is restarted if there is one; otherwise the START_GRAFANA
        command is run.

        :param logger: the logger to use for logging
        :return: True if grafana was started, otherwise False
        """
        if ManagementSystemController.is_grafana_running():
            logger.info("Grafana is already running")
            return False
        if ManagementSystemController._start_existing_container(constants.CONTAINER_IMAGES.GRAFANA):
            return True
        cmd = constants.COMMANDS.START_GRAFANA
        logger.info(f"Starting Grafana by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def is_grafana_running() -> bool:
        """
        Checks if a running container exists whose name contains the Grafana image name

        :return: True if grafana is running otherwise False
        """
        return ManagementSystemController._is_container_running(constants.CONTAINER_IMAGES.GRAFANA)

    # ---------------------------------------------------------------- docker stats manager / cluster manager

    @staticmethod
    def is_statsmanager_running() -> bool:
        """
        Checks if the statsmanager is running on the host

        :return: True if it is running, false otherwise
        """
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.DOCKER_STATS_MANAGER_PIDFILE)
        if pid == -1:
            return False
        # The pid is reported on the root logger (kept from the original implementation)
        logging.getLogger().info(pid)
        return ManagementSystemController._is_process_listed(
            pid=pid, grep_pattern=constants.COMMANDS.SEARCH_DOCKER_STATS_MANAGER,
            marker=constants.COMMANDS.SEARCH_DOCKER_STATS_MANAGER)

    @staticmethod
    def start_docker_statsmanager(logger: logging.Logger, log_file: str = "docker_stats_manager.log",
                                  log_dir: str = "/var/log/csle", max_workers: int = 10,
                                  port: int = 50046) -> bool:
        """
        Starts the docker stats manager on the docker host if it is not already started

        :param logger: the logger to use for logging
        :param log_file: log file of the docker stats manager
        :param log_dir: log dir of the docker stats manager
        :param max_workers: max workers of the docker stats manager
        :param port: the port that the docker stats manager will listen to
        :return: True if it was started, False otherwise
        """
        if ManagementSystemController.is_statsmanager_running():
            logger.info("The docker statsmanager is already running")
            return False
        cmd = constants.COMMANDS.START_DOCKER_STATS_MANAGER.format(port, log_dir, log_file, max_workers)
        logger.info(f"Starting the Docker stats manager by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def stop_docker_statsmanager(logger: logging.Logger) -> bool:
        """
        Stops the statsmanager

        :param logger: the logger to use for logging
        :return: True if the kill command was run, False if the statsmanager was not running
        """
        if not ManagementSystemController.is_statsmanager_running():
            logger.info("The statsmanager is not running")
            return False
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.DOCKER_STATS_MANAGER_PIDFILE)
        cmd = constants.COMMANDS.KILL_PROCESS.format(pid)
        logger.info(f"Stopping the statsmanager by running the command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True

    @staticmethod
    def stop_cluster_manager() -> bool:
        """
        Stops the local cluster manager

        :return: True if the kill command was run, False if no pid could be read from the cluster manager pidfile
        """
        pid = ManagementSystemController.read_pid_file(constants.COMMANDS.CLUSTER_MANAGER_PIDFILE)
        if pid == -1:
            # Without a valid pid there is nothing to stop; never format the -1 sentinel into the kill command
            return False
        cmd = constants.COMMANDS.KILL_PROCESS.format(pid)
        ManagementSystemController._run_shell_command(cmd)
        return True

    # ---------------------------------------------------------------- generic pid handling

    @staticmethod
    def is_pid_running(pid: int, logger: logging.Logger) -> bool:
        """
        Checks if the given pid is running on the host

        :param pid: the pid to check
        :param logger: the logger to use for logging
        :return: True if it is running, false otherwise
        """
        logger.info(f"Checking if PID: {pid} is running")
        return bool(psutil.pid_exists(pid))

    @staticmethod
    def stop_pid(pid: int, logger: logging.Logger) -> bool:
        """
        Stops a process with a given pid

        :param pid: the pid to stop
        :param logger: the logger to use for logging
        :return: True if the kill command was run, false if the pid was not running
        """
        if not ManagementSystemController.is_pid_running(pid, logger=logger):
            return False
        cmd = constants.COMMANDS.KILL_PROCESS.format(pid)
        logger.info(f"Stopping PID:{pid} with command: {cmd}")
        ManagementSystemController._run_shell_command(cmd)
        return True