diff --git a/.env b/.env index 150ebbc..7879a8e 100644 --- a/.env +++ b/.env @@ -13,7 +13,7 @@ API_URL="/swagger.json" # VALID_API_KEY=************* # BASIC_AUTH_USERNAME=****** # BASIC_AUTH_PASSWORD=****** -#THIS_SITE_URL="http://localhost:8090" +# THIS_SITE_URL="http://localhost:8090" THIS_SITE_URL="https://qa-panel.mermsemr.com" SWAGGER_URL="/documentation" @@ -25,7 +25,7 @@ DATABASE_HOST=10.20.30.60 DATABASE_PORT=5432 DATABASE_NAME=merms_panel -KAFKA_BROKER = 'dev-events.simbrellang.net:9085' +KAFKA_BROKER = '10.10.10.120:9092' # DATABASE_HOST=10.20.30.60 # DATABASE_USER=firstadvance diff --git a/.env.live b/.env.live index 2d90d8a..f2668fa 100644 --- a/.env.live +++ b/.env.live @@ -23,8 +23,8 @@ DATABASE_PASSWORD=merms_panel DATABASE_HOST=10.13.3.60 DATABASE_PORT=5432 DATABASE_NAME=merms_panel - -KAFKA_BROKER = 'dev-events.simbrellang.net:9085' +9092 +KAFKA_BROKER = '10.10.10.120:9092' #Publishable key STRIPE_PUB_KEY="pk_test_51RqL5WLjZLojw6IZmEpwFidNZSl9lLlVUHNvuFZNEz1eTR9XXepnyyVhfvXe9cp4eMnqkDPpoe9wxLLRSV0dxRee00UfhayUOT" diff --git a/.env.local.example b/.env.local.example index 424c9bd..e8e535e 100644 --- a/.env.local.example +++ b/.env.local.example @@ -33,7 +33,7 @@ VALID_API_KEY=test-api-key-12345 # Event Bus Broker Configuration -KAFKA_BROKER="10.0.0.246:9092" +KAFKA_BROKER = '10.10.10.120:9092' #Publishable key diff --git a/.env.qa b/.env.qa index 3d2d642..69eeb3f 100644 --- a/.env.qa +++ b/.env.qa @@ -24,7 +24,7 @@ DATABASE_HOST=10.20.30.60 DATABASE_PORT=5432 DATABASE_NAME=merms_panel -KAFKA_BROKER = 'dev-events.simbrellang.net:9085' +KAFKA_BROKER = '10.10.10.120:9092' # DATABASE_HOST=10.20.30.60 # DATABASE_USER=firstadvance diff --git a/.production.env b/.production.env index 2a5f7c1..068234b 100644 --- a/.production.env +++ b/.production.env @@ -23,7 +23,7 @@ DATABASE_HOST=10.20.30.60 DATABASE_PORT=5432 DATABASE_NAME=merms_panel -KAFKA_BROKER = 'dev-events.simbrellang.net:9085' +KAFKA_BROKER = '10.10.10.120:9092' # DATABASE_HOST=10.20.30.60 # DATABASE_USER=firstadvance diff --git a/.qa.env b/.qa.env index 2a5f7c1..068234b 100644 --- a/.qa.env +++ b/.qa.env @@ -23,7 +23,7 @@ DATABASE_HOST=10.20.30.60 DATABASE_PORT=5432 DATABASE_NAME=merms_panel -KAFKA_BROKER = 'dev-events.simbrellang.net:9085' +KAFKA_BROKER = '10.10.10.120:9092' # DATABASE_HOST=10.20.30.60 # DATABASE_USER=firstadvance diff --git a/app/api/integrations/kafka.py b/app/api/integrations/kafka.py index fa3f8d1..1a64be9 100644 --- a/app/api/integrations/kafka.py +++ b/app/api/integrations/kafka.py @@ -41,6 +41,28 @@ class KafkaIntegration: logger.debug(f'Message delivered to {msg.topic()} [{msg.partition()}] @ offset {msg.offset()}') + @staticmethod + def send_setting_refresh_request(settings_data, subscription_uid, topic): + try: + # Proceed to send loan request to Kafka + producer = KafkaIntegration._get_producer() + + # Sending loan request message to Kafka + producer.produce( + topic=topic, + key=str(subscription_uid), + value=json.dumps(settings_data).encode("utf-8"), + callback=KafkaIntegration.delivery_report, + ) + + producer.poll(0) + logger.info(f"Settings request {subscription_uid} queued for processing") + except Exception as e: + logger.error( + f"Failed to send settings request to Kafka: {str(e)}", exc_info=True + ) + raise Exception(f"Failed to send settings request to Kafka: {str(e)}") + @staticmethod def send_loan_request(loan_data, request_id, topic): diff --git a/app/api/services/base_service.py b/app/api/services/base_service.py index 4bd9a74..cef870b 100644 --- a/app/api/services/base_service.py +++ b/app/api/services/base_service.py @@ -168,7 +168,13 @@ class BaseService: type = cls.TRANSACTION_TYPE, channel = channel, ) - + + @classmethod + def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic): + KafkaIntegration.send_setting_refresh_request(settings_data = settings_data, subscription_uid =subscription_uid, topic = topic) + KafkaIntegration.flush() + + @classmethod def async_send_to_kafka(cls, loan_data, request_id, topic): KafkaIntegration.send_loan_request(loan_data = loan_data, request_id = request_id, topic = topic) diff --git a/app/api/services/myproduct.py b/app/api/services/myproduct.py index 66088f1..1fb9a37 100644 --- a/app/api/services/myproduct.py +++ b/app/api/services/myproduct.py @@ -8,6 +8,7 @@ from app.api.helpers.response_helper import ResponseHelper from app.api.schemas.myproduct import MyProductSchema from app.api.schemas.provision import ProvisionSchema from app.api.schemas.myproduct_settings import MyProductSettingsSchema +from threading import Thread import datetime import jwt @@ -172,15 +173,15 @@ class MyProductsService(BaseService): logger.info(f"Incoming MyProduct data ==>>>> {memberSubscription}") productDataStatus = memberSubscription.status product_subscription_uid = memberSubscription.uid - product_subscription_external_url = memberSubscription.external_url - product_subscription_internal_url = memberSubscription.internal_url + # product_subscription_external_url = memberSubscription.external_url + # product_subscription_internal_url = memberSubscription.internal_url - result_data = { - "myproudct": { - "result": "Reveived under development ", - "message": "to be fixed" - } - } + # result_data = { + # "myproudct": { + # "result": "Reveived under development ", + # "message": "to be fixed" + # } + # } for key in settings.keys(): setting_value = settings[key] @@ -189,10 +190,14 @@ class MyProductsService(BaseService): # Simulate processing response_data = { - "result": result_data, "member_id": member_id, - "uid": uid, + "subscription_uid": str(subscription_uid), + "uid": str(uid), } + logger.error(f"Going for Thread ******************** ") + thread = Thread(target=MyProductsService.async_send_settings_refresh_to_kafka, args=(response_data, subscription_uid, "REFRESH_PRODUCT_SETTINGS")) + thread.start() + logger.error(f"After the Thread ******************** ") return ResponseHelper.success(data=response_data) @@ -298,8 +303,6 @@ class MyProductsService(BaseService): product_id = validated_data.get('product_id') product_subscription_uid = '' - product_subscription_external_url = '' - product_subscription_internal_url = '' product_data = Products.get_product_by_product_id(product_id) product_description = ProductsDetails.get_product_details_with_product_id('A000002') productDataStatus = product_data.status @@ -318,6 +321,7 @@ class MyProductsService(BaseService): "product_subscription_uid": product_subscription_uid, } + return ResponseHelper.success(data=response_data) except ValidationError as err: diff --git a/app/config.py b/app/config.py index 0488ef7..f81ae68 100644 --- a/app/config.py +++ b/app/config.py @@ -37,7 +37,7 @@ class Config: ) # KAFKA_BROKER = 'dev-events.simbrellang.net:9085' - KAFKA_BROKER = os.getenv("KAFKA_BROKER", "dev-events.simbrellang.net:9085") + KAFKA_BROKER = os.getenv("KAFKA_BROKER", "10.10.10.120:9092") # SIMBRELLA_ENDPOINT_RAC_CHECKS = os.getenv("SIMBRELLA_ENDPOINT_RAC_CHECKS", "RACCheck") VALID_APP_ID = os.getenv("SIMBRELLA_APP_ID", "app1")