from flask import request, jsonify
from marshmallow import ValidationError
from app.api.integrations.kafka import KafkaIntegration
from app.api.services.base_service import BaseService
from app.api.enums import TransactionType
from app.utils.logger import logger
from app.api.schemas.provide_loan import ProvideLoanSchema
from threading import Thread
from app.models.loan import Loan
from app.api.enums import LoanStatus
from app.extensions import db


class ProvideLoanService(BaseService):
    """Service that handles ProvideLoan requests."""

    TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN

    @staticmethod
    def process_request(data):
        """
        Process the ProvideLoan request.

        Validates the payload, checks account ownership, stores the loan and
        logs the transaction inside a single database transaction. Only after
        that transaction has been committed is the payment message handed to
        a background thread for delivery to Kafka.

        Args:
            data (dict): The request data.

        Returns:
            dict: The response payload on success.
            tuple: ``(flask.Response, int)`` carrying a JSON ``message`` and
                the HTTP status (400, 422 or 500) on failure.
        """
        try:
            # Any exception raised inside this block rolls the transaction
            # back; leaving the block normally commits it. Failure paths must
            # therefore *raise* rather than ``return``, otherwise a partially
            # processed loan would be committed alongside an error response.
            with db.session.begin():
                validated_data = ProvideLoanService.validate_data(
                    data, ProvideLoanSchema()
                )

                account_id = validated_data.get('accountId')
                customer_id = validated_data.get('customerId')
                request_id = validated_data.get('requestId')
                transaction_id = validated_data.get('transactionId')

                if not ProvideLoanService.validate_account_ownership(
                    account_id=account_id, customer_id=customer_id
                ):
                    raise ValueError("Invalid Customer or Account")

                # Save the loan details
                loan = Loan.create_loan(
                    customer_id=customer_id,
                    account_id=account_id,
                    offer_id=validated_data.get('offerId'),
                    principal_amount=validated_data.get('requestedAmount'),
                    status=LoanStatus.ACTIVE,
                )
                if not loan:
                    raise ValueError("Failed to save loan details.")

                # Flush so that loan.id is populated before it is referenced
                # by the transaction log entry.
                db.session.flush()

                validated_data['refId'] = loan.id
                validated_data['refModel'] = "loan"

                # Log Transaction
                transaction = ProvideLoanService.log_transaction(
                    validated_data=validated_data
                )
                if not transaction:
                    raise ValueError("Failed to log transaction.")

            # The transaction is committed at this point.
            response_data = {
                "requestId": request_id,
                "transactionId": transaction_id,
                "customerId": customer_id,
                "accountId": account_id,
                # NOTE(review): msisdn is hard-coded; confirm where the real
                # subscriber number should come from.
                "msisdn": "3451342",
                "resultCode": "00",
                "resultDescription": "Successful",
            }

            # Call Kafka in a background thread. This is started only after
            # the commit so that no payment message is emitted for a loan
            # that failed to persist.
            thread = Thread(
                target=ProvideLoanService.async_send_to_kafka,
                args=(response_data, request_id, "PROCESS_PAYMENT"),
            )
            thread.start()

            return response_data

        except ValidationError as err:
            logger.error(
                "Validation Error: %s", getattr(err, 'messages', str(err))
            )
            db.session.rollback()
            return jsonify({"message": "Validation exception"}), 422

        except ValueError as err:
            logger.error("%s", getattr(err, 'messages', str(err)))
            db.session.rollback()
            return jsonify({"message": str(err)}), 400

        except Exception as e:
            logger.error("An error occurred: %s", str(e), exc_info=True)
            db.session.rollback()
            return jsonify({"message": "Internal Server Error"}), 500