"""
Database operations module for salary analytics.
"""
import logging
from datetime import datetime

from sqlalchemy import text

from .config import BATCH_RESULTS_TABLE

logger = logging.getLogger(__name__)


class DatabaseOperations:
    """Create, write and summarize the batch-results table via a SQLAlchemy engine."""

    def __init__(self, engine):
        """Initialize with SQLAlchemy engine.

        Args:
            engine: SQLAlchemy ``Engine`` used to open connections.
        """
        self.engine = engine

    def create_batch_results_table(self):
        """Create the batch results table if it doesn't exist.

        Existence is checked by name against ``information_schema.tables``.

        Returns:
            True if the table exists or was created, False if a database error
            occurred (the error is logged with its traceback).
        """
        try:
            created = False
            # engine.begin() commits on normal exit and rolls back on error.
            with self.engine.begin() as conn:
                # Bind the table name rather than splicing it into the SQL text.
                check_table = text(
                    "SELECT EXISTS (SELECT FROM information_schema.tables "
                    "WHERE table_name = :table_name)"
                )
                table_exists = conn.execute(
                    check_table, {"table_name": BATCH_RESULTS_TABLE}
                ).scalar()

                if not table_exists:
                    # Identifiers cannot be bound, so the configured table name
                    # is interpolated here (it comes from .config, not user input).
                    create_table = text(f"""
                        CREATE TABLE {BATCH_RESULTS_TABLE} (
                            id SERIAL PRIMARY KEY,
                            batch_number INTEGER,
                            total_batches INTEGER,
                            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            accountid TEXT,
                            num_months INTEGER,
                            least_inflow_6m DECIMAL,
                            avg_monthly_salary DECIMAL,
                            estimated_next_amount DECIMAL,
                            estimated_next_date DATE,
                            is_45day_salary BOOLEAN,
                            is_2months_salary BOOLEAN,
                            status TEXT
                        )
                    """)
                    conn.execute(create_table)
                    created = True
            # Log only once the transaction has actually been committed.
            if created:
                logger.info(f"Created table {BATCH_RESULTS_TABLE}")
            return True
        except Exception as e:
            logger.exception(f"Error creating batch results table: {str(e)}")
            return False

    def save_batch_to_db(self, batch_number, total_batches, results_df, status="success"):
        """Save batch processing results to database.

        Every row of ``results_df`` becomes one row in the batch-results table.
        The rows are tagged with the batch metadata, a shared ``processed_at``
        timestamp and ``status``. The caller's DataFrame is not modified.

        Args:
            batch_number: Number of this batch.
            total_batches: Total number of batches in the run.
            results_df: DataFrame whose columns supply the insert parameters
                (``accountid``, ``num_months``, ``least_inflow_6m``, ...).
                The ``45daysalary`` / ``2monthssalary`` columns map to the
                ``is_45day_salary`` / ``is_2months_salary`` columns and
                default to False when absent.
            status: Status string stored on every row.

        Returns:
            True if all rows were written (or there were none), False on error.
            On error the whole batch is rolled back.
        """
        # Loop-invariant: build the statement once.
        insert_query = text(f"""
            INSERT INTO {BATCH_RESULTS_TABLE}
            (batch_number, total_batches, processed_at, accountid, num_months,
             least_inflow_6m, avg_monthly_salary, estimated_next_amount,
             estimated_next_date, is_45day_salary, is_2months_salary, status)
            VALUES
            (:batch_number, :total_batches, :processed_at, :accountid, :num_months,
             :least_inflow_6m, :avg_monthly_salary, :estimated_next_amount,
             :estimated_next_date, :is_45day_salary, :is_2months_salary, :status)
        """)
        try:
            # One timestamp for the whole batch, so the batch summaries
            # (grouped by processed_at) see it as a single group.
            processed_at = datetime.now()

            records = []
            # to_dict builds fresh dicts, so tagging them leaves results_df untouched.
            for record in results_df.to_dict('records'):
                record['batch_number'] = batch_number
                record['total_batches'] = total_batches
                record['processed_at'] = processed_at
                # Map the source flag columns onto the table's boolean columns.
                record['is_45day_salary'] = record.get('45daysalary', False)
                record['is_2months_salary'] = record.get('2monthssalary', False)
                record['status'] = status
                records.append(record)

            # Executing with an empty parameter list would leave the
            # placeholders unbound, so an empty batch is simply a no-op.
            if records:
                with self.engine.begin() as conn:
                    # A list of parameter dicts runs as one executemany.
                    conn.execute(insert_query, records)

            logger.info(f"Successfully saved batch {batch_number} results to database")
            return True
        except Exception as e:
            logger.exception(f"Error saving batch {batch_number} to database: {str(e)}")
            return False

    def get_batch_status(self, batch_number):
        """Get the status of a specific batch.

        Args:
            batch_number: Batch number to look up.

        Returns:
            A dict with keys ``batch_number``, ``total_batches``,
            ``processed_at``, ``total_records``, ``successful_records`` and
            ``failed_records``. The most recent ``processed_at`` group wins.
            Returns None if the batch is unknown or a database error occurred.
        """
        try:
            with self.engine.connect() as conn:
                query = text(f"""
                    SELECT
                        batch_number,
                        total_batches,
                        processed_at,
                        COUNT(*) as total_records,
                        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
                        SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
                    FROM {BATCH_RESULTS_TABLE}
                    WHERE batch_number = :batch_number
                    GROUP BY batch_number, total_batches, processed_at
                    ORDER BY processed_at DESC
                    LIMIT 1
                """)
                # Row objects are tuple-like in SQLAlchemy 1.4+/2.0, so
                # dict(row) fails; .mappings() yields dict-convertible rows.
                row = conn.execute(query, {"batch_number": batch_number}).mappings().first()
                return dict(row) if row is not None else None
        except Exception as e:
            logger.exception(f"Error getting batch {batch_number} status: {str(e)}")
            return None

    def get_all_batches(self):
        """Get all batch processing results.

        Returns:
            A list of per-(batch, processed_at) summary dicts ordered by
            ``batch_number``. Each dict has the same keys as
            ``get_batch_status``. Returns an empty list on a database error.
        """
        try:
            with self.engine.connect() as conn:
                query = text(f"""
                    SELECT
                        batch_number,
                        total_batches,
                        processed_at,
                        COUNT(*) as total_records,
                        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
                        SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
                    FROM {BATCH_RESULTS_TABLE}
                    GROUP BY batch_number, total_batches, processed_at
                    ORDER BY batch_number
                """)
                # See get_batch_status: use mapping rows so dict() works.
                return [dict(row) for row in conn.execute(query).mappings()]
        except Exception as e:
            logger.exception(f"Error getting all batches: {str(e)}")
            return []