Skip to content

Commit b38c13a

Browse files
raulchen and peterxcli
authored and committed
[data] fix resource allocator not respecting max resource requirement (ray-project#59412)
This PR adds a cap on the total budget allocation by max_resource_usage in ReservationOpResourceAllocator. Previously, the reservation was capped by max_resource_usage, but the total budget (reservation + shared resources) could exceed the max. This could lead to operators being allocated more resources than they can use, while other operators are starved. Changes Cap total allocation by max_resource_usage: - Total allocation = max(total_reserved, op_usage) + op_shared - We now cap op_shared so that total allocation ≤ max_resource_usage - Excess shared resources remain in remaining_shared for other operators Redistribute remaining shared resources: - After the allocation loop, any remaining shared resources (from caps) are given to the most downstream uncapped operator - An operator is "uncapped" if its max_resource_usage == ExecutionResources.inf() - If all operators are capped, remaining resources are not redistributed Tests - test_budget_capped_by_max_resource_usage: Tests that capped operators don't receive excess shared resources, and remaining goes to uncapped downstream op - test_budget_capped_by_max_resource_usage_all_capped: Tests that when all operators are capped, remaining shared resources are not redistributed --------- Signed-off-by: Hao Chen <chenh1024@gmail.com> Signed-off-by: peterxcli <peterxcli@gmail.com>
1 parent fa2c4de commit b38c13a

File tree

2 files changed

+171
-1
lines changed

2 files changed

+171
-1
lines changed

python/ray/data/_internal/execution/resource_manager.py

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -789,6 +789,14 @@ def get_allocation(self, op: PhysicalOperator) -> Optional[ExecutionResources]:
789789
# TODO fix
790790
return ExecutionResources.zero()
791791

792+
def _get_total_reserved(self, op: PhysicalOperator) -> ExecutionResources:
793+
"""Get total reserved resources for an operator, including outputs reservation."""
794+
op_reserved = self._op_reserved[op]
795+
reserved_for_outputs = self._reserved_for_op_outputs[op]
796+
return op_reserved.copy(
797+
object_store_memory=op_reserved.object_store_memory + reserved_for_outputs
798+
)
799+
792800
def max_task_output_bytes_to_read(
793801
self,
794802
op: PhysicalOperator,
@@ -888,6 +896,20 @@ def update_budgets(
888896
remaining_shared
889897
):
890898
op_shared = op_shared.add(to_borrow)
899+
900+
# Cap op_shared so that total allocation doesn't exceed max_resource_usage.
901+
# Total allocation = max(total_reserved, op_usage) + op_shared
902+
# This ensures excess resources stay in remaining_shared for other operators.
903+
_, max_resource_usage = op.min_max_resource_requirements()
904+
if max_resource_usage != ExecutionResources.inf():
905+
total_reserved = self._get_total_reserved(op)
906+
op_usage = self._resource_manager.get_op_usage(op)
907+
current_allocation = total_reserved.max(op_usage)
908+
max_shared = max_resource_usage.subtract(current_allocation).max(
909+
ExecutionResources.zero()
910+
)
911+
op_shared = op_shared.min(max_shared)
912+
891913
remaining_shared = remaining_shared.subtract(op_shared)
892914
assert remaining_shared.is_non_negative(), (
893915
remaining_shared,
@@ -896,7 +918,10 @@ def update_budgets(
896918
to_borrow,
897919
)
898920

899-
if op.min_max_resource_requirements()[1].gpu > 0:
921+
if (
922+
max_resource_usage != ExecutionResources.inf()
923+
and max_resource_usage.gpu > 0
924+
):
900925
# If an operator needs GPU, we just allocate all GPUs to it.
901926
# TODO(hchen): allocate resources across multiple GPU operators.
902927

@@ -915,6 +940,17 @@ def update_budgets(
915940
self._op_budgets[op].add(op_shared).copy(gpu=target_num_gpu)
916941
)
917942

943+
# Give any remaining shared resources to the most downstream uncapped op.
944+
# This can happen when some ops have their shared allocation capped.
945+
if eligible_ops and not remaining_shared.is_zero():
946+
for op in reversed(eligible_ops):
947+
_, max_resource_usage = op.min_max_resource_requirements()
948+
if max_resource_usage == ExecutionResources.inf():
949+
self._op_budgets[op] = self._op_budgets[op].add(
950+
remaining_shared.copy(gpu=0)
951+
)
952+
break
953+
918954
# A materializing operator like `AllToAllOperator` waits for all its input
919955
# operator's outputs before processing data. This often forces the input
920956
# operator to exceed its object store memory budget. To prevent deadlock, we

python/ray/data/tests/test_resource_manager.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -712,6 +712,140 @@ def test_does_not_reserve_more_than_max_resource_usage(self):
712712
cpu=1, object_store_memory=1
713713
)
714714

715+
def test_budget_capped_by_max_resource_usage(self, restore_data_context):
716+
"""Test that the total allocation is capped by max_resource_usage.
717+
718+
Total allocation = max(total_reserved, op_usage) + op_shared
719+
We cap op_shared so that total allocation <= max_resource_usage.
720+
Excess shared resources should remain available for other operators.
721+
"""
722+
DataContext.get_current().op_resource_reservation_enabled = True
723+
DataContext.get_current().op_resource_reservation_ratio = 0.5
724+
725+
o1 = InputDataBuffer(DataContext.get_current(), [])
726+
o2 = mock_map_op(o1, incremental_resource_usage=ExecutionResources(1, 0, 10))
727+
o3 = mock_map_op(o2, incremental_resource_usage=ExecutionResources(1, 0, 10))
728+
729+
# o2 has a small max CPU, so its CPU shared allocation will be capped.
730+
# o3 has unlimited max_resource_usage.
731+
o2.min_max_resource_requirements = MagicMock(
732+
return_value=(
733+
ExecutionResources.zero(),
734+
ExecutionResources(cpu=4, object_store_memory=float("inf")),
735+
)
736+
)
737+
o3.min_max_resource_requirements = MagicMock(
738+
return_value=(
739+
ExecutionResources.zero(),
740+
ExecutionResources.inf(),
741+
)
742+
)
743+
744+
topo = build_streaming_topology(o3, ExecutionOptions())
745+
746+
global_limits = ExecutionResources(cpu=20, object_store_memory=400)
747+
748+
op_usages = {
749+
o1: ExecutionResources.zero(),
750+
o2: ExecutionResources(cpu=2, object_store_memory=40),
751+
o3: ExecutionResources(cpu=2, object_store_memory=40),
752+
}
753+
754+
resource_manager = ResourceManager(
755+
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
756+
)
757+
resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
758+
resource_manager._mem_op_internal = {o1: 0, o2: 40, o3: 40}
759+
resource_manager._mem_op_outputs = {o1: 0, o2: 0, o3: 0}
760+
resource_manager.get_global_limits = MagicMock(return_value=global_limits)
761+
762+
allocator = resource_manager._op_resource_allocator
763+
assert isinstance(allocator, ReservationOpResourceAllocator)
764+
allocator.update_budgets(limits=global_limits)
765+
766+
# All tuples below are (cpu, object_store_memory).
767+
#
768+
# Reservation phase:
769+
# - default_reserved per op = global_limits * 0.5 / 2 = (5, 100)
770+
# - reserved_for_outputs per op = 100 / 2 = 50
771+
# - o2's reserved_for_tasks is capped by max (4, inf) -> (4, 50)
772+
# - o3's reserved_for_tasks = (5, 50)
773+
# - total_shared = global_limits - o2_total_reserved - o3_total_reserved
774+
# = (20, 400) - (4, 100) - (5, 100) = (11, 200)
775+
#
776+
# Budget phase (first loop calculates reserved_remaining):
777+
# - o2: reserved_remaining = reserved_for_tasks - usage = (4, 50) - (2, 40) = (2, 10)
778+
# - o3: reserved_remaining = (5, 50) - (2, 40) = (3, 10)
779+
#
780+
# Shared allocation (second loop, reversed order):
781+
# - o3: op_shared = remaining_shared / 2 = (5.5, 100), no cap
782+
# budget = reserved_remaining + op_shared = (3, 10) + (5.5, 100) = (8.5, 110)
783+
# - o2: op_shared = (5.5, 100), CPU capped to (0, 100)
784+
# budget = (2, 10) + (0, 100) = (2, 110)
785+
# remaining_shared = (5.5, 0)
786+
# - After loop, remaining (5.5, 0) given to most downstream op (o3):
787+
# o3 budget = (8.5, 110) + (5.5, 0) = (14, 110)
788+
assert allocator._op_budgets[o2] == ExecutionResources(
789+
cpu=2, object_store_memory=110
790+
)
791+
assert allocator._op_budgets[o3] == ExecutionResources(
792+
cpu=14, object_store_memory=110
793+
)
794+
795+
def test_budget_capped_by_max_resource_usage_all_capped(self, restore_data_context):
796+
"""Test when all operators are capped, remaining shared resources are not given."""
797+
DataContext.get_current().op_resource_reservation_enabled = True
798+
DataContext.get_current().op_resource_reservation_ratio = 0.5
799+
800+
o1 = InputDataBuffer(DataContext.get_current(), [])
801+
o2 = mock_map_op(o1, incremental_resource_usage=ExecutionResources(1, 0, 10))
802+
o3 = mock_map_op(o2, incremental_resource_usage=ExecutionResources(1, 0, 10))
803+
804+
# Both operators are capped.
805+
o2.min_max_resource_requirements = MagicMock(
806+
return_value=(
807+
ExecutionResources.zero(),
808+
ExecutionResources(cpu=4, object_store_memory=float("inf")),
809+
)
810+
)
811+
o3.min_max_resource_requirements = MagicMock(
812+
return_value=(
813+
ExecutionResources.zero(),
814+
ExecutionResources(cpu=4, object_store_memory=float("inf")),
815+
)
816+
)
817+
818+
topo = build_streaming_topology(o3, ExecutionOptions())
819+
820+
global_limits = ExecutionResources(cpu=20, object_store_memory=400)
821+
822+
op_usages = {
823+
o1: ExecutionResources.zero(),
824+
o2: ExecutionResources(cpu=2, object_store_memory=40),
825+
o3: ExecutionResources(cpu=2, object_store_memory=40),
826+
}
827+
828+
resource_manager = ResourceManager(
829+
topo, ExecutionOptions(), MagicMock(), DataContext.get_current()
830+
)
831+
resource_manager.get_op_usage = MagicMock(side_effect=lambda op: op_usages[op])
832+
resource_manager._mem_op_internal = {o1: 0, o2: 40, o3: 40}
833+
resource_manager._mem_op_outputs = {o1: 0, o2: 0, o3: 0}
834+
resource_manager.get_global_limits = MagicMock(return_value=global_limits)
835+
836+
allocator = resource_manager._op_resource_allocator
837+
allocator.update_budgets(limits=global_limits)
838+
839+
# Both ops are capped (max cpu=4), so remaining CPU is not given to any op.
840+
# o2: reserved_remaining (2, 10) + capped op_shared (0, 100) = (2, 110)
841+
# o3: reserved_remaining (2, 10) + capped op_shared (0, 100) = (2, 110)
842+
assert allocator._op_budgets[o2] == ExecutionResources(
843+
cpu=2, object_store_memory=110
844+
)
845+
assert allocator._op_budgets[o3] == ExecutionResources(
846+
cpu=2, object_store_memory=110
847+
)
848+
715849
def test_only_handle_eligible_ops(self, restore_data_context):
716850
"""Test that we only handle non-completed map ops."""
717851
DataContext.get_current().op_resource_reservation_enabled = True

0 commit comments

Comments
 (0)