From 0207ca7d7b67b350807b20439758439f04e5480d Mon Sep 17 00:00:00 2001 From: linyguo Date: Thu, 31 Oct 2024 10:00:16 +0800 Subject: [PATCH 1/7] add cancel context for solution manager --- .../managers/solution/solution-manager.go | 372 +++++++++++------- api/pkg/apis/v1alpha1/utils/apiclient.go | 23 ++ api/pkg/apis/v1alpha1/vendors/job-vendor.go | 4 - .../apis/v1alpha1/vendors/solution-vendor.go | 63 ++- .../v1alpha1/vendors/solution-vendor_test.go | 2 +- coa/pkg/apis/v1alpha2/errors.go | 9 + coa/pkg/apis/v1alpha2/types.go | 3 + 7 files changed, 335 insertions(+), 141 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 9b7bb3a46..49f08598d 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -17,6 +17,7 @@ import ( "sync" "time" + "github.com/eclipse-symphony/symphony/api/constants" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution/metrics" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model" sp "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers" @@ -64,6 +65,8 @@ type SolutionManager struct { IsTarget bool TargetNames []string ApiClientHttp api_utils.ApiClient + jobList map[string]map[string][]string + cancelFunc map[string]context.CancelFunc } type SolutionManagerDeploymentState struct { @@ -421,158 +424,190 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy plannedCount := 0 planSuccessCount := 0 for _, step := range plan.Steps { - log.DebugfCtx(ctx, " M (Solution): processing step with Role %s on target %s", step.Role, step.Target) - for _, component := range step.Components { - log.DebugfCtx(ctx, " M (Solution): processing component %s with action %s", component.Component.Name, component.Action) - } - if s.IsTarget && !api_utils.ContainsString(s.TargetNames, step.Target) { - continue - } - - if targetName != "" && targetName != step.Target { - continue - } - - plannedCount++ - - dep.ActiveTarget = step.Target - agent := findAgentFromDeploymentState(mergedState, step.Target) - if agent != "" { - col[ENV_NAME] = agent - } else { - delete(col, ENV_NAME) - } - var override tgt.ITargetProvider - role := step.Role - if role == "container" { - role = "instance" - } - if v, ok := s.TargetProviders[role]; ok { - override = v - } - var provider providers.IProvider - if override == nil { - targetSpec := s.getTargetStateForStep(step, deployment, previousDesiredState) - provider, err = sp.CreateProviderForTargetRole(s.Context, step.Role, targetSpec, override) + select { + case <-ctx.Done(): + // Context canceled or timed out + log.DebugfCtx(ctx, " M (Solution): reconcile canceled") + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) if err != nil { - summary.SummaryMessage = "failed to create provider:" + err.Error() - log.ErrorfCtx(ctx, " M (Solution): failed to create provider: %+v", err) + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) return summary, err } - } else { - provider = override - } + err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) + return summary, err + default: + log.DebugfCtx(ctx, " M (Solution): processing step with Role %s on target %s", step.Role, step.Target) + for _, component := range step.Components { + log.DebugfCtx(ctx, " M (Solution): processing component %s with action %s", component.Component.Name, component.Action) + } + if s.IsTarget && !api_utils.ContainsString(s.TargetNames, step.Target) { + continue + } - if previousDesiredState != nil { - testState := MergeDeploymentStates(&previousDesiredState.State, currentState) - if s.canSkipStep(ctx, step, step.Target, provider.(tgt.ITargetProvider), previousDesiredState.State.Components, testState) { - log.InfofCtx(ctx, " M (Solution): skipping step with role %s on target %s", step.Role, step.Target) - targetResult[step.Target] = 1 - planSuccessCount++ + if targetName != "" && targetName != step.Target { continue } - } - log.DebugfCtx(ctx, " M (Solution): applying step with Role %s on target %s", step.Role, step.Target) - someStepsRan = true - retryCount := 1 - //TODO: set to 1 for now. Although retrying can help to handle transient errors, in more cases - // an error condition can't be resolved quickly. - var stepError error - var componentResults map[string]model.ComponentResultSpec - - // for _, component := range step.Components { - // for k, v := range component.Component.Properties { - // if strV, ok := v.(string); ok { - // parser := api_utils.NewParser(strV) - // eCtx := s.VendorContext.EvaluationContext.Clone() - // eCtx.DeploymentSpec = deployment - // eCtx.Component = component.Component.Name - // val, err := parser.Eval(*eCtx) - // if err == nil { - // component.Component.Properties[k] = val - // } else { - // log.ErrorfCtx(ctx, " M (Solution): failed to evaluate property: %+v", err) - // summary.SummaryMessage = fmt.Sprintf("failed to evaluate property '%s' on component '%s: %s", k, component.Component.Name, err.Error()) - // s.saveSummary(ctx, deployment, summary) - // return summary, err - // } - // } - // } - // } - - for i := 0; i < retryCount; i++ { - componentResults, stepError = (provider.(tgt.ITargetProvider)).Apply(ctx, dep, step, deployment.IsDryRun) - if stepError == nil { - targetResult[step.Target] = 1 - summary.AllAssignedDeployed = plannedCount == planSuccessCount - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", ComponentResults: componentResults}) - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + + plannedCount++ + + dep.ActiveTarget = step.Target + agent := findAgentFromDeploymentState(mergedState, step.Target) + if agent != "" { + col[ENV_NAME] = agent + } else { + delete(col, ENV_NAME) + } + var override tgt.ITargetProvider + role := step.Role + if role == "container" { + role = "instance" + } + if v, ok := s.TargetProviders[role]; ok { + override = v + } + var provider providers.IProvider + if override == nil { + targetSpec := s.getTargetStateForStep(step, deployment, previousDesiredState) + provider, err = sp.CreateProviderForTargetRole(s.Context, step.Role, targetSpec, override) if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + summary.SummaryMessage = "failed to create provider:" + err.Error() + log.ErrorfCtx(ctx, " M (Solution): failed to create provider: %+v", err) return summary, err } - break } else { - targetResult[step.Target] = 0 - summary.AllAssignedDeployed = false - targetResultStatus := fmt.Sprintf("%s Failed", deploymentType) - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: targetResultStatus, ComponentResults: componentResults}) // TODO: this keeps only the last error on the target - time.Sleep(5 * time.Second) //TODO: make this configurable? + provider = override } - } - if stepError != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to execute deployment step: %+v", stepError) - successCount := 0 - for _, v := range targetResult { - successCount += v + if previousDesiredState != nil { + testState := MergeDeploymentStates(&previousDesiredState.State, currentState) + if s.canSkipStep(ctx, step, step.Target, provider.(tgt.ITargetProvider), previousDesiredState.State.Components, testState) { + log.InfofCtx(ctx, " M (Solution): skipping step with role %s on target %s", step.Role, step.Target) + targetResult[step.Target] = 1 + planSuccessCount++ + continue + } } - summary.CurrentDeployed += successCount - summary.SuccessCount = successCount - summary.AllAssignedDeployed = plannedCount == planSuccessCount - err = stepError - return summary, err - } - planSuccessCount++ - summary.CurrentDeployed += len(step.Components) - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) - return summary, err + log.DebugfCtx(ctx, " M (Solution): applying step with Role %s on target %s", step.Role, step.Target) + someStepsRan = true + retryCount := 1 + //TODO: set to 1 for now. Although retrying can help to handle transient errors, in more cases + // an error condition can't be resolved quickly. + var stepError error + var componentResults map[string]model.ComponentResultSpec + + // for _, component := range step.Components { + // for k, v := range component.Component.Properties { + // if strV, ok := v.(string); ok { + // parser := api_utils.NewParser(strV) + // eCtx := s.VendorContext.EvaluationContext.Clone() + // eCtx.DeploymentSpec = deployment + // eCtx.Component = component.Component.Name + // val, err := parser.Eval(*eCtx) + // if err == nil { + // component.Component.Properties[k] = val + // } else { + // log.ErrorfCtx(ctx, " M (Solution): failed to evaluate property: %+v", err) + // summary.SummaryMessage = fmt.Sprintf("failed to evaluate property '%s' on component '%s: %s", k, component.Component.Name, err.Error()) + // s.saveSummary(ctx, deployment, summary) + // return summary, err + // } + // } + // } + // } + + for i := 0; i < retryCount; i++ { + select { + case <-ctx.Done(): + // Context canceled or timed out + log.DebugfCtx(ctx, " M (Solution): reconcile canceled") + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + return summary, err + } + err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) + return summary, err + default: + componentResults, stepError = (provider.(tgt.ITargetProvider)).Apply(ctx, dep, step, deployment.IsDryRun) + if stepError == nil { + targetResult[step.Target] = 1 + summary.AllAssignedDeployed = plannedCount == planSuccessCount + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", ComponentResults: componentResults}) + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + return summary, err + } + break + } else { + targetResult[step.Target] = 0 + summary.AllAssignedDeployed = false + targetResultStatus := fmt.Sprintf("%s Failed", deploymentType) + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: targetResultStatus, ComponentResults: componentResults}) // TODO: this keeps only the last error on the target + + if v1alpha2.IsCanceled(stepError) { + log.ErrorfCtx(ctx, " M (Solution): reconcile canceled: %+v", stepError) + break + } + + time.Sleep(5 * time.Second) //TODO: make this configurable? + } + } + } + if stepError != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to execute deployment step: %+v", stepError) + + successCount := 0 + for _, v := range targetResult { + successCount += v + } + summary.CurrentDeployed += successCount + summary.SuccessCount = successCount + summary.AllAssignedDeployed = plannedCount == planSuccessCount + err = stepError + return summary, err + } + planSuccessCount++ + summary.CurrentDeployed += len(step.Components) + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + return summary, err + } + log.DebugfCtx(ctx, " M (Solution): reconcile save summary progress: current deployed %v out of total %v deployments", summary.PlannedDeployment, summary.CurrentDeployed) } - log.DebugfCtx(ctx, " M (Solution): reconcile save summary progress: current deployed %v out of total %v deployments", summary.PlannedDeployment, summary.CurrentDeployed) - } - mergedState.ClearAllRemoved() + mergedState.ClearAllRemoved() - if !deployment.IsDryRun { - if len(mergedState.TargetComponent) == 0 && remove { - log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") - s.StateProvider.Delete(ctx, states.DeleteRequest{ - ID: deployment.Instance.ObjectMeta.Name, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) - } else { - s.StateProvider.Upsert(ctx, states.UpsertRequest{ - Value: states.StateEntry{ + if !deployment.IsDryRun { + if len(mergedState.TargetComponent) == 0 && remove { + log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") + s.StateProvider.Delete(ctx, states.DeleteRequest{ ID: deployment.Instance.ObjectMeta.Name, - Body: SolutionManagerDeploymentState{ - Spec: deployment, - State: mergedState, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, }, - }, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) + }) + } else { + s.StateProvider.Upsert(ctx, states.UpsertRequest{ + Value: states.StateEntry{ + ID: deployment.Instance.ObjectMeta.Name, + Body: SolutionManagerDeploymentState{ + Spec: deployment, + State: mergedState, + }, + }, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + } } } @@ -835,6 +870,75 @@ func (s *SolutionManager) Reconcil() []error { return nil } +func (s *SolutionManager) InitCancelMap() { + s.jobList = make(map[string]map[string][]string) + s.cancelFunc = make(map[string]context.CancelFunc) +} + +func (s *SolutionManager) UntrackJob(ctx context.Context, namespace string, instance string, jobID string) { + for i, v := range s.jobList[namespace][instance] { + if v == jobID { + s.jobList[namespace][instance] = append(s.jobList[namespace][instance][:i], s.jobList[namespace][instance][i+1:]...) + } + } + log.InfofCtx(ctx, " M (Solution): UntrackJob, namespace %s, job id: %s", namespace, jobID) + if len(s.jobList[namespace][instance]) == 0 { + delete(s.jobList[namespace], instance) + } + delete(s.cancelFunc, generateJobIndex(namespace, instance, jobID)) +} + +func (s *SolutionManager) UntrackPreviousJob(ctx context.Context, namespace string, instance string, jobID string) { + if _, exists := s.jobList[namespace]; !exists { + return + } + + for i, v := range s.jobList[namespace][instance] { + if v < jobID { + s.jobList[namespace][instance] = append(s.jobList[namespace][instance][:i], s.jobList[namespace][instance][i+1:]...) + delete(s.cancelFunc, generateJobIndex(namespace, instance, v)) + } + } + if len(s.jobList[namespace][instance]) == 0 { + delete(s.jobList[namespace], instance) + } + log.InfofCtx(ctx, " M (Solution): UntrackPreviousJob, namespace %s, instance %s, job id: %s", namespace, instance, jobID) +} + +func (s *SolutionManager) TrackJob(ctx context.Context, namespace string, instance string, jobID string) { + if _, exists := s.jobList[namespace]; !exists { + s.jobList[namespace] = make(map[string][]string) + } + if _, exists := s.jobList[namespace][instance]; !exists { + s.jobList[namespace][instance] = []string{jobID} + } else { + s.jobList[namespace][instance] = append(s.jobList[namespace][instance], jobID) + } + log.InfofCtx(ctx, " M (Solution): TrackJob, namespace %s, instance: %s, job id: %s", namespace, instance, jobID) +} + +func (s *SolutionManager) AddCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) { + index := generateJobIndex(namespace, instance, jobID) + s.cancelFunc[index] = cancel +} + +func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) { + if _, exists := s.jobList[namespace]; exists { + for _, id := range s.jobList[namespace][instance] { + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, scanning job id: %s", id) + if id < jobID { + // only cancel jobs prior to the delete job + s.cancelFunc[generateJobIndex(namespace, instance, id)]() + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", id) + } + } + } +} + +func generateJobIndex(namespace string, instance string, jobID string) string { + return namespace + constants.ResourceSeperator + instance + constants.ResourceSeperator + jobID +} + func findAgentFromDeploymentState(state model.DeploymentState, targetName string) string { for _, targetDes := range state.Targets { if targetName == targetDes.Name { diff --git a/api/pkg/apis/v1alpha1/utils/apiclient.go b/api/pkg/apis/v1alpha1/utils/apiclient.go index 1262fdca1..1417333ca 100644 --- a/api/pkg/apis/v1alpha1/utils/apiclient.go +++ b/api/pkg/apis/v1alpha1/utils/apiclient.go @@ -50,6 +50,7 @@ type ( Dispatcher interface { QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error QueueDeploymentJob(ctx context.Context, namespace string, isDelete bool, deployment model.DeploymentSpec, user string, password string) error + CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error } ApiClient interface { @@ -533,6 +534,28 @@ func (a *apiClient) QueueDeploymentJob(ctx context.Context, namespace string, is return nil } +func (a *apiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error { + // func (a *apiClient) CancelDeploymentJob(ctx context.Context, namespace string, deployment model.DeploymentSpec) error { + token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password) + if err != nil { + return err + } + + path := "solution/cancel" + query := url.Values{ + "namespace": []string{namespace}, + "instance": []string{id}, + "jobid": []string{jobId}, + } + + log.DebugfCtx(ctx, "apiClient.CancelDeploymentJob: Deployment id: %s, namespace: %v", id, namespace) + _, err = a.callRestAPI(ctx, fmt.Sprintf("%s?%s", path, query.Encode()), "POST", nil, token) + if err != nil { + return err + } + return nil +} + // Deprecated: Use QueueDeploymentJob instead func (a *apiClient) QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error { token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password) diff --git a/api/pkg/apis/v1alpha1/vendors/job-vendor.go b/api/pkg/apis/v1alpha1/vendors/job-vendor.go index 06bf9048d..43e1b22a3 100644 --- a/api/pkg/apis/v1alpha1/vendors/job-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/job-vendor.go @@ -97,10 +97,6 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana }, }) - if err != nil { - return err - } - return nil } diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go index b7c0688f9..350bd6b23 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go @@ -10,6 +10,7 @@ import ( "context" "encoding/json" "fmt" + "time" "github.com/eclipse-symphony/symphony/api/constants" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution" @@ -30,6 +31,10 @@ type SolutionVendor struct { SolutionManager *solution.SolutionManager } +const ( + defaultTimeout = 60 * time.Minute +) + func (o *SolutionVendor) GetInfo() vendors.VendorInfo { return vendors.VendorInfo{ Version: o.Vendor.Version, @@ -51,6 +56,7 @@ func (e *SolutionVendor) Init(config vendors.VendorConfig, factories []managers. if e.SolutionManager == nil { return v1alpha2.NewCOAError(nil, "solution manager is not supplied", v1alpha2.MissingConfig) } + e.SolutionManager.InitCancelMap() return nil } @@ -73,6 +79,12 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint { Parameters: []string{"delete?"}, Handler: o.onReconcile, }, + { + Methods: []string{fasthttp.MethodPost}, + Route: route + "/cancel", + Version: o.Version, + Handler: o.onCancel, + }, { Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodDelete}, Route: route + "/queue", @@ -155,6 +167,16 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon }) } instance = deployment.Instance.ObjectMeta.Name + + if delete == "true" { + // cancel the jobs in queue + sLog.InfofCtx(rContext, "V (Solution): onQueue, delete instance: %s, job id: %s", instance, deployment.JobID) + c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, deployment.JobID) + } else { + // track the job id for an ongoing job + sLog.InfofCtx(rContext, "V (Solution): onQueue, add tracking job id for instance: %s, job id: %s", instance, deployment.JobID) + c.SolutionManager.TrackJob(rContext, namespace, instance, deployment.JobID) + } } if instance == "" { @@ -245,14 +267,31 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe Body: []byte(err.Error()), }) } - delete := request.Parameters["delete"] + isRemove := request.Parameters["delete"] targetName := "" if request.Metadata != nil { if v, ok := request.Metadata["active-target"]; ok { targetName = v } } - summary, err := c.SolutionManager.Reconcile(ctx, deployment, delete == "true", namespace, targetName) + + instance := deployment.Instance.ObjectMeta.Name + sLog.InfofCtx(ctx, "V (Solution): onReconcile create context with timeout and cancel function for instance: %s, job id: %s", instance, deployment.JobID) + cancelCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + c.SolutionManager.AddCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) + defer func() { + log.InfofCtx(rContext, "V (Solution): onReconcile, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, isRemove) + if isRemove == "true" { + // if delete completes, remove all the ongoing job list and cancel function prior to this job, if any + c.SolutionManager.UntrackPreviousJob(rContext, namespace, instance, deployment.JobID) + } else { + // remove the reconcile job from list + c.SolutionManager.UntrackJob(rContext, namespace, instance, deployment.JobID) + } + cancel() + }() + + summary, err := c.SolutionManager.Reconcile(cancelCtx, deployment, isRemove == "true", namespace, targetName) data, _ := json.Marshal(summary) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): onReconcile failed POST - reconcile %s", err.Error()) @@ -275,6 +314,26 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe }) } +func (c *SolutionVendor) onCancel(request v1alpha2.COARequest) v1alpha2.COAResponse { + rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ + "method": "onCancel", + }) + defer span.End() + + namespace := request.Parameters["namespace"] + instance := request.Parameters["instance"] + jobId := request.Parameters["jobId"] + + sLog.InfofCtx(rContext, "V (Solution): onCancel instance: %s job ID: %s", instance, jobId) + c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, jobId) + + return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ + State: v1alpha2.OK, + Body: []byte("{\"result\":\"200 - OK\"}"), + ContentType: "application/json", + }) +} + func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onApplyDeployment", diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go index 25218c3f9..deea2658b 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go @@ -172,7 +172,7 @@ func TestSolutionEndpoints(t *testing.T) { vendor := createSolutionVendor() vendor.Route = "solution" endpoints := vendor.GetEndpoints() - assert.Equal(t, 3, len(endpoints)) + assert.Equal(t, 4, len(endpoints)) } func TestSolutionInfo(t *testing.T) { diff --git a/coa/pkg/apis/v1alpha2/errors.go b/coa/pkg/apis/v1alpha2/errors.go index fd078ff10..e0cc27065 100644 --- a/coa/pkg/apis/v1alpha2/errors.go +++ b/coa/pkg/apis/v1alpha2/errors.go @@ -69,6 +69,15 @@ func IsNotFound(err error) bool { } return coaE.State == NotFound } + +func IsCanceled(err error) bool { + coaE, ok := err.(COAError) + if !ok { + return false + } + return coaE.State == Canceled +} + func IsDelayed(err error) bool { coaE, ok := err.(COAError) if !ok { diff --git a/coa/pkg/apis/v1alpha2/types.go b/coa/pkg/apis/v1alpha2/types.go index 11416e6cf..2da47fb2d 100644 --- a/coa/pkg/apis/v1alpha2/types.go +++ b/coa/pkg/apis/v1alpha2/types.go @@ -55,6 +55,7 @@ const ( ValidateFailed State = 8003 Updated State = 8004 Deleted State = 8005 + Canceled State = 8006 // Workflow status Running State = 9994 Paused State = 9995 @@ -209,6 +210,8 @@ func (s State) String() string { return "Updated" case Deleted: return "Deleted" + case Canceled: + return "Canceled" case Running: return "Running" case Paused: From 5ee0e1cfc00b18d2a505758620adedae4e1213b7 Mon Sep 17 00:00:00 2001 From: linyguo Date: Fri, 1 Nov 2024 17:16:43 +0800 Subject: [PATCH 2/7] update reconcile cancel process --- .../managers/solution/solution-manager.go | 83 +++++++++++-------- .../apis/v1alpha1/vendors/solution-vendor.go | 17 ++-- k8s/testing/mocks.go | 6 ++ 3 files changed, 64 insertions(+), 42 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 49f08598d..51e906f0a 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -65,7 +65,7 @@ type SolutionManager struct { IsTarget bool TargetNames []string ApiClientHttp api_utils.ApiClient - jobList map[string]map[string][]string + jobList map[string]map[string][]int cancelFunc map[string]context.CancelFunc } @@ -871,68 +871,85 @@ func (s *SolutionManager) Reconcil() []error { } func (s *SolutionManager) InitCancelMap() { - s.jobList = make(map[string]map[string][]string) + s.jobList = make(map[string]map[string][]int) s.cancelFunc = make(map[string]context.CancelFunc) } -func (s *SolutionManager) UntrackJob(ctx context.Context, namespace string, instance string, jobID string) { - for i, v := range s.jobList[namespace][instance] { - if v == jobID { - s.jobList[namespace][instance] = append(s.jobList[namespace][instance][:i], s.jobList[namespace][instance][i+1:]...) - } +func (s *SolutionManager) TrackJob(ctx context.Context, namespace string, instance string, jobID string) error { + log.InfofCtx(ctx, " M (Solution): TrackJob, namespace %s, instance: %s, job id: %s", namespace, instance, jobID) + jobIdNum, err := convertJobIdToInt(jobID) + if err != nil { + return err } - log.InfofCtx(ctx, " M (Solution): UntrackJob, namespace %s, job id: %s", namespace, jobID) - if len(s.jobList[namespace][instance]) == 0 { - delete(s.jobList[namespace], instance) + + if _, exists := s.jobList[namespace]; !exists { + s.jobList[namespace] = make(map[string][]int) } - delete(s.cancelFunc, generateJobIndex(namespace, instance, jobID)) + if _, exists := s.jobList[namespace][instance]; !exists { + s.jobList[namespace][instance] = []int{jobIdNum} + } else { + s.jobList[namespace][instance] = append(s.jobList[namespace][instance], jobIdNum) + } + return nil } -func (s *SolutionManager) UntrackPreviousJob(ctx context.Context, namespace string, instance string, jobID string) { +func (s *SolutionManager) UntrackJob(ctx context.Context, namespace string, instance string, jobID string) error { + log.InfofCtx(ctx, " M (Solution): UntrackJob, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) + delete(s.cancelFunc, generateJobIndex(namespace, instance, jobID)) + jobIdNum, err := convertJobIdToInt(jobID) + if err != nil { + return err + } + if _, exists := s.jobList[namespace]; !exists { - return + return nil } for i, v := range s.jobList[namespace][instance] { - if v < jobID { + if v == jobIdNum { s.jobList[namespace][instance] = append(s.jobList[namespace][instance][:i], s.jobList[namespace][instance][i+1:]...) - delete(s.cancelFunc, generateJobIndex(namespace, instance, v)) } } if len(s.jobList[namespace][instance]) == 0 { delete(s.jobList[namespace], instance) } - log.InfofCtx(ctx, " M (Solution): UntrackPreviousJob, namespace %s, instance %s, job id: %s", namespace, instance, jobID) -} - -func (s *SolutionManager) TrackJob(ctx context.Context, namespace string, instance string, jobID string) { - if _, exists := s.jobList[namespace]; !exists { - s.jobList[namespace] = make(map[string][]string) - } - if _, exists := s.jobList[namespace][instance]; !exists { - s.jobList[namespace][instance] = []string{jobID} - } else { - s.jobList[namespace][instance] = append(s.jobList[namespace][instance], jobID) - } - log.InfofCtx(ctx, " M (Solution): TrackJob, namespace %s, instance: %s, job id: %s", namespace, instance, jobID) + return nil } func (s *SolutionManager) AddCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) { + log.InfofCtx(ctx, " M (Solution): AddCancelFunc, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) index := generateJobIndex(namespace, instance, jobID) s.cancelFunc[index] = cancel } -func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) { +func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) error { + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) + jobIdNum, err := convertJobIdToInt(jobID) + if err != nil { + return err + } + if _, exists := s.jobList[namespace]; exists { for _, id := range s.jobList[namespace][instance] { - log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, scanning job id: %s", id) - if id < jobID { + if id < jobIdNum { // only cancel jobs prior to the delete job - s.cancelFunc[generateJobIndex(namespace, instance, id)]() - log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", id) + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, found previous job id: %v", id) + if cancel, exists := s.cancelFunc[generateJobIndex(namespace, instance, strconv.Itoa(id))]; exists { + cancel() + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", id) + } } } } + return nil +} + +func convertJobIdToInt(jobID string) (int, error) { + num, err := strconv.Atoi(jobID) + if err != nil { + return 0, v1alpha2.NewCOAError(err, "Invalid JobID", v1alpha2.BadConfig) + } + return num, nil } func generateJobIndex(namespace string, instance string, jobID string) string { diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go index 350bd6b23..9ddeef7ab 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go @@ -93,6 +93,7 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint { }, } } + func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onQueue", @@ -276,19 +277,17 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe } instance := deployment.Instance.ObjectMeta.Name - sLog.InfofCtx(ctx, "V (Solution): onReconcile create context with timeout and cancel function for instance: %s, job id: %s", instance, deployment.JobID) + sLog.InfofCtx(ctx, "V (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %s", instance, deployment.JobID, isRemove) cancelCtx, cancel := context.WithTimeout(ctx, defaultTimeout) - c.SolutionManager.AddCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) + if isRemove != "true" { + c.SolutionManager.AddCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) + } defer func() { - log.InfofCtx(rContext, "V (Solution): onReconcile, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, isRemove) - if isRemove == "true" { - // if delete completes, remove all the ongoing job list and cancel function prior to this job, if any - c.SolutionManager.UntrackPreviousJob(rContext, namespace, instance, deployment.JobID) - } else { - // remove the reconcile job from list + log.InfofCtx(rContext, "V (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, isRemove) + cancel() + if isRemove != "true" { c.SolutionManager.UntrackJob(rContext, namespace, instance, deployment.JobID) } - cancel() }() summary, err := c.SolutionManager.Reconcile(cancelCtx, deployment, isRemove == "true", namespace, targetName) diff --git a/k8s/testing/mocks.go b/k8s/testing/mocks.go index e0bc2340c..e5ed30e3b 100644 --- a/k8s/testing/mocks.go +++ b/k8s/testing/mocks.go @@ -307,6 +307,12 @@ func (c *MockApiClient) QueueDeploymentJob(ctx context.Context, namespace string return args.Error(0) } +// CancelDeploymentJob implements utils.ApiClient. +func (c *MockApiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error { + args := c.Called(ctx, namespace, id, jobId, namespace) + return args.Error(0) +} + // QueueJob implements ApiClient. // Deprecated and not used. func (c *MockApiClient) QueueJob(ctx context.Context, id string, scope string, isDelete bool, isTarget bool, user string, password string) error { From 6f0203ff37729883bf8ecfb582ed12e974067c9f Mon Sep 17 00:00:00 2001 From: linyguo Date: Fri, 8 Nov 2024 15:29:42 +0800 Subject: [PATCH 3/7] update solution manager --- .../managers/solution/solution-manager.go | 178 ++++++++---------- api/pkg/apis/v1alpha1/vendors/job-vendor.go | 2 + .../apis/v1alpha1/vendors/solution-vendor.go | 16 +- 3 files changed, 86 insertions(+), 110 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 280ed1bd2..6bffc3a0f 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -17,7 +17,6 @@ import ( "sync" "time" - "github.com/eclipse-symphony/symphony/api/constants" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution/metrics" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/model" sp "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/providers" @@ -65,8 +64,7 @@ type SolutionManager struct { IsTarget bool TargetNames []string ApiClientHttp api_utils.ApiClient - jobList map[string]map[string][]int - cancelFunc map[string]context.CancelFunc + jobList map[string]map[string]map[string]context.CancelFunc } type SolutionManagerDeploymentState struct { @@ -423,16 +421,12 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy plannedCount := 0 planSuccessCount := 0 + log.DebugfCtx(ctx, " M (Solution): plan.Steps: %v", len(plan.Steps)) for _, step := range plan.Steps { select { case <-ctx.Done(): // Context canceled or timed out log.DebugfCtx(ctx, " M (Solution): reconcile canceled") - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) - return summary, err - } err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) return summary, err default: @@ -520,11 +514,6 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy case <-ctx.Done(): // Context canceled or timed out log.DebugfCtx(ctx, " M (Solution): reconcile canceled") - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) - return summary, err - } err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) return summary, err default: @@ -577,38 +566,37 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy } log.DebugfCtx(ctx, " M (Solution): reconcile save summary progress: current deployed %v out of total %v deployments", summary.PlannedDeployment, summary.CurrentDeployed) } + } - mergedState.ClearAllRemoved() - - if !deployment.IsDryRun { - if len(mergedState.TargetComponent) == 0 && remove { - log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") - s.StateProvider.Delete(ctx, states.DeleteRequest{ + mergedState.ClearAllRemoved() + if !deployment.IsDryRun { + if len(mergedState.TargetComponent) == 0 && remove { + log.DebugfCtx(ctx, " M (Solution): no assigned components to manage, deleting state") + s.StateProvider.Delete(ctx, states.DeleteRequest{ + ID: deployment.Instance.ObjectMeta.Name, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) + } else { + s.StateProvider.Upsert(ctx, states.UpsertRequest{ + Value: states.StateEntry{ ID: deployment.Instance.ObjectMeta.Name, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) - } else { - s.StateProvider.Upsert(ctx, states.UpsertRequest{ - Value: states.StateEntry{ - ID: deployment.Instance.ObjectMeta.Name, - Body: SolutionManagerDeploymentState{ - Spec: deployment, - State: mergedState, - }, + Body: SolutionManagerDeploymentState{ + Spec: deployment, + State: mergedState, }, - Metadata: map[string]interface{}{ - "namespace": namespace, - "group": model.SolutionGroup, - "version": "v1", - "resource": DeploymentState, - }, - }) - } + }, + Metadata: map[string]interface{}{ + "namespace": namespace, + "group": model.SolutionGroup, + "version": "v1", + "resource": DeploymentState, + }, + }) } } @@ -872,89 +860,87 @@ func (s *SolutionManager) Reconcil() []error { } func (s *SolutionManager) InitCancelMap() { - s.jobList = make(map[string]map[string][]int) - s.cancelFunc = make(map[string]context.CancelFunc) + s.jobList = make(map[string]map[string]map[string]context.CancelFunc) } -func (s *SolutionManager) TrackJob(ctx context.Context, namespace string, instance string, jobID string) error { - log.InfofCtx(ctx, " M (Solution): TrackJob, namespace %s, instance: %s, job id: %s", namespace, instance, jobID) - jobIdNum, err := convertJobIdToInt(jobID) - if err != nil { - return err - } - - if _, exists := s.jobList[namespace]; !exists { - s.jobList[namespace] = make(map[string][]int) - } - if _, exists := s.jobList[namespace][instance]; !exists { - s.jobList[namespace][instance] = []int{jobIdNum} +func (s *SolutionManager) HandleCancelableJobEvent(ctx context.Context, namespace string, instance string, jobID string, isRemove string) { + if isRemove == "true" { + // cancel the jobs in queue + log.InfofCtx(ctx, " M (Solution): HandleCancelableJobEvent, delete instance: %s, job id: %s", instance, jobID) + s.CancelPreviousJobs(ctx, namespace, instance, jobID) } else { - s.jobList[namespace][instance] = append(s.jobList[namespace][instance], jobIdNum) + // add the job id for an ongoing job + log.InfofCtx(ctx, " M (Solution): HandleCancelableJobEvent, adding job id for instance: %s, job id: %s", instance, jobID) + if _, exists := s.jobList[namespace]; !exists { + s.jobList[namespace] = make(map[string]map[string]context.CancelFunc) + } + if _, exists := s.jobList[namespace][instance]; !exists { + s.jobList[namespace][instance] = make(map[string]context.CancelFunc) + } + + s.jobList[namespace][instance][jobID] = nil } - return nil } -func (s *SolutionManager) UntrackJob(ctx context.Context, namespace string, instance string, jobID string) error { - log.InfofCtx(ctx, " M (Solution): UntrackJob, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) - delete(s.cancelFunc, generateJobIndex(namespace, instance, jobID)) - jobIdNum, err := convertJobIdToInt(jobID) - if err != nil { - return err +func (s *SolutionManager) HandleReconcileCancelEvent(ctx context.Context, namespace string, instance string, jobID string, isRemove string, cancel context.CancelFunc) { + log.InfofCtx(ctx, "V (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, jobID, isRemove) + cancel() + if isRemove != "true" { + if _, exists := s.jobList[namespace]; exists { + for jid := range s.jobList[namespace][instance] { + if jid == jobID { + delete(s.jobList[namespace][instance], jid) + } + } + if len(s.jobList[namespace][instance]) == 0 { + delete(s.jobList[namespace], instance) + } + } } +} +func (s *SolutionManager) AddCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) error { + log.InfofCtx(ctx, " M (Solution): AddCancelFunc, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) if _, exists := s.jobList[namespace]; !exists { - return nil + return v1alpha2.NewCOAError(nil, "Job is not queued", v1alpha2.InternalError) } - - for i, v := range s.jobList[namespace][instance] { - if v == jobIdNum { - s.jobList[namespace][instance] = append(s.jobList[namespace][instance][:i], s.jobList[namespace][instance][i+1:]...) - } - } - if len(s.jobList[namespace][instance]) == 0 { - delete(s.jobList[namespace], instance) + if _, exists := s.jobList[namespace][instance]; !exists { + return v1alpha2.NewCOAError(nil, "Job is not queued", v1alpha2.InternalError) } - return nil -} -func (s *SolutionManager) AddCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) { - log.InfofCtx(ctx, " M (Solution): AddCancelFunc, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) - index := generateJobIndex(namespace, instance, jobID) - s.cancelFunc[index] = cancel + s.jobList[namespace][instance][jobID] = cancel + return nil } func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) error { log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) - jobIdNum, err := convertJobIdToInt(jobID) - if err != nil { - return err - } if _, exists := s.jobList[namespace]; exists { - for _, id := range s.jobList[namespace][instance] { - if id < jobIdNum { + for jid, cancelJob := range s.jobList[namespace][instance] { + if convertJobIdToInt(jid) < convertJobIdToInt(jobID) { // only cancel jobs prior to the delete job - log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, found previous job id: %v", id) - if cancel, exists := s.cancelFunc[generateJobIndex(namespace, instance, strconv.Itoa(id))]; exists { - cancel() - log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", id) + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, found previous job id: %s", jid) + if cancelJob != nil { + cancelJob() + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", jid) + delete(s.jobList[namespace][instance], jid) } } } } + + if len(s.jobList[namespace][instance]) == 0 { + delete(s.jobList[namespace], instance) + } return nil } -func convertJobIdToInt(jobID string) (int, error) { +func convertJobIdToInt(jobID string) int { num, err := strconv.Atoi(jobID) if err != nil { - return 0, v1alpha2.NewCOAError(err, "Invalid JobID", v1alpha2.BadConfig) + return 0 } - return num, nil -} - -func generateJobIndex(namespace string, instance string, jobID string) string { - return namespace + constants.ResourceSeperator + instance + constants.ResourceSeperator + jobID + return num } func findAgentFromDeploymentState(state model.DeploymentState, targetName string) string { diff --git a/api/pkg/apis/v1alpha1/vendors/job-vendor.go b/api/pkg/apis/v1alpha1/vendors/job-vendor.go index 43e1b22a3..8856c42b3 100644 --- a/api/pkg/apis/v1alpha1/vendors/job-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/job-vendor.go @@ -9,6 +9,7 @@ package vendors import ( "context" "encoding/json" + "time" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/jobs" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/utils" @@ -72,6 +73,7 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana } err := e.JobsManager.HandleJobEvent(ctx, event) if err != nil && v1alpha2.IsDelayed(err) { + time.Sleep(5 * time.Second) go e.Vendor.Context.Publish(topic, event) } // job reconciler already has a retry mechanism, return nil to avoid retrying diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go index 9ddeef7ab..91b526524 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go @@ -169,15 +169,7 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon } instance = deployment.Instance.ObjectMeta.Name - if delete == "true" { - // cancel the jobs in queue - sLog.InfofCtx(rContext, "V (Solution): onQueue, delete instance: %s, job id: %s", instance, deployment.JobID) - c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, deployment.JobID) - } else { - // track the job id for an ongoing job - sLog.InfofCtx(rContext, "V (Solution): onQueue, add tracking job id for instance: %s, job id: %s", instance, deployment.JobID) - c.SolutionManager.TrackJob(rContext, namespace, instance, deployment.JobID) - } + c.SolutionManager.HandleCancelableJobEvent(rContext, namespace, instance, deployment.JobID, delete) } if instance == "" { @@ -283,11 +275,7 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe c.SolutionManager.AddCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) } defer func() { - log.InfofCtx(rContext, "V (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, isRemove) - cancel() - if isRemove != "true" { - c.SolutionManager.UntrackJob(rContext, namespace, instance, deployment.JobID) - } + c.SolutionManager.HandleReconcileCancelEvent(rContext, namespace, instance, deployment.JobID, isRemove, cancel) }() summary, err := c.SolutionManager.Reconcile(cancelCtx, deployment, isRemove == "true", namespace, targetName) From 94a37cef906fc7ec17c8519000d4b7c6a0581bd0 Mon Sep 17 00:00:00 2001 From: linyguo Date: Fri, 15 Nov 2024 16:45:55 +0800 Subject: [PATCH 4/7] update comments --- .../managers/solution/solution-manager.go | 100 ++++++++++-------- api/pkg/apis/v1alpha1/vendors/job-vendor.go | 2 +- .../apis/v1alpha1/vendors/solution-vendor.go | 24 +---- 3 files changed, 61 insertions(+), 65 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 6bffc3a0f..ef97f2d89 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -37,6 +37,7 @@ import ( var ( log = logger.NewLogger("coa.runtime") lock sync.Mutex + jobListLock sync.RWMutex apiOperationMetrics *metrics.Metrics ) @@ -53,6 +54,8 @@ const ( Summary = "Summary" DeploymentState = "DeployState" + + defaultTimeout = 60 * time.Minute ) type SolutionManager struct { @@ -140,9 +143,15 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. if err != nil { return err } + + s.initCancelMap() return nil } +func (s *SolutionManager) initCancelMap() { + s.jobList = make(map[string]map[string]map[string]context.CancelFunc) +} + func (s *SolutionManager) getPreviousState(ctx context.Context, instance string, namespace string) *SolutionManagerDeploymentState { state, err := s.StateProvider.Get(ctx, states.GetRequest{ ID: instance, @@ -421,12 +430,12 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy plannedCount := 0 planSuccessCount := 0 - log.DebugfCtx(ctx, " M (Solution): plan.Steps: %v", len(plan.Steps)) + log.DebugfCtx(ctx, " M (Solution): count of plan.Steps: %v", len(plan.Steps)) for _, step := range plan.Steps { select { case <-ctx.Done(): // Context canceled or timed out - log.DebugfCtx(ctx, " M (Solution): reconcile canceled") + log.DebugCtx(ctx, " M (Solution): reconcile canceled") err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) return summary, err default: @@ -859,60 +868,66 @@ func (s *SolutionManager) Reconcil() []error { return nil } -func (s *SolutionManager) InitCancelMap() { - s.jobList = make(map[string]map[string]map[string]context.CancelFunc) -} - -func (s *SolutionManager) HandleCancelableJobEvent(ctx context.Context, namespace string, instance string, jobID string, isRemove string) { - if isRemove == "true" { - // cancel the jobs in queue - log.InfofCtx(ctx, " M (Solution): HandleCancelableJobEvent, delete instance: %s, job id: %s", instance, jobID) - s.CancelPreviousJobs(ctx, namespace, instance, jobID) - } else { - // add the job id for an ongoing job - log.InfofCtx(ctx, " M (Solution): HandleCancelableJobEvent, adding job id for instance: %s, job id: %s", instance, jobID) - if _, exists := s.jobList[namespace]; !exists { - s.jobList[namespace] = make(map[string]map[string]context.CancelFunc) - } - if _, exists := s.jobList[namespace][instance]; !exists { - s.jobList[namespace][instance] = make(map[string]context.CancelFunc) - } +func (s *SolutionManager) ReconcileWithCancelWrapper(ctx context.Context, deployment model.DeploymentSpec, remove bool, namespace string, targetName string) (model.SummarySpec, error) { + instance := deployment.Instance.ObjectMeta.Name + log.InfofCtx(ctx, " M (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %s", instance, deployment.JobID, remove) - s.jobList[namespace][instance][jobID] = nil + // Two conditions when the context will be canceled: + // 1. default timeout reached; + // 2. cancel() is called + cancelCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + if !remove { + // Track the CancelFunc for ongoing reconcile job + s.addCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) } -} -func (s *SolutionManager) HandleReconcileCancelEvent(ctx context.Context, namespace string, instance string, jobID string, isRemove string, cancel context.CancelFunc) { - log.InfofCtx(ctx, "V (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, jobID, isRemove) - cancel() - if isRemove != "true" { - if _, exists := s.jobList[namespace]; exists { - for jid := range s.jobList[namespace][instance] { - if jid == jobID { - delete(s.jobList[namespace][instance], jid) - } - } - if len(s.jobList[namespace][instance]) == 0 { - delete(s.jobList[namespace], instance) - } + defer func() { + // call cancel to release resource and clean up the cancelFunc in jobList + log.InfofCtx(ctx, " M (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, remove) + cancel() + if !remove { + s.cleanUpCancelFunc(namespace, instance, deployment.JobID) } - } + }() + + summary, err := s.Reconcile(cancelCtx, deployment, remove, namespace, targetName) + return summary, err } -func (s *SolutionManager) AddCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) error { +func (s *SolutionManager) addCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) { + jobListLock.Lock() + defer jobListLock.Unlock() log.InfofCtx(ctx, " M (Solution): AddCancelFunc, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) + if _, exists := s.jobList[namespace]; !exists { - return v1alpha2.NewCOAError(nil, "Job is not queued", v1alpha2.InternalError) + s.jobList[namespace] = make(map[string]map[string]context.CancelFunc) } if _, exists := s.jobList[namespace][instance]; !exists { - return v1alpha2.NewCOAError(nil, "Job is not queued", v1alpha2.InternalError) + s.jobList[namespace][instance] = make(map[string]context.CancelFunc) } s.jobList[namespace][instance][jobID] = cancel - return nil +} + +func (s *SolutionManager) cleanUpCancelFunc(namespace string, instance string, jobID string) { + jobListLock.Lock() + defer jobListLock.Unlock() + + if _, exists := s.jobList[namespace]; exists { + for jid := range s.jobList[namespace][instance] { + if jid == jobID { + delete(s.jobList[namespace][instance], jid) + } + } + if len(s.jobList[namespace][instance]) == 0 { + delete(s.jobList[namespace], instance) + } + } } func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) error { + jobListLock.Lock() + defer jobListLock.Unlock() log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) if _, exists := s.jobList[namespace]; exists { @@ -923,15 +938,10 @@ func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace stri if cancelJob != nil { cancelJob() log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", jid) - delete(s.jobList[namespace][instance], jid) } } } } - - if len(s.jobList[namespace][instance]) == 0 { - delete(s.jobList[namespace], instance) - } return nil } diff --git a/api/pkg/apis/v1alpha1/vendors/job-vendor.go b/api/pkg/apis/v1alpha1/vendors/job-vendor.go index 8856c42b3..acb48af06 100644 --- a/api/pkg/apis/v1alpha1/vendors/job-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/job-vendor.go @@ -99,7 +99,7 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana }, }) - return nil + return err } func (o *JobVendor) GetEndpoints() []v1alpha2.Endpoint { diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go index 91b526524..a800e8c7f 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go @@ -10,7 +10,6 @@ import ( "context" "encoding/json" "fmt" - "time" "github.com/eclipse-symphony/symphony/api/constants" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution" @@ -31,10 +30,6 @@ type SolutionVendor struct { SolutionManager *solution.SolutionManager } -const ( - defaultTimeout = 60 * time.Minute -) - func (o *SolutionVendor) GetInfo() vendors.VendorInfo { return vendors.VendorInfo{ Version: o.Vendor.Version, @@ -56,7 +51,6 @@ func (e *SolutionVendor) Init(config vendors.VendorConfig, factories []managers. if e.SolutionManager == nil { return v1alpha2.NewCOAError(nil, "solution manager is not supplied", v1alpha2.MissingConfig) } - e.SolutionManager.InitCancelMap() return nil } @@ -169,7 +163,9 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon } instance = deployment.Instance.ObjectMeta.Name - c.SolutionManager.HandleCancelableJobEvent(rContext, namespace, instance, deployment.JobID, delete) + if delete == "true" { + c.SolutionManager.CancelPreviousJobs(ctx, namespace, instance, deployment.JobID) + } } if instance == "" { @@ -260,7 +256,7 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe Body: []byte(err.Error()), }) } - isRemove := request.Parameters["delete"] + delete := request.Parameters["delete"] targetName := "" if request.Metadata != nil { if v, ok := request.Metadata["active-target"]; ok { @@ -268,17 +264,7 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe } } - instance := deployment.Instance.ObjectMeta.Name - sLog.InfofCtx(ctx, "V (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %s", instance, deployment.JobID, isRemove) - cancelCtx, cancel := context.WithTimeout(ctx, defaultTimeout) - if isRemove != "true" { - c.SolutionManager.AddCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) - } - defer func() { - c.SolutionManager.HandleReconcileCancelEvent(rContext, namespace, instance, deployment.JobID, isRemove, cancel) - }() - - summary, err := c.SolutionManager.Reconcile(cancelCtx, deployment, isRemove == "true", namespace, targetName) + summary, err := c.SolutionManager.ReconcileWithCancelWrapper(ctx, deployment, delete == "true", namespace, targetName) data, _ := json.Marshal(summary) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): onReconcile failed POST - reconcile %s", err.Error()) From 9353be9180e9ab4f6e2b8f3e01ae1301af7d7d6f Mon Sep 17 00:00:00 2001 From: linyguo Date: Fri, 6 Dec 2024 16:10:12 +0800 Subject: [PATCH 5/7] update job list --- .../managers/solution/solution-manager.go | 60 +++++++++---------- 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index ef97f2d89..8375f661f 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -58,6 +58,11 @@ const ( defaultTimeout = 60 * time.Minute ) +type JobIdentifier struct { + Namespace string + Name string +} + type SolutionManager struct { managers.Manager TargetProviders map[string]tgt.ITargetProvider @@ -67,7 +72,7 @@ type SolutionManager struct { IsTarget bool TargetNames []string ApiClientHttp api_utils.ApiClient - jobList map[string]map[string]map[string]context.CancelFunc + jobList map[JobIdentifier]map[string]context.CancelFunc } type SolutionManagerDeploymentState struct { @@ -149,7 +154,7 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. } func (s *SolutionManager) initCancelMap() { - s.jobList = make(map[string]map[string]map[string]context.CancelFunc) + s.jobList = make(map[JobIdentifier]map[string]context.CancelFunc) } func (s *SolutionManager) getPreviousState(ctx context.Context, instance string, namespace string) *SolutionManagerDeploymentState { @@ -870,7 +875,7 @@ func (s *SolutionManager) Reconcil() []error { func (s *SolutionManager) ReconcileWithCancelWrapper(ctx context.Context, deployment model.DeploymentSpec, remove bool, namespace string, targetName string) (model.SummarySpec, error) { instance := deployment.Instance.ObjectMeta.Name - log.InfofCtx(ctx, " M (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %s", instance, deployment.JobID, remove) + log.InfofCtx(ctx, " M (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %v", instance, deployment.JobID, remove) // Two conditions when the context will be canceled: // 1. default timeout reached; @@ -883,7 +888,7 @@ func (s *SolutionManager) ReconcileWithCancelWrapper(ctx context.Context, deploy defer func() { // call cancel to release resource and clean up the cancelFunc in jobList - log.InfofCtx(ctx, " M (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %s", namespace, instance, deployment.JobID, remove) + log.InfofCtx(ctx, " M (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %v", namespace, instance, deployment.JobID, remove) cancel() if !remove { s.cleanUpCancelFunc(namespace, instance, deployment.JobID) @@ -899,46 +904,41 @@ func (s *SolutionManager) addCancelFunc(ctx context.Context, namespace string, i defer jobListLock.Unlock() log.InfofCtx(ctx, " M (Solution): AddCancelFunc, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) - if _, exists := s.jobList[namespace]; !exists { - s.jobList[namespace] = make(map[string]map[string]context.CancelFunc) - } - if _, exists := s.jobList[namespace][instance]; !exists { - s.jobList[namespace][instance] = make(map[string]context.CancelFunc) + jobKey := JobIdentifier{Namespace: namespace, Name: instance} + if _, exists := s.jobList[jobKey]; !exists { + s.jobList[jobKey] = make(map[string]context.CancelFunc) } - s.jobList[namespace][instance][jobID] = cancel + s.jobList[jobKey][jobID] = cancel } func (s *SolutionManager) cleanUpCancelFunc(namespace string, instance string, jobID string) { jobListLock.Lock() defer jobListLock.Unlock() - if _, exists := s.jobList[namespace]; exists { - for jid := range s.jobList[namespace][instance] { - if jid == jobID { - delete(s.jobList[namespace][instance], jid) - } - } - if len(s.jobList[namespace][instance]) == 0 { - delete(s.jobList[namespace], instance) - } + jobKey := JobIdentifier{Namespace: namespace, Name: instance} + if _, exists := s.jobList[jobKey]; exists { + delete(s.jobList[jobKey], jobID) + } + + if len(s.jobList[jobKey]) == 0 { + delete(s.jobList, jobKey) } } func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) error { - jobListLock.Lock() - defer jobListLock.Unlock() + jobListLock.RLock() + defer jobListLock.RUnlock() log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) - if _, exists := s.jobList[namespace]; exists { - for jid, cancelJob := range s.jobList[namespace][instance] { - if convertJobIdToInt(jid) < convertJobIdToInt(jobID) { - // only cancel jobs prior to the delete job - log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, found previous job id: %s", jid) - if cancelJob != nil { - cancelJob() - log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", jid) - } + jobKey := JobIdentifier{Namespace: namespace, Name: instance} + for jid, cancelJob := range s.jobList[jobKey] { + if convertJobIdToInt(jid) < convertJobIdToInt(jobID) { + // only cancel jobs prior to the delete job + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, found previous job id: %s", jid) + if cancelJob != nil { + cancelJob() + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", jid) } } } From 9ab080d6b8496079d705e9f4751d350aeff79f3f Mon Sep 17 00:00:00 2001 From: linyguo Date: Fri, 13 Dec 2024 14:41:22 +0800 Subject: [PATCH 6/7] remove component-wise cancel --- .../managers/solution/solution-manager.go | 52 ++++++++----------- 1 file changed, 22 insertions(+), 30 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index 2be8223e7..be6bb6c49 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -540,38 +540,30 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy // } for i := 0; i < retryCount; i++ { - select { - case <-ctx.Done(): - // Context canceled or timed out - log.DebugfCtx(ctx, " M (Solution): reconcile canceled") - err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) - return summary, err - default: - componentResults, stepError = (provider.(tgt.ITargetProvider)).Apply(ctx, dep, step, deployment.IsDryRun) - if stepError == nil { - targetResult[step.Target] = 1 - summary.AllAssignedDeployed = plannedCount == planSuccessCount - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults}) - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) - return summary, err - } + componentResults, stepError = (provider.(tgt.ITargetProvider)).Apply(ctx, dep, step, deployment.IsDryRun) + if stepError == nil { + targetResult[step.Target] = 1 + summary.AllAssignedDeployed = plannedCount == planSuccessCount + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults}) + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, deployment.Generation, deployment.Hash, summary, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + return summary, err + } + break + } else { + targetResult[step.Target] = 0 + summary.AllAssignedDeployed = false + targetResultStatus := fmt.Sprintf("%s Failed", deploymentType) + targetResultMessage := fmt.Sprintf("An error occurred in %s, err: %s", deploymentType, stepError.Error()) + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: targetResultStatus, Message: targetResultMessage, ComponentResults: componentResults}) // TODO: this keeps only the last error on the target + + if v1alpha2.IsCanceled(stepError) { + log.ErrorfCtx(ctx, " M (Solution): reconcile canceled: %+v", stepError) break - } else { - targetResult[step.Target] = 0 - summary.AllAssignedDeployed = false - targetResultStatus := fmt.Sprintf("%s Failed", deploymentType) - targetResultMessage := fmt.Sprintf("An error occurred in %s, err: %s", deploymentType, stepError.Error()) - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: targetResultStatus, Message: targetResultMessage, ComponentResults: componentResults}) // TODO: this keeps only the last error on the target - - if v1alpha2.IsCanceled(stepError) { - log.ErrorfCtx(ctx, " M (Solution): reconcile canceled: %+v", stepError) - break - } - - time.Sleep(5 * time.Second) //TODO: make this configurable? } + + time.Sleep(5 * time.Second) //TODO: make this configurable? } } if stepError != nil { From d4d7259693c75cf510c51c9d6305e0efc65682c8 Mon Sep 17 00:00:00 2001 From: linyguo Date: Thu, 13 Mar 2025 15:29:31 +0800 Subject: [PATCH 7/7] resolve rebase error --- api/pkg/apis/v1alpha1/managers/solution/solution-manager.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index c992f87ee..9c6b5d9e9 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -434,8 +434,6 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy retryCount := 1 //TODO: set to 1 for now. Although retrying can help to handle transient errors, in more cases // an error condition can't be resolved quickly. - var stepError error - var componentResults map[string]model.ComponentResultSpec // for _, component := range step.Components { // for k, v := range component.Component.Properties {