-
Notifications
You must be signed in to change notification settings - Fork 263
hbase scalding Store based on maple/storehaus #404
base: develop
Are you sure you want to change the base?
Changes from all commits
1db3921
13e7a6f
0555010
e488903
849b748
f1b3998
205be31
6fbe3cd
c56b2a4
ff90ee7
cfb97d2
ff0f97d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,103 @@ | ||
| package com.twitter.summingbird.example | ||
|
|
||
| import com.twitter.scalding.{ Hdfs, TextLine } | ||
| import com.twitter.summingbird.batch.{ Batcher, Timestamp, BatchID } | ||
| import com.twitter.summingbird.scalding.{ InitialBatchedStore, Scalding, HBaseVersionedStore } | ||
| import com.twitter.summingbird.scalding.state.HDFSState | ||
| import com.twitter.summingbird.scalding.store.VersionedStore | ||
| import com.twitter.summingbird.{ Platform, Producer, TimeExtractor } | ||
| import com.twitter.storehaus.ReadableStore | ||
| import com.twitter.util.Await | ||
| import java.util.Date | ||
| import org.apache.hadoop.conf.Configuration | ||
|
|
||
| /** | ||
| * The following object contains code to execute a similar Scalding | ||
| * job to the WordCount job defined in ExampleJob.scala. This job works | ||
| * on plain text files, as opposed to tweet Status objects. | ||
| * The example job uses a Store on top of HBase. This does require you to | ||
| * set up a local running hbase with zookeeper. | ||
| * | ||
| * @author Josh Buffum | ||
| * @author Riju Kallivalappil | ||
| */ | ||
|
|
||
| object ScaldingRunner { | ||
| final val MillisInHour = 60 * 60 * 1000 | ||
|
|
||
| /** | ||
| * Directory location to store state and read input file. | ||
| */ | ||
| final val JobDir = "/user/mydir/wordcount" | ||
|
|
||
| /** | ||
| * pull in the serialization injections and WordCount job | ||
| */ | ||
| import Serialization._ | ||
|
|
||
| implicit val batcher = Batcher.ofHours(1) | ||
|
|
||
| // taken from ExampleJob | ||
| def tokenize(text: String) : TraversableOnce[String] = | ||
| text.toLowerCase | ||
| .replaceAll("[^a-zA-Z0-9\\s]", "") | ||
| .split("\\s+") | ||
|
|
||
| /** | ||
| * The actual Summingbird job. Works against text instead of tweet Status | ||
| */ | ||
| def wordCount[P <: Platform[P]](source: Producer[P, String], store: P#Store[String, Long]) = { | ||
| source | ||
| .filter(_ != null) | ||
| .flatMap { text: String => tokenize(text).map(_ -> 1L) } | ||
| .sumByKey(store) | ||
| } | ||
|
|
||
| // Always use an hour before the current time as the batch id. | ||
| // The storm job uses the current hour. This way we can get the "merger" to work across 2 batches | ||
| implicit val timeOf: TimeExtractor[String] = TimeExtractor(_ => new Date().getTime - MillisInHour) | ||
|
|
||
| val now = System.currentTimeMillis | ||
| val waitingState = HDFSState(JobDir + "/waitstate", startTime = Some(Timestamp(now - 2 * MillisInHour)), | ||
| numBatches = 3) | ||
|
|
||
| // read text lines in input.txt as job input | ||
| val src = Producer.source[Scalding, String](Scalding.pipeFactoryExact(_ => TextLine(JobDir + "/input.txt"))) | ||
|
|
||
| /** | ||
| * Create the HBaseVersionedStore. Results from the Scalding job will be written | ||
| * as String => (BatchID, Long) pairs into a HBase cluster defined in a Zookeeper | ||
| * quorum at "localhost" in a table "wordcountJob" | ||
| */ | ||
| val versionedStore = HBaseVersionedStore[String, Long] ( | ||
| Seq("localhost"), | ||
| "wordcountJob" | ||
| ) | ||
|
|
||
| /** | ||
| * wrap the HBaseVersionedStore with an InitialBatchedStore to take care of the early batches | ||
| */ | ||
| val store = new InitialBatchedStore(batcher.currentBatch - 2L, versionedStore) | ||
| val mode = Hdfs(false, new Configuration()) | ||
|
|
||
| /** | ||
| * main | ||
| * Create the Scalding job and run it | ||
| */ | ||
| def runJob(args: Array[String]) { | ||
| val job = Scalding("wordcountJob") | ||
| job.run(waitingState, mode, job.plan(wordCount[Scalding](src, store))) | ||
| } | ||
|
|
||
| /** | ||
| * lookup a Key value in the HBase store | ||
| */ | ||
| def lookup(key: String) : Option[(BatchID, Long)] = { | ||
| val reader = versionedStore.toReadableStore | ||
|
|
||
| Await.result { | ||
| reader.get(key) | ||
| } | ||
| } | ||
|
|
||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,223 @@ | ||
| package com.twitter.summingbird.scalding | ||
|
|
||
| import cascading.flow.FlowDef | ||
| import cascading.tap.Tap | ||
| import cascading.tuple.Fields | ||
| import com.twitter.algebird.monad.Reader | ||
| import com.twitter.bijection.hbase.HBaseBijections.ImmutableBytesWritableBijection | ||
| import com.twitter.bijection.{Bufferable, Injection} | ||
| import com.twitter.bijection.Inversion.attempt | ||
| import com.twitter.maple.hbase.{HBaseScheme, HBaseTap} | ||
| import com.twitter.scalding.{AccessMode, Dsl, Mappable, Mode, Source, TupleConverter, TupleSetter, TypedPipe} | ||
| import com.twitter.scalding.typed.TypedSink | ||
| import com.twitter.storehaus.hbase.HBaseByteArrayStore | ||
| import com.twitter.storehaus.ReadableStore | ||
| import com.twitter.summingbird.batch.{Batcher, BatchID} | ||
| import com.twitter.summingbird.batch.BatchID.batchID2Bytes | ||
| import com.twitter.util.{Await, Future} | ||
| import org.apache.hadoop.conf.Configuration | ||
| import org.apache.hadoop.hbase.io.ImmutableBytesWritable | ||
| import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } | ||
| import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper, ZooDefs} | ||
| import org.apache.zookeeper.data.Stat | ||
| import scala.util.{Failure, Success, Try} | ||
|
|
||
| import Injection._ | ||
|
|
||
| /** | ||
| * Scalding implementation of the batch read and write components of a | ||
| * store that uses the HBase Tap from maple and HBase store from Storehaus | ||
| * | ||
| * @author Josh Buffum | ||
| */ | ||
|
|
||
|
|
||
| object HBaseVersionedStore { | ||
|
|
||
| /** | ||
| * Returns a HBaseVersionedStore abstracts the store/retrieval | ||
| * of (K,V) pairs. These (K,V) pairs are associated with a | ||
| * BatchID by internally storing (K, (BatchID,V)). | ||
| * | ||
| * The packing function receives the inclusive upper BatchID being | ||
| * committed. We actually need to store the exclusive upper bound | ||
| * alongside the value, so the packing function calls | ||
| * batchID.next. On the unpack, we drop the batchID, so no | ||
| * off-by-one error arises. | ||
| */ | ||
| def apply[K, V](quorum: Seq[String], | ||
| table: String)( | ||
| implicit | ||
| batcher: Batcher, | ||
| keyInj: Injection[K, Array[Byte]], | ||
| valueInj: Injection[V, Array[Byte]], | ||
| ordering: Ordering[K]): HBaseVersionedStore[K, V, K, (BatchID,V)] = { | ||
|
|
||
| implicit val buf = Bufferable.viaInjection[(BatchID, V), (Array[Byte], Array[Byte])] | ||
| def value2Inj : Injection[(BatchID, V), Array[Byte]] = Bufferable.injectionOf[(BatchID,V)] | ||
|
|
||
| new HBaseVersionedStore[K, V, K, (BatchID,V)](quorum, table, batcher)( | ||
| { case (batchID, (k, v)) => (k, (batchID.next, v)) })( | ||
| { case (k, (batchID, v)) => (batchID, (k, v)) })(keyInj, keyInj, value2Inj, value2Inj, ordering) | ||
| } | ||
| } | ||
|
|
||
| class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], | ||
| table: String, | ||
| override val batcher: Batcher) | ||
| (pack: (BatchID, (K, V)) => (K2, V2)) | ||
| (unpack: ((K2, V2)) => (BatchID, (K,V)))( | ||
| implicit | ||
| keyInj: Injection[K, Array[Byte]], | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. shouldn't this be K2? |
||
| key2Inj: Injection[K2, Array[Byte]], | ||
| valueInj: Injection[(BatchID,V), Array[Byte]], | ||
| value2Inj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) extends BatchedScaldingStore[K, V] | ||
| { | ||
| val KeyColumnName = "key" | ||
| val ValColumnName = "value" | ||
| val ColumnFamily = "versionedstore" | ||
|
|
||
| val scheme = new HBaseScheme(new Fields(KeyColumnName), ColumnFamily, new Fields(ValColumnName)) | ||
|
|
||
| implicit def byteArray2BytesWritableInj : Injection[Array[Byte], ImmutableBytesWritable] = fromBijection[Array[Byte], ImmutableBytesWritable](ImmutableBytesWritableBijection[Array[Byte]]) | ||
|
|
||
| implicit def injection : Injection[(K2, V2), (Array[Byte], Array[Byte])] = tuple2[K2, V2, Array[Byte], Array[Byte]](key2Inj, value2Inj) | ||
|
|
||
| implicit def kvpInjection: Injection[(K2, V2), (ImmutableBytesWritable,ImmutableBytesWritable)] = { | ||
| Injection.connect[(K2,V2), (Array[Byte],Array[Byte]), (ImmutableBytesWritable,ImmutableBytesWritable)] | ||
| } | ||
|
|
||
| // storehaus store | ||
| def hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) | ||
| .convert[K, (BatchID,V)](keyInj)(valueInj) | ||
|
|
||
| // state store for last processed batchID (readLast) | ||
| def zk = new HBaseStoreZKState(quorum, table) | ||
|
|
||
| /** | ||
| * Exposes a stream with the (K,V) pairs from the highest batchID less than | ||
| * the input "exclusiveUB" batchID. See readVersions() for the creation of this stream | ||
| * This method is called by BatchedScaldingStore.merge | ||
| */ | ||
| override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = { | ||
| readLastBatchID match { | ||
| case Some(batchID) if batchID < exclusiveUB => Right((exclusiveUB, readVersions(exclusiveUB))) | ||
| case Some(batchID) => Left(List("No last batch available < %s for HBaseVersionedStore".format(exclusiveUB))) | ||
| case None => Left(List("No last batch available for HBaseVersionedStore")) | ||
| } | ||
| } | ||
|
|
||
|
|
||
| def readVersions(exclusiveUB: BatchID): FlowProducer[TypedPipe[(K, V)]] = Reader { (flowMode: (FlowDef, Mode)) => | ||
| val mappable = new HBaseVersionedSource[K2, V2](table, scheme) | ||
|
|
||
| val filtered = TypedPipe.from(mappable)(flowMode._1, flowMode._2) | ||
| .map{x: (K2, V2) => unpack(x)} | ||
| .filter{ _._1 < exclusiveUB } // (BatchID, (K,V) | ||
| .map{unpacked: (BatchID,(K,V)) => (unpacked._2._1,(unpacked._1,unpacked._2._2))} // (K, (BatchID,V) | ||
|
|
||
| implicit def batchOrderer = Ordering.by[(BatchID,V),BatchID](_._1) | ||
|
|
||
| filtered | ||
| .group | ||
| .max | ||
| .map{x: (K, (BatchID,V)) => (x._1, x._2._2)} | ||
| } | ||
|
|
||
| /** | ||
| * write the (K, V) pairs aggregated up to batchID (inclusive) into the | ||
| * BatchedScaldingStore. In our case, this BatchedScaldingStore uses HBase | ||
| * as the mechanism to actually store data | ||
| * | ||
| * The data is written in serialized pairs of (K, (BatchID, V)) | ||
| * | ||
| * In addition to writing the (K, (BatchID, V)) tuples, we also store away | ||
| * the last BatchID processed into ZooKeeper to be later used by readLastBatchID | ||
| * for flow planning. We do this as close to the time of writing to HBase as possible | ||
| * to try to keep ZooKeeper and HBase in sync. | ||
| * TODO: In the event that https://github.com/twitter/summingbird/issues/214 | ||
| * is resolved, it might be nice to see if we can include this task in a registered | ||
| * Watcher on the WaitingState | ||
| */ | ||
| override def writeLast(batchID: BatchID, lastVals: TypedPipe[(K, V)])(implicit flowDef: FlowDef, mode: Mode): Unit = { | ||
| import Dsl._ | ||
|
|
||
| var wroteLastBatch = false; | ||
|
|
||
| lastVals | ||
| .map{x: (K,V) => { | ||
| if( !wroteLastBatch ) { | ||
| zk.setLastBatchID(batchID) | ||
| wroteLastBatch = true | ||
| } | ||
| Injection[(K2,V2),(ImmutableBytesWritable,ImmutableBytesWritable)](pack(batchID, x))} | ||
| } | ||
| .toPipe(new Fields(KeyColumnName,ValColumnName)) | ||
| .write(new HBaseVersionedSource[K2, V2](table, scheme)) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We also need to write some state that notes what we have finished batchID. |
||
| } | ||
|
|
||
|
|
||
| def toReadableStore: ReadableStore[K,(BatchID,V)] = { | ||
| hbaseStore | ||
| } | ||
|
|
||
| def readLastBatchID : Option[BatchID] = { | ||
| zk.getLastBatchID | ||
| } | ||
|
|
||
| class HBaseStoreZKState (quorum: Seq[String], | ||
| table: String) | ||
| extends Watcher | ||
| { | ||
| val LastBatchIDZKPath = "/summingbird/" + table + "/lastBatchID" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. maybe we want the users to pass the path?
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I didn't incorporate this feedback. I thought that the details of how/where the store put the state in ZK were internal details to the store and I preferred not to leak them out. If you feel strongly I can add that though. |
||
| val zkServers = quorum.mkString(",") | ||
| val DefaultZKSessionTimeout = 4000 | ||
|
|
||
| val zk = new ZooKeeper(zkServers, DefaultZKSessionTimeout, this) | ||
| createZKPath(LastBatchIDZKPath.split("/"), 2) | ||
|
|
||
| def createZKPath(subpaths: Array[String], idx: Int) { | ||
| val subpath = (subpaths.slice(0,idx).mkString("/")) | ||
|
|
||
| if( zk.exists(subpath, false) == null ) { | ||
| zk.create(subpath, batchID2Bytes(BatchID.Min), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) | ||
| } | ||
|
|
||
| if( subpaths.size != (idx-2)) { | ||
| createZKPath(subpaths, idx+1) | ||
| } | ||
| } | ||
|
|
||
| def setLastBatchID(batchID: BatchID) : scala.util.Try[Stat] = { | ||
| scala.util.Try { | ||
| zk.setData(LastBatchIDZKPath, batchID2Bytes(batchID), -1) | ||
| } | ||
| } | ||
|
|
||
| def getLastBatchID() : Option[BatchID] = { | ||
| val batchBytes = zk.getData(LastBatchIDZKPath, false, new Stat()) | ||
|
|
||
| batchID2Bytes.invert(batchBytes) match { | ||
| case Success(batchID) => Some(batchID) | ||
| case Failure(ex) => None | ||
| } | ||
| } | ||
|
|
||
| override def process(event: WatchedEvent) = { | ||
| } | ||
| } | ||
|
|
||
| private class HBaseVersionedSource[K, V](table: String, | ||
| scheme: HBaseScheme )( | ||
| implicit injection: Injection[(K, V), (Array[Byte], Array[Byte])]) | ||
| extends Source with Mappable[(K,V)] with TypedSink[(K,V)] | ||
| { | ||
| override def converter[U >: (K, V)] = TupleConverter.asSuperConverter[(K, V), U](TupleConverter.of[(K, V)]) | ||
|
|
||
| override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K,V)]) | ||
|
|
||
| override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_,_,_] = { | ||
| (new HBaseTap(table, scheme)).asInstanceOf[Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]]] | ||
| } | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can you comment the assymmetry on the .next? (I'm confused actually).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the delay. I'll add the comment here. Just for reference, I borrowed the logic here from https://github.com/twitter/summingbird/blob/develop/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/store/VersionedStore.scala. The comment I'll add will look very similar to the description of the VersionedStore object