Prevent busy-loop retries in queue subscriber and workerpool #609
Conversation
- Add exponential backoff after subscriber Receive errors and avoid spinning on ctx-like errors when ctx is still active
- Add exponential backoff before workerpool job resubmission retries
- Add a regression test to ensure retries are delayed
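As an illustration of the approach described above (a sketch only, not the PR's actual code: the receiveLoop helper, its signature, the backoff constants, and the use of the standard log package are all assumptions), a context-aware receive loop with exponential backoff could look like this:

package queue

import (
    "context"
    "errors"
    "log"
    "time"
)

const (
    baseBackoffDelay   = 100 * time.Millisecond
    maxBackoffDelay    = 30 * time.Second
    maxBackoffAttempts = 10
)

// receiveLoop keeps calling receive until ctx is cancelled, sleeping with
// exponential backoff after each consecutive non-context error.
func receiveLoop(ctx context.Context, receive func(context.Context) error) {
    failures := 0
    for {
        if ctx.Err() != nil {
            return // shut down once the context is cancelled
        }

        err := receive(ctx)
        if err == nil {
            failures = 0 // a successful receive resets the backoff
            continue
        }

        // Context errors mean shutdown is in progress; loop to re-check ctx.
        if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
            continue
        }

        log.Printf("could not pull message: %v", err)

        // Exponential backoff: 100ms, 200ms, 400ms, ... capped at 30s.
        failures++
        attempt := failures
        if attempt > maxBackoffAttempts {
            attempt = maxBackoffAttempts
        }
        delay := baseBackoffDelay * time.Duration(1<<uint(attempt-1))
        if delay > maxBackoffDelay {
            delay = maxBackoffDelay
        }

        select {
        case <-ctx.Done():
            return
        case <-time.After(delay):
        }
    }
}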
Summary of Changes
Hello @pitabwire, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request addresses a performance and stability issue where both the queue subscriber and the workerpool could enter busy-loop retry scenarios, consuming excessive CPU resources. By introducing context-aware exponential backoff strategies for both message reception errors and job resubmission, the system now gracefully handles transient failures, allowing for recovery without resource exhaustion.
There is a problem with the Gemini CLI PR review. Please check the action logs for details.
Code Review
This pull request introduces exponential backoff to prevent busy-looping on retries in both the queue subscriber and the worker pool, which is a great improvement for CPU usage under failure conditions. The implementation is solid and includes a good regression test. I have a few suggestions to improve maintainability and correctness: one regarding simplifying context cancellation handling, another about removing some dead code, and a final one about refactoring duplicated backoff logic into a shared helper function to adhere to the DRY principle.
queue/subscriber.go (Outdated)
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
    // Context cancelled or deadline exceeded, loop again to check ctx.Done()
    if ctx.Err() != nil {
        continue
    }
    // Other errors from Receive are critical for the listener.
    logger.WithError(err).Error("could not pull message")
}
The logic for handling context cancellation errors can be simplified. If s.Receive(ctx) returns a context.Canceled or context.DeadlineExceeded error, we should unconditionally continue to the top of the loop. The if ctx.Err() != nil check at the beginning of the loop will then correctly handle the shutdown. The current implementation with the nested if ctx.Err() != nil check is unnecessarily complex and could lead to incorrect behavior (like an unwanted backoff) if Receive returns a context error while ctx.Err() is momentarily nil.
if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
// Context cancelled or deadline exceeded, loop again to check ctx.Done()
continue
}
workerpool/manager.go (Outdated)
attempt := job.Runs()
if attempt > 10 {
    attempt = 10
}

delay := 100 * time.Millisecond * time.Duration(1<<uint(attempt-1))
if delay > 30*time.Second {
    delay = 30 * time.Second
}
This exponential backoff logic is nearly identical to the logic in queue/subscriber.go. To improve maintainability and avoid code duplication (DRY principle), consider extracting this into a shared helper function.
Additionally, the hardcoded values 10, 100 * time.Millisecond, and 30 * time.Second should be defined as constants for better readability and easier modification.
A shared utility could look like this:
const (
maxBackoffAttempts = 10
baseBackoffDelay = 100 * time.Millisecond
maxBackoffDelay = 30 * time.Second
)
// CalculateBackoff computes an exponential backoff duration for a given attempt.
func CalculateBackoff(attempt int) time.Duration {
if attempt <= 0 {
return 0
}
if attempt > maxBackoffAttempts {
attempt = maxBackoffAttempts
}
delay := baseBackoffDelay * time.Duration(1<<uint(attempt-1))
if delay > maxBackoffDelay {
delay = maxBackoffDelay
}
return delay
}
This would make the code in both places cleaner and more consistent.
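As a usage illustration (a sketch only; the waitForRetry helper and its placement in the workerpool package are assumptions, while CalculateBackoff is taken from the suggestion above), both call sites could then delegate the actual wait to a small context-aware helper:

package workerpool

import (
    "context"
    "time"
)

// waitForRetry blocks for the backoff corresponding to attempt, returning
// false if the context was cancelled while waiting.
func waitForRetry(ctx context.Context, attempt int) bool {
    delay := CalculateBackoff(attempt)
    if delay <= 0 {
        return ctx.Err() == nil
    }
    timer := time.NewTimer(delay)
    defer timer.Stop()
    select {
    case <-ctx.Done():
        return false
    case <-timer.C:
        return true
    }
}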
workerpool/manager.go (Outdated)
if delay > 0 {
    go func() {
        timer := time.NewTimer(delay)
        defer timer.Stop()

        select {
        case <-ctx.Done():
            job.Close()
            return
        case <-timer.C:
        }

        resubmitErr := SubmitJob(ctx, s, job)
        if resubmitErr != nil {
            log.WithError(resubmitErr).Error("Failed to resubmit job")
            _ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
            job.Close()
        }
    }()
    return
}
The added backoff logic is a good improvement. However, as implemented, the delay will always be greater than 0 because job.Runs() is at least 1. This means the if delay > 0 condition is always true, and the function will always return from within this block. Consequently, the original code for immediate resubmission starting at line 142 is now unreachable (dead code) and should be removed for clarity.
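Following that suggestion, one possible shape for the cleaned-up path (a sketch only, reusing the names from the diff above; not the PR's actual code) is to always wait for the computed backoff and drop the unreachable immediate-resubmission branch:

// Always delay before resubmitting; the `if delay > 0` guard and the
// immediate-resubmission path after it are removed, since delay is never
// zero here.
go func() {
    timer := time.NewTimer(delay)
    defer timer.Stop()

    select {
    case <-ctx.Done():
        job.Close()
        return
    case <-timer.C:
    }

    if resubmitErr := SubmitJob(ctx, s, job); resubmitErr != nil {
        log.WithError(resubmitErr).Error("Failed to resubmit job")
        _ = job.WriteError(ctx, fmt.Errorf("failed to resubmit job: %w", executionErr))
        job.Close()
    }
}()
return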
Summary
- Add exponential backoff after Subscription.Receive errors (including subscription recreation attempts).

Why
Both gocloud.dev/pubsub.Subscription.Receive (on permanent/non-retryable errors) and workerpool retry resubmission could otherwise fail fast in a tight loop and burn CPU.

Tests
go test ./...
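The regression test itself is not shown in this excerpt. A minimal sketch of how such a test could assert delayed retries (hypothetical names, reusing the receiveLoop sketch from earlier; not the PR's actual test) might be:

package queue

import (
    "context"
    "errors"
    "testing"
    "time"
)

// TestReceiveRetriesAreDelayed checks that consecutive failed receives are
// separated by a backoff rather than retried in a tight loop.
func TestReceiveRetriesAreDelayed(t *testing.T) {
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    var attempts []time.Time
    receive := func(ctx context.Context) error {
        attempts = append(attempts, time.Now())
        return errors.New("transient failure")
    }

    receiveLoop(ctx, receive) // hypothetical helper sketched earlier

    if len(attempts) < 2 {
        t.Fatalf("expected at least two attempts, got %d", len(attempts))
    }
    // With a 100ms base backoff the second attempt should not follow the
    // first one immediately; allow generous slack for scheduling.
    if gap := attempts[1].Sub(attempts[0]); gap < 50*time.Millisecond {
        t.Fatalf("retries happened too quickly: %v apart", gap)
    }
}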