From 99e1b82ea8a8fafdb73f77a4c522d41ece8dda03 Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Sat, 5 Jul 2025 19:27:53 +0100 Subject: [PATCH 1/6] Add autonomous salary detection feature to API Integrated SalaryDetect class into the API and initiated an autonomous salary detection loop during the startup event. This enhancement improves the system's capability to monitor and analyze salary data in real-time. --- PROJECT.md | 7 +++++++ salary_analytics/api.py | 7 +++++++ salary_analytics/salary_detect.py | 32 +++++++++++++++++++++++++++++++ 3 files changed, 46 insertions(+) create mode 100644 PROJECT.md create mode 100644 salary_analytics/salary_detect.py diff --git a/PROJECT.md b/PROJECT.md new file mode 100644 index 0000000..b9cc3ad --- /dev/null +++ b/PROJECT.md @@ -0,0 +1,7 @@ +- Connect to transcation data source +- Analyze transition data +- Detect salary +- Inform event of salary +- SAFETY - report transaction import gaps +- SAFETY - report database connections +- SAFETY - report when event cannot be reached \ No newline at end of file diff --git a/salary_analytics/api.py b/salary_analytics/api.py index 8d9e054..cb41a30 100644 --- a/salary_analytics/api.py +++ b/salary_analytics/api.py @@ -23,6 +23,7 @@ from .data_loader import DataLoader from .salary_predictor import SalaryPredictor from .salary_earner_analyzer import SalaryEarnerAnalyzer from .db_operations import DatabaseOperations +from .salary_detect import SalaryDetect # Configure logging logging.basicConfig( @@ -59,6 +60,8 @@ df = None salary_predictor = None salary_earner_analyzer = None +salary_detect = SalaryDetect() + class AnalysisResponse(BaseModel): """Response model for analysis endpoints.""" message: str @@ -87,6 +90,10 @@ async def startup_event(): try: logger.info("Initializing pipeline...") + # Start autonomous salary detection loop + salary_detect.start() + logger.info("Started autonomous salary detection loop.") + # Print network information hostname = socket.gethostname() ip_address = socket.gethostbyname(hostname) diff --git a/salary_analytics/salary_detect.py b/salary_analytics/salary_detect.py new file mode 100644 index 0000000..1bd76fe --- /dev/null +++ b/salary_analytics/salary_detect.py @@ -0,0 +1,32 @@ +import time +import logging +import threading + +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +class SalaryDetect: + def __init__(self): + self._running = False + self._thread = None + + def _run(self): + while self._running: + logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Detecting salary...") + time.sleep(1) + logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Salary detection complete") + time.sleep(1) + + def start(self): + if not self._running: + self._running = True + self._thread = threading.Thread(target=self._run, daemon=True) + self._thread.start() + + def stop(self): + self._running = False + if self._thread: + self._thread.join() + + + From 332b743fa5977dc131229ba492289febb2343e8e Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Tue, 8 Jul 2025 16:43:37 +0100 Subject: [PATCH 2/6] Add salary detection configuration and payloads Updated config.py to include the salary detection endpoint, headers, and sample payloads. Enhanced salary_detect.py to utilize the new configuration for making POST requests to the endpoint with random salary data. Improved error handling during the request process. --- salary_analytics/config.py | 20 +++++++++++++++++++- salary_analytics/salary_detect.py | 11 +++++++++-- 2 files changed, 28 insertions(+), 3 deletions(-) diff --git a/salary_analytics/config.py b/salary_analytics/config.py index 1022fbd..8d0bdc6 100644 --- a/salary_analytics/config.py +++ b/salary_analytics/config.py @@ -4,6 +4,7 @@ Configuration settings for the salary analytics package. import os from dotenv import load_dotenv +import random # Load environment variables load_dotenv() @@ -76,4 +77,21 @@ OUTPUT_PATHS = { "inconsistent_model": os.path.join(MODEL_DIR, "inconsistent_model.joblib"), "consistent_scaler": os.path.join(MODEL_DIR, "consistent_scaler.joblib"), "inconsistent_scaler": os.path.join(MODEL_DIR, "inconsistent_scaler.joblib") -} \ No newline at end of file +} + +# Salary Detect Endpoint Config +SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect" +SALARY_DETECT_HEADERS = { + "accept": "/", + "Content-Type": "application/json" +} +SALARY_DETECT_PAYLOADS = [ + {"salaryDate": "2022-01-01", "customerId": "CN621868", "accountId": "2017821799", "salaryAmount": 200000}, + {"customerId": "CUC2268333011", "accountId": "ACC8116931898", "salaryDate": "2025-07-08", "salaryAmount": 200000}, + {"customerId": "CUC2163677018", "accountId": "ACC8118539484", "salaryDate": "2025-07-08", "salaryAmount": 200000}, + {"customerId": "CUC1968062010", "accountId": "ACC8115473093", "salaryDate": "2025-07-08", "salaryAmount": 200000}, + {"customerId": "CUC1302360013", "accountId": "ACC8117628489", "salaryDate": "2025-07-08", "salaryAmount": 200000} +] + +def get_random_salary_payload(): + return [random.choice(SALARY_DETECT_PAYLOADS)] \ No newline at end of file diff --git a/salary_analytics/salary_detect.py b/salary_analytics/salary_detect.py index 1bd76fe..0824ae2 100644 --- a/salary_analytics/salary_detect.py +++ b/salary_analytics/salary_detect.py @@ -1,6 +1,8 @@ import time import logging import threading +import requests +from .config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -13,9 +15,14 @@ class SalaryDetect: def _run(self): while self._running: logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Detecting salary...") - time.sleep(1) + try: + payload = get_random_salary_payload() + response = requests.post(SALARY_DETECT_URL, headers=SALARY_DETECT_HEADERS, json=payload) + logger.info(f"POST {SALARY_DETECT_URL} status: {response.status_code}, response: {response.text}") + except Exception as e: + logger.error(f"Error during POST: {e}") logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Salary detection complete") - time.sleep(1) + time.sleep(120) def start(self): if not self._running: From 5998db9c6845dfc9f849c31a1808257c85ebe6e9 Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Tue, 8 Jul 2025 16:50:42 +0100 Subject: [PATCH 3/6] added RAC check implementation --- salary_analytics/rac_check.py | 47 +++++++++++++++++++++++++++++++++++ 1 file changed, 47 insertions(+) create mode 100644 salary_analytics/rac_check.py diff --git a/salary_analytics/rac_check.py b/salary_analytics/rac_check.py new file mode 100644 index 0000000..90ec681 --- /dev/null +++ b/salary_analytics/rac_check.py @@ -0,0 +1,47 @@ +import json +from datetime import datetime, timezone +import logging + +logger = logging.getLogger(__name__) + +class RACCheckService: + def __init__(self): + self._rac_checks = [] # In-memory storage for demonstration + + def add_rac_check(self, customer_id, account_id, transaction_id, data=None): + rac_check = { + 'id': len(self._rac_checks) + 1, + 'transaction_id': transaction_id, + 'customer_id': customer_id, + 'account_id': account_id, + 'rac_response': json.dumps(data or {}), + 'created_at': datetime.now(timezone.utc), + 'updated_at': datetime.now(timezone.utc) + } + self._rac_checks.append(rac_check) + logger.info(f"Added RAC check: {rac_check}") + return rac_check + + def get_all_rac_checks(self): + logger.info(f"Retrieving all RAC checks. Count: {len(self._rac_checks)}") + return self._rac_checks if self._rac_checks else None + + def get_rac_check(self, customer_id, account_id): + for rac_check in self._rac_checks: + if rac_check['customer_id'] == customer_id and rac_check['account_id'] == account_id: + logger.info(f"Retrieved RAC check for customer_id={customer_id}, account_id={account_id}: {rac_check}") + return rac_check + logger.error(f"RAC Check for customer_id={customer_id}, account_id={account_id} not found") + raise ValueError("RAC Check for customer not found") + + def to_dict(self, rac_check): + logger.debug(f"Converting RAC check to dict: {rac_check}") + return { + "id": str(rac_check["id"]), + "transactionId": str(rac_check["transaction_id"]), + "customerId": rac_check["customer_id"], + "accountId": rac_check["account_id"], + "racResponse": json.loads(rac_check["rac_response"]), + "createdAt": rac_check["created_at"].isoformat(), + "updatedAt": rac_check["updated_at"].isoformat() if rac_check["updated_at"] else None + } \ No newline at end of file From 98de3de7a3b6afbbe29a0363f54c1b3157737f88 Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Tue, 8 Jul 2025 17:01:17 +0100 Subject: [PATCH 4/6] Revert "Add salary detection configuration and payloads" This reverts commit 332b743fa5977dc131229ba492289febb2343e8e. --- salary_analytics/config.py | 20 +------------------- salary_analytics/salary_detect.py | 11 ++--------- 2 files changed, 3 insertions(+), 28 deletions(-) diff --git a/salary_analytics/config.py b/salary_analytics/config.py index 8d0bdc6..1022fbd 100644 --- a/salary_analytics/config.py +++ b/salary_analytics/config.py @@ -4,7 +4,6 @@ Configuration settings for the salary analytics package. import os from dotenv import load_dotenv -import random # Load environment variables load_dotenv() @@ -77,21 +76,4 @@ OUTPUT_PATHS = { "inconsistent_model": os.path.join(MODEL_DIR, "inconsistent_model.joblib"), "consistent_scaler": os.path.join(MODEL_DIR, "consistent_scaler.joblib"), "inconsistent_scaler": os.path.join(MODEL_DIR, "inconsistent_scaler.joblib") -} - -# Salary Detect Endpoint Config -SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect" -SALARY_DETECT_HEADERS = { - "accept": "/", - "Content-Type": "application/json" -} -SALARY_DETECT_PAYLOADS = [ - {"salaryDate": "2022-01-01", "customerId": "CN621868", "accountId": "2017821799", "salaryAmount": 200000}, - {"customerId": "CUC2268333011", "accountId": "ACC8116931898", "salaryDate": "2025-07-08", "salaryAmount": 200000}, - {"customerId": "CUC2163677018", "accountId": "ACC8118539484", "salaryDate": "2025-07-08", "salaryAmount": 200000}, - {"customerId": "CUC1968062010", "accountId": "ACC8115473093", "salaryDate": "2025-07-08", "salaryAmount": 200000}, - {"customerId": "CUC1302360013", "accountId": "ACC8117628489", "salaryDate": "2025-07-08", "salaryAmount": 200000} -] - -def get_random_salary_payload(): - return [random.choice(SALARY_DETECT_PAYLOADS)] \ No newline at end of file +} \ No newline at end of file diff --git a/salary_analytics/salary_detect.py b/salary_analytics/salary_detect.py index 0824ae2..1bd76fe 100644 --- a/salary_analytics/salary_detect.py +++ b/salary_analytics/salary_detect.py @@ -1,8 +1,6 @@ import time import logging import threading -import requests -from .config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -15,14 +13,9 @@ class SalaryDetect: def _run(self): while self._running: logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Detecting salary...") - try: - payload = get_random_salary_payload() - response = requests.post(SALARY_DETECT_URL, headers=SALARY_DETECT_HEADERS, json=payload) - logger.info(f"POST {SALARY_DETECT_URL} status: {response.status_code}, response: {response.text}") - except Exception as e: - logger.error(f"Error during POST: {e}") + time.sleep(1) logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Salary detection complete") - time.sleep(120) + time.sleep(1) def start(self): if not self._running: From d650dbcd441ecad9a35676715e3476f08c955224 Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Tue, 8 Jul 2025 17:25:22 +0100 Subject: [PATCH 5/6] Revert "added RAC check implementation" This reverts commit 5998db9c6845dfc9f849c31a1808257c85ebe6e9. --- salary_analytics/rac_check.py | 47 ----------------------------------- 1 file changed, 47 deletions(-) delete mode 100644 salary_analytics/rac_check.py diff --git a/salary_analytics/rac_check.py b/salary_analytics/rac_check.py deleted file mode 100644 index 90ec681..0000000 --- a/salary_analytics/rac_check.py +++ /dev/null @@ -1,47 +0,0 @@ -import json -from datetime import datetime, timezone -import logging - -logger = logging.getLogger(__name__) - -class RACCheckService: - def __init__(self): - self._rac_checks = [] # In-memory storage for demonstration - - def add_rac_check(self, customer_id, account_id, transaction_id, data=None): - rac_check = { - 'id': len(self._rac_checks) + 1, - 'transaction_id': transaction_id, - 'customer_id': customer_id, - 'account_id': account_id, - 'rac_response': json.dumps(data or {}), - 'created_at': datetime.now(timezone.utc), - 'updated_at': datetime.now(timezone.utc) - } - self._rac_checks.append(rac_check) - logger.info(f"Added RAC check: {rac_check}") - return rac_check - - def get_all_rac_checks(self): - logger.info(f"Retrieving all RAC checks. Count: {len(self._rac_checks)}") - return self._rac_checks if self._rac_checks else None - - def get_rac_check(self, customer_id, account_id): - for rac_check in self._rac_checks: - if rac_check['customer_id'] == customer_id and rac_check['account_id'] == account_id: - logger.info(f"Retrieved RAC check for customer_id={customer_id}, account_id={account_id}: {rac_check}") - return rac_check - logger.error(f"RAC Check for customer_id={customer_id}, account_id={account_id} not found") - raise ValueError("RAC Check for customer not found") - - def to_dict(self, rac_check): - logger.debug(f"Converting RAC check to dict: {rac_check}") - return { - "id": str(rac_check["id"]), - "transactionId": str(rac_check["transaction_id"]), - "customerId": rac_check["customer_id"], - "accountId": rac_check["account_id"], - "racResponse": json.loads(rac_check["rac_response"]), - "createdAt": rac_check["created_at"].isoformat(), - "updatedAt": rac_check["updated_at"].isoformat() if rac_check["updated_at"] else None - } \ No newline at end of file From 41ce2caa0a8c98ad8d83a13c34bd2c7d610efc65 Mon Sep 17 00:00:00 2001 From: Joshua Salako Date: Tue, 8 Jul 2025 17:41:30 +0100 Subject: [PATCH 6/6] implement RAC check and salary detect every 2 minutes --- salary_analytics/config.py | 23 +++++++++++++++- salary_analytics/rac_check.py | 46 +++++++++++++++++++++++++++++++ salary_analytics/salary_detect.py | 11 ++++++-- 3 files changed, 77 insertions(+), 3 deletions(-) create mode 100644 salary_analytics/rac_check.py diff --git a/salary_analytics/config.py b/salary_analytics/config.py index 1022fbd..fccdb37 100644 --- a/salary_analytics/config.py +++ b/salary_analytics/config.py @@ -4,6 +4,7 @@ Configuration settings for the salary analytics package. import os from dotenv import load_dotenv +import random # Load environment variables load_dotenv() @@ -76,4 +77,24 @@ OUTPUT_PATHS = { "inconsistent_model": os.path.join(MODEL_DIR, "inconsistent_model.joblib"), "consistent_scaler": os.path.join(MODEL_DIR, "consistent_scaler.joblib"), "inconsistent_scaler": os.path.join(MODEL_DIR, "inconsistent_scaler.joblib") -} \ No newline at end of file +} + +SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337") +SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS","api/rac-check") + +# Salary Detect Endpoint Config +SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect" +SALARY_DETECT_HEADERS = { + "accept": "/", + "Content-Type": "application/json" +} +SALARY_DETECT_PAYLOADS = [ + {"salaryDate": "2022-01-01", "customerId": "CN621868", "accountId": "2017821799", "salaryAmount": 200000}, + {"customerId": "CUC2268333011", "accountId": "ACC8116931898", "salaryDate": "2025-07-08", "salaryAmount": 200000}, + {"customerId": "CUC2163677018", "accountId": "ACC8118539484", "salaryDate": "2025-07-08", "salaryAmount": 200000}, + {"customerId": "CUC1968062010", "accountId": "ACC8115473093", "salaryDate": "2025-07-08", "salaryAmount": 200000}, + {"customerId": "CUC1302360013", "accountId": "ACC8117628489", "salaryDate": "2025-07-08", "salaryAmount": 200000} +] + +def get_random_salary_payload(): + return [random.choice(SALARY_DETECT_PAYLOADS)] \ No newline at end of file diff --git a/salary_analytics/rac_check.py b/salary_analytics/rac_check.py new file mode 100644 index 0000000..d8f16d3 --- /dev/null +++ b/salary_analytics/rac_check.py @@ -0,0 +1,46 @@ +from django.conf import settings +import httpx +import json +from salary_analytics.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS +import logging + +logger = logging.getLogger(__name__) + +class SimbrellaIntegration: + BASE_URL = SIMBRELLA_BASE_URL + ENDPOINT_RAC_CHECKS = SIMBRELLA_ENDPOINT_RAC_CHECKS + + @staticmethod + def rac_check(customer_id, account_id, transaction_id): + """ + Calls the RACCheck endpoit + """ + url = f"{SimbrellaIntegration.BASE_URL}/{SimbrellaIntegration.ENDPOINT_RAC_CHECKS}" + logger.info(f"Contacting Rack Checks EndPoint: {str(url)}", exc_info=True) + + payload = { + "customerId": customer_id, + "accountId": account_id, + "transactionId": str(transaction_id), + "fbnTransactionId": str(transaction_id), + "countryCode": "NG", + "channel": "USSD" + } + + headers = { + "Content-Type": "application/json", + "x-api-key": f"{settings.VALID_API_KEY}", + "App-Id": f"{settings.VALID_APP_ID}", + } + + try: + response = httpx.post(url, json=payload, headers=headers, timeout=10.0) + + logger.info(f"This is Response: {str(response)}", exc_info=True) + + return response + + except Exception as e: + logger.error(f"RACCheck API call failed: {str(e)}", exc_info=True) + raise Exception(f"RACCheck API call failed: {str(e)}") + diff --git a/salary_analytics/salary_detect.py b/salary_analytics/salary_detect.py index 1bd76fe..0824ae2 100644 --- a/salary_analytics/salary_detect.py +++ b/salary_analytics/salary_detect.py @@ -1,6 +1,8 @@ import time import logging import threading +import requests +from .config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -13,9 +15,14 @@ class SalaryDetect: def _run(self): while self._running: logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Detecting salary...") - time.sleep(1) + try: + payload = get_random_salary_payload() + response = requests.post(SALARY_DETECT_URL, headers=SALARY_DETECT_HEADERS, json=payload) + logger.info(f"POST {SALARY_DETECT_URL} status: {response.status_code}, response: {response.text}") + except Exception as e: + logger.error(f"Error during POST: {e}") logger.info(f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Salary detection complete") - time.sleep(1) + time.sleep(120) def start(self): if not self._running: