diff --git a/.gitignore b/.gitignore index 3c3629e..c9d43bc 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,4 @@ node_modules +*.swp +coverage +.env diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..6790509 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,2 @@ +FROM node:0.10-onbuild +EXPOSE 8000 diff --git a/README.md b/README.md index c981d6f..7fea053 100644 --- a/README.md +++ b/README.md @@ -15,5 +15,20 @@ to be loaded by dotenv: ``` npm install -node server +docker-compose up ``` + +This spins up the following: + +* A kdihalas/beanstalkd container +* A worker container that executes `npm run worker` +* A web container with ```node server.js``` + +## Job Processing + +compute-service contains a worker that can read jobs from a beanstalkd queue and +spin up instances. To configure, set the variable BEANSTALK_SERVER. The default +value is ```127.0.0.1:11300```. To run: + +```shell +$ ./node_modules/beanstalk_worker/bin/ diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..f18e96c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,18 @@ +web: + build: . + command: node server.js + volumes: + - .:/usr/src/app + links: + - worker + ports: + - "8000:8000" +worker: + build: . + command: npm run worker + volumes: + - .:/usr/src/app + links: + - beanstalkd +beanstalkd: + image: kdihalas/beanstalkd diff --git a/lib/config.js b/lib/config.js index 9734b64..7d8abbc 100644 --- a/lib/config.js +++ b/lib/config.js @@ -6,10 +6,25 @@ dotenv.load(); nconf.use('memory'); nconf.env(); +nconf.defaults({ + BEANSTALK_SERVER: '127.0.0.1:11300' +}); + +if (nconf.get('BEANSTALKD_PORT')) { + nconf.set('BEANSTALK_SERVER', nconf.get('BEANSTALKD_PORT_11300_TCP_ADDR')+':'+nconf.get('BEANSTALKD_PORT_11300_TCP_PORT')); +}; + nconf.defaults({ aws: { accessKeyId: nconf.get('AWS_ACCESS_KEY_ID'), secretAccessKey: nconf.get('AWS_SECRET_ACCESS_KEY') + }, + beanstalk: { + server: nconf.get('BEANSTALK_SERVER'), + workers: 1 + }, + codius: { + endpoint: nconf.get('CODIUS_URI') } }); diff --git a/lib/job-queue.js b/lib/job-queue.js new file mode 100644 index 0000000..c1229b8 --- /dev/null +++ b/lib/job-queue.js @@ -0,0 +1,72 @@ +var bluebird = require('bluebird'); +var Promise = bluebird.Promise; +var config = require('./config'); +var beanstalk = require('nodestalker'); + +module.exports = function(tubeName) { + var self = this; + self.client = beanstalk.Client(config.get('beanstalk:server')); + self.workers = {}; + self.tubeName = tubeName; + bluebird.promisifyAll(self.client, {promisifier: promisify}); + self.tubeDelay = self.client.useAsync(self.tubeName).then(function() { + return self.client.watchAsync(self.tubeName); + }); +} + +module.exports.prototype = { + useTube: function() { + return this.tubeDelay; + }, + + addWorker: function(name, worker) { + var self = this; + if (typeof(worker.run) !== 'function') { + throw new Error("Object with a run() method must be passed in"); + } + self.workers[name] = worker; + }, + + enqueueJob: function(type, data) { + var self = this; + var job = { + type: type, + data: data + } + return self.client.putAsync(JSON.stringify(job)); + }, + + processNextJob: function() { + var self = this; + var p = []; + var job; + return self.client.reserveAsync().then(function(_job) { + job = _job; + var realJob = JSON.parse(job.data); + if (realJob.type in self.workers) { + var r = self.workers[realJob.type].run(realJob.data); + if (typeof(r) === 'undefined') { + r = bluebird.resolve(); + } + return r.then(function() { + return self.client.deleteJobAsync(job.id); + }); + } else { + //FIXME: Log unhandled job + return self.client.releaseAsync(job.id); + } + }); + } +} + +function promisify(method) { + return function() { + var self = this; + var args = arguments; + return new bluebird.Promise(function(resolve, reject) { + var r = method.apply(self, args); + r.onSuccess(function(v) {resolve(v)}); + r.onError(function(err) {reject(new Error(err))}); + }); + } +} diff --git a/package.json b/package.json index 5baf5cb..12aeada 100644 --- a/package.json +++ b/package.json @@ -1,4 +1,9 @@ { + "scripts": { + "test": "istanbul cover _mocha", + "start": "node server.js", + "worker": "./node_modules/beanstalk_worker/bin/worker.js `pwd`/conf/workers.js" + }, "name": "compute-service", "version": "0.1.0", "description": "Codius host compute service", @@ -10,8 +15,15 @@ "license": "ISC", "dependencies": { "aws-sdk": "^2.1.16", + "beanstalk_worker": "^0.2.1", "bluebird": "^2.9.12", "dotenv": "^0.5.1", - "nconf": "^0.7.1" + "nconf": "^0.7.1", + "nodestalker": "^0.1.17", + "request-promise": "^0.4.0" + }, + "devDependencies": { + "chai": "^2.1.1", + "sinon": "^1.13.0" } } diff --git a/test/lib.job-queue.js b/test/lib.job-queue.js new file mode 100644 index 0000000..1d501c5 --- /dev/null +++ b/test/lib.job-queue.js @@ -0,0 +1,34 @@ +var bluebird = require('bluebird'); + +bluebird.longStackTraces(); + +var chai = require('chai'); +var expect = chai.expect; +var JobQueue = require('../lib/job-queue'); +var sinon = require('sinon'); + +var TestWorker = function() { +} + +TestWorker.prototype = { + run: function(data) {} +} + +describe('JobQueue', function() { + var queue, worker; + + beforeEach(function() { + queue = new JobQueue('test_'+Math.round(Math.random()*100)); + worker = new TestWorker(); + queue.addWorker('test', worker); + }); + + it('should run a job after it is enqueued', function() { + sinon.spy(worker, 'run'); + return queue.enqueueJob('test', {}).then(function() { + return queue.processNextJob(); + }).then(function() { + expect(worker.run.called).to.equal(true); + }); + }); +}); diff --git a/test/workers.launch.js b/test/workers.launch.js new file mode 100644 index 0000000..649ad88 --- /dev/null +++ b/test/workers.launch.js @@ -0,0 +1,26 @@ +var Promise = require('bluebird').Promise; +var chai = require('chai'); +var expect = chai.expect; +var JobQueue = require('../lib/job-queue'); +var sinon = require('sinon'); + +var LaunchWorker = require('../workers/launch'); + +describe('launch handler', function() { + var queue, worker; + + beforeEach(function() { + queue = new JobQueue('test_'+Math.random()*100); + worker = new LaunchWorker(); + queue.addWorker('instance_launch', worker); + }); + + it('should handle an instance launch job', function() { + sinon.spy(worker, 'run'); + return queue.enqueueJob('instance_launch', {}).then(function() { + return queue.processNextJob(); + }).then(function() { + expect(worker.run.called).to.equal(true); + }); + }); +}); diff --git a/workers/launch.js b/workers/launch.js new file mode 100644 index 0000000..a3f8b26 --- /dev/null +++ b/workers/launch.js @@ -0,0 +1,11 @@ +var request = require('request-promise'); +var config = require('../lib/config'); + +module.exports = function() { +} + +module.exports.prototype = { + run: function(data) { + return request(config.get('codius:endpoint')); + } +}