From 51480d664af4cb15b69a8e9421a3e9d5a0af84d9 Mon Sep 17 00:00:00 2001 From: lennyaiko Date: Thu, 10 Apr 2025 13:16:02 +0100 Subject: [PATCH 1/5] progress on kafka --- app/__pycache__/config.cpython-310.pyc | Bin 524 -> 621 bytes app/config.py | 6 +- app/integrations/__init__.py | 1 + app/integrations/kafka.py | 85 +++++++++++++++++++++++++ requirements.txt | 3 +- 5 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 app/integrations/__init__.py create mode 100644 app/integrations/kafka.py diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index b2fd6e99994a3829c8a8ed46f7792c53ba567b34..7d5cbe934bd071b98a54e75367b092da14c76d2c 100644 GIT binary patch delta 170 zcmeBSdCS6^&&$ij00bqQzNgnu9O-j4B3LH^#ZK~cgG9#m0$h<|{m^W Xig|zp2LlTu4-*hGPS$6#Wn=;XPf-on diff --git a/app/config.py b/app/config.py index 598d366..a466a3a 100644 --- a/app/config.py +++ b/app/config.py @@ -1,9 +1,13 @@ import os + class Config: """Base configuration for Flask app""" SECRET_KEY = os.getenv("SECRET_KEY", "supersecretkey") API_BASE_URL = "https://coreapi.dev.simbrellang.net/v1/api/salary" JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") - DEBUG = True \ No newline at end of file + DEBUG = True + + KAFKA_BROKER = "dev-events.simbrellang.net:9085" + KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" diff --git a/app/integrations/__init__.py b/app/integrations/__init__.py new file mode 100644 index 0000000..f19ae60 --- /dev/null +++ b/app/integrations/__init__.py @@ -0,0 +1 @@ +from .kafka import KafkaIntegration diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py new file mode 100644 index 0000000..0a10c29 --- /dev/null +++ b/app/integrations/kafka.py @@ -0,0 +1,85 @@ +from confluent_kafka import Producer +import json +import logging +from app.config import settings + +logger = logging.getLogger(__name__) + + +class KafkaIntegration: + _producer = None + _config = { + "bootstrap.servers": settings.KAFKA_BROKER, + "client.id": "loan-service-producer", + "acks": "all", + "retries": 3, + "debug": "broker,topic,msg", + } + + @staticmethod + def _get_producer(): + """Kafka producer""" + if not KafkaIntegration._producer: + KafkaIntegration._producer = Producer(KafkaIntegration._config) + + logger.info( + f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" + ) + + return KafkaIntegration._producer + + @staticmethod + def delivery_report(err, msg): + """Called once for each message produced""" + if err is not None: + + logger.error(f"Message delivery failed: {err}") + raise RuntimeError(f"Message delivery failed: {err}") + + else: + logger.debug( + f"Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}" + ) + + @staticmethod + def send_disbursement_request(disbursement_data, request_id): + """ + Send disbursement request to PROCESS_PAYMENT topic + + Args: + disbursement_data (dict): Disbursement data request payload + request_id (str): Unique request identifier (used as Kafka key) + + Returns: + None + """ + try: + # proceed to send the disbursement request to Kafka + producer = KafkaIntegration._get_producer() + + # sending disbursement request to Kafka + producer.produce( + "PROCESS_PAYMENT", + key=str(request_id), + value=json.dumps(disbursement_data).encode("utf-8"), + callback=KafkaIntegration.delivery_report, + ) + + producer.poll(0) + + logger.info( + f"Disbursement request {request_id} sent to Kafka: {disbursement_data}" + ) + except Exception as e: + logger.error( + f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" + ) + raise Exception( + f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" + ) + + @staticmethod + def flush(): + """Shutdown""" + producer = KafkaIntegration._get_producer() + producer.flush() diff --git a/requirements.txt b/requirements.txt index ae70549..a5968da 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ Flask-Marshmallow==0.15.0 marshmallow==3.19.0 Flask-Cors==3.0.10 gunicorn -requests \ No newline at end of file +requests +confluent-kafka==1.9.2 \ No newline at end of file From 34cba74580ec55b5aa24f5b23f848eaaf3c65309 Mon Sep 17 00:00:00 2001 From: lennyaiko Date: Thu, 10 Apr 2025 13:59:04 +0100 Subject: [PATCH 2/5] progress on kafka --- app.py | 16 ++- app/__pycache__/config.cpython-310.pyc | Bin 621 -> 639 bytes app/config.py | 5 +- app/integrations/kafka.py | 123 +++++++++++------- .../authentication.cpython-310.pyc | Bin 1721 -> 1721 bytes app/routes/loan.py | 8 +- 6 files changed, 102 insertions(+), 50 deletions(-) diff --git a/app.py b/app.py index 031e1d7..51894a3 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,20 @@ from app import create_app +from app.integrations import KafkaIntegration app = create_app() if __name__ == "__main__": - app.run(host="0.0.0.0", port=5000, debug=True) \ No newline at end of file + kafka = KafkaIntegration() + + if kafka.is_kafka_running(): + print("Kafka is running") + else: + print("Kafka is not running") + exit(1) + + try: + message = kafka.receive_messages( + topic= + ) + + app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index 7d5cbe934bd071b98a54e75367b092da14c76d2c..bdcb00b7bf4b03fc7dcf43a9b18b24a09fda6047 100644 GIT binary patch delta 80 zcmaFM@}Gq_pO=@50SLbB{hmH`BCjl?!bELJwp7MuhA5_uF7k|wCX-VbFN-o4aRL<; fF#(BN9L1?6C7F5Y#gm?DHMVvs{A|@a)IfzM_ Mg9XUoU=U&k0Oha@lmGw# diff --git a/app/config.py b/app/config.py index a466a3a..a56a39a 100644 --- a/app/config.py +++ b/app/config.py @@ -9,5 +9,8 @@ class Config: JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") DEBUG = True - KAFKA_BROKER = "dev-events.simbrellang.net:9085" + KAFKA_BROKER = "dev-events.simbrellang.net:9084" KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" + + +settings = Config() diff --git a/app/integrations/kafka.py b/app/integrations/kafka.py index 0a10c29..fc66890 100644 --- a/app/integrations/kafka.py +++ b/app/integrations/kafka.py @@ -1,32 +1,31 @@ -from confluent_kafka import Producer +from confluent_kafka import Consumer import json import logging from app.config import settings +import requests logger = logging.getLogger(__name__) class KafkaIntegration: - _producer = None - _config = { + _consumer = None + + _consumer_config = { "bootstrap.servers": settings.KAFKA_BROKER, - "client.id": "loan-service-producer", - "acks": "all", - "retries": 3, - "debug": "broker,topic,msg", + "group.id": "loan-service-consumer", + "auto.offset.reset": "earliest", + "enable.auto.commit": True, } @staticmethod - def _get_producer(): - """Kafka producer""" - if not KafkaIntegration._producer: - KafkaIntegration._producer = Producer(KafkaIntegration._config) - + def _get_consumer(): + """Kafka consumer""" + if not KafkaIntegration._consumer: + KafkaIntegration._consumer = Consumer(KafkaIntegration._consumer_config) logger.info( - f"Connected to Kafka broker at {KafkaIntegration._config['bootstrap.servers']}" + f"Consumer connected to Kafka broker at {KafkaIntegration._consumer_config['bootstrap.servers']}" ) - - return KafkaIntegration._producer + return KafkaIntegration._consumer @staticmethod def delivery_report(err, msg): @@ -42,44 +41,76 @@ class KafkaIntegration: ) @staticmethod - def send_disbursement_request(disbursement_data, request_id): + def receive_messages(topic, timeout=1.0): """ - Send disbursement request to PROCESS_PAYMENT topic + Receive messages from a Kafka topic. - Args: - disbursement_data (dict): Disbursement data request payload - request_id (str): Unique request identifier (used as Kafka key) - - Returns: - None + :param topic: The Kafka topic to subscribe to + :param timeout: Time to wait for a message (in seconds) + :return: The message value (decoded) or None if no message is received """ + consumer = KafkaIntegration._get_consumer() + consumer.subscribe([topic]) + try: - # proceed to send the disbursement request to Kafka - producer = KafkaIntegration._get_producer() + msg = consumer.poll(timeout=timeout) + if msg is None: + logger.debug(f"No message received from topic {topic} within timeout") + return None + if msg.error(): + logger.error(f"Consumer error: {msg.error()}") + raise RuntimeError(f"Consumer error: {msg.error()}") - # sending disbursement request to Kafka - producer.produce( - "PROCESS_PAYMENT", - key=str(request_id), - value=json.dumps(disbursement_data).encode("utf-8"), - callback=KafkaIntegration.delivery_report, + # Decode and return the message value + message_value = msg.value().decode("utf-8") + logger.debug( + f"Received message from {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}: {message_value}" ) + return json.loads(message_value) if message_value else None - producer.poll(0) - - logger.info( - f"Disbursement request {request_id} sent to Kafka: {disbursement_data}" - ) except Exception as e: - logger.error( - f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" - ) - raise Exception( - f"Failed to send disbursement request {str(request_id)} to Kafka: {str(e)}" - ) + logger.error(f"Error while receiving message: {e}") + raise @staticmethod - def flush(): - """Shutdown""" - producer = KafkaIntegration._get_producer() - producer.flush() + def close_consumer(): + """Shutdown consumer""" + consumer = KafkaIntegration._get_consumer() + consumer.clonse() + logger.info("Kafka consumer closed") + + @staticmethod + def is_kafka_running(): + """ + Check if Kafka broker is reachable by attempting to fetch metadata. + :return: True if Kafka is running, False otherwise + """ + try: + producer = KafkaIntegration._get_producer() + # Attempt to fetch metadata for a dummy topic + metadata = producer.list_topics(timeout=5) + if metadata.brokers: # If brokers are returned, Kafka is running + logger.info(f"Kafka broker at {settings.KAFKA_BROKER} is running") + return True + else: + logger.warning(f"No brokers found for {settings.KAFKA_BROKER}") + return False + except Exception as e: + logger.error(f"Kafka broker check failed: {e}") + return False + + @staticmethod + def _call_disbursement_endpoint( + message, endpoint_url="http://localhost:8000/loans/disbursement" + ): + """Call an HTTP endpoint with the received message""" + try: + response = requests.post(endpoint_url, json=message, timeout=5) + response.raise_for_status() + logger.info( + f"Successfully sent message to {endpoint_url}: {response.status_code}" + ) + print(response.json()) + except requests.exceptions.RequestException as e: + logger.error(f"Failed to call endpoint {endpoint_url}: {e}") + raise diff --git a/app/routes/__pycache__/authentication.cpython-310.pyc b/app/routes/__pycache__/authentication.cpython-310.pyc index 84c166987f20fe9c77a9317f476174523c260f06..b1dd047329fca2895af92878fb941c4e09f9b0bc 100644 GIT binary patch delta 20 acmdnVyOWnYpO=@50SL5rec#Bvjtu}c2?cim delta 20 acmdnVyOWnYpO=@50SFAtpKjz{#|8i~zy#+2 diff --git a/app/routes/loan.py b/app/routes/loan.py index f13a5fa..5c80968 100644 --- a/app/routes/loan.py +++ b/app/routes/loan.py @@ -1,6 +1,8 @@ from flask import Blueprint, request, jsonify, current_app import requests from app.utils.auth import get_headers +from app.integrations import KafkaIntegration +from threading import Thread loan_bp = Blueprint("loan", __name__) @@ -154,8 +156,10 @@ def disbursement(): data = request.json api_url = f"{current_app.config['API_BASE_URL']}/Disbursement" - response = requests.post(api_url, json=data, headers=get_headers()) - return jsonify(response.json()), response.status_code + return jsonify({"requestId": data["requestId"]}), 200 + + # response = requests.post(api_url, json=data, headers=get_headers()) + # return jsonify(response.json()), response.status_code @loan_bp.route("/collect-loan", methods=["POST"]) From a2e4daa4e7eda8c747e184b085240abf6492137e Mon Sep 17 00:00:00 2001 From: lennyaiko Date: Thu, 10 Apr 2025 14:43:35 +0100 Subject: [PATCH 3/5] progress on kafka --- app.py | 13 +-------- app/__pycache__/config.cpython-310.pyc | Bin 639 -> 639 bytes app/integrations/kafka.py | 35 +++++++------------------ wsgi.py | 33 ++++++++++++++++++++++- 4 files changed, 43 insertions(+), 38 deletions(-) diff --git a/app.py b/app.py index 51894a3..23c7d2b 100644 --- a/app.py +++ b/app.py @@ -1,20 +1,9 @@ from app import create_app from app.integrations import KafkaIntegration +from app.config import settings app = create_app() if __name__ == "__main__": - kafka = KafkaIntegration() - - if kafka.is_kafka_running(): - print("Kafka is running") - else: - print("Kafka is not running") - exit(1) - - try: - message = kafka.receive_messages( - topic= - ) app.run(host="0.0.0.0", port=5000, debug=True) diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index bdcb00b7bf4b03fc7dcf43a9b18b24a09fda6047..1be7f16d96aae380d6a87638148ee8db881d9400 100644 GIT binary patch delta 99 zcmey*@}Gq(pO=@50SLaG+{kr{QSO$ox1*c4V|;*Pq_3-ANPLKYfT#0Hh9X{|k|N&8 nOiY0~5GI%pBKUyBEe@O9{FKt1R69l>ub2%;a4_&ObMXKG3mX-V delta 99 zcmey*@}Gq(pO=@50SLbB-N Date: Thu, 10 Apr 2025 14:57:35 +0100 Subject: [PATCH 4/5] progress on kafka --- app.py | 2 -- app/config.py | 1 + wsgi.py | 16 +--------------- 3 files changed, 2 insertions(+), 17 deletions(-) diff --git a/app.py b/app.py index 23c7d2b..51efc32 100644 --- a/app.py +++ b/app.py @@ -1,6 +1,4 @@ from app import create_app -from app.integrations import KafkaIntegration -from app.config import settings app = create_app() diff --git a/app/config.py b/app/config.py index a56a39a..c6ed9bc 100644 --- a/app/config.py +++ b/app/config.py @@ -11,6 +11,7 @@ class Config: KAFKA_BROKER = "dev-events.simbrellang.net:9084" KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" + KAFKA_TIMEOUT = 5.0 settings = Config() diff --git a/wsgi.py b/wsgi.py index 697264b..7137082 100644 --- a/wsgi.py +++ b/wsgi.py @@ -12,7 +12,7 @@ if __name__ != "__main__": while True: message = kafka.receive_disbursement_messages( - topic=settings.KAFKA_PAYMENT_TOPIC, timeout=1.0 + topic=settings.KAFKA_PAYMENT_TOPIC, timeout=settings.KAFKA_TIMEOUT ) if message: @@ -20,19 +20,5 @@ if __name__ != "__main__": else: logger.info("No message received within timeout") - # try: - # message = kafka.receive_disbursement_messages( - # topic=settings.KAFKA_PAYMENT_TOPIC, timeout=5.0 - # ) - - # if message: - # logger.info(f"Processed message: {message}") - # else: - # logger.info("No message received within timeout") - # except Exception as e: - # print(f"Error: {e}") - # finally: - # kafka.close_consumer() - # Expose WSGI app instance for Gunicorn wsgi_app = app From d713ddfede793be873598d44cf2ee4be43d5fb2f Mon Sep 17 00:00:00 2001 From: lennyaiko Date: Thu, 10 Apr 2025 15:33:54 +0100 Subject: [PATCH 5/5] progress on kafka --- app/__pycache__/config.cpython-310.pyc | Bin 639 -> 669 bytes app/config.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/app/__pycache__/config.cpython-310.pyc b/app/__pycache__/config.cpython-310.pyc index 1be7f16d96aae380d6a87638148ee8db881d9400..9e2ef39381da637f9485b5df8ae186027ff01aa1 100644 GIT binary patch delta 226 zcmey*GMAM%pO=@50SInh_@2IYB5yLI&BQ`WjwqfK?qCK@o{8&r%b7;;2L$;$y9Nix z2RKIhy84BrgETNOh&cFZ@=mT}tggSs>+R^~?HC{8>Feqr8nTk1hz}?aCVmO&Cl(Zd zadLiMT4uUlL1hsWP?iOVi-mv$2LlTu4-*hGvM{m$X%0rPye4xIC&&TZlX-=vgBamCn1|&Eb_?WqP0mVEpTL1t6 delta 145 zcmbQs`k#e2pO=@50SLaG{GL8_B5yLI#l%8OmS6@=?unari<(672L$;$y9Nix2RKIh zy84CqY4S|&W~}C1$xy@#)K*0)U|?b7VFF@C7Dg5(4#pxTpr9sm5hq9) dh<%HrIJKlCGcUausud&wHiU0-Ba<{E4*(l)9+Ln7 diff --git a/app/config.py b/app/config.py index c6ed9bc..74abe6d 100644 --- a/app/config.py +++ b/app/config.py @@ -9,7 +9,7 @@ class Config: JWT_SECRET_KEY = os.getenv("JWT_SECRET_KEY", "your_jwt_secret") DEBUG = True - KAFKA_BROKER = "dev-events.simbrellang.net:9084" + KAFKA_BROKER = "dev-events.simbrellang.net:9085" KAFKA_PAYMENT_TOPIC = "PROCESS_PAYMENT" KAFKA_TIMEOUT = 5.0