Merge branch 'oluyemi' of DigiFi/digifi-EventManager into master

This commit is contained in:
2025-04-15 16:06:06 +00:00
committed by Gogs
15 changed files with 140 additions and 61 deletions
+8 -2
View File
@@ -2,7 +2,8 @@ from flask import Flask
from flask_cors import CORS
from app.config import Config
from app.routes import auth_bp, autocall_bp
from app.errors import method_not_allowed, unsupported_media_type
from app.response import (method_not_allowed, unsupported_media_type, not_found, bad_request)
from app.extensions import db
def create_app():
@@ -12,15 +13,20 @@ def create_app():
# Load configuration
app.config.from_object(Config)
# Setup CORS
CORS(app)
# Register blueprints
app.register_blueprint(auth_bp)
app.register_blueprint(autocall_bp, url_prefix="/autocall")
# Error Handlers
app.register_error_handler(405, method_not_allowed)
app.register_error_handler(415, unsupported_media_type)
app.register_error_handler(404, not_found)
app.register_error_handler(400, bad_request)
# Database
db.init_app(app)
return app
+10
View File
@@ -28,5 +28,15 @@ class Config:
"BANK_CALL_BASIC_AUTH_PASSWORD", "password"
)
DATABASE_USER = os.getenv("DATABASE_USER")
DATABASE_PASSWORD = os.getenv("DATABASE_PASSWORD")
DATABASE_HOST = os.getenv("DATABASE_HOST")
DATABASE_NAME = os.getenv("DATABASE_NAME")
DATABASE_PORT = os.getenv("DATABASE_PORT", 10532)
SQLALCHEMY_DATABASE_URI = f"postgresql+psycopg2://{DATABASE_USER}:{DATABASE_PASSWORD}@{DATABASE_HOST}:{DATABASE_PORT}/{DATABASE_NAME}"
SQLALCHEMY_TRACK_MODIFICATIONS = False
settings = Config()
-1
View File
@@ -1 +0,0 @@
from .handlers import method_not_allowed, unsupported_media_type
+3
View File
@@ -0,0 +1,3 @@
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
+16 -25
View File
@@ -2,7 +2,9 @@ import requests
from app.config import settings
from app.utils.auth import get_headers
from app.utils.logger import logger
from flask import jsonify
from flask import jsonify, current_app
from app.services.transactions import TransactionService
class SimbrellaClient:
@@ -11,9 +13,16 @@ class SimbrellaClient:
@staticmethod
def disbursement(data):
api_url = f"{SimbrellaClient.BANK_CALL_BASE_URL}/Disbursement"
logger.info(f"BANK_CALL_BASE_URL = {SimbrellaClient.BANK_CALL_BASE_URL}")
logger.info(f"Calling Disbursement endpoint with data: {data}")
# Check if the transaction exists
logger.info(f"Checking if transaction exists")
transaction = TransactionService.get_transaction_by_transaction_id(transaction_id=data['transactionId'])
logger.info(f"Response from database: {transaction}")
if not transaction:
return 0
disbursement_data ={
"requestId": data['requestId'],
"transactionId": data['transactionId'],
@@ -45,7 +54,6 @@ class SimbrellaClient:
@staticmethod
def collect_loan(data):
api_url = f"{SimbrellaClient.BANK_CALL_BASE_URL}/CollectLoan"
logger.info(f"BANK_CALL_BASE_URL = {SimbrellaClient.BANK_CALL_BASE_URL}")
logger.info(f"Calling CollectLoan endpoint with data: {data}")
collect_loan_data = {
@@ -76,14 +84,7 @@ class SimbrellaClient:
@staticmethod
def verify_transaction():
# api_url = f"{SimbrellaClient.BANK_CALL_BASE_URL}/TransactionVerify"
# logger.info(f"BANK_CALL_BASE_URL = {SimbrellaClient.BANK_CALL_BASE_URL}")
# logger.info(f"Calling TransactionVerify endpoint with data: {data}")
try:
# logger.info(f"Here is your TransactionVerify Request data ***** : {data}")
# response = requests.post(api_url, json=data, headers=get_headers())
# logger.info(f"TransactionVerify response: {response.json()}")
return {
"status": "00",
@@ -96,16 +97,9 @@ class SimbrellaClient:
@staticmethod
def refresh_disbursement(data):
# api_url = f"{SimbrellaClient.BANK_CALL_BASE_URL}/Disbursement"
# logger.info(f"BANK_CALL_BASE_URL = {SimbrellaClient.BANK_CALL_BASE_URL}")
# logger.info(f"Calling Disbursement endpoint with data: {data}")
try:
logger.info(f"Here is your Disbursement Request data ***** : {data}")
# response = requests.post(api_url, json=data, headers=get_headers())
# logger.info(f"Disbursement response: {response.json()}")
# return response.json()
return data
@@ -115,19 +109,16 @@ class SimbrellaClient:
@staticmethod
def payment_callback(data):
# api_url = f"{SimbrellaClient.BANK_CALL_BASE_URL}/Payment"
# logger.info(f"BANK_CALL_BASE_URL = {SimbrellaClient.BANK_CALL_BASE_URL}")
# logger.info(f"Calling Payment Callback endpoint with data: {data}")
try:
logger.info(f"Here is your Payment Callback Request data ***** : {data}")
# response = requests.post(api_url, json=data, headers=get_headers())
# logger.info(f"Payment Callback response: {response.json()}")
# return response.json()
return data
except Exception as e:
logger.info(f"Failed to call Payment Callback endpoint: {e}")
raise
raise
@staticmethod
def check_transaction(txn_id):
return run_in_app_context(Transaction.get_transaction_by_id(txn_id))
+3
View File
@@ -0,0 +1,3 @@
from .transactions import Transaction
__all__ = ['Transaction']
+25
View File
@@ -0,0 +1,25 @@
from app.extensions import db
from datetime import datetime, timezone
class Transaction(db.Model):
__tablename__ = "transactions"
id = db.Column(
db.Integer,
primary_key=True,
autoincrement=True,
)
transaction_id = db.Column(db.String(50), nullable=False)
account_id = db.Column(db.String(50), nullable=True)
customer_id = db.Column(db.String(50), nullable=True)
type = db.Column(db.String(50), nullable=False)
channel = db.Column(db.String(50), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.now(timezone.utc))
updated_at = db.Column(db.DateTime, default=datetime.now(timezone.utc), onupdate=datetime.now(timezone.utc))
def __repr__(self):
return f'<Transaction {self.id}>'
@classmethod
def get_transaction_by_transaction_id(cls, transaction_id):
return cls.query.filter_by(transaction_id=transaction_id).first()
+2
View File
@@ -0,0 +1,2 @@
from .handlers import (method_not_allowed, unsupported_media_type,
not_found, bad_request, success, created, updated)
@@ -16,3 +16,15 @@ def bad_request(error):
def unsupported_media_type(error):
return ResponseHelper.error(message="Unsupported Media Type", status_code=415)
def success(data):
return ResponseHelper.success(data=data)
def created(data):
return ResponseHelper.created(data=data)
def updated(data):
return ResponseHelper.updated(data=data)
+1
View File
@@ -0,0 +1 @@
from .transactions import TransactionService
+10
View File
@@ -0,0 +1,10 @@
from app.models import Transaction
class TransactionService:
@staticmethod
def get_transaction_by_transaction_id(transaction_id):
"""
Get the transaction by ID
"""
return Transaction.get_transaction_by_transaction_id(transaction_id)
+2 -8
View File
@@ -2,14 +2,8 @@ from app.config import settings
def get_headers():
# return {
# "Content-Type": "application/json",
# "x-api_key": settings.BANK_CALL_API_KEY,
# "App-Id": settings.BANK_CALL_APP_ID,
# }
return {
"Content-Type": "application/json",
"x-api-key": "test-api-key-12345",
"App-Id": "app1",
"x-api-key": settings.BANK_CALL_API_KEY,
"App-Id": settings.BANK_CALL_APP_ID,
}
+18
View File
@@ -95,6 +95,24 @@ paths:
unicode:
type: boolean
example: true
responses:
200:
description: A successful response
/autocall/refresh-verify-disbursement:
get:
summary: Refresh the disbursement to verify
responses:
200:
description: A successful response
/autocall/refresh-disbursement:
get:
summary: Refresh the disbursement
responses:
200:
description: A successful response
/autocall/payment-callback:
get:
summary: The Payment callback
responses:
200:
description: A successful response
+4 -1
View File
@@ -5,4 +5,7 @@ marshmallow==3.19.0
Flask-Cors==3.0.10
gunicorn
requests
confluent-kafka==1.9.2
confluent-kafka==1.9.2
flask-sqlalchemy
psycopg2-binary
alembic
+26 -24
View File
@@ -1,35 +1,37 @@
import threading
from app import create_app
from app.integrations import KafkaIntegration
from app.config import settings
from app.utils.logger import logger
app = create_app()
kafka = KafkaIntegration()
def start_kafka_consumer(app):
with app.app_context():
logger.info("Starting Kafka consumer...")
while True:
try:
message = kafka.receive_disbursement_messages(
topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT
)
if message:
logger.info(f"Processed message: {message}")
else:
logger.info("No message received within timeout")
except Exception as e:
logger.error(f"Error while receiving message: {e}")
if __name__ != "__main__":
#Expose WSGI app instance for Gunicorn
# Expose WSGI app instance for Gunicorn
wsgi_app = app
kafka = KafkaIntegration()
logger.info("Starting Kafka consumer...")
while True:
try:
message = kafka.receive_disbursement_messages(
topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT
)
if message:
logger.info(f"Processed message: {message}")
else:
logger.info("No message received within timeout")
except Exception as e:
logger.error(f"Error while receiving message: {e}")
raise
# Expose WSGI app instance for Gunicorn
# wsgi_app = app
# Start kafka in a thread
threading.Thread(target=start_kafka_consumer, args=(app,), daemon=True).start()