Skip to content

Commit ff6da8f

Browse files
josh-ag2clauded-v-b
authored
fix(FsspecStore): close owned async filesystem on store.close() (#4003)
* fix(FsspecStore): close owned async filesystem on store.close() FsspecStore.from_url() and from_mapper() create their own async filesystem instance that zarr is responsible for — but Store.close() never cleaned it up, leaving the underlying aiohttp ClientSession open until garbage collection. This produced "Unclosed client session" ResourceWarnings from aiohttp, and in environments where the finalizer ran on the wrong event loop (e.g. Python 3.12+ with eager_start=True) it could raise RuntimeError. Changes: - Add _close_fs() async helper: calls fs.set_session() then client.close() for filesystems that expose set_session() (e.g. s3fs); no-op for all others. - Add _owns_fs: bool to FsspecStore.__init__ (default False). Set True in from_url() unconditionally; set True in from_mapper() only when _make_async() produced a new instance (sync→async wrap). Direct construction and from_upath() leave _owns_fs=False — the caller supplied the fs and remains responsible for it. - Override close() to invoke zarr_sync(_close_fs(self.fs)) before calling super().close(), guarded by _owns_fs and a bare except so it can never raise from a destructor path. Tests: - Update pytestmark comment (the filter stays for GC-path warnings). - test_from_url_owns_filesystem / test_from_url_close_releases_store - test_direct_construction_does_not_own_filesystem - test_from_upath_does_not_own_filesystem - test_from_mapper_does_not_own_already_async_filesystem - test_from_mapper_owns_wrapped_sync_filesystem - test_close_fs_closes_s3_client / test_close_fs_no_op_for_fs_without_set_session Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix lint: use contextlib.suppress instead of try/except/pass (SIM105) * add towncrier release note for #4003 * changes: expand 4003 release note to document fix scope and s3fs limitation Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com> * fix(FsspecStore): propagate fs ownership through with_read_only Addresses roborev review findings on the filesystem-ownership change: - with_read_only() transferred fs ownership: it built the derived store with _owns_fs=False while sharing the same fs. In the common from_url(...).with_read_only() chain the only owner (the unreferenced source) was GC'd without close(), reintroducing the unclosed-session leak. Ownership now transfers to the surviving store and is cleared on the source to avoid a double-close. Covered by a new test. - close() now logs at debug before suppressing, so a regression in the close path stays observable instead of silently reverting to leaking. - Documented that set_session() lazily creates a session, so closing a store that never did I/O may instantiate one purely to close it (accepted best-effort behavior). Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com> --------- Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com> Co-authored-by: Davis Bennett <davis.v.bennett@gmail.com>
1 parent 1cda981 commit ff6da8f

3 files changed

Lines changed: 224 additions & 5 deletions

File tree

changes/4003.bugfix.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
`FsspecStore.from_url()` and `from_mapper()` now close the async filesystem
2+
they create when `store.close()` is called. Previously the underlying aiohttp
3+
`ClientSession` was left open until garbage collection, producing
4+
`"Unclosed client session"` `ResourceWarning`s from aiohttp.
5+
6+
The fix introduces `FsspecStore._owns_fs`, a boolean that is ``True`` only when
7+
`FsspecStore` itself created the filesystem (via `from_url` or `from_mapper`
8+
when a sync→async conversion was performed). When `_owns_fs` is ``True``,
9+
`store.close()` calls the new `_close_fs()` helper, which invokes
10+
`fs.set_session()` and closes the returned client. Callers who supply their own
11+
filesystem instance to `FsspecStore()` directly remain responsible for its
12+
lifecycle; `_owns_fs` is ``False`` for those stores.
13+
14+
**Scope note**: This fix closes the S3 client session that is active at the time
15+
`store.close()` is called. Some S3-backed filesystem implementations (e.g.
16+
s3fs with ``cache_regions=True``) may internally refresh and replace their
17+
client during I/O operations, abandoning prior sessions before ``store.close()``
18+
is invoked. Those intermediate sessions are outside the scope of this fix and
19+
are an issue in the upstream filesystem library.

src/zarr/storage/_fsspec.py

Lines changed: 57 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import json
44
import warnings
55
from contextlib import suppress
6+
from logging import getLogger
67
from typing import TYPE_CHECKING, Any
78

89
from packaging.version import parse as parse_version
@@ -18,6 +19,8 @@
1819
from zarr.errors import ZarrUserWarning
1920
from zarr.storage._utils import _dereference_path
2021

22+
logger = getLogger(__name__)
23+
2124
if TYPE_CHECKING:
2225
from collections.abc import AsyncIterator, Iterable
2326

@@ -35,6 +38,26 @@
3538
)
3639

3740

41+
async def _close_fs(fs: AsyncFileSystem) -> None:
42+
"""
43+
Best-effort async close of an fsspec async filesystem owned by FsspecStore.
44+
45+
For filesystems that expose ``set_session()`` (e.g. s3fs) the underlying
46+
aiohttp ``ClientSession`` is closed explicitly, which prevents
47+
"Unclosed client session" ``ResourceWarning``s from aiohttp. For all
48+
other filesystem types the call is a no-op (not every implementation
49+
manages an HTTP session directly).
50+
51+
Note that ``set_session()`` lazily creates a session if none exists yet, so
52+
closing a store that never performed any I/O may instantiate a session
53+
purely to close it. This is accepted best-effort behavior; fsspec does not
54+
expose a stable, cross-implementation way to test for an existing session.
55+
"""
56+
if hasattr(fs, "set_session"):
57+
session = await fs.set_session()
58+
await session.close()
59+
60+
3861
def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem:
3962
"""Convert a sync FSSpec filesystem to an async FFSpec filesystem
4063
@@ -129,6 +152,9 @@ def __init__(
129152
self.fs = fs
130153
self.path = path
131154
self.allowed_exceptions = allowed_exceptions
155+
# True only when this store created fs itself (from_url / from_mapper with new instance).
156+
# Callers who supply their own fs remain responsible for its lifecycle.
157+
self._owns_fs: bool = False
132158

133159
if not self.fs.async_impl:
134160
raise TypeError("Filesystem needs to support async operations.")
@@ -194,13 +220,17 @@ def from_mapper(
194220
-------
195221
FsspecStore
196222
"""
197-
fs = _make_async(fs_map.fs)
198-
return cls(
223+
original_fs = fs_map.fs
224+
fs = _make_async(original_fs)
225+
store = cls(
199226
fs=fs,
200227
path=fs_map.root,
201228
read_only=read_only,
202229
allowed_exceptions=allowed_exceptions,
203230
)
231+
# _make_async returns a new instance when converting sync→async; own it.
232+
store._owns_fs = fs is not original_fs
233+
return store
204234

205235
@classmethod
206236
def from_url(
@@ -242,16 +272,39 @@ def from_url(
242272
if not fs.async_impl:
243273
fs = _make_async(fs)
244274

245-
return cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions)
275+
store = cls(fs=fs, path=path, read_only=read_only, allowed_exceptions=allowed_exceptions)
276+
store._owns_fs = True
277+
return store
246278

247279
def with_read_only(self, read_only: bool = False) -> FsspecStore:
248280
# docstring inherited
249-
return type(self)(
281+
new_store = type(self)(
250282
fs=self.fs,
251283
path=self.path,
252284
allowed_exceptions=self.allowed_exceptions,
253285
read_only=read_only,
254286
)
287+
# The derived store shares the same fs. Transfer ownership so the
288+
# surviving store closes it, and clear ours to avoid a double-close.
289+
# Otherwise the common ``from_url(...).with_read_only()`` chain would
290+
# drop the only owner (the unreferenced source) and leak the session.
291+
new_store._owns_fs = self._owns_fs
292+
self._owns_fs = False
293+
return new_store
294+
295+
def close(self) -> None:
296+
# docstring inherited
297+
if self._owns_fs:
298+
from zarr.core.sync import sync as zarr_sync
299+
300+
# Best-effort: a failure to release the session must not block close(),
301+
# but log it so a genuine regression in the close path stays observable
302+
# rather than silently reverting to the leaking behavior.
303+
try:
304+
zarr_sync(_close_fs(self.fs))
305+
except Exception:
306+
logger.debug("Failed to close owned filesystem %r", self.fs, exc_info=True)
307+
super().close()
255308

256309
async def clear(self) -> None:
257310
# docstring inherited

tests/test_store/test_fsspec.py

Lines changed: 148 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,9 @@
3636
pytest.mark.filterwarnings(
3737
re.escape("ignore:datetime.datetime.utcnow() is deprecated:DeprecationWarning")
3838
),
39-
# TODO: fix these warnings
39+
# FsspecStore.from_url() and from_mapper() now close the aiohttp session on store.close().
40+
# This filter covers stores that are GC'd without an explicit close() call, and any
41+
# residual sessions from aiobotocore's ClientCreatorContext (a separate upstream issue).
4042
pytest.mark.filterwarnings("ignore:Unclosed client session:ResourceWarning"),
4143
pytest.mark.filterwarnings(
4244
"ignore:coroutine 'ClientCreatorContext.__aexit__' was never awaited:RuntimeWarning"
@@ -283,6 +285,75 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None:
283285
):
284286
await store.delete_dir("test_prefix")
285287

288+
# ── Filesystem lifecycle (ownership) ──────────────────────────────────────
289+
290+
def test_from_url_owns_filesystem(self) -> None:
291+
"""FsspecStore.from_url() creates the async fs; it must own it."""
292+
store = FsspecStore.from_url(
293+
f"s3://{test_bucket_name}/lifecycle/",
294+
storage_options={"endpoint_url": endpoint_url, "anon": False},
295+
)
296+
assert store._owns_fs
297+
store.close()
298+
299+
async def test_from_url_close_releases_store(self) -> None:
300+
"""
301+
close() on a from_url() store must succeed without error and mark the
302+
store as closed. For the owned filesystem, _close_fs() is invoked to
303+
release the underlying S3 client / aiohttp connection pool.
304+
"""
305+
store = FsspecStore.from_url(
306+
f"s3://{test_bucket_name}/lifecycle/",
307+
storage_options={"endpoint_url": endpoint_url, "anon": False},
308+
)
309+
# Materialise the S3 client and connection pool.
310+
await store.set("probe", cpu.Buffer.from_bytes(b"x"))
311+
312+
store.close()
313+
314+
assert not store._is_open
315+
316+
def test_direct_construction_does_not_own_filesystem(self) -> None:
317+
"""Direct FsspecStore() must not claim ownership — the caller owns the fs."""
318+
try:
319+
from fsspec import url_to_fs
320+
except ImportError:
321+
from fsspec.core import url_to_fs
322+
fs, path = url_to_fs(
323+
f"s3://{test_bucket_name}", endpoint_url=endpoint_url, anon=False, asynchronous=True
324+
)
325+
store = FsspecStore(fs=fs, path=path)
326+
assert not store._owns_fs
327+
328+
@pytest.mark.skipif(
329+
parse_version(fsspec.__version__) < parse_version("2024.03.01"),
330+
reason="Prior bug in from_upath",
331+
)
332+
def test_from_upath_does_not_own_filesystem(self) -> None:
333+
"""from_upath() uses the UPath's existing fs; the store must not own it."""
334+
upath = pytest.importorskip("upath")
335+
path = upath.UPath(
336+
f"s3://{test_bucket_name}/foo/bar/",
337+
endpoint_url=endpoint_url,
338+
anon=False,
339+
asynchronous=True,
340+
)
341+
store = FsspecStore.from_upath(path)
342+
assert not store._owns_fs
343+
344+
def test_from_mapper_does_not_own_already_async_filesystem(self) -> None:
345+
"""from_mapper() with an already-async fs must not claim ownership."""
346+
s3_filesystem = s3fs.S3FileSystem(
347+
asynchronous=True,
348+
endpoint_url=endpoint_url,
349+
anon=False,
350+
skip_instance_cache=True,
351+
)
352+
mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/")
353+
store = FsspecStore.from_mapper(mapper)
354+
# _make_async returns the same instance for an already-async fs.
355+
assert not store._owns_fs
356+
286357

287358
def array_roundtrip(store: FsspecStore) -> None:
288359
"""
@@ -512,6 +583,82 @@ def test_open_s3map_raises() -> None:
512583
zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3))
513584

514585

586+
async def test_close_fs_closes_s3_client() -> None:
587+
"""
588+
_close_fs() must call set_session() and then close() on the returned
589+
S3 client. This is verified with mocks to avoid a real S3 connection.
590+
"""
591+
from unittest.mock import AsyncMock
592+
593+
from zarr.storage._fsspec import _close_fs
594+
595+
mock_client = AsyncMock()
596+
mock_fs = AsyncMock()
597+
mock_fs.set_session = AsyncMock(return_value=mock_client)
598+
599+
await _close_fs(mock_fs)
600+
601+
mock_fs.set_session.assert_called_once()
602+
mock_client.close.assert_called_once()
603+
604+
605+
async def test_close_fs_no_op_for_fs_without_set_session() -> None:
606+
"""_close_fs() must be a no-op for filesystems that don't expose set_session()."""
607+
from unittest.mock import AsyncMock
608+
609+
from zarr.storage._fsspec import _close_fs
610+
611+
mock_fs = AsyncMock(spec=[]) # empty spec — no set_session attribute
612+
await _close_fs(mock_fs) # must not raise
613+
614+
615+
@pytest.mark.skipif(
616+
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
617+
reason="No AsyncFileSystemWrapper",
618+
)
619+
def test_from_mapper_owns_wrapped_sync_filesystem(tmp_path: pathlib.Path) -> None:
620+
"""
621+
from_mapper() with a sync fs must wrap it in AsyncFileSystemWrapper and
622+
claim ownership so that close() cleans it up.
623+
624+
The local filesystem is synchronous; _make_async() produces a new
625+
AsyncFileSystemWrapper instance — a different object from the original fs.
626+
"""
627+
import fsspec as _fsspec
628+
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
629+
630+
fs = _fsspec.filesystem("file", auto_mkdir=True)
631+
mapper = fs.get_mapper(str(tmp_path))
632+
store = FsspecStore.from_mapper(mapper)
633+
assert isinstance(store.fs, AsyncFileSystemWrapper)
634+
assert store._owns_fs
635+
636+
637+
@pytest.mark.skipif(
638+
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
639+
reason="No AsyncFileSystemWrapper",
640+
)
641+
def test_with_read_only_transfers_filesystem_ownership(tmp_path: pathlib.Path) -> None:
642+
"""
643+
with_read_only() must transfer fs ownership to the derived store and clear
644+
it on the source, so the surviving store closes the shared fs exactly once.
645+
646+
In the common ``from_url(...).with_read_only()`` chain the source store is
647+
immediately unreferenced; if ownership were not transferred, the only owner
648+
would be garbage-collected without close() and the session would leak.
649+
"""
650+
source = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": False})
651+
assert source._owns_fs
652+
653+
derived = source.with_read_only(read_only=True)
654+
655+
# Ownership moved to the survivor; the source no longer owns it (no double-close).
656+
assert derived._owns_fs
657+
assert not source._owns_fs
658+
# The derived store shares the same underlying fs.
659+
assert derived.fs is source.fs
660+
661+
515662
@pytest.mark.parametrize("asynchronous", [True, False])
516663
def test_make_async(asynchronous: bool) -> None:
517664
s3_filesystem = s3fs.S3FileSystem(

0 commit comments

Comments
 (0)