From 3ef12117ac38158976e55f824a182672455de8fe Mon Sep 17 00:00:00 2001 From: "kevin.knoop" Date: Mon, 5 Sep 2022 12:03:31 +0200 Subject: [PATCH] Add timeout (async) policy. --- src/Pholly.fs | 105 ++++++++++++++++++++++++++++++++++---------------- 1 file changed, 71 insertions(+), 34 deletions(-) diff --git a/src/Pholly.fs b/src/Pholly.fs index 7ac4a45..04e2422 100644 --- a/src/Pholly.fs +++ b/src/Pholly.fs @@ -10,6 +10,7 @@ executers that can be reused so as to avoid repeated reconstruction of policies. module Pholly open System +open System.Threading open System.Threading.Tasks open FSharp.Control.Tasks.NonAffine open Polly.CircuitBreaker @@ -27,11 +28,11 @@ module Fallback = OnFallback: (Polly.DelegateResult> -> Polly.Context -> unit) option OnFallbackAsync: (Polly.DelegateResult> -> Polly.Context -> Task) option } - + let shouldFallback handler config = { config with ShouldFallback = handler } - + let whenFallingBack handler config = { config with OnFallback = handler } - + let whenFallingBackAsync handler config = { config with OnFallbackAsync = handler } module CircuitBreaker = @@ -47,9 +48,9 @@ module CircuitBreaker = Reset: unit -> unit } *) - + [] type consecutiveErrors - + type CircuitBreakerConfig<'a,'b> = { BreakOn: int ShouldBreak: Result<'a,'b> -> bool @@ -58,7 +59,7 @@ module CircuitBreaker = OnReset: Polly.Context -> unit CircuitOpenResult: Result<'a,'b> option } - + let breakOn consecutiveErrors config = { config with BreakOn = consecutiveErrors } let shouldBreak handler config = { config with ShouldBreak = handler } let whenCircuitIsOpened handler config = { config with OnBreak = handler } @@ -70,31 +71,67 @@ module Retry = exception RetryForeverFailedException exception FailedWithOkResultException [] type times - + type Retry = | Forever | Times of int - + type RetryConfig<'a,'b> = { Retry : Retry BackoffSequenceMs : int list BeforeEachRetry : 'b->int->Polly.Context->unit ShouldRetry: Result<'a,'b> -> bool - } - + } + let retry retries config = { config with Retry = retries } let withIntervalOf interval config = { config with BackoffSequenceMs = [interval] } let withIntervalsOf backoff config = { config with BackoffSequenceMs = backoff } let beforeEachRetry handler config = { config with BeforeEachRetry = handler } let shouldRetry handler config = { config with ShouldRetry = handler } // syntactic sugar for retries - let upto (value:int) = value |> int |> Times - + let upto (value:int) = value |> int |> Times + +module Timeout = + + type Strategy = + | Optimistic + | Pessimistic + + type TimeoutConfig = + { Timeout: TimeSpan + Strategy: Strategy + OnTimeout: (Polly.Context -> TimeSpan -> Task -> Task) option + } + + let timeout timespan config = { config with Timeout = timespan } + let withStrategy strategy config = { config with Strategy = strategy } + let onTimeout ontimeout config = { config with OnTimeout = ontimeout } + module Policy = open Polly - + open Polly.Timeout + let defaultResultComparer = function | Ok _ -> false | Error _ -> true - + + let timeoutAsync (props:(Timeout.TimeoutConfig -> Timeout.TimeoutConfig) seq) = + let defaultProps:Timeout.TimeoutConfig = + { Timeout = TimeSpan.FromSeconds 30. + Strategy = Timeout.Strategy.Optimistic + OnTimeout = None + } + let config = props |> Seq.fold(fun cfg configFunc -> cfg |> configFunc) defaultProps + let onTimeout = defaultArg config.OnTimeout (fun _ _ _ -> Task.CompletedTask) + let timeoutPolicy = + let strategy = + match config.Strategy with + | Timeout.Optimistic -> TimeoutStrategy.Optimistic + | Timeout.Pessimistic -> TimeoutStrategy.Pessimistic + Policy + .TimeoutAsync(config.Timeout, strategy, onTimeout) + let execute cancellationToken (asyncWorkload: (CancellationToken -> Task<'t>)) = + timeoutPolicy.ExecuteAsync(asyncWorkload, cancellationToken) + execute + let fallbackWithOptions<'a,'b> (value:'a) (props:(Fallback.FallbackConfig<'a,'b> -> Fallback.FallbackConfig<'a,'b>) seq) = let defaultProps:Fallback.FallbackConfig<'a,'b> = { ShouldFallback = defaultResultComparer @@ -112,9 +149,9 @@ module Policy = | Ok value -> value | Error _ -> raise Fallback.UnsuccessfulFallbackException execute - + let fallbackWith value = fallbackWithOptions value [] - + let fallbackAsyncWithOptions<'a,'b> (value:'a) (props:(Fallback.FallbackConfig<'a,'b> -> Fallback.FallbackConfig<'a,'b>) seq) = let defaultProps:Fallback.FallbackConfig<'a,'b> = { ShouldFallback = defaultResultComparer @@ -139,9 +176,9 @@ module Policy = | Error _ -> raise Fallback.UnsuccessfulFallbackException } execute - + let fallbackAsyncWith value = fallbackAsyncWithOptions value [] - + let circuitBreakerAsync<'a,'b> (props:(CircuitBreaker.CircuitBreakerConfig<'a,'b> -> CircuitBreaker.CircuitBreakerConfig<'a,'b>) seq) = let defaultProps:CircuitBreaker.CircuitBreakerConfig<'a,'b> = { BreakOn = 10 @@ -153,7 +190,7 @@ module Policy = } let config = props |> Seq.fold(fun cfg configFunc -> cfg |> configFunc) defaultProps let breakerPolicy = Policy.HandleResult(fun r -> r |> config.ShouldBreak) - + let breakerPolicy = breakerPolicy.CircuitBreakerAsync( config.BreakOn |> int, @@ -170,7 +207,7 @@ module Policy = match config.CircuitOpenResult with | Some circuitOpenResult -> circuitOpenResult | None -> raise exn - // now I'm using tasks I don't think this is needed, but leaving as a failsafe for now + // now I'm using tasks I don't think this is needed, but leaving as a failsafe for now | :? AggregateException as exn when (exn.InnerException :? BrokenCircuitException) -> return match config.CircuitOpenResult with @@ -179,7 +216,7 @@ module Policy = } (execute, breakerPolicy.Reset, breakerPolicy.Isolate) - + let circuitBreaker<'a,'b> (props:(CircuitBreaker.CircuitBreakerConfig<'a,'b> -> CircuitBreaker.CircuitBreakerConfig<'a,'b>) seq) = let defaultProps:CircuitBreaker.CircuitBreakerConfig<'a,'b> = { BreakOn = 10 @@ -191,7 +228,7 @@ module Policy = } let config = props |> Seq.fold(fun cfg configFunc -> cfg |> configFunc) defaultProps let breakerPolicy = Policy.HandleResult(fun r -> r |> config.ShouldBreak) - + let breakerPolicy = breakerPolicy.CircuitBreaker( config.BreakOn |> int, @@ -207,9 +244,9 @@ module Policy = match config.CircuitOpenResult with | Some circuitOpenResult -> circuitOpenResult | None -> raise exn - + (execute, breakerPolicy.Reset, breakerPolicy.Isolate) - + let retryAsync<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) = let defaultProps:(Retry.RetryConfig<'a,'b>) = { Retry = Retry.Times 10 @@ -218,7 +255,7 @@ module Policy = ShouldRetry = function | Ok _ -> false | Error _ -> true } let config = retryProps |> Seq.fold (fun cfg configFunc -> cfg |> configFunc) defaultProps - + let durationProvider = fun (retryAttempt:int) _ -> TimeSpan.FromMilliseconds(config.BackoffSequenceMs.[min retryAttempt (config.BackoffSequenceMs.Length-1)] |> double) let retryPolicy = Policy.HandleResult(fun r -> r |> config.ShouldRetry) @@ -226,7 +263,7 @@ module Policy = fun (dr:DelegateResult>) ct ctx -> match dr.Result with | Error e -> config.BeforeEachRetry e ct ctx - | Ok _ -> raise Retry.FailedWithOkResultException + | Ok _ -> raise Retry.FailedWithOkResultException let retryPolicy = match config.Retry with | Retry.Times times -> @@ -240,13 +277,13 @@ module Policy = match config.BackoffSequenceMs |> Seq.isEmpty with | true -> retryPolicy.RetryForeverAsync(onRetry=retryHandler) - | false -> + | false -> let wrappedHandler = (fun r i (_:TimeSpan) ctx -> retryHandler r i ctx) retryPolicy.WaitAndRetryForeverAsync(durationProvider,wrappedHandler) let execute asyncWorkload = retryPolicy.ExecuteAsync(fun () -> asyncWorkload()) execute - + let retry<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) = let defaultProps:(Retry.RetryConfig<'a,'b>) = { Retry = Retry.Times 10 @@ -257,7 +294,7 @@ module Policy = let config = retryProps |> Seq.fold (fun cfg configFunc -> configFunc cfg) defaultProps - + let retryPolicy = Policy.HandleResult(fun r -> r |> config.ShouldRetry) let durationProvider = fun retryAttempt _ -> TimeSpan.FromMilliseconds(config.BackoffSequenceMs.[min retryAttempt (config.BackoffSequenceMs.Length-1)] |> double) @@ -279,25 +316,25 @@ module Policy = match config.BackoffSequenceMs |> Seq.isEmpty with | true -> retryPolicy.RetryForever(onRetry=retryHandler) - | false -> + | false -> let wrappedHandler = (fun r i (_:TimeSpan) ctx -> retryHandler r i ctx) retryPolicy.WaitAndRetryForever(durationProvider,wrappedHandler) let execute workload = retryPolicy.Execute(fun () -> workload()) execute - + // we separate out retry forever as this means we can simply return 'a rather than Result<'a,'b> simplifying // usage for the caller let retryForever<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) = let forever (config:Retry.RetryConfig<'a,'b>) = { config with Retry = Retry.Forever } - let resultExecute = retry<'a,'b> ([forever] |> Seq.append retryProps) + let resultExecute = retry<'a,'b> ([forever] |> Seq.append retryProps) let execute workload = let result = workload |> resultExecute match result with | Ok r -> r | Error _ -> raise Retry.RetryForeverFailedException // this should not occur as retrying until ok execute - + let retryForeverAsync<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) = let forever (config:Retry.RetryConfig<'a,'b>) = { config with Retry = Retry.Forever } let resultExecute = retryAsync<'a,'b> ([forever] |> Seq.append retryProps) @@ -310,7 +347,7 @@ module Policy = } executeAsync -let (-|>) leftSide rightSide = +let (-|>) leftSide rightSide = let execute workload = rightSide (fun () -> workload |> leftSide) execute