mercore starter

This commit is contained in:
CHIEFSOFT\ameye
2025-06-22 20:45:07 -04:00
commit 987b7d6383
114 changed files with 6223 additions and 0 deletions
+18
View File
@@ -0,0 +1,18 @@
from .customer import Customer
from .account import Account
from .loan import Loan
from .transaction import Transaction
from .repayment import Repayment
from .loan_charge import LoanCharge
from .offer import Offer
from .charge import Charge
from .rac_checks import RACCheck
from .loan_repayment_schedule import LoanRepaymentSchedule
from .transaction_offers import TransactionOffer
from .repayments_data import RepaymentsData
from .salary import Salary
from .members import Members
__all__ = ['Members','Customer', 'Account', 'Loan', 'Transaction', 'Repayment', 'LoanCharge', 'Offer', 'Charge', 'RACCheck', 'LoanRepaymentSchedule', 'TransactionOffer', 'RepaymentsData', 'Salary']
+52
View File
@@ -0,0 +1,52 @@
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
from app.extensions import db
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func
class Account(db.Model):
__tablename__ = 'accounts'
id = db.Column(db.String(50), primary_key=True)
customer_id = db.Column(db.String(50), nullable=False)
account_type = db.Column(db.String(50))
status = db.Column(db.String(20), default='active')
lien_amount = db.Column(db.Float, default=0.0)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
customer = relationship(
"Customer",
primaryjoin="Customer.id == Account.customer_id",
foreign_keys=[customer_id],
back_populates="accounts",
)
@classmethod
def create_account(cls, id, customer_id, account_type, status='active'):
account = cls(
id=id,
customer_id=customer_id,
account_type=account_type,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(account)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return account
@classmethod
def is_valid_account(cls, account_id, customer_id):
account = cls.query.filter_by(id=account_id, customer_id=customer_id).first()
if not account:
return False
if account.lien_amount > 0:
return False
return account
def __repr__(self):
return f'<Account {self.id}>'
+97
View File
@@ -0,0 +1,97 @@
from datetime import datetime, timezone, timedelta
from app.extensions import db
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
class Charge(db.Model):
__tablename__ = 'charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
offer_id = db.Column(db.String(50), nullable=False)
code = db.Column(db.String(50), nullable=False)
percent = db.Column(db.Float, default=0.0)
description = db.Column(db.Text, nullable=True)
due = db.Column(db.Integer, nullable=False)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
offer = relationship(
"Offer",
primaryjoin="Charge.offer_id == Offer.id",
foreign_keys=[offer_id],
back_populates="charges",
)
@classmethod
def add_charges(cls, offer_id, charges):
"""
Add charges to an offer.
Args:
offer_id (int): ID of the offer to associate charges with.
charges (list): A list of dictionaries with keys:
code (str), amount (float), percent (float), description (str), due (int)
"""
if not charges or not isinstance(charges, list):
raise ValueError("Charges must be a non-empty list of dictionaries")
if offer_id is None:
raise ValueError("offer_id cannot be None")
offer_charges = []
for charge in charges:
code = charge.get("code")
percent = charge.get("percent", 0.0)
description = charge.get("description", "")
due_days = charge.get("due", 0)
existing = cls.query.filter_by(offer_id=offer_id, code=code).first()
if existing:
continue
charge_obj = cls(
offer_id = offer_id,
code = code,
percent = percent,
description = description,
due = due_days,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.session.add(charge_obj)
offer_charges.append(charge_obj)
return offer_charges
@classmethod
def get_offer_charges(cls, offer_id):
"""
Get all charges for a particular offer as a dictionary
Args:
offer_id (str): The offer ID.
"""
if not offer_id:
raise ValueError("offer_id not found")
charges = cls.query.filter_by(offer_id=offer_id).all()
return charges
def to_dict(self):
return {
'id': self.id,
'offerId': self.offer_id,
'code': self.code,
'percent': self.percent,
'description': self.description,
'due': self.due
}
def __repr__(self):
return f"<Charge {self.id} - Offer {self.offer_id} - {self.code}>"
+90
View File
@@ -0,0 +1,90 @@
from datetime import datetime, timezone
from sqlalchemy.orm import relationship
#
# from app.api.services.offer_analysis import logger
from app.extensions import db
from app.models.account import Account
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func
# from app.utils.logger import logger
class Customer(db.Model):
__tablename__ = 'customers'
id = db.Column(db.String(50), primary_key=True)
msisdn = db.Column(db.String(20), unique=True, nullable=False)
country_code = db.Column(db.String(3), nullable=False)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
accounts = relationship(
"Account",
primaryjoin="Customer.id == Account.customer_id",
foreign_keys="Account.customer_id",
back_populates="customer",
)
loans = relationship(
"Loan",
primaryjoin="Customer.id == Loan.customer_id",
foreign_keys="Loan.customer_id",
back_populates="customer",
)
transaction_offers = relationship(
"TransactionOffer",
primaryjoin="Customer.id == TransactionOffer.customer_id",
foreign_keys="TransactionOffer.customer_id",
back_populates="customer",
)
@classmethod
def is_valid_customer(cls, customer_id):
customer = cls.query.filter_by(id=customer_id).first()
if not customer:
return False
return customer
@classmethod
def create_customer(cls, id, msisdn, country_code, account_id, account_type='savings'):
if cls.query.filter_by(id=id).first():
raise ValueError("Customer already exists")
elif Account.query.filter_by(id=account_id).first():
raise ValueError("Account already exists")
elif cls.query.filter_by(msisdn=msisdn).first():
raise ValueError("msisdn already exists")
# Create the customer
customer = cls(
id=id,
msisdn=msisdn,
country_code=country_code,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(customer)
# Create an associated account
account = Account.create_account(
id=account_id,
customer_id=id,
account_type=account_type
)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return customer
@classmethod
def get_customer_with_loan_list(cls, customer_id):
"""
Get customer by ID.
"""
customer = cls.query.filter_by(id=customer_id).first()
if not customer:
raise ValueError(f"Customer does not exist")
return customer
def __repr__(self):
return f'<Customer {self.id}>'
+261
View File
@@ -0,0 +1,261 @@
from datetime import datetime, timezone, timedelta
from itertools import product
from app.extensions import db
from app.models.customer import Customer
from app.models.account import Account
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import relationship
from dateutil.relativedelta import relativedelta
from datetime import timedelta
import logging
from sqlalchemy import and_, or_, not_
from sqlalchemy.sql import func
logger = logging.getLogger(__name__)
class Loan(db.Model):
__tablename__ = 'loans'
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
customer_id = db.Column(db.String(50), nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
original_transaction = db.Column(db.String(50), nullable=True)
account_id = db.Column(db.String(50), nullable=False)
offer_id = db.Column(db.String(20), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
collection_type = db.Column(db.String(20), nullable=True)
current_loan_amount = db.Column(db.Float, nullable=True)
initial_loan_amount = db.Column(db.Float, nullable=False)
default_penalty_fee = db.Column(db.Float, default=0)
continuous_fee = db.Column(db.Float, default=0)
upfront_fee = db.Column(db.Float, nullable=True, default=0.0)
repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
installment_amount = db.Column(db.Float, nullable=True, default=0.0)
status = db.Column(db.String(20), default='pending')
tenor = db.Column(db.Integer, nullable=True)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
eligible_amount = db.Column(db.Float, nullable=True, default=0.0)
disburse_date = db.Column(db.DateTime, nullable=True)
disburse_verify = db.Column(db.DateTime, nullable=True)
reference = db.Column(db.String(50), nullable=True)
disburse_result = db.Column(db.String(10), nullable=True)
disburse_description = db.Column(db.String(100), nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
customer = relationship(
"Customer",
primaryjoin="Customer.id == Loan.customer_id",
foreign_keys=[customer_id],
back_populates="loans",
)
loan_charges = relationship(
"LoanCharge",
primaryjoin="LoanCharge.loan_id == Loan.id",
foreign_keys="LoanCharge.loan_id",
back_populates="loan",
)
loan_repayment_schedules = relationship(
"LoanRepaymentSchedule",
primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
foreign_keys="LoanRepaymentSchedule.loan_id",
back_populates="loan",
)
@classmethod
def create_loan(
cls,
customer_id,
account_id,
offer_id,
product_id,
initial_loan_amount,
collection_type,
transaction_id,
original_transaction,
upfront_fee,
repayment_amount,
installment_amount,
tenor,
eligible_amount,
reference,
status = "pending",
):
# Check if customer exists
customer = Customer.is_valid_customer(customer_id)
if not customer:
raise ValueError("Customer does not exist")
now = datetime.now(timezone.utc)
due_date = now + timedelta(days=tenor)
# Create and save the loan
loan = cls(
customer_id = customer_id,
account_id = account_id,
offer_id = offer_id,
product_id = product_id,
collection_type = collection_type,
transaction_id = transaction_id,
original_transaction = original_transaction,
initial_loan_amount = initial_loan_amount,
current_loan_amount = initial_loan_amount,
upfront_fee = upfront_fee,
repayment_amount = repayment_amount,
installment_amount = installment_amount,
due_date=due_date,
tenor = tenor,
status = status,
eligible_amount =eligible_amount,
reference = reference,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(loan)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return loan
@classmethod
def has_active_loans(cls, customer_id):
active_loans = cls.query.filter_by(
customer_id=customer_id,
status='active'
).count()
if active_loans > 0:
return False
return True
@classmethod
def get_customer_loan(cls, loan_id, customer_id):
"""
Get customer's active loans by loan_id.
"""
loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first()
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.")
return loan
@classmethod
def get_customer_original_loan(cls, customer_id, original_transaction):
"""
Get customer's original loan offer.
"""
original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first()
if not original_loan:
return None
logger.info(f" get_customer_original_loan ==>>>> {original_loan}")
return original_loan
@classmethod
def get_customer_last_loan(cls, customer_id):
"""
Get customer's active loans.
"""
logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}")
# loan = cls.query.filter_by( cls.customer_id == customer_id).first()
loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first()
if not loan:
return None
# loan = {
# "original_transaction":"",
# "eligible_amount": 0,
# "loan_amount": 0,
# "customer_id": customer_id,
# "transaction_id": "",
# "resultDescription": "No Active Loan"
# }
logger.info(f" get_customer_last_loan ==>>>> {loan}")
return loan
@classmethod
def get_active_loans_by_original_transaction(cls, original_transaction_id):
"""
Get all active loans with the same original_transaction ID.
"""
active_loans = cls.query.filter_by(
original_transaction=original_transaction_id,
# status='active'
).all()
return active_loans
@classmethod
def update_status(cls, loan_id, status):
"""
Update the status of the loan with the given loan_id.
"""
# Retrieve loan
loan = cls.query.get(loan_id)
if not loan:
raise ValueError(f"Loan with ID {loan_id} does not exist.")
if loan.status == status:
return
# Update loan status and the updated_at timestamp
loan.status = status
@classmethod
def get_daily_loan_count(cls, customer_id, product_id):
"""
Returns the count of loans created today for a customer.
"""
start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
end_of_day = start_of_day + timedelta(days=1)
return cls.query.filter_by(
customer_id=customer_id,
product_id=product_id,
).filter(
cls.created_at >= start_of_day,
cls.created_at < end_of_day
).count()
def to_dict(self):
"""
Convert the Loan object to a dictionary format for JSON serialization.
"""
return {
'debtId': self.id,
'transactionId': self.transaction_id,
'loanRef': self.reference,
'productId': self.product_id,
'initialLoanAmount': self.initial_loan_amount,
'currentLoanAmount': self.current_loan_amount,
'defaultPenaltyFee': self.default_penalty_fee,
'continuousFee': self.continuous_fee,
'collectionType': self.collection_type,
'upfrontFee': self.upfront_fee,
'repaymentAmount': self.repayment_amount,
'installmentAmount': self.installment_amount,
'status': self.status,
'tenor': self.tenor,
'dueDate': self.due_date.isoformat() if self.due_date else None,
'loanDate': self.created_at.isoformat() if self.created_at else None,
}
def __repr__(self):
return f'<Loan {self.id}>'
+91
View File
@@ -0,0 +1,91 @@
from datetime import datetime, timezone, timedelta
from app.extensions import db
from sqlalchemy.orm import relationship
from app.utils.logger import logger
from sqlalchemy.sql import func
class LoanCharge(db.Model):
__tablename__ = 'loan_charges'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
code = db.Column(db.String(50), nullable=False)
amount = db.Column(db.Float, default=0.0)
percent = db.Column(db.Float, default=0.0)
description = db.Column(db.Text, nullable=True)
due = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
loan = relationship(
"Loan",
primaryjoin="LoanCharge.loan_id == Loan.id",
foreign_keys=[loan_id],
back_populates="loan_charges",
)
@classmethod
def create_charges_for_loan(cls, loan_id, transaction_id, charges, referenced_amount = 0.0):
"""
Create loan charges for a given loan.
Args:
loan_id (int): ID of the loan to associate charges with.
charges (list): A list of dictionaries with keys:
code (str), amount (float), percent (float), description (str), due (int)
"""
# if not charges or not isinstance(charges, list):
# raise ValueError("Charges must be a non-empty list of dictionaries")
if loan_id is None:
raise ValueError("loan_id cannot be None")
loan_charges = []
now = datetime.now(timezone.utc)
subset_keys = ['interest', 'management', 'insurance', 'vat']
for item in subset_keys:
charge = charges[item]
due_days = charge['due_days'] # getattr(charge, "due_days", 0)
amount = charge['fee'] # getattr(charge, "fee", 0.0)
percent = charge['rate'] # getattr(charge, "rate", 0.0)
code = charge['code'] # getattr(charge, "code","")
description = charge['description'] # getattr(charge, "description", "")
charge_obj = cls(
loan_id = loan_id,
transaction_id = transaction_id,
code = code,
amount = round(amount, 2),
percent = percent,
description = description,
due = due_days,
due_date = now + timedelta(days=due_days),
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.session.add(charge_obj)
loan_charges.append(charge_obj)
return loan_charges
def to_dict(self):
return {
'id': self.id,
'loanId': self.loan_id,
'transactionId': self.transaction_id,
'code': self.code,
'amount': self.amount,
'percent': self.percent,
'description': self.description,
'due': self.due,
}
def __repr__(self):
return f"<LoanCharge {self.id} - Loan {self.loan_id} - {self.code}>"
+72
View File
@@ -0,0 +1,72 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.orm import relationship
from dateutil.relativedelta import relativedelta
from sqlalchemy.sql import func
class LoanRepaymentSchedule(db.Model):
__tablename__ = 'loan_repayment_schedules'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
loan_id = db.Column(db.Integer, nullable=False)
transaction_id = db.Column(db.String(50), nullable=True)
product_id = db.Column(db.String(20), nullable=True)
installment_number = db.Column(db.Integer, nullable=False)
due_date = db.Column(db.DateTime, nullable=False)
installment_amount= db.Column(db.Float, default=0.0)
total_repayment_amount = db.Column(db.Float, default=0.0)
paid = db.Column(db.Boolean, default=False)
paid_at = db.Column(db.DateTime, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
loan = relationship(
"Loan",
primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id",
foreign_keys=[loan_id],
back_populates="loan_repayment_schedules",
)
@classmethod
def add_repayment_schedule(cls, loan, num_schedules, transaction_id):
"""
Add repayment schedules for a given loan.
"""
now = datetime.now(timezone.utc)
schedules = []
for i in range(num_schedules):
due_date = now + relativedelta(months=i + 1)
schedule = LoanRepaymentSchedule(
loan_id=loan.id,
installment_number=i + 1,
due_date=due_date,
total_repayment_amount = round(loan.repayment_amount, 2),
installment_amount=round(loan.installment_amount, 2),
product_id = loan.product_id,
transaction_id = transaction_id,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.session.add(schedule)
schedules.append(schedule)
return schedules
def to_dict(self):
return {
'id': self.id,
'loanId': self.loan_id,
'installmentNumber': self.installment_number,
'dueDate': self.due_date.isoformat(),
'principalAmount': self.principal_amount,
'interestAmount': self.interest_amount,
'totalInstallment': self.total_installment,
'paid': self.paid,
'paidAt': self.paid_at.isoformat() if self.paid_at else None
}
def __repr__(self):
return f'<LoanRepaymentSchedule Loan:{self.loan_id} Installment:{self.installment_number}>'
+51
View File
@@ -0,0 +1,51 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.sql import func
class Members(db.Model):
__tablename__ = 'members'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
uid = db.Column(db.String(150), nullable=False)
username = db.Column(db.String(25), nullable=False)
password = db.Column(db.String(100), nullable=True)
loc = db.Column(db.String(20), nullable=True)
status = db.Column(db.Integer, default=1)
added = db.Column(db.DateTime(timezone=False), server_default=func.now())
updated = db.Column(db.DateTime(timezone=False), server_default=func.now(), onupdate=func.now())
email = db.Column(db.String(100), nullable=False)
account_name = db.Column(db.String(100), nullable=True)
firstname = db.Column(db.String(25), nullable=False)
lastname = db.Column(db.String(100), nullable=True)
def to_dict(self):
return {
"id": self.id,
"uid": self.uid,
"username": self.account_id,
"account_id": self.username,
"password": self.password,
"loc": self.loc,
"status": self.status,
"added": self.added.isoformat() if self.added else None,
"updated": self.updated.isoformat() if self.updated else None,
"email": self.email,
"account_name": self.account_name,
"firstname": self.firstname,
"lastname": self.lastname
}
def __repr__(self):
return f'<Members {self.id} - {self.amount}>'
@classmethod
def get_member_by_username(cls, username):
"""
Return an offer by its ID.
"""
member = cls.query.filter_by(username=str(username)).first()
if not member:
raise ValueError(f"Username = {username} not found")
return member
+94
View File
@@ -0,0 +1,94 @@
from datetime import datetime, timezone
from app.extensions import db
from app.models.charge import Charge
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
class Offer(db.Model):
__tablename__ = 'offers'
id = db.Column(db.String, primary_key=True)
product_id = db.Column(db.String, nullable=False)
min_amount = db.Column(db.Float, nullable=False)
max_amount = db.Column(db.Float, nullable=False)
tenor = db.Column(db.Integer, nullable=False)
schedule = db.Column(db.Integer, nullable=True)
interest_rate = db.Column(db.Float, default=3.0)
management_rate = db.Column(db.Float, default=1.0)
insurance_rate = db.Column(db.Float, default=1.0)
vat_rate = db.Column(db.Float, default=7.5)
list_order = db.Column(db.Integer, nullable=True)
max_daily_loans = db.Column(db.Integer, nullable=True)
max_active_loans = db.Column(db.Integer, nullable=True)
max_life_loans = db.Column(db.Integer, nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
charges = relationship(
"Charge",
primaryjoin="Offer.id == Charge.offer_id",
foreign_keys="Charge.offer_id",
back_populates="offer",
)
@classmethod
def get_all_offers(cls):
"""
Return all offers in dictionary format.
"""
offers = cls.query.all()
if not offers:
raise ValueError(f"No available offers")
return offers
@classmethod
def is_valid_offer(cls, offer_id):
offer = cls.query.filter_by(id=str(offer_id)).first()
if not offer:
return False
return offer
@classmethod
def get_offer_by_id(cls, offer_id):
"""
Return an offer by its ID.
"""
offer = cls.query.filter_by(id=str(offer_id)).first()
if not offer:
raise ValueError(f"Offer with ID {offer_id} not found")
return offer
@classmethod
def get_offer_by_product_id(cls, product_id):
"""
Return an offer by its product ID.
"""
offer = cls.query.filter_by(product_id=str(product_id)).first()
if not offer:
raise ValueError(f"Offer with Product ID {product_id} not found")
return offer
def to_dict(self):
return {
"offerId": self.id,
"productId": self.product_id,
"minAmount": self.min_amount,
"maxAmount": self.max_amount,
"tenor": self.tenor,
"interest_rate": self.interest_rate,
"management_rate": self.management_rate,
"insurance_rate": self.insurance_rate,
"vat_rate": self.vat_rate,
"maxDailyLoans": self.max_daily_loans,
"maxActiveLoans": self.max_active_loans,
"maxLifeLoans": self.max_life_loans
}
def __repr__(self):
return f'<LoanOffer {self.id}>'
+75
View File
@@ -0,0 +1,75 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.orm import relationship
from sqlalchemy.exc import IntegrityError
from uuid import uuid4
from sqlalchemy.types import JSON
from sqlalchemy.sql import func
class RACCheck(db.Model):
__tablename__ = 'rac_checks'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
transaction_id = db.Column(db.String(50), nullable=False)
customer_id = db.Column(db.String, nullable=False)
account_id = db.Column(db.String, nullable=False)
rac_response = db.Column(db.JSON, nullable=False)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
@classmethod
def add_rac_check(cls, customer_id, account_id, transaction_id, data = None):
# Save the response
rac_check = cls(
customer_id = customer_id,
account_id = account_id,
transaction_id = transaction_id,
rac_response = data,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(rac_check)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return rac_check
@classmethod
def get_all_rac_checks(cls):
"""
Return all RAC checks in dictionary format.
"""
rac_checks = cls.query.all()
if not rac_checks:
return None
return rac_checks
@classmethod
def get_rac_check(cls, customer_id, account_id):
"""
Return a RAC check by its ID.
"""
rac_check = cls.query.filter_by( customer_id = customer_id,
account_id = account_id,).first()
if not rac_check:
raise ValueError(f"RAC Check for customer not found")
return rac_check
def to_dict(self):
return {
"id": str(self.id),
"transactionId": str(self.transaction_id),
"customerId": self.customer_id,
"accountId": self.account_id,
"racResponse": self.rac_response,
"createdAt": self.created_at.isoformat(),
"updatedAt": self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<RACCheck {self.id}>'
+80
View File
@@ -0,0 +1,80 @@
from datetime import datetime, timezone
from app.api.enums.loan_status import LoanStatus
from app.extensions import db
from app.models.customer import Customer
from app.models.loan import Loan
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import func
class Repayment(db.Model):
__tablename__ = 'repayments'
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
loan_id = db.Column(db.String(50), nullable=False)
customer_id = db.Column(db.String(50), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
transaction_id = db.Column(db.String(50), nullable=True)
# repay_date = db.Column(db.DateTime, default=datetime.now(timezone.utc))
repay_date = db.Column(db.DateTime, nullable=True)
repay_result = db.Column(db.String(10), nullable=True)
repay_description = db.Column(db.String(100), nullable=True)
# verify_date = db.Column(db.DateTime, default=datetime.now(timezone.utc))
verify_date = db.Column(db.DateTime, nullable=True)
verify_result = db.Column(db.String(10), nullable=True)
verify_description = db.Column(db.String(100), nullable=True)
initiated_by = db.Column(db.String(50), nullable=True)
salary_amount = db.Column(db.Float, nullable=True, default=0.0)
@classmethod
def create_repayment(cls, customer_id, loan, transaction_id):
# Check that the loan is active
if loan.status not in [LoanStatus.ACTIVE, LoanStatus.START_REPAY]:
raise ValueError(f"Repayment cannot be processed. Loan status: ({loan.status})")
repayment = cls(
customer_id=customer_id,
loan_id=loan.id,
product_id=loan.product_id,
transaction_id = transaction_id,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc),
initiated_by='USER_INITIATED'
)
try:
db.session.add(repayment)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return repayment
def to_dict(self):
return {
"id": self.id,
"loan_id": self.loan_id,
"customer_id": self.customer_id,
"product_id": self.product_id,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None,
"transaction_id": self.transaction_id,
"repay_date": self.repay_date.isoformat() if self.repay_date else None,
"repay_result": self.repay_result,
"repay_description": self.repay_description,
"verify_date": self.verify_date.isoformat() if self.verify_date else None,
"verify_result": self.verify_result,
"verify_description": self.verify_description,
"initiated_by": self.initiated_by,
"salary_amount": self.salary_amount
}
def __repr__(self):
return f'<Repayment {self.id}>'
+36
View File
@@ -0,0 +1,36 @@
from datetime import datetime, timezone
from app.extensions import db
class RepaymentsData(db.Model):
__tablename__ = "repayments_data"
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
transaction_id = db.Column(db.String(50), nullable=False)
fbn_transaction_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
account_id = db.Column(db.String(50), nullable=True)
repayment_amount = db.Column(db.Float, nullable=True, default=0.0)
amount_collected = db.Column(db.Float, nullable=True, default=0.0)
added_date = db.Column(db.DateTime(timezone=True), default=datetime.now(timezone.utc), nullable=False)
response_code = db.Column(db.String(10), nullable=True)
response_descr = db.Column(db.String(255), nullable=True)
balance = db.Column(db.Float, nullable=True, default=0.0)
def to_dict(self):
return {
"id": self.id,
"transaction_id": self.transaction_id,
"fbn_transaction_id": self.fbn_transaction_id,
"customer_id": self.customer_id,
"account_id": self.account_id,
"repayment_amount": self.repayment_amount,
"amount_collected": self.amount_collected,
"added_date": self.added_date.isoformat() if self.added_date else None,
"response_code": self.response_code,
"response_descr": self.response_descr,
"balance": self.balance,
}
def __repr__(self):
return f"<RepaymentsData id={self.id}, transaction_id={self.transaction_id}>"
+31
View File
@@ -0,0 +1,31 @@
from datetime import datetime, timezone
from app.extensions import db
from sqlalchemy.sql import func
class Salary(db.Model):
__tablename__ = 'salaries'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
customer_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=True)
status = db.Column(db.String(20), default='active')
amount = db.Column(db.Float, nullable=False, default=0.0)
salary_date = db.Column(db.DateTime(timezone=False), server_default=func.now())
created_at = db.Column(db.DateTime(timezone=False), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=False), server_default=func.now(), onupdate=func.now())
def to_dict(self):
return {
"id": self.id,
"customer_id": self.customer_id,
"account_id": self.account_id,
"status": self.status,
"amount": self.amount,
"salary_date": self.salary_date.isoformat() if self.salary_date else None,
"created_at": self.created_at.isoformat() if self.created_at else None,
"updated_at": self.updated_at.isoformat() if self.updated_at else None
}
def __repr__(self):
return f'<Salary {self.id} - {self.amount}>'
+57
View File
@@ -0,0 +1,57 @@
from datetime import datetime, timezone
from app.extensions import db
from app.models import account
from sqlalchemy.exc import IntegrityError
from sqlalchemy import and_, or_, not_
from sqlalchemy.sql import func
class Transaction(db.Model):
__tablename__ = 'transactions'
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
transaction_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
type = db.Column(db.String(50), nullable=False)
channel = db.Column(db.String(50), nullable=False)
phone_number = db.Column(db.String(50), nullable=True)
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
def __repr__(self):
return f'<Transaction {self.id}>'
@classmethod
def create_transaction(cls, transaction_id, account_id, customer_id, type, channel):
# if cls.query.filter_by(transaction_id=transaction_id).first():
# raise ValueError("Duplicate Transaction")
if cls.query.filter( and_( cls.transaction_id ==transaction_id, cls.type==type) ).first():
raise ValueError("Duplicate Transaction")
transaction = cls(
transaction_id = transaction_id,
customer_id = customer_id,
account_id = account_id,
type = type,
channel = channel,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
try:
db.session.add(transaction)
except IntegrityError as err:
raise ValueError(f"Database integrity error: {err}")
return transaction
@classmethod
def get_transaction_by_id(cls, transaction_id):
return cls.query.get(transaction_id)
+115
View File
@@ -0,0 +1,115 @@
from datetime import datetime, timezone, timedelta
from app.api.enums.loan_status import LoanStatus
from app.extensions import db
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
import logging
logger = logging.getLogger(__name__)
class TransactionOffer(db.Model):
__tablename__ = 'transaction_offers'
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
customer_id = db.Column(db.String(50), nullable=False)
transaction_id = db.Column(db.String(50), nullable=False)
original_transaction = db.Column(db.String(50), nullable=True)
offer_id = db.Column(db.String(20), nullable=False)
product_id = db.Column(db.String(20), nullable=True)
min_amount = db.Column(db.Float, nullable=False)
max_amount = db.Column(db.Float, nullable=False)
eligible_amount = db.Column(db.Float, nullable=True)
tenor = db.Column(db.Integer, nullable=True) # tenor in months, typically
created_at = db.Column(db.DateTime(timezone=True), server_default=func.now())
updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now())
customer = relationship(
"Customer",
primaryjoin="Customer.id == TransactionOffer.customer_id",
foreign_keys=[customer_id],
back_populates="transaction_offers",
)
@classmethod
def is_valid_transaction_offer(cls, transaction_offer, customer_id, product_id):
transaction_offer = cls.query.filter_by(
id = transaction_offer,
customer_id = customer_id,
# product_id = product_id
# transaction_id = transaction_id,
).first()
if not transaction_offer:
return False
return transaction_offer
@classmethod
def create_transaction_offer(cls, customer_id, transaction_id, original_transaction, offer_id, min_amount, max_amount, eligible_amount=None, product_id=None, tenor=None):
"""
Class method to create and save a TransactionOffer.
"""
transaction_offer = cls(
customer_id=customer_id,
transaction_id=transaction_id,
original_transaction=original_transaction,
offer_id=offer_id,
min_amount=min_amount,
max_amount=max_amount,
eligible_amount=eligible_amount,
product_id=product_id,
tenor=tenor,
created_at=datetime.now(timezone.utc),
updated_at=datetime.now(timezone.utc)
)
db.session.add(transaction_offer)
db.session.flush()
return transaction_offer
@classmethod
def get_lifetime_loan_count(cls, customer_id):
"""
Returns the total number of loans ever created for a customer.
"""
return cls.query.filter_by(customer_id=customer_id).count()
@classmethod
def get_latest_transaction_offer(cls, customer_id):
"""
Returns the most recent transaction offer for the given customer based on creation time.
"""
return cls.query.filter_by(customer_id=customer_id) \
.order_by(cls.created_at.desc()) \
.first()
@classmethod
def get_transaction_offer(cls, transaction_offer_id):
"""
Returns a transaction offer by its ID.
"""
return cls.query.get(transaction_offer_id)
def to_dict(self):
return {
'id': self.id,
'customerId': self.customer_id,
'transactionId': self.transaction_id,
'offerId': self.offer_id,
'productId': self.product_id,
'minAmount': self.min_amount,
'maxAmount': self.max_amount,
'eligibleAmount': self.eligible_amount,
'tenor': self.tenor,
'createdAt': self.created_at.isoformat() if self.created_at else None,
'updatedAt': self.updated_at.isoformat() if self.updated_at else None,
}
def __repr__(self):
return f'<TransactionOffer {self.id}>'