diff --git a/app.py b/app.py index 031e1d7..51894a3 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,20 @@ from app import create_app +from app.integrations import KafkaIntegration app = create_app() if __name__ == "__main__": - app.run(host="0.0.0.0", port=5000, debug=True) \ No newline at end of file + kafka = KafkaIntegration() + + if kafka.is_kafka_running(): + print("Kafka is running") + else: + print("Kafka is not running") + exit(1) + + try: + message = kafka.receive_messages( + topic= + ) + + app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index 7d5cbe9..bdcb00b 100644 Binary files a/app/__pycache__/config.cpython-310.pyc and b/app/__pycache__/config.cpython-310.pyc differ diff --git a/app/config.py b/app/config.py index a466a3a..a56a39a 100644 --- a/app/config.py +++ b/app/config.py @@ -9,5 +9,8 @@ class Config: JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") DEBUG = True - KAFKA_BROKER = "dev-events.simbrellang.net:9085" + KAFKA_BROKER = "dev-events.simbrellang.net:9084" KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" + + +settings = Config() diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py index 0a10c29..fc66890 100644 --- a/app/integrations/kafka.py +++ b/app/integrations/kafka.py @@ -1,32 +1,31 @@ -from confluent_kafka import Producer +from confluent_kafka import Consumer import json import logging from app.config import settings +import requests logger = logging.getLogger(__name__) class KafkaIntegration: - _producer = None - _config = { + _consumer = None + + _consumer_config = { "bootstrap.servers": settings.KAFKA_BROKER, - "client.id": "loan-service-producer", - "acks": "all", - "retries": 3, - "debug": "broker,topic,msg", + "group.id": "loan-service-consumer", + "auto.offset.reset": "earliest", + "enable.auto.commit": True, } @staticmethod - def _get_producer(): - """Kafka producer""" - if not KafkaIntegration._producer: - KafkaIntegration._producer = Producer(KafkaIntegration._config) - + def _get_consumer(): + """Kafka consumer""" + if not KafkaIntegration._consumer: + KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config) logger.info( - f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" + f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}" ) - - return KafkaIntegration._producer + return KafkaIntegration._consumer @staticmethod def delivery_report(err, msg): @@ -42,44 +41,76 @@ class KafkaIntegration: ) @staticmethod - def send_disbursement_request(disbursement_data, request_id): + def receive_messages(topic, timeout=1.0): """ - Send disbursement request to PROCESS_PAYMENT topic + Receive messages from a Kafka topic. - Args: - disbursement_data (dict): Disbursement data request payload - request_id (str): Unique request identifier (used as Kafka key) - - Returns: - None + :param topic: The Kafka topic to subscribe to + :param timeout: Time to wait for a message (in seconds) + :return: The message value (decoded) or None if no message is received """ + consumer = KafkaIntegration._get_consumer() + consumer.subscribe([topic]) + try: - # proceed to send the disbursement request to Kafka - producer = KafkaIntegration._get_producer() + msg = consumer.poll(timeout=timeout) + if msg is None: + logger.debug(f"No message received from topic {topic} within timeout") + return None + if msg.error(): + logger.error(f"Consumer error: {msg.error()}") + raise RuntimeError(f"Consumer error: {msg.error()}") - # sending disbursement request to Kafka - producer.produce( - "PROCESS_PAYMENT", - key=str(request_id), - value=json.dumps(disbursement_data).encode("utf-8"), - callback=KafkaIntegration.delivery_report, + # Decode and return the message value + message_value = msg.value().decode("utf-8") + logger.debug( + f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" ) + return json.loads(message_value) if message_value else None - producer.poll(0) - - logger.info( - f"Disbursement request {request_id} sent to Kafka: {disbursement_data}" - ) except Exception as e: - logger.error( - f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" - ) - raise Exception( - f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" - ) + logger.error(f"Error while receiving message: {e}") + raise @staticmethod - def flush(): - """Shutdown""" - producer = KafkaIntegration._get_producer() - producer.flush() + def close_consumer(): + """Shutdown consumer""" + consumer = KafkaIntegration._get_consumer() + consumer.clonse() + logger.info("Kafka consumer closed") + + @staticmethod + def is_kafka_running(): + """ + Check if Kafka broker is reachable by attempting to fetch metadata. + :return: True if Kafka is running, False otherwise + """ + try: + producer = KafkaIntegration._get_producer() + # Attempt to fetch metadata for a dummy topic + metadata = producer.list_topics(timeout=5) + if metadata.brokers: # If brokers are returned, Kafka is running + logger.info(f"Kafka broker at {settings.KAFKA_BROKER} is running") + return True + else: + logger.warning(f"No brokers found for {settings.KAFKA_BROKER}") + return False + except Exception as e: + logger.error(f"Kafka broker check failed: {e}") + return False + + @staticmethod + def _call_disbursement_endpoint( + message, endpoint_url="http://localhost:8000/loans/disbursement" + ): + """Call an HTTP endpoint with the received message""" + try: + response = requests.post(endpoint_url, json=message, timeout=5) + response.raise_for_status() + logger.info( + f"Successfully sent message to {endpoint_url}: {response.status_code}" + ) + print(response.json()) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to call endpoint {endpoint_url}: {e}") + raise diff --git a/app/routes/__pycache__/authentication.cpython-310.pyc b/app/routes/__pycache__/authentication.cpython-310.pyc index 84c1669..b1dd047 100644 Binary files a/app/routes/__pycache__/authentication.cpython-310.pyc and b/app/routes/__pycache__/authentication.cpython-310.pyc differ diff --git a/app/routes/loan.py b/app/routes/loan.py index f13a5fa..5c80968 100644 --- a/app/routes/loan.py +++ b/app/routes/loan.py @@ -1,6 +1,8 @@ from flask import Blueprint, request, jsonify, current_app import requests from app.utils.auth import get_headers +from app.integrations import KafkaIntegration +from threading import Thread loan_bp = Blueprint("loan", __name__) @@ -154,8 +156,10 @@ def disbursement(): data = request.json api_url = f"{current_app.config['API_BASE_URL']}/Disbursement" - response = requests.post(api_url, json=data, headers=get_headers()) - return jsonify(response.json()), response.status_code + return jsonify({"requestId": data["requestId"]}), 200 + + # response = requests.post(api_url, json=data, headers=get_headers()) + # return jsonify(response.json()), response.status_code @loan_bp.route("/collect-loan", methods=["POST"])