import json
import logging

from confluent_kafka import Producer

from app.config import settings

logger = logging.getLogger(__name__)


class KafkaIntegration:
    """Static helper around a single, lazily created Kafka ``Producer``.

    The producer is stored on the class and shared by every caller; it is
    built from ``_config`` the first time a message is sent.
    """

    # Shared producer instance; stays None until the first _get_producer() call.
    _producer = None

    # Producer configuration. The broker address is read from settings once,
    # when this class body is executed.
    _config = {
        'bootstrap.servers': settings.KAFKA_BROKER,
        'client.id': 'loan-service-producer',
        'acks': 'all',
        'retries': 3,
    }

    @staticmethod
    def _get_producer() -> Producer:
        """Return the shared producer, creating it on first use."""
        # Compare against None explicitly: Producer implements __len__ (the
        # number of messages still queued), so a truthiness test treats an
        # idle producer as "missing" and would build a new one on every call.
        if KafkaIntegration._producer is None:
            KafkaIntegration._producer = Producer(KafkaIntegration._config)
            logger.info(
                "Connected to Kafka broker at %s",
                KafkaIntegration._config['bootstrap.servers'],
            )
        return KafkaIntegration._producer

    @staticmethod
    def delivery_report(err, msg) -> None:
        """Delivery callback passed to ``produce()``; called once per message.

        Args:
            err: Delivery error, or None when the message was delivered.
            msg: The message object; its topic, partition and offset are
                logged on success.

        Raises:
            RuntimeError: If ``err`` is not None.
        """
        # NOTE(review): per the confluent-kafka docs, callbacks run inside
        # poll()/flush(), so this exception presumably surfaces from whichever
        # call happens to serve the callback (possibly while a *different*
        # request is being sent) - confirm this is the intended behaviour.
        if err is not None:
            logger.error("Message delivery failed: %s", err)
            raise RuntimeError(f"Message delivery failed: {err}")

        logger.debug(
            "Message delivered to %s [%s] @ offset %s",
            msg.topic(), msg.partition(), msg.offset(),
        )

    @staticmethod
    def send_loan_request(loan_data, request_id) -> None:
        """Queue a loan request on the ``PROCESS_PAYMENT`` topic.

        The payload is JSON-encoded as UTF-8 and keyed by ``str(request_id)``.
        The message is only enqueued here; the delivery outcome is reported
        later through ``delivery_report``.

        Args:
            loan_data: Loan request payload; must be JSON-serialisable.
            request_id: Unique request identifier (used as the Kafka key).

        Raises:
            Exception: If serialising, enqueueing or polling fails. The
                original error is attached as ``__cause__``.
        """
        try:
            producer = KafkaIntegration._get_producer()

            producer.produce(
                topic="PROCESS_PAYMENT",
                key=str(request_id),
                value=json.dumps(loan_data).encode('utf-8'),
                callback=KafkaIntegration.delivery_report,
            )
            # Non-blocking poll so that pending delivery callbacks get served.
            producer.poll(0)

            logger.info("Loan request %s queued for processing", request_id)
        except Exception as e:
            logger.error(
                "Failed to send loan request to Kafka: %s", e, exc_info=True
            )
            raise Exception(
                f"Failed to send loan request to Kafka: {str(e)}"
            ) from e

    @staticmethod
    def flush() -> None:
        """Flush the shared producer; intended for shutdown.

        Does nothing when no producer has been created yet, so that shutting
        down does not open a broker client merely to flush an empty queue.
        """
        producer = KafkaIntegration._producer
        if producer is None:
            return
        producer.flush()