from confluent_kafka import Consumer, Producer import json from app.utils.logger import logger from app.config import settings import requests from app.routes.loan import disbursement as disbursement_endpoint class KafkaIntegration: BASE_URL = settings.BANK_CALL_BASE_URL _consumer = None _consumer_config = { "bootstrap.servers": settings.KAFKA_BROKER, "group.id": "loan-service-consumer", "auto.offset.reset": "earliest", "enable.auto.commit": True, } @staticmethod def _get_consumer(): """Kafka consumer""" if not KafkaIntegration._consumer: KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config) logger.info( f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}" ) return KafkaIntegration._consumer @staticmethod def delivery_report(err, msg): """Called once for each message produced""" if err is not None: logger.error(f"Message delivery failed: {err}") raise RuntimeError(f"Message delivery failed: {err}") else: logger.debug( f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" ) @staticmethod def receive_disbursement_messages(topic, timeout=1.0): """ Receive messages from a Kafka topic. :param topic: The Kafka topic to subscribe to :param timeout: Time to wait for a message (in seconds) :return: The message value (decoded) or None if no message is received """ consumer = KafkaIntegration._get_consumer() consumer.subscribe([topic]) logger.info( f"Waiting for messages from topic {topic} with this timeout: {timeout}..." ) try: msg = consumer.poll(timeout=timeout) if msg is None: logger.debug(f"No message received from topic {topic} within timeout") return None if msg.error(): logger.error(f"Consumer error: {msg.error()}") raise RuntimeError(f"Consumer error: {msg.error()}") # Decode and return the message value message_value = msg.value().decode("utf-8") logger.debug( f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" ) message = json.loads(message_value) if message_value else None # Call the endpoint if provided if message: KafkaIntegration._call_disbursement_endpoint(message) return message except Exception as e: logger.error(f"Error while receiving message: {e}") raise @staticmethod def close_consumer(): """Shutdown consumer""" consumer = KafkaIntegration._get_consumer() consumer.close() logger.info("Kafka consumer closed") @staticmethod def _call_disbursement_endpoint(message): """Call the disbursement endpoint with the received message""" logger.info(f"Calling disbursement endpoint with message: {message}") try: response = disbursement_endpoint(message) logger.info( f"Successfully sent message to disbursement endpoint: {response.status_code}" ) except Exception as e: logger.error(f"Failed to call disbursement endpoint: {e}") raise