105 lines
3.4 KiB
Python
105 lines
3.4 KiB
Python
from flask import request, jsonify
|
|
from marshmallow import ValidationError
|
|
from app.api.services.base_service import BaseService
|
|
from app.utils.logger import logger
|
|
from app.api.schemas.authorization import AuthorizeRequestSchema
|
|
from app.api.helpers.response_helper import ResponseHelper
|
|
from flask_jwt_extended import (
|
|
JWTManager,
|
|
jwt_required,
|
|
create_access_token,
|
|
create_refresh_token,
|
|
get_jwt_identity,
|
|
)
|
|
from app.config import Config
|
|
from datetime import timedelta
|
|
|
|
USERNAME = Config.BASIC_AUTH_USERNAME
|
|
PASSWORD = Config.BASIC_AUTH_PASSWORD
|
|
|
|
|
|
class AuthorizationService(BaseService):
|
|
|
|
@staticmethod
|
|
def process_request(data):
|
|
"""
|
|
Process the Authorization request.
|
|
|
|
Args:
|
|
data (dict): The request data.
|
|
|
|
Returns:
|
|
dict: A standardized response.
|
|
"""
|
|
try:
|
|
logger.info("Processing Authorization request")
|
|
|
|
if not data:
|
|
return ResponseHelper.bad_request(result_description="Missing JSON in request")
|
|
|
|
# Validate input data using the Authorization schema
|
|
schema = AuthorizeRequestSchema()
|
|
validated_data = schema.load(data) # Raises ValidationError if invalid
|
|
|
|
if (
|
|
validated_data["username"] != USERNAME
|
|
or validated_data["password"] != PASSWORD
|
|
):
|
|
return ResponseHelper.unauthorized(result_description="Invalid credentials")
|
|
|
|
expires = timedelta(days=3)
|
|
access_token = create_access_token(identity=validated_data["username"], expires_delta=expires)
|
|
refresh_token = create_refresh_token(identity=validated_data["username"])
|
|
|
|
# Simulated processing logic
|
|
response_data = {
|
|
"access_token": access_token,
|
|
"refresh_token": refresh_token,
|
|
}
|
|
|
|
return ResponseHelper.success(
|
|
data={"data": response_data}, result_description="Authorization processed successfully"
|
|
)
|
|
|
|
except ValidationError as e:
|
|
logger.error(f"Validation error: {e}")
|
|
return ResponseHelper.bad_request(result_description=f"Validation error: {e}")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing Authorization request: {e}")
|
|
return ResponseHelper.internal_server_error(
|
|
result_description=f"Error processing Authorization request: {e}"
|
|
)
|
|
|
|
@staticmethod
|
|
def process_refresh_request():
|
|
"""
|
|
Process the RefreshToken request.
|
|
|
|
Args:
|
|
data (dict): The request data.
|
|
|
|
Returns:
|
|
dict: A standardized response.
|
|
"""
|
|
try:
|
|
logger.info("Processing RefreshToken request")
|
|
|
|
identity = get_jwt_identity()
|
|
access_token = create_access_token(identity=identity)
|
|
|
|
# Simulated processing logic
|
|
response_data = {
|
|
"access_token": access_token,
|
|
}
|
|
|
|
return ResponseHelper.success(
|
|
data={"data": response_data}, result_description="RefreshToken processed successfully"
|
|
)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing RefreshToken request: {e}")
|
|
return ResponseHelper.internal_server_error(
|
|
result_description=f"Error processing RefreshToken request: {e}"
|
|
)
|