diff --git a/localization/interceptors/http/language.go b/localization/interceptors/httpi/language.go
similarity index 97%
rename from localization/interceptors/http/language.go
rename to localization/interceptors/httpi/language.go
index 30d78dc0..f39b2e46 100644
--- a/localization/interceptors/http/language.go
+++ b/localization/interceptors/httpi/language.go
@@ -1,4 +1,4 @@
-package http
+package httpi
 
 import (
 	"net/http"
diff --git a/localization/localization_test.go b/localization/localization_test.go
index 8f41bd3a..656fd2e7 100644
--- a/localization/localization_test.go
+++ b/localization/localization_test.go
@@ -14,7 +14,7 @@ import (
 	"github.com/pitabwire/frame"
 	"github.com/pitabwire/frame/localization"
 	lgrpc "github.com/pitabwire/frame/localization/interceptors/grpc"
-	lhttp "github.com/pitabwire/frame/localization/interceptors/http"
+	lhttp "github.com/pitabwire/frame/localization/interceptors/httpi"
 
 	"github.com/pitabwire/frame/tests"
 )
@@ -255,7 +255,7 @@ func (s *LocalizationTestSuite) TestLanguageHTTPMiddleware() {
 		middleware := lhttp.LanguageHTTPMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
 			lang := localization.FromContext(r.Context())
 			w.WriteHeader(http.StatusOK)
-			w.Write([]byte(strings.Join(lang, ",")))
+			_, _ = w.Write([]byte(strings.Join(lang, ",")))
 		}))
 
 		req := httptest.NewRequest(http.MethodGet, tc.requestPath, nil)
diff --git a/queue/subscriber.go b/queue/subscriber.go
index 48cf0d1d..daab64c0 100644
--- a/queue/subscriber.go
+++ b/queue/subscriber.go
@@ -18,6 +18,12 @@ import (
 	"github.com/pitabwire/frame/workerpool"
 )
 
+const (
+	subscriberReceiveErrorBackoffBaseDelay = 100 * time.Millisecond
+	subscriberReceiveErrorBackoffMaxDelay = 30 * time.Second
+	subscriberReceiveErrorBackoffMaxConsecutiveErrs = 10
+)
+
 type subscriber struct {
 	reference string
 	url string
@@ -30,6 +36,38 @@ type subscriber struct {
 	workManager workerpool.Manager
 }
 
+func subscriberReceiveErrorBackoffDelay(consecutiveErrors int) time.Duration {
+	if consecutiveErrors <= 0 {
+		return 0
+	}
+
+	attempt := consecutiveErrors
+	if attempt > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
+		attempt = subscriberReceiveErrorBackoffMaxConsecutiveErrs
+	}
+
+	delay := subscriberReceiveErrorBackoffBaseDelay * time.Duration(1<<(attempt-1))
+	if delay > subscriberReceiveErrorBackoffMaxDelay {
+		return subscriberReceiveErrorBackoffMaxDelay
+	}
+
+	return delay
+}
+
+func waitForDelay(ctx context.Context, delay time.Duration) {
+	if delay <= 0 {
+		return
+	}
+
+	timer := time.NewTimer(delay)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+	case <-timer.C:
+	}
+}
+
 func (s *subscriber) Ref() string {
 	return s.reference
 }
@@ -227,9 +265,10 @@
 		WithField("function", "subscription").
 		WithField("url", s.url)
 	logger.Debug("starting to listen for messages")
+
+	var consecutiveReceiveErrors int
 	for {
-		select {
-		case <-ctx.Done():
+		if ctx.Err() != nil {
 			err := s.Stop(ctx)
 			if err != nil {
 				logger.WithError(err).Error("could not stop subscription")
 			}
@@ -237,31 +276,40 @@
 			logger.Debug("exiting due to canceled context")
 			return
+		}
 
-		default:
-			msg, err := s.Receive(ctx)
-			if err != nil {
-				if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
-					// Context cancelled or deadline exceeded, loop again to check ctx.Done()
-					continue
-				}
-				// Other errors from Receive are critical for the listener.
-				logger.WithError(err).Error("could not pull message")
-
-				// Recreate subscription
-				s.recreateSubscription(ctx)
+		msg, err := s.Receive(ctx)
+		if err != nil {
+			if ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
+				// Context cancelled or deadline exceeded, loop again to check ctx.Done()
 				continue
 			}
-			// Process the received message. Errors from processing (like job submission failure)
-			// will be logged by processReceivedMessage. If it's a critical submission error,
-			// it will be returned and will stop the whole application.
-			if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
-				// processReceivedMessage already logs details. This error is for critical failures.
-				logger.WithError(procErr).Error("critical error processing message, stopping listener")
-				s.SendStopError(ctx, procErr) // procErr
-				return // Exit listen loop
+			// Other errors from Receive are critical for the listener.
+			consecutiveReceiveErrors++
+			if consecutiveReceiveErrors > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
+				consecutiveReceiveErrors = subscriberReceiveErrorBackoffMaxConsecutiveErrs
 			}
+			logger.WithError(err).Error("could not pull message")
+
+			// Recreate subscription
+			s.recreateSubscription(ctx)
+
+			delay := subscriberReceiveErrorBackoffDelay(consecutiveReceiveErrors)
+			waitForDelay(ctx, delay)
+			continue
+		}
+
+		consecutiveReceiveErrors = 0
+
+		// Process the received message. Errors from processing (like job submission failure)
+		// will be logged by processReceivedMessage. If it's a critical submission error,
+		// it will be returned and will stop the whole application.
+		if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
+			// processReceivedMessage already logs details. This error is for critical failures.
+			logger.WithError(procErr).Error("critical error processing message, stopping listener")
+			s.SendStopError(ctx, procErr) // procErr
+			return // Exit listen loop
 		}
 	}
 }
diff --git a/workerpool/manager.go b/workerpool/manager.go
index 9251e368..21e43bb6 100644
--- a/workerpool/manager.go
+++ b/workerpool/manager.go
@@ -4,12 +4,81 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/pitabwire/util"
 
 	"github.com/pitabwire/frame/config"
 )
 
+const (
+	jobRetryBackoffBaseDelay = 100 * time.Millisecond
+	jobRetryBackoffMaxDelay = 30 * time.Second
+	jobRetryBackoffMaxRunNumber = 10
+)
+
+func shouldCloseJob(executionErr error) bool {
+	return executionErr == nil || errors.Is(executionErr, context.Canceled) ||
+		errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed)
+}
+
+func jobRetryBackoffDelay(run int) time.Duration {
+	if run < 1 {
+		run = 1
+	}
+
+	if run > jobRetryBackoffMaxRunNumber {
+		run = jobRetryBackoffMaxRunNumber
+	}
+
+	delay := jobRetryBackoffBaseDelay * time.Duration(1<<(run-1))
+	if delay > jobRetryBackoffMaxDelay {
+		return jobRetryBackoffMaxDelay
+	}
+
+	return delay
+}
+
+func handleResubmitError[T any](
+	ctx context.Context,
+	job Job[T],
+	log *util.LogEntry,
+	executionErr error,
+	resubmitErr error,
+) {
+	if resubmitErr == nil {
+		return
+	}
+
+	log.WithError(resubmitErr).Error("Failed to resubmit job")
+	_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
+	job.Close()
+}
+
+func scheduleRetryResubmission[T any](
+	ctx context.Context,
+	s Manager,
+	job Job[T],
+	delay time.Duration,
+	log *util.LogEntry,
+	executionErr error,
+) {
+	go func() {
+		timer := time.NewTimer(delay)
+		defer timer.Stop()
+
+		select {
+		case <-ctx.Done():
+			job.Close()
+			return
+		case <-timer.C:
+		}
+
+		resubmitErr := SubmitJob(ctx, s, job)
+		handleResubmitError(ctx, job, log, executionErr, resubmitErr)
+	}()
+}
+
 type manager struct {
 	pool WorkerPool
 	stopErr func(ctx context.Context, err error)
@@ -88,8 +157,7 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) func() {
 		executionErr := job.F()(ctx, job)
 
 		// Handle successful execution first and return early
-		if executionErr == nil || errors.Is(executionErr, context.Canceled) ||
-			errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed) {
+		if shouldCloseJob(executionErr) {
 			job.Close()
 			return
 		}
@@ -105,11 +173,8 @@
 
 		// Job can be retried to resolve error
 		log.Warn("Job failed, attempting to retry it")
-		resubmitErr := SubmitJob(ctx, s, job) // Recursive call to SubmitJob for retry
-		if resubmitErr != nil {
-			log.WithError(resubmitErr).Error("Failed to resubmit job")
-			_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
-			job.Close()
-		}
+
+		delay := jobRetryBackoffDelay(job.Runs())
+		scheduleRetryResubmission(ctx, s, job, delay, log, executionErr)
 	}
 }
diff --git a/workerpool/worker_pool_test.go b/workerpool/worker_pool_test.go
index 50001ba3..7e07769b 100644
--- a/workerpool/worker_pool_test.go
+++ b/workerpool/worker_pool_test.go
@@ -18,6 +18,22 @@ type WorkerPoolTestSuite struct {
 	tests.BaseTestSuite
 }
 
+type testWorkerPool struct{}
+
+func (p *testWorkerPool) Submit(_ context.Context, task func()) error {
+	task()
+	return nil
+}
+
+func (p *testWorkerPool) Shutdown() {}
+
+type testManager struct {
+	pool workerpool.WorkerPool
+}
+
+func (m *testManager) GetPool() (workerpool.WorkerPool, error) { return m.pool, nil }
+func (m *testManager) StopError(_ context.Context, _ error) {}
+
 // TestWorkerPoolSuite runs the worker pool test suite.
 func TestWorkerPoolSuite(t *testing.T) {
 	suite.Run(t, &WorkerPoolTestSuite{})
 }
@@ -72,6 +88,45 @@ func (s *WorkerPoolTestSuite) TestJobImplChannelOperations() {
 	}
 }
 
+func (s *WorkerPoolTestSuite) TestJobRetryUsesBackoff() {
+	ctx, cancel := context.WithTimeout(s.T().Context(), 2*time.Second)
+	defer cancel()
+
+	var mu sync.Mutex
+	var executionTimes []time.Time
+
+	job := workerpool.NewJobWithRetry[any](func(_ context.Context, _ workerpool.JobResultPipe[any]) error {
+		mu.Lock()
+		executionTimes = append(executionTimes, time.Now())
+		mu.Unlock()
+		return errors.New("fail")
+	}, 1)
+
+	mgr := &testManager{pool: &testWorkerPool{}}
+	err := workerpool.SubmitJob(ctx, mgr, job)
+	s.Require().NoError(err)
+
+	mu.Lock()
+	s.Require().Len(executionTimes, 1)
+	mu.Unlock()
+
+	time.Sleep(20 * time.Millisecond)
+	mu.Lock()
+	s.Require().Len(executionTimes, 1)
+	mu.Unlock()
+
+	s.Require().Eventually(func() bool {
+		mu.Lock()
+		defer mu.Unlock()
+		return len(executionTimes) == 2
+	}, 500*time.Millisecond, 10*time.Millisecond)
+
+	mu.Lock()
+	delta := executionTimes[1].Sub(executionTimes[0])
+	mu.Unlock()
+	s.Require().GreaterOrEqual(delta, 80*time.Millisecond)
+}
+
 func (s *WorkerPoolTestSuite) writeIntRangeAsResult(
 	ctx context.Context,
 	t *testing.T,