Skip to content
Merged
16 changes: 10 additions & 6 deletions src/zarr/v3/array.py
Original file line number Diff line number Diff line change
Expand Up @@ -527,15 +527,19 @@ def __setitem__(self, selection: Selection, value: np.ndarray) -> None:
)

def resize(self, new_shape: ChunkCoords) -> Array:
return sync(
self._async_array.resize(new_shape),
self._async_array.runtime_configuration.asyncio_loop,
return type(self)(
sync(
self._async_array.resize(new_shape),
self._async_array.runtime_configuration.asyncio_loop,
)
)

def update_attributes(self, new_attributes: Dict[str, Any]) -> Array:
return sync(
self._async_array.update_attributes(new_attributes),
self._async_array.runtime_configuration.asyncio_loop,
return type(self)(
sync(
self._async_array.update_attributes(new_attributes),
self._async_array.runtime_configuration.asyncio_loop,
)
)

def __repr__(self):
Expand Down
45 changes: 21 additions & 24 deletions src/zarr/v3/sync.py
Original file line number Diff line number Diff line change
@@ -1,27 +1,29 @@
from __future__ import annotations

import asyncio
from concurrent.futures import wait
import threading
from typing import (
Any,
AsyncIterator,
Coroutine,
List,
Optional,
TypeVar,
)
from typing_extensions import ParamSpec

from zarr.v3.config import SyncConfiguration

P = ParamSpec("P")
T = TypeVar("T")

# From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py

iothread: List[Optional[threading.Thread]] = [None] # dedicated IO thread
loop: List[Optional[asyncio.AbstractEventLoop]] = [
iothread: list[threading.Thread | None] = [None] # dedicated IO thread
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we are targeting py >=3.9?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

3.10+ actually!

loop: list[asyncio.AbstractEventLoop | None] = [
None
] # global event loop for any non-async instance
_lock: Optional[threading.Lock] = None # global lock placeholder
_lock: threading.Lock | None = None # global lock placeholder
get_running_loop = asyncio.get_running_loop


Expand All @@ -36,16 +38,18 @@ def _get_lock() -> threading.Lock:
return _lock


async def _runner(event: threading.Event, coro: Coroutine, result_box: List[Optional[Any]]):
async def _runner(coro: Coroutine[Any, Any, T]) -> T | BaseException:
"""
Await a coroutine and return the result of running it. If await it raises an exception,
that will be returned instead.
"""
try:
result_box[0] = await coro
return await coro
except Exception as ex:
result_box[0] = ex
finally:
event.set()
return ex


def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None):
def sync(coro: Coroutine[Any, Any, T], loop: asyncio.AbstractEventLoop | None = None) -> T:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there any reason not to directly re-use the same functions in fsspec, since they exist and have a lot of eyes on them? If they need improvement, that would be gladly received. Also, you should make triple sure that if this code appears in multiple places, there are not accidentally multiple loops on multiple async threads.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wasn't aware that these functions existed in fsspec! I'm reading https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py and the relevant tests right now.

One reason I can think of to not simply import these functions from fsspec is that it ties zarr-python to fsspec in a rather unexpected way. That's a rather abstract problem, and surely something we could defer solving if necessary.

Is FSSpec the only place where this kind of functionality exists, or are there more places? Surely we aren't the only python users setting synchronous barriers for async code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had loosely copied the code from fsspec for zarrita.

"""
Make loop run coroutine until it returns. Runs in other thread

Expand All @@ -65,22 +69,19 @@ def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None):
raise NotImplementedError("Calling sync() from within a running loop")
except RuntimeError:
pass
result_box: List[Optional[Any]] = [None]
event = threading.Event()
asyncio.run_coroutine_threadsafe(_runner(event, coro, result_box), loop)
while True:
# this loops allows thread to get interrupted
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment here explains the way to loop was written. Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?

We don't test for these things at present, so I have no idea! Tests are needed, regardless of the efforts in this PR, and I see that fsspec basically has this covered.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suggest we open a ticket for finding a test that exercises this concern. I don't think we should hold back this PR though.

if event.wait(1):
break

return_result = result_box[0]

future = asyncio.run_coroutine_threadsafe(_runner(coro), loop)

done, _ = wait([future])
return_result = list(done)[0].result()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you comment on what is happening here?

Some specific questions / ideas:

  1. wait returns done and not_done. Do we need to assert anything about not_done? I guess we're expecting expecting that not_done is always an empty set because we have set return_when=ALL_COMPLETED.
  2. Should we consider specifying the timeout parameter top wait? Would a future that hangs never return here?
  3. Can we check that len(done) is exactly 1 before indexing out the result?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I understand it, done, _ = wait([future]) is guaranteed to return tuple[set with one element, the empty set], because we are waiting on a list of futures with 1 element, and wait returns when the futures are all done.

Regarding a timeout, that's a broader question about what error boundaries we want to define for IO operations. At the moment, we basically assume that IO either succeeds or fails after a reasonable amount of time. If we want to include "IO takes forever" as a failure mode, then we would need to start building infrastructure around that higher up in the stack.


if isinstance(return_result, BaseException):
raise return_result
else:
return return_result


def _get_loop():
def _get_loop() -> asyncio.AbstractEventLoop | None:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
def _get_loop() -> asyncio.AbstractEventLoop | None:
def _get_loop() -> asyncio.AbstractEventLoop:

I suspect you had this here because of some mypy thing but it should be possible to get this to always return a loop. Perhaps an assert loop[0] is not None right before the return is all that is needed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might depend on whether it is called in the main thread, the IO thread of elsewhere,

"""Create or return the default fsspec IO loop

The loop will be running on a separate thread.
Expand All @@ -99,10 +100,6 @@ def _get_loop():
return loop[0]


P = ParamSpec("P")
T = TypeVar("T")


class SyncMixin:
_sync_configuration: SyncConfiguration

Expand Down