# Source code for csle_collector.client_manager.threads.producer_thread

import socket
import netifaces
import threading
import logging
import time
from confluent_kafka import Producer
import csle_collector.constants.constants as constants
from csle_collector.client_manager.threads.arrival_thread import ArrivalThread


class ProducerThread(threading.Thread):
    """
    Thread that pushes statistics to Kafka
    """

    # Upper bound (in seconds) on how long run() waits for queued records to be
    # delivered before the thread exits.
    FLUSH_TIMEOUT_SECONDS = 5.0

    def __init__(self, arrival_thread: ArrivalThread, time_step_len_seconds: int, ip: str, port: int,
                 mu: int = 4):
        """
        Initializes the thread

        :param arrival_thread: the thread that manages the client arrivals, used to extract statistics
        :param time_step_len_seconds: the length between pushing statistics to Kafka
        :param ip: the ip of the Kafka server
        :param port: the port of the Kafka server
        :param mu: the value written to the last field of every published record. Defaults to 4, which is
                   the value that used to be hard-coded in run(). NOTE(review): presumably the service-rate
                   parameter of the client population; confirm against the consumers of the topic
        """
        super().__init__()
        self.arrival_thread = arrival_thread
        self.time_step_len_seconds = time_step_len_seconds
        # Set to True (by the owner of the thread) to make run() exit
        self.stopped = False
        self.kafka_ip = ip
        self.port = port
        self.mu = mu
        self.hostname = socket.gethostname()
        try:
            self.ip = netifaces.ifaddresses(constants.INTERFACES.ETH0)[netifaces.AF_INET][0][constants.INTERFACES.ADDR]
        except Exception:
            # Deliberate best-effort fallback: if the address of the interface cannot be read for any reason,
            # resolve the local hostname instead.
            self.ip = socket.gethostbyname(self.hostname)
        self.conf = {
            constants.KAFKA.BOOTSTRAP_SERVERS_PROPERTY: f"{self.kafka_ip}:{self.port}",
            constants.KAFKA.CLIENT_ID_PROPERTY: self.hostname}
        self.producer = Producer(**self.conf)
        logging.info(f"Starting producer thread, ip:{self.ip}, kafka port:{self.port}, "
                     f"time_step_len:{self.time_step_len_seconds}, kafka_ip:{self.kafka_ip}")

    def run(self) -> None:
        """
        Main loop of the thread, pushes data to Kafka periodically.

        Every time_step_len_seconds a record "<timestamp>,<ip>,<num_clients>,<rate>,<mu>" is produced to the
        client-population topic. The loop ends when self.stopped is set or self.arrival_thread becomes None;
        before returning, queued records are flushed (bounded by FLUSH_TIMEOUT_SECONDS).

        :return: None
        """
        try:
            while not self.stopped and self.arrival_thread is not None:
                time.sleep(self.time_step_len_seconds)
                # Take a single snapshot: another thread may clear the attribute while we are using it.
                arrival_thread = self.arrival_thread
                # Re-check after the sleep so that nothing is published once a stop has been requested.
                if self.stopped or arrival_thread is None:
                    break
                ts = time.time()
                num_clients = len(arrival_thread.client_threads)
                rate = arrival_thread.rate
                record = f"{ts},{self.ip},{num_clients},{rate},{self.mu}"
                try:
                    self.producer.produce(constants.KAFKA_CONFIG.CLIENT_POPULATION_TOPIC_NAME, record)
                except BufferError:
                    # The local producer queue is full (e.g. the broker is unreachable). Drop this sample
                    # instead of letting the exception terminate the thread; poll() below lets the client
                    # make progress on the backlog.
                    logging.warning("Kafka producer queue is full, dropping client population record")
                # Serve delivery callbacks without blocking
                self.producer.poll(0)
        finally:
            # Give records that are still queued a bounded amount of time to be delivered
            remaining = self.producer.flush(self.FLUSH_TIMEOUT_SECONDS)
            if remaining:
                logging.warning(f"Producer thread exiting with {remaining} undelivered Kafka records")