[Data] Revisit OutputSplitter semantic to avoid unnecessary buffer accumulation #60237
…cumulation Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Code Review
This pull request refactors the OutputSplitter to use a maximum buffer size instead of a minimum, which is a great change to prevent unnecessary buffer accumulation and improve streaming performance. The new dispatching logic appears to correctly prioritize locality while ensuring liveness.
However, there are a few critical issues that need to be addressed:
- Merge Conflict: The file `python/ray/data/tests/test_operators.py` contains unresolved merge conflict markers (`<<<<<<<`, `=======`, `>>>>>>>`), which will cause the build to fail.
- Unrelated Changes: The same test file also includes a large number of changes unrelated to the `OutputSplitter` (e.g., new tests for `MapOperator`). To keep this PR focused and easy to review, please move these changes to a separate pull request.
- Argument Parsing Error: The benchmark script `release/nightly_tests/dataset/streaming_split_benchmark.py` has a duplicate definition for the `--equal-split` argument, which will cause a runtime error. The corresponding YAML configuration in `release/release_data_tests.yaml` also uses this flag incorrectly.
I've left specific comments on these issues. Once they are resolved, this will be a solid improvement.
preferred_loc = self._locality_hints[target_index]
preferred_loc = self._locality_hints[target_output_index]

# TODO make this more efficient (adding inverse hash-map)
The TODO here correctly identifies a potential performance bottleneck. The current implementation iterates through the entire buffer (O(N)) to find a bundle with preferred locality. If the buffer size becomes large, this linear scan could impact performance.
Consider implementing the suggestion in the TODO by using an inverted index (e.g., a dictionary mapping location -> List[RefBundle]) to achieve O(1) lookups for preferred bundles. This would make the locality optimization much more efficient.
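As a rough illustration of the inverted-index idea, here is a minimal sketch. It is not the `OutputSplitter` implementation: `Bundle` is a hypothetical stand-in for `RefBundle` that carries a single location, and `LocalityIndexedBuffer` is an invented name. It keeps FIFO order overall while letting a lookup by preferred location avoid scanning the whole buffer.

```python
from collections import OrderedDict
from dataclasses import dataclass
from typing import Dict, Optional


@dataclass(eq=False)
class Bundle:
    """Hypothetical stand-in for a RefBundle: an id plus one location."""

    bundle_id: int
    location: Optional[str]


class LocalityIndexedBuffer:
    """FIFO buffer with a location -> bundles index (illustrative only)."""

    def __init__(self) -> None:
        # Global FIFO order, keyed by bundle id.
        self._by_id: "OrderedDict[int, Bundle]" = OrderedDict()
        # Inverted index: location -> FIFO of bundles stored at that location.
        self._by_loc: Dict[str, "OrderedDict[int, Bundle]"] = {}

    def __len__(self) -> int:
        return len(self._by_id)

    def add(self, bundle: Bundle) -> None:
        self._by_id[bundle.bundle_id] = bundle
        if bundle.location is not None:
            self._by_loc.setdefault(bundle.location, OrderedDict())[
                bundle.bundle_id
            ] = bundle

    def pop(self, preferred_loc: Optional[str] = None) -> Optional[Bundle]:
        """Pop the oldest bundle at preferred_loc, else the oldest overall."""
        if not self._by_id:
            return None
        local = self._by_loc.get(preferred_loc) if preferred_loc else None
        # Fall back to plain FIFO when nothing is local to the receiver.
        source = local if local else self._by_id
        bundle_id = next(iter(source))
        bundle = self._by_id.pop(bundle_id)
        if bundle.location is not None:
            # Keep the inverted index in sync with the main buffer.
            del self._by_loc[bundle.location][bundle_id]
        return bundle
```

The trade-off is extra bookkeeping on every add and pop in exchange for not iterating the buffer on each dispatch. Whether that pays off depends on how large the buffer is allowed to grow under the new max-size semantic.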
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
assert not self._buffer, "All bundles should have been dispatched"
return

if not self._buffer:
Missing forced dispatch when equal=False without locality hints
Medium Severity
When equal=False and _locality_hints is falsy, the new all_inputs_done logic skips the explicit forced dispatch and falls through to the finalize distribution code if the buffer is non-empty. The finalize distribution code is designed only for equal=True mode and assumes the output distribution needs equalization. Running it with equal=False can trigger the assert remainder >= 0 assertion to fail since greedy dispatching produces uneven distributions where sum(allocation) may exceed buffer_size.
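The arithmetic behind this concern can be sketched as follows. This assumes the finalize step computes each output's allocation as its gap to the largest output and then subtracts the total from the buffered row count. The names `allocation`, `remainder`, and `buffer_size` follow the comment above, while the function `equalization_remainder` and the row counts are hypothetical.

```python
from typing import List, Tuple


def equalization_remainder(
    num_output_rows: List[int], buffer_size: int
) -> Tuple[List[int], int]:
    """Return (allocation, remainder) for an equalizing finalize step.

    Assumption: each output is topped up to match the largest output, and
    whatever is left in the buffer afterwards is the remainder.
    """
    max_n = max(num_output_rows)
    allocation = [max_n - n for n in num_output_rows]
    remainder = buffer_size - sum(allocation)
    return allocation, remainder


# Near-balanced outputs, as equal=True dispatching would keep them:
balanced = equalization_remainder([10, 10, 9], buffer_size=4)
# balanced == ([0, 0, 1], 3), so `remainder >= 0` holds.

# Skewed outputs, as greedy equal=False dispatching may produce:
skewed = equalization_remainder([100, 10, 10], buffer_size=50)
# skewed == ([0, 90, 90], -130), so `assert remainder >= 0` would fail.
```

Under that assumption, the invariant only holds when dispatching has kept the outputs close to each other, which is why reaching this code path with `equal=False` is a problem.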
…cumulation (ray-project#60237)

## Description

Currently, `OutputSplitter` only dispatches blocks in excess of its baseline of N * 2 blocks (where N is the number of workers). That doesn't make a lot of sense.

This change instead inverts that semantic to:

- Dispatch blocks to the next outstanding receiver as soon as they become available
- Force dispatch when the buffer exceeds its max-size threshold (enforcing that the buffer doesn't exceed its max size)

## Related issues

## Additional information

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Co-authored-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Limark Dcunha <limarkdcunha@gmail.com>
Description
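Currently, `OutputSplitter` only dispatches blocks in excess of its baseline of N * 2 blocks (where N is the number of workers). That doesn't make a lot of sense.
This change instead inverts that semantic to:
- Dispatch blocks to the next outstanding receiver as soon as they become available
- Force dispatch when the buffer exceeds its max-size threshold (enforcing that the buffer doesn't exceed its max size)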
Related issues
Additional information
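To make the described semantic concrete, here is a toy sketch of eager dispatch with a capped buffer. It is not the actual `OutputSplitter` code: the class `SplitterSketch`, the `request` method, the notion of a "waiting" receiver, and the choice to force-dispatch to the output holding the fewest blocks are all assumptions made for illustration.

```python
from collections import deque
from typing import Deque, List


class SplitterSketch:
    """Toy model: dispatch eagerly, and never hold more than max_buffer_size."""

    def __init__(self, num_outputs: int, max_buffer_size: int) -> None:
        self.max_buffer_size = max_buffer_size
        self.buffer: Deque[str] = deque()
        self.outputs: List[List[str]] = [[] for _ in range(num_outputs)]
        # Hypothetical queue of receivers that asked for a block.
        self.waiting: Deque[int] = deque()

    def request(self, index: int) -> None:
        """Receiver `index` signals that it can take one block."""
        self.waiting.append(index)
        self._dispatch()

    def add_input(self, block: str) -> None:
        self.buffer.append(block)
        self._dispatch()

    def _dispatch(self) -> None:
        # Eager path: hand over a block as soon as a receiver is waiting.
        while self.buffer and self.waiting:
            self.outputs[self.waiting.popleft()].append(self.buffer.popleft())
        # Forced path: nobody is waiting, but the buffer is over its cap.
        while len(self.buffer) > self.max_buffer_size:
            target = min(
                range(len(self.outputs)), key=lambda i: len(self.outputs[i])
            )
            self.outputs[target].append(self.buffer.popleft())
```

For example, with two outputs and `max_buffer_size=2`, a block arriving while receiver 1 is waiting goes straight to receiver 1. Three further blocks with no waiting receiver leave two in the buffer and force the oldest one out to receiver 0.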