-
Notifications
You must be signed in to change notification settings - Fork 33.9k
gh-113538: Allow client connections to be closed #114432
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
9f0111e
c25eabb
b7fa198
b3cd9c1
c78a927
1ec06da
2790ddf
6a56a80
6929888
8316199
3e1705b
6c078d6
1158151
1065dda
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -125,8 +125,12 @@ async def main(srv): | |
| class TestServer2(unittest.IsolatedAsyncioTestCase): | ||
|
|
||
| async def test_wait_closed_basic(self): | ||
| async def serve(*args): | ||
| pass | ||
| async def serve(rd, wr): | ||
| try: | ||
| await rd.read() | ||
| finally: | ||
| wr.close() | ||
| await wr.wait_closed() | ||
|
|
||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
@@ -137,7 +141,8 @@ async def serve(*args): | |
| self.assertFalse(task1.done()) | ||
|
|
||
| # active count != 0, not closed: should block | ||
| srv._attach() | ||
| addr = srv.sockets[0].getsockname() | ||
| (rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
gvanrossum marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| task2 = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task1.done()) | ||
|
|
@@ -152,7 +157,8 @@ async def serve(*args): | |
| self.assertFalse(task2.done()) | ||
| self.assertFalse(task3.done()) | ||
|
|
||
| srv._detach() | ||
| wr.close() | ||
| await wr.wait_closed() | ||
| # active count == 0, closed: should unblock | ||
| await task1 | ||
| await task2 | ||
|
|
@@ -161,22 +167,93 @@ async def serve(*args): | |
|
|
||
| async def test_wait_closed_race(self): | ||
| # Test a regression in 3.12.0, should be fixed in 3.12.1 | ||
| async def serve(*args): | ||
| pass | ||
| async def serve(rd, wr): | ||
| try: | ||
| await rd.read() | ||
| finally: | ||
| wr.close() | ||
| await wr.wait_closed() | ||
|
|
||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
||
| task = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task.done()) | ||
| srv._attach() | ||
| addr = srv.sockets[0].getsockname() | ||
| (rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
| loop = asyncio.get_running_loop() | ||
| loop.call_soon(srv.close) | ||
| loop.call_soon(srv._detach) | ||
| loop.call_soon(wr.close) | ||
| await srv.wait_closed() | ||
|
|
||
| async def test_close_clients(self): | ||
| async def serve(rd, wr): | ||
| try: | ||
| await rd.read() | ||
| finally: | ||
| wr.close() | ||
| await wr.wait_closed() | ||
|
|
||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
||
| addr = srv.sockets[0].getsockname() | ||
| (rd, wr) = await asyncio.open_connection(addr[0], addr[1]) | ||
| self.addCleanup(wr.close) | ||
|
|
||
| task = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task.done()) | ||
|
|
||
| srv.close() | ||
| srv.close_clients() | ||
| await asyncio.sleep(0) | ||
| await asyncio.sleep(0) | ||
| self.assertTrue(task.done()) | ||
|
|
||
| async def test_abort_clients(self): | ||
| async def serve(rd, wr): | ||
| nonlocal s_rd, s_wr | ||
| s_rd = rd | ||
| s_wr = wr | ||
| await wr.wait_closed() | ||
|
|
||
| s_rd = s_wr = None | ||
| srv = await asyncio.start_server(serve, socket_helper.HOSTv4, 0) | ||
| self.addCleanup(srv.close) | ||
|
|
||
| addr = srv.sockets[0].getsockname() | ||
| (c_rd, c_wr) = await asyncio.open_connection(addr[0], addr[1], limit=4096) | ||
| self.addCleanup(c_wr.close) | ||
|
|
||
| # Limit the send buffer so we can reliably overfill it | ||
| s_sock = s_wr.get_extra_info('socket') | ||
| s_sock.setsockopt(socket.SOL_SOCKET, socket.SO_SNDBUF, 65536) | ||
|
|
||
| # Get the reader in to a paused state by sending more than twice | ||
| # the configured limit | ||
| s_wr.write(b'a' * 4096) | ||
| s_wr.write(b'a' * 4096) | ||
| s_wr.write(b'a' * 4096) | ||
| while c_wr.transport.is_reading(): | ||
| await asyncio.sleep(0) | ||
|
|
||
| # Get the writer in a waiting state by sending data until the | ||
| # kernel stops accepting more in to the send buffer | ||
| while s_wr.transport.get_write_buffer_size() == 0: | ||
| s_wr.write(b'a' * 4096) | ||
| await asyncio.sleep(0) | ||
|
||
|
|
||
| task = asyncio.create_task(srv.wait_closed()) | ||
| await asyncio.sleep(0) | ||
| self.assertFalse(task.done()) | ||
|
|
||
| srv.close() | ||
| srv.abort_clients() | ||
| await asyncio.sleep(0) | ||
| await asyncio.sleep(0) | ||
| self.assertTrue(task.done()) | ||
|
|
||
|
|
||
| # Test the various corner cases of Unix server socket removal | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| Add :meth:`asyncio.Server.close_clients` and | ||
| :meth:`asyncio.Server.abort_clients` methods which allow to more forcefully | ||
| close an asyncio server. |
Uh oh!
There was an error while loading. Please reload this page.