3838
3939logger = logging .getLogger (__name__ )
4040
41+ # Timeout for waiting for metadata reference to become available (in seconds)
42+ METADATA_REF_WAIT_TIMEOUT_SECONDS = 0.1
43+
44+ # Timeout for getting metadata from Ray object references (in seconds)
45+ METADATA_GET_TIMEOUT_SECONDS = 0.1
4146
4247# TODO(hchen): Ray Core should have a common interface for these two types.
4348Waitable = Union [ray .ObjectRef , ObjectRefGenerator ]
@@ -130,11 +135,11 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
130135 bytes_read = 0
131136 while max_bytes_to_read is None or bytes_read < max_bytes_to_read :
132137 block_ref , meta_ref = self ._pending_block_pair or (
133- ray . ObjectRef . nil () ,
134- ray . ObjectRef . nil () ,
138+ None ,
139+ None ,
135140 )
136141 try :
137- if block_ref . is_nil () :
142+ if block_ref is None :
138143 block_ref = self ._streaming_gen ._next_sync (0 )
139144 if block_ref .is_nil ():
140145 # The generator currently doesn't have new output.
@@ -145,17 +150,24 @@ def on_data_ready(self, max_bytes_to_read: Optional[int]) -> int:
145150 break
146151
147152 try :
153+ if meta_ref is None :
154+ meta_ref = self ._streaming_gen ._next_sync (
155+ METADATA_REF_WAIT_TIMEOUT_SECONDS
156+ )
148157 if meta_ref .is_nil ():
149- meta_ref = self ._streaming_gen ._next_sync (1 )
150- if meta_ref .is_nil ():
151- self ._pending_block_pair = (block_ref , meta_ref )
158+ self ._pending_block_pair = (block_ref , None )
152159 break
153160 meta_with_schema : "BlockMetadataWithSchema" = ray .get (
154- meta_ref , timeout = 1
161+ meta_ref , timeout = METADATA_GET_TIMEOUT_SECONDS
155162 )
156163 except ray .exceptions .GetTimeoutError :
157- logger .warning (f"Get Meta timeout for (block_ref={ block_ref .hex ()} )" )
158- self ._pending_block_pair = (block_ref , meta_ref )
164+ logger .warning (
165+ f"Metadata object not ready for block_ref={ block_ref .hex ()[:8 ]} ... "
166+ f"(operator={ self .__class__ .__name__ } ). "
167+ f"Metadata may still be computing or worker may have failed and object is being reconstructed. "
168+ f"Will retry in next iteration."
169+ )
170+ self ._pending_block_pair = (block_ref , None )
159171 break
160172 except StopIteration :
161173 # The generator should always yield 2 values (block and metadata)
0 commit comments