From deafd460469d130fbd081dc44ef870c41bba75d3 Mon Sep 17 00:00:00 2001 From: chris Date: Mon, 23 Jun 2014 22:25:23 +0200 Subject: [PATCH 1/3] added KV.watch --- lib/kv.js | 36 ++++++++++++++++++++++++ test/kv.test.js | 74 ++++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 109 insertions(+), 1 deletion(-) diff --git a/lib/kv.js b/lib/kv.js index 114eb7e..b054cff 100644 --- a/lib/kv.js +++ b/lib/kv.js @@ -85,3 +85,39 @@ KV.prototype.delete = function (key, opts, done) { if ('function' == typeof opts) done = opts, opts = null; this.requestor.delete(key, opts, done); }; + +/** + * GET then WATCH a single `key` + * @param {key} key + * @param {hanlder} is called with the current value, then each time the value is changed + * @returns {Function} until this function is called to stop watching + */ +KV.prototype.watch = function(key, handler) { + var self = this; + var shouldStop = false; + var watchMore = function(index) { + // get the key, wait up to 10s + self.get(key, { wait: "10s", index: index }, function(err, res) { + var newIndex; + if (!shouldStop) { // do nothing if user has cancelled + if (err != null) { // in case of error + handler(err); // call handler with error + return setTimeout(function() { // but schedule a retry in 1s + watchMore(index); + }, 1000); + } + newIndex = res[0].modifyIndex; + if (index !== newIndex) { // the index has changed == data has changed + handler(null, res); // call handler with new data + } + return process.nextTick(function() { // then recurse + watchMore(newIndex); + }); + } + }); + }; + watchMore(0); // index=0 is a good starting point + return function() { // return a function which will stop watching when called + shouldStop = true; + }; +}; diff --git a/test/kv.test.js b/test/kv.test.js index 43579ea..16f1e03 100644 --- a/test/kv.test.js +++ b/test/kv.test.js @@ -2,7 +2,7 @@ var Consul = require('..'); var assert = require('assert'); -describe('consul.kv', function () { +describe.only('consul.kv', function () { describe('#get', function () { describe('when not in "strict mode"', function () { @@ -51,4 +51,76 @@ describe('consul.kv', function () { }); }); + describe('#watch', function() { + var consul = new Consul(); + it('calls back immediately with value', function (done) { + var stop = consul.kv.watch('hello', function (err, values) { + if (err) return done(err); + assert(Array.isArray(values)); + assert(values.length === 1); + values.forEach(function (value) { + assert(value.key === 'hello'); + assert(value.value === 'world'); + }); + stop(); + done(); + }); + }); + + it('calls back again if value is changed', function (done) { + this.timeout(12000); + var count = 0; + var stop = consul.kv.watch('hello', function (err, values) { + count++; + if (err) return done(err); + switch (count) { + case 1: + assert(values[0].key === 'hello'); + assert(values[0].value === 'world'); + consul.kv.put('hello','all', function (err) { // change value, handler will be called again + if (err) return done(err); + }); + break; + case 2: + assert(values[0].key === 'hello'); + assert(values[0].value === 'all'); + stop(); + done(); + } + }); + }); + + it('retries http calls after 10s', function (done) { + this.timeout(12000); + var count = 0; + var stop = consul.kv.watch('hello', function (err, values) { + count++; + if (err) return done(err); + switch (count) { + case 1: + assert(values[0].key === 'hello'); + assert(values[0].value === 'all'); + consul.kv.put('hello','world', function (err) { // change value, handler will be called again + if (err) return done(err); + }); + break; + case 2: + assert(values[0].key === 'hello'); + assert(values[0].value === 'world'); + setTimeout(function() { + consul.kv.put('hello','again', function (err) { // change value back, handler will be called again + if (err) return done(err); + }); + }, 10500); + break; + case 3: + assert(values[0].key === 'hello'); + assert(values[0].value === 'again'); + stop(); + done(); + } + }); + }); + + }); }); From 87464a97ae7e6d6ab91454d6fdba8d49dd442d56 Mon Sep 17 00:00:00 2001 From: chris Date: Mon, 23 Jun 2014 22:27:50 +0200 Subject: [PATCH 2/3] removed .only from kv tests --- test/kv.test.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/kv.test.js b/test/kv.test.js index 16f1e03..d698c68 100644 --- a/test/kv.test.js +++ b/test/kv.test.js @@ -2,7 +2,7 @@ var Consul = require('..'); var assert = require('assert'); -describe.only('consul.kv', function () { +describe('consul.kv', function () { describe('#get', function () { describe('when not in "strict mode"', function () { From ba0e04b5b642d26af9b4a778a12937afcf7b170f Mon Sep 17 00:00:00 2001 From: chris Date: Mon, 23 Jun 2014 22:31:52 +0200 Subject: [PATCH 3/3] fixed formatting --- lib/kv.js | 45 +++++++++++++++++++++++---------------------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/lib/kv.js b/lib/kv.js index b054cff..84148ca 100644 --- a/lib/kv.js +++ b/lib/kv.js @@ -92,32 +92,33 @@ KV.prototype.delete = function (key, opts, done) { * @param {hanlder} is called with the current value, then each time the value is changed * @returns {Function} until this function is called to stop watching */ + KV.prototype.watch = function(key, handler) { var self = this; - var shouldStop = false; + var shouldStop = false; var watchMore = function(index) { // get the key, wait up to 10s - self.get(key, { wait: "10s", index: index }, function(err, res) { - var newIndex; - if (!shouldStop) { // do nothing if user has cancelled - if (err != null) { // in case of error + self.get(key, { wait: "10s", index: index }, function(err, res) { + var newIndex; + if (!shouldStop) { // do nothing if user has cancelled + if (err != null) { // in case of error handler(err); // call handler with error - return setTimeout(function() { // but schedule a retry in 1s - watchMore(index); - }, 1000); - } - newIndex = res[0].modifyIndex; - if (index !== newIndex) { // the index has changed == data has changed + return setTimeout(function() { // but schedule a retry in 1s + watchMore(index); + }, 1000); + } + newIndex = res[0].modifyIndex; + if (index !== newIndex) { // the index has changed == data has changed handler(null, res); // call handler with new data - } - return process.nextTick(function() { // then recurse - watchMore(newIndex); - }); - } - }); - }; - watchMore(0); // index=0 is a good starting point - return function() { // return a function which will stop watching when called - shouldStop = true; - }; + } + return process.nextTick(function() { // then recurse + watchMore(newIndex); + }); + } + }); + }; + watchMore(0); // index=0 is a good starting point + return function() { // return a function which will stop watching when called + shouldStop = true; + }; };