From 440a7024ff65c342fab43e98dc3668d0b44d394e Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 1 Sep 2022 18:33:18 +1000 Subject: [PATCH 01/19] Add asyncio queue shutdown * Include docs --- Doc/library/asyncio-queue.rst | 32 ++++++++++++++++ Lib/asyncio/queues.py | 57 +++++++++++++++++++++++++++- Lib/test/test_asyncio/test_queues.py | 57 ++++++++++++++++++++++++++++ 3 files changed, 145 insertions(+), 1 deletion(-) diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index d86fbc21351e2d..008a7f908f4fbc 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -62,6 +62,9 @@ Queue Remove and return an item from the queue. If queue is empty, wait until an item is available. + Raises :exc:`QueueShutDown` if the queue has been shut down and + is empty, or if the queue has been shut down immediately. + .. method:: get_nowait() Return an item if one is immediately available, else raise @@ -77,11 +80,16 @@ Queue work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. + Raises :exc:`QueueShutDown` if the queue has been shut down + immediately. + .. coroutinemethod:: put(item) Put an item into the queue. If the queue is full, wait until a free slot is available before adding the item. + Raises :exc:`QueueShutDown` if the queue has been shut down. + .. method:: put_nowait(item) Put an item into the queue without blocking. @@ -92,6 +100,19 @@ Queue Return the number of items in the queue. + .. method:: shutdown(immediate=False) + + Shut-down the queue, making queue gets and puts raise + :exc:`QueueShutDown`. + + By default, gets will only raise once the queue is empty. Set + *immediate* to true to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if *immediate* is true. + + .. versionadded:: 3.12 + .. method:: task_done() Indicate that a formerly enqueued task is complete. @@ -108,6 +129,9 @@ Queue Raises :exc:`ValueError` if called more times than there were items placed in the queue. + Raises :exc:`QueueShutDown` if the queue has been shut down + immediately. + Priority Queue ============== @@ -145,6 +169,14 @@ Exceptions on a queue that has reached its *maxsize*. +.. exception:: QueueShutDown + + Exception raised when getting an item from or putting an item onto a + queue which has been shut down. + + .. versionadded:: 3.12 + + Examples ======== diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a9656a6df561ba..a869993a1de3fe 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -1,4 +1,11 @@ -__all__ = ('Queue', 'PriorityQueue', 'LifoQueue', 'QueueFull', 'QueueEmpty') +__all__ = ( + 'Queue', + 'PriorityQueue', + 'LifoQueue', + 'QueueFull', + 'QueueEmpty', + 'QueueShutDown', +) import collections import heapq @@ -18,6 +25,16 @@ class QueueFull(Exception): pass +class QueueShutDown(Exception): + """Raised when putting on to or getting from a shut-down Queue.""" + pass + + +_queue_alive = "alive" +_queue_shutdown = "shutdown" +_queue_shutdown_immediate = "shutdown-immediate" + + class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -41,6 +58,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) + self.shutdown_state = _queue_alive # These three are overridable in subclasses. @@ -113,6 +131,8 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. """ + if self.shutdown_state != _queue_alive: + raise QueueShutDown while self.full(): putter = self._get_loop().create_future() self._putters.append(putter) @@ -132,6 +152,8 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise + if self.shutdown_state != _queue_alive: + raise QueueShutDown return self.put_nowait(item) def put_nowait(self, item): @@ -139,6 +161,8 @@ def put_nowait(self, item): If no free slot is immediately available, raise QueueFull. """ + if self.shutdown_state != _queue_alive: + raise QueueShutDown if self.full(): raise QueueFull self._put(item) @@ -151,7 +175,11 @@ async def get(self): If queue is empty, wait until an item is available. """ + if self.shutdown_state == _queue_shutdown_immediate: + raise QueueShutDown while self.empty(): + if self.shutdown_state != _queue_alive: + raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) try: @@ -170,6 +198,8 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise + if self.shutdown_state == _queue_shutdown_immediate: + raise QueueShutDown return self.get_nowait() def get_nowait(self): @@ -178,7 +208,11 @@ def get_nowait(self): Return an item if one is immediately available, else raise QueueEmpty. """ if self.empty(): + if self.shutdown_state != _queue_alive: + raise QueueShutDown raise QueueEmpty + elif self.shutdown_state == _queue_shutdown_immediate: + raise QueueShutDown item = self._get() self._wakeup_next(self._putters) return item @@ -214,6 +248,27 @@ async def join(self): if self._unfinished_tasks > 0: await self._finished.wait() + def shutdown(self, immediate=False): + """Shut-down the queue, making queue gets and puts raise. + + By default, gets will only raise once the queue is empty. Set + 'immediate' to True to make gets raise immediately instead. + + All blocked callers of put() will be unblocked, and also get() + and join() if 'immediate'. The QueueShutDown exception is raised. + """ + if immediate: + self.shutdown_state = _queue_shutdown_immediate + while self._getters: + getter = self._getters.popleft() + if not getter.done(): + getter.set_result(None) + else: + self.shutdown_state = _queue_shutdown + while self._putters: + putter = self._putters.popleft() + if not putter.done(): + putter.set_result(None) class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 2d058ccf6a8c72..418c3fe618d89b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -522,5 +522,62 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa q_class = asyncio.PriorityQueue +class _QueueShutdownTestMixin: + q_class = None + + async def test_empty(self): + q = self.q_class() + q.shutdown() + try: + await q.put("data") + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + try: + await q.get() + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + + async def test_nonempty(self): + q = self.q_class() + q.put_nowait("data") + q.shutdown() + await q.get() + try: + await q.get() + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + + async def test_immediate(self): + q = self.q_class() + q.put_nowait("data") + q.shutdown(immediate=True) + try: + await q.get() + self.fail("Didn't appear to shut-down queue") + except asyncio.QueueShutDown: + pass + + +class QueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.Queue + + +class LifoQueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.LifoQueue + + +class PriorityQueueShutdownTests( + _QueueShutdownTestMixin, unittest.IsolatedAsyncioTestCase +): + q_class = asyncio.PriorityQueue + + if __name__ == '__main__': unittest.main() From fb458db0a2e6adce92fe846c2dfaa781888fc256 Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 10 Feb 2023 17:52:33 +0100 Subject: [PATCH 02/19] Fix queue shutdown * Queue state enum members are capitalised * Termination state in str/repr * Include raised exception in docstrings * Factor out queue-state checks and updates to methods * Logic fixes in get_nowait and shutdown * Handle queue shutdown in task_done and join * Updated tests * Document feature added in 3.13 --- Doc/library/asyncio-queue.rst | 4 +- Lib/asyncio/queues.py | 80 ++++-- Lib/test/test_asyncio/test_queues.py | 358 +++++++++++++++++++++++++-- 3 files changed, 404 insertions(+), 38 deletions(-) diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index 008a7f908f4fbc..b66bc9341a2081 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -111,7 +111,7 @@ Queue All blocked callers of put() will be unblocked, and also get() and join() if *immediate* is true. - .. versionadded:: 3.12 + .. versionadded:: 3.13 .. method:: task_done() @@ -174,7 +174,7 @@ Exceptions Exception raised when getting an item from or putting an item onto a queue which has been shut down. - .. versionadded:: 3.12 + .. versionadded:: 3.13 Examples diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index a869993a1de3fe..ae2d55478342e6 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -8,6 +8,7 @@ ) import collections +import enum import heapq from types import GenericAlias @@ -30,9 +31,10 @@ class QueueShutDown(Exception): pass -_queue_alive = "alive" -_queue_shutdown = "shutdown" -_queue_shutdown_immediate = "shutdown-immediate" +class _QueueState(enum.Enum): + ALIVE = "alive" + SHUTDOWN = "shutdown" + SHUTDOWN_IMMEDIATE = "shutdown-immediate" class Queue(mixins._LoopBoundMixin): @@ -58,7 +60,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) - self.shutdown_state = _queue_alive + self._shutdown_state = _QueueState.ALIVE # These three are overridable in subclasses. @@ -99,6 +101,8 @@ def _format(self): result += f' _putters[{len(self._putters)}]' if self._unfinished_tasks: result += f' tasks={self._unfinished_tasks}' + if not self._is_alive(): + result += f' state={self._shutdown_state.value}' return result def qsize(self): @@ -130,8 +134,10 @@ async def put(self, item): Put an item into the queue. If the queue is full, wait until a free slot is available before adding item. + + Raises QueueShutDown if the queue has been shut down. """ - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise QueueShutDown while self.full(): putter = self._get_loop().create_future() @@ -145,14 +151,14 @@ async def put(self, item): self._putters.remove(putter) except ValueError: # The putter could be removed from self._putters by a - # previous get_nowait call. + # previous get_nowait call or a shutdown call. pass if not self.full() and not putter.cancelled(): # We were woken up by get_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise QueueShutDown return self.put_nowait(item) @@ -160,8 +166,10 @@ def put_nowait(self, item): """Put an item into the queue without blocking. If no free slot is immediately available, raise QueueFull. + + Raises QueueShutDown if the queue has been shut down. """ - if self.shutdown_state != _queue_alive: + if not self._is_alive(): raise QueueShutDown if self.full(): raise QueueFull @@ -174,11 +182,14 @@ async def get(self): """Remove and return an item from the queue. If queue is empty, wait until an item is available. + + Raises QueueShutDown if the queue has been shut down and is empty, or + if the queue has been shut down immediately. """ - if self.shutdown_state == _queue_shutdown_immediate: + if self._is_shutdown_immediate(): raise QueueShutDown while self.empty(): - if self.shutdown_state != _queue_alive: + if self._is_shutdown(): raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) @@ -191,14 +202,15 @@ async def get(self): self._getters.remove(getter) except ValueError: # The getter could be removed from self._getters by a - # previous put_nowait call. + # previous put_nowait call, + # or a shutdown call. pass if not self.empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self.shutdown_state == _queue_shutdown_immediate: + if self._is_shutdown_immediate(): raise QueueShutDown return self.get_nowait() @@ -206,13 +218,16 @@ def get_nowait(self): """Remove and return an item from the queue. Return an item if one is immediately available, else raise QueueEmpty. + + Raises QueueShutDown if the queue has been shut down and is empty, or + if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown if self.empty(): - if self.shutdown_state != _queue_alive: + if self._is_shutdown(): raise QueueShutDown raise QueueEmpty - elif self.shutdown_state == _queue_shutdown_immediate: - raise QueueShutDown item = self._get() self._wakeup_next(self._putters) return item @@ -230,7 +245,11 @@ def task_done(self): Raises ValueError if called more times than there were items placed in the queue. + + Raises QueueShutDown if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown if self._unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self._unfinished_tasks -= 1 @@ -244,9 +263,15 @@ async def join(self): queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. + + Raises QueueShutDown if the queue has been shut down immediately. """ + if self._is_shutdown_immediate(): + raise QueueShutDown if self._unfinished_tasks > 0: await self._finished.wait() + if self._is_shutdown_immediate(): + raise QueueShutDown def shutdown(self, immediate=False): """Shut-down the queue, making queue gets and puts raise. @@ -257,19 +282,40 @@ def shutdown(self, immediate=False): All blocked callers of put() will be unblocked, and also get() and join() if 'immediate'. The QueueShutDown exception is raised. """ + if self._is_shutdown_immediate(): + return + # here _shutdown_state is ALIVE or SHUTDOWN if immediate: - self.shutdown_state = _queue_shutdown_immediate + self._set_shutdown_immediate() while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) + # Release all 'blocked' tasks/coros in `join()` + self._finished.set() else: - self.shutdown_state = _queue_shutdown + self._set_shutdown() while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) + def _is_alive(self): + return self._shutdown_state is _QueueState.ALIVE + + def _is_shutdown(self): + return self._shutdown_state is _QueueState.SHUTDOWN + + def _is_shutdown_immediate(self): + return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE + + def _set_shutdown(self): + self._shutdown_state = _QueueState.SHUTDOWN + + def _set_shutdown_immediate(self): + self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE + + class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 418c3fe618d89b..bf4a5a78a8f0cc 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -525,40 +525,360 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa class _QueueShutdownTestMixin: q_class = None - async def test_empty(self): - q = self.q_class() - q.shutdown() + async def asyncSetUp(self): + await super().asyncSetUp() + self.delay = 0.001 + + async def _get(self, q, go, results, shutdown=False): + await go.wait() try: - await q.put("data") - self.fail("Didn't appear to shut-down queue") + msg = await q.get() + results.append(not shutdown) + return msg except asyncio.QueueShutDown: - pass + results.append(shutdown) + return shutdown + + async def _get_nowait(self, q, go, results, shutdown=False): + await go.wait() try: - await q.get() - self.fail("Didn't appear to shut-down queue") + msg = q.get_nowait() + results.append(not shutdown) + return msg + except asyncio.QueueShutDown: + results.append(shutdown) + return shutdown + + async def _get_task_done(self, q, go, results): + await go.wait() + try: + msg = await q.get() + q.task_done() + results.append(True) + return msg + except asyncio.QueueShutDown: + results.append(False) + return False + + async def _put(self, q, go, msg, results, shutdown=False): + await go.wait() + try: + await q.put(msg) + results.append(not shutdown) + return not shutdown except asyncio.QueueShutDown: - pass + results.append(shutdown) + return shutdown + + async def _put_nowait(self, q, go, msg, results, shutdown=False): + await go.wait() + try: + q.put_nowait(msg) + results.append(False) + return not shutdown + except asyncio.QueueShutDown: + results.append(True) + return shutdown + + async def _shutdown(self, q, go, immediate): + q.shutdown(immediate) + await asyncio.sleep(self.delay) + go.set() + await asyncio.sleep(self.delay) + + async def _join(self, q, results, shutdown=False): + try: + await q.join() + results.append(not shutdown) + return True + except asyncio.QueueShutDown: + results.append(shutdown) + return False + except asyncio.CancelledError: + results.append(shutdown) + raise + + async def test_shutdown_empty(self): + q = self.q_class() + q.shutdown() + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): + await q.put("data") + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): + await q.get() - async def test_nonempty(self): + async def test_shutdown_nonempty(self): q = self.q_class() q.put_nowait("data") q.shutdown() await q.get() - try: + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass - async def test_immediate(self): + async def test_shutdown_immediate(self): q = self.q_class() q.put_nowait("data") q.shutdown(immediate=True) - try: + with self.assertRaises( + asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" + ): await q.get() - self.fail("Didn't appear to shut-down queue") - except asyncio.QueueShutDown: - pass + + async def test_shutdown_repr(self): + q = self.q_class(4) + # when alive, not in repr + self.assertNotIn("alive", repr(q)) + + q = self.q_class(6) + q.shutdown(immediate=False) + self.assertIn("shutdown", repr(q)) + + q = self.q_class(8) + q.shutdown(immediate=True) + self.assertIn("shutdown-immediate", repr(q)) + + async def test_shutdown_allowed_transitions(self): + # allowed transitions would be from alive via shutdown to immediate + q = self.q_class() + self.assertEqual("alive", q._shutdown_state.value) + + q.shutdown() + self.assertEqual("shutdown", q._shutdown_state.value) + + q.shutdown(immediate=True) + self.assertEqual("shutdown-immediate", q._shutdown_state.value) + + q.shutdown(immediate=False) + self.assertNotEqual("shutdown", q._shutdown_state.value) + + async def _shutdown_all_methods_in_one_task(self, immediate): + q = asyncio.Queue() + await q.put("L") + q.put_nowait("O") + q.shutdown(immediate) + with self.assertRaises(asyncio.QueueShutDown): + await q.put("E") + with self.assertRaises(asyncio.QueueShutDown): + q.put_nowait("W") + + if immediate: + with self.assertRaises(asyncio.QueueShutDown): + await q.get() + with self.assertRaises(asyncio.QueueShutDown): + q.get_nowait() + with self.assertRaises(asyncio.QueueShutDown): + q.task_done() + with self.assertRaises(asyncio.QueueShutDown): + await q.join() + else: + self.assertIn(await q.get(), "LO") + q.task_done() + self.assertIn(q.get_nowait(), "LO") + q.task_done() + await q.join() + # on shutdown(immediate=False) + # when queue is empty, should raise ShutDown Exception + with self.assertRaises(asyncio.QueueShutDown): + await q.get() + with self.assertRaises(asyncio.QueueShutDown): + q.get_nowait() + + async def test_shutdown_all_methods_in_one_task(self): + return await self._shutdown_all_methods_in_one_task(False) + + async def test_shutdown_immediate_all_methods_in_one_task(self): + return await self._shutdown_all_methods_in_one_task(True) + + async def _shutdown_putters(self, immediate): + delay = self.delay + q = self.q_class(2) + results = [] + await q.put("E") + await q.put("W") + # queue full + t = asyncio.create_task(q.put("Y")) + await asyncio.sleep(delay) + self.assertTrue(len(q._putters) == 1) + with self.assertRaises(asyncio.QueueShutDown): + # here `t` raises a QueueShuDown + q.shutdown(immediate) + await t + self.assertTrue(not q._putters) + + async def test_shutdown_putters_deque(self): + return await self._shutdown_putters(False) + + async def test_shutdown_immediate_putters_deque(self): + return await self._shutdown_putters(True) + + async def _shutdown_getters(self, immediate): + delay = self.delay + q = self.q_class(1) + results = [] + await q.put("Y") + nb = q.qsize() + # queue full + + asyncio.create_task(q.get()) + await asyncio.sleep(delay) + t = asyncio.create_task(q.get()) + await asyncio.sleep(delay) + self.assertTrue(len(q._getters) == 1) + if immediate: + # here `t` raises a QueueShuDown + with self.assertRaises(asyncio.QueueShutDown): + q.shutdown(immediate) + await t + self.assertTrue(not q._getters) + else: + # here `t` is always pending + q.shutdown(immediate) + await asyncio.sleep(delay) + self.assertTrue(q._getters) + self.assertEqual(q._unfinished_tasks, nb) + + + async def test_shutdown_getters_deque(self): + return await self._shutdown_getters(False) + + async def test_shutdown_immediate_getters_deque(self): + return await self._shutdown_getters(True) + + async def _shutdown_get(self, immediate): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + nb = q.qsize() + # queue full + + if immediate: + coros = ( + (self._get(q, go, results, shutdown=True)), + (self._get_nowait(q, go, results, shutdown=True)), + ) + else: + coros = ( + # one of these tasks shoud raise Shutdown + (self._get(q, go, results)), + (self._get_nowait(q, go, results)), + (self._get_nowait(q, go, results)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + res = await asyncio.gather(*t) + if immediate: + self.assertEqual(results, [True]*len(coros)) + else: + self.assertListEqual(sorted(results), [False] + [True]*(len(coros)-1)) + + async def test_shutdown_get(self): + return await self._shutdown_get(False) + + async def test_shutdown_immediate_get(self): + return await self._shutdown_get(True) + + async def test_shutdown_get_task_done_join(self): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + self.assertEqual(q._unfinished_tasks, q.qsize()) + + # queue full + + coros = ( + (self._get_task_done(q, go, results)), + (self._get_task_done(q, go, results)), + (self._join(q, results)), + (self._join(q, results)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, False))) + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + self.assertIn(t[0].result(), "YD") + self.assertIn(t[1].result(), "YD") + self.assertNotEqual(t[0].result(), t[1].result()) + self.assertEqual(q._unfinished_tasks, 0) + + async def _shutdown_put(self, immediate): + q = self.q_class() + results = [] + go = asyncio.Event() + # queue not empty + + coros = ( + (self._put(q, go, "Y", results, shutdown=True)), + (self._put_nowait(q, go, "D", results, shutdown=True)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + + async def test_shutdown_put(self): + return await self._shutdown_put(False) + + async def test_shutdown_immediate_put(self): + return await self._shutdown_put(True) + + async def _shutdown_put_join(self, immediate): + q = self.q_class(2) + results = [] + go = asyncio.Event() + await q.put("Y") + await q.put("D") + nb = q.qsize() + # queue fulled + + async def _cancel_join_task(q, delay, t): + await asyncio.sleep(delay) + t.cancel() + await asyncio.sleep(0) + q._finished.set() + + coros = ( + (self._put(q, go, "E", results, shutdown=True)), + (self._put_nowait(q, go, "W", results, shutdown=True)), + (self._join(q, results, shutdown=True)), + ) + t = [] + for coro in coros: + t.append(asyncio.create_task(coro)) + t.append(asyncio.create_task(self._shutdown(q, go, immediate))) + if not immediate: + # Here calls `join` is a blocking operation + # so wait for a delay and cancel this blocked task + t.append(asyncio.create_task(_cancel_join_task(q, 0.01, t[2]))) + with self.assertRaises(asyncio.CancelledError) as e: + await asyncio.gather(*t) + else: + res = await asyncio.gather(*t) + + self.assertEqual(results, [True]*len(coros)) + self.assertTrue(q._finished.is_set()) + + async def test_shutdown_put_join(self): + return await self._shutdown_put_join(False) + + async def test_shutdown_immediate_put_and_join(self): + return await self._shutdown_put_join(True) class QueueShutdownTests( From e5951ac5445f2346885808b6081aa9b27ff5d3ef Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sat, 6 May 2023 05:00:46 +0000 Subject: [PATCH 03/19] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst | 1 + 1 file changed, 1 insertion(+) create mode 100644 Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst diff --git a/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst b/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst new file mode 100644 index 00000000000000..ac07ef25636964 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst @@ -0,0 +1 @@ +Add asyncio.Queue termination with ``shutdown`` method. From d5e925d7be1140c2db34d003a22d4afaa748bce9 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 20 Feb 2024 21:12:18 +1000 Subject: [PATCH 04/19] Add references in docs and news entry --- Doc/library/asyncio-queue.rst | 14 ++++++++------ .../2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst | 3 ++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index b66bc9341a2081..f5d82296dcd946 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -102,14 +102,16 @@ Queue .. method:: shutdown(immediate=False) - Shut-down the queue, making queue gets and puts raise - :exc:`QueueShutDown`. + Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` + raise :exc:`QueueShutDown`. - By default, gets will only raise once the queue is empty. Set - *immediate* to true to make gets raise immediately instead. + By default, :meth:`~Queue.get` on a shut down queue will only raise once + the queue is empty. Set *immediate* to true to make gets raise + immediately instead. - All blocked callers of put() will be unblocked, and also get() - and join() if *immediate* is true. + All blocked callers of :meth:`~Queue.put` will be unblocked. If + *immediate* is true, also unblock callers of :meth:`~Queue.get` and + :meth:`~Queue.join`. .. versionadded:: 3.13 diff --git a/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst b/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst index ac07ef25636964..989a7411168a19 100644 --- a/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst +++ b/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst @@ -1 +1,2 @@ -Add asyncio.Queue termination with ``shutdown`` method. +Add :py:class:`asyncio.Queue`` termination with +:py:meth:`~asyncio.Queue.shutdown` method. From bd2a7c397d75a0297e01dbaf2b19c3d313204a1a Mon Sep 17 00:00:00 2001 From: Laurie O Date: Wed, 20 Mar 2024 18:55:59 +1000 Subject: [PATCH 05/19] Improve docs --- Doc/library/asyncio-queue.rst | 14 +++++++------- Doc/whatsnew/3.13.rst | 4 ++++ Lib/asyncio/queues.py | 3 +-- 3 files changed, 12 insertions(+), 9 deletions(-) diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index f5d82296dcd946..f648de24e48a28 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -105,13 +105,13 @@ Queue Shut down the queue, making :meth:`~Queue.get` and :meth:`~Queue.put` raise :exc:`QueueShutDown`. - By default, :meth:`~Queue.get` on a shut down queue will only raise once - the queue is empty. Set *immediate* to true to make gets raise - immediately instead. + By default, :meth:`~Queue.get` on a shut down queue will only + raise once the queue is empty. Set *immediate* to true to make + :meth:`~Queue.get` raise immediately instead. All blocked callers of :meth:`~Queue.put` will be unblocked. If - *immediate* is true, also unblock callers of :meth:`~Queue.get` and - :meth:`~Queue.join`. + *immediate* is true, also unblock callers of :meth:`~Queue.get` + and :meth:`~Queue.join`. .. versionadded:: 3.13 @@ -173,8 +173,8 @@ Exceptions .. exception:: QueueShutDown - Exception raised when getting an item from or putting an item onto a - queue which has been shut down. + Exception raised when :meth:`~Queue.put` or :meth:`~Queue.get` is + called on a queue which has been shut down. .. versionadded:: 3.13 diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index 0e04dcd196d306..ddbfabb09c8c01 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -275,6 +275,10 @@ asyncio forcefully close an asyncio server. (Contributed by Pierre Ossman in :gh:`113538`.) +* Add :meth:`asyncio.queues.Queue.shutdown` (along with + :exc:`asyncio.queues.QueueShutDown`) for queue termination. + (Contributed by Laurie Opperman and Yves Duprat in :gh:`104228`.) + base64 --- diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index ae2d55478342e6..bda5252952c00f 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -202,8 +202,7 @@ async def get(self): self._getters.remove(getter) except ValueError: # The getter could be removed from self._getters by a - # previous put_nowait call, - # or a shutdown call. + # previous put_nowait call, or a shutdown call. pass if not self.empty() and not getter.cancelled(): # We were woken up by put_nowait(), but can't take From e9ac8de8ab212e8912dd0a57057d8ec440b00f72 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Wed, 20 Mar 2024 18:55:33 +1000 Subject: [PATCH 06/19] Consume queue on immediate shutdown --- Doc/library/asyncio-queue.rst | 9 +- Doc/whatsnew/3.13.rst | 2 +- Lib/asyncio/queues.py | 77 ++--- Lib/test/test_asyncio/test_queues.py | 415 ++++++--------------------- 4 files changed, 115 insertions(+), 388 deletions(-) diff --git a/Doc/library/asyncio-queue.rst b/Doc/library/asyncio-queue.rst index f648de24e48a28..030d4310942d7a 100644 --- a/Doc/library/asyncio-queue.rst +++ b/Doc/library/asyncio-queue.rst @@ -80,9 +80,6 @@ Queue work on it is complete. When the count of unfinished tasks drops to zero, :meth:`join` unblocks. - Raises :exc:`QueueShutDown` if the queue has been shut down - immediately. - .. coroutinemethod:: put(item) Put an item into the queue. If the queue is full, wait until a @@ -128,12 +125,12 @@ Queue call was received for every item that had been :meth:`~Queue.put` into the queue). + ``shutdown(immediate=True)`` calls :meth:`task_done` for each + remaining item in the queue. + Raises :exc:`ValueError` if called more times than there were items placed in the queue. - Raises :exc:`QueueShutDown` if the queue has been shut down - immediately. - Priority Queue ============== diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index ddbfabb09c8c01..dfdf304c0dbc37 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -277,7 +277,7 @@ asyncio * Add :meth:`asyncio.queues.Queue.shutdown` (along with :exc:`asyncio.queues.QueueShutDown`) for queue termination. - (Contributed by Laurie Opperman and Yves Duprat in :gh:`104228`.) + (Contributed by Laurie Opperman in :gh:`104228`.) base64 --- diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index bda5252952c00f..174b0f68ec17bc 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -8,7 +8,6 @@ ) import collections -import enum import heapq from types import GenericAlias @@ -31,12 +30,6 @@ class QueueShutDown(Exception): pass -class _QueueState(enum.Enum): - ALIVE = "alive" - SHUTDOWN = "shutdown" - SHUTDOWN_IMMEDIATE = "shutdown-immediate" - - class Queue(mixins._LoopBoundMixin): """A queue, useful for coordinating producer and consumer coroutines. @@ -60,7 +53,7 @@ def __init__(self, maxsize=0): self._finished = locks.Event() self._finished.set() self._init(maxsize) - self._shutdown_state = _QueueState.ALIVE + self._is_shutdown = False # These three are overridable in subclasses. @@ -101,8 +94,8 @@ def _format(self): result += f' _putters[{len(self._putters)}]' if self._unfinished_tasks: result += f' tasks={self._unfinished_tasks}' - if not self._is_alive(): - result += f' state={self._shutdown_state.value}' + if self._is_shutdown: + result += ' shutdown' return result def qsize(self): @@ -137,7 +130,7 @@ async def put(self, item): Raises QueueShutDown if the queue has been shut down. """ - if not self._is_alive(): + if self._is_shutdown: raise QueueShutDown while self.full(): putter = self._get_loop().create_future() @@ -158,7 +151,7 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if not self._is_alive(): + if self._is_shutdown: raise QueueShutDown return self.put_nowait(item) @@ -169,7 +162,7 @@ def put_nowait(self, item): Raises QueueShutDown if the queue has been shut down. """ - if not self._is_alive(): + if self._is_shutdown: raise QueueShutDown if self.full(): raise QueueFull @@ -186,11 +179,9 @@ async def get(self): Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately. """ - if self._is_shutdown_immediate(): + if self._is_shutdown and self.empty(): raise QueueShutDown while self.empty(): - if self._is_shutdown(): - raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) try: @@ -209,7 +200,7 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self._is_shutdown_immediate(): + if self._is_shutdown and self.empty(): raise QueueShutDown return self.get_nowait() @@ -221,10 +212,8 @@ def get_nowait(self): Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately. """ - if self._is_shutdown_immediate(): - raise QueueShutDown if self.empty(): - if self._is_shutdown(): + if self._is_shutdown: raise QueueShutDown raise QueueEmpty item = self._get() @@ -242,13 +231,12 @@ def task_done(self): been processed (meaning that a task_done() call was received for every item that had been put() into the queue). - Raises ValueError if called more times than there were items placed in + shutdown(immediate=True) calls task_done() for each remaining item in the queue. - Raises QueueShutDown if the queue has been shut down immediately. + Raises ValueError if called more times than there were items placed in + the queue. """ - if self._is_shutdown_immediate(): - raise QueueShutDown if self._unfinished_tasks <= 0: raise ValueError('task_done() called too many times') self._unfinished_tasks -= 1 @@ -262,58 +250,37 @@ async def join(self): queue. The count goes down whenever a consumer calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks. - - Raises QueueShutDown if the queue has been shut down immediately. """ - if self._is_shutdown_immediate(): - raise QueueShutDown if self._unfinished_tasks > 0: await self._finished.wait() - if self._is_shutdown_immediate(): - raise QueueShutDown def shutdown(self, immediate=False): - """Shut-down the queue, making queue gets and puts raise. + """Shut-down the queue, making queue gets and puts raise + QueueShutDown. By default, gets will only raise once the queue is empty. Set 'immediate' to True to make gets raise immediately instead. All blocked callers of put() will be unblocked, and also get() - and join() if 'immediate'. The QueueShutDown exception is raised. + and join() if 'immediate'. """ - if self._is_shutdown_immediate(): - return - # here _shutdown_state is ALIVE or SHUTDOWN + self._is_shutdown = True if immediate: - self._set_shutdown_immediate() + while not self.empty(): + self._get() + if self._unfinished_tasks > 0: + self._unfinished_tasks -= 1 + if self._unfinished_tasks <= 0: + self._finished.set() while self._getters: getter = self._getters.popleft() if not getter.done(): getter.set_result(None) - # Release all 'blocked' tasks/coros in `join()` - self._finished.set() - else: - self._set_shutdown() while self._putters: putter = self._putters.popleft() if not putter.done(): putter.set_result(None) - def _is_alive(self): - return self._shutdown_state is _QueueState.ALIVE - - def _is_shutdown(self): - return self._shutdown_state is _QueueState.SHUTDOWN - - def _is_shutdown_immediate(self): - return self._shutdown_state is _QueueState.SHUTDOWN_IMMEDIATE - - def _set_shutdown(self): - self._shutdown_state = _QueueState.SHUTDOWN - - def _set_shutdown_immediate(self): - self._shutdown_state = _QueueState.SHUTDOWN_IMMEDIATE - class PriorityQueue(Queue): """A subclass of Queue; retrieves entries in priority order (lowest first). diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index bf4a5a78a8f0cc..364f95a2951724 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -525,360 +525,123 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa class _QueueShutdownTestMixin: q_class = None - async def asyncSetUp(self): - await super().asyncSetUp() - self.delay = 0.001 - - async def _get(self, q, go, results, shutdown=False): - await go.wait() - try: - msg = await q.get() - results.append(not shutdown) - return msg - except asyncio.QueueShutDown: - results.append(shutdown) - return shutdown - - async def _get_nowait(self, q, go, results, shutdown=False): - await go.wait() - try: - msg = q.get_nowait() - results.append(not shutdown) - return msg - except asyncio.QueueShutDown: - results.append(shutdown) - return shutdown - - async def _get_task_done(self, q, go, results): - await go.wait() - try: - msg = await q.get() - q.task_done() - results.append(True) - return msg - except asyncio.QueueShutDown: - results.append(False) - return False - - async def _put(self, q, go, msg, results, shutdown=False): - await go.wait() - try: - await q.put(msg) - results.append(not shutdown) - return not shutdown - except asyncio.QueueShutDown: - results.append(shutdown) - return shutdown - - async def _put_nowait(self, q, go, msg, results, shutdown=False): - await go.wait() + @staticmethod + async def _ping_awaitable(a): try: - q.put_nowait(msg) - results.append(False) - return not shutdown - except asyncio.QueueShutDown: - results.append(True) - return shutdown - - async def _shutdown(self, q, go, immediate): - q.shutdown(immediate) - await asyncio.sleep(self.delay) - go.set() - await asyncio.sleep(self.delay) - - async def _join(self, q, results, shutdown=False): - try: - await q.join() - results.append(not shutdown) - return True - except asyncio.QueueShutDown: - results.append(shutdown) - return False - except asyncio.CancelledError: - results.append(shutdown) - raise + await asyncio.wait_for(asyncio.shield(a), 0.01) + except TimeoutError: + pass + + def assertRaisesShutdown(self, msg="Didn't appear to shut-down queue"): + return self.assertRaises(asyncio.QueueShutDown, msg=msg) + + async def test_format(self): + q = self.q_class() + q._is_shutdown = True + self.assertEqual(q._format(), 'maxsize=0 shutdown') async def test_shutdown_empty(self): q = self.q_class() + loop = asyncio.get_running_loop() + join_task = loop.create_task(q.join()) q.shutdown() - with self.assertRaises( - asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" - ): + + await self._ping_awaitable(join_task) + self.assertTrue(join_task.done()) + with self.assertRaisesShutdown(): await q.put("data") - with self.assertRaises( - asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" - ): + with self.assertRaisesShutdown(): await q.get() + await join_task async def test_shutdown_nonempty(self): q = self.q_class() + loop = asyncio.get_running_loop() q.put_nowait("data") + join_task = loop.create_task(q.join()) q.shutdown() - await q.get() - with self.assertRaises( - asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" - ): + + self.assertEqual(await q.get(), "data") + + await self._ping_awaitable(join_task) + self.assertFalse(join_task.done()) + + with self.assertRaisesShutdown(): + await q.put("data") + with self.assertRaisesShutdown(): + q.put_nowait("data") + + with self.assertRaisesShutdown(): await q.get() + with self.assertRaisesShutdown(): + q.get_nowait() + + q.task_done() + + await self._ping_awaitable(join_task) + self.assertTrue(join_task.done()) + await join_task async def test_shutdown_immediate(self): q = self.q_class() + loop = asyncio.get_running_loop() q.put_nowait("data") + join_task = loop.create_task(q.join()) q.shutdown(immediate=True) - with self.assertRaises( - asyncio.QueueShutDown, msg="Didn't appear to shut-down queue" - ): - await q.get() - async def test_shutdown_repr(self): - q = self.q_class(4) - # when alive, not in repr - self.assertNotIn("alive", repr(q)) + await self._ping_awaitable(join_task) + self.assertTrue(join_task.done()) - q = self.q_class(6) - q.shutdown(immediate=False) - self.assertIn("shutdown", repr(q)) + with self.assertRaisesShutdown(): + await q.put("data") + with self.assertRaisesShutdown(): + q.put_nowait("data") - q = self.q_class(8) - q.shutdown(immediate=True) - self.assertIn("shutdown-immediate", repr(q)) + with self.assertRaisesShutdown(): + await q.get() + with self.assertRaisesShutdown(): + q.get_nowait() - async def test_shutdown_allowed_transitions(self): - # allowed transitions would be from alive via shutdown to immediate - q = self.q_class() - self.assertEqual("alive", q._shutdown_state.value) + with self.assertRaises( + ValueError, msg="Didn't appear to mark all tasks done" + ): + q.task_done() - q.shutdown() - self.assertEqual("shutdown", q._shutdown_state.value) + await self._ping_awaitable(join_task) + self.assertTrue(join_task.done()) + await join_task + async def test_shutdown_immediate_with_unfinished(self): + q = self.q_class() + loop = asyncio.get_running_loop() + q.put_nowait("data") + q.put_nowait("data") + join_task = loop.create_task(q.join()) + self.assertEqual(await q.get(), "data") q.shutdown(immediate=True) - self.assertEqual("shutdown-immediate", q._shutdown_state.value) - q.shutdown(immediate=False) - self.assertNotEqual("shutdown", q._shutdown_state.value) + await self._ping_awaitable(join_task) + self.assertFalse(join_task.done()) - async def _shutdown_all_methods_in_one_task(self, immediate): - q = asyncio.Queue() - await q.put("L") - q.put_nowait("O") - q.shutdown(immediate) - with self.assertRaises(asyncio.QueueShutDown): - await q.put("E") - with self.assertRaises(asyncio.QueueShutDown): - q.put_nowait("W") - - if immediate: - with self.assertRaises(asyncio.QueueShutDown): - await q.get() - with self.assertRaises(asyncio.QueueShutDown): - q.get_nowait() - with self.assertRaises(asyncio.QueueShutDown): - q.task_done() - with self.assertRaises(asyncio.QueueShutDown): - await q.join() - else: - self.assertIn(await q.get(), "LO") - q.task_done() - self.assertIn(q.get_nowait(), "LO") + with self.assertRaisesShutdown(): + await q.put("data") + with self.assertRaisesShutdown(): + q.put_nowait("data") + + with self.assertRaisesShutdown(): + await q.get() + with self.assertRaisesShutdown(): + q.get_nowait() + + q.task_done() + with self.assertRaises( + ValueError, msg="Didn't appear to mark all tasks done" + ): q.task_done() - await q.join() - # on shutdown(immediate=False) - # when queue is empty, should raise ShutDown Exception - with self.assertRaises(asyncio.QueueShutDown): - await q.get() - with self.assertRaises(asyncio.QueueShutDown): - q.get_nowait() - - async def test_shutdown_all_methods_in_one_task(self): - return await self._shutdown_all_methods_in_one_task(False) - - async def test_shutdown_immediate_all_methods_in_one_task(self): - return await self._shutdown_all_methods_in_one_task(True) - - async def _shutdown_putters(self, immediate): - delay = self.delay - q = self.q_class(2) - results = [] - await q.put("E") - await q.put("W") - # queue full - t = asyncio.create_task(q.put("Y")) - await asyncio.sleep(delay) - self.assertTrue(len(q._putters) == 1) - with self.assertRaises(asyncio.QueueShutDown): - # here `t` raises a QueueShuDown - q.shutdown(immediate) - await t - self.assertTrue(not q._putters) - - async def test_shutdown_putters_deque(self): - return await self._shutdown_putters(False) - - async def test_shutdown_immediate_putters_deque(self): - return await self._shutdown_putters(True) - - async def _shutdown_getters(self, immediate): - delay = self.delay - q = self.q_class(1) - results = [] - await q.put("Y") - nb = q.qsize() - # queue full - - asyncio.create_task(q.get()) - await asyncio.sleep(delay) - t = asyncio.create_task(q.get()) - await asyncio.sleep(delay) - self.assertTrue(len(q._getters) == 1) - if immediate: - # here `t` raises a QueueShuDown - with self.assertRaises(asyncio.QueueShutDown): - q.shutdown(immediate) - await t - self.assertTrue(not q._getters) - else: - # here `t` is always pending - q.shutdown(immediate) - await asyncio.sleep(delay) - self.assertTrue(q._getters) - self.assertEqual(q._unfinished_tasks, nb) - - - async def test_shutdown_getters_deque(self): - return await self._shutdown_getters(False) - - async def test_shutdown_immediate_getters_deque(self): - return await self._shutdown_getters(True) - - async def _shutdown_get(self, immediate): - q = self.q_class(2) - results = [] - go = asyncio.Event() - await q.put("Y") - await q.put("D") - nb = q.qsize() - # queue full - - if immediate: - coros = ( - (self._get(q, go, results, shutdown=True)), - (self._get_nowait(q, go, results, shutdown=True)), - ) - else: - coros = ( - # one of these tasks shoud raise Shutdown - (self._get(q, go, results)), - (self._get_nowait(q, go, results)), - (self._get_nowait(q, go, results)), - ) - t = [] - for coro in coros: - t.append(asyncio.create_task(coro)) - t.append(asyncio.create_task(self._shutdown(q, go, immediate))) - res = await asyncio.gather(*t) - if immediate: - self.assertEqual(results, [True]*len(coros)) - else: - self.assertListEqual(sorted(results), [False] + [True]*(len(coros)-1)) - - async def test_shutdown_get(self): - return await self._shutdown_get(False) - - async def test_shutdown_immediate_get(self): - return await self._shutdown_get(True) - - async def test_shutdown_get_task_done_join(self): - q = self.q_class(2) - results = [] - go = asyncio.Event() - await q.put("Y") - await q.put("D") - self.assertEqual(q._unfinished_tasks, q.qsize()) - - # queue full - - coros = ( - (self._get_task_done(q, go, results)), - (self._get_task_done(q, go, results)), - (self._join(q, results)), - (self._join(q, results)), - ) - t = [] - for coro in coros: - t.append(asyncio.create_task(coro)) - t.append(asyncio.create_task(self._shutdown(q, go, False))) - res = await asyncio.gather(*t) - - self.assertEqual(results, [True]*len(coros)) - self.assertIn(t[0].result(), "YD") - self.assertIn(t[1].result(), "YD") - self.assertNotEqual(t[0].result(), t[1].result()) - self.assertEqual(q._unfinished_tasks, 0) - - async def _shutdown_put(self, immediate): - q = self.q_class() - results = [] - go = asyncio.Event() - # queue not empty - - coros = ( - (self._put(q, go, "Y", results, shutdown=True)), - (self._put_nowait(q, go, "D", results, shutdown=True)), - ) - t = [] - for coro in coros: - t.append(asyncio.create_task(coro)) - t.append(asyncio.create_task(self._shutdown(q, go, immediate))) - res = await asyncio.gather(*t) - - self.assertEqual(results, [True]*len(coros)) - - async def test_shutdown_put(self): - return await self._shutdown_put(False) - - async def test_shutdown_immediate_put(self): - return await self._shutdown_put(True) - - async def _shutdown_put_join(self, immediate): - q = self.q_class(2) - results = [] - go = asyncio.Event() - await q.put("Y") - await q.put("D") - nb = q.qsize() - # queue fulled - - async def _cancel_join_task(q, delay, t): - await asyncio.sleep(delay) - t.cancel() - await asyncio.sleep(0) - q._finished.set() - - coros = ( - (self._put(q, go, "E", results, shutdown=True)), - (self._put_nowait(q, go, "W", results, shutdown=True)), - (self._join(q, results, shutdown=True)), - ) - t = [] - for coro in coros: - t.append(asyncio.create_task(coro)) - t.append(asyncio.create_task(self._shutdown(q, go, immediate))) - if not immediate: - # Here calls `join` is a blocking operation - # so wait for a delay and cancel this blocked task - t.append(asyncio.create_task(_cancel_join_task(q, 0.01, t[2]))) - with self.assertRaises(asyncio.CancelledError) as e: - await asyncio.gather(*t) - else: - res = await asyncio.gather(*t) - - self.assertEqual(results, [True]*len(coros)) - self.assertTrue(q._finished.is_set()) - - async def test_shutdown_put_join(self): - return await self._shutdown_put_join(False) - - async def test_shutdown_immediate_put_and_join(self): - return await self._shutdown_put_join(True) + + await self._ping_awaitable(join_task) + self.assertTrue(join_task.done()) + await join_task class QueueShutdownTests( From 1e7813a65f6f89fce249cba4daefe500490d2db2 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 22 Mar 2024 11:05:22 +1000 Subject: [PATCH 07/19] Fix links in what's-new --- Doc/whatsnew/3.13.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/whatsnew/3.13.rst b/Doc/whatsnew/3.13.rst index dfdf304c0dbc37..f43e6eae948784 100644 --- a/Doc/whatsnew/3.13.rst +++ b/Doc/whatsnew/3.13.rst @@ -275,8 +275,8 @@ asyncio forcefully close an asyncio server. (Contributed by Pierre Ossman in :gh:`113538`.) -* Add :meth:`asyncio.queues.Queue.shutdown` (along with - :exc:`asyncio.queues.QueueShutDown`) for queue termination. +* Add :meth:`asyncio.Queue.shutdown` (along with + :exc:`asyncio.QueueShutDown`) for queue termination. (Contributed by Laurie Opperman in :gh:`104228`.) base64 From eec29bbe8991cbae4fb092d8e3bb72a70981a8e0 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Fri, 22 Mar 2024 16:46:43 +1000 Subject: [PATCH 08/19] Fix formatting in news entry --- .../next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst b/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst index 989a7411168a19..128a85d3d73ddf 100644 --- a/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst +++ b/Misc/NEWS.d/next/Library/2023-05-06-05-00-42.gh-issue-96471.S3X5I-.rst @@ -1,2 +1,2 @@ -Add :py:class:`asyncio.Queue`` termination with +Add :py:class:`asyncio.Queue` termination with :py:meth:`~asyncio.Queue.shutdown` method. From a233830ae07fd9d6566b93e463cf43273e3b6237 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 26 Mar 2024 13:35:33 +1000 Subject: [PATCH 09/19] Improve tests * Explicit 'immediate' argument * All tests test '*_nowait' * All tests check 'qsize' --- Lib/test/test_asyncio/test_queues.py | 23 +++++++++++++++++------ 1 file changed, 17 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 364f95a2951724..0f363b41c5337b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -544,14 +544,23 @@ async def test_shutdown_empty(self): q = self.q_class() loop = asyncio.get_running_loop() join_task = loop.create_task(q.join()) - q.shutdown() + q.shutdown(immediate=False) # unfinished tasks: 0 -> 0 + + self.assertEqual(q.qsize(), 0) await self._ping_awaitable(join_task) self.assertTrue(join_task.done()) + with self.assertRaisesShutdown(): await q.put("data") + with self.assertRaisesShutdown(): + q.put_nowait("data") + with self.assertRaisesShutdown(): await q.get() + with self.assertRaisesShutdown(): + q.get_nowait() + await join_task async def test_shutdown_nonempty(self): @@ -559,7 +568,9 @@ async def test_shutdown_nonempty(self): loop = asyncio.get_running_loop() q.put_nowait("data") join_task = loop.create_task(q.join()) - q.shutdown() + q.shutdown(immediate=False) # unfinished tasks: 1 -> 1 + + self.assertEqual(q.qsize(), 1) self.assertEqual(await q.get(), "data") @@ -587,7 +598,9 @@ async def test_shutdown_immediate(self): loop = asyncio.get_running_loop() q.put_nowait("data") join_task = loop.create_task(q.join()) - q.shutdown(immediate=True) + q.shutdown(immediate=True) # unfinished tasks: 1 -> 0 + + self.assertEqual(q.qsize(), 0) await self._ping_awaitable(join_task) self.assertTrue(join_task.done()) @@ -607,8 +620,6 @@ async def test_shutdown_immediate(self): ): q.task_done() - await self._ping_awaitable(join_task) - self.assertTrue(join_task.done()) await join_task async def test_shutdown_immediate_with_unfinished(self): @@ -618,7 +629,7 @@ async def test_shutdown_immediate_with_unfinished(self): q.put_nowait("data") join_task = loop.create_task(q.join()) self.assertEqual(await q.get(), "data") - q.shutdown(immediate=True) + q.shutdown(immediate=True) # unfinished tasks: 2 -> 1 await self._ping_awaitable(join_task) self.assertFalse(join_task.done()) From 420a2475ec8bb9a711888978e1e718250e706114 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Tue, 26 Mar 2024 19:30:43 +1000 Subject: [PATCH 10/19] Improve tests even more * Add missing 'qsize' check * Test unblocked put on shutdown * Await 'join_task' near task-done check --- Lib/test/test_asyncio/test_queues.py | 29 ++++++++++++++++++++++------ 1 file changed, 23 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 0f363b41c5337b..b848d2a1809e2b 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -527,8 +527,14 @@ class _QueueShutdownTestMixin: @staticmethod async def _ping_awaitable(a): + async def swallow(a_): + try: + return await a_ + except Exception: + pass + try: - await asyncio.wait_for(asyncio.shield(a), 0.01) + await asyncio.wait_for(asyncio.shield(swallow(a)), 0.01) except TimeoutError: pass @@ -550,6 +556,7 @@ async def test_shutdown_empty(self): await self._ping_awaitable(join_task) self.assertTrue(join_task.done()) + await join_task with self.assertRaisesShutdown(): await q.put("data") @@ -561,17 +568,26 @@ async def test_shutdown_empty(self): with self.assertRaisesShutdown(): q.get_nowait() - await join_task - async def test_shutdown_nonempty(self): - q = self.q_class() + q = self.q_class(maxsize=1) loop = asyncio.get_running_loop() + q.put_nowait("data") join_task = loop.create_task(q.join()) + put_task = loop.create_task(q.put("data2")) + + await self._ping_awaitable(put_task) + self.assertFalse(put_task.done()) + q.shutdown(immediate=False) # unfinished tasks: 1 -> 1 self.assertEqual(q.qsize(), 1) + await self._ping_awaitable(put_task) + self.assertTrue(put_task.done()) + with self.assertRaisesShutdown(): + await put_task + self.assertEqual(await q.get(), "data") await self._ping_awaitable(join_task) @@ -604,6 +620,7 @@ async def test_shutdown_immediate(self): await self._ping_awaitable(join_task) self.assertTrue(join_task.done()) + await join_task with self.assertRaisesShutdown(): await q.put("data") @@ -620,8 +637,6 @@ async def test_shutdown_immediate(self): ): q.task_done() - await join_task - async def test_shutdown_immediate_with_unfinished(self): q = self.q_class() loop = asyncio.get_running_loop() @@ -631,6 +646,8 @@ async def test_shutdown_immediate_with_unfinished(self): self.assertEqual(await q.get(), "data") q.shutdown(immediate=True) # unfinished tasks: 2 -> 1 + self.assertEqual(q.qsize(), 0) + await self._ping_awaitable(join_task) self.assertFalse(join_task.done()) From 6d9edd6cd768367ce72f8791c5307d9b46400822 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Wed, 27 Mar 2024 18:03:46 +1000 Subject: [PATCH 11/19] Document tests --- Lib/test/test_asyncio/test_queues.py | 65 +++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index b848d2a1809e2b..3c25def71121ac 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -526,15 +526,17 @@ class _QueueShutdownTestMixin: q_class = None @staticmethod - async def _ping_awaitable(a): - async def swallow(a_): + async def _ensure_started(task): + # Explicitly start (if not already) task + + async def swallow(x): try: - return await a_ + return await x except Exception: pass try: - await asyncio.wait_for(asyncio.shield(swallow(a)), 0.01) + await asyncio.wait_for(asyncio.shield(swallow(task)), 0.01) except TimeoutError: pass @@ -547,17 +549,24 @@ async def test_format(self): self.assertEqual(q._format(), 'maxsize=0 shutdown') async def test_shutdown_empty(self): + # Test shutting down an empty queue + + # Setup empty queue and join() task q = self.q_class() loop = asyncio.get_running_loop() join_task = loop.create_task(q.join()) + + # Perform shut-down q.shutdown(immediate=False) # unfinished tasks: 0 -> 0 self.assertEqual(q.qsize(), 0) - await self._ping_awaitable(join_task) + # Ensure join() task has successfully finished + await self._ensure_started(join_task) self.assertTrue(join_task.done()) await join_task + # Ensure put() and get() raise ShutDown with self.assertRaisesShutdown(): await q.put("data") with self.assertRaisesShutdown(): @@ -569,6 +578,9 @@ async def test_shutdown_empty(self): q.get_nowait() async def test_shutdown_nonempty(self): + # Test shutting down a non-empty queue + + # Setup full queue with 1 item, and join() and put() tasks q = self.q_class(maxsize=1) loop = asyncio.get_running_loop() @@ -576,23 +588,29 @@ async def test_shutdown_nonempty(self): join_task = loop.create_task(q.join()) put_task = loop.create_task(q.put("data2")) - await self._ping_awaitable(put_task) + # Ensure put() task is not finished + await self._ensure_started(put_task) self.assertFalse(put_task.done()) + # Perform shut-down q.shutdown(immediate=False) # unfinished tasks: 1 -> 1 self.assertEqual(q.qsize(), 1) - await self._ping_awaitable(put_task) + # Ensure put() task is finished, and raised ShutDown + await self._ensure_started(put_task) self.assertTrue(put_task.done()) with self.assertRaisesShutdown(): await put_task + # Ensure get() succeeds on enqueued item self.assertEqual(await q.get(), "data") - await self._ping_awaitable(join_task) + # Ensure join() task is not finished + await self._ensure_started(join_task) self.assertFalse(join_task.done()) + # Ensure put() and get() raise ShutDown with self.assertRaisesShutdown(): await q.put("data") with self.assertRaisesShutdown(): @@ -603,25 +621,38 @@ async def test_shutdown_nonempty(self): with self.assertRaisesShutdown(): q.get_nowait() + # Ensure there is 1 unfinished task q.task_done() + with self.assertRaises( + ValueError, msg="Didn't appear to mark all tasks done" + ): + q.task_done() - await self._ping_awaitable(join_task) + # Ensure join() task has successfully finished + await self._ensure_started(join_task) self.assertTrue(join_task.done()) await join_task async def test_shutdown_immediate(self): + # Test immediately shutting down a queue + + # Setup queue with 1 item, and a join() task q = self.q_class() loop = asyncio.get_running_loop() q.put_nowait("data") join_task = loop.create_task(q.join()) + + # Perform shut-down q.shutdown(immediate=True) # unfinished tasks: 1 -> 0 self.assertEqual(q.qsize(), 0) - await self._ping_awaitable(join_task) + # Ensure join() task has successfully finished + await self._ensure_started(join_task) self.assertTrue(join_task.done()) await join_task + # Ensure put() and get() raise ShutDown with self.assertRaisesShutdown(): await q.put("data") with self.assertRaisesShutdown(): @@ -632,25 +663,33 @@ async def test_shutdown_immediate(self): with self.assertRaisesShutdown(): q.get_nowait() + # Ensure there are no unfinished tasks with self.assertRaises( ValueError, msg="Didn't appear to mark all tasks done" ): q.task_done() async def test_shutdown_immediate_with_unfinished(self): + # Test immediately shutting down a queue with unfinished tasks + + # Setup queue with 2 items (1 retrieved), and a join() task q = self.q_class() loop = asyncio.get_running_loop() q.put_nowait("data") q.put_nowait("data") join_task = loop.create_task(q.join()) self.assertEqual(await q.get(), "data") + + # Perform shut-down q.shutdown(immediate=True) # unfinished tasks: 2 -> 1 self.assertEqual(q.qsize(), 0) - await self._ping_awaitable(join_task) + # Ensure join() task is not finished + await self._ensure_started(join_task) self.assertFalse(join_task.done()) + # Ensure put() and get() raise ShutDown with self.assertRaisesShutdown(): await q.put("data") with self.assertRaisesShutdown(): @@ -661,13 +700,15 @@ async def test_shutdown_immediate_with_unfinished(self): with self.assertRaisesShutdown(): q.get_nowait() + # Ensure there is 1 unfinished task q.task_done() with self.assertRaises( ValueError, msg="Didn't appear to mark all tasks done" ): q.task_done() - await self._ping_awaitable(join_task) + # Ensure join() task has successfully finished + await self._ensure_started(join_task) self.assertTrue(join_task.done()) await join_task From ddc6ad60276acc55620de6fa7683eedb9846be71 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 28 Mar 2024 15:55:30 +1000 Subject: [PATCH 12/19] Always allow getters to re-check queue empty --- Lib/asyncio/queues.py | 8 ++++---- Lib/test/test_asyncio/test_queues.py | 9 ++++++++- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 174b0f68ec17bc..37f7ec84acd0c2 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -272,10 +272,10 @@ def shutdown(self, immediate=False): self._unfinished_tasks -= 1 if self._unfinished_tasks <= 0: self._finished.set() - while self._getters: - getter = self._getters.popleft() - if not getter.done(): - getter.set_result(None) + while self._getters: + getter = self._getters.popleft() + if not getter.done(): + getter.set_result(None) while self._putters: putter = self._putters.popleft() if not putter.done(): diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 3c25def71121ac..a5fd204c854302 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -551,10 +551,12 @@ async def test_format(self): async def test_shutdown_empty(self): # Test shutting down an empty queue - # Setup empty queue and join() task + # Setup empty queue, and join() and get() tasks q = self.q_class() loop = asyncio.get_running_loop() join_task = loop.create_task(q.join()) + get_task = loop.create_task(q.get()) + await self._ensure_started(get_task) # want pending before shutdown # Perform shut-down q.shutdown(immediate=False) # unfinished tasks: 0 -> 0 @@ -566,6 +568,11 @@ async def test_shutdown_empty(self): self.assertTrue(join_task.done()) await join_task + # Ensure get() task is finished, and raised ShutDown + self.assertTrue(get_task.done()) + with self.assertRaisesShutdown(): + await get_task + # Ensure put() and get() raise ShutDown with self.assertRaisesShutdown(): await q.put("data") From aef4063ebae34ba3511ab83ca568b3262d34619a Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:20:35 +1000 Subject: [PATCH 13/19] Simplify shutdown-check in put and get --- Lib/asyncio/queues.py | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 37f7ec84acd0c2..23226fd0c0470c 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -130,9 +130,9 @@ async def put(self, item): Raises QueueShutDown if the queue has been shut down. """ - if self._is_shutdown: - raise QueueShutDown while self.full(): + if self._is_shutdown: + raise QueueShutDown putter = self._get_loop().create_future() self._putters.append(putter) try: @@ -151,8 +151,6 @@ async def put(self, item): # the call. Wake up the next in line. self._wakeup_next(self._putters) raise - if self._is_shutdown: - raise QueueShutDown return self.put_nowait(item) def put_nowait(self, item): @@ -179,9 +177,9 @@ async def get(self): Raises QueueShutDown if the queue has been shut down and is empty, or if the queue has been shut down immediately. """ - if self._is_shutdown and self.empty(): - raise QueueShutDown while self.empty(): + if self._is_shutdown and self.empty(): + raise QueueShutDown getter = self._get_loop().create_future() self._getters.append(getter) try: @@ -200,8 +198,6 @@ async def get(self): # the call. Wake up the next in line. self._wakeup_next(self._getters) raise - if self._is_shutdown and self.empty(): - raise QueueShutDown return self.get_nowait() def get_nowait(self): From d49c6dd579a1ad3141dc01882d45b5442c4ff327 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:20:51 +1000 Subject: [PATCH 14/19] Format shutdown docstring --- Lib/asyncio/queues.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index 23226fd0c0470c..b0f4e3d663a68a 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -251,8 +251,7 @@ async def join(self): await self._finished.wait() def shutdown(self, immediate=False): - """Shut-down the queue, making queue gets and puts raise - QueueShutDown. + """Shut-down the queue, making queue gets and puts raise QueueShutDown. By default, gets will only raise once the queue is empty. Set 'immediate' to True to make gets raise immediately instead. From 5a435a63bad1449e1526f2753eb21f2cced373fb Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:21:18 +1000 Subject: [PATCH 15/19] Check for 0 unfinised tasks in shutdown --- Lib/asyncio/queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/queues.py b/Lib/asyncio/queues.py index b0f4e3d663a68a..b8156704b8fc23 100644 --- a/Lib/asyncio/queues.py +++ b/Lib/asyncio/queues.py @@ -265,7 +265,7 @@ def shutdown(self, immediate=False): self._get() if self._unfinished_tasks > 0: self._unfinished_tasks -= 1 - if self._unfinished_tasks <= 0: + if self._unfinished_tasks == 0: self._finished.set() while self._getters: getter = self._getters.popleft() From c8db40ed8c4b3eb5722217b6905db07873b9d535 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:23:25 +1000 Subject: [PATCH 16/19] Use asyncio.sleep to run other tasks --- Lib/test/test_asyncio/test_queues.py | 32 ++++++++-------------------- 1 file changed, 9 insertions(+), 23 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index a5fd204c854302..b490354d435047 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -525,21 +525,6 @@ class PriorityQueueJoinTests(_QueueJoinTestMixin, unittest.IsolatedAsyncioTestCa class _QueueShutdownTestMixin: q_class = None - @staticmethod - async def _ensure_started(task): - # Explicitly start (if not already) task - - async def swallow(x): - try: - return await x - except Exception: - pass - - try: - await asyncio.wait_for(asyncio.shield(swallow(task)), 0.01) - except TimeoutError: - pass - def assertRaisesShutdown(self, msg="Didn't appear to shut-down queue"): return self.assertRaises(asyncio.QueueShutDown, msg=msg) @@ -556,7 +541,7 @@ async def test_shutdown_empty(self): loop = asyncio.get_running_loop() join_task = loop.create_task(q.join()) get_task = loop.create_task(q.get()) - await self._ensure_started(get_task) # want pending before shutdown + await asyncio.sleep(0) # want get task pending before shutdown # Perform shut-down q.shutdown(immediate=False) # unfinished tasks: 0 -> 0 @@ -569,6 +554,7 @@ async def test_shutdown_empty(self): await join_task # Ensure get() task is finished, and raised ShutDown + await asyncio.sleep(0) self.assertTrue(get_task.done()) with self.assertRaisesShutdown(): await get_task @@ -596,7 +582,7 @@ async def test_shutdown_nonempty(self): put_task = loop.create_task(q.put("data2")) # Ensure put() task is not finished - await self._ensure_started(put_task) + await asyncio.sleep(0) self.assertFalse(put_task.done()) # Perform shut-down @@ -605,7 +591,7 @@ async def test_shutdown_nonempty(self): self.assertEqual(q.qsize(), 1) # Ensure put() task is finished, and raised ShutDown - await self._ensure_started(put_task) + await asyncio.sleep(0) self.assertTrue(put_task.done()) with self.assertRaisesShutdown(): await put_task @@ -614,7 +600,7 @@ async def test_shutdown_nonempty(self): self.assertEqual(await q.get(), "data") # Ensure join() task is not finished - await self._ensure_started(join_task) + await asyncio.sleep(0) self.assertFalse(join_task.done()) # Ensure put() and get() raise ShutDown @@ -636,7 +622,7 @@ async def test_shutdown_nonempty(self): q.task_done() # Ensure join() task has successfully finished - await self._ensure_started(join_task) + await asyncio.sleep(0) self.assertTrue(join_task.done()) await join_task @@ -655,7 +641,7 @@ async def test_shutdown_immediate(self): self.assertEqual(q.qsize(), 0) # Ensure join() task has successfully finished - await self._ensure_started(join_task) + await asyncio.sleep(0) self.assertTrue(join_task.done()) await join_task @@ -693,7 +679,7 @@ async def test_shutdown_immediate_with_unfinished(self): self.assertEqual(q.qsize(), 0) # Ensure join() task is not finished - await self._ensure_started(join_task) + await asyncio.sleep(0) self.assertFalse(join_task.done()) # Ensure put() and get() raise ShutDown @@ -715,7 +701,7 @@ async def test_shutdown_immediate_with_unfinished(self): q.task_done() # Ensure join() task has successfully finished - await self._ensure_started(join_task) + await asyncio.sleep(0) self.assertTrue(join_task.done()) await join_task From ca01ee109f1cfe9273217f21304ae7949ad2c5c1 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:23:41 +1000 Subject: [PATCH 17/19] Use public method to shut down queue in format test --- Lib/test/test_asyncio/test_queues.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index b490354d435047..610bd91c7d8c63 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -530,7 +530,7 @@ def assertRaisesShutdown(self, msg="Didn't appear to shut-down queue"): async def test_format(self): q = self.q_class() - q._is_shutdown = True + q.shutdown() self.assertEqual(q._format(), 'maxsize=0 shutdown') async def test_shutdown_empty(self): From b02c4dd3bb5f00e86cccea2b3107aa3d7721208e Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:24:23 +1000 Subject: [PATCH 18/19] Only start queue join after shutdown in test --- Lib/test/test_asyncio/test_queues.py | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index 610bd91c7d8c63..e90befb93e34c5 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -539,7 +539,6 @@ async def test_shutdown_empty(self): # Setup empty queue, and join() and get() tasks q = self.q_class() loop = asyncio.get_running_loop() - join_task = loop.create_task(q.join()) get_task = loop.create_task(q.get()) await asyncio.sleep(0) # want get task pending before shutdown @@ -548,10 +547,8 @@ async def test_shutdown_empty(self): self.assertEqual(q.qsize(), 0) - # Ensure join() task has successfully finished - await self._ensure_started(join_task) - self.assertTrue(join_task.done()) - await join_task + # Ensure join() task successfully finishes + await q.join() # Ensure get() task is finished, and raised ShutDown await asyncio.sleep(0) From 8deca77fd9d12e6f501097d057f31888c170f9b0 Mon Sep 17 00:00:00 2001 From: Laurie O Date: Thu, 4 Apr 2024 11:25:51 +1000 Subject: [PATCH 19/19] Test join before failing task-done --- Lib/test/test_asyncio/test_queues.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/Lib/test/test_asyncio/test_queues.py b/Lib/test/test_asyncio/test_queues.py index e90befb93e34c5..5019e9a293525d 100644 --- a/Lib/test/test_asyncio/test_queues.py +++ b/Lib/test/test_asyncio/test_queues.py @@ -611,18 +611,18 @@ async def test_shutdown_nonempty(self): with self.assertRaisesShutdown(): q.get_nowait() - # Ensure there is 1 unfinished task + # Ensure there is 1 unfinished task, and join() task succeeds q.task_done() - with self.assertRaises( - ValueError, msg="Didn't appear to mark all tasks done" - ): - q.task_done() - # Ensure join() task has successfully finished await asyncio.sleep(0) self.assertTrue(join_task.done()) await join_task + with self.assertRaises( + ValueError, msg="Didn't appear to mark all tasks done" + ): + q.task_done() + async def test_shutdown_immediate(self): # Test immediately shutting down a queue