diff --git a/app/__init__.py b/app/__init__.py index d2fab1c..511caa4 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -17,7 +17,7 @@ def create_app(): app.config.from_object(Config) - CORS(app) + CORS(app, supports_credentials=True) # Swagger Doc SWAGGER_URL = app.config.get("SWAGGER_URL") diff --git a/app/api/integrations/kafka.py b/app/api/integrations/kafka.py index c03b81e..392f905 100644 --- a/app/api/integrations/kafka.py +++ b/app/api/integrations/kafka.py @@ -11,7 +11,8 @@ class KafkaIntegration: 'bootstrap.servers': settings.KAFKA_BROKER, 'client.id': 'loan-service-producer', 'acks': 'all', - 'retries': 3 + 'retries': 3, + 'debug': 'broker,topic,msg' } @@ -31,7 +32,6 @@ class KafkaIntegration: """Called once for each message produced""" if err is not None: logger.error(f'Message delivery failed: {err}') - raise RuntimeError(f"Message delivery failed: {err}") else: @@ -62,7 +62,9 @@ class KafkaIntegration: value=json.dumps(loan_data).encode('utf-8'), callback=KafkaIntegration.delivery_report ) + producer.poll(0) + logger.info(f"Loan request {request_id} queued for processing") except Exception as e: diff --git a/app/api/integrations/simbrella.py b/app/api/integrations/simbrella.py index 286ee9f..d933908 100644 --- a/app/api/integrations/simbrella.py +++ b/app/api/integrations/simbrella.py @@ -37,9 +37,9 @@ class SimbrellaIntegration: response = requests.post(url, json=payload, timeout=10) # Raise an error for non-200 responses - # response.raise_for_status() + response.raise_for_status() return response.json() except requests.exceptions.RequestException as err: logger.error(f"RACCheck API call failed: {str(err)}", exc_info=True) - return {"error": "RACCheck API error", "details": str(err)} + return {"error": "RACCheck API error"} diff --git a/app/api/services/eligibility_check.py b/app/api/services/eligibility_check.py index de691fe..f15a50e 100644 --- a/app/api/services/eligibility_check.py +++ b/app/api/services/eligibility_check.py @@ -49,7 +49,7 @@ class EligibilityCheckService(BaseService): ) if "error" in response or response.get("status") != 200: - return jsonify({"message": "RACCheck failed", "error": response.get("message", response)}), 400 + return jsonify({"message": "RACCheck failed"}), 400 diff --git a/app/api/services/provide_loan.py b/app/api/services/provide_loan.py index 4c37218..b70f587 100644 --- a/app/api/services/provide_loan.py +++ b/app/api/services/provide_loan.py @@ -6,6 +6,8 @@ from app.api.enums import TransactionType from app.utils.logger import logger from app.api.schemas.provide_loan import ProvideLoanSchema from app.api.integrations import KafkaIntegration +from threading import Thread + class ProvideLoanService(BaseService): TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN @@ -53,7 +55,10 @@ class ProvideLoanService(BaseService): } - response = KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id) + # KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id) + # Call Kafka in a background thread + thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id)) + thread.start() return response_data @@ -78,3 +83,9 @@ class ProvideLoanService(BaseService): "message": "Internal Server Error" }) , 500 + + def async_send_to_kafka(loan_data, request_id): + KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id) + KafkaIntegration.flush() + + diff --git a/requirements.txt b/requirements.txt index 8e2c02f..143f16e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,5 +28,5 @@ python-dotenv requests # Kafka -confluent-kafka +confluent-kafka==1.9.2