from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean, func from sqlalchemy.orm import declarative_base, Session from datetime import datetime from app.utils.logger import logger from app.extensions import db class BatchResult(db.Model): __tablename__ = "salary_analytics_batch_results" id = Column(Integer, primary_key=True, autoincrement=True) batch_number = Column(Integer, nullable=False) total_batches = Column(Integer, nullable=False) processed_at = Column(DateTime, default=datetime.utcnow) accountid = Column(String, nullable=False) num_months = Column(Integer) least_inflow_6m = Column(Numeric) avg_monthly_salary = Column(Numeric) estimated_next_amount = Column(Numeric) estimated_next_date = Column(DateTime) is_45day_salary = Column(Boolean, default=False) is_2months_salary = Column(Boolean, default=False) status = Column(String, default="success") @classmethod def save_batch(cls, batch_number, total_batches, results_df, status="success"): """Save batch results into DB using ORM bulk insert.""" try: results_df["batch_number"] = batch_number results_df["total_batches"] = total_batches results_df["processed_at"] = datetime.utcnow() results_df["status"] = status # Normalize boolean columns results_df["is_45day_salary"] = results_df.get("45daysalary", False) results_df["is_2months_salary"] = results_df.get("2monthssalary", False) # Convert to list of ORM objects records = [ cls(**row) for row in results_df.to_dict("records") ] db.session.bulk_save_objects(records) db.session.commit() logger.info(f"Saved batch {batch_number} successfully.") return True except Exception as e: db.session.rollback() logger.error(f"Error saving batch {batch_number}: {str(e)}") return False @classmethod def get_batch_status(cls, batch_number: int): """Return summary info about one batch.""" try: result = ( db.session.query( cls.batch_number, cls.total_batches, cls.processed_at, func.count().label("total_records"), func.sum(func.case((cls.status == "success", 1), else_=0)).label("successful_records"), func.sum(func.case((cls.status == "error", 1), else_=0)).label("failed_records"), ) .filter(cls.batch_number == batch_number) .group_by(cls.batch_number, cls.total_batches, cls.processed_at) .order_by(cls.processed_at.desc()) .first() ) return dict(result._mapping) if result else None except Exception as e: logger.error(f"Error fetching batch {batch_number} status: {str(e)}") return None @classmethod def get_all_batches(cls): """Return summaries for all batches.""" try: results = ( db.session.query( cls.batch_number, cls.total_batches, cls.processed_at, func.count().label("total_records"), func.sum(func.case((cls.status == "success", 1), else_=0)).label("successful_records"), func.sum(func.case((cls.status == "error", 1), else_=0)).label("failed_records"), ) .group_by(cls.batch_number, cls.total_batches, cls.processed_at) .order_by(cls.batch_number) .all() ) return [dict(r._mapping) for r in results] except Exception as e: logger.error(f"Error fetching all batches: {str(e)}") return []