From 4cb542a06faf0fe0be17968d8c682f66691966f1 Mon Sep 17 00:00:00 2001 From: Vladimir Iliakov Date: Tue, 13 Jan 2026 08:35:11 +0100 Subject: [PATCH 1/3] STAC-24078: Handle parallel execution --- ARCHITECTURE.md | 45 ++- README.md | 29 +- cmd/clickhouse/check_and_finalize.go | 6 +- cmd/clickhouse/restore.go | 12 +- cmd/cmdutils/common.go | 18 +- cmd/elasticsearch/check_and_finalize.go | 10 +- cmd/elasticsearch/restore.go | 13 +- cmd/settings/check_and_finalize.go | 2 +- cmd/settings/restore.go | 16 +- cmd/stackgraph/check_and_finalize.go | 2 +- cmd/stackgraph/restore.go | 16 +- cmd/victoriametrics/check_and_finalize.go | 2 +- cmd/victoriametrics/restore.go | 16 +- internal/clients/k8s/client.go | 195 +++++++++++ internal/foundation/config/config.go | 21 ++ internal/foundation/logger/logger.go | 8 +- internal/foundation/logger/logger_test.go | 8 +- .../orchestration/restorelock/datastore.go | 62 ++++ .../restorelock/datastore_test.go | 141 ++++++++ internal/orchestration/restorelock/lock.go | 160 +++++++++ .../orchestration/restorelock/lock_test.go | 314 ++++++++++++++++++ internal/orchestration/scale/scale.go | 70 ++++ 22 files changed, 1108 insertions(+), 58 deletions(-) create mode 100644 internal/orchestration/restorelock/datastore.go create mode 100644 internal/orchestration/restorelock/datastore_test.go create mode 100644 internal/orchestration/restorelock/lock.go create mode 100644 internal/orchestration/restorelock/lock_test.go diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index d8fde2a..7228422 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -39,7 +39,8 @@ stackstate-backup-cli/ │ ├── orchestration/ # Layer 2: Workflows │ │ ├── portforward/ # Port-forwarding orchestration │ │ ├── scale/ # Deployment/StatefulSet scaling workflows -│ │ └── restore/ # Restore job orchestration +│ │ ├── restore/ # Restore job orchestration +│ │ └── restorelock/ # Restore lock mechanism (prevents parallel restores) │ │ │ ├── app/ # Layer 3: Dependency Container │ │ └── app.go # Application context and dependency injection @@ -126,6 +127,7 @@ appCtx.Formatter - `portforward/`: Manages Kubernetes port-forwarding lifecycle - `scale/`: Deployment and StatefulSet scaling workflows with detailed logging - `restore/`: Restore job orchestration (confirmation, job lifecycle, finalization, resource management) +- `restorelock/`: Prevents parallel restore operations using Kubernetes annotations **Dependency Rules**: - ✅ Can import: `internal/foundation/*`, `internal/clients/*` @@ -317,16 +319,45 @@ Log: log, ### 7. Structured Logging -All operations use structured logging with consistent levels: +All operations use structured logging with consistent levels and emoji prefixes for visual clarity: ```go -log.Infof("Starting operation...") -log.Debugf("Detail: %v", detail) -log.Warningf("Non-fatal issue: %v", warning) -log.Errorf("Operation failed: %v", err) -log.Successf("Operation completed successfully") +log.Infof("Starting operation...") // No prefix +log.Debugf("Detail: %v", detail) // 🛠️ DEBUG: +log.Warningf("Non-fatal issue: %v", warning) // ⚠️ Warning: +log.Errorf("Operation failed: %v", err) // ❌ Error: +log.Successf("Operation completed") // ✅ ``` +### 8. 
Restore Lock Pattern + +The `restorelock` package prevents parallel restore operations that could corrupt data: + +```go +// Scale down with automatic lock acquisition +scaledApps, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ + K8sClient: k8sClient, + Namespace: namespace, + LabelSelector: selector, + Datastore: restorelock.DatastoreStackgraph, + AllSelectors: config.GetAllScaleDownSelectors(), + Log: log, +}) + +// Scale up and release lock +defer scale.ScaleUpAndReleaseLock(k8sClient, namespace, selector, log) +``` + +**How it works**: +1. Before scaling down, checks for existing restore locks on Deployments/StatefulSets +2. Detects conflicts for the same datastore or mutually exclusive datastores (e.g., Stackgraph and Settings) +3. Sets annotations (`stackstate.com/restore-in-progress`, `stackstate.com/restore-started-at`) on resources +4. Releases locks when scaling up or on failure + +**Mutual Exclusion Groups**: +- Stackgraph and Settings restores are mutually exclusive (both modify HBase data) +- Other datastores (Elasticsearch, ClickHouse, VictoriaMetrics) are independent + ## Testing Strategy ### Unit Tests diff --git a/README.md b/README.md index 941c8b6..ce256d4 100644 --- a/README.md +++ b/README.md @@ -391,11 +391,12 @@ See [internal/foundation/config/testdata/validConfigMapConfig.yaml](internal/fou │ ├── orchestration/ # Layer 2: Workflows │ │ ├── portforward/ # Port-forwarding lifecycle │ │ ├── scale/ # Deployment/StatefulSet scaling -│ │ └── restore/ # Restore job orchestration -│ │ ├── confirmation.go # User confirmation prompts -│ │ ├── finalize.go # Job status check and cleanup -│ │ ├── job.go # Job lifecycle management -│ │ └── resources.go # Restore resource management +│ │ ├── restore/ # Restore job orchestration +│ │ │ ├── confirmation.go # User confirmation prompts +│ │ │ ├── finalize.go # Job status check and cleanup +│ │ │ ├── job.go # Job lifecycle management +│ │ │ └── resources.go # Restore resource management +│ │ └── restorelock/ # Parallel restore prevention │ ├── app/ # Layer 3: Dependency container │ │ └── app.go # Application context and DI │ └── scripts/ # Embedded bash scripts @@ -409,6 +410,24 @@ See [internal/foundation/config/testdata/validConfigMapConfig.yaml](internal/fou - **Dependency Injection**: Centralized dependency creation via `internal/app/` eliminates boilerplate from commands - **Testability**: All layers use interfaces for external dependencies, enabling comprehensive unit testing - **Clean Commands**: Commands are thin (50-100 lines) and focused on business logic +- **Restore Lock Protection**: Prevents parallel restore operations that could corrupt data + +### Restore Lock Protection + +The CLI prevents parallel restore operations that could corrupt data by using Kubernetes annotations on Deployments and StatefulSets. When a restore starts: + +1. The CLI checks for existing restore locks before proceeding +2. If another restore is in progress for the same datastore, the operation is blocked +3. Mutually exclusive datastores are also protected (e.g., Stackgraph and Settings cannot restore simultaneously because they share HBase data) + +If a restore operation is interrupted or fails, the lock annotations may remain. To manually remove a stuck lock: + +```bash +kubectl annotate deployment,statefulset -l \ + stackstate.com/restore-in-progress- \ + stackstate.com/restore-started-at- \ + -n +``` See [ARCHITECTURE.md](ARCHITECTURE.md) for detailed information about the layered architecture and design patterns. 
diff --git a/cmd/clickhouse/check_and_finalize.go b/cmd/clickhouse/check_and_finalize.go index da054e3..3c9d0cf 100644 --- a/cmd/clickhouse/check_and_finalize.go +++ b/cmd/clickhouse/check_and_finalize.go @@ -127,7 +127,7 @@ func waitAndFinalize(appCtx *app.Context, chClient clickhouse.Interface, operati return finalizeRestore(appCtx) } -// finalizeRestore finalizes the restore by executing SQL and scaling up +// finalizeRestore finalizes the restore by executing SQL, scaling up, and releasing lock func finalizeRestore(appCtx *app.Context) error { if err := executePostRestoreSQL(appCtx); err != nil { appCtx.Logger.Warningf("Post-restore SQL failed: %v", err) @@ -135,13 +135,13 @@ func finalizeRestore(appCtx *app.Context) error { appCtx.Logger.Println() scaleSelector := appCtx.Config.Clickhouse.Restore.ScaleDownLabelSelector - if err := scale.ScaleUpFromAnnotations( + if err := scale.ScaleUpAndReleaseLock( appCtx.K8sClient, appCtx.Namespace, scaleSelector, appCtx.Logger, ); err != nil { - return fmt.Errorf("failed to scale up: %w", err) + return fmt.Errorf("failed to scale up and release lock: %w", err) } appCtx.Logger.Println() diff --git a/cmd/clickhouse/restore.go b/cmd/clickhouse/restore.go index b92acc7..2a82256 100644 --- a/cmd/clickhouse/restore.go +++ b/cmd/clickhouse/restore.go @@ -10,6 +10,7 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" ) @@ -69,10 +70,17 @@ func runRestore(appCtx *app.Context) error { } } - // Scale down deployments/statefulsets before restore + // Scale down deployments/statefulsets before restore (with lock protection) appCtx.Logger.Println() scaleDownLabelSelector := appCtx.Config.Clickhouse.Restore.ScaleDownLabelSelector - _, err := scale.ScaleDown(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger) + _, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + LabelSelector: scaleDownLabelSelector, + Datastore: restorelock.DatastoreClickhouse, + AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), + Log: appCtx.Logger, + }) if err != nil { return err } diff --git a/cmd/cmdutils/common.go b/cmd/cmdutils/common.go index d511ef2..ddb538f 100644 --- a/cmd/cmdutils/common.go +++ b/cmd/cmdutils/common.go @@ -1,9 +1,7 @@ package cmdutils import ( - "errors" "fmt" - "io" "os" "github.com/stackvista/stackstate-backup-cli/internal/app" @@ -18,19 +16,15 @@ const ( func Run(globalFlags *config.CLIGlobalFlags, runFunc func(ctx *app.Context) error, minioRequired bool) { appCtx, err := app.NewContext(globalFlags) if err != nil { - exitWithError(err, os.Stderr) + _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + os.Exit(1) } if minioRequired && !appCtx.Config.Minio.Enabled { - exitWithError(errors.New("commands that interact with Minio require SUSE Observability to be deployed with .Values.global.backup.enabled=true"), os.Stderr) + appCtx.Logger.Errorf("commands that interact with Minio require SUSE Observability to be deployed with .Values.global.backup.enabled=true") + os.Exit(1) } if err := runFunc(appCtx); err != nil { - exitWithError(err, os.Stderr) + appCtx.Logger.Errorf(err.Error()) + os.Exit(1) } } - -// ExitWithError 
prints an error message to the writer and exits with status code 1. -// This is a helper function to avoid repeating error handling code in commands. -func exitWithError(err error, w io.Writer) { - _, _ = fmt.Fprintf(w, "error: %v\n", err) - os.Exit(1) -} diff --git a/cmd/elasticsearch/check_and_finalize.go b/cmd/elasticsearch/check_and_finalize.go index 13960c9..9acb5ec 100644 --- a/cmd/elasticsearch/check_and_finalize.go +++ b/cmd/elasticsearch/check_and_finalize.go @@ -113,20 +113,22 @@ func waitAndFinalize(appCtx *app.Context, repository, snapshotName string) error return finalizeRestore(appCtx) } -// finalizeRestore performs post-restore finalization (scale up deployments) +// finalizeRestore performs post-restore finalization (scale up deployments and release lock) func finalizeRestore(appCtx *app.Context) error { appCtx.Logger.Println() + labelSelector := appCtx.Config.Elasticsearch.Restore.ScaleDownLabelSelector scaleUpFn := func() error { - return scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Elasticsearch.Restore.ScaleDownLabelSelector, appCtx.Logger) + return scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace, labelSelector, appCtx.Logger) } return restore.FinalizeRestore(scaleUpFn, appCtx.Logger) } -// attemptScaleUp tries to scale up deployments (used when restore is not found/already complete) +// attemptScaleUp tries to scale up deployments and release lock (used when restore is not found/already complete) func attemptScaleUp(appCtx *app.Context) error { + labelSelector := appCtx.Config.Elasticsearch.Restore.ScaleDownLabelSelector scaleUpFn := func() error { - return scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Elasticsearch.Restore.ScaleDownLabelSelector, appCtx.Logger) + return scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace, labelSelector, appCtx.Logger) } if err := scaleUpFn(); err != nil { diff --git a/cmd/elasticsearch/restore.go b/cmd/elasticsearch/restore.go index 2e42dd0..ce8ae22 100644 --- a/cmd/elasticsearch/restore.go +++ b/cmd/elasticsearch/restore.go @@ -14,6 +14,7 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" ) @@ -91,9 +92,17 @@ func runRestore(appCtx *app.Context) error { } } - // Scale down deployments before restore + // Scale down deployments before restore (with lock protection) appCtx.Logger.Println() - _, err = scale.ScaleDown(appCtx.K8sClient, appCtx.Namespace, appCtx.Config.Elasticsearch.Restore.ScaleDownLabelSelector, appCtx.Logger) + scaleDownLabelSelector := appCtx.Config.Elasticsearch.Restore.ScaleDownLabelSelector + _, err = scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + LabelSelector: scaleDownLabelSelector, + Datastore: restorelock.DatastoreElasticsearch, + AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), + Log: appCtx.Logger, + }) if err != nil { return err } diff --git a/cmd/settings/check_and_finalize.go b/cmd/settings/check_and_finalize.go index 97ea29c..c923fb5 100644 --- a/cmd/settings/check_and_finalize.go +++ b/cmd/settings/check_and_finalize.go @@ -48,7 +48,7 @@ func runCheckAndFinalize(appCtx *app.Context) error { 
Namespace: appCtx.Namespace, JobName: checkJobName, ServiceName: "settings", - ScaleUpFn: scale.ScaleUpFromAnnotations, + ScaleUpFn: scale.ScaleUpAndReleaseLock, ScaleDownFn: scale.ScaleDown, ScaleSelector: appCtx.Config.Settings.Restore.ScaleDownLabelSelector, CleanupPVC: false, diff --git a/cmd/settings/restore.go b/cmd/settings/restore.go index 7442bac..b6c344e 100644 --- a/cmd/settings/restore.go +++ b/cmd/settings/restore.go @@ -12,6 +12,7 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" corev1 "k8s.io/api/core/v1" ) @@ -78,19 +79,26 @@ func runRestore(appCtx *app.Context) error { } } - // Scale down deployments before restore + // Scale down deployments before restore (with lock protection) appCtx.Logger.Println() scaleDownLabelSelector := appCtx.Config.Settings.Restore.ScaleDownLabelSelector - scaledDeployments, err := scale.ScaleDown(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger) + scaledDeployments, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + LabelSelector: scaleDownLabelSelector, + Datastore: restorelock.DatastoreSettings, + AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), + Log: appCtx.Logger, + }) if err != nil { return err } - // Ensure deployments are scaled back up on exit (even if restore fails) + // Ensure deployments are scaled back up and lock released on exit (even if restore fails) defer func() { if len(scaledDeployments) > 0 && !background { appCtx.Logger.Println() - if err := scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { + if err := scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { appCtx.Logger.Warningf("Failed to scale up deployments: %v", err) } } diff --git a/cmd/stackgraph/check_and_finalize.go b/cmd/stackgraph/check_and_finalize.go index 8ab92e1..2895eb0 100644 --- a/cmd/stackgraph/check_and_finalize.go +++ b/cmd/stackgraph/check_and_finalize.go @@ -48,7 +48,7 @@ func runCheckAndFinalize(appCtx *app.Context) error { Namespace: appCtx.Namespace, JobName: checkJobName, ServiceName: "stackgraph", - ScaleUpFn: scale.ScaleUpFromAnnotations, + ScaleUpFn: scale.ScaleUpAndReleaseLock, ScaleDownFn: scale.ScaleDown, ScaleSelector: appCtx.Config.Stackgraph.Restore.ScaleDownLabelSelector, CleanupPVC: true, diff --git a/cmd/stackgraph/restore.go b/cmd/stackgraph/restore.go index cbf2d17..04003a2 100644 --- a/cmd/stackgraph/restore.go +++ b/cmd/stackgraph/restore.go @@ -18,6 +18,7 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" corev1 "k8s.io/api/core/v1" ) @@ -84,19 +85,26 @@ func runRestore(appCtx *app.Context) error { } } - // Scale down deployments before restore + // Scale down deployments before restore (with lock protection) 
appCtx.Logger.Println() scaleDownLabelSelector := appCtx.Config.Stackgraph.Restore.ScaleDownLabelSelector - scaledDeployments, err := scale.ScaleDown(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger) + scaledDeployments, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + LabelSelector: scaleDownLabelSelector, + Datastore: restorelock.DatastoreStackgraph, + AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), + Log: appCtx.Logger, + }) if err != nil { return err } - // Ensure deployments are scaled back up on exit (even if restore fails) + // Ensure deployments are scaled back up and lock released on exit (even if restore fails) defer func() { if len(scaledDeployments) > 0 && !background { appCtx.Logger.Println() - if err := scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { + if err := scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { appCtx.Logger.Warningf("Failed to scale up deployments: %v", err) } } diff --git a/cmd/victoriametrics/check_and_finalize.go b/cmd/victoriametrics/check_and_finalize.go index 50875d7..a20b433 100644 --- a/cmd/victoriametrics/check_and_finalize.go +++ b/cmd/victoriametrics/check_and_finalize.go @@ -48,7 +48,7 @@ func runCheckAndFinalize(appCtx *app.Context) error { Namespace: appCtx.Namespace, JobName: checkJobName, ServiceName: "victoria-metrics", - ScaleUpFn: scale.ScaleUpFromAnnotations, + ScaleUpFn: scale.ScaleUpAndReleaseLock, ScaleDownFn: scale.ScaleDown, ScaleSelector: appCtx.Config.VictoriaMetrics.Restore.ScaleDownLabelSelector, CleanupPVC: false, diff --git a/cmd/victoriametrics/restore.go b/cmd/victoriametrics/restore.go index 4a7ecae..cc84068 100644 --- a/cmd/victoriametrics/restore.go +++ b/cmd/victoriametrics/restore.go @@ -17,6 +17,7 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" corev1 "k8s.io/api/core/v1" ) @@ -82,19 +83,26 @@ func runRestore(appCtx *app.Context) error { } } - // Scale down workload before restore + // Scale down workload before restore (with lock protection) appCtx.Logger.Println() scaleDownLabelSelector := appCtx.Config.VictoriaMetrics.Restore.ScaleDownLabelSelector - scaledStatefulSets, err := scale.ScaleDown(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger) + scaledStatefulSets, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ + K8sClient: appCtx.K8sClient, + Namespace: appCtx.Namespace, + LabelSelector: scaleDownLabelSelector, + Datastore: restorelock.DatastoreVictoriaMetrics, + AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), + Log: appCtx.Logger, + }) if err != nil { return err } - // Ensure workload are scaled back up on exit (even if restore fails) + // Ensure workload are scaled back up and lock released on exit (even if restore fails) defer func() { if len(scaledStatefulSets) > 0 && !background { appCtx.Logger.Println() - if err := scale.ScaleUpFromAnnotations(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { + if err := 
scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace, scaleDownLabelSelector, appCtx.Logger); err != nil { appCtx.Logger.Warningf("Failed to scale up workload: %v", err) } } diff --git a/internal/clients/k8s/client.go b/internal/clients/k8s/client.go index 5a5a597..497f7c3 100644 --- a/internal/clients/k8s/client.go +++ b/internal/clients/k8s/client.go @@ -150,6 +150,12 @@ func (c *Client) PortForwardPod(namespace, podName string, localPort, remotePort const ( // PreRestoreReplicasAnnotation is the annotation key used to store original replica counts PreRestoreReplicasAnnotation = "stackstate.com/pre-restore-replicas" + + // RestoreInProgressAnnotation is the annotation key used to track which datastore restore is in progress + RestoreInProgressAnnotation = "stackstate.com/restore-in-progress" + + // RestoreStartedAtAnnotation is the annotation key used to track when the restore started + RestoreStartedAtAnnotation = "stackstate.com/restore-started-at" ) // AppsScale holds the name and original replica count of a scalable resource @@ -158,6 +164,14 @@ type AppsScale struct { Replicas int32 } +// RestoreLockInfo holds information about an active restore lock on a resource +type RestoreLockInfo struct { + ResourceKind string // "Deployment" or "StatefulSet" + ResourceName string + Datastore string + StartedAt string +} + // ScalableResource abstracts operations on resources that can be scaled type ScalableResource interface { GetName() string @@ -413,3 +427,184 @@ func NewTestClient(clientset kubernetes.Interface) *Client { debug: false, } } + +// GetRestoreLocks returns all active restore locks on Deployments and StatefulSets +// matching the given label selector +func (c *Client) GetRestoreLocks(namespace, labelSelector string) ([]RestoreLockInfo, error) { + ctx := context.Background() + var locks []RestoreLockInfo + + // Check Deployments + deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list deployments: %w", err) + } + + for _, dep := range deployments.Items { + if datastore, ok := dep.Annotations[RestoreInProgressAnnotation]; ok { + locks = append(locks, RestoreLockInfo{ + ResourceKind: "Deployment", + ResourceName: dep.Name, + Datastore: datastore, + StartedAt: dep.Annotations[RestoreStartedAtAnnotation], + }) + } + } + + // Check StatefulSets + statefulSets, err := c.clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return nil, fmt.Errorf("failed to list statefulsets: %w", err) + } + + for _, sts := range statefulSets.Items { + if datastore, ok := sts.Annotations[RestoreInProgressAnnotation]; ok { + locks = append(locks, RestoreLockInfo{ + ResourceKind: "StatefulSet", + ResourceName: sts.Name, + Datastore: datastore, + StartedAt: sts.Annotations[RestoreStartedAtAnnotation], + }) + } + } + + return locks, nil +} + +// SetRestoreLock sets the restore lock annotations on Deployments and StatefulSets +// matching the given label selector +func (c *Client) SetRestoreLock(namespace, labelSelector, datastore, startedAt string) error { + ctx := context.Background() + + // Update Deployments + deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return fmt.Errorf("failed to list deployments: %w", err) + } + + for i := range deployments.Items { + dep := &deployments.Items[i] 
+ if dep.Annotations == nil { + dep.Annotations = make(map[string]string) + } + dep.Annotations[RestoreInProgressAnnotation] = datastore + dep.Annotations[RestoreStartedAtAnnotation] = startedAt + + if _, err := c.clientset.AppsV1().Deployments(namespace).Update(ctx, dep, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to set restore lock on deployment %s: %w", dep.Name, err) + } + } + + // Update StatefulSets + statefulSets, err := c.clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return fmt.Errorf("failed to list statefulsets: %w", err) + } + + for i := range statefulSets.Items { + sts := &statefulSets.Items[i] + if sts.Annotations == nil { + sts.Annotations = make(map[string]string) + } + sts.Annotations[RestoreInProgressAnnotation] = datastore + sts.Annotations[RestoreStartedAtAnnotation] = startedAt + + if _, err := c.clientset.AppsV1().StatefulSets(namespace).Update(ctx, sts, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to set restore lock on statefulset %s: %w", sts.Name, err) + } + } + + return nil +} + +// hasRestoreLockAnnotations checks if the given annotations map contains restore lock annotations +func hasRestoreLockAnnotations(annotations map[string]string) bool { + if annotations == nil { + return false + } + _, hasLock := annotations[RestoreInProgressAnnotation] + _, hasStartedAt := annotations[RestoreStartedAtAnnotation] + return hasLock || hasStartedAt +} + +// removeRestoreLockAnnotations removes restore lock annotations from the given map +func removeRestoreLockAnnotations(annotations map[string]string) { + delete(annotations, RestoreInProgressAnnotation) + delete(annotations, RestoreStartedAtAnnotation) +} + +// ClearRestoreLock removes the restore lock annotations from Deployments and StatefulSets +// matching the given label selector +func (c *Client) ClearRestoreLock(namespace, labelSelector string) error { + ctx := context.Background() + + // Update Deployments + deployments, err := c.clientset.AppsV1().Deployments(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return fmt.Errorf("failed to list deployments: %w", err) + } + + for i := range deployments.Items { + if !hasRestoreLockAnnotations(deployments.Items[i].Annotations) { + continue + } + + // Refetch to get latest version (may have been modified by scale-up) + dep, err := c.clientset.AppsV1().Deployments(namespace).Get(ctx, deployments.Items[i].Name, metav1.GetOptions{}) + if err != nil { + return fmt.Errorf("failed to get deployment %s: %w", deployments.Items[i].Name, err) + } + + if dep.Annotations == nil { + continue + } + + removeRestoreLockAnnotations(dep.Annotations) + + if _, err := c.clientset.AppsV1().Deployments(namespace).Update(ctx, dep, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to clear restore lock on deployment %s: %w", dep.Name, err) + } + } + + // Update StatefulSets + statefulSets, err := c.clientset.AppsV1().StatefulSets(namespace).List(ctx, metav1.ListOptions{ + LabelSelector: labelSelector, + }) + if err != nil { + return fmt.Errorf("failed to list statefulsets: %w", err) + } + + for i := range statefulSets.Items { + if !hasRestoreLockAnnotations(statefulSets.Items[i].Annotations) { + continue + } + + // Refetch to get latest version (may have been modified by scale-up) + sts, err := c.clientset.AppsV1().StatefulSets(namespace).Get(ctx, statefulSets.Items[i].Name, metav1.GetOptions{}) + if err != 
nil { + return fmt.Errorf("failed to get statefulset %s: %w", statefulSets.Items[i].Name, err) + } + + if sts.Annotations == nil { + continue + } + + removeRestoreLockAnnotations(sts.Annotations) + + if _, err := c.clientset.AppsV1().StatefulSets(namespace).Update(ctx, sts, metav1.UpdateOptions{}); err != nil { + return fmt.Errorf("failed to clear restore lock on statefulset %s: %w", sts.Name, err) + } + } + + return nil +} diff --git a/internal/foundation/config/config.go b/internal/foundation/config/config.go index 7c8ada9..7a5c198 100644 --- a/internal/foundation/config/config.go +++ b/internal/foundation/config/config.go @@ -354,3 +354,24 @@ type CLIGlobalFlags struct { func NewCLIGlobalFlags() *CLIGlobalFlags { return &CLIGlobalFlags{} } + +// Datastore identifiers (must match restorelock package constants) +const ( + DatastoreElasticsearch = "elasticsearch" + DatastoreClickhouse = "clickhouse" + DatastoreVictoriaMetrics = "victoriametrics" + DatastoreStackgraph = "stackgraph" + DatastoreSettings = "settings" +) + +// GetAllScaleDownSelectors returns a map of all datastore scale-down label selectors. +// This is used by the restore lock mechanism to check for conflicting operations. +func (c *Config) GetAllScaleDownSelectors() map[string]string { + return map[string]string{ + DatastoreElasticsearch: c.Elasticsearch.Restore.ScaleDownLabelSelector, + DatastoreClickhouse: c.Clickhouse.Restore.ScaleDownLabelSelector, + DatastoreVictoriaMetrics: c.VictoriaMetrics.Restore.ScaleDownLabelSelector, + DatastoreStackgraph: c.Stackgraph.Restore.ScaleDownLabelSelector, + DatastoreSettings: c.Settings.Restore.ScaleDownLabelSelector, + } +} diff --git a/internal/foundation/logger/logger.go b/internal/foundation/logger/logger.go index 5230f34..eafdf61 100644 --- a/internal/foundation/logger/logger.go +++ b/internal/foundation/logger/logger.go @@ -32,26 +32,26 @@ func (l *Logger) Infof(format string, args ...interface{}) { // Successf logs a success message func (l *Logger) Successf(format string, args ...interface{}) { if !l.quiet { - _, _ = fmt.Fprintf(l.writer, "✓ "+format+"\n", args...) + _, _ = fmt.Fprintf(l.writer, "✅ "+format+"\n", args...) } } // Warningf logs a warning message func (l *Logger) Warningf(format string, args ...interface{}) { if !l.quiet { - _, _ = fmt.Fprintf(l.writer, "Warning: "+format+"\n", args...) + _, _ = fmt.Fprintf(l.writer, "⚠️ Warning: "+format+"\n", args...) } } // Errorf logs an error message (always shown, even in quiet mode) func (l *Logger) Errorf(format string, args ...interface{}) { - _, _ = fmt.Fprintf(l.writer, "Error: "+format+"\n", args...) + _, _ = fmt.Fprintf(l.writer, "❌ Error: "+format+"\n", args...) } // Debug logs a debug message (only shown when debug mode is enabled) func (l *Logger) Debugf(format string, args ...interface{}) { if l.debug { - _, _ = fmt.Fprintf(l.writer, "DEBUG: "+format+"\n", args...) + _, _ = fmt.Fprintf(l.writer, "🛠️ DEBUG: "+format+"\n", args...) 
} } diff --git a/internal/foundation/logger/logger_test.go b/internal/foundation/logger/logger_test.go index ce5be9b..6f07859 100644 --- a/internal/foundation/logger/logger_test.go +++ b/internal/foundation/logger/logger_test.go @@ -134,7 +134,7 @@ func TestLogger_Successf(t *testing.T) { output := buf.String() assert.Contains(t, output, "Completed task") if tt.containsSymbol { - assert.Contains(t, output, "✓") + assert.Contains(t, output, "✅") } } else { assert.Empty(t, buf.String()) @@ -329,7 +329,7 @@ func TestLogger_MultipleCalls(t *testing.T) { assert.Len(t, lines, 4) assert.Contains(t, output, "Starting process") - assert.Contains(t, output, "DEBUG: Debug details") - assert.Contains(t, output, "✓ Process completed") - assert.Contains(t, output, "Warning: Cleanup recommended") + assert.Contains(t, output, "🛠️ DEBUG: Debug details") + assert.Contains(t, output, "✅ Process completed") + assert.Contains(t, output, "⚠️ Warning: Cleanup recommended") } diff --git a/internal/orchestration/restorelock/datastore.go b/internal/orchestration/restorelock/datastore.go new file mode 100644 index 0000000..000f029 --- /dev/null +++ b/internal/orchestration/restorelock/datastore.go @@ -0,0 +1,62 @@ +// Package restorelock provides mechanisms to prevent parallel restore operations +// for the same datastore or mutually exclusive datastores. +package restorelock + +// Datastore identifiers used for restore lock tracking +const ( + DatastoreElasticsearch = "elasticsearch" + DatastoreClickhouse = "clickhouse" + DatastoreVictoriaMetrics = "victoriametrics" + DatastoreStackgraph = "stackgraph" + DatastoreSettings = "settings" +) + +// MutualExclusionGroup identifies a group of datastores that cannot be restored concurrently. +// Datastores in the same group share underlying data or have dependencies that make +// parallel restores unsafe. +const ( + // ExclusionGroupStackgraph groups Stackgraph and Settings restores + // because Settings restore modifies Stackgraph/HBase data + ExclusionGroupStackgraph = "stackgraph" +) + +// datastoreMutualExclusion maps each datastore to its mutual exclusion group. +// Empty string means no mutual exclusion (datastore is independent). +var datastoreMutualExclusion = map[string]string{ + DatastoreElasticsearch: "", // Independent + DatastoreClickhouse: "", // Independent + DatastoreVictoriaMetrics: "", // Independent + DatastoreStackgraph: ExclusionGroupStackgraph, + DatastoreSettings: ExclusionGroupStackgraph, +} + +// GetMutualExclusionGroup returns the mutual exclusion group for a datastore. +// Returns empty string if the datastore has no mutual exclusion constraints. +func GetMutualExclusionGroup(datastore string) string { + return datastoreMutualExclusion[datastore] +} + +// GetDatastoresInGroup returns all datastores that belong to the given mutual exclusion group. +// Returns nil if the group doesn't exist or is empty. +func GetDatastoresInGroup(group string) []string { + if group == "" { + return nil + } + + var datastores []string + for ds, g := range datastoreMutualExclusion { + if g == group { + datastores = append(datastores, ds) + } + } + return datastores +} + +// AreDatastoresMutuallyExclusive checks if two datastores are in the same mutual exclusion group. 
+func AreDatastoresMutuallyExclusive(datastore1, datastore2 string) bool { + group1 := GetMutualExclusionGroup(datastore1) + group2 := GetMutualExclusionGroup(datastore2) + + // Both must have a non-empty group and the groups must match + return group1 != "" && group1 == group2 +} diff --git a/internal/orchestration/restorelock/datastore_test.go b/internal/orchestration/restorelock/datastore_test.go new file mode 100644 index 0000000..818503b --- /dev/null +++ b/internal/orchestration/restorelock/datastore_test.go @@ -0,0 +1,141 @@ +package restorelock + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestGetMutualExclusionGroup(t *testing.T) { + tests := []struct { + name string + datastore string + want string + }{ + { + name: "elasticsearch has no exclusion group", + datastore: DatastoreElasticsearch, + want: "", + }, + { + name: "clickhouse has no exclusion group", + datastore: DatastoreClickhouse, + want: "", + }, + { + name: "victoriametrics has no exclusion group", + datastore: DatastoreVictoriaMetrics, + want: "", + }, + { + name: "stackgraph has stackgraph group", + datastore: DatastoreStackgraph, + want: ExclusionGroupStackgraph, + }, + { + name: "settings has stackgraph group", + datastore: DatastoreSettings, + want: ExclusionGroupStackgraph, + }, + { + name: "unknown datastore returns empty string", + datastore: "unknown", + want: "", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := GetMutualExclusionGroup(tt.datastore) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestGetDatastoresInGroup(t *testing.T) { + tests := []struct { + name string + group string + want []string + }{ + { + name: "empty group returns nil", + group: "", + want: nil, + }, + { + name: "stackgraph group contains stackgraph and settings", + group: ExclusionGroupStackgraph, + want: []string{DatastoreStackgraph, DatastoreSettings}, + }, + { + name: "unknown group returns nil", + group: "unknown-group", + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := GetDatastoresInGroup(tt.group) + if tt.want == nil { + assert.Nil(t, got) + } else { + assert.ElementsMatch(t, tt.want, got) + } + }) + } +} + +func TestAreDatastoresMutuallyExclusive(t *testing.T) { + tests := []struct { + name string + datastore1 string + datastore2 string + want bool + }{ + { + name: "stackgraph and settings are mutually exclusive", + datastore1: DatastoreStackgraph, + datastore2: DatastoreSettings, + want: true, + }, + { + name: "settings and stackgraph are mutually exclusive (reversed)", + datastore1: DatastoreSettings, + datastore2: DatastoreStackgraph, + want: true, + }, + { + name: "elasticsearch and clickhouse are not mutually exclusive", + datastore1: DatastoreElasticsearch, + datastore2: DatastoreClickhouse, + want: false, + }, + { + name: "stackgraph and elasticsearch are not mutually exclusive", + datastore1: DatastoreStackgraph, + datastore2: DatastoreElasticsearch, + want: false, + }, + { + name: "same datastore (stackgraph) is technically mutually exclusive with itself", + datastore1: DatastoreStackgraph, + datastore2: DatastoreStackgraph, + want: true, + }, + { + name: "same datastore (elasticsearch) without group is not mutually exclusive", + datastore1: DatastoreElasticsearch, + datastore2: DatastoreElasticsearch, + want: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got := AreDatastoresMutuallyExclusive(tt.datastore1, tt.datastore2) + assert.Equal(t, tt.want, got) + }) 
+ } +} diff --git a/internal/orchestration/restorelock/lock.go b/internal/orchestration/restorelock/lock.go new file mode 100644 index 0000000..35ed9e8 --- /dev/null +++ b/internal/orchestration/restorelock/lock.go @@ -0,0 +1,160 @@ +package restorelock + +import ( + "fmt" + "time" + + "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" +) + +// LabelSelectors maps datastore names to their scale-down label selectors. +// This is used to check for conflicts across all related datastores. +type LabelSelectors map[string]string + +// formatUnlockHint returns the kubectl command to manually remove a stuck restore lock +func formatUnlockHint(namespace, labelSelector string) string { + return fmt.Sprintf( + "To manually remove a stuck restore lock, run:\n"+ + " kubectl annotate deployment,statefulset -l %s %s- %s- -n %s", + labelSelector, + k8s.RestoreInProgressAnnotation, + k8s.RestoreStartedAtAnnotation, + namespace, + ) +} + +// formatConflictError creates an error message for restore conflicts including the unlock hint +func formatConflictError( + currentDatastore, conflictingDatastore, resourceKind, resourceName, startedAt string, + isMutualExclusion bool, + namespace, labelSelector string, +) error { + var msg string + if isMutualExclusion { + msg = fmt.Sprintf( + "cannot start %s restore: %s restore is in progress (started at %s on %s/%s). "+ + "Note: %s and %s restores are mutually exclusive", + currentDatastore, conflictingDatastore, startedAt, + resourceKind, resourceName, + currentDatastore, conflictingDatastore, + ) + } else { + msg = fmt.Sprintf( + "cannot start %s restore: another %s restore is already in progress (started at %s on %s/%s)", + currentDatastore, conflictingDatastore, startedAt, + resourceKind, resourceName, + ) + } + + hint := formatUnlockHint(namespace, labelSelector) + return fmt.Errorf("%s\n\n%s", msg, hint) +} + +// CheckForConflicts checks if any restore operation is in progress that would conflict +// with starting a new restore for the given datastore. +// +// It checks for two types of conflicts: +// 1. Same datastore: Another restore for the same datastore is in progress +// 2. Mutual exclusion: A restore for a mutually exclusive datastore is in progress +// (e.g., stackgraph and settings cannot run concurrently) +func CheckForConflicts( + k8sClient *k8s.Client, + namespace string, + datastore string, + allSelectors LabelSelectors, + log *logger.Logger, +) error { + log.Debugf("Checking for restore conflicts for datastore: %s", datastore) + + // Get current datastore's selector + currentSelector, ok := allSelectors[datastore] + if !ok { + return fmt.Errorf("no label selector configured for datastore: %s", datastore) + } + + // 1. Check for same-datastore conflict + locks, err := k8sClient.GetRestoreLocks(namespace, currentSelector) + if err != nil { + return fmt.Errorf("failed to check for restore locks: %w", err) + } + + for _, lock := range locks { + return formatConflictError( + datastore, lock.Datastore, lock.ResourceKind, lock.ResourceName, lock.StartedAt, + lock.Datastore != datastore, + namespace, currentSelector, + ) + } + + // 2. 
Check for mutual exclusion conflicts + currentGroup := GetMutualExclusionGroup(datastore) + if currentGroup != "" { + relatedDatastores := GetDatastoresInGroup(currentGroup) + for _, relatedDS := range relatedDatastores { + if relatedDS == datastore { + continue // Skip self (already checked above) + } + + relatedSelector, ok := allSelectors[relatedDS] + if !ok { + log.Debugf("No selector for related datastore %s, skipping", relatedDS) + continue + } + + locks, err := k8sClient.GetRestoreLocks(namespace, relatedSelector) + if err != nil { + return fmt.Errorf("failed to check for restore locks on %s: %w", relatedDS, err) + } + + for _, lock := range locks { + return formatConflictError( + datastore, lock.Datastore, lock.ResourceKind, lock.ResourceName, lock.StartedAt, + true, + namespace, relatedSelector, + ) + } + } + } + + log.Debugf("No restore conflicts found for datastore: %s", datastore) + return nil +} + +// AcquireLock sets restore lock annotations on all resources matching the label selector. +// This should be called before starting a restore operation. +func AcquireLock( + k8sClient *k8s.Client, + namespace string, + labelSelector string, + datastore string, + log *logger.Logger, +) error { + startedAt := time.Now().UTC().Format(time.RFC3339) + log.Debugf("Acquiring restore lock for datastore %s (started at %s)", datastore, startedAt) + + if err := k8sClient.SetRestoreLock(namespace, labelSelector, datastore, startedAt); err != nil { + return fmt.Errorf("failed to acquire restore lock: %w", err) + } + + log.Debugf("Restore lock acquired for datastore: %s", datastore) + return nil +} + +// ReleaseLock removes restore lock annotations from all resources matching the label selector. +// This should be called after a restore operation completes (success or failure). 
+func ReleaseLock( + k8sClient *k8s.Client, + namespace string, + labelSelector string, + log *logger.Logger, +) error { + log.Debugf("Releasing restore lock (selector: %s)", labelSelector) + + if err := k8sClient.ClearRestoreLock(namespace, labelSelector); err != nil { + return fmt.Errorf("failed to release restore lock: %w", err) + } + + log.Debugf("Restore lock released") + return nil +} diff --git a/internal/orchestration/restorelock/lock_test.go b/internal/orchestration/restorelock/lock_test.go new file mode 100644 index 0000000..28dee71 --- /dev/null +++ b/internal/orchestration/restorelock/lock_test.go @@ -0,0 +1,314 @@ +package restorelock + +import ( + "testing" + + "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + appsv1 "k8s.io/api/apps/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes/fake" +) + +func TestFormatConflictError(t *testing.T) { + tests := []struct { + name string + contains []string + }{ + { + name: "same datastore conflict", + contains: []string{ + "cannot start elasticsearch restore", + "another elasticsearch restore is already in progress", + "2025-01-01T12:00:00Z", + "Deployment/api-server", + "To manually remove a stuck restore lock", + "kubectl annotate", + }, + }, + { + name: "mutual exclusion conflict", + contains: []string{ + "cannot start settings restore", + "stackgraph restore is in progress", + "mutually exclusive", + "To manually remove a stuck restore lock", + }, + }, + } + + t.Run(tests[0].name, func(t *testing.T) { + err := formatConflictError( + DatastoreElasticsearch, DatastoreElasticsearch, + "Deployment", "api-server", "2025-01-01T12:00:00Z", + false, + "test-ns", "app=api", + ) + errMsg := err.Error() + for _, s := range tests[0].contains { + assert.Contains(t, errMsg, s) + } + }) + + t.Run(tests[1].name, func(t *testing.T) { + err := formatConflictError( + DatastoreSettings, DatastoreStackgraph, + "Deployment", "server", "2025-01-01T12:00:00Z", + true, + "test-ns", "app=stackgraph", + ) + errMsg := err.Error() + for _, s := range tests[1].contains { + assert.Contains(t, errMsg, s) + } + }) +} + +func TestCheckForConflicts_NoConflict(t *testing.T) { + // Create fake clientset with deployment without lock annotation + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "api", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(deployment) + k8sClient := k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + allSelectors := LabelSelectors{ + DatastoreElasticsearch: "app=api", + DatastoreStackgraph: "app=stackgraph", + DatastoreSettings: "app=settings", + } + + err := CheckForConflicts(k8sClient, "test-ns", DatastoreElasticsearch, allSelectors, log) + assert.NoError(t, err) +} + +func TestCheckForConflicts_SameDatastoreConflict(t *testing.T) { + // Create fake clientset with deployment that has lock annotation + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "api", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(deployment) + k8sClient := 
k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + allSelectors := LabelSelectors{ + DatastoreElasticsearch: "app=api", + DatastoreStackgraph: "app=stackgraph", + DatastoreSettings: "app=settings", + } + + err := CheckForConflicts(k8sClient, "test-ns", DatastoreElasticsearch, allSelectors, log) + require.Error(t, err) + + errMsg := err.Error() + assert.Contains(t, errMsg, "cannot start elasticsearch restore") + assert.Contains(t, errMsg, "another elasticsearch restore is already in progress") + assert.Contains(t, errMsg, "Deployment/api-server") + assert.Contains(t, errMsg, "To manually remove a stuck restore lock") +} + +func TestCheckForConflicts_MutualExclusionConflict(t *testing.T) { + // Create fake clientset with stackgraph deployment that has lock annotation + stackgraphDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "stackgraph-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "stackgraph", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: DatastoreStackgraph, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + } + + // Settings deployment without lock (the one being checked) + settingsDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "settings-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "settings", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(stackgraphDeployment, settingsDeployment) + k8sClient := k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + allSelectors := LabelSelectors{ + DatastoreElasticsearch: "app=elasticsearch", + DatastoreStackgraph: "app=stackgraph", + DatastoreSettings: "app=settings", + } + + // Try to start settings restore when stackgraph is running + err := CheckForConflicts(k8sClient, "test-ns", DatastoreSettings, allSelectors, log) + require.Error(t, err) + + errMsg := err.Error() + assert.Contains(t, errMsg, "cannot start settings restore") + assert.Contains(t, errMsg, "stackgraph restore is in progress") + assert.Contains(t, errMsg, "mutually exclusive") + assert.Contains(t, errMsg, "To manually remove a stuck restore lock") +} + +func TestCheckForConflicts_NoMutualExclusionBetweenIndependentDatastores(t *testing.T) { + // Create fake clientset with elasticsearch deployment that has lock annotation + esDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "es-master", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "elasticsearch", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + } + + // Clickhouse deployment without lock + chDeployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "clickhouse", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "clickhouse", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(esDeployment, chDeployment) + k8sClient := k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + allSelectors := LabelSelectors{ + DatastoreElasticsearch: "app=elasticsearch", + DatastoreClickhouse: "app=clickhouse", + DatastoreStackgraph: "app=stackgraph", + DatastoreSettings: "app=settings", + } + + // Clickhouse restore should succeed even though elasticsearch is running + err := CheckForConflicts(k8sClient, "test-ns", DatastoreClickhouse, allSelectors, log) + assert.NoError(t, err) +} + +func TestAcquireLock(t *testing.T) { + deployment := 
&appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "api", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(deployment) + k8sClient := k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + err := AcquireLock(k8sClient, "test-ns", "app=api", DatastoreElasticsearch, log) + require.NoError(t, err) + + // Verify lock was set + locks, err := k8sClient.GetRestoreLocks("test-ns", "app=api") + require.NoError(t, err) + require.Len(t, locks, 1) + assert.Equal(t, DatastoreElasticsearch, locks[0].Datastore) + assert.Equal(t, "Deployment", locks[0].ResourceKind) + assert.Equal(t, "api-server", locks[0].ResourceName) + assert.NotEmpty(t, locks[0].StartedAt) +} + +func TestReleaseLock(t *testing.T) { + deployment := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "api", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(deployment) + k8sClient := k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + // Verify lock exists + locks, err := k8sClient.GetRestoreLocks("test-ns", "app=api") + require.NoError(t, err) + require.Len(t, locks, 1) + + // Release lock + err = ReleaseLock(k8sClient, "test-ns", "app=api", log) + require.NoError(t, err) + + // Verify lock was removed + locks, err = k8sClient.GetRestoreLocks("test-ns", "app=api") + require.NoError(t, err) + assert.Empty(t, locks) +} + +func TestCheckForConflicts_StatefulSetLock(t *testing.T) { + // Create fake clientset with statefulset that has lock annotation + statefulSet := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "victoria-metrics", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "vm", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: DatastoreVictoriaMetrics, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + } + + fakeClientset := fake.NewSimpleClientset(statefulSet) + k8sClient := k8s.NewTestClient(fakeClientset) + log := logger.New(false, true) + + allSelectors := LabelSelectors{ + DatastoreVictoriaMetrics: "app=vm", + } + + err := CheckForConflicts(k8sClient, "test-ns", DatastoreVictoriaMetrics, allSelectors, log) + require.Error(t, err) + + errMsg := err.Error() + assert.Contains(t, errMsg, "StatefulSet/victoria-metrics") + assert.Contains(t, errMsg, "To manually remove a stuck restore lock") +} diff --git a/internal/orchestration/scale/scale.go b/internal/orchestration/scale/scale.go index 4a111cf..8e6baf0 100644 --- a/internal/orchestration/scale/scale.go +++ b/internal/orchestration/scale/scale.go @@ -10,6 +10,7 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" ) const ( @@ -61,6 +62,75 @@ func ScaleDown(k8sClient *k8s.Client, namespace, labelSelector string, log *logg return scaledApps, nil } +// ScaleDownWithLockParams contains parameters for ScaleDownWithLock +// +//nolint:revive // Keeping full name for clarity as this is a parameter struct for ScaleDownWithLock +type ScaleDownWithLockParams struct { + K8sClient *k8s.Client + Namespace string + LabelSelector string + Datastore string + AllSelectors 
restorelock.LabelSelectors + Log *logger.Logger +} + +// ScaleDownWithLock scales down deployments and statefulsets with restore lock protection. +// It first checks for conflicting restore operations, acquires a lock, then scales down. +// Returns the list of scaled resources that can be used for scale-up later. +// +//nolint:revive // Package name "scale" with function "ScaleDownWithLock" is intentionally verbose for clarity +func ScaleDownWithLock(params ScaleDownWithLockParams) ([]k8s.AppsScale, error) { + // Check for conflicting restore operations + if err := restorelock.CheckForConflicts( + params.K8sClient, + params.Namespace, + params.Datastore, + params.AllSelectors, + params.Log, + ); err != nil { + return nil, err + } + + // Acquire restore lock before scaling down + if err := restorelock.AcquireLock( + params.K8sClient, + params.Namespace, + params.LabelSelector, + params.Datastore, + params.Log, + ); err != nil { + return nil, err + } + + // Scale down (the lock will be released when scaling up or on cleanup) + scaledApps, err := ScaleDown(params.K8sClient, params.Namespace, params.LabelSelector, params.Log) + if err != nil { + // Release lock on scale-down failure + _ = restorelock.ReleaseLock(params.K8sClient, params.Namespace, params.LabelSelector, params.Log) + return nil, err + } + + return scaledApps, nil +} + +// ScaleUpAndReleaseLock scales up resources from annotations and releases the restore lock +// +//nolint:revive // Package name "scale" with function "ScaleUpAndReleaseLock" is intentionally verbose for clarity +func ScaleUpAndReleaseLock(k8sClient *k8s.Client, namespace, labelSelector string, log *logger.Logger) error { + // Scale up first + if err := ScaleUpFromAnnotations(k8sClient, namespace, labelSelector, log); err != nil { + return err + } + + // Release restore lock + if err := restorelock.ReleaseLock(k8sClient, namespace, labelSelector, log); err != nil { + log.Warningf("Failed to release restore lock: %v", err) + // Don't return error - scale up succeeded, lock release is secondary + } + + return nil +} + // waitForPodsToTerminate polls for pod termination until all pods matching the label selector are gone func waitForPodsToTerminate(k8sClient *k8s.Client, namespace, labelSelector string, log *logger.Logger) error { ctx := context.Background() From 560526a5f6ad2766013f1a8edc61bfdf9a2e2165 Mon Sep 17 00:00:00 2001 From: Vladimir Iliakov Date: Tue, 13 Jan 2026 23:37:52 +0100 Subject: [PATCH 2/3] STAC-24078: Addressing pr comments --- ARCHITECTURE.md | 2 +- cmd/clickhouse/restore.go | 3 +- cmd/cmdutils/common.go | 2 +- cmd/elasticsearch/restore.go | 3 +- cmd/settings/restore.go | 3 +- cmd/stackgraph/restore.go | 3 +- cmd/victoriametrics/restore.go | 3 +- internal/clients/k8s/client_test.go | 503 ++++++++++++++++ internal/foundation/config/config.go | 2 +- .../orchestration/restorelock/datastore.go | 19 +- .../restorelock/datastore_test.go | 37 +- .../orchestration/restorelock/lock_test.go | 57 +- internal/orchestration/scale/scale_test.go | 568 ++++++++++++++++++ 13 files changed, 1133 insertions(+), 72 deletions(-) diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md index 7228422..6055fe7 100644 --- a/ARCHITECTURE.md +++ b/ARCHITECTURE.md @@ -339,7 +339,7 @@ scaledApps, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{ K8sClient: k8sClient, Namespace: namespace, LabelSelector: selector, - Datastore: restorelock.DatastoreStackgraph, + Datastore: config.DatastoreStackgraph, AllSelectors: config.GetAllScaleDownSelectors(), Log: log, }) diff --git 
a/cmd/clickhouse/restore.go b/cmd/clickhouse/restore.go index 2a82256..9e6d820 100644 --- a/cmd/clickhouse/restore.go +++ b/cmd/clickhouse/restore.go @@ -10,7 +10,6 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" - "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" ) @@ -77,7 +76,7 @@ func runRestore(appCtx *app.Context) error { K8sClient: appCtx.K8sClient, Namespace: appCtx.Namespace, LabelSelector: scaleDownLabelSelector, - Datastore: restorelock.DatastoreClickhouse, + Datastore: config.DatastoreClickhouse, AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), Log: appCtx.Logger, }) diff --git a/cmd/cmdutils/common.go b/cmd/cmdutils/common.go index ddb538f..98d5a1e 100644 --- a/cmd/cmdutils/common.go +++ b/cmd/cmdutils/common.go @@ -16,7 +16,7 @@ const ( func Run(globalFlags *config.CLIGlobalFlags, runFunc func(ctx *app.Context) error, minioRequired bool) { appCtx, err := app.NewContext(globalFlags) if err != nil { - _, _ = fmt.Fprintf(os.Stderr, "error: %v\n", err) + _, _ = fmt.Fprintf(os.Stderr, "❌ Error: %v\n", err) os.Exit(1) } if minioRequired && !appCtx.Config.Minio.Enabled { diff --git a/cmd/elasticsearch/restore.go b/cmd/elasticsearch/restore.go index ce8ae22..6b1c88b 100644 --- a/cmd/elasticsearch/restore.go +++ b/cmd/elasticsearch/restore.go @@ -14,7 +14,6 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" - "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" ) @@ -99,7 +98,7 @@ func runRestore(appCtx *app.Context) error { K8sClient: appCtx.K8sClient, Namespace: appCtx.Namespace, LabelSelector: scaleDownLabelSelector, - Datastore: restorelock.DatastoreElasticsearch, + Datastore: config.DatastoreElasticsearch, AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), Log: appCtx.Logger, }) diff --git a/cmd/settings/restore.go b/cmd/settings/restore.go index b6c344e..2bd9fb5 100644 --- a/cmd/settings/restore.go +++ b/cmd/settings/restore.go @@ -12,7 +12,6 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" - "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" corev1 "k8s.io/api/core/v1" ) @@ -86,7 +85,7 @@ func runRestore(appCtx *app.Context) error { K8sClient: appCtx.K8sClient, Namespace: appCtx.Namespace, LabelSelector: scaleDownLabelSelector, - Datastore: restorelock.DatastoreSettings, + Datastore: config.DatastoreSettings, AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), Log: appCtx.Logger, }) diff --git a/cmd/stackgraph/restore.go b/cmd/stackgraph/restore.go index 04003a2..702704e 100644 --- a/cmd/stackgraph/restore.go +++ b/cmd/stackgraph/restore.go @@ -18,7 +18,6 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" 
"github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" - "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" corev1 "k8s.io/api/core/v1" ) @@ -92,7 +91,7 @@ func runRestore(appCtx *app.Context) error { K8sClient: appCtx.K8sClient, Namespace: appCtx.Namespace, LabelSelector: scaleDownLabelSelector, - Datastore: restorelock.DatastoreStackgraph, + Datastore: config.DatastoreStackgraph, AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), Log: appCtx.Logger, }) diff --git a/cmd/victoriametrics/restore.go b/cmd/victoriametrics/restore.go index cc84068..231eb81 100644 --- a/cmd/victoriametrics/restore.go +++ b/cmd/victoriametrics/restore.go @@ -17,7 +17,6 @@ import ( "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/portforward" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restore" - "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale" corev1 "k8s.io/api/core/v1" ) @@ -90,7 +89,7 @@ func runRestore(appCtx *app.Context) error { K8sClient: appCtx.K8sClient, Namespace: appCtx.Namespace, LabelSelector: scaleDownLabelSelector, - Datastore: restorelock.DatastoreVictoriaMetrics, + Datastore: config.DatastoreVictoriaMetrics, AllSelectors: appCtx.Config.GetAllScaleDownSelectors(), Log: appCtx.Logger, }) diff --git a/internal/clients/k8s/client_test.go b/internal/clients/k8s/client_test.go index eb4a5a7..d9a680f 100644 --- a/internal/clients/k8s/client_test.go +++ b/internal/clients/k8s/client_test.go @@ -462,3 +462,506 @@ func createDeployment(name, namespace string, labels map[string]string, replicas }, } } + +// Helper function to create a statefulset for testing +// +//nolint:unparam // namespace parameter is always "test-ns" in current tests, but kept for flexibility +func createStatefulSet(name, namespace string, labels map[string]string, replicas int32) appsv1.StatefulSet { + return appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: namespace, + Labels: labels, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "test:latest", + }, + }, + }, + }, + }, + } +} + +//nolint:funlen // Table-driven test with comprehensive test cases +func TestClient_GetRestoreLocks(t *testing.T) { + tests := []struct { + name string + namespace string + labelSelector string + deployments []appsv1.Deployment + statefulSets []appsv1.StatefulSet + expectedLocks []RestoreLockInfo + }{ + { + name: "no locks", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3), + }, + statefulSets: []appsv1.StatefulSet{}, + expectedLocks: nil, + }, + { + name: "lock on deployment", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + func() appsv1.Deployment { + d := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0) + d.Annotations = map[string]string{ + 
RestoreInProgressAnnotation: "elasticsearch", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return d + }(), + }, + statefulSets: []appsv1.StatefulSet{}, + expectedLocks: []RestoreLockInfo{ + { + ResourceKind: "Deployment", + ResourceName: "deploy1", + Datastore: "elasticsearch", + StartedAt: "2025-01-01T12:00:00Z", + }, + }, + }, + { + name: "lock on statefulset", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{}, + statefulSets: []appsv1.StatefulSet{ + func() appsv1.StatefulSet { + s := createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 0) + s.Annotations = map[string]string{ + RestoreInProgressAnnotation: "victoriametrics", + RestoreStartedAtAnnotation: "2025-01-01T13:00:00Z", + } + return s + }(), + }, + expectedLocks: []RestoreLockInfo{ + { + ResourceKind: "StatefulSet", + ResourceName: "sts1", + Datastore: "victoriametrics", + StartedAt: "2025-01-01T13:00:00Z", + }, + }, + }, + { + name: "locks on both deployment and statefulset", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + func() appsv1.Deployment { + d := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0) + d.Annotations = map[string]string{ + RestoreInProgressAnnotation: "stackgraph", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return d + }(), + }, + statefulSets: []appsv1.StatefulSet{ + func() appsv1.StatefulSet { + s := createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 0) + s.Annotations = map[string]string{ + RestoreInProgressAnnotation: "stackgraph", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return s + }(), + }, + expectedLocks: []RestoreLockInfo{ + { + ResourceKind: "Deployment", + ResourceName: "deploy1", + Datastore: "stackgraph", + StartedAt: "2025-01-01T12:00:00Z", + }, + { + ResourceKind: "StatefulSet", + ResourceName: "sts1", + Datastore: "stackgraph", + StartedAt: "2025-01-01T12:00:00Z", + }, + }, + }, + { + name: "only matching label selector", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + func() appsv1.Deployment { + d := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0) + d.Annotations = map[string]string{ + RestoreInProgressAnnotation: "elasticsearch", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return d + }(), + func() appsv1.Deployment { + d := createDeployment("deploy2", "test-ns", map[string]string{"app": "other"}, 0) + d.Annotations = map[string]string{ + RestoreInProgressAnnotation: "elasticsearch", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return d + }(), + }, + statefulSets: []appsv1.StatefulSet{}, + expectedLocks: []RestoreLockInfo{ + { + ResourceKind: "Deployment", + ResourceName: "deploy1", + Datastore: "elasticsearch", + StartedAt: "2025-01-01T12:00:00Z", + }, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + + for _, deploy := range tt.deployments { + _, err := fakeClient.AppsV1().Deployments(tt.namespace).Create( + context.Background(), &deploy, metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + + for _, sts := range tt.statefulSets { + _, err := fakeClient.AppsV1().StatefulSets(tt.namespace).Create( + context.Background(), &sts, metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + + client := &Client{clientset: fakeClient} + + locks, err := client.GetRestoreLocks(tt.namespace, tt.labelSelector) + 
require.NoError(t, err) + + assert.Equal(t, len(tt.expectedLocks), len(locks)) + for i, expected := range tt.expectedLocks { + assert.Equal(t, expected.ResourceKind, locks[i].ResourceKind) + assert.Equal(t, expected.ResourceName, locks[i].ResourceName) + assert.Equal(t, expected.Datastore, locks[i].Datastore) + assert.Equal(t, expected.StartedAt, locks[i].StartedAt) + } + }) + } +} + +func TestClient_SetRestoreLock(t *testing.T) { + tests := []struct { + name string + namespace string + labelSelector string + datastore string + startedAt string + deployments []appsv1.Deployment + statefulSets []appsv1.StatefulSet + }{ + { + name: "set lock on deployment", + namespace: "test-ns", + labelSelector: "app=test", + datastore: "elasticsearch", + startedAt: "2025-01-01T12:00:00Z", + deployments: []appsv1.Deployment{ + createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3), + }, + statefulSets: []appsv1.StatefulSet{}, + }, + { + name: "set lock on statefulset", + namespace: "test-ns", + labelSelector: "app=test", + datastore: "victoriametrics", + startedAt: "2025-01-01T13:00:00Z", + deployments: []appsv1.Deployment{}, + statefulSets: []appsv1.StatefulSet{ + createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 3), + }, + }, + { + name: "set lock on multiple resources", + namespace: "test-ns", + labelSelector: "app=test", + datastore: "stackgraph", + startedAt: "2025-01-01T14:00:00Z", + deployments: []appsv1.Deployment{ + createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3), + createDeployment("deploy2", "test-ns", map[string]string{"app": "test"}, 2), + }, + statefulSets: []appsv1.StatefulSet{ + createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 1), + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + + for _, deploy := range tt.deployments { + _, err := fakeClient.AppsV1().Deployments(tt.namespace).Create( + context.Background(), &deploy, metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + + for _, sts := range tt.statefulSets { + _, err := fakeClient.AppsV1().StatefulSets(tt.namespace).Create( + context.Background(), &sts, metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + + client := &Client{clientset: fakeClient} + + err := client.SetRestoreLock(tt.namespace, tt.labelSelector, tt.datastore, tt.startedAt) + require.NoError(t, err) + + // Verify locks were set on deployments + for _, deploy := range tt.deployments { + d, err := fakeClient.AppsV1().Deployments(tt.namespace).Get( + context.Background(), deploy.Name, metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, tt.datastore, d.Annotations[RestoreInProgressAnnotation]) + assert.Equal(t, tt.startedAt, d.Annotations[RestoreStartedAtAnnotation]) + } + + // Verify locks were set on statefulsets + for _, sts := range tt.statefulSets { + s, err := fakeClient.AppsV1().StatefulSets(tt.namespace).Get( + context.Background(), sts.Name, metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, tt.datastore, s.Annotations[RestoreInProgressAnnotation]) + assert.Equal(t, tt.startedAt, s.Annotations[RestoreStartedAtAnnotation]) + } + }) + } +} + +//nolint:funlen // Table-driven test with comprehensive test cases +func TestClient_ClearRestoreLock(t *testing.T) { + tests := []struct { + name string + namespace string + labelSelector string + deployments []appsv1.Deployment + statefulSets []appsv1.StatefulSet + }{ + { + name: "clear lock from deployment", + 
namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + func() appsv1.Deployment { + d := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0) + d.Annotations = map[string]string{ + RestoreInProgressAnnotation: "elasticsearch", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return d + }(), + }, + statefulSets: []appsv1.StatefulSet{}, + }, + { + name: "clear lock from statefulset", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{}, + statefulSets: []appsv1.StatefulSet{ + func() appsv1.StatefulSet { + s := createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 0) + s.Annotations = map[string]string{ + RestoreInProgressAnnotation: "victoriametrics", + RestoreStartedAtAnnotation: "2025-01-01T13:00:00Z", + } + return s + }(), + }, + }, + { + name: "clear locks from multiple resources", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + func() appsv1.Deployment { + d := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0) + d.Annotations = map[string]string{ + RestoreInProgressAnnotation: "stackgraph", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return d + }(), + }, + statefulSets: []appsv1.StatefulSet{ + func() appsv1.StatefulSet { + s := createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 0) + s.Annotations = map[string]string{ + RestoreInProgressAnnotation: "stackgraph", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + } + return s + }(), + }, + }, + { + name: "clear lock preserves other annotations", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + func() appsv1.Deployment { + d := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 0) + d.Annotations = map[string]string{ + RestoreInProgressAnnotation: "elasticsearch", + RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + PreRestoreReplicasAnnotation: "3", + "custom-annotation": "custom-value", + } + return d + }(), + }, + statefulSets: []appsv1.StatefulSet{}, + }, + { + name: "no-op when no locks present", + namespace: "test-ns", + labelSelector: "app=test", + deployments: []appsv1.Deployment{ + createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3), + }, + statefulSets: []appsv1.StatefulSet{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + + for _, deploy := range tt.deployments { + _, err := fakeClient.AppsV1().Deployments(tt.namespace).Create( + context.Background(), &deploy, metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + + for _, sts := range tt.statefulSets { + _, err := fakeClient.AppsV1().StatefulSets(tt.namespace).Create( + context.Background(), &sts, metav1.CreateOptions{}, + ) + require.NoError(t, err) + } + + client := &Client{clientset: fakeClient} + + err := client.ClearRestoreLock(tt.namespace, tt.labelSelector) + require.NoError(t, err) + + // Verify locks were cleared from deployments + for _, deploy := range tt.deployments { + d, err := fakeClient.AppsV1().Deployments(tt.namespace).Get( + context.Background(), deploy.Name, metav1.GetOptions{}, + ) + require.NoError(t, err) + + _, hasLock := d.Annotations[RestoreInProgressAnnotation] + assert.False(t, hasLock, "restore-in-progress annotation should be removed") + + _, hasStartedAt := d.Annotations[RestoreStartedAtAnnotation] + assert.False(t, hasStartedAt, "restore-started-at 
annotation should be removed") + + // Verify other annotations are preserved + if deploy.Annotations != nil { + if val, ok := deploy.Annotations["custom-annotation"]; ok { + assert.Equal(t, val, d.Annotations["custom-annotation"], "custom annotations should be preserved") + } + if val, ok := deploy.Annotations[PreRestoreReplicasAnnotation]; ok { + assert.Equal(t, val, d.Annotations[PreRestoreReplicasAnnotation], "pre-restore-replicas annotation should be preserved") + } + } + } + + // Verify locks were cleared from statefulsets + for _, sts := range tt.statefulSets { + s, err := fakeClient.AppsV1().StatefulSets(tt.namespace).Get( + context.Background(), sts.Name, metav1.GetOptions{}, + ) + require.NoError(t, err) + + _, hasLock := s.Annotations[RestoreInProgressAnnotation] + assert.False(t, hasLock, "restore-in-progress annotation should be removed") + + _, hasStartedAt := s.Annotations[RestoreStartedAtAnnotation] + assert.False(t, hasStartedAt, "restore-started-at annotation should be removed") + } + }) + } +} + +func TestClient_SetAndClearRestoreLock_Integration(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + + // Create resources + deploy := createDeployment("deploy1", "test-ns", map[string]string{"app": "test"}, 3) + sts := createStatefulSet("sts1", "test-ns", map[string]string{"app": "test"}, 2) + + _, err := fakeClient.AppsV1().Deployments("test-ns").Create( + context.Background(), &deploy, metav1.CreateOptions{}, + ) + require.NoError(t, err) + + _, err = fakeClient.AppsV1().StatefulSets("test-ns").Create( + context.Background(), &sts, metav1.CreateOptions{}, + ) + require.NoError(t, err) + + client := &Client{clientset: fakeClient} + + // Verify no locks initially + locks, err := client.GetRestoreLocks("test-ns", "app=test") + require.NoError(t, err) + assert.Empty(t, locks) + + // Set locks + err = client.SetRestoreLock("test-ns", "app=test", "elasticsearch", "2025-01-01T12:00:00Z") + require.NoError(t, err) + + // Verify locks are set + locks, err = client.GetRestoreLocks("test-ns", "app=test") + require.NoError(t, err) + assert.Len(t, locks, 2) + + // Clear locks + err = client.ClearRestoreLock("test-ns", "app=test") + require.NoError(t, err) + + // Verify locks are cleared + locks, err = client.GetRestoreLocks("test-ns", "app=test") + require.NoError(t, err) + assert.Empty(t, locks) +} diff --git a/internal/foundation/config/config.go b/internal/foundation/config/config.go index 7a5c198..64e01d9 100644 --- a/internal/foundation/config/config.go +++ b/internal/foundation/config/config.go @@ -355,7 +355,7 @@ func NewCLIGlobalFlags() *CLIGlobalFlags { return &CLIGlobalFlags{} } -// Datastore identifiers (must match restorelock package constants) +// Datastore identifiers used across the application const ( DatastoreElasticsearch = "elasticsearch" DatastoreClickhouse = "clickhouse" diff --git a/internal/orchestration/restorelock/datastore.go b/internal/orchestration/restorelock/datastore.go index 000f029..efc70c4 100644 --- a/internal/orchestration/restorelock/datastore.go +++ b/internal/orchestration/restorelock/datastore.go @@ -2,14 +2,7 @@ // for the same datastore or mutually exclusive datastores. 
package restorelock -// Datastore identifiers used for restore lock tracking -const ( - DatastoreElasticsearch = "elasticsearch" - DatastoreClickhouse = "clickhouse" - DatastoreVictoriaMetrics = "victoriametrics" - DatastoreStackgraph = "stackgraph" - DatastoreSettings = "settings" -) +import "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" // MutualExclusionGroup identifies a group of datastores that cannot be restored concurrently. // Datastores in the same group share underlying data or have dependencies that make @@ -23,11 +16,11 @@ const ( // datastoreMutualExclusion maps each datastore to its mutual exclusion group. // Empty string means no mutual exclusion (datastore is independent). var datastoreMutualExclusion = map[string]string{ - DatastoreElasticsearch: "", // Independent - DatastoreClickhouse: "", // Independent - DatastoreVictoriaMetrics: "", // Independent - DatastoreStackgraph: ExclusionGroupStackgraph, - DatastoreSettings: ExclusionGroupStackgraph, + config.DatastoreElasticsearch: "", // Independent + config.DatastoreClickhouse: "", // Independent + config.DatastoreVictoriaMetrics: "", // Independent + config.DatastoreStackgraph: ExclusionGroupStackgraph, + config.DatastoreSettings: ExclusionGroupStackgraph, } // GetMutualExclusionGroup returns the mutual exclusion group for a datastore. diff --git a/internal/orchestration/restorelock/datastore_test.go b/internal/orchestration/restorelock/datastore_test.go index 818503b..9e48574 100644 --- a/internal/orchestration/restorelock/datastore_test.go +++ b/internal/orchestration/restorelock/datastore_test.go @@ -3,6 +3,7 @@ package restorelock import ( "testing" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stretchr/testify/assert" ) @@ -14,27 +15,27 @@ func TestGetMutualExclusionGroup(t *testing.T) { }{ { name: "elasticsearch has no exclusion group", - datastore: DatastoreElasticsearch, + datastore: config.DatastoreElasticsearch, want: "", }, { name: "clickhouse has no exclusion group", - datastore: DatastoreClickhouse, + datastore: config.DatastoreClickhouse, want: "", }, { name: "victoriametrics has no exclusion group", - datastore: DatastoreVictoriaMetrics, + datastore: config.DatastoreVictoriaMetrics, want: "", }, { name: "stackgraph has stackgraph group", - datastore: DatastoreStackgraph, + datastore: config.DatastoreStackgraph, want: ExclusionGroupStackgraph, }, { name: "settings has stackgraph group", - datastore: DatastoreSettings, + datastore: config.DatastoreSettings, want: ExclusionGroupStackgraph, }, { @@ -66,7 +67,7 @@ func TestGetDatastoresInGroup(t *testing.T) { { name: "stackgraph group contains stackgraph and settings", group: ExclusionGroupStackgraph, - want: []string{DatastoreStackgraph, DatastoreSettings}, + want: []string{config.DatastoreStackgraph, config.DatastoreSettings}, }, { name: "unknown group returns nil", @@ -96,38 +97,38 @@ func TestAreDatastoresMutuallyExclusive(t *testing.T) { }{ { name: "stackgraph and settings are mutually exclusive", - datastore1: DatastoreStackgraph, - datastore2: DatastoreSettings, + datastore1: config.DatastoreStackgraph, + datastore2: config.DatastoreSettings, want: true, }, { name: "settings and stackgraph are mutually exclusive (reversed)", - datastore1: DatastoreSettings, - datastore2: DatastoreStackgraph, + datastore1: config.DatastoreSettings, + datastore2: config.DatastoreStackgraph, want: true, }, { name: "elasticsearch and clickhouse are not mutually exclusive", - datastore1: DatastoreElasticsearch, 
- datastore2: DatastoreClickhouse, + datastore1: config.DatastoreElasticsearch, + datastore2: config.DatastoreClickhouse, want: false, }, { name: "stackgraph and elasticsearch are not mutually exclusive", - datastore1: DatastoreStackgraph, - datastore2: DatastoreElasticsearch, + datastore1: config.DatastoreStackgraph, + datastore2: config.DatastoreElasticsearch, want: false, }, { name: "same datastore (stackgraph) is technically mutually exclusive with itself", - datastore1: DatastoreStackgraph, - datastore2: DatastoreStackgraph, + datastore1: config.DatastoreStackgraph, + datastore2: config.DatastoreStackgraph, want: true, }, { name: "same datastore (elasticsearch) without group is not mutually exclusive", - datastore1: DatastoreElasticsearch, - datastore2: DatastoreElasticsearch, + datastore1: config.DatastoreElasticsearch, + datastore2: config.DatastoreElasticsearch, want: false, }, } diff --git a/internal/orchestration/restorelock/lock_test.go b/internal/orchestration/restorelock/lock_test.go index 28dee71..ffb6e0e 100644 --- a/internal/orchestration/restorelock/lock_test.go +++ b/internal/orchestration/restorelock/lock_test.go @@ -4,6 +4,7 @@ import ( "testing" "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -41,7 +42,7 @@ func TestFormatConflictError(t *testing.T) { t.Run(tests[0].name, func(t *testing.T) { err := formatConflictError( - DatastoreElasticsearch, DatastoreElasticsearch, + config.DatastoreElasticsearch, config.DatastoreElasticsearch, "Deployment", "api-server", "2025-01-01T12:00:00Z", false, "test-ns", "app=api", @@ -54,7 +55,7 @@ func TestFormatConflictError(t *testing.T) { t.Run(tests[1].name, func(t *testing.T) { err := formatConflictError( - DatastoreSettings, DatastoreStackgraph, + config.DatastoreSettings, config.DatastoreStackgraph, "Deployment", "server", "2025-01-01T12:00:00Z", true, "test-ns", "app=stackgraph", @@ -83,12 +84,12 @@ func TestCheckForConflicts_NoConflict(t *testing.T) { log := logger.New(false, true) allSelectors := LabelSelectors{ - DatastoreElasticsearch: "app=api", - DatastoreStackgraph: "app=stackgraph", - DatastoreSettings: "app=settings", + config.DatastoreElasticsearch: "app=api", + config.DatastoreStackgraph: "app=stackgraph", + config.DatastoreSettings: "app=settings", } - err := CheckForConflicts(k8sClient, "test-ns", DatastoreElasticsearch, allSelectors, log) + err := CheckForConflicts(k8sClient, "test-ns", config.DatastoreElasticsearch, allSelectors, log) assert.NoError(t, err) } @@ -102,7 +103,7 @@ func TestCheckForConflicts_SameDatastoreConflict(t *testing.T) { "app": "api", }, Annotations: map[string]string{ - k8s.RestoreInProgressAnnotation: DatastoreElasticsearch, + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", }, }, @@ -113,12 +114,12 @@ func TestCheckForConflicts_SameDatastoreConflict(t *testing.T) { log := logger.New(false, true) allSelectors := LabelSelectors{ - DatastoreElasticsearch: "app=api", - DatastoreStackgraph: "app=stackgraph", - DatastoreSettings: "app=settings", + config.DatastoreElasticsearch: "app=api", + config.DatastoreStackgraph: "app=stackgraph", + config.DatastoreSettings: "app=settings", } - err := CheckForConflicts(k8sClient, "test-ns", DatastoreElasticsearch, allSelectors, log) + err := 
CheckForConflicts(k8sClient, "test-ns", config.DatastoreElasticsearch, allSelectors, log) require.Error(t, err) errMsg := err.Error() @@ -138,7 +139,7 @@ func TestCheckForConflicts_MutualExclusionConflict(t *testing.T) { "app": "stackgraph", }, Annotations: map[string]string{ - k8s.RestoreInProgressAnnotation: DatastoreStackgraph, + k8s.RestoreInProgressAnnotation: config.DatastoreStackgraph, k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", }, }, @@ -160,13 +161,13 @@ func TestCheckForConflicts_MutualExclusionConflict(t *testing.T) { log := logger.New(false, true) allSelectors := LabelSelectors{ - DatastoreElasticsearch: "app=elasticsearch", - DatastoreStackgraph: "app=stackgraph", - DatastoreSettings: "app=settings", + config.DatastoreElasticsearch: "app=elasticsearch", + config.DatastoreStackgraph: "app=stackgraph", + config.DatastoreSettings: "app=settings", } // Try to start settings restore when stackgraph is running - err := CheckForConflicts(k8sClient, "test-ns", DatastoreSettings, allSelectors, log) + err := CheckForConflicts(k8sClient, "test-ns", config.DatastoreSettings, allSelectors, log) require.Error(t, err) errMsg := err.Error() @@ -186,7 +187,7 @@ func TestCheckForConflicts_NoMutualExclusionBetweenIndependentDatastores(t *test "app": "elasticsearch", }, Annotations: map[string]string{ - k8s.RestoreInProgressAnnotation: DatastoreElasticsearch, + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", }, }, @@ -208,14 +209,14 @@ func TestCheckForConflicts_NoMutualExclusionBetweenIndependentDatastores(t *test log := logger.New(false, true) allSelectors := LabelSelectors{ - DatastoreElasticsearch: "app=elasticsearch", - DatastoreClickhouse: "app=clickhouse", - DatastoreStackgraph: "app=stackgraph", - DatastoreSettings: "app=settings", + config.DatastoreElasticsearch: "app=elasticsearch", + config.DatastoreClickhouse: "app=clickhouse", + config.DatastoreStackgraph: "app=stackgraph", + config.DatastoreSettings: "app=settings", } // Clickhouse restore should succeed even though elasticsearch is running - err := CheckForConflicts(k8sClient, "test-ns", DatastoreClickhouse, allSelectors, log) + err := CheckForConflicts(k8sClient, "test-ns", config.DatastoreClickhouse, allSelectors, log) assert.NoError(t, err) } @@ -234,14 +235,14 @@ func TestAcquireLock(t *testing.T) { k8sClient := k8s.NewTestClient(fakeClientset) log := logger.New(false, true) - err := AcquireLock(k8sClient, "test-ns", "app=api", DatastoreElasticsearch, log) + err := AcquireLock(k8sClient, "test-ns", "app=api", config.DatastoreElasticsearch, log) require.NoError(t, err) // Verify lock was set locks, err := k8sClient.GetRestoreLocks("test-ns", "app=api") require.NoError(t, err) require.Len(t, locks, 1) - assert.Equal(t, DatastoreElasticsearch, locks[0].Datastore) + assert.Equal(t, config.DatastoreElasticsearch, locks[0].Datastore) assert.Equal(t, "Deployment", locks[0].ResourceKind) assert.Equal(t, "api-server", locks[0].ResourceName) assert.NotEmpty(t, locks[0].StartedAt) @@ -256,7 +257,7 @@ func TestReleaseLock(t *testing.T) { "app": "api", }, Annotations: map[string]string{ - k8s.RestoreInProgressAnnotation: DatastoreElasticsearch, + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", }, }, @@ -291,7 +292,7 @@ func TestCheckForConflicts_StatefulSetLock(t *testing.T) { "app": "vm", }, Annotations: map[string]string{ - k8s.RestoreInProgressAnnotation: DatastoreVictoriaMetrics, + 
k8s.RestoreInProgressAnnotation: config.DatastoreVictoriaMetrics, k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", }, }, @@ -302,10 +303,10 @@ func TestCheckForConflicts_StatefulSetLock(t *testing.T) { log := logger.New(false, true) allSelectors := LabelSelectors{ - DatastoreVictoriaMetrics: "app=vm", + config.DatastoreVictoriaMetrics: "app=vm", } - err := CheckForConflicts(k8sClient, "test-ns", DatastoreVictoriaMetrics, allSelectors, log) + err := CheckForConflicts(k8sClient, "test-ns", config.DatastoreVictoriaMetrics, allSelectors, log) require.Error(t, err) errMsg := err.Error() diff --git a/internal/orchestration/scale/scale_test.go b/internal/orchestration/scale/scale_test.go index 9a76bf1..2ee83f8 100644 --- a/internal/orchestration/scale/scale_test.go +++ b/internal/orchestration/scale/scale_test.go @@ -6,7 +6,9 @@ import ( "time" "github.com/stackvista/stackstate-backup-cli/internal/clients/k8s" + "github.com/stackvista/stackstate-backup-cli/internal/foundation/config" "github.com/stackvista/stackstate-backup-cli/internal/foundation/logger" + "github.com/stackvista/stackstate-backup-cli/internal/orchestration/restorelock" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" appsv1 "k8s.io/api/apps/v1" @@ -585,3 +587,569 @@ func createPod(name string, labels map[string]string, phase corev1.PodPhase) cor }, } } + +// Helper function to create a statefulset for testing +func createStatefulSet(name string, labels map[string]string, replicas int32) appsv1.StatefulSet { + return appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: "test-ns", + Labels: labels, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: &replicas, + Selector: &metav1.LabelSelector{ + MatchLabels: labels, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: labels, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "test-container", + Image: "test:latest", + }, + }, + }, + }, + }, + } +} + +// TestScaleDownWithLock_Success tests successful lock acquisition and scale down +func TestScaleDownWithLock_Success(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Create test deployment + deploy := createDeployment("api-server", map[string]string{"app": "test"}, 3) + _, err := fakeClient.AppsV1().Deployments("test-ns").Create( + context.Background(), &deploy, metav1.CreateOptions{}, + ) + require.NoError(t, err) + + allSelectors := restorelock.LabelSelectors{ + config.DatastoreElasticsearch: "app=test", + } + + // Execute scale down with lock + scaledApps, err := ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=test", + Datastore: config.DatastoreElasticsearch, + AllSelectors: allSelectors, + Log: log, + }) + + // Assertions + require.NoError(t, err) + require.Len(t, scaledApps, 1) + assert.Equal(t, "api-server", scaledApps[0].Name) + assert.Equal(t, int32(3), scaledApps[0].Replicas) + + // Verify deployment was scaled to 0 + deployAfter, err := fakeClient.AppsV1().Deployments("test-ns").Get( + context.Background(), "api-server", metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, int32(0), *deployAfter.Spec.Replicas) + + // Verify lock was acquired on deployment + assert.Equal(t, config.DatastoreElasticsearch, deployAfter.Annotations[k8s.RestoreInProgressAnnotation]) + assert.NotEmpty(t, deployAfter.Annotations[k8s.RestoreStartedAtAnnotation]) +} + +// 
TestScaleDownWithLock_ConflictSameDatastore tests conflict detection for same datastore +func TestScaleDownWithLock_ConflictSameDatastore(t *testing.T) { + // Create deployment with existing lock + deploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "test", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), + }, + } + + fakeClient := fake.NewSimpleClientset(deploy) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + allSelectors := restorelock.LabelSelectors{ + config.DatastoreElasticsearch: "app=test", + } + + // Execute scale down with lock - should fail due to existing lock + scaledApps, err := ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=test", + Datastore: config.DatastoreElasticsearch, + AllSelectors: allSelectors, + Log: log, + }) + + // Assertions + require.Error(t, err) + assert.Nil(t, scaledApps) + assert.Contains(t, err.Error(), "cannot start elasticsearch restore") + assert.Contains(t, err.Error(), "another elasticsearch restore is already in progress") +} + +// TestScaleDownWithLock_MutualExclusionConflict tests mutual exclusion between stackgraph and settings +func TestScaleDownWithLock_MutualExclusionConflict(t *testing.T) { + // Create stackgraph deployment with existing lock + stackgraphDeploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "stackgraph-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "stackgraph", + }, + Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: config.DatastoreStackgraph, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), + }, + } + + // Settings deployment without lock + settingsDeploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "settings-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "settings", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), + }, + } + + fakeClient := fake.NewSimpleClientset(stackgraphDeploy, settingsDeploy) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + allSelectors := restorelock.LabelSelectors{ + config.DatastoreStackgraph: "app=stackgraph", + config.DatastoreSettings: "app=settings", + } + + // Try to start settings restore when stackgraph is running + scaledApps, err := ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=settings", + Datastore: config.DatastoreSettings, + AllSelectors: allSelectors, + Log: log, + }) + + // Assertions + require.Error(t, err) + assert.Nil(t, scaledApps) + assert.Contains(t, err.Error(), "cannot start settings restore") + assert.Contains(t, err.Error(), "stackgraph restore is in progress") + assert.Contains(t, err.Error(), "mutually exclusive") +} + +// TestScaleDownWithLock_NoConflictIndependentDatastores tests that independent datastores don't block each other +func TestScaleDownWithLock_NoConflictIndependentDatastores(t *testing.T) { + // Create elasticsearch deployment with existing lock + esDeploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "es-master", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "elasticsearch", + }, + 
Annotations: map[string]string{ + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), + }, + } + + // Clickhouse deployment without lock + chDeploy := createDeployment("clickhouse", map[string]string{"app": "clickhouse"}, 2) + + fakeClient := fake.NewSimpleClientset(esDeploy, &chDeploy) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + allSelectors := restorelock.LabelSelectors{ + config.DatastoreElasticsearch: "app=elasticsearch", + config.DatastoreClickhouse: "app=clickhouse", + } + + // Clickhouse restore should succeed even though elasticsearch is running + scaledApps, err := ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=clickhouse", + Datastore: config.DatastoreClickhouse, + AllSelectors: allSelectors, + Log: log, + }) + + // Assertions + require.NoError(t, err) + assert.Len(t, scaledApps, 1) + assert.Equal(t, "clickhouse", scaledApps[0].Name) +} + +// TestScaleDownWithLock_WithStatefulSet tests lock acquisition with statefulsets +func TestScaleDownWithLock_WithStatefulSet(t *testing.T) { + // Setup fake client and create statefulset + sts := createStatefulSet("victoria-metrics", map[string]string{"app": "vm"}, 2) + fakeClient := fake.NewSimpleClientset(&sts) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + allSelectors := restorelock.LabelSelectors{config.DatastoreVictoriaMetrics: "app=vm"} + + // Execute scale down with lock and verify success + scaledApps, err := ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=vm", + Datastore: config.DatastoreVictoriaMetrics, + AllSelectors: allSelectors, + Log: log, + }) + require.NoError(t, err) + require.Len(t, scaledApps, 1) + assert.Equal(t, "victoria-metrics", scaledApps[0].Name) + + // Verify statefulset state after scale down + stsAfter, err := fakeClient.AppsV1().StatefulSets("test-ns").Get( + context.Background(), "victoria-metrics", metav1.GetOptions{}, + ) + require.NoError(t, err) + + // StatefulSet-specific assertions + assert.Equal(t, int32(0), *stsAfter.Spec.Replicas, "StatefulSet should be scaled to 0") + assert.Equal(t, config.DatastoreVictoriaMetrics, stsAfter.Annotations[k8s.RestoreInProgressAnnotation]) + assert.Equal(t, "2", stsAfter.Annotations[k8s.PreRestoreReplicasAnnotation], "Original replica count should be saved") +} + +// TestScaleUpAndReleaseLock_Success tests successful scale up and lock release +func TestScaleUpAndReleaseLock_Success(t *testing.T) { + // Create deployment at scale 0 with annotations (lock and replicas) + deploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "test", + }, + Annotations: map[string]string{ + k8s.PreRestoreReplicasAnnotation: "3", + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), // 0 replicas + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "test"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test", Image: "test:latest"}, + }, + }, + }, + }, + } + + fakeClient := 
fake.NewSimpleClientset(deploy) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Verify lock exists before scale up + locks, err := client.GetRestoreLocks("test-ns", "app=test") + require.NoError(t, err) + require.Len(t, locks, 1) + + // Execute scale up and release lock + err = ScaleUpAndReleaseLock(client, "test-ns", "app=test", log) + + // Assertions + require.NoError(t, err) + + // Verify deployment was scaled up + deployAfter, err := fakeClient.AppsV1().Deployments("test-ns").Get( + context.Background(), "api-server", metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, int32(3), *deployAfter.Spec.Replicas) + + // Verify pre-restore annotation was removed + _, exists := deployAfter.Annotations[k8s.PreRestoreReplicasAnnotation] + assert.False(t, exists) + + // Verify lock was released + locks, err = client.GetRestoreLocks("test-ns", "app=test") + require.NoError(t, err) + assert.Empty(t, locks) +} + +// TestScaleUpAndReleaseLock_WithStatefulSet tests scale up and lock release with statefulset +func TestScaleUpAndReleaseLock_WithStatefulSet(t *testing.T) { + // Create statefulset at scale 0 with annotations + sts := &appsv1.StatefulSet{ + ObjectMeta: metav1.ObjectMeta{ + Name: "victoria-metrics", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "vm", + }, + Annotations: map[string]string{ + k8s.PreRestoreReplicasAnnotation: "2", + k8s.RestoreInProgressAnnotation: config.DatastoreVictoriaMetrics, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + Spec: appsv1.StatefulSetSpec{ + Replicas: new(int32), // 0 replicas + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "vm"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "vm"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test", Image: "test:latest"}, + }, + }, + }, + }, + } + + fakeClient := fake.NewSimpleClientset(sts) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Execute scale up and release lock + err := ScaleUpAndReleaseLock(client, "test-ns", "app=vm", log) + + // Assertions + require.NoError(t, err) + + // Verify statefulset was scaled up + stsAfter, err := fakeClient.AppsV1().StatefulSets("test-ns").Get( + context.Background(), "victoria-metrics", metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, int32(2), *stsAfter.Spec.Replicas) + + // Verify lock was released + locks, err := client.GetRestoreLocks("test-ns", "app=vm") + require.NoError(t, err) + assert.Empty(t, locks) +} + +// TestScaleUpAndReleaseLock_NoLockToRelease tests scale up when no lock exists +func TestScaleUpAndReleaseLock_NoLockToRelease(t *testing.T) { + // Create deployment with pre-restore annotation but no lock + deploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "test", + }, + Annotations: map[string]string{ + k8s.PreRestoreReplicasAnnotation: "3", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), // 0 replicas + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "test"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test", Image: "test:latest"}, + }, + }, + }, + }, + } + + fakeClient := fake.NewSimpleClientset(deploy) + client := 
k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Execute scale up and release lock - should succeed even with no lock + err := ScaleUpAndReleaseLock(client, "test-ns", "app=test", log) + + // Assertions - should succeed (release lock is non-blocking) + require.NoError(t, err) + + // Verify deployment was scaled up + deployAfter, err := fakeClient.AppsV1().Deployments("test-ns").Get( + context.Background(), "api-server", metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, int32(3), *deployAfter.Spec.Replicas) +} + +// TestScaleUpAndReleaseLock_ScaleUpError tests that error is returned when scale up fails +func TestScaleUpAndReleaseLock_ScaleUpError(t *testing.T) { + // Create deployment with invalid annotation value + deploy := &appsv1.Deployment{ + ObjectMeta: metav1.ObjectMeta{ + Name: "api-server", + Namespace: "test-ns", + Labels: map[string]string{ + "app": "test", + }, + Annotations: map[string]string{ + k8s.PreRestoreReplicasAnnotation: "invalid", + k8s.RestoreInProgressAnnotation: config.DatastoreElasticsearch, + k8s.RestoreStartedAtAnnotation: "2025-01-01T12:00:00Z", + }, + }, + Spec: appsv1.DeploymentSpec{ + Replicas: new(int32), + Selector: &metav1.LabelSelector{ + MatchLabels: map[string]string{"app": "test"}, + }, + Template: corev1.PodTemplateSpec{ + ObjectMeta: metav1.ObjectMeta{ + Labels: map[string]string{"app": "test"}, + }, + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + {Name: "test", Image: "test:latest"}, + }, + }, + }, + }, + } + + fakeClient := fake.NewSimpleClientset(deploy) + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Execute scale up - should fail due to invalid annotation + err := ScaleUpAndReleaseLock(client, "test-ns", "app=test", log) + + // Assertions + require.Error(t, err) + assert.Contains(t, err.Error(), "failed to parse replicas annotation") + + // Verify lock was NOT released (scale up failed before release) + locks, err := client.GetRestoreLocks("test-ns", "app=test") + require.NoError(t, err) + assert.Len(t, locks, 1) // Lock should still exist +} + +// TestScaleDownWithLock_FullCycle tests the complete lock/scale-down/restore/scale-up/unlock cycle +func TestScaleDownWithLock_FullCycle(t *testing.T) { + fakeClient := fake.NewSimpleClientset() + client := k8s.NewTestClient(fakeClient) + log := logger.New(true, false) + + // Create test deployment + deploy := createDeployment("api-server", map[string]string{"app": "test"}, 3) + _, err := fakeClient.AppsV1().Deployments("test-ns").Create( + context.Background(), &deploy, metav1.CreateOptions{}, + ) + require.NoError(t, err) + + allSelectors := restorelock.LabelSelectors{ + config.DatastoreElasticsearch: "app=test", + } + + // Step 1: Scale down with lock + scaledApps, err := ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=test", + Datastore: config.DatastoreElasticsearch, + AllSelectors: allSelectors, + Log: log, + }) + require.NoError(t, err) + assert.Len(t, scaledApps, 1) + + // Verify deployment is scaled down and locked + deployMid, err := fakeClient.AppsV1().Deployments("test-ns").Get( + context.Background(), "api-server", metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, int32(0), *deployMid.Spec.Replicas) + assert.Equal(t, config.DatastoreElasticsearch, deployMid.Annotations[k8s.RestoreInProgressAnnotation]) + assert.Equal(t, "3", deployMid.Annotations[k8s.PreRestoreReplicasAnnotation]) + + // Verify a second restore attempt would 
be blocked + _, err = ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=test", + Datastore: config.DatastoreElasticsearch, + AllSelectors: allSelectors, + Log: log, + }) + require.Error(t, err) + assert.Contains(t, err.Error(), "another elasticsearch restore is already in progress") + + // Step 2: Scale up and release lock + err = ScaleUpAndReleaseLock(client, "test-ns", "app=test", log) + require.NoError(t, err) + + // Verify deployment is scaled back up and lock is released + deployFinal, err := fakeClient.AppsV1().Deployments("test-ns").Get( + context.Background(), "api-server", metav1.GetOptions{}, + ) + require.NoError(t, err) + assert.Equal(t, int32(3), *deployFinal.Spec.Replicas) + _, hasLock := deployFinal.Annotations[k8s.RestoreInProgressAnnotation] + assert.False(t, hasLock) + _, hasReplicas := deployFinal.Annotations[k8s.PreRestoreReplicasAnnotation] + assert.False(t, hasReplicas) + + // Verify a new restore can now be started + scaledApps, err = ScaleDownWithLock(ScaleDownWithLockParams{ + K8sClient: client, + Namespace: "test-ns", + LabelSelector: "app=test", + Datastore: config.DatastoreElasticsearch, + AllSelectors: allSelectors, + Log: log, + }) + require.NoError(t, err) + assert.Len(t, scaledApps, 1) +} From 76add40afb9e2bb893ff6d4ed63b021cf8ac06b9 Mon Sep 17 00:00:00 2001 From: Vladimir Iliakov Date: Wed, 14 Jan 2026 10:37:49 +0100 Subject: [PATCH 3/3] STAC-24078: Addressing pr comments --- internal/orchestration/scale/scale.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/internal/orchestration/scale/scale.go b/internal/orchestration/scale/scale.go index 8e6baf0..107a9ae 100644 --- a/internal/orchestration/scale/scale.go +++ b/internal/orchestration/scale/scale.go @@ -106,7 +106,17 @@ func ScaleDownWithLock(params ScaleDownWithLockParams) ([]k8s.AppsScale, error) scaledApps, err := ScaleDown(params.K8sClient, params.Namespace, params.LabelSelector, params.Log) if err != nil { // Release lock on scale-down failure - _ = restorelock.ReleaseLock(params.K8sClient, params.Namespace, params.LabelSelector, params.Log) + releaseLockErr := restorelock.ReleaseLock(params.K8sClient, params.Namespace, params.LabelSelector, params.Log) + if releaseLockErr != nil { + params.Log.Errorf("Failed to release lock for scale down deployments: %s", releaseLockErr.Error()) + params.Log.Warningf("To manually remove a restore lock, run:\n"+ + " kubectl annotate deployment,statefulset -l %s %s- %s- -n %s", + params.LabelSelector, + k8s.RestoreInProgressAnnotation, + k8s.RestoreStartedAtAnnotation, + params.Namespace, + ) + } return nil, err }
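
---

For orientation, below is a minimal caller sketch of the lock-protected restore flow this series introduces, using only the API added above (`scale.ScaleDownWithLock`, `scale.ScaleUpAndReleaseLock`, and the `config.Datastore*` constants). The function name `restoreWithLock`, the `runRestoreJob` callback, the `scaleDownLabelSelector` argument, and the exact import path for `internal/app` are illustrative assumptions rather than part of the patch.

```go
package restoreexample

import (
	"github.com/stackvista/stackstate-backup-cli/internal/app"
	"github.com/stackvista/stackstate-backup-cli/internal/foundation/config"
	"github.com/stackvista/stackstate-backup-cli/internal/orchestration/scale"
)

// restoreWithLock is a usage sketch (not part of the patch): it wires the
// ScaleDownWithLock / ScaleUpAndReleaseLock pair around a restore step.
// runRestoreJob stands in for the datastore-specific restore logic.
func restoreWithLock(appCtx *app.Context, scaleDownLabelSelector string,
	runRestoreJob func(*app.Context) error) error {
	// Refuse to start if a conflicting restore lock exists; otherwise annotate
	// the matching Deployments/StatefulSets and scale them down to 0.
	_, err := scale.ScaleDownWithLock(scale.ScaleDownWithLockParams{
		K8sClient:     appCtx.K8sClient,
		Namespace:     appCtx.Namespace,
		LabelSelector: scaleDownLabelSelector,
		Datastore:     config.DatastoreElasticsearch,
		AllSelectors:  appCtx.Config.GetAllScaleDownSelectors(),
		Log:           appCtx.Logger,
	})
	if err != nil {
		// Lock conflict, or scale-down failure (in which case the lock has
		// already been released by ScaleDownWithLock).
		return err
	}

	// Always restore the original replica counts and drop the lock annotations,
	// even when the restore step below fails.
	defer func() {
		if upErr := scale.ScaleUpAndReleaseLock(appCtx.K8sClient, appCtx.Namespace,
			scaleDownLabelSelector, appCtx.Logger); upErr != nil {
			appCtx.Logger.Warningf("Scale up / lock release failed: %v", upErr)
		}
	}()

	return runRestoreJob(appCtx)
}
```

The deferred call mirrors how the `cmd/*/restore.go` commands in this series pair scale-down and scale-up: lock release happens after the replicas are restored, and a release failure only produces a warning so a successful scale-up is never reported as an error.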