diff --git a/app.py b/app.py index 51894a3..23c7d2b 100644 --- a/app.py +++ b/app.py @@ -1,20 +1,9 @@ from app import create_app from app.integrations import KafkaIntegration +from app.config import settings app = create_app() if __name__ == "__main__": - kafka = KafkaIntegration() - - if kafka.is_kafka_running(): - print("Kafka is running") - else: - print("Kafka is not running") - exit(1) - - try: - message = kafka.receive_messages( - topic= - ) app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index bdcb00b..1be7f16 100644 Binary files a/app/__pycache__/config.cpython-310.pyc and b/app/__pycache__/config.cpython-310.pyc differ diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py index fc66890..4d3ed85 100644 --- a/app/integrations/kafka.py +++ b/app/integrations/kafka.py @@ -1,4 +1,4 @@ -from confluent_kafka import Consumer +from confluent_kafka import Consumer, Producer import json import logging from app.config import settings @@ -31,7 +31,6 @@ class KafkaIntegration: def delivery_report(err, msg): """Called once for each message produced""" if err is not None: - logger.error(f"Message delivery failed: {err}") raise RuntimeError(f"Message delivery failed: {err}") @@ -41,7 +40,7 @@ class KafkaIntegration: ) @staticmethod - def receive_messages(topic, timeout=1.0): + def receive_disbursement_messages(topic, timeout=1.0): """ Receive messages from a Kafka topic. @@ -66,7 +65,13 @@ class KafkaIntegration: logger.debug( f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" ) - return json.loads(message_value) if message_value else None + message = json.loads(message_value) if message_value else None + + # Call the endpoint if provided + if message: + KafkaIntegration._call_disbursement_endpoint(message) + + return message except Exception as e: logger.error(f"Error while receiving message: {e}") @@ -76,29 +81,9 @@ class KafkaIntegration: def close_consumer(): """Shutdown consumer""" consumer = KafkaIntegration._get_consumer() - consumer.clonse() + consumer.close() logger.info("Kafka consumer closed") - @staticmethod - def is_kafka_running(): - """ - Check if Kafka broker is reachable by attempting to fetch metadata. - :return: True if Kafka is running, False otherwise - """ - try: - producer = KafkaIntegration._get_producer() - # Attempt to fetch metadata for a dummy topic - metadata = producer.list_topics(timeout=5) - if metadata.brokers: # If brokers are returned, Kafka is running - logger.info(f"Kafka broker at {settings.KAFKA_BROKER} is running") - return True - else: - logger.warning(f"No brokers found for {settings.KAFKA_BROKER}") - return False - except Exception as e: - logger.error(f"Kafka broker check failed: {e}") - return False - @staticmethod def _call_disbursement_endpoint( message, endpoint_url="http://localhost:8000/loans/disbursement" diff --git a/wsgi.py b/wsgi.py index 49f70e1..697264b 100644 --- a/wsgi.py +++ b/wsgi.py @@ -1,7 +1,38 @@ from app import create_app +from app.integrations import KafkaIntegration +from app.config import settings +import logging app = create_app() +logger = logging.getLogger(__name__) if __name__ != "__main__": + + kafka = KafkaIntegration() + + while True: + message = kafka.receive_disbursement_messages( + topic=settings.KAFKA_PAYMENT_TOPIC, timeout=1.0 + ) + + if message: + logger.info(f"Processed message: {message}") + else: + logger.info("No message received within timeout") + + # try: + # message = kafka.receive_disbursement_messages( + # topic=settings.KAFKA_PAYMENT_TOPIC, timeout=5.0 + # ) + + # if message: + # logger.info(f"Processed message: {message}") + # else: + # logger.info("No message received within timeout") + # except Exception as e: + # print(f"Error: {e}") + # finally: + # kafka.close_consumer() + # Expose WSGI app instance for Gunicorn - wsgi_app = app + wsgi_app = app