Skip to content

Conversation

@Danish-sarwar1
Copy link

@Danish-sarwar1 Danish-sarwar1 commented Feb 2, 2026

When a topic reaches "Realtime" state but the reload table doesn't exist in Redshift (e.g., due to no data flow or loader issues), the release would fail with "relation does not exist" error and the topic would be stuck in Realtime state indefinitely.

This fix adds:

  • Pre-release check for reload table existence in release.go
  • New moveTopicToReloading() method in status.go that:
    • Moves topic from realtime back to reloading state
    • Clears cached loader offset to prevent stale data
    • Updates mask status phase to Reloading

Now when release fails due to missing table, the topic automatically moves back to Reloading state, allowing the reload sink group to reprocess it and create the table.

Summary by CodeRabbit

  • Bug Fixes
    • Added validation to detect and gracefully handle missing reload targets, preventing errors during reload operations.
    • Enhanced topic state management to properly synchronize state transitions between active and reloading modes.

When a topic reaches "Realtime" state but the reload table doesn't exist
in Redshift (e.g., due to no data flow or loader issues), the release
would fail with "relation does not exist" error and the topic would be
stuck in Realtime state indefinitely.

This fix adds:
- Pre-release check for reload table existence in release.go
- New moveTopicToReloading() method in status.go that:
  - Moves topic from realtime back to reloading state
  - Clears cached loader offset to prevent stale data
  - Updates mask status phase to Reloading

Now when release fails due to missing table, the topic automatically
moves back to Reloading state, allowing the reload sink group to
reprocess it and create the table.

Co-authored-by: Cursor <cursoragent@cursor.com>
@coderabbitai
Copy link

coderabbitai bot commented Feb 2, 2026

Walkthrough

Two controller methods were enhanced to improve topic state management. First, a pre-check was added to validate reload table existence before proceeding with release operations. Second, a new helper method was introduced to facilitate moving topics between the realtime and reloading state sets with associated cache and status updates.

Changes

Cohort / File(s) Summary
Topic state validation and transitions
controllers/release.go, controllers/status.go
Added pre-check in releaseTopic to verify reload table existence and short-circuit on missing table; introduced new moveTopicToReloading method to migrate topics from realtime to reloading state, including loader offset cache clearing and MaskStatus phase updates.

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

🚥 Pre-merge checks | ✅ 3
✅ Passed checks (3 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title check ✅ Passed The title directly addresses the main change: adding auto-recovery logic when a reload table is missing during the release process.
Docstring Coverage ✅ Passed No functions found in the changed files to evaluate docstring coverage. Skipping docstring coverage check.

✏️ Tip: You can configure your own custom pre-merge checks in the settings.

✨ Finishing touches
  • 📝 Generate docstrings
🧪 Generate unit tests (beta)
  • Create PR with unit tests
  • Post copyable unit tests in a comment
  • Commit unit tests in branch fix/auto-recovery-missing-reload-table

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🤖 Fix all issues with AI agents
In `@controllers/release.go`:
- Around line 82-99: When reload table is missing, the in-memory change via
status.moveTopicToReloading(topic) must be persisted before returning; call the
same status-patch routine used elsewhere (e.g., the function that patches the CR
status) immediately after status.moveTopicToReloading(topic) and handle its
error (log/return a wrapped error if the patch fails) so the persisted status
reflects Recalling/Reloading; also update the caller that invokes release() in
the reconcile loop (the redshiftsink_controller reconcile path that handles
release errors) to attempt a status patch when release() returns an error so the
CR status cannot remain stuck in Realtime. Ensure you reference and use
status.moveTopicToReloading(topic), the status-patch helper used in this
controller, and release() in your changes and surface any patch errors alongside
the original reload-table error.
🧹 Nitpick comments (1)
controllers/status.go (1)

349-372: Consider sorting s.reloading for consistency with other status methods.

Methods like computeReloading() and computeReloadingDupe() call sortStringSlice() after modifying the reloading slices. While the status object is rebuilt on each reconciliation cycle (which will sort the slice), adding explicit sorting here aligns with the established pattern in the codebase.

🔧 Optional: Add sorting for consistency
 	// Add to reloading
 	s.reloading = appendIfMissing(s.reloading, topic)
+	sortStringSlice(s.reloading)

Status changes from this method are automatically persisted via the deferred patching mechanism in the Reconcile function, so no action is needed regarding persistence.

Comment on lines +82 to +99
// Check if reload table exists before attempting release
// This prevents the "relation does not exist" error and allows recovery
reloadTableExist, err := r.redshifter.TableExist(ctx, schema, reloadedTable)
if err != nil {
return fmt.Errorf("error checking reload table existence: %v", err)
}
if !reloadTableExist {
klog.Warningf(
"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
r.rsk.Name, schema, reloadedTable, topic,
)
// Move topic back to reloading state so it can be reprocessed
status.moveTopicToReloading(topic)
return fmt.Errorf(
"reload table %s.%s does not exist, topic %s moved back to reloading state",
schema, reloadedTable, topic,
)
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the release() method structure around the problematic area
cat -n controllers/release.go | sed -n '70,200p'

Repository: practo/tipoca-stream

Length of output: 4381


🏁 Script executed:

# Also check for the releaseTopic method and how it handles errors from release()
rg -n 'func.*releaseTopic|\.release\(' controllers/release.go -A 2

Repository: practo/tipoca-stream

Length of output: 140


🏁 Script executed:

# Check for any defer statements in the release method that might handle cleanup
rg -n 'defer' controllers/release.go -B 2 -A 2

Repository: practo/tipoca-stream

Length of output: 46


🏁 Script executed:

# Check the caller of release() method - likely in the main reconciliation/processing loop
rg -n 'r\.release\(' controllers/ -B 5 -A 10

Repository: practo/tipoca-stream

Length of output: 1180


🏁 Script executed:

# Check what happens in the caller after releaseError is captured
cat -n controllers/redshiftsink_controller.go | sed -n '600,660p'

Repository: practo/tipoca-stream

Length of output: 1966


🏁 Script executed:

# Verify the status object lifecycle - check if status modifications are persisted elsewhere
rg -n 'status.moveTopicToReloading|patcher.Patch' controllers/release.go

Repository: practo/tipoca-stream

Length of output: 195


🏁 Script executed:

# Check if there's any error handling or recovery in the caller that might patch status after release error
cat -n controllers/redshiftsink_controller.go | sed -n '615,650p'

Repository: practo/tipoca-stream

Length of output: 1244


🏁 Script executed:

# Check the method signatures for status object to understand if it's passed by value or reference
rg -n 'func.*moveTopicToReloading|type status struct' controllers/ -A 5

Repository: practo/tipoca-stream

Length of output: 723


🏁 Script executed:

# Check if there's any defer statement or cleanup in the release function or its callers
cat -n controllers/release.go | sed -n '167,195p'

Repository: practo/tipoca-stream

Length of output: 875


🏁 Script executed:

# Check if there's any cleanup or recovery mechanism after the release loop fails
cat -n controllers/redshiftsink_controller.go | sed -n '580,670p'

Repository: practo/tipoca-stream

Length of output: 2973


🏁 Script executed:

# Verify that moveTopicToReloading is a pointer receiver, confirming in-memory changes
cat -n controllers/status.go | sed -n '352,365p'

Repository: practo/tipoca-stream

Length of output: 725


Critical: Status change is not persisted when reload table is missing.

The call to status.moveTopicToReloading(topic) at line 94 modifies the in-memory status object, but since an error is returned immediately after, the status patch at line 146 is never reached. The release() method (lines 181-191) only rolls back the database transaction; it does not patch the Kubernetes status. Additionally, the caller at line 615-642 in redshiftsink_controller.go does not attempt to patch status after a release error.

This means the topic will remain stuck in the Realtime state in the persisted status, defeating the purpose of this auto-recovery feature.

Proposed fix: Patch status before returning error
 	if !reloadTableExist {
 		klog.Warningf(
 			"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
 			r.rsk.Name, schema, reloadedTable, topic,
 		)
+		// Make a copy before modifying status for patching
+		statusCopy := status.deepCopy()
 		// Move topic back to reloading state so it can be reprocessed
 		status.moveTopicToReloading(topic)
+		status.updateMaskStatus()
+		// Persist the status change immediately so topic is actually moved back to reloading
+		if patchErr := patcher.Patch(ctx, statusCopy.rsk, status.rsk, fmt.Sprintf("move %s to reloading", topic)); patchErr != nil {
+			klog.Errorf("rsk/%s failed to patch status after moving topic %s to reloading: %v", r.rsk.Name, topic, patchErr)
+		}
 		return fmt.Errorf(
 			"reload table %s.%s does not exist, topic %s moved back to reloading state",
 			schema, reloadedTable, topic,
 		)
 	}
🤖 Prompt for AI Agents
In `@controllers/release.go` around lines 82 - 99, When reload table is missing,
the in-memory change via status.moveTopicToReloading(topic) must be persisted
before returning; call the same status-patch routine used elsewhere (e.g., the
function that patches the CR status) immediately after
status.moveTopicToReloading(topic) and handle its error (log/return a wrapped
error if the patch fails) so the persisted status reflects Recalling/Reloading;
also update the caller that invokes release() in the reconcile loop (the
redshiftsink_controller reconcile path that handles release errors) to attempt a
status patch when release() returns an error so the CR status cannot remain
stuck in Realtime. Ensure you reference and use
status.moveTopicToReloading(topic), the status-patch helper used in this
controller, and release() in your changes and surface any patch errors alongside
the original reload-table error.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants