From 51480d664af4cb15b69a8e9421a3e9d5a0af84d9 Mon Sep 17 00:00:00 2001 From: lennyaiko Date: Thu, 10 Apr 2025 13:16:02 +0100 Subject: [PATCH] progress on kafka --- app/__pycache__/config.cpython-310.pyc | Bin 524 -> 621 bytes app/config.py | 6 +- app/integrations/__init__.py | 1 + app/integrations/kafka.py | 85 +++++++++++++++++++++++++ requirements.txt | 3 +- 5 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 app/integrations/__init__.py create mode 100644 app/integrations/kafka.py diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index b2fd6e99994a3829c8a8ed46f7792c53ba567b34..7d5cbe934bd071b98a54e75367b092da14c76d2c 100644 GIT binary patch delta 170 zcmeBSdCS6^&&$ij00bqQzNgnu9O-j4B3LH^#ZK~cgG9#m0$h<|{m^W Xig|zp2LlTu4-*hGPS$6#Wn=;XPf-on diff --git a/app/config.py b/app/config.py index 598d366..a466a3a 100644 --- a/app/config.py +++ b/app/config.py @@ -1,9 +1,13 @@ import os + class Config: """Base configuration for Flask app""" SECRET_KEY = os.getenv("SECRET_KEY", "supersecretkey") API_BASE_URL = "https://coreapi.dev.simbrellang.net/v1/api/salary" JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") - DEBUG = True \ No newline at end of file + DEBUG = True + + KAFKA_BROKER = "dev-events.simbrellang.net:9085" + KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" diff --git a/app/integrations/__init__.py b/app/integrations/__init__.py new file mode 100644 index 0000000..f19ae60 --- /dev/null +++ b/app/integrations/__init__.py @@ -0,0 +1 @@ +from .kafka import KafkaIntegration diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py new file mode 100644 index 0000000..0a10c29 --- /dev/null +++ b/app/integrations/kafka.py @@ -0,0 +1,85 @@ +from confluent_kafka import Producer +import json +import logging +from app.config import settings + +logger = logging.getLogger(__name__) + + +class KafkaIntegration: + _producer = None + _config = { + "bootstrap.servers": settings.KAFKA_BROKER, + "client.id": "loan-service-producer", + "acks": "all", + "retries": 3, + "debug": "broker,topic,msg", + } + + @staticmethod + def _get_producer(): + """Kafka producer""" + if not KafkaIntegration._producer: + KafkaIntegration._producer = Producer(KafkaIntegration._config) + + logger.info( + f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" + ) + + return KafkaIntegration._producer + + @staticmethod + def delivery_report(err, msg): + """Called once for each message produced""" + if err is not None: + + logger.error(f"Message delivery failed: {err}") + raise RuntimeError(f"Message delivery failed: {err}") + + else: + logger.debug( + f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" + ) + + @staticmethod + def send_disbursement_request(disbursement_data, request_id): + """ + Send disbursement request to PROCESS_PAYMENT topic + + Args: + disbursement_data (dict): Disbursement data request payload + request_id (str): Unique request identifier (used as Kafka key) + + Returns: + None + """ + try: + # proceed to send the disbursement request to Kafka + producer = KafkaIntegration._get_producer() + + # sending disbursement request to Kafka + producer.produce( + "PROCESS_PAYMENT", + key=str(request_id), + value=json.dumps(disbursement_data).encode("utf-8"), + callback=KafkaIntegration.delivery_report, + ) + + producer.poll(0) + + logger.info( + f"Disbursement request {request_id} sent to Kafka: {disbursement_data}" + ) + except Exception as e: + logger.error( + f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" + ) + raise Exception( + f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" + ) + + @staticmethod + def flush(): + """Shutdown""" + producer = KafkaIntegration._get_producer() + producer.flush() diff --git a/requirements.txt b/requirements.txt index ae70549..a5968da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ Flask-Marshmallow==0.15.0 marshmallow==3.19.0 Flask-Cors==3.0.10 gunicorn -requests \ No newline at end of file +requests +confluent-kafka==1.9.2 \ No newline at end of file