Skip to content

gh-96471: Add queue shutdown, next step. #102499

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 74 commits into from
Closed
Show file tree
Hide file tree
Changes from 47 commits
Commits
Show all changes
74 commits
Select commit Hold shift + click to select a range
4fd0640
Add threading implementation of queue shutdown
EpicWink Sep 1, 2022
d942c9e
Fix up implementation, add unit-tests
EpicWink Sep 18, 2022
f552ac1
Implement for asyncio queues
EpicWink Sep 18, 2022
78671f9
WIP: multiprocessing queue shutdown
EpicWink Oct 22, 2022
5f31f8e
WIP: multiprocessing queue shutdown
EpicWink Jan 19, 2023
9bbc5db
change comment
YvesDup Feb 10, 2023
f9f2c06
call to self._finished.set() in order to release all joined tasks/coros
YvesDup Feb 10, 2023
7491ef1
add unitests to `shutdwon` method
YvesDup Feb 10, 2023
dd22c6b
add unitests to `shutdwon` method
YvesDup Feb 10, 2023
5239306
replace global state variable with an enum `_QueueState`
YvesDup Feb 10, 2023
4b127b6
replace global state variable with an enum `_QueueState` - erase E ju…
YvesDup Feb 10, 2023
6402de7
simplify and unify tests
YvesDup Feb 11, 2023
be9588b
simplify and unify tests
YvesDup Feb 11, 2023
3613f5d
add `_shutdown_state` to tuples of `__getstate__` and `__setstate__`,…
YvesDup Feb 13, 2023
06775bb
Update initial tests with `self.assertRaises`
YvesDup Feb 15, 2023
6f01015
integration of shudown transition in `shutdown` method
YvesDup Feb 15, 2023
ff9895d
Set `test_shutdown` prefix to all unittests
YvesDup Feb 15, 2023
d42433e
asyncio.queue: refactoring of tests, add new tests, last updates and…
YvesDup Feb 28, 2023
53078bb
first version working
YvesDup Feb 28, 2023
0075039
Some corrections
YvesDup Feb 28, 2023
b4a53d2
Change Enum to global about `shutdown_state` attr
YvesDup Mar 3, 2023
dbe2078
Add new tests
YvesDup Mar 3, 2023
18bb995
Fixes bugs
YvesDup Mar 3, 2023
05700d5
Add first tests
YvesDup Mar 3, 2023
7d01747
Update tests
YvesDup Mar 3, 2023
aad0cba
add _wait()
YvesDup Mar 3, 2023
d9dbb33
Move some tests about shutdwon state and empty queue
YvesDup Mar 3, 2023
9f4c0a3
Merge branch 'python:main' into queue-shutdown
YvesDup Mar 4, 2023
db6a257
📜🤖 Added by blurb_it.
blurb-it[bot] Mar 7, 2023
01e5880
Update 2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst
YvesDup Mar 7, 2023
88627bb
Merge branch 'main' into queue-shutdown
YvesDup Mar 7, 2023
37da705
Update test _shutdown_all_methods
YvesDup Mar 7, 2023
795fb2d
Fix some bugs, Refactoring code
YvesDup Mar 8, 2023
0de7836
Merge branch 'main' into queue-shutdown
YvesDup Mar 9, 2023
4443237
Merge branch 'main' into queue-shutdown
YvesDup Mar 15, 2023
499157d
Merge branch 'main' into queue-shutdown
AlexWaygood Mar 15, 2023
99eadd7
Update test names
YvesDup Mar 15, 2023
670d864
Refactoring and fix minor bugs
YvesDup Mar 15, 2023
6af0e8e
suppress import enum
YvesDup Mar 15, 2023
a3e03c5
update docstrings
YvesDup Mar 17, 2023
bc30db7
Suppress `import ctypes`, causes no necessary uses
YvesDup Mar 17, 2023
64defd4
Merge branch 'main' into queue-shutdown
YvesDup Mar 19, 2023
4382409
fix segmentation fault: use ctx.Value,
YvesDup Mar 22, 2023
aa70dc2
fix segmentation fault:
YvesDup Mar 22, 2023
cbfd771
Add private method about shutdown_state
YvesDup Mar 22, 2023
0d095bc
Add private methods to check _shutdown_state attr
YvesDup Mar 24, 2023
6d1f072
Merge branch 'main' into queue-shutdown
YvesDup Mar 24, 2023
891cffe
Update docs for queue shutdown
EpicWink Mar 28, 2023
e30933f
Update Lib/queue.py
YvesDup Mar 29, 2023
de5714d
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
da2e3c7
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
50857c0
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
56272b9
Update Lib/queue.py
YvesDup Mar 29, 2023
db7eaff
Update Lib/queue.py
YvesDup Mar 29, 2023
4099fb8
Update Lib/multiprocessing/queues.py
YvesDup Mar 29, 2023
58007dc
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
225387f
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
7c6e1c7
Update Lib/queue.py
YvesDup Mar 29, 2023
c025766
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
c9ae3be
Update Lib/asyncio/queues.py
YvesDup Mar 29, 2023
e9b66b2
Update some tests
YvesDup Apr 6, 2023
570158e
Suppress a borderline assert
YvesDup Apr 6, 2023
eeb47b0
Fix bugs
YvesDup Apr 6, 2023
6f0b131
Merge branch 'main' into queue-shutdown
YvesDup Apr 7, 2023
4e2a19e
ran patchcheck
YvesDup Apr 7, 2023
6926e11
remove ./Tools/c-analyzer/cpython/_parser.py
YvesDup Apr 7, 2023
d301c90
Merge branch 'main' into queue-shutdown
arhadthedev Apr 10, 2023
49879a0
Merge branch 'main' into queue-shutdown
YvesDup Apr 11, 2023
31ea16b
Add `shutdown` method documentation
YvesDup May 5, 2023
bed3a4b
Add `shutdown` method to documentation
YvesDup May 5, 2023
8e8dcfa
Add `shutdown` method to documentation
YvesDup May 5, 2023
9eed14e
Add `shutdown` method to documentation
YvesDup May 5, 2023
66efd9c
Merge pull request #4 from EpicWink/yvesdup-queue-shutdown-2
YvesDup May 5, 2023
f4ad064
Merge branch 'main' into queue-shutdown
YvesDup May 5, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 114 additions & 3 deletions Lib/asyncio/queues.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,14 @@
__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty')
__all__ = (
'Queue',
'PriorityQueue',
'LifoQueue',
'QueueFull',
'QueueEmpty',
'QueueShutDown',
)

import collections
import enum
import heapq
from types import GenericAlias

Expand All @@ -18,6 +26,17 @@ class QueueFull(Exception):
pass


class QueueShutDown(Exception):
"""Raised when putting on to or getting from a shut-down Queue."""
pass


class _QueueState(enum.Enum):
ALIVE = "alive"
SHUTDOWN = "shutdown"
SHUTDOWN_IMMEDIATE = "shutdown-immediate"


class Queue(mixins._LoopBoundMixin):
"""A queue, useful for coordinating producer and consumer coroutines.

Expand All @@ -41,6 +60,7 @@ def __init__(self, maxsize=0):
self._finished = locks.Event()
self._finished.set()
self._init(maxsize)
self._shutdown_state = _QueueState.ALIVE

# These three are overridable in subclasses.

Expand Down Expand Up @@ -81,6 +101,8 @@ def _format(self):
result += f' _putters[{len(self._putters)}]'
if self._unfinished_tasks:
result += f' tasks={self._unfinished_tasks}'
if not self._is_alive():
result += f' state={self._shutdown_state.value}'
return result

def qsize(self):
Expand Down Expand Up @@ -112,7 +134,11 @@ async def put(self, item):

Put an item into the queue. If the queue is full, wait until a free
slot is available before adding item.

Raise a QueueShutDown if _shutdown_state is not set to `ALIVE`.
"""
if not self._is_alive():
raise QueueShutDown
while self.full():
putter = self._get_loop().create_future()
self._putters.append(putter)
Expand All @@ -125,20 +151,28 @@ async def put(self, item):
self._putters.remove(putter)
except ValueError:
# The putter could be removed from self._putters by a
# previous get_nowait call.
# previous get_nowait call,
# or a shutdown call.
pass
if not self.full() and not putter.cancelled():
# We were woken up by get_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._putters)
raise
if not self._is_alive():
raise QueueShutDown
return self.put_nowait(item)

def put_nowait(self, item):
"""Put an item into the queue without blocking.

If no free slot is immediately available, raise QueueFull.

Raise a QueueShutDown if _shutdown_state is not set to
`ALIVE`.
"""
if not self._is_alive():
raise QueueShutDown
if self.full():
raise QueueFull
self._put(item)
Expand All @@ -150,8 +184,18 @@ async def get(self):
"""Remove and return an item from the queue.

If queue is empty, wait until an item is available.

Raise a QueueShutDown if _shutdown_state is set to
`SHUTDOWN_IMMEDIATE`.

Raise a QueueShutDown if _shutdown_state is set to
`SHUTDOWN` and queue is empty.
"""
if self._is_shutdown_immediate():
raise QueueShutDown
while self.empty():
if self._is_shutdown():
raise QueueShutDown
getter = self._get_loop().create_future()
self._getters.append(getter)
try:
Expand All @@ -163,21 +207,34 @@ async def get(self):
self._getters.remove(getter)
except ValueError:
# The getter could be removed from self._getters by a
# previous put_nowait call.
# previous put_nowait call,
# or a shutdown call.
pass
if not self.empty() and not getter.cancelled():
# We were woken up by put_nowait(), but can't take
# the call. Wake up the next in line.
self._wakeup_next(self._getters)
raise
if self._is_shutdown_immediate():
raise QueueShutDown
return self.get_nowait()

def get_nowait(self):
"""Remove and return an item from the queue.

Return an item if one is immediately available, else raise QueueEmpty.

Raise a QueueShutDown if _shutdown_state is set to
`SHUTDOWN_IMMEDIATE`.

Raise a QueueShutDown if _shutdown_state is set to
`SHUTDOWN` and queue is empty.
"""
if self._is_shutdown_immediate():
raise QueueShutDown
if self.empty():
if self._is_shutdown():
raise QueueShutDown
raise QueueEmpty
item = self._get()
self._wakeup_next(self._putters)
Expand All @@ -196,7 +253,12 @@ def task_done(self):

Raises ValueError if called more times than there were items placed in
the queue.

Raise a QueueShutDown if _shutdown_state is set to
`SHUTDOWN_IMMEDIATE`.
"""
if self._is_shutdown_immediate():
raise QueueShutDown
if self._unfinished_tasks <= 0:
raise ValueError('task_done() called too many times')
self._unfinished_tasks -= 1
Expand All @@ -210,9 +272,58 @@ async def join(self):
queue. The count goes down whenever a consumer calls task_done() to
indicate that the item was retrieved and all work on it is complete.
When the count of unfinished tasks drops to zero, join() unblocks.

Raise a QueueShutDown if _shutdown_state is set to
`SHUTDOWN_IMMEDIATE`.
"""
if self._is_shutdown_immediate():
raise QueueShutDown
if self._unfinished_tasks > 0:
await self._finished.wait()
if self._is_shutdown_immediate():
raise QueueShutDown

def shutdown(self, immediate=False):
"""Shut-down the queue, making queue gets and puts raise.

By default, gets will only raise once the queue is empty. Set
'immediate' to True to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if 'immediate'. The QueueShutDown exception is raised.
"""
if self._is_shutdown_immediate():
return
# here _shutdown_state is ALIVE or SHUTDOWN
if immediate:
self._set_shutdown_immediate()
while self._getters:
getter = self._getters.popleft()
if not getter.done():
getter.set_result(None)
# Release all 'blocked' tasks/coros in `join()`
self._finished.set()
else:
self._set_shutdown()
while self._putters:
putter = self._putters.popleft()
if not putter.done():
putter.set_result(None)

def _is_alive(self):
return self._shutdown_state is _QueueState.ALIVE

def _is_shutdown(self):
return self._shutdown_state is _QueueState.SHUTDOWN

def _is_shutdown_immediate(self):
return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE

def _set_shutdown(self):
self._shutdown_state = _QueueState.SHUTDOWN

def _set_shutdown_immediate(self):
self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE


class PriorityQueue(Queue):
Expand Down
63 changes: 60 additions & 3 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import weakref
import errno

from queue import Empty, Full
from queue import Empty, Full, ShutDown

import _multiprocessing

Expand All @@ -28,6 +28,10 @@

from .util import debug, info, Finalize, register_after_fork, is_exiting

_queue_alive = 0
_queue_shutdown = 1
_queue_shutdown_immediate = 2

#
# Queue type using a pipe, buffer and thread
#
Expand All @@ -50,18 +54,21 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._shutdown_state = ctx.Value('i', _queue_alive)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state) = state
self._reset()

def _after_fork(self):
Expand All @@ -83,9 +90,26 @@ def _reset(self, after_fork=False):
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll

def _is_alive(self):
return self._shutdown_state.value == _queue_alive

def _is_shutdown(self):
return self._shutdown_state.value == _queue_shutdown

def _is_shutdown_immediate(self):
return self._shutdown_state.value == _queue_shutdown_immediate

def _set_shutdown(self):
self._shutdown_state.value = _queue_shutdown

def _set_shutdown_immediate(self):
self._shutdown_state.value = _queue_shutdown_immediate

def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._is_alive():
raise ShutDown
if not self._sem.acquire(block, timeout):
raise Full

Expand All @@ -98,6 +122,9 @@ def put(self, obj, block=True, timeout=None):
def get(self, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if self._is_shutdown_immediate() or\
(self._is_shutdown() and self.empty()):
raise ShutDown
if block and timeout is None:
with self._rlock:
res = self._recv_bytes()
Expand All @@ -118,6 +145,8 @@ def get(self, block=True, timeout=None):
self._sem.release()
finally:
self._rlock.release()
if self._is_shutdown_immediate():
raise ShutDown
# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand All @@ -137,6 +166,19 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
with self._shutdown_state.get_lock():
if self._is_shutdown_immediate():
return
if immediate:
self._set_shutdown_immediate()
with self._notempty:
self._notempty.notify_all()
else:
self._set_shutdown()

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -310,6 +352,8 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._is_alive():
raise ShutDown
if not self._sem.acquire(block, timeout):
raise Full

Expand All @@ -322,15 +366,28 @@ def put(self, obj, block=True, timeout=None):

def task_done(self):
with self._cond:
if self._is_shutdown_immediate():
raise ShutDown
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if self._is_shutdown_immediate():
raise ShutDown
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
if self._is_shutdown_immediate():
raise ShutDown

def shutdown(self, immediate=False):
with self._cond:
is_alive = self._is_alive()
super().shutdown(immediate)
if is_alive:
self._cond.notify_all() # here to check YD

#
# Simplified Queue type -- really just a locked pipe
Expand Down
Loading