Added offer and charge

This commit is contained in:
Azeez Muibi
2025-04-30 12:48:34 +01:00
parent d1b9d80c84
commit 9bec2b2e9f
15 changed files with 775 additions and 28 deletions
+3 -1
View File
@@ -6,5 +6,7 @@ from .user import User
from .repayment import Repayment
from .loan_charge import LoanCharge
from .loan_repayment_schedule import LoanRepaymentSchedule
from .charge import Charge
from .offer import Offer
__all__ = ['Customer', 'Account', 'Loan', 'Transaction', 'User', 'Repayment', 'LoanCharge', 'LoanRepaymentSchedule']
__all__ = ['Customer', 'Account', 'Loan', 'Transaction', 'User', 'Repayment', 'LoanCharge', 'LoanRepaymentSchedule', 'Charge', 'Offer']
+82
View File
@@ -0,0 +1,82 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.orm import relationship
class Charge(db.Model):
__tablename__ = 'charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
offer_id = db.Column(db.String, db.ForeignKey('offers.id'), nullable=False)
code = db.Column(db.String(50), nullable=False)
percent = db.Column(db.Float, nullable=False)
description = db.Column(db.String(255), nullable=True)
due = db.Column(db.Integer, default=0)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
offer = relationship(
"Offer",
back_populates="charges",
foreign_keys=[offer_id]
)
@classmethod
def get_all_charges(cls, offer_id=None, code=None, start_date=None, end_date=None, page=1, limit=20):
"""
Get all charges with optional filtering
Args:
offer_id (str, optional): Filter by offer ID
code (str, optional): Filter by charge code
start_date (datetime, optional): Filter by start date (created_at)
end_date (datetime, optional): Filter by end date (created_at)
page (int, optional): Page number for pagination
limit (int, optional): Number of items per page
Returns:
tuple: (list of Charge objects, total count)
"""
query = cls.query
# Apply filters if provided
if offer_id:
query = query.filter(cls.offer_id == offer_id)
if code:
query = query.filter(cls.code == code)
if start_date:
query = query.filter(cls.created_at >= start_date)
if end_date:
query = query.filter(cls.created_at <= end_date)
# Order by created_at descending (newest first)
query = query.order_by(cls.created_at.desc())
# Get total count before pagination
total_count = query.count()
# Apply pagination
offset = (page - 1) * limit
query = query.limit(limit).offset(offset)
return query.all(), total_count
def to_dict(self):
"""
Convert the Charge object to a dictionary format for JSON serialization.
"""
return {
'id': self.id,
'offer_id': self.offer_id,
'code': self.code,
'percent': self.percent,
'description': self.description,
'due': self.due,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<Charge {self.id} - {self.code}>'
+6 -1
View File
@@ -28,7 +28,7 @@ class LoanRepaymentSchedule(db.Model):
# )
@classmethod
def get_all_repayment_schedules(cls, loan_id=None, product_id=None, paid=None,
def get_all_repayment_schedules(cls, loan_id=None, product_id=None, transaction_id=None, paid=None,
due_before=None, due_after=None, installment_number=None,
page=1, limit=20):
"""
@@ -37,6 +37,7 @@ class LoanRepaymentSchedule(db.Model):
Args:
loan_id (int, optional): Filter by loan ID
product_id (str, optional): Filter by product ID
transaction_id (str, optional): Filter by transaction ID
paid (bool, optional): Filter by paid status
due_before (datetime, optional): Filter schedules due before this date
due_after (datetime, optional): Filter schedules due after this date
@@ -56,6 +57,9 @@ class LoanRepaymentSchedule(db.Model):
if product_id:
query = query.filter(cls.product_id == product_id)
if transaction_id:
query = query.filter(cls.transaction_id == transaction_id)
if paid is not None:
query = query.filter(cls.paid == paid)
@@ -85,6 +89,7 @@ class LoanRepaymentSchedule(db.Model):
'id': self.id,
'loan_id': self.loan_id,
'product_id': self.product_id,
'transaction_id': self.transaction_id,
'installment_number': self.installment_number,
'due_date': self.due_date.isoformat() if self.due_date else None,
'installment_amount': self.installment_amount,
+60 -25
View File
@@ -1,8 +1,8 @@
from datetime import datetime, timezone
from app.extensions import db
from app.models.charge import Charge
from sqlalchemy.orm import relationship
class Offer(db.Model):
__tablename__ = 'offers'
@@ -22,45 +22,80 @@ class Offer(db.Model):
charges = relationship(
"Charge",
primaryjoin="Offer.id == Charge.offer_id",
foreign_keys="Charge.offer_id",
back_populates="offer",
cascade="all, delete-orphan"
)
@classmethod
def get_all_offers(cls):
def get_all_offers(cls, id=None, product_id=None, start_date=None, end_date=None, page=1, limit=20):
"""
Return all offers in dictionary format.
"""
offers = cls.query.all()
Get all offers with optional filtering
Args:
id (str, optional): Filter by offer ID
product_id (str, optional): Filter by product ID
start_date (datetime, optional): Filter by start date (created_at)
end_date (datetime, optional): Filter by end date (created_at)
page (int, optional): Page number for pagination
limit (int, optional): Number of items per page
Returns:
tuple: (list of Offer objects, total count)
"""
query = cls.query
# Apply filters if provided
if id:
query = query.filter(cls.id == id)
if product_id:
query = query.filter(cls.product_id == product_id)
if start_date:
query = query.filter(cls.created_at >= start_date)
if end_date:
query = query.filter(cls.created_at <= end_date)
# Order by list_order ascending (if available) or created_at descending
query = query.order_by(cls.list_order.asc(), cls.created_at.desc())
# Get total count before pagination
total_count = query.count()
# Apply pagination
offset = (page - 1) * limit
query = query.limit(limit).offset(offset)
return query.all(), total_count
if not offers:
raise ValueError(f"No available offers")
return offers
@classmethod
def is_valid_offer(cls, offer_id):
offer = cls.query.filter_by(id=str(offer_id)).first()
if not offer:
return False
return offer
def to_dict(self):
"""
Convert the Offer object to a dictionary format for JSON serialization.
"""
return {
"offerId": self.id,
"productId": self.product_id,
"minAmount": self.min_amount,
"maxAmount": self.max_amount,
"tenor": self.tenor,
"interest_rate": self.interest_rate,
"management_rate": self.management_rate,
"insurance_rate": self.insurance_rate,
"vat_rate": self.vat_rate
'id': self.id,
'product_id': self.product_id,
'min_amount': self.min_amount,
'max_amount': self.max_amount,
'tenor': self.tenor,
'schedule': self.schedule,
'interest_rate': self.interest_rate,
'management_rate': self.management_rate,
'insurance_rate': self.insurance_rate,
'vat_rate': self.vat_rate,
'list_order': self.list_order,
'created_at': self.created_at.isoformat() if self.created_at else None,
'updated_at': self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<LoanOffer {self.id}>'
return f'<Offer {self.id}>'