Skip to content

Commit 02d00cc

Browse files
committed
Switch to stable futures
1 parent 01bce16 commit 02d00cc

40 files changed

+1081
-1140
lines changed

core/Cargo.toml

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ log = "0.4"
2121
multiaddr = { package = "parity-multiaddr", version = "0.5.0", path = "../misc/multiaddr" }
2222
multihash = { package = "parity-multihash", version = "0.1.0", path = "../misc/multihash" }
2323
multistream-select = { version = "0.4.0", path = "../misc/multistream-select" }
24-
futures = "0.1"
24+
futures-preview = { version = "0.3.0-alpha.17", features = ["compat"] }
2525
parking_lot = "0.8"
2626
protobuf = "2.3"
2727
quick-error = "1.2"
@@ -30,8 +30,6 @@ rw-stream-sink = { version = "0.1.1", path = "../misc/rw-stream-sink" }
3030
libsecp256k1 = { version = "0.2.2", optional = true }
3131
sha2 = "0.8.0"
3232
smallvec = "0.6"
33-
tokio-executor = "0.1.4"
34-
tokio-io = "0.1"
3533
wasm-timer = "0.1"
3634
unsigned-varint = "0.2"
3735
void = "1"

core/src/either.rs

Lines changed: 131 additions & 107 deletions
Large diffs are not rendered by default.

core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
3838
/// Multi-address re-export.
3939
pub use multiaddr;
40-
pub use multistream_select::Negotiated;
40+
pub type Negotiated<T> = futures::compat::Compat01As03<multistream_select::Negotiated<T>>;
4141

4242
mod keys_proto;
4343
mod peer_id;

core/src/muxing.rs

Lines changed: 117 additions & 143 deletions
Large diffs are not rendered by default.

core/src/muxing/singleton.rs

Lines changed: 26 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,9 @@
1919
// DEALINGS IN THE SOFTWARE.
2020

2121
use crate::{Endpoint, muxing::StreamMuxer};
22-
use futures::prelude::*;
22+
use futures::{prelude::*, io::Initializer};
2323
use parking_lot::Mutex;
24-
use std::{io, sync::atomic::{AtomicBool, Ordering}};
25-
use tokio_io::{AsyncRead, AsyncWrite};
24+
use std::{io, pin::Pin, sync::atomic::{AtomicBool, Ordering}, task::Context, task::Poll};
2625

2726
/// Implementation of `StreamMuxer` that allows only one substream on top of a connection,
2827
/// yielding the connection itself.
@@ -62,67 +61,67 @@ pub struct OutboundSubstream {}
6261

6362
impl<TSocket> StreamMuxer for SingletonMuxer<TSocket>
6463
where
65-
TSocket: AsyncRead + AsyncWrite,
64+
TSocket: AsyncRead + AsyncWrite + Unpin,
6665
{
6766
type Substream = Substream;
6867
type OutboundSubstream = OutboundSubstream;
6968
type Error = io::Error;
7069

71-
fn poll_inbound(&self) -> Poll<Self::Substream, io::Error> {
70+
fn poll_inbound(&self) -> Poll<Result<Self::Substream, io::Error>> {
7271
match self.endpoint {
73-
Endpoint::Dialer => return Ok(Async::NotReady),
72+
Endpoint::Dialer => return Poll::Pending,
7473
Endpoint::Listener => {}
7574
}
7675

7776
if !self.substream_extracted.swap(true, Ordering::Relaxed) {
78-
Ok(Async::Ready(Substream {}))
77+
Poll::Ready(Ok(Substream {}))
7978
} else {
80-
Ok(Async::NotReady)
79+
Poll::Pending
8180
}
8281
}
8382

8483
fn open_outbound(&self) -> Self::OutboundSubstream {
8584
OutboundSubstream {}
8685
}
8786

88-
fn poll_outbound(&self, _: &mut Self::OutboundSubstream) -> Poll<Self::Substream, io::Error> {
87+
fn poll_outbound(&self, _: &mut Self::OutboundSubstream) -> Poll<Result<Self::Substream, io::Error>> {
8988
match self.endpoint {
90-
Endpoint::Listener => return Ok(Async::NotReady),
89+
Endpoint::Listener => return Poll::Pending,
9190
Endpoint::Dialer => {}
9291
}
9392

9493
if !self.substream_extracted.swap(true, Ordering::Relaxed) {
95-
Ok(Async::Ready(Substream {}))
94+
Poll::Ready(Ok(Substream {}))
9695
} else {
97-
Ok(Async::NotReady)
96+
Poll::Pending
9897
}
9998
}
10099

101100
fn destroy_outbound(&self, _: Self::OutboundSubstream) {
102101
}
103102

104-
unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool {
105-
self.inner.lock().prepare_uninitialized_buffer(buf)
103+
unsafe fn initializer(&self) -> Initializer {
104+
self.inner.lock().initializer()
106105
}
107106

108-
fn read_substream(&self, _: &mut Self::Substream, buf: &mut [u8]) -> Poll<usize, io::Error> {
109-
let res = self.inner.lock().poll_read(buf);
110-
if let Ok(Async::Ready(_)) = res {
107+
fn read_substream(&self, cx: &mut Context, _: &mut Self::Substream, buf: &mut [u8]) -> Poll<Result<usize, io::Error>> {
108+
let res = AsyncRead::poll_read(Pin::new(&mut *self.inner.lock()), cx, buf);
109+
if let Poll::Ready(Ok(_)) = res {
111110
self.remote_acknowledged.store(true, Ordering::Release);
112111
}
113112
res
114113
}
115114

116-
fn write_substream(&self, _: &mut Self::Substream, buf: &[u8]) -> Poll<usize, io::Error> {
117-
self.inner.lock().poll_write(buf)
115+
fn write_substream(&self, cx: &mut Context, _: &mut Self::Substream, buf: &[u8]) -> Poll<Result<usize, io::Error>> {
116+
AsyncWrite::poll_write(Pin::new(&mut *self.inner.lock()), cx, buf)
118117
}
119118

120-
fn flush_substream(&self, _: &mut Self::Substream) -> Poll<(), io::Error> {
121-
self.inner.lock().poll_flush()
119+
fn flush_substream(&self, cx: &mut Context, _: &mut Self::Substream) -> Poll<Result<(), io::Error>> {
120+
AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx)
122121
}
123122

124-
fn shutdown_substream(&self, _: &mut Self::Substream) -> Poll<(), io::Error> {
125-
self.inner.lock().shutdown()
123+
fn shutdown_substream(&self, cx: &mut Context, _: &mut Self::Substream) -> Poll<Result<(), io::Error>> {
124+
AsyncWrite::poll_close(Pin::new(&mut *self.inner.lock()), cx)
126125
}
127126

128127
fn destroy_substream(&self, _: Self::Substream) {
@@ -132,12 +131,12 @@ where
132131
self.remote_acknowledged.load(Ordering::Acquire)
133132
}
134133

135-
fn close(&self) -> Poll<(), io::Error> {
134+
fn close(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
136135
// The `StreamMuxer` trait requires that `close()` implies `flush_all()`.
137-
self.flush_all()
136+
self.flush_all(cx)
138137
}
139138

140-
fn flush_all(&self) -> Poll<(), io::Error> {
141-
self.inner.lock().poll_flush()
139+
fn flush_all(&self, cx: &mut Context) -> Poll<Result<(), io::Error>> {
140+
AsyncWrite::poll_flush(Pin::new(&mut *self.inner.lock()), cx)
142141
}
143142
}

core/src/nodes/collection.rs

Lines changed: 24 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use crate::{
2929
};
3030
use fnv::FnvHashMap;
3131
use futures::prelude::*;
32-
use std::{error, fmt, hash::Hash, mem};
32+
use std::{error, fmt, hash::Hash, mem, pin::Pin, task::Context, task::Poll};
3333

3434
pub use crate::nodes::tasks::StartTakeOver;
3535

@@ -323,7 +323,7 @@ where
323323
pub fn add_reach_attempt<TFut, TMuxer>(&mut self, future: TFut, handler: THandler)
324324
-> ReachAttemptId
325325
where
326-
TFut: Future<Item = (TConnInfo, TMuxer), Error = TReachErr> + Send + 'static,
326+
TFut: Future<Output = Result<(TConnInfo, TMuxer), TReachErr>> + Unpin + Send + 'static,
327327
THandler: IntoNodeHandler<TConnInfo> + Send + 'static,
328328
THandler::Handler: NodeHandler<Substream = Substream<TMuxer>, InEvent = TInEvent, OutEvent = TOutEvent, Error = THandlerErr> + Send + 'static,
329329
<THandler::Handler as NodeHandler>::OutboundOpenInfo: Send + 'static,
@@ -358,17 +358,19 @@ where
358358
}
359359

360360
/// Sends an event to all nodes.
361-
#[must_use]
362-
pub fn start_broadcast(&mut self, event: &TInEvent) -> AsyncSink<()>
361+
///
362+
/// Must be called only after a successful call to `poll_ready_broadcast`.
363+
pub fn start_broadcast(&mut self, event: &TInEvent)
363364
where
364365
TInEvent: Clone
365366
{
366367
self.inner.start_broadcast(event)
367368
}
368369

370+
/// Wait until we have enough room in senders to broadcast an event.
369371
#[must_use]
370-
pub fn complete_broadcast(&mut self) -> Async<()> {
371-
self.inner.complete_broadcast()
372+
pub fn poll_ready_broadcast(&mut self, cx: &mut Context) -> Poll<()> {
373+
self.inner.poll_ready_broadcast(cx)
372374
}
373375

374376
/// Adds an existing connection to a node to the collection.
@@ -447,13 +449,13 @@ where
447449
/// > **Note**: we use a regular `poll` method instead of implementing `Stream` in order to
448450
/// > remove the `Err` variant, but also because we want the `CollectionStream` to stay
449451
/// > borrowed if necessary.
450-
pub fn poll(&mut self) -> Async<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>>
452+
pub fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<CollectionEvent<'_, TInEvent, TOutEvent, THandler, TReachErr, THandlerErr, TUserData, TConnInfo, TPeerId>>
451453
where
452454
TConnInfo: Clone, // TODO: Clone shouldn't be necessary
453455
{
454-
let item = match self.inner.poll() {
455-
Async::Ready(item) => item,
456-
Async::NotReady => return Async::NotReady,
456+
let item = match self.inner.poll(cx) {
457+
Poll::Ready(item) => item,
458+
Poll::Pending => return Poll::Pending,
457459
};
458460

459461
match item {
@@ -463,7 +465,7 @@ where
463465

464466
match (user_data, result, handler) {
465467
(TaskState::Pending, tasks::Error::Reach(err), Some(handler)) => {
466-
Async::Ready(CollectionEvent::ReachError {
468+
Poll::Ready(CollectionEvent::ReachError {
467469
id: ReachAttemptId(id),
468470
error: err,
469471
handler,
@@ -482,7 +484,7 @@ where
482484
debug_assert!(_handler.is_none());
483485
let _node_task_id = self.nodes.remove(conn_info.peer_id());
484486
debug_assert_eq!(_node_task_id, Some(id));
485-
Async::Ready(CollectionEvent::NodeClosed {
487+
Poll::Ready(CollectionEvent::NodeClosed {
486488
conn_info,
487489
error: err,
488490
user_data,
@@ -497,8 +499,8 @@ where
497499
tasks::Event::NodeReached { task, conn_info } => {
498500
let id = task.id();
499501
drop(task);
500-
Async::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
501-
parent: self,
502+
Poll::Ready(CollectionEvent::NodeReached(CollectionReachEvent {
503+
parent: &mut *self,
502504
id,
503505
conn_info: Some(conn_info),
504506
}))
@@ -512,7 +514,7 @@ where
512514
self.tasks is switched to the Connected state; QED"),
513515
};
514516
drop(task);
515-
Async::Ready(CollectionEvent::NodeEvent {
517+
Poll::Ready(CollectionEvent::NodeEvent {
516518
// TODO: normally we'd build a `PeerMut` manually here, but the borrow checker
517519
// doesn't like it
518520
peer: self.peer_mut(&conn_info.peer_id())
@@ -616,14 +618,15 @@ where
616618
}
617619
}
618620

619-
/// Sends an event to the given node.
620-
pub fn start_send_event(&mut self, event: TInEvent) -> StartSend<TInEvent, ()> {
621+
/// Begin sending an event to the given node. Must be called only after a successful call to
622+
/// `poll_ready_event`.
623+
pub fn start_send_event(&mut self, event: TInEvent) {
621624
self.inner.start_send_event(event)
622625
}
623626

624-
/// Complete sending an event message initiated by `start_send_event`.
625-
pub fn complete_send_event(&mut self) -> Poll<(), ()> {
626-
self.inner.complete_send_event()
627+
/// Make sure we are ready to accept an event to be sent with `start_send_event`.
628+
pub fn poll_ready_event(&mut self, cx: &mut Context) -> Poll<()> {
629+
self.inner.poll_ready_event(cx)
627630
}
628631

629632
/// Closes the connections to this node. Returns the user data.
@@ -664,7 +667,7 @@ where
664667
}
665668

666669
/// Complete a take over initiated by `start_take_over`.
667-
pub fn complete_take_over(&mut self) -> Poll<(), ()> {
670+
pub fn complete_take_over(&mut self) -> Poll<()> {
668671
self.inner.complete_take_over()
669672
}
670673
}

core/src/nodes/collection/tests.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -75,14 +75,14 @@ fn collection_stream_reaches_the_nodes() {
7575
poll_count += 1;
7676
let event = cs.poll();
7777
match poll_count {
78-
1 => assert_matches!(event, Async::NotReady),
78+
1 => assert_matches!(event, Poll::Pending),
7979
2 => {
8080
assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(_)));
8181
return Ok(Async::Ready(())); // stop
8282
}
8383
_ => unreachable!()
8484
}
85-
Ok(Async::NotReady)
85+
Poll::Pending
8686
});
8787
rt.block_on(fut).unwrap();
8888
}
@@ -102,8 +102,8 @@ fn accepting_a_node_yields_new_entry() {
102102
let event = cs.poll();
103103
match poll_count {
104104
1 => {
105-
assert_matches!(event, Async::NotReady);
106-
return Ok(Async::NotReady)
105+
assert_matches!(event, Poll::Pending);
106+
return Poll::Pending
107107
}
108108
2 => {
109109
assert_matches!(event, Async::Ready(CollectionEvent::NodeReached(reach_ev)) => {
@@ -150,7 +150,7 @@ fn events_in_a_node_reaches_the_collection_stream() {
150150
let cs_fut = cs.clone();
151151
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
152152
let mut cs = cs_fut.lock();
153-
assert_matches!(cs.poll(), Async::NotReady);
153+
assert_matches!(cs.poll(), Poll::Pending);
154154
Ok(Async::Ready(()))
155155
})).expect("tokio works");
156156

@@ -246,7 +246,7 @@ fn task_closed_with_error_while_task_is_pending_yields_reach_error() {
246246
let cs_fut = cs.clone();
247247
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
248248
let mut cs = cs_fut.lock();
249-
assert_matches!(cs.poll(), Async::NotReady);
249+
assert_matches!(cs.poll(), Poll::Pending);
250250
Ok(Async::Ready(()))
251251
})).expect("tokio works");
252252

@@ -289,7 +289,7 @@ fn task_closed_with_error_when_task_is_connected_yields_node_error() {
289289
let cs_fut = cs.clone();
290290
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
291291
let mut cs = cs_fut.lock();
292-
assert_matches!(cs.poll(), Async::NotReady);
292+
assert_matches!(cs.poll(), Poll::Pending);
293293
// send an event so the Handler errors in two polls
294294
Ok(cs.complete_broadcast())
295295
})).expect("tokio works");
@@ -351,7 +351,7 @@ fn interrupting_an_established_connection_is_err() {
351351
let cs_fut = cs.clone();
352352
rt.block_on(future::poll_fn(move || -> Poll<_, ()> {
353353
let mut cs = cs_fut.lock();
354-
assert_matches!(cs.poll(), Async::NotReady);
354+
assert_matches!(cs.poll(), Poll::Pending);
355355
// send an event so the Handler errors in two polls
356356
Ok(Async::Ready(()))
357357
})).expect("tokio works");

0 commit comments

Comments
 (0)