diff --git a/app/__init__.py b/app/__init__.py index 1a6ef13..1530810 100644 --- a/app/__init__.py +++ b/app/__init__.py @@ -26,6 +26,7 @@ def create_app(): CORS(app) JWTManager(app) + CORS(app, supports_credentials=True) # Swagger Doc SWAGGER_URL = app.config.get("SWAGGER_URL") diff --git a/app/api/integrations/__init__.py b/app/api/integrations/__init__.py index 54ad10b..0fce6e8 100644 --- a/app/api/integrations/__init__.py +++ b/app/api/integrations/__init__.py @@ -1 +1,2 @@ -from .simbrella import SimbrellaClient \ No newline at end of file +from .simbrella import SimbrellaIntegration +from .kafka import KafkaIntegration \ No newline at end of file diff --git a/app/api/integrations/kafka.py b/app/api/integrations/kafka.py index a7774e2..610ad1e 100644 --- a/app/api/integrations/kafka.py +++ b/app/api/integrations/kafka.py @@ -21,23 +21,26 @@ class KafkaIntegration: """Kafka producer""" if not KafkaIntegration._producer: KafkaIntegration._producer = Producer(KafkaIntegration._config) + logger.info( f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" ) return KafkaIntegration._producer + @staticmethod def delivery_report(err, msg): """Called once for each message produced""" if err is not None: - logger.error(f"Message delivery failed: {err}") - raise RuntimeError(f"Message delivery failed: {err}") + logger.error(f'Message delivery failed: {err}') + raise RuntimeError(f"Message delivery failed: {err}") + else: - logger.debug( - f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" - ) + logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') + + @staticmethod def send_loan_request(loan_data, request_id): @@ -59,6 +62,7 @@ class KafkaIntegration: key=str(request_id), value=json.dumps(loan_data).encode("utf-8"), callback=KafkaIntegration.delivery_report, + ) producer.poll(0) diff --git a/app/api/integrations/simbrella.py b/app/api/integrations/simbrella.py index 7b6296a..d933908 100644 --- a/app/api/integrations/simbrella.py +++ b/app/api/integrations/simbrella.py @@ -2,7 +2,7 @@ import requests from app.utils.logger import logger from app.config import settings -class SimbrellaClient: +class SimbrellaIntegration: BASE_URL = settings.SIMBRELLA_BASE_URL @staticmethod @@ -10,7 +10,7 @@ class SimbrellaClient: """ Calls the RACCheck endpoit """ - url = f"{SimbrellaClient.BASE_URL}/RACCheck" + url = f"{SimbrellaIntegration.BASE_URL}/RACCheck" payload = { "customerId": customer_id, @@ -37,10 +37,9 @@ class SimbrellaClient: response = requests.post(url, json=payload, timeout=10) # Raise an error for non-200 responses - # response.raise_for_status() - + response.raise_for_status() return response.json() except requests.exceptions.RequestException as err: logger.error(f"RACCheck API call failed: {str(err)}", exc_info=True) - return {"error": "RACCheck API error", "details": str(err)} + return {"error": "RACCheck API error"} diff --git a/app/api/middlewares/app_id_checker.py b/app/api/middlewares/app_id_checker.py index b674aa9..8f45a45 100644 --- a/app/api/middlewares/app_id_checker.py +++ b/app/api/middlewares/app_id_checker.py @@ -15,12 +15,12 @@ def require_app_id(f): if not app_id: logger.error("Unauthorized access: Missing App-ID.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 if app_id != VALID_APP_ID: logger.error(f"Unauthorized access: Invalid App-ID {app_id}.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 return f(*args, **kwargs) diff --git a/app/api/middlewares/basic_auth.py b/app/api/middlewares/basic_auth.py index 00a8771..fb0e8aa 100644 --- a/app/api/middlewares/basic_auth.py +++ b/app/api/middlewares/basic_auth.py @@ -11,7 +11,7 @@ def require_auth(f): def decorated(*args, **kwargs): auth = request.headers.get('Authorization') if not auth or not check_auth(auth): - return jsonify({"message": "Invalid request parameters"}), 401 + return jsonify({"message": "Invalid request"}), 401 return f(*args, **kwargs) return decorated diff --git a/app/api/middlewares/cors.py b/app/api/middlewares/cors.py index 7df0844..9964c15 100644 --- a/app/api/middlewares/cors.py +++ b/app/api/middlewares/cors.py @@ -4,4 +4,4 @@ from flask import request, jsonify def enforce_json(): """Middleware to enforce JSON Content-Type for incoming requests""" if request.method in ["POST", "PUT", "PATCH"] and request.content_type != "application/json": - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 diff --git a/app/api/middlewares/verify_api_key.py b/app/api/middlewares/verify_api_key.py index b395fc4..57a6fee 100644 --- a/app/api/middlewares/verify_api_key.py +++ b/app/api/middlewares/verify_api_key.py @@ -14,11 +14,11 @@ def require_api_key(f): if not api_key: logger.error("Unauthorized access: Missing API key.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 if api_key != VALID_API_KEY: logger.error("Unauthorized access: Invalid API key.") - return jsonify({"message": "Invalid request parameters"}), 400 + return jsonify({"message": "Invalid request"}), 400 return f(*args, **kwargs) diff --git a/app/api/services/eligibility_check.py b/app/api/services/eligibility_check.py index c5a5d15..f15a50e 100644 --- a/app/api/services/eligibility_check.py +++ b/app/api/services/eligibility_check.py @@ -4,7 +4,7 @@ from app.api.services.base_service import BaseService from app.api.schemas.eligibility_check import EligibilityCheckSchema from marshmallow import ValidationError from app.api.enums import TransactionType -from app.api.integrations import SimbrellaClient +from app.api.integrations import SimbrellaIntegration class EligibilityCheckService(BaseService): TRANSACTION_TYPE = TransactionType.ELIGIBILITY_CHECK @@ -42,14 +42,14 @@ class EligibilityCheckService(BaseService): }), 400 # Call RACCheck - response = SimbrellaClient.rac_check( + response = SimbrellaIntegration.rac_check( customer_id = customer_id, account_id = account_id, transaction_id = transaction.id, ) if "error" in response or response.get("status") != 200: - return jsonify({"message": "RACCheck failed", "error": response.get("message", response)}), 400 + return jsonify({"message": "RACCheck failed"}), 400 diff --git a/app/api/services/loan_status.py b/app/api/services/loan_status.py index 95d5048..cdd161c 100644 --- a/app/api/services/loan_status.py +++ b/app/api/services/loan_status.py @@ -22,10 +22,11 @@ class LoanStatusService(BaseService): """ try: validated_data = LoanStatusService.validate_data(data, LoanStatusSchema()) - account_id = validated_data.get('accountId') customer_id = validated_data.get('customerId') + customer = LoanStatusService.get_or_create_customer(validated_data) + account = customer.accounts[0] - if (LoanStatusService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): + if (LoanStatusService.validate_account_ownership(account_id = account.id, customer_id = customer_id)): transaction = LoanStatusService.log_transaction(validated_data = validated_data) if not transaction: diff --git a/app/api/services/provide_loan.py b/app/api/services/provide_loan.py index 49dcefe..b70f587 100644 --- a/app/api/services/provide_loan.py +++ b/app/api/services/provide_loan.py @@ -1,9 +1,13 @@ from flask import request, jsonify from marshmallow import ValidationError +from app.api.integrations.kafka import KafkaIntegration from app.api.services.base_service import BaseService from app.api.enums import TransactionType from app.utils.logger import logger -from app.api.schemas.provide_loan import ProvideLoanSchema +from app.api.schemas.provide_loan import ProvideLoanSchema +from app.api.integrations import KafkaIntegration +from threading import Thread + class ProvideLoanService(BaseService): TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN @@ -24,6 +28,7 @@ class ProvideLoanService(BaseService): validated_data = ProvideLoanService.validate_data(data, ProvideLoanSchema()) account_id = validated_data.get('accountId') customer_id = validated_data.get('customerId') + request_id = validated_data.get('requestId') if (ProvideLoanService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): transaction = ProvideLoanService.log_transaction(validated_data = validated_data) @@ -40,16 +45,20 @@ class ProvideLoanService(BaseService): response_data = { - "requestId": "202111170001371256908", + "requestId": request_id, "transactionId": "Tr201712RK9232P115", - "customerId": "CN621868", - "accountId": "ACN8263457", + "customerId": customer_id, + "accountId": account_id, "msisdn": "3451342", "resultCode": "00", "resultDescription": "Successful" } + # KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id) + # Call Kafka in a background thread + thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id)) + thread.start() return response_data @@ -72,4 +81,11 @@ class ProvideLoanService(BaseService): logger.error(f"An error occurred: {str(e)}", exc_info=True) return jsonify({ "message": "Internal Server Error" - }) , 500 \ No newline at end of file + }) , 500 + + + def async_send_to_kafka(loan_data, request_id): + KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id) + KafkaIntegration.flush() + + diff --git a/app/config.py b/app/config.py index 6e1c431..b23548a 100644 --- a/app/config.py +++ b/app/config.py @@ -24,11 +24,15 @@ class Config: SQLALCHEMY_TRACK_MODIFICATIONS = False SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337") + JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "secret-key") JWT_ACCESS_TOKEN_EXPIRES = os.getenv("JWT_ACCESS_TOKEN_EXPIRES", timedelta(hours=1)) JWT_REFRESH_TOKEN_EXPIRES = os.getenv( "JWT_REFRESH_TOKEN_EXPIRES", timedelta(days=30) ) + KAFKA_BROKER = 'dev-events.simbrellang.net:9085' + KAFKA_PAYMENT_TOPIC = 'PROCESS_PAYMENT' + settings = Config() diff --git a/app/models/account.py b/app/models/account.py index 95ddbc4..8c94098 100644 --- a/app/models/account.py +++ b/app/models/account.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from sqlalchemy.orm import relationship from app.extensions import db class Account(db.Model): @@ -12,6 +13,13 @@ class Account(db.Model): created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc)) updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc)) + customer = relationship( + "Customer", + primaryjoin="Customer.id == Account.customer_id", + foreign_keys=[customer_id], + back_populates="accounts", + ) + @classmethod def create_account(cls, id, customer_id, account_type, status='active'): account = cls( diff --git a/app/models/customer.py b/app/models/customer.py index a41efe3..2cb9c5d 100644 --- a/app/models/customer.py +++ b/app/models/customer.py @@ -1,4 +1,5 @@ from datetime import datetime, timezone +from sqlalchemy.orm import relationship from app.extensions import db from app.models.account import Account @@ -11,6 +12,13 @@ class Customer(db.Model): created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc)) updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc)) + accounts = relationship( + "Account", + primaryjoin="Customer.id == Account.customer_id", + foreign_keys="Account.customer_id", + back_populates="customer", + ) + @classmethod def is_eligible(cls, customer_id): customer = cls.query.filter_by(id=customer_id).first() diff --git a/app/swagger/paths/CustomerConsent.json b/app/swagger/paths/CustomerConsent.json index c6da51c..f246704 100644 --- a/app/swagger/paths/CustomerConsent.json +++ b/app/swagger/paths/CustomerConsent.json @@ -43,7 +43,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/EligibilityCheck.json b/app/swagger/paths/EligibilityCheck.json index 9afbf64..f2142b1 100644 --- a/app/swagger/paths/EligibilityCheck.json +++ b/app/swagger/paths/EligibilityCheck.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/LoanStatus.json b/app/swagger/paths/LoanStatus.json index fbfd0c4..59d165b 100644 --- a/app/swagger/paths/LoanStatus.json +++ b/app/swagger/paths/LoanStatus.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/NotificationCallback.json b/app/swagger/paths/NotificationCallback.json index ed902ef..26af904 100644 --- a/app/swagger/paths/NotificationCallback.json +++ b/app/swagger/paths/NotificationCallback.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/ProvideLoan.json b/app/swagger/paths/ProvideLoan.json index 5c2a4e7..e875312 100644 --- a/app/swagger/paths/ProvideLoan.json +++ b/app/swagger/paths/ProvideLoan.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/Repayment.json b/app/swagger/paths/Repayment.json index 68fafa5..e88d18c 100644 --- a/app/swagger/paths/Repayment.json +++ b/app/swagger/paths/Repayment.json @@ -43,7 +43,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/app/swagger/paths/SelectOffer.json b/app/swagger/paths/SelectOffer.json index 4c7069d..06c20bd 100644 --- a/app/swagger/paths/SelectOffer.json +++ b/app/swagger/paths/SelectOffer.json @@ -44,7 +44,7 @@ } }, "400": { - "description": "Invalid request parameters" + "description": "Invalid request" }, "422": { "description": "Validation exception" diff --git a/requirements.txt b/requirements.txt index e322d87..9d5c945 100644 --- a/requirements.txt +++ b/requirements.txt @@ -30,3 +30,7 @@ requests # JWT flask-jwt-extended + +# Kafka +confluent-kafka==1.9.2 +