[add]: database connection
This commit is contained in:
+8
-8
@@ -34,19 +34,19 @@ DB_CONFIG = {
|
||||
}
|
||||
|
||||
|
||||
# DNS = f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={DB_CONFIG['host']})(PORT={DB_CONFIG['port']}))(CONNECT_DATA=(SID={DB_CONFIG['sid']})))"
|
||||
DNS = f"(DESCRIPTION=(ADDRESS=(PROTOCOL=TCP)(HOST={DB_CONFIG['host']})(PORT={DB_CONFIG['port']}))(CONNECT_DATA=(SID={DB_CONFIG['sid']})))"
|
||||
|
||||
# Database Connection
|
||||
# SQLALCHEMY_DATABASE_URI_INTERNAL = (f"oracle+oracledb://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DNS}")
|
||||
# SQLALCHEMY_DATABASE_URI = os.getenv("SQLALCHEMY_DATABASE_URI_FULL", SQLALCHEMY_DATABASE_URI_INTERNAL)
|
||||
SQLALCHEMY_DATABASE_URI_INTERNAL = (f"oracle+oracledb://{DB_CONFIG['user']}:{DB_CONFIG['password']}@{DNS}")
|
||||
SQLALCHEMY_DATABASE_URI = os.getenv("SQLALCHEMY_DATABASE_URI_FULL", SQLALCHEMY_DATABASE_URI_INTERNAL)
|
||||
|
||||
#SQLALCHEMY_DATABASE_URI_FULL = 'oracle+oracledb://FIRSTADVSTG:Pchanged_56789@10.2.110.30:1521/?service_name=firstadv'
|
||||
# SQLALCHEMY_DATABASE_URI_FULL = 'oracle+oracledb://FIRSTADVSTG:Pchanged_56789@10.2.110.30:1521/?service_name=firstadv'
|
||||
|
||||
# SQLAlchemy Configuration
|
||||
SQLALCHEMY_DATABASE_URI = (
|
||||
f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
|
||||
f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
|
||||
)
|
||||
# SQLALCHEMY_DATABASE_URI = (
|
||||
# f"postgresql://{DB_CONFIG['user']}:{DB_CONFIG['password']}@"
|
||||
# f"{DB_CONFIG['host']}:{DB_CONFIG['port']}/{DB_CONFIG['name']}"
|
||||
# )
|
||||
|
||||
SQLALCHEMY_TRACK_MODIFICATIONS = False
|
||||
|
||||
|
||||
@@ -24,7 +24,7 @@ class BatchResult(db.Model):
|
||||
|
||||
|
||||
@classmethod
|
||||
def save_batch(cls, session: Session, batch_number, total_batches, results_df, status="success"):
|
||||
def save_batch(cls, batch_number, total_batches, results_df, status="success"):
|
||||
"""Save batch results into DB using ORM bulk insert."""
|
||||
try:
|
||||
results_df["batch_number"] = batch_number
|
||||
@@ -42,21 +42,21 @@ class BatchResult(db.Model):
|
||||
for row in results_df.to_dict("records")
|
||||
]
|
||||
|
||||
session.bulk_save_objects(records)
|
||||
session.commit()
|
||||
db.session.bulk_save_objects(records)
|
||||
db.session.commit()
|
||||
logger.info(f"Saved batch {batch_number} successfully.")
|
||||
return True
|
||||
except Exception as e:
|
||||
session.rollback()
|
||||
db.session.rollback()
|
||||
logger.error(f"Error saving batch {batch_number}: {str(e)}")
|
||||
return False
|
||||
|
||||
@classmethod
|
||||
def get_batch_status(cls, session: Session, batch_number: int):
|
||||
def get_batch_status(cls, batch_number: int):
|
||||
"""Return summary info about one batch."""
|
||||
try:
|
||||
result = (
|
||||
session.query(
|
||||
db.session.query(
|
||||
cls.batch_number,
|
||||
cls.total_batches,
|
||||
cls.processed_at,
|
||||
@@ -75,11 +75,11 @@ class BatchResult(db.Model):
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_all_batches(cls, session: Session):
|
||||
def get_all_batches(cls):
|
||||
"""Return summaries for all batches."""
|
||||
try:
|
||||
results = (
|
||||
session.query(
|
||||
db.session.query(
|
||||
cls.batch_number,
|
||||
cls.total_batches,
|
||||
cls.processed_at,
|
||||
|
||||
@@ -0,0 +1,100 @@
|
||||
from venv import logger
|
||||
from sqlalchemy import Column, Integer, String, Float, DateTime, ForeignKey
|
||||
from sqlalchemy.orm import relationship
|
||||
from app.extensions import db
|
||||
import pandas as pd
|
||||
|
||||
class CustomerAccountTransactionHx(db.Model):
|
||||
__tablename__ = "customer_account_transaction_hx"
|
||||
|
||||
id = Column(Integer, primary_key=True, autoincrement=True)
|
||||
accountid = Column(String(64), nullable=False, index=True)
|
||||
trx_type = Column(String(50), nullable=False)
|
||||
amount = Column(Float, nullable=False)
|
||||
description = Column(String(255))
|
||||
customer_id = Column(String(64))
|
||||
trx_start_date = Column(DateTime, nullable=False)
|
||||
trx_end_date = Column(DateTime)
|
||||
is_salary_related = Column(Integer, default=0)
|
||||
is_consistent_amount = Column(Integer, default=0)
|
||||
is_salary_type = Column(Integer, default=0)
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def get_all(cls):
|
||||
"""Fetch all transactions."""
|
||||
return db.session.query(cls).all()
|
||||
|
||||
@classmethod
|
||||
def get_rows_count(cls):
|
||||
"""Return total number of transaction rows."""
|
||||
try:
|
||||
count = db.session.query(db.func.count(cls.id)).scalar()
|
||||
return count
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting row count: {str(e)}")
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
def get_by_account(cls, accountid: str):
|
||||
"""Fetch transactions for a given account."""
|
||||
return db.session.query(cls).filter_by(accountid=accountid).all()
|
||||
|
||||
@classmethod
|
||||
def get_accounts(cls, limit=None):
|
||||
"""Fetch distinct account IDs."""
|
||||
query = db.session.query(cls.accountid).distinct()
|
||||
if limit:
|
||||
query = query.limit(limit)
|
||||
return [row.accountid for row in query.all()]
|
||||
|
||||
@classmethod
|
||||
def insert_transaction(cls, **kwargs):
|
||||
"""Insert a new transaction."""
|
||||
trx = cls(**kwargs)
|
||||
try:
|
||||
db.session.add(trx)
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Error inserting transaction: {str(e)}")
|
||||
return None
|
||||
return trx
|
||||
|
||||
@classmethod
|
||||
def bulk_insert(cls, transactions: list[dict]):
|
||||
"""Insert multiple transactions at once."""
|
||||
objs = [cls(**trx) for trx in transactions]
|
||||
|
||||
try:
|
||||
db.session.bulk_save_objects(objs)
|
||||
db.session.commit()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in bulk insert: {str(e)}")
|
||||
return None
|
||||
return objs
|
||||
|
||||
@classmethod
|
||||
def get_transactions_df(cls, accountids: list[str] = None):
|
||||
"""Return a Pandas DataFrame for ML model preparation."""
|
||||
query = db.session.query(cls)
|
||||
if accountids:
|
||||
query = query.filter(cls.accountid.in_(accountids))
|
||||
rows = query.all()
|
||||
|
||||
|
||||
df = pd.DataFrame([{
|
||||
"id": trx.id,
|
||||
"accountid": trx.accountid,
|
||||
"trx_type": trx.trx_type,
|
||||
"amount": trx.amount,
|
||||
"description": trx.description,
|
||||
"customer_id": trx.customer_id,
|
||||
"trx_start_date": trx.trx_start_date,
|
||||
"trx_end_date": trx.trx_end_date,
|
||||
"is_salary_related": trx.is_salary_related,
|
||||
"is_consistent_amount": trx.is_consistent_amount,
|
||||
"is_salary_type": trx.is_salary_type,
|
||||
} for trx in rows])
|
||||
|
||||
return df
|
||||
@@ -3,7 +3,7 @@ Database operations module for salary analytics.
|
||||
"""
|
||||
|
||||
from sqlalchemy import text
|
||||
from ..config import BATCH_RESULTS_TABLE
|
||||
from app.config import BATCH_RESULTS_TABLE
|
||||
from datetime import datetime
|
||||
from app.utils.logger import logger
|
||||
|
||||
|
||||
Reference in New Issue
Block a user