Skip to content

Commit c789c43

Browse files
committed
asyncio: Rework AddThreadSelectorEventLoop
Running a whole event loop on the other thread leads to tricky synchronization problems. Instead, keep as much as possible on the main thread, and call out to a second thread only for the blocking select system call itself.
1 parent ff7e028 commit c789c43

File tree

3 files changed

+205
-127
lines changed

3 files changed

+205
-127
lines changed

tornado/platform/asyncio.py

Lines changed: 182 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -22,40 +22,51 @@
2222
import asyncio
2323
import atexit
2424
import concurrent.futures
25+
import errno
2526
import functools
26-
import itertools
27+
import select
28+
import socket
2729
import sys
2830
import threading
2931
import typing
3032
from tornado.gen import convert_yielded
3133
from tornado.ioloop import IOLoop, _Selectable
3234

33-
from typing import Any, TypeVar, Awaitable, Callable, Union, Optional
35+
from typing import Any, TypeVar, Awaitable, Callable, Union, Optional, List, Tuple, Dict
3436

3537
if typing.TYPE_CHECKING:
36-
from typing import Set, Dict, Tuple # noqa: F401
37-
38-
_T = TypeVar("_T")
39-
40-
41-
class _HasFileno(typing.Protocol):
42-
def fileno(self) -> int:
43-
pass
38+
from typing import Set # noqa: F401
39+
from typing_extensions import Protocol
4440

41+
class _HasFileno(Protocol):
42+
def fileno(self) -> int:
43+
pass
4544

46-
_FileDescriptorLike = Union[int, _HasFileno]
45+
_FileDescriptorLike = Union[int, _HasFileno]
4746

47+
_T = TypeVar("_T")
4848

49-
_seq_gen = itertools.count()
5049

51-
_atexit_run = False
50+
# Collection of sockets to write to at shutdown to wake up any selector threads.
51+
_waker_sockets = set() # type: Set[socket.socket]
5252

5353

5454
def _atexit_callback() -> None:
55-
global _atexit_run
56-
_atexit_run = True
55+
for fd in _waker_sockets:
56+
try:
57+
fd.send(b"a")
58+
except BlockingIOError:
59+
pass
5760

5861

62+
# atexit callbacks are run in LIFO order. Our callback must run before
63+
# ThreadPoolExecutor's or it will deadlock (the pool's threads can't
64+
# finish their work items until we write to their waker sockets). In
65+
# recent versions of Python the thread pool atexit callback is
66+
# registered in a getattr hook the first time TPE is *referenced*
67+
# (instead of older versions of python where it was registered when
68+
# concurrent.futures was imported).
69+
concurrent.futures.ThreadPoolExecutor
5970
atexit.register(_atexit_callback)
6071

6172

@@ -71,7 +82,7 @@ def initialize( # type: ignore
7182
# as windows where the default event loop does not implement these methods.
7283
self.selector_loop = asyncio_loop
7384
if hasattr(asyncio, "ProactorEventLoop") and isinstance(
74-
asyncio_loop, asyncio.ProactorEventLoop
85+
asyncio_loop, asyncio.ProactorEventLoop # type: ignore
7586
):
7687
# Ignore this line for mypy because the abstract method checker
7788
# doesn't understand dynamic proxies.
@@ -237,7 +248,7 @@ def run_in_executor(
237248
self,
238249
executor: Optional[concurrent.futures.Executor],
239250
func: Callable[..., _T],
240-
*args: Any,
251+
*args: Any
241252
) -> Awaitable[_T]:
242253
return self.asyncio_loop.run_in_executor(executor, func, *args)
243254

@@ -389,9 +400,9 @@ def get_event_loop(self) -> asyncio.AbstractEventLoop:
389400
class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
390401
"""Wrap an event loop to add implementations of the ``add_reader`` method family.
391402
392-
Instances of this class start a second thread to run a selector-based event loop.
393-
This thread is completely hidden from the user; all callbacks are run on the
394-
wrapped event loop's thread.
403+
Instances of this class start a second thread to run a selector.
404+
This thread is completely hidden from the user; all callbacks are
405+
run on the wrapped event loop's thread.
395406
396407
This class is used automatically by Tornado; applications should not need
397408
to refer to it directly.
@@ -400,137 +411,182 @@ class AddThreadSelectorEventLoop(asyncio.AbstractEventLoop):
400411
for event loops that do not implement the ``add_reader`` family of methods
401412
themselves (i.e. ``WindowsProactorEventLoop``)
402413
403-
Closing the ``AddThreadSelectorEventLoop`` does not close the wrapped event loop.
414+
Closing the ``AddThreadSelectorEventLoop`` also closes the wrapped event loop.
415+
404416
"""
405417

406418
# This class is a __getattribute__-based proxy. All attributes other than those
407419
# in this set are proxied through to the underlying loop.
408420
MY_ATTRIBUTES = {
421+
"_consume_waker",
422+
"_executor",
423+
"_handle_event",
424+
"_readers",
425+
"_real_loop",
426+
"_run_select",
427+
"_select_loop",
428+
"_selector_task",
429+
"_start_selector",
430+
"_wake_selector",
431+
"_waker_r",
432+
"_waker_w",
433+
"_writers",
409434
"add_reader",
410435
"add_writer",
436+
"close",
411437
"remove_reader",
412438
"remove_writer",
413-
"close",
414-
"_real_loop",
415-
"_selector_loop",
416-
"_selector_thread",
417-
"_run_on_selector",
418-
"_handle_event_from_selector",
419-
"_reader_seq",
420-
"_writer_seq",
421439
}
422440

441+
def __getattribute__(self, name: str) -> Any:
442+
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
443+
return super().__getattribute__(name)
444+
return getattr(self._real_loop, name)
445+
423446
def __init__(self, real_loop: asyncio.AbstractEventLoop) -> None:
424447
self._real_loop = real_loop
425-
426-
# Sequence numbers allow us to detect races between the selector thread
427-
# and the main thread, such as when a handler for a file descriptor has
428-
# been removed and re-added. These maps go from file descriptor to a
429-
# sequence number.
430-
self._reader_seq = {} # type: Dict[_FileDescriptorLike, int]
431-
self._writer_seq = {} # type: Dict[_FileDescriptorLike, int]
432-
433-
fut = (
434-
concurrent.futures.Future()
435-
) # type: concurrent.futures.Future[asyncio.AbstractEventLoop]
436-
437-
def f() -> None:
438-
loop = asyncio.SelectorEventLoop()
439-
fut.set_result(loop)
440-
loop.run_forever()
441-
loop.close()
442-
443-
self._selector_thread = threading.Thread(target=f)
444-
# Must be a daemon in case this event loop is not explicitly closed
445-
# (often the case for the main loop).
446-
self._selector_thread.daemon = True
447-
self._selector_thread.start()
448-
self._selector_loop = fut.result()
448+
# Create our own executor to ensure we always have a thread
449+
# available (we'll keep it 100% busy) instead of contending
450+
# with the application for a thread in the default executor.
451+
self._executor = concurrent.futures.ThreadPoolExecutor(1)
452+
self._selector_task = None
453+
# Start the select loop once the loop is started.
454+
self._real_loop.call_soon(self._start_selector)
455+
456+
self._readers = {} # type: Dict[_FileDescriptorLike, Callable]
457+
self._writers = {} # type: Dict[_FileDescriptorLike, Callable]
458+
459+
# Writing to _waker_w will wake up the selector thread, which
460+
# watches for _waker_r to be readable.
461+
self._waker_r, self._waker_w = socket.socketpair()
462+
self._waker_r.setblocking(False)
463+
self._waker_w.setblocking(False)
464+
_waker_sockets.add(self._waker_w)
465+
self.add_reader(self._waker_r, self._consume_waker)
466+
467+
def __del__(self) -> None:
468+
# If the top-level application code uses asyncio interfaces to
469+
# start and stop the event loop, no objects created in Tornado
470+
# can get a clean shutdown notification. If we're just left to
471+
# be GC'd, we must explicitly close our sockets to avoid
472+
# logging warnings.
473+
_waker_sockets.discard(self._waker_w)
474+
self._waker_r.close()
475+
self._waker_w.close()
449476

450477
def close(self) -> None:
451-
self._selector_loop.call_soon_threadsafe(self._selector_loop.stop)
452-
if not _atexit_run:
453-
# Shutdown is tricky: Our thread must be set as a daemon so that it
454-
# doesn't prevent shutdown in the common case of an unclosed main
455-
# loop. But daemon threads are halted relatively early in the
456-
# interpreter shutdown process; once this happens attempts to join
457-
# them will block forever.
458-
#
459-
# I can't find formal documentation of this, but as of cpython 3.8
460-
# the shutdown order is
461-
# 1. atexit functions
462-
# 2. daemon threads halt
463-
# 3. global destructors run
464-
#
465-
# If we're running after atexit functions, we're probably in a
466-
# global destructor. But in any case, we know that the process is
467-
# about to exit and it's no longer necessary to join our daemon
468-
# thread. (Is it ever necessary to join it? Probably not but it
469-
# feels dirty not to)
470-
self._selector_thread.join()
478+
if self._selector_task is not None:
479+
self._selector_task.cancel()
480+
try:
481+
# Cancellation is not immediate (coroutines are given
482+
# a chance to catch the error and recover) so we must
483+
# restart the loop here to allow our selector task to
484+
# finish and avoid logging warnings at shutdown.
485+
self._real_loop.run_until_complete(self._selector_task)
486+
except asyncio.CancelledError:
487+
pass
488+
self._wake_selector()
489+
self._executor.shutdown()
490+
_waker_sockets.discard(self._waker_w)
491+
self._waker_r.close()
492+
self._waker_w.close()
471493
self._real_loop.close()
472494

473-
def __getattribute__(self, name: str) -> Any:
474-
if name in AddThreadSelectorEventLoop.MY_ATTRIBUTES:
475-
return super().__getattribute__(name)
476-
return getattr(self._real_loop, name)
495+
def _wake_selector(self) -> None:
496+
try:
497+
self._waker_w.send(b"a")
498+
except BlockingIOError:
499+
pass
477500

478-
def _run_on_selector(self, method: Callable[..., _T], *args: Any) -> _T:
479-
"""Synchronously run the given method on the selector thread.
480-
"""
481-
fut = concurrent.futures.Future() # type: concurrent.futures.Future[_T]
501+
def _consume_waker(self) -> None:
502+
try:
503+
self._waker_r.recv(1024)
504+
except BlockingIOError:
505+
pass
482506

483-
def wrapper() -> None:
484-
try:
485-
result = method(*args)
486-
except Exception as e:
487-
fut.set_exception(e)
488-
else:
489-
fut.set_result(result)
507+
def _run_select(
508+
self, to_read: List[int], to_write: List[int]
509+
) -> Tuple[List[int], List[int]]:
510+
# We use the simpler interface of the select module instead of
511+
# the more stateful interface in the selectors module because
512+
# this class is only intended for use on windows, where
513+
# select.select is the only option. The selector interface
514+
# does not have well-documented thread-safety semantics that
515+
# we can rely on so ensuring proper synchronization would be
516+
# tricky.
517+
try:
518+
# On windows, selecting on a socket for write will not
519+
# return the socket when there is an error (but selecting
520+
# for reads works). Also select for errors when selecting
521+
# for writes, and merge the results.
522+
#
523+
# This pattern is also used in
524+
# https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
525+
rs, ws, xs = select.select(to_read, to_write, to_write)
526+
ws = ws + xs
527+
except OSError as e:
528+
# After remove_reader or remove_writer is called, the file
529+
# descriptor may subsequently be closed on the event loop
530+
# thread. It's possible that this select thread hasn't
531+
# gotten into the select system call by the time that
532+
# happens in which case (at least on macOS), select may
533+
# raise a "bad file descriptor" error. If we get that
534+
# error, check and see if we're also being woken up by
535+
# polling the waker alone. If we are, just return to the
536+
# event loop and we'll get the updated set of file
537+
# descriptors on the next iteration. Otherwise, raise the
538+
# original error.
539+
if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
540+
rs, _, _ = select.select([self._waker_r.fileno()], [], [], 0)
541+
if rs:
542+
return rs, []
543+
raise
544+
return rs, ws
545+
546+
def _start_selector(self) -> None:
547+
self._selector_task = asyncio.create_task(self._select_loop()) # type: ignore
548+
549+
async def _select_loop(self) -> None:
550+
while True:
551+
# Capture reader and writer sets here in the event loop
552+
# thread to avoid any problems with concurrent
553+
# modification while the select loop uses them.
554+
rs, ws = await self.run_in_executor(
555+
self._executor,
556+
self._run_select,
557+
list(self._readers.keys()),
558+
list(self._writers.keys()),
559+
)
560+
for r in rs:
561+
self._handle_event(r, self._readers)
562+
for w in ws:
563+
self._handle_event(w, self._writers)
490564

491-
self._selector_loop.call_soon_threadsafe(wrapper)
492-
return fut.result()
565+
def _handle_event(
566+
self, fd: "_FileDescriptorLike", cb_map: Dict["_FileDescriptorLike", Callable],
567+
) -> None:
568+
try:
569+
callback = cb_map[fd]
570+
except KeyError:
571+
return
572+
callback()
493573

494574
def add_reader(
495-
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
575+
self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
496576
) -> None:
497-
seq = next(_seq_gen)
498-
self._reader_seq[fd] = seq
499-
500-
def wrapper() -> None:
501-
if self._reader_seq.get(fd, None) != seq:
502-
return
503-
callback(*args)
504-
505-
return self._run_on_selector(
506-
self._selector_loop.add_reader,
507-
fd,
508-
self._real_loop.call_soon_threadsafe,
509-
wrapper,
510-
)
577+
self._readers[fd] = functools.partial(callback, *args)
578+
self._wake_selector()
511579

512580
def add_writer(
513-
self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
581+
self, fd: "_FileDescriptorLike", callback: Callable[..., None], *args: Any
514582
) -> None:
515-
seq = next(_seq_gen)
516-
self._writer_seq[fd] = seq
517-
518-
def wrapper() -> None:
519-
if self._writer_seq.get(fd, None) != seq:
520-
return
521-
callback(*args)
522-
523-
return self._run_on_selector(
524-
self._selector_loop.add_writer,
525-
fd,
526-
self._real_loop.call_soon_threadsafe,
527-
wrapper,
528-
)
583+
self._writers[fd] = functools.partial(callback, *args)
584+
self._wake_selector()
529585

530-
def remove_reader(self, fd: _FileDescriptorLike) -> None:
531-
del self._reader_seq[fd]
532-
return self._run_on_selector(self._selector_loop.remove_reader, fd)
586+
def remove_reader(self, fd: "_FileDescriptorLike") -> None:
587+
del self._readers[fd]
588+
self._wake_selector()
533589

534-
def remove_writer(self, fd: _FileDescriptorLike) -> None:
535-
del self._writer_seq[fd]
536-
return self._run_on_selector(self._selector_loop.remove_writer, fd)
590+
def remove_writer(self, fd: "_FileDescriptorLike") -> None:
591+
del self._writers[fd]
592+
self._wake_selector()

0 commit comments

Comments
 (0)