import json

import requests
from confluent_kafka import Consumer, Producer

from app.config import settings
from app.utils.logger import logger


class KafkaIntegration:
    """Static helpers around one shared Kafka consumer for disbursement messages.

    A single ``Consumer`` is created lazily and cached on the class. Each
    received message is JSON-decoded and forwarded by HTTP POST to the
    disbursement endpoint under ``BASE_URL``.
    """

    BASE_URL = settings.SIMBRELLA_BASE_URL

    # Lazily created consumer shared by every caller of this class.
    _consumer = None
    # Topic the shared consumer is currently subscribed to (None = not subscribed).
    _subscribed_topic = None
    # NOTE(review): with "enable.auto.commit" on, an offset may be committed even
    # if the HTTP forward of that message later fails -- confirm this is the
    # intended delivery guarantee.
    _consumer_config = {
        "bootstrap.servers": settings.KAFKA_BROKER,
        "group.id": "loan-service-consumer",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": True,
    }

    @staticmethod
    def _get_consumer():
        """Return the shared Kafka consumer, creating it on first use."""
        if KafkaIntegration._consumer is None:
            KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config)
            logger.info(
                f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}"
            )
        return KafkaIntegration._consumer

    @staticmethod
    def delivery_report(err, msg):
        """Log the outcome of a produced message; raise if delivery failed.

        The signature matches a producer delivery callback; no producer is
        created in this class, so it is presumably registered elsewhere.

        :param err: Delivery error, or None when the message was delivered
        :param msg: The message the report refers to
        :raises RuntimeError: If ``err`` is not None
        """
        if err is not None:
            logger.error(f"Message delivery failed: {err}")
            raise RuntimeError(f"Message delivery failed: {err}")

        logger.debug(
            f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}"
        )

    @staticmethod
    def receive_disbursement_messages(topic, timeout=1.0):
        """
        Poll one message from a Kafka topic and forward it to the disbursement endpoint.

        :param topic: The Kafka topic to subscribe to
        :param timeout: Time to wait for a message (in seconds)
        :return: The JSON-decoded message, or None if no message arrived within
                 the timeout or the message had no payload
        :raises RuntimeError: If the polled message carries a consumer error
        :raises Exception: Any decoding or HTTP error is logged and re-raised
        """
        consumer = KafkaIntegration._get_consumer()

        # subscribe() replaces the current subscription, so repeating it on every
        # poll can force the consumer to rejoin its group each time. Only
        # (re)subscribe when the requested topic actually changes.
        if KafkaIntegration._subscribed_topic != topic:
            consumer.subscribe([topic])
            KafkaIntegration._subscribed_topic = topic

        try:
            msg = consumer.poll(timeout=timeout)
            if msg is None:
                logger.debug(f"No message received from topic {topic} within timeout")
                return None

            if msg.error():
                logger.error(f"Consumer error: {msg.error()}")
                raise RuntimeError(f"Consumer error: {msg.error()}")

            # A record may have no payload at all; treat it like an empty one
            # instead of failing on None.decode().
            raw_value = msg.value()
            if raw_value is None:
                logger.debug(
                    f"Received message without payload from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}"
                )
                return None

            message_value = raw_value.decode("utf-8")
            logger.debug(
                f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}"
            )

            message = json.loads(message_value) if message_value else None

            # Forward only non-empty payloads to the HTTP endpoint.
            if message:
                KafkaIntegration._call_disbursement_endpoint(message)

            return message
        except Exception as e:
            logger.error(f"Error while receiving message: {e}")
            raise

    @staticmethod
    def close_consumer():
        """Close the shared consumer if one exists and forget it.

        Does nothing when no consumer was ever created, rather than connecting
        one only to close it. The cached handle is cleared so a later call to
        ``_get_consumer`` builds a fresh consumer instead of reusing a closed one.
        """
        consumer = KafkaIntegration._consumer
        if consumer is None:
            return

        try:
            consumer.close()
        finally:
            # Drop the handle even if close() fails; it must not be reused.
            KafkaIntegration._consumer = None
            KafkaIntegration._subscribed_topic = None
        logger.info("Kafka consumer closed")

    @staticmethod
    def _call_disbursement_endpoint(message, endpoint_url=f"{BASE_URL}/Disbursement"):
        """POST the received message as JSON to an HTTP endpoint.

        :param message: JSON-serialisable payload to send
        :param endpoint_url: Target URL; defaults to ``{BASE_URL}/Disbursement``
                             (evaluated once, when the class is defined)
        :raises requests.exceptions.RequestException: On connection errors,
                timeouts, or a non-2xx response status
        """
        try:
            response = requests.post(endpoint_url, json=message, timeout=5)
            response.raise_for_status()
        except requests.exceptions.RequestException as e:
            logger.error(f"Failed to call endpoint {endpoint_url}: {e}")
            raise

        logger.info(
            f"Successfully sent message to {endpoint_url}: {response.status_code}"
        )

        # The response body is only logged for diagnostics. A non-JSON body must
        # not turn an already successful POST into a failure.
        try:
            body = response.json()
        except ValueError:
            body = response.text
        logger.debug(f"Response from {endpoint_url}: {body}")