Skip to content

Commit 9c2971b

Browse files
YvesDupEpicWink
authored andcommitted
Fix queue shutdown
* Include raised exception in docstrings * Handle queue shutdown in task_done and join * Factor out queue-state checks and updates to methods * Logic fixes in qsize, get and shutdown * Don't set unfinished_tasks to 0 on immediate shutdown * Updated tests * Document feature added in 3.13
1 parent cd8ceaf commit 9c2971b

File tree

3 files changed

+429
-38
lines changed

3 files changed

+429
-38
lines changed

Doc/library/queue.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ The :mod:`queue` module defines the following classes and exceptions:
9898
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
9999
a :class:`Queue` object which has been shut down.
100100

101-
.. versionadded:: 3.12
101+
.. versionadded:: 3.13
102102

103103

104104
.. _queueobjects:
@@ -247,7 +247,7 @@ them down.
247247
All blocked callers of put() will be unblocked, and also get()
248248
and join() if *immediate* is true.
249249

250-
.. versionadded:: 3.12
250+
.. versionadded:: 3.13
251251

252252

253253
SimpleQueue Objects

Lib/queue.py

Lines changed: 46 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ class ShutDown(Exception):
3333
_queue_shutdown = "shutdown"
3434
_queue_shutdown_immediate = "shutdown-immediate"
3535

36-
3736
class Queue:
3837
'''Create a queue object with a given maximum size.
3938
@@ -63,7 +62,7 @@ def __init__(self, maxsize=0):
6362
self.all_tasks_done = threading.Condition(self.mutex)
6463
self.unfinished_tasks = 0
6564

66-
# Queue shut-down state
65+
# Queue shutdown state
6766
self.shutdown_state = _queue_alive
6867

6968
def task_done(self):
@@ -79,8 +78,12 @@ def task_done(self):
7978
8079
Raises a ValueError if called more times than there were items
8180
placed in the queue.
81+
82+
Raises ShutDown if the queue has been shut down immediately.
8283
'''
8384
with self.all_tasks_done:
85+
if self._is_shutdown_immediate():
86+
raise ShutDown
8487
unfinished = self.unfinished_tasks - 1
8588
if unfinished <= 0:
8689
if unfinished < 0:
@@ -96,12 +99,16 @@ def join(self):
9699
to indicate the item was retrieved and all work on it is complete.
97100
98101
When the count of unfinished tasks drops to zero, join() unblocks.
102+
103+
Raises ShutDown if the queue has been shut down immediately.
99104
'''
100105
with self.all_tasks_done:
106+
if self._is_shutdown_immediate():
107+
raise ShutDown
101108
while self.unfinished_tasks:
102-
if self.shutdown_state == _queue_shutdown_immediate:
103-
return
104109
self.all_tasks_done.wait()
110+
if self._is_shutdown_immediate():
111+
raise ShutDown
105112

106113
def qsize(self):
107114
'''Return the approximate size of the queue (not reliable!).'''
@@ -143,18 +150,20 @@ def put(self, item, block=True, timeout=None):
143150
Otherwise ('block' is false), put an item on the queue if a free slot
144151
is immediately available, else raise the Full exception ('timeout'
145152
is ignored in that case).
153+
154+
Raises ShutDown if the queue has been shut down.
146155
'''
147-
if self.shutdown_state != _queue_alive:
148-
raise ShutDown
149156
with self.not_full:
157+
if not self._is_alive():
158+
raise ShutDown
150159
if self.maxsize > 0:
151160
if not block:
152161
if self._qsize() >= self.maxsize:
153162
raise Full
154163
elif timeout is None:
155164
while self._qsize() >= self.maxsize:
156165
self.not_full.wait()
157-
if self.shutdown_state != _queue_alive:
166+
if not self._is_alive():
158167
raise ShutDown
159168
elif timeout < 0:
160169
raise ValueError("'timeout' must be a non-negative number")
@@ -165,7 +174,7 @@ def put(self, item, block=True, timeout=None):
165174
if remaining <= 0.0:
166175
raise Full
167176
self.not_full.wait(remaining)
168-
if self.shutdown_state != _queue_alive:
177+
if not self._is_alive():
169178
raise ShutDown
170179
self._put(item)
171180
self.unfinished_tasks += 1
@@ -181,37 +190,33 @@ def get(self, block=True, timeout=None):
181190
Otherwise ('block' is false), return an item if one is immediately
182191
available, else raise the Empty exception ('timeout' is ignored
183192
in that case).
193+
194+
Raises ShutDown if the queue has been shut down and is empty,
195+
or if the queue has been shut down immediately.
184196
'''
185-
if self.shutdown_state == _queue_shutdown_immediate:
186-
raise ShutDown
187197
with self.not_empty:
198+
if self._is_shutdown_immediate() or\
199+
(self._is_shutdown() and not self._qsize()):
200+
raise ShutDown
188201
if not block:
189202
if not self._qsize():
190-
if self.shutdown_state != _queue_alive:
191-
raise ShutDown
192203
raise Empty
193204
elif timeout is None:
194205
while not self._qsize():
195-
if self.shutdown_state != _queue_alive:
196-
raise ShutDown
197206
self.not_empty.wait()
198-
if self.shutdown_state != _queue_alive:
207+
if self._is_shutdown_immediate():
199208
raise ShutDown
200209
elif timeout < 0:
201210
raise ValueError("'timeout' must be a non-negative number")
202211
else:
203212
endtime = time() + timeout
204213
while not self._qsize():
205-
if self.shutdown_state != _queue_alive:
206-
raise ShutDown
207214
remaining = endtime - time()
208215
if remaining <= 0.0:
209216
raise Empty
210217
self.not_empty.wait(remaining)
211-
if self.shutdown_state != _queue_alive:
218+
if self._is_shutdown_immediate():
212219
raise ShutDown
213-
if self.shutdown_state == _queue_shutdown_immediate:
214-
raise ShutDown
215220
item = self._get()
216221
self.not_full.notify()
217222
return item
@@ -242,14 +247,33 @@ def shutdown(self, immediate=False):
242247
and join() if 'immediate'. The ShutDown exception is raised.
243248
'''
244249
with self.mutex:
250+
if self._is_shutdown_immediate():
251+
return
245252
if immediate:
246-
self.shutdown_state = _queue_shutdown_immediate
253+
self._set_shutdown_immediate()
247254
self.not_empty.notify_all()
255+
# release all blocked threads in `join()`
248256
self.all_tasks_done.notify_all()
249257
else:
250-
self.shutdown_state = _queue_shutdown
258+
self._set_shutdown()
251259
self.not_full.notify_all()
252260

261+
def _is_alive(self):
262+
return self.shutdown_state == _queue_alive
263+
264+
def _is_shutdown(self):
265+
return self.shutdown_state == _queue_shutdown
266+
267+
def _is_shutdown_immediate(self):
268+
return self.shutdown_state == _queue_shutdown_immediate
269+
270+
def _set_shutdown(self):
271+
self.shutdown_state = _queue_shutdown
272+
273+
def _set_shutdown_immediate(self):
274+
self.shutdown_state = _queue_shutdown_immediate
275+
276+
253277
# Override these methods to implement other queue organizations
254278
# (e.g. stack or priority queue).
255279
# These will only be called with appropriate locks held

0 commit comments

Comments
 (0)