Source code for csle_rest_api.resources.emulation_simulation_traces.routes

"""
Routes and sub-resources for the /emulation-simulation-traces resource
"""
from typing import Tuple
from flask import Blueprint, jsonify, request, Response
import csle_common.constants.constants as constants
import csle_rest_api.constants.constants as api_constants
from csle_common.metastore.metastore_facade import MetastoreFacade
import csle_rest_api.util.rest_api_util as rest_api_util

# Creates a blueprint "sub application" of the main REST app
emulation_simulation_traces_bp = Blueprint(
    api_constants.MGMT_WEBAPP.EMULATION_SIMULATION_TRACES_RESOURCE, __name__,
    url_prefix=f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.EMULATION_SIMULATION_TRACES_RESOURCE}")


[docs]@emulation_simulation_traces_bp.route("", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_GET, api_constants.MGMT_WEBAPP.HTTP_REST_DELETE]) def emulation_simulation_traces() -> Tuple[Response, int]: """ The /emulation-traces resource. :return: A list of emulation traces or a list of ids of the traces or deletes the traces """ requires_admin = False if request.method == api_constants.MGMT_WEBAPP.HTTP_REST_DELETE: requires_admin = True authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=requires_admin) if authorized is not None: return authorized if request.method == api_constants.MGMT_WEBAPP.HTTP_REST_GET: # Check if ids query parameter is True, then only return the ids and not the whole list of emulation-simulation # traces ids = request.args.get(api_constants.MGMT_WEBAPP.IDS_QUERY_PARAM) if ids is not None and ids: return emulation_simulation_traces_ids() demo_traces = [ { "attacker_found_nodes": [[], [], [], [], [], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", ], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16", "switch2", "n17", "n18"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16", "switch2", "n17", "n18"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16", "switch2", "n17", "n18", "switch3", "n19", "n20", "n21", "n22", "n23", "n24"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16", "switch2", "n17", "n18", "switch3", "n19", "n20", "n21", "n22", "n23", "n24"], ["n2", "n3", "n4", "n5", "n6", "n7", "n8", "n9", "n10", "n11", "n12", "switch1", "n13", "n14", "n15", "n16", "switch2", "n17", "n18", "switch3", "n19", "n20", "n21", "n22", "n23", "n24", "n25", "n26", "switch4"]], "attacker_compromised_nodes": [[], [], [], [], [], [], ["n2"], ["n2"], ["n2", "n12"], ["n2", "n12"], ["n2", "n12", "n4"], ["n2", "n12", "n4"], ["n2", "n12", "n4", "n17"], ["n2", "n12", "n4", "n17"], ["n2", "n12", "n4", "n17", "n18"], ["n2", "n12", "n4", "n17", "n18"]], "attacker_actions": [0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1], "defender_actions": [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0], "defender_stop_probabilities": [0, 0.02, 0.05, 0.06, 0.05, 0.06, 0.08, 0.25, 0.55, 0.79, 0.95, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99, 0.99], "defender_beliefs": [0, 0.02, 0.05, 0.06, 0.05, 0.06, 0.08, 0.25, 0.33, 0.45, 0.48, 0.51, 0.55, 0.6, 0.62, 0.73, 0.78, 0.86], "defender_observations": [[39, 48, 52], [101, 2, 4], [30, 51, 28], [9, 12, 3], [45, 10, 19], [61, 20, 12], [41, 59, 61], [161, 80, 122], [120, 60, 59], [190, 40, 90], [121, 29, 56], [100, 31, 45], [161, 80, 20], [119, 67, 5], [67, 51, 9], [167, 105, 98], [147, 51, 78]], "intrusion_start_index": 7, "name": "demo" } ] response = jsonify(demo_traces) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE elif request.method == api_constants.MGMT_WEBAPP.HTTP_REST_DELETE: traces = MetastoreFacade.list_emulation_simulation_traces() for trace in traces: MetastoreFacade.remove_emulation_simulation_trace(trace) response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: "HTTP method not supported"}), constants.HTTPS.BAD_REQUEST_STATUS_CODE)
[docs]def emulation_simulation_traces_ids() -> Tuple[Response, int]: """ :return: An HTTP response with all emulation ids """ ids_emulations = MetastoreFacade.list_emulation_simulation_traces_ids() response_dicts = [] for tup in ids_emulations: response_dicts.append({ api_constants.MGMT_WEBAPP.ID_PROPERTY: tup[0] }) response = jsonify(response_dicts) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@emulation_simulation_traces_bp.route("/<trace_id>", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_GET, api_constants.MGMT_WEBAPP.HTTP_REST_DELETE]) def emulation_trace(trace_id: int) -> Tuple[Response, int]: """ The /emulation-traces/id resource. :param trace_id: the id of the trace :return: The given trace or delets the trace """ trace = MetastoreFacade.get_emulation_trace(id=trace_id) response = jsonify({}) if trace is not None: if request.method == api_constants.MGMT_WEBAPP.HTTP_REST_GET: response = jsonify(trace.to_dict()) else: MetastoreFacade.remove_emulation_trace(trace) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE