from confluent_kafka import Consumer import json import logging from app.config import settings import requests logger = logging.getLogger(__name__) class KafkaIntegration: _consumer = None _consumer_config = { "bootstrap.servers": settings.KAFKA_BROKER, "group.id": "loan-service-consumer", "auto.offset.reset": "earliest", "enable.auto.commit": True, } @staticmethod def _get_consumer(): """Kafka consumer""" if not KafkaIntegration._consumer: KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config) logger.info( f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}" ) return KafkaIntegration._consumer @staticmethod def delivery_report(err, msg): """Called once for each message produced""" if err is not None: logger.error(f"Message delivery failed: {err}") raise RuntimeError(f"Message delivery failed: {err}") else: logger.debug( f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" ) @staticmethod def receive_messages(topic, timeout=1.0): """ Receive messages from a Kafka topic. :param topic: The Kafka topic to subscribe to :param timeout: Time to wait for a message (in seconds) :return: The message value (decoded) or None if no message is received """ consumer = KafkaIntegration._get_consumer() consumer.subscribe([topic]) try: msg = consumer.poll(timeout=timeout) if msg is None: logger.debug(f"No message received from topic {topic} within timeout") return None if msg.error(): logger.error(f"Consumer error: {msg.error()}") raise RuntimeError(f"Consumer error: {msg.error()}") # Decode and return the message value message_value = msg.value().decode("utf-8") logger.debug( f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" ) return json.loads(message_value) if message_value else None except Exception as e: logger.error(f"Error while receiving message: {e}") raise @staticmethod def close_consumer(): """Shutdown consumer""" consumer = KafkaIntegration._get_consumer() consumer.clonse() logger.info("Kafka consumer closed") @staticmethod def is_kafka_running(): """ Check if Kafka broker is reachable by attempting to fetch metadata. :return: True if Kafka is running, False otherwise """ try: producer = KafkaIntegration._get_producer() # Attempt to fetch metadata for a dummy topic metadata = producer.list_topics(timeout=5) if metadata.brokers: # If brokers are returned, Kafka is running logger.info(f"Kafka broker at {settings.KAFKA_BROKER} is running") return True else: logger.warning(f"No brokers found for {settings.KAFKA_BROKER}") return False except Exception as e: logger.error(f"Kafka broker check failed: {e}") return False @staticmethod def _call_disbursement_endpoint( message, endpoint_url="http://localhost:8000/loans/disbursement" ): """Call an HTTP endpoint with the received message""" try: response = requests.post(endpoint_url, json=message, timeout=5) response.raise_for_status() logger.info( f"Successfully sent message to {endpoint_url}: {response.status_code}" ) print(response.json()) except requests.exceptions.RequestException as e: logger.error(f"Failed to call endpoint {endpoint_url}: {e}") raise