Skip to content

Commit 9535aff

Browse files
asvetlovmiss-islington
authored andcommitted
Revert "bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)" (GH-13793)
https://bugs.python.org/issue35621
1 parent eddef86 commit 9535aff

File tree

4 files changed

+54
-262
lines changed

4 files changed

+54
-262
lines changed

Lib/asyncio/unix_events.py

Lines changed: 28 additions & 212 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22

33
import errno
44
import io
5-
import itertools
65
import os
76
import selectors
87
import signal
@@ -30,9 +29,7 @@
3029
__all__ = (
3130
'SelectorEventLoop',
3231
'AbstractChildWatcher', 'SafeChildWatcher',
33-
'FastChildWatcher',
34-
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
35-
'DefaultEventLoopPolicy',
32+
'FastChildWatcher', 'DefaultEventLoopPolicy',
3633
)
3734

3835

@@ -187,13 +184,6 @@ async def _make_subprocess_transport(self, protocol, args, shell,
187184
stdin, stdout, stderr, bufsize,
188185
extra=None, **kwargs):
189186
with events.get_child_watcher() as watcher:
190-
if not watcher.is_active():
191-
# Check early.
192-
# Raising exception before process creation
193-
# prevents subprocess execution if the watcher
194-
# is not ready to handle it.
195-
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
196-
"subprocess support is not installed.")
197187
waiter = self.create_future()
198188
transp = _UnixSubprocessTransport(self, protocol, args, shell,
199189
stdin, stdout, stderr, bufsize,
@@ -848,15 +838,6 @@ def close(self):
848838
"""
849839
raise NotImplementedError()
850840

851-
def is_active(self):
852-
"""Watcher status.
853-
854-
Return True if the watcher is installed and ready to handle process exit
855-
notifications.
856-
857-
"""
858-
raise NotImplementedError()
859-
860841
def __enter__(self):
861842
"""Enter the watcher's context and allow starting new processes
862843
@@ -868,20 +849,6 @@ def __exit__(self, a, b, c):
868849
raise NotImplementedError()
869850

870851

871-
def _compute_returncode(status):
872-
if os.WIFSIGNALED(status):
873-
# The child process died because of a signal.
874-
return -os.WTERMSIG(status)
875-
elif os.WIFEXITED(status):
876-
# The child process exited (e.g sys.exit()).
877-
return os.WEXITSTATUS(status)
878-
else:
879-
# The child exited, but we don't understand its status.
880-
# This shouldn't happen, but if it does, let's just
881-
# return that status; perhaps that helps debug it.
882-
return status
883-
884-
885852
class BaseChildWatcher(AbstractChildWatcher):
886853

887854
def __init__(self):
@@ -891,9 +858,6 @@ def __init__(self):
891858
def close(self):
892859
self.attach_loop(None)
893860

894-
def is_active(self):
895-
return self._loop is not None and self._loop.is_running()
896-
897861
def _do_waitpid(self, expected_pid):
898862
raise NotImplementedError()
899863

@@ -934,6 +898,19 @@ def _sig_chld(self):
934898
'exception': exc,
935899
})
936900

901+
def _compute_returncode(self, status):
902+
if os.WIFSIGNALED(status):
903+
# The child process died because of a signal.
904+
return -os.WTERMSIG(status)
905+
elif os.WIFEXITED(status):
906+
# The child process exited (e.g sys.exit()).
907+
return os.WEXITSTATUS(status)
908+
else:
909+
# The child exited, but we don't understand its status.
910+
# This shouldn't happen, but if it does, let's just
911+
# return that status; perhaps that helps debug it.
912+
return status
913+
937914

938915
class SafeChildWatcher(BaseChildWatcher):
939916
"""'Safe' child watcher implementation.
@@ -957,6 +934,11 @@ def __exit__(self, a, b, c):
957934
pass
958935

959936
def add_child_handler(self, pid, callback, *args):
937+
if self._loop is None:
938+
raise RuntimeError(
939+
"Cannot add child handler, "
940+
"the child watcher does not have a loop attached")
941+
960942
self._callbacks[pid] = (callback, args)
961943

962944
# Prevent a race condition in case the child is already terminated.
@@ -992,7 +974,7 @@ def _do_waitpid(self, expected_pid):
992974
# The child process is still alive.
993975
return
994976

995-
returncode = _compute_returncode(status)
977+
returncode = self._compute_returncode(status)
996978
if self._loop.get_debug():
997979
logger.debug('process %s exited with returncode %s',
998980
expected_pid, returncode)
@@ -1053,6 +1035,11 @@ def __exit__(self, a, b, c):
10531035
def add_child_handler(self, pid, callback, *args):
10541036
assert self._forks, "Must use the context manager"
10551037

1038+
if self._loop is None:
1039+
raise RuntimeError(
1040+
"Cannot add child handler, "
1041+
"the child watcher does not have a loop attached")
1042+
10561043
with self._lock:
10571044
try:
10581045
returncode = self._zombies.pop(pid)
@@ -1085,7 +1072,7 @@ def _do_waitpid_all(self):
10851072
# A child process is still alive.
10861073
return
10871074

1088-
returncode = _compute_returncode(status)
1075+
returncode = self._compute_returncode(status)
10891076

10901077
with self._lock:
10911078
try:
@@ -1114,177 +1101,6 @@ def _do_waitpid_all(self):
11141101
callback(pid, returncode, *args)
11151102

11161103

1117-
class MultiLoopChildWatcher(AbstractChildWatcher):
1118-
# The class keeps compatibility with AbstractChildWatcher ABC
1119-
# To achieve this it has empty attach_loop() method
1120-
# and doesn't accept explicit loop argument
1121-
# for add_child_handler()/remove_child_handler()
1122-
# but retrieves the current loop by get_running_loop()
1123-
1124-
def __init__(self):
1125-
self._callbacks = {}
1126-
self._saved_sighandler = None
1127-
1128-
def is_active(self):
1129-
return self._saved_sighandler is not None
1130-
1131-
def close(self):
1132-
self._callbacks.clear()
1133-
if self._saved_sighandler is not None:
1134-
handler = signal.getsignal(signal.SIGCHLD)
1135-
if handler != self._sig_chld:
1136-
logger.warning("SIGCHLD handler was changed by outside code")
1137-
else:
1138-
signal.signal(signal.SIGCHLD, self._saved_sighandler)
1139-
self._saved_sighandler = None
1140-
1141-
def __enter__(self):
1142-
return self
1143-
1144-
def __exit__(self, exc_type, exc_val, exc_tb):
1145-
pass
1146-
1147-
def add_child_handler(self, pid, callback, *args):
1148-
loop = events.get_running_loop()
1149-
self._callbacks[pid] = (loop, callback, args)
1150-
1151-
# Prevent a race condition in case the child is already terminated.
1152-
self._do_waitpid(pid)
1153-
1154-
def remove_child_handler(self, pid):
1155-
try:
1156-
del self._callbacks[pid]
1157-
return True
1158-
except KeyError:
1159-
return False
1160-
1161-
def attach_loop(self, loop):
1162-
# Don't save the loop but initialize itself if called first time
1163-
# The reason to do it here is that attach_loop() is called from
1164-
# unix policy only for the main thread.
1165-
# Main thread is required for subscription on SIGCHLD signal
1166-
if self._saved_sighandler is None:
1167-
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
1168-
if self._saved_sighandler is None:
1169-
logger.warning("Previous SIGCHLD handler was set by non-Python code, "
1170-
"restore to default handler on watcher close.")
1171-
self._saved_sighandler = signal.SIG_DFL
1172-
1173-
# Set SA_RESTART to limit EINTR occurrences.
1174-
signal.siginterrupt(signal.SIGCHLD, False)
1175-
1176-
def _do_waitpid_all(self):
1177-
for pid in list(self._callbacks):
1178-
self._do_waitpid(pid)
1179-
1180-
def _do_waitpid(self, expected_pid):
1181-
assert expected_pid > 0
1182-
1183-
try:
1184-
pid, status = os.waitpid(expected_pid, os.WNOHANG)
1185-
except ChildProcessError:
1186-
# The child process is already reaped
1187-
# (may happen if waitpid() is called elsewhere).
1188-
pid = expected_pid
1189-
returncode = 255
1190-
logger.warning(
1191-
"Unknown child process pid %d, will report returncode 255",
1192-
pid)
1193-
debug_log = False
1194-
else:
1195-
if pid == 0:
1196-
# The child process is still alive.
1197-
return
1198-
1199-
returncode = _compute_returncode(status)
1200-
debug_log = True
1201-
try:
1202-
loop, callback, args = self._callbacks.pop(pid)
1203-
except KeyError: # pragma: no cover
1204-
# May happen if .remove_child_handler() is called
1205-
# after os.waitpid() returns.
1206-
logger.warning("Child watcher got an unexpected pid: %r",
1207-
pid, exc_info=True)
1208-
else:
1209-
if loop.is_closed():
1210-
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1211-
else:
1212-
if debug_log and loop.get_debug():
1213-
logger.debug('process %s exited with returncode %s',
1214-
expected_pid, returncode)
1215-
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1216-
1217-
def _sig_chld(self, signum, frame):
1218-
try:
1219-
self._do_waitpid_all()
1220-
except (SystemExit, KeyboardInterrupt):
1221-
raise
1222-
except BaseException:
1223-
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1224-
1225-
1226-
class ThreadedChildWatcher(AbstractChildWatcher):
1227-
# The watcher uses a thread per process
1228-
# for waiting for the process finish.
1229-
# It doesn't require subscription on POSIX signal
1230-
1231-
def __init__(self):
1232-
self._pid_counter = itertools.count(0)
1233-
1234-
def is_active(self):
1235-
return True
1236-
1237-
def close(self):
1238-
pass
1239-
1240-
def __enter__(self):
1241-
return self
1242-
1243-
def __exit__(self, exc_type, exc_val, exc_tb):
1244-
pass
1245-
1246-
def add_child_handler(self, pid, callback, *args):
1247-
loop = events.get_running_loop()
1248-
thread = threading.Thread(target=self._do_waitpid,
1249-
name=f"waitpid-{next(self._pid_counter)}",
1250-
args=(loop, pid, callback, args),
1251-
daemon=True)
1252-
thread.start()
1253-
1254-
def remove_child_handler(self, pid):
1255-
# asyncio never calls remove_child_handler() !!!
1256-
# The method is no-op but is implemented because
1257-
# abstract base classe requires it
1258-
return True
1259-
1260-
def attach_loop(self, loop):
1261-
pass
1262-
1263-
def _do_waitpid(self, loop, expected_pid, callback, args):
1264-
assert expected_pid > 0
1265-
1266-
try:
1267-
pid, status = os.waitpid(expected_pid, 0)
1268-
except ChildProcessError:
1269-
# The child process is already reaped
1270-
# (may happen if waitpid() is called elsewhere).
1271-
pid = expected_pid
1272-
returncode = 255
1273-
logger.warning(
1274-
"Unknown child process pid %d, will report returncode 255",
1275-
pid)
1276-
else:
1277-
returncode = _compute_returncode(status)
1278-
if loop.get_debug():
1279-
logger.debug('process %s exited with returncode %s',
1280-
expected_pid, returncode)
1281-
1282-
if loop.is_closed():
1283-
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1284-
else:
1285-
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1286-
1287-
12881104
class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
12891105
"""UNIX event loop policy with a watcher for child processes."""
12901106
_loop_factory = _UnixSelectorEventLoop
@@ -1296,7 +1112,7 @@ def __init__(self):
12961112
def _init_watcher(self):
12971113
with events._lock:
12981114
if self._watcher is None: # pragma: no branch
1299-
self._watcher = ThreadedChildWatcher()
1115+
self._watcher = SafeChildWatcher()
13001116
if isinstance(threading.current_thread(),
13011117
threading._MainThread):
13021118
self._watcher.attach_loop(self._local._loop)
@@ -1318,7 +1134,7 @@ def set_event_loop(self, loop):
13181134
def get_child_watcher(self):
13191135
"""Get the watcher for child processes.
13201136
1321-
If not yet set, a ThreadedChildWatcher object is automatically created.
1137+
If not yet set, a SafeChildWatcher object is automatically created.
13221138
"""
13231139
if self._watcher is None:
13241140
self._init_watcher()

Lib/test/test_asyncio/test_subprocess.py

Lines changed: 1 addition & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -633,7 +633,6 @@ async def execute():
633633

634634
self.assertIsNone(self.loop.run_until_complete(execute()))
635635

636-
637636
if sys.platform != 'win32':
638637
# Unix
639638
class SubprocessWatcherMixin(SubprocessMixin):
@@ -649,24 +648,7 @@ def setUp(self):
649648
watcher = self.Watcher()
650649
watcher.attach_loop(self.loop)
651650
policy.set_child_watcher(watcher)
652-
653-
def tearDown(self):
654-
super().setUp()
655-
policy = asyncio.get_event_loop_policy()
656-
watcher = policy.get_child_watcher()
657-
policy.set_child_watcher(None)
658-
watcher.attach_loop(None)
659-
watcher.close()
660-
661-
class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
662-
test_utils.TestCase):
663-
664-
Watcher = unix_events.ThreadedChildWatcher
665-
666-
class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
667-
test_utils.TestCase):
668-
669-
Watcher = unix_events.MultiLoopChildWatcher
651+
self.addCleanup(policy.set_child_watcher, None)
670652

671653
class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
672654
test_utils.TestCase):
@@ -688,25 +670,5 @@ def setUp(self):
688670
self.set_event_loop(self.loop)
689671

690672

691-
class GenericWatcherTests:
692-
693-
def test_create_subprocess_fails_with_inactive_watcher(self):
694-
695-
async def execute():
696-
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
697-
watcher.is_active.return_value = False
698-
asyncio.set_child_watcher(watcher)
699-
700-
with self.assertRaises(RuntimeError):
701-
await subprocess.create_subprocess_exec(
702-
support.FakePath(sys.executable), '-c', 'pass')
703-
704-
watcher.add_child_handler.assert_not_called()
705-
706-
self.assertIsNone(self.loop.run_until_complete(execute()))
707-
708-
709-
710-
711673
if __name__ == '__main__':
712674
unittest.main()

0 commit comments

Comments
 (0)