Merge branch 'loan_repayment_event' of DigiFi/digifi-BankToProductCore into master

This commit is contained in:
2025-04-10 17:42:43 +00:00
committed by Gogs
14 changed files with 270 additions and 44 deletions
+2 -1
View File
@@ -1 +1,2 @@
from .transaction_type import TransactionType
from .transaction_type import TransactionType
from .loan_status import LoanStatus
+6
View File
@@ -0,0 +1,6 @@
from enum import Enum
class LoanStatus(str, Enum):
PENDING = "pending"
ACTIVE = "active"
REPAID = "repaid"
+3 -3
View File
@@ -43,9 +43,9 @@ class KafkaIntegration:
@staticmethod
def send_loan_request(loan_data, request_id):
def send_loan_request(loan_data, request_id, topic):
"""
Send loan request to PROCESS_PAYMENT topic
Send loan request to topic
Args:
loan_data: Loan request payload as dict
@@ -58,7 +58,7 @@ class KafkaIntegration:
# Sending loan request message to Kafka
producer.produce(
topic="PROCESS_PAYMENT",
topic=topic,
key=str(request_id),
value=json.dumps(loan_data).encode("utf-8"),
callback=KafkaIntegration.delivery_report,
+6
View File
@@ -3,6 +3,7 @@ from app.api.enums import TransactionType
from flask import jsonify
from marshmallow import ValidationError
import logging
from app.api.integrations import KafkaIntegration
logger = logging.getLogger(__name__)
@@ -53,3 +54,8 @@ class BaseService:
type=cls.TRANSACTION_TYPE,
channel=validated_data.get("channel"),
)
@classmethod
def async_send_to_kafka(cls, loan_data, request_id, topic):
KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id, topic = topic)
KafkaIntegration.flush()
+19 -22
View File
@@ -5,10 +5,9 @@ from app.api.services.base_service import BaseService
from app.api.enums import TransactionType
from app.utils.logger import logger
from app.api.schemas.provide_loan import ProvideLoanSchema
from app.api.integrations import KafkaIntegration
from threading import Thread
from app.models.loan import Loan
from app.api.enums import LoanStatus
class ProvideLoanService(BaseService):
TRANSACTION_TYPE = TransactionType.PROVIDE_LOAN
@@ -34,6 +33,22 @@ class ProvideLoanService(BaseService):
if (ProvideLoanService.validate_account_ownership(account_id = account_id, customer_id = customer_id)):
# Save the loan details
loan = Loan.create_loan(
customer_id=customer_id,
account_id=account_id,
offer_id=validated_data.get('offerId'),
principal_amount=validated_data.get('requestedAmount'),
status=LoanStatus.ACTIVE
)
if not loan:
logger.error(f"Failed to save loan details")
return jsonify({
"message": "Failed to save loan details."
}), 400
# Log Transaction
transaction = ProvideLoanService.log_transaction(validated_data = validated_data)
if not transaction:
@@ -42,22 +57,6 @@ class ProvideLoanService(BaseService):
"message": "Failed to log transaction."
}), 400
# Save the loan details
loan_id = f"loan_{transaction_id}"
loan = Loan.create_loan(
customer_id=customer_id,
account_id=account_id,
offer_id=validated_data.get('offerId'),
principal_amount=validated_data.get('requestedAmount'),
status="active"
)
if not loan:
logger.error(f"Failed to save loan details")
return jsonify({
"message": "Failed to save loan details."
}), 400
else:
@@ -79,7 +78,7 @@ class ProvideLoanService(BaseService):
# KafkaIntegration.send_loan_request(loan_data = response_data, request_id = request_id)
# Call Kafka in a background thread
thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id))
thread = Thread(target=ProvideLoanService.async_send_to_kafka, args=(response_data, request_id, "PROCESS_PAYMENT"))
thread.start()
return response_data
@@ -106,7 +105,5 @@ class ProvideLoanService(BaseService):
}) , 500
def async_send_to_kafka(loan_data, request_id):
KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id)
KafkaIntegration.flush()
+29 -1
View File
@@ -1,9 +1,13 @@
from flask import request, jsonify
from marshmallow import ValidationError
from app.api.enums.loan_status import LoanStatus
from app.models import Repayment
from app.models.loan import Loan
from app.utils.logger import logger
from app.api.schemas.repayment import RepaymentSchema
from app.api.services.base_service import BaseService
from app.api.enums import TransactionType
from app.api.enums import TransactionType
from threading import Thread
class RepaymentService(BaseService):
TRANSACTION_TYPE = TransactionType.REPAYMENT
@@ -25,9 +29,29 @@ class RepaymentService(BaseService):
customer = RepaymentService.get_or_create_customer(validated_data)
account = customer.accounts[0]
validated_data['accountId'] = account.id
request_id = validated_data.get('requestId')
loan_id = validated_data.get('debtId')
if (RepaymentService.validate_account_ownership(account_id = account.id, customer_id = customer_id)):
# Save the repayment details
repayment = Repayment.create_repayment(
customer_id = customer_id,
loan_id = loan_id,
product_id = validated_data.get('productId')
)
if not repayment:
logger.error(f"Failed to save repayment details")
return jsonify({
"message": "Failed to save repayment details."
}), 400
#Update Loan status
Loan.update_status(loan_id = loan_id, status = LoanStatus.REPAID)
transaction = RepaymentService.log_transaction(validated_data = validated_data)
if not transaction:
@@ -54,6 +78,10 @@ class RepaymentService(BaseService):
# message="Repayment processed successfully"
# )
# Call Kafka in a background thread
thread = Thread(target=RepaymentService.async_send_to_kafka, args=(response_data, request_id, "LOAN_REPAYMENT"))
thread.start()
return response_data
except ValidationError as err:
+2 -1
View File
@@ -2,5 +2,6 @@ from .customer import Customer
from .account import Account
from .loan import Loan
from .transaction import Transaction
from .repayment import Repayment
__all__ = ['Customer', 'Account', 'Loan', 'Transaction']
__all__ = ['Customer', 'Account', 'Loan', 'Transaction', 'Repayment']
+8 -2
View File
@@ -1,6 +1,7 @@
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
from sqlalchemy.exc import IntegrityError
class Account(db.Model):
__tablename__ = 'accounts'
@@ -27,8 +28,13 @@ class Account(db.Model):
customer_id=customer_id,
account_type=account_type
)
db.session.add(account)
db.session.commit()
try:
db.session.add(account)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
raise ValueError(f"Database integrity error: {err}")
return account
@classmethod
+15 -10
View File
@@ -2,6 +2,7 @@ from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
from app.models.account import Account
from sqlalchemy.exc import IntegrityError
class Customer(db.Model):
__tablename__ = 'customers'
@@ -33,16 +34,20 @@ class Customer(db.Model):
# Create the customer
customer = cls(id=id, msisdn=msisdn, country_code=country_code)
db.session.add(customer)
# Create an associated account
account = Account.create_account(
id=account_id,
customer_id=id,
account_type=account_type
)
db.session.commit()
try:
db.session.add(customer)
# Create an associated account
account = Account.create_account(
id=account_id,
customer_id=id,
account_type=account_type
)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
raise ValueError(f"Database integrity error: {err}")
return customer
def __repr__(self):
+36 -2
View File
@@ -2,6 +2,7 @@ from datetime import datetime, timezone
from app.extensions import db
from app.models.customer import Customer
from app.models.account import Account
from sqlalchemy.exc import IntegrityError
class Loan(db.Model):
@@ -44,8 +45,12 @@ class Loan(db.Model):
status=status
)
db.session.add(loan)
db.session.commit()
try:
db.session.add(loan)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
raise ValueError(f"Database integrity error: {err}")
return loan
@@ -60,6 +65,35 @@ class Loan(db.Model):
return False
return True
@classmethod
def get_customer_loan(cls, loan_id, customer_id):
"""
Get customer's active loans.
"""
loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first()
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.")
return loan
@classmethod
def update_status(cls, loan_id, status):
"""
Update the status of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
if loan.status == status:
return
# Update loan status and the updated_at timestamp
loan.status = status
db.session.commit()
def __repr__(self):
return f'<Loan {self.id}>'
+55
View File
@@ -0,0 +1,55 @@
from datetime import datetime, timezone
from app.api.enums.loan_status import LoanStatus
from app.extensions import db
from app.models.customer import Customer
from app.models.loan import Loan
from sqlalchemy.exc import IntegrityError
class Repayment(db.Model):
__tablename__ = 'repayments'
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
loan_id = db.Column(db.String(50), nullable=False)
customer_id = db.Column(db.String(50), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
@classmethod
def create_repayment(cls, customer_id, loan_id, product_id):
# Check customer exists
if not Customer.is_valid_customer(customer_id):
raise ValueError("Invalid customer")
# Check loan exists
loan = Loan.get_customer_loan(loan_id = loan_id, customer_id = customer_id)
# Check that the loan is active
if loan.status != LoanStatus.ACTIVE:
raise ValueError(f"Repayment cannot be processed. Loan status: ({loan.status})")
repayment = cls(
customer_id=customer_id,
loan_id=loan.id,
product_id=product_id,
)
try:
db.session.add(repayment)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
raise ValueError(f"Database integrity error: {err}")
return repayment
def __repr__(self):
return f'<Repayment {self.id}>'
+2 -2
View File
@@ -7,7 +7,7 @@
},
"debtId": {
"type": "string",
"example": "273194670"
"example": "10"
},
"productId": {
"type": "string",
@@ -19,7 +19,7 @@
},
"customerId": {
"type": "string",
"example": "CN621868"
"example": "CID0000025585"
},
"channel": {
"type": "string",
@@ -0,0 +1,86 @@
"""Migration on Thu Apr 10 16:21:45 UTC 2025
Revision ID: b8f6fd76ead8
Revises:
Create Date: 2025-04-10 16:22:15.946157
"""
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision = 'b8f6fd76ead8'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
# ### commands auto generated by Alembic - please adjust! ###
op.create_table('repayments',
sa.Column('id', sa.Integer(), autoincrement=True, nullable=False),
sa.Column('loan_id', sa.String(length=50), nullable=False),
sa.Column('customer_id', sa.String(length=50), nullable=False),
sa.Column('product_id', sa.String(length=20), nullable=True),
sa.Column('created_at', sa.DateTime(), nullable=True),
sa.Column('updated_at', sa.DateTime(), nullable=True),
sa.PrimaryKeyConstraint('id')
)
with op.batch_alter_table('loans', schema=None) as batch_op:
batch_op.alter_column('id',
existing_type=sa.VARCHAR(length=50),
type_=sa.Integer(),
existing_nullable=False,
autoincrement=True,
existing_server_default=sa.text("nextval('loan_id_seq'::regclass)"))
with op.batch_alter_table('transactions', schema=None) as batch_op:
batch_op.alter_column('channel',
existing_type=sa.VARCHAR(length=8),
type_=sa.String(length=50),
existing_nullable=False)
batch_op.alter_column('created_at',
existing_type=postgresql.TIMESTAMP(timezone=True),
type_=sa.DateTime(),
existing_nullable=True,
existing_server_default=sa.text('now()'))
batch_op.alter_column('updated_at',
existing_type=postgresql.TIMESTAMP(timezone=True),
type_=sa.DateTime(),
existing_nullable=True,
existing_server_default=sa.text('now()'))
batch_op.drop_constraint('transactions_id_key', type_='unique')
# ### end Alembic commands ###
def downgrade():
# ### commands auto generated by Alembic - please adjust! ###
with op.batch_alter_table('transactions', schema=None) as batch_op:
batch_op.create_unique_constraint('transactions_id_key', ['id'])
batch_op.alter_column('updated_at',
existing_type=sa.DateTime(),
type_=postgresql.TIMESTAMP(timezone=True),
existing_nullable=True,
existing_server_default=sa.text('now()'))
batch_op.alter_column('created_at',
existing_type=sa.DateTime(),
type_=postgresql.TIMESTAMP(timezone=True),
existing_nullable=True,
existing_server_default=sa.text('now()'))
batch_op.alter_column('channel',
existing_type=sa.String(length=50),
type_=sa.VARCHAR(length=8),
existing_nullable=False)
with op.batch_alter_table('loans', schema=None) as batch_op:
batch_op.alter_column('id',
existing_type=sa.Integer(),
type_=sa.VARCHAR(length=50),
existing_nullable=False,
autoincrement=True,
existing_server_default=sa.text("nextval('loan_id_seq'::regclass)"))
op.drop_table('repayments')
# ### end Alembic commands ###
+1
View File
@@ -1,6 +1,7 @@
#!/bin/sh
echo "Running DB migrations..."
flask db migrate -m "Migration on $(date)"
flask db upgrade
echo "Starting Gunicorn server..."