Source code for csle_rest_api.resources.logs.routes

"""
Routes and sub-resources for the /logs resource
"""
from typing import Tuple
from flask import Blueprint, jsonify, request, Response
import json
import csle_common.constants.constants as constants
import csle_rest_api.constants.constants as api_constants
import csle_rest_api.util.rest_api_util as rest_api_util
from csle_common.metastore.metastore_facade import MetastoreFacade
from csle_cluster.cluster_manager.cluster_controller import ClusterController

# Creates a blueprint "sub application" of the main REST app
logs_bp = Blueprint(
    api_constants.MGMT_WEBAPP.LOGS_RESOURCE, __name__,
    url_prefix=f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.LOGS_RESOURCE}")


[docs]@logs_bp.route("", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def logs() -> Tuple[Response, int]: """ The /logs resource. :return: List of log files in the CSLE logging directory """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_csle_log_files(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.DOCKER_STATS_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def docker_stats_manager_logs() -> Tuple[Response, int]: """ The /logs/docker-stats-manager resource. :return: The logs of the docker stats manager """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_docker_statsmanager_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.PROMETHEUS_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def prometheus_logs() -> Tuple[Response, int]: """ The /logs/prometheus resource. :return: The Prometheus logs """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_prometheus_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.NGINX_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def nginx_logs() -> Tuple[Response, int]: """ The /logs/nginx resource. :return: The nginx logs """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_nginx_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.POSTGRESQL_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def postgresql_logs() -> Tuple[Response, int]: """ The /logs/postgresql resource. :return: The PostgreSQL logs """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_postgresql_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.FLASK_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def flask_logs() -> Tuple[Response, int]: """ The /logs/flask resource. :return: The Flask logs """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_flask_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.CLUSTERMANAGER_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def cluster_manager_logs() -> Tuple[Response, int]: """ The /logs/clustermanager resource. :return: The Clustermanager logs """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_cluster_manager_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.DOCKER_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def docker_logs() -> Tuple[Response, int]: """ The /logs/docker resource. :return: The Docker logs """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_docker_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.NODE_EXPORTER_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def node_exporter_logs() -> Tuple[Response, int]: """ The /logs/node-exporter resource. :return: The logs of the docker stats manager """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_node_exporter_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.CADVISOR_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def cadvisor_logs() -> Tuple[Response, int]: """ The /logs/cadvisor resource. :return: The logs of cAdvisor """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_cadvisor_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.PGADMIN_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def pgadmin_logs() -> Tuple[Response, int]: """ The /logs/pgadmin resource. :return: The logs of pgAdmin """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_pgadmin_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.GRAFANA_RESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def grafana_logs() -> Tuple[Response, int]: """ The /lofgs/grafana resource. :return: The logs of the docker stats manager """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized json_data = json.loads(request.data) if api_constants.MGMT_WEBAPP.IP_PROPERTY not in json_data: response_str = f"{api_constants.MGMT_WEBAPP.IP_PROPERTY} not provided" return (jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}), constants.HTTPS.BAD_REQUEST_STATUS_CODE) ip = json_data[api_constants.MGMT_WEBAPP.IP_PROPERTY] data_dict = ClusterController.get_grafana_logs(ip=ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.CONTAINER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def container_logs() -> Tuple[Response, int]: """ The /logs/container resource. :return: The logs of a specific container """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE container_name = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM, default="") execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM, default=-1) if emulation == "" or execution_id == 1: response_str = "emulation or execution id query parameters were not provided" response = jsonify({api_constants.MGMT_WEBAPP.REASON_PROPERTY: response_str}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_full_name( name=container_name) if container_config is not None: config = MetastoreFacade.get_config(id=1) for node in config.cluster_config.cluster_nodes: if container_config.physical_host_ip == node.ip: data_dict = ClusterController.get_container_logs( ip=node.ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=container_config.docker_gw_bridge_ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.CLIENT_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def client_manager_logs() -> Tuple[Response, int]: """ The /logs/client-manager resource. :return: The logs of the client manager """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) node_ip = execution.emulation_env_config.traffic_config.client_population_config.physical_host_ip data_dict = ClusterController.get_client_manager_logs( ip=node_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.KAFKA_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def kafka_manager_logs() -> Tuple[Response, int]: """ The /logs/kafka-manager resource. :return: The logs of the kafka manager """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) node_ip = execution.emulation_env_config.kafka_config.container.physical_host_ip data_dict = ClusterController.get_kafka_manager_logs( ip=node_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.KAFKA_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def kafka_logs() -> Tuple[Response, int]: """ The /logs/kafka resource. :return: The logs of the kafka server """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) node_ip = execution.emulation_env_config.kafka_config.container.physical_host_ip data_dict = ClusterController.get_kafka_logs( ip=node_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.SNORT_IDS_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def snort_ids_manager_logs() -> Tuple[Response, int]: """ The /logs/snort-ids-manager resource. :return: The logs of a Snort IDS manager with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_ip(ip=ip) if container_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE data_dict = ClusterController.get_snort_ids_manager_logs( ip=container_config.physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.SNORT_IDS_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def snort_ids_logs() -> Tuple[Response, int]: """ The /logs/snort-ids-logs resource. :return: The logs of a Snort IDS with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_ip(ip=ip) if container_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE data_dict = ClusterController.get_snort_ids_logs( ip=container_config.physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.OSSEC_IDS_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def ossec_ids_manager_logs() -> Tuple[Response, int]: """ The /logs/ossec-ids-manager resource. :return: The logs of a OSSEC IDS manager with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_ip(ip=ip) if container_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE data_dict = ClusterController.get_ossec_ids_manager_logs( ip=container_config.physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.OSSEC_IDS_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def ossec_ids_logs() -> Tuple[Response, int]: """ The /logs/ossec-ids resource. :return: The logs of an OSSEC IDS with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_ip(ip=ip) if container_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE data_dict = ClusterController.get_ossec_ids_logs( ip=container_config.physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.HOST_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def host_manager_logs() -> Tuple[Response, int]: """ The /logs/host-manager resource. :return: The logs of a Host manager with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_ip(ip=ip) if container_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE data_dict = ClusterController.get_host_manager_logs( ip=container_config.physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.TRAFFIC_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def traffic_manager_logs() -> Tuple[Response, int]: """ The /logs/traffic-manager resource. :return: The logs of a Traffic manager with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) container_config = execution.emulation_env_config.containers_config.get_container_from_ip(ip=ip) if container_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE data_dict = ClusterController.get_traffic_manager_logs( ip=container_config.physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id), container_ip=ip) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.ELK_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def elk_manager_logs() -> Tuple[Response, int]: """ The /logs/elk-manager resource. :return: The logs of a ELK manager with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) physical_host_ip = execution.emulation_env_config.elk_config.container.physical_host_ip data_dict = ClusterController.get_elk_manager_logs( ip=physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.ELK_STACK_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def elk_logs() -> Tuple[Response, int]: """ The /logs/elk-stack resource. :return: The logs of an ELK-stack instance with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) physical_host_ip = execution.emulation_env_config.elk_config.container.physical_host_ip data_dict = ClusterController.get_elk_logs( ip=physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.RYU_MANAGER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def ryu_manager_logs() -> Tuple[Response, int]: """ The /logs/ryu-manager resource. :return: The logs of a Ryu manager with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) if execution.emulation_env_config.sdn_controller_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE physical_host_ip = execution.emulation_env_config.sdn_controller_config.container.physical_host_ip data_dict = ClusterController.get_ryu_manager_logs( ip=physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE
[docs]@logs_bp.route(f"{constants.COMMANDS.SLASH_DELIM}{api_constants.MGMT_WEBAPP.RYU_CONTROLLER_SUBRESOURCE}", methods=[api_constants.MGMT_WEBAPP.HTTP_REST_POST]) def ryu_controller_logs() -> Tuple[Response, int]: """ The /logs/ryu-controller resource. :return: The logs of a Ryu controller with a specific IP """ # Check that token is valid authorized = rest_api_util.check_if_user_is_authorized(request=request, requires_admin=True) if authorized is not None: return authorized if api_constants.MGMT_WEBAPP.NAME_PROPERTY not in json.loads(request.data): response = jsonify({}) return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE ip = json.loads(request.data)[api_constants.MGMT_WEBAPP.NAME_PROPERTY] emulation = request.args.get(api_constants.MGMT_WEBAPP.EMULATION_QUERY_PARAM) execution_id = request.args.get(api_constants.MGMT_WEBAPP.EXECUTION_ID_QUERY_PARAM) if emulation is not None and ip is not None and execution_id is not None: execution = MetastoreFacade.get_emulation_execution(ip_first_octet=int(execution_id), emulation_name=emulation) if execution.emulation_env_config.sdn_controller_config is None: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE physical_host_ip = execution.emulation_env_config.sdn_controller_config.container.physical_host_ip data_dict = ClusterController.get_ryu_controller_logs( ip=physical_host_ip, port=constants.GRPC_SERVERS.CLUSTER_MANAGER_PORT, emulation=emulation, ip_first_octet=int(execution_id)) response = jsonify(data_dict) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.OK_STATUS_CODE else: response = jsonify({}) response.headers.add(api_constants.MGMT_WEBAPP.ACCESS_CONTROL_ALLOW_ORIGIN_HEADER, "*") return response, constants.HTTPS.BAD_REQUEST_STATUS_CODE