[add]: code refractoring and cleanup

This commit is contained in:
VivianDee
2025-09-07 23:07:40 +01:00
parent 2cc3d70f4f
commit 6de9583aaf
25 changed files with 86 additions and 60 deletions
+26
View File
@@ -0,0 +1,26 @@
from flask import Flask
import os
from .extensions import db, migrate
"""
Salary Analytics Package
A package for analyzing and predicting salary patterns from transaction data.
"""
__version__ = "0.1.0"
def create_app():
app = Flask(__name__)
app.config.from_object('salary_analytics.config')
# Initialize extensions
db.init_app(app)
migrate.init_app(app, db)
# Register blueprints or CLI commands here if needed
from .commands import commands
app.cli.add_command(commands.upload_xls_cli)
return app
+44
View File
@@ -0,0 +1,44 @@
from django.conf import settings
import httpx
import json
from app.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS
from app.utils.logger import logger
class SimbrellaIntegration:
BASE_URL = SIMBRELLA_BASE_URL
ENDPOINT_RAC_CHECKS = SIMBRELLA_ENDPOINT_RAC_CHECKS
@staticmethod
def rac_check(customer_id, account_id, transaction_id):
"""
Calls the RACCheck endpoit
"""
url = f"{SimbrellaIntegration.BASE_URL}/{SimbrellaIntegration.ENDPOINT_RAC_CHECKS}"
logger.info(f"Contacting Rack Checks EndPoint: {str(url)}", exc_info=True)
payload = {
"customerId": customer_id,
"accountId": account_id,
"transactionId": str(transaction_id),
"fbnTransactionId": str(transaction_id),
"countryCode": "NG",
"channel": "USSD"
}
headers = {
"Content-Type": "application/json",
"x-api-key": f"{settings.VALID_API_KEY}",
"App-Id": f"{settings.VALID_APP_ID}",
}
try:
response = httpx.post(url, json=payload, headers=headers, timeout=10.0)
logger.info(f"This is Response: {str(response)}", exc_info=True)
return response
except Exception as e:
logger.error(f"RACCheck API call failed: {str(e)}", exc_info=True)
raise Exception(f"RACCheck API call failed: {str(e)}")
@@ -0,0 +1,36 @@
import time
import threading
import requests
from ...config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload
from app.utils.logger import logger
class SalaryDetect:
def __init__(self):
self._running = False
self._thread = None
def _run(self):
while self._running:
logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Detecting salary...")
try:
payload = get_random_salary_payload()
response = requests.post(SALARY_DETECT_URL, headers=SALARY_DETECT_HEADERS, json=payload)
logger.info(f"POST {SALARY_DETECT_URL} status: {response.status_code}, response: {response.text}")
except Exception as e:
logger.error(f"Error during POST: {e}")
logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Salary detection complete")
time.sleep(120)
def start(self):
if not self._running:
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if self._thread:
self._thread.join()
+24
View File
@@ -0,0 +1,24 @@
from .main import SalaryAnalyticsPipeline
from .data_loader import DataLoader
from .keyword_analyzer import KeywordAnalyzer
from .consistent_amount_analyzer import ConsistentAmountAnalyzer
from .transaction_type_analyzer import TransactionTypeAnalyzer
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .salary_predictor import SalaryPredictor
"""
Salary Analytics Package
A package for analyzing and predicting salary patterns from transaction data.
"""
__version__ = "0.1.0"
__all__ = [
"SalaryAnalyticsPipeline",
"DataLoader",
"KeywordAnalyzer",
"ConsistentAmountAnalyzer",
"TransactionTypeAnalyzer",
"SalaryEarnerAnalyzer",
"SalaryPredictor"
]
@@ -0,0 +1,64 @@
"""
Consistent amount transaction analysis module.
"""
import pandas as pd
from .config import MODEL_CONFIG
class ConsistentAmountAnalyzer:
def __init__(self, df):
self.df = df
self.const_df = None
def calculate_coefficient_of_variation(self, group):
"""Calculate coefficient of variation for a group of transactions."""
amounts = group[group['initiated_by'] == 'C']['amount']
mean = amounts.mean()
std = amounts.std(ddof=0)
if mean == 0:
return float('nan')
return std / mean
def flag_consistent_amounts(self, group, cv_threshold=None):
"""Flag accounts with low variance in transaction amounts."""
if cv_threshold is None:
cv_threshold = MODEL_CONFIG['cv_threshold']
filtered_group = group[group['initiated_by'] == 'C']
cv = self.calculate_coefficient_of_variation(filtered_group)
is_consistent = cv <= cv_threshold if not pd.isna(cv) else False
return pd.Series(
[is_consistent] * len(group),
index=group.index,
name='is_consistent_amount'
)
def identify_consistent_amount_accounts(self, cv_threshold=None):
"""Identify accounts with consistent transaction amounts."""
if cv_threshold is None:
cv_threshold = MODEL_CONFIG['cv_threshold']
# Create a copy of the original DataFrame
self.const_df = self.df.copy()
# Calculate consistent amount flags
consistent_flags = self.const_df.groupby('accountid').apply(
lambda group: self.flag_consistent_amounts(group, cv_threshold)
).reset_index(level=0, drop=True)
# Add the flags to the original DataFrame
self.const_df['is_consistent_amount'] = consistent_flags
return self.const_df
def get_consistent_amount_data(self):
"""Get transactions identified as having consistent amounts."""
if self.const_df is None:
self.identify_consistent_amount_accounts()
return self.const_df[
(self.const_df['is_consistent_amount']) &
(self.const_df['initiated_by'] == 'C')
]
+169
View File
@@ -0,0 +1,169 @@
"""
Data loading and preprocessing module.
"""
from sqlalchemy import create_engine, text
import pandas as pd
from datetime import datetime
import logging
import os
from .config import DB_CONFIG, TABLE_NAME
from app.utils.logger import logger
class DataLoader:
def __init__(self):
self.engine = None
self.df = None
self.chunk_size = 10000 # Load 10,000 rows at a time
def connect(self):
"""Establish database connection."""
try:
logger.info("Attempting to connect to database...")
DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
self.engine = create_engine(DATABASE_URL)
with self.engine.connect() as conn:
# First check if table exists
check_table = text(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{TABLE_NAME}')")
table_exists = conn.execute(check_table).scalar()
if not table_exists:
logger.error(f"Table {TABLE_NAME} does not exist in the database")
return False
# Get row count
count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
row_count = conn.execute(count_query).scalar()
logger.info(f"Table {TABLE_NAME} exists with {row_count} rows")
# Get version
result = conn.execute(text("SELECT version();"))
logger.info("Connected successfully to database!")
return True
except Exception as e:
logger.error(f"Error connecting to database: {str(e)}")
return False
def load_from_csv(self, file_path):
"""Load data from a CSV file."""
try:
logger.info(f"Loading data from CSV file: {file_path}")
if not os.path.exists(file_path):
logger.error(f"CSV file not found: {file_path}")
return None
# Load data in chunks
chunks = []
for chunk in pd.read_csv(file_path, chunksize=self.chunk_size):
# Preprocess chunk
chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
# Rename columns if needed
if 'd1' in chunk.columns:
chunk = chunk.rename(columns={
'd1': 'trx_type',
'd2': 'trx_subtype',
'd3': 'initiated_by',
'd4': 'customer_id'
})
chunk = chunk.dropna()
chunks.append(chunk)
# Combine all chunks
self.df = pd.concat(chunks, ignore_index=True)
logger.info(f"Successfully loaded {len(self.df)} rows from CSV")
# Basic data validation
logger.info("Performing data validation...")
logger.info(f"Columns in dataset: {self.df.columns.tolist()}")
logger.info(f"Data types:\n{self.df.dtypes}")
logger.info(f"Missing values:\n{self.df.isnull().sum()}")
return self.df
except Exception as e:
logger.error(f"Error loading data from CSV: {str(e)}")
return None
def load_from_db(self):
"""Load and preprocess transaction data from database in chunks."""
if not self.engine:
logger.info("No database connection. Attempting to connect...")
if not self.connect():
logger.error("Failed to establish database connection")
return None
try:
logger.info(f"Loading data from table: {TABLE_NAME}")
# First get total count
with self.engine.connect() as conn:
count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
total_rows = conn.execute(count_query).scalar()
logger.info(f"Total rows to process: {total_rows}")
# Load data in chunks
chunks = []
offset = 0
while True:
logger.info(f"Loading chunk starting at offset {offset}")
query = f"SELECT * FROM {TABLE_NAME} LIMIT {self.chunk_size} OFFSET {offset}"
chunk = pd.read_sql(query, self.engine)
if chunk.empty:
break
# Preprocess chunk
chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
# Rename columns
chunk = chunk.rename(columns={
'd1': 'trx_type',
'd2': 'trx_subtype',
'd3': 'initiated_by',
'd4': 'customer_id'
})
chunk = chunk.dropna()
chunks.append(chunk)
offset += self.chunk_size
if offset >= total_rows:
break
# Combine all chunks
self.df = pd.concat(chunks, ignore_index=True)
logger.info(f"Successfully loaded {len(self.df)} rows of data")
# Basic data validation
logger.info("Performing data validation...")
logger.info(f"Columns in dataset: {self.df.columns.tolist()}")
logger.info(f"Data types:\n{self.df.dtypes}")
logger.info(f"Missing values:\n{self.df.isnull().sum()}")
return self.df
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
return None
def load_data(self, source='db', file_path=None):
"""Load data from either database or CSV file."""
if source == 'db':
return self.load_from_db()
elif source == 'csv':
if not file_path:
logger.error("File path must be provided when loading from CSV")
return None
return self.load_from_csv(file_path)
else:
logger.error(f"Invalid source: {source}. Must be 'db' or 'csv'")
return None
def get_data(self):
"""Get the loaded DataFrame."""
if self.df is None:
logger.warning("No data loaded. Call load_data() first.")
return self.df
@@ -0,0 +1,47 @@
"""
Keyword-based salary transaction analysis module.
"""
import re
import pandas as pd
from .config import SALARY_KEYWORDS
class KeywordAnalyzer:
def __init__(self, df):
self.df = df
self.desc_df = None
def identify_salary_transactions(self):
"""
Identifies potential salary-related transactions based on keywords
and month-year patterns in the 'description' column.
"""
month_year_patterns = [
r"\b(?:JAN|FEB|MAR|APR|MAY|JUN|JUL|AUG|SEP|OCT|NOV|DEC)\s?\d{2,4}\b",
r"\b(?:JANUARY|FEBRUARY|MARCH|APRIL|MAY|JUNE|JULY|AUGUST|SEPTEMBER|OCTOBER|NOVEMBER|DECEMBER)\s?\d{2,4}\b"
]
escaped_keywords = [re.escape(keyword.lower()) for keyword in SALARY_KEYWORDS]
combined_pattern = (
r'\b(?:' + '|'.join(escaped_keywords) + r')\b|' +
'|'.join(month_year_patterns)
)
self.df['is_salary_related'] = self.df['description'].str.lower().str.contains(
combined_pattern,
na=False,
regex=True
)
self.desc_df = self.df.copy()
return self.df
def get_salary_related_data(self):
"""Get transactions identified as salary-related."""
if self.desc_df is None:
self.identify_salary_transactions()
return self.desc_df[
(self.desc_df['is_salary_related'] == True) &
(self.desc_df['initiated_by'] == 'C')
]
+153
View File
@@ -0,0 +1,153 @@
"""
Main module for running the salary analytics pipeline.
"""
import logging
from .data_loader import DataLoader
from .keyword_analyzer import KeywordAnalyzer
from .consistent_amount_analyzer import ConsistentAmountAnalyzer
from .transaction_type_analyzer import TransactionTypeAnalyzer
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .salary_predictor import SalaryPredictor
from app.utils.logger import logger
class SalaryAnalyticsPipeline:
def __init__(self):
logger.info("Initializing SalaryAnalyticsPipeline")
self.data_loader = None
self.df = None
self.keyword_analyzer = None
self.consistent_amount_analyzer = None
self.transaction_type_analyzer = None
self.salary_earner_analyzer = None
self.salary_predictor = None
def load_data(self, source='db', file_path=None):
"""Load and preprocess the transaction data."""
logger.info("Starting data loading process")
self.data_loader = DataLoader()
self.df = self.data_loader.load_data(source=source, file_path=file_path)
if self.df is not None:
logger.info(f"Successfully loaded data with {len(self.df)} rows")
else:
logger.error("Failed to load data")
return self.df is not None
def run_keyword_analysis(self):
"""Run keyword-based salary transaction analysis."""
if self.df is None:
logger.error("Data not loaded. Call load_data() first.")
raise ValueError("Data not loaded. Call load_data() first.")
logger.info("Starting keyword analysis")
self.keyword_analyzer = KeywordAnalyzer(self.df)
self.keyword_analyzer.identify_salary_transactions()
keyword_data = self.keyword_analyzer.get_salary_related_data()
# Update main DataFrame with keyword analysis results
self.df['is_salary_related'] = self.df.index.isin(keyword_data.index)
return keyword_data
def run_consistent_amount_analysis(self):
"""Run consistent amount transaction analysis."""
if self.df is None:
logger.error("Data not loaded. Call load_data() first.")
raise ValueError("Data not loaded. Call load_data() first.")
logger.info("Starting consistent amount analysis")
self.consistent_amount_analyzer = ConsistentAmountAnalyzer(self.df)
self.consistent_amount_analyzer.identify_consistent_amount_accounts()
consistent_data = self.consistent_amount_analyzer.get_consistent_amount_data()
# Update main DataFrame with consistent amount analysis results
self.df['is_consistent_amount'] = self.df.index.isin(consistent_data.index)
return consistent_data
def run_transaction_type_analysis(self):
"""Run transaction type analysis."""
if self.df is None:
logger.error("Data not loaded. Call load_data() first.")
raise ValueError("Data not loaded. Call load_data() first.")
logger.info("Starting transaction type analysis")
self.transaction_type_analyzer = TransactionTypeAnalyzer(self.df)
self.transaction_type_analyzer.flag_salary_type_transactions()
type_data = self.transaction_type_analyzer.get_salary_type_data()
# Update main DataFrame with transaction type analysis results
self.df['is_salary_type'] = self.df.index.isin(type_data.index)
return type_data
def generate_salary_earner_reports(self):
"""Generate salary earner reports."""
if self.df is None:
logger.error("Data not loaded. Call load_data() first.")
raise ValueError("Data not loaded. Call load_data() first.")
# Ensure all analysis flags are present
required_columns = ['is_salary_related', 'is_consistent_amount', 'is_salary_type']
missing_columns = [col for col in required_columns if col not in self.df.columns]
if missing_columns:
logger.error(f"Missing required columns: {missing_columns}")
raise ValueError(f"Missing required columns: {missing_columns}. Run all analyses first.")
logger.info("Starting salary earner report generation")
self.salary_earner_analyzer = SalaryEarnerAnalyzer(self.df)
return self.salary_earner_analyzer.generate_reports()
def train_salary_prediction_models(self):
"""Train salary prediction models."""
if self.df is None:
logger.error("Data not loaded. Call load_data() first.")
raise ValueError("Data not loaded. Call load_data() first.")
logger.info("Starting model training")
self.salary_predictor = SalaryPredictor(self.df)
# Get accounts from the salary earner analyzer
if self.salary_earner_analyzer is None:
logger.info("Salary earner analyzer not initialized. Generating reports first.")
self.generate_salary_earner_reports()
consistent_accounts = self.salary_earner_analyzer.final_table['accountid'].unique()
inconsistent_accounts = self.salary_earner_analyzer.likely_salary_earner['accountid'].unique()
self.salary_predictor.train_and_evaluate(consistent_accounts, inconsistent_accounts)
def run_full_pipeline(self, source='db', file_path=None):
"""Run the complete salary analytics pipeline."""
logger.info("Starting full pipeline execution")
if not self.load_data(source=source, file_path=file_path):
logger.error("Failed to load data. Exiting pipeline.")
return False
try:
logger.info("Running keyword analysis...")
self.run_keyword_analysis()
logger.info("Running consistent amount analysis...")
self.run_consistent_amount_analysis()
logger.info("Running transaction type analysis...")
self.run_transaction_type_analysis()
logger.info("Generating salary earner reports...")
self.generate_salary_earner_reports()
logger.info("Training salary prediction models...")
self.train_salary_prediction_models()
logger.info("Pipeline completed successfully!")
return True
except Exception as e:
logger.error(f"Pipeline failed: {str(e)}")
return False
def main():
"""Main function to run the salary analytics pipeline."""
pipeline = SalaryAnalyticsPipeline()
pipeline.run_full_pipeline()
if __name__ == "__main__":
main()
@@ -0,0 +1,169 @@
"""
Salary earner analysis and report generation module.
"""
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib_venn import venn3
from datetime import datetime, timedelta
from .config import MODEL_CONFIG, OUTPUT_PATHS
from app.utils.logger import logger
class SalaryEarnerAnalyzer:
def __init__(self, df):
self.df = df
self.final_table = None
self.likely_salary_earner = None
self.high_earner_details = None
def filter_venn_section(self, **kwargs):
"""Filter accounts based on specified combinations of hypothesis flags."""
valid_columns = {'is_salary_related', 'is_consistent_amount', 'is_salary_type'}
df1 = self.df[self.df['initiated_by'] == 'C'].copy()
invalid_keys = set(kwargs.keys()) - valid_columns
if invalid_keys:
raise ValueError(f"Invalid keys: {invalid_keys}. Valid keys are {valid_columns}.")
condition = pd.Series([True] * len(df1), index=df1.index)
for key, value in kwargs.items():
condition &= (df1[key] == value)
filtered_df = df1[condition]
# Drop any rows with NaN values in critical columns
critical_cols = ['accountid', 'trx_start_date', 'amount']
filtered_df = filtered_df.dropna(subset=critical_cols)
return filtered_df
def plot_hypothesis_overlap(self, hypothesis1_df, hypothesis3_df, hypothesis4_df, account_col='accountid'):
"""Plot and save Venn diagram showing overlap between hypotheses."""
set2 = set(hypothesis3_df[account_col][hypothesis3_df['is_consistent_amount']])
set3 = set(hypothesis1_df[account_col][hypothesis1_df['is_salary_related']])
set4 = set(hypothesis4_df[account_col][hypothesis4_df['is_salary_type']])
plt.figure(figsize=(10, 10))
venn3([set2, set3, set4], set_labels=('Consistent Amount',
'Salary Description', 'Transaction Type'))
plt.title('Overlap Between Hypotheses')
plt.savefig(OUTPUT_PATHS['hypothesis_overlap_plot'])
plt.close()
def generate_salary_earners_table(self, all_three_hypotheses):
"""Generate a table of salary earners with their metrics."""
results = []
for accountid, group in all_three_hypotheses.groupby('accountid'):
# Skip if group is empty
if group.empty:
continue
# Calculate required metrics
num_months = len(group)
# Handle last 6 months calculation
last_6_months = group[group['trx_start_date'] >= (datetime.now() - timedelta(days=180))]
if last_6_months.empty:
least_inflow = 0
else:
least_inflow = last_6_months['amount'].min()
# Handle average salary calculation
if group['amount'].notna().any():
avg_salary = group['amount'].mean()
else:
avg_salary = 0
# Calculate days_since_last_trx with NaN handling
group['days_since_last_trx'] = group['trx_start_date'].diff().dt.days
median_interval = group['days_since_last_trx'].median()
if pd.isna(median_interval):
median_interval = 30 # Default to 30 days if no interval data
last_date = group['trx_start_date'].max()
next_date = last_date + timedelta(days=median_interval)
next_amount = avg_salary
# Boolean flags with NaN handling
days_since_last = (datetime.now() - last_date).days
has_45d = days_since_last <= 45
has_2m = len(group[group['trx_start_date'] >= (datetime.now() - timedelta(days=60))]) >= 2
results.append({
'accountid': accountid,
'num_months': num_months,
'least_inflow_6m': least_inflow,
'avg_monthly_salary': avg_salary,
'estimated_next_amount': next_amount,
'estimated_next_date': next_date,
'45daysalary': has_45d,
'2monthssalary': has_2m
})
final_df = pd.DataFrame(results)
# Drop rows where all numeric columns are NaN
numeric_cols = ['num_months', 'least_inflow_6m', 'avg_monthly_salary', 'estimated_next_amount']
final_df = final_df.dropna(subset=numeric_cols, how='all')
return final_df
def analyze_salary_earners(self, final_df):
"""Analyze salary earners and identify high earners."""
high_earners = final_df[final_df['estimated_next_amount'] >= MODEL_CONFIG['high_earner_threshold']].copy()
high_earner_details = high_earners[['accountid', 'least_inflow_6m']].reset_index(drop=True)
count_high = len(high_earners)
return high_earner_details, count_high
def generate_reports(self):
"""Generate all salary earner reports."""
# Get accounts flagged by all three hypotheses
all_three_hypotheses = self.filter_venn_section(
is_salary_related=True,
is_consistent_amount=True,
is_salary_type=True
)
# Generate final table
self.final_table = self.generate_salary_earners_table(all_three_hypotheses)
logger.info(f"Found {self.final_table['accountid'].nunique()} verified salary earners")
# Generate likely salary earner table
green_section = self.filter_venn_section(
is_salary_related=True,
is_consistent_amount=False,
is_salary_type=True
)
yellow_section = self.filter_venn_section(
is_salary_related=False,
is_consistent_amount=True,
is_salary_type=True
)
self.likely_salary_earner = pd.concat([yellow_section, green_section])
self.likely_salary_earner = self.likely_salary_earner.drop_duplicates(subset=['id'])
self.likely_salary_earner = self.generate_salary_earners_table(self.likely_salary_earner)
logger.info(f"Found {self.likely_salary_earner['accountid'].nunique()} likely salary earners")
# Analyze high earners
self.high_earner_details, total_high_earners = self.analyze_salary_earners(self.final_table)
logger.info(f"\nTotal High Earners: {total_high_earners}")
# Plot hypothesis overlap
self.plot_hypothesis_overlap(
self.df[self.df['is_salary_related']],
self.df[self.df['is_consistent_amount']],
self.df[self.df['is_salary_type']]
)
# Save reports
self.high_earner_details.to_csv(OUTPUT_PATHS['high_earner_details'], index=False)
self.likely_salary_earner.to_csv(OUTPUT_PATHS['likely_salary_earner'], index=False)
self.final_table.to_csv(OUTPUT_PATHS['final_table'], index=False)
return {
'final_table': self.final_table,
'likely_salary_earner': self.likely_salary_earner,
'high_earner_details': self.high_earner_details,
'total_high_earners': total_high_earners
}
+171
View File
@@ -0,0 +1,171 @@
"""
Salary prediction module using machine learning.
"""
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from joblib import dump
from .config import OUTPUT_PATHS
class SalaryPredictor:
def __init__(self, df):
self.df = df
self.model_cons = None
self.model_incons = None
self.scaler_cons = None
self.scaler_incons = None
def add_feature_engineering(self, df):
"""Engineer features for salary prediction."""
df['month'] = df['trx_start_date'].dt.month
df['month_seq'] = df.groupby(['accountid', 'month']).ngroup() + 1
# Categorical encoding
encoder = OneHotEncoder(sparse_output=False)
encoded_trx_type = encoder.fit_transform(df[['trx_type']])
encoded_df = pd.DataFrame(encoded_trx_type, columns=encoder.get_feature_names_out(['trx_type']))
df = pd.concat([df, encoded_df], axis=1)
# Rolling statistics
df = df.sort_values(['accountid', 'trx_start_date'])
df['rolling_sum_3m'] = df.groupby('accountid')['amount'].rolling(window=3,
min_periods=1).sum().reset_index(0, drop=True)
df['rolling_avg_3m'] = df.groupby('accountid')['amount'].rolling(window=3,
min_periods=1).mean().reset_index(0, drop=True)
return df
def prepare_data(self, df_transactions, accounts):
"""Prepare data for training and testing."""
df_filtered = df_transactions[df_transactions['accountid'].isin(accounts)].copy()
print(f"Filtered data for {len(accounts)} accounts.")
print(f"Total transactions: {len(df_filtered)}")
# Drop unnecessary columns
df_filtered = df_filtered.drop(['description', 'id', 'customer_id',
'trx_end_date', 'is_salary_related',
'is_consistent_amount', 'is_salary_type'], axis=1)
# Add feature engineering
df_filtered = self.add_feature_engineering(df_filtered)
# Aggregate monthly data
agg_funcs = {
'amount': 'mean',
'rolling_sum_3m': 'last',
'rolling_avg_3m': 'last',
'month': 'first'
}
encoded_cols = [col for col in df_filtered.columns if col.startswith('trx_type_')]
for col in encoded_cols:
agg_funcs[col] = 'sum'
monthly_data = df_filtered.groupby(['accountid', 'month_seq']).agg(agg_funcs).reset_index()
# Filter accounts with at least 12 months
account_month_counts = monthly_data.groupby('accountid')['month_seq'].max()
valid_accounts = account_month_counts[account_month_counts >= 12].index
monthly_data = monthly_data[monthly_data['accountid'].isin(valid_accounts)]
# Create sequences
X_train, y_train, X_test, y_test = [], [], [], []
feature_cols = ['accountid', 'amount', 'rolling_sum_3m', 'rolling_avg_3m',
'month'] + encoded_cols
for account in valid_accounts:
account_data = monthly_data[monthly_data['accountid'] == account].sort_values('month_seq')
if len(account_data) >= 12:
for t in range(5, 8):
X_train.append(account_data.iloc[t-5:t][feature_cols].values.flatten())
y_train.append(account_data['amount'].iloc[t])
for t in range(8, 12):
X_test.append(account_data.iloc[t-5:t][feature_cols].values.flatten())
y_test.append(account_data['amount'].iloc[t])
else:
print(f"Skipping account {account} due to insufficient data (less than 12 months).")
return np.array(X_train), np.array(y_train), np.array(X_test), np.array(y_test)
def train_model(self, X_train, y_train, X_test, y_test):
"""Train and evaluate a Random Forest model."""
# Scale features
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)
# Train model
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(X_train_scaled, y_train)
# Evaluate
y_pred = model.predict(X_test_scaled)
mae = mean_absolute_error(y_test, y_pred)
rmse = np.sqrt(mean_squared_error(y_test, y_pred))
r2 = r2_score(y_test, y_pred)
print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}, R-squared: {r2:.2f}")
return model, scaler
def plot_predictions(self, y_test, y_pred, title, output_path):
"""Plot actual vs predicted values and save to file."""
plt.figure(figsize=(10, 5))
plt.scatter(y_test, y_pred, alpha=0.5)
plt.xlabel("Actual Salary")
plt.ylabel("Predicted Salary")
plt.title(title)
plt.plot([min(y_test), max(y_test)], [min(y_test), max(y_test)], 'r--')
plt.savefig(output_path)
plt.close()
def train_and_evaluate(self, consistent_accounts, inconsistent_accounts):
"""Train and evaluate models for both consistent and inconsistent salary earners."""
# Train model for consistent salary earners
X_train_cons, y_train_cons, X_test_cons, y_test_cons = self.prepare_data(self.df, consistent_accounts)
if len(X_train_cons) > 0:
self.model_cons, self.scaler_cons = self.train_model(X_train_cons, y_train_cons, X_test_cons, y_test_cons)
print("Model trained for consistent salary earners.")
# Save model and scaler
dump(self.model_cons, OUTPUT_PATHS['consistent_model'])
dump(self.scaler_cons, OUTPUT_PATHS['consistent_scaler'])
print("Saved consistent salary earner model and scaler.")
# Plot predictions
X_test_cons_scaled = self.scaler_cons.transform(X_test_cons)
y_pred = self.model_cons.predict(X_test_cons_scaled)
self.plot_predictions(
y_test_cons,
y_pred,
"Actual vs. Predicted Salary (Consistent Earners)",
OUTPUT_PATHS['consistent_earners_plot']
)
else:
print("No accounts with sufficient data for consistent salary earners.")
# Train model for inconsistent salary earners
X_train_incons, y_train_incons, X_test_incons, y_test_incons = self.prepare_data(self.df, inconsistent_accounts)
if len(X_train_incons) > 0:
print("\nTraining model for inconsistent salary earners...")
self.model_incons, self.scaler_incons = self.train_model(X_train_incons, y_train_incons, X_test_incons, y_test_incons)
# Save model and scaler
dump(self.model_incons, OUTPUT_PATHS['inconsistent_model'])
dump(self.scaler_incons, OUTPUT_PATHS['inconsistent_scaler'])
print("Saved inconsistent salary earner model and scaler.")
# Plot predictions
X_test_incons_scaled = self.scaler_incons.transform(X_test_incons)
y_pred = self.model_incons.predict(X_test_incons_scaled)
self.plot_predictions(
y_test_incons,
y_pred,
"Actual vs. Predicted Salary (Inconsistent Earners)",
OUTPUT_PATHS['inconsistent_earners_plot']
)
else:
print("No accounts with sufficient data for inconsistent salary earners.")
@@ -0,0 +1,43 @@
"""
Transaction type analysis module.
"""
import pandas as pd
from .config import MODEL_CONFIG
class TransactionTypeAnalyzer:
def __init__(self, df):
self.df = df
self.trx_df = None
def flag_salary_type_transactions(self):
"""Flag transactions that match salary criteria based on type and subtype."""
self.df['is_salary_type'] = (
((self.df['trx_type'] == 'T') | (self.df['trx_type'] == 'C')) &
((self.df['trx_subtype'] == 'BI') | (self.df['trx_subtype'] == 'I') |
(self.df['trx_subtype'] == 'BS') | (self.df['trx_subtype'] == 'CI')) &
(self.df['initiated_by'] == 'C') &
(self.df['amount'] > 0)
)
self.trx_df = self.df.copy()
return self.df
def is_salary_earner_by_type(self, group, min_transactions=None, threshold=None):
"""Determine if an account likely belongs to a salary earner."""
if min_transactions is None:
min_transactions = MODEL_CONFIG['min_transactions']
if threshold is None:
threshold = MODEL_CONFIG['threshold']
if len(group) < min_transactions:
return False
valid_ratio = group['is_salary_type'].mean()
return valid_ratio >= threshold
def get_salary_type_data(self):
"""Get transactions identified as salary type."""
if self.trx_df is None:
self.flag_salary_type_transactions()
return self.trx_df[self.trx_df['is_salary_type']]
+600
View File
@@ -0,0 +1,600 @@
"""
FastAPI application for salary analytics.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, List, Union
import os
import socket
import logging
import pandas as pd
import tempfile
from datetime import datetime
from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData
import numpy as np
import warnings
import time
from .analytics.services.main import SalaryAnalyticsPipeline
from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE
from .data_loader import DataLoader
from .salary_predictor import SalaryPredictor
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .db_operations import DatabaseOperations
from .analytics.integrations.salary_detect import SalaryDetect
from app.utils.logger import logger
# Suppress warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
pd.options.mode.chained_assignment = None
app = FastAPI(
title="Salary Analytics API",
description="API for analyzing and predicting salary patterns from transaction data",
version="1.0.0"
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
# Global pipeline instance
pipeline = SalaryAnalyticsPipeline()
# Global variables to store loaded data and models
data_loader = None
df = None
salary_predictor = None
salary_earner_analyzer = None
salary_detect = SalaryDetect()
class AnalysisResponse(BaseModel):
"""Response model for analysis endpoints."""
message: str
data: Optional[Dict] = None
file_path: Optional[str] = None
class BatchResponse(BaseModel):
"""Response model for batch processing."""
batch_number: int
total_batches: int
processed_rows: int
results_path: str
message: str
def check_data_loaded():
"""Check if data is loaded before running analytics."""
if pipeline.df is None:
raise HTTPException(
status_code=400,
detail="No data loaded. Please load data first using the /load-data endpoint."
)
@app.on_event("startup")
async def startup_event():
"""Initialize the pipeline on startup."""
try:
logger.info("Initializing pipeline...")
# Start autonomous salary detection loop
salary_detect.start()
logger.info("Started autonomous salary detection loop.")
# Print network information
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
logger.info(f"Server running on hostname: {hostname}")
logger.info(f"Server IP address: {ip_address}")
logger.info(f"Server is accessible at:")
logger.info(f"- http://localhost:8000")
logger.info(f"- http://127.0.0.1:8000")
logger.info(f"- http://{ip_address}:8000")
logger.info("Pipeline initialized successfully")
except Exception as e:
logger.error(f"Error during startup: {str(e)}")
raise
@app.get("/")
async def root():
"""Root endpoint."""
start_time = time.time()
logger.info("Root endpoint accessed")
response = {"message": "Welcome to Salary Analytics API"}
logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
return response
@app.get("/health")
async def health_check():
"""Health check endpoint."""
start_time = time.time()
logger.info("Health check endpoint accessed")
response = {"status": "healthy"}
logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
return response
@app.post("/analyze/keyword", response_model=AnalysisResponse)
async def analyze_keyword():
"""Run keyword-based salary transaction analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting keyword analysis...")
data = pipeline.run_keyword_analysis()
logger.info(f"Keyword analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Keyword analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in keyword analysis: {str(e)}")
logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount():
"""Run consistent amount transaction analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting consistent amount analysis...")
data = pipeline.run_consistent_amount_analysis()
logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Consistent amount analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in consistent amount analysis: {str(e)}")
logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/analyze/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type():
"""Run transaction type analysis."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting transaction type analysis...")
data = pipeline.run_transaction_type_analysis()
logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
response = AnalysisResponse(
message="Transaction type analysis completed successfully",
data={"count": len(data)}
)
logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in transaction type analysis: {str(e)}")
logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks):
"""Generate salary earner reports."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting report generation...")
reports = pipeline.generate_salary_earner_reports()
logger.info("Reports generated successfully")
response = AnalysisResponse(
message="Reports generated successfully",
data={
"verified_salary_earners": len(reports['final_table']),
"likely_salary_earners": len(reports['likely_salary_earner']),
"high_earners": reports['total_high_earners']
}
)
logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in report generation: {str(e)}")
logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/train/models", response_model=AnalysisResponse)
async def train_models():
"""Train salary prediction models."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting model training...")
pipeline.train_salary_prediction_models()
logger.info("Models trained successfully")
response = AnalysisResponse(
message="Models trained successfully"
)
logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in model training: {str(e)}")
logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/download/{report_type}")
async def download_report(report_type: str):
"""Download generated reports."""
start_time = time.time()
try:
check_data_loaded()
logger.info(f"Attempting to download report: {report_type}")
file_paths = {
"high_earners": OUTPUT_PATHS["high_earner_details"],
"likely_earners": OUTPUT_PATHS["likely_salary_earner"],
"final_table": OUTPUT_PATHS["final_table"],
"consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
"inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
"hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
}
if report_type not in file_paths:
logger.error(f"Report type not found: {report_type}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report type not found")
file_path = file_paths[report_type]
if not os.path.exists(file_path):
logger.error(f"Report file not found: {file_path}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=404, detail="Report file not found")
logger.info(f"Successfully found report file: {file_path}")
response = FileResponse(
path=file_path,
filename=os.path.basename(file_path),
media_type="application/octet-stream"
)
logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error downloading report: {str(e)}")
logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline():
"""Run the complete salary analytics pipeline."""
start_time = time.time()
try:
check_data_loaded()
logger.info("Starting full pipeline...")
success = pipeline.run_full_pipeline()
if not success:
logger.error("Pipeline failed")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Pipeline failed")
logger.info("Pipeline completed successfully")
response = AnalysisResponse(
message="Pipeline completed successfully"
)
logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error in pipeline: {str(e)}")
logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/load-data")
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
"""
Load data from either database or CSV file.
Args:
source (str): Source of data ('db' or 'csv')
file (UploadFile, optional): CSV file to load (required if source is 'csv')
Returns:
dict: Status of data loading
"""
start_time = time.time()
try:
if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
if source == 'csv':
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
success = pipeline.load_data(source='csv', file_path=temp_file_path)
finally:
# Clean up temporary file
os.unlink(temp_file_path)
else:
success = pipeline.load_data(source='db')
if not success:
logger.error("Failed to load data")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to load data")
response = {
"status": "success",
"message": f"Successfully loaded {len(pipeline.df)} rows of data",
"columns": pipeline.df.columns.tolist(),
"row_count": len(pipeline.df)
}
logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
return response
except Exception as e:
logger.error(f"Error loading data: {str(e)}")
logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
"""Dependency to handle file upload only when source is csv."""
if source == 'csv' and not file:
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
return file
@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(
source: str = "db",
batch_size: int = 10000,
file: Optional[Union[UploadFile, str]] = File(None)
):
"""
Run the complete salary analytics pipeline in batches.
Args:
source (str): Source of data ('db' or 'csv')
batch_size (int): Number of rows to process in each batch
file (UploadFile, optional): CSV file to load (required if source is 'csv')
Returns:
List[BatchResponse]: List of responses for each batch processed
"""
start_time = time.time()
try:
if source not in ['db', 'csv']:
logger.error(f"Invalid source: {source}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
if source == 'csv' and not file:
logger.error("No file provided for CSV source")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
# Initialize data loader
data_loader = DataLoader()
data_loader.chunk_size = batch_size
# Create output directory for batch results
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
os.makedirs(batch_output_dir, exist_ok=True)
# Initialize database operations
if not data_loader.connect():
logger.error("Failed to connect to database")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to connect to database")
db_ops = DatabaseOperations(data_loader.engine)
if not db_ops.create_batch_results_table():
logger.error("Failed to create batch results table")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail="Failed to create batch results table")
responses = []
batch_number = 0
batch_start_time = time.time()
def preprocess_chunk(chunk):
"""Preprocess a chunk of data with the same logic as DataLoader."""
# Convert dates
chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
# Rename columns
chunk = chunk.rename(columns={
'd1': 'trx_type',
'd2': 'trx_subtype',
'd3': 'initiated_by',
'd4': 'customer_id'
})
chunk = chunk.dropna()
return chunk
if source == 'csv':
# Save uploaded file temporarily
with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
content = await file.read()
temp_file.write(content)
temp_file_path = temp_file.name
try:
# Process CSV in chunks
for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
batch_number += 1
logger.info(f"Processing batch {batch_number}")
# Preprocess chunk
chunk = preprocess_chunk(chunk)
# Run pipeline on chunk
pipeline = SalaryAnalyticsPipeline()
pipeline.df = chunk
try:
batch_start_time = time.time()
# Run analyses
pipeline.run_keyword_analysis()
pipeline.run_consistent_amount_analysis()
pipeline.run_transaction_type_analysis()
# Generate reports
reports = pipeline.generate_salary_earner_reports()
# Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = -1 # Unknown for CSV
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=-1, # Unknown for CSV
processed_rows=len(chunk),
results_path=batch_results_path,
message=f"Successfully processed batch {batch_number}"
))
except Exception as e:
error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=-1,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=-1,
processed_rows=len(chunk),
results_path="",
message=f"Error processing batch {batch_number}: {error_message}"
))
finally:
# Clean up temporary file
os.unlink(temp_file_path)
else:
# Process database in chunks
if not data_loader.connect():
raise HTTPException(status_code=500, detail="Failed to connect to database")
# Get total row count
with data_loader.engine.connect() as conn:
count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
total_rows = conn.execute(count_query).scalar()
total_batches = (total_rows + batch_size - 1) // batch_size
offset = 0
while offset < total_rows:
batch_number += 1
logger.info(f"Processing batch {batch_number} of {total_batches}")
# Load chunk from database
query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
chunk = pd.read_sql(query, data_loader.engine)
if chunk.empty:
break
# Preprocess chunk
chunk = preprocess_chunk(chunk)
# Run pipeline on chunk
pipeline = SalaryAnalyticsPipeline()
pipeline.df = chunk
try:
batch_start_time = time.time()
# Run analyses
pipeline.run_keyword_analysis()
pipeline.run_consistent_amount_analysis()
pipeline.run_transaction_type_analysis()
# Generate reports
reports = pipeline.generate_salary_earner_reports()
# Add batch metadata to results
results_df = reports['final_table'].copy()
results_df['batch_number'] = batch_number
results_df['total_batches'] = total_batches
results_df['processed_at'] = datetime.now()
# Save batch results to CSV
batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
results_df.to_csv(batch_results_path, index=False)
# Save to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=results_df,
status="success"
)
logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=total_batches,
processed_rows=len(chunk),
results_path=batch_results_path,
message=f"Successfully processed batch {batch_number} of {total_batches}"
))
except Exception as e:
error_message = str(e)
logger.error(f"Error processing batch {batch_number}: {error_message}")
# Save error to database
db_ops.save_batch_to_db(
batch_number=batch_number,
total_batches=total_batches,
results_df=pd.DataFrame(), # Empty DataFrame for error case
status="error"
)
responses.append(BatchResponse(
batch_number=batch_number,
total_batches=total_batches,
processed_rows=len(chunk),
results_path="",
message=f"Error processing batch {batch_number}: {error_message}"
))
offset += batch_size
logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
return responses
except Exception as e:
logger.error(f"Error in streaming pipeline: {str(e)}")
logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
raise HTTPException(status_code=500, detail=str(e))
+70
View File
@@ -0,0 +1,70 @@
import click
import pandas as pd
from datetime import datetime
from flask.cli import with_appcontext
from app.extensions import db
from app.models import RawTransaction
@click.group()
def commands():
"""Management commands for the salary analytics application."""
pass
@commands.command('upload-xls')
@click.argument('xls_path')
@with_appcontext
def upload_xls_cli(xls_path):
"""Uploads data from an XLS file to the analytics_raw_transactions table.
Args:
xls_path (str): The path to the XLS file.
"""
print(f"Attempting to upload data from {xls_path}...")
try:
df = pd.read_excel(xls_path, dtype=str)
# Convert date columns to datetime
date_cols = ["ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE"]
for col in date_cols:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
df[col] = df[col].fillna(pd.Timestamp.now())
# Convert numeric columns
for col in ["TRAN_AMT", "BALANCE"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col].str.replace(",", ""), errors='coerce')
# Truncate TRAN_PARTICULAR to 100 chars
if "TRAN_PARTICULAR" in df.columns:
df["TRAN_PARTICULAR"] = df["TRAN_PARTICULAR"].astype(str).str.slice(0, 100)
# Prepare data for insertion into the database
records = []
for index, row in df.iterrows():
record = RawTransaction(
cust_id=row.get('CUST_ID'),
accountid=row.get('ACCOUNTID'),
tran_id=row.get('TRAN_ID'),
entry_date=row.get('ENTRY_DATE'),
value_date=row.get('VALUE_DATE'),
pstd_date=row.get('PSTD_DATE'),
tran_date=row.get('TRAN_DATE'),
tran_sub_ty=row.get('TRAN_SUB_TY'),
part_tran_ty=row.get('PART_TRAN_TY'),
channel=row.get('CHANNEL'),
tran_amt=row.get('TRAN_AMT'),
balance=row.get('BALANCE'),
isreverse=row.get('ISREVERSE'),
reverse=row.get('REVERSE'),
tran_particular=(row.get('TRAN_PARTICULAR') or '')[:100]
)
records.append(record)
db.session.add_all(records)
db.session.commit()
print(f"Successfully uploaded {len(records)} records to analytics_raw_transactions")
except Exception as e:
db.session.rollback()
print(f"Error uploading data: {str(e)}")
+100
View File
@@ -0,0 +1,100 @@
"""
Configuration settings for the salary analytics package.
"""
import os
from dotenv import load_dotenv
import random
# Load environment variables
load_dotenv()
# Base directories
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
OUTPUT_DIR = os.path.join(BASE_DIR, "output")
PLOTS_DIR = os.path.join(OUTPUT_DIR, "plots")
CSV_DIR = os.path.join(OUTPUT_DIR, "csv")
MODEL_DIR = os.path.join(OUTPUT_DIR, "models")
# Create directories if they don't exist
os.makedirs(OUTPUT_DIR, exist_ok=True)
os.makedirs(PLOTS_DIR, exist_ok=True)
os.makedirs(CSV_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
# Database Configuration
DB_CONFIG = {
"user": os.getenv("DB_USER"), # Default value as fallback
"password": os.getenv("DB_PASSWORD"),
"name": os.getenv("DB_NAME"),
"port": os.getenv("DB_PORT"),
"host": os.getenv("DB_HOST")
}
# SQLAlchemy Configuration
SQLALCHEMY_DATABASE_URI = (
f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Table Configuration
TABLE_NAME = "customer_account_transaction_hx"
BATCH_RESULTS_TABLE = "salary_analytics_batch_results"
# Salary Keywords
SALARY_KEYWORDS = [
"salary", "payroll", "income", "wage", "wages",
"earnings", "earning", "monthly pay", "net pay", "gross pay", "compensation",
"monthlypay", "netpay", "grosspay",
"remuneration", "stipend", "allowance", "bonus", "commission",
"pension", "retirement", "dividend", "benefits", "reimbursement",
"overtime", "incentive", "paycheck", "paycheque", "salary advance",
"monthly income", "income tax refund", "employer deposit",
"payroll deposit", "salary credit", "income credit", "salary transfer",
"income transfer", "salary received", "income received", "hr deposit",
"company deposit", "employer payment", "employee payment",
"sal",
]
# Model Configuration
MODEL_CONFIG = {
"cv_threshold": 0.10,
"min_transactions": 3,
"threshold": 0.7,
"high_earner_threshold": 10000
}
# File Paths
OUTPUT_PATHS = {
"high_earner_details": os.path.join(CSV_DIR, "high_earner_details.csv"),
"likely_salary_earner": os.path.join(CSV_DIR, "likely_salary_earner.csv"),
"final_table": os.path.join(CSV_DIR, "final_table.csv"),
"consistent_earners_plot": os.path.join(PLOTS_DIR, "consistent_earners_predictions.png"),
"inconsistent_earners_plot": os.path.join(PLOTS_DIR, "inconsistent_earners_predictions.png"),
"hypothesis_overlap_plot": os.path.join(PLOTS_DIR, "hypothesis_overlap.png"),
"consistent_model": os.path.join(MODEL_DIR, "consistent_model.joblib"),
"inconsistent_model": os.path.join(MODEL_DIR, "inconsistent_model.joblib"),
"consistent_scaler": os.path.join(MODEL_DIR, "consistent_scaler.joblib"),
"inconsistent_scaler": os.path.join(MODEL_DIR, "inconsistent_scaler.joblib")
}
SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337")
SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS","api/rac-check")
# Salary Detect Endpoint Config
SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect"
SALARY_DETECT_HEADERS = {
"accept": "/",
"Content-Type": "application/json"
}
SALARY_DETECT_PAYLOADS = [
{"salaryDate": "2022-01-01", "customerId": "CN621868", "accountId": "2017821799", "salaryAmount": 200000},
{"customerId": "CUC2268333011", "accountId": "ACC8116931898", "salaryDate": "2025-07-08", "salaryAmount": 200000},
{"customerId": "CUC2163677018", "accountId": "ACC8118539484", "salaryDate": "2025-07-08", "salaryAmount": 200000},
{"customerId": "CUC1968062010", "accountId": "ACC8115473093", "salaryDate": "2025-07-08", "salaryAmount": 200000},
{"customerId": "CUC1302360013", "accountId": "ACC8117628489", "salaryDate": "2025-07-08", "salaryAmount": 200000}
]
def get_random_salary_payload():
return [random.choice(SALARY_DETECT_PAYLOADS)]
+135
View File
@@ -0,0 +1,135 @@
"""
Database operations module for salary analytics.
"""
from sqlalchemy import text
from .config import BATCH_RESULTS_TABLE
from datetime import datetime
from app.utils.logger import logger
class DatabaseOperations:
def __init__(self, engine):
"""Initialize with SQLAlchemy engine."""
self.engine = engine
def create_batch_results_table(self):
"""Create the batch results table if it doesn't exist."""
try:
with self.engine.connect() as conn:
# Check if table exists
check_table = text(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{BATCH_RESULTS_TABLE}')")
table_exists = conn.execute(check_table).scalar()
if not table_exists:
# Create table
create_table = text(f"""
CREATE TABLE {BATCH_RESULTS_TABLE} (
id SERIAL PRIMARY KEY,
batch_number INTEGER,
total_batches INTEGER,
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
accountid TEXT,
num_months INTEGER,
least_inflow_6m DECIMAL,
avg_monthly_salary DECIMAL,
estimated_next_amount DECIMAL,
estimated_next_date DATE,
is_45day_salary BOOLEAN,
is_2months_salary BOOLEAN,
status TEXT
)
""")
conn.execute(create_table)
conn.commit()
logger.info(f"Created table {BATCH_RESULTS_TABLE}")
return True
except Exception as e:
logger.error(f"Error creating batch results table: {str(e)}")
return False
def save_batch_to_db(self, batch_number, total_batches, results_df, status="success"):
"""Save batch processing results to database."""
try:
with self.engine.connect() as conn:
# Add batch metadata to the DataFrame
results_df['batch_number'] = batch_number
results_df['total_batches'] = total_batches
results_df['processed_at'] = datetime.now()
# Convert DataFrame to list of dictionaries
records = results_df.to_dict('records')
# Insert each record
for record in records:
insert_query = text(f"""
INSERT INTO {BATCH_RESULTS_TABLE}
(batch_number, total_batches, processed_at, accountid, num_months,
least_inflow_6m, avg_monthly_salary, estimated_next_amount,
estimated_next_date, is_45day_salary, is_2months_salary, status)
VALUES
(:batch_number, :total_batches, :processed_at, :accountid, :num_months,
:least_inflow_6m, :avg_monthly_salary, :estimated_next_amount,
:estimated_next_date, :is_45day_salary, :is_2months_salary, :status)
""")
# Convert boolean columns to proper format
record['is_45day_salary'] = record.get('45daysalary', False)
record['is_2months_salary'] = record.get('2monthssalary', False)
# Add status
record['status'] = status
conn.execute(insert_query, record)
conn.commit()
logger.info(f"Successfully saved batch {batch_number} results to database")
return True
except Exception as e:
logger.error(f"Error saving batch {batch_number} to database: {str(e)}")
return False
def get_batch_status(self, batch_number):
"""Get the status of a specific batch."""
try:
with self.engine.connect() as conn:
query = text(f"""
SELECT
batch_number,
total_batches,
processed_at,
COUNT(*) as total_records,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
FROM {BATCH_RESULTS_TABLE}
WHERE batch_number = :batch_number
GROUP BY batch_number, total_batches, processed_at
ORDER BY processed_at DESC
LIMIT 1
""")
result = conn.execute(query, {"batch_number": batch_number}).fetchone()
return dict(result) if result else None
except Exception as e:
logger.error(f"Error getting batch {batch_number} status: {str(e)}")
return None
def get_all_batches(self):
"""Get all batch processing results."""
try:
with self.engine.connect() as conn:
query = text(f"""
SELECT
batch_number,
total_batches,
processed_at,
COUNT(*) as total_records,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as successful_records,
SUM(CASE WHEN status = 'error' THEN 1 ELSE 0 END) as failed_records
FROM {BATCH_RESULTS_TABLE}
GROUP BY batch_number, total_batches, processed_at
ORDER BY batch_number
""")
results = conn.execute(query).fetchall()
return [dict(row) for row in results]
except Exception as e:
logger.error(f"Error getting all batches: {str(e)}")
return []
+5
View File
@@ -0,0 +1,5 @@
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy()
migrate = Migrate()
+24
View File
@@ -0,0 +1,24 @@
from app.extensions import db
class RawTransaction(db.Model):
__tablename__ = 'analytics_raw_transactions'
id = db.Column(db.Integer, primary_key=True)
cust_id = db.Column(db.String(10))
accountid = db.Column(db.String(10))
tran_id = db.Column(db.String(12))
entry_date = db.Column(db.TIMESTAMP, nullable=True)
value_date = db.Column(db.TIMESTAMP, nullable=True)
pstd_date = db.Column(db.TIMESTAMP, nullable=True)
tran_date = db.Column(db.TIMESTAMP, nullable=True)
tran_sub_ty = db.Column(db.String(4))
part_tran_ty = db.Column(db.String(4))
channel = db.Column(db.String(32))
tran_amt = db.Column(db.Numeric(20, 2))
balance = db.Column(db.Numeric(20, 2))
isreverse = db.Column(db.String(4))
reverse = db.Column(db.String(4))
tran_particular = db.Column(db.String(100))
def __repr__(self):
return f'<RawTransaction {self.tran_id}>'
+13
View File
@@ -0,0 +1,13 @@
import logging
# Configure logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
handlers=[
# logging.StreamHandler(),
logging.FileHandler("app.log", mode='a') # Log to file
]
)
logger = logging.getLogger("DetectionService")