diff --git a/app/analytics/api.py b/app/analytics/api.py deleted file mode 100644 index ab069e3..0000000 --- a/app/analytics/api.py +++ /dev/null @@ -1,600 +0,0 @@ -""" -FastAPI application for salary analytics. -""" - -from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends -from fastapi.responses import FileResponse -import os -import socket -from typing import Optional, List, Union -import pandas as pd -import tempfile -from datetime import datetime -from sqlalchemy import text -import warnings -import time -from app.analytics.services.main import SalaryAnalyticsPipeline -from app.config import OUTPUT_PATHS, TABLE_NAME -from app.analytics.services.data_loader import DataLoader -from app.analytics.middlewares.middleware import add_middlewares -from app.models.db_operations import DatabaseOperations -from app.analytics.integrations.salary_detect import SalaryDetect -from app.utils.logger import logger -from app.analytics.helpers.response_helpers import AnalysisResponse, BatchResponse - - -# Suppress warnings -warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy') -pd.options.mode.chained_assignment = None - -app = FastAPI( - title="Salary Analytics API", - description="API for analyzing and predicting salary patterns from transaction data", - version="1.0.0" -) - -# Add CORS middleware -add_middlewares(app) - -# Global pipeline instance -pipeline = SalaryAnalyticsPipeline() - -# Global variables to store loaded data and models -data_loader = None -df = None -salary_predictor = None -salary_earner_analyzer = None - -salary_detect = SalaryDetect() - - - -def check_data_loaded(): - """Check if data is loaded before running analytics.""" - if pipeline.df is None: - raise HTTPException( - status_code=400, - detail="No data loaded. Please load data first using the /load-data endpoint." - ) - -@app.on_event("startup") -async def startup_event(): - """Initialize the pipeline on startup.""" - try: - logger.info("Initializing pipeline...") - - # Start autonomous salary detection loop - salary_detect.start() - logger.info("Started autonomous salary detection loop.") - - # Print network information - hostname = socket.gethostname() - ip_address = socket.gethostbyname(hostname) - logger.info(f"Server running on hostname: {hostname}") - logger.info(f"Server IP address: {ip_address}") - logger.info(f"Server is accessible at:") - logger.info(f"- http://localhost:8000") - logger.info(f"- http://127.0.0.1:8000") - logger.info(f"- http://{ip_address}:8000") - logger.info("Pipeline initialized successfully") - except Exception as e: - logger.error(f"Error during startup: {str(e)}") - raise - - - -@app.get("/") -async def root(): - """Root endpoint.""" - start_time = time.time() - logger.info("Root endpoint accessed") - response = {"message": "Welcome to Salary Analytics API"} - logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds") - return response - - - -@app.get("/health") -async def health_check(): - """Health check endpoint.""" - start_time = time.time() - logger.info("Health check endpoint accessed") - response = {"status": "healthy"} - logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds") - return response - - - -@app.post("/analyze/keyword", response_model=AnalysisResponse) -async def analyze_keyword(): - """Run keyword-based salary transaction analysis.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting keyword analysis...") - data = pipeline.run_keyword_analysis() - logger.info(f"Keyword analysis completed. Found {len(data)} matches") - response = AnalysisResponse( - message="Keyword analysis completed successfully", - data={"count": len(data)} - ) - logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in keyword analysis: {str(e)}") - logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.post("/analyze/consistent-amount", response_model=AnalysisResponse) -async def analyze_consistent_amount(): - """Run consistent amount transaction analysis.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting consistent amount analysis...") - data = pipeline.run_consistent_amount_analysis() - logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") - response = AnalysisResponse( - message="Consistent amount analysis completed successfully", - data={"count": len(data)} - ) - logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in consistent amount analysis: {str(e)}") - logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.post("/analyze/transaction-type", response_model=AnalysisResponse) -async def analyze_transaction_type(): - """Run transaction type analysis.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting transaction type analysis...") - data = pipeline.run_transaction_type_analysis() - logger.info(f"Transaction type analysis completed. Found {len(data)} matches") - response = AnalysisResponse( - message="Transaction type analysis completed successfully", - data={"count": len(data)} - ) - logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in transaction type analysis: {str(e)}") - logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.post("/generate/reports", response_model=AnalysisResponse) -async def generate_reports(background_tasks: BackgroundTasks): - """Generate salary earner reports.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting report generation...") - reports = pipeline.generate_salary_earner_reports() - logger.info("Reports generated successfully") - response = AnalysisResponse( - message="Reports generated successfully", - data={ - "verified_salary_earners": len(reports['final_table']), - "likely_salary_earners": len(reports['likely_salary_earner']), - "high_earners": reports['total_high_earners'] - } - ) - logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in report generation: {str(e)}") - logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.post("/train/models", response_model=AnalysisResponse) -async def train_models(): - """Train salary prediction models.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting model training...") - pipeline.train_salary_prediction_models() - logger.info("Models trained successfully") - response = AnalysisResponse( - message="Models trained successfully" - ) - logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in model training: {str(e)}") - logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.get("/download/{report_type}") -async def download_report(report_type: str): - """Download generated reports.""" - start_time = time.time() - try: - check_data_loaded() - logger.info(f"Attempting to download report: {report_type}") - file_paths = { - "high_earners": OUTPUT_PATHS["high_earner_details"], - "likely_earners": OUTPUT_PATHS["likely_salary_earner"], - "final_table": OUTPUT_PATHS["final_table"], - "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"], - "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"], - "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"] - } - - if report_type not in file_paths: - logger.error(f"Report type not found: {report_type}") - logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=404, detail="Report type not found") - - file_path = file_paths[report_type] - if not os.path.exists(file_path): - logger.error(f"Report file not found: {file_path}") - logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=404, detail="Report file not found") - - logger.info(f"Successfully found report file: {file_path}") - response = FileResponse( - path=file_path, - filename=os.path.basename(file_path), - media_type="application/octet-stream" - ) - logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error downloading report: {str(e)}") - logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.post("/run/pipeline", response_model=AnalysisResponse) -async def run_full_pipeline(): - """Run the complete salary analytics pipeline.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting full pipeline...") - success = pipeline.run_full_pipeline() - if not success: - logger.error("Pipeline failed") - logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Pipeline failed") - - logger.info("Pipeline completed successfully") - response = AnalysisResponse( - message="Pipeline completed successfully" - ) - logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in pipeline: {str(e)}") - logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - - - -@app.post("/load-data") -async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): - """ - Load data from either database or CSV file. - - Args: - source (str): Source of data ('db' or 'csv') - file (UploadFile, optional): CSV file to load (required if source is 'csv') - - Returns: - dict: Status of data loading - """ - start_time = time.time() - try: - if source not in ['db', 'csv']: - logger.error(f"Invalid source: {source}") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - - if source == 'csv' and not file: - logger.error("No file provided for CSV source") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - - if source == 'csv': - # Save uploaded file temporarily - with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: - content = await file.read() - temp_file.write(content) - temp_file_path = temp_file.name - - try: - success = pipeline.load_data(source='csv', file_path=temp_file_path) - finally: - # Clean up temporary file - os.unlink(temp_file_path) - else: - success = pipeline.load_data(source='db') - - if not success: - logger.error("Failed to load data") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Failed to load data") - - response = { - "status": "success", - "message": f"Successfully loaded {len(pipeline.df)} rows of data", - "columns": pipeline.df.columns.tolist(), - "row_count": len(pipeline.df) - } - logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error loading data: {str(e)}") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): - """Dependency to handle file upload only when source is csv.""" - if source == 'csv' and not file: - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - return file - - - -@app.post("/run/streaming-pipeline", response_model=List[BatchResponse]) -async def run_streaming_pipeline( - source: str = "db", - batch_size: int = 10000, - file: Optional[Union[UploadFile, str]] = File(None) -): - """ - Run the complete salary analytics pipeline in batches. - - Args: - source (str): Source of data ('db' or 'csv') - batch_size (int): Number of rows to process in each batch - file (UploadFile, optional): CSV file to load (required if source is 'csv') - - Returns: - List[BatchResponse]: List of responses for each batch processed - """ - start_time = time.time() - try: - if source not in ['db', 'csv']: - logger.error(f"Invalid source: {source}") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - - if source == 'csv' and not file: - logger.error("No file provided for CSV source") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - - # Initialize data loader - data_loader = DataLoader() - data_loader.chunk_size = batch_size - - # Create output directory for batch results - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") - os.makedirs(batch_output_dir, exist_ok=True) - - # Initialize database operations - if not data_loader.connect(): - logger.error("Failed to connect to database") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Failed to connect to database") - - db_ops = DatabaseOperations(data_loader.engine) - if not db_ops.create_batch_results_table(): - logger.error("Failed to create batch results table") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Failed to create batch results table") - - responses = [] - batch_number = 0 - batch_start_time = time.time() - - def preprocess_chunk(chunk): - """Preprocess a chunk of data with the same logic as DataLoader.""" - # Convert dates - chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date']) - chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date']) - - # Rename columns - chunk = chunk.rename(columns={ - 'd1': 'trx_type', - 'd2': 'trx_subtype', - 'd3': 'initiated_by', - 'd4': 'customer_id' - }) - - chunk = chunk.dropna() - - return chunk - - if source == 'csv': - # Save uploaded file temporarily - with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: - content = await file.read() - temp_file.write(content) - temp_file_path = temp_file.name - - try: - # Process CSV in chunks - for chunk in pd.read_csv(temp_file_path, chunksize=batch_size): - batch_number += 1 - logger.info(f"Processing batch {batch_number}") - - # Preprocess chunk - chunk = preprocess_chunk(chunk) - - # Run pipeline on chunk - pipeline = SalaryAnalyticsPipeline() - pipeline.df = chunk - - try: - batch_start_time = time.time() - # Run analyses - pipeline.run_keyword_analysis() - pipeline.run_consistent_amount_analysis() - pipeline.run_transaction_type_analysis() - - # Generate reports - reports = pipeline.generate_salary_earner_reports() - - # Add batch metadata to results - results_df = reports['final_table'].copy() - results_df['batch_number'] = batch_number - results_df['total_batches'] = -1 # Unknown for CSV - results_df['processed_at'] = datetime.now() - - # Save batch results to CSV - batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") - results_df.to_csv(batch_results_path, index=False) - - # Save to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=-1, # Unknown for CSV - results_df=results_df, - status="success" - ) - - logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds") - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=-1, # Unknown for CSV - processed_rows=len(chunk), - results_path=batch_results_path, - message=f"Successfully processed batch {batch_number}" - )) - except Exception as e: - error_message = str(e) - logger.error(f"Error processing batch {batch_number}: {error_message}") - - # Save error to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=-1, - results_df=pd.DataFrame(), # Empty DataFrame for error case - status="error" - ) - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=-1, - processed_rows=len(chunk), - results_path="", - message=f"Error processing batch {batch_number}: {error_message}" - )) - finally: - # Clean up temporary file - os.unlink(temp_file_path) - else: - # Process database in chunks - if not data_loader.connect(): - raise HTTPException(status_code=500, detail="Failed to connect to database") - - # Get total row count - with data_loader.engine.connect() as conn: - count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}") - total_rows = conn.execute(count_query).scalar() - - total_batches = (total_rows + batch_size - 1) // batch_size - offset = 0 - - while offset < total_rows: - batch_number += 1 - logger.info(f"Processing batch {batch_number} of {total_batches}") - - # Load chunk from database - query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}" - chunk = pd.read_sql(query, data_loader.engine) - - if chunk.empty: - break - - # Preprocess chunk - chunk = preprocess_chunk(chunk) - - # Run pipeline on chunk - pipeline = SalaryAnalyticsPipeline() - pipeline.df = chunk - - try: - batch_start_time = time.time() - # Run analyses - pipeline.run_keyword_analysis() - pipeline.run_consistent_amount_analysis() - pipeline.run_transaction_type_analysis() - - # Generate reports - reports = pipeline.generate_salary_earner_reports() - - # Add batch metadata to results - results_df = reports['final_table'].copy() - results_df['batch_number'] = batch_number - results_df['total_batches'] = total_batches - results_df['processed_at'] = datetime.now() - - # Save batch results to CSV - batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") - results_df.to_csv(batch_results_path, index=False) - - # Save to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=total_batches, - results_df=results_df, - status="success" - ) - - logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds") - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=total_batches, - processed_rows=len(chunk), - results_path=batch_results_path, - message=f"Successfully processed batch {batch_number} of {total_batches}" - )) - except Exception as e: - error_message = str(e) - logger.error(f"Error processing batch {batch_number}: {error_message}") - - # Save error to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=total_batches, - results_df=pd.DataFrame(), # Empty DataFrame for error case - status="error" - ) - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=total_batches, - processed_rows=len(chunk), - results_path="", - message=f"Error processing batch {batch_number}: {error_message}" - )) - - offset += batch_size - - logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds") - return responses - except Exception as e: - logger.error(f"Error in streaming pipeline: {str(e)}") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/app/api.py b/app/api.py new file mode 100644 index 0000000..3a65527 --- /dev/null +++ b/app/api.py @@ -0,0 +1,601 @@ +""" +FastAPI application for salary analytics. +""" + +from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends +from fastapi.responses import FileResponse +import os +import socket +from typing import Optional, List, Union +import pandas as pd +import tempfile +from datetime import datetime +from sqlalchemy import text +import warnings +import time +from app.salary_analytics.services.main import SalaryAnalyticsPipeline +from app.config import OUTPUT_PATHS, TABLE_NAME +from app.salary_analytics.services.data_loader import DataLoader +from app.salary_analytics.middlewares.middleware import add_middlewares +from app.models.db_operations import DatabaseOperations +from app.salary_analytics.integrations.salary_detect import SalaryDetect +from app.utils.logger import logger +from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse + + +# Suppress warnings +warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy') +pd.options.mode.chained_assignment = None + +app = FastAPI( + title="Salary Analytics API", + description="API for analyzing and predicting salary patterns from transaction data", + version="1.0.0" +) + +# Add CORS middleware +add_middlewares(app) + +# Global pipeline instance +pipeline = SalaryAnalyticsPipeline() + +# Global variables to store loaded data and models +data_loader = None +df = None +salary_predictor = None +salary_earner_analyzer = None + +# salary_detect = SalaryDetect() + + + +# def check_data_loaded(): +# """Check if data is loaded before running analytics.""" +# if pipeline.df is None: +# raise HTTPException( +# status_code=400, +# detail="No data loaded. Please load data first using the /load-data endpoint." +# ) + +# @app.on_event("startup") +# async def startup_event(): +# """Initialize the pipeline on startup.""" +# try: +# logger.info("Initializing pipeline...") + +# # Start autonomous salary detection loop +# salary_detect.start() +# logger.info("Started autonomous salary detection loop.") + +# # Print network information +# hostname = socket.gethostname() +# ip_address = socket.gethostbyname(hostname) +# logger.info(f"Server running on hostname: {hostname}") +# logger.info(f"Server IP address: {ip_address}") +# logger.info(f"Server is accessible at:") +# logger.info(f"- http://localhost:8000") +# logger.info(f"- http://127.0.0.1:8000") +# logger.info(f"- http://{ip_address}:8000") +# logger.info("Pipeline initialized successfully") +# except Exception as e: +# logger.error(f"Error during startup: {str(e)}") +# raise + + + +# @app.get("/") +# async def root(): +# """Root endpoint.""" +# start_time = time.time() +# logger.info("Root endpoint accessed") +# response = {"message": "Welcome to Salary Analytics API"} +# logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds") +# return response + + + +# @app.get("/health") +# async def health_check(): +# """Health check endpoint.""" +# start_time = time.time() +# logger.info("Health check endpoint accessed") +# response = {"status": "healthy"} +# logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds") +# return response + + + +# @app.post("/analyze/keyword", response_model=AnalysisResponse) +# async def analyze_keyword(): +# """Run keyword-based salary transaction analysis.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info("Starting keyword analysis...") +# data = pipeline.run_keyword_analysis() +# logger.info(f"Keyword analysis completed. Found {len(data)} matches") +# response = AnalysisResponse( +# message="Keyword analysis completed successfully", +# data={"count": len(data)} +# ) +# logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error in keyword analysis: {str(e)}") +# logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.post("/analyze/consistent-amount", response_model=AnalysisResponse) +# async def analyze_consistent_amount(): +# """Run consistent amount transaction analysis.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info("Starting consistent amount analysis...") +# data = pipeline.run_consistent_amount_analysis() +# logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") +# response = AnalysisResponse( +# message="Consistent amount analysis completed successfully", +# data={"count": len(data)} +# ) +# logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error in consistent amount analysis: {str(e)}") +# logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.post("/analyze/transaction-type", response_model=AnalysisResponse) +# async def analyze_transaction_type(): +# """Run transaction type analysis.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info("Starting transaction type analysis...") +# data = pipeline.run_transaction_type_analysis() +# logger.info(f"Transaction type analysis completed. Found {len(data)} matches") +# response = AnalysisResponse( +# message="Transaction type analysis completed successfully", +# data={"count": len(data)} +# ) +# logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error in transaction type analysis: {str(e)}") +# logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.post("/generate/reports", response_model=AnalysisResponse) +# async def generate_reports(background_tasks: BackgroundTasks): +# """Generate salary earner reports.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info("Starting report generation...") +# reports = pipeline.generate_salary_earner_reports() +# logger.info("Reports generated successfully") +# response = AnalysisResponse( +# message="Reports generated successfully", +# data={ +# "verified_salary_earners": len(reports['final_table']), +# "likely_salary_earners": len(reports['likely_salary_earner']), +# "high_earners": reports['total_high_earners'] +# } +# ) +# logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error in report generation: {str(e)}") +# logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.post("/train/models", response_model=AnalysisResponse) +# async def train_models(): +# """Train salary prediction models.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info("Starting model training...") +# pipeline.train_salary_prediction_models() +# logger.info("Models trained successfully") +# response = AnalysisResponse( +# message="Models trained successfully" +# ) +# logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error in model training: {str(e)}") +# logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.get("/download/{report_type}") +# async def download_report(report_type: str): +# """Download generated reports.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info(f"Attempting to download report: {report_type}") +# file_paths = { +# "high_earners": OUTPUT_PATHS["high_earner_details"], +# "likely_earners": OUTPUT_PATHS["likely_salary_earner"], +# "final_table": OUTPUT_PATHS["final_table"], +# "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"], +# "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"], +# "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"] +# } + +# if report_type not in file_paths: +# logger.error(f"Report type not found: {report_type}") +# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=404, detail="Report type not found") + +# file_path = file_paths[report_type] +# if not os.path.exists(file_path): +# logger.error(f"Report file not found: {file_path}") +# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=404, detail="Report file not found") + +# logger.info(f"Successfully found report file: {file_path}") +# response = FileResponse( +# path=file_path, +# filename=os.path.basename(file_path), +# media_type="application/octet-stream" +# ) +# logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error downloading report: {str(e)}") +# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.post("/run/pipeline", response_model=AnalysisResponse) +# async def run_full_pipeline(): +# """Run the complete salary analytics pipeline.""" +# start_time = time.time() +# try: +# check_data_loaded() +# logger.info("Starting full pipeline...") +# success = pipeline.run_full_pipeline() +# if not success: +# logger.error("Pipeline failed") +# logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail="Pipeline failed") + +# logger.info("Pipeline completed successfully") +# response = AnalysisResponse( +# message="Pipeline completed successfully" +# ) +# logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error in pipeline: {str(e)}") +# logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + + + +# @app.post("/load-data") +# async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): +# """ +# Load data from either database or CSV file. + +# Args: +# source (str): Source of data ('db' or 'csv') +# file (UploadFile, optional): CSV file to load (required if source is 'csv') + +# Returns: +# dict: Status of data loading +# """ +# start_time = time.time() +# try: +# if source not in ['db', 'csv']: +# logger.error(f"Invalid source: {source}") +# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") + +# if source == 'csv' and not file: +# logger.error("No file provided for CSV source") +# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + +# if source == 'csv': +# # Save uploaded file temporarily +# with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: +# content = await file.read() +# temp_file.write(content) +# temp_file_path = temp_file.name + +# try: +# success = pipeline.load_data(source='csv', file_path=temp_file_path) +# finally: +# # Clean up temporary file +# os.unlink(temp_file_path) +# else: +# success = pipeline.load_data(source='db') + +# if not success: +# logger.error("Failed to load data") +# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail="Failed to load data") + +# response = { +# "status": "success", +# "message": f"Successfully loaded {len(pipeline.df)} rows of data", +# "columns": pipeline.df.columns.tolist(), +# "row_count": len(pipeline.df) +# } +# logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") +# return response +# except Exception as e: +# logger.error(f"Error loading data: {str(e)}") +# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + +# async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): +# """Dependency to handle file upload only when source is csv.""" +# if source == 'csv' and not file: +# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") +# return file + + + +# @app.post("/run/streaming-pipeline", response_model=List[BatchResponse]) +# async def run_streaming_pipeline( +# source: str = "db", +# batch_size: int = 10000, +# file: Optional[Union[UploadFile, str]] = File(None) +# ): +# """ +# Run the complete salary analytics pipeline in batches. + +# Args: +# source (str): Source of data ('db' or 'csv') +# batch_size (int): Number of rows to process in each batch +# file (UploadFile, optional): CSV file to load (required if source is 'csv') + +# Returns: +# List[BatchResponse]: List of responses for each batch processed +# """ +# start_time = time.time() +# try: +# if source not in ['db', 'csv']: +# logger.error(f"Invalid source: {source}") +# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") + +# if source == 'csv' and not file: +# logger.error("No file provided for CSV source") +# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + +# # Initialize data loader +# data_loader = DataLoader() +# data_loader.chunk_size = batch_size + +# # Create output directory for batch results +# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") +# batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") +# os.makedirs(batch_output_dir, exist_ok=True) + +# # Initialize database operations +# if not data_loader.connect(): +# logger.error("Failed to connect to database") +# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail="Failed to connect to database") + +# db_ops = DatabaseOperations(data_loader.engine) +# if not db_ops.create_batch_results_table(): +# logger.error("Failed to create batch results table") +# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail="Failed to create batch results table") + +# responses = [] +# batch_number = 0 +# batch_start_time = time.time() + +# def preprocess_chunk(chunk): +# """Preprocess a chunk of data with the same logic as DataLoader.""" +# # Convert dates +# chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date']) +# chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date']) + +# # Rename columns +# chunk = chunk.rename(columns={ +# 'd1': 'trx_type', +# 'd2': 'trx_subtype', +# 'd3': 'initiated_by', +# 'd4': 'customer_id' +# }) + +# chunk = chunk.dropna() + +# return chunk + +# if source == 'csv': +# # Save uploaded file temporarily +# with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: +# content = await file.read() +# temp_file.write(content) +# temp_file_path = temp_file.name + +# try: +# # Process CSV in chunks +# for chunk in pd.read_csv(temp_file_path, chunksize=batch_size): +# batch_number += 1 +# logger.info(f"Processing batch {batch_number}") + +# # Preprocess chunk +# chunk = preprocess_chunk(chunk) + +# # Run pipeline on chunk +# pipeline = SalaryAnalyticsPipeline() +# pipeline.df = chunk + +# try: +# batch_start_time = time.time() +# # Run analyses +# pipeline.run_keyword_analysis() +# pipeline.run_consistent_amount_analysis() +# pipeline.run_transaction_type_analysis() + +# # Generate reports +# reports = pipeline.generate_salary_earner_reports() + +# # Add batch metadata to results +# results_df = reports['final_table'].copy() +# results_df['batch_number'] = batch_number +# results_df['total_batches'] = -1 # Unknown for CSV +# results_df['processed_at'] = datetime.now() + +# # Save batch results to CSV +# batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") +# results_df.to_csv(batch_results_path, index=False) + +# # Save to database +# db_ops.save_batch_to_db( +# batch_number=batch_number, +# total_batches=-1, # Unknown for CSV +# results_df=results_df, +# status="success" +# ) + +# logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds") + +# responses.append(BatchResponse( +# batch_number=batch_number, +# total_batches=-1, # Unknown for CSV +# processed_rows=len(chunk), +# results_path=batch_results_path, +# message=f"Successfully processed batch {batch_number}" +# )) +# except Exception as e: +# error_message = str(e) +# logger.error(f"Error processing batch {batch_number}: {error_message}") + +# # Save error to database +# db_ops.save_batch_to_db( +# batch_number=batch_number, +# total_batches=-1, +# results_df=pd.DataFrame(), # Empty DataFrame for error case +# status="error" +# ) + +# responses.append(BatchResponse( +# batch_number=batch_number, +# total_batches=-1, +# processed_rows=len(chunk), +# results_path="", +# message=f"Error processing batch {batch_number}: {error_message}" +# )) +# finally: +# # Clean up temporary file +# os.unlink(temp_file_path) +# else: +# # Process database in chunks +# if not data_loader.connect(): +# raise HTTPException(status_code=500, detail="Failed to connect to database") + +# # Get total row count +# with data_loader.engine.connect() as conn: +# count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}") +# total_rows = conn.execute(count_query).scalar() + +# total_batches = (total_rows + batch_size - 1) // batch_size +# offset = 0 + +# while offset < total_rows: +# batch_number += 1 +# logger.info(f"Processing batch {batch_number} of {total_batches}") + +# # Load chunk from database +# query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}" +# chunk = pd.read_sql(query, data_loader.engine) + +# if chunk.empty: +# break + +# # Preprocess chunk +# chunk = preprocess_chunk(chunk) + +# # Run pipeline on chunk +# pipeline = SalaryAnalyticsPipeline() +# pipeline.df = chunk + +# try: +# batch_start_time = time.time() +# # Run analyses +# pipeline.run_keyword_analysis() +# pipeline.run_consistent_amount_analysis() +# pipeline.run_transaction_type_analysis() + +# # Generate reports +# reports = pipeline.generate_salary_earner_reports() + +# # Add batch metadata to results +# results_df = reports['final_table'].copy() +# results_df['batch_number'] = batch_number +# results_df['total_batches'] = total_batches +# results_df['processed_at'] = datetime.now() + +# # Save batch results to CSV +# batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") +# results_df.to_csv(batch_results_path, index=False) + +# # Save to database +# db_ops.save_batch_to_db( +# batch_number=batch_number, +# total_batches=total_batches, +# results_df=results_df, +# status="success" +# ) + +# logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds") + +# responses.append(BatchResponse( +# batch_number=batch_number, +# total_batches=total_batches, +# processed_rows=len(chunk), +# results_path=batch_results_path, +# message=f"Successfully processed batch {batch_number} of {total_batches}" +# )) +# except Exception as e: +# error_message = str(e) +# logger.error(f"Error processing batch {batch_number}: {error_message}") + +# # Save error to database +# db_ops.save_batch_to_db( +# batch_number=batch_number, +# total_batches=total_batches, +# results_df=pd.DataFrame(), # Empty DataFrame for error case +# status="error" +# ) + +# responses.append(BatchResponse( +# batch_number=batch_number, +# total_batches=total_batches, +# processed_rows=len(chunk), +# results_path="", +# message=f"Error processing batch {batch_number}: {error_message}" +# )) + +# offset += batch_size + +# logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds") +# return responses +# except Exception as e: +# logger.error(f"Error in streaming pipeline: {str(e)}") +# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") +# raise HTTPException(status_code=500, detail=str(e)) + diff --git a/app/analytics/commands/commands.py b/app/api/commands/commands.py similarity index 100% rename from app/analytics/commands/commands.py rename to app/api/commands/commands.py diff --git a/app/analytics/helpers/response_helpers.py b/app/salary_analytics/helpers/response_helpers.py similarity index 100% rename from app/analytics/helpers/response_helpers.py rename to app/salary_analytics/helpers/response_helpers.py diff --git a/app/analytics/integrations/rac_check.py b/app/salary_analytics/integrations/rac_check.py similarity index 100% rename from app/analytics/integrations/rac_check.py rename to app/salary_analytics/integrations/rac_check.py diff --git a/app/analytics/integrations/salary_detect.py b/app/salary_analytics/integrations/salary_detect.py similarity index 92% rename from app/analytics/integrations/salary_detect.py rename to app/salary_analytics/integrations/salary_detect.py index 684abcb..441dd02 100644 --- a/app/analytics/integrations/salary_detect.py +++ b/app/salary_analytics/integrations/salary_detect.py @@ -1,7 +1,7 @@ import time import threading import requests -from ...config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload +from app.config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload from app.utils.logger import logger class SalaryDetect: diff --git a/app/analytics/middlewares/middleware.py b/app/salary_analytics/middlewares/middleware.py similarity index 100% rename from app/analytics/middlewares/middleware.py rename to app/salary_analytics/middlewares/middleware.py diff --git a/app/salary_analytics/routes/analysis.py b/app/salary_analytics/routes/analysis.py new file mode 100644 index 0000000..e6c6688 --- /dev/null +++ b/app/salary_analytics/routes/analysis.py @@ -0,0 +1,75 @@ +from fastapi import APIRouter, HTTPException +from app.salary_analytics.helpers.response_helpers import AnalysisResponse +from app.salary_analytics.helpers.data_checks import check_data_loaded +from app.utils.logger import logger +import time +from app.salary_analytics.core.state import state + +router = APIRouter() + + +@router.post("/keyword", response_model=AnalysisResponse) +async def analyze_keyword(): + """Run keyword-based salary transaction analysis.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting keyword analysis...") + data = state.pipeline.run_keyword_analysis() + logger.info(f"Keyword analysis completed. Found {len(data)} matches") + response = AnalysisResponse( + message="Keyword analysis completed successfully", + data={"count": len(data)} + ) + logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in keyword analysis: {str(e)}") + logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + + +@router.post("/consistent-amount", response_model=AnalysisResponse) +async def analyze_consistent_amount(): + """Run consistent amount transaction analysis.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting consistent amount analysis...") + data = state.pipeline.run_consistent_amount_analysis() + logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") + response = AnalysisResponse( + message="Consistent amount analysis completed successfully", + data={"count": len(data)} + ) + logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in consistent amount analysis: {str(e)}") + logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + + + +@router.post("/transaction-type", response_model=AnalysisResponse) +async def analyze_transaction_type(): + """Run transaction type analysis.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting transaction type analysis...") + data = state.pipeline.run_transaction_type_analysis() + logger.info(f"Transaction type analysis completed. Found {len(data)} matches") + response = AnalysisResponse( + message="Transaction type analysis completed successfully", + data={"count": len(data)} + ) + logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in transaction type analysis: {str(e)}") + logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/app/salary_analytics/routes/load.py b/app/salary_analytics/routes/load.py new file mode 100644 index 0000000..ac6802c --- /dev/null +++ b/app/salary_analytics/routes/load.py @@ -0,0 +1,74 @@ +from fastapi import APIRouter, HTTPException, UploadFile, File +from app.salary_analytics.core.state import state +from app.utils.logger import logger +import tempfile, os, time +from typing import Optional + +router = APIRouter() + + + +@router.post("/load-data") +async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): + """ + Load data from either database or CSV file. + + Args: + source (str): Source of data ('db' or 'csv') + file (UploadFile, optional): CSV file to load (required if source is 'csv') + + Returns: + dict: Status of data loading + """ + start_time = time.time() + try: + if source not in ['db', 'csv']: + logger.error(f"Invalid source: {source}") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") + + if source == 'csv' and not file: + logger.error("No file provided for CSV source") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + + if source == 'csv': + # Save uploaded file temporarily + with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: + content = await file.read() + temp_file.write(content) + temp_file_path = temp_file.name + + try: + success = state.pipeline.load_data(source='csv', file_path=temp_file_path) + finally: + # Clean up temporary file + os.unlink(temp_file_path) + else: + success = state.pipeline.load_data(source='db') + + if not success: + logger.error("Failed to load data") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Failed to load data") + + response = { + "status": "success", + "message": f"Successfully loaded {len(state.pipeline.df)} rows of data", + "columns": state.pipeline.df.columns.tolist(), + "row_count": len(state.pipeline.df) + } + logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error loading data: {str(e)}") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/load-data-with-file") +async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): + """Dependency to handle file upload only when source is csv.""" + if source == 'csv' and not file: + raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + return file diff --git a/app/salary_analytics/routes/pipeline.py b/app/salary_analytics/routes/pipeline.py new file mode 100644 index 0000000..db08518 --- /dev/null +++ b/app/salary_analytics/routes/pipeline.py @@ -0,0 +1,292 @@ +from fastapi import APIRouter, HTTPException +from app.salary_analytics.services.main import SalaryAnalyticsPipeline +from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse +from app.salary_analytics.helpers.data_checks import check_data_loaded +from app.salary_analytics.services.data_loader import DataLoader +from app.salary_analytics.core.state import state +from app.models.db_operations import DatabaseOperations +from app.config import OUTPUT_PATHS, TABLE_NAME +from app.utils.logger import logger +from typing import Optional, List, Union +from sqlalchemy import text +from datetime import datetime +import pandas as pd, os, tempfile, time +from typing import Optional, Union +from fastapi import UploadFile, File + +router = APIRouter() + + +@router.post("/run/pipeline", response_model=AnalysisResponse) +async def run_full_pipeline(): + """Run the complete salary analytics pipeline.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting full pipeline...") + success = state.pipeline.run_full_pipeline() + if not success: + logger.error("Pipeline failed") + logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Pipeline failed") + + logger.info("Pipeline completed successfully") + response = AnalysisResponse( + message="Pipeline completed successfully" + ) + logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in pipeline: {str(e)}") + logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + +@router.post("/run/streaming-pipeline", response_model=List[BatchResponse]) +async def run_streaming_pipeline( + source: str = "db", + batch_size: int = 10000, + file: Optional[Union[UploadFile, str]] = File(None) +): + """ + Run the complete salary analytics pipeline in batches. + + Args: + source (str): Source of data ('db' or 'csv') + batch_size (int): Number of rows to process in each batch + file (UploadFile, optional): CSV file to load (required if source is 'csv') + + Returns: + List[BatchResponse]: List of responses for each batch processed + """ + start_time = time.time() + try: + if source not in ['db', 'csv']: + logger.error(f"Invalid source: {source}") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") + + if source == 'csv' and not file: + logger.error("No file provided for CSV source") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + + # Initialize data loader + state.data_loader = DataLoader() + state.data_loader.chunk_size = batch_size + + # Create output directory for batch results + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") + os.makedirs(batch_output_dir, exist_ok=True) + + # Initialize database operations + if not state.data_loader.connect(): + logger.error("Failed to connect to database") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Failed to connect to database") + + db_ops = DatabaseOperations(state.data_loader.engine) + if not db_ops.create_batch_results_table(): + logger.error("Failed to create batch results table") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Failed to create batch results table") + + responses = [] + batch_number = 0 + batch_start_time = time.time() + + def preprocess_chunk(chunk): + """Preprocess a chunk of data with the same logic as DataLoader.""" + # Convert dates + chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date']) + chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date']) + + # Rename columns + chunk = chunk.rename(columns={ + 'd1': 'trx_type', + 'd2': 'trx_subtype', + 'd3': 'initiated_by', + 'd4': 'customer_id' + }) + + chunk = chunk.dropna() + + return chunk + + if source == 'csv': + # Save uploaded file temporarily + with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: + content = await file.read() + temp_file.write(content) + temp_file_path = temp_file.name + + try: + # Process CSV in chunks + for chunk in pd.read_csv(temp_file_path, chunksize=batch_size): + batch_number += 1 + logger.info(f"Processing batch {batch_number}") + + # Preprocess chunk + chunk = preprocess_chunk(chunk) + + # Run pipeline on chunk + state.pipeline = SalaryAnalyticsPipeline() + state.pipeline.df = chunk + + try: + batch_start_time = time.time() + # Run analyses + state.pipeline.run_keyword_analysis() + state.pipeline.run_consistent_amount_analysis() + state.pipeline.run_transaction_type_analysis() + + # Generate reports + reports = state.pipeline.generate_salary_earner_reports() + + # Add batch metadata to results + results_df = reports['final_table'].copy() + results_df['batch_number'] = batch_number + results_df['total_batches'] = -1 # Unknown for CSV + results_df['processed_at'] = datetime.now() + + # Save batch results to CSV + batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") + results_df.to_csv(batch_results_path, index=False) + + # Save to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=-1, # Unknown for CSV + results_df=results_df, + status="success" + ) + + logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds") + + responses.append(BatchResponse( + batch_number=batch_number, + total_batches=-1, # Unknown for CSV + processed_rows=len(chunk), + results_path=batch_results_path, + message=f"Successfully processed batch {batch_number}" + )) + except Exception as e: + error_message = str(e) + logger.error(f"Error processing batch {batch_number}: {error_message}") + + # Save error to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=-1, + results_df=pd.DataFrame(), # Empty DataFrame for error case + status="error" + ) + + responses.append(BatchResponse( + batch_number=batch_number, + total_batches=-1, + processed_rows=len(chunk), + results_path="", + message=f"Error processing batch {batch_number}: {error_message}" + )) + finally: + # Clean up temporary file + os.unlink(temp_file_path) + else: + # Process database in chunks + if not state.data_loader.connect(): + raise HTTPException(status_code=500, detail="Failed to connect to database") + + # Get total row count + with state.data_loader.engine.connect() as conn: + count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}") + total_rows = conn.execute(count_query).scalar() + + total_batches = (total_rows + batch_size - 1) // batch_size + offset = 0 + + while offset < total_rows: + batch_number += 1 + logger.info(f"Processing batch {batch_number} of {total_batches}") + + # Load chunk from database + query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}" + chunk = pd.read_sql(query, state.data_loader.engine) + + if chunk.empty: + break + + # Preprocess chunk + chunk = preprocess_chunk(chunk) + + # Run pipeline on chunk + pipeline = SalaryAnalyticsPipeline() + state.pipeline.df = chunk + + try: + batch_start_time = time.time() + # Run analyses + state.pipeline.run_keyword_analysis() + state.pipeline.run_consistent_amount_analysis() + state.pipeline.run_transaction_type_analysis() + + # Generate reports + reports = state.pipeline.generate_salary_earner_reports() + + # Add batch metadata to results + results_df = reports['final_table'].copy() + results_df['batch_number'] = batch_number + results_df['total_batches'] = total_batches + results_df['processed_at'] = datetime.now() + + # Save batch results to CSV + batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") + results_df.to_csv(batch_results_path, index=False) + + # Save to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=total_batches, + results_df=results_df, + status="success" + ) + + logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds") + + responses.append(BatchResponse( + batch_number=batch_number, + total_batches=total_batches, + processed_rows=len(chunk), + results_path=batch_results_path, + message=f"Successfully processed batch {batch_number} of {total_batches}" + )) + except Exception as e: + error_message = str(e) + logger.error(f"Error processing batch {batch_number}: {error_message}") + + # Save error to database + db_ops.save_batch_to_db( + batch_number=batch_number, + total_batches=total_batches, + results_df=pd.DataFrame(), # Empty DataFrame for error case + status="error" + ) + + responses.append(BatchResponse( + batch_number=batch_number, + total_batches=total_batches, + processed_rows=len(chunk), + results_path="", + message=f"Error processing batch {batch_number}: {error_message}" + )) + + offset += batch_size + + logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds") + return responses + except Exception as e: + logger.error(f"Error in streaming pipeline: {str(e)}") + logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + \ No newline at end of file diff --git a/app/salary_analytics/routes/reports.py b/app/salary_analytics/routes/reports.py new file mode 100644 index 0000000..24f9477 --- /dev/null +++ b/app/salary_analytics/routes/reports.py @@ -0,0 +1,78 @@ +from fastapi import APIRouter, HTTPException, BackgroundTasks +from fastapi.responses import FileResponse +from app.salary_analytics.helpers.response_helpers import AnalysisResponse +from app.salary_analytics.helpers.data_checks import check_data_loaded +from app.salary_analytics.core.state import state +from app.config import OUTPUT_PATHS +from app.utils.logger import logger +import os, time + +router = APIRouter() + + +@router.post("/generate/reports", response_model=AnalysisResponse) +async def generate_reports(background_tasks: BackgroundTasks): + """Generate salary earner reports.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting report generation...") + reports = state.pipeline.generate_salary_earner_reports() + logger.info("Reports generated successfully") + response = AnalysisResponse( + message="Reports generated successfully", + data={ + "verified_salary_earners": len(reports['final_table']), + "likely_salary_earners": len(reports['likely_salary_earner']), + "high_earners": reports['total_high_earners'] + } + ) + logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in report generation: {str(e)}") + logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + + +@router.get("/download/{report_type}") +async def download_report(report_type: str): + """Download generated reports.""" + start_time = time.time() + try: + check_data_loaded() + logger.info(f"Attempting to download report: {report_type}") + file_paths = { + "high_earners": OUTPUT_PATHS["high_earner_details"], + "likely_earners": OUTPUT_PATHS["likely_salary_earner"], + "final_table": OUTPUT_PATHS["final_table"], + "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"], + "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"], + "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"] + } + + if report_type not in file_paths: + logger.error(f"Report type not found: {report_type}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=404, detail="Report type not found") + + file_path = file_paths[report_type] + if not os.path.exists(file_path): + logger.error(f"Report file not found: {file_path}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=404, detail="Report file not found") + + logger.info(f"Successfully found report file: {file_path}") + response = FileResponse( + path=file_path, + filename=os.path.basename(file_path), + media_type="application/octet-stream" + ) + logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error downloading report: {str(e)}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/app/analytics/services/__init__.py b/app/salary_analytics/services/__init__.py similarity index 100% rename from app/analytics/services/__init__.py rename to app/salary_analytics/services/__init__.py diff --git a/app/analytics/services/consistent_amount_analyzer.py b/app/salary_analytics/services/consistent_amount_analyzer.py similarity index 97% rename from app/analytics/services/consistent_amount_analyzer.py rename to app/salary_analytics/services/consistent_amount_analyzer.py index 3b74afc..119adae 100644 --- a/app/analytics/services/consistent_amount_analyzer.py +++ b/app/salary_analytics/services/consistent_amount_analyzer.py @@ -3,7 +3,7 @@ Consistent amount transaction analysis module. """ import pandas as pd -from .config import MODEL_CONFIG +from app.config import MODEL_CONFIG class ConsistentAmountAnalyzer: def __init__(self, df): diff --git a/app/analytics/services/data_loader.py b/app/salary_analytics/services/data_loader.py similarity index 98% rename from app/analytics/services/data_loader.py rename to app/salary_analytics/services/data_loader.py index e11b0e4..3aadd99 100644 --- a/app/analytics/services/data_loader.py +++ b/app/salary_analytics/services/data_loader.py @@ -7,7 +7,7 @@ import pandas as pd from datetime import datetime import logging import os -from .config import DB_CONFIG, TABLE_NAME +from app.config import DB_CONFIG, TABLE_NAME from app.utils.logger import logger class DataLoader: diff --git a/app/analytics/services/keyword_analyzer.py b/app/salary_analytics/services/keyword_analyzer.py similarity index 96% rename from app/analytics/services/keyword_analyzer.py rename to app/salary_analytics/services/keyword_analyzer.py index 5c45b0f..b9db479 100644 --- a/app/analytics/services/keyword_analyzer.py +++ b/app/salary_analytics/services/keyword_analyzer.py @@ -4,7 +4,7 @@ Keyword-based salary transaction analysis module. import re import pandas as pd -from .config import SALARY_KEYWORDS +from app.config import SALARY_KEYWORDS class KeywordAnalyzer: def __init__(self, df): diff --git a/app/analytics/services/main.py b/app/salary_analytics/services/main.py similarity index 100% rename from app/analytics/services/main.py rename to app/salary_analytics/services/main.py diff --git a/app/analytics/services/salary_earner_analyzer.py b/app/salary_analytics/services/salary_earner_analyzer.py similarity index 99% rename from app/analytics/services/salary_earner_analyzer.py rename to app/salary_analytics/services/salary_earner_analyzer.py index f17d4ce..7302cbc 100644 --- a/app/analytics/services/salary_earner_analyzer.py +++ b/app/salary_analytics/services/salary_earner_analyzer.py @@ -6,7 +6,7 @@ import pandas as pd import matplotlib.pyplot as plt from matplotlib_venn import venn3 from datetime import datetime, timedelta -from .config import MODEL_CONFIG, OUTPUT_PATHS +from app.config import MODEL_CONFIG, OUTPUT_PATHS from app.utils.logger import logger class SalaryEarnerAnalyzer: diff --git a/app/analytics/services/salary_predictor.py b/app/salary_analytics/services/salary_predictor.py similarity index 99% rename from app/analytics/services/salary_predictor.py rename to app/salary_analytics/services/salary_predictor.py index b74dfd5..19fc531 100644 --- a/app/analytics/services/salary_predictor.py +++ b/app/salary_analytics/services/salary_predictor.py @@ -9,7 +9,7 @@ from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from joblib import dump -from .config import OUTPUT_PATHS +from app.config import OUTPUT_PATHS class SalaryPredictor: def __init__(self, df): diff --git a/app/analytics/services/transaction_type_analyzer.py b/app/salary_analytics/services/transaction_type_analyzer.py similarity index 93% rename from app/analytics/services/transaction_type_analyzer.py rename to app/salary_analytics/services/transaction_type_analyzer.py index 2d1249c..0597aea 100644 --- a/app/analytics/services/transaction_type_analyzer.py +++ b/app/salary_analytics/services/transaction_type_analyzer.py @@ -3,7 +3,7 @@ Transaction type analysis module. """ import pandas as pd -from .config import MODEL_CONFIG +from app.config import MODEL_CONFIG class TransactionTypeAnalyzer: def __init__(self, df):