Source code for csle_attacker.emulation.util.nmap_util

from typing import Tuple, List, Union, Any
import time
import threading
import xml.etree.ElementTree as ET
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.dao.emulation_action.attacker.emulation_attacker_action \
    import EmulationAttackerAction
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
    import EmulationAttackerMachineObservationState
import csle_common.constants.constants as constants
from csle_common.dao.emulation_action_result.nmap_scan_result import NmapScanResult
from csle_common.dao.emulation_action_result.nmap_host_result import NmapHostResult
from csle_common.dao.emulation_action_result.nmap_port_status import NmapPortStatus
from csle_common.dao.emulation_action_result.nmap_host_status import NmapHostStatus
from csle_common.dao.emulation_action_result.nmap_port import NmapPort
from csle_common.dao.emulation_action_result.nmap_addr_type import NmapAddrType
from csle_common.dao.emulation_config.transport_protocol import TransportProtocol
from csle_common.dao.emulation_action_result.nmap_os import NmapOs
from csle_common.dao.emulation_action_result.nmap_vuln import NmapVuln
from csle_common.dao.emulation_action_result.nmap_brute_credentials import NmapBruteCredentials
from csle_common.dao.emulation_action_result.nmap_hop import NmapHop
from csle_common.dao.emulation_action_result.nmap_trace import NmapTrace
from csle_common.dao.emulation_action_result.nmap_http_enum import NmapHttpEnum
from csle_common.dao.emulation_action_result.nmap_http_grep import NmapHttpGrep
from csle_common.dao.emulation_action_result.nmap_vulscan import NmapVulscan
from csle_common.util.emulation_util import EmulationUtil
from csle_common.util.connection_util import ConnectionUtil
from csle_common.logging.log import Logger


[docs]class NmapUtil: """ Class containing utility functions for the nmap-related functionality to the emulation """
[docs] @staticmethod def parse_nmap_scan(file_name: str, emulation_env_config: EmulationEnvConfig, conn=None, dir: str = "") \ -> ET.Element: """ Parses an XML file containing the result of an nmap scan :param file_name: name of the file to parse :param conn: the SSH connection to use for parsing :param dir: the directory to parse the XML file :param emulation_env_config: environment config :return: the parsed xml file """ if conn is None: conn = emulation_env_config.get_hacker_connection() if dir is None or dir == "": dir = constants.NMAP.RESULTS_DIR sftp_client = conn.open_sftp() file_name = str(dir + file_name).strip() remote_file = sftp_client.open(file_name) try: xml_tree = ET.parse(remote_file) finally: remote_file.close() xml_data = xml_tree.getroot() return xml_data
[docs] @staticmethod def parse_nmap_scan_xml(xml_data, ips: List[str], action: EmulationAttackerAction) -> NmapScanResult: """ Parses an XML Tree into a DTO :param xml_data: the xml tree to parse :param ips: ips of the source of the scan :param action: the action of the scan :return: parsed nmap scan result """ hosts: List[NmapHostResult] = [] for child in xml_data: if child.tag == constants.NMAP_XML.HOST: host = NmapUtil._parse_nmap_host_xml(child, action=action) hosts = NmapUtil._merge_nmap_hosts(host, hosts) result = NmapScanResult(hosts=hosts, ips=ips) return result
@staticmethod def _parse_nmap_host_xml(xml_data, action: EmulationAttackerAction) -> NmapHostResult: """ Parses a host-element in the XML tree :param xml_data: the host element :param action: action of the scan :return: parsed nmap host result """ ip_addr = None mac_addr = None hostnames = [] ports: List[NmapPort] = [] vulnerabilities: List[NmapVuln] = [] credentials: List[NmapBruteCredentials] = [] os = None os_matches = [] status = NmapHostStatus.UP trace = None for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.ADDRESS: addr, type = NmapUtil._parse_nmap_address_xml(child) if type == NmapAddrType.MAC: mac_addr = addr else: ip_addr = addr elif child.tag == constants.NMAP_XML.HOSTNAMES: hostnames = NmapUtil._parse_nmap_hostnames_xml(child) elif child.tag == constants.NMAP_XML.PORTS: ports, vulnerabilities, credentials = NmapUtil._parse_nmap_ports_xml(child, action=action) elif child.tag == constants.NMAP_XML.OS: os_matches = NmapUtil._parse_nmap_os_xml(child) os = NmapOs.get_best_match(os_matches) elif child.tag == constants.NMAP_XML.TRACE: trace = NmapUtil._parse_nmap_trace_xml(child) nmap_host_result = NmapHostResult(status=status, ips=[ip_addr], mac_addr=mac_addr, hostnames=hostnames, ports=ports, os=os, os_matches=os_matches, vulnerabilities=vulnerabilities, credentials=credentials, trace=trace) return nmap_host_result @staticmethod def _parse_nmap_status_xml(xml_data) -> NmapPortStatus: """ Parses a status element in the xml tree :param xml_data: the status XML element :return: parsed Nmap Port-Status DTO """ status = NmapPortStatus.DOWN status_val = xml_data.attrib[constants.NMAP_XML.STATE] if status_val == constants.NMAP_XML.STATUS_UP: status = NmapPortStatus.UP return status @staticmethod def _parse_nmap_address_xml(xml_data) -> Tuple[str, NmapAddrType]: """ Parses a address element in the xml tree :param xml_data: the address XML element :return: (address, addresstype) """ type = NmapAddrType.IP addr = xml_data.attrib[constants.NMAP_XML.ADDR] addrtype_val = xml_data.attrib[constants.NMAP_XML.ADDR_TYPE] if constants.NMAP_XML.IP in addrtype_val: type = NmapAddrType.IP elif constants.NMAP_XML.MAC in addrtype_val: type = NmapAddrType.MAC return addr, type @staticmethod def _parse_nmap_hostnames_xml(xml_data) -> List[str]: """ Parses a hostnames element in the XML tree :param xml_data: the hostnames XML element :return: a list of hostnames """ hostnames = [] for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.HOSTNAME: hostnames.append(child.attrib[constants.NMAP_XML.NAME]) return hostnames @staticmethod def _parse_nmap_ports_xml(xml_data, action: EmulationAttackerAction) \ -> Tuple[List[NmapPort], List[NmapVuln], List[NmapBruteCredentials]]: """ Parses a ports XML element in the XML tree :param xml_data: the ports XML element :param action: action of the scan :return: (List NmapPort, List NmapVuln, ListNmapBruteCredentials) """ ports = [] vulnerabilities = [] credentials = [] http_enum = None http_grep = None vulscan = None for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.PORT: port_status = NmapPortStatus.DOWN protocol = TransportProtocol._from_str(child.attrib["protocol"]) port_id = child.attrib[constants.NMAP_XML.PORT_ID] service_name = constants.NMAP_XML.UNKNOWN service_version = "" service_fp = "" for child_2 in list(child.iter()): if child_2.tag == constants.NMAP_XML.STATE: port_status = NmapUtil._parse_nmap_port_status_xml(child_2) elif child_2.tag == constants.NMAP_XML.SERVICE: service_name = NmapUtil._parse_nmap_service_name_xml(child_2) service_version = NmapUtil._parse_nmap_service_version_xml(child_2) service_fp = NmapUtil._parse_nmap_service_fp_xml(child_2) elif child_2.tag == constants.NMAP_XML.SCRIPT: result, brute_vuln = NmapUtil._parse_nmap_script(child_2, port=port_id, protocol=protocol, service=service_name, action=action) if result is not None: if isinstance(result, list) and len(result) > 0 and isinstance(result[0], NmapVuln): vulnerabilities = result elif isinstance(result, list) and len(result) > 0 \ and isinstance(result[0], NmapBruteCredentials): credentials = result if brute_vuln is not None: vulnerabilities.append(brute_vuln) elif isinstance(result, NmapHttpEnum): http_enum = result elif isinstance(result, NmapHttpGrep): http_grep = result elif isinstance(result, NmapVulscan): vulscan = result if port_status == NmapPortStatus.UP: port = NmapPort(port_id=port_id, protocol=protocol, status=port_status, service_name=service_name, http_enum=http_enum, http_grep=http_grep, vulscan=vulscan, service_version=service_version, service_fp=service_fp) ports.append(port) return ports, vulnerabilities, credentials @staticmethod def _parse_nmap_service_name_xml(xml_data) -> Union[str, Any]: """ Parses a XML service name element :param xml_data: the XML service element :return: the name of the service """ return xml_data.attrib[constants.NMAP_XML.NAME] @staticmethod def _parse_nmap_service_version_xml(xml_data) -> str: """ Parses a XML service element :param xml_data: the XML service element :return: the version of the service """ version = "" if constants.NMAP_XML.VERSION in xml_data.keys(): version = xml_data.attrib[constants.NMAP_XML.VERSION] return version @staticmethod def _parse_nmap_service_fp_xml(xml_data) -> str: """ Parses a XML service element :param xml_data: the XML service element :return: the fingerprint of the service """ servicefp = "" if constants.NMAP_XML.SERVICEFP in xml_data.keys(): servicefp = xml_data.attrib[constants.NMAP_XML.SERVICEFP] return servicefp @staticmethod def _parse_nmap_port_status_xml(xml_data) -> NmapPortStatus: """ Parses a XML port status element :param xml_data: the XML port status element :return: the parsed port status """ port_status = NmapPortStatus.DOWN if xml_data.attrib[constants.NMAP_XML.STATE] == constants.NMAP_XML.OPEN_STATE: port_status = NmapPortStatus.UP return port_status @staticmethod def _parse_nmap_os_xml(xml_data) -> List[NmapOs]: """ Parses NMAP OS XML element :param xml_data: the XML OS element :return: Parsed NmapOS """ os_matches = [] for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.OS_MATCH: name = child.attrib[constants.NMAP_XML.NAME] accuracy = int(child.attrib[constants.NMAP_XML.ACCURACY]) t_acc_cmp = 0 vendor = "" osfamily = "" for c2 in list(child.iter()): if c2.tag == constants.NMAP_XML.OS_CLASS: t_acc = int(c2.attrib[constants.NMAP_XML.ACCURACY]) if t_acc > t_acc_cmp: vendor = c2.attrib[constants.NMAP_XML.VENDOR] osfamily = c2.attrib[constants.NMAP_XML.OS_FAMILY] t_acc_cmp = t_acc os_match = NmapOs(name=name, vendor=vendor, osfamily=osfamily, accuracy=accuracy) os_matches.append(os_match) return os_matches @staticmethod def _parse_nmap_table_vuln(xml_data, port: int, protocol: TransportProtocol, service: str) -> NmapVuln: """ Parses a Table XML element with vulnerabilities :param xml_data: the XML table element :param port: the port element parent :param protocol: the protocol of the parent :param service: the service running on the port :return: parsed Nmap vulnerability """ cvss = constants.VULNERABILITIES.default_cvss id = "" for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.ELEM: if constants.NMAP_XML.KEY in child.keys(): if child.attrib[constants.NMAP_XML.KEY] == constants.NMAP_XML.CVSS: cvss = float(child.text) elif child.attrib[constants.NMAP_XML.KEY] == constants.NMAP_XML.ID: id = child.text vuln = NmapVuln(name=id, port=port, protocol=protocol, cvss=cvss, service=service) return vuln @staticmethod def _parse_nmap_script(xml_data, port: int, protocol: TransportProtocol, service: str, action: EmulationAttackerAction) \ -> Tuple[Union[List[NmapVuln], List[NmapBruteCredentials], NmapHttpEnum, NmapHttpGrep, NmapVulscan, None], Union[NmapVuln, None]]: """ Parses a XML script element :param xml_data: the XML script element :param port: the port of the parent element :param protocol: the protocol of the parent element :param service: the service running on the port :param action: action of the scan :return: a list of parsed nmap vulnerabilities or a list of parsed credentials and maybe a vuln """ if constants.NMAP_XML.ID in xml_data.keys(): if xml_data.attrib[constants.NMAP_XML.ID] == constants.NMAP_XML.VULNERS_SCRIPT_ID: return NmapUtil._parse_nmap_vulners(xml_data, port=port, protocol=protocol, service=service), None elif xml_data.attrib[constants.NMAP_XML.ID] in constants.NMAP_XML.BRUTE_SCRIPTS: return NmapUtil._parse_nmap_brute(xml_data, port=port, protocol=protocol, service=service, action=action) elif xml_data.attrib[constants.NMAP_XML.ID] == constants.NMAP_XML.HTTP_ENUM_SCRIPT: return NmapUtil._parse_nmap_http_enum_xml(xml_data), None elif xml_data.attrib[constants.NMAP_XML.ID] == constants.NMAP_XML.HTTP_GREP_SCRIPT: return NmapUtil._parse_nmap_http_grep_xml(xml_data), None elif xml_data.attrib[constants.NMAP_XML.ID] == constants.NMAP_XML.VULSCAN_SCRIPT: return NmapUtil._parse_nmap_http_vulscan_xml(xml_data), None return None, None @staticmethod def _parse_nmap_vulners(xml_data, port: int, protocol: TransportProtocol, service: str) -> List[NmapVuln]: """ Parses a XML result from a vulners scan :param xml_data: the XML script element :param port: the port of the parent element :param protocol: the protocol of the parent element :param service: the service running on the port :return: a list of parsed nmap vulnerabilities """ vulnerabilities = [] for child in list(xml_data.iter())[1:]: if child.tag == constants.NMAP_XML.TABLE: for c_2 in list(child.iter())[1:]: if c_2.tag == constants.NMAP_XML.TABLE: vuln = NmapUtil._parse_nmap_table_vuln(c_2, port=port, protocol=protocol, service=service) vulnerabilities.append(vuln) break return vulnerabilities @staticmethod def _parse_nmap_brute(xml_data, port: int, protocol: TransportProtocol, service: str, action: EmulationAttackerAction) -> Tuple[List[NmapBruteCredentials], NmapVuln]: """ Parses a XML result from a brute force dictionary scan :param xml_data: the XML script element :param port: the port of the parent element :param protocol: the protocol of the parent element :param service: the service running on the port :return: a list of found credentials, vulnerability """ credentials = [] for child in list(xml_data.iter())[1:]: if child.tag == constants.NMAP_XML.TABLE: if constants.NMAP_XML.KEY in child.keys(): if child.attrib[constants.NMAP_XML.KEY] == constants.NMAP_XML.ACCOUNTS: for c_2 in list(child.iter())[1:]: if c_2.tag == constants.NMAP_XML.TABLE: cred = NmapUtil._parse_nmap_table_cred(c_2, port=port, protocol=protocol, service=service) credentials.append(cred) break vulnerability = None if len(credentials) > 0: vuln_name = EnvDynamicsUtil.exploit_get_vuln_name(a=action) service_name = EnvDynamicsUtil.exploit_get_service_name(a=action) credentials_1 = list(map(lambda x: x.to_obs(), credentials)) vulnerability = NmapVuln(name=vuln_name, port=port, protocol=protocol, cvss=EnvDynamicsUtil.exploit_get_vuln_cvss(a=action), service=service_name, credentials=credentials_1) return credentials, vulnerability @staticmethod def _parse_nmap_table_cred(xml_data, port: int, protocol: TransportProtocol, service: str) -> NmapBruteCredentials: """ Parses a Table XML element with credentials :param xml_data: the XML table element :param port: the port element parent :param protocol: the protocol of the parent :param service: the service running on the port :return: parsed Nmap credentials """ username = "" pw = "" state = "" for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.ELEM: if constants.NMAP_XML.KEY in child.keys(): if child.attrib[constants.NMAP_XML.KEY] == constants.NMAP_XML.USERNAME: username = child.text elif child.attrib[constants.NMAP_XML.KEY] == constants.NMAP_XML.PASSWORD: pw = child.text elif child.attrib[constants.NMAP_XML.KEY] == constants.NMAP_XML.STATE: state = child.text credentials = NmapBruteCredentials(username=username, pw=pw, state=state, port=int(port), protocol=protocol, service=service) return credentials
[docs] @staticmethod def merge_nmap_scan_result_with_state(scan_result: NmapScanResult, s: EmulationEnvState, a: EmulationAttackerAction) -> EmulationEnvState: """ Merges a NMAP scan result with an existing observation state :param scan_result: the scan result :param s: the current state :param a: the action just executed :return: s' """ new_m_obs = [] for host in scan_result.hosts: m_obs = EmulationAttackerMachineObservationState.from_nmap_result(host) # m_obs = EnvDynamicsUtil.brute_tried_flags(a=a, m_obs=m_obs) new_m_obs.append(m_obs) attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old( s.attacker_obs_state.machines, new_m_obs, action=a, emulation_env_config=s.emulation_env_config) s_prime = s s_prime.attacker_obs_state.machines = attacker_machine_observations s_prime.attacker_obs_state.sort_machines() s_prime.defender_obs_state.sort_machines() return s_prime
[docs] @staticmethod def nmap_scan_action_helper(s: EmulationEnvState, a: EmulationAttackerAction, masscan: bool = False) -> EmulationEnvState: """ Helper function for executing a NMAP scan action on the emulation. Implements caching. :param s: the current env state :param a: the NMAP action to execute :param masscan: whether it is a masscan or not :return: s' """ # If cache miss, then execute cmd cmds, file_names = a.nmap_cmds() if masscan: cmds = a.masscan_cmds() Logger.__call__().get_logger().info( f"Running NMAP scan on container: {s.emulation_env_config.containers_config.agent_ip}, commands: " f"{','.join(cmds)}") results = EmulationUtil.execute_ssh_cmds(cmds=cmds, conn=s.emulation_env_config.get_hacker_connection()) total_time = sum(list(map(lambda x: x[2], results))) EmulationUtil.log_measured_action_time(total_time=total_time, action=a, emulation_env_config=s.emulation_env_config) # Read results scan_result = NmapScanResult(hosts=[], ips=[s.emulation_env_config.containers_config.agent_ip]) for file_name in file_names: for i in range(constants.ENV_CONSTANTS.NUM_RETRIES): xml_data = NmapUtil.parse_nmap_scan(file_name=file_name, emulation_env_config=s.emulation_env_config) scan_result_new = NmapUtil.parse_nmap_scan_xml( xml_data, ips=[s.emulation_env_config.containers_config.agent_ip], action=a) s.attacker_obs_state.agent_reachable.update(scan_result.reachable) scan_result = NmapUtil.merge_nmap_scan_results(scan_result, scan_result_new) return NmapUtil.nmap_pivot_scan_action_helper(s=s, a=a, partial_result=scan_result.copy())
@staticmethod def _merge_nmap_hosts(host: NmapHostResult, hosts: List[NmapHostResult]) -> List[NmapHostResult]: """ Merge nmap host results :param host: a host to merge with an existing list of hosts :param hosts: the list of hosts to merge with the new host :return: the merged list """ found = False for h in hosts: if h.ips_match(host.ips): found = True vulnerabilities = list(set(h.vulnerabilities).union(host.vulnerabilities)) h.vulnerabilities = vulnerabilities ports = list(set(h.ports).union(host.ports)) h.ports = ports if not found: hosts.append(host) return hosts @staticmethod def _parse_nmap_trace_xml(xml_data) -> NmapTrace: """ Parses a trace XML element in the XML tree :param xml_data: the trace XML element :return: NmapTrace """ hops = [] for child in list(xml_data.iter()): if child.tag == constants.NMAP_XML.HOP: hop = NmapUtil._parse_nmap_hop_xml(child) hops.append(hop) nmap_trace = NmapTrace(hops=hops) return nmap_trace @staticmethod def _parse_nmap_hop_xml(xml_data) -> NmapHop: """ Parses a hop XML element in the XML tree :param xml_data: the hop XML element :return: NmapHop """ ttl = 0 ip = "" rtt = 0.0 host = "" if constants.NMAP_XML.IPADDR in xml_data.keys(): ip = xml_data.attrib[constants.NMAP_XML.IPADDR] if constants.NMAP_XML.RTT in xml_data.keys(): rtt = float(xml_data.attrib[constants.NMAP_XML.RTT]) if constants.NMAP_XML.TTL in xml_data.keys(): ttl = int(xml_data.attrib[constants.NMAP_XML.TTL]) if constants.NMAP_XML.HOST in xml_data.keys(): host = xml_data.attrib[constants.NMAP_XML.HOST] nmap_hop = NmapHop(ttl=ttl, ipaddr=ip, rtt=rtt, host=host) return nmap_hop @staticmethod def _parse_nmap_http_enum_xml(xml_data) -> NmapHttpEnum: """ Parses a http enum XML element in the XML tree :param xml_data: the http enum XML element :return: HttpEnum """ output = "" if constants.NMAP_XML.OUTPUT in xml_data.keys(): output = xml_data.attrib[constants.NMAP_XML.OUTPUT] nmap_http_enum = NmapHttpEnum(output=output) return nmap_http_enum @staticmethod def _parse_nmap_http_grep_xml(xml_data) -> NmapHttpGrep: """ Parses a http grep XML element in the XML tree :param xml_data: the http grep XML element :return: NmapHttpGrep """ output = "" if constants.NMAP_XML.OUTPUT in xml_data.keys(): output = xml_data.attrib[constants.NMAP_XML.OUTPUT] nmap_http_grep = NmapHttpGrep(output=output) return nmap_http_grep @staticmethod def _parse_nmap_http_vulscan_xml(xml_data) -> NmapVulscan: """ Parses a vulscan XML element in the XML tree :param xml_data: the vulscan XML element :return: NmapVulScan """ output = "" if constants.NMAP_XML.OUTPUT in xml_data.keys(): output = xml_data.attrib[constants.NMAP_XML.OUTPUT] nmap_vulscan = NmapVulscan(output=output) return nmap_vulscan
[docs] @staticmethod def merge_nmap_scan_results(scan_result_1: NmapScanResult, scan_result_2: NmapScanResult) -> NmapScanResult: """ Merges two nmap scan results :param scan_result_1: the first result to merge :param scan_result_2: the second result to merge :return: the merged result """ new_hosts = [] for h in scan_result_2.hosts: new_host = True for h2 in scan_result_1.hosts: if h.ips_match(h2.ips): new_host = False if new_host: new_hosts.append(h) for h in scan_result_1.hosts: for h2 in scan_result_2.hosts: if h.ips_match(h2.ips): h.hostnames = list(set(h.hostnames).union(set(h2.hostnames))) h.ports = list(set(h.ports).union(h2.ports)) h.vulnerabilities = list(set(h.vulnerabilities).union(h2.vulnerabilities)) h.credentials = list(set(h.credentials).union(h2.credentials)) if h.os is None: h.os = h2.os if h.trace is None: h.trace = h2.trace scan_result_1.hosts = scan_result_1.hosts + new_hosts scan_result_1.reachable = list(set(scan_result_1.reachable + scan_result_2.reachable)) return scan_result_1
[docs] @staticmethod def nmap_pivot_scan_action_helper(s: EmulationEnvState, a: EmulationAttackerAction, partial_result: NmapScanResult) -> EmulationEnvState: """ Performs an NMAP pivot scan, utilizing many compromised hosts :param s: the curretn state :param a: the attacker scan action :param partial_result: the initial result before pivoting :return: the new state """ merged_scan_result = partial_result total_results: List[NmapScanResult] = [] threads = [] for machine in s.attacker_obs_state.machines: if machine.logged_in and machine.tools_installed and machine.backdoor_installed: thread = PivotNMAPScanThread(machine=machine, s=s, a=a) thread.start() threads.append(thread) for thread in threads: thread.join() for i in range(len(threads)): if merged_scan_result is not None and threads[i].scan_result is not None: total_results.append(threads[i].scan_result) merged_scan_result = NmapUtil.merge_nmap_scan_results(scan_result_1=merged_scan_result, scan_result_2=threads[i].scan_result.copy()) elif merged_scan_result is None: total_results.append(threads[i].scan_result) merged_scan_result = threads[i].scan_result.copy() s_prime = NmapUtil.merge_nmap_scan_result_with_state(scan_result=merged_scan_result, s=s, a=a) new_machines_obs_1 = [] reachable = s.attacker_obs_state.agent_reachable reachable.add(s.emulation_env_config.containers_config.router_ip) for machine in s_prime.attacker_obs_state.machines: if machine.logged_in and machine.tools_installed and machine.backdoor_installed: reachable = reachable.union(machine.reachable) for machine in s_prime.attacker_obs_state.machines: if machine.logged_in and machine.tools_installed: machine = EnvDynamicsUtil.ssh_backdoor_tried_flags(a=a, m_obs=machine) if machine.ips_match(reachable) and (machine.ips_match(a.ips) or a.index == -1): machine = EnvDynamicsUtil.exploit_tried_flags(a=a, m_obs=machine) valid_ips = True for ip in machine.ips: if int(ip.split(".")[-1]) == 1: valid_ips = False if valid_ips: new_machines_obs_1.append(machine) s_prime.attacker_obs_state.machines = new_machines_obs_1 s_prime.attacker_obs_state.sort_machines() s_prime.defender_obs_state.sort_machines() return s_prime
[docs]class PivotNMAPScanThread(threading.Thread): """ Thread for asynchronous nmap scanning """ def __init__(self, machine: EmulationAttackerMachineObservationState, a: EmulationAttackerAction, s: EmulationEnvState): """ Initializes the thread :param machine: the machine to do the details on :param a: the scanning action :param s: the current environment state """ threading.Thread.__init__(self) self.machine = machine self.a = a self.s = s self.total_time = 0 self.scan_result: NmapScanResult = NmapScanResult(hosts=[], ips=[])
[docs] def run(self) -> None: """ Main loop of the thread; performs the scanning :return: None """ ssh_connections_alive = [] for c in self.machine.ssh_connections: try: EmulationUtil.execute_ssh_cmds(cmds=[constants.COMMANDS.LS], conn=c.conn) ssh_connections_alive.append(c) except Exception as e: Logger.__call__().get_logger().debug(f"There was an error connecting to {str(e)}, {repr(e)}") new_conn = ConnectionUtil.reconnect_ssh(c) ssh_connections_alive.append(new_conn) self.machine.ssh_connections = ssh_connections_alive ssh_connections_sorted_by_root = sorted( self.machine.ssh_connections, key=lambda x: (constants.SSH_BACKDOOR.BACKDOOR_PREFIX in x.credential.username, x.root, x.credential.username), reverse=True) for c in ssh_connections_sorted_by_root: cwd = "/home/" + c.credential.username + "/" cmds, file_names = self.a.nmap_cmds(machine_ips=self.machine.ips) results = [] c2 = c for i in range(constants.ENV_CONSTANTS.NUM_RETRIES): try: Logger.__call__().get_logger().info( f"Running NMAP scan on container(s): {self.machine.ips}, commands: " f"{','.join(cmds)}") results = EmulationUtil.execute_ssh_cmds(cmds=cmds, conn=c2.conn) break except Exception as e: Logger.__call__().get_logger().warning( f"exception execution commands for ip:{c.ip}, " f"username: {c.credential.username}, conn: {c.conn}, " f"transport: {c.conn.get_transport()}, active: {c.conn.get_transport().is_active()}," f"{str(e)}, {repr(e)}") c2 = ConnectionUtil.reconnect_ssh(c2) time.sleep(constants.ENV_CONSTANTS.SLEEP_RETRY) total_time = sum(list(map(lambda x: x[2], results))) EmulationUtil.log_measured_action_time( total_time=total_time, action=self.a, emulation_env_config=self.s.emulation_env_config) # Read result scan_result = NmapScanResult(hosts=[], ips=self.machine.ips) for file_name in file_names: for i in range(constants.ENV_CONSTANTS.NUM_RETRIES): try: xml_data = NmapUtil.parse_nmap_scan( file_name=file_name, emulation_env_config=self.s.emulation_env_config, conn=c.conn, dir=cwd) new_scan_result = NmapUtil.parse_nmap_scan_xml(xml_data, ips=self.machine.ips, action=self.a) scan_result = NmapUtil.merge_nmap_scan_results(scan_result_1=scan_result, scan_result_2=new_scan_result) self.machine.reachable.update(scan_result.reachable) except Exception as e: Logger.__call__().get_logger().warning( f"There was an exception parsing the file:{file_name} on ip:{c.ip}, error:{e}") time.sleep(constants.ENV_CONSTANTS.SLEEP_RETRY) self.scan_result = scan_result break