diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 783cb025826483..bfc9a2e338d43b 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -872,7 +872,8 @@ For an example of the usage of queues for interprocess communication see free slot was available within that time. Otherwise (*block* is ``False``), put an item on the queue if a free slot is immediately available, else raise the :exc:`queue.Full` exception (*timeout* is - ignored in that case). + ignored in that case). Raises :exc:`queue.ShutDown` if the queue has + been shut down. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -890,7 +891,9 @@ For an example of the usage of queues for interprocess communication see it blocks at most *timeout* seconds and raises the :exc:`queue.Empty` exception if no item was available within that time. Otherwise (block is ``False``), return an item if one is immediately available, else raise the - :exc:`queue.Empty` exception (*timeout* is ignored in that case). + :exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises + the :exc:`queue.ShutDown` exception if the queue has been shut down and + is empty, or if the queue has been shut down immediately. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -900,6 +903,21 @@ For an example of the usage of queues for interprocess communication see Equivalent to ``get(False)``. + .. method:: shutdown(immediate=False) + + Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` + raise :exc:`queue.ShutDown`. + + By default, :meth:`~Queue.get` on a shut down queue will only raise once + the queue is empty. Set *immediate* to true to make :meth:`~Queue.get` + raise immediately instead. + + All blocked callers of :meth:`~Queue.put` will be unblocked. If + *immediate* is true, also unblock callers of :meth:`~Queue.get` and + :meth:`~JoinableQueue.join`. + + .. 
versionadded:: 3.14 + :class:`multiprocessing.Queue` has a few additional methods not found in :class:`queue.Queue`. These methods are usually unnecessary for most code: @@ -988,6 +1006,9 @@ For an example of the usage of queues for interprocess communication see items have been processed (meaning that a :meth:`task_done` call was received for every item that had been :meth:`~Queue.put` into the queue). + ``shutdown(immediate=True)`` calls :meth:`task_done` for each remaining + item in the queue. + Raises a :exc:`ValueError` if called more times than there were items placed in the queue. diff --git a/Doc/whatsnew/3.14.rst b/Doc/whatsnew/3.14.rst index ac0ae8cf0133e6..4f502e401383f7 100644 --- a/Doc/whatsnew/3.14.rst +++ b/Doc/whatsnew/3.14.rst @@ -613,6 +613,8 @@ multiprocessing ``d |= {'b': 2}`` for proxies of :class:`dict`. (Contributed by Roy Hyunjin Han for :gh:`103134`.) +* Add :meth:`multiprocessing.Queue.shutdown` for queue termination. + (Contributed by Laurie Opperman in :gh:`104230`.) operator diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index 925f043900004e..b605a05c55bda7 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -18,7 +18,7 @@ import weakref import errno -from queue import Empty, Full +from queue import Empty, Full, ShutDown from . import connection from . 
import context
@@ -48,6 +48,7 @@ def __init__(self, maxsize=0, *, ctx):
         # For use by concurrent.futures
         self._ignore_epipe = False
         self._reset()
+        self._is_shutdown = ctx.Value('B', False)
 
         if sys.platform != 'win32':
             register_after_fork(self, Queue._after_fork)
@@ -55,11 +56,13 @@ def __init__(self, maxsize=0, *, ctx):
     def __getstate__(self):
         context.assert_spawning(self)
         return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
-                self._rlock, self._wlock, self._sem, self._opid)
+                self._rlock, self._wlock, self._sem, self._opid,
+                self._is_shutdown)
 
     def __setstate__(self, state):
         (self._ignore_epipe, self._maxsize, self._reader, self._writer,
-         self._rlock, self._wlock, self._sem, self._opid) = state
+         self._rlock, self._wlock, self._sem, self._opid,
+         self._is_shutdown) = state
         self._reset()
 
     def _after_fork(self):
@@ -84,10 +87,16 @@ def _reset(self, after_fork=False):
     def put(self, obj, block=True, timeout=None):
         if self._closed:
             raise ValueError(f"Queue {self!r} is closed")
+        if self._is_shutdown.value:
+            raise ShutDown
         if not self._sem.acquire(block, timeout):
+            if self._is_shutdown.value:
+                raise ShutDown
             raise Full
 
         with self._notempty:
+            if self._is_shutdown.value:
+                raise ShutDown
             if self._thread is None:
                 self._start_thread()
             self._buffer.append(obj)
@@ -98,24 +107,34 @@ def get(self, block=True, timeout=None):
             raise ValueError(f"Queue {self!r} is closed")
         if block and timeout is None:
             with self._rlock:
+                if self._is_shutdown.value and self.empty():
+                    raise ShutDown
                 res = self._recv_bytes()
             self._sem.release()
         else:
             if block:
                 deadline = time.monotonic() + timeout
             if not self._rlock.acquire(block, timeout):
+                if self._is_shutdown.value and self.empty():
+                    raise ShutDown
                 raise Empty
             try:
                 if block:
                     timeout = deadline - time.monotonic()
                     if not self._poll(timeout):
+                        if self._is_shutdown.value:
+                            raise ShutDown
                         raise Empty
                 elif not self._poll():
+                    if self._is_shutdown.value:
+                        raise ShutDown
                     raise Empty
+
                 res = self._recv_bytes()
                 self._sem.release()
             finally:
                 self._rlock.release()
+
         # unserialize the data after having released the lock
         return _ForkingPickler.loads(res)
 
@@ -135,6 +154,33 @@ def get_nowait(self):
     def put_nowait(self, obj):
         return self.put(obj, False)
 
+    def _clear(self):
+        # Discard every item that has already reached the pipe.
+        with self._rlock:
+            while self._poll():
+                self._recv_bytes()
+
+    def shutdown(self, immediate=False):
+        """Shut down the queue, making get() and put() raise ShutDown.
+
+        By default get() only raises once the queue is empty.  If
+        *immediate* is true, the pending items are discarded first.
+        """
+        if self._closed:
+            raise ValueError(f"Queue {self!r} is closed")
+        with self._is_shutdown.get_lock():
+            self._is_shutdown.value = True
+            if immediate:
+                self._clear()
+            # TODO: unblock all getters to check empty (then shutdown)
+            # Free slots so that put() callers blocked on the semaphore
+            # wake up; stop once release() refuses with ValueError.
+            for _ in range(self._maxsize):
+                try:
+                    self._sem.release()
+                except ValueError:
+                    break
+
     def close(self):
         self._closed = True
         close = self._close
@@ -328,10 +374,16 @@ def __setstate__(self, state):
     def put(self, obj, block=True, timeout=None):
         if self._closed:
             raise ValueError(f"Queue {self!r} is closed")
+        if self._is_shutdown.value:
+            raise ShutDown
         if not self._sem.acquire(block, timeout):
+            if self._is_shutdown.value:
+                raise ShutDown
             raise Full
 
         with self._notempty, self._cond:
+            if self._is_shutdown.value:
+                raise ShutDown
             if self._thread is None:
                 self._start_thread()
             self._buffer.append(obj)
@@ -350,6 +402,18 @@ def join(self):
             if not self._unfinished_tasks._semlock._is_zero():
                 self._cond.wait()
 
+    def _clear(self):
+        # Discard pending items, counting each one as a finished task.
+        with self._rlock:
+            while self._poll():
+                self._recv_bytes()
+                self._unfinished_tasks.acquire(block=False)
+        with self._cond:
+            # join() waits only once, so wake it only when no task is
+            # left unfinished.
+            if self._unfinished_tasks._semlock._is_zero():
+                self._cond.notify_all()
+
 #
 # Simplified Queue type -- really just a locked pipe
 #
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 4b7c3e7fa8bdd7..610f6f687de445 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -301,6 +301,73 @@ def get_value(self):
 # Testcases
 #
 
+class ProcessFuture:
+    """Run a callable in a child process and collect its outcome."""
+
+    __slots__ = ("process", "connection")
+
+    def __init__(self, process, connection):
+        self.process = process
+        self.connection = connection
+
+    def __del__(self):
+        self.process.kill()
+        self.connection.close()
+
+    @staticmethod
+    def _target(conn, fn, args, kwargs=None):
+        try:
+            result = fn(*args, **(kwargs or {}))
+        except Exception as e:
+            try:
+                conn.send((False, e))
+            except pickle.PicklingError:
+                # Unpicklable exception: send a stand-in of the same class.
+                exc = Exception(str(e))
+                exc.__class__ = e.__class__
+                conn.send((False, exc))
+        else:
+            conn.send((True, result))
+        conn.close()
+
+    def _run(self, fn, *args):
+        def get_result(timeout=None):
+            process.join(timeout)
+            success, result = recv.recv()
+            if success:
+                return result
+            else:
+                raise result
+
+        recv, send = multiprocessing.Pipe()
+        process = multiprocessing.Process(target=self._target, args=(send, fn, args))
+        process.start()
+        return process, get_result
+
+    @classmethod
+    def start(cls, target, args=(), kwargs=None):
+        recv, send = multiprocessing.Pipe()
+        process = multiprocessing.Process(
+            target=cls._target, args=(send, target, args, kwargs)
+        )
+        process.start()
+        return cls(process, recv)
+
+    @classmethod
+    def run(cls, target, args=(), kwargs=None):
+        future = cls.start(target, args, kwargs)
+        return future.result()
+
+    def result(self):
+        self.process.join()
+        if not self.connection.poll():
+            raise RuntimeError("Process failed to send result")
+        success, result = self.connection.recv()
+        if success:
+            return result
+        else:
+            raise result
+
 class DummyCallable:
     def __call__(self, q, c):
         assert isinstance(c, DummyCallable)
@@ -1413,6 +1480,139 @@ def test_closed_queue_put_get_exceptions(self):
                 q.put('foo')
             with self.assertRaisesRegex(ValueError, 'is closed'):
                 q.get()
+
+    @classmethod
+    def _join_worker(cls, q):
+        q.join()
+
+    @classmethod
+    def _task_done_worker(cls, q):
+        q.task_done()
+
+    @classmethod
+    def _get_worker(cls, q):
+        return q.get()
+
+    def assertRaisesShutdown(self, msg="Didn't appear to shut-down queue"):
+        return self.assertRaises(pyqueue.ShutDown, msg=msg)
+
+    def test_shutdown_empty(self):
+        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                join = ProcessFuture.start(self._join_worker, (q,))
+            q.shutdown(immediate=False)  # [joinable] unfinished tasks: 0 -> 0
+            _wait()
+
+            self.assertEqual(q.qsize(), 0)
+
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                self.assertFalse(join.process.is_alive())
+                join.result()
+
+            with self.assertRaisesShutdown():
+                ProcessFuture.run(q.put, ("data",))
+            with self.assertRaisesShutdown():
+                q.put_nowait("data")
+
+            with self.assertRaisesShutdown():
+                ProcessFuture.run(q.get)
+            with self.assertRaisesShutdown():
+                q.get_nowait()
+
+    def test_shutdown_nonempty(self):
+        for q in multiprocessing.Queue(1), multiprocessing.JoinableQueue(1):
+            q.put("data")
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                join = ProcessFuture.start(self._join_worker, (q,))
+            q.shutdown(immediate=False)  # [joinable] unfinished tasks: 1 -> 1
+            _wait()
+
+            self.assertEqual(q.qsize(), 1)
+
+            self.assertEqual(ProcessFuture.run(q.get), "data")
+
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                self.assertTrue(join.process.is_alive())
+
+            with self.assertRaisesShutdown():
+                ProcessFuture.run(q.put, ("data",))
+            with self.assertRaisesShutdown():
+                q.put_nowait("data")
+
+            with self.assertRaisesShutdown():
+                ProcessFuture.run(q.get)
+            with self.assertRaisesShutdown():
+                q.get_nowait()
+
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                q.task_done()
+
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                self.assertFalse(join.process.is_alive())
+                join.result()
+
+    def test_shutdown_immediate(self):
+        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
+            q.put("data")
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                join = ProcessFuture.start(self._join_worker, (q,))
+            q.shutdown(immediate=True)
+            _wait()
+
+            self.assertEqual(q.qsize(), 0)
+
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                self.assertFalse(join.process.is_alive())
+                join.result()
+
+            with self.assertRaisesShutdown():
+                ProcessFuture.run(q.put, ("data",))
+            with self.assertRaisesShutdown():
+                q.put_nowait("data")
+
+            with self.assertRaisesShutdown():
+                ProcessFuture.run(q.get)
+            with self.assertRaisesShutdown():
+                q.get_nowait()
+
+            if isinstance(q, multiprocessing.queues.JoinableQueue):
+                with self.assertRaises(
+                    ValueError, msg="Didn't appear to mark all tasks done"
+                ):
+                    q.task_done()
+
+    def test_shutdown_immediate_with_unfinished(self):
+        q = multiprocessing.JoinableQueue()
+        q.put("data")
+        q.put("data")
+        join = ProcessFuture.start(self._join_worker, (q,))
+        self.assertEqual(ProcessFuture.run(q.get), "data")
+        q.shutdown(immediate=True)
+        _wait()
+
+        self.assertEqual(q.qsize(), 0)
+
+        self.assertTrue(join.process.is_alive())
+
+        with self.assertRaisesShutdown():
+            ProcessFuture.run(q.put, ("data",))
+        with self.assertRaisesShutdown():
+            q.put_nowait("data")
+
+        with self.assertRaisesShutdown():
+            ProcessFuture.run(q.get)
+        with self.assertRaisesShutdown():
+            q.get_nowait()
+
+        q.task_done()
+        with self.assertRaises(
+            ValueError, msg="Didn't appear to mark all tasks done"
+        ):
+            q.task_done()
+
+        self.assertFalse(join.process.is_alive())
+        join.result()
+
 #
 #
 #
diff --git a/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst b/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst
new file mode 100644
index 00000000000000..3ff73e83ea0068
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2023-05-06-05-17-04.gh-issue-96471.IsQeKV.rst
@@ -0,0 +1,2 @@
+Add :class:`multiprocessing.Queue` and :class:`multiprocessing.JoinableQueue`
+termination with :py:meth:`~multiprocessing.Queue.shutdown` method.