Files
digifi-EventManager/app/routes/authentication.py
T
2025-10-03 13:44:52 +01:00

107 lines
2.8 KiB
Python

from flask import Blueprint, request, jsonify, current_app
import requests
from app.extensions import db
from sqlalchemy import text
from app.utils.auth import get_headers
from app.config import settings
from app.utils.logger import logger
# Blueprint on which the route handlers in this module are registered.
auth_bp = Blueprint("auth", __name__)
# Base URL that the handlers prefix to the upstream API paths
# (e.g. f"{BASE_URL}/login"); read from the application settings.
BASE_URL = settings.BANK_CALL_BASE_URL
@auth_bp.route("/health", methods=["GET"])
def health():
    """Report whether the application can reach its database.

    Executes a trivial ``SELECT`` through the SQLAlchemy session. The Oracle
    form (``SELECT 1 FROM dual``) is used when the engine's dialect name
    contains ``"oracle"``; every other dialect gets a plain ``SELECT 1``.

    Returns:
        A ``(response, status)`` tuple. HTTP 200 with the dialect name when
        the probe succeeds; HTTP 500 with the exception text when it fails.
    """
    logger.info("Health check endpoint called")
    try:
        # Detect database type
        dialect = db.engine.dialect.name.lower()
        logger.info(f"the database dialect {dialect}")
        if "oracle" in dialect:
            query = text("SELECT 1 FROM dual")
        else:  # Postgres, MySQL, SQLite, etc.
            query = text("SELECT 1")
        db.session.execute(query)
    except Exception as e:
        # Top-level boundary for a health probe: record the traceback so the
        # failure is visible in the server logs, not only in the HTTP body.
        # NOTE(review): assumes the project logger exposes ``exception`` like
        # the stdlib logger does - confirm against app.utils.logger.
        logger.exception("Health check failed")
        # NOTE(review): str(e) can expose driver/connection details to the
        # caller; kept because existing consumers may read this message.
        return jsonify({
            "status": "error",
            "message": str(e),
        }), 500
    return jsonify({
        "status": "success",
        "database": dialect,
        "message": "Database connection successful",
    }), 200
@auth_bp.route("/login", methods=["POST"])
def login():
    """Forward the posted JSON body to the upstream ``/login`` API.

    The upstream call is bounded by a timeout, taken from the optional
    ``BANK_CALL_TIMEOUT`` application config key (seconds, default 30).

    Returns:
        A ``(response, status)`` tuple:

        * the upstream JSON body with HTTP 200 when the upstream answers 200;
        * ``{"error": "Invalid credentials"}`` with the upstream status code
          for any other upstream status;
        * HTTP 502 when the upstream cannot be reached, times out, or sends
          a 200 response whose body is not valid JSON.
    """
    data = request.get_json()
    api_url = f"{BASE_URL}/login"
    # requests applies no timeout by default, so an unresponsive upstream
    # would otherwise block this worker indefinitely.
    timeout = current_app.config.get("BANK_CALL_TIMEOUT", 30)
    try:
        response = requests.post(api_url, json=data, timeout=timeout)
    except requests.RequestException:
        # NOTE(review): assumes the project logger exposes ``exception`` like
        # the stdlib logger does - confirm against app.utils.logger.
        logger.exception("Upstream login request failed")
        return jsonify({"error": "Authentication service unavailable"}), 502
    if response.status_code == 200:
        try:
            body = response.json()
        except ValueError:
            # Covers every JSON decode error requests can raise here.
            logger.exception("Upstream login response is not valid JSON")
            return jsonify({"error": "Invalid response from authentication service"}), 502
        return jsonify(body), 200
    return jsonify({"error": "Invalid credentials"}), response.status_code
@auth_bp.route("/status-call", methods=["POST"])
def status_call():
    """Answer a status-call request with a fixed success payload.

    The forwarding call to the upstream ``StatusCall`` API is commented out,
    so the request body is parsed but not used, and every request receives
    the same hard-coded response with HTTP 200.
    """
    payload = request.get_json()
    upstream_url = f"{BASE_URL}/StatusCall"
    # Disabled upstream call, kept for reference:
    #   upstream = requests.post(upstream_url, json=payload, headers=get_headers())
    #   return jsonify(upstream.json()), upstream.status_code
    loan_result = {
        "transactionId": "241101",
        "providedAmount": 1000,
        "collectedAmount": 0,
        "resultCode": "00",
        "resultDescription": "Loan Provision is successful",
    }
    canned_reply = {
        "transactionId": "24110114545374721",
        "data": loan_result,
        "resultCode": "00",
        "resultDescription": "SUCCESS",
    }
    return jsonify(canned_reply), 200
@auth_bp.route("/sms", methods=["POST"])
def sms():
    """Answer an SMS request with a fixed success payload.

    The forwarding call to the upstream ``SMS`` API is commented out, so the
    request body is parsed but not used, and every request receives the same
    hard-coded response with HTTP 200.
    """
    payload = request.get_json()
    upstream_url = f"{BASE_URL}/SMS"
    # Disabled upstream call, kept for reference:
    #   upstream = requests.post(upstream_url, json=payload, headers=get_headers())
    #   return jsonify(upstream.json()), upstream.status_code
    canned_reply = dict(
        data="",
        statusCode=200,
        IsSuccessful=True,
        errorMessage=None,
    )
    return jsonify(canned_reply), 200
@auth_bp.route("/bulk-sms", methods=["POST"])
def bulk_sms():
    """Answer a bulk-SMS request with a fixed success payload.

    The forwarding call to the upstream ``BulkSMS`` API is commented out, so
    the request body is parsed but not used, and every request receives the
    same hard-coded response with HTTP 200.
    """
    payload = request.get_json()
    upstream_url = f"{BASE_URL}/BulkSMS"
    # Disabled upstream call, kept for reference:
    #   upstream = requests.post(upstream_url, json=payload, headers=get_headers())
    #   return jsonify(upstream.json()), upstream.status_code
    canned_reply = dict(
        data="",
        statusCode=200,
        IsSuccessful=True,
        errorMessage=None,
    )
    return jsonify(canned_reply), 200