Skip to content

Commit ddeb9b6

Browse files
YvesDupEpicWink
authored andcommitted
Fix queue shutdown
* Include raised exception in docstrings * Handle queue shutdown in task_done and join * Factor out queue-state checks and updates to methods * Logic fixes in qsize, get and shutdown * Don't set unfinished_tasks to 0 on immediate shutdown * Updated tests * Document feature added in 3.13
1 parent 16e9c09 commit ddeb9b6

File tree

8 files changed

+1146
-118
lines changed

8 files changed

+1146
-118
lines changed

Doc/library/queue.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ The :mod:`queue` module defines the following classes and exceptions:
9898
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
9999
a :class:`Queue` object which has been shut down.
100100

101-
.. versionadded:: 3.12
101+
.. versionadded:: 3.13
102102

103103

104104
.. _queueobjects:
@@ -247,7 +247,7 @@ them down.
247247
All blocked callers of put() will be unblocked, and also get()
248248
and join() if *immediate* is true.
249249

250-
.. versionadded:: 3.12
250+
.. versionadded:: 3.13
251251

252252

253253
SimpleQueue Objects

Lib/asyncio/queues.py

Lines changed: 63 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
)
99

1010
import collections
11+
import enum
1112
import heapq
1213
from types import GenericAlias
1314

@@ -30,9 +31,10 @@ class QueueShutDown(Exception):
3031
pass
3132

3233

33-
_queue_alive = "alive"
34-
_queue_shutdown = "shutdown"
35-
_queue_shutdown_immediate = "shutdown-immediate"
34+
class _QueueState(enum.Enum):
35+
ALIVE = "alive"
36+
SHUTDOWN = "shutdown"
37+
SHUTDOWN_IMMEDIATE = "shutdown-immediate"
3638

3739

3840
class Queue(mixins._LoopBoundMixin):
@@ -58,7 +60,7 @@ def __init__(self, maxsize=0):
5860
self._finished = locks.Event()
5961
self._finished.set()
6062
self._init(maxsize)
61-
self.shutdown_state = _queue_alive
63+
self._shutdown_state = _QueueState.ALIVE
6264

6365
# These three are overridable in subclasses.
6466

@@ -99,6 +101,8 @@ def _format(self):
99101
result += f' _putters[{len(self._putters)}]'
100102
if self._unfinished_tasks:
101103
result += f' tasks={self._unfinished_tasks}'
104+
if not self._is_alive():
105+
result += f' state={self._shutdown_state.value}'
102106
return result
103107

104108
def qsize(self):
@@ -130,8 +134,10 @@ async def put(self, item):
130134
131135
Put an item into the queue. If the queue is full, wait until a free
132136
slot is available before adding item.
137+
138+
Raises QueueShutDown if the queue has been shut down.
133139
"""
134-
if self.shutdown_state != _queue_alive:
140+
if not self._is_alive():
135141
raise QueueShutDown
136142
while self.full():
137143
putter = self._get_loop().create_future()
@@ -145,23 +151,25 @@ async def put(self, item):
145151
self._putters.remove(putter)
146152
except ValueError:
147153
# The putter could be removed from self._putters by a
148-
# previous get_nowait call.
154+
# previous get_nowait call or a shutdown call.
149155
pass
150156
if not self.full() and not putter.cancelled():
151157
# We were woken up by get_nowait(), but can't take
152158
# the call. Wake up the next in line.
153159
self._wakeup_next(self._putters)
154160
raise
155-
if self.shutdown_state != _queue_alive:
161+
if not self._is_alive():
156162
raise QueueShutDown
157163
return self.put_nowait(item)
158164

159165
def put_nowait(self, item):
160166
"""Put an item into the queue without blocking.
161167
162168
If no free slot is immediately available, raise QueueFull.
169+
170+
Raises QueueShutDown if the queue has been shut down.
163171
"""
164-
if self.shutdown_state != _queue_alive:
172+
if not self._is_alive():
165173
raise QueueShutDown
166174
if self.full():
167175
raise QueueFull
@@ -174,11 +182,14 @@ async def get(self):
174182
"""Remove and return an item from the queue.
175183
176184
If queue is empty, wait until an item is available.
185+
186+
Raises QueueShutDown if the queue has been shut down and is empty, or
187+
if the queue has been shut down immediately.
177188
"""
178-
if self.shutdown_state == _queue_shutdown_immediate:
189+
if self._is_shutdown_immediate():
179190
raise QueueShutDown
180191
while self.empty():
181-
if self.shutdown_state != _queue_alive:
192+
if self._is_shutdown():
182193
raise QueueShutDown
183194
getter = self._get_loop().create_future()
184195
self._getters.append(getter)
@@ -191,28 +202,32 @@ async def get(self):
191202
self._getters.remove(getter)
192203
except ValueError:
193204
# The getter could be removed from self._getters by a
194-
# previous put_nowait call.
205+
# previous put_nowait call,
206+
# or a shutdown call.
195207
pass
196208
if not self.empty() and not getter.cancelled():
197209
# We were woken up by put_nowait(), but can't take
198210
# the call. Wake up the next in line.
199211
self._wakeup_next(self._getters)
200212
raise
201-
if self.shutdown_state == _queue_shutdown_immediate:
213+
if self._is_shutdown_immediate():
202214
raise QueueShutDown
203215
return self.get_nowait()
204216

205217
def get_nowait(self):
206218
"""Remove and return an item from the queue.
207219
208220
Return an item if one is immediately available, else raise QueueEmpty.
221+
222+
Raises QueueShutDown if the queue has been shut down and is empty, or
223+
if the queue has been shut down immediately.
209224
"""
225+
if self._is_shutdown_immediate():
226+
raise QueueShutDown
210227
if self.empty():
211-
if self.shutdown_state != _queue_alive:
228+
if self._is_shutdown():
212229
raise QueueShutDown
213230
raise QueueEmpty
214-
elif self.shutdown_state == _queue_shutdown_immediate:
215-
raise QueueShutDown
216231
item = self._get()
217232
self._wakeup_next(self._putters)
218233
return item
@@ -230,7 +245,11 @@ def task_done(self):
230245
231246
Raises ValueError if called more times than there were items placed in
232247
the queue.
248+
249+
Raises QueueShutDown if the queue has been shut down immediately.
233250
"""
251+
if self._is_shutdown_immediate():
252+
raise QueueShutDown
234253
if self._unfinished_tasks <= 0:
235254
raise ValueError('task_done() called too many times')
236255
self._unfinished_tasks -= 1
@@ -244,9 +263,15 @@ async def join(self):
244263
queue. The count goes down whenever a consumer calls task_done() to
245264
indicate that the item was retrieved and all work on it is complete.
246265
When the count of unfinished tasks drops to zero, join() unblocks.
266+
267+
Raises QueueShutDown if the queue has been shut down immediately.
247268
"""
269+
if self._is_shutdown_immediate():
270+
raise QueueShutDown
248271
if self._unfinished_tasks > 0:
249272
await self._finished.wait()
273+
if self._is_shutdown_immediate():
274+
raise QueueShutDown
250275

251276
def shutdown(self, immediate=False):
252277
"""Shut-down the queue, making queue gets and puts raise.
@@ -257,19 +282,40 @@ def shutdown(self, immediate=False):
257282
All blocked callers of put() will be unblocked, and also get()
258283
and join() if 'immediate'. The QueueShutDown exception is raised.
259284
"""
285+
if self._is_shutdown_immediate():
286+
return
287+
# here _shutdown_state is ALIVE or SHUTDOWN
260288
if immediate:
261-
self.shutdown_state = _queue_shutdown_immediate
289+
self._set_shutdown_immediate()
262290
while self._getters:
263291
getter = self._getters.popleft()
264292
if not getter.done():
265293
getter.set_result(None)
294+
# Release all 'blocked' tasks/coros in `join()`
295+
self._finished.set()
266296
else:
267-
self.shutdown_state = _queue_shutdown
297+
self._set_shutdown()
268298
while self._putters:
269299
putter = self._putters.popleft()
270300
if not putter.done():
271301
putter.set_result(None)
272302

303+
def _is_alive(self):
304+
return self._shutdown_state is _QueueState.ALIVE
305+
306+
def _is_shutdown(self):
307+
return self._shutdown_state is _QueueState.SHUTDOWN
308+
309+
def _is_shutdown_immediate(self):
310+
return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE
311+
312+
def _set_shutdown(self):
313+
self._shutdown_state = _QueueState.SHUTDOWN
314+
315+
def _set_shutdown_immediate(self):
316+
self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE
317+
318+
273319
class PriorityQueue(Queue):
274320
"""A subclass of Queue; retrieves entries in priority order (lowest first).
275321

0 commit comments

Comments
 (0)