From 060cba131b4b1f63df79977cc76cfc1f24f56cea Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20Bo=C4=8Dek?= Date: Tue, 17 Dec 2024 17:35:21 +0100 Subject: [PATCH 1/6] Fix incomplete writes after close in asyncio._SelectorSocketTransport MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Change introduced in gh-106503 cause the transport to miss writes scheduled after it is closed. Signed-off-by: Vojtěch Boček --- Lib/asyncio/selector_events.py | 2 +- Lib/test/test_asyncio/test_selector_events.py | 39 +++++++++++++++++++ Misc/ACKS | 1 + ...-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst | 3 ++ 4 files changed, 44 insertions(+), 1 deletion(-) create mode 100644 Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 50992a607b3a1c..d43ebdefac139c 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1186,6 +1186,7 @@ def can_write_eof(self): def _call_connection_lost(self, exc): super()._call_connection_lost(exc) + self._write_ready = None if self._empty_waiter is not None: self._empty_waiter.set_exception( ConnectionError("Connection is closed by peer")) @@ -1203,7 +1204,6 @@ def _reset_empty_waiter(self): def close(self): self._read_ready_cb = None - self._write_ready = None super().close() diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index c9217d04bcd322..8aba08892bf676 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1051,6 +1051,45 @@ def test_transport_close_remove_writer(self, m_log): transport.close() remove_writer.assert_called_with(self.sock_fd) + def test_write_buffer_after_close(self): + # If the transport is closed while: + # * Transport write buffer is not empty + # * Transport is paused + # * Protocol has data in its buffer, like SSLProtocol in self._outgoing + # The data is still written out. + + data = memoryview(b'data') + self.sock.send.return_value = 2 + self.sock.send.fileno.return_value = 7 + + def _resume_writing(): + transport.write(b"data") + self.protocol.resume_writing.side_effect = None + + self.protocol.resume_writing.side_effect = _resume_writing + + transport = self.socket_transport() + transport._high_water = 1 + + transport.write(data) + + self.assertTrue(transport._protocol_paused) + self.assertTrue(self.sock.send.called) + self.loop.assert_writer(7, transport._write_ready) + + transport.close() + + # not called, we still have data in write buffer + self.assertFalse(self.protocol.connection_lost.called) + + self.loop.writers[7]._run() + # during this ^ run, the _resume_writing mock above was called and added more data + + self.assertEqual(transport.get_write_buffer_size(), 2) + self.loop.writers[7]._run() + + self.assertEqual(transport.get_write_buffer_size(), 0) + self.assertTrue(self.protocol.connection_lost.called) class SelectorSocketTransportBufferedProtocolTests(test_utils.TestCase): diff --git a/Misc/ACKS b/Misc/ACKS index deda334bee7417..fcc3caa8342fff 100644 --- a/Misc/ACKS +++ b/Misc/ACKS @@ -189,6 +189,7 @@ Stéphane Blondon Eric Blossom Sergey Bobrov Finn Bock +Vojtěch Boček Paul Boddie Matthew Boedicker Robin Boerdijk diff --git a/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst b/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst new file mode 100644 index 00000000000000..df03ae91dbeeaa --- /dev/null +++ b/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst @@ -0,0 +1,3 @@ +Fix exceptions and incomplete writes after asyncio _SelectorTransport +is closed before writes are completed. +Breaking change introduced in gh-106503, released in 3.12.0. From 26908b7a95b0990ff2ffb208cb5868de3c8fd434 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20Bo=C4=8Dek?= Date: Wed, 18 Dec 2024 10:04:15 +0100 Subject: [PATCH 2/6] Run cleanup in finally in _SelectorSocketTransport._call_connection_lost --- Lib/asyncio/selector_events.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index d43ebdefac139c..22147451fa7ebd 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -1185,11 +1185,13 @@ def can_write_eof(self): return True def _call_connection_lost(self, exc): - super()._call_connection_lost(exc) - self._write_ready = None - if self._empty_waiter is not None: - self._empty_waiter.set_exception( - ConnectionError("Connection is closed by peer")) + try: + super()._call_connection_lost(exc) + finally: + self._write_ready = None + if self._empty_waiter is not None: + self._empty_waiter.set_exception( + ConnectionError("Connection is closed by peer")) def _make_empty_waiter(self): if self._empty_waiter is not None: From 9a7be93a6755f7ae23e7ac6ec63cb1f258949c10 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20Bo=C4=8Dek?= Date: Mon, 13 Jan 2025 08:11:34 +0100 Subject: [PATCH 3/6] chore: review tweaks --- Lib/test/test_asyncio/test_selector_events.py | 2 +- .../Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index 8aba08892bf676..b6a3998174aaaf 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1052,7 +1052,7 @@ def test_transport_close_remove_writer(self, m_log): remove_writer.assert_called_with(self.sock_fd) def test_write_buffer_after_close(self): - # If the transport is closed while: + # gh-115514: If the transport is closed while: # * Transport write buffer is not empty # * Transport is paused # * Protocol has data in its buffer, like SSLProtocol in self._outgoing diff --git a/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst b/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst index df03ae91dbeeaa..24e836a0b0b7f9 100644 --- a/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst +++ b/Misc/NEWS.d/next/Library/2024-12-17-16-48-02.gh-issue-115514.1yOJ7T.rst @@ -1,3 +1,2 @@ -Fix exceptions and incomplete writes after asyncio _SelectorTransport +Fix exceptions and incomplete writes after :class:`!asyncio._SelectorTransport` is closed before writes are completed. -Breaking change introduced in gh-106503, released in 3.12.0. From c3265061071399f9e913b6a3a969238e3d3cf5eb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20Bo=C4=8Dek?= Date: Mon, 20 Jan 2025 08:14:12 +0100 Subject: [PATCH 4/6] Make test_remote_shutdown_receives_trailing_data trigger gh-115514 --- Lib/test/test_asyncio/test_selector_events.py | 3 +++ Lib/test/test_asyncio/test_ssl.py | 26 ++++++++++++++++--- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_asyncio/test_selector_events.py b/Lib/test/test_asyncio/test_selector_events.py index b6a3998174aaaf..9d094a7b041276 100644 --- a/Lib/test/test_asyncio/test_selector_events.py +++ b/Lib/test/test_asyncio/test_selector_events.py @@ -1058,6 +1058,9 @@ def test_write_buffer_after_close(self): # * Protocol has data in its buffer, like SSLProtocol in self._outgoing # The data is still written out. + # Also tested with real SSL transport in + # test.test_asyncio.test_ssl.TestSSL.test_remote_shutdown_receives_trailing_data + data = memoryview(b'data') self.sock.send.return_value = 2 self.sock.send.fileno.return_value = 7 diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py index 125a6c35793c44..2fffcc4021e3ab 100644 --- a/Lib/test/test_asyncio/test_ssl.py +++ b/Lib/test/test_asyncio/test_ssl.py @@ -12,6 +12,7 @@ import tempfile import threading import time +import unittest.mock import weakref import unittest @@ -1410,10 +1411,27 @@ async def client(addr): except (BrokenPipeError, ConnectionResetError): pass - await future - - writer.close() - await self.wait_closed(writer) + # Make sure _SelectorSocketTransport enters the delayed write + # path in its `write` method by setting the socket buffer to small value, + # so that `socket.send` does not consume all the data. + # This triggers bug gh-115514, also tested using mocks in + # test.test_asyncio.test_selector_events.SelectorSocketTransportTests.test_write_buffer_after_close + socket_transport = writer.transport._ssl_protocol._transport + + def _shrink_sock_buffer(data): + if socket_transport._read_ready_cb is None: + socket_transport.get_extra_info("socket").setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) + return unittest.mock.DEFAULT + + with unittest.mock.patch.object( + socket_transport, "write", + wraps=socket_transport.write, + side_effect=_shrink_sock_buffer + ): + await future + + writer.close() + await self.wait_closed(writer) def run(meth): def wrapper(sock): From f88ae9299f29c44d537a544f457c8eb515bea91d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20Bo=C4=8Dek?= Date: Mon, 20 Jan 2025 13:06:32 +0100 Subject: [PATCH 5/6] Split ssl test into another case, test_remote_shutdown_receives_trailing_data_on_slow_socket --- Lib/test/test_asyncio/test_ssl.py | 131 ++++++++++++++++++++++++++++++ 1 file changed, 131 insertions(+) diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py index 2fffcc4021e3ab..1443959ebe1fd2 100644 --- a/Lib/test/test_asyncio/test_ssl.py +++ b/Lib/test/test_asyncio/test_ssl.py @@ -1319,6 +1319,137 @@ def test_remote_shutdown_receives_trailing_data(self): client_sslctx = self._create_client_ssl_context() future = None + def server(sock): + incoming = ssl.MemoryBIO() + outgoing = ssl.MemoryBIO() + sslobj = sslctx.wrap_bio(incoming, outgoing, server_side=True) + + while True: + try: + sslobj.do_handshake() + except ssl.SSLWantReadError: + if outgoing.pending: + sock.send(outgoing.read()) + incoming.write(sock.recv(16384)) + else: + if outgoing.pending: + sock.send(outgoing.read()) + break + + while True: + try: + data = sslobj.read(4) + except ssl.SSLWantReadError: + incoming.write(sock.recv(16384)) + else: + break + + self.assertEqual(data, b'ping') + sslobj.write(b'pong') + sock.send(outgoing.read()) + + time.sleep(0.2) # wait for the peer to fill its backlog + + # send close_notify but don't wait for response + with self.assertRaises(ssl.SSLWantReadError): + sslobj.unwrap() + sock.send(outgoing.read()) + + # should receive all data + data_len = 0 + while True: + try: + chunk = len(sslobj.read(16384)) + data_len += chunk + except ssl.SSLWantReadError: + incoming.write(sock.recv(16384)) + except ssl.SSLZeroReturnError: + break + + self.assertEqual(data_len, CHUNK * SIZE) + + # verify that close_notify is received + sslobj.unwrap() + + sock.close() + + def eof_server(sock): + sock.starttls(sslctx, server_side=True) + self.assertEqual(sock.recv_all(4), b'ping') + sock.send(b'pong') + + time.sleep(0.2) # wait for the peer to fill its backlog + + # send EOF + sock.shutdown(socket.SHUT_WR) + + # should receive all data + data = sock.recv_all(CHUNK * SIZE) + self.assertEqual(len(data), CHUNK * SIZE) + + sock.close() + + async def client(addr): + nonlocal future + future = self.loop.create_future() + + reader, writer = await asyncio.open_connection( + *addr, + ssl=client_sslctx, + server_hostname='') + writer.write(b'ping') + data = await reader.readexactly(4) + self.assertEqual(data, b'pong') + + # fill write backlog in a hacky way - renegotiation won't help + for _ in range(SIZE): + writer.transport._test__append_write_backlog(b'x' * CHUNK) + + try: + data = await reader.read() + self.assertEqual(data, b'') + except (BrokenPipeError, ConnectionResetError): + pass + + await future + + writer.close() + await self.wait_closed(writer) + + def run(meth): + def wrapper(sock): + try: + meth(sock) + except Exception as ex: + self.loop.call_soon_threadsafe(future.set_exception, ex) + else: + self.loop.call_soon_threadsafe(future.set_result, None) + return wrapper + + with self.tcp_server(run(server)) as srv: + self.loop.run_until_complete(client(srv.addr)) + + with self.tcp_server(run(eof_server)) as srv: + self.loop.run_until_complete(client(srv.addr)) + + def test_remote_shutdown_receives_trailing_data_on_slow_socket(self): + # This test is the same as test_remote_shutdown_receives_trailing_data, + # except it simulates a socket that is not able to write data in time, + # thus triggering different code path in _SelectorSocketTransport. + # This triggers bug gh-115514, also tested using mocks in + # test.test_asyncio.test_selector_events.SelectorSocketTransportTests.test_write_buffer_after_close + # The slow path is triggered here by setting SO_SNDBUF, see code and comment below. + + CHUNK = 1024 * 128 + SIZE = 32 + + sslctx = self._create_server_ssl_context( + test_utils.ONLYCERT, + test_utils.ONLYKEY + ) + client_sslctx = self._create_client_ssl_context() + future = None + def server(sock): incoming = ssl.MemoryBIO() outgoing = ssl.MemoryBIO() From 8669d21fffe7fc0233ef736d34fe3ec230288aec Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Vojt=C4=9Bch=20Bo=C4=8Dek?= Date: Sat, 1 Feb 2025 01:05:21 +0100 Subject: [PATCH 6/6] Make ssl test work on Linux --- Lib/test/test_asyncio/test_ssl.py | 28 ++++++++++++++++++++-------- 1 file changed, 20 insertions(+), 8 deletions(-) diff --git a/Lib/test/test_asyncio/test_ssl.py b/Lib/test/test_asyncio/test_ssl.py index 1443959ebe1fd2..ac774307c7942b 100644 --- a/Lib/test/test_asyncio/test_ssl.py +++ b/Lib/test/test_asyncio/test_ssl.py @@ -1497,7 +1497,7 @@ def server(sock): except ssl.SSLZeroReturnError: break - self.assertEqual(data_len, CHUNK * SIZE) + self.assertEqual(data_len, CHUNK * SIZE*2) # verify that close_notify is received sslobj.unwrap() @@ -1533,7 +1533,7 @@ async def client(addr): self.assertEqual(data, b'pong') # fill write backlog in a hacky way - renegotiation won't help - for _ in range(SIZE): + for _ in range(SIZE*2): writer.transport._test__append_write_backlog(b'x' * CHUNK) try: @@ -1543,21 +1543,33 @@ async def client(addr): pass # Make sure _SelectorSocketTransport enters the delayed write - # path in its `write` method by setting the socket buffer to small value, - # so that `socket.send` does not consume all the data. + # path in its `write` method by wrapping socket in a fake class + # that acts as if there is not enough space in socket buffer. # This triggers bug gh-115514, also tested using mocks in # test.test_asyncio.test_selector_events.SelectorSocketTransportTests.test_write_buffer_after_close socket_transport = writer.transport._ssl_protocol._transport - def _shrink_sock_buffer(data): - if socket_transport._read_ready_cb is None: - socket_transport.get_extra_info("socket").setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 1) + class SocketWrapper: + def __init__(self, sock) -> None: + self.sock = sock + + def __getattr__(self, name): + return getattr(self.sock, name) + + def send(self, data): + # Fake that our write buffer is full, send only half + to_send = len(data)//2 + return self.sock.send(data[:to_send]) + + def _fake_full_write_buffer(data): + if socket_transport._read_ready_cb is None and not isinstance(socket_transport._sock, SocketWrapper): + socket_transport._sock = SocketWrapper(socket_transport._sock) return unittest.mock.DEFAULT with unittest.mock.patch.object( socket_transport, "write", wraps=socket_transport.write, - side_effect=_shrink_sock_buffer + side_effect=_fake_full_write_buffer ): await future