Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 0 additions & 25 deletions tutorials/maprapp/README.md

This file was deleted.

41 changes: 0 additions & 41 deletions tutorials/maprapp/src/main/java/com/datatorrent/maprapp/Data.java

This file was deleted.

41 changes: 41 additions & 0 deletions tutorials/maprstreams/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
MapR Streams provides a way to deliver messages from a range of data
producer types (for instance IoT sensors, machine logs, clickstream
data) to consumers that include but are not limited to real-time or
near real-time processing applications. This uses the same API as of
Apache Kafka 0.9.
MapR-DB is a proprietary product from MapR and is supposed to use the
same API as of HBase. In fact both MapR Streams and MapR DB can be used
interchangeably with Kafka Streams and HBase respectively.
This sample application show how to read log data from a MapR Streams
using Kafka (0.9) input operator and write them out to MapR DB using
HBase output operator.

The purpose of this application is to demonstrate that the Kafka input
operator and HBase output operator in Apache Apex Malhar library can
be used for pulling data from MapR streams and writing into MapR DB.

Note: When using MapR streams or MapR DB in this application,
appropriate dependencies would need to be added in the pom.xml.
Additionally, the application needs to be configured and is not usable
as is.

A sample operator to parse JSON formatted data into POJO has been
inserted into this pipeline. Other processing operators can be
introduced depending upon the requirements.

###### MapR Streams Properties

1. Specifying topic in MapR Streams. Please note the name of topic starts
with Stream file path, followed by ":" and then Topic name.

Property > dt.application.MaprStreamsApp.operator.Streams.prop.topics
Sample Value > **/data/streams/sample-stream:sample-topic**

2. MapR Streams Clusters running at

Property > dt.application.MaprStreamsToMaprDB.operator.Streams.prop.clusters
Sample Value > **broker1.dtlab.com:9092**

###### MapR DB Properties

HBase output operator as configured in the Application.java file.
6 changes: 3 additions & 3 deletions tutorials/maprapp/pom.xml → tutorials/maprstreams/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@

<groupId>com.datatorrent</groupId>
<version>1.0-SNAPSHOT</version>
<artifactId>maprapp</artifactId>
<artifactId>maprstreams</artifactId>
<packaging>jar</packaging>

<!-- change these to the appropriate values -->
<name>MapR Streams to MapR DB</name>
<description>MapR Streams to MapR DB Application</description>
<name>MapR Streams Example App</name>
<description>MapR Streams Example App</description>

<properties>
<!-- change this if you desire to use a different version of Apex Core -->
Expand Down
Original file line number Diff line number Diff line change
@@ -1,31 +1,47 @@
/**
* Put your copyright and license info here.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.maprapp;

import java.util.ArrayList;
import java.util.List;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;

import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.contrib.hbase.HBaseFieldInfo;
import com.datatorrent.contrib.hbase.HBasePOJOPutOperator;
import com.datatorrent.contrib.hbase.HBaseStore;
import com.datatorrent.contrib.parser.JsonParser;
import com.datatorrent.lib.util.FieldInfo;
import com.datatorrent.lib.util.TableInfo;
import org.apache.apex.malhar.kafka.KafkaSinglePortInputOperator;
import org.apache.hadoop.conf.Configuration;

import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.DAG.Locality;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;

@ApplicationAnnotation(name="MaprStreamsToMaprDB")
@ApplicationAnnotation(name="MaprStreamsApp")
public class Application implements StreamingApplication
{
private void createHBaseTable(HBaseStore store)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.maprapp;

/**
* Data: POJO for data
*/
public class Data
{
private int id;
private String name;
private String message;

public int getId()
{
return id;
}

public void setId(int id)
{
this.id = id;
}

public String getName()
{
return name;
}

public void setName(String name)
{
this.name = name;
}

public String getMessage()
{
return message;
}

public void setMessage(String message)
{
this.message = message;
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,20 @@
/**
* Put your copyright and license info here.
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package com.datatorrent.maprapp;

Expand All @@ -8,12 +23,11 @@
import javax.validation.ConstraintViolationException;

import org.junit.Assert;
import org.junit.Test;

import org.apache.hadoop.conf.Configuration;
import org.junit.Test;

import com.datatorrent.api.LocalMode;
import com.datatorrent.maprapp.Application;

/**
* Test the DAG declaration in local mode.
Expand Down