104 lines
4.2 KiB
Python
104 lines
4.2 KiB
Python
from datetime import datetime, timezone
|
|
from app.extensions import db
|
|
from app.utils.logger import logger
|
|
from sqlalchemy.exc import SQLAlchemyError
|
|
# from dateutil.relativedelta import relativedelta
|
|
|
|
class LoanRepaymentSchedule(db.Model):
|
|
__tablename__ = 'loan_repayment_schedules'
|
|
|
|
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
|
|
loan_id = db.Column(db.Integer, nullable=False)
|
|
transaction_id = db.Column(db.String(50), nullable=True)
|
|
product_id = db.Column(db.String(20), nullable=True)
|
|
installment_number = db.Column(db.Integer, nullable=False)
|
|
due_date = db.Column(db.DateTime, nullable=False)
|
|
installment_amount= db.Column(db.Float, default=0.0)
|
|
total_repayment_amount = db.Column(db.Float, default=0.0)
|
|
paid = db.Column(db.Boolean, default=False)
|
|
paid_at = db.Column(db.DateTime, nullable=True)
|
|
due_process_date = db.Column(db.DateTime, nullable=True)
|
|
due_process_count = db.Column(db.Integer, default=0)
|
|
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
|
|
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
|
|
|
|
|
|
|
|
def to_dict(self):
|
|
return {
|
|
'id': self.id,
|
|
'loan_id': self.loan_id,
|
|
'product_id': self.product_id,
|
|
'transaction_id': self.transaction_id,
|
|
'installment_number': self.installment_number,
|
|
'due_date': self.due_date.isoformat() if self.due_date else None,
|
|
'installment_amount': self.installment_amount,
|
|
'total_repayment_amount': self.total_repayment_amount,
|
|
'paid': self.paid,
|
|
'due_process_date': self.due_process_date.isoformat() if self.due_process_date else None,
|
|
'due_process_count': self.due_process_count,
|
|
'paid_at': self.paid_at.isoformat() if self.paid_at else None,
|
|
'created_at': self.created_at.isoformat() if self.created_at else None,
|
|
'updated_at': self.updated_at.isoformat() if self.updated_at else None
|
|
}
|
|
|
|
def __repr__(self):
|
|
return f'<LoanRepaymentSchedule Loan:{self.loan_id} Installment:{self.installment_number}>'
|
|
|
|
@classmethod
|
|
def get_repayment_schedule_by_loan_id(cls, loan_id):
|
|
"""
|
|
Get repayment schedule by loan ID
|
|
"""
|
|
try:
|
|
return cls.query.filter_by(loan_id=loan_id).all()
|
|
except Exception as e:
|
|
logger.error(f"Error fetching repayment schedule for loan_id={loan_id}: {e}")
|
|
return []
|
|
|
|
|
|
@classmethod
|
|
def get_repayment_schedule_by_transaction_id(cls, transaction_id):
|
|
"""
|
|
Get repayment schedule by transaction ID
|
|
"""
|
|
return cls.query.filter_by(transaction_id=transaction_id).all()
|
|
|
|
@classmethod
|
|
def update_repayment_schedule_status(cls, schedule_id, paid=False):
|
|
"""
|
|
Update repayment schedule status.
|
|
Args:
|
|
schedule_id (int): ID of the repayment schedule.
|
|
paid (bool): Whether the repayment is marked as paid or not.
|
|
Returns:
|
|
schedule (RepaymentSchedule | None): The updated schedule object, or None if not found.
|
|
"""
|
|
try:
|
|
# Fetch the repayment schedule by ID
|
|
schedule = cls.query.get(schedule_id)
|
|
|
|
if schedule:
|
|
# Update payment status
|
|
schedule.paid = paid
|
|
schedule.paid_at = datetime.now(timezone.utc) if paid else None
|
|
if schedule.due_process_count is None:
|
|
schedule.due_process_count = 0
|
|
schedule.due_process_count += 1
|
|
schedule.due_process_date = datetime.now(timezone.utc)
|
|
|
|
# Commit changes to the database
|
|
db.session.commit()
|
|
|
|
return schedule
|
|
|
|
except SQLAlchemyError as e:
|
|
# Rollback changes if something goes wrong at the DB level
|
|
db.session.rollback()
|
|
logger.error(f"Database error updating repayment schedule {schedule_id}: {e}")
|
|
return None
|
|
|
|
except Exception as e:
|
|
# Catch any other unexpected error
|
|
logger.error(f"Unexpected error updating repayment schedule {schedule_id}: {e}")
|
|
return None |