
[Data] Read task parallelism should be planned after operator fusion #41018

@stephanie-wang

Description


What happened + What you expected to happen

Operator fusion can change the target block size of a Read op. This happens when there is a downstream AllToAll op, since AllToAll ops have a larger default target block size than Map ops. Currently there is a circular dependency between the optimizer rules:

  1. SplitReadOutputBlocks rule: Read op determines its split factor based on its target block size.
  2. OperatorFusion rule: When an AllToAll op gets fused with a Map op, the immediately upstream op inherits the larger target block size. This upstream op may be a Read op.

However, SplitReadOutputBlocks runs first, so in step 1 the Read op has not yet inherited the larger target block size produced in step 2. As a result, when the plan contains an all-to-all op, the read stage's computed parallelism may be higher than intended.
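The rule-ordering problem can be sketched with a toy model (hypothetical sizes and function names, not Ray's actual optimizer code): splitting before fusion uses the smaller Map block size and over-parallelizes the read.

```python
# Toy model of the rule-ordering bug. All names and sizes are
# illustrative, not Ray's real optimizer internals.

MAP_BLOCK_SIZE = 128 * 1024 * 1024       # e.g. 128 MiB default for Map ops
SHUFFLE_BLOCK_SIZE = 1024 * 1024 * 1024  # e.g. 1 GiB default for AllToAll ops
DATASET_SIZE = 8 * 1024 * 1024 * 1024    # 8 GiB of data to read

def split_read_output_blocks(target_block_size):
    """SplitReadOutputBlocks: derive read parallelism from block size."""
    return DATASET_SIZE // target_block_size

def operator_fusion(has_all_to_all, block_size):
    """OperatorFusion: a downstream AllToAll raises the Read's target."""
    return max(block_size, SHUFFLE_BLOCK_SIZE) if has_all_to_all else block_size

# Current (buggy) order: split first, fuse second. The Read never sees
# the inherited shuffle block size, so it plans too many tasks.
buggy_parallelism = split_read_output_blocks(MAP_BLOCK_SIZE)

# Desired order: fuse first so the Read splits on the inherited size.
fused_size = operator_fusion(True, MAP_BLOCK_SIZE)
fixed_parallelism = split_read_output_blocks(fused_size)

print(buggy_parallelism, fixed_parallelism)  # 64 8
```

With these numbers the read is planned at 64 tasks instead of the 8 that the shuffle-sized blocks would call for, which is the over-parallelism described above.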

Users can work around this issue by setting DataContext.target_max_block_size = DataContext.target_shuffle_max_block_size.
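A minimal sketch of that workaround (assuming Ray 2.7+, where `ray.data.DataContext.get_current()` returns the process-wide Ray Data configuration):

```python
import ray

# Workaround sketch: make the general block-size target equal to the
# shuffle target, so fusion cannot enlarge the Read op's effective
# target block size after splitting has already been planned.
ctx = ray.data.DataContext.get_current()
ctx.target_max_block_size = ctx.target_shuffle_max_block_size
```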

Versions / Dependencies

2.7+

Reproduction script

import ray

ray.data.range(N).random_shuffle()  # N: any dataset size; placeholder in the original report

Issue Severity

None

Metadata

Labels

P1 — Issue that should be fixed within a few weeks
bug — Something that is supposed to be working, but isn't
data — Ray Data-related issues
