
Commit d0777a0

alexeykudinkin authored and zma2 committed
[Data] Avoid unnecessary copying of blocks (ray-project#56569)
## Why are these changes needed?

These copies were introduced before the current serialization protocol, which inherits from Arrow IPC, was implemented. That change makes these precautionary copies obsolete: the IPC-based serializer already performs the copying properly.

## Related issue number

## Checks

- [ ] I've signed off every commit (by using the -s flag, i.e., `git commit -s`) in this PR.
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in `doc/source/tune/api/` under the corresponding `.rst` file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests; see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(

---------

Signed-off-by: Alexey Kudinkin <ak@anyscale.com>
Signed-off-by: Zhiqiang Ma <zhiqiang.ma@intel.com>
1 parent fa3571c commit d0777a0

File tree

3 files changed: +5 −17 lines


python/ray/data/_internal/execution/operators/limit_operator.py

Lines changed: 3 additions & 1 deletion
```diff
@@ -54,7 +54,9 @@ def _add_input_inner(self, refs: RefBundle, input_index: int) -> None:
         else:
             # Slice the last block.
             def slice_fn(block, metadata, num_rows) -> Tuple[Block, BlockMetadata]:
-                block = BlockAccessor.for_block(block).slice(0, num_rows, copy=True)
+                block = BlockAccessor.for_block(block).slice(
+                    0, num_rows, copy=False
+                )
                 metadata = copy.deepcopy(metadata)
                 metadata.num_rows = num_rows
                 metadata.size_bytes = BlockAccessor.for_block(block).size_bytes()
```

python/ray/data/_internal/execution/operators/map_transformer.py

Lines changed: 1 addition & 4 deletions
```diff
@@ -599,8 +599,5 @@ def __call__(self, blocks: Iterable[Block], ctx: TaskContext) -> Iterable[Block]
             offset = 0
             split_sizes = _splitrange(block.num_rows(), self._additional_split_factor)
             for size in split_sizes:
-                # NOTE: copy=True is needed because this is an output block. If
-                # a block slice is put into the object store, the entire block
-                # will get serialized.
-                yield block.slice(offset, offset + size, copy=True)
+                yield block.slice(offset, offset + size, copy=False)
                 offset += size
```
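For context on the loop above: `_splitrange` is a Ray internal helper, but its role (splitting `n` rows into `k` contiguous, near-equal chunk sizes) can be sketched as follows. This is a hypothetical re-implementation for illustration, not the actual Ray source.

```python
def split_range(n: int, k: int) -> list:
    """Split n rows into k contiguous chunk sizes differing by at most one.

    Hypothetical stand-in for Ray's internal _splitrange helper.
    """
    base, extra = divmod(n, k)
    # The first `extra` chunks absorb one leftover row each.
    return [base + (1 if i < extra else 0) for i in range(k)]

# The sizes always sum back to n, so slicing [offset, offset + size)
# per size covers the block exactly once.
sizes = split_range(10, 3)  # -> [4, 3, 3]
```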

python/ray/data/_internal/output_buffer.py

Lines changed: 1 addition & 12 deletions
```diff
@@ -149,18 +149,7 @@ def next(self) -> Block:
         )

         if target_num_rows is not None and target_num_rows < accessor.num_rows():
-            # NOTE: We're maintaining following protocol of slicing underlying block
-            # into appropriately sized ones:
-            #
-            #   - (Finalized) Target blocks sliced from the original one
-            #     and are *copied* to avoid referencing original blocks
-            #   - Temporary remainder of the block should *NOT* be copied
-            #     such as to avoid repeatedly copying the remainder bytes
-            #     of the block, resulting in O(M * N) total bytes being
-            #     copied, where N is the total number of bytes in the original
-            #     block and M is the number of blocks that will be produced by
-            #     this iterator
-            block = accessor.slice(0, target_num_rows, copy=True)
+            block = accessor.slice(0, target_num_rows, copy=False)
             block_remainder = accessor.slice(
                 target_num_rows, accessor.num_rows(), copy=False
             )
```
