8acfb436f3
- Added `/load-data` endpoint to load transaction data from either a database or a CSV file. - Updated `SalaryAnalyticsPipeline` and `DataLoader` to support loading from CSV. - Implemented data validation and error handling for loading processes. - Revised README to include new data loading instructions and workflow steps. - Added checks to ensure data is loaded before running analysis endpoints.
154 lines
6.5 KiB
Python
154 lines
6.5 KiB
Python
"""
Main module for running the salary analytics pipeline.
"""
|
|
|
|
import logging
|
|
from .data_loader import DataLoader
|
|
from .keyword_analyzer import KeywordAnalyzer
|
|
from .consistent_amount_analyzer import ConsistentAmountAnalyzer
|
|
from .transaction_type_analyzer import TransactionTypeAnalyzer
|
|
from .salary_earner_analyzer import SalaryEarnerAnalyzer
|
|
from .salary_predictor import SalaryPredictor
|
|
|
|
# Module-level logger, named after this module, used by the pipeline code below.
logger = logging.getLogger(__name__)
|
|
|
|
class SalaryAnalyticsPipeline:
    """Orchestrate the salary analytics stages over one transaction dataset.

    The loaded data lives in ``self.df``. Each analysis component has its own
    attribute, which stays ``None`` until the method that runs that stage
    creates it.
    """

    def __init__(self):
        """Create an empty pipeline; no data is loaded and no stage has run."""
        logger.info("Initializing SalaryAnalyticsPipeline")
        self.data_loader = None
        self.df = None
        self.keyword_analyzer = None
        self.consistent_amount_analyzer = None
        self.transaction_type_analyzer = None
        self.salary_earner_analyzer = None
        self.salary_predictor = None

    def _require_data(self):
        """Log and raise ``ValueError`` when ``self.df`` has not been loaded."""
        if self.df is None:
            message = "Data not loaded. Call load_data() first."
            logger.error(message)
            raise ValueError(message)

    def load_data(self, source='db', file_path=None):
        """Load the transaction data into ``self.df``.

        Args:
            source: Forwarded unchanged to ``DataLoader.load_data``
                (defaults to ``'db'``).
            file_path: Forwarded unchanged to ``DataLoader.load_data``
                (defaults to ``None``).

        Returns:
            bool: ``True`` when the loader returned something other than
            ``None``, ``False`` otherwise.
        """
        logger.info("Starting data loading process")
        self.data_loader = DataLoader()
        # NOTE(review): analyzers created from an earlier load are not reset
        # here, and train_salary_prediction_models() reuses an existing
        # salary_earner_analyzer. Confirm whether a reload should invalidate
        # them.
        self.df = self.data_loader.load_data(source=source, file_path=file_path)

        loaded = self.df is not None
        if loaded:
            logger.info(f"Successfully loaded data with {len(self.df)} rows")
        else:
            logger.error("Failed to load data")
        return loaded

    def run_keyword_analysis(self):
        """Run the keyword-based salary transaction analysis.

        Adds an ``is_salary_related`` column to ``self.df`` marking the rows
        whose index label appears in the analyzer's result.

        Returns:
            The object returned by ``KeywordAnalyzer.get_salary_related_data``.

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()

        logger.info("Starting keyword analysis")
        self.keyword_analyzer = KeywordAnalyzer(self.df)
        self.keyword_analyzer.identify_salary_transactions()
        keyword_data = self.keyword_analyzer.get_salary_related_data()

        # Record the result on the main DataFrame for the later stages.
        self.df['is_salary_related'] = self.df.index.isin(keyword_data.index)
        return keyword_data

    def run_consistent_amount_analysis(self):
        """Run the consistent-amount transaction analysis.

        Adds an ``is_consistent_amount`` column to ``self.df`` marking the
        rows whose index label appears in the analyzer's result.

        Returns:
            The object returned by
            ``ConsistentAmountAnalyzer.get_consistent_amount_data``.

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()

        logger.info("Starting consistent amount analysis")
        self.consistent_amount_analyzer = ConsistentAmountAnalyzer(self.df)
        self.consistent_amount_analyzer.identify_consistent_amount_accounts()
        consistent_data = self.consistent_amount_analyzer.get_consistent_amount_data()

        # Record the result on the main DataFrame for the later stages.
        self.df['is_consistent_amount'] = self.df.index.isin(consistent_data.index)
        return consistent_data

    def run_transaction_type_analysis(self):
        """Run the transaction type analysis.

        Adds an ``is_salary_type`` column to ``self.df`` marking the rows
        whose index label appears in the analyzer's result.

        Returns:
            The object returned by
            ``TransactionTypeAnalyzer.get_salary_type_data``.

        Raises:
            ValueError: If no data has been loaded.
        """
        self._require_data()

        logger.info("Starting transaction type analysis")
        self.transaction_type_analyzer = TransactionTypeAnalyzer(self.df)
        self.transaction_type_analyzer.flag_salary_type_transactions()
        type_data = self.transaction_type_analyzer.get_salary_type_data()

        # Record the result on the main DataFrame for the later stages.
        self.df['is_salary_type'] = self.df.index.isin(type_data.index)
        return type_data

    def generate_salary_earner_reports(self):
        """Generate the salary earner reports from the flagged DataFrame.

        Returns:
            The object returned by ``SalaryEarnerAnalyzer.generate_reports``.

        Raises:
            ValueError: If no data has been loaded, or if any of the
                ``is_salary_related``, ``is_consistent_amount`` or
                ``is_salary_type`` columns is missing from ``self.df``.
        """
        self._require_data()

        # The three analysis stages each add one of these flag columns.
        required_columns = ['is_salary_related', 'is_consistent_amount', 'is_salary_type']
        missing_columns = [col for col in required_columns if col not in self.df.columns]
        if missing_columns:
            logger.error(f"Missing required columns: {missing_columns}")
            raise ValueError(f"Missing required columns: {missing_columns}. Run all analyses first.")

        logger.info("Starting salary earner report generation")
        self.salary_earner_analyzer = SalaryEarnerAnalyzer(self.df)
        return self.salary_earner_analyzer.generate_reports()

    def train_salary_prediction_models(self):
        """Train the salary prediction models.

        If no salary earner analyzer exists yet, the reports are generated
        first. The unique ``accountid`` values of the analyzer's
        ``final_table`` and ``likely_salary_earner`` tables are then passed
        to ``SalaryPredictor.train_and_evaluate``.

        Raises:
            ValueError: If no data has been loaded, or if report generation
                is triggered here and its required columns are missing.
        """
        self._require_data()

        logger.info("Starting model training")
        self.salary_predictor = SalaryPredictor(self.df)

        # The account lists come from the salary earner analyzer.
        if self.salary_earner_analyzer is None:
            logger.info("Salary earner analyzer not initialized. Generating reports first.")
            self.generate_salary_earner_reports()

        consistent_accounts = self.salary_earner_analyzer.final_table['accountid'].unique()
        inconsistent_accounts = self.salary_earner_analyzer.likely_salary_earner['accountid'].unique()

        self.salary_predictor.train_and_evaluate(consistent_accounts, inconsistent_accounts)

    def run_full_pipeline(self, source='db', file_path=None):
        """Run every pipeline stage in order.

        Args:
            source: Forwarded to ``load_data`` (defaults to ``'db'``).
            file_path: Forwarded to ``load_data`` (defaults to ``None``).

        Returns:
            bool: ``True`` when loading and all stages complete, ``False``
            when loading fails or any stage raises.
        """
        logger.info("Starting full pipeline execution")
        if not self.load_data(source=source, file_path=file_path):
            logger.error("Failed to load data. Exiting pipeline.")
            return False

        # Stages in execution order, each with the message logged before it.
        steps = (
            ("Running keyword analysis...", self.run_keyword_analysis),
            ("Running consistent amount analysis...", self.run_consistent_amount_analysis),
            ("Running transaction type analysis...", self.run_transaction_type_analysis),
            ("Generating salary earner reports...", self.generate_salary_earner_reports),
            ("Training salary prediction models...", self.train_salary_prediction_models),
        )

        try:
            for announcement, step in steps:
                logger.info(announcement)
                step()
        except Exception as e:
            # Top-level boundary: report failure through the return value,
            # but keep the traceback in the log so the failing stage can be
            # found.
            logger.exception(f"Pipeline failed: {e}")
            return False

        logger.info("Pipeline completed successfully!")
        return True
|
|
|
|
def main():
    """Run the full salary analytics pipeline with its default arguments.

    Returns:
        bool: The value returned by
        ``SalaryAnalyticsPipeline.run_full_pipeline`` (``True`` on success,
        ``False`` on failure), so callers can react to a failed run.
    """
    pipeline = SalaryAnalyticsPipeline()
    return pipeline.run_full_pipeline()


if __name__ == "__main__":
    # Exit non-zero on a failed run instead of always reporting success.
    raise SystemExit(0 if main() else 1)