From afa9a01c1179e86cb9e97b54d2860a211e36c1a5 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 20 Nov 2014 15:22:48 -0800 Subject: [PATCH 1/4] Implemented driver HA testing harness to test with filestream. --- lib/sparkperf/main.py | 2 +- .../project/StreamingTestsBuild.scala | 8 +- .../streaming/perf/DriverRecoveryTest.scala | 312 ++++++++++++++++++ .../perf/DriverRecoveryTestApp.scala | 136 ++++++++ ...ecoveryTest.scala => FileStreamTest.scala} | 2 +- .../scala/streaming/perf/KVDataTest.scala | 7 +- .../main/scala/streaming/perf/PerfTest.scala | 10 +- .../streaming/perf/util/FileGenerator.scala | 36 +- .../scala/streaming/perf/util/Utils.scala | 92 ++++-- .../src/resources/log4j.properties | 19 ++ .../perf/DriverRecoveryTestSuite.scala | 43 +++ .../scala/streaming/perf/UtilsSuite.scala | 35 ++ 12 files changed, 640 insertions(+), 62 deletions(-) create mode 100644 streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala create mode 100644 streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTestApp.scala rename streaming-tests/src/main/scala/streaming/perf/{HdfsRecoveryTest.scala => FileStreamTest.scala} (98%) create mode 100644 streaming-tests/src/resources/log4j.properties create mode 100644 streaming-tests/src/test/scala/streaming/perf/DriverRecoveryTestSuite.scala create mode 100644 streaming-tests/src/test/scala/streaming/perf/UtilsSuite.scala diff --git a/lib/sparkperf/main.py b/lib/sparkperf/main.py index 1d09080..ba94395 100755 --- a/lib/sparkperf/main.py +++ b/lib/sparkperf/main.py @@ -131,6 +131,6 @@ config.MLLIB_OUTPUT_FILENAME) print("All tests have finished running. Stopping Spark standalone cluster ...") -cluster.stop() +# cluster.stop() print("Finished running all tests.") diff --git a/streaming-tests/project/StreamingTestsBuild.scala b/streaming-tests/project/StreamingTestsBuild.scala index 3c30915..c30983c 100644 --- a/streaming-tests/project/StreamingTestsBuild.scala +++ b/streaming-tests/project/StreamingTestsBuild.scala @@ -16,11 +16,11 @@ object StreamingTestsBuild extends Build { "org.scalatest" %% "scalatest" % "2.2.1" % "test", "com.google.guava" % "guava" % "14.0.1", "org.slf4j" % "slf4j-log4j12" % "1.7.2", - "org.apache.spark" %% "spark-core" % "1.0.0" % "provided", - "org.apache.spark" %% "spark-streaming" % "1.0.0" % "provided" + "org.apache.spark" %% "spark-core" % "1.2.0-SNAPSHOT" % "provided", + "org.apache.spark" %% "spark-streaming" % "1.2.0-SNAPSHOT" % "provided" ), test in assembly := {}, - outputPath in assembly := file("target/spark-perf-tests-assembly.jar"), + outputPath in assembly := file("target/streaming-perf-tests-assembly.jar"), assemblyOption in assembly ~= { _.copy(includeScala = false) }, mergeStrategy in assembly := { case PathList("META-INF", xs@_*) => @@ -37,4 +37,4 @@ object StreamingTestsBuild extends Build { case _ => MergeStrategy.first } )) -} \ No newline at end of file +} diff --git a/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala b/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala new file mode 100644 index 0000000..2620fef --- /dev/null +++ b/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala @@ -0,0 +1,312 @@ +package streaming.perf + +import java.io.File + +import scala.collection.JavaConversions._ +import scala.collection.mutable.ArrayBuffer +import scala.sys.process._ +import scala.util.Random + +import akka.actor._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.spark.SparkConf 
+import org.apache.spark.deploy.Client +import streaming.perf.util._ + +sealed trait DriverRecoveryTestMessage extends Serializable +object Ready extends DriverRecoveryTestMessage +case class RegisterApp(appActor: ActorRef) extends DriverRecoveryTestMessage +object DeregisterApp extends DriverRecoveryTestMessage +case class GotCounts(time: Long, counts: Array[(String, Long)]) extends DriverRecoveryTestMessage +case class KillApp(exitCode: Int) extends DriverRecoveryTestMessage + +class DriverRecoveryTestActor(recoveryTest: DriverRecoveryTest) extends Actor { + + var appActor: ActorRef = null + + def receive = { + case RegisterApp(actor) => + recoveryTest.onRegister() + appActor = actor + sender ! true + + case DeregisterApp => + recoveryTest.onDeregister() + appActor = null + sender ! true + + case GotCounts(time, counts) => + recoveryTest.onGotCounts(time, counts) + + case KillApp(exitCode) => + if (appActor != null) { + appActor ! KillApp(exitCode) + } + + case Ready => + sender ! true + } +} + +object DriverRecoveryTestActor { + val actorName = "TestActor" + val actorSystemName = "DriverRecoveryTest" + + def createActor(recoveryTest: DriverRecoveryTest): (ActorSystem, ActorRef, String) = { + val (actorSystem, actorSystemPort, testActor) = Utils.createActorSystemAndActor( + actorSystemName, actorName, Props(new DriverRecoveryTestActor(recoveryTest))) + val testActorUrl = + s"akka.tcp://${actorSystem.name}@${Utils.localIpAddress}:$actorSystemPort/user/$actorName" + (actorSystem, testActor, testActorUrl) + } +} + +class DriverRecoveryTest extends PerfTest { + + private val USE_RECEIVER = ("use-receiver", "whether to use receiver to generate data") + + override def booleanOptions = super.booleanOptions ++ Seq(USE_RECEIVER) + + private val APP_START_WAIT_TIMEOUT = 10 * 1000 + private val MIN_APP_KILL_INTERVAL = 60 * 1000 + private val MAX_APP_KILL_INTERVAL = 90 * 1000 + private val RECOVERY_WAIT_TIMEOUT = 120 * 1000 + + private val RECORDS_PER_FILE = 10000 + private val FILE_CLEANER_DELAY = 300 + + private val master = new SparkConf().get("spark.master", "local[4]") + private val standaloneModeDriverLauncherClass = Client.getClass.getName.stripSuffix("$") + + protected val registerTimes = new ArrayBuffer[Long] + protected val deregisterTimes = new ArrayBuffer[Long] + protected val upTimes = new ArrayBuffer[Long] + protected val downTimes = new ArrayBuffer[Long] + + protected var expectedCounts: Set[Long] = Set.empty + + protected var hasStopped: Boolean = false + protected var hasStartedOrRestarted: Boolean = false + protected var hasGotFirstCount: Boolean = false + protected var hasCountsBeenGeneratedAfterStart: Boolean = false + protected var hasCountsMatched: Boolean = true + protected var lastStartTime: Long = -1 + protected var lastStopTime: Long = -1 + + override def run(): String = { + val useReceiver = booleanOptionValue(USE_RECEIVER) + var fileGenerator: FileGenerator = null + + // Clear earlier checkpoints + val checkpointPath = new Path(checkpointDirectory) + val fs = checkpointPath.getFileSystem(new Configuration()) + if (fs.exists(checkpointPath)) { + fs.delete(checkpointPath, true) + log("Deleted " + checkpointDirectory) + } + + // Create the actor for communication + log("Creating actor") + val (actorSystem, testActor, testActorUrl) = DriverRecoveryTestActor.createActor(this) + log(s"Created actor system ${actorSystem.name}, and " + + s"test actor $testActor with url $testActorUrl") + + // Create the file generator if not using receivers + val dataDirectory = hdfsUrl + 
"/data/" + val tempDataDirectory = hdfsUrl + "/temp/" + + if (!useReceiver) { + log("Creating file generator") + fileGenerator = new FileGenerator( + dataDirectory, tempDataDirectory, RECORDS_PER_FILE, FILE_CLEANER_DELAY) + fileGenerator.initialize() + expectedCounts = (1L to RECORDS_PER_FILE).map(x => (1L to x).reduce(_ + _)).toSet + log(s"Created file generator in $dataDirectory") + } + + log("Launching app") + val appClass = DriverRecoveryTestApp.getClass.getName.stripSuffix("$") + val appArgs = Seq(master, batchDurationMs.toString, checkpointDirectory, + testActorUrl, useReceiver.toString, dataDirectory) + val appDriverId = launchAppAndGetDriverId(appClass, appArgs) + + try { + // Start the generator after a delay + log("Waiting until first count is received") + Utils.waitUntil(() => hasGotFirstCount, APP_START_WAIT_TIMEOUT, + s"App has not generated results even after waiting for $APP_START_WAIT_TIMEOUT millis" + ) + + // Start file generation + log("Starting file generator") + fileGenerator.start() + + // Keep sending kill messages after random intervals + log("Starting to send kill messages after random intervals") + val launchTime = System.currentTimeMillis + def timeSinceLaunch = System.currentTimeMillis - launchTime + while (timeSinceLaunch < totalDurationSec * 1000) { + val sleepDuration = + MIN_APP_KILL_INTERVAL + Random.nextInt(MAX_APP_KILL_INTERVAL - MIN_APP_KILL_INTERVAL) + Thread.sleep(sleepDuration) + killAndRecoverDriver(testActor) + } + + // Check if counts have matched or not + if (!hasCountsMatched) { + throw new Exception("Counts have not matched") + } + } catch { + case e: Exception => + println("Error in the test: ", e) + warn("Error in the test: ", e) + return "FAILED: " + e.getMessage + } finally { + stopApp(appDriverId) + actorSystem.shutdown() + if (fileGenerator != null) { + fileGenerator.stop() + } + } + + // Print stats + log(s"Number of times killed = ${deregisterTimes.size}") + log(s"Average uptime = ${upTimes.sum.toDouble / upTimes.size / 1000.0} sec") + log(s"Average downtime = ${downTimes.sum.toDouble / downTimes.size / 1000.0} sec") + "PASSED" + } + + /** Called when the app registered itself */ + def onRegister() = synchronized { + if (!hasStartedOrRestarted) { + hasStartedOrRestarted = true + val registerTime = System.currentTimeMillis + registerTimes += registerTime + lastStartTime = registerTime + log("=" * 40) + log("App started at " + registerTime) + } else { + warn("App already started, cannot start again") + } + } + + /** Called when the app deregisters itself */ + def onDeregister() = synchronized { + if (!hasStopped) { + hasStopped = true + val deregisterTime = System.currentTimeMillis + val upTime = deregisterTime - lastStartTime + deregisterTimes += deregisterTime + lastStopTime = deregisterTime + upTimes += upTime + log("App killed at " + deregisterTime + "\nUptime was " + upTime + " ms") + log("=" * 40) + log("App deregistered after " + upTime + " ms") + } else { + warn("App already stopped, cannot stop again") + } + } + + /** Is the app running right now */ + def isRunning() = synchronized { hasStartedOrRestarted && !hasStopped } + + + /** Called when the app reports counts */ + def onGotCounts(time: Long, counts: Array[(String, Long)]) = synchronized { + hasGotFirstCount = true + if (hasStartedOrRestarted) { + if (!hasCountsBeenGeneratedAfterStart) { + val firstCountTime = System.currentTimeMillis + val downTime = firstCountTime - lastStopTime + log("First count after recovery at " + firstCountTime) + if (lastStopTime > 0) { + 
log("Downtime was " + downTime + " ms") + } + downTimes += downTime + hasCountsBeenGeneratedAfterStart = true + this.notifyAll() + } + } else { + warn("Count received after kill signal but before restart. Ignored.") + } + verifyCounts(time, counts) + } + + /** Launch the test streaming app through spark-submit */ + private def launchAppAndGetDriverId(driverClass: String, driverArgs: Seq[String]): String = { + val appArgsStr = driverArgs.mkString(" ") + val command = s"bin/spark-submit " + + s"--master $master --deploy-mode cluster --supervise --class $driverClass file:$jarFile $appArgsStr" + log("\tCommand: [ " + command + " ]") + val commandResult = new ArrayBuffer[String] + val processBuilder = Process(command, Some(new File(sparkDir)), System.getenv.toArray: _*) + val processLogger = ProcessLogger(commandResult += _) + val exitValue = processBuilder.run(processLogger).exitValue() + log(s"\tCommand result: exit value = $exitValue, output: \n" + + s"${commandResult.map { "\t" + _ }.mkString("\n")}\n---") + commandResult.filter(_.contains("driver-")).headOption.map(_.split(" ").last).getOrElse { + throw new Exception("Could not get driver id after launching") + } + } + + /** Stop by submitting a killing request through DriverClient */ + private def stopApp(appDriverId: String) { + val command = s"$sparkDir/bin/spark-class $standaloneModeDriverLauncherClass kill $master $appDriverId 2>&1" + log("Command: [ " + command + " ]") + val commandResult = command.!! + log(s"Result:\n${commandResult.split("\n").map { "\t" + _ }.mkString("\n")}\n---") + } + + /** Send kill signal and wait for the driver to recover and start generating counts again */ + private def killAndRecoverDriver(testActor: ActorRef) = synchronized { + // Send kill signal + hasStopped = false + hasStartedOrRestarted = false + hasCountsBeenGeneratedAfterStart = false + testActor ! KillApp(-1) + log("Sent kill signal") + + // wait for recovery + this.wait(RECOVERY_WAIT_TIMEOUT) + if (!hasStopped) { + throw new Exception("App driver was not killed within " + RECOVERY_WAIT_TIMEOUT + " ms of kill signal") + } + if (!hasStartedOrRestarted) { + throw new Exception("App driver was not restarted within " + RECOVERY_WAIT_TIMEOUT + " ms of kill signal") + } + if (!hasCountsBeenGeneratedAfterStart) { + throw new Exception("App driver was not recovered as counts were not generated within " + RECOVERY_WAIT_TIMEOUT + " ms of kill signal") + } + } + + /** Verify the counts */ + private def verifyCounts(time: Long, counts: Array[(String, Long)]) { + if (expectedCounts.nonEmpty) { + val matched = counts.forall { + case (word, count) => expectedCounts.contains(count) + } + hasCountsMatched &= matched + val logStr = s"Counts at $time = ${counts.toSeq}" + + (if (!matched) ", no match" else "") + log(logStr + "\n" + ("-" * 40)) + } else { + warn("Expected results not yet configured") + } + } + + /** Do not create a context */ + override protected def createContext() = { + // Do not create a new StreamingContext as this class is not the streaming app. Rather it is going to + // launch the streaming app within the Spark cluster. 
+ null + } + + private def log(message: => String) { + println(s"INFO: $message") + } + + private def warn(message: => String, ex: Exception = null) { + println(s"WARN: $message" + Option(ex).map { e => ": " + e.toString }) + } +} diff --git a/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTestApp.scala b/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTestApp.scala new file mode 100644 index 0000000..9906916 --- /dev/null +++ b/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTestApp.scala @@ -0,0 +1,136 @@ +package streaming.perf + +import java.io.File + +import scala.concurrent.Await +import scala.concurrent.duration._ + +import akka.actor._ +import akka.pattern.ask +import org.apache.spark.rdd.RDD +import org.apache.spark.storage.StorageLevel +import org.apache.spark.streaming.{Milliseconds, StreamingContext, Time} +import org.apache.spark.streaming.StreamingContext._ +import streaming.perf.util.Utils + +class DriverRecoveryTestAppActor(testActorUrl: String) extends Actor { + import DriverRecoveryTestAppActor._ + + val testActor = context.system.actorSelection(testActorUrl) + + override def preStart() { + println(s"Trying to register driver actor with $testActor") + Await.ready(testActor.ask(RegisterApp(this.self))(timeout / 2), timeout / 2) + println(s"Registered app actor") + } + + def receive = { + case Ready => + println("Responding to ready") + sender ! true + + case GotCounts(time, counts) => + println(s"Sending counts of time $time: ${counts.toSeq}") + testActor ! GotCounts(time, counts) + + case KillApp(exitCode) => + println(s"Kill command with exit code $exitCode received") + Await.ready(testActor.ask(DeregisterApp)(timeout), timeout) + println("Deregistered app actor") + System.exit(exitCode) + println("Not killed") // this gets printed only if the exit does not work + + case _ => + println("Unknown message received") + } +} + +object DriverRecoveryTestAppActor { + + val timeout = 5 seconds + + def createActor(testActorUrl: String): (ActorSystem, ActorRef) = { + val (actorSystem, _, appActor) = Utils.createActorSystemAndActor( + "DriverRecoveryTestApp", "AppActor", Props(new DriverRecoveryTestAppActor(testActorUrl))) + Await.ready(appActor.ask(Ready)(timeout), timeout) + (actorSystem, appActor) + } +} + +object DriverRecoveryTestApp { + + @transient var appActor: ActorRef = null + + // Create a new StreamingContext + def createContext( + master: String, + batchDurationMs: Long, + checkpointPath: String, + useReceiver: Boolean, + optionalDataDirectory: Option[String] + ) = { + require( + useReceiver || optionalDataDirectory.nonEmpty, + "if receiver is not to be used, then data directory has to be provided" + ) + // Create the context + println("Creating context") + val jarFile = new File("streaming-perf-tests-assembly.jar").getAbsolutePath + val sparkDir = Option(System.getenv("SPARK_HOME")).getOrElse( throw new Exception("SPARK_HOME not set")) + println("Creating streaming context with spark directory = " + sparkDir + " and jar file = " + jarFile) + val ssc = new StreamingContext(master, "TestRunner: DriverRecoveryTest", + Milliseconds(batchDurationMs), sparkDir, Seq(jarFile)) + ssc.checkpoint(checkpointPath) + // Set up the computation + val inputStream = if (useReceiver) { + ssc.receiverStream[String](null) + } else { + ssc.textFileStream(optionalDataDirectory.get) + } + val wordStream = inputStream.flatMap(_.split(" ")).map(x => (x, 1L)) + val updateFunc = (values: Seq[Long], state: Option[Long]) => { + Some(values.foldLeft(0L)(_ + _) + 
state.getOrElse(0L)) + } + val runningCountStream = wordStream.updateStateByKey[Long](updateFunc).persist(StorageLevel.MEMORY_AND_DISK_SER) + runningCountStream.checkpoint(Milliseconds(batchDurationMs * 5)) + + val sendCountsToTest = (rdd: RDD[(String, Long)], time: Time) => { + val counts = rdd.collect() + appActor ! GotCounts(time.milliseconds, counts) + } + runningCountStream.foreachRDD(sendCountsToTest) + ssc + } + + def main(args: Array[String]) { + import java.text.SimpleDateFormat + import java.util.Calendar + val dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss") + val cal = Calendar.getInstance() + println("=" * 40) + println("Started at " + dateFormat.format(cal.getTime())) + println("Args: " + args.mkString(", ")) + if (args.size < 5) { + println("Incorrect number of arguments") + println(s"${this.getClass.getSimpleName.stripSuffix("$")} " + + s" " + + s"[]") + System.exit(255) + } + val Array(master, batchDurationMs, checkpointPath, testActorUrl, useReceiver) = args.take(5) + val optionalDataDirectory = Option(args.applyOrElse(5, null)) + println("Parsed args") + appActor = DriverRecoveryTestAppActor.createActor(testActorUrl)._2 + println("Created actor") + val ssc = StreamingContext.getOrCreate(checkpointPath, () => { + createContext(master, batchDurationMs.toLong, + checkpointPath, useReceiver.toBoolean, optionalDataDirectory) + }) + println("Prepared context") + ssc.start() + println("Started") + ssc.awaitTermination() + } +} + + diff --git a/streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala b/streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala similarity index 98% rename from streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala rename to streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala index 23e8b80..e37fcbc 100644 --- a/streaming-tests/src/main/scala/streaming/perf/HdfsRecoveryTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala @@ -7,7 +7,7 @@ import org.apache.spark.rdd.RDD import org.apache.spark.storage.StorageLevel import streaming.perf.util.FileGenerator -class HdfsRecoveryTest extends PerfTest { +class FileStreamTest extends PerfTest { val RECORDS_PER_FILE = ("records-per-file", "Number records per file") val FILE_CLEANER_DELAY = ("file-cleaner-delay", "Delay (secs) in cleaning up generated files") diff --git a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala index 628376d..87768d3 100644 --- a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala @@ -20,7 +20,7 @@ abstract class KVDataTest extends PerfTest { val UNIQUE_KEYS = ("unique-keys", "(approx) number of unique keys") val UNIQUE_VALUES = ("unique-values", "(approx) number of unique values per key") val MEMORY_SERIALIZATION = ("memory-serialization", "whether memory-persisted data is serialized") - val USE_RECEIVER = ("use-receiver", "false") + val USE_RECEIVER = ("use-receiver", "whether to use receiver to generate data") //val KEY_LENGTH = ("key-length", "length of keys in characters") //val VALUE_LENGTH = ("value-length", "length of values in characters") @@ -70,7 +70,10 @@ abstract class KVDataTest extends PerfTest { // run test ssc.start() val startTime = System.currentTimeMillis - ssc.awaitTermination(totalDurationSec * 1000) + while (System.currentTimeMillis - startTime < totalDurationSec * 1000) { + ssc.awaitTermination(10 * 1000) + 
println(s"Time left: ${(System.currentTimeMillis - startTime) / 1000.0} seconds") + } ssc.stop() processResults(statsReportListener) } diff --git a/streaming-tests/src/main/scala/streaming/perf/PerfTest.scala b/streaming-tests/src/main/scala/streaming/perf/PerfTest.scala index 458ebbf..6277a44 100644 --- a/streaming-tests/src/main/scala/streaming/perf/PerfTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/PerfTest.scala @@ -34,7 +34,7 @@ abstract class PerfTest { /** Initialize internal state based on arguments */ def initialize(testName_ : String, otherArgs: Array[String]) { - // add all the options to parser + // Add all the options to parser longOptions.map{case (opt, desc) => println("Registering long option " + opt) parser.accepts(opt, desc).withRequiredArg().ofType(classOf[Long]).required() @@ -55,14 +55,16 @@ abstract class PerfTest { hdfsUrl = stringOptionValue(HDFS_URL) checkpointDirectory = hdfsUrl + "/checkpoint/" ssc = createContext() - ssc.checkpoint(checkpointDirectory) - sc = ssc.sparkContext + if (ssc != null) { + ssc.checkpoint(checkpointDirectory) + sc = ssc.sparkContext + } } /** Runs the test and returns a series of results, along with values of any parameters */ def run(): String - protected def createContext() = { + protected def createContext(): StreamingContext = { val conf = new SparkConf().setAppName(testName) val sparkContext = new SparkContext(conf) new StreamingContext(sparkContext, Milliseconds(batchDurationMs)) diff --git a/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala b/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala index 06c7298..ccd9a1a 100644 --- a/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala +++ b/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala @@ -8,8 +8,14 @@ import java.util.Calendar import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} +import org.apache.spark.Logging -class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Long, cleanerDelay: Long) { +class FileGenerator( + dataDir: String, + tempDataDir: String, + maxRecordsPerFile: Long, + cleanerDelay: Long + ) extends Logging { val MAX_TRIES = 100 val MAX_KEYS = 1000 @@ -45,14 +51,14 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon deletingThread.setDaemon(true) generatingThread.start() deletingThread.start() - println("FileGenerator started") + logInfo("FileGenerator started") } /** Stop generating files */ def stop() { generatingThread.interrupt() deletingThread.interrupt() - println("FileGenerator Interrupted") + logInfo("FileGenerator Interrupted") } /** Delete test directory */ @@ -78,9 +84,9 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon val finalFile = new Path(dataDir, "file-" + time + "-" + key + "-" + count) val generated = copyFile(localFile, finalFile) if (generated) { - println("Generated file #" + count + " at " + System.currentTimeMillis() + ": " + finalFile) + logInfo("Generated file #" + count + " at " + System.currentTimeMillis() + ": " + finalFile) } else { - println("Could not generate file #" + count + ": " + finalFile) + logError("Could not generate file #" + count + ": " + finalFile) System.exit(0) } Thread.sleep(INTERVAL) @@ -88,9 +94,9 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon } } catch { case ie: InterruptedException => - println("File generating 
thread interrupted") + logWarning("File generating thread interrupted") case e: Exception => - println("Error generating files", e) + logError("Error generating files", e) System.exit(0) } } @@ -102,7 +108,7 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon while (!done && tries < MAX_TRIES) { tries += 1 try { - println("Copying from " + localFile + " to " + tempFile) + logDebug("Copying from " + localFile + " to " + tempFile) fs.copyFromLocalFile(new Path(localFile.toString), tempFile) //if (fs.exists(tempFile)) println("" + tempFile + " exists") else println("" + tempFile + " does not exist") //println("Renaming from " + tempFile + " to " + finalFile) @@ -110,7 +116,7 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon done = true } catch { case ioe: IOException => - println("Attempt " + tries + " at generating file " + finalFile + " failed.", ioe) + logError("Attempt " + tries + " at generating file " + finalFile + " failed.", ioe) reset() } finally { // if (fs.exists(tempFile)) fs.delete(tempFile, true) @@ -133,19 +139,19 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon modTime < oldFileThreshTime } } - println("Finding files older than " + oldFileThreshTime) + logDebug("Finding files older than " + oldFileThreshTime) val oldFiles = fs.listStatus(dataDirectory, newFilter).map(_.getPath) - println("Found " + oldFiles.size + " old files") + logInfo("Found " + oldFiles.size + " old files") oldFiles.foreach(file => { - println("Deleting file " + file) + logInfo("Deleting file " + file) fs.delete(file, true) }) } catch { case ie: InterruptedException => interrupted = true - println("File deleting thread interrupted") + logWarning("File deleting thread interrupted") case e: Exception => - println("Deleting files gave error ", e) + logError("Deleting files gave error ", e) reset() } } @@ -164,7 +170,7 @@ class FileGenerator(dataDir: String, tempDataDir: String, maxRecordsPerFile: Lon line = br.readLine() } br.close() - println("Local file has " + count + " occurrences of " + expectedWord + + logDebug("Local file has " + count + " occurrences of " + expectedWord + (if (count != expectedCount) ", expected was " + expectedCount else "")) } diff --git a/streaming-tests/src/main/scala/streaming/perf/util/Utils.scala b/streaming-tests/src/main/scala/streaming/perf/util/Utils.scala index 4ac0c60..4177c0b 100644 --- a/streaming-tests/src/main/scala/streaming/perf/util/Utils.scala +++ b/streaming-tests/src/main/scala/streaming/perf/util/Utils.scala @@ -4,59 +4,81 @@ import java.net.{Inet4Address, InetAddress, NetworkInterface} import scala.collection.JavaConversions._ -import akka.actor.{ActorSystem, ExtendedActorSystem} +import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem, Props} import com.typesafe.config.ConfigFactory object Utils { lazy val localIpAddress = findLocalIpAddress() def findLocalIpAddress(): String = { - val defaultIpOverride = System.getenv("SPARK_LOCAL_IP") - if (defaultIpOverride != null) { - defaultIpOverride - } else { - val address = InetAddress.getLocalHost - if (address.isLoopbackAddress) { - // Address resolves to something like 127.0.1.1, which happens on Debian; try to find - // a better address using the local network interfaces - for (ni <- NetworkInterface.getNetworkInterfaces) { - for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress && - !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) { - // We've found an address that looks reasonable! 
- println("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" + - " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress + - " instead (on interface " + ni.getName + ")") - println("Set SPARK_LOCAL_IP if you need to bind to another address") - return addr.getHostAddress - } + val address = InetAddress.getLocalHost + if (address.isLoopbackAddress) { + // Address resolves to something like 127.0.1.1, which happens on Debian; try to find + // a better address using the local network interfaces + // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order + // on unix-like system. + val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList + for (ni <- activeNetworkIFs) { + for (addr <- ni.getInetAddresses if !addr.isLinkLocalAddress && + !addr.isLoopbackAddress && addr.isInstanceOf[Inet4Address]) { + // We've found an address that looks reasonable! + println("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" + + " a loopback address: " + address.getHostAddress + "; using " + addr.getHostAddress + + " instead (on interface " + ni.getName + ")") + return addr.getHostAddress } - println("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" + - " a loopback address: " + address.getHostAddress + ", but we couldn't find any" + - " external IP address!") - println("Set SPARK_LOCAL_IP if you need to bind to another address") } - InetAddress.getByName(address.getHostAddress).getHostName + println("Your hostname, " + InetAddress.getLocalHost.getHostName + " resolves to" + + " a loopback address: " + address.getHostAddress + ", but we couldn't find any" + + " external IP address!") } + address.getHostAddress } - def createActorSystem(name: String): (ActorSystem, Int) = { - val host = localIpAddress - val port = 0 - val akkaConf = ConfigFactory.parseString( - s""" + def createActorSystemAndActor( + actorSystemName: String, + actorName: String, + actorProps: Props + ): (ActorSystem, Int, ActorRef) = { + var actorSystem: ActorSystem = null + var actorSystemPort: Int = - 1 + var actorRef: ActorRef = null + + try { + // Starting actor systems + val port = 0 + val akkaConf = ConfigFactory.parseString(s""" |akka.daemonic = on |akka.loggers = [""akka.event.slf4j.Slf4jLogger""] |akka.actor.provider = "akka.remote.RemoteActorRefProvider" |akka.remote.netty.tcp.transport-class = "akka.remote.transport.netty.NettyTransport" - |akka.remote.netty.tcp.hostname = "$host" |akka.remote.netty.tcp.port = $port |akka.remote.netty.tcp.tcp-nodelay = on """.stripMargin) - val actorSystem = ActorSystem(name, akkaConf) - val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider - val boundPort = provider.getDefaultAddress.port.get - println("Created actor system " + name + " on port " + boundPort) - (actorSystem, boundPort) + actorSystem = ActorSystem(actorSystemName, akkaConf) + val provider = actorSystem.asInstanceOf[ExtendedActorSystem].provider + actorSystemPort = provider.getDefaultAddress.port.get + + // Starting actor + actorRef = actorSystem.actorOf(actorProps, actorName) + + } catch { + case e: Exception => + println(s"Error starting actor system $actorSystemName and actor $actorName", e) + if (actorSystem != null) { + actorSystem.shutdown() + } + throw e + } + (actorSystem, actorSystemPort, actorRef) + } + + def waitUntil(condition: () => Boolean, timeoutMillis: Long, timeoutMessage: String = "") { + val startTimeMillis = System.currentTimeMillis + while (!condition() && 
System.currentTimeMillis - startTimeMillis < timeoutMillis) { + Thread.sleep(100) + } + assert(condition(), timeoutMessage) } } diff --git a/streaming-tests/src/resources/log4j.properties b/streaming-tests/src/resources/log4j.properties new file mode 100644 index 0000000..ed9d054 --- /dev/null +++ b/streaming-tests/src/resources/log4j.properties @@ -0,0 +1,19 @@ +# Set everything to be logged to the stderr +log4j.rootCategory=INFO, stderr + +# Set streaming perf test logs to be logged to the stdout +log4j.logger.streaming.perf=DEBUG, stdout +log4j.logger.streaming.perf.util=INFO, stderr + +# Setup stderr appender +log4j.appender.stderr=org.apache.log4j.ConsoleAppender +log4j.appender.stderr.target=System.err +log4j.appender.stderr.layout=org.apache.log4j.PatternLayout +log4j.appender.stderr.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + +# Setup stderr appender +log4j.appender.stdout=org.apache.log4j.ConsoleAppender +log4j.appender.stdout.target=System.out +log4j.appender.stdout.layout=org.apache.log4j.PatternLayout +log4j.appender.stdout.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n + diff --git a/streaming-tests/src/test/scala/streaming/perf/DriverRecoveryTestSuite.scala b/streaming-tests/src/test/scala/streaming/perf/DriverRecoveryTestSuite.scala new file mode 100644 index 0000000..d1d51e9 --- /dev/null +++ b/streaming-tests/src/test/scala/streaming/perf/DriverRecoveryTestSuite.scala @@ -0,0 +1,43 @@ +package streaming.perf + +import akka.actor.ActorSystem +import akka.pattern.ask +import org.scalatest._ +import org.scalatest.concurrent.Eventually +import scala.concurrent.Await + +import org.scalatest.FunSuite +import streaming.perf.util.Utils._ +import scala.concurrent.duration._ + +class DriverRecoveryTestSuite extends FunSuite with Eventually with BeforeAndAfter { + var appActorSystem: ActorSystem = null + var testActorSystem: ActorSystem = null + + after { + if (testActorSystem != null) { + testActorSystem.shutdown() + } + if (appActorSystem != null) { + appActorSystem.shutdown() + } + } + + test("Actor connectivity") { + val askTimeout = 100 milliseconds + val test = new DriverRecoveryTest + val (system1, testActor, testActorUrl) = DriverRecoveryTestActor.createActor(test) + testActorSystem = system1 + assert(Await.result(testActor.ask(Ready)(askTimeout).mapTo[Boolean], askTimeout)) + println("Test actor ready") + + val (system2, appActor) = DriverRecoveryTestAppActor.createActor(testActorUrl) + appActorSystem = system2 + assert(Await.result(appActor.ask(Ready)(askTimeout).mapTo[Boolean], askTimeout)) + println("App actor ready") + + eventually(timeout(2 seconds), interval(100 milliseconds)) { + assert(test.isRunning(), "App actor has not registered with test actor") + } + } +} diff --git a/streaming-tests/src/test/scala/streaming/perf/UtilsSuite.scala b/streaming-tests/src/test/scala/streaming/perf/UtilsSuite.scala new file mode 100644 index 0000000..edb84ab --- /dev/null +++ b/streaming-tests/src/test/scala/streaming/perf/UtilsSuite.scala @@ -0,0 +1,35 @@ +package streaming.perf + +import akka.actor.{ActorSystem, Actor, Props} +import akka.pattern.ask + +import org.scalatest.FunSuite +import streaming.perf.util.Utils._ +import scala.concurrent.Await +import scala.concurrent.duration._ + + +class UtilsSuite extends FunSuite { + class TestActor extends Actor { + println("Started") + def receive = { + case _ => sender ! 
true + } + } + + test("Actor creation") { + val askTimeout = (100 milliseconds) + var actorSystem: ActorSystem = null + + try { + val (system, _, actorRef) = + createActorSystemAndActor("System", "Actor", Props(new TestActor)) + println("Started actor") + actorSystem = system + Await.result(actorRef.ask(new Object)(askTimeout).mapTo[Boolean], askTimeout) + println("Got results") + } finally { + actorSystem.shutdown() + } + } +} From bcc1e6f2caa56b53184114507f219ca28d6ecec2 Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Thu, 20 Nov 2014 23:31:51 +0000 Subject: [PATCH 2/4] Added CountTest to KVDataTest --- config/config.py.template | 3 +++ .../src/main/scala/streaming/perf/KVDataTest.scala | 13 ++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/config/config.py.template b/config/config.py.template index ab58a25..e0f0d88 100755 --- a/config/config.py.template +++ b/config/config.py.template @@ -255,6 +255,9 @@ STREAMING_HDFS_RECOVERY_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_dura STREAMING_TESTS += [("basic", "streaming.perf.TestRunner", SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, [ConstantOption("basic")] + STREAMING_COMMON_OPTS + streaming_batch_duration_opts(1000))] +STREAMING_TESTS += [("count", "streaming.perf.TestRunner", SCALE_FACTOR, + STREAMING_COMMON_JAVA_OPTS, [ConstantOption("count")] + STREAMING_KEY_VAL_TEST_OPTS)] + STREAMING_TESTS += [("state-by-key", "streaming.perf.TestRunner", SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, [ConstantOption("state-by-key")] + STREAMING_KEY_VAL_TEST_OPTS)] diff --git a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala index 628376d..6bc50bc 100644 --- a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala @@ -70,7 +70,11 @@ abstract class KVDataTest extends PerfTest { // run test ssc.start() val startTime = System.currentTimeMillis - ssc.awaitTermination(totalDurationSec * 1000) + while (System.currentTimeMillis - startTime < totalDurationSec * 1000) { + ssc.awaitTermination(60 * 1000) + println(s"Time left: ${(System.currentTimeMillis - startTime) / 1000.0} seconds") + } + ssc.stop() processResults(statsReportListener) } @@ -122,6 +126,13 @@ abstract class WindowKVDataTest extends KVDataTest { } } +class CountTest extends KVDataTest { + // Setup the streamign computation + def setupOutputStream(inputStream: DStream[(String, String)]): DStream[_] = { + inputStream // this stream will get automatically counted, see KVDataTest.run + } +} + class StateByKeyTest extends KVDataTest { // Setup the streaming computations def setupOutputStream(inputStream: DStream[(String, String)]): DStream[_] = { From 945cf847f92a5f4391dcf1f4d194f1d3b5fd16ef Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 25 Nov 2014 16:00:14 -0800 Subject: [PATCH 3/4] Many more changes to the driver HA files. 
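
Specifically: raises APP_START_WAIT_TIMEOUT to 60 seconds; gives FileGenerator an
explicit file-generation-interval parameter (randomized sleep between files in
place of the fixed 100 ms interval) plus an optional mode that pre-generates
files before the stream starts; exposes the interval as a new
file-generation-interval option in FileStreamTest; and switches error exits to
status 255.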
--- .../streaming/perf/DriverRecoveryTest.scala | 6 +- .../scala/streaming/perf/FileStreamTest.scala | 7 +- .../scala/streaming/perf/KVDataTest.scala | 2 +- .../streaming/perf/util/FileGenerator.scala | 70 ++++++++++++++----- 4 files changed, 60 insertions(+), 25 deletions(-) diff --git a/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala b/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala index 2620fef..30a9f02 100644 --- a/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/DriverRecoveryTest.scala @@ -68,7 +68,7 @@ class DriverRecoveryTest extends PerfTest { override def booleanOptions = super.booleanOptions ++ Seq(USE_RECEIVER) - private val APP_START_WAIT_TIMEOUT = 10 * 1000 + private val APP_START_WAIT_TIMEOUT = 60 * 1000 private val MIN_APP_KILL_INTERVAL = 60 * 1000 private val MAX_APP_KILL_INTERVAL = 90 * 1000 private val RECOVERY_WAIT_TIMEOUT = 120 * 1000 @@ -118,8 +118,8 @@ class DriverRecoveryTest extends PerfTest { if (!useReceiver) { log("Creating file generator") - fileGenerator = new FileGenerator( - dataDirectory, tempDataDirectory, RECORDS_PER_FILE, FILE_CLEANER_DELAY) + fileGenerator = new FileGenerator(dataDirectory, tempDataDirectory, + RECORDS_PER_FILE, 2 * batchDurationMs.toInt, FILE_CLEANER_DELAY) fileGenerator.initialize() expectedCounts = (1L to RECORDS_PER_FILE).map(x => (1L to x).reduce(_ + _)).toSet log(s"Created file generator in $dataDirectory") diff --git a/streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala b/streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala index e37fcbc..a5d8744 100644 --- a/streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/FileStreamTest.scala @@ -9,9 +9,10 @@ import streaming.perf.util.FileGenerator class FileStreamTest extends PerfTest { val RECORDS_PER_FILE = ("records-per-file", "Number records per file") + val FILE_GENERATION_INTERVAL = ("file-generation-interval", "Interval (ms) at which files will be generated") val FILE_CLEANER_DELAY = ("file-cleaner-delay", "Delay (secs) in cleaning up generated files") - override def longOptions = super.longOptions ++ Seq(RECORDS_PER_FILE, FILE_CLEANER_DELAY) + override def longOptions = super.longOptions ++ Seq(RECORDS_PER_FILE, FILE_CLEANER_DELAY, FILE_GENERATION_INTERVAL) override def stringOptions = super.stringOptions ++ Seq(HDFS_URL) @@ -20,11 +21,13 @@ class FileStreamTest extends PerfTest { // Define variables val maxRecordsPerFile = longOptionValue(RECORDS_PER_FILE) val cleanerDelay = longOptionValue(FILE_CLEANER_DELAY) + val fileGenerationInterval = longOptionValue(FILE_GENERATION_INTERVAL) val dataDirectory = hdfsUrl + "/data/" val tempDataDirectory = hdfsUrl + "/temp/" // Create the file generator - val fileGenerator = new FileGenerator(dataDirectory, tempDataDirectory, maxRecordsPerFile, cleanerDelay) + val fileGenerator = new FileGenerator(dataDirectory, tempDataDirectory, + maxRecordsPerFile, fileGenerationInterval.toInt, cleanerDelay) fileGenerator.initialize() // Setup computation diff --git a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala index 87768d3..0316c1b 100644 --- a/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala +++ b/streaming-tests/src/main/scala/streaming/perf/KVDataTest.scala @@ -92,7 +92,7 @@ abstract class KVDataTest extends PerfTest { object KVDataTest { val 
IGNORED_BATCHES = 10 - + // Generate statistics from the processing data def processResults(statsReportListener: StatsReportListener): String = { val processingDelays = statsReportListener.batchInfos.flatMap(_.processingDelay).map(_.toDouble / 1000.0) diff --git a/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala b/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala index ccd9a1a..42a390d 100644 --- a/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala +++ b/streaming-tests/src/main/scala/streaming/perf/util/FileGenerator.scala @@ -5,6 +5,8 @@ import java.nio.charset.Charset import java.text.SimpleDateFormat import java.util.Calendar +import scala.util.Random + import com.google.common.io.Files import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FileSystem, Path, PathFilter} @@ -14,12 +16,13 @@ class FileGenerator( dataDir: String, tempDataDir: String, maxRecordsPerFile: Long, - cleanerDelay: Long + intervalBetweenFiles: Int, + cleanerDelay: Long, + filesPresentBeforeStart: Boolean = false ) extends Logging { val MAX_TRIES = 100 val MAX_KEYS = 1000 - val INTERVAL = 100 val VERIFY_LOCAL_FILES = false val dataDirectory = new Path(dataDir) @@ -27,7 +30,6 @@ class FileGenerator( val localFile = new File(Files.createTempDir(), "temp") val tempFile = new Path(tempDataDirectory, "temp-file") val conf = new Configuration() - // val initFile = new Path(dataDirectory, "test") val generatingThread = new Thread() { override def run() { generateFiles() }} val deletingThread = new Thread() { override def run() { deleteOldFiles() }} val df = new SimpleDateFormat("MM-dd-HH-mm-ss-SSS") @@ -43,6 +45,9 @@ class FileGenerator( fs.delete(tempDataDirectory, true) } fs.mkdirs(tempDataDirectory) + if (filesPresentBeforeStart) { + generateFileBeforeStart() + } } /** Start generating files */ @@ -81,15 +86,17 @@ class FileGenerator( Files.append(word + " " + newLine, localFile, Charset.defaultCharset()) if (VERIFY_LOCAL_FILES) verifyLocalFile(word, count) val time = df.format(Calendar.getInstance().getTime()) - val finalFile = new Path(dataDir, "file-" + time + "-" + key + "-" + count) + val finalFile = new Path(dataDir, s"file-$time-$key-$count") val generated = copyFile(localFile, finalFile) if (generated) { - logInfo("Generated file #" + count + " at " + System.currentTimeMillis() + ": " + finalFile) + logInfo(s"Generated file #$count at ${System.currentTimeMillis}: $finalFile") } else { - logError("Could not generate file #" + count + ": " + finalFile) - System.exit(0) + logError(s"Could not generate file #$count:$finalFile") + System.exit(255) } - Thread.sleep(INTERVAL) + val sleepTime = Random.nextInt(intervalBetweenFiles) + logDebug(s"Waiting for $sleepTime ms before generating next file") + Thread.sleep(sleepTime) } } } catch { @@ -97,10 +104,35 @@ class FileGenerator( logWarning("File generating thread interrupted") case e: Exception => logError("Error generating files", e) - System.exit(0) + System.exit(255) } } + private def generateFileBeforeStart() { + try { + val word = "word0" + if (localFile.exists()) localFile.delete() + Files.append(Seq.fill(10)(word).mkString(" ") + "\n", localFile, Charset.defaultCharset()) + val time = df.format(Calendar.getInstance().getTime()) + for (count <- 1 to 10) { + val finalFile = new Path(dataDir, s"file-$time-word0-$count") + val generated = copyFile(localFile, finalFile) + if (generated) { + logInfo(s"Generated file #$count at ${System.currentTimeMillis} before start: $finalFile") + } 
else { + logError(s"Could not generate file #$count before start:$finalFile") + System.exit(255) + } + } + } catch { + case e: Exception => + logError("Error generating files before start", e) + System.exit(255) + } + } + + + /** Copies a local file to a HDFS path */ private def copyFile(localFile: File, finalFile: Path): Boolean = { var done = false @@ -108,18 +140,18 @@ class FileGenerator( while (!done && tries < MAX_TRIES) { tries += 1 try { - logDebug("Copying from " + localFile + " to " + tempFile) + logDebug(s"Copying from $localFile to $tempFile") fs.copyFromLocalFile(new Path(localFile.toString), tempFile) - //if (fs.exists(tempFile)) println("" + tempFile + " exists") else println("" + tempFile + " does not exist") - //println("Renaming from " + tempFile + " to " + finalFile) - if (!fs.rename(tempFile, finalFile)) throw new Exception("Could not rename " + tempFile + " to " + finalFile) + if (!fs.rename(tempFile, finalFile)) { + throw new Exception(s"Could not rename $tempFile to $finalFile") + } done = true } catch { case ioe: IOException => - logError("Attempt " + tries + " at generating file " + finalFile + " failed.", ioe) + logError(s"Attempt $tries at generating file $finalFile failed.", ioe) reset() } finally { - // if (fs.exists(tempFile)) fs.delete(tempFile, true) + if (fs.exists(tempFile)) fs.delete(tempFile, true) } } done @@ -135,7 +167,7 @@ class FileGenerator( val newFilter = new PathFilter() { def accept(path: Path): Boolean = { val modTime = fs.getFileStatus(path).getModificationTime() - //println("Mod time for " + path + " is " + modTime) + logDebug(s"Mod time for $path is $modTime") modTime < oldFileThreshTime } } @@ -151,7 +183,7 @@ class FileGenerator( interrupted = true logWarning("File deleting thread interrupted") case e: Exception => - logError("Deleting files gave error ", e) + logError("Deleting files gave error", e) reset() } } @@ -170,8 +202,8 @@ class FileGenerator( line = br.readLine() } br.close() - logDebug("Local file has " + count + " occurrences of " + expectedWord + - (if (count != expectedCount) ", expected was " + expectedCount else "")) + logDebug(s"Local file has $count occurrences of $expectedWord" + + (if (count != expectedCount) s", expected was $expectedCount" else "")) } private def fs: FileSystem = synchronized { From 3956192e35b56896c95adc87833f6aa9cef882aa Mon Sep 17 00:00:00 2001 From: Tathagata Das Date: Tue, 25 Nov 2014 16:08:16 -0800 Subject: [PATCH 4/4] Updated spark perf template. 
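
Renames STREAMING_HDFS_RECOVERY_TEST_OPTS to STREAMING_FILE_STREAM_TEST_OPTS and
adds the new file-generation-interval option to it; introduces
STREAMING_DRIVER_RECOVERY_TEST_OPTS and a driver-recovery entry in
STREAMING_TESTS for the new DriverRecoveryTest.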
--- config/config.py.template | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/config/config.py.template b/config/config.py.template index 02657e4..256ffc1 100755 --- a/config/config.py.template +++ b/config/config.py.template @@ -329,11 +329,16 @@ STREAMING_KEY_VAL_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_duration_o OptionSet("use-receiver", ["true"]), ] -STREAMING_HDFS_RECOVERY_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_duration_opts(5000) + [ +STREAMING_FILE_STREAM_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_duration_opts(5000) + [ OptionSet("records-per-file", [10000]), + OptionSet("file-generation-interval", [100]), OptionSet("file-cleaner-delay", [300]) ] +STREAMING_DRIVER_RECOVERY_TEST_OPTS = STREAMING_COMMON_OPTS + streaming_batch_duration_opts(500) + [ + OptionSet("use-receiver", ["false"]) +] + # This test is just to see if everything is setup properly STREAMING_TESTS += [("basic", "streaming.perf.TestRunner", SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, [ConstantOption("basic")] + STREAMING_COMMON_OPTS + streaming_batch_duration_opts(1000))] @@ -353,6 +358,9 @@ STREAMING_TESTS += [("reduce-by-key-and-window", "streaming.perf.TestRunner", SC STREAMING_TESTS += [("hdfs-recovery", "streaming.perf.TestRunner", SCALE_FACTOR, STREAMING_COMMON_JAVA_OPTS, [ConstantOption("hdfs-recovery")] + STREAMING_HDFS_RECOVERY_TEST_OPTS)] +STREAMING_TESTS += [("driver-recovery", "streaming.perf.TestRunner", SCALE_FACTOR, + STREAMING_COMMON_JAVA_OPTS, [ConstantOption("driver-recovery")] + STREAMING_DRIVER_RECOVERY_TEST_OPTS)] + # ================== # # MLlib Test Setup #