kafka on Jobs
This commit is contained in:
@@ -1,2 +1,3 @@
|
||||
WRENCHJOB_PORT=3000
|
||||
WRENCHJOB_POSTGRE_URL='postgresql://wrenchboard:wrenchboard@10.20.30.60:5432/wrenchboard'
|
||||
WRENCHJOB_KAFKA_BROKER01='10.10.10.120:9092'
|
||||
+11
-1
@@ -3,6 +3,7 @@
|
||||
const properties = require('../package.json')
|
||||
const jobs = require('../service/jobs');
|
||||
const logger = require('../app/logger');
|
||||
const ebroker = require('../service/ebroker');
|
||||
|
||||
var controllers = {
|
||||
getMarketJobs: function(req, res) {
|
||||
@@ -22,7 +23,16 @@ var controllers = {
|
||||
}
|
||||
res.json(dist);
|
||||
});
|
||||
},
|
||||
},
|
||||
newJobPublish: function(req, res) {
|
||||
ebroker.eventNewJobPublish(req, res, function(err, dist) {
|
||||
if (err) {
|
||||
res.send(err);
|
||||
}
|
||||
// res.json(dist);
|
||||
res.status(200).json({'status': 'OK', 'internal_return': 0, 'result_list': dist.result,'total_record': dist.total_record })
|
||||
});
|
||||
},
|
||||
};
|
||||
|
||||
module.exports = controllers;
|
||||
|
||||
+2
-1
@@ -9,5 +9,6 @@ module.exports = function(app) {
|
||||
.get(controller.getMarketJobsFiles);
|
||||
app.route('/marketjob3s/:id')
|
||||
.get(controller.getStatus);
|
||||
|
||||
app.route('/jobAdded')
|
||||
.post(controller.newJobPublish);
|
||||
};
|
||||
@@ -0,0 +1,53 @@
|
||||
// import { Kafka } from "kafkajs";
|
||||
const dotenv = require('dotenv');
|
||||
const env = dotenv.config();
|
||||
//console.log("ENVVVVV ", env);
|
||||
|
||||
const {Kafka} = require('kafkajs');
|
||||
const kafkaBroker1 = process.env.WRENCHJOB_KAFKA_BROKER01;
|
||||
|
||||
console.log("kafkaBroker1", kafkaBroker1);
|
||||
|
||||
const kafka = new Kafka({
|
||||
clientId: "wallet-kafka",
|
||||
brokers: [kafkaBroker1],
|
||||
});
|
||||
class KafkaConfig {
|
||||
|
||||
constructor() {
|
||||
this.producer = kafka.producer();
|
||||
this.consumer = kafka.consumer({ groupId: "test-group" });
|
||||
}
|
||||
|
||||
async produce(topic, messages) {
|
||||
try {
|
||||
await this.producer.connect();
|
||||
await this.producer.send({
|
||||
topic: topic,
|
||||
messages: messages,
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
} finally {
|
||||
await this.producer.disconnect();
|
||||
}
|
||||
}
|
||||
|
||||
async consume(topic, callback) {
|
||||
try {
|
||||
await this.consumer.connect();
|
||||
await this.consumer.subscribe({ topic: topic, fromBeginning: true });
|
||||
await this.consumer.run({
|
||||
eachMessage: async ({ topic, partition, message }) => {
|
||||
const value = message.value.toString();
|
||||
callback(value);
|
||||
},
|
||||
});
|
||||
} catch (error) {
|
||||
console.error(error);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = KafkaConfig;
|
||||
//export default KafkaConfig;
|
||||
@@ -14,7 +14,10 @@
|
||||
"dependencies": {
|
||||
"axios": "^0.24.0",
|
||||
"body-parser": "^1.19.0",
|
||||
"cookie-parser": "^1.4.6",
|
||||
"dotenv": "^16.4.5",
|
||||
"express": "^4.17.1",
|
||||
"kafkajs": "^2.2.4",
|
||||
"openapi-types": "^10.0.0",
|
||||
"pg": "8.7.1",
|
||||
"pg-pool": "^3.5.1",
|
||||
|
||||
@@ -1,12 +1,29 @@
|
||||
const express = require('express');
|
||||
const logger = require('./app/logger');
|
||||
const cookieParser = require('cookie-parser');
|
||||
const bodyParser = require('body-parser');
|
||||
const port = process.env.PORT || 3000;
|
||||
|
||||
const app = express();
|
||||
|
||||
|
||||
// create application/json parser
|
||||
var jsonParser = bodyParser.json(); // express.json();
|
||||
|
||||
// create application/x-www-form-urlencoded parser
|
||||
var urlencodedParser = bodyParser.urlencoded({ extended: false }); // express.bodyParser({extended: true});
|
||||
app.use(cookieParser());
|
||||
|
||||
app.use(urlencodedParser);
|
||||
app.use(jsonParser);
|
||||
|
||||
|
||||
app.use(express.json());
|
||||
app.use(express.urlencoded());
|
||||
|
||||
// parse application/vnd.api+json as json
|
||||
app.use(bodyParser.json({ type: 'application/vnd.api+json' }))
|
||||
|
||||
const routes = require('./api/routes');
|
||||
routes(app);
|
||||
|
||||
|
||||
@@ -0,0 +1,50 @@
|
||||
'use strict';
|
||||
|
||||
const request = require('request');
|
||||
const db = require('../app/db')
|
||||
const logger = require('../app/logger');
|
||||
const KafkaConfig = require("../app/kconfig");
|
||||
|
||||
var ebroker = {
|
||||
eventNewJobPublish: function (req, res, next) {
|
||||
|
||||
try {
|
||||
const { message } = req.body;
|
||||
// console.log('THIS-> req.body -> ', req.body);
|
||||
const kafka_topic ="NEW_OFFER_"+ message.assign_mode;
|
||||
const kafkaConfig = new KafkaConfig();
|
||||
const messages = [{ key: "offer", value: message }];
|
||||
kafkaConfig.produce(kafka_topic, messages).then(r =>{
|
||||
console.log('THIS->RET-> ',r);
|
||||
} );
|
||||
|
||||
res.status(200).json({
|
||||
status: "Ok!",
|
||||
message: "Message successfully send!",
|
||||
});
|
||||
} catch (error) {
|
||||
console.log(error);
|
||||
}
|
||||
|
||||
let resultItem ={
|
||||
"result": [],
|
||||
"total_record": 0
|
||||
}
|
||||
next(null, resultItem ); // pass control to the next handler
|
||||
},
|
||||
|
||||
eventConsumer: function (req, res, next) {
|
||||
|
||||
//console.log("REQ---->",req.body.uid);
|
||||
var data = {
|
||||
"uid": req.body.uid,
|
||||
"member_id": req.body.member_id,
|
||||
"limit": (req.body.limit != null && req.body.limit !== "") ? req.body.limit : 20,
|
||||
"sessionid": req.body.sessionid,
|
||||
"page": req.body.page
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
};
|
||||
module.exports = ebroker;
|
||||
+7
-9
@@ -8,14 +8,8 @@ var jobs = {
|
||||
getmarketjobs: function (req, res, next) {
|
||||
|
||||
//console.log("REQ---->",req.body.uid);
|
||||
/*
|
||||
var data = {
|
||||
"uid": req.query.uid,
|
||||
"member_id": req.query.member_id,
|
||||
"family_uid": req.query.family_uid,
|
||||
"sessionid": req.query.sessionid,
|
||||
};
|
||||
*/
|
||||
logger.info("------lll---->");
|
||||
logger.info(req.query);
|
||||
var data = {
|
||||
"uid": req.query.uid,
|
||||
"member_id": req.query.member_id,
|
||||
@@ -35,7 +29,7 @@ var jobs = {
|
||||
Qstring = " SELECT j.title,j.description,m.id AS job_id,m.expire,m.job_description,j.price, " +
|
||||
" m.offer_code,j.timeline_days, to_char(m.expire, 'Dy Mon dd, yyyy HH:MI AM') AS expire2," +
|
||||
" m.uid AS offer_uid,j.uid AS job_uid,m.added::date AS offer_added,j.country AS job_country, " +
|
||||
" c.code AS currency_code, c.description AS currency_description,j.country, j.category,mb.uid AS market_uid " +
|
||||
" c.code AS currency_code, c.description AS currency_description,j.country, j.category,mb.uid AS market_uid, 0 AS interest_count " +
|
||||
" FROM members_jobs_offer m " +
|
||||
" LEFT JOIN members_jobs j ON j.id=m.job_id " +
|
||||
" LEFT JOIN members mb ON mb.id = m.member_id " +
|
||||
@@ -62,6 +56,10 @@ var jobs = {
|
||||
});
|
||||
},
|
||||
getmarketjobsFiles: function (req, res, next) {
|
||||
|
||||
logger.info("------ ********** ********** ---->");
|
||||
logger.info(req.query);
|
||||
|
||||
var data = {
|
||||
"member_uid": req.query.member_uid,
|
||||
"job_uid": req.query.job_uid,
|
||||
|
||||
Reference in New Issue
Block a user