From 1a4e5396262ea06bc99a0c5038702690e9635cbe Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Sat, 10 May 2025 16:56:23 +0100 Subject: [PATCH] Enhance salary analytics API with database operations and performance logging - Introduced `DatabaseOperations` class for managing batch results in the database. - Added functionality to create a batch results table and save batch processing results. - Updated API endpoints to log execution time and handle batch processing errors more effectively. - Improved response handling in analysis endpoints and added batch metadata to results. - Suppressed warnings and improved logging throughout the application. --- salary_analytics/api.py | 169 ++++++++++++++++++--- salary_analytics/config.py | 1 + salary_analytics/db_operations.py | 137 +++++++++++++++++ salary_analytics/salary_earner_analyzer.py | 14 +- 4 files changed, 298 insertions(+), 23 deletions(-) create mode 100644 salary_analytics/db_operations.py diff --git a/salary_analytics/api.py b/salary_analytics/api.py index 1906962..8d9e054 100644 --- a/salary_analytics/api.py +++ b/salary_analytics/api.py @@ -13,12 +13,16 @@ import logging import pandas as pd import tempfile from datetime import datetime -from sqlalchemy import text +from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData +import numpy as np +import warnings +import time from .main import SalaryAnalyticsPipeline -from .config import OUTPUT_PATHS, TABLE_NAME +from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE from .data_loader import DataLoader from .salary_predictor import SalaryPredictor from .salary_earner_analyzer import SalaryEarnerAnalyzer +from .db_operations import DatabaseOperations # Configure logging logging.basicConfig( @@ -27,6 +31,10 @@ logging.basicConfig( ) logger = logging.getLogger(__name__) +# Suppress warnings +warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy') +pd.options.mode.chained_assignment = None + app = FastAPI( title="Salary Analytics API", description="API for analyzing and predicting salary patterns from transaction data", @@ -96,72 +104,91 @@ async def startup_event(): @app.get("/") async def root(): """Root endpoint.""" + start_time = time.time() logger.info("Root endpoint accessed") - return {"message": "Welcome to Salary Analytics API"} + response = {"message": "Welcome to Salary Analytics API"} + logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds") + return response @app.get("/health") async def health_check(): """Health check endpoint.""" + start_time = time.time() logger.info("Health check endpoint accessed") - return {"status": "healthy"} + response = {"status": "healthy"} + logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds") + return response @app.post("/analyze/keyword", response_model=AnalysisResponse) async def analyze_keyword(): """Run keyword-based salary transaction analysis.""" + start_time = time.time() try: check_data_loaded() logger.info("Starting keyword analysis...") data = pipeline.run_keyword_analysis() logger.info(f"Keyword analysis completed. Found {len(data)} matches") - return AnalysisResponse( + response = AnalysisResponse( message="Keyword analysis completed successfully", data={"count": len(data)} ) + logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error in keyword analysis: {str(e)}") + logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.post("/analyze/consistent-amount", response_model=AnalysisResponse) async def analyze_consistent_amount(): """Run consistent amount transaction analysis.""" + start_time = time.time() try: check_data_loaded() logger.info("Starting consistent amount analysis...") data = pipeline.run_consistent_amount_analysis() logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") - return AnalysisResponse( + response = AnalysisResponse( message="Consistent amount analysis completed successfully", data={"count": len(data)} ) + logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error in consistent amount analysis: {str(e)}") + logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.post("/analyze/transaction-type", response_model=AnalysisResponse) async def analyze_transaction_type(): """Run transaction type analysis.""" + start_time = time.time() try: check_data_loaded() logger.info("Starting transaction type analysis...") data = pipeline.run_transaction_type_analysis() logger.info(f"Transaction type analysis completed. Found {len(data)} matches") - return AnalysisResponse( + response = AnalysisResponse( message="Transaction type analysis completed successfully", data={"count": len(data)} ) + logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error in transaction type analysis: {str(e)}") + logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.post("/generate/reports", response_model=AnalysisResponse) async def generate_reports(background_tasks: BackgroundTasks): """Generate salary earner reports.""" + start_time = time.time() try: check_data_loaded() logger.info("Starting report generation...") reports = pipeline.generate_salary_earner_reports() logger.info("Reports generated successfully") - return AnalysisResponse( + response = AnalysisResponse( message="Reports generated successfully", data={ "verified_salary_earners": len(reports['final_table']), @@ -169,28 +196,36 @@ async def generate_reports(background_tasks: BackgroundTasks): "high_earners": reports['total_high_earners'] } ) + logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error in report generation: {str(e)}") + logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.post("/train/models", response_model=AnalysisResponse) async def train_models(): """Train salary prediction models.""" + start_time = time.time() try: check_data_loaded() logger.info("Starting model training...") pipeline.train_salary_prediction_models() logger.info("Models trained successfully") - return AnalysisResponse( + response = AnalysisResponse( message="Models trained successfully" ) + logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error in model training: {str(e)}") + logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.get("/download/{report_type}") async def download_report(report_type: str): """Download generated reports.""" + start_time = time.time() try: check_data_loaded() logger.info(f"Attempting to download report: {report_type}") @@ -205,40 +240,50 @@ async def download_report(report_type: str): if report_type not in file_paths: logger.error(f"Report type not found: {report_type}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=404, detail="Report type not found") file_path = file_paths[report_type] if not os.path.exists(file_path): logger.error(f"Report file not found: {file_path}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=404, detail="Report file not found") logger.info(f"Successfully found report file: {file_path}") - return FileResponse( + response = FileResponse( path=file_path, filename=os.path.basename(file_path), media_type="application/octet-stream" ) + logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error downloading report: {str(e)}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.post("/run/pipeline", response_model=AnalysisResponse) async def run_full_pipeline(): """Run the complete salary analytics pipeline.""" + start_time = time.time() try: check_data_loaded() logger.info("Starting full pipeline...") success = pipeline.run_full_pipeline() if not success: logger.error("Pipeline failed") + logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail="Pipeline failed") logger.info("Pipeline completed successfully") - return AnalysisResponse( + response = AnalysisResponse( message="Pipeline completed successfully" ) + logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error in pipeline: {str(e)}") + logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) @app.post("/load-data") @@ -253,11 +298,16 @@ async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)) Returns: dict: Status of data loading """ + start_time = time.time() try: if source not in ['db', 'csv']: + logger.error(f"Invalid source: {source}") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") if source == 'csv' and not file: + logger.error("No file provided for CSV source") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") if source == 'csv': @@ -276,16 +326,21 @@ async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)) success = pipeline.load_data(source='db') if not success: + logger.error("Failed to load data") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail="Failed to load data") - return { + response = { "status": "success", "message": f"Successfully loaded {len(pipeline.df)} rows of data", "columns": pipeline.df.columns.tolist(), "row_count": len(pipeline.df) } + logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") + return response except Exception as e: logger.error(f"Error loading data: {str(e)}") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): @@ -311,11 +366,16 @@ async def run_streaming_pipeline( Returns: List[BatchResponse]: List of responses for each batch processed """ + start_time = time.time() try: if source not in ['db', 'csv']: + logger.error(f"Invalid source: {source}") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") if source == 'csv' and not file: + logger.error("No file provided for CSV source") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") # Initialize data loader @@ -327,8 +387,21 @@ async def run_streaming_pipeline( batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") os.makedirs(batch_output_dir, exist_ok=True) + # Initialize database operations + if not data_loader.connect(): + logger.error("Failed to connect to database") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Failed to connect to database") + + db_ops = DatabaseOperations(data_loader.engine) + if not db_ops.create_batch_results_table(): + logger.error("Failed to create batch results table") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Failed to create batch results table") + responses = [] batch_number = 0 + batch_start_time = time.time() def preprocess_chunk(chunk): """Preprocess a chunk of data with the same logic as DataLoader.""" @@ -369,6 +442,7 @@ async def run_streaming_pipeline( pipeline.df = chunk try: + batch_start_time = time.time() # Run analyses pipeline.run_keyword_analysis() pipeline.run_consistent_amount_analysis() @@ -377,9 +451,25 @@ async def run_streaming_pipeline( # Generate reports reports = pipeline.generate_salary_earner_reports() - # Save batch results + # Add batch metadata to results + results_df = reports['final_table'].copy() + results_df['batch_number'] = batch_number + results_df['total_batches'] = -1 # Unknown for CSV + results_df['processed_at'] = datetime.now() + + # Save batch results to CSV batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") - reports['final_table'].to_csv(batch_results_path, index=False) + results_df.to_csv(batch_results_path, index=False) + + # Save to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=-1, # Unknown for CSV + results_df=results_df, + status="success" + ) + + logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds") responses.append(BatchResponse( batch_number=batch_number, @@ -389,13 +479,23 @@ async def run_streaming_pipeline( message=f"Successfully processed batch {batch_number}" )) except Exception as e: - logger.error(f"Error processing batch {batch_number}: {str(e)}") + error_message = str(e) + logger.error(f"Error processing batch {batch_number}: {error_message}") + + # Save error to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=-1, + results_df=pd.DataFrame(), # Empty DataFrame for error case + status="error" + ) + responses.append(BatchResponse( batch_number=batch_number, total_batches=-1, processed_rows=len(chunk), results_path="", - message=f"Error processing batch {batch_number}: {str(e)}" + message=f"Error processing batch {batch_number}: {error_message}" )) finally: # Clean up temporary file @@ -432,6 +532,7 @@ async def run_streaming_pipeline( pipeline.df = chunk try: + batch_start_time = time.time() # Run analyses pipeline.run_keyword_analysis() pipeline.run_consistent_amount_analysis() @@ -440,9 +541,25 @@ async def run_streaming_pipeline( # Generate reports reports = pipeline.generate_salary_earner_reports() - # Save batch results + # Add batch metadata to results + results_df = reports['final_table'].copy() + results_df['batch_number'] = batch_number + results_df['total_batches'] = total_batches + results_df['processed_at'] = datetime.now() + + # Save batch results to CSV batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") - reports['final_table'].to_csv(batch_results_path, index=False) + results_df.to_csv(batch_results_path, index=False) + + # Save to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=total_batches, + results_df=results_df, + status="success" + ) + + logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds") responses.append(BatchResponse( batch_number=batch_number, @@ -452,18 +569,30 @@ async def run_streaming_pipeline( message=f"Successfully processed batch {batch_number} of {total_batches}" )) except Exception as e: - logger.error(f"Error processing batch {batch_number}: {str(e)}") + error_message = str(e) + logger.error(f"Error processing batch {batch_number}: {error_message}") + + # Save error to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=total_batches, + results_df=pd.DataFrame(), # Empty DataFrame for error case + status="error" + ) + responses.append(BatchResponse( batch_number=batch_number, total_batches=total_batches, processed_rows=len(chunk), results_path="", - message=f"Error processing batch {batch_number}: {str(e)}" + message=f"Error processing batch {batch_number}: {error_message}" )) offset += batch_size + logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds") return responses except Exception as e: logger.error(f"Error in streaming pipeline: {str(e)}") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/salary_analytics/config.py b/salary_analytics/config.py index 6d1b0fc..0e3d36c 100644 --- a/salary_analytics/config.py +++ b/salary_analytics/config.py @@ -32,6 +32,7 @@ DB_CONFIG = { # Table Configuration TABLE_NAME = "customer_account_transaction_hx" +BATCH_RESULTS_TABLE = "salary_analytics_batch_results" # Salary Keywords SALARY_KEYWORDS = [ diff --git a/salary_analytics/db_operations.py b/salary_analytics/db_operations.py new file mode 100644 index 0000000..9cb317e --- /dev/null +++ b/salary_analytics/db_operations.py @@ -0,0 +1,137 @@ +""" +Database operations module for salary analytics. +""" + +import logging +from sqlalchemy import text +from .config import BATCH_RESULTS_TABLE +from datetime import datetime + +logger = logging.getLogger(__name__) + +class DatabaseOperations: + def __init__(self, engine): + """Initialize with SQLAlchemy engine.""" + self.engine = engine + + def create_batch_results_table(self): + """Create the batch results table if it doesn't exist.""" + try: + with self.engine.connect() as conn: + # Check if table exists + check_table = text(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{BATCH_RESULTS_TABLE}')") + table_exists = conn.execute(check_table).scalar() + + if not table_exists: + # Create table + create_table = text(f""" + CREATE TABLE {BATCH_RESULTS_TABLE} ( + id SERIAL PRIMARY KEY, + batch_number INTEGER, + total_batches INTEGER, + processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, + accountid TEXT, + num_months INTEGER, + least_inflow_6m DECIMAL, + avg_monthly_salary DECIMAL, + estimated_next_amount DECIMAL, + estimated_next_date DATE, + is_45day_salary BOOLEAN, + is_2months_salary BOOLEAN, + status TEXT + ) + """) + conn.execute(create_table) + conn.commit() + logger.info(f"Created table {BATCH_RESULTS_TABLE}") + return True + except Exception as e: + logger.error(f"Error creating batch results table: {str(e)}") + return False + + def save_batch_to_db(self, batch_number, total_batches, results_df, status="success"): + """Save batch processing results to database.""" + try: + with self.engine.connect() as conn: + # Add batch metadata to the DataFrame + results_df['batch_number'] = batch_number + results_df['total_batches'] = total_batches + results_df['processed_at'] = datetime.now() + + # Convert DataFrame to list of dictionaries + records = results_df.to_dict('records') + + # Insert each record + for record in records: + insert_query = text(f""" + INSERT INTO {BATCH_RESULTS_TABLE} + (batch_number, total_batches, processed_at, accountid, num_months, + least_inflow_6m, avg_monthly_salary, estimated_next_amount, + estimated_next_date, is_45day_salary, is_2months_salary, status) + VALUES + (:batch_number, :total_batches, :processed_at, :accountid, :num_months, + :least_inflow_6m, :avg_monthly_salary, :estimated_next_amount, + :estimated_next_date, :is_45day_salary, :is_2months_salary, :status) + """) + + # Convert boolean columns to proper format + record['is_45day_salary'] = record.get('45daysalary', False) + record['is_2months_salary'] = record.get('2monthssalary', False) + + # Add status + record['status'] = status + + conn.execute(insert_query, record) + + conn.commit() + logger.info(f"Successfully saved batch {batch_number} results to database") + return True + except Exception as e: + logger.error(f"Error saving batch {batch_number} to database: {str(e)}") + return False + + def get_batch_status(self, batch_number): + """Get the status of a specific batch.""" + try: + with self.engine.connect() as conn: + query = text(f""" + SELECT + batch_number, + total_batches, + processed_at, + COUNT(*) as total_records, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records, + SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records + FROM {BATCH_RESULTS_TABLE} + WHERE batch_number = :batch_number + GROUP BY batch_number, total_batches, processed_at + ORDER BY processed_at DESC + LIMIT 1 + """) + result = conn.execute(query, {"batch_number": batch_number}).fetchone() + return dict(result) if result else None + except Exception as e: + logger.error(f"Error getting batch {batch_number} status: {str(e)}") + return None + + def get_all_batches(self): + """Get all batch processing results.""" + try: + with self.engine.connect() as conn: + query = text(f""" + SELECT + batch_number, + total_batches, + processed_at, + COUNT(*) as total_records, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records, + SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records + FROM {BATCH_RESULTS_TABLE} + GROUP BY batch_number, total_batches, processed_at + ORDER BY batch_number + """) + results = conn.execute(query).fetchall() + return [dict(row) for row in results] + except Exception as e: + logger.error(f"Error getting all batches: {str(e)}") + return [] \ No newline at end of file diff --git a/salary_analytics/salary_earner_analyzer.py b/salary_analytics/salary_earner_analyzer.py index ccdc6bb..b32c995 100644 --- a/salary_analytics/salary_earner_analyzer.py +++ b/salary_analytics/salary_earner_analyzer.py @@ -6,8 +6,16 @@ import pandas as pd import matplotlib.pyplot as plt from matplotlib_venn import venn3 from datetime import datetime, timedelta +import logging from .config import MODEL_CONFIG, OUTPUT_PATHS +# Configure logging +logging.basicConfig( + level=logging.INFO, + format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' +) +logger = logging.getLogger(__name__) + class SalaryEarnerAnalyzer: def __init__(self, df): self.df = df @@ -124,7 +132,7 @@ class SalaryEarnerAnalyzer: # Generate final table self.final_table = self.generate_salary_earners_table(all_three_hypotheses) - print(f"Found {self.final_table['accountid'].nunique()} verified salary earners") + logger.info(f"Found {self.final_table['accountid'].nunique()} verified salary earners") # Generate likely salary earner table green_section = self.filter_venn_section( @@ -142,11 +150,11 @@ class SalaryEarnerAnalyzer: self.likely_salary_earner = pd.concat([yellow_section, green_section]) self.likely_salary_earner = self.likely_salary_earner.drop_duplicates(subset=['id']) self.likely_salary_earner = self.generate_salary_earners_table(self.likely_salary_earner) - print(f"Found {self.likely_salary_earner['accountid'].nunique()} likely salary earners") + logger.info(f"Found {self.likely_salary_earner['accountid'].nunique()} likely salary earners") # Analyze high earners self.high_earner_details, total_high_earners = self.analyze_salary_earners(self.final_table) - print(f"\nTotal High Earners: {total_high_earners}") + logger.info(f"\nTotal High Earners: {total_high_earners}") # Plot hypothesis overlap self.plot_hypothesis_overlap(