From 1db3921b2024b7120cd98e2ca348f89a79c61661 Mon Sep 17 00:00:00 2001 From: Josh Buffum Date: Tue, 10 Dec 2013 13:30:30 -0800 Subject: [PATCH 1/6] hbase scalding Store based on maple/storehaus --- project/Build.scala | 11 +- .../summingbird/example/ScaldingRunner.scala | 107 +++++++++++ .../scalding/HBaseVersionedStore.scala | 173 ++++++++++++++++++ 3 files changed, 287 insertions(+), 4 deletions(-) create mode 100644 summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala create mode 100644 summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala diff --git a/project/Build.scala b/project/Build.scala index dc7c0b765..59c7b4575 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -38,7 +38,8 @@ object SummingbirdBuild extends Build { Opts.resolver.sonatypeReleases, "Clojars Repository" at "http://clojars.org/repo", "Conjars Repository" at "http://conjars.org/repo", - "Twitter Maven" at "http://maven.twttr.com" + "Twitter Maven" at "http://maven.twttr.com", + "Apache" at "http://repo.maven.apache.org/maven2" ), parallelExecution in Test := true, @@ -127,7 +128,7 @@ object SummingbirdBuild extends Build { val utilVersion = "6.3.8" val chillVersion = "0.3.3" val tormentaVersion = "0.5.4" - + val hbaseVersion = "0.94.6" lazy val slf4jVersion = "1.6.6" /** @@ -229,7 +230,9 @@ object SummingbirdBuild extends Build { "com.twitter" %% "chill-bijection" % chillVersion, "commons-lang" % "commons-lang" % "2.6", "com.twitter" %% "scalding-core" % scaldingVersion, - "com.twitter" %% "scalding-commons" % scaldingVersion + "com.twitter" %% "scalding-commons" % scaldingVersion, + "com.twitter" %% "storehaus-hbase" % storehausVersion, + "org.apache.hbase" % "hbase" % hbaseVersion ) ).dependsOn( summingbirdCore % "test->test;compile->compile", @@ -251,5 +254,5 @@ object SummingbirdBuild extends Build { "com.twitter" %% "tormenta-twitter" % tormentaVersion exclude("org.slf4j", "log4j-over-slf4j") exclude("ch.qos.logback", "logback-classic"), "com.twitter" %% "storehaus-memcache" % storehausVersion ) - ).dependsOn(summingbirdCore, summingbirdStorm) + ).dependsOn(summingbirdCore, summingbirdStorm, summingbirdScalding) } diff --git a/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala b/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala new file mode 100644 index 000000000..36b90c983 --- /dev/null +++ b/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala @@ -0,0 +1,107 @@ +package com.twitter.summingbird.example + +import com.twitter.scalding.{ Hdfs, TextLine } +import com.twitter.summingbird.batch.{ Batcher, Timestamp, BatchID } +import com.twitter.summingbird.scalding.{ InitialBatchedStore, Scalding, HBaseVersionedStore } +import com.twitter.summingbird.scalding.state.HDFSState +import com.twitter.summingbird.scalding.store.VersionedStore +import com.twitter.summingbird.{ Platform, Producer, TimeExtractor } +import com.twitter.storehaus.ReadableStore +import com.twitter.util.Await +import java.util.Date +import org.apache.hadoop.conf.Configuration + +/** + * The following object contains code to execute a similar Scalding + * job to the WordCount job defined in ExampleJob.scala. This job works + * on plain text files, as opposed to tweet Status objects. + * The example job uses a Store on top of HBase. This does require you to + * set up a local running hbase with zookeeper. + * + * @author Josh Buffum + * @author Riju Kallivalappil + */ + +object ScaldingRunner { + final val MillisInHour = 60 * 60 * 1000 + + /** + * Directory location to store state and read input file. + */ + final val JobDir = "/user/mydir/wordcount" + + /** + * pull in the serialization injections and WordCount job + */ + import Serialization._ + + implicit val batcher = Batcher.ofHours(1) + + // taken from ExampleJob + def tokenize(text: String) : TraversableOnce[String] = + text.toLowerCase + .replaceAll("[^a-zA-Z0-9\\s]", "") + .split("\\s+") + + /** + * The actual Summingbird job. Works against text instead of tweet Status + */ + def wordCount[P <: Platform[P]](source: Producer[P, String], store: P#Store[String, Long]) = { + source + .filter(_ != null) + .flatMap { text: String => tokenize(text).map(_ -> 1L) } + .sumByKey(store) + } + + // Always use an hour before the current time as the batch id. + // The storm job uses the current hour. This way we can get the "merger" to work across 2 batches + implicit val timeOf: TimeExtractor[String] = TimeExtractor(_ => new Date().getTime - MillisInHour) + + val now = System.currentTimeMillis + val waitingState = HDFSState(JobDir + "/waitstate", startTime = Some(Timestamp(now - 2 * MillisInHour)), + numBatches = 3) + + // read text lines in input.txt as job input + val src = Producer.source[Scalding, String](Scalding.pipeFactoryExact(_ => TextLine(JobDir + "/input.txt"))) + + /** + * Create the HBaseVersionedStore. Results from the Scalding job will be written + * as String => (BatchID, Long) pairs into a HBase cluster defined in a Zookeeper + * quorum at "localhost" in a table "wordcountJob" + */ + val versionedStore = HBaseVersionedStore[String, Long] ( + Seq("localhost"), + "wordcountJob" + ) + + /** + * wrap the HBaseVersionedStore with an InitialBatchedStore to take care of the early batches + */ + val store = new InitialBatchedStore(batcher.currentBatch - 2L, versionedStore) + val mode = Hdfs(false, new Configuration()) + + /** + * main + * Create the Scalding job and run it + */ + def main(args: Array[String]) { + val job = Scalding("wordcountJob") + job.run(waitingState, mode, job.plan(wordCount[Scalding](src, store))) + } + + def getReadableStore(batchOffset: Int = 0)(implicit batcher: Batcher) = { + versionedStore.asInstanceOf[ReadableStore[String, (BatchID,Long)]] + } + + /** + * lookup a Key value in the HBase store + */ + def lookup(key: String) : Option[(BatchID, Long)] = { + val reader = getReadableStore() + + Await.result { + reader.get(key) + } + } + +} diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala new file mode 100644 index 000000000..e0c1ba422 --- /dev/null +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala @@ -0,0 +1,173 @@ +package com.twitter.summingbird.scalding + +import cascading.flow.FlowDef +import cascading.tap.Tap +import cascading.tuple.Fields +import com.twitter.algebird.monad.Reader +import com.twitter.bijection.{Injection, AbstractInjection, Bufferable, Codec} +import com.twitter.bijection.Inversion.attempt +import com.twitter.maple.hbase.{HBaseScheme, HBaseTap} +import com.twitter.scalding.{AccessMode, Dsl, Mappable, Mode, Source, TupleConverter, TupleSetter, TypedPipe} +import com.twitter.scalding.typed.TypedSink +import com.twitter.storehaus.hbase.HBaseByteArrayStore +import com.twitter.storehaus.ReadableStore +import com.twitter.summingbird.batch.{Batcher, BatchID} +import com.twitter.util.{Await, Future} +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.hbase.io.ImmutableBytesWritable +import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } +import scala.util.{Failure, Success} + +/** + * Scalding implementation of the batch read and write components of a + * store that uses the VersionedKeyValSource from scalding-commons. + * + * @author Josh Buffum + */ + + +object HBaseVersionedStore { + + def apply[K, V](quorum: Seq[String], + table: String)( + implicit + batcher: Batcher, + injection: Injection[(K, (BatchID,V)), (Array[Byte], Array[Byte])], + keyInj: Injection[K, Array[Byte]], + valueInj: Injection[(BatchID,V), Array[Byte]], + ordering: Ordering[K]): HBaseVersionedStore[K, V, K, (BatchID,V)] = { + new HBaseVersionedStore[K, V, K, (BatchID,V)](quorum, table, batcher)( + { case (batchID, (k, v)) => (k, (batchID.next, v)) })( + { case (k, (batchID, v)) => (batchID, (k, v)) }) + } +} + +class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], + table: String, + override val batcher: Batcher) + (pack: (BatchID, (K, V)) => (K2, V2)) + (unpack: ((K2, V2)) => (BatchID, (K,V)))( + implicit + injection: Injection[(K2, V2), (Array[Byte], Array[Byte])], + keyInj: Injection[K, Array[Byte]], + valueInj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) + extends BatchedScaldingStore[K, V] with ReadableStore[K,V2] +{ self => + + val KeyColumnName = "key" + val ValColumnName = "value" + val ColumnFamily = "versionedstore" + + val scheme = new HBaseScheme(new Fields(KeyColumnName), ColumnFamily, new Fields(ValColumnName)) + + implicit val b2immutable: Injection[Array[Byte], ImmutableBytesWritable] = + new AbstractInjection[Array[Byte], ImmutableBytesWritable] { + def apply(a: Array[Byte]) = new ImmutableBytesWritable(a) + override def invert(i: ImmutableBytesWritable) = attempt(i)(_.get()) + } + + implicit def kvpInjection: Injection[(K2, V2), (ImmutableBytesWritable,ImmutableBytesWritable)] = { + Injection.connect[(K2,V2), (Array[Byte],Array[Byte]), (ImmutableBytesWritable,ImmutableBytesWritable)] + } + + // this is only used for client queries and does not need to be serialized out + // during the scalding job + @transient val hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) + hbaseStore.createTableIfRequired + + /** + * Exposes a stream with the (K,V) pairs from the highest batchID less than + * the input "exclusiveUB" batchID. See readVersions() for the creation of this stream + * This method is called by BatchedScaldingStore.merge + */ + override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = { + hbaseStore.createTableIfRequired + val flow = readVersions(exclusiveUB) + + if( flow != Nil ) { + Right((exclusiveUB, flow)) + } else { + Left(List("No last batch available < %s for HBaseVersionedStore()".format(exclusiveUB))) + } + } + + + def readVersions(exclusiveUB: BatchID): FlowProducer[TypedPipe[(K, V)]] = Reader { (flowMode: (FlowDef, Mode)) => + val mappable = new HBaseVersionedSource[K2, V2](table, scheme) + + val filtered = TypedPipe.from(mappable)(flowMode._1, flowMode._2) + .map{x: (K2, V2) => unpack(x)} + .filter{ _._1 < exclusiveUB } // (BatchID, (K,V) + .map{unpacked: (BatchID,(K,V)) => (unpacked._2._1,(unpacked._1,unpacked._2._2))} // (K, (BatchID,V) + + object BatchOrdering extends Ordering[(BatchID,V)] { + def compare (a: (BatchID,V), b: (BatchID,V)) = { + a._1 compare b._1 + } + } + + implicit def batchOrder = BatchOrdering; + + filtered + .group + .max + .map{x: (K, (BatchID,V)) => (x._1, x._2._2)} + } + + + /** + * write the (K, V) pairs aggregated up to batchID (inclusive) into the + * BatchedScaldingStore. In our case, this BatchedScaldingStore uses HBase + * as the mechanism to actually store data + * + * The data is written in serialized pairs of (K, (BatchID, V)) + */ + override def writeLast(batchID: BatchID, lastVals: TypedPipe[(K, V)])(implicit flowDef: FlowDef, mode: Mode): Unit = { + import Dsl._ + + lastVals.map{x: (K,V) => Injection[(K2,V2),(ImmutableBytesWritable,ImmutableBytesWritable)](pack(batchID, x))} + .toPipe(new Fields(KeyColumnName,ValColumnName)) + .write(new HBaseVersionedSource[K2, V2](table, scheme)) + } + + /* overridden methods for ReadableStore[K, V2] */ + override def get(k: K): Future[Option[V2]] = { + + val keyBytes = Injection[K, Array[Byte]](k) + val valBytes = Await.result(hbaseStore.get(keyBytes)) + + val v2Val: Option[V2] = valBytes match { + case Some(bytes) => { + Injection.invert[V2, Array[Byte]](bytes) match { + case Success(deserialized) => Option(deserialized) + } + } + // V2 is (BatchID, V) + case _ => Option((new BatchID(0L), None).asInstanceOf[V2]) + } + + Future.value(v2Val) + } + + + override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V2]]] = { + ks.map{ k => (k, self.get(k)) } + .toMap + } + +} + + +class HBaseVersionedSource[K, V](table: String, + scheme: HBaseScheme )( + implicit injection: Injection[(K, V), (Array[Byte], Array[Byte])]) + extends Source with Mappable[(K,V)] with TypedSink[(K,V)] +{ + override def converter[U >: (K, V)] = TupleConverter.asSuperConverter[(K, V), U](TupleConverter.of[(K, V)]) + + override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K,V)]) + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_,_,_] = { + (new HBaseTap(table, scheme)).asInstanceOf[Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]]] + } +} From 05550106af7464385d16317e829f0f0f1e7eccab Mon Sep 17 00:00:00 2001 From: Josh Buffum Date: Thu, 12 Dec 2013 15:44:56 -0800 Subject: [PATCH 2/6] incorporate code review feedback --- project/Build.scala | 2 +- .../summingbird/example/ScaldingRunner.scala | 6 +- .../scalding/HBaseVersionedStore.scala | 63 ++++++++++--------- 3 files changed, 35 insertions(+), 36 deletions(-) diff --git a/project/Build.scala b/project/Build.scala index 59c7b4575..ce7871379 100644 --- a/project/Build.scala +++ b/project/Build.scala @@ -232,7 +232,7 @@ object SummingbirdBuild extends Build { "com.twitter" %% "scalding-core" % scaldingVersion, "com.twitter" %% "scalding-commons" % scaldingVersion, "com.twitter" %% "storehaus-hbase" % storehausVersion, - "org.apache.hbase" % "hbase" % hbaseVersion + "org.apache.hbase" % "hbase" % hbaseVersion % "provided" ) ).dependsOn( summingbirdCore % "test->test;compile->compile", diff --git a/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala b/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala index 36b90c983..973d76874 100644 --- a/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala +++ b/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala @@ -89,15 +89,11 @@ object ScaldingRunner { job.run(waitingState, mode, job.plan(wordCount[Scalding](src, store))) } - def getReadableStore(batchOffset: Int = 0)(implicit batcher: Batcher) = { - versionedStore.asInstanceOf[ReadableStore[String, (BatchID,Long)]] - } - /** * lookup a Key value in the HBase store */ def lookup(key: String) : Option[(BatchID, Long)] = { - val reader = getReadableStore() + val reader = versionedStore.toReadableStore Await.result { reader.get(key) diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala index e0c1ba422..50fa5187d 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala @@ -4,7 +4,8 @@ import cascading.flow.FlowDef import cascading.tap.Tap import cascading.tuple.Fields import com.twitter.algebird.monad.Reader -import com.twitter.bijection.{Injection, AbstractInjection, Bufferable, Codec} +import com.twitter.bijection.hbase.HBaseBijections.ImmutableBytesWritableBijection +import com.twitter.bijection.Injection import com.twitter.bijection.Inversion.attempt import com.twitter.maple.hbase.{HBaseScheme, HBaseTap} import com.twitter.scalding.{AccessMode, Dsl, Mappable, Mode, Source, TupleConverter, TupleSetter, TypedPipe} @@ -18,6 +19,8 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } import scala.util.{Failure, Success} +import Injection._ + /** * Scalding implementation of the batch read and write components of a * store that uses the VersionedKeyValSource from scalding-commons. @@ -50,30 +53,24 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], implicit injection: Injection[(K2, V2), (Array[Byte], Array[Byte])], keyInj: Injection[K, Array[Byte]], - valueInj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) - extends BatchedScaldingStore[K, V] with ReadableStore[K,V2] -{ self => - + valueInj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) extends BatchedScaldingStore[K, V] +{ val KeyColumnName = "key" val ValColumnName = "value" val ColumnFamily = "versionedstore" val scheme = new HBaseScheme(new Fields(KeyColumnName), ColumnFamily, new Fields(ValColumnName)) - - implicit val b2immutable: Injection[Array[Byte], ImmutableBytesWritable] = - new AbstractInjection[Array[Byte], ImmutableBytesWritable] { - def apply(a: Array[Byte]) = new ImmutableBytesWritable(a) - override def invert(i: ImmutableBytesWritable) = attempt(i)(_.get()) - } - + + implicit lazy val byteArray2BytesWritableInj : Injection[Array[Byte], ImmutableBytesWritable] = fromBijection[Array[Byte], ImmutableBytesWritable](ImmutableBytesWritableBijection[Array[Byte]]) + implicit def kvpInjection: Injection[(K2, V2), (ImmutableBytesWritable,ImmutableBytesWritable)] = { Injection.connect[(K2,V2), (Array[Byte],Array[Byte]), (ImmutableBytesWritable,ImmutableBytesWritable)] } + // this is only used for client queries and does not need to be serialized out // during the scalding job - @transient val hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) - hbaseStore.createTableIfRequired + @transient val hbaseStore = new HBaseStore (quorum, table, ColumnFamily, ValColumnName, true)(keyInj, valueInj) /** * Exposes a stream with the (K,V) pairs from the highest batchID less than @@ -81,14 +78,7 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], * This method is called by BatchedScaldingStore.merge */ override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = { - hbaseStore.createTableIfRequired - val flow = readVersions(exclusiveUB) - - if( flow != Nil ) { - Right((exclusiveUB, flow)) - } else { - Left(List("No last batch available < %s for HBaseVersionedStore()".format(exclusiveUB))) - } + Right((exclusiveUB, readVersions(exclusiveUB))) } @@ -100,13 +90,7 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], .filter{ _._1 < exclusiveUB } // (BatchID, (K,V) .map{unpacked: (BatchID,(K,V)) => (unpacked._2._1,(unpacked._1,unpacked._2._2))} // (K, (BatchID,V) - object BatchOrdering extends Ordering[(BatchID,V)] { - def compare (a: (BatchID,V), b: (BatchID,V)) = { - a._1 compare b._1 - } - } - - implicit def batchOrder = BatchOrdering; + implicit def batchOrderer = Ordering.by[(BatchID,V),BatchID](_._1) filtered .group @@ -130,7 +114,26 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], .write(new HBaseVersionedSource[K2, V2](table, scheme)) } - /* overridden methods for ReadableStore[K, V2] */ + + def toReadableStore: ReadableStore[K,V2] = { + hbaseStore.asInstanceOf[ReadableStore[K,V2]] + } + + +} + + +class HBaseStore [K, V2] (quorum: Seq[String], + table: String, + columnFamily: String, + valColumnName: String, + createTable: Boolean)( + implicit + keyInj: Injection[K, Array[Byte]], + valueInj: Injection[V2, Array[Byte]]) extends ReadableStore[K, V2] +{ self => + val hbaseStore = HBaseByteArrayStore(quorum, table, columnFamily, valColumnName, createTable) + /* overridden methods for ReadableStore[K, V2] */ override def get(k: K): Future[Option[V2]] = { val keyBytes = Injection[K, Array[Byte]](k) From 849b74851bad07a764b7ac163d2f5dc5d905ee43 Mon Sep 17 00:00:00 2001 From: Josh Buffum Date: Fri, 13 Dec 2013 10:53:07 -0800 Subject: [PATCH 3/6] used converetd HBaseByteArrayStore rather than new class --- .../summingbird/example/ScaldingRunner.scala | 2 +- .../scalding/HBaseVersionedStore.scala | 44 ++----------------- 2 files changed, 4 insertions(+), 42 deletions(-) diff --git a/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala b/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala index 973d76874..c4a6535d1 100644 --- a/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala +++ b/summingbird-example/src/main/scala/com/twitter/summingbird/example/ScaldingRunner.scala @@ -84,7 +84,7 @@ object ScaldingRunner { * main * Create the Scalding job and run it */ - def main(args: Array[String]) { + def runJob(args: Array[String]) { val job = Scalding("wordcountJob") job.run(waitingState, mode, job.plan(wordCount[Scalding](src, store))) } diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala index 50fa5187d..a6db61905 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala @@ -70,7 +70,8 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], // this is only used for client queries and does not need to be serialized out // during the scalding job - @transient val hbaseStore = new HBaseStore (quorum, table, ColumnFamily, ValColumnName, true)(keyInj, valueInj) + @transient val hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) + .convert[K,V2](keyInj)(valueInj) /** * Exposes a stream with the (K,V) pairs from the highest batchID less than @@ -117,47 +118,8 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], def toReadableStore: ReadableStore[K,V2] = { hbaseStore.asInstanceOf[ReadableStore[K,V2]] - } - - -} - + } -class HBaseStore [K, V2] (quorum: Seq[String], - table: String, - columnFamily: String, - valColumnName: String, - createTable: Boolean)( - implicit - keyInj: Injection[K, Array[Byte]], - valueInj: Injection[V2, Array[Byte]]) extends ReadableStore[K, V2] -{ self => - val hbaseStore = HBaseByteArrayStore(quorum, table, columnFamily, valColumnName, createTable) - /* overridden methods for ReadableStore[K, V2] */ - override def get(k: K): Future[Option[V2]] = { - - val keyBytes = Injection[K, Array[Byte]](k) - val valBytes = Await.result(hbaseStore.get(keyBytes)) - - val v2Val: Option[V2] = valBytes match { - case Some(bytes) => { - Injection.invert[V2, Array[Byte]](bytes) match { - case Success(deserialized) => Option(deserialized) - } - } - // V2 is (BatchID, V) - case _ => Option((new BatchID(0L), None).asInstanceOf[V2]) - } - - Future.value(v2Val) - } - - - override def multiGet[K1 <: K](ks: Set[K1]): Map[K1, Future[Option[V2]]] = { - ks.map{ k => (k, self.get(k)) } - .toMap - } - } From 205be314e30a629fe25334b14c1e587bf5c3c127 Mon Sep 17 00:00:00 2001 From: Josh Buffum Date: Mon, 16 Dec 2013 15:51:15 -0800 Subject: [PATCH 4/6] remove extra cast --- .../com/twitter/summingbird/scalding/HBaseVersionedStore.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala index a6db61905..6876edac5 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala @@ -117,7 +117,7 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], def toReadableStore: ReadableStore[K,V2] = { - hbaseStore.asInstanceOf[ReadableStore[K,V2]] + hbaseStore } } From c56b2a4ec3bcb6809634aa5bd171f3f34751c31b Mon Sep 17 00:00:00 2001 From: Josh Buffum Date: Thu, 9 Jan 2014 13:22:07 -0800 Subject: [PATCH 5/6] store/fetch last processed BatchID in ZK --- .../scalding/HBaseVersionedStore.scala | 130 ++++++++++++++---- 1 file changed, 104 insertions(+), 26 deletions(-) diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala index 6876edac5..bd7b7fb5a 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala @@ -13,35 +13,47 @@ import com.twitter.scalding.typed.TypedSink import com.twitter.storehaus.hbase.HBaseByteArrayStore import com.twitter.storehaus.ReadableStore import com.twitter.summingbird.batch.{Batcher, BatchID} +import com.twitter.summingbird.batch.BatchID.batchID2Bytes import com.twitter.util.{Await, Future} import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } +import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper, ZooDefs} +import org.apache.zookeeper.data.Stat import scala.util.{Failure, Success} import Injection._ /** * Scalding implementation of the batch read and write components of a - * store that uses the VersionedKeyValSource from scalding-commons. + * store that uses the HBase Tap from maple and HBase store from Storehaus * * @author Josh Buffum */ object HBaseVersionedStore { - + /** + * Returns a HBaseVersionedStore abstracts the store/retrieval + * of (K,V) pairs. These (K,V) pairs are associated with a + * BatchID by internally storing (K, (BatchID,V)). + * + * The packing function receives the inclusive upper BatchID being + * committed. We actually need to store the exclusive upper bound + * alongside the value, so the packing function calls + * batchID.next. On the unpack, we drop the batchID, so no + * off-by-one error arises. + */ def apply[K, V](quorum: Seq[String], - table: String)( + table: String)( implicit batcher: Batcher, - injection: Injection[(K, (BatchID,V)), (Array[Byte], Array[Byte])], keyInj: Injection[K, Array[Byte]], valueInj: Injection[(BatchID,V), Array[Byte]], ordering: Ordering[K]): HBaseVersionedStore[K, V, K, (BatchID,V)] = { new HBaseVersionedStore[K, V, K, (BatchID,V)](quorum, table, batcher)( { case (batchID, (k, v)) => (k, (batchID.next, v)) })( - { case (k, (batchID, v)) => (batchID, (k, v)) }) + { case (k, (batchID, v)) => (batchID, (k, v)) })(keyInj, keyInj, valueInj, ordering) } } @@ -51,8 +63,8 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], (pack: (BatchID, (K, V)) => (K2, V2)) (unpack: ((K2, V2)) => (BatchID, (K,V)))( implicit - injection: Injection[(K2, V2), (Array[Byte], Array[Byte])], keyInj: Injection[K, Array[Byte]], + key2Inj: Injection[K2, Array[Byte]], valueInj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) extends BatchedScaldingStore[K, V] { val KeyColumnName = "key" @@ -63,15 +75,19 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], implicit lazy val byteArray2BytesWritableInj : Injection[Array[Byte], ImmutableBytesWritable] = fromBijection[Array[Byte], ImmutableBytesWritable](ImmutableBytesWritableBijection[Array[Byte]]) + implicit def injection : Injection[(K2, V2), (Array[Byte], Array[Byte])] = tuple2[K2, V2, Array[Byte], Array[Byte]](key2Inj, valueInj) + implicit def kvpInjection: Injection[(K2, V2), (ImmutableBytesWritable,ImmutableBytesWritable)] = { Injection.connect[(K2,V2), (Array[Byte],Array[Byte]), (ImmutableBytesWritable,ImmutableBytesWritable)] } - // this is only used for client queries and does not need to be serialized out // during the scalding job - @transient val hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) + @transient def hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) .convert[K,V2](keyInj)(valueInj) + + // state store for last processed batchID (readLast) + @transient def zk = new HBaseStoreZKState(quorum, table) /** * Exposes a stream with the (K,V) pairs from the highest batchID less than @@ -79,7 +95,11 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], * This method is called by BatchedScaldingStore.merge */ override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = { - Right((exclusiveUB, readVersions(exclusiveUB))) + readLastBatchID match { + case batchID: BatchID if batchID < exclusiveUB => Right((exclusiveUB, readVersions(exclusiveUB))) + case _ => Left(List("No last batch available < %s for HBaseVersionedStore".format(exclusiveUB))) + } + } @@ -97,8 +117,7 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], .group .max .map{x: (K, (BatchID,V)) => (x._1, x._2._2)} - } - + } /** * write the (K, V) pairs aggregated up to batchID (inclusive) into the @@ -106,11 +125,28 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], * as the mechanism to actually store data * * The data is written in serialized pairs of (K, (BatchID, V)) + * + * In addition to writing the (K, (BatchID, V)) tuples, we also store away + * the last BatchID processed into ZooKeeper to be later used by readLastBatchID + * for flow planning. We do this as close to the time of writing the HBase as possible + * to try to keep the ZooKeeper and HBase state in sync. + * TODO: In the event that https://github.com/twitter/summingbird/issues/214 + * is resolved, it might be nice to see if we can include this task in a registered + * Watcher on the WaitingState */ override def writeLast(batchID: BatchID, lastVals: TypedPipe[(K, V)])(implicit flowDef: FlowDef, mode: Mode): Unit = { import Dsl._ - lastVals.map{x: (K,V) => Injection[(K2,V2),(ImmutableBytesWritable,ImmutableBytesWritable)](pack(batchID, x))} + var wroteLastBatch = false; + + lastVals + .map{x: (K,V) => { + if( !wroteLastBatch ) { + zk.setLastBatchID(batchID) + wroteLastBatch = true + } + Injection[(K2,V2),(ImmutableBytesWritable,ImmutableBytesWritable)](pack(batchID, x))} + } .toPipe(new Fields(KeyColumnName,ValColumnName)) .write(new HBaseVersionedSource[K2, V2](table, scheme)) } @@ -119,20 +155,62 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], def toReadableStore: ReadableStore[K,V2] = { hbaseStore } - -} - - -class HBaseVersionedSource[K, V](table: String, - scheme: HBaseScheme )( - implicit injection: Injection[(K, V), (Array[Byte], Array[Byte])]) - extends Source with Mappable[(K,V)] with TypedSink[(K,V)] -{ - override def converter[U >: (K, V)] = TupleConverter.asSuperConverter[(K, V), U](TupleConverter.of[(K, V)]) - - override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K,V)]) - override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_,_,_] = { - (new HBaseTap(table, scheme)).asInstanceOf[Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]]] + def readLastBatchID : Option[BatchID] = { + zk.getLastBatchID + } + + class HBaseStoreZKState (quorum: Seq[String], + table: String) + extends Watcher + { + val LastBatchIDZKPath = "/summingbird/" + table + "/lastBatchID" + val zkServers = quorum.mkString(",") + val DefaultZKSessionTimeout = 4000 + + val zk = new ZooKeeper(zkServers, DefaultZKSessionTimeout, this) + createZKPath(LastBatchIDZKPath.split("/"), 2) + + def createZKPath(subpaths: Array[String], idx: Int) { + val subpath = (subpaths.slice(0,idx).mkString("/")) + + if( zk.exists(subpath, false) == null ) { + zk.create(subpath, batchID2Bytes(BatchID.Min), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT) + } + + if( subpaths.size != (idx-2)) { + createZKPath(subpaths, idx+1) + } + } + + def setLastBatchID(batchID: BatchID) { + zk.setData(LastBatchIDZKPath, batchID2Bytes(batchID), -1) + } + + def getLastBatchID() : Option[BatchID] = { + val batchBytes = zk.getData(LastBatchIDZKPath, false, new Stat()) + + batchID2Bytes.invert(batchBytes) match { + case Success(batchID) => Some(batchID) + case _ => None + } + } + + override def process(event: WatchedEvent) = { + } + } + + private class HBaseVersionedSource[K, V](table: String, + scheme: HBaseScheme )( + implicit injection: Injection[(K, V), (Array[Byte], Array[Byte])]) + extends Source with Mappable[(K,V)] with TypedSink[(K,V)] + { + override def converter[U >: (K, V)] = TupleConverter.asSuperConverter[(K, V), U](TupleConverter.of[(K, V)]) + + override def setter[U <: (K, V)] = TupleSetter.asSubSetter[(K, V), U](TupleSetter.of[(K,V)]) + + override def createTap(readOrWrite: AccessMode)(implicit mode: Mode): Tap[_,_,_] = { + (new HBaseTap(table, scheme)).asInstanceOf[Tap[JobConf, RecordReader[_,_], OutputCollector[_,_]]] + } } } From cfb97d20adbdf78137127e2bcecedcd022f6939a Mon Sep 17 00:00:00 2001 From: Josh Buffum Date: Tue, 14 Jan 2014 13:24:51 -0800 Subject: [PATCH 6/6] apply code review feedback --- .../scalding/HBaseVersionedStore.scala | 51 +++++++++++-------- 1 file changed, 29 insertions(+), 22 deletions(-) diff --git a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala index bd7b7fb5a..c788efda2 100644 --- a/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala +++ b/summingbird-scalding/src/main/scala/com/twitter/summingbird/scalding/HBaseVersionedStore.scala @@ -5,7 +5,7 @@ import cascading.tap.Tap import cascading.tuple.Fields import com.twitter.algebird.monad.Reader import com.twitter.bijection.hbase.HBaseBijections.ImmutableBytesWritableBijection -import com.twitter.bijection.Injection +import com.twitter.bijection.{Bufferable, Injection} import com.twitter.bijection.Inversion.attempt import com.twitter.maple.hbase.{HBaseScheme, HBaseTap} import com.twitter.scalding.{AccessMode, Dsl, Mappable, Mode, Source, TupleConverter, TupleSetter, TypedPipe} @@ -20,7 +20,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable import org.apache.hadoop.mapred.{ JobConf, OutputCollector, RecordReader } import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooKeeper, ZooDefs} import org.apache.zookeeper.data.Stat -import scala.util.{Failure, Success} +import scala.util.{Failure, Success, Try} import Injection._ @@ -33,6 +33,7 @@ import Injection._ object HBaseVersionedStore { + /** * Returns a HBaseVersionedStore abstracts the store/retrieval * of (K,V) pairs. These (K,V) pairs are associated with a @@ -49,11 +50,15 @@ object HBaseVersionedStore { implicit batcher: Batcher, keyInj: Injection[K, Array[Byte]], - valueInj: Injection[(BatchID,V), Array[Byte]], + valueInj: Injection[V, Array[Byte]], ordering: Ordering[K]): HBaseVersionedStore[K, V, K, (BatchID,V)] = { + + implicit val buf = Bufferable.viaInjection[(BatchID, V), (Array[Byte], Array[Byte])] + def value2Inj : Injection[(BatchID, V), Array[Byte]] = Bufferable.injectionOf[(BatchID,V)] + new HBaseVersionedStore[K, V, K, (BatchID,V)](quorum, table, batcher)( { case (batchID, (k, v)) => (k, (batchID.next, v)) })( - { case (k, (batchID, v)) => (batchID, (k, v)) })(keyInj, keyInj, valueInj, ordering) + { case (k, (batchID, v)) => (batchID, (k, v)) })(keyInj, keyInj, value2Inj, value2Inj, ordering) } } @@ -65,7 +70,8 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], implicit keyInj: Injection[K, Array[Byte]], key2Inj: Injection[K2, Array[Byte]], - valueInj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) extends BatchedScaldingStore[K, V] + valueInj: Injection[(BatchID,V), Array[Byte]], + value2Inj: Injection[V2, Array[Byte]], override val ordering: Ordering[K]) extends BatchedScaldingStore[K, V] { val KeyColumnName = "key" val ValColumnName = "value" @@ -73,33 +79,32 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], val scheme = new HBaseScheme(new Fields(KeyColumnName), ColumnFamily, new Fields(ValColumnName)) - implicit lazy val byteArray2BytesWritableInj : Injection[Array[Byte], ImmutableBytesWritable] = fromBijection[Array[Byte], ImmutableBytesWritable](ImmutableBytesWritableBijection[Array[Byte]]) + implicit def byteArray2BytesWritableInj : Injection[Array[Byte], ImmutableBytesWritable] = fromBijection[Array[Byte], ImmutableBytesWritable](ImmutableBytesWritableBijection[Array[Byte]]) - implicit def injection : Injection[(K2, V2), (Array[Byte], Array[Byte])] = tuple2[K2, V2, Array[Byte], Array[Byte]](key2Inj, valueInj) + implicit def injection : Injection[(K2, V2), (Array[Byte], Array[Byte])] = tuple2[K2, V2, Array[Byte], Array[Byte]](key2Inj, value2Inj) implicit def kvpInjection: Injection[(K2, V2), (ImmutableBytesWritable,ImmutableBytesWritable)] = { Injection.connect[(K2,V2), (Array[Byte],Array[Byte]), (ImmutableBytesWritable,ImmutableBytesWritable)] } - // this is only used for client queries and does not need to be serialized out - // during the scalding job - @transient def hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) - .convert[K,V2](keyInj)(valueInj) + // storehaus store + def hbaseStore = HBaseByteArrayStore (quorum, table, ColumnFamily, ValColumnName, true) + .convert[K, (BatchID,V)](keyInj)(valueInj) // state store for last processed batchID (readLast) - @transient def zk = new HBaseStoreZKState(quorum, table) + def zk = new HBaseStoreZKState(quorum, table) /** * Exposes a stream with the (K,V) pairs from the highest batchID less than * the input "exclusiveUB" batchID. See readVersions() for the creation of this stream * This method is called by BatchedScaldingStore.merge */ - override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = { + override def readLast(exclusiveUB: BatchID, mode: Mode): Try[(BatchID, FlowProducer[TypedPipe[(K, V)]])] = { readLastBatchID match { - case batchID: BatchID if batchID < exclusiveUB => Right((exclusiveUB, readVersions(exclusiveUB))) - case _ => Left(List("No last batch available < %s for HBaseVersionedStore".format(exclusiveUB))) + case Some(batchID) if batchID < exclusiveUB => Right((exclusiveUB, readVersions(exclusiveUB))) + case Some(batchID) => Left(List("No last batch available < %s for HBaseVersionedStore".format(exclusiveUB))) + case None => Left(List("No last batch available for HBaseVersionedStore")) } - } @@ -128,8 +133,8 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], * * In addition to writing the (K, (BatchID, V)) tuples, we also store away * the last BatchID processed into ZooKeeper to be later used by readLastBatchID - * for flow planning. We do this as close to the time of writing the HBase as possible - * to try to keep the ZooKeeper and HBase state in sync. + * for flow planning. We do this as close to the time of writing to HBase as possible + * to try to keep ZooKeeper and HBase in sync. * TODO: In the event that https://github.com/twitter/summingbird/issues/214 * is resolved, it might be nice to see if we can include this task in a registered * Watcher on the WaitingState @@ -152,7 +157,7 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], } - def toReadableStore: ReadableStore[K,V2] = { + def toReadableStore: ReadableStore[K,(BatchID,V)] = { hbaseStore } @@ -183,8 +188,10 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], } } - def setLastBatchID(batchID: BatchID) { - zk.setData(LastBatchIDZKPath, batchID2Bytes(batchID), -1) + def setLastBatchID(batchID: BatchID) : scala.util.Try[Stat] = { + scala.util.Try { + zk.setData(LastBatchIDZKPath, batchID2Bytes(batchID), -1) + } } def getLastBatchID() : Option[BatchID] = { @@ -192,7 +199,7 @@ class HBaseVersionedStore [K, V, K2, V2](quorum: Seq[String], batchID2Bytes.invert(batchBytes) match { case Success(batchID) => Some(batchID) - case _ => None + case Failure(ex) => None } }