Skip to content

gh-96471: Add multiprocessing queue shutdown #104230

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 22 additions & 2 deletions Doc/library/multiprocessing.rst
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,8 @@ For an example of the usage of queues for interprocess communication see
free slot was available within that time. Otherwise (*block* is
``False``), put an item on the queue if a free slot is immediately
available, else raise the :exc:`queue.Full` exception (*timeout* is
ignored in that case).
ignored in that case). Raises :exc:`ShutDown` if the queue has been shut
down.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -863,7 +864,9 @@ For an example of the usage of queues for interprocess communication see
it blocks at most *timeout* seconds and raises the :exc:`queue.Empty`
exception if no item was available within that time. Otherwise (block is
``False``), return an item if one is immediately available, else raise the
:exc:`queue.Empty` exception (*timeout* is ignored in that case).
:exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises
:exc:`queue.ShutDown` if the queue has been shut down and is empty, or if
the queue has been shut down immediately.

.. versionchanged:: 3.8
If the queue is closed, :exc:`ValueError` is raised instead of
Expand All @@ -873,6 +876,19 @@ For an example of the usage of queues for interprocess communication see

Equivalent to ``get(False)``.

.. method:: shutdown(immediate=False)

Shut-down the queue, making queue gets and puts raise
:exc:`queue.ShutDown`.

By default, gets will only raise once the queue is empty. Set
*immediate* to true to make gets raise immediately instead.

All blocked callers of put() will be unblocked, and also get()
and join() if *immediate* is true.

.. versionadded:: 3.13

:class:`multiprocessing.Queue` has a few additional methods not found in
:class:`queue.Queue`. These methods are usually unnecessary for most
code:
Expand Down Expand Up @@ -962,6 +978,8 @@ For an example of the usage of queues for interprocess communication see
Raises a :exc:`ValueError` if called more times than there were items
placed in the queue.

Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.


.. method:: join()

Expand All @@ -973,6 +991,8 @@ For an example of the usage of queues for interprocess communication see
it is complete. When the count of unfinished tasks drops to zero,
:meth:`~queue.Queue.join` unblocks.

Raises :exc:`queue.ShutDown` if the queue has been shut down immediately.


Miscellaneous
~~~~~~~~~~~~~
Expand Down
79 changes: 76 additions & 3 deletions Lib/multiprocessing/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import weakref
import errno

from queue import Empty, Full
from queue import Empty, Full, ShutDown

import _multiprocessing

Expand All @@ -28,6 +28,10 @@

from .util import debug, info, Finalize, register_after_fork, is_exiting

_queue_alive = 0
_queue_shutdown = 1
_queue_shutdown_immediate = 2

#
# Queue type using a pipe, buffer and thread
#
Expand All @@ -50,18 +54,21 @@ def __init__(self, maxsize=0, *, ctx):
# For use by concurrent.futures
self._ignore_epipe = False
self._reset()
self._shutdown_state = ctx.Value('i', _queue_alive)

if sys.platform != 'win32':
register_after_fork(self, Queue._after_fork)

def __getstate__(self):
context.assert_spawning(self)
return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid)
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state)

def __setstate__(self, state):
(self._ignore_epipe, self._maxsize, self._reader, self._writer,
self._rlock, self._wlock, self._sem, self._opid) = state
self._rlock, self._wlock, self._sem, self._opid,
self._shutdown_state) = state
self._reset()

def _after_fork(self):
Expand All @@ -83,10 +90,29 @@ def _reset(self, after_fork=False):
self._recv_bytes = self._reader.recv_bytes
self._poll = self._reader.poll

def _is_alive(self):
return self._shutdown_state.value == _queue_alive

def _is_shutdown(self):
return self._shutdown_state.value == _queue_shutdown

def _is_shutdown_immediate(self):
return self._shutdown_state.value == _queue_shutdown_immediate

def _set_shutdown(self):
self._shutdown_state.value = _queue_shutdown

def _set_shutdown_immediate(self):
self._shutdown_state.value = _queue_shutdown_immediate

def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._is_alive():
raise ShutDown
if not self._sem.acquire(block, timeout):
if not self._is_alive():
raise ShutDown
raise Full

with self._notempty:
Expand All @@ -100,24 +126,41 @@ def get(self, block=True, timeout=None):
raise ValueError(f"Queue {self!r} is closed")
if block and timeout is None:
with self._rlock:
# checks shutdown state
if (self._is_shutdown_immediate()
or (self._is_shutdown() and self.empty())):
raise ShutDown
res = self._recv_bytes()
self._sem.release()
else:
if block:
deadline = time.monotonic() + timeout
if not self._rlock.acquire(block, timeout):
if (self._is_shutdown_immediate()
or (self._is_shutdown() and self.empty())):
raise ShutDown
raise Empty
try:
if block:
timeout = deadline - time.monotonic()
if not self._poll(timeout):
if not self._is_alive():
raise ShutDown
raise Empty
elif not self._poll():
if not self._is_alive():
raise ShutDown
raise Empty

# here queue is not empty
if self._is_shutdown_immediate():
raise ShutDown
# here shutdown state queue is alive or shutdown
res = self._recv_bytes()
self._sem.release()
finally:
self._rlock.release()

# unserialize the data after having released the lock
return _ForkingPickler.loads(res)

Expand All @@ -137,6 +180,19 @@ def get_nowait(self):
def put_nowait(self, obj):
return self.put(obj, False)

def shutdown(self, immediate=False):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
with self._shutdown_state.get_lock():
if self._is_shutdown_immediate():
return
if immediate:
self._set_shutdown_immediate()
with self._notempty:
self._notempty.notify_all()
else:
self._set_shutdown()

def close(self):
self._closed = True
close = self._close
Expand Down Expand Up @@ -310,7 +366,11 @@ def __setstate__(self, state):
def put(self, obj, block=True, timeout=None):
if self._closed:
raise ValueError(f"Queue {self!r} is closed")
if not self._is_alive():
raise ShutDown
if not self._sem.acquire(block, timeout):
if not self._is_alive():
raise ShutDown
raise Full

with self._notempty, self._cond:
Expand All @@ -322,15 +382,28 @@ def put(self, obj, block=True, timeout=None):

def task_done(self):
with self._cond:
if self._is_shutdown_immediate():
raise ShutDown
if not self._unfinished_tasks.acquire(False):
raise ValueError('task_done() called too many times')
if self._unfinished_tasks._semlock._is_zero():
self._cond.notify_all()

def join(self):
with self._cond:
if self._is_shutdown_immediate():
raise ShutDown
if not self._unfinished_tasks._semlock._is_zero():
self._cond.wait()
if self._is_shutdown_immediate():
raise ShutDown

def shutdown(self, immediate=False):
with self._cond:
is_alive = self._is_alive()
super().shutdown(immediate)
if is_alive:
self._cond.notify_all()

#
# Simplified Queue type -- really just a locked pipe
Expand Down
Loading