Skip to content

Commit 4232f9f

Browse files
committed
fix some issues
Signed-off-by: dragongu <andrewgu@vip.qq.com>
1 parent 56f5f07 commit 4232f9f

File tree

1 file changed

+20
-8
lines changed

1 file changed

+20
-8
lines changed

python/ray/data/_internal/execution/interfaces/physical_operator.py

Lines changed: 20 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,11 @@
3838

3939
logger = logging.getLogger(__name__)
4040

41+
# Timeout for waiting for metadata reference to become available (in seconds)
42+
METADATA_REF_WAIT_TIMEOUT_SECONDS = 0.1
43+
44+
# Timeout for getting metadata from Ray object references (in seconds)
45+
METADATA_GET_TIMEOUT_SECONDS = 1
4146

4247
# TODO(hchen): Ray Core should have a common interface for these two types.
4348
Waitable = Union[ray.ObjectRef, ObjectRefGenerator]
@@ -130,11 +135,11 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
130135
bytes_read = 0
131136
while max_bytes_to_read is None or bytes_read < max_bytes_to_read:
132137
block_ref, meta_ref = self._pending_block_pair or (
133-
ray.ObjectRef.nil(),
134-
ray.ObjectRef.nil(),
138+
None,
139+
None,
135140
)
136141
try:
137-
if block_ref.is_nil():
142+
if block_ref is None:
138143
block_ref = self._streaming_gen._next_sync(0)
139144
if block_ref.is_nil():
140145
# The generator currently doesn't have new output.
@@ -145,16 +150,23 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
145150
break
146151

147152
try:
153+
if meta_ref is None:
154+
meta_ref = self._streaming_gen._next_sync(
155+
METADATA_REF_WAIT_TIMEOUT_SECONDS
156+
)
148157
if meta_ref.is_nil():
149-
meta_ref = self._streaming_gen._next_sync(1)
150-
if meta_ref.is_nil():
151-
self._pending_block_pair = (block_ref, meta_ref)
158+
self._pending_block_pair = (block_ref, None)
152159
break
153160
meta_with_schema: "BlockMetadataWithSchema" = ray.get(
154-
meta_ref, timeout=1
161+
meta_ref, timeout=METADATA_GET_TIMEOUT_SECONDS
155162
)
156163
except ray.exceptions.GetTimeoutError:
157-
logger.warning(f"Get Meta timeout for (block_ref={block_ref.hex()})")
164+
logger.warning(
165+
f"Metadata object not ready for block_ref={block_ref.hex()[:8]}... "
166+
f"(operator={self.__class__.__name__}). "
167+
f"Metadata may still be computing or worker may have failed and object is being reconstructed. "
168+
f"Will retry in next iteration."
169+
)
158170
self._pending_block_pair = (block_ref, meta_ref)
159171
break
160172
except StopIteration:

0 commit comments

Comments
 (0)