Files
salakojoshua1234_gmail.com 8acfb436f3 Enhance API with data loading functionality and update README.
- Added `/load-data` endpoint to load transaction data from either a database or a CSV file.
- Updated `SalaryAnalyticsPipeline` and `DataLoader` to support loading from CSV.
- Implemented data validation and error handling for loading processes.
- Revised README to include new data loading instructions and workflow steps.
- Added checks to ensure data is loaded before running analysis endpoints.
2025-05-01 22:57:55 +01:00

154 lines
6.5 KiB
Python

"""
Main module for running the salary analytics pipeline.
"""
import logging
from .data_loader import DataLoader
from .keyword_analyzer import KeywordAnalyzer
from .consistent_amount_analyzer import ConsistentAmountAnalyzer
from .transaction_type_analyzer import TransactionTypeAnalyzer
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .salary_predictor import SalaryPredictor
logger = logging.getLogger(__name__)
class SalaryAnalyticsPipeline:
    """Orchestrate the salary analytics steps over one loaded dataset.

    The pipeline holds the loaded data in ``self.df`` and keeps a reference to
    the helper object created by each step (``data_loader``,
    ``keyword_analyzer``, ``consistent_amount_analyzer``,
    ``transaction_type_analyzer``, ``salary_earner_analyzer`` and
    ``salary_predictor``).  All of them are ``None`` until the corresponding
    step has run.

    Typical order: ``load_data`` -> the three ``run_*_analysis`` methods ->
    ``generate_salary_earner_reports`` -> ``train_salary_prediction_models``;
    ``run_full_pipeline`` performs exactly that sequence.
    """

    # Flag columns written by the three analysis steps; report generation
    # refuses to run until every one of them is present on ``self.df``.
    _REQUIRED_FLAG_COLUMNS = ('is_salary_related', 'is_consistent_amount', 'is_salary_type')

    def __init__(self) -> None:
        """Create an empty pipeline; nothing is loaded or analysed yet."""
        logger.info("Initializing SalaryAnalyticsPipeline")
        self.data_loader = None
        self.df = None
        self.keyword_analyzer = None
        self.consistent_amount_analyzer = None
        self.transaction_type_analyzer = None
        self.salary_earner_analyzer = None
        self.salary_predictor = None

    def _require_data(self) -> None:
        """Log and raise ``ValueError`` when no dataset has been loaded."""
        if self.df is None:
            message = "Data not loaded. Call load_data() first."
            logger.error(message)
            raise ValueError(message)

    def _flag_rows(self, column, matched):
        """Store in ``self.df[column]`` whether each row's index is in ``matched.index``."""
        self.df[column] = self.df.index.isin(matched.index)

    def load_data(self, source='db', file_path=None) -> bool:
        """Load and preprocess the transaction data.

        A new ``DataLoader`` is created on every call and its result replaces
        ``self.df`` (including with ``None`` when loading fails).

        Args:
            source: Forwarded unchanged to ``DataLoader.load_data``.
            file_path: Forwarded unchanged to ``DataLoader.load_data``.

        Returns:
            ``True`` when the loader returned something other than ``None``,
            ``False`` otherwise.
        """
        # NOTE(review): analyzers/predictor built from a previously loaded
        # dataset are not cleared here - confirm callers re-run the analyses
        # after reloading.
        logger.info("Starting data loading process")
        self.data_loader = DataLoader()
        self.df = self.data_loader.load_data(source=source, file_path=file_path)
        if self.df is None:
            logger.error("Failed to load data")
            return False
        logger.info("Successfully loaded data with %d rows", len(self.df))
        return True

    def run_keyword_analysis(self):
        """Run keyword-based salary transaction analysis.

        Adds the ``is_salary_related`` column to ``self.df``.

        Returns:
            The object returned by ``KeywordAnalyzer.get_salary_related_data()``.

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()
        logger.info("Starting keyword analysis")
        self.keyword_analyzer = KeywordAnalyzer(self.df)
        self.keyword_analyzer.identify_salary_transactions()
        keyword_data = self.keyword_analyzer.get_salary_related_data()
        # Update main DataFrame with keyword analysis results
        self._flag_rows('is_salary_related', keyword_data)
        return keyword_data

    def run_consistent_amount_analysis(self):
        """Run consistent amount transaction analysis.

        Adds the ``is_consistent_amount`` column to ``self.df``.

        Returns:
            The object returned by
            ``ConsistentAmountAnalyzer.get_consistent_amount_data()``.

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()
        logger.info("Starting consistent amount analysis")
        self.consistent_amount_analyzer = ConsistentAmountAnalyzer(self.df)
        self.consistent_amount_analyzer.identify_consistent_amount_accounts()
        consistent_data = self.consistent_amount_analyzer.get_consistent_amount_data()
        # Update main DataFrame with consistent amount analysis results
        self._flag_rows('is_consistent_amount', consistent_data)
        return consistent_data

    def run_transaction_type_analysis(self):
        """Run transaction type analysis.

        Adds the ``is_salary_type`` column to ``self.df``.

        Returns:
            The object returned by
            ``TransactionTypeAnalyzer.get_salary_type_data()``.

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()
        logger.info("Starting transaction type analysis")
        self.transaction_type_analyzer = TransactionTypeAnalyzer(self.df)
        self.transaction_type_analyzer.flag_salary_type_transactions()
        type_data = self.transaction_type_analyzer.get_salary_type_data()
        # Update main DataFrame with transaction type analysis results
        self._flag_rows('is_salary_type', type_data)
        return type_data

    def generate_salary_earner_reports(self):
        """Generate salary earner reports.

        Requires the three flag columns produced by the ``run_*_analysis``
        methods to be present on ``self.df``.

        Returns:
            Whatever ``SalaryEarnerAnalyzer.generate_reports()`` returns.

        Raises:
            ValueError: If no data has been loaded, or if any required flag
                column is missing.
        """
        self._require_data()
        # Ensure all analysis flags are present
        missing_columns = [
            col for col in self._REQUIRED_FLAG_COLUMNS if col not in self.df.columns
        ]
        if missing_columns:
            logger.error(f"Missing required columns: {missing_columns}")
            raise ValueError(f"Missing required columns: {missing_columns}. Run all analyses first.")
        logger.info("Starting salary earner report generation")
        self.salary_earner_analyzer = SalaryEarnerAnalyzer(self.df)
        return self.salary_earner_analyzer.generate_reports()

    def train_salary_prediction_models(self) -> None:
        """Train salary prediction models.

        Generates the salary earner reports first when that step has not run
        yet, then trains a new ``SalaryPredictor`` on the account ids taken
        from the analyzer's ``final_table`` and ``likely_salary_earner``.

        Raises:
            ValueError: If no data has been loaded, or if report generation is
                needed but the required flag columns are missing.
        """
        self._require_data()
        logger.info("Starting model training")
        # Resolve the report analyzer and the account lists *before* creating
        # the predictor, so a failure here cannot leave a never-trained
        # predictor exposed on ``self.salary_predictor``.
        if self.salary_earner_analyzer is None:
            logger.info("Salary earner analyzer not initialized. Generating reports first.")
            self.generate_salary_earner_reports()
        analyzer = self.salary_earner_analyzer
        consistent_accounts = analyzer.final_table['accountid'].unique()
        inconsistent_accounts = analyzer.likely_salary_earner['accountid'].unique()
        self.salary_predictor = SalaryPredictor(self.df)
        self.salary_predictor.train_and_evaluate(consistent_accounts, inconsistent_accounts)

    def run_full_pipeline(self, source='db', file_path=None) -> bool:
        """Run the complete salary analytics pipeline.

        Args:
            source: Forwarded to ``load_data``.
            file_path: Forwarded to ``load_data``.

        Returns:
            ``True`` when every step finished, ``False`` when loading failed
            or any later step raised.
        """
        logger.info("Starting full pipeline execution")
        if not self.load_data(source=source, file_path=file_path):
            logger.error("Failed to load data. Exiting pipeline.")
            return False

        steps = (
            ("Running keyword analysis...", self.run_keyword_analysis),
            ("Running consistent amount analysis...", self.run_consistent_amount_analysis),
            ("Running transaction type analysis...", self.run_transaction_type_analysis),
            ("Generating salary earner reports...", self.generate_salary_earner_reports),
            ("Training salary prediction models...", self.train_salary_prediction_models),
        )
        try:
            for announcement, step in steps:
                logger.info(announcement)
                step()
        except Exception as e:
            # Top-level boundary: report failure through the return value, but
            # keep the traceback in the log so the failing step can be found.
            logger.exception("Pipeline failed: %s", e)
            return False
        logger.info("Pipeline completed successfully!")
        return True
def main():
    """Run the full salary analytics pipeline with its default settings.

    Returns:
        int: ``0`` when ``run_full_pipeline()`` reports success, ``1``
        otherwise, so the value can be used directly as a process exit status.
    """
    # Without a handler the INFO-level progress messages are dropped when the
    # module is run as a script.  basicConfig does nothing if the root logger
    # has already been configured by the embedding application.
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    pipeline = SalaryAnalyticsPipeline()
    return 0 if pipeline.run_full_pipeline() else 1


if __name__ == "__main__":
    # Propagate pipeline failure to the shell instead of always exiting 0.
    raise SystemExit(main())