Skip to content

Commit cd8ceaf

Browse files
committed
Add threading queue shutdown
* Include docs
1 parent f508800 commit cd8ceaf

File tree

3 files changed

+123
-0
lines changed

3 files changed

+123
-0
lines changed

Doc/library/queue.rst

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions:
9393
on a :class:`Queue` object which is full.
9494

9595

96+
.. exception:: ShutDown
97+
98+
Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on
99+
a :class:`Queue` object which has been shut down.
100+
101+
.. versionadded:: 3.12
102+
103+
96104
.. _queueobjects:
97105

98106
Queue Objects
@@ -135,6 +143,8 @@ provide the public methods described below.
135143
immediately available, else raise the :exc:`Full` exception (*timeout* is
136144
ignored in that case).
137145

146+
Raises :exc:`ShutDown` if the queue has been shut down.
147+
138148

139149
.. method:: Queue.put_nowait(item)
140150

@@ -155,6 +165,9 @@ provide the public methods described below.
155165
an uninterruptible wait on an underlying lock. This means that no exceptions
156166
can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`.
157167

168+
Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if
169+
the queue has been shut down immediately.
170+
158171

159172
.. method:: Queue.get_nowait()
160173

@@ -177,6 +190,8 @@ fully processed by daemon consumer threads.
177190
Raises a :exc:`ValueError` if called more times than there were items placed in
178191
the queue.
179192

193+
Raises :exc:`ShutDown` if the queue has been shut down immediately.
194+
180195

181196
.. method:: Queue.join()
182197

@@ -187,6 +202,8 @@ fully processed by daemon consumer threads.
187202
indicate that the item was retrieved and all work on it is complete. When the
188203
count of unfinished tasks drops to zero, :meth:`join` unblocks.
189204

205+
Raises :exc:`ShutDown` if the queue has been shut down immediately.
206+
190207

191208
Example of how to wait for enqueued tasks to be completed::
192209

@@ -214,6 +231,25 @@ Example of how to wait for enqueued tasks to be completed::
214231
print('All work completed')
215232

216233

234+
Terminating queues
235+
^^^^^^^^^^^^^^^^^^
236+
237+
:class:`Queue` objects can be made to prevent further interaction by shutting
238+
them down.
239+
240+
.. method:: Queue.shutdown(immediate=False)
241+
242+
Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`.
243+
244+
By default, gets will only raise once the queue is empty. Set
245+
*immediate* to true to make gets raise immediately instead.
246+
247+
All blocked callers of put() will be unblocked, and also get()
248+
and join() if *immediate* is true.
249+
250+
.. versionadded:: 3.12
251+
252+
217253
SimpleQueue Objects
218254
-------------------
219255

Lib/queue.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,15 @@ class Full(Exception):
2525
pass
2626

2727

28+
class ShutDown(Exception):
29+
'''Raised when put/get with shut-down queue.'''
30+
31+
32+
_queue_alive = "alive"
33+
_queue_shutdown = "shutdown"
34+
_queue_shutdown_immediate = "shutdown-immediate"
35+
36+
2837
class Queue:
2938
'''Create a queue object with a given maximum size.
3039
@@ -54,6 +63,9 @@ def __init__(self, maxsize=0):
5463
self.all_tasks_done = threading.Condition(self.mutex)
5564
self.unfinished_tasks = 0
5665

66+
# Queue shut-down state
67+
self.shutdown_state = _queue_alive
68+
5769
def task_done(self):
5870
'''Indicate that a formerly enqueued task is complete.
5971
@@ -87,6 +99,8 @@ def join(self):
8799
'''
88100
with self.all_tasks_done:
89101
while self.unfinished_tasks:
102+
if self.shutdown_state == _queue_shutdown_immediate:
103+
return
90104
self.all_tasks_done.wait()
91105

92106
def qsize(self):
@@ -130,6 +144,8 @@ def put(self, item, block=True, timeout=None):
130144
is immediately available, else raise the Full exception ('timeout'
131145
is ignored in that case).
132146
'''
147+
if self.shutdown_state != _queue_alive:
148+
raise ShutDown
133149
with self.not_full:
134150
if self.maxsize > 0:
135151
if not block:
@@ -138,6 +154,8 @@ def put(self, item, block=True, timeout=None):
138154
elif timeout is None:
139155
while self._qsize() >= self.maxsize:
140156
self.not_full.wait()
157+
if self.shutdown_state != _queue_alive:
158+
raise ShutDown
141159
elif timeout < 0:
142160
raise ValueError("'timeout' must be a non-negative number")
143161
else:
@@ -147,6 +165,8 @@ def put(self, item, block=True, timeout=None):
147165
if remaining <= 0.0:
148166
raise Full
149167
self.not_full.wait(remaining)
168+
if self.shutdown_state != _queue_alive:
169+
raise ShutDown
150170
self._put(item)
151171
self.unfinished_tasks += 1
152172
self.not_empty.notify()
@@ -162,22 +182,36 @@ def get(self, block=True, timeout=None):
162182
available, else raise the Empty exception ('timeout' is ignored
163183
in that case).
164184
'''
185+
if self.shutdown_state == _queue_shutdown_immediate:
186+
raise ShutDown
165187
with self.not_empty:
166188
if not block:
167189
if not self._qsize():
190+
if self.shutdown_state != _queue_alive:
191+
raise ShutDown
168192
raise Empty
169193
elif timeout is None:
170194
while not self._qsize():
195+
if self.shutdown_state != _queue_alive:
196+
raise ShutDown
171197
self.not_empty.wait()
198+
if self.shutdown_state != _queue_alive:
199+
raise ShutDown
172200
elif timeout < 0:
173201
raise ValueError("'timeout' must be a non-negative number")
174202
else:
175203
endtime = time() + timeout
176204
while not self._qsize():
205+
if self.shutdown_state != _queue_alive:
206+
raise ShutDown
177207
remaining = endtime - time()
178208
if remaining <= 0.0:
179209
raise Empty
180210
self.not_empty.wait(remaining)
211+
if self.shutdown_state != _queue_alive:
212+
raise ShutDown
213+
if self.shutdown_state == _queue_shutdown_immediate:
214+
raise ShutDown
181215
item = self._get()
182216
self.not_full.notify()
183217
return item
@@ -198,6 +232,24 @@ def get_nowait(self):
198232
'''
199233
return self.get(block=False)
200234

235+
def shutdown(self, immediate=False):
236+
'''Shut-down the queue, making queue gets and puts raise.
237+
238+
By default, gets will only raise once the queue is empty. Set
239+
'immediate' to True to make gets raise immediately instead.
240+
241+
All blocked callers of put() will be unblocked, and also get()
242+
and join() if 'immediate'. The ShutDown exception is raised.
243+
'''
244+
with self.mutex:
245+
if immediate:
246+
self.shutdown_state = _queue_shutdown_immediate
247+
self.not_empty.notify_all()
248+
self.all_tasks_done.notify_all()
249+
else:
250+
self.shutdown_state = _queue_shutdown
251+
self.not_full.notify_all()
252+
201253
# Override these methods to implement other queue organizations
202254
# (e.g. stack or priority queue).
203255
# These will only be called with appropriate locks held

Lib/test/test_queue.py

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,41 @@ def test_shrinking_queue(self):
241241
with self.assertRaises(self.queue.Full):
242242
q.put_nowait(4)
243243

244+
def test_shutdown_empty(self):
245+
q = self.type2test()
246+
q.shutdown()
247+
try:
248+
q.put("data")
249+
self.fail("Didn't appear to shut-down queue")
250+
except self.queue.ShutDown:
251+
pass
252+
try:
253+
q.get()
254+
self.fail("Didn't appear to shut-down queue")
255+
except self.queue.ShutDown:
256+
pass
257+
258+
def test_shutdown_nonempty(self):
259+
q = self.type2test()
260+
q.put("data")
261+
q.shutdown()
262+
q.get()
263+
try:
264+
q.get()
265+
self.fail("Didn't appear to shut-down queue")
266+
except self.queue.ShutDown:
267+
pass
268+
269+
def test_shutdown_immediate(self):
270+
q = self.type2test()
271+
q.put("data")
272+
q.shutdown(immediate=True)
273+
try:
274+
q.get()
275+
self.fail("Didn't appear to shut-down queue")
276+
except self.queue.ShutDown:
277+
pass
278+
244279
class QueueTest(BaseQueueTestMixin):
245280

246281
def setUp(self):

0 commit comments

Comments
 (0)