Skip to content

Commit 87c8d21

Browse files
iamjustinhsuFuture-Outlier
authored andcommitted
[Data] Add Ranker Interface (ray-project#58513)
## Description Creates a ranker interface that will rank the best operator to run next in `select_operator_to_run`. This code only refractors the existing code. The ranking value must be something that is comparable. ## Related issues None ## Additional information None --------- Signed-off-by: iamjustinhsu <jhsu@anyscale.com> Signed-off-by: Future-Outlier <eric901201@gmail.com>
1 parent b938e02 commit 87c8d21

File tree

7 files changed

+222
-48
lines changed

7 files changed

+222
-48
lines changed

python/ray/data/BUILD.bazel

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1255,6 +1255,20 @@ py_test(
12551255
],
12561256
)
12571257

1258+
py_test(
1259+
name = "test_ranker",
1260+
size = "small",
1261+
srcs = ["tests/test_ranker.py"],
1262+
tags = [
1263+
"exclusive",
1264+
"team:data",
1265+
],
1266+
deps = [
1267+
":conftest",
1268+
"//:ray_lib",
1269+
],
1270+
)
1271+
12581272
py_test(
12591273
name = "test_raydp",
12601274
size = "medium",
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
from .ranker import DefaultRanker, Ranker
2+
3+
4+
def create_ranker() -> Ranker:
5+
"""Create a ranker instance based on environment and configuration."""
6+
return DefaultRanker()
Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,100 @@
1+
"""Ranker component for operator selection in streaming executor."""
2+
3+
from abc import ABC, abstractmethod
4+
from typing import TYPE_CHECKING, Generic, List, Protocol, Tuple, TypeVar
5+
6+
from ray.data._internal.execution.interfaces import PhysicalOperator
7+
from ray.data._internal.execution.resource_manager import ResourceManager
8+
9+
if TYPE_CHECKING:
10+
from ray.data._internal.execution.streaming_executor_state import Topology
11+
12+
# Protocol for comparable ranking values
13+
class Comparable(Protocol):
14+
"""Protocol for types that can be compared for ranking."""
15+
16+
def __lt__(self, other: "Comparable") -> bool:
17+
...
18+
19+
def __le__(self, other: "Comparable") -> bool:
20+
...
21+
22+
def __gt__(self, other: "Comparable") -> bool:
23+
...
24+
25+
def __ge__(self, other: "Comparable") -> bool:
26+
...
27+
28+
def __eq__(self, other: "Comparable") -> bool:
29+
...
30+
31+
32+
# Generic type for comparable ranking values
33+
RankingValue = TypeVar("RankingValue", bound=Comparable)
34+
35+
36+
class Ranker(ABC, Generic[RankingValue]):
37+
"""Abstract base class for operator ranking strategies."""
38+
39+
@abstractmethod
40+
def rank_operator(
41+
self,
42+
op: PhysicalOperator,
43+
topology: "Topology",
44+
resource_manager: ResourceManager,
45+
) -> RankingValue:
46+
"""Rank operator for execution priority.
47+
48+
Operator to run next is selected as the one with the *smallest* value
49+
of the lexicographically ordered ranks composed of (in order):
50+
51+
Args:
52+
op: Operator to rank
53+
topology: Current execution topology
54+
resource_manager: Resource manager for usage information
55+
56+
Returns:
57+
Rank (tuple) for operator
58+
"""
59+
pass
60+
61+
def rank_operators(
62+
self,
63+
ops: List[PhysicalOperator],
64+
topology: "Topology",
65+
resource_manager: ResourceManager,
66+
) -> List[RankingValue]:
67+
68+
assert len(ops) > 0
69+
return [self.rank_operator(op, topology, resource_manager) for op in ops]
70+
71+
72+
class DefaultRanker(Ranker[Tuple[int, int]]):
73+
"""Ranker implementation."""
74+
75+
def rank_operator(
76+
self,
77+
op: PhysicalOperator,
78+
topology: "Topology",
79+
resource_manager: ResourceManager,
80+
) -> Tuple[int, int]:
81+
"""Computes rank for op. *Lower means better rank*
82+
83+
1. Whether operator's could be throttled (int)
84+
2. Operators' object store utilization
85+
86+
Args:
87+
op: Operator to rank
88+
topology: Current execution topology
89+
resource_manager: Resource manager for usage information
90+
91+
Returns:
92+
Rank (tuple) for operator
93+
"""
94+
95+
throttling_disabled = 0 if op.throttling_disabled() else 1
96+
97+
return (
98+
throttling_disabled,
99+
resource_manager.get_op_usage(op).object_store_memory,
100+
)

python/ray/data/_internal/execution/streaming_executor.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
create_actor_autoscaler,
99
)
1010
from ray.data._internal.cluster_autoscaler import create_cluster_autoscaler
11+
from ray.data._internal.execution import create_ranker
1112
from ray.data._internal.execution.backpressure_policy import (
1213
BackpressurePolicy,
1314
get_backpressure_policies,
@@ -77,6 +78,7 @@ def __init__(
7778
dataset_id: str = "unknown_dataset",
7879
):
7980
self._data_context = data_context
81+
self._ranker = create_ranker()
8082
self._start_time: Optional[float] = None
8183
self._initial_stats: Optional[DatasetStats] = None
8284
self._final_stats: Optional[DatasetStats] = None
@@ -479,6 +481,7 @@ def _scheduling_loop_step(self, topology: Topology) -> bool:
479481
# If consumer is idling (there's nothing for it to consume)
480482
# enforce liveness, ie that at least a single task gets scheduled
481483
ensure_liveness=self._consumer_idling(),
484+
ranker=self._ranker,
482485
)
483486

484487
if op is None:

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 3 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
HashShuffleProgressBarMixin,
3535
)
3636
from ray.data._internal.execution.operators.input_data_buffer import InputDataBuffer
37+
from ray.data._internal.execution.ranker import Ranker
3738
from ray.data._internal.execution.resource_manager import (
3839
ResourceManager,
3940
)
@@ -746,6 +747,7 @@ def select_operator_to_run(
746747
resource_manager: ResourceManager,
747748
backpressure_policies: List[BackpressurePolicy],
748749
ensure_liveness: bool,
750+
ranker: "Ranker",
749751
) -> Optional[PhysicalOperator]:
750752
"""Select next operator to launch new tasks.
751753
@@ -769,7 +771,7 @@ def select_operator_to_run(
769771
if not eligible_ops:
770772
return None
771773

772-
ranks = _rank_operators(eligible_ops, resource_manager)
774+
ranks = ranker.rank_operators(eligible_ops, topology, resource_manager)
773775

774776
assert len(eligible_ops) == len(ranks), (eligible_ops, ranks)
775777

@@ -778,48 +780,6 @@ def select_operator_to_run(
778780
return next_op
779781

780782

781-
def _rank_operators(
782-
ops: List[PhysicalOperator], resource_manager: ResourceManager
783-
) -> List[Tuple]:
784-
"""Picks operator to run according to the following semantic:
785-
786-
Operator to run next is selected as the one with the *smallest* value
787-
of the lexicographically ordered ranks composed of (in order):
788-
789-
1. Whether operator's could be throttled (bool)
790-
2. Operators' object store utilization
791-
792-
Consider following examples:
793-
794-
Example 1:
795-
796-
Operator 1 with rank (True, 1024 bytes)
797-
Operator 2 with rank (False, 2048 bytes)
798-
799-
In that case Operator 2 will be selected.
800-
801-
Example 2:
802-
803-
Operator 1 with rank (True, 1024 bytes)
804-
Operator 2 with rank (True, 2048 bytes)
805-
806-
In that case Operator 1 will be selected.
807-
"""
808-
809-
assert len(ops) > 0, ops
810-
811-
def _ranker(op):
812-
# Rank composition:
813-
# 1. Whether throttling is enabled
814-
# 2. Estimated Object Store usage
815-
return (
816-
not op.throttling_disabled(),
817-
resource_manager.get_op_usage(op).object_store_memory,
818-
)
819-
820-
return [_ranker(op) for op in ops]
821-
822-
823783
def _actor_info_summary_str(info: _ActorPoolInfo) -> str:
824784
total = info.running + info.pending + info.restarting
825785
base = f"Actors: {total}"
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
"""Comprehensive tests for the generic ranker type system."""
2+
3+
from unittest.mock import MagicMock
4+
5+
import pytest
6+
7+
from ray.data._internal.execution.interfaces import PhysicalOperator
8+
from ray.data._internal.execution.ranker import DefaultRanker, Ranker
9+
from ray.data._internal.execution.resource_manager import ResourceManager
10+
from ray.data._internal.execution.streaming_executor_state import Topology
11+
12+
13+
def test_default_ranker():
14+
"""Test that the ranker interface works correctly."""
15+
ranker = DefaultRanker()
16+
17+
# Mock objects
18+
op1 = MagicMock()
19+
op1.throttling_disabled.return_value = False
20+
op2 = MagicMock()
21+
op2.throttling_disabled.return_value = True
22+
topology = {}
23+
resource_manager = MagicMock()
24+
resource_manager.get_op_usage.return_value = MagicMock()
25+
resource_manager.get_op_usage.return_value.object_store_memory = 1024
26+
27+
# Test rank_operator for first op
28+
rank1 = ranker.rank_operator(op1, topology, resource_manager)
29+
assert rank1 == (1, 1024) # throttling_disabled=False -> 1, memory=1024
30+
31+
# Test rank_operator for second op
32+
rank2 = ranker.rank_operator(op2, topology, resource_manager)
33+
assert rank2 == (0, 1024) # throttling_disabled=True -> 0, memory=1024
34+
35+
# Test rank_operators with both ops
36+
ops = [op1, op2]
37+
ranks = ranker.rank_operators(ops, topology, resource_manager)
38+
assert ranks == [(1, 1024), (0, 1024)]
39+
40+
41+
class IntRanker(Ranker[int]):
42+
"""Ranker that returns integer rankings."""
43+
44+
def rank_operator(
45+
self,
46+
op: PhysicalOperator,
47+
topology: "Topology",
48+
resource_manager: ResourceManager,
49+
) -> int:
50+
"""Return integer ranking."""
51+
return resource_manager.get_op_usage(op).object_store_memory
52+
53+
54+
def test_generic_types():
55+
"""Test that specific generic types work correctly."""
56+
# Test integer ranker
57+
int_ranker = IntRanker()
58+
op1 = MagicMock()
59+
op2 = MagicMock()
60+
topology = {}
61+
resource_manager = MagicMock()
62+
resource_manager.get_op_usage.return_value = MagicMock()
63+
resource_manager.get_op_usage.return_value.object_store_memory = 1024
64+
65+
# Test rank_operator for first op
66+
rank1 = int_ranker.rank_operator(op1, topology, resource_manager)
67+
assert rank1 == 1024
68+
69+
# Test rank_operator for second op
70+
rank2 = int_ranker.rank_operator(op2, topology, resource_manager)
71+
assert rank2 == 1024
72+
73+
# Test rank_operators with both ops
74+
ops = [op1, op2]
75+
ranks = int_ranker.rank_operators(ops, topology, resource_manager)
76+
assert ranks == [1024, 1024]
77+
78+
79+
if __name__ == "__main__":
80+
import sys
81+
82+
sys.exit(pytest.main(["-v", __file__]))

python/ray/data/tests/test_streaming_executor.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
BlockMapTransformFn,
4141
MapTransformer,
4242
)
43+
from ray.data._internal.execution.ranker import DefaultRanker
4344
from ray.data._internal.execution.resource_manager import ResourceManager
4445
from ray.data._internal.execution.streaming_executor import (
4546
StreamingExecutor,
@@ -49,7 +50,6 @@
4950
from ray.data._internal.execution.streaming_executor_state import (
5051
OpBufferQueue,
5152
OpState,
52-
_rank_operators,
5353
build_streaming_topology,
5454
get_eligible_operators,
5555
process_completed_tasks,
@@ -380,9 +380,10 @@ def _get_op_usage_mocked(op):
380380

381381
resource_manager.get_op_usage.side_effect = _get_op_usage_mocked
382382

383-
ranks = _rank_operators([o1, o2, o3, o4], resource_manager)
383+
ranker = DefaultRanker()
384+
ranks = ranker.rank_operators([o1, o2, o3, o4], {}, resource_manager)
384385

385-
assert [(True, 1024), (True, 2048), (True, 4096), (False, 8092)] == ranks
386+
assert [(1, 1024), (1, 2048), (1, 4096), (0, 8092)] == ranks
386387

387388

388389
def test_select_ops_to_run(ray_start_regular_shared):
@@ -428,7 +429,11 @@ def _get_op_usage_mocked(op):
428429
topo = build_streaming_topology(o4, opts)
429430

430431
selected = select_operator_to_run(
431-
topo, resource_manager, [], ensure_liveness=ensure_liveness
432+
topo,
433+
resource_manager,
434+
[],
435+
ensure_liveness=ensure_liveness,
436+
ranker=DefaultRanker(),
432437
)
433438

434439
assert selected is o4
@@ -439,7 +444,11 @@ def _get_op_usage_mocked(op):
439444
topo = build_streaming_topology(o3, opts)
440445

441446
selected = select_operator_to_run(
442-
topo, resource_manager, [], ensure_liveness=ensure_liveness
447+
topo,
448+
resource_manager,
449+
[],
450+
ensure_liveness=ensure_liveness,
451+
ranker=DefaultRanker(),
443452
)
444453

445454
assert selected is o1

0 commit comments

Comments
 (0)