Files
digifi-EventManager/app/integrations/kafka.py
T
2025-04-10 13:16:02 +01:00

86 lines
2.5 KiB
Python

from confluent_kafka import Producer
import json
import logging
from app.config import settings
logger = logging.getLogger(__name__)
class KafkaIntegration:
_producer = None
_config = {
"bootstrap.servers": settings.KAFKA_BROKER,
"client.id": "loan-service-producer",
"acks": "all",
"retries": 3,
"debug": "broker,topic,msg",
}
@staticmethod
def _get_producer():
"""Kafka producer"""
if not KafkaIntegration._producer:
KafkaIntegration._producer = Producer(KafkaIntegration._config)
logger.info(
f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}"
)
return KafkaIntegration._producer
@staticmethod
def delivery_report(err, msg):
"""Called once for each message produced"""
if err is not None:
logger.error(f"Message delivery failed: {err}")
raise RuntimeError(f"Message delivery failed: {err}")
else:
logger.debug(
f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}"
)
@staticmethod
def send_disbursement_request(disbursement_data, request_id):
"""
Send disbursement request to PROCESS_PAYMENT topic
Args:
disbursement_data (dict): Disbursement data request payload
request_id (str): Unique request identifier (used as Kafka key)
Returns:
None
"""
try:
# proceed to send the disbursement request to Kafka
producer = KafkaIntegration._get_producer()
# sending disbursement request to Kafka
producer.produce(
"PROCESS_PAYMENT",
key=str(request_id),
value=json.dumps(disbursement_data).encode("utf-8"),
callback=KafkaIntegration.delivery_report,
)
producer.poll(0)
logger.info(
f"Disbursement request {request_id} sent to Kafka: {disbursement_data}"
)
except Exception as e:
logger.error(
f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}"
)
raise Exception(
f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}"
)
@staticmethod
def flush():
"""Shutdown"""
producer = KafkaIntegration._get_producer()
producer.flush()