From 3f40660f41e959c27e993f9f25b29684c1c29971 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Thu, 31 Jul 2025 20:54:51 +0000 Subject: [PATCH 01/18] refactor: change way of creating empty slice --- internal/utils/time.go | 2 +- internal/utils/time_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/internal/utils/time.go b/internal/utils/time.go index 2cc3ccf..86eb431 100644 --- a/internal/utils/time.go +++ b/internal/utils/time.go @@ -15,7 +15,7 @@ func ParseNotifTimes(str string) []int { var intSlice []int if str == "" { - return []int{} + return make([]int, 0) } // use fields() and join() to get rid of all whitespace diff --git a/internal/utils/time_test.go b/internal/utils/time_test.go index 8d7c1cd..e370ecf 100644 --- a/internal/utils/time_test.go +++ b/internal/utils/time_test.go @@ -48,7 +48,7 @@ func TestParseNotifTimes(t *testing.T) { { name: "empty string", input: "", - expected: []int{}, // Assuming empty string results in an empty slice + expected: make([]int, 0), // Assuming empty string results in an empty slice }, } From edbc21a8a246143270a7ad847ccb7fa7460e3b15 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Thu, 31 Jul 2025 20:55:58 +0000 Subject: [PATCH 02/18] feat: add config value for namespace label Namespace name and label will both be used to filter for namespaces. Either of them can be disabled, increasing flexibility when filtering for namespaces. Tests and functionality to come in later commits. --- cmd/controller/main.go | 1 + cmd/scheduler/main.go | 1 + internal/structure/configs.go | 2 ++ manifests/controller/controller_config.yaml | 1 + manifests/scheduler/scheduler_config.yaml | 1 + 5 files changed, 6 insertions(+) diff --git a/cmd/controller/main.go b/cmd/controller/main.go index aa8bd5a..3f0d9c7 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -30,6 +30,7 @@ func main() { cfg := structInternal.ControllerConfig{ Namespace: os.Getenv("NAMESPACE"), + NsLabel: os.Getenv("NS_LABEL"), TimeLabel: os.Getenv("TIME_LABEL"), NotifLabel: os.Getenv("NOTIF_LABEL"), TimeFormat: os.Getenv("TIME_FORMAT"), diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index f7ffd77..6114a28 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -26,6 +26,7 @@ func main() { // there is also a config for the controller cfg := structInternal.SchedulerConfig{ Namespace: os.Getenv("NAMESPACE"), + NsLabel: os.Getenv("NS_LABEL"), TimeLabel: os.Getenv("TIME_LABEL"), NotifLabel: os.Getenv("NOTIF_LABEL"), TimeFormat: os.Getenv("TIME_FORMAT"), diff --git a/internal/structure/configs.go b/internal/structure/configs.go index 005526a..e04218d 100644 --- a/internal/structure/configs.go +++ b/internal/structure/configs.go @@ -29,6 +29,7 @@ API_KEY: "Random APIKEY", type ControllerConfig struct { Namespace string + NsLabel string TimeLabel string NotifLabel string TimeFormat string @@ -38,6 +39,7 @@ type ControllerConfig struct { type SchedulerConfig struct { Namespace string + NsLabel string TimeLabel string NotifLabel string TimeFormat string diff --git a/manifests/controller/controller_config.yaml b/manifests/controller/controller_config.yaml index e8f9bb4..619b85a 100644 --- a/manifests/controller/controller_config.yaml +++ b/manifests/controller/controller_config.yaml @@ -6,6 +6,7 @@ metadata: namespace: das data: NAMESPACE: "anray-liu" + NS_LABEL: "app.kubernetes.io/part-of=kubeflow-profile" TIME_LABEL: "volume-cleaner/unattached-time" NOTIF_LABEL: "volume-cleaner/notification-count" TIME_FORMAT: "2006-01-02_15-04-05Z" diff --git a/manifests/scheduler/scheduler_config.yaml b/manifests/scheduler/scheduler_config.yaml index ad7ed2d..b89b7fa 100644 --- a/manifests/scheduler/scheduler_config.yaml +++ b/manifests/scheduler/scheduler_config.yaml @@ -6,6 +6,7 @@ metadata: namespace: das data: NAMESPACE: "anray-liu" + NS_LABEL: "app.kubernetes.io/part-of=kubeflow-profile" TIME_LABEL: "volume-cleaner/unattached-time" NOTIF_LABEL: "volume-cleaner/notification-count" GRACE_PERIOD: "7" From 436de778adb3aa453f10ede2146279693b9fcec5 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Thu, 31 Jul 2025 20:58:29 +0000 Subject: [PATCH 03/18] refactor: very minor code clean up --- internal/utils/email.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/internal/utils/email.go b/internal/utils/email.go index 9c7712e..2c2c01c 100644 --- a/internal/utils/email.go +++ b/internal/utils/email.go @@ -74,19 +74,16 @@ func SendNotif(client *http.Client, conf structInternal.EmailConfig, email strin // given a pvc, this function will aquire the details related to the pvc such as the owner of the pvc, their email, the bounded volume name and ID, and details about its deletion func EmailDetails(kube kubernetes.Interface, pvc corev1.PersistentVolumeClaim, gracePeriod int) (string, structInternal.Personalisation) { - ns := pvc.Namespace - vol := pvc.Spec.VolumeName - // Acquire User Email - email := nsEmail(kube, ns) + email := nsEmail(kube, pvc.Namespace) // Calculate DeletionDate now := time.Now() futureTime := now.Add(time.Duration(gracePeriod) * 24 * time.Hour) personal := structInternal.Personalisation{ - Name: ns, - VolumeName: vol, + Name: pvc.Namespace, + VolumeName: pvc.Spec.VolumeName, GracePeriod: fmt.Sprintf("%d", gracePeriod), DeletionDate: futureTime.Format(time.UnixDate), } From ff85926c2408fc837e739ef07a4b5e4285358497 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Thu, 31 Jul 2025 21:02:24 +0000 Subject: [PATCH 04/18] feat: add filter with namespace label --- internal/kubernetes/finder.go | 142 ++++++++++++++------------ internal/kubernetes/retriever.go | 6 +- internal/kubernetes/retriever_test.go | 2 +- internal/kubernetes/watcher.go | 11 +- 4 files changed, 87 insertions(+), 74 deletions(-) diff --git a/internal/kubernetes/finder.go b/internal/kubernetes/finder.go index 44a2432..8ce2886 100644 --- a/internal/kubernetes/finder.go +++ b/internal/kubernetes/finder.go @@ -34,101 +34,107 @@ func FindStale(kube kubernetes.Interface, cfg structInternal.SchedulerConfig) (i log.Print("[INFO] Scanning for stale PVCS...") // iterate through all pvcs in configured namespace(s) - - for _, pvc := range PvcList(kube, cfg.Namespace) { - log.Printf("[INFO] Found PVC %s from NS %s", pvc.Name, pvc.Namespace) - - // check if label exists (meaning pvc is unattached) - // if pvc is attached to a sts, it would've had its label removed by the controller - - timestamp, ok := pvc.Labels[cfg.TimeLabel] - if !ok { - log.Printf("[INFO] Label %s not found. Skipping.", cfg.TimeLabel) + for _, ns := range NsList(kube, cfg.NsLabel) { + // skip if not in configured namespace + if ns.Name != cfg.Namespace && cfg.Namespace != "" { continue } - // check if pvc should be deleted - stale, staleError := IsStale(timestamp, cfg.TimeFormat, cfg.GracePeriod) - if staleError != nil { - log.Printf("[ERROR] Failed to parse timestamp: %s", staleError) - errCount++ - continue - } + for _, pvc := range PvcList(kube, ns.Name) { + log.Printf("[INFO] Found PVC %s from NS %s", pvc.Name, pvc.Namespace) - // stale means grace period has passed, can be deleted - if stale { - if cfg.DryRun { - log.Printf("[DRY RUN] Delete PVC %s", pvc.Name) - deleteCount++ + // check if label exists (meaning pvc is unattached) + // if pvc is attached to a sts, it would've had its label removed by the controller + + timestamp, ok := pvc.Labels[cfg.TimeLabel] + if !ok { + log.Printf("[INFO] Label %s not found. Skipping.", cfg.TimeLabel) continue } - err := kube.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{}) - if err != nil { - log.Printf("[ERROR] Failed to delete PVC %s: %s", pvc.Name, err) + // check if pvc should be deleted + stale, staleError := IsStale(timestamp, cfg.TimeFormat, cfg.GracePeriod) + if staleError != nil { + log.Printf("[ERROR] Failed to parse timestamp: %s", staleError) errCount++ continue } - log.Print("[INFO] PVC successfully deleted.") - deleteCount++ - - } else { - // not stale yet, handle email logic here - - log.Print("[INFO] Grace period not passed.") + // stale means grace period has passed, can be deleted + if stale { + if cfg.DryRun { + log.Printf("[DRY RUN] Delete PVC %s", pvc.Name) + deleteCount++ + continue + } - notifCount, ok := pvc.Labels[cfg.NotifLabel] - if !ok { - log.Printf("[INFO] Label %s not found. Skipping.", cfg.NotifLabel) - errCount++ - continue - } + err := kube.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{}) + if err != nil { + log.Printf("[ERROR] Failed to delete PVC %s: %s", pvc.Name, err) + errCount++ + continue + } - currNotif, countErr := strconv.Atoi(notifCount) - if countErr != nil { - log.Printf("[ERROR] Failed to parse notification count: %v", countErr) - errCount++ - continue - } + log.Print("[INFO] PVC successfully deleted.") + deleteCount++ - if len(cfg.NotifTimes) == 0 { - continue - } + } else { + // not stale yet, handle email logic here - shouldSend, mailError := ShouldSendMail(timestamp, currNotif, cfg) - if mailError != nil { - log.Printf("[ERROR] Failed to parse timestamp: %s", mailError) - errCount++ - continue - } + log.Print("[INFO] Grace period not passed.") - if shouldSend { - if cfg.DryRun { - log.Print("[DRY RUN] Email owner.") - emailCount++ + notifCount, ok := pvc.Labels[cfg.NotifLabel] + if !ok { + log.Printf("[INFO] Label %s not found. Skipping.", cfg.NotifLabel) + errCount++ continue } - // personal consists of details passed into the email template as variables while email is - // the email address that is consistent regardless of the template + currNotif, countErr := strconv.Atoi(notifCount) + if countErr != nil { + log.Printf("[ERROR] Failed to parse notification count: %v", countErr) + errCount++ + continue + } - email, personal := utilsInternal.EmailDetails(kube, pvc, cfg.GracePeriod) + if len(cfg.NotifTimes) == 0 { + continue + } - err := utilsInternal.SendNotif(client, cfg.EmailCfg, email, personal) - if err != nil { - log.Printf("[Error] Unable to send an email to %s at %s", personal.Name, email) + shouldSend, mailError := ShouldSendMail(timestamp, currNotif, cfg) + if mailError != nil { + log.Printf("[ERROR] Failed to parse timestamp: %s", mailError) errCount++ continue } - // Update Email Count - emailCount++ + if shouldSend { + if cfg.DryRun { + log.Print("[DRY RUN] Email owner.") + emailCount++ + continue + } + + // personal consists of details passed into the email template as variables while email is + // the email address that is consistent regardless of the template - // Increment notification count by 1 - newNotifCount := strconv.Itoa(currNotif + 1) - SetPvcLabel(kube, cfg.NotifLabel, newNotifCount, pvc.Namespace, pvc.Name) + email, personal := utilsInternal.EmailDetails(kube, pvc, cfg.GracePeriod) + err := utilsInternal.SendNotif(client, cfg.EmailCfg, email, personal) + if err != nil { + log.Printf("[Error] Unable to send an email to %s at %s", personal.Name, email) + errCount++ + continue + } + + // Update Email Count + emailCount++ + + // Increment notification count by 1 + newNotifCount := strconv.Itoa(currNotif + 1) + SetPvcLabel(kube, cfg.NotifLabel, newNotifCount, pvc.Namespace, pvc.Name) + + } } } } diff --git a/internal/kubernetes/retriever.go b/internal/kubernetes/retriever.go index 89dec16..b81a2c4 100644 --- a/internal/kubernetes/retriever.go +++ b/internal/kubernetes/retriever.go @@ -17,9 +17,9 @@ import ( // returns a slice of corev1.Namespace structs -func NsList(kube kubernetes.Interface) []corev1.Namespace { +func NsList(kube kubernetes.Interface, label string) []corev1.Namespace { ns, err := kube.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/part-of=kubeflow-profile", + LabelSelector: label, }) if err != nil { // nothing can be done without namespaces so crash the program @@ -74,7 +74,7 @@ func FindUnattachedPVCs(kube kubernetes.Interface, cfg structInternal.Controller log.Print("[INFO] Scanning namespaces...") - for _, namespace := range NsList(kube) { + for _, namespace := range NsList(kube, cfg.NsLabel) { // skip if not in configured namespace if namespace.Name != cfg.Namespace && cfg.Namespace != "" { continue diff --git a/internal/kubernetes/retriever_test.go b/internal/kubernetes/retriever_test.go index 5f1f3ac..6b68314 100644 --- a/internal/kubernetes/retriever_test.go +++ b/internal/kubernetes/retriever_test.go @@ -31,7 +31,7 @@ func TestNsList(t *testing.T) { } } - list := NsList(kube) + list := NsList(kube, "app.kubernetes.io/part-of=kubeflow-profile") // check right length assert.Equal(t, len(list), len(names)) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index ed600eb..c36f9bf 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -19,7 +19,9 @@ import ( // Watches for when statefulsets are created or deleted func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) { - watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{}) + watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{ + LabelSelector: cfg.NsLabel, + }) if err != nil { log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) } @@ -89,7 +91,12 @@ func InitialScan(kube kubernetes.Interface, cfg structInternal.ControllerConfig) func ResetLabels(kube kubernetes.Interface, cfg structInternal.ControllerConfig) { log.Print("Resetting labels...") - for _, namespace := range NsList(kube) { + for _, namespace := range NsList(kube, cfg.NsLabel) { + // skip if not in configured namespace + if namespace.Name != cfg.Namespace && cfg.Namespace != "" { + continue + } + for _, pvc := range PvcList(kube, namespace.Name) { _, ok := pvc.Labels[cfg.TimeLabel] if ok { From 9ef8d40309c97a07f9f2d9983539d36cef6310ca Mon Sep 17 00:00:00 2001 From: anrayliu Date: Fri, 1 Aug 2025 13:04:51 +0000 Subject: [PATCH 05/18] fix(cicd): add label to namespace --- .github/workflows/integration-tests.yaml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index fd8f005..9cc209e 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -43,6 +43,10 @@ jobs: -f manifests/serviceaccount.yaml \ -f manifests/netpol.yaml + - name: Label namespace + run: | + kubectl label namespace das app.kubernetes.io/part-of=kubeflow-profile + - name: Apply configs run: | kubectl apply -f manifests/controller/controller_config.yaml \ From 276e6098c50fc00e968f7bfffd003b07e6f75c04 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Fri, 1 Aug 2025 13:31:03 +0000 Subject: [PATCH 06/18] test(cicd): debug integration tests --- .github/workflows/integration-tests.yaml | 2 ++ internal/kubernetes/watcher.go | 4 +--- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 9cc209e..2929ad2 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -93,6 +93,8 @@ jobs: run: | kubectl apply -f testing/sts.yaml sleep 5 + kubectl logs -l app=volume-cleaner-controller -n das --tail 500 + kubectl get pvc pvc1 -n anray-liu -o yaml - name: Test label removed run: | diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index c36f9bf..4e021b1 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -19,9 +19,7 @@ import ( // Watches for when statefulsets are created or deleted func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) { - watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{ - LabelSelector: cfg.NsLabel, - }) + watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{}) if err != nil { log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) } From 60f53e65d7ff46724c87579da277366e0185c4e7 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Fri, 1 Aug 2025 13:37:16 +0000 Subject: [PATCH 07/18] test(cicd): debug integration tests --- internal/kubernetes/watcher.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index 4e021b1..c36f9bf 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -19,7 +19,9 @@ import ( // Watches for when statefulsets are created or deleted func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) { - watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{}) + watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{ + LabelSelector: cfg.NsLabel, + }) if err != nil { log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) } From d98b479515d097a56c8153bb43e3e1052274c1e7 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Fri, 1 Aug 2025 13:52:46 +0000 Subject: [PATCH 08/18] test(cicd): debug integration tests --- .github/workflows/integration-tests.yaml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index 2929ad2..4f2ad7e 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -45,7 +45,7 @@ jobs: - name: Label namespace run: | - kubectl label namespace das app.kubernetes.io/part-of=kubeflow-profile + kubectl label namespace anray-liu app.kubernetes.io/part-of=kubeflow-profile - name: Apply configs run: | @@ -95,6 +95,7 @@ jobs: sleep 5 kubectl logs -l app=volume-cleaner-controller -n das --tail 500 kubectl get pvc pvc1 -n anray-liu -o yaml + kubectl get ns anray-liu -o yaml - name: Test label removed run: | From c91f064a37505f103d81786df09b955a9b2a9d90 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Fri, 1 Aug 2025 14:06:21 +0000 Subject: [PATCH 09/18] test(cicd): debug integration tests --- internal/kubernetes/watcher.go | 69 +++++++++++++++++++--------------- 1 file changed, 38 insertions(+), 31 deletions(-) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index c36f9bf..22b635f 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -19,47 +19,54 @@ import ( // Watches for when statefulsets are created or deleted func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) { - watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{ - LabelSelector: cfg.NsLabel, - }) - if err != nil { - log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) - } + // iterate through all pvcs in configured namespace(s) + for _, ns := range NsList(kube, cfg.NsLabel) { + // skip if not in configured namespace + if ns.Name != cfg.Namespace && cfg.Namespace != "" { + continue + } + + go func() { + watcher, err := kube.AppsV1().StatefulSets(ns.Name).Watch(ctx, metav1.ListOptions{}) + if err != nil { + log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) + } - log.Print("[INFO] Watching for statefulset events...") + log.Print("[INFO] Watching for statefulset events...") - // create a channel to capture sts events in the cluster - events := watcher.ResultChan() + // create a channel to capture sts events in the cluster + events := watcher.ResultChan() - for { - select { + for { + select { - // context used to kill loop - // used during unit tests - case <-ctx.Done(): - return + // context used to kill loop + // used during unit tests + case <-ctx.Done(): + return - // sts was added or deleted - case event := <-events: - sts, ok := event.Object.(*appsv1.StatefulSet) + // sts was added or deleted + case event := <-events: + sts, ok := event.Object.(*appsv1.StatefulSet) - // Skip this event if it can't be parsed into a sts - if !ok { - continue - } + // Skip this event if it can't be parsed into a sts + if !ok { + continue + } - switch event.Type { + switch event.Type { - case watch.Added: - // sts added - handleAdded(kube, cfg, sts) - case watch.Deleted: - // sts deleted - handleDeleted(kube, cfg, sts) + case watch.Added: + // sts added + handleAdded(kube, cfg, sts) + case watch.Deleted: + // sts deleted + handleDeleted(kube, cfg, sts) + } + } } - } + }() } - } // scan performed on controller startup to find unattached pvcs and assign labels to them From f5a3acba78e9d674b76ef90322342385fc9537a7 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 12:51:08 +0000 Subject: [PATCH 10/18] feat: use wait group --- internal/kubernetes/watcher.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index 22b635f..114c9dd 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -4,6 +4,7 @@ import ( // standard packages "context" "log" + "sync" "time" // external packages @@ -19,6 +20,8 @@ import ( // Watches for when statefulsets are created or deleted func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) { + var wg sync.WaitGroup + // iterate through all pvcs in configured namespace(s) for _, ns := range NsList(kube, cfg.NsLabel) { // skip if not in configured namespace @@ -26,7 +29,10 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal continue } + wg.Add(1) go func() { + defer wg.Done() + watcher, err := kube.AppsV1().StatefulSets(ns.Name).Watch(ctx, metav1.ListOptions{}) if err != nil { log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) From f3610eabd5bb21c1f685b95c68511047a66ea1df Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 12:57:22 +0000 Subject: [PATCH 11/18] test(cicd): add debug logs --- internal/kubernetes/watcher.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index 114c9dd..c9cd3f3 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -31,6 +31,7 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal wg.Add(1) go func() { + defer log.Printf("[INFO] Watcher for NS %s finished.", ns.Name) defer wg.Done() watcher, err := kube.AppsV1().StatefulSets(ns.Name).Watch(ctx, metav1.ListOptions{}) @@ -38,7 +39,7 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) } - log.Print("[INFO] Watching for statefulset events...") + log.Printf("[INFO] Watching NS %s for statefulset events...", ns.Name) // create a channel to capture sts events in the cluster events := watcher.ResultChan() From 19a718506f6ae284d3a3a3d761f8462d61331450 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 13:04:01 +0000 Subject: [PATCH 12/18] test(cicd): add debug log --- internal/kubernetes/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index c9cd3f3..b6d1c49 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -24,6 +24,7 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal // iterate through all pvcs in configured namespace(s) for _, ns := range NsList(kube, cfg.NsLabel) { + log.Println(ns.Name) // skip if not in configured namespace if ns.Name != cfg.Namespace && cfg.Namespace != "" { continue From 2299d7c8aab994cdac1fd4a704cd8ee8e9643016 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 13:14:17 +0000 Subject: [PATCH 13/18] test(cicd): add debug log --- internal/kubernetes/watcher.go | 1 + 1 file changed, 1 insertion(+) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index b6d1c49..b7083f8 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -25,6 +25,7 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal // iterate through all pvcs in configured namespace(s) for _, ns := range NsList(kube, cfg.NsLabel) { log.Println(ns.Name) + log.Println(cfg.Namespace) // skip if not in configured namespace if ns.Name != cfg.Namespace && cfg.Namespace != "" { continue From 5dd0efa6e9dd5849d630d8b2fbb8356e500f6706 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 13:20:39 +0000 Subject: [PATCH 14/18] test(cicd): add debug log --- internal/kubernetes/watcher.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index b7083f8..4295528 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -31,6 +31,8 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal continue } + log.Println("starting thread") + wg.Add(1) go func() { defer log.Printf("[INFO] Watcher for NS %s finished.", ns.Name) From acdac47c93d262aafd88cdf19939272ebe9d657c Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 17:28:19 +0000 Subject: [PATCH 15/18] test(cicd): add debug log --- internal/kubernetes/watcher.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index 4295528..bb5af28 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -35,6 +35,7 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal wg.Add(1) go func() { + log.Print("inside thread") defer log.Printf("[INFO] Watcher for NS %s finished.", ns.Name) defer wg.Done() @@ -77,6 +78,8 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal } } }() + + log.Print("outside thread") } } From 29f9408b7e0571183dadc1f7825d3b1c1027ca05 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 17:34:19 +0000 Subject: [PATCH 16/18] fix(cicd): add wait group wait --- internal/kubernetes/watcher.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index bb5af28..45a1686 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -80,6 +80,8 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal }() log.Print("outside thread") + + wg.Wait() } } From 5912354cf58e4107ddc033013840a8054600abcd Mon Sep 17 00:00:00 2001 From: anrayliu Date: Tue, 5 Aug 2025 17:58:13 +0000 Subject: [PATCH 17/18] fix: remove debug logs --- internal/kubernetes/watcher.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index 45a1686..3af4ea9 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -24,18 +24,13 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal // iterate through all pvcs in configured namespace(s) for _, ns := range NsList(kube, cfg.NsLabel) { - log.Println(ns.Name) - log.Println(cfg.Namespace) // skip if not in configured namespace if ns.Name != cfg.Namespace && cfg.Namespace != "" { continue } - log.Println("starting thread") - wg.Add(1) go func() { - log.Print("inside thread") defer log.Printf("[INFO] Watcher for NS %s finished.", ns.Name) defer wg.Done() @@ -79,8 +74,6 @@ func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal } }() - log.Print("outside thread") - wg.Wait() } } From d3944f62bb42a493c8424354918fc849140d3ce6 Mon Sep 17 00:00:00 2001 From: anrayliu Date: Wed, 6 Aug 2025 20:31:32 +0000 Subject: [PATCH 18/18] feat(tests): add test for label filter --- internal/kubernetes/retriever_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/internal/kubernetes/retriever_test.go b/internal/kubernetes/retriever_test.go index 6b68314..2c6d023 100644 --- a/internal/kubernetes/retriever_test.go +++ b/internal/kubernetes/retriever_test.go @@ -31,6 +31,8 @@ func TestNsList(t *testing.T) { } } + // check with correct label + list := NsList(kube, "app.kubernetes.io/part-of=kubeflow-profile") // check right length @@ -41,6 +43,22 @@ func TestNsList(t *testing.T) { assert.Equal(t, ns.Name, names[i]) } + // check with incorrect label + + list = NsList(kube, "bad-label") + + assert.Equal(t, len(list), 0) + + // check with empty label + + list = NsList(kube, "") + + assert.Equal(t, len(list), len(names)) + + for i, ns := range list { + assert.Equal(t, ns.Name, names[i]) + } + }) }