From 4abafa9a6c94a4eaccafa2d4c6f1b3f97495b4d3 Mon Sep 17 00:00:00 2001 From: "oluyemi.a.simbrellang.com" Date: Wed, 16 Apr 2025 19:41:55 +0100 Subject: [PATCH] fetch topics from .env file --- .env.local.example | 1 + .env.remote.example | 1 + app/config.py | 2 +- app/integrations/kafka.py | 12 +++++------- wsgi.py | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/.env.local.example b/.env.local.example index 541f8ac..7194652 100644 --- a/.env.local.example +++ b/.env.local.example @@ -1,6 +1,7 @@ KAFKA_TIMEOUT=1000.0 KAFKA_BROKER="10.20.30.50:9092" +KAFKA_TOPICS=PROCESS_PAYMENT,LOAN_REPAYMENT DATABASE_USER=firstadvance DATABASE_PASSWORD=FirstAdvance! diff --git a/.env.remote.example b/.env.remote.example index 8ac6ef4..bed36e7 100644 --- a/.env.remote.example +++ b/.env.remote.example @@ -1,6 +1,7 @@ KAFKA_TIMEOUT=1000.0 KAFKA_BROKER="dev-events.simbrellang.net:9085" +KAFKA_TOPICS=PROCESS_PAYMENT,LOAN_REPAYMENT DATABASE_USER=firstadvance DATABASE_PASSWORD=FirstAdvance! diff --git a/app/config.py b/app/config.py index 917a7fb..ebd652b 100644 --- a/app/config.py +++ b/app/config.py @@ -11,7 +11,7 @@ class Config: DEBUG = True KAFKA_BROKER = os.getenv("KAFKA_BROKER", "dev-events.simbrellang.net:9085") - KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" + KAFKA_TOPICS = [topic.strip() for topic in os.getenv("KAFKA_TOPICS", "").split(",") if topic.strip()] KAFKA_TIMEOUT = float( os.getenv("KAFKA_TIMEOUT", 1000.0) ) JWT_ACCESS_TOKEN_EXPIRES = os.getenv("JWT_ACCESS_TOKEN_EXPIRES", timedelta(hours=1)) diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py index 5cd4ea6..75d89ef 100644 --- a/app/integrations/kafka.py +++ b/app/integrations/kafka.py @@ -42,22 +42,20 @@ class KafkaIntegration: ) @staticmethod - def receive_disbursement_messages(topic, timeout): + def receive_messages(topics, timeout): """ Receive messages from a Kafka topic. - :param topic: The Kafka topic to subscribe to + :param topics: The Kafka topics to subscribe to :param timeout: Time to wait for a message (in seconds) :return: The message value (decoded) or None if no message is received """ consumer = KafkaIntegration._get_consumer() - # consumer.subscribe(["PROCESS_PAYMENT"]) - # consumer.subscribe([topic]) - consumer.subscribe(['LOAN_REPAYMENT', 'PROCESS_PAYMENT']) + consumer.subscribe(topics) logger.info( - f"Waiting for messages from topic {topic} with this timeout: {timeout}..." + f"Waiting for messages from topic {topics} with this timeout: {timeout}..." ) message = [] try: @@ -70,7 +68,7 @@ class KafkaIntegration: ) if msg is None: - logger.info(f"No message received from topic {topic} within timeout") + logger.info(f"No message received from topic {topics} within timeout") return None if msg.error(): logger.info(f"Consumer error: {msg.error()}") diff --git a/wsgi.py b/wsgi.py index a5c275c..fc10e89 100644 --- a/wsgi.py +++ b/wsgi.py @@ -13,8 +13,8 @@ def start_kafka_consumer(app): while True: try: - message = kafka.receive_disbursement_messages( - topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT + message = kafka.receive_messages( + topics=settings.KAFKA_TOPICS, timeout=settings.KAFKA_TIMEOUT ) if message: -- 2.34.1