From 0fec860d4094212b0bbec177420677eac27c196d Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Sun, 9 Mar 2025 18:04:20 +1100 Subject: [PATCH 1/9] feat: add asyncio server state machine --- Lib/asyncio/base_events.py | 62 +++++++++++++++---- Lib/asyncio/selector_events.py | 7 ++- Lib/test/test_asyncio/test_server.py | 29 +++++++++ ...-03-09-23-10-39.gh-issue-109564.r9rnIB.rst | 1 + 4 files changed, 86 insertions(+), 13 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2025-03-09-23-10-39.gh-issue-109564.r9rnIB.rst diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 546361f80b1f47..a3df973c262e5d 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -16,6 +16,7 @@ import collections import collections.abc import concurrent.futures +import enum import errno import heapq import itertools @@ -272,6 +273,23 @@ async def restore(self): self._proto.resume_writing() +class _ServerState(enum.Enum): + """This tracks the state of Server. + + -[in]->INITIALIZED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN + + - in: Server.__init__() + - ss: Server._start_serving() + - cl: Server.close() + - wk: Server._wakeup() *only called if number of clients == 0 + """ + + INITIALIZED = "initialized" + SERVING = "serving" + CLOSED = "closed" + SHUTDOWN = "shutdown" + + class Server(events.AbstractServer): def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, @@ -287,22 +305,34 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, self._ssl_context = ssl_context self._ssl_handshake_timeout = ssl_handshake_timeout self._ssl_shutdown_timeout = ssl_shutdown_timeout - self._serving = False + self._state = _ServerState.INITIALIZED self._serving_forever_fut = None def __repr__(self): return f'<{self.__class__.__name__} sockets={self.sockets!r}>' def _attach(self, transport): - assert self._sockets is not None + if self._state != _ServerState.SERVING: + raise RuntimeError("server is not serving, cannot attach transport") self._clients.add(transport) def _detach(self, transport): self._clients.discard(transport) - if len(self._clients) == 0 and self._sockets is None: + if self._state == _ServerState.CLOSED and len(self._clients) == 0: self._wakeup() def _wakeup(self): + match self._state: + case _ServerState.SHUTDOWN: + # gh109564: the wakeup method has two possible call-sites, + # through an explicit call Server.close(), or indirectly through + # Server._detach() by the last connected client. + return + case _ServerState.INITIALIZED | _ServerState.SERVING: + raise RuntimeError("cannot wakeup server before closing") + case _ServerState.CLOSED: + self._state = _ServerState.SHUTDOWN + waiters = self._waiters self._waiters = None for waiter in waiters: @@ -310,9 +340,14 @@ def _wakeup(self): waiter.set_result(None) def _start_serving(self): - if self._serving: - return - self._serving = True + match self._state: + case _ServerState.SERVING: + return + case _ServerState.CLOSED | _ServerState.SHUTDOWN: + raise RuntimeError(f'server {self!r} is closed') + case _ServerState.INITIALIZED: + self._state = _ServerState.SERVING + for sock in self._sockets: sock.listen(self._backlog) self._loop._start_serving( @@ -324,7 +359,7 @@ def get_loop(self): return self._loop def is_serving(self): - return self._serving + return self._state == _ServerState.SERVING @property def sockets(self): @@ -333,6 +368,13 @@ def sockets(self): return tuple(trsock.TransportSocket(s) for s in self._sockets) def close(self): + match self._state: + case _ServerState.CLOSED | _ServerState.SHUTDOWN: + # Shutdown state can only be reached after closing. + return + case _: + self._state = _ServerState.CLOSED + sockets = self._sockets if sockets is None: return @@ -341,8 +383,6 @@ def close(self): for sock in sockets: self._loop._stop_serving(sock) - self._serving = False - if (self._serving_forever_fut is not None and not self._serving_forever_fut.done()): self._serving_forever_fut.cancel() @@ -369,8 +409,6 @@ async def serve_forever(self): if self._serving_forever_fut is not None: raise RuntimeError( f'server {self!r} is already being awaited on serve_forever()') - if self._sockets is None: - raise RuntimeError(f'server {self!r} is closed') self._start_serving() self._serving_forever_fut = self._loop.create_future() @@ -407,7 +445,7 @@ async def wait_closed(self): # from two places: self.close() and self._detach(), but only # when both conditions have become true. To signal that this # has happened, self._wakeup() sets self._waiters to None. - if self._waiters is None: + if self._state == _ServerState.SHUTDOWN: return waiter = self._loop.create_future() self._waiters.append(waiter) diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index 22147451fa7ebd..c947aaf59178fc 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -795,7 +795,12 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): self._paused = False # Set when pause_reading() called if self._server is not None: - self._server._attach(self) + if self._server.is_serving(): + self._server._attach(self) + else: + self.abort() + return + loop._transports[self._sock_fd] = self def __repr__(self): diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py index 32211f4cba32cb..1c90cf5de36df4 100644 --- a/Lib/test/test_asyncio/test_server.py +++ b/Lib/test/test_asyncio/test_server.py @@ -4,6 +4,7 @@ import time import threading import unittest +from unittest.mock import Mock from test.support import socket_helper from test.test_asyncio import utils as test_utils @@ -186,6 +187,8 @@ async def serve(rd, wr): loop.call_soon(srv.close) loop.call_soon(wr.close) await srv.wait_closed() + self.assertTrue(task.done()) + self.assertFalse(srv.is_serving()) async def test_close_clients(self): async def serve(rd, wr): @@ -212,6 +215,9 @@ async def serve(rd, wr): await asyncio.sleep(0) self.assertTrue(task.done()) + with self.assertRaisesRegex(RuntimeError, r'is closed'): + await srv.start_serving() + async def test_abort_clients(self): async def serve(rd, wr): fut.set_result((rd, wr)) @@ -266,6 +272,29 @@ async def serve(rd, wr): await asyncio.sleep(0) self.assertTrue(task.done()) + async def test_close_before_transport_attach(self): + proto = Mock() + loop = asyncio.get_running_loop() + srv = await loop.create_server(lambda *_: proto, socket_helper.HOSTv4, 0) + + await srv.start_serving() + addr = srv.sockets[0].getsockname() + + # Create a connection to the server but close the server before the + # socket transport for the connection is created and attached + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: + s.connect(addr) + await asyncio.sleep(0) # loop select reader + await asyncio.sleep(0) # accept conn 1 + srv.close() + + # Ensure the protocol is given an opportunity to handle this event + # gh109564: the transport would be unclosed and will cause a loop + # exception due to a double-call to Server._wakeup + await asyncio.sleep(0) + await asyncio.sleep(0) + proto.connection_lost.assert_called() + # Test the various corner cases of Unix server socket removal class UnixServerCleanupTests(unittest.IsolatedAsyncioTestCase): diff --git a/Misc/NEWS.d/next/Library/2025-03-09-23-10-39.gh-issue-109564.r9rnIB.rst b/Misc/NEWS.d/next/Library/2025-03-09-23-10-39.gh-issue-109564.r9rnIB.rst new file mode 100644 index 00000000000000..70e981a8a5dd5f --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-03-09-23-10-39.gh-issue-109564.r9rnIB.rst @@ -0,0 +1 @@ +Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan. From f3b96bfc7468676dc4037cde2257eb72b303e4a9 Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Sun, 9 Mar 2025 23:46:19 +1100 Subject: [PATCH 2/9] fix: fix proactor transport for windows --- Lib/asyncio/proactor_events.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index 7eb55bd63ddb73..b52827216016e3 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -63,7 +63,11 @@ def __init__(self, loop, sock, protocol, waiter=None, self._called_connection_lost = False self._eof_written = False if self._server is not None: - self._server._attach(self) + if self._server.is_serving(): + self._server._attach(self) + else: + self.abort() + return self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: # only wake up the waiter when connection_made() has been called From 8e409b7eb61062b8ec45672180965f6beacab9f2 Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Tue, 11 Mar 2025 10:07:38 +1100 Subject: [PATCH 3/9] fix: change match to if statements --- Lib/asyncio/base_events.py | 42 +++++++++++++++++--------------------- 1 file changed, 19 insertions(+), 23 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index a3df973c262e5d..5702207cdb397d 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -322,16 +322,15 @@ def _detach(self, transport): self._wakeup() def _wakeup(self): - match self._state: - case _ServerState.SHUTDOWN: - # gh109564: the wakeup method has two possible call-sites, - # through an explicit call Server.close(), or indirectly through - # Server._detach() by the last connected client. - return - case _ServerState.INITIALIZED | _ServerState.SERVING: - raise RuntimeError("cannot wakeup server before closing") - case _ServerState.CLOSED: - self._state = _ServerState.SHUTDOWN + if self._state == _ServerState.CLOSED: + self._state = _ServerState.SHUTDOWN + elif self._state == _ServerState.SHUTDOWN: + # gh109564: the wakeup method has two possible call-sites, + # through an explicit call Server.close(), or indirectly through + # Server._detach() by the last connected client. + return + else: + raise RuntimeError(f"server {self!r} can only wakeup waiters after closing") waiters = self._waiters self._waiters = None @@ -340,13 +339,12 @@ def _wakeup(self): waiter.set_result(None) def _start_serving(self): - match self._state: - case _ServerState.SERVING: - return - case _ServerState.CLOSED | _ServerState.SHUTDOWN: - raise RuntimeError(f'server {self!r} is closed') - case _ServerState.INITIALIZED: - self._state = _ServerState.SERVING + if self._state == _ServerState.INITIALIZED: + self._state = _ServerState.SERVING + elif self._state == _ServerState.SERVING: + return + else: + raise RuntimeError(f'server {self!r} is closed') for sock in self._sockets: sock.listen(self._backlog) @@ -368,12 +366,10 @@ def sockets(self): return tuple(trsock.TransportSocket(s) for s in self._sockets) def close(self): - match self._state: - case _ServerState.CLOSED | _ServerState.SHUTDOWN: - # Shutdown state can only be reached after closing. - return - case _: - self._state = _ServerState.CLOSED + if self._state == _ServerState.CLOSED or self._state == _ServerState.SHUTDOWN: + return + else: + self._state = _ServerState.CLOSED sockets = self._sockets if sockets is None: From a92158a88c4cfee9821961c56ccd1fef519ce0c7 Mon Sep 17 00:00:00 2001 From: Jamie Phan Date: Sun, 30 Mar 2025 23:51:52 +1100 Subject: [PATCH 4/9] Update Lib/asyncio/base_events.py Co-authored-by: Peter Bierma --- Lib/asyncio/base_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 5702207cdb397d..e3e486b995525f 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -366,7 +366,7 @@ def sockets(self): return tuple(trsock.TransportSocket(s) for s in self._sockets) def close(self): - if self._state == _ServerState.CLOSED or self._state == _ServerState.SHUTDOWN: + if self._state in {_ServerState.CLOSED, _ServerState.SHUTDOWN}: return else: self._state = _ServerState.CLOSED From 44d24fb342a1ae4523d8d9b532a73deec6e2fde4 Mon Sep 17 00:00:00 2001 From: Jamie Phan Date: Sun, 30 Mar 2025 23:52:04 +1100 Subject: [PATCH 5/9] Update Lib/asyncio/base_events.py Co-authored-by: Peter Bierma --- Lib/asyncio/base_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index e3e486b995525f..ed00a02d071b50 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -344,7 +344,7 @@ def _start_serving(self): elif self._state == _ServerState.SERVING: return else: - raise RuntimeError(f'server {self!r} is closed') + raise RuntimeError(f'server {self!r} was already started and then closed') for sock in self._sockets: sock.listen(self._backlog) From 07129e52a61086efcce625b8f3685d920d03c08f Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Sun, 30 Mar 2025 23:58:56 +1100 Subject: [PATCH 6/9] fix: rename wakeup to shutdown and states --- Lib/asyncio/base_events.py | 14 +++++++------- Lib/test/test_asyncio/test_server.py | 6 +++--- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index ed00a02d071b50..58f0e9f6684a36 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -276,7 +276,7 @@ async def restore(self): class _ServerState(enum.Enum): """This tracks the state of Server. - -[in]->INITIALIZED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN + -[in]->NOT_STARTED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN - in: Server.__init__() - ss: Server._start_serving() @@ -284,7 +284,7 @@ class _ServerState(enum.Enum): - wk: Server._wakeup() *only called if number of clients == 0 """ - INITIALIZED = "initialized" + NOT_STARTED = "not_started" SERVING = "serving" CLOSED = "closed" SHUTDOWN = "shutdown" @@ -305,7 +305,7 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, self._ssl_context = ssl_context self._ssl_handshake_timeout = ssl_handshake_timeout self._ssl_shutdown_timeout = ssl_shutdown_timeout - self._state = _ServerState.INITIALIZED + self._state = _ServerState.NOT_STARTED self._serving_forever_fut = None def __repr__(self): @@ -319,9 +319,9 @@ def _attach(self, transport): def _detach(self, transport): self._clients.discard(transport) if self._state == _ServerState.CLOSED and len(self._clients) == 0: - self._wakeup() + self._shutdown() - def _wakeup(self): + def _shutdown(self): if self._state == _ServerState.CLOSED: self._state = _ServerState.SHUTDOWN elif self._state == _ServerState.SHUTDOWN: @@ -339,7 +339,7 @@ def _wakeup(self): waiter.set_result(None) def _start_serving(self): - if self._state == _ServerState.INITIALIZED: + if self._state == _ServerState.NOT_STARTED: self._state = _ServerState.SERVING elif self._state == _ServerState.SERVING: return @@ -385,7 +385,7 @@ def close(self): self._serving_forever_fut = None if len(self._clients) == 0: - self._wakeup() + self._shutdown() def close_clients(self): for transport in self._clients.copy(): diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py index 1c90cf5de36df4..87a59f6470cf44 100644 --- a/Lib/test/test_asyncio/test_server.py +++ b/Lib/test/test_asyncio/test_server.py @@ -66,7 +66,7 @@ async def main(srv): self.assertIsNone(srv._waiters) self.assertFalse(srv.is_serving()) - with self.assertRaisesRegex(RuntimeError, r'is closed'): + with self.assertRaisesRegex(RuntimeError, r'started and then closed'): self.loop.run_until_complete(srv.serve_forever()) @@ -119,7 +119,7 @@ async def main(srv): self.assertIsNone(srv._waiters) self.assertFalse(srv.is_serving()) - with self.assertRaisesRegex(RuntimeError, r'is closed'): + with self.assertRaisesRegex(RuntimeError, r'started and then closed'): self.loop.run_until_complete(srv.serve_forever()) @@ -215,7 +215,7 @@ async def serve(rd, wr): await asyncio.sleep(0) self.assertTrue(task.done()) - with self.assertRaisesRegex(RuntimeError, r'is closed'): + with self.assertRaisesRegex(RuntimeError, r'started and then closed'): await srv.start_serving() async def test_abort_clients(self): From 48a3c0de704df66dba63c94851979662a2a204d6 Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Mon, 31 Mar 2025 00:18:30 +1100 Subject: [PATCH 7/9] fix: Add try-catch for close to recover state on fail --- Lib/asyncio/base_events.py | 30 +++++++++++++++++------------- 1 file changed, 17 insertions(+), 13 deletions(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 58f0e9f6684a36..7384d947e4cba9 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -368,24 +368,28 @@ def sockets(self): def close(self): if self._state in {_ServerState.CLOSED, _ServerState.SHUTDOWN}: return - else: + + prev_state = self._state + try: self._state = _ServerState.CLOSED - sockets = self._sockets - if sockets is None: - return - self._sockets = None + sockets = self._sockets + if sockets is None: + return + self._sockets = None - for sock in sockets: - self._loop._stop_serving(sock) + for sock in sockets: + self._loop._stop_serving(sock) - if (self._serving_forever_fut is not None and - not self._serving_forever_fut.done()): - self._serving_forever_fut.cancel() - self._serving_forever_fut = None + if (self._serving_forever_fut is not None and + not self._serving_forever_fut.done()): + self._serving_forever_fut.cancel() + self._serving_forever_fut = None - if len(self._clients) == 0: - self._shutdown() + if len(self._clients) == 0: + self._shutdown() + except: + self._state = prev_state def close_clients(self): for transport in self._clients.copy(): From 5832655847b45bd0d0aa399dcc81825132ed2be7 Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Mon, 31 Mar 2025 00:29:35 +1100 Subject: [PATCH 8/9] fix: do not abort client conn transport on close --- Lib/asyncio/proactor_events.py | 8 ++------ Lib/asyncio/selector_events.py | 8 ++------ Lib/test/test_asyncio/test_server.py | 23 ----------------------- 3 files changed, 4 insertions(+), 35 deletions(-) diff --git a/Lib/asyncio/proactor_events.py b/Lib/asyncio/proactor_events.py index b52827216016e3..159360c0d32680 100644 --- a/Lib/asyncio/proactor_events.py +++ b/Lib/asyncio/proactor_events.py @@ -62,12 +62,8 @@ def __init__(self, loop, sock, protocol, waiter=None, self._closing = False # Set when close() called. self._called_connection_lost = False self._eof_written = False - if self._server is not None: - if self._server.is_serving(): - self._server._attach(self) - else: - self.abort() - return + if self._server is not None and self._server.is_serving(): + self._server._attach(self) self._loop.call_soon(self._protocol.connection_made, self) if waiter is not None: # only wake up the waiter when connection_made() has been called diff --git a/Lib/asyncio/selector_events.py b/Lib/asyncio/selector_events.py index c947aaf59178fc..f967dd6a03c99e 100644 --- a/Lib/asyncio/selector_events.py +++ b/Lib/asyncio/selector_events.py @@ -794,12 +794,8 @@ def __init__(self, loop, sock, protocol, extra=None, server=None): self._closing = False # Set when close() called. self._paused = False # Set when pause_reading() called - if self._server is not None: - if self._server.is_serving(): - self._server._attach(self) - else: - self.abort() - return + if self._server is not None and self._server.is_serving(): + self._server._attach(self) loop._transports[self._sock_fd] = self diff --git a/Lib/test/test_asyncio/test_server.py b/Lib/test/test_asyncio/test_server.py index 87a59f6470cf44..4daa7d2ce70be3 100644 --- a/Lib/test/test_asyncio/test_server.py +++ b/Lib/test/test_asyncio/test_server.py @@ -272,29 +272,6 @@ async def serve(rd, wr): await asyncio.sleep(0) self.assertTrue(task.done()) - async def test_close_before_transport_attach(self): - proto = Mock() - loop = asyncio.get_running_loop() - srv = await loop.create_server(lambda *_: proto, socket_helper.HOSTv4, 0) - - await srv.start_serving() - addr = srv.sockets[0].getsockname() - - # Create a connection to the server but close the server before the - # socket transport for the connection is created and attached - with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: - s.connect(addr) - await asyncio.sleep(0) # loop select reader - await asyncio.sleep(0) # accept conn 1 - srv.close() - - # Ensure the protocol is given an opportunity to handle this event - # gh109564: the transport would be unclosed and will cause a loop - # exception due to a double-call to Server._wakeup - await asyncio.sleep(0) - await asyncio.sleep(0) - proto.connection_lost.assert_called() - # Test the various corner cases of Unix server socket removal class UnixServerCleanupTests(unittest.IsolatedAsyncioTestCase): From 7f3481bfb9bd7a082a537c6e4a4112e39536d1af Mon Sep 17 00:00:00 2001 From: ordinary-jamie <101677823+ordinary-jamie@users.noreply.github.com> Date: Mon, 31 Mar 2025 00:38:35 +1100 Subject: [PATCH 9/9] fix: rename error --- Lib/asyncio/base_events.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/asyncio/base_events.py b/Lib/asyncio/base_events.py index 7384d947e4cba9..a80b25ad657107 100644 --- a/Lib/asyncio/base_events.py +++ b/Lib/asyncio/base_events.py @@ -330,7 +330,7 @@ def _shutdown(self): # Server._detach() by the last connected client. return else: - raise RuntimeError(f"server {self!r} can only wakeup waiters after closing") + raise RuntimeError(f"server {self!r} must be closed before shutdown") waiters = self._waiters self._waiters = None