diff --git a/src/join_stream.rs b/src/join_stream.rs new file mode 100644 index 0000000..1cf25fa --- /dev/null +++ b/src/join_stream.rs @@ -0,0 +1,76 @@ +use std::pin::Pin; +use std::task::{Context, Poll}; + +use futures_core::Stream; + +/// A stream joining two or more streams. +/// +/// This stream is returned by `join!`. +#[derive(Debug)] +pub struct JoinStream { + left: L, + right: R, +} + +impl Unpin for JoinStream {} + +impl JoinStream { + #[doc(hidden)] + pub fn new(left: L, right: R) -> Self { + Self { left, right } + } +} + +impl Stream for JoinStream +where + L: Stream + Unpin, + R: Stream + Unpin, +{ + type Item = T; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + if let Poll::Ready(Some(item)) = Pin::new(&mut self.left).poll_next(cx) { + // The first stream made progress. The JoinStream needs to be polled + // again to check the progress of the second stream. + cx.waker().wake_by_ref(); + Poll::Ready(Some(item)) + } else { + Pin::new(&mut self.right).poll_next(cx) + } + } +} + +/// Combines multiple streams into a single stream of all their outputs. +/// +/// This macro is only usable inside of async functions, closures, and blocks. +/// +/// # Examples +/// +/// ``` +/// # futures::executor::block_on(async { +/// use async_macros::join_stream as join; +/// use futures::stream::{self, StreamExt}; +/// use futures::future::ready; +/// +/// let a = stream::once(ready(1u8)); +/// let b = stream::once(ready(2u8)); +/// let c = stream::once(ready(3u8)); +/// +/// let mut s = join!(a, b, c); +/// +/// assert_eq!(s.next().await, Some(1u8)); +/// assert_eq!(s.next().await, Some(2u8)); +/// assert_eq!(s.next().await, Some(3u8)); +/// assert_eq!(s.next().await, None); +/// # }); +/// ``` +#[macro_export] +macro_rules! join_stream { + ($stream1:ident, $stream2:ident, $($stream:ident),* $(,)?) => {{ + let joined = $crate::JoinStream::new($stream1, $stream2); + $( + let joined = $crate::JoinStream::new(joined, $stream); + )* + joined + }}; +} diff --git a/src/lib.rs b/src/lib.rs index 3a6a8a7..fb2e6bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -3,7 +3,6 @@ //! # Examples //! //! ``` -//! #![feature(async_await)] //! # futures::executor::block_on(async { //! use async_macros::join; //! use futures::future; @@ -21,6 +20,7 @@ #![cfg_attr(test, deny(warnings))] mod join; +mod join_stream; mod maybe_done; mod poll_fn; mod ready; @@ -28,6 +28,7 @@ mod select; mod try_join; mod try_select; +pub use join_stream::JoinStream; pub use maybe_done::MaybeDone; /// Helper re-exports for use in macros.