diff --git a/.env b/.env index 4b80416..ec429c8 100644 --- a/.env +++ b/.env @@ -1,2 +1,3 @@ WRENCHJOB_PORT=3000 WRENCHJOB_POSTGRE_URL='postgresql://wrenchboard:wrenchboard@10.20.30.60:5432/wrenchboard' +WRENCHJOB_KAFKA_BROKER01='10.10.10.120:9092' \ No newline at end of file diff --git a/api/controller.js b/api/controller.js index bbb207c..f2849d0 100644 --- a/api/controller.js +++ b/api/controller.js @@ -3,6 +3,7 @@ const properties = require('../package.json') const jobs = require('../service/jobs'); const logger = require('../app/logger'); +const ebroker = require('../service/ebroker'); var controllers = { getMarketJobs: function(req, res) { @@ -22,7 +23,16 @@ var controllers = { } res.json(dist); }); - }, + }, + newJobPublish: function(req, res) { + ebroker.eventNewJobPublish(req, res, function(err, dist) { + if (err) { + res.send(err); + } + // res.json(dist); + res.status(200).json({'status': 'OK', 'internal_return': 0, 'result_list': dist.result,'total_record': dist.total_record }) + }); + }, }; module.exports = controllers; diff --git a/api/routes.js b/api/routes.js index fbf1946..95e778d 100644 --- a/api/routes.js +++ b/api/routes.js @@ -9,5 +9,6 @@ module.exports = function(app) { .get(controller.getMarketJobsFiles); app.route('/marketjob3s/:id') .get(controller.getStatus); - + app.route('/jobAdded') + .post(controller.newJobPublish); }; \ No newline at end of file diff --git a/app/kconfig.js b/app/kconfig.js new file mode 100644 index 0000000..89c576f --- /dev/null +++ b/app/kconfig.js @@ -0,0 +1,53 @@ +// import { Kafka } from "kafkajs"; +const dotenv = require('dotenv'); +const env = dotenv.config(); +//console.log("ENVVVVV ", env); + +const {Kafka} = require('kafkajs'); +const kafkaBroker1 = process.env.WRENCHJOB_KAFKA_BROKER01; + +console.log("kafkaBroker1", kafkaBroker1); + +const kafka = new Kafka({ + clientId: "wallet-kafka", + brokers: [kafkaBroker1], +}); +class KafkaConfig { + + constructor() { + this.producer = kafka.producer(); + this.consumer = kafka.consumer({ groupId: "test-group" }); + } + + async produce(topic, messages) { + try { + await this.producer.connect(); + await this.producer.send({ + topic: topic, + messages: messages, + }); + } catch (error) { + console.error(error); + } finally { + await this.producer.disconnect(); + } + } + + async consume(topic, callback) { + try { + await this.consumer.connect(); + await this.consumer.subscribe({ topic: topic, fromBeginning: true }); + await this.consumer.run({ + eachMessage: async ({ topic, partition, message }) => { + const value = message.value.toString(); + callback(value); + }, + }); + } catch (error) { + console.error(error); + } + } +} + +module.exports = KafkaConfig; +//export default KafkaConfig; \ No newline at end of file diff --git a/package.json b/package.json index 0cf0143..1d210c5 100644 --- a/package.json +++ b/package.json @@ -14,7 +14,10 @@ "dependencies": { "axios": "^0.24.0", "body-parser": "^1.19.0", + "cookie-parser": "^1.4.6", + "dotenv": "^16.4.5", "express": "^4.17.1", + "kafkajs": "^2.2.4", "openapi-types": "^10.0.0", "pg": "8.7.1", "pg-pool": "^3.5.1", diff --git a/server.js b/server.js index 33715c0..c9012b0 100644 --- a/server.js +++ b/server.js @@ -1,12 +1,29 @@ const express = require('express'); const logger = require('./app/logger'); +const cookieParser = require('cookie-parser'); +const bodyParser = require('body-parser'); const port = process.env.PORT || 3000; const app = express(); + +// create application/json parser +var jsonParser = bodyParser.json(); // express.json(); + +// create application/x-www-form-urlencoded parser +var urlencodedParser = bodyParser.urlencoded({ extended: false }); // express.bodyParser({extended: true}); +app.use(cookieParser()); + +app.use(urlencodedParser); +app.use(jsonParser); + + app.use(express.json()); app.use(express.urlencoded()); +// parse application/vnd.api+json as json +app.use(bodyParser.json({ type: 'application/vnd.api+json' })) + const routes = require('./api/routes'); routes(app); diff --git a/service/ebroker.js b/service/ebroker.js new file mode 100644 index 0000000..7987eb0 --- /dev/null +++ b/service/ebroker.js @@ -0,0 +1,50 @@ +'use strict'; + +const request = require('request'); +const db = require('../app/db') +const logger = require('../app/logger'); +const KafkaConfig = require("../app/kconfig"); + +var ebroker = { + eventNewJobPublish: function (req, res, next) { + + try { + const { message } = req.body; + // console.log('THIS-> req.body -> ', req.body); + const kafka_topic ="NEW_OFFER_"+ message.assign_mode; + const kafkaConfig = new KafkaConfig(); + const messages = [{ key: "offer", value: message }]; + kafkaConfig.produce(kafka_topic, messages).then(r =>{ + console.log('THIS->RET-> ',r); + } ); + + res.status(200).json({ + status: "Ok!", + message: "Message successfully send!", + }); + } catch (error) { + console.log(error); + } + + let resultItem ={ + "result": [], + "total_record": 0 + } + next(null, resultItem ); // pass control to the next handler + }, + + eventConsumer: function (req, res, next) { + + //console.log("REQ---->",req.body.uid); + var data = { + "uid": req.body.uid, + "member_id": req.body.member_id, + "limit": (req.body.limit != null && req.body.limit !== "") ? req.body.limit : 20, + "sessionid": req.body.sessionid, + "page": req.body.page + }; + + + } +}; +module.exports = ebroker; diff --git a/service/jobs.js b/service/jobs.js index a780915..9056c50 100644 --- a/service/jobs.js +++ b/service/jobs.js @@ -8,14 +8,8 @@ var jobs = { getmarketjobs: function (req, res, next) { //console.log("REQ---->",req.body.uid); - /* - var data = { - "uid": req.query.uid, - "member_id": req.query.member_id, - "family_uid": req.query.family_uid, - "sessionid": req.query.sessionid, - }; - */ + logger.info("------lll---->"); + logger.info(req.query); var data = { "uid": req.query.uid, "member_id": req.query.member_id, @@ -35,7 +29,7 @@ var jobs = { Qstring = " SELECT j.title,j.description,m.id AS job_id,m.expire,m.job_description,j.price, " + " m.offer_code,j.timeline_days, to_char(m.expire, 'Dy Mon dd, yyyy HH:MI AM') AS expire2," + " m.uid AS offer_uid,j.uid AS job_uid,m.added::date AS offer_added,j.country AS job_country, " + - " c.code AS currency_code, c.description AS currency_description,j.country, j.category,mb.uid AS market_uid " + + " c.code AS currency_code, c.description AS currency_description,j.country, j.category,mb.uid AS market_uid, 0 AS interest_count " + " FROM members_jobs_offer m " + " LEFT JOIN members_jobs j ON j.id=m.job_id " + " LEFT JOIN members mb ON mb.id = m.member_id " + @@ -62,6 +56,10 @@ var jobs = { }); }, getmarketjobsFiles: function (req, res, next) { + + logger.info("------ ********** ********** ---->"); + logger.info(req.query); + var data = { "member_uid": req.query.member_uid, "job_uid": req.query.job_uid,