Enhance XLS upload functionality and update requirements. Added Flask, Flask-SQLAlchemy, and Alembic to requirements. Modified database schema in upload_xls.py for improved data handling and added SQLAlchemy configuration in config.py.

This commit is contained in:
2025-06-09 15:34:18 +01:00
parent f478a52a2f
commit c00bb71d2a
14 changed files with 427 additions and 33 deletions
Binary file not shown.
+36 -32
View File
@@ -32,23 +32,21 @@ def create_table_if_not_exists(conn):
cur.execute("""
CREATE TABLE IF NOT EXISTS analytics_raw_transactions (
id SERIAL PRIMARY KEY,
cif_id TEXT,
acid TEXT,
ref_num TEXT,
entry_usr TEXT,
tran_id TEXT,
tran_date TIMESTAMP NULL,
value_date TIMESTAMP NULL,
cust_id VARCHAR(10),
accountid VARCHAR(10),
tran_id VARCHAR(12),
entry_date TIMESTAMP NULL,
value_date TIMESTAMP NULL,
pstd_date TIMESTAMP NULL,
tran_subtype TEXT,
part_tran_type TEXT,
isreverse TEXT,
reverse TEXT,
tran_particular TEXT,
channel TEXT,
amount DECIMAL(20,2),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
tran_date TIMESTAMP NULL,
tran_sub_ty VARCHAR(4),
part_tran_ty VARCHAR(4),
channel VARCHAR(32),
tran_amt DECIMAL(20,2),
balance DECIMAL(20,2),
isreverse VARCHAR(4),
reverse VARCHAR(4),
tran_particular VARCHAR(100)
)
""")
conn.commit()
@@ -59,14 +57,21 @@ def upload_xls_to_db(xls_path):
# Read XLS file
df = pd.read_excel(xls_path, dtype=str)
# Convert date columns to datetime, errors='coerce' will set invalid parsing as NaT
for col in ["TRAN_DATE", "VALUE_DATE", "ENTRY_DATE", "PSTD_DATE"]:
# Convert date columns to datetime
date_cols = ["ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE"]
for col in date_cols:
if col in df.columns:
df[col] = pd.to_datetime(df[col], errors='coerce')
df[col] = df[col].fillna(pd.Timestamp.now())
# Convert AMOUNT to numeric
if "AMOUNT" in df.columns:
df["AMOUNT"] = pd.to_numeric(df["AMOUNT"], errors='coerce')
# Convert numeric columns
for col in ["TRAN_AMT", "BALANCE"]:
if col in df.columns:
df[col] = pd.to_numeric(df[col].str.replace(",", ""), errors='coerce')
# Truncate TRAN_PARTICULAR to 100 chars
if "TRAN_PARTICULAR" in df.columns:
df["TRAN_PARTICULAR"] = df["TRAN_PARTICULAR"].astype(str).str.slice(0, 100)
# Connect to database
conn = connect_to_db()
@@ -83,26 +88,25 @@ def upload_xls_to_db(xls_path):
cur,
"""
INSERT INTO analytics_raw_transactions
(cif_id, acid, ref_num, entry_usr, tran_id, tran_date, value_date, entry_date, pstd_date, tran_subtype, part_tran_type, isreverse, reverse, tran_particular, channel, amount)
(cust_id, accountid, tran_id, entry_date, value_date, pstd_date, tran_date, tran_sub_ty, part_tran_ty, channel, tran_amt, balance, isreverse, reverse, tran_particular)
VALUES %s
""",
[(
row.get('CIF_ID'),
row.get('ACID'),
row.get('REF_NUM'),
row.get('ENTRY_USR'),
row.get('CUST_ID'),
row.get('ACCOUNTID'),
row.get('TRAN_ID'),
row.get('TRAN_DATE'),
row.get('VALUE_DATE'),
row.get('ENTRY_DATE'),
row.get('VALUE_DATE'),
row.get('PSTD_DATE'),
row.get('TRAN_SUBTYPE'),
row.get('PART_TRAN_TYPE'),
row.get('TRAN_DATE'),
row.get('TRAN_SUB_TY'),
row.get('PART_TRAN_TY'),
row.get('CHANNEL'),
row.get('TRAN_AMT'),
row.get('BALANCE'),
row.get('ISREVERSE'),
row.get('REVERSE'),
row.get('TRAN_PARTICULAR'),
row.get('CHANNEL'),
row.get('AMOUNT')
(row.get('TRAN_PARTICULAR') or '')[:100]
) for row in data]
)
+56
View File
@@ -0,0 +1,56 @@
# Database Migration & Data Upload Guide
This project uses **Flask-Migrate** (powered by Alembic) for managing database schema changes and a custom Flask CLI command for uploading XLS data.
## Migration Workflow
Follow these steps to manage database schema and upload data:
1. **Set Flask App Environment Variable:**
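Open your terminal in the project root (`digifi-Analytics/`) and set the `FLASK_APP` variable. This step is optional when you pass `--app run.py` explicitly, as every command in this guide does: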
```bash
# For Windows (Command Prompt; in PowerShell use $env:FLASK_APP = "run.py" instead)
set FLASK_APP=run.py
# For macOS/Linux
export FLASK_APP=run.py
```
2. **Initialize Migration Repository (First-time setup only):**
This command sets up the `migrations/` directory structure. If the `migrations/` directory already exists in your checkout, skip this step: running it again only reports that the directory already exists and changes nothing.
```bash
flask --app run.py db init
```
3. **Generate a New Migration Script:**
After making changes to `salary_analytics/app/models.py` (e.g., adding a new table or column), use this command to create a migration file. This file will contain the `upgrade()` and `downgrade()` logic.
```bash
flask --app run.py db migrate -m "Descriptive message for your changes"
```
4. **Review the Generated Migration:**
Open the newly created `.py` file in `migrations/versions/` (e.g., `XXXXXXXXXXXX_your_message.py`) and review the `upgrade()` and `downgrade()` functions. Ensure they accurately reflect your intended schema changes.
5. **Apply Migrations to the Database:**
This command executes the pending migration scripts, updating your database schema. **Always apply migrations after generating them.**
```bash
flask --app run.py db upgrade
```
## Uploading XLS Data
Once your `analytics_raw_transactions` table is created via migration, you can upload your XLS files using the custom command:
```bash
flask --app run.py upload-xls <path_to_your_xls_file>
```
**Example:**
```bash
flask --app run.py upload-xls data/transactions.xls
```
+44
View File
@@ -0,0 +1,44 @@
[alembic]
script_location = migrations
sqlalchemy.url =
revision_environment = false
version_locations = %(script_location)s/versions
output_encoding = utf-8
[post_write_hooks]
hooks =
hook_default_ops =
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARNING
handlers = console
qualname =
[logger_sqlalchemy]
level = WARNING
handlers = console
qualname = sqlalchemy
[logger_alembic]
level = DEBUG
handlers = console
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
+89
View File
@@ -0,0 +1,89 @@
from logging.config import fileConfig
from sqlalchemy import engine_from_config
from sqlalchemy import pool
from alembic import context
# this is the Alembic Config object, which provides
# access to values within the .ini file in use.
config = context.config
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# add your model's MetaData object here
# for 'autogenerate' support
# from myapp import Base
# target_metadata = Base.metadata
from flask import current_app
from salary_analytics.app.extensions import db
# Alembic stores this option through ConfigParser, which treats '%' as
# interpolation syntax. A literal '%' in the URI (for example a URL-encoded
# password such as 'p%40ss') must therefore be doubled before it is stored.
config.set_main_option(
    'sqlalchemy.url',
    current_app.config.get('SQLALCHEMY_DATABASE_URI').replace('%', '%%'),
)
target_metadata = db.metadata
# Function to filter objects for autogenerate
def include_object(object, name, type_, reflected, compare_to):
    """Tell Alembic autogenerate whether to consider a schema object.

    A table that was reflected from the database but is not declared in
    ``target_metadata`` is skipped. Every other object is included.

    Returns:
        bool: ``False`` for reflected tables missing from the models,
        ``True`` otherwise.
    """
    is_reflected_table = type_ == "table" and reflected
    if not is_reflected_table:
        # Columns, indexes, and tables coming from the models always pass.
        return True
    # Reflected table: keep it only when the models declare it as well.
    return name in target_metadata.tables
def run_migrations_offline():
    """Run migrations in 'offline' mode.

    The context is configured with only the ``sqlalchemy.url`` option, so no
    Engine is created and no database is needed. Calls to
    ``context.execute()`` emit the given string to the script output.
    """
    offline_options = {
        "url": config.get_main_option("sqlalchemy.url"),
        "target_metadata": target_metadata,
        "literal_binds": True,
        "dialect_opts": {"paramstyle": "named"},
        "include_object": include_object,
    }
    context.configure(**offline_options)

    with context.begin_transaction():
        context.run_migrations()
def run_migrations_online():
    """Run migrations in 'online' mode.

    Builds an Engine from the ``sqlalchemy.``-prefixed options of the main
    ini section, opens a connection, and runs the migrations on it inside a
    transaction.
    """
    ini_section = config.get_section(config.config_ini_section, {})
    engine = engine_from_config(
        ini_section,
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with engine.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            include_object=include_object,
        )
        with context.begin_transaction():
            context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
+24
View File
@@ -0,0 +1,24 @@
"""${message}
Revision ID: ${up_revision}
Revises: ${down_revision | comma,n}
Create Date: ${create_date}
"""
from alembic import op
import sqlalchemy as sa
${imports if imports else ""}
# revision identifiers, used by Alembic.
revision = ${repr(up_revision)}
down_revision = ${repr(down_revision)}
branch_labels = ${repr(branch_labels)}
depends_on = ${repr(depends_on)}
def upgrade():
${upgrades if upgrades else "pass"}
def downgrade():
${downgrades if downgrades else "pass"}
@@ -0,0 +1,46 @@
"""Create analytics_raw_transactions table
Revision ID: c2b2cdbc8022
Revises:
Create Date: 2025-06-09 15:19:36.017861
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = 'c2b2cdbc8022'
down_revision = None
branch_labels = None
depends_on = None
def upgrade():
    """Create the ``analytics_raw_transactions`` table."""
    # Column order below is the order used in the generated CREATE TABLE.
    columns = [
        sa.Column('id', sa.Integer(), nullable=False),
        sa.Column('cust_id', sa.String(length=10), nullable=True),
        sa.Column('accountid', sa.String(length=10), nullable=True),
        sa.Column('tran_id', sa.String(length=12), nullable=True),
        sa.Column('entry_date', sa.TIMESTAMP(), nullable=True),
        sa.Column('value_date', sa.TIMESTAMP(), nullable=True),
        sa.Column('pstd_date', sa.TIMESTAMP(), nullable=True),
        sa.Column('tran_date', sa.TIMESTAMP(), nullable=True),
        sa.Column('tran_sub_ty', sa.String(length=4), nullable=True),
        sa.Column('part_tran_ty', sa.String(length=4), nullable=True),
        sa.Column('channel', sa.String(length=32), nullable=True),
        sa.Column('tran_amt', sa.Numeric(precision=20, scale=2), nullable=True),
        sa.Column('balance', sa.Numeric(precision=20, scale=2), nullable=True),
        sa.Column('isreverse', sa.String(length=4), nullable=True),
        sa.Column('reverse', sa.String(length=4), nullable=True),
        sa.Column('tran_particular', sa.String(length=100), nullable=True),
    ]
    op.create_table(
        'analytics_raw_transactions',
        *columns,
        sa.PrimaryKeyConstraint('id'),
    )
def downgrade():
    """Drop the ``analytics_raw_transactions`` table created by ``upgrade()``."""
    table_name = 'analytics_raw_transactions'
    op.drop_table(table_name)
+5 -1
View File
@@ -13,4 +13,8 @@ pydantic>=1.8.0
python-multipart>=0.0.5
python-dotenv>=0.19.0
joblib>=1.1.0
openpyxl>=3.0.10
openpyxl>=3.0.10
Flask>=2.0.0
Flask-SQLAlchemy>=3.0.0
Flask-Migrate>=4.0.0
alembic>=1.8.0
+4
View File
@@ -0,0 +1,4 @@
import os
from salary_analytics.app import create_app
app = create_app()
+17
View File
@@ -0,0 +1,17 @@
from flask import Flask
import os
from .extensions import db, migrate
def create_app():
    """Application factory: build, configure, and return the Flask app.

    Loads settings from ``salary_analytics.config``, binds the shared ``db``
    and ``migrate`` extensions to the new app, and registers the
    ``upload-xls`` CLI command.

    Returns:
        Flask: The configured application instance.
    """
    flask_app = Flask(__name__)
    flask_app.config.from_object('salary_analytics.config')

    # Bind the extensions; Migrate needs both the app and the db handle.
    db.init_app(flask_app)
    migrate.init_app(flask_app, db)

    # Imported inside the factory, after the extensions are bound
    # (presumably to avoid a circular import -- confirm before moving it).
    from .commands import upload_xls_cli
    flask_app.cli.add_command(upload_xls_cli)

    return flask_app
+70
View File
@@ -0,0 +1,70 @@
import click
import pandas as pd
from datetime import datetime
from flask.cli import with_appcontext
from salary_analytics.app.extensions import db
from salary_analytics.app.models import RawTransaction
# The docstring doubles as the group's --help text, so it is kept verbatim.
@click.group()
def commands():
    """Management commands for the salary analytics application."""
@commands.command('upload-xls')
@click.argument('xls_path')
@with_appcontext
def upload_xls_cli(xls_path):
    """Uploads data from an XLS file to the analytics_raw_transactions table.

    Empty cells are stored as NULL (an empty string for TRAN_PARTICULAR).
    On any failure the session is rolled back and the command exits with a
    non-zero status.

    Args:
        xls_path (str): The path to the XLS file.
    """
    # Model attribute -> spreadsheet column. TRAN_PARTICULAR is handled
    # separately because it is truncated and defaults to ''.
    field_to_column = {
        'cust_id': 'CUST_ID',
        'accountid': 'ACCOUNTID',
        'tran_id': 'TRAN_ID',
        'entry_date': 'ENTRY_DATE',
        'value_date': 'VALUE_DATE',
        'pstd_date': 'PSTD_DATE',
        'tran_date': 'TRAN_DATE',
        'tran_sub_ty': 'TRAN_SUB_TY',
        'part_tran_ty': 'PART_TRAN_TY',
        'channel': 'CHANNEL',
        'tran_amt': 'TRAN_AMT',
        'balance': 'BALANCE',
        'isreverse': 'ISREVERSE',
        'reverse': 'REVERSE',
    }

    def _null_if_missing(value):
        # pandas represents empty cells and failed numeric parses as NaN;
        # pass the database a real NULL instead of a float NaN.
        return None if pd.isna(value) else value

    click.echo(f"Attempting to upload data from {xls_path}...")
    try:
        df = pd.read_excel(xls_path, dtype=str)

        # Convert date columns to datetime. Values that are missing or cannot
        # be parsed are replaced with the time of this upload.
        upload_time = pd.Timestamp.now()
        for col in ("ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE"):
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce').fillna(upload_time)

        # Convert numeric columns, dropping thousands separators first.
        for col in ("TRAN_AMT", "BALANCE"):
            if col in df.columns:
                df[col] = pd.to_numeric(
                    df[col].str.replace(",", "", regex=False), errors='coerce'
                )

        # Truncate TRAN_PARTICULAR to 100 chars. fillna('') comes first so an
        # empty cell does not become the literal string 'nan'.
        if "TRAN_PARTICULAR" in df.columns:
            df["TRAN_PARTICULAR"] = (
                df["TRAN_PARTICULAR"].fillna("").astype(str).str.slice(0, 100)
            )

        # Prepare data for insertion into the database.
        records = []
        for row in df.to_dict('records'):
            values = {
                field: _null_if_missing(row.get(column))
                for field, column in field_to_column.items()
            }
            values['tran_particular'] = (row.get('TRAN_PARTICULAR') or '')[:100]
            records.append(RawTransaction(**values))

        db.session.add_all(records)
        db.session.commit()
    except Exception as e:
        db.session.rollback()
        # ClickException prints the message and makes the CLI exit non-zero,
        # so scripts calling this command can detect the failure.
        raise click.ClickException(f"Error uploading data: {str(e)}") from e

    click.echo(f"Successfully uploaded {len(records)} records to analytics_raw_transactions")
+5
View File
@@ -0,0 +1,5 @@
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy()
migrate = Migrate()
+24
View File
@@ -0,0 +1,24 @@
from .extensions import db
class RawTransaction(db.Model):
    """ORM model mapped to the ``analytics_raw_transactions`` table."""

    __tablename__ = 'analytics_raw_transactions'

    # Primary key.
    id = db.Column(db.Integer, primary_key=True)

    # Every column below is nullable (the default for non-key columns).
    cust_id = db.Column(db.String(10))
    accountid = db.Column(db.String(10))
    tran_id = db.Column(db.String(12))

    entry_date = db.Column(db.TIMESTAMP)
    value_date = db.Column(db.TIMESTAMP)
    pstd_date = db.Column(db.TIMESTAMP)
    tran_date = db.Column(db.TIMESTAMP)

    tran_sub_ty = db.Column(db.String(4))
    part_tran_ty = db.Column(db.String(4))
    channel = db.Column(db.String(32))

    # Numeric(20, 2): up to 20 digits in total, 2 after the decimal point.
    tran_amt = db.Column(db.Numeric(20, 2))
    balance = db.Column(db.Numeric(20, 2))

    isreverse = db.Column(db.String(4))
    reverse = db.Column(db.String(4))
    tran_particular = db.Column(db.String(100))

    def __repr__(self):
        """Return a short debug representation keyed by ``tran_id``."""
        return f'<RawTransaction {self.tran_id}>'
+7
View File
@@ -30,6 +30,13 @@ DB_CONFIG = {
"host": os.getenv("DB_HOST")
}
# SQLAlchemy Configuration
SQLALCHEMY_DATABASE_URI = (
f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Table Configuration
TABLE_NAME = "customer_account_transaction_hx"
BATCH_RESULTS_TABLE = "salary_analytics_batch_results"