Fix auto-recovery when reload table is missing during release #273
base: master
Conversation
When a topic reaches the "Realtime" state but the reload table doesn't exist in Redshift (e.g., due to no data flow or loader issues), the release would fail with a "relation does not exist" error and the topic would be stuck in the Realtime state indefinitely.

This fix adds:

- A pre-release check for reload table existence in release.go
- A new moveTopicToReloading() method in status.go that:
  - Moves the topic from the realtime set back to the reloading state
  - Clears the cached loader offset to prevent stale data
  - Updates the mask status phase to Reloading

Now when a release fails due to a missing table, the topic automatically moves back to the Reloading state, allowing the reload sink group to reprocess it and create the table.

Co-authored-by: Cursor <cursoragent@cursor.com>
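For context, a minimal sketch of what a method like moveTopicToReloading() could look like, based only on the description above; the struct fields and loop logic here are illustrative assumptions, not the repository's actual code:

```go
// Illustrative sketch only; field names are assumptions.
type status struct {
	realtime  []string
	reloading []string
}

// moveTopicToReloading moves a topic from the realtime set back to the
// reloading set so the reload sink group can reprocess it.
func (s *status) moveTopicToReloading(topic string) {
	// Drop the topic from the realtime set.
	filtered := make([]string, 0, len(s.realtime))
	for _, t := range s.realtime {
		if t != topic {
			filtered = append(filtered, t)
		}
	}
	s.realtime = filtered

	// Add the topic to the reloading set if it is not already there.
	for _, t := range s.reloading {
		if t == topic {
			return
		}
	}
	s.reloading = append(s.reloading, topic)

	// The real method also clears the cached loader offset and sets the
	// mask status phase to Reloading; both are omitted in this sketch.
}
```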
Walkthrough

Two controller methods were enhanced to improve topic state management. First, a pre-check was added to validate reload table existence before proceeding with release operations. Second, a new helper method was introduced to facilitate moving topics between the realtime and reloading state sets, with associated cache and status updates.
Estimated code review effort: 🎯 2 (Simple) | ⏱️ ~10 minutes

Pre-merge checks: ✅ 3 passed
Actionable comments posted: 1
🧹 Nitpick comments (1)

controllers/status.go (1)

349-372: Consider sorting s.reloading for consistency with other status methods.

Methods like computeReloading() and computeReloadingDupe() call sortStringSlice() after modifying the reloading slices. While the status object is rebuilt on each reconciliation cycle (which will sort the slice), adding explicit sorting here aligns with the established pattern in the codebase.

🔧 Optional: Add sorting for consistency

```diff
 // Add to reloading
 s.reloading = appendIfMissing(s.reloading, topic)
+sortStringSlice(s.reloading)
```

Status changes from this method are automatically persisted via the deferred patching mechanism in the Reconcile function, so no action is needed regarding persistence.
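For reference, hedged sketches of the two helpers the nitpick mentions; the real implementations live in the controllers package and may differ:

```go
import "sort"

// appendIfMissing adds s to slice only when it is not already present.
func appendIfMissing(slice []string, s string) []string {
	for _, v := range slice {
		if v == s {
			return slice
		}
	}
	return append(slice, s)
}

// sortStringSlice sorts the slice in place so status comparisons are stable.
func sortStringSlice(s []string) {
	sort.Strings(s)
}
```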
```go
// Check if reload table exists before attempting release
// This prevents the "relation does not exist" error and allows recovery
reloadTableExist, err := r.redshifter.TableExist(ctx, schema, reloadedTable)
if err != nil {
	return fmt.Errorf("error checking reload table existence: %v", err)
}
if !reloadTableExist {
	klog.Warningf(
		"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
		r.rsk.Name, schema, reloadedTable, topic,
	)
	// Move topic back to reloading state so it can be reprocessed
	status.moveTopicToReloading(topic)
	return fmt.Errorf(
		"reload table %s.%s does not exist, topic %s moved back to reloading state",
		schema, reloadedTable, topic,
	)
}
```
🧩 Analysis chain

🏁 Scripts executed (repository: practo/tipoca-stream):

```sh
# First, let's examine the release() method structure around the problematic area
cat -n controllers/release.go | sed -n '70,200p'

# Also check for the releaseTopic method and how it handles errors from release()
rg -n 'func.*releaseTopic|\.release\(' controllers/release.go -A 2

# Check for any defer statements in the release method that might handle cleanup
rg -n 'defer' controllers/release.go -B 2 -A 2

# Check the caller of release() method - likely in the main reconciliation/processing loop
rg -n 'r\.release\(' controllers/ -B 5 -A 10

# Check what happens in the caller after releaseError is captured
cat -n controllers/redshiftsink_controller.go | sed -n '600,660p'

# Verify the status object lifecycle - check if status modifications are persisted elsewhere
rg -n 'status.moveTopicToReloading|patcher.Patch' controllers/release.go

# Check if there's any error handling or recovery in the caller that might patch status after release error
cat -n controllers/redshiftsink_controller.go | sed -n '615,650p'

# Check the method signatures for status object to understand if it's passed by value or reference
rg -n 'func.*moveTopicToReloading|type status struct' controllers/ -A 5

# Check if there's any defer statement or cleanup in the release function or its callers
cat -n controllers/release.go | sed -n '167,195p'

# Check if there's any cleanup or recovery mechanism after the release loop fails
cat -n controllers/redshiftsink_controller.go | sed -n '580,670p'

# Verify that moveTopicToReloading is a pointer receiver, confirming in-memory changes
cat -n controllers/status.go | sed -n '352,365p'
```
Critical: Status change is not persisted when reload table is missing.
The call to status.moveTopicToReloading(topic) at line 94 modifies the in-memory status object, but since an error is returned immediately after, the status patch at line 146 is never reached. The release() method (lines 181-191) only rolls back the database transaction; it does not patch the Kubernetes status. Additionally, the caller at line 615-642 in redshiftsink_controller.go does not attempt to patch status after a release error.
This means the topic will remain stuck in the Realtime state in the persisted status, defeating the purpose of this auto-recovery feature.
Proposed fix: Patch status before returning error

```diff
 if !reloadTableExist {
 	klog.Warningf(
 		"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
 		r.rsk.Name, schema, reloadedTable, topic,
 	)
+	// Make a copy before modifying status for patching
+	statusCopy := status.deepCopy()
 	// Move topic back to reloading state so it can be reprocessed
 	status.moveTopicToReloading(topic)
+	status.updateMaskStatus()
+	// Persist the status change immediately so topic is actually moved back to reloading
+	if patchErr := patcher.Patch(ctx, statusCopy.rsk, status.rsk, fmt.Sprintf("move %s to reloading", topic)); patchErr != nil {
+		klog.Errorf("rsk/%s failed to patch status after moving topic %s to reloading: %v", r.rsk.Name, topic, patchErr)
+	}
 	return fmt.Errorf(
 		"reload table %s.%s does not exist, topic %s moved back to reloading state",
 		schema, reloadedTable, topic,
 	)
 }
```

🤖 Prompt for AI Agents
}🤖 Prompt for AI Agents
In `@controllers/release.go` around lines 82 - 99, When reload table is missing,
the in-memory change via status.moveTopicToReloading(topic) must be persisted
before returning; call the same status-patch routine used elsewhere (e.g., the
function that patches the CR status) immediately after
status.moveTopicToReloading(topic) and handle its error (log/return a wrapped
error if the patch fails) so the persisted status reflects Recalling/Reloading;
also update the caller that invokes release() in the reconcile loop (the
redshiftsink_controller reconcile path that handles release errors) to attempt a
status patch when release() returns an error so the CR status cannot remain
stuck in Realtime. Ensure you reference and use
status.moveTopicToReloading(topic), the status-patch helper used in this
controller, and release() in your changes and surface any patch errors alongside
the original reload-table error.
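For illustration, a hedged sketch of the caller-side change the prompt describes, in the reconcile path that captures the release error; patchStatus and the surrounding variable names are assumptions, not the controller's actual identifiers:

```go
if releaseError != nil {
	// release() may have moved topics back to reloading in memory;
	// persist that change so the CR status cannot stay stuck in Realtime.
	if patchErr := patchStatus(ctx, originalStatus, status); patchErr != nil {
		klog.Errorf(
			"rsk/%s status patch after release error failed: %v (release error: %v)",
			rsk.Name, patchErr, releaseError,
		)
	}
	return ctrl.Result{}, releaseError
}
```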