Source code for csle_collector.host_manager.host_manager_util

from typing import Dict, Any, List, Union
import datetime
import subprocess
import yaml
import logging
import csle_collector.constants.constants as constants
from csle_collector.host_manager.dao.failed_login_attempt import FailedLoginAttempt
from csle_collector.host_manager.dao.successful_login import SuccessfulLogin
from csle_collector.host_manager.dao.host_metrics import HostMetrics
import csle_collector.host_manager.host_manager_pb2


[docs]class HostManagerUtil: """ Utility class for the host manager """
@staticmethod
def read_latest_ts_auth() -> float:
    """
    Reads the timestamp of the most recent failed login attempt

    Runs the command in constants.HOST_METRICS.LIST_FAILED_LOGIN_ATTEMPTS and parses the last
    usable line of its output. Only the first 15 characters of that line are parsed, prefixed with
    the current year (presumably because the log lines carry no year -- TODO confirm against the
    log format).

    :return: the timestamp of the latest failed login attempt, or the current time if the command
             fails, produces no usable line, or the line cannot be parsed
    """
    try:
        cmd = constants.HOST_METRICS.LIST_FAILED_LOGIN_ATTEMPTS
        output = subprocess.run(cmd.split(" "), capture_output=True, text=True)
        # Lines with 14 characters or fewer cannot hold the 15-character prefix that is parsed below
        login_attempts = [line for line in output.stdout.split("\n") if len(line) > 14]
        year = datetime.datetime.now().year
        latest_attempt = FailedLoginAttempt.parse_from_str(
            str(year) + " " + " ".join(login_attempts[-1][0:15].split()))
        # Read `.timestamp` exactly once, from the parsed object. Looking it up a second time on the
        # numeric value raises AttributeError, which the handler below would silently turn into
        # "now", so the real log timestamp would never be returned.
        if latest_attempt is None or latest_attempt.timestamp is None:
            raise ValueError("Could not parse the login attempts log")
        return float(latest_attempt.timestamp)
    except Exception:
        # Deliberate best-effort fallback: any failure (missing command, empty log, parse error)
        # yields the current time instead of propagating
        return datetime.datetime.now().timestamp()
@staticmethod
def read_failed_login_attempts(failed_auth_last_ts: float) -> int:
    """
    Counts the failed login attempts that are newer than a given timestamp

    :param failed_auth_last_ts: only attempts with a timestamp strictly greater than this are counted
    :return: the number of recent failed login attempts
    """
    cmd = constants.HOST_METRICS.LIST_FAILED_LOGIN_ATTEMPTS
    result = subprocess.run(cmd.split(" "), capture_output=True, text=True)
    # Only lines longer than 14 characters can hold the 15-character prefix that gets parsed
    candidate_lines = [line for line in result.stdout.split("\n") if len(line) > 14]
    year_prefix = str(datetime.datetime.now().year)
    attempts: List[FailedLoginAttempt] = [
        FailedLoginAttempt.parse_from_str(year_prefix + " " + " ".join(line[0:15].split()))
        for line in candidate_lines
    ]
    return sum(
        1 for attempt in attempts
        if attempt is not None and attempt.timestamp is not None
        and float(attempt.timestamp) > failed_auth_last_ts
    )
@staticmethod
def read_latest_ts_login() -> float:
    """
    Reads the timestamp of the latest successful login

    The first non-empty output line of constants.HOST_METRICS.LIST_SUCCESSFUL_LOGIN_ATTEMPTS that
    does not contain "wtmp begins" is parsed (presumably the newest entry -- TODO confirm the
    ordering of the command output).

    :return: the timestamp of the latest successful login, or the current time if it cannot be read
    """
    try:
        cmd = constants.HOST_METRICS.LIST_SUCCESSFUL_LOGIN_ATTEMPTS
        result = subprocess.run(cmd.split(" "), capture_output=True, text=True)
        entries = [line for line in result.stdout.split("\n") if line != "" and "wtmp begins" not in line]
        current_year = datetime.datetime.now().year
        # Collapse runs of whitespace so the parser sees single-space separated fields
        normalized_entry = " ".join(entries[0].split())
        latest_ts = SuccessfulLogin.parse_from_str(normalized_entry, year=current_year).timestamp
        if latest_ts is None:
            raise ValueError("Could not read latest login timestamp")
        return float(latest_ts)
    except Exception:
        # Best-effort fallback: any failure yields the current time
        return datetime.datetime.now().timestamp()
@staticmethod
def read_successful_login_events(login_last_ts: float) -> int:
    """
    Counts the successful logins that are newer than a given timestamp

    :param login_last_ts: only logins with a timestamp strictly greater than this are counted
    :return: the number of recent successful logins
    """
    cmd = constants.HOST_METRICS.LIST_SUCCESSFUL_LOGIN_ATTEMPTS
    result = subprocess.run(cmd.split(" "), capture_output=True, text=True)
    entries = [line for line in result.stdout.split("\n") if line != "" and "wtmp begins" not in line]
    current_year = datetime.datetime.now().year
    parsed_logins = [
        SuccessfulLogin.parse_from_str(" ".join(entry.split()), year=current_year) for entry in entries
    ]
    # Entries without a timestamp are dropped before the comparison
    timestamped_logins = [login for login in parsed_logins if login.timestamp is not None]
    recent_logins = [login for login in timestamped_logins if login.timestamp > login_last_ts]
    return len(recent_logins)
@staticmethod
def read_open_connections() -> int:
    """
    Measures the number of open connections to the server

    Takes the command output before the first "Active UNIX domain sockets" marker, splits it on
    "State" and counts the newlines in the piece that follows the first "State".

    :return: the number of open connections, or -1 if the output contains no "State" token
    """
    cmd = constants.HOST_METRICS.LIST_OPEN_CONNECTIONS_CMD
    result = subprocess.run(cmd.split(" "), capture_output=True, text=True)
    # str.split always returns at least one element, so index 0 is safe even without the marker
    before_unix_sockets = result.stdout.split("Active UNIX domain sockets", 1)[0]
    state_pieces = before_unix_sockets.split("State")
    if len(state_pieces) <= 1:
        return -1
    # Number of "\n"-separated segments minus one equals the number of newline characters
    return state_pieces[1].count("\n")
@staticmethod
def read_users() -> int:
    """
    Measures the number of user accounts on the server

    Counts the non-blank lines printed by constants.HOST_METRICS.LIST_USER_ACCOUNTS (assumes the
    command prints one account per line -- TODO confirm against the constant).

    :return: the number of user accounts
    """
    cmd = constants.HOST_METRICS.LIST_USER_ACCOUNTS
    output = subprocess.run(cmd.split(" "), capture_output=True, text=True)
    # Skip blank lines: splitting on "\n" leaves an empty string after a trailing newline (and a
    # single empty string for empty output), which must not be counted as a user account
    return sum(1 for line in output.stdout.split("\n") if line.strip())
@staticmethod
def read_logged_in_users() -> int:
    """
    Measures the number of logged-in users on the server

    Counts the whitespace-separated tokens printed by constants.HOST_METRICS.LIST_LOGGED_IN_USERS_CMD
    (assumes the command prints one token per logged-in user -- TODO confirm against the constant).

    :return: the number of logged in users (0 if the command prints nothing)
    """
    cmd = constants.HOST_METRICS.LIST_LOGGED_IN_USERS_CMD
    output = subprocess.run(cmd.split(" "), capture_output=True, text=True)
    # Split on any whitespace: unlike split(" "), this returns an empty list for empty output,
    # ignores repeated separators and keeps names on different lines apart
    return len(output.stdout.split())
@staticmethod
def read_processes() -> int:
    """
    Measures the number of processes on the server

    :return: the number of processes, or -1 if the command fails or its output is not an integer
    """
    try:
        cmd = constants.HOST_METRICS.LIST_NUMBER_OF_PROCESSES
        result = subprocess.run(cmd.split(" "), capture_output=True, text=True)
        return int(result.stdout)
    except Exception:
        # -1 signals "unknown" to the caller
        return -1
@staticmethod
def read_host_metrics(failed_auth_last_ts: float, login_last_ts: float) -> HostMetrics:
    """
    Collects the latest metrics from the host

    :param failed_auth_last_ts: failed login attempts newer than this timestamp are counted
    :param login_last_ts: successful logins newer than this timestamp are counted
    :return: the collected host metrics
    """
    logged_in_users = HostManagerUtil.read_logged_in_users()
    failed_logins = HostManagerUtil.read_failed_login_attempts(failed_auth_last_ts=failed_auth_last_ts)
    # The readers for connections and processes report failure as -1; the metrics use 0 instead
    open_connections = HostManagerUtil.read_open_connections()
    open_connections = 0 if open_connections == -1 else open_connections
    login_events = HostManagerUtil.read_successful_login_events(login_last_ts=login_last_ts)
    processes = HostManagerUtil.read_processes()
    processes = 0 if processes == -1 else processes
    user_accounts = HostManagerUtil.read_users()
    return HostMetrics(
        num_logged_in_users=logged_in_users,
        num_failed_login_attempts=failed_logins,
        num_open_connections=open_connections,
        num_login_events=login_events,
        num_processes=processes,
        num_users=user_accounts
    )
@staticmethod
def host_monitor_dto_to_dict(host_dto: csle_collector.host_manager.host_manager_pb2.HostStatusDTO) \
        -> Dict[str, Any]:
    """
    Converts a HostStatusDTO to a dict

    :param host_dto: the dto to convert; if a tuple is given, its first element is used
    :return: a dict representation of the DTO
    """
    if isinstance(host_dto, tuple):
        host_dto = host_dto[0]
    status_fields = (
        "monitor_running",
        "filebeat_running",
        "packetbeat_running",
        "metricbeat_running",
        "heartbeat_running",
    )
    return {field: getattr(host_dto, field) for field in status_fields}
@staticmethod
def host_monitor_dto_from_dict(d: Dict[str, Any]) -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO:
    """
    Builds a HostStatusDTO from its dict representation

    :param d: the dict to convert; must contain every status field
    :return: the converted DTO
    """
    status_dto = csle_collector.host_manager.host_manager_pb2.HostStatusDTO()
    for field in ("monitor_running", "filebeat_running", "packetbeat_running", "metricbeat_running",
                  "heartbeat_running"):
        setattr(status_dto, field, d[field])
    return status_dto
@staticmethod
def host_metrics_dto_to_dict(host_metrics_dto: csle_collector.host_manager.host_manager_pb2.HostMetricsDTO) \
        -> Dict[str, Any]:
    """
    Converts a HostMetricsDTO to a dict

    :param host_metrics_dto: the dto to convert
    :return: a dict representation of the DTO
    """
    metric_fields = (
        "num_logged_in_users",
        "num_failed_login_attempts",
        "num_open_connections",
        "num_login_events",
        "num_processes",
        "num_users",
        "ip",
        "timestamp",
    )
    return {field: getattr(host_metrics_dto, field) for field in metric_fields}
@staticmethod
def host_metrics_dto_from_dict(d: Dict[str, Any]) -> csle_collector.host_manager.host_manager_pb2.HostMetricsDTO:
    """
    Builds a HostMetricsDTO from its dict representation

    :param d: the dict to convert; must contain every metric field
    :return: the converted DTO
    """
    metrics_dto = csle_collector.host_manager.host_manager_pb2.HostMetricsDTO()
    for field in ("num_logged_in_users", "num_failed_login_attempts", "num_open_connections",
                  "num_login_events", "num_processes", "num_users", "ip", "timestamp"):
        setattr(metrics_dto, field, d[field])
    return metrics_dto
@staticmethod
def host_metrics_dto_empty() -> csle_collector.host_manager.host_manager_pb2.HostMetricsDTO:
    """
    Creates a HostMetricsDTO with all counters set to 0, an empty ip and timestamp 0

    :return: an empty HostMetricsDTO
    """
    empty_dto = csle_collector.host_manager.host_manager_pb2.HostMetricsDTO()
    for counter in ("num_logged_in_users", "num_failed_login_attempts", "num_open_connections",
                    "num_login_events", "num_processes", "num_users"):
        setattr(empty_dto, counter, 0)
    empty_dto.ip = ""
    empty_dto.timestamp = 0
    return empty_dto
@staticmethod
def host_monitor_dto_empty() -> csle_collector.host_manager.host_manager_pb2.HostStatusDTO:
    """
    Creates a HostStatusDTO with every status flag set to False

    :return: an empty HostStatusDTO
    """
    empty_dto = csle_collector.host_manager.host_manager_pb2.HostStatusDTO()
    for flag in ("monitor_running", "filebeat_running", "packetbeat_running", "metricbeat_running",
                 "heartbeat_running"):
        setattr(empty_dto, flag, False)
    return empty_dto
@staticmethod
def filebeat_config(log_files_paths: List[str], kibana_ip: str, kibana_port: int, elastic_ip: str,
                    elastic_port: int, num_elastic_shards: int, kafka_topics: List[str], kafka_ip: str,
                    kafka_port: int, reload_enabled: bool = False, kafka: bool = False) \
        -> Dict[str, Any]:
    """
    Generates the filebeat.yml config

    :param log_files_paths: the list of log files that filebeat should monitor
    :param kibana_ip: the IP of Kibana where the data should be visualized
    :param kibana_port: the port of Kibana where the data should be visualized
    :param elastic_ip: the IP of elastic where the data should be shipped
    :param elastic_port: the port of elastic where the data should be shipped
    :param num_elastic_shards: the number of elastic shards
    :param kafka_topics: list of kafka topics to ingest (only used if kafka is True)
    :param kafka_ip: the kafka server ip (only used if kafka is True)
    :param kafka_port: the kafka server port (only used if kafka is True)
    :param reload_enabled: whether automatic reload of modules should be enabled
    :param kafka: whether kafka should be added as input
    :return: the filebeat configuration dict
    """
    # The filestream input is always present; the kafka input is optional
    inputs: List[Dict[str, Any]] = [
        {
            constants.BEATS.TYPE_PROPERTY: constants.BEATS.FILESTREAM_PROPERTY,
            constants.BEATS.ID_PROPERTY: 1,
            constants.BEATS.ENABLED_PROPERTY: True,
            constants.BEATS.PATHS_PROPERTY: log_files_paths
        }
    ]
    if kafka:
        kafka_input = {
            constants.BEATS.TYPE_PROPERTY: constants.BEATS.KAFKA_PROPERTY,
            constants.BEATS.ENABLED_PROPERTY: True,
            constants.BEATS.GROUP_ID_PROPERTY: constants.FILEBEAT.FILEBEAT_GROUP_ID,
            constants.BEATS.TOPICS_PROPERTY: kafka_topics,
            constants.BEATS.HOSTS_PROPERTY: [f"{kafka_ip}:{kafka_port}"],
        }
        inputs.append(kafka_input)
    return {
        constants.FILEBEAT.INPUTS_PROPERTY: inputs,
        constants.FILEBEAT.MODULES_PROPERTY: {
            constants.BEATS.PATH_PROPERTY: f"{constants.FILEBEAT.MODULES_CONFIG_DIR}*.yml",
            constants.BEATS.RELOAD_ENABLED_PROPERTY: reload_enabled
        },
        constants.BEATS.SETUP_TEMPLATE_SETTINGS_PROPERTY: {
            constants.BEATS.INDEX_NUM_SHARDS_PROPERTY: num_elastic_shards
        },
        constants.BEATS.SETUP_KIBANA_PROPERTY: {
            constants.BEATS.HOST_PROPERTY: f"{kibana_ip}:{kibana_port}"
        },
        constants.BEATS.ELASTIC_OUTPUT_PROPERTY: {
            constants.BEATS.HOSTS_PROPERTY: [f"{elastic_ip}:{elastic_port}"]
        },
        constants.BEATS.PROCESSORS_PROPERTY: {
            constants.BEATS.ADD_HOST_METADATA_PROPERTY: {
                constants.BEATS.WHEN_NOT_CONTAIN_TAGS_PROPERTY: constants.BEATS.FORWARDED_PROPERTY
            }
        },
    }
@staticmethod
def write_yaml_config(config: Union[Dict[str, Any], List[Dict[str, Any]]], path: str) -> None:
    """
    Serializes a configuration to YAML and writes it to disk, overwriting any existing file

    :param config: the configuration (a dict or a list of dicts) to write
    :param path: the path to write the file to
    :return: None
    """
    logging.info(f"Writing configuration file to path: {path}")
    with open(path, 'w') as config_file:
        yaml.dump(config, stream=config_file)
@staticmethod
def filebeat_snort_module_config() -> List[Dict[str, Any]]:
    """
    Creates the snort filebeat module configuration

    :return: the snort filebeat module config
    """
    log_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_INPUT_PROPERTY: constants.BEATS.FILE_PROPERTY,
        constants.BEATS.VAR_PATHS_PROPERTY: [constants.SNORT_IDS_ROUTER.SNORT_FAST_LOG_FILE]
    }
    snort_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.SNORT_MODULE,
        constants.BEATS.LOG_PROPERTY: log_settings
    }
    return [snort_module]
@staticmethod
def filebeat_elasticsearch_module_config() -> List[Dict[str, Any]]:
    """
    Creates the elasticsearch filebeat module configuration

    :return: the elasticsearch filebeat module config
    """
    server_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [
            f"{constants.ELK.ELASTICSEARCH_LOG_DIR}*.log",
            f"{constants.ELK.ELASTICSEARCH_LOG_DIR}*_server.json"
        ]
    }
    elastic_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.ELASTICSEARCH_MODULE,
        constants.BEATS.SERVER_PROPERTY: server_settings
    }
    return [elastic_module]
@staticmethod
def filebeat_logstash_module_config() -> List[Dict[str, Any]]:
    """
    Creates the logstash filebeat module configuration

    :return: the logstash filebeat module config
    """
    log_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [f"{constants.ELK.LOGSTASH_LOG_DIR}logstash.log*"]
    }
    slowlog_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [f"{constants.ELK.LOGSTASH_LOG_DIR}logstash-slowlog.log*"]
    }
    logstash_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.LOGSTASH_MODULE,
        constants.BEATS.LOG_PROPERTY: log_settings,
        constants.BEATS.SLOWLOG_PROPERTY: slowlog_settings
    }
    return [logstash_module]
@staticmethod
def filebeat_kibana_module_config() -> List[Dict[str, Any]]:
    """
    Creates the kibana filebeat module configuration

    :return: the kibana filebeat module config
    """
    log_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [f"{constants.ELK.KIBANA_LOG_DIR}*.log"]
    }
    audit_settings = {constants.BEATS.ENABLED_PROPERTY: True}
    kibana_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.KIBANA_MODULE,
        constants.BEATS.LOG_PROPERTY: log_settings,
        constants.BEATS.AUDIT_PROPERTY: audit_settings
    }
    return [kibana_module]
@staticmethod
def filebeat_system_module_config() -> List[Dict[str, Any]]:
    """
    Creates the system filebeat module configuration

    :return: the system filebeat module config
    """
    syslog_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [f"{constants.SYSTEM.SYSLOG}*"]
    }
    auth_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [f"{constants.SYSTEM.AUTH_LOG}"]
    }
    system_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.SYSTEM_MODULE,
        constants.BEATS.SYSLOG_PROPERTY: syslog_settings,
        constants.BEATS.AUTH_PROPERTY: auth_settings
    }
    return [system_module]
@staticmethod
def filebeat_kafka_module_config() -> List[Dict[str, Any]]:
    """
    Creates the kafka filebeat module configuration

    :return: the kafka filebeat module config
    """
    log_patterns = ("controller.log*", "server.log*", "state-change.log*", "kafka-*.log*")
    log_settings = {
        constants.BEATS.ENABLED_PROPERTY: True,
        constants.BEATS.VAR_PATHS_PROPERTY: [f"{constants.KAFKA.DIR}{pattern}" for pattern in log_patterns]
    }
    kafka_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.KAFKA_MODULE,
        constants.BEATS.LOG_PROPERTY: log_settings
    }
    return [kafka_module]
@staticmethod
def packetbeat_config(kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int,
                      num_elastic_shards: int) -> Dict[str, Any]:
    """
    Generates the packetbeat.yml config

    :param kibana_ip: the IP of Kibana where the data should be visualized
    :param kibana_port: the port of Kibana where the data should be visualized
    :param elastic_ip: the IP of elastic where the data should be shipped
    :param elastic_port: the port of elastic where the data should be shipped
    :param num_elastic_shards: the number of elastic shards
    :return: the packetbeat configuration dict
    """
    # Protocols that are configured by a list of ports, in the order they appear in the config
    port_based_protocols = (
        (constants.PACKETBEAT.AMQP_PROTOCOL, constants.PACKETBEAT.AMQP_PORTS),
        (constants.PACKETBEAT.CASSANDRA_PROTOCOL, constants.PACKETBEAT.CASSANDRA_PORTS),
        (constants.PACKETBEAT.DHCPV4_PROTOCOL, constants.PACKETBEAT.DHCPV4_PORTS),
        (constants.PACKETBEAT.DNS_PROTOCOL, constants.PACKETBEAT.DNS_PORTS),
        (constants.PACKETBEAT.HTTP_PROTOCOL, constants.PACKETBEAT.HTTP_PORTS),
        (constants.PACKETBEAT.MEMCACHE_PROTOCOL, constants.PACKETBEAT.MEMCACHE_PORTS),
        (constants.PACKETBEAT.MYSQL_PROTOCOL, constants.PACKETBEAT.MYSQL_PORTS),
        (constants.PACKETBEAT.PGSQL_PROTOCOL, constants.PACKETBEAT.PGSQL_PORTS),
        (constants.PACKETBEAT.REDIS_PROTOCOL, constants.PACKETBEAT.REDIS_PORTS),
        (constants.PACKETBEAT.THRIFT_PROTOCOL, constants.PACKETBEAT.THRIFT_PORTS),
        (constants.PACKETBEAT.MONGODB_PROTOCOL, constants.PACKETBEAT.MONGODB_PORTS),
        (constants.PACKETBEAT.NFS_PROTOCOL, constants.PACKETBEAT.NFS_PORTS),
        (constants.PACKETBEAT.TLS_PROTOCOL, constants.PACKETBEAT.TLS_PORTS),
        (constants.PACKETBEAT.SIP_PROTOCOL, constants.PACKETBEAT.SIP_PORTS),
    )
    # ICMP is the only entry without ports; it is switched on with an explicit enabled flag
    protocols: List[Dict[str, Any]] = [
        {
            constants.BEATS.TYPE_PROPERTY: constants.PACKETBEAT.ICMP_PROTOCOL,
            constants.BEATS.ENABLED_PROPERTY: True
        }
    ]
    protocols.extend(
        {constants.BEATS.TYPE_PROPERTY: protocol, constants.PACKETBEAT.PORTS_PROPERTY: ports}
        for protocol, ports in port_based_protocols
    )
    return {
        constants.PACKETBEAT.INTERFACES_TYPE_PROPERTY: constants.PACKETBEAT.AF_PACKET_PROPERTY,
        constants.PACKETBEAT.INTERFACES_DEVICE_PROPERTY: constants.PACKETBEAT.ANY_DEVICE_PROPERTY,
        constants.PACKETBEAT.FLOWS: {
            constants.PACKETBEAT.TIMEOUT_PROPERTY: "30s",
            constants.BEATS.PERIOD_PROPERTY: "10s"
        },
        constants.PACKETBEAT.PROTOCOLS: protocols,
        constants.BEATS.SETUP_TEMPLATE_SETTINGS_PROPERTY: {
            constants.BEATS.INDEX_NUM_SHARDS_PROPERTY: num_elastic_shards
        },
        constants.BEATS.SETUP_KIBANA_PROPERTY: {
            constants.BEATS.HOST_PROPERTY: f"{kibana_ip}:{kibana_port}"
        },
        constants.BEATS.ELASTIC_OUTPUT_PROPERTY: {
            constants.BEATS.HOSTS_PROPERTY: [f"{elastic_ip}:{elastic_port}"]
        },
        constants.BEATS.PROCESSORS_PROPERTY: {
            constants.BEATS.ADD_HOST_METADATA_PROPERTY: {
                constants.BEATS.WHEN_NOT_CONTAIN_TAGS_PROPERTY: constants.BEATS.FORWARDED_PROPERTY
            }
        },
    }
@staticmethod
def metricbeat_config(kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int,
                      num_elastic_shards: int, reload_enabled: bool = False) -> Dict[str, Any]:
    """
    Generates the metricbeat.yml config

    :param kibana_ip: the IP of Kibana where the data should be visualized
    :param kibana_port: the port of Kibana where the data should be visualized
    :param elastic_ip: the IP of elastic where the data should be shipped
    :param elastic_port: the port of elastic where the data should be shipped
    :param num_elastic_shards: the number of elastic shards
    :param reload_enabled: whether automatic reload of modules should be enabled
    :return: the metricbeat configuration dict
    """
    return {
        constants.METRICBEAT.MODULES_PROPERTY: {
            constants.BEATS.PATH_PROPERTY: f"{constants.METRICBEAT.MODULES_CONFIG_DIR}*.yml",
            constants.BEATS.RELOAD_ENABLED_PROPERTY: reload_enabled
        },
        constants.BEATS.SETUP_TEMPLATE_SETTINGS_PROPERTY: {
            constants.BEATS.INDEX_NUM_SHARDS_PROPERTY: num_elastic_shards
        },
        constants.BEATS.SETUP_KIBANA_PROPERTY: {
            constants.BEATS.HOST_PROPERTY: f"{kibana_ip}:{kibana_port}"
        },
        constants.BEATS.ELASTIC_OUTPUT_PROPERTY: {
            constants.BEATS.HOSTS_PROPERTY: [f"{elastic_ip}:{elastic_port}"]
        },
        constants.BEATS.PROCESSORS_PROPERTY: {
            constants.BEATS.ADD_HOST_METADATA_PROPERTY: {
                constants.BEATS.WHEN_NOT_CONTAIN_TAGS_PROPERTY: constants.BEATS.FORWARDED_PROPERTY
            }
        },
    }
@staticmethod
def metricbeat_elasticsearch_module_config(elastic_ip: str, elastic_port: int) -> List[Dict[str, Any]]:
    """
    Creates the elasticsearch metricbeat module configuration

    :param elastic_ip: the elasticsearch host IP
    :param elastic_port: the elasticsearch host port
    :return: the elasticsearch metricbeat module config
    """
    elastic_host = f"{elastic_ip}:{elastic_port}"
    # NOTE(review): the module name comes from constants.FILEBEAT while the sibling metricbeat
    # module configs use constants.METRICBEAT -- confirm this is intentional
    elastic_module = {
        constants.BEATS.MODULE_PROPERTY: constants.FILEBEAT.ELASTICSEARCH_MODULE,
        constants.BEATS.PERIOD_PROPERTY: "10s",
        constants.BEATS.HOSTS_PROPERTY: [elastic_host]
    }
    return [elastic_module]
@staticmethod
def metricbeat_kibana_module_config(kibana_ip: str, kibana_port: int) -> List[Dict[str, Any]]:
    """
    Creates the kibana metricbeat module configuration

    :param kibana_ip: the kibana host IP
    :param kibana_port: the kibana host port
    :return: the kibana metricbeat module config
    """
    kibana_host = f"{kibana_ip}:{kibana_port}"
    kibana_module = {
        constants.BEATS.MODULE_PROPERTY: constants.METRICBEAT.KIBANA_MODULE,
        constants.BEATS.PERIOD_PROPERTY: "10s",
        constants.BEATS.HOSTS_PROPERTY: [kibana_host]
    }
    return [kibana_module]
@staticmethod
def metricbeat_logstash_module_config(logstash_ip: str, logstash_port: int) -> List[Dict[str, Any]]:
    """
    Creates the logstash metricbeat module configuration

    :param logstash_ip: the logstash host IP
    :param logstash_port: the logstash host port
    :return: the logstash metricbeat module config
    """
    logstash_host = f"{logstash_ip}:{logstash_port}"
    logstash_module = {
        constants.BEATS.MODULE_PROPERTY: constants.METRICBEAT.LOGSTASH_MODULE,
        constants.BEATS.PERIOD_PROPERTY: "10s",
        constants.BEATS.HOSTS_PROPERTY: [logstash_host]
    }
    return [logstash_module]
@staticmethod
def metricbeat_kafka_module_config(kafka_ip: str, kafka_port: int) -> List[Dict[str, Any]]:
    """
    Creates the kafka metricbeat module configuration

    :param kafka_ip: the kafka host IP
    :param kafka_port: the kafka host port
    :return: the kafka metricbeat module config
    """
    kafka_host = f"{kafka_ip}:{kafka_port}"
    kafka_module = {
        constants.BEATS.MODULE_PROPERTY: constants.METRICBEAT.KAFKA_MODULE,
        constants.BEATS.PERIOD_PROPERTY: "10s",
        constants.BEATS.HOSTS_PROPERTY: [kafka_host]
    }
    return [kafka_module]
@staticmethod
def metricbeat_system_module_config() -> List[Dict[str, Any]]:
    """
    Creates the system metricbeat module configuration

    :return: the system metricbeat module config
    """
    metricsets = [
        constants.METRICBEAT.CPU_METRIC,
        constants.METRICBEAT.LOAD_METRIC,
        constants.METRICBEAT.MEMORY_METRIC,
        constants.METRICBEAT.NETWORK_METRIC,
        constants.METRICBEAT.PROCESS_METRIC,
        constants.METRICBEAT.PROCESS_SUMMARY_METRIC,
        constants.METRICBEAT.SOCKET_SUMMARY_METRIC
    ]
    system_module = {
        constants.BEATS.MODULE_PROPERTY: constants.METRICBEAT.SYSTEM_MODULE,
        constants.BEATS.PERIOD_PROPERTY: "10s",
        constants.BEATS.METRICSETS_PROPERTY: metricsets,
        constants.BEATS.ENABLED_PROPERTY: True,
        # ".*" is a match-all pattern for the processes property
        constants.METRICBEAT.PROCESSES_PROPERTY: [".*"],
        constants.METRICBEAT.CPU_METRICS_PROPERTY: [
            constants.METRICBEAT.PERCENTAGES_PROPERTY,
            constants.METRICBEAT.NORMALIZED_PERCENTAGES_PROPERTY
        ],
        constants.METRICBEAT.CORE_METRICS_PROPERTY: [constants.METRICBEAT.PERCENTAGES_PROPERTY]
    }
    return [system_module]
@staticmethod
def metricbeat_linux_module_config() -> List[Dict[str, Any]]:
    """
    Creates the linux metricbeat module configuration

    :return: the linux metricbeat module config
    """
    linux_config = [
        {
            constants.BEATS.MODULE_PROPERTY: constants.METRICBEAT.LINUX_MODULE,
            constants.BEATS.PERIOD_PROPERTY: "10s",
            # Must be a list, not a set literal: a set has no stable order and PyYAML serializes
            # it as a "!!set" mapping instead of a YAML sequence. A list also matches the system
            # module config.
            constants.BEATS.METRICSETS_PROPERTY: [
                constants.METRICBEAT.PAGEINFO_METRIC,
                constants.METRICBEAT.SUMMARY_METRIC
            ],
            constants.BEATS.ENABLED_PROPERTY: True
        }
    ]
    return linux_config
@staticmethod
def heartbeat_config(kibana_ip: str, kibana_port: int, elastic_ip: str, elastic_port: int,
                     num_elastic_shards: int, hosts_to_monitor: List[str]) -> Dict[str, Any]:
    """
    Generates the heartbeat.yml config

    :param kibana_ip: the IP of Kibana where the data should be visualized
    :param kibana_port: the port of Kibana where the data should be visualized
    :param elastic_ip: the IP of elastic where the data should be shipped
    :param elastic_port: the port of elastic where the data should be shipped
    :param num_elastic_shards: the number of elastic shards
    :param hosts_to_monitor: a list of host IP addresses to monitor
    :return: the heartbeat configuration dict
    """
    # A single ICMP monitor covers all hosts
    icmp_monitor = {
        constants.BEATS.TYPE_PROPERTY: constants.HEARTBEAT.ICMP_MONITOR_TYPE,
        constants.HEARTBEAT.SCHEDULE_PROPERTY: '*/5 * * * * * *',
        constants.BEATS.HOSTS_PROPERTY: hosts_to_monitor,
        constants.BEATS.ID_PROPERTY: constants.HEARTBEAT.CSLE_MONITOR_SERVICE_ID,
        constants.BEATS.NAME_PROPERTY: constants.HEARTBEAT.CSLE_MONITOR_SERVICE_NAME
    }
    return {
        constants.HEARTBEAT.HEARTBEAT_MONITORS_PROPERTY: [icmp_monitor],
        constants.BEATS.SETUP_TEMPLATE_SETTINGS_PROPERTY: {
            constants.BEATS.INDEX_NUM_SHARDS_PROPERTY: num_elastic_shards
        },
        constants.BEATS.SETUP_KIBANA_PROPERTY: {
            constants.BEATS.HOST_PROPERTY: f"{kibana_ip}:{kibana_port}"
        },
        constants.BEATS.ELASTIC_OUTPUT_PROPERTY: {
            constants.BEATS.HOSTS_PROPERTY: [f"{elastic_ip}:{elastic_port}"]
        },
        constants.BEATS.PROCESSORS_PROPERTY: {
            constants.BEATS.ADD_HOST_METADATA_PROPERTY: {
                constants.BEATS.WHEN_NOT_CONTAIN_TAGS_PROPERTY: constants.BEATS.FORWARDED_PROPERTY
            }
        },
    }