118 lines
3.6 KiB
Python
118 lines
3.6 KiB
Python
from datetime import datetime, timezone
|
|
from app.extensions import db
|
|
from app.models.customer import Customer
|
|
from app.models.account import Account
|
|
from sqlalchemy.exc import IntegrityError
|
|
from sqlalchemy.orm import relationship
|
|
from app.models import Customer
|
|
|
|
|
|
class Loan(db.Model):
    """ORM model for a row in the ``loans`` table.

    Besides the column mapping, the class offers classmethod helpers for
    creating a loan, looking one up for a given customer, checking for
    active loans and changing a loan's status. None of the helpers commit
    the session; committing is left to the caller.
    """

    __tablename__ = 'loans'

    id = db.Column(
        db.Integer,
        primary_key=True,
        autoincrement=True,
    )
    customer_id = db.Column(db.String(50), nullable=False)
    account_id = db.Column(db.String(50), nullable=False)
    offer_id = db.Column(db.String(20), nullable=False)
    # NOTE(review): Float is lossy for monetary values; consider Numeric in a
    # future migration. Left unchanged here to keep the schema stable.
    principal_amount = db.Column(db.Float, nullable=False)
    status = db.Column(db.String(20), default='pending')
    # The defaults must be callables: passing ``datetime.now(...)`` directly
    # would evaluate once at class-definition time and stamp every row with
    # the same instant.
    created_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
    )
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
    )

    # ``customer_id`` is declared without a ForeignKey, so the join condition
    # and the foreign-key side are spelled out explicitly.
    customer = relationship(
        "Customer",
        primaryjoin="Customer.id == Loan.customer_id",
        foreign_keys=[customer_id],
        back_populates="loans",
    )

    @classmethod
    def create_loan(cls, customer_id, account_id, offer_id, principal_amount, status='pending'):
        """Build a loan and add it to the current session.

        The loan is only added to ``db.session``; nothing is flushed or
        committed here.

        Args:
            customer_id: Identifier checked with ``Customer.is_valid_customer``.
            account_id: Identifier stored on the loan.
            offer_id: Identifier stored on the loan.
            principal_amount: Amount stored on the loan.
            status: Initial status; defaults to ``'pending'``.

        Returns:
            The newly created ``Loan`` instance.

        Raises:
            ValueError: If ``Customer.is_valid_customer`` rejects the
                customer, or if adding to the session raises
                ``IntegrityError``.
        """
        # Check if customer exists
        if not Customer.is_valid_customer(customer_id):
            raise ValueError("Customer does not exist")

        # Active-loan check is currently disabled:
        # if cls.has_active_loans(customer_id):
        #     raise ValueError("Customer has active loans")

        # Create and save the loan
        loan = cls(
            customer_id=customer_id,
            account_id=account_id,
            offer_id=offer_id,
            principal_amount=principal_amount,
            status=status,
        )

        # NOTE(review): session.add() only stages the object; constraint
        # violations normally surface at flush/commit time, so this handler
        # is unlikely to fire here - confirm whether callers should flush.
        try:
            db.session.add(loan)
        except IntegrityError as err:
            raise ValueError(f"Database integrity error: {err}") from err
        return loan

    @classmethod
    def has_active_loans(cls, customer_id):
        """Return True if the customer has at least one loan with status 'active'."""
        active_loans = cls.query.filter_by(
            customer_id=customer_id,
            status='active',
        ).count()

        return active_loans > 0

    @classmethod
    def get_customer_loan(cls, loan_id, customer_id):
        """Return the loan with ``loan_id`` that belongs to ``customer_id``.

        The lookup does not filter on status.

        Raises:
            ValueError: If no loan matches both the id and the customer.
        """
        loan = cls.query.filter_by(id=loan_id, customer_id=customer_id).first()
        if not loan:
            raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.")
        return loan

    @classmethod
    def update_status(cls, loan_id, status):
        """Set the status of the loan with the given ``loan_id``.

        Does nothing when the loan already has the requested status. The
        change is made on the session-attached object and is not committed
        here.

        Raises:
            ValueError: If no loan with ``loan_id`` exists.
        """
        # Retrieve loan
        loan = cls.query.get(loan_id)

        if not loan:
            raise ValueError(f"Loan with ID {loan_id} does not exist.")

        if loan.status == status:
            return

        # Only the status is assigned; updated_at is refreshed by the
        # column's onupdate callable when the UPDATE is emitted.
        loan.status = status

    def to_dict(self):
        """Convert the Loan object to a dictionary for JSON serialization.

        Timestamps are rendered with ``isoformat()``; unset timestamps
        become ``None``.
        """
        return {
            'id': self.id,
            'customer_id': self.customer_id,
            'account_id': self.account_id,
            'offer_id': self.offer_id,
            'principal_amount': self.principal_amount,
            'status': self.status,
            'created_at': self.created_at.isoformat() if self.created_at else None,
            'updated_at': self.updated_at.isoformat() if self.updated_at else None,
        }

    def __repr__(self):
        return f'<Loan {self.id}>'