Skip to content

Commit 13ed079

Browse files
authored
bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)
1 parent c529967 commit 13ed079

File tree

4 files changed

+262
-54
lines changed

4 files changed

+262
-54
lines changed

Lib/asyncio/unix_events.py

Lines changed: 212 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import errno
44
import io
5+
import itertools
56
import os
67
import selectors
78
import signal
@@ -29,7 +30,9 @@
2930
__all__ = (
3031
'SelectorEventLoop',
3132
'AbstractChildWatcher', 'SafeChildWatcher',
32-
'FastChildWatcher', 'DefaultEventLoopPolicy',
33+
'FastChildWatcher',
34+
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
35+
'DefaultEventLoopPolicy',
3336
)
3437

3538

@@ -184,6 +187,13 @@ async def _make_subprocess_transport(self, protocol, args, shell,
184187
stdin, stdout, stderr, bufsize,
185188
extra=None, **kwargs):
186189
with events.get_child_watcher() as watcher:
190+
if not watcher.is_active():
191+
# Check early.
192+
# Raising exception before process creation
193+
# prevents subprocess execution if the watcher
194+
# is not ready to handle it.
195+
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
196+
"subproccess support is not installed.")
187197
waiter = self.create_future()
188198
transp = _UnixSubprocessTransport(self, protocol, args, shell,
189199
stdin, stdout, stderr, bufsize,
@@ -838,6 +848,15 @@ def close(self):
838848
"""
839849
raise NotImplementedError()
840850

851+
def is_active(self):
852+
"""Watcher status.
853+
854+
Return True if the watcher is installed and ready to handle process exit
855+
notifications.
856+
857+
"""
858+
raise NotImplementedError()
859+
841860
def __enter__(self):
842861
"""Enter the watcher's context and allow starting new processes
843862
@@ -849,6 +868,20 @@ def __exit__(self, a, b, c):
849868
raise NotImplementedError()
850869

851870

871+
def _compute_returncode(status):
872+
if os.WIFSIGNALED(status):
873+
# The child process died because of a signal.
874+
return -os.WTERMSIG(status)
875+
elif os.WIFEXITED(status):
876+
# The child process exited (e.g sys.exit()).
877+
return os.WEXITSTATUS(status)
878+
else:
879+
# The child exited, but we don't understand its status.
880+
# This shouldn't happen, but if it does, let's just
881+
# return that status; perhaps that helps debug it.
882+
return status
883+
884+
852885
class BaseChildWatcher(AbstractChildWatcher):
853886

854887
def __init__(self):
@@ -858,6 +891,9 @@ def __init__(self):
858891
def close(self):
859892
self.attach_loop(None)
860893

894+
def is_active(self):
895+
return self._loop is not None and self._loop.is_running()
896+
861897
def _do_waitpid(self, expected_pid):
862898
raise NotImplementedError()
863899

@@ -898,19 +934,6 @@ def _sig_chld(self):
898934
'exception': exc,
899935
})
900936

901-
def _compute_returncode(self, status):
902-
if os.WIFSIGNALED(status):
903-
# The child process died because of a signal.
904-
return -os.WTERMSIG(status)
905-
elif os.WIFEXITED(status):
906-
# The child process exited (e.g sys.exit()).
907-
return os.WEXITSTATUS(status)
908-
else:
909-
# The child exited, but we don't understand its status.
910-
# This shouldn't happen, but if it does, let's just
911-
# return that status; perhaps that helps debug it.
912-
return status
913-
914937

915938
class SafeChildWatcher(BaseChildWatcher):
916939
"""'Safe' child watcher implementation.
@@ -934,11 +957,6 @@ def __exit__(self, a, b, c):
934957
pass
935958

936959
def add_child_handler(self, pid, callback, *args):
937-
if self._loop is None:
938-
raise RuntimeError(
939-
"Cannot add child handler, "
940-
"the child watcher does not have a loop attached")
941-
942960
self._callbacks[pid] = (callback, args)
943961

944962
# Prevent a race condition in case the child is already terminated.
@@ -974,7 +992,7 @@ def _do_waitpid(self, expected_pid):
974992
# The child process is still alive.
975993
return
976994

977-
returncode = self._compute_returncode(status)
995+
returncode = _compute_returncode(status)
978996
if self._loop.get_debug():
979997
logger.debug('process %s exited with returncode %s',
980998
expected_pid, returncode)
@@ -1035,11 +1053,6 @@ def __exit__(self, a, b, c):
10351053
def add_child_handler(self, pid, callback, *args):
10361054
assert self._forks, "Must use the context manager"
10371055

1038-
if self._loop is None:
1039-
raise RuntimeError(
1040-
"Cannot add child handler, "
1041-
"the child watcher does not have a loop attached")
1042-
10431056
with self._lock:
10441057
try:
10451058
returncode = self._zombies.pop(pid)
@@ -1072,7 +1085,7 @@ def _do_waitpid_all(self):
10721085
# A child process is still alive.
10731086
return
10741087

1075-
returncode = self._compute_returncode(status)
1088+
returncode = _compute_returncode(status)
10761089

10771090
with self._lock:
10781091
try:
@@ -1101,6 +1114,177 @@ def _do_waitpid_all(self):
11011114
callback(pid, returncode, *args)
11021115

11031116

1117+
class MultiLoopChildWatcher(AbstractChildWatcher):
1118+
# The class keeps compatibility with AbstractChildWatcher ABC
1119+
# To achieve this it has empty attach_loop() method
1120+
# and doesn't accept explicit loop argument
1121+
# for add_child_handler()/remove_child_handler()
1122+
# but retrieves the current loop by get_running_loop()
1123+
1124+
def __init__(self):
1125+
self._callbacks = {}
1126+
self._saved_sighandler = None
1127+
1128+
def is_active(self):
1129+
return self._saved_sighandler is not None
1130+
1131+
def close(self):
1132+
self._callbacks.clear()
1133+
if self._saved_sighandler is not None:
1134+
handler = signal.getsignal(signal.SIGCHLD)
1135+
if handler != self._sig_chld:
1136+
logger.warning("SIGCHLD handler was changed by outside code")
1137+
else:
1138+
signal.signal(signal.SIGCHLD, self._saved_sighandler)
1139+
self._saved_sighandler = None
1140+
1141+
def __enter__(self):
1142+
return self
1143+
1144+
def __exit__(self, exc_type, exc_val, exc_tb):
1145+
pass
1146+
1147+
def add_child_handler(self, pid, callback, *args):
1148+
loop = events.get_running_loop()
1149+
self._callbacks[pid] = (loop, callback, args)
1150+
1151+
# Prevent a race condition in case the child is already terminated.
1152+
self._do_waitpid(pid)
1153+
1154+
def remove_child_handler(self, pid):
1155+
try:
1156+
del self._callbacks[pid]
1157+
return True
1158+
except KeyError:
1159+
return False
1160+
1161+
def attach_loop(self, loop):
1162+
# Don't save the loop but initialize itself if called first time
1163+
# The reason to do it here is that attach_loop() is called from
1164+
# unix policy only for the main thread.
1165+
# Main thread is required for subscription on SIGCHLD signal
1166+
if self._saved_sighandler is None:
1167+
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
1168+
if self._saved_sighandler is None:
1169+
logger.warning("Previous SIGCHLD handler was set by non-Python code, "
1170+
"restore to default handler on watcher close.")
1171+
self._saved_sighandler = signal.SIG_DFL
1172+
1173+
# Set SA_RESTART to limit EINTR occurrences.
1174+
signal.siginterrupt(signal.SIGCHLD, False)
1175+
1176+
def _do_waitpid_all(self):
1177+
for pid in list(self._callbacks):
1178+
self._do_waitpid(pid)
1179+
1180+
def _do_waitpid(self, expected_pid):
1181+
assert expected_pid > 0
1182+
1183+
try:
1184+
pid, status = os.waitpid(expected_pid, os.WNOHANG)
1185+
except ChildProcessError:
1186+
# The child process is already reaped
1187+
# (may happen if waitpid() is called elsewhere).
1188+
pid = expected_pid
1189+
returncode = 255
1190+
logger.warning(
1191+
"Unknown child process pid %d, will report returncode 255",
1192+
pid)
1193+
debug_log = False
1194+
else:
1195+
if pid == 0:
1196+
# The child process is still alive.
1197+
return
1198+
1199+
returncode = _compute_returncode(status)
1200+
debug_log = True
1201+
try:
1202+
loop, callback, args = self._callbacks.pop(pid)
1203+
except KeyError: # pragma: no cover
1204+
# May happen if .remove_child_handler() is called
1205+
# after os.waitpid() returns.
1206+
logger.warning("Child watcher got an unexpected pid: %r",
1207+
pid, exc_info=True)
1208+
else:
1209+
if loop.is_closed():
1210+
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1211+
else:
1212+
if debug_log and loop.get_debug():
1213+
logger.debug('process %s exited with returncode %s',
1214+
expected_pid, returncode)
1215+
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1216+
1217+
def _sig_chld(self, signum, frame):
1218+
try:
1219+
self._do_waitpid_all()
1220+
except (SystemExit, KeyboardInterrupt):
1221+
raise
1222+
except BaseException:
1223+
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1224+
1225+
1226+
class ThreadedChildWatcher(AbstractChildWatcher):
1227+
# The watcher uses a thread per process
1228+
# for waiting for the process finish.
1229+
# It doesn't require subscription on POSIX signal
1230+
1231+
def __init__(self):
1232+
self._pid_counter = itertools.count(0)
1233+
1234+
def is_active(self):
1235+
return True
1236+
1237+
def close(self):
1238+
pass
1239+
1240+
def __enter__(self):
1241+
return self
1242+
1243+
def __exit__(self, exc_type, exc_val, exc_tb):
1244+
pass
1245+
1246+
def add_child_handler(self, pid, callback, *args):
1247+
loop = events.get_running_loop()
1248+
thread = threading.Thread(target=self._do_waitpid,
1249+
name=f"waitpid-{next(self._pid_counter)}",
1250+
args=(loop, pid, callback, args),
1251+
daemon=True)
1252+
thread.start()
1253+
1254+
def remove_child_handler(self, pid):
1255+
# asyncio never calls remove_child_handler() !!!
1256+
# The method is no-op but is implemented because
1257+
# abstract base classe requires it
1258+
return True
1259+
1260+
def attach_loop(self, loop):
1261+
pass
1262+
1263+
def _do_waitpid(self, loop, expected_pid, callback, args):
1264+
assert expected_pid > 0
1265+
1266+
try:
1267+
pid, status = os.waitpid(expected_pid, 0)
1268+
except ChildProcessError:
1269+
# The child process is already reaped
1270+
# (may happen if waitpid() is called elsewhere).
1271+
pid = expected_pid
1272+
returncode = 255
1273+
logger.warning(
1274+
"Unknown child process pid %d, will report returncode 255",
1275+
pid)
1276+
else:
1277+
returncode = _compute_returncode(status)
1278+
if loop.get_debug():
1279+
logger.debug('process %s exited with returncode %s',
1280+
expected_pid, returncode)
1281+
1282+
if loop.is_closed():
1283+
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1284+
else:
1285+
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1286+
1287+
11041288
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
11051289
"""UNIX event loop policy with a watcher for child processes."""
11061290
_loop_factory = _UnixSelectorEventLoop
@@ -1112,7 +1296,7 @@ def __init__(self):
11121296
def _init_watcher(self):
11131297
with events._lock:
11141298
if self._watcher is None: # pragma: no branch
1115-
self._watcher = SafeChildWatcher()
1299+
self._watcher = ThreadedChildWatcher()
11161300
if isinstance(threading.current_thread(),
11171301
threading._MainThread):
11181302
self._watcher.attach_loop(self._local._loop)
@@ -1134,7 +1318,7 @@ def set_event_loop(self, loop):
11341318
def get_child_watcher(self):
11351319
"""Get the watcher for child processes.
11361320
1137-
If not yet set, a SafeChildWatcher object is automatically created.
1321+
If not yet set, a ThreadedChildWatcher object is automatically created.
11381322
"""
11391323
if self._watcher is None:
11401324
self._init_watcher()

Lib/test/test_asyncio/test_subprocess.py

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -633,6 +633,7 @@ async def execute():
633633

634634
self.assertIsNone(self.loop.run_until_complete(execute()))
635635

636+
636637
if sys.platform != 'win32':
637638
# Unix
638639
class SubprocessWatcherMixin(SubprocessMixin):
@@ -648,7 +649,24 @@ def setUp(self):
648649
watcher = self.Watcher()
649650
watcher.attach_loop(self.loop)
650651
policy.set_child_watcher(watcher)
651-
self.addCleanup(policy.set_child_watcher, None)
652+
653+
def tearDown(self):
654+
super().setUp()
655+
policy = asyncio.get_event_loop_policy()
656+
watcher = policy.get_child_watcher()
657+
policy.set_child_watcher(None)
658+
watcher.attach_loop(None)
659+
watcher.close()
660+
661+
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
662+
test_utils.TestCase):
663+
664+
Watcher = unix_events.ThreadedChildWatcher
665+
666+
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
667+
test_utils.TestCase):
668+
669+
Watcher = unix_events.MultiLoopChildWatcher
652670

653671
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
654672
test_utils.TestCase):
@@ -670,5 +688,25 @@ def setUp(self):
670688
self.set_event_loop(self.loop)
671689

672690

691+
class GenericWatcherTests:
692+
693+
def test_create_subprocess_fails_with_inactive_watcher(self):
694+
695+
async def execute():
696+
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
697+
watcher.is_active.return_value = False
698+
asyncio.set_child_watcher(watcher)
699+
700+
with self.assertRaises(RuntimeError):
701+
await subprocess.create_subprocess_exec(
702+
support.FakePath(sys.executable), '-c', 'pass')
703+
704+
watcher.add_child_handler.assert_not_called()
705+
706+
self.assertIsNone(self.loop.run_until_complete(execute()))
707+
708+
709+
710+
673711
if __name__ == '__main__':
674712
unittest.main()

0 commit comments

Comments
 (0)