diff --git a/app/api/enums/__init__.py b/app/api/enums/__init__.py index 2e3a1d8..deb3f94 100644 --- a/app/api/enums/__init__.py +++ b/app/api/enums/__init__.py @@ -1 +1,2 @@ -from .transaction_type import TransactionType \ No newline at end of file +from .transaction_type import TransactionType +from .loan_status import LoanStatus \ No newline at end of file diff --git a/app/api/enums/loan_status.py b/app/api/enums/loan_status.py new file mode 100644 index 0000000..dace234 --- /dev/null +++ b/app/api/enums/loan_status.py @@ -0,0 +1,6 @@ +from enum import Enum + +class LoanStatus(str, Enum): + PENDING = "pending" + ACTIVE = "active" + REPAID = "repaid" \ No newline at end of file diff --git a/app/api/integrations/kafka.py b/app/api/integrations/kafka.py index 610ad1e..fa3f8d1 100644 --- a/app/api/integrations/kafka.py +++ b/app/api/integrations/kafka.py @@ -43,9 +43,9 @@ class KafkaIntegration: @staticmethod - def send_loan_request(loan_data, request_id): + def send_loan_request(loan_data, request_id, topic): """ - Send loan request to PROCESS_PAYMENT topic + Send loan request to topic Args: loan_data: Loan request payload as dict @@ -58,7 +58,7 @@ class KafkaIntegration: # Sending loan request message to Kafka producer.produce( - topic="PROCESS_PAYMENT", + topic=topic, key=str(request_id), value=json.dumps(loan_data).encode("utf-8"), callback=KafkaIntegration.delivery_report, diff --git a/app/api/services/base_service.py b/app/api/services/base_service.py index db4bc64..aa678e0 100644 --- a/app/api/services/base_service.py +++ b/app/api/services/base_service.py @@ -3,6 +3,7 @@ from app.api.enums import TransactionType from flask import jsonify from marshmallow import ValidationError import logging +from app.api.integrations import KafkaIntegration logger = logging.getLogger(__name__) @@ -53,3 +54,8 @@ class BaseService: type=cls.TRANSACTION_TYPE, channel=validated_data.get("channel"), ) + + @classmethod + def async_send_to_kafka(cls, loan_data, request_id, topic): + KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id, topic = topic) + KafkaIntegration.flush() diff --git a/app/api/services/provide_loan.py b/app/api/services/provide_loan.py index 214e58e..e268a92 100644 --- a/app/api/services/provide_loan.py +++ b/app/api/services/provide_loan.py @@ -5,10 +5,9 @@ from app.api.services.base_service import BaseService from app.api.enums import TransactionType from app.utils.logger import logger from app.api.schemas.provide_loan import ProvideLoanSchema -from app.api.integrations import KafkaIntegration from threading import Thread from app.models.loan import Loan - +from app.api.enums import LoanStatus class ProvideLoanService(BaseService): TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN @@ -34,6 +33,22 @@ class ProvideLoanService(BaseService): if (ProvideLoanService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): + # Save the loan details + loan = Loan.create_loan( + customer_id=customer_id, + account_id=account_id, + offer_id=validated_data.get('offerId'), + principal_amount=validated_data.get('requestedAmount'), + status=LoanStatus.ACTIVE + ) + + if not loan: + logger.error(f"Failed to save loan details") + return jsonify({ + "message": "Failed to save loan details." + }), 400 + + # Log Transaction transaction = ProvideLoanService.log_transaction(validated_data = validated_data) if not transaction: @@ -42,22 +57,6 @@ class ProvideLoanService(BaseService): "message": "Failed to log transaction." }), 400 - # Save the loan details - loan_id = f"loan_{transaction_id}" - - loan = Loan.create_loan( - customer_id=customer_id, - account_id=account_id, - offer_id=validated_data.get('offerId'), - principal_amount=validated_data.get('requestedAmount'), - status="active" - ) - - if not loan: - logger.error(f"Failed to save loan details") - return jsonify({ - "message": "Failed to save loan details." - }), 400 else: @@ -79,7 +78,7 @@ class ProvideLoanService(BaseService): # KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id) # Call Kafka in a background thread - thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id)) + thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id, "PROCESS_PAYMENT")) thread.start() return response_data @@ -106,7 +105,5 @@ class ProvideLoanService(BaseService): }) , 500 - def async_send_to_kafka(loan_data, request_id): - KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id) - KafkaIntegration.flush() + diff --git a/app/api/services/repayment.py b/app/api/services/repayment.py index e1500bf..d92f8b8 100644 --- a/app/api/services/repayment.py +++ b/app/api/services/repayment.py @@ -1,9 +1,13 @@ from flask import request, jsonify from marshmallow import ValidationError +from app.api.enums.loan_status import LoanStatus +from app.models import Repayment +from app.models.loan import Loan from app.utils.logger import logger from app.api.schemas.repayment import RepaymentSchema from app.api.services.base_service import BaseService -from app.api.enums import TransactionType +from app.api.enums import TransactionType +from threading import Thread class RepaymentService(BaseService): TRANSACTION_TYPE = TransactionType.REPAYMENT @@ -25,9 +29,29 @@ class RepaymentService(BaseService): customer = RepaymentService.get_or_create_customer(validated_data) account = customer.accounts[0] validated_data['accountId'] = account.id + request_id = validated_data.get('requestId') + loan_id = validated_data.get('debtId') if (RepaymentService.validate_account_ownership(account_id = account.id, customer_id = customer_id)): + + # Save the repayment details + repayment = Repayment.create_repayment( + customer_id = customer_id, + loan_id = loan_id, + product_id = validated_data.get('productId') + + ) + + if not repayment: + logger.error(f"Failed to save repayment details") + return jsonify({ + "message": "Failed to save repayment details." + }), 400 + + #Update Loan status + Loan.update_status(loan_id = loan_id, status = LoanStatus.REPAID) + transaction = RepaymentService.log_transaction(validated_data = validated_data) if not transaction: @@ -54,6 +78,10 @@ class RepaymentService(BaseService): # message="Repayment processed successfully" # ) + # Call Kafka in a background thread + thread = Thread(target=RepaymentService.async_send_to_kafka, args=(response_data, request_id, "LOAN_REPAYMENT")) + thread.start() + return response_data except ValidationError as err: diff --git a/app/models/__init__.py b/app/models/__init__.py index 8fd1277..af5353a 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -2,5 +2,6 @@ from .customer import Customer from .account import Account from .loan import Loan from .transaction import Transaction +from .repayment import Repayment -__all__ = ['Customer', 'Account', 'Loan', 'Transaction'] \ No newline at end of file +__all__ = ['Customer', 'Account', 'Loan', 'Transaction', 'Repayment'] \ No newline at end of file diff --git a/app/models/account.py b/app/models/account.py index 8c94098..af4501e 100644 --- a/app/models/account.py +++ b/app/models/account.py @@ -1,6 +1,7 @@ from datetime import datetime, timezone from sqlalchemy.orm import relationship from app.extensions import db +from sqlalchemy.exc import IntegrityError class Account(db.Model): __tablename__ = 'accounts' @@ -27,8 +28,13 @@ class Account(db.Model): customer_id=customer_id, account_type=account_type ) - db.session.add(account) - db.session.commit() + + try: + db.session.add(account) + db.session.commit() + except IntegrityError as err: + db.session.rollback() + raise ValueError(f"Database integrity error: {err}") return account @classmethod diff --git a/app/models/customer.py b/app/models/customer.py index e6418c1..e0fb316 100644 --- a/app/models/customer.py +++ b/app/models/customer.py @@ -2,6 +2,7 @@ from datetime import datetime, timezone from sqlalchemy.orm import relationship from app.extensions import db from app.models.account import Account +from sqlalchemy.exc import IntegrityError class Customer(db.Model): __tablename__ = 'customers' @@ -33,16 +34,20 @@ class Customer(db.Model): # Create the customer customer = cls(id=id, msisdn=msisdn, country_code=country_code) - db.session.add(customer) - - # Create an associated account - account = Account.create_account( - id=account_id, - customer_id=id, - account_type=account_type - ) - - db.session.commit() + try: + db.session.add(customer) + + # Create an associated account + account = Account.create_account( + id=account_id, + customer_id=id, + account_type=account_type + ) + + db.session.commit() + except IntegrityError as err: + db.session.rollback() + raise ValueError(f"Database integrity error: {err}") return customer def __repr__(self): diff --git a/app/models/loan.py b/app/models/loan.py index 81c86fd..0fec3d2 100644 --- a/app/models/loan.py +++ b/app/models/loan.py @@ -2,6 +2,7 @@ from datetime import datetime, timezone from app.extensions import db from app.models.customer import Customer from app.models.account import Account +from sqlalchemy.exc import IntegrityError class Loan(db.Model): @@ -44,8 +45,12 @@ class Loan(db.Model): status=status ) - db.session.add(loan) - db.session.commit() + try: + db.session.add(loan) + db.session.commit() + except IntegrityError as err: + db.session.rollback() + raise ValueError(f"Database integrity error: {err}") return loan @@ -60,6 +65,35 @@ class Loan(db.Model): return False return True + + @classmethod + def get_customer_loan(cls, loan_id, customer_id): + """ + Get customer's active loans. + """ + loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first() + if not loan: + raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.") + return loan + + @classmethod + def update_status(cls, loan_id, status): + """ + Update the status of the loan with the given loan_id. + """ + # Retrieve loan + loan = cls.query.get(loan_id) + + if not loan: + raise ValueError(f"Loan with ID {loan_id} does not exist.") + + if loan.status == status: + return + + # Update loan status and the updated_at timestamp + loan.status = status + + db.session.commit() def __repr__(self): return f'' \ No newline at end of file diff --git a/app/models/repayment.py b/app/models/repayment.py new file mode 100644 index 0000000..06b872c --- /dev/null +++ b/app/models/repayment.py @@ -0,0 +1,55 @@ +from datetime import datetime, timezone +from app.api.enums.loan_status import LoanStatus +from app.extensions import db +from app.models.customer import Customer +from app.models.loan import Loan +from sqlalchemy.exc import IntegrityError + + +class Repayment(db.Model): + __tablename__ = 'repayments' + + id = db.Column( + db.Integer, + primary_key=True, + autoincrement=True, + ) + loan_id = db.Column(db.String(50), nullable=False) + customer_id = db.Column(db.String(50), nullable=False) + product_id = db.Column(db.String(20), nullable=True) + created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc)) + updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc)) + + @classmethod + def create_repayment(cls, customer_id, loan_id, product_id): + + + # Check customer exists + if not Customer.is_valid_customer(customer_id): + raise ValueError("Invalid customer") + + # Check loan exists + loan = Loan.get_customer_loan(loan_id = loan_id, customer_id = customer_id) + + # Check that the loan is active + if loan.status != LoanStatus.ACTIVE: + raise ValueError(f"Repayment cannot be processed. Loan status: ({loan.status})") + + + repayment = cls( + customer_id=customer_id, + loan_id=loan.id, + product_id=product_id, + ) + + try: + db.session.add(repayment) + db.session.commit() + except IntegrityError as err: + db.session.rollback() + raise ValueError(f"Database integrity error: {err}") + + return repayment + + def __repr__(self): + return f'' diff --git a/app/swagger/schemas/RepaymentRequest.json b/app/swagger/schemas/RepaymentRequest.json index cb959a9..c31d585 100644 --- a/app/swagger/schemas/RepaymentRequest.json +++ b/app/swagger/schemas/RepaymentRequest.json @@ -7,7 +7,7 @@ }, "debtId": { "type": "string", - "example": "273194670" + "example": "10" }, "productId": { "type": "string", @@ -19,7 +19,7 @@ }, "customerId": { "type": "string", - "example": "CN621868" + "example": "CID0000025585" }, "channel": { "type": "string", diff --git a/migrations/versions/b8f6fd76ead8_migration_on_thu_apr_10_16_21_45_utc_.py b/migrations/versions/b8f6fd76ead8_migration_on_thu_apr_10_16_21_45_utc_.py new file mode 100644 index 0000000..69d16e7 --- /dev/null +++ b/migrations/versions/b8f6fd76ead8_migration_on_thu_apr_10_16_21_45_utc_.py @@ -0,0 +1,86 @@ +"""Migration on Thu Apr 10 16:21:45 UTC 2025 + +Revision ID: b8f6fd76ead8 +Revises: +Create Date: 2025-04-10 16:22:15.946157 + +""" +from alembic import op +import sqlalchemy as sa +from sqlalchemy.dialects import postgresql + +# revision identifiers, used by Alembic. +revision = 'b8f6fd76ead8' +down_revision = None +branch_labels = None +depends_on = None + + +def upgrade(): + # ### commands auto generated by Alembic - please adjust! ### + op.create_table('repayments', + sa.Column('id', sa.Integer(), autoincrement=True, nullable=False), + sa.Column('loan_id', sa.String(length=50), nullable=False), + sa.Column('customer_id', sa.String(length=50), nullable=False), + sa.Column('product_id', sa.String(length=20), nullable=True), + sa.Column('created_at', sa.DateTime(), nullable=True), + sa.Column('updated_at', sa.DateTime(), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + with op.batch_alter_table('loans', schema=None) as batch_op: + batch_op.alter_column('id', + existing_type=sa.VARCHAR(length=50), + type_=sa.Integer(), + existing_nullable=False, + autoincrement=True, + existing_server_default=sa.text("nextval('loan_id_seq'::regclass)")) + + with op.batch_alter_table('transactions', schema=None) as batch_op: + batch_op.alter_column('channel', + existing_type=sa.VARCHAR(length=8), + type_=sa.String(length=50), + existing_nullable=False) + batch_op.alter_column('created_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=True, + existing_server_default=sa.text('now()')) + batch_op.alter_column('updated_at', + existing_type=postgresql.TIMESTAMP(timezone=True), + type_=sa.DateTime(), + existing_nullable=True, + existing_server_default=sa.text('now()')) + batch_op.drop_constraint('transactions_id_key', type_='unique') + + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table('transactions', schema=None) as batch_op: + batch_op.create_unique_constraint('transactions_id_key', ['id']) + batch_op.alter_column('updated_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=True, + existing_server_default=sa.text('now()')) + batch_op.alter_column('created_at', + existing_type=sa.DateTime(), + type_=postgresql.TIMESTAMP(timezone=True), + existing_nullable=True, + existing_server_default=sa.text('now()')) + batch_op.alter_column('channel', + existing_type=sa.String(length=50), + type_=sa.VARCHAR(length=8), + existing_nullable=False) + + with op.batch_alter_table('loans', schema=None) as batch_op: + batch_op.alter_column('id', + existing_type=sa.Integer(), + type_=sa.VARCHAR(length=50), + existing_nullable=False, + autoincrement=True, + existing_server_default=sa.text("nextval('loan_id_seq'::regclass)")) + + op.drop_table('repayments') + # ### end Alembic commands ### diff --git a/scripts/entrypoint.sh b/scripts/entrypoint.sh index 8c7018a..3e73752 100755 --- a/scripts/entrypoint.sh +++ b/scripts/entrypoint.sh @@ -1,6 +1,7 @@ #!/bin/sh echo "Running DB migrations..." +flask db migrate -m "Migration on $(date)" flask db upgrade echo "Starting Gunicorn server..."