diff --git a/lib/kv.js b/lib/kv.js index 114eb7e..84148ca 100644 --- a/lib/kv.js +++ b/lib/kv.js @@ -85,3 +85,40 @@ KV.prototype.delete = function (key, opts, done) { if ('function' == typeof opts) done = opts, opts = null; this.requestor.delete(key, opts, done); }; + +/** + * GET then WATCH a single `key` + * @param {key} key + * @param {hanlder} is called with the current value, then each time the value is changed + * @returns {Function} until this function is called to stop watching + */ + +KV.prototype.watch = function(key, handler) { + var self = this; + var shouldStop = false; + var watchMore = function(index) { + // get the key, wait up to 10s + self.get(key, { wait: "10s", index: index }, function(err, res) { + var newIndex; + if (!shouldStop) { // do nothing if user has cancelled + if (err != null) { // in case of error + handler(err); // call handler with error + return setTimeout(function() { // but schedule a retry in 1s + watchMore(index); + }, 1000); + } + newIndex = res[0].modifyIndex; + if (index !== newIndex) { // the index has changed == data has changed + handler(null, res); // call handler with new data + } + return process.nextTick(function() { // then recurse + watchMore(newIndex); + }); + } + }); + }; + watchMore(0); // index=0 is a good starting point + return function() { // return a function which will stop watching when called + shouldStop = true; + }; +}; diff --git a/test/kv.test.js b/test/kv.test.js index 43579ea..d698c68 100644 --- a/test/kv.test.js +++ b/test/kv.test.js @@ -51,4 +51,76 @@ describe('consul.kv', function () { }); }); + describe('#watch', function() { + var consul = new Consul(); + it('calls back immediately with value', function (done) { + var stop = consul.kv.watch('hello', function (err, values) { + if (err) return done(err); + assert(Array.isArray(values)); + assert(values.length === 1); + values.forEach(function (value) { + assert(value.key === 'hello'); + assert(value.value === 'world'); + }); + stop(); + done(); + }); + }); + + it('calls back again if value is changed', function (done) { + this.timeout(12000); + var count = 0; + var stop = consul.kv.watch('hello', function (err, values) { + count++; + if (err) return done(err); + switch (count) { + case 1: + assert(values[0].key === 'hello'); + assert(values[0].value === 'world'); + consul.kv.put('hello','all', function (err) { // change value, handler will be called again + if (err) return done(err); + }); + break; + case 2: + assert(values[0].key === 'hello'); + assert(values[0].value === 'all'); + stop(); + done(); + } + }); + }); + + it('retries http calls after 10s', function (done) { + this.timeout(12000); + var count = 0; + var stop = consul.kv.watch('hello', function (err, values) { + count++; + if (err) return done(err); + switch (count) { + case 1: + assert(values[0].key === 'hello'); + assert(values[0].value === 'all'); + consul.kv.put('hello','world', function (err) { // change value, handler will be called again + if (err) return done(err); + }); + break; + case 2: + assert(values[0].key === 'hello'); + assert(values[0].value === 'world'); + setTimeout(function() { + consul.kv.put('hello','again', function (err) { // change value back, handler will be called again + if (err) return done(err); + }); + }, 10500); + break; + case 3: + assert(values[0].key === 'hello'); + assert(values[0].value === 'again'); + stop(); + done(); + } + }); + }); + + }); });