diff --git a/crd/kubeflow.org_mpijobs.yaml b/crd/kubeflow.org_mpijobs.yaml index 9998e2095..f4964d165 100644 --- a/crd/kubeflow.org_mpijobs.yaml +++ b/crd/kubeflow.org_mpijobs.yaml @@ -7866,6 +7866,18 @@ spec: format: int32 type: integer type: object + suspend: + default: false + description: "suspend specifies whether the MPIJob controller + should create Pods or not. If a MPIJob is created with suspend + set to true, no Pods are created by the MPIJob controller. If + a MPIJob is suspended after creation (i.e. the flag goes from + false to true), the MPIJob controller will delete all active + Pods and PodGroups associated with this MPIJob. Also, it will + suspend the Launcher Job. Users must design their workload to + gracefully handle this. Suspending a Job will reset the StartTime + field of the MPIJob. \n Defaults to false." + type: boolean ttlSecondsAfterFinished: description: TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since diff --git a/go.mod b/go.mod index ad2680579..44752c776 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( k8s.io/klog v1.0.0 k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596 k8s.io/sample-controller v0.25.6 + k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed sigs.k8s.io/controller-runtime v0.13.1 volcano.sh/apis v1.7.0 ) @@ -73,7 +74,6 @@ require ( k8s.io/component-base v0.25.6 // indirect k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect k8s.io/klog/v2 v2.70.1 // indirect - k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect sigs.k8s.io/yaml v1.3.0 // indirect diff --git a/manifests/base/crd.yaml b/manifests/base/crd.yaml index e0a4a2fd9..c770a5147 100644 --- a/manifests/base/crd.yaml +++ b/manifests/base/crd.yaml @@ -49,6 +49,8 @@ spec: type: integer minimum: 0 description: "Specifies the number of retries before marking the launcher Job as failed. Defaults to 6." + suspend: + type: boolean sshAuthMountPath: type: string mpiImplementation: @@ -94,7 +96,7 @@ spec: properties: type: type: string - enum: ["Created", "Running", "Restarting", "Succeeded", "Failed"] + enum: ["Created", "Running", "Restarting", "Succeeded", "Suspended", "Failed"] status: type: string enum: ["True", "False", "Unknown"] diff --git a/pkg/apis/kubeflow/v2beta1/openapi_generated.go b/pkg/apis/kubeflow/v2beta1/openapi_generated.go index 9347831b4..0ce9deb51 100644 --- a/pkg/apis/kubeflow/v2beta1/openapi_generated.go +++ b/pkg/apis/kubeflow/v2beta1/openapi_generated.go @@ -543,6 +543,13 @@ func schema_pkg_apis_kubeflow_v2beta1_RunPolicy(ref common.ReferenceCallback) co Ref: ref("github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1.SchedulingPolicy"), }, }, + "suspend": { + SchemaProps: spec.SchemaProps{ + Description: "suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. 
Suspending a Job will reset the StartTime field of the MPIJob.\n\nDefaults to false.", + Type: []string{"boolean"}, + Format: "", + }, + }, }, }, }, diff --git a/pkg/apis/kubeflow/v2beta1/swagger.json b/pkg/apis/kubeflow/v2beta1/swagger.json index 7588b8c0c..e9b1cb202 100644 --- a/pkg/apis/kubeflow/v2beta1/swagger.json +++ b/pkg/apis/kubeflow/v2beta1/swagger.json @@ -399,6 +399,10 @@ "description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling", "$ref": "#/definitions/v2beta1.SchedulingPolicy" }, + "suspend": { + "description": "suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob.\n\nDefaults to false.", + "type": "boolean" + }, "ttlSecondsAfterFinished": { "description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.", "type": "integer", diff --git a/pkg/apis/kubeflow/v2beta1/types.go b/pkg/apis/kubeflow/v2beta1/types.go index 1fa23c246..94dd86571 100644 --- a/pkg/apis/kubeflow/v2beta1/types.go +++ b/pkg/apis/kubeflow/v2beta1/types.go @@ -84,6 +84,18 @@ type RunPolicy struct { // SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling // +optional SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"` + + // suspend specifies whether the MPIJob controller should create Pods or not. + // If a MPIJob is created with suspend set to true, no Pods are created by + // the MPIJob controller. If a MPIJob is suspended after creation (i.e. the + // flag goes from false to true), the MPIJob controller will delete all + // active Pods and PodGroups associated with this MPIJob. Also, it will suspend the + // Launcher Job. Users must design their workload to gracefully handle this. + // Suspending a Job will reset the StartTime field of the MPIJob. + // + // Defaults to false. + // +kubebuilder:default:=false + Suspend *bool `json:"suspend,omitempty"` } type MPIJobSpec struct { @@ -239,6 +251,9 @@ const ( // The training is complete without error. JobSucceeded JobConditionType = "Succeeded" + // JobSuspended means the job has been suspended. + JobSuspended JobConditionType = "Suspended" + // JobFailed means one or more sub-resources (e.g. services/pods) of this job // reached phase failed with no restarting. // The training has failed its execution. 
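The RunPolicy.Suspend field and the JobSuspended condition type added above are the whole user-facing surface of this feature. As a rough sketch of how a client is expected to drive it — the helper name below is made up, and the clientset import path is an assumption based on the repository's generated client rather than something this patch adds — suspending or resuming an existing MPIJob comes down to flipping the pointer and updating the object:

package suspendexample

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
)

// setSuspend flips spec.runPolicy.suspend on an existing MPIJob. Suspending a
// running job makes the controller delete the worker Pods (and PodGroups) and
// suspend the launcher Job; resuming recreates the workers and resets StartTime.
func setSuspend(ctx context.Context, c clientset.Interface, namespace, name string, suspend bool) error {
	mpiJob, err := c.KubeflowV2beta1().MPIJobs(namespace).Get(ctx, name, metav1.GetOptions{})
	if err != nil {
		return err
	}
	mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(suspend)
	_, err = c.KubeflowV2beta1().MPIJobs(namespace).Update(ctx, mpiJob, metav1.UpdateOptions{})
	return err
}

The e2e test added later in this patch resumes a job in exactly this way; on a live cluster the same toggle can also be applied by patching spec.runPolicy.suspend with kubectl.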
diff --git a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go index 6a74a66f0..55aca2ac6 100644 --- a/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go +++ b/pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go @@ -242,6 +242,11 @@ func (in *RunPolicy) DeepCopyInto(out *RunPolicy) { *out = new(SchedulingPolicy) (*in).DeepCopyInto(*out) } + if in.Suspend != nil { + in, out := &in.Suspend, &out.Suspend + *out = new(bool) + **out = **in + } return } diff --git a/pkg/controller/mpi_job_controller.go b/pkg/controller/mpi_job_controller.go index e51a3ac52..a2126cf79 100644 --- a/pkg/controller/mpi_job_controller.go +++ b/pkg/controller/mpi_job_controller.go @@ -33,6 +33,7 @@ import ( "golang.org/x/crypto/ssh" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -50,6 +51,8 @@ import ( "k8s.io/client-go/tools/record" "k8s.io/client-go/util/workqueue" "k8s.io/klog" + "k8s.io/utils/clock" + "k8s.io/utils/pointer" podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned" podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1" @@ -243,6 +246,9 @@ type MPIJobController struct { // To allow injection of updateStatus for testing. updateStatusHandler func(mpijob *kubeflow.MPIJob) error + + // Clock for internal use of unit-testing + clock clock.WithTicker } // NewMPIJobController returns a new MPIJob controller. @@ -258,6 +264,26 @@ func NewMPIJobController( podgroupsInformer podgroupsinformer.PodGroupInformer, mpiJobInformer informers.MPIJobInformer, gangSchedulerName string) *MPIJobController { + return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet, + configMapInformer, secretInformer, serviceInformer, jobInformer, + podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName, + &clock.RealClock{}) +} + +// NewMPIJobController returns a new MPIJob controller. +func NewMPIJobControllerWithClock( + kubeClient kubernetes.Interface, + kubeflowClient clientset.Interface, + volcanoClientSet volcanoclient.Interface, + configMapInformer coreinformers.ConfigMapInformer, + secretInformer coreinformers.SecretInformer, + serviceInformer coreinformers.ServiceInformer, + jobInformer batchinformers.JobInformer, + podInformer coreinformers.PodInformer, + podgroupsInformer podgroupsinformer.PodGroupInformer, + mpiJobInformer informers.MPIJobInformer, + gangSchedulerName string, + clock clock.WithTicker) *MPIJobController { // Create event broadcaster. klog.V(4).Info("Creating event broadcaster") @@ -294,6 +320,7 @@ func NewMPIJobController( queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"), recorder: recorder, gangSchedulerName: gangSchedulerName, + clock: clock, } controller.updateStatusHandler = controller.doUpdateJobStatus @@ -449,9 +476,9 @@ func (c *MPIJobController) processNextWorkItem() bool { // converge the two. It then updates the Status block of the MPIJob resource // with the current status of the resource. 
func (c *MPIJobController) syncHandler(key string) error { - startTime := time.Now() + startTime := c.clock.Now() defer func() { - klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime)) + klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime)) }() // Convert the namespace/name string into a distinct namespace and name. @@ -493,7 +520,7 @@ func (c *MPIJobController) syncHandler(key string) error { if len(mpiJob.Status.Conditions) == 0 { msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJob, kubeflow.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg) mpiJobsCreatedCount.Inc() } @@ -503,24 +530,16 @@ func (c *MPIJobController) syncHandler(key string) error { // cleanup and stop retrying the MPIJob. if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil { if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) { - // set worker StatefulSet Replicas to 0. - if err := c.deleteWorkerPods(mpiJob); err != nil { + if err := cleanUpWorkerPods(mpiJob, c); err != nil { return err } - initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker) - if c.gangSchedulerName != "" { - if err := c.deletePodGroups(mpiJob); err != nil { - return err - } - } - mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0 return c.updateStatusHandler(mpiJob) } return nil } // first set StartTime. - if mpiJob.Status.StartTime == nil { + if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) { now := metav1.Now() mpiJob.Status.StartTime = &now } @@ -549,17 +568,18 @@ func (c *MPIJobController) syncHandler(key string) error { return fmt.Errorf("creating SSH auth secret: %w", err) } - // Get the PodGroup for this MPIJob - if c.gangSchedulerName != "" { - if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil { + if !isMPIJobSuspended(mpiJob) { + // Get the PodGroup for this MPIJob + if c.gangSchedulerName != "" { + if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil { + return err + } + } + worker, err = c.getOrCreateWorker(mpiJob) + if err != nil { return err } } - - worker, err = c.getOrCreateWorker(mpiJob) - if err != nil { - return err - } if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel { // The Intel implementation requires workers to communicate with the // launcher through its hostname. For that, we create a Service which @@ -578,6 +598,23 @@ func (c *MPIJobController) syncHandler(key string) error { } } + if launcher != nil { + if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) { + // align the suspension state of launcher with the MPIJob + launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob)) + if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil { + return err + } + } + } + + // cleanup the running worker pods if the MPI job is suspended + if isMPIJobSuspended(mpiJob) { + if err := cleanUpWorkerPods(mpiJob, c); err != nil { + return err + } + } + // Finally, we update the status block of the MPIJob resource to reflect the // current state of the world. 
err = c.updateMPIJobStatus(mpiJob, launcher, worker) @@ -588,6 +625,21 @@ func (c *MPIJobController) syncHandler(key string) error { return nil } +func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error { + // set worker StatefulSet Replicas to 0. + if err := c.deleteWorkerPods(mpiJob); err != nil { + return err + } + initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker) + if c.gangSchedulerName != "" { + if err := c.deletePodGroups(mpiJob); err != nil { + return err + } + } + mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0 + return nil +} + // getLauncherJob gets the launcher Job controlled by this MPIJob. func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) { launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix) @@ -857,6 +909,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1 return workerPods, nil } +func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool { + return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false) +} + +func isJobSuspended(job *batchv1.Job) bool { + return pointer.BoolDeref(job.Spec.Suspend, false) +} + func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error { var ( workerPrefix = mpiJob.Name + workerSuffix @@ -901,6 +961,19 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error { func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error { oldStatus := mpiJob.Status.DeepCopy() + if isMPIJobSuspended(mpiJob) { + // it is suspended now + if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") { + c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobSuspended", "MPIJob suspended") + } + } else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil { + // it is not suspended now, consider resumed if the condition was set before + if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, mpiJobResumedReason, "MPIJob resumed") { + c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed") + now := metav1.NewTime(c.clock.Now()) + mpiJob.Status.StartTime = &now + } + } launcherPods, err := c.jobPods(launcher) if err != nil { return fmt.Errorf("checking launcher pods running: %w", err) @@ -919,7 +992,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher if mpiJob.Status.CompletionTime == nil { mpiJob.Status.CompletionTime = launcher.Status.CompletionTime } - updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg) mpiJobsSuccessCount.Inc() } else if isJobFailed(launcher) { c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods) @@ -953,13 +1026,16 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher if evict > 0 { msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker)) klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg) - updateMPIJobConditions(mpiJob, kubeflow.JobFailed, mpiJobEvict, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, mpiJobEvict, msg) c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg) } - if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) { + if isMPIJobSuspended(mpiJob) { + msg := fmt.Sprintf("MPIJob %s/%s is 
suspended.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg) + } else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) { msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJob, kubeflow.JobRunning, mpiJobRunningReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg) c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name) } @@ -999,7 +1075,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, lau now := metav1.Now() mpiJob.Status.CompletionTime = &now } - updateMPIJobConditions(mpiJob, kubeflow.JobFailed, reason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, reason, msg) mpiJobsFailureCount.Inc() } @@ -1304,7 +1380,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1 } func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job { - return &batchv1.Job{ + job := &batchv1.Job{ ObjectMeta: metav1.ObjectMeta{ Name: mpiJob.Name + launcherSuffix, Namespace: mpiJob.Namespace, @@ -1322,6 +1398,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job Template: c.newLauncherPodTemplate(mpiJob), }, } + if isMPIJobSuspended(mpiJob) { + job.Spec.Suspend = pointer.Bool(true) + } + return job } // newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets diff --git a/pkg/controller/mpi_job_controller_status.go b/pkg/controller/mpi_job_controller_status.go index 123530853..05007e116 100644 --- a/pkg/controller/mpi_job_controller_status.go +++ b/pkg/controller/mpi_job_controller_status.go @@ -28,6 +28,10 @@ const ( mpiJobSucceededReason = "MPIJobSucceeded" // mpiJobRunningReason is added in a mpijob when it is running. mpiJobRunningReason = "MPIJobRunning" + // mpiJobSuspendedReason is added in a mpijob when it is suspended. + mpiJobSuspendedReason = "MPIJobSuspended" + // mpiJobResumedReason is added in a mpijob when it is resumed. + mpiJobResumedReason = "MPIJobResumed" // mpiJobFailedReason is added in a mpijob when it is failed. mpiJobFailedReason = "MPIJobFailed" // mpiJobEvict @@ -45,16 +49,16 @@ func initializeMPIJobStatuses(mpiJob *kubeflow.MPIJob, mtype kubeflow.MPIReplica } // updateMPIJobConditions updates the conditions of the given mpiJob. -func updateMPIJobConditions(mpiJob *kubeflow.MPIJob, conditionType kubeflow.JobConditionType, reason, message string) { - condition := newCondition(conditionType, reason, message) - setCondition(&mpiJob.Status, condition) +func updateMPIJobConditions(mpiJob *kubeflow.MPIJob, conditionType kubeflow.JobConditionType, status v1.ConditionStatus, reason, message string) bool { + condition := newCondition(conditionType, status, reason, message) + return setCondition(&mpiJob.Status, condition) } // newCondition creates a new mpiJob condition. 
-func newCondition(conditionType kubeflow.JobConditionType, reason, message string) kubeflow.JobCondition { +func newCondition(conditionType kubeflow.JobConditionType, status v1.ConditionStatus, reason, message string) kubeflow.JobCondition { return kubeflow.JobCondition{ Type: conditionType, - Status: v1.ConditionTrue, + Status: status, LastUpdateTime: metav1.Now(), LastTransitionTime: metav1.Now(), Reason: reason, @@ -96,13 +100,12 @@ func isFailed(status kubeflow.JobStatus) bool { // setCondition updates the mpiJob to include the provided condition. // If the condition that we are about to add already exists // and has the same status and reason then we are not going to update. -func setCondition(status *kubeflow.JobStatus, condition kubeflow.JobCondition) { - +func setCondition(status *kubeflow.JobStatus, condition kubeflow.JobCondition) bool { currentCond := getCondition(*status, condition.Type) // Do nothing if condition doesn't change if currentCond != nil && currentCond.Status == condition.Status && currentCond.Reason == condition.Reason { - return + return false } // Do not update lastTransitionTime if the status of the condition doesn't change. @@ -113,6 +116,7 @@ func setCondition(status *kubeflow.JobStatus, condition kubeflow.JobCondition) { // Append the updated condition newConditions := filterOutCondition(status.Conditions, condition.Type) status.Conditions = append(newConditions, condition) + return true } // filterOutCondition returns a new slice of mpiJob conditions without conditions with the provided type. diff --git a/pkg/controller/mpi_job_controller_test.go b/pkg/controller/mpi_job_controller_test.go index ff2cb1c12..2eb4e808c 100644 --- a/pkg/controller/mpi_job_controller_test.go +++ b/pkg/controller/mpi_job_controller_test.go @@ -24,6 +24,7 @@ import ( "github.com/google/go-cmp/cmp/cmpopts" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" @@ -34,6 +35,9 @@ import ( core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" + "k8s.io/utils/clock" + clocktesting "k8s.io/utils/clock/testing" + "k8s.io/utils/pointer" podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1" volcanofake "volcano.sh/apis/pkg/client/clientset/versioned/fake" @@ -145,7 +149,7 @@ func newMPIJob(name string, replicas *int32, startTime, completionTime *metav1.T return mpiJob } -func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { +func (f *fixture) newController(gangSchedulerName string, clock clock.WithTicker) (*MPIJobController, informers.SharedInformerFactory, kubeinformers.SharedInformerFactory) { f.client = fake.NewSimpleClientset(f.objects...) f.kubeClient = k8sfake.NewSimpleClientset(f.kubeObjects...) 
@@ -155,7 +159,7 @@ func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, in volcanoInformerFactory := volcanoinformers.NewSharedInformerFactory(f.volcanoClient, 0) podgroupsInformer := volcanoInformerFactory.Scheduling().V1beta1().PodGroups() - c := NewMPIJobController( + c := NewMPIJobControllerWithClock( f.kubeClient, f.client, f.volcanoClient, @@ -167,6 +171,7 @@ func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, in podgroupsInformer, i.Kubeflow().V2beta1().MPIJobs(), gangSchedulerName, + clock, ) c.configMapSynced = alwaysReady @@ -230,15 +235,19 @@ func (f *fixture) newController(gangSchedulerName string) (*MPIJobController, in } func (f *fixture) run(mpiJobName string) { - f.runController(mpiJobName, true, false, "") + f.runWithClock(mpiJobName, clock.RealClock{}) +} + +func (f *fixture) runWithClock(mpiJobName string, clock clock.WithTicker) { + f.runController(mpiJobName, true, false, "", clock) } func (f *fixture) runExpectError(mpiJobName string) { - f.runController(mpiJobName, true, true, "") + f.runController(mpiJobName, true, true, "", clock.RealClock{}) } -func (f *fixture) runController(mpiJobName string, startInformers, expectError bool, gangSchedulerName string) { - c, i, k8sI := f.newController(gangSchedulerName) +func (f *fixture) runController(mpiJobName string, startInformers, expectError bool, gangSchedulerName string, clock clock.WithTicker) { + c, i, k8sI := f.newController(gangSchedulerName, clock) if startInformers { stopCh := make(chan struct{}) defer close(stopCh) @@ -359,6 +368,11 @@ func (f *fixture) expectCreateJobAction(d *batchv1.Job) { f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "jobs", Group: "batch"}, d.Namespace, d)) } +func (f *fixture) expectUpdateJobAction(job *batchv1.Job) { + action := core.NewUpdateAction(schema.GroupVersionResource{Resource: "jobs", Group: "batch", Version: "v1"}, job.Namespace, job) + f.kubeActions = append(f.kubeActions, action) +} + func (f *fixture) expectCreatePodAction(d *corev1.Pod) { f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "pods"}, d.Namespace, d)) } @@ -486,7 +500,7 @@ func TestAllResourcesCreated(t *testing.T) { } f.expectCreateJobAction(fmjc.newLauncherJob(mpiJobCopy)) - mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, mpiJobCreatedReason, "MPIJob default/foo is created.")} + mpiJobCopy.Status.Conditions = []kubeflow.JobCondition{newCondition(kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, "MPIJob default/foo is created.")} mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ kubeflow.MPIReplicaTypeLauncher: {}, kubeflow.MPIReplicaTypeWorker: {}, @@ -547,9 +561,9 @@ func TestLauncherSucceeded(t *testing.T) { setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) msg = fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, mpiJobSucceededReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t)) 
@@ -601,9 +615,9 @@ func TestLauncherFailed(t *testing.T) { setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) msg = "Job has reached the specified backoff limit: second message" - updateMPIJobConditions(mpiJobCopy, kubeflow.JobFailed, jobBackoffLimitExceededReason+"/FailedReason2", msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobFailed, v1.ConditionTrue, jobBackoffLimitExceededReason+"/FailedReason2", msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) @@ -715,7 +729,8 @@ func TestShutdownWorker(t *testing.T) { var replicas int32 = 8 mpiJob := newMPIJob("test", &replicas, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s successfully completed.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg) + + updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg) f.setUpMPIJob(mpiJob) fmjc := f.newFakeMPIJobController() @@ -751,6 +766,209 @@ func TestShutdownWorker(t *testing.T) { f.run(getKey(mpiJob, t)) } +func TestCreateSuspendedMPIJob(t *testing.T) { + impls := []kubeflow.MPIImplementation{kubeflow.MPIImplementationOpenMPI, kubeflow.MPIImplementationIntel} + for _, implementation := range impls { + t.Run(string(implementation), func(t *testing.T) { + f := newFixture(t) + + // create a suspended job + var replicas int32 = 8 + mpiJob := newMPIJob("test", &replicas, nil, nil) + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true) + mpiJob.Spec.MPIImplementation = implementation + f.setUpMPIJob(mpiJob) + + // expect creation of objects + scheme.Scheme.Default(mpiJob) + f.expectCreateServiceAction(newWorkersService(mpiJob)) + cfgMap := newConfigMap(mpiJob, replicas) + updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) + f.expectCreateConfigMapAction(cfgMap) + secret, err := newSSHAuthSecret(mpiJob) + if err != nil { + t.Fatalf("Failed creating secret") + } + f.expectCreateSecretAction(secret) + if implementation == kubeflow.MPIImplementationIntel { + f.expectCreateServiceAction(newLauncherService(mpiJob)) + } + + // expect creating of the launcher + fmjc := f.newFakeMPIJobController() + launcher := fmjc.newLauncherJob(mpiJob) + launcher.Spec.Suspend = pointer.Bool(true) + f.expectCreateJobAction(launcher) + + // expect an update to add the conditions + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, + } + msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") + msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg) + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) + }) + } +} + +func TestSuspendedRunningMPIJob(t *testing.T) { + f := newFixture(t) + + // setup a running MPIJob with a launcher + var replicas int32 = 8 + startTime := metav1.Now() + mpiJob := 
newMPIJob("test", &replicas, &startTime, nil) + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false) + msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) + msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg) + + mpiJob.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { + Active: 1, + }, + kubeflow.MPIReplicaTypeWorker: { + Active: replicas, + }, + } + + f.setUpMPIJob(mpiJob) + + // setup workers + fmjc := f.newFakeMPIJobController() + var runningPodList []*corev1.Pod + for i := 0; i < int(replicas); i++ { + worker := fmjc.newWorker(mpiJob, i) + worker.Status.Phase = corev1.PodRunning + runningPodList = append(runningPodList, worker) + f.setUpPod(worker) + } + + // setup objects + scheme.Scheme.Default(mpiJob) + f.setUpService(newWorkersService(mpiJob)) + + cfgMap := newConfigMap(mpiJob, replicas) + updateDiscoverHostsInConfigMap(cfgMap, mpiJob, runningPodList) + f.setUpConfigMap(cfgMap) + secret, err := newSSHAuthSecret(mpiJob) + if err != nil { + t.Fatalf("Failed creating secret") + } + f.setUpSecret(secret) + + // setup launcher and its pod + launcher := fmjc.newLauncherJob(mpiJob) + launcher.Spec.Suspend = pointer.Bool(false) + launcherPod := mockJobPod(launcher) + launcherPod.Status.Phase = corev1.PodRunning + f.setUpLauncher(launcher) + f.setUpPod(launcherPod) + + // transition the MPIJob into suspended state + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true) + + // expect moving the launcher pod into suspended state + launcherCopy := launcher.DeepCopy() + launcherCopy.Spec.Suspend = pointer.Bool(true) + f.expectUpdateJobAction(launcherCopy) + + // expect removal of the pods + for i := 0; i < int(replicas); i++ { + name := fmt.Sprintf("%s-%d", mpiJob.Name+workerSuffix, i) + f.kubeActions = append(f.kubeActions, core.NewDeleteAction(schema.GroupVersionResource{Resource: "pods"}, mpiJob.Namespace, name)) + } + + // expect MPI job status update to add the suspend condition + mpiJobCopy := mpiJob.DeepCopy() + updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") + msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJobCopy.Namespace, mpiJobCopy.Name) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg) + mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + // the launcher pod remains active. In live system it gets deleted by + // the launcher's Job controller. 
+ kubeflow.MPIReplicaTypeLauncher: { + Active: 1, + }, + kubeflow.MPIReplicaTypeWorker: {}, + } + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.run(getKey(mpiJob, t)) +} + +func TestResumeMPIJob(t *testing.T) { + fakeClock := clocktesting.NewFakeClock(time.Now().Truncate(time.Second)) + f := newFixture(t) + + // create a suspended job + var replicas int32 = 8 + startTime := metav1.Now() + mpiJob := newMPIJob("test", &replicas, &startTime, nil) + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true) + msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") + msg = fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name) + updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg) + mpiJob.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, + } + f.setUpMPIJob(mpiJob) + + // expect creation of objects + scheme.Scheme.Default(mpiJob) + f.expectCreateServiceAction(newWorkersService(mpiJob)) + cfgMap := newConfigMap(mpiJob, replicas) + updateDiscoverHostsInConfigMap(cfgMap, mpiJob, nil) + f.setUpConfigMap(cfgMap) + secret, err := newSSHAuthSecret(mpiJob) + if err != nil { + t.Fatalf("Failed creating secret") + } + f.setUpSecret(secret) + + // expect creating of the launcher + fmjc := f.newFakeMPIJobController() + launcher := fmjc.newLauncherJob(mpiJob) + launcher.Spec.Suspend = pointer.Bool(true) + f.setUpLauncher(launcher) + + // move the timer by a second so that the StartTime is updated after resume + fakeClock.Sleep(time.Second) + + // resume the MPIJob + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false) + + // expect creation of the pods + for i := 0; i < int(replicas); i++ { + worker := fmjc.newWorker(mpiJob, i) + f.kubeActions = append(f.kubeActions, core.NewCreateAction(schema.GroupVersionResource{Resource: "pods"}, mpiJob.Namespace, worker)) + } + + // expect the launcher update to resume it + launcherCopy := launcher.DeepCopy() + launcherCopy.Spec.Suspend = pointer.Bool(false) + f.expectUpdateJobAction(launcherCopy) + + // expect an update to add the conditions + mpiJobCopy := mpiJob.DeepCopy() + mpiJobCopy.Status.StartTime = &metav1.Time{Time: fakeClock.Now()} + updateMPIJobConditions(mpiJobCopy, kubeflow.JobSuspended, v1.ConditionFalse, "MPIJobResumed", "MPIJob resumed") + f.expectUpdateMPIJobStatusAction(mpiJobCopy) + + f.runWithClock(getKey(mpiJob, t), fakeClock) +} + func TestWorkerNotControlledByUs(t *testing.T) { f := newFixture(t) startTime := metav1.Now() @@ -816,7 +1034,7 @@ func TestLauncherActiveWorkerNotReady(t *testing.T) { f.setUpPod(worker) } msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) mpiJobCopy.Status.ReplicaStatuses = map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ kubeflow.MPIReplicaTypeLauncher: { Active: 1, @@ -886,9 +1104,9 @@ func TestLauncherActiveWorkerReady(t *testing.T) { } setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - 
updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) msg = fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, mpiJobRunningReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg) f.expectUpdateMPIJobStatusAction(mpiJobCopy) f.run(getKey(mpiJob, t)) @@ -942,7 +1160,7 @@ func TestWorkerReady(t *testing.T) { }, } msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name) - updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, mpiJobCreatedReason, msg) + updateMPIJobConditions(mpiJobCopy, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg) setUpMPIJobTimestamp(mpiJobCopy, &startTime, &completionTime) f.expectUpdateMPIJobStatusAction(mpiJobCopy) diff --git a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md index 54b052d1f..eeb1727fc 100644 --- a/sdk/python/v2beta1/docs/V2beta1RunPolicy.md +++ b/sdk/python/v2beta1/docs/V2beta1RunPolicy.md @@ -9,6 +9,7 @@ Name | Type | Description | Notes **backoff_limit** | **int** | Optional number of retries before marking this job failed. | [optional] **clean_pod_policy** | **str** | CleanPodPolicy defines the policy to kill pods after the job completes. Default to Running. | [optional] **scheduling_policy** | [**V2beta1SchedulingPolicy**](V2beta1SchedulingPolicy.md) | | [optional] +**suspend** | **bool** | suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. | [optional] **ttl_seconds_after_finished** | **int** | TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite. 
| [optional] [[Back to Model list]](../README.md#documentation-for-models) [[Back to API list]](../README.md#documentation-for-api-endpoints) [[Back to README]](../README.md) diff --git a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py index ae3d4e411..b8c252949 100644 --- a/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py +++ b/sdk/python/v2beta1/mpijob/models/v2beta1_run_policy.py @@ -37,6 +37,7 @@ class V2beta1RunPolicy(object): 'backoff_limit': 'int', 'clean_pod_policy': 'str', 'scheduling_policy': 'V2beta1SchedulingPolicy', + 'suspend': 'bool', 'ttl_seconds_after_finished': 'int' } @@ -45,10 +46,11 @@ class V2beta1RunPolicy(object): 'backoff_limit': 'backoffLimit', 'clean_pod_policy': 'cleanPodPolicy', 'scheduling_policy': 'schedulingPolicy', + 'suspend': 'suspend', 'ttl_seconds_after_finished': 'ttlSecondsAfterFinished' } - def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 + def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_policy=None, scheduling_policy=None, suspend=None, ttl_seconds_after_finished=None, local_vars_configuration=None): # noqa: E501 """V2beta1RunPolicy - a model defined in OpenAPI""" # noqa: E501 if local_vars_configuration is None: local_vars_configuration = Configuration.get_default_copy() @@ -58,6 +60,7 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self._backoff_limit = None self._clean_pod_policy = None self._scheduling_policy = None + self._suspend = None self._ttl_seconds_after_finished = None self.discriminator = None @@ -69,6 +72,8 @@ def __init__(self, active_deadline_seconds=None, backoff_limit=None, clean_pod_p self.clean_pod_policy = clean_pod_policy if scheduling_policy is not None: self.scheduling_policy = scheduling_policy + if suspend is not None: + self.suspend = suspend if ttl_seconds_after_finished is not None: self.ttl_seconds_after_finished = ttl_seconds_after_finished @@ -162,6 +167,29 @@ def scheduling_policy(self, scheduling_policy): self._scheduling_policy = scheduling_policy + @property + def suspend(self): + """Gets the suspend of this V2beta1RunPolicy. # noqa: E501 + + suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. # noqa: E501 + + :return: The suspend of this V2beta1RunPolicy. # noqa: E501 + :rtype: bool + """ + return self._suspend + + @suspend.setter + def suspend(self, suspend): + """Sets the suspend of this V2beta1RunPolicy. + + suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. 
Suspending a Job will reset the StartTime field of the MPIJob. Defaults to false. # noqa: E501 + + :param suspend: The suspend of this V2beta1RunPolicy. # noqa: E501 + :type suspend: bool + """ + + self._suspend = suspend + @property def ttl_seconds_after_finished(self): """Gets the ttl_seconds_after_finished of this V2beta1RunPolicy. # noqa: E501 diff --git a/test/e2e/mpi_job_test.go b/test/e2e/mpi_job_test.go index 789739726..192aafafe 100644 --- a/test/e2e/mpi_job_test.go +++ b/test/e2e/mpi_job_test.go @@ -31,6 +31,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/intstr" "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/utils/pointer" ) var _ = ginkgo.Describe("MPIJob", func() { @@ -122,6 +123,25 @@ var _ = ginkgo.Describe("MPIJob", func() { expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) }) + ginkgo.When("suspended on creation", func() { + ginkgo.BeforeEach(func() { + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true) + }) + ginkgo.It("should not create pods when suspended and succeed when resumed", func() { + mpiJob := createJob(mpiJob) + + ctx := context.Background() + ginkgo.By("verifying there are no pods (neither launcher nor pods) running for the suspended MPIJob") + pods, err := k8sClient.CoreV1().Pods(mpiJob.Namespace).List(ctx, metav1.ListOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + gomega.Expect(pods.Items).To(gomega.HaveLen(0)) + + mpiJob = resumeJob(mpiJob) + mpiJob = waitForCompletion(mpiJob) + expectConditionToBeTrue(mpiJob, kubeflow.JobSucceeded) + }) + }) + ginkgo.When("running with host network", func() { ginkgo.BeforeEach(func() { mpiJob.Spec.MPIReplicaSpecs[kubeflow.MPIReplicaTypeLauncher].Template.Spec.HostNetwork = true @@ -211,12 +231,31 @@ var _ = ginkgo.Describe("MPIJob", func() { }) }) +func resumeJob(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob { + ctx := context.Background() + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false) + ginkgo.By("Resuming MPIJob") + mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Update(ctx, mpiJob, metav1.UpdateOptions{}) + gomega.Expect(err).ToNot(gomega.HaveOccurred()) + return mpiJob +} + func createJobAndWaitForCompletion(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob { + mpiJob = createJob(mpiJob) + return waitForCompletion(mpiJob) +} + +func createJob(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob { ctx := context.Background() - var err error ginkgo.By("Creating MPIJob") - mpiJob, err = mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Create(ctx, mpiJob, metav1.CreateOptions{}) + mpiJob, err := mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Create(ctx, mpiJob, metav1.CreateOptions{}) gomega.Expect(err).ToNot(gomega.HaveOccurred()) + return mpiJob +} + +func waitForCompletion(mpiJob *kubeflow.MPIJob) *kubeflow.MPIJob { + var err error + ctx := context.Background() ginkgo.By("Waiting for MPIJob to finish") err = wait.Poll(waitInterval, foreverTimeout, func() (bool, error) { diff --git a/test/integration/mpi_job_controller_test.go b/test/integration/mpi_job_controller_test.go index 2cb961c1e..d3d9e02e8 100644 --- a/test/integration/mpi_job_controller_test.go +++ b/test/integration/mpi_job_controller_test.go @@ -23,6 +23,7 @@ import ( "github.com/google/go-cmp/cmp" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/util/runtime" @@ -30,6 +31,7 @@ import ( kubeinformers "k8s.io/client-go/informers" 
"k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/reference" + "k8s.io/utils/pointer" common "github.com/kubeflow/common/pkg/apis/common/v1" kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" @@ -169,6 +171,170 @@ func TestMPIJobSuccess(t *testing.T) { } } +func TestMPIJobResumingAndSuspending(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + s := newTestSetup(ctx, t) + startController(ctx, s.kClient, s.mpiClient) + + mpiJob := &kubeflow.MPIJob{ + ObjectMeta: metav1.ObjectMeta{ + Name: "job", + Namespace: s.namespace, + }, + Spec: kubeflow.MPIJobSpec{ + SlotsPerWorker: newInt32(1), + RunPolicy: kubeflow.RunPolicy{ + CleanPodPolicy: newCleanPodPolicy(kubeflow.CleanPodPolicyRunning), + Suspend: pointer.Bool(true), + }, + MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{ + kubeflow.MPIReplicaTypeLauncher: { + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "mpi-image", + }, + }, + }, + }, + }, + kubeflow.MPIReplicaTypeWorker: { + Replicas: newInt32(2), + Template: corev1.PodTemplateSpec{ + Spec: corev1.PodSpec{ + Containers: []corev1.Container{ + { + Name: "main", + Image: "mpi-image", + }, + }, + }, + }, + }, + }, + }, + } + // 1. Create suspended MPIJob + var err error + mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(s.namespace).Create(ctx, mpiJob, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("Failed sending job to apiserver: %v", err) + } + s.events.expect(eventForJob(corev1.Event{ + Type: corev1.EventTypeNormal, + Reason: "MPIJobCreated", + }, mpiJob)) + + s.events.expect(eventForJob(corev1.Event{ + Type: corev1.EventTypeNormal, + Reason: "MPIJobSuspended", + }, mpiJob)) + + _, launcherJob := validateMPIJobDependencies(ctx, t, s.kClient, mpiJob, 0) + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, + }) + if !mpiJobHasCondition(mpiJob, kubeflow.JobCreated) { + t.Errorf("MPIJob missing Created condition") + } + if !mpiJobHasCondition(mpiJob, kubeflow.JobSuspended) { + t.Errorf("MPIJob missing Suspended condition") + } + if !isJobSuspended(launcherJob) { + t.Errorf("LauncherJob is suspended") + } + if mpiJob.Status.StartTime != nil { + t.Errorf("MPIJob has unexpected start time: %v", mpiJob.Status.StartTime) + } + + s.events.verify(t) + + // 2. Resume the MPIJob + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(false) + mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Update(ctx, mpiJob, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to resume the MPIJob: %v", err) + } + s.events.expect(eventForJob(corev1.Event{ + Type: corev1.EventTypeNormal, + Reason: "MPIJobResumed", + }, mpiJob)) + + workerPods, launcherJob := validateMPIJobDependencies(ctx, t, s.kClient, mpiJob, 2) + + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, + }) + if mpiJob.Status.StartTime == nil { + t.Errorf("MPIJob is missing startTime") + } + if isJobSuspended(launcherJob) { + t.Errorf("LauncherJob is suspended") + } + if !mpiJobHasConditionWithStatus(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse) { + t.Errorf("MPIJob has unexpected Suspended condition") + } + + s.events.verify(t) + + // 3. 
Set the pods to be running + err = updatePodsToPhase(ctx, s.kClient, workerPods, corev1.PodRunning) + if err != nil { + t.Fatalf("Updating worker Pods to Running phase: %v", err) + } + s.events.expect(eventForJob(corev1.Event{ + Type: corev1.EventTypeNormal, + Reason: "MPIJobRunning", + }, mpiJob)) + launcherPod, err := createPodForJob(ctx, s.kClient, launcherJob) + if err != nil { + t.Fatalf("Failed to create mock pod for launcher Job: %v", err) + } + err = updatePodsToPhase(ctx, s.kClient, []corev1.Pod{*launcherPod}, corev1.PodRunning) + if err != nil { + t.Fatalf("Updating launcher Pods to Running phase: %v", err) + } + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: { + Active: 1, + }, + kubeflow.MPIReplicaTypeWorker: { + Active: 2, + }, + }) + s.events.verify(t) + + // 4. Suspend the running MPIJob + mpiJob.Spec.RunPolicy.Suspend = pointer.Bool(true) + mpiJob, err = s.mpiClient.KubeflowV2beta1().MPIJobs(mpiJob.Namespace).Update(ctx, mpiJob, metav1.UpdateOptions{}) + if err != nil { + t.Fatalf("Failed to suspend the MPIJob: %v", err) + } + err = s.kClient.CoreV1().Pods(launcherPod.Namespace).Delete(ctx, launcherPod.Name, metav1.DeleteOptions{}) + if err != nil { + t.Fatalf("Failed to delete mock pod for launcher Job: %v", err) + } + _, launcherJob = validateMPIJobDependencies(ctx, t, s.kClient, mpiJob, 0) + mpiJob = validateMPIJobStatus(ctx, t, s.mpiClient, mpiJob, map[kubeflow.MPIReplicaType]*kubeflow.ReplicaStatus{ + kubeflow.MPIReplicaTypeLauncher: {}, + kubeflow.MPIReplicaTypeWorker: {}, + }) + if !isJobSuspended(launcherJob) { + t.Errorf("LauncherJob is not suspended") + } + if !mpiJobHasCondition(mpiJob, kubeflow.JobSuspended) { + t.Errorf("MPIJob missing Suspended condition") + } + if !mpiJobHasConditionWithStatus(mpiJob, kubeflow.JobRunning, v1.ConditionFalse) { + t.Errorf("MPIJob has unexpected Running condition") + } +} + func TestMPIJobFailure(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) t.Cleanup(cancel) @@ -533,14 +699,22 @@ func hasVolumeForConfigMap(podSpec *corev1.PodSpec, cm *corev1.ConfigMap) bool { } func mpiJobHasCondition(job *kubeflow.MPIJob, cond kubeflow.JobConditionType) bool { + return mpiJobHasConditionWithStatus(job, cond, v1.ConditionTrue) +} + +func mpiJobHasConditionWithStatus(job *kubeflow.MPIJob, cond kubeflow.JobConditionType, status v1.ConditionStatus) bool { for _, c := range job.Status.Conditions { - if c.Type == cond { - return c.Status == corev1.ConditionTrue + if c.Type == cond && c.Status == status { + return true } } return false } +func isJobSuspended(job *batchv1.Job) bool { + return pointer.BoolDeref(job.Spec.Suspend, false) +} + func newInt32(v int32) *int32 { return &v }
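Taken together, the integration test above walks the full lifecycle: create suspended, resume, run, suspend again. Condensed into a standalone sketch — the job name, container image, and replica count are placeholders, and the clientset import path is an assumption based on the repository's generated client rather than code from this patch — creating an MPIJob that stays inert until resumed looks roughly like this:

package suspendexample

import (
	"context"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	common "github.com/kubeflow/common/pkg/apis/common/v1"
	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1"
	clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned"
)

// createSuspended submits an MPIJob with spec.runPolicy.suspend set to true.
// The controller records the Created and Suspended conditions, creates the
// launcher Job in a suspended state, and creates no worker Pods until the
// flag is flipped back to false.
func createSuspended(ctx context.Context, c clientset.Interface, namespace string) (*kubeflow.MPIJob, error) {
	workerReplicas := int32(2)
	mpiJob := &kubeflow.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Name: "pi", Namespace: namespace},
		Spec: kubeflow.MPIJobSpec{
			RunPolicy: kubeflow.RunPolicy{
				Suspend: pointer.Bool(true),
			},
			MPIReplicaSpecs: map[kubeflow.MPIReplicaType]*common.ReplicaSpec{
				kubeflow.MPIReplicaTypeLauncher: {
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							Containers: []corev1.Container{{Name: "main", Image: "mpi-image"}},
						},
					},
				},
				kubeflow.MPIReplicaTypeWorker: {
					Replicas: &workerReplicas,
					Template: corev1.PodTemplateSpec{
						Spec: corev1.PodSpec{
							Containers: []corev1.Container{{Name: "main", Image: "mpi-image"}},
						},
					},
				},
			},
		},
	}
	return c.KubeflowV2beta1().MPIJobs(namespace).Create(ctx, mpiJob, metav1.CreateOptions{})
}

Resuming is then a single update that sets Suspend back to false, after which the controller creates the worker Pods, unsuspends the launcher Job, flips the Suspended condition to False with reason MPIJobResumed, and sets a fresh status.startTime.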