Skip to content

Commit 0ba9b9a

Browse files
committed
f
1 parent 2926f31 commit 0ba9b9a

File tree

1 file changed

+8
-1
lines changed

1 file changed

+8
-1
lines changed

futures-util/src/stream/stream/buffered.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ where
1818
stream: Fuse<St>,
1919
in_progress_queue: FuturesOrdered<St::Item>,
2020
max: usize,
21+
is_terminated: bool,
2122
}
2223

2324
impl<St> Unpin for Buffered<St>
@@ -36,6 +37,7 @@ where
3637
.field("stream", &self.stream)
3738
.field("in_progress_queue", &self.in_progress_queue)
3839
.field("max", &self.max)
40+
.field("is_terminated", &self.is_terminated)
3941
.finish()
4042
}
4143
}
@@ -46,7 +48,7 @@ where
4648
St::Item: Future,
4749
{
4850
fn is_terminated(&self) -> bool {
49-
self.stream.is_terminated() && self.in_progress_queue.is_empty()
51+
self.is_terminated
5052
}
5153
}
5254

@@ -57,12 +59,14 @@ where
5759
{
5860
unsafe_pinned!(stream: Fuse<St>);
5961
unsafe_unpinned!(in_progress_queue: FuturesOrdered<St::Item>);
62+
unsafe_unpinned!(is_terminated: bool);
6063

6164
pub(super) fn new(stream: St, n: usize) -> Buffered<St> {
6265
Buffered {
6366
stream: super::Fuse::new(stream),
6467
in_progress_queue: FuturesOrdered::new(),
6568
max: n,
69+
is_terminated: false,
6670
}
6771
}
6872

@@ -127,6 +131,9 @@ where
127131

128132
// If more values are still coming from the stream, we're not done yet
129133
if self.stream.is_done() {
134+
// We yield a `None`, so now the stream is considered terminated.
135+
*self.as_mut().is_terminated() = true;
136+
130137
Poll::Ready(None)
131138
} else {
132139
Poll::Pending

0 commit comments

Comments
 (0)