diff --git a/TrillSamples/HelloWorldFS/HelloWorldFS.fsproj b/TrillSamples/HelloWorldFS/HelloWorldFS.fsproj
new file mode 100644
index 0000000..2ae95e5
--- /dev/null
+++ b/TrillSamples/HelloWorldFS/HelloWorldFS.fsproj
@@ -0,0 +1,17 @@
+
+
+
+ Exe
+ netcoreapp2.1
+
+
+
+
+
+
+
+
+
+
+
+
diff --git a/TrillSamples/HelloWorldFS/Program.fs b/TrillSamples/HelloWorldFS/Program.fs
new file mode 100644
index 0000000..19c1339
--- /dev/null
+++ b/TrillSamples/HelloWorldFS/Program.fs
@@ -0,0 +1,116 @@
+open System
+open System.Data.SqlTypes
+open System.Linq.Expressions
+open System.Reactive.Linq
+open Microsoft.StreamProcessing
+
+open System.Runtime.CompilerServices
+
+[]
+type Observable with
+ []
+ static member inline toStreamable(observable: IObservable>,
+ ?disorderPolicy : DisorderPolicy,
+ ?flushPolicy : FlushPolicy,
+ ?periodicPunctuationPolicy : PeriodicPunctuationPolicy,
+ ?onCompletedPolicy : OnCompletedPolicy) =
+ let disorderPolicy = DisorderPolicy.Throw() |> defaultArg disorderPolicy
+ let flushPolicy = FlushPolicy.FlushOnPunctuation |> defaultArg flushPolicy
+ let periodicPunctuationPolicy = PeriodicPunctuationPolicy.None() |> defaultArg periodicPunctuationPolicy
+ let onCompletedPolicy = OnCompletedPolicy.EndOfStream |> defaultArg onCompletedPolicy
+ Streamable.ToStreamable(
+ observable,
+ disorderPolicy,
+ flushPolicy,
+ periodicPunctuationPolicy,
+ onCompletedPolicy)
+
+module Streamable =
+ open Microsoft.StreamProcessing
+
+ let join map left right =
+ Streamable.Join(left, right, fun a b -> map a b)
+
+ let keyJoin map left right =
+ Streamable.Join(fst left, fst right, (fun a -> a |> snd left), (fun b -> b |> snd right), (fun a b -> map a b))
+
+ let hintedKeyJoin options map left right =
+ Streamable.Join(fst left, fst right, (fun a -> a |> snd left), (fun b -> b |> snd right), (fun a b -> map a b), options)
+
+ let multicast (map:IStreamable<'TKey,'TPayload> -> IStreamable<'TKey,'TResult>) source =
+ Streamable.Multicast(source, map)
+
+ let multicastMerge (map:IStreamable<'k,'a> -> IStreamable<'k,'b> -> IStreamable<'k,'c>) sourceLeft sourceRight =
+ Streamable.Multicast(sourceLeft, sourceRight, fun a b -> map a b)
+
+ let multicastCount (count:int) source : IStreamable<'TKey,'TPayload>[] =
+ Streamable.Multicast(source, count)
+
+ let toStreamEventObservable (source:IStreamable) =
+ Streamable.ToStreamEventObservable(source)
+
+ let toReshapedStreamEventObservable reshapingPolicy (source:IStreamable) =
+ Streamable.ToStreamEventObservable(source, reshapingPolicy)
+
+ let toPartitionedStreamEventObservable (source:IStreamable,'TPayload>) =
+ Streamable.ToStreamEventObservable(source)
+
+ let toReshapedPartitionedStreamEventObservable reshapingPolicy (source:IStreamable,'TPayload>) =
+ Streamable.ToStreamEventObservable(source, reshapingPolicy)
+
+ let setEventLifetime startTimeSelector (duration:int64) source =
+ Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), duration)
+
+ let mapEventLifetime startTimeSelector (map:int64 -> int64) source =
+ Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), fun duration -> map duration)
+
+ let mergeEventLifetime startTimeSelector (merge:int64 -> int64 -> int64) source =
+ Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), fun a b -> merge a b)
+
+ let mergePartitionedEventLifetime startTimeSelector (merge:'TPartition -> int64 -> int64 -> int64) source : IStreamable,'TPayload> =
+ Streamable.AlterEventLifetime(source, (fun p x -> startTimeSelector p x), fun a b -> merge a b)
+
+ let filter predicate source : IStreamable<'TKey,'TPayload> =
+ Streamable.Where(source, fun e -> predicate e)
+
+ let map (map:'TPayload->'TResult) source : IStreamable<'TKey,'TResult>=
+ Streamable.Select(source, fun e -> map e)
+
+ let bind (map:'TLeft -> 'TRight -> 'TResult) (left:IStreamable<'TKey,'TLeft>) (right:Empty -> IStreamable<'TKey,'TRight>) =
+ Streamable.SelectMany(left, right, fun a b -> map a b)
+
+ let groupBind (groupDefinition:IMapDefinition<'TOuterKey,'TPayload,'TPayload,'TInnerKey,'TResult>) apply (resultSelector:GroupSelectorInput<'TInnerKey> -> 'TBind -> 'TOutput) =
+ Streamable.SelectMany(groupDefinition, apply, fun i b -> resultSelector i b)
+
+type SensorRange = { Time: int64; Low: int; High: int }
+type SensorReading = { Time: int64; Value: int }
+
+let historicData =
+ [ 0; 20; 15; 30; 45; 50; 30; 35; 60; 20 ]
+ |> Seq.mapi (fun i v -> { Time = (int64)(i + 1); Value = v } )
+
+let historicStream =
+ historicData
+ |> Observable.ToObservable
+ |> Observable.map (fun r -> StreamEvent.CreateInterval(r.Time, r.Time + 1L, r))
+ |> Observable.toStreamable
+
+let createStream isRealTime =
+ historicStream
+
+let ranges threshold input =
+ let above = input |> Streamable.filter (fun s -> s.Value > threshold)
+ let below = input |> Streamable.setEventLifetime ((+) 1L) 1L |> Streamable.filter (fun s -> s.Value < threshold)
+ let (|><|) = Streamable.join (fun a b -> { Time = a.Time; Low = b.Value; High = a.Value })
+ below |><| above
+
+[]
+let main argv =
+ let ranges =
+ createStream false
+ |> Streamable.multicast (ranges 42)
+ |> Streamable.toStreamEventObservable
+ Observable.ForEachAsync(ranges, fun e -> printfn "%A" e).Wait()
+ printfn "Done. Press ENTER to terminate"
+ Console.ReadLine() |> ignore
+ 0
\ No newline at end of file
diff --git a/TrillSamples/TrillSamples.sln b/TrillSamples/TrillSamples.sln
index 4d1ba31..8b09cd7 100644
--- a/TrillSamples/TrillSamples.sln
+++ b/TrillSamples/TrillSamples.sln
@@ -44,6 +44,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QueryWritingGuide", "QueryWritingGuide\QueryWritingGuide.csproj", "{C4A9AC70-6189-445D-BBFA-64AB1958C8C6}"
EndProject
+Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "FSharp", "FSharp", "{F3335D3B-A406-4FA8-B6B7-E904E28A77FA}"
+EndProject
+Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "HelloWorldFS", "HelloWorldFS\HelloWorldFS.fsproj", "{9B513F30-B201-4DAC-BE0D-32611D02651A}"
+EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
@@ -188,6 +192,14 @@ Global
{C4A9AC70-6189-445D-BBFA-64AB1958C8C6}.Release|Any CPU.Build.0 = Release|Any CPU
{C4A9AC70-6189-445D-BBFA-64AB1958C8C6}.Release|x64.ActiveCfg = Release|Any CPU
{C4A9AC70-6189-445D-BBFA-64AB1958C8C6}.Release|x64.Build.0 = Release|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|Any CPU.Build.0 = Debug|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|x64.ActiveCfg = Debug|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|x64.Build.0 = Debug|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|Any CPU.ActiveCfg = Release|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|Any CPU.Build.0 = Release|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|x64.ActiveCfg = Release|Any CPU
+ {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|x64.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
@@ -196,6 +208,7 @@ Global
{526AC0AF-4079-4581-ADCA-36913FDEBC68} = {87014F67-F6AE-4AA6-89F2-27E0469A31B0}
{1F0E35C8-03CA-4B23-92A8-57572AD919C4} = {87014F67-F6AE-4AA6-89F2-27E0469A31B0}
{C580BAC3-23A0-42B0-804B-E882AAF55BDE} = {87014F67-F6AE-4AA6-89F2-27E0469A31B0}
+ {9B513F30-B201-4DAC-BE0D-32611D02651A} = {F3335D3B-A406-4FA8-B6B7-E904E28A77FA}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {529E43AF-C26A-4A3A-BF6F-9C267775E28F}