From 100b5af5cf6658b6d04d71e449e99a8838a9c57a Mon Sep 17 00:00:00 2001 From: Aleksandr Aleksandrov Date: Tue, 25 Nov 2025 16:58:22 +0200 Subject: [PATCH] test(e2e): add CVP selector matching regression tests Regression tests for issue #201 / PR #208 - verify CVP is validated only against CVA with matching selector. --- test/e2e/framework/framework.go | 73 ++++++++++ test/e2e/framework/kubectl/client.go | 18 +++ test/e2e/selector_matching_e2e_test.go | 132 ++++++++++++++++++ .../e2e/testdata/selector-matching/agent.yaml | 7 + .../selector-matching/cva-matching.yaml | 12 ++ .../selector-matching/cva-no-selector.yaml | 9 ++ .../selector-matching/cva-non-matching.yaml | 13 ++ .../selector-matching/cvp-no-labels.yaml | 24 ++++ .../selector-matching/cvp-with-labels.yaml | 26 ++++ 9 files changed, 314 insertions(+) create mode 100644 test/e2e/selector_matching_e2e_test.go create mode 100644 test/e2e/testdata/selector-matching/agent.yaml create mode 100644 test/e2e/testdata/selector-matching/cva-matching.yaml create mode 100644 test/e2e/testdata/selector-matching/cva-no-selector.yaml create mode 100644 test/e2e/testdata/selector-matching/cva-non-matching.yaml create mode 100644 test/e2e/testdata/selector-matching/cvp-no-labels.yaml create mode 100644 test/e2e/testdata/selector-matching/cvp-with-labels.yaml diff --git a/test/e2e/framework/framework.go b/test/e2e/framework/framework.go index 1b46161..ffece64 100644 --- a/test/e2e/framework/framework.go +++ b/test/e2e/framework/framework.go @@ -702,6 +702,50 @@ func (f *Framework) VerifyAggregatorHasPipeline(aggregatorName, pipelineName str return nil } +// VerifyAggregatorHasClusterPipeline verifies that an aggregator Secret contains the specified ClusterVectorPipeline +func (f *Framework) VerifyAggregatorHasClusterPipeline(aggregatorName, pipelineName string) error { + // Get the aggregator's vector config from the Secret + // The config is stored in a Secret with name pattern: {aggregatorName}-aggregator + secretName := fmt.Sprintf("%s-aggregator", aggregatorName) + + // Get base64-encoded config from Secret + encodedConfig, err := f.kubectl.GetWithJsonPath("secret", secretName, ".data['config\\.json']") + if err != nil { + return fmt.Errorf("failed to get aggregator secret %s: %w", secretName, err) + } + + if encodedConfig == "" { + return fmt.Errorf("aggregator secret %s has no config.json data", secretName) + } + + // Check size before decoding to prevent DoS via large payloads + maxEncodedSize := MaxConfigSize * 4 / 3 + if len(encodedConfig) > maxEncodedSize { + return fmt.Errorf("config too large: %d bytes (max %d bytes)", len(encodedConfig), maxEncodedSize) + } + + // Decode base64 + configBytes, err := base64.StdEncoding.DecodeString(encodedConfig) + if err != nil { + return fmt.Errorf("failed to decode base64 config from secret %s: %w", secretName, err) + } + config := string(configBytes) + + if config == "" { + return fmt.Errorf("aggregator %s config is empty after decoding", aggregatorName) + } + + // Check if the cluster pipeline name appears in the config + // ClusterVectorPipeline components are prefixed with only pipelinename- (no namespace prefix) + expectedPrefix := fmt.Sprintf("%s-", pipelineName) + if !strings.Contains(config, expectedPrefix) { + return fmt.Errorf("cluster pipeline %s not found in aggregator %s config (expected prefix: %s)", + pipelineName, aggregatorName, expectedPrefix) + } + + return nil +} + // ApplyTestDataWithVars loads and applies a test manifest with variable substitution func (f *Framework) ApplyTestDataWithVars(path string, vars map[string]string) { By(fmt.Sprintf("applying test data with vars: %s", path)) @@ -728,6 +772,17 @@ func (f *Framework) DeleteResource(kind, name string) { Expect(err).NotTo(HaveOccurred(), "Failed to delete %s %s in namespace %s", kind, name, f.namespace) } +// DeleteClusterResource deletes a cluster-scoped Kubernetes resource (no namespace) +func (f *Framework) DeleteClusterResource(kind, name string) { + By(fmt.Sprintf("deleting cluster resource %s %s", kind, name)) + client := kubectl.NewClient("") + err := client.DeleteClusterScoped(kind, name) + if err != nil { + // Log warning but don't fail - resource might already be deleted + GinkgoWriter.Printf("Warning: failed to delete %s %s: %v\n", kind, name, err) + } +} + // WaitForPodReadyInNamespace waits for a pod to become ready in a specific namespace func (f *Framework) WaitForPodReadyInNamespace(podName, namespace string) { By(fmt.Sprintf("waiting for pod %s to be ready in namespace %s", podName, namespace)) @@ -779,6 +834,24 @@ func (f *Framework) WaitForClusterPipelineValid(name string) { "ClusterVectorPipeline %s did not become valid", name) } +// WaitForClusterPipelineInvalid waits for a ClusterVectorPipeline to become invalid (for negative tests) +func (f *Framework) WaitForClusterPipelineInvalid(name string) { + By(fmt.Sprintf("waiting for ClusterVectorPipeline %s to become invalid", name)) + start := time.Now() + defer func() { + duration := time.Since(start) + GinkgoWriter.Printf("⏱️ ClusterVectorPipeline %s invalidated in %v\n", name, duration) + }() + + // ClusterVectorPipeline is cluster-scoped, so we use a client without namespace + client := kubectl.NewClient("") + Eventually(func() string { + result, _ := client.GetWithJsonPath("clustervectorpipeline", name, ".status.configCheckResult") + return result + }, config.PipelineValidTimeout, config.DefaultPollInterval).Should(Equal("false"), + "ClusterVectorPipeline %s did not become invalid", name) +} + // GetClusterPipelineAnnotation retrieves a specific annotation from a ClusterVectorPipeline func (f *Framework) GetClusterPipelineAnnotation(name, annotationKey string) string { jsonPath := fmt.Sprintf(".metadata.annotations['%s']", annotationKey) diff --git a/test/e2e/framework/kubectl/client.go b/test/e2e/framework/kubectl/client.go index 95e2425..6dc35df 100644 --- a/test/e2e/framework/kubectl/client.go +++ b/test/e2e/framework/kubectl/client.go @@ -237,6 +237,24 @@ func (c *Client) Delete(resourceType, name string) error { return err } +// DeleteClusterScoped deletes a cluster-scoped resource (no namespace) +func (c *Client) DeleteClusterScoped(resourceType, name string) error { + // Validate parameters to prevent command injection + if err := ValidateResourceType(resourceType); err != nil { + return fmt.Errorf("resource type validation failed: %w", err) + } + if err := ValidateResourceName(name); err != nil { + return fmt.Errorf("resource name validation failed: %w", err) + } + + // Log command for audit and reproducibility + log.Printf("KUBECTL_CMD: kubectl delete %s %s --ignore-not-found", resourceType, name) + + cmd := exec.Command("kubectl", "delete", resourceType, name, "--ignore-not-found") + _, err := utils.Run(cmd) + return err +} + // CreateNamespace creates a namespace func CreateNamespace(name string) error { // Validate namespace to prevent command injection diff --git a/test/e2e/selector_matching_e2e_test.go b/test/e2e/selector_matching_e2e_test.go new file mode 100644 index 0000000..8219f8f --- /dev/null +++ b/test/e2e/selector_matching_e2e_test.go @@ -0,0 +1,132 @@ +/* +Copyright 2024. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2e + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/kaasops/vector-operator/test/e2e/framework" + "github.com/kaasops/vector-operator/test/e2e/framework/config" +) + +// Resource names used in selector matching tests +const ( + selectorTestAgent = "test-agent" + labeledPipelineName = "labeled-pipeline" + unlabeledPipelineName = "unlabeled-pipeline" + matchingAggregatorName = "matching-aggregator" + nonMatchingAggregatorName = "non-matching-aggregator" + noSelectorAggregatorName = "no-selector-aggregator" + matchingAggregatorDeployment = matchingAggregatorName + "-aggregator" + nonMatchingAggregatorDeploy = nonMatchingAggregatorName + "-aggregator" + noSelectorAggregatorDeploy = noSelectorAggregatorName + "-aggregator" +) + +// Selector Matching tests verify that ClusterVectorPipeline is validated only against +// ClusterVectorAggregator instances whose selector matches the pipeline's labels. +// This is a regression test for issue #201 / PR #208. +// +// The bug was: configcheck validated CVP against ALL CVA instances instead of only +// those whose selector matches the CVP's labels. This caused validation failures +// when a CVP used features/config only available on specific aggregators. +var _ = Describe("Selector Matching", Label(config.LabelSmoke, config.LabelFast), Ordered, func() { + f := framework.NewUniqueFramework("test-selector-matching") + + BeforeAll(func() { + f.Setup() + + By("deploying Vector Agent") + f.ApplyTestData("selector-matching/agent.yaml") + + // Give controller time to process Vector CR and create DaemonSet + time.Sleep(5 * time.Second) + }) + + AfterAll(func() { + // Clean up cluster-scoped resources + By("cleaning up ClusterVectorPipelines") + f.DeleteClusterResource("clustervectorpipeline", labeledPipelineName) + f.DeleteClusterResource("clustervectorpipeline", unlabeledPipelineName) + + By("cleaning up ClusterVectorAggregators") + f.DeleteClusterResource("clustervectoraggregator", matchingAggregatorName) + f.DeleteClusterResource("clustervectoraggregator", nonMatchingAggregatorName) + f.DeleteClusterResource("clustervectoraggregator", noSelectorAggregatorName) + + f.Teardown() + f.PrintMetrics() + }) + + Context("CVP with labels matching CVA selector", func() { + It("should validate CVP only against matching CVA", func() { + By("deploying ClusterVectorAggregator with matching selector (team: platform)") + f.ApplyTestData("selector-matching/cva-matching.yaml") + f.WaitForDeploymentReady(matchingAggregatorDeployment) + + By("deploying ClusterVectorAggregator with non-matching selector (team: backend)") + f.ApplyTestData("selector-matching/cva-non-matching.yaml") + f.WaitForDeploymentReady(nonMatchingAggregatorDeploy) + + By("creating ClusterVectorPipeline with label team: platform") + f.ApplyTestDataWithoutNamespaceReplacement("selector-matching/cvp-with-labels.yaml") + + By("waiting for CVP to become valid") + // The pipeline should be valid because it matches "matching-aggregator" + // Before the fix in PR #208, the pipeline would be validated against ALL aggregators, + // potentially causing validation failures against non-matching aggregators + f.WaitForClusterPipelineValid(labeledPipelineName) + + By("verifying CVP role is agent (kubernetes_logs source)") + role := f.GetClusterPipelineStatus(labeledPipelineName, "role") + Expect(role).To(Equal("agent"), "Pipeline with kubernetes_logs source should have agent role") + + By("verifying CVP is processed by agent") + Eventually(func() error { + return f.VerifyAgentHasClusterPipeline(selectorTestAgent, labeledPipelineName) + }, config.ServiceCreateTimeout, config.DefaultPollInterval).Should(Succeed(), + "Pipeline should be in agent's config") + }) + }) + + Context("CVP without labels with CVA without selector", func() { + It("should validate CVP against CVA without selector", func() { + By("deploying ClusterVectorAggregator without selector") + f.ApplyTestData("selector-matching/cva-no-selector.yaml") + f.WaitForDeploymentReady(noSelectorAggregatorDeploy) + + By("creating ClusterVectorPipeline without labels") + f.ApplyTestDataWithoutNamespaceReplacement("selector-matching/cvp-no-labels.yaml") + + By("waiting for CVP to become valid") + // Pipeline without labels should match aggregator without selector + f.WaitForClusterPipelineValid(unlabeledPipelineName) + + By("verifying CVP role is agent (kubernetes_logs source)") + role := f.GetClusterPipelineStatus(unlabeledPipelineName, "role") + Expect(role).To(Equal("agent"), "Pipeline with kubernetes_logs source should have agent role") + + By("verifying CVP is processed by agent") + Eventually(func() error { + return f.VerifyAgentHasClusterPipeline(selectorTestAgent, unlabeledPipelineName) + }, config.ServiceCreateTimeout, config.DefaultPollInterval).Should(Succeed(), + "Pipeline should be in agent's config") + }) + }) +}) diff --git a/test/e2e/testdata/selector-matching/agent.yaml b/test/e2e/testdata/selector-matching/agent.yaml new file mode 100644 index 0000000..0d9fec7 --- /dev/null +++ b/test/e2e/testdata/selector-matching/agent.yaml @@ -0,0 +1,7 @@ +apiVersion: observability.kaasops.io/v1alpha1 +kind: Vector +metadata: + name: test-agent +spec: + agent: + image: timberio/vector:0.40.0-alpine diff --git a/test/e2e/testdata/selector-matching/cva-matching.yaml b/test/e2e/testdata/selector-matching/cva-matching.yaml new file mode 100644 index 0000000..700354d --- /dev/null +++ b/test/e2e/testdata/selector-matching/cva-matching.yaml @@ -0,0 +1,12 @@ +# ClusterVectorAggregator with selector that MATCHES pipeline labels +apiVersion: observability.kaasops.io/v1alpha1 +kind: ClusterVectorAggregator +metadata: + name: matching-aggregator +spec: + resourceNamespace: NAMESPACE + image: timberio/vector:0.40.0-alpine + replicas: 1 + selector: + matchLabels: + team: platform diff --git a/test/e2e/testdata/selector-matching/cva-no-selector.yaml b/test/e2e/testdata/selector-matching/cva-no-selector.yaml new file mode 100644 index 0000000..6a9f907 --- /dev/null +++ b/test/e2e/testdata/selector-matching/cva-no-selector.yaml @@ -0,0 +1,9 @@ +# ClusterVectorAggregator without selector (matches all pipelines) +apiVersion: observability.kaasops.io/v1alpha1 +kind: ClusterVectorAggregator +metadata: + name: no-selector-aggregator +spec: + resourceNamespace: NAMESPACE + image: timberio/vector:0.40.0-alpine + replicas: 1 diff --git a/test/e2e/testdata/selector-matching/cva-non-matching.yaml b/test/e2e/testdata/selector-matching/cva-non-matching.yaml new file mode 100644 index 0000000..a8f5ce8 --- /dev/null +++ b/test/e2e/testdata/selector-matching/cva-non-matching.yaml @@ -0,0 +1,13 @@ +# ClusterVectorAggregator with selector that does NOT match pipeline labels +# This aggregator requires "team: backend" but our pipeline has "team: platform" +apiVersion: observability.kaasops.io/v1alpha1 +kind: ClusterVectorAggregator +metadata: + name: non-matching-aggregator +spec: + resourceNamespace: NAMESPACE + image: timberio/vector:0.40.0-alpine + replicas: 1 + selector: + matchLabels: + team: backend diff --git a/test/e2e/testdata/selector-matching/cvp-no-labels.yaml b/test/e2e/testdata/selector-matching/cvp-no-labels.yaml new file mode 100644 index 0000000..2abfa78 --- /dev/null +++ b/test/e2e/testdata/selector-matching/cvp-no-labels.yaml @@ -0,0 +1,24 @@ +# ClusterVectorPipeline without labels - should match aggregator with no selector +apiVersion: observability.kaasops.io/v1alpha1 +kind: ClusterVectorPipeline +metadata: + name: unlabeled-pipeline +spec: + sources: + logs: + type: kubernetes_logs + extra_label_selector: "app.kubernetes.io/name=vector" + transforms: + parse: + type: remap + inputs: + - logs + source: | + .unlabeled_pipeline = true + sinks: + console: + type: console + inputs: + - parse + encoding: + codec: json diff --git a/test/e2e/testdata/selector-matching/cvp-with-labels.yaml b/test/e2e/testdata/selector-matching/cvp-with-labels.yaml new file mode 100644 index 0000000..d5d0ccd --- /dev/null +++ b/test/e2e/testdata/selector-matching/cvp-with-labels.yaml @@ -0,0 +1,26 @@ +# ClusterVectorPipeline with labels that match "matching-aggregator" selector +apiVersion: observability.kaasops.io/v1alpha1 +kind: ClusterVectorPipeline +metadata: + name: labeled-pipeline + labels: + team: platform +spec: + sources: + logs: + type: kubernetes_logs + extra_label_selector: "app.kubernetes.io/name=vector" + transforms: + parse: + type: remap + inputs: + - logs + source: | + .labeled_pipeline = true + sinks: + console: + type: console + inputs: + - parse + encoding: + codec: json