diff --git a/src/main/scala/com/workflowfm/simulator/Coordinator.scala b/src/main/scala/com/workflowfm/simulator/Coordinator.scala index 16ab595b..7cab583d 100644 --- a/src/main/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/main/scala/com/workflowfm/simulator/Coordinator.scala @@ -72,6 +72,8 @@ class Coordinator( */ val waiting: Map[ActorRef, List[UUID]] = Map[ActorRef, List[UUID]]() + val parents: Map[ActorRef, ActorRef] = Map[ActorRef, ActorRef]() + /** Set of simulation names that are running, i.e. they have already started but not finished. */ /** * Set of simulation names that are running. @@ -240,9 +242,10 @@ class Coordinator( * time. * @param actor The reference to the [[Simulation]] corresponding to the simulation. */ - protected def addSimulation(t: Long, actor: ActorRef) = { + protected def addSimulation(t: Long, actor: ActorRef, parent: Option[ActorRef]=None) = { publish(ESimAdd(self, time, actor.toString(), t)) if (t >= time) events += StartingSim(t, actor) + if (parent.isDefined) parents += actor -> parent.get } /** @@ -303,6 +306,11 @@ class Coordinator( protected def stopSimulation(name: String, result: String, actor: ActorRef) = { simulations -= name waiting -= actor + parents.get(actor) map {x=> + val id = java.util.UUID.randomUUID + waiting += x -> List(id) + x ! Simulation.SimCompleted(actor,time,id) + } publish(ESimEnd(self, time, name, result)) log.debug(s"[COORD:$time] Finished: [${actor.path.name}]") ready(actor) @@ -543,9 +551,9 @@ class Coordinator( * @return The [[Receive]] behaviour. */ def receiveBehaviour: Receive = { - case Coordinator.AddSim(t, s) => addSimulation(t, s) + case Coordinator.AddSim(t, s, p) => addSimulation(t, s, p) case Coordinator.AddSims(l) => addSimulations(l) - case Coordinator.AddSimNow(s) => addSimulation(time, s) + case Coordinator.AddSimNow(s,p) => addSimulation(time, s, p) case Coordinator.AddSimsNow(l) => addSimulations(l.map((time, _))) case Coordinator.AddResource(r) => addResource(r) @@ -615,7 +623,7 @@ object Coordinator { * Message to add a simulation. * @group toplevel */ - case class AddSim(t: Long, actor: ActorRef) + case class AddSim(t: Long, actor: ActorRef, parent: Option[ActorRef]=None) /** * Message to add a list of simulations. * @group toplevel @@ -625,7 +633,7 @@ object Coordinator { * Message to add a simulation due to start right now. * @group toplevel */ - case class AddSimNow(actor: ActorRef) + case class AddSimNow(actor: ActorRef, parent: Option[ActorRef]=None) /** * Message to add a list of simulations due to start right now. * @group toplevel diff --git a/src/main/scala/com/workflowfm/simulator/Simulation.scala b/src/main/scala/com/workflowfm/simulator/Simulation.scala index 29a0e8da..5d570542 100644 --- a/src/main/scala/com/workflowfm/simulator/Simulation.scala +++ b/src/main/scala/com/workflowfm/simulator/Simulation.scala @@ -62,7 +62,7 @@ import java.util.UUID */ abstract class Simulation( name: String, - coordinator: ActorRef + val coordinator: ActorRef )(implicit executionContext: ExecutionContext) extends Actor { @@ -72,10 +72,13 @@ abstract class Simulation( * @group api * @return A `Future` that completes with a custom output when the simulation is completed. */ + def run(): Future[Any] def complete(task: Task, time: Long): Unit + def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = Unit + /** * Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation. * @@ -123,6 +126,10 @@ abstract class Simulation( coordinator ! Coordinator.AddTask(id, t, resources) } + def sim(actor: ActorRef) { + coordinator ! Coordinator.AddSimNow(actor,Some(self)) + } + /** * Starts the simulation via the [[run]] function. * @@ -187,6 +194,7 @@ abstract class Simulation( def simulationReceive: Receive = { case Simulation.Start => start() case Simulation.TaskCompleted(task, time) => complete(task, time) + case Simulation.SimCompleted(actor, time, id) => completeActor(actor, time, id) } def receive = simulationReceive @@ -236,6 +244,8 @@ object Simulation { * @param time The (virtual) time of completion. */ case class TaskCompleted(task: Task, time: Long) + //TODO document + case class SimCompleted(actor: ActorRef, time: Long, id: UUID) /** * Tells the [[Simulation]] to request that [[Coordinator]] waits. * @@ -295,6 +305,7 @@ class SingleTaskSimulation( } override def complete(task: Task, time: Long) = promise.success((task, time)) + override def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = Unit } object SingleTaskSimulation { @@ -349,6 +360,7 @@ abstract class AsyncSimulation( * @group internal */ private val tasks: Map[UUID, Callback] = Map() + private val childSims: Map[String,UUID] = Map() /** * Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation. @@ -403,6 +415,13 @@ abstract class AsyncSimulation( super.task(id, t, resources) } + def sim(actor: ActorRef, callback: Callback): Unit = { + val id = java.util.UUID.randomUUID + tasks += id -> callback + childSims += actor.path.name -> id + super.sim(actor) + } + /** * Manages a [[Task]] whose simulation has completed. * @@ -418,6 +437,11 @@ abstract class AsyncSimulation( tasks -= task.id } + override def completeActor(actor: ActorRef, time: Long, id: UUID) = { + childSims.get(actor.path.name) map ( tasks.get(_).map(_(null, time)) ) + ack(Seq(id)) + } + def actorCallback(actor: ActorRef): Callback = (task, time) => { actor ! (task, time) } @@ -436,6 +460,7 @@ abstract class AsyncSimulation( case Simulation.Ready => ready() case Simulation.AckTasks(tasks) => ack(tasks) case Simulation.TaskCompleted(task, time) => complete(task, time) + case Simulation.SimCompleted(actor, time, id) => completeActor(actor, time, id) case Simulation.AddTaskWithId(id, t, r) => task(id, t, actorCallback(sender), r) case Simulation.AddTask(t, r) => task(t, actorCallback(sender), r: _*) case Simulation.Wait => coordinator.forward(Coordinator.WaitFor(self)) @@ -456,3 +481,9 @@ trait FutureTasks { self: AsyncSimulation => p.future } } + +// trait ChildSims { myself: Simulation => +// def sim(actor: ActorRef) { +// myself.coordinator ! Coordinator.AddSimNow(actor,Some(self)) +// } +// } diff --git a/src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala b/src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala new file mode 100644 index 00000000..b088c6c5 --- /dev/null +++ b/src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala @@ -0,0 +1,89 @@ +package com.workflowfm.simulator + +import akka.actor.{ ActorSystem, Props, ActorRef } +import akka.util.Timeout +import akka.pattern.ask +import scala.concurrent._ +import scala.concurrent.duration._ +import com.workflowfm.simulator._ +import com.workflowfm.simulator.metrics._ +import com.workflowfm.simulator.events.{ ShutdownHandler } +import uk.ac.ed.inf.ppapapan.subakka.Subscriber +import java.io.File + +object FlowsMain { + //toggle for debug + val DEBUG = false + + def main(args: Array[String]): Unit = { + + implicit val system: ActorSystem = ActorSystem("ChildSims") + implicit val executionContext: ExecutionContext = ExecutionContext.global + implicit val timeout = Timeout(2.seconds) + + val coordinator = system.actorOf(Coordinator.props(DefaultScheduler)) + val shutdownActor = Subscriber.actor(new ShutdownHandler()) + + val handler = SimMetricsOutputs( + new SimMetricsPrinter(), + new SimCSVFileOutput("ChildSims" + File.separator + "output" + File.separator,"ChildSims"), + new SimD3Timeline("ChildSims" + File.separator + "output" + File.separator,"ChildSims") + ) + + Await.result(new SimOutputHandler(handler).subAndForgetTo(coordinator,Some("MetricsHandler")), 3.seconds) + Await.result(shutdownActor ? Subscriber.SubAndForgetTo(coordinator), 3.seconds) + + if (DEBUG) { + println(s"Cores: ${Runtime.getRuntime().availableProcessors()}") + val config = system.settings.config.getConfig("akka.actor.default-dispatcher") + println(s"Parallelism: ${config.getInt("fork-join-executor.parallelism-min")}-${config.getInt("fork-join-executor.parallelism-max")} x ${config.getDouble("fork-join-executor.parallelism-factor")}") + val printer = new com.workflowfm.simulator.events.PrintEventHandler + Await.result(printer.subAndForgetTo(coordinator), 3.seconds) + } + + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends AsyncSimulation(name, coordinator) { + + override def run(): Future[Any] = { + val promise = Promise[Any]() + + val childSim = system.actorOf(SingleTaskSimulation.props("childSim", coordinator, Seq("r1"), ConstantGenerator(5L))) + val callback2: Callback = (_,_)=> promise.success(Unit) + + sim(childSim,callback2) + + task(TaskGenerator("t1","sim",ConstantGenerator(1L),ConstantGenerator(0L)), + (_,_)=> {task(TaskGenerator("t2","sim",ConstantGenerator(2L),ConstantGenerator(0L)), (_,_)=>ready(), "r3" ); ready() }, + "r2" + ) + + ready() + + promise.future + + } + } + + //========================================================================================= + + // Define resources + val r1 = new TaskResource("r1",0) + val r2 = new TaskResource("r2",0) + val r3 = new TaskResource("r3",0) + val r4 = new TaskResource("r4",0) + val r5 = new TaskResource("r5",0) + val r6 = new TaskResource("r6",0) + val r7 = new TaskResource("r7",0) + val r8 = new TaskResource("r8",0) + val resources = List (r1,r2,r3,r4,r5,r6,r7,r8) + coordinator ! Coordinator.AddResources(resources) + + val sim = Props(new DummySim("sim", coordinator)) + + coordinator ! Coordinator.AddSim(0L,system.actorOf(sim,"sim")) + + coordinator ! Coordinator.Start + } + +} \ No newline at end of file diff --git a/src/test/scala/com/workflowfm/simulator/Coordinator.scala b/src/test/scala/com/workflowfm/simulator/Coordinator.scala index cdf6d1d6..dad4bd10 100644 --- a/src/test/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/test/scala/com/workflowfm/simulator/Coordinator.scala @@ -36,7 +36,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") coordinator ! Coordinator.SimDone("Test", Success(Unit)) expectNoMsg() @@ -47,7 +47,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") coordinator ! Coordinator.Ping expectMsgType[Coordinator.Time].time should be(0L) @@ -59,7 +59,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") val id = UUID.randomUUID() @@ -68,7 +68,7 @@ class CoordinatorTests coordinator ! Coordinator.AddTasks(Seq((id, tg, Seq()))) coordinator ! Coordinator.SimReady - val SimulationActor.TaskCompleted(task, time) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task, time) = expectMsgType[Simulation.TaskCompleted] time should be(2L) task.compare(expected) should be(0) coordinator ! Coordinator.Ping @@ -81,7 +81,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") // Add task T1 0..2 @@ -92,7 +92,7 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // T1 completes - val SimulationActor.TaskCompleted(task1, time1) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1, time1) = expectMsgType[Simulation.TaskCompleted] time1 should be(2L) task1.compare(expected1) should be(0) @@ -104,7 +104,7 @@ class CoordinatorTests coordinator ! Coordinator.AckTasks(Seq(id1)) // T2 completes - val SimulationActor.TaskCompleted(task2, time2) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task2, time2) = expectMsgType[Simulation.TaskCompleted] time2 should be(5L) task2.compare(expected2) should be(0) @@ -117,7 +117,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") // T1 0..2 @@ -135,8 +135,8 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // T1 and T2 complete - expectMsgType[SimulationActor.TaskCompleted] - expectMsgType[SimulationActor.TaskCompleted] + expectMsgType[Simulation.TaskCompleted] + expectMsgType[Simulation.TaskCompleted] coordinator ! Coordinator.Ping expectMsgType[Coordinator.Time].time should be(2L) @@ -159,7 +159,7 @@ class CoordinatorTests coordinator ! Coordinator.AckTasks(Seq(id2)) // T3 completes - val SimulationActor.TaskCompleted(task3, time3) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task3, time3) = expectMsgType[Simulation.TaskCompleted] time3 should be(5L) task3.compare(expected3) should be(0) @@ -177,7 +177,7 @@ class CoordinatorTests coordinator ! Coordinator.Start // Test1 starts - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test1") // T1a 0..2 @@ -188,7 +188,7 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // Test2 starts - probe.expectMsg(SimulationActor.Start) + probe.expectMsg(Simulation.Start) probe.reply(Coordinator.SimStarted("Test2")) // T2a 1..2 @@ -199,14 +199,14 @@ class CoordinatorTests probe.send(coordinator, Coordinator.SimReady) // T1a completes - val SimulationActor.TaskCompleted(task1a, time1a) = - expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1a, time1a) = + expectMsgType[Simulation.TaskCompleted] time1a should be(2L) task1a.compare(expected1a) should be(0) // T2a completes - val SimulationActor.TaskCompleted(task2a, time2a) = - probe.expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task2a, time2a) = + probe.expectMsgType[Simulation.TaskCompleted] time2a should be(2L) task2a.compare(expected2a) should be(0) @@ -225,7 +225,7 @@ class CoordinatorTests coordinator ! Coordinator.Start // Test1 starts - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test1") // T1a 0..10 @@ -237,7 +237,7 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // Test2 starts - probe.expectMsg(SimulationActor.Start) + probe.expectMsg(Simulation.Start) probe.reply(Coordinator.SimStarted("Test2")) // T2a 1..2 @@ -248,14 +248,14 @@ class CoordinatorTests probe.send(coordinator, Coordinator.SimReady) // T2a completes - val SimulationActor.TaskCompleted(task2a, time2a) = - probe.expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task2a, time2a) = + probe.expectMsgType[Simulation.TaskCompleted] time2a should be(2L) task2a.compare(expected2a) should be(0) // Test1 requests wait coordinator ! Coordinator.WaitFor(self) - expectMsg(SimulationActor.AckWait) + expectMsg(Simulation.AckWait) // Test2 completes probe.send(coordinator, Coordinator.SimDone("Test2", Success(Unit))) @@ -273,21 +273,39 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // T1b completes - val SimulationActor.TaskCompleted(task1b, time1b) = - expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1b, time1b) = + expectMsgType[Simulation.TaskCompleted] time1b should be(3L) task1b.compare(expected1b) should be(0) coordinator ! Coordinator.AckTasks(Seq(id1b)) // T1a completes - val SimulationActor.TaskCompleted(task1a, time1a) = - expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1a, time1a) = + expectMsgType[Simulation.TaskCompleted] time1a should be(10L) task1a.compare(expected1a) should be(0) coordinator ! Coordinator.SimDone("Test1", Success(Unit)) } + "interact correctly with a simulation creating child simulations" in { + val coordinator = system.actorOf(Coordinator.props(DefaultScheduler)) + val childSim = system.actorOf(SingleTaskSimulation.props("ChildSim",coordinator,Seq("r1"),ConstantGenerator(2L))) + + coordinator ! Coordinator.AddSim(0L, self) + coordinator ! Coordinator.Start + expectMsg(Simulation.Start) + coordinator ! Coordinator.SimStarted("Test") + coordinator ! Coordinator.AddSimNow(childSim,Some(self)) + coordinator ! Coordinator.SimReady + val Simulation.SimCompleted(actor,time,id) = expectMsgType[Simulation.SimCompleted] + actor should be (childSim) + time should be (2L) + + //coordinator ! Coordinator.SimDone("Test", Success(Unit)) + //expectNoMsg() + } + } /* "The Coordinator" must { diff --git a/src/test/scala/com/workflowfm/simulator/Simulation.scala b/src/test/scala/com/workflowfm/simulator/Simulation.scala new file mode 100644 index 00000000..f0eef9b1 --- /dev/null +++ b/src/test/scala/com/workflowfm/simulator/Simulation.scala @@ -0,0 +1,169 @@ +package com.workflowfm.simulator + +import akka.actor.{ ActorRef, ActorSystem, Props } +import akka.testkit.{ ImplicitSender, TestActors, TestKit, TestProbe } +import akka.pattern.ask +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import scala.concurrent._ +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{ Failure, Success, Try } +import java.util.UUID +import com.workflowfm.simulator.metrics._ +import uk.ac.ed.inf.ppapapan.subakka.MockPublisher + +class SimulationTests + extends TestKit( + ActorSystem("SimulaionTests", ConfigFactory.parseString(MockPublisher.config)) + ) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with ImplicitSender { + implicit val executionContext = ExecutionContext.global + implicit val timeout: FiniteDuration = 10.seconds + + override def afterAll: Unit = { + TestKit.shutdownActorSystem(system) + } + + "Simulations" must { + + "interact correctly with a coordinator with no tasks" in { + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends Simulation(name, coordinator) { + override def run(): Future[Any] = Future.unit //finish instantly + override def complete(task: Task, time: Long): Unit = Unit //does nothing + //override def completeActor(actor: ActorRef, time: Long): Unit = Unit + } + + val sim = system.actorOf(Props(new DummySim("sim", self))) + + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + expectMsg(Coordinator.SimDone("sim", Success())) + expectNoMsg() + } + + "interact correctly with a cooridnator with one task" in { + val sim = + system.actorOf(SingleTaskSimulation.props("sim", self, Seq("r1"), ConstantGenerator(2L))) + + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + val Coordinator.AddTask(id, generator, resources) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.SimReady) + + val task = generator.create(id, 0L, sim, "r1") + sim ! Simulation.TaskCompleted(task, 2L) + val Coordinator.SimDone(name, future) = expectMsgType[Coordinator.SimDone] + name should be("sim") + expectNoMsg() + } + + "interact correctly with a cooridnator with a sequence of task, acking tasks as they complete" in { + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends AsyncSimulation(name, coordinator) + with FutureTasks { + override def run(): Future[Any] = { + val id1 = java.util.UUID.randomUUID + val id2 = java.util.UUID.randomUUID + val id3 = java.util.UUID.randomUUID + val task1 = futureTask( + id1, + TaskGenerator("task1", "sim", ConstantGenerator(2L), ConstantGenerator(0L)), + Seq("r1") + ) + ready() + val task2 = task1 flatMap { _ => + val t = futureTask( + id2, + TaskGenerator("task2", "sim", ConstantGenerator(2L), ConstantGenerator(0L)), + Seq("r1") + ) + ack(Seq(id1)) + t + } + val task3 = task2 flatMap { _ => + val t = futureTask( + id3, + TaskGenerator("task3", "sim", ConstantGenerator(2L), ConstantGenerator(0L)), + Seq("r1") + ) + ack(Seq(id2)) + t + } + task3 + } + } + + val sim = system.actorOf(Props(new DummySim("sim", self))) + + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + val Coordinator.AddTask(id1, generator1, resources1) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.SimReady) + expectNoMsg() + + val task1 = generator1.create(id1, 0L, sim, resources1: _*) + sim ! Simulation.TaskCompleted(task1, 2L) + + val Coordinator.AddTask(id2, generator2, resources2) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.AckTasks(Seq(id1))) + expectNoMsg() + + val task2 = generator2.create(id2, 2L, sim, resources2: _*) + sim ! Simulation.TaskCompleted(task2, 4L) + + val Coordinator.AddTask(id3, generator3, resources3) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.AckTasks(Seq(id2))) + expectNoMsg() + + val task3 = generator3.create(id3, 2L, sim, resources3: _*) + sim ! Simulation.TaskCompleted(task3, 6L) + + //expectMsg( Coordinator.AckTasks(Seq(id3))) + val Coordinator.SimDone(name, future) = expectMsgType[Coordinator.SimDone] + name should be("sim") + expectNoMsg() + + } + + "innitiate child simulations" in { + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends Simulation(name, coordinator) { + private val promise = Promise[Any]() + override def run(): Future[Any] = { + val childSim = system.actorOf( + SingleTaskSimulation.props("ChildSim", self, Seq("r1"), ConstantGenerator(2L)) + ) + sim(childSim) + ready() + + promise.future + } + override def complete(task: Task, time: Long) = Unit //Does nothing + override def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = promise.success(Unit) + } + + val sim = system.actorOf(Props(new DummySim("sim", self))) + + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + val Coordinator.AddSimNow(actor, parent) = expectMsgType[Coordinator.AddSimNow] + parent should be(Some(sim)) + expectMsg(Coordinator.SimReady) + + sim ! Simulation.SimCompleted(actor, 2L, java.util.UUID.randomUUID) + expectMsg(Coordinator.SimDone("sim", Success(Unit))) + expectNoMsg() + + } + } + +}