Skip to content

Commit 16a0866

Browse files
committed
Add StreamExt::buffer_unordered_adaptable
It might be desirable to have adaptative parallelism for the processing of buffered futures. An example use case would be to cope with the ever changing network conditions when sending requests to a remote host. One might collect metrics about the performance of the data transfer and adapt the parallelism parameters as necessary to maximize throughput.
1 parent 4fef913 commit 16a0866

File tree

2 files changed

+224
-0
lines changed

2 files changed

+224
-0
lines changed
Lines changed: 162 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,162 @@
1+
use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2+
use futures_core::future::Future;
3+
use futures_core::stream::{Stream, FusedStream};
4+
use futures_core::task::{Context, Poll};
5+
#[cfg(feature = "sink")]
6+
use futures_sink::Sink;
7+
use pin_utils::{unsafe_pinned, unsafe_unpinned};
8+
use core::fmt;
9+
use core::pin::Pin;
10+
use core::sync::atomic::{AtomicUsize, Ordering};
11+
use alloc::sync::Arc;
12+
13+
/// Stream for the [`buffer_unordered_adaptable`](super::StreamExt::buffer_unordered_adaptable)
14+
/// method.
15+
#[must_use = "streams do nothing unless polled"]
16+
pub struct BufferUnorderedAdaptable<St>
17+
where
18+
St: Stream,
19+
{
20+
stream: Fuse<St>,
21+
in_progress_queue: FuturesUnordered<St::Item>,
22+
max: Arc<AtomicUsize>,
23+
}
24+
25+
impl<St> Unpin for BufferUnorderedAdaptable<St>
26+
where
27+
St: Stream + Unpin,
28+
{}
29+
30+
impl<St> fmt::Debug for BufferUnorderedAdaptable<St>
31+
where
32+
St: Stream + fmt::Debug,
33+
{
34+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35+
f.debug_struct("BufferUnorderedAdaptable")
36+
.field("stream", &self.stream)
37+
.field("in_progress_queue", &self.in_progress_queue)
38+
.field("max", &self.max.load(Ordering::Relaxed))
39+
.finish()
40+
}
41+
}
42+
43+
impl<St> BufferUnorderedAdaptable<St>
44+
where
45+
St: Stream,
46+
St::Item: Future,
47+
{
48+
unsafe_pinned!(stream: Fuse<St>);
49+
unsafe_unpinned!(in_progress_queue: FuturesUnordered<St::Item>);
50+
51+
pub(super) fn new(stream: St, n: Arc<AtomicUsize>) -> BufferUnorderedAdaptable<St>
52+
where
53+
St: Stream,
54+
St::Item: Future,
55+
{
56+
BufferUnorderedAdaptable {
57+
stream: super::Fuse::new(stream),
58+
in_progress_queue: FuturesUnordered::new(),
59+
max: n,
60+
}
61+
}
62+
63+
/// Acquires a reference to the underlying stream that this combinator is
64+
/// pulling from.
65+
pub fn get_ref(&self) -> &St {
66+
self.stream.get_ref()
67+
}
68+
69+
/// Acquires a mutable reference to the underlying stream that this
70+
/// combinator is pulling from.
71+
///
72+
/// Note that care must be taken to avoid tampering with the state of the
73+
/// stream which may otherwise confuse this combinator.
74+
pub fn get_mut(&mut self) -> &mut St {
75+
self.stream.get_mut()
76+
}
77+
78+
/// Acquires a pinned mutable reference to the underlying stream that this
79+
/// combinator is pulling from.
80+
///
81+
/// Note that care must be taken to avoid tampering with the state of the
82+
/// stream which may otherwise confuse this combinator.
83+
pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
84+
self.stream().get_pin_mut()
85+
}
86+
87+
/// Consumes this combinator, returning the underlying stream.
88+
///
89+
/// Note that this may discard intermediate state of this combinator, so
90+
/// care should be taken to avoid losing resources when this is called.
91+
pub fn into_inner(self) -> St {
92+
self.stream.into_inner()
93+
}
94+
}
95+
96+
impl<St> Stream for BufferUnorderedAdaptable<St>
97+
where
98+
St: Stream,
99+
St::Item: Future,
100+
{
101+
type Item = <St::Item as Future>::Output;
102+
103+
fn poll_next(
104+
mut self: Pin<&mut Self>,
105+
cx: &mut Context<'_>,
106+
) -> Poll<Option<Self::Item>> {
107+
// First up, try to spawn off as many futures as possible by filling up
108+
// our queue of futures.
109+
while self.in_progress_queue.len() < self.max.load(Ordering::Relaxed) {
110+
match self.as_mut().stream().poll_next(cx) {
111+
Poll::Ready(Some(fut)) => self.as_mut().in_progress_queue().push(fut),
112+
Poll::Ready(None) | Poll::Pending => break,
113+
}
114+
}
115+
116+
// Attempt to pull the next value from the in_progress_queue
117+
match self.as_mut().in_progress_queue().poll_next_unpin(cx) {
118+
x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
119+
Poll::Ready(None) => {}
120+
}
121+
122+
// If more values are still coming from the stream, we're not done yet
123+
if self.stream.is_done() {
124+
Poll::Ready(None)
125+
} else {
126+
Poll::Pending
127+
}
128+
}
129+
130+
fn size_hint(&self) -> (usize, Option<usize>) {
131+
let queue_len = self.in_progress_queue.len();
132+
let (lower, upper) = self.stream.size_hint();
133+
let lower = lower.saturating_add(queue_len);
134+
let upper = match upper {
135+
Some(x) => x.checked_add(queue_len),
136+
None => None,
137+
};
138+
(lower, upper)
139+
}
140+
}
141+
142+
impl<St> FusedStream for BufferUnorderedAdaptable<St>
143+
where
144+
St: Stream,
145+
St::Item: Future,
146+
{
147+
fn is_terminated(&self) -> bool {
148+
self.in_progress_queue.is_terminated() && self.stream.is_terminated()
149+
}
150+
}
151+
152+
// Forwarding impl of Sink from the underlying stream
153+
#[cfg(feature = "sink")]
154+
impl<S, Item> Sink<Item> for BufferUnorderedAdaptable<S>
155+
where
156+
S: Stream + Sink<Item>,
157+
S::Item: Future,
158+
{
159+
type Error = S::Error;
160+
161+
delegate_sink!(stream, Item);
162+
}

futures-util/src/stream/stream/mod.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,16 @@ cfg_target_has_atomic! {
127127
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
128128
pub use self::buffer_unordered::BufferUnordered;
129129

130+
#[cfg(feature = "alloc")]
131+
mod buffer_unordered_adaptable;
132+
#[cfg(feature = "alloc")]
133+
use core::sync::atomic::AtomicUsize;
134+
#[cfg(feature = "alloc")]
135+
use alloc::sync::Arc;
136+
#[cfg(feature = "alloc")]
137+
#[allow(unreachable_pub)] // https://github.com/rust-lang/rust/issues/57411
138+
pub use self::buffer_unordered_adaptable::BufferUnorderedAdaptable;
139+
130140
#[cfg(feature = "alloc")]
131141
mod buffered;
132142
#[cfg(feature = "alloc")]
@@ -966,6 +976,58 @@ pub trait StreamExt: Stream {
966976
BufferUnordered::new(self, n)
967977
}
968978

979+
/// An adaptor for creating a dynamically adaptable buffered list of pending
980+
/// futures (unordered).
981+
///
982+
/// If this stream's item can be converted into a future, then this adaptor
983+
/// will buffer up to `n` futures and then return the outputs in the order
984+
/// in which they complete. No more than `n` futures will be buffered at
985+
/// any point in time, and less than `n` may also be buffered depending on
986+
/// the state of each future. `n` can be mutated to alter buffering
987+
/// behavior.
988+
///
989+
/// The returned stream will be a stream of each future's output.
990+
///
991+
/// This method is only available when the `std` or `alloc` feature of this
992+
/// library is activated, and it is activated by default.
993+
///
994+
/// # Examples
995+
///
996+
/// ```
997+
/// # futures::executor::block_on(async {
998+
/// use futures::channel::oneshot;
999+
/// use futures::stream::{self, StreamExt};
1000+
/// use core::sync::atomic::{AtomicUsize, Ordering};
1001+
/// use std::sync::Arc;
1002+
///
1003+
/// let (send_one, recv_one) = oneshot::channel();
1004+
/// let (send_two, recv_two) = oneshot::channel();
1005+
///
1006+
/// let stream_of_futures = stream::iter(vec![recv_one, recv_two]);
1007+
/// let atomic_n = Arc::new(AtomicUsize::new(10));
1008+
/// let mut buffered = stream_of_futures.buffer_unordered_adaptable(atomic_n.clone());
1009+
///
1010+
/// send_two.send(2i32)?;
1011+
/// assert_eq!(buffered.next().await, Some(Ok(2i32)));
1012+
///
1013+
/// atomic_n.store(20, Ordering::Relaxed);
1014+
///
1015+
/// send_one.send(1i32)?;
1016+
/// assert_eq!(buffered.next().await, Some(Ok(1i32)));
1017+
///
1018+
/// assert_eq!(buffered.next().await, None);
1019+
/// # Ok::<(), i32>(()) }).unwrap();
1020+
/// ```
1021+
#[cfg_attr(feature = "cfg-target-has-atomic", cfg(target_has_atomic = "ptr"))]
1022+
#[cfg(feature = "alloc")]
1023+
fn buffer_unordered_adaptable(self, n: Arc<AtomicUsize>) -> BufferUnorderedAdaptable<Self>
1024+
where
1025+
Self::Item: Future,
1026+
Self: Sized,
1027+
{
1028+
BufferUnorderedAdaptable::new(self, n)
1029+
}
1030+
9691031
/// An adapter for zipping two streams together.
9701032
///
9711033
/// The zipped stream waits for both streams to produce an item, and then

0 commit comments

Comments
 (0)