98 lines
3.9 KiB
Python
98 lines
3.9 KiB
Python
from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean, func
|
|
from sqlalchemy.orm import declarative_base, Session
|
|
from datetime import datetime
|
|
from app.utils.logger import logger
|
|
from app.extensions import db
|
|
|
|
|
|
class BatchResult(db.Model):
|
|
__tablename__ = "salary_analytics_batch_results"
|
|
|
|
id = Column(Integer, primary_key=True, autoincrement=True)
|
|
batch_number = Column(Integer, nullable=False)
|
|
total_batches = Column(Integer, nullable=False)
|
|
processed_at = Column(DateTime, default=datetime.utcnow)
|
|
accountid = Column(String, nullable=False)
|
|
num_months = Column(Integer)
|
|
least_inflow_6m = Column(Numeric)
|
|
avg_monthly_salary = Column(Numeric)
|
|
estimated_next_amount = Column(Numeric)
|
|
estimated_next_date = Column(DateTime)
|
|
is_45day_salary = Column(Boolean, default=False)
|
|
is_2months_salary = Column(Boolean, default=False)
|
|
status = Column(String, default="success")
|
|
|
|
|
|
@classmethod
|
|
def save_batch(cls, batch_number, total_batches, results_df, status="success"):
|
|
"""Save batch results into DB using ORM bulk insert."""
|
|
try:
|
|
results_df["batch_number"] = batch_number
|
|
results_df["total_batches"] = total_batches
|
|
results_df["processed_at"] = datetime.utcnow()
|
|
results_df["status"] = status
|
|
|
|
# Normalize boolean columns
|
|
results_df["is_45day_salary"] = results_df.get("45daysalary", False)
|
|
results_df["is_2months_salary"] = results_df.get("2monthssalary", False)
|
|
|
|
# Convert to list of ORM objects
|
|
records = [
|
|
cls(**row)
|
|
for row in results_df.to_dict("records")
|
|
]
|
|
|
|
db.session.bulk_save_objects(records)
|
|
db.session.commit()
|
|
logger.info(f"Saved batch {batch_number} successfully.")
|
|
return True
|
|
except Exception as e:
|
|
db.session.rollback()
|
|
logger.error(f"Error saving batch {batch_number}: {str(e)}")
|
|
return False
|
|
|
|
@classmethod
|
|
def get_batch_status(cls, batch_number: int):
|
|
"""Return summary info about one batch."""
|
|
try:
|
|
result = (
|
|
db.session.query(
|
|
cls.batch_number,
|
|
cls.total_batches,
|
|
cls.processed_at,
|
|
func.count().label("total_records"),
|
|
func.sum(func.case((cls.status == "success", 1), else_=0)).label("successful_records"),
|
|
func.sum(func.case((cls.status == "error", 1), else_=0)).label("failed_records"),
|
|
)
|
|
.filter(cls.batch_number == batch_number)
|
|
.group_by(cls.batch_number, cls.total_batches, cls.processed_at)
|
|
.order_by(cls.processed_at.desc())
|
|
.first()
|
|
)
|
|
return dict(result._mapping) if result else None
|
|
except Exception as e:
|
|
logger.error(f"Error fetching batch {batch_number} status: {str(e)}")
|
|
return None
|
|
|
|
@classmethod
|
|
def get_all_batches(cls):
|
|
"""Return summaries for all batches."""
|
|
try:
|
|
results = (
|
|
db.session.query(
|
|
cls.batch_number,
|
|
cls.total_batches,
|
|
cls.processed_at,
|
|
func.count().label("total_records"),
|
|
func.sum(func.case((cls.status == "success", 1), else_=0)).label("successful_records"),
|
|
func.sum(func.case((cls.status == "error", 1), else_=0)).label("failed_records"),
|
|
)
|
|
.group_by(cls.batch_number, cls.total_batches, cls.processed_at)
|
|
.order_by(cls.batch_number)
|
|
.all()
|
|
)
|
|
return [dict(r._mapping) for r in results]
|
|
except Exception as e:
|
|
logger.error(f"Error fetching all batches: {str(e)}")
|
|
return []
|