Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cmd/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"time"

"github.com/spf13/pflag"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apiserver/pkg/apis/config"
"k8s.io/kubernetes/pkg/client/leaderelectionconfig"
)
Expand Down Expand Up @@ -69,4 +69,4 @@ func (t *TrainingJobOperatorOption) AddFlags(fs *pflag.FlagSet) {
fs.DurationVar(&t.CreatingDurationTime, "creating-duration-period", t.CreatingDurationTime, "The period time of creating container")
fs.BoolVar(&t.EnableCreatingFailed, "enable-creating-failed", t.EnableCreatingFailed, "set job failed if containers have been creating excceed creating-restart-period.")
leaderelectionconfig.BindFlags(&t.LeaderElection, fs)
}
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ require (
github.com/coreos/etcd v3.3.22+incompatible // indirect
github.com/coreos/go-semver v0.3.0 // indirect
github.com/coreos/go-systemd v0.0.0-20180201102105-176f85496f4e // indirect
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7ac // indirect
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/dgrijalva/jwt-go v3.2.0+incompatible // indirect
github.com/docker/docker v1.13.1+incompatible // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
Expand Down
5 changes: 2 additions & 3 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ github.com/coreos/go-semver v0.3.0 h1:wkHLiw0WNATZnSG7epLsujiMCgPAc9xhjJ4tgnAxmf
github.com/coreos/go-semver v0.3.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
github.com/coreos/go-systemd v0.0.0-20180201102105-176f85496f4e h1:Ug6mBD9bXZ40KoHHvg4vH7EcJAlm6y43MTrO7Mzv7gg=
github.com/coreos/go-systemd v0.0.0-20180201102105-176f85496f4e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7ac h1:w01sHbSV7G60T5n+8AiLGi9eciuXp687ufXPvB9dUUQ=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7ac/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
Expand Down Expand Up @@ -161,7 +161,6 @@ github.com/kubernetes-sigs/kube-batch v0.4.2 h1:CcFsYiHz1d9adH/jz7nFmjkdZnkPoq9o
github.com/kubernetes-sigs/kube-batch v0.4.2/go.mod h1:UI2ZUDvosfoXEO3dbyttgTBXsYMSvqBJ7CL84giPlxQ=
github.com/kubernetes/api v0.0.0-20190222213804-5cb15d344471 h1:arxwmCNOF7deAm6SuG3pBDAKPrs4/PJ5fsSvTc41GEk=
github.com/kubernetes/api v0.0.0-20190222213804-5cb15d344471/go.mod h1:o5K7QWn2BV3MxynyVmAM1hhJxCL/FTrSymRIVwXDw58=
github.com/kubernetes/code-generator v0.0.0-20181117043124-c2090bec4d9b/go.mod h1:GXXF2gdS/LDGLo7HilXGuIQfo5nLQDhB4eUn/2UMPnk=
github.com/kubernetes/kube-openapi v0.0.0-20180719232738-d8ea2fe547a4 h1:Eivl3ga1o75VOk5L1hS/ARWe66fmbc63ieOobAvCWGU=
github.com/kubernetes/kube-openapi v0.0.0-20180719232738-d8ea2fe547a4/go.mod h1:yc69piAq6DZCX1Cett9uZEiB1Lyw4TVy4E58dGKwHFc=
github.com/mailru/easyjson v0.0.0-20180823135443-60711f1a8329 h1:2gxZ0XQIU/5z3Z3bUBu+FXuk2pFbkN6tcwi/pjyaDic=
Expand Down
9 changes: 5 additions & 4 deletions pkg/apis/aitrainingjob/validation/validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,14 @@ package validation

import (
"fmt"

trainingjobv1 "github.com/elasticdeeplearning/trainingjob-operator/pkg/apis/aitrainingjob/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog"
)

// ValidateV1TrainingJobSpec checks that the v1.Trainingjob is valid.
func ValidateV1TrainingJobSpec(job *trainingjobv1.Trainingjob) error {
return validateV1ReplicaSpecs(job.ReplicaSpecs)
func ValidateV1TrainingJobSpec(jobSpec *trainingjobv1.TrainingJobSpec) error {
return validateV1ReplicaSpecs(jobSpec.ReplicaSpecs)
}

func validateV1ReplicaSpecs(specs map[trainingjobv1.ReplicaName]*trainingjobv1.ReplicaSpec) error {
Expand All @@ -23,7 +24,7 @@ func validateV1ReplicaSpecs(specs map[trainingjobv1.ReplicaName]*trainingjobv1.R
for _, container := range value.Template.Spec.Containers {
if container.Image == "" {
msg := fmt.Sprintf("Trainingjob is not valid: Image is undefined in the container of %v", rType)
log.Error(msg)
klog.Error(msg)
return fmt.Errorf(msg)
}
}
Expand Down
62 changes: 33 additions & 29 deletions pkg/controller/pod.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

trainingjobv1 "github.com/elasticdeeplearning/trainingjob-operator/pkg/apis/aitrainingjob/v1"
"github.com/kubeflow/common/job_controller"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
Expand All @@ -17,7 +18,6 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/klog"
"k8s.io/kubernetes/pkg/controller"
"github.com/kubeflow/common/job_controller"
)

func (tc *TrainingJobController) addPod(obj interface{}) {
Expand Down Expand Up @@ -216,14 +216,14 @@ func (tc *TrainingJobController) reconcilePods(
(spec.RestartLimit != nil && job.Status.RestartCountes[rtype] < *spec.RestartLimit) {
updateRestartCount(job, rtype)
msg = fmt.Sprintf("restart times is %d, %s ", job.Status.RestartCountes[rtype], msg)
// Restart the pod
// Restart the pod
if spec.RestartScope == trainingjobv1.RestartScopePod {
klog.Warningf("According to restartscope, need to restart the pod: %v.%v", pod.Namespace, pod.Name)
deletepod(pod.Namespace, pod.Name, job)
updateTrainingJobReplicaStatuses(job, rtype, replicaPods)
return nil, trainingjobv1.TrainingJobPhaseRestarting, msg
}
// Restart the replica pods
// Restart the replica pods
if spec.RestartScope == trainingjobv1.RestartScopeReplica {
klog.Warningf("According to restartscope, need to restart all pods of the replica: %v", rtype)
for _, podSlice := range podSlices {
Expand All @@ -234,9 +234,9 @@ func (tc *TrainingJobController) reconcilePods(
updateTrainingJobReplicaStatuses(job, rtype, replicaPods)
return nil, trainingjobv1.TrainingJobPhaseRestarting, msg
}
// Restart all pods
// Restart all pods
if spec.RestartScope == trainingjobv1.RestartScopeAll {
klog.Warningf("According to restartscope, need to restart all pods", )
klog.Warningf("According to restartscope, need to restart all pods")
for _, pod := range pods {
deletepod(pod.Namespace, pod.Name, job)
}
Expand Down Expand Up @@ -399,7 +399,7 @@ func (tc *TrainingJobController) reconcileContainers(
message = fmt.Sprintf("%s, %s", pod.Status.Reason, pod.Status.Message)
}
}
klog.Infof("message %v",message)
klog.Infof("message %v", message)
phase = trainingjobv1.TrainingJobPhaseFailed
return phase, isRestart, message
}
Expand Down Expand Up @@ -571,57 +571,57 @@ func (tc *TrainingJobController) setEnv(
replicaName := strings.ToUpper(string(rt))
hostsEnv = append(hostsEnv,
[]corev1.EnvVar{
corev1.EnvVar{
{
Name: fmt.Sprintf("%s_INSTANCES", replicaName),
Value: strings.Join(instances, ","),
},
corev1.EnvVar{
{
Name: fmt.Sprintf("%s_INSTANCES_NUM", replicaName),
Value: strconv.Itoa(len(instances)),
},
corev1.EnvVar{
{
Name: fmt.Sprintf("%s_PORTS", replicaName),
Value: strings.Join(portsStr, ","),
},
corev1.EnvVar{
{
Name: fmt.Sprintf("%s_PORTS_NUM", replicaName),
Value: strconv.Itoa(len(portsStr)),
},
corev1.EnvVar{
{
Name: fmt.Sprintf("%s_HOSTS", replicaName),
Value: strings.Join(hosts, ","),
},
corev1.EnvVar{
{
Name: fmt.Sprintf("%s_HOSTS_NUM", replicaName),
Value: strconv.Itoa(len(hosts)),
},
}...)
}
hostsEnv = append(hostsEnv,
[]corev1.EnvVar{
corev1.EnvVar{
{
Name: trainingjobv1.TrainingJobReplicaNameEnv,
Value: rtype,
},
corev1.EnvVar{
{
Name: trainingjobv1.TrainingJobReplicaIndexEnv,
Value: index,
},
corev1.EnvVar{
{
Name: trainingjobv1.TrainingJobReplicaRestartCountEnv,
Value: restartCount,
},
corev1.EnvVar{
{
Name: trainingjobv1.TrainingJobServiceEnv,
Value: fmt.Sprintf("%v.%v",
tc.GenGeneralName(job.Name, rtype, index),
job.Namespace),
},
corev1.EnvVar{
{
Name: trainingjobv1.TrainingJobNameEnv,
Value: job.Name,
},
corev1.EnvVar{
{
Name: trainingjobv1.TrainingJobNamespaceEnv,
Value: job.Namespace,
},
Expand Down Expand Up @@ -651,46 +651,50 @@ func (tc *TrainingJobController) setEnv(
return nil
}

// FilterPodsForReplicaType returns the pods with type replicaType
func (tc *TrainingJobController) FilterPodsForReplicaType(pods []*corev1.Pod, replicaType string) ([]*corev1.Pod, error) {
var result []*corev1.Pod

replicaSelector := &metav1.LabelSelector{
MatchLabels: make(map[string]string),
MatchLabels: map[string]string{
trainingjobv1.TrainingJobReplicaName: replicaType,
},
}

replicaSelector.MatchLabels[trainingjobv1.TrainingJobReplicaName] = replicaType

for _, pod := range pods {
selector, err := metav1.LabelSelectorAsSelector(replicaSelector)
if err != nil {
return nil, err
}
if !selector.Matches(labels.Set(pod.Labels)) {
continue
if selector.Matches(labels.Set(pod.Labels)) {
result = append(result, pod)
}
result = append(result, pod)
}
return result, nil
}

// GetPodSlices slices pods based on their index label.
// The ith element in the returned slice is a slice of pods with index i.
func (tc *TrainingJobController) GetPodSlices(pods []*corev1.Pod, replicas int) [][]*corev1.Pod {
podSlices := make([][]*corev1.Pod, replicas)
for _, pod := range pods {
if _, ok := pod.Labels[trainingjobv1.TrainingJobReplicaIndex]; !ok {
indexstr, ok := pod.Labels[trainingjobv1.TrainingJobReplicaIndex]
if !ok {
klog.Warning("The pod do not have the index label.")
continue
}
index, err := strconv.Atoi(pod.Labels[trainingjobv1.TrainingJobReplicaIndex])
index, err := strconv.Atoi(indexstr)
if err != nil {
klog.Warningf("Error when strconv.Atoi: %v", err)
continue
}
if index < 0 || index >= replicas {
klog.Warningf("The label index is not expected: %d", index)
} else {
klog.V(4).Infof("index %d, pod %v", index, pod.Name)
podSlices[index] = append(podSlices[index], pod)
continue
}

klog.V(4).Infof("index %d, pod %v", index, pod.Name)
podSlices[index] = append(podSlices[index], pod)
}
return podSlices
}