Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
0207ca7
add cancel context for solution manager
linyguo Oct 31, 2024
5ee0e1c
update reconcile cancel process
linyguo Nov 1, 2024
a66f824
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Nov 1, 2024
6f0203f
update solution manager
linyguo Nov 8, 2024
210d6ab
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Nov 8, 2024
16d484b
Merge branch 'main' into users/lingyun/cancel
linyguo Nov 12, 2024
94a37ce
update comments
linyguo Nov 15, 2024
9353be9
update job list
linyguo Dec 6, 2024
3a47b06
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Dec 6, 2024
33b48c5
Merge branch 'main' into users/lingyun/cancel
linyguo Dec 10, 2024
9ab080d
remove component-wise cancel
linyguo Dec 13, 2024
4267300
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Dec 17, 2024
707c48a
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Dec 24, 2024
ec37e4c
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Feb 24, 2025
c37437a
Merge remote-tracking branch 'upstream/main' into users/lingyun/cancel
linyguo Mar 13, 2025
d4d7259
resolve rebase error
linyguo Mar 13, 2025
829bfef
Merge branch 'main' into users/lingyun/cancel
coderdjw Mar 14, 2025
60305b5
Merge branch 'main' into users/lingyun/cancel
coderdjw Mar 14, 2025
7142720
Merge branch 'main' into users/lingyun/cancel
coderdjw Mar 17, 2025
e31a48a
Merge branch 'main' into users/lingyun/cancel
coderdjw Mar 18, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
359 changes: 235 additions & 124 deletions api/pkg/apis/v1alpha1/managers/solution/solution-manager.go

Large diffs are not rendered by default.

23 changes: 23 additions & 0 deletions api/pkg/apis/v1alpha1/utils/apiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ type (
Dispatcher interface {
QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error
QueueDeploymentJob(ctx context.Context, namespace string, isDelete bool, deployment model.DeploymentSpec, user string, password string) error
CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error
}

ApiClient interface {
Expand Down Expand Up @@ -537,6 +538,28 @@ func (a *apiClient) QueueDeploymentJob(ctx context.Context, namespace string, is
return nil
}

func (a *apiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error {
// func (a *apiClient) CancelDeploymentJob(ctx context.Context, namespace string, deployment model.DeploymentSpec) error {
token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password)
if err != nil {
return err
}

path := "solution/cancel"
query := url.Values{
"namespace": []string{namespace},
"instance": []string{id},
"jobid": []string{jobId},
}

log.DebugfCtx(ctx, "apiClient.CancelDeploymentJob: Deployment id: %s, namespace: %v", id, namespace)
_, err = a.callRestAPI(ctx, fmt.Sprintf("%s?%s", path, query.Encode()), "POST", nil, token)
if err != nil {
return err
}
return nil
}

// Deprecated: Use QueueDeploymentJob instead
func (a *apiClient) QueueJob(ctx context.Context, id string, namespace string, isDelete bool, isTarget bool, user string, password string) error {
token, err := a.tokenProvider(ctx, a.baseUrl, a.client, user, password)
Expand Down
6 changes: 1 addition & 5 deletions api/pkg/apis/v1alpha1/vendors/job-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,7 @@ func (e *JobVendor) Init(config vendors.VendorConfig, factories []managers.IMana
},
})

if err != nil {
return err
}

return nil
return err
}

func (o *JobVendor) GetEndpoints() []v1alpha2.Endpoint {
Expand Down
34 changes: 33 additions & 1 deletion api/pkg/apis/v1alpha1/vendors/solution-vendor.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint {
Parameters: []string{"delete?"},
Handler: o.onReconcile,
},
{
Methods: []string{fasthttp.MethodPost},
Route: route + "/cancel",
Version: o.Version,
Handler: o.onCancel,
},
{
Methods: []string{fasthttp.MethodGet, fasthttp.MethodPost, fasthttp.MethodDelete},
Route: route + "/queue",
Expand All @@ -82,6 +88,7 @@ func (o *SolutionVendor) GetEndpoints() []v1alpha2.Endpoint {
},
}
}

func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COAResponse {
rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{
"method": "onQueue",
Expand Down Expand Up @@ -161,6 +168,10 @@ func (c *SolutionVendor) onQueue(request v1alpha2.COARequest) v1alpha2.COARespon
})
}
instance = deployment.Instance.ObjectMeta.Name

if delete == "true" {
c.SolutionManager.CancelPreviousJobs(ctx, namespace, instance, deployment.JobID)
}
}

if instance == "" {
Expand Down Expand Up @@ -258,7 +269,8 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe
targetName = v
}
}
summary, err := c.SolutionManager.Reconcile(ctx, deployment, delete == "true", namespace, targetName)

summary, err := c.SolutionManager.ReconcileWithCancelWrapper(ctx, deployment, delete == "true", namespace, targetName)
data, _ := json.Marshal(summary)
if err != nil {
sLog.ErrorfCtx(ctx, "V (Solution): onReconcile failed POST - reconcile %s", err.Error())
Expand All @@ -281,6 +293,26 @@ func (c *SolutionVendor) onReconcile(request v1alpha2.COARequest) v1alpha2.COARe
})
}

func (c *SolutionVendor) onCancel(request v1alpha2.COARequest) v1alpha2.COAResponse {
rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{
"method": "onCancel",
})
defer span.End()

namespace := request.Parameters["namespace"]
instance := request.Parameters["instance"]
jobId := request.Parameters["jobId"]

sLog.InfofCtx(rContext, "V (Solution): onCancel instance: %s job ID: %s", instance, jobId)
c.SolutionManager.CancelPreviousJobs(rContext, namespace, instance, jobId)

return observ_utils.CloseSpanWithCOAResponse(span, v1alpha2.COAResponse{
State: v1alpha2.OK,
Body: []byte("{\"result\":\"200 - OK\"}"),
ContentType: "application/json",
})
}

func (c *SolutionVendor) onApplyDeployment(request v1alpha2.COARequest) v1alpha2.COAResponse {
rContext, span := observability.StartSpan("Solution Vendor", request.Context, &map[string]string{
"method": "onApplyDeployment",
Expand Down
2 changes: 1 addition & 1 deletion api/pkg/apis/v1alpha1/vendors/solution-vendor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,7 @@ func TestSolutionEndpoints(t *testing.T) {
vendor := createSolutionVendor()
vendor.Route = "solution"
endpoints := vendor.GetEndpoints()
assert.Equal(t, 3, len(endpoints))
assert.Equal(t, 4, len(endpoints))
}

func TestSolutionInfo(t *testing.T) {
Expand Down
9 changes: 9 additions & 0 deletions coa/pkg/apis/v1alpha2/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,15 @@ func IsNotFound(err error) bool {
}
return coaE.State == NotFound
}

func IsCanceled(err error) bool {
coaE, ok := err.(COAError)
if !ok {
return false
}
return coaE.State == Canceled
}

func IsDelayed(err error) bool {
coaE, ok := err.(COAError)
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions coa/pkg/apis/v1alpha2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ const (
ValidateFailed State = 8003
Updated State = 8004
Deleted State = 8005
Canceled State = 8006
// Workflow status
Running State = 9994
Paused State = 9995
Expand Down Expand Up @@ -222,6 +223,8 @@ func (s State) String() string {
return "Updated"
case Deleted:
return "Deleted"
case Canceled:
return "Canceled"
case Running:
return "Running"
case Paused:
Expand Down
6 changes: 6 additions & 0 deletions k8s/testing/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,12 @@ func (c *MockApiClient) QueueDeploymentJob(ctx context.Context, namespace string
return args.Error(0)
}

// CancelDeploymentJob implements utils.ApiClient.
func (c *MockApiClient) CancelDeploymentJob(ctx context.Context, id string, jobId string, namespace string, user string, password string) error {
args := c.Called(ctx, namespace, id, jobId, namespace)
return args.Error(0)
}

// QueueJob implements ApiClient.
// Deprecated and not used.
func (c *MockApiClient) QueueJob(ctx context.Context, id string, scope string, isDelete bool, isTarget bool, user string, password string) error {
Expand Down
Loading