137 lines
6.1 KiB
Python
137 lines
6.1 KiB
Python
"""
|
|
Database operations module for salary analytics.
|
|
"""
|
|
|
|
import logging
|
|
from sqlalchemy import text
|
|
from .config import BATCH_RESULTS_TABLE
|
|
from datetime import datetime
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class DatabaseOperations:
    """Persist and query per-batch salary-analytics results.

    Every statement runs through the SQLAlchemy engine given to the
    constructor and targets the table named by ``BATCH_RESULTS_TABLE``.
    Public methods catch database errors, log them, and signal failure via
    their return value instead of raising.

    NOTE(review): the DDL uses ``SERIAL`` and ``information_schema``, so a
    PostgreSQL-compatible backend is presumably expected - confirm.
    """

    # Aggregate column list shared by the batch summary queries.
    _SUMMARY_COLUMNS = """
                        batch_number,
                        total_batches,
                        processed_at,
                        COUNT(*) as total_records,
                        SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
                        SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
    """

    def __init__(self, engine):
        """Initialize with SQLAlchemy engine."""
        self.engine = engine

    def create_batch_results_table(self):
        """Create the batch results table if it doesn't exist.

        Returns:
            True when the table already exists or was created, False when
            any step raised (the error is logged).
        """
        try:
            with self.engine.connect() as conn:
                # The table name is passed as a bound parameter here; it can
                # only be interpolated in the DDL below because identifiers
                # cannot be bound.
                check_table = text(
                    "SELECT EXISTS (SELECT FROM information_schema.tables "
                    "WHERE table_name = :table_name)"
                )
                table_exists = conn.execute(
                    check_table, {"table_name": BATCH_RESULTS_TABLE}
                ).scalar()

                if not table_exists:
                    create_table = text(f"""
                        CREATE TABLE {BATCH_RESULTS_TABLE} (
                            id SERIAL PRIMARY KEY,
                            batch_number INTEGER,
                            total_batches INTEGER,
                            processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                            accountid TEXT,
                            num_months INTEGER,
                            least_inflow_6m DECIMAL,
                            avg_monthly_salary DECIMAL,
                            estimated_next_amount DECIMAL,
                            estimated_next_date DATE,
                            is_45day_salary BOOLEAN,
                            is_2months_salary BOOLEAN,
                            status TEXT
                        )
                    """)
                    conn.execute(create_table)
                    conn.commit()
                    logger.info(f"Created table {BATCH_RESULTS_TABLE}")
                return True
        except Exception as e:
            # Boundary handler: callers only see a boolean, so keep the
            # traceback in the log.
            logger.exception(f"Error creating batch results table: {str(e)}")
            return False

    @staticmethod
    def _prepare_records(results_df, batch_number, total_batches, status):
        """Build one bind-parameter dict per row of ``results_df``.

        The batch metadata is added to a copy (``DataFrame.assign``), so the
        caller's DataFrame is left untouched.

        Args:
            results_df: DataFrame holding one result row per account.
            batch_number: Number of the batch being saved.
            total_batches: Total number of batches in the run.
            status: Status string stored on every row.

        Returns:
            A list of dicts containing every DataFrame column plus
            ``batch_number``, ``total_batches``, ``processed_at``,
            ``is_45day_salary``, ``is_2months_salary`` and ``status``.
        """
        stamped = results_df.assign(
            batch_number=batch_number,
            total_batches=total_batches,
            # One timestamp for the whole batch so its rows group together.
            processed_at=datetime.now(),
        )
        records = stamped.to_dict('records')
        for record in records:
            # The DataFrame uses '45daysalary' / '2monthssalary'; the table
            # columns are is_45day_salary / is_2months_salary.
            record['is_45day_salary'] = record.get('45daysalary', False)
            record['is_2months_salary'] = record.get('2monthssalary', False)
            record['status'] = status
        return records

    def save_batch_to_db(self, batch_number, total_batches, results_df, status="success"):
        """Save batch processing results to database.

        All rows are inserted in one transaction; if any insert fails the
        connection is closed without committing.

        Args:
            batch_number: Number of the batch being saved.
            total_batches: Total number of batches in the run.
            results_df: DataFrame with one result row per account. It is not
                modified.
            status: Status string stored on every row (default "success").

        Returns:
            True when the batch was committed, False when any step raised
            (the error is logged).
        """
        try:
            with self.engine.connect() as conn:
                records = self._prepare_records(
                    results_df, batch_number, total_batches, status
                )

                # Built once: the statement does not depend on the row.
                insert_query = text(f"""
                    INSERT INTO {BATCH_RESULTS_TABLE}
                    (batch_number, total_batches, processed_at, accountid, num_months,
                     least_inflow_6m, avg_monthly_salary, estimated_next_amount,
                     estimated_next_date, is_45day_salary, is_2months_salary, status)
                    VALUES
                    (:batch_number, :total_batches, :processed_at, :accountid, :num_months,
                     :least_inflow_6m, :avg_monthly_salary, :estimated_next_amount,
                     :estimated_next_date, :is_45day_salary, :is_2months_salary, :status)
                """)

                # A list of parameter dicts makes this a single executemany
                # call. An empty list must be skipped: it would otherwise run
                # the statement once with no parameters.
                if records:
                    conn.execute(insert_query, records)

                conn.commit()
                logger.info(f"Successfully saved batch {batch_number} results to database")
                return True
        except Exception as e:
            logger.exception(f"Error saving batch {batch_number} to database: {str(e)}")
            return False

    def get_batch_status(self, batch_number):
        """Get the status of a specific batch.

        Args:
            batch_number: Number of the batch to summarise.

        Returns:
            A dict with ``batch_number``, ``total_batches``,
            ``processed_at``, ``total_records``, ``successful_records`` and
            ``failed_records`` for the most recent ``processed_at`` of that
            batch, or None when no rows match or the query failed (the error
            is logged).
        """
        try:
            with self.engine.connect() as conn:
                query = text(f"""
                    SELECT {self._SUMMARY_COLUMNS}
                    FROM {BATCH_RESULTS_TABLE}
                    WHERE batch_number = :batch_number
                    GROUP BY batch_number, total_batches, processed_at
                    ORDER BY processed_at DESC
                    LIMIT 1
                """)
                # .mappings() yields dict-like rows; a plain Row is
                # tuple-like and cannot be passed to dict() directly.
                row = conn.execute(
                    query, {"batch_number": batch_number}
                ).mappings().fetchone()
                return dict(row) if row is not None else None
        except Exception as e:
            logger.exception(f"Error getting batch {batch_number} status: {str(e)}")
            return None

    def get_all_batches(self):
        """Get all batch processing results.

        Returns:
            A list of summary dicts (same keys as ``get_batch_status``), one
            per distinct (batch_number, total_batches, processed_at) group,
            ordered by batch number. Returns an empty list when the table is
            empty or the query failed (the error is logged).
        """
        try:
            with self.engine.connect() as conn:
                query = text(f"""
                    SELECT {self._SUMMARY_COLUMNS}
                    FROM {BATCH_RESULTS_TABLE}
                    GROUP BY batch_number, total_batches, processed_at
                    ORDER BY batch_number
                """)
                rows = conn.execute(query).mappings().fetchall()
                return [dict(row) for row in rows]
        except Exception as e:
            logger.exception(f"Error getting all batches: {str(e)}")
            return []