from datetime import datetime, timezone from app.extensions import db from sqlalchemy.orm import relationship from sqlalchemy.sql import func from sqlalchemy import and_, or_, not_ import logging logger = logging.getLogger(__name__) class TransactionOffer(db.Model): __tablename__ = 'transaction_offers' id = db.Column(db.Integer, primary_key=True, autoincrement=True) customer_id = db.Column(db.String(50), nullable=False) transaction_id = db.Column(db.String(50), nullable=False) original_transaction = db.Column(db.String(50), nullable=True) offer_id = db.Column(db.String(20), nullable=False) product_id = db.Column(db.String(20), nullable=True) min_amount = db.Column(db.Float, nullable=False) max_amount = db.Column(db.Float, nullable=False) eligible_amount = db.Column(db.Float, nullable=True) tenor = db.Column(db.Integer, nullable=True) # tenor in months, typically created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc)) updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc)) # created_at = db.Column(db.DateTime(timezone=True), server_default=func.now()) # updated_at = db.Column(db.DateTime(timezone=True), onupdate=func.now()) def __repr__(self): return f'' # customer = relationship( # "Customer", # primaryjoin="Customer.id == TransactionOffer.customer_id", # foreign_keys=[customer_id], # back_populates="transaction_offers", # ) @classmethod def is_valid_transaction_offer(cls, transaction_offer, customer_id, product_id): transaction_offer = cls.query.filter_by( id=transaction_offer, customer_id=customer_id, # product_id = product_id # transaction_id = transaction_id, ).first() if not transaction_offer: return False return transaction_offer @classmethod def get_all_transaction_offers(cls, customer_id=None, transaction_id=None, offer_id=None, product_id=None, original_transaction=None, start_date=None, end_date=None, page=1, limit=20): """ Get all transaction offers with optional filtering Args: customer_id (str, optional): Filter by customer ID transaction_id (str, optional): Filter by transaction ID offer_id (str, optional): Filter by offer ID product_id (str, optional): Filter by product ID original_transaction (str, optional): Filter by original transaction start_date (datetime, optional): Filter by start date end_date (datetime, optional): Filter by end date page (int, optional): Page number for pagination limit (int, optional): Number of items per page Returns: tuple: (List of TransactionOffer objects, total count) """ query = cls.query # Apply filters if provided if customer_id: query = query.filter(cls.customer_id == customer_id) if transaction_id: query = query.filter(cls.transaction_id == transaction_id) if offer_id: query = query.filter(cls.offer_id == offer_id) if product_id: query = query.filter(cls.product_id == product_id) if original_transaction: query = query.filter(cls.original_transaction == original_transaction) if start_date: query = query.filter(cls.created_at >= start_date) if end_date: query = query.filter(cls.created_at <= end_date) # Order by created_at descending (newest first) query = query.order_by(cls.created_at.desc()) # Get total count before pagination total_count = query.count() # Apply pagination offset = (page - 1) * limit query = query.limit(limit).offset(offset) return query.all(), total_count # def to_dict(self): # return { # 'id': self.id, # 'customerId': self.customer_id, # 'transactionId': self.transaction_id, # 'offerId': self.offer_id, # 'productId': self.product_id, # 'minAmount': self.min_amount, # 'maxAmount': self.max_amount, # 'eligibleAmount': self.eligible_amount, # 'tenor': self.tenor, # 'createdAt': self.created_at.isoformat() if self.created_at else None, # 'updatedAt': self.updated_at.isoformat() if self.updated_at else None, # }