Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 31 additions & 8 deletions internal/controller/pipeline_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,24 +24,23 @@ import (
"time"

"golang.org/x/sync/errgroup"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/kaasops/vector-operator/internal/config/configcheck"
"github.com/kaasops/vector-operator/internal/vector/aggregator"
"github.com/kaasops/vector-operator/internal/vector/vectoragent"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/client-go/kubernetes"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"

"github.com/kaasops/vector-operator/api/v1alpha1"
"github.com/kaasops/vector-operator/internal/config"
"github.com/kaasops/vector-operator/internal/config/configcheck"
"github.com/kaasops/vector-operator/internal/pipeline"
"github.com/kaasops/vector-operator/internal/utils/k8s"
"github.com/kaasops/vector-operator/internal/vector/aggregator"
"github.com/kaasops/vector-operator/internal/vector/vectoragent"
)

type PipelineReconciler struct {
Expand Down Expand Up @@ -139,12 +138,22 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
return ctrl.Result{}, nil
}
pipelineCR.SetRole(pipelineVectorRole)

var pipelineLabels map[string]string
if pipelineCR.GetLabels() != nil {
pipelineLabels = pipelineCR.GetLabels()
}
eg := errgroup.Group{}

if *pipelineVectorRole == v1alpha1.VectorPipelineRoleAgent {

for _, vector := range vectorAgents {
var selectorLabels map[string]string
if vector.Spec.Selector != nil {
selectorLabels = vector.Spec.Selector.MatchLabels
}
if !k8s.MatchLabels(selectorLabels, pipelineLabels) {
continue
}
eg.Go(func() error {
vaCtrl := vectoragent.NewController(vector, r.Client, r.Clientset)
cfg, byteConfig, err := config.BuildAgentConfig(config.VectorConfigParams{
Expand Down Expand Up @@ -184,6 +193,13 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c

if pipelineCR.GetNamespace() != "" {
for _, vector := range vectorAggregators {
var selectorLabels map[string]string
if vector.Spec.Selector != nil {
selectorLabels = vector.Spec.Selector.MatchLabels
}
if !k8s.MatchLabels(selectorLabels, pipelineLabels) {
continue
}
eg.Go(func() error {
vaCtrl := aggregator.NewController(vector, r.Client, r.Clientset)
cfg, err := config.BuildAggregatorConfig(config.VectorConfigParams{
Expand Down Expand Up @@ -230,6 +246,13 @@ func (r *PipelineReconciler) Reconcile(ctx context.Context, req ctrl.Request) (c
} else {

for _, vector := range clusterVectorAggregators {
var selectorLabels map[string]string
if vector.Spec.Selector != nil {
selectorLabels = vector.Spec.Selector.MatchLabels
}
if !k8s.MatchLabels(selectorLabels, pipelineLabels) {
continue
}
eg.Go(func() error {
vaCtrl := aggregator.NewController(vector, r.Client, r.Clientset)
cfg, err := config.BuildAggregatorConfig(config.VectorConfigParams{
Expand Down
17 changes: 3 additions & 14 deletions internal/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

"github.com/kaasops/vector-operator/api/v1alpha1"
"github.com/kaasops/vector-operator/internal/utils/k8s"
)

type Pipeline interface {
Expand Down Expand Up @@ -82,7 +83,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP
vp.IsValid() &&
vp.GetRole() == filter.Role &&
(filter.Scope == AllPipelines || vp.Namespace == filter.Namespace) &&
MatchLabels(matchLabels, vp.Labels) {
k8s.MatchLabels(matchLabels, vp.Labels) {
validPipelines = append(validPipelines, vp.DeepCopy())
}
}
Expand All @@ -99,7 +100,7 @@ func GetValidPipelines(ctx context.Context, client client.Client, filter FilterP
if !cvp.IsDeleted() &&
cvp.IsValid() &&
cvp.GetRole() == filter.Role &&
MatchLabels(matchLabels, cvp.Labels) {
k8s.MatchLabels(matchLabels, cvp.Labels) {
validPipelines = append(validPipelines, cvp.DeepCopy())
}
}
Expand Down Expand Up @@ -148,15 +149,3 @@ func GetClusterVectorPipelines(ctx context.Context, client client.Client) ([]v1a
}
return cvps.Items, nil
}

func MatchLabels(selector map[string]string, labels map[string]string) bool {
if selector == nil {
return true
}
for k, v := range selector {
if labels[k] != v {
return false
}
}
return true
}
59 changes: 0 additions & 59 deletions internal/pipeline/pipeline_test.go

This file was deleted.

13 changes: 13 additions & 0 deletions internal/utils/k8s/label.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,3 +57,16 @@ func MergeLabels(dst, src map[string]string) map[string]string {
}
return dst
}

// MatchLabels matches a set of Kubernetes selectors and a set of Kubernetes labels
func MatchLabels(selector map[string]string, labels map[string]string) bool {
if selector == nil {
return true
}
for k, v := range selector {
if labels[k] != v {
return false
}
}
return true
}
114 changes: 114 additions & 0 deletions internal/utils/k8s/label_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package k8s

import (
"reflect"
"testing"
)

func TestMatchLabels(t *testing.T) {
tests := []struct {
name string
selector map[string]string
labels map[string]string
want bool
}{
{
name: "NoSelector",
selector: nil,
labels: map[string]string{"label1": "value1", "label2": "value2"},
want: true,
},
{
name: "MatchingLabels",
selector: map[string]string{"label1": "value1", "label2": "value2"},
labels: map[string]string{"label1": "value1", "label2": "value2"},
want: true,
},
{
name: "MismatchedLabelValues",
selector: map[string]string{"label1": "value1", "label2": "value2"},
labels: map[string]string{"label1": "value1", "label2": "mismatch"},
want: false,
},
{
name: "ExtraLabelsInMap",
selector: map[string]string{"label1": "value1"},
labels: map[string]string{"label1": "value1", "label2": "value2"},
want: true,
},
{
name: "SelectorWithNoMatches",
selector: map[string]string{"label1": "value1", "label2": "value2"},
labels: map[string]string{"label3": "value3"},
want: false,
},
{
name: "SelectorWithNoMatches2",
selector: map[string]string{"label1": "value1", "label2": "value2"},
labels: map[string]string{"label1": "label1"},
want: false,
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if got := MatchLabels(test.selector, test.labels); got != test.want {
t.Errorf("MatchLabels() = %v, want %v", got, test.want)
}
})
}
}

func TestMergeLabels(t *testing.T) {
tests := []struct {
name string
sourceLabels map[string]string
distLabels map[string]string
want map[string]string
}{
{
name: "EmptySource",
sourceLabels: nil,
distLabels: map[string]string{"label1": "value1", "label2": "value2"},
want: map[string]string{"label1": "value1", "label2": "value2"},
},
{
name: "EmptyDist",
sourceLabels: map[string]string{"label1": "value1", "label2": "value2"},
distLabels: nil,
want: map[string]string{"label1": "value1", "label2": "value2"},
},
{
name: "DifferentLabelValues",
sourceLabels: map[string]string{"label1": "value1", "label2": "value2"},
distLabels: map[string]string{"label1": "value1", "label2": "mismatch"},
want: map[string]string{"label1": "value1", "label2": "mismatch"},
},
{
name: "SameLabelValues",
sourceLabels: map[string]string{"label1": "value1"},
distLabels: map[string]string{"label1": "value1", "label2": "value2"},
want: map[string]string{"label1": "value1", "label2": "value2"},
},
{
name: "NewLabelValues",
sourceLabels: map[string]string{"label1": "value1", "label2": "value2"},
distLabels: map[string]string{"label3": "value3"},
want: map[string]string{"label1": "value1", "label2": "value2", "label3": "value3"},
},
{
name: "DifferentLabelValues2",
sourceLabels: map[string]string{"label1": "value1", "label2": "value2"},
distLabels: map[string]string{"label1": "label1"},
want: map[string]string{"label1": "label1", "label2": "value2"},
},
}

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if got := MergeLabels(test.distLabels, test.sourceLabels); !reflect.DeepEqual(got, test.want) {
t.Errorf("MatchLabels() = %v, want %v", got, test.want)
}
})
}
}