From d9258ea3d750b1118cf55c45be0728d7a573dad0 Mon Sep 17 00:00:00 2001 From: "CHIEFSOFT\\ameye" Date: Sat, 6 Jul 2024 08:10:03 -0400 Subject: [PATCH] kakka parts --- api/routes.js | 2 +- app/kconfig.js | 46 ++++++++++++++++++++++++++++++++++++++++++++++ server.js | 1 + service/ebroker.js | 30 ++++++++++++++++++++++-------- 4 files changed, 70 insertions(+), 9 deletions(-) create mode 100644 app/kconfig.js diff --git a/api/routes.js b/api/routes.js index 45853b7..f99dc56 100644 --- a/api/routes.js +++ b/api/routes.js @@ -16,6 +16,6 @@ module.exports = function(app) { .get(controller.getUsersEscrows); app.route('/flutterOkHook') - .get(controller.flutterOkHook); + .post(controller.flutterOkHook); }; \ No newline at end of file diff --git a/app/kconfig.js b/app/kconfig.js new file mode 100644 index 0000000..16d5de8 --- /dev/null +++ b/app/kconfig.js @@ -0,0 +1,46 @@ +// import { Kafka } from "kafkajs"; +const {Kafka} = require('kafkajs'); + +const kafka = new Kafka({ + clientId: "nodejs-kafka", + brokers: ["10.10.10.120:9092"], +}); +class KafkaConfig { + constructor() { + + this.producer = kafka.producer(); + this.consumer = kafka.consumer({ groupId: "test-group" }); + } + + async produce(topic, messages) { + try { + await this.producer.connect(); + await this.producer.send({ + topic: topic, + messages: messages, + }); + } catch (error) { + console.error(error); + } finally { + await this.producer.disconnect(); + } + } + + async consume(topic, callback) { + try { + await this.consumer.connect(); + await this.consumer.subscribe({ topic: topic, fromBeginning: true }); + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + const value = message.value.toString(); + callback(value); + }, + }); + } catch (error) { + console.error(error); + } + } +} + +module.exports = KafkaConfig; +//export default KafkaConfig; \ No newline at end of file diff --git a/server.js b/server.js index 74cbb71..105239d 100644 --- a/server.js +++ b/server.js @@ -4,6 +4,7 @@ const cookieParser = require('cookie-parser'); const bodyParser = require('body-parser'); const logger = require('./app/logger'); const port = process.env.PORT || 3000; +const KafkaConfig = require("./app/kconfig"); const app = express(); diff --git a/service/ebroker.js b/service/ebroker.js index 57200bd..4de4e7f 100644 --- a/service/ebroker.js +++ b/service/ebroker.js @@ -3,20 +3,34 @@ const request = require('request'); const db = require('../app/db') const logger = require('../app/logger'); +const KafkaConfig = require("../app/kconfig"); var ebroker = { eventPublish: function (req, res, next) { - //console.log("REQ---->",req.body.uid); - var data = { - "uid": req.body.uid, - "member_id": req.body.member_id, - "limit": (req.body.limit != null && req.body.limit !== "") ? req.body.limit : 20, - "sessionid": req.body.sessionid, - "page": req.body.page - }; + try { + const { message } = req.body; + console.log('THIS-> req.body -> ', req.body); + const kafkaConfig = new KafkaConfig(); + const messages = [{ key: "key1", value: message }]; + // const messages = [{ key: "key1", value: "ameye olusesan" }]; + kafkaConfig.produce("FLUTTER_PAYMENT_RECEIVED", messages).then(r =>{ + console.log('THIS->RET-> ',r); + } ); + res.status(200).json({ + status: "Ok!", + message: "Message successfully send!", + }); + } catch (error) { + console.log(error); + } + let resultItem ={ + "result": [], + "total_record": 0 + } + next(null, resultItem ); // pass control to the next handler }, eventConsumer: function (req, res, next) {