132 lines
4.3 KiB
Python
132 lines
4.3 KiB
Python
import pandas as pd
|
|
import psycopg2
|
|
from psycopg2.extras import execute_values
|
|
import os
|
|
from dotenv import load_dotenv
|
|
|
|
# Populate os.environ from a .env file (via python-dotenv) before the
# settings below are read.
load_dotenv()

# Database connection settings, keyed by the names connect_to_db() looks up.
# os.getenv returns None for any variable that is not set.
DB_CONFIG = {
    key: os.getenv(env_var)
    for key, env_var in (
        ("user", "DB_USER"),
        ("password", "DB_PASSWORD"),
        ("name", "DB_NAME"),
        ("port", "DB_PORT"),
        ("host", "DB_HOST"),
    )
}
|
|
|
|
def connect_to_db():
    """Open and return a new psycopg2 connection configured from DB_CONFIG.

    The caller owns the returned connection and is responsible for closing it.
    """
    connection_kwargs = {
        "user": DB_CONFIG["user"],
        "password": DB_CONFIG["password"],
        "host": DB_CONFIG["host"],
        "port": DB_CONFIG["port"],
        # DB_CONFIG stores the database name under "name"; psycopg2 takes it
        # as the "database" keyword.
        "database": DB_CONFIG["name"],
    }
    return psycopg2.connect(**connection_kwargs)
|
|
|
|
def create_table_if_not_exists(conn):
    """Ensure the analytics_raw_transactions table exists, then commit.

    Uses ``CREATE TABLE IF NOT EXISTS``, so an existing table is left as-is
    (its schema is not checked or altered).

    Args:
        conn: An open psycopg2 connection; it is committed on success.
    """
    ddl = """
        CREATE TABLE IF NOT EXISTS analytics_raw_transactions (
            id SERIAL PRIMARY KEY,
            cust_id VARCHAR(10),
            accountid VARCHAR(10),
            tran_id VARCHAR(12),
            entry_date TIMESTAMP NULL,
            value_date TIMESTAMP NULL,
            pstd_date TIMESTAMP NULL,
            tran_date TIMESTAMP NULL,
            tran_sub_ty VARCHAR(4),
            part_tran_ty VARCHAR(4),
            channel VARCHAR(32),
            tran_amt DECIMAL(20,2),
            balance DECIMAL(20,2),
            isreverse VARCHAR(4),
            reverse VARCHAR(4),
            tran_particular VARCHAR(100)
        )
    """
    cursor = conn.cursor()
    try:
        cursor.execute(ddl)
    finally:
        # Always release the cursor, even if the DDL fails.
        cursor.close()
    conn.commit()
|
|
|
|
# Spreadsheet column names, in the same order as the INSERT column list below.
_SOURCE_COLUMNS = (
    "CUST_ID",
    "ACCOUNTID",
    "TRAN_ID",
    "ENTRY_DATE",
    "VALUE_DATE",
    "PSTD_DATE",
    "TRAN_DATE",
    "TRAN_SUB_TY",
    "PART_TRAN_TY",
    "CHANNEL",
    "TRAN_AMT",
    "BALANCE",
    "ISREVERSE",
    "REVERSE",
    "TRAN_PARTICULAR",
)

_INSERT_SQL = """
    INSERT INTO analytics_raw_transactions
    (cust_id, accountid, tran_id, entry_date, value_date, pstd_date, tran_date, tran_sub_ty, part_tran_ty, channel, tran_amt, balance, isreverse, reverse, tran_particular)
    VALUES %s
"""

_DATE_COLUMNS = ("ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE")
_AMOUNT_COLUMNS = ("TRAN_AMT", "BALANCE")


def _prepare_dataframe(df):
    """Parse the date and amount columns of a text-typed DataFrame; return it."""
    for col in _DATE_COLUMNS:
        if col in df.columns:
            # Unparseable or missing dates become NaT and are stored as NULL
            # (the columns are TIMESTAMP NULL) rather than being replaced
            # with the current time, which would fabricate data.
            df[col] = pd.to_datetime(df[col], errors="coerce")
    for col in _AMOUNT_COLUMNS:
        if col in df.columns:
            # Strip thousands separators; regex=False keeps "," literal on
            # every pandas version.
            df[col] = pd.to_numeric(
                df[col].str.replace(",", "", regex=False), errors="coerce"
            )
    return df


def _none_if_missing(value):
    """Return None for pandas missing markers (None, NaN, NaT), else *value*."""
    return None if pd.isna(value) else value


def _record_to_row(record):
    """Convert one ``to_dict('records')`` mapping into an INSERT value tuple."""
    row = [_none_if_missing(record.get(col)) for col in _SOURCE_COLUMNS]
    # TRAN_PARTICULAR (last column) is stored as '' when missing and capped
    # at the VARCHAR(100) column width.
    row[-1] = (row[-1] or "")[:100]
    return tuple(row)


def upload_xls_to_db(xls_path):
    """Load transactions from an Excel file into analytics_raw_transactions.

    Every cell is read as text. The date and amount columns are then parsed,
    and missing or unparseable cells are stored as NULL. The target table is
    created if it is missing, and all rows are inserted and committed in one
    transaction.

    Args:
        xls_path: Path to the spreadsheet, passed to ``pandas.read_excel``.

    Returns:
        True when the rows were committed. False when any step failed; in
        that case the error is printed and any open transaction is rolled
        back.
    """
    # Bound before the try so except/finally never hit an unbound name when
    # reading the file or connecting fails.
    conn = None
    try:
        df = _prepare_dataframe(pd.read_excel(xls_path, dtype=str))

        conn = connect_to_db()
        create_table_if_not_exists(conn)

        rows = [_record_to_row(record) for record in df.to_dict("records")]
        with conn.cursor() as cur:
            execute_values(cur, _INSERT_SQL, rows)
        conn.commit()

        print(f"Successfully uploaded {len(rows)} records to analytics_raw_transactions")
        return True
    except Exception as e:
        print(f"Error uploading data: {e}")
        # Rolling back a connection psycopg2 already marked closed would
        # raise InterfaceError and mask the original error.
        if conn is not None and not conn.closed:
            conn.rollback()
        return False
    finally:
        if conn is not None:
            conn.close()
|
|
|
|
if __name__ == "__main__":
    import sys

    # Exactly one positional argument is expected: the spreadsheet path.
    cli_args = sys.argv[1:]
    if len(cli_args) != 1:
        print("Usage: python upload_xls.py <path_to_xls_file>")
        sys.exit(1)

    (xls_path,) = cli_args
    upload_xls_to_db(xls_path)