Update .gitignore to exclude __pycache__ in the salary_analytics directory, and update the run_streaming_pipeline endpoint to require an uploaded file when the source is CSV.

This commit is contained in:
2025-05-10 15:14:38 +01:00
parent 5e5459450b
commit 305e5da4ec
2 changed files with 7 additions and 10 deletions
+1 -2
View File
@@ -14,5 +14,4 @@ __pycache__
__pycache__/* __pycache__/*
output/* output/*
.idea/* .idea/*
salary_analytics/__pycache__/*
+6 -8
View File
@@ -6,7 +6,7 @@ from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, D
from fastapi.responses import FileResponse from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel from pydantic import BaseModel
from typing import Optional, Dict, List from typing import Optional, Dict, List, Union
import os import os
import socket import socket
import logging import logging
@@ -298,11 +298,8 @@ async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
async def run_streaming_pipeline( async def run_streaming_pipeline(
source: str = "db", source: str = "db",
batch_size: int = 10000, batch_size: int = 10000,
UploadFile: str=''): file: Optional[Union[UploadFile, str]] = File(None)
):
file = None
if len(UploadFile) > 0 :
file : Optional[UploadFile] = Depends(get_file_if_csv)
""" """
Run the complete salary analytics pipeline in batches. Run the complete salary analytics pipeline in batches.
@@ -318,6 +315,9 @@ async def run_streaming_pipeline(
if source not in ['db', 'csv']: if source not in ['db', 'csv']:
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# Initialize data loader # Initialize data loader
data_loader = DataLoader() data_loader = DataLoader()
data_loader.chunk_size = batch_size data_loader.chunk_size = batch_size
@@ -330,8 +330,6 @@ async def run_streaming_pipeline(
responses = [] responses = []
batch_number = 0 batch_number = 0
print("Sal************************", source)
def preprocess_chunk(chunk): def preprocess_chunk(chunk):
"""Preprocess a chunk of data with the same logic as DataLoader.""" """Preprocess a chunk of data with the same logic as DataLoader."""
# Convert dates # Convert dates