diff --git a/demo/New_Sample_Transaction.xls b/demo/New_Sample_Transaction.xls new file mode 100644 index 0000000..45e36ef Binary files /dev/null and b/demo/New_Sample_Transaction.xls differ diff --git a/demo/upload_xls.py b/demo/upload_xls.py index 38ea71b..80acb0e 100644 --- a/demo/upload_xls.py +++ b/demo/upload_xls.py @@ -32,23 +32,21 @@ def create_table_if_not_exists(conn): cur.execute(""" CREATE TABLE IF NOT EXISTS analytics_raw_transactions ( id SERIAL PRIMARY KEY, - cif_id TEXT, - acid TEXT, - ref_num TEXT, - entry_usr TEXT, - tran_id TEXT, - tran_date TIMESTAMP NULL, - value_date TIMESTAMP NULL, + cust_id VARCHAR(10), + accountid VARCHAR(10), + tran_id VARCHAR(12), entry_date TIMESTAMP NULL, + value_date TIMESTAMP NULL, pstd_date TIMESTAMP NULL, - tran_subtype TEXT, - part_tran_type TEXT, - isreverse TEXT, - reverse TEXT, - tran_particular TEXT, - channel TEXT, - amount DECIMAL(20,2), - created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + tran_date TIMESTAMP NULL, + tran_sub_ty VARCHAR(4), + part_tran_ty VARCHAR(4), + channel VARCHAR(32), + tran_amt DECIMAL(20,2), + balance DECIMAL(20,2), + isreverse VARCHAR(4), + reverse VARCHAR(4), + tran_particular VARCHAR(100) ) """) conn.commit() @@ -59,14 +57,21 @@ def upload_xls_to_db(xls_path): # Read XLS file df = pd.read_excel(xls_path, dtype=str) - # Convert date columns to datetime, errors='coerce' will set invalid parsing as NaT - for col in ["TRAN_DATE", "VALUE_DATE", "ENTRY_DATE", "PSTD_DATE"]: + # Convert date columns to datetime + date_cols = ["ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE"] + for col in date_cols: if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce') + df[col] = df[col].fillna(pd.Timestamp.now()) - # Convert AMOUNT to numeric - if "AMOUNT" in df.columns: - df["AMOUNT"] = pd.to_numeric(df["AMOUNT"], errors='coerce') + # Convert numeric columns + for col in ["TRAN_AMT", "BALANCE"]: + if col in df.columns: + df[col] = pd.to_numeric(df[col].str.replace(",", ""), 
errors='coerce') + + # Truncate TRAN_PARTICULAR to 100 chars + if "TRAN_PARTICULAR" in df.columns: + df["TRAN_PARTICULAR"] = df["TRAN_PARTICULAR"].astype(str).str.slice(0, 100) # Connect to database conn = connect_to_db() @@ -83,26 +88,25 @@ def upload_xls_to_db(xls_path): cur, """ INSERT INTO analytics_raw_transactions - (cif_id, acid, ref_num, entry_usr, tran_id, tran_date, value_date, entry_date, pstd_date, tran_subtype, part_tran_type, isreverse, reverse, tran_particular, channel, amount) + (cust_id, accountid, tran_id, entry_date, value_date, pstd_date, tran_date, tran_sub_ty, part_tran_ty, channel, tran_amt, balance, isreverse, reverse, tran_particular) VALUES %s """, [( - row.get('CIF_ID'), - row.get('ACID'), - row.get('REF_NUM'), - row.get('ENTRY_USR'), + row.get('CUST_ID'), + row.get('ACCOUNTID'), row.get('TRAN_ID'), - row.get('TRAN_DATE'), - row.get('VALUE_DATE'), row.get('ENTRY_DATE'), + row.get('VALUE_DATE'), row.get('PSTD_DATE'), - row.get('TRAN_SUBTYPE'), - row.get('PART_TRAN_TYPE'), + row.get('TRAN_DATE'), + row.get('TRAN_SUB_TY'), + row.get('PART_TRAN_TY'), + row.get('CHANNEL'), + row.get('TRAN_AMT'), + row.get('BALANCE'), row.get('ISREVERSE'), row.get('REVERSE'), - row.get('TRAN_PARTICULAR'), - row.get('CHANNEL'), - row.get('AMOUNT') + (row.get('TRAN_PARTICULAR') or '')[:100] ) for row in data] ) diff --git a/migrations/README.md b/migrations/README.md new file mode 100644 index 0000000..75b98dd --- /dev/null +++ b/migrations/README.md @@ -0,0 +1,56 @@ +# Database Migration & Data Upload Guide + +This project uses **Flask-Migrate** (powered by Alembic) for managing database schema changes and a custom Flask CLI command for uploading XLS data. + +## Migration Workflow + +Follow these steps to manage database schema and upload data: + +1. 
**Set Flask App Environment Variable:** + Open your terminal in the project root (`digifi-Analytics/`) and set the `FLASK_APP` variable: + + ```bash + # For Windows + set FLASK_APP=run.py + + # For macOS/Linux + export FLASK_APP=run.py + ``` + +2. **Initialize Migration Repository (First-time setup only):** + This command sets up the `migrations/` directory structure. You've likely already run this, so you might get a "Directory already exists" message, which is fine. + + ```bash + flask --app run.py db init + ``` + +3. **Generate a New Migration Script:** + After making changes to `salary_analytics/app/models.py` (e.g., adding a new table or column), use this command to create a migration file. This file will contain the `upgrade()` and `downgrade()` logic. + + ```bash + flask --app run.py db migrate -m "Descriptive message for your changes" + ``` + +4. **Review the Generated Migration:** + Open the newly created `.py` file in `migrations/versions/` (e.g., `XXXXXXXXXXXX_your_message.py`) and review the `upgrade()` and `downgrade()` functions. Ensure they accurately reflect your intended schema changes. + +5. **Apply Migrations to the Database:** + This command executes the pending migration scripts, updating your database schema. 
**Always apply migrations after generating them.** + + ```bash + flask --app run.py db upgrade + ``` + +## Uploading XLS Data + +Once your `analytics_raw_transactions` table is created via migration, you can upload your XLS files using the custom command, passing the path to the XLS file as its argument: + +```bash +flask --app run.py upload-xls PATH_TO_XLS_FILE +``` + +**Example:** + +```bash +flask --app run.py upload-xls data/transactions.xls +``` \ No newline at end of file diff --git a/migrations/alembic.ini b/migrations/alembic.ini new file mode 100644 index 0000000..50e832b --- /dev/null +++ b/migrations/alembic.ini @@ -0,0 +1,44 @@ +[alembic] +script_location = migrations +sqlalchemy.url = +revision_environment = false +version_locations = %(script_location)s/versions +output_encoding = utf-8 + +[post_write_hooks] +hooks = +hook_default_ops = + +[loggers] +keys = root,sqlalchemy,alembic + +[handlers] +keys = console + +[formatters] +keys = generic + +[logger_root] +level = WARNING +handlers = console +qualname = + +[logger_sqlalchemy] +level = WARNING +handlers = console +qualname = sqlalchemy + +[logger_alembic] +level = DEBUG +handlers = console +qualname = alembic + +[handler_console] +class = StreamHandler +args = (sys.stderr,) +level = NOTSET +formatter = generic + +[formatter_generic] +format = %(levelname)-5.5s [%(name)s] %(message)s +datefmt = %H:%M:%S \ No newline at end of file diff --git a/migrations/env.py b/migrations/env.py new file mode 100644 index 0000000..79a3e8d --- /dev/null +++ b/migrations/env.py @@ -0,0 +1,89 @@ +from logging.config import fileConfig + +from sqlalchemy import engine_from_config +from sqlalchemy import pool + +from alembic import context + +# this is the Alembic Config object, which provides +# access to values within the .ini file in use. +config = context.config + +# Interpret the config file for Python logging. +# This line sets up loggers basically. 
if config.config_file_name is not None:
    # Configure Python logging from the logging sections of the .ini file.
    fileConfig(config.config_file_name)

# The database URL and the model metadata come from the running Flask
# application rather than from alembic.ini.
from flask import current_app
from salary_analytics.app.extensions import db

_database_uri = current_app.config.get('SQLALCHEMY_DATABASE_URI')
if not _database_uri:
    # Fail early with a clear message instead of an obscure error from
    # ConfigParser or from engine creation further down.
    raise RuntimeError(
        "SQLALCHEMY_DATABASE_URI is not configured; cannot run migrations."
    )

# Alembic keeps main options in a ConfigParser, which treats '%' as the start
# of an interpolation token. A URL containing '%' (for example a
# percent-encoded password) must therefore have each '%' doubled before it is
# stored; get_main_option()/get_section() hand back the single-'%' form.
config.set_main_option('sqlalchemy.url', _database_uri.replace('%', '%%'))
target_metadata = db.metadata


def include_object(object, name, type_, reflected, compare_to):
    """Decide whether autogenerate should consider a schema object.

    Tables that were reflected from the database but are absent from
    ``target_metadata`` are skipped; every other object is included.

    Returns:
        bool: False for reflected tables unknown to the models, True otherwise.
    """
    if type_ == "table" and reflected and name not in target_metadata.tables:
        return False  # present in the DB but not declared in the models
    return True


def run_migrations_offline():
    """Run migrations in 'offline' mode.

    The context is configured with just the URL (no Engine is created), and
    calls to context.execute() emit the given SQL to the script output.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        include_object=include_object,
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online():
    """Run migrations in 'online' mode.

    An Engine is built from the ``sqlalchemy.``-prefixed options of the main
    config section, and a connection from it is associated with the context.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,  # no pooling: connections are not reused
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            include_object=include_object,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
### + op.create_table('analytics_raw_transactions', + sa.Column('id', sa.Integer(), nullable=False), + sa.Column('cust_id', sa.String(length=10), nullable=True), + sa.Column('accountid', sa.String(length=10), nullable=True), + sa.Column('tran_id', sa.String(length=12), nullable=True), + sa.Column('entry_date', sa.TIMESTAMP(), nullable=True), + sa.Column('value_date', sa.TIMESTAMP(), nullable=True), + sa.Column('pstd_date', sa.TIMESTAMP(), nullable=True), + sa.Column('tran_date', sa.TIMESTAMP(), nullable=True), + sa.Column('tran_sub_ty', sa.String(length=4), nullable=True), + sa.Column('part_tran_ty', sa.String(length=4), nullable=True), + sa.Column('channel', sa.String(length=32), nullable=True), + sa.Column('tran_amt', sa.Numeric(precision=20, scale=2), nullable=True), + sa.Column('balance', sa.Numeric(precision=20, scale=2), nullable=True), + sa.Column('isreverse', sa.String(length=4), nullable=True), + sa.Column('reverse', sa.String(length=4), nullable=True), + sa.Column('tran_particular', sa.String(length=100), nullable=True), + sa.PrimaryKeyConstraint('id') + ) + # ### end Alembic commands ### + + +def downgrade(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_table('analytics_raw_transactions') + # ### end Alembic commands ### diff --git a/requirements.txt b/requirements.txt index b2d72d4..68fa9fc 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,4 +13,8 @@ pydantic>=1.8.0 python-multipart>=0.0.5 python-dotenv>=0.19.0 joblib>=1.1.0 -openpyxl>=3.0.10 \ No newline at end of file +openpyxl>=3.0.10 +Flask>=2.0.0 +Flask-SQLAlchemy>=3.0.0 +Flask-Migrate>=4.0.0 +alembic>=1.8.0 \ No newline at end of file diff --git a/run.py b/run.py new file mode 100644 index 0000000..77e5bde --- /dev/null +++ b/run.py @@ -0,0 +1,4 @@ +import os +from salary_analytics.app import create_app + +app = create_app() \ No newline at end of file diff --git a/salary_analytics/app/__init__.py b/salary_analytics/app/__init__.py new file mode 100644 index 0000000..b286361 --- /dev/null +++ b/salary_analytics/app/__init__.py @@ -0,0 +1,17 @@ +from flask import Flask +import os +from .extensions import db, migrate + +def create_app(): + app = Flask(__name__) + app.config.from_object('salary_analytics.config') + + # Initialize extensions + db.init_app(app) + migrate.init_app(app, db) + + # Register blueprints or CLI commands here if needed + from . 
def _missing_to_none(value):
    """Map pandas missing markers (None, NaN, NaT) to None; pass other values through."""
    return None if pd.isna(value) else value


def _clip_text(value, limit=100):
    """Return *value* as text cut to *limit* characters, or '' when it is missing."""
    value = _missing_to_none(value)
    return '' if value is None else str(value)[:limit]


@commands.command('upload-xls')
@click.argument('xls_path')
@with_appcontext
def upload_xls_cli(xls_path):
    """Uploads data from an XLS file to the analytics_raw_transactions table.

    Args:
        xls_path (str): The path to the XLS file.

    Raises:
        click.ClickException: If reading, converting or inserting the data
            fails. The database session is rolled back first, and the
            command exits with a non-zero status.
    """
    print(f"Attempting to upload data from {xls_path}...")
    try:
        # Everything is read as text; typed columns are converted below.
        df = pd.read_excel(xls_path, dtype=str)

        # Convert date columns to datetime; unparseable values become NaT.
        # NOTE(review): missing/invalid dates are replaced with the current
        # time (existing behaviour, kept as-is). The columns are nullable, so
        # confirm that a fabricated timestamp is preferable to NULL.
        fallback_ts = pd.Timestamp.now()  # one value shared by all columns
        date_cols = ["ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE"]
        for col in date_cols:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')
                df[col] = df[col].fillna(fallback_ts)

        # Convert numeric columns, dropping thousands separators first.
        for col in ["TRAN_AMT", "BALANCE"]:
            if col in df.columns:
                df[col] = pd.to_numeric(
                    df[col].str.replace(",", "", regex=False), errors='coerce'
                )

        # Build ORM rows. Missing cells (NaN) are sent as NULL rather than as
        # float NaN, and TRAN_PARTICULAR is truncated per value so a missing
        # cell becomes '' instead of the literal text 'nan'.
        records = [
            RawTransaction(
                cust_id=_missing_to_none(row.get('CUST_ID')),
                accountid=_missing_to_none(row.get('ACCOUNTID')),
                tran_id=_missing_to_none(row.get('TRAN_ID')),
                entry_date=_missing_to_none(row.get('ENTRY_DATE')),
                value_date=_missing_to_none(row.get('VALUE_DATE')),
                pstd_date=_missing_to_none(row.get('PSTD_DATE')),
                tran_date=_missing_to_none(row.get('TRAN_DATE')),
                tran_sub_ty=_missing_to_none(row.get('TRAN_SUB_TY')),
                part_tran_ty=_missing_to_none(row.get('PART_TRAN_TY')),
                channel=_missing_to_none(row.get('CHANNEL')),
                tran_amt=_missing_to_none(row.get('TRAN_AMT')),
                balance=_missing_to_none(row.get('BALANCE')),
                isreverse=_missing_to_none(row.get('ISREVERSE')),
                reverse=_missing_to_none(row.get('REVERSE')),
                tran_particular=_clip_text(row.get('TRAN_PARTICULAR'), 100),
            )
            for _, row in df.iterrows()
        ]

        db.session.add_all(records)
        db.session.commit()
        print(f"Successfully uploaded {len(records)} records to analytics_raw_transactions")

    except Exception as e:
        db.session.rollback()
        # Surface the failure through click so the process exits non-zero
        # instead of reporting success to the calling shell or scheduler.
        raise click.ClickException(f"Error uploading data: {str(e)}") from e
# SQLAlchemy Configuration
# The user name and password are percent-encoded so that reserved URL
# characters in them (e.g. "@", ":", "/", "%") cannot corrupt the URI.
# str() keeps the previous rendering for unset (None) values.
from urllib.parse import quote

SQLALCHEMY_DATABASE_URI = (
    f"postgresql://{quote(str(DB_CONFIG['user']), safe='')}:"
    f"{quote(str(DB_CONFIG['password']), safe='')}@"
    f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
)
SQLALCHEMY_TRACK_MODIFICATIONS = False

# Table Configuration
TABLE_NAME = "customer_account_transaction_hx"
BATCH_RESULTS_TABLE = "salary_analytics_batch_results"