1a4e539626
- Introduced `DatabaseOperations` class for managing batch results in the database. - Added functionality to create a batch results table and save batch processing results. - Updated API endpoints to log execution time and handle batch processing errors more effectively. - Improved response handling in analysis endpoints and added batch metadata to results. - Suppressed warnings and improved logging throughout the application.
598 lines
25 KiB
Python
598 lines
25 KiB
Python
"""
|
|
FastAPI application for salary analytics.
|
|
"""
|
|
|
|
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends
|
|
from fastapi.responses import FileResponse
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from pydantic import BaseModel
|
|
from typing import Optional, Dict, List, Union
|
|
import os
|
|
import socket
|
|
import logging
|
|
import pandas as pd
|
|
import tempfile
|
|
from datetime import datetime
|
|
from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData
|
|
import numpy as np
|
|
import warnings
|
|
import time
|
|
from .main import SalaryAnalyticsPipeline
|
|
from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE
|
|
from .data_loader import DataLoader
|
|
from .salary_predictor import SalaryPredictor
|
|
from .salary_earner_analyzer import SalaryEarnerAnalyzer
|
|
from .db_operations import DatabaseOperations
|
|
|
|
# Configure logging
|
|
logging.basicConfig(
|
|
level=logging.INFO,
|
|
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
|
)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Suppress warnings
|
|
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
|
|
pd.options.mode.chained_assignment = None
|
|
|
|
app = FastAPI(
|
|
title="Salary Analytics API",
|
|
description="API for analyzing and predicting salary patterns from transaction data",
|
|
version="1.0.0"
|
|
)
|
|
|
|
# Add CORS middleware
|
|
app.add_middleware(
|
|
CORSMiddleware,
|
|
allow_origins=["*"], # Allows all origins
|
|
allow_credentials=True,
|
|
allow_methods=["*"], # Allows all methods
|
|
allow_headers=["*"], # Allows all headers
|
|
)
|
|
|
|
# Global pipeline instance
|
|
pipeline = SalaryAnalyticsPipeline()
|
|
|
|
# Global variables to store loaded data and models
|
|
data_loader = None
|
|
df = None
|
|
salary_predictor = None
|
|
salary_earner_analyzer = None
|
|
|
|
class AnalysisResponse(BaseModel):
|
|
"""Response model for analysis endpoints."""
|
|
message: str
|
|
data: Optional[Dict] = None
|
|
file_path: Optional[str] = None
|
|
|
|
class BatchResponse(BaseModel):
|
|
"""Response model for batch processing."""
|
|
batch_number: int
|
|
total_batches: int
|
|
processed_rows: int
|
|
results_path: str
|
|
message: str
|
|
|
|
def check_data_loaded():
|
|
"""Check if data is loaded before running analytics."""
|
|
if pipeline.df is None:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="No data loaded. Please load data first using the /load-data endpoint."
|
|
)
|
|
|
|
@app.on_event("startup")
|
|
async def startup_event():
|
|
"""Initialize the pipeline on startup."""
|
|
try:
|
|
logger.info("Initializing pipeline...")
|
|
|
|
# Print network information
|
|
hostname = socket.gethostname()
|
|
ip_address = socket.gethostbyname(hostname)
|
|
logger.info(f"Server running on hostname: {hostname}")
|
|
logger.info(f"Server IP address: {ip_address}")
|
|
logger.info(f"Server is accessible at:")
|
|
logger.info(f"- http://localhost:8000")
|
|
logger.info(f"- http://127.0.0.1:8000")
|
|
logger.info(f"- http://{ip_address}:8000")
|
|
logger.info("Pipeline initialized successfully")
|
|
except Exception as e:
|
|
logger.error(f"Error during startup: {str(e)}")
|
|
raise
|
|
|
|
@app.get("/")
|
|
async def root():
|
|
"""Root endpoint."""
|
|
start_time = time.time()
|
|
logger.info("Root endpoint accessed")
|
|
response = {"message": "Welcome to Salary Analytics API"}
|
|
logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
|
|
@app.get("/health")
|
|
async def health_check():
|
|
"""Health check endpoint."""
|
|
start_time = time.time()
|
|
logger.info("Health check endpoint accessed")
|
|
response = {"status": "healthy"}
|
|
logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
|
|
@app.post("/analyze/keyword", response_model=AnalysisResponse)
|
|
async def analyze_keyword():
|
|
"""Run keyword-based salary transaction analysis."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info("Starting keyword analysis...")
|
|
data = pipeline.run_keyword_analysis()
|
|
logger.info(f"Keyword analysis completed. Found {len(data)} matches")
|
|
response = AnalysisResponse(
|
|
message="Keyword analysis completed successfully",
|
|
data={"count": len(data)}
|
|
)
|
|
logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error in keyword analysis: {str(e)}")
|
|
logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
|
|
async def analyze_consistent_amount():
|
|
"""Run consistent amount transaction analysis."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info("Starting consistent amount analysis...")
|
|
data = pipeline.run_consistent_amount_analysis()
|
|
logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
|
|
response = AnalysisResponse(
|
|
message="Consistent amount analysis completed successfully",
|
|
data={"count": len(data)}
|
|
)
|
|
logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error in consistent amount analysis: {str(e)}")
|
|
logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/analyze/transaction-type", response_model=AnalysisResponse)
|
|
async def analyze_transaction_type():
|
|
"""Run transaction type analysis."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info("Starting transaction type analysis...")
|
|
data = pipeline.run_transaction_type_analysis()
|
|
logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
|
|
response = AnalysisResponse(
|
|
message="Transaction type analysis completed successfully",
|
|
data={"count": len(data)}
|
|
)
|
|
logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error in transaction type analysis: {str(e)}")
|
|
logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/generate/reports", response_model=AnalysisResponse)
|
|
async def generate_reports(background_tasks: BackgroundTasks):
|
|
"""Generate salary earner reports."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info("Starting report generation...")
|
|
reports = pipeline.generate_salary_earner_reports()
|
|
logger.info("Reports generated successfully")
|
|
response = AnalysisResponse(
|
|
message="Reports generated successfully",
|
|
data={
|
|
"verified_salary_earners": len(reports['final_table']),
|
|
"likely_salary_earners": len(reports['likely_salary_earner']),
|
|
"high_earners": reports['total_high_earners']
|
|
}
|
|
)
|
|
logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error in report generation: {str(e)}")
|
|
logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/train/models", response_model=AnalysisResponse)
|
|
async def train_models():
|
|
"""Train salary prediction models."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info("Starting model training...")
|
|
pipeline.train_salary_prediction_models()
|
|
logger.info("Models trained successfully")
|
|
response = AnalysisResponse(
|
|
message="Models trained successfully"
|
|
)
|
|
logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error in model training: {str(e)}")
|
|
logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.get("/download/{report_type}")
|
|
async def download_report(report_type: str):
|
|
"""Download generated reports."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info(f"Attempting to download report: {report_type}")
|
|
file_paths = {
|
|
"high_earners": OUTPUT_PATHS["high_earner_details"],
|
|
"likely_earners": OUTPUT_PATHS["likely_salary_earner"],
|
|
"final_table": OUTPUT_PATHS["final_table"],
|
|
"consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
|
|
"inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
|
|
"hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
|
|
}
|
|
|
|
if report_type not in file_paths:
|
|
logger.error(f"Report type not found: {report_type}")
|
|
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=404, detail="Report type not found")
|
|
|
|
file_path = file_paths[report_type]
|
|
if not os.path.exists(file_path):
|
|
logger.error(f"Report file not found: {file_path}")
|
|
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=404, detail="Report file not found")
|
|
|
|
logger.info(f"Successfully found report file: {file_path}")
|
|
response = FileResponse(
|
|
path=file_path,
|
|
filename=os.path.basename(file_path),
|
|
media_type="application/octet-stream"
|
|
)
|
|
logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error downloading report: {str(e)}")
|
|
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/run/pipeline", response_model=AnalysisResponse)
|
|
async def run_full_pipeline():
|
|
"""Run the complete salary analytics pipeline."""
|
|
start_time = time.time()
|
|
try:
|
|
check_data_loaded()
|
|
logger.info("Starting full pipeline...")
|
|
success = pipeline.run_full_pipeline()
|
|
if not success:
|
|
logger.error("Pipeline failed")
|
|
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail="Pipeline failed")
|
|
|
|
logger.info("Pipeline completed successfully")
|
|
response = AnalysisResponse(
|
|
message="Pipeline completed successfully"
|
|
)
|
|
logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error in pipeline: {str(e)}")
|
|
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
@app.post("/load-data")
|
|
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
|
|
"""
|
|
Load data from either database or CSV file.
|
|
|
|
Args:
|
|
source (str): Source of data ('db' or 'csv')
|
|
file (UploadFile, optional): CSV file to load (required if source is 'csv')
|
|
|
|
Returns:
|
|
dict: Status of data loading
|
|
"""
|
|
start_time = time.time()
|
|
try:
|
|
if source not in ['db', 'csv']:
|
|
logger.error(f"Invalid source: {source}")
|
|
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
|
|
|
|
if source == 'csv' and not file:
|
|
logger.error("No file provided for CSV source")
|
|
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
|
|
|
|
if source == 'csv':
|
|
# Save uploaded file temporarily
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
|
|
content = await file.read()
|
|
temp_file.write(content)
|
|
temp_file_path = temp_file.name
|
|
|
|
try:
|
|
success = pipeline.load_data(source='csv', file_path=temp_file_path)
|
|
finally:
|
|
# Clean up temporary file
|
|
os.unlink(temp_file_path)
|
|
else:
|
|
success = pipeline.load_data(source='db')
|
|
|
|
if not success:
|
|
logger.error("Failed to load data")
|
|
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail="Failed to load data")
|
|
|
|
response = {
|
|
"status": "success",
|
|
"message": f"Successfully loaded {len(pipeline.df)} rows of data",
|
|
"columns": pipeline.df.columns.tolist(),
|
|
"row_count": len(pipeline.df)
|
|
}
|
|
logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return response
|
|
except Exception as e:
|
|
logger.error(f"Error loading data: {str(e)}")
|
|
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e))
|
|
|
|
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
|
|
"""Dependency to handle file upload only when source is csv."""
|
|
if source == 'csv' and not file:
|
|
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
|
|
return file
|
|
|
|
@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
|
|
async def run_streaming_pipeline(
|
|
source: str = "db",
|
|
batch_size: int = 10000,
|
|
file: Optional[Union[UploadFile, str]] = File(None)
|
|
):
|
|
"""
|
|
Run the complete salary analytics pipeline in batches.
|
|
|
|
Args:
|
|
source (str): Source of data ('db' or 'csv')
|
|
batch_size (int): Number of rows to process in each batch
|
|
file (UploadFile, optional): CSV file to load (required if source is 'csv')
|
|
|
|
Returns:
|
|
List[BatchResponse]: List of responses for each batch processed
|
|
"""
|
|
start_time = time.time()
|
|
try:
|
|
if source not in ['db', 'csv']:
|
|
logger.error(f"Invalid source: {source}")
|
|
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
|
|
|
|
if source == 'csv' and not file:
|
|
logger.error("No file provided for CSV source")
|
|
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
|
|
|
|
# Initialize data loader
|
|
data_loader = DataLoader()
|
|
data_loader.chunk_size = batch_size
|
|
|
|
# Create output directory for batch results
|
|
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
|
batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
|
|
os.makedirs(batch_output_dir, exist_ok=True)
|
|
|
|
# Initialize database operations
|
|
if not data_loader.connect():
|
|
logger.error("Failed to connect to database")
|
|
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail="Failed to connect to database")
|
|
|
|
db_ops = DatabaseOperations(data_loader.engine)
|
|
if not db_ops.create_batch_results_table():
|
|
logger.error("Failed to create batch results table")
|
|
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail="Failed to create batch results table")
|
|
|
|
responses = []
|
|
batch_number = 0
|
|
batch_start_time = time.time()
|
|
|
|
def preprocess_chunk(chunk):
|
|
"""Preprocess a chunk of data with the same logic as DataLoader."""
|
|
# Convert dates
|
|
chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
|
|
chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
|
|
|
|
# Rename columns
|
|
chunk = chunk.rename(columns={
|
|
'd1': 'trx_type',
|
|
'd2': 'trx_subtype',
|
|
'd3': 'initiated_by',
|
|
'd4': 'customer_id'
|
|
})
|
|
|
|
chunk = chunk.dropna()
|
|
|
|
return chunk
|
|
|
|
if source == 'csv':
|
|
# Save uploaded file temporarily
|
|
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
|
|
content = await file.read()
|
|
temp_file.write(content)
|
|
temp_file_path = temp_file.name
|
|
|
|
try:
|
|
# Process CSV in chunks
|
|
for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
|
|
batch_number += 1
|
|
logger.info(f"Processing batch {batch_number}")
|
|
|
|
# Preprocess chunk
|
|
chunk = preprocess_chunk(chunk)
|
|
|
|
# Run pipeline on chunk
|
|
pipeline = SalaryAnalyticsPipeline()
|
|
pipeline.df = chunk
|
|
|
|
try:
|
|
batch_start_time = time.time()
|
|
# Run analyses
|
|
pipeline.run_keyword_analysis()
|
|
pipeline.run_consistent_amount_analysis()
|
|
pipeline.run_transaction_type_analysis()
|
|
|
|
# Generate reports
|
|
reports = pipeline.generate_salary_earner_reports()
|
|
|
|
# Add batch metadata to results
|
|
results_df = reports['final_table'].copy()
|
|
results_df['batch_number'] = batch_number
|
|
results_df['total_batches'] = -1 # Unknown for CSV
|
|
results_df['processed_at'] = datetime.now()
|
|
|
|
# Save batch results to CSV
|
|
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
|
|
results_df.to_csv(batch_results_path, index=False)
|
|
|
|
# Save to database
|
|
db_ops.save_batch_to_db(
|
|
batch_number=batch_number,
|
|
total_batches=-1, # Unknown for CSV
|
|
results_df=results_df,
|
|
status="success"
|
|
)
|
|
|
|
logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
|
|
|
|
responses.append(BatchResponse(
|
|
batch_number=batch_number,
|
|
total_batches=-1, # Unknown for CSV
|
|
processed_rows=len(chunk),
|
|
results_path=batch_results_path,
|
|
message=f"Successfully processed batch {batch_number}"
|
|
))
|
|
except Exception as e:
|
|
error_message = str(e)
|
|
logger.error(f"Error processing batch {batch_number}: {error_message}")
|
|
|
|
# Save error to database
|
|
db_ops.save_batch_to_db(
|
|
batch_number=batch_number,
|
|
total_batches=-1,
|
|
results_df=pd.DataFrame(), # Empty DataFrame for error case
|
|
status="error"
|
|
)
|
|
|
|
responses.append(BatchResponse(
|
|
batch_number=batch_number,
|
|
total_batches=-1,
|
|
processed_rows=len(chunk),
|
|
results_path="",
|
|
message=f"Error processing batch {batch_number}: {error_message}"
|
|
))
|
|
finally:
|
|
# Clean up temporary file
|
|
os.unlink(temp_file_path)
|
|
else:
|
|
# Process database in chunks
|
|
if not data_loader.connect():
|
|
raise HTTPException(status_code=500, detail="Failed to connect to database")
|
|
|
|
# Get total row count
|
|
with data_loader.engine.connect() as conn:
|
|
count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
|
|
total_rows = conn.execute(count_query).scalar()
|
|
|
|
total_batches = (total_rows + batch_size - 1) // batch_size
|
|
offset = 0
|
|
|
|
while offset < total_rows:
|
|
batch_number += 1
|
|
logger.info(f"Processing batch {batch_number} of {total_batches}")
|
|
|
|
# Load chunk from database
|
|
query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
|
|
chunk = pd.read_sql(query, data_loader.engine)
|
|
|
|
if chunk.empty:
|
|
break
|
|
|
|
# Preprocess chunk
|
|
chunk = preprocess_chunk(chunk)
|
|
|
|
# Run pipeline on chunk
|
|
pipeline = SalaryAnalyticsPipeline()
|
|
pipeline.df = chunk
|
|
|
|
try:
|
|
batch_start_time = time.time()
|
|
# Run analyses
|
|
pipeline.run_keyword_analysis()
|
|
pipeline.run_consistent_amount_analysis()
|
|
pipeline.run_transaction_type_analysis()
|
|
|
|
# Generate reports
|
|
reports = pipeline.generate_salary_earner_reports()
|
|
|
|
# Add batch metadata to results
|
|
results_df = reports['final_table'].copy()
|
|
results_df['batch_number'] = batch_number
|
|
results_df['total_batches'] = total_batches
|
|
results_df['processed_at'] = datetime.now()
|
|
|
|
# Save batch results to CSV
|
|
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
|
|
results_df.to_csv(batch_results_path, index=False)
|
|
|
|
# Save to database
|
|
db_ops.save_batch_to_db(
|
|
batch_number=batch_number,
|
|
total_batches=total_batches,
|
|
results_df=results_df,
|
|
status="success"
|
|
)
|
|
|
|
logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
|
|
|
|
responses.append(BatchResponse(
|
|
batch_number=batch_number,
|
|
total_batches=total_batches,
|
|
processed_rows=len(chunk),
|
|
results_path=batch_results_path,
|
|
message=f"Successfully processed batch {batch_number} of {total_batches}"
|
|
))
|
|
except Exception as e:
|
|
error_message = str(e)
|
|
logger.error(f"Error processing batch {batch_number}: {error_message}")
|
|
|
|
# Save error to database
|
|
db_ops.save_batch_to_db(
|
|
batch_number=batch_number,
|
|
total_batches=total_batches,
|
|
results_df=pd.DataFrame(), # Empty DataFrame for error case
|
|
status="error"
|
|
)
|
|
|
|
responses.append(BatchResponse(
|
|
batch_number=batch_number,
|
|
total_batches=total_batches,
|
|
processed_rows=len(chunk),
|
|
results_path="",
|
|
message=f"Error processing batch {batch_number}: {error_message}"
|
|
))
|
|
|
|
offset += batch_size
|
|
|
|
logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
|
|
return responses
|
|
except Exception as e:
|
|
logger.error(f"Error in streaming pipeline: {str(e)}")
|
|
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
|
|
raise HTTPException(status_code=500, detail=str(e)) |