Added new salary-related terms and improved image outputs in salary.ipynb
This commit is contained in:
@@ -0,0 +1,113 @@
|
||||
"""
|
||||
Data loading and preprocessing module.
|
||||
"""
|
||||
|
||||
from sqlalchemy import create_engine, text
|
||||
import pandas as pd
|
||||
from datetime import datetime
|
||||
import logging
|
||||
from .config import DB_CONFIG, TABLE_NAME
|
||||
|
||||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||||
|
||||
class DataLoader:
    """Load the table named by ``TABLE_NAME`` into a pandas DataFrame.

    Connection settings are read from ``DB_CONFIG`` (keys ``user``,
    ``password``, ``host``, ``port`` and ``name``). Failures are logged and
    reported through return values (``False`` / ``None``) instead of being
    raised to the caller.
    """

    # Columns converted with ``pd.to_datetime`` in every chunk.
    _DATE_COLUMNS = ('trx_start_date', 'trx_end_date')

    # Source column name -> name exposed in the loaded DataFrame.
    _COLUMN_RENAMES = {
        'd1': 'trx_type',
        'd2': 'trx_subtype',
        'd3': 'initiated_by',
        'd4': 'customer_id',
    }

    def __init__(self):
        self.engine = None  # SQLAlchemy engine; set only by a successful connect()
        self.df = None  # DataFrame produced by load_data()
        self.chunk_size = 10000  # Load 10,000 rows at a time

    def connect(self) -> bool:
        """Establish the database connection and verify the table exists.

        Returns:
            True when the engine could connect and ``TABLE_NAME`` was found;
            False otherwise. ``self.engine`` is only set on success, so a
            later ``load_data()`` call retries instead of reusing an engine
            that never passed validation.
        """
        # Local import: escapes credentials without touching the file's
        # top-level import block.
        from urllib.parse import quote_plus

        engine = None
        try:
            logger.info("Attempting to connect to database...")
            # Escape user and password so characters such as '@', '/' or ':'
            # cannot corrupt the URL.
            database_url = (
                f"postgresql://{quote_plus(str(DB_CONFIG['user']))}"
                f":{quote_plus(str(DB_CONFIG['password']))}"
                f"@{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
            )
            engine = create_engine(database_url)

            with engine.connect() as conn:
                # First check if table exists (name passed as a bound parameter).
                check_table = text(
                    "SELECT EXISTS (SELECT FROM information_schema.tables "
                    "WHERE table_name = :table_name)"
                )
                table_exists = conn.execute(
                    check_table, {"table_name": TABLE_NAME}
                ).scalar()

                if table_exists:
                    # Identifiers cannot be bound parameters, so the table
                    # name is interpolated here.
                    count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                    row_count = conn.execute(count_query).scalar()
                    logger.info(
                        "Table %s exists with %s rows", TABLE_NAME, row_count
                    )
                    # Round-trip a trivial query; its result is not used.
                    conn.execute(text("SELECT version();"))

            if table_exists:
                self.engine = engine
                logger.info("Connected successfully to database!")
                return True

            logger.error("Table %s does not exist in the database", TABLE_NAME)
        except Exception as exc:
            logger.exception("Error connecting to database: %s", exc)

        # Failure path: release any pooled connections and leave no
        # half-validated engine behind.
        if engine is not None:
            engine.dispose()
        self.engine = None
        return False

    def _preprocess_chunk(self, chunk):
        """Parse the date columns of ``chunk`` and apply the column renames."""
        for column in self._DATE_COLUMNS:
            chunk[column] = pd.to_datetime(chunk[column])
        return chunk.rename(columns=self._COLUMN_RENAMES)

    def load_data(self):
        """Load and preprocess transaction data in chunks.

        Connects first if no engine is available. The table is read with a
        single query consumed ``self.chunk_size`` rows at a time, so every
        row is seen exactly once; separate LIMIT/OFFSET pages without an
        ORDER BY give no such guarantee.

        Returns:
            The combined DataFrame (also stored on ``self.df``), or None when
            connecting or loading fails. An empty table yields an empty
            DataFrame rather than an error.
        """
        if not self.engine:
            logger.info("No database connection. Attempting to connect...")
            if not self.connect():
                logger.error("Failed to establish database connection")
                return None

        try:
            logger.info("Loading data from table: %s", TABLE_NAME)

            chunks = []
            offset = 0
            with self.engine.connect() as conn:
                # First get total count (informational only).
                count_query = text(f"SELECT COUNT(*) FROM {TABLE_NAME}")
                total_rows = conn.execute(count_query).scalar()
                logger.info("Total rows to process: %s", total_rows)

                # Ask the driver to stream rows instead of buffering the
                # whole result set on the client.
                streaming_conn = conn.execution_options(stream_results=True)
                query = f"SELECT * FROM {TABLE_NAME}"
                reader = pd.read_sql(
                    query, streaming_conn, chunksize=self.chunk_size
                )
                for chunk in reader:
                    logger.info("Loading chunk starting at offset %s", offset)
                    chunks.append(self._preprocess_chunk(chunk))
                    offset += len(chunk)

            # Combine all chunks. An empty result may produce no chunks at
            # all (presumably on older pandas versions), which pd.concat
            # rejects, so fall back to an empty frame.
            if chunks:
                self.df = pd.concat(chunks, ignore_index=True)
            else:
                self.df = pd.DataFrame()
            logger.info("Successfully loaded %s rows of data", len(self.df))

            # Basic data validation
            logger.info("Performing data validation...")
            logger.info("Columns in dataset: %s", self.df.columns.tolist())
            logger.info("Data types:\n%s", self.df.dtypes)
            logger.info("Missing values:\n%s", self.df.isnull().sum())

            return self.df
        except Exception as exc:
            logger.exception("Error loading data: %s", exc)
            return None

    def get_data(self):
        """Return the loaded DataFrame, or None (with a warning) if none is loaded."""
        if self.df is None:
            logger.warning("No data loaded. Call load_data() first.")
        return self.df
|
||||
Reference in New Issue
Block a user