Skip to content

Commit 15b82ae

Browse files
dragongu and bveeramani authored
[Data] Fix driver hang during streaming generator block metadata retrieval (#56451)
## Why are these changes needed? This PR fixes a critical driver hang issue in Ray Data's streaming generator. The problem occurs when computation completes and block data is generated, but the worker crashes before the metadata object is generated, causing the driver to hang completely until the task's metadata is successfully rebuilt. This creates severe performance issues, especially in cluster environments with significant resource fluctuations. ## What was the problem? **Specific scenario:** 1. Computation completes, block data is generated 2. Worker crashes before the metadata object is generated 3. Driver enters the [physical_operator.on_data_ready()](https://github.com/ray-project/ray/blob/ray-2.46.0/python/ray/data/_internal/execution/interfaces/physical_operator.py#L124) logic and waits indefinitely for metadata until task retry succeeds and meta object becomes available 4. If cluster resources are insufficient, the task cannot be retried successfully, causing driver to hang for hours (actual case: 12 hours) **Technical causes:** - Using `ray.get(next(self._streaming_gen))` for metadata content retrieval, which may hang indefinitely - Lack of timeout mechanisms and state tracking, preventing driver recovery from hang state - No proper handling when worker crashes between block generation and metadata generation ## What does this fix do? 
- Adds `_pending_block_ref` and `_pending_meta_ref` state tracking to properly handle block/metadata pairs - Uses `ray.get(meta_ref, timeout=1)` with timeout for metadata content retrieval - Adds error handling for `GetTimeoutError` with warning logs - Prevents unnecessary re-fetching of already obtained block references - **Key improvement: Prevents driver from hanging for extended periods when worker crashes between block and metadata generation** ## Related issue number Fixes critical performance issue in streaming data processing that causes driver to hang for extended periods (up to 12 hours) when workers crash between block generation and metadata generation, especially in cluster environments with significant resource fluctuations. ## Checks - [x] I've signed off every commit(by using the -s flag, i.e., `git commit -s`) in this PR. - [x] I've run `scripts/format.sh` to lint the changes in this PR. - [ ] I've included any doc changes needed for https://docs.ray.io/en/master/. - [x] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/ - **Testing Strategy** - [x] Unit tests - [ ] Release tests - [ ] This PR is not tested :( --------- Signed-off-by: dragongu <andrewgu@vip.qq.com> Signed-off-by: Balaji Veeramani <bveeramani@berkeley.edu> Co-authored-by: Balaji Veeramani <bveeramani@berkeley.edu> Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
1 parent 6320275 commit 15b82ae

File tree

4 files changed

+263
-124
lines changed

4 files changed

+263
-124
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 75 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
from ray.data._internal.output_buffer import OutputBlockSizeOption
3131
from ray.data._internal.progress_bar import ProgressBar
3232
from ray.data._internal.stats import StatsDict, Timer
33+
from ray.data.block import Block, BlockMetadata
3334
from ray.data.context import DataContext
3435

3536
if TYPE_CHECKING:
@@ -38,6 +39,11 @@
3839

3940
logger = logging.getLogger(__name__)
4041

42+
# Timeout for getting metadata from Ray object references (in seconds)
43+
METADATA_GET_TIMEOUT_S = 1.0
44+
45+
# Timeout for waiting for metadata object to become available (in seconds)
46+
METADATA_WAIT_TIMEOUT_S = 0.1
4147

4248
# TODO(hchen): Ray Core should have a common interface for these two types.
4349
Waitable = Union[ray.ObjectRef, ObjectRefGenerator]
@@ -93,8 +99,8 @@ def __init__(
9399
self,
94100
task_index: int,
95101
streaming_gen: ObjectRefGenerator,
96-
output_ready_callback: Callable[[RefBundle], None],
97-
task_done_callback: Callable[[Optional[Exception]], None],
102+
output_ready_callback: Callable[[RefBundle], None] = lambda bundle: None,
103+
task_done_callback: Callable[[Optional[Exception]], None] = lambda exc: None,
98104
task_resource_bundle: Optional[ExecutionResources] = None,
99105
):
100106
"""Create a DataOpTask
@@ -115,6 +121,13 @@ def __init__(
115121
self._output_ready_callback = output_ready_callback
116122
self._task_done_callback = task_done_callback
117123

124+
# If the generator hasn't produced block metadata yet, or if the block metadata
125+
# object isn't available after we get a reference, we need to store the pending
126+
# references and wait until Ray (re)constructs the block metadata. Either case
127+
# can happen if a node dies after producing a block.
128+
self._pending_block_ref: ray.ObjectRef[Block] = ray.ObjectRef.nil()
129+
self._pending_meta_ref: ray.ObjectRef[BlockMetadata] = ray.ObjectRef.nil()
130+
118131
def get_waitable(self) -> ObjectRefGenerator:
119132
return self._streaming_gen
120133

@@ -128,42 +141,81 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
128141
"""
129142
bytes_read = 0
130143
while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
131-
try:
132-
block_ref = self._streaming_gen._next_sync(0)
133-
if block_ref.is_nil():
144+
if self._pending_block_ref.is_nil():
145+
assert self._pending_meta_ref.is_nil(), (
146+
"This method expects streaming generators to yield blocks then "
147+
"metadata. So, if we have a reference to metadata but not the "
148+
"block, it means there's an error in the implementation."
149+
)
150+
151+
try:
152+
self._pending_block_ref = self._streaming_gen._next_sync(
153+
timeout_s=0
154+
)
155+
except StopIteration:
156+
self._task_done_callback(None)
157+
break
158+
159+
if self._pending_block_ref.is_nil():
134160
# The generator currently doesn't have new output.
135161
# And it's not stopped yet.
136162
break
137-
except StopIteration:
138-
self._task_done_callback(None)
139-
break
163+
164+
if self._pending_meta_ref.is_nil():
165+
try:
166+
self._pending_meta_ref = self._streaming_gen._next_sync(
167+
timeout_s=METADATA_WAIT_TIMEOUT_S
168+
)
169+
except StopIteration:
170+
# The generator should always yield 2 values (block and metadata)
171+
# each time. If we get a StopIteration here, it means an error
172+
# happened in the task.
173+
# And in this case, the block_ref is the exception object.
174+
# TODO(hchen): Ray Core should have a better interface for
175+
# detecting and obtaining the exception.
176+
try:
177+
ray.get(self._pending_block_ref)
178+
assert False, "Above ray.get should raise an exception."
179+
except Exception as ex:
180+
self._task_done_callback(ex)
181+
raise ex from None
182+
183+
if self._pending_meta_ref.is_nil():
184+
# We have a reference to the block but the metadata isn't ready
185+
# yet.
186+
break
140187

141188
try:
189+
# The timeout for `ray.get` includes the time required to ship the
190+
# block metadata to this node. So, if we set the timeout to 0, `ray.get`
191+
# will timeout and possibly cancel the download. To avoid this issue,
192+
# we set the timeout to a small non-zero value.
142193
meta_with_schema: "BlockMetadataWithSchema" = ray.get(
143-
next(self._streaming_gen)
194+
self._pending_meta_ref, timeout=METADATA_GET_TIMEOUT_S
144195
)
145-
except StopIteration:
146-
# The generator should always yield 2 values (block and metadata)
147-
# each time. If we get a StopIteration here, it means an error
148-
# happened in the task.
149-
# And in this case, the block_ref is the exception object.
150-
# TODO(hchen): Ray Core should have a better interface for
151-
# detecting and obtaining the exception.
152-
try:
153-
ray.get(block_ref)
154-
assert False, "Above ray.get should raise an exception."
155-
except Exception as ex:
156-
self._task_done_callback(ex)
157-
raise ex from None
196+
except ray.exceptions.GetTimeoutError:
197+
# We have a reference to the block and its metadata, but the metadata
198+
# object isn't available. This can happen if the node dies.
199+
logger.warning(
200+
f"Metadata object not ready for "
201+
f"ref={self._pending_meta_ref.hex()} "
202+
f"(operator={self.__class__.__name__}). "
203+
f"Metadata may still be computing or worker may have failed and "
204+
f"object is being reconstructed. Will retry in next iteration."
205+
)
206+
break
158207

159208
meta = meta_with_schema.metadata
160209
self._output_ready_callback(
161210
RefBundle(
162-
[(block_ref, meta)],
211+
[(self._pending_block_ref, meta)],
163212
owns_blocks=True,
164213
schema=meta_with_schema.schema,
165214
),
166215
)
216+
self._pending_block_ref = ray.ObjectRef.nil()
217+
self._pending_meta_ref = ray.ObjectRef.nil()
218+
167219
bytes_read += meta.size_bytes
168220

169221
return bytes_read

python/ray/data/_internal/util.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,6 @@
99
import threading
1010
import time
1111
import urllib.parse
12-
from collections import Counter
1312
from queue import Empty, Full, Queue
1413
from types import ModuleType
1514
from typing import (
@@ -1675,13 +1674,18 @@ def rows_same(actual: pd.DataFrame, expected: pd.DataFrame) -> bool:
16751674
order of rows. This is useful for testing Ray Data because its interface doesn't
16761675
usually guarantee the order of rows.
16771676
"""
1678-
actual_rows = actual.to_dict(orient="records")
1679-
expected_rows = expected.to_dict(orient="records")
1677+
if len(actual) == len(expected) == 0:
1678+
return True
16801679

1681-
actual_items_counts = Counter(frozenset(row.items()) for row in actual_rows)
1682-
expected_items_counts = Counter(frozenset(row.items()) for row in expected_rows)
1683-
1684-
return actual_items_counts == expected_items_counts
1680+
try:
1681+
pd.testing.assert_frame_equal(
1682+
actual.sort_values(sorted(actual.columns)).reset_index(drop=True),
1683+
expected.sort_values(sorted(expected.columns)).reset_index(drop=True),
1684+
check_dtype=False,
1685+
)
1686+
return True
1687+
except AssertionError:
1688+
return False
16851689

16861690

16871691
def merge_resources_to_ray_remote_args(

python/ray/data/tests/preprocessors/test_discretizer.py

Lines changed: 77 additions & 80 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
import pytest
33

44
import ray
5+
from ray.data._internal.util import rows_same
56
from ray.data.preprocessors import CustomKBinsDiscretizer, UniformKBinsDiscretizer
67

78

@@ -55,28 +56,27 @@ def test_uniform_kbins_discretizer(
5556
labels_B = dtypes.get("B").categories
5657
ordered_B = dtypes.get("B").ordered
5758

58-
assert out_df["A"].equals(
59-
pd.cut(
60-
in_df["A"],
61-
bins_A,
62-
labels=labels_A,
63-
ordered=ordered_A,
64-
right=right,
65-
include_lowest=include_lowest,
66-
)
59+
# Create expected dataframe with transformed columns
60+
expected_df = in_df.copy()
61+
expected_df["A"] = pd.cut(
62+
in_df["A"],
63+
bins_A,
64+
labels=labels_A,
65+
ordered=ordered_A,
66+
right=right,
67+
include_lowest=include_lowest,
6768
)
68-
assert out_df["B"].equals(
69-
pd.cut(
70-
in_df["B"],
71-
bins_B,
72-
labels=labels_B,
73-
ordered=ordered_B,
74-
right=right,
75-
include_lowest=include_lowest,
76-
)
69+
expected_df["B"] = pd.cut(
70+
in_df["B"],
71+
bins_B,
72+
labels=labels_B,
73+
ordered=ordered_B,
74+
right=right,
75+
include_lowest=include_lowest,
7776
)
78-
# Check that the remaining column was not modified
79-
assert out_df["C"].equals(in_df["C"])
77+
78+
# Use rows_same to compare regardless of row ordering
79+
assert rows_same(out_df, expected_df)
8080

8181
# append mode
8282
expected_message = "The length of columns and output_columns must match."
@@ -95,28 +95,27 @@ def test_uniform_kbins_discretizer(
9595
transformed = discretizer.fit_transform(ds)
9696
out_df = transformed.to_pandas()
9797

98-
assert out_df["A_discretized"].equals(
99-
pd.cut(
100-
in_df["A"],
101-
bins_A,
102-
labels=labels_A,
103-
ordered=ordered_A,
104-
right=right,
105-
include_lowest=include_lowest,
106-
)
98+
# Create expected dataframe with appended columns
99+
expected_df = in_df.copy()
100+
expected_df["A_discretized"] = pd.cut(
101+
in_df["A"],
102+
bins_A,
103+
labels=labels_A,
104+
ordered=ordered_A,
105+
right=right,
106+
include_lowest=include_lowest,
107107
)
108-
assert out_df["B_discretized"].equals(
109-
pd.cut(
110-
in_df["B"],
111-
bins_B,
112-
labels=labels_B,
113-
ordered=ordered_B,
114-
right=right,
115-
include_lowest=include_lowest,
116-
)
108+
expected_df["B_discretized"] = pd.cut(
109+
in_df["B"],
110+
bins_B,
111+
labels=labels_B,
112+
ordered=ordered_B,
113+
right=right,
114+
include_lowest=include_lowest,
117115
)
118-
# Check that the remaining column was not modified
119-
assert out_df["C"].equals(in_df["C"])
116+
117+
# Use rows_same to compare regardless of row ordering
118+
assert rows_same(out_df, expected_df)
120119

121120

122121
@pytest.mark.parametrize(
@@ -171,28 +170,27 @@ def test_custom_kbins_discretizer(
171170
labels_B = dtypes.get("B").categories
172171
ordered_B = dtypes.get("B").ordered
173172

174-
assert out_df["A"].equals(
175-
pd.cut(
176-
in_df["A"],
177-
bins_A,
178-
labels=labels_A,
179-
ordered=ordered_A,
180-
right=right,
181-
include_lowest=include_lowest,
182-
)
173+
# Create expected dataframe with transformed columns
174+
expected_df = in_df.copy()
175+
expected_df["A"] = pd.cut(
176+
in_df["A"],
177+
bins_A,
178+
labels=labels_A,
179+
ordered=ordered_A,
180+
right=right,
181+
include_lowest=include_lowest,
183182
)
184-
assert out_df["B"].equals(
185-
pd.cut(
186-
in_df["B"],
187-
bins_B,
188-
labels=labels_B,
189-
ordered=ordered_B,
190-
right=right,
191-
include_lowest=include_lowest,
192-
)
183+
expected_df["B"] = pd.cut(
184+
in_df["B"],
185+
bins_B,
186+
labels=labels_B,
187+
ordered=ordered_B,
188+
right=right,
189+
include_lowest=include_lowest,
193190
)
194-
# Check that the remaining column was not modified
195-
assert out_df["C"].equals(in_df["C"])
191+
192+
# Use rows_same to compare regardless of row ordering
193+
assert rows_same(out_df, expected_df)
196194

197195
# append mode
198196
expected_message = "The length of columns and output_columns must match."
@@ -211,28 +209,27 @@ def test_custom_kbins_discretizer(
211209
transformed = discretizer.fit_transform(ds)
212210
out_df = transformed.to_pandas()
213211

214-
assert out_df["A_discretized"].equals(
215-
pd.cut(
216-
in_df["A"],
217-
bins_A,
218-
labels=labels_A,
219-
ordered=ordered_A,
220-
right=right,
221-
include_lowest=include_lowest,
222-
)
212+
# Create expected dataframe with appended columns
213+
expected_df = in_df.copy()
214+
expected_df["A_discretized"] = pd.cut(
215+
in_df["A"],
216+
bins_A,
217+
labels=labels_A,
218+
ordered=ordered_A,
219+
right=right,
220+
include_lowest=include_lowest,
223221
)
224-
assert out_df["B_discretized"].equals(
225-
pd.cut(
226-
in_df["B"],
227-
bins_B,
228-
labels=labels_B,
229-
ordered=ordered_B,
230-
right=right,
231-
include_lowest=include_lowest,
232-
)
222+
expected_df["B_discretized"] = pd.cut(
223+
in_df["B"],
224+
bins_B,
225+
labels=labels_B,
226+
ordered=ordered_B,
227+
right=right,
228+
include_lowest=include_lowest,
233229
)
234-
# Check that the remaining column was not modified
235-
assert out_df["C"].equals(in_df["C"])
230+
231+
# Use rows_same to compare regardless of row ordering
232+
assert rows_same(out_df, expected_df)
236233

237234

238235
if __name__ == "__main__":

0 commit comments

Comments
 (0)