@@ -319,82 +319,95 @@ def test_shutdown_immediate_all_methods_in_one_thread(self):
319
319
320
320
def _write_msg_thread (self , q , n , results ,
321
321
i_when_exec_shutdown , event_shutdown ,
322
- event_start ):
323
- put_atleast = i_when_exec_shutdown // 2
324
- for i in range (1 , n + 1 ):
322
+ barrier_start ):
323
+ # All `write_msg_threads`
324
+ # put several items into the queue.
325
+ for i in range (0 , i_when_exec_shutdown // 2 ):
326
+ q .put ((i , 'LOYD' ))
327
+ # Wait for the barrier to be complete.
328
+ barrier_start .wait ()
329
+
330
+ for i in range (i , n ):
325
331
try :
326
332
q .put ((i , "YDLO" ))
327
333
except self .queue .ShutDown :
328
334
results .append (False )
329
335
break
330
336
331
- # Be sure that all write_threads
332
- # put few items into the queue.
333
- if i == put_atleast :
334
- event_start .wait ()
335
-
336
- # Triggers shutdown of queue.
337
+ # Trigger queue shutdown.
337
338
if i == i_when_exec_shutdown :
339
+ # Only once thread do it.
338
340
if not event_shutdown .is_set ():
339
341
event_shutdown .set ()
340
342
results .append (True )
341
343
q .join ()
342
344
343
- def _read_msg_thread (self , q , results , event_start ):
344
- nbr = 0
345
+ def _read_msg_thread (self , q , results , barrier_start ):
346
+ # Wait for the barrier to be complete.
347
+ barrier_start .wait ()
345
348
while True :
346
349
try :
347
350
q .get (False )
348
351
q .task_done ()
349
- nbr += 1
350
352
except self .queue .ShutDown :
351
353
results .append (True )
352
354
break
353
355
except self .queue .Empty :
354
356
pass
355
357
q .join ()
356
358
357
- def _shutdown_thread (self , q , event_end , immediate ):
359
+ def _shutdown_thread (self , q , results , event_end , immediate ):
358
360
event_end .wait ()
359
361
q .shutdown (immediate )
362
+ results .append (q .qsize () == 0 )
360
363
q .join ()
361
364
362
- def _join_thread (self , q , event_start ):
363
- event_start .wait ()
365
+ def _join_thread (self , q , barrier_start ):
366
+ # Wait for the barrier to be complete.
367
+ barrier_start .wait ()
364
368
q .join ()
365
369
366
370
def _shutdown_all_methods_in_many_threads (self , immediate ):
371
+ # Run a 'multi-producers/consumers queue' use case,
372
+ # with enough items into the queue.
373
+ # When shutdown, all running threads will be concerned.
367
374
q = self .type2test ()
368
375
ps = []
369
- ev_start = threading .Event ()
370
- ev_exec_shutdown = threading .Event ()
371
376
res_puts = []
372
377
res_gets = []
378
+ res_shutdown = []
373
379
write_threads = 4
374
- read_threads = 16
375
- nb_msgs = 1024 * 4
380
+ read_threads = 6
381
+ join_threads = 2
382
+ nb_msgs = 1024 * 64
376
383
nb_msgs_w = nb_msgs // write_threads
377
384
when_exec_shutdown = nb_msgs_w // 2
385
+ # Use of a `threading.Barrier`` to ensure that all `_write_msg_threads`
386
+ # put their part of items into the queue. And trigger the start of
387
+ # other threads as `_read_msg_thread`and `_join_thread`.
388
+ barrier_start = threading .Barrier (write_threads + read_threads + join_threads )
389
+ ev_exec_shutdown = threading .Event ()
378
390
lprocs = (
379
391
(self ._write_msg_thread , write_threads , (q , nb_msgs_w , res_puts ,
380
392
when_exec_shutdown , ev_exec_shutdown ,
381
- ev_start )),
382
- (self ._read_msg_thread , read_threads , (q , res_gets ,
383
- ev_start )),
384
- (self ._join_thread , 2 , (q , ev_start )),
385
- (self ._shutdown_thread , 1 , (q , ev_exec_shutdown , immediate )),
393
+ barrier_start )),
394
+ (self ._read_msg_thread , read_threads , (q , res_gets , barrier_start )),
395
+ (self ._join_thread , join_threads , (q , barrier_start )),
396
+ (self ._shutdown_thread , 1 , (q , res_shutdown , ev_exec_shutdown , immediate )),
386
397
)
387
- # start all thredas
398
+ # start all threads.
388
399
for func , n , args in lprocs :
389
400
for i in range (n ):
390
401
ps .append (threading .Thread (target = func , args = args ))
391
402
ps [- 1 ].start ()
392
- ev_start .set ()
393
403
for thread in ps :
394
404
thread .join ()
395
405
396
406
self .assertEqual (res_puts .count (True ), 1 )
397
407
self .assertLessEqual (res_gets .count (True ), read_threads )
408
+ if immediate :
409
+ self .assertListEqual (res_shutdown , [True ])
410
+ self .assertTrue (q .empty ())
398
411
399
412
def test_shutdown_all_methods_in_many_threads (self ):
400
413
return self ._shutdown_all_methods_in_many_threads (False )
0 commit comments