diff --git a/design/mvp/Async.md b/design/mvp/Async.md index 7fea096a..168fdccc 100644 --- a/design/mvp/Async.md +++ b/design/mvp/Async.md @@ -610,7 +610,7 @@ Despite the above, the following scenarios do behave deterministically: that was blocked before starting due to backpressure, cancellation completes deterministically and immediately. * When both ends of a stream or future are owned by wasm components, the - behavior of all read, write, cancel and close operations is deterministic + behavior of all read, write, cancel and drop operations is deterministic (modulo any nondeterminitic execution that determines the ordering in which the operations are performed). diff --git a/design/mvp/Binary.md b/design/mvp/Binary.md index 330bbce7..e077c6a3 100644 --- a/design/mvp/Binary.md +++ b/design/mvp/Binary.md @@ -302,15 +302,15 @@ canon ::= 0x00 0x00 f: opts: ft: => (canon lift | 0x10 t: opts: => (canon stream.write t opts (core func)) 🔀 | 0x11 t: async?: => (canon stream.cancel-read async? (core func)) 🔀 | 0x12 t: async?: => (canon stream.cancel-write async? (core func)) 🔀 - | 0x13 t: => (canon stream.close-readable t (core func)) 🔀 - | 0x14 t: => (canon stream.close-writable t (core func)) 🔀 + | 0x13 t: => (canon stream.drop-readable t (core func)) 🔀 + | 0x14 t: => (canon stream.drop-writable t (core func)) 🔀 | 0x15 t: => (canon future.new t (core func)) 🔀 | 0x16 t: opts: => (canon future.read t opts (core func)) 🔀 | 0x17 t: opts: => (canon future.write t opts (core func)) 🔀 | 0x18 t: async?: => (canon future.cancel-read async? (core func)) 🔀 | 0x19 t: async?: => (canon future.cancel-write async? (core func)) 🔀 - | 0x1a t: => (canon future.close-readable t (core func)) 🔀 - | 0x1b t: => (canon future.close-writable t (core func)) 🔀 + | 0x1a t: => (canon future.drop-readable t (core func)) 🔀 + | 0x1b t: => (canon future.drop-writable t (core func)) 🔀 | 0x1c opts: => (canon error-context.new opts (core func)) 📝 | 0x1d opts: => (canon error-context.debug-message opts (core func)) 📝 | 0x1e => (canon error-context.drop (core func)) 📝 diff --git a/design/mvp/CanonicalABI.md b/design/mvp/CanonicalABI.md index 688c1136..b2bf1eb2 100644 --- a/design/mvp/CanonicalABI.md +++ b/design/mvp/CanonicalABI.md @@ -51,9 +51,10 @@ being specified here. * [`canon subtask.cancel`](#-canon-subtaskcancel) 🔀 * [`canon subtask.drop`](#-canon-subtaskdrop) 🔀 * [`canon {stream,future}.new`](#-canon-streamfuturenew) 🔀 - * [`canon {stream,future}.{read,write}`](#-canon-streamfuturereadwrite) 🔀 + * [`canon stream.{read,write}`](#-canon-streamreadwrite) 🔀 + * [`canon future.{read,write}`](#-canon-futurereadwrite) 🔀 * [`canon {stream,future}.cancel-{read,write}`](#-canon-streamfuturecancel-readwrite) 🔀 - * [`canon {stream,future}.close-{readable,writable}`](#-canon-streamfutureclose-readablewritable) 🔀 + * [`canon {stream,future}.drop-{readable,writable}`](#-canon-streamfuturedrop-readablewritable) 🔀 * [`canon error-context.new`](#-canon-error-contextnew) 📝 * [`canon error-context.debug-message`](#-canon-error-contextdebug-message) 📝 * [`canon error-context.drop`](#-canon-error-contextdrop) 📝 @@ -499,8 +500,8 @@ Waitables deliver "events" which are values of the following `EventTuple` type. The two `int` "payload" fields of `EventTuple` store core wasm `i32`s and are to be interpreted based on the `EventCode`. The meaning of the different `EventCode`s and their payloads will be introduced incrementally below by the -code that produces the events (specifically, in `subtask_event` and -`copy_event`). +code that produces the events (specifically, in `subtask_event`, `stream_event` +or `future_event`). ```python class EventCode(IntEnum): NONE = 0 @@ -523,8 +524,8 @@ waits on this `Waitable` (which may take an arbitrarily long time). A the closure can specify behaviors that trigger *right before* events are delivered to core wasm and so that the closure can compute the event based on the state of the world at delivery time (as opposed to when `pending_event` was -first set). Currently, `pending_event` holds a closure of either the -`subtask_event` or `copy_event` functions defined below. An optimizing +first set). Currently, `pending_event` holds a closure of the `subtask_event`, +`stream_event` or `future_event` functions defined below. An optimizing implementation would avoid closure allocation by inlining a union containing the closure fields directly in the component instance table. @@ -1250,16 +1251,16 @@ guest via a `CopyResult` code: ```python class CopyResult(IntEnum): COMPLETED = 0 - CLOSED = 1 + DROPPED = 1 CANCELLED = 2 ``` -The `CLOSED` code indicates that the *other* end has since closed their end and +The `DROPPED` code indicates that the *other* end has since been dropped and thus no more reads/writes are possible. The `CANCELLED` code is only possible after *this* end has performed a `{stream,future}.{read,write}` followed by a `{stream,future}.cancel-{read,write}`; `CANCELLED` notifies the wasm code that the cancellation finished and so ownership of the memory buffer has been returned to the wasm code. Lastly, `COMPLETED` indicates that at least one -value has been copied and neither `CLOSED` nor `CANCELLED` apply. +value has been copied and neither `DROPPED` nor `CANCELLED` apply. As with functions and buffers, native host code can be on either side of a stream. Thus, streams are defined in terms of an abstract interface that can be @@ -1280,8 +1281,7 @@ class ReadableStream: t: ValType read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], None] cancel: Callable[[], None] - close: Callable[[], None] - closed: Callable[[], bool] + drop: Callable[[], None] ``` The key operation is `read` which works as follows: * `read` never blocks and returns its values by either synchronously or @@ -1298,7 +1298,7 @@ The key operation is `read` which works as follows: the buffer has been returned; `cancel` only lets the caller *request* that `read` call one of the `OnCopy*` callbacks ASAP (which may or may not happen during `cancel`). -* The client may not call `read` or `close` while there is still an unfinished +* The client may not call `read` or `drop` while there is still an unfinished `read` of the same `ReadableStream`. The `OnCopy*` callbacks are a spec-internal detail used to specify the allowed @@ -1316,7 +1316,7 @@ and writable ends of streams (defined below). Introducing the class in chunks, starting with the fields and initialization: ```python class SharedStreamImpl(ReadableStream): - closed_: bool + dropped: bool pending_inst: Optional[ComponentInstance] pending_buffer: Optional[Buffer] pending_on_copy: Optional[OnCopy] @@ -1324,7 +1324,7 @@ class SharedStreamImpl(ReadableStream): def __init__(self, t): self.t = t - self.closed_ = False + self.dropped = False self.reset_pending() def reset_pending(self): @@ -1338,8 +1338,8 @@ class SharedStreamImpl(ReadableStream): ``` If set, the `pending_*` fields record the `Buffer` and `OnCopy*` callbacks of a `read` or `write` that is waiting to rendezvous with a complementary `write` or -`read`. Closing the readable or writable end of a stream or cancelling a `read` -or `write` notifies any pending `read` or `write` via its `OnCopyDone` +`read`. Dropping the readable or writable end of a stream or cancelling a +`read` or `write` notifies any pending `read` or `write` via its `OnCopyDone` callback: ```python def reset_and_notify_pending(self, result): @@ -1350,14 +1350,11 @@ callback: def cancel(self): self.reset_and_notify_pending(CopyResult.CANCELLED) - def close(self): - if not self.closed_: - self.closed_ = True + def drop(self): + if not self.dropped: + self.dropped = True if self.pending_buffer: - self.reset_and_notify_pending(CopyResult.CLOSED) - - def closed(self): - return self.closed_ + self.reset_and_notify_pending(CopyResult.DROPPED) ``` While the abstract `ReadableStream` interface *allows* `cancel` to return without having returned ownership of the buffer (which, in general, is @@ -1365,11 +1362,11 @@ necessary for [various][OIO] [host][io_uring] APIs), when *wasm* is implementing the stream, `cancel` always returns ownership of the buffer immediately. -Note that `cancel` and `close` notify in opposite directions: +Note that `cancel` and `drop` notify in opposite directions: * `cancel` *must* be called on a readable or writable end with an operation pending, and thus `cancel` notifies the same end that called it. -* `close` *must not* be called on a readable or writable end with an operation - pending, and thus `close` notifies the opposite end. +* `drop` *must not* be called on a readable or writable end with an operation + pending, and thus `drop` notifies the opposite end. The `read` method implements the `ReadableStream.read` interface described above and is called by either `stream.read` or the host, depending on who is @@ -1383,8 +1380,8 @@ case where both the reader and pending writer have zero-length buffers, the writer is notified, but the reader remains blocked: ```python def read(self, inst, dst_buffer, on_copy, on_copy_done): - if self.closed_: - on_copy_done(CopyResult.CLOSED) + if self.dropped: + on_copy_done(CopyResult.DROPPED) elif not self.pending_buffer: self.set_pending(inst, dst_buffer, on_copy, on_copy_done) else: @@ -1416,8 +1413,8 @@ when a zero-length `write` rendezvous with a zero-length `read`, in which case the `write` eagerly completes, leaving the `read` pending: ```python def write(self, inst, src_buffer, on_copy, on_copy_done): - if self.closed_: - on_copy_done(CopyResult.CLOSED) + if self.dropped: + on_copy_done(CopyResult.DROPPED) elif not self.pending_buffer: self.set_pending(inst, src_buffer, on_copy, on_copy_done) else: @@ -1457,25 +1454,22 @@ buffering is analogous to the buffering performed in kernel memory by a Given the above, we can define the `{Readable,Writable}StreamEnd` classes that are actually stored in the component instance table. The classes are almost entirely symmetric, with the only difference being whether the polymorphic -`copy` method (used below) calls `read` or `write`. The `copying` field tracks -whether there is an asynchronous read or write in progress and is maintained by -the definitions of `stream.{read,write}` below. Importantly, `copying` and the -inherited fields of `Waitable` are per-*end*, not per-*stream* (unlike the -fields of `SharedStreamImpl` shown above, which are per-stream and shared by -both ends via their `shared` field). +`copy` method (used below) calls `read` or `write`: ```python class StreamEnd(Waitable): shared: ReadableStream copying: bool + done: bool def __init__(self, shared): Waitable.__init__(self) self.shared = shared self.copying = False + self.done = False def drop(self): trap_if(self.copying) - self.shared.close() + self.shared.drop() Waitable.drop(self) class ReadableStreamEnd(StreamEnd): @@ -1486,11 +1480,19 @@ class WritableStreamEnd(StreamEnd): def copy(self, inst, src, on_copy, on_copy_done): self.shared.write(inst, src, on_copy, on_copy_done) ``` +The `copying` field tracks whether there is an asynchronous read or write in +progress and is maintained by the definitions of `stream.{read,write}` below. +The `done` field tracks whether this end has been notified that the other end +was dropped (via `CopyResult.DROPPED`) and thus no further read/write +operations are allowed. Importantly, `copying` and `done` are per-*end*, not +per-*stream* (unlike the fields of `SharedStreamImpl` shown above, which are +per-stream and shared by both ends via their `shared` field). + Dropping a stream end while an asynchronous read or write is in progress traps since the async read or write cannot be cancelled without blocking and `drop` -(called by `stream.close-{readable,writable}`) is synchronous and non-blocking. +(called by `stream.drop-{readable,writable}`) is synchronous and non-blocking. This means that client code must take care to wait for these operations to -finish before closing. +finish before dropping. The `{Readable,Writable}StreamEnd.copy` method is called polymorphically by the shared definition of `stream.{read,write}` below. While the static type of @@ -1501,41 +1503,133 @@ unconditionally call `stream.write`. #### Future State -Given the above definitions for `stream`, `future` can be simply defined as a -`stream` that transmits only 1 value before automatically closing itself. This -can be achieved by wrapping the `OnCopy*` callbacks and closing once a value -has been read-from or written-to the given buffer: -```python -class FutureEnd(StreamEnd): - def close_after_copy(self, copy, inst, buffer, on_copy, on_copy_done): - assert(buffer.remain() == 1) - def on_copy_wrapper(revoke_buffer): - assert(buffer.remain() == 0) - self.shared.close() - def on_copy_done_wrapper(result): - if buffer.remain() == 0: - self.shared.close() - on_copy_done(CopyResult.CLOSED) - else: - on_copy_done(result) - copy(inst, buffer, on_copy_wrapper, on_copy_done_wrapper) +Futures are similar to streams, except that instead of passing 0..N values, +exactly one value is passed from the writer end to the reader end unless the +reader end is explicitly dropped first. + +Like streams, futures are defined in terms of an abstract `ReadableFuture` +interface that can be implemented by the host or wasm: +```python +class ReadableFuture: + t: ValType + read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], None] + cancel: Callable[[], None] + drop: Callable[[], None] +``` +The `ReadableFuture` interface works like `ReadableStream` except that there is +no `OnCopy` callback passed to `read` to report partial progress (since at most +1 value is copied) and the given `WritableBuffer` must have `remain() == 1`. + +Introducing `SharedFutureImpl` in chunks, the first part is exactly +symmetric to `SharedStreamImpl` in how initialization and cancellation work: +```python +class SharedFutureImpl(ReadableFuture): + dropped: bool + pending_inst: Optional[ComponentInstance] + pending_buffer: Optional[Buffer] + pending_on_copy_done: Optional[OnCopyDone] + + def __init__(self, t): + self.t = t + self.dropped = False + self.reset_pending() + + def reset_pending(self): + self.set_pending(None, None, None) + + def set_pending(self, inst, buffer, on_copy_done): + self.pending_inst = inst + self.pending_buffer = buffer + self.pending_on_copy_done = on_copy_done + + def reset_and_notify_pending(self, result): + pending_on_copy_done = self.pending_on_copy_done + self.reset_pending() + pending_on_copy_done(result) + + def cancel(self): + self.reset_pending_and_notify_pending(CopyResult.CANCELLED) +``` +Dropping works almost the same in futures as streams, except that a future +writable end cannot be dropped without having written a value. This is guarded +by `WritableFutureEnd.drop` so it can be asserted here: +```python + def drop(self): + assert(not self.dropped) + self.dropped = True + if self.pending_buffer: + assert(isinstance(self.pending_buffer, WritableBuffer)) + self.reset_and_notify_pending(CopyResult.DROPPED) +``` +Lastly, `read` and `write` work mostly like streams, but simplified based on +the fact that we're copying at most 1 value. The only asymmetric difference is +that, as mentioned above, only the writable end can observe that the readable +end was dropped before receiving a value. +```python + def read(self, inst, dst_buffer, on_copy_done): + assert(not self.dropped and dst_buffer.remain() == 1) + if not self.pending_buffer: + self.set_pending(inst, dst_buffer, on_copy_done) + else: + trap_if(inst is self.pending_inst and self.t is not None) # temporary + dst_buffer.write(self.pending_buffer.read(1)) + self.reset_and_notify_pending(CopyResult.COMPLETED) + on_copy_done(CopyResult.COMPLETED) + + def write(self, inst, src_buffer, on_copy_done): + assert(src_buffer.remain() == 1) + if self.dropped: + on_copy_done(CopyResult.DROPPED) + elif not self.pending_buffer: + self.set_pending(inst, src_buffer, on_copy_done) + else: + trap_if(inst is self.pending_inst and self.t is not None) # temporary + self.pending_buffer.write(src_buffer.read(1)) + self.reset_and_notify_pending(CopyResult.COMPLETED) + on_copy_done(CopyResult.COMPLETED) +``` +As with streams, the `# temporary` limitation shown above is that a future +cannot be read and written from the same component instance when it has a +non-empty value type. + +Lastly, the `{Readable,Writable}FutureEnd` classes are mostly symmetric with +`{Readable,Writable}StreamEnd`, with the only difference being that +`WritableFutureEnd.drop` traps if the writer hasn't successfully written a +value or been notified of the reader dropping their end: +```python +class FutureEnd(Waitable): + shared: ReadableFuture + copying: bool + done: bool + + def __init__(self, shared): + Waitable.__init__(self) + self.shared = shared + self.copying = False + self.done = False + + def drop(self): + trap_if(self.copying) + Waitable.drop(self) class ReadableFutureEnd(FutureEnd): - def copy(self, inst, dst, on_copy, on_copy_done): - return self.close_after_copy(self.shared.read, inst, dst, on_copy, on_copy_done) + def copy(self, inst, src_buffer, on_copy_done): + self.shared.read(inst, src_buffer, on_copy_done) + + def drop(self): + self.shared.drop() + FutureEnd.drop(self) class WritableFutureEnd(FutureEnd): - def copy(self, inst, src, on_copy, on_copy_done): - return self.close_after_copy(self.shared.write, inst, src, on_copy, on_copy_done) + def copy(self, inst, dst_buffer, on_copy_done): + self.shared.write(inst, dst_buffer, on_copy_done) + def drop(self): - trap_if(not self.shared.closed()) + trap_if(not self.done) FutureEnd.drop(self) ``` -The `future.{read,write}` built-ins fix the buffer length to `1`, ensuring the -`assert(buffer.remain() == 1)` holds. - -The additional `trap_if` in `WritableFutureEnd.drop` ensures that a future -must have written a value before closing. +The `copying` and `done` fields are maintained by the `future` built-ins +defined below. ### Despecialization @@ -1998,17 +2092,17 @@ kept alive until the subtask completes, which in turn prevents the current task from `task.return`ing while its non-returned subtask still holds a transitively-borrowed handle. -Streams and futures are lifted in almost the same way, with the only difference -being that it is a dynamic error to attempt to lift a `future` that has already -been successfully read (which will leave it `closed()`): +Streams and futures are entirely symmetric, transferring ownership of the +readable end from the lifting component to the host or lowering component and +trapping if the readable end is in the middle of `copying` (which would create +a dangling-pointer situation) or is already `done` (in which case the only +valid operation is `{stream,future}.drop-{readable,writable}`). ```python def lift_stream(cx, i, t): return lift_async_value(ReadableStreamEnd, cx, i, t) def lift_future(cx, i, t): - v = lift_async_value(ReadableFutureEnd, cx, i, t) - trap_if(v.closed()) - return v + return lift_async_value(ReadableFutureEnd, cx, i, t) def lift_async_value(ReadableEndT, cx, i, t): assert(not contains_borrow(t)) @@ -2016,10 +2110,9 @@ def lift_async_value(ReadableEndT, cx, i, t): trap_if(not isinstance(e, ReadableEndT)) trap_if(e.shared.t != t) trap_if(e.copying) + trap_if(e.done) return e.shared ``` -Lifting transfers ownership of the readable end and traps if a read was in -progress (which would now be dangling). ### Storing @@ -2441,22 +2534,19 @@ type, the only thing the borrowed handle is good for is calling `resource.rep`, so lowering might as well avoid the overhead of creating an intermediate borrow handle. -Lowering a `stream` or `future` is almost entirely symmetric and simply add a -new readable end to the current component instance's table, passing the index -of the new element to core wasm. The `trap_if(v.closed())` in `lift_future` -ensures the validity of the `assert(not v.closed())` in `lower_future`. +Lowering a `stream` or `future` is entirely symmetric and simply adds a new +readable end to the current component instance's table, passing the index of +the new element to core wasm: ```python def lower_stream(cx, v, t): - return lower_async_value(ReadableStreamEnd, cx, v, t) + assert(isinstance(v, ReadableStream)) + assert(not contains_borrow(t)) + return cx.inst.table.add(ReadableStreamEnd(v)) def lower_future(cx, v, t): - assert(not v.closed()) - return lower_async_value(ReadableFutureEnd, cx, v, t) - -def lower_async_value(ReadableEndT, cx, v, t): - assert(isinstance(v, ReadableStream)) + assert(isinstance(v, ReadableFuture)) assert(not contains_borrow(t)) - return cx.inst.table.add(ReadableEndT(v)) + return cx.inst.table.add(ReadableFutureEnd(v)) ``` @@ -3735,18 +3825,14 @@ async def canon_stream_new(stream_t, task): async def canon_future_new(future_t, task): trap_if(not task.inst.may_leave) - shared = SharedStreamImpl(future_t.t) + shared = SharedFutureImpl(future_t.t) ri = task.inst.table.add(ReadableFutureEnd(shared)) wi = task.inst.table.add(WritableFutureEnd(shared)) return [ ri | (wi << 32) ] ``` -Because futures are just streams with extra limitations, here we see that a -`WritableFutureEnd` shares the same `SharedStreamImpl` type as -`WritableStreamEnd`; the extra limitations are added by `WritableFutureEnd` and -the future built-ins below. -### 🔀 `canon {stream,future}.{read,write}` +### 🔀 `canon stream.{read,write}` For canonical definitions: ```wat @@ -3762,93 +3848,76 @@ specifies: * [`lift($t)` above](#canonopt-validation) defines required options for `stream.read` * `memory` is required to be present -For canonical definitions: -```wat -(canon future.read $future_t $opts (core func $f)) -(canon future.write $future_t $opts (core func $f)) -``` -validation specifies: -* `$f` is given type `(func (param i32 i32) (result i32))` -* `$stream_t` must be a type of the form `(stream $t?)` -* If `$t` is present: - * [`lower($t)` above](#canonopt-validation) defines required options for `future.write` - * [`lift($t)` above](#canonopt-validation) defines required options for `future.read` - * `memory` is required to be present - -The implementation of these four built-ins all funnel down to a single -parameterized `copy` function: +The implementation of these built-ins funnels down to a single `stream_copy` +function that is parameterized by the direction of the copy: ```python async def canon_stream_read(stream_t, opts, task, i, ptr, n): - return await copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ, - stream_t, opts, task, i, ptr, n) + return await stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ, + stream_t, opts, task, i, ptr, n) async def canon_stream_write(stream_t, opts, task, i, ptr, n): - return await copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE, - stream_t, opts, task, i, ptr, n) - -async def canon_future_read(future_t, opts, task, i, ptr): - return await copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ, - future_t, opts, task, i, ptr, 1) - -async def canon_future_write(future_t, opts, task, i, ptr): - return await copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE, - future_t, opts, task, i, ptr, 1) + return await stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE, + stream_t, opts, task, i, ptr, n) ``` -Introducing the `copy` function in chunks, `copy` first checks that the -element at index `i` is of the right type and that there is not already a -copy in progress. (In the future, this restriction could be relaxed, allowing -a finite number of pipelined reads or writes.) +Introducing the `stream_copy` function in chunks, `stream_copy` first checks +that the element at index `i` is of the right type, not already `copying`, and +not already `done` (as defined next). (In the future, the `copying` trap could +be relaxed, allowing a finite number of pipelined reads or writes.) ```python -async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n): +async def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n): trap_if(not task.inst.may_leave) e = task.inst.table.get(i) trap_if(not isinstance(e, EndT)) - trap_if(e.shared.t != stream_or_future_t.t) - trap_if(e.copying) + trap_if(e.shared.t != stream_t.t) + trap_if(e.copying or e.done) ``` -Then a readable or writable buffer is created which (in `Buffer`'s -constructor) eagerly checks the alignment and bounds of (`i`, `n`). -(In the future, the restriction on futures/streams containing `borrow`s could -be relaxed by maintaining sufficient bookkeeping state to ensure that -borrowed handles *or streams/futures of borrowed handles* could not outlive -their originating call.) +Then a readable or writable buffer is created which (in `Buffer`'s constructor) +eagerly checks the alignment and bounds of (`i`, `n`). (In the future, the +restriction on futures/streams containing `borrow`s could be relaxed by +maintaining sufficient bookkeeping state to ensure that borrowed handles *or +streams/futures of borrowed handles* could not outlive their originating call.) ```python - assert(not contains_borrow(stream_or_future_t)) + assert(not contains_borrow(stream_t)) cx = LiftLowerContext(opts, task.inst, borrow_scope = None) - buffer = BufferT(stream_or_future_t.t, cx, ptr, n) + buffer = BufferT(stream_t.t, cx, ptr, n) ``` Next, the `copy` method of `{Readable,Writable}{Stream,Future}End` is called to perform the actual read/write. The `on_copy*` callbacks passed to `copy` bind -and store a `copy_event` closure on the readable/writable end (via the +and store a `stream_event` closure on the readable/writable end (via the inherited `Waitable.set_event`) which will be called right before the event is -delivered to core wasm. `copy_event` first calls `revoke_buffer` to regain +delivered to core wasm. `stream_event` first calls `revoke_buffer` to regain ownership of `buffer` and prevent any further partial reads/writes. Thus, up until event delivery, the other end of the stream is free to repeatedly read/write from/to `buffer`, ideally filling it up and minimizing context switches. Next, `copying` is cleared to reenable future `stream.{read,write}` -calls. Lastly, `copy_event` packs the `CopyResult` and number of elements -copied up until this point into a single `i32` payload for core wasm. +calls. However, if the `CopyResult` is `DROPPED`, `dropped` is set to disallow +all future use of this stream end. Lastly, `stream_event` packs the +`CopyResult` and number of elements copied up until this point into a single +`i32` payload for core wasm. ```python - def copy_event(result, revoke_buffer): + def stream_event(result, revoke_buffer): revoke_buffer() assert(e.copying) e.copying = False + if result == CopyResult.DROPPED: + e.done = True assert(0 <= result < 2**4) assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28) packed_result = result | (buffer.progress << 4) return (event_code, i, packed_result) + return (event_code, i, pack_stream_result(result, buffer)) def on_copy(revoke_buffer): - e.set_event(partial(copy_event, CopyResult.COMPLETED, revoke_buffer)) + e.set_event(partial(stream_event, CopyResult.COMPLETED, revoke_buffer)) def on_copy_done(result): - e.set_event(partial(copy_event, result, revoke_buffer = lambda:())) + e.set_event(partial(stream_event, result, revoke_buffer = lambda:())) e.copying = True e.copy(task.inst, buffer, on_copy, on_copy_done) ``` -If `{stream,future}.{read,write}` is called synchronously, the call waits for +If `stream.{read,write}` is called synchronously, the call waits for progress if necessary (blocking all execution in the calling component instance, but allowing other tasks in other component instances to make progress): @@ -3856,11 +3925,11 @@ progress): if opts.sync and not e.has_pending_event(): await task.wait_on(e.wait_for_pending_event(), sync = True) ``` -Finally, if there is a pending event on the stream/future end (which is -necessarily a `copy_event` closure), it is eagerly returned to the caller. -Otherwise, the `BLOCKED` code is returned and the caller must asynchronously -wait for an event using `waitable-set.{wait,poll}` or, if using a `callback`, -by returning to the event loop. +Finally, if there is a pending event on the stream end (which is necessarily a +`copy_event` closure), it is eagerly returned to the caller. Otherwise, the +`BLOCKED` code is returned and the caller must asynchronously wait for an event +using `waitable-set.{wait,poll}` or, if using a `callback`, by returning to the +event loop. ```python if e.has_pending_event(): code,index,payload = e.get_event() @@ -3871,6 +3940,91 @@ by returning to the event loop. ``` +### 🔀 `canon future.{read,write}` + +For canonical definitions: +```wat +(canon future.read $future_t $opts (core func $f)) +(canon future.write $future_t $opts (core func $f)) +``` +In addition to [general validation of `$opts`](#canonopt-validation) validation +specifies: +* `$f` is given type `(func (param i32 i32) (result i32))` +* `$future_t` must be a type of the form `(future $t?)` +* If `$t` is present: + * [`lift($t)` above](#canonopt-validation) defines required options for `future.read` + * [`lower($t)` above](#canonopt-validation) defines required options for `future.write` + * `memory` is required to be present + +The implementation of these built-ins funnels down to a single `future_copy` +function that is parameterized by the direction of the copy: +```python +async def canon_future_read(future_t, opts, task, i, ptr): + return await future_copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ, + future_t, opts, task, i, ptr) + +async def canon_future_write(future_t, opts, task, i, ptr): + return await future_copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE, + future_t, opts, task, i, ptr) +``` + +Introducing the `future_copy` function in chunks, `future_copy` starts with the +same set of guards as `stream_copy` for parameters `i` and `ptr`. The only +difference is that, with futures, the `Buffer` length is fixed to `1`. +```python +async def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr): + trap_if(not task.inst.may_leave) + e = task.inst.table.get(i) + trap_if(not isinstance(e, EndT)) + trap_if(e.shared.t != future_t.t) + trap_if(e.copying or e.done) + + assert(not contains_borrow(future_t)) + cx = LiftLowerContext(opts, task.inst, borrow_scope = None) + buffer = BufferT(future_t.t, cx, ptr, 1) +``` +Next, the `copy` method of `{Readable,Writable}FutureEnd.copy` is called to +perform the actual read/write. Other than the simplifications allowed by the +absence of repeated partial copies, the main difference in the following code +from the stream code is that `future_event` sets the `done` flag for *both* the +`DROPPED` and `COMPLETED` results, whereas `stream_event` sets `done` only for +`DROPPED`. This ensures that futures are read/written at most once and futures +are only passed to other components in a state where they are ready to be +read/written. Another important difference is that, since the buffer length is +always implied by the `CopyResult`, the number of elements copied is not packed +in the high 28 bits; they're always zero. +```python + def future_event(result): + assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED)) + assert(e.copying) + e.copying = False + if result == CopyResult.DROPPED or result == CopyResult.COMPLETED: + e.done = True + return (event_code, i, int(result)) + + def on_copy_done(result): + assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE) + e.set_event(partial(future_event, result)) + + e.copying = True + e.copy(task.inst, buffer, on_copy_done) +``` +The end of `future_copy` is the exact same as `stream_copy`: waiting if `sync` +and returning either the progress made or `BLOCKED`. +```python + + if opts.sync and not e.has_pending_event(): + await task.wait_on(e.wait_for_pending_event(), sync = True) + + if e.has_pending_event(): + code,index,payload = e.get_event() + assert(code == event_code and index == i) + return [payload] + else: + return [BLOCKED] +``` + + ### 🔀 `canon {stream,future}.cancel-{read,write}` For canonical definitions: @@ -3934,19 +4088,19 @@ has served only to asynchronously request that the host relinquish the buffer ASAP without waiting for anything to be read or written. If `BLOCKING` is *not* returned, the pending event (which is necessarily a -`copy_event`) is eagerly delivered to core wasm as the return value, thereby +`stream_event`) is eagerly delivered to core wasm as the return value, thereby saving an additional turn of the event loop. In this case, the core wasm caller can assume that ownership of the buffer has been returned. -### 🔀 `canon {stream,future}.close-{readable,writable}` +### 🔀 `canon {stream,future}.drop-{readable,writable}` For canonical definitions: ```wat -(canon stream.close-readable $stream_t (core func $f)) -(canon stream.close-writable $stream_t (core func $f)) -(canon future.close-readable $future_t (core func $f)) -(canon future.close-writable $future_t (core func $f)) +(canon stream.drop-readable $stream_t (core func $f)) +(canon stream.drop-writable $stream_t (core func $f)) +(canon future.drop-readable $future_t (core func $f)) +(canon future.drop-writable $future_t (core func $f)) ``` validation specifies: * `$f` is given type `(func (param i32 i32))` @@ -3957,19 +4111,19 @@ the given index from the current component instance's table, performing the guards and bookkeeping defined by `{Readable,Writable}{Stream,Future}End.drop()` above. ```python -async def canon_stream_close_readable(stream_t, task, i): - return await close(ReadableStreamEnd, stream_t, task, i) +async def canon_stream_drop_readable(stream_t, task, i): + return await drop(ReadableStreamEnd, stream_t, task, i) -async def canon_stream_close_writable(stream_t, task, hi): - return await close(WritableStreamEnd, stream_t, task, hi) +async def canon_stream_drop_writable(stream_t, task, hi): + return await drop(WritableStreamEnd, stream_t, task, hi) -async def canon_future_close_readable(future_t, task, i): - return await close(ReadableFutureEnd, future_t, task, i) +async def canon_future_drop_readable(future_t, task, i): + return await drop(ReadableFutureEnd, future_t, task, i) -async def canon_future_close_writable(future_t, task, hi): - return await close(WritableFutureEnd, future_t, task, hi) +async def canon_future_drop_writable(future_t, task, hi): + return await drop(WritableFutureEnd, future_t, task, hi) -async def close(EndT, stream_or_future_t, task, hi): +async def drop(EndT, stream_or_future_t, task, hi): trap_if(not task.inst.may_leave) e = task.inst.table.remove(hi) trap_if(not isinstance(e, EndT)) diff --git a/design/mvp/Explainer.md b/design/mvp/Explainer.md index 3a78e4e3..ea624397 100644 --- a/design/mvp/Explainer.md +++ b/design/mvp/Explainer.md @@ -729,13 +729,13 @@ are useful for: as a `list`; * long-running or infinite streams of events. -A `future` is a special case of `stream` and (in non-error scenarios) delivers -exactly one value before being automatically closed. Because all imports can -be [called asynchronously](Async.md), futures are not necessary to express a -traditional `async` function -- all functions are effectively `async`. Instead -futures are useful in more advanced scenarios where a parameter or result -value may not be ready at the same time as the other synchronous parameters or -results. +A `future` asynchronously delivers exactly one `T` value from a source to a +destination, unless the destination signals that it doesn't want the `T` value +any more. Because all imports can be [called asynchronously](Async.md), futures +are not necessary to express a traditional `async` function -- all functions +are effectively `async`. Instead futures are useful in more advanced scenarios +where a parameter or result value may not be ready at the same time as the +other synchronous parameters or results. The `T` element type of `stream` and `future` is an optional `valtype`. As with variant-case payloads and function results, when `T` is absent, the "value(s)" @@ -1432,15 +1432,15 @@ canon ::= ... | (canon stream.write * (core func ?)) 🔀 | (canon stream.cancel-read async? (core func ?)) 🔀 | (canon stream.cancel-write async? (core func ?)) 🔀 - | (canon stream.close-readable (core func ?)) 🔀 - | (canon stream.close-writable (core func ?)) 🔀 + | (canon stream.drop-readable (core func ?)) 🔀 + | (canon stream.drop-writable (core func ?)) 🔀 | (canon future.new (core func ?)) 🔀 | (canon future.read * (core func ?)) 🔀 | (canon future.write * (core func ?)) 🔀 | (canon future.cancel-read async? (core func ?)) 🔀 | (canon future.cancel-write async? (core func ?)) 🔀 - | (canon future.close-readable (core func ?)) 🔀 - | (canon future.close-writable (core func ?)) 🔀 + | (canon future.drop-readable (core func ?)) 🔀 + | (canon future.drop-writable (core func ?)) 🔀 | (canon error-context.new * (core func ?)) 📝 | (canon error-context.debug-message * (core func ?)) 📝 | (canon error-context.drop (core func ?)) 📝 @@ -1657,10 +1657,10 @@ where `event` is defined in WIT as: variant event { none, subtask(subtask-index, subtask-state), - stream-read(stream-index, copy-result), - stream-write(stream-index, copy-result), - future-read(future-index, copy-result), - future-write(future-index, copy-result), + stream-read(stream-index, stream-result), + stream-write(stream-index, stream-result), + future-read(future-index, future-read-result), + future-write(future-index, future-write-result), task-cancelled, } @@ -1687,9 +1687,9 @@ code in the interim). A `subtask` event notifies the supertask that its subtask is now in the given state (the meanings of which are described by the [async explainer]). -The meanings of the `{stream,future}-{read,write}` events as well as the -definition of `copy-result` is given as part [`stream.read` and -`stream.write`](#-streamread-and-streamwrite) below. +The meanings of the `{stream,future}-{read,write}` events/payloads are given as +part [`stream.read` and `stream.write`](#-streamread-and-streamwrite) and +[`future.read` and `future.write`](#-futureread-and-futurewrite) below. In the Canonical ABI, the `event-code` return value provides the `event` discriminant and the case payloads are stored as two contiguous `i32`s at the @@ -1797,27 +1797,27 @@ An analogous relationship exists among `readable-future-end`, ###### 🔀 `stream.read` and `stream.write` -| Synopsis | | -| -------------------------------------------- | --------------------------------------------------------------------------------------------- | -| Approximate WIT signature for `stream.read` | `func>(e: readable-stream-end, b: writable-buffer?) -> option` | -| Approximate WIT signature for `stream.write` | `func>(e: writable-stream-end, b: readable-buffer?) -> option` | -| Canonical ABI signature | `[stream-end:i32 ptr:i32 num:i32] -> [i32]` | +| Synopsis | | +| -------------------------------------------- | ----------------------------------------------------------------------------------------------- | +| Approximate WIT signature for `stream.read` | `func>(e: readable-stream-end, b: writable-buffer?) -> option` | +| Approximate WIT signature for `stream.write` | `func>(e: writable-stream-end, b: readable-buffer?) -> option` | +| Canonical ABI signature | `[stream-end:i32 ptr:i32 num:i32] -> [i32]` | -where `copy-result` is defined in WIT as: +where `stream-result` is defined in WIT as: ```wit -record copy-result { +record stream-result { progress: u32, - status: copy-status + result: copy-result } -enum copy-status { +enum copy-result { // The read/write completed successfully and is ready for more. completed, - // The other end closed and so there will be no more copies. - closed, + // The other end was dropped and so this end must now be dropped. + dropped, - // The read/write was cancelled by {stream,future}.cancel-{read,write}. + // The read/write was cancelled by stream.cancel-{read,write}; future reads/writes are possible. cancelled } ``` @@ -1827,95 +1827,133 @@ writable end] of a stream as the first parameter and, if `T` is present, a buffer for the `T` values to be read from or written to. If `T` is not present, the buffer parameter is ignored. -If the return value is a `copy-result`, then the `progress` field indicates how -many `T` elements were read or written from the given buffer before the stream -or future reached the status indicated in the `status` field. For example, a -return value of `{progress:4, status:closed}` from a `stream.read` means -that 32 bytes were copied into the given buffer before the writer end closed -the stream. The `cancelled` case can only arise as the result of a -`{stream,future}.cancel-{read,write}` operation (defined below) and is included -in `copy-status` because the `copy-result` type is shared. +If the return value is a `stream-result`, then the `progress` field indicates how +many `T` elements were read or written from the given buffer before the +`copy-result` was reached. For example, a return value of `{progress: 4, +result: dropped}` from a `stream.read` means that 32 bytes were copied +into the given buffer before the writer end dropped the stream. The `cancelled` +case can only arise as the result of a call to `stream.cancel-{read,write}` +and is included in `copy-result` because it is reused below. If the return value is `none`, then the operation blocked and the caller needs to [wait](Async.md#waiting) for progress (via `waitable-set.{wait,poll}` or, if using a `callback`, by returning to the event loop) which will asynchronously -produce an `event` containing a `copy-result`. +produce an `event` containing a `stream-result`. -In the Canonical ABI, the buffer is passed as an `i32` offset into linear -memory and the `i32` size in elements of the buffer and the -`option` return value is bit-packed into the single `i32` return -value where: +In the Canonical ABI, the `{readable,writable}-stream-end` is passed as an +`i32` index into the component instance's table followed by a pair of `i32`s +describing the linear memory offset and size-in-elements of the +`{readable,writable}-buffer`. The `option` return value is +bit-packed into a single `i32` where: * `0xffff_ffff` represents `none`. -* Otherwise, the `status` is in the low 4 bits and the `progress` is in the +* Otherwise, the `result` is in the low 4 bits and the `progress` is in the high 28 bits. (See [`canon_stream_read`] in the Canonical ABI explainer for details.) ###### 🔀 `future.read` and `future.write` -| Synopsis | | -| -------------------------------------------- | ------------------------------------------------------------------------------------------------ | -| Approximate WIT signature for `future.read` | `func>(e: readable-future-end, b: writable-buffer?) -> option` | -| Approximate WIT signature for `future.write` | `func>(e: writable-future-end, b: readable-buffer?) -> option` | -| Canonical ABI signature | `[future-end:i32 ptr:i32] -> [i32]` | - -where `copy-result` is defined as in [`stream.read` and -`stream.write`](#-streamread-and-streamwrite). The `` in the buffer types -indicates that these buffers may hold at most one `T` element. - -The `future.{read,write}` built-ins take the matching [readable or writable -end] of a future as the first parameter, and, if `T` is present, a buffer for a -single `T` value to read into or write from as the second parameter. If `T` is -not present, the second parameter is ignored. The return value has the same -meaning as with `stream.{read,write}`, where the buffer-size has been fixed to -`1`. - -The Canonical ABI is the same as `stream.{read,write}` except for the removal -of the `num` `i32` parameter. (See [`canon_future_read`] in the Canonical ABI -explainer for details.) +| Synopsis | | +| -------------------------------------------- | -------------------------------------------------------------------------------------------------------- | +| Approximate WIT signature for `future.read` | `func>(e: readable-future-end, b: writable-buffer?) -> option` | +| Approximate WIT signature for `future.write` | `func>(e: writable-future-end, v: readable-buffer?) -> option` | +| Canonical ABI signature | `[readable-future-end:i32 ptr:i32] -> [i32]` | -###### 🔀 `stream.cancel-read`, `stream.cancel-write`, `future.cancel-read`, and `future.cancel-write` +where `future-{read,write}-result` are defined in WIT as: +```wit +enum future-read-result { + // The read completed and this readable end must now be dropped. + completed, + + // The read was cancelled by future.cancel-read; future reads are possible. + cancelled +} +enum future-write-result { + // The write completed successfully and this writable end must now be dropped. + completed, + + // The readable end was dropped and so the writable end must now be dropped. + dropped, + + // The write was cancelled by future.cancel-write; future writes are possible. + cancelled +} +``` +`future-read-result` is the same as the `copy-result` enum used in +`stream-result` minus the `dropped` case (since futures do not allow the writer +to drop their end before writing a value). `future-write-result` is the same as +`copy-result`, including the `dropped` case (since the writer can be notified +that the reader signalled loss of interest by dropping their end). + +The `future.{read,write}` built-ins takes the [readable or writable end] of a +future as the first parameter and, if `T` is present, a length-1 buffer that +can be used to write or read a single `T` value. + +If the return value is `none`, then the call blocked and the caller needs +to [wait](Async.md#waiting) for progress (via `waitable-set.{wait,poll}` or, if +using a `callback`, by returning to the event loop) which will asynchronously +produce an `event` containing a `future-{read,write}-result`. + +If `future.{read,write}` return `completed` or `dropped` (synchronously or +asynchronously), the only valid next operation is +`future.drop-{readable,writable}`. -| Synopsis | | -| --------------------------------------------------- | --------------------------------------------------------------------- | -| Approximate WIT signature for `stream.cancel-read` | `func>(e: readable-stream-end) -> option` | -| Approximate WIT signature for `stream.cancel-write` | `func>(e: writable-stream-end) -> option` | -| Approximate WIT signature for `future.cancel-read` | `func>(e: readable-future-end) -> option` | -| Approximate WIT signature for `future.cancel-write` | `func>(e: writable-future-end) -> option` | -| Canonical ABI signature | `[e: i32] -> [i32]` | +A component *may* call `future.drop-readable` *before* successfully reading a +value to indicate a loss of interest. `future.drop-writable` will trap if +called before successfully writing a value. -where `copy-result` is defined as in [`stream.read` and -`stream.write`](#-streamread-and-streamwrite). +In the Canonical ABI, the `{readable,writable}-future-end` is passed as an +`i32` index into the component instance's table followed by a single +`i32` describing the linear memory offset of the +`{readable,writable}-buffer`. The `option` +return value is bit-packed into the single `i32` return value where +`0xffff_ffff` represents `none`. + +(See [`canon_future_read`] in the Canonical ABI explainer for details.) + + +###### 🔀 `stream.cancel-read`, `stream.cancel-write`, `future.cancel-read`, and `future.cancel-write` + +| Synopsis | | +| --------------------------------------------------- | ----------------------------------------------------------------------------- | +| Approximate WIT signature for `stream.cancel-read` | `func>(e: readable-stream-end) -> option` | +| Approximate WIT signature for `stream.cancel-write` | `func>(e: writable-stream-end) -> option` | +| Approximate WIT signature for `future.cancel-read` | `func>(e: readable-future-end) -> option` | +| Approximate WIT signature for `future.cancel-write` | `func>(e: writable-future-end) -> option` | +| Canonical ABI signature | `[e: i32] -> [i32]` | The `{stream,future}.cancel-{read,write}` built-ins take the matching [readable or writable end] of a stream or future that has a pending `{stream,future}.{read,write}`. -If cancellation finishes eagerly, the return value is a `copy-result`. If -cancellation blocks, the return value is `none` and the caller must wait for a -corresponding `{stream,future}-{read,write}` event via -`waitable-set.{wait,poll}` or, when using a `callback`, returning to the event -loop. In either case, the `status` of the `copy-result` may be `cancelled` but -may also be `completed` or `closed`, if one of these racily happened first. +If cancellation finishes without blocking, the return value is a +`stream-result` or `future-{read,write}-result`. If cancellation blocks, the +return value is `none` and the caller must wait for a corresponding +`{stream,future}-{read,write}` event via `waitable-set.{wait,poll}` or, when +using a `callback`, returning to the event loop. In either case, the result may +be `cancelled` but may also be `completed` or `dropped`, if one of these racily +happened first. -In the Canonical ABI, the `option` is bit-packed into the single -returned `i32` in the same way as `{stream,future}.{read,write}`. (See +In the Canonical ABI, the optional result value is bit-packed into the single +`i32` result in the same way as `{stream,future}.{read,write}`. (See [`canon_stream_cancel_read`] in the Canonical ABI explainer for details.) -###### 🔀 `stream.close-readable`, `stream.close-writable`, `future.close-readable`, and `future.close-writable` +###### 🔀 `stream.drop-readable`, `stream.drop-writable`, `future.drop-readable`, and `future.drop-writable` -| Synopsis | | -| ----------------------------------------------------- | ---------------------------------------------- | -| Approximate WIT signature for `stream.close-readable` | `func>(e: readable-stream-end)` | -| Approximate WIT signature for `stream.close-writable` | `func>(e: writable-stream-end)` | -| Approximate WIT signature for `future.close-readable` | `func>(e: readable-future-end)` | -| Approximate WIT signature for `future.close-writable` | `func>(e: writable-future-end)` | -| Canonical ABI signature | `[end:i32 err:i32] -> []` | +| Synopsis | | +| ---------------------------------------------------- | ---------------------------------------------- | +| Approximate WIT signature for `stream.drop-readable` | `func>(e: readable-stream-end)` | +| Approximate WIT signature for `stream.drop-writable` | `func>(e: writable-stream-end)` | +| Approximate WIT signature for `future.drop-readable` | `func>(e: readable-future-end)` | +| Approximate WIT signature for `future.drop-writable` | `func>(e: writable-future-end)` | +| Canonical ABI signature | `[end:i32 err:i32] -> []` | -The `{stream,future}.close-{readable,writable}` built-ins remove the indicated +The `{stream,future}.drop-{readable,writable}` built-ins remove the indicated [stream or future] from the current component instance's table, trapping if the stream or future has a mismatched direction or type or are in the middle of a -`read` or `write`. +`read` or `write` or, in the special case of `future.drop-writable`, if a +value has not already been written. (See [`canon_stream_drop_readable`] in the +Canonical ABI explainer for details.) ##### 📝 Error Context built-ins @@ -2882,9 +2920,11 @@ For some use-case-focused, worked examples, see: [`canon_waitable_set_drop`]: CanonicalABI.md#-canon-waitable-setdrop [`canon_waitable_join`]: CanonicalABI.md#-canon-waitablejoin [`canon_stream_new`]: CanonicalABI.md#-canon-streamfuturenew -[`canon_stream_read`]: CanonicalABI.md#-canon-streamfuturereadwrite -[`canon_future_read`]: CanonicalABI.md#-canon-streamfuturereadwrite +[`canon_stream_read`]: CanonicalABI.md#-canon-streamreadwrite +[`canon_future_read`]: CanonicalABI.md#-canon-futurereadwrite +[`canon_future_write`]: CanonicalABI.md#-canon-futurereadwrite [`canon_stream_cancel_read`]: CanonicalABI.md#-canon-streamfuturecancel-readwrite +[`canon_stream_drop_readable`]: CanonicalABI.md#-canon-streamfuturedrop-readablewritable [`canon_subtask_cancel`]: CanonicalABI.md#-canon-subtaskcancel [`canon_subtask_drop`]: CanonicalABI.md#-canon-subtaskdrop [`canon_resource_new`]: CanonicalABI.md#canon-resourcenew @@ -2896,7 +2936,6 @@ For some use-case-focused, worked examples, see: [`canon_thread_spawn_ref`]: CanonicalABI.md#-canon-threadspawn_ref [`canon_thread_spawn_indirect`]: CanonicalABI.md#-canon-threadspawn_indirect [`canon_thread_available_parallelism`]: CanonicalABI.md#-canon-threadavailable_parallelism -[the `close` built-ins]: CanonicalABI.md#-canon-streamfutureclose-readablewritable [Shared-Nothing]: ../high-level/Choices.md [Use Cases]: ../high-level/UseCases.md [Host Embeddings]: ../high-level/UseCases.md#hosts-embedding-components @@ -2909,7 +2948,6 @@ For some use-case-focused, worked examples, see: [Stream or Future]: Async.md#streams-and-futures [Readable and Writable Ends]: Async.md#streams-and-futures [Readable or Writable End]: Async.md#streams-and-futures -[Writable End]: Async.md#streams-and-futures [Waiting]: Async.md#waiting [Waitables]: Async.md#waiting [Waitable Set]: Async.md#waiting diff --git a/design/mvp/canonical-abi/definitions.py b/design/mvp/canonical-abi/definitions.py index c1b8a239..7cb85681 100644 --- a/design/mvp/canonical-abi/definitions.py +++ b/design/mvp/canonical-abi/definitions.py @@ -755,7 +755,7 @@ def drop(self): class CopyResult(IntEnum): COMPLETED = 0 - CLOSED = 1 + DROPPED = 1 CANCELLED = 2 RevokeBuffer = Callable[[], None] @@ -766,11 +766,10 @@ class ReadableStream: t: ValType read: Callable[[ComponentInstance, WritableBuffer, OnCopy, OnCopyDone], None] cancel: Callable[[], None] - close: Callable[[], None] - closed: Callable[[], bool] + drop: Callable[[], None] class SharedStreamImpl(ReadableStream): - closed_: bool + dropped: bool pending_inst: Optional[ComponentInstance] pending_buffer: Optional[Buffer] pending_on_copy: Optional[OnCopy] @@ -778,7 +777,7 @@ class SharedStreamImpl(ReadableStream): def __init__(self, t): self.t = t - self.closed_ = False + self.dropped = False self.reset_pending() def reset_pending(self): @@ -798,18 +797,15 @@ def reset_and_notify_pending(self, result): def cancel(self): self.reset_and_notify_pending(CopyResult.CANCELLED) - def close(self): - if not self.closed_: - self.closed_ = True + def drop(self): + if not self.dropped: + self.dropped = True if self.pending_buffer: - self.reset_and_notify_pending(CopyResult.CLOSED) - - def closed(self): - return self.closed_ + self.reset_and_notify_pending(CopyResult.DROPPED) def read(self, inst, dst_buffer, on_copy, on_copy_done): - if self.closed_: - on_copy_done(CopyResult.CLOSED) + if self.dropped: + on_copy_done(CopyResult.DROPPED) elif not self.pending_buffer: self.set_pending(inst, dst_buffer, on_copy, on_copy_done) else: @@ -826,8 +822,8 @@ def read(self, inst, dst_buffer, on_copy, on_copy_done): self.set_pending(inst, dst_buffer, on_copy, on_copy_done) def write(self, inst, src_buffer, on_copy, on_copy_done): - if self.closed_: - on_copy_done(CopyResult.CLOSED) + if self.dropped: + on_copy_done(CopyResult.DROPPED) elif not self.pending_buffer: self.set_pending(inst, src_buffer, on_copy, on_copy_done) else: @@ -848,15 +844,17 @@ def write(self, inst, src_buffer, on_copy, on_copy_done): class StreamEnd(Waitable): shared: ReadableStream copying: bool + done: bool def __init__(self, shared): Waitable.__init__(self) self.shared = shared self.copying = False + self.done = False def drop(self): trap_if(self.copying) - self.shared.close() + self.shared.drop() Waitable.drop(self) class ReadableStreamEnd(StreamEnd): @@ -869,29 +867,97 @@ def copy(self, inst, src, on_copy, on_copy_done): #### Future State -class FutureEnd(StreamEnd): - def close_after_copy(self, copy, inst, buffer, on_copy, on_copy_done): - assert(buffer.remain() == 1) - def on_copy_wrapper(revoke_buffer): - assert(buffer.remain() == 0) - self.shared.close() - def on_copy_done_wrapper(result): - if buffer.remain() == 0: - self.shared.close() - on_copy_done(CopyResult.CLOSED) - else: - on_copy_done(result) - copy(inst, buffer, on_copy_wrapper, on_copy_done_wrapper) +class ReadableFuture: + t: ValType + read: Callable[[ComponentInstance, WritableBuffer, OnCopyDone], None] + cancel: Callable[[], None] + drop: Callable[[], None] + +class SharedFutureImpl(ReadableFuture): + dropped: bool + pending_inst: Optional[ComponentInstance] + pending_buffer: Optional[Buffer] + pending_on_copy_done: Optional[OnCopyDone] + + def __init__(self, t): + self.t = t + self.dropped = False + self.reset_pending() + + def reset_pending(self): + self.set_pending(None, None, None) + + def set_pending(self, inst, buffer, on_copy_done): + self.pending_inst = inst + self.pending_buffer = buffer + self.pending_on_copy_done = on_copy_done + + def reset_and_notify_pending(self, result): + pending_on_copy_done = self.pending_on_copy_done + self.reset_pending() + pending_on_copy_done(result) + + def cancel(self): + self.reset_pending_and_notify_pending(CopyResult.CANCELLED) + + def drop(self): + assert(not self.dropped) + self.dropped = True + if self.pending_buffer: + assert(isinstance(self.pending_buffer, WritableBuffer)) + self.reset_and_notify_pending(CopyResult.DROPPED) + + def read(self, inst, dst_buffer, on_copy_done): + assert(not self.dropped and dst_buffer.remain() == 1) + if not self.pending_buffer: + self.set_pending(inst, dst_buffer, on_copy_done) + else: + trap_if(inst is self.pending_inst and self.t is not None) # temporary + dst_buffer.write(self.pending_buffer.read(1)) + self.reset_and_notify_pending(CopyResult.COMPLETED) + on_copy_done(CopyResult.COMPLETED) + + def write(self, inst, src_buffer, on_copy_done): + assert(src_buffer.remain() == 1) + if self.dropped: + on_copy_done(CopyResult.DROPPED) + elif not self.pending_buffer: + self.set_pending(inst, src_buffer, on_copy_done) + else: + trap_if(inst is self.pending_inst and self.t is not None) # temporary + self.pending_buffer.write(src_buffer.read(1)) + self.reset_and_notify_pending(CopyResult.COMPLETED) + on_copy_done(CopyResult.COMPLETED) + +class FutureEnd(Waitable): + shared: ReadableFuture + copying: bool + done: bool + + def __init__(self, shared): + Waitable.__init__(self) + self.shared = shared + self.copying = False + self.done = False + + def drop(self): + trap_if(self.copying) + Waitable.drop(self) class ReadableFutureEnd(FutureEnd): - def copy(self, inst, dst, on_copy, on_copy_done): - return self.close_after_copy(self.shared.read, inst, dst, on_copy, on_copy_done) + def copy(self, inst, src_buffer, on_copy_done): + self.shared.read(inst, src_buffer, on_copy_done) + + def drop(self): + self.shared.drop() + FutureEnd.drop(self) class WritableFutureEnd(FutureEnd): - def copy(self, inst, src, on_copy, on_copy_done): - return self.close_after_copy(self.shared.write, inst, src, on_copy, on_copy_done) + def copy(self, inst, dst_buffer, on_copy_done): + self.shared.write(inst, dst_buffer, on_copy_done) + def drop(self): - trap_if(not self.shared.closed()) + trap_if(not self.done) FutureEnd.drop(self) ### Despecialization @@ -1223,9 +1289,7 @@ def lift_stream(cx, i, t): return lift_async_value(ReadableStreamEnd, cx, i, t) def lift_future(cx, i, t): - v = lift_async_value(ReadableFutureEnd, cx, i, t) - trap_if(v.closed()) - return v + return lift_async_value(ReadableFutureEnd, cx, i, t) def lift_async_value(ReadableEndT, cx, i, t): assert(not contains_borrow(t)) @@ -1233,6 +1297,7 @@ def lift_async_value(ReadableEndT, cx, i, t): trap_if(not isinstance(e, ReadableEndT)) trap_if(e.shared.t != t) trap_if(e.copying) + trap_if(e.done) return e.shared ### Storing @@ -1526,16 +1591,14 @@ def lower_borrow(cx, rep, t): return cx.inst.table.add(h) def lower_stream(cx, v, t): - return lower_async_value(ReadableStreamEnd, cx, v, t) + assert(isinstance(v, ReadableStream)) + assert(not contains_borrow(t)) + return cx.inst.table.add(ReadableStreamEnd(v)) def lower_future(cx, v, t): - assert(not v.closed()) - return lower_async_value(ReadableFutureEnd, cx, v, t) - -def lower_async_value(ReadableEndT, cx, v, t): - assert(isinstance(v, ReadableStream)) + assert(isinstance(v, ReadableFuture)) assert(not contains_borrow(t)) - return cx.inst.table.add(ReadableEndT(v)) + return cx.inst.table.add(ReadableFutureEnd(v)) ### Flattening @@ -2173,54 +2236,49 @@ async def canon_stream_new(stream_t, task): async def canon_future_new(future_t, task): trap_if(not task.inst.may_leave) - shared = SharedStreamImpl(future_t.t) + shared = SharedFutureImpl(future_t.t) ri = task.inst.table.add(ReadableFutureEnd(shared)) wi = task.inst.table.add(WritableFutureEnd(shared)) return [ ri | (wi << 32) ] -### 🔀 `canon {stream,future}.{read,write}` +### 🔀 `canon stream.{read,write}` async def canon_stream_read(stream_t, opts, task, i, ptr, n): - return await copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ, - stream_t, opts, task, i, ptr, n) + return await stream_copy(ReadableStreamEnd, WritableBufferGuestImpl, EventCode.STREAM_READ, + stream_t, opts, task, i, ptr, n) async def canon_stream_write(stream_t, opts, task, i, ptr, n): - return await copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE, - stream_t, opts, task, i, ptr, n) - -async def canon_future_read(future_t, opts, task, i, ptr): - return await copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ, - future_t, opts, task, i, ptr, 1) - -async def canon_future_write(future_t, opts, task, i, ptr): - return await copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE, - future_t, opts, task, i, ptr, 1) + return await stream_copy(WritableStreamEnd, ReadableBufferGuestImpl, EventCode.STREAM_WRITE, + stream_t, opts, task, i, ptr, n) -async def copy(EndT, BufferT, event_code, stream_or_future_t, opts, task, i, ptr, n): +async def stream_copy(EndT, BufferT, event_code, stream_t, opts, task, i, ptr, n): trap_if(not task.inst.may_leave) e = task.inst.table.get(i) trap_if(not isinstance(e, EndT)) - trap_if(e.shared.t != stream_or_future_t.t) - trap_if(e.copying) + trap_if(e.shared.t != stream_t.t) + trap_if(e.copying or e.done) - assert(not contains_borrow(stream_or_future_t)) + assert(not contains_borrow(stream_t)) cx = LiftLowerContext(opts, task.inst, borrow_scope = None) - buffer = BufferT(stream_or_future_t.t, cx, ptr, n) + buffer = BufferT(stream_t.t, cx, ptr, n) - def copy_event(result, revoke_buffer): + def stream_event(result, revoke_buffer): revoke_buffer() assert(e.copying) e.copying = False + if result == CopyResult.DROPPED: + e.done = True assert(0 <= result < 2**4) assert(buffer.progress <= Buffer.MAX_LENGTH < 2**28) packed_result = result | (buffer.progress << 4) return (event_code, i, packed_result) + return (event_code, i, pack_stream_result(result, buffer)) def on_copy(revoke_buffer): - e.set_event(partial(copy_event, CopyResult.COMPLETED, revoke_buffer)) + e.set_event(partial(stream_event, CopyResult.COMPLETED, revoke_buffer)) def on_copy_done(result): - e.set_event(partial(copy_event, result, revoke_buffer = lambda:())) + e.set_event(partial(stream_event, result, revoke_buffer = lambda:())) e.copying = True e.copy(task.inst, buffer, on_copy, on_copy_done) @@ -2235,6 +2293,52 @@ def on_copy_done(result): else: return [BLOCKED] +### 🔀 `canon future.{read,write}` + +async def canon_future_read(future_t, opts, task, i, ptr): + return await future_copy(ReadableFutureEnd, WritableBufferGuestImpl, EventCode.FUTURE_READ, + future_t, opts, task, i, ptr) + +async def canon_future_write(future_t, opts, task, i, ptr): + return await future_copy(WritableFutureEnd, ReadableBufferGuestImpl, EventCode.FUTURE_WRITE, + future_t, opts, task, i, ptr) + +async def future_copy(EndT, BufferT, event_code, future_t, opts, task, i, ptr): + trap_if(not task.inst.may_leave) + e = task.inst.table.get(i) + trap_if(not isinstance(e, EndT)) + trap_if(e.shared.t != future_t.t) + trap_if(e.copying or e.done) + + assert(not contains_borrow(future_t)) + cx = LiftLowerContext(opts, task.inst, borrow_scope = None) + buffer = BufferT(future_t.t, cx, ptr, 1) + + def future_event(result): + assert((buffer.remain() == 0) == (result == CopyResult.COMPLETED)) + assert(e.copying) + e.copying = False + if result == CopyResult.DROPPED or result == CopyResult.COMPLETED: + e.done = True + return (event_code, i, int(result)) + + def on_copy_done(result): + assert(result != CopyResult.DROPPED or event_code == EventCode.FUTURE_WRITE) + e.set_event(partial(future_event, result)) + + e.copying = True + e.copy(task.inst, buffer, on_copy_done) + + if opts.sync and not e.has_pending_event(): + await task.wait_on(e.wait_for_pending_event(), sync = True) + + if e.has_pending_event(): + code,index,payload = e.get_event() + assert(code == event_code and index == i) + return [payload] + else: + return [BLOCKED] + ### 🔀 `canon {stream,future}.cancel-{read,write}` async def canon_stream_cancel_read(stream_t, sync, task, i): @@ -2266,21 +2370,21 @@ async def cancel_copy(EndT, event_code, stream_or_future_t, sync, task, i): assert(not e.copying and code == event_code and index == i) return [payload] -### 🔀 `canon {stream,future}.close-{readable,writable}` +### 🔀 `canon {stream,future}.drop-{readable,writable}` -async def canon_stream_close_readable(stream_t, task, i): - return await close(ReadableStreamEnd, stream_t, task, i) +async def canon_stream_drop_readable(stream_t, task, i): + return await drop(ReadableStreamEnd, stream_t, task, i) -async def canon_stream_close_writable(stream_t, task, hi): - return await close(WritableStreamEnd, stream_t, task, hi) +async def canon_stream_drop_writable(stream_t, task, hi): + return await drop(WritableStreamEnd, stream_t, task, hi) -async def canon_future_close_readable(future_t, task, i): - return await close(ReadableFutureEnd, future_t, task, i) +async def canon_future_drop_readable(future_t, task, i): + return await drop(ReadableFutureEnd, future_t, task, i) -async def canon_future_close_writable(future_t, task, hi): - return await close(WritableFutureEnd, future_t, task, hi) +async def canon_future_drop_writable(future_t, task, hi): + return await drop(WritableFutureEnd, future_t, task, hi) -async def close(EndT, stream_or_future_t, task, hi): +async def drop(EndT, stream_or_future_t, task, hi): trap_if(not task.inst.may_leave) e = task.inst.table.remove(hi) trap_if(not isinstance(e, EndT)) diff --git a/design/mvp/canonical-abi/run_tests.py b/design/mvp/canonical-abi/run_tests.py index d06c9a42..d6c7f8ec 100644 --- a/design/mvp/canonical-abi/run_tests.py +++ b/design/mvp/canonical-abi/run_tests.py @@ -1010,25 +1010,25 @@ def reset_pending(self): def closed(self): return not self.remaining and self.destroy_if_empty - def close(self): + def drop(self): self.remaining = [] self.destroy_if_empty = True if self.pending_dst: - self.pending_on_copy_done(CopyResult.CLOSED) + self.pending_on_copy_done(CopyResult.DROPPED) self.reset_pending() def destroy_once_empty(self): self.destroy_if_empty = True if not self.remaining: - self.close() + self.drop() def read(self, inst, dst, on_copy, on_copy_done): if self.closed(): - on_copy_done(CopyResult.CLOSED) + on_copy_done(CopyResult.DROPPED) elif self.remaining: self.actually_copy(dst) if self.closed(): - on_copy_done(CopyResult.CLOSED) + on_copy_done(CopyResult.DROPPED) else: on_copy_done(CopyResult.COMPLETED) else: @@ -1072,6 +1072,7 @@ class HostSink: write_remain: int write_event: asyncio.Event ready_to_consume: asyncio.Event + closed: bool def __init__(self, shared, chunk, remain = 2**64): self.shared = shared @@ -1083,21 +1084,24 @@ def __init__(self, shared, chunk, remain = 2**64): if remain: self.write_event.set() self.ready_to_consume = asyncio.Event() + self.closed = False async def read_all(): - while not self.shared.closed(): + while True: await self.write_event.wait() - if self.shared.closed(): - break def on_copy(revoke_buffer): revoke_buffer() if not f.done(): f.set_result(None) def on_copy_done(result): + if result == CopyResult.DROPPED: + self.closed = True if not f.done(): f.set_result(None) f = asyncio.Future() self.shared.read(None, self, on_copy, on_copy_done) await f + if self.closed: + break self.ready_to_consume.set() asyncio.create_task(read_all()) @@ -1118,7 +1122,7 @@ def write(self, vs): async def consume(self, n): while n > len(self.received): - if self.shared.closed(): + if self.closed: return None self.ready_to_consume.clear() await self.ready_to_consume.wait() @@ -1144,7 +1148,7 @@ async def add10(): for i in range(len(vs)): vs[i] += 10 outgoing.write(vs) - outgoing.close() + outgoing.drop() asyncio.create_task(add10()) await asyncio.sleep(0) @@ -1188,7 +1192,7 @@ async def core_func(task, args): assert(n == 4 and result == CopyResult.COMPLETED) [ret] = await canon_stream_read(StreamType(U8Type()), opts, task, rsi1, 0, 4) result,n = unpack_result(ret) - assert(n == 4 and result == CopyResult.CLOSED) + assert(n == 4 and result == CopyResult.DROPPED) assert(mem[0:4] == b'\x05\x06\x07\x08') [ret] = await canon_stream_write(StreamType(U8Type()), opts, task, wsi3, 0, 4) result,n = unpack_result(ret) @@ -1200,10 +1204,10 @@ async def core_func(task, args): [ret] = await canon_stream_write(StreamType(U8Type()), opts, task, wsi2, 0, 4) result,n = unpack_result(ret) assert(n == 4 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi1) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi4) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi2) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi3) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi1) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi4) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi2) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi3) return [] await canon_lift(opts, inst, ft, core_func, None, on_start, on_resolve, host_on_block) @@ -1298,13 +1302,13 @@ async def core_func(task, args): src_stream.destroy_once_empty() [ret] = await canon_stream_read(StreamType(U8Type()), opts, task, rsi1, 0, 4) result,n = unpack_result(ret) - assert(n == 4 and result == CopyResult.CLOSED) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi1) + assert(n == 4 and result == CopyResult.DROPPED) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi1) assert(mem[0:4] == b'\x05\x06\x07\x08') [ret] = await canon_stream_write(StreamType(U8Type()), opts, task, wsi3, 0, 4) result,n = unpack_result(ret) assert(n == 4 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi3) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi3) [ret] = await canon_stream_read(StreamType(U8Type()), opts, task, rsi4, 0, 4) assert(ret == definitions.BLOCKED) [] = await canon_waitable_join(task, rsi4, seti) @@ -1314,12 +1318,12 @@ async def core_func(task, args): result,n = unpack_result(mem[retp+4]) assert(n == 4 and result == CopyResult.COMPLETED) [ret] = await canon_stream_read(StreamType(U8Type()), sync_opts, task, rsi4, 0, 4) - assert(ret == CopyResult.CLOSED) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi4) + assert(ret == CopyResult.DROPPED) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi4) [ret] = await canon_stream_write(StreamType(U8Type()), sync_opts, task, wsi2, 0, 4) result,n = unpack_result(ret) assert(n == 4 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi2) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi2) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1380,7 +1384,7 @@ async def core_func(task, args): await canon_stream_cancel_write(StreamType(U8Type()), True, task, wsi) except Trap: pass - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi) return [] def on_start(): return [] @@ -1433,7 +1437,7 @@ async def core_func(task, args): assert(mem[retp+0] == rsi) result,n = unpack_result(mem[retp+4]) assert(n == 2 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi) [packed] = await canon_stream_new(StreamType(U8Type()), task) rsi,wsi = unpack_new_ends(packed) @@ -1455,7 +1459,7 @@ async def core_func(task, args): result,n = unpack_result(mem[retp+4]) assert(n == 4 and result == CopyResult.COMPLETED) assert(dst.received == [1,2,3,4,5,6]) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi) [] = await canon_waitable_set_drop(task, seti) dst.set_remain(100) assert(await dst.consume(100) is None) @@ -1529,7 +1533,7 @@ async def core_func1(task, args): assert(ret == 0) [errctxi] = await canon_error_context_new(opts1, task, 0, 0) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi) [] = await canon_waitable_set_drop(task, seti) [] = await canon_error_context_drop(task, errctxi) return [] @@ -1590,9 +1594,9 @@ async def core_func2(task, args): assert(event == EventCode.STREAM_READ) assert(mem2[retp+0] == rsi) p2 = int.from_bytes(mem2[retp+4 : retp+8], 'little', signed=False) - assert(p2 == (CopyResult.CLOSED | 1)) + assert(p2 == (CopyResult.DROPPED | 1)) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1640,7 +1644,7 @@ async def core_func1(task, args): fut4.set_result(None) [errctxi] = await canon_error_context_new(opts1, task, 0, 0) - [] = await canon_stream_close_writable(StreamType(None), task, wsi) + [] = await canon_stream_drop_writable(StreamType(None), task, wsi) [] = await canon_error_context_drop(task, errctxi) return [] @@ -1688,8 +1692,8 @@ async def core_func2(task, args): [ret] = await canon_stream_read(StreamType(None), opts2, task, rsi, 1000000, 2) result,n = unpack_result(ret) - assert(n == 0 and result == CopyResult.CLOSED) - [] = await canon_stream_close_readable(StreamType(None), task, rsi) + assert(n == 0 and result == CopyResult.DROPPED) + [] = await canon_stream_drop_readable(StreamType(None), task, rsi) return [] await canon_lift(opts2, inst2, ft2, core_func2, None, lambda:[], lambda _:(), host_on_block) @@ -1733,7 +1737,7 @@ async def core_func(task, args): [ret] = await canon_stream_cancel_write(StreamType(U8Type()), True, task, wsi) result,n = unpack_result(ret) assert(n == 2 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi) host_sink.set_remain(100) assert(await host_sink.consume(100) is None) @@ -1750,7 +1754,7 @@ async def core_func(task, args): [ret] = await canon_stream_cancel_write(StreamType(U8Type()), False, task, wsi) result,n = unpack_result(ret) assert(n == 2 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_writable(StreamType(U8Type()), task, wsi) + [] = await canon_stream_drop_writable(StreamType(U8Type()), task, wsi) host_sink.set_remain(100) assert(await host_sink.consume(100) is None) @@ -1763,7 +1767,7 @@ async def core_func(task, args): [ret] = await canon_stream_cancel_read(StreamType(U8Type()), True, task, rsi) result,n = unpack_result(ret) assert(n == 0 and result == CopyResult.CANCELLED) - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi) [ret] = await canon_lower(lower_opts, host_ft2, host_func2, task, [retp]) assert(ret == Subtask.State.RETURNED) @@ -1783,7 +1787,7 @@ async def core_func(task, args): result,n = unpack_result(mem[retp+4]) assert(n == 2 and result == CopyResult.CANCELLED) assert(mem[0:2] == b'\x07\x08') - [] = await canon_stream_close_readable(StreamType(U8Type()), task, rsi) + [] = await canon_stream_drop_readable(StreamType(U8Type()), task, rsi) [] = await canon_waitable_set_drop(task, seti) return [] @@ -1810,48 +1814,36 @@ def write(self, v): self.v = v[0] self.has_v.set() -class HostFutureSource(ReadableStream): - is_closed: bool +class HostFutureSource(ReadableFuture): v: Optional[any] pending_buffer: Optional[WritableBuffer] pending_on_copy_done: Optional[OnCopyDone] def __init__(self, t): self.t = t - self.is_closed = False self.v = None + self.reset_pending() + def reset_pending(self): self.pending_buffer = None self.pending_on_copy_done = None - def closed(self): - return self.is_closed - def read(self, inst, dst, on_copy, on_copy_done): - if self.is_closed: - on_copy_done(CopyResult.CLOSED) - elif self.v: - dst.write([self.v]) - self.is_closed = True - on_copy_done(CopyResult.CLOSED) + def read(self, inst, buffer, on_copy_done): + if self.v: + buffer.write([self.v]) + on_copy_done(CopyResult.COMPLETED) else: - self.pending_buffer = dst + self.pending_buffer = buffer self.pending_on_copy_done = on_copy_done + def cancel(self): + self.pending_on_copy_done(CopyResult.CANCELLED) + self.reset_pending() + def drop(self): + pass def set_result(self, v): - assert(not self.is_closed and not self.v) if self.pending_buffer: self.pending_buffer.write([v]) - self.is_closed = True - self.reset_and_notify_pending(CopyResult.CLOSED) + self.pending_on_copy_done(CopyResult.COMPLETED) + self.reset_pending() else: self.v = v - def reset_and_notify_pending(self, result): - pending_on_copy_done = self.pending_on_copy_done - self.pending_buffer = None - self.pending_on_copy_done = None - pending_on_copy_done(result) - def cancel(self): - self.reset_and_notify_pending(CopyResult.CANCELLED) - def close(self): - self.is_closed = True - if self.pending_buffer: - self.reset_and_notify_pending(CopyResult.CLOSED) async def test_futures(): inst = ComponentInstance() @@ -1864,9 +1856,8 @@ async def host_func(task, on_start, on_resolve, on_block): outgoing = HostFutureSource(U8Type()) on_resolve([outgoing]) incoming = HostFutureSink(U8Type()) - future.read(None, incoming, lambda _:(), lambda _:()) - wait = asyncio.create_task(incoming.has_v.wait()) - await on_block(wait) + future.read(None, incoming, lambda why:()) + await on_block(asyncio.create_task(incoming.has_v.wait())) assert(incoming.v == 42) outgoing.set_result(43) @@ -1887,20 +1878,18 @@ async def core_func(task, args): writep = 8 mem[writep] = 42 [ret] = await canon_future_write(FutureType(U8Type()), lower_opts, task, wfi, writep) - result,n = unpack_result(ret) - assert(n == 1 and result == CopyResult.CLOSED) + assert(ret == CopyResult.COMPLETED) [seti] = await canon_waitable_set_new(task) [] = await canon_waitable_join(task, rfi, seti) [event] = await canon_waitable_set_wait(False, mem, task, seti, retp) assert(event == EventCode.FUTURE_READ) assert(mem[retp+0] == rfi) - result,n = unpack_result(mem[retp+4]) - assert(n == 1 and result == CopyResult.CLOSED) + assert(mem[retp+4] == CopyResult.COMPLETED) assert(mem[readp] == 43) - [] = await canon_future_close_writable(FutureType(U8Type()), task, wfi) - [] = await canon_future_close_readable(FutureType(U8Type()), task, rfi) + [] = await canon_future_drop_writable(FutureType(U8Type()), task, wfi) + [] = await canon_future_drop_readable(FutureType(U8Type()), task, rfi) [] = await canon_waitable_set_drop(task, seti) [packed] = await canon_future_new(FutureType(U8Type()), task) @@ -1916,25 +1905,23 @@ async def core_func(task, args): writep = 8 mem[writep] = 42 [ret] = await canon_future_write(FutureType(U8Type()), lower_opts, task, wfi, writep) - result,n = unpack_result(ret) - assert(n == 1 and result == CopyResult.CLOSED) + assert(ret == CopyResult.COMPLETED) - while not task.inst.table.get(rfi).shared.closed(): + while not task.inst.table.get(rfi).has_pending_event(): await task.yield_(sync = False) [ret] = await canon_future_cancel_read(FutureType(U8Type()), True, task, rfi) - result,n = unpack_result(ret) - assert(n == 1 and result == CopyResult.CLOSED) + assert(ret == CopyResult.COMPLETED) assert(mem[readp] == 43) - [] = await canon_future_close_writable(FutureType(U8Type()), task, wfi) - [] = await canon_future_close_readable(FutureType(U8Type()), task, rfi) + [] = await canon_future_drop_writable(FutureType(U8Type()), task, wfi) + [] = await canon_future_drop_readable(FutureType(U8Type()), task, rfi) [packed] = await canon_future_new(FutureType(U8Type()), task) rfi,wfi = unpack_new_ends(packed) trapped = False try: - await canon_future_close_writable(FutureType(U8Type()), task, wfi) + await canon_future_drop_writable(FutureType(U8Type()), task, wfi) except Trap: trapped = True assert(trapped) @@ -2205,22 +2192,19 @@ async def core_func(task, args): [packed] = await canon_future_new(FutureType(None), task) rfi,wfi = unpack_new_ends(packed) - [ret] = await canon_future_write(FutureType(None), async_opts, task, wfi, 10000) + [ret] = await canon_future_write(FutureType(None), async_opts, task, wfi, 0xdeadbeef) assert(ret == definitions.BLOCKED) - [ret] = await canon_future_read(FutureType(None), async_opts, task, rfi, 20000) - result,n = unpack_result(ret) - assert(n == 1 and result == CopyResult.CLOSED) - [] = await canon_future_close_readable(FutureType(None), task, rfi) + [ret] = await canon_future_read(FutureType(None), async_opts, task, rfi, 0xdeadbeef) + assert(ret == CopyResult.COMPLETED) + [] = await canon_future_drop_readable(FutureType(None), task, rfi) [] = await canon_waitable_join(task, wfi, seti) [event] = await canon_waitable_set_wait(True, mem, task, seti, 0) assert(event == EventCode.FUTURE_WRITE) assert(mem[0] == wfi) - result,n = unpack_result(mem[4]) - assert(result == CopyResult.CLOSED) - assert(n == 1) - [] = await canon_future_close_writable(FutureType(None), task, wfi) + assert(mem[4] == CopyResult.COMPLETED) + [] = await canon_future_drop_writable(FutureType(None), task, wfi) [packed] = await canon_stream_new(StreamType(None), task) rsi,wsi = unpack_new_ends(packed) @@ -2233,16 +2217,16 @@ async def core_func(task, args): [ret] = await canon_stream_read(StreamType(None), async_opts, task, rsi, 2000, 4) result,n = unpack_result(ret) assert(n == 2 and result == CopyResult.COMPLETED) - [] = await canon_stream_close_readable(StreamType(None), task, rsi) + [] = await canon_stream_drop_readable(StreamType(None), task, rsi) [] = await canon_waitable_join(task, wsi, seti) [event] = await canon_waitable_set_wait(True, mem, task, seti, 0) assert(event == EventCode.STREAM_WRITE) assert(mem[0] == wsi) result,n = unpack_result(mem[4]) - assert(result == CopyResult.CLOSED) + assert(result == CopyResult.DROPPED) assert(n == 3) - [] = await canon_stream_close_writable(StreamType(None), task, wsi) + [] = await canon_stream_drop_writable(StreamType(None), task, wsi) [] = await canon_waitable_set_drop(task, seti) return [] diff --git a/test/async/cancel-stream.wast b/test/async/cancel-stream.wast index 7f69840d..96e6ff00 100644 --- a/test/async/cancel-stream.wast +++ b/test/async/cancel-stream.wast @@ -14,7 +14,7 @@ (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) (import "" "stream.cancel-write" (func $stream.cancel-write (param i32) (result i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) (global $sw (mut i32) (i32.const 0)) @@ -35,9 +35,9 @@ (then unreachable)) ) - (func $write4-and-close (export "write4-and-close") + (func $write4-and-drop (export "write4-and-drop") (call $write4) - (call $stream.close-writable (global.get $sw)) + (call $stream.drop-writable (global.get $sw)) ) (func $start-blocking-write (export "start-blocking-write") @@ -72,18 +72,18 @@ (canon stream.new $ST (core func $stream.new)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) (canon stream.cancel-write $ST (core func $stream.cancel-write)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (core instance $cm (instantiate $CM (with "" (instance (export "mem" (memory $memory "mem")) (export "task.return" (func $task.return)) (export "stream.new" (func $stream.new)) (export "stream.write" (func $stream.write)) (export "stream.cancel-write" (func $stream.cancel-write)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-writable" (func $stream.drop-writable)) )))) (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) (func (export "write4") (canon lift (core func $cm "write4"))) - (func (export "write4-and-close") (canon lift (core func $cm "write4-and-close"))) + (func (export "write4-and-drop") (canon lift (core func $cm "write4-and-drop"))) (func (export "start-blocking-write") (canon lift (core func $cm "start-blocking-write"))) (func (export "cancel-after-read4") (canon lift (core func $cm "cancel-after-read4"))) ) @@ -92,7 +92,7 @@ (import "c" (instance $c (export "start-stream" (func (result (stream u8)))) (export "write4" (func)) - (export "write4-and-close" (func)) + (export "write4-and-drop" (func)) (export "start-blocking-write" (func)) (export "cancel-after-read4" (func)) )) @@ -103,10 +103,10 @@ (import "" "mem" (memory 1)) (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) (import "" "stream.cancel-read" (func $stream.cancel-read (param i32) (result i32))) - (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) (import "" "start-stream" (func $start-stream (result i32))) (import "" "write4" (func $write4)) - (import "" "write4-and-close" (func $write4-and-close)) + (import "" "write4-and-drop" (func $write4-and-drop)) (import "" "start-blocking-write" (func $start-blocking-write)) (import "" "cancel-after-read4" (func $cancel-after-read4)) @@ -141,18 +141,17 @@ (if (i32.ne (i32.const 0xabcd) (i32.load (i32.const 8))) (then unreachable)) - ;; read, block, call $C to write 4 bytes into the buffer and close, - ;; then cancel, which should show "4+closed" + ;; read, block, call $C to write 4 bytes into the buffer and drop, then cancel (local.set $ret (call $stream.read (local.get $sr) (i32.const 8) (i32.const 100))) (if (i32.ne (i32.const -1 (; BLOCKED;)) (local.get $ret)) (then unreachable)) - (call $write4-and-close) + (call $write4-and-drop) (local.set $ret (call $stream.cancel-read (local.get $sr))) - (if (i32.ne (i32.const 0x41 (; CLOSED=1 | (4<<4) ;)) (local.get $ret)) + (if (i32.ne (i32.const 0x41 (; DROPPED=1 | (4<<4) ;)) (local.get $ret)) (then unreachable)) (if (i32.ne (i32.const 0xabcd) (i32.load (i32.const 8))) (then unreachable)) - (call $stream.close-readable (local.get $sr)) + (call $stream.drop-readable (local.get $sr)) ;; get a new $sr (local.set $sr (call $start-stream)) @@ -176,20 +175,20 @@ (type $ST (stream u8)) (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) (canon stream.cancel-read $ST (core func $stream.cancel-read)) - (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) (canon lower (func $c "start-stream") (core func $start-stream')) (canon lower (func $c "write4") (core func $write4')) - (canon lower (func $c "write4-and-close") (core func $write4-and-close')) + (canon lower (func $c "write4-and-drop") (core func $write4-and-drop')) (canon lower (func $c "start-blocking-write") (core func $start-blocking-write')) (canon lower (func $c "cancel-after-read4") (core func $cancel-after-read4')) (core instance $dm (instantiate $DM (with "" (instance (export "mem" (memory $memory "mem")) (export "stream.read" (func $stream.read)) (export "stream.cancel-read" (func $stream.cancel-read)) - (export "stream.close-readable" (func $stream.close-readable)) + (export "stream.drop-readable" (func $stream.drop-readable)) (export "start-stream" (func $start-stream')) (export "write4" (func $write4')) - (export "write4-and-close" (func $write4-and-close')) + (export "write4-and-drop" (func $write4-and-drop')) (export "start-blocking-write" (func $start-blocking-write')) (export "cancel-after-read4" (func $cancel-after-read4')) )))) diff --git a/test/async/close-stream.wast b/test/async/drop-stream.wast similarity index 76% rename from test/async/close-stream.wast rename to test/async/drop-stream.wast index 2f8d2053..5fcbe7cb 100644 --- a/test/async/close-stream.wast +++ b/test/async/drop-stream.wast @@ -12,7 +12,7 @@ (import "" "mem" (memory 1)) (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) (global $sw (mut i32) (i32.const 0)) @@ -42,32 +42,32 @@ (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) (then unreachable)) ) - (func $close-writable (export "close-writable") + (func $drop-writable (export "drop-writable") ;; boom - (call $stream.close-writable (global.get $sw)) + (call $stream.drop-writable (global.get $sw)) ) ) (type $ST (stream u8)) (canon stream.new $ST (core func $stream.new)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (core instance $cm (instantiate $CM (with "" (instance (export "mem" (memory $memory "mem")) (export "stream.new" (func $stream.new)) (export "stream.write" (func $stream.write)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-writable" (func $stream.drop-writable)) )))) (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) (func (export "write4") (canon lift (core func $cm "write4"))) (func (export "start-blocking-write") (canon lift (core func $cm "start-blocking-write"))) - (func (export "close-writable") (canon lift (core func $cm "close-writable"))) + (func (export "drop-writable") (canon lift (core func $cm "drop-writable"))) ) (component $D (import "c" (instance $c (export "start-stream" (func (result (stream u8)))) (export "write4" (func)) (export "start-blocking-write" (func)) - (export "close-writable" (func)) + (export "drop-writable" (func)) )) (core module $Memory (memory (export "mem") 1)) @@ -77,14 +77,14 @@ (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) - (import "" "stream.close-readable" (func $stream.close-readable (param i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) (import "" "start-stream" (func $start-stream (result i32))) (import "" "write4" (func $write4)) (import "" "start-blocking-write" (func $start-blocking-write)) - (import "" "close-writable" (func $close-writable)) + (import "" "drop-writable" (func $drop-writable)) - (func (export "close-while-reading") + (func (export "drop-while-reading") (local $ret i32) (local $sr i32) ;; call 'start-stream' to get the stream we'll be working with @@ -104,9 +104,9 @@ (then unreachable)) ;; boom - (call $stream.close-readable (local.get $sr)) + (call $stream.drop-readable (local.get $sr)) ) - (func (export "close-while-writing") + (func (export "drop-while-writing") (local $ret i32) (local $sr i32) ;; call 'start-stream' to get the stream we'll be working with @@ -121,40 +121,40 @@ (then unreachable)) (if (i32.ne (i32.const 0x89abcdef) (i32.load (i32.const 8))) (then unreachable)) - (call $close-writable) + (call $drop-writable) ) ) (type $ST (stream u8)) (canon stream.new $ST (core func $stream.new)) (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) - (canon stream.close-readable $ST (core func $stream.close-readable)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (canon lower (func $c "start-stream") (core func $start-stream')) (canon lower (func $c "write4") (core func $write4')) (canon lower (func $c "start-blocking-write") (core func $start-blocking-write')) - (canon lower (func $c "close-writable") (core func $close-writable')) + (canon lower (func $c "drop-writable") (core func $drop-writable')) (core instance $core (instantiate $Core (with "" (instance (export "mem" (memory $memory "mem")) (export "stream.new" (func $stream.new)) (export "stream.read" (func $stream.read)) (export "stream.write" (func $stream.write)) - (export "stream.close-readable" (func $stream.close-readable)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-readable" (func $stream.drop-readable)) + (export "stream.drop-writable" (func $stream.drop-writable)) (export "start-stream" (func $start-stream')) (export "write4" (func $write4')) (export "start-blocking-write" (func $start-blocking-write')) - (export "close-writable" (func $close-writable')) + (export "drop-writable" (func $drop-writable')) )))) - (func (export "close-while-reading") (canon lift (core func $core "close-while-reading"))) - (func (export "close-while-writing") (canon lift (core func $core "close-while-writing"))) + (func (export "drop-while-reading") (canon lift (core func $core "drop-while-reading"))) + (func (export "drop-while-writing") (canon lift (core func $core "drop-while-writing"))) ) (instance $c (instantiate $C)) (instance $d (instantiate $D (with "c" (instance $c)))) - (func (export "close-while-reading") (alias export $d "close-while-reading")) - (func (export "close-while-writing") (alias export $d "close-while-writing")) + (func (export "drop-while-reading") (alias export $d "drop-while-reading")) + (func (export "drop-while-writing") (alias export $d "drop-while-writing")) ) (component instance $new-tester-instance $Tester) -(assert_trap (invoke "close-while-reading") "cannot drop busy stream or future") +(assert_trap (invoke "drop-while-reading") "cannot drop busy stream or future") (component instance $new-tester-instance $Tester) -(assert_trap (invoke "close-while-writing") "cannot drop busy stream or future") +(assert_trap (invoke "drop-while-writing") "cannot drop busy stream or future") diff --git a/test/async/empty-wait.wast b/test/async/empty-wait.wast index f034fbfd..575de00b 100644 --- a/test/async/empty-wait.wast +++ b/test/async/empty-wait.wast @@ -15,8 +15,8 @@ (import "" "future.new" (func $future.new (result i64))) (import "" "future.read" (func $future.read (param i32 i32) (result i32))) (import "" "future.write" (func $future.write (param i32 i32) (result i32))) - (import "" "future.close-readable" (func $future.close-readable (param i32))) - (import "" "future.close-writable" (func $future.close-writable (param i32))) + (import "" "future.drop-readable" (func $future.drop-readable (param i32))) + (import "" "future.drop-writable" (func $future.drop-writable (param i32))) ;; $ws is waited on by 'blocker' and added to by 'unblocker' (global $ws (mut i32) (i32.const 0)) @@ -36,10 +36,10 @@ (then unreachable)) (if (i32.ne (global.get $futr) (local.get $index)) (then unreachable)) - (if (i32.ne (i32.const 0x11 (; CLOSED=1 | (1<<4) ;)) (local.get $payload)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $payload)) (then unreachable)) - (call $future.close-readable (global.get $futr)) + (call $future.drop-readable (global.get $futr)) ;; return 42 to $D.run (call $task.return (i32.const 42)) @@ -68,10 +68,10 @@ ;; perform a future.write which will rendezvous with the write and complete (local.set $ret (call $future.write (local.get $futw) (i32.const 0xdeadbeef))) - (if (i32.ne (i32.const 0x11 (; CLOSED=1 | (1<<4) ;)) (local.get $ret)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) (then unreachable)) - (call $future.close-writable (local.get $futw)) + (call $future.drop-writable (local.get $futw)) ;; return 43 to $D.run (call $task.return (i32.const 43)) @@ -89,8 +89,8 @@ (canon future.new $FT (core func $future.new)) (canon future.read $FT async (memory $memory "mem") (core func $future.read)) (canon future.write $FT async (memory $memory "mem") (core func $future.write)) - (canon future.close-readable $FT (core func $future.close-readable)) - (canon future.close-writable $FT (core func $future.close-writable)) + (canon future.drop-readable $FT (core func $future.drop-readable)) + (canon future.drop-writable $FT (core func $future.drop-writable)) (core instance $cm (instantiate $CM (with "" (instance (export "mem" (memory $memory "mem")) (export "task.return" (func $task.return)) @@ -99,8 +99,8 @@ (export "future.new" (func $future.new)) (export "future.read" (func $future.read)) (export "future.write" (func $future.write)) - (export "future.close-readable" (func $future.close-readable)) - (export "future.close-writable" (func $future.close-writable)) + (export "future.drop-readable" (func $future.drop-readable)) + (export "future.drop-writable" (func $future.drop-writable)) )))) (func (export "blocker") (result u32) (canon lift (core func $cm "blocker") diff --git a/test/async/futures-must-write.wast b/test/async/futures-must-write.wast index 96677147..ce2446b3 100644 --- a/test/async/futures-must-write.wast +++ b/test/async/futures-must-write.wast @@ -10,7 +10,7 @@ (import "" "mem" (memory 1)) (import "" "future.new" (func $future.new (result i64))) (import "" "future.write" (func $future.write (param i32 i32) (result i32))) - (import "" "future.close-writable" (func $future.close-writable (param i32))) + (import "" "future.drop-writable" (func $future.drop-writable (param i32))) (global $fw (mut i32) (i32.const 0)) @@ -22,68 +22,65 @@ (i32.wrap_i64 (local.get $ret64)) ) (func $attempt-write (export "attempt-write") (result i32) - ;; because the caller already closed the readable end, this write will eagerly - ;; return CLOSED having written no values. + ;; because the caller already dropped the readable end, this write will eagerly + ;; return DROPPED having written no values. (local $ret i32) (local.set $ret (call $future.write (global.get $fw) (i32.const 42))) - (if (i32.ne (i32.const 0x01 (; CLOSED=1 | (0<<4) ;)) (local.get $ret)) - (then - (i32.load (i32.add (local.get $ret) (i32.const 0x8000_0000))) - unreachable)) + (if (i32.ne (i32.const 0x01 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) ;; return without trapping (i32.const 42) ) - (func $close-writable (export "close-writable") + (func $drop-writable (export "drop-writable") ;; maybe boom - (call $future.close-writable (global.get $fw)) + (call $future.drop-writable (global.get $fw)) ) ) (type $FT (future u8)) (canon future.new $FT (core func $future.new)) (canon future.write $FT async (memory $memory "mem") (core func $future.write)) - (canon future.close-writable $FT (core func $future.close-writable)) + (canon future.drop-writable $FT (core func $future.drop-writable)) (core instance $cm (instantiate $CM (with "" (instance (export "mem" (memory $memory "mem")) (export "future.new" (func $future.new)) (export "future.write" (func $future.write)) - (export "future.close-writable" (func $future.close-writable)) + (export "future.drop-writable" (func $future.drop-writable)) )))) (func (export "start-future") (result (future u8)) (canon lift (core func $cm "start-future"))) (func (export "attempt-write") (result u32) (canon lift (core func $cm "attempt-write"))) - (func (export "close-writable") (canon lift (core func $cm "close-writable"))) + (func (export "drop-writable") (canon lift (core func $cm "drop-writable"))) ) (component $D (import "c" (instance $c (export "start-future" (func (result (future u8)))) (export "attempt-write" (func (result u32))) - (export "close-writable" (func)) + (export "drop-writable" (func)) )) (core module $Memory (memory (export "mem") 1)) (core instance $memory (instantiate $Memory)) (core module $Core (import "" "mem" (memory 1)) - (import "" "future.read" (func $future.read (param i32 i32) (result i32))) - (import "" "future.close-readable" (func $future.close-readable (param i32))) + (import "" "future.drop-readable" (func $future.drop-readable (param i32))) (import "" "start-future" (func $start-future (result i32))) (import "" "attempt-write" (func $attempt-write (result i32))) - (import "" "close-writable" (func $close-writable)) + (import "" "drop-writable" (func $drop-writable)) - (func $close-readable-future-before-read (export "close-readable-future-before-read") (result i32) + (func $drop-readable-future-before-read (export "drop-readable-future-before-read") (result i32) ;; call 'start-future' to get the future we'll be working with (local $fr i32) (local.set $fr (call $start-future)) (if (i32.ne (i32.const 1) (local.get $fr)) (then unreachable)) - ;; ok to immediately close the readable end - (call $future.close-readable (local.get $fr)) + ;; ok to immediately drop the readable end + (call $future.drop-readable (local.get $fr)) - ;; the callee will see that we closed the readable end when it tries to write + ;; the callee will see that we dropped the readable end when it tries to write (call $attempt-write) ) - (func $close-writable-future-before-write (export "close-writable-future-before-write") + (func $drop-writable-future-before-write (export "drop-writable-future-before-write") ;; call 'start-future' to get the future we'll be working with (local $fr i32) (local.set $fr (call $start-future)) @@ -91,33 +88,31 @@ (then unreachable)) ;; boom - (call $close-writable) + (call $drop-writable) ) ) (type $FT (future u8)) (canon future.new $FT (core func $future.new)) - (canon future.read $FT async (memory $memory "mem") (core func $future.read)) - (canon future.close-readable $FT (core func $future.close-readable)) + (canon future.drop-readable $FT (core func $future.drop-readable)) (canon lower (func $c "start-future") (core func $start-future')) (canon lower (func $c "attempt-write") (core func $attempt-write')) - (canon lower (func $c "close-writable") (core func $close-writable')) + (canon lower (func $c "drop-writable") (core func $drop-writable')) (core instance $core (instantiate $Core (with "" (instance (export "mem" (memory $memory "mem")) (export "future.new" (func $future.new)) - (export "future.read" (func $future.read)) - (export "future.close-readable" (func $future.close-readable)) + (export "future.drop-readable" (func $future.drop-readable)) (export "start-future" (func $start-future')) (export "attempt-write" (func $attempt-write')) - (export "close-writable" (func $close-writable')) + (export "drop-writable" (func $drop-writable')) )))) - (func (export "close-readable-future-before-read") (result u32) (canon lift (core func $core "close-readable-future-before-read"))) - (func (export "close-writable-future-before-write") (canon lift (core func $core "close-writable-future-before-write"))) + (func (export "drop-readable-future-before-read") (result u32) (canon lift (core func $core "drop-readable-future-before-read"))) + (func (export "drop-writable-future-before-write") (canon lift (core func $core "drop-writable-future-before-write"))) ) (instance $c (instantiate $C)) (instance $d (instantiate $D (with "c" (instance $c)))) - (func (export "close-writable-future-before-write") (alias export $d "close-writable-future-before-write")) - (func (export "close-readable-future-before-read") (alias export $d "close-readable-future-before-read")) + (func (export "drop-writable-future-before-write") (alias export $d "drop-writable-future-before-write")) + (func (export "drop-readable-future-before-read") (alias export $d "drop-readable-future-before-read")) ) -(assert_return (invoke "close-readable-future-before-read") (u32.const 42)) -(assert_trap (invoke "close-writable-future-before-write") "cannot close future write end without first writing a value") +(assert_return (invoke "drop-readable-future-before-read") (u32.const 42)) +(assert_trap (invoke "drop-writable-future-before-write") "cannot drop future write end without first writing a value") diff --git a/test/async/partial-stream-copies.wast b/test/async/partial-stream-copies.wast index 0526c36f..6ca89ab6 100644 --- a/test/async/partial-stream-copies.wast +++ b/test/async/partial-stream-copies.wast @@ -16,8 +16,8 @@ (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) - (import "" "stream.close-readable" (func $stream.close-readable (param i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) ;; $ws is waited on by 'transform' (global $ws (mut i32) (i32.const 0)) @@ -96,8 +96,8 @@ (if (i32.ne (i32.const 0x7654) (i32.load16_u offset=4 (global.get $inbufp))) (then unreachable)) - (call $stream.close-readable (global.get $insr)) - (call $stream.close-writable (global.get $outsw)) + (call $stream.drop-readable (global.get $insr)) + (call $stream.drop-writable (global.get $outsw)) (return (i32.const 0 (; EXIT ;))) ) ) @@ -108,8 +108,8 @@ (canon stream.new $ST (core func $stream.new)) (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) - (canon stream.close-readable $ST (core func $stream.close-readable)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (core instance $cm (instantiate $CM (with "" (instance (export "mem" (memory $memory "mem")) (export "task.return" (func $task.return)) @@ -118,8 +118,8 @@ (export "stream.new" (func $stream.new)) (export "stream.read" (func $stream.read)) (export "stream.write" (func $stream.write)) - (export "stream.close-readable" (func $stream.close-readable)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-readable" (func $stream.drop-readable)) + (export "stream.drop-writable" (func $stream.drop-writable)) )))) (func (export "transform") (param "in" (stream u8)) (result (stream u8)) (canon lift (core func $cm "transform") @@ -140,8 +140,8 @@ (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) - (import "" "stream.close-readable" (func $stream.close-readable (param i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) (import "" "transform" (func $transform (param i32 i32) (result i32))) (func $run (export "run") (result i32) @@ -184,23 +184,23 @@ (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) (then unreachable)) - ;; wait for transform to read our write and close all the streams + ;; wait for transform to read our write and drop all the streams (local.set $ws (call $waitable-set.new)) (call $waitable.join (local.get $insw) (local.get $ws)) (local.set $event_code (call $waitable-set.wait (local.get $ws) (i32.const 0))) (local.set $index (i32.load (i32.const 0))) (local.set $payload (i32.load (i32.const 4))) - ;; confirm the write and the closed stream + ;; confirm the write and the dropped stream (if (i32.ne (local.get $event_code) (i32.const 3 (; STREAM_WRITE ;))) (then unreachable)) (if (i32.ne (local.get $index) (local.get $insw)) (then unreachable)) - (if (i32.ne (local.get $payload) (i32.const 0xc1 (; CLOSED=1 | (12 << 4) ;))) + (if (i32.ne (local.get $payload) (i32.const 0xc1 (; DROPPED=1 | (12 << 4) ;))) (then unreachable)) - (call $stream.close-writable (local.get $insw)) - (call $stream.close-readable (local.get $outsr)) + (call $stream.drop-writable (local.get $insw)) + (call $stream.drop-readable (local.get $outsr)) ;; return 42 to the top-level test harness (i32.const 42) @@ -213,8 +213,8 @@ (canon stream.new $ST (core func $stream.new)) (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) - (canon stream.close-readable $ST (core func $stream.close-readable)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (canon lower (func $transform) async (memory $memory "mem") (core func $transform')) (core instance $dm (instantiate $DM (with "" (instance (export "mem" (memory $memory "mem")) @@ -224,8 +224,8 @@ (export "stream.new" (func $stream.new)) (export "stream.read" (func $stream.read)) (export "stream.write" (func $stream.write)) - (export "stream.close-readable" (func $stream.close-readable)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-readable" (func $stream.drop-readable)) + (export "stream.drop-writable" (func $stream.drop-writable)) (export "transform" (func $transform')) )))) (func (export "run") (result u32) (canon lift (core func $dm "run"))) diff --git a/test/async/passing-resources.wast b/test/async/passing-resources.wast index d6642741..70a9e4a0 100644 --- a/test/async/passing-resources.wast +++ b/test/async/passing-resources.wast @@ -15,7 +15,7 @@ (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) (import "" "stream.cancel-write" (func $stream.cancel-write (param i32) (result i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) (global $ws (mut i32) (i32.const 0)) (global $res1 (mut i32) (i32.const 0)) @@ -55,14 +55,14 @@ ;; cancel the write, confirming that the first element was transferred (local.set $ret (call $stream.cancel-write (global.get $ws))) - (if (i32.ne (i32.const 0x11 (; CLOSED=1 | (1 << 4) ;)) (local.get $ret)) + (if (i32.ne (i32.const 0x11 (; DROPPED=1 | (1 << 4) ;)) (local.get $ret)) (then unreachable)) ;; we still own $res2 (if (i32.ne (i32.const 51) (call $resource.rep (global.get $res2))) (then unreachable)) - (call $stream.close-writable (global.get $ws)) + (call $stream.drop-writable (global.get $ws)) ) (func $R.foo (export "R.foo") (param $rep i32) (result i32) (i32.add (local.get $rep) (i32.const 50)) @@ -80,7 +80,7 @@ (canon stream.new $ST (core func $stream.new)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) (canon stream.cancel-write $ST (core func $stream.cancel-write)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (core instance $core (instantiate $Core (with "" (instance (export "mem" (memory $memory "mem")) (export "resource.new" (func $resource.new)) @@ -88,7 +88,7 @@ (export "stream.new" (func $stream.new)) (export "stream.write" (func $stream.write)) (export "stream.cancel-write" (func $stream.cancel-write)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-writable" (func $stream.drop-writable)) )))) (export $R' "R" (type $R)) (func (export "[method]R.foo") (param "self" (borrow $R')) (result u32) (canon lift (core func $core "R.foo"))) @@ -110,7 +110,7 @@ (core module $Core (import "" "mem" (memory 1)) (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) - (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) (import "" "R.foo" (func $R.foo (param i32) (result i32))) (import "" "start-stream" (func $start-stream (result i32))) (import "" "cancel-write" (func $cancel-write)) @@ -140,8 +140,8 @@ (if (i32.ne (i32.const 100) (local.get $ret)) (then unreachable)) - ;; close the stream and then let $C run and assert stuff - (call $stream.close-readable (local.get $rs)) + ;; drop the stream and then let $C run and assert stuff + (call $stream.drop-readable (local.get $rs)) (call $cancel-write) (i32.const 42) @@ -150,14 +150,14 @@ (alias export $producer "R" (type $R)) (type $ST (stream (own $R))) (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) - (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) (canon lower (func $producer "[method]R.foo") (core func $R.foo')) (canon lower (func $producer "start-stream") (core func $start-stream')) (canon lower (func $producer "cancel-write") (core func $cancel-write')) (core instance $core (instantiate $Core (with "" (instance (export "mem" (memory $memory "mem")) (export "stream.read" (func $stream.read)) - (export "stream.close-readable" (func $stream.close-readable)) + (export "stream.drop-readable" (func $stream.drop-readable)) (export "R.foo" (func $R.foo')) (export "start-stream" (func $start-stream')) (export "cancel-write" (func $cancel-write')) diff --git a/test/async/trap-if-done.wast b/test/async/trap-if-done.wast new file mode 100644 index 00000000..92bf2831 --- /dev/null +++ b/test/async/trap-if-done.wast @@ -0,0 +1,468 @@ +;; This test has two components $C and $D, where $D imports and calls $C. +;; $C contains utility functions used by $D to create futures/streams, +;; write to them and close them. $D uses these utility functions to test for +;; all the cases where, once a future/stream is "done", further uses of the +;; future/stream trap. +;; +;; $D exports a list of functions, one for each case of trapping. Since traps +;; take out their containing instance, a fresh instance of $Tester is created +;; for each call to a $D export. +;; +;; When testing traps involving the readable end, the exports of $D take a +;; "bool" parameter that toggles whether the trap is triggered by +;; {stream,future}.{read,write} or by lifting, and the top-level commands +;; pass 'false' and 'true'. +(component definition $Tester + (component $C + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $CM + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.new" (func $future.new (result i64))) + (import "" "future.write" (func $future.write (param i32 i32) (result i32))) + (import "" "future.drop-writable" (func $future.drop-writable (param i32))) + (import "" "stream.new" (func $stream.new (result i64))) + (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) + + (global $writable-end (mut i32) (i32.const 0)) + (global $ws (mut i32) (i32.const 0)) + + (func $start (global.set $ws (call $waitable-set.new))) + (start $start) + + (func $start-future (export "start-future") (result i32) + ;; create a new future, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $future.new)) + (global.set $writable-end (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (call $waitable.join (global.get $writable-end) (global.get $ws) ) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $future-write (export "future-write") (result i32) + ;; the caller will assert what they expect the return value to be + (i32.store (i32.const 16) (i32.const 42)) + (call $future.write (global.get $writable-end) (i32.const 16)) + ) + (func $acknowledge-future-write (export "acknowledge-future-write") + ;; confirm we got a FUTURE_WRITE $writable-end COMPLETED event + (local $ret i32) + (local.set $ret (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 5 (; FUTURE_WRITE ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (global.get $writable-end) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (i32.load (i32.const 4))) + (then unreachable)) + ) + (func $future-drop-writable (export "future-drop-writable") + ;; maybe boom + (call $future.drop-writable (global.get $writable-end)) + ) + + (func $start-stream (export "start-stream") (result i32) + ;; create a new stream, return the readable end to the caller + (local $ret64 i64) + (local.set $ret64 (call $stream.new)) + (global.set $writable-end (i32.wrap_i64 (i64.shr_u (local.get $ret64) (i64.const 32)))) + (call $waitable.join (global.get $writable-end) (global.get $ws) ) + (i32.wrap_i64 (local.get $ret64)) + ) + (func $stream-write (export "stream-write") (result i32) + ;; the caller will assert what they expect the return value to be + (i32.store (i32.const 16) (i32.const 42)) + (call $stream.write (global.get $writable-end) (i32.const 16) (i32.const 1)) + ) + (func $acknowledge-stream-write (export "acknowledge-stream-write") + ;; confirm we got a STREAM_WRITE $writable-end COMPLETED event + (local $ret i32) + (local.set $ret (call $waitable-set.wait (global.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 3 (; STREAM_WRITE ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (global.get $writable-end) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0x11 (; DROPPED=1 | (1<<4) ;)) (i32.load (i32.const 4))) + (then unreachable)) + ) + (func $stream-drop-writable (export "stream-drop-writable") + ;; maybe boom + (call $stream.drop-writable (global.get $writable-end)) + ) + ) + (type $FT (future u8)) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon future.new $FT (core func $future.new)) + (canon future.write $FT async (memory $memory "mem") (core func $future.write)) + (canon future.drop-writable $FT (core func $future.drop-writable)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) + (core instance $cm (instantiate $CM (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "future.new" (func $future.new)) + (export "future.write" (func $future.write)) + (export "future.drop-writable" (func $future.drop-writable)) + (export "stream.new" (func $stream.new)) + (export "stream.write" (func $stream.write)) + (export "stream.drop-writable" (func $stream.drop-writable)) + )))) + (func (export "start-future") (result (future u8)) (canon lift (core func $cm "start-future"))) + (func (export "future-write") (result u32) (canon lift (core func $cm "future-write"))) + (func (export "acknowledge-future-write") (canon lift (core func $cm "acknowledge-future-write"))) + (func (export "future-drop-writable") (canon lift (core func $cm "future-drop-writable"))) + (func (export "start-stream") (result (stream u8)) (canon lift (core func $cm "start-stream"))) + (func (export "stream-write") (result u32) (canon lift (core func $cm "stream-write"))) + (func (export "acknowledge-stream-write") (canon lift (core func $cm "acknowledge-stream-write"))) + (func (export "stream-drop-writable") (canon lift (core func $cm "stream-drop-writable"))) + ) + (component $D + (import "c" (instance $c + (export "start-future" (func (result (future u8)))) + (export "future-write" (func (result u32))) + (export "acknowledge-future-write" (func)) + (export "future-drop-writable" (func)) + (export "start-stream" (func (result (stream u8)))) + (export "stream-write" (func (result u32))) + (export "acknowledge-stream-write" (func)) + (export "stream-drop-writable" (func)) + )) + + (core module $Memory (memory (export "mem") 1)) + (core instance $memory (instantiate $Memory)) + (core module $Core + (import "" "mem" (memory 1)) + (import "" "waitable.join" (func $waitable.join (param i32 i32))) + (import "" "waitable-set.new" (func $waitable-set.new (result i32))) + (import "" "waitable-set.wait" (func $waitable-set.wait (param i32 i32) (result i32))) + (import "" "future.read" (func $future.read (param i32 i32) (result i32))) + (import "" "future.drop-readable" (func $future.drop-readable (param i32))) + (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) + (import "" "start-future" (func $start-future (result i32))) + (import "" "future-write" (func $future-write (result i32))) + (import "" "acknowledge-future-write" (func $acknowledge-future-write)) + (import "" "future-drop-writable" (func $future-drop-writable)) + (import "" "start-stream" (func $start-stream (result i32))) + (import "" "stream-write" (func $stream-write (result i32))) + (import "" "acknowledge-stream-write" (func $acknowledge-stream-write)) + (import "" "stream-drop-writable" (func $stream-drop-writable)) + + (func $trap-after-future-eager-write (export "trap-after-future-eager-write") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; start a read on our end so the next write will succeed + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; calling future.write in $C should succeed eagerly + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; calling future.write in $C now should trap + (drop (call $future-write)) + ) + (func $trap-after-future-async-write (export "trap-after-future-async-write") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; calling future.write in $C should block + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our future.read should then succeed eagerly + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; let $C see the write completed so the future is 'done' + (call $acknowledge-future-write) + + ;; trying to call future.write again in $C should trap + (drop (call $future-write)) + ) + (func $trap-after-future-reader-dropped (export "trap-after-future-reader-dropped") + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; drop our readable end before writer can write + (call $future.drop-readable (local.get $fr)) + + ;; let $C try to future.write and find out we DROPPED + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 1 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) + + ;; trying to call future.write again in $C should trap + (drop (call $future-write)) + ) + (func $trap-after-future-eager-read (export "trap-after-future-eager-read") (param $bool i32) (result i32) + (local $ret i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; calling future.write in $C should block + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our future.read should then succeed eagerly + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling future.read again should then trap + (drop (call $future.read (local.get $fr) (i32.const 16))) + ) (else + ;; lifting the future by returning it should also trap + (return (local.get $fr)) + )) + unreachable + ) + (func $trap-after-future-async-read (export "trap-after-future-async-read") (param $bool i32) (result i32) + (local $ret i32) (local $ws i32) + (local $fr i32) + (local.set $fr (call $start-future)) + + ;; read first, so it blocks + (local.set $ret (call $future.read (local.get $fr) (i32.const 16))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; calling future.write in $C should then succeed eagerly + (local.set $ret (call $future-write)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; wait to see that our blocked future.read COMPLETED, producing '42' + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $fr) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 4 (; FUTURE_READ ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $fr) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0 (; COMPLETED ;)) (i32.load (i32.const 4))) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load (i32.const 16))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling future.read again should then trap + (drop (call $future.read (local.get $fr) (i32.const 16))) + ) (else + ;; lifting the future by returning it should also trap + (return (local.get $fr)) + )) + unreachable + ) + (func $trap-after-stream-reader-eager-dropped (export "trap-after-stream-reader-eager-dropped") + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; drop our readable end before writer can write + (call $stream.drop-readable (local.get $sr)) + + ;; let $C try to stream.write and find out we DROPPED + (local.set $ret (call $stream-write)) + (if (i32.ne (i32.const 1 (; DROPPED ;)) (local.get $ret)) + (then unreachable)) + + ;; trying to call stream.write again in $C should trap + (drop (call $stream-write)) + ) + (func $trap-after-stream-reader-async-dropped (export "trap-after-stream-reader-async-dropped") + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; calling stream.write in $C should block + (local.set $ret (call $stream-write)) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; our stream.read should then succeed eagerly + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const 0x10 (; COMPLETED=0 | (1<<4) ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (i32.const 42) (i32.load8_u (i32.const 16))) + (then unreachable)) + + ;; then drop our readable end + (call $stream.drop-readable (local.get $sr)) + + ;; let $C see that it's stream.write COMPLETED and wrote 1 elem + (call $acknowledge-stream-write) + + ;; now calling stream.write again in $C will trap + (drop (call $stream-write)) + ) + (func $trap-after-stream-writer-eager-dropped (export "trap-after-stream-writer-eager-dropped") (param $bool i32) (result i32) + (local $ret i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; immediately drop the writable end + (call $stream-drop-writable) + + ;; calling stream.read will see that the writer dropped + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const 0x01 (; DROPPED=1 | (0<<4) ;)) (local.get $ret)) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling stream.read again should then trap + (drop (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + ) (else + ;; lifting the stream by returning it should also trap + (return (local.get $sr)) + )) + unreachable + ) + (func $trap-after-stream-writer-async-dropped (export "trap-after-stream-writer-async-dropped") (param $bool i32) (result i32) + (local $ret i32) (local $ws i32) + (local $sr i32) + (local.set $sr (call $start-stream)) + + ;; start a read on our end first which will block + (local.set $ret (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + (if (i32.ne (i32.const -1 (; BLOCKED ;)) (local.get $ret)) + (then unreachable)) + + ;; drop the writable end before writing anything + (call $stream-drop-writable) + + ;; wait to see that our blocked stream.read was DROPPED + (local.set $ws (call $waitable-set.new)) + (call $waitable.join (local.get $sr) (local.get $ws)) + (local.set $ret (call $waitable-set.wait (local.get $ws) (i32.const 0))) + (if (i32.ne (i32.const 2 (; STREAM_READ ;)) (local.get $ret)) + (then unreachable)) + (if (i32.ne (local.get $sr) (i32.load (i32.const 0))) + (then unreachable)) + (if (i32.ne (i32.const 0x01 (; DROPPED=1 | (0<<4) ;)) (i32.load (i32.const 4))) + (then unreachable)) + + (if (i32.eqz (local.get $bool)) (then + ;; calling stream.read again should then trap + (drop (call $stream.read (local.get $sr) (i32.const 16) (i32.const 100))) + ) (else + ;; lifting the stream by returning it should also trap + (return (local.get $sr)) + )) + unreachable + ) + ) + (type $FT (future u8)) + (type $ST (stream u8)) + (canon waitable.join (core func $waitable.join)) + (canon waitable-set.new (core func $waitable-set.new)) + (canon waitable-set.wait (memory $memory "mem") (core func $waitable-set.wait)) + (canon future.new $FT (core func $future.new)) + (canon future.read $FT async (memory $memory "mem") (core func $future.read)) + (canon future.drop-readable $FT (core func $future.drop-readable)) + (canon stream.new $ST (core func $stream.new)) + (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) + (canon lower (func $c "start-future") (core func $start-future')) + (canon lower (func $c "future-write") (core func $future-write')) + (canon lower (func $c "acknowledge-future-write") (core func $acknowledge-future-write')) + (canon lower (func $c "future-drop-writable") (core func $future-drop-writable')) + (canon lower (func $c "start-stream") (core func $start-stream')) + (canon lower (func $c "stream-write") (core func $stream-write')) + (canon lower (func $c "acknowledge-stream-write") (core func $acknowledge-stream-write')) + (canon lower (func $c "stream-drop-writable") (core func $stream-drop-writable')) + (core instance $core (instantiate $Core (with "" (instance + (export "mem" (memory $memory "mem")) + (export "waitable.join" (func $waitable.join)) + (export "waitable-set.new" (func $waitable-set.new)) + (export "waitable-set.wait" (func $waitable-set.wait)) + (export "future.new" (func $future.new)) + (export "future.read" (func $future.read)) + (export "future.drop-readable" (func $future.drop-readable)) + (export "stream.new" (func $stream.new)) + (export "stream.read" (func $stream.read)) + (export "stream.drop-readable" (func $stream.drop-readable)) + (export "start-future" (func $start-future')) + (export "future-write" (func $future-write')) + (export "acknowledge-future-write" (func $acknowledge-future-write')) + (export "future-drop-writable" (func $future-drop-writable')) + (export "start-stream" (func $start-stream')) + (export "stream-write" (func $stream-write')) + (export "acknowledge-stream-write" (func $acknowledge-stream-write')) + (export "stream-drop-writable" (func $stream-drop-writable')) + )))) + (func (export "trap-after-future-eager-write") (canon lift (core func $core "trap-after-future-eager-write"))) + (func (export "trap-after-future-async-write") (canon lift (core func $core "trap-after-future-async-write"))) + (func (export "trap-after-future-reader-dropped") (canon lift (core func $core "trap-after-future-reader-dropped"))) + (func (export "trap-after-future-eager-read") (param "bool" bool) (result $FT) (canon lift (core func $core "trap-after-future-eager-read"))) + (func (export "trap-after-future-async-read") (param "bool" bool) (result $FT) (canon lift (core func $core "trap-after-future-async-read"))) + (func (export "trap-after-stream-reader-eager-dropped") (canon lift (core func $core "trap-after-stream-reader-eager-dropped"))) + (func (export "trap-after-stream-reader-async-dropped") (canon lift (core func $core "trap-after-stream-reader-async-dropped"))) + (func (export "trap-after-stream-writer-eager-dropped") (param "bool" bool) (result $ST) (canon lift (core func $core "trap-after-stream-writer-eager-dropped"))) + (func (export "trap-after-stream-writer-async-dropped") (param "bool" bool) (result $ST) (canon lift (core func $core "trap-after-stream-writer-async-dropped"))) + ) + (instance $c (instantiate $C)) + (instance $d (instantiate $D (with "c" (instance $c)))) + (func (export "trap-after-future-eager-write") (alias export $d "trap-after-future-eager-write")) + (func (export "trap-after-future-async-write") (alias export $d "trap-after-future-async-write")) + (func (export "trap-after-future-reader-dropped") (alias export $d "trap-after-future-reader-dropped")) + (func (export "trap-after-future-eager-read") (alias export $d "trap-after-future-eager-read")) + (func (export "trap-after-future-async-read") (alias export $d "trap-after-future-async-read")) + (func (export "trap-after-stream-reader-eager-dropped") (alias export $d "trap-after-stream-reader-eager-dropped")) + (func (export "trap-after-stream-reader-async-dropped") (alias export $d "trap-after-stream-reader-async-dropped")) + (func (export "trap-after-stream-writer-eager-dropped") (alias export $d "trap-after-stream-writer-eager-dropped")) + (func (export "trap-after-stream-writer-async-dropped") (alias export $d "trap-after-stream-writer-async-dropped")) +) + +(component instance $i1 $Tester) +(assert_trap (invoke "trap-after-future-eager-write") "cannot write to future after previous write succeeded") +(component instance $i2 $Tester) +(assert_trap (invoke "trap-after-future-async-write") "cannot write to future after previous write succeeded") +(component instance $i3 $Tester) +(assert_trap (invoke "trap-after-future-reader-dropped") "cannot write to future after being notified that the readable end dropped") +(component instance $i4.1 $Tester) +(assert_trap (invoke "trap-after-future-eager-read" (bool.const false)) "cannot read from future after previous read succeeded") +(component instance $i4.2 $Tester) +(assert_trap (invoke "trap-after-future-eager-read" (bool.const true)) "cannot lift future after previous read succeeded") +(component instance $i5.1 $Tester) +(assert_trap (invoke "trap-after-future-async-read" (bool.const false)) "cannot read from future after previous read succeeded") +(component instance $i5.2 $Tester) +(assert_trap (invoke "trap-after-future-async-read" (bool.const true)) "cannot lift future after previous read succeeded") +(component instance $i6 $Tester) +(assert_trap (invoke "trap-after-stream-reader-eager-dropped") "cannot write to stream after being notified that the readable end dropped") +(component instance $i7 $Tester) +(assert_trap (invoke "trap-after-stream-reader-async-dropped") "cannot write to stream after being notified that the readable end dropped") +(component instance $i8.1 $Tester) +(assert_trap (invoke "trap-after-stream-writer-eager-dropped" (bool.const false)) "cannot read from stream after being notified that the writable end dropped") +(component instance $i8.2 $Tester) +(assert_trap (invoke "trap-after-stream-writer-eager-dropped" (bool.const true)) "cannot lift stream after being notified that the writable end dropped") +(component instance $i9.1 $Tester) +(assert_trap (invoke "trap-after-stream-writer-async-dropped" (bool.const false)) "cannot read from stream after being notified that the writable end dropped") +(component instance $i9.2 $Tester) +(assert_trap (invoke "trap-after-stream-writer-async-dropped" (bool.const true)) "cannot lift stream after being notified that the writable end dropped") diff --git a/test/async/zero-length.wast b/test/async/zero-length.wast index 0170581b..a75978f4 100644 --- a/test/async/zero-length.wast +++ b/test/async/zero-length.wast @@ -15,7 +15,7 @@ (import "" "waitable-set.new" (func $waitable-set.new (result i32))) (import "" "stream.new" (func $stream.new (result i64))) (import "" "stream.write" (func $stream.write (param i32 i32 i32) (result i32))) - (import "" "stream.close-writable" (func $stream.close-writable (param i32))) + (import "" "stream.drop-writable" (func $stream.drop-writable (param i32))) ;; $ws is waited on by 'produce' (global $ws (mut i32) (i32.const 0)) @@ -78,10 +78,10 @@ ;; the second call to produce_cb: (if (i32.eq (global.get $state) (i32.const 1)) (then ;; confirm we're seeing the non-zero-length write complete - (if (i32.ne (local.get $payload) (i32.const 0x41 (; CLOSED=1 | (4 << 4) ;))) + (if (i32.ne (local.get $payload) (i32.const 0x41 (; DROPPED=1 | (4 << 4) ;))) (then unreachable)) - (call $stream.close-writable (global.get $outsw)) + (call $stream.drop-writable (global.get $outsw)) (return (i32.const 0 (; EXIT ;))) )) @@ -94,7 +94,7 @@ (canon waitable-set.new (core func $waitable-set.new)) (canon stream.new $ST (core func $stream.new)) (canon stream.write $ST async (memory $memory "mem") (core func $stream.write)) - (canon stream.close-writable $ST (core func $stream.close-writable)) + (canon stream.drop-writable $ST (core func $stream.drop-writable)) (core instance $core_producer (instantiate $CoreProducer (with "" (instance (export "mem" (memory $memory "mem")) (export "task.return" (func $task.return)) @@ -102,7 +102,7 @@ (export "waitable-set.new" (func $waitable-set.new)) (export "stream.new" (func $stream.new)) (export "stream.write" (func $stream.write)) - (export "stream.close-writable" (func $stream.close-writable)) + (export "stream.drop-writable" (func $stream.drop-writable)) )))) (func (export "produce") (result (stream u8)) (canon lift (core func $core_producer "produce") @@ -119,7 +119,7 @@ (import "" "waitable.join" (func $waitable.join (param i32 i32))) (import "" "waitable-set.new" (func $waitable-set.new (result i32))) (import "" "stream.read" (func $stream.read (param i32 i32 i32) (result i32))) - (import "" "stream.close-readable" (func $stream.close-readable (param i32))) + (import "" "stream.drop-readable" (func $stream.drop-readable (param i32))) ;; $ws is waited on by 'consume' (global $ws (mut i32) (i32.const 0)) @@ -163,7 +163,7 @@ (if (i32.ne (i32.const 0x12345678) (local.get $ret)) (then unreachable)) - (call $stream.close-readable (global.get $insr)) + (call $stream.drop-readable (global.get $insr)) ;; return 42 to the top-level assert_return (call $task.return (i32.const 42)) @@ -175,14 +175,14 @@ (canon waitable.join (core func $waitable.join)) (canon waitable-set.new (core func $waitable-set.new)) (canon stream.read $ST async (memory $memory "mem") (core func $stream.read)) - (canon stream.close-readable $ST (core func $stream.close-readable)) + (canon stream.drop-readable $ST (core func $stream.drop-readable)) (core instance $core_consumer (instantiate $CoreConsumer (with "" (instance (export "mem" (memory $memory "mem")) (export "task.return" (func $task.return)) (export "waitable.join" (func $waitable.join)) (export "waitable-set.new" (func $waitable-set.new)) (export "stream.read" (func $stream.read)) - (export "stream.close-readable" (func $stream.close-readable)) + (export "stream.drop-readable" (func $stream.drop-readable)) )))) (func (export "consume") (param "in" (stream u8)) (result u32) (canon lift (core func $core_consumer "consume")