import threading from app import create_app from app.integrations import KafkaIntegration from app.config import settings from app.utils.logger import logger app = create_app() kafka = KafkaIntegration() def start_kafka_consumer(app): with app.app_context(): logger.info("Starting Kafka consumer...") while True: try: message = kafka.receive_messages( topics=settings.KAFKA_TOPICS, timeout=settings.KAFKA_TIMEOUT ) if message: logger.info(f"Processed message: {message}") else: logger.info("No message received within timeout") except Exception as e: logger.error(f"Error while receiving message: {e}") if __name__ != "__main__": # Expose WSGI app instance for Gunicorn wsgi_app = app # Start kafka in a thread threading.Thread(target=start_kafka_consumer, args=(app,), daemon=True).start()