added extra query

This commit is contained in:
Chinenye Nmoh
2026-03-17 09:37:08 +01:00
parent bfb35e0285
commit 569d4c45d7
49 changed files with 4516 additions and 4505 deletions
+9 -9
View File
@@ -1,10 +1,10 @@
from .transactions import Transaction
from .repayment import Repayment
from .loan import Loan
from .loan_charge import LoanCharge
from .customer import Customer
from .account import Account
from .repayments_data import RepaymentsData
from .salary import Salary
from .transactions import Transaction
from .repayment import Repayment
from .loan import Loan
from .loan_charge import LoanCharge
from .customer import Customer
from .account import Account
from .repayments_data import RepaymentsData
from .salary import Salary
__all__ = ['Transaction', 'Repayment', 'Loan', 'LoanCharge', 'Customer', 'Account', 'RepaymentsData','Salary']
+25 -25
View File
@@ -1,25 +1,25 @@
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
class Account(db.Model):
__tablename__ = 'accounts'
id = db.Column(db.String(50), primary_key=True)
customer_id = db.Column(db.String(50), nullable=False)
account_type = db.Column(db.String(50))
status = db.Column(db.String(20), default='active')
lien_amount = db.Column(db.Float, default=0.0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
customer = relationship(
"Customer",
primaryjoin="Customer.id == Account.customer_id",
foreign_keys=[customer_id],
back_populates="accounts",
)
def __repr__(self):
return f'<Account {self.id}>'
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
class Account(db.Model):
__tablename__ = 'accounts'
id = db.Column(db.String(50), primary_key=True)
customer_id = db.Column(db.String(50), nullable=False)
account_type = db.Column(db.String(50))
status = db.Column(db.String(20), default='active')
lien_amount = db.Column(db.Float, default=0.0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
customer = relationship(
"Customer",
primaryjoin="Customer.id == Account.customer_id",
foreign_keys=[customer_id],
back_populates="accounts",
)
def __repr__(self):
return f'<Account {self.id}>'
+41 -41
View File
@@ -1,41 +1,41 @@
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
class Customer(db.Model):
__tablename__ = 'customers'
id = db.Column(db.String(50), primary_key=True)
msisdn = db.Column(db.String(20), unique=True, nullable=False)
country_code = db.Column(db.String(3), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
accounts = relationship(
"Account",
primaryjoin="Customer.id == Account.customer_id",
foreign_keys="Account.customer_id",
back_populates="customer",
)
loans = relationship(
"Loan",
primaryjoin="Customer.id == Loan.customer_id",
foreign_keys="Loan.customer_id",
back_populates="customer",
)
@classmethod
def get_customer(cls, customer_id):
"""
Get customer by ID.
"""
customer = cls.query.filter_by(id=customer_id).first()
if not customer:
raise ValueError(f"Customer does not exist")
return customer
def __repr__(self):
return f'<Customer {self.id}>'
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
class Customer(db.Model):
__tablename__ = 'customers'
id = db.Column(db.String(50), primary_key=True)
msisdn = db.Column(db.String(20), unique=True, nullable=False)
country_code = db.Column(db.String(3), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
accounts = relationship(
"Account",
primaryjoin="Customer.id == Account.customer_id",
foreign_keys="Account.customer_id",
back_populates="customer",
)
loans = relationship(
"Loan",
primaryjoin="Customer.id == Loan.customer_id",
foreign_keys="Loan.customer_id",
back_populates="customer",
)
@classmethod
def get_customer(cls, customer_id):
"""
Get customer by ID.
"""
customer = cls.query.filter_by(id=customer_id).first()
if not customer:
raise ValueError(f"Customer does not exist")
return customer
def __repr__(self):
return f'<Customer {self.id}>'
+415 -415
View File
@@ -1,415 +1,415 @@
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from dateutil.relativedelta import relativedelta
from datetime import timedelta
import logging
from sqlalchemy import and_, or_, not_
from sqlalchemy.sql import func
from app.utils.logger import logger
from app.extensions import db
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime, timezone
class Loan(db.Model):
__tablename__ = "loans"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
customer_id = db.Column(db.String(50), nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
original_transaction = db.Column(db.String(50), nullable=True)
account_id = db.Column(db.String(50), nullable=False)
offer_id = db.Column(db.String(20), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
collection_type = db.Column(db.String(20), nullable=True)
current_loan_amount = db.Column(db.Float, nullable=True)
initial_loan_amount = db.Column(db.Float, nullable=False)
default_penalty_fee = db.Column(db.Float, default=0)
continuous_fee = db.Column(db.Float, default=0)
upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
balance = db.Column(db.Float, nullable=True, default=0.0)
installment_amount = db.Column(db.Float, nullable=True, default=0.0)
status = db.Column(db.String(20), default='pending')
tenor = db.Column(db.Integer, nullable=True)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
eligible_amount = db.Column(db.Float, nullable=True, default=0.0)
disburse_date = db.Column(db.DateTime, nullable=True)
disburse_verify = db.Column(db.DateTime, nullable=True)
disburse_result = db.Column(db.String(10), nullable=True)
disburse_description = db.Column(db.String(100), nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
reference = db.Column(db.String(50), nullable=True)
total_penal_charge = db.Column(db.Float, default=0.0)
last_penal_date = db.Column(db.DateTime, nullable=True)
customer = relationship(
"Customer",
primaryjoin="Customer.id == Loan.customer_id",
foreign_keys=[customer_id],
back_populates="loans",
)
loan_charges = relationship(
"LoanCharge",
primaryjoin="Loan.id == LoanCharge.loan_id",
foreign_keys="LoanCharge.loan_id",
back_populates="loan",
)
def __repr__(self):
return f"<Loan {self.id}>"
def to_dict(self):
"""
Convert the Loan object to a dictionary format for JSON serialization.
"""
return {
'debtId': self.id,
"customerId": self.customer_id,
'initialLoanAmount': self.initial_loan_amount,
'currentLoanAmount': self.current_loan_amount,
'defaultPenaltyFee': self.default_penalty_fee,
'continuousFee': self.continuous_fee,
'collectionType': self.collection_type,
'repaymentAmount':self.repayment_amount,
'status': self.status,
'productId': self.product_id,
'disburseResult': self.disburse_result,
'disburseDescription': self.disburse_description,
'verifyResult': self.verify_result,
'verifyDescription': self.verify_description,
'transactionId': self.transaction_id,
'accountId':self.account_id,
'dueDate': self.due_date.isoformat() if self.due_date else None,
'loanDate': self.created_at.isoformat() if self.created_at else None,
'disburseDate': self.disburse_date.isoformat() if self.disburse_date else None,
'disburseVerify': self.disburse_verify.isoformat() if self.disburse_verify else None,
'reference': self.reference,
'balance': self.balance,
'tenor': self.tenor,
'totalPenalCharge': self.total_penal_charge,
'lastPenalDate': self.last_penal_date
}
@classmethod
def get_loan_by_transaction_id(cls, transaction_id):
return cls.query.filter_by(transaction_id=transaction_id).first()
@classmethod
def get_loan_by_loan_id(cls, loan_id):
return cls.query.filter_by(id=loan_id).first()
@classmethod
def set_disbursement_date(cls, loan_id, customer_id):
"""
Update the disburse date of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Check if customer_id matches
if loan.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the loan's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update loan disburse_date
loan.disburse_date = current_time
# Commit changes to database
try:
logger.info(f"Updating disburse date for loan ID {loan_id} to {current_time}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disburse date: {e}")
raise
@classmethod
def set_disburse_verify_date(cls, loan_id, customer_id):
"""
Update the disburse verify date of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Check if customer_id matches
if loan.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the loan's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update loan verify_date
loan.disburse_verify = current_time
# Commit changes to database
try:
logger.info(f"Updating disburse verify date for loan ID {loan_id} to {current_time}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disburse verify date: {e}")
raise
@classmethod
def set_disbursement_message(cls, loan_id, description):
"""
Update the disburse result and description of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Update disburse description only
loan.disburse_description = description
# Commit changes to database
try:
logger.info(f"Updating disburse result for loan ID {loan_id} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disbursement result: {e}")
raise
@classmethod
def set_disbursement_result(cls, loan_id, result, description):
"""
Update the disburse result and description of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Update disburse result and description
loan.disburse_result = result
loan.disburse_description = description
# Commit changes to database
try:
logger.info(f"Updating disburse result for loan ID {loan_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disbursement result: {e}")
raise
@classmethod
def set_disburse_verify_result(cls, loan_id, result, description):
"""
Update the verify result and description of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Update disburse result and description
loan.verify_result = result
loan.verify_description = description
# Commit changes to database
try:
logger.info(f"Updating verify result for loan ID {loan_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update verify result: {e}")
raise
@classmethod
def get_latest_loan_without_disburse_date(cls):
"""
Get the latest loan without a disbursement date.
"""
logger.info("Fetching latest loan without disburse date")
try:
return cls.query.filter(
cls.disburse_date.is_(None)
).order_by(cls.created_at.desc()).first()
except Exception as e:
logger.error(f"Error fetching latest loan without disburse date: {e}")
raise
@classmethod
def get_latest_loan_with_disburse_date(cls):
"""
Get the latest loan with a disbursement date and no verification date.
"""
return cls.query.filter(
cls.disburse_date.isnot(None),
cls.disburse_verify.is_(None)
).order_by(cls.created_at.desc()).first()
@classmethod
def get_customer_loans(cls, customer_id):
"""
Get customer's active loans and sum by customer_id.
"""
customer_loans = cls.query.filter_by( customer_id = customer_id).all()
if not customer_loans:
raise ValueError(f"Customer with Id {customer_id} does not have any loan.")
total_amount = (
cls.query.with_entities(func.coalesce(func.sum(cls.balance), 0.0))
.filter_by(customer_id=customer_id)
.scalar()
)
logger.info(f"Found {len(customer_loans)} loans for customer ID: {customer_id} with total amount: {total_amount}")
return customer_loans, total_amount
@classmethod
def get_customer_active_loans(cls, customer_id):
"""
Get customer's active loans and sum by customer_id.
"""
customer_loans = cls.query.filter(
cls.customer_id == customer_id,
cls.status != 'repaid'
).all()
if not customer_loans:
raise ValueError(f"Customer with Id {customer_id} does not have any active loan.")
total_amount = (
cls.query
.with_entities(func.coalesce(func.sum(cls.balance), 0.0))
.filter(
cls.customer_id == customer_id,
cls.status != 'repaid'
)
.scalar()
)
logger.info(f"Found {len(customer_loans)} active loans for customer ID: {customer_id} with total amount: {total_amount}")
return customer_loans, total_amount
@classmethod
def update_status(cls, loan_id, status):
"""
Update the status of the loan record with the given loan_id.
"""
try:
# Retrieve loan record
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
if loan.status == status:
return loan.to_dict() # Still return the current state if no change
# Update status and timestamp
loan.status = status
loan.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info("Loan status updated and committed.")
return loan.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating loan status: {e}")
raise Exception(f"Error updating loan status: {str(e)}")
@classmethod
def update_loan_balance(cls, loan_id, amount_collected):
"""
Update the balance of a loan after successful repayment.
"""
try:
# Fetch the loan record
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Convert to Decimal and round to 2 decimal places
amount_collected = Decimal(str(amount_collected)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
balance = Decimal(str(loan.balance or 0)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
# Ensure valid repayment amount
if amount_collected <= Decimal("0.00"):
logger.info(f"Repayment amount is less than or equal to 0: {amount_collected}. Must be greater than 0.00")
if balance <= Decimal("0.00"):
raise ValueError("There is no balance for this loan.")
if amount_collected > balance:
# allow tiny rounding diff
if abs(amount_collected - balance) <= Decimal("0.01"):
amount_collected = balance
else:
raise ValueError("Repayment amount exceeds current loan balance.")
# Deduct the amount from the current balance
new_balance = balance - amount_collected
loan.balance = float(new_balance)
loan.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"Loan balance updated for loan ID {loan_id}. New balance: {loan.balance}")
return loan.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating loan balance: {e}")
raise Exception(f"Error updating loan balance: {str(e)}")
@classmethod
def get_overdue_loans(cls):
"""
Get all overdue loans.
"""
try:
overdue_loans = cls.query.filter(
cls.due_date < datetime.now(timezone.utc),
cls.status != 'repaid'
).all()
if not overdue_loans:
logger.info("No overdue loans found.")
return []
logger.info(f"Found {len(overdue_loans)} overdue loans.")
return overdue_loans
except Exception as e:
logger.error(f"Error fetching overdue loans: {e}")
return []
@classmethod
def apply_penal_to_loan(cls, loan_id, penal_amount):
loan = cls.query.get(loan_id)
if not loan:
raise ValueError("Loan not found")
penal_amount = Decimal(str(penal_amount))
loan.total_penal_charge = Decimal(str(loan.total_penal_charge or 0)) + penal_amount
loan.last_penal_date = datetime.now(timezone.utc)
db.session.commit()
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from dateutil.relativedelta import relativedelta
from datetime import timedelta
import logging
from sqlalchemy import and_, or_, not_
from sqlalchemy.sql import func
from app.utils.logger import logger
from app.extensions import db
from decimal import Decimal, ROUND_HALF_UP
from datetime import datetime, timezone
class Loan(db.Model):
__tablename__ = "loans"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
customer_id = db.Column(db.String(50), nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
original_transaction = db.Column(db.String(50), nullable=True)
account_id = db.Column(db.String(50), nullable=False)
offer_id = db.Column(db.String(20), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
collection_type = db.Column(db.String(20), nullable=True)
current_loan_amount = db.Column(db.Float, nullable=True)
initial_loan_amount = db.Column(db.Float, nullable=False)
default_penalty_fee = db.Column(db.Float, default=0)
continuous_fee = db.Column(db.Float, default=0)
upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
balance = db.Column(db.Float, nullable=True, default=0.0)
installment_amount = db.Column(db.Float, nullable=True, default=0.0)
status = db.Column(db.String(20), default='pending')
tenor = db.Column(db.Integer, nullable=True)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
eligible_amount = db.Column(db.Float, nullable=True, default=0.0)
disburse_date = db.Column(db.DateTime, nullable=True)
disburse_verify = db.Column(db.DateTime, nullable=True)
disburse_result = db.Column(db.String(10), nullable=True)
disburse_description = db.Column(db.String(100), nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
reference = db.Column(db.String(50), nullable=True)
total_penal_charge = db.Column(db.Float, default=0.0)
last_penal_date = db.Column(db.DateTime, nullable=True)
customer = relationship(
"Customer",
primaryjoin="Customer.id == Loan.customer_id",
foreign_keys=[customer_id],
back_populates="loans",
)
loan_charges = relationship(
"LoanCharge",
primaryjoin="Loan.id == LoanCharge.loan_id",
foreign_keys="LoanCharge.loan_id",
back_populates="loan",
)
def __repr__(self):
return f"<Loan {self.id}>"
def to_dict(self):
"""
Convert the Loan object to a dictionary format for JSON serialization.
"""
return {
'debtId': self.id,
"customerId": self.customer_id,
'initialLoanAmount': self.initial_loan_amount,
'currentLoanAmount': self.current_loan_amount,
'defaultPenaltyFee': self.default_penalty_fee,
'continuousFee': self.continuous_fee,
'collectionType': self.collection_type,
'repaymentAmount':self.repayment_amount,
'status': self.status,
'productId': self.product_id,
'disburseResult': self.disburse_result,
'disburseDescription': self.disburse_description,
'verifyResult': self.verify_result,
'verifyDescription': self.verify_description,
'transactionId': self.transaction_id,
'accountId':self.account_id,
'dueDate': self.due_date.isoformat() if self.due_date else None,
'loanDate': self.created_at.isoformat() if self.created_at else None,
'disburseDate': self.disburse_date.isoformat() if self.disburse_date else None,
'disburseVerify': self.disburse_verify.isoformat() if self.disburse_verify else None,
'reference': self.reference,
'balance': self.balance,
'tenor': self.tenor,
'totalPenalCharge': self.total_penal_charge,
'lastPenalDate': self.last_penal_date
}
@classmethod
def get_loan_by_transaction_id(cls, transaction_id):
return cls.query.filter_by(transaction_id=transaction_id).first()
@classmethod
def get_loan_by_loan_id(cls, loan_id):
return cls.query.filter_by(id=loan_id).first()
@classmethod
def set_disbursement_date(cls, loan_id, customer_id):
"""
Update the disburse date of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Check if customer_id matches
if loan.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the loan's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update loan disburse_date
loan.disburse_date = current_time
# Commit changes to database
try:
logger.info(f"Updating disburse date for loan ID {loan_id} to {current_time}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disburse date: {e}")
raise
@classmethod
def set_disburse_verify_date(cls, loan_id, customer_id):
"""
Update the disburse verify date of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Check if customer_id matches
if loan.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the loan's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update loan verify_date
loan.disburse_verify = current_time
# Commit changes to database
try:
logger.info(f"Updating disburse verify date for loan ID {loan_id} to {current_time}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disburse verify date: {e}")
raise
@classmethod
def set_disbursement_message(cls, loan_id, description):
"""
Update the disburse result and description of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Update disburse description only
loan.disburse_description = description
# Commit changes to database
try:
logger.info(f"Updating disburse result for loan ID {loan_id} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disbursement result: {e}")
raise
@classmethod
def set_disbursement_result(cls, loan_id, result, description):
"""
Update the disburse result and description of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Update disburse result and description
loan.disburse_result = result
loan.disburse_description = description
# Commit changes to database
try:
logger.info(f"Updating disburse result for loan ID {loan_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update disbursement result: {e}")
raise
@classmethod
def set_disburse_verify_result(cls, loan_id, result, description):
"""
Update the verify result and description of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Update disburse result and description
loan.verify_result = result
loan.verify_description = description
# Commit changes to database
try:
logger.info(f"Updating verify result for loan ID {loan_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update verify result: {e}")
raise
@classmethod
def get_latest_loan_without_disburse_date(cls):
"""
Get the latest loan without a disbursement date.
"""
logger.info("Fetching latest loan without disburse date")
try:
return cls.query.filter(
cls.disburse_date.is_(None)
).order_by(cls.created_at.desc()).first()
except Exception as e:
logger.error(f"Error fetching latest loan without disburse date: {e}")
raise
@classmethod
def get_latest_loan_with_disburse_date(cls):
"""
Get the latest loan with a disbursement date and no verification date.
"""
return cls.query.filter(
cls.disburse_date.isnot(None),
cls.disburse_verify.is_(None)
).order_by(cls.created_at.desc()).first()
@classmethod
def get_customer_loans(cls, customer_id):
"""
Get customer's active loans and sum by customer_id.
"""
customer_loans = cls.query.filter_by( customer_id = customer_id).all()
if not customer_loans:
raise ValueError(f"Customer with Id {customer_id} does not have any loan.")
total_amount = (
cls.query.with_entities(func.coalesce(func.sum(cls.balance), 0.0))
.filter_by(customer_id=customer_id)
.scalar()
)
logger.info(f"Found {len(customer_loans)} loans for customer ID: {customer_id} with total amount: {total_amount}")
return customer_loans, total_amount
@classmethod
def get_customer_active_loans(cls, customer_id):
"""
Get customer's active loans and sum by customer_id.
"""
customer_loans = cls.query.filter(
cls.customer_id == customer_id,
cls.status != 'repaid'
).all()
if not customer_loans:
raise ValueError(f"Customer with Id {customer_id} does not have any active loan.")
total_amount = (
cls.query
.with_entities(func.coalesce(func.sum(cls.balance), 0.0))
.filter(
cls.customer_id == customer_id,
cls.status != 'repaid'
)
.scalar()
)
logger.info(f"Found {len(customer_loans)} active loans for customer ID: {customer_id} with total amount: {total_amount}")
return customer_loans, total_amount
@classmethod
def update_status(cls, loan_id, status):
"""
Update the status of the loan record with the given loan_id.
"""
try:
# Retrieve loan record
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
if loan.status == status:
return loan.to_dict() # Still return the current state if no change
# Update status and timestamp
loan.status = status
loan.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info("Loan status updated and committed.")
return loan.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating loan status: {e}")
raise Exception(f"Error updating loan status: {str(e)}")
@classmethod
def update_loan_balance(cls, loan_id, amount_collected):
"""
Update the balance of a loan after successful repayment.
"""
try:
# Fetch the loan record
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
# Convert to Decimal and round to 2 decimal places
amount_collected = Decimal(str(amount_collected)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
balance = Decimal(str(loan.balance or 0)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
# Ensure valid repayment amount
if amount_collected <= Decimal("0.00"):
logger.info(f"Repayment amount is less than or equal to 0: {amount_collected}. Must be greater than 0.00")
if balance <= Decimal("0.00"):
raise ValueError("There is no balance for this loan.")
if amount_collected > balance:
# allow tiny rounding diff
if abs(amount_collected - balance) <= Decimal("0.01"):
amount_collected = balance
else:
raise ValueError("Repayment amount exceeds current loan balance.")
# Deduct the amount from the current balance
new_balance = balance - amount_collected
loan.balance = float(new_balance)
loan.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"Loan balance updated for loan ID {loan_id}. New balance: {loan.balance}")
return loan.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating loan balance: {e}")
raise Exception(f"Error updating loan balance: {str(e)}")
@classmethod
def get_overdue_loans(cls):
"""
Get all overdue loans.
"""
try:
overdue_loans = cls.query.filter(
cls.due_date < datetime.now(timezone.utc),
cls.status != 'repaid'
).all()
if not overdue_loans:
logger.info("No overdue loans found.")
return []
logger.info(f"Found {len(overdue_loans)} overdue loans.")
return overdue_loans
except Exception as e:
logger.error(f"Error fetching overdue loans: {e}")
return []
@classmethod
def apply_penal_to_loan(cls, loan_id, penal_amount):
loan = cls.query.get(loan_id)
if not loan:
raise ValueError("Loan not found")
penal_amount = Decimal(str(penal_amount))
loan.total_penal_charge = Decimal(str(loan.total_penal_charge or 0)) + penal_amount
loan.last_penal_date = datetime.now(timezone.utc)
db.session.commit()
+126 -126
View File
@@ -1,127 +1,127 @@
from datetime import datetime, timezone, timedelta
from os.path import devnull
from sqlalchemy.exc import IntegrityError
from app.extensions import db
from sqlalchemy.orm import relationship
class LoanCharge(db.Model):
__tablename__ = 'loan_charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
code = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, default=0.0)
percent = db.Column(db.Float, default=0.0)
description = db.Column(db.Text, nullable=True)
due = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
loan = relationship(
"Loan",
primaryjoin="LoanCharge.loan_id == Loan.id",
foreign_keys=[loan_id],
back_populates="loan_charges",
)
def __repr__(self):
return f"<LoanCharge {self.id} - Loan {self.loan_id} - {self.code}>"
def to_dict(self):
"""
Convert the Loan charge object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'loanId': self.loan_id,
'transactionId': self.transaction_id,
'code': self.code,
'amount': self.amount,
'percent': self.percent,
'description': self.description,
'due': self.due
}
#get last penal
@classmethod
def get_last_penal_no(cls, loan_id):
"""
Returns the last penal number created for a loan.
Example:
PENAL1 -> returns 1
PENAL3 -> returns 3
If none exists, returns 0.
"""
last_penal = (
cls.query
.filter(cls.loan_id == loan_id)
.filter(cls.code.like("PENAL%"))
.order_by(cls.id.desc())
.first()
)
if not last_penal:
return 0
try:
return int(last_penal.code.replace("PENAL", ""))
except ValueError:
return 0
@classmethod
def get_penal_charges_by_loan_id(cls, loan_id):
"""
Returns all penal charges for a specific loan.
"""
return cls.query.filter(
cls.loan_id == loan_id,
cls.code.like("PENAL%")
).all()
@classmethod
def get_loan_charge_by_debt_id(cls, debt_id):
return cls.query.filter_by(loan_id=debt_id)
#create penal charge
@classmethod
def create_penal_charges_for_loan(cls, loan_id, transaction_id, percent, penal_no, schedule_number, penal_amount=0.0):
"""
Create a penal charge for a given loan and schedule.
"""
if loan_id is None:
raise ValueError("loan_id cannot be None")
code = f"PENAL{penal_no:02d}-SCHEDULE{schedule_number:02d}"
# Check if this penal charge already exists
existing = cls.query.filter_by(
loan_id=loan_id,
code=code
).first()
if existing:
return existing
now = datetime.now(timezone.utc)
penal_charge = cls(
loan_id=loan_id,
transaction_id=transaction_id,
code=code,
amount=penal_amount,
percent=percent,
description=f"Penal Charge {penal_no} for loan {loan_id} schedule {schedule_number}",
due=True,
due_date=now
)
try:
db.session.add(penal_charge)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
raise ValueError(f"Database integrity error: {err}")
from datetime import datetime, timezone, timedelta
from os.path import devnull
from sqlalchemy.exc import IntegrityError
from app.extensions import db
from sqlalchemy.orm import relationship
class LoanCharge(db.Model):
__tablename__ = 'loan_charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
code = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, default=0.0)
percent = db.Column(db.Float, default=0.0)
description = db.Column(db.Text, nullable=True)
due = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
loan = relationship(
"Loan",
primaryjoin="LoanCharge.loan_id == Loan.id",
foreign_keys=[loan_id],
back_populates="loan_charges",
)
def __repr__(self):
return f"<LoanCharge {self.id} - Loan {self.loan_id} - {self.code}>"
def to_dict(self):
"""
Convert the Loan charge object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'loanId': self.loan_id,
'transactionId': self.transaction_id,
'code': self.code,
'amount': self.amount,
'percent': self.percent,
'description': self.description,
'due': self.due
}
#get last penal
@classmethod
def get_last_penal_no(cls, loan_id):
"""
Returns the last penal number created for a loan.
Example:
PENAL1 -> returns 1
PENAL3 -> returns 3
If none exists, returns 0.
"""
last_penal = (
cls.query
.filter(cls.loan_id == loan_id)
.filter(cls.code.like("PENAL%"))
.order_by(cls.id.desc())
.first()
)
if not last_penal:
return 0
try:
return int(last_penal.code.replace("PENAL", ""))
except ValueError:
return 0
@classmethod
def get_penal_charges_by_loan_id(cls, loan_id):
"""
Returns all penal charges for a specific loan.
"""
return cls.query.filter(
cls.loan_id == loan_id,
cls.code.like("PENAL%")
).all()
@classmethod
def get_loan_charge_by_debt_id(cls, debt_id):
return cls.query.filter_by(loan_id=debt_id)
#create penal charge
@classmethod
def create_penal_charges_for_loan(cls, loan_id, transaction_id, percent, penal_no, schedule_number, penal_amount=0.0):
"""
Create a penal charge for a given loan and schedule.
"""
if loan_id is None:
raise ValueError("loan_id cannot be None")
code = f"PENAL{penal_no:02d}-SCHEDULE{schedule_number:02d}"
# Check if this penal charge already exists
existing = cls.query.filter_by(
loan_id=loan_id,
code=code
).first()
if existing:
return existing
now = datetime.now(timezone.utc)
penal_charge = cls(
loan_id=loan_id,
transaction_id=transaction_id,
code=code,
amount=penal_amount,
percent=percent,
description=f"Penal Charge {penal_no} for loan {loan_id} schedule {schedule_number}",
due=True,
due_date=now
)
try:
db.session.add(penal_charge)
db.session.commit()
except IntegrityError as err:
db.session.rollback()
raise ValueError(f"Database integrity error: {err}")
return penal_charge
+346 -335
View File
@@ -1,336 +1,347 @@
from datetime import datetime, timedelta, timezone
from app.extensions import db
from app.utils.logger import logger
from sqlalchemy.exc import SQLAlchemyError
from app.enums.repayment_schedule_status import RepaymentScheduleStatus
from app.config import settings
from decimal import Decimal, ROUND_HALF_UP
# from dateutil.relativedelta import relativedelta
class LoanRepaymentSchedule(db.Model):
__tablename__ = 'loan_repayment_schedules'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
product_id = db.Column(db.String(20), nullable=True)
installment_number = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=False)
installment_amount= db.Column(db.Float, default=0.0)
total_repayment_amount = db.Column(db.Float, default=0.0)
paid = db.Column(db.Boolean, default=False)
paid_at = db.Column(db.DateTime, nullable=True)
due_process_date = db.Column(db.DateTime, nullable=True)
due_process_count = db.Column(db.Integer, default=0)
paid_status = db.Column(db.String(20), nullable=True)
repay_description = db.Column(db.String(255), nullable=True)
partial_balance = db.Column(db.Float, default=0.0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
penal_charge = db.Column(db.Float, default=0.0)
penal_count = db.Column(db.Integer, default=0)
last_penal_date = db.Column(db.DateTime, nullable=True)
def to_dict(self):
return {
'id': self.id,
'loan_id': self.loan_id,
'product_id': self.product_id,
'transaction_id': self.transaction_id,
'installment_number': self.installment_number,
'due_date': self.due_date.isoformat() if self.due_date else None,
'installment_amount': self.installment_amount,
'total_repayment_amount': self.total_repayment_amount,
'paid': self.paid,
'due_process_date': self.due_process_date.isoformat() if self.due_process_date else None,
'due_process_count': self.due_process_count,
'paid_status': self.paid_status,
'repay_description': self.repay_description,
'partial_balance': self.partial_balance,
'paid_at': self.paid_at.isoformat() if self.paid_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'penal_charge': self.penal_charge,
'penal_count': self.penal_count,
'last_penal_date': self.last_penal_date.isoformat() if self.last_penal_date else None
}
def __repr__(self):
return f'<LoanRepaymentSchedule Loan:{self.loan_id} Installment:{self.installment_number}>'
@classmethod
def get_repayment_schedule_by_loan_id(cls, loan_id, include_paid=True):
"""
Get repayment schedules by loan ID.
:param loan_id: Loan ID to filter by
:param include_paid: If True, include all schedules. If False, only unpaid ones.
:return: List of repayment schedules ordered by due_date
"""
try:
query = cls.query.filter_by(loan_id=loan_id)
if not include_paid:
query = query.filter_by(paid=False)
schedules = query.order_by(cls.due_date.asc()).all()
return schedules
except Exception as e:
logger.error(f"Error fetching repayment schedules for loan {loan_id}: {e}")
raise
@classmethod
def get_repayment_schedule_by_id_and_transaction_id(cls, id, transaction_id):
"""
Get repayment schedule by ID and transaction ID
"""
try:
return cls.query.filter_by(id=id, transaction_id=transaction_id).first()
except Exception as e:
logger.error(f"Error fetching repayment schedule for id={id}, transaction_id={transaction_id}: {e}")
return None
@classmethod
def get_overdue_repayment_schedule(cls):
"""
Get all overdue repayment schedules that are not repaid.
"""
try:
return cls.query.filter(cls.due_date < datetime.now(timezone.utc), cls.paid == False).order_by(cls.due_date.asc()).all()
except Exception as e:
logger.error(f"Error fetching overdue repayment schedules: {e}")
return []
@classmethod
def get_active_overdue_repayment_schedule(cls):
"""
Get all overdue repayment schedules that are active.
"""
try:
return (
cls.query
.filter(
cls.due_date < datetime.now(timezone.utc),
cls.paid_status == RepaymentScheduleStatus.ACTIVE
)
.order_by(cls.due_date.asc())
.all()
)
except Exception as e:
logger.error(f"Error fetching active overdue repayment schedules: {e}")
return []
@classmethod
def get_overdue_repayment_schedule_with_grace_period(cls, grace_period_days, limit=None):
"""
Get all overdue repayment schedules that are not repaid and beyond the grace period.
"""
try:
grace_period_date = datetime.now(timezone.utc) - timedelta(days=grace_period_days)
return cls.query.filter(
cls.due_date < grace_period_date,
cls.paid == False
).order_by(cls.due_date.asc()).limit(limit).all()
except Exception as e:
logger.error(f"Error fetching overdue repayment schedules with grace period: {e}")
return []
@classmethod
def get_partially_paid_overdue_repayment_schedule(cls):
"""
Get all overdue repayment schedules that are partially paid.
"""
try:
return (
cls.query
.filter(
cls.due_date < datetime.now(timezone.utc),
cls.paid_status == RepaymentScheduleStatus.PARTIALLY_PAID
)
.order_by(cls.due_date.asc())
.all()
)
except Exception as e:
logger.error(f"Error fetching partially paid overdue repayment schedules: {e}")
return []
@classmethod
def get_repayment_schedule_by_transaction_id(cls, transaction_id):
"""
Get repayment schedule by transaction ID
"""
return cls.query.filter_by(transaction_id=transaction_id).all()
@classmethod
def update_repayment_schedule_description(cls, schedule_id, description):
"""
Update the repayment description for a specific schedule.
"""
try:
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
schedule.repay_description = description
schedule.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"Updated repayment description for schedule ID {schedule_id}")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating repayment description for schedule {schedule_id}: {e}")
raise
@classmethod
def update_repayment_schedule_status(cls, schedule_id):
"""
Mark a repayment schedule as fully repaid when the parent loan is fully repaid.
This function does not take amount_collected because the loan is already cleared.
"""
try:
# Fetch schedule
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
# Force balance to 0
schedule.partial_balance = 0.0
schedule.paid_status = RepaymentScheduleStatus.REPAID
schedule.paid = True
schedule.paid_at = datetime.now(timezone.utc)
# Track due processing
if schedule.due_process_count is None:
schedule.due_process_count = 0
schedule.due_process_count += 1
schedule.due_process_date = datetime.now(timezone.utc)
# Update timestamp
schedule.updated_at = datetime.now(timezone.utc)
# Commit changes
db.session.commit()
logger.info(f"Schedule {schedule_id} marked as REPAID since parent loan is fully repaid.")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating repayment schedule {schedule_id} after loan repayment: {e}")
raise
@classmethod
def update_repayment_schedule_status_to_active(cls, schedule_id):
"""
Update repayment schedule status to ACTIVE.
"""
try:
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
schedule.paid_status = RepaymentScheduleStatus.ACTIVE
schedule.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"Updated repayment schedule ID {schedule_id} status to ACTIVE")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating repayment schedule status for schedule {schedule_id}: {e}")
raise
@classmethod
def update_repayment_schedule_balance(cls, schedule_id, amount_collected):
"""
Apply repayment to a loan schedule:
- Deduct from partial balance if partially paid.
- Otherwise deduct from installment amount.
- Update partial balance, paid status, timestamps, etc.
"""
try:
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
# Normalize amount
amount_collected = Decimal(str(amount_collected)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
if amount_collected <= Decimal("0.00"):
logger.info("Repayment amount must be greater than zero.")
return schedule.to_dict()
# Determine current balance
if schedule.paid_status == RepaymentScheduleStatus.PARTIALLY_PAID and (schedule.partial_balance or 0) > 0:
balance = Decimal(str(schedule.partial_balance))
else:
balance = Decimal(str(schedule.installment_amount))
# Deduct repayment
new_balance = balance - amount_collected
if new_balance < 0:
new_balance = Decimal("0.00") # prevent negatives
# Update schedule fields
schedule.partial_balance = float(new_balance) if new_balance > 0 else 0.0
schedule.updated_at = datetime.now(timezone.utc)
if new_balance == 0:
schedule.paid_status = RepaymentScheduleStatus.REPAID
schedule.paid = True
schedule.paid_at = datetime.now(timezone.utc)
else:
schedule.paid_status = RepaymentScheduleStatus.PARTIALLY_PAID
schedule.paid = False # not fully paid yet
# Track due processing
if schedule.due_process_count is None:
schedule.due_process_count = 0
schedule.due_process_count += 1
schedule.due_process_date = datetime.now(timezone.utc)
# Commit
db.session.commit()
logger.info(f"Repayment applied for schedule ID {schedule_id}. Remaining balance: {schedule.partial_balance}")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error applying repayment for schedule {schedule_id}: {e}")
raise
@classmethod
def apply_penal_to_schedule(cls, schedule_id, penal_amount):
schedule = cls.query.get(schedule_id)
now = datetime.now(timezone.utc)
schedule.penal_count = (schedule.penal_count or 0) + 1
schedule.penal_charge = (schedule.penal_charge or 0) + penal_amount
schedule.last_penal_date = now
schedule.due_process_date = now
schedule.updated_at = now
db.session.commit()
@classmethod
def calculate_penal_charge(cls, schedule):
if schedule.paid_status == RepaymentScheduleStatus.PARTIALLY_PAID:
outstanding = Decimal(str(schedule.partial_balance))
else:
outstanding = Decimal(str(schedule.installment_amount))
rate = Decimal(str(settings.PENAL_CHARGE_PERCENTAGE)) / 100
penal_charge = (outstanding * rate).quantize(
Decimal("0.01"),
rounding=ROUND_HALF_UP
)
return penal_charge
from datetime import datetime, timedelta, timezone
from app.extensions import db
from app.utils.logger import logger
from sqlalchemy.exc import SQLAlchemyError
from sqlalchemy import or_
from app.enums.repayment_schedule_status import RepaymentScheduleStatus
from app.config import settings
from decimal import Decimal, ROUND_HALF_UP
# from dateutil.relativedelta import relativedelta
class LoanRepaymentSchedule(db.Model):
__tablename__ = 'loan_repayment_schedules'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
product_id = db.Column(db.String(20), nullable=True)
installment_number = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=False)
installment_amount= db.Column(db.Float, default=0.0)
total_repayment_amount = db.Column(db.Float, default=0.0)
paid = db.Column(db.Boolean, default=False)
paid_at = db.Column(db.DateTime, nullable=True)
due_process_date = db.Column(db.DateTime, nullable=True)
due_process_count = db.Column(db.Integer, default=0)
paid_status = db.Column(db.String(20), nullable=True)
repay_description = db.Column(db.String(255), nullable=True)
partial_balance = db.Column(db.Float, default=0.0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
penal_charge = db.Column(db.Float, default=0.0)
penal_count = db.Column(db.Integer, default=0)
last_penal_date = db.Column(db.DateTime, nullable=True)
def to_dict(self):
return {
'id': self.id,
'loan_id': self.loan_id,
'product_id': self.product_id,
'transaction_id': self.transaction_id,
'installment_number': self.installment_number,
'due_date': self.due_date.isoformat() if self.due_date else None,
'installment_amount': self.installment_amount,
'total_repayment_amount': self.total_repayment_amount,
'paid': self.paid,
'due_process_date': self.due_process_date.isoformat() if self.due_process_date else None,
'due_process_count': self.due_process_count,
'paid_status': self.paid_status,
'repay_description': self.repay_description,
'partial_balance': self.partial_balance,
'paid_at': self.paid_at.isoformat() if self.paid_at else None,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None,
'penal_charge': self.penal_charge,
'penal_count': self.penal_count,
'last_penal_date': self.last_penal_date.isoformat() if self.last_penal_date else None
}
def __repr__(self):
return f'<LoanRepaymentSchedule Loan:{self.loan_id} Installment:{self.installment_number}>'
@classmethod
def get_repayment_schedule_by_loan_id(cls, loan_id, include_paid=True):
"""
Get repayment schedules by loan ID.
:param loan_id: Loan ID to filter by
:param include_paid: If True, include all schedules. If False, only unpaid ones.
:return: List of repayment schedules ordered by due_date
"""
try:
query = cls.query.filter_by(loan_id=loan_id)
if not include_paid:
query = query.filter_by(paid=False)
schedules = query.order_by(cls.due_date.asc()).all()
return schedules
except Exception as e:
logger.error(f"Error fetching repayment schedules for loan {loan_id}: {e}")
raise
@classmethod
def get_repayment_schedule_by_id_and_transaction_id(cls, id, transaction_id):
"""
Get repayment schedule by ID and transaction ID
"""
try:
return cls.query.filter_by(id=id, transaction_id=transaction_id).first()
except Exception as e:
logger.error(f"Error fetching repayment schedule for id={id}, transaction_id={transaction_id}: {e}")
return None
@classmethod
def get_overdue_repayment_schedule(cls):
"""
Get all overdue repayment schedules that are not repaid.
"""
try:
return cls.query.filter(cls.due_date < datetime.now(timezone.utc), cls.paid == False).order_by(cls.due_date.asc()).all()
except Exception as e:
logger.error(f"Error fetching overdue repayment schedules: {e}")
return []
@classmethod
def get_active_overdue_repayment_schedule(cls):
"""
Get all overdue repayment schedules that are active.
"""
try:
return (
cls.query
.filter(
cls.due_date < datetime.now(timezone.utc),
cls.paid_status == RepaymentScheduleStatus.ACTIVE
)
.order_by(cls.due_date.asc())
.all()
)
except Exception as e:
logger.error(f"Error fetching active overdue repayment schedules: {e}")
return []
@classmethod
def get_overdue_repayment_schedule_with_grace_period(cls, grace_period_days, limit=None):
try:
now = datetime.now(timezone.utc)
grace_period_date = now - timedelta(days=grace_period_days)
penal_interval = timedelta(days=settings.PENAL_CHARGE_INTERVAL_DAYS)
return cls.query.filter(
cls.due_date < grace_period_date,
cls.paid == False,
or_(
cls.last_penal_date == None, # never penalized before
cls.last_penal_date < now - penal_interval
)
).order_by(cls.due_date.asc()).limit(limit).all()
except Exception as e:
logger.error(f"Error fetching overdue repayment schedules with grace period: {e}")
return []
@classmethod
def get_partially_paid_overdue_repayment_schedule(cls):
"""
Get all overdue repayment schedules that are partially paid.
"""
try:
return (
cls.query
.filter(
cls.due_date < datetime.now(timezone.utc),
cls.paid_status == RepaymentScheduleStatus.PARTIALLY_PAID
)
.order_by(cls.due_date.asc())
.all()
)
except Exception as e:
logger.error(f"Error fetching partially paid overdue repayment schedules: {e}")
return []
@classmethod
def get_repayment_schedule_by_transaction_id(cls, transaction_id):
"""
Get repayment schedule by transaction ID
"""
return cls.query.filter_by(transaction_id=transaction_id).all()
@classmethod
def update_repayment_schedule_description(cls, schedule_id, description):
"""
Update the repayment description for a specific schedule.
"""
try:
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
schedule.repay_description = description
schedule.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"Updated repayment description for schedule ID {schedule_id}")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating repayment description for schedule {schedule_id}: {e}")
raise
@classmethod
def update_repayment_schedule_status(cls, schedule_id):
"""
Mark a repayment schedule as fully repaid when the parent loan is fully repaid.
This function does not take amount_collected because the loan is already cleared.
"""
try:
# Fetch schedule
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
# Force balance to 0
schedule.partial_balance = 0.0
schedule.paid_status = RepaymentScheduleStatus.REPAID
schedule.paid = True
schedule.paid_at = datetime.now(timezone.utc)
# Track due processing
if schedule.due_process_count is None:
schedule.due_process_count = 0
schedule.due_process_count += 1
schedule.due_process_date = datetime.now(timezone.utc)
# Update timestamp
schedule.updated_at = datetime.now(timezone.utc)
# Commit changes
db.session.commit()
logger.info(f"Schedule {schedule_id} marked as REPAID since parent loan is fully repaid.")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating repayment schedule {schedule_id} after loan repayment: {e}")
raise
@classmethod
def update_repayment_schedule_status_to_active(cls, schedule_id):
"""
Update repayment schedule status to ACTIVE.
"""
try:
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
schedule.paid_status = RepaymentScheduleStatus.ACTIVE
schedule.updated_at = datetime.now(timezone.utc)
db.session.commit()
logger.info(f"Updated repayment schedule ID {schedule_id} status to ACTIVE")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating repayment schedule status for schedule {schedule_id}: {e}")
raise
@classmethod
def update_repayment_schedule_balance(cls, schedule_id, amount_collected):
"""
Apply repayment to a loan schedule:
- Deduct from partial balance if partially paid.
- Otherwise deduct from installment amount.
- Update partial balance, paid status, timestamps, etc.
"""
try:
schedule = cls.query.get(schedule_id)
if not schedule:
raise ValueError(f"Schedule with ID {schedule_id} does not exist.")
# Normalize amount
amount_collected = Decimal(str(amount_collected)).quantize(Decimal("0.01"), rounding=ROUND_HALF_UP)
if amount_collected <= Decimal("0.00"):
logger.info("Repayment amount must be greater than zero.")
return schedule.to_dict()
# Determine current balance
if schedule.paid_status == RepaymentScheduleStatus.PARTIALLY_PAID and (schedule.partial_balance or 0) > 0:
balance = Decimal(str(schedule.partial_balance))
else:
balance = Decimal(str(schedule.installment_amount))
# Deduct repayment
new_balance = balance - amount_collected
if new_balance < 0:
new_balance = Decimal("0.00") # prevent negatives
# Update schedule fields
schedule.partial_balance = float(new_balance) if new_balance > 0 else 0.0
schedule.updated_at = datetime.now(timezone.utc)
if new_balance == 0:
schedule.paid_status = RepaymentScheduleStatus.REPAID
schedule.paid = True
schedule.paid_at = datetime.now(timezone.utc)
else:
schedule.paid_status = RepaymentScheduleStatus.PARTIALLY_PAID
schedule.paid = False # not fully paid yet
# Track due processing
if schedule.due_process_count is None:
schedule.due_process_count = 0
schedule.due_process_count += 1
schedule.due_process_date = datetime.now(timezone.utc)
# Commit
db.session.commit()
logger.info(f"Repayment applied for schedule ID {schedule_id}. Remaining balance: {schedule.partial_balance}")
return schedule.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error applying repayment for schedule {schedule_id}: {e}")
raise
from decimal import Decimal
@classmethod
def apply_penal_to_schedule(cls, schedule_id, penal_amount):
schedule = cls.query.get(schedule_id)
now = datetime.now(timezone.utc)
penal_amount = Decimal(str(penal_amount))
current_penal = Decimal(str(schedule.penal_charge)) if schedule.penal_charge else Decimal("0")
schedule.penal_count = (schedule.penal_count or 0) + 1
schedule.penal_charge = current_penal + penal_amount
schedule.last_penal_date = now
schedule.due_process_date = now
schedule.updated_at = now
db.session.commit()
@classmethod
def calculate_penal_charge(cls, schedule):
if schedule.paid_status == RepaymentScheduleStatus.PARTIALLY_PAID:
outstanding = Decimal(str(schedule.partial_balance))
else:
outstanding = Decimal(str(schedule.installment_amount))
rate = Decimal(str(settings.PENAL_CHARGE_PERCENTAGE)) / 100
penal_charge = (outstanding * rate).quantize(
Decimal("0.01"),
rounding=ROUND_HALF_UP
)
return penal_charge
+259 -259
View File
@@ -1,260 +1,260 @@
from app.extensions import db
from datetime import datetime, timezone
from app.utils.logger import logger
from app.enums.loan_status import LoanStatus
from sqlalchemy.exc import IntegrityError
class Repayment(db.Model):
__tablename__ = "repayments"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
loan_id = db.Column(db.String(50), nullable=False)
customer_id = db.Column(db.String(50), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
transaction_id = db.Column(db.String(50), nullable=False)
initiated_by = db.Column(db.String(50), nullable=True)
salary_amount = db.Column(db.Float, nullable=True, default=0.0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
repay_date = db.Column(db.DateTime, nullable=True)
verify_date = db.Column(db.DateTime, nullable=True)
repay_result = db.Column(db.String(10), nullable=True)
repay_description = db.Column(db.String(100), nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
def __repr__(self):
return f'<Repayment {self.id}>'
def to_dict(self):
"""
Convert the Repayment object to a dictionary format for JSON serialization.
"""
return {
'Id': self.id,
"customerId": self.customer_id,
'loanId': self.loan_id,
'productId': self.product_id,
'repayResult': self.repay_result,
'repayDescription': self.repay_description,
'verifyResult': self.verify_result,
'verifyDescription': self.verify_description,
'transactionId': self.transaction_id,
'initiatedBy':self.initiated_by,
'salaryAmount':self.salary_amount,
'repayDate': self.repay_date.isoformat() if self.repay_date else None,
'VerifyDate': self.verify_date.isoformat() if self.verify_date else None,
}
@classmethod
def create_repayment(cls, repayment_data):
if repayment_data["LoanStatus"] not in [LoanStatus.ACTIVE, LoanStatus.START_REPAY,LoanStatus.ACTIVE_PARTIAL]:
raise ValueError(f"Repayment cannot be processed. Loan status: ({repayment_data['LoanStatus']})")
repayment = cls(
customer_id=repayment_data["customerId"],
loan_id=repayment_data["loanId"],
product_id=repayment_data["productId"],
transaction_id=repayment_data["transactionId"],
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
initiated_by= repayment_data["initiatedBy"],
salary_amount=repayment_data["salaryAmount"]
)
try:
db.session.add(repayment)
db.session.commit()
logger.info("Repayment record committed.")
return repayment
except IntegrityError as err:
logger.error(f"Database integrity error: {err}")
return {"error": "Integrity error", "details": str(err)}
@classmethod
def add_repayment(cls, data: dict):
"""
Create and persist a new repayment record.
"""
logger.info(f"Received repayment data: {data}")
try:
new_repayment = cls(
loan_id=data["loanId"],
customer_id=data["customerId"],
product_id=data.get("productId"),
transaction_id=data["transactionId"],
initiated_by=data.get("initiatedBy"),
salary_amount=float(data.get("salaryAmount", 0.0)),
repay_date=(
datetime.strptime(data["repayDate"], "%Y-%m-%d")
.replace(tzinfo=timezone.utc)
if data.get("repayDate")
else None
),
repay_result=data.get("repayResult"),
repay_description=data.get("repayDescription"),
verify_result=data.get("verifyResult"),
verify_description=data.get("verifyDescription"),
verify_date=(
datetime.strptime(data["verifyDate"], "%Y-%m-%d")
.replace(tzinfo=timezone.utc)
if data.get("verifyDate")
else None
),
)
db.session.add(new_repayment)
db.session.commit()
logger.info("Repayment record committed.")
return new_repayment
except Exception as e:
db.session.rollback()
logger.error(f"Error adding repayment data: {e}")
raise
@classmethod
def get_repayment_by_transaction_id(cls, transaction_id):
return cls.query.filter_by(transaction_id=transaction_id).first()
@classmethod
def get_repayment_by_id(cls, id):
return cls.query.filter_by(id=id).first()
@classmethod
def set_repay_date(cls, repayment_id, customer_id):
"""
Update the repay date of the loan with the given loan_id.
"""
# Retrieve repayment
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Check if customer_id matches
if repayment.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the repayment's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update repayment date
repayment.repay_date = current_time
# Commit changes to database
try:
logger.info(f"Updating repay date for repayment ID {repayment_id} to {current_time}")
db.session.commit()
return repayment.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update repay date: {e}")
raise e
@classmethod
def set_repay_verify_date(cls, repayment_id, customer_id):
"""
Update the repayment verify date of the loan with the given repayment_id.
"""
# Retrieve repayment
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Check if customer_id matches
if repayment.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the repayment's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update repayment verify_date
repayment.verify_date = current_time
# Commit changes to database
try:
logger.info(f"Updating repay verify date for repayment ID {repayment_id} to {current_time}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update repay verify date: {e}")
raise e
@classmethod
def set_repay_result(cls, repayment_id, result, description):
logger.info("repay result called")
"""
Update the repayment result and description of the repayment with the given repayment_id.
"""
# Retrieve loan
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Update repayment result and description
repayment.repay_result = result
repayment.repay_description = description
# Commit changes to database
try:
logger.info(f"Updating repayment result for repayment ID {repayment_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update repayment result: {e}")
raise
@classmethod
def set_verify_date_result(cls, repayment_id, result, description):
"""
Update the verify result and description of the repayment with the given repayment_id.
"""
# Retrieve repayment
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Update disburse result and description
repayment.verify_result = result
repayment.verify_description = description
# Commit changes to database
try:
logger.info(f"Updating verify result for repayment ID {repayment_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update verify result: {e}")
raise
@classmethod
def get_latest_repayment_without_repay_date(cls):
"""
Get the latest repayment without a repay date.
"""
return cls.query.filter(
cls.repay_date.is_(None)
).order_by(cls.created_at.desc()).first()
@classmethod
def get_latest_repayment_with_loanId(cls, loan_id):
"""
Get the latest repayment with loan Id.
"""
return cls.query.filter(
cls.loan_id == loan_id
).order_by(cls.created_at.desc()).first()
@classmethod
def get_latest_loan_with_repay_date(cls):
"""
Get the latest repayment with a repay date and no verification date.
"""
return cls.query.filter(
cls.repay_date.isnot(None),
cls.verify_date.is_(None)
from app.extensions import db
from datetime import datetime, timezone
from app.utils.logger import logger
from app.enums.loan_status import LoanStatus
from sqlalchemy.exc import IntegrityError
class Repayment(db.Model):
__tablename__ = "repayments"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
loan_id = db.Column(db.String(50), nullable=False)
customer_id = db.Column(db.String(50), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
transaction_id = db.Column(db.String(50), nullable=False)
initiated_by = db.Column(db.String(50), nullable=True)
salary_amount = db.Column(db.Float, nullable=True, default=0.0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
repay_date = db.Column(db.DateTime, nullable=True)
verify_date = db.Column(db.DateTime, nullable=True)
repay_result = db.Column(db.String(10), nullable=True)
repay_description = db.Column(db.String(100), nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
def __repr__(self):
return f'<Repayment {self.id}>'
def to_dict(self):
"""
Convert the Repayment object to a dictionary format for JSON serialization.
"""
return {
'Id': self.id,
"customerId": self.customer_id,
'loanId': self.loan_id,
'productId': self.product_id,
'repayResult': self.repay_result,
'repayDescription': self.repay_description,
'verifyResult': self.verify_result,
'verifyDescription': self.verify_description,
'transactionId': self.transaction_id,
'initiatedBy':self.initiated_by,
'salaryAmount':self.salary_amount,
'repayDate': self.repay_date.isoformat() if self.repay_date else None,
'VerifyDate': self.verify_date.isoformat() if self.verify_date else None,
}
@classmethod
def create_repayment(cls, repayment_data):
if repayment_data["LoanStatus"] not in [LoanStatus.ACTIVE, LoanStatus.START_REPAY,LoanStatus.ACTIVE_PARTIAL]:
raise ValueError(f"Repayment cannot be processed. Loan status: ({repayment_data['LoanStatus']})")
repayment = cls(
customer_id=repayment_data["customerId"],
loan_id=repayment_data["loanId"],
product_id=repayment_data["productId"],
transaction_id=repayment_data["transactionId"],
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
initiated_by= repayment_data["initiatedBy"],
salary_amount=repayment_data["salaryAmount"]
)
try:
db.session.add(repayment)
db.session.commit()
logger.info("Repayment record committed.")
return repayment
except IntegrityError as err:
logger.error(f"Database integrity error: {err}")
return {"error": "Integrity error", "details": str(err)}
@classmethod
def add_repayment(cls, data: dict):
"""
Create and persist a new repayment record.
"""
logger.info(f"Received repayment data: {data}")
try:
new_repayment = cls(
loan_id=data["loanId"],
customer_id=data["customerId"],
product_id=data.get("productId"),
transaction_id=data["transactionId"],
initiated_by=data.get("initiatedBy"),
salary_amount=float(data.get("salaryAmount", 0.0)),
repay_date=(
datetime.strptime(data["repayDate"], "%Y-%m-%d")
.replace(tzinfo=timezone.utc)
if data.get("repayDate")
else None
),
repay_result=data.get("repayResult"),
repay_description=data.get("repayDescription"),
verify_result=data.get("verifyResult"),
verify_description=data.get("verifyDescription"),
verify_date=(
datetime.strptime(data["verifyDate"], "%Y-%m-%d")
.replace(tzinfo=timezone.utc)
if data.get("verifyDate")
else None
),
)
db.session.add(new_repayment)
db.session.commit()
logger.info("Repayment record committed.")
return new_repayment
except Exception as e:
db.session.rollback()
logger.error(f"Error adding repayment data: {e}")
raise
@classmethod
def get_repayment_by_transaction_id(cls, transaction_id):
return cls.query.filter_by(transaction_id=transaction_id).first()
@classmethod
def get_repayment_by_id(cls, id):
return cls.query.filter_by(id=id).first()
@classmethod
def set_repay_date(cls, repayment_id, customer_id):
"""
Update the repay date of the loan with the given loan_id.
"""
# Retrieve repayment
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Check if customer_id matches
if repayment.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the repayment's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update repayment date
repayment.repay_date = current_time
# Commit changes to database
try:
logger.info(f"Updating repay date for repayment ID {repayment_id} to {current_time}")
db.session.commit()
return repayment.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update repay date: {e}")
raise e
@classmethod
def set_repay_verify_date(cls, repayment_id, customer_id):
"""
Update the repayment verify date of the loan with the given repayment_id.
"""
# Retrieve repayment
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Check if customer_id matches
if repayment.customer_id != customer_id:
raise ValueError(f"Customer ID {customer_id} does not match the repayment's customer ID.")
current_time = datetime.now()
logger.info(f"What is now ======= ==== ==> : {current_time}")
# Update repayment verify_date
repayment.verify_date = current_time
# Commit changes to database
try:
logger.info(f"Updating repay verify date for repayment ID {repayment_id} to {current_time}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update repay verify date: {e}")
raise e
@classmethod
def set_repay_result(cls, repayment_id, result, description):
logger.info("repay result called")
"""
Update the repayment result and description of the repayment with the given repayment_id.
"""
# Retrieve loan
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Update repayment result and description
repayment.repay_result = result
repayment.repay_description = description
# Commit changes to database
try:
logger.info(f"Updating repayment result for repayment ID {repayment_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update repayment result: {e}")
raise
@classmethod
def set_verify_date_result(cls, repayment_id, result, description):
"""
Update the verify result and description of the repayment with the given repayment_id.
"""
# Retrieve repayment
repayment = cls.query.get(repayment_id)
if not repayment:
raise ValueError(f"repayment with ID {repayment_id} does not exist.")
# Update disburse result and description
repayment.verify_result = result
repayment.verify_description = description
# Commit changes to database
try:
logger.info(f"Updating verify result for repayment ID {repayment_id} to {result} with description {description}")
db.session.commit()
except Exception as e:
db.session.rollback()
logger.error(f"Failed to update verify result: {e}")
raise
@classmethod
def get_latest_repayment_without_repay_date(cls):
"""
Get the latest repayment without a repay date.
"""
return cls.query.filter(
cls.repay_date.is_(None)
).order_by(cls.created_at.desc()).first()
@classmethod
def get_latest_repayment_with_loanId(cls, loan_id):
"""
Get the latest repayment with loan Id.
"""
return cls.query.filter(
cls.loan_id == loan_id
).order_by(cls.created_at.desc()).first()
@classmethod
def get_latest_loan_with_repay_date(cls):
"""
Get the latest repayment with a repay date and no verification date.
"""
return cls.query.filter(
cls.repay_date.isnot(None),
cls.verify_date.is_(None)
).order_by(cls.created_at.desc()).first()
+74 -74
View File
@@ -1,75 +1,75 @@
from datetime import datetime, timezone
from app.extensions import db
from app.utils.logger import logger
class RepaymentsData(db.Model):
__tablename__ = 'repayments_data'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
transaction_id = db.Column(db.String(50), nullable=False)
added_date = db.Column(db.DateTime(timezone=True), default=datetime.now(timezone.utc), nullable=False)
response_code = db.Column(db.String(10), nullable=True)
response_descr = db.Column(db.String(255), nullable=True)
fbn_transaction_id = db.Column(db.String(255),nullable=True)
account_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
repayment_amount = db.Column(db.Float, nullable=True)
amount_collected = db.Column(db.Float, nullable=True)
balance = db.Column(db.Float, nullable=True, default=0.0)
def to_dict(self):
return {
"id": self.id,
"transaction_id": self.transaction_id,
"added_date": self.added_date.isoformat() if self.added_date else None,
"response_code": self.response_code,
"response_descr": self.response_descr,
"customerId": self.customer_id,
"accountId": self.account_id,
"fbnTransactionId": self.fbn_transaction_id,
"repaymentAmount": self.repayment_amount,
"amountCollected": self.amount_collected,
"balance": self.balance
}
def __repr__(self):
return f"<RepaymentsData id={self.id}, transaction_id={self.transaction_id}>"
@classmethod
def add_repayment_data(cls, data):
"""
Add a new repayment data entry.
"""
try:
repayment_amount = float(data.get('repaymentAmount', 0.0))
amount_collected = float(data.get('amountCollected', 0.0))
if amount_collected < 0 or repayment_amount < 0:
raise ValueError("Amounts cannot be negative.")
account_balance = round(repayment_amount - amount_collected, 2)
new_data = cls(
transaction_id=data.get('transactionId'),
response_code=data.get('responseCode'),
response_descr=data.get('responseDescr'),
fbn_transaction_id=data.get('fbnTransactionId'),
account_id=data.get('accountId'),
customer_id=data.get('customerId'),
amount_collected=amount_collected,
repayment_amount=repayment_amount,
balance=account_balance,
)
db.session.add(new_data)
db.session.commit()
logger.info("Repayment data committed successfully")
return new_data
except Exception as e:
db.session.rollback()
logger.error(f"Error adding repayment data: {e}")
raise Exception(f"Error adding repayment data: {str(e)}")
from datetime import datetime, timezone
from app.extensions import db
from app.utils.logger import logger
class RepaymentsData(db.Model):
__tablename__ = 'repayments_data'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
transaction_id = db.Column(db.String(50), nullable=False)
added_date = db.Column(db.DateTime(timezone=True), default=datetime.now(timezone.utc), nullable=False)
response_code = db.Column(db.String(10), nullable=True)
response_descr = db.Column(db.String(255), nullable=True)
fbn_transaction_id = db.Column(db.String(255),nullable=True)
account_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
repayment_amount = db.Column(db.Float, nullable=True)
amount_collected = db.Column(db.Float, nullable=True)
balance = db.Column(db.Float, nullable=True, default=0.0)
def to_dict(self):
return {
"id": self.id,
"transaction_id": self.transaction_id,
"added_date": self.added_date.isoformat() if self.added_date else None,
"response_code": self.response_code,
"response_descr": self.response_descr,
"customerId": self.customer_id,
"accountId": self.account_id,
"fbnTransactionId": self.fbn_transaction_id,
"repaymentAmount": self.repayment_amount,
"amountCollected": self.amount_collected,
"balance": self.balance
}
def __repr__(self):
return f"<RepaymentsData id={self.id}, transaction_id={self.transaction_id}>"
@classmethod
def add_repayment_data(cls, data):
"""
Add a new repayment data entry.
"""
try:
repayment_amount = float(data.get('repaymentAmount', 0.0))
amount_collected = float(data.get('amountCollected', 0.0))
if amount_collected < 0 or repayment_amount < 0:
raise ValueError("Amounts cannot be negative.")
account_balance = round(repayment_amount - amount_collected, 2)
new_data = cls(
transaction_id=data.get('transactionId'),
response_code=data.get('responseCode'),
response_descr=data.get('responseDescr'),
fbn_transaction_id=data.get('fbnTransactionId'),
account_id=data.get('accountId'),
customer_id=data.get('customerId'),
amount_collected=amount_collected,
repayment_amount=repayment_amount,
balance=account_balance,
)
db.session.add(new_data)
db.session.commit()
logger.info("Repayment data committed successfully")
return new_data
except Exception as e:
db.session.rollback()
logger.error(f"Error adding repayment data: {e}")
raise Exception(f"Error adding repayment data: {str(e)}")
+98 -98
View File
@@ -1,98 +1,98 @@
from app.extensions import db
from datetime import datetime, timezone
from app.utils.logger import logger
class Salary(db.Model):
__tablename__ = "salaries"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
customer_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, nullable=True, default=0.0)
status = db.Column(db.String(20), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
salary_date = db.Column(db.DateTime, nullable=True)
def __repr__(self):
return f'<Salary {self.id}>'
def to_dict(self):
"""
Convert the Salary object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'customerId': self.customer_id,
'accountId' : self.account_id,
'salaryAmount': self.amount,
'status': self.status,
'createdAt': self.created_at.isoformat() if self.created_at else None,
'updatedAt': self.updated_at.isoformat() if self.updated_at else None,
'salaryDate': self.salary_date.isoformat() if self.salary_date else None,
}
@classmethod
def add_salary_data(cls, data):
"""
Add a new salary data entry.
"""
logger.info(f"Received data:{data}")
try:
new_data = cls(
customer_id=data.get('customerId'),
amount=data.get('salaryAmount', 0.0),
status='START',
salary_date = datetime.strptime(data.get('salaryDate'), "%Y-%m-%d").date() if data.get('salaryDate') else None,
account_id=data.get('accountId')
)
db.session.add(new_data)
db.session.commit()
logger.info("Salary data has been committed.")
return new_data
except Exception as e:
db.session.rollback()
logger.info(f"error : {str(e)}")
raise Exception(f"Error adding salary data: {str(e)}")
@classmethod
def get_pending_salaries(cls):
"""
Retrieve all salary entries with status 'START', ordered by ID ascending.
"""
try:
return cls.query.filter_by(status='START').order_by(cls.id.asc()).all()
except Exception as e:
logger.error(f"Error fetching pending salaries: {str(e)}")
return []
@classmethod
def update_status(cls, salary_id, status):
"""
Update the status of the salary record with the given salary_id.
"""
try:
# Retrieve salary record
salary = cls.query.get(salary_id)
if not salary:
raise ValueError(f"Salary with ID {salary_id} does not exist.")
if salary.status == status:
return salary.to_dict() # Still return the current state if no change
# Update status and timestamp
salary.status = status
salary.updated_at = datetime.now(timezone.utc) # Manually update timestamp if not auto-updating
db.session.commit()
logger.info("Salary status updated and committed.")
return salary.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating salary status: {e}")
raise Exception(f"Error updating salary status: {str(e)}")
from app.extensions import db
from datetime import datetime, timezone
from app.utils.logger import logger
class Salary(db.Model):
__tablename__ = "salaries"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
customer_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, nullable=True, default=0.0)
status = db.Column(db.String(20), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
salary_date = db.Column(db.DateTime, nullable=True)
def __repr__(self):
return f'<Salary {self.id}>'
def to_dict(self):
"""
Convert the Salary object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'customerId': self.customer_id,
'accountId' : self.account_id,
'salaryAmount': self.amount,
'status': self.status,
'createdAt': self.created_at.isoformat() if self.created_at else None,
'updatedAt': self.updated_at.isoformat() if self.updated_at else None,
'salaryDate': self.salary_date.isoformat() if self.salary_date else None,
}
@classmethod
def add_salary_data(cls, data):
"""
Add a new salary data entry.
"""
logger.info(f"Received data:{data}")
try:
new_data = cls(
customer_id=data.get('customerId'),
amount=data.get('salaryAmount', 0.0),
status='START',
salary_date = datetime.strptime(data.get('salaryDate'), "%Y-%m-%d").date() if data.get('salaryDate') else None,
account_id=data.get('accountId')
)
db.session.add(new_data)
db.session.commit()
logger.info("Salary data has been committed.")
return new_data
except Exception as e:
db.session.rollback()
logger.info(f"error : {str(e)}")
raise Exception(f"Error adding salary data: {str(e)}")
@classmethod
def get_pending_salaries(cls):
"""
Retrieve all salary entries with status 'START', ordered by ID ascending.
"""
try:
return cls.query.filter_by(status='START').order_by(cls.id.asc()).all()
except Exception as e:
logger.error(f"Error fetching pending salaries: {str(e)}")
return []
@classmethod
def update_status(cls, salary_id, status):
"""
Update the status of the salary record with the given salary_id.
"""
try:
# Retrieve salary record
salary = cls.query.get(salary_id)
if not salary:
raise ValueError(f"Salary with ID {salary_id} does not exist.")
if salary.status == status:
return salary.to_dict() # Still return the current state if no change
# Update status and timestamp
salary.status = status
salary.updated_at = datetime.now(timezone.utc) # Manually update timestamp if not auto-updating
db.session.commit()
logger.info("Salary status updated and committed.")
return salary.to_dict()
except Exception as e:
db.session.rollback()
logger.error(f"Error updating salary status: {e}")
raise Exception(f"Error updating salary status: {str(e)}")
+67 -67
View File
@@ -1,68 +1,68 @@
from app.extensions import db
from datetime import datetime, timezone
from app.utils.logger import logger
from sqlalchemy import and_, or_, not_
class Transaction(db.Model):
__tablename__ = "transactions"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
transaction_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
type = db.Column(db.String(50), nullable=False)
channel = db.Column(db.String(50), nullable=False)
phone_number = db.Column(db.String(50), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
@classmethod
def create_transaction(cls, transaction_id, account_id, customer_id, type, channel):
logger.error(f"**Setting Transaction {transaction_id} for Type {type}")
if cls.query.filter( and_( cls.transaction_id ==transaction_id, cls.type==type) ).first():
logger.error(f"Transaction already exists for {type}")
return '' # dont raise - do not crash beacause of this
transaction = cls(
transaction_id = transaction_id,
customer_id = customer_id,
account_id = account_id,
type = type,
channel = channel,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(transaction)
db.session.commit()
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return transaction
def __repr__(self):
return f'<Transaction {self.id}>'
def to_dict(self):
"""
Convert the Transaction object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'transaction_id': self.transaction_id,
'account_id': self.account_id,
'customer_id': self.customer_id,
'phone_number':self.phone_number,
'type': self.type,
'channel': self.channel,
}
@classmethod
def get_transaction_by_transaction_id(cls, transaction_id):
from app.extensions import db
from datetime import datetime, timezone
from app.utils.logger import logger
from sqlalchemy import and_, or_, not_
class Transaction(db.Model):
__tablename__ = "transactions"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
transaction_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
type = db.Column(db.String(50), nullable=False)
channel = db.Column(db.String(50), nullable=False)
phone_number = db.Column(db.String(50), nullable=True)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
@classmethod
def create_transaction(cls, transaction_id, account_id, customer_id, type, channel):
logger.error(f"**Setting Transaction {transaction_id} for Type {type}")
if cls.query.filter( and_( cls.transaction_id ==transaction_id, cls.type==type) ).first():
logger.error(f"Transaction already exists for {type}")
return '' # dont raise - do not crash beacause of this
transaction = cls(
transaction_id = transaction_id,
customer_id = customer_id,
account_id = account_id,
type = type,
channel = channel,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(transaction)
db.session.commit()
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return transaction
def __repr__(self):
return f'<Transaction {self.id}>'
def to_dict(self):
"""
Convert the Transaction object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'transaction_id': self.transaction_id,
'account_id': self.account_id,
'customer_id': self.customer_id,
'phone_number':self.phone_number,
'type': self.type,
'channel': self.channel,
}
@classmethod
def get_transaction_by_transaction_id(cls, transaction_id):
return cls.query.filter_by(transaction_id=transaction_id).first()