diff --git a/Dockerfile b/Dockerfile index 2109548..c74ecdb 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,20 +1,26 @@ FROM python:3.11-slim +# Set the working directory in the container WORKDIR /app +# Copy the current directory contents into the container at /app +COPY . /app + RUN apt-get update && apt-get install -y libpq-dev && rm -rf /var/lib/apt/lists/* COPY requirements.txt . RUN pip install -r requirements.txt -COPY salary_analytics/ ./salary_analytics/ RUN mkdir -p output/csv output/plots output/models -ENV PYTHONPATH=/app -ENV HOST=0.0.0.0 -ENV PORT=8000 + +# ENV FLASK_APP=wsgi.py + +ENV FLASK_APP=run.py +ENV FLASK_RUN_HOST=0.0.0.0 EXPOSE 8000 -CMD ["uvicorn", "salary_analytics.api:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] \ No newline at end of file +# CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "wsgi:wsgi_app"] +CMD ["uvicorn", "run:app", "--host", "0.0.0.0", "--port", "8000", "--reload"] \ No newline at end of file diff --git a/app.log b/app.log new file mode 100644 index 0000000..f4039c5 --- /dev/null +++ b/app.log @@ -0,0 +1,644 @@ +2025-09-10 10:21:11,016 - INFO - Initializing pipeline... +2025-09-10 10:21:11,019 - INFO - [2025-09-10 10:21:11] Detecting salary... +2025-09-10 10:21:11,019 - INFO - Started autonomous salary detection loop. 
+2025-09-10 10:21:11,030 - INFO - Server running on hostname: 22bad35c69c3 +2025-09-10 10:21:11,035 - INFO - Server IP address: 172.25.0.2 +2025-09-10 10:21:11,036 - INFO - Server is accessible at: +2025-09-10 10:21:11,037 - INFO - - http://localhost:8000 +2025-09-10 10:21:11,039 - INFO - - http://127.0.0.1:8000 +2025-09-10 10:21:11,040 - INFO - - http://172.25.0.2:8000 +2025-09-10 10:21:11,041 - INFO - Pipeline initialized successfully +2025-09-10 10:21:11,532 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:21:11,533 - INFO - [2025-09-10 10:21:11] Salary detection complete +2025-09-10 10:21:19,451 - INFO - Initializing SalaryAnalyticsPipeline +2025-09-10 10:21:19,452 - INFO - Starting data loading process +2025-09-10 10:21:19,453 - INFO - No database connection. Attempting to connect... +2025-09-10 10:21:19,455 - INFO - Attempting to connect to database... +2025-09-10 10:21:27,271 - INFO - Table customer_account_transaction_hx exists with 5354307 rows +2025-09-10 10:21:27,488 - INFO - Connected successfully to database! 
+2025-09-10 10:21:27,715 - INFO - Loading data from table: customer_account_transaction_hx +2025-09-10 10:21:28,614 - INFO - Total rows to process: 5354307 +2025-09-10 10:21:28,852 - INFO - Loading chunk starting at offset 0 +2025-09-10 10:21:40,251 - INFO - Loading chunk starting at offset 10000 +2025-09-10 10:21:46,981 - INFO - Loading chunk starting at offset 20000 +2025-09-10 10:21:57,010 - INFO - Loading chunk starting at offset 30000 +2025-09-10 10:22:04,792 - INFO - Loading chunk starting at offset 40000 +2025-09-10 10:22:09,599 - INFO - Loading chunk starting at offset 50000 +2025-09-10 10:22:15,016 - INFO - Loading chunk starting at offset 60000 +2025-09-10 10:22:18,613 - INFO - Loading chunk starting at offset 70000 +2025-09-10 10:22:22,007 - INFO - Loading chunk starting at offset 80000 +2025-09-10 10:22:28,397 - INFO - Loading chunk starting at offset 90000 +2025-09-10 10:22:36,081 - INFO - Loading chunk starting at offset 100000 +2025-09-10 10:22:48,738 - INFO - Loading chunk starting at offset 110000 +2025-09-10 10:23:01,521 - INFO - Loading chunk starting at offset 120000 +2025-09-10 10:23:11,532 - INFO - [2025-09-10 10:23:11] Detecting salary... 
+2025-09-10 10:23:13,504 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:23:13,504 - INFO - [2025-09-10 10:23:13] Salary detection complete +2025-09-10 10:23:16,348 - INFO - Loading chunk starting at offset 130000 +2025-09-10 10:23:23,259 - INFO - Loading chunk starting at offset 140000 +2025-09-10 10:23:29,284 - INFO - Loading chunk starting at offset 150000 +2025-09-10 10:23:41,579 - INFO - Loading chunk starting at offset 160000 +2025-09-10 10:23:54,788 - INFO - Loading chunk starting at offset 170000 +2025-09-10 10:24:19,519 - INFO - Loading chunk starting at offset 180000 +2025-09-10 10:24:31,657 - INFO - Loading chunk starting at offset 190000 +2025-09-10 10:24:46,130 - INFO - Loading chunk starting at offset 200000 +2025-09-10 10:24:57,289 - INFO - Loading chunk starting at offset 210000 +2025-09-10 10:25:07,964 - INFO - Loading chunk starting at offset 220000 +2025-09-10 10:25:13,503 - INFO - [2025-09-10 10:25:13] Detecting salary... 
+2025-09-10 10:25:15,477 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:25:15,478 - INFO - [2025-09-10 10:25:15] Salary detection complete +2025-09-10 10:25:17,793 - INFO - Loading chunk starting at offset 230000 +2025-09-10 10:25:25,026 - INFO - Loading chunk starting at offset 240000 +2025-09-10 10:25:32,079 - INFO - Loading chunk starting at offset 250000 +2025-09-10 10:25:39,990 - INFO - Loading chunk starting at offset 260000 +2025-09-10 10:25:50,492 - INFO - Loading chunk starting at offset 270000 +2025-09-10 10:26:00,181 - INFO - Loading chunk starting at offset 280000 +2025-09-10 10:26:10,138 - INFO - Loading chunk starting at offset 290000 +2025-09-10 10:26:20,437 - INFO - Loading chunk starting at offset 300000 +2025-09-10 10:26:34,962 - INFO - Loading chunk starting at offset 310000 +2025-09-10 10:26:46,248 - INFO - Loading chunk starting at offset 320000 +2025-09-10 10:26:55,275 - INFO - Loading chunk starting at offset 330000 +2025-09-10 10:27:09,733 - INFO - Loading chunk starting at offset 340000 +2025-09-10 10:27:15,480 - INFO - [2025-09-10 10:27:15] Detecting salary... 
+2025-09-10 10:27:16,553 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:27:16,553 - INFO - [2025-09-10 10:27:16] Salary detection complete +2025-09-10 10:27:26,568 - INFO - Loading chunk starting at offset 350000 +2025-09-10 10:27:36,472 - INFO - Loading chunk starting at offset 360000 +2025-09-10 10:27:44,909 - INFO - Loading chunk starting at offset 370000 +2025-09-10 10:27:54,557 - INFO - Loading chunk starting at offset 380000 +2025-09-10 10:28:00,588 - INFO - Loading chunk starting at offset 390000 +2025-09-10 10:28:05,957 - INFO - Loading chunk starting at offset 400000 +2025-09-10 10:28:12,058 - INFO - Loading chunk starting at offset 410000 +2025-09-10 10:28:19,248 - INFO - Loading chunk starting at offset 420000 +2025-09-10 10:28:29,938 - INFO - Loading chunk starting at offset 430000 +2025-09-10 10:28:51,274 - INFO - Loading chunk starting at offset 440000 +2025-09-10 10:28:57,720 - INFO - Loading chunk starting at offset 450000 +2025-09-10 10:29:02,117 - INFO - Loading chunk starting at offset 460000 +2025-09-10 10:29:11,555 - INFO - Loading chunk starting at offset 470000 +2025-09-10 10:29:16,565 - INFO - [2025-09-10 10:29:16] Detecting salary... 
+2025-09-10 10:29:17,452 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:29:17,455 - INFO - [2025-09-10 10:29:17] Salary detection complete +2025-09-10 10:29:30,145 - INFO - Loading chunk starting at offset 480000 +2025-09-10 10:29:39,048 - INFO - Loading chunk starting at offset 490000 +2025-09-10 10:29:43,412 - INFO - Loading chunk starting at offset 500000 +2025-09-10 10:29:47,415 - INFO - Loading chunk starting at offset 510000 +2025-09-10 10:29:52,149 - INFO - Loading chunk starting at offset 520000 +2025-09-10 10:29:58,331 - INFO - Loading chunk starting at offset 530000 +2025-09-10 10:30:04,933 - INFO - Loading chunk starting at offset 540000 +2025-09-10 10:30:11,934 - INFO - Loading chunk starting at offset 550000 +2025-09-10 10:30:19,315 - INFO - Loading chunk starting at offset 560000 +2025-09-10 10:30:25,683 - INFO - Loading chunk starting at offset 570000 +2025-09-10 10:30:33,577 - INFO - Loading chunk starting at offset 580000 +2025-09-10 10:30:40,363 - INFO - Loading chunk starting at offset 590000 +2025-09-10 10:30:46,205 - INFO - Loading chunk starting at offset 600000 +2025-09-10 10:30:53,957 - INFO - Loading chunk starting at offset 610000 +2025-09-10 10:31:08,083 - INFO - Loading chunk starting at offset 620000 +2025-09-10 10:31:17,454 - INFO - [2025-09-10 10:31:17] Detecting salary... 
+2025-09-10 10:31:17,989 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:31:17,989 - INFO - [2025-09-10 10:31:17] Salary detection complete +2025-09-10 10:31:18,901 - INFO - Loading chunk starting at offset 630000 +2025-09-10 10:31:27,551 - INFO - Loading chunk starting at offset 640000 +2025-09-10 10:31:33,662 - INFO - Loading chunk starting at offset 650000 +2025-09-10 10:31:40,107 - INFO - Loading chunk starting at offset 660000 +2025-09-10 10:31:49,938 - INFO - Loading chunk starting at offset 670000 +2025-09-10 10:31:57,777 - INFO - Loading chunk starting at offset 680000 +2025-09-10 10:32:06,629 - INFO - Loading chunk starting at offset 690000 +2025-09-10 10:32:15,281 - INFO - Loading chunk starting at offset 700000 +2025-09-10 10:32:25,328 - INFO - Loading chunk starting at offset 710000 +2025-09-10 10:32:40,639 - INFO - Loading chunk starting at offset 720000 +2025-09-10 10:32:51,078 - INFO - Loading chunk starting at offset 730000 +2025-09-10 10:33:03,924 - INFO - Loading chunk starting at offset 740000 +2025-09-10 10:33:13,007 - INFO - Loading chunk starting at offset 750000 +2025-09-10 10:33:17,989 - INFO - [2025-09-10 10:33:17] Detecting salary... 
+2025-09-10 10:33:19,698 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:33:20,039 - INFO - [2025-09-10 10:33:20] Salary detection complete +2025-09-10 10:33:20,857 - INFO - Loading chunk starting at offset 760000 +2025-09-10 10:33:30,803 - INFO - Loading chunk starting at offset 770000 +2025-09-10 10:33:38,162 - INFO - Loading chunk starting at offset 780000 +2025-09-10 10:33:44,504 - INFO - Loading chunk starting at offset 790000 +2025-09-10 10:33:50,063 - INFO - Loading chunk starting at offset 800000 +2025-09-10 10:33:57,957 - INFO - Loading chunk starting at offset 810000 +2025-09-10 10:34:05,256 - INFO - Loading chunk starting at offset 820000 +2025-09-10 10:34:12,212 - INFO - Loading chunk starting at offset 830000 +2025-09-10 10:34:20,478 - INFO - Loading chunk starting at offset 840000 +2025-09-10 10:34:28,018 - INFO - Loading chunk starting at offset 850000 +2025-09-10 10:34:36,413 - INFO - Loading chunk starting at offset 860000 +2025-09-10 10:34:50,168 - INFO - Loading chunk starting at offset 870000 +2025-09-10 10:35:13,867 - INFO - Loading chunk starting at offset 880000 +2025-09-10 10:35:20,391 - INFO - [2025-09-10 10:35:20] Detecting salary... 
+2025-09-10 10:35:21,791 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:35:21,794 - INFO - [2025-09-10 10:35:21] Salary detection complete +2025-09-10 10:35:28,918 - INFO - Loading chunk starting at offset 890000 +2025-09-10 10:35:40,300 - INFO - Loading chunk starting at offset 900000 +2025-09-10 10:36:05,763 - INFO - Loading chunk starting at offset 910000 +2025-09-10 10:36:15,199 - INFO - Loading chunk starting at offset 920000 +2025-09-10 10:36:46,036 - INFO - Loading chunk starting at offset 930000 +2025-09-10 10:36:53,221 - INFO - Loading chunk starting at offset 940000 +2025-09-10 10:37:06,754 - INFO - Loading chunk starting at offset 950000 +2025-09-10 10:37:15,558 - INFO - Loading chunk starting at offset 960000 +2025-09-10 10:37:21,798 - INFO - [2025-09-10 10:37:21] Detecting salary... +2025-09-10 10:37:22,954 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:37:22,955 - INFO - [2025-09-10 10:37:22] Salary detection complete +2025-09-10 10:37:29,088 - INFO - Loading chunk starting at offset 970000 +2025-09-10 10:37:54,730 - INFO - Loading chunk starting at offset 980000 +2025-09-10 10:38:13,667 - INFO - Loading chunk starting at offset 990000 +2025-09-10 10:38:27,880 - INFO - Loading chunk starting at offset 1000000 +2025-09-10 10:39:07,546 - INFO - Loading chunk starting at offset 1010000 +2025-09-10 10:39:17,369 - INFO - Loading chunk starting at offset 1020000 +2025-09-10 10:39:23,176 - INFO - [2025-09-10 10:39:23] Detecting salary... 
+2025-09-10 10:39:24,444 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:39:24,445 - INFO - [2025-09-10 10:39:24] Salary detection complete +2025-09-10 10:39:26,048 - INFO - Loading chunk starting at offset 1030000 +2025-09-10 10:39:45,494 - INFO - Loading chunk starting at offset 1040000 +2025-09-10 10:39:50,480 - INFO - Loading chunk starting at offset 1050000 +2025-09-10 10:39:54,793 - INFO - Loading chunk starting at offset 1060000 +2025-09-10 10:39:59,415 - INFO - Loading chunk starting at offset 1070000 +2025-09-10 10:40:04,306 - INFO - Loading chunk starting at offset 1080000 +2025-09-10 10:40:10,517 - INFO - Loading chunk starting at offset 1090000 +2025-09-10 10:40:15,671 - INFO - Loading chunk starting at offset 1100000 +2025-09-10 10:40:21,662 - INFO - Loading chunk starting at offset 1110000 +2025-09-10 10:40:30,551 - INFO - Loading chunk starting at offset 1120000 +2025-09-10 10:40:47,391 - INFO - Loading chunk starting at offset 1130000 +2025-09-10 10:40:55,609 - INFO - Loading chunk starting at offset 1140000 +2025-09-10 10:41:04,496 - INFO - Loading chunk starting at offset 1150000 +2025-09-10 10:41:14,608 - INFO - Loading chunk starting at offset 1160000 +2025-09-10 10:41:24,447 - INFO - [2025-09-10 10:41:24] Detecting salary... 
+2025-09-10 10:41:28,537 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:41:28,548 - INFO - [2025-09-10 10:41:28] Salary detection complete +2025-09-10 10:42:00,867 - INFO - Loading chunk starting at offset 1170000 +2025-09-10 10:42:13,069 - INFO - Loading chunk starting at offset 1180000 +2025-09-10 10:42:21,593 - INFO - Loading chunk starting at offset 1190000 +2025-09-10 10:42:32,011 - INFO - Loading chunk starting at offset 1200000 +2025-09-10 10:42:37,982 - INFO - Loading chunk starting at offset 1210000 +2025-09-10 10:42:45,458 - INFO - Loading chunk starting at offset 1220000 +2025-09-10 10:42:54,545 - INFO - Loading chunk starting at offset 1230000 +2025-09-10 10:43:15,705 - INFO - Loading chunk starting at offset 1240000 +2025-09-10 10:43:28,549 - INFO - [2025-09-10 10:43:28] Detecting salary... 
+2025-09-10 10:43:31,640 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:43:31,641 - INFO - [2025-09-10 10:43:31] Salary detection complete +2025-09-10 10:43:43,344 - INFO - Loading chunk starting at offset 1250000 +2025-09-10 10:43:50,004 - INFO - Loading chunk starting at offset 1260000 +2025-09-10 10:43:59,430 - INFO - Loading chunk starting at offset 1270000 +2025-09-10 10:44:07,478 - INFO - Loading chunk starting at offset 1280000 +2025-09-10 10:44:18,927 - INFO - Loading chunk starting at offset 1290000 +2025-09-10 10:44:28,523 - INFO - Loading chunk starting at offset 1300000 +2025-09-10 10:44:38,629 - INFO - Loading chunk starting at offset 1310000 +2025-09-10 10:44:56,977 - INFO - Loading chunk starting at offset 1320000 +2025-09-10 10:45:09,203 - INFO - Loading chunk starting at offset 1330000 +2025-09-10 10:45:18,657 - INFO - Loading chunk starting at offset 1340000 +2025-09-10 10:45:27,201 - INFO - Loading chunk starting at offset 1350000 +2025-09-10 10:45:31,641 - INFO - [2025-09-10 10:45:31] Detecting salary... 
+2025-09-10 10:45:32,261 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:45:32,262 - INFO - [2025-09-10 10:45:32] Salary detection complete +2025-09-10 10:45:35,308 - INFO - Loading chunk starting at offset 1360000 +2025-09-10 10:45:44,783 - INFO - Loading chunk starting at offset 1370000 +2025-09-10 10:45:52,048 - INFO - Loading chunk starting at offset 1380000 +2025-09-10 10:45:58,491 - INFO - Loading chunk starting at offset 1390000 +2025-09-10 10:46:13,270 - INFO - Loading chunk starting at offset 1400000 +2025-09-10 10:46:25,221 - INFO - Loading chunk starting at offset 1410000 +2025-09-10 10:46:34,358 - INFO - Loading chunk starting at offset 1420000 +2025-09-10 10:46:38,777 - INFO - Loading chunk starting at offset 1430000 +2025-09-10 10:46:42,534 - INFO - Loading chunk starting at offset 1440000 +2025-09-10 10:46:46,147 - INFO - Loading chunk starting at offset 1450000 +2025-09-10 10:46:50,727 - INFO - Loading chunk starting at offset 1460000 +2025-09-10 10:46:55,939 - INFO - Loading chunk starting at offset 1470000 +2025-09-10 10:47:01,696 - INFO - Loading chunk starting at offset 1480000 +2025-09-10 10:47:07,411 - INFO - Loading chunk starting at offset 1490000 +2025-09-10 10:47:13,449 - INFO - Loading chunk starting at offset 1500000 +2025-09-10 10:47:23,058 - INFO - Loading chunk starting at offset 1510000 +2025-09-10 10:47:30,097 - INFO - Loading chunk starting at offset 1520000 +2025-09-10 10:47:32,259 - INFO - [2025-09-10 10:47:32] Detecting salary... 
+2025-09-10 10:47:32,923 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:47:32,924 - INFO - [2025-09-10 10:47:32] Salary detection complete +2025-09-10 10:47:36,423 - INFO - Loading chunk starting at offset 1530000 +2025-09-10 10:47:45,086 - INFO - Loading chunk starting at offset 1540000 +2025-09-10 10:47:57,079 - INFO - Loading chunk starting at offset 1550000 +2025-09-10 10:48:19,741 - INFO - Loading chunk starting at offset 1560000 +2025-09-10 10:48:41,300 - INFO - Loading chunk starting at offset 1570000 +2025-09-10 10:48:51,349 - INFO - Loading chunk starting at offset 1580000 +2025-09-10 10:48:58,892 - INFO - Loading chunk starting at offset 1590000 +2025-09-10 10:49:04,857 - INFO - Loading chunk starting at offset 1600000 +2025-09-10 10:49:10,299 - INFO - Loading chunk starting at offset 1610000 +2025-09-10 10:49:16,650 - INFO - Loading chunk starting at offset 1620000 +2025-09-10 10:49:23,107 - INFO - Loading chunk starting at offset 1630000 +2025-09-10 10:49:32,320 - INFO - Loading chunk starting at offset 1640000 +2025-09-10 10:49:32,927 - INFO - [2025-09-10 10:49:32] Detecting salary... 
+2025-09-10 10:49:33,484 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:49:33,485 - INFO - [2025-09-10 10:49:33] Salary detection complete +2025-09-10 10:49:40,707 - INFO - Loading chunk starting at offset 1650000 +2025-09-10 10:49:50,677 - INFO - Loading chunk starting at offset 1660000 +2025-09-10 10:49:59,664 - INFO - Loading chunk starting at offset 1670000 +2025-09-10 10:50:07,452 - INFO - Loading chunk starting at offset 1680000 +2025-09-10 10:50:13,583 - INFO - Loading chunk starting at offset 1690000 +2025-09-10 10:50:20,858 - INFO - Loading chunk starting at offset 1700000 +2025-09-10 10:50:27,519 - INFO - Loading chunk starting at offset 1710000 +2025-09-10 10:50:34,012 - INFO - Loading chunk starting at offset 1720000 +2025-09-10 10:50:39,628 - INFO - Loading chunk starting at offset 1730000 +2025-09-10 10:50:50,012 - INFO - Loading chunk starting at offset 1740000 +2025-09-10 10:51:06,647 - INFO - Loading chunk starting at offset 1750000 +2025-09-10 10:51:17,401 - INFO - Loading chunk starting at offset 1760000 +2025-09-10 10:51:26,360 - INFO - Loading chunk starting at offset 1770000 +2025-09-10 10:51:33,485 - INFO - [2025-09-10 10:51:33] Detecting salary... +2025-09-10 10:51:55,392 - INFO - Initializing pipeline... +2025-09-10 10:51:55,395 - INFO - [2025-09-10 10:51:55] Detecting salary... +2025-09-10 10:51:55,395 - INFO - Started autonomous salary detection loop. 
+2025-09-10 10:51:55,409 - INFO - Server running on hostname: 22bad35c69c3 +2025-09-10 10:51:55,416 - INFO - Server IP address: 172.25.0.2 +2025-09-10 10:51:55,424 - INFO - Server is accessible at: +2025-09-10 10:51:55,426 - INFO - - http://localhost:8000 +2025-09-10 10:51:55,444 - INFO - - http://127.0.0.1:8000 +2025-09-10 10:51:55,455 - INFO - - http://172.25.0.2:8000 +2025-09-10 10:51:55,456 - INFO - Pipeline initialized successfully +2025-09-10 10:51:56,024 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:51:56,024 - INFO - [2025-09-10 10:51:56] Salary detection complete +2025-09-10 10:52:36,450 - INFO - Initializing SalaryAnalyticsPipeline +2025-09-10 10:52:36,451 - INFO - Starting data loading process +2025-09-10 10:52:36,452 - INFO - No database connection. Attempting to connect... +2025-09-10 10:52:36,452 - INFO - Attempting to connect to database... +2025-09-10 10:52:36,538 - ERROR - Error connecting to database: No module named 'oracledb' +2025-09-10 10:52:36,539 - ERROR - Failed to establish database connection +2025-09-10 10:52:36,539 - ERROR - Failed to load data +2025-09-10 10:52:36,540 - ERROR - Failed to load data +2025-09-10 10:52:36,541 - INFO - Load data endpoint failed after 0.09 seconds +2025-09-10 10:52:36,541 - ERROR - Error loading data: 500: Failed to load data +2025-09-10 10:52:36,542 - INFO - Load data endpoint failed after 0.09 seconds +2025-09-10 10:52:58,149 - INFO - Shutting down Salary Analytics API... +2025-09-10 10:53:10,697 - INFO - generated new fontManager +2025-09-10 10:53:16,039 - INFO - Initializing pipeline... +2025-09-10 10:53:16,041 - INFO - [2025-09-10 10:53:16] Detecting salary... +2025-09-10 10:53:16,042 - INFO - Started autonomous salary detection loop. 
+2025-09-10 10:53:16,055 - INFO - Server running on hostname: 1c5d2376fb2a +2025-09-10 10:53:16,056 - INFO - Server IP address: 172.25.0.2 +2025-09-10 10:53:16,057 - INFO - Server is accessible at: +2025-09-10 10:53:16,058 - INFO - - http://localhost:8000 +2025-09-10 10:53:16,059 - INFO - - http://127.0.0.1:8000 +2025-09-10 10:53:16,060 - INFO - - http://172.25.0.2:8000 +2025-09-10 10:53:16,062 - INFO - Pipeline initialized successfully +2025-09-10 10:53:16,812 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 10:53:16,813 - INFO - [2025-09-10 10:53:16] Salary detection complete +2025-09-10 10:53:25,477 - INFO - Initializing SalaryAnalyticsPipeline +2025-09-10 10:53:25,482 - INFO - Starting data loading process +2025-09-10 10:53:25,483 - INFO - No database connection. Attempting to connect... +2025-09-10 10:53:25,484 - INFO - Attempting to connect to database... +2025-09-10 10:53:25,679 - ERROR - Error connecting to database: No module named 'oracledb' +2025-09-10 10:53:25,679 - ERROR - Failed to establish database connection +2025-09-10 10:53:25,680 - ERROR - Failed to load data +2025-09-10 10:53:25,680 - ERROR - Failed to load data +2025-09-10 10:53:25,681 - INFO - Load data endpoint failed after 0.20 seconds +2025-09-10 10:53:25,682 - ERROR - Error loading data: 500: Failed to load data +2025-09-10 10:53:25,682 - INFO - Load data endpoint failed after 0.20 seconds +2025-09-10 10:53:46,324 - INFO - Shutting down Salary Analytics API... +2025-09-10 11:07:49,841 - INFO - generated new fontManager +2025-09-10 11:07:59,771 - INFO - Initializing pipeline... +2025-09-10 11:07:59,774 - INFO - [2025-09-10 11:07:59] Detecting salary... +2025-09-10 11:07:59,774 - INFO - Started autonomous salary detection loop. 
+2025-09-10 11:07:59,893 - INFO - Server running on hostname: 0b21809edf52 +2025-09-10 11:07:59,894 - INFO - Server IP address: 172.25.0.2 +2025-09-10 11:07:59,895 - INFO - Server is accessible at: +2025-09-10 11:07:59,915 - INFO - - http://localhost:8000 +2025-09-10 11:07:59,917 - INFO - - http://127.0.0.1:8000 +2025-09-10 11:07:59,918 - INFO - - http://172.25.0.2:8000 +2025-09-10 11:07:59,919 - INFO - Pipeline initialized successfully +2025-09-10 11:08:01,268 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:08:01,269 - INFO - [2025-09-10 11:08:01] Salary detection complete +2025-09-10 11:08:15,668 - INFO - Initializing SalaryAnalyticsPipeline +2025-09-10 11:08:15,669 - INFO - Starting data loading process +2025-09-10 11:08:15,671 - INFO - No database connection. Attempting to connect... +2025-09-10 11:08:15,672 - INFO - Attempting to connect to database... +2025-09-10 11:08:20,076 - ERROR - Error connecting to database: (oracledb.exceptions.DatabaseError) ORA-00936: missing expression +Help: https://docs.oracle.com/error-help/db/ora-00936/ +[SQL: SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = 'customer_account_transaction_hx')] +(Background on this error at: https://sqlalche.me/e/20/4xp6) +2025-09-10 11:08:20,086 - ERROR - Failed to establish database connection +2025-09-10 11:08:20,087 - ERROR - Failed to load data +2025-09-10 11:08:20,092 - ERROR - Failed to load data +2025-09-10 11:08:20,119 - INFO - Load data endpoint failed after 4.45 seconds +2025-09-10 11:08:20,123 - ERROR - Error loading data: 500: Failed to load data +2025-09-10 11:08:20,124 - INFO - Load data endpoint failed after 4.46 seconds +2025-09-10 11:10:01,280 - INFO - [2025-09-10 11:10:01] Detecting salary... 
+2025-09-10 11:10:02,367 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:10:02,368 - INFO - [2025-09-10 11:10:02] Salary detection complete +2025-09-10 11:12:02,395 - INFO - [2025-09-10 11:12:02] Detecting salary... +2025-09-10 11:12:03,234 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:12:03,256 - INFO - [2025-09-10 11:12:03] Salary detection complete +2025-09-10 11:14:03,279 - INFO - [2025-09-10 11:14:03] Detecting salary... +2025-09-10 11:14:04,266 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:14:04,267 - INFO - [2025-09-10 11:14:04] Salary detection complete +2025-09-10 11:16:04,268 - INFO - [2025-09-10 11:16:04] Detecting salary... +2025-09-10 11:16:05,133 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:16:05,134 - INFO - [2025-09-10 11:16:05] Salary detection complete +2025-09-10 11:18:05,134 - INFO - [2025-09-10 11:18:05] Detecting salary... 
+2025-09-10 11:18:05,955 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:18:05,956 - INFO - [2025-09-10 11:18:05] Salary detection complete +2025-09-10 11:20:05,855 - INFO - [2025-09-10 11:20:05] Detecting salary... +2025-09-10 11:20:06,453 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:20:06,463 - INFO - [2025-09-10 11:20:06] Salary detection complete +2025-09-10 11:22:06,512 - INFO - [2025-09-10 11:22:06] Detecting salary... +2025-09-10 11:22:07,087 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:22:07,088 - INFO - [2025-09-10 11:22:07] Salary detection complete +2025-09-10 11:24:07,092 - INFO - [2025-09-10 11:24:07] Detecting salary... +2025-09-10 11:24:08,235 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:24:08,236 - INFO - [2025-09-10 11:24:08] Salary detection complete +2025-09-10 11:25:13,551 - INFO - Shutting down Salary Analytics API... +2025-09-10 11:25:52,852 - INFO - Initializing pipeline... +2025-09-10 11:25:52,857 - INFO - [2025-09-10 11:25:52] Detecting salary... +2025-09-10 11:25:52,857 - INFO - Started autonomous salary detection loop. 
+2025-09-10 11:25:52,866 - INFO - Server running on hostname: 0b21809edf52 +2025-09-10 11:25:52,876 - INFO - Server IP address: 172.25.0.2 +2025-09-10 11:25:52,881 - INFO - Server is accessible at: +2025-09-10 11:25:52,882 - INFO - - http://localhost:8000 +2025-09-10 11:25:52,883 - INFO - - http://127.0.0.1:8000 +2025-09-10 11:25:52,884 - INFO - - http://172.25.0.2:8000 +2025-09-10 11:25:52,884 - INFO - Pipeline initialized successfully +2025-09-10 11:25:53,580 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:25:53,581 - INFO - [2025-09-10 11:25:53] Salary detection complete +2025-09-10 11:27:53,578 - INFO - [2025-09-10 11:27:53] Detecting salary... +2025-09-10 11:27:54,235 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:27:54,235 - INFO - [2025-09-10 11:27:54] Salary detection complete +2025-09-10 11:29:54,235 - INFO - [2025-09-10 11:29:54] Detecting salary... +2025-09-10 11:29:54,827 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-10 11:29:54,827 - INFO - [2025-09-10 11:29:54] Salary detection complete +2025-09-10 11:30:20,979 - INFO - Shutting down Salary Analytics API... +2025-09-10 11:31:18,351 - INFO - Initializing pipeline... +2025-09-10 11:31:18,375 - INFO - [2025-09-10 11:31:18] Detecting salary... +2025-09-10 11:31:18,376 - INFO - Started autonomous salary detection loop. 
+2025-09-10 11:31:18,405 - INFO - Server running on hostname: 0b21809edf52 +2025-09-10 11:31:18,447 - INFO - Server IP address: 172.25.0.2 +2025-09-10 11:31:18,528 - INFO - Server is accessible at: +2025-09-10 11:31:18,552 - INFO - - http://localhost:8000 +2025-09-10 11:31:18,552 - INFO - - http://127.0.0.1:8000 +2025-09-10 11:31:18,553 - INFO - - http://172.25.0.2:8000 +2025-09-10 11:31:18,554 - INFO - Pipeline initialized successfully +2025-09-19 11:27:20,722 - INFO - Initializing pipeline... +2025-09-19 11:27:20,724 - INFO - [2025-09-19 11:27:20] Detecting salary... +2025-09-19 11:27:20,724 - INFO - Started autonomous salary detection loop. +2025-09-19 11:27:20,750 - INFO - Server running on hostname: 0b21809edf52 +2025-09-19 11:27:20,752 - INFO - Server IP address: 172.25.0.2 +2025-09-19 11:27:20,753 - INFO - Server is accessible at: +2025-09-19 11:27:20,759 - INFO - - http://localhost:8000 +2025-09-19 11:27:20,761 - INFO - - http://127.0.0.1:8000 +2025-09-19 11:27:20,766 - INFO - - http://172.25.0.2:8000 +2025-09-19 11:27:20,771 - INFO - Pipeline initialized successfully +2025-09-19 11:27:24,622 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-19 11:27:24,628 - INFO - [2025-09-19 11:27:24] Salary detection complete +2025-09-19 11:29:13,095 - INFO - Shutting down Salary Analytics API... +2025-09-24 09:47:38,012 - INFO - Initializing pipeline... +2025-09-24 09:47:38,017 - INFO - [2025-09-24 09:47:38] Detecting salary... +2025-09-24 09:47:38,024 - INFO - Started autonomous salary detection loop. 
+2025-09-24 09:47:38,077 - INFO - Server running on hostname: 0b21809edf52 +2025-09-24 09:47:38,079 - INFO - Server IP address: 172.25.0.2 +2025-09-24 09:47:38,080 - INFO - Server is accessible at: +2025-09-24 09:47:38,081 - INFO - - http://localhost:8000 +2025-09-24 09:47:38,082 - INFO - - http://127.0.0.1:8000 +2025-09-24 09:47:38,083 - INFO - - http://172.25.0.2:8000 +2025-09-24 09:47:38,084 - INFO - Pipeline initialized successfully +2025-09-24 09:47:41,272 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-24 09:47:41,273 - INFO - [2025-09-24 09:47:41] Salary detection complete +2025-09-24 09:49:41,290 - INFO - [2025-09-24 09:49:41] Detecting salary... +2025-09-24 09:49:43,791 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-24 09:49:43,791 - INFO - [2025-09-24 09:49:43] Salary detection complete +2025-09-24 09:50:21,300 - INFO - Shutting down Salary Analytics API... +2025-09-24 09:59:27,300 - INFO - generated new fontManager +2025-09-24 09:59:30,871 - INFO - Initializing pipeline... +2025-09-24 09:59:30,872 - INFO - [2025-09-24 09:59:30] Detecting salary... +2025-09-24 09:59:30,872 - INFO - Started autonomous salary detection loop. 
+2025-09-24 09:59:30,877 - INFO - Server running on hostname: bfe07c2f7da2 +2025-09-24 09:59:30,878 - INFO - Server IP address: 172.25.0.2 +2025-09-24 09:59:30,878 - INFO - Server is accessible at: +2025-09-24 09:59:30,883 - INFO - - http://localhost:8000 +2025-09-24 09:59:30,884 - INFO - - http://127.0.0.1:8000 +2025-09-24 09:59:30,887 - INFO - - http://172.25.0.2:8000 +2025-09-24 09:59:30,889 - INFO - Pipeline initialized successfully +2025-09-24 09:59:32,676 - INFO - POST http://www.simbrellang.net:5000/autocall/analytic-salary-detect status: 200, response: { + "data": [], + "error": {}, + "message": "AutoCall Add Salary Successful", + "status": true, + "statusCode": 200 +} + +2025-09-24 09:59:32,705 - INFO - [2025-09-24 09:59:32] Salary detection complete diff --git a/salary_analytics/app/__init__.py b/app/api/__init__.py similarity index 59% rename from salary_analytics/app/__init__.py rename to app/api/__init__.py index b286361..dde0fd0 100644 --- a/salary_analytics/app/__init__.py +++ b/app/api/__init__.py @@ -1,17 +1,21 @@ from flask import Flask import os -from .extensions import db, migrate +from app.extensions import db, migrate + + def create_app(): app = Flask(__name__) - app.config.from_object('salary_analytics.config') + + # Load configuration from config.py + app.config.from_object('app.config') # Initialize extensions db.init_app(app) migrate.init_app(app, db) # Register blueprints or CLI commands here if needed - from . 
import commands + from app.api.commands import commands app.cli.add_command(commands.upload_xls_cli) return app \ No newline at end of file diff --git a/salary_analytics/app/commands.py b/app/api/commands/commands.py similarity index 94% rename from salary_analytics/app/commands.py rename to app/api/commands/commands.py index b90b73e..d055a5b 100644 --- a/salary_analytics/app/commands.py +++ b/app/api/commands/commands.py @@ -2,8 +2,8 @@ import click import pandas as pd from datetime import datetime from flask.cli import with_appcontext -from salary_analytics.app.extensions import db -from salary_analytics.app.models import RawTransaction +from app.extensions import db +from app.models import RawTransaction @click.group() def commands(): diff --git a/salary_analytics/config.py b/app/config.py similarity index 77% rename from salary_analytics/config.py rename to app/config.py index fccdb37..32a2569 100644 --- a/salary_analytics/config.py +++ b/app/config.py @@ -22,20 +22,32 @@ os.makedirs(PLOTS_DIR, exist_ok=True) os.makedirs(CSV_DIR, exist_ok=True) os.makedirs(MODEL_DIR, exist_ok=True) + # Database Configuration DB_CONFIG = { - "user": os.getenv("DB_USER"), # Default value as fallback - "password": os.getenv("DB_PASSWORD"), - "name": os.getenv("DB_NAME"), - "port": os.getenv("DB_PORT"), - "host": os.getenv("DB_HOST") + "user": os.getenv("DATABASE_USER"), + "password": os.getenv("DATABASE_PASSWORD"), + "name": os.getenv("DATABASE_NAME"), + "port": os.getenv("DATABASE_PORT", 10532), + "host": os.getenv("DATABASE_HOST", "firstadvancedev"), + "sid": os.getenv("DATABASE_SID", "FREE"), + "service_name": os.getenv("DATABASE_SERVICE_NAME", "firstadv") } + +# DNS = f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={DB_CONFIG['host']})(PORT={DB_CONFIG['port']}))(CONNECT_DATA=(SID={DB_CONFIG['sid']})))" + +# Database Connection +# SQLALCHEMY_DATABASE_URI = (f"oracle+oracledb://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DNS}") +SQLALCHEMY_DATABASE_URI = ( 
f"oracle+oracledb://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/?service_name={DB_CONFIG['service_name']}") + + # SQLAlchemy Configuration -SQLALCHEMY_DATABASE_URI = ( - f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" - f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}" -) +# SQLALCHEMY_DATABASE_URI = ( +# f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@" +# f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}" +# ) + SQLALCHEMY_TRACK_MODIFICATIONS = False # Table Configuration @@ -80,7 +92,7 @@ OUTPUT_PATHS = { } SIMBRELLA_BASE_URL = os.getenv("SIMBRELLA_BASE_URL", "http://127.0.0.1:6337") -SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS","api/rac-check") +SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS", "api/rac-check") # Salary Detect Endpoint Config SALARY_DETECT_URL = "http://www.simbrellang.net:5000/autocall/analytic-salary-detect" diff --git a/salary_analytics/app/extensions.py b/app/extensions.py similarity index 100% rename from salary_analytics/app/extensions.py rename to app/extensions.py diff --git a/app/models/__init__.py b/app/models/__init__.py new file mode 100644 index 0000000..62baa90 --- /dev/null +++ b/app/models/__init__.py @@ -0,0 +1,4 @@ +from .raw_transaction import RawTransaction + + +__all__ = ['RawTransaction'] \ No newline at end of file diff --git a/app/models/account.py b/app/models/account.py new file mode 100644 index 0000000..2f08f72 --- /dev/null +++ b/app/models/account.py @@ -0,0 +1,18 @@ +from app.extensions import db + + +class Account(db.Model): + __tablename__ = "accounts" + + customerid = db.Column(db.String(50), primary_key=True) + accountid = db.Column(db.String(11), nullable=False) + registrationdate = db.Column(db.DateTime) + accountcurrencycode = db.Column(db.String(3)) + schemecode = db.Column(db.String(5)) + lastinflowtransactiondate = db.Column(db.DateTime) + accountstatus = 
db.Column(db.String(50)) + accountstatusdate = db.Column(db.DateTime) + + + def __repr__(self): + return f"" \ No newline at end of file diff --git a/app/models/batch_results.py b/app/models/batch_results.py new file mode 100644 index 0000000..fcfdd91 --- /dev/null +++ b/app/models/batch_results.py @@ -0,0 +1,97 @@ +from sqlalchemy import Column, Integer, String, DateTime, Numeric, Boolean, func +from sqlalchemy.orm import declarative_base, Session +from datetime import datetime +from app.utils.logger import logger +from app.extensions import db + + +class BatchResult(db.Model): + __tablename__ = "salary_analytics_batch_results" + + id = Column(Integer, primary_key=True, autoincrement=True) + batch_number = Column(Integer, nullable=False) + total_batches = Column(Integer, nullable=False) + processed_at = Column(DateTime, default=datetime.utcnow) + accountid = Column(String, nullable=False) + num_months = Column(Integer) + least_inflow_6m = Column(Numeric) + avg_monthly_salary = Column(Numeric) + estimated_next_amount = Column(Numeric) + estimated_next_date = Column(DateTime) + is_45day_salary = Column(Boolean, default=False) + is_2months_salary = Column(Boolean, default=False) + status = Column(String, default="success") + + + @classmethod + def save_batch(cls, batch_number, total_batches, results_df, status="success"): + """Save batch results into DB using ORM bulk insert.""" + try: + results_df["batch_number"] = batch_number + results_df["total_batches"] = total_batches + results_df["processed_at"] = datetime.utcnow() + results_df["status"] = status + + # Normalize boolean columns + results_df["is_45day_salary"] = results_df.get("45daysalary", False) + results_df["is_2months_salary"] = results_df.get("2monthssalary", False) + + # Convert to list of ORM objects + records = [ + cls(**row) + for row in results_df.to_dict("records") + ] + + db.session.bulk_save_objects(records) + db.session.commit() + logger.info(f"Saved batch {batch_number} successfully.") + return 
True + except Exception as e: + db.session.rollback() + logger.error(f"Error saving batch {batch_number}: {str(e)}") + return False + + @classmethod + def get_batch_status(cls, batch_number: int): + """Return summary info about one batch.""" + try: + result = ( + db.session.query( + cls.batch_number, + cls.total_batches, + cls.processed_at, + func.count().label("total_records"), + func.sum(func.case((cls.status == "success", 1), else_=0)).label("successful_records"), + func.sum(func.case((cls.status == "error", 1), else_=0)).label("failed_records"), + ) + .filter(cls.batch_number == batch_number) + .group_by(cls.batch_number, cls.total_batches, cls.processed_at) + .order_by(cls.processed_at.desc()) + .first() + ) + return dict(result._mapping) if result else None + except Exception as e: + logger.error(f"Error fetching batch {batch_number} status: {str(e)}") + return None + + @classmethod + def get_all_batches(cls): + """Return summaries for all batches.""" + try: + results = ( + db.session.query( + cls.batch_number, + cls.total_batches, + cls.processed_at, + func.count().label("total_records"), + func.sum(func.case((cls.status == "success", 1), else_=0)).label("successful_records"), + func.sum(func.case((cls.status == "error", 1), else_=0)).label("failed_records"), + ) + .group_by(cls.batch_number, cls.total_batches, cls.processed_at) + .order_by(cls.batch_number) + .all() + ) + return [dict(r._mapping) for r in results] + except Exception as e: + logger.error(f"Error fetching all batches: {str(e)}") + return [] diff --git a/app/models/customer_account_transaction_hx.py b/app/models/customer_account_transaction_hx.py new file mode 100644 index 0000000..ac3e006 --- /dev/null +++ b/app/models/customer_account_transaction_hx.py @@ -0,0 +1,100 @@ +from venv import logger +from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey +from sqlalchemy.orm import relationship +from app.extensions import db +import pandas as pd + +class 
CustomerAccountTransactionHx(db.Model): + __tablename__ = "customer_account_transaction_hx" + + id = Column(Integer, primary_key=True, autoincrement=True) + accountid = Column(String(64), nullable=False, index=True) + trx_type = Column(String(50), nullable=False) + amount = Column(Float, nullable=False) + description = Column(String(255)) + customer_id = Column(String(64)) + trx_start_date = Column(DateTime, nullable=False) + trx_end_date = Column(DateTime) + is_salary_related = Column(Integer, default=0) + is_consistent_amount = Column(Integer, default=0) + is_salary_type = Column(Integer, default=0) + + + + @classmethod + def get_all(cls): + """Fetch all transactions.""" + return db.session.query(cls).all() + + @classmethod + def get_rows_count(cls): + """Return total number of transaction rows.""" + try: + count = db.session.query(db.func.count(cls.id)).scalar() + return count + except Exception as e: + logger.error(f"Error getting row count: {str(e)}") + return None + + @classmethod + def get_by_account(cls, accountid: str): + """Fetch transactions for a given account.""" + return db.session.query(cls).filter_by(accountid=accountid).all() + + @classmethod + def get_accounts(cls, limit=None): + """Fetch distinct account IDs.""" + query = db.session.query(cls.accountid).distinct() + if limit: + query = query.limit(limit) + return [row.accountid for row in query.all()] + + @classmethod + def insert_transaction(cls, **kwargs): + """Insert a new transaction.""" + trx = cls(**kwargs) + try: + db.session.add(trx) + db.session.commit() + except Exception as e: + logger.error(f"Error inserting transaction: {str(e)}") + return None + return trx + + @classmethod + def bulk_insert(cls, transactions: list[dict]): + """Insert multiple transactions at once.""" + objs = [cls(**trx) for trx in transactions] + + try: + db.session.bulk_save_objects(objs) + db.session.commit() + except Exception as e: + logger.error(f"Error in bulk insert: {str(e)}") + return None + return objs + 
+ @classmethod + def get_transactions_df(cls, accountids: list[str] = None): + """Return a Pandas DataFrame for ML model preparation.""" + query = db.session.query(cls) + if accountids: + query = query.filter(cls.accountid.in_(accountids)) + rows = query.all() + + + df = pd.DataFrame([{ + "id": trx.id, + "accountid": trx.accountid, + "trx_type": trx.trx_type, + "amount": trx.amount, + "description": trx.description, + "customer_id": trx.customer_id, + "trx_start_date": trx.trx_start_date, + "trx_end_date": trx.trx_end_date, + "is_salary_related": trx.is_salary_related, + "is_consistent_amount": trx.is_consistent_amount, + "is_salary_type": trx.is_salary_type, + } for trx in rows]) + + return df diff --git a/salary_analytics/db_operations.py b/app/models/db_operations.py similarity index 98% rename from salary_analytics/db_operations.py rename to app/models/db_operations.py index 9cb317e..6a20389 100644 --- a/salary_analytics/db_operations.py +++ b/app/models/db_operations.py @@ -2,12 +2,10 @@ Database operations module for salary analytics. 
""" -import logging from sqlalchemy import text -from .config import BATCH_RESULTS_TABLE +from app.config import BATCH_RESULTS_TABLE from datetime import datetime - -logger = logging.getLogger(__name__) +from app.utils.logger import logger class DatabaseOperations: def __init__(self, engine): diff --git a/salary_analytics/app/models.py b/app/models/raw_transaction.py similarity index 91% rename from salary_analytics/app/models.py rename to app/models/raw_transaction.py index 72615f9..a34b7d8 100644 --- a/salary_analytics/app/models.py +++ b/app/models/raw_transaction.py @@ -1,4 +1,4 @@ -from .extensions import db +from app.extensions import db class RawTransaction(db.Model): __tablename__ = 'analytics_raw_transactions' diff --git a/app/models/simbrella_customer.py b/app/models/simbrella_customer.py new file mode 100644 index 0000000..2e16424 --- /dev/null +++ b/app/models/simbrella_customer.py @@ -0,0 +1,36 @@ +from app.extensions import db + +class SimbrellaCustomer(db.Model): + __tablename__ = "simbrella_customers" + + customerid = db.Column(db.String(500), primary_key=True) + acct_opn_date = db.Column(db.DateTime) + gender = db.Column(db.String(20)) + birth_date = db.Column(db.DateTime) + msisdn = db.Column(db.String(100)) + isbvnvalid = db.Column(db.String(100)) + datebvnvalidated = db.Column(db.DateTime) + iscrmsvalid = db.Column(db.String(100)) + datecrmsvalidated = db.Column(db.DateTime) + iscrcvalid = db.Column(db.String(100)) + datecrcvalidated = db.Column(db.DateTime) + + + def __repr__(self): + return f"" + + + def to_dict(self): + return { + "customerid": self.customerid, + "acct_opn_date": self.acct_opn_date, + "gender": self.gender, + "birth_date": self.birth_date, + "msisdn": self.msisdn, + "isbvnvalid": self.isbvnvalid, + "datebvnvalidated": self.datebvnvalidated, + "iscrmsvalid": self.iscrmsvalid, + "datecrmsvalidated": self.datecrmsvalidated, + "iscrcvalid": self.iscrcvalid, + "datecrcvalidated": self.datecrcvalidated, + } \ No newline at end of 
file diff --git a/app/models/temp_simbrella_lien.py b/app/models/temp_simbrella_lien.py new file mode 100644 index 0000000..5449120 --- /dev/null +++ b/app/models/temp_simbrella_lien.py @@ -0,0 +1,29 @@ +from app.extensions import db + + +class TempSimbrellaLien(db.Model): + __tablename__ = "tmp_simbrella_lien" + + lienid = db.Column(db.String(33), primary_key=True) + customerid = db.Column(db.String(500)) + accountid = db.Column(db.String(11)) + lien_start_date = db.Column(db.DateTime) + action = db.Column(db.String(5)) + restriction_nature = db.Column(db.String(150)) + claim_amount = db.Column(db.Numeric(20, 4)) + + + def __repr__(self): + return f"" + + + def to_dict(self): + return { + "lienid": self.lienid, + "customerid": self.customerid, + "accountid": self.accountid, + "lien_start_date": self.lien_start_date, + "action": self.action, + "restriction_nature": self.restriction_nature, + "claim_amount": float(self.claim_amount) if self.claim_amount is not None else None, + } \ No newline at end of file diff --git a/app/models/transaction.py b/app/models/transaction.py new file mode 100644 index 0000000..f2c1de0 --- /dev/null +++ b/app/models/transaction.py @@ -0,0 +1,45 @@ +from app.extensions import db + + +class Transaction(db.Model): + __tablename__ = "transactions" + + tran_id = db.Column(db.String(50), primary_key=True) + cif_id = db.Column(db.String(500)) + foracid = db.Column(db.String(150)) + acid = db.Column(db.String(150)) + tran_date = db.Column(db.DateTime) + value_date = db.Column(db.DateTime) + pstd_date = db.Column(db.DateTime) + tran_sub_type = db.Column(db.String(50)) + part_tran_type = db.Column(db.String(50)) + tran_crncy_code = db.Column(db.String(50)) + tran_amt = db.Column(db.Numeric(38, 0)) + tran_particular = db.Column(db.String(250)) + origination_channel = db.Column(db.String(150)) + reversal_tran_id = db.Column(db.String(50)) + isreversal = db.Column(db.String(50)) + + +def __repr__(self): + return f"" + + +def to_dict(self): + return 
{ + "tran_id": self.tran_id, + "cif_id": self.cif_id, + "foracid": self.foracid, + "acid": self.acid, + "tran_date": self.tran_date, + "value_date": self.value_date, + "pstd_date": self.pstd_date, + "tran_sub_type": self.tran_sub_type, + "part_tran_type": self.part_tran_type, + "tran_crncy_code": self.tran_crncy_code, + "tran_amt": float(self.tran_amt) if self.tran_amt is not None else None, + "tran_particular": self.tran_particular, + "origination_channel": self.origination_channel, + "reversal_tran_id": self.reversal_tran_id, + "isreversal": self.isreversal, + } \ No newline at end of file diff --git a/app/models/transaction_stg.py b/app/models/transaction_stg.py new file mode 100644 index 0000000..9fa1939 --- /dev/null +++ b/app/models/transaction_stg.py @@ -0,0 +1,45 @@ +from app.extensions import db + + +class TransactionStg(db.Model): + __tablename__ = "transaction_stg" + + tran_id = db.Column(db.String(50), primary_key=True) + cif_id = db.Column(db.String(500)) + foracid = db.Column(db.String(150)) + acid = db.Column(db.String(150)) + tran_date = db.Column(db.String(50)) # staging table keeps as string + value_date = db.Column(db.String(50)) + pstd_date = db.Column(db.String(50)) + tran_sub_type = db.Column(db.String(50)) + part_tran_type = db.Column(db.String(50)) + tran_crncy_code = db.Column(db.String(50)) + tran_amt = db.Column(db.Numeric(38, 0)) + tran_particular = db.Column(db.String(250)) + origination_channel = db.Column(db.String(150)) + reversal_tran_id = db.Column(db.String(50)) + isreversal = db.Column(db.String(50)) + + + def __repr__(self): + return f"" + + + def to_dict(self): + return { + "tran_id": self.tran_id, + "cif_id": self.cif_id, + "foracid": self.foracid, + "acid": self.acid, + "tran_date": self.tran_date, + "value_date": self.value_date, + "pstd_date": self.pstd_date, + "tran_sub_type": self.tran_sub_type, + "part_tran_type": self.part_tran_type, + "tran_crncy_code": self.tran_crncy_code, + "tran_amt": float(self.tran_amt) if 
self.tran_amt is not None else None, + "tran_particular": self.tran_particular, + "origination_channel": self.origination_channel, + "reversal_tran_id": self.reversal_tran_id, + "isreversal": self.isreversal, + } \ No newline at end of file diff --git a/app/salary_analytics/__init__.py b/app/salary_analytics/__init__.py new file mode 100644 index 0000000..62f8ac0 --- /dev/null +++ b/app/salary_analytics/__init__.py @@ -0,0 +1,38 @@ +from fastapi import FastAPI +from app.salary_analytics.routes import analysis, reports, pipeline, load, base, train +from app.salary_analytics.middlewares.middleware import add_middlewares +from app.salary_analytics.events.lifecycle import register_events +from app.utils.logger import logger +import socket + +""" +Salary Analytics Package +A package for analyzing and predicting salary patterns from transaction data. +""" + +__version__ = "0.1.0" + + +def create_app() -> FastAPI: + app = FastAPI( + title="Salary Analytics API", + description="API for analyzing and predicting salary patterns from transaction data", + version="1.0.0" + ) + + # Middlewares + add_middlewares(app) + + # Events + register_events(app) + + # Routers + app.include_router(base.router, tags=["Base"]) + app.include_router(analysis.router, prefix="/analyze", tags=["Analysis"]) + app.include_router(reports.router, tags=["Reports"]) + app.include_router(pipeline.router, tags=["Pipeline"]) + app.include_router(load.router, tags=["Data"]) + app.include_router(train.router, tags=["Model Training"]) + + + return app diff --git a/app/salary_analytics/core/state.py b/app/salary_analytics/core/state.py new file mode 100644 index 0000000..4e59c2f --- /dev/null +++ b/app/salary_analytics/core/state.py @@ -0,0 +1,64 @@ +from app.salary_analytics.services.main import SalaryAnalyticsPipeline +from app.salary_analytics.services.data_loader import DataLoader + + +class GlobalState: + def __init__(self): + self._pipeline = None + self._data_loader = None + self.df = None + 
self.salary_predictor = None + self.salary_earner_analyzer = None + + # ---- Pipeline ---- + @property + def pipeline(self): + if self._pipeline is None: + self._pipeline = SalaryAnalyticsPipeline() + return self._pipeline + + @pipeline.setter + def pipeline(self, value): + self._pipeline = value + + # ---- Data Loader ---- + @property + def data_loader(self): + if self._data_loader is None: + self._data_loader = DataLoader() + return self._data_loader + + @data_loader.setter + def data_loader(self, value): + self._data_loader = value + + # ---- DataFrame ---- + @property + def df(self): + return self._df + + @df.setter + def df(self, value): + self._df = value + + # ---- Salary Predictor ---- + @property + def salary_predictor(self): + return self._salary_predictor + + @salary_predictor.setter + def salary_predictor(self, value): + self._salary_predictor = value + + # ---- Salary Earner Analyzer ---- + @property + def salary_earner_analyzer(self): + return self._salary_earner_analyzer + + @salary_earner_analyzer.setter + def salary_earner_analyzer(self, value): + self._salary_earner_analyzer = value + + +state = GlobalState() + diff --git a/app/salary_analytics/events/lifecycle.py b/app/salary_analytics/events/lifecycle.py new file mode 100644 index 0000000..635ab73 --- /dev/null +++ b/app/salary_analytics/events/lifecycle.py @@ -0,0 +1,36 @@ +import socket +from fastapi import FastAPI +from app.salary_analytics.integrations.salary_detect import SalaryDetect +from app.utils.logger import logger + +salary_detect = SalaryDetect() + + +def register_events(app: FastAPI): + @app.on_event("startup") + async def startup_event(): + """Initialize the pipeline on startup.""" + try: + logger.info("Initializing pipeline...") + + # Start autonomous salary detection loop + salary_detect.start() + logger.info("Started autonomous salary detection loop.") + + # Print network information + hostname = socket.gethostname() + ip_address = socket.gethostbyname(hostname) + 
logger.info(f"Server running on hostname: {hostname}") + logger.info(f"Server IP address: {ip_address}") + logger.info(f"Server is accessible at:") + logger.info(f"- http://localhost:8000") + logger.info(f"- http://127.0.0.1:8000") + logger.info(f"- http://{ip_address}:8000") + logger.info("Pipeline initialized successfully") + except Exception as e: + logger.error(f"Error during startup: {str(e)}") + raise + + @app.on_event("shutdown") + async def shutdown_event(): + logger.info("Shutting down Salary Analytics API...") diff --git a/app/salary_analytics/helpers/data_checks.py b/app/salary_analytics/helpers/data_checks.py new file mode 100644 index 0000000..47caa4a --- /dev/null +++ b/app/salary_analytics/helpers/data_checks.py @@ -0,0 +1,12 @@ +from fastapi import HTTPException +from app.salary_analytics.core.state import state + + +def check_data_loaded(): + """Raise HTTP 400 if no data is loaded into the pipeline.""" + if state.pipeline.df is None: + raise HTTPException( + status_code=400, + detail="No data loaded. Please load data first using the /load-data endpoint." 
+ ) + return True diff --git a/app/salary_analytics/helpers/response_helpers.py b/app/salary_analytics/helpers/response_helpers.py new file mode 100644 index 0000000..8485611 --- /dev/null +++ b/app/salary_analytics/helpers/response_helpers.py @@ -0,0 +1,17 @@ +from typing import Optional, Dict, List, Union +from pydantic import BaseModel + + +class AnalysisResponse(BaseModel): + """Response model for analysis endpoints.""" + message: str + data: Optional[Dict] = None + file_path: Optional[str] = None + +class BatchResponse(BaseModel): + """Response model for batch processing.""" + batch_number: int + total_batches: int + processed_rows: int + results_path: str + message: str \ No newline at end of file diff --git a/salary_analytics/rac_check.py b/app/salary_analytics/integrations/rac_check.py similarity index 90% rename from salary_analytics/rac_check.py rename to app/salary_analytics/integrations/rac_check.py index d8f16d3..86c21ff 100644 --- a/salary_analytics/rac_check.py +++ b/app/salary_analytics/integrations/rac_check.py @@ -1,10 +1,8 @@ from django.conf import settings import httpx import json -from salary_analytics.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS -import logging - -logger = logging.getLogger(__name__) +from app.config import SIMBRELLA_BASE_URL, SIMBRELLA_ENDPOINT_RAC_CHECKS +from app.utils.logger import logger class SimbrellaIntegration: BASE_URL = SIMBRELLA_BASE_URL diff --git a/salary_analytics/salary_detect.py b/app/salary_analytics/integrations/salary_detect.py similarity index 85% rename from salary_analytics/salary_detect.py rename to app/salary_analytics/integrations/salary_detect.py index 0824ae2..441dd02 100644 --- a/salary_analytics/salary_detect.py +++ b/app/salary_analytics/integrations/salary_detect.py @@ -1,11 +1,8 @@ import time -import logging import threading import requests -from .config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload - -logging.basicConfig(level=logging.INFO) 
-logger = logging.getLogger(__name__) +from app.config import SALARY_DETECT_URL, SALARY_DETECT_HEADERS, get_random_salary_payload +from app.utils.logger import logger class SalaryDetect: def __init__(self): diff --git a/app/salary_analytics/middlewares/middleware.py b/app/salary_analytics/middlewares/middleware.py new file mode 100644 index 0000000..afd95a3 --- /dev/null +++ b/app/salary_analytics/middlewares/middleware.py @@ -0,0 +1,11 @@ +from fastapi.middleware.cors import CORSMiddleware +from fastapi import FastAPI + +def add_middlewares(app: FastAPI): + app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) diff --git a/app/salary_analytics/routes/analysis.py b/app/salary_analytics/routes/analysis.py new file mode 100644 index 0000000..e6c6688 --- /dev/null +++ b/app/salary_analytics/routes/analysis.py @@ -0,0 +1,75 @@ +from fastapi import APIRouter, HTTPException +from app.salary_analytics.helpers.response_helpers import AnalysisResponse +from app.salary_analytics.helpers.data_checks import check_data_loaded +from app.utils.logger import logger +import time +from app.salary_analytics.core.state import state + +router = APIRouter() + + +@router.post("/keyword", response_model=AnalysisResponse) +async def analyze_keyword(): + """Run keyword-based salary transaction analysis.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting keyword analysis...") + data = state.pipeline.run_keyword_analysis() + logger.info(f"Keyword analysis completed. 
Found {len(data)} matches") + response = AnalysisResponse( + message="Keyword analysis completed successfully", + data={"count": len(data)} + ) + logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in keyword analysis: {str(e)}") + logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + + +@router.post("/consistent-amount", response_model=AnalysisResponse) +async def analyze_consistent_amount(): + """Run consistent amount transaction analysis.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting consistent amount analysis...") + data = state.pipeline.run_consistent_amount_analysis() + logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") + response = AnalysisResponse( + message="Consistent amount analysis completed successfully", + data={"count": len(data)} + ) + logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in consistent amount analysis: {str(e)}") + logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + + + +@router.post("/transaction-type", response_model=AnalysisResponse) +async def analyze_transaction_type(): + """Run transaction type analysis.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting transaction type analysis...") + data = state.pipeline.run_transaction_type_analysis() + logger.info(f"Transaction type analysis completed. 
Found {len(data)} matches") + response = AnalysisResponse( + message="Transaction type analysis completed successfully", + data={"count": len(data)} + ) + logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in transaction type analysis: {str(e)}") + logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/app/salary_analytics/routes/base.py b/app/salary_analytics/routes/base.py new file mode 100644 index 0000000..c1319fb --- /dev/null +++ b/app/salary_analytics/routes/base.py @@ -0,0 +1,28 @@ +from fastapi import APIRouter +from app.utils.logger import logger +import time + + +router = APIRouter() + +@router.get("/") +async def root(): + """Root endpoint.""" + start_time = time.time() + logger.info("Root endpoint accessed") + response = {"message": "Welcome to Salary Analytics API"} + logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds") + return response + + +@router.get("/health") +async def health_check(): + """Health check endpoint.""" + start_time = time.time() + logger.info("Health check endpoint accessed") + response = {"status": "healthy"} + logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds") + return response + + + diff --git a/app/salary_analytics/routes/load.py b/app/salary_analytics/routes/load.py new file mode 100644 index 0000000..ac6802c --- /dev/null +++ b/app/salary_analytics/routes/load.py @@ -0,0 +1,74 @@ +from fastapi import APIRouter, HTTPException, UploadFile, File +from app.salary_analytics.core.state import state +from app.utils.logger import logger +import tempfile, os, time +from typing import Optional + +router = APIRouter() + + + +@router.post("/load-data") +async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): + """ + Load 
data from either database or CSV file. + + Args: + source (str): Source of data ('db' or 'csv') + file (UploadFile, optional): CSV file to load (required if source is 'csv') + + Returns: + dict: Status of data loading + """ + start_time = time.time() + try: + if source not in ['db', 'csv']: + logger.error(f"Invalid source: {source}") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") + + if source == 'csv' and not file: + logger.error("No file provided for CSV source") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") + + if source == 'csv': + # Save uploaded file temporarily + with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: + content = await file.read() + temp_file.write(content) + temp_file_path = temp_file.name + + try: + success = state.pipeline.load_data(source='csv', file_path=temp_file_path) + finally: + # Clean up temporary file + os.unlink(temp_file_path) + else: + success = state.pipeline.load_data(source='db') + + if not success: + logger.error("Failed to load data") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail="Failed to load data") + + response = { + "status": "success", + "message": f"Successfully loaded {len(state.pipeline.df)} rows of data", + "columns": state.pipeline.df.columns.tolist(), + "row_count": len(state.pipeline.df) + } + logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error loading data: {str(e)}") + logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + + 
+@router.post("/load-data-with-file")
+async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)):
+    """Dependency to handle file upload only when source is csv."""
+    if source == 'csv' and not file:
+        raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
+    return file
diff --git a/app/salary_analytics/routes/pipeline.py b/app/salary_analytics/routes/pipeline.py
new file mode 100644
index 0000000..db08518
--- /dev/null
+++ b/app/salary_analytics/routes/pipeline.py
@@ -0,0 +1,296 @@
+from fastapi import APIRouter, HTTPException
+from app.salary_analytics.services.main import SalaryAnalyticsPipeline
+from app.salary_analytics.helpers.response_helpers import AnalysisResponse, BatchResponse
+from app.salary_analytics.helpers.data_checks import check_data_loaded
+from app.salary_analytics.services.data_loader import DataLoader
+from app.salary_analytics.core.state import state
+from app.models.db_operations import DatabaseOperations
+from app.config import OUTPUT_PATHS, TABLE_NAME
+from app.utils.logger import logger
+from typing import Optional, List, Union
+from sqlalchemy import text
+from datetime import datetime
+import pandas as pd, os, tempfile, time
+from typing import Optional, Union
+from fastapi import UploadFile, File
+
+router = APIRouter()
+
+
+@router.post("/run/pipeline", response_model=AnalysisResponse)
+async def run_full_pipeline():
+    """Run the complete salary analytics pipeline on the pipeline held in shared state."""
+    start_time = time.time()
+    try:
+        check_data_loaded()
+        logger.info("Starting full pipeline...")
+        success = state.pipeline.run_full_pipeline()
+        if not success:
+            logger.error("Pipeline failed")
+            logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+            raise HTTPException(status_code=500, detail="Pipeline failed")
+
+        logger.info("Pipeline completed successfully")
+        response = AnalysisResponse(
+            message="Pipeline completed successfully"
+        )
+        logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
+        return response
+    except HTTPException:  # deliberate HTTP errors raised above keep their own status/detail
+        raise
+    except Exception as e:
+        logger.error(f"Error in pipeline: {str(e)}")
+        logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+@router.post("/run/streaming-pipeline", response_model=List[BatchResponse])
+async def run_streaming_pipeline(
+    source: str = "db",
+    batch_size: int = 10000,
+    file: Optional[Union[UploadFile, str]] = File(None)
+):
+    """
+    Run the complete salary analytics pipeline in batches.
+
+    Args:
+        source (str): Source of data ('db' or 'csv')
+        batch_size (int): Number of rows to process in each batch
+        file (UploadFile, optional): CSV file to load (required if source is 'csv')
+
+    Returns:
+        List[BatchResponse]: List of responses for each batch processed
+    """
+    start_time = time.time()
+    try:
+        if source not in ['db', 'csv']:
+            logger.error(f"Invalid source: {source}")
+            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+            raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'")
+
+        if source == 'csv' and not file:
+            logger.error("No file provided for CSV source")
+            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+            raise HTTPException(status_code=400, detail="File must be provided when loading from CSV")
+
+        # Initialize data loader
+        state.data_loader = DataLoader()
+        state.data_loader.chunk_size = batch_size
+
+        # Create output directory for batch results
+        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
+        batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}")
+        os.makedirs(batch_output_dir, exist_ok=True)
+
+        # Initialize database operations
+        if not state.data_loader.connect():
+            logger.error("Failed to connect to database")
+            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+            raise HTTPException(status_code=500, detail="Failed to connect to database")
+
+        db_ops = DatabaseOperations(state.data_loader.engine)
+        if not db_ops.create_batch_results_table():
+            logger.error("Failed to create batch results table")
+            logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+            raise HTTPException(status_code=500, detail="Failed to create batch results table")
+
+        responses = []
+        batch_number = 0
+        batch_start_time = time.time()
+
+        def preprocess_chunk(chunk):
+            """Preprocess a chunk of data with the same logic as DataLoader."""
+            # Convert dates
+            chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date'])
+            chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date'])
+
+            # Rename columns
+            chunk = chunk.rename(columns={
+                'd1': 'trx_type',
+                'd2': 'trx_subtype',
+                'd3': 'initiated_by',
+                'd4': 'customer_id'
+            })
+
+            chunk = chunk.dropna()
+
+            return chunk
+
+        if source == 'csv':
+            # Save uploaded file temporarily
+            with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file:
+                content = await file.read()
+                temp_file.write(content)
+                temp_file_path = temp_file.name
+
+            try:
+                # Process CSV in chunks
+                for chunk in pd.read_csv(temp_file_path, chunksize=batch_size):
+                    batch_number += 1
+                    logger.info(f"Processing batch {batch_number}")
+
+                    # Preprocess chunk
+                    chunk = preprocess_chunk(chunk)
+
+                    # Run pipeline on chunk
+                    state.pipeline = SalaryAnalyticsPipeline()
+                    state.pipeline.df = chunk
+
+                    try:
+                        batch_start_time = time.time()
+                        # Run analyses
+                        state.pipeline.run_keyword_analysis()
+                        state.pipeline.run_consistent_amount_analysis()
+                        state.pipeline.run_transaction_type_analysis()
+
+                        # Generate reports
+                        reports = state.pipeline.generate_salary_earner_reports()
+
+                        # Add batch metadata to results
+                        results_df = reports['final_table'].copy()
+                        results_df['batch_number'] = batch_number
+                        results_df['total_batches'] = -1  # Unknown for CSV
+                        results_df['processed_at'] = datetime.now()
+
+                        # Save batch results to CSV
+                        batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
+                        results_df.to_csv(batch_results_path, index=False)
+
+                        # Save to database
+                        db_ops.save_batch_to_db(
+                            batch_number=batch_number,
+                            total_batches=-1,  # Unknown for CSV
+                            results_df=results_df,
+                            status="success"
+                        )
+
+                        logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds")
+
+                        responses.append(BatchResponse(
+                            batch_number=batch_number,
+                            total_batches=-1,  # Unknown for CSV
+                            processed_rows=len(chunk),
+                            results_path=batch_results_path,
+                            message=f"Successfully processed batch {batch_number}"
+                        ))
+                    except Exception as e:
+                        error_message = str(e)
+                        logger.error(f"Error processing batch {batch_number}: {error_message}")
+
+                        # Save error to database
+                        db_ops.save_batch_to_db(
+                            batch_number=batch_number,
+                            total_batches=-1,
+                            results_df=pd.DataFrame(),  # Empty DataFrame for error case
+                            status="error"
+                        )
+
+                        responses.append(BatchResponse(
+                            batch_number=batch_number,
+                            total_batches=-1,
+                            processed_rows=len(chunk),
+                            results_path="",
+                            message=f"Error processing batch {batch_number}: {error_message}"
+                        ))
+            finally:
+                # Clean up temporary file
+                os.unlink(temp_file_path)
+        else:
+            # Process database in chunks
+            if not state.data_loader.connect():
+                raise HTTPException(status_code=500, detail="Failed to connect to database")
+
+            # Get total row count
+            with state.data_loader.engine.connect() as conn:
+                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
+                total_rows = conn.execute(count_query).scalar()
+
+            total_batches = (total_rows + batch_size - 1) // batch_size
+            offset = 0
+
+            while offset < total_rows:
+                batch_number += 1
+                logger.info(f"Processing batch {batch_number} of {total_batches}")
+
+                # Load chunk from database (NOTE(review): no ORDER BY, so LIMIT/OFFSET page contents are not guaranteed stable)
+                query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}"
+                chunk = pd.read_sql(query, state.data_loader.engine)
+
+                if chunk.empty:
+                    break
+
+                # Preprocess chunk
+                chunk = preprocess_chunk(chunk)
+
+                # Run pipeline on chunk (fresh pipeline stored on shared state, same as the CSV branch)
+                state.pipeline = SalaryAnalyticsPipeline()
+                state.pipeline.df = chunk
+
+                try:
+                    batch_start_time = time.time()
+                    # Run analyses
+                    state.pipeline.run_keyword_analysis()
+                    state.pipeline.run_consistent_amount_analysis()
+                    state.pipeline.run_transaction_type_analysis()
+
+                    # Generate reports
+                    reports = state.pipeline.generate_salary_earner_reports()
+
+                    # Add batch metadata to results
+                    results_df = reports['final_table'].copy()
+                    results_df['batch_number'] = batch_number
+                    results_df['total_batches'] = total_batches
+                    results_df['processed_at'] = datetime.now()
+
+                    # Save batch results to CSV
+                    batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv")
+                    results_df.to_csv(batch_results_path, index=False)
+
+                    # Save to database
+                    db_ops.save_batch_to_db(
+                        batch_number=batch_number,
+                        total_batches=total_batches,
+                        results_df=results_df,
+                        status="success"
+                    )
+
+                    logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds")
+
+                    responses.append(BatchResponse(
+                        batch_number=batch_number,
+                        total_batches=total_batches,
+                        processed_rows=len(chunk),
+                        results_path=batch_results_path,
+                        message=f"Successfully processed batch {batch_number} of {total_batches}"
+                    ))
+                except Exception as e:
+                    error_message = str(e)
+                    logger.error(f"Error processing batch {batch_number}: {error_message}")
+
+                    # Save error to database
+                    db_ops.save_batch_to_db(
+                        batch_number=batch_number,
+                        total_batches=total_batches,
+                        results_df=pd.DataFrame(),  # Empty DataFrame for error case
+                        status="error"
+                    )
+
+                    responses.append(BatchResponse(
+                        batch_number=batch_number,
+                        total_batches=total_batches,
+                        processed_rows=len(chunk),
+                        results_path="",
+                        message=f"Error processing batch {batch_number}: {error_message}"
+                    ))
+
+                offset += batch_size
+
+        logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds")
+        return responses
+    except HTTPException:  # keep the 400/500 responses raised above instead of rewrapping them as generic 500s
+        raise
+    except Exception as e:
+        logger.error(f"Error in streaming pipeline: {str(e)}")
+        logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds")
+        raise HTTPException(status_code=500, detail=str(e))
+    
\ No newline at end of file
diff --git a/app/salary_analytics/routes/reports.py b/app/salary_analytics/routes/reports.py
new file mode 100644
index 0000000..24f9477
--- /dev/null
+++ b/app/salary_analytics/routes/reports.py
@@ -0,0 +1,80 @@
+from fastapi import APIRouter, HTTPException, BackgroundTasks
+from fastapi.responses import FileResponse
+from app.salary_analytics.helpers.response_helpers import AnalysisResponse
+from app.salary_analytics.helpers.data_checks import check_data_loaded
+from app.salary_analytics.core.state import state
+from app.config import OUTPUT_PATHS
+from app.utils.logger import logger
+import os, time
+
+router = APIRouter()
+
+
+@router.post("/generate/reports", response_model=AnalysisResponse)
+async def generate_reports(background_tasks: BackgroundTasks):
+    """Generate salary earner reports and return the counts of each report category."""
+    start_time = time.time()
+    try:
+        check_data_loaded()
+        logger.info("Starting report generation...")
+        reports = state.pipeline.generate_salary_earner_reports()
+        logger.info("Reports generated successfully")
+        response = AnalysisResponse(
+            message="Reports generated successfully",
+            data={
+                "verified_salary_earners": len(reports['final_table']),
+                "likely_salary_earners": len(reports['likely_salary_earner']),
+                "high_earners": reports['total_high_earners']
+            }
+        )
+        logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds")
+        return response
+    except HTTPException:  # let an HTTP error from check_data_loaded() reach the client unchanged
+        raise
+    except Exception as e:
+        logger.error(f"Error in report generation: {str(e)}")
+        logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds")
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+
+@router.get("/download/{report_type}") +async def download_report(report_type: str): + """Download generated reports.""" + start_time = time.time() + try: + check_data_loaded() + logger.info(f"Attempting to download report: {report_type}") + file_paths = { + "high_earners": OUTPUT_PATHS["high_earner_details"], + "likely_earners": OUTPUT_PATHS["likely_salary_earner"], + "final_table": OUTPUT_PATHS["final_table"], + "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"], + "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"], + "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"] + } + + if report_type not in file_paths: + logger.error(f"Report type not found: {report_type}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=404, detail="Report type not found") + + file_path = file_paths[report_type] + if not os.path.exists(file_path): + logger.error(f"Report file not found: {file_path}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=404, detail="Report file not found") + + logger.info(f"Successfully found report file: {file_path}") + response = FileResponse( + path=file_path, + filename=os.path.basename(file_path), + media_type="application/octet-stream" + ) + logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error downloading report: {str(e)}") + logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/app/salary_analytics/routes/train.py b/app/salary_analytics/routes/train.py new file mode 100644 index 0000000..94320cb --- /dev/null +++ b/app/salary_analytics/routes/train.py @@ -0,0 +1,33 @@ +import time +import logging +from fastapi import APIRouter, HTTPException +from app.salary_analytics.services.main 
import SalaryAnalyticsPipeline +from app.salary_analytics.helpers.data_checks import check_data_loaded +from app.salary_analytics.helpers.response_helpers import AnalysisResponse +from app.salary_analytics.core.state import state +from app.utils.logger import logger + + + +router = APIRouter() + + +@router.post("/train/models", response_model=AnalysisResponse) +async def train_models(): + """Train salary prediction models.""" + start_time = time.time() + try: + check_data_loaded() + logger.info("Starting model training...") + state.pipeline.train_salary_prediction_models() + logger.info("Models trained successfully") + response = AnalysisResponse( + message="Models trained successfully" + ) + logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds") + return response + except Exception as e: + logger.error(f"Error in model training: {str(e)}") + logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds") + raise HTTPException(status_code=500, detail=str(e)) + diff --git a/app/salary_analytics/services/__init__.py b/app/salary_analytics/services/__init__.py new file mode 100644 index 0000000..0f844f8 --- /dev/null +++ b/app/salary_analytics/services/__init__.py @@ -0,0 +1,24 @@ +from .main import SalaryAnalyticsPipeline +from .data_loader import DataLoader +from .keyword_analyzer import KeywordAnalyzer +from .consistent_amount_analyzer import ConsistentAmountAnalyzer +from .transaction_type_analyzer import TransactionTypeAnalyzer +from .salary_earner_analyzer import SalaryEarnerAnalyzer +from .salary_predictor import SalaryPredictor + + +""" +Salary Analytics Package +A package for analyzing and predicting salary patterns from transaction data. 
+""" + +__version__ = "0.1.0" +__all__ = [ + "SalaryAnalyticsPipeline", + "DataLoader", + "KeywordAnalyzer", + "ConsistentAmountAnalyzer", + "TransactionTypeAnalyzer", + "SalaryEarnerAnalyzer", + "SalaryPredictor" +] diff --git a/salary_analytics/consistent_amount_analyzer.py b/app/salary_analytics/services/consistent_amount_analyzer.py similarity index 97% rename from salary_analytics/consistent_amount_analyzer.py rename to app/salary_analytics/services/consistent_amount_analyzer.py index 3b74afc..119adae 100644 --- a/salary_analytics/consistent_amount_analyzer.py +++ b/app/salary_analytics/services/consistent_amount_analyzer.py @@ -3,7 +3,7 @@ Consistent amount transaction analysis module. """ import pandas as pd -from .config import MODEL_CONFIG +from app.config import MODEL_CONFIG class ConsistentAmountAnalyzer: def __init__(self, df): diff --git a/salary_analytics/data_loader.py b/app/salary_analytics/services/data_loader.py similarity index 95% rename from salary_analytics/data_loader.py rename to app/salary_analytics/services/data_loader.py index ec2da46..406608c 100644 --- a/salary_analytics/data_loader.py +++ b/app/salary_analytics/services/data_loader.py @@ -7,9 +7,8 @@ import pandas as pd from datetime import datetime import logging import os -from .config import DB_CONFIG, TABLE_NAME - -logger = logging.getLogger(__name__) +from app.config import SQLALCHEMY_DATABASE_URI, TABLE_NAME +from app.utils.logger import logger class DataLoader: def __init__(self): @@ -21,8 +20,7 @@ class DataLoader: """Establish database connection.""" try: logger.info("Attempting to connect to database...") - DATABASE_URL = f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}" - self.engine = create_engine(DATABASE_URL) + self.engine = create_engine(SQLALCHEMY_DATABASE_URI) with self.engine.connect() as conn: # First check if table exists check_table = text(f"SELECT EXISTS (SELECT FROM information_schema.tables 
WHERE table_name = '{TABLE_NAME}')") diff --git a/salary_analytics/keyword_analyzer.py b/app/salary_analytics/services/keyword_analyzer.py similarity index 96% rename from salary_analytics/keyword_analyzer.py rename to app/salary_analytics/services/keyword_analyzer.py index 5c45b0f..b9db479 100644 --- a/salary_analytics/keyword_analyzer.py +++ b/app/salary_analytics/services/keyword_analyzer.py @@ -4,7 +4,7 @@ Keyword-based salary transaction analysis module. import re import pandas as pd -from .config import SALARY_KEYWORDS +from app.config import SALARY_KEYWORDS class KeywordAnalyzer: def __init__(self, df): diff --git a/salary_analytics/main.py b/app/salary_analytics/services/main.py similarity index 99% rename from salary_analytics/main.py rename to app/salary_analytics/services/main.py index e2781e8..87482c7 100644 --- a/salary_analytics/main.py +++ b/app/salary_analytics/services/main.py @@ -9,8 +9,7 @@ from .consistent_amount_analyzer import ConsistentAmountAnalyzer from .transaction_type_analyzer import TransactionTypeAnalyzer from .salary_earner_analyzer import SalaryEarnerAnalyzer from .salary_predictor import SalaryPredictor - -logger = logging.getLogger(__name__) +from app.utils.logger import logger class SalaryAnalyticsPipeline: def __init__(self): diff --git a/salary_analytics/salary_earner_analyzer.py b/app/salary_analytics/services/salary_earner_analyzer.py similarity index 96% rename from salary_analytics/salary_earner_analyzer.py rename to app/salary_analytics/services/salary_earner_analyzer.py index b32c995..7302cbc 100644 --- a/salary_analytics/salary_earner_analyzer.py +++ b/app/salary_analytics/services/salary_earner_analyzer.py @@ -6,15 +6,8 @@ import pandas as pd import matplotlib.pyplot as plt from matplotlib_venn import venn3 from datetime import datetime, timedelta -import logging -from .config import MODEL_CONFIG, OUTPUT_PATHS - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - 
%(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) +from app.config import MODEL_CONFIG, OUTPUT_PATHS +from app.utils.logger import logger class SalaryEarnerAnalyzer: def __init__(self, df): diff --git a/salary_analytics/salary_predictor.py b/app/salary_analytics/services/salary_predictor.py similarity index 99% rename from salary_analytics/salary_predictor.py rename to app/salary_analytics/services/salary_predictor.py index b74dfd5..19fc531 100644 --- a/salary_analytics/salary_predictor.py +++ b/app/salary_analytics/services/salary_predictor.py @@ -9,7 +9,7 @@ from sklearn.preprocessing import StandardScaler, OneHotEncoder from sklearn.ensemble import RandomForestRegressor from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score from joblib import dump -from .config import OUTPUT_PATHS +from app.config import OUTPUT_PATHS class SalaryPredictor: def __init__(self, df): diff --git a/salary_analytics/transaction_type_analyzer.py b/app/salary_analytics/services/transaction_type_analyzer.py similarity index 93% rename from salary_analytics/transaction_type_analyzer.py rename to app/salary_analytics/services/transaction_type_analyzer.py index 2d1249c..0597aea 100644 --- a/salary_analytics/transaction_type_analyzer.py +++ b/app/salary_analytics/services/transaction_type_analyzer.py @@ -3,7 +3,7 @@ Transaction type analysis module. 
""" import pandas as pd -from .config import MODEL_CONFIG +from app.config import MODEL_CONFIG class TransactionTypeAnalyzer: def __init__(self, df): diff --git a/app/utils/logger.py b/app/utils/logger.py new file mode 100644 index 0000000..4ee0e98 --- /dev/null +++ b/app/utils/logger.py @@ -0,0 +1,13 @@ +import logging + +# Configure logging +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s - %(levelname)s - %(message)s", + handlers=[ + # logging.StreamHandler(), + logging.FileHandler("app.log", mode='a') # Log to file + ] +) + +logger = logging.getLogger("DetectionService") diff --git a/docker-compose.yml b/docker-compose.yml index b5a05e3..9205cb3 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,16 +1,17 @@ services: digifi-analytics: build: . + env_file: + - .env ports: - "${APP_PORT:-4800}:8000" - volumes: - - ./output:/app/output environment: - - DB_USER=salaryloan - - DB_PASSWORD=salaryloan - - DB_NAME=salaryloan - - DB_PORT=10532 - - DB_HOST=dev-data.simbrellang.net + - FLASK_APP=${FLASK_APP} + - FLASK_ENV=${FLASK_ENV} + - DATABASE_URL=postgresql+psycopg2://${DATABASE_USER}:${DATABASE_PASSWORD}@${DATABASE_HOST}:${DATABASE_PORT}/${DATABASE_NAME} + volumes: + - .:/app + - ./output:/app/output restart: unless-stopped networks: - salary_network diff --git a/migrations/env.py b/migrations/env.py index 79a3e8d..fa86225 100644 --- a/migrations/env.py +++ b/migrations/env.py @@ -19,7 +19,7 @@ if config.config_file_name is not None: # from myapp import Base # target_metadata = Base.metadata from flask import current_app -from salary_analytics.app.extensions import db +from app.extensions import db config.set_main_option('sqlalchemy.url', current_app.config.get('SQLALCHEMY_DATABASE_URI')) diff --git a/requirements.txt b/requirements.txt index 68fa9fc..461ceb2 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,4 +1,8 @@ +# Database and ORM sqlalchemy>=2.0.0 +oracledb>=1.0.0 + + pandas>=1.5.0 numpy>=1.21.0 matplotlib>=3.5.0 @@ 
-17,4 +21,6 @@ openpyxl>=3.0.10 Flask>=2.0.0 Flask-SQLAlchemy>=3.0.0 Flask-Migrate>=4.0.0 -alembic>=1.8.0 \ No newline at end of file +alembic>=1.8.0 +requests>=2.26.0 +gunicorn \ No newline at end of file diff --git a/run.py b/run.py index 77e5bde..c4932c4 100644 --- a/run.py +++ b/run.py @@ -1,4 +1,4 @@ import os -from salary_analytics.app import create_app +from app.salary_analytics import create_app app = create_app() \ No newline at end of file diff --git a/salary_analytics/__init__.py b/salary_analytics/__init__.py deleted file mode 100644 index 1825412..0000000 --- a/salary_analytics/__init__.py +++ /dev/null @@ -1,6 +0,0 @@ -""" -Salary Analytics Package -A package for analyzing and predicting salary patterns from transaction data. -""" - -__version__ = "0.1.0" \ No newline at end of file diff --git a/salary_analytics/api.py b/salary_analytics/api.py deleted file mode 100644 index cb41a30..0000000 --- a/salary_analytics/api.py +++ /dev/null @@ -1,605 +0,0 @@ -""" -FastAPI application for salary analytics. 
-""" - -from fastapi import FastAPI, HTTPException, BackgroundTasks, UploadFile, File, Depends -from fastapi.responses import FileResponse -from fastapi.middleware.cors import CORSMiddleware -from pydantic import BaseModel -from typing import Optional, Dict, List, Union -import os -import socket -import logging -import pandas as pd -import tempfile -from datetime import datetime -from sqlalchemy import text, Table, Column, Integer, String, Float, DateTime, MetaData -import numpy as np -import warnings -import time -from .main import SalaryAnalyticsPipeline -from .config import OUTPUT_PATHS, TABLE_NAME, BATCH_RESULTS_TABLE -from .data_loader import DataLoader -from .salary_predictor import SalaryPredictor -from .salary_earner_analyzer import SalaryEarnerAnalyzer -from .db_operations import DatabaseOperations -from .salary_detect import SalaryDetect - -# Configure logging -logging.basicConfig( - level=logging.INFO, - format='%(asctime)s - %(name)s - %(levelname)s - %(message)s' -) -logger = logging.getLogger(__name__) - -# Suppress warnings -warnings.filterwarnings('ignore', category=RuntimeWarning, module='numpy') -pd.options.mode.chained_assignment = None - -app = FastAPI( - title="Salary Analytics API", - description="API for analyzing and predicting salary patterns from transaction data", - version="1.0.0" -) - -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Allows all origins - allow_credentials=True, - allow_methods=["*"], # Allows all methods - allow_headers=["*"], # Allows all headers -) - -# Global pipeline instance -pipeline = SalaryAnalyticsPipeline() - -# Global variables to store loaded data and models -data_loader = None -df = None -salary_predictor = None -salary_earner_analyzer = None - -salary_detect = SalaryDetect() - -class AnalysisResponse(BaseModel): - """Response model for analysis endpoints.""" - message: str - data: Optional[Dict] = None - file_path: Optional[str] = None - -class BatchResponse(BaseModel): 
- """Response model for batch processing.""" - batch_number: int - total_batches: int - processed_rows: int - results_path: str - message: str - -def check_data_loaded(): - """Check if data is loaded before running analytics.""" - if pipeline.df is None: - raise HTTPException( - status_code=400, - detail="No data loaded. Please load data first using the /load-data endpoint." - ) - -@app.on_event("startup") -async def startup_event(): - """Initialize the pipeline on startup.""" - try: - logger.info("Initializing pipeline...") - - # Start autonomous salary detection loop - salary_detect.start() - logger.info("Started autonomous salary detection loop.") - - # Print network information - hostname = socket.gethostname() - ip_address = socket.gethostbyname(hostname) - logger.info(f"Server running on hostname: {hostname}") - logger.info(f"Server IP address: {ip_address}") - logger.info(f"Server is accessible at:") - logger.info(f"- http://localhost:8000") - logger.info(f"- http://127.0.0.1:8000") - logger.info(f"- http://{ip_address}:8000") - logger.info("Pipeline initialized successfully") - except Exception as e: - logger.error(f"Error during startup: {str(e)}") - raise - -@app.get("/") -async def root(): - """Root endpoint.""" - start_time = time.time() - logger.info("Root endpoint accessed") - response = {"message": "Welcome to Salary Analytics API"} - logger.info(f"Root endpoint completed in {time.time() - start_time:.2f} seconds") - return response - -@app.get("/health") -async def health_check(): - """Health check endpoint.""" - start_time = time.time() - logger.info("Health check endpoint accessed") - response = {"status": "healthy"} - logger.info(f"Health check completed in {time.time() - start_time:.2f} seconds") - return response - -@app.post("/analyze/keyword", response_model=AnalysisResponse) -async def analyze_keyword(): - """Run keyword-based salary transaction analysis.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting 
keyword analysis...") - data = pipeline.run_keyword_analysis() - logger.info(f"Keyword analysis completed. Found {len(data)} matches") - response = AnalysisResponse( - message="Keyword analysis completed successfully", - data={"count": len(data)} - ) - logger.info(f"Keyword analysis endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in keyword analysis: {str(e)}") - logger.info(f"Keyword analysis endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/analyze/consistent-amount", response_model=AnalysisResponse) -async def analyze_consistent_amount(): - """Run consistent amount transaction analysis.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting consistent amount analysis...") - data = pipeline.run_consistent_amount_analysis() - logger.info(f"Consistent amount analysis completed. Found {len(data)} matches") - response = AnalysisResponse( - message="Consistent amount analysis completed successfully", - data={"count": len(data)} - ) - logger.info(f"Consistent amount analysis endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in consistent amount analysis: {str(e)}") - logger.info(f"Consistent amount analysis endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/analyze/transaction-type", response_model=AnalysisResponse) -async def analyze_transaction_type(): - """Run transaction type analysis.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting transaction type analysis...") - data = pipeline.run_transaction_type_analysis() - logger.info(f"Transaction type analysis completed. 
Found {len(data)} matches") - response = AnalysisResponse( - message="Transaction type analysis completed successfully", - data={"count": len(data)} - ) - logger.info(f"Transaction type analysis endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in transaction type analysis: {str(e)}") - logger.info(f"Transaction type analysis endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/generate/reports", response_model=AnalysisResponse) -async def generate_reports(background_tasks: BackgroundTasks): - """Generate salary earner reports.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting report generation...") - reports = pipeline.generate_salary_earner_reports() - logger.info("Reports generated successfully") - response = AnalysisResponse( - message="Reports generated successfully", - data={ - "verified_salary_earners": len(reports['final_table']), - "likely_salary_earners": len(reports['likely_salary_earner']), - "high_earners": reports['total_high_earners'] - } - ) - logger.info(f"Report generation endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in report generation: {str(e)}") - logger.info(f"Report generation endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/train/models", response_model=AnalysisResponse) -async def train_models(): - """Train salary prediction models.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting model training...") - pipeline.train_salary_prediction_models() - logger.info("Models trained successfully") - response = AnalysisResponse( - message="Models trained successfully" - ) - logger.info(f"Model training endpoint completed in {time.time() - start_time:.2f} seconds") - 
return response - except Exception as e: - logger.error(f"Error in model training: {str(e)}") - logger.info(f"Model training endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.get("/download/{report_type}") -async def download_report(report_type: str): - """Download generated reports.""" - start_time = time.time() - try: - check_data_loaded() - logger.info(f"Attempting to download report: {report_type}") - file_paths = { - "high_earners": OUTPUT_PATHS["high_earner_details"], - "likely_earners": OUTPUT_PATHS["likely_salary_earner"], - "final_table": OUTPUT_PATHS["final_table"], - "consistent_plot": OUTPUT_PATHS["consistent_earners_plot"], - "inconsistent_plot": OUTPUT_PATHS["inconsistent_earners_plot"], - "hypothesis_plot": OUTPUT_PATHS["hypothesis_overlap_plot"] - } - - if report_type not in file_paths: - logger.error(f"Report type not found: {report_type}") - logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=404, detail="Report type not found") - - file_path = file_paths[report_type] - if not os.path.exists(file_path): - logger.error(f"Report file not found: {file_path}") - logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=404, detail="Report file not found") - - logger.info(f"Successfully found report file: {file_path}") - response = FileResponse( - path=file_path, - filename=os.path.basename(file_path), - media_type="application/octet-stream" - ) - logger.info(f"Download endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error downloading report: {str(e)}") - logger.info(f"Download endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/run/pipeline", response_model=AnalysisResponse) -async def 
run_full_pipeline(): - """Run the complete salary analytics pipeline.""" - start_time = time.time() - try: - check_data_loaded() - logger.info("Starting full pipeline...") - success = pipeline.run_full_pipeline() - if not success: - logger.error("Pipeline failed") - logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Pipeline failed") - - logger.info("Pipeline completed successfully") - response = AnalysisResponse( - message="Pipeline completed successfully" - ) - logger.info(f"Full pipeline endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error in pipeline: {str(e)}") - logger.info(f"Full pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -@app.post("/load-data") -async def load_data(source: str = "db", file: Optional[UploadFile] = File(None)): - """ - Load data from either database or CSV file. 
- - Args: - source (str): Source of data ('db' or 'csv') - file (UploadFile, optional): CSV file to load (required if source is 'csv') - - Returns: - dict: Status of data loading - """ - start_time = time.time() - try: - if source not in ['db', 'csv']: - logger.error(f"Invalid source: {source}") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - - if source == 'csv' and not file: - logger.error("No file provided for CSV source") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - - if source == 'csv': - # Save uploaded file temporarily - with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: - content = await file.read() - temp_file.write(content) - temp_file_path = temp_file.name - - try: - success = pipeline.load_data(source='csv', file_path=temp_file_path) - finally: - # Clean up temporary file - os.unlink(temp_file_path) - else: - success = pipeline.load_data(source='db') - - if not success: - logger.error("Failed to load data") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Failed to load data") - - response = { - "status": "success", - "message": f"Successfully loaded {len(pipeline.df)} rows of data", - "columns": pipeline.df.columns.tolist(), - "row_count": len(pipeline.df) - } - logger.info(f"Load data endpoint completed in {time.time() - start_time:.2f} seconds") - return response - except Exception as e: - logger.error(f"Error loading data: {str(e)}") - logger.info(f"Load data endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) - -async def get_file_if_csv(source: str, file: Optional[UploadFile] = File(None)): - """Dependency to 
handle file upload only when source is csv.""" - if source == 'csv' and not file: - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - return file - -@app.post("/run/streaming-pipeline", response_model=List[BatchResponse]) -async def run_streaming_pipeline( - source: str = "db", - batch_size: int = 10000, - file: Optional[Union[UploadFile, str]] = File(None) -): - """ - Run the complete salary analytics pipeline in batches. - - Args: - source (str): Source of data ('db' or 'csv') - batch_size (int): Number of rows to process in each batch - file (UploadFile, optional): CSV file to load (required if source is 'csv') - - Returns: - List[BatchResponse]: List of responses for each batch processed - """ - start_time = time.time() - try: - if source not in ['db', 'csv']: - logger.error(f"Invalid source: {source}") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="Source must be either 'db' or 'csv'") - - if source == 'csv' and not file: - logger.error("No file provided for CSV source") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=400, detail="File must be provided when loading from CSV") - - # Initialize data loader - data_loader = DataLoader() - data_loader.chunk_size = batch_size - - # Create output directory for batch results - timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - batch_output_dir = os.path.join(os.path.dirname(OUTPUT_PATHS['final_table']), f"batch_results_{timestamp}") - os.makedirs(batch_output_dir, exist_ok=True) - - # Initialize database operations - if not data_loader.connect(): - logger.error("Failed to connect to database") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Failed to connect to database") - - db_ops = 
DatabaseOperations(data_loader.engine) - if not db_ops.create_batch_results_table(): - logger.error("Failed to create batch results table") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - start_time:.2f} seconds") - raise HTTPException(status_code=500, detail="Failed to create batch results table") - - responses = [] - batch_number = 0 - batch_start_time = time.time() - - def preprocess_chunk(chunk): - """Preprocess a chunk of data with the same logic as DataLoader.""" - # Convert dates - chunk['trx_start_date'] = pd.to_datetime(chunk['trx_start_date']) - chunk['trx_end_date'] = pd.to_datetime(chunk['trx_end_date']) - - # Rename columns - chunk = chunk.rename(columns={ - 'd1': 'trx_type', - 'd2': 'trx_subtype', - 'd3': 'initiated_by', - 'd4': 'customer_id' - }) - - chunk = chunk.dropna() - - return chunk - - if source == 'csv': - # Save uploaded file temporarily - with tempfile.NamedTemporaryFile(delete=False, suffix='.csv') as temp_file: - content = await file.read() - temp_file.write(content) - temp_file_path = temp_file.name - - try: - # Process CSV in chunks - for chunk in pd.read_csv(temp_file_path, chunksize=batch_size): - batch_number += 1 - logger.info(f"Processing batch {batch_number}") - - # Preprocess chunk - chunk = preprocess_chunk(chunk) - - # Run pipeline on chunk - pipeline = SalaryAnalyticsPipeline() - pipeline.df = chunk - - try: - batch_start_time = time.time() - # Run analyses - pipeline.run_keyword_analysis() - pipeline.run_consistent_amount_analysis() - pipeline.run_transaction_type_analysis() - - # Generate reports - reports = pipeline.generate_salary_earner_reports() - - # Add batch metadata to results - results_df = reports['final_table'].copy() - results_df['batch_number'] = batch_number - results_df['total_batches'] = -1 # Unknown for CSV - results_df['processed_at'] = datetime.now() - - # Save batch results to CSV - batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") - 
results_df.to_csv(batch_results_path, index=False) - - # Save to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=-1, # Unknown for CSV - results_df=results_df, - status="success" - ) - - logger.info(f"Batch {batch_number} processed in {time.time() - batch_start_time:.2f} seconds") - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=-1, # Unknown for CSV - processed_rows=len(chunk), - results_path=batch_results_path, - message=f"Successfully processed batch {batch_number}" - )) - except Exception as e: - error_message = str(e) - logger.error(f"Error processing batch {batch_number}: {error_message}") - - # Save error to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=-1, - results_df=pd.DataFrame(), # Empty DataFrame for error case - status="error" - ) - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=-1, - processed_rows=len(chunk), - results_path="", - message=f"Error processing batch {batch_number}: {error_message}" - )) - finally: - # Clean up temporary file - os.unlink(temp_file_path) - else: - # Process database in chunks - if not data_loader.connect(): - raise HTTPException(status_code=500, detail="Failed to connect to database") - - # Get total row count - with data_loader.engine.connect() as conn: - count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}") - total_rows = conn.execute(count_query).scalar() - - total_batches = (total_rows + batch_size - 1) // batch_size - offset = 0 - - while offset < total_rows: - batch_number += 1 - logger.info(f"Processing batch {batch_number} of {total_batches}") - - # Load chunk from database - query = f"SELECT * FROM {TABLE_NAME} LIMIT {batch_size} OFFSET {offset}" - chunk = pd.read_sql(query, data_loader.engine) - - if chunk.empty: - break - - # Preprocess chunk - chunk = preprocess_chunk(chunk) - - # Run pipeline on chunk - pipeline = SalaryAnalyticsPipeline() - pipeline.df = chunk - - try: - 
batch_start_time = time.time() - # Run analyses - pipeline.run_keyword_analysis() - pipeline.run_consistent_amount_analysis() - pipeline.run_transaction_type_analysis() - - # Generate reports - reports = pipeline.generate_salary_earner_reports() - - # Add batch metadata to results - results_df = reports['final_table'].copy() - results_df['batch_number'] = batch_number - results_df['total_batches'] = total_batches - results_df['processed_at'] = datetime.now() - - # Save batch results to CSV - batch_results_path = os.path.join(batch_output_dir, f"batch_{batch_number}_results.csv") - results_df.to_csv(batch_results_path, index=False) - - # Save to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=total_batches, - results_df=results_df, - status="success" - ) - - logger.info(f"Batch {batch_number} of {total_batches} processed in {time.time() - batch_start_time:.2f} seconds") - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=total_batches, - processed_rows=len(chunk), - results_path=batch_results_path, - message=f"Successfully processed batch {batch_number} of {total_batches}" - )) - except Exception as e: - error_message = str(e) - logger.error(f"Error processing batch {batch_number}: {error_message}") - - # Save error to database - db_ops.save_batch_to_db( - batch_number=batch_number, - total_batches=total_batches, - results_df=pd.DataFrame(), # Empty DataFrame for error case - status="error" - ) - - responses.append(BatchResponse( - batch_number=batch_number, - total_batches=total_batches, - processed_rows=len(chunk), - results_path="", - message=f"Error processing batch {batch_number}: {error_message}" - )) - - offset += batch_size - - logger.info(f"Streaming pipeline endpoint completed in {time.time() - start_time:.2f} seconds") - return responses - except Exception as e: - logger.error(f"Error in streaming pipeline: {str(e)}") - logger.info(f"Streaming pipeline endpoint failed after {time.time() - 
start_time:.2f} seconds") - raise HTTPException(status_code=500, detail=str(e)) \ No newline at end of file diff --git a/wsgi.py b/wsgi.py new file mode 100644 index 0000000..49f70e1 --- /dev/null +++ b/wsgi.py @@ -0,0 +1,7 @@ +from app import create_app + +app = create_app() + +if __name__ != "__main__": + # Expose WSGI app instance for Gunicorn + wsgi_app = app