diff --git a/app/api/integrations/kafka.py b/app/api/integrations/kafka.py new file mode 100644 index 0000000..a7774e2 --- /dev/null +++ b/app/api/integrations/kafka.py @@ -0,0 +1,78 @@ +from confluent_kafka import Producer +import json +import logging +from app.config import settings + +logger = logging.getLogger(__name__) + + +class KafkaIntegration: + _producer = None + _config = { + "bootstrap.servers": settings.KAFKA_BROKER, + "client.id": "loan-service-producer", + "acks": "all", + "retries": 3, + "debug": "broker,topic,msg", + } + + @staticmethod + def _get_producer(): + """Kafka producer""" + if not KafkaIntegration._producer: + KafkaIntegration._producer = Producer(KafkaIntegration._config) + logger.info( + f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" + ) + + return KafkaIntegration._producer + + @staticmethod + def delivery_report(err, msg): + """Called once for each message produced""" + if err is not None: + logger.error(f"Message delivery failed: {err}") + raise RuntimeError(f"Message delivery failed: {err}") + + else: + logger.debug( + f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" + ) + + @staticmethod + def send_loan_request(loan_data, request_id): + """ + Send loan request to PROCESS_PAYMENT topic + + Args: + loan_data: Loan request payload as dict + request_id: Unique request identifier (used as Kafka key) + """ + try: + + # Proceed to send loan request to Kafka + producer = KafkaIntegration._get_producer() + + # Sending loan request message to Kafka + producer.produce( + topic="PROCESS_PAYMENT", + key=str(request_id), + value=json.dumps(loan_data).encode("utf-8"), + callback=KafkaIntegration.delivery_report, + ) + + producer.poll(0) + + logger.info(f"Loan request {request_id} queued for processing") + + except Exception as e: + logger.error( + f"Failed to send loan request to Kafka: {str(e)}", exc_info=True + ) + raise Exception(f"Failed to send loan request to Kafka: {str(e)}") + + @staticmethod + def flush(): + """Shutdown""" + producer = KafkaIntegration._get_producer() + producer.flush()