Skip to content

Commit 57937d1

Browse files
authored
Merge pull request #14 from m3ngyang/tj-parser
jobparser for crd TrainingJob
2 parents 7b540c1 + 39f1380 commit 57937d1

File tree

5 files changed

+391
-24
lines changed

5 files changed

+391
-24
lines changed

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
*~
22
vendor/
33
.glide/
4+
.vscode/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package v1
2+
3+
import (
4+
"encoding/json"
5+
"fmt"
6+
)
7+
8+
// Elastic returns true if the job can scale to more workers.
9+
func (s *TrainingJob) Elastic() bool {
10+
return s.Spec.Trainer.MinInstance < s.Spec.Trainer.MaxInstance
11+
}
12+
13+
// GPU convert Resource Limit Quantity to int
14+
func (s *TrainingJob) GPU() int {
15+
q := s.Spec.Trainer.Resources.Limits.NvidiaGPU()
16+
gpu, ok := q.AsInt64()
17+
if !ok {
18+
// FIXME: treat errors
19+
gpu = 0
20+
}
21+
return int(gpu)
22+
}
23+
24+
// NeedGPU returns true if the job need GPU resource to run.
25+
func (s *TrainingJob) NeedGPU() bool {
26+
return s.GPU() > 0
27+
}
28+
29+
func (s *TrainingJob) String() string {
30+
b, _ := json.MarshalIndent(s, "", " ")
31+
return fmt.Sprintf("%s", b)
32+
}

pkg/apis/paddlepaddle/v1/types.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ type TrainingJobSpec struct {
5555
Passes int `json:"passes,omitempty"`
5656
Volumes []corev1.Volume `json:"volumes"`
5757
VolumeMounts []corev1.VolumeMount `json:"VolumeMounts"`
58+
// TODO(m3ngyang): simplify the structure of the sub-resources.
5859
//TrainingJob components.
5960
Master MasterSpec `json:"master"`
6061
Pserver PserverSpec `json:"pserver"`
@@ -111,12 +112,12 @@ type TrainerJobScaleStatus struct {
111112
type TrainingResourceType string
112113

113114
const (
	// Master is the master name of TrainingResourceType.
	Master TrainingResourceType = "MASTER"
	// Pserver is the pserver name of TrainingResourceType.
	Pserver TrainingResourceType = "PSERVER"
	// Trainer is the trainer name of TrainingResourceType.
	Trainer TrainingResourceType = "TRAINER"
)
121122

122123
// ResourceState is the state of a type of resource

pkg/updater/jobparser.go

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
/* Copyright (c) 2016 PaddlePaddle Authors All Rights Reserved.
2+
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
7+
http://www.apache.org/licenses/LICENSE-2.0
8+
9+
Unless required by applicable law or agreed to in writing, software
10+
distributed under the License is distributed on an "AS IS" BASIS,
11+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
See the License for the specific language governing permissions and
13+
limitations under the License. */
14+
15+
package updater
16+
17+
import (
18+
"errors"
19+
"fmt"
20+
"strconv"
21+
22+
batchv1 "k8s.io/api/batch/v1"
23+
corev1 "k8s.io/api/core/v1"
24+
v1beta1 "k8s.io/api/extensions/v1beta1"
25+
apiresource "k8s.io/apimachinery/pkg/api/resource"
26+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27+
28+
paddlev1 "github.com/paddlepaddle/edl/pkg/apis/paddlepaddle/v1"
29+
)
30+
31+
const (
	// imagePullPolicy is applied to every container this parser creates.
	imagePullPolicy = "Always"
)
34+
35+
// DefaultJobParser is a basic JobParser implementation; it carries no
// configuration of its own.
type DefaultJobParser struct{}
38+
39+
// setDefaultAndValidate updates default values for the added job and validates the fields.
40+
func setDefaultAndValidate(job *paddlev1.TrainingJob) error {
41+
// Fill in default values
42+
// FIXME: Need to test. What is the value if specified "omitempty"
43+
if job.Spec.Port == 0 {
44+
job.Spec.Port = 7164
45+
}
46+
if job.Spec.PortsNum == 0 {
47+
job.Spec.PortsNum = 1
48+
}
49+
if job.Spec.PortsNumForSparse == 0 {
50+
job.Spec.PortsNumForSparse = 1
51+
}
52+
if job.Spec.Image == "" {
53+
job.Spec.Image = "paddlepaddle/paddlecloud-job"
54+
}
55+
if job.Spec.Passes == 0 {
56+
job.Spec.Passes = 1
57+
}
58+
59+
if !job.Spec.FaultTolerant && job.Elastic() {
60+
return errors.New("max-instances should equal to min-instances when fault_tolerant is disabled")
61+
}
62+
// TODO: add validations.(helin)
63+
return nil
64+
}
65+
66+
// NewTrainingJob generates a whole structure of TrainingJob
67+
func (p *DefaultJobParser) NewTrainingJob(job *paddlev1.TrainingJob) (*paddlev1.TrainingJob, error) {
68+
if err := setDefaultAndValidate(job); err != nil {
69+
return nil, err
70+
}
71+
72+
useHostNetwork := job.Spec.HostNetwork
73+
if job.Spec.FaultTolerant {
74+
job.Spec.Master.ReplicaSpec = parseToMaster(job)
75+
if useHostNetwork {
76+
job.Spec.Master.ReplicaSpec.Spec.Template.Spec.HostNetwork = true
77+
}
78+
}
79+
job.Spec.Pserver.ReplicaSpec = parseToPserver(job)
80+
job.Spec.Trainer.ReplicaSpec = parseToTrainer(job)
81+
if useHostNetwork {
82+
job.Spec.Pserver.ReplicaSpec.Spec.Template.Spec.HostNetwork = true
83+
job.Spec.Trainer.ReplicaSpec.Spec.Template.Spec.HostNetwork = true
84+
}
85+
return job, nil
86+
}
87+
88+
// parseToPserver generate a pserver replicaset resource according to "TrainingJob" resource specs.
89+
func parseToPserver(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet {
90+
replicas := int32(job.Spec.Pserver.MinInstance)
91+
var command []string
92+
// FIXME: refine these part.(typhoonzero)
93+
if job.Spec.FaultTolerant {
94+
command = []string{"paddle_k8s", "start_pserver"}
95+
} else {
96+
command = []string{"paddle_k8s", "start_new_pserver"}
97+
}
98+
99+
return &v1beta1.ReplicaSet{
100+
TypeMeta: metav1.TypeMeta{
101+
Kind: "extensions/v1beta1",
102+
APIVersion: "ReplicaSet",
103+
},
104+
ObjectMeta: metav1.ObjectMeta{
105+
Name: job.ObjectMeta.Name + "-pserver",
106+
Namespace: job.ObjectMeta.Namespace,
107+
},
108+
Spec: v1beta1.ReplicaSetSpec{
109+
Replicas: &replicas,
110+
Template: corev1.PodTemplateSpec{
111+
ObjectMeta: metav1.ObjectMeta{
112+
Labels: map[string]string{"paddle-job-pserver": job.ObjectMeta.Name},
113+
},
114+
Spec: corev1.PodSpec{
115+
Volumes: job.Spec.Volumes,
116+
Containers: []corev1.Container{
117+
corev1.Container{
118+
Name: "pserver",
119+
Image: job.Spec.Image,
120+
Ports: podPorts(job),
121+
Env: podEnv(job),
122+
Command: command,
123+
Resources: job.Spec.Pserver.Resources,
124+
},
125+
},
126+
},
127+
},
128+
},
129+
}
130+
}
131+
132+
// parseToTrainer parse TrainingJob to a kubernetes job resource.
133+
func parseToTrainer(job *paddlev1.TrainingJob) *batchv1.Job {
134+
replicas := int32(job.Spec.Trainer.MinInstance)
135+
var command []string
136+
if job.Spec.FaultTolerant {
137+
command = []string{"paddle_k8s", "start_trainer"}
138+
} else {
139+
command = []string{"paddle_k8s", "start_new_trainer"}
140+
}
141+
142+
return &batchv1.Job{
143+
TypeMeta: metav1.TypeMeta{
144+
Kind: "Job",
145+
APIVersion: "batch/v1",
146+
},
147+
ObjectMeta: metav1.ObjectMeta{
148+
Name: job.ObjectMeta.Name + "-trainer",
149+
Namespace: job.ObjectMeta.Namespace,
150+
},
151+
Spec: batchv1.JobSpec{
152+
Parallelism: &replicas,
153+
Template: corev1.PodTemplateSpec{
154+
ObjectMeta: metav1.ObjectMeta{
155+
Labels: map[string]string{"paddle-job": job.ObjectMeta.Name},
156+
},
157+
Spec: corev1.PodSpec{
158+
Volumes: job.Spec.Volumes,
159+
Containers: []corev1.Container{
160+
corev1.Container{
161+
Name: "trainer",
162+
Image: job.Spec.Image,
163+
ImagePullPolicy: imagePullPolicy,
164+
Command: command,
165+
VolumeMounts: job.Spec.VolumeMounts,
166+
Ports: podPorts(job),
167+
Env: podEnv(job),
168+
Resources: job.Spec.Trainer.Resources,
169+
},
170+
},
171+
RestartPolicy: "Never",
172+
},
173+
},
174+
},
175+
}
176+
}
177+
178+
func masterResource(job *paddlev1.TrainingJob) *corev1.ResourceRequirements {
179+
// TODO(gongwb): config master resource?
180+
return &corev1.ResourceRequirements{
181+
Limits: corev1.ResourceList{
182+
"cpu": *apiresource.NewQuantity(int64(2), apiresource.DecimalSI),
183+
"memory": apiresource.MustParse("1Gi"),
184+
},
185+
Requests: corev1.ResourceList{
186+
"cpu": *apiresource.NewQuantity(int64(1), apiresource.DecimalSI),
187+
"memory": apiresource.MustParse("500Mi"),
188+
},
189+
}
190+
}
191+
192+
func getEtcdPodSpec(job *paddlev1.TrainingJob) *corev1.Container {
193+
command := []string{"etcd", "-name", "etcd0",
194+
"-advertise-client-urls", "http://$(POD_IP):2379,http://$(POD_IP):4001",
195+
"-listen-client-urls", "http://0.0.0.0:2379,http://0.0.0.0:4001",
196+
"-initial-advertise-peer-urls", "http://$(POD_IP):2380",
197+
"-listen-peer-urls", "http://0.0.0.0:2380",
198+
"-initial-cluster", "etcd0=http://$(POD_IP):2380",
199+
"-initial-cluster-state", "new"}
200+
201+
return &corev1.Container{
202+
Name: "etcd",
203+
Image: "quay.io/coreos/etcd:v3.2.1",
204+
ImagePullPolicy: imagePullPolicy,
205+
// TODO(gongwb): etcd ports?
206+
Env: podEnv(job),
207+
Command: command,
208+
}
209+
}
210+
211+
// parseToMaster parse TrainingJob to a kubernetes replicaset resource.
212+
func parseToMaster(job *paddlev1.TrainingJob) *v1beta1.ReplicaSet {
213+
replicas := int32(1)
214+
// FIXME: refine these part.(typhoonzero)
215+
command := []string{"paddle_k8s", "start_master"}
216+
217+
return &v1beta1.ReplicaSet{
218+
TypeMeta: metav1.TypeMeta{
219+
Kind: "extensions/v1beta1",
220+
APIVersion: "ReplicaSet",
221+
},
222+
ObjectMeta: metav1.ObjectMeta{
223+
Name: job.ObjectMeta.Name + "-master",
224+
Namespace: job.ObjectMeta.Namespace,
225+
},
226+
Spec: v1beta1.ReplicaSetSpec{
227+
Replicas: &replicas,
228+
Template: corev1.PodTemplateSpec{
229+
ObjectMeta: metav1.ObjectMeta{
230+
Labels: map[string]string{"paddle-job-master": job.ObjectMeta.Name},
231+
},
232+
Spec: corev1.PodSpec{
233+
Volumes: job.Spec.Volumes,
234+
Containers: []corev1.Container{
235+
corev1.Container{
236+
Name: "master",
237+
Image: job.Spec.Image,
238+
ImagePullPolicy: imagePullPolicy,
239+
Ports: masterPorts(job),
240+
Command: command,
241+
VolumeMounts: job.Spec.VolumeMounts,
242+
Resources: *masterResource(job),
243+
},
244+
*getEtcdPodSpec(job),
245+
},
246+
},
247+
},
248+
},
249+
}
250+
}
251+
252+
// general functions that pserver, trainer use the same
253+
func podPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort {
254+
portsTotal := job.Spec.PortsNum + job.Spec.PortsNumForSparse
255+
ports := make([]corev1.ContainerPort, 0)
256+
basePort := int32(job.Spec.Port)
257+
for i := 0; i < portsTotal; i++ {
258+
ports = append(ports, corev1.ContainerPort{
259+
Name: fmt.Sprintf("jobport-%d", basePort),
260+
ContainerPort: basePort,
261+
})
262+
basePort++
263+
}
264+
return ports
265+
}
266+
267+
func masterPorts(job *paddlev1.TrainingJob) []corev1.ContainerPort {
268+
ports := []corev1.ContainerPort{
269+
corev1.ContainerPort{
270+
Name: "master-port",
271+
ContainerPort: 8080,
272+
},
273+
corev1.ContainerPort{
274+
Name: "etcd-port",
275+
ContainerPort: 2379,
276+
},
277+
}
278+
return ports
279+
}
280+
281+
func podEnv(job *paddlev1.TrainingJob) []corev1.EnvVar {
282+
needGPU := "0"
283+
if job.NeedGPU() {
284+
needGPU = "1"
285+
}
286+
trainerCount := 1
287+
if job.NeedGPU() {
288+
q := job.Spec.Trainer.Resources.Requests.NvidiaGPU()
289+
trainerCount = int(q.Value())
290+
} else {
291+
q := job.Spec.Trainer.Resources.Requests.Cpu()
292+
// FIXME: CPU resource value can be less than 1.
293+
trainerCount = int(q.Value())
294+
}
295+
296+
return []corev1.EnvVar{
297+
corev1.EnvVar{Name: "PADDLE_JOB_NAME", Value: job.ObjectMeta.Name},
298+
// NOTICE: TRAINERS, PSERVERS, PADDLE_INIT_NUM_GRADIENT_SERVERS
299+
// these env are used for non-faulttolerant training,
300+
// use min-instance all the time. When job is elastic,
301+
// these envs are not used.
302+
corev1.EnvVar{Name: "TRAINERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)},
303+
corev1.EnvVar{Name: "PSERVERS", Value: strconv.Itoa(job.Spec.Pserver.MinInstance)},
304+
corev1.EnvVar{Name: "ENTRY", Value: job.Spec.Trainer.Entrypoint},
305+
// FIXME: TOPOLOGY deprecated
306+
corev1.EnvVar{Name: "TOPOLOGY", Value: job.Spec.Trainer.Entrypoint},
307+
corev1.EnvVar{Name: "TRAINER_PACKAGE", Value: job.Spec.Trainer.Workspace},
308+
corev1.EnvVar{Name: "PADDLE_INIT_PORT", Value: strconv.Itoa(job.Spec.Port)},
309+
// PADDLE_INIT_TRAINER_COUNT should be same to gpu number when use gpu
310+
// and cpu cores when using cpu
311+
corev1.EnvVar{Name: "PADDLE_INIT_TRAINER_COUNT", Value: strconv.Itoa(trainerCount)},
312+
corev1.EnvVar{Name: "PADDLE_INIT_PORTS_NUM", Value: strconv.Itoa(job.Spec.PortsNum)},
313+
corev1.EnvVar{Name: "PADDLE_INIT_PORTS_NUM_FOR_SPARSE", Value: strconv.Itoa(job.Spec.PortsNumForSparse)},
314+
corev1.EnvVar{Name: "PADDLE_INIT_NUM_GRADIENT_SERVERS", Value: strconv.Itoa(job.Spec.Trainer.MinInstance)},
315+
corev1.EnvVar{Name: "PADDLE_INIT_NUM_PASSES", Value: strconv.Itoa(job.Spec.Passes)},
316+
corev1.EnvVar{Name: "PADDLE_INIT_USE_GPU", Value: needGPU},
317+
corev1.EnvVar{Name: "LD_LIBRARY_PATH", Value: "/usr/local/cuda/lib64"},
318+
corev1.EnvVar{Name: "NAMESPACE", ValueFrom: &corev1.EnvVarSource{
319+
FieldRef: &corev1.ObjectFieldSelector{
320+
FieldPath: "metadata.namespace",
321+
},
322+
}},
323+
corev1.EnvVar{Name: "POD_IP", ValueFrom: &corev1.EnvVarSource{
324+
FieldRef: &corev1.ObjectFieldSelector{
325+
FieldPath: "status.podIP",
326+
},
327+
}},
328+
}
329+
}
330+
331+
// general functions end

0 commit comments

Comments
 (0)