Support group_mlp offload for TE op fuser#1
Conversation
| if offload_nbytes == 0: | ||
| return src_tensor |
There was a problem hiding this comment.
I'm not sure this is correct. GroupedTensor.prepare_for_saving removes all the buffers within the GroupedTensor, so we need to call restore_from_saved even if no buffers were moved to CPU.
| if not isinstance(buffer, torch.Tensor): | ||
| buffer_states.append(None) | ||
| elif _regular_tensor_needs_offloading(buffer, self.min_offloaded_tensor_size): | ||
| buffer_states.append( | ||
| self._offload_tensor(buffer, pin_memory=pin_memory, use_cpu_pool=use_cpu_pool) | ||
| ) | ||
| buffer.record_stream(self.d2h_stream) | ||
| offload_nbytes += buffer.numel() * buffer.element_size() | ||
| else: | ||
| buffer_states.append((_TE_GROUPED_TENSOR_RESIDENT_BUFFER_STATE, buffer)) |
There was a problem hiding this comment.
Nit: This implementation is correct, but we could make it a bit nicer by using a consistent structure for offloaded and non-offloaded buffers.
| if not isinstance(buffer, torch.Tensor): | |
| buffer_states.append(None) | |
| elif _regular_tensor_needs_offloading(buffer, self.min_offloaded_tensor_size): | |
| buffer_states.append( | |
| self._offload_tensor(buffer, pin_memory=pin_memory, use_cpu_pool=use_cpu_pool) | |
| ) | |
| buffer.record_stream(self.d2h_stream) | |
| offload_nbytes += buffer.numel() * buffer.element_size() | |
| else: | |
| buffer_states.append((_TE_GROUPED_TENSOR_RESIDENT_BUFFER_STATE, buffer)) | |
| is_offloaded = ( | |
| buffer is not None | |
| and _regular_tensor_needs_offloading(buffer, self.min_offloaded_tensor_size) | |
| ) | |
| buffer_state = ( | |
| self._offload_tensor(buffer, pin_memory, use_cpu_pool=use_cpu_pool) | |
| if is_offloaded | |
| else buffer | |
| ) | |
| buffer_states.append((is_offloaded, buffer_state)) |
The reloading function becomes:
def _reload_te_grouped_tensor(self, state, non_blocking=None):
"""Reload TE GroupedTensor backing buffers and reconstruct the wrapper."""
debug_rank("------reload TE GroupedTensor")
_, tensor_obj, buffer_states, _ = state
buffers = []
for is_offloaded, buffer_state in buffer_states:
if is_offloaded:
buffers.append(self._reload_tensor(buffer_state, non_blocking=non_blocking))
else:
buffers.append(buffer_state)
return te_grouped_tensor_restore_from_saved(tensor_obj, buffers)| def offload(self, src_tensor, pin_memory=True, use_cpu_pool=True): | ||
| """Offload a tensor-like saved activation.""" | ||
| if is_te_grouped_tensor(src_tensor): | ||
| return self._offload_te_grouped_tensor(src_tensor, pin_memory, use_cpu_pool) | ||
| return self._offload_tensor(src_tensor, pin_memory, use_cpu_pool) |
There was a problem hiding this comment.
This looks correct, but I'm curious why we don't have similar logic for QuantizedTensor, which should have similar problems as GroupedTensor. If we already have TE infrastructure for CPU offloading for quantized tensors, then we should use that for grouped tensors instead of implementing custom logic in Mcore.
What does this PR do ?
Contribution process
Pre-checks
Code review
Feel free to message or comment the @mcore-oncall to help accelerate your merge into main. The less complex your PR is, the faster it will be approved and merged!
All PRs start as draft. If you open a non-draft PR, it will be automatically converted to draft.
Step 1: Mark PR as "Ready for Review"
.github/CODEOWNERS.Final Review might get declined if these requirements are not fulfilled.
Step 2: Final Review
For PRs that change
megatron/core, once all expert reviewers have approved, theFinal Reviewlabel is applied automatically and final reviewers are assigned.For PRs outside
megatron/core, this step is skipped.Step 3: Approved
Once all required reviewers have approved, the
Approvedlabel is applied automatically.Merge
Any member of mcore-engineers will be able to merge your PR.
For MRs into `dev` branch
The proposed review process for `dev` branch is under active discussion.MRs are mergable after one approval by either
eharper@nvidia.comorzijiey@nvidia.com.