diff --git a/.gitignore b/.gitignore index 18e8883..889671b 100644 --- a/.gitignore +++ b/.gitignore @@ -14,5 +14,4 @@ __pycache__ __pycache__/* output/* .idea/* - - +salary_analytics/__pycache__/* diff --git a/salary_analytics/api.py b/salary_analytics/api.py index 2407d31..1906962 100644 --- a/salary_analytics/api.py +++ b/salary_analytics/api.py @@ -6,7 +6,7 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, D from fastapi.responses import FileResponse from fastapi.middleware.cors import CORSMiddleware from pydantic import BaseModel -from typing import Optional, Dict, List +from typing import Optional, Dict, List, Union import os import socket import logging @@ -298,11 +298,8 @@ async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): async def run_streaming_pipeline( source: str = "db", batch_size: int = 10000, - UploadFile: str=''): - - file = None - if len(UploadFile) > 0 : - file : Optional[UploadFile] = Depends(get_file_if_csv) + file: Optional[Union[UploadFile, str]] = File(None) +): """ Run the complete salary analytics pipeline in batches. @@ -318,6 +315,9 @@ async def run_streaming_pipeline( if source not in ['db', 'csv']: raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") + if source == 'csv' and not file: + raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + # Initialize data loader data_loader = DataLoader() data_loader.chunk_size = batch_size @@ -330,8 +330,6 @@ async def run_streaming_pipeline( responses = [] batch_number = 0 - print("Sal************************", source) - def preprocess_chunk(chunk): """Preprocess a chunk of data with the same logic as DataLoader.""" # Convert dates