feat(runtimes): add support for launcher resource allocation in MPI jobs#2653
andreyvelich
left a comment
Sorry for the late reply @jskswamy!
Could you please rebase your PR so we can take a look?
/assign @tenzen-y @Electronic-Waste @astefanutti
I've rebased the changes; kindly have a look.
andreyvelich
left a comment
Thank you for this @jskswamy!
/assign @tenzen-y @astefanutti Appreciate your review!
@jskswamy Can you also update the title to align with the conventions, please?
    // Update values from the TrainJob trainer.
    if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil {
        if image := jobTrainer.Image; image != nil {
            b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Image = image
There was a problem hiding this comment.
But that will also override the image for the MPI workers, won't it?
There was a problem hiding this comment.
Yes, you are right, this will override the image. I have pushed a fix for it; kindly have a look.
    // Skip if container is neither node nor launcher
    if *container.Name != constants.Node && *container.Name != constants.Launcher {
        continue
    }

    // Skip launcher container if runLauncherAsNode is false
    if *container.Name == constants.Launcher && !b.isRunLauncherAsNode(info) {
        continue
    }
There was a problem hiding this comment.
@tenzen-y @astefanutti What do you think of this approach?
Should we check whether resources need to be updated in the JobSet plugin or in the MPI plugin?
E.g. we can use the PodSet in the Info object.
There was a problem hiding this comment.
Yes, it seems like it would be a better separation of concerns to update the launcher resources in the MPI plugin.
I think the current PR can be merged as is to fix the current behavior, with a follow-up to work out a proper design.
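For readers following the thread, the container filter under review can be sketched as a small standalone predicate. This is only an illustration: `shouldApplyResources` is a hypothetical name, and the string constants stand in for `constants.Node` and `constants.Launcher`, whose actual values are assumed here.

```go
package main

import "fmt"

// Assumed stand-ins for constants.Node and constants.Launcher.
const (
	nodeContainer     = "node"
	launcherContainer = "launcher"
)

// shouldApplyResources is a hypothetical helper mirroring the two "skip"
// checks in the diff: containers that are neither node nor launcher are
// skipped, and the launcher is skipped unless runLauncherAsNode is true.
func shouldApplyResources(containerName string, runLauncherAsNode bool) bool {
	switch containerName {
	case nodeContainer:
		return true
	case launcherContainer:
		return runLauncherAsNode
	default:
		return false
	}
}

func main() {
	for _, name := range []string{"node", "launcher", "sidecar"} {
		fmt.Printf("%s: asNode=true -> %v, asNode=false -> %v\n",
			name, shouldApplyResources(name, true), shouldApplyResources(name, false))
	}
}
```

Keeping the decision in one predicate like this would make it easy to move between the JobSet plugin and the MPI plugin, which is the open question in the comments above.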
I've updated the commit message and the title according to the conventions.
/milestone v2.1
    // Apply resources to both Node and Launcher containers (when launcher is included)
    if resourcesPerNode := jobTrainer.ResourcesPerNode; resourcesPerNode != nil &&
        (resourcesPerNode.Limits != nil || resourcesPerNode.Requests != nil) {
        requirements := corev1ac.ResourceRequirements()
There was a problem hiding this comment.
How does that impact the resource requirements in the RuntimeInfo, which Kueue relies on, for example?
    }

    // Update the Parallelism and Completions values for the Trainer Job.
    if ancestor, ok := jobMetadata.Labels[constants.LabelTrainJobAncestor]; ok && ancestor == constants.AncestorTrainer {
There was a problem hiding this comment.
Just to make sure: why not add the trainer.kubeflow.org/trainjob-ancestor-step: trainer label to the node ReplicatedJob as well?
There was a problem hiding this comment.
We can't do that since it will also override the container spec using .trainer parameters: #2653 (comment)
However, we should always run OpenSSH server there.
There was a problem hiding this comment.
Can't that be controlled with the container name?
There was a problem hiding this comment.
Currently, we don't have such a capability. We use this label to mark the ReplicatedJob to which we should apply values from the Trainer, Model Initializer, and Dataset Initializer spec. The container name must always be equal to node for the Trainer values to be applied.
@astefanutti Any thoughts on how we can enhance it?
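The selection rule described in this reply (the ancestor label picks the ReplicatedJob, and only the container named node receives the Trainer values) can be sketched with plain Go types. The `replicatedJob` and `container` structs and the `trainerTargets` function are hypothetical simplifications, not the project's API; the label key and value are taken from the comment above, and the example layout (a labeled launcher job, an unlabeled node job) is an assumption based on this thread.

```go
package main

import "fmt"

const (
	ancestorLabel   = "trainer.kubeflow.org/trainjob-ancestor-step"
	ancestorTrainer = "trainer"
	nodeContainer   = "node"
)

// Hypothetical, simplified stand-ins for the JobSet types.
type container struct{ Name string }

type replicatedJob struct {
	Name       string
	Labels     map[string]string
	Containers []container
}

// trainerTargets returns "job/container" pairs that would receive Trainer
// overrides: the job must carry the trainer ancestor label and the container
// must be named "node".
func trainerTargets(jobs []replicatedJob) []string {
	var targets []string
	for _, job := range jobs {
		// Reading from a nil map is safe and yields the empty string.
		if job.Labels[ancestorLabel] != ancestorTrainer {
			continue
		}
		for _, c := range job.Containers {
			if c.Name == nodeContainer {
				targets = append(targets, job.Name+"/"+c.Name)
			}
		}
	}
	return targets
}

func main() {
	jobs := []replicatedJob{
		{Name: "launcher", Labels: map[string]string{ancestorLabel: ancestorTrainer}, Containers: []container{{Name: "node"}}},
		{Name: "node", Containers: []container{{Name: "node"}}},
	}
	fmt.Println(trainerTargets(jobs)) // [launcher/node]
}
```

Under this rule the unlabeled node ReplicatedJob is never touched, which is why its resources need separate handling in this PR.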
As we discussed on today's call, @astefanutti will help to finalize this PR.
/retitle feat(runtimes): add support for launcher resource allocation in MPI jobs
/hold since we should fix the bug identified in Slack: https://cloud-native.slack.com/archives/C0742LDFZ4K/p1760719119325289?thread_ts=1760462777.817729&cid=C0742LDFZ4K
    if trainJob.Spec.Trainer != nil && trainJob.Spec.Trainer.NumProcPerNode != nil {
        info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode = ptr.To(int32(trainJob.Spec.Trainer.NumProcPerNode.IntValue()))
        // If numProcPerNode is set to 1 in runtime, we make it equal to number of GPUs.
There was a problem hiding this comment.
@tenzen-y @astefanutti @Electronic-Waste I automatically set the number of slots for the MPI plugin to the number of GPUs if the TrainJob doesn't set NumProcPerNode and the runtime's NumProcPerNode is 1 (which is the default value in our MPI runtimes).
This will help users use the DeepSpeed runtime more easily, without modifying numProcPerNode.
Let me know if that sounds good to you.
/assign @tenzen-y @astefanutti @Electronic-Waste
There was a problem hiding this comment.
If the TrainJob doesn't set NumProcPerNode, would it make sense to set it to the number of GPUs whenever NumProcPerNode < num GPUs?
There was a problem hiding this comment.
@astefanutti I would suggest that we always set NumProcPerNode == num GPUs if NumProcPerNode has the default value of 1.
If users manually override this value in the Runtime or in the TrainJob, we won't override it.
WDYT @astefanutti?
There was a problem hiding this comment.
@andreyvelich right, better not override user-defined values.
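The defaulting rule agreed on in this thread can be written down as a small pure function. This is a sketch under stated assumptions: `resolveNumProcPerNode` is a hypothetical name, plain `int32` values replace the real API types, and the `gpusPerNode > 1` guard follows the `gpuQ > 1` check suggested later in the review.

```go
package main

import "fmt"

// resolveNumProcPerNode is a hypothetical helper, not the project's API.
// trainJobValue is nil when the TrainJob does not set numProcPerNode;
// runtimeValue is the value configured in the runtime (1 by default in the
// MPI runtimes, per the discussion).
func resolveNumProcPerNode(trainJobValue *int32, runtimeValue, gpusPerNode int32) int32 {
	// An explicit TrainJob value always wins.
	if trainJobValue != nil {
		return *trainJobValue
	}
	// Only the default runtime value of 1 is replaced by the GPU count.
	if runtimeValue == 1 && gpusPerNode > 1 {
		return gpusPerNode
	}
	// Any other runtime value is kept as is.
	return runtimeValue
}

func main() {
	two := int32(2)
	fmt.Println(resolveNumProcPerNode(nil, 1, 8))  // 8
	fmt.Println(resolveNumProcPerNode(&two, 1, 8)) // 2
	fmt.Println(resolveNumProcPerNode(nil, 4, 8))  // 4
}
```

One limitation of this rule, visible in the sketch, is that a runtime author who deliberately sets the value to 1 cannot be distinguished from the default.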
Electronic-Waste
left a comment
@andreyvelich Thanks for this. Some comments for you
    if trainJob.Spec.Trainer != nil && trainJob.Spec.Trainer.NumProcPerNode != nil {
        info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode = ptr.To(int32(trainJob.Spec.Trainer.NumProcPerNode.IntValue()))
        // If numProcPerNode is set to 1 in runtime, we make it equal to number of GPUs.
    } else if *info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode == 1 {
        resourcesPerNode := ptr.Deref(torch.ExtractResourcePerNodeFromRuntime(info), corev1.ResourceRequirements{})
There was a problem hiding this comment.
It looks odd to reference code from the torch plugin here. Can we move it somewhere shared?
    if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil && jobTrainer.ResourcesPerNode != nil {
        resourcesPerNode = ptr.Deref(jobTrainer.ResourcesPerNode, corev1.ResourceRequirements{})
    }
    gpuQ := torch.GetNumGPUPerNode(&resourcesPerNode)
There was a problem hiding this comment.
Same as above. Also, we can combine the assignment and the if clause:
    if gpuQ := torch.GetNumGPUPerNode(&resourcesPerNode); gpuQ > 1 {
        info.RuntimePolicy.MLPolicySource.MPI.NumProcPerNode = ptr.To(int32(gpuQ))
    }
Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
    // TODO (andreyvelich): For MPI we should apply container resources to the Node ReplicatedJob also.
    // Eventually, we should find better way to propagate resources from TrainJob to JobSet.
    if b.isRunLauncherAsNode(info) && *rJob.Name == constants.Node {
        for j, container := range rJob.Template.Spec.Template.Spec.Containers {
            if *container.Name == constants.Node {
                if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil {
                    if resourcesPerNode := jobTrainer.ResourcesPerNode; resourcesPerNode != nil &&
                        (resourcesPerNode.Limits != nil || resourcesPerNode.Requests != nil) {
                        requirements := corev1ac.ResourceRequirements()
                        if limits := resourcesPerNode.Limits; limits != nil {
                            requirements.WithLimits(limits)
                        }
                        if requests := resourcesPerNode.Requests; requests != nil {
                            requirements.WithRequests(requests)
                        }
                        b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].
                            WithResources(requirements)
                    }
                    apply.UpsertEnvVars(
                        &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env,
                        apply.EnvVars(jobTrainer.Env...)...,
                    )
                }
            }
        }
    }
There was a problem hiding this comment.
Maybe something like this to avoid duplicating the resources logic:
    ancestor := ""
    jobMetadata := rJob.Template.ObjectMetaApplyConfiguration
    if jobMetadata != nil && jobMetadata.Labels != nil {
        ancestor, _ = jobMetadata.Labels[constants.LabelTrainJobAncestor]
    }
    if ancestor == constants.AncestorTrainer {
        // TODO: Support multiple replicas ('.template.spec.replicatedJobs[*].replicas') for replicated Jobs.
        // REF: https://github.com/kubeflow/trainer/issues/2318
        b.Spec.ReplicatedJobs[i].Replicas = ptr.To[int32](1)
        // Update the Parallelism and Completions values for the Trainer Job.
        b.Spec.ReplicatedJobs[i].Template.Spec.Parallelism = info.FindPodSetByAncestor(constants.AncestorTrainer).Count
        b.Spec.ReplicatedJobs[i].Template.Spec.Completions = info.FindPodSetByAncestor(constants.AncestorTrainer).Count
        // Update values for the Trainer container.
        for j, container := range rJob.Template.Spec.Template.Spec.Containers {
            if *container.Name == constants.Node {
                // Update values from the TrainJob trainer.
                if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil {
                    if image := jobTrainer.Image; image != nil {
                        b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Image = image
                    }
                    if command := jobTrainer.Command; command != nil {
                        b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Command = command
                    }
                    if args := jobTrainer.Args; args != nil {
                        b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Args = args
                    }
                    apply.UpsertEnvVars(
                        &b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].Env,
                        apply.EnvVars(jobTrainer.Env...)...,
                    )
                }
            }
        }
    }
    // Apply trainer configuration to node containers.
    if ancestor == constants.AncestorTrainer ||
        (b.isRunLauncherAsNode(info) && *rJob.Name == constants.Node) {
        for j, container := range rJob.Template.Spec.Template.Spec.Containers {
            if *container.Name == constants.Node {
                // Update values from the TrainJob trainer.
                if jobTrainer := trainJob.Spec.Trainer; jobTrainer != nil {
                    if resourcesPerNode := jobTrainer.ResourcesPerNode; resourcesPerNode != nil &&
                        (resourcesPerNode.Limits != nil || resourcesPerNode.Requests != nil) {
                        requirements := corev1ac.ResourceRequirements()
                        if limits := resourcesPerNode.Limits; limits != nil {
                            requirements.WithLimits(limits)
                        }
                        if requests := resourcesPerNode.Requests; requests != nil {
                            requirements.WithRequests(requests)
                        }
                        b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].
                            WithResources(requirements)
                    }
                }
            }
        }
    }

                        b.Spec.ReplicatedJobs[i].Template.Spec.Template.Spec.Containers[j].
                            WithResources(requirements)
                    }
                    apply.UpsertEnvVars(
There was a problem hiding this comment.
Do we want to propagate the environment variables as well?
There was a problem hiding this comment.
I was thinking that env should be propagated as well, for now.
We need to investigate whether mpirun can read env from the Worker nodes.
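To make the "propagate env as well" behaviour concrete, here is a minimal sketch of upsert semantics for environment variables: an existing variable with the same name is overwritten, and a new one is appended. The `envVar` type and `upsertEnvVars` function are hypothetical and only illustrate the assumed behaviour; they are not the implementation of `apply.UpsertEnvVars` used in the diff.

```go
package main

import "fmt"

// envVar is a simplified stand-in for a container environment variable.
type envVar struct {
	Name  string
	Value string
}

// upsertEnvVars overwrites the value of any existing variable whose name
// matches an update and appends updates whose names are not present yet.
// The order of existing variables is preserved.
func upsertEnvVars(existing []envVar, updates ...envVar) []envVar {
	for _, u := range updates {
		replaced := false
		for i := range existing {
			if existing[i].Name == u.Name {
				existing[i].Value = u.Value
				replaced = true
				break
			}
		}
		if !replaced {
			existing = append(existing, u)
		}
	}
	return existing
}

func main() {
	env := []envVar{{"A", "1"}, {"B", "2"}}
	env = upsertEnvVars(env, envVar{"B", "3"}, envVar{"C", "4"})
	fmt.Println(env) // [{A 1} {B 3} {C 4}]
}
```

With semantics like these, applying the TrainJob's env to both the launcher and the node containers stays idempotent across reconciles.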
        ancestor = jobMetadata.Labels[constants.LabelTrainJobAncestor]
    }
    if ancestor, ok := jobMetadata.Labels[constants.LabelTrainJobAncestor]; ok && ancestor == constants.AncestorTrainer {
    if ancestor == constants.AncestorTrainer {
There was a problem hiding this comment.
Need to explore why the unit tests are failing.
/hold
    // Update the Parallelism and Completions values for the Trainer Job.
    b.Spec.ReplicatedJobs[i].Template.Spec.Parallelism = info.FindPodSetByAncestor(constants.AncestorTrainer).Count
    b.Spec.ReplicatedJobs[i].Template.Spec.Completions = info.FindPodSetByAncestor(constants.AncestorTrainer).Count
There was a problem hiding this comment.
We already assign the Parallelism and Completions fields when we sync the pod sets to the JobSet template in the plugins:
trainer/pkg/runtime/core/trainingruntime.go
Line 240 in f02f3e4
So we can remove these lines from the Builder.
If the changes look good, we can move this forward.
/assign @tenzen-y @astefanutti @Electronic-Waste
/hold cancel
There was a problem hiding this comment.
I think that part about syncPodSets will simplify with #2877.
    WithLabels(map[string]string{
        constants.LabelTrainJobAncestor: "invalid",
    }).

    },
    wantObjs: []apiruntime.Object{
        testingutil.MakeJobSetWrapper(metav1.NamespaceDefault, "test-job").
            // This is needed to override default label in MakeJobSetWrapper() for Node rJob.
There was a problem hiding this comment.
Ah, I see why the "invalid" label is needed. Could we add a TODO to clean this up in a follow-up?
/lgtm
/approve
/lgtm
Electronic-Waste
left a comment
@andreyvelich Thanks for this great work!
/lgtm
/approve
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: andreyvelich, Electronic-Waste
…obs (kubeflow#2653)
* feat(runtime): add support for launcher resource allocation in MPI jobs
* Add unit tests
* Set numProcPerNode for MPI plugin
* Move util func to runtime package
* Fix torchtune plugin
* Inline if for GPU check
* Assign container resources once
* Add todo for test wrappers
Signed-off-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
Co-authored-by: Andrey Velichkevich <andrey.velichkevich@gmail.com>
The Trainer method has been updated to apply resources appropriately to both the launcher and node containers based on this flag.
Key changes include:
* isRunLauncherAsNode method to determine if the launcher should be run as a node.
* … runLauncherAsNode value.
Which issue(s) this PR fixes: Fixes #2650