Support suspend semantics for MPIJob #511
Changes from all commits
Some generated files are not rendered by default.
@@ -33,6 +33,7 @@ import (
    "golang.org/x/crypto/ssh"
    batchv1 "k8s.io/api/batch/v1"
    corev1 "k8s.io/api/core/v1"
    v1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/equality"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

@@ -50,6 +51,8 @@ import (
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog"
    "k8s.io/utils/clock"
    "k8s.io/utils/pointer"
    podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
    volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
    podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"

@@ -243,6 +246,9 @@ type MPIJobController struct {

    // To allow injection of updateStatus for testing.
    updateStatusHandler func(mpijob *kubeflow.MPIJob) error

    // Clock for internal use of unit-testing
    clock clock.WithTicker
}

// NewMPIJobController returns a new MPIJob controller.

@@ -258,6 +264,26 @@ func NewMPIJobController(
    podgroupsInformer podgroupsinformer.PodGroupInformer,
    mpiJobInformer informers.MPIJobInformer,
    gangSchedulerName string) *MPIJobController {
    return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
        configMapInformer, secretInformer, serviceInformer, jobInformer,
        podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
        &clock.RealClock{})
}

// NewMPIJobControllerWithClock returns a new MPIJob controller.
func NewMPIJobControllerWithClock(
    kubeClient kubernetes.Interface,
    kubeflowClient clientset.Interface,
    volcanoClientSet volcanoclient.Interface,
    configMapInformer coreinformers.ConfigMapInformer,
    secretInformer coreinformers.SecretInformer,
    serviceInformer coreinformers.ServiceInformer,
    jobInformer batchinformers.JobInformer,
    podInformer coreinformers.PodInformer,
    podgroupsInformer podgroupsinformer.PodGroupInformer,
    mpiJobInformer informers.MPIJobInformer,
    gangSchedulerName string,
    clock clock.WithTicker) *MPIJobController {

    // Create event broadcaster.
    klog.V(4).Info("Creating event broadcaster")

@@ -294,6 +320,7 @@ func NewMPIJobController(
        queue:             workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
        recorder:          recorder,
        gangSchedulerName: gangSchedulerName,
        clock:             clock,
    }

    controller.updateStatusHandler = controller.doUpdateJobStatus

@@ -449,9 +476,9 @@ func (c *MPIJobController) processNextWorkItem() bool {
// converge the two. It then updates the Status block of the MPIJob resource
// with the current status of the resource.
func (c *MPIJobController) syncHandler(key string) error {
    startTime := time.Now()
    startTime := c.clock.Now()
    defer func() {
        klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
        klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime))
    }()

    // Convert the namespace/name string into a distinct namespace and name.

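The clock injected above is what lets unit tests control time-dependent behavior (for example, the StartTime reset on resume shown further down). A minimal sketch of the idea, assuming the fake clock from k8s.io/utils/clock/testing; wiring it into NewMPIJobControllerWithClock together with fake clients and informers is omitted here:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/utils/clock"
	testingclock "k8s.io/utils/clock/testing"
)

func main() {
	// A fake clock frozen at a fixed instant. *FakeClock satisfies
	// clock.WithTicker, so a test can pass it as the last argument of
	// NewMPIJobControllerWithClock, while production code keeps using
	// &clock.RealClock{} through NewMPIJobController.
	fakeClock := testingclock.NewFakeClock(time.Date(2023, 3, 1, 0, 0, 0, 0, time.UTC))
	var _ clock.WithTicker = fakeClock

	start := fakeClock.Now()
	fakeClock.Step(30 * time.Second)    // advance time without sleeping
	fmt.Println(fakeClock.Since(start)) // prints 30s
}
```
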
@@ -493,7 +520,7 @@ func (c *MPIJobController) syncHandler(key string) error {

    if len(mpiJob.Status.Conditions) == 0 {
        msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
        updateMPIJobConditions(mpiJob, kubeflow.JobCreated, mpiJobCreatedReason, msg)
        updateMPIJobConditions(mpiJob, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
        c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg)
        mpiJobsCreatedCount.Inc()
    }

@@ -503,24 +530,16 @@ func (c *MPIJobController) syncHandler(key string) error {
    // cleanup and stop retrying the MPIJob.
    if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
        if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
            // set worker StatefulSet Replicas to 0.
            if err := c.deleteWorkerPods(mpiJob); err != nil {
            if err := cleanUpWorkerPods(mpiJob, c); err != nil {
                return err
            }
            initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
            if c.gangSchedulerName != "" {
                if err := c.deletePodGroups(mpiJob); err != nil {
                    return err
                }
            }
            mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0
            return c.updateStatusHandler(mpiJob)
        }
        return nil
    }

    // first set StartTime.
    if mpiJob.Status.StartTime == nil {
    if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {

Contributor: if the job was suspended, we should reset the StartTime. Double check how we do it in the job controller.

Contributor (author): Done - I've added the JobSuspended condition and replicated the semantics around StartTime. However, see mpi-operator/pkg/controller/mpi_job_controller.go line 1320 in 382da78. This means that the changes aren't strictly required to enforce the timeout. WDYT?

Contributor: Yay for using Job!
        now := metav1.Now()
        mpiJob.Status.StartTime = &now
    }

@@ -549,17 +568,18 @@ func (c *MPIJobController) syncHandler(key string) error {
        return fmt.Errorf("creating SSH auth secret: %w", err)
    }

    // Get the PodGroup for this MPIJob
    if c.gangSchedulerName != "" {
        if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
    if !isMPIJobSuspended(mpiJob) {
        // Get the PodGroup for this MPIJob
        if c.gangSchedulerName != "" {
            if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
                return err
            }
        }
        worker, err = c.getOrCreateWorker(mpiJob)

Contributor: also create the podgroup conditionally?

Contributor (author): Now I skip creating the resources for as long as the MPIJob is suspended. This is the common workflow for Kueue, where the Job is created suspended. In some cases, when the job never gets unsuspended (for whatever reason), we can save on creating the objects.

Contributor (author): For now I reverted the change - sorry for the back and forth - this is just to minimize the diff, as the unit tests currently verify that the objects are created. Let me know what you think.
        if err != nil {
            return err
        }
    }

    worker, err = c.getOrCreateWorker(mpiJob)
    if err != nil {
        return err
    }
    if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
        // The Intel implementation requires workers to communicate with the
        // launcher through its hostname. For that, we create a Service which

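For context on the review thread above: in the Kueue workflow it mentions, the MPIJob arrives with suspend already set, and the controller only creates the PodGroup and worker pods once the job is unsuspended. A rough sketch of such an object from the client side, assuming the v2beta1 API package path used by this repository (which may differ between releases); replica specs and other required fields are omitted:

```go
package main

import (
	"fmt"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"
)

// newSuspendedMPIJob builds an MPIJob that starts out suspended, the way a
// queueing controller such as Kueue would create it. Only the fields relevant
// to suspension are filled in; this is a sketch, not a complete spec.
func newSuspendedMPIJob(namespace, name string) *kubeflow.MPIJob {
	return &kubeflow.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Namespace: namespace, Name: name},
		Spec: kubeflow.MPIJobSpec{
			RunPolicy: kubeflow.RunPolicy{
				// While true, the controller skips PodGroup/worker creation
				// and keeps the launcher Job suspended.
				Suspend: pointer.Bool(true),
			},
		},
	}
}

func main() {
	job := newSuspendedMPIJob("default", "pi")
	fmt.Println(*job.Spec.RunPolicy.Suspend) // true
}
```
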
@@ -578,6 +598,23 @@ func (c *MPIJobController) syncHandler(key string) error {
        }
    }

    if launcher != nil {
        if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
            // align the suspension state of launcher with the MPIJob
            launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
            if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
                return err
            }
        }
    }

    // cleanup the running worker pods if the MPI job is suspended
    if isMPIJobSuspended(mpiJob) {
        if err := cleanUpWorkerPods(mpiJob, c); err != nil {
            return err
        }
    }

    // Finally, we update the status block of the MPIJob resource to reflect the
    // current state of the world.
    err = c.updateMPIJobStatus(mpiJob, launcher, worker)

@@ -588,6 +625,21 @@ func (c *MPIJobController) syncHandler(key string) error {
    return nil
}

func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
    // set worker StatefulSet Replicas to 0.
    if err := c.deleteWorkerPods(mpiJob); err != nil {
        return err
    }
    initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
    if c.gangSchedulerName != "" {
        if err := c.deletePodGroups(mpiJob); err != nil {
            return err
        }
    }

Comment on lines +634 to +638

Contributor: @tenzen-y are you familiar with the volcano integration? I wonder if we need to remove the pod group on suspension. Does it matter?

Contributor (author): Do podgroups allocate any node resources, as pods do, or are they just API objects (like services)? I do not delete services and other API objects on suspension. I also don't delete the launcher job, just suspend it.

Contributor: AFAIK, a podgroup is just a declaration that some pods should be treated as a unit. But a podgroup doesn't create other objects.

Member: Maybe we should create an E2E test with volcano in a separate PR.

Contributor: probably someone with volcano experience should do it.

Contributor (author): I looked at the volcano code a little bit on my own, but could not conclude whether it may create the pods under some scenarios. Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT? +1 for the idea of an e2e test employing podgroups. I guess we could create a follow-up issue and ask for people willing to do it.

Member: That sounds good to me.
    mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0
    return nil
}

// getLauncherJob gets the launcher Job controlled by this MPIJob.
func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
    launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix)

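Regarding the follow-up e2e test with volcano discussed above, a possible shape for the assertion, assuming the PodGroup created for an MPIJob is named after the job (the naming is an assumption of this sketch) and using the volcano clientset this controller already imports:

```go
package e2e

import (
	"context"
	"testing"

	"k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
)

// expectPodGroupDeleted checks that the PodGroup backing an MPIJob is gone,
// e.g. after the MPIJob has been suspended. The PodGroup name is assumed to
// match the MPIJob name; adjust if the controller derives it differently.
func expectPodGroupDeleted(t *testing.T, client volcanoclient.Interface, namespace, name string) {
	t.Helper()
	_, err := client.SchedulingV1beta1().PodGroups(namespace).Get(context.TODO(), name, metav1.GetOptions{})
	if !errors.IsNotFound(err) {
		t.Errorf("expected PodGroup %s/%s to be deleted, got err=%v", namespace, name, err)
	}
}
```
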
@@ -857,6 +909,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1.Pod, error) {
    return workerPods, nil
}

func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
    return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
}

func isJobSuspended(job *batchv1.Job) bool {
    return pointer.BoolDeref(job.Spec.Suspend, false)
}

func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
    var (
        workerPrefix = mpiJob.Name + workerSuffix

@@ -901,6 +961,19 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {

func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
    oldStatus := mpiJob.Status.DeepCopy()
    if isMPIJobSuspended(mpiJob) {
        // it is suspended now
        if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") {
            c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobSuspended", "MPIJob suspended")
        }
    } else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
        // it is not suspended now, consider resumed if the condition was set before
        if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, mpiJobResumedReason, "MPIJob resumed") {
            c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed")
            now := metav1.NewTime(c.clock.Now())
            mpiJob.Status.StartTime = &now
        }
    }
    launcherPods, err := c.jobPods(launcher)
    if err != nil {
        return fmt.Errorf("checking launcher pods running: %w", err)

@@ -919,7 +992,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
        if mpiJob.Status.CompletionTime == nil {
            mpiJob.Status.CompletionTime = launcher.Status.CompletionTime
        }
        updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg)
        updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg)
        mpiJobsSuccessCount.Inc()
    } else if isJobFailed(launcher) {
        c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods)

@@ -953,13 +1026,16 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
    if evict > 0 {
        msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker))
        klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg)
        updateMPIJobConditions(mpiJob, kubeflow.JobFailed, mpiJobEvict, msg)
        updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, mpiJobEvict, msg)
        c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
    }

    if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
    if isMPIJobSuspended(mpiJob) {
        msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
        updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
    } else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
        msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
        updateMPIJobConditions(mpiJob, kubeflow.JobRunning, mpiJobRunningReason, msg)
        updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
        c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
    }

@@ -999,7 +1075,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, launcherPods []*corev1.Pod) {
        now := metav1.Now()
        mpiJob.Status.CompletionTime = &now
    }
    updateMPIJobConditions(mpiJob, kubeflow.JobFailed, reason, msg)
    updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, reason, msg)
    mpiJobsFailureCount.Inc()
}

@@ -1304,7 +1380,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1.Pod {
}

func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
    return &batchv1.Job{
    job := &batchv1.Job{

Contributor: Food for thought:

Contributor (author): Good point. IIUC, this affects only Kueue configurations where ManageJobsWithoutQueueName=true, which is non-default. We could have an annotation, yes, or simply not manage in Kueue any Job objects that have an OwnerReference to another object managed by Kueue.

Contributor: The dependency could be indirect. And we don't want to spend a GET call to obtain such information.

Contributor (author): I see, but we should still strive to keep the integration interface as small as possible - every extension to the surface of the interface is multiplied by the number of projects - but maybe a new annotation is not that bad. Also, maybe we can have a hybrid approach.
        ObjectMeta: metav1.ObjectMeta{
            Name:      mpiJob.Name + launcherSuffix,
            Namespace: mpiJob.Namespace,

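One way to implement the suggestion from the thread above (not managing, in Kueue, Jobs that are owned by an already-managed object) would be a controller-reference check on the launcher Job; this is only an illustration of that idea, not an API that either project currently exposes:

```go
package main

import (
	"fmt"

	batchv1 "k8s.io/api/batch/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// isOwnedByMPIJob reports whether a Job is controlled by an MPIJob, which is
// the case for the launcher Job created by this controller. A queueing
// controller could skip such Jobs, at the price of missing indirect
// ownership (the concern raised in the thread).
func isOwnedByMPIJob(job *batchv1.Job) bool {
	owner := metav1.GetControllerOf(job)
	return owner != nil && owner.Kind == "MPIJob" && owner.APIVersion == "kubeflow.org/v2beta1"
}

func main() {
	fmt.Println(isOwnedByMPIJob(&batchv1.Job{})) // false: no owner references
}
```
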
@@ -1322,6 +1398,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
            Template: c.newLauncherPodTemplate(mpiJob),
        },
    }
    if isMPIJobSuspended(mpiJob) {
        job.Spec.Suspend = pointer.Bool(true)
    }
    return job
}

// newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets

Was this autogenerated? If so, I'm curious to know how it worked. Maybe this? https://github.com/kubeflow/common/blob/9ec55d141f90faaf52fd6df271e987e5a6781945/pkg/apis/common/v1/types.go#L112 We should probably use it in Kueue, where applicable.

Nope, I updated it manually.

Uhm... then make sure that `make generate` doesn't override it.

it doesn't, checked

@tenzen-y is this expected? Or is this related to your other PR?

We can leave this to #510.

Yes, this is a known issue, and this will be fixed by #510.

Yeah, but it is needed for the e2e test (which I believe is worth adding).

Yeah, keep the manual change, and #510 should automate it.
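For reference, the condition plumbing this PR relies on looks roughly like the sketch below; the exact constant values and the surrounding constants are assumptions based on the kubeflow/common types linked above and on the kubeflow.JobSuspended / v1.ConditionFalse usage in this diff:

```go
// Sketch of the MPIJob condition types (pkg/apis/kubeflow/v2beta1); the
// constant values shown here are assumptions, not verified source.
package v2beta1

// JobConditionType names a type of MPIJob condition.
type JobConditionType string

const (
	JobCreated    JobConditionType = "Created"
	JobRunning    JobConditionType = "Running"
	JobRestarting JobConditionType = "Restarting"
	JobSucceeded  JobConditionType = "Succeeded"
	JobFailed     JobConditionType = "Failed"
	// JobSuspended is the condition used for suspend semantics: set to True
	// when RunPolicy.Suspend takes effect and to False on resume, as done in
	// updateMPIJobStatus above.
	JobSuspended JobConditionType = "Suspended"
)
```
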