This commit was merged in pull request #4.
This commit is contained in:
Azeez Muibi
2025-04-16 12:03:44 +01:00
parent 7b0c7aab19
commit 3c88e53bab
2 changed files with 158 additions and 146 deletions
+100 -96
View File
@@ -1,114 +1,118 @@
from flask import Blueprint, request, jsonify, send_from_directory from flask import Blueprint, request, jsonify
from app.api.services import ( from app.api.services.loan import LoanService
AuthorizationService, from app.api.services.transaction import TransactionService
TransactionService, from app.api.services.auth_service import AuthService
LoanService, from app.api.services.dashboard_service import DashboardService
AuthService, from functools import wraps
DashboardService
)
from app.utils.logger import logger
from app.api.middlewares import enforce_json, require_auth
import os
from flask_jwt_extended import (
JWTManager,
jwt_required,
create_access_token,
get_jwt_identity,
create_refresh_token,
)
api = Blueprint("api", __name__) api = Blueprint('api', __name__)
@api.before_request # JWT Authentication decorator
def cors_middleware(): def token_required(f):
"""Middleware applied globally to all API routes in this blueprint""" @wraps(f)
return enforce_json() def decorated(*args, **kwargs):
token = None
# Get token from header
auth_header = request.headers.get('Authorization')
if auth_header:
if auth_header.startswith('Bearer '):
token = auth_header.split(' ')[1]
if not token:
return jsonify({'message': 'Token is missing!'}), 401
# Verify token
payload = AuthService.verify_token(token)
if not payload:
return jsonify({'message': 'Token is invalid or expired!'}), 401
# Add user info to request
request.user = payload
return f(*args, **kwargs)
return decorated
# Swagger JSON file @api.route('/login', methods=['POST'])
@api.route("/swagger.json", methods=["GET"])
def swagger_json():
swagger_dir = os.path.join("swagger")
return send_from_directory(swagger_dir, "digifi_swagger.json")
@api.route("/swagger/<path:filename>")
def serve_paths(filename):
swagger_dir = os.path.join("swagger")
return send_from_directory(swagger_dir, filename)
# Login endpoint
@api.route("/login", methods=["POST"])
def login(): def login():
data = request.get_json() data = request.get_json()
response = AuthService.login(data)
return response # Check if username and password are provided
if not data or 'username' not in data or 'password' not in data:
return jsonify({
'error': 'Missing credentials',
'message': 'Username and password are required'
}), 400
username = data.get('username', '')
password = data.get('password', '')
# Call the login method from AuthService
result = AuthService.login(username, password)
# Check if result is a tuple (error response)
if isinstance(result, tuple):
return jsonify(result[0]), result[1]
return jsonify(result)
# Dashboard endpoint @api.route('/dashboard', methods=['GET'])
@api.route("/dashboard", methods=["GET"]) @token_required
@jwt_required()
def get_dashboard(): def get_dashboard():
response = DashboardService.get_dashboard_data() # Call the dashboard service
return response result = DashboardService.get_dashboard_data()
return jsonify(result)
# Get All Transactions Endpoint @api.route('/loans', methods=['GET'])
@api.route("/transactions", methods=["GET"]) @token_required
@jwt_required()
def get_transactions():
# Extract query parameters for filtering
filters = {
'account_id': request.args.get('account_id'),
'type': request.args.get('type'),
'channel': request.args.get('channel'),
'start_date': request.args.get('start_date'),
'end_date': request.args.get('end_date')
}
# logger.info(f"Get transactions request received with filters: {filters}")
response = TransactionService.process_request(filters)
return response
# Get All Loans Endpoint
@api.route("/loans", methods=["GET"])
@jwt_required()
def get_loans(): def get_loans():
# Extract query parameters for filtering # Extract query parameters
filters = { customer_id = request.args.get('customer_id')
'customer_id': request.args.get('customer_id'), loan_id = request.args.get('loan_id')
'account_id': request.args.get('account_id'), status = request.args.get('status')
'status': request.args.get('status'), offer_id = request.args.get('offer_id')
'offer_id': request.args.get('offer_id'), product_id = request.args.get('product_id')
'product_id': request.args.get('product_id'), start_date = request.args.get('start_date')
'start_date': request.args.get('start_date'), end_date = request.args.get('end_date')
'end_date': request.args.get('end_date'),
'due_before': request.args.get('due_before'),
'due_after': request.args.get('due_after')
}
# logger.info(f"Get loans request received with filters: {filters}") # Call the loan service
response = LoanService.process_request(filters) result = LoanService.process_request(
return response customer_id=customer_id,
loan_id=loan_id,
status=status,
offer_id=offer_id,
product_id=product_id,
start_date=start_date,
end_date=end_date
)
return jsonify(result)
# Authorize endpoint @api.route('/transactions', methods=['GET'])
@api.route("/Authorize", methods=["POST"]) @token_required
def authorize(): def get_transactions():
data = request.get_json() # Extract query parameters
# logger.info(f"Authorize request received: {data}") account_id = request.args.get('account_id')
response = AuthorizationService.process_request(data) transaction_id = request.args.get('transaction_id')
return response type = request.args.get('type')
channel = request.args.get('channel')
start_date = request.args.get('start_date')
end_date = request.args.get('end_date')
# Call the transaction service
result = TransactionService.process_request(
account_id=account_id,
transaction_id=transaction_id,
type=type,
channel=channel,
start_date=start_date,
end_date=end_date
)
# Authorize refresh endpoint return jsonify(result)
@api.route("/AuthorizeRefresh", methods=["POST"])
@jwt_required(refresh=True)
def refresh():
data = request.get_json()
# logger.info(f"Authorize refresh request received: {data}")
response = AuthorizationService.process_refresh_request()
return response
+58 -50
View File
@@ -1,58 +1,66 @@
from flask import jsonify import jwt
from app.utils.logger import logger import datetime
from app.api.services.base_service import BaseService from flask import current_app
from app.models.user import User
from flask_jwt_extended import create_access_token
from datetime import timedelta
class AuthService(BaseService): class AuthService:
@staticmethod @staticmethod
def login(data): def login(username, password):
""" """
Process the login request. Login method that checks for specific credentials and returns a JWT token
Args:
data (dict): Login credentials including username and password.
Returns:
dict: A standardized response with JWT token and user information.
""" """
try: # Define valid credentials for testing
# Extract credentials valid_credentials = {
username = data.get('username') "digifiuser": "digifipass",
password = data.get('password') "admin": "admin123",
"test": "test123"
}
# Validate input # Check if the provided credentials are valid
if not username or not password: if username in valid_credentials and password == valid_credentials[username]:
return jsonify({ # Generate JWT token with 15 minutes expiration
"message": "Username and password are required" payload = {
}), 400 'sub': username, # Subject (typically user ID)
'iat': datetime.datetime.utcnow(), # Issued at
# Get user by username 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=15), # Expiration (15 minutes)
user = User.get_user_by_username(username) 'role': 'admin' if username == 'admin' else 'user' # Role based on username
# Check if user exists and password is correct
if not user or not user.check_password(password):
return jsonify({
"message": "Invalid username or password"
}), 401
# Create JWT token with 15 minute expiration
access_token = create_access_token(
identity=user.username,
expires_delta=timedelta(minutes=15),
additional_claims={"name": user.name}
)
# Return token and user information
return {
"jwt_token": access_token,
"name": user.name
} }
except Exception as e: # Get the secret key from config
logger.error(f"An error occurred during login: {str(e)}", exc_info=True) secret_key = current_app.config.get('JWT_SECRET_KEY', 'default-secret-key')
return jsonify({
"message": "Internal Server Error" # Generate the token
}), 500 token = jwt.encode(payload, secret_key, algorithm='HS256')
# Return the token and user info
return {
'jwt_token': token,
'user': {
'username': username,
'role': 'admin' if username == 'admin' else 'user'
},
'expires_in': 900 # 15 minutes in seconds
}
else:
# Return error for invalid credentials
return {
'error': 'Invalid credentials',
'message': 'The username or password is incorrect'
}, 401
@staticmethod
def verify_token(token):
"""
Verify the JWT token
"""
try:
# Get the secret key from config
secret_key = current_app.config.get('JWT_SECRET_KEY', 'default-secret-key')
# Decode the token
payload = jwt.decode(token, secret_key, algorithms=['HS256'])
return payload
except jwt.ExpiredSignatureError:
return None # Token has expired
except jwt.InvalidTokenError:
return None # Invalid token