Skip to content

Change stream results to indicate cancellation #498

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 44 additions & 30 deletions design/mvp/CanonicalABI.md
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ values *into* the buffer's memory. Buffers are represented by the following 3
abstract Python classes:
```python
class Buffer:
MAX_LENGTH = 2**30 - 1
MAX_LENGTH = 2**28 - 1
t: ValType
remain: Callable[[], int]

Expand Down Expand Up @@ -1055,7 +1055,7 @@ stream.)
```python
RevokeBuffer = Callable[[], None]
OnPartialCopy = Callable[[RevokeBuffer], None]
OnCopyDone = Callable[[], None]
OnCopyDone = Callable[[Literal['completed','cancelled']], None]

class ReadableStream:
t: ValType
Expand All @@ -1068,7 +1068,8 @@ The key operation is `read` which works as follows:
* `read` is non-blocking, returning `'blocked'` if it would have blocked.
* The `On*` callbacks are only called *after* `read` returns `'blocked'`.
* `OnCopyDone` is called to indicate that the caller has regained ownership of
the buffer.
the buffer and whether this was due to the read/write completing or
being cancelled.
* `OnPartialCopy` is called to indicate a partial write has been made to the
buffer, but there may be further writes made in the future, so the caller
has *not* regained ownership of the buffer.
Expand Down Expand Up @@ -1118,21 +1119,21 @@ If set, the `pending_*` fields record the `Buffer` and `On*` callbacks of a
`read`. Closing the readable or writable end of a stream or cancelling a `read`
or `write` notifies any pending `read` or `write` via its `OnCopyDone`
callback, which lets the other side know that ownership of the `Buffer` has
been returned:
been returned and why:
```python
def reset_and_notify_pending(self):
def reset_and_notify_pending(self, why):
pending_on_copy_done = self.pending_on_copy_done
self.reset_pending()
pending_on_copy_done()
pending_on_copy_done(why)

def cancel(self):
self.reset_and_notify_pending()
self.reset_and_notify_pending('cancelled')

def close(self):
if not self.closed_:
self.closed_ = True
if self.pending_buffer:
self.reset_and_notify_pending()
self.reset_and_notify_pending('completed')

def closed(self):
return self.closed_
Expand Down Expand Up @@ -1178,7 +1179,7 @@ but in the opposite direction. Both are implemented by a single underlying
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
self.reset_and_notify_pending('completed')
return 'done'
```
Currently, there is a trap when both the `read` and `write` come from the same
Expand Down Expand Up @@ -1242,10 +1243,10 @@ and closing once a value has been read-from or written-to the given buffer:
class FutureEnd(StreamEnd):
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
assert(buffer.remain() == 1)
def on_copy_done_wrapper():
def on_copy_done_wrapper(why):
if buffer.remain() == 0:
self.stream.close()
on_copy_done()
on_copy_done(why)
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
if ret == 'done' and buffer.remain() == 0:
self.stream.close()
Expand Down Expand Up @@ -3520,7 +3521,8 @@ multiple partial copies before having to context-switch back.
```python
if opts.sync:
final_revoke_buffer = None
def on_partial_copy(revoke_buffer):
def on_partial_copy(revoke_buffer, why = 'completed'):
assert(why == 'completed')
nonlocal final_revoke_buffer
final_revoke_buffer = revoke_buffer
if not async_copy.done():
Expand All @@ -3531,6 +3533,8 @@ multiple partial copies before having to context-switch back.
await task.wait_on(async_copy, sync = True)
final_revoke_buffer()
```
(When non-cooperative threads are added, the assertion that synchronous copies
can only be `completed`, and not `cancelled`, will no longer hold.)

In the asynchronous case, the `on_*` callbacks set a pending event on the
`Waitable` which will be delivered to core wasm when core wasm calls
Expand All @@ -3541,36 +3545,46 @@ allowing multiple partial copies to complete in the interim, reducing overall
context-switching overhead.
```python
else:
def copy_event(revoke_buffer):
def copy_event(why, revoke_buffer):
revoke_buffer()
e.copying = False
return (event_code, i, pack_copy_result(task, buffer, e))
return (event_code, i, pack_copy_result(task, e, buffer, why))
def on_partial_copy(revoke_buffer):
e.set_event(partial(copy_event, revoke_buffer))
def on_copy_done():
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
e.set_event(partial(copy_event, 'completed', revoke_buffer))
def on_copy_done(why):
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done':
e.copying = True
return [BLOCKED]
return [pack_copy_result(task, buffer, e)]
return [pack_copy_result(task, e, buffer, 'completed')]
```
However the copy completes, the results are reported to the caller via
`pack_copy_result`:
```python
BLOCKED = 0xffff_ffff
CLOSED = 0x8000_0000
BLOCKED = 0xffff_ffff
COMPLETED = 0x0
CLOSED = 0x1
CANCELLED = 0x2

def pack_copy_result(task, buffer, e):
if buffer.progress or not e.stream.closed():
assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED)
assert(not (buffer.progress & CLOSED))
return buffer.progress
def pack_copy_result(task, e, buffer, why):
if e.stream.closed():
result = CLOSED
elif why == 'cancelled':
result = CANCELLED
else:
return CLOSED
```
The order of tests here indicates that, if some progress was made and then the
stream was closed, only the progress is reported and the `CLOSED` status is
left to be discovered next time.
assert(why == 'completed')
assert(not isinstance(e, FutureEnd))
result = COMPLETED
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
packed = result | (buffer.progress << 4)
assert(packed != BLOCKED)
return packed
```
The `result` indicates whether the stream was closed by the other end, the
copy was cancelled by this end (via `{stream,future}.cancel-{read,write}`) or,
otherwise, completed successfully. In all cases, any number of elements (from
`0` to `n`) may have *first* been copied into or out of the buffer passed to
the `read` or `write` and so this number is packed into the `i32` result.
Comment on lines +3585 to +3587
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be possible to provide a guarantee that if CANCELLED is returned then the number of elements transferred is always 0?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking that it was indeed possible to receive CANCELLED and >0 elements. Hypothetically this could occur even with component-to-component streaming if a few partial writes succeed but then, when the reader tries to cancel, the writer is (in a hypothetical future with threads) on a different thread and the runtime is holding a lock while doing a large copy between buffers, and maybe we allow in this case the cancellation to return BLOCKED, and then it seems like we'd expect CANCELLED with >0 elements (as opposed to COMPLETED; the idea being that COMPLETED means "on its own", which is perhaps a distinction without a difference; I dunno?)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, makes sense to me, and I don't think this should be too too hard to bind in various languages. There's going to be some subtlety to interpreting the return values, but I don't think that'll be too bad.



### 🔀 `canon {stream,future}.cancel-{read,write}`
Expand Down
61 changes: 35 additions & 26 deletions design/mvp/canonical-abi/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ def __init__(self, impl, dtor = None, dtor_sync = True, dtor_callback = None):
#### Buffer State

class Buffer:
MAX_LENGTH = 2**30 - 1
MAX_LENGTH = 2**28 - 1
t: ValType
remain: Callable[[], int]

Expand Down Expand Up @@ -638,7 +638,7 @@ def drop(self):

RevokeBuffer = Callable[[], None]
OnPartialCopy = Callable[[RevokeBuffer], None]
OnCopyDone = Callable[[], None]
OnCopyDone = Callable[[Literal['completed','cancelled']], None]

class ReadableStream:
t: ValType
Expand All @@ -665,19 +665,19 @@ def reset_pending(self):
self.pending_on_partial_copy = None
self.pending_on_copy_done = None

def reset_and_notify_pending(self):
def reset_and_notify_pending(self, why):
pending_on_copy_done = self.pending_on_copy_done
self.reset_pending()
pending_on_copy_done()
pending_on_copy_done(why)

def cancel(self):
self.reset_and_notify_pending()
self.reset_and_notify_pending('cancelled')

def close(self):
if not self.closed_:
self.closed_ = True
if self.pending_buffer:
self.reset_and_notify_pending()
self.reset_and_notify_pending('completed')

def closed(self):
return self.closed_
Expand Down Expand Up @@ -705,7 +705,7 @@ def copy(self, inst, buffer, on_partial_copy, on_copy_done, src, dst):
if self.pending_buffer.remain() > 0:
self.pending_on_partial_copy(self.reset_pending)
else:
self.reset_and_notify_pending()
self.reset_and_notify_pending('completed')
return 'done'

class StreamEnd(Waitable):
Expand Down Expand Up @@ -735,10 +735,10 @@ def copy(self, inst, src, on_partial_copy, on_copy_done):
class FutureEnd(StreamEnd):
def close_after_copy(self, copy_op, inst, buffer, on_copy_done):
assert(buffer.remain() == 1)
def on_copy_done_wrapper():
def on_copy_done_wrapper(why):
if buffer.remain() == 0:
self.stream.close()
on_copy_done()
on_copy_done(why)
ret = copy_op(inst, buffer, on_partial_copy = None, on_copy_done = on_copy_done_wrapper)
if ret == 'done' and buffer.remain() == 0:
self.stream.close()
Expand Down Expand Up @@ -2021,7 +2021,8 @@ async def copy(EndT, BufferT, event_code, t, opts, task, i, ptr, n):
buffer = BufferT(t, cx, ptr, n)
if opts.sync:
final_revoke_buffer = None
def on_partial_copy(revoke_buffer):
def on_partial_copy(revoke_buffer, why = 'completed'):
assert(why == 'completed')
nonlocal final_revoke_buffer
final_revoke_buffer = revoke_buffer
if not async_copy.done():
Expand All @@ -2032,29 +2033,37 @@ def on_partial_copy(revoke_buffer):
await task.wait_on(async_copy, sync = True)
final_revoke_buffer()
else:
def copy_event(revoke_buffer):
def copy_event(why, revoke_buffer):
revoke_buffer()
e.copying = False
return (event_code, i, pack_copy_result(task, buffer, e))
return (event_code, i, pack_copy_result(task, e, buffer, why))
def on_partial_copy(revoke_buffer):
e.set_event(partial(copy_event, revoke_buffer))
def on_copy_done():
e.set_event(partial(copy_event, revoke_buffer = lambda:()))
e.set_event(partial(copy_event, 'completed', revoke_buffer))
def on_copy_done(why):
e.set_event(partial(copy_event, why, revoke_buffer = lambda:()))
if e.copy(task.inst, buffer, on_partial_copy, on_copy_done) != 'done':
e.copying = True
return [BLOCKED]
return [pack_copy_result(task, buffer, e)]

BLOCKED = 0xffff_ffff
CLOSED = 0x8000_0000

def pack_copy_result(task, buffer, e):
if buffer.progress or not e.stream.closed():
assert(buffer.progress <= Buffer.MAX_LENGTH < BLOCKED)
assert(not (buffer.progress & CLOSED))
return buffer.progress
return [pack_copy_result(task, e, buffer, 'completed')]

BLOCKED = 0xffff_ffff
COMPLETED = 0x0
CLOSED = 0x1
CANCELLED = 0x2

def pack_copy_result(task, e, buffer, why):
if e.stream.closed():
result = CLOSED
elif why == 'cancelled':
result = CANCELLED
else:
return CLOSED
assert(why == 'completed')
assert(not isinstance(e, FutureEnd))
result = COMPLETED
assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28)
packed = result | (buffer.progress << 4)
assert(packed != BLOCKED)
return packed

### 🔀 `canon {stream,future}.cancel-{read,write}`

Expand Down
Loading