Merged
12 changes: 12 additions & 0 deletions crd/kubeflow.org_mpijobs.yaml
@@ -7866,6 +7866,18 @@ spec:
format: int32
type: integer
type: object
suspend:
default: false
description: "suspend specifies whether the MPIJob controller
should create Pods or not. If a MPIJob is created with suspend
set to true, no Pods are created by the MPIJob controller. If
a MPIJob is suspended after creation (i.e. the flag goes from
false to true), the MPIJob controller will delete all active
Pods and PodGroups associated with this MPIJob. Also, it will
suspend the Launcher Job. Users must design their workload to
gracefully handle this. Suspending a Job will reset the StartTime
field of the MPIJob. \n Defaults to false."
type: boolean
ttlSecondsAfterFinished:
description: TTLSecondsAfterFinished is the TTL to clean up jobs.
It may take extra ReconcilePeriod seconds for the cleanup, since
2 changes: 1 addition & 1 deletion go.mod
@@ -17,6 +17,7 @@ require (
k8s.io/klog v1.0.0
k8s.io/kube-openapi v0.0.0-20230109183929-3758b55a6596
k8s.io/sample-controller v0.25.6
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed
sigs.k8s.io/controller-runtime v0.13.1
volcano.sh/apis v1.7.0
)
@@ -73,7 +74,6 @@ require (
k8s.io/component-base v0.25.6 // indirect
k8s.io/gengo v0.0.0-20211129171323-c02415ce4185 // indirect
k8s.io/klog/v2 v2.70.1 // indirect
k8s.io/utils v0.0.0-20220728103510-ee6ede2d64ed // indirect
sigs.k8s.io/json v0.0.0-20220713155537-f223a00ba0e2 // indirect
sigs.k8s.io/structured-merge-diff/v4 v4.2.3 // indirect
sigs.k8s.io/yaml v1.3.0 // indirect
4 changes: 3 additions & 1 deletion manifests/base/crd.yaml
@@ -49,6 +49,8 @@ spec:
type: integer
minimum: 0
description: "Specifies the number of retries before marking the launcher Job as failed. Defaults to 6."
suspend:
type: boolean
sshAuthMountPath:
type: string
mpiImplementation:
@@ -94,7 +96,7 @@ spec:
properties:
type:
type: string
enum: ["Created", "Running", "Restarting", "Succeeded", "Failed"]
enum: ["Created", "Running", "Restarting", "Succeeded", "Suspended", "Failed"]
Contributor:
Was this autogenerated?
If so, I'm curious to know how it worked.

Maybe this? https://github.com/kubeflow/common/blob/9ec55d141f90faaf52fd6df271e987e5a6781945/pkg/apis/common/v1/types.go#L112

We should probably use it in Kueue, where applicable

Contributor Author:
Nope, I updated it manually.

Contributor:
Uhm... then make sure that make generate doesn't override it.

Contributor Author:
it doesn't, checked

Contributor:
@tenzen-y is this expected?
Or is this related to your other PR?

Contributor:
We can leave this to #510

Member:
Yes, this is a known issue, and this will be fixed by #510.

Contributor Author:
Yeah, but it is needed for the e2e test (which I believe is worth adding).

Contributor:
Yeah, keep the manual change, and #510 should automate it.

status:
type: string
enum: ["True", "False", "Unknown"]
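For API consumers, the new "Suspended" entry means suspension can be detected from .status.conditions alone. The following is a minimal illustrative sketch (not part of this PR); it assumes the repository's module path and that JobCondition exposes Type and Status fields, matching how updateMPIJobConditions and getCondition are used in the controller changes below.

package example

import (
	corev1 "k8s.io/api/core/v1"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" // assumed module path
)

// hasSuspendedCondition reports whether the MPIJob currently carries the
// Suspended condition with status True.
func hasSuspendedCondition(job *kubeflow.MPIJob) bool {
	for _, cond := range job.Status.Conditions {
		if cond.Type == kubeflow.JobSuspended && cond.Status == corev1.ConditionTrue {
			return true
		}
	}
	return false
}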
7 changes: 7 additions & 0 deletions pkg/apis/kubeflow/v2beta1/openapi_generated.go

Some generated files are not rendered by default.

4 changes: 4 additions & 0 deletions pkg/apis/kubeflow/v2beta1/swagger.json
@@ -399,6 +399,10 @@
"description": "SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling",
"$ref": "#/definitions/v2beta1.SchedulingPolicy"
},
"suspend": {
"description": "suspend specifies whether the MPIJob controller should create Pods or not. If a MPIJob is created with suspend set to true, no Pods are created by the MPIJob controller. If a MPIJob is suspended after creation (i.e. the flag goes from false to true), the MPIJob controller will delete all active Pods and PodGroups associated with this MPIJob. Also, it will suspend the Launcher Job. Users must design their workload to gracefully handle this. Suspending a Job will reset the StartTime field of the MPIJob.\n\nDefaults to false.",
"type": "boolean"
},
"ttlSecondsAfterFinished": {
"description": "TTLSecondsAfterFinished is the TTL to clean up jobs. It may take extra ReconcilePeriod seconds for the cleanup, since reconcile gets called periodically. Default to infinite.",
"type": "integer",
15 changes: 15 additions & 0 deletions pkg/apis/kubeflow/v2beta1/types.go
@@ -84,6 +84,18 @@ type RunPolicy struct {
// SchedulingPolicy defines the policy related to scheduling, e.g. gang-scheduling
// +optional
SchedulingPolicy *SchedulingPolicy `json:"schedulingPolicy,omitempty"`

// suspend specifies whether the MPIJob controller should create Pods or not.
// If a MPIJob is created with suspend set to true, no Pods are created by
// the MPIJob controller. If a MPIJob is suspended after creation (i.e. the
// flag goes from false to true), the MPIJob controller will delete all
// active Pods and PodGroups associated with this MPIJob. Also, it will suspend the
// Launcher Job. Users must design their workload to gracefully handle this.
// Suspending a Job will reset the StartTime field of the MPIJob.
//
// Defaults to false.
// +kubebuilder:default:=false
Suspend *bool `json:"suspend,omitempty"`
}

type MPIJobSpec struct {
@@ -239,6 +251,9 @@ const (
// The training is complete without error.
JobSucceeded JobConditionType = "Succeeded"

// JobSuspended means the job has been suspended.
JobSuspended JobConditionType = "Suspended"

// JobFailed means one or more sub-resources (e.g. services/pods) of this job
// reached phase failed with no restarting.
// The training has failed its execution.
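The new RunPolicy.Suspend field is what a queueing layer such as Kueue would set when admitting jobs. Below is a minimal sketch of creating an MPIJob that starts suspended; it is illustrative only, assumes the module path above, and omits MPIReplicaSpecs and the other required fields.

package example

import (
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/utils/pointer"

	kubeflow "github.com/kubeflow/mpi-operator/pkg/apis/kubeflow/v2beta1" // assumed module path
)

// suspendedMPIJob returns an MPIJob whose launcher Job and worker Pods will not
// be started until .spec.runPolicy.suspend is flipped back to false.
func suspendedMPIJob(name, namespace string) *kubeflow.MPIJob {
	return &kubeflow.MPIJob{
		ObjectMeta: metav1.ObjectMeta{Name: name, Namespace: namespace},
		Spec: kubeflow.MPIJobSpec{
			RunPolicy: kubeflow.RunPolicy{
				Suspend: pointer.Bool(true),
			},
			// MPIReplicaSpecs and the remaining required fields are omitted for brevity.
		},
	}
}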
5 changes: 5 additions & 0 deletions pkg/apis/kubeflow/v2beta1/zz_generated.deepcopy.go

Some generated files are not rendered by default.

134 changes: 107 additions & 27 deletions pkg/controller/mpi_job_controller.go
@@ -33,6 +33,7 @@ import (
"golang.org/x/crypto/ssh"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -50,6 +51,8 @@ import (
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"k8s.io/klog"
"k8s.io/utils/clock"
"k8s.io/utils/pointer"
podgroupv1beta1 "volcano.sh/apis/pkg/apis/scheduling/v1beta1"
volcanoclient "volcano.sh/apis/pkg/client/clientset/versioned"
podgroupsinformer "volcano.sh/apis/pkg/client/informers/externalversions/scheduling/v1beta1"
@@ -243,6 +246,9 @@ type MPIJobController struct {

// To allow injection of updateStatus for testing.
updateStatusHandler func(mpijob *kubeflow.MPIJob) error

// Clock for internal use of unit-testing
clock clock.WithTicker
}

// NewMPIJobController returns a new MPIJob controller.
@@ -258,6 +264,26 @@ func NewMPIJobController(
podgroupsInformer podgroupsinformer.PodGroupInformer,
mpiJobInformer informers.MPIJobInformer,
gangSchedulerName string) *MPIJobController {
return NewMPIJobControllerWithClock(kubeClient, kubeflowClient, volcanoClientSet,
configMapInformer, secretInformer, serviceInformer, jobInformer,
podInformer, podgroupsInformer, mpiJobInformer, gangSchedulerName,
&clock.RealClock{})
}

// NewMPIJobController returns a new MPIJob controller.
func NewMPIJobControllerWithClock(
kubeClient kubernetes.Interface,
kubeflowClient clientset.Interface,
volcanoClientSet volcanoclient.Interface,
configMapInformer coreinformers.ConfigMapInformer,
secretInformer coreinformers.SecretInformer,
serviceInformer coreinformers.ServiceInformer,
jobInformer batchinformers.JobInformer,
podInformer coreinformers.PodInformer,
podgroupsInformer podgroupsinformer.PodGroupInformer,
mpiJobInformer informers.MPIJobInformer,
gangSchedulerName string,
clock clock.WithTicker) *MPIJobController {

// Create event broadcaster.
klog.V(4).Info("Creating event broadcaster")
@@ -294,6 +320,7 @@ func NewMPIJobController(
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "MPIJobs"),
recorder: recorder,
gangSchedulerName: gangSchedulerName,
clock: clock,
}

controller.updateStatusHandler = controller.doUpdateJobStatus
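NewMPIJobControllerWithClock exists so tests can inject a deterministic clock instead of the real one. A sketch of how a unit test might use it with the fake clock from k8s.io/utils (illustrative only; the client and informer arguments are elided, and the exact test wiring is an assumption):

package example

import (
	"time"

	testingclock "k8s.io/utils/clock/testing"
)

// newFrozenClock returns a fake clock pinned to a fixed instant. FakeClock
// satisfies clock.WithTicker, so it can be passed as the last argument of
// NewMPIJobControllerWithClock; a test can then advance time explicitly with
// Step(...) and assert on StartTime handling deterministically.
func newFrozenClock() *testingclock.FakeClock {
	return testingclock.NewFakeClock(time.Date(2023, time.January, 1, 0, 0, 0, 0, time.UTC))
}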
@@ -449,9 +476,9 @@ func (c *MPIJobController) processNextWorkItem() bool {
// converge the two. It then updates the Status block of the MPIJob resource
// with the current status of the resource.
func (c *MPIJobController) syncHandler(key string) error {
startTime := time.Now()
startTime := c.clock.Now()
defer func() {
klog.Infof("Finished syncing job %q (%v)", key, time.Since(startTime))
klog.Infof("Finished syncing job %q (%v)", key, c.clock.Since(startTime))
}()

// Convert the namespace/name string into a distinct namespace and name.
@@ -493,7 +520,7 @@ func (c *MPIJobController) syncHandler(key string) error {

if len(mpiJob.Status.Conditions) == 0 {
msg := fmt.Sprintf("MPIJob %s/%s is created.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobCreated, mpiJobCreatedReason, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobCreated, v1.ConditionTrue, mpiJobCreatedReason, msg)
c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobCreated", msg)
mpiJobsCreatedCount.Inc()
}
@@ -503,24 +530,16 @@
// cleanup and stop retrying the MPIJob.
if isFinished(mpiJob.Status) && mpiJob.Status.CompletionTime != nil {
if isCleanUpPods(mpiJob.Spec.RunPolicy.CleanPodPolicy) {
// set worker StatefulSet Replicas to 0.
if err := c.deleteWorkerPods(mpiJob); err != nil {
if err := cleanUpWorkerPods(mpiJob, c); err != nil {
return err
}
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
if c.gangSchedulerName != "" {
if err := c.deletePodGroups(mpiJob); err != nil {
return err
}
}
mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0
return c.updateStatusHandler(mpiJob)
}
return nil
}

// first set StartTime.
if mpiJob.Status.StartTime == nil {
if mpiJob.Status.StartTime == nil && !isMPIJobSuspended(mpiJob) {
Contributor:
If the job was suspended, we should reset the StartTime. Double-check how we do it in the Job controller.

Contributor Author:
Done: I've added the JobSuspended condition and replicated the semantics around StartTime.

However, the .spec.runPolicy.activeDeadlineSeconds is actually respected for MPIJobs via batch.Job:

ActiveDeadlineSeconds: mpiJob.Spec.RunPolicy.ActiveDeadlineSeconds,

This means that the changes aren't strictly required to enforce the timeout. WDYT?

Contributor:
Yay for using Job!

now := metav1.Now()
mpiJob.Status.StartTime = &now
}
@@ -549,17 +568,18 @@ func (c *MPIJobController) syncHandler(key string) error {
return fmt.Errorf("creating SSH auth secret: %w", err)
}

// Get the PodGroup for this MPIJob
if c.gangSchedulerName != "" {
if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
if !isMPIJobSuspended(mpiJob) {
// Get the PodGroup for this MPIJob
if c.gangSchedulerName != "" {
if podgroup, err := c.getOrCreatePodGroups(mpiJob, workerReplicas(mpiJob)+1); podgroup == nil || err != nil {
return err
}
}
worker, err = c.getOrCreateWorker(mpiJob)
Contributor:
Also create the podgroup conditionally?

Contributor Author:
Now I create all resources even while the MPIJob is suspended. This is the common workflow for Kueue, where the Job is created suspended. In some cases, when the job never gets unsuspended (for whatever reason), we could save on creating the objects.

Contributor Author:
For now I reverted the change (sorry for the back and forth); this is just to minimize the diff, as the unit tests currently verify the objects are created. Let me know what you think.

if err != nil {
return err
}
}

worker, err = c.getOrCreateWorker(mpiJob)
if err != nil {
return err
}
if mpiJob.Spec.MPIImplementation == kubeflow.MPIImplementationIntel {
// The Intel implementation requires workers to communicate with the
// launcher through its hostname. For that, we create a Service which
@@ -578,6 +598,23 @@ func (c *MPIJobController) syncHandler(key string) error {
}
}

if launcher != nil {
if isMPIJobSuspended(mpiJob) != isJobSuspended(launcher) {
// align the suspension state of launcher with the MPIJob
launcher.Spec.Suspend = pointer.Bool(isMPIJobSuspended(mpiJob))
if _, err := c.kubeClient.BatchV1().Jobs(namespace).Update(context.TODO(), launcher, metav1.UpdateOptions{}); err != nil {
return err
}
}
}

// cleanup the running worker pods if the MPI job is suspended
if isMPIJobSuspended(mpiJob) {
if err := cleanUpWorkerPods(mpiJob, c); err != nil {
return err
}
}

// Finally, we update the status block of the MPIJob resource to reflect the
// current state of the world.
err = c.updateMPIJobStatus(mpiJob, launcher, worker)
@@ -588,6 +625,21 @@
return nil
}

func cleanUpWorkerPods(mpiJob *kubeflow.MPIJob, c *MPIJobController) error {
// set worker StatefulSet Replicas to 0.
if err := c.deleteWorkerPods(mpiJob); err != nil {
return err
}
initializeMPIJobStatuses(mpiJob, kubeflow.MPIReplicaTypeWorker)
if c.gangSchedulerName != "" {
if err := c.deletePodGroups(mpiJob); err != nil {
return err
}
}
Comment on lines +634 to +638
Contributor:
@tenzen-y are you familiar with the volcano integration?

I wonder if we need to remove the pod group on suspension. Does it matter?

Contributor Author:
Do podgroups allocate any node resources, like pods do, or are they just API objects (like services)?

I do not delete services and other API objects on suspension. I also don't delete the launcher job, just suspend it.

Contributor:
AFAIK, a podgroup is just a declaration that some pods should be treated as a unit. But a podgroup doesn't create other objects.

Member (@tenzen-y), Jan 27, 2023:
The scheduling.volcano.sh/v1beta1 PodGroup also has queueing logic, so we might need to delete the PodGroup to re-queue Pods.

Member:
Maybe we should create an E2E test with Volcano in a separate PR.

Contributor:
Probably someone with Volcano experience should do it.

Contributor Author:
I looked at the Volcano code a little bit on my own, but could not conclude whether it might create the pods under some scenarios.

Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT?

+1 for the idea of an e2e test employing podgroups. I guess we could create a follow-up issue and ask for people willing to do it.

Member:
Thus, for now, I delete the PodGroups as well, to be on the safe side. WDYT?

That sounds good to me.

mpiJob.Status.ReplicaStatuses[kubeflow.MPIReplicaTypeWorker].Active = 0
return nil
}

// getLauncherJob gets the launcher Job controlled by this MPIJob.
func (c *MPIJobController) getLauncherJob(mpiJob *kubeflow.MPIJob) (*batchv1.Job, error) {
launcher, err := c.jobLister.Jobs(mpiJob.Namespace).Get(mpiJob.Name + launcherSuffix)
@@ -857,6 +909,14 @@ func (c *MPIJobController) getOrCreateWorker(mpiJob *kubeflow.MPIJob) ([]*corev1
return workerPods, nil
}

func isMPIJobSuspended(mpiJob *kubeflow.MPIJob) bool {
return pointer.BoolDeref(mpiJob.Spec.RunPolicy.Suspend, false)
}

func isJobSuspended(job *batchv1.Job) bool {
return pointer.BoolDeref(job.Spec.Suspend, false)
}

func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {
var (
workerPrefix = mpiJob.Name + workerSuffix
@@ -901,6 +961,19 @@ func (c *MPIJobController) deleteWorkerPods(mpiJob *kubeflow.MPIJob) error {

func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher *batchv1.Job, worker []*corev1.Pod) error {
oldStatus := mpiJob.Status.DeepCopy()
if isMPIJobSuspended(mpiJob) {
// it is suspended now
if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionTrue, mpiJobSuspendedReason, "MPIJob suspended") {
c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobSuspended", "MPIJob suspended")
}
} else if getCondition(mpiJob.Status, kubeflow.JobSuspended) != nil {
// it is not suspended now, consider resumed if the condition was set before
if updateMPIJobConditions(mpiJob, kubeflow.JobSuspended, v1.ConditionFalse, mpiJobResumedReason, "MPIJob resumed") {
c.recorder.Event(mpiJob, corev1.EventTypeNormal, "MPIJobResumed", "MPIJob resumed")
now := metav1.NewTime(c.clock.Now())
mpiJob.Status.StartTime = &now
}
}
launcherPods, err := c.jobPods(launcher)
if err != nil {
return fmt.Errorf("checking launcher pods running: %w", err)
@@ -919,7 +992,7 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
if mpiJob.Status.CompletionTime == nil {
mpiJob.Status.CompletionTime = launcher.Status.CompletionTime
}
updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, mpiJobSucceededReason, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobSucceeded, v1.ConditionTrue, mpiJobSucceededReason, msg)
mpiJobsSuccessCount.Inc()
} else if isJobFailed(launcher) {
c.updateMPIJobFailedStatus(mpiJob, launcher, launcherPods)
@@ -953,13 +1026,16 @@ func (c *MPIJobController) updateMPIJobStatus(mpiJob *kubeflow.MPIJob, launcher
if evict > 0 {
msg := fmt.Sprintf("%d/%d workers are evicted", evict, len(worker))
klog.Infof("MPIJob <%s/%s>: %v", mpiJob.Namespace, mpiJob.Name, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobFailed, mpiJobEvict, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, mpiJobEvict, msg)
c.recorder.Event(mpiJob, corev1.EventTypeWarning, mpiJobEvict, msg)
}

if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
if isMPIJobSuspended(mpiJob) {
msg := fmt.Sprintf("MPIJob %s/%s is suspended.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionFalse, mpiJobSuspendedReason, msg)
} else if launcher != nil && launcherPodsCnt >= 1 && running == len(worker) {
msg := fmt.Sprintf("MPIJob %s/%s is running.", mpiJob.Namespace, mpiJob.Name)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, mpiJobRunningReason, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobRunning, v1.ConditionTrue, mpiJobRunningReason, msg)
c.recorder.Eventf(mpiJob, corev1.EventTypeNormal, "MPIJobRunning", "MPIJob %s/%s is running", mpiJob.Namespace, mpiJob.Name)
}

@@ -999,7 +1075,7 @@ func (c *MPIJobController) updateMPIJobFailedStatus(mpiJob *kubeflow.MPIJob, lau
now := metav1.Now()
mpiJob.Status.CompletionTime = &now
}
updateMPIJobConditions(mpiJob, kubeflow.JobFailed, reason, msg)
updateMPIJobConditions(mpiJob, kubeflow.JobFailed, v1.ConditionTrue, reason, msg)
mpiJobsFailureCount.Inc()
}

@@ -1304,7 +1380,7 @@ func (c *MPIJobController) newWorker(mpiJob *kubeflow.MPIJob, index int) *corev1
}

func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job {
return &batchv1.Job{
job := &batchv1.Job{
Contributor:
Food for thought:
We will probably need some kind of Job annotation that tells Kueue that this Job is already queued as part of a higher-level object (MPIJob in this case), so that we simply ignore it.

Contributor Author:
Good point. IIUC, this affects only Kueue configurations with ManageJobsWithoutQueueName=true, which is non-default. We could have an annotation, yes, or just have Kueue not manage any Job objects that have an OwnerReference to another object managed by Kueue.

Contributor:
or just have Kueue not manage any Job objects that have an OwnerReference to another object managed by Kueue.

The dependency could be indirect. And we don't want to spend a GET call to obtain such information.

Contributor Author:
I see, but we should still strive to keep the integration interface as small as possible; every extension to the surface of the interface will be multiplied by the number of projects. But maybe a new annotation is not that bad. Also, maybe we can have a hybrid approach.

ObjectMeta: metav1.ObjectMeta{
Name: mpiJob.Name + launcherSuffix,
Namespace: mpiJob.Namespace,
@@ -1322,6 +1398,10 @@ func (c *MPIJobController) newLauncherJob(mpiJob *kubeflow.MPIJob) *batchv1.Job
Template: c.newLauncherPodTemplate(mpiJob),
},
}
if isMPIJobSuspended(mpiJob) {
job.Spec.Suspend = pointer.Bool(true)
}
return job
}

// newLauncherPodTemplate creates a new launcher Job for an MPIJob resource. It also sets
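Since newLauncherJob now propagates the suspension into the launcher's .spec.suspend, resuming is just a matter of flipping the MPIJob field: the controller unsuspends the launcher, recreates the workers (and the PodGroup when gang scheduling is enabled), and resets StartTime. A client-side sketch follows; it is illustrative only, and both the generated clientset path and the KubeflowV2beta1 accessor are assumptions about this repository's generated client. A dynamic client issuing the same merge patch would work equally well.

package example

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"

	clientset "github.com/kubeflow/mpi-operator/pkg/client/clientset/versioned" // assumed path
)

// resumeMPIJob flips .spec.runPolicy.suspend to false with a merge patch.
func resumeMPIJob(ctx context.Context, c clientset.Interface, namespace, name string) error {
	patch := []byte(`{"spec":{"runPolicy":{"suspend":false}}}`)
	_, err := c.KubeflowV2beta1().MPIJobs(namespace).Patch(ctx, name, types.MergePatchType, patch, metav1.PatchOptions{})
	return err
}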