"""Load transaction rows from an Excel workbook into PostgreSQL.

Usage:
    python upload_xls.py <xls_path>

Connection settings are read from the environment (a ``.env`` file is loaded
at import time): DB_USER, DB_PASSWORD, DB_NAME, DB_PORT and DB_HOST.
"""

import os
import sys

import pandas as pd
import psycopg2
from dotenv import load_dotenv
from psycopg2.extras import execute_values

# Load environment variables
load_dotenv()

# Database Configuration
DB_CONFIG = {
    "user": os.getenv("DB_USER"),
    "password": os.getenv("DB_PASSWORD"),
    "name": os.getenv("DB_NAME"),
    "port": os.getenv("DB_PORT"),
    "host": os.getenv("DB_HOST"),
}

# Spreadsheet columns that are parsed as timestamps before insertion.
_DATE_COLUMNS = ("TRAN_DATE", "VALUE_DATE", "ENTRY_DATE", "PSTD_DATE")

# Spreadsheet column headers, listed in the same order as the column list of
# _INSERT_SQL below. Keep the two in sync.
_SOURCE_COLUMNS = (
    "CIF_ID",
    "ACID",
    "REF_NUM",
    "ENTRY_USR",
    "TRAN_ID",
    "TRAN_DATE",
    "VALUE_DATE",
    "ENTRY_DATE",
    "PSTD_DATE",
    "TRAN_SUBTYPE",
    "PART_TRAN_TYPE",
    "ISREVERSE",
    "REVERSE",
    "TRAN_PARTICULAR",
    "CHANNEL",
    "AMOUNT",
)

# The single %s placeholder is expanded by execute_values into the row tuples.
_INSERT_SQL = """
    INSERT INTO analytics_raw_transactions
        (cif_id, acid, ref_num, entry_usr, tran_id, tran_date,
         value_date, entry_date, pstd_date, tran_subtype, part_tran_type,
         isreverse, reverse, tran_particular, channel, amount)
    VALUES %s
"""


def connect_to_db():
    """Establish connection to the database.

    Returns:
        A new psycopg2 connection built from the values in ``DB_CONFIG``.
    """
    return psycopg2.connect(
        user=DB_CONFIG["user"],
        password=DB_CONFIG["password"],
        host=DB_CONFIG["host"],
        port=DB_CONFIG["port"],
        database=DB_CONFIG["name"],
    )


def create_table_if_not_exists(conn):
    """Create the analytics_raw_transactions table if it doesn't exist.

    The statement is committed on ``conn`` before returning.

    Args:
        conn: An open psycopg2 connection.
    """
    with conn.cursor() as cur:
        cur.execute("""
            CREATE TABLE IF NOT EXISTS analytics_raw_transactions (
                id SERIAL PRIMARY KEY,
                cif_id TEXT,
                acid TEXT,
                ref_num TEXT,
                entry_usr TEXT,
                tran_id TEXT,
                tran_date TIMESTAMP NULL,
                value_date TIMESTAMP NULL,
                entry_date TIMESTAMP NULL,
                pstd_date TIMESTAMP NULL,
                tran_subtype TEXT,
                part_tran_type TEXT,
                isreverse TEXT,
                reverse TEXT,
                tran_particular TEXT,
                channel TEXT,
                amount DECIMAL(20,2),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)
    conn.commit()


def _none_if_missing(value):
    """Return None for a pandas missing marker (None/NaN/NaT), else ``value``."""
    return None if pd.isna(value) else value


def _record_to_row(record):
    """Build one INSERT tuple from a DataFrame record, in _SOURCE_COLUMNS order.

    Columns absent from the spreadsheet and missing cell values both become
    None, so they are stored as SQL NULL instead of being handed to the
    driver as NaN/NaT.
    """
    return tuple(
        _none_if_missing(record.get(column)) for column in _SOURCE_COLUMNS
    )


def upload_xls_to_db(xls_path):
    """Upload data from XLS file to the database.

    Reads the workbook with every cell as text, converts the date columns and
    AMOUNT (unparseable values become missing), makes sure the target table
    exists, and bulk-inserts all rows in one transaction.

    Any exception is reported on stdout rather than propagated; an open
    transaction is rolled back and the connection is always closed.

    Args:
        xls_path: Path of the Excel file to load.
    """
    # Bound before the try block so the except/finally clauses can test it
    # even when the failure happens before a connection was opened
    # (for example an unreadable or missing workbook).
    conn = None
    try:
        # Read XLS file
        df = pd.read_excel(xls_path, dtype=str)

        # Convert date columns to datetime; errors='coerce' turns values that
        # cannot be parsed into NaT.
        for col in _DATE_COLUMNS:
            if col in df.columns:
                df[col] = pd.to_datetime(df[col], errors='coerce')

        # Convert AMOUNT to numeric
        if "AMOUNT" in df.columns:
            df["AMOUNT"] = pd.to_numeric(df["AMOUNT"], errors='coerce')

        # Prepare data for insertion
        rows = [_record_to_row(record) for record in df.to_dict('records')]

        # Connect to database
        conn = connect_to_db()

        # Create table if it doesn't exist
        create_table_if_not_exists(conn)

        # Insert data
        with conn.cursor() as cur:
            execute_values(cur, _INSERT_SQL, rows)
        conn.commit()

        print(f"Successfully uploaded {len(rows)} records to analytics_raw_transactions")
    except Exception as e:
        print(f"Error uploading data: {e}")
        if conn is not None:
            conn.rollback()
    finally:
        if conn is not None:
            conn.close()


if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: python upload_xls.py <xls_path>")
        sys.exit(1)

    xls_path = sys.argv[1]
    upload_xls_to_db(xls_path)