From c558e28bdbe85ad3f96e1c86dbd72dd4887c6fdb Mon Sep 17 00:00:00 2001
From: Danish Sarwar
Date: Mon, 2 Feb 2026 23:58:09 +0530
Subject: [PATCH] Fix auto-recovery when reload table is missing during release

When a topic reaches "Realtime" state but the reload table doesn't exist
in Redshift (e.g., due to no data flow or loader issues), the release
would fail with "relation does not exist" error and the topic would be
stuck in Realtime state indefinitely.

This fix adds:
- Pre-release check for reload table existence in release.go
- New moveTopicToReloading() method in status.go that:
  - Moves topic from realtime back to reloading state
  - Clears cached loader offset to prevent stale data
  - Updates mask status phase to Reloading

Now when release fails due to missing table, the topic automatically
moves back to Reloading state, allowing the reload sink group to
reprocess it and create the table.

Co-authored-by: Cursor
---
 controllers/release.go | 19 +++++++++++++++++++
 controllers/status.go  | 25 +++++++++++++++++++++++++
 2 files changed, 44 insertions(+)

diff --git a/controllers/release.go b/controllers/release.go
index 18b3b90e9..606b0c36a 100644
--- a/controllers/release.go
+++ b/controllers/release.go
@@ -79,6 +79,25 @@ func (r *releaser) releaseTopic(
 	_, _, table := transformer.ParseTopic(topic)
 	reloadedTable := table + tableSuffix
 
+	// Check if reload table exists before attempting release
+	// This prevents the "relation does not exist" error and allows recovery
+	reloadTableExist, err := r.redshifter.TableExist(ctx, schema, reloadedTable)
+	if err != nil {
+		return fmt.Errorf("error checking reload table existence: %v", err)
+	}
+	if !reloadTableExist {
+		klog.Warningf(
+			"rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading",
+			r.rsk.Name, schema, reloadedTable, topic,
+		)
+		// Move topic back to reloading state so it can be reprocessed
+		status.moveTopicToReloading(topic)
+		return fmt.Errorf(
+			"reload table %s.%s does not exist, topic %s moved back to reloading state",
+			schema, reloadedTable, topic,
+		)
+	}
+
 	tableExist, err := r.redshifter.TableExist(ctx, schema, table)
 	if err != nil {
 		return err
diff --git a/controllers/status.go b/controllers/status.go
index 09934c28c..21c43335e 100644
--- a/controllers/status.go
+++ b/controllers/status.go
@@ -346,6 +346,31 @@ func (s *status) updateTopicsOnRelease(releasedTopic string) {
 	s.realtime = removeFromSlice(s.realtime, releasedTopic)
 }
 
+// moveTopicToReloading moves a topic from realtime back to reloading state.
+// This is used when release fails due to missing reload table, allowing
+// the topic to be reprocessed by the reload sink group.
+func (s *status) moveTopicToReloading(topic string) {
+	klog.V(2).Infof("rsk/%s moving topic %s from realtime to reloading", s.rsk.Name, topic)
+
+	// Remove from realtime
+	s.realtime = removeFromSlice(s.realtime, topic)
+
+	// Add to reloading
+	s.reloading = appendIfMissing(s.reloading, topic)
+
+	// Clear the cached loader offset so realtime calculator doesn't use stale data
+	s.deleteLoaderTopicGroupCurrentOffset(topic)
+
+	// Update the mask status for this topic to Reloading
+	if s.rsk.Status.MaskStatus != nil && s.rsk.Status.MaskStatus.CurrentMaskStatus != nil {
+		if topicStatus, ok := s.rsk.Status.MaskStatus.CurrentMaskStatus[topic]; ok {
+			topicStatus.Phase = tipocav1.MaskReloading
+			s.rsk.Status.MaskStatus.CurrentMaskStatus[topic] = topicStatus
+			klog.V(2).Infof("rsk/%s updated topic %s phase to Reloading", s.rsk.Name, topic)
+		}
+	}
+}
+
 func (s *status) computerCurrentMaskStatus() map[string]tipocav1.TopicMaskStatus {
 	topicsReleased := toMap(s.released)
 	topicsRealtime := toMap(s.realtime)