Skip to content

Commit ca118d7

Browse files
committed
Shut-down immediate consumes queue
1 parent 089eb96 commit ca118d7

File tree

2 files changed

+31
-81
lines changed

2 files changed

+31
-81
lines changed

Lib/queue.py

Lines changed: 11 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,6 @@ class ShutDown(Exception):
2929
'''Raised when put/get with shut-down queue.'''
3030

3131

32-
_queue_alive = "alive"
33-
_queue_shutdown = "shutdown"
34-
_queue_shutdown_immediate = "shutdown-immediate"
35-
3632
class Queue:
3733
'''Create a queue object with a given maximum size.
3834
@@ -63,7 +59,7 @@ def __init__(self, maxsize=0):
6359
self.unfinished_tasks = 0
6460

6561
# Queue shutdown state
66-
self.shutdown_state = _queue_alive
62+
self.is_shutdown = False
6763

6864
def task_done(self):
6965
'''Indicate that a formerly enqueued task is complete.
@@ -82,8 +78,6 @@ def task_done(self):
8278
Raises ShutDown if the queue has been shut down immediately.
8379
'''
8480
with self.all_tasks_done:
85-
if self._is_shutdown_immediate():
86-
raise ShutDown
8781
unfinished = self.unfinished_tasks - 1
8882
if unfinished <= 0:
8983
if unfinished < 0:
@@ -103,12 +97,8 @@ def join(self):
10397
Raises ShutDown if the queue has been shut down immediately.
10498
'''
10599
with self.all_tasks_done:
106-
if self._is_shutdown_immediate():
107-
raise ShutDown
108100
while self.unfinished_tasks:
109101
self.all_tasks_done.wait()
110-
if self._is_shutdown_immediate():
111-
raise ShutDown
112102

113103
def qsize(self):
114104
'''Return the approximate size of the queue (not reliable!).'''
@@ -154,7 +144,7 @@ def put(self, item, block=True, timeout=None):
154144
Raises ShutDown if the queue has been shut down.
155145
'''
156146
with self.not_full:
157-
if not self._is_alive():
147+
if self.is_shutdown:
158148
raise ShutDown
159149
if self.maxsize > 0:
160150
if not block:
@@ -163,7 +153,7 @@ def put(self, item, block=True, timeout=None):
163153
elif timeout is None:
164154
while self._qsize() >= self.maxsize:
165155
self.not_full.wait()
166-
if not self._is_alive():
156+
if self.is_shutdown:
167157
raise ShutDown
168158
elif timeout < 0:
169159
raise ValueError("'timeout' must be a non-negative number")
@@ -174,7 +164,7 @@ def put(self, item, block=True, timeout=None):
174164
if remaining <= 0.0:
175165
raise Full
176166
self.not_full.wait(remaining)
177-
if not self._is_alive():
167+
if self.is_shutdown:
178168
raise ShutDown
179169
self._put(item)
180170
self.unfinished_tasks += 1
@@ -195,16 +185,15 @@ def get(self, block=True, timeout=None):
195185
or if the queue has been shut down immediately.
196186
'''
197187
with self.not_empty:
198-
if self._is_shutdown_immediate() or\
199-
(self._is_shutdown() and not self._qsize()):
188+
if self.is_shutdown and not self._qsize():
200189
raise ShutDown
201190
if not block:
202191
if not self._qsize():
203192
raise Empty
204193
elif timeout is None:
205194
while not self._qsize():
206195
self.not_empty.wait()
207-
if self._is_shutdown_immediate():
196+
if self.is_shutdown and not self._qsize():
208197
raise ShutDown
209198
elif timeout < 0:
210199
raise ValueError("'timeout' must be a non-negative number")
@@ -215,7 +204,7 @@ def get(self, block=True, timeout=None):
215204
if remaining <= 0.0:
216205
raise Empty
217206
self.not_empty.wait(remaining)
218-
if self._is_shutdown_immediate():
207+
if self.is_shutdown and not self._qsize():
219208
raise ShutDown
220209
item = self._get()
221210
self.not_full.notify()
@@ -247,32 +236,16 @@ def shutdown(self, immediate=False):
247236
and join() if 'immediate'. The ShutDown exception is raised.
248237
'''
249238
with self.mutex:
250-
if self._is_shutdown_immediate():
251-
return
239+
self.is_shutdown = True
252240
if immediate:
253-
self._set_shutdown_immediate()
241+
while self._qsize():
242+
self._get()
243+
self.unfinished_tasks = 0
254244
self.not_empty.notify_all()
255245
# release all blocked threads in `join()`
256246
self.all_tasks_done.notify_all()
257-
else:
258-
self._set_shutdown()
259247
self.not_full.notify_all()
260248

261-
def _is_alive(self):
262-
return self.shutdown_state == _queue_alive
263-
264-
def _is_shutdown(self):
265-
return self.shutdown_state == _queue_shutdown
266-
267-
def _is_shutdown_immediate(self):
268-
return self.shutdown_state == _queue_shutdown_immediate
269-
270-
def _set_shutdown(self):
271-
self.shutdown_state = _queue_shutdown
272-
273-
def _set_shutdown_immediate(self):
274-
self.shutdown_state = _queue_shutdown_immediate
275-
276249

277250
# Override these methods to implement other queue organizations
278251
# (e.g. stack or priority queue).

Lib/test/test_queue.py

Lines changed: 20 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -267,16 +267,15 @@ def test_shutdown_immediate(self):
267267
def test_shutdown_allowed_transitions(self):
268268
# allowed transitions would be from alive via shutdown to immediate
269269
q = self.type2test()
270-
self.assertEqual("alive", q.shutdown_state)
270+
self.assertFalse(q.is_shutdown)
271271

272272
q.shutdown()
273-
self.assertEqual("shutdown", q.shutdown_state)
273+
self.assertTrue(q.is_shutdown)
274274

275275
q.shutdown(immediate=True)
276-
self.assertEqual("shutdown-immediate", q.shutdown_state)
276+
self.assertTrue(q.is_shutdown)
277277

278278
q.shutdown(immediate=False)
279-
self.assertNotEqual("shutdown", q.shutdown_state)
280279

281280
def _shutdown_all_methods_in_one_thread(self, immediate):
282281
q = self.type2test(2)
@@ -293,10 +292,9 @@ def _shutdown_all_methods_in_one_thread(self, immediate):
293292
q.get()
294293
with self.assertRaises(self.queue.ShutDown):
295294
q.get_nowait()
296-
with self.assertRaises(self.queue.ShutDown):
295+
with self.assertRaises(ValueError):
297296
q.task_done()
298-
with self.assertRaises(self.queue.ShutDown):
299-
q.join()
297+
q.join()
300298
else:
301299
self.assertIn(q.get(), "LO")
302300
q.task_done()
@@ -333,10 +331,7 @@ def _write_msg_thread(self, q, n, results, delay,
333331
event_end.set()
334332
time.sleep(delay)
335333
# end of all puts
336-
try:
337-
q.join()
338-
except self.queue.ShutDown:
339-
pass
334+
q.join()
340335

341336
def _read_msg_thread(self, q, nb, results, delay, event_start):
342337
event_start.wait()
@@ -355,26 +350,17 @@ def _read_msg_thread(self, q, nb, results, delay, event_start):
355350
nb -= 1
356351
except self.queue.Empty:
357352
pass
358-
try:
359-
q.join()
360-
except self.queue.ShutDown:
361-
pass
353+
q.join()
362354

363355
def _shutdown_thread(self, q, event_end, immediate):
364356
event_end.wait()
365357
q.shutdown(immediate)
366-
try:
367-
q.join()
368-
except self.queue.ShutDown:
369-
pass
358+
q.join()
370359

371360
def _join_thread(self, q, delay, event_start):
372361
event_start.wait()
373362
time.sleep(delay)
374-
try:
375-
q.join()
376-
except self.queue.ShutDown:
377-
pass
363+
q.join()
378364

379365
def _shutdown_all_methods_in_many_threads(self, immediate):
380366
q = self.type2test()
@@ -413,6 +399,9 @@ def _shutdown_all_methods_in_many_threads(self, immediate):
413399
assert(len(res_gets) <= len(res_puts))
414400
assert(res_gets.count(True) <= res_puts.count(True))
415401

402+
for thread in ps[1:]:
403+
thread.join()
404+
416405
def test_shutdown_all_methods_in_many_threads(self):
417406
return self._shutdown_all_methods_in_many_threads(False)
418407

@@ -544,15 +533,9 @@ def _shutdown_join(self, immediate):
544533
go = threading.Event()
545534
nb = q.qsize()
546535

547-
if immediate:
548-
thrds = (
549-
(self._join_shutdown, (q, results)),
550-
(self._join_shutdown, (q, results)),
551-
)
552-
else:
553-
thrds = (
554-
(self._join, (q, results)),
555-
(self._join, (q, results)),
536+
thrds = (
537+
(self._join, (q, results)),
538+
(self._join, (q, results)),
556539
)
557540
threads = []
558541
for func, params in thrds:
@@ -584,21 +567,15 @@ def _shutdown_put_join(self, immediate):
584567
nb = q.qsize()
585568
# queue not fulled
586569

587-
if immediate:
588-
thrds = (
589-
(self._put_shutdown, (q, "E", go, results)),
590-
(self._join_shutdown, (q, results)),
591-
)
592-
else:
593-
thrds = (
594-
(self._put_shutdown, (q, "E", go, results)),
595-
(self._join, (q, results)),
596-
)
570+
thrds = (
571+
(self._put_shutdown, (q, "E", go, results)),
572+
(self._join, (q, results)),
573+
)
597574
threads = []
598575
for func, params in thrds:
599576
threads.append(threading.Thread(target=func, args=params))
600577
threads[-1].start()
601-
if not immediate:
578+
if not immediate or immediate: # TODO: dedent (minimising Git diff)
602579
self.assertEqual(q.unfinished_tasks, nb)
603580
for i in range(nb):
604581
t = threading.Thread(target=q.task_done)

0 commit comments

Comments
 (0)