From 7523ca2dd86f46ee651a2c4f3dfec775f740141c Mon Sep 17 00:00:00 2001 From: "an.makarov" Date: Sun, 23 Nov 2025 22:13:08 +0300 Subject: [PATCH] Fixed "configcheck validates ClusterVectorPipeline (CVP) against all ClusterVectorAggregator instances instead of only matching ones" --- internal/controller/pipeline_controller.go | 39 +++++-- internal/pipeline/pipeline.go | 17 +-- internal/pipeline/pipeline_test.go | 59 ----------- internal/utils/k8s/label.go | 13 +++ internal/utils/k8s/label_test.go | 114 +++++++++++++++++++++ 5 files changed, 161 insertions(+), 81 deletions(-) delete mode 100644 internal/pipeline/pipeline_test.go create mode 100644 internal/utils/k8s/label_test.go diff --git a/internal/controller/pipeline_controller.go b/internal/controller/pipeline_controller.go index a1d3fa3..ebdf5a5 100644 --- a/internal/controller/pipeline_controller.go +++ b/internal/controller/pipeline_controller.go @@ -24,24 +24,23 @@ import ( "time" "golang.org/x/sync/errgroup" - "sigs.k8s.io/controller-runtime/pkg/controller" - "sigs.k8s.io/controller-runtime/pkg/predicate" - - "github.com/kaasops/vector-operator/internal/config/configcheck" - "github.com/kaasops/vector-operator/internal/vector/aggregator" - "github.com/kaasops/vector-operator/internal/vector/vectoragent" - "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/kaasops/vector-operator/api/v1alpha1" "github.com/kaasops/vector-operator/internal/config" + "github.com/kaasops/vector-operator/internal/config/configcheck" "github.com/kaasops/vector-operator/internal/pipeline" + "github.com/kaasops/vector-operator/internal/utils/k8s" + "github.com/kaasops/vector-operator/internal/vector/aggregator" + "github.com/kaasops/vector-operator/internal/vector/vectoragent" ) type PipelineReconciler struct { @@ -139,12 +138,22 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c return ctrl.Result{}, nil } pipelineCR.SetRole(pipelineVectorRole) - + var pipelineLabels map[string]string + if pipelineCR.GetLabels() != nil { + pipelineLabels = pipelineCR.GetLabels() + } eg := errgroup.Group{} if *pipelineVectorRole == v1alpha1.VectorPipelineRoleAgent { for _, vector := range vectorAgents { + var selectorLabels map[string]string + if vector.Spec.Selector != nil { + selectorLabels = vector.Spec.Selector.MatchLabels + } + if !k8s.MatchLabels(selectorLabels, pipelineLabels) { + continue + } eg.Go(func() error { vaCtrl := vectoragent.NewController(vector, r.Client, r.Clientset) cfg, byteConfig, err := config.BuildAgentConfig(config.VectorConfigParams{ @@ -184,6 +193,13 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c if pipelineCR.GetNamespace() != "" { for _, vector := range vectorAggregators { + var selectorLabels map[string]string + if vector.Spec.Selector != nil { + selectorLabels = vector.Spec.Selector.MatchLabels + } + if !k8s.MatchLabels(selectorLabels, pipelineLabels) { + continue + } eg.Go(func() error { vaCtrl := aggregator.NewController(vector, r.Client, r.Clientset) cfg, err := config.BuildAggregatorConfig(config.VectorConfigParams{ @@ -230,6 +246,13 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c } else { for _, vector := range clusterVectorAggregators { + var selectorLabels map[string]string + if vector.Spec.Selector != nil { + selectorLabels = vector.Spec.Selector.MatchLabels + } + if !k8s.MatchLabels(selectorLabels, pipelineLabels) { + continue + } eg.Go(func() error { vaCtrl := aggregator.NewController(vector, r.Client, r.Clientset) cfg, err := config.BuildAggregatorConfig(config.VectorConfigParams{ diff --git a/internal/pipeline/pipeline.go b/internal/pipeline/pipeline.go index 3a42ab6..1fd33c0 100644 --- a/internal/pipeline/pipeline.go +++ b/internal/pipeline/pipeline.go @@ -25,6 +25,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "github.com/kaasops/vector-operator/api/v1alpha1" + "github.com/kaasops/vector-operator/internal/utils/k8s" ) type Pipeline interface { @@ -82,7 +83,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP vp.IsValid() && vp.GetRole() == filter.Role && (filter.Scope == AllPipelines || vp.Namespace == filter.Namespace) && - MatchLabels(matchLabels, vp.Labels) { + k8s.MatchLabels(matchLabels, vp.Labels) { validPipelines = append(validPipelines, vp.DeepCopy()) } } @@ -99,7 +100,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP if !cvp.IsDeleted() && cvp.IsValid() && cvp.GetRole() == filter.Role && - MatchLabels(matchLabels, cvp.Labels) { + k8s.MatchLabels(matchLabels, cvp.Labels) { validPipelines = append(validPipelines, cvp.DeepCopy()) } } @@ -148,15 +149,3 @@ func GetClusterVectorPipelines(ctx context.Context, client client.Client) ([]v1a } return cvps.Items, nil } - -func MatchLabels(selector map[string]string, labels map[string]string) bool { - if selector == nil { - return true - } - for k, v := range selector { - if labels[k] != v { - return false - } - } - return true -} diff --git a/internal/pipeline/pipeline_test.go b/internal/pipeline/pipeline_test.go deleted file mode 100644 index d04adde..0000000 --- a/internal/pipeline/pipeline_test.go +++ /dev/null @@ -1,59 +0,0 @@ -package pipeline - -import ( - "testing" -) - -func TestMatchLabels(t *testing.T) { - tests := []struct { - name string - selector map[string]string - labels map[string]string - want bool - }{ - { - name: "NoSelector", - selector: nil, - labels: map[string]string{"label1": "value1", "label2": "value2"}, - want: true, - }, - { - name: "MatchingLabels", - selector: map[string]string{"label1": "value1", "label2": "value2"}, - labels: map[string]string{"label1": "value1", "label2": "value2"}, - want: true, - }, - { - name: "MismatchedLabelValues", - selector: map[string]string{"label1": "value1", "label2": "value2"}, - labels: map[string]string{"label1": "value1", "label2": "mismatch"}, - want: false, - }, - { - name: "ExtraLabelsInMap", - selector: map[string]string{"label1": "value1"}, - labels: map[string]string{"label1": "value1", "label2": "value2"}, - want: true, - }, - { - name: "SelectorWithNoMatches", - selector: map[string]string{"label1": "value1", "label2": "value2"}, - labels: map[string]string{"label3": "value3"}, - want: false, - }, - { - name: "SelectorWithNoMatches2", - selector: map[string]string{"label1": "value1", "label2": "value2"}, - labels: map[string]string{"label1": "label1"}, - want: false, - }, - } - - for _, test := range tests { - t.Run(test.name, func(t *testing.T) { - if got := MatchLabels(test.selector, test.labels); got != test.want { - t.Errorf("MatchLabels() = %v, want %v", got, test.want) - } - }) - } -} diff --git a/internal/utils/k8s/label.go b/internal/utils/k8s/label.go index 9f75c8e..de1526f 100644 --- a/internal/utils/k8s/label.go +++ b/internal/utils/k8s/label.go @@ -57,3 +57,16 @@ func MergeLabels(dst, src map[string]string) map[string]string { } return dst } + +// MatchLabels matches a set of Kubernetes selectors and a set of Kubernetes labels +func MatchLabels(selector map[string]string, labels map[string]string) bool { + if selector == nil { + return true + } + for k, v := range selector { + if labels[k] != v { + return false + } + } + return true +} diff --git a/internal/utils/k8s/label_test.go b/internal/utils/k8s/label_test.go new file mode 100644 index 0000000..5186d39 --- /dev/null +++ b/internal/utils/k8s/label_test.go @@ -0,0 +1,114 @@ +package k8s + +import ( + "reflect" + "testing" +) + +func TestMatchLabels(t *testing.T) { + tests := []struct { + name string + selector map[string]string + labels map[string]string + want bool + }{ + { + name: "NoSelector", + selector: nil, + labels: map[string]string{"label1": "value1", "label2": "value2"}, + want: true, + }, + { + name: "MatchingLabels", + selector: map[string]string{"label1": "value1", "label2": "value2"}, + labels: map[string]string{"label1": "value1", "label2": "value2"}, + want: true, + }, + { + name: "MismatchedLabelValues", + selector: map[string]string{"label1": "value1", "label2": "value2"}, + labels: map[string]string{"label1": "value1", "label2": "mismatch"}, + want: false, + }, + { + name: "ExtraLabelsInMap", + selector: map[string]string{"label1": "value1"}, + labels: map[string]string{"label1": "value1", "label2": "value2"}, + want: true, + }, + { + name: "SelectorWithNoMatches", + selector: map[string]string{"label1": "value1", "label2": "value2"}, + labels: map[string]string{"label3": "value3"}, + want: false, + }, + { + name: "SelectorWithNoMatches2", + selector: map[string]string{"label1": "value1", "label2": "value2"}, + labels: map[string]string{"label1": "label1"}, + want: false, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if got := MatchLabels(test.selector, test.labels); got != test.want { + t.Errorf("MatchLabels() = %v, want %v", got, test.want) + } + }) + } +} + +func TestMergeLabels(t *testing.T) { + tests := []struct { + name string + sourceLabels map[string]string + distLabels map[string]string + want map[string]string + }{ + { + name: "EmptySource", + sourceLabels: nil, + distLabels: map[string]string{"label1": "value1", "label2": "value2"}, + want: map[string]string{"label1": "value1", "label2": "value2"}, + }, + { + name: "EmptyDist", + sourceLabels: map[string]string{"label1": "value1", "label2": "value2"}, + distLabels: nil, + want: map[string]string{"label1": "value1", "label2": "value2"}, + }, + { + name: "DifferentLabelValues", + sourceLabels: map[string]string{"label1": "value1", "label2": "value2"}, + distLabels: map[string]string{"label1": "value1", "label2": "mismatch"}, + want: map[string]string{"label1": "value1", "label2": "mismatch"}, + }, + { + name: "SameLabelValues", + sourceLabels: map[string]string{"label1": "value1"}, + distLabels: map[string]string{"label1": "value1", "label2": "value2"}, + want: map[string]string{"label1": "value1", "label2": "value2"}, + }, + { + name: "NewLabelValues", + sourceLabels: map[string]string{"label1": "value1", "label2": "value2"}, + distLabels: map[string]string{"label3": "value3"}, + want: map[string]string{"label1": "value1", "label2": "value2", "label3": "value3"}, + }, + { + name: "DifferentLabelValues2", + sourceLabels: map[string]string{"label1": "value1", "label2": "value2"}, + distLabels: map[string]string{"label1": "label1"}, + want: map[string]string{"label1": "label1", "label2": "value2"}, + }, + } + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + if got := MergeLabels(test.distLabels, test.sourceLabels); !reflect.DeepEqual(got, test.want) { + t.Errorf("MatchLabels() = %v, want %v", got, test.want) + } + }) + } +}