"""
Main module for running the salary analytics pipeline.
"""

import logging

from .data_loader import DataLoader
from .keyword_analyzer import KeywordAnalyzer
from .consistent_amount_analyzer import ConsistentAmountAnalyzer
from .transaction_type_analyzer import TransactionTypeAnalyzer
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .salary_predictor import SalaryPredictor

logger = logging.getLogger(__name__)


class SalaryAnalyticsPipeline:
    """Orchestrate the salary analytics stages over one shared data set.

    The stages are: load data, run the three flagging analyses (keyword,
    consistent amount, transaction type), generate salary earner reports,
    and train the salary prediction models. Each stage stores its helper
    object on the instance so later stages (and callers) can reach it.
    """

    # Flag columns written by the three analysis stages; the report stage
    # refuses to run until all of them are present on ``self.df``.
    _REQUIRED_FLAG_COLUMNS = (
        'is_salary_related',
        'is_consistent_amount',
        'is_salary_type',
    )

    def __init__(self) -> None:
        """Create an empty pipeline; nothing is loaded or analysed yet."""
        logger.info("Initializing SalaryAnalyticsPipeline")
        self.data_loader = None
        self.df = None
        self.keyword_analyzer = None
        self.consistent_amount_analyzer = None
        self.transaction_type_analyzer = None
        self.salary_earner_analyzer = None
        self.salary_predictor = None

    def _require_data(self) -> None:
        """Raise ``ValueError`` (after logging) if no data has been loaded."""
        if self.df is None:
            logger.error("Data not loaded. Call load_data() first.")
            raise ValueError("Data not loaded. Call load_data() first.")

    def load_data(self, source='db', file_path=None) -> bool:
        """Load and preprocess the transaction data.

        Args:
            source: Forwarded unchanged to ``DataLoader.load_data``.
            file_path: Forwarded unchanged to ``DataLoader.load_data``.

        Returns:
            ``True`` if the loader returned something other than ``None``
            (stored on ``self.df``), ``False`` otherwise.
        """
        logger.info("Starting data loading process")
        self.data_loader = DataLoader()
        self.df = self.data_loader.load_data(source=source, file_path=file_path)

        loaded = self.df is not None
        if loaded:
            logger.info("Successfully loaded data with %s rows", len(self.df))
        else:
            logger.error("Failed to load data")
        return loaded

    def run_keyword_analysis(self):
        """Run keyword-based salary transaction analysis.

        Adds a boolean ``is_salary_related`` column to ``self.df`` that is
        true for rows whose index appears in the analyzer's result.

        Returns:
            The value of ``KeywordAnalyzer.get_salary_related_data()``.

        Raises:
            ValueError: If data has not been loaded.
        """
        self._require_data()

        logger.info("Starting keyword analysis")
        self.keyword_analyzer = KeywordAnalyzer(self.df)
        self.keyword_analyzer.identify_salary_transactions()
        keyword_data = self.keyword_analyzer.get_salary_related_data()

        # Update main DataFrame with keyword analysis results
        self.df['is_salary_related'] = self.df.index.isin(keyword_data.index)
        return keyword_data

    def run_consistent_amount_analysis(self):
        """Run consistent amount transaction analysis.

        Adds a boolean ``is_consistent_amount`` column to ``self.df`` that
        is true for rows whose index appears in the analyzer's result.

        Returns:
            The value of
            ``ConsistentAmountAnalyzer.get_consistent_amount_data()``.

        Raises:
            ValueError: If data has not been loaded.
        """
        self._require_data()

        logger.info("Starting consistent amount analysis")
        self.consistent_amount_analyzer = ConsistentAmountAnalyzer(self.df)
        self.consistent_amount_analyzer.identify_consistent_amount_accounts()
        consistent_data = self.consistent_amount_analyzer.get_consistent_amount_data()

        # Update main DataFrame with consistent amount analysis results
        self.df['is_consistent_amount'] = self.df.index.isin(consistent_data.index)
        return consistent_data

    def run_transaction_type_analysis(self):
        """Run transaction type analysis.

        Adds a boolean ``is_salary_type`` column to ``self.df`` that is
        true for rows whose index appears in the analyzer's result.

        Returns:
            The value of ``TransactionTypeAnalyzer.get_salary_type_data()``.

        Raises:
            ValueError: If data has not been loaded.
        """
        self._require_data()

        logger.info("Starting transaction type analysis")
        self.transaction_type_analyzer = TransactionTypeAnalyzer(self.df)
        self.transaction_type_analyzer.flag_salary_type_transactions()
        type_data = self.transaction_type_analyzer.get_salary_type_data()

        # Update main DataFrame with transaction type analysis results
        self.df['is_salary_type'] = self.df.index.isin(type_data.index)
        return type_data

    def generate_salary_earner_reports(self):
        """Generate salary earner reports.

        Returns:
            The value of ``SalaryEarnerAnalyzer.generate_reports()``.

        Raises:
            ValueError: If data has not been loaded, or if any of the three
                analysis flag columns is missing from ``self.df``.
        """
        self._require_data()

        # Ensure all analysis flags are present
        missing_columns = [
            col for col in self._REQUIRED_FLAG_COLUMNS
            if col not in self.df.columns
        ]
        if missing_columns:
            logger.error("Missing required columns: %s", missing_columns)
            raise ValueError(
                f"Missing required columns: {missing_columns}. Run all analyses first."
            )

        logger.info("Starting salary earner report generation")
        self.salary_earner_analyzer = SalaryEarnerAnalyzer(self.df)
        return self.salary_earner_analyzer.generate_reports()

    def train_salary_prediction_models(self) -> None:
        """Train salary prediction models.

        If the salary earner reports have not been generated yet, they are
        generated first. The unique ``accountid`` values of the analyzer's
        ``final_table`` and ``likely_salary_earner`` tables are then passed
        to ``SalaryPredictor.train_and_evaluate`` as the consistent and
        inconsistent account sets respectively.

        Raises:
            ValueError: If data has not been loaded, or (when reports must
                be generated here) if the analysis flag columns are missing.
        """
        self._require_data()

        logger.info("Starting model training")
        self.salary_predictor = SalaryPredictor(self.df)

        # Get accounts from the salary earner analyzer
        if self.salary_earner_analyzer is None:
            logger.info("Salary earner analyzer not initialized. Generating reports first.")
            self.generate_salary_earner_reports()

        analyzer = self.salary_earner_analyzer
        consistent_accounts = analyzer.final_table['accountid'].unique()
        inconsistent_accounts = analyzer.likely_salary_earner['accountid'].unique()

        self.salary_predictor.train_and_evaluate(consistent_accounts, inconsistent_accounts)

    def run_full_pipeline(self, source='db', file_path=None) -> bool:
        """Run the complete salary analytics pipeline.

        Args:
            source: Forwarded to :meth:`load_data`.
            file_path: Forwarded to :meth:`load_data`.

        Returns:
            ``True`` if every stage completed; ``False`` if loading returned
            no data or any later stage raised an exception.
        """
        logger.info("Starting full pipeline execution")

        if not self.load_data(source=source, file_path=file_path):
            logger.error("Failed to load data. Exiting pipeline.")
            return False

        try:
            logger.info("Running keyword analysis...")
            self.run_keyword_analysis()

            logger.info("Running consistent amount analysis...")
            self.run_consistent_amount_analysis()

            logger.info("Running transaction type analysis...")
            self.run_transaction_type_analysis()

            logger.info("Generating salary earner reports...")
            self.generate_salary_earner_reports()

            logger.info("Training salary prediction models...")
            self.train_salary_prediction_models()
        except Exception as e:
            # Top-level boundary: report failure via the return value, but
            # keep the traceback in the log so the failing stage is findable.
            logger.exception("Pipeline failed: %s", e)
            return False

        logger.info("Pipeline completed successfully!")
        return True


def main() -> int:
    """Run the salary analytics pipeline with default settings.

    Returns:
        ``0`` if the pipeline succeeded, ``1`` otherwise, so the result can
        be used directly as a process exit status.
    """
    pipeline = SalaryAnalyticsPipeline()
    return 0 if pipeline.run_full_pipeline() else 1


if __name__ == "__main__":
    # Propagate pipeline failure to the shell instead of always exiting 0.
    raise SystemExit(main())