From 215e67283c2d704e81b1e518a75f7aca97e476d6 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Wed, 18 Aug 2021 16:00:57 +0800 Subject: [PATCH 01/14] v0.1 --- .../app/controllermanager.go | 44 +++++++++++-------- cmd/controller-manager/app/options/options.go | 12 ++++- pkg/util/detector/detector.go | 4 +- pkg/util/detector/discovery.go | 20 +++++++-- pkg/util/objectwatcher/objectwatcher.go | 6 ++- .../pkg/leaderelection/leader_election.go | 3 +- 6 files changed, 62 insertions(+), 27 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index 72b709f45482..e9a26c9a20d2 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -22,9 +22,9 @@ import ( "github.com/karmada-io/karmada/pkg/controllers/binding" "github.com/karmada-io/karmada/pkg/controllers/cluster" "github.com/karmada-io/karmada/pkg/controllers/execution" - "github.com/karmada-io/karmada/pkg/controllers/hpa" + //"github.com/karmada-io/karmada/pkg/controllers/hpa" "github.com/karmada-io/karmada/pkg/controllers/mcs" - "github.com/karmada-io/karmada/pkg/controllers/namespace" + //"github.com/karmada-io/karmada/pkg/controllers/namespace" "github.com/karmada-io/karmada/pkg/controllers/propagationpolicy" "github.com/karmada-io/karmada/pkg/controllers/status" "github.com/karmada-io/karmada/pkg/util" @@ -69,6 +69,7 @@ func Run(ctx context.Context, opts *options.Options) error { LeaderElectionID: "karmada-controller-manager", HealthProbeBindAddress: fmt.Sprintf("%s:%d", opts.BindAddress, opts.SecurePort), LivenessEndpointName: "/healthz", + MetricsBindAddress: opts.MetricsBindAddress, }) if err != nil { klog.Errorf("failed to build controller manager: %v", err) @@ -119,6 +120,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop DynamicClient: dynamicClientSet, SkippedResourceConfig: skippedResourceConfig, SkippedPropagatingNamespaces: skippedPropagatingNamespaces, + ManagedGroups: opts.ManagedGroups, } resourceDetector.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(resourceDetector.EventFilter, resourceDetector.OnAdd, resourceDetector.OnUpdate, resourceDetector.OnDelete) @@ -173,15 +175,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup cluster status controller: %v", err) } - hpaController := &hpa.HorizontalPodAutoscalerController{ - Client: mgr.GetClient(), - DynamicClient: dynamicClientSet, - EventRecorder: mgr.GetEventRecorderFor(hpa.ControllerName), - RESTMapper: mgr.GetRESTMapper(), - } - if err := hpaController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup hpa controller: %v", err) - } + /* + hpaController := &hpa.HorizontalPodAutoscalerController{ + Client: mgr.GetClient(), + DynamicClient: dynamicClientSet, + EventRecorder: mgr.GetEventRecorderFor(hpa.ControllerName), + RESTMapper: mgr.GetRESTMapper(), + } + if err := hpaController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup hpa controller: %v", err) + } + */ policyController := &propagationpolicy.Controller{ Client: mgr.GetClient(), @@ -240,14 +244,16 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup work status controller: %v", err) } - namespaceSyncController := &namespace.Controller{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(namespace.ControllerName), - SkippedPropagatingNamespaces: 
skippedPropagatingNamespaces, - } - if err := namespaceSyncController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup namespace sync controller: %v", err) - } + /* + namespaceSyncController := &namespace.Controller{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(namespace.ControllerName), + SkippedPropagatingNamespaces: skippedPropagatingNamespaces, + } + if err := namespaceSyncController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup namespace sync controller: %v", err) + } + */ serviceExportController := &mcs.ServiceExportController{ Client: mgr.GetClient(), diff --git a/cmd/controller-manager/app/options/options.go b/cmd/controller-manager/app/options/options.go index 19263c475409..4ccadfb21973 100644 --- a/cmd/controller-manager/app/options/options.go +++ b/cmd/controller-manager/app/options/options.go @@ -17,14 +17,20 @@ var ( ) const ( - defaultBindAddress = "0.0.0.0" - defaultPort = 10357 + defaultMetricsBindAddress = ":8080" + defaultBindAddress = "0.0.0.0" + defaultPort = 10357 ) // Options contains everything necessary to create and run controller-manager. type Options struct { HostNamespace string LeaderElection componentbaseconfig.LeaderElectionConfiguration + + ManagedGroups []string + // MetricsBindAddress is the TCP address that the controller should bind to for serving prometheus metrics. + // It can be set to "0" to disable the metrics serving. + MetricsBindAddress string // BindAddress is the IP address on which to listen for the --secure-port port. BindAddress string // SecurePort is the port that the the server serves at. @@ -90,6 +96,8 @@ func (o *Options) Complete() { // AddFlags adds flags to the specified FlagSet. func (o *Options) AddFlags(flags *pflag.FlagSet) { + flags.StringSliceVar(&o.ManagedGroups, "managed-groups", []string{"autoscaling.karrier.io"}, "Groups managed by federation.") + flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", defaultMetricsBindAddress, "The TCP address for serving prometheus metrics.") flags.StringVar(&o.BindAddress, "bind-address", defaultBindAddress, "The IP address on which to listen for the --secure-port port.") flags.IntVar(&o.SecurePort, "secure-port", defaultPort, diff --git a/pkg/util/detector/detector.go b/pkg/util/detector/detector.go index 3932714450e3..55c13c1f0410 100644 --- a/pkg/util/detector/detector.go +++ b/pkg/util/detector/detector.go @@ -75,6 +75,8 @@ type ResourceDetector struct { waitingLock sync.RWMutex stopCh <-chan struct{} + + ManagedGroups []string } // Start runs the detector, never stop until stopCh closed. @@ -146,7 +148,7 @@ var _ manager.LeaderElectionRunnable = &ResourceDetector{} func (d *ResourceDetector) discoverResources(period time.Duration) { wait.Until(func() { - newResources := GetDeletableResources(d.DiscoveryClientSet) + newResources := GetDeletableResources(d.DiscoveryClientSet, d.ManagedGroups) for r := range newResources { if d.InformerManager.IsHandlerExist(r, d.EventHandler) { continue diff --git a/pkg/util/detector/discovery.go b/pkg/util/detector/discovery.go index 0d6f69f2a9ad..b4c9e93e2bf5 100644 --- a/pkg/util/detector/discovery.go +++ b/pkg/util/detector/discovery.go @@ -1,6 +1,8 @@ package detector import ( + "strings" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/discovery" "k8s.io/klog/v2" @@ -15,7 +17,7 @@ import ( // // This code is directly lifted from the Kubernetes codebase. 
// For reference: https://github.com/kubernetes/kubernetes/blob/1e11e4a2108024935ecfcb2912226cedeafd99df/pkg/controller/garbagecollector/garbagecollector.go#L638-L667 -func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) map[schema.GroupVersionResource]struct{} { +func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface, groups []string) map[schema.GroupVersionResource]struct{} { preferredResources, err := discoveryClient.ServerPreferredResources() if err != nil { if discovery.IsGroupDiscoveryFailedError(err) { @@ -32,14 +34,26 @@ func GetDeletableResources(discoveryClient discovery.ServerResourcesInterface) m // failures on a per-resource basis. deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete", "list", "watch"}}, preferredResources) deletableGroupVersionResources := map[schema.GroupVersionResource]struct{}{} + var match bool for _, rl := range deletableResources { gv, err := schema.ParseGroupVersion(rl.GroupVersion) if err != nil { klog.Warningf("ignoring invalid discovered resource %q: %v", rl.GroupVersion, err) continue } - for i := range rl.APIResources { - deletableGroupVersionResources[schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}] = struct{}{} + + for _, g := range groups { + match = true + if strings.Contains(gv.Group, g) { + break + } + match = false + } + + if match { + for i := range rl.APIResources { + deletableGroupVersionResources[schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: rl.APIResources[i].Name}] = struct{}{} + } } } diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index d3b72bfaba70..8b9cdc080167 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -66,6 +66,8 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc return err } + desireObj.SetResourceVersion("") + // Karmada will adopt creating resource due to an existing resource in member cluster, because we don't want to force update or delete the resource created by users. // users should resolve the conflict in person. clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{}) @@ -74,7 +76,7 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc // - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True), // when reconcile again, the controller will try to apply(by create) the resource again. // - 2. The resource already exist in the member cluster but it's not created by karmada. 
- if apierrors.IsAlreadyExists(err) { + if apierrors.IsAlreadyExists(err) || strings.Contains(err.Error(), "already exists") { existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) @@ -110,6 +112,8 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster return err } + desireObj.SetUID("") + err = RetainClusterFields(desireObj, clusterObj) if err != nil { klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) : %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err) diff --git a/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go b/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go index 0173f6e2f47a..7a816ba9b308 100644 --- a/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go +++ b/vendor/sigs.k8s.io/controller-runtime/pkg/leaderelection/leader_election.go @@ -21,6 +21,7 @@ import ( "fmt" "io/ioutil" "os" + "strings" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/client-go/kubernetes" @@ -116,5 +117,5 @@ func getInClusterNamespace() (string, error) { if err != nil { return "", fmt.Errorf("error reading namespace file: %w", err) } - return string(namespace), nil + return strings.Replace(string(namespace), "\n", "", -1), nil } From 5f96a9e63b7c33720da96610840c64fa95f836be Mon Sep 17 00:00:00 2001 From: jingxueli Date: Mon, 23 Aug 2021 14:18:32 +0800 Subject: [PATCH 02/14] update --- pkg/util/objectwatcher/objectwatcher.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 8b9cdc080167..588116115fd6 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -67,7 +67,6 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc } desireObj.SetResourceVersion("") - // Karmada will adopt creating resource due to an existing resource in member cluster, because we don't want to force update or delete the resource created by users. // users should resolve the conflict in person. 
clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{}) @@ -112,14 +111,15 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster return err } - desireObj.SetUID("") - err = RetainClusterFields(desireObj, clusterObj) if err != nil { klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) : %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err) return err } + desireObj.SetUID("") + // TODO: for SmartHPA + desireObj.SetOwnerReferences(clusterObj.GetOwnerReferences()) resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{}) if err != nil { klog.Errorf("Failed to update resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err) @@ -132,7 +132,6 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster o.recordVersion(resource, cluster.Name) return nil } - func (o *objectWatcherImpl) Delete(cluster *v1alpha1.Cluster, desireObj *unstructured.Unstructured) error { dynamicClusterClient, err := o.ClusterClientSetFunc(cluster, o.KubeClientSet) if err != nil { From 6941c6ec0d9cf32371d13ef4efe34a9fac39c4c7 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Tue, 24 Aug 2021 18:26:32 +0800 Subject: [PATCH 03/14] move cluster controller and cluster status controller --- .../app/controllermanager.go | 98 ++++++++++--------- 1 file changed, 50 insertions(+), 48 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index e9a26c9a20d2..e10467cc1a82 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -10,17 +10,17 @@ import ( "github.com/spf13/cobra" "k8s.io/client-go/discovery" "k8s.io/client-go/dynamic" - kubeclientset "k8s.io/client-go/kubernetes" + //kubeclientset "k8s.io/client-go/kubernetes" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" - "sigs.k8s.io/controller-runtime/pkg/event" + //"sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/healthz" - "sigs.k8s.io/controller-runtime/pkg/predicate" + //"sigs.k8s.io/controller-runtime/pkg/predicate" "github.com/karmada-io/karmada/cmd/controller-manager/app/options" - clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" + //clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/binding" - "github.com/karmada-io/karmada/pkg/controllers/cluster" + //"github.com/karmada-io/karmada/pkg/controllers/cluster" "github.com/karmada-io/karmada/pkg/controllers/execution" //"github.com/karmada-io/karmada/pkg/controllers/hpa" "github.com/karmada-io/karmada/pkg/controllers/mcs" @@ -129,51 +129,53 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup resource detector: %v", err) } - clusterController := &cluster.Controller{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName), - ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration, - ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration, - ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration, - } - if err := clusterController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to 
setup cluster controller: %v", err) - } + /* + clusterController := &cluster.Controller{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName), + ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration, + ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration, + ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration, + } + if err := clusterController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup cluster controller: %v", err) + } - clusterPredicateFunc := predicate.Funcs{ - CreateFunc: func(createEvent event.CreateEvent) bool { - obj := createEvent.Object.(*clusterv1alpha1.Cluster) - return obj.Spec.SyncMode == clusterv1alpha1.Push - }, - UpdateFunc: func(updateEvent event.UpdateEvent) bool { - obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster) - return obj.Spec.SyncMode == clusterv1alpha1.Push - }, - DeleteFunc: func(deleteEvent event.DeleteEvent) bool { - obj := deleteEvent.Object.(*clusterv1alpha1.Cluster) - return obj.Spec.SyncMode == clusterv1alpha1.Push - }, - GenericFunc: func(genericEvent event.GenericEvent) bool { - return false - }, - } + clusterPredicateFunc := predicate.Funcs{ + CreateFunc: func(createEvent event.CreateEvent) bool { + obj := createEvent.Object.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push + }, + UpdateFunc: func(updateEvent event.UpdateEvent) bool { + obj := updateEvent.ObjectNew.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push + }, + DeleteFunc: func(deleteEvent event.DeleteEvent) bool { + obj := deleteEvent.Object.(*clusterv1alpha1.Cluster) + return obj.Spec.SyncMode == clusterv1alpha1.Push + }, + GenericFunc: func(genericEvent event.GenericEvent) bool { + return false + }, + } - clusterStatusController := &status.ClusterStatusController{ - Client: mgr.GetClient(), - KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), - EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), - PredicateFunc: clusterPredicateFunc, - InformerManager: informermanager.GetInstance(), - StopChan: stopChan, - ClusterClientSetFunc: util.NewClusterClientSet, - ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, - ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency, - ClusterLeaseDuration: opts.ClusterLeaseDuration, - ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, - } - if err := clusterStatusController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup cluster status controller: %v", err) - } + clusterStatusController := &status.ClusterStatusController{ + Client: mgr.GetClient(), + KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), + EventRecorder: mgr.GetEventRecorderFor(status.ControllerName), + PredicateFunc: clusterPredicateFunc, + InformerManager: informermanager.GetInstance(), + StopChan: stopChan, + ClusterClientSetFunc: util.NewClusterClientSet, + ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet, + ClusterStatusUpdateFrequency: opts.ClusterStatusUpdateFrequency, + ClusterLeaseDuration: opts.ClusterLeaseDuration, + ClusterLeaseRenewIntervalFraction: opts.ClusterLeaseRenewIntervalFraction, + } + if err := clusterStatusController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup cluster status controller: %v", err) + } + */ /* hpaController := &hpa.HorizontalPodAutoscalerController{ From 5d677726c4834c26d280631bd8ad666f55b2af06 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Mon, 30 Aug 2021 14:49:36 +0800 
Subject: [PATCH 04/14] update --- pkg/util/objectwatcher/objectwatcher.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 588116115fd6..8ea315adfaa5 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -66,7 +66,6 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc return err } - desireObj.SetResourceVersion("") // Karmada will adopt creating resource due to an existing resource in member cluster, because we don't want to force update or delete the resource created by users. // users should resolve the conflict in person. clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{}) @@ -117,7 +116,6 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster return err } - desireObj.SetUID("") // TODO: for SmartHPA desireObj.SetOwnerReferences(clusterObj.GetOwnerReferences()) resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{}) From 69bf4c8a5c9d835925a42ca2a9aeb4c27e22a22e Mon Sep 17 00:00:00 2001 From: jingxueli Date: Wed, 1 Sep 2021 20:37:56 +0800 Subject: [PATCH 05/14] update --- pkg/util/objectwatcher/objectwatcher.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 8ea315adfaa5..2371c86a845c 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -66,6 +66,11 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc return err } + // TODO: more common + // for SmartHPA here + if desireObj.GroupVersionKind().Group == "autoscaling.karrier.io" { + desireObj.SetOwnerReferences(nil) + } // Karmada will adopt creating resource due to an existing resource in member cluster, because we don't want to force update or delete the resource created by users. // users should resolve the conflict in person. 
 	clusterObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Create(context.TODO(), desireObj, metav1.CreateOptions{})
@@ -116,8 +121,11 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster
 		return err
 	}
 
-	// TODO: for SmartHPA
-	desireObj.SetOwnerReferences(clusterObj.GetOwnerReferences())
+	// TODO: more common
+	// for SmartHPA here
+	if desireObj.GroupVersionKind().Group == "autoscaling.karrier.io" {
+		desireObj.SetOwnerReferences(clusterObj.GetOwnerReferences())
+	}
 	resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{})
 	if err != nil {
 		klog.Errorf("Failed to update resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err)

From 19e5ad7ac8d839dc2a02eac5486b46420a460f6e Mon Sep 17 00:00:00 2001
From: jingxueli
Date: Thu, 2 Sep 2021 14:56:12 +0800
Subject: [PATCH 06/14] set resourcebinding.spec.clusters from clusterList and
 propagationPolicy

---
 pkg/util/detector/detector.go | 69 +++++++++++++++++++++++++++++++++++
 1 file changed, 69 insertions(+)

diff --git a/pkg/util/detector/detector.go b/pkg/util/detector/detector.go
index 55c13c1f0410..8f103e4c8d40 100644
--- a/pkg/util/detector/detector.go
+++ b/pkg/util/detector/detector.go
@@ -20,6 +20,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/errors"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/dynamic"
@@ -29,6 +30,7 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
 	"sigs.k8s.io/controller-runtime/pkg/manager"
 
+	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	"github.com/karmada-io/karmada/pkg/util"
@@ -300,6 +302,57 @@ func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
 func (d *ResourceDetector) OnDelete(obj interface{}) {
 	d.OnAdd(obj)
 }
+func (d *ResourceDetector) LookForMatchedClusters(clusterAffinity *policyv1alpha1.ClusterAffinity) ([]workv1alpha1.TargetCluster, error) {
+	clusterList := &clusterv1alpha1.ClusterList{}
+	if err := d.Client.List(context.TODO(), clusterList); err != nil {
+		klog.Errorf("Failed to list cluster: %v", err)
+		return nil, err
+	}
+
+	result := make([]workv1alpha1.TargetCluster, 0)
+	for cn, _ := range ClustersMatchSelector(clusterList, clusterAffinity) {
+		result = append(result, workv1alpha1.TargetCluster{Name: cn})
+	}
+
+	return result, nil
+}
+func ClustersMatchSelector(clusterList *clusterv1alpha1.ClusterList, clusterAffinity *policyv1alpha1.ClusterAffinity) sets.String {
+	result := sets.String{}
+	for _, cluster := range clusterList.Items {
+		isExist, err := ClusterMatchSelector(&cluster, clusterAffinity)
+		if err != nil {
+			continue
+		}
+		if isExist {
+			result.Insert(cluster.Name)
+		} else if result.Has(cluster.Name) {
+			result.Delete(cluster.Name)
+		}
+	}
+	return result
+}
+func ClusterMatchSelector(cluster *clusterv1alpha1.Cluster, clusterAffinity *policyv1alpha1.ClusterAffinity) (bool, error) {
+	for _, ecn := range clusterAffinity.ExcludeClusters {
+		if ecn == cluster.Name {
+			return false, nil
+		}
+	}
+	for _, cn := range clusterAffinity.ClusterNames {
+		if cn == cluster.Name {
+			return true, nil
+		}
+	}
+	if clusterAffinity.LabelSelector != nil {
+		s, err := metav1.LabelSelectorAsSelector(clusterAffinity.LabelSelector)
+		if err != nil {
+			return false, err
+		}
+		if s.Matches(labels.Set(cluster.Labels)) {
+			return true, nil
+		}
+	}
+	return false, nil
+}

 // LookForMatchedPolicy tries to find a policy for object referenced by object key.
 func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey) (*policyv1alpha1.PropagationPolicy, error) {
@@ -392,6 +445,14 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
 		bindingCopy.Labels = binding.Labels
 		bindingCopy.OwnerReferences = binding.OwnerReferences
 		bindingCopy.Spec.Resource = binding.Spec.Resource
+		if len(bindingCopy.Spec.Clusters) == 0 {
+			clusters, err := d.LookForMatchedClusters(policy.Spec.Placement.ClusterAffinity)
+			if err != nil {
+				klog.Errorf("Failed to look clusters for object: %s. error: %v", objectKey, err)
+				return err
+			}
+			bindingCopy.Spec.Clusters = clusters
+		}
 		return nil
 	})
 	if err != nil {
@@ -438,6 +499,14 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
 		bindingCopy.Labels = binding.Labels
 		bindingCopy.OwnerReferences = binding.OwnerReferences
 		bindingCopy.Spec.Resource = binding.Spec.Resource
+		if len(bindingCopy.Spec.Clusters) == 0 {
+			clusters, err := d.LookForMatchedClusters(policy.Spec.Placement.ClusterAffinity)
+			if err != nil {
+				klog.Errorf("Failed to look clusters for object: %s. error: %v", objectKey, err)
+				return err
+			}
+			bindingCopy.Spec.Clusters = clusters
+		}
 		return nil
 	})

From 70408747bd8aa34c96d5f055200405a69ef4eab4 Mon Sep 17 00:00:00 2001
From: jingxueli
Date: Fri, 10 Sep 2021 17:58:10 +0800
Subject: [PATCH 07/14] remove check on whether the error string contains
 "already exists"

---
 pkg/util/objectwatcher/objectwatcher.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go
index 2371c86a845c..0c27e01bcb8b 100644
--- a/pkg/util/objectwatcher/objectwatcher.go
+++ b/pkg/util/objectwatcher/objectwatcher.go
@@ -79,7 +79,7 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc
 	// - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True),
 	// when reconcile again, the controller will try to apply(by create) the resource again.
 	// - 2. The resource already exist in the member cluster but it's not created by karmada.
- if apierrors.IsAlreadyExists(err) || strings.Contains(err.Error(), "already exists") { + if apierrors.IsAlreadyExists(err) { existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) From 7416877f3fe68b7e29dd594303dcf8f871e24492 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Wed, 15 Sep 2021 21:23:45 +0800 Subject: [PATCH 08/14] use cluster-controller --- .../app/controllermanager.go | 24 +++++++++---------- pkg/controllers/cluster/cluster_controller.go | 19 ++++++++++----- 2 files changed, 25 insertions(+), 18 deletions(-) diff --git a/cmd/controller-manager/app/controllermanager.go b/cmd/controller-manager/app/controllermanager.go index e10467cc1a82..0b764cbb9138 100644 --- a/cmd/controller-manager/app/controllermanager.go +++ b/cmd/controller-manager/app/controllermanager.go @@ -20,7 +20,7 @@ import ( "github.com/karmada-io/karmada/cmd/controller-manager/app/options" //clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1" "github.com/karmada-io/karmada/pkg/controllers/binding" - //"github.com/karmada-io/karmada/pkg/controllers/cluster" + "github.com/karmada-io/karmada/pkg/controllers/cluster" "github.com/karmada-io/karmada/pkg/controllers/execution" //"github.com/karmada-io/karmada/pkg/controllers/hpa" "github.com/karmada-io/karmada/pkg/controllers/mcs" @@ -129,18 +129,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop klog.Fatalf("Failed to setup resource detector: %v", err) } + clusterController := &cluster.Controller{ + Client: mgr.GetClient(), + EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName), + ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration, + ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration, + ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration, + } + if err := clusterController.SetupWithManager(mgr); err != nil { + klog.Fatalf("Failed to setup cluster controller: %v", err) + } /* - clusterController := &cluster.Controller{ - Client: mgr.GetClient(), - EventRecorder: mgr.GetEventRecorderFor(cluster.ControllerName), - ClusterMonitorPeriod: opts.ClusterMonitorPeriod.Duration, - ClusterMonitorGracePeriod: opts.ClusterMonitorGracePeriod.Duration, - ClusterStartupGracePeriod: opts.ClusterStartupGracePeriod.Duration, - } - if err := clusterController.SetupWithManager(mgr); err != nil { - klog.Fatalf("Failed to setup cluster controller: %v", err) - } - clusterPredicateFunc := predicate.Funcs{ CreateFunc: func(createEvent event.CreateEvent) bool { obj := createEvent.Object.(*clusterv1alpha1.Cluster) @@ -159,6 +158,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop }, } + clusterStatusController := &status.ClusterStatusController{ Client: mgr.GetClient(), KubeClient: kubeclientset.NewForConfigOrDie(mgr.GetConfig()), diff --git a/pkg/controllers/cluster/cluster_controller.go b/pkg/controllers/cluster/cluster_controller.go index 4a29b60ebff2..c7f07ff4ca60 100644 --- a/pkg/controllers/cluster/cluster_controller.go +++ b/pkg/controllers/cluster/cluster_controller.go @@ -13,8 +13,8 @@ import ( "k8s.io/apimachinery/pkg/api/meta" v1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" - utilerrors "k8s.io/apimachinery/pkg/util/errors" - 
"k8s.io/apimachinery/pkg/util/wait" + //utilerrors "k8s.io/apimachinery/pkg/util/errors" + //"k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/record" "k8s.io/klog/v2" controllerruntime "sigs.k8s.io/controller-runtime" @@ -86,6 +86,7 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques } // Start starts an asynchronous loop that monitors the status of cluster. +/* func (c *Controller) Start(ctx context.Context) error { klog.Infof("Starting cluster health monitor") defer klog.Infof("Shutting cluster health monitor") @@ -100,13 +101,17 @@ func (c *Controller) Start(ctx context.Context) error { return nil } +*/ // SetupWithManager creates a controller and register to controller manager. func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error { - return utilerrors.NewAggregate([]error{ - controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c), - mgr.Add(c), - }) + /* + return utilerrors.NewAggregate([]error{ + controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c), + mgr.Add(c), + }) + */ + return controllerruntime.NewControllerManagedBy(mgr).For(&v1alpha1.Cluster{}).Complete(c) } func (c *Controller) syncCluster(cluster *v1alpha1.Cluster) (controllerruntime.Result, error) { @@ -246,6 +251,7 @@ func (c *Controller) createExecutionSpace(cluster *v1alpha1.Cluster) error { return nil } +/* func (c *Controller) monitorClusterHealth() error { clusterList := &v1alpha1.ClusterList{} if err := c.Client.List(context.TODO(), clusterList); err != nil { @@ -279,6 +285,7 @@ func (c *Controller) monitorClusterHealth() error { return nil } +*/ // tryUpdateClusterHealth checks a given cluster's conditions and tries to update it. //nolint:gocyclo From 21cd8c9d23b094d63e0c699c858909bf180a5633 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Fri, 17 Sep 2021 17:28:18 +0800 Subject: [PATCH 09/14] update: replace karmada-scheduler --- pkg/util/detector/detector.go | 67 +++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 7 deletions(-) diff --git a/pkg/util/detector/detector.go b/pkg/util/detector/detector.go index 8f103e4c8d40..893b92e51ca5 100644 --- a/pkg/util/detector/detector.go +++ b/pkg/util/detector/detector.go @@ -302,20 +302,63 @@ func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) { func (d *ResourceDetector) OnDelete(obj interface{}) { d.OnAdd(obj) } -func (d *ResourceDetector) LookForMatchedClusters(clusterAffinity *policyv1alpha1.ClusterAffinity) ([]workv1alpha1.TargetCluster, error) { +func (d *ResourceDetector) LookForMatchedClusters(placement *policyv1alpha1.Placement) ([]workv1alpha1.TargetCluster, error) { + clusterAffinity := placement.ClusterAffinity + clusterList := &clusterv1alpha1.ClusterList{} if err := d.Client.List(context.TODO(), clusterList); err != nil { klog.Errorf("Failed to list cluster: %v", err) return nil, err } - + /* + readyClusters := make([]clusterv1alpha1.Cluster, 0) + for _, cluster := range clusterList.Items { + if IsClusterReady(&cluster.Status) { + readyClusters = append(readyClusters, cluster) + } + } + readyClusterList := &clusterv1alpha1.ClusterList{} + readyClusterList.Items = readyClusters + clusterList = readyClusterList + */ result := make([]workv1alpha1.TargetCluster, 0) - for cn, _ := range ClustersMatchSelector(clusterList, clusterAffinity) { + for cn := range ClustersMatchSelector(clusterList, clusterAffinity) { result = append(result, workv1alpha1.TargetCluster{Name: cn}) } return result, nil } + +func 
IsUpdateBindingClusters(resourceBinding *workv1alpha1.ResourceBinding, placement *policyv1alpha1.Placement) (string, bool) { + if len(resourceBinding.Spec.Clusters) == 0 { + return "", true + } + + policyPlacementStr, err := GetPlacementBytes(placement) + if err != nil { + return "", false + } + + appliedPlacement := util.GetLabelValue(resourceBinding.Annotations, util.PolicyPlacementAnnotation) + if policyPlacementStr != appliedPlacement { + return policyPlacementStr, true + } + + return "", false +} + +func GetPlacementBytes(placement *policyv1alpha1.Placement) (string, error) { + placementBytes, err := json.Marshal(*placement) + if err != nil { + return "", err + } + return string(placementBytes), nil +} + +func IsClusterReady(clusterStatus *clusterv1alpha1.ClusterStatus) bool { + return meta.IsStatusConditionTrue(clusterStatus.Conditions, clusterv1alpha1.ClusterConditionReady) +} + func ClustersMatchSelector(clusterList *clusterv1alpha1.ClusterList, clusterAffinity *policyv1alpha1.ClusterAffinity) sets.String { result := sets.String{} for _, cluster := range clusterList.Items { @@ -445,13 +488,18 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object bindingCopy.Labels = binding.Labels bindingCopy.OwnerReferences = binding.OwnerReferences bindingCopy.Spec.Resource = binding.Spec.Resource - if len(bindingCopy.Spec.Clusters) == 0 { - clusters, err := d.LookForMatchedClusters(policy.Spec.Placement.ClusterAffinity) + placementStr, isUpdate := IsUpdateBindingClusters(bindingCopy, &policy.Spec.Placement) + if isUpdate { + clusters, err := d.LookForMatchedClusters(&policy.Spec.Placement) if err != nil { klog.Errorf("Failed to look clusters for object: %s. error: %v", objectKey, err) return err } bindingCopy.Spec.Clusters = clusters + if bindingCopy.Annotations == nil { + bindingCopy.Annotations = make(map[string]string) + } + bindingCopy.Annotations[util.PolicyPlacementAnnotation] = placementStr } return nil }) @@ -499,13 +547,18 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, bindingCopy.Labels = binding.Labels bindingCopy.OwnerReferences = binding.OwnerReferences bindingCopy.Spec.Resource = binding.Spec.Resource - if len(bindingCopy.Spec.Clusters) == 0 { - clusters, err := d.LookForMatchedClusters(policy.Spec.Placement.ClusterAffinity) + placementStr, isUpdate := IsUpdateBindingClusters(bindingCopy, &policy.Spec.Placement) + if isUpdate { + clusters, err := d.LookForMatchedClusters(&policy.Spec.Placement) if err != nil { klog.Errorf("Failed to look clusters for object: %s. error: %v", objectKey, err) return err } bindingCopy.Spec.Clusters = clusters + if bindingCopy.Annotations == nil { + bindingCopy.Annotations = make(map[string]string) + } + bindingCopy.Annotations[util.PolicyPlacementAnnotation] = placementStr } return nil }) From 4dd4eeb32523f20f5282d66c3f128ad51c22dff0 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Sat, 9 Oct 2021 19:49:57 +0800 Subject: [PATCH 10/14] remove work metadata.ownerReferences field --- pkg/webhook/work/mutating.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/webhook/work/mutating.go b/pkg/webhook/work/mutating.go index ea6b0f6aa6df..41e0d9096fd5 100644 --- a/pkg/webhook/work/mutating.go +++ b/pkg/webhook/work/mutating.go @@ -97,6 +97,8 @@ func removeIrrelevantField(workload *unstructured.Unstructured) { // populated by the kubernetes. 
unstructured.RemoveNestedField(workload.Object, "metadata", "uid") + unstructured.RemoveNestedField(workload.Object, "metadata", "ownerReferences") + unstructured.RemoveNestedField(workload.Object, "status") if workload.GetKind() == util.ServiceKind { From b838b49517701cfc0214269e09c11d6dff96a7a8 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Mon, 18 Oct 2021 17:25:10 +0800 Subject: [PATCH 11/14] add dockerfile --- Dockerfile_cm | 15 +++++++++++++++ Dockerfile_webhook | 15 +++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 Dockerfile_cm create mode 100644 Dockerfile_webhook diff --git a/Dockerfile_cm b/Dockerfile_cm new file mode 100644 index 000000000000..c146a38a5cb0 --- /dev/null +++ b/Dockerfile_cm @@ -0,0 +1,15 @@ +FROM hub.cloud.ctripcorp.com/k8s-mirror/golang:1.14 as builder + +RUN mkdir -p /go/src/github.com/karmada-io/karmada + +ADD . /go/src/github.com/karmada-io/karmada + +WORKDIR /go/src/github.com/karmada-io/karmada/cmd/controller-manager + +RUN GOPROXY="https://goproxy.io" GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /go/bin/controller-manager . + +FROM hub.cloud.ctripcorp.com/container/alpine:3.5 + +WORKDIR /usr/local/bin + +COPY --from=builder /go/bin/controller-manager . diff --git a/Dockerfile_webhook b/Dockerfile_webhook new file mode 100644 index 000000000000..2115ffb5db78 --- /dev/null +++ b/Dockerfile_webhook @@ -0,0 +1,15 @@ +FROM hub.cloud.ctripcorp.com/k8s-mirror/golang:1.14 as builder + +RUN mkdir -p /go/src/github.com/karmada-io/karmada + +ADD . /go/src/github.com/karmada-io/karmada + +WORKDIR /go/src/github.com/karmada-io/karmada/cmd/webhook + +RUN GOPROXY="https://goproxy.io" GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o /go/bin/webhook . + +FROM hub.cloud.ctripcorp.com/container/alpine:3.5 + +WORKDIR /usr/local/bin + +COPY --from=builder /go/bin/webhook . 
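
A note on the mutating-webhook change in PATCH 10 above: stripping metadata.ownerReferences from the workload embedded in a Work follows the same logic as the existing removal of metadata.uid — both fields are populated by the apiserver that owns the object and must not be propagated verbatim to member clusters. A minimal, self-contained sketch of the effect; the sample object and its field values are illustrative, only unstructured.RemoveNestedField and the Unstructured type come from k8s.io/apimachinery as used in the series:

	package main

	import (
		"fmt"

		"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	)

	func main() {
		// Hypothetical workload as it might arrive inside a Work object,
		// still carrying fields set by the control-plane apiserver.
		workload := &unstructured.Unstructured{Object: map[string]interface{}{
			"apiVersion": "apps/v1",
			"kind":       "Deployment",
			"metadata": map[string]interface{}{
				"name": "demo",
				"uid":  "2f6e3a1c-example", // placeholder UID
				"ownerReferences": []interface{}{
					map[string]interface{}{"kind": "SmartHPA", "name": "demo-hpa"},
				},
			},
		}}

		// The same calls the webhook makes: drop fields that the member
		// cluster's apiserver must own.
		unstructured.RemoveNestedField(workload.Object, "metadata", "uid")
		unstructured.RemoveNestedField(workload.Object, "metadata", "ownerReferences")

		fmt.Println(workload.Object["metadata"]) // map[name:demo]
	}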
From 44ee4cefdf8070ccd89b8dc7dcc7501268a29d40 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Mon, 18 Oct 2021 17:43:44 +0800 Subject: [PATCH 12/14] add cluster message in objectwatcher logs for convenient debugging Signed-off-by: jingxueli --- pkg/util/objectwatcher/objectwatcher.go | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 0c27e01bcb8b..8631ffa1532b 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -62,7 +62,7 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind()) if err != nil { - klog.Errorf("Failed to create resource(%s/%s) as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), err) + klog.Errorf("Failed to create resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) return err } @@ -92,7 +92,7 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc return o.Update(cluster, desireObj, existObj) } - klog.Errorf("Failed to create resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err) + klog.Errorf("Failed to create resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) return err } klog.Infof("Created resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name) @@ -111,13 +111,13 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind()) if err != nil { - klog.Errorf("Failed to update resource(%s/%s) as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), err) + klog.Errorf("Failed to update resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) return err } err = RetainClusterFields(desireObj, clusterObj) if err != nil { - klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) : %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err) + klog.Errorf("Failed to retain fields for resource(kind=%s, %s/%s) in cluster %s: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) return err } @@ -128,7 +128,7 @@ func (o *objectWatcherImpl) Update(cluster *v1alpha1.Cluster, desireObj, cluster } resource, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Update(context.TODO(), desireObj, metav1.UpdateOptions{}) if err != nil { - klog.Errorf("Failed to update resource(kind=%s, %s/%s), err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), err) + klog.Errorf("Failed to update resource(kind=%s, %s/%s) in cluster %s, err is %v ", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) return err } @@ -147,7 +147,7 @@ func (o *objectWatcherImpl) Delete(cluster *v1alpha1.Cluster, desireObj *unstruc gvr, err := restmapper.GetGroupVersionResource(o.RESTMapper, desireObj.GroupVersionKind()) if err != nil { - klog.Errorf("Failed to delete resource(%s/%s) as mapping GVK to GVR failed: %v", 
desireObj.GetNamespace(), desireObj.GetName(), err) + klog.Errorf("Failed to delete resource(%s/%s) in cluster %s as mapping GVK to GVR failed: %v", desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err) return err } @@ -156,7 +156,7 @@ func (o *objectWatcherImpl) Delete(cluster *v1alpha1.Cluster, desireObj *unstruc err = nil } if err != nil { - klog.Errorf("Failed to delete resource %v, err is %v ", desireObj.GetName(), err) + klog.Errorf("Failed to delete resource %v in cluster %s, err is %v ", desireObj.GetName(), cluster.Name, err) return err } klog.Infof("Deleted resource(kind=%s, %s/%s) on cluster: %s", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name) @@ -226,8 +226,8 @@ func (o *objectWatcherImpl) NeedsUpdate(cluster *v1alpha1.Cluster, desiredObj, c // get resource version version, exist := o.getVersionRecord(cluster.Name, desiredObj.GroupVersionKind().String()+"/"+desiredObj.GetNamespace()+"/"+desiredObj.GetName()) if !exist { - klog.Errorf("Failed to update resource %v/%v for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName()) - return false, fmt.Errorf("failed to update resource %v/%v for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName()) + klog.Errorf("Failed to update resource %v/%v in cluster %s for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName(), cluster.Name) + return false, fmt.Errorf("failed to update resource %v/%v in cluster %s for the version record does not exist", desiredObj.GetNamespace(), desiredObj.GetName(), cluster.Name) } return objectNeedsUpdate(desiredObj, clusterObj, version), nil From b79d2ac1fdacf3f638b2dc88fd9860605727cca6 Mon Sep 17 00:00:00 2001 From: jingxueli Date: Tue, 19 Oct 2021 16:25:57 +0800 Subject: [PATCH 13/14] fix failed deletion in member clusters --- pkg/controllers/execution/execution_controller.go | 2 +- pkg/util/objectwatcher/objectwatcher.go | 2 +- pkg/util/objectwatcher/retain.go | 2 ++ 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go index 78b44f196682..44d77d12c265 100644 --- a/pkg/controllers/execution/execution_controller.go +++ b/pkg/controllers/execution/execution_controller.go @@ -76,8 +76,8 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err) return controllerruntime.Result{Requeue: true}, err } + return c.removeFinalizer(work) } - return c.removeFinalizer(work) } return c.syncWork(cluster, work) diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 8631ffa1532b..4eab57da79ba 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -79,7 +79,7 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc // - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True), // when reconcile again, the controller will try to apply(by create) the resource again. // - 2. The resource already exist in the member cluster but it's not created by karmada. 
-	if apierrors.IsAlreadyExists(err) {
+	if apierrors.IsAlreadyExists(err) || strings.Contains(err.Error(), "already exists") {
 		existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{})
 		if err != nil {
 			return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err)
diff --git a/pkg/util/objectwatcher/retain.go b/pkg/util/objectwatcher/retain.go
index e91bc46dd3a0..be143fc46c53 100644
--- a/pkg/util/objectwatcher/retain.go
+++ b/pkg/util/objectwatcher/retain.go
@@ -34,6 +34,8 @@ func RetainClusterFields(desiredObj, clusterObj *unstructured.Unstructured) erro
 	// and be set by user in karmada-controller-plane.
 	util.MergeAnnotations(desiredObj, clusterObj)
 
+	desiredObj.SetDeletionGracePeriodSeconds(clusterObj.GetDeletionGracePeriodSeconds())
+
 	switch targetKind {
 	case util.PodKind:
 		return retainPodFields(desiredObj, clusterObj)

From 0f08a0a166a8625b3766f18f8e5333544e5962f5 Mon Sep 17 00:00:00 2001
From: jingxueli
Date: Mon, 1 Nov 2021 14:34:02 +0800
Subject: [PATCH 14/14] remove needless helper.IsResourceApplied() check when
 creating/updating/deleting workload

---
 .../execution/execution_controller.go         | 60 ++++++++++++------
 pkg/util/objectwatcher/objectwatcher.go       |  2 +-
 2 files changed, 39 insertions(+), 23 deletions(-)

diff --git a/pkg/controllers/execution/execution_controller.go b/pkg/controllers/execution/execution_controller.go
index 44d77d12c265..7d8b5249b392 100644
--- a/pkg/controllers/execution/execution_controller.go
+++ b/pkg/controllers/execution/execution_controller.go
@@ -19,7 +19,7 @@ import (
 	"github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
 	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
 	"github.com/karmada-io/karmada/pkg/util"
-	"github.com/karmada-io/karmada/pkg/util/helper"
+	//"github.com/karmada-io/karmada/pkg/util/helper"
 	"github.com/karmada-io/karmada/pkg/util/names"
 	"github.com/karmada-io/karmada/pkg/util/objectwatcher"
 	"github.com/karmada-io/karmada/pkg/util/restmapper"
@@ -69,15 +69,23 @@ func (c *Controller) Reconcile(ctx context.Context, req controllerruntime.Reques
 	}
 
 	if !work.DeletionTimestamp.IsZero() {
-		applied := helper.IsResourceApplied(&work.Status)
-		if applied {
-			err := c.tryDeleteWorkload(cluster, work)
-			if err != nil {
-				klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
-				return controllerruntime.Result{Requeue: true}, err
+		/*
+			applied := helper.IsResourceApplied(&work.Status)
+			if applied {
+				err := c.tryDeleteWorkload(cluster, work)
+				if err != nil {
+					klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
+					return controllerruntime.Result{Requeue: true}, err
+				}
+				return c.removeFinalizer(work)
 			}
-			return c.removeFinalizer(work)
+		*/
+		err := c.tryDeleteWorkload(cluster, work)
+		if err != nil {
+			klog.Errorf("Failed to delete work %v, namespace is %v, err is %v", work.Name, work.Namespace, err)
+			return controllerruntime.Result{Requeue: true}, err
 		}
+		return c.removeFinalizer(work)
 	}
 
 	return c.syncWork(cluster, work)
@@ -166,21 +174,29 @@ func (c *Controller) syncToClusters(cluster *v1alpha1.Cluster, work *workv1alpha
 			continue
 		}
 
-		applied := helper.IsResourceApplied(&work.Status)
-		if applied {
-			err = c.tryUpdateWorkload(cluster, workload, clusterDynamicClient)
-			if err != nil {
-				klog.Errorf("Failed to update resource(%v/%v) in the
given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) - errs = append(errs, err) - continue - } - } else { - err = c.tryCreateWorkload(cluster, workload) - if err != nil { - klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) - errs = append(errs, err) - continue + /* + applied := helper.IsResourceApplied(&work.Status) + if applied { + err = c.tryUpdateWorkload(cluster, workload, clusterDynamicClient) + if err != nil { + klog.Errorf("Failed to update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) + errs = append(errs, err) + continue + } + } else { + err = c.tryCreateWorkload(cluster, workload) + if err != nil { + klog.Errorf("Failed to create resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) + errs = append(errs, err) + continue + } } + */ + err = c.tryUpdateWorkload(cluster, workload, clusterDynamicClient) + if err != nil { + klog.Errorf("Failed to update resource(%v/%v) in the given member cluster %s, err is %v", workload.GetNamespace(), workload.GetName(), cluster.Name, err) + errs = append(errs, err) + continue } syncSucceedNum++ } diff --git a/pkg/util/objectwatcher/objectwatcher.go b/pkg/util/objectwatcher/objectwatcher.go index 4eab57da79ba..8631ffa1532b 100644 --- a/pkg/util/objectwatcher/objectwatcher.go +++ b/pkg/util/objectwatcher/objectwatcher.go @@ -79,7 +79,7 @@ func (o *objectWatcherImpl) Create(cluster *v1alpha1.Cluster, desireObj *unstruc // - 1. In a reconcile process, the execution controller successfully applied resource to member cluster but failed to update the work conditions(Applied=True), // when reconcile again, the controller will try to apply(by create) the resource again. // - 2. The resource already exist in the member cluster but it's not created by karmada. - if apierrors.IsAlreadyExists(err) || strings.Contains(err.Error(), "already exists") { + if apierrors.IsAlreadyExists(err) { existObj, err := dynamicClusterClient.DynamicClientSet.Resource(gvr).Namespace(desireObj.GetNamespace()).Get(context.TODO(), desireObj.GetName(), metav1.GetOptions{}) if err != nil { return fmt.Errorf("failed to get exist resource(kind=%s, %s/%s) in cluster %v: %v", desireObj.GetKind(), desireObj.GetNamespace(), desireObj.GetName(), cluster.Name, err)