from typing import Optional, List, Dict, Any
from csle_common.dao.emulation_config.transport_protocol import TransportProtocol
from csle_common.dao.emulation_config.node_vulnerability_config import NodeVulnerabilityConfig
from csle_common.dao.emulation_config.credential import Credential
from csle_common.dao.emulation_config.network_service import NetworkService
from csle_common.dao.emulation_config.vulnerability_type import VulnType
from csle_base.json_serializable import JSONSerializable
[docs]class EmulationVulnerabilityObservationState(JSONSerializable):
"""
A DTO representing an observed vulnerability in the emulation
"""
def __init__(self, name: str, port: int,
protocol: TransportProtocol, cvss: float, credentials: List[Credential],
osvdbid: int = -1,
description: Optional[str] = None,
service: Optional[str] = ""):
"""
Initializes the DTO
:param name: the name of the vulnerability
:param port: the port of the vulnerability
:param protocol: the protocol of the vulnerability
:param cvss: the CVSS of the vulnerability
:param osvdbid: the OSVDBID of the vulnerability
:param description: the description of the vulnerability
:param credentials: the credentials of the vulnerability
:param service: the service of the vulnerability
"""
self.name = name
self.port = port
self.protocol = protocol
self.cvss = cvss
self.osvdb_id = osvdbid
self.description = description
self.credentials = credentials
self.service = service
if service is None or service == "":
for cr in self.credentials:
if cr.service is not None and cr.service != "":
self.service = cr.service
[docs] def to_dict(self) -> Dict[str, Any]:
"""
Converts the object to a dict representation
:return: a dict representation of the object
"""
d: Dict[str, Any] = {}
d["name"] = self.name
d["protocol"] = self.protocol
d["cvss"] = self.cvss
d["osvdb_id"] = self.osvdb_id
d["description"] = self.description
d["credentials"] = list(map(lambda x: x.to_dict(), self.credentials))
d["service"] = self.service
d["port"] = self.port
return d
[docs] @staticmethod
def from_dict(d: Dict[str, Any]) -> "EmulationVulnerabilityObservationState":
"""
Converts a dict representation of the object into an instance
:param d: the dict to convert
:return: the created instance
"""
obj = EmulationVulnerabilityObservationState(
name=d["name"],
protocol=d["protocol"],
cvss=d["cvss"],
osvdbid=d["osvdb_id"],
description=d["description"],
credentials=list(map(lambda x: Credential.from_dict(x), d["credentials"])),
service=d["service"],
port=d["port"]
)
return obj
def __str__(self):
"""
:return: a string representation of the observed vulnerability
"""
return "name:{}, port:{}, protocol:{}, cvss:{}, osvdb_id:{}, desc:{}, service:{}, credentials:{}".format(
self.name, self.port, self.protocol, self.cvss, self.osvdb_id, self.description, self.service,
list(map(lambda x: str(x), self.credentials)) if self.credentials is not None else self.credentials)
[docs] def to_vulnerability(self) -> NodeVulnerabilityConfig:
"""
Converts the object into a vulnerability representation
:return: A vulnerability representation of the object
"""
vuln = NodeVulnerabilityConfig(name=self.name, port=self.port, protocol=self.protocol, cvss=self.cvss,
cve="", credentials=self.credentials, service=self.service, ip="",
vuln_type=VulnType.WEAK_PW)
return vuln
[docs] def to_network_services(self) -> List[NetworkService]:
"""
Converts the object into a network service representation
:return: the network service representation
"""
services = [NetworkService(protocol=self.protocol if self.protocol is not None else "",
port=self.port, name=self.service if self.service is not None else "",
credentials=self.credentials)]
for cr in self.credentials:
new_service = True
for s in services:
if s.name == cr.service:
new_service = False
if new_service:
services.append(NetworkService.from_credential(cr))
return services
[docs] @staticmethod
def from_json_file(json_file_path: str) -> "EmulationVulnerabilityObservationState":
"""
Reads a json file and converts it to a DTO
:param json_file_path: the json file path
:return: the converted DTO
"""
import io
import json
with io.open(json_file_path, 'r') as f:
json_str = f.read()
return EmulationVulnerabilityObservationState.from_dict(json.loads(json_str))
[docs] def num_attributes(self) -> int:
"""
:return: The number of attribute of the DTO
"""
if len(self.credentials) > 0:
return 7 + len(self.credentials) * self.credentials[0].num_attributes()
else:
return 0
[docs] @staticmethod
def schema() -> "EmulationVulnerabilityObservationState":
"""
:return: get the schema of the DTO
"""
return EmulationVulnerabilityObservationState(name="", port=-1, protocol=TransportProtocol.TCP, cvss=0.5,
osvdbid=-1, description="", credentials=[Credential.schema()],
service="")