From e24ffb1b233972bbc9a3aded4d17acf1a9939b3f Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 26 Jun 2014 10:56:04 +0300 Subject: [PATCH 01/44] Change storm-core to compile --- storm-core/dependency-reduced-pom.xml | 366 ++++++++++++++++++++++++++ 1 file changed, 366 insertions(+) create mode 100644 storm-core/dependency-reduced-pom.xml diff --git a/storm-core/dependency-reduced-pom.xml b/storm-core/dependency-reduced-pom.xml new file mode 100644 index 00000000000..b79fa7e03b0 --- /dev/null +++ b/storm-core/dependency-reduced-pom.xml @@ -0,0 +1,366 @@ + + + + storm + org.apache.storm + 0.9.2-incubating + + 4.0.0 + org.apache.storm + storm-core + Storm Core + Storm Core Java API and Clojure implementation. + + src/jvm + test/jvm + + + ../conf + + + META-INF + ../ + + NOTICE + + + + + + src/dev + + + test/resources + + + + + com.theoryinpractise + clojure-maven-plugin + true + + + compile-clojure + compile + + compile + + + + test-clojure + test + + test-with-junit + + + ${test.extra.args} + + + + + + src/clj + + + test/clj + + false + true + + none + + + + + maven-surefire-report-plugin + + + ${project.build.directory}/test-reports + + + + + maven-shade-plugin + 2.2 + + + package + + shade + + + + + + org.apache.storm + maven-shade-clojure-transformer + ${project.version} + + + + true + false + true + false + + + org.apache.thrift:* + org.apache.storm:* + + + + + org.apache.thrift + org.apache.thrift7 + + + + + + + + org.apache.thrift:* + + META-INF/LICENSE.txt + META-INF/NOTICE.txt + + + + + + + + + + org.clojure + clojure + 1.5.1 + compile + + + clj-time + clj-time + 0.4.1 + compile + + + compojure + compojure + 1.1.3 + compile + + + hiccup + hiccup + 0.3.6 + compile + + + ring + ring-devel + 0.3.11 + compile + + + ring + ring-jetty-adapter + 0.3.11 + compile + + + org.clojure + tools.logging + 0.2.3 + compile + + + org.clojure + math.numeric-tower + 0.0.1 + compile + + + org.clojure + tools.cli + 0.2.4 + compile + + + org.clojure + tools.nrepl + 0.2.3 
+ test + + + clojure + org.clojure + + + + + clojure-complete + clojure-complete + 0.2.3 + test + + + clojure + org.clojure + + + + + commons-io + commons-io + 2.4 + compile + + + org.apache.commons + commons-exec + 1.1 + compile + + + commons-lang + commons-lang + 2.5 + compile + + + org.apache.thrift + libthrift + 0.7.0 + provided + + + slf4j-api + org.slf4j + + + servlet-api + javax.servlet + + + + + org.apache.curator + curator-framework + 2.4.0 + compile + + + log4j + log4j + + + slf4j-log4j12 + org.slf4j + + + + + com.googlecode.json-simple + json-simple + 1.1 + compile + + + com.twitter + carbonite + 1.4.0 + compile + + + org.yaml + snakeyaml + 1.11 + compile + + + org.apache.httpcomponents + httpclient + 4.3.3 + compile + + + com.googlecode.disruptor + disruptor + 2.10.1 + compile + + + org.jgrapht + jgrapht-core + 0.9.0 + compile + + + com.google.guava + guava + 13.0 + compile + + + ch.qos.logback + logback-classic + 1.0.6 + compile + + + org.slf4j + log4j-over-slf4j + 1.6.6 + compile + + + io.netty + netty + 3.6.3.Final + compile + + + org.mockito + mockito-all + 1.9.5 + test + + + org.clojars.runa + conjure + 2.1.3 + test + + + junit + junit + 4.1 + test + + + reply + reply + 0.3.0 + provided + + + cd-client + org.thnetos + + + drawbridge + com.cemerick + + + versioneer + trptcolin + + + sjacket + org.clojars.trptcolin + + + + + + From 04ce472243a415cfecc1ae89d0f8429747625de4 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 26 Jun 2014 10:57:55 +0300 Subject: [PATCH 02/44] Add BasicBolt js implementation (tested) and Spout implementation (not tested) --- .../multilang/resources/splitsentence.js | 23 ++ .../multilang/resources/storm.js | 267 ++++++++++++++++++ 2 files changed, 290 insertions(+) create mode 100755 examples/storm-starter/multilang/resources/splitsentence.js create mode 100755 examples/storm-starter/multilang/resources/storm.js diff --git a/examples/storm-starter/multilang/resources/splitsentence.js 
b/examples/storm-starter/multilang/resources/splitsentence.js new file mode 100755 index 00000000000..da8867b7ace --- /dev/null +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -0,0 +1,23 @@ +var storm = require('./storm'); +var BasicBolt = storm.BasicBolt; + +function SplitSentenceBolt() {}; + +SplitSentenceBolt.prototype = new BasicBolt(); + +SplitSentenceBolt.prototype.process = function(tup) { + var self = this; + var words = tup.values[0].split(" "); + words.forEach(function(word) { + self.emit([word], null, null, null, function(taskId) { + storm.logToFile('Task id - ' + JSON.stringify(taskId) + ' work - ' + word); + }); + }); +} + +SplitSentenceBolt.prototype.initialize = function(conf, context) { + storm.logToFile("CONF: " + JSON.stringify(conf)); + storm.logToFile("CONTEXT: " + JSON.stringify(context)); +} + +new SplitSentenceBolt().run(); \ No newline at end of file diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js new file mode 100755 index 00000000000..b631f466f2a --- /dev/null +++ b/examples/storm-starter/multilang/resources/storm.js @@ -0,0 +1,267 @@ + +var fs = require('fs'); + +function logToFile(msg) { + fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); +} + +function sendMsgToParent(msg){ + logToFile('SEND MESSAGE TO PARENT: ' + JSON.stringify(msg)); + var str = JSON.stringify(msg) + '\nend\n'; + process.stdout.write(str); +} + +function sync(){ + sendMsgToParent({'command':'sync'}); +} + +function sendpid(heartbeatdir){ + var pid = process.pid; + sendMsgToParent({'pid':pid}) + fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); +} + +function fail(tup) { + sendMsgToParent({"command": "fail", "id": tup.id}); +} + +function log(msg) { + sendMsgToParent({"command": "log", "msg": msg}); +} + +function Storm() { + this.lines = []; + this.taskIdCallbacks = []; + this.numMessages = 0; +} + +Storm.prototype.initSetupInfo = function(setupInfo) { + 
sendpid(setupInfo['pidDir']); + this.initialize(setupInfo['conf'], setupInfo['context']); +} + +Storm.prototype.startReadingInput = function() { + var self = this; + logToFile('startReadingInput'); + + process.stdin.on('readable', function() { + var chunk = process.stdin.read(); + + logToFile('CHUNK (length ' + chunk.length + '): ' + chunk.toString()); + + if (!!chunk && chunk.length !== 0) { + var lines = chunk.toString().split('\n'); + lines.forEach(function(line) { + logToFile('LINE:***' + line + '***'); + + self.handleNewLine(line); + }) + } + }); +} + +Storm.prototype.handleNewLine = function(line) { + logToFile('handleNewLine LINE: ' + line); + + if (line === 'end') { + logToFile('MESSAGE READY!!\n'); + var msg = this.collectMessageLines(); + this.cleanLines(); + this.handleNewMessage(msg); + } else { + this.storeLine(line); + } +} + +Storm.prototype.collectMessageLines = function() { + return this.lines.join('\n'); +} + +Storm.prototype.cleanLines = function() { + this.lines = []; +} + +Storm.prototype.storeLine = function(line) { + this.lines.push(line); +} + +Storm.prototype.isFirstMsg = function() { + return (this.numMessages === 0); +} + +Storm.prototype.isTaskId = function(msg) { + return (msg instanceof Array); +} + +Storm.prototype.handleNewMessage = function(msg) { + var parsedMsg = JSON.parse(msg); + + logToFile('handleNewMessage ' + msg); + + if (this.isFirstMsg()) { + logToFile('first message'); + this.initSetupInfo(parsedMsg); + } else if (this.isTaskId(parsedMsg)) { + logToFile('task id'); + this.handleNewTaskId(parsedMsg); + } else { + logToFile('command'); + this.handleNewCommand(parsedMsg); + } + + this.numMessages++; +} + +Storm.prototype.emit = function(tup, stream, id, directTask, callback) { + this.taskIdCallbacks.push(callback); + this.__emit(tup, stream, id, directTask); +} + +Storm.prototype.emitDirect = function(tup, stream, id, directTask) { + this.__emit(tup, stream, id, directTask) +} + +Storm.prototype.initialize = function(conf, 
context) {} + +function Tuple(id, component, stream, task, values) { + this.id = id; + this.component = component; + this.stream = stream; + this.task = task; + this.values = values; +} +// def __repr__(self): +// return '<%s%s>' % ( +// self.__class__.__name__, +// ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + +//function Bolt() {}; +// +//Bolt.prototype.initialize = function(stormconf, context) {}; +// +//Bolt.prototype.process = function(tuple) {}; +// +//Bolt.prototype.run = function() { +// MODE = Bolt +// var setupInfo = initComponent(); +// var conf = setupInfo[0]; +// var context = setupInfo[1]; +// +// this.initialize(conf, context); +// try { +// while (true) { +// var tup = readTuple(); +// this.process(tup); +// } +// } catch(err) { +// log(err); +// } +//} + +function BasicBolt() { + Storm.call(this); + this.anchorTuple = null; +}; + +BasicBolt.prototype = Object.create(Storm.prototype); +BasicBolt.prototype.constructor = Storm; + +BasicBolt.prototype.process = function(tuple) {}; + +BasicBolt.prototype.run = function() { + logToFile('run'); + this.startReadingInput(); +} + +BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { + if (typeof anchors === 'undefined') { + anchors = []; + } + + if (this.anchorTuple !== null) { + anchors = [this.anchorTuple] + } + var m = {"command": "emit"}; + + if (!typeof stream === 'undefined') { + m["stream"] = stream + } + + m["anchors"] = anchors.map(function (a) { + return a.id; + }); + + if (typeof directTask !== 'undefined') { + m["task"] = directTask; + } + m["tuple"] = tup; + sendMsgToParent(m); +} + +BasicBolt.prototype.handleNewTaskId = function(taskId) { + var callback = this.taskIdCallbacks.shift(); + if (callback) { + callback(taskId); + } +} + +BasicBolt.prototype.handleNewCommand = function(command) { + var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); + this.anchorTuple = tup; + 
this.process(tup); + this.ack(tup); +} + +BasicBolt.prototype.ack = function(tup) { + sendMsgToParent({"command": "ack", "id": tup.id}); +} + +function Spout() {}; +Spout.prototype.initialize = function(conf, context) {}; + +Spout.prototype.ack = function(id) {}; + +Spout.prototype.fail = function(id) {}; + +Spout.prototype.nextTuple = function(callback) {}; + +Spout.prototype.handleNewCommand = function(command) { + var self = this; + var callback = function() { + self.sync(); + } + + if (command["command"] === "next") { + this.nextTuple(callback); + } + + if (command["command"] === "ack") { + this.ack(msg["id"], callback); + } + + if (command["command"] === "fail") { + this.fail(msg["id"], callback); + } +} + +Spout.prototype.__emit = function(tup, stream, id, directTask) { + var m = {"command": "emit"}; + if (typeof id !== 'undefined') { + m["id"] = id; + } + + if (typeof stream !== 'undefined') { + m["stream"] = stream; + } + + if (typeof directTask !== 'undefined') { + m["task"] = directTask; + } + + m["tuple"] = tup; + sendMsgToParent(m); +} + +module.exports.BasicBolt = BasicBolt; +module.exports.logToFile = logToFile; + From e2b561fb40a2a7919cc05e046f8e03ba9cd10b11 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 26 Jun 2014 11:22:46 +0300 Subject: [PATCH 03/44] Change storm-core to compile --- examples/storm-starter/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index 0e0d920b1e3..b93a8465486 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -70,7 +70,7 @@ storm-core ${project.version} - provided + compile commons-collections From 83e804025a12a53189a3273339b7c95886c200b7 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 26 Jun 2014 11:23:21 +0300 Subject: [PATCH 04/44] Fix SplitSentenceBolt inheritance --- examples/storm-starter/multilang/resources/splitsentence.js | 5 ++++- 1 file changed, 4 insertions(+), 1 
deletion(-) diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index da8867b7ace..f8732eeb5c8 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -1,9 +1,12 @@ var storm = require('./storm'); var BasicBolt = storm.BasicBolt; -function SplitSentenceBolt() {}; +function SplitSentenceBolt() { + BasicBolt.call(this); +}; SplitSentenceBolt.prototype = new BasicBolt(); +SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); SplitSentenceBolt.prototype.process = function(tup) { var self = this; From 902fa77f993c162e463f9cc373aa4aba11220a31 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 26 Jun 2014 11:23:51 +0300 Subject: [PATCH 05/44] Fix spout handleCommand --- examples/storm-starter/multilang/resources/storm.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index b631f466f2a..9a45faebdfa 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -236,11 +236,11 @@ Spout.prototype.handleNewCommand = function(command) { } if (command["command"] === "ack") { - this.ack(msg["id"], callback); + this.ack(command["id"], callback); } if (command["command"] === "fail") { - this.fail(msg["id"], callback); + this.fail(command["id"], callback); } } From 7c6e999c3cb71c57dfddd336a803f5652b7e62fd Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Sun, 29 Jun 2014 18:41:18 +0300 Subject: [PATCH 06/44] Fix Spout js implementation. WordCountTopology works with js bolt and spout. 
--- .../multilang/resources/randomsentence.js | 62 ++++++++++++++++ .../multilang/resources/storm.js | 72 +++++++++++-------- .../jvm/storm/starter/WordCountTopology.java | 27 +++++-- 3 files changed, 126 insertions(+), 35 deletions(-) create mode 100644 examples/storm-starter/multilang/resources/randomsentence.js diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js new file mode 100644 index 00000000000..09e0e8041bd --- /dev/null +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -0,0 +1,62 @@ +/** + * Created by anya on 6/26/14. + */ + +var storm = require('./storm'); +var Spout = storm.Spout; + + +var SENTENCES = [ + "the cow jumped over the moon", + "an apple a day keeps the doctor away", + "four score and seven years ago", + "snow white and the seven dwarfs", + "i am at two with nature"] + +function RandomSentenceSpout() { + Spout.call(this); + this.runningTupleId = getRandomInt(0,Math.pow(2,16)); + this.pending = {}; + + storm.logToFile('CREATE NEW RandomSentenceSpout'); +}; + +RandomSentenceSpout.prototype = new Spout(); +RandomSentenceSpout.prototype = Object.create(Spout.prototype); + +RandomSentenceSpout.prototype.getRandomSentence = function() { + return SENTENCES[getRandomInt(0, SENTENCES.length - 1)]; +} + +RandomSentenceSpout.prototype.initialize = function(conf, context) { + storm.logToFile("CONF: " + JSON.stringify(conf)); + storm.logToFile("CONTEXT: " + JSON.stringify(context)); +} + +RandomSentenceSpout.prototype.nextTuple = function(callback) { + var sentence = this.getRandomSentence(); + var tup = [sentence]; + var id = this.runningTupleId; + this.pending[id] = tup; + this.emit(tup, null, id, null); + this.runningTupleId++; + callback(); +} + +RandomSentenceSpout.prototype.ack = function(id, callback) { + this.logToFile('RECEIVED ACK - ' + JSON.stringify(id)); + delete this.pending[id]; + callback(); +} + +RandomSentenceSpout.prototype.fail = 
function(id, callback) { + this.logToFile('RECEIVED FAIL - ' + JSON.stringify(id)); + this.emit(this.pending[id], null, id, null); + callback(); +} + +function getRandomInt(min, max) { + return Math.floor(Math.random() * (max - min + 1)) + min; +} + +new RandomSentenceSpout().run(); diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 9a45faebdfa..d0172ab86c0 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -2,6 +2,7 @@ var fs = require('fs'); function logToFile(msg) { + fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); } @@ -35,6 +36,10 @@ function Storm() { this.numMessages = 0; } +Storm.prototype.logToFile = function(msg) { + logToFile(this.name + ':\n' + msg); +} + Storm.prototype.initSetupInfo = function(setupInfo) { sendpid(setupInfo['pidDir']); this.initialize(setupInfo['conf'], setupInfo['context']); @@ -42,18 +47,14 @@ Storm.prototype.initSetupInfo = function(setupInfo) { Storm.prototype.startReadingInput = function() { var self = this; - logToFile('startReadingInput'); + this.logToFile('startReadingInput'); process.stdin.on('readable', function() { var chunk = process.stdin.read(); - logToFile('CHUNK (length ' + chunk.length + '): ' + chunk.toString()); - if (!!chunk && chunk.length !== 0) { var lines = chunk.toString().split('\n'); lines.forEach(function(line) { - logToFile('LINE:***' + line + '***'); - self.handleNewLine(line); }) } @@ -61,10 +62,10 @@ Storm.prototype.startReadingInput = function() { } Storm.prototype.handleNewLine = function(line) { - logToFile('handleNewLine LINE: ' + line); + this.logToFile('handleNewLine LINE: ' + line); if (line === 'end') { - logToFile('MESSAGE READY!!\n'); + this.logToFile('MESSAGE READY!!\n'); var msg = this.collectMessageLines(); this.cleanLines(); this.handleNewMessage(msg); @@ -96,22 +97,28 @@ Storm.prototype.isTaskId = function(msg) { 
Storm.prototype.handleNewMessage = function(msg) { var parsedMsg = JSON.parse(msg); - logToFile('handleNewMessage ' + msg); + this.logToFile('handleNewMessage ' + msg); if (this.isFirstMsg()) { - logToFile('first message'); + this.logToFile('first message'); this.initSetupInfo(parsedMsg); } else if (this.isTaskId(parsedMsg)) { - logToFile('task id'); + this.logToFile('task id'); this.handleNewTaskId(parsedMsg); } else { - logToFile('command'); + this.logToFile('command'); this.handleNewCommand(parsedMsg); } - this.numMessages++; } +Storm.prototype.handleNewTaskId = function(taskId) { + var callback = this.taskIdCallbacks.shift(); + if (callback) { + callback(taskId); + } +} + Storm.prototype.emit = function(tup, stream, id, directTask, callback) { this.taskIdCallbacks.push(callback); this.__emit(tup, stream, id, directTask); @@ -123,6 +130,11 @@ Storm.prototype.emitDirect = function(tup, stream, id, directTask) { Storm.prototype.initialize = function(conf, context) {} +Storm.prototype.run = function() { + this.logToFile('run'); + this.startReadingInput(); +} + function Tuple(id, component, stream, task, values) { this.id = id; this.component = component; @@ -161,6 +173,7 @@ function Tuple(id, component, stream, task, values) { function BasicBolt() { Storm.call(this); this.anchorTuple = null; + this.name = 'BOLT' }; BasicBolt.prototype = Object.create(Storm.prototype); @@ -168,26 +181,24 @@ BasicBolt.prototype.constructor = Storm; BasicBolt.prototype.process = function(tuple) {}; -BasicBolt.prototype.run = function() { - logToFile('run'); - this.startReadingInput(); -} - BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { + var self = this; if (typeof anchors === 'undefined') { anchors = []; } if (this.anchorTuple !== null) { + this.logToFile('Anchor tuple id - ' + this.anchorTuple.id); anchors = [this.anchorTuple] } var m = {"command": "emit"}; - if (!typeof stream === 'undefined') { + if (typeof stream !== 'undefined') { m["stream"] = stream 
} m["anchors"] = anchors.map(function (a) { + self.logToFile('ID - ' + a.id); return a.id; }); @@ -198,15 +209,9 @@ BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { sendMsgToParent(m); } -BasicBolt.prototype.handleNewTaskId = function(taskId) { - var callback = this.taskIdCallbacks.shift(); - if (callback) { - callback(taskId); - } -} - BasicBolt.prototype.handleNewCommand = function(command) { var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); + this.logToFile('Anchor tuple: id - ' + command["id"] + ' tuple - ' + JSON.stringify(command['tuple'])); this.anchorTuple = tup; this.process(tup); this.ack(tup); @@ -216,8 +221,16 @@ BasicBolt.prototype.ack = function(tup) { sendMsgToParent({"command": "ack", "id": tup.id}); } -function Spout() {}; -Spout.prototype.initialize = function(conf, context) {}; +function Spout() { + Storm.call(this); + this.name = 'SPOUT'; +}; +Spout.prototype = Object.create(Storm.prototype); +Spout.prototype.constructor = Storm; + +Spout.prototype.initialize = function(conf, context) { + this.emit(['Spout Initializing']); +}; Spout.prototype.ack = function(id) {}; @@ -228,7 +241,7 @@ Spout.prototype.nextTuple = function(callback) {}; Spout.prototype.handleNewCommand = function(command) { var self = this; var callback = function() { - self.sync(); + sync(); } if (command["command"] === "next") { @@ -264,4 +277,5 @@ Spout.prototype.__emit = function(tup, stream, id, directTask) { module.exports.BasicBolt = BasicBolt; module.exports.logToFile = logToFile; - +module.exports.Spout = Spout; +module.exports.log = log; diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java index 39184daa3e8..e189911d697 100644 --- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java @@ -20,11 +20,9 @@ 
import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; +import backtype.storm.spout.ShellSpout; import backtype.storm.task.ShellBolt; -import backtype.storm.topology.BasicOutputCollector; -import backtype.storm.topology.IRichBolt; -import backtype.storm.topology.OutputFieldsDeclarer; -import backtype.storm.topology.TopologyBuilder; +import backtype.storm.topology.*; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; @@ -41,7 +39,7 @@ public class WordCountTopology { public static class SplitSentence extends ShellBolt implements IRichBolt { public SplitSentence() { - super("python", "splitsentence.py"); + super("node", "splitsentence.js"); } @Override @@ -55,6 +53,23 @@ public Map getComponentConfiguration() { } } + public static class RandomSentence extends ShellSpout implements IRichSpout { + + public RandomSentence() { + super("node", "randomsentence.js"); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } + public static class WordCount extends BaseBasicBolt { Map counts = new HashMap(); @@ -79,7 +94,7 @@ public static void main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentenceSpout(), 5); + builder.setSpout("spout", new RandomSentence(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); From f3f3e5cd8ac46419444e58e2dab24dff7a85cd26 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Sun, 29 Jun 2014 18:43:39 +0300 Subject: [PATCH 07/44] Add js implemetation of storm - basic bolt and spout. 
--- storm-core/src/multilang/js/storm.js | 281 +++++++++++++++++++++++++++ 1 file changed, 281 insertions(+) create mode 100755 storm-core/src/multilang/js/storm.js diff --git a/storm-core/src/multilang/js/storm.js b/storm-core/src/multilang/js/storm.js new file mode 100755 index 00000000000..d0172ab86c0 --- /dev/null +++ b/storm-core/src/multilang/js/storm.js @@ -0,0 +1,281 @@ + +var fs = require('fs'); + +function logToFile(msg) { + + fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); +} + +function sendMsgToParent(msg){ + logToFile('SEND MESSAGE TO PARENT: ' + JSON.stringify(msg)); + var str = JSON.stringify(msg) + '\nend\n'; + process.stdout.write(str); +} + +function sync(){ + sendMsgToParent({'command':'sync'}); +} + +function sendpid(heartbeatdir){ + var pid = process.pid; + sendMsgToParent({'pid':pid}) + fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); +} + +function fail(tup) { + sendMsgToParent({"command": "fail", "id": tup.id}); +} + +function log(msg) { + sendMsgToParent({"command": "log", "msg": msg}); +} + +function Storm() { + this.lines = []; + this.taskIdCallbacks = []; + this.numMessages = 0; +} + +Storm.prototype.logToFile = function(msg) { + logToFile(this.name + ':\n' + msg); +} + +Storm.prototype.initSetupInfo = function(setupInfo) { + sendpid(setupInfo['pidDir']); + this.initialize(setupInfo['conf'], setupInfo['context']); +} + +Storm.prototype.startReadingInput = function() { + var self = this; + this.logToFile('startReadingInput'); + + process.stdin.on('readable', function() { + var chunk = process.stdin.read(); + + if (!!chunk && chunk.length !== 0) { + var lines = chunk.toString().split('\n'); + lines.forEach(function(line) { + self.handleNewLine(line); + }) + } + }); +} + +Storm.prototype.handleNewLine = function(line) { + this.logToFile('handleNewLine LINE: ' + line); + + if (line === 'end') { + this.logToFile('MESSAGE READY!!\n'); + var msg = this.collectMessageLines(); + this.cleanLines(); + 
this.handleNewMessage(msg); + } else { + this.storeLine(line); + } +} + +Storm.prototype.collectMessageLines = function() { + return this.lines.join('\n'); +} + +Storm.prototype.cleanLines = function() { + this.lines = []; +} + +Storm.prototype.storeLine = function(line) { + this.lines.push(line); +} + +Storm.prototype.isFirstMsg = function() { + return (this.numMessages === 0); +} + +Storm.prototype.isTaskId = function(msg) { + return (msg instanceof Array); +} + +Storm.prototype.handleNewMessage = function(msg) { + var parsedMsg = JSON.parse(msg); + + this.logToFile('handleNewMessage ' + msg); + + if (this.isFirstMsg()) { + this.logToFile('first message'); + this.initSetupInfo(parsedMsg); + } else if (this.isTaskId(parsedMsg)) { + this.logToFile('task id'); + this.handleNewTaskId(parsedMsg); + } else { + this.logToFile('command'); + this.handleNewCommand(parsedMsg); + } + this.numMessages++; +} + +Storm.prototype.handleNewTaskId = function(taskId) { + var callback = this.taskIdCallbacks.shift(); + if (callback) { + callback(taskId); + } +} + +Storm.prototype.emit = function(tup, stream, id, directTask, callback) { + this.taskIdCallbacks.push(callback); + this.__emit(tup, stream, id, directTask); +} + +Storm.prototype.emitDirect = function(tup, stream, id, directTask) { + this.__emit(tup, stream, id, directTask) +} + +Storm.prototype.initialize = function(conf, context) {} + +Storm.prototype.run = function() { + this.logToFile('run'); + this.startReadingInput(); +} + +function Tuple(id, component, stream, task, values) { + this.id = id; + this.component = component; + this.stream = stream; + this.task = task; + this.values = values; +} +// def __repr__(self): +// return '<%s%s>' % ( +// self.__class__.__name__, +// ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) + +//function Bolt() {}; +// +//Bolt.prototype.initialize = function(stormconf, context) {}; +// +//Bolt.prototype.process = function(tuple) {}; +// +//Bolt.prototype.run = 
function() { +// MODE = Bolt +// var setupInfo = initComponent(); +// var conf = setupInfo[0]; +// var context = setupInfo[1]; +// +// this.initialize(conf, context); +// try { +// while (true) { +// var tup = readTuple(); +// this.process(tup); +// } +// } catch(err) { +// log(err); +// } +//} + +function BasicBolt() { + Storm.call(this); + this.anchorTuple = null; + this.name = 'BOLT' +}; + +BasicBolt.prototype = Object.create(Storm.prototype); +BasicBolt.prototype.constructor = Storm; + +BasicBolt.prototype.process = function(tuple) {}; + +BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { + var self = this; + if (typeof anchors === 'undefined') { + anchors = []; + } + + if (this.anchorTuple !== null) { + this.logToFile('Anchor tuple id - ' + this.anchorTuple.id); + anchors = [this.anchorTuple] + } + var m = {"command": "emit"}; + + if (typeof stream !== 'undefined') { + m["stream"] = stream + } + + m["anchors"] = anchors.map(function (a) { + self.logToFile('ID - ' + a.id); + return a.id; + }); + + if (typeof directTask !== 'undefined') { + m["task"] = directTask; + } + m["tuple"] = tup; + sendMsgToParent(m); +} + +BasicBolt.prototype.handleNewCommand = function(command) { + var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); + this.logToFile('Anchor tuple: id - ' + command["id"] + ' tuple - ' + JSON.stringify(command['tuple'])); + this.anchorTuple = tup; + this.process(tup); + this.ack(tup); +} + +BasicBolt.prototype.ack = function(tup) { + sendMsgToParent({"command": "ack", "id": tup.id}); +} + +function Spout() { + Storm.call(this); + this.name = 'SPOUT'; +}; +Spout.prototype = Object.create(Storm.prototype); +Spout.prototype.constructor = Storm; + +Spout.prototype.initialize = function(conf, context) { + this.emit(['Spout Initializing']); +}; + +Spout.prototype.ack = function(id) {}; + +Spout.prototype.fail = function(id) {}; + +Spout.prototype.nextTuple = function(callback) {}; + 
+Spout.prototype.handleNewCommand = function(command) { + var self = this; + var callback = function() { + sync(); + } + + if (command["command"] === "next") { + this.nextTuple(callback); + } + + if (command["command"] === "ack") { + this.ack(command["id"], callback); + } + + if (command["command"] === "fail") { + this.fail(command["id"], callback); + } +} + +Spout.prototype.__emit = function(tup, stream, id, directTask) { + var m = {"command": "emit"}; + if (typeof id !== 'undefined') { + m["id"] = id; + } + + if (typeof stream !== 'undefined') { + m["stream"] = stream; + } + + if (typeof directTask !== 'undefined') { + m["task"] = directTask; + } + + m["tuple"] = tup; + sendMsgToParent(m); +} + +module.exports.BasicBolt = BasicBolt; +module.exports.logToFile = logToFile; +module.exports.Spout = Spout; +module.exports.log = log; From 2cfc90842c870c887d88cd1d854a42439c5fec52 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Mon, 30 Jun 2014 12:23:35 +0300 Subject: [PATCH 08/44] Pass callback to BasicBolt.process to enable async functionality. 
Move general methods into Storm class --- .../multilang/resources/splitsentence.js | 8 +- .../multilang/resources/storm.js | 74 ++++++++++--------- 2 files changed, 43 insertions(+), 39 deletions(-) diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index f8732eeb5c8..48fa5e69663 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -8,7 +8,7 @@ function SplitSentenceBolt() { SplitSentenceBolt.prototype = new BasicBolt(); SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); -SplitSentenceBolt.prototype.process = function(tup) { +SplitSentenceBolt.prototype.process = function(tup, callback) { var self = this; var words = tup.values[0].split(" "); words.forEach(function(word) { @@ -16,11 +16,7 @@ SplitSentenceBolt.prototype.process = function(tup) { storm.logToFile('Task id - ' + JSON.stringify(taskId) + ' work - ' + word); }); }); -} - -SplitSentenceBolt.prototype.initialize = function(conf, context) { - storm.logToFile("CONF: " + JSON.stringify(conf)); - storm.logToFile("CONTEXT: " + JSON.stringify(context)); + callback(); } new SplitSentenceBolt().run(); \ No newline at end of file diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index d0172ab86c0..1a6ea1b55f5 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -6,42 +6,39 @@ function logToFile(msg) { fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); } -function sendMsgToParent(msg){ - logToFile('SEND MESSAGE TO PARENT: ' + JSON.stringify(msg)); - var str = JSON.stringify(msg) + '\nend\n'; - process.stdout.write(str); -} -function sync(){ - sendMsgToParent({'command':'sync'}); +function Storm() { + this.lines = []; + this.taskIdCallbacks = []; + this.numMessages = 0; } 
-function sendpid(heartbeatdir){ - var pid = process.pid; - sendMsgToParent({'pid':pid}) - fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); +Storm.prototype.logToFile = function(msg) { + logToFile(this.name + ':\n' + msg); } -function fail(tup) { - sendMsgToParent({"command": "fail", "id": tup.id}); +Storm.prototype.sendMsgToParent = function(msg) { + logToFile('SEND MESSAGE TO PARENT: ' + JSON.stringify(msg)); + var str = JSON.stringify(msg) + '\nend\n'; + process.stdout.write(str); } -function log(msg) { - sendMsgToParent({"command": "log", "msg": msg}); +Storm.prototype.sync = function() { + this.sendMsgToParent({'command':'sync'}); } -function Storm() { - this.lines = []; - this.taskIdCallbacks = []; - this.numMessages = 0; +Storm.prototype.sendpid = function(heartbeatdir) { + var pid = process.pid; + this.sendMsgToParent({'pid':pid}) + fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); } -Storm.prototype.logToFile = function(msg) { - logToFile(this.name + ':\n' + msg); +Storm.prototype.log = function(msg) { + this.sendMsgToParent({"command": "log", "msg": msg}); } Storm.prototype.initSetupInfo = function(setupInfo) { - sendpid(setupInfo['pidDir']); + this.sendpid(setupInfo['pidDir']); this.initialize(setupInfo['conf'], setupInfo['context']); } @@ -128,7 +125,10 @@ Storm.prototype.emitDirect = function(tup, stream, id, directTask) { this.__emit(tup, stream, id, directTask) } -Storm.prototype.initialize = function(conf, context) {} +Storm.prototype.initialize = function(conf, context) { + this.logToFile("CONF: " + JSON.stringify(conf)); + this.logToFile("CONTEXT: " + JSON.stringify(context)); +} Storm.prototype.run = function() { this.logToFile('run'); @@ -179,7 +179,7 @@ function BasicBolt() { BasicBolt.prototype = Object.create(Storm.prototype); BasicBolt.prototype.constructor = Storm; -BasicBolt.prototype.process = function(tuple) {}; +BasicBolt.prototype.process = function(tuple, callback) {}; BasicBolt.prototype.__emit = function(tup, 
stream, anchors, directTask) { var self = this; @@ -206,19 +206,28 @@ BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { m["task"] = directTask; } m["tuple"] = tup; - sendMsgToParent(m); + this.sendMsgToParent(m); } BasicBolt.prototype.handleNewCommand = function(command) { + var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); - this.logToFile('Anchor tuple: id - ' + command["id"] + ' tuple - ' + JSON.stringify(command['tuple'])); this.anchorTuple = tup; - this.process(tup); - this.ack(tup); + var callback = function(err) { + if (err) { + self.fail() + } + self.ack(tup); + } + this.process(tup, callback); } BasicBolt.prototype.ack = function(tup) { - sendMsgToParent({"command": "ack", "id": tup.id}); + this.sendMsgToParent({"command": "ack", "id": tup.id}); +} + +BasicBolt.prototype.fail = function(tup) { + this.sendMsgToParent({"command": "fail", "id": tup.id}); } function Spout() { @@ -241,7 +250,7 @@ Spout.prototype.nextTuple = function(callback) {}; Spout.prototype.handleNewCommand = function(command) { var self = this; var callback = function() { - sync(); + self.sync(); } if (command["command"] === "next") { @@ -272,10 +281,9 @@ Spout.prototype.__emit = function(tup, stream, id, directTask) { } m["tuple"] = tup; - sendMsgToParent(m); + this.sendMsgToParent(m); } module.exports.BasicBolt = BasicBolt; module.exports.logToFile = logToFile; -module.exports.Spout = Spout; -module.exports.log = log; +module.exports.Spout = Spout; \ No newline at end of file From e996e161b35f288c890b8d114e8518e6c3cd43be Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Mon, 30 Jun 2014 12:23:53 +0300 Subject: [PATCH 09/44] Add async bold --- .../multilang/resources/asyncSplitsentence.js | 37 +++++++++++++++++++ 1 file changed, 37 insertions(+) create mode 100644 examples/storm-starter/multilang/resources/asyncSplitsentence.js diff --git 
a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js new file mode 100644 index 00000000000..ef8f2993354 --- /dev/null +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -0,0 +1,37 @@ +/** + * Created by anya on 6/30/14. + */ + + +var storm = require('./storm'); +var BasicBolt = storm.BasicBolt; + +function SplitSentenceBolt() { + BasicBolt.call(this); +}; + +SplitSentenceBolt.prototype = new BasicBolt(); +SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); + +SplitSentenceBolt.prototype.run = function() { + var self = this; + setTimeout(function() { + self.logToFile('run'); + self.startReadingInput(); + }, 500) +} + +SplitSentenceBolt.prototype.process = function(tup, callback) { + var self = this; + setTimeout(function() { + var words = tup.values[0].split(" "); + words.forEach(function(word) { + self.emit([word], null, null, null, function(taskId) { + storm.logToFile('Task id - ' + JSON.stringify(taskId) + ' work - ' + word); + }); + }); + callback(); + }, 5000) +} + +new SplitSentenceBolt().run(); \ No newline at end of file From 9eee5df3187849994bb484063fd524e73ce564a4 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Mon, 30 Jun 2014 12:33:11 +0300 Subject: [PATCH 10/44] Fix call to fail after process - pass tuple --- examples/storm-starter/multilang/resources/storm.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 1a6ea1b55f5..74e078cd78a 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -214,8 +214,8 @@ BasicBolt.prototype.handleNewCommand = function(command) { var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); this.anchorTuple = tup; var callback = function(err) { - if (err) 
{ - self.fail() + if (!!err) { + self.fail(tup); } self.ack(tup); } From b644a1feb398b44b2f0654ee78b7f504393a8cc3 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 10:42:11 +0300 Subject: [PATCH 11/44] Remove storm.js copy --- storm-core/src/multilang/js/storm.js | 281 --------------------------- 1 file changed, 281 deletions(-) delete mode 100755 storm-core/src/multilang/js/storm.js diff --git a/storm-core/src/multilang/js/storm.js b/storm-core/src/multilang/js/storm.js deleted file mode 100755 index d0172ab86c0..00000000000 --- a/storm-core/src/multilang/js/storm.js +++ /dev/null @@ -1,281 +0,0 @@ - -var fs = require('fs'); - -function logToFile(msg) { - - fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); -} - -function sendMsgToParent(msg){ - logToFile('SEND MESSAGE TO PARENT: ' + JSON.stringify(msg)); - var str = JSON.stringify(msg) + '\nend\n'; - process.stdout.write(str); -} - -function sync(){ - sendMsgToParent({'command':'sync'}); -} - -function sendpid(heartbeatdir){ - var pid = process.pid; - sendMsgToParent({'pid':pid}) - fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); -} - -function fail(tup) { - sendMsgToParent({"command": "fail", "id": tup.id}); -} - -function log(msg) { - sendMsgToParent({"command": "log", "msg": msg}); -} - -function Storm() { - this.lines = []; - this.taskIdCallbacks = []; - this.numMessages = 0; -} - -Storm.prototype.logToFile = function(msg) { - logToFile(this.name + ':\n' + msg); -} - -Storm.prototype.initSetupInfo = function(setupInfo) { - sendpid(setupInfo['pidDir']); - this.initialize(setupInfo['conf'], setupInfo['context']); -} - -Storm.prototype.startReadingInput = function() { - var self = this; - this.logToFile('startReadingInput'); - - process.stdin.on('readable', function() { - var chunk = process.stdin.read(); - - if (!!chunk && chunk.length !== 0) { - var lines = chunk.toString().split('\n'); - lines.forEach(function(line) { - self.handleNewLine(line); - }) - } - }); -} - 
-Storm.prototype.handleNewLine = function(line) { - this.logToFile('handleNewLine LINE: ' + line); - - if (line === 'end') { - this.logToFile('MESSAGE READY!!\n'); - var msg = this.collectMessageLines(); - this.cleanLines(); - this.handleNewMessage(msg); - } else { - this.storeLine(line); - } -} - -Storm.prototype.collectMessageLines = function() { - return this.lines.join('\n'); -} - -Storm.prototype.cleanLines = function() { - this.lines = []; -} - -Storm.prototype.storeLine = function(line) { - this.lines.push(line); -} - -Storm.prototype.isFirstMsg = function() { - return (this.numMessages === 0); -} - -Storm.prototype.isTaskId = function(msg) { - return (msg instanceof Array); -} - -Storm.prototype.handleNewMessage = function(msg) { - var parsedMsg = JSON.parse(msg); - - this.logToFile('handleNewMessage ' + msg); - - if (this.isFirstMsg()) { - this.logToFile('first message'); - this.initSetupInfo(parsedMsg); - } else if (this.isTaskId(parsedMsg)) { - this.logToFile('task id'); - this.handleNewTaskId(parsedMsg); - } else { - this.logToFile('command'); - this.handleNewCommand(parsedMsg); - } - this.numMessages++; -} - -Storm.prototype.handleNewTaskId = function(taskId) { - var callback = this.taskIdCallbacks.shift(); - if (callback) { - callback(taskId); - } -} - -Storm.prototype.emit = function(tup, stream, id, directTask, callback) { - this.taskIdCallbacks.push(callback); - this.__emit(tup, stream, id, directTask); -} - -Storm.prototype.emitDirect = function(tup, stream, id, directTask) { - this.__emit(tup, stream, id, directTask) -} - -Storm.prototype.initialize = function(conf, context) {} - -Storm.prototype.run = function() { - this.logToFile('run'); - this.startReadingInput(); -} - -function Tuple(id, component, stream, task, values) { - this.id = id; - this.component = component; - this.stream = stream; - this.task = task; - this.values = values; -} -// def __repr__(self): -// return '<%s%s>' % ( -// self.__class__.__name__, -// ''.join(' %s=%r' % (k, 
self.__dict__[k]) for k in sorted(self.__dict__.keys()))) - -//function Bolt() {}; -// -//Bolt.prototype.initialize = function(stormconf, context) {}; -// -//Bolt.prototype.process = function(tuple) {}; -// -//Bolt.prototype.run = function() { -// MODE = Bolt -// var setupInfo = initComponent(); -// var conf = setupInfo[0]; -// var context = setupInfo[1]; -// -// this.initialize(conf, context); -// try { -// while (true) { -// var tup = readTuple(); -// this.process(tup); -// } -// } catch(err) { -// log(err); -// } -//} - -function BasicBolt() { - Storm.call(this); - this.anchorTuple = null; - this.name = 'BOLT' -}; - -BasicBolt.prototype = Object.create(Storm.prototype); -BasicBolt.prototype.constructor = Storm; - -BasicBolt.prototype.process = function(tuple) {}; - -BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { - var self = this; - if (typeof anchors === 'undefined') { - anchors = []; - } - - if (this.anchorTuple !== null) { - this.logToFile('Anchor tuple id - ' + this.anchorTuple.id); - anchors = [this.anchorTuple] - } - var m = {"command": "emit"}; - - if (typeof stream !== 'undefined') { - m["stream"] = stream - } - - m["anchors"] = anchors.map(function (a) { - self.logToFile('ID - ' + a.id); - return a.id; - }); - - if (typeof directTask !== 'undefined') { - m["task"] = directTask; - } - m["tuple"] = tup; - sendMsgToParent(m); -} - -BasicBolt.prototype.handleNewCommand = function(command) { - var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); - this.logToFile('Anchor tuple: id - ' + command["id"] + ' tuple - ' + JSON.stringify(command['tuple'])); - this.anchorTuple = tup; - this.process(tup); - this.ack(tup); -} - -BasicBolt.prototype.ack = function(tup) { - sendMsgToParent({"command": "ack", "id": tup.id}); -} - -function Spout() { - Storm.call(this); - this.name = 'SPOUT'; -}; -Spout.prototype = Object.create(Storm.prototype); -Spout.prototype.constructor = Storm; - 
-Spout.prototype.initialize = function(conf, context) { - this.emit(['Spout Initializing']); -}; - -Spout.prototype.ack = function(id) {}; - -Spout.prototype.fail = function(id) {}; - -Spout.prototype.nextTuple = function(callback) {}; - -Spout.prototype.handleNewCommand = function(command) { - var self = this; - var callback = function() { - sync(); - } - - if (command["command"] === "next") { - this.nextTuple(callback); - } - - if (command["command"] === "ack") { - this.ack(command["id"], callback); - } - - if (command["command"] === "fail") { - this.fail(command["id"], callback); - } -} - -Spout.prototype.__emit = function(tup, stream, id, directTask) { - var m = {"command": "emit"}; - if (typeof id !== 'undefined') { - m["id"] = id; - } - - if (typeof stream !== 'undefined') { - m["stream"] = stream; - } - - if (typeof directTask !== 'undefined') { - m["task"] = directTask; - } - - m["tuple"] = tup; - sendMsgToParent(m); -} - -module.exports.BasicBolt = BasicBolt; -module.exports.logToFile = logToFile; -module.exports.Spout = Spout; -module.exports.log = log; From 6fcc42cd2d0d2f2337207b1d09841257ee1150db Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 11:58:24 +0300 Subject: [PATCH 12/44] Fix inheritence mess. 
Fix randomsentence spout initialize --- .../multilang/resources/asyncSplitsentence.js | 2 +- .../multilang/resources/randomsentence.js | 7 +------ .../multilang/resources/splitsentence.js | 2 +- .../multilang/resources/storm.js | 19 ++++++++++--------- 4 files changed, 13 insertions(+), 17 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index ef8f2993354..87cc0142d4d 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -10,8 +10,8 @@ function SplitSentenceBolt() { BasicBolt.call(this); }; -SplitSentenceBolt.prototype = new BasicBolt(); SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); +SplitSentenceBolt.prototype.constructor = SplitSentenceBolt; SplitSentenceBolt.prototype.run = function() { var self = this; diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 09e0e8041bd..9201127ff00 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -21,18 +21,13 @@ function RandomSentenceSpout() { storm.logToFile('CREATE NEW RandomSentenceSpout'); }; -RandomSentenceSpout.prototype = new Spout(); RandomSentenceSpout.prototype = Object.create(Spout.prototype); +RandomSentenceSpout.prototype.constructor = RandomSentenceSpout; RandomSentenceSpout.prototype.getRandomSentence = function() { return SENTENCES[getRandomInt(0, SENTENCES.length - 1)]; } -RandomSentenceSpout.prototype.initialize = function(conf, context) { - storm.logToFile("CONF: " + JSON.stringify(conf)); - storm.logToFile("CONTEXT: " + JSON.stringify(context)); -} - RandomSentenceSpout.prototype.nextTuple = function(callback) { var sentence = this.getRandomSentence(); var tup = [sentence]; diff --git 
a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index 48fa5e69663..175aa8cb6ac 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -5,8 +5,8 @@ function SplitSentenceBolt() { BasicBolt.call(this); }; -SplitSentenceBolt.prototype = new BasicBolt(); SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); +SplitSentenceBolt.prototype.constructor = SplitSentenceBolt; SplitSentenceBolt.prototype.process = function(tup, callback) { var self = this; diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 74e078cd78a..b50b9f1d727 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -38,8 +38,12 @@ Storm.prototype.log = function(msg) { } Storm.prototype.initSetupInfo = function(setupInfo) { - this.sendpid(setupInfo['pidDir']); - this.initialize(setupInfo['conf'], setupInfo['context']); + var self = this; + var callback = function() { + self.logToFile('Inside initialize callback, sending pid.') + self.sendpid(setupInfo['pidDir']); + } + this.initialize(setupInfo['conf'], setupInfo['context'], callback); } Storm.prototype.startReadingInput = function() { @@ -125,9 +129,10 @@ Storm.prototype.emitDirect = function(tup, stream, id, directTask) { this.__emit(tup, stream, id, directTask) } -Storm.prototype.initialize = function(conf, context) { +Storm.prototype.initialize = function(conf, context, callback) { this.logToFile("CONF: " + JSON.stringify(conf)); this.logToFile("CONTEXT: " + JSON.stringify(context)); + callback(); } Storm.prototype.run = function() { @@ -177,7 +182,7 @@ function BasicBolt() { }; BasicBolt.prototype = Object.create(Storm.prototype); -BasicBolt.prototype.constructor = Storm; +BasicBolt.prototype.constructor = BasicBolt; 
BasicBolt.prototype.process = function(tuple, callback) {}; @@ -235,11 +240,7 @@ function Spout() { this.name = 'SPOUT'; }; Spout.prototype = Object.create(Storm.prototype); -Spout.prototype.constructor = Storm; - -Spout.prototype.initialize = function(conf, context) { - this.emit(['Spout Initializing']); -}; +Spout.prototype.constructor = Spout; Spout.prototype.ack = function(id) {}; From 94ed36042a8e8395bd60ecb9ef1aa2468fde88b7 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 12:23:41 +0300 Subject: [PATCH 13/44] Add documentation to modules --- .../storm-starter/multilang/resources/asyncSplitsentence.js | 2 ++ examples/storm-starter/multilang/resources/randomsentence.js | 3 +++ examples/storm-starter/multilang/resources/splitsentence.js | 4 ++++ examples/storm-starter/multilang/resources/storm.js | 4 +++- 4 files changed, 12 insertions(+), 1 deletion(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index 87cc0142d4d..8e20b90968a 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -1,4 +1,6 @@ /** + * Simple example for async bolt. Receives sentence and breaks it into words. + * * Created by anya on 6/30/14. */ diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 9201127ff00..142fff43050 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -1,4 +1,7 @@ /** + * Simple example for storm spout. Emits random sentences. + * The original class in java - storm.starter.spout.RandomSentenceSpout. + * * Created by anya on 6/26/14. 
*/ diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index 175aa8cb6ac..50b3d08d253 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -1,3 +1,7 @@ +/** + * Simple Bolt example - receives sentence and breaks it into words. + */ + var storm = require('./storm'); var BasicBolt = storm.BasicBolt; diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index b50b9f1d727..8dfc7e96aec 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -1,3 +1,6 @@ +/** + * Base classes in node-js for storm Bolt and Spout. + */ var fs = require('fs'); @@ -6,7 +9,6 @@ function logToFile(msg) { fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); } - function Storm() { this.lines = []; this.taskIdCallbacks = []; From 847b5095274d05857b4732862bdfb8009b339bfd Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 13:48:06 +0300 Subject: [PATCH 14/44] Remove async run implementation --- .../multilang/resources/asyncSplitsentence.js | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index 8e20b90968a..5e4400f7656 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -15,16 +15,10 @@ function SplitSentenceBolt() { SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); SplitSentenceBolt.prototype.constructor = SplitSentenceBolt; -SplitSentenceBolt.prototype.run = function() { - var self = this; - setTimeout(function() { - self.logToFile('run'); - self.startReadingInput(); - }, 500) -} - 
SplitSentenceBolt.prototype.process = function(tup, callback) { var self = this; + + // Here setTimeout is not really needed, we use it to demonstrate asynchronous code in the process method: setTimeout(function() { var words = tup.values[0].split(" "); words.forEach(function(word) { From e4d15d044f962ed4cc7782527264e0773f33f586 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 14:14:18 +0300 Subject: [PATCH 15/44] Change callback to done. --- .../multilang/resources/randomsentence.js | 12 ++++++------ examples/storm-starter/multilang/resources/storm.js | 8 ++++---- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 142fff43050..89132a618b9 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -31,26 +31,26 @@ RandomSentenceSpout.prototype.getRandomSentence = function() { return SENTENCES[getRandomInt(0, SENTENCES.length - 1)]; } -RandomSentenceSpout.prototype.nextTuple = function(callback) { +RandomSentenceSpout.prototype.nextTuple = function(done) { var sentence = this.getRandomSentence(); var tup = [sentence]; var id = this.runningTupleId; this.pending[id] = tup; this.emit(tup, null, id, null); this.runningTupleId++; - callback(); + done(); } -RandomSentenceSpout.prototype.ack = function(id, callback) { +RandomSentenceSpout.prototype.ack = function(id, done) { this.logToFile('RECEIVED ACK - ' + JSON.stringify(id)); delete this.pending[id]; - callback(); + done(); } -RandomSentenceSpout.prototype.fail = function(id, callback) { +RandomSentenceSpout.prototype.fail = function(id, done) { this.logToFile('RECEIVED FAIL - ' + JSON.stringify(id)); this.emit(this.pending[id], null, id, null); - callback(); + done(); } function getRandomInt(min, max) { diff --git a/examples/storm-starter/multilang/resources/storm.js 
b/examples/storm-starter/multilang/resources/storm.js index 8dfc7e96aec..7b8dc001814 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -186,7 +186,7 @@ function BasicBolt() { BasicBolt.prototype = Object.create(Storm.prototype); BasicBolt.prototype.constructor = BasicBolt; -BasicBolt.prototype.process = function(tuple, callback) {}; +BasicBolt.prototype.process = function(tuple, done) {}; BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { var self = this; @@ -244,11 +244,11 @@ function Spout() { Spout.prototype = Object.create(Storm.prototype); Spout.prototype.constructor = Spout; -Spout.prototype.ack = function(id) {}; +Spout.prototype.ack = function(id, done) {}; -Spout.prototype.fail = function(id) {}; +Spout.prototype.fail = function(id, done) {}; -Spout.prototype.nextTuple = function(callback) {}; +Spout.prototype.nextTuple = function(done) {}; Spout.prototype.handleNewCommand = function(command) { var self = this; From 32dbb93f77368fc4b67826561a2d6a4fd732a0ef Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 14:51:21 +0300 Subject: [PATCH 16/44] Pass sentence list to the ocnstructor of the spout --- .../storm-starter/multilang/resources/randomsentence.js | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 89132a618b9..f06049079b1 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -16,9 +16,10 @@ var SENTENCES = [ "snow white and the seven dwarfs", "i am at two with nature"] -function RandomSentenceSpout() { +function RandomSentenceSpout(sentences) { Spout.call(this); this.runningTupleId = getRandomInt(0,Math.pow(2,16)); + this.sentences = sentences; this.pending = {}; storm.logToFile('CREATE NEW 
RandomSentenceSpout'); @@ -28,7 +29,7 @@ RandomSentenceSpout.prototype = Object.create(Spout.prototype); RandomSentenceSpout.prototype.constructor = RandomSentenceSpout; RandomSentenceSpout.prototype.getRandomSentence = function() { - return SENTENCES[getRandomInt(0, SENTENCES.length - 1)]; + return this.sentences[getRandomInt(0, this.sentences.length - 1)]; } RandomSentenceSpout.prototype.nextTuple = function(done) { @@ -57,4 +58,4 @@ function getRandomInt(min, max) { return Math.floor(Math.random() * (max - min + 1)) + min; } -new RandomSentenceSpout().run(); +new RandomSentenceSpout(SENTENCES).run(); From a2e2ab8d99c00e21cc95add674bd8e1a3bc90331 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 15:02:18 +0300 Subject: [PATCH 17/44] Add documentation to random int method --- examples/storm-starter/multilang/resources/randomsentence.js | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index f06049079b1..e689e68d06e 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -54,6 +54,10 @@ RandomSentenceSpout.prototype.fail = function(id, done) { done(); } +/** + * Returns a random integer between min (inclusive) and max (inclusive) + * Using Math.round() will give you a non-uniform distribution! 
+ */ function getRandomInt(min, max) { return Math.floor(Math.random() * (max - min + 1)) + min; } From dc19eb7dd69471dd95d2ef580b6aa07978902da6 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 1 Jul 2014 15:53:37 +0300 Subject: [PATCH 18/44] Use isFirstMessage instead of counting messages --- examples/storm-starter/multilang/resources/storm.js | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 7b8dc001814..2f3e7e29df7 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -12,7 +12,7 @@ function logToFile(msg) { function Storm() { this.lines = []; this.taskIdCallbacks = []; - this.numMessages = 0; + this.isFirstMessage = true; } Storm.prototype.logToFile = function(msg) { @@ -89,10 +89,6 @@ Storm.prototype.storeLine = function(line) { this.lines.push(line); } -Storm.prototype.isFirstMsg = function() { - return (this.numMessages === 0); -} - Storm.prototype.isTaskId = function(msg) { return (msg instanceof Array); } @@ -102,9 +98,10 @@ Storm.prototype.handleNewMessage = function(msg) { this.logToFile('handleNewMessage ' + msg); - if (this.isFirstMsg()) { + if (this.isFirstMessage) { this.logToFile('first message'); this.initSetupInfo(parsedMsg); + this.isFirstMessage = false; } else if (this.isTaskId(parsedMsg)) { this.logToFile('task id'); this.handleNewTaskId(parsedMsg); @@ -112,7 +109,6 @@ Storm.prototype.handleNewMessage = function(msg) { this.logToFile('command'); this.handleNewCommand(parsedMsg); } - this.numMessages++; } Storm.prototype.handleNewTaskId = function(taskId) { From 2d3a511f612e15781a60c357224eb68d1d2f8765 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 10:03:44 +0300 Subject: [PATCH 19/44] Pass the error on fail --- examples/storm-starter/multilang/resources/storm.js | 4 ++-- 1 file changed, 2 insertions(+), 
2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 2f3e7e29df7..79b015900d6 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -218,7 +218,7 @@ BasicBolt.prototype.handleNewCommand = function(command) { this.anchorTuple = tup; var callback = function(err) { if (!!err) { - self.fail(tup); + self.fail(tup, err); } self.ack(tup); } @@ -229,7 +229,7 @@ BasicBolt.prototype.ack = function(tup) { this.sendMsgToParent({"command": "ack", "id": tup.id}); } -BasicBolt.prototype.fail = function(tup) { +BasicBolt.prototype.fail = function(tup, err) { this.sendMsgToParent({"command": "fail", "id": tup.id}); } From 920e3830bffebbbc9117a46035f9b3149744cf81 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 10:20:51 +0300 Subject: [PATCH 20/44] Restore pom.xml to the original state --- examples/storm-starter/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml index b93a8465486..0e0d920b1e3 100644 --- a/examples/storm-starter/pom.xml +++ b/examples/storm-starter/pom.xml @@ -70,7 +70,7 @@ storm-core ${project.version} - compile + provided commons-collections From fc920caf0c8ff4eb53538791e6fb8debc303a012 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 10:33:13 +0300 Subject: [PATCH 21/44] Restore original WordCountTopology.java. 
Add WordCountTopologyNode.java with node example --- .../jvm/storm/starter/WordCountTopology.java | 27 +--- .../storm/starter/WordCountTopologyNode.java | 121 ++++++++++++++++++ 2 files changed, 127 insertions(+), 21 deletions(-) create mode 100644 examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java index e189911d697..39184daa3e8 100644 --- a/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java +++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopology.java @@ -20,9 +20,11 @@ import backtype.storm.Config; import backtype.storm.LocalCluster; import backtype.storm.StormSubmitter; -import backtype.storm.spout.ShellSpout; import backtype.storm.task.ShellBolt; -import backtype.storm.topology.*; +import backtype.storm.topology.BasicOutputCollector; +import backtype.storm.topology.IRichBolt; +import backtype.storm.topology.OutputFieldsDeclarer; +import backtype.storm.topology.TopologyBuilder; import backtype.storm.topology.base.BaseBasicBolt; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Tuple; @@ -39,7 +41,7 @@ public class WordCountTopology { public static class SplitSentence extends ShellBolt implements IRichBolt { public SplitSentence() { - super("node", "splitsentence.js"); + super("python", "splitsentence.py"); } @Override @@ -53,23 +55,6 @@ public Map getComponentConfiguration() { } } - public static class RandomSentence extends ShellSpout implements IRichSpout { - - public RandomSentence() { - super("node", "randomsentence.js"); - } - - @Override - public void declareOutputFields(OutputFieldsDeclarer declarer) { - declarer.declare(new Fields("word")); - } - - @Override - public Map getComponentConfiguration() { - return null; - } - } - public static class WordCount extends BaseBasicBolt { Map counts = new HashMap(); @@ -94,7 +79,7 @@ public static void 
main(String[] args) throws Exception { TopologyBuilder builder = new TopologyBuilder(); - builder.setSpout("spout", new RandomSentence(), 5); + builder.setSpout("spout", new RandomSentenceSpout(), 5); builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); diff --git a/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java b/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java new file mode 100644 index 00000000000..3fe982f1144 --- /dev/null +++ b/examples/storm-starter/src/jvm/storm/starter/WordCountTopologyNode.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package storm.starter; + +import backtype.storm.Config; +import backtype.storm.LocalCluster; +import backtype.storm.StormSubmitter; +import backtype.storm.spout.ShellSpout; +import backtype.storm.task.ShellBolt; +import backtype.storm.topology.*; +import backtype.storm.topology.base.BaseBasicBolt; +import backtype.storm.tuple.Fields; +import backtype.storm.tuple.Tuple; +import backtype.storm.tuple.Values; + +import java.util.HashMap; +import java.util.Map; + +/** + * This topology demonstrates Storm's stream groupings and multilang capabilities. + */ +public class WordCountTopologyNode { + public static class SplitSentence extends ShellBolt implements IRichBolt { + + public SplitSentence() { + super("node", "splitsentence.js"); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } + + public static class RandomSentence extends ShellSpout implements IRichSpout { + + public RandomSentence() { + super("node", "randomsentence.js"); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word")); + } + + @Override + public Map getComponentConfiguration() { + return null; + } + } + + public static class WordCount extends BaseBasicBolt { + Map counts = new HashMap(); + + @Override + public void execute(Tuple tuple, BasicOutputCollector collector) { + String word = tuple.getString(0); + Integer count = counts.get(word); + if (count == null) + count = 0; + count++; + counts.put(word, count); + collector.emit(new Values(word, count)); + } + + @Override + public void declareOutputFields(OutputFieldsDeclarer declarer) { + declarer.declare(new Fields("word", "count")); + } + } + + public static void main(String[] args) throws Exception { + + TopologyBuilder builder = new TopologyBuilder(); + + builder.setSpout("spout", new RandomSentence(), 5); + + 
builder.setBolt("split", new SplitSentence(), 8).shuffleGrouping("spout"); + builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word")); + + Config conf = new Config(); + conf.setDebug(true); + + + if (args != null && args.length > 0) { + conf.setNumWorkers(3); + + StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology()); + } + else { + conf.setMaxTaskParallelism(3); + + LocalCluster cluster = new LocalCluster(); + cluster.submitTopology("word-count", conf, builder.createTopology()); + + Thread.sleep(10000); + + cluster.shutdown(); + } + } +} From fe2d0d6c44ca6a9b6d9c8c7cf9243dd577f3a74e Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 11:56:43 +0300 Subject: [PATCH 22/44] Delete xml that wasn't here on original storm --- storm-core/dependency-reduced-pom.xml | 366 -------------------------- 1 file changed, 366 deletions(-) delete mode 100644 storm-core/dependency-reduced-pom.xml diff --git a/storm-core/dependency-reduced-pom.xml b/storm-core/dependency-reduced-pom.xml deleted file mode 100644 index b79fa7e03b0..00000000000 --- a/storm-core/dependency-reduced-pom.xml +++ /dev/null @@ -1,366 +0,0 @@ - - - - storm - org.apache.storm - 0.9.2-incubating - - 4.0.0 - org.apache.storm - storm-core - Storm Core - Storm Core Java API and Clojure implementation. 
- - src/jvm - test/jvm - - - ../conf - - - META-INF - ../ - - NOTICE - - - - - - src/dev - - - test/resources - - - - - com.theoryinpractise - clojure-maven-plugin - true - - - compile-clojure - compile - - compile - - - - test-clojure - test - - test-with-junit - - - ${test.extra.args} - - - - - - src/clj - - - test/clj - - false - true - - none - - - - - maven-surefire-report-plugin - - - ${project.build.directory}/test-reports - - - - - maven-shade-plugin - 2.2 - - - package - - shade - - - - - - org.apache.storm - maven-shade-clojure-transformer - ${project.version} - - - - true - false - true - false - - - org.apache.thrift:* - org.apache.storm:* - - - - - org.apache.thrift - org.apache.thrift7 - - - - - - - - org.apache.thrift:* - - META-INF/LICENSE.txt - META-INF/NOTICE.txt - - - - - - - - - - org.clojure - clojure - 1.5.1 - compile - - - clj-time - clj-time - 0.4.1 - compile - - - compojure - compojure - 1.1.3 - compile - - - hiccup - hiccup - 0.3.6 - compile - - - ring - ring-devel - 0.3.11 - compile - - - ring - ring-jetty-adapter - 0.3.11 - compile - - - org.clojure - tools.logging - 0.2.3 - compile - - - org.clojure - math.numeric-tower - 0.0.1 - compile - - - org.clojure - tools.cli - 0.2.4 - compile - - - org.clojure - tools.nrepl - 0.2.3 - test - - - clojure - org.clojure - - - - - clojure-complete - clojure-complete - 0.2.3 - test - - - clojure - org.clojure - - - - - commons-io - commons-io - 2.4 - compile - - - org.apache.commons - commons-exec - 1.1 - compile - - - commons-lang - commons-lang - 2.5 - compile - - - org.apache.thrift - libthrift - 0.7.0 - provided - - - slf4j-api - org.slf4j - - - servlet-api - javax.servlet - - - - - org.apache.curator - curator-framework - 2.4.0 - compile - - - log4j - log4j - - - slf4j-log4j12 - org.slf4j - - - - - com.googlecode.json-simple - json-simple - 1.1 - compile - - - com.twitter - carbonite - 1.4.0 - compile - - - org.yaml - snakeyaml - 1.11 - compile - - - org.apache.httpcomponents - httpclient - 
4.3.3 - compile - - - com.googlecode.disruptor - disruptor - 2.10.1 - compile - - - org.jgrapht - jgrapht-core - 0.9.0 - compile - - - com.google.guava - guava - 13.0 - compile - - - ch.qos.logback - logback-classic - 1.0.6 - compile - - - org.slf4j - log4j-over-slf4j - 1.6.6 - compile - - - io.netty - netty - 3.6.3.Final - compile - - - org.mockito - mockito-all - 1.9.5 - test - - - org.clojars.runa - conjure - 2.1.3 - test - - - junit - junit - 4.1 - test - - - reply - reply - 0.3.0 - provided - - - cd-client - org.thnetos - - - drawbridge - com.cemerick - - - versioneer - trptcolin - - - sjacket - org.clojars.trptcolin - - - - - - From 5603beea5ccee114e24e139f523dd8516330f5dc Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 12:24:59 +0300 Subject: [PATCH 23/44] Changen parameter name - taskId to taskI(ds (because its a list). Add comments explaining task id callbacks functionality --- .../multilang/resources/storm.js | 21 ++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 79b015900d6..6e45bcdfac5 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -11,7 +11,7 @@ function logToFile(msg) { function Storm() { this.lines = []; - this.taskIdCallbacks = []; + this.taskIdsCallbacks = []; this.isFirstMessage = true; } @@ -89,7 +89,7 @@ Storm.prototype.storeLine = function(line) { this.lines.push(line); } -Storm.prototype.isTaskId = function(msg) { +Storm.prototype.isTaskIds = function(msg) { return (msg instanceof Array); } @@ -102,7 +102,7 @@ Storm.prototype.handleNewMessage = function(msg) { this.logToFile('first message'); this.initSetupInfo(parsedMsg); this.isFirstMessage = false; - } else if (this.isTaskId(parsedMsg)) { + } else if (this.isTaskIds(parsedMsg)) { this.logToFile('task id'); this.handleNewTaskId(parsedMsg); } else { @@ 
-111,15 +111,22 @@ Storm.prototype.handleNewMessage = function(msg) { } } -Storm.prototype.handleNewTaskId = function(taskId) { - var callback = this.taskIdCallbacks.shift(); +Storm.prototype.handleNewTaskId = function(taskIds) { + //When new list of task ids arrives, the callback that was passed with the corresponding emit should be called. + //Storm assures that the task ids will be sent in the same order as their corresponding emits so it we can simply + //take the first callback in the list and be sure it is the right one. + + var callback = this.taskIdsCallbacks.shift(); if (callback) { - callback(taskId); + callback(taskIds); } } Storm.prototype.emit = function(tup, stream, id, directTask, callback) { - this.taskIdCallbacks.push(callback); + //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible + //through the callback (will be called when the response arrives). The callback is stored in a list until the + //corresponding task id list arrives. 
+ this.taskIdsCallbacks.push(callback); this.__emit(tup, stream, id, directTask); } From f8522f8eaa6cb4f42d15580ac80750c32b73e880 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 13:49:26 +0300 Subject: [PATCH 24/44] Change taskId to taskIds --- examples/storm-starter/multilang/resources/splitsentence.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index 50b3d08d253..e393f50a8ef 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -16,8 +16,8 @@ SplitSentenceBolt.prototype.process = function(tup, callback) { var self = this; var words = tup.values[0].split(" "); words.forEach(function(word) { - self.emit([word], null, null, null, function(taskId) { - storm.logToFile('Task id - ' + JSON.stringify(taskId) + ' work - ' + word); + self.emit([word], null, null, null, function(taskIds) { + storm.logToFile('Task id - ' + JSON.stringify(taskIds) + ' work - ' + word); }); }); callback(); From 47a426b8fee2e85667ab5053d15f7c2289e5cf09 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 13:50:24 +0300 Subject: [PATCH 25/44] In case no callback is passed when emitting, use a default callback --- examples/storm-starter/multilang/resources/storm.js | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 6e45bcdfac5..6d33f06d9a0 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -119,13 +119,26 @@ Storm.prototype.handleNewTaskId = function(taskIds) { var callback = this.taskIdsCallbacks.shift(); if (callback) { callback(taskIds); + } else { + throw new Error('Something went wrong, we off the split of task id 
callbacks'); } } +Storm.prototype.createDefaultEmitCallback = function(tupleId) { + return function(taskIds) { + logToFile('Tuple ' + tupleId + ' sent to task ids - ' + JSON.stringify(taskIds)); + }; +} + Storm.prototype.emit = function(tup, stream, id, directTask, callback) { //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible //through the callback (will be called when the response arrives). The callback is stored in a list until the //corresponding task id list arrives. + + if (!callback) { + callback = this.createDefaultEmitCallback(id); + } + this.taskIdsCallbacks.push(callback); this.__emit(tup, stream, id, directTask); } From e71cc8c836b2668ed8fd0e4c93f6bcfb0630c1eb Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 15:13:38 +0300 Subject: [PATCH 26/44] Remove dead code --- .../multilang/resources/storm.js | 27 ------------------- 1 file changed, 27 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 6d33f06d9a0..a7a4c2d29b1 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -165,33 +165,6 @@ function Tuple(id, component, stream, task, values) { this.task = task; this.values = values; } -// def __repr__(self): -// return '<%s%s>' % ( -// self.__class__.__name__, -// ''.join(' %s=%r' % (k, self.__dict__[k]) for k in sorted(self.__dict__.keys()))) - -//function Bolt() {}; -// -//Bolt.prototype.initialize = function(stormconf, context) {}; -// -//Bolt.prototype.process = function(tuple) {}; -// -//Bolt.prototype.run = function() { -// MODE = Bolt -// var setupInfo = initComponent(); -// var conf = setupInfo[0]; -// var context = setupInfo[1]; -// -// this.initialize(conf, context); -// try { -// while (true) { -// var tup = readTuple(); -// this.process(tup); -// } -// } catch(err) { -// log(err); -// } -//} function 
BasicBolt() { Storm.call(this); From f573112cee359d5a4cfe1c3d8c74dd1b3a1f132e Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 15:18:01 +0300 Subject: [PATCH 27/44] Change callback to done to match common format --- .../storm-starter/multilang/resources/asyncSplitsentence.js | 4 ++-- examples/storm-starter/multilang/resources/splitsentence.js | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index 5e4400f7656..56455c2fa2c 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -15,7 +15,7 @@ function SplitSentenceBolt() { SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); SplitSentenceBolt.prototype.constructor = SplitSentenceBolt; -SplitSentenceBolt.prototype.process = function(tup, callback) { +SplitSentenceBolt.prototype.process = function(tup, done) { var self = this; // Here setTimeout is not really needed, we use it to demonstrate asynchronous code in the process method: @@ -26,7 +26,7 @@ SplitSentenceBolt.prototype.process = function(tup, callback) { storm.logToFile('Task id - ' + JSON.stringify(taskId) + ' work - ' + word); }); }); - callback(); + done(); }, 5000) } diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index e393f50a8ef..f17b4ab7e48 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -12,7 +12,7 @@ function SplitSentenceBolt() { SplitSentenceBolt.prototype = Object.create(BasicBolt.prototype); SplitSentenceBolt.prototype.constructor = SplitSentenceBolt; -SplitSentenceBolt.prototype.process = function(tup, callback) { +SplitSentenceBolt.prototype.process = function(tup, done) { var self = this; 
var words = tup.values[0].split(" "); words.forEach(function(word) { @@ -20,7 +20,7 @@ SplitSentenceBolt.prototype.process = function(tup, callback) { storm.logToFile('Task id - ' + JSON.stringify(taskIds) + ' work - ' + word); }); }); - callback(); + done(); } new SplitSentenceBolt().run(); \ No newline at end of file From ab1183badfa865ba4b9f6065743b9010bf1a003b Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 18:08:37 +0300 Subject: [PATCH 28/44] Change method name from callback to done --- examples/storm-starter/multilang/resources/storm.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index a7a4c2d29b1..dc5dbb77374 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -147,10 +147,10 @@ Storm.prototype.emitDirect = function(tup, stream, id, directTask) { this.__emit(tup, stream, id, directTask) } -Storm.prototype.initialize = function(conf, context, callback) { +Storm.prototype.initialize = function(conf, context, done) { this.logToFile("CONF: " + JSON.stringify(conf)); this.logToFile("CONTEXT: " + JSON.stringify(context)); - callback(); + done(); } Storm.prototype.run = function() { From c446f91a6ff4f2a8a0baa612e829090049220127 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 18:45:57 +0300 Subject: [PATCH 29/44] Delete unnecessary and illegal log messages (before changing to storm logger) --- .../multilang/resources/asyncSplitsentence.js | 2 +- .../multilang/resources/randomsentence.js | 4 ++-- .../multilang/resources/splitsentence.js | 2 +- .../multilang/resources/storm.js | 22 +++++-------------- 4 files changed, 9 insertions(+), 21 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index 
56455c2fa2c..678699ec3aa 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -23,7 +23,7 @@ SplitSentenceBolt.prototype.process = function(tup, done) { var words = tup.values[0].split(" "); words.forEach(function(word) { self.emit([word], null, null, null, function(taskId) { - storm.logToFile('Task id - ' + JSON.stringify(taskId) + ' work - ' + word); + self.logToFile(word + 'Sent to task ids - ' + taskIds); }); }); done(); diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index e689e68d06e..eba04338989 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -43,13 +43,13 @@ RandomSentenceSpout.prototype.nextTuple = function(done) { } RandomSentenceSpout.prototype.ack = function(id, done) { - this.logToFile('RECEIVED ACK - ' + JSON.stringify(id)); + this.logToFile('Received ack for - ' + id); delete this.pending[id]; done(); } RandomSentenceSpout.prototype.fail = function(id, done) { - this.logToFile('RECEIVED FAIL - ' + JSON.stringify(id)); + this.logToFile('Received fail for - ' + id); this.emit(this.pending[id], null, id, null); done(); } diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index f17b4ab7e48..84da5ff6038 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -17,7 +17,7 @@ SplitSentenceBolt.prototype.process = function(tup, done) { var words = tup.values[0].split(" "); words.forEach(function(word) { self.emit([word], null, null, null, function(taskIds) { - storm.logToFile('Task id - ' + JSON.stringify(taskIds) + ' work - ' + word); + self.logToFile(word + 'Sent to task ids - ' + taskIds); }); }); done(); diff 
--git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index dc5dbb77374..a4a01f63149 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -20,7 +20,6 @@ Storm.prototype.logToFile = function(msg) { } Storm.prototype.sendMsgToParent = function(msg) { - logToFile('SEND MESSAGE TO PARENT: ' + JSON.stringify(msg)); var str = JSON.stringify(msg) + '\nend\n'; process.stdout.write(str); } @@ -42,7 +41,6 @@ Storm.prototype.log = function(msg) { Storm.prototype.initSetupInfo = function(setupInfo) { var self = this; var callback = function() { - self.logToFile('Inside initialize callback, sending pid.') self.sendpid(setupInfo['pidDir']); } this.initialize(setupInfo['conf'], setupInfo['context'], callback); @@ -50,7 +48,7 @@ Storm.prototype.initSetupInfo = function(setupInfo) { Storm.prototype.startReadingInput = function() { var self = this; - this.logToFile('startReadingInput'); + this.logToFile('Start reading input from stdin.'); process.stdin.on('readable', function() { var chunk = process.stdin.read(); @@ -65,10 +63,7 @@ Storm.prototype.startReadingInput = function() { } Storm.prototype.handleNewLine = function(line) { - this.logToFile('handleNewLine LINE: ' + line); - if (line === 'end') { - this.logToFile('MESSAGE READY!!\n'); var msg = this.collectMessageLines(); this.cleanLines(); this.handleNewMessage(msg); @@ -96,17 +91,14 @@ Storm.prototype.isTaskIds = function(msg) { Storm.prototype.handleNewMessage = function(msg) { var parsedMsg = JSON.parse(msg); - this.logToFile('handleNewMessage ' + msg); - if (this.isFirstMessage) { - this.logToFile('first message'); this.initSetupInfo(parsedMsg); this.isFirstMessage = false; } else if (this.isTaskIds(parsedMsg)) { - this.logToFile('task id'); + this.logToFile('New task ids received.'); this.handleNewTaskId(parsedMsg); } else { - this.logToFile('command'); + this.logToFile('New command 
received.'); this.handleNewCommand(parsedMsg); } } @@ -148,13 +140,11 @@ Storm.prototype.emitDirect = function(tup, stream, id, directTask) { } Storm.prototype.initialize = function(conf, context, done) { - this.logToFile("CONF: " + JSON.stringify(conf)); - this.logToFile("CONTEXT: " + JSON.stringify(context)); done(); } Storm.prototype.run = function() { - this.logToFile('run'); + this.logToFile('Start running'); this.startReadingInput(); } @@ -184,7 +174,6 @@ BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { } if (this.anchorTuple !== null) { - this.logToFile('Anchor tuple id - ' + this.anchorTuple.id); anchors = [this.anchorTuple] } var m = {"command": "emit"}; @@ -194,7 +183,6 @@ BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { } m["anchors"] = anchors.map(function (a) { - self.logToFile('ID - ' + a.id); return a.id; }); @@ -209,7 +197,7 @@ BasicBolt.prototype.handleNewCommand = function(command) { var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); this.anchorTuple = tup; - var callback = function(err) { + var callback = function(err) { if (!!err) { self.fail(tup, err); } From 977a49825f0c3b810a0fe1190d32857b39e56de9 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 19:07:40 +0300 Subject: [PATCH 30/44] Delete logToFile and all references to it. 
Use storm logging system --- .../multilang/resources/asyncSplitsentence.js | 2 +- .../multilang/resources/randomsentence.js | 6 ++---- .../multilang/resources/splitsentence.js | 2 +- .../multilang/resources/storm.js | 21 ++++--------------- 4 files changed, 8 insertions(+), 23 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index 678699ec3aa..7051d028cd2 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -23,7 +23,7 @@ SplitSentenceBolt.prototype.process = function(tup, done) { var words = tup.values[0].split(" "); words.forEach(function(word) { self.emit([word], null, null, null, function(taskId) { - self.logToFile(word + 'Sent to task ids - ' + taskIds); + self.log(word + 'Sent to task ids - ' + taskIds); }); }); done(); diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index eba04338989..0dae9012a43 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -21,8 +21,6 @@ function RandomSentenceSpout(sentences) { this.runningTupleId = getRandomInt(0,Math.pow(2,16)); this.sentences = sentences; this.pending = {}; - - storm.logToFile('CREATE NEW RandomSentenceSpout'); }; RandomSentenceSpout.prototype = Object.create(Spout.prototype); @@ -43,13 +41,13 @@ RandomSentenceSpout.prototype.nextTuple = function(done) { } RandomSentenceSpout.prototype.ack = function(id, done) { - this.logToFile('Received ack for - ' + id); + this.log('Received ack for - ' + id); delete this.pending[id]; done(); } RandomSentenceSpout.prototype.fail = function(id, done) { - this.logToFile('Received fail for - ' + id); + this.log('Received fail for - ' + id); this.emit(this.pending[id], null, id, null); done(); } 
diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index 84da5ff6038..8e58b8e52d2 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -17,7 +17,7 @@ SplitSentenceBolt.prototype.process = function(tup, done) { var words = tup.values[0].split(" "); words.forEach(function(word) { self.emit([word], null, null, null, function(taskIds) { - self.logToFile(word + 'Sent to task ids - ' + taskIds); + self.log(word + 'Sent to task ids - ' + taskIds); }); }); done(); diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index a4a01f63149..ad5ebf33921 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -4,21 +4,12 @@ var fs = require('fs'); -function logToFile(msg) { - - fs.appendFileSync('/Users/anya/tmp/storm/log', msg + '\n\n\n'); -} - function Storm() { this.lines = []; this.taskIdsCallbacks = []; this.isFirstMessage = true; } -Storm.prototype.logToFile = function(msg) { - logToFile(this.name + ':\n' + msg); -} - Storm.prototype.sendMsgToParent = function(msg) { var str = JSON.stringify(msg) + '\nend\n'; process.stdout.write(str); @@ -48,7 +39,6 @@ Storm.prototype.initSetupInfo = function(setupInfo) { Storm.prototype.startReadingInput = function() { var self = this; - this.logToFile('Start reading input from stdin.'); process.stdin.on('readable', function() { var chunk = process.stdin.read(); @@ -95,10 +85,10 @@ Storm.prototype.handleNewMessage = function(msg) { this.initSetupInfo(parsedMsg); this.isFirstMessage = false; } else if (this.isTaskIds(parsedMsg)) { - this.logToFile('New task ids received.'); + this.log('New task ids received.'); this.handleNewTaskId(parsedMsg); } else { - this.logToFile('New command received.'); + this.log('New command received.'); 
this.handleNewCommand(parsedMsg); } } @@ -117,8 +107,9 @@ Storm.prototype.handleNewTaskId = function(taskIds) { } Storm.prototype.createDefaultEmitCallback = function(tupleId) { + var self = this; return function(taskIds) { - logToFile('Tuple ' + tupleId + ' sent to task ids - ' + JSON.stringify(taskIds)); + self.log('Tuple ' + tupleId + ' sent to task ids - ' + JSON.stringify(taskIds)); }; } @@ -144,7 +135,6 @@ Storm.prototype.initialize = function(conf, context, done) { } Storm.prototype.run = function() { - this.logToFile('Start running'); this.startReadingInput(); } @@ -159,7 +149,6 @@ function Tuple(id, component, stream, task, values) { function BasicBolt() { Storm.call(this); this.anchorTuple = null; - this.name = 'BOLT' }; BasicBolt.prototype = Object.create(Storm.prototype); @@ -216,7 +205,6 @@ BasicBolt.prototype.fail = function(tup, err) { function Spout() { Storm.call(this); - this.name = 'SPOUT'; }; Spout.prototype = Object.create(Storm.prototype); Spout.prototype.constructor = Spout; @@ -265,5 +253,4 @@ Spout.prototype.__emit = function(tup, stream, id, directTask) { } module.exports.BasicBolt = BasicBolt; -module.exports.logToFile = logToFile; module.exports.Spout = Spout; \ No newline at end of file From ea7269f31f5c5bb62bd3fc393afa2cf34ec4140d Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 2 Jul 2014 19:24:07 +0300 Subject: [PATCH 31/44] Add documentation --- .../multilang/resources/storm.js | 46 ++++++++++++++++++- 1 file changed, 44 insertions(+), 2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index ad5ebf33921..3016e583e54 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -130,6 +130,12 @@ Storm.prototype.emitDirect = function(tup, stream, id, directTask) { this.__emit(tup, stream, id, directTask) } +/** + * Initialize storm component according to the configuration 
received. + * @param conf configuration object accrding to storm protocol. + * @param context context object according to storm protocol. + * @param done callback. Call this method when finished initializing. + */ Storm.prototype.initialize = function(conf, context, done) { done(); } @@ -146,6 +152,11 @@ function Tuple(id, component, stream, task, values) { this.values = values; } +/** + * Base class for storm bolt. + * To create a bolt implement 'process' method. + * You may also implement initialize method to + */ function BasicBolt() { Storm.call(this); this.anchorTuple = null; @@ -154,8 +165,6 @@ function BasicBolt() { BasicBolt.prototype = Object.create(Storm.prototype); BasicBolt.prototype.constructor = BasicBolt; -BasicBolt.prototype.process = function(tuple, done) {}; - BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { var self = this; if (typeof anchors === 'undefined') { @@ -195,6 +204,14 @@ BasicBolt.prototype.handleNewCommand = function(command) { this.process(tup, callback); } +/** + * Implement this method when creating a bolt. This is the main method the provides the logic of the bolt (what + * should it do?). + * @param tuple the input of the bolt - what to process. + * @param done call this method when done processing. + */ +BasicBolt.prototype.process = function(tuple, done) {}; + BasicBolt.prototype.ack = function(tup) { this.sendMsgToParent({"command": "ack", "id": tup.id}); } @@ -203,16 +220,41 @@ BasicBolt.prototype.fail = function(tup, err) { this.sendMsgToParent({"command": "fail", "id": tup.id}); } + +/** + * Base class for storm spout. + * To create a spout implement the following methods: nextTuple, ack and fail (nextTuple - mandatory, ack and fail + * can stay empty). + * You may also implement initialize method. 
+ * + */ function Spout() { Storm.call(this); }; + Spout.prototype = Object.create(Storm.prototype); + Spout.prototype.constructor = Spout; +/** + * This method will be called when an ack is received for preciously sent tuple. One may implement it. + * @param id The id of the tuple. + * @param done Call this method when finished and ready to receive more tuples. + */ Spout.prototype.ack = function(id, done) {}; +/** + * This method will be called when an fail is received for preciously sent tuple. One may implement it (for example - + * log the failure or send the tuple again). + * @param id The id of the tuple. + * @param done Call this method when finished and ready to receive more tuples. + */ Spout.prototype.fail = function(id, done) {}; +/** + * Method the indicates its time to emit the next tuple. + * @param done call this method when done sending the output. + */ Spout.prototype.nextTuple = function(done) {}; Spout.prototype.handleNewCommand = function(command) { From 36764ba92a0dd9ba9cde1b9f4779b1fb35ff49cb Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 09:55:33 +0300 Subject: [PATCH 32/44] Add documentation, delete unnecessary require --- examples/storm-starter/multilang/resources/storm.js | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 3016e583e54..23ba458eff8 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -1,9 +1,8 @@ /** * Base classes in node-js for storm Bolt and Spout. + * Implements the storm multilang protocol for nodejs. 
*/ -var fs = require('fs'); - function Storm() { this.lines = []; this.taskIdsCallbacks = []; From 6b73c35847fff79a00e5497bb1e90b585c2cfae6 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 10:03:18 +0300 Subject: [PATCH 33/44] Restore fs --- examples/storm-starter/multilang/resources/storm.js | 2 ++ 1 file changed, 2 insertions(+) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 23ba458eff8..e3ef55c964c 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -3,6 +3,8 @@ * Implements the storm multilang protocol for nodejs. */ +var fs = require('fs'); + function Storm() { this.lines = []; this.taskIdsCallbacks = []; From add2e69ca42d55b7ab5e7dee4701db31688618df Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 10:05:15 +0300 Subject: [PATCH 34/44] Fix documentation --- .../storm-starter/multilang/resources/asyncSplitsentence.js | 2 +- examples/storm-starter/multilang/resources/randomsentence.js | 2 +- examples/storm-starter/multilang/resources/splitsentence.js | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index 7051d028cd2..b16084c4e76 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -1,5 +1,5 @@ /** - * Simple example for async bolt. Receives sentence and breaks it into words. + * Example for async bolt. Receives sentence and breaks it into words. * * Created by anya on 6/30/14. 
*/ diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 0dae9012a43..134f9afcc7c 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -1,5 +1,5 @@ /** - * Simple example for storm spout. Emits random sentences. + * Example for storm spout. Emits random sentences. * The original class in java - storm.starter.spout.RandomSentenceSpout. * * Created by anya on 6/26/14. diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index 8e58b8e52d2..0b87a2a637a 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -1,5 +1,5 @@ /** - * Simple Bolt example - receives sentence and breaks it into words. + * Bolt example - receives sentence and breaks it into words. */ var storm = require('./storm'); From d6a659e795c05ba4f5297c3aa21ab61963137a78 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 10:06:05 +0300 Subject: [PATCH 35/44] Fix documentation --- examples/storm-starter/multilang/resources/randomsentence.js | 1 - 1 file changed, 1 deletion(-) diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 134f9afcc7c..296253a933d 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -54,7 +54,6 @@ RandomSentenceSpout.prototype.fail = function(id, done) { /** * Returns a random integer between min (inclusive) and max (inclusive) - * Using Math.round() will give you a non-uniform distribution! 
*/ function getRandomInt(min, max) { return Math.floor(Math.random() * (max - min + 1)) + min; From 20c2225e7e9348e068973cc33097637c479ead84 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 10:07:52 +0300 Subject: [PATCH 36/44] Fix documentation --- examples/storm-starter/multilang/resources/randomsentence.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 296253a933d..1855db77368 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -47,7 +47,7 @@ RandomSentenceSpout.prototype.ack = function(id, done) { } RandomSentenceSpout.prototype.fail = function(id, done) { - this.log('Received fail for - ' + id); + this.log('Received fail for - ' + id + '. Retrying.'); this.emit(this.pending[id], null, id, null); done(); } From 4e3270cf52004855ec7a63b820d19fe7c0fa3212 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 10:11:53 +0300 Subject: [PATCH 37/44] Extract create tuple id into a function --- .../multilang/resources/randomsentence.js | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 1855db77368..8b2e0bd6a40 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -18,7 +18,7 @@ var SENTENCES = [ function RandomSentenceSpout(sentences) { Spout.call(this); - this.runningTupleId = getRandomInt(0,Math.pow(2,16)); + this.runningTupleId = 0; this.sentences = sentences; this.pending = {}; }; @@ -33,13 +33,18 @@ RandomSentenceSpout.prototype.getRandomSentence = function() { RandomSentenceSpout.prototype.nextTuple = function(done) { var sentence = 
this.getRandomSentence(); var tup = [sentence]; - var id = this.runningTupleId; + var id = this.createNextTupleId(); this.pending[id] = tup; this.emit(tup, null, id, null); - this.runningTupleId++; done(); } +RandomSentenceSpout.prototype.createNextTupleId = function() { + var id = this.runningTupleId; + this.runningTupleId++; + return id; +} + RandomSentenceSpout.prototype.ack = function(id, done) { this.log('Received ack for - ' + id); delete this.pending[id]; From 2184834a1998c91f9e95accb5bda6230723e040e Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 17:10:55 +0300 Subject: [PATCH 38/44] Change emit api - receive json instead of separate parsms --- .../multilang/resources/asyncSplitsentence.js | 4 +- .../multilang/resources/randomsentence.js | 9 +- .../multilang/resources/splitsentence.js | 4 +- .../multilang/resources/storm.js | 86 ++++++++++--------- 4 files changed, 56 insertions(+), 47 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index b16084c4e76..d04f29d1e32 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -22,8 +22,8 @@ SplitSentenceBolt.prototype.process = function(tup, done) { setTimeout(function() { var words = tup.values[0].split(" "); words.forEach(function(word) { - self.emit([word], null, null, null, function(taskId) { - self.log(word + 'Sent to task ids - ' + taskIds); + self.emit({tuple: [word]}, function(taskIds) { + self.log(word + ' sent to task ids - ' + taskIds); }); }); done(); diff --git a/examples/storm-starter/multilang/resources/randomsentence.js b/examples/storm-starter/multilang/resources/randomsentence.js index 8b2e0bd6a40..73893e54540 100644 --- a/examples/storm-starter/multilang/resources/randomsentence.js +++ b/examples/storm-starter/multilang/resources/randomsentence.js @@ -31,11 +31,14 @@ 
RandomSentenceSpout.prototype.getRandomSentence = function() { } RandomSentenceSpout.prototype.nextTuple = function(done) { + var self = this; var sentence = this.getRandomSentence(); var tup = [sentence]; var id = this.createNextTupleId(); this.pending[id] = tup; - this.emit(tup, null, id, null); + this.emit({tuple: tup, id: id}, function(taskIds) { + self.log(tup + ' sent to task ids - ' + taskIds); + }); done(); } @@ -53,7 +56,9 @@ RandomSentenceSpout.prototype.ack = function(id, done) { RandomSentenceSpout.prototype.fail = function(id, done) { this.log('Received fail for - ' + id + '. Retrying.'); - this.emit(this.pending[id], null, id, null); + this.emit({tuple: this.pending[id], id:id}, function(taskIds) { + self.log(this.pending[id] + ' sent to task ids - ' + taskIds); + }); done(); } diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index 0b87a2a637a..ffa23479c9c 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -16,8 +16,8 @@ SplitSentenceBolt.prototype.process = function(tup, done) { var self = this; var words = tup.values[0].split(" "); words.forEach(function(word) { - self.emit([word], null, null, null, function(taskIds) { - self.log(word + 'Sent to task ids - ' + taskIds); + self.emit({tuple: [word]}, function(taskIds) { + self.log(word + ' sent to task ids - ' + taskIds); }); }); done(); diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index e3ef55c964c..dcd826e219f 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -114,21 +114,27 @@ Storm.prototype.createDefaultEmitCallback = function(tupleId) { }; } -Storm.prototype.emit = function(tup, stream, id, directTask, callback) { +Storm.prototype.emit = function(commandDetails, onTaskIds) { 
//Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible //through the callback (will be called when the response arrives). The callback is stored in a list until the //corresponding task id list arrives. + if (!!commandDetails.task) { + throw new Error('Illegal input - task. To emit to specific task use emit direct!'); + } - if (!callback) { - callback = this.createDefaultEmitCallback(id); + if (!onTaskIds) { + throw new Error('You must pass a onTaskIds callback when using emit!') } - this.taskIdsCallbacks.push(callback); - this.__emit(tup, stream, id, directTask); + this.taskIdsCallbacks.push(onTaskIds); + this.__emit(commandDetails);; } -Storm.prototype.emitDirect = function(tup, stream, id, directTask) { - this.__emit(tup, stream, id, directTask) +Storm.prototype.emitDirect = function(commandDetails) { + if (!commandDetails.task) { + throw new Error("Emit direct must receive task id!") + } + this.__emit(commandDetails); } /** @@ -166,30 +172,29 @@ function BasicBolt() { BasicBolt.prototype = Object.create(Storm.prototype); BasicBolt.prototype.constructor = BasicBolt; -BasicBolt.prototype.__emit = function(tup, stream, anchors, directTask) { +BasicBolt.prototype.emitDirect = function(tup, stream, directTask) { + +} +/** + * + * {tuple, stream, task} + */ +BasicBolt.prototype.__emit = function(commandDetails) { var self = this; - if (typeof anchors === 'undefined') { - anchors = []; - } + var anchors = []; if (this.anchorTuple !== null) { - anchors = [this.anchorTuple] - } - var m = {"command": "emit"}; - - if (typeof stream !== 'undefined') { - m["stream"] = stream + anchors = [this.anchorTuple.id] } + var message = { + command: "emit", + tuple: commandDetails.tuple, + stream: commandDetails.stream, + task: commandDetails.task, + anchors: anchors + }; - m["anchors"] = anchors.map(function (a) { - return a.id; - }); - - if (typeof directTask !== 'undefined') { - m["task"] = directTask; - } - m["tuple"] = tup; 
- this.sendMsgToParent(m); + this.sendMsgToParent(message); } BasicBolt.prototype.handleNewCommand = function(command) { @@ -277,22 +282,21 @@ Spout.prototype.handleNewCommand = function(command) { } } -Spout.prototype.__emit = function(tup, stream, id, directTask) { - var m = {"command": "emit"}; - if (typeof id !== 'undefined') { - m["id"] = id; - } - - if (typeof stream !== 'undefined') { - m["stream"] = stream; - } - - if (typeof directTask !== 'undefined') { - m["task"] = directTask; - } +/** + * + * tup, stream, id, directTask + * + */ +Spout.prototype.__emit = function(commandDetails) { + var message = { + command: "emit", + tuple: commandDetails.tuple, + id: commandDetails.id, + stream: commandDetails.stream, + task: commandDetails.task + }; - m["tuple"] = tup; - this.sendMsgToParent(m); + this.sendMsgToParent(message); } module.exports.BasicBolt = BasicBolt; From 761b67290514a0879910f28f8c10b0e6d69804ff Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Thu, 3 Jul 2014 17:25:16 +0300 Subject: [PATCH 39/44] Docuemnt emit and emitDirect --- .../multilang/resources/storm.js | 64 ++++++++++++++++--- 1 file changed, 56 insertions(+), 8 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index dcd826e219f..15ba4861e9b 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -114,11 +114,30 @@ Storm.prototype.createDefaultEmitCallback = function(tupleId) { }; } -Storm.prototype.emit = function(commandDetails, onTaskIds) { + +/** + * + * @param messageDetails json with the emit details. 
+ * + * For bolt, the json must contain the required fields: + * - tuple - the value to emit + * and may contain the optional fields: + * - stream (if empty - emit to default stream) + * + * For spout, the json must contain the required fields: + * - tuple - the value to emit + * + * and may contain the optional fields: + * - id - pass id for reliable emit (and receive ack/fail later). + * - stream - if empty - emit to default stream. + * + * @param onTaskIds function that will be called with list of task ids the message was emitted to (when received). + */ +Storm.prototype.emit = function(messageDetails, onTaskIds) { //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible //through the callback (will be called when the response arrives). The callback is stored in a list until the //corresponding task id list arrives. - if (!!commandDetails.task) { + if (!!messageDetails.task) { throw new Error('Illegal input - task. To emit to specific task use emit direct!'); } @@ -127,9 +146,29 @@ Storm.prototype.emit = function(commandDetails, onTaskIds) { } this.taskIdsCallbacks.push(onTaskIds); - this.__emit(commandDetails);; + this.__emit(messageDetails);; } + +/** + * Emit message to specific task. + * @param messageDetails json with the emit details. + * + * For bolt, the json must contain the required fields: + * - tuple - the value to emit + * - task - indicate the task to send the tuple to. + * and may contain the optional fields: + * - stream (if empty - emit to default stream) + * + * For spout, the json must contain the required fields: + * - tuple - the value to emit + * - task - indicate the task to send the tuple to. + * and may contain the optional fields: + * - id - pass id for reliable emit (and receive ack/fail later). + * - stream - if empty - emit to default stream. + * + * @param onTaskIds function that will be called with list of task ids the message was emitted to (when received). 
+ */ Storm.prototype.emitDirect = function(commandDetails) { if (!commandDetails.task) { throw new Error("Emit direct must receive task id!") @@ -175,9 +214,15 @@ BasicBolt.prototype.constructor = BasicBolt; BasicBolt.prototype.emitDirect = function(tup, stream, directTask) { } + /** - * - * {tuple, stream, task} + * Emit message. + * @param commandDetails json with the required fields: + * - tuple - the value to emit + * - anchors - list of ids this emit relates to (used for reliability purposes). + * and the optional fields: + * - stream (if empty - emit to default stream) + * - task (pass only to emit to specific task) */ BasicBolt.prototype.__emit = function(commandDetails) { var self = this; @@ -283,9 +328,12 @@ Spout.prototype.handleNewCommand = function(command) { } /** - * - * tup, stream, id, directTask - * + * @param commandDetails json with the required fields: + * - tuple - the value to emit. + * and the optional fields: + * - id - pass id for reliable emit (and receive ack/fail later). + * - stream - if empty - emit to default stream. + * - task - pass only to emit to specific task. */ Spout.prototype.__emit = function(commandDetails) { var message = { From 8e1824d74c3b2a7c7771bf661e3ea296b91b96b9 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Mon, 14 Jul 2014 16:56:34 +0300 Subject: [PATCH 40/44] Delete log messages from storm base classes. 
Delete unused method --- examples/storm-starter/multilang/resources/storm.js | 9 --------- 1 file changed, 9 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 15ba4861e9b..cbaae55e26f 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -86,10 +86,8 @@ Storm.prototype.handleNewMessage = function(msg) { this.initSetupInfo(parsedMsg); this.isFirstMessage = false; } else if (this.isTaskIds(parsedMsg)) { - this.log('New task ids received.'); this.handleNewTaskId(parsedMsg); } else { - this.log('New command received.'); this.handleNewCommand(parsedMsg); } } @@ -107,13 +105,6 @@ Storm.prototype.handleNewTaskId = function(taskIds) { } } -Storm.prototype.createDefaultEmitCallback = function(tupleId) { - var self = this; - return function(taskIds) { - self.log('Tuple ' + tupleId + ' sent to task ids - ' + JSON.stringify(taskIds)); - }; -} - /** * From 74b0e9f77eff9e8ac192a42fe5d3656995f3131a Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Mon, 14 Jul 2014 17:22:59 +0300 Subject: [PATCH 41/44] Fix anchor tuple bug - dont save it in the bolts stats, pass the responsibility to the process method --- .../multilang/resources/asyncSplitsentence.js | 2 +- .../multilang/resources/splitsentence.js | 2 +- .../storm-starter/multilang/resources/storm.js | 14 +++++++------- 3 files changed, 9 insertions(+), 9 deletions(-) diff --git a/examples/storm-starter/multilang/resources/asyncSplitsentence.js b/examples/storm-starter/multilang/resources/asyncSplitsentence.js index d04f29d1e32..ebeb759d35e 100644 --- a/examples/storm-starter/multilang/resources/asyncSplitsentence.js +++ b/examples/storm-starter/multilang/resources/asyncSplitsentence.js @@ -22,7 +22,7 @@ SplitSentenceBolt.prototype.process = function(tup, done) { setTimeout(function() { var words = tup.values[0].split(" "); words.forEach(function(word) { - 
self.emit({tuple: [word]}, function(taskIds) { + self.emit({tuple: [word], anchorTupleId: tup.id}, function(taskIds) { self.log(word + ' sent to task ids - ' + taskIds); }); }); diff --git a/examples/storm-starter/multilang/resources/splitsentence.js b/examples/storm-starter/multilang/resources/splitsentence.js index ffa23479c9c..512702ef1c4 100755 --- a/examples/storm-starter/multilang/resources/splitsentence.js +++ b/examples/storm-starter/multilang/resources/splitsentence.js @@ -16,7 +16,7 @@ SplitSentenceBolt.prototype.process = function(tup, done) { var self = this; var words = tup.values[0].split(" "); words.forEach(function(word) { - self.emit({tuple: [word]}, function(taskIds) { + self.emit({tuple: [word], anchorTupleId: tup.id}, function(taskIds) { self.log(word + ' sent to task ids - ' + taskIds); }); }); diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index cbaae55e26f..332ca997e4a 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -112,6 +112,8 @@ Storm.prototype.handleNewTaskId = function(taskIds) { * * For bolt, the json must contain the required fields: * - tuple - the value to emit + * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source + * tuple and return ack when all components successfully finished to process it. * and may contain the optional fields: * - stream (if empty - emit to default stream) * @@ -147,6 +149,8 @@ Storm.prototype.emit = function(messageDetails, onTaskIds) { * * For bolt, the json must contain the required fields: * - tuple - the value to emit + * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source + * tuple and return ack when all components successfully finished to process it. * - task - indicate the task to send the tuple to. 
* and may contain the optional fields: * - stream (if empty - emit to default stream) @@ -210,7 +214,8 @@ BasicBolt.prototype.emitDirect = function(tup, stream, directTask) { * Emit message. * @param commandDetails json with the required fields: * - tuple - the value to emit - * - anchors - list of ids this emit relates to (used for reliability purposes). + * - anchorTupleId - the value of the anchor tuple (the input tuple that lead to this emit). Used to track the source + * tuple and return ack when all components successfully finished to process it. * and the optional fields: * - stream (if empty - emit to default stream) * - task (pass only to emit to specific task) @@ -218,16 +223,12 @@ BasicBolt.prototype.emitDirect = function(tup, stream, directTask) { BasicBolt.prototype.__emit = function(commandDetails) { var self = this; - var anchors = []; - if (this.anchorTuple !== null) { - anchors = [this.anchorTuple.id] - } var message = { command: "emit", tuple: commandDetails.tuple, stream: commandDetails.stream, task: commandDetails.task, - anchors: anchors + anchors: [commandDetails.anchorTupleId] }; this.sendMsgToParent(message); @@ -236,7 +237,6 @@ BasicBolt.prototype.__emit = function(commandDetails) { BasicBolt.prototype.handleNewCommand = function(command) { var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); - this.anchorTuple = tup; var callback = function(err) { if (!!err) { self.fail(tup, err); From b54e0b66680593200aba2b15a53c2bea3b83ac94 Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 15 Jul 2014 18:04:50 +0300 Subject: [PATCH 42/44] Fix type.\nChange method name to camel case. Change quotes to format for json. 
Delete git add brain/storm.js\nDelete erroneous method override --- .../multilang/resources/storm.js | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 332ca997e4a..752aba7e841 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -17,12 +17,12 @@ Storm.prototype.sendMsgToParent = function(msg) { } Storm.prototype.sync = function() { - this.sendMsgToParent({'command':'sync'}); + this.sendMsgToParent({"command":"sync"}); } -Storm.prototype.sendpid = function(heartbeatdir) { +Storm.prototype.sendPid = function(heartbeatdir) { var pid = process.pid; - this.sendMsgToParent({'pid':pid}) + this.sendMsgToParent({"pid": pid}) fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); } @@ -33,7 +33,7 @@ Storm.prototype.log = function(msg) { Storm.prototype.initSetupInfo = function(setupInfo) { var self = this; var callback = function() { - self.sendpid(setupInfo['pidDir']); + self.sendPid(setupInfo['pidDir']); } this.initialize(setupInfo['conf'], setupInfo['context'], callback); } @@ -44,7 +44,7 @@ Storm.prototype.startReadingInput = function() { process.stdin.on('readable', function() { var chunk = process.stdin.read(); - if (!!chunk && chunk.length !== 0) { + if (chunk && chunk.length !== 0) { var lines = chunk.toString().split('\n'); lines.forEach(function(line) { self.handleNewLine(line); @@ -130,7 +130,7 @@ Storm.prototype.emit = function(messageDetails, onTaskIds) { //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible //through the callback (will be called when the response arrives). The callback is stored in a list until the //corresponding task id list arrives. - if (!!messageDetails.task) { + if (messageDetails.task) { throw new Error('Illegal input - task. 
To emit to specific task use emit direct!'); } @@ -206,10 +206,6 @@ function BasicBolt() { BasicBolt.prototype = Object.create(Storm.prototype); BasicBolt.prototype.constructor = BasicBolt; -BasicBolt.prototype.emitDirect = function(tup, stream, directTask) { - -} - /** * Emit message. * @param commandDetails json with the required fields: @@ -238,7 +234,7 @@ BasicBolt.prototype.handleNewCommand = function(command) { var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); var callback = function(err) { - if (!!err) { + if (err) { self.fail(tup, err); } self.ack(tup); @@ -247,7 +243,7 @@ BasicBolt.prototype.handleNewCommand = function(command) { } /** - * Implement this method when creating a bolt. This is the main method the provides the logic of the bolt (what + * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what * should it do?). * @param tuple the input of the bolt - what to process. * @param done call this method when done processing. From 686ec2c0ff5516a60bfb640dba01aa1b54f64afe Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Tue, 15 Jul 2014 18:04:50 +0300 Subject: [PATCH 43/44] Fix type. Change method name to camel case. Change quotes to format for json. Delete git add brain/storm.js. 
Delete erroneous method override --- .../multilang/resources/storm.js | 20 ++++++++----------- 1 file changed, 8 insertions(+), 12 deletions(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 332ca997e4a..752aba7e841 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -17,12 +17,12 @@ Storm.prototype.sendMsgToParent = function(msg) { } Storm.prototype.sync = function() { - this.sendMsgToParent({'command':'sync'}); + this.sendMsgToParent({"command":"sync"}); } -Storm.prototype.sendpid = function(heartbeatdir) { +Storm.prototype.sendPid = function(heartbeatdir) { var pid = process.pid; - this.sendMsgToParent({'pid':pid}) + this.sendMsgToParent({"pid": pid}) fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); } @@ -33,7 +33,7 @@ Storm.prototype.log = function(msg) { Storm.prototype.initSetupInfo = function(setupInfo) { var self = this; var callback = function() { - self.sendpid(setupInfo['pidDir']); + self.sendPid(setupInfo['pidDir']); } this.initialize(setupInfo['conf'], setupInfo['context'], callback); } @@ -44,7 +44,7 @@ Storm.prototype.startReadingInput = function() { process.stdin.on('readable', function() { var chunk = process.stdin.read(); - if (!!chunk && chunk.length !== 0) { + if (chunk && chunk.length !== 0) { var lines = chunk.toString().split('\n'); lines.forEach(function(line) { self.handleNewLine(line); @@ -130,7 +130,7 @@ Storm.prototype.emit = function(messageDetails, onTaskIds) { //Every emit triggers a response - list of task ids to which the tuple was emitted. The task ids are accessible //through the callback (will be called when the response arrives). The callback is stored in a list until the //corresponding task id list arrives. - if (!!messageDetails.task) { + if (messageDetails.task) { throw new Error('Illegal input - task. 
To emit to specific task use emit direct!'); } @@ -206,10 +206,6 @@ function BasicBolt() { BasicBolt.prototype = Object.create(Storm.prototype); BasicBolt.prototype.constructor = BasicBolt; -BasicBolt.prototype.emitDirect = function(tup, stream, directTask) { - -} - /** * Emit message. * @param commandDetails json with the required fields: @@ -238,7 +234,7 @@ BasicBolt.prototype.handleNewCommand = function(command) { var self = this; var tup = new Tuple(command["id"], command["comp"], command["stream"], command["task"], command["tuple"]); var callback = function(err) { - if (!!err) { + if (err) { self.fail(tup, err); } self.ack(tup); @@ -247,7 +243,7 @@ BasicBolt.prototype.handleNewCommand = function(command) { } /** - * Implement this method when creating a bolt. This is the main method the provides the logic of the bolt (what + * Implement this method when creating a bolt. This is the main method that provides the logic of the bolt (what * should it do?). * @param tuple the input of the bolt - what to process. * @param done call this method when done processing. 
From 945495c2ba1293d612aaf00d4a55a570ed7bd8ac Mon Sep 17 00:00:00 2001 From: Anya Tchernishov Date: Wed, 16 Jul 2014 19:52:21 +0300 Subject: [PATCH 44/44] Fix bug in storm spout/bolt initialization - first write a file names as the pid number and then send the pid to parent --- examples/storm-starter/multilang/resources/storm.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/examples/storm-starter/multilang/resources/storm.js b/examples/storm-starter/multilang/resources/storm.js index 752aba7e841..9b319cb034b 100755 --- a/examples/storm-starter/multilang/resources/storm.js +++ b/examples/storm-starter/multilang/resources/storm.js @@ -22,8 +22,8 @@ Storm.prototype.sync = function() { Storm.prototype.sendPid = function(heartbeatdir) { var pid = process.pid; - this.sendMsgToParent({"pid": pid}) fs.closeSync(fs.openSync(heartbeatdir + "/" + pid, "w")); + this.sendMsgToParent({"pid": pid}) } Storm.prototype.log = function(msg) {