Source code for csle_ryu.monitor.flow_and_port_stats_monitor
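"""
Ryu application that monitors flow and port statistics of connected OpenFlow switches
and pushes the statistics to Kafka. It also exposes a northbound REST API for starting,
stopping, and querying the status of the Kafka producer.
"""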

from confluent_kafka import Producer
import json
import socket
import time
import requests
from ryu.controller.handler import MAIN_DISPATCHER, DEAD_DISPATCHER
from ryu.lib import hub
from ryu.base import app_manager
from ryu.controller import ofp_event
from ryu.controller.handler import set_ev_cls
from ryu.ofproto import ofproto_v1_3
from ryu.app.wsgi import ControllerBase
from ryu.app.wsgi import Response
from ryu.app.wsgi import route
from ryu.app.wsgi import WSGIApplication
import csle_collector.constants.constants as collector_constants
import csle_ryu.constants.constants as constants
from csle_ryu.dao.flow_statistic import FlowStatistic
from csle_ryu.dao.port_statistic import PortStatistic
from csle_ryu.dao.avg_flow_statistic import AvgFlowStatistic
from csle_ryu.dao.avg_port_statistic import AvgPortStatistic
from csle_ryu.dao.agg_flow_statistic import AggFlowStatistic


class FlowAndPortStatsMonitor(app_manager.RyuApp):
    """
    Contains general functionality to monitor flow and port statistics
    """

    # OpenFlow version to use
    OFP_VERSIONS = [ofproto_v1_3.OFP_VERSION]

    # Include the WSGI context for the northbound API.
    # Specifies Ryu's WSGI-compatible web server class to use
    _CONTEXTS = {constants.RYU.WSGI: WSGIApplication}

    def __init__(self, *args, **kwargs):
        """
        Initializes the class

        :param args: app arguments
        :param kwargs: app arguments
        """
        super(FlowAndPortStatsMonitor, self).__init__(*args, **kwargs)

        # Dict to keep track of all connected switches
        self.datapaths = {}

        # Thread which will periodically query switches for statistics
        self.monitor_thread = hub.spawn(self._monitor)

        # State
        self.kafka_conf = {}
        self.producer_running = False
        self.producer = None
        self.time_step_len_seconds = 30

        # Acquires the WSGIApplication to register the controller class
        self.logger.info("Registering CSLE Northbound REST API Controller")
        wsgi = kwargs[constants.RYU.WSGI]
        wsgi.register(NorthBoundRestAPIController, {constants.RYU.CONTROLLER_APP: self})

    @set_ev_cls(ofp_event.EventOFPStateChange, [MAIN_DISPATCHER, DEAD_DISPATCHER])
    def _state_change_handler(self, ev) -> None:
        """
        Handler called whenever a new switch connects or disconnects from the controller.
        MAIN_DISPATCHER corresponds to the event that a new switch connects and
        DEAD_DISPATCHER corresponds to the event that a switch disconnects.

        :param ev: the connection event
        :return: None
        """
        # Extract the datapath (abstraction of the link to the switch) that the event concerns
        datapath = ev.datapath
        if ev.state == MAIN_DISPATCHER:
            # A new switch connected
            if datapath.id not in self.datapaths:
                # If the switch was not already registered, add it to the datapath dict
                self.logger.info(f"A new switch connection, DPID:{datapath.id}")
                self.datapaths[datapath.id] = datapath
        elif ev.state == DEAD_DISPATCHER:
            if datapath.id in self.datapaths:
                # Remove the switch from the dict of datapaths since it disconnected
                self.logger.info(f"Switch with DPID:{datapath.id} disconnected")
                del self.datapaths[datapath.id]

    def _monitor(self) -> None:
        """
        Thread that periodically sends requests to the connected switches to get switch statistics

        :return: None
        """
        while True:
            if self.producer_running:
                for dp in self.datapaths.values():
                    self._request_stats(dp)
                    try:
                        response = requests.get(
                            f"{collector_constants.HTTP.HTTP_PROTOCOL_PREFIX}"
                            f"{collector_constants.HTTP.LOCALHOST}:8080"
                            f"{constants.RYU.STATS_AGGREGATE_FLOW_RESOURCE}/{dp.id}",
                            timeout=constants.RYU.TIMEOUT)
                        aggflows = json.loads(response.content)[str(dp.id)][0]
                        agg_flow_stat = AggFlowStatistic(
                            timestamp=time.time(), datapath_id=str(dp.id),
                            total_num_bytes=aggflows[constants.RYU.BYTE_COUNT],
                            total_num_packets=aggflows[constants.RYU.PACKET_COUNT],
                            total_num_flows=aggflows[constants.RYU.FLOW_COUNT])
                        if self.producer is not None and self.producer_running:
                            self.producer.produce(
                                collector_constants.KAFKA_CONFIG.OPENFLOW_AGG_FLOW_STATS_TOPIC_NAME,
                                agg_flow_stat.to_kafka_record())
                            self.producer.poll(0)
                    except Exception as e:
                        self.logger.warning(f"There was an error parsing the flow statistics: "
                                            f"{str(e)}, {repr(e)}")
            hub.sleep(self.time_step_len_seconds)

    def _request_stats(self, datapath) -> None:
        """
        Utility function for sending a request to a switch with a given datapath to return its
        flow and port statistics.

        :param datapath: the datapath, i.e., abstraction of the link to the switch
        :return: None
        """
        # Extract the protocol and message parser to use for sending the request
        openflow_protocol = datapath.ofproto
        parser = datapath.ofproto_parser

        # Prepare the request for flow statistics
        statistics_request = parser.OFPFlowStatsRequest(datapath)

        # Send the request to get the flow statistics of the switch
        datapath.send_msg(statistics_request)

        # Prepare the request for port statistics
        statistics_request = parser.OFPPortStatsRequest(datapath, 0, openflow_protocol.OFPP_ANY)

        # Send the request to get the port statistics of the switch
        datapath.send_msg(statistics_request)

    @set_ev_cls(ofp_event.EventOFPFlowStatsReply, MAIN_DISPATCHER)
    def _flow_stats_reply_handler(self, ev) -> None:
        """
        Handler called when a flow statistics response is received from a switch

        :param ev: the response event
        :return: None
        """
        # Extract the response body
        body = ev.msg.body
        ts = time.time()

        # Log the statistics
        flow_stats = []
        for flow in body:
            in_port = ""
            eth_dst = ""
            if constants.RYU.IN_PORT in flow.match:
                in_port = flow.match[constants.RYU.IN_PORT]
            if constants.RYU.ETH_DST in flow.match:
                eth_dst = flow.match[constants.RYU.ETH_DST]
            flow_statistic_dto = FlowStatistic(
                timestamp=ts, datapath_id=str(ev.msg.datapath.id), in_port=in_port,
                out_port=flow.instructions[0].actions[0].port, dst_mac_address=eth_dst,
                num_packets=flow.packet_count, num_bytes=flow.byte_count,
                duration_nanoseconds=flow.duration_nsec, duration_seconds=flow.duration_sec,
                hard_timeout=flow.hard_timeout, idle_timeout=flow.idle_timeout,
                priority=flow.priority, cookie=flow.cookie)
            if self.producer is not None and self.producer_running:
                self.producer.produce(collector_constants.KAFKA_CONFIG.OPENFLOW_FLOW_STATS_TOPIC_NAME,
                                      flow_statistic_dto.to_kafka_record())
                self.producer.poll(0)
            flow_stats.append(flow_statistic_dto)
        if self.producer is not None and self.producer_running and len(flow_stats) > 0:
            avg_flow_stats = AvgFlowStatistic.average_flow_statistics(
                timestamp=ts, datapath_id=str(flow_stats[0].datapath_id), flow_statistics=flow_stats)
            self.producer.produce(
                collector_constants.KAFKA_CONFIG.AVERAGE_OPENFLOW_FLOW_STATS_PER_SWITCH_TOPIC_NAME,
                avg_flow_stats.to_kafka_record())
            self.producer.poll(0)

    @set_ev_cls(ofp_event.EventOFPPortStatsReply, MAIN_DISPATCHER)
    def _port_stats_reply_handler(self, ev) -> None:
        """
        Handler called when a port statistics response is received from a switch

        :param ev: the response event
        :return: None
        """
        # Extract the response body
        body = ev.msg.body
        ts = time.time()
        port_stats = []

        # Log the statistics
        for port in body:
            port_statistics_dto = PortStatistic(
                timestamp=ts, datapath_id=str(ev.msg.datapath.id), port=port.port_no,
                num_received_packets=port.rx_packets, num_received_bytes=port.rx_bytes,
                num_received_errors=port.rx_errors, num_transmitted_packets=port.tx_packets,
                num_transmitted_bytes=port.tx_bytes, num_transmitted_errors=port.tx_errors,
                num_received_dropped=port.rx_dropped, num_transmitted_dropped=port.tx_dropped,
                num_received_frame_errors=port.rx_frame_err,
                num_received_overrun_errors=port.rx_over_err,
                num_received_crc_errors=port.rx_crc_err, num_collisions=port.collisions,
                duration_nanoseconds=port.duration_nsec, duration_seconds=port.duration_sec)
            if self.producer is not None and self.producer_running:
                self.producer.produce(collector_constants.KAFKA_CONFIG.OPENFLOW_PORT_STATS_TOPIC_NAME,
                                      port_statistics_dto.to_kafka_record())
                self.producer.poll(0)
            port_stats.append(port_statistics_dto)
        if self.producer is not None and self.producer_running and len(port_stats) > 0:
            avg_port_stats = AvgPortStatistic.average_port_statistics(
                timestamp=ts, datapath_id=str(port_stats[0].datapath_id), port_statistics=port_stats)
            self.producer.produce(
                collector_constants.KAFKA_CONFIG.AVERAGE_OPENFLOW_PORT_STATS_PER_SWITCH_TOPIC_NAME,
                avg_port_stats.to_kafka_record())
            self.producer.poll(0)
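
# Illustrative sketch (not part of the original app): the records published above can be
# read back with a confluent-kafka consumer. The bootstrap server address and consumer
# group id below are assumptions; the topic name constant is the one used by the monitor.
def _example_consume_flow_stats() -> None:  # hypothetical helper, for illustration only
    from confluent_kafka import Consumer
    consumer = Consumer({
        "bootstrap.servers": "127.0.0.1:9092",  # assumed Kafka broker address
        "group.id": "flow-stats-reader",  # hypothetical consumer group
        "auto.offset.reset": "earliest"})
    consumer.subscribe([collector_constants.KAFKA_CONFIG.OPENFLOW_FLOW_STATS_TOPIC_NAME])
    try:
        while True:
            msg = consumer.poll(timeout=1.0)
            if msg is not None and msg.error() is None:
                # Each message value is the string record produced by to_kafka_record()
                print(msg.value().decode("utf-8"))
    finally:
        consumer.close()
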
class NorthBoundRestAPIController(ControllerBase):
    """
    Controller class for the Northbound REST API that accepts HTTP requests.

    Example requests:

    curl -X GET http://15.12.252.3:8080/cslenorthboundapi/producer/status
    curl -X PUT -d '{"bootstrap.servers": "15.12.253.253", "time_step_len_seconds": 30}' \
        http://15.12.252.3:8080/cslenorthboundapi/producer/start
    curl -X POST http://15.12.252.3:8080/cslenorthboundapi/producer/stop
    """

    def __init__(self, req, link, data, **config):
        """
        Initializes the controller

        :param req: the request for the initialization
        :param link: the link on which the controller was started
        :param data: the data
        :param config: configuration parameters
        """
        super(NorthBoundRestAPIController, self).__init__(req, link, data, **config)
        self.controller_app = data[constants.RYU.CONTROLLER_APP]  # These names have to match!
        self.hostname = socket.gethostname()
        self.ip = socket.gethostbyname(self.hostname)

    @route(constants.RYU.CONTROLLER_APP, collector_constants.RYU.STATUS_PRODUCER_HTTP_RESOURCE,
           methods=[collector_constants.HTTP.GET])
    def producer_status(self, req, **kwargs):
        """
        Gets the status of the Kafka producer

        :param req: the REST API request
        :param kwargs: the WSGI arguments
        :return: the REST API response
        """
        response_body = json.dumps({
            constants.RYU.KAFKA_CONF: self.controller_app.kafka_conf,
            constants.RYU.PRODUCER_RUNNING: self.controller_app.producer_running,
            collector_constants.RYU.TIME_STEP_LEN_SECONDS: self.controller_app.time_step_len_seconds})
        return Response(content_type=collector_constants.HTTP.APPLICATION_JSON_TYPE, text=response_body)

    @route(constants.RYU.CONTROLLER_APP, collector_constants.RYU.START_PRODUCER_HTTP_RESOURCE,
           methods=[collector_constants.HTTP.PUT])
    def start_producer(self, req, **kwargs):
        """
        Starts the Kafka producer that sends flow and port statistics

        :param req: the REST API request
        :param kwargs: WSGI arguments
        :return: the REST API response
        """
        try:
            kafka_conf = req.json if req.body else {}
        except ValueError:
            return Response(status=collector_constants.HTTP.BAD_REQUEST_RESPONSE_CODE)
        if collector_constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY in kafka_conf \
                and collector_constants.RYU.TIME_STEP_LEN_SECONDS in kafka_conf:
            self.controller_app.kafka_conf = {
                collector_constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY:
                    kafka_conf[collector_constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY],
                collector_constants.KAFKA.CLIENT_ID_PROPERTY: self.hostname}
            self.controller_app.logger.info(f"Starting Kafka producer with conf: "
                                            f"{self.controller_app.kafka_conf}")
            self.controller_app.producer_running = True
            self.controller_app.producer = Producer(**self.controller_app.kafka_conf)
            self.controller_app.time_step_len_seconds = \
                kafka_conf[collector_constants.RYU.TIME_STEP_LEN_SECONDS]
            body = json.dumps(self.controller_app.kafka_conf)
            return Response(content_type=collector_constants.HTTP.APPLICATION_JSON_TYPE, text=body,
                            status=collector_constants.HTTP.OK_RESPONSE_CODE)
        else:
            return Response(status=collector_constants.HTTP.INTERNAL_SERVER_ERROR_RESPONSE_CODE)

    @route(constants.RYU.CONTROLLER_APP, collector_constants.RYU.STOP_PRODUCER_HTTP_RESOURCE,
           methods=[collector_constants.HTTP.POST])
    def stop_producer(self, req, **kwargs):
        """
        Stops the Kafka producer that sends flow and port statistics

        :param req: the REST API request
        :param kwargs: WSGI arguments
        :return: the REST API response
        """
        self.controller_app.logger.info("Stopping Kafka producer")
        self.controller_app.kafka_conf = {}
        self.controller_app.producer_running = False
        self.controller_app.producer = None
        response_body = json.dumps({
            constants.RYU.KAFKA_CONF: self.controller_app.kafka_conf,
            constants.RYU.PRODUCER_RUNNING: self.controller_app.producer_running,
            collector_constants.RYU.TIME_STEP_LEN_SECONDS: self.controller_app.time_step_len_seconds})
        return Response(content_type=collector_constants.HTTP.APPLICATION_JSON_TYPE, text=response_body,
                        status=collector_constants.HTTP.OK_RESPONSE_CODE)
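
# Illustrative usage (not part of the original module): starting the producer through the
# northbound REST API from Python instead of curl. The controller IP and Kafka address are
# placeholders taken from the curl examples in the class docstring above.
def _example_start_producer() -> None:  # hypothetical helper, for illustration only
    conf = {"bootstrap.servers": "15.12.253.253", "time_step_len_seconds": 30}
    # PUT the Kafka configuration to the start resource; the handler parses it as JSON
    resp = requests.put("http://15.12.252.3:8080/cslenorthboundapi/producer/start",
                        data=json.dumps(conf), timeout=5)
    print(resp.status_code, resp.text)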