From e7e514ce91fe2724a06fc0e2190e5dd67beee259 Mon Sep 17 00:00:00 2001 From: zedrox Date: Tue, 4 Aug 2020 17:54:55 +0100 Subject: [PATCH 1/4] Made basic Simulation tests --- .../com/workflowfm/simulator/Simulation.scala | 122 ++++++++++++++++++ 1 file changed, 122 insertions(+) create mode 100644 src/test/scala/com/workflowfm/simulator/Simulation.scala diff --git a/src/test/scala/com/workflowfm/simulator/Simulation.scala b/src/test/scala/com/workflowfm/simulator/Simulation.scala new file mode 100644 index 00000000..3699d73f --- /dev/null +++ b/src/test/scala/com/workflowfm/simulator/Simulation.scala @@ -0,0 +1,122 @@ +package com.workflowfm.simulator + +import akka.actor.{ActorSystem, ActorRef,Props} +import akka.testkit.{ ImplicitSender, TestActors, TestKit, TestProbe } +import akka.pattern.ask +import akka.util.Timeout +import com.typesafe.config.ConfigFactory +import org.scalatest.{ BeforeAndAfterAll, Matchers, WordSpecLike } +import scala.concurrent._ +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.{ Failure, Success, Try } +import java.util.UUID +import com.workflowfm.simulator.metrics._ +import uk.ac.ed.inf.ppapapan.subakka.MockPublisher + +class SimulationTests + extends TestKit( + ActorSystem("SimulaionTests", ConfigFactory.parseString(MockPublisher.config)) + ) + with WordSpecLike + with Matchers + with BeforeAndAfterAll + with ImplicitSender { + implicit val executionContext = ExecutionContext.global + implicit val timeout: FiniteDuration = 10.seconds + + override def afterAll: Unit = { + TestKit.shutdownActorSystem(system) + } + + "Simulations" must { + + "interact correctly with a coordinator with no tasks" in { + class DummySim(name: String, coordinator: ActorRef) + (implicit executionContext: ExecutionContext) + extends Simulation(name,coordinator) { + override def run():Future[Any] = { Future.unit } //finish instantly + override def complete(task: Task, time: Long): Unit = { Unit } //does nothing + } + + val sim = system.actorOf(Props(new DummySim("sim",self))) + + sim ! Simulation.Start + expectMsg( Coordinator.SimStarted("sim")) + expectMsg( Coordinator.SimDone("sim",Success())) + expectNoMsg() + } + + "interact correctly with a cooridnator with one task" in { + val sim = system.actorOf(SingleTaskSimulation.props("sim",self,Seq("r1"),ConstantGenerator(2L))) + + sim ! Simulation.Start + expectMsg( Coordinator.SimStarted("sim")) + val Coordinator.AddTask(id, generator, resources) = expectMsgType[ Coordinator.AddTask ] + expectMsg( Coordinator.SimReady ) + + val task = generator.create(id,0L,sim,"r1") + sim ! Simulation.TaskCompleted(task,2L) + val Coordinator.SimDone(name, future) = expectMsgType[ Coordinator.SimDone ] + name should be ("sim") + expectNoMsg() + } + + "interact correctly with a cooridnator with a sequence of task, acking tasks as they complete" in { + class DummySim(name: String, coordinator: ActorRef) + (implicit executionContext: ExecutionContext) + extends AsyncSimulation(name,coordinator) with FutureTasks { + override def run():Future[Any] = { + val id1 = java.util.UUID.randomUUID + val id2 = java.util.UUID.randomUUID + val id3 = java.util.UUID.randomUUID + val task1 = futureTask(id1, TaskGenerator("task1","sim",ConstantGenerator(2L),ConstantGenerator(0L)),Seq("r1")) + ready() + val task2 = task1 flatMap { _=> + val t = futureTask(id2,TaskGenerator("task2","sim",ConstantGenerator(2L),ConstantGenerator(0L)),Seq("r1")) + ack(Seq(id1)) + t + } + val task3 = task2 flatMap { _=> + val t = futureTask(id3,TaskGenerator("task3","sim",ConstantGenerator(2L),ConstantGenerator(0L)),Seq("r1")) + ack(Seq(id2)) + t + } + task3 + } + } + + val sim = system.actorOf(Props(new DummySim("sim",self))) + + sim ! Simulation.Start + expectMsg( Coordinator.SimStarted("sim")) + val Coordinator.AddTask(id1, generator1, resources1) = expectMsgType[ Coordinator.AddTask ] + expectMsg( Coordinator.SimReady ) + expectNoMsg() + + val task1 = generator1.create(id1,0L,sim,resources1:_*) + sim ! Simulation.TaskCompleted(task1,2L) + + val Coordinator.AddTask(id2, generator2, resources2) = expectMsgType[ Coordinator.AddTask ] + expectMsg( Coordinator.AckTasks(Seq(id1))) + expectNoMsg() + + val task2 = generator2.create(id2,2L,sim,resources2:_*) + sim ! Simulation.TaskCompleted(task2,4L) + + val Coordinator.AddTask(id3, generator3, resources3) = expectMsgType[ Coordinator.AddTask ] + expectMsg( Coordinator.AckTasks(Seq(id2))) + expectNoMsg() + + val task3 = generator3.create(id3,2L,sim,resources3:_*) + sim ! Simulation.TaskCompleted(task3,6L) + + //expectMsg( Coordinator.AckTasks(Seq(id3))) + val Coordinator.SimDone(name, future) = expectMsgType[ Coordinator.SimDone ] + name should be ("sim") + expectNoMsg() + + } + } + +} \ No newline at end of file From fa9fc8589b576cd707f78240c703357f9118237c Mon Sep 17 00:00:00 2001 From: zedrox Date: Tue, 4 Aug 2020 17:56:51 +0100 Subject: [PATCH 2/4] fixes coordinator tests --- .../workflowfm/simulator/Coordinator.scala | 52 +++++++++---------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/src/test/scala/com/workflowfm/simulator/Coordinator.scala b/src/test/scala/com/workflowfm/simulator/Coordinator.scala index cdf6d1d6..87cdddfe 100644 --- a/src/test/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/test/scala/com/workflowfm/simulator/Coordinator.scala @@ -36,7 +36,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") coordinator ! Coordinator.SimDone("Test", Success(Unit)) expectNoMsg() @@ -47,7 +47,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") coordinator ! Coordinator.Ping expectMsgType[Coordinator.Time].time should be(0L) @@ -59,7 +59,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") val id = UUID.randomUUID() @@ -68,7 +68,7 @@ class CoordinatorTests coordinator ! Coordinator.AddTasks(Seq((id, tg, Seq()))) coordinator ! Coordinator.SimReady - val SimulationActor.TaskCompleted(task, time) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task, time) = expectMsgType[Simulation.TaskCompleted] time should be(2L) task.compare(expected) should be(0) coordinator ! Coordinator.Ping @@ -81,7 +81,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") // Add task T1 0..2 @@ -92,7 +92,7 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // T1 completes - val SimulationActor.TaskCompleted(task1, time1) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1, time1) = expectMsgType[Simulation.TaskCompleted] time1 should be(2L) task1.compare(expected1) should be(0) @@ -104,7 +104,7 @@ class CoordinatorTests coordinator ! Coordinator.AckTasks(Seq(id1)) // T2 completes - val SimulationActor.TaskCompleted(task2, time2) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task2, time2) = expectMsgType[Simulation.TaskCompleted] time2 should be(5L) task2.compare(expected2) should be(0) @@ -117,7 +117,7 @@ class CoordinatorTests coordinator ! Coordinator.AddSim(0L, self) coordinator ! Coordinator.Start - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") // T1 0..2 @@ -135,8 +135,8 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // T1 and T2 complete - expectMsgType[SimulationActor.TaskCompleted] - expectMsgType[SimulationActor.TaskCompleted] + expectMsgType[Simulation.TaskCompleted] + expectMsgType[Simulation.TaskCompleted] coordinator ! Coordinator.Ping expectMsgType[Coordinator.Time].time should be(2L) @@ -159,7 +159,7 @@ class CoordinatorTests coordinator ! Coordinator.AckTasks(Seq(id2)) // T3 completes - val SimulationActor.TaskCompleted(task3, time3) = expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task3, time3) = expectMsgType[Simulation.TaskCompleted] time3 should be(5L) task3.compare(expected3) should be(0) @@ -177,7 +177,7 @@ class CoordinatorTests coordinator ! Coordinator.Start // Test1 starts - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test1") // T1a 0..2 @@ -188,7 +188,7 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // Test2 starts - probe.expectMsg(SimulationActor.Start) + probe.expectMsg(Simulation.Start) probe.reply(Coordinator.SimStarted("Test2")) // T2a 1..2 @@ -199,14 +199,14 @@ class CoordinatorTests probe.send(coordinator, Coordinator.SimReady) // T1a completes - val SimulationActor.TaskCompleted(task1a, time1a) = - expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1a, time1a) = + expectMsgType[Simulation.TaskCompleted] time1a should be(2L) task1a.compare(expected1a) should be(0) // T2a completes - val SimulationActor.TaskCompleted(task2a, time2a) = - probe.expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task2a, time2a) = + probe.expectMsgType[Simulation.TaskCompleted] time2a should be(2L) task2a.compare(expected2a) should be(0) @@ -225,7 +225,7 @@ class CoordinatorTests coordinator ! Coordinator.Start // Test1 starts - expectMsg(SimulationActor.Start) + expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test1") // T1a 0..10 @@ -237,7 +237,7 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // Test2 starts - probe.expectMsg(SimulationActor.Start) + probe.expectMsg(Simulation.Start) probe.reply(Coordinator.SimStarted("Test2")) // T2a 1..2 @@ -248,14 +248,14 @@ class CoordinatorTests probe.send(coordinator, Coordinator.SimReady) // T2a completes - val SimulationActor.TaskCompleted(task2a, time2a) = - probe.expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task2a, time2a) = + probe.expectMsgType[Simulation.TaskCompleted] time2a should be(2L) task2a.compare(expected2a) should be(0) // Test1 requests wait coordinator ! Coordinator.WaitFor(self) - expectMsg(SimulationActor.AckWait) + expectMsg(Simulation.AckWait) // Test2 completes probe.send(coordinator, Coordinator.SimDone("Test2", Success(Unit))) @@ -273,15 +273,15 @@ class CoordinatorTests coordinator ! Coordinator.SimReady // T1b completes - val SimulationActor.TaskCompleted(task1b, time1b) = - expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1b, time1b) = + expectMsgType[Simulation.TaskCompleted] time1b should be(3L) task1b.compare(expected1b) should be(0) coordinator ! Coordinator.AckTasks(Seq(id1b)) // T1a completes - val SimulationActor.TaskCompleted(task1a, time1a) = - expectMsgType[SimulationActor.TaskCompleted] + val Simulation.TaskCompleted(task1a, time1a) = + expectMsgType[Simulation.TaskCompleted] time1a should be(10L) task1a.compare(expected1a) should be(0) From 89126c2542649eb2f939b98623bc4e1af152b14a Mon Sep 17 00:00:00 2001 From: zedrox Date: Wed, 5 Aug 2020 13:43:34 +0100 Subject: [PATCH 3/4] First implementation of child sims, inc. tests --- .../workflowfm/simulator/Coordinator.scala | 14 +++++--- .../com/workflowfm/simulator/Simulation.scala | 18 +++++++++- .../workflowfm/simulator/Coordinator.scala | 16 +++++++++ .../com/workflowfm/simulator/Simulation.scala | 35 +++++++++++++++++-- 4 files changed, 75 insertions(+), 8 deletions(-) diff --git a/src/main/scala/com/workflowfm/simulator/Coordinator.scala b/src/main/scala/com/workflowfm/simulator/Coordinator.scala index 16ab595b..f4c46aa6 100644 --- a/src/main/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/main/scala/com/workflowfm/simulator/Coordinator.scala @@ -72,6 +72,8 @@ class Coordinator( */ val waiting: Map[ActorRef, List[UUID]] = Map[ActorRef, List[UUID]]() + val parents: Map[ActorRef, ActorRef] = Map[ActorRef, ActorRef]() + /** Set of simulation names that are running, i.e. they have already started but not finished. */ /** * Set of simulation names that are running. @@ -240,9 +242,10 @@ class Coordinator( * time. * @param actor The reference to the [[Simulation]] corresponding to the simulation. */ - protected def addSimulation(t: Long, actor: ActorRef) = { + protected def addSimulation(t: Long, actor: ActorRef, parent: Option[ActorRef]=None) = { publish(ESimAdd(self, time, actor.toString(), t)) if (t >= time) events += StartingSim(t, actor) + if (parent.isDefined) parents += actor -> parent.get } /** @@ -303,6 +306,7 @@ class Coordinator( protected def stopSimulation(name: String, result: String, actor: ActorRef) = { simulations -= name waiting -= actor + parents.get(actor) map (x=> x ! Simulation.SimCompleted(actor,time)) publish(ESimEnd(self, time, name, result)) log.debug(s"[COORD:$time] Finished: [${actor.path.name}]") ready(actor) @@ -543,9 +547,9 @@ class Coordinator( * @return The [[Receive]] behaviour. */ def receiveBehaviour: Receive = { - case Coordinator.AddSim(t, s) => addSimulation(t, s) + case Coordinator.AddSim(t, s, p) => addSimulation(t, s, p) case Coordinator.AddSims(l) => addSimulations(l) - case Coordinator.AddSimNow(s) => addSimulation(time, s) + case Coordinator.AddSimNow(s,p) => addSimulation(time, s, p) case Coordinator.AddSimsNow(l) => addSimulations(l.map((time, _))) case Coordinator.AddResource(r) => addResource(r) @@ -615,7 +619,7 @@ object Coordinator { * Message to add a simulation. * @group toplevel */ - case class AddSim(t: Long, actor: ActorRef) + case class AddSim(t: Long, actor: ActorRef, parent: Option[ActorRef]=None) /** * Message to add a list of simulations. * @group toplevel @@ -625,7 +629,7 @@ object Coordinator { * Message to add a simulation due to start right now. * @group toplevel */ - case class AddSimNow(actor: ActorRef) + case class AddSimNow(actor: ActorRef, parent: Option[ActorRef]=None) /** * Message to add a list of simulations due to start right now. * @group toplevel diff --git a/src/main/scala/com/workflowfm/simulator/Simulation.scala b/src/main/scala/com/workflowfm/simulator/Simulation.scala index 29a0e8da..de912f60 100644 --- a/src/main/scala/com/workflowfm/simulator/Simulation.scala +++ b/src/main/scala/com/workflowfm/simulator/Simulation.scala @@ -62,7 +62,7 @@ import java.util.UUID */ abstract class Simulation( name: String, - coordinator: ActorRef + coordinator: ActorRef, )(implicit executionContext: ExecutionContext) extends Actor { @@ -72,10 +72,13 @@ abstract class Simulation( * @group api * @return A `Future` that completes with a custom output when the simulation is completed. */ + def run(): Future[Any] def complete(task: Task, time: Long): Unit + def completeActor(actor: ActorRef, time: Long): Unit + /** * Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation. * @@ -123,6 +126,10 @@ abstract class Simulation( coordinator ! Coordinator.AddTask(id, t, resources) } + def sim(actor: ActorRef) { + coordinator ! Coordinator.AddSimNow(actor,Option(self)) + } + /** * Starts the simulation via the [[run]] function. * @@ -187,6 +194,7 @@ abstract class Simulation( def simulationReceive: Receive = { case Simulation.Start => start() case Simulation.TaskCompleted(task, time) => complete(task, time) + case Simulation.SimCompleted(actor, time) => completeActor(actor, time) } def receive = simulationReceive @@ -236,6 +244,8 @@ object Simulation { * @param time The (virtual) time of completion. */ case class TaskCompleted(task: Task, time: Long) + //TODO document + case class SimCompleted(actor: ActorRef, time: Long) /** * Tells the [[Simulation]] to request that [[Coordinator]] waits. * @@ -295,6 +305,7 @@ class SingleTaskSimulation( } override def complete(task: Task, time: Long) = promise.success((task, time)) + override def completeActor(actor: ActorRef, time: Long): Unit = Unit } object SingleTaskSimulation { @@ -418,6 +429,10 @@ abstract class AsyncSimulation( tasks -= task.id } + override def completeActor(actor: ActorRef, time: Long) = { + + } + def actorCallback(actor: ActorRef): Callback = (task, time) => { actor ! (task, time) } @@ -436,6 +451,7 @@ abstract class AsyncSimulation( case Simulation.Ready => ready() case Simulation.AckTasks(tasks) => ack(tasks) case Simulation.TaskCompleted(task, time) => complete(task, time) + case Simulation.SimCompleted(actor, time) => completeActor(actor, time) case Simulation.AddTaskWithId(id, t, r) => task(id, t, actorCallback(sender), r) case Simulation.AddTask(t, r) => task(t, actorCallback(sender), r: _*) case Simulation.Wait => coordinator.forward(Coordinator.WaitFor(self)) diff --git a/src/test/scala/com/workflowfm/simulator/Coordinator.scala b/src/test/scala/com/workflowfm/simulator/Coordinator.scala index 87cdddfe..4a6ade48 100644 --- a/src/test/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/test/scala/com/workflowfm/simulator/Coordinator.scala @@ -288,6 +288,22 @@ class CoordinatorTests coordinator ! Coordinator.SimDone("Test1", Success(Unit)) } + "interact correctly with a simulation creating child simulations" in { + val coordinator = system.actorOf(Coordinator.props(DefaultScheduler)) + val childSim = system.actorOf(SingleTaskSimulation.props("ChildSim",coordinator,Seq("r1"),ConstantGenerator(2L))) + + coordinator ! Coordinator.AddSim(0L, self) + coordinator ! Coordinator.Start + expectMsg(Simulation.Start) + coordinator ! Coordinator.SimStarted("Test") + coordinator ! Coordinator.AddSimNow(childSim,Option(self)) + coordinator ! Coordinator.SimReady + expectMsg(Simulation.SimCompleted(childSim,2L)) + + //coordinator ! Coordinator.SimDone("Test", Success(Unit)) + //expectNoMsg() + } + } /* "The Coordinator" must { diff --git a/src/test/scala/com/workflowfm/simulator/Simulation.scala b/src/test/scala/com/workflowfm/simulator/Simulation.scala index 3699d73f..d0f04e41 100644 --- a/src/test/scala/com/workflowfm/simulator/Simulation.scala +++ b/src/test/scala/com/workflowfm/simulator/Simulation.scala @@ -35,8 +35,9 @@ class SimulationTests class DummySim(name: String, coordinator: ActorRef) (implicit executionContext: ExecutionContext) extends Simulation(name,coordinator) { - override def run():Future[Any] = { Future.unit } //finish instantly - override def complete(task: Task, time: Long): Unit = { Unit } //does nothing + override def run():Future[Any] = Future.unit //finish instantly + override def complete(task: Task, time: Long): Unit = Unit //does nothing + override def completeActor(actor: ActorRef, time: Long): Unit = Unit } val sim = system.actorOf(Props(new DummySim("sim",self))) @@ -117,6 +118,36 @@ class SimulationTests expectNoMsg() } + + "innitiate child simulations" in { + class DummySim(name: String, coordinator: ActorRef) + (implicit executionContext: ExecutionContext) + extends Simulation(name,coordinator) with ChildSims { + private val promise = Promise[Any]() + override def run():Future[Any] = { + val childSim = system.actorOf(SingleTaskSimulation.props("ChildSim",self,Seq("r1"),ConstantGenerator(2L))) + sim(childSim) + ready() + + promise.future + } + override def complete(task: Task, time: Long) = Unit //Does nothing + override def completeActor(actor: ActorRef, time: Long): Unit = promise.success(Unit) + } + + val sim = system.actorOf(Props(new DummySim("sim",self))) + + sim ! Simulation.Start + expectMsg( Coordinator.SimStarted("sim")) + val Coordinator.AddSimNow(actor,parent) = expectMsgType[ Coordinator.AddSimNow ] + parent should be (Some(sim)) + expectMsg( Coordinator.SimReady ) + + sim ! Simulation.SimCompleted(actor,2L) + expectMsg( Coordinator.SimDone("sim",Success(Unit))) + expectNoMsg() + + } } } \ No newline at end of file From 8dec8f98197bcae25a4b59f1db845be3d97574ab Mon Sep 17 00:00:00 2001 From: zedrox Date: Mon, 10 Aug 2020 10:37:21 +0100 Subject: [PATCH 4/4] Adds async simulation support and acks Updated tests. Updated message from coordinator to simulation- now contains ID which must be acked before continuing. Created simulationChildDemo for easy testing, will probably remove this in the future. --- .../workflowfm/simulator/Coordinator.scala | 6 +- .../com/workflowfm/simulator/Simulation.scala | 33 ++- .../simulator/SimulationChildDemo.scala | 89 +++++++ .../workflowfm/simulator/Coordinator.scala | 6 +- .../com/workflowfm/simulator/Simulation.scala | 236 ++++++++++-------- 5 files changed, 248 insertions(+), 122 deletions(-) create mode 100644 src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala diff --git a/src/main/scala/com/workflowfm/simulator/Coordinator.scala b/src/main/scala/com/workflowfm/simulator/Coordinator.scala index f4c46aa6..7cab583d 100644 --- a/src/main/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/main/scala/com/workflowfm/simulator/Coordinator.scala @@ -306,7 +306,11 @@ class Coordinator( protected def stopSimulation(name: String, result: String, actor: ActorRef) = { simulations -= name waiting -= actor - parents.get(actor) map (x=> x ! Simulation.SimCompleted(actor,time)) + parents.get(actor) map {x=> + val id = java.util.UUID.randomUUID + waiting += x -> List(id) + x ! Simulation.SimCompleted(actor,time,id) + } publish(ESimEnd(self, time, name, result)) log.debug(s"[COORD:$time] Finished: [${actor.path.name}]") ready(actor) diff --git a/src/main/scala/com/workflowfm/simulator/Simulation.scala b/src/main/scala/com/workflowfm/simulator/Simulation.scala index de912f60..5d570542 100644 --- a/src/main/scala/com/workflowfm/simulator/Simulation.scala +++ b/src/main/scala/com/workflowfm/simulator/Simulation.scala @@ -62,7 +62,7 @@ import java.util.UUID */ abstract class Simulation( name: String, - coordinator: ActorRef, + val coordinator: ActorRef )(implicit executionContext: ExecutionContext) extends Actor { @@ -77,7 +77,7 @@ abstract class Simulation( def complete(task: Task, time: Long): Unit - def completeActor(actor: ActorRef, time: Long): Unit + def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = Unit /** * Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation. @@ -127,7 +127,7 @@ abstract class Simulation( } def sim(actor: ActorRef) { - coordinator ! Coordinator.AddSimNow(actor,Option(self)) + coordinator ! Coordinator.AddSimNow(actor,Some(self)) } /** @@ -194,7 +194,7 @@ abstract class Simulation( def simulationReceive: Receive = { case Simulation.Start => start() case Simulation.TaskCompleted(task, time) => complete(task, time) - case Simulation.SimCompleted(actor, time) => completeActor(actor, time) + case Simulation.SimCompleted(actor, time, id) => completeActor(actor, time, id) } def receive = simulationReceive @@ -245,7 +245,7 @@ object Simulation { */ case class TaskCompleted(task: Task, time: Long) //TODO document - case class SimCompleted(actor: ActorRef, time: Long) + case class SimCompleted(actor: ActorRef, time: Long, id: UUID) /** * Tells the [[Simulation]] to request that [[Coordinator]] waits. * @@ -305,7 +305,7 @@ class SingleTaskSimulation( } override def complete(task: Task, time: Long) = promise.success((task, time)) - override def completeActor(actor: ActorRef, time: Long): Unit = Unit + override def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = Unit } object SingleTaskSimulation { @@ -360,6 +360,7 @@ abstract class AsyncSimulation( * @group internal */ private val tasks: Map[UUID, Callback] = Map() + private val childSims: Map[String,UUID] = Map() /** * Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation. @@ -414,6 +415,13 @@ abstract class AsyncSimulation( super.task(id, t, resources) } + def sim(actor: ActorRef, callback: Callback): Unit = { + val id = java.util.UUID.randomUUID + tasks += id -> callback + childSims += actor.path.name -> id + super.sim(actor) + } + /** * Manages a [[Task]] whose simulation has completed. * @@ -429,8 +437,9 @@ abstract class AsyncSimulation( tasks -= task.id } - override def completeActor(actor: ActorRef, time: Long) = { - + override def completeActor(actor: ActorRef, time: Long, id: UUID) = { + childSims.get(actor.path.name) map ( tasks.get(_).map(_(null, time)) ) + ack(Seq(id)) } def actorCallback(actor: ActorRef): Callback = (task, time) => { @@ -451,7 +460,7 @@ abstract class AsyncSimulation( case Simulation.Ready => ready() case Simulation.AckTasks(tasks) => ack(tasks) case Simulation.TaskCompleted(task, time) => complete(task, time) - case Simulation.SimCompleted(actor, time) => completeActor(actor, time) + case Simulation.SimCompleted(actor, time, id) => completeActor(actor, time, id) case Simulation.AddTaskWithId(id, t, r) => task(id, t, actorCallback(sender), r) case Simulation.AddTask(t, r) => task(t, actorCallback(sender), r: _*) case Simulation.Wait => coordinator.forward(Coordinator.WaitFor(self)) @@ -472,3 +481,9 @@ trait FutureTasks { self: AsyncSimulation => p.future } } + +// trait ChildSims { myself: Simulation => +// def sim(actor: ActorRef) { +// myself.coordinator ! Coordinator.AddSimNow(actor,Some(self)) +// } +// } diff --git a/src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala b/src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala new file mode 100644 index 00000000..b088c6c5 --- /dev/null +++ b/src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala @@ -0,0 +1,89 @@ +package com.workflowfm.simulator + +import akka.actor.{ ActorSystem, Props, ActorRef } +import akka.util.Timeout +import akka.pattern.ask +import scala.concurrent._ +import scala.concurrent.duration._ +import com.workflowfm.simulator._ +import com.workflowfm.simulator.metrics._ +import com.workflowfm.simulator.events.{ ShutdownHandler } +import uk.ac.ed.inf.ppapapan.subakka.Subscriber +import java.io.File + +object FlowsMain { + //toggle for debug + val DEBUG = false + + def main(args: Array[String]): Unit = { + + implicit val system: ActorSystem = ActorSystem("ChildSims") + implicit val executionContext: ExecutionContext = ExecutionContext.global + implicit val timeout = Timeout(2.seconds) + + val coordinator = system.actorOf(Coordinator.props(DefaultScheduler)) + val shutdownActor = Subscriber.actor(new ShutdownHandler()) + + val handler = SimMetricsOutputs( + new SimMetricsPrinter(), + new SimCSVFileOutput("ChildSims" + File.separator + "output" + File.separator,"ChildSims"), + new SimD3Timeline("ChildSims" + File.separator + "output" + File.separator,"ChildSims") + ) + + Await.result(new SimOutputHandler(handler).subAndForgetTo(coordinator,Some("MetricsHandler")), 3.seconds) + Await.result(shutdownActor ? Subscriber.SubAndForgetTo(coordinator), 3.seconds) + + if (DEBUG) { + println(s"Cores: ${Runtime.getRuntime().availableProcessors()}") + val config = system.settings.config.getConfig("akka.actor.default-dispatcher") + println(s"Parallelism: ${config.getInt("fork-join-executor.parallelism-min")}-${config.getInt("fork-join-executor.parallelism-max")} x ${config.getDouble("fork-join-executor.parallelism-factor")}") + val printer = new com.workflowfm.simulator.events.PrintEventHandler + Await.result(printer.subAndForgetTo(coordinator), 3.seconds) + } + + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends AsyncSimulation(name, coordinator) { + + override def run(): Future[Any] = { + val promise = Promise[Any]() + + val childSim = system.actorOf(SingleTaskSimulation.props("childSim", coordinator, Seq("r1"), ConstantGenerator(5L))) + val callback2: Callback = (_,_)=> promise.success(Unit) + + sim(childSim,callback2) + + task(TaskGenerator("t1","sim",ConstantGenerator(1L),ConstantGenerator(0L)), + (_,_)=> {task(TaskGenerator("t2","sim",ConstantGenerator(2L),ConstantGenerator(0L)), (_,_)=>ready(), "r3" ); ready() }, + "r2" + ) + + ready() + + promise.future + + } + } + + //========================================================================================= + + // Define resources + val r1 = new TaskResource("r1",0) + val r2 = new TaskResource("r2",0) + val r3 = new TaskResource("r3",0) + val r4 = new TaskResource("r4",0) + val r5 = new TaskResource("r5",0) + val r6 = new TaskResource("r6",0) + val r7 = new TaskResource("r7",0) + val r8 = new TaskResource("r8",0) + val resources = List (r1,r2,r3,r4,r5,r6,r7,r8) + coordinator ! Coordinator.AddResources(resources) + + val sim = Props(new DummySim("sim", coordinator)) + + coordinator ! Coordinator.AddSim(0L,system.actorOf(sim,"sim")) + + coordinator ! Coordinator.Start + } + +} \ No newline at end of file diff --git a/src/test/scala/com/workflowfm/simulator/Coordinator.scala b/src/test/scala/com/workflowfm/simulator/Coordinator.scala index 4a6ade48..dad4bd10 100644 --- a/src/test/scala/com/workflowfm/simulator/Coordinator.scala +++ b/src/test/scala/com/workflowfm/simulator/Coordinator.scala @@ -296,9 +296,11 @@ class CoordinatorTests coordinator ! Coordinator.Start expectMsg(Simulation.Start) coordinator ! Coordinator.SimStarted("Test") - coordinator ! Coordinator.AddSimNow(childSim,Option(self)) + coordinator ! Coordinator.AddSimNow(childSim,Some(self)) coordinator ! Coordinator.SimReady - expectMsg(Simulation.SimCompleted(childSim,2L)) + val Simulation.SimCompleted(actor,time,id) = expectMsgType[Simulation.SimCompleted] + actor should be (childSim) + time should be (2L) //coordinator ! Coordinator.SimDone("Test", Success(Unit)) //expectNoMsg() diff --git a/src/test/scala/com/workflowfm/simulator/Simulation.scala b/src/test/scala/com/workflowfm/simulator/Simulation.scala index d0f04e41..f0eef9b1 100644 --- a/src/test/scala/com/workflowfm/simulator/Simulation.scala +++ b/src/test/scala/com/workflowfm/simulator/Simulation.scala @@ -1,6 +1,6 @@ package com.workflowfm.simulator -import akka.actor.{ActorSystem, ActorRef,Props} +import akka.actor.{ ActorRef, ActorSystem, Props } import akka.testkit.{ ImplicitSender, TestActors, TestKit, TestProbe } import akka.pattern.ask import akka.util.Timeout @@ -31,123 +31,139 @@ class SimulationTests "Simulations" must { - "interact correctly with a coordinator with no tasks" in { - class DummySim(name: String, coordinator: ActorRef) - (implicit executionContext: ExecutionContext) - extends Simulation(name,coordinator) { - override def run():Future[Any] = Future.unit //finish instantly - override def complete(task: Task, time: Long): Unit = Unit //does nothing - override def completeActor(actor: ActorRef, time: Long): Unit = Unit - } - - val sim = system.actorOf(Props(new DummySim("sim",self))) - - sim ! Simulation.Start - expectMsg( Coordinator.SimStarted("sim")) - expectMsg( Coordinator.SimDone("sim",Success())) - expectNoMsg() - } + "interact correctly with a coordinator with no tasks" in { + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends Simulation(name, coordinator) { + override def run(): Future[Any] = Future.unit //finish instantly + override def complete(task: Task, time: Long): Unit = Unit //does nothing + //override def completeActor(actor: ActorRef, time: Long): Unit = Unit + } + + val sim = system.actorOf(Props(new DummySim("sim", self))) + + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + expectMsg(Coordinator.SimDone("sim", Success())) + expectNoMsg() + } - "interact correctly with a cooridnator with one task" in { - val sim = system.actorOf(SingleTaskSimulation.props("sim",self,Seq("r1"),ConstantGenerator(2L))) + "interact correctly with a cooridnator with one task" in { + val sim = + system.actorOf(SingleTaskSimulation.props("sim", self, Seq("r1"), ConstantGenerator(2L))) - sim ! Simulation.Start - expectMsg( Coordinator.SimStarted("sim")) - val Coordinator.AddTask(id, generator, resources) = expectMsgType[ Coordinator.AddTask ] - expectMsg( Coordinator.SimReady ) + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + val Coordinator.AddTask(id, generator, resources) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.SimReady) - val task = generator.create(id,0L,sim,"r1") - sim ! Simulation.TaskCompleted(task,2L) - val Coordinator.SimDone(name, future) = expectMsgType[ Coordinator.SimDone ] - name should be ("sim") - expectNoMsg() + val task = generator.create(id, 0L, sim, "r1") + sim ! Simulation.TaskCompleted(task, 2L) + val Coordinator.SimDone(name, future) = expectMsgType[Coordinator.SimDone] + name should be("sim") + expectNoMsg() + } + + "interact correctly with a cooridnator with a sequence of task, acking tasks as they complete" in { + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends AsyncSimulation(name, coordinator) + with FutureTasks { + override def run(): Future[Any] = { + val id1 = java.util.UUID.randomUUID + val id2 = java.util.UUID.randomUUID + val id3 = java.util.UUID.randomUUID + val task1 = futureTask( + id1, + TaskGenerator("task1", "sim", ConstantGenerator(2L), ConstantGenerator(0L)), + Seq("r1") + ) + ready() + val task2 = task1 flatMap { _ => + val t = futureTask( + id2, + TaskGenerator("task2", "sim", ConstantGenerator(2L), ConstantGenerator(0L)), + Seq("r1") + ) + ack(Seq(id1)) + t + } + val task3 = task2 flatMap { _ => + val t = futureTask( + id3, + TaskGenerator("task3", "sim", ConstantGenerator(2L), ConstantGenerator(0L)), + Seq("r1") + ) + ack(Seq(id2)) + t + } + task3 } + } - "interact correctly with a cooridnator with a sequence of task, acking tasks as they complete" in { - class DummySim(name: String, coordinator: ActorRef) - (implicit executionContext: ExecutionContext) - extends AsyncSimulation(name,coordinator) with FutureTasks { - override def run():Future[Any] = { - val id1 = java.util.UUID.randomUUID - val id2 = java.util.UUID.randomUUID - val id3 = java.util.UUID.randomUUID - val task1 = futureTask(id1, TaskGenerator("task1","sim",ConstantGenerator(2L),ConstantGenerator(0L)),Seq("r1")) - ready() - val task2 = task1 flatMap { _=> - val t = futureTask(id2,TaskGenerator("task2","sim",ConstantGenerator(2L),ConstantGenerator(0L)),Seq("r1")) - ack(Seq(id1)) - t - } - val task3 = task2 flatMap { _=> - val t = futureTask(id3,TaskGenerator("task3","sim",ConstantGenerator(2L),ConstantGenerator(0L)),Seq("r1")) - ack(Seq(id2)) - t - } - task3 - } - } - - val sim = system.actorOf(Props(new DummySim("sim",self))) - - sim ! Simulation.Start - expectMsg( Coordinator.SimStarted("sim")) - val Coordinator.AddTask(id1, generator1, resources1) = expectMsgType[ Coordinator.AddTask ] - expectMsg( Coordinator.SimReady ) - expectNoMsg() - - val task1 = generator1.create(id1,0L,sim,resources1:_*) - sim ! Simulation.TaskCompleted(task1,2L) - - val Coordinator.AddTask(id2, generator2, resources2) = expectMsgType[ Coordinator.AddTask ] - expectMsg( Coordinator.AckTasks(Seq(id1))) - expectNoMsg() - - val task2 = generator2.create(id2,2L,sim,resources2:_*) - sim ! Simulation.TaskCompleted(task2,4L) - - val Coordinator.AddTask(id3, generator3, resources3) = expectMsgType[ Coordinator.AddTask ] - expectMsg( Coordinator.AckTasks(Seq(id2))) - expectNoMsg() - - val task3 = generator3.create(id3,2L,sim,resources3:_*) - sim ! Simulation.TaskCompleted(task3,6L) - - //expectMsg( Coordinator.AckTasks(Seq(id3))) - val Coordinator.SimDone(name, future) = expectMsgType[ Coordinator.SimDone ] - name should be ("sim") - expectNoMsg() + val sim = system.actorOf(Props(new DummySim("sim", self))) - } + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + val Coordinator.AddTask(id1, generator1, resources1) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.SimReady) + expectNoMsg() + + val task1 = generator1.create(id1, 0L, sim, resources1: _*) + sim ! Simulation.TaskCompleted(task1, 2L) + + val Coordinator.AddTask(id2, generator2, resources2) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.AckTasks(Seq(id1))) + expectNoMsg() + + val task2 = generator2.create(id2, 2L, sim, resources2: _*) + sim ! Simulation.TaskCompleted(task2, 4L) + + val Coordinator.AddTask(id3, generator3, resources3) = expectMsgType[Coordinator.AddTask] + expectMsg(Coordinator.AckTasks(Seq(id2))) + expectNoMsg() - "innitiate child simulations" in { - class DummySim(name: String, coordinator: ActorRef) - (implicit executionContext: ExecutionContext) - extends Simulation(name,coordinator) with ChildSims { - private val promise = Promise[Any]() - override def run():Future[Any] = { - val childSim = system.actorOf(SingleTaskSimulation.props("ChildSim",self,Seq("r1"),ConstantGenerator(2L))) - sim(childSim) - ready() - - promise.future - } - override def complete(task: Task, time: Long) = Unit //Does nothing - override def completeActor(actor: ActorRef, time: Long): Unit = promise.success(Unit) - } - - val sim = system.actorOf(Props(new DummySim("sim",self))) - - sim ! Simulation.Start - expectMsg( Coordinator.SimStarted("sim")) - val Coordinator.AddSimNow(actor,parent) = expectMsgType[ Coordinator.AddSimNow ] - parent should be (Some(sim)) - expectMsg( Coordinator.SimReady ) - - sim ! Simulation.SimCompleted(actor,2L) - expectMsg( Coordinator.SimDone("sim",Success(Unit))) - expectNoMsg() + val task3 = generator3.create(id3, 2L, sim, resources3: _*) + sim ! Simulation.TaskCompleted(task3, 6L) + //expectMsg( Coordinator.AckTasks(Seq(id3))) + val Coordinator.SimDone(name, future) = expectMsgType[Coordinator.SimDone] + name should be("sim") + expectNoMsg() + + } + + "innitiate child simulations" in { + class DummySim(name: String, coordinator: ActorRef)( + implicit executionContext: ExecutionContext + ) extends Simulation(name, coordinator) { + private val promise = Promise[Any]() + override def run(): Future[Any] = { + val childSim = system.actorOf( + SingleTaskSimulation.props("ChildSim", self, Seq("r1"), ConstantGenerator(2L)) + ) + sim(childSim) + ready() + + promise.future } + override def complete(task: Task, time: Long) = Unit //Does nothing + override def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = promise.success(Unit) + } + + val sim = system.actorOf(Props(new DummySim("sim", self))) + + sim ! Simulation.Start + expectMsg(Coordinator.SimStarted("sim")) + val Coordinator.AddSimNow(actor, parent) = expectMsgType[Coordinator.AddSimNow] + parent should be(Some(sim)) + expectMsg(Coordinator.SimReady) + + sim ! Simulation.SimCompleted(actor, 2L, java.util.UUID.randomUUID) + expectMsg(Coordinator.SimDone("sim", Success(Unit))) + expectNoMsg() + } + } -} \ No newline at end of file +}