Skip to content

Commit e41df73

Browse files
committed
[Data] Fix driver hang during streaming generator block metadata retrieval
Fix driver hang that causes temporary job performance degradation when retrieving metadata references and content, by adding timeout mechanisms and state tracking to prevent blocking waits. Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent 7d5a29c commit e41df73

File tree

2 files changed

+29
-3
lines changed

2 files changed

+29
-3
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ def __init__(
114114
self._streaming_gen = streaming_gen
115115
self._output_ready_callback = output_ready_callback
116116
self._task_done_callback = task_done_callback
117+
self._pending_block_pair = None
117118

118119
def get_waitable(self) -> ObjectRefGenerator:
119120
return self._streaming_gen
@@ -128,8 +129,13 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
128129
"""
129130
bytes_read = 0
130131
while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
132+
block_ref, meta_ref = self._pending_block_pair or (
133+
ray.ObjectRef.nil(),
134+
ray.ObjectRef.nil(),
135+
)
131136
try:
132-
block_ref = self._streaming_gen._next_sync(0)
137+
if block_ref.is_nil():
138+
block_ref = self._streaming_gen._next_sync(0)
133139
if block_ref.is_nil():
134140
# The generator currently doesn't have new output.
135141
# And it's not stopped yet.
@@ -139,9 +145,18 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
139145
break
140146

141147
try:
148+
if meta_ref.is_nil():
149+
meta_ref = self._streaming_gen._next_sync(1)
150+
if meta_ref.is_nil():
151+
self._pending_block_pair = (block_ref, meta_ref)
152+
break
142153
meta_with_schema: "BlockMetadataWithSchema" = ray.get(
143-
next(self._streaming_gen)
154+
meta_ref, timeout=1
144155
)
156+
except ray.exceptions.GetTimeoutError:
157+
logger.warning(f"Get Meta timeout for (block_ref={block_ref.hex()})")
158+
self._pending_block_pair = (block_ref, meta_ref)
159+
break
145160
except StopIteration:
146161
# The generator should always yield 2 values (block and metadata)
147162
# each time. If we get a StopIteration here, it means an error
@@ -164,10 +179,15 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
164179
schema=meta_with_schema.schema,
165180
),
166181
)
182+
self._pending_block_pair = None
167183
bytes_read += meta.size_bytes
168184

169185
return bytes_read
170186

187+
@property
188+
def pending_block_pair(self):
189+
return self._pending_block_pair
190+
171191

172192
class MetadataOpTask(OpTask):
173193
"""Represents an OpTask that only handles metadata, instead of Block data."""

python/ray/data/_internal/execution/streaming_executor_state.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -497,7 +497,7 @@ def process_completed_tasks(
497497
# Process completed Ray tasks and notify operators.
498498
num_errored_blocks = 0
499499
if active_tasks:
500-
ready, _ = ray.wait(
500+
ready, not_ready = ray.wait(
501501
list(active_tasks.keys()),
502502
num_returns=len(active_tasks),
503503
fetch_local=False,
@@ -513,6 +513,12 @@ def process_completed_tasks(
513513
for ref in ready:
514514
state, task = active_tasks[ref]
515515
ready_tasks_by_op[state].append(task)
516+
for ref in not_ready:
517+
state, task = active_tasks[ref]
518+
# If there are pending blocks, try to process them even if streaming_gen is not ready
519+
# This may be because the (block, meta) pair processing encountered exceptions or timeouts, leaving data unconsumed
520+
if isinstance(task, DataOpTask) and task.pending_block_pair:
521+
ready_tasks_by_op[state].append(task)
516522

517523
for state, ready_tasks in ready_tasks_by_op.items():
518524
ready_tasks = sorted(ready_tasks, key=lambda t: t.task_index())

0 commit comments

Comments
 (0)