Metrics
- Introduction
- Setting
- Real-time execution metrics
- Simulation metrics
- Extending the metrics
The PEW engine can automatically track metrics during workflow execution. These can be used to monitor workflows and to extract analytics and insights.
Metrics can be tracked in two settings:
- Real-time execution: These are metrics from the engine regarding the execution of any workflow.
- Simulation: These are metrics from the simulator across the simulated time.
We describe the general setting and the individual metrics next.
The general idea in PEW is that the engine automatically collects all available metrics at runtime. The user can then implement an output function to generate analytics from the collected metrics.
The following key concepts are used by PEW for capturing and managing metrics:
Metrics are captured around individual concepts, such as an atomic process or task, a persistent resource, or a workflow. Each set of metrics is captured in an immutable case class, which includes the different features being measured and methods to update them based on what is happening in the engine.
An Aggregator is a mutable class that gathers the Metrics collected across multiple workflows in one place. It contains methods to update the different metrics, indexed by some ID, based on the different events that may take place.
An Output is essentially a function that can generate any outputs from the Metrics within an Aggregator. Outputs may include analytics, visualizations, reports, or anything else.
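To make these concepts concrete, here is a minimal, self-contained sketch of how they fit together. All names in it are illustrative and not part of the actual PEW API:

```scala
// A hypothetical set of metrics as an immutable case class:
// updates return a fresh copy rather than mutating state.
case class CallCounter(id: Int, calls: Int = 0) {
  def called: CallCounter = copy(calls = calls + 1)
}

// A hypothetical aggregator: a mutable collection of metrics indexed
// by some ID, with a method to update them when an event takes place.
class CallAggregator {
  val metrics = scala.collection.mutable.Map[Int, CallCounter]()
  def callMade(id: Int): Unit =
    metrics(id) = metrics.getOrElse(id, CallCounter(id)).called
}

// An output is essentially a function over the aggregator.
val printTotals: CallAggregator => Unit =
  agg => agg.metrics.values.foreach(m => println(s"${m.id}: ${m.calls} calls"))
```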
[Source]
Real-time metrics are minimal, as they are intended to be generic and domain-independent. We capture metrics about calls to atomic processes and about executions of entire workflows.
The metrics are gathered in a MetricsAggregator and can be processed through a MetricsOutput.
A MetricsHandler is the most convenient way of gathering metrics directly from a ProcessExecutor. It is a PiEventHandler, which means you can register it directly to the executor and process the results afterwards.
Here’s an example pattern:
```scala
// Instantiate your handler. Call it "metrics".
val handler = new MetricsHandler[Int]("metrics")
// Instantiate your executor (assuming a list of processes).
val executor = new AkkaExecutor(processes: _*)
// Subscribe the handler and obtain a kill switch to unsubscribe it when done.
val killSwitch = executor.subscribe(handler)

///////////////////////////////////
// Execute all your workflows here.
// Wait for them to finish.
///////////////////////////////////

// Stop/unsubscribe the handler.
killSwitch.map(_.stop)
// Instantiate your output, in this case a simple MetricsPrinter.
val output = new MetricsPrinter[Int]()
// Run it on the results.
output(handler)
```
The two types of metrics available are described next.
This captures metrics about a particular call of an AtomicProcess.
- piID: The ID of the workflow that executed the atomic process.
- ref: A unique call ID for this process call within the particular workflow.
- process: The name of the process.
- start: The system time in milliseconds when the process call started.
- finish: The system time in milliseconds when the process call finished, or `None` if it is still running.
- result: A `String` representation of the result returned from the process call, or `None` if it is still running. In case of failure, the field is populated with the localized message of the exception thrown.
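As a simple illustration, the duration of a completed call can be derived from these fields; here `m` is assumed to be one such metrics object:

```scala
// None while the process call is still running, Some(duration) once finished.
val durationMs: Option[Long] = m.finish.map(_ - m.start)
```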
This captures metrics for a particular execution of a workflow (CompositeProcess).
- piID: The unique ID of the workflow.
- start: The system time in milliseconds when the workflow started executing.
- calls: The number of individual calls performed to atomic processes.
- finish: The system time in milliseconds when the workflow finished, or `None` if it is still running.
- result: A `String` representation of the result returned from the workflow, or `None` if it is still running. In case of failure, the field is populated with the localized message of the exception thrown.
[Source]
Simulation metrics are somewhat richer than the real-time ones. We capture metrics about each Task, Simulation and TaskResource used. More details about these concepts can be found here.
The metrics are gathered in a SimMetricsAggregator and can be processed through a SimMetricsOutput.
The general assumption is that simulations run on an AkkaExecutor. Under this assumption, we can extend the implementation to gather metrics asynchronously and to run the output automatically at the end. For this reason, we have introduced the SimMetricsActor, which takes care of all of this for us.
Here is an example setup to manage simulation metrics, assuming an active ActorSystem:
```scala
// Instantiate the Coordinator.
val coordinator = system.actorOf(Coordinator.props(DefaultScheduler))
// Load a list of available TaskResources into the Coordinator.
coordinator ! Coordinator.AddResources(machines)
// Instantiate your output, in this case a simple SimMetricsPrinter.
val output = new SimMetricsPrinter()
// Create the SimMetricsActor.
val metricsActor = system.actorOf(SimMetricsActor.props(output))
// Set up a list of simulations, paired with their starting times.
val simulations: Seq[(Long, Simulation)] = ...
// Instantiate the executor.
val executor = new AkkaExecutor(simulations flatMap (_._2.getProcesses()): _*)
// Start the simulations through the SimMetricsActor.
metricsActor ! SimMetricsActor.StartSims(coordinator, simulations, executor)
```
The metricsActor will automatically run the output function (the printer in this case) on the results.
Note that, in this scenario, the metricsActor will also shut down the ActorSystem. If you want to avoid that, e.g. when you need to run multiple independent simulations, you need to set up your own actor that is given the opportunity to act when the simulation and metrics output have finished. Assuming `a: ActorRef` is that actor, you can pass it to the metricsActor at construction as follows:
```scala
val metricsActor = system.actorOf(SimMetricsActor.props(output, Some(a)))
```
Your actor will receive a Coordinator.Done message when everything is done, and the ActorSystem will remain active.
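For example, a minimal completion actor might look as follows. The body of the handler is up to you; only the Coordinator.Done message comes from the library:

```scala
import akka.actor.{ Actor, ActorRef, Props }

// A sketch of an actor that reacts when the simulation and its
// metrics output have finished, leaving the ActorSystem running.
class DoneListener extends Actor {
  def receive: Receive = {
    case Coordinator.Done =>
      // e.g. kick off the next independent simulation here
      println("Simulation and metrics output complete.")
  }
}

val a: ActorRef = system.actorOf(Props(new DoneListener))
```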
The three types of gathered metrics are described next.
This captures metrics for a simulated Task.
- id: The unique ID of the `Task`.
- task: The name of the `Task`.
- simulation: The name of the simulation the `Task` belongs to.
- created: The virtual timestamp when the `Task` was created and entered the `Coordinator`.
- started: The virtual timestamp when the `Task` started executing, or `None` if it has not started yet.
- duration: The virtual duration of the `Task`.
- cost: The cost associated with the `Task`.
- resources: The list of names of the `TaskResource`s this `Task` used.
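For instance, the time a task spent waiting before execution can be derived from these fields, assuming `t` is one such metrics object:

```scala
// None if the task has not started yet, Some(wait) otherwise.
val waitingTime: Option[Long] = t.started.map(_ - t.created)
```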
This captures metrics for a particular Simulation.
- name: The unique name of the `Simulation`.
- started: The virtual timestamp when the `Simulation` started executing.
- duration: The virtual duration of the `Simulation`.
- delay: The sum of all delays for all involved `Task`s.
- tasks: The number of `Task`s associated with the `Simulation` so far.
- cost: The total cost associated with the `Simulation` so far.
- result: A `String` representation of the result returned from the `Simulation`, or `None` if it is still running. In case of failure, the field is populated with the localized message of the exception thrown.
This captures metrics for a particular TaskResource.
- name: The unique name of the `TaskResource`.
- busyTime: The total amount of virtual time that the `TaskResource` has been busy, i.e. attached to a `Task`.
- idleTime: The total amount of virtual time that the `TaskResource` has been idle, i.e. not attached to any `Task`.
- tasks: The number of different `Task`s that have been attached to this `TaskResource`.
- cost: The total cost associated with this `TaskResource`.
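A typical derived statistic is the utilization of a resource, i.e. the fraction of virtual time it was busy; here `r` is assumed to be one such metrics object:

```scala
// Fraction of observed virtual time during which the resource was busy.
val utilization: Double = r.busyTime.toDouble / (r.busyTime + r.idleTime)
```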
Analytics derived from the current metrics can be computed by a custom output function, as sketched below.
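For example, an output that reports the slowest workflow could be written as a plain function over the collected workflow metrics. This assumes the workflow metrics described above are available as a `WorkflowMetrics` class keyed by the workflow ID type; the exact shape of the output traits may differ, so treat this as illustrative:

```scala
// Report the workflow with the longest (completed) duration.
def slowestWorkflow(workflows: Seq[WorkflowMetrics[Int]]): Unit =
  workflows
    .flatMap { w => w.finish.map(f => (w.piID, f - w.start)) }
    .sortBy { case (_, d) => -d }
    .headOption
    .foreach { case (id, d) => println(s"Slowest workflow: $id (${d}ms)") }
```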
Implementing new types of metrics in the current setup requires extending each of the three main concepts and, more importantly, a computational way to generate the new metrics at runtime.
The former can be easily achieved by:
- Implementing your own custom case classes for your metrics.
- Extending one of the existing aggregators to hold your new metrics.
- Extending the output classes to deal with your custom metrics.
The latter is harder, as the current metrics are measured directly in the PiEvents generated by the executor or by the simulation Coordinator.
Metrics that can be calculated by atomic processes (or tasks) can be given as metadata output in the process implementation. Instead of implementing a standard AtomicProcess, switch its inheritance to MetadataAtomicProcess. You can then implement the runMeta function so that it returns the calculated metrics as one or more PiMetadataElem.
Here’s an example pattern:
```scala
override def runMeta( args: Seq[PiObject] )( implicit ec: ExecutionContext ): Future[MetadataAtomicResult] = {
  // Run this as a regular AtomicProcess.
  run( args ).flatMap { result =>
    // Calculate your metrics.
    val metrics: Future[Seq[PiMetadataElem]] = ...
    // Return the combined result (assuming metrics is a Future here,
    // hence the flatMap above to avoid nesting Futures).
    metrics.map { m => MetadataAtomicProcess.result(result, m: _*) }
  }
}
```
The generated metadata will be attached to the corresponding PiEventReturn, so you can use a PiEventHandler to grab it and pass it to your aggregator.
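A sketch of that last step: a handler callback that pattern-matches on the returned event and forwards the metadata to your aggregator. The accessor and aggregator names below are assumptions, not the actual API:

```scala
// Forward metadata from returned calls to a custom aggregator.
// `myAggregator.record` and `ret.metadata` are hypothetical names.
def onEvent(event: PiEvent[Int]): Unit = event match {
  case ret: PiEventReturn[Int] => myAggregator.record(ret.id, ret.metadata)
  case _                       => () // ignore other events
}
```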
Calculating the metrics at the same time as the result requires refactoring of the automatically generated code.