Merge branch 'development' of DigiFi/digifi-Analytics into master

This commit is contained in:
2025-08-22 11:15:41 +00:00
committed by Gogs
5 changed files with 121 additions and 1 deletions
+7
View File
@@ -0,0 +1,7 @@
- Connect to transcation data source
- Analyze transition data
- Detect salary
- Inform event of salary
- SAFETY - report transaction import gaps
- SAFETY - report database connections
- SAFETY - report when event cannot be reached
+7
View File
@@ -23,6 +23,7 @@ from .data_loader import DataLoader
from .salary_predictor import SalaryPredictor
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .db_operations import DatabaseOperations
from .salary_detect import SalaryDetect
# Configure logging
logging.basicConfig(
@@ -59,6 +60,8 @@ df = None
salary_predictor = None
salary_earner_analyzer = None
salary_detect = SalaryDetect()
class AnalysisResponse(BaseModel):
"""Response model for analysis endpoints."""
message: str
@@ -87,6 +90,10 @@ async def startup_event():
try:
logger.info("Initializing pipeline...")
# Start autonomous salary detection loop
salary_detect.start()
logger.info("Started autonomous salary detection loop.")
# Print network information
hostname = socket.gethostname()
ip_address = socket.gethostbyname(hostname)
+22 -1
View File
@@ -4,6 +4,7 @@ Configuration settings for the salary analytics package.
import os
from dotenv import load_dotenv
import random
# Load environment variables
load_dotenv()
@@ -76,4 +77,24 @@ OUTPUT_PATHS = {
"inconsistent_model": os.path.join(MODEL_DIR, "inconsistent_model.joblib"),
"consistent_scaler": os.path.join(MODEL_DIR, "consistent_scaler.joblib"),
"inconsistent_scaler": os.path.join(MODEL_DIR, "inconsistent_scaler.joblib")
}
}
SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337")
SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS","api/rac-check")
# Salary Detect Endpoint Config
SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect"
SALARY_DETECT_HEADERS = {
"accept": "/",
"Content-Type": "application/json"
}
SALARY_DETECT_PAYLOADS = [
{"salaryDate": "2022-01-01", "customerId": "CN621868", "accountId": "2017821799", "salaryAmount": 200000},
{"customerId": "CUC2268333011", "accountId": "ACC8116931898", "salaryDate": "2025-07-08", "salaryAmount": 200000},
{"customerId": "CUC2163677018", "accountId": "ACC8118539484", "salaryDate": "2025-07-08", "salaryAmount": 200000},
{"customerId": "CUC1968062010", "accountId": "ACC8115473093", "salaryDate": "2025-07-08", "salaryAmount": 200000},
{"customerId": "CUC1302360013", "accountId": "ACC8117628489", "salaryDate": "2025-07-08", "salaryAmount": 200000}
]
def get_random_salary_payload():
return [random.choice(SALARY_DETECT_PAYLOADS)]
+46
View File
@@ -0,0 +1,46 @@
from django.conf import settings
import httpx
import json
from salary_analytics.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS
import logging
logger = logging.getLogger(__name__)
class SimbrellaIntegration:
BASE_URL = SIMBRELLA_BASE_URL
ENDPOINT_RAC_CHECKS = SIMBRELLA_ENDPOINT_RAC_CHECKS
@staticmethod
def rac_check(customer_id, account_id, transaction_id):
"""
Calls the RACCheck endpoit
"""
url = f"{SimbrellaIntegration.BASE_URL}/{SimbrellaIntegration.ENDPOINT_RAC_CHECKS}"
logger.info(f"Contacting Rack Checks EndPoint: {str(url)}", exc_info=True)
payload = {
"customerId": customer_id,
"accountId": account_id,
"transactionId": str(transaction_id),
"fbnTransactionId": str(transaction_id),
"countryCode": "NG",
"channel": "USSD"
}
headers = {
"Content-Type": "application/json",
"x-api-key": f"{settings.VALID_API_KEY}",
"App-Id": f"{settings.VALID_APP_ID}",
}
try:
response = httpx.post(url, json=payload, headers=headers, timeout=10.0)
logger.info(f"This is Response: {str(response)}", exc_info=True)
return response
except Exception as e:
logger.error(f"RACCheck API call failed: {str(e)}", exc_info=True)
raise Exception(f"RACCheck API call failed: {str(e)}")
+39
View File
@@ -0,0 +1,39 @@
import time
import logging
import threading
import requests
from .config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class SalaryDetect:
def __init__(self):
self._running = False
self._thread = None
def _run(self):
while self._running:
logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Detecting salary...")
try:
payload = get_random_salary_payload()
response = requests.post(SALARY_DETECT_URL, headers=SALARY_DETECT_HEADERS, json=payload)
logger.info(f"POST {SALARY_DETECT_URL} status: {response.status_code}, response: {response.text}")
except Exception as e:
logger.error(f"Error during POST: {e}")
logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Salary detection complete")
time.sleep(120)
def start(self):
if not self._running:
self._running = True
self._thread = threading.Thread(target=self._run, daemon=True)
self._thread.start()
def stop(self):
self._running = False
if self._thread:
self._thread.join()