Skip to content

Commit b7af114

Browse files
committed
Reorganize the futures_core::task module
This commit is a large reorganization of the `task` module in accordance with RFC 2 which added an explicit `Context` argument to all futures-related functions. This argument is now explicitly constructed when necessary and otherwise passed around in implementations of `poll`.
1 parent 0cc6765 commit b7af114

File tree

25 files changed

+499
-989
lines changed

25 files changed

+499
-989
lines changed

futures-channel/benches/sync_mpsc.rs

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,22 @@ fn notify_noop() -> NotifyHandle {
3131
#[bench]
3232
fn unbounded_1_tx(b: &mut Bencher) {
3333
b.iter(|| {
34-
let (tx, rx) = unbounded();
35-
36-
let mut rx = task::spawn(rx);
34+
let (tx, mut rx) = unbounded();
35+
let notify = || notify_noop().into();
36+
let mut map = task::LocalMap::new();
37+
let mut cx = task::Context::new(&mut map, 0, &notify);
3738

3839
// 1000 iterations to avoid measuring overhead of initialization
3940
// Result should be divided by 1000
4041
for i in 0..1000 {
4142

4243
// Poll, not ready, park
43-
assert_eq!(Ok(Async::Pending), rx.poll_stream_notify(&notify_noop(), 1));
44+
assert_eq!(Ok(Async::Pending), rx.poll(&mut cx));
4445

4546
UnboundedSender::unbounded_send(&tx, i).unwrap();
4647

4748
// Now poll ready
48-
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
49+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
4950
}
5051
})
5152
}
@@ -54,34 +55,39 @@ fn unbounded_1_tx(b: &mut Bencher) {
5455
#[bench]
5556
fn unbounded_100_tx(b: &mut Bencher) {
5657
b.iter(|| {
57-
let (tx, rx) = unbounded();
58-
59-
let mut rx = task::spawn(rx);
58+
let (tx, mut rx) = unbounded();
59+
let notify = || notify_noop().into();
60+
let mut map = task::LocalMap::new();
61+
let mut cx = task::Context::new(&mut map, 0, &notify);
6062

6163
let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
6264

6365
// 1000 send/recv operations total, result should be divided by 1000
6466
for _ in 0..10 {
6567
for i in 0..tx.len() {
66-
assert_eq!(Ok(Async::Pending), rx.poll_stream_notify(&notify_noop(), 1));
68+
assert_eq!(Ok(Async::Pending), rx.poll(&mut cx));
6769

6870
UnboundedSender::unbounded_send(&tx[i], i).unwrap();
6971

70-
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
72+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
7173
}
7274
}
7375
})
7476
}
7577

7678
#[bench]
7779
fn unbounded_uncontended(b: &mut Bencher) {
80+
let notify = || notify_noop().into();
81+
let mut map = task::LocalMap::new();
82+
let mut cx = task::Context::new(&mut map, 0, &notify);
83+
7884
b.iter(|| {
7985
let (tx, mut rx) = unbounded();
8086

8187
for i in 0..1000 {
8288
UnboundedSender::unbounded_send(&tx, i).expect("send");
8389
// No need to create a task, because poll is not going to park.
84-
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut task::Context));
90+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
8591
}
8692
})
8793
}
@@ -103,7 +109,7 @@ impl Stream for TestSender {
103109
Err(_) => panic!(),
104110
Ok(Ok(())) => {
105111
self.last += 1;
106-
assert_eq!(Ok(Async::Ready(())), self.tx.flush(&mut task::Context));
112+
assert_eq!(Ok(Async::Ready(())), self.tx.flush(cx));
107113
Ok(Async::Ready(Some(self.last)))
108114
}
109115
Ok(Err(_)) => {
@@ -117,20 +123,22 @@ impl Stream for TestSender {
117123
/// Single producers, single consumer
118124
#[bench]
119125
fn bounded_1_tx(b: &mut Bencher) {
126+
let notify = || notify_noop().into();
127+
let mut map = task::LocalMap::new();
128+
let mut cx = task::Context::new(&mut map, 0, &notify);
129+
120130
b.iter(|| {
121-
let (tx, rx) = channel(0);
131+
let (tx, mut rx) = channel(0);
122132

123-
let mut tx = task::spawn(TestSender {
133+
let mut tx = TestSender {
124134
tx: tx,
125135
last: 0,
126-
});
127-
128-
let mut rx = task::spawn(rx);
136+
};
129137

130138
for i in 0..1000 {
131-
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(&notify_noop(), 1));
132-
assert_eq!(Ok(Async::Pending), tx.poll_stream_notify(&notify_noop(), 1));
133-
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
139+
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll(&mut cx));
140+
assert_eq!(Ok(Async::Pending), tx.poll(&mut cx));
141+
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll(&mut cx));
134142
}
135143
})
136144
}
@@ -140,25 +148,26 @@ fn bounded_1_tx(b: &mut Bencher) {
140148
fn bounded_100_tx(b: &mut Bencher) {
141149
b.iter(|| {
142150
// Each sender can send one item after specified capacity
143-
let (tx, rx) = channel(0);
151+
let (tx, mut rx) = channel(0);
152+
let notify = || notify_noop().into();
153+
let mut map = task::LocalMap::new();
154+
let mut cx = task::Context::new(&mut map, 0, &notify);
144155

145156
let mut tx: Vec<_> = (0..100).map(|_| {
146-
task::spawn(TestSender {
157+
TestSender {
147158
tx: tx.clone(),
148159
last: 0
149-
})
160+
}
150161
}).collect();
151162

152-
let mut rx = task::spawn(rx);
153-
154163
for i in 0..10 {
155164
for j in 0..tx.len() {
156165
// Send an item
157-
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(&notify_noop(), 1));
166+
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll(&mut cx));
158167
// Then block
159-
assert_eq!(Ok(Async::Pending), tx[j].poll_stream_notify(&notify_noop(), 1));
168+
assert_eq!(Ok(Async::Pending), tx[j].poll(&mut cx));
160169
// Recv the item
161-
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
170+
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll(&mut cx));
162171
}
163172
}
164173
})

futures-channel/src/mpsc/mod.rs

Lines changed: 21 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ use std::sync::{Arc, Mutex};
7676
use std::thread;
7777
use std::usize;
7878

79-
use futures_core::task::{self, Task};
79+
use futures_core::task::{self, Waker};
8080
use futures_core::{Async, Poll, Stream};
8181

8282
use mpsc::queue::{Queue, PopResult};
@@ -268,7 +268,7 @@ struct State {
268268
#[derive(Debug)]
269269
struct ReceiverTask {
270270
unparked: bool,
271-
task: Option<Task>,
271+
task: Option<Waker>,
272272
}
273273

274274
// Returned from Receiver::try_park()
@@ -295,7 +295,7 @@ const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
295295
// Sent to the consumer to wake up blocked producers
296296
#[derive(Debug)]
297297
struct SenderTask {
298-
task: Option<Task>,
298+
task: Option<Waker>,
299299
is_parked: bool,
300300
}
301301

@@ -397,14 +397,14 @@ impl<T> Sender<T> {
397397
/// notified when the channel is no longer full.
398398
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
399399
// If the sender is currently blocked, reject the message
400-
if !self.poll_unparked(false).is_ready() {
400+
if !self.poll_unparked(None).is_ready() {
401401
return Err(TrySendError {
402402
kind: TrySendErrorKind::Full(msg),
403403
});
404404
}
405405

406406
// The channel has capacity to accept the message, so send it
407-
self.do_send(Some(msg), false)
407+
self.do_send(Some(msg), None)
408408
.map_err(|SendError(v)| {
409409
TrySendError {
410410
kind: TrySendErrorKind::Disconnected(v),
@@ -419,22 +419,24 @@ impl<T> Sender<T> {
419419
/// awoken when the channel is ready to receive more messages.
420420
///
421421
/// On successful completion, this function will return `Ok(Ok(()))`.
422-
pub fn start_send(&mut self, _cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
422+
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
423423
// If the sender is currently blocked, reject the message before doing
424424
// any work.
425-
if !self.poll_unparked(true).is_ready() {
425+
if !self.poll_unparked(Some(cx)).is_ready() {
426426
return Ok(Err(msg));
427427
}
428428

429429
// The channel has capacity to accept the message, so send it.
430-
self.do_send(Some(msg), true)?;
430+
self.do_send(Some(msg), Some(cx))?;
431431

432432
Ok(Ok(()))
433433
}
434434

435435
// Do the send without failing
436436
// None means close
437-
fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
437+
fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
438+
-> Result<(), SendError<T>>
439+
{
438440
// First, increment the number of messages contained by the channel.
439441
// This operation will also atomically determine if the sender task
440442
// should be parked.
@@ -469,7 +471,7 @@ impl<T> Sender<T> {
469471
// maintain internal consistency, a blank message is pushed onto the
470472
// parked task queue.
471473
if park_self {
472-
self.park(do_park);
474+
self.park(cx);
473475
}
474476

475477
self.queue_push_and_signal(msg);
@@ -573,14 +575,10 @@ impl<T> Sender<T> {
573575
}
574576
}
575577

576-
fn park(&mut self, can_park: bool) {
578+
fn park(&mut self, cx: Option<&mut task::Context>) {
577579
// TODO: clean up internal state if the task::current will fail
578580

579-
let task = if can_park {
580-
Some(task::current())
581-
} else {
582-
None
583-
};
581+
let task = cx.map(|cx| cx.waker());
584582

585583
{
586584
let mut sender = self.sender_task.lock().unwrap();
@@ -608,16 +606,16 @@ impl<T> Sender<T> {
608606
/// # Panics
609607
///
610608
/// This method will panic if called from outside the context of a task or future.
611-
pub fn poll_ready(&mut self, _cx: &mut task::Context) -> Poll<(), SendError<()>> {
609+
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError<()>> {
612610
let state = decode_state(self.inner.state.load(SeqCst));
613611
if !state.is_open {
614612
return Err(SendError(()));
615613
}
616614

617-
Ok(self.poll_unparked(true))
615+
Ok(self.poll_unparked(Some(cx)))
618616
}
619617

620-
fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
618+
fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> {
621619
// First check the `maybe_parked` variable. This avoids acquiring the
622620
// lock in most cases
623621
if self.maybe_parked {
@@ -635,11 +633,7 @@ impl<T> Sender<T> {
635633
//
636634
// Update the task in case the `Sender` has been moved to another
637635
// task
638-
task.task = if do_park {
639-
Some(task::current())
640-
} else {
641-
None
642-
};
636+
task.task = cx.map(|cx| cx.waker());
643637

644638
Async::Pending
645639
} else {
@@ -717,7 +711,7 @@ impl<T> Drop for Sender<T> {
717711
let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
718712

719713
if prev == 1 {
720-
let _ = self.do_send(None, false);
714+
let _ = self.do_send(None, None);
721715
}
722716
}
723717
}
@@ -815,7 +809,7 @@ impl<T> Receiver<T> {
815809
}
816810

817811
// Try to park the receiver task
818-
fn try_park(&self, _cx: &mut task::Context) -> TryPark {
812+
fn try_park(&self, cx: &mut task::Context) -> TryPark {
819813
let curr = self.inner.state.load(SeqCst);
820814
let state = decode_state(curr);
821815

@@ -833,7 +827,7 @@ impl<T> Receiver<T> {
833827
return TryPark::NotEmpty;
834828
}
835829

836-
recv_task.task = Some(task::current());
830+
recv_task.task = Some(cx.waker());
837831
TryPark::Parked
838832
}
839833

futures-channel/src/oneshot.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ use std::error::Error;
77
use std::fmt;
88

99
use futures_core::{Future, Poll, Async};
10-
use futures_core::task::{self, Task};
10+
use futures_core::task::{self, Waker};
1111

1212
use lock::Lock;
1313

@@ -60,11 +60,11 @@ struct Inner<T> {
6060
/// the `Lock` here, unlike in `data` above, is important to resolve races.
6161
/// Both the `Receiver` and the `Sender` halves understand that if they
6262
/// can't acquire the lock then some important interference is happening.
63-
rx_task: Lock<Option<Task>>,
63+
rx_task: Lock<Option<Waker>>,
6464

6565
/// Like `rx_task` above, except for the task blocked in
6666
/// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
67-
tx_task: Lock<Option<Task>>,
67+
tx_task: Lock<Option<Waker>>,
6868
}
6969

7070
/// Creates a new futures-aware, one-shot channel.
@@ -158,7 +158,7 @@ impl<T> Inner<T> {
158158
}
159159
}
160160

161-
fn poll_cancel(&self, _cx: &mut task::Context) -> Poll<(), ()> {
161+
fn poll_cancel(&self, cx: &mut task::Context) -> Poll<(), ()> {
162162
// Fast path up first, just read the flag and see if our other half is
163163
// gone. This flag is set both in our destructor and the oneshot
164164
// destructor, but our destructor hasn't run yet so if it's set then the
@@ -180,7 +180,7 @@ impl<T> Inner<T> {
180180
// may have been dropped. The first thing it does is set the flag, and
181181
// if it fails to acquire the lock it assumes that we'll see the flag
182182
// later on. So... we then try to see the flag later on!
183-
let handle = task::current();
183+
let handle = cx.waker();
184184
match self.tx_task.try_lock() {
185185
Some(mut p) => *p = Some(handle),
186186
None => return Ok(Async::Ready(())),
@@ -238,7 +238,7 @@ impl<T> Inner<T> {
238238
}
239239
}
240240

241-
fn recv(&self, _cx: &mut task::Context) -> Poll<T, Canceled> {
241+
fn recv(&self, cx: &mut task::Context) -> Poll<T, Canceled> {
242242
let mut done = false;
243243

244244
// Check to see if some data has arrived. If it hasn't then we need to
@@ -251,7 +251,7 @@ impl<T> Inner<T> {
251251
if self.complete.load(SeqCst) {
252252
done = true;
253253
} else {
254-
let task = task::current();
254+
let task = cx.waker();
255255
match self.rx_task.try_lock() {
256256
Some(mut slot) => *slot = Some(task),
257257
None => done = true,

futures-channel/tests/channel.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ use std::sync::atomic::*;
66
use std::thread;
77

88
use futures::prelude::*;
9-
use futures::future::result;
10-
use futures::task;
9+
use futures::future::{result, poll_fn};
1110
use futures_executor::current_thread::run;
1211
use futures_channel::mpsc;
1312

@@ -42,10 +41,10 @@ fn sequence() {
4241
fn drop_sender() {
4342
let (tx, mut rx) = mpsc::channel::<u32>(1);
4443
drop(tx);
45-
match rx.poll(&mut task::Context) {
46-
Ok(Async::Ready(None)) => {}
47-
_ => panic!("channel should be done"),
48-
}
44+
let f = poll_fn(|cx| {
45+
rx.poll(cx)
46+
});
47+
assert_eq!(run(|c| c.block_on(f)).unwrap(), None)
4948
}
5049

5150
#[test]

0 commit comments

Comments
 (0)