diff --git a/app.py b/app.py index 031e1d7..51efc32 100644 --- a/app.py +++ b/app.py @@ -3,4 +3,5 @@ from app import create_app app = create_app() if __name__ == "__main__": - app.run(host="0.0.0.0", port=5000, debug=True) \ No newline at end of file + + app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index b2fd6e9..9e2ef39 100644 Binary files a/app/__pycache__/config.cpython-310.pyc and b/app/__pycache__/config.cpython-310.pyc differ diff --git a/app/config.py b/app/config.py index 598d366..74abe6d 100644 --- a/app/config.py +++ b/app/config.py @@ -1,9 +1,17 @@ import os + class Config: """Base configuration for Flask app""" SECRET_KEY = os.getenv("SECRET_KEY", "supersecretkey") API_BASE_URL = "https://coreapi.dev.simbrellang.net/v1/api/salary" JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") - DEBUG = True \ No newline at end of file + DEBUG = True + + KAFKA_BROKER = "dev-events.simbrellang.net:9085" + KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" + KAFKA_TIMEOUT = 5.0 + + +settings = Config() diff --git a/app/integrations/__init__.py b/app/integrations/__init__.py new file mode 100644 index 0000000..f19ae60 --- /dev/null +++ b/app/integrations/__init__.py @@ -0,0 +1 @@ +from .kafka import KafkaIntegration diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py new file mode 100644 index 0000000..4d3ed85 --- /dev/null +++ b/app/integrations/kafka.py @@ -0,0 +1,101 @@ +from confluent_kafka import Consumer, Producer +import json +import logging +from app.config import settings +import requests + +logger = logging.getLogger(__name__) + + +class KafkaIntegration: + _consumer = None + + _consumer_config = { + "bootstrap.servers": settings.KAFKA_BROKER, + "group.id": "loan-service-consumer", + "auto.offset.reset": "earliest", + "enable.auto.commit": True, + } + + @staticmethod + def _get_consumer(): + """Kafka consumer""" + if not KafkaIntegration._consumer: + KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config) + logger.info( + f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}" + ) + return KafkaIntegration._consumer + + @staticmethod + def delivery_report(err, msg): + """Called once for each message produced""" + if err is not None: + logger.error(f"Message delivery failed: {err}") + raise RuntimeError(f"Message delivery failed: {err}") + + else: + logger.debug( + f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" + ) + + @staticmethod + def receive_disbursement_messages(topic, timeout=1.0): + """ + Receive messages from a Kafka topic. + + :param topic: The Kafka topic to subscribe to + :param timeout: Time to wait for a message (in seconds) + :return: The message value (decoded) or None if no message is received + """ + consumer = KafkaIntegration._get_consumer() + consumer.subscribe([topic]) + + try: + msg = consumer.poll(timeout=timeout) + if msg is None: + logger.debug(f"No message received from topic {topic} within timeout") + return None + if msg.error(): + logger.error(f"Consumer error: {msg.error()}") + raise RuntimeError(f"Consumer error: {msg.error()}") + + # Decode and return the message value + message_value = msg.value().decode("utf-8") + logger.debug( + f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" + ) + message = json.loads(message_value) if message_value else None + + # Call the endpoint if provided + if message: + KafkaIntegration._call_disbursement_endpoint(message) + + return message + + except Exception as e: + logger.error(f"Error while receiving message: {e}") + raise + + @staticmethod + def close_consumer(): + """Shutdown consumer""" + consumer = KafkaIntegration._get_consumer() + consumer.close() + logger.info("Kafka consumer closed") + + @staticmethod + def _call_disbursement_endpoint( + message, endpoint_url="http://localhost:8000/loans/disbursement" + ): + """Call an HTTP endpoint with the received message""" + try: + response = requests.post(endpoint_url, json=message, timeout=5) + response.raise_for_status() + logger.info( + f"Successfully sent message to {endpoint_url}: {response.status_code}" + ) + print(response.json()) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to call endpoint {endpoint_url}: {e}") + raise diff --git a/app/routes/__pycache__/authentication.cpython-310.pyc b/app/routes/__pycache__/authentication.cpython-310.pyc index 84c1669..b1dd047 100644 Binary files a/app/routes/__pycache__/authentication.cpython-310.pyc and b/app/routes/__pycache__/authentication.cpython-310.pyc differ diff --git a/app/routes/loan.py b/app/routes/loan.py index f13a5fa..5c80968 100644 --- a/app/routes/loan.py +++ b/app/routes/loan.py @@ -1,6 +1,8 @@ from flask import Blueprint, request, jsonify, current_app import requests from app.utils.auth import get_headers +from app.integrations import KafkaIntegration +from threading import Thread loan_bp = Blueprint("loan", __name__) @@ -154,8 +156,10 @@ def disbursement(): data = request.json api_url = f"{current_app.config['API_BASE_URL']}/Disbursement" - response = requests.post(api_url, json=data, headers=get_headers()) - return jsonify(response.json()), response.status_code + return jsonify({"requestId": data["requestId"]}), 200 + + # response = requests.post(api_url, json=data, headers=get_headers()) + # return jsonify(response.json()), response.status_code @loan_bp.route("/collect-loan", methods=["POST"]) diff --git a/requirements.txt b/requirements.txt index ae70549..a5968da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ Flask-Marshmallow==0.15.0 marshmallow==3.19.0 Flask-Cors==3.0.10 gunicorn -requests \ No newline at end of file +requests +confluent-kafka==1.9.2 \ No newline at end of file diff --git a/wsgi.py b/wsgi.py index 49f70e1..7137082 100644 --- a/wsgi.py +++ b/wsgi.py @@ -1,7 +1,24 @@ from app import create_app +from app.integrations import KafkaIntegration +from app.config import settings +import logging app = create_app() +logger = logging.getLogger(__name__) if __name__ != "__main__": + + kafka = KafkaIntegration() + + while True: + message = kafka.receive_disbursement_messages( + topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT + ) + + if message: + logger.info(f"Processed message: {message}") + else: + logger.info("No message received within timeout") + # Expose WSGI app instance for Gunicorn - wsgi_app = app + wsgi_app = app