kakka parts

This commit is contained in:
CHIEFSOFT\ameye
2024-07-06 08:10:03 -04:00
parent 0bbb4edc08
commit d9258ea3d7
4 changed files with 70 additions and 9 deletions
+1 -1
View File
@@ -16,6 +16,6 @@ module.exports = function(app) {
.get(controller.getUsersEscrows);
app.route('/flutterOkHook')
.get(controller.flutterOkHook);
.post(controller.flutterOkHook);
};
+46
View File
@@ -0,0 +1,46 @@
// import { Kafka } from "kafkajs";
const {Kafka} = require('kafkajs');
const kafka = new Kafka({
clientId: "nodejs-kafka",
brokers: ["10.10.10.120:9092"],
});
class KafkaConfig {
constructor() {
this.producer = kafka.producer();
this.consumer = kafka.consumer({ groupId: "test-group" });
}
async produce(topic, messages) {
try {
await this.producer.connect();
await this.producer.send({
topic: topic,
messages: messages,
});
} catch (error) {
console.error(error);
} finally {
await this.producer.disconnect();
}
}
async consume(topic, callback) {
try {
await this.consumer.connect();
await this.consumer.subscribe({ topic: topic, fromBeginning: true });
await this.consumer.run({
eachMessage: async ({ topic, partition, message }) => {
const value = message.value.toString();
callback(value);
},
});
} catch (error) {
console.error(error);
}
}
}
module.exports = KafkaConfig;
//export default KafkaConfig;
+1
View File
@@ -4,6 +4,7 @@ const cookieParser = require('cookie-parser');
const bodyParser = require('body-parser');
const logger = require('./app/logger');
const port = process.env.PORT || 3000;
const KafkaConfig = require("./app/kconfig");
const app = express();
+22 -8
View File
@@ -3,20 +3,34 @@
const request = require('request');
const db = require('../app/db')
const logger = require('../app/logger');
const KafkaConfig = require("../app/kconfig");
var ebroker = {
eventPublish: function (req, res, next) {
//console.log("REQ---->",req.body.uid);
var data = {
"uid": req.body.uid,
"member_id": req.body.member_id,
"limit": (req.body.limit != null && req.body.limit !== "") ? req.body.limit : 20,
"sessionid": req.body.sessionid,
"page": req.body.page
};
try {
const { message } = req.body;
console.log('THIS-> req.body -> ', req.body);
const kafkaConfig = new KafkaConfig();
const messages = [{ key: "key1", value: message }];
// const messages = [{ key: "key1", value: "ameye olusesan" }];
kafkaConfig.produce("FLUTTER_PAYMENT_RECEIVED", messages).then(r =>{
console.log('THIS->RET-> ',r);
} );
res.status(200).json({
status: "Ok!",
message: "Message successfully send!",
});
} catch (error) {
console.log(error);
}
let resultItem ={
"result": [],
"total_record": 0
}
next(null, resultItem ); // pass control to the next handler
},
eventConsumer: function (req, res, next) {