Skip to content

GH-82448: Add thread timeout for loop.shutdown_default_executor #97561

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

11 changes: 10 additions & 1 deletion Doc/library/asyncio-eventloop.rst
Original file line number Diff line number Diff line change
Expand Up @@ -180,18 +180,27 @@ Running and stopping the loop

.. versionadded:: 3.6

.. coroutinemethod:: loop.shutdown_default_executor()
.. coroutinemethod:: loop.shutdown_default_executor(timeout=None)

Schedule the closure of the default executor and wait for it to join all of
the threads in the :class:`ThreadPoolExecutor`. After calling this method, a
:exc:`RuntimeError` will be raised if :meth:`loop.run_in_executor` is called
while using the default executor.

The *timeout* parameter specifies the amount of time the executor will
be given to finish joining. The default value is ``None``, which means the
executor will be given an unlimited amount of time.

If the timeout duration is reached, a warning is emitted and executor is
terminated without waiting for its threads to finish joining.

Note that there is no need to call this function when
:func:`asyncio.run` is used.

.. versionadded:: 3.9

.. versionchanged:: 3.12
Added the *timeout* parameter.

Scheduling callbacks
^^^^^^^^^^^^^^^^^^^^
Expand Down
6 changes: 5 additions & 1 deletion Doc/library/asyncio-runner.rst
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Running an asyncio Program

This function runs the passed coroutine, taking care of
managing the asyncio event loop, *finalizing asynchronous
generators*, and closing the threadpool.
generators*, and closing the executor.

This function cannot be called when another asyncio event loop is
running in the same thread.
Expand All @@ -41,6 +41,10 @@ Running an asyncio Program
the end. It should be used as a main entry point for asyncio
programs, and should ideally only be called once.

The executor is given a timeout duration of 5 minutes to shutdown.
If the executor hasn't finished within that duration, a warning is
emitted and the executor is closed.

Example::

async def main():
Expand Down
17 changes: 14 additions & 3 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -561,8 +561,13 @@ async def shutdown_asyncgens(self):
'asyncgen': agen
})

async def shutdown_default_executor(self):
"""Schedule the shutdown of the default executor."""
async def shutdown_default_executor(self, timeout=None):
"""Schedule the shutdown of the default executor.

The timeout parameter specifies the amount of time the executor will
be given to finish joining. The default value is None, which means
that the executor will be given an unlimited amount of time.
"""
self._executor_shutdown_called = True
if self._default_executor is None:
return
Expand All @@ -572,7 +577,13 @@ async def shutdown_default_executor(self):
try:
await future
finally:
thread.join()
thread.join(timeout)

if thread.is_alive():
warnings.warn("The executor did not finishing joining "
f"its threads within {timeout} seconds.",
RuntimeWarning, stacklevel=2)
self._default_executor.shutdown(wait=False)

def _do_shutdown(self, future):
try:
Expand Down
3 changes: 3 additions & 0 deletions Lib/asyncio/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
FLOW_CONTROL_HIGH_WATER_SSL_READ = 256 # KiB
FLOW_CONTROL_HIGH_WATER_SSL_WRITE = 512 # KiB

# Default timeout for joining the threads in the threadpool
THREAD_JOIN_TIMEOUT = 300

# The enum should be here to break circular dependencies between
# base_events and sslproto
class _SendfileMode(enum.Enum):
Expand Down
13 changes: 9 additions & 4 deletions Lib/asyncio/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from . import events
from . import exceptions
from . import tasks

from . import constants

class _State(enum.Enum):
CREATED = "created"
Expand Down Expand Up @@ -69,7 +69,8 @@ def close(self):
loop = self._loop
_cancel_all_tasks(loop)
loop.run_until_complete(loop.shutdown_asyncgens())
loop.run_until_complete(loop.shutdown_default_executor())
loop.run_until_complete(
loop.shutdown_default_executor(constants.THREAD_JOIN_TIMEOUT))
finally:
if self._set_event_loop:
events.set_event_loop(None)
Expand Down Expand Up @@ -160,8 +161,8 @@ def run(main, *, debug=None):
"""Execute the coroutine and return the result.

This function runs the passed coroutine, taking care of
managing the asyncio event loop and finalizing asynchronous
generators.
managing the asyncio event loop, finalizing asynchronous
generators and closing the default executor.

This function cannot be called when another asyncio event loop is
running in the same thread.
Expand All @@ -172,6 +173,10 @@ def run(main, *, debug=None):
It should be used as a main entry point for asyncio programs, and should
ideally only be called once.

The executor is given a timeout duration of 5 minutes to shutdown.
If the executor hasn't finished within that duration, a warning is
emitted and the executor is closed.

Example:

async def main():
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add *timeout* parameter to :meth:`asyncio.loop.shutdown_default_executor`.
The default value is ``None``, which means the executor will be given an unlimited amount of time.
When called from :class:`asyncio.Runner` or :func:`asyncio.run`, the default timeout is 5 minutes.