[add]: oracle database migrations #2

Open
vivian.d.simbrellang.com wants to merge 19 commits from refactor-cleanup into master
46 changed files with 1896 additions and 675 deletions
+11 -5
View File
@@ -1,20 +1,26 @@
# Image for the Salary Analytics API, served by uvicorn on port 8000.
FROM python:3.11-slim

# All relative paths below resolve against /app.
WORKDIR /app

# System packages; the apt lists are removed in the same layer to keep it small.
# NOTE(review): libpq-dev is the PostgreSQL client library — confirm it is
# still needed now that the service connects to Oracle.
RUN apt-get update && apt-get install -y libpq-dev && rm -rf /var/lib/apt/lists/*

# Install Python dependencies before copying the source so that this layer is
# rebuilt only when requirements.txt changes, not on every code edit.
# Assumes requirements.txt has no entries pointing at local paths — TODO confirm.
# --no-cache-dir keeps pip's download cache out of the image.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

# Copy the rest of the build context into /app. This already includes
# salary_analytics/, and must provide the `run` module that CMD refers to.
COPY . /app

# Pre-create the output directories (presumably for generated CSVs, plots and
# models — verify against the application code).
RUN mkdir -p output/csv output/plots output/models

# Make modules under /app importable.
ENV PYTHONPATH=/app

# HOST/PORT are exported for the application; the CMD below does not read them
# and hard-codes the same values.
ENV HOST=0.0.0.0
ENV PORT=8000

# Flask CLI settings; not used by the uvicorn CMD below.
# ENV FLASK_APP=wsgi.py
ENV FLASK_APP=run.py
ENV FLASK_RUN_HOST=0.0.0.0

EXPOSE 8000

# Only the last CMD in a Dockerfile takes effect, so exactly one is declared.
# NOTE(review): --reload is a development feature (file watching and automatic
# restarts); confirm it is wanted in the image that gets deployed.
# CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "wsgi:wsgi_app"]
CMD ["uvicorn", "run:app", "--host", "0.0.0.0", "--port", "8000", "--reload"]
+644
View File
@@ -0,0 +1,644 @@
2025-09-10 10:21:11,016 - INFO - Initializing pipeline...
2025-09-10 10:21:11,019 - INFO - [2025-09-10 10:21:11] Detecting salary...
2025-09-10 10:21:11,019 - INFO - Started autonomous salary detection loop.
2025-09-10 10:21:11,030 - INFO - Server running on hostname: 22bad35c69c3
2025-09-10 10:21:11,035 - INFO - Server IP address: 172.25.0.2
2025-09-10 10:21:11,036 - INFO - Server is accessible at:
2025-09-10 10:21:11,037 - INFO - - http://localhost:8000
2025-09-10 10:21:11,039 - INFO - - http://127.0.0.1:8000
2025-09-10 10:21:11,040 - INFO - - http://172.25.0.2:8000
2025-09-10 10:21:11,041 - INFO - Pipeline initialized successfully
2025-09-10 10:21:11,532 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:21:11,533 - INFO - [2025-09-10 10:21:11] Salary detection complete
2025-09-10 10:21:19,451 - INFO - Initializing SalaryAnalyticsPipeline
2025-09-10 10:21:19,452 - INFO - Starting data loading process
2025-09-10 10:21:19,453 - INFO - No database connection. Attempting to connect...
2025-09-10 10:21:19,455 - INFO - Attempting to connect to database...
2025-09-10 10:21:27,271 - INFO - Table customer_account_transaction_hx exists with 5354307 rows
2025-09-10 10:21:27,488 - INFO - Connected successfully to database!
2025-09-10 10:21:27,715 - INFO - Loading data from table: customer_account_transaction_hx
2025-09-10 10:21:28,614 - INFO - Total rows to process: 5354307
2025-09-10 10:21:28,852 - INFO - Loading chunk starting at offset 0
2025-09-10 10:21:40,251 - INFO - Loading chunk starting at offset 10000
2025-09-10 10:21:46,981 - INFO - Loading chunk starting at offset 20000
2025-09-10 10:21:57,010 - INFO - Loading chunk starting at offset 30000
2025-09-10 10:22:04,792 - INFO - Loading chunk starting at offset 40000
2025-09-10 10:22:09,599 - INFO - Loading chunk starting at offset 50000
2025-09-10 10:22:15,016 - INFO - Loading chunk starting at offset 60000
2025-09-10 10:22:18,613 - INFO - Loading chunk starting at offset 70000
2025-09-10 10:22:22,007 - INFO - Loading chunk starting at offset 80000
2025-09-10 10:22:28,397 - INFO - Loading chunk starting at offset 90000
2025-09-10 10:22:36,081 - INFO - Loading chunk starting at offset 100000
2025-09-10 10:22:48,738 - INFO - Loading chunk starting at offset 110000
2025-09-10 10:23:01,521 - INFO - Loading chunk starting at offset 120000
2025-09-10 10:23:11,532 - INFO - [2025-09-10 10:23:11] Detecting salary...
2025-09-10 10:23:13,504 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:23:13,504 - INFO - [2025-09-10 10:23:13] Salary detection complete
2025-09-10 10:23:16,348 - INFO - Loading chunk starting at offset 130000
2025-09-10 10:23:23,259 - INFO - Loading chunk starting at offset 140000
2025-09-10 10:23:29,284 - INFO - Loading chunk starting at offset 150000
2025-09-10 10:23:41,579 - INFO - Loading chunk starting at offset 160000
2025-09-10 10:23:54,788 - INFO - Loading chunk starting at offset 170000
2025-09-10 10:24:19,519 - INFO - Loading chunk starting at offset 180000
2025-09-10 10:24:31,657 - INFO - Loading chunk starting at offset 190000
2025-09-10 10:24:46,130 - INFO - Loading chunk starting at offset 200000
2025-09-10 10:24:57,289 - INFO - Loading chunk starting at offset 210000
2025-09-10 10:25:07,964 - INFO - Loading chunk starting at offset 220000
2025-09-10 10:25:13,503 - INFO - [2025-09-10 10:25:13] Detecting salary...
2025-09-10 10:25:15,477 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:25:15,478 - INFO - [2025-09-10 10:25:15] Salary detection complete
2025-09-10 10:25:17,793 - INFO - Loading chunk starting at offset 230000
2025-09-10 10:25:25,026 - INFO - Loading chunk starting at offset 240000
2025-09-10 10:25:32,079 - INFO - Loading chunk starting at offset 250000
2025-09-10 10:25:39,990 - INFO - Loading chunk starting at offset 260000
2025-09-10 10:25:50,492 - INFO - Loading chunk starting at offset 270000
2025-09-10 10:26:00,181 - INFO - Loading chunk starting at offset 280000
2025-09-10 10:26:10,138 - INFO - Loading chunk starting at offset 290000
2025-09-10 10:26:20,437 - INFO - Loading chunk starting at offset 300000
2025-09-10 10:26:34,962 - INFO - Loading chunk starting at offset 310000
2025-09-10 10:26:46,248 - INFO - Loading chunk starting at offset 320000
2025-09-10 10:26:55,275 - INFO - Loading chunk starting at offset 330000
2025-09-10 10:27:09,733 - INFO - Loading chunk starting at offset 340000
2025-09-10 10:27:15,480 - INFO - [2025-09-10 10:27:15] Detecting salary...
2025-09-10 10:27:16,553 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:27:16,553 - INFO - [2025-09-10 10:27:16] Salary detection complete
2025-09-10 10:27:26,568 - INFO - Loading chunk starting at offset 350000
2025-09-10 10:27:36,472 - INFO - Loading chunk starting at offset 360000
2025-09-10 10:27:44,909 - INFO - Loading chunk starting at offset 370000
2025-09-10 10:27:54,557 - INFO - Loading chunk starting at offset 380000
2025-09-10 10:28:00,588 - INFO - Loading chunk starting at offset 390000
2025-09-10 10:28:05,957 - INFO - Loading chunk starting at offset 400000
2025-09-10 10:28:12,058 - INFO - Loading chunk starting at offset 410000
2025-09-10 10:28:19,248 - INFO - Loading chunk starting at offset 420000
2025-09-10 10:28:29,938 - INFO - Loading chunk starting at offset 430000
2025-09-10 10:28:51,274 - INFO - Loading chunk starting at offset 440000
2025-09-10 10:28:57,720 - INFO - Loading chunk starting at offset 450000
2025-09-10 10:29:02,117 - INFO - Loading chunk starting at offset 460000
2025-09-10 10:29:11,555 - INFO - Loading chunk starting at offset 470000
2025-09-10 10:29:16,565 - INFO - [2025-09-10 10:29:16] Detecting salary...
2025-09-10 10:29:17,452 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:29:17,455 - INFO - [2025-09-10 10:29:17] Salary detection complete
2025-09-10 10:29:30,145 - INFO - Loading chunk starting at offset 480000
2025-09-10 10:29:39,048 - INFO - Loading chunk starting at offset 490000
2025-09-10 10:29:43,412 - INFO - Loading chunk starting at offset 500000
2025-09-10 10:29:47,415 - INFO - Loading chunk starting at offset 510000
2025-09-10 10:29:52,149 - INFO - Loading chunk starting at offset 520000
2025-09-10 10:29:58,331 - INFO - Loading chunk starting at offset 530000
2025-09-10 10:30:04,933 - INFO - Loading chunk starting at offset 540000
2025-09-10 10:30:11,934 - INFO - Loading chunk starting at offset 550000
2025-09-10 10:30:19,315 - INFO - Loading chunk starting at offset 560000
2025-09-10 10:30:25,683 - INFO - Loading chunk starting at offset 570000
2025-09-10 10:30:33,577 - INFO - Loading chunk starting at offset 580000
2025-09-10 10:30:40,363 - INFO - Loading chunk starting at offset 590000
2025-09-10 10:30:46,205 - INFO - Loading chunk starting at offset 600000
2025-09-10 10:30:53,957 - INFO - Loading chunk starting at offset 610000
2025-09-10 10:31:08,083 - INFO - Loading chunk starting at offset 620000
2025-09-10 10:31:17,454 - INFO - [2025-09-10 10:31:17] Detecting salary...
2025-09-10 10:31:17,989 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:31:17,989 - INFO - [2025-09-10 10:31:17] Salary detection complete
2025-09-10 10:31:18,901 - INFO - Loading chunk starting at offset 630000
2025-09-10 10:31:27,551 - INFO - Loading chunk starting at offset 640000
2025-09-10 10:31:33,662 - INFO - Loading chunk starting at offset 650000
2025-09-10 10:31:40,107 - INFO - Loading chunk starting at offset 660000
2025-09-10 10:31:49,938 - INFO - Loading chunk starting at offset 670000
2025-09-10 10:31:57,777 - INFO - Loading chunk starting at offset 680000
2025-09-10 10:32:06,629 - INFO - Loading chunk starting at offset 690000
2025-09-10 10:32:15,281 - INFO - Loading chunk starting at offset 700000
2025-09-10 10:32:25,328 - INFO - Loading chunk starting at offset 710000
2025-09-10 10:32:40,639 - INFO - Loading chunk starting at offset 720000
2025-09-10 10:32:51,078 - INFO - Loading chunk starting at offset 730000
2025-09-10 10:33:03,924 - INFO - Loading chunk starting at offset 740000
2025-09-10 10:33:13,007 - INFO - Loading chunk starting at offset 750000
2025-09-10 10:33:17,989 - INFO - [2025-09-10 10:33:17] Detecting salary...
2025-09-10 10:33:19,698 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:33:20,039 - INFO - [2025-09-10 10:33:20] Salary detection complete
2025-09-10 10:33:20,857 - INFO - Loading chunk starting at offset 760000
2025-09-10 10:33:30,803 - INFO - Loading chunk starting at offset 770000
2025-09-10 10:33:38,162 - INFO - Loading chunk starting at offset 780000
2025-09-10 10:33:44,504 - INFO - Loading chunk starting at offset 790000
2025-09-10 10:33:50,063 - INFO - Loading chunk starting at offset 800000
2025-09-10 10:33:57,957 - INFO - Loading chunk starting at offset 810000
2025-09-10 10:34:05,256 - INFO - Loading chunk starting at offset 820000
2025-09-10 10:34:12,212 - INFO - Loading chunk starting at offset 830000
2025-09-10 10:34:20,478 - INFO - Loading chunk starting at offset 840000
2025-09-10 10:34:28,018 - INFO - Loading chunk starting at offset 850000
2025-09-10 10:34:36,413 - INFO - Loading chunk starting at offset 860000
2025-09-10 10:34:50,168 - INFO - Loading chunk starting at offset 870000
2025-09-10 10:35:13,867 - INFO - Loading chunk starting at offset 880000
2025-09-10 10:35:20,391 - INFO - [2025-09-10 10:35:20] Detecting salary...
2025-09-10 10:35:21,791 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:35:21,794 - INFO - [2025-09-10 10:35:21] Salary detection complete
2025-09-10 10:35:28,918 - INFO - Loading chunk starting at offset 890000
2025-09-10 10:35:40,300 - INFO - Loading chunk starting at offset 900000
2025-09-10 10:36:05,763 - INFO - Loading chunk starting at offset 910000
2025-09-10 10:36:15,199 - INFO - Loading chunk starting at offset 920000
2025-09-10 10:36:46,036 - INFO - Loading chunk starting at offset 930000
2025-09-10 10:36:53,221 - INFO - Loading chunk starting at offset 940000
2025-09-10 10:37:06,754 - INFO - Loading chunk starting at offset 950000
2025-09-10 10:37:15,558 - INFO - Loading chunk starting at offset 960000
2025-09-10 10:37:21,798 - INFO - [2025-09-10 10:37:21] Detecting salary...
2025-09-10 10:37:22,954 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:37:22,955 - INFO - [2025-09-10 10:37:22] Salary detection complete
2025-09-10 10:37:29,088 - INFO - Loading chunk starting at offset 970000
2025-09-10 10:37:54,730 - INFO - Loading chunk starting at offset 980000
2025-09-10 10:38:13,667 - INFO - Loading chunk starting at offset 990000
2025-09-10 10:38:27,880 - INFO - Loading chunk starting at offset 1000000
2025-09-10 10:39:07,546 - INFO - Loading chunk starting at offset 1010000
2025-09-10 10:39:17,369 - INFO - Loading chunk starting at offset 1020000
2025-09-10 10:39:23,176 - INFO - [2025-09-10 10:39:23] Detecting salary...
2025-09-10 10:39:24,444 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:39:24,445 - INFO - [2025-09-10 10:39:24] Salary detection complete
2025-09-10 10:39:26,048 - INFO - Loading chunk starting at offset 1030000
2025-09-10 10:39:45,494 - INFO - Loading chunk starting at offset 1040000
2025-09-10 10:39:50,480 - INFO - Loading chunk starting at offset 1050000
2025-09-10 10:39:54,793 - INFO - Loading chunk starting at offset 1060000
2025-09-10 10:39:59,415 - INFO - Loading chunk starting at offset 1070000
2025-09-10 10:40:04,306 - INFO - Loading chunk starting at offset 1080000
2025-09-10 10:40:10,517 - INFO - Loading chunk starting at offset 1090000
2025-09-10 10:40:15,671 - INFO - Loading chunk starting at offset 1100000
2025-09-10 10:40:21,662 - INFO - Loading chunk starting at offset 1110000
2025-09-10 10:40:30,551 - INFO - Loading chunk starting at offset 1120000
2025-09-10 10:40:47,391 - INFO - Loading chunk starting at offset 1130000
2025-09-10 10:40:55,609 - INFO - Loading chunk starting at offset 1140000
2025-09-10 10:41:04,496 - INFO - Loading chunk starting at offset 1150000
2025-09-10 10:41:14,608 - INFO - Loading chunk starting at offset 1160000
2025-09-10 10:41:24,447 - INFO - [2025-09-10 10:41:24] Detecting salary...
2025-09-10 10:41:28,537 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:41:28,548 - INFO - [2025-09-10 10:41:28] Salary detection complete
2025-09-10 10:42:00,867 - INFO - Loading chunk starting at offset 1170000
2025-09-10 10:42:13,069 - INFO - Loading chunk starting at offset 1180000
2025-09-10 10:42:21,593 - INFO - Loading chunk starting at offset 1190000
2025-09-10 10:42:32,011 - INFO - Loading chunk starting at offset 1200000
2025-09-10 10:42:37,982 - INFO - Loading chunk starting at offset 1210000
2025-09-10 10:42:45,458 - INFO - Loading chunk starting at offset 1220000
2025-09-10 10:42:54,545 - INFO - Loading chunk starting at offset 1230000
2025-09-10 10:43:15,705 - INFO - Loading chunk starting at offset 1240000
2025-09-10 10:43:28,549 - INFO - [2025-09-10 10:43:28] Detecting salary...
2025-09-10 10:43:31,640 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:43:31,641 - INFO - [2025-09-10 10:43:31] Salary detection complete
2025-09-10 10:43:43,344 - INFO - Loading chunk starting at offset 1250000
2025-09-10 10:43:50,004 - INFO - Loading chunk starting at offset 1260000
2025-09-10 10:43:59,430 - INFO - Loading chunk starting at offset 1270000
2025-09-10 10:44:07,478 - INFO - Loading chunk starting at offset 1280000
2025-09-10 10:44:18,927 - INFO - Loading chunk starting at offset 1290000
2025-09-10 10:44:28,523 - INFO - Loading chunk starting at offset 1300000
2025-09-10 10:44:38,629 - INFO - Loading chunk starting at offset 1310000
2025-09-10 10:44:56,977 - INFO - Loading chunk starting at offset 1320000
2025-09-10 10:45:09,203 - INFO - Loading chunk starting at offset 1330000
2025-09-10 10:45:18,657 - INFO - Loading chunk starting at offset 1340000
2025-09-10 10:45:27,201 - INFO - Loading chunk starting at offset 1350000
2025-09-10 10:45:31,641 - INFO - [2025-09-10 10:45:31] Detecting salary...
2025-09-10 10:45:32,261 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:45:32,262 - INFO - [2025-09-10 10:45:32] Salary detection complete
2025-09-10 10:45:35,308 - INFO - Loading chunk starting at offset 1360000
2025-09-10 10:45:44,783 - INFO - Loading chunk starting at offset 1370000
2025-09-10 10:45:52,048 - INFO - Loading chunk starting at offset 1380000
2025-09-10 10:45:58,491 - INFO - Loading chunk starting at offset 1390000
2025-09-10 10:46:13,270 - INFO - Loading chunk starting at offset 1400000
2025-09-10 10:46:25,221 - INFO - Loading chunk starting at offset 1410000
2025-09-10 10:46:34,358 - INFO - Loading chunk starting at offset 1420000
2025-09-10 10:46:38,777 - INFO - Loading chunk starting at offset 1430000
2025-09-10 10:46:42,534 - INFO - Loading chunk starting at offset 1440000
2025-09-10 10:46:46,147 - INFO - Loading chunk starting at offset 1450000
2025-09-10 10:46:50,727 - INFO - Loading chunk starting at offset 1460000
2025-09-10 10:46:55,939 - INFO - Loading chunk starting at offset 1470000
2025-09-10 10:47:01,696 - INFO - Loading chunk starting at offset 1480000
2025-09-10 10:47:07,411 - INFO - Loading chunk starting at offset 1490000
2025-09-10 10:47:13,449 - INFO - Loading chunk starting at offset 1500000
2025-09-10 10:47:23,058 - INFO - Loading chunk starting at offset 1510000
2025-09-10 10:47:30,097 - INFO - Loading chunk starting at offset 1520000
2025-09-10 10:47:32,259 - INFO - [2025-09-10 10:47:32] Detecting salary...
2025-09-10 10:47:32,923 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:47:32,924 - INFO - [2025-09-10 10:47:32] Salary detection complete
2025-09-10 10:47:36,423 - INFO - Loading chunk starting at offset 1530000
2025-09-10 10:47:45,086 - INFO - Loading chunk starting at offset 1540000
2025-09-10 10:47:57,079 - INFO - Loading chunk starting at offset 1550000
2025-09-10 10:48:19,741 - INFO - Loading chunk starting at offset 1560000
2025-09-10 10:48:41,300 - INFO - Loading chunk starting at offset 1570000
2025-09-10 10:48:51,349 - INFO - Loading chunk starting at offset 1580000
2025-09-10 10:48:58,892 - INFO - Loading chunk starting at offset 1590000
2025-09-10 10:49:04,857 - INFO - Loading chunk starting at offset 1600000
2025-09-10 10:49:10,299 - INFO - Loading chunk starting at offset 1610000
2025-09-10 10:49:16,650 - INFO - Loading chunk starting at offset 1620000
2025-09-10 10:49:23,107 - INFO - Loading chunk starting at offset 1630000
2025-09-10 10:49:32,320 - INFO - Loading chunk starting at offset 1640000
2025-09-10 10:49:32,927 - INFO - [2025-09-10 10:49:32] Detecting salary...
2025-09-10 10:49:33,484 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:49:33,485 - INFO - [2025-09-10 10:49:33] Salary detection complete
2025-09-10 10:49:40,707 - INFO - Loading chunk starting at offset 1650000
2025-09-10 10:49:50,677 - INFO - Loading chunk starting at offset 1660000
2025-09-10 10:49:59,664 - INFO - Loading chunk starting at offset 1670000
2025-09-10 10:50:07,452 - INFO - Loading chunk starting at offset 1680000
2025-09-10 10:50:13,583 - INFO - Loading chunk starting at offset 1690000
2025-09-10 10:50:20,858 - INFO - Loading chunk starting at offset 1700000
2025-09-10 10:50:27,519 - INFO - Loading chunk starting at offset 1710000
2025-09-10 10:50:34,012 - INFO - Loading chunk starting at offset 1720000
2025-09-10 10:50:39,628 - INFO - Loading chunk starting at offset 1730000
2025-09-10 10:50:50,012 - INFO - Loading chunk starting at offset 1740000
2025-09-10 10:51:06,647 - INFO - Loading chunk starting at offset 1750000
2025-09-10 10:51:17,401 - INFO - Loading chunk starting at offset 1760000
2025-09-10 10:51:26,360 - INFO - Loading chunk starting at offset 1770000
2025-09-10 10:51:33,485 - INFO - [2025-09-10 10:51:33] Detecting salary...
2025-09-10 10:51:55,392 - INFO - Initializing pipeline...
2025-09-10 10:51:55,395 - INFO - [2025-09-10 10:51:55] Detecting salary...
2025-09-10 10:51:55,395 - INFO - Started autonomous salary detection loop.
2025-09-10 10:51:55,409 - INFO - Server running on hostname: 22bad35c69c3
2025-09-10 10:51:55,416 - INFO - Server IP address: 172.25.0.2
2025-09-10 10:51:55,424 - INFO - Server is accessible at:
2025-09-10 10:51:55,426 - INFO - - http://localhost:8000
2025-09-10 10:51:55,444 - INFO - - http://127.0.0.1:8000
2025-09-10 10:51:55,455 - INFO - - http://172.25.0.2:8000
2025-09-10 10:51:55,456 - INFO - Pipeline initialized successfully
2025-09-10 10:51:56,024 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:51:56,024 - INFO - [2025-09-10 10:51:56] Salary detection complete
2025-09-10 10:52:36,450 - INFO - Initializing SalaryAnalyticsPipeline
2025-09-10 10:52:36,451 - INFO - Starting data loading process
2025-09-10 10:52:36,452 - INFO - No database connection. Attempting to connect...
2025-09-10 10:52:36,452 - INFO - Attempting to connect to database...
2025-09-10 10:52:36,538 - ERROR - Error connecting to database: No module named 'oracledb'
2025-09-10 10:52:36,539 - ERROR - Failed to establish database connection
2025-09-10 10:52:36,539 - ERROR - Failed to load data
2025-09-10 10:52:36,540 - ERROR - Failed to load data
2025-09-10 10:52:36,541 - INFO - Load data endpoint failed after 0.09 seconds
2025-09-10 10:52:36,541 - ERROR - Error loading data: 500: Failed to load data
2025-09-10 10:52:36,542 - INFO - Load data endpoint failed after 0.09 seconds
2025-09-10 10:52:58,149 - INFO - Shutting down Salary Analytics API...
2025-09-10 10:53:10,697 - INFO - generated new fontManager
2025-09-10 10:53:16,039 - INFO - Initializing pipeline...
2025-09-10 10:53:16,041 - INFO - [2025-09-10 10:53:16] Detecting salary...
2025-09-10 10:53:16,042 - INFO - Started autonomous salary detection loop.
2025-09-10 10:53:16,055 - INFO - Server running on hostname: 1c5d2376fb2a
2025-09-10 10:53:16,056 - INFO - Server IP address: 172.25.0.2
2025-09-10 10:53:16,057 - INFO - Server is accessible at:
2025-09-10 10:53:16,058 - INFO - - http://localhost:8000
2025-09-10 10:53:16,059 - INFO - - http://127.0.0.1:8000
2025-09-10 10:53:16,060 - INFO - - http://172.25.0.2:8000
2025-09-10 10:53:16,062 - INFO - Pipeline initialized successfully
2025-09-10 10:53:16,812 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 10:53:16,813 - INFO - [2025-09-10 10:53:16] Salary detection complete
2025-09-10 10:53:25,477 - INFO - Initializing SalaryAnalyticsPipeline
2025-09-10 10:53:25,482 - INFO - Starting data loading process
2025-09-10 10:53:25,483 - INFO - No database connection. Attempting to connect...
2025-09-10 10:53:25,484 - INFO - Attempting to connect to database...
2025-09-10 10:53:25,679 - ERROR - Error connecting to database: No module named 'oracledb'
2025-09-10 10:53:25,679 - ERROR - Failed to establish database connection
2025-09-10 10:53:25,680 - ERROR - Failed to load data
2025-09-10 10:53:25,680 - ERROR - Failed to load data
2025-09-10 10:53:25,681 - INFO - Load data endpoint failed after 0.20 seconds
2025-09-10 10:53:25,682 - ERROR - Error loading data: 500: Failed to load data
2025-09-10 10:53:25,682 - INFO - Load data endpoint failed after 0.20 seconds
2025-09-10 10:53:46,324 - INFO - Shutting down Salary Analytics API...
2025-09-10 11:07:49,841 - INFO - generated new fontManager
2025-09-10 11:07:59,771 - INFO - Initializing pipeline...
2025-09-10 11:07:59,774 - INFO - [2025-09-10 11:07:59] Detecting salary...
2025-09-10 11:07:59,774 - INFO - Started autonomous salary detection loop.
2025-09-10 11:07:59,893 - INFO - Server running on hostname: 0b21809edf52
2025-09-10 11:07:59,894 - INFO - Server IP address: 172.25.0.2
2025-09-10 11:07:59,895 - INFO - Server is accessible at:
2025-09-10 11:07:59,915 - INFO - - http://localhost:8000
2025-09-10 11:07:59,917 - INFO - - http://127.0.0.1:8000
2025-09-10 11:07:59,918 - INFO - - http://172.25.0.2:8000
2025-09-10 11:07:59,919 - INFO - Pipeline initialized successfully
2025-09-10 11:08:01,268 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:08:01,269 - INFO - [2025-09-10 11:08:01] Salary detection complete
2025-09-10 11:08:15,668 - INFO - Initializing SalaryAnalyticsPipeline
2025-09-10 11:08:15,669 - INFO - Starting data loading process
2025-09-10 11:08:15,671 - INFO - No database connection. Attempting to connect...
2025-09-10 11:08:15,672 - INFO - Attempting to connect to database...
2025-09-10 11:08:20,076 - ERROR - Error connecting to database: (oracledb.exceptions.DatabaseError) ORA-00936: missing expression
Help: https://docs.oracle.com/error-help/db/ora-00936/
[SQL: SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'customer_account_transaction_hx')]
(Background on this error at: https://sqlalche.me/e/20/4xp6)
2025-09-10 11:08:20,086 - ERROR - Failed to establish database connection
2025-09-10 11:08:20,087 - ERROR - Failed to load data
2025-09-10 11:08:20,092 - ERROR - Failed to load data
2025-09-10 11:08:20,119 - INFO - Load data endpoint failed after 4.45 seconds
2025-09-10 11:08:20,123 - ERROR - Error loading data: 500: Failed to load data
2025-09-10 11:08:20,124 - INFO - Load data endpoint failed after 4.46 seconds
2025-09-10 11:10:01,280 - INFO - [2025-09-10 11:10:01] Detecting salary...
2025-09-10 11:10:02,367 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:10:02,368 - INFO - [2025-09-10 11:10:02] Salary detection complete
2025-09-10 11:12:02,395 - INFO - [2025-09-10 11:12:02] Detecting salary...
2025-09-10 11:12:03,234 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:12:03,256 - INFO - [2025-09-10 11:12:03] Salary detection complete
2025-09-10 11:14:03,279 - INFO - [2025-09-10 11:14:03] Detecting salary...
2025-09-10 11:14:04,266 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:14:04,267 - INFO - [2025-09-10 11:14:04] Salary detection complete
2025-09-10 11:16:04,268 - INFO - [2025-09-10 11:16:04] Detecting salary...
2025-09-10 11:16:05,133 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:16:05,134 - INFO - [2025-09-10 11:16:05] Salary detection complete
2025-09-10 11:18:05,134 - INFO - [2025-09-10 11:18:05] Detecting salary...
2025-09-10 11:18:05,955 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:18:05,956 - INFO - [2025-09-10 11:18:05] Salary detection complete
2025-09-10 11:20:05,855 - INFO - [2025-09-10 11:20:05] Detecting salary...
2025-09-10 11:20:06,453 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:20:06,463 - INFO - [2025-09-10 11:20:06] Salary detection complete
2025-09-10 11:22:06,512 - INFO - [2025-09-10 11:22:06] Detecting salary...
2025-09-10 11:22:07,087 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:22:07,088 - INFO - [2025-09-10 11:22:07] Salary detection complete
2025-09-10 11:24:07,092 - INFO - [2025-09-10 11:24:07] Detecting salary...
2025-09-10 11:24:08,235 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:24:08,236 - INFO - [2025-09-10 11:24:08] Salary detection complete
2025-09-10 11:25:13,551 - INFO - Shutting down Salary Analytics API...
2025-09-10 11:25:52,852 - INFO - Initializing pipeline...
2025-09-10 11:25:52,857 - INFO - [2025-09-10 11:25:52] Detecting salary...
2025-09-10 11:25:52,857 - INFO - Started autonomous salary detection loop.
2025-09-10 11:25:52,866 - INFO - Server running on hostname: 0b21809edf52
2025-09-10 11:25:52,876 - INFO - Server IP address: 172.25.0.2
2025-09-10 11:25:52,881 - INFO - Server is accessible at:
2025-09-10 11:25:52,882 - INFO - - http://localhost:8000
2025-09-10 11:25:52,883 - INFO - - http://127.0.0.1:8000
2025-09-10 11:25:52,884 - INFO - - http://172.25.0.2:8000
2025-09-10 11:25:52,884 - INFO - Pipeline initialized successfully
2025-09-10 11:25:53,580 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:25:53,581 - INFO - [2025-09-10 11:25:53] Salary detection complete
2025-09-10 11:27:53,578 - INFO - [2025-09-10 11:27:53] Detecting salary...
2025-09-10 11:27:54,235 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:27:54,235 - INFO - [2025-09-10 11:27:54] Salary detection complete
2025-09-10 11:29:54,235 - INFO - [2025-09-10 11:29:54] Detecting salary...
2025-09-10 11:29:54,827 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-10 11:29:54,827 - INFO - [2025-09-10 11:29:54] Salary detection complete
2025-09-10 11:30:20,979 - INFO - Shutting down Salary Analytics API...
2025-09-10 11:31:18,351 - INFO - Initializing pipeline...
2025-09-10 11:31:18,375 - INFO - [2025-09-10 11:31:18] Detecting salary...
2025-09-10 11:31:18,376 - INFO - Started autonomous salary detection loop.
2025-09-10 11:31:18,405 - INFO - Server running on hostname: 0b21809edf52
2025-09-10 11:31:18,447 - INFO - Server IP address: 172.25.0.2
2025-09-10 11:31:18,528 - INFO - Server is accessible at:
2025-09-10 11:31:18,552 - INFO - - http://localhost:8000
2025-09-10 11:31:18,552 - INFO - - http://127.0.0.1:8000
2025-09-10 11:31:18,553 - INFO - - http://172.25.0.2:8000
2025-09-10 11:31:18,554 - INFO - Pipeline initialized successfully
2025-09-19 11:27:20,722 - INFO - Initializing pipeline...
2025-09-19 11:27:20,724 - INFO - [2025-09-19 11:27:20] Detecting salary...
2025-09-19 11:27:20,724 - INFO - Started autonomous salary detection loop.
2025-09-19 11:27:20,750 - INFO - Server running on hostname: 0b21809edf52
2025-09-19 11:27:20,752 - INFO - Server IP address: 172.25.0.2
2025-09-19 11:27:20,753 - INFO - Server is accessible at:
2025-09-19 11:27:20,759 - INFO - - http://localhost:8000
2025-09-19 11:27:20,761 - INFO - - http://127.0.0.1:8000
2025-09-19 11:27:20,766 - INFO - - http://172.25.0.2:8000
2025-09-19 11:27:20,771 - INFO - Pipeline initialized successfully
2025-09-19 11:27:24,622 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-19 11:27:24,628 - INFO - [2025-09-19 11:27:24] Salary detection complete
2025-09-19 11:29:13,095 - INFO - Shutting down Salary Analytics API...
2025-09-24 09:47:38,012 - INFO - Initializing pipeline...
2025-09-24 09:47:38,017 - INFO - [2025-09-24 09:47:38] Detecting salary...
2025-09-24 09:47:38,024 - INFO - Started autonomous salary detection loop.
2025-09-24 09:47:38,077 - INFO - Server running on hostname: 0b21809edf52
2025-09-24 09:47:38,079 - INFO - Server IP address: 172.25.0.2
2025-09-24 09:47:38,080 - INFO - Server is accessible at:
2025-09-24 09:47:38,081 - INFO - - http://localhost:8000
2025-09-24 09:47:38,082 - INFO - - http://127.0.0.1:8000
2025-09-24 09:47:38,083 - INFO - - http://172.25.0.2:8000
2025-09-24 09:47:38,084 - INFO - Pipeline initialized successfully
2025-09-24 09:47:41,272 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-24 09:47:41,273 - INFO - [2025-09-24 09:47:41] Salary detection complete
2025-09-24 09:49:41,290 - INFO - [2025-09-24 09:49:41] Detecting salary...
2025-09-24 09:49:43,791 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-24 09:49:43,791 - INFO - [2025-09-24 09:49:43] Salary detection complete
2025-09-24 09:50:21,300 - INFO - Shutting down Salary Analytics API...
2025-09-24 09:59:27,300 - INFO - generated new fontManager
2025-09-24 09:59:30,871 - INFO - Initializing pipeline...
2025-09-24 09:59:30,872 - INFO - [2025-09-24 09:59:30] Detecting salary...
2025-09-24 09:59:30,872 - INFO - Started autonomous salary detection loop.
2025-09-24 09:59:30,877 - INFO - Server running on hostname: bfe07c2f7da2
2025-09-24 09:59:30,878 - INFO - Server IP address: 172.25.0.2
2025-09-24 09:59:30,878 - INFO - Server is accessible at:
2025-09-24 09:59:30,883 - INFO - - http://localhost:8000
2025-09-24 09:59:30,884 - INFO - - http://127.0.0.1:8000
2025-09-24 09:59:30,887 - INFO - - http://172.25.0.2:8000
2025-09-24 09:59:30,889 - INFO - Pipeline initialized successfully
2025-09-24 09:59:32,676 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: {
"data": [],
"error": {},
"message": "AutoCall Add Salary Successful",
"status": true,
"statusCode": 200
}
2025-09-24 09:59:32,705 - INFO - [2025-09-24 09:59:32] Salary detection complete
@@ -1,17 +1,21 @@
from flask import Flask
import os
from .extensions import db, migrate
from app.extensions import db, migrate
def create_app():
    """Application factory: build and configure the Flask app.

    Loads settings from ``app.config``, binds the SQLAlchemy and migration
    extensions to the app, and registers the XLS upload CLI command.

    Returns:
        Flask: the configured application instance.
    """
    app = Flask(__name__)

    # Load configuration from config.py (single source: the refactored
    # ``app`` package; the pre-refactor ``salary_analytics.config`` load
    # was a stale duplicate).
    app.config.from_object('app.config')

    # Initialize extensions
    db.init_app(app)
    migrate.init_app(app, db)

    # Register blueprints or CLI commands here if needed.
    # Imported here rather than at module top, as in the original.
    from app.api.commands import commands
    app.cli.add_command(commands.upload_xls_cli)

    return app
@@ -2,8 +2,8 @@ import click
import pandas as pd
from datetime import datetime
from flask.cli import with_appcontext
from salary_analytics.app.extensions import db
from salary_analytics.app.models import RawTransaction
from app.extensions import db
from app.models import RawTransaction
@click.group()
def commands():
+22 -10
View File
@@ -22,20 +22,32 @@ os.makedirs(PLOTS_DIR, exist_ok=True)
os.makedirs(CSV_DIR, exist_ok=True)
os.makedirs(MODEL_DIR, exist_ok=True)
# Database Configuration
# Connection settings come from DATABASE_* environment variables. The
# defaults below are only fallbacks for when a variable is unset.
DB_CONFIG = {
    "user": os.getenv("DATABASE_USER"),
    "password": os.getenv("DATABASE_PASSWORD"),
    "name": os.getenv("DATABASE_NAME"),
    "port": os.getenv("DATABASE_PORT", 10532),
    "host": os.getenv("DATABASE_HOST", "firstadvancedev"),
    "sid": os.getenv("DATABASE_SID", "FREE"),
    "service_name": os.getenv("DATABASE_SERVICE_NAME", "firstadv"),
}


def build_oracle_uri(config):
    """Return the ``oracle+oracledb`` SQLAlchemy URI for *config*.

    Args:
        config: mapping with ``user``, ``password``, ``host``, ``port`` and
            ``service_name`` entries (the shape of ``DB_CONFIG``).

    Returns:
        str: a URI of the form
        ``oracle+oracledb://user:password@host:port/?service_name=...``.
    """
    # Local import: urllib is not among this module's visible imports.
    from urllib.parse import quote

    # Percent-encode the credentials so characters such as '@', ':' or '/'
    # in a password cannot be mistaken for URI delimiters.
    user = quote(str(config["user"]), safe="")
    password = quote(str(config["password"]), safe="")
    return (
        f"oracle+oracledb://{user}:{password}@"
        f"{config['host']}:{config['port']}/?service_name={config['service_name']}"
    )


# SID-based alternative, kept for reference:
# DNS = f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={DB_CONFIG['host']})(PORT={DB_CONFIG['port']}))(CONNECT_DATA=(SID={DB_CONFIG['sid']})))"
# SQLALCHEMY_DATABASE_URI = (f"oracle+oracledb://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DNS}")

# Database Connection
SQLALCHEMY_DATABASE_URI = build_oracle_uri(DB_CONFIG)

# SQLAlchemy Configuration
# Previous PostgreSQL URI, kept for reference:
# SQLALCHEMY_DATABASE_URI = (
#     f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
#     f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
# )
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Table Configuration
@@ -80,7 +92,7 @@ OUTPUT_PATHS = {
}
# Simbrella integration endpoints; each can be overridden per environment.
SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337")
SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS", "api/rac-check")

# Salary Detect Endpoint Config
# Overridable through the environment like the settings above; the default
# is the previously hard-coded URL.
SALARY_DETECT_URL = os.getenv(
    "SALARY_DETECT_URL",
    "http://www.simbrellang.net:5000/autocall/analytic-salary-detect",
)
+4
View File
@@ -0,0 +1,4 @@
from .raw_transaction import RawTransaction
__all__ = ['RawTransaction']
+18
View File
@@ -0,0 +1,18 @@
from app.extensions import db
class Account(db.Model):
    """ORM mapping for rows of the ``accounts`` table."""

    __tablename__ = "accounts"

    # NOTE(review): the primary key is ``customerid``, not ``accountid`` --
    # confirm that a customer can never own more than one account row.
    customerid = db.Column(db.String(50), primary_key=True)
    accountid = db.Column(db.String(11), nullable=False)
    registrationdate = db.Column(db.DateTime)
    accountcurrencycode = db.Column(db.String(3))
    schemecode = db.Column(db.String(5))
    lastinflowtransactiondate = db.Column(db.DateTime)
    accountstatus = db.Column(db.String(50))
    accountstatusdate = db.Column(db.DateTime)

    def __repr__(self):
        """Return a short debug representation showing the account id."""
        return f"<Account {self.accountid}>"
+97
View File
@@ -0,0 +1,97 @@
from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean, func
from sqlalchemy.orm import declarative_base, Session
from datetime import datetime
from app.utils.logger import logger
from app.extensions import db
class BatchResult(db.Model):
    """ORM model for one result row produced by a salary-analytics batch run."""

    __tablename__ = "salary_analytics_batch_results"

    id = Column(Integer, primary_key=True, autoincrement=True)
    batch_number = Column(Integer, nullable=False)
    total_batches = Column(Integer, nullable=False)
    processed_at = Column(DateTime, default=datetime.utcnow)

    # Explicit lengths: Oracle cannot emit a VARCHAR2 column without one.
    accountid = Column(String(64), nullable=False)
    num_months = Column(Integer)
    least_inflow_6m = Column(Numeric)
    avg_monthly_salary = Column(Numeric)
    estimated_next_amount = Column(Numeric)
    estimated_next_date = Column(DateTime)
    is_45day_salary = Column(Boolean, default=False)
    is_2months_salary = Column(Boolean, default=False)
    status = Column(String(20), default="success")

    @classmethod
    def save_batch(cls, batch_number, total_batches, results_df, status="success"):
        """Save batch results into DB using ORM bulk insert.

        Args:
            batch_number: index of the batch being stored.
            total_batches: total number of batches in the run.
            results_df: DataFrame of per-account results; it is not modified.
            status: status value written on every row.

        Returns:
            bool: True when the rows were committed, False on any error
            (the session is rolled back and the error is logged).
        """
        try:
            # Work on a copy so the caller's DataFrame is left untouched.
            frame = results_df.copy()
            frame["batch_number"] = batch_number
            frame["total_batches"] = total_batches
            frame["processed_at"] = datetime.utcnow()
            frame["status"] = status

            # Normalize boolean columns
            frame["is_45day_salary"] = frame.get("45daysalary", False)
            frame["is_2months_salary"] = frame.get("2monthssalary", False)

            # Pass only mapped columns to the constructor: source columns
            # such as "45daysalary" are not model attributes and would make
            # the declarative constructor raise TypeError.
            mapped = {column.key for column in cls.__table__.columns}
            records = [
                cls(**{key: value for key, value in row.items() if key in mapped})
                for row in frame.to_dict("records")
            ]

            db.session.bulk_save_objects(records)
            db.session.commit()
            logger.info(f"Saved batch {batch_number} successfully.")
            return True
        except Exception as e:
            db.session.rollback()
            logger.error(f"Error saving batch {batch_number}: {str(e)}")
            return False

    @classmethod
    def _summary_columns(cls):
        """Return the select list shared by the batch summary queries."""
        # Local import: ``case`` is not among this module's top-level imports.
        # ``func.case(...)`` would render a plain SQL function call and
        # rejects ``else_``; the CASE expression needs this construct.
        from sqlalchemy import case

        return (
            cls.batch_number,
            cls.total_batches,
            cls.processed_at,
            func.count().label("total_records"),
            func.sum(case((cls.status == "success", 1), else_=0)).label("successful_records"),
            func.sum(case((cls.status == "error", 1), else_=0)).label("failed_records"),
        )

    @classmethod
    def get_batch_status(cls, batch_number: int):
        """Return summary info about one batch.

        Returns:
            dict | None: the most recently processed summary row for
            *batch_number*, or None when there is none or the query fails.
        """
        try:
            result = (
                db.session.query(*cls._summary_columns())
                .filter(cls.batch_number == batch_number)
                .group_by(cls.batch_number, cls.total_batches, cls.processed_at)
                .order_by(cls.processed_at.desc())
                .first()
            )
            return dict(result._mapping) if result else None
        except Exception as e:
            logger.error(f"Error fetching batch {batch_number} status: {str(e)}")
            return None

    @classmethod
    def get_all_batches(cls):
        """Return summaries for all batches.

        Returns:
            list[dict]: one summary per (batch_number, total_batches,
            processed_at) group ordered by batch number; empty on error.
        """
        try:
            results = (
                db.session.query(*cls._summary_columns())
                .group_by(cls.batch_number, cls.total_batches, cls.processed_at)
                .order_by(cls.batch_number)
                .all()
            )
            return [dict(r._mapping) for r in results]
        except Exception as e:
            logger.error(f"Error fetching all batches: {str(e)}")
            return []
@@ -0,0 +1,100 @@
from venv import logger
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from app.extensions import db
import pandas as pd
class CustomerAccountTransactionHx(db.Model):
    """ORM model for rows of ``customer_account_transaction_hx``."""

    __tablename__ = "customer_account_transaction_hx"

    id = Column(Integer, primary_key=True, autoincrement=True)
    accountid = Column(String(64), nullable=False, index=True)
    trx_type = Column(String(50), nullable=False)
    amount = Column(Float, nullable=False)
    description = Column(String(255))
    customer_id = Column(String(64))
    trx_start_date = Column(DateTime, nullable=False)
    trx_end_date = Column(DateTime)
    # Integer flags, defaulting to 0.
    is_salary_related = Column(Integer, default=0)
    is_consistent_amount = Column(Integer, default=0)
    is_salary_type = Column(Integer, default=0)

    @classmethod
    def get_all(cls):
        """Fetch all transactions."""
        return db.session.query(cls).all()

    @classmethod
    def get_rows_count(cls):
        """Return total number of transaction rows, or None if the query fails."""
        try:
            return db.session.query(db.func.count(cls.id)).scalar()
        except Exception as e:
            logger.error(f"Error getting row count: {str(e)}")
            return None

    @classmethod
    def get_by_account(cls, accountid: str):
        """Fetch transactions for a given account."""
        return db.session.query(cls).filter_by(accountid=accountid).all()

    @classmethod
    def get_accounts(cls, limit=None):
        """Fetch distinct account IDs, optionally capped at *limit* rows."""
        query = db.session.query(cls.accountid).distinct()
        if limit:
            query = query.limit(limit)
        return [row.accountid for row in query.all()]

    @classmethod
    def insert_transaction(cls, **kwargs):
        """Insert a new transaction.

        Returns:
            The persisted instance, or None when the insert fails.
        """
        trx = cls(**kwargs)
        try:
            db.session.add(trx)
            db.session.commit()
        except Exception as e:
            # Roll back so the shared session stays usable after a failure.
            db.session.rollback()
            logger.error(f"Error inserting transaction: {str(e)}")
            return None
        return trx

    @classmethod
    def bulk_insert(cls, transactions: list[dict]):
        """Insert multiple transactions at once.

        Returns:
            The list of created instances, or None when the insert fails.
        """
        objs = [cls(**trx) for trx in transactions]
        try:
            db.session.bulk_save_objects(objs)
            db.session.commit()
        except Exception as e:
            # Roll back so the shared session stays usable after a failure.
            db.session.rollback()
            logger.error(f"Error in bulk insert: {str(e)}")
            return None
        return objs

    @classmethod
    def get_transactions_df(cls, accountids: list[str] = None):
        """Return a Pandas DataFrame for ML model preparation.

        Args:
            accountids: optional account IDs to restrict the query to; when
                empty or None every transaction is returned.

        Returns:
            pandas.DataFrame: one row per transaction. The columns are always
            present, even when no rows match.
        """
        columns = (
            "id",
            "accountid",
            "trx_type",
            "amount",
            "description",
            "customer_id",
            "trx_start_date",
            "trx_end_date",
            "is_salary_related",
            "is_consistent_amount",
            "is_salary_type",
        )
        query = db.session.query(cls)
        if accountids:
            query = query.filter(cls.accountid.in_(accountids))

        records = [
            {name: getattr(trx, name) for name in columns}
            for trx in query.all()
        ]
        # Explicit columns keep the schema stable for an empty result set.
        return pd.DataFrame(records, columns=list(columns))
@@ -2,12 +2,10 @@
Database operations module for salary analytics.
"""
import logging
from sqlalchemy import text
from .config import BATCH_RESULTS_TABLE
from app.config import BATCH_RESULTS_TABLE
from datetime import datetime
logger = logging.getLogger(__name__)
from app.utils.logger import logger
class DatabaseOperations:
def __init__(self, engine):
@@ -1,4 +1,4 @@
from .extensions import db
from app.extensions import db
class RawTransaction(db.Model):
__tablename__ = 'analytics_raw_transactions'
+36
View File
@@ -0,0 +1,36 @@
from app.extensions import db
class SimbrellaCustomer(db.Model):
    """ORM mapping for the ``simbrella_customers`` table, keyed by ``customerid``."""

    __tablename__ = "simbrella_customers"

    customerid = db.Column(db.String(500), primary_key=True)
    acct_opn_date = db.Column(db.DateTime)
    gender = db.Column(db.String(20))
    birth_date = db.Column(db.DateTime)
    msisdn = db.Column(db.String(100))

    # Each validation flag is stored as a string next to its timestamp.
    isbvnvalid = db.Column(db.String(100))
    datebvnvalidated = db.Column(db.DateTime)
    iscrmsvalid = db.Column(db.String(100))
    datecrmsvalidated = db.Column(db.DateTime)
    iscrcvalid = db.Column(db.String(100))
    datecrcvalidated = db.Column(db.DateTime)

    def __repr__(self):
        """Return a short debug representation showing the customer id."""
        return f"<SimbrellaCustomer {self.customerid}>"

    def to_dict(self):
        """Return the column values as a plain dict keyed by column name."""
        field_names = (
            "customerid",
            "acct_opn_date",
            "gender",
            "birth_date",
            "msisdn",
            "isbvnvalid",
            "datebvnvalidated",
            "iscrmsvalid",
            "datecrmsvalidated",
            "iscrcvalid",
            "datecrcvalidated",
        )
        return {name: getattr(self, name) for name in field_names}
+29
View File
@@ -0,0 +1,29 @@
from app.extensions import db
class TempSimbrellaLien(db.Model):
    """ORM mapping for the ``tmp_simbrella_lien`` table, keyed by ``lienid``."""

    __tablename__ = "tmp_simbrella_lien"

    lienid = db.Column(db.String(33), primary_key=True)
    customerid = db.Column(db.String(500))
    accountid = db.Column(db.String(11))
    lien_start_date = db.Column(db.DateTime)
    action = db.Column(db.String(5))
    restriction_nature = db.Column(db.String(150))
    claim_amount = db.Column(db.Numeric(20, 4))

    def __repr__(self):
        """Return a short debug representation showing the lien id."""
        return f"<TempSimbrellaLien {self.lienid}>"

    def to_dict(self):
        """Return the column values as a dict; ``claim_amount`` becomes a float."""
        amount = self.claim_amount
        return {
            "lienid": self.lienid,
            "customerid": self.customerid,
            "accountid": self.accountid,
            "lien_start_date": self.lien_start_date,
            "action": self.action,
            "restriction_nature": self.restriction_nature,
            # A missing amount stays None instead of being coerced.
            "claim_amount": None if amount is None else float(amount),
        }
+45
View File
@@ -0,0 +1,45 @@
from app.extensions import db
class Transaction(db.Model):
    """ORM mapping for the ``transactions`` table, keyed by ``tran_id``."""

    __tablename__ = "transactions"

    tran_id = db.Column(db.String(50), primary_key=True)
    cif_id = db.Column(db.String(500))
    foracid = db.Column(db.String(150))
    acid = db.Column(db.String(150))
    tran_date = db.Column(db.DateTime)
    value_date = db.Column(db.DateTime)
    pstd_date = db.Column(db.DateTime)
    tran_sub_type = db.Column(db.String(50))
    part_tran_type = db.Column(db.String(50))
    tran_crncy_code = db.Column(db.String(50))
    tran_amt = db.Column(db.Numeric(38, 0))
    tran_particular = db.Column(db.String(250))
    origination_channel = db.Column(db.String(150))
    reversal_tran_id = db.Column(db.String(50))
    isreversal = db.Column(db.String(50))

    def __repr__(self):
        """Return a short debug representation showing the transaction id."""
        return f"<Transaction {self.tran_id}>"

    def to_dict(self):
        """Return the column values as a dict; ``tran_amt`` becomes a float."""
        field_names = (
            "tran_id",
            "cif_id",
            "foracid",
            "acid",
            "tran_date",
            "value_date",
            "pstd_date",
            "tran_sub_type",
            "part_tran_type",
            "tran_crncy_code",
            "tran_amt",
            "tran_particular",
            "origination_channel",
            "reversal_tran_id",
            "isreversal",
        )
        payload = {name: getattr(self, name) for name in field_names}
        # Convert in place so the key keeps its position; None stays None.
        if payload["tran_amt"] is not None:
            payload["tran_amt"] = float(payload["tran_amt"])
        return payload
+45
View File
@@ -0,0 +1,45 @@
from app.extensions import db
class TransactionStg(db.Model):
    """ORM mapping for the ``transaction_stg`` staging table, keyed by ``tran_id``."""

    __tablename__ = "transaction_stg"

    tran_id = db.Column(db.String(50), primary_key=True)
    cif_id = db.Column(db.String(500))
    foracid = db.Column(db.String(150))
    acid = db.Column(db.String(150))
    # staging table keeps the three dates as strings
    tran_date = db.Column(db.String(50))
    value_date = db.Column(db.String(50))
    pstd_date = db.Column(db.String(50))
    tran_sub_type = db.Column(db.String(50))
    part_tran_type = db.Column(db.String(50))
    tran_crncy_code = db.Column(db.String(50))
    tran_amt = db.Column(db.Numeric(38, 0))
    tran_particular = db.Column(db.String(250))
    origination_channel = db.Column(db.String(150))
    reversal_tran_id = db.Column(db.String(50))
    isreversal = db.Column(db.String(50))

    def __repr__(self):
        """Return a short debug representation showing the transaction id."""
        return f"<TransactionStg {self.tran_id}>"

    def to_dict(self):
        """Return the column values as a dict; ``tran_amt`` becomes a float."""
        field_names = (
            "tran_id",
            "cif_id",
            "foracid",
            "acid",
            "tran_date",
            "value_date",
            "pstd_date",
            "tran_sub_type",
            "part_tran_type",
            "tran_crncy_code",
            "tran_amt",
            "tran_particular",
            "origination_channel",
            "reversal_tran_id",
            "isreversal",
        )
        payload = {name: getattr(self, name) for name in field_names}
        # Convert in place so the key keeps its position; None stays None.
        if payload["tran_amt"] is not None:
            payload["tran_amt"] = float(payload["tran_amt"])
        return payload
+38
View File
@@ -0,0 +1,38 @@
from fastapi import FastAPI
from app.salary_analytics.routes import analysis, reports, pipeline, load, base, train
from app.salary_analytics.middlewares.middleware import add_middlewares
from app.salary_analytics.events.lifecycle import register_events
from app.utils.logger import logger
import socket
"""
Salary Analytics Package
A package for analyzing and predicting salary patterns from transaction data.
"""
__version__ = "0.1.0"
def create_app() -> FastAPI:
    """Build the Salary Analytics FastAPI application.

    Installs the middlewares, registers the startup/shutdown events and
    mounts every router, then returns the configured app.
    """
    api = FastAPI(
        title="Salary Analytics API",
        description="API for analyzing and predicting salary patterns from transaction data",
        version="1.0.0"
    )

    # Middlewares first, then lifecycle events.
    add_middlewares(api)
    register_events(api)

    # Routers, mounted in this order with their include options.
    mounts = (
        (base.router, {"tags": ["Base"]}),
        (analysis.router, {"prefix": "/analyze", "tags": ["Analysis"]}),
        (reports.router, {"tags": ["Reports"]}),
        (pipeline.router, {"tags": ["Pipeline"]}),
        (load.router, {"tags": ["Data"]}),
        (train.router, {"tags": ["Model Training"]}),
    )
    for router, options in mounts:
        api.include_router(router, **options)

    return api
+64
View File
@@ -0,0 +1,64 @@
from app.salary_analytics.services.main import SalaryAnalyticsPipeline
from app.salary_analytics.services.data_loader import DataLoader
class GlobalState:
    """Shared holder for the pipeline, data loader and analysis objects.

    ``pipeline`` and ``data_loader`` are created on first read unless a value
    was assigned beforehand. ``df``, ``salary_predictor`` and
    ``salary_earner_analyzer`` are plain attributes that start as None.
    """

    def __init__(self):
        self._pipeline = None
        self._data_loader = None
        # Plain attributes: nothing is computed or validated on access, so
        # pass-through properties would add no behavior.
        self.df = None
        self.salary_predictor = None
        self.salary_earner_analyzer = None

    # ---- Pipeline ----
    @property
    def pipeline(self):
        """Return the pipeline, constructing it on first access."""
        if self._pipeline is None:
            self._pipeline = SalaryAnalyticsPipeline()
        return self._pipeline

    @pipeline.setter
    def pipeline(self, value):
        self._pipeline = value

    # ---- Data Loader ----
    @property
    def data_loader(self):
        """Return the data loader, constructing it on first access."""
        if self._data_loader is None:
            self._data_loader = DataLoader()
        return self._data_loader

    @data_loader.setter
    def data_loader(self, value):
        self._data_loader = value


# Module-level instance imported by the routes and helpers.
state = GlobalState()
+36
View File
@@ -0,0 +1,36 @@
import socket
from fastapi import FastAPI
from app.salary_analytics.integrations.salary_detect import SalaryDetect
from app.utils.logger import logger
salary_detect = SalaryDetect()
def register_events(app: FastAPI):
    """Attach the startup and shutdown handlers to *app*.

    Startup starts the salary detection loop and logs where the server can
    be reached; shutdown only logs a message.
    """

    def _log_network_info():
        """Log hostname, IP and URLs; a lookup failure is logged, not raised."""
        try:
            hostname = socket.gethostname()
            ip_address = socket.gethostbyname(hostname)
        except OSError as e:
            # The lookup only feeds informational log lines, so a host that
            # cannot resolve its own name must not abort startup.
            logger.warning(f"Could not determine server network info: {str(e)}")
            return
        logger.info(f"Server running on hostname: {hostname}")
        logger.info(f"Server IP address: {ip_address}")
        logger.info("Server is accessible at:")
        logger.info("- http://localhost:8000")
        logger.info("- http://127.0.0.1:8000")
        logger.info(f"- http://{ip_address}:8000")

    @app.on_event("startup")
    async def startup_event():
        """Initialize the pipeline on startup."""
        try:
            logger.info("Initializing pipeline...")

            # Start autonomous salary detection loop
            salary_detect.start()
            logger.info("Started autonomous salary detection loop.")

            # Print network information
            _log_network_info()

            logger.info("Pipeline initialized successfully")
        except Exception as e:
            logger.error(f"Error during startup: {str(e)}")
            raise

    @app.on_event("shutdown")
    async def shutdown_event():
        """Log that the API is shutting down."""
        logger.info("Shutting down Salary Analytics API...")
@@ -0,0 +1,12 @@
from fastapi import HTTPException
from app.salary_analytics.core.state import state
def check_data_loaded():
    """Return True when the pipeline holds data; otherwise raise HTTP 400.

    Raises:
        HTTPException: status 400 when ``state.pipeline.df`` is None.
    """
    if state.pipeline.df is not None:
        return True

    raise HTTPException(
        status_code=400,
        detail="No data loaded. Please load data first using the /load-data endpoint."
    )
@@ -0,0 +1,17 @@
from typing import Optional, Dict, List, Union
from pydantic import BaseModel
class AnalysisResponse(BaseModel):
    """Payload returned by the analysis endpoints."""

    # Outcome message; the only required field.
    message: str
    # Optional result details.
    data: Optional[Dict] = None
    # Optional path to a file.
    # NOTE(review): presumably an output file written by the analysis -- confirm.
    file_path: Optional[str] = None
class BatchResponse(BaseModel):
    """Payload describing one processed batch; every field is required."""

    # Position of this batch and the total number of batches.
    batch_number: int
    total_batches: int
    # Row count handled in this batch.
    processed_rows: int
    # Location of the batch results.
    results_path: str
    message: str
@@ -1,10 +1,8 @@
from django.conf import settings
import httpx
import json
from salary_analytics.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS
import logging
logger = logging.getLogger(__name__)
from app.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS
from app.utils.logger import logger
class SimbrellaIntegration:
BASE_URL = SIMBRELLA_BASE_URL
@@ -1,11 +1,8 @@
import time
import logging
import threading
import requests
from .config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
from app.config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload
from app.utils.logger import logger
class SalaryDetect:
def __init__(self):
@@ -0,0 +1,11 @@
from fastapi.middleware.cors import CORSMiddleware
from fastapi import FastAPI
def add_middlewares(app: FastAPI, allow_origins=None):
    """Install the CORS middleware on *app*.

    Args:
        app: the FastAPI application to configure.
        allow_origins: optional iterable of allowed origins. When omitted,
            every origin is allowed ("*"), as before.
    """
    origins = ["*"] if allow_origins is None else list(allow_origins)

    # NOTE(review): wildcard origins together with allow_credentials=True is
    # very permissive -- pass an explicit origin list in production.
    app.add_middleware(
        CORSMiddleware,
        allow_origins=origins,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
+75
View File
@@ -0,0 +1,75 @@
from fastapi import APIRouter, HTTPException
from app.salary_analytics.helpers.response_helpers import AnalysisResponse
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.utils.logger import logger
import time
from app.salary_analytics.core.state import state
router = APIRouter()
@router.post("/keyword", response_model=AnalysisResponse)
async def analyze_keyword():
    """Run keyword-based salary transaction analysis.

    Returns:
        AnalysisResponse: success message plus the number of matches.

    Raises:
        HTTPException: 400 when no data is loaded; 500 for any other failure.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting keyword analysis...")
        data = state.pipeline.run_keyword_analysis()
        logger.info(f"Keyword analysis completed. Found {len(data)} matches")
        response = AnalysisResponse(
            message="Keyword analysis completed successfully",
            data={"count": len(data)}
        )
        logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Re-raise unchanged so the 400 from check_data_loaded() is not
        # rewrapped as a 500 by the generic handler below.
        logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in keyword analysis: {str(e)}")
        logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount():
    """Run consistent amount transaction analysis.

    Returns:
        AnalysisResponse: success message plus the number of matches.

    Raises:
        HTTPException: 400 when no data is loaded; 500 for any other failure.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting consistent amount analysis...")
        data = state.pipeline.run_consistent_amount_analysis()
        logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
        response = AnalysisResponse(
            message="Consistent amount analysis completed successfully",
            data={"count": len(data)}
        )
        logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Re-raise unchanged so the 400 from check_data_loaded() is not
        # rewrapped as a 500 by the generic handler below.
        logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in consistent amount analysis: {str(e)}")
        logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type():
    """Run transaction type analysis.

    Returns:
        AnalysisResponse: success message plus the number of matches.

    Raises:
        HTTPException: 400 when no data is loaded; 500 for any other failure.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting transaction type analysis...")
        data = state.pipeline.run_transaction_type_analysis()
        logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
        response = AnalysisResponse(
            message="Transaction type analysis completed successfully",
            data={"count": len(data)}
        )
        logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Re-raise unchanged so the 400 from check_data_loaded() is not
        # rewrapped as a 500 by the generic handler below.
        logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in transaction type analysis: {str(e)}")
        logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e))
+28
View File
@@ -0,0 +1,28 @@
from fastapi import APIRouter
from app.utils.logger import logger
import time
router = APIRouter()
@router.get("/")
async def root():
    """Serve the welcome message and log how long the request took."""
    start_time = time.time()
    logger.info("Root endpoint accessed")

    welcome = {"message": "Welcome to Salary Analytics API"}

    logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
    return welcome
@router.get("/health")
async def health_check():
    """Report a static healthy status and log how long the request took."""
    start_time = time.time()
    logger.info("Health check endpoint accessed")

    health = {"status": "healthy"}

    logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
    return health
+74
View File
@@ -0,0 +1,74 @@
from fastapi import APIRouter, HTTPException, UploadFile, File
from app.salary_analytics.core.state import state
from app.utils.logger import logger
import tempfile, os, time
from typing import Optional
router = APIRouter()
@router.post("/load-data")
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
    """
    Load data from either database or CSV file.

    Args:
        source (str): Source of data ('db' or 'csv')
        file (UploadFile, optional): CSV file to load (required if source is 'csv')

    Returns:
        dict: Status of data loading, with the loaded columns and row count

    Raises:
        HTTPException: 400 for an invalid source or a missing CSV file;
            500 when loading fails or an unexpected error occurs.
    """
    start_time = time.time()
    try:
        if source not in ['db', 'csv']:
            logger.error(f"Invalid source: {source}")
            logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")

        if source == 'csv' and not file:
            logger.error("No file provided for CSV source")
            logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        if source == 'csv':
            # Save uploaded file temporarily
            with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
                content = await file.read()
                temp_file.write(content)
                temp_file_path = temp_file.name

            try:
                success = state.pipeline.load_data(source='csv', file_path=temp_file_path)
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        else:
            success = state.pipeline.load_data(source='db')

        if not success:
            logger.error("Failed to load data")
            logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=500, detail="Failed to load data")

        response = {
            "status": "success",
            "message": f"Successfully loaded {len(state.pipeline.df)} rows of data",
            "columns": state.pipeline.df.columns.tolist(),
            "row_count": len(state.pipeline.df)
        }
        logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Already logged where raised; re-raise unchanged so the 400s above
        # are not rewrapped as 500s by the generic handler below.
        raise
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e))
@router.post("/load-data-with-file")
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
    """Return the uploaded file, rejecting a CSV request that carries none.

    NOTE(review): this reads like a dependency helper but is registered as a
    POST route that returns the upload object -- confirm which is intended.

    Raises:
        HTTPException: 400 when ``source`` is 'csv' and no file was sent.
    """
    csv_without_file = source == 'csv' and not file
    if csv_without_file:
        raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
    return file
+292
View File
@@ -0,0 +1,292 @@
from fastapi import APIRouter, HTTPException
from app.salary_analytics.services.main import SalaryAnalyticsPipeline
from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.salary_analytics.services.data_loader import DataLoader
from app.salary_analytics.core.state import state
from app.models.db_operations import DatabaseOperations
from app.config import OUTPUT_PATHS, TABLE_NAME
from app.utils.logger import logger
from typing import Optional, List, Union
from sqlalchemy import text
from datetime import datetime
import pandas as pd, os, tempfile, time
from typing import Optional, Union
from fastapi import UploadFile, File
router = APIRouter()
@router.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline():
    """Run the complete salary analytics pipeline.

    Returns:
        AnalysisResponse: success message when the pipeline completes.

    Raises:
        HTTPException: 400 when no data is loaded; 500 when the pipeline
            reports failure or an unexpected error occurs.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting full pipeline...")
        success = state.pipeline.run_full_pipeline()
        if not success:
            logger.error("Pipeline failed")
            logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=500, detail="Pipeline failed")

        logger.info("Pipeline completed successfully")
        response = AnalysisResponse(
            message="Pipeline completed successfully"
        )
        logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Re-raise unchanged so the 400 from check_data_loaded() keeps its
        # status and the "Pipeline failed" detail is not wrapped twice.
        raise
    except Exception as e:
        logger.error(f"Error in pipeline: {str(e)}")
        logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e))
# Sentinel stored as ``total_batches`` when the batch count is not known up
# front (CSV uploads are streamed, so their length is never counted).
_UNKNOWN_TOTAL_BATCHES = -1


def _preprocess_chunk(chunk):
    """Parse the transaction dates, rename d1-d4 and drop rows with missing values."""
    chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
    chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
    chunk = chunk.rename(columns={
        'd1': 'trx_type',
        'd2': 'trx_subtype',
        'd3': 'initiated_by',
        'd4': 'customer_id'
    })
    return chunk.dropna()


def _process_batch(chunk, batch_number, total_batches, batch_output_dir, db_ops):
    """Analyse one preprocessed batch, persist its results and describe the outcome.

    A failure inside the batch is recorded (status "error") and reported in the
    returned BatchResponse instead of aborting the remaining batches.
    """
    if total_batches == _UNKNOWN_TOTAL_BATCHES:
        progress = f"{batch_number}"
    else:
        progress = f"{batch_number} of {total_batches}"

    # Every batch gets a fresh pipeline so no analysis results carry over from
    # the previous batch (or from an earlier request).
    state.pipeline = SalaryAnalyticsPipeline()
    state.pipeline.df = chunk
    try:
        batch_start_time = time.time()
        # Run analyses
        state.pipeline.run_keyword_analysis()
        state.pipeline.run_consistent_amount_analysis()
        state.pipeline.run_transaction_type_analysis()
        # Generate reports
        reports = state.pipeline.generate_salary_earner_reports()
        # Add batch metadata to results
        results_df = reports['final_table'].copy()
        results_df['batch_number'] = batch_number
        results_df['total_batches'] = total_batches
        results_df['processed_at'] = datetime.now()
        # Save batch results to CSV
        batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
        results_df.to_csv(batch_results_path, index=False)
        # Save to database
        db_ops.save_batch_to_db(
            batch_number=batch_number,
            total_batches=total_batches,
            results_df=results_df,
            status="success"
        )
        logger.info(f"Batch {progress} processed in {time.time() - batch_start_time:.2f} seconds")
        return BatchResponse(
            batch_number=batch_number,
            total_batches=total_batches,
            processed_rows=len(chunk),
            results_path=batch_results_path,
            message=f"Successfully processed batch {progress}"
        )
    except Exception as e:
        error_message = str(e)
        logger.error(f"Error processing batch {batch_number}: {error_message}")
        # Save error to database
        db_ops.save_batch_to_db(
            batch_number=batch_number,
            total_batches=total_batches,
            results_df=pd.DataFrame(),  # Empty DataFrame for error case
            status="error"
        )
        return BatchResponse(
            batch_number=batch_number,
            total_batches=total_batches,
            processed_rows=len(chunk),
            results_path="",
            message=f"Error processing batch {batch_number}: {error_message}"
        )


@router.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(
    source: str = "db",
    batch_size: int = 10000,
    file: Optional[Union[UploadFile, str]] = File(None)
):
    """
    Run the complete salary analytics pipeline in batches.

    Args:
        source (str): Source of data ('db' or 'csv')
        batch_size (int): Number of rows to process in each batch (must be >= 1)
        file (UploadFile, optional): CSV file to load (required if source is 'csv')

    Returns:
        List[BatchResponse]: List of responses for each batch processed

    Raises:
        HTTPException: 400 for an invalid source, a missing CSV file or a
            non-positive batch size; 500 when the database connection or the
            batch results table cannot be set up, or on any unexpected error.
    """
    start_time = time.time()
    try:
        if source not in ['db', 'csv']:
            logger.error(f"Invalid source: {source}")
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
        if source == 'csv' and not file:
            logger.error("No file provided for CSV source")
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
        if batch_size < 1:
            # A zero or negative size would divide by zero when counting
            # batches or keep the offset loop from ever advancing.
            logger.error(f"Invalid batch size: {batch_size}")
            raise HTTPException(status_code=400, detail="Batch size must be a positive integer")

        # Initialize data loader
        state.data_loader = DataLoader()
        state.data_loader.chunk_size = batch_size

        # Create output directory for batch results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
        os.makedirs(batch_output_dir, exist_ok=True)

        # Initialize database operations (results are stored in the database
        # for both sources, so the connection is needed even for CSV input).
        if not state.data_loader.connect():
            logger.error("Failed to connect to database")
            raise HTTPException(status_code=500, detail="Failed to connect to database")
        db_ops = DatabaseOperations(state.data_loader.engine)
        if not db_ops.create_batch_results_table():
            logger.error("Failed to create batch results table")
            raise HTTPException(status_code=500, detail="Failed to create batch results table")

        responses = []
        batch_number = 0

        if source == 'csv':
            # Spool the upload to disk so pandas can stream it in chunks. The
            # path is captured before anything can fail so the file is always
            # removed, even when reading the upload raises.
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
            temp_file_path = temp_file.name
            try:
                with temp_file:
                    temp_file.write(await file.read())
                # Process CSV in chunks
                for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
                    batch_number += 1
                    logger.info(f"Processing batch {batch_number}")
                    responses.append(_process_batch(
                        _preprocess_chunk(chunk),
                        batch_number,
                        _UNKNOWN_TOTAL_BATCHES,
                        batch_output_dir,
                        db_ops,
                    ))
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        else:
            # Process database in chunks; the connection made above is reused.
            with state.data_loader.engine.connect() as conn:
                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total_rows = conn.execute(count_query).scalar()
            total_batches = (total_rows + batch_size - 1) // batch_size
            offset = 0
            while offset < total_rows:
                batch_number += 1
                logger.info(f"Processing batch {batch_number} of {total_batches}")
                # NOTE(review): paging with LIMIT/OFFSET and no ORDER BY does not
                # guarantee a stable row order between queries, so batches may
                # overlap or skip rows - confirm whether an ordering column exists.
                query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
                chunk = pd.read_sql(query, state.data_loader.engine)
                if chunk.empty:
                    break
                responses.append(_process_batch(
                    _preprocess_chunk(chunk),
                    batch_number,
                    total_batches,
                    batch_output_dir,
                    db_ops,
                ))
                offset += batch_size

        logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
        return responses
    except HTTPException:
        # Keep the intended status code (e.g. 400) instead of letting the
        # generic handler below turn it into a 500.
        logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in streaming pipeline: {str(e)}")
        logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
+78
View File
@@ -0,0 +1,78 @@
from fastapi import APIRouter, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse
from app.salary_analytics.helpers.response_helpers import AnalysisResponse
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.salary_analytics.core.state import state
from app.config import OUTPUT_PATHS
from app.utils.logger import logger
import os, time
router = APIRouter()
@router.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks):
    """Generate salary earner reports and return summary counts.

    Args:
        background_tasks: Accepted for interface compatibility; not used here.

    Returns:
        AnalysisResponse: Message plus the number of verified earners, likely
        earners and high earners taken from the generated reports.

    Raises:
        HTTPException: 500 on unexpected errors. An HTTPException raised
            inside the handler (including any raised by ``check_data_loaded``)
            is passed through with its original status code and detail.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting report generation...")
        reports = state.pipeline.generate_salary_earner_reports()
        logger.info("Reports generated successfully")
        response = AnalysisResponse(
            message="Reports generated successfully",
            data={
                "verified_salary_earners": len(reports['final_table']),
                "likely_salary_earners": len(reports['likely_salary_earner']),
                "high_earners": reports['total_high_earners']
            }
        )
        logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Do not let the generic handler below rewrite a deliberate HTTP error as a 500.
        logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in report generation: {str(e)}")
        logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@router.get("/download/{report_type}")
async def download_report(report_type: str):
    """Download a previously generated report file.

    Args:
        report_type: One of "high_earners", "likely_earners", "final_table",
            "consistent_plot", "inconsistent_plot" or "hypothesis_plot".

    Returns:
        FileResponse: The report file, served as ``application/octet-stream``.

    Raises:
        HTTPException: 404 when the report type is unknown or its file does
            not exist; 500 on unexpected errors. An HTTPException raised by
            ``check_data_loaded`` is passed through unchanged.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info(f"Attempting to download report: {report_type}")
        file_paths = {
            "high_earners": OUTPUT_PATHS["high_earner_details"],
            "likely_earners": OUTPUT_PATHS["likely_salary_earner"],
            "final_table": OUTPUT_PATHS["final_table"],
            "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
            "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
            "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
        }
        if report_type not in file_paths:
            logger.error(f"Report type not found: {report_type}")
            raise HTTPException(status_code=404, detail="Report type not found")
        file_path = file_paths[report_type]
        if not os.path.exists(file_path):
            logger.error(f"Report file not found: {file_path}")
            raise HTTPException(status_code=404, detail="Report file not found")
        logger.info(f"Successfully found report file: {file_path}")
        response = FileResponse(
            path=file_path,
            filename=os.path.basename(file_path),
            media_type="application/octet-stream"
        )
        logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # The 404s raised above must reach the client as 404; the generic
        # handler below would otherwise turn them into a 500.
        logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error downloading report: {str(e)}")
        logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
+33
View File
@@ -0,0 +1,33 @@
import time
import logging
from fastapi import APIRouter, HTTPException
from app.salary_analytics.services.main import SalaryAnalyticsPipeline
from app.salary_analytics.helpers.data_checks import check_data_loaded
from app.salary_analytics.helpers.response_helpers import AnalysisResponse
from app.salary_analytics.core.state import state
from app.utils.logger import logger
router = APIRouter()
@router.post("/train/models", response_model=AnalysisResponse)
async def train_models():
    """Train the salary prediction models on the data held in ``state``.

    Returns:
        AnalysisResponse: Success message once training has finished.

    Raises:
        HTTPException: 500 on unexpected errors. An HTTPException raised
            inside the handler (including any raised by ``check_data_loaded``)
            is passed through with its original status code and detail.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting model training...")
        state.pipeline.train_salary_prediction_models()
        logger.info("Models trained successfully")
        response = AnalysisResponse(
            message="Models trained successfully"
        )
        logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Do not let the generic handler below rewrite a deliberate HTTP error as a 500.
        logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in model training: {str(e)}")
        logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
+24
View File
@@ -0,0 +1,24 @@
"""
Salary Analytics Package

A package for analyzing and predicting salary patterns from transaction data.
"""
# The docstring must be the first statement in the file; placed after the
# imports it is only a discarded string expression and the package has no __doc__.
from .main import SalaryAnalyticsPipeline
from .data_loader import DataLoader
from .keyword_analyzer import KeywordAnalyzer
from .consistent_amount_analyzer import ConsistentAmountAnalyzer
from .transaction_type_analyzer import TransactionTypeAnalyzer
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .salary_predictor import SalaryPredictor

__version__ = "0.1.0"

# Names re-exported as the package's public API.
__all__ = [
    "SalaryAnalyticsPipeline",
    "DataLoader",
    "KeywordAnalyzer",
    "ConsistentAmountAnalyzer",
    "TransactionTypeAnalyzer",
    "SalaryEarnerAnalyzer",
    "SalaryPredictor",
]
@@ -3,7 +3,7 @@ Consistent amount transaction analysis module.
"""
import pandas as pd
from .config import MODEL_CONFIG
from app.config import MODEL_CONFIG
class ConsistentAmountAnalyzer:
def __init__(self, df):
@@ -7,9 +7,8 @@ import pandas as pd
from datetime import datetime
import logging
import os
from .config import DB_CONFIG, TABLE_NAME
logger = logging.getLogger(__name__)
from app.config import SQLALCHEMY_DATABASE_URI, TABLE_NAME
from app.utils.logger import logger
class DataLoader:
def __init__(self):
@@ -21,8 +20,7 @@ class DataLoader:
"""Establish database connection."""
try:
logger.info("Attempting to connect to database...")
DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
self.engine = create_engine(DATABASE_URL)
self.engine = create_engine(SQLALCHEMY_DATABASE_URI)
with self.engine.connect() as conn:
# First check if table exists
check_table = text(f"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = '{TABLE_NAME}')")
@@ -4,7 +4,7 @@ Keyword-based salary transaction analysis module.
import re
import pandas as pd
from .config import SALARY_KEYWORDS
from app.config import SALARY_KEYWORDS
class KeywordAnalyzer:
def __init__(self, df):
@@ -9,8 +9,7 @@ from .consistent_amount_analyzer import ConsistentAmountAnalyzer
from .transaction_type_analyzer import TransactionTypeAnalyzer
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .salary_predictor import SalaryPredictor
logger = logging.getLogger(__name__)
from app.utils.logger import logger
class SalaryAnalyticsPipeline:
def __init__(self):
@@ -6,15 +6,8 @@ import pandas as pd
import matplotlib.pyplot as plt
from matplotlib_venn import venn3
from datetime import datetime, timedelta
import logging
from .config import MODEL_CONFIG, OUTPUT_PATHS
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
from app.config import MODEL_CONFIG, OUTPUT_PATHS
from app.utils.logger import logger
class SalaryEarnerAnalyzer:
def __init__(self, df):
@@ -9,7 +9,7 @@ from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from joblib import dump
from .config import OUTPUT_PATHS
from app.config import OUTPUT_PATHS
class SalaryPredictor:
def __init__(self, df):
@@ -3,7 +3,7 @@ Transaction type analysis module.
"""
import pandas as pd
from .config import MODEL_CONFIG
from app.config import MODEL_CONFIG
class TransactionTypeAnalyzer:
def __init__(self, df):
+13
View File
@@ -0,0 +1,13 @@
import logging

# Root logging setup: INFO and above, appended to app.log in the working
# directory. Only the file handler is attached; no stream handler is installed.
_LOG_FORMAT = "%(asctime)s - %(levelname)s - %(message)s"
_file_handler = logging.FileHandler("app.log", mode='a')

logging.basicConfig(level=logging.INFO, format=_LOG_FORMAT, handlers=[_file_handler])

# Shared application logger imported by the other modules.
logger = logging.getLogger("DetectionService")
+8 -7
View File
@@ -1,16 +1,17 @@
services:
digifi-analytics:
build: .
env_file:
- .env
ports:
- "${APP_PORT:-4800}:8000"
volumes:
- ./output:/app/output
environment:
- DB_USER=salaryloan
- DB_PASSWORD=salaryloan
- DB_NAME=salaryloan
- DB_PORT=10532
- DB_HOST=dev-data.simbrellang.net
- FLASK_APP=${FLASK_APP}
- FLASK_ENV=${FLASK_ENV}
- DATABASE_URL=postgresql+psycopg2://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_NAME}
volumes:
- .:/app
- ./output:/app/output
restart: unless-stopped
networks:
- salary_network
+1 -1
View File
@@ -19,7 +19,7 @@ if config.config_file_name is not None:
# from myapp import Base
# target_metadata = Base.metadata
from flask import current_app
from salary_analytics.app.extensions import db
from app.extensions import db
config.set_main_option('sqlalchemy.url',
current_app.config.get('SQLALCHEMY_DATABASE_URI'))
+7 -1
View File
@@ -1,4 +1,8 @@
# Database and ORM
sqlalchemy>=2.0.0
oracledb>=1.0.0
pandas>=1.5.0
numpy>=1.21.0
matplotlib>=3.5.0
@@ -17,4 +21,6 @@ openpyxl>=3.0.10
Flask>=2.0.0
Flask-SQLAlchemy>=3.0.0
Flask-Migrate>=4.0.0
alembic>=1.8.0
alembic>=1.8.0
requests>=2.26.0
gunicorn
+1 -1
View File
@@ -1,4 +1,4 @@
import os
from salary_analytics.app import create_app
from app.salary_analytics import create_app
app = create_app()
-6
View File
@@ -1,6 +0,0 @@
"""
Salary Analytics Package
A package for analyzing and predicting salary patterns from transaction data.
"""
__version__ = "0.1.0"
-605
View File
@@ -1,605 +0,0 @@
"""
FastAPI application for salary analytics.
"""
from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends
from fastapi.responses import FileResponse
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
from typing import Optional, Dict, List, Union
import os
import socket
import logging
import pandas as pd
import tempfile
from datetime import datetime
from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData
import numpy as np
import warnings
import time
from .main import SalaryAnalyticsPipeline
from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE
from .data_loader import DataLoader
from .salary_predictor import SalaryPredictor
from .salary_earner_analyzer import SalaryEarnerAnalyzer
from .db_operations import DatabaseOperations
from .salary_detect import SalaryDetect
# Module-level application setup: logging, warning filters, the FastAPI app
# with its CORS policy, and the shared objects used by the endpoints below.
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
# Suppress warnings
warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy')
# Turns off pandas' chained-assignment (SettingWithCopy) warning for the whole process.
pd.options.mode.chained_assignment = None
app = FastAPI(
title="Salary Analytics API",
description="API for analyzing and predicting salary patterns from transaction data",
version="1.0.0"
)
# Add CORS middleware
# NOTE(review): wildcard origins/methods/headers are combined with
# allow_credentials=True; the FastAPI CORS documentation says wildcards cannot
# be used when credentials are to be allowed - confirm this is intended and
# consider listing the permitted origins explicitly.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Allows all origins
allow_credentials=True,
allow_methods=["*"], # Allows all methods
allow_headers=["*"], # Allows all headers
)
# Global pipeline instance
# Shared by the endpoints below; check_data_loaded() inspects its ``df`` attribute.
pipeline = SalaryAnalyticsPipeline()
# Global variables to store loaded data and models
# NOTE(review): the handlers visible in this file do not read these four
# module-level names - confirm whether they are still needed.
data_loader = None
df = None
salary_predictor = None
salary_earner_analyzer = None
# Background salary detector; started from the startup event handler.
salary_detect = SalaryDetect()
class AnalysisResponse(BaseModel):
    """Payload returned by the analysis, report and training endpoints."""

    # Human-readable outcome of the request.
    message: str
    # Optional result details (e.g. match counts); omitted when there are none.
    data: Optional[Dict] = None
    # Optional path to a file produced by the request.
    file_path: Optional[str] = None
class BatchResponse(BaseModel):
    """Outcome of one batch handled by the streaming pipeline endpoint."""

    # 1-based position of the batch within the run.
    batch_number: int
    # Total number of batches; the streaming endpoint passes -1 for CSV input.
    total_batches: int
    # Number of rows in the batch after preprocessing.
    processed_rows: int
    # CSV file holding the batch results; empty string when the batch failed.
    results_path: str
    # Human-readable success or error description.
    message: str
def check_data_loaded():
    """Ensure the global pipeline holds a DataFrame before analytics run.

    Raises:
        HTTPException: 400 when ``pipeline.df`` is None.
    """
    if pipeline.df is not None:
        return
    raise HTTPException(
        status_code=400,
        detail="No data loaded. Please load data first using the /load-data endpoint."
    )
@app.on_event("startup")
async def startup_event():
    """Start the salary detection loop and log how the server can be reached.

    Any error is logged and re-raised.
    """
    try:
        logger.info("Initializing pipeline...")
        # Start autonomous salary detection loop
        salary_detect.start()
        logger.info("Started autonomous salary detection loop.")
        # Print network information
        hostname = socket.gethostname()
        ip_address = socket.gethostbyname(hostname)
        network_lines = (
            f"Server running on hostname: {hostname}",
            f"Server IP address: {ip_address}",
            f"Server is accessible at:",
            f"- http://localhost:8000",
            f"- http://127.0.0.1:8000",
            f"- http://{ip_address}:8000",
        )
        for line in network_lines:
            logger.info(line)
        logger.info("Pipeline initialized successfully")
    except Exception as e:
        logger.error(f"Error during startup: {str(e)}")
        raise
@app.get("/")
async def root():
    """Return the welcome message, logging the access and the time taken."""
    start_time = time.time()
    logger.info("Root endpoint accessed")
    logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds")
    return {"message": "Welcome to Salary Analytics API"}
@app.get("/health")
async def health_check():
    """Report that the service is up, logging the access and the time taken."""
    start_time = time.time()
    logger.info("Health check endpoint accessed")
    logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds")
    return {"status": "healthy"}
@app.post("/analyze/keyword", response_model=AnalysisResponse)
async def analyze_keyword():
    """Run keyword-based salary transaction analysis.

    Returns:
        AnalysisResponse: Message plus ``{"count": <number of matches>}``.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 500 on unexpected errors.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting keyword analysis...")
        data = pipeline.run_keyword_analysis()
        logger.info(f"Keyword analysis completed. Found {len(data)} matches")
        response = AnalysisResponse(
            message="Keyword analysis completed successfully",
            data={"count": len(data)}
        )
        logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Keep the 400 from check_data_loaded; the generic handler below
        # would otherwise report it as a 500.
        logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in keyword analysis: {str(e)}")
        logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/analyze/consistent-amount", response_model=AnalysisResponse)
async def analyze_consistent_amount():
    """Run consistent amount transaction analysis.

    Returns:
        AnalysisResponse: Message plus ``{"count": <number of matches>}``.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 500 on unexpected errors.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting consistent amount analysis...")
        data = pipeline.run_consistent_amount_analysis()
        logger.info(f"Consistent amount analysis completed. Found {len(data)} matches")
        response = AnalysisResponse(
            message="Consistent amount analysis completed successfully",
            data={"count": len(data)}
        )
        logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Keep the 400 from check_data_loaded; the generic handler below
        # would otherwise report it as a 500.
        logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in consistent amount analysis: {str(e)}")
        logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/analyze/transaction-type", response_model=AnalysisResponse)
async def analyze_transaction_type():
    """Run transaction type analysis.

    Returns:
        AnalysisResponse: Message plus ``{"count": <number of matches>}``.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 500 on unexpected errors.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting transaction type analysis...")
        data = pipeline.run_transaction_type_analysis()
        logger.info(f"Transaction type analysis completed. Found {len(data)} matches")
        response = AnalysisResponse(
            message="Transaction type analysis completed successfully",
            data={"count": len(data)}
        )
        logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Keep the 400 from check_data_loaded; the generic handler below
        # would otherwise report it as a 500.
        logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in transaction type analysis: {str(e)}")
        logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/generate/reports", response_model=AnalysisResponse)
async def generate_reports(background_tasks: BackgroundTasks):
    """Generate salary earner reports and return summary counts.

    Args:
        background_tasks: Accepted for interface compatibility; not used here.

    Returns:
        AnalysisResponse: Message plus the number of verified earners, likely
        earners and high earners taken from the generated reports.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 500 on unexpected errors.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting report generation...")
        reports = pipeline.generate_salary_earner_reports()
        logger.info("Reports generated successfully")
        response = AnalysisResponse(
            message="Reports generated successfully",
            data={
                "verified_salary_earners": len(reports['final_table']),
                "likely_salary_earners": len(reports['likely_salary_earner']),
                "high_earners": reports['total_high_earners']
            }
        )
        logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Keep the 400 from check_data_loaded; the generic handler below
        # would otherwise report it as a 500.
        logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in report generation: {str(e)}")
        logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/train/models", response_model=AnalysisResponse)
async def train_models():
    """Train salary prediction models on the loaded data.

    Returns:
        AnalysisResponse: Success message once training has finished.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 500 on unexpected errors.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting model training...")
        pipeline.train_salary_prediction_models()
        logger.info("Models trained successfully")
        response = AnalysisResponse(
            message="Models trained successfully"
        )
        logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Keep the 400 from check_data_loaded; the generic handler below
        # would otherwise report it as a 500.
        logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in model training: {str(e)}")
        logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.get("/download/{report_type}")
async def download_report(report_type: str):
    """Download a previously generated report file.

    Args:
        report_type: One of "high_earners", "likely_earners", "final_table",
            "consistent_plot", "inconsistent_plot" or "hypothesis_plot".

    Returns:
        FileResponse: The report file, served as ``application/octet-stream``.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 404 when the report type is unknown or its file does not
            exist; 500 on unexpected errors.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info(f"Attempting to download report: {report_type}")
        file_paths = {
            "high_earners": OUTPUT_PATHS["high_earner_details"],
            "likely_earners": OUTPUT_PATHS["likely_salary_earner"],
            "final_table": OUTPUT_PATHS["final_table"],
            "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"],
            "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"],
            "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"]
        }
        if report_type not in file_paths:
            logger.error(f"Report type not found: {report_type}")
            raise HTTPException(status_code=404, detail="Report type not found")
        file_path = file_paths[report_type]
        if not os.path.exists(file_path):
            logger.error(f"Report file not found: {file_path}")
            raise HTTPException(status_code=404, detail="Report file not found")
        logger.info(f"Successfully found report file: {file_path}")
        response = FileResponse(
            path=file_path,
            filename=os.path.basename(file_path),
            media_type="application/octet-stream"
        )
        logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # The 400/404 errors raised above must keep their status; the generic
        # handler below would otherwise turn them into a 500.
        logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error downloading report: {str(e)}")
        logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/run/pipeline", response_model=AnalysisResponse)
async def run_full_pipeline():
    """Run the complete salary analytics pipeline on the loaded data.

    Returns:
        AnalysisResponse: Success message once the pipeline has finished.

    Raises:
        HTTPException: 400 (from ``check_data_loaded``) when no data is
            loaded; 500 when the pipeline reports failure or an unexpected
            error occurs.
    """
    start_time = time.time()
    try:
        check_data_loaded()
        logger.info("Starting full pipeline...")
        success = pipeline.run_full_pipeline()
        if not success:
            logger.error("Pipeline failed")
            raise HTTPException(status_code=500, detail="Pipeline failed")
        logger.info("Pipeline completed successfully")
        response = AnalysisResponse(
            message="Pipeline completed successfully"
        )
        logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Deliberate HTTP errors keep their own status code and detail rather
        # than being re-wrapped by the generic handler below.
        logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error in pipeline: {str(e)}")
        logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
@app.post("/load-data")
async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)):
    """
    Load data from either database or CSV file.

    Args:
        source (str): Source of data ('db' or 'csv')
        file (UploadFile, optional): CSV file to load (required if source is 'csv')

    Returns:
        dict: Status, message, column names and row count of the loaded data

    Raises:
        HTTPException: 400 for an invalid source or a missing CSV file;
            500 when loading fails or an unexpected error occurs.
    """
    start_time = time.time()
    try:
        if source not in ['db', 'csv']:
            logger.error(f"Invalid source: {source}")
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
        if source == 'csv' and not file:
            logger.error("No file provided for CSV source")
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        if source == 'csv':
            # Save the upload to a temporary file. The path is captured before
            # anything can fail so the file is removed even if reading the
            # upload raises.
            temp_file = tempfile.NamedTemporaryFile(delete=False, suffix='.csv')
            temp_file_path = temp_file.name
            try:
                with temp_file:
                    temp_file.write(await file.read())
                success = pipeline.load_data(source='csv', file_path=temp_file_path)
            finally:
                # Clean up temporary file
                os.unlink(temp_file_path)
        else:
            success = pipeline.load_data(source='db')

        if not success:
            logger.error("Failed to load data")
            raise HTTPException(status_code=500, detail="Failed to load data")

        response = {
            "status": "success",
            "message": f"Successfully loaded {len(pipeline.df)} rows of data",
            "columns": pipeline.df.columns.tolist(),
            "row_count": len(pipeline.df)
        }
        logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds")
        return response
    except HTTPException:
        # Validation errors must reach the client as 400; the generic handler
        # below would otherwise report them as 500.
        logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
        raise
    except Exception as e:
        logger.error(f"Error loading data: {str(e)}")
        logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
    """Return the uploaded file, insisting on one only for the 'csv' source.

    Raises:
        HTTPException: 400 when ``source`` is 'csv' and no file was supplied.
    """
    # Any non-CSV source, or a CSV source that came with a file, passes through.
    if source != 'csv' or file:
        return file
    raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
@app.post("/run/streaming-pipeline", response_model=List[BatchResponse])
async def run_streaming_pipeline(
    source: str = "db",
    batch_size: int = 10000,
    file: Optional[Union[UploadFile, str]] = File(None)
):
    """
    Run the complete salary analytics pipeline in batches.

    Each batch is preprocessed, run through the keyword, consistent-amount
    and transaction-type analyses, written to a per-batch CSV file inside a
    timestamped ``batch_results_<timestamp>`` directory, and recorded through
    ``DatabaseOperations.save_batch_to_db``. A failure inside one batch's
    analysis is recorded as an "error" batch and does not stop the run.

    Args:
        source (str): Source of data ('db' or 'csv')
        batch_size (int): Number of rows to process in each batch (must be >= 1)
        file (UploadFile, optional): CSV file to load (required if source is 'csv')

    Returns:
        List[BatchResponse]: List of responses for each batch processed.
        ``total_batches`` is -1 for CSV input because the batch count is not
        known up front.

    Raises:
        HTTPException: 400 for an invalid ``source``, a non-positive
            ``batch_size`` or a missing CSV file; 500 when the database
            connection or batch results table cannot be set up, or on any
            other unexpected failure.
    """
    start_time = time.time()
    try:
        if source not in ['db', 'csv']:
            logger.error(f"Invalid source: {source}")
            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
        if batch_size < 1:
            # Reject early: a zero/negative size would otherwise fail later
            # (division by zero / invalid chunksize) and surface as a 500.
            logger.error(f"Invalid batch_size: {batch_size}")
            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=400, detail="batch_size must be a positive integer")
        if source == 'csv' and not file:
            logger.error("No file provided for CSV source")
            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")

        # Initialize data loader
        data_loader = DataLoader()
        data_loader.chunk_size = batch_size

        # Create output directory for batch results
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
        os.makedirs(batch_output_dir, exist_ok=True)

        # Initialize database operations
        if not data_loader.connect():
            logger.error("Failed to connect to database")
            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=500, detail="Failed to connect to database")
        db_ops = DatabaseOperations(data_loader.engine)
        if not db_ops.create_batch_results_table():
            logger.error("Failed to create batch results table")
            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
            raise HTTPException(status_code=500, detail="Failed to create batch results table")

        responses = []
        batch_number = 0

        def preprocess_chunk(chunk):
            """Preprocess a chunk of data with the same logic as DataLoader."""
            # Convert dates
            chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
            chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
            # Rename columns
            chunk = chunk.rename(columns={
                'd1': 'trx_type',
                'd2': 'trx_subtype',
                'd3': 'initiated_by',
                'd4': 'customer_id'
            })
            chunk = chunk.dropna()
            return chunk

        def process_batch(raw_chunk, number, total_batches, label):
            """Run the pipeline on one chunk and return its BatchResponse.

            ``label`` is the human-readable batch identifier used in log and
            response messages ("3" for CSV input, "3 of 12" for DB input).
            """
            # Preprocessing and pipeline construction stay outside the
            # per-batch try: a failure here aborts the whole request instead
            # of being recorded as a single failed batch.
            chunk = preprocess_chunk(raw_chunk)
            pipeline = SalaryAnalyticsPipeline()
            pipeline.df = chunk
            try:
                batch_start_time = time.time()
                # Run analyses
                pipeline.run_keyword_analysis()
                pipeline.run_consistent_amount_analysis()
                pipeline.run_transaction_type_analysis()
                # Generate reports
                reports = pipeline.generate_salary_earner_reports()
                # Add batch metadata to results
                results_df = reports['final_table'].copy()
                results_df['batch_number'] = number
                results_df['total_batches'] = total_batches
                results_df['processed_at'] = datetime.now()
                # Save batch results to CSV
                batch_results_path = os.path.join(batch_output_dir, f"batch_{number}_results.csv")
                results_df.to_csv(batch_results_path, index=False)
                # Save to database
                db_ops.save_batch_to_db(
                    batch_number=number,
                    total_batches=total_batches,
                    results_df=results_df,
                    status="success"
                )
                logger.info(f"Batch {label} processed in {time.time() - batch_start_time:.2f} seconds")
                return BatchResponse(
                    batch_number=number,
                    total_batches=total_batches,
                    processed_rows=len(chunk),
                    results_path=batch_results_path,
                    message=f"Successfully processed batch {label}"
                )
            except Exception as e:
                # Best effort per batch: record the failure and keep going.
                error_message = str(e)
                logger.error(f"Error processing batch {number}: {error_message}")
                # Save error to database
                db_ops.save_batch_to_db(
                    batch_number=number,
                    total_batches=total_batches,
                    results_df=pd.DataFrame(),  # Empty DataFrame for error case
                    status="error"
                )
                return BatchResponse(
                    batch_number=number,
                    total_batches=total_batches,
                    processed_rows=len(chunk),
                    results_path="",
                    message=f"Error processing batch {number}: {error_message}"
                )

        if source == 'csv':
            temp_file_path = None
            try:
                # Save uploaded file temporarily. The path is captured before
                # the upload is read so the file is removed even if reading
                # or writing the upload fails.
                with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
                    temp_file_path = temp_file.name
                    temp_file.write(await file.read())
                # Process CSV in chunks
                for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
                    batch_number += 1
                    logger.info(f"Processing batch {batch_number}")
                    # total_batches is -1: unknown for CSV
                    responses.append(process_batch(chunk, batch_number, -1, f"{batch_number}"))
            finally:
                # Clean up temporary file
                if temp_file_path is not None:
                    os.unlink(temp_file_path)
        else:
            # Process database in chunks. The connection was already
            # established above, so the same engine is reused here.
            # Get total row count
            with data_loader.engine.connect() as conn:
                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total_rows = conn.execute(count_query).scalar()
            # Ceiling division: a partial last page still counts as a batch.
            total_batches = (total_rows + batch_size - 1) // batch_size
            offset = 0
            while offset < total_rows:
                batch_number += 1
                logger.info(f"Processing batch {batch_number} of {total_batches}")
                # Load chunk from database
                # NOTE(review): paging uses LIMIT/OFFSET without ORDER BY, so
                # row order between queries is presumably not guaranteed and
                # the syntax is dialect-specific -- confirm against the
                # target database.
                query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
                chunk = pd.read_sql(query, data_loader.engine)
                if chunk.empty:
                    break
                responses.append(process_batch(
                    chunk,
                    batch_number,
                    total_batches,
                    f"{batch_number} of {total_batches}"
                ))
                offset += batch_size

        logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
        return responses
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 400 validation failures above) were
        # already logged where they were raised; let them reach the client
        # with their original status code instead of rewrapping them as 500.
        raise
    except Exception as e:
        logger.error(f"Error in streaming pipeline: {str(e)}")
        logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
        raise HTTPException(status_code=500, detail=str(e)) from e
+7
View File
@@ -0,0 +1,7 @@
from app import create_app
# Build the application object once, at import time, via the project factory.
app = create_app()

# When this module is imported rather than executed as a script, also publish
# the instance under the name ``wsgi_app`` so Gunicorn can load it.
if __name__ != "__main__":
    wsgi_app = app