From 52b5b1047f2be8a7abf7887795dd5d32b8dab49a Mon Sep 17 00:00:00 2001 From: Daniel Olano Date: Sun, 20 Dec 2020 20:11:12 +0100 Subject: [PATCH 1/2] Add stream module to futures crate The stream module exposes the JsStream type used to convert JS objects implementing the AsyncIterator interface to be used as Rust streams. --- crates/futures/Cargo.toml | 2 + crates/futures/src/lib.rs | 1 + crates/futures/src/stream.rs | 81 +++++++++++++++++++++++++++++++++++ crates/futures/tests/tests.rs | 23 +++++++++- 4 files changed, 105 insertions(+), 2 deletions(-) create mode 100644 crates/futures/src/stream.rs diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 53dc829105a..28b1cd1d253 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -14,6 +14,7 @@ edition = "2018" cfg-if = "1.0.0" js-sys = { path = "../js-sys", version = '0.3.46' } wasm-bindgen = { path = "../..", version = '0.2.69' } +futures-core = { version = '0.3.8', default-features = false } [target.'cfg(target_feature = "atomics")'.dependencies.web-sys] path = "../web-sys" @@ -26,3 +27,4 @@ features = [ [target.'cfg(target_arch = "wasm32")'.dev-dependencies] wasm-bindgen-test = { path = '../test', version = '0.3.19' } futures-channel-preview = { version = "0.3.0-alpha.18" } +futures-lite = { version = "1.11.3", default-features = false } diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 8447b25abd1..26ee20dc9c2 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -43,6 +43,7 @@ use std::task::{Context, Poll, Waker}; use wasm_bindgen::prelude::*; mod queue; +pub mod stream; mod task { use cfg_if::cfg_if; diff --git a/crates/futures/src/stream.rs b/crates/futures/src/stream.rs new file mode 100644 index 00000000000..208120c3106 --- /dev/null +++ b/crates/futures/src/stream.rs @@ -0,0 +1,81 @@ +//! Converting JavaScript `AsyncIterator`s to Rust `Stream`s. +//! +//! Analogous to the promise to future convertion, this module allows the +//! turing objects implementing the async iterator protocol into `Stream`s +//! that produce values that can be awaited from. +//! + +use crate::JsFuture; +use core::future::Future; +use core::pin::Pin; +use core::task::{Context, Poll}; +use futures_core::stream::Stream; +use js_sys::{AsyncIterator, IteratorNext}; +use wasm_bindgen::{prelude::*, JsCast}; + +/// A `Stream` that yields values from an underlying `AsyncIterator`. +pub struct JsStream { + iter: AsyncIterator, + next: Option, + done: bool, +} + +impl JsStream { + fn next_future(&self) -> Result { + self.iter.next().map(JsFuture::from) + } +} + +impl From for JsStream { + fn from(iter: AsyncIterator) -> Self { + JsStream { + iter, + next: None, + done: false, + } + } +} + +impl Stream for JsStream { + type Item = Result; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + if self.done { + return Poll::Ready(None); + } + + let future = match self.next.as_mut() { + Some(val) => val, + None => match self.next_future() { + Ok(val) => { + self.next = Some(val); + self.next.as_mut().unwrap() + } + Err(e) => { + self.done = true; + return Poll::Ready(Some(Err(e))); + } + }, + }; + + match Pin::new(future).poll(cx) { + Poll::Ready(res) => match res { + Ok(iter_next) => { + let next = iter_next.unchecked_into::(); + if next.done() { + self.done = true; + Poll::Ready(None) + } else { + self.next.take(); + Poll::Ready(Some(Ok(next.value()))) + } + } + Err(e) => { + self.done = true; + Poll::Ready(Some(Err(e))) + } + }, + Poll::Pending => Poll::Pending, + } + } +} diff --git a/crates/futures/tests/tests.rs b/crates/futures/tests/tests.rs index f53866fb154..ad95736722f 100644 --- a/crates/futures/tests/tests.rs +++ b/crates/futures/tests/tests.rs @@ -3,8 +3,8 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); use futures_channel::oneshot; -use wasm_bindgen::prelude::*; -use wasm_bindgen_futures::{future_to_promise, spawn_local, JsFuture}; +use wasm_bindgen::{prelude::*, JsCast}; +use wasm_bindgen_futures::{future_to_promise, spawn_local, stream::JsStream, JsFuture}; use wasm_bindgen_test::*; #[wasm_bindgen_test] @@ -88,3 +88,22 @@ async fn can_create_multiple_futures_from_same_promise() { a.await.unwrap(); b.await.unwrap(); } + +#[wasm_bindgen_test] +async fn can_use_an_async_iterable_as_stream() { + use futures_lite::stream::StreamExt; + let async_iter = js_sys::Function::new_no_args( + "return async function*() { + yield 42; + yield 24; + }()", + ) + .call0(&JsValue::undefined()) + .unwrap() + .unchecked_into::(); + + let mut stream = JsStream::from(async_iter); + assert_eq!(stream.next().await, Some(Ok(JsValue::from(42)))); + assert_eq!(stream.next().await, Some(Ok(JsValue::from(24)))); + assert_eq!(stream.next().await, None); +} From a8edfb117c79654773cf3d9b4da3e4a01b9884ab Mon Sep 17 00:00:00 2001 From: Daniel Olano Date: Mon, 4 Jan 2021 18:34:49 +0100 Subject: [PATCH 2/2] Add stream feature to futures crate --- crates/futures/Cargo.toml | 5 ++++- crates/futures/README.md | 3 +++ crates/futures/src/lib.rs | 1 + crates/futures/tests/tests.rs | 8 ++++++-- 4 files changed, 14 insertions(+), 3 deletions(-) diff --git a/crates/futures/Cargo.toml b/crates/futures/Cargo.toml index 28b1cd1d253..7945d5e747a 100644 --- a/crates/futures/Cargo.toml +++ b/crates/futures/Cargo.toml @@ -14,7 +14,10 @@ edition = "2018" cfg-if = "1.0.0" js-sys = { path = "../js-sys", version = '0.3.46' } wasm-bindgen = { path = "../..", version = '0.2.69' } -futures-core = { version = '0.3.8', default-features = false } +futures-core = { version = '0.3.8', default-features = false, optional = true } + +[features] +futures-core-03-stream = ['futures-core'] [target.'cfg(target_feature = "atomics")'.dependencies.web-sys] path = "../web-sys" diff --git a/crates/futures/README.md b/crates/futures/README.md index 9f6518fef81..eb0a067f3ca 100644 --- a/crates/futures/README.md +++ b/crates/futures/README.md @@ -8,6 +8,9 @@ This crate bridges the gap between a Rust `Future` and a JavaScript 1. From a JavaScript `Promise` into a Rust `Future`. 2. From a Rust `Future` into a JavaScript `Promise`. +Additionally under the feature flag `futures-core-03-stream` there is experimental +support for `AsyncIterator` to `Stream` conversion. + See the [API documentation][docs] for more info. [docs]: https://rustwasm.github.io/wasm-bindgen/api/wasm_bindgen_futures/ diff --git a/crates/futures/src/lib.rs b/crates/futures/src/lib.rs index 26ee20dc9c2..c2a59d76286 100644 --- a/crates/futures/src/lib.rs +++ b/crates/futures/src/lib.rs @@ -43,6 +43,7 @@ use std::task::{Context, Poll, Waker}; use wasm_bindgen::prelude::*; mod queue; +#[cfg(feature = "futures-core-03-stream")] pub mod stream; mod task { diff --git a/crates/futures/tests/tests.rs b/crates/futures/tests/tests.rs index ad95736722f..d02d60c06a5 100644 --- a/crates/futures/tests/tests.rs +++ b/crates/futures/tests/tests.rs @@ -3,8 +3,8 @@ wasm_bindgen_test::wasm_bindgen_test_configure!(run_in_browser); use futures_channel::oneshot; -use wasm_bindgen::{prelude::*, JsCast}; -use wasm_bindgen_futures::{future_to_promise, spawn_local, stream::JsStream, JsFuture}; +use wasm_bindgen::prelude::*; +use wasm_bindgen_futures::{future_to_promise, spawn_local, JsFuture}; use wasm_bindgen_test::*; #[wasm_bindgen_test] @@ -89,9 +89,13 @@ async fn can_create_multiple_futures_from_same_promise() { b.await.unwrap(); } +#[cfg(feature = "futures-core-03-stream")] #[wasm_bindgen_test] async fn can_use_an_async_iterable_as_stream() { use futures_lite::stream::StreamExt; + use wasm_bindgen::JsCast; + use wasm_bindgen_futures::stream::JsStream; + let async_iter = js_sys::Function::new_no_args( "return async function*() { yield 42;