diff --git a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go index c82f850cd..9c6b5d9e9 100644 --- a/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go +++ b/api/pkg/apis/v1alpha1/managers/solution/solution-manager.go @@ -15,6 +15,7 @@ import ( "runtime/debug" "strconv" "strings" + "sync" "time" "github.com/eclipse-symphony/symphony/api/pkg/apis/v1alpha1/managers/solution/metrics" @@ -36,6 +37,7 @@ import ( var ( log = logger.NewLogger("coa.runtime") + jobListLock sync.RWMutex apiOperationMetrics *metrics.Metrics ) @@ -52,8 +54,15 @@ const ( Summary = "Summary" DeploymentState = "DeployState" + + defaultTimeout = 60 * time.Minute ) +type JobIdentifier struct { + Namespace string + Name string +} + type SolutionManager struct { SummaryManager TargetProviders map[string]tgt.ITargetProvider @@ -63,6 +72,7 @@ type SolutionManager struct { IsTarget bool TargetNames []string ApiClientHttp api_utils.ApiClient + jobList map[JobIdentifier]map[string]context.CancelFunc } type SolutionManagerDeploymentState struct { @@ -138,9 +148,15 @@ func (s *SolutionManager) Init(context *contexts.VendorContext, config managers. if err != nil { return err } + + s.initCancelMap() return nil } +func (s *SolutionManager) initCancelMap() { + s.jobList = make(map[JobIdentifier]map[string]context.CancelFunc) +} + func (s *SolutionManager) GetSummary(ctx context.Context, summaryId string, name string, namespace string) (model.SummaryResult, error) { return s.SummaryManager.GetSummary(ctx, fmt.Sprintf("%s-%s", "summary", summaryId), name, namespace) } @@ -350,147 +366,162 @@ func (s *SolutionManager) Reconcile(ctx context.Context, deployment model.Deploy plannedCount := 0 planSuccessCount := 0 + log.DebugfCtx(ctx, " M (Solution): count of plan.Steps: %v", len(plan.Steps)) for _, step := range plan.Steps { - log.DebugfCtx(ctx, " M (Solution): processing step with Role %s on target %s", step.Role, step.Target) - for _, component := range step.Components { - log.DebugfCtx(ctx, " M (Solution): processing component %s with action %s", component.Component.Name, component.Action) - } - if s.IsTarget && !api_utils.ContainsString(s.TargetNames, step.Target) { - continue - } + select { + case <-ctx.Done(): + // Context canceled or timed out + log.DebugCtx(ctx, " M (Solution): reconcile canceled") + err = v1alpha2.NewCOAError(nil, "Reconciliation was canceled.", v1alpha2.Canceled) + return summary, err + default: + log.DebugfCtx(ctx, " M (Solution): processing step with Role %s on target %s", step.Role, step.Target) + for _, component := range step.Components { + log.DebugfCtx(ctx, " M (Solution): processing component %s with action %s", component.Component.Name, component.Action) + } + if s.IsTarget && !api_utils.ContainsString(s.TargetNames, step.Target) { + continue + } - if targetName != "" && targetName != step.Target { - continue - } + if targetName != "" && targetName != step.Target { + continue + } - plannedCount++ + plannedCount++ - dep.ActiveTarget = step.Target - agent := findAgentFromDeploymentState(mergedState, step.Target) - if agent != "" { - col[ENV_NAME] = agent - } else { - delete(col, ENV_NAME) - } - var override tgt.ITargetProvider - role := step.Role - if role == "container" { - role = "instance" - } - if v, ok := s.TargetProviders[role]; ok { - override = v - } - var provider providers.IProvider - if override == nil { - targetSpec := s.getTargetStateForStep(step, deployment, previousDesiredState) - provider, err = sp.CreateProviderForTargetRole(s.Context, step.Role, targetSpec, override) - if err != nil { - summary.SummaryMessage = "failed to create provider:" + err.Error() - log.ErrorfCtx(ctx, " M (Solution): failed to create provider: %+v", err) - return summary, err + dep.ActiveTarget = step.Target + agent := findAgentFromDeploymentState(mergedState, step.Target) + if agent != "" { + col[ENV_NAME] = agent + } else { + delete(col, ENV_NAME) } - } else { - provider = override - } - var stepError error - var componentResults = make(map[string]model.ComponentResultSpec) - if previousDesiredState != nil { - testState := MergeDeploymentStates(&previousDesiredState.State, currentState) - if s.canSkipStep(ctx, step, step.Target, provider.(tgt.ITargetProvider), previousDesiredState.State.Components, testState) { - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults}) - log.InfofCtx(ctx, " M (Solution): skipping step with role %s on target %s", step.Role, step.Target) - targetResult[step.Target] = 1 - planSuccessCount++ - summary.CurrentDeployed += len(step.Components) - continue + var override tgt.ITargetProvider + role := step.Role + if role == "container" { + role = "instance" } - } - log.DebugfCtx(ctx, " M (Solution): applying step with Role %s on target %s", step.Role, step.Target) - someStepsRan = true - retryCount := 1 - //TODO: set to 1 for now. Although retrying can help to handle transient errors, in more cases - // an error condition can't be resolved quickly. - - // for _, component := range step.Components { - // for k, v := range component.Component.Properties { - // if strV, ok := v.(string); ok { - // parser := api_utils.NewParser(strV) - // eCtx := s.VendorContext.EvaluationContext.Clone() - // eCtx.DeploymentSpec = deployment - // eCtx.Component = component.Component.Name - // val, err := parser.Eval(*eCtx) - // if err == nil { - // component.Component.Properties[k] = val - // } else { - // log.ErrorfCtx(ctx, " M (Solution): failed to evaluate property: %+v", err) - // summary.SummaryMessage = fmt.Sprintf("failed to evaluate property '%s' on component '%s: %s", k, component.Component.Name, err.Error()) - // s.saveSummary(ctx, deployment, summary) - // return summary, err - // } - // } - // } - // } - - defaultScope := deployment.Instance.Spec.Scope - // ensure to restore the original scope defined in instance in case the scope is changed during the deployment - defer func() { - deployment.Instance.Spec.Scope = defaultScope - }() - for i := 0; i < retryCount; i++ { - deployment.Instance.Spec.Scope = getCurrentApplicationScope(ctx, deployment.Instance, deployment.Targets[step.Target]) - componentResults, stepError = (provider.(tgt.ITargetProvider)).Apply(ctx, dep, step, deployment.IsDryRun) - if stepError == nil { - targetResult[step.Target] = 1 - summary.AllAssignedDeployed = plannedCount == planSuccessCount - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults}) - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, summaryId, deployment.Generation, deployment.Hash, summary, namespace) + if v, ok := s.TargetProviders[role]; ok { + override = v + } + var provider providers.IProvider + if override == nil { + targetSpec := s.getTargetStateForStep(step, deployment, previousDesiredState) + provider, err = sp.CreateProviderForTargetRole(s.Context, step.Role, targetSpec, override) if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + summary.SummaryMessage = "failed to create provider:" + err.Error() + log.ErrorfCtx(ctx, " M (Solution): failed to create provider: %+v", err) return summary, err } - break } else { - targetResult[step.Target] = 0 - summary.AllAssignedDeployed = false - targetResultStatus := fmt.Sprintf("%s Failed", deploymentType) - targetResultMessage := fmt.Sprintf("An error occurred in %s, err: %s", deploymentType, stepError.Error()) - summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: targetResultStatus, Message: targetResultMessage, ComponentResults: componentResults}) // TODO: this keeps only the last error on the target - time.Sleep(5 * time.Second) //TODO: make this configurable? + provider = override } - deployment.Instance.Spec.Scope = defaultScope - } - if stepError != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to execute deployment step: %+v", stepError) + var stepError error + var componentResults = make(map[string]model.ComponentResultSpec) + if previousDesiredState != nil { + testState := MergeDeploymentStates(&previousDesiredState.State, currentState) + if s.canSkipStep(ctx, step, step.Target, provider.(tgt.ITargetProvider), previousDesiredState.State.Components, testState) { + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults}) + log.InfofCtx(ctx, " M (Solution): skipping step with role %s on target %s", step.Role, step.Target) + targetResult[step.Target] = 1 + planSuccessCount++ + summary.CurrentDeployed += len(step.Components) + continue + } + } + log.DebugfCtx(ctx, " M (Solution): applying step with Role %s on target %s", step.Role, step.Target) + someStepsRan = true + retryCount := 1 + //TODO: set to 1 for now. Although retrying can help to handle transient errors, in more cases + // an error condition can't be resolved quickly. + + // for _, component := range step.Components { + // for k, v := range component.Component.Properties { + // if strV, ok := v.(string); ok { + // parser := api_utils.NewParser(strV) + // eCtx := s.VendorContext.EvaluationContext.Clone() + // eCtx.DeploymentSpec = deployment + // eCtx.Component = component.Component.Name + // val, err := parser.Eval(*eCtx) + // if err == nil { + // component.Component.Properties[k] = val + // } else { + // log.ErrorfCtx(ctx, " M (Solution): failed to evaluate property: %+v", err) + // summary.SummaryMessage = fmt.Sprintf("failed to evaluate property '%s' on component '%s: %s", k, component.Component.Name, err.Error()) + // s.saveSummary(ctx, deployment, summary) + // return summary, err + // } + // } + // } + // } + + defaultScope := deployment.Instance.Spec.Scope + // ensure to restore the original scope defined in instance in case the scope is changed during the deployment + defer func() { + deployment.Instance.Spec.Scope = defaultScope + }() + for i := 0; i < retryCount; i++ { + deployment.Instance.Spec.Scope = getCurrentApplicationScope(ctx, deployment.Instance, deployment.Targets[step.Target]) + componentResults, stepError = (provider.(tgt.ITargetProvider)).Apply(ctx, dep, step, deployment.IsDryRun) + if stepError == nil { + targetResult[step.Target] = 1 + summary.AllAssignedDeployed = plannedCount == planSuccessCount + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: "OK", Message: "", ComponentResults: componentResults}) + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, summaryId, deployment.Generation, deployment.Hash, summary, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + return summary, err + } + break + } else { + targetResult[step.Target] = 0 + summary.AllAssignedDeployed = false + targetResultStatus := fmt.Sprintf("%s Failed", deploymentType) + targetResultMessage := fmt.Sprintf("An error occurred in %s, err: %s", deploymentType, stepError.Error()) + summary.UpdateTargetResult(step.Target, model.TargetResultSpec{Status: targetResultStatus, Message: targetResultMessage, ComponentResults: componentResults}) // TODO: this keeps only the last error on the target + + if v1alpha2.IsCanceled(stepError) { + log.ErrorfCtx(ctx, " M (Solution): reconcile canceled: %+v", stepError) + break + } - successCount := 0 - for _, v := range targetResult { - successCount += v + time.Sleep(5 * time.Second) //TODO: make this configurable? + } + deployment.Instance.Spec.Scope = defaultScope } - deployedCount := 0 - for _, ret := range componentResults { - if (!remove && ret.Status == v1alpha2.Updated) || (remove && ret.Status == v1alpha2.Deleted) { - // TODO: need to ensure the status updated correctly on returning from target providers. - deployedCount += 1 + if stepError != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to execute deployment step: %+v", stepError) + + successCount := 0 + for _, v := range targetResult { + successCount += v + } + deployedCount := 0 + for _, ret := range componentResults { + if (!remove && ret.Status == v1alpha2.Updated) || (remove && ret.Status == v1alpha2.Deleted) { + // TODO: need to ensure the status updated correctly on returning from target providers. + deployedCount += 1 + } } + summary.CurrentDeployed += deployedCount + if deployment.IsDryRun || deployment.IsInActive { + summary.SuccessCount = 0 + } else { + summary.SuccessCount = successCount + } + summary.AllAssignedDeployed = plannedCount == planSuccessCount + err = stepError + return summary, err } - summary.CurrentDeployed += deployedCount - if deployment.IsDryRun || deployment.IsInActive { - summary.SuccessCount = 0 - } else { - summary.SuccessCount = successCount + planSuccessCount++ + summary.CurrentDeployed += len(step.Components) + err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, summaryId, deployment.Generation, deployment.Hash, summary, namespace) + if err != nil { + log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) + return summary, err } - summary.AllAssignedDeployed = plannedCount == planSuccessCount - err = stepError - return summary, err - } - planSuccessCount++ - summary.CurrentDeployed += len(step.Components) - err = s.saveSummaryProgress(ctx, deployment.Instance.ObjectMeta.Name, summaryId, deployment.Generation, deployment.Hash, summary, namespace) - if err != nil { - log.ErrorfCtx(ctx, " M (Solution): failed to save summary progress: %+v", err) - return summary, err + log.DebugfCtx(ctx, " M (Solution): reconcile save summary progress: current deployed %v out of total %v deployments", summary.CurrentDeployed, summary.PlannedDeployment) } - log.DebugfCtx(ctx, " M (Solution): reconcile save summary progress: current deployed %v out of total %v deployments", summary.CurrentDeployed, summary.PlannedDeployment) } mergedState.ClearAllRemoved() @@ -751,6 +782,86 @@ func getCurrentApplicationScope(ctx context.Context, instance model.InstanceStat return instance.Spec.Scope } +func (s *SolutionManager) ReconcileWithCancelWrapper(ctx context.Context, deployment model.DeploymentSpec, remove bool, namespace string, targetName string) (model.SummarySpec, error) { + instance := deployment.Instance.ObjectMeta.Name + log.InfofCtx(ctx, " M (Solution): onReconcile create context with timeout, instance: %s, job id: %s, isRemove: %v", instance, deployment.JobID, remove) + + // Two conditions when the context will be canceled: + // 1. default timeout reached; + // 2. cancel() is called + cancelCtx, cancel := context.WithTimeout(ctx, defaultTimeout) + if !remove { + // Track the CancelFunc for ongoing reconcile job + s.addCancelFunc(ctx, namespace, instance, deployment.JobID, cancel) + } + + defer func() { + // call cancel to release resource and clean up the cancelFunc in jobList + log.InfofCtx(ctx, " M (Solution): onReconcile complete, namespace: %s, instance: %s, job id: %s, isRemove: %v", namespace, instance, deployment.JobID, remove) + cancel() + if !remove { + s.cleanUpCancelFunc(namespace, instance, deployment.JobID) + } + }() + + summary, err := s.Reconcile(cancelCtx, deployment, remove, namespace, targetName) + return summary, err +} + +func (s *SolutionManager) addCancelFunc(ctx context.Context, namespace string, instance string, jobID string, cancel context.CancelFunc) { + jobListLock.Lock() + defer jobListLock.Unlock() + log.InfofCtx(ctx, " M (Solution): AddCancelFunc, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) + + jobKey := JobIdentifier{Namespace: namespace, Name: instance} + if _, exists := s.jobList[jobKey]; !exists { + s.jobList[jobKey] = make(map[string]context.CancelFunc) + } + + s.jobList[jobKey][jobID] = cancel +} + +func (s *SolutionManager) cleanUpCancelFunc(namespace string, instance string, jobID string) { + jobListLock.Lock() + defer jobListLock.Unlock() + + jobKey := JobIdentifier{Namespace: namespace, Name: instance} + if _, exists := s.jobList[jobKey]; exists { + delete(s.jobList[jobKey], jobID) + } + + if len(s.jobList[jobKey]) == 0 { + delete(s.jobList, jobKey) + } +} + +func (s *SolutionManager) CancelPreviousJobs(ctx context.Context, namespace string, instance string, jobID string) error { + jobListLock.RLock() + defer jobListLock.RUnlock() + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, namespace: %s, instance: %s, job id: %s", namespace, instance, jobID) + + jobKey := JobIdentifier{Namespace: namespace, Name: instance} + for jid, cancelJob := range s.jobList[jobKey] { + if convertJobIdToInt(jid) < convertJobIdToInt(jobID) { + // only cancel jobs prior to the delete job + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, found previous job id: %s", jid) + if cancelJob != nil { + cancelJob() + log.InfofCtx(ctx, " M (Solution): CancelPreviousJobs, cancelled job id: %s", jid) + } + } + } + return nil +} + +func convertJobIdToInt(jobID string) int { + num, err := strconv.Atoi(jobID) + if err != nil { + return 0 + } + return num +} + func findAgentFromDeploymentState(state model.DeploymentState, targetName string) string { for _, targetDes := range state.Targets { if targetName == targetDes.Name { diff --git a/api/pkg/apis/v1alpha1/utils/apiclient.go b/api/pkg/apis/v1alpha1/utils/apiclient.go index 180d78da7..eeca67f57 100644 --- a/api/pkg/apis/v1alpha1/utils/apiclient.go +++ b/api/pkg/apis/v1alpha1/utils/apiclient.go @@ -50,6 +50,7 @@ type ( Dispatcher interface { QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error QueueDeploymentJob(ctx context.Context, namespace string, isDelete bool, deployment model.DeploymentSpec, user string, password string) error + CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error } ApiClient interface { @@ -537,6 +538,28 @@ func (a *apiClient) QueueDeploymentJob(ctx context.Context, namespace string, is return nil } +func (a *apiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error { + // func (a *apiClient) CancelDeploymentJob(ctx context.Context, namespace string, deployment model.DeploymentSpec) error { + token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password) + if err != nil { + return err + } + + path := "solution/cancel" + query := url.Values{ + "namespace": []string{namespace}, + "instance": []string{id}, + "jobid": []string{jobId}, + } + + log.DebugfCtx(ctx, "apiClient.CancelDeploymentJob: Deployment id: %s, namespace: %v", id, namespace) + _, err = a.callRestAPI(ctx, fmt.Sprintf("%s?%s", path, query.Encode()), "POST", nil, token) + if err != nil { + return err + } + return nil +} + // Deprecated: Use QueueDeploymentJob instead func (a *apiClient) QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error { token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password) diff --git a/api/pkg/apis/v1alpha1/vendors/job-vendor.go b/api/pkg/apis/v1alpha1/vendors/job-vendor.go index b614556a3..e69adb929 100644 --- a/api/pkg/apis/v1alpha1/vendors/job-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/job-vendor.go @@ -97,11 +97,7 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana }, }) - if err != nil { - return err - } - - return nil + return err } func (o *JobVendor) GetEndpoints() []v1alpha2.Endpoint { diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go index 2a8a49bb2..9ce7eda2a 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor.go @@ -74,6 +74,12 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint { Parameters: []string{"delete?"}, Handler: o.onReconcile, }, + { + Methods: []string{fasthttp.MethodPost}, + Route: route + "/cancel", + Version: o.Version, + Handler: o.onCancel, + }, { Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodDelete}, Route: route + "/queue", @@ -82,6 +88,7 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint { }, } } + func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onQueue", @@ -161,6 +168,10 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon }) } instance = deployment.Instance.ObjectMeta.Name + + if delete == "true" { + c.SolutionManager.CancelPreviousJobs(ctx, namespace, instance, deployment.JobID) + } } if instance == "" { @@ -258,7 +269,8 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe targetName = v } } - summary, err := c.SolutionManager.Reconcile(ctx, deployment, delete == "true", namespace, targetName) + + summary, err := c.SolutionManager.ReconcileWithCancelWrapper(ctx, deployment, delete == "true", namespace, targetName) data, _ := json.Marshal(summary) if err != nil { sLog.ErrorfCtx(ctx, "V (Solution): onReconcile failed POST - reconcile %s", err.Error()) @@ -281,6 +293,26 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe }) } +func (c *SolutionVendor) onCancel(request v1alpha2.COARequest) v1alpha2.COAResponse { + rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ + "method": "onCancel", + }) + defer span.End() + + namespace := request.Parameters["namespace"] + instance := request.Parameters["instance"] + jobId := request.Parameters["jobId"] + + sLog.InfofCtx(rContext, "V (Solution): onCancel instance: %s job ID: %s", instance, jobId) + c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, jobId) + + return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{ + State: v1alpha2.OK, + Body: []byte("{\"result\":\"200 - OK\"}"), + ContentType: "application/json", + }) +} + func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2.COAResponse { rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{ "method": "onApplyDeployment", diff --git a/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go b/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go index 8a6779587..e0253d1fa 100644 --- a/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go +++ b/api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go @@ -189,7 +189,7 @@ func TestSolutionEndpoints(t *testing.T) { vendor := createSolutionVendor() vendor.Route = "solution" endpoints := vendor.GetEndpoints() - assert.Equal(t, 3, len(endpoints)) + assert.Equal(t, 4, len(endpoints)) } func TestSolutionInfo(t *testing.T) { diff --git a/coa/pkg/apis/v1alpha2/errors.go b/coa/pkg/apis/v1alpha2/errors.go index fe21ca5c8..a36c2c367 100644 --- a/coa/pkg/apis/v1alpha2/errors.go +++ b/coa/pkg/apis/v1alpha2/errors.go @@ -122,6 +122,15 @@ func IsNotFound(err error) bool { } return coaE.State == NotFound } + +func IsCanceled(err error) bool { + coaE, ok := err.(COAError) + if !ok { + return false + } + return coaE.State == Canceled +} + func IsDelayed(err error) bool { coaE, ok := err.(COAError) if !ok { diff --git a/coa/pkg/apis/v1alpha2/types.go b/coa/pkg/apis/v1alpha2/types.go index 451ddd1f9..afbd8f351 100644 --- a/coa/pkg/apis/v1alpha2/types.go +++ b/coa/pkg/apis/v1alpha2/types.go @@ -55,6 +55,7 @@ const ( ValidateFailed State = 8003 Updated State = 8004 Deleted State = 8005 + Canceled State = 8006 // Workflow status Running State = 9994 Paused State = 9995 @@ -222,6 +223,8 @@ func (s State) String() string { return "Updated" case Deleted: return "Deleted" + case Canceled: + return "Canceled" case Running: return "Running" case Paused: diff --git a/k8s/testing/mocks.go b/k8s/testing/mocks.go index 819b6fa98..580c0241d 100644 --- a/k8s/testing/mocks.go +++ b/k8s/testing/mocks.go @@ -307,6 +307,12 @@ func (c *MockApiClient) QueueDeploymentJob(ctx context.Context, namespace string return args.Error(0) } +// CancelDeploymentJob implements utils.ApiClient. +func (c *MockApiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error { + args := c.Called(ctx, namespace, id, jobId, namespace) + return args.Error(0) +} + // QueueJob implements ApiClient. // Deprecated and not used. func (c *MockApiClient) QueueJob(ctx context.Context, id string, scope string, isDelete bool, isTarget bool, user string, password string) error {