Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,25 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.tools.ant.types.CommandlineJava.SysProperties;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
import com.datatorrent.lib.algo.UniqueCounter;
import com.datatorrent.lib.converter.MapToKeyHashValuePairConverter;
import com.datatorrent.lib.math.ChangeAlertKeyVal;
import com.datatorrent.lib.math.SumKeyVal;
import com.datatorrent.lib.stream.StreamMerger;
import com.datatorrent.lib.util.KeyValPair;
import com.dt.weather.constants.WeatherConstants;
import com.dt.weather.converter.DefaultConverter;
import com.dt.weather.converter.UniquesConverter;
import com.dt.weather.convertor.WeatherEventConvertor;
import com.dt.weather.counter.ChangeAlert;
import com.dt.weather.event.convertor.WeatherEventConvertor;
import com.dt.weather.input.SimpleFileReader;

@ApplicationAnnotation(name = "WeatherApp")
Expand Down Expand Up @@ -56,15 +60,24 @@ public void populateDAG(DAG dag, Configuration conf)
UniquesConverter uniqConv = dag.addOperator("UniqConv", new UniquesConverter());

/*Add the uniques */
UniqueCounter<KeyValPair<Integer, Integer>> uniqCount = dag.addOperator("UniquesCounter",
new UniqueCounter<KeyValPair<Integer, Integer>>());
UniqueCounter<KeyValPair<String, Integer>> uniqCount = dag.addOperator("UniquesCounter",
new UniqueCounter<KeyValPair<String, Integer>>());
// @SuppressWarnings("rawtypes")
MapToKeyHashValuePairConverter<KeyValPair<Integer, Integer>, Integer> converter = dag.addOperator("converter",
MapToKeyHashValuePairConverter<KeyValPair<String, Integer>, Integer> converter = dag.addOperator("converter",
new MapToKeyHashValuePairConverter());
uniqCount.setCumulative(true);
uniqCount.setCumulative(false);
dag.setAttribute(uniqCount, Context.OperatorContext.PARTITIONER,
new StatelessPartitioner<UniqueCounter<Integer>>(1));

ChangeAlert<String, Integer> changeNotifier = dag.addOperator("ChangeNotifier", new ChangeAlert<String, Integer>());

changeNotifier.setPercentThreshold(1);

//TODO- add the partitioner code snippet here
//TODO - change the locality of the converter with the uniqCounter

//Add the overall counter
SumKeyVal<Integer, Integer> counter = dag.addOperator("GlobalCounter", new SumKeyVal<Integer, Integer>());
SumKeyVal<String, Integer> counter = dag.addOperator("GlobalCounter", new SumKeyVal<String, Integer>());
counter.setType(Integer.class);
counter.setCumulative(true);

Expand All @@ -91,11 +104,13 @@ public void populateDAG(DAG dag, Configuration conf)

dag.addStream("Global Counter", eventConvertor.output, counter.data);

dag.addStream("UniquesConv", uniqCount.count, converter.input);
dag.addStream("UniquesConv", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);

dag.addStream("KafkaConv", converter.output, uniqConv.data).setLocality(Locality.CONTAINER_LOCAL);

dag.addStream("KafkaConv", converter.output, uniqConv.data);
dag.addStream("Alerter", uniqConv.output, changeNotifier.data).setLocality(Locality.CONTAINER_LOCAL);

dag.addStream("kafkaUniqWriter", uniqConv.output, kafkaOutputOperator.inputPort).setLocality(
dag.addStream("kafkaUniqWriter", changeNotifier.alert, kafkaOutputOperator.inputPort).setLocality(
Locality.CONTAINER_LOCAL);

dag.addStream("KafkaOutputGlobalCounts", counter.sum, defaultConv.data);
Expand Down
155 changes: 155 additions & 0 deletions WeatherStreamingApp/src/main/java/com/dt/weather/app/WeatherApp.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,155 @@
/**
* Put your copyright and license info here.
*/
package com.dt.weather.app;

import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.Context;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.LocalMode;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.common.partitioner.StatelessPartitioner;
import com.datatorrent.contrib.kafka.KafkaSinglePortOutputOperator;
import com.datatorrent.stram.engine.PortContext;
import com.dt.weather.constants.WeatherConstants;
import com.dt.weather.converter.OutputConverter;
import com.dt.weather.counter.KeyValChangeAggregator;
import com.dt.weather.counter.KeyValChangeAlert;
import com.dt.weather.event.convertor.SinglePortWeatherEventConvertor;
import com.dt.weather.input.SimpleFileReader;

/**
 * Streaming application that reads weather records from files, converts them to events,
 * aggregates them per key and publishes the aggregator's alerts to a Kafka topic.
 *
 * <p>DAG wiring (see {@link #populateDAG(DAG, Configuration)}):
 * FileReader -> WeatherEventConv -> GlobalCounter -> Converter -> KafkaOutputUniques.
 */
@ApplicationAnnotation(name = "WeatherApp")
public class WeatherApp implements StreamingApplication
{
  /** Input directory used when {@code WeatherConstants.INPUT_DIRECTORY_PATH} is not configured. */
  private static final String DEFAULT_INPUT_DIRECTORY = "/Users/dev/workspace/mydtapp/src/test/resources/data/";

  /** Properties file used by {@link #main(String[])} when no path is passed on the command line. */
  private static final String DEFAULT_LOCAL_PROPERTIES =
      "/Users/dev/workspace/mydtapp/src/test/resources/localmode.properties";

  /**
   * Adds the operators and streams of the application to the given DAG.
   *
   * @param dag  the DAG to populate
   * @param conf configuration supplying the application name, input directory, Kafka topic
   *             and Kafka producer settings; every lookup has a default
   */
  @Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    dag.setAttribute(DAG.APPLICATION_NAME, conf.get(WeatherConstants.APP_NAME, "WeatherApp"));

    /* File reader */
    SimpleFileReader fileReader = dag.addOperator("FileReader", new SimpleFileReader());
    fileReader.setDirectory(conf.get(WeatherConstants.INPUT_DIRECTORY_PATH, DEFAULT_INPUT_DIRECTORY));

    //Uncomment the rename logic in simple file reader when the regex works
    // fileReader.getScanner().setFilePatternRegexp("\\*.json");

    fileReader.setScanIntervalMillis(0);
    fileReader.setEmitBatchSize(1);

    /* Event convertor */
    SinglePortWeatherEventConvertor eventConvertor = dag.addOperator("WeatherEventConv",
        new SinglePortWeatherEventConvertor());

    /* Overall counter, statically split into three partitions */
    KeyValChangeAggregator<String, Integer> counter = dag.addOperator("GlobalCounter",
        new KeyValChangeAggregator<String, Integer>());
    counter.setType(Integer.class);
    counter.setCumulative(true);
    dag.setAttribute(counter, Context.OperatorContext.PARTITIONER,
        new StatelessPartitioner<KeyValChangeAggregator<String, Integer>>(3));

    OutputConverter<String, Integer> opConv = dag.addOperator("Converter", new OutputConverter<String, Integer>());

    /* Kafka output */
    KafkaSinglePortOutputOperator<Object, Object> kafkaOutputOperator = dag.addOperator("KafkaOutputUniques",
        new KafkaSinglePortOutputOperator<Object, Object>());
    kafkaOutputOperator.setConfigProperties(getProducerProperties(conf));
    kafkaOutputOperator.setTopic(conf.get(WeatherConstants.TOPIC, "counter"));

    /* Assemble the DAG */
    dag.addStream("InputRecords", fileReader.output, eventConvertor.data).setLocality(Locality.THREAD_LOCAL);
    dag.addStream("Global Counter", eventConvertor.output, counter.data);
    dag.addStream("Unifier Output", counter.alert, opConv.data);
    dag.addStream("Convert Output", opConv.output, kafkaOutputOperator.inputPort);
  }

  /**
   * Builds the Kafka producer properties.
   *
   * @param conf configuration supplying the broker list and metadata refresh interval
   *             (defaults: {@code localhost:9092} and {@code 60000})
   * @return the producer properties handed to the Kafka output operator
   */
  private Properties getProducerProperties(Configuration conf)
  {
    String brokerList = conf.get(WeatherConstants.BROKERSET, "localhost:9092");
    String metaData = conf.get(WeatherConstants.META_DATA_REFRESH, "60000");
    // TODO: get rid of hard coded keys
    Properties props = new Properties();
    props.setProperty("metadata.broker.list", brokerList);
    props.setProperty("topic.metadata.refresh.interval.ms", metaData);
    props.setProperty("producer.type", "async");
    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
    props.setProperty("queue.buffering.max.ms", "200");
    props.setProperty("queue.buffering.max.messages", "10");
    props.setProperty("batch.num.messages", "5");

    return props;
  }

  /**
   * Loads a {@code java.util.Properties} file into a fresh {@link Configuration} that has no
   * default resources.
   *
   * <p>Loading is best effort: if the file cannot be opened or read, the stack trace is printed
   * and the configuration built so far (possibly empty) is returned.
   *
   * @param fileName path of the properties file
   * @return a configuration containing every key/value pair that was read
   */
  public static Configuration readPropertiesFile(String fileName)
  {
    Configuration config = new Configuration(false);
    Properties prop = new Properties();
    InputStream input = null;

    try {
      input = new FileInputStream(fileName);
      prop.load(input);

      for (String key : prop.stringPropertyNames()) {
        config.set(key, prop.getProperty(key));
      }
    } catch (IOException ex) {
      ex.printStackTrace();
    } finally {
      if (input != null) {
        try {
          input.close();
        } catch (IOException e) {
          e.printStackTrace();
        }
      }
    }

    return config;
  }

  /**
   * Runs this application in local mode for ten seconds.
   *
   * @param args optional; {@code args[0]} is the path of the properties file to load. When it is
   *             absent the development default is used.
   * @throws Exception if the DAG cannot be prepared or run
   */
  public static void main(String[] args) throws Exception
  {
    String propertiesFile = (args != null && args.length > 0) ? args[0] : DEFAULT_LOCAL_PROPERTIES;
    Configuration conf = readPropertiesFile(propertiesFile);

    LocalMode lma = LocalMode.newInstance();
    // Launch this class's DAG; readPropertiesFile is static, so no instance is needed to call it.
    lma.prepareDAG(new WeatherApp(), conf);

    LocalMode.Controller lc = lma.getController();
    lc.setHeartbeatMonitoringEnabled(false);
    lc.run(10000);
  }

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ public class WeatherConstants
public static final String BROKERSET = "dt.brokerSet";
public static final String META_DATA_REFRESH = "metadataRefresh";
public static final String TOPIC = "dt.topic.name";
public static final String WEATHER_DESC = "description";
public static final String RECORD_SEPARATOR = ",";
public static final String TUPLE_SEPARATOR = ":";


}
Original file line number Diff line number Diff line change
@@ -1,32 +1,95 @@
package com.dt.weather.converter;

import java.util.ArrayList;
import java.util.List;
import java.util.ListIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.datatorrent.api.DefaultInputPort;
import com.datatorrent.api.DefaultOutputPort;
import com.datatorrent.common.util.BaseOperator;
import com.datatorrent.lib.util.KeyValPair;
import com.dt.weather.constants.WeatherConstants;

public class DefaultConverter extends BaseOperator
{
private static final Logger LOG = LoggerFactory.getLogger(DefaultConverter.class);

// Buffer of the tuples received so far in the current window. This is an instance field on
// purpose: the constructor and beginWindow() replace the list, so a static field would let
// operator instances living in the same JVM overwrite and concurrently mutate one ArrayList.
private transient List<KeyValPair<String, Integer>> defaultCache = null;

/**
 * Input port for integer-keyed pairs. Each non-null pair is rendered as
 * {@code <Global,key,value >} and emitted on {@code output}; null tuples are ignored.
 */
public final transient DefaultInputPort<KeyValPair<Integer,Integer>> data = new DefaultInputPort<KeyValPair<Integer,Integer>>()
{
  @Override
  public void process(KeyValPair<Integer,Integer> tuple)
  {
    if (tuple == null) {
      return;
    }

    StringBuilder record = new StringBuilder();
    record.append("<Global,");
    record.append(tuple.getKey());
    record.append(",");
    record.append(tuple.getValue());
    record.append(" >");

    output.emit(record.toString());
  }
};

// Output port on which the formatted String records are emitted.
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();
}
// Running total of the pair values visited in endWindow(). In the visible code it is only
// accumulated and never read.
private volatile int sum =0;

public DefaultConverter()
{
defaultCache = new ArrayList<KeyValPair<String, Integer>>();
defaultCache.add(new KeyValPair<String, Integer>("Unique descritions", 0));
}

/**
 * Input port for string-keyed pairs. Non-null tuples are buffered in {@code defaultCache};
 * the buffer is written out by {@code endWindow()}. Null tuples are ignored.
 */
public final transient DefaultInputPort<KeyValPair<String, Integer>> data = new DefaultInputPort<KeyValPair<String, Integer>>()
{
  @Override
  public void process(KeyValPair<String, Integer> tuple)
  {
    if (tuple == null) {
      return;
    }
    // Only buffer here; the flush happens at the end of the window.
    defaultCache.add(tuple);
  }
};

// Output port on which endWindow() emits the per-window record as a String.
public final transient DefaultOutputPort<String> output = new DefaultOutputPort<String>();

/**
 * Starts the window with a newly allocated, empty tuple buffer.
 *
 * @param windowId identifier of the window that is starting (not used here)
 */
@Override
public void beginWindow(long windowId)
{
  List<KeyValPair<String, Integer>> freshCache = new ArrayList<KeyValPair<String, Integer>>();
  defaultCache = freshCache;
}

/**
 * {@inheritDoc}
 *
 * <p>Emits one record for the tuples buffered during the window and then empties the buffer.
 * The record has the form {@code [time:<epoch seconds>,<key>:<value>,...]}: entries are
 * preceded by {@code WeatherConstants.RECORD_SEPARATOR}, and key and value are joined by
 * {@code WeatherConstants.TUPLE_SEPARATOR}. A record is emitted even when the buffer is empty.
 */
@Override
public void endWindow()
{
  StringBuilder otuple = new StringBuilder();

  otuple.append("[");
  // Wall-clock time in whole seconds since the epoch.
  otuple.append("time:").append(System.currentTimeMillis() / 1000);

  for (KeyValPair<String, Integer> opair : defaultCache) {
    sum += opair.getValue();
    otuple.append(WeatherConstants.RECORD_SEPARATOR)
        .append(opair.getKey())
        .append(WeatherConstants.TUPLE_SEPARATOR)
        .append(opair.getValue());
  }

  otuple.append("]");

  output.emit(otuple.toString());

  defaultCache.clear();
}

/**
* {@inheritDoc}
*/
@Override
public void teardown()
{
// Intentionally empty: this override performs no cleanup.
}

}
Loading