From ebe40cda198aa39c97c5052bab2fcf5d103914ff Mon Sep 17 00:00:00 2001 From: VivianDee <115420678+VivianDee@users.noreply.github.com> Date: Tue, 9 Sep 2025 12:06:37 +0100 Subject: [PATCH] [add]: database configuration fix --- Dockerfile | 2 +- app.log | 182 +++++++++++++++ app/api.py | 601 -------------------------------------------------- app/config.py | 31 ++- 4 files changed, 204 insertions(+), 612 deletions(-) delete mode 100644 app/api.py diff --git a/Dockerfile b/Dockerfile index d07e212..c74ecdb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,7 +17,7 @@ RUN mkdir -p output/csv output/plots output/models # ENV FLASK_APP=wsgi.py -ENV FLASK_APP=run.py +ENV FLASK_APP=run.py. ENV FLASK_RUN_HOST=0.0.0.0 EXPOSE 8000 diff --git a/app.log b/app.log index 5c264fd..4282a8a 100644 --- a/app.log +++ b/app.log @@ -29,3 +29,185 @@ 2025-09-09 10:25:42,100 - INFO - [2025-09-09 10:25:42] Salary detection complete 2025-09-09 10:27:03,741 - INFO - Shutting down Salary Analytics API... +2025-09-09 10:29:59,503 - INFO - Initializing pipeline... +2025-09-09 10:29:59,506 - INFO - [2025-09-09 10:29:59] Detecting salary... +2025-09-09 10:29:59,506 - INFO - Started autonomous salary detection loop. +2025-09-09 10:29:59,534 - INFO - Server running on hostname: 1c3f3ceb2429 +2025-09-09 10:29:59,535 - INFO - Server IP address: 172.25.0.2 +2025-09-09 10:29:59,535 - INFO - Server is accessible at: +2025-09-09 10:29:59,536 - INFO - - http://localhost:8000 +2025-09-09 10:29:59,537 - INFO - - http://127.0.0.1:8000 +2025-09-09 10:29:59,539 - INFO - - http://172.25.0.2:8000 +2025-09-09 10:29:59,541 - INFO - Pipeline initialized successfully +2025-09-09 10:30:04,484 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:30:04,485 - INFO - [2025-09-09 10:30:04] Salary detection complete +2025-09-09 10:30:47,978 - INFO - Shutting down Salary Analytics API... +2025-09-09 10:41:41,451 - INFO - Initializing pipeline... +2025-09-09 10:41:41,456 - INFO - [2025-09-09 10:41:41] Detecting salary... +2025-09-09 10:41:41,457 - INFO - Started autonomous salary detection loop. +2025-09-09 10:41:41,481 - INFO - Server running on hostname: 1c3f3ceb2429 +2025-09-09 10:41:41,485 - INFO - Server IP address: 172.25.0.2 +2025-09-09 10:41:41,486 - INFO - Server is accessible at: +2025-09-09 10:41:41,486 - INFO - - http://localhost:8000 +2025-09-09 10:41:41,488 - INFO - - http://127.0.0.1:8000 +2025-09-09 10:41:41,490 - INFO - - http://172.25.0.2:8000 +2025-09-09 10:41:41,491 - INFO - Pipeline initialized successfully +2025-09-09 10:41:42,431 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:41:42,432 - INFO - [2025-09-09 10:41:42] Salary detection complete +2025-09-09 10:43:42,431 - INFO - [2025-09-09 10:43:42] Detecting salary... +2025-09-09 10:43:43,092 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:43:43,093 - INFO - [2025-09-09 10:43:43] Salary detection complete +2025-09-09 10:45:43,093 - INFO - [2025-09-09 10:45:43] Detecting salary... +2025-09-09 10:45:43,818 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:45:43,819 - INFO - [2025-09-09 10:45:43] Salary detection complete +2025-09-09 10:47:16,454 - INFO - Shutting down Salary Analytics API... +2025-09-09 10:47:30,172 - INFO - Initializing pipeline... +2025-09-09 10:47:30,174 - INFO - [2025-09-09 10:47:30] Detecting salary... +2025-09-09 10:47:30,175 - INFO - Started autonomous salary detection loop. +2025-09-09 10:47:30,185 - INFO - Server running on hostname: 1c3f3ceb2429 +2025-09-09 10:47:30,188 - INFO - Server IP address: 172.25.0.2 +2025-09-09 10:47:30,188 - INFO - Server is accessible at: +2025-09-09 10:47:30,189 - INFO - - http://localhost:8000 +2025-09-09 10:47:30,190 - INFO - - http://127.0.0.1:8000 +2025-09-09 10:47:30,191 - INFO - - http://172.25.0.2:8000 +2025-09-09 10:47:30,191 - INFO - Pipeline initialized successfully +2025-09-09 10:47:31,032 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:47:31,033 - INFO - [2025-09-09 10:47:31] Salary detection complete +2025-09-09 10:47:38,286 - INFO - Shutting down Salary Analytics API... +2025-09-09 10:47:47,645 - INFO - generated new fontManager +2025-09-09 10:48:19,231 - INFO - generated new fontManager +2025-09-09 10:48:24,426 - INFO - Initializing pipeline... +2025-09-09 10:48:24,429 - INFO - [2025-09-09 10:48:24] Detecting salary... +2025-09-09 10:48:24,429 - INFO - Started autonomous salary detection loop. +2025-09-09 10:48:24,441 - INFO - Server running on hostname: 349f9fd0c78b +2025-09-09 10:48:24,442 - INFO - Server IP address: 172.25.0.2 +2025-09-09 10:48:24,444 - INFO - Server is accessible at: +2025-09-09 10:48:24,445 - INFO - - http://localhost:8000 +2025-09-09 10:48:24,448 - INFO - - http://127.0.0.1:8000 +2025-09-09 10:48:24,450 - INFO - - http://172.25.0.2:8000 +2025-09-09 10:48:24,451 - INFO - Pipeline initialized successfully +2025-09-09 10:48:25,094 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:48:25,095 - INFO - [2025-09-09 10:48:25] Salary detection complete +2025-09-09 10:49:03,380 - INFO - Shutting down Salary Analytics API... +2025-09-09 10:49:18,345 - INFO - Initializing pipeline... +2025-09-09 10:49:18,346 - INFO - [2025-09-09 10:49:18] Detecting salary... +2025-09-09 10:49:18,347 - INFO - Started autonomous salary detection loop. +2025-09-09 10:49:18,352 - INFO - Server running on hostname: 349f9fd0c78b +2025-09-09 10:49:18,353 - INFO - Server IP address: 172.25.0.2 +2025-09-09 10:49:18,353 - INFO - Server is accessible at: +2025-09-09 10:49:18,354 - INFO - - http://localhost:8000 +2025-09-09 10:49:18,355 - INFO - - http://127.0.0.1:8000 +2025-09-09 10:49:18,365 - INFO - - http://172.25.0.2:8000 +2025-09-09 10:49:18,366 - INFO - Pipeline initialized successfully +2025-09-09 10:50:37,994 - INFO - generated new fontManager +2025-09-09 10:50:45,235 - INFO - Initializing pipeline... +2025-09-09 10:50:45,238 - INFO - [2025-09-09 10:50:45] Detecting salary... +2025-09-09 10:50:45,238 - INFO - Started autonomous salary detection loop. +2025-09-09 10:50:45,244 - INFO - Server running on hostname: 087fb63cb9f0 +2025-09-09 10:50:45,244 - INFO - Server IP address: 172.25.0.2 +2025-09-09 10:50:45,245 - INFO - Server is accessible at: +2025-09-09 10:50:45,245 - INFO - - http://localhost:8000 +2025-09-09 10:50:45,246 - INFO - - http://127.0.0.1:8000 +2025-09-09 10:50:45,247 - INFO - - http://172.25.0.2:8000 +2025-09-09 10:50:45,248 - INFO - Pipeline initialized successfully +2025-09-09 10:50:46,400 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-09 10:50:46,401 - INFO - [2025-09-09 10:50:46] Salary detection complete +2025-09-09 10:51:51,570 - INFO - Shutting down Salary Analytics API... +2025-09-09 11:01:38,522 - INFO - generated new fontManager +2025-09-09 11:01:45,459 - INFO - Initializing pipeline... +2025-09-09 11:01:45,463 - INFO - [2025-09-09 11:01:45] Detecting salary... +2025-09-09 11:01:45,464 - INFO - Started autonomous salary detection loop. +2025-09-09 11:01:45,483 - INFO - Server running on hostname: 5d4fdd4232a7 +2025-09-09 11:01:45,484 - INFO - Server IP address: 172.25.0.2 +2025-09-09 11:01:45,485 - INFO - Server is accessible at: +2025-09-09 11:01:45,491 - INFO - - http://localhost:8000 +2025-09-09 11:01:45,493 - INFO - - http://127.0.0.1:8000 +2025-09-09 11:01:45,495 - INFO - - http://172.25.0.2:8000 +2025-09-09 11:01:45,496 - INFO - Pipeline initialized successfully +2025-09-09 11:02:00,358 - INFO - Shutting down Salary Analytics API... +2025-09-09 11:02:15,204 - INFO - Initializing pipeline... +2025-09-09 11:02:15,208 - INFO - [2025-09-09 11:02:15] Detecting salary... +2025-09-09 11:02:15,208 - INFO - Started autonomous salary detection loop. +2025-09-09 11:02:15,395 - INFO - Server running on hostname: 5d4fdd4232a7 +2025-09-09 11:02:15,397 - INFO - Server IP address: 172.25.0.2 +2025-09-09 11:02:15,415 - INFO - Server is accessible at: +2025-09-09 11:02:15,417 - INFO - - http://localhost:8000 +2025-09-09 11:02:15,417 - INFO - - http://127.0.0.1:8000 +2025-09-09 11:02:15,418 - INFO - - http://172.25.0.2:8000 +2025-09-09 11:02:15,419 - INFO - Pipeline initialized successfully +2025-09-09 11:04:18,780 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 500, response: +
+Internal Server Error
+ + + + +2025-09-09 11:04:18,781 - INFO - [2025-09-09 11:04:18] Salary detection complete +2025-09-09 11:04:41,264 - INFO - Initializing SalaryAnalyticsPipeline +2025-09-09 11:04:41,265 - INFO - Starting data loading process +2025-09-09 11:04:41,265 - INFO - No database connection. Attempting to connect... +2025-09-09 11:04:41,266 - INFO - Attempting to connect to database... +2025-09-09 11:05:42,201 - ERROR - Error connecting to database: (psycopg2.OperationalError) connection to server at "dev-data.simbrellang.net" (209.195.2.27), port 1521 failed: server closed the connection unexpectedly + This probably means the server terminated abnormally + before or while processing the request. + +(Background on this error at: https://sqlalche.me/e/20/e3q8) +2025-09-09 11:05:42,202 - ERROR - Failed to establish database connection +2025-09-09 11:05:42,202 - ERROR - Failed to load data +2025-09-09 11:05:42,203 - ERROR - Failed to load data +2025-09-09 11:05:42,203 - INFO - Load data endpoint failed after 60.94 seconds +2025-09-09 11:05:42,204 - ERROR - Error loading data: 500: Failed to load data +2025-09-09 11:05:42,206 - INFO - Load data endpoint failed after 60.94 seconds +2025-09-09 11:06:18,783 - INFO - [2025-09-09 11:06:18] Detecting salary... diff --git a/app/api.py b/app/api.py deleted file mode 100644 index 3a65527..0000000 --- a/app/api.py +++ /dev/null @@ -1,601 +0,0 @@ -""" -FastAPI application for salary analytics. -""" - -from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends -from fastapi.responses import FileResponse -import os -import socket -from typing import Optional, List, Union -import pandas as pd -import tempfile -from datetime import datetime -from sqlalchemy import text -import warnings -import time -from app.salary_analytics.services.main import SalaryAnalyticsPipeline -from app.config import OUTPUT_PATHS, TABLE_NAME -from app.salary_analytics.services.data_loader import DataLoader -from app.salary_analytics.middlewares.middleware import add_middlewares -from app.models.db_operations import DatabaseOperations -from app.salary_analytics.integrations.salary_detect import SalaryDetect -from app.utils.logger import logger -from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse - - -# Suppress warnings -warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy') -pd.options.mode.chained_assignment = None - -app = FastAPI( - title="Salary Analytics API", - description="API for analyzing and predicting salary patterns from transaction data", - version="1.0.0" -) - -# Add CORS middleware -add_middlewares(app) - -# Global pipeline instance -pipeline = SalaryAnalyticsPipeline() - -# Global variables to store loaded data and models -data_loader = None -df = None -salary_predictor = None -salary_earner_analyzer = None - -# salary_detect = SalaryDetect() - - - -# def check_data_loaded(): -# """Check if data is loaded before running analytics.""" -# if pipeline.df is None: -# raise HTTPException( -# status_code=400, -# detail="No data loaded. Please load data first using the /load-data endpoint." -# ) - -# @app.on_event("startup") -# async def startup_event(): -# """Initialize the pipeline on startup.""" -# try: -# logger.info("Initializing pipeline...") - -# # Start autonomous salary detection loop -# salary_detect.start() -# logger.info("Started autonomous salary detection loop.") - -# # Print network information -# hostname = socket.gethostname() -# ip_address = socket.gethostbyname(hostname) -# logger.info(f"Server running on hostname: {hostname}") -# logger.info(f"Server IP address: {ip_address}") -# logger.info(f"Server is accessible at:") -# logger.info(f"- http://localhost:8000") -# logger.info(f"- http://127.0.0.1:8000") -# logger.info(f"- http://{ip_address}:8000") -# logger.info("Pipeline initialized successfully") -# except Exception as e: -# logger.error(f"Error during startup: {str(e)}") -# raise - - - -# @app.get("/") -# async def root(): -# """Root endpoint.""" -# start_time = time.time() -# logger.info("Root endpoint accessed") -# response = {"message": "Welcome to Salary Analytics API"} -# logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds") -# return response - - - -# @app.get("/health") -# async def health_check(): -# """Health check endpoint.""" -# start_time = time.time() -# logger.info("Health check endpoint accessed") -# response = {"status": "healthy"} -# logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds") -# return response - - - -# @app.post("/analyze/keyword", response_model=AnalysisResponse) -# async def analyze_keyword(): -# """Run keyword-based salary transaction analysis.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info("Starting keyword analysis...") -# data = pipeline.run_keyword_analysis() -# logger.info(f"Keyword analysis completed. Found {len(data)} matches") -# response = AnalysisResponse( -# message="Keyword analysis completed successfully", -# data={"count": len(data)} -# ) -# logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error in keyword analysis: {str(e)}") -# logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.post("/analyze/consistent-amount", response_model=AnalysisResponse) -# async def analyze_consistent_amount(): -# """Run consistent amount transaction analysis.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info("Starting consistent amount analysis...") -# data = pipeline.run_consistent_amount_analysis() -# logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") -# response = AnalysisResponse( -# message="Consistent amount analysis completed successfully", -# data={"count": len(data)} -# ) -# logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error in consistent amount analysis: {str(e)}") -# logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.post("/analyze/transaction-type", response_model=AnalysisResponse) -# async def analyze_transaction_type(): -# """Run transaction type analysis.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info("Starting transaction type analysis...") -# data = pipeline.run_transaction_type_analysis() -# logger.info(f"Transaction type analysis completed. Found {len(data)} matches") -# response = AnalysisResponse( -# message="Transaction type analysis completed successfully", -# data={"count": len(data)} -# ) -# logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error in transaction type analysis: {str(e)}") -# logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.post("/generate/reports", response_model=AnalysisResponse) -# async def generate_reports(background_tasks: BackgroundTasks): -# """Generate salary earner reports.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info("Starting report generation...") -# reports = pipeline.generate_salary_earner_reports() -# logger.info("Reports generated successfully") -# response = AnalysisResponse( -# message="Reports generated successfully", -# data={ -# "verified_salary_earners": len(reports['final_table']), -# "likely_salary_earners": len(reports['likely_salary_earner']), -# "high_earners": reports['total_high_earners'] -# } -# ) -# logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error in report generation: {str(e)}") -# logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.post("/train/models", response_model=AnalysisResponse) -# async def train_models(): -# """Train salary prediction models.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info("Starting model training...") -# pipeline.train_salary_prediction_models() -# logger.info("Models trained successfully") -# response = AnalysisResponse( -# message="Models trained successfully" -# ) -# logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error in model training: {str(e)}") -# logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.get("/download/{report_type}") -# async def download_report(report_type: str): -# """Download generated reports.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info(f"Attempting to download report: {report_type}") -# file_paths = { -# "high_earners": OUTPUT_PATHS["high_earner_details"], -# "likely_earners": OUTPUT_PATHS["likely_salary_earner"], -# "final_table": OUTPUT_PATHS["final_table"], -# "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"], -# "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"], -# "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"] -# } - -# if report_type not in file_paths: -# logger.error(f"Report type not found: {report_type}") -# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=404, detail="Report type not found") - -# file_path = file_paths[report_type] -# if not os.path.exists(file_path): -# logger.error(f"Report file not found: {file_path}") -# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=404, detail="Report file not found") - -# logger.info(f"Successfully found report file: {file_path}") -# response = FileResponse( -# path=file_path, -# filename=os.path.basename(file_path), -# media_type="application/octet-stream" -# ) -# logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error downloading report: {str(e)}") -# logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.post("/run/pipeline", response_model=AnalysisResponse) -# async def run_full_pipeline(): -# """Run the complete salary analytics pipeline.""" -# start_time = time.time() -# try: -# check_data_loaded() -# logger.info("Starting full pipeline...") -# success = pipeline.run_full_pipeline() -# if not success: -# logger.error("Pipeline failed") -# logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail="Pipeline failed") - -# logger.info("Pipeline completed successfully") -# response = AnalysisResponse( -# message="Pipeline completed successfully" -# ) -# logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error in pipeline: {str(e)}") -# logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - - - -# @app.post("/load-data") -# async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): -# """ -# Load data from either database or CSV file. - -# Args: -# source (str): Source of data ('db' or 'csv') -# file (UploadFile, optional): CSV file to load (required if source is 'csv') - -# Returns: -# dict: Status of data loading -# """ -# start_time = time.time() -# try: -# if source not in ['db', 'csv']: -# logger.error(f"Invalid source: {source}") -# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - -# if source == 'csv' and not file: -# logger.error("No file provided for CSV source") -# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - -# if source == 'csv': -# # Save uploaded file temporarily -# with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: -# content = await file.read() -# temp_file.write(content) -# temp_file_path = temp_file.name - -# try: -# success = pipeline.load_data(source='csv', file_path=temp_file_path) -# finally: -# # Clean up temporary file -# os.unlink(temp_file_path) -# else: -# success = pipeline.load_data(source='db') - -# if not success: -# logger.error("Failed to load data") -# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail="Failed to load data") - -# response = { -# "status": "success", -# "message": f"Successfully loaded {len(pipeline.df)} rows of data", -# "columns": pipeline.df.columns.tolist(), -# "row_count": len(pipeline.df) -# } -# logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") -# return response -# except Exception as e: -# logger.error(f"Error loading data: {str(e)}") -# logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - -# async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): -# """Dependency to handle file upload only when source is csv.""" -# if source == 'csv' and not file: -# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") -# return file - - - -# @app.post("/run/streaming-pipeline", response_model=List[BatchResponse]) -# async def run_streaming_pipeline( -# source: str = "db", -# batch_size: int = 10000, -# file: Optional[Union[UploadFile, str]] = File(None) -# ): -# """ -# Run the complete salary analytics pipeline in batches. - -# Args: -# source (str): Source of data ('db' or 'csv') -# batch_size (int): Number of rows to process in each batch -# file (UploadFile, optional): CSV file to load (required if source is 'csv') - -# Returns: -# List[BatchResponse]: List of responses for each batch processed -# """ -# start_time = time.time() -# try: -# if source not in ['db', 'csv']: -# logger.error(f"Invalid source: {source}") -# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - -# if source == 'csv' and not file: -# logger.error("No file provided for CSV source") -# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - -# # Initialize data loader -# data_loader = DataLoader() -# data_loader.chunk_size = batch_size - -# # Create output directory for batch results -# timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") -# batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") -# os.makedirs(batch_output_dir, exist_ok=True) - -# # Initialize database operations -# if not data_loader.connect(): -# logger.error("Failed to connect to database") -# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail="Failed to connect to database") - -# db_ops = DatabaseOperations(data_loader.engine) -# if not db_ops.create_batch_results_table(): -# logger.error("Failed to create batch results table") -# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail="Failed to create batch results table") - -# responses = [] -# batch_number = 0 -# batch_start_time = time.time() - -# def preprocess_chunk(chunk): -# """Preprocess a chunk of data with the same logic as DataLoader.""" -# # Convert dates -# chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date']) -# chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date']) - -# # Rename columns -# chunk = chunk.rename(columns={ -# 'd1': 'trx_type', -# 'd2': 'trx_subtype', -# 'd3': 'initiated_by', -# 'd4': 'customer_id' -# }) - -# chunk = chunk.dropna() - -# return chunk - -# if source == 'csv': -# # Save uploaded file temporarily -# with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: -# content = await file.read() -# temp_file.write(content) -# temp_file_path = temp_file.name - -# try: -# # Process CSV in chunks -# for chunk in pd.read_csv(temp_file_path, chunksize=batch_size): -# batch_number += 1 -# logger.info(f"Processing batch {batch_number}") - -# # Preprocess chunk -# chunk = preprocess_chunk(chunk) - -# # Run pipeline on chunk -# pipeline = SalaryAnalyticsPipeline() -# pipeline.df = chunk - -# try: -# batch_start_time = time.time() -# # Run analyses -# pipeline.run_keyword_analysis() -# pipeline.run_consistent_amount_analysis() -# pipeline.run_transaction_type_analysis() - -# # Generate reports -# reports = pipeline.generate_salary_earner_reports() - -# # Add batch metadata to results -# results_df = reports['final_table'].copy() -# results_df['batch_number'] = batch_number -# results_df['total_batches'] = -1 # Unknown for CSV -# results_df['processed_at'] = datetime.now() - -# # Save batch results to CSV -# batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") -# results_df.to_csv(batch_results_path, index=False) - -# # Save to database -# db_ops.save_batch_to_db( -# batch_number=batch_number, -# total_batches=-1, # Unknown for CSV -# results_df=results_df, -# status="success" -# ) - -# logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds") - -# responses.append(BatchResponse( -# batch_number=batch_number, -# total_batches=-1, # Unknown for CSV -# processed_rows=len(chunk), -# results_path=batch_results_path, -# message=f"Successfully processed batch {batch_number}" -# )) -# except Exception as e: -# error_message = str(e) -# logger.error(f"Error processing batch {batch_number}: {error_message}") - -# # Save error to database -# db_ops.save_batch_to_db( -# batch_number=batch_number, -# total_batches=-1, -# results_df=pd.DataFrame(), # Empty DataFrame for error case -# status="error" -# ) - -# responses.append(BatchResponse( -# batch_number=batch_number, -# total_batches=-1, -# processed_rows=len(chunk), -# results_path="", -# message=f"Error processing batch {batch_number}: {error_message}" -# )) -# finally: -# # Clean up temporary file -# os.unlink(temp_file_path) -# else: -# # Process database in chunks -# if not data_loader.connect(): -# raise HTTPException(status_code=500, detail="Failed to connect to database") - -# # Get total row count -# with data_loader.engine.connect() as conn: -# count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}") -# total_rows = conn.execute(count_query).scalar() - -# total_batches = (total_rows + batch_size - 1) // batch_size -# offset = 0 - -# while offset < total_rows: -# batch_number += 1 -# logger.info(f"Processing batch {batch_number} of {total_batches}") - -# # Load chunk from database -# query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}" -# chunk = pd.read_sql(query, data_loader.engine) - -# if chunk.empty: -# break - -# # Preprocess chunk -# chunk = preprocess_chunk(chunk) - -# # Run pipeline on chunk -# pipeline = SalaryAnalyticsPipeline() -# pipeline.df = chunk - -# try: -# batch_start_time = time.time() -# # Run analyses -# pipeline.run_keyword_analysis() -# pipeline.run_consistent_amount_analysis() -# pipeline.run_transaction_type_analysis() - -# # Generate reports -# reports = pipeline.generate_salary_earner_reports() - -# # Add batch metadata to results -# results_df = reports['final_table'].copy() -# results_df['batch_number'] = batch_number -# results_df['total_batches'] = total_batches -# results_df['processed_at'] = datetime.now() - -# # Save batch results to CSV -# batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") -# results_df.to_csv(batch_results_path, index=False) - -# # Save to database -# db_ops.save_batch_to_db( -# batch_number=batch_number, -# total_batches=total_batches, -# results_df=results_df, -# status="success" -# ) - -# logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds") - -# responses.append(BatchResponse( -# batch_number=batch_number, -# total_batches=total_batches, -# processed_rows=len(chunk), -# results_path=batch_results_path, -# message=f"Successfully processed batch {batch_number} of {total_batches}" -# )) -# except Exception as e: -# error_message = str(e) -# logger.error(f"Error processing batch {batch_number}: {error_message}") - -# # Save error to database -# db_ops.save_batch_to_db( -# batch_number=batch_number, -# total_batches=total_batches, -# results_df=pd.DataFrame(), # Empty DataFrame for error case -# status="error" -# ) - -# responses.append(BatchResponse( -# batch_number=batch_number, -# total_batches=total_batches, -# processed_rows=len(chunk), -# results_path="", -# message=f"Error processing batch {batch_number}: {error_message}" -# )) - -# offset += batch_size - -# logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds") -# return responses -# except Exception as e: -# logger.error(f"Error in streaming pipeline: {str(e)}") -# logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") -# raise HTTPException(status_code=500, detail=str(e)) - diff --git a/app/config.py b/app/config.py index b2f671a..73e206e 100644 --- a/app/config.py +++ b/app/config.py @@ -25,18 +25,29 @@ os.makedirs(MODEL_DIR, exist_ok=True) # Database Configuration DB_CONFIG = { - "user": os.getenv("DB_USER"), - "password": os.getenv("DB_PASSWORD"), - "name": os.getenv("DB_NAME"), - "port": os.getenv("DB_PORT"), - "host": os.getenv("DB_HOST") + "user": os.getenv("DATABASE_USER"), + "password": os.getenv("DATABASE_PASSWORD"), + "name": os.getenv("DATABASE_NAME"), + "port": os.getenv("DATABASE_PORT", 10532), + "host": os.getenv("DATABASE_HOST", "firstadvancedev"), + "sid": os.getenv("DATABASE_SID", "FREE") } + +DNS = f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={DB_CONFIG['host']})(PORT={DB_CONFIG['port']}))(CONNECT_DATA=(SID={DB_CONFIG['sid']})))" + +# Database Connection +SQLALCHEMY_DATABASE_URI_INTERNAL = (f"oracle+oracledb://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DNS}") +SQLALCHEMY_DATABASE_URI = os.getenv("SQLALCHEMY_DATABASE_URI_FULL", SQLALCHEMY_DATABASE_URI_INTERNAL) + +#SQLALCHEMY_DATABASE_URI_FULL = 'oracle+oracledb://FIRSTADVSTG:Pchanged_56789@10.2.110.30:1521/?service_name=firstadv' + # SQLAlchemy Configuration -SQLALCHEMY_DATABASE_URI = ( - f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" - f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}" -) +# SQLALCHEMY_DATABASE_URI = ( +# f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" +# f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}" +# ) + SQLALCHEMY_TRACK_MODIFICATIONS = False # Table Configuration @@ -81,7 +92,7 @@ OUTPUT_PATHS = { } SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337") -SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS","api/rac-check") +SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS", "api/rac-check") # Salary Detect Endpoint Config SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect"