Files
FirstCore/app/models/loan.py
T
2025-04-27 20:12:17 -04:00

156 lines
6.0 KiB
Python

import logging
from datetime import datetime, timezone

from sqlalchemy import and_, or_, not_
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship

from app.extensions import db
from app.models.customer import Customer
from app.models.account import Account
from app.models import Customer
class Loan(db.Model):
    """ORM model mapped to the ``loans`` table.

    Each row stores the identifiers that tie a loan to a customer, account,
    offer and product, the loan amounts and fees, its status, its due date
    and creation/update timestamps.
    """

    __tablename__ = 'loans'

    id = db.Column(
        db.Integer,
        primary_key=True,
        autoincrement=True,
    )

    # Identifiers linking the loan to other records.
    customer_id = db.Column(db.String(50), nullable=False)
    transaction_id = db.Column(db.String(50), nullable=True)
    original_transaction = db.Column(db.String(50), nullable=True)
    account_id = db.Column(db.String(50), nullable=False)
    offer_id = db.Column(db.String(20), nullable=False)
    product_id = db.Column(db.String(20), nullable=True)
    collection_type = db.Column(db.String(20), nullable=True)

    # Amounts and fees.
    # NOTE(review): these are stored as Float; if they are currency values a
    # fixed-precision Numeric column may be more appropriate - confirm.
    current_loan_amount = db.Column(db.Float, nullable=True)
    initial_loan_amount = db.Column(db.Float, nullable=False)
    default_penalty_fee = db.Column(db.Float, default=0)
    continuous_fee = db.Column(db.Float, default=0)
    upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
    repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
    installment_amount = db.Column(db.Float, nullable=True, default=0.0)

    status = db.Column(db.String(20), default='pending')
    due_date = db.Column(db.DateTime, nullable=True)

    # The timestamp defaults are callables so that the current time is taken
    # on every INSERT/UPDATE. Passing ``datetime.now(...)`` directly would
    # evaluate it once at import time and stamp every row with that value.
    created_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
    )
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # customer_id carries no ForeignKey constraint, so the join condition and
    # the foreign-key column are spelled out explicitly for the relationship.
    customer = relationship(
        "Customer",
        primaryjoin="Customer.id == Loan.customer_id",
        foreign_keys=[customer_id],
        back_populates="loans",
    )

    @classmethod
    def get_all_loans(cls, customer_id=None, account_id=None, status=None, offer_id=None,
                      product_id=None, start_date=None, end_date=None, due_before=None,
                      due_after=None, page=1, limit=20):
        """
        Get one page of loans, newest first, with optional filtering.

        A filter is applied only when its argument is truthy, so ``None`` and
        empty strings leave the corresponding column unfiltered.

        Args:
            customer_id (str, optional): Filter by customer ID
            account_id (str, optional): Filter by account ID
            status (str, optional): Filter by loan status
            offer_id (str, optional): Filter by offer ID
            product_id (str, optional): Filter by product ID
            start_date (datetime, optional): Keep loans with created_at >= this value
            end_date (datetime, optional): Keep loans with created_at <= this value
            due_before (datetime, optional): Keep loans with due_date <= this value
            due_after (datetime, optional): Keep loans with due_date >= this value
            page (int): 1-based page number; values below 1 are treated as
                the first page
            limit (int): Maximum number of loans per page

        Returns:
            tuple: ``(loans, total_count)`` where ``loans`` is the list of
            Loan objects on the requested page and ``total_count`` is the
            number of loans matching the filters before pagination.
        """
        logging.getLogger(__name__).info("Get all loan models from loans model cme back")

        query = cls.query

        # Exact-match filters, applied only for the values that were supplied.
        equality_filters = (
            (cls.customer_id, customer_id),
            (cls.account_id, account_id),
            (cls.status, status),
            (cls.offer_id, offer_id),
            (cls.product_id, product_id),
        )
        for column, value in equality_filters:
            if value:
                query = query.filter(column == value)

        # Inclusive range filters on the creation and due dates.
        if start_date:
            query = query.filter(cls.created_at >= start_date)
        if end_date:
            query = query.filter(cls.created_at <= end_date)
        if due_before:
            query = query.filter(cls.due_date <= due_before)
        if due_after:
            query = query.filter(cls.due_date >= due_after)

        # Count before ordering/pagination: the total is independent of sort
        # order, and this keeps ORDER BY out of the counting subquery.
        total_count = query.count()

        # Never send a negative OFFSET to the database (page < 1).
        offset = max((page - 1) * limit, 0)
        loans = (
            query.order_by(cls.created_at.desc())
            .limit(limit)
            .offset(offset)
            .all()
        )
        return loans, total_count

    # NOTE(review): the serializer below was already commented out in the
    # original file. It is retained as-is because no replacement is visible
    # here; either restore it or delete it once callers are confirmed.
    #
    # def to_dict(self):
    #     """
    #     Convert the Loan object to a dictionary format for JSON serialization.
    #     """
    #     return {
    #         'id': self.id,
    #         'customer_id': self.customer_id,
    #         'account_id': self.account_id,
    #         'transaction_id': self.transaction_id,
    #         'original_transaction': self.original_transaction,
    #         'offer_id': self.offer_id,
    #         'initial_loan_amount': self.initial_loan_amount,
    #         'current_loan_amount': self.current_loan_amount,
    #         'status': self.status,
    #         'product_id': self.product_id,
    #         'default_penalty_fee': self.default_penalty_fee,
    #         'continuous_fee': self.continuous_fee,
    #         'upfront_fee': self.upfront_fee,
    #         'repayment_amount': self.repayment_amount,
    #         'installment_amount': self.installment_amount,
    #         'due_date': self.due_date.isoformat() if self.due_date else None,
    #         'created_at': self.created_at.isoformat() if self.created_at else None,
    #         'updated_at': self.updated_at.isoformat() if self.updated_at else None
    #     }

    def __repr__(self):
        """Return a short debug representation containing the primary key."""
        return f'<Loan {self.id}>'