19 changes: 19 additions & 0 deletions controllers/release.go
@@ -79,6 +79,25 @@ func (r *releaser) releaseTopic(
	_, _, table := transformer.ParseTopic(topic)
	reloadedTable := table + tableSuffix

	// Check if reload table exists before attempting release
	// This prevents the "relation does not exist" error and allows recovery
	reloadTableExist, err := r.redshifter.TableExist(ctx, schema, reloadedTable)
	if err != nil {
		return fmt.Errorf("error checking reload table existence: %v", err)
	}
	if !reloadTableExist {
		klog.Warningf(
			"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
			r.rsk.Name, schema, reloadedTable, topic,
		)
		// Move topic back to reloading state so it can be reprocessed
		status.moveTopicToReloading(topic)
		return fmt.Errorf(
			"reload table %s.%s does not exist, topic %s moved back to reloading state",
			schema, reloadedTable, topic,
		)
	}
Comment on lines +82 to +99

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

🏁 Script executed:

# First, let's examine the release() method structure around the problematic area
cat -n controllers/release.go | sed -n '70,200p'

Repository: practo/tipoca-stream

Length of output: 4381


🏁 Script executed:

# Also check for the releaseTopic method and how it handles errors from release()
rg -n 'func.*releaseTopic|\.release\(' controllers/release.go -A 2

Repository: practo/tipoca-stream

Length of output: 140


🏁 Script executed:

# Check for any defer statements in the release method that might handle cleanup
rg -n 'defer' controllers/release.go -B 2 -A 2

Repository: practo/tipoca-stream

Length of output: 46


🏁 Script executed:

# Check the caller of release() method - likely in the main reconciliation/processing loop
rg -n 'r\.release\(' controllers/ -B 5 -A 10

Repository: practo/tipoca-stream

Length of output: 1180


🏁 Script executed:

# Check what happens in the caller after releaseError is captured
cat -n controllers/redshiftsink_controller.go | sed -n '600,660p'

Repository: practo/tipoca-stream

Length of output: 1966


🏁 Script executed:

# Verify the status object lifecycle - check if status modifications are persisted elsewhere
rg -n 'status.moveTopicToReloading|patcher.Patch' controllers/release.go

Repository: practo/tipoca-stream

Length of output: 195


🏁 Script executed:

# Check if there's any error handling or recovery in the caller that might patch status after release error
cat -n controllers/redshiftsink_controller.go | sed -n '615,650p'

Repository: practo/tipoca-stream

Length of output: 1244


🏁 Script executed:

# Check the method signatures for status object to understand if it's passed by value or reference
rg -n 'func.*moveTopicToReloading|type status struct' controllers/ -A 5

Repository: practo/tipoca-stream

Length of output: 723


🏁 Script executed:

# Check if there's any defer statement or cleanup in the release function or its callers
cat -n controllers/release.go | sed -n '167,195p'

Repository: practo/tipoca-stream

Length of output: 875


🏁 Script executed:

# Check if there's any cleanup or recovery mechanism after the release loop fails
cat -n controllers/redshiftsink_controller.go | sed -n '580,670p'

Repository: practo/tipoca-stream

Length of output: 2973


🏁 Script executed:

# Verify that moveTopicToReloading is a pointer receiver, confirming in-memory changes
cat -n controllers/status.go | sed -n '352,365p'

Repository: practo/tipoca-stream

Length of output: 725


Critical: Status change is not persisted when reload table is missing.

The call to status.moveTopicToReloading(topic) at line 94 modifies only the in-memory status object. Because an error is returned immediately afterwards, the status patch at line 146 is never reached. The release() method (lines 181-191) only rolls back the database transaction; it does not patch the Kubernetes status. The caller at lines 615-642 in redshiftsink_controller.go also does not attempt to patch status after a release error.

This means the topic will remain stuck in the Realtime state in the persisted status, defeating the purpose of this auto-recovery feature.
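The failure mode can be reproduced in isolation. The following is a minimal sketch, not the controller's code: `topicStatus`, `store`, `releaseWithoutPatch`, and `releaseWithPatch` are hypothetical stand-ins for the in-memory status, the persisted CR status, and the two control flows being compared.

```go
package main

import (
	"errors"
	"fmt"
)

// topicStatus is a hypothetical, reduced stand-in for the in-memory status.
type topicStatus struct {
	realtime  []string
	reloading []string
}

// store is a hypothetical stand-in for the persisted CR status.
type store struct {
	saved topicStatus
}

// patch copies the in-memory status into the store, modelling a status patch.
func (s *store) patch(st topicStatus) {
	s.saved = topicStatus{
		realtime:  append([]string(nil), st.realtime...),
		reloading: append([]string(nil), st.reloading...),
	}
}

// moveToReloading mutates only the in-memory status.
func moveToReloading(st *topicStatus, topic string) {
	kept := make([]string, 0, len(st.realtime))
	for _, t := range st.realtime {
		if t != topic {
			kept = append(kept, t)
		}
	}
	st.realtime = kept
	st.reloading = append(st.reloading, topic)
}

var errMissingTable = errors.New("reload table does not exist")

// releaseWithoutPatch mirrors the reviewed flow: mutate, then return early.
func releaseWithoutPatch(s *store, st *topicStatus, topic string) error {
	moveToReloading(st, topic)
	return errMissingTable // the store is never patched on this path
}

// releaseWithPatch persists the change before returning the error.
func releaseWithPatch(s *store, st *topicStatus, topic string) error {
	moveToReloading(st, topic)
	s.patch(*st)
	return errMissingTable
}

func main() {
	a := &store{saved: topicStatus{realtime: []string{"orders"}}}
	stA := topicStatus{realtime: []string{"orders"}}
	_ = releaseWithoutPatch(a, &stA, "orders")
	fmt.Println(a.saved.realtime, a.saved.reloading) // [orders] []

	b := &store{saved: topicStatus{realtime: []string{"orders"}}}
	stB := topicStatus{realtime: []string{"orders"}}
	_ = releaseWithPatch(b, &stB, "orders")
	fmt.Println(b.saved.realtime, b.saved.reloading) // [] [orders]
}
```

In the first case the stored status still lists the topic as realtime even though the in-memory copy was updated, which is the stuck state described above.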

Proposed fix: Patch status before returning error
 	if !reloadTableExist {
 		klog.Warningf(
 			"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
 			r.rsk.Name, schema, reloadedTable, topic,
 		)
+		// Make a copy before modifying status for patching
+		statusCopy := status.deepCopy()
 		// Move topic back to reloading state so it can be reprocessed
 		status.moveTopicToReloading(topic)
+		status.updateMaskStatus()
+		// Persist the status change immediately so topic is actually moved back to reloading
+		if patchErr := patcher.Patch(ctx, statusCopy.rsk, status.rsk, fmt.Sprintf("move %s to reloading", topic)); patchErr != nil {
+			klog.Errorf("rsk/%s failed to patch status after moving topic %s to reloading: %v", r.rsk.Name, topic, patchErr)
+		}
 		return fmt.Errorf(
 			"reload table %s.%s does not exist, topic %s moved back to reloading state",
 			schema, reloadedTable, topic,
 		)
 	}
🤖 Prompt for AI Agents
In `@controllers/release.go` around lines 82 - 99, When reload table is missing,
the in-memory change via status.moveTopicToReloading(topic) must be persisted
before returning; call the same status-patch routine used elsewhere (e.g., the
function that patches the CR status) immediately after
status.moveTopicToReloading(topic) and handle its error (log/return a wrapped
error if the patch fails) so the persisted status reflects Reloading;
also update the caller that invokes release() in the reconcile loop (the
redshiftsink_controller reconcile path that handles release errors) to attempt a
status patch when release() returns an error so the CR status cannot remain
stuck in Realtime. Ensure you reference and use
status.moveTopicToReloading(topic), the status-patch helper used in this
controller, and release() in your changes and surface any patch errors alongside
the original reload-table error.


	tableExist, err := r.redshifter.TableExist(ctx, schema, table)
	if err != nil {
		return err
25 changes: 25 additions & 0 deletions controllers/status.go
@@ -346,6 +346,31 @@ func (s *status) updateTopicsOnRelease(releasedTopic string) {
	s.realtime = removeFromSlice(s.realtime, releasedTopic)
}

// moveTopicToReloading moves a topic from realtime back to reloading state.
// This is used when release fails due to missing reload table, allowing
// the topic to be reprocessed by the reload sink group.
func (s *status) moveTopicToReloading(topic string) {
	klog.V(2).Infof("rsk/%s moving topic %s from realtime to reloading", s.rsk.Name, topic)

	// Remove from realtime
	s.realtime = removeFromSlice(s.realtime, topic)

	// Add to reloading
	s.reloading = appendIfMissing(s.reloading, topic)

	// Clear the cached loader offset so realtime calculator doesn't use stale data
	s.deleteLoaderTopicGroupCurrentOffset(topic)

	// Update the mask status for this topic to Reloading
	if s.rsk.Status.MaskStatus != nil && s.rsk.Status.MaskStatus.CurrentMaskStatus != nil {
		if topicStatus, ok := s.rsk.Status.MaskStatus.CurrentMaskStatus[topic]; ok {
			topicStatus.Phase = tipocav1.MaskReloading
			s.rsk.Status.MaskStatus.CurrentMaskStatus[topic] = topicStatus
			klog.V(2).Infof("rsk/%s updated topic %s phase to Reloading", s.rsk.Name, topic)
		}
	}
}

func (s *status) computerCurrentMaskStatus() map[string]tipocav1.TopicMaskStatus {
	topicsReleased := toMap(s.released)
	topicsRealtime := toMap(s.realtime)
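The list handling in moveTopicToReloading relies on removeFromSlice and appendIfMissing, whose bodies are not part of this diff. A minimal sketch, assuming they behave as their names suggest, shows why the move is safe to repeat on a later reconcile. The implementations and the reduced `lists` type below are hypothetical.

```go
package main

import "fmt"

// removeFromSlice returns a copy of items without any occurrence of item.
// Hypothetical implementation; the real helper is not shown in the diff.
func removeFromSlice(items []string, item string) []string {
	out := make([]string, 0, len(items))
	for _, it := range items {
		if it != item {
			out = append(out, it)
		}
	}
	return out
}

// appendIfMissing appends item only when it is not already present.
// Hypothetical implementation; the real helper is not shown in the diff.
func appendIfMissing(items []string, item string) []string {
	for _, it := range items {
		if it == item {
			return items
		}
	}
	return append(items, item)
}

// lists is a hypothetical, reduced version of the status struct.
type lists struct {
	realtime  []string
	reloading []string
}

// moveTopicToReloading reproduces only the two slice updates from the diff.
func (l *lists) moveTopicToReloading(topic string) {
	l.realtime = removeFromSlice(l.realtime, topic)
	l.reloading = appendIfMissing(l.reloading, topic)
}

func main() {
	l := &lists{realtime: []string{"a", "b"}, reloading: []string{"c"}}
	l.moveTopicToReloading("a")
	l.moveTopicToReloading("a") // repeating the move changes nothing
	fmt.Println(l.realtime, l.reloading) // [b] [c a]
}
```

Under these assumptions a topic never appears twice in reloading, so retrying the release after a failed patch does not corrupt the lists.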