-
Notifications
You must be signed in to change notification settings - Fork 950
feat(runtimes): add support for launcher resource allocation in MPI jobs #2653
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 5 commits
099d230
005d4d1
2c78ca3
999c9b8
0ebdef0
2b83fae
f02f3e4
8ab0ab0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -101,9 +101,43 @@ func (b *Builder) Initializer(trainJob *trainer.TrainJob) *Builder { | |
| return b | ||
| } | ||
|
|
||
| // isRunLauncherAsNode returns true if runLauncherAsNode is set to true in the MPI policy. | ||
| func (b *Builder) isRunLauncherAsNode(info *runtime.Info) bool { | ||
| return info.RuntimePolicy.MLPolicySource != nil && | ||
| info.RuntimePolicy.MLPolicySource.MPI != nil && | ||
| info.RuntimePolicy.MLPolicySource.MPI.RunLauncherAsNode != nil && | ||
| *info.RuntimePolicy.MLPolicySource.MPI.RunLauncherAsNode | ||
| } | ||
|
|
||
| // Trainer updates JobSet values for the trainer Job. | ||
| func (b *Builder) Trainer(info *runtime.Info, trainJob *trainer.TrainJob) *Builder { | ||
| for i, rJob := range b.Spec.ReplicatedJobs { | ||
| // TODO (andreyvelich): For MPI we should apply container resources to the Node ReplicatedJob also. | ||
| // Eventually, we should find better way to propagate resources from TrainJob to JobSet. | ||
| if b.isRunLauncherAsNode(info) && *rJob.Name == constants.Node { | ||
| for j, container := range rJob.Template.Spec.Template.Spec.Containers { | ||
| if *container.Name == constants.Node { | ||
| if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil { | ||
| if resourcesPerNode := jobTrainer.ResourcesPerNode; resourcesPerNode != nil && | ||
| (resourcesPerNode.Limits != nil || resourcesPerNode.Requests != nil) { | ||
| requirements := corev1ac.ResourceRequirements() | ||
| if limits := resourcesPerNode.Limits; limits != nil { | ||
| requirements.WithLimits(limits) | ||
| } | ||
| if requests := resourcesPerNode.Requests; requests != nil { | ||
| requirements.WithRequests(requests) | ||
| } | ||
| b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j]. | ||
| WithResources(requirements) | ||
| } | ||
| apply.UpsertEnvVars( | ||
| &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env, | ||
| apply.EnvVars(jobTrainer.Env...)..., | ||
| ) | ||
| } | ||
| } | ||
| } | ||
| } | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe something like this to avoid duplicating the resources logic: ancestor := ""
jobMetadata := rJob.Template.ObjectMetaApplyConfiguration
if jobMetadata != nil && jobMetadata.Labels != nil {
ancestor, _ = jobMetadata.Labels[constants.LabelTrainJobAncestor]
}
if ancestor == constants.AncestorTrainer {
// TODO: Support multiple replicas ('.template.spec.replicatedJobs[*].replicas') for replicated Jobs.
// REF: https://github.com/kubeflow/trainer/issues/2318
b.Spec.ReplicatedJobs[i].Replicas = ptr.To[int32](1)
// Update the Parallelism and Completions values for the Trainer Job.
b.Spec.ReplicatedJobs[i].Template.Spec.Parallelism = info.FindPodSetByAncestor(constants.AncestorTrainer).Count
b.Spec.ReplicatedJobs[i].Template.Spec.Completions = info.FindPodSetByAncestor(constants.AncestorTrainer).Count
// Update values for the Trainer container.
for j, container := range rJob.Template.Spec.Template.Spec.Containers {
if *container.Name == constants.Node {
// Update values from the TrainJob trainer.
if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil {
if image := jobTrainer.Image; image != nil {
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Image = image
}
if command := jobTrainer.Command; command != nil {
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Command = command
}
if args := jobTrainer.Args; args != nil {
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Args = args
}
apply.UpsertEnvVars(
&b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env,
apply.EnvVars(jobTrainer.Env...)...,
)
}
}
}
}
// Apply trainer configuration to node containers.
if ancestor == constants.AncestorTrainer ||
if b.isRunLauncherAsNode(info) && *rJob.Name == constants.Node {
for j, container := range rJob.Template.Spec.Template.Spec.Containers {
if *container.Name == constants.Node {
// Update values from the TrainJob trainer.
if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil {
if resourcesPerNode := jobTrainer.ResourcesPerNode; resourcesPerNode != nil &&
(resourcesPerNode.Limits != nil || resourcesPerNode.Requests != nil) {
requirements := corev1ac.ResourceRequirements()
if limits := resourcesPerNode.Limits; limits != nil {
requirements.WithLimits(limits)
}
if requests := resourcesPerNode.Requests; requests != nil {
requirements.WithRequests(requests)
}
b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].
WithResources(requirements)
}
}
}
}
}
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Looks good! |
||
| jobMetadata := rJob.Template.ObjectMetaApplyConfiguration | ||
| if jobMetadata == nil || jobMetadata.Labels == nil { | ||
| continue | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,6 +125,16 @@ func (m *MPI) EnforceMLPolicy(info *runtime.Info, trainJob *trainer.TrainJob) er | |
|
|
||
| if trainJob.Spec.Trainer != nil && trainJob.Spec.Trainer.NumProcPerNode != nil { | ||
| info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode = ptr.To(int32(trainJob.Spec.Trainer.NumProcPerNode.IntValue())) | ||
| // If numProcPerNode is set to 1 in runtime, we make it equal to number of GPUs. | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @tenzen-y @astefanutti @Electronic-Waste I auto set number of slots for MPI plugin equal to number of GPUs, if TrainJob doesn't set NumProcPerNode and NumProcPerNode = 1 (which is default value in our MPI runtimes). This will help users to use DeepSpeed runtime more easily without modifying the
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This logic SGTM
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. if TrainJob doesn't set NumProcPerNode, would that make sense to set it to the number of GPUs if NumProcPerNode < num GPUs?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @astefanutti I would suggest that we always set NumProcPerNode == num GPUs if the default value: 1 is set in NumProcPerNode.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @andreyvelich right, better not override user-defined values. |
||
| } else if *info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode == 1 { | ||
| resourcesPerNode := ptr.Deref(runtime.ExtractResourcePerNodeFromRuntime(info), corev1.ResourceRequirements{}) | ||
| if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil && jobTrainer.ResourcesPerNode != nil { | ||
| resourcesPerNode = ptr.Deref(jobTrainer.ResourcesPerNode, corev1.ResourceRequirements{}) | ||
| } | ||
| gpuQ := runtime.GetNumGPUPerNode(&resourcesPerNode) | ||
| if gpuQ > 1 { | ||
| info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode = ptr.To(int32(gpuQ)) | ||
| } | ||
| } | ||
|
|
||
| // Add Secret and ConfigMap volumes to the Info object | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we want to propagate the environment variables as well?
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was thinking that env should be propagated as well, for now.
We need to investigate whether
mpiruncan read env from the Worker nodes.