[Data] Streaming Partition enforce row_num per block #57984

Merged
raulchen merged 62 commits into ray-project:master from owenowenisme:data/use-map-op-for-streaming-repartition
Nov 14, 2025

Conversation

@owenowenisme
Member

@owenowenisme owenowenisme commented Oct 22, 2025

Description

Currently, streaming repartition applies a map transform to each block independently and does not merge leftover rows across blocks, so it cannot guarantee exact row counts per output block. This PR introduces a new design that computes, on the driver, the input block ranges for every output block. It avoids driver-side block fetching while ensuring correctness and leveraging the efficiency of parallel map tasks.
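To illustrate the idea with a simplified sketch (not the actual implementation; `plan_output_blocks` and its signature are hypothetical): given only the per-block row counts, the driver can compute, for every output block, the `(input_block_index, start, end)` slices that compose it, leaving the actual slicing to parallel map tasks.

```python
from typing import List, Tuple

def plan_output_blocks(
    input_rows: List[int], target: int
) -> List[List[Tuple[int, int, int]]]:
    """For each output block, list the (input_block_idx, start, end)
    slices that compose it. Leftover rows at the tail form one final,
    smaller output block."""
    plans, current, need = [], [], target
    for i, n in enumerate(input_rows):
        start = 0
        while n - start > 0:
            take = min(need, n - start)
            current.append((i, start, start + take))
            start += take
            need -= take
            if need == 0:
                # Current output block is full; start a new one.
                plans.append(current)
                current, need = [], target
    if current:
        plans.append(current)
    return plans
```

For example, `plan_output_blocks([3, 4], 5)` plans one full 5-row output block spanning both inputs, plus a 2-row leftover block.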

Related issues

Closes #57165

Additional information

@owenowenisme owenowenisme added the go label (add ONLY when ready to merge, run all tests) Oct 22, 2025
Signed-off-by: You-Cheng Lin (Owen) <mses010108@gmail.com>
@owenowenisme owenowenisme force-pushed the data/use-map-op-for-streaming-repartition branch from 6610c21 to 7e39adb on October 22, 2025 07:49
@owenowenisme owenowenisme force-pushed the data/use-map-op-for-streaming-repartition branch from 6b1c2c3 to ad81683 on October 22, 2025 12:03
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme owenowenisme marked this pull request as ready for review October 23, 2025 15:06
@owenowenisme owenowenisme requested a review from a team as a code owner October 23, 2025 15:06

cursor bot commented Oct 23, 2025

Bug: Test Fails to Verify Row Counts Post-Repartitioning

The test_repartition_guarantee_row_num_to_be_exact test initializes block_row_counts as an empty list. This prevents the subsequent loop and assertions from executing, meaning the test doesn't actually verify the expected row counts per block after repartitioning.
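For reference, a fixed test could compute the expected per-block row counts and assert the observed counts against them. A minimal sketch of such a helper (the name is hypothetical, not from the PR):

```python
from typing import List

def expected_block_row_counts(total_rows: int, target: int) -> List[int]:
    """Expected row counts after repartitioning: full blocks of `target`
    rows, plus one smaller trailing block for any leftover rows."""
    counts = [target] * (total_rows // target)
    if total_rows % target:
        counts.append(total_rows % target)
    return counts
```

For example, `expected_block_row_counts(25, 10)` gives `[10, 10, 5]`, which the test could compare against the actual per-block counts it collects.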


@ray-gardener ray-gardener bot added the data Ray Data-related issues label Oct 23, 2025
@srinathk10
Contributor

@owenowenisme My first pass looks good. @bveeramani Will do a review for implementation design inside MapOperator.

…t_task

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
cursor[bot]

This comment was marked as outdated.

Contributor

@srinathk10 srinathk10 left a comment

LGTM, but @alexeykudinkin or @bveeramani need to review the impl design

Member

@bveeramani bveeramani left a comment

I think the high-level idea sounds reasonable, but the current implementation adds a lot of complexity to the MapOperator interfaces.

Could you figure out how to implement this in a way that:

  1. Avoids introducing abstractions that overlap with existing ones (e.g., _TaskInput/TaskContext and StreamingRepartitionTaskBuilder/BlockRefBundler)
  2. Avoids adding streaming-repartition-specific methods to the MapOperator base class (e.g., _submit_task_input and set_task_input_builder)
  3. Makes the correctness easy to test without requiring tens of E2E test cases?

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
cursor[bot]

This comment was marked as outdated.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
elif metadata.num_rows != block_slice.num_rows:
# Partial block - estimate size based on rows
per_row = metadata.size_bytes / metadata.num_rows
total += max(1, int(math.ceil(per_row * block_slice.num_rows)))
Contributor

Looks like we are double-slicing the metadata: once here and once in _slice_block_metadata.

I think we should remove _slice_block_metadata and document that, when slices are present, metadata is still the original (unsliced) metadata.

Contributor

Actually, _slice_block_metadata is wrong, because then you cannot slice an already-sliced block.
Let's fix it and add a unit test.

Member Author

Added a unit test and removed _slice_block_metadata

else:
assert len(self.blocks) == len(
self.slices
), "Number of blocks and slices must match"
Contributor

Let's also validate that the slices have valid ranges.
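Such a check might look like the following sketch (hypothetical names, assuming each slice is a `(start, end)` pair of row offsets into its block):

```python
from typing import List, Tuple

def validate_slices(
    block_num_rows: List[int], slices: List[Tuple[int, int]]
) -> None:
    """Assert each (start, end) slice is a valid row range of its block."""
    assert len(block_num_rows) == len(
        slices
    ), "Number of blocks and slices must match"
    for num_rows, (start, end) in zip(block_num_rows, slices):
        # A valid slice is ordered and lies within the block's rows.
        assert 0 <= start <= end <= num_rows, (
            f"Invalid slice ({start}, {end}) for a block with {num_rows} rows"
        )
```

For instance, `validate_slices([5, 3], [(0, 5), (1, 3)])` passes, while an out-of-range slice like `(2, 7)` on a 5-row block raises an AssertionError.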

"""


class StreamingRepartitionRefBundler(BaseRefBundler):
Contributor

please also add unit tests for this class.

Member Author

Added in test_operators just like BlockRefBundler

if self._total_pending_rows >= self._target_num_rows or flush_remaining:
rows_needed_from_last_bundle = (
self._total_pending_rows % self._target_num_rows
)
Contributor

this seems wrong.
should be
self._total_pending_rows % self._target_num_rows - self._total_pending_rows % self._target_num_rows

Member Author

I think you meant self._pending_bundles[-1].num_rows() - self._total_pending_rows % self._target_num_rows ?

Member Author

Btw, self._pending_bundles[-1].num_rows() - self._total_pending_rows % self._target_num_rows will never be negative, but I added an assertion just in case



class StreamingRepartitionRefBundler(BaseRefBundler):
"""Incrementally builds task inputs to produce target-sized outputs.
Contributor

Does this refbundler generate exactly target_num_rows_per_block rows per bundle, or multiples of target_num_rows_per_block?

Member Author

Updated description

…edata

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
elif metadata.num_rows != block_slice.num_rows:
# Partial block - estimate size based on rows
per_row = metadata.size_bytes / metadata.num_rows
total += max(1, int(math.ceil(per_row * block_slice.num_rows)))

Bug: Incorrect Size for Empty Data

When calculating size_bytes() for a slice with zero rows, the code uses max(1, int(math.ceil(per_row * block_slice.num_rows))) which returns 1 byte even when block_slice.num_rows is 0. An empty slice (0 rows) should contribute 0 bytes to the total size, not 1 byte. The max(1, ...) guard appears intended to prevent zero-byte estimates for non-empty slices but incorrectly applies to empty slices as well.
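A corrected estimator would special-case empty slices before applying the 1-byte floor. A standalone sketch of the idea (the function name and signature are hypothetical, not the actual PR code):

```python
import math

def estimate_slice_size_bytes(
    size_bytes: int, num_rows: int, slice_rows: int
) -> int:
    """Pro-rate a block's byte size to a slice of its rows.

    An empty slice contributes 0 bytes; a full slice returns the exact
    size; a non-empty partial slice is estimated per-row and floored at
    1 byte so small slices never round down to zero.
    """
    if slice_rows == 0:
        return 0
    if slice_rows == num_rows:
        return size_bytes
    per_row = size_bytes / num_rows
    return max(1, math.ceil(per_row * slice_rows))
```

For example, a 0-row slice of a 100-byte, 10-row block is estimated at 0 bytes, while a 5-row slice is estimated at 50 bytes.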


Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
rows_needed_from_last_bundle
)
pending_bundles.append(sliced_bundle)
self._ready_bundles.append(RefBundle.merge_ref_bundles(pending_bundles))

Bug: Bundle Exclusion Fails on Exact Completion

When rows_needed_from_last_bundle equals zero, the last bundle should be excluded from the ready bundle but isn't. This occurs when the last bundle's row count exactly equals the remainder (_total_pending_rows % _target_num_rows). For example, with 15 total rows, target of 10, and last bundle of 5 rows, the code outputs all 15 rows instead of outputting 10 rows and keeping 5 pending. The condition at line 39 should handle the zero case by removing the last bundle from pending_bundles before merging.
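The exact-completion case can be modeled on plain row counts. A simplified sketch (not the actual RefBundle code; `flush` is a hypothetical stand-in) showing the last bundle being excluded when no rows are needed from it:

```python
from typing import List, Sequence, Tuple

def flush(bundle_rows: Sequence[int], target: int) -> Tuple[List[int], List[int]]:
    """Split pending bundles (given as row counts) into (emitted, kept).

    Emitted rows total a multiple of `target`; leftover rows stay
    pending. When the last bundle's rows equal the leftover exactly,
    it is kept whole instead of being merged into the emitted set.
    """
    total = sum(bundle_rows)
    keep = total % target
    if keep == 0:
        # Exact multiple overall: emit everything, keep nothing.
        return list(bundle_rows), []
    take_from_last = bundle_rows[-1] - keep
    # Under an eager-flush invariant this should never be negative.
    assert take_from_last >= 0, "last bundle should cover the leftover"
    if take_from_last == 0:
        # Exact completion: exclude the last bundle entirely.
        return list(bundle_rows[:-1]), [bundle_rows[-1]]
    # Otherwise slice the last bundle between emitted and pending.
    return list(bundle_rows[:-1]) + [take_from_last], [keep]
```

With the example above, `flush([10, 5], 10)` emits 10 rows and keeps the 5-row bundle pending, rather than emitting all 15.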


assert flat_out == list(range(n))


@pytest.mark.parametrize(
Contributor

Nit: this should be put under tests/unit, as it's a unit test.

# Test with empty blocks
3,
[[[1]], [[]], [[2, 3]], [[]], [[4, 5]]],
[3, 2], # Expected: [1,2,3] and [4,5]
Contributor

let's also check the block contents.

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@raulchen raulchen enabled auto-merge (squash) November 14, 2025 00:37
@raulchen raulchen merged commit 47c1015 into ray-project:master Nov 14, 2025
6 of 7 checks passed
ArturNiederfahrenhorst pushed a commit to ArturNiederfahrenhorst/ray that referenced this pull request Nov 16, 2025
landscapepainter pushed a commit to landscapepainter/ray that referenced this pull request Nov 17, 2025
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025
Future-Outlier pushed a commit to Future-Outlier/ray that referenced this pull request Dec 7, 2025
peterxcli pushed a commit to peterxcli/ray that referenced this pull request Feb 25, 2026

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Streaming repartition to honor num_rows_per_block

6 participants