/**
 * Service entry point.
 *
 * Builds the Express application (body parsers, CORS, cookies), mounts the
 * routes exported by ./api/routes, subscribes to a fixed set of Kafka topics
 * and finally starts the HTTP listener on 0.0.0.0:<port>.
 */
const express = require('express');
const cors = require('cors');
const cookieParser = require('cookie-parser');
const bodyParser = require('body-parser');
const logger = require('./app/logger');
const port = process.env.PORT || 3000;
const KafkaConfig = require("./app/kconfig");
// Not referenced by any live code in this file; kept so the module keeps loading
// exactly as before.
const phpUnserialize = require('phpunserialize');
const assign_action = require('./app/assign_action');

const kafka = new KafkaConfig();
const app = express();

// Parser for application/json request bodies.
const jsonParser = bodyParser.json();

// Parser for application/x-www-form-urlencoded request bodies.
const urlencodedParser = bodyParser.urlencoded({ extended: false });

// Middleware is registered in this exact order; the routes are mounted afterwards.
app.use(urlencodedParser);
app.use(jsonParser);
app.use(cors());
app.use(cookieParser());

// Parse application/vnd.api+json bodies as JSON too.
app.use(bodyParser.json({ type: 'application/vnd.api+json' }));

const routes = require('./api/routes');
routes(app);

// Kafka wiring follows https://github.com/lemoncode21/nodejs-kafka.git

const START_BANNER = "START -----------------------------------------";
const END_BANNER = "END -----------------------------------------";

/**
 * Subscribes `handler` to `topic` and guards it with a try/catch so that a
 * failing message is logged instead of propagating out of the consumer callback.
 *
 * @param {string} topic - Kafka topic name passed to kafka.consume.
 * @param {(value: string) => void} handler - Called with the raw message value
 *   (assumed to be a JSON string, since every handler feeds it to JSON.parse).
 * @param {string} [errorPrefix=" Error "] - First argument of the error log line.
 */
function consumeSafely(topic, handler, errorPrefix = " Error ") {
  kafka.consume(topic, (value) => {
    try {
      handler(value);
    } catch (err) {
      console.log(errorPrefix, err.message);
    }
  });
}

/**
 * Builds a handler that parses the message as JSON, logs it, and hands the
 * parsed object to assign_action.processLinedJobs between START/END banners.
 *
 * NOTE(review): if processLinedJobs is asynchronous, the END banner is printed
 * before it finishes and a rejection is not caught by the surrounding
 * try/catch — confirm against ./app/assign_action.
 *
 * @param {string} topic - Topic name, used only in the log line.
 * @returns {(value: string) => void}
 */
function makeOfferAssignmentHandler(topic) {
  return (value) => {
    const offer = JSON.parse(value);
    console.log(`**Receive ${topic} message : `, offer);
    console.log(START_BANNER);
    assign_action.processLinedJobs(offer);
    console.log(END_BANNER);
  };
}

/**
 * Builds a handler that only logs the raw message followed by its parsed form.
 *
 * @param {string} topic - Topic name, used only in the log line.
 * @returns {(value: string) => void}
 */
function makeLogOnlyHandler(topic) {
  return (value) => {
    console.log(`Receive ${topic} message xxxx: `, value);
    console.log(JSON.parse(value));
  };
}

// Topics whose messages are forwarded to assign_action.processLinedJobs.
for (const topic of ["NEW_OFFER_110011", "NEW_OFFER_110022"]) {
  consumeSafely(topic, makeOfferAssignmentHandler(topic));
}

// Topics that are currently only logged.
for (const topic of [
  "NEW_OFFER_110033",
  "NEW_OFFER_110055",
  "SENDMONEY_RECEIVED",
  "INTEREST_MSG",
]) {
  consumeSafely(topic, makeLogOnlyHandler(topic));
}

// Payment notifications: logged, plus the txRef field of the parsed payload.
consumeSafely(
  "FLUTTER_PAYMENT_RECEIVED",
  (value) => {
    console.log("Receive FLUTTER_PAYMENT_RECEIVED message xxxx: ", value);
    const payment = JSON.parse(value);
    console.log(payment);
    console.log("Receive message yyyy: ", payment.txRef);
  },
  "Error ",
);

app.listen(port, "0.0.0.0", function() {
  logger.info(`***** Server started on port: ${port} *****`);
});