Skip to content

gh-128041: Add a terminate_workers method to ProcessPoolExecutor #128043

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 36 commits into from
Mar 3, 2025
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
47b162a
gh-128041 - Add a terminate_workers method to ProcessPoolExecutor
csm10495 Dec 17, 2024
6ef8833
📜🤖 Added by blurb_it.
blurb-it[bot] Dec 17, 2024
61c9b14
Fix lint
csm10495 Dec 17, 2024
3bf5464
Swap to SIGTERM as the default
csm10495 Dec 17, 2024
4b285b8
Add some tests
csm10495 Dec 17, 2024
b4939fd
Update some docs
csm10495 Dec 17, 2024
ba6a4c0
Fix docs
csm10495 Dec 17, 2024
5d58e50
Fix docs
csm10495 Dec 17, 2024
0db381b
PR fixes/updates
csm10495 Dec 17, 2024
7ae1685
SIGKILL doesn't exist on windows
csm10495 Dec 17, 2024
f7ad96c
Update Lib/concurrent/futures/process.py
csm10495 Dec 17, 2024
2c0b578
Apply suggestions from code review
csm10495 Dec 18, 2024
a878221
Fix indenting from suggestions
csm10495 Dec 18, 2024
794ee25
Internally call shutdown to prevent a resource leak when calling term…
csm10495 Dec 20, 2024
64693a7
Change test to not validate calling of os.kill since shutdown may cal…
csm10495 Dec 20, 2024
926dff1
Commit to retrigger CI
csm10495 Dec 20, 2024
6d77c10
Merge branch 'python:main' into terminate_workers
csm10495 Feb 25, 2025
4429b2f
PR feedback. Split terminate_workers into terminate_workers and kill_…
csm10495 Feb 25, 2025
b8d6e5f
Remove un-needed imports
csm10495 Feb 25, 2025
f9a7714
lint
csm10495 Feb 25, 2025
7cfa42e
Harden a test a bit to ensure the correct type of kill/terminate was …
csm10495 Feb 25, 2025
2b31fab
rekick ci
csm10495 Feb 25, 2025
ad15ee5
Allow more time for queue to get data back
csm10495 Feb 25, 2025
0f57912
Use subTest to break up tests
csm10495 Feb 26, 2025
c16fde5
Merge branch 'main' into terminate_workers
csm10495 Feb 26, 2025
1bedb28
Apply suggestions from code review
csm10495 Feb 27, 2025
f1b0cf6
PR feedback: swap to dict with constants, better subtest parameteriza…
csm10495 Feb 27, 2025
cc5f359
swap to using context in the test
csm10495 Feb 27, 2025
b3cc8a2
trailing whitespace
csm10495 Feb 27, 2025
dbf9d32
Various pr feedbacks
csm10495 Mar 2, 2025
1e16da6
PR feedback: swap name of terminate_or_kill to force_shutdown
csm10495 Mar 2, 2025
52a5326
PR feedback: swap test names
csm10495 Mar 2, 2025
7f09586
PR feedback: use self.executor_type instead of ProcessPoolExecutor di…
csm10495 Mar 2, 2025
d5f7578
Add constants for terminate/kill methods
csm10495 Mar 2, 2025
0e42eca
feedback to get below 80 chars per pep8
csm10495 Mar 2, 2025
3f55347
Merge branch 'main' into terminate_workers
csm10495 Mar 2, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,18 @@ to a :class:`ProcessPoolExecutor` will result in deadlock.
require the *fork* start method for :class:`ProcessPoolExecutor` you must
explicitly pass ``mp_context=multiprocessing.get_context("fork")``.

.. method:: terminate_workers(signal=signal.SIGTERM)

Attempt to terminate all living worker processes immediately by sending
each of them the given signal. If the signal is not specified, the default
signal :data:`signal.SIGTERM` is used.

After calling this method the caller should no longer submit tasks to the
executor. It is also recommended to still call :meth:`Executor.shutdown`
to ensure that all other resources associated with the executor are freed.

.. versionadded:: next

.. _processpoolexecutor-example:

ProcessPoolExecutor Example
Expand Down
4 changes: 4 additions & 0 deletions Doc/whatsnew/3.14.rst
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,10 @@ concurrent.futures
supplying a *mp_context* to :class:`concurrent.futures.ProcessPoolExecutor`.
(Contributed by Gregory P. Smith in :gh:`84559`.)

* Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
a way to terminate all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)

ctypes
------

Expand Down
29 changes: 29 additions & 0 deletions Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import weakref
from functools import partial
import itertools
import signal
import sys
from traceback import format_exception

Expand Down Expand Up @@ -855,3 +856,31 @@ def shutdown(self, wait=True, *, cancel_futures=False):
self._executor_manager_thread_wakeup = None

shutdown.__doc__ = _base.Executor.shutdown.__doc__

def terminate_workers(self, signal=signal.SIGTERM):
"""Attempts to terminate the executor's workers using the given signal.
Iterates through all of the current processes and sends the given signal if
the process is still alive.

After terminating workers, the pool will be in a broken state and no longer usable.

Args:
signal: The signal to send to each worker process. Defaults to
signal.SIGTERM.
"""
if not self._processes:
return

for pid, proc in self._processes.items():
try:
is_alive = proc.is_alive()
except ValueError:
# The process is already exited/closed out.
is_alive = False

if is_alive:
try:
os.kill(pid, signal)
except ProcessLookupError:
# The process just ended before our signal
pass
73 changes: 73 additions & 0 deletions Lib/test/test_concurrent_futures/test_process_pool.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
import multiprocessing
import os
import queue
import signal
import sys
import threading
import time
import unittest
import unittest.mock
from concurrent import futures
from concurrent.futures.process import BrokenProcessPool

Expand All @@ -22,6 +26,12 @@ def __init__(self, mgr):
def __del__(self):
self.event.set()

def _put_sleep_put(queue):
""" Used as part of test_process_pool_executor_terminate_workers """
queue.put('started')
time.sleep(2)
queue.put('finished')


class ProcessPoolExecutorTest(ExecutorTest):

Expand Down Expand Up @@ -218,6 +228,69 @@ def mock_start_new_thread(func, *args, **kwargs):
list(executor.map(mul, [(2, 3)] * 10))
executor.shutdown()

def test_process_pool_executor_terminate_workers(self):
manager = multiprocessing.Manager()
q = manager.Queue()

with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(_put_sleep_put, q)

# We should get started, but not finished since we'll terminate the workers just after
self.assertEqual(q.get(timeout=1), 'started')

executor.terminate_workers()

self.assertRaises(queue.Empty, q.get, timeout=1)


def test_process_pool_executor_terminate_workers_dead_workers(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(os._exit, 1)
self.assertRaises(BrokenProcessPool, future.result)

# Patching in here instead of at the function level since we only want
# to patch it for this function call, not other parts of the flow.
with unittest.mock.patch('concurrent.futures.process.os.kill') as mock_kill:
executor.terminate_workers()

mock_kill.assert_not_called()

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_not_started_yet(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
# The worker has not been started yet, terminate_workers should basically no-op
executor.terminate_workers()

mock_kill.assert_not_called()

def test_process_pool_executor_terminate_workers_stops_pool(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
executor.submit(time.sleep, 0).result()

executor.terminate_workers()

future = executor.submit(time.sleep, 0)
self.assertRaises(BrokenProcessPool, future.result)

@unittest.mock.patch('concurrent.futures.process.os.kill')
def test_process_pool_executor_terminate_workers_passes_signal(self, mock_kill):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 0)
future.result()

executor.terminate_workers(signal.SIGABRT)

worker_process = list(executor._processes.values())[0]
mock_kill.assert_called_once_with(worker_process.pid, signal.SIGABRT)

def test_process_pool_executor_terminate_workers_passes_even_bad_signals(self):
with futures.ProcessPoolExecutor(max_workers=1) as executor:
future = executor.submit(time.sleep, 0)
future.result()

# 'potatoes' isn't a valid signal, so os.kill will raise a TypeError
self.assertRaises(TypeError, executor.terminate_workers, 'potatoes')


create_executor_tests(globals(), ProcessPoolExecutorTest,
executor_mixins=(ProcessPoolForkMixin,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Add :meth:`concurrent.futures.ProcessPoolExecutor.terminate_workers` as
a way to terminate all living worker processes in the given pool.
(Contributed by Charles Machalow in :gh:`128043`.)
Loading