diff --git a/apis/nodecore/v1alpha1/k8slice_configuration.go b/apis/nodecore/v1alpha1/k8slice_configuration.go index b51f22b5..fd17d5a8 100644 --- a/apis/nodecore/v1alpha1/k8slice_configuration.go +++ b/apis/nodecore/v1alpha1/k8slice_configuration.go @@ -14,7 +14,9 @@ package v1alpha1 -import "k8s.io/apimachinery/pkg/api/resource" +import ( + "k8s.io/apimachinery/pkg/api/resource" +) // K8SliceConfiguration is the partition of the flavor K8Slice. type K8SliceConfiguration struct { diff --git a/apis/nodecore/v1alpha1/k8slice_flavor.go b/apis/nodecore/v1alpha1/k8slice_flavor.go index d2ae483d..b24f6fc6 100644 --- a/apis/nodecore/v1alpha1/k8slice_flavor.go +++ b/apis/nodecore/v1alpha1/k8slice_flavor.go @@ -46,7 +46,7 @@ type K8SliceCharacteristics struct { Memory resource.Quantity `json:"memory"` // Pods is the maximum number of pods schedulable on this K8Slice Flavor. Pods resource.Quantity `json:"pods"` - // GPU is the number of GPU cores of the K8Slice Flavor. + // GPU contains the GPU traits of the K8Slice Flavor. Gpu *GPU `json:"gpu,omitempty"` // Storage is the amount of storage offered by this K8Slice Flavor. Storage *resource.Quantity `json:"storage,omitempty"` @@ -60,6 +60,37 @@ type GPU struct { Cores resource.Quantity `json:"cores"` // Memory of the GPU Memory resource.Quantity `json:"memory"` + // + // FLARE properties + // + Vendor string `json:"vendor"` + Tier string `json:"tier,omitempty"` + Count int64 `json:"count,omitempty"` + MultiInstance bool `json:"multi_instance,omitempty"` + Shared bool `json:"shared,omitempty"` + SharingStrategy string `json:"sharing_strategy,omitempty"` + Dedicated bool `json:"dedicated,omitempty"` + Interruptible bool `json:"interruptible,omitempty"` + NetworkBandwidth resource.Quantity `json:"network_bandwidth,omitempty"` + NetworkLatencyMs int64 `json:"network_latency_ms,omitempty"` + NetworkTier string `json:"network_tier,omitempty"` + TrainingScore float64 `json:"training_score,omitempty"` + InferenceScore float64 `json:"inference_score,omitempty"` + HPCScore float64 `json:"hpc_score,omitempty"` + GraphicsScore float64 `json:"graphics_score,omitempty"` + Architecture string `json:"architecture,omitempty"` + Interconnect string `json:"interconnect,omitempty"` + InterconnectBandwidth resource.Quantity `json:"interconnect_bandwidth,omitempty"` + ComputeCapability string `json:"compute_capability,omitempty"` + ClockSpeed resource.Quantity `json:"clock_speed,omitempty"` + FP32TFlops float64 `json:"fp32_tflops,omitempty"` + Topology string `json:"topology,omitempty"` + MultiGPUEfficiency string `json:"multi_gpu_efficiency,omitempty"` + Region string `json:"region,omitempty"` + Zone string `json:"zone,omitempty"` + HourlyRate float64 `json:"hourly_rate,omitempty"` + Provider string `json:"provider,omitempty"` + PreEmptible bool `json:"pre_emptible,omitempty"` } // Policies represents the policies of a K8Slice Flavor, such as the partitionability of the K8Slice Flavor. diff --git a/apis/nodecore/v1alpha1/k8slice_selector.go b/apis/nodecore/v1alpha1/k8slice_selector.go index 04b50c54..7fe016d1 100644 --- a/apis/nodecore/v1alpha1/k8slice_selector.go +++ b/apis/nodecore/v1alpha1/k8slice_selector.go @@ -14,7 +14,9 @@ package v1alpha1 -import "k8s.io/klog/v2" +import ( + "k8s.io/klog/v2" +) // K8SliceSelector is the selector for a K8Slice. type K8SliceSelector struct { @@ -32,6 +34,9 @@ type K8SliceSelector struct { // StorageFilter is the Storage filter of the K8SliceSelector. StorageFilter *ResourceQuantityFilter `json:"storageFilter,omitempty"` + + //GPUFilters is the advanced GPU filter of the K8SliceSelector + GPUFilters []GPUFieldSelector `json:"gpuFilters,omitempty"` } // GetFlavorTypeSelector returns the type of the Flavor. diff --git a/apis/nodecore/v1alpha1/selector.go b/apis/nodecore/v1alpha1/selector.go index c807754b..0ec9c724 100644 --- a/apis/nodecore/v1alpha1/selector.go +++ b/apis/nodecore/v1alpha1/selector.go @@ -34,6 +34,30 @@ const ( // FilterType is the type of filter that can be applied to a resource quantity. type FilterType string +// BooleanFilter is a filter that can be applied to a boolean value. +type BooleanFilter struct { + Data runtime.RawExtension `json:"data"` +} + +type NumberFilter struct { + // Name indicates the type of the filter + Name FilterType `json:"name"` + // Filter data + Data runtime.RawExtension `json:"data"` +} + +type NumberMatchSelector struct { + // Value is the value to match + Value float64 `json:"value"` +} + +type NumberRangeSelector struct { + // Min is the minimum value of the range + Min *float64 `json:"min,omitempty"` + // Max is the maximum value of the range + Max *float64 `json:"max,omitempty"` +} + // StringFilter is a filter that can be applied to a string. type StringFilter struct { // Name indicates the type of the filter @@ -76,6 +100,28 @@ type ResourceRangeSelector struct { Max *resource.Quantity `json:"max,omitempty"` } +type SelectorName string + +var ( + ResourceRangeSelectorName = SelectorName("ResourceRangeSelector") + ResourceMatchSelectorName = SelectorName("ResourceMatchSelector") + StringRangeSelectorName = SelectorName("StringRangeSelector") + StringFilterSelectorName = SelectorName("StringFilter") + BooleanFilterSelectorName = SelectorName("BooleanFilter") + NumberRangeSelectorName = SelectorName("NumberRangeFilter") + NumberMatchSelectorName = SelectorName("NumberMatchFilter") +) + +type GPUFieldSelector struct { + // Field is the GPU field the filter should evaluate. + Field string `json:"field"` + // Selector is the name of the concrete Selector to enforce for the given data. + //+kubebuilder:validation:Enum=ResourceRangeSelector;ResourceMatchSelector;StringRangeSelector;StringFilter;BooleanFilter;NumberRangeFilter;NumberFilter + Selector SelectorName `json:"filter"` + // Data contains the raw data for the given filter. + Data runtime.RawExtension `json:"data"` +} + // ParseResourceQuantityFilter parses a ResourceQuantityFilter into a FilterType and the corresponding filter data. // It also provides a set of validation rules for the filter data. // Particularly for the ResourceRangeSelector, it checks that at least one of min or max is set and that min is less than max if both are set. diff --git a/apis/nodecore/v1alpha1/zz_generated.deepcopy.go b/apis/nodecore/v1alpha1/zz_generated.deepcopy.go index 1fee71ac..1425276b 100644 --- a/apis/nodecore/v1alpha1/zz_generated.deepcopy.go +++ b/apis/nodecore/v1alpha1/zz_generated.deepcopy.go @@ -113,6 +113,22 @@ func (in *AllocationStatus) DeepCopy() *AllocationStatus { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *BooleanFilter) DeepCopyInto(out *BooleanFilter) { + *out = *in + in.Data.DeepCopyInto(&out.Data) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new BooleanFilter. +func (in *BooleanFilter) DeepCopy() *BooleanFilter { + if in == nil { + return nil + } + out := new(BooleanFilter) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *CarbonFootprint) DeepCopyInto(out *CarbonFootprint) { *out = *in @@ -267,6 +283,9 @@ func (in *GPU) DeepCopyInto(out *GPU) { *out = *in out.Cores = in.Cores.DeepCopy() out.Memory = in.Memory.DeepCopy() + out.NetworkBandwidth = in.NetworkBandwidth.DeepCopy() + out.InterconnectBandwidth = in.InterconnectBandwidth.DeepCopy() + out.ClockSpeed = in.ClockSpeed.DeepCopy() } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GPU. @@ -279,6 +298,22 @@ func (in *GPU) DeepCopy() *GPU { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *GPUFieldSelector) DeepCopyInto(out *GPUFieldSelector) { + *out = *in + in.Data.DeepCopyInto(&out.Data) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new GPUFieldSelector. +func (in *GPUFieldSelector) DeepCopy() *GPUFieldSelector { + if in == nil { + return nil + } + out := new(GPUFieldSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *GenericRef) DeepCopyInto(out *GenericRef) { *out = *in @@ -396,6 +431,13 @@ func (in *K8SliceSelector) DeepCopyInto(out *K8SliceSelector) { *out = new(ResourceQuantityFilter) (*in).DeepCopyInto(*out) } + if in.GPUFilters != nil { + in, out := &in.GPUFilters, &out.GPUFilters + *out = make([]GPUFieldSelector, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new K8SliceSelector. @@ -519,6 +561,62 @@ func (in *NodeIdentityAdditionalInfo) DeepCopy() *NodeIdentityAdditionalInfo { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NumberFilter) DeepCopyInto(out *NumberFilter) { + *out = *in + in.Data.DeepCopyInto(&out.Data) +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NumberFilter. +func (in *NumberFilter) DeepCopy() *NumberFilter { + if in == nil { + return nil + } + out := new(NumberFilter) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NumberMatchSelector) DeepCopyInto(out *NumberMatchSelector) { + *out = *in +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NumberMatchSelector. +func (in *NumberMatchSelector) DeepCopy() *NumberMatchSelector { + if in == nil { + return nil + } + out := new(NumberMatchSelector) + in.DeepCopyInto(out) + return out +} + +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. +func (in *NumberRangeSelector) DeepCopyInto(out *NumberRangeSelector) { + *out = *in + if in.Min != nil { + in, out := &in.Min, &out.Min + *out = new(float64) + **out = **in + } + if in.Max != nil { + in, out := &in.Max, &out.Max + *out = new(float64) + **out = **in + } +} + +// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new NumberRangeSelector. +func (in *NumberRangeSelector) DeepCopy() *NumberRangeSelector { + if in == nil { + return nil + } + out := new(NumberRangeSelector) + in.DeepCopyInto(out) + return out +} + // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. func (in *Partitionability) DeepCopyInto(out *Partitionability) { *out = *in diff --git a/cmd/local-resource-manager/main.go b/cmd/local-resource-manager/main.go index 037a89d3..d6ab0e27 100644 --- a/cmd/local-resource-manager/main.go +++ b/cmd/local-resource-manager/main.go @@ -24,13 +24,13 @@ import ( "k8s.io/klog/v2" metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ctrl "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/healthz" "sigs.k8s.io/controller-runtime/pkg/log/zap" "sigs.k8s.io/controller-runtime/pkg/metrics/server" "sigs.k8s.io/controller-runtime/pkg/webhook" nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" + "github.com/fluidos-project/node/pkg/indexer" localresourcemanager "github.com/fluidos-project/node/pkg/local-resource-manager" "github.com/fluidos-project/node/pkg/utils/flags" ) @@ -82,13 +82,6 @@ func main() { ctrl.SetLogger(zap.New(zap.UseFlagOptions(&opts))) - cfg := ctrl.GetConfigOrDie() - cl, err := client.New(cfg, client.Options{Scheme: scheme}) - if err != nil { - setupLog.Error(err, "Unable to create client") - os.Exit(1) - } - var webhookServer webhook.Server if *enableWH { @@ -115,12 +108,15 @@ func main() { // Print something about the mgr setupLog.Info("Manager started", "manager", mgr) + flavorIndexer := indexer.FlavorByNodeName{} + // Register the controller if err = (&localresourcemanager.NodeReconciler{ - Client: cl, + Client: mgr.GetClient(), Scheme: mgr.GetScheme(), EnableAutoDiscovery: *enableAutoDiscovery, WebhookServer: webhookServer, + FlavorIndexer: flavorIndexer, }).SetupWithManager(mgr); err != nil { setupLog.Error(err, "unable to create controller", "controller", "Node") os.Exit(1) @@ -128,7 +124,7 @@ func main() { // Register the controller if err = (&localresourcemanager.ServiceBlueprintReconciler{ - Client: cl, + Client: mgr.GetClient(), Scheme: mgr.GetScheme(), EnableAutoDiscovery: *enableAutoDiscovery, WebhookServer: webhookServer, @@ -161,13 +157,22 @@ func main() { setupLog.Error(err, "unable to set up ready check") os.Exit(1) } - if err := mgr.AddHealthzCheck("webhook", webhookServer.StartedChecker()); err != nil { - setupLog.Error(err, "unable to set up webhook health check") + if *enableWH { + if err := mgr.AddHealthzCheck("webhook", webhookServer.StartedChecker()); err != nil { + setupLog.Error(err, "unable to set up webhook health check") + os.Exit(1) + } + } + + ctx := ctrl.SetupSignalHandler() + + if err := mgr.GetFieldIndexer().IndexField(ctx, flavorIndexer.Object(), flavorIndexer.Field(), flavorIndexer.IndexerFunc()); err != nil { + setupLog.Error(err, "problem setting up flavorIndexer") os.Exit(1) } setupLog.Info("Starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/cmd/rear-controller/main.go b/cmd/rear-controller/main.go index b7d67ed2..f6800f5c 100644 --- a/cmd/rear-controller/main.go +++ b/cmd/rear-controller/main.go @@ -40,7 +40,7 @@ import ( reservationv1alpha1 "github.com/fluidos-project/node/apis/reservation/v1alpha1" contractmanager "github.com/fluidos-project/node/pkg/rear-controller/contract-manager" discoverymanager "github.com/fluidos-project/node/pkg/rear-controller/discovery-manager" - gateway "github.com/fluidos-project/node/pkg/rear-controller/gateway" + "github.com/fluidos-project/node/pkg/rear-controller/gateway" "github.com/fluidos-project/node/pkg/utils/flags" ) @@ -84,8 +84,13 @@ func main() { setupLog.Info("Webhooks are disabled") } + ctx := ctrl.SetupSignalHandler() + mgr, err := ctrl.NewManager(ctrl.GetConfigOrDie(), ctrl.Options{ Scheme: scheme, + BaseContext: func() context.Context { + return ctx + }, Metrics: server.Options{ BindAddress: metricsAddr, }, @@ -130,7 +135,7 @@ func main() { } if err := cache.IndexField( - context.Background(), + ctx, &reservationv1alpha1.Contract{}, "spec.transactionID", indexFuncTransaction, @@ -140,7 +145,7 @@ func main() { } if err := cache.IndexField( - context.Background(), + ctx, &reservationv1alpha1.Contract{}, "spec.buyerClusterID", indexFuncClusterID, @@ -150,7 +155,7 @@ func main() { } if err := cache.IndexField( - context.Background(), + ctx, &reservationv1alpha1.Contract{}, "spec.buyer.additionalInformation.liqoID", indexFuncBuyerLiqoID, @@ -160,7 +165,7 @@ func main() { } if err := cache.IndexField( - context.Background(), + ctx, &reservationv1alpha1.Contract{}, "spec.seller.additionalInformation.liqoID", indexFuncSellerLiqoID, @@ -220,9 +225,11 @@ func main() { os.Exit(1) } - if err := mgr.AddHealthzCheck("webhook", webhookServer.StartedChecker()); err != nil { - setupLog.Error(err, "unable to set up webhook health check") - os.Exit(1) + if *enableWH { + if err := mgr.AddHealthzCheck("webhook", webhookServer.StartedChecker()); err != nil { + setupLog.Error(err, "unable to set up webhook health check") + os.Exit(1) + } } // Periodically clear the transaction cache @@ -244,7 +251,7 @@ func main() { } setupLog.Info("starting manager") - if err := mgr.Start(ctrl.SetupSignalHandler()); err != nil { + if err := mgr.Start(ctx); err != nil { setupLog.Error(err, "problem running manager") os.Exit(1) } diff --git a/cmd/rear-manager/main.go b/cmd/rear-manager/main.go index 1f62733c..2cd59f9b 100644 --- a/cmd/rear-manager/main.go +++ b/cmd/rear-manager/main.go @@ -154,9 +154,11 @@ func main() { os.Exit(1) } - if err := mgr.AddHealthzCheck("webhook", webhookServer.StartedChecker()); err != nil { - setupLog.Error(err, "unable to set up webhook health check") - os.Exit(1) + if *enableWH { + if err := mgr.AddHealthzCheck("webhook", webhookServer.StartedChecker()); err != nil { + setupLog.Error(err, "unable to set up webhook health check") + os.Exit(1) + } } setupLog.Info("starting manager") diff --git a/go.mod b/go.mod index 1cf6cc72..73af5c17 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gorilla/mux v1.8.1 github.com/liqotech/liqo v1.0.0 github.com/rabbitmq/amqp091-go v1.10.0 + golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 k8s.io/api v0.32.1 k8s.io/apimachinery v0.32.1 k8s.io/client-go v0.32.1 diff --git a/go.sum b/go.sum index 96e19b58..907cf59c 100644 --- a/go.sum +++ b/go.sum @@ -161,6 +161,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.35.0 h1:b15kiHdrGCHrP6LvwaQ3c03kgNhhiMgvlhxHQhmg2Xs= golang.org/x/crypto v0.35.0/go.mod h1:dy7dXNW32cAb/6/PRuTNsix8T+vJAqvuIy5Bli/x0YQ= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 h1:2dVuKD2vS7b0QIHQbpyTISPd0LeHDbnYEryqj5Q1ug8= +golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56/go.mod h1:M4RDyNAINzryxdtnbRXRL/OHtkFuWGRjvuhBJpk2IlY= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.22.0 h1:D4nJWe9zXqHOmWqj4VMOJhvzj7bEZg4wEYa759z1pH4= diff --git a/pkg/indexer/flavor_by_node_name.go b/pkg/indexer/flavor_by_node_name.go new file mode 100644 index 00000000..b66eda78 --- /dev/null +++ b/pkg/indexer/flavor_by_node_name.go @@ -0,0 +1,46 @@ +// Copyright 2022-2025 FLUIDOS Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package indexer + +import ( + "sigs.k8s.io/controller-runtime/pkg/client" + + nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" +) + +// FlavorByNodeName is a controller-runtime Manager indexer returning +// a list of nodecorev1alpha1.Flavor by Owner Reference name. +type FlavorByNodeName struct{} + +func (FlavorByNodeName) Object() client.Object { + return &nodecorev1alpha1.Flavor{} +} + +func (FlavorByNodeName) Field() string { + return "metadata.ownerReferences.name" +} + +func (FlavorByNodeName) IndexerFunc() client.IndexerFunc { + return func(obj client.Object) []string { + ownerReferences := obj.GetOwnerReferences() + keys := make([]string, 0, len(ownerReferences)) + + for _, or := range ownerReferences { + keys = append(keys, or.Name) + } + + return keys + } +} diff --git a/pkg/local-resource-manager/node_controller.go b/pkg/local-resource-manager/node_controller.go index d338a155..5f5d623f 100644 --- a/pkg/local-resource-manager/node_controller.go +++ b/pkg/local-resource-manager/node_controller.go @@ -16,24 +16,27 @@ package localresourcemanager import ( "context" - "fmt" + "encoding/json" corev1 "k8s.io/api/core/v1" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/fields" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime" - "k8s.io/klog/v2" metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" ctrl "sigs.k8s.io/controller-runtime" + "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/client" - "sigs.k8s.io/controller-runtime/pkg/handler" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/webhook" nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" + "github.com/fluidos-project/node/pkg/indexer" "github.com/fluidos-project/node/pkg/utils/flags" "github.com/fluidos-project/node/pkg/utils/getters" - models "github.com/fluidos-project/node/pkg/utils/models" - "github.com/fluidos-project/node/pkg/utils/resourceforge" + "github.com/fluidos-project/node/pkg/utils/models" + "github.com/fluidos-project/node/pkg/utils/namings" + "github.com/fluidos-project/node/pkg/utils/parseutil" ) // ClusterRole @@ -50,142 +53,216 @@ type NodeReconciler struct { Scheme *runtime.Scheme EnableAutoDiscovery bool WebhookServer webhook.Server + FlavorIndexer indexer.FlavorByNodeName +} + +func (r *NodeReconciler) LabelSelector() labels.Selector { + return labels.Set{flags.ResourceNodeLabel: "true"}.AsSelector() } // Reconcile reconciles a Node object to create Flavor objects. func (r *NodeReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { log := ctrl.LoggerFrom(ctx, "node", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) - // Check if AutoDiscovery is enabled if !r.EnableAutoDiscovery { - klog.Info("AutoDiscovery is disabled") + log.Info("AutoDiscovery is disabled") return ctrl.Result{}, nil } - // Check if the webhook server is running - if err := r.WebhookServer.StartedChecker()(nil); err != nil { - klog.Info("Webhook server not started yet, requeuing the request") - return ctrl.Result{Requeue: true}, nil + if r.WebhookServer != nil { + // Check if the webhook server is running + if err := r.WebhookServer.StartedChecker()(nil); err != nil { + log.Info("Webhook server not started yet, requeuing the request") + return ctrl.Result{Requeue: true}, nil + } } - - // Set for labels over the node - labelSelector := labels.Set{flags.ResourceNodeLabel: "true"}.AsSelector() - // Fetch the Node instance var node corev1.Node if err := r.Get(ctx, req.NamespacedName, &node); err != nil { if client.IgnoreNotFound(err) != nil { - klog.Info("Node not found") + log.Info("Node not found") return ctrl.Result{}, nil } } - // Check if the node has the label - if !labelSelector.Matches(labels.Set(node.GetLabels())) { - klog.Infof("Node %s does not have the label %s", node.Name, flags.ResourceNodeLabel) + if !r.LabelSelector().Matches(labels.Set(node.GetLabels())) { + log.Info("Node %s does not have the label %s", node.Name, flags.ResourceNodeLabel) + return ctrl.Result{}, nil } - nodeMetrics := &metricsv1beta1.NodeMetrics{} - + var nodeMetrics metricsv1beta1.NodeMetrics // Get the node metrics referred to the node - if err := r.Client.Get(ctx, client.ObjectKey{Name: node.Name}, nodeMetrics); err != nil { - klog.Errorf("Error getting NodeMetrics: %v", err) + if err := r.Client.Get(ctx, client.ObjectKey{Name: node.Name}, &nodeMetrics); err != nil { + log.Error(err, "error getting NodeMetrics", err) return ctrl.Result{}, err } - // Get the NodeInfo struct for the node and its metrics - nodeInfo, err := GetNodeInfos(&node, nodeMetrics) + nodeInfo, err := GetNodeInfos(&node, &nodeMetrics) if err != nil { - klog.Errorf("Error getting NodeInfo: %v", err) + log.Error(err, "error getting NodeInfo", err) return ctrl.Result{}, err } - klog.Infof("NodeInfo created: %s", nodeInfo.Name) - + log.Info("NodeInfo created", "value", nodeInfo.Name) // Get NodeIdentity nodeIdentity := getters.GetNodeIdentity(ctx, r.Client) if nodeIdentity == nil { - klog.Error("Error getting FLUIDOS Node identity") + log.Info("error getting FLUIDOS Node identity") return ctrl.Result{}, nil } - - // Create ownerReferences with only the current node under examination - ownerReferences := []metav1.OwnerReference{ - { - APIVersion: nodecorev1alpha1.GroupVersion.String(), - Kind: "Node", - Name: node.Name, - UID: node.UID, - }, - } - - // Get all the Flavors owned by this node as kubernetes ownership - flavorsList := &nodecorev1alpha1.FlavorList{} - err = r.List(ctx, flavorsList) - if err != nil { - klog.Errorf("Error listing Flavors: %v", err) + // Get all the Flavors owned by this node as kubernetes ownership: + // iterating over a list is required since a Flavor can be owned by multiple resources. + var matchFlavors nodecorev1alpha1.FlavorList + if err = r.Client.List(ctx, &matchFlavors, client.MatchingFieldsSelector{Selector: fields.OneTermEqualSelector(r.FlavorIndexer.Field(), node.Name)}); err != nil { + log.Error(err, "error listing Flavors") return ctrl.Result{}, nil } + // Check if you have found any Flavor + var flavor *nodecorev1alpha1.Flavor - var matchFlavors []nodecorev1alpha1.Flavor + for _, i := range matchFlavors.Items { + for _, or := range i.OwnerReferences { + if or.Kind == "Node" { + flavor = &i - // Filter the Flavors by the owner reference - for i := range flavorsList.Items { - flavor := &flavorsList.Items[i] - // Check if the node is one of the owners - for _, owner := range flavor.OwnerReferences { - if owner.Name == node.Name { - // Add the Flavor to the list - matchFlavors = append(matchFlavors, *flavor) + log.Info("Flavor found", "namespacedName", client.ObjectKeyFromObject(flavor), "node", node.Name) + + break } } } - // Check if you have found any Flavor - if len(matchFlavors) > 0 { - klog.Infof("Found %d flavors for node %s", len(matchFlavors), node.Name) - // TODO: Check if the Flavors are consistent with the NodeInfo - // TODO: Update the Flavors if necessary - return ctrl.Result{}, nil - } - - // No Flavor found, create a new one - flavor, err := r.createFlavor(ctx, nodeInfo, *nodeIdentity, ownerReferences) - if err != nil { - klog.Errorf("Error creating Flavor: %v", err) + if err = r.createOrUpdateFlavor(ctx, flavor, nodeInfo, *nodeIdentity, &node); err != nil { + log.Error(err, "error creating or updating Flavor", err) return ctrl.Result{Requeue: true}, nil } - klog.Infof("Flavor created: %s", flavor.Name) + + log.Info("Flavor reconciliation completed") return ctrl.Result{}, nil } -func (r *NodeReconciler) createFlavor(ctx context.Context, nodeInfo *models.NodeInfo, - nodeIdentity nodecorev1alpha1.NodeIdentity, ownerReferences []metav1.OwnerReference) (flavor *nodecorev1alpha1.Flavor, err error) { +func (r *NodeReconciler) createOrUpdateFlavor(ctx context.Context, flavor *nodecorev1alpha1.Flavor, nodeInfo *models.NodeInfo, nodeIdentity nodecorev1alpha1.NodeIdentity, owner client.Object) (err error) { + log := ctrl.LoggerFrom(ctx) // Forge the Flavor from the NodeInfo and NodeIdentity - flavorResult := resourceforge.ForgeK8SliceFlavorFromMetrics(nodeInfo, nodeIdentity, ownerReferences) + shouldCreate := flavor == nil - if flavorResult == nil { - klog.Error("Error forging Flavor") - return nil, fmt.Errorf("error forging Flavor, Flavor is nil") + if shouldCreate { + flavor = &nodecorev1alpha1.Flavor{} + flavor.Name = namings.ForgeFlavorName(string(nodecorev1alpha1.TypeK8Slice), nodeIdentity.Domain) } - klog.Infof("Ready to create Flavor %s of type %s", flavorResult.Name, flavorResult.Spec.FlavorType.TypeIdentifier) + flavor.Namespace = flags.FluidosNamespace + + log.Info("ready to handle Flavor", "namespacedName", client.ObjectKeyFromObject(flavor), "type", nodecorev1alpha1.TypeK8Slice) + // Creating a new flavor custom resource from the metrics of the node. + res, err := controllerutil.CreateOrUpdate(ctx, r.Client, flavor, func() error { + var k8sSliceType nodecorev1alpha1.K8Slice + if len(flavor.Spec.FlavorType.TypeData.Raw) > 0 { + if unmarshalErr := json.Unmarshal(flavor.Spec.FlavorType.TypeData.Raw, &k8sSliceType); unmarshalErr != nil { + return unmarshalErr + } + } + + if shouldCreate { + k8sSliceType.Characteristics.CPU = nodeInfo.ResourceMetrics.CPUAvailable + k8sSliceType.Characteristics.Memory = nodeInfo.ResourceMetrics.MemoryAvailable + k8sSliceType.Characteristics.Pods = nodeInfo.ResourceMetrics.PodsAvailable + k8sSliceType.Characteristics.Storage = &nodeInfo.ResourceMetrics.EphemeralStorage + } - // Create the Flavor - err = r.Create(ctx, flavorResult) + k8sSliceType.Characteristics.Architecture = nodeInfo.Architecture + k8sSliceType.Characteristics.Gpu = &nodecorev1alpha1.GPU{ + Model: nodeInfo.ResourceMetrics.GPU.Model, + Cores: nodeInfo.ResourceMetrics.GPU.CoresTotal, + Memory: nodeInfo.ResourceMetrics.GPU.MemoryTotal, + Vendor: nodeInfo.ResourceMetrics.GPU.Vendor, + Tier: nodeInfo.ResourceMetrics.GPU.Tier, + Count: nodeInfo.ResourceMetrics.GPU.Count, + MultiInstance: nodeInfo.ResourceMetrics.GPU.MultiInstance, + Shared: nodeInfo.ResourceMetrics.GPU.Shared, + SharingStrategy: nodeInfo.ResourceMetrics.GPU.SharingStrategy, + Dedicated: nodeInfo.ResourceMetrics.GPU.Dedicated, + Interruptible: nodeInfo.ResourceMetrics.GPU.Interruptible, + NetworkBandwidth: nodeInfo.ResourceMetrics.GPU.NetworkBandwidth, + NetworkLatencyMs: nodeInfo.ResourceMetrics.GPU.NetworkLatencyMs, + NetworkTier: nodeInfo.ResourceMetrics.GPU.NetworkTier, + TrainingScore: nodeInfo.ResourceMetrics.GPU.TrainingScore, + InferenceScore: nodeInfo.ResourceMetrics.GPU.InferenceScore, + HPCScore: nodeInfo.ResourceMetrics.GPU.HPCScore, + GraphicsScore: nodeInfo.ResourceMetrics.GPU.GraphicsScore, + Architecture: nodeInfo.ResourceMetrics.GPU.Architecture, + Interconnect: nodeInfo.ResourceMetrics.GPU.Interconnect, + InterconnectBandwidth: nodeInfo.ResourceMetrics.GPU.InterconnectBandwidth, + ComputeCapability: nodeInfo.ResourceMetrics.GPU.ComputeCapability, + ClockSpeed: nodeInfo.ResourceMetrics.GPU.ClockSpeed, + FP32TFlops: nodeInfo.ResourceMetrics.GPU.FP32TFlops, + Topology: nodeInfo.ResourceMetrics.GPU.Topology, + MultiGPUEfficiency: nodeInfo.ResourceMetrics.GPU.MultiGPUEfficiency, + Region: nodeInfo.ResourceMetrics.GPU.Region, + Zone: nodeInfo.ResourceMetrics.GPU.Zone, + HourlyRate: nodeInfo.ResourceMetrics.GPU.HourlyRate, + Provider: nodeInfo.ResourceMetrics.GPU.Provider, + PreEmptible: nodeInfo.ResourceMetrics.GPU.PreEmptible, + } + k8sSliceType.Policies = nodecorev1alpha1.Policies{ + Partitionability: nodecorev1alpha1.Partitionability{ + CPUMin: parseutil.ParseQuantityFromString(flags.CPUMin), + MemoryMin: parseutil.ParseQuantityFromString(flags.MemoryMin), + PodsMin: parseutil.ParseQuantityFromString(flags.PodsMin), + CPUStep: parseutil.ParseQuantityFromString(flags.CPUStep), + MemoryStep: parseutil.ParseQuantityFromString(flags.MemoryStep), + PodsStep: parseutil.ParseQuantityFromString(flags.PodsStep), + }, + } + // Serialize K8SliceType to JSON + k8SliceTypeJSON, marshalErr := json.Marshal(k8sSliceType) + if marshalErr != nil { + return marshalErr + } + + flavor.Spec.ProviderID = nodeIdentity.NodeID + flavor.Spec.FlavorType = nodecorev1alpha1.FlavorType{ + TypeIdentifier: nodecorev1alpha1.TypeK8Slice, + TypeData: runtime.RawExtension{Raw: k8SliceTypeJSON}, + } + flavor.Spec.Owner = nodeIdentity + // The following options can be changed at runtime by the provider, + // avoiding to overwriting them at every reconciliation. + if shouldCreate { + flavor.Spec.Price.Amount = flags.AMOUNT + flavor.Spec.Price.Currency = flags.CURRENCY + flavor.Spec.Price.Period = flags.PERIOD + flavor.Spec.Availability = true + // FIXME: NetworkPropertyType should be taken in a smarter way + flavor.Spec.NetworkPropertyType = "networkProperty" + // FIXME: Location should be taken in a smarter way + flavor.Spec.Location = &nodecorev1alpha1.Location{ + Latitude: "10", + Longitude: "58", + Country: "Italy", + City: "Turin", + AdditionalNotes: "None", + } + } + + return controllerutil.SetOwnerReference(owner, flavor, r.Client.Scheme()) + }) if err != nil { - return nil, err + return err } - klog.Infof("Flavor created: %s", flavorResult.Name) - return flavorResult, nil + log.Info("Flavor handling completed", "namespacedName", client.ObjectKeyFromObject(flavor), "res", res) + + return nil } // SetupWithManager sets up the controller with the Manager. func (r *NodeReconciler) SetupWithManager(mgr ctrl.Manager) error { return ctrl.NewControllerManagedBy(mgr). - For(&corev1.Node{}). - Watches(&nodecorev1alpha1.Flavor{}, &handler.EnqueueRequestForObject{}). + For(&corev1.Node{}, builder.WithPredicates(predicate.NewPredicateFuncs(func(object client.Object) bool { + return r.LabelSelector().Matches(labels.Set(object.GetLabels())) + }))). + Owns(&nodecorev1alpha1.Flavor{}, builder.MatchEveryOwner). Complete(r) } diff --git a/pkg/local-resource-manager/node_services.go b/pkg/local-resource-manager/node_services.go index 0042071f..6cf9990a 100644 --- a/pkg/local-resource-manager/node_services.go +++ b/pkg/local-resource-manager/node_services.go @@ -16,8 +16,10 @@ package localresourcemanager import ( "fmt" + "strconv" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/resource" "k8s.io/klog/v2" metricsv1beta1 "k8s.io/metrics/pkg/apis/metrics/v1beta1" @@ -57,6 +59,133 @@ func forgeResourceMetrics(nodeMetrics *metricsv1beta1.NodeMetrics, node *corev1. memAvail.Sub(memoryUsed) podsAvail.Sub(podsUsed) + var gpuMetrics models.GPUMetrics + + annotations := node.ObjectMeta.Annotations + if annotations == nil { + annotations = map[string]string{} + } + + if v, found := annotations["provider.fluidos.eu/name"]; found { + gpuMetrics.Provider = v + } + if v, found := annotations["gpu.fluidos.eu/vendor"]; found { + gpuMetrics.Vendor = v + } + if v, found := annotations["gpu.fluidos.eu/model"]; found { + gpuMetrics.Model = v + } + if v, found := annotations["gpu.fluidos.eu/count"]; found { + count, _ := strconv.Atoi(v) + gpuMetrics.Count = int64(count) + } + if v, found := annotations["gpu.fluidos.eu/memory-per-gpu"]; found { + qty := resource.MustParse(v) + + computed := resource.NewQuantity(0, resource.BinarySI) + for range gpuMetrics.Count { + computed.Add(qty) + } + + gpuMetrics.MemoryTotal = *computed + } + if v, found := annotations["gpu.fluidos.eu/tier"]; found { + gpuMetrics.Tier = v + } + if v, found := annotations["gpu.fluidos.eu/architecture"]; found { + gpuMetrics.Architecture = v + } + if v, found := annotations["gpu.fluidos.eu/compute-capability"]; found { + gpuMetrics.ComputeCapability = v + } + if v, found := annotations["nvidia.fluidos.eu/mig-capable"]; found { + gpuMetrics.MultiInstance, _ = strconv.ParseBool(v) + } + if v, found := annotations["gpu.fluidos.eu/fp32-tflops"]; found { + fp32tFlops, _ := strconv.ParseFloat(v, 64) + gpuMetrics.FP32TFlops = fp32tFlops + } + if v, found := annotations["gpu.fluidos.eu/sharing-capable"]; found { + gpuMetrics.Shared, _ = strconv.ParseBool(v) + } + if v, found := annotations["gpu.fluidos.eu/sharing-strategy"]; found { + gpuMetrics.SharingStrategy = v + } + if v, found := annotations["gpu.fluidos.eu/interconnect"]; found { + gpuMetrics.Interconnect = v + } + if v, found := annotations["gpu.fluidos.eu/interconnect-bandwidth-gbps"]; found { + qty := resource.MustParse(v) + gpuMetrics.InterconnectBandwidth = qty + } + if v, found := annotations["gpu.fluidos.eu/cores"]; found { + qty := resource.MustParse(v) + + computed := resource.NewQuantity(0, resource.BinarySI) + for range gpuMetrics.Count { + computed.Add(qty) + } + + gpuMetrics.CoresTotal = *computed + } + if v, found := annotations["gpu.fluidos.eu/clock-speed"]; found { + gpuMetrics.ClockSpeed = resource.MustParse(v) + } + if v, found := annotations["gpu.fluidos.eu/interruptible"]; found { + gpuMetrics.Interruptible, _ = strconv.ParseBool(v) + } + if v, found := annotations["gpu.fluidos.eu/dedicated"]; found { + gpuMetrics.Dedicated, _ = strconv.ParseBool(v) + } + if v, found := annotations["gpu.fluidos.eu/topology"]; found { + gpuMetrics.Topology = v + } + if v, found := annotations["gpu.fluidos.eu/multi-gpu-efficiency"]; found { + gpuMetrics.MultiGPUEfficiency = v + } + if v, found := annotations["cost.fluidos.eu/hourly-rate"]; found { + hourlyRate, _ := strconv.ParseFloat(v, 64) + gpuMetrics.HourlyRate = hourlyRate + } + if v, found := annotations["provider.fluidos.eu/preemptible"]; found { + gpuMetrics.PreEmptible, _ = strconv.ParseBool(v) + } + if v, found := annotations["workload.fluidos.eu/training-score"]; found { + score, _ := strconv.ParseFloat(v, 64) + gpuMetrics.TrainingScore = score + } + if v, found := annotations["workload.fluidos.eu/inference-score"]; found { + score, _ := strconv.ParseFloat(v, 64) + gpuMetrics.InferenceScore = score + } + if v, found := annotations["workload.fluidos.eu/hpc-score"]; found { + score, _ := strconv.ParseFloat(v, 64) + gpuMetrics.HPCScore = score + } + if v, found := annotations["workload.fluidos.eu/graphics-score"]; found { + score, _ := strconv.ParseFloat(v, 64) + gpuMetrics.GraphicsScore = score + } + if v, found := annotations["network.fluidos.eu/bandwidth-gbps"]; found { + qty := resource.MustParse(v) + gpuMetrics.NetworkBandwidth = qty + } + if v, found := annotations["network.fluidos.eu/latency-ms"]; found { + ms, _ := strconv.ParseInt(v, 10, 64) + gpuMetrics.NetworkLatencyMs = ms + } + if v, found := annotations["network.fluidos.eu/tier"]; found { + gpuMetrics.NetworkTier = v + } + + if v, found := annotations["location.fluidos.eu/zone"]; found { + gpuMetrics.Region = v + } + + if v, found := annotations["location.fluidos.eu/region"]; found { + gpuMetrics.Zone = v + } + return &models.ResourceMetrics{ CPUTotal: cpuTotal, CPUAvailable: cpuAvail, @@ -65,6 +194,7 @@ func forgeResourceMetrics(nodeMetrics *metricsv1beta1.NodeMetrics, node *corev1. PodsTotal: podsTotal, PodsAvailable: podsAvail, EphemeralStorage: ephemeralStorage, + GPU: gpuMetrics, } } diff --git a/pkg/local-resource-manager/serviceblueprint_controller.go b/pkg/local-resource-manager/serviceblueprint_controller.go index 0b653753..96ed4cce 100644 --- a/pkg/local-resource-manager/serviceblueprint_controller.go +++ b/pkg/local-resource-manager/serviceblueprint_controller.go @@ -17,6 +17,7 @@ package localresourcemanager import ( "context" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" @@ -59,9 +60,11 @@ func (r *ServiceBlueprintReconciler) Reconcile(ctx context.Context, req ctrl.Req } // Check if the webhook server is running - if err := r.WebhookServer.StartedChecker()(nil); err != nil { - klog.Info("Webhook server not started yet, requeuing the request") - return ctrl.Result{Requeue: true}, nil + if r.WebhookServer != nil { + if err := r.WebhookServer.StartedChecker()(nil); err != nil { + klog.Info("Webhook server not started yet, requeuing the request") + return ctrl.Result{Requeue: true}, nil + } } // Fetch the ServiceBlueprint instance @@ -71,6 +74,11 @@ func (r *ServiceBlueprintReconciler) Reconcile(ctx context.Context, req ctrl.Req flavor := &nodecorev1alpha1.Flavor{} err = r.Get(ctx, req.NamespacedName, flavor) if err != nil { + if errors.IsNotFound(err) { + klog.Infof("%s not found, may have been deleted", req.NamespacedName) + + return ctrl.Result{}, nil + } klog.Errorf("Error getting ServiceBlueprint or Flavor: %v", err) return ctrl.Result{}, err } diff --git a/pkg/rear-controller/discovery-manager/discovery_controller.go b/pkg/rear-controller/discovery-manager/discovery_controller.go index ea6ec8c7..a4967485 100644 --- a/pkg/rear-controller/discovery-manager/discovery_controller.go +++ b/pkg/rear-controller/discovery-manager/discovery_controller.go @@ -18,14 +18,17 @@ import ( "context" "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/sets" "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil" advertisementv1alpha1 "github.com/fluidos-project/node/apis/advertisement/v1alpha1" nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" - gateway "github.com/fluidos-project/node/pkg/rear-controller/gateway" - "github.com/fluidos-project/node/pkg/utils/resourceforge" + "github.com/fluidos-project/node/pkg/rear-controller/gateway" + "github.com/fluidos-project/node/pkg/utils/flags" + "github.com/fluidos-project/node/pkg/utils/namings" "github.com/fluidos-project/node/pkg/utils/tools" ) @@ -55,8 +58,6 @@ func (r *DiscoveryReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( log := ctrl.LoggerFrom(ctx, "discovery", req.NamespacedName) ctx = ctrl.LoggerInto(ctx, log) - var peeringCandidate *advertisementv1alpha1.PeeringCandidate - var discovery advertisementv1alpha1.Discovery if err := r.Get(ctx, req.NamespacedName, &discovery); client.IgnoreNotFound(err) != nil { klog.Errorf("Error when getting Discovery %s before reconcile: %s", req.NamespacedName, err) @@ -112,19 +113,34 @@ func (r *DiscoveryReconciler) Reconcile(ctx context.Context, req ctrl.Request) ( klog.Infof("Flavors found: %d", len(flavors)) for _, flavor := range flavors { - peeringCandidate = resourceforge.ForgePeeringCandidate(flavor, discovery.Spec.SolverID, true) - err = r.Create(context.Background(), peeringCandidate) + var peeringCandidate advertisementv1alpha1.PeeringCandidate + peeringCandidate.Namespace = flags.FluidosNamespace + peeringCandidate.Name = namings.ForgePeeringCandidateName(flavor.Name) + + _, err = controllerutil.CreateOrUpdate(ctx, r.Client, &peeringCandidate, func() error { + peeringCandidate.Spec.Flavor.ObjectMeta.Name = flavor.Name + peeringCandidate.Spec.Flavor.ObjectMeta.Namespace = flavor.Namespace + peeringCandidate.Spec.Flavor.Spec = flavor.Spec + peeringCandidate.Spec.Available = true + + ids := sets.New[string](peeringCandidate.Spec.InterestedSolverIDs...) + ids.Insert(discovery.Spec.SolverID) + + peeringCandidate.Spec.InterestedSolverIDs = ids.UnsortedList() + + return nil + }) if err != nil { klog.Infof("Discovery %s failed: error while creating Peering Candidate", discovery.Name) return ctrl.Result{}, err } peeringCandidate.Status.CreationTime = tools.GetTimeNow() - if err := r.Status().Update(ctx, peeringCandidate); err != nil { + if err := r.Status().Update(ctx, &peeringCandidate); err != nil { klog.Errorf("Error when updating PeeringCandidate %s status before reconcile: %s", req.NamespacedName, err) return ctrl.Result{}, err } // Append the PeeringCandidate to the list of PeeringCandidates found by the Discovery - discovery.Status.PeeringCandidateList.Items = append(discovery.Status.PeeringCandidateList.Items, *peeringCandidate) + discovery.Status.PeeringCandidateList.Items = append(discovery.Status.PeeringCandidateList.Items, peeringCandidate) } discovery.SetPhase(nodecorev1alpha1.PhaseSolved, "Discovery Solved: Peering Candidate found") diff --git a/pkg/rear-manager/allocation_controller.go b/pkg/rear-manager/allocation_controller.go index fda8b930..d3a0f313 100644 --- a/pkg/rear-manager/allocation_controller.go +++ b/pkg/rear-manager/allocation_controller.go @@ -397,72 +397,48 @@ func (r *AllocationReconciler) handleK8SliceConsumerAllocation(ctx context.Conte // Get the Liqo credentials for the peering target cluster, that in this scenario is the provider credentials := contract.Spec.PeeringTargetCredentials - // Check if a Liqo peering has been already established - _, err := fcutils.GetForeignClusterByID(ctx, r.Client, v1beta1.ClusterID(credentials.ClusterID)) + // Check if a Liqo peering has been already established: + // if it has been done before for another flavour, perform it in an idempotent way. + klog.InfofDepth(1, "Allocation %s is peering with cluster %s", req.NamespacedName, credentials.ClusterID) + // Decode Kubeconfig + kubeconfig, err := virtualfabricmanager.DecodeKubeconfig(credentials.Kubeconfig) if err != nil { - if apierrors.IsNotFound(err) { - // Establish peering - klog.InfofDepth(1, "Allocation %s is peering with cluster %s", req.NamespacedName, credentials.ClusterID) - // Decode Kubeconfig - kubeconfig, err := virtualfabricmanager.DecodeKubeconfig(credentials.Kubeconfig) - if err != nil { - klog.Errorf("Error when decoding Kubeconfig: %v", err) - allocation.SetStatus(nodecorev1alpha1.Error, "Error when decoding Kubeconfig") - if err := r.updateAllocationStatus(ctx, allocation); err != nil { - klog.Errorf("Error when updating Allocation %s status: %v", req.NamespacedName, err) - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } - remoteClient, remoteRestConfig, err := virtualfabricmanager.CreateKubeClientFromConfig(kubeconfig, r.Client.Scheme()) - if err != nil { - klog.Errorf("Error when creating remote client: %v", err) - allocation.SetStatus(nodecorev1alpha1.Error, "Error when creating remote client") - if err := r.updateAllocationStatus(ctx, allocation); err != nil { - klog.Errorf("Error when updating Allocation %s status: %v", req.NamespacedName, err) - return ctrl.Result{}, err - } - return ctrl.Result{}, nil - } - _, err = virtualfabricmanager.PeerWithCluster( - ctx, - r.Client, - r.RestConfig, - remoteClient, - remoteRestConfig, - contract, - ) - if err != nil { - klog.Errorf("Error when peering with cluster %s: %s", credentials.ClusterID, err) - allocation.SetStatus(nodecorev1alpha1.Error, "Error when peering with cluster "+credentials.ClusterID) - if err := r.updateAllocationStatus(ctx, allocation); err != nil { - klog.Errorf("Error when updating Solver %s status: %s", req.NamespacedName, err) - return ctrl.Result{}, err - } - return ctrl.Result{}, err - } - // Peering established - klog.Infof("Allocation %s has started the peering with cluster %s", req.NamespacedName.Name, credentials.ClusterID) - - // Change the status of the Allocation to Active - allocation.SetStatus(nodecorev1alpha1.Active, "Allocation is now Active") - if err := r.updateAllocationStatus(ctx, allocation); err != nil { - klog.Errorf("Error when updating Solver %s status: %s", req.NamespacedName, err) - return ctrl.Result{}, err - } - } else { - klog.Errorf("Error when getting ForeignCluster %s: %v", credentials.ClusterID, err) - allocation.SetStatus(nodecorev1alpha1.Error, "Error when getting ForeignCluster") - if err := r.updateAllocationStatus(ctx, allocation); err != nil { - klog.Errorf("Error when updating Allocation %s status: %v", req.NamespacedName, err) - return ctrl.Result{}, err - } + klog.Errorf("Error when decoding Kubeconfig: %v", err) + allocation.SetStatus(nodecorev1alpha1.Error, "Error when decoding Kubeconfig") + if err := r.updateAllocationStatus(ctx, allocation); err != nil { + klog.Errorf("Error when updating Allocation %s status: %v", req.NamespacedName, err) + return ctrl.Result{}, err } - } else { - // Peering already established - klog.Infof("Allocation %s has already peered with cluster %s", req.NamespacedName.Name, credentials.ClusterID) return ctrl.Result{}, nil } + remoteClient, remoteRestConfig, err := virtualfabricmanager.CreateKubeClientFromConfig(kubeconfig, r.Client.Scheme()) + if err != nil { + klog.Errorf("Error when creating remote client: %v", err) + allocation.SetStatus(nodecorev1alpha1.Error, "Error when creating remote client") + if err := r.updateAllocationStatus(ctx, allocation); err != nil { + klog.Errorf("Error when updating Allocation %s status: %v", req.NamespacedName, err) + return ctrl.Result{}, err + } + return ctrl.Result{}, nil + } + if _, err = virtualfabricmanager.PeerWithCluster(ctx, r.Client, r.RestConfig, remoteClient, remoteRestConfig, contract); err != nil { + klog.Errorf("Error when peering with cluster %s: %s", credentials.ClusterID, err) + allocation.SetStatus(nodecorev1alpha1.Error, "Error when peering with cluster "+credentials.ClusterID) + if err := r.updateAllocationStatus(ctx, allocation); err != nil { + klog.Errorf("Error when updating Solver %s status: %s", req.NamespacedName, err) + return ctrl.Result{}, err + } + return ctrl.Result{}, err + } + // Peering established + klog.Infof("Allocation %s has started the peering with cluster %s", req.NamespacedName.Name, credentials.ClusterID) + + // Change the status of the Allocation to Active + allocation.SetStatus(nodecorev1alpha1.Active, "Allocation is now Active") + if err := r.updateAllocationStatus(ctx, allocation); err != nil { + klog.Errorf("Error when updating Solver %s status: %s", req.NamespacedName, err) + return ctrl.Result{}, err + } return ctrl.Result{}, nil case nodecorev1alpha1.Released: @@ -1125,15 +1101,10 @@ func computeK8SliceCharacteristics(origin *nodecorev1alpha1.K8SliceCharacteristi switch { case part.Gpu != nil && origin != nil: // Gpu is present in the origin and in the partition - newGpuCores := origin.Gpu.Cores.DeepCopy() - newGpuCores.Sub(part.Gpu.Cores) - newGpuMemory := origin.Gpu.Memory.DeepCopy() - newGpuMemory.Sub(part.Gpu.Memory) - return &nodecorev1alpha1.GPU{ - Model: origin.Gpu.Model, - Cores: newGpuCores, - Memory: newGpuMemory, - } + newGpu := origin.DeepCopy() + newGpu.Gpu.Count = 0 + + return newGpu.Gpu case part.Gpu == nil && origin.Gpu != nil: // Gpu is present in the origin but not in the partition return origin.Gpu.DeepCopy() @@ -1230,7 +1201,6 @@ func reduceFlavorAvailability(ctx context.Context, flavor *nodecorev1alpha1.Flav } newFlavor := resourceforge.ForgeFlavorFromRef(flavor, newFlavorType) - // Create new Flavor if err := r.Create(ctx, newFlavor); err != nil { klog.Errorf("Error when creating Flavor %s: %v", newFlavor.Name, err) diff --git a/pkg/utils/common/common.go b/pkg/utils/common/common.go index a3cfc10a..0768ec7b 100644 --- a/pkg/utils/common/common.go +++ b/pkg/utils/common/common.go @@ -18,8 +18,12 @@ import ( "encoding/json" "fmt" "regexp" + "slices" + "golang.org/x/exp/maps" "k8s.io/apimachinery/pkg/api/resource" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/klog/v2" advertisementv1alpha1 "github.com/fluidos-project/node/apis/advertisement/v1alpha1" @@ -144,6 +148,56 @@ func filterResourceQuantityFilter(selectorValue resource.Quantity, filter models return true } +func filterNumberFilter(selectorValue float64, filter models.NumberFilter) bool { + switch filter.Name { + case models.MatchFilter: + // Parse the filter to a match filter + var matchFilter models.NumberMatchFilter + err := json.Unmarshal(filter.Data, &matchFilter) + if err != nil { + klog.Errorf("Error unmarshalling match filter: %v", err) + return false + } + // Check if the selector value matches the filter value + if selectorValue != matchFilter.Value { + klog.Infof("Match Filter: %f - Selector Value: %s", matchFilter.Value, selectorValue) + return false + } + case models.RangeFilter: + // Parse the filter to a range filter + var rangeFilter models.NumberRangeFilter + err := json.Unmarshal(filter.Data, &rangeFilter) + if err != nil { + klog.Errorf("Error unmarshalling range filter: %v", err) + return false + } + // Check if the selector value is within the range + // If the rangeFilter.Min exists check if the selector value is greater or equal to it + if rangeFilter.Min != nil { + if selectorValue < *rangeFilter.Min { + klog.Infof("Range Filter: %v-%v - Selector Value: %v", rangeFilter.Min, rangeFilter.Max, selectorValue) + return false + } + } + // If the rangeFilter.Max exists check if the selector value is less or equal to it + if rangeFilter.Max != nil { + if selectorValue > *rangeFilter.Max { + klog.Infof("Range Filter: %v-%v - Selector Value: %v", rangeFilter.Min, rangeFilter.Max, selectorValue) + return false + } + } + default: + klog.Errorf("Filter name %s not supported", filter.Name) + return false + } + + return true +} + +func filterBooleanFilter(selectorValue bool, filter models.BooleanFilter) bool { + return selectorValue != filter.Condition +} + func filterStringFilter(selectorValue string, filter models.StringFilter) bool { switch filter.Name { case models.MatchFilter: @@ -230,6 +284,66 @@ func filterFlavorK8Slice(k8SliceSelector *models.K8SliceSelector, flavorTypeK8Sl } } + if gpuTraits := flavorTypeK8SliceCR.Characteristics.Gpu; gpuTraits != nil { + copied := gpuTraits.DeepCopy() + u, err := runtime.DefaultUnstructuredConverter.ToUnstructured(copied) + if err != nil { + klog.Errorf("Error converting gpu traits to unstructured: %v", err) + return false + } + + keys := maps.Keys(k8SliceSelector.GPUFields) + slices.Sort(keys) + + for _, key := range keys { + filter := k8SliceSelector.GPUFields[key] + + switch f := filter.(type) { + case models.NumberFilter: + v, found, _ := unstructured.NestedNumberAsFloat64(u, key) + if !found { + return false + } + + if !filterNumberFilter(v, f) { + return false + } + case models.BooleanFilter: + v, found, _ := unstructured.NestedBool(u, key) + if !found { + return false + } + + if !filterBooleanFilter(v, f) { + return false + } + case models.StringFilter: + v, found, _ := unstructured.NestedString(u, key) + if !found { + return false + } + + if !filterStringFilter(v, f) { + return false + } + case models.ResourceQuantityFilter: + v, found, _ := unstructured.NestedString(u, key) + if !found { + return false + } + + qty, parseErr := resource.ParseQuantity(v) + if parseErr != nil { + return false + } + + if !filterResourceQuantityFilter(qty, f) { + return false + } + } + } + } + return true } diff --git a/pkg/utils/models/filters.go b/pkg/utils/models/filters.go index 61504b53..2ff3045e 100644 --- a/pkg/utils/models/filters.go +++ b/pkg/utils/models/filters.go @@ -20,15 +20,57 @@ import ( "k8s.io/apimachinery/pkg/api/resource" ) +type FilterData interface { + GetFilterType() FilterType +} + +// FilterType represents the type of Filter. +type FilterType string + +const ( + // MatchFilter is the identifier for a match filter. + MatchFilter FilterType = "Match" + // RangeFilter is the identifier for a range filter. + RangeFilter FilterType = "Range" +) + +// NumberFilter represents a filter for a numeric value. +type NumberFilter struct { + Name FilterType `scheme:"name"` + Data json.RawMessage `scheme:"data"` +} + +func (r NumberFilter) GetFilterType() FilterType { + return r.Name +} + +type NumberMatchFilter struct { + Value float64 `scheme:"value"` +} + +type NumberRangeFilter struct { + Min *float64 `scheme:"min,omitempty"` + Max *float64 `scheme:"max,omitempty"` +} + // ResourceQuantityFilter represents a filter for a resource quantity. type ResourceQuantityFilter struct { Name FilterType `scheme:"name"` Data json.RawMessage `scheme:"data"` } -// ResourceQuantityFilterData represents the data of a ResourceQuantityFilter. -type ResourceQuantityFilterData interface { - GetFilterType() FilterType +func (r ResourceQuantityFilter) GetFilterType() FilterType { + return r.Name +} + +// BooleanFilter is a filter that can be applied to a boolean value. +type BooleanFilter struct { + Condition bool `scheme:"condition"` +} + +// GetFilterType returns the type of the Filter. +func (BooleanFilter) GetFilterType() FilterType { + return MatchFilter } // StringFilter represents a filter for a string. @@ -37,52 +79,26 @@ type StringFilter struct { Data json.RawMessage `scheme:"data"` } -// StringFilterData represents the data of a StringFilter. -type StringFilterData interface { - GetFilterType() FilterType +func (s StringFilter) GetFilterType() FilterType { + return s.Name } -// FilterType represents the type of a Filter. -type FilterType string - -const ( - // MatchFilter is the identifier for a match filter. - MatchFilter FilterType = "Match" - // RangeFilter is the identifier for a range filter. - RangeFilter FilterType = "Range" -) - // ResourceQuantityMatchFilter represents a match filter for a resource quantity. type ResourceQuantityMatchFilter struct { Value resource.Quantity `scheme:"value"` } -// GetFilterType returns the type of the Filter. -func (fq ResourceQuantityMatchFilter) GetFilterType() FilterType { - return MatchFilter -} - // ResourceQuantityRangeFilter represents a range filter for a resource quantity. type ResourceQuantityRangeFilter struct { Min *resource.Quantity `scheme:"min,omitempty"` Max *resource.Quantity `scheme:"max,omitempty"` } -// GetFilterType returns the type of the Filter. -func (fq ResourceQuantityRangeFilter) GetFilterType() FilterType { - return RangeFilter -} - // StringMatchFilter represents a match filter for a string. type StringMatchFilter struct { Value string `scheme:"value"` } -// GetFilterType returns the type of the Filter. -func (fq StringMatchFilter) GetFilterType() FilterType { - return MatchFilter -} - // StringRangeFilter represents a range filter for a string. type StringRangeFilter struct { Regex string `scheme:"regex"` diff --git a/pkg/utils/models/flavor-k8slice-models.go b/pkg/utils/models/flavor-k8slice-models.go index cf8b4d3b..d175de51 100644 --- a/pkg/utils/models/flavor-k8slice-models.go +++ b/pkg/utils/models/flavor-k8slice-models.go @@ -16,29 +16,43 @@ package models import ( "encoding/json" - "strings" "k8s.io/apimachinery/pkg/api/resource" - - nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" ) // GpuCharacteristics represents the characteristics of a Gpu. type GpuCharacteristics struct { - Model string `json:"model"` - Cores resource.Quantity `json:"cores"` - Memory resource.Quantity `json:"memory"` -} - -// Cmp compares models.GpuCharacteristics with nodecorev1alpha1.GPU. -func (gpu *GpuCharacteristics) Cmp(other *nodecorev1alpha1.GPU) int { - if gpu.Model != other.Model { - return strings.Compare(gpu.Model, other.Model) - } - if cmp := gpu.Cores.Cmp(other.Cores); cmp != 0 { - return cmp - } - return gpu.Memory.Cmp(other.Memory) + Vendor string `json:"vendor"` + Model string `json:"model"` + Count int64 `json:"count"` + Tier string `json:"tier"` + MultiInstance bool `json:"multi_instance"` + Shared bool `json:"shared"` + SharingStrategy string `json:"sharing_strategy"` + Dedicated bool `json:"dedicated"` + Interruptible bool `json:"interruptible"` + NetworkBandwidth resource.Quantity `json:"network_bandwidth"` + NetworkLatencyMs int64 `json:"network_latency_ms"` + NetworkTier string `json:"network_tier"` + TrainingScore float64 `json:"training_score"` + InferenceScore float64 `json:"inference_score"` + HPCScore float64 `json:"hpc_score"` + GraphicsScore float64 `json:"graphics_score"` + Architecture string `json:"architecture"` + Interconnect string `json:"interconnect"` + InterconnectBandwidth resource.Quantity `json:"interconnect_bandwidth"` + Cores resource.Quantity `json:"cores"` + Memory resource.Quantity `json:"memory"` + ComputeCapability string `json:"compute_capability"` + ClockSpeed resource.Quantity `json:"clock_speed"` + FP32TFlops float64 `json:"fp32_tflops"` + Topology string `json:"topology"` + MultiGPUEfficiency string `json:"multi_gpu_efficiency"` + Region string `json:"region"` + Zone string `json:"zone"` + HourlyRate float64 `json:"hourly_rate"` + Provider string `json:"provider"` + PreEmptible bool `json:"pre_emptible"` } // K8SliceCharacteristics represents the characteristics of a Kubernetes slice. @@ -94,6 +108,7 @@ type K8SliceSelector struct { Memory *ResourceQuantityFilter `scheme:"memory,omitempty"` Pods *ResourceQuantityFilter `scheme:"pods,omitempty"` Storage *ResourceQuantityFilter `scheme:"storage,omitempty"` + GPUFields map[string]FilterData `scheme:"gpuFields,omitempty"` } // GetSelectorType returns the type of the Selector. diff --git a/pkg/utils/models/local-resource-manager.go b/pkg/utils/models/local-resource-manager.go index d7484bac..24b6abd0 100644 --- a/pkg/utils/models/local-resource-manager.go +++ b/pkg/utils/models/local-resource-manager.go @@ -29,11 +29,62 @@ type NodeInfo struct { // GPUMetrics represents GPU metrics. type GPUMetrics struct { - Model string `json:"model"` - CoresTotal resource.Quantity `json:"totalCores"` - CoresAvailable resource.Quantity `json:"availableCores"` - MemoryTotal resource.Quantity `json:"totalMemory"` - MemoryAvailable resource.Quantity `json:"availableMemory"` + Vendor string `json:"vendor,omitempty"` + Model string `json:"model,omitempty"` + Count int64 `json:"count"` + MemoryTotal resource.Quantity `json:"total_memory"` + CoresTotal resource.Quantity `json:"total_cores"` + Tier string `json:"tier,omitempty"` + + GPUSharingMetrics `json:",inline"` + GPUNetworkMetrics `json:",inline"` + GPUScoreMetrics `json:",inline"` + GPUSpecMetrics `json:",inline"` + GPURentingMetrics `json:",inline"` + GPUProviderMetrics `json:",inline"` +} + +type GPUSharingMetrics struct { + MultiInstance bool `json:"multi_instance,omitempty"` + Shared bool `json:"shared,omitempty"` + SharingStrategy string `json:"sharing_strategy,omitempty"` + Dedicated bool `json:"dedicated,omitempty"` + Interruptible bool `json:"interruptible,omitempty"` +} + +type GPUNetworkMetrics struct { + NetworkBandwidth resource.Quantity `json:"network_bandwidth,omitempty"` + NetworkLatencyMs int64 `json:"network_latency_ms,omitempty"` + NetworkTier string `json:"network_tier,omitempty"` +} + +type GPUScoreMetrics struct { + TrainingScore float64 `json:"training_score,omitempty"` + InferenceScore float64 `json:"inference_score,omitempty"` + HPCScore float64 `json:"hpc_score,omitempty"` + GraphicsScore float64 `json:"graphics_score,omitempty"` +} + +type GPUSpecMetrics struct { + Architecture string `json:"architecture,omitempty"` + Interconnect string `json:"interconnect,omitempty"` + InterconnectBandwidth resource.Quantity `json:"interconnect_bandwidth,omitempty"` + ComputeCapability string `json:"compute_capability,omitempty"` + ClockSpeed resource.Quantity `json:"clock_speed,omitempty"` + FP32TFlops float64 `json:"fp32_tflops,omitempty"` + Topology string `json:"topology,omitempty"` + MultiGPUEfficiency string `json:"multi_gpu_efficiency,omitempty"` +} + +type GPURentingMetrics struct { + Region string `json:"region,omitempty"` + Zone string `json:"zone,omitempty"` + HourlyRate float64 `json:"hourly_rate,omitempty"` +} + +type GPUProviderMetrics struct { + Provider string `json:"provider,omitempty"` + PreEmptible bool `json:"pre_emptible,omitempty"` } // ResourceMetrics represents resources of a certain node. diff --git a/pkg/utils/parseutil/parseutil.go b/pkg/utils/parseutil/parseutil.go index 93f0cf7d..870c560e 100644 --- a/pkg/utils/parseutil/parseutil.go +++ b/pkg/utils/parseutil/parseutil.go @@ -47,6 +47,7 @@ func ParseFlavorSelector(selector *nodecorev1alpha1.Selector) (models.Selector, Memory: nil, Pods: nil, Storage: nil, + GPUFields: nil, }, nil } // Force casting of selectorStruct to K8Slice @@ -100,6 +101,114 @@ func ParseFlavorSelector(selector *nodecorev1alpha1.Selector) (models.Selector, } } +func ParseGPUFilter(selector nodecorev1alpha1.GPUFieldSelector) (models.FilterData, error) { + switch selector.Selector { + case nodecorev1alpha1.ResourceRangeSelectorName: + return ParseResourceQuantityFilter(&nodecorev1alpha1.ResourceQuantityFilter{ + Name: nodecorev1alpha1.TypeRangeFilter, + Data: selector.Data, + }) + case nodecorev1alpha1.ResourceMatchSelectorName: + return ParseResourceQuantityFilter(&nodecorev1alpha1.ResourceQuantityFilter{ + Name: nodecorev1alpha1.TypeMatchFilter, + Data: selector.Data, + }) + case nodecorev1alpha1.StringRangeSelectorName: + return ParseStringFilter(&nodecorev1alpha1.StringFilter{ + Name: nodecorev1alpha1.TypeRangeFilter, + Data: selector.Data, + }) + case nodecorev1alpha1.StringFilterSelectorName: + return ParseStringFilter(&nodecorev1alpha1.StringFilter{ + Name: nodecorev1alpha1.TypeMatchFilter, + Data: selector.Data, + }) + case nodecorev1alpha1.BooleanFilterSelectorName: + return ParseBooleanFilter(&nodecorev1alpha1.BooleanFilter{ + Data: selector.Data, + }) + case nodecorev1alpha1.NumberRangeSelectorName: + return ParseNumberFilter(&nodecorev1alpha1.NumberFilter{ + Name: nodecorev1alpha1.TypeRangeFilter, + Data: selector.Data, + }) + case nodecorev1alpha1.NumberMatchSelectorName: + return ParseNumberFilter(&nodecorev1alpha1.NumberFilter{ + Name: nodecorev1alpha1.TypeMatchFilter, + Data: selector.Data, + }) + default: + return nil, fmt.Errorf("unknown filter type") + } +} + +func ParseNumberFilter(filter *nodecorev1alpha1.NumberFilter) (models.NumberFilter, error) { + var filterModel models.NumberFilter + + if filter == nil { + return filterModel, nil + } + + klog.Infof("Parsing the filter %s", filter.Name) + + switch filter.Name { + case nodecorev1alpha1.TypeMatchFilter: + klog.Info("Parsing the filter as a MatchFilter") + // Unmarshal the data into a ResourceMatchSelector + var matchFilter nodecorev1alpha1.NumberMatchSelector + err := json.Unmarshal(filter.Data.Raw, &matchFilter) + if err != nil { + return filterModel, err + } + + matchFilterData := models.NumberMatchFilter{ + Value: matchFilter.Value, + } + // Marshal the filter data into JSON + matchFilterDataJSON, err := json.Marshal(matchFilterData) + if err != nil { + return filterModel, err + } + // Generate the model for the filter + filterModel = models.NumberFilter{ + Name: models.MatchFilter, + Data: matchFilterDataJSON, + } + + klog.Infof("Filter model: %v", filterModel) + case nodecorev1alpha1.TypeRangeFilter: + klog.Info("Parsing the filter as a RangeFilter") + // Unmarshal the data into a NumberRangeSelector + var rangeFilter nodecorev1alpha1.NumberRangeSelector + err := json.Unmarshal(filter.Data.Raw, &rangeFilter) + if err != nil { + return filterModel, err + } + + rangeFilterData := models.NumberRangeFilter{ + Min: rangeFilter.Min, + Max: rangeFilter.Max, + } + // Marshal the filter data into JSON + rangeFilterDataJSON, err := json.Marshal(rangeFilterData) + if err != nil { + return filterModel, err + } + + // Generate the model for the filter + filterModel = models.NumberFilter{ + Name: models.RangeFilter, + Data: rangeFilterDataJSON, + } + + klog.Infof("Filter model: %v", filterModel) + default: + return filterModel, fmt.Errorf("unknown filter type") + } + + return filterModel, nil +} + // ParseResourceQuantityFilter parses a filter of type ResourceQuantityFilter into a ResourceQuantityFilter model. func ParseResourceQuantityFilter(filter *nodecorev1alpha1.ResourceQuantityFilter) (models.ResourceQuantityFilter, error) { var filterModel models.ResourceQuantityFilter @@ -167,6 +276,21 @@ func ParseResourceQuantityFilter(filter *nodecorev1alpha1.ResourceQuantityFilter return filterModel, nil } +// ParseBooleanFilter parses a filter of type BooleanFilter into a BooleanFilter model. +func ParseBooleanFilter(filter *nodecorev1alpha1.BooleanFilter) (models.BooleanFilter, error) { + var filterModel models.BooleanFilter + + if filter == nil { + return filterModel, nil + } + + if err := json.Unmarshal(filter.Data.Raw, &filterModel.Condition); err != nil { + return filterModel, err + } + + return filterModel, nil +} + // ParseStringFilter parses a filter of type StringFilter into a StringFilter model. func ParseStringFilter(filter *nodecorev1alpha1.StringFilter) (models.StringFilter, error) { var filterModel models.StringFilter @@ -320,6 +444,22 @@ func parseK8SliceFilters(k8sSelector *nodecorev1alpha1.K8SliceSelector) (*models } return nil }(), + GPUFields: func() map[string]models.FilterData { + gpuFilters := map[string]models.FilterData{} + + for _, f := range k8sSelector.GPUFilters { + parsed, parsedErr := ParseGPUFilter(f) + if parsedErr != nil { + klog.Infof("GPU filter %q for field %q has parsing error: %s", f.Selector, f.Field, parsedErr.Error()) + + continue + } + + gpuFilters[f.Field] = parsed + } + + return gpuFilters + }(), } return &k8SliceSelector, nil @@ -384,11 +524,7 @@ func ParseConfiguration(configuration *nodecorev1alpha1.Configuration, flavor *n Pods: configuration.Pods, Gpu: func() *models.GpuCharacteristics { if configuration.Gpu != nil { - return &models.GpuCharacteristics{ - Model: configuration.Gpu.Model, - Cores: configuration.Gpu.Cores, - Memory: configuration.Gpu.Memory, - } + return ToGpuCharacteristics(*configuration.Gpu) } return nil }(), @@ -581,11 +717,7 @@ func ParseFlavor(flavor *nodecorev1alpha1.Flavor) *models.Flavor { Pods: flavorTypeStruct.Characteristics.Pods, Gpu: func() *models.GpuCharacteristics { if flavorTypeStruct.Characteristics.Gpu != nil { - return &models.GpuCharacteristics{ - Model: flavorTypeStruct.Characteristics.Gpu.Model, - Cores: flavorTypeStruct.Characteristics.Gpu.Cores, - Memory: flavorTypeStruct.Characteristics.Gpu.Memory, - } + return ToGpuCharacteristics(*flavorTypeStruct.Characteristics.Gpu) } return nil }(), diff --git a/pkg/utils/parseutil/to_gpu_characteristics.go b/pkg/utils/parseutil/to_gpu_characteristics.go new file mode 100644 index 00000000..5f3458db --- /dev/null +++ b/pkg/utils/parseutil/to_gpu_characteristics.go @@ -0,0 +1,56 @@ +// Copyright 2022-2025 FLUIDOS Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parseutil + +import ( + "github.com/fluidos-project/node/apis/nodecore/v1alpha1" + "github.com/fluidos-project/node/pkg/utils/models" +) + +func ToGpuCharacteristics(input v1alpha1.GPU) *models.GpuCharacteristics { + return &models.GpuCharacteristics{ + Vendor: input.Vendor, + Model: input.Model, + Count: input.Count, + Tier: input.Tier, + MultiInstance: input.MultiInstance, + Shared: input.Shared, + SharingStrategy: input.SharingStrategy, + Dedicated: input.Dedicated, + Interruptible: input.Interruptible, + NetworkBandwidth: input.NetworkBandwidth, + NetworkLatencyMs: input.NetworkLatencyMs, + NetworkTier: input.NetworkTier, + TrainingScore: input.TrainingScore, + InferenceScore: input.InferenceScore, + HPCScore: input.HPCScore, + GraphicsScore: input.GraphicsScore, + Architecture: input.Architecture, + Interconnect: input.Interconnect, + InterconnectBandwidth: input.InterconnectBandwidth, + Cores: input.Cores, + Memory: input.Memory, + ComputeCapability: input.ComputeCapability, + ClockSpeed: input.ClockSpeed, + FP32TFlops: input.FP32TFlops, + Topology: input.Topology, + MultiGPUEfficiency: input.MultiGPUEfficiency, + Region: input.Region, + Zone: input.Zone, + HourlyRate: input.HourlyRate, + Provider: input.Provider, + PreEmptible: input.PreEmptible, + } +} diff --git a/pkg/utils/parseutil/to_node_gpu.go b/pkg/utils/parseutil/to_node_gpu.go new file mode 100644 index 00000000..5c50867c --- /dev/null +++ b/pkg/utils/parseutil/to_node_gpu.go @@ -0,0 +1,57 @@ +// Copyright 2022-2025 FLUIDOS Project +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package parseutil + +import ( + nodecorev1alpha1 "github.com/fluidos-project/node/apis/nodecore/v1alpha1" + "github.com/fluidos-project/node/pkg/utils/models" +) + +func ToNodeCoreGPU(in models.GpuCharacteristics) *nodecorev1alpha1.GPU { + return &nodecorev1alpha1.GPU{ + Model: in.Model, + Cores: in.Cores, + Memory: in.Memory, + Vendor: in.Vendor, + Tier: in.Tier, + Count: in.Count, + MultiInstance: in.MultiInstance, + Shared: in.Shared, + SharingStrategy: in.SharingStrategy, + Dedicated: in.Dedicated, + Interruptible: in.Interruptible, + NetworkBandwidth: in.NetworkBandwidth, + NetworkLatencyMs: in.NetworkLatencyMs, + NetworkTier: in.NetworkTier, + TrainingScore: in.TrainingScore, + InferenceScore: in.InferenceScore, + HPCScore: in.HPCScore, + GraphicsScore: in.GraphicsScore, + Architecture: in.Architecture, + Interconnect: in.Interconnect, + InterconnectBandwidth: in.InterconnectBandwidth, + ComputeCapability: in.ComputeCapability, + ClockSpeed: in.ClockSpeed, + FP32TFlops: in.FP32TFlops, + Topology: in.Topology, + MultiGPUEfficiency: in.MultiGPUEfficiency, + Region: in.Region, + Zone: in.Zone, + HourlyRate: in.HourlyRate, + Provider: in.Provider, + PreEmptible: in.PreEmptible, + } + +} diff --git a/pkg/utils/resourceforge/forge.go b/pkg/utils/resourceforge/forge.go index 31ffc06a..34e679fc 100644 --- a/pkg/utils/resourceforge/forge.go +++ b/pkg/utils/resourceforge/forge.go @@ -65,29 +65,6 @@ func ForgeDiscovery(selector *nodecorev1alpha1.Selector, solverID string) *adver } } -// ForgePeeringCandidate creates a PeeringCandidate CR from a Flavor and a Discovery. -func ForgePeeringCandidate(flavorPeeringCandidate *nodecorev1alpha1.Flavor, - solverID string, available bool) (pc *advertisementv1alpha1.PeeringCandidate) { - pc = &advertisementv1alpha1.PeeringCandidate{ - ObjectMeta: metav1.ObjectMeta{ - Name: namings.ForgePeeringCandidateName(flavorPeeringCandidate.Name), - Namespace: flags.FluidosNamespace, - }, - Spec: advertisementv1alpha1.PeeringCandidateSpec{ - Flavor: nodecorev1alpha1.Flavor{ - ObjectMeta: metav1.ObjectMeta{ - Name: flavorPeeringCandidate.Name, - Namespace: flavorPeeringCandidate.Namespace, - }, - Spec: flavorPeeringCandidate.Spec, - }, - Available: available, - }, - } - pc.Spec.InterestedSolverIDs = append(pc.Spec.InterestedSolverIDs, solverID) - return -} - // ForgeReservation creates a Reservation CR from a PeeringCandidate. func ForgeReservation(pc *advertisementv1alpha1.PeeringCandidate, configuration *nodecorev1alpha1.Configuration, @@ -199,75 +176,6 @@ func ForgeContract( } } -// ForgeK8SliceFlavorFromMetrics creates a new flavor custom resource from the metrics of the node. -func ForgeK8SliceFlavorFromMetrics(node *models.NodeInfo, ni nodecorev1alpha1.NodeIdentity, - ownerReferences []metav1.OwnerReference) (flavor *nodecorev1alpha1.Flavor) { - k8SliceType := nodecorev1alpha1.K8Slice{ - Characteristics: nodecorev1alpha1.K8SliceCharacteristics{ - Architecture: node.Architecture, - CPU: node.ResourceMetrics.CPUAvailable, - Memory: node.ResourceMetrics.MemoryAvailable, - Pods: node.ResourceMetrics.PodsAvailable, - Storage: &node.ResourceMetrics.EphemeralStorage, - Gpu: &nodecorev1alpha1.GPU{ - Model: node.ResourceMetrics.GPU.Model, - Cores: node.ResourceMetrics.GPU.CoresAvailable, - Memory: node.ResourceMetrics.GPU.MemoryAvailable, - }, - }, - Properties: nodecorev1alpha1.Properties{}, - Policies: nodecorev1alpha1.Policies{ - Partitionability: nodecorev1alpha1.Partitionability{ - CPUMin: parseutil.ParseQuantityFromString(flags.CPUMin), - MemoryMin: parseutil.ParseQuantityFromString(flags.MemoryMin), - PodsMin: parseutil.ParseQuantityFromString(flags.PodsMin), - CPUStep: parseutil.ParseQuantityFromString(flags.CPUStep), - MemoryStep: parseutil.ParseQuantityFromString(flags.MemoryStep), - PodsStep: parseutil.ParseQuantityFromString(flags.PodsStep), - }, - }, - } - - // Serialize K8SliceType to JSON - k8SliceTypeJSON, err := json.Marshal(k8SliceType) - if err != nil { - klog.Errorf("Error when marshaling K8SliceType: %s", err) - return nil - } - - return &nodecorev1alpha1.Flavor{ - ObjectMeta: metav1.ObjectMeta{ - Name: namings.ForgeFlavorName(string(nodecorev1alpha1.TypeK8Slice), ni.Domain), - Namespace: flags.FluidosNamespace, - OwnerReferences: ownerReferences, - }, - Spec: nodecorev1alpha1.FlavorSpec{ - ProviderID: ni.NodeID, - FlavorType: nodecorev1alpha1.FlavorType{ - TypeIdentifier: nodecorev1alpha1.TypeK8Slice, - TypeData: runtime.RawExtension{Raw: k8SliceTypeJSON}, - }, - Owner: ni, - Price: nodecorev1alpha1.Price{ - Amount: flags.AMOUNT, - Currency: flags.CURRENCY, - Period: flags.PERIOD, - }, - Availability: true, - // FIXME: NetworkPropertyType should be taken in a smarter way - NetworkPropertyType: "networkProperty", - // FIXME: Location should be taken in a smarter way - Location: &nodecorev1alpha1.Location{ - Latitude: "10", - Longitude: "58", - Country: "Italy", - City: "Turin", - AdditionalNotes: "None", - }, - }, - } -} - // ForgeServiceFlavorFromBlueprint creates a new flavor custom resource from a ServiceBlueprint. func ForgeServiceFlavorFromBlueprint(serviceBlueprint *nodecorev1alpha1.ServiceBlueprint, ni *nodecorev1alpha1.NodeIdentity, ownerReferences []metav1.OwnerReference) (flavor *nodecorev1alpha1.Flavor) { @@ -592,12 +500,9 @@ func ForgeConfigurationFromObj(configuration models.Configuration) (*nodecorev1a Pods: configurationStruct.Pods, Gpu: func() *nodecorev1alpha1.GPU { if configurationStruct.Gpu != nil { - return &nodecorev1alpha1.GPU{ - Model: configurationStruct.Gpu.Model, - Cores: configurationStruct.Gpu.Cores, - Memory: configurationStruct.Gpu.Memory, - } + return parseutil.ToNodeCoreGPU(*configurationStruct.Gpu) } + return nil }(), Storage: configurationStruct.Storage, @@ -670,11 +575,7 @@ func ForgeConfigurationObj(configuration *nodecorev1alpha1.Configuration) (*mode Pods: configurationStruct.Pods, Gpu: func() *models.GpuCharacteristics { if configurationStruct.Gpu != nil { - return &models.GpuCharacteristics{ - Model: configurationStruct.Gpu.Model, - Cores: configurationStruct.Gpu.Cores, - Memory: configurationStruct.Gpu.Memory, - } + return parseutil.ToGpuCharacteristics(*configurationStruct.Gpu) } return nil }(), @@ -891,12 +792,9 @@ func ForgeFlavorFromObj(flavor *models.Flavor) (*nodecorev1alpha1.Flavor, error) Storage: flavorTypeDataModel.Characteristics.Storage, Gpu: func() *nodecorev1alpha1.GPU { if flavorTypeDataModel.Characteristics.Gpu != nil { - return &nodecorev1alpha1.GPU{ - Model: flavorTypeDataModel.Characteristics.Gpu.Model, - Cores: flavorTypeDataModel.Characteristics.Gpu.Cores, - Memory: flavorTypeDataModel.Characteristics.Gpu.Memory, - } + return parseutil.ToNodeCoreGPU(*flavorTypeDataModel.Characteristics.Gpu) } + return nil }(), }, diff --git a/pkg/virtual-fabric-manager/services.go b/pkg/virtual-fabric-manager/services.go index 69413db0..1b414c06 100644 --- a/pkg/virtual-fabric-manager/services.go +++ b/pkg/virtual-fabric-manager/services.go @@ -32,13 +32,14 @@ import ( ipamLiqo "github.com/liqotech/liqo/pkg/utils/ipam" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" clientcmdapi "k8s.io/client-go/tools/clientcmd/api" - klog "k8s.io/klog/v2" + "k8s.io/klog/v2" "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" @@ -375,7 +376,7 @@ func createTenantNamespace(ctx context.Context, cl client.Client, clusterID core err := cl.Create(ctx, tenantNamespace) if err != nil { klog.Error(err) - return "", err + return name, err } klog.InfofDepth(1, "Tenant namespace %s created in %s cluster", name, clusterID) return name, nil @@ -447,15 +448,19 @@ func EstablishNetwork( // Create local tenant namespaces localNamespaceName, err := createTenantNamespace(ctx, localClient, remoteClusterIdentity) if err != nil { - klog.Error(err) - return nil, nil, "", "", err + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, "", "", err + } } // Create remote tenant namespaces remoteNamespaceName, err := createTenantNamespace(ctx, remoteClient, localClusterIdentity) if err != nil { - klog.Error(err) - return nil, nil, "", "", err + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, "", "", err + } } klog.InfoDepth(1, "Creating configurations...") @@ -474,10 +479,11 @@ func EstablishNetwork( } // Remote cluster applies Local configuration - err = remoteClient.Create(ctx, localConfiguration) - if err != nil { - klog.Error(err) - return nil, nil, "", "", err + if err = remoteClient.Create(ctx, localConfiguration); err != nil { + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, "", "", err + } } klog.InfofDepth(1, "Local configuration %s created", localConfiguration.Name) @@ -496,10 +502,11 @@ func EstablishNetwork( } // Local cluster applies Remote configuration - err = localClient.Create(ctx, remoteConfiguration) - if err != nil { - klog.Error(err) - return nil, nil, "", "", err + if err = localClient.Create(ctx, remoteConfiguration); err != nil { + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, "", "", err + } } klog.InfofDepth(1, "Remote configuration %s created", remoteConfiguration.Name) @@ -536,7 +543,6 @@ func EstablishNetwork( gatewayServerPort, ) if err != nil { - klog.Error(err) return nil, nil, "", "", err } @@ -558,8 +564,10 @@ func EstablishNetwork( // Create public key on Local cluster err = remoteClient.Create(ctx, localPublicKey) if err != nil { - klog.Error(err) - return nil, nil, "", "", err + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, "", "", err + } } klog.InfofDepth(1, "Local public key %s created", localPublicKey.Name) @@ -580,8 +588,10 @@ func EstablishNetwork( // Create public key on Remote cluster err = localClient.Create(ctx, remotePublicKey) if err != nil { - klog.Error(err) - return nil, nil, "", "", err + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, "", "", err + } } klog.InfofDepth(1, "Remote public key %s created", remotePublicKey.Name) @@ -729,8 +739,10 @@ func createGatewayClient( // Create GatewayClient on Local cluster err = localClient.Create(ctx, gatewayClient) if err != nil { - klog.Error(err) - return nil, nil, err + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, err + } } klog.InfofDepth(1, "GatewayClient %s created", gwClientName) @@ -798,10 +810,11 @@ func createGatewayServer( return nil, nil, 0, nil, err } - err = remoteClient.Create(ctx, gatewayServer) - if err != nil { - klog.Error(err) - return nil, nil, 0, nil, err + if err = remoteClient.Create(ctx, gatewayServer); err != nil { + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, nil, 0, nil, err + } } klog.InfofDepth(1, "GatewayServer %s created", gwServerName) @@ -1226,8 +1239,10 @@ func PeerWithCluster( // Authenticate with remote cluster err = Authentication(ctx, localClient, localRestConfig, remoteclient, remoteRestConfig, localNamespaceName, remoteNamespaceName) if err != nil { - klog.Error(err) - return nil, err + if !errors.IsAlreadyExists(err) { + klog.Error(err) + return nil, err + } } err = Offloading(ctx, localClient, remoteRestConfig, localNamespaceName, contract) diff --git a/tools/development/build.sh b/tools/development/build.sh old mode 100644 new mode 100755 index b35e83f5..1762fb82 --- a/tools/development/build.sh +++ b/tools/development/build.sh @@ -3,9 +3,10 @@ # Build and load the docker image build_and_load() { local COMPONENT="$1" - docker build -f ../../build/common/Dockerfile --build-arg COMPONENT="$COMPONENT" -t "$NAMESPACE"/"$COMPONENT":"$VERSION" ../../ - kind load docker-image "$NAMESPACE"/"$COMPONENT":"$VERSION" --name=fluidos-provider - kind load docker-image "$NAMESPACE"/"$COMPONENT":"$VERSION" --name=fluidos-consumer + docker build -f ../../build/common/Dockerfile --build-arg COMPONENT="$COMPONENT" -t "$NAMESPACE/$COMPONENT:$VERSION" ../../ + for NAME in $(kind get clusters | grep -i fluidos); do + kind load docker-image "$NAMESPACE/$COMPONENT:$VERSION" --name="$NAME" + done } # Get the Docker namespace, version, and component from the command line diff --git a/tools/scripts/install_liqo.sh b/tools/scripts/install_liqo.sh old mode 100755 new mode 100644