from datetime import datetime, timezone, timedelta from itertools import product from app.extensions import db from app.models.customer import Customer from app.models.account import Account from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import relationship from dateutil.relativedelta import relativedelta from datetime import timedelta import logging from sqlalchemy import and_, or_, not_ from sqlalchemy.sql import func logger = logging.getLogger(__name__) class Loan(db.Model): __tablename__ = 'loans' id = db.Column( db.Integer, primary_key=True, autoincrement=True, ) customer_id = db.Column(db.String(50), nullable=False) transaction_id = db.Column(db.String(50), nullable=True) original_transaction = db.Column(db.String(50), nullable=True) account_id = db.Column(db.String(50), nullable=False) offer_id = db.Column(db.String(20), nullable=False) product_id = db.Column(db.String(20), nullable=True) collection_type = db.Column(db.String(20), nullable=True) current_loan_amount = db.Column(db.Float, nullable=True) initial_loan_amount = db.Column(db.Float, nullable=False) default_penalty_fee = db.Column(db.Float, default=0) continuous_fee = db.Column(db.Float, default=0) upfront_fee = db.Column(db.Float, nullable=True, default=0.0) repayment_amount = db.Column(db.Float, nullable=True, default=0.0) installment_amount = db.Column(db.Float, nullable=True, default=0.0) status = db.Column(db.String(20), default='pending') tenor = db.Column(db.Integer, nullable=True) due_date = db.Column(db.DateTime, nullable=True) created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) eligible_amount = db.Column(db.Float, nullable=True, default=0.0) disburse_date = db.Column(db.DateTime, nullable=True) disburse_verify = db.Column(db.DateTime, nullable=True) reference = db.Column(db.String(50), nullable=True) disburse_result = db.Column(db.String(10), nullable=True) disburse_description = db.Column(db.String(100), nullable=True) verify_result = db.Column(db.String(10), nullable=True) verify_description = db.Column(db.String(100), nullable=True) customer = relationship( "Customer", primaryjoin="Customer.id == Loan.customer_id", foreign_keys=[customer_id], back_populates="loans", ) loan_charges = relationship( "LoanCharge", primaryjoin="LoanCharge.loan_id == Loan.id", foreign_keys="LoanCharge.loan_id", back_populates="loan", ) loan_repayment_schedules = relationship( "LoanRepaymentSchedule", primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id", foreign_keys="LoanRepaymentSchedule.loan_id", back_populates="loan", ) @classmethod def create_loan( cls, customer_id, account_id, offer_id, product_id, initial_loan_amount, collection_type, transaction_id, original_transaction, upfront_fee, repayment_amount, installment_amount, tenor, eligible_amount, reference, status = "pending", ): # Check if customer exists customer = Customer.is_valid_customer(customer_id) if not customer: raise ValueError("Customer does not exist") now = datetime.now(timezone.utc) due_date = now + timedelta(days=tenor) # Create and save the loan loan = cls( customer_id = customer_id, account_id = account_id, offer_id = offer_id, product_id = product_id, collection_type = collection_type, transaction_id = transaction_id, original_transaction = original_transaction, initial_loan_amount = initial_loan_amount, current_loan_amount = initial_loan_amount, upfront_fee = upfront_fee, repayment_amount = repayment_amount, installment_amount = installment_amount, due_date=due_date, tenor = tenor, status = status, eligible_amount =eligible_amount, reference = reference, created_at=datetime.now(timezone.utc), updated_at=datetime.now(timezone.utc) ) try: db.session.add(loan) except IntegrityError as err: raise ValueError(f"Database integrity error: {err}") return loan @classmethod def has_active_loans(cls, customer_id): active_loans = cls.query.filter_by( customer_id=customer_id, status='active' ).count() if active_loans > 0: return False return True @classmethod def get_customer_loan(cls, loan_id, customer_id): """ Get customer's active loans by loan_id. """ loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first() if not loan: raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.") return loan @classmethod def get_customer_original_loan(cls, customer_id, original_transaction): """ Get customer's original loan offer. """ original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first() if not original_loan: return None logger.info(f" get_customer_original_loan ==>>>> {original_loan}") return original_loan @classmethod def get_customer_last_loan(cls, customer_id): """ Get customer's active loans. """ logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}") # loan = cls.query.filter_by( cls.customer_id == customer_id).first() loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first() if not loan: return None # loan = { # "original_transaction":"", # "eligible_amount": 0, # "loan_amount": 0, # "customer_id": customer_id, # "transaction_id": "", # "resultDescription": "No Active Loan" # } logger.info(f" get_customer_last_loan ==>>>> {loan}") return loan @classmethod def get_active_loans_by_original_transaction(cls, original_transaction_id): """ Get all active loans with the same original_transaction ID. """ active_loans = cls.query.filter_by( original_transaction=original_transaction_id, # status='active' ).all() return active_loans @classmethod def update_status(cls, loan_id, status): """ Update the status of the loan with the given loan_id. """ # Retrieve loan loan = cls.query.get(loan_id) if not loan: raise ValueError(f"Loan with ID {loan_id} does not exist.") if loan.status == status: return # Update loan status and the updated_at timestamp loan.status = status @classmethod def get_daily_loan_count(cls, customer_id, product_id): """ Returns the count of loans created today for a customer. """ start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) end_of_day = start_of_day + timedelta(days=1) return cls.query.filter_by( customer_id=customer_id, product_id=product_id, ).filter( cls.created_at >= start_of_day, cls.created_at < end_of_day ).count() def to_dict(self): """ Convert the Loan object to a dictionary format for JSON serialization. """ return { 'debtId': self.id, 'transactionId': self.transaction_id, 'loanRef': self.reference, 'productId': self.product_id, 'initialLoanAmount': self.initial_loan_amount, 'currentLoanAmount': self.current_loan_amount, 'defaultPenaltyFee': self.default_penalty_fee, 'continuousFee': self.continuous_fee, 'collectionType': self.collection_type, 'upfrontFee': self.upfront_fee, 'repaymentAmount': self.repayment_amount, 'installmentAmount': self.installment_amount, 'status': self.status, 'tenor': self.tenor, 'dueDate': self.due_date.isoformat() if self.due_date else None, 'loanDate': self.created_at.isoformat() if self.created_at else None, } def __repr__(self): return f''