""" Salary prediction module using machine learning. """ import pandas as pd import numpy as np import matplotlib.pyplot as plt from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from joblib import dump from .config import OUTPUT_PATHS class SalaryPredictor: def __init__(self, df): self.df = df self.model_cons = None self.model_incons = None self.scaler_cons = None self.scaler_incons = None def add_feature_engineering(self, df): """Engineer features for salary prediction.""" df['month'] = df['trx_start_date'].dt.month df['month_seq'] = df.groupby(['accountid', 'month']).ngroup() + 1 # Categorical encoding encoder = OneHotEncoder(sparse_output=False) encoded_trx_type = encoder.fit_transform(df[['trx_type']]) encoded_df = pd.DataFrame(encoded_trx_type, columns=encoder.get_feature_names_out(['trx_type'])) df = pd.concat([df, encoded_df], axis=1) # Rolling statistics df = df.sort_values(['accountid', 'trx_start_date']) df['rolling_sum_3m'] = df.groupby('accountid')['amount'].rolling(window=3, min_periods=1).sum().reset_index(0, drop=True) df['rolling_avg_3m'] = df.groupby('accountid')['amount'].rolling(window=3, min_periods=1).mean().reset_index(0, drop=True) return df def prepare_data(self, df_transactions, accounts): """Prepare data for training and testing.""" df_filtered = df_transactions[df_transactions['accountid'].isin(accounts)].copy() print(f"Filtered data for {len(accounts)} accounts.") print(f"Total transactions: {len(df_filtered)}") # Drop unnecessary columns df_filtered = df_filtered.drop(['description', 'id', 'customer_id', 'trx_end_date', 'is_salary_related', 'is_consistent_amount', 'is_salary_type'], axis=1) # Add feature engineering df_filtered = self.add_feature_engineering(df_filtered) # Aggregate monthly data agg_funcs = { 'amount': 'mean', 'rolling_sum_3m': 'last', 'rolling_avg_3m': 'last', 'month': 'first' } encoded_cols = [col for col in df_filtered.columns if col.startswith('trx_type_')] for col in encoded_cols: agg_funcs[col] = 'sum' monthly_data = df_filtered.groupby(['accountid', 'month_seq']).agg(agg_funcs).reset_index() # Filter accounts with at least 12 months account_month_counts = monthly_data.groupby('accountid')['month_seq'].max() valid_accounts = account_month_counts[account_month_counts >= 12].index monthly_data = monthly_data[monthly_data['accountid'].isin(valid_accounts)] # Create sequences X_train, y_train, X_test, y_test = [], [], [], [] feature_cols = ['accountid', 'amount', 'rolling_sum_3m', 'rolling_avg_3m', 'month'] + encoded_cols for account in valid_accounts: account_data = monthly_data[monthly_data['accountid'] == account].sort_values('month_seq') if len(account_data) >= 12: for t in range(5, 8): X_train.append(account_data.iloc[t-5:t][feature_cols].values.flatten()) y_train.append(account_data['amount'].iloc[t]) for t in range(8, 12): X_test.append(account_data.iloc[t-5:t][feature_cols].values.flatten()) y_test.append(account_data['amount'].iloc[t]) else: print(f"Skipping account {account} due to insufficient data (less than 12 months).") return np.array(X_train), np.array(y_train), np.array(X_test), np.array(y_test) def train_model(self, X_train, y_train, X_test, y_test): """Train and evaluate a Random Forest model.""" # Scale features scaler = StandardScaler() X_train_scaled = scaler.fit_transform(X_train) X_test_scaled = scaler.transform(X_test) # Train model model = RandomForestRegressor(n_estimators=100, random_state=42) model.fit(X_train_scaled, y_train) # Evaluate y_pred = model.predict(X_test_scaled) mae = mean_absolute_error(y_test, y_pred) rmse = np.sqrt(mean_squared_error(y_test, y_pred)) r2 = r2_score(y_test, y_pred) print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}, R-squared: {r2:.2f}") return model, scaler def plot_predictions(self, y_test, y_pred, title, output_path): """Plot actual vs predicted values and save to file.""" plt.figure(figsize=(10, 5)) plt.scatter(y_test, y_pred, alpha=0.5) plt.xlabel("Actual Salary") plt.ylabel("Predicted Salary") plt.title(title) plt.plot([min(y_test), max(y_test)], [min(y_test), max(y_test)], 'r--') plt.savefig(output_path) plt.close() def train_and_evaluate(self, consistent_accounts, inconsistent_accounts): """Train and evaluate models for both consistent and inconsistent salary earners.""" # Train model for consistent salary earners X_train_cons, y_train_cons, X_test_cons, y_test_cons = self.prepare_data(self.df, consistent_accounts) if len(X_train_cons) > 0: self.model_cons, self.scaler_cons = self.train_model(X_train_cons, y_train_cons, X_test_cons, y_test_cons) print("Model trained for consistent salary earners.") # Save model and scaler dump(self.model_cons, OUTPUT_PATHS['consistent_model']) dump(self.scaler_cons, OUTPUT_PATHS['consistent_scaler']) print("Saved consistent salary earner model and scaler.") # Plot predictions X_test_cons_scaled = self.scaler_cons.transform(X_test_cons) y_pred = self.model_cons.predict(X_test_cons_scaled) self.plot_predictions( y_test_cons, y_pred, "Actual vs. Predicted Salary (Consistent Earners)", OUTPUT_PATHS['consistent_earners_plot'] ) else: print("No accounts with sufficient data for consistent salary earners.") # Train model for inconsistent salary earners X_train_incons, y_train_incons, X_test_incons, y_test_incons = self.prepare_data(self.df, inconsistent_accounts) if len(X_train_incons) > 0: print("\nTraining model for inconsistent salary earners...") self.model_incons, self.scaler_incons = self.train_model(X_train_incons, y_train_incons, X_test_incons, y_test_incons) # Save model and scaler dump(self.model_incons, OUTPUT_PATHS['inconsistent_model']) dump(self.scaler_incons, OUTPUT_PATHS['inconsistent_scaler']) print("Saved inconsistent salary earner model and scaler.") # Plot predictions X_test_incons_scaled = self.scaler_incons.transform(X_test_incons) y_pred = self.model_incons.predict(X_test_incons_scaled) self.plot_predictions( y_test_incons, y_pred, "Actual vs. Predicted Salary (Inconsistent Earners)", OUTPUT_PATHS['inconsistent_earners_plot'] ) else: print("No accounts with sufficient data for inconsistent salary earners.")