From d2fde7a1dcd605f6a258ec97751559f6e4a80d45 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 16 Sep 2019 08:18:18 +0200 Subject: [PATCH 1/4] join stream works! Signed-off-by: Yoshua Wuyts --- src/join_stream.rs | 36 ++++++++++++++++++++++++++++++++++++ src/lib.rs | 2 ++ 2 files changed, 38 insertions(+) create mode 100644 src/join_stream.rs diff --git a/src/join_stream.rs b/src/join_stream.rs new file mode 100644 index 0000000..64aa8c8 --- /dev/null +++ b/src/join_stream.rs @@ -0,0 +1,36 @@ +use std::marker::PhantomData; +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; + +/// A stream joining two or more streams. +/// +/// This stream is returned by `join!`. +#[derive(Debug)] +pub struct JoinStream<'a, L, R, T> { + left: &'a mut L, + right: &'a mut R, + _marker: PhantomData, +} + +impl Unpin for JoinStream<'_, L, R, T> {} + +impl<'a, L, R, T> Stream for JoinStream<'a, L, R, T> +where + L: Stream + Unpin, + R: Stream + Unpin, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = Pin::new(&mut *self.left).poll_next(cx) { + // The first stream made progress. The JoinStream needs to be polled + // again to check the progress of the second stream. + cx.waker().wake_by_ref(); + Poll::Ready(Some(item)) + } else { + Pin::new(&mut *self.right).poll_next(cx) + } + } +} diff --git a/src/lib.rs b/src/lib.rs index 3a6a8a7..47fd751 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,6 +21,7 @@ #![cfg_attr(test, deny(warnings))] mod join; +mod join_stream; mod maybe_done; mod poll_fn; mod ready; @@ -28,6 +29,7 @@ mod select; mod try_join; mod try_select; +pub use join_stream::JoinStream; pub use maybe_done::MaybeDone; /// Helper re-exports for use in macros. From 02fad0db6984d64a731935337e5b4fa582057af6 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 16 Sep 2019 09:08:22 +0200 Subject: [PATCH 2/4] join_stream done Signed-off-by: Yoshua Wuyts --- examples/main.rs | 5 ++++ src/join_stream.rs | 60 ++++++++++++++++++++++++++++++++++++++++------ src/lib.rs | 1 - 3 files changed, 58 insertions(+), 8 deletions(-) diff --git a/examples/main.rs b/examples/main.rs index 740e83b..31bb74c 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -10,6 +10,11 @@ fn main() { let c = future::ready(Ok(1u8)); assert_eq!(try_select!(a, b, c).await?, 1u8); + + use async_macros::JoinStream; + use futures::stream::{self, StreamExt}; + use futures::future::ready; + Ok(()) } main().await.unwrap(); diff --git a/src/join_stream.rs b/src/join_stream.rs index 64aa8c8..c7892f5 100644 --- a/src/join_stream.rs +++ b/src/join_stream.rs @@ -8,15 +8,26 @@ use futures_core::Stream; /// /// This stream is returned by `join!`. #[derive(Debug)] -pub struct JoinStream<'a, L, R, T> { - left: &'a mut L, - right: &'a mut R, +pub struct JoinStream { + left: L, + right: R, _marker: PhantomData, } -impl Unpin for JoinStream<'_, L, R, T> {} +impl Unpin for JoinStream {} -impl<'a, L, R, T> Stream for JoinStream<'a, L, R, T> +impl JoinStream { + #[doc(hidden)] + pub fn new(left: L, right: R) -> Self { + Self { + left, + right, + _marker: PhantomData, + } + } +} + +impl Stream for JoinStream where L: Stream + Unpin, R: Stream + Unpin, @@ -24,13 +35,48 @@ where type Item = T; fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - if let Poll::Ready(Some(item)) = Pin::new(&mut *self.left).poll_next(cx) { + if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) { // The first stream made progress. The JoinStream needs to be polled // again to check the progress of the second stream. cx.waker().wake_by_ref(); Poll::Ready(Some(item)) } else { - Pin::new(&mut *self.right).poll_next(cx) + Pin::new(&mut self.right).poll_next(cx) } } } + +/// Combines multiple streams into a single stream of all their outputs. +/// +/// This macro is only usable inside of async functions, closures, and blocks. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use async_macros::join_stream as join; +/// use futures::stream::{self, StreamExt}; +/// use futures::future::ready; +/// +/// let a = &mut stream::once(ready(1u8)); +/// let b = &mut stream::once(ready(2u8)); +/// let c = &mut stream::once(ready(3u8)); +/// +/// let mut s = join!(a, b, c); +/// +/// assert_eq!(s.next().await, Some(1u8)); +/// assert_eq!(s.next().await, Some(2u8)); +/// assert_eq!(s.next().await, Some(3u8)); +/// assert_eq!(s.next().await, None); +/// # }); +/// ``` +#[macro_export] +macro_rules! join_stream { + ($stream1:ident, $stream2:ident, $($stream:ident),* $(,)?) => {{ + let joined = $crate::JoinStream::new($stream1, $stream2); + $( + let joined = $crate::JoinStream::new(joined, $stream); + )* + joined + }}; +} diff --git a/src/lib.rs b/src/lib.rs index 47fd751..fb2e6bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ //! # Examples //! //! ``` -//! #![feature(async_await)] //! # futures::executor::block_on(async { //! use async_macros::join; //! use futures::future; From ee3a5d649d80cb09953d47d2c9182581b589dc9d Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 16 Sep 2019 09:20:05 +0200 Subject: [PATCH 3/4] clean up join_stream Signed-off-by: Yoshua Wuyts --- examples/main.rs | 5 ----- src/join_stream.rs | 6 +++--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/examples/main.rs b/examples/main.rs index 31bb74c..740e83b 100644 --- a/examples/main.rs +++ b/examples/main.rs @@ -10,11 +10,6 @@ fn main() { let c = future::ready(Ok(1u8)); assert_eq!(try_select!(a, b, c).await?, 1u8); - - use async_macros::JoinStream; - use futures::stream::{self, StreamExt}; - use futures::future::ready; - Ok(()) } main().await.unwrap(); diff --git a/src/join_stream.rs b/src/join_stream.rs index c7892f5..33faff8 100644 --- a/src/join_stream.rs +++ b/src/join_stream.rs @@ -58,9 +58,9 @@ where /// use futures::stream::{self, StreamExt}; /// use futures::future::ready; /// -/// let a = &mut stream::once(ready(1u8)); -/// let b = &mut stream::once(ready(2u8)); -/// let c = &mut stream::once(ready(3u8)); +/// let a = stream::once(ready(1u8)); +/// let b = stream::once(ready(2u8)); +/// let c = stream::once(ready(3u8)); /// /// let mut s = join!(a, b, c); /// From 0798d2a1075c701ab19ce809982e244a4d94fee8 Mon Sep 17 00:00:00 2001 From: Yoshua Wuyts Date: Mon, 16 Sep 2019 09:42:23 +0200 Subject: [PATCH 4/4] remove phantomdata Signed-off-by: Yoshua Wuyts --- src/join_stream.rs | 16 +++++----------- 1 file changed, 5 insertions(+), 11 deletions(-) diff --git a/src/join_stream.rs b/src/join_stream.rs index 33faff8..1cf25fa 100644 --- a/src/join_stream.rs +++ b/src/join_stream.rs @@ -1,4 +1,3 @@ -use std::marker::PhantomData; use std::pin::Pin; use std::task::{Context, Poll}; @@ -8,26 +7,21 @@ use futures_core::Stream; /// /// This stream is returned by `join!`. #[derive(Debug)] -pub struct JoinStream { +pub struct JoinStream { left: L, right: R, - _marker: PhantomData, } -impl Unpin for JoinStream {} +impl Unpin for JoinStream {} -impl JoinStream { +impl JoinStream { #[doc(hidden)] pub fn new(left: L, right: R) -> Self { - Self { - left, - right, - _marker: PhantomData, - } + Self { left, right } } } -impl Stream for JoinStream +impl Stream for JoinStream where L: Stream + Unpin, R: Stream + Unpin,