import json

from confluent_kafka import Consumer, Producer
import requests

from app.utils.logger import logger
from app.config import settings
from app.integrations.simbrella import SimbrellaClient


class KafkaIntegration:
    """Kafka consumer helper that forwards loan messages to SimbrellaClient.

    A single consumer is created lazily and cached on the class, so every
    caller in the process shares it.
    """

    BASE_URL = settings.BANK_CALL_BASE_URL

    # Lazily created by _get_consumer(); reset to None by close_consumer().
    _consumer = None

    # NOTE(review): with auto-commit enabled, offsets presumably advance even
    # when the downstream Simbrella call fails (those failures are swallowed
    # below) -- confirm that losing such messages is acceptable.
    _consumer_config = {
        "bootstrap.servers": settings.KAFKA_BROKER,
        "group.id": "loan-service-consumer",
        "auto.offset.reset": "earliest",
        "enable.auto.commit": True,
    }

    @staticmethod
    def _get_consumer():
        """Return the shared Kafka consumer, creating it on first use."""
        if KafkaIntegration._consumer is None:
            KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config)
            logger.info(
                f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}"
            )
        return KafkaIntegration._consumer

    @staticmethod
    def delivery_report(err, msg):
        """Log the outcome of a produced message.

        Intended as a producer delivery callback (called once per message).

        :param err: Delivery error, or None when the message was delivered
        :param msg: The produced message
        :raises RuntimeError: If delivery failed
        """
        if err is not None:
            logger.error(f"Message delivery failed: {err}")
            raise RuntimeError(f"Message delivery failed: {err}")
        logger.debug(
            f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}"
        )

    @staticmethod
    def receive_messages(topics, timeout):
        """
        Poll one message from Kafka and dispatch it to the matching service.

        Messages on ``PROCESS_PAYMENT`` go to the disbursement service and
        messages on ``LOAN_REPAYMENT`` go to the collect-loan service; other
        topics are parsed and returned without a downstream call.

        :param topics: The Kafka topics to subscribe to (passed to
            ``Consumer.subscribe``; presumably a list of topic names)
        :param timeout: Time to wait for a message (in seconds)
        :return: The JSON-decoded message; ``None`` if no message arrived
            within the timeout or the payload was empty; ``[]`` if the
            payload was not valid JSON
        :raises RuntimeError: If the polled message carries a consumer error
        """
        consumer = KafkaIntegration._get_consumer()
        consumer.subscribe(topics)
        logger.info(
            f"Waiting for messages from topic {topics} with this timeout: {timeout}..."
        )

        try:
            msg = consumer.poll(timeout=timeout)

            # poll() returns None on timeout, so this check must come before
            # any access to msg.topic()/msg.value().
            if msg is None:
                logger.info(f"No message received from topic {topics} within timeout")
                return None

            if msg.error():
                logger.info(f"Consumer error: {msg.error()}")
                raise RuntimeError(f"Consumer error: {msg.error()}")

            # The payload is bytes, or None for a message without a value;
            # treat both None and b"" as an empty payload.
            raw_value = msg.value()
            message_value = raw_value.decode("utf-8") if raw_value else ""
            logger.info(
                f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}"
            )

            if not message_value:
                return None

            try:
                message = json.loads(message_value)
            except ValueError as e:
                # Invalid JSON stops processing here; callers receive an
                # empty list as the "unparseable payload" result.
                logger.warning(
                    f"Discarding non-JSON message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {e}"
                )
                return []

            # Falsy JSON values ({}, [], 0, null, ...) are returned as-is
            # without calling any downstream service.
            if message:
                logger.info(
                    f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message}"
                )
                current_topic = msg.topic()
                if current_topic == "PROCESS_PAYMENT":
                    KafkaIntegration._call_disbursement_service(message)
                elif current_topic == "LOAN_REPAYMENT":
                    KafkaIntegration._call_collect_loan_service(message)
                    logger.info(
                        f"Loan Repayment message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message}"
                    )

            return message

        except Exception as e:
            logger.error(f"Error while receiving message: {e}")
            raise

    @staticmethod
    def close_consumer():
        """Shut down the shared consumer, if one was ever created.

        The cached consumer is cleared so that a later ``_get_consumer()``
        call builds a fresh one instead of reusing a closed instance.
        """
        consumer = KafkaIntegration._consumer
        if consumer is None:
            # Nothing to close; avoid connecting to the broker just to
            # disconnect again.
            return
        try:
            consumer.close()
        finally:
            KafkaIntegration._consumer = None
        logger.info("Kafka consumer closed")

    @staticmethod
    def _call_disbursement_service(message):
        """Call the disbursement service with the received message.

        Best-effort: failures are logged and swallowed so that the consumer
        keeps running.

        :param message: The decoded message, passed to
            ``SimbrellaClient.disburse_loan`` unchanged
        """
        logger.info(f"Calling disbursement service with message: {message}")
        try:
            response = SimbrellaClient.disburse_loan(message)
            logger.info(
                f"Successfully sent message to disbursement service: {response}"
            )
            # TODO(review): the original carried a disabled call to
            # LoanService.set_disbursement_date(loan_id=..., customer_id=...)
            # with the note "must mark it on way out" -- confirm whether the
            # disbursement date still needs to be recorded here.
        except Exception as e:
            # Logged at error level to match _call_collect_loan_service.
            logger.error(f"Failed to call disbursement service: {e}")

    @staticmethod
    def _call_collect_loan_service(message):
        """Call the collect loan service with the received message.

        Best-effort: failures are logged and swallowed so that the consumer
        keeps running.

        :param message: The decoded message, passed to
            ``SimbrellaClient.collect_loan_user_initiated`` unchanged
        """
        logger.info(f"Calling collect_loan service with message: {message}")
        try:
            # Example payload: {'transactionId': 'TRCVIC85641527829',
            # 'customerId': 'ZX48440946', 'productId': 'AMPC',
            # 'loanRef': 'TRCVIC85641527829USSDAMPC', 'debtId': '014231'}
            response = SimbrellaClient.collect_loan_user_initiated(message)
            logger.info(
                f"Successfully sent message to collect_loan service: {response}"
            )
        except Exception as e:
            logger.error(f"Failed to call collect_loan service: {e}")