Multiple event monitoring added
This commit is contained in:
@@ -51,7 +51,10 @@ class KafkaIntegration:
|
||||
:return: The message value (decoded) or None if no message is received
|
||||
"""
|
||||
consumer = KafkaIntegration._get_consumer()
|
||||
consumer.subscribe([topic])
|
||||
# consumer.subscribe(["LOAN_REPAYMENT"])
|
||||
# consumer.subscribe([topic])
|
||||
consumer.subscribe(['LOAN_REPAYMENT', 'PROCESS_PAYMENT'])
|
||||
|
||||
|
||||
logger.info(
|
||||
f"Waiting for messages from topic {topic} with this timeout: {timeout}..."
|
||||
@@ -94,7 +97,16 @@ class KafkaIntegration:
|
||||
logger.info(
|
||||
f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message}"
|
||||
)
|
||||
KafkaIntegration._call_disbursement_endpoint(message)
|
||||
current_topic = msg.topic()
|
||||
|
||||
if current_topic=="PROCESS_PAYMENT":
|
||||
KafkaIntegration._call_disbursement_endpoint(message)
|
||||
|
||||
if current_topic=="LOAN_REPAYMENT":
|
||||
# Do loan repayment call here
|
||||
logger.info(
|
||||
f"Loan Repayment message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message}"
|
||||
)
|
||||
|
||||
return message
|
||||
|
||||
|
||||
Reference in New Issue
Block a user