
Commit 62adae3

Wrap fetch methods in asyncio.to_thread for aio pandas/arrow/polars cursors

Override `fetchone()`, `fetchmany()`, `fetchall()`, and `__anext__()` in
AioPandasCursor, AioArrowCursor, and AioPolarsCursor to use `asyncio.to_thread()`,
preventing event loop blocking when `chunksize` triggers lazy S3 reads. This
unifies the API so all aio cursors require `await` for fetch operations,
consistent with AioCursor and AioS3FSCursor.

Fixes #672

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>

1 parent: 4323987

7 files changed (+233 −75 lines)

docs/aio.md (9 additions, 28 deletions)

````diff
@@ -177,38 +177,19 @@ Native asyncio versions are available for all cursor types:
 
 ### Fetch behavior
 
-For **AioPandasCursor**, **AioArrowCursor**, and **AioPolarsCursor**, the S3 download
-(CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore `fetchone()`, `fetchall()`, `as_pandas()`, `as_arrow()`, and `as_polars()`
-are synchronous (in-memory only) and do not need `await`:
+All aio cursors use `await` for fetch operations. The S3 download (CSV or Parquet)
+happens inside `execute()`, wrapped in `asyncio.to_thread()`. Fetch methods are also
+wrapped in `asyncio.to_thread()` to ensure the event loop is never blocked — this is
+especially important when `chunksize` is set, as fetch calls trigger lazy S3 reads.
 
 ```python
-# Pandas, Arrow, Polars — S3 download completes during execute()
-await cursor.execute("SELECT * FROM many_rows")  # Downloads data here
-row = cursor.fetchone()  # No await — data already in memory
-rows = cursor.fetchall()  # No await
-df = cursor.as_pandas()  # No await
-```
-
-The exceptions are **AioCursor** and **AioS3FSCursor**, which stream rows lazily from S3.
-Their fetch methods perform I/O and require `await`:
-
-```python
-# AioCursor, AioS3FSCursor — fetch reads from S3 lazily
 await cursor.execute("SELECT * FROM many_rows")
-row = await cursor.fetchone()  # Await required — reads from S3
-rows = await cursor.fetchall()  # Await required
+row = await cursor.fetchone()
+rows = await cursor.fetchall()
+df = cursor.as_pandas()  # In-memory conversion, no await needed
 ```
 
-```{note}
-When using AioPandasCursor or AioPolarsCursor with the `chunksize` option,
-`execute()` creates a lazy reader (e.g., pandas `TextFileReader`) instead of
-loading all data at once. Subsequent iteration via `as_pandas()`, `fetchone()`,
-or `async for` triggers chunk-by-chunk S3 reads that are **not** wrapped in
-`asyncio.to_thread()` and will block the event loop. If you need chunked
-processing in an async application, consider wrapping the iteration in
-`asyncio.to_thread()` yourself, or use the default non-chunked mode.
-```
+The `as_pandas()`, `as_arrow()`, and `as_polars()` convenience methods operate on
+already-loaded data and remain synchronous.
 
 See each cursor's documentation page for detailed usage examples.
````
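The unified fetch pattern described above can be sketched in isolation. This is a minimal illustration, not PyAthena code: `FakeResultSet` and `AioCursorSketch` are invented stand-ins for a result set with blocking reads and a cursor that offloads them via `asyncio.to_thread()`:

```python
import asyncio
import time
from typing import Any, List, Optional, Tuple


class FakeResultSet:
    """Stand-in for a result set whose fetch may do blocking S3 I/O."""

    def __init__(self, rows: List[Tuple[Any, ...]]) -> None:
        self._rows = iter(rows)

    def fetchone(self) -> Optional[Tuple[Any, ...]]:
        time.sleep(0.01)  # simulate a blocking chunked read
        return next(self._rows, None)


class AioCursorSketch:
    """Mirrors the commit's pattern: each fetch runs in a worker thread."""

    def __init__(self, result_set: FakeResultSet) -> None:
        self._result_set = result_set

    async def fetchone(self) -> Optional[Tuple[Any, ...]]:
        # asyncio.to_thread offloads the blocking call, so the event
        # loop stays responsive while the thread waits on I/O.
        return await asyncio.to_thread(self._result_set.fetchone)

    def __aiter__(self) -> "AioCursorSketch":
        return self

    async def __anext__(self) -> Tuple[Any, ...]:
        row = await self.fetchone()
        if row is None:
            raise StopAsyncIteration
        return row


async def main() -> List[Tuple[Any, ...]]:
    cursor = AioCursorSketch(FakeResultSet([(1,), (2,), (3,)]))
    return [row async for row in cursor]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Requires Python 3.9+ for `asyncio.to_thread()`; the `__anext__` override is what makes `async for` consume rows without blocking.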

docs/arrow.md (5 additions, 8 deletions)

````diff
@@ -487,11 +487,8 @@ cursor = connect(
 
 AioArrowCursor is a native asyncio cursor that returns results as Apache Arrow Tables.
 Unlike AsyncArrowCursor which uses `concurrent.futures`, this cursor uses
-`asyncio.to_thread()` for result set creation, keeping the event loop free.
-
-The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore fetch methods, `as_arrow()`, and `as_polars()` are synchronous and do not need `await`.
+`asyncio.to_thread()` for both result set creation and fetch operations,
+keeping the event loop free.
 
 ```python
 from pyathena import aconnect
@@ -517,9 +514,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                           region_name="us-west-2") as conn:
     cursor = conn.cursor(AioArrowCursor)
     await cursor.execute("SELECT * FROM many_rows")
-    print(cursor.fetchone())
-    print(cursor.fetchmany())
-    print(cursor.fetchall())
+    print(await cursor.fetchone())
+    print(await cursor.fetchmany())
+    print(await cursor.fetchall())
 ```
 
 ```python
````

docs/pandas.md (5 additions, 14 deletions)

````diff
@@ -774,11 +774,8 @@ cursor = connect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
 
 AioPandasCursor is a native asyncio cursor that returns results as pandas DataFrames.
 Unlike AsyncPandasCursor which uses `concurrent.futures`, this cursor uses
-`asyncio.to_thread()` for result set creation, keeping the event loop free.
-
-The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore fetch methods and `as_pandas()` are synchronous and do not need `await`.
+`asyncio.to_thread()` for both result set creation and fetch operations,
+keeping the event loop free.
 
 ```python
 from pyathena import aconnect
@@ -803,9 +800,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                           region_name="us-west-2") as conn:
     cursor = conn.cursor(AioPandasCursor)
     await cursor.execute("SELECT * FROM many_rows")
-    print(cursor.fetchone())
-    print(cursor.fetchmany())
-    print(cursor.fetchall())
+    print(await cursor.fetchone())
+    print(await cursor.fetchmany())
+    print(await cursor.fetchall())
 ```
 
 ```python
@@ -833,9 +830,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
     df = cursor.as_pandas()
 ```
 
-```{note}
-When using AioPandasCursor with the `chunksize` option, `execute()` creates a lazy
-`TextFileReader` instead of loading all data at once. Subsequent iteration via
-`as_pandas()`, `fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that
-are not wrapped in `asyncio.to_thread()` and will block the event loop.
-```
````

docs/polars.md (5 additions, 14 deletions)

````diff
@@ -581,11 +581,8 @@ for chunk in result_set.iter_chunks():
 
 AioPolarsCursor is a native asyncio cursor that returns results as Polars DataFrames.
 Unlike AsyncPolarsCursor which uses `concurrent.futures`, this cursor uses
-`asyncio.to_thread()` for result set creation, keeping the event loop free.
-
-The S3 download (CSV or Parquet) happens inside `execute()`, wrapped in `asyncio.to_thread()`.
-By the time `execute()` returns, all data is already loaded into memory.
-Therefore fetch methods, `as_polars()`, and `as_arrow()` are synchronous and do not need `await`.
+`asyncio.to_thread()` for both result set creation and fetch operations,
+keeping the event loop free.
 
 ```python
 from pyathena import aconnect
@@ -610,9 +607,9 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
                           region_name="us-west-2") as conn:
     cursor = conn.cursor(AioPolarsCursor)
     await cursor.execute("SELECT * FROM many_rows")
-    print(cursor.fetchone())
-    print(cursor.fetchmany())
-    print(cursor.fetchall())
+    print(await cursor.fetchone())
+    print(await cursor.fetchmany())
+    print(await cursor.fetchall())
 ```
 
 ```python
@@ -653,9 +650,3 @@ async with await aconnect(s3_staging_dir="s3://YOUR_S3_BUCKET/path/to/",
     df = cursor.as_polars()
 ```
 
-```{note}
-When using AioPolarsCursor with the `chunksize` option, `execute()` creates a lazy
-reader instead of loading all data at once. Subsequent iteration via `as_polars()`,
-`fetchone()`, or `async for` triggers chunk-by-chunk S3 reads that are not wrapped
-in `asyncio.to_thread()` and will block the event loop.
-```
````

pyathena/aio/arrow/cursor.py (69 additions, 4 deletions)

```diff
@@ -3,7 +3,7 @@
 
 import asyncio
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union, cast
 
 from pyathena.aio.common import WithAsyncFetch
 from pyathena.arrow.converter import (
@@ -25,9 +25,8 @@
 class AioArrowCursor(WithAsyncFetch):
     """Native asyncio cursor that returns results as Apache Arrow Tables.
 
-    Uses ``asyncio.to_thread()`` to create the result set off the event loop.
-    Since ``AthenaArrowResultSet`` loads all data in ``__init__`` (via S3),
-    fetch methods are synchronous (in-memory only) and do not need to be async.
+    Uses ``asyncio.to_thread()`` for both result set creation and fetch
+    operations, keeping the event loop free.
 
     Example:
         >>> async with await pyathena.aconnect(...) as conn:
@@ -153,6 +152,72 @@ async def execute(  # type: ignore[override]
             raise OperationalError(query_execution.state_change_reason)
         return self
 
+    async def fetchone(  # type: ignore[override]
+        self,
+    ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch the next row of the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop.
+
+        Returns:
+            A tuple representing the next row, or None if no more rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaArrowResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchone)
+
+    async def fetchmany(  # type: ignore[override]
+        self, size: Optional[int] = None
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch multiple rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop.
+
+        Args:
+            size: Maximum number of rows to fetch. Defaults to arraysize.
+
+        Returns:
+            List of tuples representing the fetched rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaArrowResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchmany, size)
+
+    async def fetchall(  # type: ignore[override]
+        self,
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch all remaining rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop.
+
+        Returns:
+            List of tuples representing all remaining rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaArrowResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchall)
+
+    async def __anext__(self):
+        row = await self.fetchone()
+        if row is None:
+            raise StopAsyncIteration
+        return row
+
     def as_arrow(self) -> "Table":
         """Return query results as an Apache Arrow Table.
```
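Why the wrapping matters can be shown with a small standalone demo, assuming only the standard library. The hypothetical `blocking_fetch` stands in for a lazy S3 read; because it runs in a worker thread via `asyncio.to_thread()`, a concurrent coroutine keeps making progress instead of freezing with the event loop:

```python
import asyncio
import time


def blocking_fetch() -> list:
    """Stands in for a chunked fetch that does blocking S3 I/O."""
    time.sleep(0.2)
    return [("row",)]


async def heartbeat(ticks: list) -> None:
    # Only makes progress while the event loop is free.
    for _ in range(5):
        ticks.append("tick")
        await asyncio.sleep(0.03)


async def main() -> tuple:
    ticks: list = []
    hb = asyncio.create_task(heartbeat(ticks))
    await asyncio.sleep(0)  # let the heartbeat start
    # Offloaded to a worker thread: the heartbeat keeps running
    # during the 0.2 s "fetch" instead of stalling until it returns.
    rows = await asyncio.to_thread(blocking_fetch)
    during = len(ticks)  # ticks recorded while the fetch was in flight
    await hb
    return rows, during


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Calling `blocking_fetch()` directly in the coroutine instead would record zero ticks during the sleep, which is exactly the bug the commit fixes for `chunksize` reads.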
pyathena/aio/pandas/cursor.py (70 additions, 3 deletions)

```diff
@@ -11,6 +11,7 @@
     Iterable,
     List,
     Optional,
+    Tuple,
     Union,
     cast,
 )
@@ -34,9 +35,9 @@
 class AioPandasCursor(WithAsyncFetch):
     """Native asyncio cursor that returns results as pandas DataFrames.
 
-    Uses ``asyncio.to_thread()`` to create the result set off the event loop.
-    Since ``AthenaPandasResultSet`` loads all data in ``__init__`` (via S3),
-    fetch methods are synchronous (in-memory only) and do not need to be async.
+    Uses ``asyncio.to_thread()`` for both result set creation and fetch
+    operations, keeping the event loop free. This is especially important
+    when ``chunksize`` is set, as fetch calls trigger lazy S3 reads.
 
     Example:
         >>> async with await pyathena.aconnect(...) as conn:
@@ -183,6 +184,72 @@ async def execute(  # type: ignore[override]
             raise OperationalError(query_execution.state_change_reason)
         return self
 
+    async def fetchone(  # type: ignore[override]
+        self,
+    ) -> Optional[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch the next row of the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop when ``chunksize`` triggers lazy S3 reads.
+
+        Returns:
+            A tuple representing the next row, or None if no more rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaPandasResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchone)
+
+    async def fetchmany(  # type: ignore[override]
+        self, size: Optional[int] = None
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch multiple rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop when ``chunksize`` triggers lazy S3 reads.
+
+        Args:
+            size: Maximum number of rows to fetch. Defaults to arraysize.
+
+        Returns:
+            List of tuples representing the fetched rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaPandasResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchmany, size)
+
+    async def fetchall(  # type: ignore[override]
+        self,
+    ) -> List[Union[Tuple[Optional[Any], ...], Dict[Any, Optional[Any]]]]:
+        """Fetch all remaining rows from the result set.
+
+        Wraps the synchronous fetch in ``asyncio.to_thread`` to avoid
+        blocking the event loop when ``chunksize`` triggers lazy S3 reads.
+
+        Returns:
+            List of tuples representing all remaining rows.
+
+        Raises:
+            ProgrammingError: If no result set is available.
+        """
+        if not self.has_result_set:
+            raise ProgrammingError("No result set.")
+        result_set = cast(AthenaPandasResultSet, self.result_set)
+        return await asyncio.to_thread(result_set.fetchall)
+
+    async def __anext__(self):
+        row = await self.fetchone()
+        if row is None:
+            raise StopAsyncIteration
+        return row
+
     def as_pandas(self) -> Union["DataFrame", PandasDataFrameIterator]:
         """Return DataFrame or PandasDataFrameIterator based on chunksize setting.
```
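A side benefit of the awaitable fetch API is that independent blocking reads can overlap on the event loop. A sketch with an invented `SlowResultSet` (not a PyAthena class), where two 0.1 s blocking fetches complete in roughly 0.1 s total rather than 0.2 s:

```python
import asyncio
import time
from typing import Any, List, Tuple


class SlowResultSet:
    """Fake result set whose fetchall blocks, like a lazy S3 read."""

    def __init__(self, rows: List[Tuple[Any, ...]], delay: float = 0.1) -> None:
        self._rows = rows
        self._delay = delay

    def fetchall(self) -> List[Tuple[Any, ...]]:
        time.sleep(self._delay)  # blocking I/O stand-in
        return list(self._rows)


async def fetch_all(rs: SlowResultSet) -> List[Tuple[Any, ...]]:
    # Same shape as the commit's fetchall override.
    return await asyncio.to_thread(rs.fetchall)


async def main() -> float:
    a = SlowResultSet([(1,), (2,)])
    b = SlowResultSet([(3,)])
    start = time.monotonic()
    rows_a, rows_b = await asyncio.gather(fetch_all(a), fetch_all(b))
    elapsed = time.monotonic() - start
    assert rows_a == [(1,), (2,)] and rows_b == [(3,)]
    return elapsed  # roughly 0.1 s: the two blocking reads overlapped


if __name__ == "__main__":
    print(asyncio.run(main()))
```

With the old synchronous fetch methods, the same two reads would have run back to back on the loop thread.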