Source code for csle_attacker.emulation.util.nikto_util

import xml.etree.ElementTree as ET
from csle_common.dao.emulation_config.emulation_env_config import EmulationEnvConfig
from csle_common.dao.emulation_action.attacker.emulation_attacker_action import EmulationAttackerAction
from csle_common.dao.emulation_config.emulation_env_state import EmulationEnvState
from csle_common.util.env_dynamics_util import EnvDynamicsUtil
from csle_common.dao.emulation_observation.attacker.emulation_attacker_machine_observation_state \
    import EmulationAttackerMachineObservationState
import csle_common.constants.constants as constants
from csle_common.dao.emulation_action_result.nikto_scan_result \
    import NiktoScanResult
from csle_common.dao.emulation_action_result.nikto_vuln import NiktoVuln
from csle_common.util.emulation_util import EmulationUtil
from csle_attacker.emulation.util.nmap_util import NmapUtil
from csle_common.logging.log import Logger


class NiktoUtil:
    """
    Class containing utility functions for the Nikto-related functionality of the emulation
    """

    @staticmethod
    def parse_nikto_scan(file_name: str, emulation_env_config: EmulationEnvConfig) -> ET.Element:
        """
        Parses an XML file containing the result of a Nikto scan

        The file is read over an SFTP channel opened on the hacker connection of the emulation.

        :param file_name: name of the file to parse (appended to constants.NMAP.RESULTS_DIR)
        :param emulation_env_config: environment config
        :return: the root element of the parsed xml file
        """
        sftp_client = emulation_env_config.get_hacker_connection().open_sftp()
        try:
            remote_file = sftp_client.open(constants.NMAP.RESULTS_DIR + file_name)
            try:
                xml_tree = ET.parse(remote_file)
            finally:
                remote_file.close()
        finally:
            # A fresh SFTP channel is opened on every call, so it must be released here as well
            sftp_client.close()
        return xml_tree.getroot()

    @staticmethod
    def nikto_scan_action_helper(s: EmulationEnvState, a: EmulationAttackerAction) \
            -> EmulationEnvState:
        """
        Helper function for executing a Nikto web scan action on the emulation

        Executes the Nikto commands of the action over SSH, logs the measured execution time and then
        parses each result file and merges it into the state. Parsing a file is attempted up to
        constants.ENV_CONSTANTS.NUM_RETRIES times; every failed attempt is logged as a warning and a file
        that never parses is skipped.

        :param s: the current env state
        :param a: the Nikto action to execute
        :return: s', the env state updated with the scan results
        """
        cmds, file_names = a.nikto_cmds()
        _, _, total_time = EmulationUtil.execute_ssh_cmds(
            cmds=cmds, conn=s.emulation_env_config.get_hacker_connection())
        EmulationUtil.log_measured_action_time(total_time=total_time, action=a,
                                               emulation_env_config=s.emulation_env_config)

        # Read result
        for file_name in file_names:
            for _ in range(constants.ENV_CONSTANTS.NUM_RETRIES):
                try:
                    xml_data = NiktoUtil.parse_nikto_scan(
                        file_name=file_name, emulation_env_config=s.emulation_env_config)
                    scan_result = NiktoUtil.parse_nikto_scan_xml(xml_data)
                    s = NiktoUtil.merge_nikto_scan_result_with_state(
                        scan_result=scan_result, s=s, a=a)
                    break
                except Exception as e:
                    # Best effort: the failure is logged and the parse is retried
                    Logger.__call__().get_logger().warning(
                        f"There was an error parsing the Nikto scan output: {e}, {repr(e)}")
        return s

    @staticmethod
    def merge_nikto_scan_result_with_state(scan_result: NiktoScanResult, s: EmulationEnvState,
                                           a: EmulationAttackerAction) -> EmulationEnvState:
        """
        Merges a Nikto scan result with an existing observation state

        :param scan_result: the scan result
        :param s: the current state
        :param a: the action just executed
        :return: s', the state with the updated attacker machine observations
        :raises ValueError: if no observed machine has the IP of the scan result
        """
        scanned_ip = scan_result.ip
        m_obs = None
        for m in s.attacker_obs_state.machines:
            machine_ips = m.ips
            # A machine holds a collection of ips whereas the scan result holds a single ip,
            # so membership has to be tested; plain equality is kept for backward compatibility
            if machine_ips == scanned_ip or (isinstance(machine_ips, (list, tuple, set, frozenset))
                                             and scanned_ip in machine_ips):
                m_obs = EmulationAttackerMachineObservationState(ips=machine_ips)
                break
        if m_obs is None:
            raise ValueError(f"Unknown IP: {scan_result.ip}")

        for vuln in scan_result.vulnerabilities:
            m_obs.osvdb_vulns.append(vuln.to_obs())
        attacker_machine_observations = EnvDynamicsUtil.merge_new_obs_with_old(
            s.attacker_obs_state.machines, [m_obs], emulation_env_config=s.emulation_env_config, action=a)
        # The state is updated in place and returned
        s.attacker_obs_state.machines = attacker_machine_observations
        return s

    @staticmethod
    def parse_nikto_scan_xml(xml_data) -> NiktoScanResult:
        """
        Parses an XML Tree with Nikto Scan Result into a Nikto Scan DTO

        The direct children of the given element are inspected: a scandetails child is parsed as the
        scan result, an item child means that the given element itself holds the scan details, and a
        niktoscan child is searched recursively. If several children match, the last one wins.

        :param xml_data: the xml tree of Nikto Scan Result to parse
        :return: parsed nikto scan result, or None if no scan details were found
        """
        result = None
        own_details = None  # xml_data parsed as scan details, computed at most once
        for child in xml_data:
            if child.tag == constants.NIKTO_XML.SCANDETAILS:
                result = NiktoUtil._parse_nikto_scandetails(child)
            elif child.tag == constants.NIKTO_XML.ITEM:
                if own_details is None:
                    own_details = NiktoUtil._parse_nikto_scandetails(xml_data)
                result = own_details
            elif child.tag == constants.NIKTO_XML.NIKTOSCAN:
                # Descend into the nested element; recursing on xml_data itself would never terminate
                nested_result = NiktoUtil.parse_nikto_scan_xml(child)
                if nested_result is not None:
                    result = nested_result
        return result

    @staticmethod
    def _parse_nikto_scandetails(xml_data) -> NiktoScanResult:
        """
        Parses a scandetails element in the XML tree of a Nikto scan

        :param xml_data: the element holding the target attributes and the item elements
        :return: parsed nikto scan result
        """
        attributes = xml_data.attrib
        # NOTE(review): the port is kept as the raw attribute string -- confirm whether consumers expect an int
        targetport = attributes.get(constants.NIKTO_XML.TARGETPORT, "")
        target_ip = attributes.get(constants.NIKTO_XML.TARGETIP, "")
        # The item elements are handed to the item parser directly, since it reads
        # the attributes and children of the element
        vulnerabilities = [NiktoUtil._parse_nikto_item(item)
                           for item in xml_data.iter(constants.NIKTO_XML.ITEM)]
        return NiktoScanResult(vulnerabilities=vulnerabilities, port=targetport, ip=target_ip,
                               sitename=target_ip)

    @staticmethod
    def _parse_nikto_item(xml_data) -> NiktoVuln:
        """
        Parses an item in the XML tree of a Nikto scan

        :param xml_data: the item element
        :return: parsed nikto vuln
        """
        item_id = ""
        osvdb_id = None
        method = ""
        iplink = ""
        namelink = ""
        uri = ""
        description = ""
        attributes = xml_data.attrib
        if constants.NIKTO_XML.METHOD in attributes:
            method = attributes[constants.NIKTO_XML.METHOD]
        if constants.NIKTO_XML.OSVDB_ID in attributes:
            osvdb_id = int(attributes[constants.NIKTO_XML.OSVDB_ID])
        if constants.NIKTO_XML.ITEM_ID in attributes:
            item_id = str(int(attributes[constants.NIKTO_XML.ITEM_ID]))
        for child in xml_data.iter():
            # The text of an empty element is None; fall back to the empty-string default
            text = child.text or ""
            if child.tag == constants.NIKTO_XML.DESCR:
                description = text
            elif child.tag == constants.NIKTO_XML.URI:
                uri = text
            elif child.tag == constants.NIKTO_XML.NAMELINK:
                namelink = text
            elif child.tag == constants.NIKTO_XML.IPLINK:
                iplink = text
        return NiktoVuln(id=item_id, osvdb_id=osvdb_id, method=method, iplink=iplink, namelink=namelink,
                         uri=uri, description=description)