Skip to content

Commit 94b508b

Browse files
olegnntaiki-e
authored andcommitted
Basic StreamExt::{flatten_unordered, flat_map_unordered} impls (#2083)
1 parent dca16fa commit 94b508b

File tree

4 files changed

+955
-2
lines changed

4 files changed

+955
-2
lines changed
Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
#![feature(test)]
2+
3+
extern crate test;
4+
use crate::test::Bencher;
5+
6+
use futures::channel::oneshot;
7+
use futures::executor::block_on;
8+
use futures::future::{self, FutureExt};
9+
use futures::stream::{self, StreamExt};
10+
use futures::task::Poll;
11+
use std::collections::VecDeque;
12+
use std::thread;
13+
14+
#[bench]
15+
fn oneshot_streams(b: &mut Bencher) {
16+
const STREAM_COUNT: usize = 10_000;
17+
const STREAM_ITEM_COUNT: usize = 1;
18+
19+
b.iter(|| {
20+
let mut txs = VecDeque::with_capacity(STREAM_COUNT);
21+
let mut rxs = Vec::new();
22+
23+
for _ in 0..STREAM_COUNT {
24+
let (tx, rx) = oneshot::channel();
25+
txs.push_back(tx);
26+
rxs.push(rx);
27+
}
28+
29+
thread::spawn(move || {
30+
let mut last = 1;
31+
while let Some(tx) = txs.pop_front() {
32+
let _ = tx.send(stream::iter(last..last + STREAM_ITEM_COUNT));
33+
last += STREAM_ITEM_COUNT;
34+
}
35+
});
36+
37+
let mut flatten = stream::unfold(rxs.into_iter(), |mut vals| {
38+
async {
39+
if let Some(next) = vals.next() {
40+
let val = next.await.unwrap();
41+
Some((val, vals))
42+
} else {
43+
None
44+
}
45+
}
46+
.boxed()
47+
})
48+
.flatten_unordered(None);
49+
50+
block_on(future::poll_fn(move |cx| {
51+
let mut count = 0;
52+
loop {
53+
match flatten.poll_next_unpin(cx) {
54+
Poll::Ready(None) => break,
55+
Poll::Ready(Some(_)) => {
56+
count += 1;
57+
}
58+
_ => {}
59+
}
60+
}
61+
assert_eq!(count, STREAM_COUNT * STREAM_ITEM_COUNT);
62+
63+
Poll::Ready(())
64+
}))
65+
});
66+
}

0 commit comments

Comments
 (0)