From 3064da45a98c3d0de82287e239b18273cfbd9449 Mon Sep 17 00:00:00 2001 From: Peter Shrosbree Date: Fri, 21 Dec 2018 23:06:41 -0800 Subject: [PATCH 1/2] First pass at an F# Trill sample --- TrillSamples/HelloWorldFS/HelloWorldFS.fsproj | 17 +++++ TrillSamples/HelloWorldFS/Program.fs | 67 +++++++++++++++++++ TrillSamples/TrillSamples.sln | 13 ++++ 3 files changed, 97 insertions(+) create mode 100644 TrillSamples/HelloWorldFS/HelloWorldFS.fsproj create mode 100644 TrillSamples/HelloWorldFS/Program.fs diff --git a/TrillSamples/HelloWorldFS/HelloWorldFS.fsproj b/TrillSamples/HelloWorldFS/HelloWorldFS.fsproj new file mode 100644 index 0000000..2ae95e5 --- /dev/null +++ b/TrillSamples/HelloWorldFS/HelloWorldFS.fsproj @@ -0,0 +1,17 @@ + + + + Exe + netcoreapp2.1 + + + + + + + + + + + + diff --git a/TrillSamples/HelloWorldFS/Program.fs b/TrillSamples/HelloWorldFS/Program.fs new file mode 100644 index 0000000..7156a9f --- /dev/null +++ b/TrillSamples/HelloWorldFS/Program.fs @@ -0,0 +1,67 @@ +open System +open System.Data.SqlTypes +open System.Linq.Expressions +open System.Reactive.Linq +open Microsoft.StreamProcessing + +module Streamable = + open Microsoft.StreamProcessing + + let ofObservable observable = + Streamable.ToStreamable( + observable, + DisorderPolicy.Throw(), + FlushPolicy.FlushOnPunctuation, + PeriodicPunctuationPolicy.None(), + OnCompletedPolicy.EndOfStream) + + let join mapping left right = + Streamable.Join(left, right, fun a b -> mapping a b) + + let multicast (mapping:IStreamable<'k,'a> -> IStreamable<'k,'b>) source = + Streamable.Multicast(source, mapping) + + let toStreamEventObservable (source:IStreamable) = + Streamable.ToStreamEventObservable(source) + + let alterEventLifetime (duration:int64) startTimeSelector source = + Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), duration) + + let filter predicate source = + Streamable.Where(source, fun e -> predicate e) + + let map mapping source = + Streamable.Select(source, fun e -> mapping e) + +type SensorRange = { Time: int64; Low: int; High: int } +type SensorReading = { Time: int64; Value: int } + +let historicData = + [ 0; 20; 15; 30; 45; 50; 30; 35; 60; 20 ] + |> Seq.mapi (fun i v -> { Time = (int64)(i + 1); Value = v } ) + +let historicStream = + historicData + |> Observable.ToObservable + |> Observable.map (fun r -> StreamEvent.CreateInterval(r.Time, r.Time + 1L, r)) + |> Streamable.ofObservable + +let createStream isRealTime = + historicStream + +let ranges threshold input = + let above = input |> Streamable.filter (fun s -> s.Value > threshold) + let below = input |> Streamable.alterEventLifetime 1L ((+) 1L) |> Streamable.filter (fun s -> s.Value < threshold) + let (|><|) = Streamable.join (fun a b -> { Time = a.Time; Low = b.Value; High = a.Value }) + below |><| above + +[] +let main argv = + let ranges = + createStream false + |> Streamable.multicast (ranges 42) + |> Streamable.toStreamEventObservable + Observable.ForEachAsync(ranges, fun e -> printfn "%A" e).Wait() + printfn "Done. Press ENTER to terminate" + Console.ReadLine() |> ignore + 0 \ No newline at end of file diff --git a/TrillSamples/TrillSamples.sln b/TrillSamples/TrillSamples.sln index 4d1ba31..8b09cd7 100644 --- a/TrillSamples/TrillSamples.sln +++ b/TrillSamples/TrillSamples.sln @@ -44,6 +44,10 @@ Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Solution Items", "Solution EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "QueryWritingGuide", "QueryWritingGuide\QueryWritingGuide.csproj", "{C4A9AC70-6189-445D-BBFA-64AB1958C8C6}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "FSharp", "FSharp", "{F3335D3B-A406-4FA8-B6B7-E904E28A77FA}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "HelloWorldFS", "HelloWorldFS\HelloWorldFS.fsproj", "{9B513F30-B201-4DAC-BE0D-32611D02651A}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -188,6 +192,14 @@ Global {C4A9AC70-6189-445D-BBFA-64AB1958C8C6}.Release|Any CPU.Build.0 = Release|Any CPU {C4A9AC70-6189-445D-BBFA-64AB1958C8C6}.Release|x64.ActiveCfg = Release|Any CPU {C4A9AC70-6189-445D-BBFA-64AB1958C8C6}.Release|x64.Build.0 = Release|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|x64.ActiveCfg = Debug|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Debug|x64.Build.0 = Debug|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|Any CPU.Build.0 = Release|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|x64.ActiveCfg = Release|Any CPU + {9B513F30-B201-4DAC-BE0D-32611D02651A}.Release|x64.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -196,6 +208,7 @@ Global {526AC0AF-4079-4581-ADCA-36913FDEBC68} = {87014F67-F6AE-4AA6-89F2-27E0469A31B0} {1F0E35C8-03CA-4B23-92A8-57572AD919C4} = {87014F67-F6AE-4AA6-89F2-27E0469A31B0} {C580BAC3-23A0-42B0-804B-E882AAF55BDE} = {87014F67-F6AE-4AA6-89F2-27E0469A31B0} + {9B513F30-B201-4DAC-BE0D-32611D02651A} = {F3335D3B-A406-4FA8-B6B7-E904E28A77FA} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {529E43AF-C26A-4A3A-BF6F-9C267775E28F} From b2559d7ccb2981abc96c8275f9633bfe50120962 Mon Sep 17 00:00:00 2001 From: Peter Shrosbree Date: Thu, 27 Dec 2018 12:48:34 -0800 Subject: [PATCH 2/2] Extend Trill mappings Add optional parameters to toStreamable Map some overloads to F# functions --- TrillSamples/HelloWorldFS/Program.fs | 87 ++++++++++++++++++++++------ 1 file changed, 68 insertions(+), 19 deletions(-) diff --git a/TrillSamples/HelloWorldFS/Program.fs b/TrillSamples/HelloWorldFS/Program.fs index 7156a9f..19c1339 100644 --- a/TrillSamples/HelloWorldFS/Program.fs +++ b/TrillSamples/HelloWorldFS/Program.fs @@ -4,34 +4,83 @@ open System.Linq.Expressions open System.Reactive.Linq open Microsoft.StreamProcessing -module Streamable = - open Microsoft.StreamProcessing +open System.Runtime.CompilerServices - let ofObservable observable = +[] +type Observable with + [] + static member inline toStreamable(observable: IObservable>, + ?disorderPolicy : DisorderPolicy, + ?flushPolicy : FlushPolicy, + ?periodicPunctuationPolicy : PeriodicPunctuationPolicy, + ?onCompletedPolicy : OnCompletedPolicy) = + let disorderPolicy = DisorderPolicy.Throw() |> defaultArg disorderPolicy + let flushPolicy = FlushPolicy.FlushOnPunctuation |> defaultArg flushPolicy + let periodicPunctuationPolicy = PeriodicPunctuationPolicy.None() |> defaultArg periodicPunctuationPolicy + let onCompletedPolicy = OnCompletedPolicy.EndOfStream |> defaultArg onCompletedPolicy Streamable.ToStreamable( - observable, - DisorderPolicy.Throw(), - FlushPolicy.FlushOnPunctuation, - PeriodicPunctuationPolicy.None(), - OnCompletedPolicy.EndOfStream) + observable, + disorderPolicy, + flushPolicy, + periodicPunctuationPolicy, + onCompletedPolicy) + +module Streamable = + open Microsoft.StreamProcessing - let join mapping left right = - Streamable.Join(left, right, fun a b -> mapping a b) + let join map left right = + Streamable.Join(left, right, fun a b -> map a b) + + let keyJoin map left right = + Streamable.Join(fst left, fst right, (fun a -> a |> snd left), (fun b -> b |> snd right), (fun a b -> map a b)) + + let hintedKeyJoin options map left right = + Streamable.Join(fst left, fst right, (fun a -> a |> snd left), (fun b -> b |> snd right), (fun a b -> map a b), options) + + let multicast (map:IStreamable<'TKey,'TPayload> -> IStreamable<'TKey,'TResult>) source = + Streamable.Multicast(source, map) + + let multicastMerge (map:IStreamable<'k,'a> -> IStreamable<'k,'b> -> IStreamable<'k,'c>) sourceLeft sourceRight = + Streamable.Multicast(sourceLeft, sourceRight, fun a b -> map a b) - let multicast (mapping:IStreamable<'k,'a> -> IStreamable<'k,'b>) source = - Streamable.Multicast(source, mapping) + let multicastCount (count:int) source : IStreamable<'TKey,'TPayload>[] = + Streamable.Multicast(source, count) - let toStreamEventObservable (source:IStreamable) = + let toStreamEventObservable (source:IStreamable) = Streamable.ToStreamEventObservable(source) - let alterEventLifetime (duration:int64) startTimeSelector source = + let toReshapedStreamEventObservable reshapingPolicy (source:IStreamable) = + Streamable.ToStreamEventObservable(source, reshapingPolicy) + + let toPartitionedStreamEventObservable (source:IStreamable,'TPayload>) = + Streamable.ToStreamEventObservable(source) + + let toReshapedPartitionedStreamEventObservable reshapingPolicy (source:IStreamable,'TPayload>) = + Streamable.ToStreamEventObservable(source, reshapingPolicy) + + let setEventLifetime startTimeSelector (duration:int64) source = Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), duration) - let filter predicate source = + let mapEventLifetime startTimeSelector (map:int64 -> int64) source = + Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), fun duration -> map duration) + + let mergeEventLifetime startTimeSelector (merge:int64 -> int64 -> int64) source = + Streamable.AlterEventLifetime(source, (fun x -> startTimeSelector x), fun a b -> merge a b) + + let mergePartitionedEventLifetime startTimeSelector (merge:'TPartition -> int64 -> int64 -> int64) source : IStreamable,'TPayload> = + Streamable.AlterEventLifetime(source, (fun p x -> startTimeSelector p x), fun a b -> merge a b) + + let filter predicate source : IStreamable<'TKey,'TPayload> = Streamable.Where(source, fun e -> predicate e) - let map mapping source = - Streamable.Select(source, fun e -> mapping e) + let map (map:'TPayload->'TResult) source : IStreamable<'TKey,'TResult>= + Streamable.Select(source, fun e -> map e) + + let bind (map:'TLeft -> 'TRight -> 'TResult) (left:IStreamable<'TKey,'TLeft>) (right:Empty -> IStreamable<'TKey,'TRight>) = + Streamable.SelectMany(left, right, fun a b -> map a b) + + let groupBind (groupDefinition:IMapDefinition<'TOuterKey,'TPayload,'TPayload,'TInnerKey,'TResult>) apply (resultSelector:GroupSelectorInput<'TInnerKey> -> 'TBind -> 'TOutput) = + Streamable.SelectMany(groupDefinition, apply, fun i b -> resultSelector i b) type SensorRange = { Time: int64; Low: int; High: int } type SensorReading = { Time: int64; Value: int } @@ -44,14 +93,14 @@ let historicStream = historicData |> Observable.ToObservable |> Observable.map (fun r -> StreamEvent.CreateInterval(r.Time, r.Time + 1L, r)) - |> Streamable.ofObservable + |> Observable.toStreamable let createStream isRealTime = historicStream let ranges threshold input = let above = input |> Streamable.filter (fun s -> s.Value > threshold) - let below = input |> Streamable.alterEventLifetime 1L ((+) 1L) |> Streamable.filter (fun s -> s.Value < threshold) + let below = input |> Streamable.setEventLifetime ((+) 1L) 1L |> Streamable.filter (fun s -> s.Value < threshold) let (|><|) = Streamable.join (fun a b -> { Time = a.Time; Low = b.Value; High = a.Value }) below |><| above