Custom templates fix

This commit is contained in:
CHIEFSOFT\ameye
2025-09-20 20:00:24 -04:00
parent ff8513292b
commit 4d894d8214
12 changed files with 762 additions and 732 deletions
+2
View File
@@ -18,10 +18,12 @@ from .payments import Payments
from .subscription_generative import SubscriptionGenerative
from .generative_results import GenerativeResults
from .office_users import OfficeUsers
from .custom_templates import CustomTemplates
__all__ = ['Members', 'Account', 'Products',
'MembersProducts', 'MembersActions', 'MembersPending', 'ProductsDetails',
'ProvisionActions', 'MembersProductsRefresh', 'MembersProductsSettings',
'PasswordReset', 'MembersProfile', 'SubscriptionOptions', 'SubscriptionOptionsItems',
'ProductsTemplates', 'Payments', 'PaymentsSession', 'SubscriptionGenerative', 'GenerativeResults',
'CustomTemplates',
'OfficeUsers']
+35
View File
@@ -0,0 +1,35 @@
from datetime import datetime, timezone
from app.extensions import db
from app.models.charge import Charge
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
class CustomTemplates(db.Model):
__tablename__ = 'custom_templates'
id = db.Column(db.String, primary_key=True)
uid = db.Column(db.String, nullable=False)
custom_id = db.Column(db.String, nullable=False)
provision_name = db.Column(db.String, nullable=False)
added = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
@classmethod
def get_template_by_custom_id(cls, custom_id):
templates = cls.query.filter_by(custom_id=str(custom_id)).all()
if not templates:
raise ValueError(f"Templates with Custom ID {custom_id} not found")
return templates
def to_dict(self):
return {
"id": self.id,
"uid": self.uid,
"custom_id": self.product_id,
"provision_name": self.name,
"added": self.added.isoformat() if self.added else None
}
def __repr__(self):
return f'<CustomTemplates {self.id}>'
+261 -261
View File
@@ -1,261 +1,261 @@
from datetime import datetime, timezone, timedelta
from itertools import product
from app.extensions import db
from app.models.customer import Customer
from app.models.account import Account
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship
from dateutil.relativedelta import relativedelta
from datetime import timedelta
import logging
from sqlalchemy import and_, or_, not_
from sqlalchemy.sql import func
logger = logging.getLogger(__name__)
class Loan(db.Model):
__tablename__ = 'loans'
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
customer_id = db.Column(db.String(50), nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
original_transaction = db.Column(db.String(50), nullable=True)
account_id = db.Column(db.String(50), nullable=False)
offer_id = db.Column(db.String(20), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
collection_type = db.Column(db.String(20), nullable=True)
current_loan_amount = db.Column(db.Float, nullable=True)
initial_loan_amount = db.Column(db.Float, nullable=False)
default_penalty_fee = db.Column(db.Float, default=0)
continuous_fee = db.Column(db.Float, default=0)
upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
installment_amount = db.Column(db.Float, nullable=True, default=0.0)
status = db.Column(db.String(20), default='pending')
tenor = db.Column(db.Integer, nullable=True)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
eligible_amount = db.Column(db.Float, nullable=True, default=0.0)
disburse_date = db.Column(db.DateTime, nullable=True)
disburse_verify = db.Column(db.DateTime, nullable=True)
reference = db.Column(db.String(50), nullable=True)
disburse_result = db.Column(db.String(10), nullable=True)
disburse_description = db.Column(db.String(100), nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
# customer = relationship(
# "Customer",
# primaryjoin="Customer.id == Loan.customer_id",
# foreign_keys=[customer_id],
# back_populates="loans",
# )
loan_charges = relationship(
"LoanCharge",
primaryjoin="LoanCharge.loan_id == Loan.id",
foreign_keys="LoanCharge.loan_id",
back_populates="loan",
)
loan_repayment_schedules = relationship(
"LoanRepaymentSchedule",
primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
foreign_keys="LoanRepaymentSchedule.loan_id",
back_populates="loan",
)
@classmethod
def create_loan(
cls,
customer_id,
account_id,
offer_id,
product_id,
initial_loan_amount,
collection_type,
transaction_id,
original_transaction,
upfront_fee,
repayment_amount,
installment_amount,
tenor,
eligible_amount,
reference,
status = "pending",
):
# Check if customer exists
customer = Customer.is_valid_customer(customer_id)
if not customer:
raise ValueError("Customer does not exist")
now = datetime.now(timezone.utc)
due_date = now + timedelta(days=tenor)
# Create and save the loan
loan = cls(
customer_id = customer_id,
account_id = account_id,
offer_id = offer_id,
product_id = product_id,
collection_type = collection_type,
transaction_id = transaction_id,
original_transaction = original_transaction,
initial_loan_amount = initial_loan_amount,
current_loan_amount = initial_loan_amount,
upfront_fee = upfront_fee,
repayment_amount = repayment_amount,
installment_amount = installment_amount,
due_date=due_date,
tenor = tenor,
status = status,
eligible_amount =eligible_amount,
reference = reference,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(loan)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return loan
@classmethod
def has_active_loans(cls, customer_id):
active_loans = cls.query.filter_by(
customer_id=customer_id,
status='active'
).count()
if active_loans > 0:
return False
return True
@classmethod
def get_customer_loan(cls, loan_id, customer_id):
"""
Get customer's active loans by loan_id.
"""
loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first()
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.")
return loan
@classmethod
def get_customer_original_loan(cls, customer_id, original_transaction):
"""
Get customer's original loan offer.
"""
original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first()
if not original_loan:
return None
logger.info(f" get_customer_original_loan ==>>>> {original_loan}")
return original_loan
@classmethod
def get_customer_last_loan(cls, customer_id):
"""
Get customer's active loans.
"""
logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}")
# loan = cls.query.filter_by( cls.customer_id == customer_id).first()
loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first()
if not loan:
return None
# loan = {
# "original_transaction":"",
# "eligible_amount": 0,
# "loan_amount": 0,
# "customer_id": customer_id,
# "transaction_id": "",
# "resultDescription": "No Active Loan"
# }
logger.info(f" get_customer_last_loan ==>>>> {loan}")
return loan
@classmethod
def get_active_loans_by_original_transaction(cls, original_transaction_id):
"""
Get all active loans with the same original_transaction ID.
"""
active_loans = cls.query.filter_by(
original_transaction=original_transaction_id,
# status='active'
).all()
return active_loans
@classmethod
def update_status(cls, loan_id, status):
"""
Update the status of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
if loan.status == status:
return
# Update loan status and the updated_at timestamp
loan.status = status
@classmethod
def get_daily_loan_count(cls, customer_id, product_id):
"""
Returns the count of loans created today for a customer.
"""
start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = start_of_day + timedelta(days=1)
return cls.query.filter_by(
customer_id=customer_id,
product_id=product_id,
).filter(
cls.created_at >= start_of_day,
cls.created_at < end_of_day
).count()
def to_dict(self):
"""
Convert the Loan object to a dictionary format for JSON serialization.
"""
return {
'debtId': self.id,
'transactionId': self.transaction_id,
'loanRef': self.reference,
'productId': self.product_id,
'initialLoanAmount': self.initial_loan_amount,
'currentLoanAmount': self.current_loan_amount,
'defaultPenaltyFee': self.default_penalty_fee,
'continuousFee': self.continuous_fee,
'collectionType': self.collection_type,
'upfrontFee': self.upfront_fee,
'repaymentAmount': self.repayment_amount,
'installmentAmount': self.installment_amount,
'status': self.status,
'tenor': self.tenor,
'dueDate': self.due_date.isoformat() if self.due_date else None,
'loanDate': self.created_at.isoformat() if self.created_at else None,
}
def __repr__(self):
return f'<Loan {self.id}>'
# from datetime import datetime, timezone, timedelta
# from itertools import product
# from app.extensions import db
# from app.models.customer import Customer
# from app.models.account import Account
# from sqlalchemy.exc import IntegrityError
# from sqlalchemy.orm import relationship
# from dateutil.relativedelta import relativedelta
# from datetime import timedelta
# import logging
# from sqlalchemy import and_, or_, not_
# from sqlalchemy.sql import func
#
# logger = logging.getLogger(__name__)
#
#
# class Loan(db.Model):
# __tablename__ = 'loans'
#
# id = db.Column(
# db.Integer,
# primary_key=True,
# autoincrement=True,
# )
# customer_id = db.Column(db.String(50), nullable=False)
# transaction_id = db.Column(db.String(50), nullable=True)
# original_transaction = db.Column(db.String(50), nullable=True)
# account_id = db.Column(db.String(50), nullable=False)
# offer_id = db.Column(db.String(20), nullable=False)
# product_id = db.Column(db.String(20), nullable=True)
# collection_type = db.Column(db.String(20), nullable=True)
# current_loan_amount = db.Column(db.Float, nullable=True)
# initial_loan_amount = db.Column(db.Float, nullable=False)
# default_penalty_fee = db.Column(db.Float, default=0)
# continuous_fee = db.Column(db.Float, default=0)
# upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
# repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
# installment_amount = db.Column(db.Float, nullable=True, default=0.0)
# status = db.Column(db.String(20), default='pending')
# tenor = db.Column(db.Integer, nullable=True)
# due_date = db.Column(db.DateTime, nullable=True)
# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# eligible_amount = db.Column(db.Float, nullable=True, default=0.0)
# disburse_date = db.Column(db.DateTime, nullable=True)
# disburse_verify = db.Column(db.DateTime, nullable=True)
# reference = db.Column(db.String(50), nullable=True)
# disburse_result = db.Column(db.String(10), nullable=True)
# disburse_description = db.Column(db.String(100), nullable=True)
# verify_result = db.Column(db.String(10), nullable=True)
# verify_description = db.Column(db.String(100), nullable=True)
#
# # customer = relationship(
# # "Customer",
# # primaryjoin="Customer.id == Loan.customer_id",
# # foreign_keys=[customer_id],
# # back_populates="loans",
# # )
#
# loan_charges = relationship(
# "LoanCharge",
# primaryjoin="LoanCharge.loan_id == Loan.id",
# foreign_keys="LoanCharge.loan_id",
# back_populates="loan",
# )
#
# loan_repayment_schedules = relationship(
# "LoanRepaymentSchedule",
# primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
# foreign_keys="LoanRepaymentSchedule.loan_id",
# back_populates="loan",
# )
#
#
# @classmethod
# def create_loan(
# cls,
# customer_id,
# account_id,
# offer_id,
# product_id,
# initial_loan_amount,
# collection_type,
# transaction_id,
# original_transaction,
# upfront_fee,
# repayment_amount,
# installment_amount,
# tenor,
# eligible_amount,
# reference,
# status = "pending",
# ):
# # Check if customer exists
# customer = Customer.is_valid_customer(customer_id)
# if not customer:
# raise ValueError("Customer does not exist")
#
# now = datetime.now(timezone.utc)
# due_date = now + timedelta(days=tenor)
#
# # Create and save the loan
# loan = cls(
# customer_id = customer_id,
# account_id = account_id,
# offer_id = offer_id,
# product_id = product_id,
# collection_type = collection_type,
# transaction_id = transaction_id,
# original_transaction = original_transaction,
# initial_loan_amount = initial_loan_amount,
# current_loan_amount = initial_loan_amount,
# upfront_fee = upfront_fee,
# repayment_amount = repayment_amount,
# installment_amount = installment_amount,
# due_date=due_date,
# tenor = tenor,
# status = status,
# eligible_amount =eligible_amount,
# reference = reference,
# created_at=datetime.now(timezone.utc),
# updated_at=datetime.now(timezone.utc)
# )
#
# try:
# db.session.add(loan)
# except IntegrityError as err:
# raise ValueError(f"Database integrity error: {err}")
# return loan
#
# @classmethod
# def has_active_loans(cls, customer_id):
# active_loans = cls.query.filter_by(
# customer_id=customer_id,
# status='active'
# ).count()
#
# if active_loans > 0:
# return False
# return True
#
#
# @classmethod
# def get_customer_loan(cls, loan_id, customer_id):
# """
# Get customer's active loans by loan_id.
# """
# loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first()
# if not loan:
# raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.")
# return loan
#
# @classmethod
# def get_customer_original_loan(cls, customer_id, original_transaction):
# """
# Get customer's original loan offer.
# """
# original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first()
# if not original_loan:
# return None
#
# logger.info(f" get_customer_original_loan ==>>>> {original_loan}")
# return original_loan
#
# @classmethod
# def get_customer_last_loan(cls, customer_id):
# """
# Get customer's active loans.
# """
# logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}")
# # loan = cls.query.filter_by( cls.customer_id == customer_id).first()
# loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first()
#
# if not loan:
# return None
# # loan = {
# # "original_transaction":"",
# # "eligible_amount": 0,
# # "loan_amount": 0,
# # "customer_id": customer_id,
# # "transaction_id": "",
# # "resultDescription": "No Active Loan"
# # }
# logger.info(f" get_customer_last_loan ==>>>> {loan}")
# return loan
#
# @classmethod
# def get_active_loans_by_original_transaction(cls, original_transaction_id):
# """
# Get all active loans with the same original_transaction ID.
# """
#
# active_loans = cls.query.filter_by(
# original_transaction=original_transaction_id,
# # status='active'
# ).all()
#
# return active_loans
#
#
# @classmethod
# def update_status(cls, loan_id, status):
# """
# Update the status of the loan with the given loan_id.
# """
# # Retrieve loan
# loan = cls.query.get(loan_id)
#
# if not loan:
# raise ValueError(f"Loan with ID {loan_id} does not exist.")
#
# if loan.status == status:
# return
#
# # Update loan status and the updated_at timestamp
# loan.status = status
#
#
# @classmethod
# def get_daily_loan_count(cls, customer_id, product_id):
# """
# Returns the count of loans created today for a customer.
# """
#
# start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
# end_of_day = start_of_day + timedelta(days=1)
#
# return cls.query.filter_by(
# customer_id=customer_id,
# product_id=product_id,
# ).filter(
# cls.created_at >= start_of_day,
# cls.created_at < end_of_day
# ).count()
#
#
# def to_dict(self):
# """
# Convert the Loan object to a dictionary format for JSON serialization.
# """
# return {
# 'debtId': self.id,
# 'transactionId': self.transaction_id,
# 'loanRef': self.reference,
# 'productId': self.product_id,
# 'initialLoanAmount': self.initial_loan_amount,
# 'currentLoanAmount': self.current_loan_amount,
# 'defaultPenaltyFee': self.default_penalty_fee,
# 'continuousFee': self.continuous_fee,
# 'collectionType': self.collection_type,
# 'upfrontFee': self.upfront_fee,
# 'repaymentAmount': self.repayment_amount,
# 'installmentAmount': self.installment_amount,
# 'status': self.status,
# 'tenor': self.tenor,
# 'dueDate': self.due_date.isoformat() if self.due_date else None,
# 'loanDate': self.created_at.isoformat() if self.created_at else None,
# }
#
# def __repr__(self):
# return f'<Loan {self.id}>'
+91 -91
View File
@@ -1,91 +1,91 @@
from datetime import datetime, timezone, timedelta
from app.extensions import db
from sqlalchemy.orm import relationship
from app.utils.logger import logger
from sqlalchemy.sql import func
class LoanCharge(db.Model):
__tablename__ = 'loan_charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
code = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, default=0.0)
percent = db.Column(db.Float, default=0.0)
description = db.Column(db.Text, nullable=True)
due = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# loan = relationship(
# "Loan",
# primaryjoin="LoanCharge.loan_id == Loan.id",
# foreign_keys=[loan_id],
# back_populates="loan_charges",
# )
@classmethod
def create_charges_for_loan(cls, loan_id, transaction_id, charges, referenced_amount = 0.0):
"""
Create loan charges for a given loan.
Args:
loan_id (int): ID of the loan to associate charges with.
charges (list): A list of dictionaries with keys:
code (str), amount (float), percent (float), description (str), due (int)
"""
# if not charges or not isinstance(charges, list):
# raise ValueError("Charges must be a non-empty list of dictionaries")
if loan_id is None:
raise ValueError("loan_id cannot be None")
loan_charges = []
now = datetime.now(timezone.utc)
subset_keys = ['interest', 'management', 'insurance', 'vat']
for item in subset_keys:
charge = charges[item]
due_days = charge['due_days'] # getattr(charge, "due_days", 0)
amount = charge['fee'] # getattr(charge, "fee", 0.0)
percent = charge['rate'] # getattr(charge, "rate", 0.0)
code = charge['code'] # getattr(charge, "code","")
description = charge['description'] # getattr(charge, "description", "")
charge_obj = cls(
loan_id = loan_id,
transaction_id = transaction_id,
code = code,
amount = round(amount, 2),
percent = percent,
description = description,
due = due_days,
due_date = now + timedelta(days=due_days),
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.session.add(charge_obj)
loan_charges.append(charge_obj)
return loan_charges
def to_dict(self):
return {
'id': self.id,
'loanId': self.loan_id,
'transactionId': self.transaction_id,
'code': self.code,
'amount': self.amount,
'percent': self.percent,
'description': self.description,
'due': self.due,
}
def __repr__(self):
return f"<LoanCharge {self.id} - Loan {self.loan_id} - {self.code}>"
# from datetime import datetime, timezone, timedelta
# from app.extensions import db
# from sqlalchemy.orm import relationship
# from app.utils.logger import logger
# from sqlalchemy.sql import func
#
#
# class LoanCharge(db.Model):
# __tablename__ = 'loan_charges'
#
# id = db.Column(db.Integer, primary_key=True, autoincrement=True)
# loan_id = db.Column(db.Integer, nullable=False)
# transaction_id = db.Column(db.String(50), nullable=True)
# code = db.Column(db.String(50), nullable=False)
# amount = db.Column(db.Float, default=0.0)
# percent = db.Column(db.Float, default=0.0)
# description = db.Column(db.Text, nullable=True)
# due = db.Column(db.Integer, nullable=False)
# due_date = db.Column(db.DateTime, nullable=True)
# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# # loan = relationship(
# # "Loan",
# # primaryjoin="LoanCharge.loan_id == Loan.id",
# # foreign_keys=[loan_id],
# # back_populates="loan_charges",
# # )
#
# @classmethod
# def create_charges_for_loan(cls, loan_id, transaction_id, charges, referenced_amount = 0.0):
# """
# Create loan charges for a given loan.
#
# Args:
# loan_id (int): ID of the loan to associate charges with.
# charges (list): A list of dictionaries with keys:
# code (str), amount (float), percent (float), description (str), due (int)
# """
# # if not charges or not isinstance(charges, list):
# # raise ValueError("Charges must be a non-empty list of dictionaries")
#
# if loan_id is None:
# raise ValueError("loan_id cannot be None")
#
# loan_charges = []
# now = datetime.now(timezone.utc)
#
#
# subset_keys = ['interest', 'management', 'insurance', 'vat']
# for item in subset_keys:
# charge = charges[item]
# due_days = charge['due_days'] # getattr(charge, "due_days", 0)
# amount = charge['fee'] # getattr(charge, "fee", 0.0)
# percent = charge['rate'] # getattr(charge, "rate", 0.0)
# code = charge['code'] # getattr(charge, "code","")
# description = charge['description'] # getattr(charge, "description", "")
#
# charge_obj = cls(
# loan_id = loan_id,
# transaction_id = transaction_id,
# code = code,
# amount = round(amount, 2),
# percent = percent,
# description = description,
# due = due_days,
# due_date = now + timedelta(days=due_days),
# created_at=datetime.now(timezone.utc),
# updated_at=datetime.now(timezone.utc)
# )
#
# db.session.add(charge_obj)
# loan_charges.append(charge_obj)
#
# return loan_charges
#
#
#
# def to_dict(self):
# return {
# 'id': self.id,
# 'loanId': self.loan_id,
# 'transactionId': self.transaction_id,
# 'code': self.code,
# 'amount': self.amount,
# 'percent': self.percent,
# 'description': self.description,
# 'due': self.due,
# }
#
# def __repr__(self):
# return f"<LoanCharge {self.id} - Loan {self.loan_id} - {self.code}>"
+72 -72
View File
@@ -1,72 +1,72 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.orm import relationship
from dateutil.relativedelta import relativedelta
from sqlalchemy.sql import func
class LoanRepaymentSchedule(db.Model):
__tablename__ = 'loan_repayment_schedules'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
product_id = db.Column(db.String(20), nullable=True)
installment_number = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=False)
installment_amount= db.Column(db.Float, default=0.0)
total_repayment_amount = db.Column(db.Float, default=0.0)
paid = db.Column(db.Boolean, default=False)
paid_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# loan = relationship(
# "Loan",
# primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
# foreign_keys=[loan_id],
# back_populates="loan_repayment_schedules",
# )
@classmethod
def add_repayment_schedule(cls, loan, num_schedules, transaction_id):
"""
Add repayment schedules for a given loan.
"""
now = datetime.now(timezone.utc)
schedules = []
for i in range(num_schedules):
due_date = now + relativedelta(months=i + 1)
schedule = LoanRepaymentSchedule(
loan_id=loan.id,
installment_number=i + 1,
due_date=due_date,
total_repayment_amount = round(loan.repayment_amount, 2),
installment_amount=round(loan.installment_amount, 2),
product_id = loan.product_id,
transaction_id = transaction_id,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.session.add(schedule)
schedules.append(schedule)
return schedules
def to_dict(self):
return {
'id': self.id,
'loanId': self.loan_id,
'installmentNumber': self.installment_number,
'dueDate': self.due_date.isoformat(),
'principalAmount': self.principal_amount,
'interestAmount': self.interest_amount,
'totalInstallment': self.total_installment,
'paid': self.paid,
'paidAt': self.paid_at.isoformat() if self.paid_at else None
}
def __repr__(self):
return f'<LoanRepaymentSchedule Loan:{self.loan_id} Installment:{self.installment_number}>'
# from datetime import datetime, timezone
# from app.extensions import db
# from sqlalchemy.orm import relationship
# from dateutil.relativedelta import relativedelta
# from sqlalchemy.sql import func
#
# class LoanRepaymentSchedule(db.Model):
# __tablename__ = 'loan_repayment_schedules'
#
# id = db.Column(db.Integer, primary_key=True, autoincrement=True)
# loan_id = db.Column(db.Integer, nullable=False)
# transaction_id = db.Column(db.String(50), nullable=True)
# product_id = db.Column(db.String(20), nullable=True)
# installment_number = db.Column(db.Integer, nullable=False)
# due_date = db.Column(db.DateTime, nullable=False)
# installment_amount= db.Column(db.Float, default=0.0)
# total_repayment_amount = db.Column(db.Float, default=0.0)
# paid = db.Column(db.Boolean, default=False)
# paid_at = db.Column(db.DateTime, nullable=True)
#
# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# # loan = relationship(
# # "Loan",
# # primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
# # foreign_keys=[loan_id],
# # back_populates="loan_repayment_schedules",
# # )
#
#
# @classmethod
# def add_repayment_schedule(cls, loan, num_schedules, transaction_id):
# """
# Add repayment schedules for a given loan.
# """
# now = datetime.now(timezone.utc)
# schedules = []
#
# for i in range(num_schedules):
# due_date = now + relativedelta(months=i + 1)
# schedule = LoanRepaymentSchedule(
# loan_id=loan.id,
# installment_number=i + 1,
# due_date=due_date,
# total_repayment_amount = round(loan.repayment_amount, 2),
# installment_amount=round(loan.installment_amount, 2),
# product_id = loan.product_id,
# transaction_id = transaction_id,
# created_at=datetime.now(timezone.utc),
# updated_at=datetime.now(timezone.utc)
# )
#
# db.session.add(schedule)
# schedules.append(schedule)
#
# return schedules
#
# def to_dict(self):
# return {
# 'id': self.id,
# 'loanId': self.loan_id,
# 'installmentNumber': self.installment_number,
# 'dueDate': self.due_date.isoformat(),
# 'principalAmount': self.principal_amount,
# 'interestAmount': self.interest_amount,
# 'totalInstallment': self.total_installment,
# 'paid': self.paid,
# 'paidAt': self.paid_at.isoformat() if self.paid_at else None
# }
#
# def __repr__(self):
# return f'<LoanRepaymentSchedule Loan:{self.loan_id} Installment:{self.installment_number}>'
+94 -94
View File
@@ -1,94 +1,94 @@
from datetime import datetime, timezone
from app.extensions import db
from app.models.charge import Charge
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
class Offer(db.Model):
__tablename__ = 'offers'
id = db.Column(db.String, primary_key=True)
product_id = db.Column(db.String, nullable=False)
min_amount = db.Column(db.Float, nullable=False)
max_amount = db.Column(db.Float, nullable=False)
tenor = db.Column(db.Integer, nullable=False)
schedule = db.Column(db.Integer, nullable=True)
interest_rate = db.Column(db.Float, default=3.0)
management_rate = db.Column(db.Float, default=1.0)
insurance_rate = db.Column(db.Float, default=1.0)
vat_rate = db.Column(db.Float, default=7.5)
list_order = db.Column(db.Integer, nullable=True)
max_daily_loans = db.Column(db.Integer, nullable=True)
max_active_loans = db.Column(db.Integer, nullable=True)
max_life_loans = db.Column(db.Integer, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
# charges = relationship(
# "Charge",
# primaryjoin="Offer.id == Charge.offer_id",
# foreign_keys="Charge.offer_id",
# back_populates="offer",
# )
@classmethod
def get_all_offers(cls):
"""
Return all offers in dictionary format.
"""
offers = cls.query.all()
if not offers:
raise ValueError(f"No available offers")
return offers
@classmethod
def is_valid_offer(cls, offer_id):
offer = cls.query.filter_by(id=str(offer_id)).first()
if not offer:
return False
return offer
@classmethod
def get_offer_by_id(cls, offer_id):
"""
Return an offer by its ID.
"""
offer = cls.query.filter_by(id=str(offer_id)).first()
if not offer:
raise ValueError(f"Offer with ID {offer_id} not found")
return offer
@classmethod
def get_offer_by_product_id(cls, product_id):
"""
Return an offer by its product ID.
"""
offer = cls.query.filter_by(product_id=str(product_id)).first()
if not offer:
raise ValueError(f"Offer with Product ID {product_id} not found")
return offer
def to_dict(self):
return {
"offerId": self.id,
"productId": self.product_id,
"minAmount": self.min_amount,
"maxAmount": self.max_amount,
"tenor": self.tenor,
"interest_rate": self.interest_rate,
"management_rate": self.management_rate,
"insurance_rate": self.insurance_rate,
"vat_rate": self.vat_rate,
"maxDailyLoans": self.max_daily_loans,
"maxActiveLoans": self.max_active_loans,
"maxLifeLoans": self.max_life_loans
}
def __repr__(self):
return f'<LoanOffer {self.id}>'
# from datetime import datetime, timezone
# from app.extensions import db
# from app.models.charge import Charge
# from sqlalchemy.orm import relationship
# from sqlalchemy.sql import func
#
# class Offer(db.Model):
# __tablename__ = 'offers'
#
# id = db.Column(db.String, primary_key=True)
# product_id = db.Column(db.String, nullable=False)
# min_amount = db.Column(db.Float, nullable=False)
# max_amount = db.Column(db.Float, nullable=False)
# tenor = db.Column(db.Integer, nullable=False)
# schedule = db.Column(db.Integer, nullable=True)
# interest_rate = db.Column(db.Float, default=3.0)
# management_rate = db.Column(db.Float, default=1.0)
# insurance_rate = db.Column(db.Float, default=1.0)
# vat_rate = db.Column(db.Float, default=7.5)
# list_order = db.Column(db.Integer, nullable=True)
# max_daily_loans = db.Column(db.Integer, nullable=True)
# max_active_loans = db.Column(db.Integer, nullable=True)
# max_life_loans = db.Column(db.Integer, nullable=True)
# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
#
# # charges = relationship(
# # "Charge",
# # primaryjoin="Offer.id == Charge.offer_id",
# # foreign_keys="Charge.offer_id",
# # back_populates="offer",
# # )
#
# @classmethod
# def get_all_offers(cls):
# """
# Return all offers in dictionary format.
# """
# offers = cls.query.all()
#
# if not offers:
# raise ValueError(f"No available offers")
# return offers
#
# @classmethod
# def is_valid_offer(cls, offer_id):
# offer = cls.query.filter_by(id=str(offer_id)).first()
#
#
# if not offer:
# return False
# return offer
#
# @classmethod
# def get_offer_by_id(cls, offer_id):
# """
# Return an offer by its ID.
# """
# offer = cls.query.filter_by(id=str(offer_id)).first()
#
# if not offer:
# raise ValueError(f"Offer with ID {offer_id} not found")
# return offer
#
# @classmethod
# def get_offer_by_product_id(cls, product_id):
# """
# Return an offer by its product ID.
# """
# offer = cls.query.filter_by(product_id=str(product_id)).first()
#
# if not offer:
# raise ValueError(f"Offer with Product ID {product_id} not found")
# return offer
#
# def to_dict(self):
# return {
# "offerId": self.id,
# "productId": self.product_id,
# "minAmount": self.min_amount,
# "maxAmount": self.max_amount,
# "tenor": self.tenor,
# "interest_rate": self.interest_rate,
# "management_rate": self.management_rate,
# "insurance_rate": self.insurance_rate,
# "vat_rate": self.vat_rate,
# "maxDailyLoans": self.max_daily_loans,
# "maxActiveLoans": self.max_active_loans,
# "maxLifeLoans": self.max_life_loans
#
# }
#
# def __repr__(self):
# return f'<LoanOffer {self.id}>'