Source code for csle_common.util.read_emulation_statistics_util

import logging
from typing import List, Dict
import time
import datetime
from confluent_kafka import Consumer, KafkaError, KafkaException, TopicPartition
from csle_ryu.dao.avg_port_statistic import AvgPortStatistic
from csle_ryu.dao.avg_flow_statistic import AvgFlowStatistic
from csle_ryu.dao.agg_flow_statistic import AggFlowStatistic
from csle_ryu.dao.flow_statistic import FlowStatistic
from csle_ryu.dao.port_statistic import PortStatistic
import csle_collector.constants.constants as collector_constants
import csle_common.constants.constants as constants
from csle_collector.snort_ids_manager.dao.snort_ids_alert_counters import SnortIdsAlertCounters
from csle_collector.snort_ids_manager.dao.snort_ids_rule_counters import SnortIdsRuleCounters
from csle_collector.snort_ids_manager.dao.snort_ids_ip_alert_counters import SnortIdsIPAlertCounters
from csle_collector.ossec_ids_manager.dao.ossec_ids_alert_counters import OSSECIdsAlertCounters
from csle_collector.client_manager.client_population_metrics import ClientPopulationMetrics
from csle_collector.docker_stats_manager.dao.docker_stats import DockerStats
from csle_collector.host_manager.dao.host_metrics import HostMetrics
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_action.defender.emulation_defender_action import EmulationDefenderAction
from csle_common.dao.emulation_config.emulation_metrics_time_series import EmulationMetricsTimeSeries


class ReadEmulationStatisticsUtil:
    """
    Utility class for reading emulation statistics
    """
    @staticmethod
    def read_all(emulation_env_config: EmulationEnvConfig, logger: logging.Logger,
                 time_window_minutes: int = 100) -> EmulationMetricsTimeSeries:
        """
        Reads all time series data from the kafka log

        :param emulation_env_config: the configuration of the emulation environment
        :param logger: the logger to use for logging
        :param time_window_minutes: the length of the time window in minutes to consume
                                    (now-time_window_minutes, now)
        :return: the collected time series data
        """
        client_metrics: List[ClientPopulationMetrics] = []
        agg_docker_stats: List[DockerStats] = []
        docker_host_stats: Dict[str, List[DockerStats]] = {}
        host_metrics: Dict[str, List[HostMetrics]] = {}
        aggregated_host_metrics: List[HostMetrics] = []
        defender_actions: List[EmulationDefenderAction] = []
        attacker_actions: List[EmulationAttackerAction] = []
        snort_ids_metrics: List[SnortIdsAlertCounters] = []
        snort_ids_ip_metrics: Dict[str, List[SnortIdsIPAlertCounters]] = {}
        snort_ids_rule_metrics: List[SnortIdsRuleCounters] = []
        snort_alert_metrics_per_ids: Dict[str, List[SnortIdsAlertCounters]] = {}
        snort_rule_metrics_per_ids: Dict[str, List[SnortIdsRuleCounters]] = {}
        total_snort_metrics: List[SnortIdsAlertCounters] = []
        total_snort_rule_metrics: List[SnortIdsRuleCounters] = []
        total_host_metrics: List[HostMetrics] = []
        ossec_host_ids_metrics: Dict[str, List[OSSECIdsAlertCounters]] = {}
        total_ossec_metrics: List[OSSECIdsAlertCounters] = []
        aggregated_ossec_metrics: List[OSSECIdsAlertCounters] = []
        openflow_flow_stats: List[FlowStatistic] = []
        openflow_port_stats: List[PortStatistic] = []
        avg_openflow_flow_stats: List[AvgFlowStatistic] = []
        avg_openflow_port_stats: List[AvgPortStatistic] = []
        openflow_flow_metrics_per_switch: Dict[str, List[FlowStatistic]] = {}
        openflow_port_metrics_per_switch: Dict[str, List[PortStatistic]] = {}
        openflow_flow_avg_metrics_per_switch: Dict[str, List[AvgFlowStatistic]] = {}
        openflow_port_avg_metrics_per_switch: Dict[str, List[AvgPortStatistic]] = {}
        agg_openflow_flow_stats: List[AggFlowStatistic] = []
        agg_openflow_flow_metrics_per_switch: Dict[str, List[AggFlowStatistic]] = {}
        num_ossec_containers = len(list(filter(lambda x: x.name in constants.CONTAINER_IMAGES.OSSEC_IDS_IMAGES,
                                               emulation_env_config.containers_config.containers)))
        num_snort_containers = len(list(filter(lambda x: x.name in constants.CONTAINER_IMAGES.SNORT_IDS_IMAGES,
                                               emulation_env_config.containers_config.containers)))
        for c in emulation_env_config.containers_config.containers:
            docker_host_stats[c.get_full_name()] = []
            host_metrics[c.get_full_name()] = []
            ossec_host_ids_metrics[c.get_full_name()] = []
            snort_ids_ip_metrics[c.get_full_name()] = []
            for ids_image in constants.CONTAINER_IMAGES.SNORT_IDS_IMAGES:
                if ids_image in c.name:
                    snort_alert_metrics_per_ids[c.get_full_name()] = []
                    snort_rule_metrics_per_ids[c.get_full_name()] = []
        host_metrics[emulation_env_config.kafka_config.container.get_full_name()] = []
        docker_host_stats[emulation_env_config.kafka_config.container.get_full_name()] = []
        ossec_host_ids_metrics[emulation_env_config.kafka_config.container.get_full_name()] = []
        snort_ids_ip_metrics[emulation_env_config.kafka_config.container.get_full_name()] = []
        host_metrics[emulation_env_config.elk_config.container.get_full_name()] = []
        docker_host_stats[emulation_env_config.elk_config.container.get_full_name()] = []
        ossec_host_ids_metrics[emulation_env_config.elk_config.container.get_full_name()] = []
        snort_ids_ip_metrics[emulation_env_config.elk_config.container.get_full_name()] = []
        if emulation_env_config.sdn_controller_config is not None:
            host_metrics[emulation_env_config.sdn_controller_config.container.get_full_name()] = []
            docker_host_stats[emulation_env_config.sdn_controller_config.container.get_full_name()] = []
            ossec_host_ids_metrics[emulation_env_config.sdn_controller_config.container.get_full_name()] = []
            snort_ids_ip_metrics[emulation_env_config.sdn_controller_config.container.get_full_name()] = []
        topic_names = [collector_constants.KAFKA_CONFIG.ATTACKER_ACTIONS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.DOCKER_HOST_STATS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.DEFENDER_ACTIONS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.DOCKER_STATS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.SNORT_IDS_LOG_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.HOST_METRICS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.CLIENT_POPULATION_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.OSSEC_IDS_LOG_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.OPENFLOW_FLOW_STATS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.OPENFLOW_PORT_STATS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.AVERAGE_OPENFLOW_FLOW_STATS_PER_SWITCH_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.AVERAGE_OPENFLOW_PORT_STATS_PER_SWITCH_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.OPENFLOW_AGG_FLOW_STATS_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.SNORT_IDS_IP_LOG_TOPIC_NAME,
                       collector_constants.KAFKA_CONFIG.SNORT_IDS_RULE_LOG_TOPIC_NAME]
        logger.info(f"Reading time-series data for the last {time_window_minutes} minutes from topics: {topic_names}")
        start_consume_ts = time.time()
        kafka_conf = {
            collector_constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY:
                f"{emulation_env_config.kafka_config.container.docker_gw_bridge_ip}:"
                f"{emulation_env_config.kafka_config.kafka_port_external}",
            collector_constants.KAFKA.GROUP_ID_PROPERTY: f"emulation_data_consumer_thread_{start_consume_ts}",
            collector_constants.KAFKA.AUTO_OFFSET_RESET_PROPERTY: collector_constants.KAFKA.EARLIEST_OFFSET}
        consumer = Consumer(**kafka_conf)
        start_consume_ts = int(datetime.datetime.timestamp(
            datetime.datetime.now() - datetime.timedelta(minutes=time_window_minutes)))
        start_consume_ts = int(start_consume_ts * 1e3)  # convert to ms
        topic_partitions = list(map(lambda x: TopicPartition(topic=x, partition=0, offset=start_consume_ts),
                                    topic_names))
        time_offsets_partitions = consumer.offsets_for_times(partitions=topic_partitions, timeout=20)

        def on_assign(consumer, partitions):
            """
            Callback called when a Kafka consumer is assigned to a partition
            """
            consumer.assign(time_offsets_partitions)

        consumer.subscribe(topic_names, on_assign=on_assign)
        done = False
        num_msg = 0
        host_metrics_counter = 0
        ossec_host_metrics_counter = 0
        snort_metrics_counter = 0
        snort_rule_metrics_counter = 0
        while not done:
            msg = consumer.poll(timeout=1.0)
            if msg is not None:
                num_msg += 1
                if msg.error():
                    if msg.error().code() == KafkaError._PARTITION_EOF:
                        logger.warning(f"reached end of partition: {msg.topic(), msg.partition(), msg.offset()}")
                    elif msg.error():
                        raise KafkaException(msg.error())
                else:
                    topic = msg.topic()
                    if topic == collector_constants.KAFKA_CONFIG.DOCKER_STATS_TOPIC_NAME:
                        agg_docker_stats.append(DockerStats.from_kafka_record(record=msg.value().decode()))
                    elif topic == collector_constants.KAFKA_CONFIG.HOST_METRICS_TOPIC_NAME:
                        metrics = HostMetrics.from_kafka_record(record=msg.value().decode())
                        c_1 = emulation_env_config.get_container_from_ip(metrics.ip)
                        if c_1 is not None:
                            host_metrics[c_1.get_full_name()].append(metrics)
                            host_metrics_counter += 1
                            total_host_metrics.append(metrics)
                    elif topic == collector_constants.KAFKA_CONFIG.OSSEC_IDS_LOG_TOPIC_NAME:
                        metrics = OSSECIdsAlertCounters.from_kafka_record(record=msg.value().decode())
                        c_1 = emulation_env_config.get_container_from_ip(metrics.ip)
                        if c_1 is not None:
                            ossec_host_ids_metrics[c_1.get_full_name()].append(metrics)
                            ossec_host_metrics_counter += 1
                            total_ossec_metrics.append(metrics)
                    elif topic == collector_constants.KAFKA_CONFIG.ATTACKER_ACTIONS_TOPIC_NAME:
                        attacker_actions.append(EmulationAttackerAction.from_kafka_record(
                            record=msg.value().decode()))
                    elif topic == collector_constants.KAFKA_CONFIG.DEFENDER_ACTIONS_TOPIC_NAME:
                        defender_actions.append(EmulationDefenderAction.from_kafka_record(
                            record=msg.value().decode()))
                    elif topic == collector_constants.KAFKA_CONFIG.DOCKER_HOST_STATS_TOPIC_NAME:
                        stats = DockerStats.from_kafka_record(record=msg.value().decode())
                        c_1 = emulation_env_config.get_container_from_ip(stats.ip)
                        if c_1 is not None:
                            docker_host_stats[c_1.get_full_name()].append(stats)
                    elif topic == collector_constants.KAFKA_CONFIG.SNORT_IDS_LOG_TOPIC_NAME:
                        metrics = SnortIdsAlertCounters.from_kafka_record(record=msg.value().decode())
                        c_1 = emulation_env_config.get_container_from_ip(metrics.ip)
                        if c_1 is not None:
                            snort_alert_metrics_per_ids[c_1.get_full_name()].append(metrics)
                            snort_metrics_counter += 1
                            total_snort_metrics.append(metrics)
                    elif topic == collector_constants.KAFKA_CONFIG.SNORT_IDS_RULE_LOG_TOPIC_NAME:
                        metrics = SnortIdsRuleCounters.from_kafka_record(record=msg.value().decode())
                        c_1 = emulation_env_config.get_container_from_ip(metrics.ip)
                        if c_1 is not None:
                            snort_rule_metrics_per_ids[c_1.get_full_name()].append(metrics)
                            snort_rule_metrics_counter += 1
                            total_snort_rule_metrics.append(metrics)
                    elif topic == collector_constants.KAFKA_CONFIG.CLIENT_POPULATION_TOPIC_NAME:
                        client_metrics.append(ClientPopulationMetrics.from_kafka_record(
                            record=msg.value().decode()))
                    elif topic == collector_constants.KAFKA_CONFIG.OPENFLOW_FLOW_STATS_TOPIC_NAME:
                        flow_metrics_record = FlowStatistic.from_kafka_record(record=msg.value().decode())
                        openflow_flow_stats.append(flow_metrics_record)
                        if str(flow_metrics_record.datapath_id) not in openflow_flow_metrics_per_switch:
                            openflow_flow_metrics_per_switch[str(flow_metrics_record.datapath_id)] = []
                        openflow_flow_metrics_per_switch[str(flow_metrics_record.datapath_id)].append(
                            flow_metrics_record)
                    elif topic == collector_constants.KAFKA_CONFIG.OPENFLOW_PORT_STATS_TOPIC_NAME:
                        port_metrics_record = PortStatistic.from_kafka_record(record=msg.value().decode())
                        openflow_port_stats.append(port_metrics_record)
                        if str(port_metrics_record.datapath_id) not in openflow_port_metrics_per_switch:
                            openflow_port_metrics_per_switch[str(port_metrics_record.datapath_id)] = []
                        openflow_port_metrics_per_switch[str(port_metrics_record.datapath_id)].append(
                            port_metrics_record)
                    elif topic == collector_constants.KAFKA_CONFIG.AVERAGE_OPENFLOW_FLOW_STATS_PER_SWITCH_TOPIC_NAME:
                        avg_flow_statistics_record = AvgFlowStatistic.from_kafka_record(record=msg.value().decode())
                        avg_openflow_flow_stats.append(avg_flow_statistics_record)
                        if str(avg_flow_statistics_record.datapath_id) not in openflow_flow_avg_metrics_per_switch:
                            openflow_flow_avg_metrics_per_switch[str(avg_flow_statistics_record.datapath_id)] = []
                        openflow_flow_avg_metrics_per_switch[str(avg_flow_statistics_record.datapath_id)].append(
                            avg_flow_statistics_record)
                    elif topic == collector_constants.KAFKA_CONFIG.AVERAGE_OPENFLOW_PORT_STATS_PER_SWITCH_TOPIC_NAME:
                        avg_port_statistics_record = AvgPortStatistic.from_kafka_record(record=msg.value().decode())
                        avg_openflow_port_stats.append(avg_port_statistics_record)
                        if str(avg_port_statistics_record.datapath_id) not in openflow_port_avg_metrics_per_switch:
                            openflow_port_avg_metrics_per_switch[str(avg_port_statistics_record.datapath_id)] = []
                        openflow_port_avg_metrics_per_switch[str(avg_port_statistics_record.datapath_id)].append(
                            avg_port_statistics_record)
                    elif topic == collector_constants.KAFKA_CONFIG.OPENFLOW_AGG_FLOW_STATS_TOPIC_NAME:
                        agg_flow_statistics_record = AggFlowStatistic.from_kafka_record(record=msg.value().decode())
                        agg_openflow_flow_stats.append(agg_flow_statistics_record)
                        if str(agg_flow_statistics_record.datapath_id) not in agg_openflow_flow_metrics_per_switch:
                            agg_openflow_flow_metrics_per_switch[str(agg_flow_statistics_record.datapath_id)] = []
                        agg_openflow_flow_metrics_per_switch[str(agg_flow_statistics_record.datapath_id)].append(
                            agg_flow_statistics_record)
                    elif topic == collector_constants.KAFKA_CONFIG.SNORT_IDS_IP_LOG_TOPIC_NAME:
                        metrics = SnortIdsIPAlertCounters.from_kafka_record(record=msg.value().decode())
                        c_1 = emulation_env_config.get_container_from_ip(metrics.alert_ip)
                        if c_1 is not None:
                            snort_ids_ip_metrics[c_1.get_full_name()].append(metrics)
                    if host_metrics_counter >= len(emulation_env_config.containers_config.containers):
                        agg_host_metrics_dto = ReadEmulationStatisticsUtil.average_host_metrics(
                            host_metrics=total_host_metrics)
                        aggregated_host_metrics.append(agg_host_metrics_dto)
                        host_metrics_counter = 0
                        total_host_metrics = []
                    if ossec_host_metrics_counter >= num_ossec_containers:
                        agg_ossec_metrics_dto = ReadEmulationStatisticsUtil.average_ossec_metrics(
                            ossec_metrics=total_ossec_metrics)
                        aggregated_ossec_metrics.append(agg_ossec_metrics_dto)
                        ossec_host_metrics_counter = 0
                        total_ossec_metrics = []
                    if snort_metrics_counter >= num_snort_containers:
                        agg_snort_metrics_dto = ReadEmulationStatisticsUtil.average_snort_metrics(
                            snort_metrics=total_snort_metrics)
                        snort_ids_metrics.append(agg_snort_metrics_dto)
                        snort_metrics_counter = 0
                        total_snort_metrics = []
                    if snort_rule_metrics_counter >= num_snort_containers:
                        agg_snort_rule_metrics_dto = ReadEmulationStatisticsUtil.average_snort_rule_metrics(
                            snort_metrics=total_snort_rule_metrics)
                        snort_ids_rule_metrics.append(agg_snort_rule_metrics_dto)
                        snort_rule_metrics_counter = 0
                        total_snort_rule_metrics = []
            else:
                done = True
        consumer.close()
        dto = EmulationMetricsTimeSeries(
            client_metrics=client_metrics, aggregated_docker_stats=agg_docker_stats, host_metrics=host_metrics,
            docker_host_stats=docker_host_stats, agg_snort_ids_metrics=snort_ids_metrics,
            attacker_actions=attacker_actions, defender_actions=defender_actions,
            aggregated_host_metrics=aggregated_host_metrics, emulation_env_config=emulation_env_config,
            aggregated_ossec_host_alert_counters=aggregated_ossec_metrics,
            ossec_host_alert_counters=ossec_host_ids_metrics,
            avg_openflow_flow_stats=avg_openflow_flow_stats, avg_openflow_port_stats=avg_openflow_port_stats,
            openflow_port_stats=openflow_port_stats, openflow_flow_stats=openflow_flow_stats,
            openflow_flow_metrics_per_switch=openflow_flow_metrics_per_switch,
            openflow_port_metrics_per_switch=openflow_port_metrics_per_switch,
            openflow_flow_avg_metrics_per_switch=openflow_flow_avg_metrics_per_switch,
            openflow_port_avg_metrics_per_switch=openflow_port_avg_metrics_per_switch,
            agg_openflow_flow_stats=agg_openflow_flow_stats,
            agg_openflow_flow_metrics_per_switch=agg_openflow_flow_metrics_per_switch,
            agg_snort_ids_rule_metrics=snort_ids_rule_metrics, snort_ids_ip_metrics=snort_ids_ip_metrics,
            snort_rule_metrics_per_ids=snort_rule_metrics_per_ids,
            snort_alert_metrics_per_ids=snort_alert_metrics_per_ids)
        return dto
    @staticmethod
    def average_host_metrics(host_metrics: List[HostMetrics]) -> HostMetrics:
        """
        Computes the average metrics from a list of host metrics

        :param host_metrics: the list of host metrics to average
        :return: the computed averages
        """
        total_num_logged_in_users = 0
        total_num_failed_login_attempts = 0
        total_num_open_connections = 0
        total_num_login_events = 0
        total_num_processes = 0
        total_num_users = 0
        for metric in host_metrics:
            total_num_logged_in_users += metric.num_logged_in_users
            total_num_failed_login_attempts += metric.num_failed_login_attempts
            total_num_open_connections += metric.num_open_connections
            total_num_login_events += metric.num_login_events
            total_num_processes += metric.num_processes
            total_num_users += metric.num_users
        aggregated_host_metrics_dto = HostMetrics()
        aggregated_host_metrics_dto.num_logged_in_users = total_num_logged_in_users
        aggregated_host_metrics_dto.num_failed_login_attempts = total_num_failed_login_attempts
        aggregated_host_metrics_dto.num_open_connections = total_num_open_connections
        aggregated_host_metrics_dto.num_login_events = total_num_login_events
        aggregated_host_metrics_dto.num_processes = total_num_processes
        aggregated_host_metrics_dto.num_users = total_num_users
        return aggregated_host_metrics_dto
    @staticmethod
    def average_ossec_metrics(ossec_metrics: List[OSSECIdsAlertCounters]) -> OSSECIdsAlertCounters:
        """
        Computes the average metrics from a list of OSSEC metrics

        :param ossec_metrics: the list of OSSEC metrics
        :return: the computed averages
        """
        aggregated_ossec_ids_alert_counters = OSSECIdsAlertCounters()
        for alert_counters in ossec_metrics:
            aggregated_ossec_ids_alert_counters.add(alert_counters)
        return aggregated_ossec_ids_alert_counters
    @staticmethod
    def average_snort_metrics(snort_metrics: List[SnortIdsAlertCounters]) -> SnortIdsAlertCounters:
        """
        Computes the average metrics from a list of Snort metrics

        :param snort_metrics: the list of Snort metrics
        :return: the computed averages
        """
        aggregated_snort_ids_alert_counters = SnortIdsAlertCounters()
        for alert_counters in snort_metrics:
            aggregated_snort_ids_alert_counters.add(alert_counters)
        return aggregated_snort_ids_alert_counters
    @staticmethod
    def average_snort_rule_metrics(snort_metrics: List[SnortIdsRuleCounters]) -> SnortIdsRuleCounters:
        """
        Computes the average metrics from a list of Snort rule metrics

        :param snort_metrics: the list of Snort rule metrics
        :return: the computed averages
        """
        aggregated_snort_ids_rule_counters = SnortIdsRuleCounters()
        for alert_counters in snort_metrics:
            aggregated_snort_ids_rule_counters.add(alert_counters)
        return aggregated_snort_ids_rule_counters
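
A minimal usage sketch of read_all (not part of the module above). It assumes an already-loaded EmulationEnvConfig instance named emulation_env_config; the logger name and the 30-minute window are illustrative choices:

import logging

from csle_common.util.read_emulation_statistics_util import ReadEmulationStatisticsUtil

logger = logging.getLogger("emulation_statistics_reader")  # illustrative logger name
# emulation_env_config: EmulationEnvConfig is assumed to be loaded elsewhere
time_series = ReadEmulationStatisticsUtil.read_all(
    emulation_env_config=emulation_env_config, logger=logger, time_window_minutes=30)
logger.info(f"Collected {len(time_series.client_metrics)} client-population records")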
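
A minimal sketch of the aggregation helpers, assuming default-constructed HostMetrics objects whose counter attributes can be set directly (as read_all does above); the values are illustrative:

from csle_collector.host_manager.dao.host_metrics import HostMetrics
from csle_common.util.read_emulation_statistics_util import ReadEmulationStatisticsUtil

m_1 = HostMetrics()
m_1.num_logged_in_users = 2
m_1.num_open_connections = 10
m_2 = HostMetrics()
m_2.num_logged_in_users = 3
m_2.num_open_connections = 5
# The helper sums the counters across the given host metrics into a single HostMetrics DTO
agg = ReadEmulationStatisticsUtil.average_host_metrics(host_metrics=[m_1, m_2])
assert agg.num_logged_in_users == 5 and agg.num_open_connections == 15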