Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGES/9600.breaking.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Improved performance of the connector when a connection can be reused -- by :user:`bdraco`.

If ``BaseConnector.connect`` has sub-classed and replaced with custom logic, the ``ceil_timeout`` must be added.
11 changes: 3 additions & 8 deletions aiohttp/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
_SENTINEL,
BasicAuth,
TimeoutHandle,
ceil_timeout,
get_env_proxy_for_url,
method_must_be_empty_body,
sentinel,
Expand Down Expand Up @@ -634,13 +633,9 @@ async def _request(

# connection timeout
try:
async with ceil_timeout(
real_timeout.connect,
ceil_threshold=real_timeout.ceil_threshold,
):
conn = await self._connector.connect(
req, traces=traces, timeout=real_timeout
)
conn = await self._connector.connect(
req, traces=traces, timeout=real_timeout
)
except asyncio.TimeoutError as exc:
raise ConnectionTimeoutError(
f"Connection timeout to host {url}"
Expand Down
157 changes: 93 additions & 64 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,83 +493,112 @@ async def connect(
"""Get from pool or create new connection."""
key = req.connection_key
available = self._available_connections(key)

# Wait if there are no available connections or if there are/were
# waiters (i.e. don't steal connection from a waiter about to wake up)
if available <= 0 or key in self._waiters:
fut: asyncio.Future[None] = self._loop.create_future()

# This connection will now count towards the limit.
self._waiters[key].append(fut)

wait_for_conn = available <= 0 or key in self._waiters
if not wait_for_conn and (proto := self._get(key)):
# If we do not have to wait and we can get a connection from the pool
# we can avoid the timeout ceil logic and directly return the connection
if traces:
for trace in traces:
await trace.send_connection_queued_start()
await self._send_connect_reuseconn(key, traces)
return self._acquired_connection(proto, key)

try:
await fut
except BaseException as e:
if key in self._waiters:
# remove a waiter even if it was cancelled, normally it's
# removed when it's notified
try:
self._waiters[key].remove(fut)
except ValueError: # fut may no longer be in list
pass

raise e
finally:
if key in self._waiters and not self._waiters[key]:
del self._waiters[key]
async with ceil_timeout(
timeout.connect,
ceil_threshold=timeout.ceil_threshold,
):
# Wait if there are no available connections or if there are/were
# waiters (i.e. don't steal connection from a waiter about to wake up)
if wait_for_conn:
await self._wait_for_available_connection(key, traces)
proto = self._get(key)

if traces:
for trace in traces:
await trace.send_connection_queued_end()
if proto is None:
placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
self._acquired.add(placeholder)
self._acquired_per_host[key].add(placeholder)

proto = self._get(key)
if proto is None:
placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
self._acquired.add(placeholder)
self._acquired_per_host[key].add(placeholder)
if traces:
for trace in traces:
await trace.send_connection_create_start()

if traces:
for trace in traces:
await trace.send_connection_create_start()
try:
proto = await self._create_connection(req, traces, timeout)
if self._closed:
proto.close()
raise ClientConnectionError("Connector is closed.")
except BaseException:
if not self._closed:
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)
self._release_waiter()
raise
else:
if not self._closed:
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)

try:
proto = await self._create_connection(req, traces, timeout)
if self._closed:
proto.close()
raise ClientConnectionError("Connector is closed.")
except BaseException:
if not self._closed:
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)
self._release_waiter()
raise
if traces:
for trace in traces:
await trace.send_connection_create_end()
else:
if not self._closed:
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)
if traces:
await self._send_connect_reuseconn(key, traces)

if traces:
for trace in traces:
await trace.send_connection_create_end()
else:
if traces:
# Acquire the connection to prevent race conditions with limits
placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
self._acquired.add(placeholder)
self._acquired_per_host[key].add(placeholder)
for trace in traces:
await trace.send_connection_reuseconn()
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)
return self._acquired_connection(proto, key)

def _acquired_connection(
self, proto: ResponseHandler, key: "ConnectionKey"
) -> Connection:
"""Mark proto as acquired and wrap it in a Connection object."""
self._acquired.add(proto)
self._acquired_per_host[key].add(proto)
return Connection(self, key, proto, self._loop)

async def _wait_for_available_connection(
self, key: "ConnectionKey", traces: List["Trace"]
) -> None:
"""Wait until there is an available connection."""
fut: asyncio.Future[None] = self._loop.create_future()

# This connection will now count towards the limit.
self._waiters[key].append(fut)

if traces:
for trace in traces:
await trace.send_connection_queued_start()

try:
await fut
except BaseException as e:
if key in self._waiters:
# remove a waiter even if it was cancelled, normally it's
# removed when it's notified
try:
self._waiters[key].remove(fut)
except ValueError: # fut may no longer be in list
pass

raise e
finally:
if key in self._waiters and not self._waiters[key]:
del self._waiters[key]

if traces:
for trace in traces:
await trace.send_connection_queued_end()

async def _send_connect_reuseconn(
self, key: "ConnectionKey", traces: List["Trace"]
) -> None:
"""Send tracing events for reusing a connection."""
# Acquire the connection to prevent race conditions with limits
placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
self._acquired.add(placeholder)
self._acquired_per_host[key].add(placeholder)
for trace in traces:
await trace.send_connection_reuseconn()
self._acquired.remove(placeholder)
self._drop_acquired_per_host(key, placeholder)

def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
try:
conns = self._conns[key]
Expand Down