[add]: threading for Kafa integration
This commit is contained in:
+1
-1
@@ -17,7 +17,7 @@ def create_app():
|
||||
app.config.from_object(Config)
|
||||
|
||||
|
||||
CORS(app)
|
||||
CORS(app, supports_credentials=True)
|
||||
|
||||
# Swagger Doc
|
||||
SWAGGER_URL = app.config.get("SWAGGER_URL")
|
||||
|
||||
@@ -11,7 +11,8 @@ class KafkaIntegration:
|
||||
'bootstrap.servers': settings.KAFKA_BROKER,
|
||||
'client.id': 'loan-service-producer',
|
||||
'acks': 'all',
|
||||
'retries': 3
|
||||
'retries': 3,
|
||||
'debug': 'broker,topic,msg'
|
||||
}
|
||||
|
||||
|
||||
@@ -31,7 +32,6 @@ class KafkaIntegration:
|
||||
"""Called once for each message produced"""
|
||||
if err is not None:
|
||||
logger.error(f'Message delivery failed: {err}')
|
||||
|
||||
raise RuntimeError(f"Message delivery failed: {err}")
|
||||
|
||||
else:
|
||||
@@ -62,7 +62,9 @@ class KafkaIntegration:
|
||||
value=json.dumps(loan_data).encode('utf-8'),
|
||||
callback=KafkaIntegration.delivery_report
|
||||
)
|
||||
|
||||
producer.poll(0)
|
||||
|
||||
logger.info(f"Loan request {request_id} queued for processing")
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -37,9 +37,9 @@ class SimbrellaIntegration:
|
||||
response = requests.post(url, json=payload, timeout=10)
|
||||
|
||||
# Raise an error for non-200 responses
|
||||
# response.raise_for_status()
|
||||
response.raise_for_status()
|
||||
|
||||
return response.json()
|
||||
except requests.exceptions.RequestException as err:
|
||||
logger.error(f"RACCheck API call failed: {str(err)}", exc_info=True)
|
||||
return {"error": "RACCheck API error", "details": str(err)}
|
||||
return {"error": "RACCheck API error"}
|
||||
|
||||
@@ -49,7 +49,7 @@ class EligibilityCheckService(BaseService):
|
||||
)
|
||||
|
||||
if "error" in response or response.get("status") != 200:
|
||||
return jsonify({"message": "RACCheck failed", "error": response.get("message", response)}), 400
|
||||
return jsonify({"message": "RACCheck failed"}), 400
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -6,6 +6,8 @@ from app.api.enums import TransactionType
|
||||
from app.utils.logger import logger
|
||||
from app.api.schemas.provide_loan import ProvideLoanSchema
|
||||
from app.api.integrations import KafkaIntegration
|
||||
from threading import Thread
|
||||
|
||||
|
||||
class ProvideLoanService(BaseService):
|
||||
TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN
|
||||
@@ -53,7 +55,10 @@ class ProvideLoanService(BaseService):
|
||||
}
|
||||
|
||||
|
||||
response = KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id)
|
||||
# KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id)
|
||||
# Call Kafka in a background thread
|
||||
thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id))
|
||||
thread.start()
|
||||
|
||||
return response_data
|
||||
|
||||
@@ -78,3 +83,9 @@ class ProvideLoanService(BaseService):
|
||||
"message": "Internal Server Error"
|
||||
}) , 500
|
||||
|
||||
|
||||
def async_send_to_kafka(loan_data, request_id):
|
||||
KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id)
|
||||
KafkaIntegration.flush()
|
||||
|
||||
|
||||
|
||||
+1
-1
@@ -28,5 +28,5 @@ python-dotenv
|
||||
requests
|
||||
|
||||
# Kafka
|
||||
confluent-kafka
|
||||
confluent-kafka==1.9.2
|
||||
|
||||
|
||||
Reference in New Issue
Block a user