Skip to content

Commit 6adb3c7

Browse files
author
Shallow Copy Bot
committed
[Data] Fix driver hang during streaming generator block metadata retrieval
Original PR #56451 by dragongu Original: ray-project/ray#56451
1 parent c4075ab commit 6adb3c7

File tree

4 files changed

+263
-124
lines changed

4 files changed

+263
-124
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from ray.data._internal.output_buffer import OutputBlockSizeOption
3131
from ray.data._internal.progress_bar import ProgressBar
3232
from ray.data._internal.stats import StatsDict, Timer
33+
from ray.data.block import Block, BlockMetadata
3334
from ray.data.context import DataContext
3435

3536
if TYPE_CHECKING:
@@ -38,6 +39,11 @@
3839

3940
logger = logging.getLogger(__name__)
4041

42+
# Timeout for getting metadata from Ray object references (in seconds)
43+
METADATA_GET_TIMEOUT_S = 1.0
44+
45+
# Timeout for waiting for metadata object to become available (in seconds)
46+
METADATA_WAIT_TIMEOUT_S = 0.1
4147

4248
# TODO(hchen): Ray Core should have a common interface for these two types.
4349
Waitable = Union[ray.ObjectRef, ObjectRefGenerator]
@@ -93,8 +99,8 @@ def __init__(
9399
self,
94100
task_index: int,
95101
streaming_gen: ObjectRefGenerator,
96-
output_ready_callback: Callable[[RefBundle], None],
97-
task_done_callback: Callable[[Optional[Exception]], None],
102+
output_ready_callback: Callable[[RefBundle], None] = lambda bundle: None,
103+
task_done_callback: Callable[[Optional[Exception]], None] = lambda exc: None,
98104
task_resource_bundle: Optional[ExecutionResources] = None,
99105
):
100106
"""Create a DataOpTask
@@ -115,6 +121,13 @@ def __init__(
115121
self._output_ready_callback = output_ready_callback
116122
self._task_done_callback = task_done_callback
117123

124+
# If the generator hasn't produced block metadata yet, or if the block metadata
125+
# object isn't available after we get a reference, we need to store the pending
126+
# references and wait until Ray (re)constructs the block metadata. Either case
127+
# can happen if a node dies after producing a block.
128+
self._pending_block_ref: ray.ObjectRef[Block] = ray.ObjectRef.nil()
129+
self._pending_meta_ref: ray.ObjectRef[BlockMetadata] = ray.ObjectRef.nil()
130+
118131
def get_waitable(self) -> ObjectRefGenerator:
119132
return self._streaming_gen
120133

@@ -128,42 +141,81 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
128141
"""
129142
bytes_read = 0
130143
while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
131-
try:
132-
block_ref = self._streaming_gen._next_sync(0)
133-
if block_ref.is_nil():
144+
if self._pending_block_ref.is_nil():
145+
assert self._pending_meta_ref.is_nil(), (
146+
"This method expects streaming generators to yield blocks then "
147+
"metadata. So, if we have a reference to metadata but not the "
148+
"block, it means there's an error in the implementation."
149+
)
150+
151+
try:
152+
self._pending_block_ref = self._streaming_gen._next_sync(
153+
timeout_s=0
154+
)
155+
except StopIteration:
156+
self._task_done_callback(None)
157+
break
158+
159+
if self._pending_block_ref.is_nil():
134160
# The generator currently doesn't have new output.
135161
# And it's not stopped yet.
136162
break
137-
except StopIteration:
138-
self._task_done_callback(None)
139-
break
163+
164+
if self._pending_meta_ref.is_nil():
165+
try:
166+
self._pending_meta_ref = self._streaming_gen._next_sync(
167+
timeout_s=METADATA_WAIT_TIMEOUT_S
168+
)
169+
except StopIteration:
170+
# The generator should always yield 2 values (block and metadata)
171+
# each time. If we get a StopIteration here, it means an error
172+
# happened in the task.
173+
# And in this case, the block_ref is the exception object.
174+
# TODO(hchen): Ray Core should have a better interface for
175+
# detecting and obtaining the exception.
176+
try:
177+
ray.get(self._pending_block_ref)
178+
assert False, "Above ray.get should raise an exception."
179+
except Exception as ex:
180+
self._task_done_callback(ex)
181+
raise ex from None
182+
183+
if self._pending_meta_ref.is_nil():
184+
# We have a reference to the block but the metadata isn't ready
185+
# yet.
186+
break
140187

141188
try:
189+
# The timeout for `ray.get` includes the time required to ship the
190+
# block metadata to this node. So, if we set the timeout to 0, `ray.get`
191+
# will timeout and possibly cancel the download. To avoid this issue,
192+
# we set the timeout to a small non-zero value.
142193
meta_with_schema: "BlockMetadataWithSchema" = ray.get(
143-
next(self._streaming_gen)
194+
self._pending_meta_ref, timeout=METADATA_GET_TIMEOUT_S
144195
)
145-
except StopIteration:
146-
# The generator should always yield 2 values (block and metadata)
147-
# each time. If we get a StopIteration here, it means an error
148-
# happened in the task.
149-
# And in this case, the block_ref is the exception object.
150-
# TODO(hchen): Ray Core should have a better interface for
151-
# detecting and obtaining the exception.
152-
try:
153-
ray.get(block_ref)
154-
assert False, "Above ray.get should raise an exception."
155-
except Exception as ex:
156-
self._task_done_callback(ex)
157-
raise ex from None
196+
except ray.exceptions.GetTimeoutError:
197+
# We have a reference to the block and its metadata, but the metadata
198+
# object isn't available. This can happen if the node dies.
199+
logger.warning(
200+
f"Metadata object not ready for "
201+
f"ref={self._pending_meta_ref.hex()} "
202+
f"(operator={self.__class__.__name__}). "
203+
f"Metadata may still be computing or worker may have failed and "
204+
f"object is being reconstructed. Will retry in next iteration."
205+
)
206+
break
158207

159208
meta = meta_with_schema.metadata
160209
self._output_ready_callback(
161210
RefBundle(
162-
[(block_ref, meta)],
211+
[(self._pending_block_ref, meta)],
163212
owns_blocks=True,
164213
schema=meta_with_schema.schema,
165214
),
166215
)
216+
self._pending_block_ref = ray.ObjectRef.nil()
217+
self._pending_meta_ref = ray.ObjectRef.nil()
218+
167219
bytes_read += meta.size_bytes
168220

169221
return bytes_read

python/ray/data/_internal/util.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import threading
1010
import time
1111
import urllib.parse
12-
from collections import Counter
1312
from queue import Empty, Full, Queue
1413
from types import ModuleType
1514
from typing import (
@@ -1675,13 +1674,18 @@ def rows_same(actual: pd.DataFrame, expected: pd.DataFrame) -> bool:
16751674
order of rows. This is useful for testing Ray Data because its interface doesn't
16761675
usually guarantee the order of rows.
16771676
"""
1678-
actual_rows = actual.to_dict(orient="records")
1679-
expected_rows = expected.to_dict(orient="records")
1677+
if len(actual) == len(expected) == 0:
1678+
return True
16801679

1681-
actual_items_counts = Counter(frozenset(row.items()) for row in actual_rows)
1682-
expected_items_counts = Counter(frozenset(row.items()) for row in expected_rows)
1683-
1684-
return actual_items_counts == expected_items_counts
1680+
try:
1681+
pd.testing.assert_frame_equal(
1682+
actual.sort_values(sorted(actual.columns)).reset_index(drop=True),
1683+
expected.sort_values(sorted(expected.columns)).reset_index(drop=True),
1684+
check_dtype=False,
1685+
)
1686+
return True
1687+
except AssertionError:
1688+
return False
16851689

16861690

16871691
def merge_resources_to_ray_remote_args(

python/ray/data/tests/preprocessors/test_discretizer.py

Lines changed: 77 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import pytest
33

44
import ray
5+
from ray.data._internal.util import rows_same
56
from ray.data.preprocessors import CustomKBinsDiscretizer, UniformKBinsDiscretizer
67

78

@@ -55,28 +56,27 @@ def test_uniform_kbins_discretizer(
5556
labels_B = dtypes.get("B").categories
5657
ordered_B = dtypes.get("B").ordered
5758

58-
assert out_df["A"].equals(
59-
pd.cut(
60-
in_df["A"],
61-
bins_A,
62-
labels=labels_A,
63-
ordered=ordered_A,
64-
right=right,
65-
include_lowest=include_lowest,
66-
)
59+
# Create expected dataframe with transformed columns
60+
expected_df = in_df.copy()
61+
expected_df["A"] = pd.cut(
62+
in_df["A"],
63+
bins_A,
64+
labels=labels_A,
65+
ordered=ordered_A,
66+
right=right,
67+
include_lowest=include_lowest,
6768
)
68-
assert out_df["B"].equals(
69-
pd.cut(
70-
in_df["B"],
71-
bins_B,
72-
labels=labels_B,
73-
ordered=ordered_B,
74-
right=right,
75-
include_lowest=include_lowest,
76-
)
69+
expected_df["B"] = pd.cut(
70+
in_df["B"],
71+
bins_B,
72+
labels=labels_B,
73+
ordered=ordered_B,
74+
right=right,
75+
include_lowest=include_lowest,
7776
)
78-
# Check that the remaining column was not modified
79-
assert out_df["C"].equals(in_df["C"])
77+
78+
# Use rows_same to compare regardless of row ordering
79+
assert rows_same(out_df, expected_df)
8080

8181
# append mode
8282
expected_message = "The length of columns and output_columns must match."
@@ -95,28 +95,27 @@ def test_uniform_kbins_discretizer(
9595
transformed = discretizer.fit_transform(ds)
9696
out_df = transformed.to_pandas()
9797

98-
assert out_df["A_discretized"].equals(
99-
pd.cut(
100-
in_df["A"],
101-
bins_A,
102-
labels=labels_A,
103-
ordered=ordered_A,
104-
right=right,
105-
include_lowest=include_lowest,
106-
)
98+
# Create expected dataframe with appended columns
99+
expected_df = in_df.copy()
100+
expected_df["A_discretized"] = pd.cut(
101+
in_df["A"],
102+
bins_A,
103+
labels=labels_A,
104+
ordered=ordered_A,
105+
right=right,
106+
include_lowest=include_lowest,
107107
)
108-
assert out_df["B_discretized"].equals(
109-
pd.cut(
110-
in_df["B"],
111-
bins_B,
112-
labels=labels_B,
113-
ordered=ordered_B,
114-
right=right,
115-
include_lowest=include_lowest,
116-
)
108+
expected_df["B_discretized"] = pd.cut(
109+
in_df["B"],
110+
bins_B,
111+
labels=labels_B,
112+
ordered=ordered_B,
113+
right=right,
114+
include_lowest=include_lowest,
117115
)
118-
# Check that the remaining column was not modified
119-
assert out_df["C"].equals(in_df["C"])
116+
117+
# Use rows_same to compare regardless of row ordering
118+
assert rows_same(out_df, expected_df)
120119

121120

122121
@pytest.mark.parametrize(
@@ -171,28 +170,27 @@ def test_custom_kbins_discretizer(
171170
labels_B = dtypes.get("B").categories
172171
ordered_B = dtypes.get("B").ordered
173172

174-
assert out_df["A"].equals(
175-
pd.cut(
176-
in_df["A"],
177-
bins_A,
178-
labels=labels_A,
179-
ordered=ordered_A,
180-
right=right,
181-
include_lowest=include_lowest,
182-
)
173+
# Create expected dataframe with transformed columns
174+
expected_df = in_df.copy()
175+
expected_df["A"] = pd.cut(
176+
in_df["A"],
177+
bins_A,
178+
labels=labels_A,
179+
ordered=ordered_A,
180+
right=right,
181+
include_lowest=include_lowest,
183182
)
184-
assert out_df["B"].equals(
185-
pd.cut(
186-
in_df["B"],
187-
bins_B,
188-
labels=labels_B,
189-
ordered=ordered_B,
190-
right=right,
191-
include_lowest=include_lowest,
192-
)
183+
expected_df["B"] = pd.cut(
184+
in_df["B"],
185+
bins_B,
186+
labels=labels_B,
187+
ordered=ordered_B,
188+
right=right,
189+
include_lowest=include_lowest,
193190
)
194-
# Check that the remaining column was not modified
195-
assert out_df["C"].equals(in_df["C"])
191+
192+
# Use rows_same to compare regardless of row ordering
193+
assert rows_same(out_df, expected_df)
196194

197195
# append mode
198196
expected_message = "The length of columns and output_columns must match."
@@ -211,28 +209,27 @@ def test_custom_kbins_discretizer(
211209
transformed = discretizer.fit_transform(ds)
212210
out_df = transformed.to_pandas()
213211

214-
assert out_df["A_discretized"].equals(
215-
pd.cut(
216-
in_df["A"],
217-
bins_A,
218-
labels=labels_A,
219-
ordered=ordered_A,
220-
right=right,
221-
include_lowest=include_lowest,
222-
)
212+
# Create expected dataframe with appended columns
213+
expected_df = in_df.copy()
214+
expected_df["A_discretized"] = pd.cut(
215+
in_df["A"],
216+
bins_A,
217+
labels=labels_A,
218+
ordered=ordered_A,
219+
right=right,
220+
include_lowest=include_lowest,
223221
)
224-
assert out_df["B_discretized"].equals(
225-
pd.cut(
226-
in_df["B"],
227-
bins_B,
228-
labels=labels_B,
229-
ordered=ordered_B,
230-
right=right,
231-
include_lowest=include_lowest,
232-
)
222+
expected_df["B_discretized"] = pd.cut(
223+
in_df["B"],
224+
bins_B,
225+
labels=labels_B,
226+
ordered=ordered_B,
227+
right=right,
228+
include_lowest=include_lowest,
233229
)
234-
# Check that the remaining column was not modified
235-
assert out_df["C"].equals(in_df["C"])
230+
231+
# Use rows_same to compare regardless of row ordering
232+
assert rows_same(out_df, expected_df)
236233

237234

238235
if __name__ == "__main__":

0 commit comments

Comments
 (0)