[add]: refactor/clean up

This commit is contained in:
VivianDee
2025-09-09 10:58:08 +01:00
parent 4e22161088
commit 80cc543cdd
19 changed files with 1127 additions and 607 deletions
-600
View File
@@ -1,600 +0,0 @@
"""
FastAPI application for salary analytics.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends
from fastapi.responses import FileResponse
import os
import socket
from typing import Optional, List, Union
import pandas as pd
import tempfile
from datetime import datetime
from sqlalchemy import text
import warnings
import time
from app.analytics.services.main import SalaryAnalyticsPipeline
from app.config import OUTPUT_PATHS, TABLE_NAME
from app.analytics.services.data_loader import DataLoader
from app.analytics.middlewares.middleware import add_middlewares
from app.models.db_operations import DatabaseOperations
from app.analytics.integrations.salary_detect import SalaryDetect
from app.utils.logger import logger
from app.analytics.helpers.response_helpers import AnalysisResponse, BatchResponse
# Suppress warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
pd.options.mode.chained_assignment = None
app = FastAPI(
title="Salary Analytics API",
description="API for analyzing and predicting salary patterns from transaction data",
version="1.0.0"
)
# Add CORS middleware
add_middlewares(app)
# Global pipeline instance
pipeline = SalaryAnalyticsPipeline()
# Global variables to store loaded data and models
data_loader = None
df = None
salary_predictor = None
salary_earner_analyzer = None
salary_detect = SalaryDetect()
def check_data_loaded():
"""Check if data is loaded before running analytics."""
if pipeline.df is None:
raise HTTPException(
status_code=400,
detail="No data loaded. Please load data first using the /load-data endpoint."
)
@app.on_event("startup")
async def startup_event():
"""Initialize the pipeline on startup."""
try:
logger.info("Initializing pipeline...")
# Start autonomous salary detection loop
salary_detect.start()
logger.info("Started autonomous salary detection loop.")
# Print network information
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
logger.info(f"Server running on hostname: {hostname}")
logger.info(f"Server IP address: {ip_address}")
logger.info(f"Server is accessible at:")
logger.info(f"- http://localhost:8000")
logger.info(f"- http://127.0.0.1:8000")
logger.info(f"- http://{ip_address}:8000")
logger.info("Pipeline initialized successfully")
except Exception as e:
logger.error(f"Error during startup: {str(e)}")
raise
@app.get("/")
async def root():
"""Root endpoint."""
start_time = time.time()
logger.info("Root endpoint accessed")
response = {"message": "Welcome to Salary Analytics API"}
logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
return response
@app.get("/health")
async def health_check():
"""Health check endpoint."""
start_time = time.time()
logger.info("Health check endpoint accessed")
response = {"status": "healthy"}
logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
return response
@app.post("/analyze/keyword", response_model=AnalysisResponse)
async def analyze_keyword():
"""Run keyword-based salary transaction analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting keyword analysis...")
data = pipeline.run_keyword_analysis()
logger.info(f"Keyword analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Keyword analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in keyword analysis: {str(e)}")
logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount():
"""Run consistent amount transaction analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting consistent amount analysis...")
data = pipeline.run_consistent_amount_analysis()
logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Consistent amount analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in consistent amount analysis: {str(e)}")
logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type():
"""Run transaction type analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting transaction type analysis...")
data = pipeline.run_transaction_type_analysis()
logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Transaction type analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in transaction type analysis: {str(e)}")
logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks):
"""Generate salary earner reports."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting report generation...")
reports = pipeline.generate_salary_earner_reports()
logger.info("Reports generated successfully")
response = AnalysisResponse(
message="Reports generated successfully",
data={
"verified_salary_earners": len(reports['final_table']),
"likely_salary_earners": len(reports['likely_salary_earner']),
"high_earners": reports['total_high_earners']
}
)
logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in report generation: {str(e)}")
logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/train/models", response_model=AnalysisResponse)
async def train_models():
"""Train salary prediction models."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting model training...")
pipeline.train_salary_prediction_models()
logger.info("Models trained successfully")
response = AnalysisResponse(
message="Models trained successfully"
)
logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in model training: {str(e)}")
logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/download/{report_type}")
async def download_report(report_type: str):
"""Download generated reports."""
start_time = time.time()
try:
check_data_loaded()
logger.info(f"Attempting to download report: {report_type}")
file_paths = {
"high_earners": OUTPUT_PATHS["high_earner_details"],
"likely_earners": OUTPUT_PATHS["likely_salary_earner"],
"final_table": OUTPUT_PATHS["final_table"],
"consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
"inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
"hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
}
if report_type not in file_paths:
logger.error(f"Report type not found: {report_type}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report type not found")
file_path = file_paths[report_type]
if not os.path.exists(file_path):
logger.error(f"Report file not found: {file_path}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report file not found")
logger.info(f"Successfully found report file: {file_path}")
response = FileResponse(
path=file_path,
filename=os.path.basename(file_path),
media_type="application/octet-stream"
)
logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error downloading report: {str(e)}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline():
"""Run the complete salary analytics pipeline."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting full pipeline...")
success = pipeline.run_full_pipeline()
if not success:
logger.error("Pipeline failed")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Pipeline failed")
logger.info("Pipeline completed successfully")
response = AnalysisResponse(
message="Pipeline completed successfully"
)
logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in pipeline: {str(e)}")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/load-data")
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
"""
Load data from either database or CSV file.
Args:
source (str): Source of data ('db' or 'csv')
file (UploadFile, optional): CSV file to load (required if source is 'csv')
Returns:
dict: Status of data loading
"""
start_time = time.time()
try:
if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
if source == 'csv':
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
success = pipeline.load_data(source='csv', file_path=temp_file_path)
finally:
# Clean up temporary file
os.unlink(temp_file_path)
else:
success = pipeline.load_data(source='db')
if not success:
logger.error("Failed to load data")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to load data")
response = {
"status": "success",
"message": f"Successfully loaded {len(pipeline.df)} rows of data",
"columns": pipeline.df.columns.tolist(),
"row_count": len(pipeline.df)
}
logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
"""Dependency to handle file upload only when source is csv."""
if source == 'csv' and not file:
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
return file
@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(
source: str = "db",
batch_size: int = 10000,
file: Optional[Union[UploadFile, str]] = File(None)
):
"""
Run the complete salary analytics pipeline in batches.
Args:
source (str): Source of data ('db' or 'csv')
batch_size (int): Number of rows to process in each batch
file (UploadFile, optional): CSV file to load (required if source is 'csv')
Returns:
List[BatchResponse]: List of responses for each batch processed
"""
start_time = time.time()
try:
if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# Initialize data loader
data_loader = DataLoader()
data_loader.chunk_size = batch_size
# Create output directory for batch results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
os.makedirs(batch_output_dir, exist_ok=True)
# Initialize database operations
if not data_loader.connect():
logger.error("Failed to connect to database")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to connect to database")
db_ops = DatabaseOperations(data_loader.engine)
if not db_ops.create_batch_results_table():
logger.error("Failed to create batch results table")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to create batch results table")
responses = []
batch_number = 0
batch_start_time = time.time()
def preprocess_chunk(chunk):
"""Preprocess a chunk of data with the same logic as DataLoader."""
# Convert dates
chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
# Rename columns
chunk = chunk.rename(columns={
'd1': 'trx_type',
'd2': 'trx_subtype',
'd3': 'initiated_by',
'd4': 'customer_id'
})
chunk = chunk.dropna()
return chunk
if source == 'csv':
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
# Process CSV in chunks
for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
batch_number += 1
logger.info(f"Processing batch {batch_number}")
# Preprocess chunk
chunk = preprocess_chunk(chunk)
# Run pipeline on chunk
pipeline = SalaryAnalyticsPipeline()
pipeline.df = chunk
try:
batch_start_time = time.time()
# Run analyses
pipeline.run_keyword_analysis()
pipeline.run_consistent_amount_analysis()
pipeline.run_transaction_type_analysis()
# Generate reports
reports = pipeline.generate_salary_earner_reports()
# Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = -1 # Unknown for CSV
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
processed_rows=len(chunk),
results_path=batch_results_path,
message=f"Successfully processed batch {batch_number}"
))
except Exception as e:
error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=-1,
processed_rows=len(chunk),
results_path="",
message=f"Error processing batch {batch_number}: {error_message}"
))
finally:
# Clean up temporary file
os.unlink(temp_file_path)
else:
# Process database in chunks
if not data_loader.connect():
raise HTTPException(status_code=500, detail="Failed to connect to database")
# Get total row count
with data_loader.engine.connect() as conn:
count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
total_rows = conn.execute(count_query).scalar()
total_batches = (total_rows + batch_size - 1) // batch_size
offset = 0
while offset < total_rows:
batch_number += 1
logger.info(f"Processing batch {batch_number} of {total_batches}")
# Load chunk from database
query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
chunk = pd.read_sql(query, data_loader.engine)
if chunk.empty:
break
# Preprocess chunk
chunk = preprocess_chunk(chunk)
# Run pipeline on chunk
pipeline = SalaryAnalyticsPipeline()
pipeline.df = chunk
try:
batch_start_time = time.time()
# Run analyses
pipeline.run_keyword_analysis()
pipeline.run_consistent_amount_analysis()
pipeline.run_transaction_type_analysis()
# Generate reports
reports = pipeline.generate_salary_earner_reports()
# Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = total_batches
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=total_batches,
processed_rows=len(chunk),
results_path=batch_results_path,
message=f"Successfully processed batch {batch_number} of {total_batches}"
))
except Exception as e:
error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=total_batches,
processed_rows=len(chunk),
results_path="",
message=f"Error processing batch {batch_number}: {error_message}"
))
offset += batch_size
logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return responses
except Exception as e:
logger.error(f"Error in streaming pipeline: {str(e)}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
+601
View File
@@ -0,0 +1,601 @@
"""
FastAPI application for salary analytics.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends
from fastapi.responses import FileResponse
import os
import socket
from typing import Optional, List, Union
import pandas as pd
import tempfile
from datetime import datetime
from sqlalchemy import text
import warnings
import time
from app.salary_analytics.services.main import SalaryAnalyticsPipeline
from app.config import OUTPUT_PATHS, TABLE_NAME
from app.salary_analytics.services.data_loader import DataLoader
from app.salary_analytics.middlewares.middleware import add_middlewares
from app.models.db_operations import DatabaseOperations
from app.salary_analytics.integrations.salary_detect import SalaryDetect
from app.utils.logger import logger
from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse
# Suppress warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
pd.options.mode.chained_assignment = None
app = FastAPI(
title="Salary Analytics API",
description="API for analyzing and predicting salary patterns from transaction data",
version="1.0.0"
)
# Add CORS middleware
add_middlewares(app)
# Global pipeline instance
pipeline = SalaryAnalyticsPipeline()
# Global variables to store loaded data and models
data_loader = None
df = None
salary_predictor = None
salary_earner_analyzer = None
# salary_detect = SalaryDetect()
# def check_data_loaded():
# """Check if data is loaded before running analytics."""
# if pipeline.df is None:
# raise HTTPException(
# status_code=400,
# detail="No data loaded. Please load data first using the /load-data endpoint."
# )
# @app.on_event("startup")
# async def startup_event():
# """Initialize the pipeline on startup."""
# try:
# logger.info("Initializing pipeline...")
# # Start autonomous salary detection loop
# salary_detect.start()
# logger.info("Started autonomous salary detection loop.")
# # Print network information
# hostname = socket.gethostname()
# ip_address = socket.gethostbyname(hostname)
# logger.info(f"Server running on hostname: {hostname}")
# logger.info(f"Server IP address: {ip_address}")
# logger.info(f"Server is accessible at:")
# logger.info(f"- http://localhost:8000")
# logger.info(f"- http://127.0.0.1:8000")
# logger.info(f"- http://{ip_address}:8000")
# logger.info("Pipeline initialized successfully")
# except Exception as e:
# logger.error(f"Error during startup: {str(e)}")
# raise
# @app.get("/")
# async def root():
# """Root endpoint."""
# start_time = time.time()
# logger.info("Root endpoint accessed")
# response = {"message": "Welcome to Salary Analytics API"}
# logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# @app.get("/health")
# async def health_check():
# """Health check endpoint."""
# start_time = time.time()
# logger.info("Health check endpoint accessed")
# response = {"status": "healthy"}
# logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
# return response
# @app.post("/analyze/keyword", response_model=AnalysisResponse)
# async def analyze_keyword():
# """Run keyword-based salary transaction analysis."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info("Starting keyword analysis...")
# data = pipeline.run_keyword_analysis()
# logger.info(f"Keyword analysis completed. Found {len(data)} matches")
# response = AnalysisResponse(
# message="Keyword analysis completed successfully",
# data={"count": len(data)}
# )
# logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error in keyword analysis: {str(e)}")
# logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
# async def analyze_consistent_amount():
# """Run consistent amount transaction analysis."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info("Starting consistent amount analysis...")
# data = pipeline.run_consistent_amount_analysis()
# logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
# response = AnalysisResponse(
# message="Consistent amount analysis completed successfully",
# data={"count": len(data)}
# )
# logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error in consistent amount analysis: {str(e)}")
# logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/analyze/transaction-type", response_model=AnalysisResponse)
# async def analyze_transaction_type():
# """Run transaction type analysis."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info("Starting transaction type analysis...")
# data = pipeline.run_transaction_type_analysis()
# logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
# response = AnalysisResponse(
# message="Transaction type analysis completed successfully",
# data={"count": len(data)}
# )
# logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error in transaction type analysis: {str(e)}")
# logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/generate/reports", response_model=AnalysisResponse)
# async def generate_reports(background_tasks: BackgroundTasks):
# """Generate salary earner reports."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info("Starting report generation...")
# reports = pipeline.generate_salary_earner_reports()
# logger.info("Reports generated successfully")
# response = AnalysisResponse(
# message="Reports generated successfully",
# data={
# "verified_salary_earners": len(reports['final_table']),
# "likely_salary_earners": len(reports['likely_salary_earner']),
# "high_earners": reports['total_high_earners']
# }
# )
# logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error in report generation: {str(e)}")
# logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/train/models", response_model=AnalysisResponse)
# async def train_models():
# """Train salary prediction models."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info("Starting model training...")
# pipeline.train_salary_prediction_models()
# logger.info("Models trained successfully")
# response = AnalysisResponse(
# message="Models trained successfully"
# )
# logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error in model training: {str(e)}")
# logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.get("/download/{report_type}")
# async def download_report(report_type: str):
# """Download generated reports."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info(f"Attempting to download report: {report_type}")
# file_paths = {
# "high_earners": OUTPUT_PATHS["high_earner_details"],
# "likely_earners": OUTPUT_PATHS["likely_salary_earner"],
# "final_table": OUTPUT_PATHS["final_table"],
# "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
# "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
# "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
# }
# if report_type not in file_paths:
# logger.error(f"Report type not found: {report_type}")
# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=404, detail="Report type not found")
# file_path = file_paths[report_type]
# if not os.path.exists(file_path):
# logger.error(f"Report file not found: {file_path}")
# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=404, detail="Report file not found")
# logger.info(f"Successfully found report file: {file_path}")
# response = FileResponse(
# path=file_path,
# filename=os.path.basename(file_path),
# media_type="application/octet-stream"
# )
# logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error downloading report: {str(e)}")
# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/run/pipeline", response_model=AnalysisResponse)
# async def run_full_pipeline():
# """Run the complete salary analytics pipeline."""
# start_time = time.time()
# try:
# check_data_loaded()
# logger.info("Starting full pipeline...")
# success = pipeline.run_full_pipeline()
# if not success:
# logger.error("Pipeline failed")
# logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail="Pipeline failed")
# logger.info("Pipeline completed successfully")
# response = AnalysisResponse(
# message="Pipeline completed successfully"
# )
# logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error in pipeline: {str(e)}")
# logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# @app.post("/load-data")
# async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
# """
# Load data from either database or CSV file.
# Args:
# source (str): Source of data ('db' or 'csv')
# file (UploadFile, optional): CSV file to load (required if source is 'csv')
# Returns:
# dict: Status of data loading
# """
# start_time = time.time()
# try:
# if source not in ['db', 'csv']:
# logger.error(f"Invalid source: {source}")
# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
# if source == 'csv' and not file:
# logger.error("No file provided for CSV source")
# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# if source == 'csv':
# # Save uploaded file temporarily
# with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
# content = await file.read()
# temp_file.write(content)
# temp_file_path = temp_file.name
# try:
# success = pipeline.load_data(source='csv', file_path=temp_file_path)
# finally:
# # Clean up temporary file
# os.unlink(temp_file_path)
# else:
# success = pipeline.load_data(source='db')
# if not success:
# logger.error("Failed to load data")
# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail="Failed to load data")
# response = {
# "status": "success",
# "message": f"Successfully loaded {len(pipeline.df)} rows of data",
# "columns": pipeline.df.columns.tolist(),
# "row_count": len(pipeline.df)
# }
# logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
# return response
# except Exception as e:
# logger.error(f"Error loading data: {str(e)}")
# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
# async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
# """Dependency to handle file upload only when source is csv."""
# if source == 'csv' and not file:
# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# return file
# @app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
# async def run_streaming_pipeline(
# source: str = "db",
# batch_size: int = 10000,
# file: Optional[Union[UploadFile, str]] = File(None)
# ):
# """
# Run the complete salary analytics pipeline in batches.
# Args:
# source (str): Source of data ('db' or 'csv')
# batch_size (int): Number of rows to process in each batch
# file (UploadFile, optional): CSV file to load (required if source is 'csv')
# Returns:
# List[BatchResponse]: List of responses for each batch processed
# """
# start_time = time.time()
# try:
# if source not in ['db', 'csv']:
# logger.error(f"Invalid source: {source}")
# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
# if source == 'csv' and not file:
# logger.error("No file provided for CSV source")
# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# # Initialize data loader
# data_loader = DataLoader()
# data_loader.chunk_size = batch_size
# # Create output directory for batch results
# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
# batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
# os.makedirs(batch_output_dir, exist_ok=True)
# # Initialize database operations
# if not data_loader.connect():
# logger.error("Failed to connect to database")
# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail="Failed to connect to database")
# db_ops = DatabaseOperations(data_loader.engine)
# if not db_ops.create_batch_results_table():
# logger.error("Failed to create batch results table")
# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail="Failed to create batch results table")
# responses = []
# batch_number = 0
# batch_start_time = time.time()
# def preprocess_chunk(chunk):
# """Preprocess a chunk of data with the same logic as DataLoader."""
# # Convert dates
# chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
# chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
# # Rename columns
# chunk = chunk.rename(columns={
# 'd1': 'trx_type',
# 'd2': 'trx_subtype',
# 'd3': 'initiated_by',
# 'd4': 'customer_id'
# })
# chunk = chunk.dropna()
# return chunk
# if source == 'csv':
# # Save uploaded file temporarily
# with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
# content = await file.read()
# temp_file.write(content)
# temp_file_path = temp_file.name
# try:
# # Process CSV in chunks
# for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
# batch_number += 1
# logger.info(f"Processing batch {batch_number}")
# # Preprocess chunk
# chunk = preprocess_chunk(chunk)
# # Run pipeline on chunk
# pipeline = SalaryAnalyticsPipeline()
# pipeline.df = chunk
# try:
# batch_start_time = time.time()
# # Run analyses
# pipeline.run_keyword_analysis()
# pipeline.run_consistent_amount_analysis()
# pipeline.run_transaction_type_analysis()
# # Generate reports
# reports = pipeline.generate_salary_earner_reports()
# # Add batch metadata to results
# results_df = reports['final_table'].copy()
# results_df['batch_number'] = batch_number
# results_df['total_batches'] = -1 # Unknown for CSV
# results_df['processed_at'] = datetime.now()
# # Save batch results to CSV
# batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
# results_df.to_csv(batch_results_path, index=False)
# # Save to database
# db_ops.save_batch_to_db(
# batch_number=batch_number,
# total_batches=-1, # Unknown for CSV
# results_df=results_df,
# status="success"
# )
# logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
# responses.append(BatchResponse(
# batch_number=batch_number,
# total_batches=-1, # Unknown for CSV
# processed_rows=len(chunk),
# results_path=batch_results_path,
# message=f"Successfully processed batch {batch_number}"
# ))
# except Exception as e:
# error_message = str(e)
# logger.error(f"Error processing batch {batch_number}: {error_message}")
# # Save error to database
# db_ops.save_batch_to_db(
# batch_number=batch_number,
# total_batches=-1,
# results_df=pd.DataFrame(), # Empty DataFrame for error case
# status="error"
# )
# responses.append(BatchResponse(
# batch_number=batch_number,
# total_batches=-1,
# processed_rows=len(chunk),
# results_path="",
# message=f"Error processing batch {batch_number}: {error_message}"
# ))
# finally:
# # Clean up temporary file
# os.unlink(temp_file_path)
# else:
# # Process database in chunks
# if not data_loader.connect():
# raise HTTPException(status_code=500, detail="Failed to connect to database")
# # Get total row count
# with data_loader.engine.connect() as conn:
# count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
# total_rows = conn.execute(count_query).scalar()
# total_batches = (total_rows + batch_size - 1) // batch_size
# offset = 0
# while offset < total_rows:
# batch_number += 1
# logger.info(f"Processing batch {batch_number} of {total_batches}")
# # Load chunk from database
# query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
# chunk = pd.read_sql(query, data_loader.engine)
# if chunk.empty:
# break
# # Preprocess chunk
# chunk = preprocess_chunk(chunk)
# # Run pipeline on chunk
# pipeline = SalaryAnalyticsPipeline()
# pipeline.df = chunk
# try:
# batch_start_time = time.time()
# # Run analyses
# pipeline.run_keyword_analysis()
# pipeline.run_consistent_amount_analysis()
# pipeline.run_transaction_type_analysis()
# # Generate reports
# reports = pipeline.generate_salary_earner_reports()
# # Add batch metadata to results
# results_df = reports['final_table'].copy()
# results_df['batch_number'] = batch_number
# results_df['total_batches'] = total_batches
# results_df['processed_at'] = datetime.now()
# # Save batch results to CSV
# batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
# results_df.to_csv(batch_results_path, index=False)
# # Save to database
# db_ops.save_batch_to_db(
# batch_number=batch_number,
# total_batches=total_batches,
# results_df=results_df,
# status="success"
# )
# logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
# responses.append(BatchResponse(
# batch_number=batch_number,
# total_batches=total_batches,
# processed_rows=len(chunk),
# results_path=batch_results_path,
# message=f"Successfully processed batch {batch_number} of {total_batches}"
# ))
# except Exception as e:
# error_message = str(e)
# logger.error(f"Error processing batch {batch_number}: {error_message}")
# # Save error to database
# db_ops.save_batch_to_db(
# batch_number=batch_number,
# total_batches=total_batches,
# results_df=pd.DataFrame(), # Empty DataFrame for error case
# status="error"
# )
# responses.append(BatchResponse(
# batch_number=batch_number,
# total_batches=total_batches,
# processed_rows=len(chunk),
# results_path="",
# message=f"Error processing batch {batch_number}: {error_message}"
# ))
# offset += batch_size
# logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
# return responses
# except Exception as e:
# logger.error(f"Error in streaming pipeline: {str(e)}")
# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
# raise HTTPException(status_code=500, detail=str(e))
@@ -1,7 +1,7 @@
import time
import threading
import requests
from ...config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload
from app.config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload
from app.utils.logger import logger
class SalaryDetect:
+75
View File
@@ -0,0 +1,75 @@
from fastapi import APIRouter, HTTPException
from app.salary_analytics.helpers.response_helpers import AnalysisResponse
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.utils.logger import logger
import time
from app.salary_analytics.core.state import state
router = APIRouter()
@router.post("/keyword", response_model=AnalysisResponse)
async def analyze_keyword():
"""Run keyword-based salary transaction analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting keyword analysis...")
data = state.pipeline.run_keyword_analysis()
logger.info(f"Keyword analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Keyword analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in keyword analysis: {str(e)}")
logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount():
"""Run consistent amount transaction analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting consistent amount analysis...")
data = state.pipeline.run_consistent_amount_analysis()
logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Consistent amount analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in consistent amount analysis: {str(e)}")
logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type():
"""Run transaction type analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting transaction type analysis...")
data = state.pipeline.run_transaction_type_analysis()
logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Transaction type analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in transaction type analysis: {str(e)}")
logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
+74
View File
@@ -0,0 +1,74 @@
from fastapi import APIRouter, HTTPException, UploadFile, File
from app.salary_analytics.core.state import state
from app.utils.logger import logger
import tempfile, os, time
from typing import Optional
router = APIRouter()
@router.post("/load-data")
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
"""
Load data from either database or CSV file.
Args:
source (str): Source of data ('db' or 'csv')
file (UploadFile, optional): CSV file to load (required if source is 'csv')
Returns:
dict: Status of data loading
"""
start_time = time.time()
try:
if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
if source == 'csv':
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
success = state.pipeline.load_data(source='csv', file_path=temp_file_path)
finally:
# Clean up temporary file
os.unlink(temp_file_path)
else:
success = state.pipeline.load_data(source='db')
if not success:
logger.error("Failed to load data")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to load data")
response = {
"status": "success",
"message": f"Successfully loaded {len(state.pipeline.df)} rows of data",
"columns": state.pipeline.df.columns.tolist(),
"row_count": len(state.pipeline.df)
}
logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/load-data-with-file")
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
"""Dependency to handle file upload only when source is csv."""
if source == 'csv' and not file:
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
return file
+292
View File
@@ -0,0 +1,292 @@
from fastapi import APIRouter, HTTPException
from app.salary_analytics.services.main import SalaryAnalyticsPipeline
from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.salary_analytics.services.data_loader import DataLoader
from app.salary_analytics.core.state import state
from app.models.db_operations import DatabaseOperations
from app.config import OUTPUT_PATHS, TABLE_NAME
from app.utils.logger import logger
from typing import Optional, List, Union
from sqlalchemy import text
from datetime import datetime
import pandas as pd, os, tempfile, time
from typing import Optional, Union
from fastapi import UploadFile, File
router = APIRouter()
@router.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline():
"""Run the complete salary analytics pipeline."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting full pipeline...")
success = state.pipeline.run_full_pipeline()
if not success:
logger.error("Pipeline failed")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Pipeline failed")
logger.info("Pipeline completed successfully")
response = AnalysisResponse(
message="Pipeline completed successfully"
)
logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in pipeline: {str(e)}")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@router.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(
source: str = "db",
batch_size: int = 10000,
file: Optional[Union[UploadFile, str]] = File(None)
):
"""
Run the complete salary analytics pipeline in batches.
Args:
source (str): Source of data ('db' or 'csv')
batch_size (int): Number of rows to process in each batch
file (UploadFile, optional): CSV file to load (required if source is 'csv')
Returns:
List[BatchResponse]: List of responses for each batch processed
"""
start_time = time.time()
try:
if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# Initialize data loader
state.data_loader = DataLoader()
state.data_loader.chunk_size = batch_size
# Create output directory for batch results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
os.makedirs(batch_output_dir, exist_ok=True)
# Initialize database operations
if not state.data_loader.connect():
logger.error("Failed to connect to database")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to connect to database")
db_ops = DatabaseOperations(state.data_loader.engine)
if not db_ops.create_batch_results_table():
logger.error("Failed to create batch results table")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to create batch results table")
responses = []
batch_number = 0
batch_start_time = time.time()
def preprocess_chunk(chunk):
"""Preprocess a chunk of data with the same logic as DataLoader."""
# Convert dates
chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
# Rename columns
chunk = chunk.rename(columns={
'd1': 'trx_type',
'd2': 'trx_subtype',
'd3': 'initiated_by',
'd4': 'customer_id'
})
chunk = chunk.dropna()
return chunk
if source == 'csv':
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
# Process CSV in chunks
for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
batch_number += 1
logger.info(f"Processing batch {batch_number}")
# Preprocess chunk
chunk = preprocess_chunk(chunk)
# Run pipeline on chunk
state.pipeline = SalaryAnalyticsPipeline()
state.pipeline.df = chunk
try:
batch_start_time = time.time()
# Run analyses
state.pipeline.run_keyword_analysis()
state.pipeline.run_consistent_amount_analysis()
state.pipeline.run_transaction_type_analysis()
# Generate reports
reports = state.pipeline.generate_salary_earner_reports()
# Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = -1 # Unknown for CSV
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
processed_rows=len(chunk),
results_path=batch_results_path,
message=f"Successfully processed batch {batch_number}"
))
except Exception as e:
error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=-1,
processed_rows=len(chunk),
results_path="",
message=f"Error processing batch {batch_number}: {error_message}"
))
finally:
# Clean up temporary file
os.unlink(temp_file_path)
else:
# Process database in chunks
if not state.data_loader.connect():
raise HTTPException(status_code=500, detail="Failed to connect to database")
# Get total row count
with state.data_loader.engine.connect() as conn:
count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
total_rows = conn.execute(count_query).scalar()
total_batches = (total_rows + batch_size - 1) // batch_size
offset = 0
while offset < total_rows:
batch_number += 1
logger.info(f"Processing batch {batch_number} of {total_batches}")
# Load chunk from database
query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
chunk = pd.read_sql(query, state.data_loader.engine)
if chunk.empty:
break
# Preprocess chunk
chunk = preprocess_chunk(chunk)
# Run pipeline on chunk
pipeline = SalaryAnalyticsPipeline()
state.pipeline.df = chunk
try:
batch_start_time = time.time()
# Run analyses
state.pipeline.run_keyword_analysis()
state.pipeline.run_consistent_amount_analysis()
state.pipeline.run_transaction_type_analysis()
# Generate reports
reports = state.pipeline.generate_salary_earner_reports()
# Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = total_batches
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=total_batches,
processed_rows=len(chunk),
results_path=batch_results_path,
message=f"Successfully processed batch {batch_number} of {total_batches}"
))
except Exception as e:
error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=total_batches,
processed_rows=len(chunk),
results_path="",
message=f"Error processing batch {batch_number}: {error_message}"
))
offset += batch_size
logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return responses
except Exception as e:
logger.error(f"Error in streaming pipeline: {str(e)}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
+78
View File
@@ -0,0 +1,78 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse
from app.salary_analytics.helpers.response_helpers import AnalysisResponse
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.salary_analytics.core.state import state
from app.config import OUTPUT_PATHS
from app.utils.logger import logger
import os, time
router = APIRouter()
@router.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks):
"""Generate salary earner reports."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting report generation...")
reports = state.pipeline.generate_salary_earner_reports()
logger.info("Reports generated successfully")
response = AnalysisResponse(
message="Reports generated successfully",
data={
"verified_salary_earners": len(reports['final_table']),
"likely_salary_earners": len(reports['likely_salary_earner']),
"high_earners": reports['total_high_earners']
}
)
logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in report generation: {str(e)}")
logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@router.get("/download/{report_type}")
async def download_report(report_type: str):
"""Download generated reports."""
start_time = time.time()
try:
check_data_loaded()
logger.info(f"Attempting to download report: {report_type}")
file_paths = {
"high_earners": OUTPUT_PATHS["high_earner_details"],
"likely_earners": OUTPUT_PATHS["likely_salary_earner"],
"final_table": OUTPUT_PATHS["final_table"],
"consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
"inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
"hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
}
if report_type not in file_paths:
logger.error(f"Report type not found: {report_type}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report type not found")
file_path = file_paths[report_type]
if not os.path.exists(file_path):
logger.error(f"Report file not found: {file_path}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report file not found")
logger.info(f"Successfully found report file: {file_path}")
response = FileResponse(
path=file_path,
filename=os.path.basename(file_path),
media_type="application/octet-stream"
)
logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error downloading report: {str(e)}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@@ -3,7 +3,7 @@ Consistent amount transaction analysis module.
"""
import pandas as pd
from .config import MODEL_CONFIG
from app.config import MODEL_CONFIG
class ConsistentAmountAnalyzer:
def __init__(self, df):
@@ -7,7 +7,7 @@ import pandas as pd
from datetime import datetime
import logging
import os
from .config import DB_CONFIG, TABLE_NAME
from app.config import DB_CONFIG, TABLE_NAME
from app.utils.logger import logger
class DataLoader:
@@ -4,7 +4,7 @@ Keyword-based salary transaction analysis module.
import re
import pandas as pd
from .config import SALARY_KEYWORDS
from app.config import SALARY_KEYWORDS
class KeywordAnalyzer:
def __init__(self, df):
@@ -6,7 +6,7 @@ import pandas as pd
import matplotlib.pyplot as plt
from matplotlib_venn import venn3
from datetime import datetime, timedelta
from .config import MODEL_CONFIG, OUTPUT_PATHS
from app.config import MODEL_CONFIG, OUTPUT_PATHS
from app.utils.logger import logger
class SalaryEarnerAnalyzer:
@@ -9,7 +9,7 @@ from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from joblib import dump
from .config import OUTPUT_PATHS
from app.config import OUTPUT_PATHS
class SalaryPredictor:
def __init__(self, df):
@@ -3,7 +3,7 @@ Transaction type analysis module.
"""
import pandas as pd
from .config import MODEL_CONFIG
from app.config import MODEL_CONFIG
class TransactionTypeAnalyzer:
def __init__(self, df):