Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 13 additions & 5 deletions src/main/scala/com/workflowfm/simulator/Coordinator.scala
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ class Coordinator(
*/
val waiting: Map[ActorRef, List[UUID]] = Map[ActorRef, List[UUID]]()

val parents: Map[ActorRef, ActorRef] = Map[ActorRef, ActorRef]()

/** Set of simulation names that are running, i.e. they have already started but not finished. */
/**
* Set of simulation names that are running.
Expand Down Expand Up @@ -240,9 +242,10 @@ class Coordinator(
* time.
* @param actor The reference to the [[Simulation]] corresponding to the simulation.
*/
protected def addSimulation(t: Long, actor: ActorRef) = {
protected def addSimulation(t: Long, actor: ActorRef, parent: Option[ActorRef]=None) = {
publish(ESimAdd(self, time, actor.toString(), t))
if (t >= time) events += StartingSim(t, actor)
if (parent.isDefined) parents += actor -> parent.get
}

/**
Expand Down Expand Up @@ -303,6 +306,11 @@ class Coordinator(
protected def stopSimulation(name: String, result: String, actor: ActorRef) = {
simulations -= name
waiting -= actor
parents.get(actor) map {x=>
val id = java.util.UUID.randomUUID
waiting += x -> List(id)
x ! Simulation.SimCompleted(actor,time,id)
}
publish(ESimEnd(self, time, name, result))
log.debug(s"[COORD:$time] Finished: [${actor.path.name}]")
ready(actor)
Expand Down Expand Up @@ -543,9 +551,9 @@ class Coordinator(
* @return The [[Receive]] behaviour.
*/
def receiveBehaviour: Receive = {
case Coordinator.AddSim(t, s) => addSimulation(t, s)
case Coordinator.AddSim(t, s, p) => addSimulation(t, s, p)
case Coordinator.AddSims(l) => addSimulations(l)
case Coordinator.AddSimNow(s) => addSimulation(time, s)
case Coordinator.AddSimNow(s,p) => addSimulation(time, s, p)
case Coordinator.AddSimsNow(l) => addSimulations(l.map((time, _)))

case Coordinator.AddResource(r) => addResource(r)
Expand Down Expand Up @@ -615,7 +623,7 @@ object Coordinator {
* Message to add a simulation.
* @group toplevel
*/
case class AddSim(t: Long, actor: ActorRef)
case class AddSim(t: Long, actor: ActorRef, parent: Option[ActorRef]=None)
/**
* Message to add a list of simulations.
* @group toplevel
Expand All @@ -625,7 +633,7 @@ object Coordinator {
* Message to add a simulation due to start right now.
* @group toplevel
*/
case class AddSimNow(actor: ActorRef)
case class AddSimNow(actor: ActorRef, parent: Option[ActorRef]=None)
/**
* Message to add a list of simulations due to start right now.
* @group toplevel
Expand Down
33 changes: 32 additions & 1 deletion src/main/scala/com/workflowfm/simulator/Simulation.scala
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ import java.util.UUID
*/
abstract class Simulation(
name: String,
coordinator: ActorRef
val coordinator: ActorRef
)(implicit executionContext: ExecutionContext)
extends Actor {

Expand All @@ -72,10 +72,13 @@ abstract class Simulation(
* @group api
* @return A `Future` that completes with a custom output when the simulation is completed.
*/

def run(): Future[Any]

def complete(task: Task, time: Long): Unit

def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = Unit

/**
* Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation.
*
Expand Down Expand Up @@ -123,6 +126,10 @@ abstract class Simulation(
coordinator ! Coordinator.AddTask(id, t, resources)
}

def sim(actor: ActorRef) {
coordinator ! Coordinator.AddSimNow(actor,Some(self))
}

/**
* Starts the simulation via the [[run]] function.
*
Expand Down Expand Up @@ -187,6 +194,7 @@ abstract class Simulation(
def simulationReceive: Receive = {
case Simulation.Start => start()
case Simulation.TaskCompleted(task, time) => complete(task, time)
case Simulation.SimCompleted(actor, time, id) => completeActor(actor, time, id)
}

def receive = simulationReceive
Expand Down Expand Up @@ -236,6 +244,8 @@ object Simulation {
* @param time The (virtual) time of completion.
*/
case class TaskCompleted(task: Task, time: Long)
//TODO document
case class SimCompleted(actor: ActorRef, time: Long, id: UUID)
/**
* Tells the [[Simulation]] to request that [[Coordinator]] waits.
*
Expand Down Expand Up @@ -295,6 +305,7 @@ class SingleTaskSimulation(
}

override def complete(task: Task, time: Long) = promise.success((task, time))
override def completeActor(actor: ActorRef, time: Long, id: UUID): Unit = Unit
}

object SingleTaskSimulation {
Expand Down Expand Up @@ -349,6 +360,7 @@ abstract class AsyncSimulation(
* @group internal
*/
private val tasks: Map[UUID, Callback] = Map()
private val childSims: Map[String,UUID] = Map()

/**
* Declare a new [[TaskGenerator]] that needs to be sent to the [[Coordinator]] for simulation.
Expand Down Expand Up @@ -403,6 +415,13 @@ abstract class AsyncSimulation(
super.task(id, t, resources)
}

def sim(actor: ActorRef, callback: Callback): Unit = {
val id = java.util.UUID.randomUUID
tasks += id -> callback
childSims += actor.path.name -> id
super.sim(actor)
}

/**
* Manages a [[Task]] whose simulation has completed.
*
Expand All @@ -418,6 +437,11 @@ abstract class AsyncSimulation(
tasks -= task.id
}

override def completeActor(actor: ActorRef, time: Long, id: UUID) = {
childSims.get(actor.path.name) map ( tasks.get(_).map(_(null, time)) )
ack(Seq(id))
}

def actorCallback(actor: ActorRef): Callback = (task, time) => {
actor ! (task, time)
}
Expand All @@ -436,6 +460,7 @@ abstract class AsyncSimulation(
case Simulation.Ready => ready()
case Simulation.AckTasks(tasks) => ack(tasks)
case Simulation.TaskCompleted(task, time) => complete(task, time)
case Simulation.SimCompleted(actor, time, id) => completeActor(actor, time, id)
case Simulation.AddTaskWithId(id, t, r) => task(id, t, actorCallback(sender), r)
case Simulation.AddTask(t, r) => task(t, actorCallback(sender), r: _*)
case Simulation.Wait => coordinator.forward(Coordinator.WaitFor(self))
Expand All @@ -456,3 +481,9 @@ trait FutureTasks { self: AsyncSimulation =>
p.future
}
}

// trait ChildSims { myself: Simulation =>
// def sim(actor: ActorRef) {
// myself.coordinator ! Coordinator.AddSimNow(actor,Some(self))
// }
// }
89 changes: 89 additions & 0 deletions src/main/scala/com/workflowfm/simulator/SimulationChildDemo.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package com.workflowfm.simulator

import akka.actor.{ ActorSystem, Props, ActorRef }
import akka.util.Timeout
import akka.pattern.ask
import scala.concurrent._
import scala.concurrent.duration._
import com.workflowfm.simulator._
import com.workflowfm.simulator.metrics._
import com.workflowfm.simulator.events.{ ShutdownHandler }
import uk.ac.ed.inf.ppapapan.subakka.Subscriber
import java.io.File

object FlowsMain {
//toggle for debug
val DEBUG = false

def main(args: Array[String]): Unit = {

implicit val system: ActorSystem = ActorSystem("ChildSims")
implicit val executionContext: ExecutionContext = ExecutionContext.global
implicit val timeout = Timeout(2.seconds)

val coordinator = system.actorOf(Coordinator.props(DefaultScheduler))
val shutdownActor = Subscriber.actor(new ShutdownHandler())

val handler = SimMetricsOutputs(
new SimMetricsPrinter(),
new SimCSVFileOutput("ChildSims" + File.separator + "output" + File.separator,"ChildSims"),
new SimD3Timeline("ChildSims" + File.separator + "output" + File.separator,"ChildSims")
)

Await.result(new SimOutputHandler(handler).subAndForgetTo(coordinator,Some("MetricsHandler")), 3.seconds)
Await.result(shutdownActor ? Subscriber.SubAndForgetTo(coordinator), 3.seconds)

if (DEBUG) {
println(s"Cores: ${Runtime.getRuntime().availableProcessors()}")
val config = system.settings.config.getConfig("akka.actor.default-dispatcher")
println(s"Parallelism: ${config.getInt("fork-join-executor.parallelism-min")}-${config.getInt("fork-join-executor.parallelism-max")} x ${config.getDouble("fork-join-executor.parallelism-factor")}")
val printer = new com.workflowfm.simulator.events.PrintEventHandler
Await.result(printer.subAndForgetTo(coordinator), 3.seconds)
}

class DummySim(name: String, coordinator: ActorRef)(
implicit executionContext: ExecutionContext
) extends AsyncSimulation(name, coordinator) {

override def run(): Future[Any] = {
val promise = Promise[Any]()

val childSim = system.actorOf(SingleTaskSimulation.props("childSim", coordinator, Seq("r1"), ConstantGenerator(5L)))
val callback2: Callback = (_,_)=> promise.success(Unit)

sim(childSim,callback2)

task(TaskGenerator("t1","sim",ConstantGenerator(1L),ConstantGenerator(0L)),
(_,_)=> {task(TaskGenerator("t2","sim",ConstantGenerator(2L),ConstantGenerator(0L)), (_,_)=>ready(), "r3" ); ready() },
"r2"
)

ready()

promise.future

}
}

//=========================================================================================

// Define resources
val r1 = new TaskResource("r1",0)
val r2 = new TaskResource("r2",0)
val r3 = new TaskResource("r3",0)
val r4 = new TaskResource("r4",0)
val r5 = new TaskResource("r5",0)
val r6 = new TaskResource("r6",0)
val r7 = new TaskResource("r7",0)
val r8 = new TaskResource("r8",0)
val resources = List (r1,r2,r3,r4,r5,r6,r7,r8)
coordinator ! Coordinator.AddResources(resources)

val sim = Props(new DummySim("sim", coordinator))

coordinator ! Coordinator.AddSim(0L,system.actorOf(sim,"sim"))

coordinator ! Coordinator.Start
}

}
Loading