Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions lib/kv.js
Original file line number Diff line number Diff line change
Expand Up @@ -85,3 +85,40 @@ KV.prototype.delete = function (key, opts, done) {
if ('function' == typeof opts) done = opts, opts = null;
this.requestor.delete(key, opts, done);
};

/**
* GET then WATCH a single `key`
* @param {key} key
* @param {hanlder} is called with the current value, then each time the value is changed
* @returns {Function} until this function is called to stop watching
*/

KV.prototype.watch = function(key, handler) {
var self = this;
var shouldStop = false;
var watchMore = function(index) {
// get the key, wait up to 10s
self.get(key, { wait: "10s", index: index }, function(err, res) {
var newIndex;
if (!shouldStop) { // do nothing if user has cancelled
if (err != null) { // in case of error
handler(err); // call handler with error
return setTimeout(function() { // but schedule a retry in 1s
watchMore(index);
}, 1000);
}
newIndex = res[0].modifyIndex;
if (index !== newIndex) { // the index has changed == data has changed
handler(null, res); // call handler with new data
}
return process.nextTick(function() { // then recurse
watchMore(newIndex);
});
}
});
};
watchMore(0); // index=0 is a good starting point
return function() { // return a function which will stop watching when called
shouldStop = true;
};
};
72 changes: 72 additions & 0 deletions test/kv.test.js
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,76 @@ describe('consul.kv', function () {
});
});

describe('#watch', function() {
var consul = new Consul();
it('calls back immediately with value', function (done) {
var stop = consul.kv.watch('hello', function (err, values) {
if (err) return done(err);
assert(Array.isArray(values));
assert(values.length === 1);
values.forEach(function (value) {
assert(value.key === 'hello');
assert(value.value === 'world');
});
stop();
done();
});
});

it('calls back again if value is changed', function (done) {
this.timeout(12000);
var count = 0;
var stop = consul.kv.watch('hello', function (err, values) {
count++;
if (err) return done(err);
switch (count) {
case 1:
assert(values[0].key === 'hello');
assert(values[0].value === 'world');
consul.kv.put('hello','all', function (err) { // change value, handler will be called again
if (err) return done(err);
});
break;
case 2:
assert(values[0].key === 'hello');
assert(values[0].value === 'all');
stop();
done();
}
});
});

it('retries http calls after 10s', function (done) {
this.timeout(12000);
var count = 0;
var stop = consul.kv.watch('hello', function (err, values) {
count++;
if (err) return done(err);
switch (count) {
case 1:
assert(values[0].key === 'hello');
assert(values[0].value === 'all');
consul.kv.put('hello','world', function (err) { // change value, handler will be called again
if (err) return done(err);
});
break;
case 2:
assert(values[0].key === 'hello');
assert(values[0].value === 'world');
setTimeout(function() {
consul.kv.put('hello','again', function (err) { // change value back, handler will be called again
if (err) return done(err);
});
}, 10500);
break;
case 3:
assert(values[0].key === 'hello');
assert(values[0].value === 'again');
stop();
done();
}
});
});

});
});