From 673bdb4d35ecccc882f410e9eddf9387946cc19b Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 28 May 2019 11:53:37 +0300 Subject: [PATCH 01/19] Work on --- Lib/asyncio/unix_events.py | 51 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 28128d2977df64..1cff69d94ac707 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1101,6 +1101,57 @@ def _do_waitpid_all(self): callback(pid, returncode, *args) +class AbstractMultiLoopChildWatcher: + + def add_child_handler(self, loop, pid, callback, *args): + raise NotImplementedError() + + def remove_child_handler(self, loop, pid): + raise NotImplementedError() + + def attach_loop(self, loop): + # No-op for smooth transition from loop-attached watchers. + # There is external code that calls watcher.attach_loop() explicitly + pass + + def close(self): + raise NotImplementedError() + + def __enter__(self): + raise NotImplementedError() + + def __exit__(self, a, b, c): + raise NotImplementedError() + + +class MultiLoopChildWatcher(AbstractMultiLoopChildWatcher): + def __init__(self): + self._pids = {} + + def close(self): + raise NotImplementedError() + + def __enter__(self): + return self + + def __exit__(self, a, b, c): + # SafeChildWatcher doesn't close on __exit__ too + pass + + def add_child_handler(self, loop, pid, callback, *args): + raise NotImplementedError() + + def remove_child_handler(self, loop, pid): + raise NotImplementedError() + + def attach_loop(self, loop): + # No-op for smooth transition from loop-attached watchers. + # There is external code that calls watcher.attach_loop() explicitly + pass + + + + class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): """UNIX event loop policy with a watcher for child processes.""" _loop_factory = _UnixSelectorEventLoop From 2fc273787d884ad63ba17c74550a6152b0def418 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Tue, 28 May 2019 19:04:12 +0300 Subject: [PATCH 02/19] Support subprocesses in threads --- Lib/asyncio/unix_events.py | 160 +++++++++++++----- Lib/test/test_asyncio/test_unix_events.py | 14 +- .../2019-05-28-19-03-46.bpo-35621.Abc1lf.rst | 2 + 3 files changed, 124 insertions(+), 52 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 1cff69d94ac707..e65acd7b30497c 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -11,6 +11,7 @@ import sys import threading import warnings +import weakref from . import base_events @@ -29,7 +30,8 @@ __all__ = ( 'SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', - 'FastChildWatcher', 'DefaultEventLoopPolicy', + 'FastChildWatcher', 'MultiLoopChildWatcher', + 'DefaultEventLoopPolicy', ) @@ -849,6 +851,20 @@ def __exit__(self, a, b, c): raise NotImplementedError() +def _compute_returncode(status): + if os.WIFSIGNALED(status): + # The child process died because of a signal. + return -os.WTERMSIG(status) + elif os.WIFEXITED(status): + # The child process exited (e.g sys.exit()). + return os.WEXITSTATUS(status) + else: + # The child exited, but we don't understand its status. + # This shouldn't happen, but if it does, let's just + # return that status; perhaps that helps debug it. + return status + + class BaseChildWatcher(AbstractChildWatcher): def __init__(self): @@ -899,17 +915,8 @@ def _sig_chld(self): }) def _compute_returncode(self, status): - if os.WIFSIGNALED(status): - # The child process died because of a signal. - return -os.WTERMSIG(status) - elif os.WIFEXITED(status): - # The child process exited (e.g sys.exit()). - return os.WEXITSTATUS(status) - else: - # The child exited, but we don't understand its status. - # This shouldn't happen, but if it does, let's just - # return that status; perhaps that helps debug it. - return status + # Keep the method for backward compatibility + return _compute_returncode(status) class SafeChildWatcher(BaseChildWatcher): @@ -1101,35 +1108,23 @@ def _do_waitpid_all(self): callback(pid, returncode, *args) -class AbstractMultiLoopChildWatcher: - - def add_child_handler(self, loop, pid, callback, *args): - raise NotImplementedError() - - def remove_child_handler(self, loop, pid): - raise NotImplementedError() - - def attach_loop(self, loop): - # No-op for smooth transition from loop-attached watchers. - # There is external code that calls watcher.attach_loop() explicitly - pass - - def close(self): - raise NotImplementedError() - - def __enter__(self): - raise NotImplementedError() +class MultiLoopChildWatcher(AbstractChildWatcher): + # The class keeps compatibility with AbstractChildWatcher ABC + # To achieve this it has empty attach_loop() method + # and don't accept explicit loop argument + # for add_child_handler()/remove_child_handler() + # but retrieves the current loop by get_running_loop() - def __exit__(self, a, b, c): - raise NotImplementedError() - - -class MultiLoopChildWatcher(AbstractMultiLoopChildWatcher): def __init__(self): - self._pids = {} + self._callbacks = {} + self._saved_sighandler = None def close(self): - raise NotImplementedError() + self._callbacks.clear() + if self._saved_sighandler is not None: + signal.signal(signal.SIGCHLD, self._saved_sighandler) + self._saved_sighandler = None + super().close() def __enter__(self): return self @@ -1138,17 +1133,90 @@ def __exit__(self, a, b, c): # SafeChildWatcher doesn't close on __exit__ too pass - def add_child_handler(self, loop, pid, callback, *args): - raise NotImplementedError() + def add_child_handler(self, pid, callback, *args): + loop = tasks.get_running_loop() + self._callbacks[pid] = (weakref.ref(loop), callback, args) - def remove_child_handler(self, loop, pid): - raise NotImplementedError() + # Prevent a race condition in case the child is already terminated. + self._do_waitpid(pid) + + def remove_child_handler(self, pid): + try: + del self._callbacks[pid] + return True + except KeyError: + return False def attach_loop(self, loop): - # No-op for smooth transition from loop-attached watchers. - # There is external code that calls watcher.attach_loop() explicitly - pass + # Don't attach the loop but initialize itself if called first time + # The reason to do it here is that attach_loop() is called from + # unix policy only for the main thread. + # Main thread is required for subscription on SIGCHLD signal + if self._saved_sighandler is None: + self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) + + # Set SA_RESTART to limit EINTR occurrences. + signal.siginterrupt(signal.SIGCHLD, False) + def _do_waitpid_all(self): + for pid in list(self._callbacks): + self._do_waitpid(pid) + + def _do_waitpid(self, expected_pid): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, os.WNOHANG) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + debug_log = False + else: + if pid == 0: + # The child process is still alive. + return + + returncode = _compute_returncode(status) + debug_log = True + if self._loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + + try: + loop_wr, callback, args = self._callbacks.pop(pid) + except KeyError: # pragma: no cover + # May happen if .remove_child_handler() is called + # after os.waitpid() returns. + if self._loop.get_debug(): + logger.warning("Child watcher got an unexpected pid: %r", + pid, exc_info=True) + else: + loop = loop_wr() + if loop is None: + logger.warning("Loop that handles pid %r is gone", pid) + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is gone", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) + + def _sig_chld(self, signum, frame): + try: + self._do_waitpid_all() + except (SystemExit, KeyboardInterrupt): + raise + except BaseException as exc: + # self._loop should always be available here + # as '_sig_chld' is added as a signal handler + # in 'attach_loop' + self._loop.call_exception_handler({ + 'message': 'Unknown exception in SIGCHLD handler', + 'exception': exc, + }) @@ -1163,7 +1231,7 @@ def __init__(self): def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = SafeChildWatcher() + self._watcher = MultiLoopChildWatcher() if isinstance(threading.current_thread(), threading._MainThread): self._watcher.attach_loop(self._local._loop) @@ -1185,7 +1253,7 @@ def set_event_loop(self, loop): def get_child_watcher(self): """Get the watcher for child processes. - If not yet set, a SafeChildWatcher object is automatically created. + If not yet set, a MultiLoopChildWatcher object is automatically created. """ if self._watcher is None: self._init_watcher() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index ac84304ec99da6..f2b6cf54a7e582 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1814,12 +1814,11 @@ def test_get_child_watcher(self): self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) + self.assertIsInstance(watcher, asyncio.MultiLoopChildWatcher) self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) - self.assertIsNone(watcher._loop) def test_get_child_watcher_after_set(self): policy = self.create_policy() @@ -1836,8 +1835,7 @@ def test_get_child_watcher_with_mainloop_existing(self): self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) - self.assertIs(watcher._loop, loop) + self.assertIsInstance(watcher, asyncio.MultiLoopChildWatcher) loop.close() @@ -1863,9 +1861,13 @@ def f(): def test_child_watcher_replace_mainloop_existing(self): policy = self.create_policy() - loop = policy.get_event_loop() - watcher = policy.get_child_watcher() + # Set SafeChildWatcher to test attach_loop functionality + # Default MultiLoopChildWatcher doesn't have _loop attribute at all + watcher = asyncio.SafeChildWatcher() + policy.set_child_watcher(watcher) + + loop = policy.get_event_loop() self.assertIs(watcher._loop, loop) diff --git a/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst new file mode 100644 index 00000000000000..c492e1de6d5c42 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2019-05-28-19-03-46.bpo-35621.Abc1lf.rst @@ -0,0 +1,2 @@ +Support running asyncio subprocesses when execution event loop in a thread +on UNIX. From 75e0495136ac5c0473e3889e5c13d6d44aaaa155 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Wed, 29 May 2019 18:42:30 +0300 Subject: [PATCH 03/19] Fix typo in comment --- Lib/asyncio/unix_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index e65acd7b30497c..d9ff0e5d7874d8 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1111,7 +1111,7 @@ def _do_waitpid_all(self): class MultiLoopChildWatcher(AbstractChildWatcher): # The class keeps compatibility with AbstractChildWatcher ABC # To achieve this it has empty attach_loop() method - # and don't accept explicit loop argument + # and doesn't accept explicit loop argument # for add_child_handler()/remove_child_handler() # but retrieves the current loop by get_running_loop() From 2d2db05229a724491b3c986c5e0f5364ce08a7da Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 30 May 2019 18:51:29 +0300 Subject: [PATCH 04/19] Init MultiLoopChildWatcher in UnixEventLoopPolicy constructor --- Lib/asyncio/events.py | 2 +- Lib/asyncio/unix_events.py | 5 ++++- Lib/test/test_asyncio/test_subprocess.py | 2 +- 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index d381b1c596239c..40af5e8930aea1 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -663,7 +663,7 @@ def new_event_loop(self): _event_loop_policy = None # Lock for protecting the on-the-fly creation of the event loop policy. -_lock = threading.Lock() +_lock = threading.RLock() # A TLS for the running event loop, used by _get_running_loop. diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index d9ff0e5d7874d8..4942173496965c 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1122,9 +1122,11 @@ def __init__(self): def close(self): self._callbacks.clear() if self._saved_sighandler is not None: + handler = signal.getsignal(signal.SIGCHLD) + if handler != self._sig_chld: + raise RuntimeError("SIGCHLD handler was changed by outside code") signal.signal(signal.SIGCHLD, self._saved_sighandler) self._saved_sighandler = None - super().close() def __enter__(self): return self @@ -1227,6 +1229,7 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def __init__(self): super().__init__() self._watcher = None + self._init_watcher() def _init_watcher(self): with events._lock: diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 7d72e6cde4e7a8..ed98ab414f7583 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -646,8 +646,8 @@ def setUp(self): self.set_event_loop(self.loop) watcher = self.Watcher() - watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) + watcher.attach_loop(self.loop) self.addCleanup(policy.set_child_watcher, None) class SubprocessSafeWatcherTests(SubprocessWatcherMixin, From e2f59e8e90470f231e1efd4451e019cd0f6f8378 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Thu, 30 May 2019 21:42:49 +0300 Subject: [PATCH 05/19] Revert tests change --- Lib/asyncio/events.py | 2 +- Lib/asyncio/unix_events.py | 12 +++++++----- Lib/test/test_asyncio/test_subprocess.py | 2 +- Lib/test/test_asyncio/test_unix_events.py | 14 ++++++-------- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/Lib/asyncio/events.py b/Lib/asyncio/events.py index 40af5e8930aea1..d381b1c596239c 100644 --- a/Lib/asyncio/events.py +++ b/Lib/asyncio/events.py @@ -663,7 +663,7 @@ def new_event_loop(self): _event_loop_policy = None # Lock for protecting the on-the-fly creation of the event loop policy. -_lock = threading.RLock() +_lock = threading.Lock() # A TLS for the running event loop, used by _get_running_loop. diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 4942173496965c..24b0189e5a0cf2 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1124,8 +1124,11 @@ def close(self): if self._saved_sighandler is not None: handler = signal.getsignal(signal.SIGCHLD) if handler != self._sig_chld: - raise RuntimeError("SIGCHLD handler was changed by outside code") - signal.signal(signal.SIGCHLD, self._saved_sighandler) + warnings.warn("SIGCHLD handler was changed by outside code", + ResourceWarning, + source=self) + else: + signal.signal(signal.SIGCHLD, self._saved_sighandler) self._saved_sighandler = None def __enter__(self): @@ -1229,12 +1232,11 @@ class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): def __init__(self): super().__init__() self._watcher = None - self._init_watcher() def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = MultiLoopChildWatcher() + self._watcher = SafeChildWatcher() if isinstance(threading.current_thread(), threading._MainThread): self._watcher.attach_loop(self._local._loop) @@ -1256,7 +1258,7 @@ def set_event_loop(self, loop): def get_child_watcher(self): """Get the watcher for child processes. - If not yet set, a MultiLoopChildWatcher object is automatically created. + If not yet set, a SafeChildWatcher object is automatically created. """ if self._watcher is None: self._init_watcher() diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ed98ab414f7583..7d72e6cde4e7a8 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -646,8 +646,8 @@ def setUp(self): self.set_event_loop(self.loop) watcher = self.Watcher() - policy.set_child_watcher(watcher) watcher.attach_loop(self.loop) + policy.set_child_watcher(watcher) self.addCleanup(policy.set_child_watcher, None) class SubprocessSafeWatcherTests(SubprocessWatcherMixin, diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index f2b6cf54a7e582..ac84304ec99da6 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1814,11 +1814,12 @@ def test_get_child_watcher(self): self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.MultiLoopChildWatcher) + self.assertIsInstance(watcher, asyncio.SafeChildWatcher) self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) + self.assertIsNone(watcher._loop) def test_get_child_watcher_after_set(self): policy = self.create_policy() @@ -1835,7 +1836,8 @@ def test_get_child_watcher_with_mainloop_existing(self): self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.MultiLoopChildWatcher) + self.assertIsInstance(watcher, asyncio.SafeChildWatcher) + self.assertIs(watcher._loop, loop) loop.close() @@ -1861,14 +1863,10 @@ def f(): def test_child_watcher_replace_mainloop_existing(self): policy = self.create_policy() - - # Set SafeChildWatcher to test attach_loop functionality - # Default MultiLoopChildWatcher doesn't have _loop attribute at all - watcher = asyncio.SafeChildWatcher() - policy.set_child_watcher(watcher) - loop = policy.get_event_loop() + watcher = policy.get_child_watcher() + self.assertIs(watcher._loop, loop) new_loop = policy.new_event_loop() From eaea9f9b8243806edaff59ccbb0dd1e09757d3e7 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 31 May 2019 01:21:06 +0300 Subject: [PATCH 06/19] Work on --- Lib/asyncio/unix_events.py | 27 ++++++++++++++++++++++----- 1 file changed, 22 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 24b0189e5a0cf2..0bc55512ec5ea7 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -840,6 +840,15 @@ def close(self): """ raise NotImplementedError() + def is_active(self): + """Watcher status. + + Return True if the watcher is installed and ready to handle process exit + notifications. + + """ + raise NotImplementedError() + def __enter__(self): """Enter the watcher's context and allow starting new processes @@ -874,6 +883,9 @@ def __init__(self): def close(self): self.attach_loop(None) + def is_active(self): + return self._loop is not None + def _do_waitpid(self, expected_pid): raise NotImplementedError() @@ -1119,17 +1131,18 @@ def __init__(self): self._callbacks = {} self._saved_sighandler = None + def is_active(self): + return self._saved_sighandler is not None + def close(self): self._callbacks.clear() - if self._saved_sighandler is not None: + if self._saved_sighandler is not _SENTINEL: handler = signal.getsignal(signal.SIGCHLD) if handler != self._sig_chld: - warnings.warn("SIGCHLD handler was changed by outside code", - ResourceWarning, - source=self) + logger.warning("SIGCHLD handler was changed by outside code") else: signal.signal(signal.SIGCHLD, self._saved_sighandler) - self._saved_sighandler = None + self._saved_sighandler = _SENTINEL def __enter__(self): return self @@ -1159,6 +1172,10 @@ def attach_loop(self, loop): # Main thread is required for subscription on SIGCHLD signal if self._saved_sighandler is None: self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld) + if self._saved_sighandler is None: + logger.warning("Previous SIGCHLD handler was set by non-Python code, " + "restore to default handler on watcher close.") + self._saved_sighandler = signal.SIG_DFL # Set SA_RESTART to limit EINTR occurrences. signal.siginterrupt(signal.SIGCHLD, False) From 798a7fbba55426b2d3080b08ae9345372829dbaa Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 31 May 2019 18:14:52 +0300 Subject: [PATCH 07/19] Add ThreadedChildWatcher and make it default --- Lib/asyncio/unix_events.py | 106 ++++++++++++++++------ Lib/test/test_asyncio/test_unix_events.py | 23 ++--- 2 files changed, 85 insertions(+), 44 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 0bc55512ec5ea7..5deac85cd44d4b 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -2,6 +2,7 @@ import errno import io +import itertools import os import selectors import signal @@ -11,7 +12,6 @@ import sys import threading import warnings -import weakref from . import base_events @@ -30,7 +30,8 @@ __all__ = ( 'SelectorEventLoop', 'AbstractChildWatcher', 'SafeChildWatcher', - 'FastChildWatcher', 'MultiLoopChildWatcher', + 'FastChildWatcher', + 'MultiLoopChildWatcher', 'ThreadedChildWatcher', 'DefaultEventLoopPolicy', ) @@ -1136,24 +1137,23 @@ def is_active(self): def close(self): self._callbacks.clear() - if self._saved_sighandler is not _SENTINEL: + if self._saved_sighandler is not None: handler = signal.getsignal(signal.SIGCHLD) if handler != self._sig_chld: logger.warning("SIGCHLD handler was changed by outside code") else: signal.signal(signal.SIGCHLD, self._saved_sighandler) - self._saved_sighandler = _SENTINEL + self._saved_sighandler = None def __enter__(self): return self - def __exit__(self, a, b, c): - # SafeChildWatcher doesn't close on __exit__ too + def __exit__(self, ecx_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): loop = tasks.get_running_loop() - self._callbacks[pid] = (weakref.ref(loop), callback, args) + self._callbacks[pid] = (loop, callback, args) # Prevent a race condition in case the child is already terminated. self._do_waitpid(pid) @@ -1205,25 +1205,20 @@ def _do_waitpid(self, expected_pid): returncode = _compute_returncode(status) debug_log = True - if self._loop.get_debug(): - logger.debug('process %s exited with returncode %s', - expected_pid, returncode) - try: - loop_wr, callback, args = self._callbacks.pop(pid) + loop, callback, args = self._callbacks.pop(pid) except KeyError: # pragma: no cover # May happen if .remove_child_handler() is called # after os.waitpid() returns. - if self._loop.get_debug(): - logger.warning("Child watcher got an unexpected pid: %r", - pid, exc_info=True) + logger.warning("Child watcher got an unexpected pid: %r", + pid, exc_info=True) else: - loop = loop_wr() - if loop is None: - logger.warning("Loop that handles pid %r is gone", pid) if loop.is_closed(): logger.warning("Loop %r that handles pid %r is gone", loop, pid) else: + if debug_log and loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) loop.call_soon_threadsafe(callback, pid, returncode, *args) def _sig_chld(self, signum, frame): @@ -1231,15 +1226,70 @@ def _sig_chld(self, signum, frame): self._do_waitpid_all() except (SystemExit, KeyboardInterrupt): raise - except BaseException as exc: - # self._loop should always be available here - # as '_sig_chld' is added as a signal handler - # in 'attach_loop' - self._loop.call_exception_handler({ - 'message': 'Unknown exception in SIGCHLD handler', - 'exception': exc, - }) + except BaseException: + logger.warning('Unknown exception in SIGCHLD handler', exc_info=True) + + +class ThreadedChildWatcher(AbstractChildWatcher): + # The watcher uses a thread per process + # for waiting for the process finish. + # It doesn't require subscription on POSIX signal + + def __init__(self): + self._pid_counter = itertools.count(0) + + def is_active(self): + return True + + def close(self): + pass + + def __enter__(self): + return self + def __exit__(self, ecx_type, exc_val, exc_tb): + pass + + def add_child_handler(self, pid, callback, *args): + loop = tasks.get_running_loop() + thread = threading.Thread(target=self._do_waitpid, + name=f"waitpid-{next(self._pid_counter)}", + args=(loop, pid, callback, args), + daemon=True) + thread.start() + + def remove_child_handler(self, pid): + # asyncio never calls remove_child_handler() !!! + # The method is no-op but is implemented because + # abstract base classe requires it + return True + + def attach_loop(self, loop): + pass + + def _do_waitpid(self, loop, expected_pid, callback, args): + assert expected_pid > 0 + + try: + pid, status = os.waitpid(expected_pid, 0) + except ChildProcessError: + # The child process is already reaped + # (may happen if waitpid() is called elsewhere). + pid = expected_pid + returncode = 255 + logger.warning( + "Unknown child process pid %d, will report returncode 255", + pid) + else: + returncode = _compute_returncode(status) + if loop.get_debug(): + logger.debug('process %s exited with returncode %s', + expected_pid, returncode) + + if loop.is_closed(): + logger.warning("Loop %r that handles pid %r is gone", loop, pid) + else: + loop.call_soon_threadsafe(callback, pid, returncode, *args) class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy): @@ -1253,7 +1303,7 @@ def __init__(self): def _init_watcher(self): with events._lock: if self._watcher is None: # pragma: no branch - self._watcher = SafeChildWatcher() + self._watcher = ThreadedChildWatcher() if isinstance(threading.current_thread(), threading._MainThread): self._watcher.attach_loop(self._local._loop) @@ -1275,7 +1325,7 @@ def set_event_loop(self, loop): def get_child_watcher(self): """Get the watcher for child processes. - If not yet set, a SafeChildWatcher object is automatically created. + If not yet set, a ThreadedChildWatcher object is automatically created. """ if self._watcher is None: self._init_watcher() diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index ac84304ec99da6..99f646cd358a83 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1809,17 +1809,16 @@ class PolicyTests(unittest.TestCase): def create_policy(self): return asyncio.DefaultEventLoopPolicy() - def test_get_child_watcher(self): + def test_get_default_child_watcher(self): policy = self.create_policy() self.assertIsNone(policy._watcher) watcher = policy.get_child_watcher() - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) + self.assertIsInstance(watcher, asyncio.ThreadedChildWatcher) self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) - self.assertIsNone(watcher._loop) def test_get_child_watcher_after_set(self): policy = self.create_policy() @@ -1829,18 +1828,6 @@ def test_get_child_watcher_after_set(self): self.assertIs(policy._watcher, watcher) self.assertIs(watcher, policy.get_child_watcher()) - def test_get_child_watcher_with_mainloop_existing(self): - policy = self.create_policy() - loop = policy.get_event_loop() - - self.assertIsNone(policy._watcher) - watcher = policy.get_child_watcher() - - self.assertIsInstance(watcher, asyncio.SafeChildWatcher) - self.assertIs(watcher._loop, loop) - - loop.close() - def test_get_child_watcher_thread(self): def f(): @@ -1865,7 +1852,11 @@ def test_child_watcher_replace_mainloop_existing(self): policy = self.create_policy() loop = policy.get_event_loop() - watcher = policy.get_child_watcher() + # Explicitly setup SafeChildWatcher, + # default ThreadedChildWatcher has no _loop property + watcher = asyncio.SafeChildWatcher() + policy.set_child_watcher(watcher) + watcher.attach_loop(loop) self.assertIs(watcher._loop, loop) From 7ba175e0b4939a86a1de2665bcd6d80bbe40ef5b Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 31 May 2019 19:39:48 +0300 Subject: [PATCH 08/19] Add check for is_active() --- Lib/asyncio/unix_events.py | 3 +++ Lib/test/test_asyncio/test_unix_events.py | 2 ++ 2 files changed, 5 insertions(+) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 5deac85cd44d4b..707acb49bd77c7 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -187,6 +187,9 @@ async def _make_subprocess_transport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, extra=None, **kwargs): with events.get_child_watcher() as watcher: + if not watcher.is_active(): + raise RuntimeError("asyncio.get_child_watcher() is not ready, " + "subproccess support is not installed.") waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, stdin, stdout, stderr, bufsize, diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index 99f646cd358a83..f67775644e03fb 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1082,6 +1082,8 @@ def test_not_implemented(self): NotImplementedError, watcher.attach_loop, f) self.assertRaises( NotImplementedError, watcher.close) + self.assertRaises( + NotImplementedError, watcher.is_active) self.assertRaises( NotImplementedError, watcher.__enter__) self.assertRaises( From 4ed22baccbabf10c3e0bd0ffb214cbf6ba44e094 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Fri, 31 May 2019 20:42:34 +0300 Subject: [PATCH 09/19] Add new watcher classes to test suite --- Lib/asyncio/unix_events.py | 4 ++-- Lib/test/test_asyncio/test_subprocess.py | 10 ++++++++++ 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 707acb49bd77c7..5b0ab931011dc5 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1155,7 +1155,7 @@ def __exit__(self, ecx_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): - loop = tasks.get_running_loop() + loop = events.get_running_loop() self._callbacks[pid] = (loop, callback, args) # Prevent a race condition in case the child is already terminated. @@ -1254,7 +1254,7 @@ def __exit__(self, ecx_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): - loop = tasks.get_running_loop() + loop = events.get_running_loop() thread = threading.Thread(target=self._do_waitpid, name=f"waitpid-{next(self._pid_counter)}", args=(loop, pid, callback, args), diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index 7d72e6cde4e7a8..b0379498825f3f 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -650,6 +650,16 @@ def setUp(self): policy.set_child_watcher(watcher) self.addCleanup(policy.set_child_watcher, None) + class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, + test_utils.TestCase): + + Watcher = unix_events.ThreadedChildWatcher + + class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin, + test_utils.TestCase): + + Watcher = unix_events.MultiLoopChildWatcher + class SubprocessSafeWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): From 4c464e79f73e600def3b6d780e62d65013e9a92f Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 1 Jun 2019 23:16:51 +0300 Subject: [PATCH 10/19] Correctly close watcher in tests --- Lib/test/test_asyncio/test_subprocess.py | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index b0379498825f3f..ed5d8579e02cdb 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -648,7 +648,14 @@ def setUp(self): watcher = self.Watcher() watcher.attach_loop(self.loop) policy.set_child_watcher(watcher) - self.addCleanup(policy.set_child_watcher, None) + + def tearDown(self): + super().setUp() + policy = asyncio.get_event_loop_policy() + watcher = policy.get_child_watcher() + policy.set_child_watcher(None) + watcher.attach_loop(None) + watcher.close() class SubprocessThreadedWatcherTests(SubprocessWatcherMixin, test_utils.TestCase): From c07e3f3b46a21c20b1785f5d29d61ca6b22f4d1d Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sat, 1 Jun 2019 23:31:41 +0300 Subject: [PATCH 11/19] Clarify text --- Lib/asyncio/unix_events.py | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 5b0ab931011dc5..93d79ca9f655b7 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -188,7 +188,12 @@ async def _make_subprocess_transport(self, protocol, args, shell, extra=None, **kwargs): with events.get_child_watcher() as watcher: if not watcher.is_active(): - raise RuntimeError("asyncio.get_child_watcher() is not ready, " + # Check early. + # Raising exception before process creation + # prevents subprocess execution if the watcher + # canoot handle it (add_child_handler() fails with exception + # if watcher.is_active() returns False). + raise RuntimeError("asyncio.get_child_watcher() is not activated, " "subproccess support is not installed.") waiter = self.create_future() transp = _UnixSubprocessTransport(self, protocol, args, shell, @@ -1155,6 +1160,10 @@ def __exit__(self, ecx_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): + if self._saved_sighandler is None: + raise RuntimeError( + "Cannot add child handler, " + "the child watcher is not activated (attach_loop() was not called)") loop = events.get_running_loop() self._callbacks[pid] = (loop, callback, args) From 03fa4b91bb51a8591bf645dd033b4ca00f12c0c1 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 11:25:16 +0300 Subject: [PATCH 12/19] Make checkers stricter --- Lib/asyncio/unix_events.py | 12 +----------- Lib/test/test_asyncio/test_unix_events.py | 9 --------- 2 files changed, 1 insertion(+), 20 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 93d79ca9f655b7..8b335ee65d61c2 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -893,7 +893,7 @@ def close(self): self.attach_loop(None) def is_active(self): - return self._loop is not None + return self._loop is not None and self._loop.is_running() def _do_waitpid(self, expected_pid): raise NotImplementedError() @@ -962,11 +962,6 @@ def __exit__(self, a, b, c): pass def add_child_handler(self, pid, callback, *args): - if self._loop is None: - raise RuntimeError( - "Cannot add child handler, " - "the child watcher does not have a loop attached") - self._callbacks[pid] = (callback, args) # Prevent a race condition in case the child is already terminated. @@ -1063,11 +1058,6 @@ def __exit__(self, a, b, c): def add_child_handler(self, pid, callback, *args): assert self._forks, "Must use the context manager" - if self._loop is None: - raise RuntimeError( - "Cannot add child handler, " - "the child watcher does not have a loop attached") - with self._lock: try: returncode = self._zombies.pop(pid) diff --git a/Lib/test/test_asyncio/test_unix_events.py b/Lib/test/test_asyncio/test_unix_events.py index f67775644e03fb..f7f992fcea49f9 100644 --- a/Lib/test/test_asyncio/test_unix_events.py +++ b/Lib/test/test_asyncio/test_unix_events.py @@ -1786,15 +1786,6 @@ def test_close(self, m): if isinstance(self.watcher, asyncio.FastChildWatcher): self.assertFalse(self.watcher._zombies) - @waitpid_mocks - def test_add_child_handler_with_no_loop_attached(self, m): - callback = mock.Mock() - with self.create_watcher() as watcher: - with self.assertRaisesRegex( - RuntimeError, - 'the child watcher does not have a loop attached'): - watcher.add_child_handler(100, callback) - class SafeChildWatcherTests (ChildWatcherTestsMixin, test_utils.TestCase): def create_watcher(self): From fd56fc7628b36f9f631a1cef2bd181384c91c4b6 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 11:53:45 +0300 Subject: [PATCH 13/19] Add specific test for non-active watcher --- Lib/test/test_asyncio/test_subprocess.py | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/Lib/test/test_asyncio/test_subprocess.py b/Lib/test/test_asyncio/test_subprocess.py index ed5d8579e02cdb..582e1720246033 100644 --- a/Lib/test/test_asyncio/test_subprocess.py +++ b/Lib/test/test_asyncio/test_subprocess.py @@ -633,6 +633,7 @@ async def execute(): self.assertIsNone(self.loop.run_until_complete(execute())) + if sys.platform != 'win32': # Unix class SubprocessWatcherMixin(SubprocessMixin): @@ -687,5 +688,25 @@ def setUp(self): self.set_event_loop(self.loop) +class GenericWatcherTests: + + def test_create_subprocess_fails_with_inactive_watcher(self): + + async def execute(): + watcher = mock.create_authspec(asyncio.AbstractChildWatcher) + watcher.is_active.return_value = False + asyncio.set_child_watcher(watcher) + + with self.assertRaises(RuntimeError): + await subprocess.create_subprocess_exec( + support.FakePath(sys.executable), '-c', 'pass') + + watcher.add_child_handler.assert_not_called() + + self.assertIsNone(self.loop.run_until_complete(execute())) + + + + if __name__ == '__main__': unittest.main() From e7805a6b9a22e068396e3f021feefcbb9b050b03 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 12:15:49 +0300 Subject: [PATCH 14/19] Fix typo --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 8b335ee65d61c2..47229ee60a6ad7 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1146,7 +1146,7 @@ def close(self): def __enter__(self): return self - def __exit__(self, ecx_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): @@ -1249,7 +1249,7 @@ def close(self): def __enter__(self): return self - def __exit__(self, ecx_type, exc_val, exc_tb): + def __exit__(self, exc_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): From e849af50c675def6cf910c7fb9d3a81262a4f62c Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 12:42:08 +0300 Subject: [PATCH 15/19] Fix comment --- Lib/asyncio/unix_events.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 47229ee60a6ad7..0b79837e6c3291 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -191,8 +191,7 @@ async def _make_subprocess_transport(self, protocol, args, shell, # Check early. # Raising exception before process creation # prevents subprocess execution if the watcher - # canoot handle it (add_child_handler() fails with exception - # if watcher.is_active() returns False). + # is not ready to handle it. raise RuntimeError("asyncio.get_child_watcher() is not activated, " "subproccess support is not installed.") waiter = self.create_future() From ae7f11dd2b8c9c7d7cf53ab22ca101544b42aed3 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 12:43:33 +0300 Subject: [PATCH 16/19] Drop redundant check --- Lib/asyncio/unix_events.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 0b79837e6c3291..d4f3cf5bf65f56 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1149,10 +1149,6 @@ def __exit__(self, exc_type, exc_val, exc_tb): pass def add_child_handler(self, pid, callback, *args): - if self._saved_sighandler is None: - raise RuntimeError( - "Cannot add child handler, " - "the child watcher is not activated (attach_loop() was not called)") loop = events.get_running_loop() self._callbacks[pid] = (loop, callback, args) From 6fb36f2be4025cdfe1727033e866a8e4e2d39f71 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 12:45:23 +0300 Subject: [PATCH 17/19] Fix comment --- Lib/asyncio/unix_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index d4f3cf5bf65f56..d9035c2db9b5ff 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1163,7 +1163,7 @@ def remove_child_handler(self, pid): return False def attach_loop(self, loop): - # Don't attach the loop but initialize itself if called first time + # Don't save the loop but initialize itself if called first time # The reason to do it here is that attach_loop() is called from # unix policy only for the main thread. # Main thread is required for subscription on SIGCHLD signal From d071f19d49f6add662da3b60ecf877316bc99920 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 13:04:47 +0300 Subject: [PATCH 18/19] Code cleanup --- Lib/asyncio/unix_events.py | 8 ++------ 1 file changed, 2 insertions(+), 6 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index d9035c2db9b5ff..11175717b72325 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -934,10 +934,6 @@ def _sig_chld(self): 'exception': exc, }) - def _compute_returncode(self, status): - # Keep the method for backward compatibility - return _compute_returncode(status) - class SafeChildWatcher(BaseChildWatcher): """'Safe' child watcher implementation. @@ -996,7 +992,7 @@ def _do_waitpid(self, expected_pid): # The child process is still alive. return - returncode = self._compute_returncode(status) + returncode = _compute_returncode(status) if self._loop.get_debug(): logger.debug('process %s exited with returncode %s', expected_pid, returncode) @@ -1089,7 +1085,7 @@ def _do_waitpid_all(self): # A child process is still alive. return - returncode = self._compute_returncode(status) + returncode = _compute_returncode(status) with self._lock: try: From de56eb9c4b405fadcc33710341f55c1b397643f2 Mon Sep 17 00:00:00 2001 From: Andrew Svetlov Date: Sun, 2 Jun 2019 13:23:58 +0300 Subject: [PATCH 19/19] Improve logging message --- Lib/asyncio/unix_events.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/asyncio/unix_events.py b/Lib/asyncio/unix_events.py index 11175717b72325..6714542e4e3361 100644 --- a/Lib/asyncio/unix_events.py +++ b/Lib/asyncio/unix_events.py @@ -1207,7 +1207,7 @@ def _do_waitpid(self, expected_pid): pid, exc_info=True) else: if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is gone", loop, pid) + logger.warning("Loop %r that handles pid %r is closed", loop, pid) else: if debug_log and loop.get_debug(): logger.debug('process %s exited with returncode %s', @@ -1280,7 +1280,7 @@ def _do_waitpid(self, loop, expected_pid, callback, args): expected_pid, returncode) if loop.is_closed(): - logger.warning("Loop %r that handles pid %r is gone", loop, pid) + logger.warning("Loop %r that handles pid %r is closed", loop, pid) else: loop.call_soon_threadsafe(callback, pid, returncode, *args)