From 4e6e785dae27748865411a826a4e0400ed765930 Mon Sep 17 00:00:00 2001
From: Peter Bwire
Date: Sat, 13 Dec 2025 00:04:51 +0300
Subject: [PATCH 1/4] Prevent busy-loop retries in queue subscriber and workerpool

- Add exponential backoff after subscriber Receive errors and avoid spinning on ctx-like errors when ctx is still active
- Add exponential backoff before workerpool job resubmission retries
- Add regression test to ensure retries are delayed
---
 queue/subscriber.go            | 68 ++++++++++++++++++++++++----------
 workerpool/manager.go          | 34 +++++++++++++++++
 workerpool/worker_pool_test.go | 55 +++++++++++++++++++++++++++
 3 files changed, 137 insertions(+), 20 deletions(-)

diff --git a/queue/subscriber.go b/queue/subscriber.go
index 48cf0d1d..df257322 100644
--- a/queue/subscriber.go
+++ b/queue/subscriber.go
@@ -227,9 +227,10 @@ func (s *subscriber) listen(ctx context.Context) {
 		WithField("function", "subscription").
 		WithField("url", s.url)
 	logger.Debug("starting to listen for messages")
+
+	var consecutiveReceiveErrors int
 	for {
-		select {
-		case <-ctx.Done():
+		if ctx.Err() != nil {
 			err := s.Stop(ctx)
 			if err != nil {
 				logger.WithError(err).Error("could not stop subscription")
@@ -237,31 +238,58 @@ func (s *subscriber) listen(ctx context.Context) {
 			}
 			logger.Debug("exiting due to canceled context")
 			return
+		}
 
-		default:
-			msg, err := s.Receive(ctx)
-			if err != nil {
-				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-					// Context cancelled or deadline exceeded, loop again to check ctx.Done()
+		msg, err := s.Receive(ctx)
+		if err != nil {
+			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+				// Context cancelled or deadline exceeded, loop again to check ctx.Done()
+				if ctx.Err() != nil {
 					continue
 				}
-				// Other errors from Receive are critical for the listener.
-				logger.WithError(err).Error("could not pull message")
+			}
+
+			// Other errors from Receive are critical for the listener.
+			consecutiveReceiveErrors++
+			logger.WithError(err).Error("could not pull message")
 
-				// Recreate subscription
-				s.recreateSubscription(ctx)
-				continue
+			// Recreate subscription
+			s.recreateSubscription(ctx)
+
+			delay := time.Duration(0)
+			if consecutiveReceiveErrors > 0 {
+				if consecutiveReceiveErrors > 10 {
+					consecutiveReceiveErrors = 10
+				}
+				delay = 100 * time.Millisecond * time.Duration(1<<(consecutiveReceiveErrors-1))
+				if delay > 30*time.Second {
+					delay = 30 * time.Second
+				}
 			}
 
-			// Process the received message. Errors from processing (like job submission failure)
-			// will be logged by processReceivedMessage. If it's a critical submission error,
-			// it will be returned and will stop the whole application.
-			if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
-				// processReceivedMessage already logs details. This error is for critical failures.
-				logger.WithError(procErr).Error("critical error processing message, stopping listener")
-				s.SendStopError(ctx, procErr) // procErr
-				return // Exit listen loop
+			if delay > 0 {
+				timer := time.NewTimer(delay)
+				select {
+				case <-ctx.Done():
+					timer.Stop()
+					continue
+				case <-timer.C:
+				}
 			}
+
+			continue
+		}
+
+		consecutiveReceiveErrors = 0
+
+		// Process the received message. Errors from processing (like job submission failure)
+		// will be logged by processReceivedMessage. If it's a critical submission error,
+		// it will be returned and will stop the whole application.
+		if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
+			// processReceivedMessage already logs details. This error is for critical failures.
+			logger.WithError(procErr).Error("critical error processing message, stopping listener")
+			s.SendStopError(ctx, procErr) // procErr
+			return // Exit listen loop
 		}
 	}
 }
diff --git a/workerpool/manager.go b/workerpool/manager.go
index 9251e368..ad82b045 100644
--- a/workerpool/manager.go
+++ b/workerpool/manager.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/pitabwire/util"
 
@@ -105,6 +106,39 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) f
 
 		// Job can be retried to resolve error
 		log.Warn("Job failed, attempting to retry it")
+
+		attempt := job.Runs()
+		if attempt > 10 {
+			attempt = 10
+		}
+
+		delay := 100 * time.Millisecond * time.Duration(1<<(attempt-1))
+		if delay > 30*time.Second {
+			delay = 30 * time.Second
+		}
+
+		if delay > 0 {
+			go func() {
+				timer := time.NewTimer(delay)
+				defer timer.Stop()
+
+				select {
+				case <-ctx.Done():
+					job.Close()
+					return
+				case <-timer.C:
+				}
+
+				resubmitErr := SubmitJob(ctx, s, job)
+				if resubmitErr != nil {
+					log.WithError(resubmitErr).Error("Failed to resubmit job")
+					_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
+					job.Close()
+				}
+			}()
+			return
+		}
+
 		resubmitErr := SubmitJob(ctx, s, job) // Recursive call to SubmitJob for retry
 		if resubmitErr != nil {
 			log.WithError(resubmitErr).Error("Failed to resubmit job")
diff --git a/workerpool/worker_pool_test.go b/workerpool/worker_pool_test.go
index 50001ba3..7e07769b 100644
--- a/workerpool/worker_pool_test.go
+++ b/workerpool/worker_pool_test.go
@@ -18,6 +18,22 @@ type WorkerPoolTestSuite struct {
 	tests.BaseTestSuite
 }
 
+type testWorkerPool struct{}
+
+func (p *testWorkerPool) Submit(_ context.Context, task func()) error {
+	task()
+	return nil
+}
+
+func (p *testWorkerPool) Shutdown() {}
+
+type testManager struct {
+	pool workerpool.WorkerPool
+}
+
+func (m *testManager) GetPool() (workerpool.WorkerPool, error) { return m.pool, nil }
+func (m *testManager) StopError(_ context.Context, _ error) {}
+
 // TestWorkerPoolSuite runs the worker pool test suite.
 func TestWorkerPoolSuite(t *testing.T) {
 	suite.Run(t, &WorkerPoolTestSuite{})
@@ -72,6 +88,45 @@ func (s *WorkerPoolTestSuite) TestJobImplChannelOperations() {
 	}
 }
 
+func (s *WorkerPoolTestSuite) TestJobRetryUsesBackoff() {
+	ctx, cancel := context.WithTimeout(s.T().Context(), 2*time.Second)
+	defer cancel()
+
+	var mu sync.Mutex
+	var executionTimes []time.Time
+
+	job := workerpool.NewJobWithRetry[any](func(_ context.Context, _ workerpool.JobResultPipe[any]) error {
+		mu.Lock()
+		executionTimes = append(executionTimes, time.Now())
+		mu.Unlock()
+		return errors.New("fail")
+	}, 1)
+
+	mgr := &testManager{pool: &testWorkerPool{}}
+	err := workerpool.SubmitJob(ctx, mgr, job)
+	s.Require().NoError(err)
+
+	mu.Lock()
+	s.Require().Len(executionTimes, 1)
+	mu.Unlock()
+
+	time.Sleep(20 * time.Millisecond)
+	mu.Lock()
+	s.Require().Len(executionTimes, 1)
+	mu.Unlock()
+
+	s.Require().Eventually(func() bool {
+		mu.Lock()
+		defer mu.Unlock()
+		return len(executionTimes) == 2
+	}, 500*time.Millisecond, 10*time.Millisecond)
+
+	mu.Lock()
+	delta := executionTimes[1].Sub(executionTimes[0])
+	mu.Unlock()
+	s.Require().GreaterOrEqual(delta, 80*time.Millisecond)
+}
+
 func (s *WorkerPoolTestSuite) writeIntRangeAsResult(
 	ctx context.Context,
 	t *testing.T,

From 4344b6b3a7742a7814d9f1b59193142e87bbba53 Mon Sep 17 00:00:00 2001
From: Peter Bwire
Date: Sat, 13 Dec 2025 08:20:36 +0300
Subject: [PATCH 2/4] fix: address golangci-lint issues in subscriber and workerpool

---
 queue/subscriber.go   |  70 ++++++++++++++++++++++++----------
 workerpool/manager.go | 111 +++++++++++++++++++++++++++---------------
 2 files changed, 116 insertions(+), 65 deletions(-)

diff --git a/queue/subscriber.go b/queue/subscriber.go
index df257322..daab64c0 100644
--- a/queue/subscriber.go
+++ b/queue/subscriber.go
@@ -18,6 +18,12 @@ import (
 	"github.com/pitabwire/frame/workerpool"
 )
 
+const (
+	subscriberReceiveErrorBackoffBaseDelay          = 100 * time.Millisecond
+	subscriberReceiveErrorBackoffMaxDelay           = 30 * time.Second
+	subscriberReceiveErrorBackoffMaxConsecutiveErrs = 10
+)
+
 type subscriber struct {
 	reference string
 	url       string
@@ -30,6 +36,38 @@ type subscriber struct {
 	workManager workerpool.Manager
 }
 
+func subscriberReceiveErrorBackoffDelay(consecutiveErrors int) time.Duration {
+	if consecutiveErrors <= 0 {
+		return 0
+	}
+
+	attempt := consecutiveErrors
+	if attempt > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
+		attempt = subscriberReceiveErrorBackoffMaxConsecutiveErrs
+	}
+
+	delay := subscriberReceiveErrorBackoffBaseDelay * time.Duration(1<<(attempt-1))
+	if delay > subscriberReceiveErrorBackoffMaxDelay {
+		return subscriberReceiveErrorBackoffMaxDelay
+	}
+
+	return delay
+}
+
+func waitForDelay(ctx context.Context, delay time.Duration) {
+	if delay <= 0 {
+		return
+	}
+
+	timer := time.NewTimer(delay)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+	case <-timer.C:
+	}
+}
+
 func (s *subscriber) Ref() string {
 	return s.reference
 }
@@ -242,41 +280,23 @@ func (s *subscriber) listen(ctx context.Context) {
 
 		msg, err := s.Receive(ctx)
 		if err != nil {
-			if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+			if ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
 				// Context cancelled or deadline exceeded, loop again to check ctx.Done()
-				if ctx.Err() != nil {
-					continue
-				}
+				continue
 			}
 
 			// Other errors from Receive are critical for the listener.
 			consecutiveReceiveErrors++
+			if consecutiveReceiveErrors > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
+				consecutiveReceiveErrors = subscriberReceiveErrorBackoffMaxConsecutiveErrs
+			}
 			logger.WithError(err).Error("could not pull message")
 
 			// Recreate subscription
 			s.recreateSubscription(ctx)
 
-			delay := time.Duration(0)
-			if consecutiveReceiveErrors > 0 {
-				if consecutiveReceiveErrors > 10 {
-					consecutiveReceiveErrors = 10
-				}
-				delay = 100 * time.Millisecond * time.Duration(1<<(consecutiveReceiveErrors-1))
-				if delay > 30*time.Second {
-					delay = 30 * time.Second
-				}
-			}
-
-			if delay > 0 {
-				timer := time.NewTimer(delay)
-				select {
-				case <-ctx.Done():
-					timer.Stop()
-					continue
-				case <-timer.C:
-				}
-			}
-
+			delay := subscriberReceiveErrorBackoffDelay(consecutiveReceiveErrors)
+			waitForDelay(ctx, delay)
 			continue
 		}
 
diff --git a/workerpool/manager.go b/workerpool/manager.go
index ad82b045..21e43bb6 100644
--- a/workerpool/manager.go
+++ b/workerpool/manager.go
@@ -11,6 +11,74 @@ import (
 	"github.com/pitabwire/frame/config"
 )
 
+const (
+	jobRetryBackoffBaseDelay    = 100 * time.Millisecond
+	jobRetryBackoffMaxDelay     = 30 * time.Second
+	jobRetryBackoffMaxRunNumber = 10
+)
+
+func shouldCloseJob(executionErr error) bool {
+	return executionErr == nil || errors.Is(executionErr, context.Canceled) ||
+		errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed)
+}
+
+func jobRetryBackoffDelay(run int) time.Duration {
+	if run < 1 {
+		run = 1
+	}
+
+	if run > jobRetryBackoffMaxRunNumber {
+		run = jobRetryBackoffMaxRunNumber
+	}
+
+	delay := jobRetryBackoffBaseDelay * time.Duration(1<<(run-1))
+	if delay > jobRetryBackoffMaxDelay {
+		return jobRetryBackoffMaxDelay
+	}
+
+	return delay
+}
+
+func handleResubmitError[T any](
+	ctx context.Context,
+	job Job[T],
+	log *util.LogEntry,
+	executionErr error,
+	resubmitErr error,
+) {
+	if resubmitErr == nil {
+		return
+	}
+
+	log.WithError(resubmitErr).Error("Failed to resubmit job")
+	_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
+	job.Close()
+}
+
+func scheduleRetryResubmission[T any](
+	ctx context.Context,
+	s Manager,
+	job Job[T],
+	delay time.Duration,
+	log *util.LogEntry,
+	executionErr error,
+) {
+	go func() {
+		timer := time.NewTimer(delay)
+		defer timer.Stop()
+
+		select {
+		case <-ctx.Done():
+			job.Close()
+			return
+		case <-timer.C:
+		}
+
+		resubmitErr := SubmitJob(ctx, s, job)
+		handleResubmitError(ctx, job, log, executionErr, resubmitErr)
+	}()
+}
+
 type manager struct {
 	pool    WorkerPool
 	stopErr func(ctx context.Context, err error)
@@ -89,8 +157,7 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) f
 		executionErr := job.F()(ctx, job)
 
 		// Handle successful execution first and return early
-		if executionErr == nil || errors.Is(executionErr, context.Canceled) ||
-			errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed) {
+		if shouldCloseJob(executionErr) {
 			job.Close()
 			return
 		}
@@ -107,43 +174,7 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) f
 		// Job can be retried to resolve error
 		log.Warn("Job failed, attempting to retry it")
 
-		attempt := job.Runs()
-		if attempt > 10 {
-			attempt = 10
-		}
-
-		delay := 100 * time.Millisecond * time.Duration(1<<(attempt-1))
-		if delay > 30*time.Second {
-			delay = 30 * time.Second
-		}
-
-		if delay > 0 {
-			go func() {
-				timer := time.NewTimer(delay)
-				defer timer.Stop()
-
-				select {
-				case <-ctx.Done():
-					job.Close()
-					return
-				case <-timer.C:
-				}
-
-				resubmitErr := SubmitJob(ctx, s, job)
-				if resubmitErr != nil {
-					log.WithError(resubmitErr).Error("Failed to resubmit job")
-					_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
-					job.Close()
-				}
-			}()
-			return
-		}
-
-		resubmitErr := SubmitJob(ctx, s, job) // Recursive call to SubmitJob for retry
-		if resubmitErr != nil {
-			log.WithError(resubmitErr).Error("Failed to resubmit job")
-			_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
-			job.Close()
-		}
+		delay := jobRetryBackoffDelay(job.Runs())
+		scheduleRetryResubmission(ctx, s, job, delay, log, executionErr)
 	}
 }

From 86851b53ef14013a223a034f67c08fdb06993798 Mon Sep 17 00:00:00 2001
From: Peter Bwire
Date: Sat, 13 Dec 2025 08:23:46 +0300
Subject: [PATCH 3/4] resolve minor lint issue

---
 localization/interceptors/{http => httpi}/language.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
 rename localization/interceptors/{http => httpi}/language.go (97%)

diff --git a/localization/interceptors/http/language.go b/localization/interceptors/httpi/language.go
similarity index 97%
rename from localization/interceptors/http/language.go
rename to localization/interceptors/httpi/language.go
index 30d78dc0..f39b2e46 100644
--- a/localization/interceptors/http/language.go
+++ b/localization/interceptors/httpi/language.go
@@ -1,4 +1,4 @@
-package http
+package httpi
 
 import (
 	"net/http"

From 6035437233231cfd60c355e9c78ec38f1232abbb Mon Sep 17 00:00:00 2001
From: Peter Bwire
Date: Sat, 13 Dec 2025 08:27:46 +0300
Subject: [PATCH 4/4] further fixes for localization

---
 localization/localization_test.go | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/localization/localization_test.go b/localization/localization_test.go
index 8f41bd3a..656fd2e7 100644
--- a/localization/localization_test.go
+++ b/localization/localization_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/pitabwire/frame"
 	"github.com/pitabwire/frame/localization"
 	lgrpc "github.com/pitabwire/frame/localization/interceptors/grpc"
-	lhttp "github.com/pitabwire/frame/localization/interceptors/http"
+	lhttp "github.com/pitabwire/frame/localization/interceptors/httpi"
 	"github.com/pitabwire/frame/tests"
 )
 
@@ -255,7 +255,7 @@ func (s *LocalizationTestSuite) TestLanguageHTTPMiddleware() {
 	middleware := lhttp.LanguageHTTPMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 		lang := localization.FromContext(r.Context())
 		w.WriteHeader(http.StatusOK)
-		w.Write([]byte(strings.Join(lang, ",")))
+		_, _ = w.Write([]byte(strings.Join(lang, ",")))
 	}))

 	req := httptest.NewRequest(http.MethodGet, tc.requestPath, nil)
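
The backoff schedule these patches introduce is easiest to see in isolation. The following standalone Go sketch is an editorial illustration only, not part of the patches, and uses hypothetical names rather than the repository's helpers; it assumes the same parameters as patches 1 and 2 (100ms base delay doubled per attempt, attempt clamped at 10, delay capped at 30s).

package main

import (
	"fmt"
	"time"
)

const (
	baseDelay  = 100 * time.Millisecond
	maxDelay   = 30 * time.Second
	maxAttempt = 10
)

// backoffDelay returns baseDelay * 2^(attempt-1), clamping the attempt to
// maxAttempt and capping the result at maxDelay, mirroring the patched helpers.
func backoffDelay(attempt int) time.Duration {
	if attempt < 1 {
		attempt = 1
	}
	if attempt > maxAttempt {
		attempt = maxAttempt
	}
	delay := baseDelay * time.Duration(1<<(attempt-1))
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	// Prints the delay ladder: 100ms, 200ms, 400ms, ... 25.6s, then 30s (cap).
	for attempt := 1; attempt <= 12; attempt++ {
		fmt.Printf("attempt %2d -> %v\n", attempt, backoffDelay(attempt))
	}
}

Running the sketch shows where the cap takes over: attempt 9 yields 25.6s, and attempt 10 onward stays at the 30s ceiling, which is the behaviour the regression test relies on when it asserts a delayed second execution.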