File tree Expand file tree Collapse file tree 1 file changed +10
-11
lines changed
python/ray/data/_internal/block_batching Expand file tree Collapse file tree 1 file changed +10
-11
lines changed Original file line number Diff line number Diff line change @@ -232,22 +232,19 @@ def __init__(self):
232232 self ._thread .start ()
233233
234234 def _run (self ):
235- while True :
235+ while not self . _stopped :
236236 try :
237- blocks_to_wait = []
238-
239237 with self ._condition :
240- if len (self ._blocks ) > 0 :
241- blocks_to_wait , self ._blocks = self ._blocks [:], []
242- else :
243- if self ._stopped :
244- return
245- blocks_to_wait = []
238+ if len (self ._blocks ) == 0 :
239+ # Park, waiting for notification that prefetching
240+ # should resume
246241 self ._condition .wait ()
247242
248- if len (blocks_to_wait ) > 0 :
243+ blocks_to_fetch , self ._blocks = self ._blocks [:], []
244+
245+ if len (blocks_to_fetch ) > 0 :
249246 ray .wait (
250- blocks_to_wait ,
247+ blocks_to_fetch ,
251248 num_returns = 1 ,
252249 # NOTE: We deliberately setting timeout to 0 to avoid
253250 # blocking the fetching thread unnecessarily
@@ -257,6 +254,8 @@ def _run(self):
257254 except Exception :
258255 logger .exception ("Error in prefetcher thread." )
259256
257+ logger .exception ("Exiting prefetcher's background thread" )
258+
260259 def prefetch_blocks (self , blocks : List [ObjectRef [Block ]]):
261260 with self ._condition :
262261 if self ._stopped :
You can’t perform that action at this time.
0 commit comments