Files
CHIEFSOFT\ameye 1f2d886553 first commit
2024-12-21 09:55:29 -05:00

55 lines
1.5 KiB
JavaScript

// import { Kafka } from "kafkajs";
const dotenv = require('dotenv');
const env = dotenv.config();
//console.log("ENVVVVV ", env);
const {Kafka} = require('kafkajs');
// Broker address is read from the process environment (dotenv.config() above
// has already run by this point).
const { WRENCHJOB_KAFKA_BROKER01: kafkaBroker1 } = process.env;
console.log("kafkaBroker1", kafkaBroker1);

// One kafkajs client for the whole module; KafkaConfig builds its producer
// and consumer from it.
const kafkaClientOptions = {
  brokers: [kafkaBroker1],
  clientId: "wallet-kafka",
};
const kafka = new Kafka(kafkaClientOptions);
/**
 * Small wrapper around the module-level kafkajs client that owns one producer
 * and one consumer.
 *
 * Both `produce` and `consume` are best-effort: failures are logged with
 * `console.error` instead of being thrown, and the returned boolean tells the
 * caller whether the operation succeeded.
 */
class KafkaConfig {
  /**
   * Builds the producer and the consumer from the shared `kafka` client.
   * The consumer is created with the fixed group id "test-group".
   */
  constructor() {
    console.log("kafkaBroker1 ******** ", kafkaBroker1);
    this.producer = kafka.producer();
    this.consumer = kafka.consumer({ groupId: "test-group" });
  }

  /**
   * Connects the producer, sends `messages` to `topic`, then disconnects.
   *
   * NOTE(review): the producer is connected and disconnected on every call and
   * is shared by all calls on this instance; overlapping calls may interfere
   * with each other — confirm callers invoke this sequentially.
   *
   * @param {string} topic - Topic to send to.
   * @param {Array<object>} messages - Passed unchanged as the `messages` field
   *   of `producer.send`.
   * @returns {Promise<boolean>} `true` when connect and send both completed,
   *   `false` when either failed (the error is logged).
   */
  async produce(topic, messages) {
    let sent = false;
    try {
      console.log("kafkaBroker1 ######### ", kafkaBroker1);
      await this.producer.connect();
      await this.producer.send({ topic, messages });
      sent = true;
    } catch (error) {
      console.error(error);
    } finally {
      // Guard the cleanup as well: a disconnect failure must not turn this
      // best-effort call into a rejection.
      try {
        await this.producer.disconnect();
      } catch (disconnectError) {
        console.error(disconnectError);
      }
    }
    return sent;
  }

  /**
   * Connects the consumer, subscribes to `topic` from the beginning, and starts
   * the consumer with a handler that forwards each message value to `callback`.
   *
   * @param {string} topic - Topic to subscribe to.
   * @param {(value: string|null) => (void|Promise<void>)} callback - Receives
   *   the message value converted with `toString()`, or `null` when the message
   *   has no value. Its result is awaited.
   * @returns {Promise<boolean>} `true` when connect, subscribe and run all
   *   completed, `false` when any of them failed (the error is logged).
   */
  async consume(topic, callback) {
    try {
      await this.consumer.connect();
      await this.consumer.subscribe({ topic, fromBeginning: true });
      await this.consumer.run({
        eachMessage: async ({ message }) => {
          // A message may carry no value (e.g. a tombstone record); calling
          // toString() on it would throw, so forward null instead.
          const value = message.value == null ? null : message.value.toString();
          // Await so an async callback's rejection reaches whoever invoked this
          // handler instead of becoming an unhandled rejection.
          await callback(value);
        },
      });
      return true;
    } catch (error) {
      console.error(error);
      return false;
    }
  }
}
// CommonJS export: the KafkaConfig class itself is the module's export value.
module.exports = KafkaConfig;
//export default KafkaConfig;