Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 71 additions & 34 deletions src/Pholly.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ executers that can be reused so as to avoid repeated reconstruction of policies.
module Pholly

open System
open System.Threading
open System.Threading.Tasks
open FSharp.Control.Tasks.NonAffine
open Polly.CircuitBreaker
Expand All @@ -27,11 +28,11 @@ module Fallback =
OnFallback: (Polly.DelegateResult<Result<'a,'b>> -> Polly.Context -> unit) option
OnFallbackAsync: (Polly.DelegateResult<Result<'a,'b>> -> Polly.Context -> Task<unit>) option
}

let shouldFallback handler config = { config with ShouldFallback = handler }

let whenFallingBack handler config = { config with OnFallback = handler }

let whenFallingBackAsync handler config = { config with OnFallbackAsync = handler }

module CircuitBreaker =
Expand All @@ -47,9 +48,9 @@ module CircuitBreaker =
Reset: unit -> unit
}
*)

[<Measure>] type consecutiveErrors

type CircuitBreakerConfig<'a,'b> =
{ BreakOn: int<consecutiveErrors>
ShouldBreak: Result<'a,'b> -> bool
Expand All @@ -58,7 +59,7 @@ module CircuitBreaker =
OnReset: Polly.Context -> unit
CircuitOpenResult: Result<'a,'b> option
}

let breakOn consecutiveErrors config = { config with BreakOn = consecutiveErrors }
let shouldBreak handler config = { config with ShouldBreak = handler }
let whenCircuitIsOpened handler config = { config with OnBreak = handler }
Expand All @@ -70,31 +71,67 @@ module Retry =
exception RetryForeverFailedException
exception FailedWithOkResultException
[<Measure>] type times

type Retry =
| Forever
| Times of int

type RetryConfig<'a,'b> =
{ Retry : Retry
BackoffSequenceMs : int<ms> list
BeforeEachRetry : 'b->int->Polly.Context->unit
ShouldRetry: Result<'a,'b> -> bool
}
}

let retry retries config = { config with Retry = retries }
let withIntervalOf interval config = { config with BackoffSequenceMs = [interval] }
let withIntervalsOf backoff config = { config with BackoffSequenceMs = backoff }
let beforeEachRetry handler config = { config with BeforeEachRetry = handler }
let shouldRetry handler config = { config with ShouldRetry = handler }
// syntactic sugar for retries
let upto (value:int<times>) = value |> int |> Times

let upto (value:int<times>) = value |> int |> Times

module Timeout =

type Strategy =
| Optimistic
| Pessimistic

type TimeoutConfig =
{ Timeout: TimeSpan
Strategy: Strategy
OnTimeout: (Polly.Context -> TimeSpan -> Task -> Task) option
}

let timeout timespan config = { config with Timeout = timespan }
let withStrategy strategy config = { config with Strategy = strategy }
let onTimeout ontimeout config = { config with OnTimeout = ontimeout }

module Policy =
open Polly

open Polly.Timeout

let defaultResultComparer = function | Ok _ -> false | Error _ -> true


let timeoutAsync (props:(Timeout.TimeoutConfig -> Timeout.TimeoutConfig) seq) =
let defaultProps:Timeout.TimeoutConfig =
{ Timeout = TimeSpan.FromSeconds 30.
Strategy = Timeout.Strategy.Optimistic
OnTimeout = None
}
let config = props |> Seq.fold(fun cfg configFunc -> cfg |> configFunc) defaultProps
let onTimeout = defaultArg config.OnTimeout (fun _ _ _ -> Task.CompletedTask)
let timeoutPolicy =
let strategy =
match config.Strategy with
| Timeout.Optimistic -> TimeoutStrategy.Optimistic
| Timeout.Pessimistic -> TimeoutStrategy.Pessimistic
Policy
.TimeoutAsync(config.Timeout, strategy, onTimeout)
let execute cancellationToken (asyncWorkload: (CancellationToken -> Task<'t>)) =
timeoutPolicy.ExecuteAsync(asyncWorkload, cancellationToken)
execute

let fallbackWithOptions<'a,'b> (value:'a) (props:(Fallback.FallbackConfig<'a,'b> -> Fallback.FallbackConfig<'a,'b>) seq) =
let defaultProps:Fallback.FallbackConfig<'a,'b> =
{ ShouldFallback = defaultResultComparer
Expand All @@ -112,9 +149,9 @@ module Policy =
| Ok value -> value
| Error _ -> raise Fallback.UnsuccessfulFallbackException
execute

let fallbackWith value = fallbackWithOptions value []

let fallbackAsyncWithOptions<'a,'b> (value:'a) (props:(Fallback.FallbackConfig<'a,'b> -> Fallback.FallbackConfig<'a,'b>) seq) =
let defaultProps:Fallback.FallbackConfig<'a,'b> =
{ ShouldFallback = defaultResultComparer
Expand All @@ -139,9 +176,9 @@ module Policy =
| Error _ -> raise Fallback.UnsuccessfulFallbackException
}
execute

let fallbackAsyncWith value = fallbackAsyncWithOptions value []

let circuitBreakerAsync<'a,'b> (props:(CircuitBreaker.CircuitBreakerConfig<'a,'b> -> CircuitBreaker.CircuitBreakerConfig<'a,'b>) seq) =
let defaultProps:CircuitBreaker.CircuitBreakerConfig<'a,'b> =
{ BreakOn = 10<CircuitBreaker.consecutiveErrors>
Expand All @@ -153,7 +190,7 @@ module Policy =
}
let config = props |> Seq.fold(fun cfg configFunc -> cfg |> configFunc) defaultProps
let breakerPolicy = Policy.HandleResult(fun r -> r |> config.ShouldBreak)

let breakerPolicy =
breakerPolicy.CircuitBreakerAsync(
config.BreakOn |> int,
Expand All @@ -170,7 +207,7 @@ module Policy =
match config.CircuitOpenResult with
| Some circuitOpenResult -> circuitOpenResult
| None -> raise exn
// now I'm using tasks I don't think this is needed, but leaving as a failsafe for now
// now I'm using tasks I don't think this is needed, but leaving as a failsafe for now
| :? AggregateException as exn when (exn.InnerException :? BrokenCircuitException) ->
return
match config.CircuitOpenResult with
Expand All @@ -179,7 +216,7 @@ module Policy =
}

(execute, breakerPolicy.Reset, breakerPolicy.Isolate)

let circuitBreaker<'a,'b> (props:(CircuitBreaker.CircuitBreakerConfig<'a,'b> -> CircuitBreaker.CircuitBreakerConfig<'a,'b>) seq) =
let defaultProps:CircuitBreaker.CircuitBreakerConfig<'a,'b> =
{ BreakOn = 10<CircuitBreaker.consecutiveErrors>
Expand All @@ -191,7 +228,7 @@ module Policy =
}
let config = props |> Seq.fold(fun cfg configFunc -> cfg |> configFunc) defaultProps
let breakerPolicy = Policy.HandleResult(fun r -> r |> config.ShouldBreak)

let breakerPolicy =
breakerPolicy.CircuitBreaker(
config.BreakOn |> int,
Expand All @@ -207,9 +244,9 @@ module Policy =
match config.CircuitOpenResult with
| Some circuitOpenResult -> circuitOpenResult
| None -> raise exn

(execute, breakerPolicy.Reset, breakerPolicy.Isolate)

let retryAsync<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) =
let defaultProps:(Retry.RetryConfig<'a,'b>) =
{ Retry = Retry.Times 10
Expand All @@ -218,15 +255,15 @@ module Policy =
ShouldRetry = function | Ok _ -> false | Error _ -> true
}
let config = retryProps |> Seq.fold (fun cfg configFunc -> cfg |> configFunc) defaultProps

let durationProvider =
fun (retryAttempt:int) _ -> TimeSpan.FromMilliseconds(config.BackoffSequenceMs.[min retryAttempt (config.BackoffSequenceMs.Length-1)] |> double)
let retryPolicy = Policy.HandleResult(fun r -> r |> config.ShouldRetry)
let retryHandler =
fun (dr:DelegateResult<Result<'a,'b>>) ct ctx ->
match dr.Result with
| Error e -> config.BeforeEachRetry e ct ctx
| Ok _ -> raise Retry.FailedWithOkResultException
| Ok _ -> raise Retry.FailedWithOkResultException
let retryPolicy =
match config.Retry with
| Retry.Times times ->
Expand All @@ -240,13 +277,13 @@ module Policy =
match config.BackoffSequenceMs |> Seq.isEmpty with
| true ->
retryPolicy.RetryForeverAsync(onRetry=retryHandler)
| false ->
| false ->
let wrappedHandler = (fun r i (_:TimeSpan) ctx -> retryHandler r i ctx)
retryPolicy.WaitAndRetryForeverAsync(durationProvider,wrappedHandler)
let execute asyncWorkload =
retryPolicy.ExecuteAsync(fun () -> asyncWorkload())
execute

let retry<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) =
let defaultProps:(Retry.RetryConfig<'a,'b>) =
{ Retry = Retry.Times 10
Expand All @@ -257,7 +294,7 @@ module Policy =
let config =
retryProps
|> Seq.fold (fun cfg configFunc -> configFunc cfg) defaultProps

let retryPolicy = Policy.HandleResult(fun r -> r |> config.ShouldRetry)
let durationProvider =
fun retryAttempt _ -> TimeSpan.FromMilliseconds(config.BackoffSequenceMs.[min retryAttempt (config.BackoffSequenceMs.Length-1)] |> double)
Expand All @@ -279,25 +316,25 @@ module Policy =
match config.BackoffSequenceMs |> Seq.isEmpty with
| true ->
retryPolicy.RetryForever(onRetry=retryHandler)
| false ->
| false ->
let wrappedHandler = (fun r i (_:TimeSpan) ctx -> retryHandler r i ctx)
retryPolicy.WaitAndRetryForever(durationProvider,wrappedHandler)
let execute workload =
retryPolicy.Execute(fun () -> workload())
execute

// we separate out retry forever as this means we can simply return 'a rather than Result<'a,'b> simplifying
// usage for the caller
let retryForever<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) =
let forever (config:Retry.RetryConfig<'a,'b>) = { config with Retry = Retry.Forever }
let resultExecute = retry<'a,'b> ([forever] |> Seq.append retryProps)
let resultExecute = retry<'a,'b> ([forever] |> Seq.append retryProps)
let execute workload =
let result = workload |> resultExecute
match result with
| Ok r -> r
| Error _ -> raise Retry.RetryForeverFailedException // this should not occur as retrying until ok
execute

let retryForeverAsync<'a,'b> (retryProps:(Retry.RetryConfig<'a,'b> -> Retry.RetryConfig<'a,'b>) seq) =
let forever (config:Retry.RetryConfig<'a,'b>) = { config with Retry = Retry.Forever }
let resultExecute = retryAsync<'a,'b> ([forever] |> Seq.append retryProps)
Expand All @@ -310,7 +347,7 @@ module Policy =
}
executeAsync

let (-|>) leftSide rightSide =
let (-|>) leftSide rightSide =
let execute workload =
rightSide (fun () -> workload |> leftSide)
execute
Expand Down