Skip to content

iostream: Resolve pending reads on stream close #2805

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged 5 commits on Feb 1, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 28 additions & 2 deletions tornado/iostream.py
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,15 @@ def close(
if self._read_until_close:
self._read_until_close = False
self._finish_read(self._read_buffer_size, False)
elif self._read_future is not None:
# resolve reads that are pending and ready to complete
try:
pos = self._find_read_pos()
except UnsatisfiableReadError:
pass
else:
if pos is not None:
self._read_from_buffer(pos)
if self._state is not None:
self.io_loop.remove_handler(self.fileno())
self._state = None
Expand Down Expand Up @@ -784,8 +793,25 @@ def _handle_read(self) -> None:
self._read_from_buffer(pos)

def _start_read(self) -> Future:
    """Create and store the ``Future`` that the next read will resolve.

    Returns the new ``Future``; the caller (one of the ``read_*``
    methods) is responsible for arranging for it to be resolved.
    """
    if self._read_future is not None:
        # It is an error to start a read while a prior read is unresolved.
        # However, if the prior read is unresolved because the stream was
        # closed without satisfying it, it's better to raise
        # StreamClosedError instead of AssertionError. In particular, this
        # situation occurs in harmless situations in http1connection.py and
        # an AssertionError would be logged noisily.
        #
        # On the other hand, it is legal to start a new read while the
        # stream is closed, in case the read can be satisfied from the
        # read buffer. So we only want to check the closed status of the
        # stream if we need to decide what kind of error to raise for
        # "already reading".
        #
        # These conditions have proven difficult to test; we have no
        # unittests that reliably verify this behavior so be careful
        # when making changes here. See #2651 and #2719.
        self._check_closed()
        assert self._read_future is None, "Already reading"
    self._read_future = Future()
    return self._read_future

Expand Down
56 changes: 56 additions & 0 deletions tornado/test/iostream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
)
from tornado.test.util import skipIfNonUnix, refusing_port, skipPypy3V58
from tornado.web import RequestHandler, Application
import asyncio
import errno
import hashlib
import logging
Expand Down Expand Up @@ -166,6 +167,27 @@ class TestReadWriteMixin(object):
def make_iostream_pair(self, **kwargs):
    """Return an awaitable yielding a connected stream pair.

    Must be overridden by subclasses of the mixin.  Based on its use in
    ``iostream_pair``, implementations return an iterable of streams
    (read side, write side), each supporting ``close()``.
    """
    raise NotImplementedError

def iostream_pair(self, **kwargs):
    """Async-context-manager wrapper around ``make_iostream_pair``.

    Intended for ``async with self.iostream_pair() as (rs, ws): ...``;
    both streams are closed on exit regardless of how the body finishes.
    (On py37+ this could be written with
    ``contextlib.asynccontextmanager`` instead of a hand-rolled class.)
    """

    class _StreamPairManager:
        def __init__(self, test, kwargs):
            self.test = test
            self.kwargs = kwargs

        async def __aenter__(self):
            # Defer the actual pair creation until entry so the test's
            # event loop is running when make_iostream_pair executes.
            self.pair = await self.test.make_iostream_pair(**self.kwargs)
            return self.pair

        async def __aexit__(self, exc_type, exc_value, traceback):
            # Unconditionally close both ends; closing is idempotent.
            for stream in self.pair:
                stream.close()

    return _StreamPairManager(self, kwargs)

@gen_test
def test_write_zero_bytes(self):
# Attempting to write zero bytes should run the callback without
Expand Down Expand Up @@ -261,6 +283,40 @@ def test_large_read_until(self: typing.Any):
ws.close()
rs.close()

@gen_test
async def test_read_until_with_close_after_second_packet(self):
    """Regression test for tornado/tornado#2717 (regression in 6.0, maybe 6.0.3).

    The data arrives in two chunks, and the stream is closed at the same
    time the second chunk is received.  A second chunk larger than the
    first always worked; under the bug a *smaller* second chunk failed,
    because the read_until condition was only re-checked when the read
    buffer doubled in size.
    """
    async with self.iostream_pair() as (reader, writer):
        pending_read = asyncio.ensure_future(reader.read_until(b"done"))
        await writer.write(b"x" * 2048)
        # Deliberately not awaited: the delimiter must land together
        # with the close to reproduce the original failure.
        writer.write(b"done")
        writer.close()
        await pending_read

@gen_test
async def test_read_until_unsatisfied_after_close(self: typing.Any):
    """Closing mid-read raises StreamClosedError, not UnsatisfiableReadError.

    UnsatisfiableReadError is reserved for reads that hit their byte
    limit; a close that leaves the delimiter unseen must surface as
    StreamClosedError.  The scenario comes from tornado/tornado#2717.
    """
    async with self.iostream_pair() as (reader, writer):
        pending_read = asyncio.ensure_future(reader.read_until(b"done"))
        await writer.write(b"x" * 2048)
        # The delimiter never arrives; deliberately not awaited so the
        # final bytes and the close are processed together.
        writer.write(b"foo")
        writer.close()
        with self.assertRaises(StreamClosedError):
            await pending_read

@gen_test
def test_close_callback_with_pending_read(self: typing.Any):
# Regression test for a bug that was introduced in 2.3
Expand Down