Skip to content

Commit 92f979b

Browse files
committed
bpo-42110: prototype fix for MultiLoopChildWatcher
1 parent c60394c commit 92f979b

File tree

2 files changed

+108
-56
lines changed

2 files changed

+108
-56
lines changed

Lib/asyncio/unix_events.py

Lines changed: 89 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -1202,6 +1202,79 @@ def _do_waitpid_all(self):
12021202
callback(pid, returncode, *args)
12031203

12041204

1205+
# Internal helper for MultiLoopChildWatcher.
1206+
# Manage the child handlers for a single event loop.
1207+
# So all accesses to this are from the same thread.
1208+
class _LoopChildWatcher:
1209+
def __init__(self, loop):
1210+
self._loop = loop
1211+
self._callbacks = {}
1212+
1213+
def add_child_handler(self, pid, callback, *args):
1214+
self._callbacks[pid] = (callback, args)
1215+
1216+
# Prevent a race condition in case the child is already terminated.
1217+
self._do_waitpid(pid)
1218+
1219+
def remove_child_handler(self, pid):
1220+
try:
1221+
del self._callbacks[pid]
1222+
return True
1223+
except KeyError:
1224+
return False
1225+
1226+
def empty(self):
1227+
return not self._callbacks
1228+
1229+
def _do_waitpid(self, expected_pid):
1230+
assert expected_pid > 0
1231+
1232+
try:
1233+
pid, status = os.waitpid(expected_pid, os.WNOHANG)
1234+
except ChildProcessError:
1235+
# The child process is already reaped
1236+
# (may happen if waitpid() is called elsewhere).
1237+
pid = expected_pid
1238+
returncode = 255
1239+
logger.warning(
1240+
"Unknown child process pid %d, will report returncode 255",
1241+
pid)
1242+
debug_log = False
1243+
else:
1244+
if pid == 0:
1245+
# The child process is still alive.
1246+
return
1247+
1248+
returncode = _compute_returncode(status)
1249+
debug_log = True
1250+
try:
1251+
callback, args = self._callbacks.pop(pid)
1252+
except KeyError: # pragma: no cover
1253+
# May happen if .remove_child_handler() is called
1254+
# after os.waitpid() returns.
1255+
logger.warning("Child watcher got an unexpected pid: %r",
1256+
pid, exc_info=True)
1257+
else:
1258+
if debug_log and self._loop.get_debug():
1259+
logger.debug('process %s exited with returncode %s',
1260+
expected_pid, returncode)
1261+
self._loop.call_soon(callback, pid, returncode, *args)
1262+
1263+
def do_waitpid_all(self):
1264+
try:
1265+
for pid in list(self._callbacks):
1266+
self._do_waitpid(pid)
1267+
except (SystemExit, KeyboardInterrupt):
1268+
raise
1269+
except BaseException:
1270+
# self._loop should always be available here
1271+
# as we are only called via loop.call_soon_threadsafe()
1272+
self._loop.call_exception_handler({
1273+
'message': 'Unknown exception in SIGCHLD handler',
1274+
'exception': exc,
1275+
})
1276+
1277+
12051278
class MultiLoopChildWatcher(AbstractChildWatcher):
12061279
"""A watcher that doesn't require running loop in the main thread.
12071280
@@ -1222,14 +1295,14 @@ class MultiLoopChildWatcher(AbstractChildWatcher):
12221295
# but retrieves the current loop by get_running_loop()
12231296

12241297
def __init__(self):
1225-
self._callbacks = {}
1298+
self._loops = {} # event loop -> _LoopChildWatcher
12261299
self._saved_sighandler = None
12271300

12281301
def is_active(self):
12291302
return self._saved_sighandler is not None
12301303

12311304
def close(self):
1232-
self._callbacks.clear()
1305+
self._loops.clear()
12331306
if self._saved_sighandler is not None:
12341307
handler = signal.getsignal(signal.SIGCHLD)
12351308
if handler != self._sig_chld:
@@ -1246,17 +1319,18 @@ def __exit__(self, exc_type, exc_val, exc_tb):
12461319

12471320
def add_child_handler(self, pid, callback, *args):
12481321
loop = events.get_running_loop()
1249-
self._callbacks[pid] = (loop, callback, args)
1250-
1251-
# Prevent a race condition in case the child is already terminated.
1252-
self._do_waitpid(pid)
1322+
if not loop in self._loops:
1323+
self._loops[loop] = _LoopChildWatcher(loop)
1324+
watcher = self._loops[loop]
1325+
watcher.add_child_handler(pid, callback, *args)
12531326

12541327
def remove_child_handler(self, pid):
1255-
try:
1256-
del self._callbacks[pid]
1257-
return True
1258-
except KeyError:
1328+
if not loop in self._loops:
12591329
return False
1330+
watcher = self._loops[loop]
1331+
ret = watcher.remove_child_handler(pid)
1332+
if watcher.empty():
1333+
del self._loops[loop]
12601334

12611335
def attach_loop(self, loop):
12621336
# Don't save the loop but initialize itself if called first time
@@ -1273,54 +1347,13 @@ def attach_loop(self, loop):
12731347
# Set SA_RESTART to limit EINTR occurrences.
12741348
signal.siginterrupt(signal.SIGCHLD, False)
12751349

1276-
def _do_waitpid_all(self):
1277-
for pid in list(self._callbacks):
1278-
self._do_waitpid(pid)
1279-
1280-
def _do_waitpid(self, expected_pid):
1281-
assert expected_pid > 0
1282-
1283-
try:
1284-
pid, status = os.waitpid(expected_pid, os.WNOHANG)
1285-
except ChildProcessError:
1286-
# The child process is already reaped
1287-
# (may happen if waitpid() is called elsewhere).
1288-
pid = expected_pid
1289-
returncode = 255
1290-
logger.warning(
1291-
"Unknown child process pid %d, will report returncode 255",
1292-
pid)
1293-
debug_log = False
1294-
else:
1295-
if pid == 0:
1296-
# The child process is still alive.
1297-
return
1298-
1299-
returncode = _compute_returncode(status)
1300-
debug_log = True
1301-
try:
1302-
loop, callback, args = self._callbacks.pop(pid)
1303-
except KeyError: # pragma: no cover
1304-
# May happen if .remove_child_handler() is called
1305-
# after os.waitpid() returns.
1306-
logger.warning("Child watcher got an unexpected pid: %r",
1307-
pid, exc_info=True)
1308-
else:
1350+
def _sig_chld(self, signum, frame):
1351+
for loop, watcher in self._loops.items():
1352+
# TODO - is this good enough? can we do better?
13091353
if loop.is_closed():
1310-
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
1354+
logger.warning("Loop %r is closed, but it still had running subprocesses", loop)
13111355
else:
1312-
if debug_log and loop.get_debug():
1313-
logger.debug('process %s exited with returncode %s',
1314-
expected_pid, returncode)
1315-
loop.call_soon_threadsafe(callback, pid, returncode, *args)
1316-
1317-
def _sig_chld(self, signum, frame):
1318-
try:
1319-
self._do_waitpid_all()
1320-
except (SystemExit, KeyboardInterrupt):
1321-
raise
1322-
except BaseException:
1323-
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)
1356+
loop.call_soon_threadsafe(watcher.do_waitpid_all)
13241357

13251358

13261359
class ThreadedChildWatcher(AbstractChildWatcher):

Lib/test/test_asyncio/test_subprocess.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import os
22
import signal
33
import sys
4+
import time
45
import unittest
56
import warnings
67
from unittest import mock
@@ -655,6 +656,24 @@ async def go():
655656
await proc.wait()
656657
self.loop.run_until_complete(go())
657658

659+
def test_kill_before_wait(self):
660+
# Patch os.kill to call sleep(0.1) first,
661+
# opening up the window for a race condition.
662+
os_kill = os.kill
663+
def kill(pid, signum):
664+
time.sleep(0.1)
665+
os_kill(pid, signum)
666+
667+
with mock.patch('subprocess.os.kill', kill):
668+
async def go():
669+
p = await asyncio.create_subprocess_shell("exit 0")
670+
try:
671+
p.send_signal(signal.SIGTERM)
672+
finally:
673+
# cleanup
674+
await p.wait()
675+
self.loop.run_until_complete(go())
676+
658677

659678
if sys.platform != 'win32':
660679
# Unix

0 commit comments

Comments
 (0)