diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py index de1f661..c309263 100644 --- a/app/integrations/kafka.py +++ b/app/integrations/kafka.py @@ -42,7 +42,7 @@ class KafkaIntegration: ) @staticmethod - def receive_disbursement_messages(topic, timeout=1.0): + def receive_disbursement_messages(topic, timeout): """ Receive messages from a Kafka topic. @@ -56,25 +56,44 @@ class KafkaIntegration: logger.info( f"Waiting for messages from topic {topic} with this timeout: {timeout}..." ) - + message =[] try: msg = consumer.poll(timeout=timeout) + + logger.info(str(msg.value)) + + logger.info( + f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: " + ) + if msg is None: - logger.debug(f"No message received from topic {topic} within timeout") + logger.info(f"No message received from topic {topic} within timeout") return None if msg.error(): - logger.error(f"Consumer error: {msg.error()}") + logger.info(f"Consumer error: {msg.error()}") raise RuntimeError(f"Consumer error: {msg.error()}") # Decode and return the message value - message_value = msg.value().decode("utf-8") - logger.debug( + message_value= {"value":''} + if msg.value() != "": + message_value = msg.value().decode("utf-8") + logger.info( f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" ) - message = json.loads(message_value) if message_value else None + + # Invalid Json will stop this process + try: + message = json.loads(message_value) if message_value else None + except ValueError as e: + return message # Invalid JSON JUST TURN BACK HERE + else: + pass # valid json # Call the endpoint if provided if message: + logger.info( + f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message}" + ) KafkaIntegration._call_disbursement_endpoint(message) return message @@ -83,6 +102,8 @@ class KafkaIntegration: logger.error(f"Error while receiving message: {e}") raise + #return [] + @staticmethod def close_consumer(): """Shutdown consumer""" @@ -101,5 +122,5 @@ class KafkaIntegration: f"Successfully sent message to disbursement endpoint: {response.status_code}" ) except Exception as e: - logger.error(f"Failed to call disbursement endpoint: {e}") - raise + logger.info(f"Failed to call disbursement endpoint: {e}") + #raise diff --git a/app/integrations/simbrella.py b/app/integrations/simbrella.py index 26ce965..db8f6a7 100644 --- a/app/integrations/simbrella.py +++ b/app/integrations/simbrella.py @@ -7,20 +7,50 @@ from flask import jsonify class SimbrellaClient: BASE_URL = settings.BANK_CALL_BASE_URL + BANK_CALL_BASE_URL = settings.BANK_CALL_BASE_URL @staticmethod def disbursement(data): - - api_url = f"{SimbrellaClient.BASE_URL}/Disbursement" - + BANK_CALL_BASE_URL = "https://bank-emulator.dev.simbrellang.net" + api_url = f"{BANK_CALL_BASE_URL}/Disbursement" + logger.info(f"BANK_CALL_BASE_URL = {BANK_CALL_BASE_URL}") logger.info(f"Calling disbursement endpoint with data: {data}") - response = requests.post( - api_url, - json=data, - headers=get_headers() - ) + data1 ={ + "requestId": "7876786", + "transactionId": "T001", + "debtId": "273194670", + "customerId": "CN621868", + "accountId": "2017821799", + "productId": "101", + "provideAmount": 100000, + "collectAmountInterest": 5000, + "collectAmountMgtFee": 1000, + "collectAmountInsurance": 1000, + "collectAmountVAT": 75, + "countryId": "01", + "comment": "Testing LoanRequest" + } + try: + # response = requests.post( + # api_url, + # json=data1, + # headers=get_headers() + # ) - logger.info(f"Disbursement response: {response.json()}") + # headers = { + # 'Content-Type': 'application/json', + # 'x-api_key': f'{settings.VALID_API_KEY}', + # 'App-Id': f'{settings.VALID_APP_ID}' + # } + # + response = requests.post(api_url, json=data1, timeout=10, headers=get_headers()) + logger.info(f"Disbursement response: {response.json()}") + logger.info(f"Here is your disbursement data: {data1}") + except Exception as e: + logger.info(f"Failed to call disbursement endpoint: {e}") + #raise + return 0 - return jsonify(response.json()), response.status_code \ No newline at end of file + # return jsonify(response.json()), response.status_code + return 1 \ No newline at end of file diff --git a/app/utils/auth.py b/app/utils/auth.py index 5b8ea56..5daf068 100644 --- a/app/utils/auth.py +++ b/app/utils/auth.py @@ -2,8 +2,14 @@ from app.config import settings def get_headers(): + # return { + # "Content-Type": "application/json", + # "x-api_key": settings.BANK_CALL_API_KEY, + # "App-Id": settings.BANK_CALL_APP_ID, + # } return { "Content-Type": "application/json", - "x-api_key": settings.BANK_CALL_API_KEY, - "App-Id": settings.BANK_CALL_APP_ID, + "x-api-key": "test-api-key-12345", + "App-Id": "app1", } + diff --git a/wsgi.py b/wsgi.py index 32de55c..55e8617 100644 --- a/wsgi.py +++ b/wsgi.py @@ -6,19 +6,30 @@ from app.utils.logger import logger app = create_app() if __name__ != "__main__": - # - # kafka = KafkaIntegration() - # - # logger.info("Starting Kafka consumer...") - # while True: - # message = kafka.receive_disbursement_messages( - # topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT - # ) - # - # if message: - # logger.info(f"Processed message: {message}") - # else: - # logger.info("No message received within timeout") + + #Expose WSGI app instance for Gunicorn + wsgi_app = app + + kafka = KafkaIntegration() + + logger.info("Starting Kafka consumer...") + while True: + try: + + message = kafka.receive_disbursement_messages( + topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT + ) + + if message: + logger.info(f"Processed message: {message}") + else: + logger.info("No message received within timeout") + + + except Exception as e: + logger.error(f"Error while receiving message: {e}") + raise + # Expose WSGI app instance for Gunicorn - wsgi_app = app + # wsgi_app = app