Implement streaming pipeline endpoint for batch processing

- Added `/run/streaming-pipeline` endpoint to process data in batches from either a database or CSV file.
- Introduced `BatchResponse` model for structured responses.
- Updated README with new endpoint details, including parameters and example usage.
- Enhanced error handling and logging during batch processing.
- Ensured data preprocessing and NaN handling in analysis functions.
This commit is contained in:
2025-05-02 14:25:31 +01:00
parent 5767f55686
commit 9c429caa56
10 changed files with 246 additions and 11 deletions
+32
View File
@@ -119,6 +119,32 @@ uvicorn salary_analytics.api:app --reload
6. **Pipeline**
- `POST /run/pipeline`: Run complete pipeline
- `POST /run/streaming-pipeline`: Run pipeline in batches
- Parameters:
- `source`: Data source ('db' or 'csv')
- `file`: CSV file (required if source is 'csv')
- `batch_size`: Number of rows to process in each batch (default: 10000)
- Example:
```bash
# Run streaming pipeline from database
curl -X POST "http://localhost:8000/run/streaming-pipeline?source=db&batch_size=5000"
# Run streaming pipeline from CSV
curl -X POST "http://localhost:8000/run/streaming-pipeline?source=csv&batch_size=5000" -F "file=@path/to/your/file.csv"
```
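- Response (one entry per batch; `total_batches` is `-1` for CSV sources because the batch count is not known in advance, and a failed batch has an empty `results_path` with the error in `message`):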
```json
[
{
"batch_number": 1,
"total_batches": 10,
"processed_rows": 5000,
"results_path": "/app/output/csv/batch_results_20240315_123456/batch_1_results.csv",
"message": "Successfully processed batch 1 of 10"
},
// ... more batch responses ...
]
```
### Workflow
@@ -127,6 +153,12 @@ uvicorn salary_analytics.api:app --reload
3. Run any of the analysis endpoints
4. Generate and download reports as needed
For large datasets, use the streaming pipeline endpoint:
1. Start the API server
2. Run the streaming pipeline with appropriate batch size
3. Monitor batch processing progress in the server logs (the endpoint responds only after every batch has finished)
4. Access results in the batch results directory
Note: All analysis endpoints require data to be loaded first. If you try to run any analysis without loading data, you'll receive a 400 error with a message to load data first.
## Docker Deployment
Binary file not shown.
Binary file not shown.
Binary file not shown.
+180 -3
View File
@@ -6,15 +6,16 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict
from typing import Optional, Dict, List
import os
import socket
import logging
import pandas as pd
import tempfile
from datetime import datetime
from sqlalchemy import text
from .main import SalaryAnalyticsPipeline
from .config import OUTPUT_PATHS
from .config import OUTPUT_PATHS, TABLE_NAME
from .data_loader import DataLoader
from .salary_predictor import SalaryPredictor
from .salary_earner_analyzer import SalaryEarnerAnalyzer
@@ -56,6 +57,14 @@ class AnalysisResponse(BaseModel):
data: Optional[Dict] = None
file_path: Optional[str] = None
class BatchResponse(BaseModel):
    """Response model for batch processing."""

    # 1-based index of the batch within a single streaming run.
    batch_number: int
    # Total number of batches in the run; the streaming endpoint reports -1
    # for CSV sources, where the count is not known up front.
    total_batches: int
    # Number of rows left in the batch after preprocessing.
    processed_rows: int
    # Path of the CSV written for this batch; empty string when the batch failed.
    results_path: str
    # Human-readable success or error description for the batch.
    message: str
def check_data_loaded():
"""Check if data is loaded before running analytics."""
if pipeline.df is None:
@@ -278,3 +287,171 @@ async def load_data(source: str = "db", file: UploadFile = None):
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
raise HTTPException(status_code=500, detail=str(e))
# Size of each read when spooling an uploaded CSV to disk (1 MiB), so the
# whole upload is never held in memory at once.
_UPLOAD_SPOOL_BYTES = 1024 * 1024


def _preprocess_chunk(chunk):
    """Preprocess a chunk of data with the same logic as DataLoader.

    Parses the two transaction date columns, renames the generic ``d1``-``d4``
    columns to their descriptive names, and drops every row containing a NaN.

    Args:
        chunk (pd.DataFrame): Raw rows read from the CSV file or the database.

    Returns:
        pd.DataFrame: The cleaned chunk.
    """
    chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
    chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
    chunk = chunk.rename(columns={
        'd1': 'trx_type',
        'd2': 'trx_subtype',
        'd3': 'initiated_by',
        'd4': 'customer_id'
    })
    return chunk.dropna()


def _process_batch(chunk, batch_number, total_batches, batch_output_dir):
    """Run the analyses on one preprocessed chunk and save its final table.

    A failure inside the analyses or while writing the results is logged and
    reported in the returned response instead of being raised, so one bad
    batch does not abort the remaining batches.

    Args:
        chunk (pd.DataFrame): Preprocessed rows for this batch.
        batch_number (int): 1-based index of the batch.
        total_batches (int): Total number of batches, or -1 when unknown.
        batch_output_dir (str): Directory that receives the batch result CSV.

    Returns:
        BatchResponse: Success or error description for the batch.
    """
    # A dedicated pipeline per batch keeps batches independent of each other
    # and of the module-level pipeline used by the other endpoints.
    batch_pipeline = SalaryAnalyticsPipeline()
    batch_pipeline.df = chunk

    try:
        # Run analyses
        batch_pipeline.run_keyword_analysis()
        batch_pipeline.run_consistent_amount_analysis()
        batch_pipeline.run_transaction_type_analysis()

        # Generate reports
        reports = batch_pipeline.generate_salary_earner_reports()

        # Save batch results
        batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
        reports['final_table'].to_csv(batch_results_path, index=False)
    except Exception as e:
        logger.error(f"Error processing batch {batch_number}: {str(e)}")
        return BatchResponse(
            batch_number=batch_number,
            total_batches=total_batches,
            processed_rows=len(chunk),
            results_path="",
            message=f"Error processing batch {batch_number}: {str(e)}"
        )

    # The "of N" suffix is only meaningful when the batch count is known.
    if total_batches < 0:
        progress = f"batch {batch_number}"
    else:
        progress = f"batch {batch_number} of {total_batches}"

    return BatchResponse(
        batch_number=batch_number,
        total_batches=total_batches,
        processed_rows=len(chunk),
        results_path=batch_results_path,
        message=f"Successfully processed {progress}"
    )


@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(source: str = "db", file: UploadFile = None, batch_size: int = 10000):
    """
    Run the complete salary analytics pipeline in batches.

    Each batch is preprocessed, analysed with a fresh SalaryAnalyticsPipeline
    and its final table is written to a timestamped ``batch_results_*``
    directory next to ``OUTPUT_PATHS['final_table']``.

    Args:
        source (str): Source of data ('db' or 'csv')
        file (UploadFile): CSV file to load (required if source is 'csv')
        batch_size (int): Number of rows to process in each batch (must be >= 1)

    Returns:
        List[BatchResponse]: List of responses for each batch processed

    Raises:
        HTTPException: 400 for an invalid source, a missing CSV file or a
            non-positive batch_size; 500 if the database connection fails or
            an unexpected error occurs outside per-batch processing.
    """
    try:
        if source not in ('db', 'csv'):
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")

        if source == 'csv' and not file:
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        # A zero or negative batch size would otherwise fail deep inside
        # pandas / the SQL query and surface as an opaque 500.
        if batch_size < 1:
            raise HTTPException(status_code=400, detail="batch_size must be a positive integer")

        # Initialize data loader
        data_loader = DataLoader()
        data_loader.chunk_size = batch_size

        # Create output directory for batch results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
        os.makedirs(batch_output_dir, exist_ok=True)

        responses = []
        batch_number = 0

        if source == 'csv':
            # Save uploaded file temporarily. The try/finally starts as soon as
            # the file exists so it is removed even if spooling the upload fails.
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
            temp_file_path = temp_file.name
            try:
                with temp_file:
                    while True:
                        data = await file.read(_UPLOAD_SPOOL_BYTES)
                        if not data:
                            break
                        temp_file.write(data)

                # Process CSV in chunks; the batch count is unknown up front,
                # hence total_batches=-1.
                for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
                    batch_number += 1
                    logger.info(f"Processing batch {batch_number}")
                    chunk = _preprocess_chunk(chunk)
                    responses.append(_process_batch(chunk, batch_number, -1, batch_output_dir))
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        else:
            # Process database in chunks
            if not data_loader.connect():
                raise HTTPException(status_code=500, detail="Failed to connect to database")

            # Get total row count
            with data_loader.engine.connect() as conn:
                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total_rows = conn.execute(count_query).scalar()

            # Ceiling division: a partial final batch still counts as a batch.
            total_batches = (total_rows + batch_size - 1) // batch_size

            offset = 0
            while offset < total_rows:
                batch_number += 1
                logger.info(f"Processing batch {batch_number} of {total_batches}")

                # Load chunk from database.
                # NOTE(review): LIMIT/OFFSET without ORDER BY does not guarantee a
                # stable row order between queries, so batches may overlap or skip
                # rows on some databases -- consider ordering by a unique key.
                query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
                chunk = pd.read_sql(query, data_loader.engine)

                if chunk.empty:
                    break

                chunk = _preprocess_chunk(chunk)
                responses.append(_process_batch(chunk, batch_number, total_batches, batch_output_dir))

                offset += batch_size

        return responses

    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 validations above) must keep
        # their status code instead of being rewrapped as a 500 below.
        raise
    except Exception as e:
        logger.error(f"Error in streaming pipeline: {str(e)}")
        raise HTTPException(status_code=500, detail=str(e)) from e
+2
View File
@@ -69,6 +69,7 @@ class DataLoader:
'd4': 'customer_id'
})
chunk = chunk.dropna()
chunks.append(chunk)
# Combine all chunks
@@ -127,6 +128,7 @@ class DataLoader:
'd4': 'customer_id'
})
chunk = chunk.dropna()
chunks.append(chunk)
offset += self.chunk_size
+32 -8
View File
@@ -18,7 +18,7 @@ class SalaryEarnerAnalyzer:
def filter_venn_section(self, **kwargs):
"""Filter accounts based on specified combinations of hypothesis flags."""
valid_columns = {'is_salary_related', 'is_consistent_amount', 'is_salary_type'}
df1 = self.df[self.df['initiated_by'] == 'C']
df1 = self.df[self.df['initiated_by'] == 'C'].copy()
invalid_keys = set(kwargs.keys()) - valid_columns
if invalid_keys:
@@ -28,7 +28,13 @@ class SalaryEarnerAnalyzer:
for key, value in kwargs.items():
condition &= (df1[key] == value)
return df1[condition]
filtered_df = df1[condition]
# Drop any rows with NaN values in critical columns
critical_cols = ['accountid', 'trx_start_date', 'amount']
filtered_df = filtered_df.dropna(subset=critical_cols)
return filtered_df
def plot_hypothesis_overlap(self, hypothesis1_df, hypothesis3_df, hypothesis4_df, account_col='accountid'):
"""Plot and save Venn diagram showing overlap between hypotheses."""
@@ -47,21 +53,37 @@ class SalaryEarnerAnalyzer:
"""Generate a table of salary earners with their metrics."""
results = []
for accountid, group in all_three_hypotheses.groupby('accountid'):
# Skip if group is empty
if group.empty:
continue
# Calculate required metrics
num_months = len(group)
last_6_months = group[group['trx_start_date'] >= (datetime.now() - timedelta(days=180))]
least_inflow = last_6_months['amount'].min()
avg_salary = group['amount'].mean()
# Calculate days since last transaction
# Handle last 6 months calculation
last_6_months = group[group['trx_start_date'] >= (datetime.now() - timedelta(days=180))]
if last_6_months.empty:
least_inflow = 0
else:
least_inflow = last_6_months['amount'].min()
# Handle average salary calculation
if group['amount'].notna().any():
avg_salary = group['amount'].mean()
else:
avg_salary = 0
# Calculate days_since_last_trx with NaN handling
group['days_since_last_trx'] = group['trx_start_date'].diff().dt.days
median_interval = group['days_since_last_trx'].median()
if pd.isna(median_interval):
median_interval = 30 # Default to 30 days if no interval data
last_date = group['trx_start_date'].max()
next_date = last_date + timedelta(days=median_interval)
next_amount = avg_salary
# Boolean flags
# Boolean flags with NaN handling
days_since_last = (datetime.now() - last_date).days
has_45d = days_since_last <= 45
has_2m = len(group[group['trx_start_date'] >= (datetime.now() - timedelta(days=60))]) >= 2
@@ -78,7 +100,9 @@ class SalaryEarnerAnalyzer:
})
final_df = pd.DataFrame(results)
final_df = final_df.dropna()
# Drop rows where all numeric columns are NaN
numeric_cols = ['num_months', 'least_inflow_6m', 'avg_monthly_salary', 'estimated_next_amount']
final_df = final_df.dropna(subset=numeric_cols, how='all')
return final_df
def analyze_salary_earners(self, final_df):