diff --git a/Lib/multiprocessing/resource_tracker.py b/Lib/multiprocessing/resource_tracker.py index 8e41f461cc934e..02403996bc9bdd 100644 --- a/Lib/multiprocessing/resource_tracker.py +++ b/Lib/multiprocessing/resource_tracker.py @@ -61,6 +61,7 @@ def __init__(self): self._lock = threading.RLock() self._fd = None self._pid = None + self._exitcode = None def _reentrant_call_error(self): # gh-109629: this happens if an explicit call to the ResourceTracker @@ -84,9 +85,16 @@ def _stop(self): os.close(self._fd) self._fd = None - os.waitpid(self._pid, 0) + _, status = os.waitpid(self._pid, 0) + self._pid = None + try: + self._exitcode = os.waitstatus_to_exitcode(status) + except ValueError: + # os.waitstatus_to_exitcode may raise an exception for invalid values + self._exitcode = None + def getfd(self): self.ensure_running() return self._fd @@ -119,6 +127,7 @@ def ensure_running(self): pass self._fd = None self._pid = None + self._exitcode = None warnings.warn('resource_tracker: process died unexpectedly, ' 'relaunching. Some resources might leak.') @@ -221,6 +230,8 @@ def main(fd): pass cache = {rtype: set() for rtype in _CLEANUP_FUNCS.keys()} + exit_code = 0 + try: # keep track of registered/unregistered resources with open(fd, 'rb') as f: @@ -251,6 +262,7 @@ def main(fd): for rtype, rtype_cache in cache.items(): if rtype_cache: try: + exit_code = 1 warnings.warn( f'resource_tracker: There appear to be {len(rtype_cache)} ' f'leaked {rtype} objects to clean up at shutdown: {rtype_cache}' @@ -268,3 +280,5 @@ def main(fd): warnings.warn('resource_tracker: %r: %s' % (name, e)) finally: pass + + sys.exit(exit_code) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index c0d3ca50f17d69..56d116a73ebb58 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -5730,6 +5730,47 @@ def test_too_long_name_resource(self): with self.assertRaises(ValueError): resource_tracker.register(too_long_name_resource, rtype) + def _test_resource_tracker_leak_resources(self, context, delete_queue): + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker.ensure_running() + self.assertTrue(_resource_tracker._check_alive()) + + # Reset exit code value + _resource_tracker._exitcode = None + + mp_context = multiprocessing.get_context(context) + + # Keep it on variable, so it won't be cleared yet + q = mp_context.Queue() + if delete_queue: + # Clearing the queue resource to be sure explicitly with deleting + # and gc.collect + q.close() + del q + gc.collect() + expected_exit_code = 0 + else: + expected_exit_code = 1 + + self.assertIsNone(_resource_tracker._exitcode) + _resource_tracker._stop() + + self.assertEqual(_resource_tracker._exitcode, expected_exit_code) + + def test_resource_tracker_exit_code(self): + """ + Test the exit code of the resource tracker based on if there were left + leaked resources when we stop the process. If not leaked resources were + found, exit code should be 0, otherwise 1 + """ + for context in ["spawn", "forkserver"]: + for delete_queue in [True, False]: + with self.subTest(context=context, delete_queue=delete_queue): + self._test_resource_tracker_leak_resources( + context=context, + delete_queue=delete_queue, + ) + class TestSimpleQueue(unittest.TestCase): diff --git a/Lib/test/test_concurrent_futures/test_init.py b/Lib/test/test_concurrent_futures/test_init.py index ce01e0ff0f287a..d79a6367701fb4 100644 --- a/Lib/test/test_concurrent_futures/test_init.py +++ b/Lib/test/test_concurrent_futures/test_init.py @@ -3,6 +3,7 @@ import queue import time import unittest +import sys from concurrent.futures._base import BrokenExecutor from logging.handlers import QueueHandler @@ -109,6 +110,31 @@ def _assert_logged(self, msg): create_executor_tests(globals(), FailingInitializerMixin) +@unittest.skipIf(sys.platform == "win32", "Resource Tracker doesn't run on Windows") +class FailingInitializerResourcesTest(unittest.TestCase): + """ + Source: https://github.com/python/cpython/issues/104090 + """ + + def _test(self, test_class): + runner = unittest.TextTestRunner() + runner.run(test_class('test_initializer')) + + # GH-104090: + # Stop resource tracker manually now, so we can verify there are not leaked resources by checking + # the process exit code + from multiprocessing.resource_tracker import _resource_tracker + _resource_tracker._stop() + + self.assertEqual(_resource_tracker._exitcode, 0) + + def test_spawn(self): + self._test(ProcessPoolSpawnFailingInitializerTest) + + def test_forkserver(self): + self._test(ProcessPoolForkserverFailingInitializerTest) + + def setUpModule(): setup_module() diff --git a/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst new file mode 100644 index 00000000000000..202026586dce57 --- /dev/null +++ b/Misc/NEWS.d/next/Core and Builtins/2023-07-16-15-02-47.gh-issue-104090.oMjNa9.rst @@ -0,0 +1,2 @@ +The multiprocessing resource tracker now exits with status code 1 if a resource +leak was detected. It still exits with status code 0 otherwise.