"""
FastAPI application for salary analytics.

Exposes HTTP endpoints that load transaction data into a shared
``SalaryAnalyticsPipeline``, run the individual analyses, generate and
download reports, train models, and process data in batches.
"""

import logging
import os
import socket
import tempfile
import time
import warnings
from contextlib import contextmanager
from datetime import datetime
from typing import Optional, Dict, List, Union

import numpy as np
import pandas as pd
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData

from .main import SalaryAnalyticsPipeline
from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE
from .data_loader import DataLoader
from .salary_predictor import SalaryPredictor
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .db_operations import DatabaseOperations

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Suppress warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
pd.options.mode.chained_assignment = None

app = FastAPI(
    title="Salary Analytics API",
    description="API for analyzing and predicting salary patterns from transaction data",
    version="1.0.0"
)

# Add CORS middleware
# NOTE(review): wildcard origins combined with allow_credentials=True is a very
# permissive policy -- confirm this is intended outside of development.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],  # Allows all origins
    allow_credentials=True,
    allow_methods=["*"],  # Allows all methods
    allow_headers=["*"],  # Allows all headers
)

# Global pipeline instance
pipeline = SalaryAnalyticsPipeline()

# Global variables to store loaded data and models
data_loader = None
df = None
salary_predictor = None
salary_earner_analyzer = None

# Value reported as ``total_batches`` when the number of batches is not known
# up front (CSV uploads are read as a stream of chunks).
_UNKNOWN_TOTAL_BATCHES = -1


class AnalysisResponse(BaseModel):
    """Response model for analysis endpoints."""
    message: str
    data: Optional[Dict] = None
    file_path: Optional[str] = None


class BatchResponse(BaseModel):
    """Response model for batch processing."""
    batch_number: int
    total_batches: int
    processed_rows: int
    results_path: str
    message: str


@contextmanager
def _endpoint_guard(label: str, error_context: str):
    """Time an endpoint body and translate failures into HTTP errors.

    ``HTTPException`` raised inside the guarded body is re-raised unchanged
    so that deliberate 4xx responses reach the client with their own status
    code. Any other exception is logged with its traceback and converted
    into an HTTP 500 whose detail is ``str(exc)``.

    Args:
        label: Human-readable endpoint name used in the timing log lines.
        error_context: Phrase inserted into the "Error ..." log line.

    Raises:
        HTTPException: Either the one raised by the body, or a new 500.
    """
    start_time = time.time()
    try:
        yield
    except HTTPException as exc:
        # Intentional HTTP errors (400/404/...) must not be rewritten as 500.
        logger.warning(f"{label} returned HTTP {exc.status_code}: {exc.detail}")
        logger.info(f"{label} failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as exc:
        logger.exception(f"Error {error_context}: {str(exc)}")
        logger.info(f"{label} failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(exc)) from exc
    else:
        logger.info(f"{label} completed in {time.time() - start_time:.2f} seconds")


async def _save_upload_to_temp_csv(upload) -> str:
    """Write an uploaded file to a temporary ``.csv`` file and return its path.

    The caller owns the returned file and must delete it. If reading the
    upload or writing the file fails, the temporary file is removed before
    the exception propagates.
    """
    temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
    try:
        with temp_file:
            temp_file.write(await upload.read())
    except Exception:
        # The file was created with delete=False, so clean it up ourselves.
        os.unlink(temp_file.name)
        raise
    return temp_file.name


def _preprocess_chunk(chunk: pd.DataFrame) -> pd.DataFrame:
    """Preprocess a chunk of data with the same logic as DataLoader.

    Parses ``trx_start_date`` / ``trx_end_date`` as datetimes, renames the
    ``d1``..``d4`` columns, and drops rows containing missing values.
    """
    # Convert dates
    chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
    chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])

    # Rename columns
    chunk = chunk.rename(columns={
        'd1': 'trx_type',
        'd2': 'trx_subtype',
        'd3': 'initiated_by',
        'd4': 'customer_id'
    })
    return chunk.dropna()


def _process_batch(
    chunk: pd.DataFrame,
    batch_number: int,
    total_batches: int,
    batch_output_dir: str,
    db_ops: DatabaseOperations,
) -> BatchResponse:
    """Run the analyses on one chunk and persist the batch result.

    The chunk is preprocessed, analysed with a fresh
    ``SalaryAnalyticsPipeline`` (so the shared module-level pipeline is not
    touched), written to ``batch_<n>_results.csv`` in ``batch_output_dir``
    and saved through ``db_ops.save_batch_to_db``. A failure during analysis
    or saving is recorded as an ``"error"`` batch and reported in the
    returned response instead of aborting the whole request.

    Args:
        chunk: Raw rows of this batch.
        batch_number: 1-based index of the batch.
        total_batches: Total number of batches, or ``-1`` when unknown.
        batch_output_dir: Directory receiving the per-batch CSV file.
        db_ops: Database helper used to store the batch outcome.

    Returns:
        BatchResponse describing the success or failure of this batch.
    """
    chunk = _preprocess_chunk(chunk)

    # Run pipeline on chunk
    batch_pipeline = SalaryAnalyticsPipeline()
    batch_pipeline.df = chunk

    # Log/response wording only mentions the total when it is known.
    of_total = "" if total_batches == _UNKNOWN_TOTAL_BATCHES else f" of {total_batches}"

    try:
        batch_start_time = time.time()

        # Run analyses
        batch_pipeline.run_keyword_analysis()
        batch_pipeline.run_consistent_amount_analysis()
        batch_pipeline.run_transaction_type_analysis()

        # Generate reports
        reports = batch_pipeline.generate_salary_earner_reports()

        # Add batch metadata to results
        results_df = reports['final_table'].copy()
        results_df['batch_number'] = batch_number
        results_df['total_batches'] = total_batches
        results_df['processed_at'] = datetime.now()

        # Save batch results to CSV
        batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
        results_df.to_csv(batch_results_path, index=False)

        # Save to database
        db_ops.save_batch_to_db(
            batch_number=batch_number,
            total_batches=total_batches,
            results_df=results_df,
            status="success"
        )

        logger.info(
            f"Batch {batch_number}{of_total} processed in "
            f"{time.time() - batch_start_time:.2f} seconds"
        )
        return BatchResponse(
            batch_number=batch_number,
            total_batches=total_batches,
            processed_rows=len(chunk),
            results_path=batch_results_path,
            message=f"Successfully processed batch {batch_number}{of_total}"
        )
    except Exception as e:
        error_message = str(e)
        logger.exception(f"Error processing batch {batch_number}: {error_message}")

        # Save error to database
        db_ops.save_batch_to_db(
            batch_number=batch_number,
            total_batches=total_batches,
            results_df=pd.DataFrame(),  # Empty DataFrame for error case
            status="error"
        )
        return BatchResponse(
            batch_number=batch_number,
            total_batches=total_batches,
            processed_rows=len(chunk),
            results_path="",
            message=f"Error processing batch {batch_number}: {error_message}"
        )


def check_data_loaded():
    """Check if data is loaded before running analytics.

    Raises:
        HTTPException: 400 when the shared pipeline has no DataFrame yet.
    """
    if pipeline.df is None:
        raise HTTPException(
            status_code=400,
            detail="No data loaded. Please load data first using the /load-data endpoint."
        )


@app.on_event("startup")
async def startup_event():
    """Log network information for the server on startup.

    The advertised URLs assume the server listens on port 8000; the actual
    port is not known to this module.
    """
    try:
        logger.info("Initializing pipeline...")

        # Print network information
        hostname = socket.gethostname()
        logger.info(f"Server running on hostname: {hostname}")

        # Resolving the hostname is only needed for the log lines below, so a
        # host whose name does not resolve must not prevent the API from starting.
        try:
            ip_address = socket.gethostbyname(hostname)
        except OSError as exc:
            ip_address = None
            logger.warning(f"Could not resolve IP address for {hostname}: {exc}")
        else:
            logger.info(f"Server IP address: {ip_address}")

        logger.info("Server is accessible at:")
        logger.info("- http://localhost:8000")
        logger.info("- http://127.0.0.1:8000")
        if ip_address is not None:
            logger.info(f"- http://{ip_address}:8000")

        logger.info("Pipeline initialized successfully")
    except Exception as e:
        logger.exception(f"Error during startup: {str(e)}")
        raise


@app.get("/")
async def root():
    """Root endpoint."""
    start_time = time.time()
    logger.info("Root endpoint accessed")
    response = {"message": "Welcome to Salary Analytics API"}
    logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
    return response


@app.get("/health")
async def health_check():
    """Health check endpoint."""
    start_time = time.time()
    logger.info("Health check endpoint accessed")
    response = {"status": "healthy"}
    logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
    return response


@app.post("/analyze/keyword", response_model=AnalysisResponse)
async def analyze_keyword():
    """Run keyword-based salary transaction analysis.

    Returns:
        AnalysisResponse whose ``data["count"]`` is the number of matches.

    Raises:
        HTTPException: 400 if no data is loaded, 500 on unexpected errors.
    """
    with _endpoint_guard("Keyword analysis endpoint", "in keyword analysis"):
        check_data_loaded()
        logger.info("Starting keyword analysis...")
        data = pipeline.run_keyword_analysis()
        logger.info(f"Keyword analysis completed. Found {len(data)} matches")
        return AnalysisResponse(
            message="Keyword analysis completed successfully",
            data={"count": len(data)}
        )


@app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount():
    """Run consistent amount transaction analysis.

    Returns:
        AnalysisResponse whose ``data["count"]`` is the number of matches.

    Raises:
        HTTPException: 400 if no data is loaded, 500 on unexpected errors.
    """
    with _endpoint_guard("Consistent amount analysis endpoint", "in consistent amount analysis"):
        check_data_loaded()
        logger.info("Starting consistent amount analysis...")
        data = pipeline.run_consistent_amount_analysis()
        logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
        return AnalysisResponse(
            message="Consistent amount analysis completed successfully",
            data={"count": len(data)}
        )


@app.post("/analyze/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type():
    """Run transaction type analysis.

    Returns:
        AnalysisResponse whose ``data["count"]`` is the number of matches.

    Raises:
        HTTPException: 400 if no data is loaded, 500 on unexpected errors.
    """
    with _endpoint_guard("Transaction type analysis endpoint", "in transaction type analysis"):
        check_data_loaded()
        logger.info("Starting transaction type analysis...")
        data = pipeline.run_transaction_type_analysis()
        logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
        return AnalysisResponse(
            message="Transaction type analysis completed successfully",
            data={"count": len(data)}
        )


@app.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks):
    """Generate salary earner reports.

    Args:
        background_tasks: Accepted for interface compatibility; not used.

    Returns:
        AnalysisResponse with the sizes of the ``final_table`` and
        ``likely_salary_earner`` reports and the ``total_high_earners`` value.

    Raises:
        HTTPException: 400 if no data is loaded, 500 on unexpected errors.
    """
    with _endpoint_guard("Report generation endpoint", "in report generation"):
        check_data_loaded()
        logger.info("Starting report generation...")
        reports = pipeline.generate_salary_earner_reports()
        logger.info("Reports generated successfully")
        return AnalysisResponse(
            message="Reports generated successfully",
            data={
                "verified_salary_earners": len(reports['final_table']),
                "likely_salary_earners": len(reports['likely_salary_earner']),
                "high_earners": reports['total_high_earners']
            }
        )


@app.post("/train/models", response_model=AnalysisResponse)
async def train_models():
    """Train salary prediction models.

    Raises:
        HTTPException: 400 if no data is loaded, 500 on unexpected errors.
    """
    with _endpoint_guard("Model training endpoint", "in model training"):
        check_data_loaded()
        logger.info("Starting model training...")
        pipeline.train_salary_prediction_models()
        logger.info("Models trained successfully")
        return AnalysisResponse(
            message="Models trained successfully"
        )


@app.get("/download/{report_type}")
async def download_report(report_type: str):
    """Download generated reports.

    Args:
        report_type: One of ``high_earners``, ``likely_earners``,
            ``final_table``, ``consistent_plot``, ``inconsistent_plot`` or
            ``hypothesis_plot``.

    Returns:
        FileResponse streaming the report file as ``application/octet-stream``.

    Raises:
        HTTPException: 400 if no data is loaded, 404 if the report type is
            unknown or its file does not exist, 500 on unexpected errors.
    """
    with _endpoint_guard("Download endpoint", "downloading report"):
        check_data_loaded()
        logger.info(f"Attempting to download report: {report_type}")
        file_paths = {
            "high_earners": OUTPUT_PATHS["high_earner_details"],
            "likely_earners": OUTPUT_PATHS["likely_salary_earner"],
            "final_table": OUTPUT_PATHS["final_table"],
            "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
            "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
            "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
        }

        if report_type not in file_paths:
            logger.error(f"Report type not found: {report_type}")
            raise HTTPException(status_code=404, detail="Report type not found")

        file_path = file_paths[report_type]
        if not os.path.exists(file_path):
            logger.error(f"Report file not found: {file_path}")
            raise HTTPException(status_code=404, detail="Report file not found")

        logger.info(f"Successfully found report file: {file_path}")
        return FileResponse(
            path=file_path,
            filename=os.path.basename(file_path),
            media_type="application/octet-stream"
        )


@app.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline():
    """Run the complete salary analytics pipeline.

    Raises:
        HTTPException: 400 if no data is loaded, 500 if the pipeline reports
            failure or raises.
    """
    with _endpoint_guard("Full pipeline endpoint", "in pipeline"):
        check_data_loaded()
        logger.info("Starting full pipeline...")
        success = pipeline.run_full_pipeline()
        if not success:
            logger.error("Pipeline failed")
            raise HTTPException(status_code=500, detail="Pipeline failed")

        logger.info("Pipeline completed successfully")
        return AnalysisResponse(
            message="Pipeline completed successfully"
        )


@app.post("/load-data")
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
    """
    Load data from either database or CSV file.

    Args:
        source (str): Source of data ('db' or 'csv')
        file (UploadFile, optional): CSV file to load (required if source is 'csv')

    Returns:
        dict: Status of data loading, with the column names and row count of
        the loaded DataFrame.

    Raises:
        HTTPException: 400 for an invalid source or a missing CSV file,
            500 if loading fails.
    """
    with _endpoint_guard("Load data endpoint", "loading data"):
        if source not in ['db', 'csv']:
            logger.error(f"Invalid source: {source}")
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")

        if source == 'csv' and not file:
            logger.error("No file provided for CSV source")
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        if source == 'csv':
            # Save uploaded file temporarily
            temp_file_path = await _save_upload_to_temp_csv(file)
            try:
                success = pipeline.load_data(source='csv', file_path=temp_file_path)
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        else:
            success = pipeline.load_data(source='db')

        if not success:
            logger.error("Failed to load data")
            raise HTTPException(status_code=500, detail="Failed to load data")

        return {
            "status": "success",
            "message": f"Successfully loaded {len(pipeline.df)} rows of data",
            "columns": pipeline.df.columns.tolist(),
            "row_count": len(pipeline.df)
        }


async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
    """Dependency to handle file upload only when source is csv."""
    if source == 'csv' and not file:
        raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
    return file


@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(
    source: str = "db",
    batch_size: int = 10000,
    file: Optional[Union[UploadFile, str]] = File(None)
):
    """
    Run the complete salary analytics pipeline in batches.

    A database connection is required for both sources because every batch
    outcome is stored through ``DatabaseOperations``.

    Args:
        source (str): Source of data ('db' or 'csv')
        batch_size (int): Number of rows to process in each batch (must be >= 1)
        file (UploadFile, optional): CSV file to load (required if source is 'csv')

    Returns:
        List[BatchResponse]: List of responses for each batch processed

    Raises:
        HTTPException: 400 for an invalid source, a non-positive batch size
            or a missing CSV file; 500 if the database cannot be prepared or
            an unexpected error occurs.
    """
    with _endpoint_guard("Streaming pipeline endpoint", "in streaming pipeline"):
        if source not in ['db', 'csv']:
            logger.error(f"Invalid source: {source}")
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")

        # A non-positive batch size would otherwise surface as an opaque 500
        # (division by zero / invalid chunksize) further down.
        if batch_size < 1:
            logger.error(f"Invalid batch size: {batch_size}")
            raise HTTPException(status_code=400, detail="batch_size must be a positive integer")

        # The parameter also admits plain strings (e.g. an empty form field);
        # a string can never be read as an upload, so treat it as "no file".
        if source == 'csv' and (not file or isinstance(file, str)):
            logger.error("No file provided for CSV source")
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        # Initialize data loader
        loader = DataLoader()
        loader.chunk_size = batch_size

        # Create output directory for batch results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        batch_output_dir = os.path.join(
            os.path.dirname(OUTPUT_PATHS['final_table']),
            f"batch_results_{timestamp}"
        )
        os.makedirs(batch_output_dir, exist_ok=True)

        # Initialize database operations
        if not loader.connect():
            logger.error("Failed to connect to database")
            raise HTTPException(status_code=500, detail="Failed to connect to database")

        db_ops = DatabaseOperations(loader.engine)
        if not db_ops.create_batch_results_table():
            logger.error("Failed to create batch results table")
            raise HTTPException(status_code=500, detail="Failed to create batch results table")

        responses = []

        if source == 'csv':
            # Save uploaded file temporarily
            temp_file_path = await _save_upload_to_temp_csv(file)
            try:
                # Process CSV in chunks
                chunks = pd.read_csv(temp_file_path, chunksize=batch_size)
                for batch_number, chunk in enumerate(chunks, start=1):
                    logger.info(f"Processing batch {batch_number}")
                    responses.append(_process_batch(
                        chunk,
                        batch_number,
                        _UNKNOWN_TOTAL_BATCHES,  # Unknown for CSV
                        batch_output_dir,
                        db_ops,
                    ))
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        else:
            # Process database in chunks
            # NOTE(review): connect() already succeeded above; this second call
            # is kept because its side effects are not visible here -- confirm
            # whether it can be removed.
            if not loader.connect():
                raise HTTPException(status_code=500, detail="Failed to connect to database")

            # Get total row count
            with loader.engine.connect() as conn:
                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total_rows = conn.execute(count_query).scalar()

            total_batches = (total_rows + batch_size - 1) // batch_size  # ceiling division
            batch_number = 0
            offset = 0

            while offset < total_rows:
                batch_number += 1
                logger.info(f"Processing batch {batch_number} of {total_batches}")

                # Load chunk from database
                # NOTE(review): LIMIT/OFFSET without ORDER BY does not guarantee
                # a stable row order between queries -- consider ordering by a key.
                query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
                chunk = pd.read_sql(query, loader.engine)

                if chunk.empty:
                    break

                responses.append(_process_batch(
                    chunk,
                    batch_number,
                    total_batches,
                    batch_output_dir,
                    db_ops,
                ))
                offset += batch_size

        return responses