-
-
Notifications
You must be signed in to change notification settings - Fork 33.9k
gh-113538: Allow client connections to be closed #114432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 9 commits
9f0111e
c25eabb
b7fa198
b3cd9c1
c78a927
1ec06da
2790ddf
6a56a80
6929888
8316199
3e1705b
6c078d6
1158151
1065dda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -277,7 +277,8 @@ def __init__(self, loop, sockets, protocol_factory, ssl_context, backlog, | |||||
| ssl_handshake_timeout, ssl_shutdown_timeout=None): | ||||||
| self._loop = loop | ||||||
| self._sockets = sockets | ||||||
| self._active_count = 0 | ||||||
| # Weak references so abandoned transports can be detected | ||||||
|
||||||
| # Weak references so abandoned transports can be detected | |
| # Weak references so abandoned transports can be ignored |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The wording here was intentional. Weak references is to my knowledge the only way to detect abandoned objects. But it's not this code that does that detection, so I can understand the confusion. How about:
Weak references so we don't break Transport's ability to detect abandoned transports
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah, you're thinking from the POV of the transport, whose __del__ must be called to "detect" (i.e., warn about) that it was abandoned. I was thinking from the POV of the loop in close_clients(), where we want to ignore (not encounter) transports that have been closed already.
I'll make it your choice which wording to use.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,8 +125,12 @@ async def main(srv): | |
| class TestServer2(unittest.IsolatedAsyncioTestCase): | ||
|
|
||
| async def test_wait_closed_basic(self): | ||
| async def serve(*args): | ||
| pass | ||
| async def serve(rd, wr): | ||
| try: | ||
| await rd.read() | ||
| finally: | ||
| wr.close() | ||
| await wr.wait_closed() | ||
|
|
||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
@@ -137,7 +141,8 @@ async def serve(*args): | |
| self.assertFalse(task1.done()) | ||
|
|
||
| # active count != 0, not closed: should block | ||
| srv._attach() | ||
| addr = srv.sockets[0].getsockname() | ||
| (rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| task2 = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task1.done()) | ||
|
|
@@ -152,7 +157,8 @@ async def serve(*args): | |
| self.assertFalse(task2.done()) | ||
| self.assertFalse(task3.done()) | ||
|
|
||
| srv._detach() | ||
| wr.close() | ||
| await wr.wait_closed() | ||
| # active count == 0, closed: should unblock | ||
| await task1 | ||
| await task2 | ||
|
|
@@ -161,22 +167,85 @@ async def serve(*args): | |
|
|
||
| async def test_wait_closed_race(self): | ||
| # Test a regression in 3.12.0, should be fixed in 3.12.1 | ||
| async def serve(*args): | ||
| pass | ||
| async def serve(rd, wr): | ||
| try: | ||
| await rd.read() | ||
| finally: | ||
| wr.close() | ||
| await wr.wait_closed() | ||
|
|
||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
||
| task = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task.done()) | ||
| srv._attach() | ||
| addr = srv.sockets[0].getsockname() | ||
| (rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
| loop = asyncio.get_running_loop() | ||
| loop.call_soon(srv.close) | ||
| loop.call_soon(srv._detach) | ||
| loop.call_soon(wr.close) | ||
| await srv.wait_closed() | ||
|
|
||
| async def test_close_clients(self): | ||
| async def serve(rd, wr): | ||
| try: | ||
| await rd.read() | ||
| finally: | ||
| wr.close() | ||
| await wr.wait_closed() | ||
|
|
||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
||
| addr = srv.sockets[0].getsockname() | ||
| (rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
| self.addCleanup(wr.close) | ||
|
|
||
| task = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task.done()) | ||
|
|
||
| srv.close() | ||
| srv.close_clients() | ||
| await asyncio.sleep(0.1) # FIXME: flush call_soon()? | ||
|
||
| self.assertTrue(task.done()) | ||
|
|
||
| async def test_abort_clients(self): | ||
| async def serve(rd, wr): | ||
| nonlocal s_rd, s_wr | ||
| s_rd = rd | ||
| s_wr = wr | ||
| await wr.wait_closed() | ||
|
|
||
| s_rd = s_wr = None | ||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
||
| addr = srv.sockets[0].getsockname() | ||
| (c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
| self.addCleanup(c_wr.close) | ||
|
|
||
| # Make sure both sides are in a paused state | ||
| while (s_wr.transport.get_write_buffer_size() == 0 or | ||
| c_wr.transport.is_reading()): | ||
| while s_wr.transport.get_write_buffer_size() == 0: | ||
| s_wr.write(b'a' * 65536) | ||
| await asyncio.sleep(0) | ||
| await asyncio.sleep(0.1) # FIXME: More socket buffer space magically appears? | ||
|
||
|
|
||
| task = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task.done()) | ||
|
|
||
| # Sanity check | ||
| self.assertNotEqual(s_wr.transport.get_write_buffer_size(), 0) | ||
| self.assertFalse(c_wr.transport.is_reading()) | ||
|
|
||
| srv.close() | ||
| srv.abort_clients() | ||
| await asyncio.sleep(0.1) # FIXME: flush call_soon()? | ||
|
||
| self.assertTrue(task.done()) | ||
|
|
||
|
|
||
| # Test the various corner cases of Unix server socket removal | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| Add :meth:`asyncio.Server.close_clients` and | ||
| :meth:`asyncio.Server.abort_clients` methods which allow to more forcefully | ||
| close an asyncio server. |
Uh oh!
There was an error while loading. Please reload this page.