Skip to content

GH-109564: add asyncio.Server state machine #131009

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 50 additions & 12 deletions Lib/asyncio/base_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import collections
import collections.abc
import concurrent.futures
import enum
import errno
import heapq
import itertools
Expand Down Expand Up @@ -272,6 +273,23 @@ async def restore(self):
self._proto.resume_writing()


class _ServerState(enum.Enum):
"""This tracks the state of Server.

-[in]->INITIALIZED -[ss]-> SERVING -[cl]-> CLOSED -[wk]*-> SHUTDOWN

- in: Server.__init__()
- ss: Server._start_serving()
- cl: Server.close()
- wk: Server._wakeup() *only called if number of clients == 0
"""

INITIALIZED = "initialized"
SERVING = "serving"
CLOSED = "closed"
SHUTDOWN = "shutdown"
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about the name SHUTDOWN here, but needed something more "definitive" than closed.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we do use SHUTDOWN should I also rename Server._wakeup -> Server._shutdown?

I think that makes sense.

I'm more worried about INITIALIZED here; nothing is really initialized, other than the Python object, which isn't particularly useful knowledge. Let's call it something like NOT_SERVING or NOT_YET_STARTED.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 07129e5



class Server(events.AbstractServer):

def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
Expand All @@ -287,32 +305,49 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog,
self._ssl_context = ssl_context
self._ssl_handshake_timeout = ssl_handshake_timeout
self._ssl_shutdown_timeout = ssl_shutdown_timeout
self._serving = False
self._state = _ServerState.INITIALIZED
self._serving_forever_fut = None

def __repr__(self):
return f'<{self.__class__.__name__} sockets={self.sockets!r}>'

def _attach(self, transport):
assert self._sockets is not None
if self._state != _ServerState.SERVING:
raise RuntimeError("server is not serving, cannot attach transport")
self._clients.add(transport)

def _detach(self, transport):
self._clients.discard(transport)
if len(self._clients) == 0 and self._sockets is None:
if self._state == _ServerState.CLOSED and len(self._clients) == 0:
self._wakeup()

def _wakeup(self):
match self._state:
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if PEP 636 match-case statements are permitted; can change to if-else statement if needed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They're permitted, but it's not particularly useful to use match here (no pattern is being matched).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's stick to if else as i am not so used to match case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 8e409b7

case _ServerState.SHUTDOWN:
# gh109564: the wakeup method has two possible call-sites,
# through an explicit call Server.close(), or indirectly through
# Server._detach() by the last connected client.
return
case _ServerState.INITIALIZED | _ServerState.SERVING:
raise RuntimeError("cannot wakeup server before closing")
case _ServerState.CLOSED:
self._state = _ServerState.SHUTDOWN

waiters = self._waiters
self._waiters = None
for waiter in waiters:
if not waiter.done():
waiter.set_result(None)

def _start_serving(self):
if self._serving:
return
self._serving = True
match self._state:
case _ServerState.SERVING:
return
case _ServerState.CLOSED | _ServerState.SHUTDOWN:
raise RuntimeError(f'server {self!r} is closed')
case _ServerState.INITIALIZED:
self._state = _ServerState.SERVING

for sock in self._sockets:
sock.listen(self._backlog)
self._loop._start_serving(
Expand All @@ -324,7 +359,7 @@ def get_loop(self):
return self._loop

def is_serving(self):
return self._serving
return self._state == _ServerState.SERVING

@property
def sockets(self):
Expand All @@ -333,6 +368,13 @@ def sockets(self):
return tuple(trsock.TransportSocket(s) for s in self._sockets)

def close(self):
match self._state:
case _ServerState.CLOSED | _ServerState.SHUTDOWN:
# Shutdown state can only be reached after closing.
return
case _:
self._state = _ServerState.CLOSED

sockets = self._sockets
if sockets is None:
return
Expand All @@ -341,8 +383,6 @@ def close(self):
for sock in sockets:
self._loop._stop_serving(sock)

self._serving = False

if (self._serving_forever_fut is not None and
not self._serving_forever_fut.done()):
self._serving_forever_fut.cancel()
Expand All @@ -369,8 +409,6 @@ async def serve_forever(self):
if self._serving_forever_fut is not None:
raise RuntimeError(
f'server {self!r} is already being awaited on serve_forever()')
if self._sockets is None:
raise RuntimeError(f'server {self!r} is closed')

self._start_serving()
self._serving_forever_fut = self._loop.create_future()
Expand Down Expand Up @@ -407,7 +445,7 @@ async def wait_closed(self):
# from two places: self.close() and self._detach(), but only
# when both conditions have become true. To signal that this
# has happened, self._wakeup() sets self._waiters to None.
if self._waiters is None:
if self._state == _ServerState.SHUTDOWN:
return
waiter = self._loop.create_future()
self._waiters.append(waiter)
Expand Down
6 changes: 5 additions & 1 deletion Lib/asyncio/proactor_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,11 @@ def __init__(self, loop, sock, protocol, waiter=None,
self._called_connection_lost = False
self._eof_written = False
if self._server is not None:
self._server._attach(self)
if self._server.is_serving():
self._server._attach(self)
else:
self.abort()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why abort the whole transport? This is seemingly a change in behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yup sorry, my misunderstanding! Addressed in 5832655

I commented here my original thinking:

This is a functional change over main.. In my head, if a transport was constructed with server != None, then the transport should not be created/serviced if its corresponding server was shutdown.

But I re-read the docs for Server.close and see I was thinking about it incorrectly:

https://docs.python.org/3/library/asyncio-eventloop.html#asyncio.Server.close

The sockets that represent existing incoming client connections are left open.

return
self._loop.call_soon(self._protocol.connection_made, self)
if waiter is not None:
# only wake up the waiter when connection_made() has been called
Expand Down
7 changes: 6 additions & 1 deletion Lib/asyncio/selector_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -795,7 +795,12 @@ def __init__(self, loop, sock, protocol, extra=None, server=None):
self._paused = False # Set when pause_reading() called

if self._server is not None:
self._server._attach(self)
if self._server.is_serving():
self._server._attach(self)
else:
self.abort()
return
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a functional change over main.. In my head, if a transport was constructed with server != None, then the transport should not be created/serviced if its corresponding server was shutdown.

Used transport.abort over transport.close to force clear the buffer, since no server exists to service that.

A return statement was added to the else block to prevent the transport getting tracked in loop._transports


loop._transports[self._sock_fd] = self

def __repr__(self):
Expand Down
29 changes: 29 additions & 0 deletions Lib/test/test_asyncio/test_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import time
import threading
import unittest
from unittest.mock import Mock

from test.support import socket_helper
from test.test_asyncio import utils as test_utils
Expand Down Expand Up @@ -186,6 +187,8 @@ async def serve(rd, wr):
loop.call_soon(srv.close)
loop.call_soon(wr.close)
await srv.wait_closed()
self.assertTrue(task.done())
self.assertFalse(srv.is_serving())

async def test_close_clients(self):
async def serve(rd, wr):
Expand All @@ -212,6 +215,9 @@ async def serve(rd, wr):
await asyncio.sleep(0)
self.assertTrue(task.done())

with self.assertRaisesRegex(RuntimeError, r'is closed'):
await srv.start_serving()

async def test_abort_clients(self):
async def serve(rd, wr):
fut.set_result((rd, wr))
Expand Down Expand Up @@ -266,6 +272,29 @@ async def serve(rd, wr):
await asyncio.sleep(0)
self.assertTrue(task.done())

async def test_close_before_transport_attach(self):
proto = Mock()
loop = asyncio.get_running_loop()
srv = await loop.create_server(lambda *_: proto, socket_helper.HOSTv4, 0)

await srv.start_serving()
addr = srv.sockets[0].getsockname()

# Create a connection to the server but close the server before the
# socket transport for the connection is created and attached
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.connect(addr)
await asyncio.sleep(0) # loop select reader
await asyncio.sleep(0) # accept conn 1
srv.close()

# Ensure the protocol is given an opportunity to handle this event
# gh109564: the transport would be unclosed and will cause a loop
# exception due to a double-call to Server._wakeup
await asyncio.sleep(0)
await asyncio.sleep(0)
proto.connection_lost.assert_called()


# Test the various corner cases of Unix server socket removal
class UnixServerCleanupTests(unittest.IsolatedAsyncioTestCase):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Fix race condition in :meth:`asyncio.Server.close`. Patch by Jamie Phan.
Loading