Merged
Commits
66 commits
4b8ba59
Some refactoring
PetrosPapapa Dec 9, 2018
980c5db
Generalized `PromiseHandler`
PetrosPapapa Dec 9, 2018
f4ec574
First step
PetrosPapapa Dec 9, 2018
fc50167
Implemented PiSwitch and PiStream
PetrosPapapa Dec 9, 2018
0b9886a
Extending PiStream functionality
PetrosPapapa Dec 10, 2018
3132658
Merge branch 'master' into petros/streamHandlers
PetrosPapapa Dec 11, 2018
1f33ad7
Fixed unit tests
PetrosPapapa Dec 11, 2018
b69ddc1
Updated changelog for 1.3.0
PetrosPapapa Dec 12, 2018
b5a3810
Adds documentation for metrics WIP
PetrosPapapa Jun 26, 2019
b473f4d
Adds scaladoc documentation for com.workflowfm.pew.metrics
Jun 26, 2019
2a63ca6
Adds scaladoc documentation to simulation/metrics/Measure.scala
Jun 26, 2019
960b890
Removes mutable reference to `Coordinator` in `SimMetricsActor`
PetrosPapapa Jun 26, 2019
b546352
Removes white space from CHANGELOG
PetrosPapapa Jun 26, 2019
6f2fc9a
Merge branch 'master' into petros/streamHandlers
PetrosPapapa Jun 26, 2019
e2ced7a
Merge branch 'petros/streamHandlers' into metricsDocs
PetrosPapapa Jun 26, 2019
d443e68
Fixes compilation issues in `KafkaExecutorTests` due to merge
PetrosPapapa Jun 26, 2019
d566082
Updates CHANGELOG
PetrosPapapa Jun 26, 2019
92de26b
Updates CHANGELOG and version
PetrosPapapa Jun 26, 2019
39a777c
Merge branch 'master' into petros/streamHandlers
PetrosPapapa Jun 27, 2019
94686d0
Adds Scaladoc to simulation/metrics/Actor.scala
PetrosPapapa Jun 27, 2019
6687464
Adds Scaladoc to simulation/metrics/Output.scala
Jun 27, 2019
3551ecd
Merge remote-tracking branch 'origin/v1.4.0' into metricsDocs
Jun 27, 2019
070c18a
Updates docs/metrics.org
Jun 27, 2019
10cf5b6
Merge pull request #53 from PetrosPapapa/metricsDocs
PetrosPapapa Jun 27, 2019
133ec7c
Adds some scaladoc documentation
PetrosPapapa Jun 28, 2019
3e2706e
`PiEventHandler` no longer needs a name.
PetrosPapapa Jun 28, 2019
00e8805
`AkkaExecutor` now uses `java.util.UUID` for workflow IDs
PetrosPapapa Jun 28, 2019
d519408
Fixes broken `SingleStateExecutorTests`
PetrosPapapa Jun 28, 2019
0014889
Removes legacy pom.xml
PetrosPapapa Jun 28, 2019
fed1ee2
Merge branch 'v1.4.0' into petros/streamHandlers
PetrosPapapa Jul 10, 2019
def134f
Cleans up simulator stuff and adds subproject for pew-simulator
PetrosPapapa Jul 12, 2019
b6c33b1
Updates d3-timeline to crop overflowing text
PetrosPapapa Jul 13, 2019
1576514
Sets up pew simulator environment
PetrosPapapa Jul 17, 2019
84d01a7
Merge pull request #35 from PetrosPapapa/petros/streamHandlers
PetrosPapapa Jul 29, 2019
8087884
Introduces PiEventIdle and progress towards new simulator
PetrosPapapa Jul 29, 2019
584769e
Merge branch 'v1.4.0' into coordinatorSync
PetrosPapapa Jul 29, 2019
c773ba8
Updates PiSimulation to use the new simulator
PetrosPapapa Jul 29, 2019
11b0991
Makes the atomic process executor ActorRef parameter of AkkaExecutor
PetrosPapapa Jul 30, 2019
6bf28c7
Some progress with the simulator, which will evolve further
PetrosPapapa Jul 30, 2019
903d0fb
Some more progress.
PetrosPapapa Jul 30, 2019
63e6dad
Makes each PiSimulationActor have its own SimulatorExecutor
PetrosPapapa Jul 30, 2019
6c03bec
Major progress towards async simulation
PetrosPapapa Jul 31, 2019
692ff3f
Provides a way for no-task processes to resume, but it's no good
PetrosPapapa Jul 31, 2019
fbbbc21
Minor clean
PetrosPapapa Aug 1, 2019
3239ed4
Uses SubAkka
PetrosPapapa Aug 2, 2019
1237dd5
Swaps receive order for PiSimulationActor
PetrosPapapa Aug 2, 2019
1815b91
Uses SubAkka.HashSetPublisher and SubscriptionSwitch
PetrosPapapa Aug 6, 2019
260c670
Cleans up simulator
PetrosPapapa Aug 6, 2019
454c2ca
Lets Coordinator know we should wait for resumed processes
PetrosPapapa Dec 4, 2019
279e25f
Cleans leftover junk
PetrosPapapa Dec 4, 2019
5a480db
Adds a note about the asks used in `SimulatedPiProcess`
PetrosPapapa Dec 5, 2019
56deef9
Merge pull request #57 from PetrosPapapa/petros/coordinatorSync
PetrosPapapa Dec 5, 2019
0ee8a1d
Removes unused process stores from executors
PetrosPapapa Dec 5, 2019
55a34ec
Updates executors to allow intermediate `PiInstance` execution (#58)
PetrosPapapa Dec 7, 2019
81c3a03
Cleans up more simulation stuff from main library
PetrosPapapa Dec 7, 2019
d3d8a6a
Updates wfm-simulator dependency to version 0.2
PetrosPapapa Dec 12, 2019
ebca1e7
Makes `PiProcess` dependencies vals for efficiency
PetrosPapapa Dec 16, 2019
04d8418
Fixes `MetricsHandler` not handling `PiEventIdle`
PetrosPapapa Dec 16, 2019
3bc7676
Allows control of subscription timeout in `PiStream`
PetrosPapapa Dec 16, 2019
92b76cd
Updates wfm-simulator version to 0.2.1
PetrosPapapa Dec 17, 2019
c5e23ae
Removes unnecessary atomic executor actor from `AkkaExecutor`
PetrosPapapa Dec 18, 2019
e255b55
Updates dependencies
PetrosPapapa Dec 19, 2019
2b609b2
Merge branch 'v1.4.0' of PetrosPapapa/WorkflowFM-PEW into v1.4.0
PetrosPapapa Dec 19, 2019
2a3e4f8
Updates gitignore with metals files
PetrosPapapa Mar 19, 2020
8045630
From Scala 2.12.6 to 2.12.10
PetrosPapapa Mar 19, 2020
4447d41
Finalizes 1.4.0
PetrosPapapa Jul 9, 2020
5 changes: 4 additions & 1 deletion .gitignore
@@ -14,4 +14,7 @@
*~
\#*\#
.\#*
.projectile
.projectile
.metals/
.bloop/
project/metals.sbt
9 changes: 9 additions & 0 deletions CHANGELOG.md
@@ -11,6 +11,15 @@ Includes feature updates, bug fixes, and open issues.
* [AkkaExecutor.Call may timeout #4](https://github.com/PetrosPapapa/WorkflowFM-PEW/issues/4) - since v0.1


## [v1.4.0](https://github.com/PetrosPapapa/WorkflowFM-PEW/releases/tag/v1.4.0) - 2019-07-01

### Features

* Improved `PiEventHandlers`. The `PromiseHandler` is now generalized to return a single object at the end of the workflow. The old `PromiseHandler` is an instance called `ResultHandler` (see also [#26](https://github.com/PetrosPapapa/WorkflowFM-PEW/issues/26)).
* Implemented `PiStream` using Akka's `BroadcastHub` to enable more flexible event handling (see also [#34](https://github.com/PetrosPapapa/WorkflowFM-PEW/issues/34)). Executors can now be mixed in with (at least) either of the two default observables, namely `SimplePiObservable` and `PiStream`.
* `SimMetricsActor` no longer keeps a reference to the `Coordinator`. This makes for a cleaner, more flexible implementation, allowing multiple simulations across multiple `Coordinator`s. The downside is that, because simulations can run asynchronously, it is hard to disambiguate which results came from which `Coordinator`. We leave that problem to the user for now.


## [v1.3.0](https://github.com/PetrosPapapa/WorkflowFM-PEW/releases/tag/v1.3.0) - 2019-06-19

For some unknown reason, the version number was increased in `build.sbt` back in December without actually merging the intended changes or creating a new tag. In the meantime, [#45](https://github.com/PetrosPapapa/WorkflowFM-PEW/pull/45) was merged with various bug fixes and minor changes, the Ski example was updated, and some documentation was added. I decided to create the tag now and push the stream changes to 1.4.0.
59 changes: 35 additions & 24 deletions build.sbt
@@ -1,13 +1,13 @@
name := "PEW"

sbtVersion := "1.2.6"

lazy val commonSettings = Seq (
version := "1.3.0-SNAPSHOT",
version := "1.4.0",
organization := "com.workflowfm",
scalaVersion := "2.12.6"
scalaVersion := "2.12.10"
)

autoAPIMappings := true

// The dependencies are in Maven format, with % separating the parts.
// Notice the extra bit "test" on the end of JUnit and ScalaTest, which will
// mean it is only a test dependency.
@@ -19,37 +19,48 @@ libraryDependencies += "org.scalatest" %% "scalatest" % "3.2.0-SNAP10" % "test"
libraryDependencies += "org.scalamock" %% "scalamock" % "4.1.0" % Test
libraryDependencies += "org.scalacheck" %% "scalacheck" % "1.14.0" % "test"

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.12"
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.5.16" % "test"
libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.6.1"
libraryDependencies += "com.typesafe.akka" %% "akka-testkit" % "2.6.1" % "test"

libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.3.2"

libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.21.1"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.5.13"
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.1.3"
libraryDependencies += "de.heikoseeberger" %% "akka-http-jackson" % "1.21.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "1.1.0"
libraryDependencies += "com.typesafe.akka" %% "akka-stream" % "2.6.1"
libraryDependencies += "com.typesafe.akka" %% "akka-http" % "10.1.11"
libraryDependencies += "de.heikoseeberger" %% "akka-http-jackson" % "1.27.0"
libraryDependencies += "org.apache.kafka" %% "kafka" % "1.1.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams" % "1.1.0"

libraryDependencies += "org.mongodb.scala" %% "mongo-scala-driver" % "2.2.1"

libraryDependencies += "junit" % "junit" % "4.8.2"

EclipseKeys.preTasks := Seq(compile in Compile, compile in Test)
libraryDependencies += "uk.ac.ed.inf" %% "subakka" % "0.1-SNAPSHOT"
libraryDependencies += "uk.ac.ed.inf" %% "subakka" % "0.1-SNAPSHOT" % Test classifier "tests"

lazy val skiexample = project
.in(file("skiexample"))
.settings(
commonSettings,
scalaSource in Compile := baseDirectory.value / "src",
scalaSource in Test := baseDirectory.value / "test"
).dependsOn(rootRef)

lazy val simulator = project
.in(file("simulator"))
.settings(
commonSettings,
name := "pew-simulator",
libraryDependencies += "com.workflowfm" %% "wfm-simulator" % "0.2.1"
).dependsOn(rootRef)

lazy val skiexample = project
.in(file("skiexample"))
.settings(
commonSettings,
scalaSource in Compile := baseDirectory.value / "src",
scalaSource in Test := baseDirectory.value / "test"
).dependsOn(rootRef)

lazy val root = project
.in(file("."))
.settings(
commonSettings,
scalaSource in Compile := baseDirectory.value / "src",
scalaSource in Test := baseDirectory.value / "test"
)
.in(file("."))
.settings(
commonSettings,
name := "pew",
scalaSource in Compile := baseDirectory.value / "src",
scalaSource in Test := baseDirectory.value / "test"
)
lazy val rootRef = LocalProject("root")
200 changes: 200 additions & 0 deletions docs/metrics.org
@@ -0,0 +1,200 @@
#+TITLE: Metrics
#+AUTHOR: Petros Papapanagiotou
#+EMAIL: petrospapapan@gmail.com
#+OPTIONS: toc:2
#+EXCLUDE_TAGS: noexport

* Introduction

The PEW engine can automatically track metrics during workflow execution. These can be used to monitor workflows and extract analytics and insights.

Metrics can be tracked in two settings:
1) [[realtime][*Real-time execution*]]: These are metrics from the engine regarding the execution of any workflow.
2) [[simulation][*Simulation*]]: These are metrics from the simulator across the simulated time.

We describe the general setting and the individual metrics next.


* Setting

The general idea in PEW is that the engine automatically collects all available metrics at runtime. The user can then implement an output function to generate analytics from the collected metrics.

The following key concepts are used by PEW for capturing and managing metrics:

** ~Metrics~
Metrics are captured around individual concepts, such as an atomic process or task, a persistent resource, or a workflow. Each set of metrics is captured in an immutable case class, which includes the different features being measured and the methods to update them based on what is happening in the engine.

** ~Aggregator~
An ~Aggregator~ is a mutable class that gathers in one place the ~Metrics~ collected across multiple workflows. It contains methods to update the different metrics, indexed by some ID, based on the different events that may take place.

** ~Output~
An ~Output~ is essentially a function that generates output from the ~Metrics~ within an ~Aggregator~. Outputs may include analytics, visualizations, reports, or anything else.
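As a rough illustration of how the three concepts fit together (all names here are hypothetical stand-ins, not the actual PEW API):

#+BEGIN_SRC scala
// Hypothetical, immutable set of metrics for one measured entity.
case class CallMetrics(id: Int, start: Long, finish: Option[Long]) {
  // Updates return a new copy, keeping the case class immutable.
  def complete(time: Long): CallMetrics = copy(finish = Some(time))
}

// A mutable aggregator collecting metrics across workflows, indexed by ID.
class CallAggregator {
  private val metrics = scala.collection.mutable.Map[Int, CallMetrics]()
  def start(id: Int, time: Long): Unit = metrics(id) = CallMetrics(id, time, None)
  def finish(id: Int, time: Long): Unit =
    metrics.get(id).foreach { m => metrics(id) = m.complete(time) }
  def all: Seq[CallMetrics] = metrics.values.toSeq
}

// An output is essentially a function over the aggregated metrics.
val printer: CallAggregator => Unit =
  agg => agg.all.foreach(m => println(s"call ${m.id}: ${m.start} -> ${m.finish}"))
#+END_SRC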


* <<realtime>>Real-time execution metrics

[[[https://github.com/PetrosPapapa/WorkflowFM-PEW/tree/master/src/com/workflowfm/pew/metrics][Source]]]

Real-time metrics are minimal, as they are intended to be generic and domain-independent. We capture metrics about calls to atomic processes and metrics about executions of entire workflows.

The metrics are gathered in a ~MetricsAggregator~ and can be processed through a ~MetricsOutput~.

A ~MetricsHandler~ is the most convenient way of gathering metrics directly from a ~ProcessExecutor~. It is a ~PiEventHandler~, which means you can register it directly to the executor and process the results afterwards.

Here's an example pattern:
#+BEGIN_SRC scala
// Instantiate your handler. Call it "metrics".
val handler = new MetricsHandler[Int]("metrics")

// Instantiate your executor (assuming a list of processes).
val executor = new AkkaExecutor(processes :_*)

// Subscribe the handler and obtain a kill switch to unsubscribe it when done.
val killSwitch = executor.subscribe(handler)

///////////////////////////////////
// Execute all your workflows here.
// Wait for them to finish.
///////////////////////////////////

// Stop/unsubscribe the handler.
killSwitch.map(_.stop)

// Instantiate your output, in this case a simple MetricsPrinter.
val output = new MetricsPrinter[Int]()

// Run it on the results.
output(handler)
#+END_SRC

The two types of metrics available are described next.

** ~ProcessMetrics~

This captures metrics about a particular call of an ~AtomicProcess~.
- ~piID~: The ID of the workflow that executed the atomic process.
- ~ref~: A unique call ID for this process call within the particular workflow.
- ~process~: The name of the process.
- ~start~: The system time in milliseconds when the process call started.
- ~finish~: The system time in milliseconds when the process call finished, or ~None~ if it is still running.
- ~result~: A ~String~ representation of the returned result from the process call, or ~None~ if it is still running. In case of failure, the field is populated with the localized message of the exception thrown.

** ~WorkflowMetrics~

This captures metrics for a particular execution of a workflow (~CompositeProcess~).
- ~piID~: The unique ID of the workflow.
- ~start~: The system time in milliseconds when the workflow started executing.
- ~calls~: The number of individual calls performed to atomic processes.
- ~finish~: The system time in milliseconds when the workflow finished, or ~None~ if it is still running.
- ~result~: A ~String~ representation of the returned result from the workflow, or ~None~ if it is still running. In case of failure, the field is populated with the localized message of the exception thrown.
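Derived analytics, such as the total workflow duration, can be computed directly from these fields. The case class below is a hypothetical mirror of the fields described above, for illustration only, not the actual PEW definition:

#+BEGIN_SRC scala
// Hypothetical mirror of the WorkflowMetrics fields described above.
case class WorkflowMetrics(
    piID: Int,
    start: Long,
    calls: Int,
    finish: Option[Long],
    result: Option[String]
) {
  // Wall-clock duration in milliseconds, if the workflow has finished.
  def duration: Option[Long] = finish.map(_ - start)
}
#+END_SRC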


* <<simulation>>Simulation metrics

[[[https://github.com/PetrosPapapa/WorkflowFM-PEW/tree/master/src/com/workflowfm/pew/simulation/metrics][Source]]]

Simulation metrics are somewhat richer than the real-time ones. We capture metrics about each ~Task~, ~Simulation~ and ~TaskResource~ used. More details about these concepts can be found [[https://github.com/PetrosPapapa/WorkflowFM-PEW/wiki/Simulation][here]].

The metrics are gathered in a ~SimMetricsAggregator~ and can be processed through a ~SimMetricsOutput~.

The general assumption is that simulations run on an ~AkkaExecutor~. Under this assumption, we can expand the implementation to ensure asynchronous gathering of the metrics and automatic execution of the output at the end. For this reason we have introduced the [[https://github.com/PetrosPapapa/WorkflowFM-PEW/blob/master/src/com/workflowfm/pew/simulation/metrics/Actor.scala][~SimMetricsActor~]] that takes care of all of this for us.

Here is an example setup to manage simulation metrics, assuming an active ~ActorSystem~:
#+BEGIN_SRC scala
// Instantiate the Coordinator.
val coordinator = system.actorOf(Coordinator.props(DefaultScheduler))

// Load a list of available TaskResources into the Coordinator.
coordinator ! Coordinator.AddResources(machines)

// Instantiate your output, in this case a simple SimMetricsPrinter.
val output = new SimMetricsPrinter()

// Create the SimMetricsActor.
val metricsActor = system.actorOf(SimMetricsActor.props(output))

// Set up a list of simulations, paired with their starting times.
val simulations: Seq[(Long, Simulation)] = ...

// Instantiate the executor.
val executor = new AkkaExecutor(simulations flatMap (_._2.getProcesses()) :_*)

// Start the simulations through the SimMetricsActor.
metricsActor ! SimMetricsActor.StartSims(coordinator, simulations, executor)
#+END_SRC

The ~metricsActor~ will automatically run the output function (the printer in this case) on the results.

Note that, in this scenario, the ~metricsActor~ will also shut down the ~ActorSystem~. If you want to avoid that, e.g. when you need to run multiple independent simulations, you need to set up your own actor that will be given the opportunity to act once the simulation and metrics output have finished. Assuming ~a:ActorRef~ is that actor, you can pass it to the ~metricsActor~ at construction as follows:
#+BEGIN_SRC scala
val metricsActor = system.actorOf(SimMetricsActor.props(output, Some(a)))
#+END_SRC

Your actor will receive a ~Coordinator.Done~ message when everything is done and the ~ActorSystem~ will remain active.

The three types of gathered metrics are described next.

** ~TaskMetrics~

This captures metrics for a simulated ~Task~.
- ~id~: The unique ID of the ~Task~.
- ~task~: The name of the ~Task~.
- ~simulation~: The name of the simulation the ~Task~ belongs to.
- ~created~: The virtual timestamp when the ~Task~ was created and entered the ~Coordinator~.
- ~started~: The virtual timestamp when the ~Task~ started executing, or ~None~ if it has not started yet.
- ~duration~: The virtual duration of the ~Task~.
- ~cost~: The cost associated with the ~Task~.
- ~resources~: The list of names of the ~TaskResource~ this ~Task~ used.

** ~SimulationMetrics~

This captures metrics for a particular ~Simulation~.
- ~name~: The unique name of the ~Simulation~.
- ~started~: The virtual timestamp when the ~Simulation~ started executing.
- ~duration~: The virtual duration of the ~Simulation~.
- ~delay~: The sum of all delays for all involved ~Task~.
- ~tasks~: The number of ~Task~ associated with the ~Simulation~ so far.
- ~cost~: The total cost associated with the ~Simulation~ so far.
- ~result~: A ~String~ representation of the returned result from the ~Simulation~, or ~None~ if it is still running. In case of failure, the field is populated with the localized message of the exception thrown.

** ~ResourceMetrics~

This captures metrics for a particular ~TaskResource~.
- ~name~: The unique name of the ~TaskResource~.
- ~busyTime~: The total amount of virtual time that the ~TaskResource~ has been busy, i.e. attached to a ~Task~.
- ~idleTime~: The total amount of virtual time that the ~TaskResource~ has been idle, i.e. not attached to any ~Task~.
- ~tasks~: The number of different ~Task~ that have been attached to this ~TaskResource~.
- ~cost~: The total cost associated with this ~TaskResource~.
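One analytic these fields support is resource utilization, i.e. the fraction of virtual time the resource was busy. A minimal sketch, using a hypothetical mirror of the fields above (not the actual PEW definition):

#+BEGIN_SRC scala
// Hypothetical mirror of the ResourceMetrics fields described above.
case class ResourceMetrics(
    name: String,
    busyTime: Long,
    idleTime: Long,
    tasks: Int,
    cost: Long
) {
  // Fraction of observed virtual time the resource was attached to a Task.
  def utilization: Double =
    if (busyTime + idleTime == 0) 0.0
    else busyTime.toDouble / (busyTime + idleTime)
}
#+END_SRC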


* Extending the metrics

Analytics that can be derived from the current metrics can be calculated by a custom output function.

Implementing new types of metrics in the current setup requires extending each of the three main concepts and, more importantly, a computational way to generate these metrics at runtime.

The former can be easily achieved by:
1) Implementing your own custom case classes for your metrics.
2) Extending one of the existing aggregators to hold your new metrics.
3) Extending the output classes to deal with your custom metrics.
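Steps 1 and 2 above could look roughly like this; all names here are hypothetical, including the base aggregator stand-in (the real aggregator classes live in the PEW source):

#+BEGIN_SRC scala
// Step 1: a custom case class for your metrics
// (hypothetical example: memory usage per process call).
case class MemoryMetrics(piID: Int, ref: Int, bytesUsed: Long)

// Hypothetical stand-in for an existing aggregator.
class BaseAggregator

// Step 2: extend the aggregator to hold the new metrics,
// indexed by workflow ID and call reference.
class MemoryAwareAggregator extends BaseAggregator {
  private val memory = scala.collection.mutable.Map[(Int, Int), MemoryMetrics]()
  def addMemory(m: MemoryMetrics): Unit = memory((m.piID, m.ref)) = m
  def memoryOf(piID: Int, ref: Int): Option[MemoryMetrics] = memory.get((piID, ref))
}
#+END_SRC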

The latter is harder, as the current metrics are measured directly from the ~PiEvent~s generated by the executor or by the simulation ~Coordinator~.

Metrics that can be calculated by atomic processes (or tasks) can be given as metadata output in the process implementation. Instead of implementing a standard ~AtomicProcess~, switch its inheritance to a ~MetadataAtomicProcess~. You can then implement the ~runMeta~ function so that it returns the calculated metrics as one or more ~PiMetadataElem~.

Here's an example pattern:
#+BEGIN_SRC scala
override def runMeta( args: Seq[PiObject] )( implicit ec: ExecutionContext ): Future[MetadataAtomicResult] = {
  // Run this as a regular AtomicProcess.
  // flatMap (rather than map) flattens the nested Future from the metrics calculation.
  run( args ).flatMap { result =>
    // Calculate your metrics (assuming metrics is a Future here).
    val metrics: Future[Seq[PiMetadataElem]] = ...
    // Return the combined result.
    metrics.map { m => MetadataAtomicProcess.result(result, m: _*) }
  }
}
#+END_SRC

The generated metadata will be attached to the corresponding ~PiEventReturn~, so you can use a ~PiEventHandler~ to grab it and pass it to your aggregator.
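The handoff from event to aggregator can be sketched as follows; all names below are hypothetical stand-ins for illustration, not the actual PEW API:

#+BEGIN_SRC scala
// Hypothetical stand-in for a returned event carrying metadata.
case class PiEventReturn(piID: Int, result: String, metadata: Seq[(String, Any)])

// A handler is essentially a callback on events: it picks out the
// metadata and passes it to an aggregator.
class MetadataCollector {
  private val store = scala.collection.mutable.Map[Int, Seq[(String, Any)]]()
  def handle(event: PiEventReturn): Unit = store(event.piID) = event.metadata
  def metadataOf(piID: Int): Seq[(String, Any)] = store.getOrElse(piID, Seq.empty)
}
#+END_SRC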

Calculating the metrics at the same time as the result requires refactoring of the automatically generated code.
50 changes: 0 additions & 50 deletions pom.xml

This file was deleted.
