Support suspend semantics for MPIJob #511
google-oss-prow[bot] merged 2 commits into kubeflow:master
Conversation
@alculquicondor @tenzen-y WIP but ready for early feedback (which would be good as this is my first PR in this repo). PTAL.
alculquicondor
left a comment
FYI, the common repo is on its way to disappearing. I would suggest copying the RunPolicy struct here and adding the field.
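For readers following along, a minimal sketch of what copying RunPolicy into this repo and adding the field could look like; the sibling fields are only indicated by a comment, and the exact doc text is an assumption, not the PR's final code.

```go
package v2beta1

// RunPolicy sketch: the existing fields would be copied verbatim from
// kubeflow/common; only the new field is spelled out here.
type RunPolicy struct {
	// CleanPodPolicy, TTLSecondsAfterFinished, ActiveDeadlineSeconds,
	// BackoffLimit, SchedulingPolicy: copied over unchanged.

	// Suspend specifies whether the MPIJob controller should create Pods for
	// this job, mirroring the suspend field of batch/v1 Job. Defaults to false.
	// +optional
	Suspend *bool `json:"suspend,omitempty"`
}
```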
if c.gangSchedulerName != "" {
	if err := c.deletePodGroups(mpiJob); err != nil {
		return err
	}
}
@tenzen-y are you familiar with the volcano integration?
I wonder if we need to remove the pod group on suspension. Does it matter?
Do podgroups allocate any node resources, as pods do, or are they just API objects (like services)?
I do not delete services and other API objects on suspension. I also don't delete the launcher job, just suspend it.
AFAIK, a podgroup is just a declaration that some pods should be treated as a unit. But a podgroup doesn't create other objects.
The scheduling.volcano.sh/v1beta1 PodGroup also has queueing logic, so we might need to delete the PodGroup to re-queue Pods.
Maybe, we should create E2E with the volcano in a separate PR.
probably someone with volcano experience should do it.
I looked at the volcano code a little bit on my own, but could not conclude whether it may create the pods under some scenarios.
Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT?
+1 for the idea of an e2e test employing podgroups. I guess we could create a follow-up Issue and ask for people willing to do it.
Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT?
That sounds good to me.
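A hedged sketch of the behavior agreed on above, to make it concrete; deleteWorkerPods is an illustrative helper name, while gangSchedulerName, deletePodGroups and isMPIJobSuspended come from the diff under review.

```go
// Sketch only, not the exact controller code: on suspension, delete the
// workers so no node resources stay allocated, and also delete the PodGroup
// when a gang scheduler (e.g. volcano) is configured, so the pods get
// re-queued on resume.
func (c *MPIJobController) cleanupOnSuspend(mpiJob *kubeflow.MPIJob) error {
	if !isMPIJobSuspended(mpiJob) {
		return nil
	}
	if err := c.deleteWorkerPods(mpiJob); err != nil { // illustrative helper
		return err
	}
	if c.gangSchedulerName != "" {
		return c.deletePodGroups(mpiJob)
	}
	return nil
}
```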
if err != nil {
	return err
if !isMPIJobSuspended(mpiJob) {
	worker, err = c.getOrCreateWorker(mpiJob)
also create the podgroup conditionally?
Now I skip creating all resources for as long as the MPIJob is suspended. This is the common workflow for Kueue, where the Job is created suspended. In some cases, when the job never gets unsuspended (for whatever reason), we save on creating the objects.
For now I reverted the change - sorry for the back and forth - this is just to minimize the diff, as the unit tests currently verify the objects are created. Let me know what you think.
// first set StartTime.
if mpiJob.Status.StartTime == nil {
if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
if the job was suspended, we should reset the StartTime. Double check how we do it in the job controller.
Done - I've added the JobSuspended condition and replicated the semantics around StartTime.
However, the .spec.runPolicy.activeDeadlineSeconds is actually respected for MPIJobs via batch.Job:
mpi-operator/pkg/controller/mpi_job_controller.go, line 1320 (commit 382da78)
This means that the changes aren't strictly required to enforce the timeout. WDYT?
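To make the agreed StartTime semantics concrete, a small hedged sketch; the exact placement inside syncHandler is an assumption, and the real controller may read time from an injected clock rather than metav1.Now().

```go
// Sketch: only start the clock while the MPIJob is allowed to run.
if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
	now := metav1.Now()
	mpiJob.Status.StartTime = &now
}
// On resume, StartTime gets set to "now" again, so that
// runPolicy.activeDeadlineSeconds only counts time spent running.
```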
I see, but common is reused by other subprojects too, right? So we would also need to copy the contents of common into these repos. Sounds like a lot of work - maybe simple, but the diffs will be big and one needs to be careful - so I'm not sure we want to block the suspend work on that. Also, is this effort already planned, or in progress, @alculquicondor @tenzen-y?
Yes, that's right. We are using the common repo in training-operator. However, we are planning to consolidate the common code into the training-operator repo.
@alculquicondor I agree with adding a
What about the other constants, like the ones defining conditions? I guess we could have a PR to just copy
Force-pushed from 77e56c5 to ed0bcd7
Sounds good to me, though let me know what other members think.
sgtm
Sounds good
Force-pushed from ed0bcd7 to da1e019
@tenzen-y @alculquicondor I've opened the preparatory PR here: #513. Please review.
Force-pushed from aaf12e6 to 7634c74
@terrytangyuan Please approve CI
Force-pushed from 7634c74 to fd68512
Force-pushed from 13c54ea to 4e96e9a
alculquicondor
left a comment
Can you add a unit test?
Also, I'm not convinced of the value of an E2E test over unit+integration. Do you have a particular justification?
type:
  type: string
  enum: ["Created", "Running", "Restarting", "Succeeded", "Failed"]
  enum: ["Created", "Running", "Restarting", "Succeeded", "Suspended", "Failed"]
Was this autogenerated?
If so, I'm curious to know how it worked.
We should probably use it in Kueue, where applicable
Nope, I updated it manually.
Uhm... then make sure that make generate doesn't override it.
@tenzen-y is this expected?
Or is this related to your other PR?
Yes, this is a known issue, and this will be fixed by #510.
Yeah, but it is needed for the e2e test (which I believe is worth adding).
Yeah, keep the manual change, and #510 should automatize it.
pkg/controller/mpi_job_controller.go (Outdated)
}
if isMPIJobSuspended(mpiJob) {
	// it is suspended now
	if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "MPIJobSuspended", "MPIJob suspended") {
if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, "Suspended", "MPIJob suspended") {
I don't think we need the redundancy.
Yeah, I thought so too, but this seems to be the convention here. For now I stick to the convention, but I added the reason to the list.
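The convention, sketched with a named reason constant; the constant name matches the test code quoted later in this PR, its value is inferred from the inline string in the diff above, and the placement is an assumption.

```go
// mpiJobSuspendedReason is the reason set on both the JobSuspended and the
// JobRunning conditions when an MPIJob is suspended.
const mpiJobSuspendedReason = "MPIJobSuspended"

// Sketch: mark the job as suspended and not running in one status update.
msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, msg)
updateMPIJobConditions(mpiJob, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
```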
func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
	return &batchv1.Job{
	job := &batchv1.Job{
Food for thought:
We will probably need some kind of Job annotation that tells kueue that this Job is already queued as part of a higher level object (MPIJob in this case), so that we simply ignore it.
Good point. IIUC, this affects only Kueue configurations with ManageJobsWithoutQueueName=true, which is non-default. We could have an annotation, yes, or just have Kueue not manage any Job objects that have an OwnerReference to another object managed by Kueue.
or just have Kueue not manage any Job objects that have an OwnerReference to another object managed by Kueue.
The dependency could be indirect. And we don't want to spend a GET call to obtain such information.
I see, but we should still strive to keep the integration interface as small as possible: every extension of the interface surface gets multiplied by the number of projects. But maybe a new annotation is not that bad. Also, maybe we can have a hybrid approach.
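A hedged sketch of the "skip Jobs owned by a managed parent" idea from this thread; metav1.GetControllerOf is a real apimachinery helper, but the package and function below are hypothetical and only cover the direct-ownership case, using metadata alone (no extra GET call).

```go
package kueuejob

import (
	"strings"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ownedByMPIJob reports whether a Job is controlled by an MPIJob, so a queue
// controller could skip it even with ManageJobsWithoutQueueName=true.
// Hypothetical helper, not part of Kueue or mpi-operator.
func ownedByMPIJob(job *batchv1.Job) bool {
	owner := metav1.GetControllerOf(job)
	return owner != nil &&
		owner.Kind == "MPIJob" &&
		strings.HasPrefix(owner.APIVersion, "kubeflow.org/")
}
```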
Force-pushed from 580b061 to 7357382
Done, ended up adding 3 actually: for creating a suspended MPIJob, suspending a running one, and resuming.
I can think of two reasons:
updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended")
msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJobCopy, common.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
f.expectUpdateMPIJobStatusAction(mpiJobCopy)
The code above checks that already:
mpiJobCopy.Status.ReplicaStatuses = map[common.ReplicaType]*common.ReplicaStatus{
	common.ReplicaType(kubeflow.MPIReplicaTypeLauncher): {},
	common.ReplicaType(kubeflow.MPIReplicaTypeWorker):   {},
}
It does not specify Active, meaning it checks the value is 0.
The workers were never created in this scenario, so I cannot assert on delete actions.
but you can assert that no pods were created
the status is not necessarily the same as pods being created
but if there were some actions to create a pod, and we didn't expect them, the test would fail (IIUC). For example, when I comment out expecting the status update, the test fails as follows:
1 unexpected actions: [{ActionImpl:{Namespace:default Verb:update Resource:kubeflow.org/v2beta1, Resource=mpijobs
oh I see, so it's implicitly checked
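For illustration, a hedged sketch of why unexpected creates surface with the fake clientset: every write is recorded as an action, so a helper like the hypothetical one below could flag any pod creation that was not expected. The fixture in this repo compares the full recorded action list against the expected one instead.

```go
package example

import (
	"testing"

	k8sfake "k8s.io/client-go/kubernetes/fake"
)

// assertNoPodCreates fails the test if any pod-create action was recorded by
// the fake clientset. Illustrative only; not the controller fixture used here.
func assertNoPodCreates(t *testing.T, client *k8sfake.Clientset) {
	t.Helper()
	for _, action := range client.Actions() {
		if action.GetVerb() == "create" && action.GetResource().Resource == "pods" {
			t.Errorf("unexpected pod creation action: %v", action)
		}
	}
}
```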
})
})
func resumeJob(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob {
For follow up: accept contexts in all these functions
Will open a PR right after this; it doesn't seem to require an Issue for later.
Similarly, going to open a PR to copy the MPIJob conditions from common.
/lgtm
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: terrytangyuan. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Details: Needs approval from an approver in each of these files.
Approvers can indicate their approval by writing
The other PR got merged first so this one will need to resolve conflicts :-)
# Conflicts:
#	pkg/apis/kubeflow/v2beta1/types.go
#	pkg/controller/mpi_job_controller.go
#	pkg/controller/mpi_job_controller_status.go
#	pkg/controller/mpi_job_controller_test.go
#	test/integration/mpi_job_controller_test.go
Force-pushed from 7357382 to f600aa4
- add unit tests for creating suspended, suspending and resuming
- use a fake clock for unit tests
- do not return from the syncHandler after worker pods cleanup on suspend - this allows continuing with the MPIJob update in the same sync
# Conflicts:
#	pkg/controller/mpi_job_controller.go
Force-pushed from f600aa4 to 11e368e
/lgtm
Still lgtm
Native Kubernetes Jobs have a suspend flag that allows temporarily suspending a Job's execution and resuming it later, or starting Jobs in a suspended state and having a custom controller, such as Kueue, decide later when to start them. So it is being added to the RayJob spec for consistency. Moreover, some frameworks like Kubeflow are adding it, so it is becoming standard functionality. An example implementation for MPIJob: kubeflow/mpi-operator#511
Implementation details:
- If a RayJob is created with spec.suspend == true, then the RayCluster instance (with corresponding Kubernetes resources) is not created and the Ray job is not submitted to the cluster. The JobDeploymentStatus is set to Suspended and the corresponding event is issued. The RayJob remains in this state until somebody unsuspends the job.
- If suspend flips from true to false, then the RayJob controller immediately creates a RayCluster instance and submits the job.
- If suspend flips from false to true while the Job is running, then the RayJob controller tries to gracefully stop the job and deletes the RayCluster instance (with underlying Kubernetes resources). The JobDeploymentStatus is set to Suspended; JobStatus is set to STOPPED and the corresponding event is issued.
- Edge case: the suspend flag is ignored if a RayJob is submitted against an existing RayCluster instance (matched with ClusterSelector), since we can't delete a RayCluster created by somebody else.
No Kueue-specific code leaked to the Kuberay implementation. Contributors from Kueue/Kubernetes cc'ed: @alculquicondor @mwielgus
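A hedged, self-contained Go sketch of the reconciliation described above; all types and helpers are hypothetical stand-ins, not the actual KubeRay API.

```go
package sketch

// Hypothetical, minimal types; the real KubeRay API differs.
type RayJobSpec struct{ Suspend bool }
type RayJobStatus struct{ JobDeploymentStatus, JobStatus string }
type RayJob struct {
	Spec   RayJobSpec
	Status RayJobStatus
}

// reconcileSuspend sketches the described behavior; stop, deleteCluster and
// ensure stand in for the controller's real actions.
func reconcileSuspend(rj *RayJob, stop, deleteCluster, ensure func(*RayJob) error) error {
	if rj.Spec.Suspend {
		if rj.Status.JobDeploymentStatus != "Suspended" {
			// Suspend flipped while running: stop gracefully, then delete the cluster.
			if err := stop(rj); err != nil {
				return err
			}
			if err := deleteCluster(rj); err != nil {
				return err
			}
			rj.Status.JobDeploymentStatus = "Suspended"
			rj.Status.JobStatus = "STOPPED"
		}
		// Stay suspended until somebody flips the flag back.
		return nil
	}
	// Not suspended: make sure the RayCluster exists and the job is submitted.
	return ensure(rj)
}
```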
It solves: #504