From 48bf317ccd57ca40eaecbaaa78f9618463ee9588 Mon Sep 17 00:00:00 2001 From: Mark Sagi-Kazar Date: Wed, 17 Sep 2025 18:33:05 +0200 Subject: [PATCH] Upgrade backoff library to v5 Signed-off-by: Mark Sagi-Kazar --- backend/client.go | 4 +--- backend/executor.go | 5 +---- backend/worker.go | 5 +---- client/client_grpc.go | 30 +++++++++++++----------------- client/worker_grpc.go | 16 +++++++--------- go.mod | 2 +- go.sum | 4 ++-- 7 files changed, 26 insertions(+), 40 deletions(-) diff --git a/backend/client.go b/backend/client.go index 53440236..64c1cd06 100644 --- a/backend/client.go +++ b/backend/client.go @@ -5,7 +5,7 @@ import ( "fmt" "time" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "github.com/google/uuid" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/trace" @@ -104,8 +104,6 @@ func (c *backendClient) waitForOrchestrationCondition(ctx context.Context, id ap MaxInterval: 10 * time.Second, Multiplier: 1.5, RandomizationFactor: 0.05, - Stop: backoff.Stop, - Clock: backoff.SystemClock, } b.Reset() diff --git a/backend/executor.go b/backend/executor.go index feef60d7..66469f9b 100644 --- a/backend/executor.go +++ b/backend/executor.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/metadata" @@ -477,10 +477,7 @@ func (g *grpcExecutor) waitForInstance(ctx context.Context, req *protos.GetInsta MaxInterval: 3 * time.Second, Multiplier: 1.5, RandomizationFactor: 0.5, - Stop: backoff.Stop, - Clock: backoff.SystemClock, } - b = backoff.WithContext(b, ctx) b.Reset() loop: diff --git a/backend/worker.go b/backend/worker.go index 640e7bc3..c9255c7a 100644 --- a/backend/worker.go +++ b/backend/worker.go @@ -7,7 +7,7 @@ import ( "sync/atomic" "time" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "github.com/marusama/semaphore/v2" ) @@ -99,10 +99,7 @@ func (w *worker) Start(ctx context.Context) { MaxInterval: 5 * time.Second, Multiplier: 1.05, RandomizationFactor: 0.05, - Stop: backoff.Stop, - Clock: backoff.SystemClock, } - b = backoff.WithContext(b, ctx) b.Reset() loop: diff --git a/client/client_grpc.go b/client/client_grpc.go index bf1ef199..f3e33ade 100644 --- a/client/client_grpc.go +++ b/client/client_grpc.go @@ -4,7 +4,7 @@ import ( "context" "fmt" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/protobuf/types/known/wrapperspb" @@ -70,20 +70,18 @@ func (c *TaskHubGrpcClient) FetchOrchestrationMetadata(ctx context.Context, id a // // api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist. func (c *TaskHubGrpcClient) WaitForOrchestrationStart(ctx context.Context, id api.InstanceID, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) { - var resp *protos.GetInstanceResponse - var err error - err = backoff.Retry(func() error { + resp, err := backoff.Retry(ctx, func() (*protos.GetInstanceResponse, error) { req := makeGetInstanceRequest(id, opts) - resp, err = c.client.WaitForInstanceStart(ctx, req) + resp, err := c.client.WaitForInstanceStart(ctx, req) if err != nil { // if its context cancelled stop retrying if ctx.Err() != nil { - return backoff.Permanent(ctx.Err()) + return nil, backoff.Permanent(ctx.Err()) } - return fmt.Errorf("failed to wait for orchestration start: %w", err) + return nil, fmt.Errorf("failed to wait for orchestration start: %w", err) } - return nil - }, backoff.WithContext(newInfiniteRetries(), ctx)) + return resp, nil + }, backoff.WithBackOff(newInfiniteRetries())) if err != nil { return nil, err } @@ -95,20 +93,18 @@ func (c *TaskHubGrpcClient) WaitForOrchestrationStart(ctx context.Context, id ap // // api.ErrInstanceNotFound is returned when the specified orchestration doesn't exist. func (c *TaskHubGrpcClient) WaitForOrchestrationCompletion(ctx context.Context, id api.InstanceID, opts ...api.FetchOrchestrationMetadataOptions) (*api.OrchestrationMetadata, error) { - var resp *protos.GetInstanceResponse - var err error - err = backoff.Retry(func() error { + resp, err := backoff.Retry(ctx, func() (*protos.GetInstanceResponse, error) { req := makeGetInstanceRequest(id, opts) - resp, err = c.client.WaitForInstanceCompletion(ctx, req) + resp, err := c.client.WaitForInstanceCompletion(ctx, req) if err != nil { // if its context cancelled stop retrying if ctx.Err() != nil { - return backoff.Permanent(ctx.Err()) + return nil, backoff.Permanent(ctx.Err()) } - return fmt.Errorf("failed to wait for orchestration completion: %w", err) + return nil, fmt.Errorf("failed to wait for orchestration completion: %w", err) } - return nil - }, backoff.WithContext(newInfiniteRetries(), ctx)) + return resp, nil + }, backoff.WithBackOff(newInfiniteRetries())) if err != nil { return nil, err } diff --git a/client/worker_grpc.go b/client/worker_grpc.go index 69c1bea0..ff512f1e 100644 --- a/client/worker_grpc.go +++ b/client/worker_grpc.go @@ -7,7 +7,7 @@ import ( "io" "time" - "github.com/cenkalti/backoff/v4" + "github.com/cenkalti/backoff/v5" "github.com/microsoft/durabletask-go/api" "github.com/microsoft/durabletask-go/backend" "github.com/microsoft/durabletask-go/internal/helpers" @@ -90,23 +90,23 @@ func (c *TaskHubGrpcClient) StartWorkItemListener(ctx context.Context, r *task.T return } - err = backoff.Retry( - func() error { + _, err = backoff.Retry[any](ctx, + func() (any, error) { // user wants to stop the listener if ctx.Err() != nil { - return backoff.Permanent(ctx.Err()) + return nil, backoff.Permanent(ctx.Err()) } c.logger.Infof("reconnecting work item listener stream") streamErr := initStream() if streamErr != nil { c.logger.Errorf("error initializing work item listener stream %v", streamErr) - return streamErr + return nil, streamErr } - return nil + return nil, nil }, // retry forever since we don't have a way of asynchronously return errors to the user - newInfiniteRetries(), + backoff.WithBackOff(newInfiniteRetries()), ) if err != nil { c.logger.Infof("stopping background processor, unable to reconnect stream: %v", err) @@ -205,8 +205,6 @@ func newInfiniteRetries() *backoff.ExponentialBackOff { b := backoff.NewExponentialBackOff() // max wait of 15 seconds between retries b.MaxInterval = 15 * time.Second - // retry forever - b.MaxElapsedTime = 0 b.Reset() return b } diff --git a/go.mod b/go.mod index 04aa4dde..6e1b0348 100644 --- a/go.mod +++ b/go.mod @@ -3,7 +3,7 @@ module github.com/microsoft/durabletask-go go 1.23.0 require ( - github.com/cenkalti/backoff/v4 v4.1.3 + github.com/cenkalti/backoff/v5 v5.0.3 github.com/golang/protobuf v1.5.3 github.com/google/uuid v1.3.0 github.com/jackc/pgx/v5 v5.7.1 diff --git a/go.sum b/go.sum index 050e23d5..4c704ced 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,5 @@ -github.com/cenkalti/backoff/v4 v4.1.3 h1:cFAlzYUlVYDysBEH2T5hyJZMh3+5+WCBvSnK6Q8UtC4= -github.com/cenkalti/backoff/v4 v4.1.3/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= +github.com/cenkalti/backoff/v5 v5.0.3 h1:ZN+IMa753KfX5hd8vVaMixjnqRZ3y8CuJKRKj1xcsSM= +github.com/cenkalti/backoff/v5 v5.0.3/go.mod h1:rkhZdG3JZukswDf7f0cwqPNk4K0sa+F97BxZthm/crw= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=