Skip to content

Commit 0f755cc

Browse files
d-v-b and jhamman authored
[v3] Sync with futures (#1804)
* fix: return Array from resize and update_attributes instead of AsyncArray
* test(sync): add tests for sync module
* clear up wait usage
* _get_loop must return a loop
* chore: clean up type hints
* feat: add timeout to sync and sync config class, and add a test
* chore: reword docstring
* chore: adjust line length for the linter
* update after v3 reorg merge
* improve tests using asyncmock

---------

Co-authored-by: Joseph Hamman <[email protected]>
1 parent d1a0d99 commit 0f755cc

File tree

6 files changed

+337
-51
lines changed

6 files changed

+337
-51
lines changed

src/zarr/array.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -527,15 +527,19 @@ def __setitem__(self, selection: Selection, value: np.ndarray) -> None:
527527
)
528528

529529
def resize(self, new_shape: ChunkCoords) -> Array:
530-
return sync(
531-
self._async_array.resize(new_shape),
532-
self._async_array.runtime_configuration.asyncio_loop,
530+
return type(self)(
531+
sync(
532+
self._async_array.resize(new_shape),
533+
self._async_array.runtime_configuration.asyncio_loop,
534+
)
533535
)
534536

535537
def update_attributes(self, new_attributes: Dict[str, Any]) -> Array:
536-
return sync(
537-
self._async_array.update_attributes(new_attributes),
538-
self._async_array.runtime_configuration.asyncio_loop,
538+
return type(self)(
539+
sync(
540+
self._async_array.update_attributes(new_attributes),
541+
self._async_array.runtime_configuration.asyncio_loop,
542+
)
539543
)
540544

541545
def __repr__(self):

src/zarr/config.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
class SyncConfiguration:
1010
concurrency: Optional[int] = None
1111
asyncio_loop: Optional[AbstractEventLoop] = None
12+
timeout: float | None = None
1213

1314

1415
def parse_indexing_order(data: Any) -> Literal["C", "F"]:

src/zarr/group.py

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -415,24 +415,36 @@ def nchildren(self) -> int:
415415

416416
@property
417417
def children(self) -> List[Union[Array, Group]]:
418-
_children = self._sync_iter(self._async_group.children())
419-
return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children]
418+
raise NotImplementedError
419+
# Uncomment when AsyncGroup implements this method
420+
# _children: List[Union[AsyncArray, AsyncGroup]] = self._sync_iter(
421+
# self._async_group.children()
422+
# )
423+
# return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children]
420424

421425
def __contains__(self, child) -> bool:
422426
return self._sync(self._async_group.contains(child))
423427

424428
def group_keys(self) -> List[str]:
425-
return self._sync_iter(self._async_group.group_keys())
429+
raise NotImplementedError
430+
# uncomment when AsyncGroup implements this method
431+
# return self._sync_iter(self._async_group.group_keys())
426432

427433
def groups(self) -> List[Group]:
428434
# TODO: in v2 this was a generator that returned key: Group
429-
return [Group(obj) for obj in self._sync_iter(self._async_group.groups())]
435+
raise NotImplementedError
436+
# uncomment when AsyncGroup implements this method
437+
# return [Group(obj) for obj in self._sync_iter(self._async_group.groups())]
430438

431439
def array_keys(self) -> List[str]:
432-
return self._sync_iter(self._async_group.array_keys())
440+
# uncomment when AsyncGroup implements this method
441+
# return self._sync_iter(self._async_group.array_keys())
442+
raise NotImplementedError
433443

434444
def arrays(self) -> List[Array]:
435-
return [Array(obj) for obj in self._sync_iter(self._async_group.arrays())]
445+
raise NotImplementedError
446+
# uncomment when AsyncGroup implements this method
447+
# return [Array(obj) for obj in self._sync_iter(self._async_group.arrays())]
436448

437449
def tree(self, expand=False, level=None) -> Any:
438450
return self._sync(self._async_group.tree(expand=expand, level=level))

src/zarr/sync.py

Lines changed: 50 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,30 +1,34 @@
11
from __future__ import annotations
2+
from typing import TYPE_CHECKING, TypeVar
3+
4+
if TYPE_CHECKING:
5+
from typing import Any, AsyncIterator, Coroutine
26

37
import asyncio
8+
from concurrent.futures import wait
49
import threading
5-
from typing import (
6-
Any,
7-
AsyncIterator,
8-
Coroutine,
9-
List,
10-
Optional,
11-
TypeVar,
12-
)
10+
1311
from typing_extensions import ParamSpec
1412

1513
from zarr.config import SyncConfiguration
1614

15+
P = ParamSpec("P")
16+
T = TypeVar("T")
1717

1818
# From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py
1919

20-
iothread: List[Optional[threading.Thread]] = [None] # dedicated IO thread
21-
loop: List[Optional[asyncio.AbstractEventLoop]] = [
20+
iothread: list[threading.Thread | None] = [None] # dedicated IO thread
21+
loop: list[asyncio.AbstractEventLoop | None] = [
2222
None
2323
] # global event loop for any non-async instance
24-
_lock: Optional[threading.Lock] = None # global lock placeholder
24+
_lock: threading.Lock | None = None # global lock placeholder
2525
get_running_loop = asyncio.get_running_loop
2626

2727

28+
class SyncError(Exception):
29+
pass
30+
31+
2832
def _get_lock() -> threading.Lock:
2933
"""Allocate or return a threading lock.
3034
@@ -36,16 +40,22 @@ def _get_lock() -> threading.Lock:
3640
return _lock
3741

3842

39-
async def _runner(event: threading.Event, coro: Coroutine, result_box: List[Optional[Any]]):
43+
async def _runner(coro: Coroutine[Any, Any, T]) -> T | BaseException:
44+
"""
45+
Await a coroutine and return the result of running it. If awaiting the coroutine raises an
46+
exception, the exception will be returned.
47+
"""
4048
try:
41-
result_box[0] = await coro
49+
return await coro
4250
except Exception as ex:
43-
result_box[0] = ex
44-
finally:
45-
event.set()
51+
return ex
4652

4753

48-
def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None):
54+
def sync(
55+
coro: Coroutine[Any, Any, T],
56+
loop: asyncio.AbstractEventLoop | None = None,
57+
timeout: float | None = None,
58+
) -> T:
4959
"""
5060
Make loop run coroutine until it returns. Runs in other thread
5161
@@ -57,30 +67,32 @@ def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None):
5767
# NB: if the loop is not running *yet*, it is OK to submit work
5868
# and we will wait for it
5969
loop = _get_loop()
60-
if loop is None or loop.is_closed():
70+
if not isinstance(loop, asyncio.AbstractEventLoop):
71+
raise TypeError(f"loop cannot be of type {type(loop)}")
72+
if loop.is_closed():
6173
raise RuntimeError("Loop is not running")
6274
try:
6375
loop0 = asyncio.events.get_running_loop()
6476
if loop0 is loop:
65-
raise NotImplementedError("Calling sync() from within a running loop")
77+
raise SyncError("Calling sync() from within a running loop")
6678
except RuntimeError:
6779
pass
68-
result_box: List[Optional[Any]] = [None]
69-
event = threading.Event()
70-
asyncio.run_coroutine_threadsafe(_runner(event, coro, result_box), loop)
71-
while True:
72-
# this loops allows thread to get interrupted
73-
if event.wait(1):
74-
break
75-
76-
return_result = result_box[0]
80+
81+
future = asyncio.run_coroutine_threadsafe(_runner(coro), loop)
82+
83+
finished, unfinished = wait([future], return_when=asyncio.ALL_COMPLETED, timeout=timeout)
84+
if len(unfinished) > 0:
85+
raise asyncio.TimeoutError(f"Coroutine {coro} failed to finish in within {timeout}s")
86+
assert len(finished) == 1
87+
return_result = list(finished)[0].result()
88+
7789
if isinstance(return_result, BaseException):
7890
raise return_result
7991
else:
8092
return return_result
8193

8294

83-
def _get_loop():
95+
def _get_loop() -> asyncio.AbstractEventLoop:
8496
"""Create or return the default fsspec IO loop
8597
8698
The loop will be running on a separate thread.
@@ -96,25 +108,24 @@ def _get_loop():
96108
th.daemon = True
97109
th.start()
98110
iothread[0] = th
111+
assert loop[0] is not None
99112
return loop[0]
100113

101114

102-
P = ParamSpec("P")
103-
T = TypeVar("T")
104-
105-
106115
class SyncMixin:
107116
_sync_configuration: SyncConfiguration
108117

109118
def _sync(self, coroutine: Coroutine[Any, Any, T]) -> T:
110119
# TODO: refactor this to take *args and **kwargs and pass those to the method
111120
# this should allow us to better type the sync wrapper
112-
return sync(coroutine, loop=self._sync_configuration.asyncio_loop)
113-
114-
def _sync_iter(self, coroutine: Coroutine[Any, Any, AsyncIterator[T]]) -> List[T]:
115-
async def iter_to_list() -> List[T]:
116-
# TODO: replace with generators so we don't materialize the entire iterator at once
117-
async_iterator = await coroutine
121+
return sync(
122+
coroutine,
123+
loop=self._sync_configuration.asyncio_loop,
124+
timeout=self._sync_configuration.timeout,
125+
)
126+
127+
def _sync_iter(self, async_iterator: AsyncIterator[T]) -> list[T]:
128+
async def iter_to_list() -> list[T]:
118129
return [item async for item in async_iterator]
119130

120131
return self._sync(iter_to_list())

src/zarr/v3/sync.py

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
from __future__ import annotations
2+
from typing import TYPE_CHECKING, TypeVar
3+
4+
if TYPE_CHECKING:
5+
from typing import Any, AsyncIterator, Coroutine
6+
7+
import asyncio
8+
from concurrent.futures import wait
9+
import threading
10+
11+
from typing_extensions import ParamSpec
12+
13+
from zarr.v3.config import SyncConfiguration
14+
15+
P = ParamSpec("P")
16+
T = TypeVar("T")
17+
18+
# From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py
19+
20+
iothread: list[threading.Thread | None] = [None] # dedicated IO thread
21+
loop: list[asyncio.AbstractEventLoop | None] = [
22+
None
23+
] # global event loop for any non-async instance
24+
_lock: threading.Lock | None = None # global lock placeholder
25+
get_running_loop = asyncio.get_running_loop
26+
27+
28+
class SyncError(Exception):
    """Error raised by the synchronous wrapper.

    In this module it signals that ``sync()`` was called from inside the very
    event loop it was asked to submit work to.
    """
30+
31+
32+
def _get_lock() -> threading.Lock:
    """Return the module-wide lock, creating it on the first call.

    Allocation is deferred until first use so that one lock can be set per
    forked process.
    """
    global _lock
    # Reuse the existing lock when there is one; otherwise allocate it now.
    _lock = _lock or threading.Lock()
    return _lock
41+
42+
43+
async def _runner(coro: Coroutine[Any, Any, T]) -> T | BaseException:
    """
    Await ``coro`` and hand back its outcome as a plain return value.

    When the coroutine completes normally its result is returned. When it raises an
    ``Exception``, that exception object is returned instead of being propagated.
    """
    try:
        outcome = await coro
    except Exception as err:
        return err
    else:
        return outcome
52+
53+
54+
def sync(
    coro: Coroutine[Any, Any, T],
    loop: asyncio.AbstractEventLoop | None = None,
    timeout: float | None = None,
) -> T:
    """
    Make loop run coroutine until it returns. Runs in other thread

    Parameters
    ----------
    coro
        The coroutine to run to completion.
    loop
        Event loop to submit the coroutine to. When ``None``, the loop returned by
        ``_get_loop()`` is used.
    timeout
        Maximum number of seconds to wait for the coroutine. ``None`` places no limit
        on the wait.

    Returns
    -------
    The value produced by ``coro``.

    Raises
    ------
    TypeError
        If ``loop`` is not an ``asyncio.AbstractEventLoop``.
    RuntimeError
        If ``loop`` is closed.
    SyncError
        If called from within ``loop`` itself.
    asyncio.TimeoutError
        If the coroutine does not finish within ``timeout`` seconds. The pending
        work is cancelled before the error is raised.
    BaseException
        Any exception object handed back by ``_runner`` is re-raised here.

    Examples
    --------
    >>> sync(async_function(), existing_loop)
    """
    if loop is None:
        # NB: if the loop is not running *yet*, it is OK to submit work
        # and we will wait for it
        loop = _get_loop()
    if not isinstance(loop, asyncio.AbstractEventLoop):
        raise TypeError(f"loop cannot be of type {type(loop)}")
    if loop.is_closed():
        raise RuntimeError("Loop is not running")

    # Only the lookup can raise RuntimeError (no loop running in this thread), so
    # keep the try body minimal and perform the re-entrancy check outside of it.
    try:
        loop0 = asyncio.events.get_running_loop()
    except RuntimeError:
        loop0 = None
    if loop0 is loop:
        raise SyncError("Calling sync() from within a running loop")

    future = asyncio.run_coroutine_threadsafe(_runner(coro), loop)

    # ``wait`` defaults to ALL_COMPLETED, which is what we want for a single future.
    _, unfinished = wait([future], timeout=timeout)
    if unfinished:
        # Do not leave the coroutine running on the IO loop after giving up on it.
        future.cancel()
        raise asyncio.TimeoutError(f"Coroutine {coro} failed to finish within {timeout}s")

    return_result = future.result()

    # ``_runner`` returns exceptions rather than raising them; surface them here.
    if isinstance(return_result, BaseException):
        raise return_result
    return return_result
93+
94+
95+
def _get_loop() -> asyncio.AbstractEventLoop:
    """Return the shared IO event loop, creating and starting it on first use.

    The loop is run forever on a separate daemon thread named "zarrIO".
    """
    if loop[0] is None:
        with _get_lock():
            # Check again now that the lock is held: another thread may have
            # filled the slot between the first check and acquiring the lock.
            if loop[0] is None:
                fresh_loop = asyncio.new_event_loop()
                loop[0] = fresh_loop
                io_thread = threading.Thread(
                    target=fresh_loop.run_forever,
                    name="zarrIO",
                    daemon=True,
                )
                io_thread.start()
                iothread[0] = io_thread
    assert loop[0] is not None
    return loop[0]
113+
114+
115+
class SyncMixin:
    """Mixin offering blocking wrappers around coroutines and async iterators."""

    _sync_configuration: SyncConfiguration

    def _sync(self, coroutine: Coroutine[Any, Any, T]) -> T:
        """Run ``coroutine`` via ``sync`` with the configured loop and timeout."""
        # TODO: refactor this to take *args and **kwargs and pass those to the method
        # this should allow us to better type the sync wrapper
        config = self._sync_configuration
        return sync(coroutine, loop=config.asyncio_loop, timeout=config.timeout)

    def _sync_iter(self, async_iterator: AsyncIterator[T]) -> list[T]:
        """Exhaust ``async_iterator`` and return its items as a list."""

        async def collect() -> list[T]:
            items: list[T] = []
            async for item in async_iterator:
                items.append(item)
            return items

        return self._sync(collect())

0 commit comments

Comments
 (0)