From a42b026f6544eb47f7f1b3b068b380d70ef9a0a4 Mon Sep 17 00:00:00 2001 From: Stefan Meinecke Date: Thu, 19 May 2016 10:39:10 +0000 Subject: [PATCH 1/3] Add multi connection support - add connection pooling for each registry. --- config/epp-config-example.json | 6 ++- lib/event-dispatcher.js | 18 ------- lib/node-epp-server.js | 91 ++++++++++++++++++++++------------ package.json | 3 +- 4 files changed, 65 insertions(+), 53 deletions(-) delete mode 100644 lib/event-dispatcher.js diff --git a/config/epp-config-example.json b/config/epp-config-example.json index a885bbe..282dff8 100644 --- a/config/epp-config-example.json +++ b/config/epp-config-example.json @@ -27,7 +27,8 @@ "DNSSEC": { "xmlns": "urn:ietf:params:xml:ns:secDNS-1.1" } - } + }, + "connections":[10, 100] }, "registry-test2": { "ssl": true, @@ -57,7 +58,8 @@ "keyvalue": { "xmlns": "http://schema.ispapi.net/epp/xml/keyvalue-1.0" } - } + }, + "connections":[10, 100] } }, "rabbitmq": { diff --git a/lib/event-dispatcher.js b/lib/event-dispatcher.js deleted file mode 100644 index 3f17cf1..0000000 --- a/lib/event-dispatcher.js +++ /dev/null @@ -1,18 +0,0 @@ - -var events = require('events'); -var util = require('util'); - -EventDispatcher = function (){ - events.EventEmitter.call(this); - this.childFree = function (registry) { - this.emit('childFree', registry); - }; - this.queueChild = function (registry) { - this.emit('queueChild', registry); - }; - -}; -util.inherits(EventDispatcher, events.EventEmitter); - - -module.exports = EventDispatcher; diff --git a/lib/node-epp-server.js b/lib/node-epp-server.js index 98f10a2..535f5c6 100644 --- a/lib/node-epp-server.js +++ b/lib/node-epp-server.js @@ -5,10 +5,10 @@ var program = require('commander'); var ipfilter = require('ipfilter'); var moment = require('moment'); var Listener = require('./listener.js'); -var EventDispatcher = require('./event-dispatcher.js'); var path = require('path'); var nconf = require('nconf'); var winston = require('winston'); +var Pool = require('generic-pool').Pool; nconf.env().file({ "file": path.resolve(__dirname, '..', 'config/epp-config.json') @@ -35,36 +35,54 @@ logger.log("Initialised with registries: ", registries); var availableProcesses = {}; for (var accred in registries) { var registry = registries[accred]; - var eppProcess = cp.fork(__dirname + '/worker.js'); - eppProcess.send({ - "registry": registry + var registryConfig = nconf.get('registries')[registry]; + var pool = new Pool({ + name : registry, + create : function(callback) { + var eppProcess = cp.fork(__dirname + '/worker.js'); + eppProcess.send({ + "registry": registry + }); + + // parameter order: err, resource + callback(null, eppProcess); + }, + destroy : function(client) { + logger.log("Logout."); + client.send({ + "command": "logout", + "data": {"kill": true} + }); + client.once('message', function(data) { + logger.log(registry + ": Got reply from logout: ", data); + }) + }, + max : registryConfig['connections'][1], + // optional. if you set this, make sure to drain() (see step 3) + min : registryConfig['connections'][0], + // specifies how long a resource can stay idle in pool before being removed + idleTimeoutMillis : 30000, + // if true, logs via console.log - can also be a function + log : true }); - availableProcesses[registry] = eppProcess; + availableProcesses[registry] = pool; } -process.on('SIGINT', function() { - var logoutResponse = function(data) { - logger.log("Got reply from logout: ", data); - }; + +process.on('SIGINT', function(err) { + if (err) { + console.log(err.stack); + } for (var registry in availableProcesses) { - var childProc = availableProcesses[registry]; - childProc.send({ - "command": "logout", - "data": {"kill": true} + var pool = availableProcesses[registry]; + pool.drain(function() { + pool.destroyAllNow(); + process.exit(0); }); - childProc.once('message', logoutResponse); } - process.exit(0); }); - // Wire up event/listener to keep track of available worker process. This is // to avoid individual connection workers from getting swamped. -var eventDispatch = new EventDispatcher(); -var listener = new Listener(eventDispatch, availableProcesses); -eventDispatch.on('queueChild', listener.queueChild); -eventDispatch.on('childFree', listener.childFree); - - var app = express(); app.use(bodyParser.json()); var ips = nconf.get('whitelisted_ips'); @@ -72,31 +90,40 @@ app.use(ipfilter(ips, {"mode": "allow"})); app.post('/command/:registry/:command', function(req, res) { var registry = req.params.registry; - if (! (registry in availableProcesses)) { + if (!(registry in availableProcesses)) { res.send(400, { "error": "Unknown registry" }); return; } - var queryData = req.body; - var a = moment(); - var processChild = function (childProcess) { - childProcess.once('message', function(m) { + var queryData = req.body; + var pool = availableProcesses[registry]; + pool.acquire(function(err, client) { + if (err) { + logger.error(err); + res.send(400, { + "error": "Session overload" + }); + return; + } + var a = moment(); + client.once('message', function(m) { var b = moment(); var diff = b.diff(a, 'milliseconds'); logger.info('Request elapsed time: '+ diff.toString() + ' ms'); + m['runtime'] = diff/1000; res.send(m); - eventDispatch.childFree(registry); + pool.release(client); }); - childProcess.send({ + + client.send({ "command": req.params.command, "data": queryData }); - }; - listener.pushChildQueue(processChild); - eventDispatch.queueChild(registry); + }); }); app.listen(program.listen); +process.stdin.resume();//so the program will not close instantly diff --git a/package.json b/package.json index bd71c0a..2177e37 100644 --- a/package.json +++ b/package.json @@ -32,7 +32,8 @@ "node-expat": "*", "q": "*", "xml2json": "*", - "winston": "*" + "winston": "*", + "generic-pool": "*" }, "devDependencies": { "chai": "*", From 6e9e59113a4ca8b5af7e4d252376b17bcfc91f17 Mon Sep 17 00:00:00 2001 From: Stefan Meinecke Date: Thu, 19 May 2016 20:56:21 +0000 Subject: [PATCH 2/3] Add return of login status return login status after first run of worker. --- lib/dispatcher.js | 2 ++ lib/epp-factory.js | 3 +++ 2 files changed, 5 insertions(+) diff --git a/lib/dispatcher.js b/lib/dispatcher.js index 8bec85d..3bf3039 100644 --- a/lib/dispatcher.js +++ b/lib/dispatcher.js @@ -65,9 +65,11 @@ Dispatcher.prototype.processMessage = function processMessage(m) { loginTransactionId).then( function(data) { logger.log("Got login data: ", data.toString()); + process.send(currentState.loggedIn); }, function(error) { logger.error("Unable to login: ", error); + process.send(currentState.loggedIn); }); }, 2000); diff --git a/lib/epp-factory.js b/lib/epp-factory.js index d50ee0b..b674174 100644 --- a/lib/epp-factory.js +++ b/lib/epp-factory.js @@ -20,6 +20,9 @@ var logger = new (winston.Logger)({ function EppFactory() {} EppFactory.generate = function(registry, config) { + if (!registry) { + return; + } var epp = new EPP(registry, config); config.extensionClasses.forEach(function(extensionClass) { var extension = extensionClass.extension; From 93163b5e72ed4264e4a0c8c63e5e3a893d4f444b Mon Sep 17 00:00:00 2001 From: Stefan Meinecke Date: Thu, 19 May 2016 20:59:51 +0000 Subject: [PATCH 3/3] fix pooling bugs fix some bugs in pool handling + shutdown. --- lib/listener.js | 43 --------------------- lib/node-epp-server.js | 88 ++++++++++++++++++++++++++++-------------- lib/worker.js | 11 ++++++ 3 files changed, 69 insertions(+), 73 deletions(-) delete mode 100644 lib/listener.js diff --git a/lib/listener.js b/lib/listener.js deleted file mode 100644 index cdb68a1..0000000 --- a/lib/listener.js +++ /dev/null @@ -1,43 +0,0 @@ -var winston = require('winston'); -var nconf = require('nconf'); - -nconf.env() -var log_level = (nconf.get('LOG_LEVEL') || 'debug').toLowerCase(); -var logger = new (winston.Logger)({ - transports: [ - new (winston.transports.Console)({ level: log_level }) - ] -}); - -var available; -var busy; -var childQueue = []; -var eventer; -function Listener(eventDispatcher, availableProcesses) { - eventer = eventDispatcher; - available = availableProcesses; - busy = {}; -} -Listener.prototype.pushChildQueue = function (child) { - childQueue.push(child); - logger.info("Items in queue: ", childQueue.length); -}; -Listener.prototype.childFree = function(registry) { - logger.info(registry + " free "); - var childProc = busy[registry]; - delete busy[registry]; - available[registry] = childProc; - eventer.queueChild(registry); -}; - -Listener.prototype.queueChild = function (registry) { - var childProc = available[registry]; - if (childProc && childQueue.length > 0) { - delete available[registry]; - busy[registry] = childProc; - var callToChild = childQueue.shift(); - callToChild(childProc); - } -}; - -module.exports = Listener; diff --git a/lib/node-epp-server.js b/lib/node-epp-server.js index 535f5c6..f3a6a79 100644 --- a/lib/node-epp-server.js +++ b/lib/node-epp-server.js @@ -4,7 +4,6 @@ var cp = require('child_process'); var program = require('commander'); var ipfilter = require('ipfilter'); var moment = require('moment'); -var Listener = require('./listener.js'); var path = require('path'); var nconf = require('nconf'); var winston = require('winston'); @@ -39,26 +38,36 @@ for (var accred in registries) { var pool = new Pool({ name : registry, create : function(callback) { - var eppProcess = cp.fork(__dirname + '/worker.js'); - eppProcess.send({ + var client = cp.fork(__dirname + '/worker.js'); + client.once('message', function(loggedIn) { + if (!loggedIn) { + client.kill(); + return callback("Login failed.", null); + } + // parameter order: err, resource + callback(null, client); + }); + client.send({ "registry": registry }); - - // parameter order: err, resource - callback(null, eppProcess); }, - destroy : function(client) { - logger.log("Logout."); + destroy: function(client) { + setTimeout(function() { + client.kill(); + }, 2000); client.send({ "command": "logout", "data": {"kill": true} }); - client.once('message', function(data) { - logger.log(registry + ": Got reply from logout: ", data); - }) }, + validateAsync: function(client, callback) { + if (!client.connected) { + return callback(false); + } + return callback(true); + }, + returnToHead: true, max : registryConfig['connections'][1], - // optional. if you set this, make sure to drain() (see step 3) min : registryConfig['connections'][0], // specifies how long a resource can stay idle in pool before being removed idleTimeoutMillis : 30000, @@ -68,19 +77,6 @@ for (var accred in registries) { availableProcesses[registry] = pool; } -process.on('SIGINT', function(err) { - if (err) { - console.log(err.stack); - } - for (var registry in availableProcesses) { - var pool = availableProcesses[registry]; - pool.drain(function() { - pool.destroyAllNow(); - process.exit(0); - }); - } -}); - // Wire up event/listener to keep track of available worker process. This is // to avoid individual connection workers from getting swamped. var app = express(); @@ -91,7 +87,7 @@ app.use(ipfilter(ips, {"mode": "allow"})); app.post('/command/:registry/:command', function(req, res) { var registry = req.params.registry; if (!(registry in availableProcesses)) { - res.send(400, { + res.status(400).send({ "error": "Unknown registry" }); return; @@ -102,19 +98,30 @@ app.post('/command/:registry/:command', function(req, res) { pool.acquire(function(err, client) { if (err) { logger.error(err); - res.send(400, { + res.status(400).send({ "error": "Session overload" }); return; } + + var registryConfig = nconf.get('registries')[registry]; + var t = setTimeout(function() { + pool.destroy(client); + res.status(400).send({ + "error": "Timeout" + }); + return; + }, registryConfig['max_runtime']); + var a = moment(); client.once('message', function(m) { + clearTimeout(t); var b = moment(); var diff = b.diff(a, 'milliseconds'); logger.info('Request elapsed time: '+ diff.toString() + ' ms'); m['runtime'] = diff/1000; - res.send(m); pool.release(client); + res.send(m); }); client.send({ @@ -124,6 +131,27 @@ app.post('/command/:registry/:command', function(req, res) { }); }); -app.listen(program.listen); +process.on('SIGINT', function(err) { + if (err) { + console.log(err.stack); + } -process.stdin.resume();//so the program will not close instantly + for (var registry in availableProcesses) { + var pool = availableProcesses[registry]; + pool.drain(function() { + pool.destroyAllNow(); + }); + } + + setTimeout(function() { + process.exit(0); + }, 5000); +}); + +process.on('uncaughtException', function(err) { + console.error(err); + console.error(err.stack); +}); +process.stdin.resume(); //so the program will not close instantly + +app.listen(program.listen); diff --git a/lib/worker.js b/lib/worker.js index a2042cd..4dbb639 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -2,3 +2,14 @@ var Dispatcher = require('./dispatcher.js'); var dispatch = new Dispatcher(); process.on('message', dispatch.processMessage); +process.on('SIGINT', function(err) { + console.log("Try to logout."); + setTimeout(function() { + process.exit(0); + }, 2000); + process.send({ + "command": "logout", + "data": {"kill": true} + }); +}); +