diff --git a/app/api/integrations/__init__.py b/app/api/integrations/__init__.py index 54ad10b..0fce6e8 100644 --- a/app/api/integrations/__init__.py +++ b/app/api/integrations/__init__.py @@ -1 +1,2 @@ -from .simbrella import SimbrellaClient \ No newline at end of file +from .simbrella import SimbrellaIntegration +from .kafka import KafkaIntegration \ No newline at end of file diff --git a/app/api/integrations/kafka.py b/app/api/integrations/kafka.py new file mode 100644 index 0000000..c03b81e --- /dev/null +++ b/app/api/integrations/kafka.py @@ -0,0 +1,77 @@ +from confluent_kafka import Producer +import json +import logging +from app.config import settings + +logger = logging.getLogger(__name__) + +class KafkaIntegration: + _producer = None + _config = { + 'bootstrap.servers': settings.KAFKA_BROKER, + 'client.id': 'loan-service-producer', + 'acks': 'all', + 'retries': 3 + } + + + @staticmethod + def _get_producer(): + """Kafka producer""" + if not KafkaIntegration._producer: + KafkaIntegration._producer = Producer(KafkaIntegration._config) + logger.info(f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}") + + return KafkaIntegration._producer + + + + @staticmethod + def delivery_report(err, msg): + """Called once for each message produced""" + if err is not None: + logger.error(f'Message delivery failed: {err}') + + raise RuntimeError(f"Message delivery failed: {err}") + + else: + logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') + + + + + @staticmethod + def send_loan_request(loan_data, request_id): + """ + Send loan request to PROCESS_PAYMENT topic + + Args: + loan_data: Loan request payload as dict + request_id: Unique request identifier (used as Kafka key) + """ + try: + + + # Proceed to send loan request to Kafka + producer = KafkaIntegration._get_producer() + + # Sending loan request message to Kafka + producer.produce( + topic="PROCESS_PAYMENT", + key=str(request_id), + value=json.dumps(loan_data).encode('utf-8'), + callback=KafkaIntegration.delivery_report + ) + producer.poll(0) + logger.info(f"Loan request {request_id} queued for processing") + + except Exception as e: + logger.error(f"Failed to send loan request to Kafka: {str(e)}", exc_info=True) + raise Exception(f"Failed to send loan request to Kafka: {str(e)}") + + + @staticmethod + def flush(): + """Shutdown""" + producer = KafkaIntegration._get_producer() + producer.flush() diff --git a/app/api/integrations/simbrella.py b/app/api/integrations/simbrella.py index 7b6296a..286ee9f 100644 --- a/app/api/integrations/simbrella.py +++ b/app/api/integrations/simbrella.py @@ -2,7 +2,7 @@ import requests from app.utils.logger import logger from app.config import settings -class SimbrellaClient: +class SimbrellaIntegration: BASE_URL = settings.SIMBRELLA_BASE_URL @staticmethod @@ -10,7 +10,7 @@ class SimbrellaClient: """ Calls the RACCheck endpoit """ - url = f"{SimbrellaClient.BASE_URL}/RACCheck" + url = f"{SimbrellaIntegration.BASE_URL}/RACCheck" payload = { "customerId": customer_id, @@ -39,7 +39,6 @@ class SimbrellaClient: # Raise an error for non-200 responses # response.raise_for_status() - return response.json() except requests.exceptions.RequestException as err: logger.error(f"RACCheck API call failed: {str(err)}", exc_info=True) diff --git a/app/api/middlewares/app_id_checker.py b/app/api/middlewares/app_id_checker.py index b674aa9..8f45a45 100644 --- a/app/api/middlewares/app_id_checker.py +++ b/app/api/middlewares/app_id_checker.py @@ -15,12 +15,12 @@ def require_app_id(f): if not app_id: logger.error("Unauthorized access: Missing App-ID.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 if app_id != VALID_APP_ID: logger.error(f"Unauthorized access: Invalid App-ID {app_id}.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 return f(*args, **kwargs) diff --git a/app/api/middlewares/basic_auth.py b/app/api/middlewares/basic_auth.py index 00a8771..fb0e8aa 100644 --- a/app/api/middlewares/basic_auth.py +++ b/app/api/middlewares/basic_auth.py @@ -11,7 +11,7 @@ def require_auth(f): def decorated(*args, **kwargs): auth = request.headers.get('Authorization') if not auth or not check_auth(auth): - return jsonify({"message": "Invalid request parameters"}), 401 + return jsonify({"message": "Invalid request"}), 401 return f(*args, **kwargs) return decorated diff --git a/app/api/middlewares/cors.py b/app/api/middlewares/cors.py index 7df0844..9964c15 100644 --- a/app/api/middlewares/cors.py +++ b/app/api/middlewares/cors.py @@ -4,4 +4,4 @@ from flask import request, jsonify def enforce_json(): """Middleware to enforce JSON Content-Type for incoming requests""" if request.method in ["POST", "PUT", "PATCH"] and request.content_type != "application/json": - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 diff --git a/app/api/middlewares/verify_api_key.py b/app/api/middlewares/verify_api_key.py index b395fc4..57a6fee 100644 --- a/app/api/middlewares/verify_api_key.py +++ b/app/api/middlewares/verify_api_key.py @@ -14,11 +14,11 @@ def require_api_key(f): if not api_key: logger.error("Unauthorized access: Missing API key.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 if api_key != VALID_API_KEY: logger.error("Unauthorized access: Invalid API key.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 return f(*args, **kwargs) diff --git a/app/api/services/eligibility_check.py b/app/api/services/eligibility_check.py index c5a5d15..de691fe 100644 --- a/app/api/services/eligibility_check.py +++ b/app/api/services/eligibility_check.py @@ -4,7 +4,7 @@ from app.api.services.base_service import BaseService from app.api.schemas.eligibility_check import EligibilityCheckSchema from marshmallow import ValidationError from app.api.enums import TransactionType -from app.api.integrations import SimbrellaClient +from app.api.integrations import SimbrellaIntegration class EligibilityCheckService(BaseService): TRANSACTION_TYPE = TransactionType.ELIGIBILITY_CHECK @@ -42,7 +42,7 @@ class EligibilityCheckService(BaseService): }), 400 # Call RACCheck - response = SimbrellaClient.rac_check( + response = SimbrellaIntegration.rac_check( customer_id = customer_id, account_id = account_id, transaction_id = transaction.id, diff --git a/app/api/services/loan_status.py b/app/api/services/loan_status.py index 95d5048..cdd161c 100644 --- a/app/api/services/loan_status.py +++ b/app/api/services/loan_status.py @@ -22,10 +22,11 @@ class LoanStatusService(BaseService): """ try: validated_data = LoanStatusService.validate_data(data, LoanStatusSchema()) - account_id = validated_data.get('accountId') customer_id = validated_data.get('customerId') + customer = LoanStatusService.get_or_create_customer(validated_data) + account = customer.accounts[0] - if (LoanStatusService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): + if (LoanStatusService.validate_account_ownership(account_id = account.id, customer_id = customer_id)): transaction = LoanStatusService.log_transaction(validated_data = validated_data) if not transaction: diff --git a/app/api/services/provide_loan.py b/app/api/services/provide_loan.py index 49dcefe..4c37218 100644 --- a/app/api/services/provide_loan.py +++ b/app/api/services/provide_loan.py @@ -1,9 +1,11 @@ from flask import request, jsonify from marshmallow import ValidationError +from app.api.integrations.kafka import KafkaIntegration from app.api.services.base_service import BaseService from app.api.enums import TransactionType from app.utils.logger import logger -from app.api.schemas.provide_loan import ProvideLoanSchema +from app.api.schemas.provide_loan import ProvideLoanSchema +from app.api.integrations import KafkaIntegration class ProvideLoanService(BaseService): TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN @@ -24,6 +26,7 @@ class ProvideLoanService(BaseService): validated_data = ProvideLoanService.validate_data(data, ProvideLoanSchema()) account_id = validated_data.get('accountId') customer_id = validated_data.get('customerId') + request_id = validated_data.get('requestId') if (ProvideLoanService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): transaction = ProvideLoanService.log_transaction(validated_data = validated_data) @@ -40,16 +43,17 @@ class ProvideLoanService(BaseService): response_data = { - "requestId": "202111170001371256908", + "requestId": request_id, "transactionId": "Tr201712RK9232P115", - "customerId": "CN621868", - "accountId": "ACN8263457", + "customerId": customer_id, + "accountId": account_id, "msisdn": "3451342", "resultCode": "00", "resultDescription": "Successful" } + response = KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id) return response_data @@ -72,4 +76,5 @@ class ProvideLoanService(BaseService): logger.error(f"An error occurred: {str(e)}", exc_info=True) return jsonify({ "message": "Internal Server Error" - }) , 500 \ No newline at end of file + }) , 500 + diff --git a/app/config.py b/app/config.py index fb66b2d..57f9f63 100644 --- a/app/config.py +++ b/app/config.py @@ -25,5 +25,8 @@ class Config: SQLALCHEMY_TRACK_MODIFICATIONS = False SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337") + KAFKA_BROKER = 'dev-events.simbrellang.net:9085' + KAFKA_PAYMENT_TOPIC = 'PROCESS_PAYMENT' + settings = Config() \ No newline at end of file diff --git a/app/models/account.py b/app/models/account.py index 95ddbc4..8c94098 100644 --- a/app/models/account.py +++ b/app/models/account.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from sqlalchemy.orm import relationship from app.extensions import db class Account(db.Model): @@ -12,6 +13,13 @@ class Account(db.Model): created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc)) updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc)) + customer = relationship( + "Customer", + primaryjoin="Customer.id == Account.customer_id", + foreign_keys=[customer_id], + back_populates="accounts", + ) + @classmethod def create_account(cls, id, customer_id, account_type, status='active'): account = cls( diff --git a/app/models/customer.py b/app/models/customer.py index a41efe3..2cb9c5d 100644 --- a/app/models/customer.py +++ b/app/models/customer.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from sqlalchemy.orm import relationship from app.extensions import db from app.models.account import Account @@ -11,6 +12,13 @@ class Customer(db.Model): created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc)) updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc)) + accounts = relationship( + "Account", + primaryjoin="Customer.id == Account.customer_id", + foreign_keys="Account.customer_id", + back_populates="customer", + ) + @classmethod def is_eligible(cls, customer_id): customer = cls.query.filter_by(id=customer_id).first() diff --git a/app/swagger/paths/CustomerConsent.json b/app/swagger/paths/CustomerConsent.json index c6da51c..f246704 100644 --- a/app/swagger/paths/CustomerConsent.json +++ b/app/swagger/paths/CustomerConsent.json @@ -43,7 +43,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/EligibilityCheck.json b/app/swagger/paths/EligibilityCheck.json index 9afbf64..f2142b1 100644 --- a/app/swagger/paths/EligibilityCheck.json +++ b/app/swagger/paths/EligibilityCheck.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/LoanStatus.json b/app/swagger/paths/LoanStatus.json index fbfd0c4..59d165b 100644 --- a/app/swagger/paths/LoanStatus.json +++ b/app/swagger/paths/LoanStatus.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/NotificationCallback.json b/app/swagger/paths/NotificationCallback.json index ed902ef..26af904 100644 --- a/app/swagger/paths/NotificationCallback.json +++ b/app/swagger/paths/NotificationCallback.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/ProvideLoan.json b/app/swagger/paths/ProvideLoan.json index 5c2a4e7..e875312 100644 --- a/app/swagger/paths/ProvideLoan.json +++ b/app/swagger/paths/ProvideLoan.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/Repayment.json b/app/swagger/paths/Repayment.json index 68fafa5..e88d18c 100644 --- a/app/swagger/paths/Repayment.json +++ b/app/swagger/paths/Repayment.json @@ -43,7 +43,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/SelectOffer.json b/app/swagger/paths/SelectOffer.json index 4c7069d..06c20bd 100644 --- a/app/swagger/paths/SelectOffer.json +++ b/app/swagger/paths/SelectOffer.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/requirements.txt b/requirements.txt index 7e47465..8e2c02f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -27,3 +27,6 @@ python-dotenv # Requests requests +# Kafka +confluent-kafka +