csle_rest_api.resources.recovery_ai package

Subpackages

Submodules

csle_rest_api.resources.recovery_ai.prompts module

class csle_rest_api.resources.recovery_ai.prompts.Prompts

Bases: object

Class with string constants related to LLM-prompting

ACTION = 'Action'
ACTION_PROMPT_TEMPLATE = "Below is a system description, a sequence of network logs (e.g., from an intrusion detection system), a description of a cybersecurity incident, the current state of the recovery from the incident, a list of previously executed recovery actions, and an instruction that describes a task.\nWrite a response that appropriately completes the request.\nBefore generating the response, think carefully about the system, the logs, and the instruction, then create a step-by-step chain of thoughts to ensure a logical and accurate response.\n\n### System:\n{}\n\n### Logs:\n{}\n\n### Incident:\n{}\n\n### State:\n{}\nThe meaning of the state fields are as follows.\nis_attack_contained: Has the immediate threat been stopped from spreading?\nis_knowledge_sufficient: Have we gathered enough data to effectively contain and eradicate the attack?\nare_forensics_preserved: Has evidence been captured and stored in a forensically sound manner?\nis_eradicated: Is the adversary completely removed from the system?\nis_hardened: Has the root cause of the attack been remediated? i.e., are future attacks of the same type prevented?\nis_recovered: Are primary services restored for users?\n\n### Previous recovery actions:\n{}\n\n### Instruction:\nYou are a security operator with advanced knowledge in cybersecurity and IT systems. You have been given information about a security incident and should generate the next suitable action for recovering the system from the incident. Your suggested action should be based on the logs, the system description only, the current state, and the previous recovery actions.\nMake sure that the suggested recovery action is consistent with the system description and the logs and that you do not repeat any action that has already been performed.\nThe goal when selecting the recovery action is to change the state so that one of the state-properties that is currently 'false' becomes 'true'. The ideal recovery action sequence is: 1. 
contain the attack 2. gather information 3. preserve evidence 4. eradicate the attacker 5. harden the system 6. recover operational services.\nWhen selecting the recovery action, make sure that it is concrete and actionable and minimizes unnecessary service disruptions. Vague or unnecessary actions will not change the state and should be avoided.\nReturn a JSON object with two properties: 'Action' and 'Explanation', both of which should be strings.\nThe property 'Action' should be a string that concisely describes the concrete recovery action.\nThe property 'Explanation' should be a string that concisely explains why you selected the recovery action and motivates why the action is needed.\n\n### Response:\n<think>"
ARE_FORENSICS_PRESERVED = 'are_forensics_preserved'
ATTACKER = 'Attacker'
ENTITIES = 'Entities'
EXPLANATION = 'Explanation'
GEMINI_ACTION_EVAL = 'Below is a system description, logs, an incident description, a recovery state, and a recovery action, and an instruction. Complete the instruction.\n\n### System:\n{}\n\n### Logs:\n{}\n\n### Incident:\n{}\n\n### State:\n{}\nThe meaning of the state fields are as follows.\nis_attack_contained: Has the immediate threat been stopped from spreading?\nis_knowledge_sufficient: Have we gathered enough data to effectively contain and eradicate the attack?\nare_forensics_preserved: Has evidence been captured and stored in a forensically sound manner?\nis_eradicated: Is the adversary completely removed from the system?\nis_hardened: Has the root cause of the attack been remediated? i.e., are future attacks of the same type prevented?\nis_recovered: Are primary services restored for users?\n\n### Recovery action:\n{}\n\n### Instruction:\nGive a score between 0 and 1 that quantifies how good the recovery action is. These are the goals of the recovery action:\n- It should be concrete and actionable, i.e., it should clearly explain what the security operator should do, ideally with details like specific IP addresses/hostnames/configurations/vulnerabilities.\n- It should avoid unnecessary recovery measures (e.g., shutting down services that are not affected by the incident or blocking unnecessary IPs).\n- It should cause the recovery state to change so that at least one state-property that is currently False becomes True.\nReturn just the score between 0 and 1 that indicates how well the action meets the above criteria.\n\n### Response: '
GEMINI_INCIDENT_EVAL = "Below is a system description, logs, a suggested incident classification, and an instruction. Complete the instruction.\n\n### System:\n{}\n\n### Logs:\n{}\n\n### Incident classification:\n{}\n\n### Instruction:\nGive a score between 0 and 1 that quantifies how good the incident classification is based on the logs and the system description. These are the goals of the incident classification: \n- The logs/system description should be classified as an incident only if some recovery action is needed, if it is not severe enough to warrant any action at all, then it should be classified as 'No incident'.\n- If all of the logs are just indicate of normal system activity (e.g., false positives) or the logs are indicating alerts with low severity, then it should *not* be classified as an incident. \n- If the logs contain some false alerts or alerts that are indicative of normal operation but contains at least 1 severe alert that is unusual, then it should be classified as an incident based on the severe alerts. \n- The incident report should focus on the important (severe) alerts. \nReturn just the score between 0 and 1 that indicates how well the incident classification meets the above criteria.\n\n### Response: "
INCIDENT = 'Incident'
INCIDENT_DESCRIPTION = 'Incident description'
INCIDENT_PROMPT_TEMPLATE = "Below is a system description, a sequence of network logs (e.g., from an intrusion detection system), and an instruction that describes a task.\nWrite a response that appropriately completes the request.\nBefore generating the response, think carefully about the system, the logs, and the instruction, then create a step-by-step chain of thoughts to ensure a logical and accurate response.\n\n### System:\n{}\n\n### Logs:\n{}\n\n### Instruction:\nYou are a security operator with advanced knowledge in cybersecurity and IT systems.\nYou have been given information about a system and some logs generated by it, e.g., security alerts.\nYour task is to determine if the logs indicate a cyber incident (i.e., attack) that requires recovery actions.\nIf the logs are just indicative of normal system activity or if they are unrelated to security, then you should classify the logs/system as not being an incident that requires recovery.\nSimilarly, if the logs contain very minor security alerts that do not warrant any recovery action, then you should classify the logs/system as not being an incident that requires recovery.\nIf there is an incident that requires action, you should concisely describe the incident and explain why it is an incident, i.e., you should indicate which parts of the logs or system description indicate an incident that requires immediate action.\nIt is important that any conclusions you make in the incident description are supported by the logs/system description, don't make guesses.\nYou should also associate the incident with tactics and techniques from the MITRE ATT&CK taxonomy. 
You should also identify entities involved in the incident.\nReturn a JSON object with five fields: 'Incident', 'Incident description', 'MITRE ATT&CK Tactics', 'MITRE ATT&CK Techniques', and 'Entities'.\n'Incident' should be a string that is either 'Yes' or 'No'.\n'Incident description' should be a string with a concise summary of the incident and explanation of why the logs/system description indicate that there is a incident.\n'MITRE ATT&CK Tactics' should be an array of strings, each of which corresponds to one tactic used by the attacker in the incident.\n'MITRE ATT&CK Techniques' should be an array of strings, each of which corresponds to one technique used by the attacker in the incident.\n'Entities' should be a JSON object with three properties: 'Attacker', 'System', and 'Targeted', where 'Attacker' should be an array of strings, each of which is either an IP or a hostname that is related to the attacker/adversary, 'System' should be an array of strings, each of which is either an IP or a hostname that corresponds to some component in the system, and 'Targeted' should be an array of strings, each of which is either an IP or a hostname that corresponds to some component in the system that is under attack.\nIf the 'Incident' field is set to 'No', then 'Incident description' should be 'No incident can be inferred from the logs because they contain no substantial information.', 'MITRE ATT&CK Tactics' should be an empty array, 'MITRE ATT&CK Techniques' should be an empty array, and 'Entities' should be an empty JSON object.\nReturn only the JSON with the above five fields, nothing else.\n\n### Response:\n<think>"
INCIDENT_YES = 'Yes'
IS_ATTACK_CONTAINED = 'is_attack_contained'
IS_ERADICATED = 'is_eradicated'
IS_HARDENED = 'is_hardened'
IS_KNOWLEDGE_SUFFICIENT = 'is_knowledge_sufficient'
IS_RECOVERED = 'is_recovered'
MITRE_ATTACK_TACTICS = 'MITRE ATT&CK Tactics'
MITRE_ATTACK_TECHNIQUES = 'MITRE ATT&CK Techniques'
RAG_PROMPT_TEMPLATE = "Below is a sequenc of logs and an instruction. Complete the instruction.\n\n### Logs:\n{}\n\n### Instruction:\nExtract a list of threat identifiers, e.g., cve-identifiers, cwe-identifiers, identifiers for alerts (e.g., snort SIDs), or simply names of known vulnerabilities that occur in the logs. It can be any identifier that you think it is possible to use to fetch more relevant information about a potential incident.\nFor each of the identifiers that you extract, fetch brief and concise information/context about what it means.\nReturn a JSON object with two properties:\n'Identifiers', which is a list of strings with the identifiers.\n'Context', which is a list of strings with the context/description of each identifier.\nThese two lists should have the same length. Return only the valid JSON, nothing else.\nIt is important that each string in the 'Identifiers' list appears verbatim in the logs. Moreover, the context about each identifier should be maximum 2 sentences.\n\n### Response: "
STATE_PROMPT_TEMPLATE = "Below is a system description, a sequence of network logs (e.g., from an intrusion detection system), a description of a cybersecurity incident, the current state of the recovery from the incident, a proposed recovery action, and an instruction that describes a task.\nWrite a response that appropriately completes the request.\nBefore generating the response, think carefully about the system, the logs, and the instruction, then create a step-by-step chain of thoughts to ensure a logical and accurate response.\n\n### System:\n{}\n\n### Logs:\n{}\n\n### Incident:\n{}\n\n### State:\n{}\nThe meaning of the state fields are as follows.\nis_attack_contained: Has the immediate threat been stopped from spreading?\nis_knowledge_sufficient: Have we gathered enough data to effectively contain and eradicate the attack?\nare_forensics_preserved: Has evidence been captured and stored in a forensically sound manner?\nis_eradicated: Is the adversary completely removed from the system?\nis_hardened: Has the root cause of the attack been remediated? 
i.e., are future attacks of the same type prevented?\nis_recovered: Are primary services restored for users?\n\n### Recovery action:\n{}\n\n### Instruction:\nYou are a security operator with advanced knowledge in cybersecurity and IT systems.\nYou have been given information about a security incident, the state of recovery from the incident, and a recovery action.\nYour task is to predict what the next state of the recovery will be after applying the recovery action.\nFor example, if the given recovery action effectively contains the attack and 'is_attack_contained' is 'false' in the current state, then the next state should have 'is_attack_contained' set to 'true'.\nSimilarly, if 'is_recovered' is 'false' in the current state and the given recovery action effectively recovers operational services of the system, then the next state should have 'is_recovered' set to 'true', etc.\nIt is also possible that multiple state properties change values from false to true. It is also possible that the state remains the same, i.e., no property changes.\nIt is important that the state only changes if the action is effective in achieving one of the recovery goals: containment, information gathering, preserving evidence, eradication, hardening, or recovery.\nA state variable can only change from 'false' to 'true', it cannot be changed from 'true' to 'false'.\nReturn a JSON object that defines the next state and contains the Boolean fields 'is_attack_contained', 'is_knowledge_sufficient', 'are_forensics_preserved', 'is_eradicated', 'is_hardened', 'is_recovered'.\n\n### Response:\n<think>"
SYSTEM = 'System'
TARGETED = 'Targeted'
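
The templates above use positional `{}` placeholders, which suggests they are filled with `str.format`. The following is a minimal sketch under that assumption. `SHORT_TEMPLATE` is a hypothetical, abbreviated stand-in for `RAG_PROMPT_TEMPLATE`, and `fill_template` is an illustrative helper that is not part of the package.

```python
# Hypothetical, shortened stand-in for Prompts.RAG_PROMPT_TEMPLATE.
SHORT_TEMPLATE = (
    "Below is a sequence of logs and an instruction. Complete the instruction.\n\n"
    "### Logs:\n{}\n\n"
    "### Instruction:\nExtract a list of threat identifiers.\n\n"
    "### Response: "
)


def fill_template(template: str, *values: str) -> str:
    """Fill the positional '{}' placeholders, checking that the count matches."""
    expected = template.count("{}")
    if expected != len(values):
        raise ValueError(f"template expects {expected} values, got {len(values)}")
    return template.format(*values)


prompt = fill_template(SHORT_TEMPLATE, "ET EXPLOIT CVE-2021-44228 attempt")
print(prompt)
```

Checking the placeholder count up front turns a silently truncated prompt into an explicit error.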

csle_rest_api.resources.recovery_ai.recovery_ai_util module

class csle_rest_api.resources.recovery_ai.recovery_ai_util.RecoveryAIUtil

Bases: object

static action_selection(action_input: str, model: Union[PreTrainedModel, PeftModel], tokenizer: PreTrainedTokenizer, state: Dict[str, bool], state_prompt_template: str, system: str, logs: str, incident_str: str, num_optimization_steps: int = 3, temperature: float = 1, lookahead_horizon: int = 1, rollout_horizon: int = 1) -> Generator[str, Any, Union[None, Dict[str, str]]]

Lookahead optimization to select the next action to perform.

Parameters
  • action_input – the action prompt

  • model – the LLM

  • tokenizer – the tokenizer of the LLM

  • state – the current state

  • state_prompt_template – the prompt template for the state

  • system – the system description

  • logs – the logs

  • incident_str – description of the incident

  • num_optimization_steps – the number of actions to evaluate before selecting the next action

  • temperature – the temperature for action generation when num_optimization_steps > 1

  • lookahead_horizon – the lookahead horizon for the action selection

  • rollout_horizon – the rollout horizon for the action selection

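Returns

a generator that yields strings and, per the signature, returns either None or a dictionary of strings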
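
One way to read "lookahead optimization" with `num_optimization_steps` candidates is best-of-N selection: sample several candidate actions, predict the state each one leads to, and keep the candidate whose predicted state is best. The sketch below illustrates only that idea. The callables `propose` and `predict_next_state` and the scoring rule (number of true state flags) are hypothetical stand-ins for the LLM calls, and the actual implementation may differ.

```python
from typing import Callable, Dict, Tuple

State = Dict[str, bool]


def select_action(
    state: State,
    propose: Callable[[State], str],
    predict_next_state: Callable[[State, str], State],
    num_candidates: int = 3,
) -> Tuple[str, State]:
    """Best-of-N selection: keep the candidate whose predicted state has the most True flags."""
    best_score, best_action, best_state = -1, "", state
    for _ in range(num_candidates):
        action = propose(state)
        next_state = predict_next_state(state, action)
        score = sum(next_state.values())  # hypothetical scoring rule
        if score > best_score:
            best_score, best_action, best_state = score, action, next_state
    return best_action, best_state


# Deterministic stubs in place of the LLM (assumptions for illustration only).
_candidates = iter([
    "Monitor the situation",
    "Isolate host 10.0.0.5 from the network",
    "Reboot everything",
])


def propose(_state: State) -> str:
    return next(_candidates)


def predict_next_state(state: State, action: str) -> State:
    next_state = dict(state)
    if action.startswith("Isolate"):
        next_state["is_attack_contained"] = True
    return next_state


initial = {"is_attack_contained": False, "is_recovered": False}
chosen_action, predicted_state = select_action(initial, propose, predict_next_state)
print(chosen_action)
```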

static create_incident_prompt(system: str, logs: str, template: str) -> str

Utility function for creating the incident prompt from the template, given a system description and a logs description.

Parameters
  • system – the system description

  • logs – the logs description

  • template – the prompt template

Returns

the prompt

static enrich_logs(logs: str, identifiers: List[str], contexts: List[str]) -> str

Enriches the logs with context about threat identifiers

Parameters
  • logs – the logs to enrich

  • identifiers – the identifiers to provide contexts for

  • contexts – the contexts for the identifiers

Returns

the enriched logs
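
A minimal sketch of what such enrichment could look like, assuming the context is appended to the logs as one line per identifier. The output layout and the name `enrich_logs_sketch` are assumptions for illustration, not the format used by `enrich_logs`.

```python
from typing import List


def enrich_logs_sketch(logs: str, identifiers: List[str], contexts: List[str]) -> str:
    """Append one '- identifier: context' line per identifier to the logs."""
    if len(identifiers) != len(contexts):
        raise ValueError("identifiers and contexts must have the same length")
    if not identifiers:
        return logs  # nothing to add
    lines = [f"- {identifier}: {context}" for identifier, context in zip(identifiers, contexts)]
    return logs + "\n\nContext for threat identifiers:\n" + "\n".join(lines)


enriched = enrich_logs_sketch(
    "alert: CVE-2021-44228 exploit attempt from 10.0.0.9",
    ["CVE-2021-44228"],
    ["Remote code execution vulnerability in Apache Log4j."],
)
print(enriched)
```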

static format_incident_description(incident: Dict[str, Any]) -> str

Utility function for formatting an incident report as a string suitable as a prompt.

Parameters

incident – the incident report

Returns

the formatted string

static gemini_rag(logs: str) -> Generator[str, Any, Union[None, Tuple[list[Any], list[Any]], Tuple[Any, Any]]]

Retrieves information about threat identifiers in the logs/system description.

Parameters

logs – the logs

Returns

a list of threat identifiers and a list of contexts for those identifiers
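
`RAG_PROMPT_TEMPLATE` asks the model for a JSON object with two equal-length lists, 'Identifiers' and 'Context', where each identifier appears verbatim in the logs. The sketch below shows one way a response could be checked against those constraints. `parse_rag_response` is a hypothetical helper, and how `gemini_rag` actually post-processes the response is not documented here.

```python
import json
from typing import List, Tuple


def parse_rag_response(response: str, logs: str) -> Tuple[List[str], List[str]]:
    """Parse the JSON answer and keep only identifiers that occur verbatim in the logs."""
    try:
        data = json.loads(response)
    except json.JSONDecodeError:
        return [], []
    identifiers = data.get("Identifiers", [])
    contexts = data.get("Context", [])
    if len(identifiers) != len(contexts):
        return [], []  # the prompt requires lists of equal length
    kept = [(i, c) for i, c in zip(identifiers, contexts) if i in logs]
    return [i for i, _ in kept], [c for _, c in kept]


logs = "snort: [1:2034647:1] ET EXPLOIT CVE-2021-44228 attempt"
response = json.dumps({
    "Identifiers": ["CVE-2021-44228", "CVE-1999-0001"],
    "Context": ["Log4j remote code execution.", "Not present in the logs."],
})
identifiers, contexts = parse_rag_response(response, logs)
print(identifiers, contexts)
```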

static generate_output(model: Union[PreTrainedModel, PeftModel], tokenizer: PreTrainedTokenizer, prompt: str, no_think: bool = False, temperature: float = 0.6) -> Generator[str, Any, Union[List[str], Tuple[str, str]]]

Uses a given LLM, tokenizer, and prompt to generate a stream of outputs

Parameters
  • model – the pre-trained LLM

  • tokenizer – the tokenizer

  • prompt – the prompt

  • no_think – boolean flag indicating whether to skip the <think> token.

  • temperature – parameter that controls the stochasticity of the output (higher means more stochastic)

Returns

the reasoning string and the answer string by the LLM.
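
The return annotation `Generator[str, Any, ...]` means that the function yields strings while it runs and hands back its final result as the generator's return value. The sketch below shows how such a generator can be consumed in plain Python. `fake_generate_output` is a hypothetical stand-in that needs no model, and the tokens it yields are made up.

```python
from typing import Any, Generator, List, Tuple


def fake_generate_output() -> Generator[str, Any, Tuple[str, str]]:
    """Hypothetical stand-in: streams tokens, then returns (reasoning, answer)."""
    for token in ["<think>", "port scan", "</think>", '{"Incident": "Yes"}']:
        yield token
    return "port scan", '{"Incident": "Yes"}'


def consume(gen: Generator[str, Any, Any]) -> Tuple[List[str], Any]:
    """Collect the yielded strings and capture the generator's return value."""
    streamed: List[str] = []
    while True:
        try:
            streamed.append(next(gen))
        except StopIteration as stop:
            # The return value travels on the StopIteration exception.
            return streamed, stop.value


streamed, result = consume(fake_generate_output())
print(streamed)
print(result)
```

Inside another generator, `result = yield from fake_generate_output()` forwards the stream and captures the same return value.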

static generate_previous_actions_str(previous_actions: List[Dict[str, str]]) -> str

Utility function for formatting the list of previous recovery actions into a string suitable for a prompt.

Parameters

previous_actions – the list of previous recovery actions

Returns

the formatted string
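
A minimal sketch of such formatting, assuming each previous action is a dictionary with the 'Action' and 'Explanation' keys that `ACTION_PROMPT_TEMPLATE` requests. The numbered layout and the "None" placeholder for an empty list are assumptions, not the output of `generate_previous_actions_str`.

```python
from typing import Dict, List


def previous_actions_str_sketch(previous_actions: List[Dict[str, str]]) -> str:
    """Render the actions as a numbered list, one action per line."""
    if not previous_actions:
        return "None"  # hypothetical placeholder for an empty history
    return "\n".join(
        f"{index}. {action['Action']} ({action['Explanation']})"
        for index, action in enumerate(previous_actions, start=1)
    )


history = [
    {"Action": "Isolate host 10.0.0.5", "Explanation": "Stops lateral movement."},
    {"Action": "Capture a memory image of 10.0.0.5", "Explanation": "Preserves evidence."},
]
formatted = previous_actions_str_sketch(history)
print(formatted)
```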

static incident_classification(incident_input: str, model: Union[PreTrainedModel, PeftModel], tokenizer: PreTrainedTokenizer, system: str, logs: str, num_optimization_steps: int = 3, temperature: float = 1, lookahead_horizon: int = 1, rollout_horizon: int = 1) -> Generator[str, Any, Union[None, Dict[str, str]]]

Lookahead optimization to classify the incident

Parameters
  • incident_input – the incident prompt

  • model – the LLM

  • tokenizer – the tokenizer of the LLM

  • system – the system description

  • logs – the logs

  • num_optimization_steps – the number of incident classifications to evaluate before selecting one

  • temperature – the temperature for incident classification generation when num_optimization_steps > 1

  • lookahead_horizon – the lookahead horizon for the incident classification selection

  • rollout_horizon – the rollout horizon for the incident classification selection

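Returns

a generator that yields strings and, per the signature, returns either None or a dictionary of strings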

static is_state_terminal(state: Dict[str, bool]) -> bool

Utility function that checks whether a given recovery state is terminal or not.

Parameters

state – the state to check

Returns

True if it is terminal, False otherwise
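
The documentation does not say what makes a state terminal. A natural reading, given the six state fields named in the prompt templates, is that recovery is finished once every field is true. The sketch below implements that reading. It is an assumption, and `is_terminal_sketch` is a hypothetical name.

```python
from typing import Dict

# The six state fields named in the prompt templates.
STATE_FIELDS = (
    "is_attack_contained",
    "is_knowledge_sufficient",
    "are_forensics_preserved",
    "is_eradicated",
    "is_hardened",
    "is_recovered",
)


def is_terminal_sketch(state: Dict[str, bool]) -> bool:
    """Assumed rule: terminal when all six fields are True (a missing field counts as False)."""
    return all(state.get(field, False) for field in STATE_FIELDS)


done = dict.fromkeys(STATE_FIELDS, True)
in_progress = {**done, "is_recovered": False}
print(is_terminal_sketch(done), is_terminal_sketch(in_progress))
```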

static load_llm() -> Tuple[PreTrainedTokenizer, PreTrainedModel]

Utility function for loading the pre-trained LLM from disk.

Returns

the loaded LLM as well as the corresponding tokenizer.

static otx_rag(logs: str) -> Generator[str, Any, str]

Utility function for retrieval-augmented generation

Parameters

logs – the logs to enrich with retrieved information

Returns

the enriched logs

static recovery_loop(model: Union[PreTrainedModel, PeftModel], tokenizer: PreTrainedTokenizer, system: str, logs: str, incident_str: str, action_prompt_template: str, state_prompt_template: str, num_optimization_steps: int = 3, temperature: float = 1, lookahead_horizon: int = 1, rollout_horizon: int = 1) -> Generator[str, None, None]

Loop that generates the recovery plan from a given incident report.

Parameters
  • model – the LLM

  • tokenizer – the tokenizer of the LLM

  • system – the system description

  • logs – the logs description

  • incident_str – the incident report

  • action_prompt_template – the prompt template for generating actions

  • state_prompt_template – the prompt template for generating states

  • num_optimization_steps – the number of actions to evaluate before selecting one

  • temperature – the temperature for action generation when num_optimization_steps > 1

  • lookahead_horizon – the lookahead horizon for the action selection

  • rollout_horizon – the rollout horizon for the action selection

Returns

None
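
The control flow of such a loop can be sketched without a model: pick an action, predict the next state, and repeat until every state flag is true. The sketch also enforces the rule from `STATE_PROMPT_TEMPLATE` that a flag can only change from false to true. `choose_action`, `predict_state`, and the `max_steps` guard are hypothetical stand-ins, and the real loop may be structured differently.

```python
from typing import Callable, Dict, Iterator

State = Dict[str, bool]


def merge_monotonic(current: State, predicted: State) -> State:
    """Apply a predicted state without letting a True flag revert to False."""
    return {key: value or bool(predicted.get(key, False)) for key, value in current.items()}


def recovery_loop_sketch(
    state: State,
    choose_action: Callable[[State], str],
    predict_state: Callable[[State, str], State],
    max_steps: int = 10,  # hypothetical guard against endless loops
) -> Iterator[str]:
    """Yield one line per recovery step until all flags are True."""
    for step in range(1, max_steps + 1):
        if all(state.values()):
            break
        action = choose_action(state)
        state = merge_monotonic(state, predict_state(state, action))
        yield f"Step {step}: {action}"


# Deterministic stubs in place of the LLM calls (assumptions for illustration only).
def choose_action(state: State) -> str:
    flag = next(key for key, value in state.items() if not value)
    return f"Address {flag}"


def predict_state(state: State, action: str) -> State:
    return {**state, action.split()[-1]: True}


start = {"is_attack_contained": False, "is_eradicated": False, "is_recovered": False}
plan = list(recovery_loop_sketch(start, choose_action, predict_state))
print("\n".join(plan))
```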

csle_rest_api.resources.recovery_ai.routes module

Routes and sub-resources for the /recovery-ai resource

csle_rest_api.resources.recovery_ai.routes.example() -> Tuple[Response, int]

The /recovery-ai/example resource.

Returns

An example system description and log

csle_rest_api.resources.recovery_ai.routes.recovery_ai() -> Tuple[Response, int]

SSE endpoint that streams tokens of the recovery plan
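
In Server-Sent Events (SSE), each message is sent as one or more `data:` lines followed by a blank line. The sketch below shows how a stream of tokens could be framed that way. `to_sse` is a hypothetical helper, and how this endpoint actually frames its messages is not shown in the documentation.

```python
from typing import Iterable, Iterator


def to_sse(tokens: Iterable[str]) -> Iterator[str]:
    """Wrap each token in an SSE frame: 'data:' lines terminated by a blank line."""
    for token in tokens:
        # A payload containing newlines must be split into several 'data:' lines.
        payload = "\n".join(f"data: {line}" for line in token.split("\n"))
        yield payload + "\n\n"


frames = list(to_sse(["Isolate host", "line one\nline two"]))
for frame in frames:
    print(repr(frame))
```

In a Flask application, a generator like this is typically passed to a `Response` with the `text/event-stream` MIME type.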

Module contents