Files
2025-09-20 20:00:24 -04:00

261 lines
9.3 KiB
Python

# from datetime import datetime, timezone, timedelta
# from itertools import product
# from app.extensions import db
# from app.models.customer import Customer
# from app.models.account import Account
# from sqlalchemy.exc import IntegrityError
# from sqlalchemy.orm import relationship
# from dateutil.relativedelta import relativedelta
# from datetime import timedelta
# import logging
# from sqlalchemy import and_, or_, not_
# from sqlalchemy.sql import func
#
# logger = logging.getLogger(__name__)
#
#
# class Loan(db.Model):
# __tablename__ = 'loans'
#
# id = db.Column(
# db.Integer,
# primary_key=True,
# autoincrement=True,
# )
# customer_id = db.Column(db.String(50), nullable=False)
# transaction_id = db.Column(db.String(50), nullable=True)
# original_transaction = db.Column(db.String(50), nullable=True)
# account_id = db.Column(db.String(50), nullable=False)
# offer_id = db.Column(db.String(20), nullable=False)
# product_id = db.Column(db.String(20), nullable=True)
# collection_type = db.Column(db.String(20), nullable=True)
# current_loan_amount = db.Column(db.Float, nullable=True)
# initial_loan_amount = db.Column(db.Float, nullable=False)
# default_penalty_fee = db.Column(db.Float, default=0)
# continuous_fee = db.Column(db.Float, default=0)
# upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
# repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
# installment_amount = db.Column(db.Float, nullable=True, default=0.0)
# status = db.Column(db.String(20), default='pending')
# tenor = db.Column(db.Integer, nullable=True)
# due_date = db.Column(db.DateTime, nullable=True)
# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# eligible_amount = db.Column(db.Float, nullable=True, default=0.0)
# disburse_date = db.Column(db.DateTime, nullable=True)
# disburse_verify = db.Column(db.DateTime, nullable=True)
# reference = db.Column(db.String(50), nullable=True)
# disburse_result = db.Column(db.String(10), nullable=True)
# disburse_description = db.Column(db.String(100), nullable=True)
# verify_result = db.Column(db.String(10), nullable=True)
# verify_description = db.Column(db.String(100), nullable=True)
#
# # customer = relationship(
# # "Customer",
# # primaryjoin="Customer.id == Loan.customer_id",
# # foreign_keys=[customer_id],
# # back_populates="loans",
# # )
#
# loan_charges = relationship(
# "LoanCharge",
# primaryjoin="LoanCharge.loan_id == Loan.id",
# foreign_keys="LoanCharge.loan_id",
# back_populates="loan",
# )
#
# loan_repayment_schedules = relationship(
# "LoanRepaymentSchedule",
# primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
# foreign_keys="LoanRepaymentSchedule.loan_id",
# back_populates="loan",
# )
#
#
# @classmethod
# def create_loan(
# cls,
# customer_id,
# account_id,
# offer_id,
# product_id,
# initial_loan_amount,
# collection_type,
# transaction_id,
# original_transaction,
# upfront_fee,
# repayment_amount,
# installment_amount,
# tenor,
# eligible_amount,
# reference,
# status = "pending",
# ):
# # Check if customer exists
# customer = Customer.is_valid_customer(customer_id)
# if not customer:
# raise ValueError("Customer does not exist")
#
# now = datetime.now(timezone.utc)
# due_date = now + timedelta(days=tenor)
#
# # Create and save the loan
# loan = cls(
# customer_id = customer_id,
# account_id = account_id,
# offer_id = offer_id,
# product_id = product_id,
# collection_type = collection_type,
# transaction_id = transaction_id,
# original_transaction = original_transaction,
# initial_loan_amount = initial_loan_amount,
# current_loan_amount = initial_loan_amount,
# upfront_fee = upfront_fee,
# repayment_amount = repayment_amount,
# installment_amount = installment_amount,
# due_date=due_date,
# tenor = tenor,
# status = status,
# eligible_amount =eligible_amount,
# reference = reference,
# created_at=datetime.now(timezone.utc),
# updated_at=datetime.now(timezone.utc)
# )
#
# try:
# db.session.add(loan)
# except IntegrityError as err:
# raise ValueError(f"Database integrity error: {err}")
# return loan
#
# @classmethod
# def has_active_loans(cls, customer_id):
# active_loans = cls.query.filter_by(
# customer_id=customer_id,
# status='active'
# ).count()
#
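# # NOTE(review): returns False when active loans exist, which looks inverted relative to the method name -- confirm intended semantics before re-enabling.
# if active_loans > 0: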
# return False
# return True
#
#
# @classmethod
# def get_customer_loan(cls, loan_id, customer_id):
# """
# Get a customer's loan by loan_id, regardless of status; raises ValueError if there is no match.
# """
# loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first()
# if not loan:
# raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.")
# return loan
#
# @classmethod
# def get_customer_original_loan(cls, customer_id, original_transaction):
# """
# Get customer's original loan offer.
# """
# original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first()
# if not original_loan:
# return None
#
# logger.info(f" get_customer_original_loan ==>>>> {original_loan}")
# return original_loan
#
# @classmethod
# def get_customer_last_loan(cls, customer_id):
# """
# Get the first loan with status 'active' for the customer, or None if there is none.
# """
# logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}")
# # loan = cls.query.filter_by( cls.customer_id == customer_id).first()
# loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first()
#
# if not loan:
# return None
# # loan = {
# # "original_transaction":"",
# # "eligible_amount": 0,
# # "loan_amount": 0,
# # "customer_id": customer_id,
# # "transaction_id": "",
# # "resultDescription": "No Active Loan"
# # }
# logger.info(f" get_customer_last_loan ==>>>> {loan}")
# return loan
#
# @classmethod
# def get_active_loans_by_original_transaction(cls, original_transaction_id):
# """
# Get all loans with the same original_transaction ID (the status='active' filter is currently disabled).
# """
#
# active_loans = cls.query.filter_by(
# original_transaction=original_transaction_id,
# # status='active'
# ).all()
#
# return active_loans
#
#
# @classmethod
# def update_status(cls, loan_id, status):
# """
# Update the status of the loan with the given loan_id.
# """
# # Retrieve loan
# loan = cls.query.get(loan_id)
#
# if not loan:
# raise ValueError(f"Loan with ID {loan_id} does not exist.")
#
# if loan.status == status:
# return
#
# # Update loan status; updated_at is not set here and is left to the column's onupdate=func.now()
# loan.status = status
#
#
# @classmethod
# def get_daily_loan_count(cls, customer_id, product_id):
# """
# Returns the count of loans created today (UTC day) for a customer and product.
# """
#
# start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
# end_of_day = start_of_day + timedelta(days=1)
#
# return cls.query.filter_by(
# customer_id=customer_id,
# product_id=product_id,
# ).filter(
# cls.created_at >= start_of_day,
# cls.created_at < end_of_day
# ).count()
#
#
# def to_dict(self):
# """
# Convert the Loan object to a dictionary format for JSON serialization.
# """
# return {
# 'debtId': self.id,
# 'transactionId': self.transaction_id,
# 'loanRef': self.reference,
# 'productId': self.product_id,
# 'initialLoanAmount': self.initial_loan_amount,
# 'currentLoanAmount': self.current_loan_amount,
# 'defaultPenaltyFee': self.default_penalty_fee,
# 'continuousFee': self.continuous_fee,
# 'collectionType': self.collection_type,
# 'upfrontFee': self.upfront_fee,
# 'repaymentAmount': self.repayment_amount,
# 'installmentAmount': self.installment_amount,
# 'status': self.status,
# 'tenor': self.tenor,
# 'dueDate': self.due_date.isoformat() if self.due_date else None,
# 'loanDate': self.created_at.isoformat() if self.created_at else None,
# }
#
# def __repr__(self):
# return f'<Loan {self.id}>'