[v3] Sync with futures #1804
Conversation
Addresses #1803
src/zarr/v3/sync.py
Outdated
```diff
 if isinstance(return_result, BaseException):
     raise return_result
 else:
     return return_result


-def _get_loop():
+def _get_loop() -> asyncio.AbstractEventLoop | None:
```
```diff
-def _get_loop() -> asyncio.AbstractEventLoop | None:
+def _get_loop() -> asyncio.AbstractEventLoop:
```
I suspect you had this here because of some mypy thing, but it should be possible to get this to always return a loop. Perhaps an `assert loop[0] is not None` right before the return is all that is needed?
Might depend on whether it is called in the main thread, the IO thread, or elsewhere.
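For concreteness, a minimal sketch of the always-returning variant being suggested, assuming the module-level `loop` / `iothread` one-element lists from the fsspec-style pattern in this file (the lock-free check and the thread name are illustrative assumptions, not the PR's actual code):

```python
import asyncio
import threading

# module-level containers, as in the fsspec-style pattern referenced in this PR
loop: list[asyncio.AbstractEventLoop | None] = [None]
iothread: list[threading.Thread | None] = [None]


def _get_loop() -> asyncio.AbstractEventLoop:
    """Create (on first use) and return the dedicated IO event loop."""
    # real code should guard this check-and-create with a lock
    if loop[0] is None:
        new_loop = asyncio.new_event_loop()
        loop[0] = new_loop
        th = threading.Thread(target=new_loop.run_forever, name="zarrIO", daemon=True)
        th.start()
        iothread[0] = th
    # narrows the type for mypy and guarantees a non-None return
    assert loop[0] is not None
    return loop[0]
```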
src/zarr/v3/sync.py
Outdated
```python
done, _ = wait([future])
return_result = list(done)[0].result()
```
Can you comment on what is happening here? Some specific questions / ideas:

- `wait` returns `done` and `not_done`. Do we need to assert anything about `not_done`? I guess we're expecting that `not_done` is always an empty set because we have set `return_when=ALL_COMPLETED`.
- Should we consider specifying the `timeout` parameter to `wait`? Would a future that hangs never return here?
- Can we check that `len(done)` is exactly 1 before indexing out the result?
As I understand it, `done, _ = wait([future])` is guaranteed to return `tuple[set with one element, the empty set]`, because we are waiting on a list of futures with 1 element, and `wait` returns when the futures are all done.
Regarding a timeout, that's a broader question about what error boundaries we want to define for IO operations. At the moment, we basically assume that IO either succeeds or fails after a reasonable amount of time. If we want to include "IO takes forever" as a failure mode, then we would need to start building infrastructure around that higher up in the stack.
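For illustration, a hedged sketch of how the suggested checks plus a timeout could look around `concurrent.futures.wait` (the helper name, the `timeout` parameter, and the `TimeoutError` handling are hypothetical, not part of this PR):

```python
from concurrent.futures import ALL_COMPLETED, Future, wait


def _wait_for_result(future: Future, timeout: float | None = None):
    # wait() returns (done, not_done); not_done can only be non-empty if a
    # timeout was given and the future has not finished yet
    done, not_done = wait([future], timeout=timeout, return_when=ALL_COMPLETED)
    if not_done:
        raise TimeoutError(f"Coroutine did not finish within {timeout} seconds")
    assert len(done) == 1  # we waited on exactly one future
    return done.pop().result()  # re-raises the coroutine's exception, if any
```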
```diff
 # From https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py

-iothread: List[Optional[threading.Thread]] = [None]  # dedicated IO thread
-loop: List[Optional[asyncio.AbstractEventLoop]] = [
+iothread: list[threading.Thread | None] = [None]  # dedicated IO thread
```
So we are targeting py >=3.9?
3.10+ actually!
src/zarr/v3/sync.py
Outdated
```diff
-def sync(coro: Coroutine, loop: Optional[asyncio.AbstractEventLoop] = None):
+def sync(coro: Coroutine[Any, Any, T], loop: asyncio.AbstractEventLoop | None = None) -> T:
```
Is there any reason not to directly re-use the same functions in fsspec, since they exist and have a lot of eyes on them? If they need improvement, that would be gladly received. Also, you should make triple sure that if this code appears in multiple places, there are not accidentally multiple loops on multiple async threads.
I wasn't aware that these functions existed in fsspec! I'm reading https://github.com/fsspec/filesystem_spec/blob/master/fsspec/asyn.py and the relevant tests right now.

One reason I can think of to not simply import these functions from fsspec is that it ties `zarr-python` to fsspec in a rather unexpected way. That's a rather abstract problem, and surely something we could defer solving if necessary.

Is FSSpec the only place where this kind of functionality exists, or are there more places? Surely we aren't the only Python users setting synchronous barriers for async code.
I had loosely copied the code from fsspec for zarrita.
src/zarr/v3/sync.py
Outdated
```python
event = threading.Event()
asyncio.run_coroutine_threadsafe(_runner(event, coro, result_box), loop)
while True:
    # this loops allows thread to get interrupted
```
This comment here explains why the loop was written this way. Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?
> Does the changed code allow for an exception in the main thread (timeout, interrupt and other signals)? Does the GC run while waiting?
We don't test for these things at present, so I have no idea! Tests are needed, regardless of the efforts in this PR, and I see that fsspec basically has this covered.
I suggest we open a ticket for finding a test that exercises this concern. I don't think we should hold back this PR though.
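For reference, the rough shape of the pre-PR pattern being discussed here (a paraphrase of the fsspec-style approach, not the exact removed code): the short `event.wait` timeout in the polling loop is what lets the calling thread receive `KeyboardInterrupt` and other signals between polls.

```python
import asyncio
import threading
from typing import Any, Coroutine


async def _runner(event: threading.Event, coro: Coroutine, result_box: list[Any]) -> None:
    try:
        result_box[0] = await coro
    except Exception as ex:
        result_box[0] = ex
    finally:
        event.set()


def sync_via_event(coro: Coroutine, io_loop: asyncio.AbstractEventLoop) -> Any:
    result_box: list[Any] = [None]
    event = threading.Event()
    asyncio.run_coroutine_threadsafe(_runner(event, coro, result_box), io_loop)
    while not event.wait(1):
        pass  # waking up every second keeps the calling thread interruptible
    if isinstance(result_box[0], BaseException):
        raise result_box[0]
    return result_box[0]
```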
src/zarr/v3/group.py
Outdated
```python
# _children: List[Union[AsyncArray, AsyncGroup]] = self._sync_iter(
#     self._async_group.children()
# )
# return [Array(obj) if isinstance(obj, AsyncArray) else Group(obj) for obj in _children]
```
all of these changes were to make mypy happy.
```diff
     raise RuntimeError("Loop is not running")
 try:
     loop0 = asyncio.events.get_running_loop()
     if loop0 is loop:
-        raise NotImplementedError("Calling sync() from within a running loop")
+        raise SyncError("Calling sync() from within a running loop")
```
I believe this was a bug before, as `NotImplementedError` inherits from `RuntimeError`, so this exception was never raised.
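A minimal, self-contained illustration of the inheritance issue (the bare `except RuntimeError` here mirrors the handler around `get_running_loop()`, and is only for demonstration):

```python
# NotImplementedError is a subclass of RuntimeError, so a handler written for
# RuntimeError (e.g. around asyncio.events.get_running_loop()) also swallows it.
assert issubclass(NotImplementedError, RuntimeError)

try:
    raise NotImplementedError("Calling sync() from within a running loop")
except RuntimeError:
    pass  # silently caught; a dedicated SyncError propagates instead
```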
```python
    sync(foo(), loop=loop)


@pytest.mark.filterwarnings("ignore:coroutine.*was never awaited")
```
Despite this, we're still seeing the below warning. Why?

```
sys:1: RuntimeWarning: coroutine 'test_sync_raises_if_calling_sync_from_within_a_running_loop.<locals>.foo' was never awaited
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
```
This warning comes after the test has finished, during garbage collection. Maybe doing `del` on the objects (and closure) would fix it. Or run the coroutine after the check.
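A self-contained sketch of that suggestion, assuming the test keeps a handle on the coroutine object (names here are illustrative, not the actual test code):

```python
async def foo() -> str:
    return "foo"


# A coroutine object that is created but never awaited triggers
# "RuntimeWarning: coroutine 'foo' was never awaited" when it is collected.
coro = foo()

# Closing it explicitly (or del-ing it) before the test ends avoids the warning
# surfacing after the filterwarnings scope, at garbage-collection time.
coro.close()
```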
Thanks to @jhamman, this PR also adds some tests. I also added a …

@martindurant [wrote](https://github.com//pull/1804#discussion_r1573863272):

> …

How would you propose we test for these properties? Since …
Meta question: is it actually possible to open multiple groups/arrays concurrently from sync code?

There should not be a technical barrier to this, since this question translates to "is it possible to concurrently open some JSON documents and apply very lightweight parsing to them", but (to my knowledge) we don't have an API that supports this yet, emphasis on "yet". Bulk hierarchy access is a target for v3, and we are trying to create a foundation for doing this concurrently.

I encourage early planning for this, since data IO access will predominantly come from sync code (as part of data processing). Current zarr 2 already has concurrency on data read (with fsspec) within an array, so this would be a clear benefit of the new approach. We've been talking about it for two years or more :)
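As a rough, hypothetical illustration of that foundation (none of these names exist in the codebase): synchronous callers only need one barrier call around an `asyncio.gather` to get concurrent metadata reads.

```python
import asyncio


async def fetch_metadata(path: str) -> dict:
    # stand-in for a small JSON read of group/array metadata
    await asyncio.sleep(0.01)
    return {"path": path}


async def _open_many(paths: list[str]) -> list[dict]:
    return list(await asyncio.gather(*(fetch_metadata(p) for p in paths)))


# From sync code, a single barrier call drives all opens concurrently, e.g.
#   nodes = sync(_open_many(["a", "b", "c"]))
# Here asyncio.run is used as a runnable stand-in for the sync() barrier:
nodes = asyncio.run(_open_many(["a", "b", "c"]))
```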
IMO this API requires a data structure that represents an un-stored Zarr hierarchy, which … The plan is to implement a declarative hierarchy API, which will allow something like this:

```python
g = GroupModel(
    attributes={'foo': 10},
    members={
        'group': GroupModel(),
        'array': ArrayModel(shape=(10,))
    }
).to_storage('foo.zarr/path')
```

where …
I was thinking more of data access, such as when xarray wants to eagerly load multiple smallish coordinate arrays at dataset open time.
That's a good point. We could do something like this:

```python
nodes: tuple[Array | Group] = open_nodes(
    [
        path_to_array,
        path_to_group,
        path_to_array_2
    ],
    access_options={'array': {...}, 'group': {...}}
)
```

We could also have … But I'm just thinking out loud here :)
Exactly, this is the kind of design I am after. Maybe it should be a separate issue/discussion.

I opened #1805 so we can chat about it.
Adjusts the `sync` function. It now creates a `Future` from a coroutine, waits for that future to complete in another thread, and accesses the result value with the `.result()` method (a simplified sketch follows below).

The previous implementation relied on mutating a list which was initialized to `[None]`, which is ambiguous if the coroutine being awaited happens to return `None`.

In order to pass pre-commit, I also had to fix type annotations in `v3/array.py`.

TODO:
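A simplified sketch of the approach described above (the real `sync` in `src/zarr/v3/sync.py` also validates the loop and thread state before scheduling):

```python
import asyncio
from concurrent.futures import wait
from typing import Any, Coroutine, TypeVar

T = TypeVar("T")


def sync(coro: Coroutine[Any, Any, T], loop: asyncio.AbstractEventLoop) -> T:
    # schedule the coroutine on the dedicated IO loop; this returns a
    # concurrent.futures.Future rather than an asyncio future
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    done, _ = wait([future])
    # .result() returns the coroutine's value or re-raises its exception, so no
    # [None] result box is needed and a genuine None return value is unambiguous
    return next(iter(done)).result()
```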