diff --git a/SQL/site_data.sql b/SQL/site_data.sql index 55eb726..3f9fd34 100644 --- a/SQL/site_data.sql +++ b/SQL/site_data.sql @@ -635,6 +635,10 @@ ALTER TABLE ONLY office_users ALTER TABLE office_users OWNER TO merms_panel; +INSERT INTO office_users (username, password, firstname, lastname, acc_level) VALUES ('mermsadmin','password','Merms','Admin',1000); + + + CREATE TABLE products_contacts( id SERIAL, uid uuid DEFAULT uuid_generate_v4(), @@ -654,6 +658,31 @@ ALTER TABLE ONLY products_contacts ALTER TABLE products_contacts OWNER TO merms_panel; + CREATE TABLE custom_templates ( + id SERIAL, + uid uuid DEFAULT uuid_generate_v4(), + custom_id VARCHAR(25) UNIQUE NOT NULL, + provision_name VARCHAR(250), + added timestamp without time zone DEFAULT now() + ); + ALTER TABLE ONLY custom_templates + ADD CONSTRAINT custom_templates_id_key UNIQUE (id); + +ALTER TABLE custom_templates OWNER TO merms_panel; + +INSERT INTO custom_templates(custom_id,provision_name) VALUES('icare_template','registry.chiefsoft.com/custom/mermsicare-mermshostedicare-001:latest'); + +INSERT INTO custom_templates(custom_id,provision_name) VALUES('kevkem_template','registry.chiefsoft.com/custom/mermskevkem-mermshostedicare-001:latest'); + +icare_template +registry.chiefsoft.com/custom/mermsicare-mermshostedicare-001:latest + + +kevkem_template +registry.chiefsoft.com/custom/mermskevkem-mermshostedicare-001:latest + + + new_l = { "uid": "425611f2-c692-4404-b93d-76ca7a5ce7" + str(x), "title": "Contact Random Item on " + str(x), diff --git a/app/api/enums/__init__.py b/app/api/enums/__init__.py index 27117ee..f93b26a 100644 --- a/app/api/enums/__init__.py +++ b/app/api/enums/__init__.py @@ -1,4 +1,5 @@ from .transaction_type import TransactionType from .loan_status import LoanStatus from .settings_items_data import SettingsItemsData -from .generatives_list import GenerativesList \ No newline at end of file +from .generatives_list import GenerativesList +from .kafka_messages import KafkaMessage \ No newline at end of file diff --git a/app/api/enums/kafka_messages.py b/app/api/enums/kafka_messages.py new file mode 100644 index 0000000..5ee3432 --- /dev/null +++ b/app/api/enums/kafka_messages.py @@ -0,0 +1,5 @@ +from enum import Enum + +class KafkaMessage(str, Enum): + REFRESH_PRODUCT_SETTINGS = "REFRESH_PRODUCT_SETTINGS" + FLUTTER_PAYMENT_RECEIVED = "FLUTTER_PAYMENT_RECEIVED" diff --git a/app/api/services/base_service.py b/app/api/services/base_service.py index 78a41b7..96992ed 100644 --- a/app/api/services/base_service.py +++ b/app/api/services/base_service.py @@ -306,45 +306,45 @@ class BaseService: logger.info(f"Processing {cls.TRANSACTION_TYPE} request") return schema.load(data) - @classmethod - def get_or_create_customer(cls, validated_data): - - """ - Check if a customer exists; if not, create one. - """ + # @classmethod + # def get_or_create_customer(cls, validated_data): + # + # """ + # Check if a customer exists; if not, create one. + # """ + # + # customer = Customer.query.filter_by(id=validated_data.get("customerId")).first() + # if not customer: + # customer = Customer.create_customer( + # id=validated_data.get("customerId"), + # msisdn=validated_data.get("msisdn"), + # country_code=validated_data.get("countryCode"), + # account_id=validated_data.get("accountId"), + # ) + # return customer - customer = Customer.query.filter_by(id=validated_data.get("customerId")).first() - if not customer: - customer = Customer.create_customer( - id=validated_data.get("customerId"), - msisdn=validated_data.get("msisdn"), - country_code=validated_data.get("countryCode"), - account_id=validated_data.get("accountId"), - ) - return customer + # @classmethod + # def validate_account_ownership(cls, account_id, customer_id): + # """ + # Check if the provided account belongs to the customer. + # """ + # is_valid = Account.is_valid_account(account_id, customer_id) + # return is_valid - @classmethod - def validate_account_ownership(cls, account_id, customer_id): - """ - Check if the provided account belongs to the customer. - """ - is_valid = Account.is_valid_account(account_id, customer_id) - return is_valid - - @classmethod - def log_transaction(cls, validated_data): - """ - Create a new transaction. - """ - channel = "USSD" if validated_data.get("channel") is None else validated_data.get("channel") - - return Transaction.create_transaction( - transaction_id = validated_data.get("transactionId"), - customer_id = validated_data.get('customerId', None), - account_id = validated_data.get("accountId", None), - type = cls.TRANSACTION_TYPE, - channel = channel, - ) + # @classmethod + # def log_transaction(cls, validated_data): + # """ + # Create a new transaction. + # """ + # channel = "USSD" if validated_data.get("channel") is None else validated_data.get("channel") + # + # return Transaction.create_transaction( + # transaction_id = validated_data.get("transactionId"), + # customer_id = validated_data.get('customerId', None), + # account_id = validated_data.get("accountId", None), + # type = cls.TRANSACTION_TYPE, + # channel = channel, + # ) @classmethod def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic): @@ -357,113 +357,79 @@ class BaseService: KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id, topic = topic) KafkaIntegration.flush() - - @classmethod - def calculate_charges(cls, offer, amount): - """ - Calculates and returns the charges for the given offer and amount. - - Args: - offer (Offer): The offer object that contains the charges. - amount (float): The requested loan amount. - - Returns: - dict: A dictionary containing the calculated charges. - """ - if not offer or not offer.charges: - logger.error(f"No charges found for offer ID {offer.id}") - return {"error": "No charges found for the offer"} - - loan_charges = offer.charges - tenor = offer.schedule # offer.tenor // 30 # Convert to months - interest = cls.get_charge_detail(rates = offer.interest_rate, charges = loan_charges, code = "INTEREST", amount = amount) - management = cls.get_charge_detail(rates = offer.management_rate, charges = loan_charges, code = "MGTFEE", amount = amount) - insurance = cls.get_charge_detail(rates = offer.insurance_rate, charges = loan_charges, code = "INSURANCE", amount = amount) - vat = cls.get_charge_detail(rates = offer.vat_rate, charges = loan_charges, code = "VAT", amount = amount, management_fee = management["fee"]) - - # Separate fees into upfront and postpaid - upfront_fees = [ - fee["fee"] - for fee in [interest, management, insurance, vat] - if fee["due_days"] == 0 - ] - - postpaid_fees = [ - fee["fee"] - for fee in [interest, management, insurance, vat] - if fee["due_days"] != 0 - ] - vat_test = vat["fee"] - logger.info(f"VAT fee == *************** : {vat_test}") - - # Up-front payment: (only those fees due immediately i.e due_days == 0) - # upfront_payment = sum(upfront_fees) - if offer.schedule == 1: - upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"] - interest_amount = interest["fee"] - repayment_amount = amount - else: - upfront_payment = vat["fee"] + insurance["fee"]+management["fee"] - interest_amount = interest["fee"]*offer.schedule - repayment_amount = amount + interest_amount + # + # @classmethod + # def calculate_charges(cls, offer, amount): + # """ + # Calculates and returns the charges for the given offer and amount. + # + # Args: + # offer (Offer): The offer object that contains the charges. + # amount (float): The requested loan amount. + # + # Returns: + # dict: A dictionary containing the calculated charges. + # """ + # if not offer or not offer.charges: + # logger.error(f"No charges found for offer ID {offer.id}") + # return {"error": "No charges found for the offer"} + # + # loan_charges = offer.charges + # tenor = offer.schedule # offer.tenor // 30 # Convert to months + # interest = cls.get_charge_detail(rates = offer.interest_rate, charges = loan_charges, code = "INTEREST", amount = amount) + # management = cls.get_charge_detail(rates = offer.management_rate, charges = loan_charges, code = "MGTFEE", amount = amount) + # insurance = cls.get_charge_detail(rates = offer.insurance_rate, charges = loan_charges, code = "INSURANCE", amount = amount) + # vat = cls.get_charge_detail(rates = offer.vat_rate, charges = loan_charges, code = "VAT", amount = amount, management_fee = management["fee"]) + # + # # Separate fees into upfront and postpaid + # upfront_fees = [ + # fee["fee"] + # for fee in [interest, management, insurance, vat] + # if fee["due_days"] == 0 + # ] + # + # postpaid_fees = [ + # fee["fee"] + # for fee in [interest, management, insurance, vat] + # if fee["due_days"] != 0 + # ] + # vat_test = vat["fee"] + # logger.info(f"VAT fee == *************** : {vat_test}") + # + # # Up-front payment: (only those fees due immediately i.e due_days == 0) + # # upfront_payment = sum(upfront_fees) + # if offer.schedule == 1: + # upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"] + # interest_amount = interest["fee"] + # repayment_amount = amount + # else: + # upfront_payment = vat["fee"] + insurance["fee"]+management["fee"] + # interest_amount = interest["fee"]*offer.schedule + # repayment_amount = amount + interest_amount + # + # + # # Repayment amount: (principal + only those fees not due immediately i.e due_days != 0) + # # repayment_amount = amount + (sum(postpaid_fees) * tenor) + # + # # Total amount: (upfront_payment + repayment_amount) + # total_amount = upfront_payment + repayment_amount + # + # # Calculate the installment amount + # installment_amount = repayment_amount / offer.schedule + # + # return { + # "interest": interest, + # "interest_amount": interest_amount, + # "management": management, + # "insurance": insurance, + # "vat": vat, + # "upfront_payment": round(upfront_payment, 2), + # "repayment_amount": round(repayment_amount, 2), + # "installment_amount": round(installment_amount, 2), + # "total_amount": round(total_amount, 2) + # } + # + # - # Repayment amount: (principal + only those fees not due immediately i.e due_days != 0) - # repayment_amount = amount + (sum(postpaid_fees) * tenor) - - # Total amount: (upfront_payment + repayment_amount) - total_amount = upfront_payment + repayment_amount - - # Calculate the installment amount - installment_amount = repayment_amount / offer.schedule - - return { - "interest": interest, - "interest_amount": interest_amount, - "management": management, - "insurance": insurance, - "vat": vat, - "upfront_payment": round(upfront_payment, 2), - "repayment_amount": round(repayment_amount, 2), - "installment_amount": round(installment_amount, 2), - "total_amount": round(total_amount, 2) - } - - - @classmethod - def get_charge_detail(cls, rates, charges, code, amount, management_fee= 0.0): - """ - Get details for a specific charge code from a list of loan charges. - - Returns default values if not found. - """ - fee = 0.0 - - if code == "VAT" and management_fee > 0: - fee = management_fee * rates / 100 - else: - fee = amount * rates / 100 - - return { - "rate": rates, - "fee": round(fee, 2), - "due_days": 30, - "code": code, - "description" : "have no idea how to get this yet" - } - - # if charge.code == code: - # if code == "VAT" and management_fee > 0: - # fee = management_fee * rates / 100 - # else: - # fee = amount * rates / 100 - # - # return { - # "rate": rates, - # "fee": round(fee, 2), - # "due_days": charge.due - # } - - # return {"rate": 0, "fee": 0, "due_days": 0} - diff --git a/app/api/services/customer_consent.py b/app/api/services/customer_consent.py index f787ee7..08192d4 100644 --- a/app/api/services/customer_consent.py +++ b/app/api/services/customer_consent.py @@ -1,60 +1,60 @@ -from flask import request, jsonify -from app.api.helpers.response_helper import ResponseHelper +# from flask import request, jsonify +# from app.api.helpers.response_helper import ResponseHelper +# # from app.api.services.base_service import BaseService +# from marshmallow import ValidationError +# from app.utils.logger import logger +# # from app.api.schemas.customer_consent import CustomerConsentSchema # from app.api.services.base_service import BaseService -from marshmallow import ValidationError -from app.utils.logger import logger -# from app.api.schemas.customer_consent import CustomerConsentSchema -from app.api.services.base_service import BaseService -from app.api.enums import TransactionType -from app.extensions import db - - -class CustomerConsentService(BaseService): - TRANSACTION_TYPE = TransactionType.CUSTOMER_CONSENT - - @staticmethod - def process_request(data): - """ - Process the CustomerConsent request. - - Args: - data (dict): The request data. - - Returns: - dict: A standardized response. - """ - try: - with db.session.begin(): - validated_data = CustomerConsentService.validate_data(data, CustomerConsentSchema()) - account_id = validated_data.get('accountId') - customer_id = validated_data.get('customerId') - - if(CustomerConsentService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): - - transaction = CustomerConsentService.log_transaction(validated_data = validated_data) - - if not transaction: - logger.error(f"Failed to log transaction") - return ResponseHelper.error(result_description="Failed to log transaction.") - else: - return ResponseHelper.error(result_description="Invalid Customer or Account") - - - db.session.commit() - return ResponseHelper.success(result_description="Request is received") - - except ValidationError as err: - - logger.error(f"Validation Error: {getattr(err, 'messages', str(err))}") - db.session.rollback() - return ResponseHelper.unprocessable_entity(result_description="Validation exception") - - except ValueError as err: - logger.error(f"{getattr(err, 'messages', str(err))}") - db.session.rollback() - return ResponseHelper.error(result_description=str(err)) - - except Exception as e: - logger.error(f"An error occurred: {str(e)}", exc_info=True) - db.session.rollback() - return ResponseHelper.internal_server_error() \ No newline at end of file +# from app.api.enums import TransactionType +# from app.extensions import db +# +# +# class CustomerConsentService(BaseService): +# TRANSACTION_TYPE = TransactionType.CUSTOMER_CONSENT +# +# @staticmethod +# def process_request(data): +# """ +# Process the CustomerConsent request. +# +# Args: +# data (dict): The request data. +# +# Returns: +# dict: A standardized response. +# """ +# try: +# with db.session.begin(): +# validated_data = CustomerConsentService.validate_data(data, CustomerConsentSchema()) +# account_id = validated_data.get('accountId') +# customer_id = validated_data.get('customerId') +# +# if(CustomerConsentService.validate_account_ownership(account_id = account_id, customer_id = customer_id)): +# +# transaction = CustomerConsentService.log_transaction(validated_data = validated_data) +# +# if not transaction: +# logger.error(f"Failed to log transaction") +# return ResponseHelper.error(result_description="Failed to log transaction.") +# else: +# return ResponseHelper.error(result_description="Invalid Customer or Account") +# +# +# db.session.commit() +# return ResponseHelper.success(result_description="Request is received") +# +# except ValidationError as err: +# +# logger.error(f"Validation Error: {getattr(err, 'messages', str(err))}") +# db.session.rollback() +# return ResponseHelper.unprocessable_entity(result_description="Validation exception") +# +# except ValueError as err: +# logger.error(f"{getattr(err, 'messages', str(err))}") +# db.session.rollback() +# return ResponseHelper.error(result_description=str(err)) +# +# except Exception as e: +# logger.error(f"An error occurred: {str(e)}", exc_info=True) +# db.session.rollback() +# return ResponseHelper.internal_server_error() \ No newline at end of file diff --git a/app/api/services/myproduct.py b/app/api/services/myproduct.py index cb6924d..3867f50 100644 --- a/app/api/services/myproduct.py +++ b/app/api/services/myproduct.py @@ -2,7 +2,7 @@ from urllib import request from flask import session, jsonify -from app.api.enums import SettingsItemsData +from app.api.enums import SettingsItemsData,KafkaMessage from app.api.schemas.myproduct_set_template import MyProductSetTemplateSchema from app.utils.logger import logger from app.api.services.base_service import BaseService @@ -279,15 +279,7 @@ class MyProductsService(BaseService): # logger.info(f"Incoming MyProduct data ==>>>> {memberSubscription}") productDataStatus = memberSubscription.status product_subscription_uid = memberSubscription.uid - # product_subscription_external_url = memberSubscription.external_url - # product_subscription_internal_url = memberSubscription.internal_url - # result_data = { - # "myproudct": { - # "result": "Reveived under development ", - # "message": "to be fixed" - # } - # } for key in settings.keys(): setting_value = settings[key] @@ -303,7 +295,7 @@ class MyProductsService(BaseService): } logger.error(f"Going for Thread ******************** ") thread = Thread(target=MyProductsService.async_send_settings_refresh_to_kafka, - args=(response_data, subscription_uid, "REFRESH_PRODUCT_SETTINGS")) + args=(response_data, subscription_uid, KafkaMessage.REFRESH_PRODUCT_SETTINGS)) thread.start() logger.error(f"After the Thread ******************** ") diff --git a/app/models/__init__.py b/app/models/__init__.py index 58c93d3..d0db1f5 100644 --- a/app/models/__init__.py +++ b/app/models/__init__.py @@ -18,10 +18,12 @@ from .payments import Payments from .subscription_generative import SubscriptionGenerative from .generative_results import GenerativeResults from .office_users import OfficeUsers +from .custom_templates import CustomTemplates __all__ = ['Members', 'Account', 'Products', 'MembersProducts', 'MembersActions', 'MembersPending', 'ProductsDetails', 'ProvisionActions', 'MembersProductsRefresh', 'MembersProductsSettings', 'PasswordReset', 'MembersProfile', 'SubscriptionOptions', 'SubscriptionOptionsItems', 'ProductsTemplates', 'Payments', 'PaymentsSession', 'SubscriptionGenerative', 'GenerativeResults', + 'CustomTemplates', 'OfficeUsers'] diff --git a/app/models/custom_templates.py b/app/models/custom_templates.py new file mode 100644 index 0000000..366dfd5 --- /dev/null +++ b/app/models/custom_templates.py @@ -0,0 +1,35 @@ +from datetime import datetime, timezone +from app.extensions import db +from app.models.charge import Charge +from sqlalchemy.orm import relationship +from sqlalchemy.sql import func + + +class CustomTemplates(db.Model): + __tablename__ = 'custom_templates' + + id = db.Column(db.String, primary_key=True) + uid = db.Column(db.String, nullable=False) + custom_id = db.Column(db.String, nullable=False) + provision_name = db.Column(db.String, nullable=False) + added = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) + + @classmethod + def get_template_by_custom_id(cls, custom_id): + templates = cls.query.filter_by(custom_id=str(custom_id)).all() + + if not templates: + raise ValueError(f"Templates with Custom ID {custom_id} not found") + return templates + + def to_dict(self): + return { + "id": self.id, + "uid": self.uid, + "custom_id": self.product_id, + "provision_name": self.name, + "added": self.added.isoformat() if self.added else None + } + + def __repr__(self): + return f'' diff --git a/app/models/loan.py b/app/models/loan.py index e8c1b11..bf105fc 100644 --- a/app/models/loan.py +++ b/app/models/loan.py @@ -1,261 +1,261 @@ -from datetime import datetime, timezone, timedelta -from itertools import product -from app.extensions import db -from app.models.customer import Customer -from app.models.account import Account -from sqlalchemy.exc import IntegrityError -from sqlalchemy.orm import relationship -from dateutil.relativedelta import relativedelta -from datetime import timedelta -import logging -from sqlalchemy import and_, or_, not_ -from sqlalchemy.sql import func - -logger = logging.getLogger(__name__) - - -class Loan(db.Model): - __tablename__ = 'loans' - - id = db.Column( - db.Integer, - primary_key=True, - autoincrement=True, - ) - customer_id = db.Column(db.String(50), nullable=False) - transaction_id = db.Column(db.String(50), nullable=True) - original_transaction = db.Column(db.String(50), nullable=True) - account_id = db.Column(db.String(50), nullable=False) - offer_id = db.Column(db.String(20), nullable=False) - product_id = db.Column(db.String(20), nullable=True) - collection_type = db.Column(db.String(20), nullable=True) - current_loan_amount = db.Column(db.Float, nullable=True) - initial_loan_amount = db.Column(db.Float, nullable=False) - default_penalty_fee = db.Column(db.Float, default=0) - continuous_fee = db.Column(db.Float, default=0) - upfront_fee = db.Column(db.Float, nullable=True, default=0.0) - repayment_amount = db.Column(db.Float, nullable=True, default=0.0) - installment_amount = db.Column(db.Float, nullable=True, default=0.0) - status = db.Column(db.String(20), default='pending') - tenor = db.Column(db.Integer, nullable=True) - due_date = db.Column(db.DateTime, nullable=True) - created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) - updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) - eligible_amount = db.Column(db.Float, nullable=True, default=0.0) - disburse_date = db.Column(db.DateTime, nullable=True) - disburse_verify = db.Column(db.DateTime, nullable=True) - reference = db.Column(db.String(50), nullable=True) - disburse_result = db.Column(db.String(10), nullable=True) - disburse_description = db.Column(db.String(100), nullable=True) - verify_result = db.Column(db.String(10), nullable=True) - verify_description = db.Column(db.String(100), nullable=True) - - # customer = relationship( - # "Customer", - # primaryjoin="Customer.id == Loan.customer_id", - # foreign_keys=[customer_id], - # back_populates="loans", - # ) - - loan_charges = relationship( - "LoanCharge", - primaryjoin="LoanCharge.loan_id == Loan.id", - foreign_keys="LoanCharge.loan_id", - back_populates="loan", - ) - - loan_repayment_schedules = relationship( - "LoanRepaymentSchedule", - primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id", - foreign_keys="LoanRepaymentSchedule.loan_id", - back_populates="loan", - ) - - - @classmethod - def create_loan( - cls, - customer_id, - account_id, - offer_id, - product_id, - initial_loan_amount, - collection_type, - transaction_id, - original_transaction, - upfront_fee, - repayment_amount, - installment_amount, - tenor, - eligible_amount, - reference, - status = "pending", - ): - # Check if customer exists - customer = Customer.is_valid_customer(customer_id) - if not customer: - raise ValueError("Customer does not exist") - - now = datetime.now(timezone.utc) - due_date = now + timedelta(days=tenor) - - # Create and save the loan - loan = cls( - customer_id = customer_id, - account_id = account_id, - offer_id = offer_id, - product_id = product_id, - collection_type = collection_type, - transaction_id = transaction_id, - original_transaction = original_transaction, - initial_loan_amount = initial_loan_amount, - current_loan_amount = initial_loan_amount, - upfront_fee = upfront_fee, - repayment_amount = repayment_amount, - installment_amount = installment_amount, - due_date=due_date, - tenor = tenor, - status = status, - eligible_amount =eligible_amount, - reference = reference, - created_at=datetime.now(timezone.utc), - updated_at=datetime.now(timezone.utc) - ) - - try: - db.session.add(loan) - except IntegrityError as err: - raise ValueError(f"Database integrity error: {err}") - return loan - - @classmethod - def has_active_loans(cls, customer_id): - active_loans = cls.query.filter_by( - customer_id=customer_id, - status='active' - ).count() - - if active_loans > 0: - return False - return True - - - @classmethod - def get_customer_loan(cls, loan_id, customer_id): - """ - Get customer's active loans by loan_id. - """ - loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first() - if not loan: - raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.") - return loan - - @classmethod - def get_customer_original_loan(cls, customer_id, original_transaction): - """ - Get customer's original loan offer. - """ - original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first() - if not original_loan: - return None - - logger.info(f" get_customer_original_loan ==>>>> {original_loan}") - return original_loan - - @classmethod - def get_customer_last_loan(cls, customer_id): - """ - Get customer's active loans. - """ - logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}") - # loan = cls.query.filter_by( cls.customer_id == customer_id).first() - loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first() - - if not loan: - return None - # loan = { - # "original_transaction":"", - # "eligible_amount": 0, - # "loan_amount": 0, - # "customer_id": customer_id, - # "transaction_id": "", - # "resultDescription": "No Active Loan" - # } - logger.info(f" get_customer_last_loan ==>>>> {loan}") - return loan - - @classmethod - def get_active_loans_by_original_transaction(cls, original_transaction_id): - """ - Get all active loans with the same original_transaction ID. - """ - - active_loans = cls.query.filter_by( - original_transaction=original_transaction_id, - # status='active' - ).all() - - return active_loans - - - @classmethod - def update_status(cls, loan_id, status): - """ - Update the status of the loan with the given loan_id. - """ - # Retrieve loan - loan = cls.query.get(loan_id) - - if not loan: - raise ValueError(f"Loan with ID {loan_id} does not exist.") - - if loan.status == status: - return - - # Update loan status and the updated_at timestamp - loan.status = status - - - @classmethod - def get_daily_loan_count(cls, customer_id, product_id): - """ - Returns the count of loans created today for a customer. - """ - - start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) - end_of_day = start_of_day + timedelta(days=1) - - return cls.query.filter_by( - customer_id=customer_id, - product_id=product_id, - ).filter( - cls.created_at >= start_of_day, - cls.created_at < end_of_day - ).count() - - - def to_dict(self): - """ - Convert the Loan object to a dictionary format for JSON serialization. - """ - return { - 'debtId': self.id, - 'transactionId': self.transaction_id, - 'loanRef': self.reference, - 'productId': self.product_id, - 'initialLoanAmount': self.initial_loan_amount, - 'currentLoanAmount': self.current_loan_amount, - 'defaultPenaltyFee': self.default_penalty_fee, - 'continuousFee': self.continuous_fee, - 'collectionType': self.collection_type, - 'upfrontFee': self.upfront_fee, - 'repaymentAmount': self.repayment_amount, - 'installmentAmount': self.installment_amount, - 'status': self.status, - 'tenor': self.tenor, - 'dueDate': self.due_date.isoformat() if self.due_date else None, - 'loanDate': self.created_at.isoformat() if self.created_at else None, - } - - def __repr__(self): - return f'' \ No newline at end of file +# from datetime import datetime, timezone, timedelta +# from itertools import product +# from app.extensions import db +# from app.models.customer import Customer +# from app.models.account import Account +# from sqlalchemy.exc import IntegrityError +# from sqlalchemy.orm import relationship +# from dateutil.relativedelta import relativedelta +# from datetime import timedelta +# import logging +# from sqlalchemy import and_, or_, not_ +# from sqlalchemy.sql import func +# +# logger = logging.getLogger(__name__) +# +# +# class Loan(db.Model): +# __tablename__ = 'loans' +# +# id = db.Column( +# db.Integer, +# primary_key=True, +# autoincrement=True, +# ) +# customer_id = db.Column(db.String(50), nullable=False) +# transaction_id = db.Column(db.String(50), nullable=True) +# original_transaction = db.Column(db.String(50), nullable=True) +# account_id = db.Column(db.String(50), nullable=False) +# offer_id = db.Column(db.String(20), nullable=False) +# product_id = db.Column(db.String(20), nullable=True) +# collection_type = db.Column(db.String(20), nullable=True) +# current_loan_amount = db.Column(db.Float, nullable=True) +# initial_loan_amount = db.Column(db.Float, nullable=False) +# default_penalty_fee = db.Column(db.Float, default=0) +# continuous_fee = db.Column(db.Float, default=0) +# upfront_fee = db.Column(db.Float, nullable=True, default=0.0) +# repayment_amount = db.Column(db.Float, nullable=True, default=0.0) +# installment_amount = db.Column(db.Float, nullable=True, default=0.0) +# status = db.Column(db.String(20), default='pending') +# tenor = db.Column(db.Integer, nullable=True) +# due_date = db.Column(db.DateTime, nullable=True) +# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) +# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) +# eligible_amount = db.Column(db.Float, nullable=True, default=0.0) +# disburse_date = db.Column(db.DateTime, nullable=True) +# disburse_verify = db.Column(db.DateTime, nullable=True) +# reference = db.Column(db.String(50), nullable=True) +# disburse_result = db.Column(db.String(10), nullable=True) +# disburse_description = db.Column(db.String(100), nullable=True) +# verify_result = db.Column(db.String(10), nullable=True) +# verify_description = db.Column(db.String(100), nullable=True) +# +# # customer = relationship( +# # "Customer", +# # primaryjoin="Customer.id == Loan.customer_id", +# # foreign_keys=[customer_id], +# # back_populates="loans", +# # ) +# +# loan_charges = relationship( +# "LoanCharge", +# primaryjoin="LoanCharge.loan_id == Loan.id", +# foreign_keys="LoanCharge.loan_id", +# back_populates="loan", +# ) +# +# loan_repayment_schedules = relationship( +# "LoanRepaymentSchedule", +# primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id", +# foreign_keys="LoanRepaymentSchedule.loan_id", +# back_populates="loan", +# ) +# +# +# @classmethod +# def create_loan( +# cls, +# customer_id, +# account_id, +# offer_id, +# product_id, +# initial_loan_amount, +# collection_type, +# transaction_id, +# original_transaction, +# upfront_fee, +# repayment_amount, +# installment_amount, +# tenor, +# eligible_amount, +# reference, +# status = "pending", +# ): +# # Check if customer exists +# customer = Customer.is_valid_customer(customer_id) +# if not customer: +# raise ValueError("Customer does not exist") +# +# now = datetime.now(timezone.utc) +# due_date = now + timedelta(days=tenor) +# +# # Create and save the loan +# loan = cls( +# customer_id = customer_id, +# account_id = account_id, +# offer_id = offer_id, +# product_id = product_id, +# collection_type = collection_type, +# transaction_id = transaction_id, +# original_transaction = original_transaction, +# initial_loan_amount = initial_loan_amount, +# current_loan_amount = initial_loan_amount, +# upfront_fee = upfront_fee, +# repayment_amount = repayment_amount, +# installment_amount = installment_amount, +# due_date=due_date, +# tenor = tenor, +# status = status, +# eligible_amount =eligible_amount, +# reference = reference, +# created_at=datetime.now(timezone.utc), +# updated_at=datetime.now(timezone.utc) +# ) +# +# try: +# db.session.add(loan) +# except IntegrityError as err: +# raise ValueError(f"Database integrity error: {err}") +# return loan +# +# @classmethod +# def has_active_loans(cls, customer_id): +# active_loans = cls.query.filter_by( +# customer_id=customer_id, +# status='active' +# ).count() +# +# if active_loans > 0: +# return False +# return True +# +# +# @classmethod +# def get_customer_loan(cls, loan_id, customer_id): +# """ +# Get customer's active loans by loan_id. +# """ +# loan = cls.query.filter_by(id = loan_id, customer_id = customer_id).first() +# if not loan: +# raise ValueError(f"Loan with ID {loan_id} does not exist or does not belong to customer {customer_id}.") +# return loan +# +# @classmethod +# def get_customer_original_loan(cls, customer_id, original_transaction): +# """ +# Get customer's original loan offer. +# """ +# original_loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.original_transaction==original_transaction, cls.transaction_id==original_transaction )).first() +# if not original_loan: +# return None +# +# logger.info(f" get_customer_original_loan ==>>>> {original_loan}") +# return original_loan +# +# @classmethod +# def get_customer_last_loan(cls, customer_id): +# """ +# Get customer's active loans. +# """ +# logger.info(f"get_customer_last_loan [customer_id] ==>>>> {customer_id}") +# # loan = cls.query.filter_by( cls.customer_id == customer_id).first() +# loan = cls.query.filter(and_( cls.customer_id ==customer_id, cls.status=='active')).first() +# +# if not loan: +# return None +# # loan = { +# # "original_transaction":"", +# # "eligible_amount": 0, +# # "loan_amount": 0, +# # "customer_id": customer_id, +# # "transaction_id": "", +# # "resultDescription": "No Active Loan" +# # } +# logger.info(f" get_customer_last_loan ==>>>> {loan}") +# return loan +# +# @classmethod +# def get_active_loans_by_original_transaction(cls, original_transaction_id): +# """ +# Get all active loans with the same original_transaction ID. +# """ +# +# active_loans = cls.query.filter_by( +# original_transaction=original_transaction_id, +# # status='active' +# ).all() +# +# return active_loans +# +# +# @classmethod +# def update_status(cls, loan_id, status): +# """ +# Update the status of the loan with the given loan_id. +# """ +# # Retrieve loan +# loan = cls.query.get(loan_id) +# +# if not loan: +# raise ValueError(f"Loan with ID {loan_id} does not exist.") +# +# if loan.status == status: +# return +# +# # Update loan status and the updated_at timestamp +# loan.status = status +# +# +# @classmethod +# def get_daily_loan_count(cls, customer_id, product_id): +# """ +# Returns the count of loans created today for a customer. +# """ +# +# start_of_day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0) +# end_of_day = start_of_day + timedelta(days=1) +# +# return cls.query.filter_by( +# customer_id=customer_id, +# product_id=product_id, +# ).filter( +# cls.created_at >= start_of_day, +# cls.created_at < end_of_day +# ).count() +# +# +# def to_dict(self): +# """ +# Convert the Loan object to a dictionary format for JSON serialization. +# """ +# return { +# 'debtId': self.id, +# 'transactionId': self.transaction_id, +# 'loanRef': self.reference, +# 'productId': self.product_id, +# 'initialLoanAmount': self.initial_loan_amount, +# 'currentLoanAmount': self.current_loan_amount, +# 'defaultPenaltyFee': self.default_penalty_fee, +# 'continuousFee': self.continuous_fee, +# 'collectionType': self.collection_type, +# 'upfrontFee': self.upfront_fee, +# 'repaymentAmount': self.repayment_amount, +# 'installmentAmount': self.installment_amount, +# 'status': self.status, +# 'tenor': self.tenor, +# 'dueDate': self.due_date.isoformat() if self.due_date else None, +# 'loanDate': self.created_at.isoformat() if self.created_at else None, +# } +# +# def __repr__(self): +# return f'' \ No newline at end of file diff --git a/app/models/loan_charge.py b/app/models/loan_charge.py index 8040437..83fb5a4 100644 --- a/app/models/loan_charge.py +++ b/app/models/loan_charge.py @@ -1,91 +1,91 @@ -from datetime import datetime, timezone, timedelta -from app.extensions import db -from sqlalchemy.orm import relationship -from app.utils.logger import logger -from sqlalchemy.sql import func - - -class LoanCharge(db.Model): - __tablename__ = 'loan_charges' - - id = db.Column(db.Integer, primary_key=True, autoincrement=True) - loan_id = db.Column(db.Integer, nullable=False) - transaction_id = db.Column(db.String(50), nullable=True) - code = db.Column(db.String(50), nullable=False) - amount = db.Column(db.Float, default=0.0) - percent = db.Column(db.Float, default=0.0) - description = db.Column(db.Text, nullable=True) - due = db.Column(db.Integer, nullable=False) - due_date = db.Column(db.DateTime, nullable=True) - created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) - updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) - # loan = relationship( - # "Loan", - # primaryjoin="LoanCharge.loan_id == Loan.id", - # foreign_keys=[loan_id], - # back_populates="loan_charges", - # ) - - @classmethod - def create_charges_for_loan(cls, loan_id, transaction_id, charges, referenced_amount = 0.0): - """ - Create loan charges for a given loan. - - Args: - loan_id (int): ID of the loan to associate charges with. - charges (list): A list of dictionaries with keys: - code (str), amount (float), percent (float), description (str), due (int) - """ - # if not charges or not isinstance(charges, list): - # raise ValueError("Charges must be a non-empty list of dictionaries") - - if loan_id is None: - raise ValueError("loan_id cannot be None") - - loan_charges = [] - now = datetime.now(timezone.utc) - - - subset_keys = ['interest', 'management', 'insurance', 'vat'] - for item in subset_keys: - charge = charges[item] - due_days = charge['due_days'] # getattr(charge, "due_days", 0) - amount = charge['fee'] # getattr(charge, "fee", 0.0) - percent = charge['rate'] # getattr(charge, "rate", 0.0) - code = charge['code'] # getattr(charge, "code","") - description = charge['description'] # getattr(charge, "description", "") - - charge_obj = cls( - loan_id = loan_id, - transaction_id = transaction_id, - code = code, - amount = round(amount, 2), - percent = percent, - description = description, - due = due_days, - due_date = now + timedelta(days=due_days), - created_at=datetime.now(timezone.utc), - updated_at=datetime.now(timezone.utc) - ) - - db.session.add(charge_obj) - loan_charges.append(charge_obj) - - return loan_charges - - - - def to_dict(self): - return { - 'id': self.id, - 'loanId': self.loan_id, - 'transactionId': self.transaction_id, - 'code': self.code, - 'amount': self.amount, - 'percent': self.percent, - 'description': self.description, - 'due': self.due, - } - - def __repr__(self): - return f"" +# from datetime import datetime, timezone, timedelta +# from app.extensions import db +# from sqlalchemy.orm import relationship +# from app.utils.logger import logger +# from sqlalchemy.sql import func +# +# +# class LoanCharge(db.Model): +# __tablename__ = 'loan_charges' +# +# id = db.Column(db.Integer, primary_key=True, autoincrement=True) +# loan_id = db.Column(db.Integer, nullable=False) +# transaction_id = db.Column(db.String(50), nullable=True) +# code = db.Column(db.String(50), nullable=False) +# amount = db.Column(db.Float, default=0.0) +# percent = db.Column(db.Float, default=0.0) +# description = db.Column(db.Text, nullable=True) +# due = db.Column(db.Integer, nullable=False) +# due_date = db.Column(db.DateTime, nullable=True) +# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) +# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) +# # loan = relationship( +# # "Loan", +# # primaryjoin="LoanCharge.loan_id == Loan.id", +# # foreign_keys=[loan_id], +# # back_populates="loan_charges", +# # ) +# +# @classmethod +# def create_charges_for_loan(cls, loan_id, transaction_id, charges, referenced_amount = 0.0): +# """ +# Create loan charges for a given loan. +# +# Args: +# loan_id (int): ID of the loan to associate charges with. +# charges (list): A list of dictionaries with keys: +# code (str), amount (float), percent (float), description (str), due (int) +# """ +# # if not charges or not isinstance(charges, list): +# # raise ValueError("Charges must be a non-empty list of dictionaries") +# +# if loan_id is None: +# raise ValueError("loan_id cannot be None") +# +# loan_charges = [] +# now = datetime.now(timezone.utc) +# +# +# subset_keys = ['interest', 'management', 'insurance', 'vat'] +# for item in subset_keys: +# charge = charges[item] +# due_days = charge['due_days'] # getattr(charge, "due_days", 0) +# amount = charge['fee'] # getattr(charge, "fee", 0.0) +# percent = charge['rate'] # getattr(charge, "rate", 0.0) +# code = charge['code'] # getattr(charge, "code","") +# description = charge['description'] # getattr(charge, "description", "") +# +# charge_obj = cls( +# loan_id = loan_id, +# transaction_id = transaction_id, +# code = code, +# amount = round(amount, 2), +# percent = percent, +# description = description, +# due = due_days, +# due_date = now + timedelta(days=due_days), +# created_at=datetime.now(timezone.utc), +# updated_at=datetime.now(timezone.utc) +# ) +# +# db.session.add(charge_obj) +# loan_charges.append(charge_obj) +# +# return loan_charges +# +# +# +# def to_dict(self): +# return { +# 'id': self.id, +# 'loanId': self.loan_id, +# 'transactionId': self.transaction_id, +# 'code': self.code, +# 'amount': self.amount, +# 'percent': self.percent, +# 'description': self.description, +# 'due': self.due, +# } +# +# def __repr__(self): +# return f"" diff --git a/app/models/loan_repayment_schedule.py b/app/models/loan_repayment_schedule.py index 73dc88f..e58aaa3 100644 --- a/app/models/loan_repayment_schedule.py +++ b/app/models/loan_repayment_schedule.py @@ -1,72 +1,72 @@ -from datetime import datetime, timezone -from app.extensions import db -from sqlalchemy.orm import relationship -from dateutil.relativedelta import relativedelta -from sqlalchemy.sql import func - -class LoanRepaymentSchedule(db.Model): - __tablename__ = 'loan_repayment_schedules' - - id = db.Column(db.Integer, primary_key=True, autoincrement=True) - loan_id = db.Column(db.Integer, nullable=False) - transaction_id = db.Column(db.String(50), nullable=True) - product_id = db.Column(db.String(20), nullable=True) - installment_number = db.Column(db.Integer, nullable=False) - due_date = db.Column(db.DateTime, nullable=False) - installment_amount= db.Column(db.Float, default=0.0) - total_repayment_amount = db.Column(db.Float, default=0.0) - paid = db.Column(db.Boolean, default=False) - paid_at = db.Column(db.DateTime, nullable=True) - - created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) - updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) - # loan = relationship( - # "Loan", - # primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id", - # foreign_keys=[loan_id], - # back_populates="loan_repayment_schedules", - # ) - - - @classmethod - def add_repayment_schedule(cls, loan, num_schedules, transaction_id): - """ - Add repayment schedules for a given loan. - """ - now = datetime.now(timezone.utc) - schedules = [] - - for i in range(num_schedules): - due_date = now + relativedelta(months=i + 1) - schedule = LoanRepaymentSchedule( - loan_id=loan.id, - installment_number=i + 1, - due_date=due_date, - total_repayment_amount = round(loan.repayment_amount, 2), - installment_amount=round(loan.installment_amount, 2), - product_id = loan.product_id, - transaction_id = transaction_id, - created_at=datetime.now(timezone.utc), - updated_at=datetime.now(timezone.utc) - ) - - db.session.add(schedule) - schedules.append(schedule) - - return schedules - - def to_dict(self): - return { - 'id': self.id, - 'loanId': self.loan_id, - 'installmentNumber': self.installment_number, - 'dueDate': self.due_date.isoformat(), - 'principalAmount': self.principal_amount, - 'interestAmount': self.interest_amount, - 'totalInstallment': self.total_installment, - 'paid': self.paid, - 'paidAt': self.paid_at.isoformat() if self.paid_at else None - } - - def __repr__(self): - return f'' +# from datetime import datetime, timezone +# from app.extensions import db +# from sqlalchemy.orm import relationship +# from dateutil.relativedelta import relativedelta +# from sqlalchemy.sql import func +# +# class LoanRepaymentSchedule(db.Model): +# __tablename__ = 'loan_repayment_schedules' +# +# id = db.Column(db.Integer, primary_key=True, autoincrement=True) +# loan_id = db.Column(db.Integer, nullable=False) +# transaction_id = db.Column(db.String(50), nullable=True) +# product_id = db.Column(db.String(20), nullable=True) +# installment_number = db.Column(db.Integer, nullable=False) +# due_date = db.Column(db.DateTime, nullable=False) +# installment_amount= db.Column(db.Float, default=0.0) +# total_repayment_amount = db.Column(db.Float, default=0.0) +# paid = db.Column(db.Boolean, default=False) +# paid_at = db.Column(db.DateTime, nullable=True) +# +# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) +# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) +# # loan = relationship( +# # "Loan", +# # primaryjoin="LoanRepaymentSchedule.loan_id == Loan.id", +# # foreign_keys=[loan_id], +# # back_populates="loan_repayment_schedules", +# # ) +# +# +# @classmethod +# def add_repayment_schedule(cls, loan, num_schedules, transaction_id): +# """ +# Add repayment schedules for a given loan. +# """ +# now = datetime.now(timezone.utc) +# schedules = [] +# +# for i in range(num_schedules): +# due_date = now + relativedelta(months=i + 1) +# schedule = LoanRepaymentSchedule( +# loan_id=loan.id, +# installment_number=i + 1, +# due_date=due_date, +# total_repayment_amount = round(loan.repayment_amount, 2), +# installment_amount=round(loan.installment_amount, 2), +# product_id = loan.product_id, +# transaction_id = transaction_id, +# created_at=datetime.now(timezone.utc), +# updated_at=datetime.now(timezone.utc) +# ) +# +# db.session.add(schedule) +# schedules.append(schedule) +# +# return schedules +# +# def to_dict(self): +# return { +# 'id': self.id, +# 'loanId': self.loan_id, +# 'installmentNumber': self.installment_number, +# 'dueDate': self.due_date.isoformat(), +# 'principalAmount': self.principal_amount, +# 'interestAmount': self.interest_amount, +# 'totalInstallment': self.total_installment, +# 'paid': self.paid, +# 'paidAt': self.paid_at.isoformat() if self.paid_at else None +# } +# +# def __repr__(self): +# return f'' diff --git a/app/models/offer.py b/app/models/offer.py index 506345e..7ec9a1e 100644 --- a/app/models/offer.py +++ b/app/models/offer.py @@ -1,94 +1,94 @@ -from datetime import datetime, timezone -from app.extensions import db -from app.models.charge import Charge -from sqlalchemy.orm import relationship -from sqlalchemy.sql import func - -class Offer(db.Model): - __tablename__ = 'offers' - - id = db.Column(db.String, primary_key=True) - product_id = db.Column(db.String, nullable=False) - min_amount = db.Column(db.Float, nullable=False) - max_amount = db.Column(db.Float, nullable=False) - tenor = db.Column(db.Integer, nullable=False) - schedule = db.Column(db.Integer, nullable=True) - interest_rate = db.Column(db.Float, default=3.0) - management_rate = db.Column(db.Float, default=1.0) - insurance_rate = db.Column(db.Float, default=1.0) - vat_rate = db.Column(db.Float, default=7.5) - list_order = db.Column(db.Integer, nullable=True) - max_daily_loans = db.Column(db.Integer, nullable=True) - max_active_loans = db.Column(db.Integer, nullable=True) - max_life_loans = db.Column(db.Integer, nullable=True) - created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) - updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) - - # charges = relationship( - # "Charge", - # primaryjoin="Offer.id == Charge.offer_id", - # foreign_keys="Charge.offer_id", - # back_populates="offer", - # ) - - @classmethod - def get_all_offers(cls): - """ - Return all offers in dictionary format. - """ - offers = cls.query.all() - - if not offers: - raise ValueError(f"No available offers") - return offers - - @classmethod - def is_valid_offer(cls, offer_id): - offer = cls.query.filter_by(id=str(offer_id)).first() - - - if not offer: - return False - return offer - - @classmethod - def get_offer_by_id(cls, offer_id): - """ - Return an offer by its ID. - """ - offer = cls.query.filter_by(id=str(offer_id)).first() - - if not offer: - raise ValueError(f"Offer with ID {offer_id} not found") - return offer - - @classmethod - def get_offer_by_product_id(cls, product_id): - """ - Return an offer by its product ID. - """ - offer = cls.query.filter_by(product_id=str(product_id)).first() - - if not offer: - raise ValueError(f"Offer with Product ID {product_id} not found") - return offer - - def to_dict(self): - return { - "offerId": self.id, - "productId": self.product_id, - "minAmount": self.min_amount, - "maxAmount": self.max_amount, - "tenor": self.tenor, - "interest_rate": self.interest_rate, - "management_rate": self.management_rate, - "insurance_rate": self.insurance_rate, - "vat_rate": self.vat_rate, - "maxDailyLoans": self.max_daily_loans, - "maxActiveLoans": self.max_active_loans, - "maxLifeLoans": self.max_life_loans - - } - - def __repr__(self): - return f'' \ No newline at end of file +# from datetime import datetime, timezone +# from app.extensions import db +# from app.models.charge import Charge +# from sqlalchemy.orm import relationship +# from sqlalchemy.sql import func +# +# class Offer(db.Model): +# __tablename__ = 'offers' +# +# id = db.Column(db.String, primary_key=True) +# product_id = db.Column(db.String, nullable=False) +# min_amount = db.Column(db.Float, nullable=False) +# max_amount = db.Column(db.Float, nullable=False) +# tenor = db.Column(db.Integer, nullable=False) +# schedule = db.Column(db.Integer, nullable=True) +# interest_rate = db.Column(db.Float, default=3.0) +# management_rate = db.Column(db.Float, default=1.0) +# insurance_rate = db.Column(db.Float, default=1.0) +# vat_rate = db.Column(db.Float, default=7.5) +# list_order = db.Column(db.Integer, nullable=True) +# max_daily_loans = db.Column(db.Integer, nullable=True) +# max_active_loans = db.Column(db.Integer, nullable=True) +# max_life_loans = db.Column(db.Integer, nullable=True) +# created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) +# updated_at = db.Column(db.DateTime(timezone=True), server_default=func.now(), onupdate=func.now()) +# +# # charges = relationship( +# # "Charge", +# # primaryjoin="Offer.id == Charge.offer_id", +# # foreign_keys="Charge.offer_id", +# # back_populates="offer", +# # ) +# +# @classmethod +# def get_all_offers(cls): +# """ +# Return all offers in dictionary format. +# """ +# offers = cls.query.all() +# +# if not offers: +# raise ValueError(f"No available offers") +# return offers +# +# @classmethod +# def is_valid_offer(cls, offer_id): +# offer = cls.query.filter_by(id=str(offer_id)).first() +# +# +# if not offer: +# return False +# return offer +# +# @classmethod +# def get_offer_by_id(cls, offer_id): +# """ +# Return an offer by its ID. +# """ +# offer = cls.query.filter_by(id=str(offer_id)).first() +# +# if not offer: +# raise ValueError(f"Offer with ID {offer_id} not found") +# return offer +# +# @classmethod +# def get_offer_by_product_id(cls, product_id): +# """ +# Return an offer by its product ID. +# """ +# offer = cls.query.filter_by(product_id=str(product_id)).first() +# +# if not offer: +# raise ValueError(f"Offer with Product ID {product_id} not found") +# return offer +# +# def to_dict(self): +# return { +# "offerId": self.id, +# "productId": self.product_id, +# "minAmount": self.min_amount, +# "maxAmount": self.max_amount, +# "tenor": self.tenor, +# "interest_rate": self.interest_rate, +# "management_rate": self.management_rate, +# "insurance_rate": self.insurance_rate, +# "vat_rate": self.vat_rate, +# "maxDailyLoans": self.max_daily_loans, +# "maxActiveLoans": self.max_active_loans, +# "maxLifeLoans": self.max_life_loans +# +# } +# +# def __repr__(self): +# return f'' \ No newline at end of file