Source code for csle_attack_profiler.attack_profiler

from csle_common.dao.emulation_action.attacker.emulation_attacker_action \
    import EmulationAttackerAction
from csle_common.dao.emulation_action.attacker.emulation_attacker_action_id \
    import EmulationAttackerActionId
from csle_attack_profiler.dao.tactics import Tactics
from csle_attack_profiler.dao.attack_mapping import EmulationAttackerMapping
from csle_attack_profiler.dao.attack_graph import AttackGraph
from mitreattack.stix20 import MitreAttackData
from typing import List, Dict, Union
import os


[docs]class AttackProfiler: """ Class representing the attack profile based on the MITRE ATT&CK framework for Enterprise. """ def __init__(self, techniques_tactics: Dict[str, List[str]], mitigations: Dict[str, List[str]], data_sources: Dict[str, List[str]], subtechniques: Dict[str, str], action_id: EmulationAttackerActionId) -> None: """ Class constructor :params techniques_tactics: the techniques and tactics used by the attacker action. The key is the technique and the value is the tactics :params mitigations: the mitigations used by the attacker action. The key is the technique and the value is the mitigations :params data_sources: the data sources used by the attacker action. The key is the technqiue and the value is the data sources :params sub_techniques: the sub-techniques used by the attacker action. The key is the technique and the value is the sub-techniques :params action_id: the id of the attacker action """ self.techniques_tactics = techniques_tactics self.mitigations = mitigations self.data_sources = data_sources self.subtechniques = subtechniques self.action_id = action_id
[docs] @staticmethod def get_attack_profile(attacker_action: EmulationAttackerAction) -> 'AttackProfiler': """ Returns the attack profile of the actions :params attacker_action: the attacker action :return: the attack profile of the action """ current_dir = os.path.dirname(__file__) path = os.path.join(current_dir, "./dao/enterprise-attack.json") mitre_attack_data = MitreAttackData(path) # Retrieve the id from the attacker action attacker_id = attacker_action.id # Get the defined tactics and techniques for the attack attack_mapping = EmulationAttackerMapping.get_attack_info(attacker_id) if attack_mapping is None: return AttackProfiler({}, {}, {}, {}, EmulationAttackerActionId.CONTINUE) attack_techniques_vals = [technique.value for technique in attack_mapping['techniques']] attacker_action_id = attacker_action.id techniques_tactics = {} mitigations = {} data_sources = {} sub_techniques = {} # Loop over the techniques associated with the attack for technique_name in attack_techniques_vals: # Get technique from MitreAttackData, stix_id maps to technique in the library. try: obj = mitre_attack_data.get_objects_by_name(technique_name, "attack-pattern") except Exception as e: os.system("echo 'Error in getting technique: {}'".format(e)) continue technique = obj[0] stix_id = technique.id # Collect the tactics and add it to the dictionary tactics = [phase['phase_name'] for phase in technique.kill_chain_phases] techniques_tactics[technique_name] = tactics # Add the data sources to the dictionary if hasattr(technique, 'x_mitre_data_sources'): data_sources[technique_name] = technique.x_mitre_data_sources # Fetch the mitigations from the technique and add it to the dictionary try: mitigations_object = mitre_attack_data.get_mitigations_mitigating_technique(stix_id) mitigations_list = [mitig['object']['name'] for mitig in mitigations_object] mitigations[technique_name] = mitigations_list except Exception as e: os.system("echo 'Error in getting mitigations: {}'".format(e)) continue # Add the sub-technique to the dictionary if 'subtechniques' in attack_mapping: sub_techniques_mapping = [sub_technique.value for sub_technique in attack_mapping['subtechniques']] for st in sub_techniques_mapping: try: sub_technique_obj = mitre_attack_data.get_objects_by_name(st, "attack-pattern") parent_technique_obj = mitre_attack_data.get_parent_technique_of_subtechnique( sub_technique_obj[0].id) sub_techniques[parent_technique_obj[0]['object'].name] = st except Exception as e: os.system("echo 'Error in getting sub-techniques: {}'".format(e)) continue return AttackProfiler(techniques_tactics, mitigations, data_sources, sub_techniques, attacker_action_id)
[docs] @staticmethod def get_attack_profile_sequence(attacker_actions: List[EmulationAttackerAction], attack_graph: Union[AttackGraph, None] = None) -> List['AttackProfiler']: """ Returns the attack profile of the actions in a sequence :params attacker_action: a list of attacker actions :return: a list of attack profiles of the actions """ attack_profiles = [] for action in attacker_actions: attack_profiles.append(AttackProfiler.get_attack_profile(action)) # IF attack graph is provided if attack_graph: node = attack_graph.get_root_node() for profile in attack_profiles: # Get the mappings of the techniques and tactics techniques_tactics = profile.techniques_tactics techniques_to_keep = [] children = attack_graph.get_children(node[0], node[2]) possible_nodes = [] # First we check the techniques in node we are currently at for technique in techniques_tactics: # If the node.name is in the techniques_tactics, add it to the techniques_to_keep if node[0].value in techniques_tactics[technique]: techniques_to_keep.append(technique) if node not in possible_nodes: possible_nodes.append(node) if children is None: continue for child in children: # Child is a list of tuples, where the first element is the node name, # and the second element is the node id for technique in techniques_tactics: if child[0].value in techniques_tactics[technique]: techniques_to_keep.append(technique) # If the child is not in the possible_children, add it to the list. if attack_graph.get_node(child[0], child[1]) not in possible_nodes: p_node = attack_graph.get_node(child[0], child[1]) if p_node is not None: possible_nodes.append(p_node) # If the possible node is just one node, move to that node if len(possible_nodes) == 1: node = possible_nodes[0] if not techniques_to_keep: continue # Remove the techniques and associated tactics, data sources, # mitigations and sub-techniques that are not in the techniques_to_keep techniques_to_remove = set(profile.techniques_tactics.keys()) - set(techniques_to_keep) for technique in techniques_to_remove: try: del profile.techniques_tactics[technique] del profile.mitigations[technique] del profile.data_sources[technique] del profile.subtechniques[technique] except Exception as e: os.system("echo 'Error in removing techniques: {}'".format(e)) continue # ELSE Baseline conditions else: initial_access = False for profile in attack_profiles: techniques_tactics = profile.techniques_tactics techniques_to_remove = set() # Loop over the mappings of the techniques to tactics for technique in techniques_tactics: if Tactics.DISCOVERY.value in techniques_tactics[technique] and not initial_access: techniques_to_remove.add(technique) elif Tactics.RECONNAISSANCE.value in techniques_tactics[technique] and initial_access: techniques_to_remove.add(technique) if Tactics.INITIAL_ACCESS.value in techniques_tactics[technique] and not initial_access: initial_access = True elif Tactics.INITIAL_ACCESS.value in techniques_tactics[technique] and initial_access: techniques_to_remove.add(technique) elif Tactics.LATERAL_MOVEMENT.value in techniques_tactics[technique] and not initial_access: techniques_to_remove.add(technique) # Remove the techniques and associated tactics, data sources, mitigations and sub-techniques for technique in techniques_to_remove: try: del profile.techniques_tactics[technique] del profile.mitigations[technique] del profile.data_sources[technique] del profile.subtechniques[technique] except Exception as e: os.system("echo 'Error in removing techniques: {}'".format(e)) continue return attack_profiles