Enhance salary analytics API with database operations and performance logging

- Introduced `DatabaseOperations` class for managing batch results in the database.
- Added functionality to create a batch results table and save batch processing results.
- Updated API endpoints to log execution time and handle batch processing errors more effectively.
- Improved response handling in analysis endpoints and added batch metadata to results.
- Suppressed warnings and improved logging throughout the application.
This commit is contained in:
2025-05-10 16:56:23 +01:00
parent 305e5da4ec
commit 1a4e539626
4 changed files with 298 additions and 23 deletions
+149 -20
View File
@@ -13,12 +13,16 @@ import logging
import pandas as pd import pandas as pd
import tempfile import tempfile
from datetime import datetime from datetime import datetime
from sqlalchemy import text from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData
import numpy as np
import warnings
import time
from .main import SalaryAnalyticsPipeline from .main import SalaryAnalyticsPipeline
from .config import OUTPUT_PATHS, TABLE_NAME from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE
from .data_loader import DataLoader from .data_loader import DataLoader
from .salary_predictor import SalaryPredictor from .salary_predictor import SalaryPredictor
from .salary_earner_analyzer import SalaryEarnerAnalyzer from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .db_operations import DatabaseOperations
# Configure logging # Configure logging
logging.basicConfig( logging.basicConfig(
@@ -27,6 +31,10 @@ logging.basicConfig(
) )
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
# Suppress warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
pd.options.mode.chained_assignment = None
app = FastAPI( app = FastAPI(
title="Salary Analytics API", title="Salary Analytics API",
description="API for analyzing and predicting salary patterns from transaction data", description="API for analyzing and predicting salary patterns from transaction data",
@@ -96,72 +104,91 @@ async def startup_event():
@app.get("/") @app.get("/")
async def root(): async def root():
"""Root endpoint.""" """Root endpoint."""
start_time = time.time()
logger.info("Root endpoint accessed") logger.info("Root endpoint accessed")
return {"message": "Welcome to Salary Analytics API"} response = {"message": "Welcome to Salary Analytics API"}
logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
return response
@app.get("/health") @app.get("/health")
async def health_check(): async def health_check():
"""Health check endpoint.""" """Health check endpoint."""
start_time = time.time()
logger.info("Health check endpoint accessed") logger.info("Health check endpoint accessed")
return {"status": "healthy"} response = {"status": "healthy"}
logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
return response
@app.post("/analyze/keyword", response_model=AnalysisResponse) @app.post("/analyze/keyword", response_model=AnalysisResponse)
async def analyze_keyword(): async def analyze_keyword():
"""Run keyword-based salary transaction analysis.""" """Run keyword-based salary transaction analysis."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info("Starting keyword analysis...") logger.info("Starting keyword analysis...")
data = pipeline.run_keyword_analysis() data = pipeline.run_keyword_analysis()
logger.info(f"Keyword analysis completed. Found {len(data)} matches") logger.info(f"Keyword analysis completed. Found {len(data)} matches")
return AnalysisResponse( response = AnalysisResponse(
message="Keyword analysis completed successfully", message="Keyword analysis completed successfully",
data={"count": len(data)} data={"count": len(data)}
) )
logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error in keyword analysis: {str(e)}") logger.error(f"Error in keyword analysis: {str(e)}")
logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze/consistent-amount", response_model=AnalysisResponse) @app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount(): async def analyze_consistent_amount():
"""Run consistent amount transaction analysis.""" """Run consistent amount transaction analysis."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info("Starting consistent amount analysis...") logger.info("Starting consistent amount analysis...")
data = pipeline.run_consistent_amount_analysis() data = pipeline.run_consistent_amount_analysis()
logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
return AnalysisResponse( response = AnalysisResponse(
message="Consistent amount analysis completed successfully", message="Consistent amount analysis completed successfully",
data={"count": len(data)} data={"count": len(data)}
) )
logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error in consistent amount analysis: {str(e)}") logger.error(f"Error in consistent amount analysis: {str(e)}")
logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze/transaction-type", response_model=AnalysisResponse) @app.post("/analyze/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type(): async def analyze_transaction_type():
"""Run transaction type analysis.""" """Run transaction type analysis."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info("Starting transaction type analysis...") logger.info("Starting transaction type analysis...")
data = pipeline.run_transaction_type_analysis() data = pipeline.run_transaction_type_analysis()
logger.info(f"Transaction type analysis completed. Found {len(data)} matches") logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
return AnalysisResponse( response = AnalysisResponse(
message="Transaction type analysis completed successfully", message="Transaction type analysis completed successfully",
data={"count": len(data)} data={"count": len(data)}
) )
logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error in transaction type analysis: {str(e)}") logger.error(f"Error in transaction type analysis: {str(e)}")
logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/generate/reports", response_model=AnalysisResponse) @app.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks): async def generate_reports(background_tasks: BackgroundTasks):
"""Generate salary earner reports.""" """Generate salary earner reports."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info("Starting report generation...") logger.info("Starting report generation...")
reports = pipeline.generate_salary_earner_reports() reports = pipeline.generate_salary_earner_reports()
logger.info("Reports generated successfully") logger.info("Reports generated successfully")
return AnalysisResponse( response = AnalysisResponse(
message="Reports generated successfully", message="Reports generated successfully",
data={ data={
"verified_salary_earners": len(reports['final_table']), "verified_salary_earners": len(reports['final_table']),
@@ -169,28 +196,36 @@ async def generate_reports(background_tasks: BackgroundTasks):
"high_earners": reports['total_high_earners'] "high_earners": reports['total_high_earners']
} }
) )
logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error in report generation: {str(e)}") logger.error(f"Error in report generation: {str(e)}")
logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/train/models", response_model=AnalysisResponse) @app.post("/train/models", response_model=AnalysisResponse)
async def train_models(): async def train_models():
"""Train salary prediction models.""" """Train salary prediction models."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info("Starting model training...") logger.info("Starting model training...")
pipeline.train_salary_prediction_models() pipeline.train_salary_prediction_models()
logger.info("Models trained successfully") logger.info("Models trained successfully")
return AnalysisResponse( response = AnalysisResponse(
message="Models trained successfully" message="Models trained successfully"
) )
logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error in model training: {str(e)}") logger.error(f"Error in model training: {str(e)}")
logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.get("/download/{report_type}") @app.get("/download/{report_type}")
async def download_report(report_type: str): async def download_report(report_type: str):
"""Download generated reports.""" """Download generated reports."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info(f"Attempting to download report: {report_type}") logger.info(f"Attempting to download report: {report_type}")
@@ -205,40 +240,50 @@ async def download_report(report_type: str):
if report_type not in file_paths: if report_type not in file_paths:
logger.error(f"Report type not found: {report_type}") logger.error(f"Report type not found: {report_type}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report type not found") raise HTTPException(status_code=404, detail="Report type not found")
file_path = file_paths[report_type] file_path = file_paths[report_type]
if not os.path.exists(file_path): if not os.path.exists(file_path):
logger.error(f"Report file not found: {file_path}") logger.error(f"Report file not found: {file_path}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report file not found") raise HTTPException(status_code=404, detail="Report file not found")
logger.info(f"Successfully found report file: {file_path}") logger.info(f"Successfully found report file: {file_path}")
return FileResponse( response = FileResponse(
path=file_path, path=file_path,
filename=os.path.basename(file_path), filename=os.path.basename(file_path),
media_type="application/octet-stream" media_type="application/octet-stream"
) )
logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error downloading report: {str(e)}") logger.error(f"Error downloading report: {str(e)}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/run/pipeline", response_model=AnalysisResponse) @app.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline(): async def run_full_pipeline():
"""Run the complete salary analytics pipeline.""" """Run the complete salary analytics pipeline."""
start_time = time.time()
try: try:
check_data_loaded() check_data_loaded()
logger.info("Starting full pipeline...") logger.info("Starting full pipeline...")
success = pipeline.run_full_pipeline() success = pipeline.run_full_pipeline()
if not success: if not success:
logger.error("Pipeline failed") logger.error("Pipeline failed")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Pipeline failed") raise HTTPException(status_code=500, detail="Pipeline failed")
logger.info("Pipeline completed successfully") logger.info("Pipeline completed successfully")
return AnalysisResponse( response = AnalysisResponse(
message="Pipeline completed successfully" message="Pipeline completed successfully"
) )
logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error in pipeline: {str(e)}") logger.error(f"Error in pipeline: {str(e)}")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
@app.post("/load-data") @app.post("/load-data")
@@ -253,11 +298,16 @@ async def load_data(source: str = "db", file: Optional[UploadFile] = File(None))
Returns: Returns:
dict: Status of data loading dict: Status of data loading
""" """
start_time = time.time()
try: try:
if source not in ['db', 'csv']: if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file: if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
if source == 'csv': if source == 'csv':
@@ -276,16 +326,21 @@ async def load_data(source: str = "db", file: Optional[UploadFile] = File(None))
success = pipeline.load_data(source='db') success = pipeline.load_data(source='db')
if not success: if not success:
logger.error("Failed to load data")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to load data") raise HTTPException(status_code=500, detail="Failed to load data")
return { response = {
"status": "success", "status": "success",
"message": f"Successfully loaded {len(pipeline.df)} rows of data", "message": f"Successfully loaded {len(pipeline.df)} rows of data",
"columns": pipeline.df.columns.tolist(), "columns": pipeline.df.columns.tolist(),
"row_count": len(pipeline.df) "row_count": len(pipeline.df)
} }
logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e: except Exception as e:
logger.error(f"Error loading data: {str(e)}") logger.error(f"Error loading data: {str(e)}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
@@ -311,11 +366,16 @@ async def run_streaming_pipeline(
Returns: Returns:
List[BatchResponse]: List of responses for each batch processed List[BatchResponse]: List of responses for each batch processed
""" """
start_time = time.time()
try: try:
if source not in ['db', 'csv']: if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file: if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# Initialize data loader # Initialize data loader
@@ -327,8 +387,21 @@ async def run_streaming_pipeline(
batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
os.makedirs(batch_output_dir, exist_ok=True) os.makedirs(batch_output_dir, exist_ok=True)
# Initialize database operations
if not data_loader.connect():
logger.error("Failed to connect to database")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to connect to database")
db_ops = DatabaseOperations(data_loader.engine)
if not db_ops.create_batch_results_table():
logger.error("Failed to create batch results table")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to create batch results table")
responses = [] responses = []
batch_number = 0 batch_number = 0
batch_start_time = time.time()
def preprocess_chunk(chunk): def preprocess_chunk(chunk):
"""Preprocess a chunk of data with the same logic as DataLoader.""" """Preprocess a chunk of data with the same logic as DataLoader."""
@@ -369,6 +442,7 @@ async def run_streaming_pipeline(
pipeline.df = chunk pipeline.df = chunk
try: try:
batch_start_time = time.time()
# Run analyses # Run analyses
pipeline.run_keyword_analysis() pipeline.run_keyword_analysis()
pipeline.run_consistent_amount_analysis() pipeline.run_consistent_amount_analysis()
@@ -377,9 +451,25 @@ async def run_streaming_pipeline(
# Generate reports # Generate reports
reports = pipeline.generate_salary_earner_reports() reports = pipeline.generate_salary_earner_reports()
# Save batch results # Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = -1 # Unknown for CSV
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
reports['final_table'].to_csv(batch_results_path, index=False) results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse( responses.append(BatchResponse(
batch_number=batch_number, batch_number=batch_number,
@@ -389,13 +479,23 @@ async def run_streaming_pipeline(
message=f"Successfully processed batch {batch_number}" message=f"Successfully processed batch {batch_number}"
)) ))
except Exception as e: except Exception as e:
logger.error(f"Error processing batch {batch_number}: {str(e)}") error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse( responses.append(BatchResponse(
batch_number=batch_number, batch_number=batch_number,
total_batches=-1, total_batches=-1,
processed_rows=len(chunk), processed_rows=len(chunk),
results_path="", results_path="",
message=f"Error processing batch {batch_number}: {str(e)}" message=f"Error processing batch {batch_number}: {error_message}"
)) ))
finally: finally:
# Clean up temporary file # Clean up temporary file
@@ -432,6 +532,7 @@ async def run_streaming_pipeline(
pipeline.df = chunk pipeline.df = chunk
try: try:
batch_start_time = time.time()
# Run analyses # Run analyses
pipeline.run_keyword_analysis() pipeline.run_keyword_analysis()
pipeline.run_consistent_amount_analysis() pipeline.run_consistent_amount_analysis()
@@ -440,9 +541,25 @@ async def run_streaming_pipeline(
# Generate reports # Generate reports
reports = pipeline.generate_salary_earner_reports() reports = pipeline.generate_salary_earner_reports()
# Save batch results # Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = total_batches
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
reports['final_table'].to_csv(batch_results_path, index=False) results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse( responses.append(BatchResponse(
batch_number=batch_number, batch_number=batch_number,
@@ -452,18 +569,30 @@ async def run_streaming_pipeline(
message=f"Successfully processed batch {batch_number} of {total_batches}" message=f"Successfully processed batch {batch_number} of {total_batches}"
)) ))
except Exception as e: except Exception as e:
logger.error(f"Error processing batch {batch_number}: {str(e)}") error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse( responses.append(BatchResponse(
batch_number=batch_number, batch_number=batch_number,
total_batches=total_batches, total_batches=total_batches,
processed_rows=len(chunk), processed_rows=len(chunk),
results_path="", results_path="",
message=f"Error processing batch {batch_number}: {str(e)}" message=f"Error processing batch {batch_number}: {error_message}"
)) ))
offset += batch_size offset += batch_size
logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return responses return responses
except Exception as e: except Exception as e:
logger.error(f"Error in streaming pipeline: {str(e)}") logger.error(f"Error in streaming pipeline: {str(e)}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e)) raise HTTPException(status_code=500, detail=str(e))
+1
View File
@@ -32,6 +32,7 @@ DB_CONFIG = {
# Table Configuration # Table Configuration
TABLE_NAME = "customer_account_transaction_hx" TABLE_NAME = "customer_account_transaction_hx"
BATCH_RESULTS_TABLE = "salary_analytics_batch_results"
# Salary Keywords # Salary Keywords
SALARY_KEYWORDS = [ SALARY_KEYWORDS = [
+137
View File
@@ -0,0 +1,137 @@
"""
Database operations module for salary analytics.
"""
import logging
from sqlalchemy import text
from .config import BATCH_RESULTS_TABLE
from datetime import datetime
logger = logging.getLogger(__name__)
class DatabaseOperations:
def __init__(self, engine):
"""Initialize with SQLAlchemy engine."""
self.engine = engine
def create_batch_results_table(self):
"""Create the batch results table if it doesn't exist."""
try:
with self.engine.connect() as conn:
# Check if table exists
check_table = text(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{BATCH_RESULTS_TABLE}')")
table_exists = conn.execute(check_table).scalar()
if not table_exists:
# Create table
create_table = text(f"""
CREATE TABLE {BATCH_RESULTS_TABLE} (
id SERIAL PRIMARY KEY,
batch_number INTEGER,
total_batches INTEGER,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accountid TEXT,
num_months INTEGER,
least_inflow_6m DECIMAL,
avg_monthly_salary DECIMAL,
estimated_next_amount DECIMAL,
estimated_next_date DATE,
is_45day_salary BOOLEAN,
is_2months_salary BOOLEAN,
status TEXT
)
""")
conn.execute(create_table)
conn.commit()
logger.info(f"Created table {BATCH_RESULTS_TABLE}")
return True
except Exception as e:
logger.error(f"Error creating batch results table: {str(e)}")
return False
def save_batch_to_db(self, batch_number, total_batches, results_df, status="success"):
"""Save batch processing results to database."""
try:
with self.engine.connect() as conn:
# Add batch metadata to the DataFrame
results_df['batch_number'] = batch_number
results_df['total_batches'] = total_batches
results_df['processed_at'] = datetime.now()
# Convert DataFrame to list of dictionaries
records = results_df.to_dict('records')
# Insert each record
for record in records:
insert_query = text(f"""
INSERT INTO {BATCH_RESULTS_TABLE}
(batch_number, total_batches, processed_at, accountid, num_months,
least_inflow_6m, avg_monthly_salary, estimated_next_amount,
estimated_next_date, is_45day_salary, is_2months_salary, status)
VALUES
(:batch_number, :total_batches, :processed_at, :accountid, :num_months,
:least_inflow_6m, :avg_monthly_salary, :estimated_next_amount,
:estimated_next_date, :is_45day_salary, :is_2months_salary, :status)
""")
# Convert boolean columns to proper format
record['is_45day_salary'] = record.get('45daysalary', False)
record['is_2months_salary'] = record.get('2monthssalary', False)
# Add status
record['status'] = status
conn.execute(insert_query, record)
conn.commit()
logger.info(f"Successfully saved batch {batch_number} results to database")
return True
except Exception as e:
logger.error(f"Error saving batch {batch_number} to database: {str(e)}")
return False
def get_batch_status(self, batch_number):
"""Get the status of a specific batch."""
try:
with self.engine.connect() as conn:
query = text(f"""
SELECT
batch_number,
total_batches,
processed_at,
COUNT(*) as total_records,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
FROM {BATCH_RESULTS_TABLE}
WHERE batch_number = :batch_number
GROUP BY batch_number, total_batches, processed_at
ORDER BY processed_at DESC
LIMIT 1
""")
result = conn.execute(query, {"batch_number": batch_number}).fetchone()
return dict(result) if result else None
except Exception as e:
logger.error(f"Error getting batch {batch_number} status: {str(e)}")
return None
def get_all_batches(self):
"""Get all batch processing results."""
try:
with self.engine.connect() as conn:
query = text(f"""
SELECT
batch_number,
total_batches,
processed_at,
COUNT(*) as total_records,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
FROM {BATCH_RESULTS_TABLE}
GROUP BY batch_number, total_batches, processed_at
ORDER BY batch_number
""")
results = conn.execute(query).fetchall()
return [dict(row) for row in results]
except Exception as e:
logger.error(f"Error getting all batches: {str(e)}")
return []
+11 -3
View File
@@ -6,8 +6,16 @@ import pandas as pd
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
from matplotlib_venn import venn3 from matplotlib_venn import venn3
from datetime import datetime, timedelta from datetime import datetime, timedelta
import logging
from .config import MODEL_CONFIG, OUTPUT_PATHS from .config import MODEL_CONFIG, OUTPUT_PATHS
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
class SalaryEarnerAnalyzer: class SalaryEarnerAnalyzer:
def __init__(self, df): def __init__(self, df):
self.df = df self.df = df
@@ -124,7 +132,7 @@ class SalaryEarnerAnalyzer:
# Generate final table # Generate final table
self.final_table = self.generate_salary_earners_table(all_three_hypotheses) self.final_table = self.generate_salary_earners_table(all_three_hypotheses)
print(f"Found {self.final_table['accountid'].nunique()} verified salary earners") logger.info(f"Found {self.final_table['accountid'].nunique()} verified salary earners")
# Generate likely salary earner table # Generate likely salary earner table
green_section = self.filter_venn_section( green_section = self.filter_venn_section(
@@ -142,11 +150,11 @@ class SalaryEarnerAnalyzer:
self.likely_salary_earner = pd.concat([yellow_section, green_section]) self.likely_salary_earner = pd.concat([yellow_section, green_section])
self.likely_salary_earner = self.likely_salary_earner.drop_duplicates(subset=['id']) self.likely_salary_earner = self.likely_salary_earner.drop_duplicates(subset=['id'])
self.likely_salary_earner = self.generate_salary_earners_table(self.likely_salary_earner) self.likely_salary_earner = self.generate_salary_earners_table(self.likely_salary_earner)
print(f"Found {self.likely_salary_earner['accountid'].nunique()} likely salary earners") logger.info(f"Found {self.likely_salary_earner['accountid'].nunique()} likely salary earners")
# Analyze high earners # Analyze high earners
self.high_earner_details, total_high_earners = self.analyze_salary_earners(self.final_table) self.high_earner_details, total_high_earners = self.analyze_salary_earners(self.final_table)
print(f"\nTotal High Earners: {total_high_earners}") logger.info(f"\nTotal High Earners: {total_high_earners}")
# Plot hypothesis overlap # Plot hypothesis overlap
self.plot_hypothesis_overlap( self.plot_hypothesis_overlap(