Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions test/e2e/framework/framework.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,50 @@ func (f *Framework) VerifyAggregatorHasPipeline(aggregatorName, pipelineName str
return nil
}

// VerifyAggregatorHasClusterPipeline verifies that an aggregator Secret contains the specified ClusterVectorPipeline
func (f *Framework) VerifyAggregatorHasClusterPipeline(aggregatorName, pipelineName string) error {
// Get the aggregator's vector config from the Secret
// The config is stored in a Secret with name pattern: {aggregatorName}-aggregator
secretName := fmt.Sprintf("%s-aggregator", aggregatorName)

// Get base64-encoded config from Secret
encodedConfig, err := f.kubectl.GetWithJsonPath("secret", secretName, ".data['config\\.json']")
if err != nil {
return fmt.Errorf("failed to get aggregator secret %s: %w", secretName, err)
}

if encodedConfig == "" {
return fmt.Errorf("aggregator secret %s has no config.json data", secretName)
}

// Check size before decoding to prevent DoS via large payloads
maxEncodedSize := MaxConfigSize * 4 / 3
if len(encodedConfig) > maxEncodedSize {
return fmt.Errorf("config too large: %d bytes (max %d bytes)", len(encodedConfig), maxEncodedSize)
}

// Decode base64
configBytes, err := base64.StdEncoding.DecodeString(encodedConfig)
if err != nil {
return fmt.Errorf("failed to decode base64 config from secret %s: %w", secretName, err)
}
config := string(configBytes)

if config == "" {
return fmt.Errorf("aggregator %s config is empty after decoding", aggregatorName)
}

// Check if the cluster pipeline name appears in the config
// ClusterVectorPipeline components are prefixed with only pipelinename- (no namespace prefix)
expectedPrefix := fmt.Sprintf("%s-", pipelineName)
if !strings.Contains(config, expectedPrefix) {
return fmt.Errorf("cluster pipeline %s not found in aggregator %s config (expected prefix: %s)",
pipelineName, aggregatorName, expectedPrefix)
}

return nil
}

// ApplyTestDataWithVars loads and applies a test manifest with variable substitution
func (f *Framework) ApplyTestDataWithVars(path string, vars map[string]string) {
By(fmt.Sprintf("applying test data with vars: %s", path))
Expand All @@ -728,6 +772,17 @@ func (f *Framework) DeleteResource(kind, name string) {
Expect(err).NotTo(HaveOccurred(), "Failed to delete %s %s in namespace %s", kind, name, f.namespace)
}

// DeleteClusterResource deletes a cluster-scoped Kubernetes resource (no namespace)
func (f *Framework) DeleteClusterResource(kind, name string) {
By(fmt.Sprintf("deleting cluster resource %s %s", kind, name))
client := kubectl.NewClient("")
err := client.DeleteClusterScoped(kind, name)
if err != nil {
// Log warning but don't fail - resource might already be deleted
GinkgoWriter.Printf("Warning: failed to delete %s %s: %v\n", kind, name, err)
}
}

// WaitForPodReadyInNamespace waits for a pod to become ready in a specific namespace
func (f *Framework) WaitForPodReadyInNamespace(podName, namespace string) {
By(fmt.Sprintf("waiting for pod %s to be ready in namespace %s", podName, namespace))
Expand Down Expand Up @@ -779,6 +834,24 @@ func (f *Framework) WaitForClusterPipelineValid(name string) {
"ClusterVectorPipeline %s did not become valid", name)
}

// WaitForClusterPipelineInvalid waits for a ClusterVectorPipeline to become invalid (for negative tests)
func (f *Framework) WaitForClusterPipelineInvalid(name string) {
By(fmt.Sprintf("waiting for ClusterVectorPipeline %s to become invalid", name))
start := time.Now()
defer func() {
duration := time.Since(start)
GinkgoWriter.Printf("⏱️ ClusterVectorPipeline %s invalidated in %v\n", name, duration)
}()

// ClusterVectorPipeline is cluster-scoped, so we use a client without namespace
client := kubectl.NewClient("")
Eventually(func() string {
result, _ := client.GetWithJsonPath("clustervectorpipeline", name, ".status.configCheckResult")
return result
}, config.PipelineValidTimeout, config.DefaultPollInterval).Should(Equal("false"),
"ClusterVectorPipeline %s did not become invalid", name)
}

// GetClusterPipelineAnnotation retrieves a specific annotation from a ClusterVectorPipeline
func (f *Framework) GetClusterPipelineAnnotation(name, annotationKey string) string {
jsonPath := fmt.Sprintf(".metadata.annotations['%s']", annotationKey)
Expand Down
18 changes: 18 additions & 0 deletions test/e2e/framework/kubectl/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,24 @@ func (c *Client) Delete(resourceType, name string) error {
return err
}

// DeleteClusterScoped deletes a cluster-scoped resource (no namespace)
func (c *Client) DeleteClusterScoped(resourceType, name string) error {
// Validate parameters to prevent command injection
if err := ValidateResourceType(resourceType); err != nil {
return fmt.Errorf("resource type validation failed: %w", err)
}
if err := ValidateResourceName(name); err != nil {
return fmt.Errorf("resource name validation failed: %w", err)
}

// Log command for audit and reproducibility
log.Printf("KUBECTL_CMD: kubectl delete %s %s --ignore-not-found", resourceType, name)

cmd := exec.Command("kubectl", "delete", resourceType, name, "--ignore-not-found")
_, err := utils.Run(cmd)
return err
}

// CreateNamespace creates a namespace
func CreateNamespace(name string) error {
// Validate namespace to prevent command injection
Expand Down
132 changes: 132 additions & 0 deletions test/e2e/selector_matching_e2e_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
Copyright 2024.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package e2e

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/kaasops/vector-operator/test/e2e/framework"
"github.com/kaasops/vector-operator/test/e2e/framework/config"
)

// Resource names used in selector matching tests
const (
selectorTestAgent = "test-agent"
labeledPipelineName = "labeled-pipeline"
unlabeledPipelineName = "unlabeled-pipeline"
matchingAggregatorName = "matching-aggregator"
nonMatchingAggregatorName = "non-matching-aggregator"
noSelectorAggregatorName = "no-selector-aggregator"
matchingAggregatorDeployment = matchingAggregatorName + "-aggregator"
nonMatchingAggregatorDeploy = nonMatchingAggregatorName + "-aggregator"
noSelectorAggregatorDeploy = noSelectorAggregatorName + "-aggregator"
)

// Selector Matching tests verify that ClusterVectorPipeline is validated only against
// ClusterVectorAggregator instances whose selector matches the pipeline's labels.
// This is a regression test for issue #201 / PR #208.
//
// The bug was: configcheck validated CVP against ALL CVA instances instead of only
// those whose selector matches the CVP's labels. This caused validation failures
// when a CVP used features/config only available on specific aggregators.
var _ = Describe("Selector Matching", Label(config.LabelSmoke, config.LabelFast), Ordered, func() {
f := framework.NewUniqueFramework("test-selector-matching")

BeforeAll(func() {
f.Setup()

By("deploying Vector Agent")
f.ApplyTestData("selector-matching/agent.yaml")

// Give controller time to process Vector CR and create DaemonSet
time.Sleep(5 * time.Second)
})

AfterAll(func() {
// Clean up cluster-scoped resources
By("cleaning up ClusterVectorPipelines")
f.DeleteClusterResource("clustervectorpipeline", labeledPipelineName)
f.DeleteClusterResource("clustervectorpipeline", unlabeledPipelineName)

By("cleaning up ClusterVectorAggregators")
f.DeleteClusterResource("clustervectoraggregator", matchingAggregatorName)
f.DeleteClusterResource("clustervectoraggregator", nonMatchingAggregatorName)
f.DeleteClusterResource("clustervectoraggregator", noSelectorAggregatorName)

f.Teardown()
f.PrintMetrics()
})

Context("CVP with labels matching CVA selector", func() {
It("should validate CVP only against matching CVA", func() {
By("deploying ClusterVectorAggregator with matching selector (team: platform)")
f.ApplyTestData("selector-matching/cva-matching.yaml")
f.WaitForDeploymentReady(matchingAggregatorDeployment)

By("deploying ClusterVectorAggregator with non-matching selector (team: backend)")
f.ApplyTestData("selector-matching/cva-non-matching.yaml")
f.WaitForDeploymentReady(nonMatchingAggregatorDeploy)

By("creating ClusterVectorPipeline with label team: platform")
f.ApplyTestDataWithoutNamespaceReplacement("selector-matching/cvp-with-labels.yaml")

By("waiting for CVP to become valid")
// The pipeline should be valid because it matches "matching-aggregator"
// Before the fix in PR #208, the pipeline would be validated against ALL aggregators,
// potentially causing validation failures against non-matching aggregators
f.WaitForClusterPipelineValid(labeledPipelineName)

By("verifying CVP role is agent (kubernetes_logs source)")
role := f.GetClusterPipelineStatus(labeledPipelineName, "role")
Expect(role).To(Equal("agent"), "Pipeline with kubernetes_logs source should have agent role")

By("verifying CVP is processed by agent")
Eventually(func() error {
return f.VerifyAgentHasClusterPipeline(selectorTestAgent, labeledPipelineName)
}, config.ServiceCreateTimeout, config.DefaultPollInterval).Should(Succeed(),
"Pipeline should be in agent's config")
})
})

Context("CVP without labels with CVA without selector", func() {
It("should validate CVP against CVA without selector", func() {
By("deploying ClusterVectorAggregator without selector")
f.ApplyTestData("selector-matching/cva-no-selector.yaml")
f.WaitForDeploymentReady(noSelectorAggregatorDeploy)

By("creating ClusterVectorPipeline without labels")
f.ApplyTestDataWithoutNamespaceReplacement("selector-matching/cvp-no-labels.yaml")

By("waiting for CVP to become valid")
// Pipeline without labels should match aggregator without selector
f.WaitForClusterPipelineValid(unlabeledPipelineName)

By("verifying CVP role is agent (kubernetes_logs source)")
role := f.GetClusterPipelineStatus(unlabeledPipelineName, "role")
Expect(role).To(Equal("agent"), "Pipeline with kubernetes_logs source should have agent role")

By("verifying CVP is processed by agent")
Eventually(func() error {
return f.VerifyAgentHasClusterPipeline(selectorTestAgent, unlabeledPipelineName)
}, config.ServiceCreateTimeout, config.DefaultPollInterval).Should(Succeed(),
"Pipeline should be in agent's config")
})
})
})
7 changes: 7 additions & 0 deletions test/e2e/testdata/selector-matching/agent.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
apiVersion: observability.kaasops.io/v1alpha1
kind: Vector
metadata:
name: test-agent
spec:
agent:
image: timberio/vector:0.40.0-alpine
12 changes: 12 additions & 0 deletions test/e2e/testdata/selector-matching/cva-matching.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# ClusterVectorAggregator with selector that MATCHES pipeline labels
apiVersion: observability.kaasops.io/v1alpha1
kind: ClusterVectorAggregator
metadata:
name: matching-aggregator
spec:
resourceNamespace: NAMESPACE
image: timberio/vector:0.40.0-alpine
replicas: 1
selector:
matchLabels:
team: platform
9 changes: 9 additions & 0 deletions test/e2e/testdata/selector-matching/cva-no-selector.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
# ClusterVectorAggregator without selector (matches all pipelines)
apiVersion: observability.kaasops.io/v1alpha1
kind: ClusterVectorAggregator
metadata:
name: no-selector-aggregator
spec:
resourceNamespace: NAMESPACE
image: timberio/vector:0.40.0-alpine
replicas: 1
13 changes: 13 additions & 0 deletions test/e2e/testdata/selector-matching/cva-non-matching.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
# ClusterVectorAggregator with selector that does NOT match pipeline labels
# This aggregator requires "team: backend" but our pipeline has "team: platform"
apiVersion: observability.kaasops.io/v1alpha1
kind: ClusterVectorAggregator
metadata:
name: non-matching-aggregator
spec:
resourceNamespace: NAMESPACE
image: timberio/vector:0.40.0-alpine
replicas: 1
selector:
matchLabels:
team: backend
24 changes: 24 additions & 0 deletions test/e2e/testdata/selector-matching/cvp-no-labels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# ClusterVectorPipeline without labels - should match aggregator with no selector
apiVersion: observability.kaasops.io/v1alpha1
kind: ClusterVectorPipeline
metadata:
name: unlabeled-pipeline
spec:
sources:
logs:
type: kubernetes_logs
extra_label_selector: "app.kubernetes.io/name=vector"
transforms:
parse:
type: remap
inputs:
- logs
source: |
.unlabeled_pipeline = true
sinks:
console:
type: console
inputs:
- parse
encoding:
codec: json
26 changes: 26 additions & 0 deletions test/e2e/testdata/selector-matching/cvp-with-labels.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# ClusterVectorPipeline with labels that match "matching-aggregator" selector
apiVersion: observability.kaasops.io/v1alpha1
kind: ClusterVectorPipeline
metadata:
name: labeled-pipeline
labels:
team: platform
spec:
sources:
logs:
type: kubernetes_logs
extra_label_selector: "app.kubernetes.io/name=vector"
transforms:
parse:
type: remap
inputs:
- logs
source: |
.labeled_pipeline = true
sinks:
console:
type: console
inputs:
- parse
encoding:
codec: json