import pandas as pd import psycopg2 from psycopg2.extras import execute_values import os from dotenv import load_dotenv # Load environment variables load_dotenv() # Database Configuration DB_CONFIG = { "user": os.getenv("DB_USER"), "password": os.getenv("DB_PASSWORD"), "name": os.getenv("DB_NAME"), "port": os.getenv("DB_PORT"), "host": os.getenv("DB_HOST") } def connect_to_db(): """Establish connection to the database.""" return psycopg2.connect( user=DB_CONFIG["user"], password=DB_CONFIG["password"], host=DB_CONFIG["host"], port=DB_CONFIG["port"], database=DB_CONFIG["name"] ) def create_table_if_not_exists(conn): """Create the analytics_raw_transactions table if it doesn't exist.""" with conn.cursor() as cur: cur.execute(""" CREATE TABLE IF NOT EXISTS analytics_raw_transactions ( id SERIAL PRIMARY KEY, cust_id VARCHAR(10), accountid VARCHAR(10), tran_id VARCHAR(12), entry_date TIMESTAMP NULL, value_date TIMESTAMP NULL, pstd_date TIMESTAMP NULL, tran_date TIMESTAMP NULL, tran_sub_ty VARCHAR(4), part_tran_ty VARCHAR(4), channel VARCHAR(32), tran_amt DECIMAL(20,2), balance DECIMAL(20,2), isreverse VARCHAR(4), reverse VARCHAR(4), tran_particular VARCHAR(100) ) """) conn.commit() def upload_xls_to_db(xls_path): """Upload data from XLS file to the database.""" try: # Read XLS file df = pd.read_excel(xls_path, dtype=str) # Convert date columns to datetime date_cols = ["ENTRY_DATE", "VALUE_DATE", "PSTD_DATE", "TRAN_DATE"] for col in date_cols: if col in df.columns: df[col] = pd.to_datetime(df[col], errors='coerce') df[col] = df[col].fillna(pd.Timestamp.now()) # Convert numeric columns for col in ["TRAN_AMT", "BALANCE"]: if col in df.columns: df[col] = pd.to_numeric(df[col].str.replace(",", ""), errors='coerce') # Truncate TRAN_PARTICULAR to 100 chars if "TRAN_PARTICULAR" in df.columns: df["TRAN_PARTICULAR"] = df["TRAN_PARTICULAR"].astype(str).str.slice(0, 100) # Connect to database conn = connect_to_db() # Create table if it doesn't exist create_table_if_not_exists(conn) # Prepare data for insertion data = df.to_dict('records') # Insert data with conn.cursor() as cur: execute_values( cur, """ INSERT INTO analytics_raw_transactions (cust_id, accountid, tran_id, entry_date, value_date, pstd_date, tran_date, tran_sub_ty, part_tran_ty, channel, tran_amt, balance, isreverse, reverse, tran_particular) VALUES %s """, [( row.get('CUST_ID'), row.get('ACCOUNTID'), row.get('TRAN_ID'), row.get('ENTRY_DATE'), row.get('VALUE_DATE'), row.get('PSTD_DATE'), row.get('TRAN_DATE'), row.get('TRAN_SUB_TY'), row.get('PART_TRAN_TY'), row.get('CHANNEL'), row.get('TRAN_AMT'), row.get('BALANCE'), row.get('ISREVERSE'), row.get('REVERSE'), (row.get('TRAN_PARTICULAR') or '')[:100] ) for row in data] ) conn.commit() print(f"Successfully uploaded {len(data)} records to analytics_raw_transactions") except Exception as e: print(f"Error uploading data: {str(e)}") if conn: conn.rollback() finally: if conn: conn.close() if __name__ == "__main__": import sys if len(sys.argv) != 2: print("Usage: python upload_xls.py ") sys.exit(1) xls_path = sys.argv[1] upload_xls_to_db(xls_path)