diff --git a/.github/workflows/integration-tests.yaml b/.github/workflows/integration-tests.yaml index fd8f005..4f2ad7e 100644 --- a/.github/workflows/integration-tests.yaml +++ b/.github/workflows/integration-tests.yaml @@ -43,6 +43,10 @@ jobs: -f manifests/serviceaccount.yaml \ -f manifests/netpol.yaml + - name: Label namespace + run: | + kubectl label namespace anray-liu app.kubernetes.io/part-of=kubeflow-profile + - name: Apply configs run: | kubectl apply -f manifests/controller/controller_config.yaml \ @@ -89,6 +93,9 @@ jobs: run: | kubectl apply -f testing/sts.yaml sleep 5 + kubectl logs -l app=volume-cleaner-controller -n das --tail 500 + kubectl get pvc pvc1 -n anray-liu -o yaml + kubectl get ns anray-liu -o yaml - name: Test label removed run: | diff --git a/cmd/controller/main.go b/cmd/controller/main.go index aa8bd5a..3f0d9c7 100644 --- a/cmd/controller/main.go +++ b/cmd/controller/main.go @@ -30,6 +30,7 @@ func main() { cfg := structInternal.ControllerConfig{ Namespace: os.Getenv("NAMESPACE"), + NsLabel: os.Getenv("NS_LABEL"), TimeLabel: os.Getenv("TIME_LABEL"), NotifLabel: os.Getenv("NOTIF_LABEL"), TimeFormat: os.Getenv("TIME_FORMAT"), diff --git a/cmd/scheduler/main.go b/cmd/scheduler/main.go index f7ffd77..6114a28 100644 --- a/cmd/scheduler/main.go +++ b/cmd/scheduler/main.go @@ -26,6 +26,7 @@ func main() { // there is also a config for the controller cfg := structInternal.SchedulerConfig{ Namespace: os.Getenv("NAMESPACE"), + NsLabel: os.Getenv("NS_LABEL"), TimeLabel: os.Getenv("TIME_LABEL"), NotifLabel: os.Getenv("NOTIF_LABEL"), TimeFormat: os.Getenv("TIME_FORMAT"), diff --git a/internal/kubernetes/finder.go b/internal/kubernetes/finder.go index 44a2432..8ce2886 100644 --- a/internal/kubernetes/finder.go +++ b/internal/kubernetes/finder.go @@ -34,101 +34,107 @@ func FindStale(kube kubernetes.Interface, cfg structInternal.SchedulerConfig) (i log.Print("[INFO] Scanning for stale PVCS...") // iterate through all pvcs in configured namespace(s) - - for _, pvc := range PvcList(kube, cfg.Namespace) { - log.Printf("[INFO] Found PVC %s from NS %s", pvc.Name, pvc.Namespace) - - // check if label exists (meaning pvc is unattached) - // if pvc is attached to a sts, it would've had its label removed by the controller - - timestamp, ok := pvc.Labels[cfg.TimeLabel] - if !ok { - log.Printf("[INFO] Label %s not found. Skipping.", cfg.TimeLabel) + for _, ns := range NsList(kube, cfg.NsLabel) { + // skip if not in configured namespace + if ns.Name != cfg.Namespace && cfg.Namespace != "" { continue } - // check if pvc should be deleted - stale, staleError := IsStale(timestamp, cfg.TimeFormat, cfg.GracePeriod) - if staleError != nil { - log.Printf("[ERROR] Failed to parse timestamp: %s", staleError) - errCount++ - continue - } + for _, pvc := range PvcList(kube, ns.Name) { + log.Printf("[INFO] Found PVC %s from NS %s", pvc.Name, pvc.Namespace) - // stale means grace period has passed, can be deleted - if stale { - if cfg.DryRun { - log.Printf("[DRY RUN] Delete PVC %s", pvc.Name) - deleteCount++ + // check if label exists (meaning pvc is unattached) + // if pvc is attached to a sts, it would've had its label removed by the controller + + timestamp, ok := pvc.Labels[cfg.TimeLabel] + if !ok { + log.Printf("[INFO] Label %s not found. Skipping.", cfg.TimeLabel) continue } - err := kube.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{}) - if err != nil { - log.Printf("[ERROR] Failed to delete PVC %s: %s", pvc.Name, err) + // check if pvc should be deleted + stale, staleError := IsStale(timestamp, cfg.TimeFormat, cfg.GracePeriod) + if staleError != nil { + log.Printf("[ERROR] Failed to parse timestamp: %s", staleError) errCount++ continue } - log.Print("[INFO] PVC successfully deleted.") - deleteCount++ - - } else { - // not stale yet, handle email logic here - - log.Print("[INFO] Grace period not passed.") + // stale means grace period has passed, can be deleted + if stale { + if cfg.DryRun { + log.Printf("[DRY RUN] Delete PVC %s", pvc.Name) + deleteCount++ + continue + } - notifCount, ok := pvc.Labels[cfg.NotifLabel] - if !ok { - log.Printf("[INFO] Label %s not found. Skipping.", cfg.NotifLabel) - errCount++ - continue - } + err := kube.CoreV1().PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, metav1.DeleteOptions{}) + if err != nil { + log.Printf("[ERROR] Failed to delete PVC %s: %s", pvc.Name, err) + errCount++ + continue + } - currNotif, countErr := strconv.Atoi(notifCount) - if countErr != nil { - log.Printf("[ERROR] Failed to parse notification count: %v", countErr) - errCount++ - continue - } + log.Print("[INFO] PVC successfully deleted.") + deleteCount++ - if len(cfg.NotifTimes) == 0 { - continue - } + } else { + // not stale yet, handle email logic here - shouldSend, mailError := ShouldSendMail(timestamp, currNotif, cfg) - if mailError != nil { - log.Printf("[ERROR] Failed to parse timestamp: %s", mailError) - errCount++ - continue - } + log.Print("[INFO] Grace period not passed.") - if shouldSend { - if cfg.DryRun { - log.Print("[DRY RUN] Email owner.") - emailCount++ + notifCount, ok := pvc.Labels[cfg.NotifLabel] + if !ok { + log.Printf("[INFO] Label %s not found. Skipping.", cfg.NotifLabel) + errCount++ continue } - // personal consists of details passed into the email template as variables while email is - // the email address that is consistent regardless of the template + currNotif, countErr := strconv.Atoi(notifCount) + if countErr != nil { + log.Printf("[ERROR] Failed to parse notification count: %v", countErr) + errCount++ + continue + } - email, personal := utilsInternal.EmailDetails(kube, pvc, cfg.GracePeriod) + if len(cfg.NotifTimes) == 0 { + continue + } - err := utilsInternal.SendNotif(client, cfg.EmailCfg, email, personal) - if err != nil { - log.Printf("[Error] Unable to send an email to %s at %s", personal.Name, email) + shouldSend, mailError := ShouldSendMail(timestamp, currNotif, cfg) + if mailError != nil { + log.Printf("[ERROR] Failed to parse timestamp: %s", mailError) errCount++ continue } - // Update Email Count - emailCount++ + if shouldSend { + if cfg.DryRun { + log.Print("[DRY RUN] Email owner.") + emailCount++ + continue + } + + // personal consists of details passed into the email template as variables while email is + // the email address that is consistent regardless of the template - // Increment notification count by 1 - newNotifCount := strconv.Itoa(currNotif + 1) - SetPvcLabel(kube, cfg.NotifLabel, newNotifCount, pvc.Namespace, pvc.Name) + email, personal := utilsInternal.EmailDetails(kube, pvc, cfg.GracePeriod) + err := utilsInternal.SendNotif(client, cfg.EmailCfg, email, personal) + if err != nil { + log.Printf("[Error] Unable to send an email to %s at %s", personal.Name, email) + errCount++ + continue + } + + // Update Email Count + emailCount++ + + // Increment notification count by 1 + newNotifCount := strconv.Itoa(currNotif + 1) + SetPvcLabel(kube, cfg.NotifLabel, newNotifCount, pvc.Namespace, pvc.Name) + + } } } } diff --git a/internal/kubernetes/retriever.go b/internal/kubernetes/retriever.go index 89dec16..b81a2c4 100644 --- a/internal/kubernetes/retriever.go +++ b/internal/kubernetes/retriever.go @@ -17,9 +17,9 @@ import ( // returns a slice of corev1.Namespace structs -func NsList(kube kubernetes.Interface) []corev1.Namespace { +func NsList(kube kubernetes.Interface, label string) []corev1.Namespace { ns, err := kube.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{ - LabelSelector: "app.kubernetes.io/part-of=kubeflow-profile", + LabelSelector: label, }) if err != nil { // nothing can be done without namespaces so crash the program @@ -74,7 +74,7 @@ func FindUnattachedPVCs(kube kubernetes.Interface, cfg structInternal.Controller log.Print("[INFO] Scanning namespaces...") - for _, namespace := range NsList(kube) { + for _, namespace := range NsList(kube, cfg.NsLabel) { // skip if not in configured namespace if namespace.Name != cfg.Namespace && cfg.Namespace != "" { continue diff --git a/internal/kubernetes/retriever_test.go b/internal/kubernetes/retriever_test.go index 5f1f3ac..2c6d023 100644 --- a/internal/kubernetes/retriever_test.go +++ b/internal/kubernetes/retriever_test.go @@ -31,7 +31,9 @@ func TestNsList(t *testing.T) { } } - list := NsList(kube) + // check with correct label + + list := NsList(kube, "app.kubernetes.io/part-of=kubeflow-profile") // check right length assert.Equal(t, len(list), len(names)) @@ -41,6 +43,22 @@ func TestNsList(t *testing.T) { assert.Equal(t, ns.Name, names[i]) } + // check with incorrect label + + list = NsList(kube, "bad-label") + + assert.Equal(t, len(list), 0) + + // check with empty label + + list = NsList(kube, "") + + assert.Equal(t, len(list), len(names)) + + for i, ns := range list { + assert.Equal(t, ns.Name, names[i]) + } + }) } diff --git a/internal/kubernetes/watcher.go b/internal/kubernetes/watcher.go index ed600eb..3af4ea9 100644 --- a/internal/kubernetes/watcher.go +++ b/internal/kubernetes/watcher.go @@ -4,6 +4,7 @@ import ( // standard packages "context" "log" + "sync" "time" // external packages @@ -19,45 +20,62 @@ import ( // Watches for when statefulsets are created or deleted func WatchSts(ctx context.Context, kube kubernetes.Interface, cfg structInternal.ControllerConfig) { - watcher, err := kube.AppsV1().StatefulSets(cfg.Namespace).Watch(ctx, metav1.ListOptions{}) - if err != nil { - log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) - } + var wg sync.WaitGroup + + // iterate through all pvcs in configured namespace(s) + for _, ns := range NsList(kube, cfg.NsLabel) { + // skip if not in configured namespace + if ns.Name != cfg.Namespace && cfg.Namespace != "" { + continue + } - log.Print("[INFO] Watching for statefulset events...") + wg.Add(1) + go func() { + defer log.Printf("[INFO] Watcher for NS %s finished.", ns.Name) + defer wg.Done() - // create a channel to capture sts events in the cluster - events := watcher.ResultChan() + watcher, err := kube.AppsV1().StatefulSets(ns.Name).Watch(ctx, metav1.ListOptions{}) + if err != nil { + log.Fatalf("[ERROR] Failed to create watcher for statefulsets: %s", err) + } - for { - select { + log.Printf("[INFO] Watching NS %s for statefulset events...", ns.Name) - // context used to kill loop - // used during unit tests - case <-ctx.Done(): - return + // create a channel to capture sts events in the cluster + events := watcher.ResultChan() - // sts was added or deleted - case event := <-events: - sts, ok := event.Object.(*appsv1.StatefulSet) + for { + select { - // Skip this event if it can't be parsed into a sts - if !ok { - continue - } + // context used to kill loop + // used during unit tests + case <-ctx.Done(): + return + + // sts was added or deleted + case event := <-events: + sts, ok := event.Object.(*appsv1.StatefulSet) - switch event.Type { + // Skip this event if it can't be parsed into a sts + if !ok { + continue + } - case watch.Added: - // sts added - handleAdded(kube, cfg, sts) - case watch.Deleted: - // sts deleted - handleDeleted(kube, cfg, sts) + switch event.Type { + + case watch.Added: + // sts added + handleAdded(kube, cfg, sts) + case watch.Deleted: + // sts deleted + handleDeleted(kube, cfg, sts) + } + } } - } - } + }() + wg.Wait() + } } // scan performed on controller startup to find unattached pvcs and assign labels to them @@ -89,7 +107,12 @@ func InitialScan(kube kubernetes.Interface, cfg structInternal.ControllerConfig) func ResetLabels(kube kubernetes.Interface, cfg structInternal.ControllerConfig) { log.Print("Resetting labels...") - for _, namespace := range NsList(kube) { + for _, namespace := range NsList(kube, cfg.NsLabel) { + // skip if not in configured namespace + if namespace.Name != cfg.Namespace && cfg.Namespace != "" { + continue + } + for _, pvc := range PvcList(kube, namespace.Name) { _, ok := pvc.Labels[cfg.TimeLabel] if ok { diff --git a/internal/structure/configs.go b/internal/structure/configs.go index 005526a..e04218d 100644 --- a/internal/structure/configs.go +++ b/internal/structure/configs.go @@ -29,6 +29,7 @@ API_KEY: "Random APIKEY", type ControllerConfig struct { Namespace string + NsLabel string TimeLabel string NotifLabel string TimeFormat string @@ -38,6 +39,7 @@ type ControllerConfig struct { type SchedulerConfig struct { Namespace string + NsLabel string TimeLabel string NotifLabel string TimeFormat string diff --git a/internal/utils/email.go b/internal/utils/email.go index 9c7712e..2c2c01c 100644 --- a/internal/utils/email.go +++ b/internal/utils/email.go @@ -74,19 +74,16 @@ func SendNotif(client *http.Client, conf structInternal.EmailConfig, email strin // given a pvc, this function will aquire the details related to the pvc such as the owner of the pvc, their email, the bounded volume name and ID, and details about its deletion func EmailDetails(kube kubernetes.Interface, pvc corev1.PersistentVolumeClaim, gracePeriod int) (string, structInternal.Personalisation) { - ns := pvc.Namespace - vol := pvc.Spec.VolumeName - // Acquire User Email - email := nsEmail(kube, ns) + email := nsEmail(kube, pvc.Namespace) // Calculate DeletionDate now := time.Now() futureTime := now.Add(time.Duration(gracePeriod) * 24 * time.Hour) personal := structInternal.Personalisation{ - Name: ns, - VolumeName: vol, + Name: pvc.Namespace, + VolumeName: pvc.Spec.VolumeName, GracePeriod: fmt.Sprintf("%d", gracePeriod), DeletionDate: futureTime.Format(time.UnixDate), } diff --git a/internal/utils/time.go b/internal/utils/time.go index 2cc3ccf..86eb431 100644 --- a/internal/utils/time.go +++ b/internal/utils/time.go @@ -15,7 +15,7 @@ func ParseNotifTimes(str string) []int { var intSlice []int if str == "" { - return []int{} + return make([]int, 0) } // use fields() and join() to get rid of all whitespace diff --git a/internal/utils/time_test.go b/internal/utils/time_test.go index 8d7c1cd..e370ecf 100644 --- a/internal/utils/time_test.go +++ b/internal/utils/time_test.go @@ -48,7 +48,7 @@ func TestParseNotifTimes(t *testing.T) { { name: "empty string", input: "", - expected: []int{}, // Assuming empty string results in an empty slice + expected: make([]int, 0), // Assuming empty string results in an empty slice }, } diff --git a/manifests/controller/controller_config.yaml b/manifests/controller/controller_config.yaml index e8f9bb4..619b85a 100644 --- a/manifests/controller/controller_config.yaml +++ b/manifests/controller/controller_config.yaml @@ -6,6 +6,7 @@ metadata: namespace: das data: NAMESPACE: "anray-liu" + NS_LABEL: "app.kubernetes.io/part-of=kubeflow-profile" TIME_LABEL: "volume-cleaner/unattached-time" NOTIF_LABEL: "volume-cleaner/notification-count" TIME_FORMAT: "2006-01-02_15-04-05Z" diff --git a/manifests/scheduler/scheduler_config.yaml b/manifests/scheduler/scheduler_config.yaml index ad7ed2d..b89b7fa 100644 --- a/manifests/scheduler/scheduler_config.yaml +++ b/manifests/scheduler/scheduler_config.yaml @@ -6,6 +6,7 @@ metadata: namespace: das data: NAMESPACE: "anray-liu" + NS_LABEL: "app.kubernetes.io/part-of=kubeflow-profile" TIME_LABEL: "volume-cleaner/unattached-time" NOTIF_LABEL: "volume-cleaner/notification-count" GRACE_PERIOD: "7"