first commit
This commit is contained in:
@@ -0,0 +1,55 @@
|
||||
// import { Kafka } from "kafkajs";
|
||||
const dotenv = require('dotenv');
|
||||
const env = dotenv.config();
|
||||
//console.log("ENVVVVV ", env);
|
||||
|
||||
const {Kafka} = require('kafkajs');
|
||||
const kafkaBroker1 = process.env.WRENCHJOB_KAFKA_BROKER01;
|
||||
|
||||
console.log("kafkaBroker1", kafkaBroker1);
|
||||
|
||||
const kafka = new Kafka({
|
||||
clientId: "wallet-kafka",
|
||||
brokers: [kafkaBroker1],
|
||||
});
|
||||
class KafkaConfig {
|
||||
|
||||
constructor() {
|
||||
console.log("kafkaBroker1 ******** ", kafkaBroker1);
|
||||
this.producer = kafka.producer();
|
||||
this.consumer = kafka.consumer({ groupId: "test-group" });
|
||||
}
|
||||
|
||||
async produce(topic, messages) {
|
||||
try {
|
||||
console.log("kafkaBroker1 ######### ", kafkaBroker1);
|
||||
await this.producer.connect();
|
||||
await this.producer.send({
|
||||
topic: topic,
|
||||
messages: messages,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
} finally {
|
||||
await this.producer.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
async consume(topic, callback) {
|
||||
try {
|
||||
await this.consumer.connect();
|
||||
await this.consumer.subscribe({ topic: topic, fromBeginning: true });
|
||||
await this.consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }) => {
|
||||
const value = message.value.toString();
|
||||
callback(value);
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = KafkaConfig;
|
||||
//export default KafkaConfig;
|
||||
Reference in New Issue
Block a user