diff --git a/controllers/release.go b/controllers/release.go index 18b3b90e..606b0c36 100644 --- a/controllers/release.go +++ b/controllers/release.go @@ -79,6 +79,25 @@ func (r *releaser) releaseTopic( _, _, table := transformer.ParseTopic(topic) reloadedTable := table + tableSuffix + // Check if reload table exists before attempting release + // This prevents the "relation does not exist" error and allows recovery + reloadTableExist, err := r.redshifter.TableExist(ctx, schema, reloadedTable) + if err != nil { + return fmt.Errorf("error checking reload table existence: %v", err) + } + if !reloadTableExist { + klog.Warningf( + "rsk/%s reload table %s.%s does not exist for topic %s, moving back to reloading", + r.rsk.Name, schema, reloadedTable, topic, + ) + // Move topic back to reloading state so it can be reprocessed + status.moveTopicToReloading(topic) + return fmt.Errorf( + "reload table %s.%s does not exist, topic %s moved back to reloading state", + schema, reloadedTable, topic, + ) + } + tableExist, err := r.redshifter.TableExist(ctx, schema, table) if err != nil { return err diff --git a/controllers/status.go b/controllers/status.go index 09934c28..21c43335 100644 --- a/controllers/status.go +++ b/controllers/status.go @@ -346,6 +346,31 @@ func (s *status) updateTopicsOnRelease(releasedTopic string) { s.realtime = removeFromSlice(s.realtime, releasedTopic) } +// moveTopicToReloading moves a topic from realtime back to reloading state. +// This is used when release fails due to missing reload table, allowing +// the topic to be reprocessed by the reload sink group. +func (s *status) moveTopicToReloading(topic string) { + klog.V(2).Infof("rsk/%s moving topic %s from realtime to reloading", s.rsk.Name, topic) + + // Remove from realtime + s.realtime = removeFromSlice(s.realtime, topic) + + // Add to reloading + s.reloading = appendIfMissing(s.reloading, topic) + + // Clear the cached loader offset so realtime calculator doesn't use stale data + s.deleteLoaderTopicGroupCurrentOffset(topic) + + // Update the mask status for this topic to Reloading + if s.rsk.Status.MaskStatus != nil && s.rsk.Status.MaskStatus.CurrentMaskStatus != nil { + if topicStatus, ok := s.rsk.Status.MaskStatus.CurrentMaskStatus[topic]; ok { + topicStatus.Phase = tipocav1.MaskReloading + s.rsk.Status.MaskStatus.CurrentMaskStatus[topic] = topicStatus + klog.V(2).Infof("rsk/%s updated topic %s phase to Reloading", s.rsk.Name, topic) + } + } +} + func (s *status) computerCurrentMaskStatus() map[string]tipocav1.TopicMaskStatus { topicsReleased := toMap(s.released) topicsRealtime := toMap(s.realtime)