from confluent_kafka import Producer import json import logging from app.config import settings logger = logging.getLogger(__name__) class KafkaIntegration: _producer = None _config = { "bootstrap.servers": settings.KAFKA_BROKER, "client.id": "loan-service-producer", "acks": "all", "retries": 3, "debug": "broker,topic,msg", } @staticmethod def _get_producer(): """Kafka producer""" if not KafkaIntegration._producer: KafkaIntegration._producer = Producer(KafkaIntegration._config) logger.info( f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" ) return KafkaIntegration._producer @staticmethod def delivery_report(err, msg): """Called once for each message produced""" if err is not None: logger.error(f'Message delivery failed: {err}') raise RuntimeError(f"Message delivery failed: {err}") else: logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') @staticmethod def send_loan_request(loan_data, request_id, topic): """ Send loan request to topic Args: loan_data: Loan request payload as dict request_id: Unique request identifier (used as Kafka key) """ try: # Proceed to send loan request to Kafka producer = KafkaIntegration._get_producer() # Sending loan request message to Kafka producer.produce( topic=topic, key=str(request_id), value=json.dumps(loan_data).encode("utf-8"), callback=KafkaIntegration.delivery_report, ) producer.poll(0) logger.info(f"Loan request {request_id} queued for processing") except Exception as e: logger.error( f"Failed to send loan request to Kafka: {str(e)}", exc_info=True ) raise Exception(f"Failed to send loan request to Kafka: {str(e)}") @staticmethod def flush(): """Shutdown""" producer = KafkaIntegration._get_producer() producer.flush()