diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index d86fbc21351e2d..b66bc9341a2081 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -62,6 +62,9 @@ Queue Remove and return an item from the queue. If queue is empty, wait until an item is available. + Raises :exc:`QueueShutDown` if the queue has been shut down and + is empty, or if the queue has been shut down immediately. + .. method:: get_nowait() Return an item if one is immediately available, else raise @@ -77,11 +80,16 @@ Queue work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. + Raises :exc:`QueueShutDown` if the queue has been shut down + immediately. + .. coroutinemethod:: put(item) Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item. + Raises :exc:`QueueShutDown` if the queue has been shut down. + .. method:: put_nowait(item) Put an item into the queue without blocking. @@ -92,6 +100,19 @@ Queue Return the number of items in the queue. + .. method:: shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise + :exc:`QueueShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.13 + .. method:: task_done() Indicate that a formerly enqueued task is complete. @@ -108,6 +129,9 @@ Queue Raises :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`QueueShutDown` if the queue has been shut down + immediately. + Priority Queue ============== @@ -145,6 +169,14 @@ Exceptions on a queue that has reached its *maxsize*. +.. exception:: QueueShutDown + + Exception raised when getting an item from or putting an item onto a + queue which has been shut down. + + .. versionadded:: 3.13 + + Examples ======== diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 8454296b815b41..7f1835e98ef91e 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -845,7 +845,8 @@ For an example of the usage of queues for interprocess communication see free slot was available within that time. Otherwise (*block* is ``False``), put an item on the queue if a free slot is immediately available, else raise the :exc:`queue.Full` exception (*timeout* is - ignored in that case). + ignored in that case). Raises :exc:`ShutDown` if the queue has been shut + down. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -863,7 +864,9 @@ For an example of the usage of queues for interprocess communication see it blocks at most *timeout* seconds and raises the :exc:`queue.Empty` exception if no item was available within that time. Otherwise (block is ``False``), return an item if one is immediately available, else raise the - :exc:`queue.Empty` exception (*timeout* is ignored in that case). + :exc:`queue.Empty` exception (*timeout* is ignored in that case). Raises + :exc:`queue.ShutDown` if the queue has been shut down and is empty, or if + the queue has been shut down immediately. .. versionchanged:: 3.8 If the queue is closed, :exc:`ValueError` is raised instead of @@ -873,6 +876,19 @@ For an example of the usage of queues for interprocess communication see Equivalent to ``get(False)``. + .. method:: shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise + :exc:`queue.ShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.13 + :class:`multiprocessing.Queue` has a few additional methods not found in :class:`queue.Queue`. These methods are usually unnecessary for most code: @@ -962,6 +978,8 @@ For an example of the usage of queues for interprocess communication see Raises a :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`queue.ShutDown` if the queue has been shut down immediately. + .. method:: join() @@ -973,6 +991,8 @@ For an example of the usage of queues for interprocess communication see it is complete. When the count of unfinished tasks drops to zero, :meth:`~queue.Queue.join` unblocks. + Raises :exc:`queue.ShutDown` if the queue has been shut down immediately. + Miscellaneous ~~~~~~~~~~~~~ diff --git a/Doc/library/queue.rst b/Doc/library/queue.rst index b2b787c5a8260c..33d6f2f4c85e1c 100644 --- a/Doc/library/queue.rst +++ b/Doc/library/queue.rst @@ -93,6 +93,14 @@ The :mod:`queue` module defines the following classes and exceptions: on a :class:`Queue` object which is full. +.. exception:: ShutDown + + Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is called on + a :class:`Queue` object which has been shut down. + + .. versionadded:: 3.13 + + .. _queueobjects: Queue Objects @@ -135,6 +143,8 @@ provide the public methods described below. immediately available, else raise the :exc:`Full` exception (*timeout* is ignored in that case). + Raises :exc:`ShutDown` if the queue has been shut down. + .. method:: Queue.put_nowait(item) @@ -155,6 +165,9 @@ provide the public methods described below. an uninterruptible wait on an underlying lock. This means that no exceptions can occur, and in particular a SIGINT will not trigger a :exc:`KeyboardInterrupt`. + Raises :exc:`ShutDown` if the queue has been shut down and is empty, or if + the queue has been shut down immediately. + .. method:: Queue.get_nowait() @@ -177,6 +190,8 @@ fully processed by daemon consumer threads. Raises a :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`ShutDown` if the queue has been shut down immediately. + .. method:: Queue.join() @@ -187,6 +202,8 @@ fully processed by daemon consumer threads. indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. + Raises :exc:`ShutDown` if the queue has been shut down immediately. + Example of how to wait for enqueued tasks to be completed:: @@ -214,6 +231,25 @@ Example of how to wait for enqueued tasks to be completed:: print('All work completed') +Terminating queues +^^^^^^^^^^^^^^^^^^ + +:class:`Queue` objects can be made to prevent further interaction by shutting +them down. + +.. method:: Queue.shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise :exc:`ShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.13 + + SimpleQueue Objects ------------------- diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a9656a6df561ba..ae2d55478342e6 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -1,6 +1,14 @@ -__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') +__all__ = ( + 'Queue', + 'PriorityQueue', + 'LifoQueue', + 'QueueFull', + 'QueueEmpty', + 'QueueShutDown', +) import collections +import enum import heapq from types import GenericAlias @@ -18,6 +26,17 @@ class QueueFull(Exception): pass +class QueueShutDown(Exception): + """Raised when putting on to or getting from a shut-down Queue.""" + pass + + +class _QueueState(enum.Enum): + ALIVE = "alive" + SHUTDOWN = "shutdown" + SHUTDOWN_IMMEDIATE = "shutdown-immediate" + + class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -41,6 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) + self._shutdown_state = _QueueState.ALIVE # These three are overridable in subclasses. @@ -81,6 +101,8 @@ def _format(self): result += f' _putters[{len(self._putters)}]' if self._unfinished_tasks: result += f' tasks={self._unfinished_tasks}' + if not self._is_alive(): + result += f' state={self._shutdown_state.value}' return result def qsize(self): @@ -112,7 +134,11 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. + + Raises QueueShutDown if the queue has been shut down. """ + if not self._is_alive(): + raise QueueShutDown while self.full(): putter = self._get_loop().create_future() self._putters.append(putter) @@ -125,20 +151,26 @@ async def put(self, item): self._putters.remove(putter) except ValueError: # The putter could be removed from self._putters by a - # previous get_nowait call. + # previous get_nowait call or a shutdown call. pass if not self.full() and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._putters) raise + if not self._is_alive(): + raise QueueShutDown return self.put_nowait(item) def put_nowait(self, item): """Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull. + + Raises QueueShutDown if the queue has been shut down. """ + if not self._is_alive(): + raise QueueShutDown if self.full(): raise QueueFull self._put(item) @@ -150,8 +182,15 @@ async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. + + Raises QueueShutDown if the queue has been shut down and is empty, or + if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown while self.empty(): + if self._is_shutdown(): + raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) try: @@ -163,21 +202,31 @@ async def get(self): self._getters.remove(getter) except ValueError: # The getter could be removed from self._getters by a - # previous put_nowait call. + # previous put_nowait call, + # or a shutdown call. pass if not self.empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._getters) raise + if self._is_shutdown_immediate(): + raise QueueShutDown return self.get_nowait() def get_nowait(self): """Remove and return an item from the queue. Return an item if one is immediately available, else raise QueueEmpty. + + Raises QueueShutDown if the queue has been shut down and is empty, or + if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown if self.empty(): + if self._is_shutdown(): + raise QueueShutDown raise QueueEmpty item = self._get() self._wakeup_next(self._putters) @@ -196,7 +245,11 @@ def task_done(self): Raises ValueError if called more times than there were items placed in the queue. + + Raises QueueShutDown if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown if self._unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self._unfinished_tasks -= 1 @@ -210,9 +263,57 @@ async def join(self): queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. + + Raises QueueShutDown if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown if self._unfinished_tasks > 0: await self._finished.wait() + if self._is_shutdown_immediate(): + raise QueueShutDown + + def shutdown(self, immediate=False): + """Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The QueueShutDown exception is raised. + """ + if self._is_shutdown_immediate(): + return + # here _shutdown_state is ALIVE or SHUTDOWN + if immediate: + self._set_shutdown_immediate() + while self._getters: + getter = self._getters.popleft() + if not getter.done(): + getter.set_result(None) + # Release all 'blocked' tasks/coros in `join()` + self._finished.set() + else: + self._set_shutdown() + while self._putters: + putter = self._putters.popleft() + if not putter.done(): + putter.set_result(None) + + def _is_alive(self): + return self._shutdown_state is _QueueState.ALIVE + + def _is_shutdown(self): + return self._shutdown_state is _QueueState.SHUTDOWN + + def _is_shutdown_immediate(self): + return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE + + def _set_shutdown(self): + self._shutdown_state = _QueueState.SHUTDOWN + + def _set_shutdown_immediate(self): + self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE class PriorityQueue(Queue): diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py index daf9ee94a19431..c9d5d4b567b4cd 100644 --- a/Lib/multiprocessing/queues.py +++ b/Lib/multiprocessing/queues.py @@ -18,7 +18,7 @@ import weakref import errno -from queue import Empty, Full +from queue import Empty, Full, ShutDown import _multiprocessing @@ -28,6 +28,10 @@ from .util import debug, info, Finalize, register_after_fork, is_exiting +_queue_alive = 0 +_queue_shutdown = 1 +_queue_shutdown_immediate = 2 + # # Queue type using a pipe, buffer and thread # @@ -50,6 +54,7 @@ def __init__(self, maxsize=0, *, ctx): # For use by concurrent.futures self._ignore_epipe = False self._reset() + self._shutdown_state = ctx.Value('i', _queue_alive) if sys.platform != 'win32': register_after_fork(self, Queue._after_fork) @@ -57,11 +62,13 @@ def __init__(self, maxsize=0, *, ctx): def __getstate__(self): context.assert_spawning(self) return (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) def __setstate__(self, state): (self._ignore_epipe, self._maxsize, self._reader, self._writer, - self._rlock, self._wlock, self._sem, self._opid) = state + self._rlock, self._wlock, self._sem, self._opid, + self._shutdown_state) = state self._reset() def _after_fork(self): @@ -83,10 +90,29 @@ def _reset(self, after_fork=False): self._recv_bytes = self._reader.recv_bytes self._poll = self._reader.poll + def _is_alive(self): + return self._shutdown_state.value == _queue_alive + + def _is_shutdown(self): + return self._shutdown_state.value == _queue_shutdown + + def _is_shutdown_immediate(self): + return self._shutdown_state.value == _queue_shutdown_immediate + + def _set_shutdown(self): + self._shutdown_state.value = _queue_shutdown + + def _set_shutdown_immediate(self): + self._shutdown_state.value = _queue_shutdown_immediate + def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") + if not self._is_alive(): + raise ShutDown if not self._sem.acquire(block, timeout): + if not self._is_alive(): + raise ShutDown raise Full with self._notempty: @@ -100,24 +126,41 @@ def get(self, block=True, timeout=None): raise ValueError(f"Queue {self!r} is closed") if block and timeout is None: with self._rlock: + # checks shutdown state + if (self._is_shutdown_immediate() + or (self._is_shutdown() and self.empty())): + raise ShutDown res = self._recv_bytes() self._sem.release() else: if block: deadline = time.monotonic() + timeout if not self._rlock.acquire(block, timeout): + if (self._is_shutdown_immediate() + or (self._is_shutdown() and self.empty())): + raise ShutDown raise Empty try: if block: timeout = deadline - time.monotonic() if not self._poll(timeout): + if not self._is_alive(): + raise ShutDown raise Empty elif not self._poll(): + if not self._is_alive(): + raise ShutDown raise Empty + + # here queue is not empty + if self._is_shutdown_immediate(): + raise ShutDown + # here shutdown state queue is alive or shutdown res = self._recv_bytes() self._sem.release() finally: self._rlock.release() + # unserialize the data after having released the lock return _ForkingPickler.loads(res) @@ -137,6 +180,19 @@ def get_nowait(self): def put_nowait(self, obj): return self.put(obj, False) + def shutdown(self, immediate=False): + if self._closed: + raise ValueError(f"Queue {self!r} is closed") + with self._shutdown_state.get_lock(): + if self._is_shutdown_immediate(): + return + if immediate: + self._set_shutdown_immediate() + with self._notempty: + self._notempty.notify_all() + else: + self._set_shutdown() + def close(self): self._closed = True close = self._close @@ -310,7 +366,11 @@ def __setstate__(self, state): def put(self, obj, block=True, timeout=None): if self._closed: raise ValueError(f"Queue {self!r} is closed") + if not self._is_alive(): + raise ShutDown if not self._sem.acquire(block, timeout): + if not self._is_alive(): + raise ShutDown raise Full with self._notempty, self._cond: @@ -322,6 +382,8 @@ def put(self, obj, block=True, timeout=None): def task_done(self): with self._cond: + if self._is_shutdown_immediate(): + raise ShutDown if not self._unfinished_tasks.acquire(False): raise ValueError('task_done() called too many times') if self._unfinished_tasks._semlock._is_zero(): @@ -329,8 +391,19 @@ def task_done(self): def join(self): with self._cond: + if self._is_shutdown_immediate(): + raise ShutDown if not self._unfinished_tasks._semlock._is_zero(): self._cond.wait() + if self._is_shutdown_immediate(): + raise ShutDown + + def shutdown(self, immediate=False): + with self._cond: + is_alive = self._is_alive() + super().shutdown(immediate) + if is_alive: + self._cond.notify_all() # # Simplified Queue type -- really just a locked pipe diff --git a/Lib/queue.py b/Lib/queue.py index 55f50088460f9e..f8a7ba072247f0 100644 --- a/Lib/queue.py +++ b/Lib/queue.py @@ -25,6 +25,14 @@ class Full(Exception): pass +class ShutDown(Exception): + '''Raised when put/get with shut-down queue.''' + + +_queue_alive = "alive" +_queue_shutdown = "shutdown" +_queue_shutdown_immediate = "shutdown-immediate" + class Queue: '''Create a queue object with a given maximum size. @@ -54,6 +62,9 @@ def __init__(self, maxsize=0): self.all_tasks_done = threading.Condition(self.mutex) self.unfinished_tasks = 0 + # Queue shutdown state + self.shutdown_state = _queue_alive + def task_done(self): '''Indicate that a formerly enqueued task is complete. @@ -67,8 +78,12 @@ def task_done(self): Raises a ValueError if called more times than there were items placed in the queue. + + Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: + if self._is_shutdown_immediate(): + raise ShutDown unfinished = self.unfinished_tasks - 1 if unfinished <= 0: if unfinished < 0: @@ -84,10 +99,16 @@ def join(self): to indicate the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. + + Raises ShutDown if the queue has been shut down immediately. ''' with self.all_tasks_done: + if self._is_shutdown_immediate(): + raise ShutDown while self.unfinished_tasks: self.all_tasks_done.wait() + if self._is_shutdown_immediate(): + raise ShutDown def qsize(self): '''Return the approximate size of the queue (not reliable!).''' @@ -129,8 +150,12 @@ def put(self, item, block=True, timeout=None): Otherwise ('block' is false), put an item on the queue if a free slot is immediately available, else raise the Full exception ('timeout' is ignored in that case). + + Raises ShutDown if the queue has been shut down. ''' with self.not_full: + if not self._is_alive(): + raise ShutDown if self.maxsize > 0: if not block: if self._qsize() >= self.maxsize: @@ -138,6 +163,8 @@ def put(self, item, block=True, timeout=None): elif timeout is None: while self._qsize() >= self.maxsize: self.not_full.wait() + if not self._is_alive(): + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -147,6 +174,8 @@ def put(self, item, block=True, timeout=None): if remaining <= 0.0: raise Full self.not_full.wait(remaining) + if not self._is_alive(): + raise ShutDown self._put(item) self.unfinished_tasks += 1 self.not_empty.notify() @@ -161,14 +190,22 @@ def get(self, block=True, timeout=None): Otherwise ('block' is false), return an item if one is immediately available, else raise the Empty exception ('timeout' is ignored in that case). + + Raises ShutDown if the queue has been shut down and is empty, + or if the queue has been shut down immediately. ''' with self.not_empty: + if self._is_shutdown_immediate() or\ + (self._is_shutdown() and not self._qsize()): + raise ShutDown if not block: if not self._qsize(): raise Empty elif timeout is None: while not self._qsize(): self.not_empty.wait() + if self._is_shutdown_immediate(): + raise ShutDown elif timeout < 0: raise ValueError("'timeout' must be a non-negative number") else: @@ -178,6 +215,8 @@ def get(self, block=True, timeout=None): if remaining <= 0.0: raise Empty self.not_empty.wait(remaining) + if self._is_shutdown_immediate(): + raise ShutDown item = self._get() self.not_full.notify() return item @@ -198,6 +237,43 @@ def get_nowait(self): ''' return self.get(block=False) + def shutdown(self, immediate=False): + '''Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The ShutDown exception is raised. + ''' + with self.mutex: + if self._is_shutdown_immediate(): + return + if immediate: + self._set_shutdown_immediate() + self.not_empty.notify_all() + # release all blocked threads in `join()` + self.all_tasks_done.notify_all() + else: + self._set_shutdown() + self.not_full.notify_all() + + def _is_alive(self): + return self.shutdown_state == _queue_alive + + def _is_shutdown(self): + return self.shutdown_state == _queue_shutdown + + def _is_shutdown_immediate(self): + return self.shutdown_state == _queue_shutdown_immediate + + def _set_shutdown(self): + self.shutdown_state = _queue_shutdown + + def _set_shutdown_immediate(self): + self.shutdown_state = _queue_shutdown_immediate + + # Override these methods to implement other queue organizations # (e.g. stack or priority queue). # These will only be called with appropriate locks held diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 9a2db24b4bd597..ae76c697c71ac1 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -1277,6 +1277,262 @@ def test_closed_queue_put_get_exceptions(self): q.put('foo') with self.assertRaisesRegex(ValueError, 'is closed'): q.get() + + def test_shutdown_empty(self): + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + q.shutdown() + _wait() + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.put("data") + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.get() + + def test_shutdown_nonempty(self): + for q in multiprocessing.Queue(1), multiprocessing.JoinableQueue(1): + q.put("data") + q.shutdown() + _wait() + q.get() + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.get() + + def test_shutdown_immediate(self): + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + q.put("data") + q.shutdown(immediate=True) + _wait() + with self.assertRaises( + pyqueue.ShutDown, msg="Didn't appear to shut-down queue" + ): + q.get() + + def test_shutdown_allowed_transitions(self): + # allowed transitions would be from `alive`` via `shutdown` to `shutdown_immediate`` + mod_q = multiprocessing.queues + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + self.assertEqual(mod_q._queue_alive, q._shutdown_state.value) + + # default -> immediate=False + q.shutdown() + self.assertEqual(mod_q._queue_shutdown, q._shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual(mod_q._queue_shutdown_immediate, q._shutdown_state.value) + + q.shutdown(immediate=False) + self.assertNotEqual(mod_q._queue_shutdown, q._shutdown_state.value) + + def _shutdown_all_methods_in_one_process(self, immediate): + # part 1: Queue + q = multiprocessing.Queue(2) + q.put("L") + _wait() # Give time to simulate many processes + q.put_nowait("O") + q.shutdown(immediate) + _wait() # simulate time of synchro primitive + + with self.assertRaises(pyqueue.ShutDown): + q.put("E") + with self.assertRaises(pyqueue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(pyqueue.ShutDown): + q.get() + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() + else: + # Neither `task_done`, neither `join`methods` to test + self.assertEqual(q.get(), "L") + self.assertEqual(q.get_nowait(), "O") + _wait() + + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(pyqueue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() # q.get(False) + with self.assertRaises(pyqueue.ShutDown): + q.get(True, 1.0) + + # part 2: JoinableQueue + q = multiprocessing.JoinableQueue(2) + q.put("L") + _wait() + q.put_nowait("O") + q.shutdown(immediate) + _wait() + + with self.assertRaises(pyqueue.ShutDown): + q.put("E") + with self.assertRaises(pyqueue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(pyqueue.ShutDown): + q.get() + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() + with self.assertRaises(pyqueue.ShutDown): + q.task_done() + with self.assertRaises(pyqueue.ShutDown): + q.join() + else: + self.assertEqual(q.get(), "L") + q.task_done() + _wait() + self.assertEqual(q.get(), "O") + q.task_done() + _wait() + q.join() + # when `shutdown` queue is empty, should raise ShutDown Exception + with self.assertRaises(pyqueue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(pyqueue.ShutDown): + q.get_nowait() # p.get(False) + with self.assertRaises(pyqueue.ShutDown): + q.get(True, 1.0) + + def test_shutdown_all_methods_in_one_process(self): + return self._shutdown_all_methods_in_one_process(False) + + def test_shutdown_immediate_all_methods_in_one_process(self): + return self._shutdown_all_methods_in_one_process(True) + + @classmethod + def _write_msg_process(cls, q, n, results, delay, + i_when_exec_shutdown, + event_start, event_end): + event_start.wait() + for i in range(1, n+1): + try: + q.put((i, "YDLO")) + results.append(True) + except pyqueue.ShutDown: + results.append(False) + # triggers shutdown of queue + if i == i_when_exec_shutdown: + event_end.set() + time.sleep(delay) + # end of all puts + if isinstance(q, type(multiprocessing.JoinableQueue())): + try: + q.join() + except pyqueue.ShutDown: + pass + + @classmethod + def _read_msg_process(cls, q, nb, results, delay, event_start): + event_start.wait() + block = True + while nb: + time.sleep(delay) + try: + # Get at least one message + q.get(block) + block = False + if isinstance(q, type(multiprocessing.JoinableQueue())): + q.task_done() + results.append(True) + nb -= 1 + except pyqueue.ShutDown: + results.append(False) + nb -= 1 + except pyqueue.Empty: + pass + # end of all gets + if isinstance(q, type(multiprocessing.JoinableQueue())): + try: + q.join() + except pyqueue.ShutDown: + pass + + @classmethod + def _shutdown_process(cls, q, event_end, immediate): + event_end.wait() + q.shutdown(immediate) + if isinstance(q, type(multiprocessing.JoinableQueue())): + try: + q.join() + except pyqueue.ShutDown: + pass + + @classmethod + def _join_process(cls, q, delay, event_start): + event_start.wait() + time.sleep(delay) + try: + q.join() + except pyqueue.ShutDown: + pass + + #@classmethod + def _shutdown_all_methods_in_many_processes(self, immediate): + for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): + ps = [] + ev_start = multiprocessing.Event() + ev_exec_shutdown = multiprocessing.Event() + m = multiprocessing.Manager() + res_puts = m.list() + res_gets = m.list() + delay = 1e-4 + read_process = 4 + nb_msgs = read_process * 16 + nb_msgs_r = nb_msgs // read_process + when_exec_shutdown = nb_msgs // 2 + if isinstance(q, type(multiprocessing.Queue())): + lprocs = ( + (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_process, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)), + ) + else: + # add 2 self._join process processes + lprocs = ( + (self._write_msg_process, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_process, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._join_process, 2, (q, delay*2, ev_start)), + (self._shutdown_process, 1, (q, ev_exec_shutdown, immediate)), + ) + # start all processes + for func, n, args in lprocs: + for i in range(n): + ps.append(multiprocessing.Process(target=func, args=args)) + ps[-1].start() + # set event in order to run q.shutdown() + ev_start.set() + _wait() + # wait + if isinstance(q, type(multiprocessing.Queue())): + for p in ps: + p.join() + + if not immediate: + self.assertTrue(q.empty()) + self.assertEqual(res_gets.count(True), res_puts.count(True)) + else: + self.assertTrue(res_gets.count(True) <= res_puts.count(True)) + + def test_shutdown_all_methods_in_many_processes(self): + return self._shutdown_all_methods_in_many_processes(False) + + def test_shutdown_immediate_all_methods_in_many_processes(self): + return self._shutdown_all_methods_in_many_processes(True) + + # # # diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 2d058ccf6a8c72..bf4a5a78a8f0cc 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -522,5 +522,382 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa q_class = asyncio.PriorityQueue +class _QueueShutdownTestMixin: + q_class = None + + async def asyncSetUp(self): + await super().asyncSetUp() + self.delay = 0.001 + + async def _get(self, q, go, results, shutdown=False): + await go.wait() + try: + msg = await q.get() + results.append(not shutdown) + return msg + except asyncio.QueueShutDown: + results.append(shutdown) + return shutdown + + async def _get_nowait(self, q, go, results, shutdown=False): + await go.wait() + try: + msg = q.get_nowait() + results.append(not shutdown) + return msg + except asyncio.QueueShutDown: + results.append(shutdown) + return shutdown + + async def _get_task_done(self, q, go, results): + await go.wait() + try: + msg = await q.get() + q.task_done() + results.append(True) + return msg + except asyncio.QueueShutDown: + results.append(False) + return False + + async def _put(self, q, go, msg, results, shutdown=False): + await go.wait() + try: + await q.put(msg) + results.append(not shutdown) + return not shutdown + except asyncio.QueueShutDown: + results.append(shutdown) + return shutdown + + async def _put_nowait(self, q, go, msg, results, shutdown=False): + await go.wait() + try: + q.put_nowait(msg) + results.append(False) + return not shutdown + except asyncio.QueueShutDown: + results.append(True) + return shutdown + + async def _shutdown(self, q, go, immediate): + q.shutdown(immediate) + await asyncio.sleep(self.delay) + go.set() + await asyncio.sleep(self.delay) + + async def _join(self, q, results, shutdown=False): + try: + await q.join() + results.append(not shutdown) + return True + except asyncio.QueueShutDown: + results.append(shutdown) + return False + except asyncio.CancelledError: + results.append(shutdown) + raise + + async def test_shutdown_empty(self): + q = self.q_class() + q.shutdown() + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): + await q.put("data") + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): + await q.get() + + async def test_shutdown_nonempty(self): + q = self.q_class() + q.put_nowait("data") + q.shutdown() + await q.get() + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): + await q.get() + + async def test_shutdown_immediate(self): + q = self.q_class() + q.put_nowait("data") + q.shutdown(immediate=True) + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): + await q.get() + + async def test_shutdown_repr(self): + q = self.q_class(4) + # when alive, not in repr + self.assertNotIn("alive", repr(q)) + + q = self.q_class(6) + q.shutdown(immediate=False) + self.assertIn("shutdown", repr(q)) + + q = self.q_class(8) + q.shutdown(immediate=True) + self.assertIn("shutdown-immediate", repr(q)) + + async def test_shutdown_allowed_transitions(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.q_class() + self.assertEqual("alive", q._shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown", q._shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q._shutdown_state.value) + + q.shutdown(immediate=False) + self.assertNotEqual("shutdown", q._shutdown_state.value) + + async def _shutdown_all_methods_in_one_task(self, immediate): + q = asyncio.Queue() + await q.put("L") + q.put_nowait("O") + q.shutdown(immediate) + with self.assertRaises(asyncio.QueueShutDown): + await q.put("E") + with self.assertRaises(asyncio.QueueShutDown): + q.put_nowait("W") + + if immediate: + with self.assertRaises(asyncio.QueueShutDown): + await q.get() + with self.assertRaises(asyncio.QueueShutDown): + q.get_nowait() + with self.assertRaises(asyncio.QueueShutDown): + q.task_done() + with self.assertRaises(asyncio.QueueShutDown): + await q.join() + else: + self.assertIn(await q.get(), "LO") + q.task_done() + self.assertIn(q.get_nowait(), "LO") + q.task_done() + await q.join() + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(asyncio.QueueShutDown): + await q.get() + with self.assertRaises(asyncio.QueueShutDown): + q.get_nowait() + + async def test_shutdown_all_methods_in_one_task(self): + return await self._shutdown_all_methods_in_one_task(False) + + async def test_shutdown_immediate_all_methods_in_one_task(self): + return await self._shutdown_all_methods_in_one_task(True) + + async def _shutdown_putters(self, immediate): + delay = self.delay + q = self.q_class(2) + results = [] + await q.put("E") + await q.put("W") + # queue full + t = asyncio.create_task(q.put("Y")) + await asyncio.sleep(delay) + self.assertTrue(len(q._putters) == 1) + with self.assertRaises(asyncio.QueueShutDown): + # here `t` raises a QueueShuDown + q.shutdown(immediate) + await t + self.assertTrue(not q._putters) + + async def test_shutdown_putters_deque(self): + return await self._shutdown_putters(False) + + async def test_shutdown_immediate_putters_deque(self): + return await self._shutdown_putters(True) + + async def _shutdown_getters(self, immediate): + delay = self.delay + q = self.q_class(1) + results = [] + await q.put("Y") + nb = q.qsize() + # queue full + + asyncio.create_task(q.get()) + await asyncio.sleep(delay) + t = asyncio.create_task(q.get()) + await asyncio.sleep(delay) + self.assertTrue(len(q._getters) == 1) + if immediate: + # here `t` raises a QueueShuDown + with self.assertRaises(asyncio.QueueShutDown): + q.shutdown(immediate) + await t + self.assertTrue(not q._getters) + else: + # here `t` is always pending + q.shutdown(immediate) + await asyncio.sleep(delay) + self.assertTrue(q._getters) + self.assertEqual(q._unfinished_tasks, nb) + + + async def test_shutdown_getters_deque(self): + return await self._shutdown_getters(False) + + async def test_shutdown_immediate_getters_deque(self): + return await self._shutdown_getters(True) + + async def _shutdown_get(self, immediate): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + nb = q.qsize() + # queue full + + if immediate: + coros = ( + (self._get(q, go, results, shutdown=True)), + (self._get_nowait(q, go, results, shutdown=True)), + ) + else: + coros = ( + # one of these tasks shoud raise Shutdown + (self._get(q, go, results)), + (self._get_nowait(q, go, results)), + (self._get_nowait(q, go, results)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + res = await asyncio.gather(*t) + if immediate: + self.assertEqual(results, [True]*len(coros)) + else: + self.assertListEqual(sorted(results), [False] + [True]*(len(coros)-1)) + + async def test_shutdown_get(self): + return await self._shutdown_get(False) + + async def test_shutdown_immediate_get(self): + return await self._shutdown_get(True) + + async def test_shutdown_get_task_done_join(self): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + self.assertEqual(q._unfinished_tasks, q.qsize()) + + # queue full + + coros = ( + (self._get_task_done(q, go, results)), + (self._get_task_done(q, go, results)), + (self._join(q, results)), + (self._join(q, results)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, False))) + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + self.assertIn(t[0].result(), "YD") + self.assertIn(t[1].result(), "YD") + self.assertNotEqual(t[0].result(), t[1].result()) + self.assertEqual(q._unfinished_tasks, 0) + + async def _shutdown_put(self, immediate): + q = self.q_class() + results = [] + go = asyncio.Event() + # queue not empty + + coros = ( + (self._put(q, go, "Y", results, shutdown=True)), + (self._put_nowait(q, go, "D", results, shutdown=True)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + + async def test_shutdown_put(self): + return await self._shutdown_put(False) + + async def test_shutdown_immediate_put(self): + return await self._shutdown_put(True) + + async def _shutdown_put_join(self, immediate): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + nb = q.qsize() + # queue fulled + + async def _cancel_join_task(q, delay, t): + await asyncio.sleep(delay) + t.cancel() + await asyncio.sleep(0) + q._finished.set() + + coros = ( + (self._put(q, go, "E", results, shutdown=True)), + (self._put_nowait(q, go, "W", results, shutdown=True)), + (self._join(q, results, shutdown=True)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + if not immediate: + # Here calls `join` is a blocking operation + # so wait for a delay and cancel this blocked task + t.append(asyncio.create_task(_cancel_join_task(q, 0.01, t[2]))) + with self.assertRaises(asyncio.CancelledError) as e: + await asyncio.gather(*t) + else: + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + self.assertTrue(q._finished.is_set()) + + async def test_shutdown_put_join(self): + return await self._shutdown_put_join(False) + + async def test_shutdown_immediate_put_and_join(self): + return await self._shutdown_put_join(True) + + +class QueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.Queue + + +class LifoQueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.LifoQueue + + +class PriorityQueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.PriorityQueue + + if __name__ == '__main__': unittest.main() diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 33113a72e6b6a9..d9e840a7c861ed 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -241,6 +241,408 @@ def test_shrinking_queue(self): with self.assertRaises(self.queue.Full): q.put_nowait(4) + def test_shutdown_empty(self): + q = self.type2test() + q.shutdown() + with self.assertRaises(self.queue.ShutDown): + q.put("data") + with self.assertRaises(self.queue.ShutDown): + q.get() + + def test_shutdown_nonempty(self): + q = self.type2test() + q.put("data") + q.shutdown() + q.get() + with self.assertRaises(self.queue.ShutDown): + q.get() + + def test_shutdown_immediate(self): + q = self.type2test() + q.put("data") + q.shutdown(immediate=True) + with self.assertRaises(self.queue.ShutDown): + q.get() + + def test_shutdown_allowed_transitions(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.type2test() + self.assertEqual("alive", q.shutdown_state) + + q.shutdown() + self.assertEqual("shutdown", q.shutdown_state) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q.shutdown_state) + + q.shutdown(immediate=False) + self.assertNotEqual("shutdown", q.shutdown_state) + + def _shutdown_all_methods_in_one_thread(self, immediate): + q = self.type2test(2) + q.put("L") + q.put_nowait("O") + q.shutdown(immediate) + + with self.assertRaises(self.queue.ShutDown): + q.put("E") + with self.assertRaises(self.queue.ShutDown): + q.put_nowait("W") + if immediate: + with self.assertRaises(self.queue.ShutDown): + q.get() + with self.assertRaises(self.queue.ShutDown): + q.get_nowait() + with self.assertRaises(self.queue.ShutDown): + q.task_done() + with self.assertRaises(self.queue.ShutDown): + q.join() + else: + self.assertIn(q.get(), "LO") + q.task_done() + self.assertIn(q.get(), "LO") + q.task_done() + q.join() + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(self.queue.ShutDown): + q.get() # p.get(True) + with self.assertRaises(self.queue.ShutDown): + q.get_nowait() # p.get(False) + with self.assertRaises(self.queue.ShutDown): + q.get(True, 1.0) + + def test_shutdown_all_methods_in_one_thread(self): + return self._shutdown_all_methods_in_one_thread(False) + + def test_shutdown_immediate_all_methods_in_one_thread(self): + return self._shutdown_all_methods_in_one_thread(True) + + def _write_msg_thread(self, q, n, results, delay, + i_when_exec_shutdown, + event_start, event_end): + event_start.wait() + for i in range(1, n+1): + try: + q.put((i, "YDLO")) + results.append(True) + except self.queue.ShutDown: + results.append(False) + # triggers shutdown of queue + if i == i_when_exec_shutdown: + event_end.set() + time.sleep(delay) + # end of all puts + try: + q.join() + except self.queue.ShutDown: + pass + + def _read_msg_thread(self, q, nb, results, delay, event_start): + event_start.wait() + block = True + while nb: + time.sleep(delay) + try: + # Get at least one message + q.get(block) + block = False + q.task_done() + results.append(True) + nb -= 1 + except self.queue.ShutDown: + results.append(False) + nb -= 1 + except self.queue.Empty: + pass + try: + q.join() + except self.queue.ShutDown: + pass + + def _shutdown_thread(self, q, event_end, immediate): + event_end.wait() + q.shutdown(immediate) + try: + q.join() + except self.queue.ShutDown: + pass + + def _join_thread(self, q, delay, event_start): + event_start.wait() + time.sleep(delay) + try: + q.join() + except self.queue.ShutDown: + pass + + def _shutdown_all_methods_in_many_threads(self, immediate): + q = self.type2test() + ps = [] + ev_start = threading.Event() + ev_exec_shutdown = threading.Event() + res_puts = [] + res_gets = [] + delay = 1e-4 + read_process = 4 + nb_msgs = read_process * 16 + nb_msgs_r = nb_msgs // read_process + when_exec_shutdown = nb_msgs // 2 + lprocs = ( + (self._write_msg_thread, 1, (q, nb_msgs, res_puts, delay, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_thread, read_process, (q, nb_msgs_r, + res_gets, delay*2, + ev_start)), + (self._join_thread, 2, (q, delay*2, ev_start)), + (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), + ) + # start all threds + for func, n, args in lprocs: + for i in range(n): + ps.append(threading.Thread(target=func, args=args)) + ps[-1].start() + # set event in order to run q.shutdown() + ev_start.set() + + if not immediate: + assert(len(res_gets) == len(res_puts)) + assert(res_gets.count(True) == res_puts.count(True)) + else: + assert(len(res_gets) <= len(res_puts)) + assert(res_gets.count(True) <= res_puts.count(True)) + + def test_shutdown_all_methods_in_many_threads(self): + return self._shutdown_all_methods_in_many_threads(False) + + def test_shutdown_immediate_all_methods_in_many_threads(self): + return self._shutdown_all_methods_in_many_threads(True) + + def _get(self, q, go, results, shutdown=False): + go.wait() + try: + msg = q.get() + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _get_shutdown(self, q, go, results): + return self._get(q, go, results, True) + + def _get_task_done(self, q, go, results): + go.wait() + try: + msg = q.get() + q.task_done() + results.append(True) + return msg + except self.queue.ShutDown: + results.append(False) + return False + + def _put(self, q, msg, go, results, shutdown=False): + go.wait() + try: + q.put(msg) + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _put_shutdown(self, q, msg, go, results): + return self._put(q, msg, go, results, True) + + def _join(self, q, results, shutdown=False): + try: + q.join() + results.append(not shutdown) + return not shutdown + except self.queue.ShutDown: + results.append(shutdown) + return shutdown + + def _join_shutdown(self, q, results): + return self._join(q, results, True) + + def _shutdown_get(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + # queue full + + if immediate: + thrds = ( + (self._get_shutdown, (q, go, results)), + (self._get_shutdown, (q, go, results)), + ) + else: + thrds = ( + # on shutdown(immediate=False) + # one of these threads shoud raise Shutdown + (self._get, (q, go, results)), + (self._get, (q, go, results)), + (self._get, (q, go, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + q.shutdown(immediate) + go.set() + for t in threads: + t.join() + if immediate: + self.assertListEqual(results, [True, True]) + else: + self.assertListEqual(sorted(results), [False] + [True]*(len(thrds)-1)) + + def test_shutdown_get(self): + return self._shutdown_get(False) + + def test_shutdown_immediate_get(self): + return self._shutdown_get(True) + + def _shutdown_put(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + # queue fulled + + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._put_shutdown, (q, "W", go, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + q.shutdown() + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_put(self): + return self._shutdown_put(False) + + def test_shutdown_immediate_put(self): + return self._shutdown_put(True) + + def _shutdown_join(self, immediate): + q = self.type2test() + results = [] + q.put("Y") + go = threading.Event() + nb = q.qsize() + + if immediate: + thrds = ( + (self._join_shutdown, (q, results)), + (self._join_shutdown, (q, results)), + ) + else: + thrds = ( + (self._join, (q, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + if not immediate: + res = [] + for i in range(nb): + threads.append(threading.Thread(target=self._get_task_done, args=(q, go, res))) + threads[-1].start() + q.shutdown(immediate) + go.set() + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_immediate_join(self): + return self._shutdown_join(True) + + def test_shutdown_join(self): + return self._shutdown_join(False) + + def _shutdown_put_join(self, immediate): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + nb = q.qsize() + # queue not fulled + + if immediate: + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join_shutdown, (q, results)), + ) + else: + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + if not immediate: + self.assertEqual(q.unfinished_tasks, nb) + for i in range(nb): + t = threading.Thread(target=q.task_done) + t.start() + threads.append(t) + go.set() + q.shutdown(immediate) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + def test_shutdown_immediate_put_join(self): + return self._shutdown_put_join(True) + + def test_shutdown_put_join(self): + return self._shutdown_put_join(False) + + def test_shutdown_get_task_done_join(self): + q = self.type2test(2) + results = [] + go = threading.Event() + q.put("Y") + q.put("D") + self.assertEqual(q.unfinished_tasks, q.qsize()) + + thrds = ( + (self._get_task_done, (q, go, results)), + (self._get_task_done, (q, go, results)), + (self._join, (q, results)), + (self._join, (q, results)), + ) + threads = [] + for func, params in thrds: + threads.append(threading.Thread(target=func, args=params)) + threads[-1].start() + go.set() + q.shutdown(False) + for t in threads: + t.join() + + self.assertEqual(results, [True]*len(thrds)) + + class QueueTest(BaseQueueTestMixin): def setUp(self): diff --git a/Misc/NEWS.d/next/Library/2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst b/Misc/NEWS.d/next/Library/2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst new file mode 100644 index 00000000000000..61af8d77ae94ee --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-03-07-15-42-27.gh-issue-96471.oWZtwQ.rst @@ -0,0 +1 @@ +Add "shutdown" method to multithreading.Queue, asyncio.Queue and multiprocessing.queue.Queue and multiprocessing.queue.JoinableQueue