Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion python/ray/data/_internal/execution/resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,14 @@ def get_allocation(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
# TODO fix
return ExecutionResources.zero()

def _get_total_reserved(self, op: PhysicalOperator) -> ExecutionResources:
    """Return the operator's full reservation, folding the outputs share in.

    The per-op reservation (`self._op_reserved`) and the extra object-store
    memory held back for the op's outputs (`self._reserved_for_op_outputs`)
    are tracked separately; this combines them into one ExecutionResources
    value whose object_store_memory is the sum of both.
    """
    task_reserved = self._op_reserved[op]
    outputs_memory = self._reserved_for_op_outputs[op]
    combined_memory = task_reserved.object_store_memory + outputs_memory
    return task_reserved.copy(object_store_memory=combined_memory)

def max_task_output_bytes_to_read(
self,
op: PhysicalOperator,
Expand Down Expand Up @@ -879,6 +887,20 @@ def update_budgets(
remaining_shared
):
op_shared = op_shared.add(to_borrow)

# Cap op_shared so that total allocation doesn't exceed max_resource_usage.
# Total allocation = max(total_reserved, op_usage) + op_shared
# This ensures excess resources stay in remaining_shared for other operators.
_, max_resource_usage = op.min_max_resource_requirements()
if max_resource_usage is not None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here and on L912 -- when can max_resource_usage be None? I think the return type is tuple[ExecutionResources, ExecutionResources] (i.e., always non-None)?

If we are returning None in some places, I think we should return for_limits() instead. I think the polymorphism makes this harder to read and reason about.

total_reserved = self._get_total_reserved(op)
op_usage = self._resource_manager.get_op_usage(op)
current_allocation = total_reserved.max(op_usage)
max_shared = max_resource_usage.subtract(current_allocation).max(
ExecutionResources.zero()
)
op_shared = op_shared.min(max_shared)

remaining_shared = remaining_shared.subtract(op_shared)
assert remaining_shared.is_non_negative(), (
remaining_shared,
Expand All @@ -887,7 +909,7 @@ def update_budgets(
to_borrow,
)

if op.min_max_resource_requirements()[1].gpu > 0:
if max_resource_usage is not None and max_resource_usage.gpu > 0:
# If an operator needs GPU, we just allocate all GPUs to it.
# TODO(hchen): allocate resources across multiple GPU operators.

Expand All @@ -906,6 +928,14 @@ def update_budgets(
self._op_budgets[op].add(op_shared).copy(gpu=target_num_gpu)
)

# Give any remaining shared resources to the most downstream op.
# This can happen when some ops have their shared allocation capped.
if eligible_ops and not remaining_shared.is_zero():
downstream_op = eligible_ops[-1]
self._op_budgets[downstream_op] = self._op_budgets[downstream_op].add(
remaining_shared.copy(gpu=0)
)

# A materializing operator like `AllToAllOperator` waits for all its input
# operator's outputs before processing data. This often forces the input
# operator to exceed its object store memory budget. To prevent deadlock, we
Expand Down
80 changes: 80 additions & 0 deletions python/ray/data/tests/test_resource_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -712,6 +712,86 @@ def test_does_not_reserve_more_than_max_resource_usage(self):
cpu=1, object_store_memory=1
)

def test_budget_capped_by_max_resource_usage(self, restore_data_context):
    """Test that the total allocation is capped by max_resource_usage.

    Total allocation = max(total_reserved, op_usage) + op_shared.
    op_shared is capped so that total allocation <= max_resource_usage,
    and any excess shared resources must stay available to other operators.
    """
    DataContext.get_current().op_resource_reservation_enabled = True
    DataContext.get_current().op_resource_reservation_ratio = 0.5

    o1 = InputDataBuffer(DataContext.get_current(), [])
    o2 = mock_map_op(o1, incremental_resource_usage=ExecutionResources(1, 0, 10))
    o3 = mock_map_op(o2, incremental_resource_usage=ExecutionResources(1, 0, 10))

    # o2 advertises a small CPU ceiling, so its share of the pooled CPU
    # must be capped; o3 advertises no ceiling at all.
    o2.min_max_resource_requirements = MagicMock(
        return_value=(
            ExecutionResources.zero(),
            ExecutionResources(cpu=4, object_store_memory=float("inf")),
        )
    )
    o3.min_max_resource_requirements = MagicMock(
        return_value=(ExecutionResources.zero(), ExecutionResources.inf())
    )

    topo = build_streaming_topology(o3, ExecutionOptions())

    global_limits = ExecutionResources(cpu=20, object_store_memory=400)

    usage_by_op = {
        o1: ExecutionResources.zero(),
        o2: ExecutionResources(cpu=2, object_store_memory=40),
        o3: ExecutionResources(cpu=2, object_store_memory=40),
    }

    resource_manager = ResourceManager(
        topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
    )
    resource_manager.get_op_usage = MagicMock(
        side_effect=lambda op: usage_by_op[op]
    )
    resource_manager._mem_op_internal = {o1: 0, o2: 40, o3: 40}
    resource_manager._mem_op_outputs = {o1: 0, o2: 0, o3: 0}
    resource_manager.get_global_limits = MagicMock(return_value=global_limits)

    allocator = resource_manager._op_resource_allocator
    assert isinstance(allocator, ReservationOpResourceAllocator)
    allocator.update_budgets(limits=global_limits)

    # Expected numbers, all as (cpu, object_store_memory):
    #
    # Reservation phase:
    #   default_reserved per op   = global_limits * 0.5 / 2 = (5, 100)
    #   reserved_for_outputs each = 100 / 2 = 50
    #   o2 reserved_for_tasks capped by its max (4, inf) -> (4, 50)
    #   o3 reserved_for_tasks = (5, 50)
    #   total_shared = (20, 400) - (4, 100) - (5, 100) = (11, 200)
    #
    # Budget phase, first loop (reserved_remaining = reserved_for_tasks - usage):
    #   o2: (4, 50) - (2, 40) = (2, 10)
    #   o3: (5, 50) - (2, 40) = (3, 10)
    #
    # Shared allocation, second loop (reversed topological order):
    #   o3: op_shared = remaining_shared / 2 = (5.5, 100), uncapped
    #       budget = (3, 10) + (5.5, 100) = (8.5, 110)
    #   o2: op_shared = (5.5, 100), CPU capped down to (0, 100)
    #       budget = (2, 10) + (0, 100) = (2, 110)
    #       remaining_shared = (5.5, 0)
    #   Leftover (5.5, 0) goes to the most downstream op (o3):
    #       o3 budget = (8.5, 110) + (5.5, 0) = (14, 110)
    assert allocator._op_budgets[o2] == ExecutionResources(
        cpu=2, object_store_memory=110
    )
    assert allocator._op_budgets[o3] == ExecutionResources(
        cpu=14, object_store_memory=110
    )

def test_only_handle_eligible_ops(self, restore_data_context):
"""Test that we only handle non-completed map ops."""
DataContext.get_current().op_resource_reservation_enabled = True
Expand Down