This commit is contained in:
Azeez Muibi
2025-04-24 12:42:35 +01:00
parent d80a43cb6f
commit 4e63905308
19 changed files with 786 additions and 218 deletions
+2 -1
View File
@@ -3,5 +3,6 @@ from .account import Account
from .loan import Loan
from .transaction import Transaction
from .user import User
from .repayment import Repayment
__all__ = ['Customer', 'Account', 'Loan', 'Transaction', User]
__all__ = ['Customer', 'Account', 'Loan', 'Transaction', User, Repayment]
+90
View File
@@ -0,0 +1,90 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.exc import IntegrityError
class LoanCharge(db.Model):
__tablename__ = 'loan_charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
code = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, default=0.00)
percent = db.Column(db.Float)
description = db.Column(db.String(255))
due = db.Column(db.Integer)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
transaction_id = db.Column(db.String(50))
due_date = db.Column(db.DateTime)
@classmethod
def get_all_loan_charges(cls, loan_id=None, code=None, start_date=None, end_date=None,
due_before=None, due_after=None, page=1, limit=20):
"""
Get all loan charges with optional filtering
Args:
loan_id (int, optional): Filter by loan ID
code (str, optional): Filter by charge code
start_date (datetime, optional): Filter by start date (created_at)
end_date (datetime, optional): Filter by end date (created_at)
due_before (datetime, optional): Filter charges due before this date
due_after (datetime, optional): Filter charges due after this date
page (int, optional): Page number for pagination
limit (int, optional): Number of items per page
Returns:
tuple: (list of LoanCharge objects, total count)
"""
query = cls.query
# Apply filters if provided
if loan_id:
query = query.filter(cls.loan_id == loan_id)
if code:
query = query.filter(cls.code == code)
if start_date:
query = query.filter(cls.created_at >= start_date)
if end_date:
query = query.filter(cls.created_at <= end_date)
if due_before:
query = query.filter(cls.due_date <= due_before)
if due_after:
query = query.filter(cls.due_date >= due_after)
# Order by created_at descending (newest first)
query = query.order_by(cls.created_at.desc())
# Get total count before pagination
total_count = query.count()
# Apply pagination
offset = (page - 1) * limit
query = query.limit(limit).offset(offset)
return query.all(), total_count
def to_dict(self):
"""
Convert the LoanCharge object to a dictionary format for JSON serialization.
"""
return {
'loan_id': self.loan_id,
'code': self.code,
'amount': self.amount,
'percent': self.percent,
'description': self.description,
'due': self.due,
'transaction_id': self.transaction_id,
'due_date': self.due_date.isoformat() if self.due_date else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<LoanCharge {self.id}>'
+58 -31
View File
@@ -1,51 +1,78 @@
from datetime import datetime, timezone
from app.api.enums.loan_status import LoanStatus
from app.extensions import db
from app.models.customer import Customer
from app.models.loan import Loan
from sqlalchemy.exc import IntegrityError
class Repayment(db.Model):
__tablename__ = 'repayments'
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.String(50), nullable=False)
customer_id = db.Column(db.String(50), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
product_id = db.Column(db.String(50), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
transaction_id = db.Column(db.String(50))
@classmethod
def create_repayment(cls, customer_id, loan_id, product_id):
def get_all_repayments(cls, loan_id=None, customer_id=None, product_id=None,
start_date=None, end_date=None, page=1, limit=20):
"""
Get all repayments with optional filtering
# Check customer exists
if not Customer.is_valid_customer(customer_id):
raise ValueError("Invalid customer")
Args:
loan_id (str, optional): Filter by loan ID
customer_id (str, optional): Filter by customer ID
product_id (str, optional): Filter by product ID
start_date (datetime, optional): Filter by start date (created_at)
end_date (datetime, optional): Filter by end date (created_at)
page (int, optional): Page number for pagination
limit (int, optional): Number of items per page
# Check loan exists
loan = Loan.get_customer_loan(loan_id = loan_id, customer_id = customer_id)
# Check that the loan is active
if loan.status != LoanStatus.ACTIVE:
raise ValueError(f"Repayment cannot be processed. Loan status: ({loan.status})")
Returns:
tuple: (list of Repayment objects, total count)
"""
query = cls.query
repayment = cls(
customer_id=customer_id,
loan_id=loan_id,
product_id=product_id,
)
# Apply filters if provided
if loan_id:
query = query.filter(cls.loan_id == loan_id)
try:
db.session.add(repayment)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
if customer_id:
query = query.filter(cls.customer_id == customer_id)
return repayment
if product_id:
query = query.filter(cls.product_id == product_id)
if start_date:
query = query.filter(cls.created_at >= start_date)
if end_date:
query = query.filter(cls.created_at <= end_date)
# Order by created_at descending (newest first)
query = query.order_by(cls.created_at.desc())
# Get total count before pagination
total_count = query.count()
# Apply pagination
offset = (page - 1) * limit
query = query.limit(limit).offset(offset)
return query.all(), total_count
def to_dict(self):
"""
Convert the Repayment object to a dictionary format for JSON serialization.
"""
return {
'loan_id': self.loan_id,
'customer_id': self.customer_id,
'product_id': self.product_id,
'transaction_id': self.transaction_id,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<Repayment {self.id}>'
return f'<Repayment {self.id}>'