diff --git a/README.md b/README.md index cbc96a5..91850a6 100644 --- a/README.md +++ b/README.md @@ -119,6 +119,32 @@ uvicorn salary_analytics.api:app --reload 6. **Pipeline** - `POST /run/pipeline`: Run complete pipeline + - `POST /run/streaming-pipeline`: Run pipeline in batches + - Parameters: + - `source`: Data source ('db' or 'csv') + - `file`: CSV file (required if source is 'csv') + - `batch_size`: Number of rows to process in each batch (default: 10000) + - Example: + ```bash + # Run streaming pipeline from database + curl -X POST "http://localhost:8000/run/streaming-pipeline?source=db&batch_size=5000" + + # Run streaming pipeline from CSV + curl -X POST "http://localhost:8000/run/streaming-pipeline?source=csv&batch_size=5000" -F "file=@path/to/your/file.csv" + ``` + - Response (one object per batch; `total_batches` is `-1` for CSV sources, where the batch count is not known in advance): + ```json + [ + { + "batch_number": 1, + "total_batches": 10, + "processed_rows": 5000, + "results_path": "/app/output/csv/batch_results_20240315_123456/batch_1_results.csv", + "message": "Successfully processed batch 1 of 10" + }, + // ... more batch responses ... + ] + ``` ### Workflow @@ -127,6 +153,12 @@ uvicorn salary_analytics.api:app --reload 3. Run any of the analysis endpoints 4. Generate and download reports as needed +For large datasets, use the streaming pipeline endpoint: +1. Start the API server +2. Run the streaming pipeline with an appropriate batch size +3. Monitor batch processing progress in the server logs (the response is returned only after all batches finish) +4. Access results in the batch results directory + Note: All analysis endpoints require data to be loaded first. If you try to run any analysis without loading data, you'll receive a 400 error with a message to load data first. 
## Docker Deployment diff --git a/salary_analytics/__pycache__/api.cpython-311.pyc b/salary_analytics/__pycache__/api.cpython-311.pyc index 939b1f5..374ad32 100644 Binary files a/salary_analytics/__pycache__/api.cpython-311.pyc and b/salary_analytics/__pycache__/api.cpython-311.pyc differ diff --git a/salary_analytics/__pycache__/config.cpython-311.pyc b/salary_analytics/__pycache__/config.cpython-311.pyc index 66e50b5..7495a52 100644 Binary files a/salary_analytics/__pycache__/config.cpython-311.pyc and b/salary_analytics/__pycache__/config.cpython-311.pyc differ diff --git a/salary_analytics/__pycache__/data_loader.cpython-311.pyc b/salary_analytics/__pycache__/data_loader.cpython-311.pyc index c99c113..a747980 100644 Binary files a/salary_analytics/__pycache__/data_loader.cpython-311.pyc and b/salary_analytics/__pycache__/data_loader.cpython-311.pyc differ diff --git a/salary_analytics/__pycache__/main.cpython-311.pyc b/salary_analytics/__pycache__/main.cpython-311.pyc index 4f1a3ec..6727b9c 100644 Binary files a/salary_analytics/__pycache__/main.cpython-311.pyc and b/salary_analytics/__pycache__/main.cpython-311.pyc differ diff --git a/salary_analytics/__pycache__/salary_earner_analyzer.cpython-311.pyc b/salary_analytics/__pycache__/salary_earner_analyzer.cpython-311.pyc index d72e45e..828d0ba 100644 Binary files a/salary_analytics/__pycache__/salary_earner_analyzer.cpython-311.pyc and b/salary_analytics/__pycache__/salary_earner_analyzer.cpython-311.pyc differ diff --git a/salary_analytics/__pycache__/salary_predictor.cpython-311.pyc b/salary_analytics/__pycache__/salary_predictor.cpython-311.pyc index 264f9fc..1d8ced8 100644 Binary files a/salary_analytics/__pycache__/salary_predictor.cpython-311.pyc and b/salary_analytics/__pycache__/salary_predictor.cpython-311.pyc differ diff --git a/salary_analytics/api.py b/salary_analytics/api.py index 63c3560..8172a9b 100644 --- a/salary_analytics/api.py +++ b/salary_analytics/api.py @@ -6,15 +6,16 @@ from fastapi 
class BatchResponse(BaseModel):
    """Response model for batch processing."""
    # 1-based index of the batch within the current run.
    batch_number: int
    # Number of batches in the run; -1 when it is not known (CSV input).
    total_batches: int
    # Number of rows left in the batch after preprocessing.
    processed_rows: int
    # Path of the CSV written for this batch; empty string if the batch failed.
    results_path: str
    # Human-readable success or error description.
    message: str


def _preprocess_chunk(chunk):
    """Parse dates, rename the generic d1..d4 columns and drop incomplete rows.

    Args:
        chunk (pd.DataFrame): Raw rows as read from the CSV file or the table.

    Returns:
        pd.DataFrame: The cleaned chunk (rows containing any NaN are removed).
    """
    chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
    chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])

    chunk = chunk.rename(columns={
        'd1': 'trx_type',
        'd2': 'trx_subtype',
        'd3': 'initiated_by',
        'd4': 'customer_id'
    })
    return chunk.dropna()


def _process_batch(chunk, batch_number, total_batches, output_dir):
    """Run the analyses on one preprocessed chunk and save its final table.

    A failure inside the analyses or while writing the results is logged and
    reported in the returned response instead of aborting the whole run.

    Args:
        chunk (pd.DataFrame): Preprocessed rows of this batch.
        batch_number (int): 1-based index of the batch.
        total_batches (int): Total number of batches, or -1 when unknown.
        output_dir (str): Directory that receives ``batch_<n>_results.csv``.

    Returns:
        BatchResponse: Outcome of the batch (``results_path`` is "" on error).
    """
    # Use a dedicated pipeline per batch; the distinct name avoids confusion
    # with the module-level ``pipeline`` object used by the other endpoints.
    batch_pipeline = SalaryAnalyticsPipeline()
    batch_pipeline.df = chunk

    # Only mention the total in the success message when it is known.
    total_suffix = f" of {total_batches}" if total_batches >= 0 else ""

    try:
        # Run analyses
        batch_pipeline.run_keyword_analysis()
        batch_pipeline.run_consistent_amount_analysis()
        batch_pipeline.run_transaction_type_analysis()

        # Generate reports
        reports = batch_pipeline.generate_salary_earner_reports()

        # Save batch results
        batch_results_path = os.path.join(output_dir, f"batch_{batch_number}_results.csv")
        reports['final_table'].to_csv(batch_results_path, index=False)
    except Exception as e:
        logger.error(f"Error processing batch {batch_number}: {str(e)}")
        return BatchResponse(
            batch_number=batch_number,
            total_batches=total_batches,
            processed_rows=len(chunk),
            results_path="",
            message=f"Error processing batch {batch_number}: {str(e)}"
        )

    return BatchResponse(
        batch_number=batch_number,
        total_batches=total_batches,
        processed_rows=len(chunk),
        results_path=batch_results_path,
        message=f"Successfully processed batch {batch_number}{total_suffix}"
    )


@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(source: str = "db", file: UploadFile = None, batch_size: int = 10000):
    """
    Run the complete salary analytics pipeline in batches.

    Each batch is preprocessed, analysed with a fresh SalaryAnalyticsPipeline
    and its final table is written to a timestamped ``batch_results_*``
    directory next to ``OUTPUT_PATHS['final_table']``. The list of responses
    is returned once every batch has been handled.

    Args:
        source (str): Source of data ('db' or 'csv')
        file (UploadFile): CSV file to load (required if source is 'csv')
        batch_size (int): Number of rows to process in each batch (must be >= 1)

    Returns:
        List[BatchResponse]: List of responses for each batch processed

    Raises:
        HTTPException: 400 for an invalid ``source``, a missing CSV file or a
            non-positive ``batch_size``; 500 when the database connection
            fails or an unexpected error aborts the run.
    """
    try:
        if source not in ('db', 'csv'):
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")

        if source == 'csv' and not file:
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        # A zero or negative size would divide by zero / never advance the
        # OFFSET loop for the database and is rejected by pandas for CSV.
        if batch_size < 1:
            raise HTTPException(status_code=400, detail="batch_size must be a positive integer")

        # Create output directory for batch results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
        os.makedirs(batch_output_dir, exist_ok=True)

        responses = []

        if source == 'csv':
            upload_block_size = 1024 * 1024
            temp_file_path = None
            try:
                # Spool the upload to disk block by block so a large file is
                # never held in memory as a whole.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
                    temp_file_path = temp_file.name
                    while True:
                        block = await file.read(upload_block_size)
                        if not block:
                            break
                        temp_file.write(block)

                # Process CSV in chunks; the context manager closes the reader
                # (and its file handle) before the temporary file is removed.
                with pd.read_csv(temp_file_path, chunksize=batch_size) as reader:
                    for batch_number, chunk in enumerate(reader, start=1):
                        logger.info(f"Processing batch {batch_number}")
                        chunk = _preprocess_chunk(chunk)
                        # Total is unknown for CSV, reported as -1.
                        responses.append(_process_batch(chunk, batch_number, -1, batch_output_dir))
            finally:
                # Clean up temporary file, also when the upload copy failed.
                if temp_file_path is not None and os.path.exists(temp_file_path):
                    os.unlink(temp_file_path)
        else:
            # Process database in chunks
            data_loader = DataLoader()
            data_loader.chunk_size = batch_size
            if not data_loader.connect():
                raise HTTPException(status_code=500, detail="Failed to connect to database")

            # Get total row count
            with data_loader.engine.connect() as conn:
                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total_rows = conn.execute(count_query).scalar() or 0

            # Ceiling division: a partial last batch still counts as a batch.
            total_batches = (total_rows + batch_size - 1) // batch_size

            offsets = range(0, total_rows, batch_size)
            for batch_number, offset in enumerate(offsets, start=1):
                logger.info(f"Processing batch {batch_number} of {total_batches}")

                # NOTE(review): LIMIT/OFFSET without ORDER BY does not promise
                # a stable row order between queries, so rows could repeat or
                # be skipped across batches; add an ORDER BY on a unique
                # column once one is confirmed for this table.
                query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
                chunk = pd.read_sql(query, data_loader.engine)

                # The table may have shrunk since the count was taken.
                if chunk.empty:
                    break

                chunk = _preprocess_chunk(chunk)
                responses.append(_process_batch(chunk, batch_number, total_batches, batch_output_dir))

        return responses
    except HTTPException:
        # Keep deliberate HTTP errors (e.g. the 400 validation errors above)
        # intact instead of converting them into a generic 500.
        raise
    except Exception as e:
        logger.error(f"Error in streaming pipeline: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
+28,13 @@ class SalaryEarnerAnalyzer: for key, value in kwargs.items(): condition &= (df1[key] == value) - return df1[condition] + filtered_df = df1[condition] + + # Drop any rows with NaN values in critical columns + critical_cols = ['accountid', 'trx_start_date', 'amount'] + filtered_df = filtered_df.dropna(subset=critical_cols) + + return filtered_df def plot_hypothesis_overlap(self, hypothesis1_df, hypothesis3_df, hypothesis4_df, account_col='accountid'): """Plot and save Venn diagram showing overlap between hypotheses.""" @@ -47,21 +53,37 @@ class SalaryEarnerAnalyzer: """Generate a table of salary earners with their metrics.""" results = [] for accountid, group in all_three_hypotheses.groupby('accountid'): + # Skip if group is empty + if group.empty: + continue + # Calculate required metrics num_months = len(group) + + # Handle last 6 months calculation last_6_months = group[group['trx_start_date'] >= (datetime.now() - timedelta(days=180))] - least_inflow = last_6_months['amount'].min() - avg_salary = group['amount'].mean() - - # Calculate days since last transaction + if last_6_months.empty: + least_inflow = 0 + else: + least_inflow = last_6_months['amount'].min() + + # Handle average salary calculation + if group['amount'].notna().any(): + avg_salary = group['amount'].mean() + else: + avg_salary = 0 + + # Calculate days_since_last_trx with NaN handling group['days_since_last_trx'] = group['trx_start_date'].diff().dt.days median_interval = group['days_since_last_trx'].median() + if pd.isna(median_interval): + median_interval = 30 # Default to 30 days if no interval data last_date = group['trx_start_date'].max() next_date = last_date + timedelta(days=median_interval) next_amount = avg_salary - # Boolean flags + # Boolean flags with NaN handling days_since_last = (datetime.now() - last_date).days has_45d = days_since_last <= 45 has_2m = len(group[group['trx_start_date'] >= (datetime.now() - timedelta(days=60))]) >= 2 @@ -78,7 +100,9 @@ class 
SalaryEarnerAnalyzer: }) final_df = pd.DataFrame(results) - final_df = final_df.dropna() + # Drop rows where all numeric columns are NaN + numeric_cols = ['num_months', 'least_inflow_6m', 'avg_monthly_salary', 'estimated_next_amount'] + final_df = final_df.dropna(subset=numeric_cols, how='all') return final_df def analyze_salary_earners(self, final_df):