Skip to content

Commit 37cd4a8

Browse files
Add output_size property (#12452)
1 parent 3c60849 commit 37cd4a8

8 files changed

Lines changed: 482 additions & 5 deletions

CHANGES/12452.feature.rst

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
Added :attr:`~aiohttp.ClientResponse.output_size` and
2+
:attr:`~aiohttp.ClientResponse.upload_complete` -- by :user:`Dreamsorcerer`.

aiohttp/client_reqrep.py

Lines changed: 49 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,9 @@ class ClientResponse(HeadersMixin):
212212
_resolve_charset: Callable[["ClientResponse", bytes], str] = lambda *_: "utf-8"
213213

214214
__writer: asyncio.Task[None] | None = None
215+
_stream_writer: AbstractStreamWriter | None = None
216+
_output_size: int = 0
217+
_upload_complete: asyncio.Future[None] | None = None
215218

216219
def __init__(
217220
self,
@@ -226,6 +229,7 @@ def __init__(
226229
session: "ClientSession | None",
227230
request_headers: CIMultiDict[str],
228231
original_url: URL,
232+
stream_writer: AbstractStreamWriter,
229233
**kwargs: object,
230234
) -> None:
231235
# kwargs exists so authors of subclasses should expect to pass through unknown
@@ -240,7 +244,10 @@ def __init__(
240244

241245
self._real_url = url
242246
self._url = url.with_fragment(None) if url.raw_fragment else url
243-
if writer is not None:
247+
if writer is None: # Request already sent
248+
self._output_size = stream_writer.output_size
249+
else:
250+
self._stream_writer = stream_writer
244251
self._writer = writer
245252
if continue100 is not None:
246253
self._continue = continue100
@@ -261,6 +268,11 @@ def __init__(
261268

262269
def __reset_writer(self, _: object = None) -> None:
263270
self.__writer = None
271+
if self._stream_writer is not None:
272+
self._output_size = self._stream_writer.output_size
273+
self._stream_writer = None
274+
if self._upload_complete is not None and not self._upload_complete.done():
275+
self._upload_complete.set_result(None)
264276

265277
@property
266278
def _writer(self) -> asyncio.Task[None] | None:
@@ -281,10 +293,29 @@ def _writer(self, writer: asyncio.Task[None] | None) -> None:
281293
return
282294
if writer.done():
283295
# The writer is already done, so we can clear it immediately.
284-
self.__writer = None
296+
self.__reset_writer()
285297
else:
286298
writer.add_done_callback(self.__reset_writer)
287299

300+
@property
301+
def output_size(self) -> int:
302+
"""Number of bytes sent for this request."""
303+
if self._stream_writer is not None:
304+
return self._stream_writer.output_size
305+
return self._output_size
306+
307+
@property
308+
def upload_complete(self) -> "asyncio.Future[None]":
309+
"""Future set when the request body has been fully sent.
310+
311+
Already done when the request had no body or was written eagerly.
312+
"""
313+
if self._upload_complete is None:
314+
self._upload_complete = self._loop.create_future()
315+
if self._stream_writer is None: # upload already finished
316+
self._upload_complete.set_result(None)
317+
return self._upload_complete
318+
288319
@property
289320
def cookies(self) -> SimpleCookie:
290321
if self._cookies is None:
@@ -558,6 +589,9 @@ async def _wait_released(self) -> None:
558589
def _cleanup_writer(self) -> None:
559590
if self.__writer is not None:
560591
self.__writer.cancel()
592+
if self._stream_writer is not None:
593+
self._output_size = self._stream_writer.output_size
594+
self._stream_writer = None
561595
self._session = None
562596

563597
def _notify_content(self) -> None:
@@ -800,7 +834,11 @@ def _update_headers(self, headers: CIMultiDict[str]) -> None:
800834
self.headers[hdrs.HOST] = headers.pop(hdrs.HOST, host)
801835
self.headers.extend(headers)
802836

803-
def _create_response(self, task: asyncio.Task[None] | None) -> ClientResponse:
837+
def _create_response(
838+
self,
839+
task: asyncio.Task[None] | None,
840+
stream_writer: AbstractStreamWriter,
841+
) -> ClientResponse:
804842
return self.response_class(
805843
self.method,
806844
self.original_url,
@@ -812,6 +850,7 @@ def _create_response(self, task: asyncio.Task[None] | None) -> ClientResponse:
812850
session=None,
813851
request_headers=self.headers,
814852
original_url=self.original_url,
853+
stream_writer=stream_writer,
815854
)
816855

817856
def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:
@@ -885,7 +924,7 @@ async def _send(self, conn: "Connection") -> ClientResponse:
885924
protocol.start_timeout()
886925
writer.set_eof()
887926
task = None
888-
self._response = self._create_response(task)
927+
self._response = self._create_response(task, stream_writer=writer)
889928
return self._response
890929

891930
async def _write_bytes(
@@ -1261,7 +1300,11 @@ def _update_proxy(
12611300
self.proxy = proxy
12621301
self.proxy_headers = proxy_headers
12631302

1264-
def _create_response(self, task: asyncio.Task[None] | None) -> ClientResponse:
1303+
def _create_response(
1304+
self,
1305+
task: asyncio.Task[None] | None,
1306+
stream_writer: AbstractStreamWriter,
1307+
) -> ClientResponse:
12651308
return self.response_class(
12661309
self.method,
12671310
self.original_url,
@@ -1273,6 +1316,7 @@ def _create_response(self, task: asyncio.Task[None] | None) -> ClientResponse:
12731316
session=self._session,
12741317
request_headers=self.headers,
12751318
original_url=self.original_url,
1319+
stream_writer=stream_writer,
12761320
)
12771321

12781322
def _create_writer(self, protocol: BaseProtocol) -> StreamWriter:

docs/client_reference.rst

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1538,6 +1538,30 @@ Response object
15381538

15391539
.. versionadded:: 3.2
15401540

1541+
.. attribute:: output_size
1542+
1543+
Number of bytes sent for this request.
1544+
1545+
Pair with :attr:`upload_complete` to display upload progress::
1546+
1547+
async with session.post(url, data=mpwriter) as resp:
1548+
while not resp.upload_complete.done():
1549+
print(f"uploaded {resp.output_size} bytes")
1550+
await asyncio.sleep(0.5)
1551+
print(f"upload complete: {resp.output_size} bytes")
1552+
1553+
.. versionadded:: 3.14
1554+
1555+
.. attribute:: upload_complete
1556+
1557+
An :class:`asyncio.Future` set when the request body has been fully sent.
1558+
1559+
Use ``await resp.upload_complete`` to block until the upload finishes, or
1560+
``resp.upload_complete.done()`` to poll from a progress-sampling loop
1561+
(see :attr:`output_size`).
1562+
1563+
.. versionadded:: 3.14
1564+
15411565
.. attribute:: content_type
15421566

15431567
Read-only property with *content* part of *Content-Type* header.

tests/test_client_functional.py

Lines changed: 170 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5746,3 +5746,173 @@ async def handler(request: web.Request) -> web.Response:
57465746
data = await resp.content.read()
57475747
assert resp.content.total_raw_bytes == len(data)
57485748
assert resp.content.total_raw_bytes == int(resp.headers["Content-Length"])
5749+
5750+
5751+
async def test_output_size_bytes(aiohttp_client: AiohttpClient) -> None:
5752+
async def handler(request: web.Request) -> web.Response:
5753+
await request.read()
5754+
return web.Response()
5755+
5756+
app = web.Application()
5757+
app.router.add_post("/", handler)
5758+
client = await aiohttp_client(app)
5759+
5760+
body = b"x" * 1024
5761+
async with client.post("/", data=body) as resp:
5762+
assert resp.output_size >= len(body)
5763+
5764+
5765+
async def test_output_size_multipart(aiohttp_client: AiohttpClient) -> None:
5766+
async def handler(request: web.Request) -> web.Response:
5767+
await request.read()
5768+
return web.Response()
5769+
5770+
app = web.Application()
5771+
app.router.add_post("/", handler)
5772+
client = await aiohttp_client(app)
5773+
5774+
mpwriter = aiohttp.MultipartWriter("form-data")
5775+
mpwriter.append(b"x" * 4096)
5776+
mpwriter.append(b"y" * 2048)
5777+
expected_body_size = mpwriter.size
5778+
assert expected_body_size is not None
5779+
5780+
async with client.post("/", data=mpwriter) as resp:
5781+
assert resp.output_size >= expected_body_size
5782+
5783+
5784+
async def test_output_size_keepalive_isolated(
5785+
aiohttp_client: AiohttpClient,
5786+
) -> None:
5787+
"""Each request on a keep-alive connection has its own counter."""
5788+
transports: set[object] = set()
5789+
5790+
async def handler(request: web.Request) -> web.Response:
5791+
transports.add(request.transport)
5792+
await request.read()
5793+
return web.Response()
5794+
5795+
app = web.Application()
5796+
app.router.add_post("/", handler)
5797+
connector = aiohttp.TCPConnector(limit=1, force_close=False)
5798+
client = await aiohttp_client(app, connector=connector)
5799+
body = b"x" * 65536
5800+
5801+
async with client.post("/", data=body) as resp1:
5802+
size1 = resp1.output_size
5803+
5804+
async with client.post("/", data=body) as resp2:
5805+
size2 = resp2.output_size
5806+
5807+
assert len(transports) == 1 # Check keep-alive worked.
5808+
assert size1 >= len(body)
5809+
assert size1 == size2
5810+
5811+
5812+
async def test_output_size_progress(aiohttp_client: AiohttpClient) -> None:
5813+
"""output_size advances by exactly one chunk per yield."""
5814+
5815+
async def handler(request: web.Request) -> web.StreamResponse:
5816+
response = web.StreamResponse()
5817+
await response.prepare(request)
5818+
# Flush headers + a chunk so resp.start() returns on the client
5819+
# side before we read the body.
5820+
await response.write(b"x")
5821+
await request.read()
5822+
return response
5823+
5824+
app = web.Application()
5825+
app.router.add_post("/", handler)
5826+
client = await aiohttp_client(app)
5827+
5828+
chunk_size = 4096
5829+
chunk = b"z" * chunk_size
5830+
num_chunks = 8
5831+
sample_taken = asyncio.Event()
5832+
next_chunk = asyncio.Event()
5833+
5834+
async def gated_body() -> AsyncIterator[bytes]:
5835+
for _ in range(num_chunks):
5836+
yield chunk
5837+
sample_taken.clear()
5838+
next_chunk.set()
5839+
await sample_taken.wait()
5840+
5841+
async with client.post("/", data=gated_body()) as resp:
5842+
samples: list[int] = []
5843+
for _ in range(num_chunks):
5844+
await next_chunk.wait()
5845+
next_chunk.clear()
5846+
samples.append(resp.output_size)
5847+
assert not resp.upload_complete.done()
5848+
sample_taken.set()
5849+
await resp.upload_complete
5850+
assert resp.upload_complete.done()
5851+
await resp.read()
5852+
5853+
# Each sample after the first reflects exactly one more chunk on the wire.
5854+
chunked_framing = len(f"{chunk_size:x}".encode()) + 4
5855+
deltas = [samples[i] - samples[i - 1] for i in range(1, len(samples))]
5856+
assert deltas == [chunk_size + chunked_framing] * (num_chunks - 1)
5857+
5858+
5859+
async def test_output_size_get_request(aiohttp_client: AiohttpClient) -> None:
5860+
"""GET request with no body still reports the request header byte count."""
5861+
5862+
async def handler(request: web.Request) -> web.Response:
5863+
return web.Response()
5864+
5865+
app = web.Application()
5866+
app.router.add_get("/", handler)
5867+
client = await aiohttp_client(app)
5868+
5869+
async with client.get("/") as resp:
5870+
assert resp.output_size >= 0
5871+
5872+
5873+
async def test_output_size_writer_released(aiohttp_client: AiohttpClient) -> None:
5874+
"""Writer is dropped once body upload completes; output_size survives."""
5875+
5876+
async def handler(request: web.Request) -> web.Response:
5877+
await request.read()
5878+
return web.Response()
5879+
5880+
app = web.Application()
5881+
app.router.add_post("/", handler)
5882+
client = await aiohttp_client(app)
5883+
5884+
body = b"x" * 1024
5885+
async with client.post("/", data=body) as resp:
5886+
await resp.read()
5887+
assert resp._stream_writer is None
5888+
assert resp.output_size >= len(body)
5889+
5890+
5891+
async def test_upload_complete_no_body(aiohttp_client: AiohttpClient) -> None:
5892+
async def handler(request: web.Request) -> web.Response:
5893+
return web.Response()
5894+
5895+
app = web.Application()
5896+
app.router.add_get("/", handler)
5897+
client = await aiohttp_client(app)
5898+
5899+
async with client.get("/") as resp:
5900+
assert resp.upload_complete.done()
5901+
5902+
5903+
async def test_upload_complete_late_access(aiohttp_client: AiohttpClient) -> None:
5904+
"""Accessing upload_complete after the upload finished returns a done future."""
5905+
5906+
async def handler(request: web.Request) -> web.Response:
5907+
await request.read()
5908+
return web.Response()
5909+
5910+
app = web.Application()
5911+
app.router.add_post("/", handler)
5912+
client = await aiohttp_client(app)
5913+
5914+
async with client.post("/", data=b"x" * 1024) as resp:
5915+
await resp.read()
5916+
# Writer task is done; future is created lazily on this first access.
5917+
assert resp._upload_complete is None
5918+
assert resp.upload_complete.done()

tests/test_client_proto.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
from yarl import URL
77

88
from aiohttp import http
9+
from aiohttp.abc import AbstractStreamWriter
910
from aiohttp.client_exceptions import ClientOSError, ServerDisconnectedError
1011
from aiohttp.client_proto import ResponseHandler
1112
from aiohttp.client_reqrep import ClientResponse
@@ -122,6 +123,9 @@ async def test_multiple_responses_one_byte_at_a_time() -> None:
122123
session=mock.Mock(),
123124
request_headers=CIMultiDict[str](),
124125
original_url=url,
126+
stream_writer=mock.create_autospec(
127+
AbstractStreamWriter, spec_set=True, instance=True
128+
),
125129
)
126130
await response.start(conn)
127131
await response.read() == payload
@@ -153,6 +157,9 @@ class PatchableHttpResponseParser(http.HttpResponseParser):
153157
session=mock.Mock(),
154158
request_headers=CIMultiDict[str](),
155159
original_url=url,
160+
stream_writer=mock.create_autospec(
161+
AbstractStreamWriter, spec_set=True, instance=True
162+
),
156163
)
157164
await response.start(conn)
158165
await response.read() == b"ab"
@@ -184,6 +191,9 @@ async def test_client_protocol_readuntil_eof() -> None:
184191
session=mock.Mock(),
185192
request_headers=CIMultiDict[str](),
186193
original_url=url,
194+
stream_writer=mock.create_autospec(
195+
AbstractStreamWriter, spec_set=True, instance=True
196+
),
187197
)
188198
proto.set_response_params(read_until_eof=True)
189199
await response.start(conn)

0 commit comments

Comments
 (0)