@@ -1,4 +1,4 @@
package http
package httpi

import (
"net/http"
4 changes: 2 additions & 2 deletions localization/localization_test.go
@@ -14,7 +14,7 @@ import (
"github.com/pitabwire/frame"
"github.com/pitabwire/frame/localization"
lgrpc "github.com/pitabwire/frame/localization/interceptors/grpc"
lhttp "github.com/pitabwire/frame/localization/interceptors/http"
lhttp "github.com/pitabwire/frame/localization/interceptors/httpi"
"github.com/pitabwire/frame/tests"
)

@@ -255,7 +255,7 @@ func (s *LocalizationTestSuite) TestLanguageHTTPMiddleware() {
middleware := lhttp.LanguageHTTPMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
lang := localization.FromContext(r.Context())
w.WriteHeader(http.StatusOK)
w.Write([]byte(strings.Join(lang, ",")))
_, _ = w.Write([]byte(strings.Join(lang, ",")))
}))

req := httptest.NewRequest(http.MethodGet, tc.requestPath, nil)
92 changes: 70 additions & 22 deletions queue/subscriber.go
@@ -18,6 +18,12 @@ import (
"github.com/pitabwire/frame/workerpool"
)

const (
subscriberReceiveErrorBackoffBaseDelay = 100 * time.Millisecond
subscriberReceiveErrorBackoffMaxDelay = 30 * time.Second
subscriberReceiveErrorBackoffMaxConsecutiveErrs = 10
)

type subscriber struct {
reference string
url string
@@ -30,6 +36,38 @@ type subscriber struct {
workManager workerpool.Manager
}

func subscriberReceiveErrorBackoffDelay(consecutiveErrors int) time.Duration {
if consecutiveErrors <= 0 {
return 0
}

attempt := consecutiveErrors
if attempt > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
attempt = subscriberReceiveErrorBackoffMaxConsecutiveErrs
}

delay := subscriberReceiveErrorBackoffBaseDelay * time.Duration(1<<(attempt-1))
if delay > subscriberReceiveErrorBackoffMaxDelay {
return subscriberReceiveErrorBackoffMaxDelay
}

return delay
}

func waitForDelay(ctx context.Context, delay time.Duration) {
if delay <= 0 {
return
}

timer := time.NewTimer(delay)
defer timer.Stop()

select {
case <-ctx.Done():
case <-timer.C:
}
}

func (s *subscriber) Ref() string {
return s.reference
}
@@ -227,41 +265,51 @@ func (s *subscriber) listen(ctx context.Context) {
WithField("function", "subscription").
WithField("url", s.url)
logger.Debug("starting to listen for messages")

var consecutiveReceiveErrors int
for {
select {
case <-ctx.Done():
if ctx.Err() != nil {
err := s.Stop(ctx)
if err != nil {
logger.WithError(err).Error("could not stop subscription")
return
}
logger.Debug("exiting due to canceled context")
return
}

default:
msg, err := s.Receive(ctx)
if err != nil {
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Context cancelled or deadline exceeded, loop again to check ctx.Done()
continue
}
// Other errors from Receive are critical for the listener.
logger.WithError(err).Error("could not pull message")

// Recreate subscription
s.recreateSubscription(ctx)
msg, err := s.Receive(ctx)
if err != nil {
if ctx.Err() != nil && (errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded)) {
// Context cancelled or deadline exceeded, loop again to check ctx.Done()
continue
}

// Process the received message. Errors from processing (like job submission failure)
// will be logged by processReceivedMessage. If it's a critical submission error,
// it will be returned and will stop the whole application.
if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
// processReceivedMessage already logs details. This error is for critical failures.
logger.WithError(procErr).Error("critical error processing message, stopping listener")
s.SendStopError(ctx, procErr) // procErr
return // Exit listen loop
// Other errors from Receive are critical for the listener.
consecutiveReceiveErrors++
if consecutiveReceiveErrors > subscriberReceiveErrorBackoffMaxConsecutiveErrs {
consecutiveReceiveErrors = subscriberReceiveErrorBackoffMaxConsecutiveErrs
}
logger.WithError(err).Error("could not pull message")

// Recreate subscription
s.recreateSubscription(ctx)

delay := subscriberReceiveErrorBackoffDelay(consecutiveReceiveErrors)
waitForDelay(ctx, delay)
continue
}

consecutiveReceiveErrors = 0

// Process the received message. Errors from processing (like job submission failure)
// will be logged by processReceivedMessage. If it's a critical submission error,
// it will be returned and will stop the whole application.
if procErr := s.processReceivedMessage(ctx, msg); procErr != nil {
// processReceivedMessage already logs details. This error is for critical failures.
logger.WithError(procErr).Error("critical error processing message, stopping listener")
s.SendStopError(ctx, procErr) // procErr
return // Exit listen loop
}
}
}
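The listener above now backs off exponentially between failed Receive calls. Below is a minimal standalone sketch, not part of this diff, that reproduces the schedule of subscriberReceiveErrorBackoffDelay using the constants introduced above; the names backoffDelay, baseDelay, maxDelay and maxConsecutiveErrs are local to the sketch.

```go
// Standalone sketch: reproduces the backoff schedule implemented by
// subscriberReceiveErrorBackoffDelay, to show the delays the listener
// sleeps between failed Receive calls.
package main

import (
	"fmt"
	"time"
)

const (
	baseDelay          = 100 * time.Millisecond // subscriberReceiveErrorBackoffBaseDelay
	maxDelay           = 30 * time.Second       // subscriberReceiveErrorBackoffMaxDelay
	maxConsecutiveErrs = 10                     // subscriberReceiveErrorBackoffMaxConsecutiveErrs
)

func backoffDelay(consecutiveErrors int) time.Duration {
	if consecutiveErrors <= 0 {
		return 0
	}
	attempt := consecutiveErrors
	if attempt > maxConsecutiveErrs {
		attempt = maxConsecutiveErrs
	}
	delay := baseDelay * time.Duration(1<<(attempt-1))
	if delay > maxDelay {
		return maxDelay
	}
	return delay
}

func main() {
	// Delays double from 100ms and are capped at 30s from the 10th error onward.
	for errs := 1; errs <= 12; errs++ {
		fmt.Printf("consecutive errors %2d -> delay %v\n", errs, backoffDelay(errs))
	}
}
```

With a 100ms base and a 30s cap, the printed schedule runs 100ms, 200ms, 400ms, ... 25.6s, then stays at 30s because the error count is clamped at 10.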
81 changes: 73 additions & 8 deletions workerpool/manager.go
@@ -4,12 +4,81 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/pitabwire/util"

"github.com/pitabwire/frame/config"
)

const (
jobRetryBackoffBaseDelay = 100 * time.Millisecond
jobRetryBackoffMaxDelay = 30 * time.Second
jobRetryBackoffMaxRunNumber = 10
)

func shouldCloseJob(executionErr error) bool {
return executionErr == nil || errors.Is(executionErr, context.Canceled) ||
errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed)
}

func jobRetryBackoffDelay(run int) time.Duration {
if run < 1 {
run = 1
}

if run > jobRetryBackoffMaxRunNumber {
run = jobRetryBackoffMaxRunNumber
}

delay := jobRetryBackoffBaseDelay * time.Duration(1<<(run-1))
if delay > jobRetryBackoffMaxDelay {
return jobRetryBackoffMaxDelay
}

return delay
}

func handleResubmitError[T any](
ctx context.Context,
job Job[T],
log *util.LogEntry,
executionErr error,
resubmitErr error,
) {
if resubmitErr == nil {
return
}

log.WithError(resubmitErr).Error("Failed to resubmit job")
_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
job.Close()
}

func scheduleRetryResubmission[T any](
ctx context.Context,
s Manager,
job Job[T],
delay time.Duration,
log *util.LogEntry,
executionErr error,
) {
go func() {
timer := time.NewTimer(delay)
defer timer.Stop()

select {
case <-ctx.Done():
job.Close()
return
case <-timer.C:
}

resubmitErr := SubmitJob(ctx, s, job)
handleResubmitError(ctx, job, log, executionErr, resubmitErr)
}()
}

type manager struct {
pool WorkerPool
stopErr func(ctx context.Context, err error)
@@ -88,8 +157,7 @@ func createJobExecutionTask[T any](ctx context.Context, s Manager, job Job[T]) f
executionErr := job.F()(ctx, job)

// Handle successful execution first and return early
if executionErr == nil || errors.Is(executionErr, context.Canceled) ||
errors.Is(executionErr, ErrWorkerPoolResultChannelIsClosed) {
if shouldCloseJob(executionErr) {
job.Close()
return
}
@@ -105,11 +173,8 @@

// Job can be retried to resolve error
log.Warn("Job failed, attempting to retry it")
resubmitErr := SubmitJob(ctx, s, job) // Recursive call to SubmitJob for retry
if resubmitErr != nil {
log.WithError(resubmitErr).Error("Failed to resubmit job")
_ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
job.Close()
}

delay := jobRetryBackoffDelay(job.Runs())
scheduleRetryResubmission(ctx, s, job, delay, log, executionErr)
}
}
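createJobExecutionTask now hands failed jobs to scheduleRetryResubmission instead of resubmitting inline, so waiting out the backoff never blocks a worker. The following is a minimal, generic sketch of that deferred-retry pattern under standard-library assumptions only; retryAfter and its callbacks are illustrative names, not part of the workerpool API.

```go
// Generic sketch of the deferred-retry pattern: wait out the backoff on a
// timer in a separate goroutine and abandon the retry if the context ends
// before the delay elapses.
package main

import (
	"context"
	"fmt"
	"time"
)

func retryAfter(ctx context.Context, delay time.Duration, resubmit func(context.Context) error, onGiveUp func(error)) {
	go func() {
		timer := time.NewTimer(delay)
		defer timer.Stop()

		select {
		case <-ctx.Done():
			// Context ended before the backoff elapsed: give up instead of retrying.
			onGiveUp(ctx.Err())
			return
		case <-timer.C:
		}

		if err := resubmit(ctx); err != nil {
			onGiveUp(err)
		}
	}()
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()

	retryAfter(ctx, 200*time.Millisecond,
		func(context.Context) error { fmt.Println("resubmitted"); return nil },
		func(err error) { fmt.Println("gave up:", err) },
	)

	time.Sleep(500 * time.Millisecond) // let the deferred retry fire in this demo
}
```

In this demo the resubmit callback fires roughly 200ms after scheduling; if the context had ended first, the give-up callback would run instead, mirroring how scheduleRetryResubmission closes the job on cancellation.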
55 changes: 55 additions & 0 deletions workerpool/worker_pool_test.go
@@ -18,6 +18,22 @@ type WorkerPoolTestSuite struct {
tests.BaseTestSuite
}

type testWorkerPool struct{}

func (p *testWorkerPool) Submit(_ context.Context, task func()) error {
task()
return nil
}

func (p *testWorkerPool) Shutdown() {}

type testManager struct {
pool workerpool.WorkerPool
}

func (m *testManager) GetPool() (workerpool.WorkerPool, error) { return m.pool, nil }
func (m *testManager) StopError(_ context.Context, _ error) {}

// TestWorkerPoolSuite runs the worker pool test suite.
func TestWorkerPoolSuite(t *testing.T) {
suite.Run(t, &WorkerPoolTestSuite{})
@@ -72,6 +88,45 @@ func (s *WorkerPoolTestSuite) TestJobImplChannelOperations() {
}
}

func (s *WorkerPoolTestSuite) TestJobRetryUsesBackoff() {
ctx, cancel := context.WithTimeout(s.T().Context(), 2*time.Second)
defer cancel()

var mu sync.Mutex
var executionTimes []time.Time

job := workerpool.NewJobWithRetry[any](func(_ context.Context, _ workerpool.JobResultPipe[any]) error {
mu.Lock()
executionTimes = append(executionTimes, time.Now())
mu.Unlock()
return errors.New("fail")
}, 1)

mgr := &testManager{pool: &testWorkerPool{}}
err := workerpool.SubmitJob(ctx, mgr, job)
s.Require().NoError(err)

mu.Lock()
s.Require().Len(executionTimes, 1)
mu.Unlock()

time.Sleep(20 * time.Millisecond)
mu.Lock()
s.Require().Len(executionTimes, 1)
mu.Unlock()

s.Require().Eventually(func() bool {
mu.Lock()
defer mu.Unlock()
return len(executionTimes) == 2
}, 500*time.Millisecond, 10*time.Millisecond)

mu.Lock()
delta := executionTimes[1].Sub(executionTimes[0])
mu.Unlock()
s.Require().GreaterOrEqual(delta, 80*time.Millisecond)
}

func (s *WorkerPoolTestSuite) writeIntRangeAsResult(
ctx context.Context,
t *testing.T,