diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index b2fd6e9..7d5cbe9 100644 Binary files a/app/__pycache__/config.cpython-310.pyc and b/app/__pycache__/config.cpython-310.pyc differ diff --git a/app/config.py b/app/config.py index 598d366..a466a3a 100644 --- a/app/config.py +++ b/app/config.py @@ -1,9 +1,13 @@ import os + class Config: """Base configuration for Flask app""" SECRET_KEY = os.getenv("SECRET_KEY", "supersecretkey") API_BASE_URL = "https://coreapi.dev.simbrellang.net/v1/api/salary" JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") - DEBUG = True \ No newline at end of file + DEBUG = True + + KAFKA_BROKER = "dev-events.simbrellang.net:9085" + KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" diff --git a/app/integrations/__init__.py b/app/integrations/__init__.py new file mode 100644 index 0000000..f19ae60 --- /dev/null +++ b/app/integrations/__init__.py @@ -0,0 +1 @@ +from .kafka import KafkaIntegration diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py new file mode 100644 index 0000000..0a10c29 --- /dev/null +++ b/app/integrations/kafka.py @@ -0,0 +1,85 @@ +from confluent_kafka import Producer +import json +import logging +from app.config import settings + +logger = logging.getLogger(__name__) + + +class KafkaIntegration: + _producer = None + _config = { + "bootstrap.servers": settings.KAFKA_BROKER, + "client.id": "loan-service-producer", + "acks": "all", + "retries": 3, + "debug": "broker,topic,msg", + } + + @staticmethod + def _get_producer(): + """Kafka producer""" + if not KafkaIntegration._producer: + KafkaIntegration._producer = Producer(KafkaIntegration._config) + + logger.info( + f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" + ) + + return KafkaIntegration._producer + + @staticmethod + def delivery_report(err, msg): + """Called once for each message produced""" + if err is not None: + + logger.error(f"Message delivery failed: {err}") + raise RuntimeError(f"Message delivery failed: {err}") + + else: + logger.debug( + f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" + ) + + @staticmethod + def send_disbursement_request(disbursement_data, request_id): + """ + Send disbursement request to PROCESS_PAYMENT topic + + Args: + disbursement_data (dict): Disbursement data request payload + request_id (str): Unique request identifier (used as Kafka key) + + Returns: + None + """ + try: + # proceed to send the disbursement request to Kafka + producer = KafkaIntegration._get_producer() + + # sending disbursement request to Kafka + producer.produce( + "PROCESS_PAYMENT", + key=str(request_id), + value=json.dumps(disbursement_data).encode("utf-8"), + callback=KafkaIntegration.delivery_report, + ) + + producer.poll(0) + + logger.info( + f"Disbursement request {request_id} sent to Kafka: {disbursement_data}" + ) + except Exception as e: + logger.error( + f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" + ) + raise Exception( + f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" + ) + + @staticmethod + def flush(): + """Shutdown""" + producer = KafkaIntegration._get_producer() + producer.flush() diff --git a/requirements.txt b/requirements.txt index ae70549..a5968da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ Flask-Marshmallow==0.15.0 marshmallow==3.19.0 Flask-Cors==3.0.10 gunicorn -requests \ No newline at end of file +requests +confluent-kafka==1.9.2 \ No newline at end of file