fetch topics from .env file
This commit was merged in pull request #15.
This commit is contained in:
@@ -1,6 +1,7 @@
|
||||
|
||||
KAFKA_TIMEOUT=1000.0
|
||||
KAFKA_BROKER="10.20.30.50:9092"
|
||||
KAFKA_TOPICS=PROCESS_PAYMENT,LOAN_REPAYMENT
|
||||
|
||||
DATABASE_USER=firstadvance
|
||||
DATABASE_PASSWORD=FirstAdvance!
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
|
||||
KAFKA_TIMEOUT=1000.0
|
||||
KAFKA_BROKER="dev-events.simbrellang.net:9085"
|
||||
KAFKA_TOPICS=PROCESS_PAYMENT,LOAN_REPAYMENT
|
||||
|
||||
DATABASE_USER=firstadvance
|
||||
DATABASE_PASSWORD=FirstAdvance!
|
||||
|
||||
+1
-1
@@ -11,7 +11,7 @@ class Config:
|
||||
DEBUG = True
|
||||
|
||||
KAFKA_BROKER = os.getenv("KAFKA_BROKER", "dev-events.simbrellang.net:9085")
|
||||
KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT"
|
||||
KAFKA_TOPICS = [topic.strip() for topic in os.getenv("KAFKA_TOPICS", "").split(",") if topic.strip()]
|
||||
KAFKA_TIMEOUT = float( os.getenv("KAFKA_TIMEOUT", 1000.0) )
|
||||
|
||||
JWT_ACCESS_TOKEN_EXPIRES = os.getenv("JWT_ACCESS_TOKEN_EXPIRES", timedelta(hours=1))
|
||||
|
||||
@@ -42,22 +42,20 @@ class KafkaIntegration:
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def receive_disbursement_messages(topic, timeout):
|
||||
def receive_messages(topics, timeout):
|
||||
"""
|
||||
Receive messages from a Kafka topic.
|
||||
|
||||
:param topic: The Kafka topic to subscribe to
|
||||
:param topics: The Kafka topics to subscribe to
|
||||
:param timeout: Time to wait for a message (in seconds)
|
||||
:return: The message value (decoded) or None if no message is received
|
||||
"""
|
||||
consumer = KafkaIntegration._get_consumer()
|
||||
# consumer.subscribe(["PROCESS_PAYMENT"])
|
||||
# consumer.subscribe([topic])
|
||||
consumer.subscribe(['LOAN_REPAYMENT', 'PROCESS_PAYMENT'])
|
||||
consumer.subscribe(topics)
|
||||
|
||||
|
||||
logger.info(
|
||||
f"Waiting for messages from topic {topic} with this timeout: {timeout}..."
|
||||
f"Waiting for messages from topic {topics} with this timeout: {timeout}..."
|
||||
)
|
||||
message = []
|
||||
try:
|
||||
@@ -70,7 +68,7 @@ class KafkaIntegration:
|
||||
)
|
||||
|
||||
if msg is None:
|
||||
logger.info(f"No message received from topic {topic} within timeout")
|
||||
logger.info(f"No message received from topic {topics} within timeout")
|
||||
return None
|
||||
if msg.error():
|
||||
logger.info(f"Consumer error: {msg.error()}")
|
||||
|
||||
@@ -13,8 +13,8 @@ def start_kafka_consumer(app):
|
||||
while True:
|
||||
try:
|
||||
|
||||
message = kafka.receive_disbursement_messages(
|
||||
topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT
|
||||
message = kafka.receive_messages(
|
||||
topics=settings.KAFKA_TOPICS, timeout=settings.KAFKA_TIMEOUT
|
||||
)
|
||||
|
||||
if message:
|
||||
|
||||
Reference in New Issue
Block a user