Skip to content

Commit a380a27

Browse files
author
boats
authored
Merge branch 'master' into consistent-async-args
2 parents 01ada35 + 7730f0d commit a380a27

File tree

12 files changed

+170
-29
lines changed

12 files changed

+170
-29
lines changed

futures-core/src/stream/mod.rs

+12
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ impl<'a, S: ?Sized + Stream> Stream for &'a mut S {
7171
}
7272

7373
if_std! {
74+
use Async;
75+
use never::Never;
76+
7477
impl<S: ?Sized + Stream> Stream for ::std::boxed::Box<S> {
7578
type Item = S::Item;
7679
type Error = S::Error;
@@ -98,4 +101,13 @@ if_std! {
98101
self.0.poll_next(cx)
99102
}
100103
}
104+
105+
impl<T> Stream for ::std::collections::VecDeque<T> {
106+
type Item = T;
107+
type Error = Never;
108+
109+
fn poll_next(&mut self, _cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
110+
Ok(Async::Ready(self.pop_front()))
111+
}
112+
}
101113
}

futures-sink/src/lib.rs

+24-2
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,28 @@ if_std! {
5151
}
5252
}
5353

54+
impl<T> Sink for ::std::collections::VecDeque<T> {
55+
type SinkItem = T;
56+
type SinkError = Never;
57+
58+
fn poll_ready(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
59+
Ok(Async::Ready(()))
60+
}
61+
62+
fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
63+
self.push_back(item);
64+
Ok(())
65+
}
66+
67+
fn poll_flush(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
68+
Ok(Async::Ready(()))
69+
}
70+
71+
fn poll_close(&mut self, _: &mut task::Context) -> Poll<(), Self::SinkError> {
72+
Ok(Async::Ready(()))
73+
}
74+
}
75+
5476
impl<S: ?Sized + Sink> Sink for ::std::boxed::Box<S> {
5577
type SinkItem = S::SinkItem;
5678
type SinkError = S::SinkError;
@@ -98,7 +120,7 @@ if_std! {
98120
/// required methods, and a host of default methods for working in a
99121
/// higher-level way. The `Sink::send_all` combinator is of particular
100122
/// importance: you can use it to send an entire stream to a sink, which is
101-
/// the simplest way to ultimately consume a sink.
123+
/// the simplest way to ultimately consume a stream.
102124
pub trait Sink {
103125
/// The type of value that the sink accepts.
104126
type SinkItem;
@@ -133,7 +155,7 @@ pub trait Sink {
133155
/// send**.
134156
///
135157
/// Implementations of `poll_ready` and `start_send` will usually involve
136-
/// flushing behind the scenes in order to make room for new messages.
158+
/// flushing behind the scenes in order to make room for new messages.
137159
/// It is only necessary to call `poll_flush` if you need to guarantee that
138160
/// *all* of the items placed into the `Sink` have been sent.
139161
///
+80
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,80 @@
1+
use core::fmt;
2+
3+
use futures_core::{task, Async, Future};
4+
use futures_sink::Sink;
5+
6+
#[derive(Debug)]
7+
enum State<F> where F: Future, <F as Future>::Item: Sink {
8+
Waiting(F),
9+
Ready(F::Item),
10+
Closed,
11+
}
12+
13+
/// Future for the `flatten_sink` combinator, flattening a
14+
/// future-of-a-sink to get just the result of the final sink as a sink.
15+
///
16+
/// This is created by the `Future::flatten_sink` method.
17+
pub struct FlattenSink<F> where F: Future, <F as Future>::Item: Sink {
18+
st: State<F>
19+
}
20+
21+
impl<F> fmt::Debug for FlattenSink<F>
22+
where F: Future + fmt::Debug,
23+
<F as Future>::Item: Sink<SinkError=F::Error> + fmt::Debug,
24+
{
25+
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
26+
fmt.debug_struct("FlattenStream")
27+
.field("state", &self.st)
28+
.finish()
29+
}
30+
}
31+
32+
impl<F> Sink for FlattenSink<F> where F: Future, <F as Future>::Item: Sink<SinkError=F::Error> {
33+
type SinkItem = <<F as Future>::Item as Sink>::SinkItem;
34+
type SinkError = <<F as Future>::Item as Sink>::SinkError;
35+
36+
fn poll_ready(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
37+
let mut resolved_stream = match self.st {
38+
State::Ready(ref mut s) => return s.poll_ready(cx),
39+
State::Waiting(ref mut f) => match f.poll(cx)? {
40+
Async::Pending => return Ok(Async::Pending),
41+
Async::Ready(s) => s,
42+
},
43+
State::Closed => panic!("poll_ready called after eof"),
44+
};
45+
let result = resolved_stream.poll_ready(cx);
46+
self.st = State::Ready(resolved_stream);
47+
result
48+
}
49+
50+
fn start_send(&mut self, item: Self::SinkItem) -> Result<(), Self::SinkError> {
51+
match self.st {
52+
State::Ready(ref mut s) => s.start_send(item),
53+
State::Waiting(_) => panic!("poll_ready not called first"),
54+
State::Closed => panic!("start_send called after eof"),
55+
}
56+
}
57+
58+
fn poll_flush(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
59+
match self.st {
60+
State::Ready(ref mut s) => s.poll_flush(cx),
61+
// if sink not yet resolved, nothing written ==> everything flushed
62+
State::Waiting(_) => Ok(Async::Ready(())),
63+
State::Closed => panic!("poll_flush called after eof"),
64+
}
65+
}
66+
67+
fn poll_close(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
68+
if let State::Ready(ref mut s) = self.st {
69+
try_ready!(s.poll_close(cx));
70+
}
71+
self.st = State::Closed;
72+
return Ok(Async::Ready(()));
73+
}
74+
}
75+
76+
pub fn new<F>(fut: F) -> FlattenSink<F> where F: Future, <F as Future>::Item: Sink {
77+
FlattenSink {
78+
st: State::Waiting(fut)
79+
}
80+
}

futures-util/src/future/join_all.rs

-19
Original file line numberDiff line numberDiff line change
@@ -143,22 +143,3 @@ impl<F: IntoFuture> FromIterator<F> for JoinAll<F::Future> {
143143
join_all(iter)
144144
}
145145
}
146-
147-
#[test]
148-
fn join_all_from_iter() {
149-
let f_ok = |a| {
150-
let r: Result<i32, ()> = Ok(a);
151-
r.into_future()
152-
};
153-
154-
pub fn assert_done<T, F>(_f: F, _result: Result<T::Item, T::Error>)
155-
where T: Future,
156-
T::Item: Eq + fmt::Debug,
157-
T::Error: Eq + fmt::Debug,
158-
F: FnOnce() -> T,
159-
{
160-
//FIXME: write a real test somehow somewhere
161-
}
162-
163-
assert_done(|| vec![f_ok(1), f_ok(2)].into_iter().collect::<JoinAll<_>>(), Ok(vec![1, 2]))
164-
}

futures-util/src/future/mod.rs

+19
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use core::result;
77

88
use futures_core::{Future, IntoFuture, Stream};
9+
use futures_sink::Sink;
910

1011
// Primitive futures
1112
mod empty;
@@ -20,6 +21,7 @@ pub use self::loop_fn::{loop_fn, Loop, LoopFn};
2021
// combinators
2122
mod and_then;
2223
mod flatten;
24+
mod flatten_sink;
2325
mod flatten_stream;
2426
mod fuse;
2527
mod into_stream;
@@ -39,6 +41,7 @@ mod chain;
3941

4042
pub use self::and_then::AndThen;
4143
pub use self::flatten::Flatten;
44+
pub use self::flatten_sink::FlattenSink;
4245
pub use self::flatten_stream::FlattenStream;
4346
pub use self::fuse::Fuse;
4447
pub use self::into_stream::IntoStream;
@@ -632,6 +635,22 @@ pub trait FutureExt: Future {
632635
_>(f)
633636
}
634637

638+
/// Flatten the execution of this future when the successful result of this
639+
/// future is a sink.
640+
///
641+
/// This can be useful when sink initialization is deferred, and it is
642+
/// convenient to work with that sink as if sink was available at the
643+
/// call site.
644+
///
645+
/// Note that this function consumes this future and returns a wrapped
646+
/// version of it.
647+
fn flatten_sink(self) -> FlattenSink<Self>
648+
where <Self as Future>::Item: Sink<SinkError=Self::Error>,
649+
Self: Sized
650+
{
651+
flatten_sink::new(self)
652+
}
653+
635654
/// Flatten the execution of this future when the successful result of this
636655
/// future is a stream.
637656
///

futures-util/src/future/shared.rs

+11-2
Original file line numberDiff line numberDiff line change
@@ -234,8 +234,17 @@ impl Wake for Notifier {
234234
}
235235
}
236236

237-
unsafe impl<F: Future> Sync for Inner<F> {}
238-
unsafe impl<F: Future> Send for Inner<F> {}
237+
unsafe impl<F> Sync for Inner<F>
238+
where F: Future + Send,
239+
F::Item: Send + Sync,
240+
F::Error: Send + Sync
241+
{}
242+
243+
unsafe impl<F> Send for Inner<F>
244+
where F: Future + Send,
245+
F::Item: Send + Sync,
246+
F::Error: Send + Sync
247+
{}
239248

240249
impl<F> fmt::Debug for Inner<F>
241250
where F: Future + fmt::Debug,

futures-util/src/stream/mod.rs

-3
Original file line numberDiff line numberDiff line change
@@ -881,9 +881,6 @@ pub trait StreamExt: Stream {
881881
/// stream will be polled in a round-robin fashion, and whenever a stream is
882882
/// ready to yield an item that item is yielded.
883883
///
884-
/// The `select` function is similar to `merge` except that it requires both
885-
/// streams to have the same item and error types.
886-
///
887884
/// Error are passed through from either stream.
888885
fn select<S>(self, other: S) -> Select<Self, S>
889886
where S: Stream<Item = Self::Item, Error = Self::Error>,

futures/tests/all.rs

+8
Original file line numberDiff line numberDiff line change
@@ -268,6 +268,14 @@ fn join_all_iter_lifetime() {
268268
assert_done(|| sizes(vec![&[1,2,3], &[], &[0]]), Ok(vec![3, 0, 1]));
269269
}
270270

271+
#[test]
272+
fn join_all_from_iter() {
273+
assert_done(
274+
|| vec![f_ok(1), f_ok(2)].into_iter().collect::<JoinAll<_>>(),
275+
Ok(vec![1, 2]),
276+
)
277+
}
278+
271279
#[test]
272280
fn select2() {
273281
assert_done(|| f_ok(2).select(empty()).then(unselect), Ok(2));

futures/tests/async_await/pinned.rs

+2
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,8 @@ fn main() {
9999
assert_eq!(block_on_stable(qux2(17)), Ok(17));
100100
assert_eq!(block_on(boxed(17)), Ok(17));
101101
assert_eq!(block_on(boxed_send(17)), Ok(17));
102+
assert_eq!(block_on(boxed_borrow(&17)), Ok(17));
103+
assert_eq!(block_on(boxed_send_borrow(&17)), Ok(17));
102104
assert_eq!(block_on_stable(uses_async_for()), Ok(vec![0, 1]));
103105
}
104106

futures/tests/future_flatten_stream.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ impl<T, E> Stream for PanickingStream<T, E> {
2929
type Item = T;
3030
type Error = E;
3131

32-
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
32+
fn poll_next(&mut self, _: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
3333
panic!()
3434
}
3535
}

futures/tests/select_all.rs

-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
extern crate futures;
22

33
use futures::executor::block_on;
4-
use futures::prelude::*;
54
use futures::future::{ok, select_all, err};
65

76
#[test]

futures/tests/sink.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ use std::sync::Arc;
55
use std::rc::Rc;
66
use std::cell::{Cell, RefCell};
77
use std::sync::atomic::{Ordering, AtomicBool};
8+
use std::collections::VecDeque;
89

910
use futures::prelude::*;
1011
use futures::future::ok;
@@ -26,6 +27,17 @@ fn vec_sink() {
2627
assert_done(move || v.flush(), Ok(vec![0, 1]));
2728
}
2829

30+
#[test]
31+
fn vecdeque_sink() {
32+
let mut deque = VecDeque::new();
33+
deque.start_send(2).unwrap();
34+
deque.start_send(3).unwrap();
35+
36+
assert_eq!(deque.pop_front(), Some(2));
37+
assert_eq!(deque.pop_front(), Some(3));
38+
assert_eq!(deque.pop_front(), None);
39+
}
40+
2941
#[test]
3042
fn send() {
3143
let v = Vec::new();
@@ -395,7 +407,7 @@ fn fanout_backpressure() {
395407
let sink = left_send.fanout(right_send);
396408

397409
let sink = block_on(StartSendFut::new(sink, 0)).unwrap();
398-
410+
399411
flag_cx(|flag, cx| {
400412
let mut task = sink.send(2);
401413
assert!(!flag.get());

0 commit comments

Comments
 (0)