Skip to content

Commit cc18ba0

Browse files
committed
Reorganize the futures_core::task module
This commit is a large reorganization of the `task` module in accordance with RFC 2 which added an explicit `Context` argument to all futures-related functions. This argument is now explicitly constructed when necessary and otherwise passed around in implementations of `poll`.
1 parent 0cc6765 commit cc18ba0

File tree

25 files changed

+460
-950
lines changed

25 files changed

+460
-950
lines changed

futures-channel/benches/sync_mpsc.rs

Lines changed: 37 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -31,21 +31,22 @@ fn notify_noop() -> NotifyHandle {
3131
#[bench]
3232
fn unbounded_1_tx(b: &mut Bencher) {
3333
b.iter(|| {
34-
let (tx, rx) = unbounded();
35-
36-
let mut rx = task::spawn(rx);
34+
let (tx, mut rx) = unbounded();
35+
let notify = || notify_noop().into();
36+
let mut map = task::LocalMap::new();
37+
let mut cx = task::Context::new(&mut map, 0, &notify);
3738

3839
// 1000 iterations to avoid measuring overhead of initialization
3940
// Result should be divided by 1000
4041
for i in 0..1000 {
4142

4243
// Poll, not ready, park
43-
assert_eq!(Ok(Async::Pending), rx.poll_stream_notify(&notify_noop(), 1));
44+
assert_eq!(Ok(Async::Pending), rx.poll(&mut cx));
4445

4546
UnboundedSender::unbounded_send(&tx, i).unwrap();
4647

4748
// Now poll ready
48-
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
49+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
4950
}
5051
})
5152
}
@@ -54,34 +55,39 @@ fn unbounded_1_tx(b: &mut Bencher) {
5455
#[bench]
5556
fn unbounded_100_tx(b: &mut Bencher) {
5657
b.iter(|| {
57-
let (tx, rx) = unbounded();
58-
59-
let mut rx = task::spawn(rx);
58+
let (tx, mut rx) = unbounded();
59+
let notify = || notify_noop().into();
60+
let mut map = task::LocalMap::new();
61+
let mut cx = task::Context::new(&mut map, 0, &notify);
6062

6163
let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();
6264

6365
// 1000 send/recv operations total, result should be divided by 1000
6466
for _ in 0..10 {
6567
for i in 0..tx.len() {
66-
assert_eq!(Ok(Async::Pending), rx.poll_stream_notify(&notify_noop(), 1));
68+
assert_eq!(Ok(Async::Pending), rx.poll(&mut cx));
6769

6870
UnboundedSender::unbounded_send(&tx[i], i).unwrap();
6971

70-
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
72+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
7173
}
7274
}
7375
})
7476
}
7577

7678
#[bench]
7779
fn unbounded_uncontended(b: &mut Bencher) {
80+
let notify = || notify_noop().into();
81+
let mut map = task::LocalMap::new();
82+
let mut cx = task::Context::new(&mut map, 0, &notify);
83+
7884
b.iter(|| {
7985
let (tx, mut rx) = unbounded();
8086

8187
for i in 0..1000 {
8288
UnboundedSender::unbounded_send(&tx, i).expect("send");
8389
// No need to create a task, because poll is not going to park.
84-
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut task::Context));
90+
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
8591
}
8692
})
8793
}
@@ -103,7 +109,7 @@ impl Stream for TestSender {
103109
Err(_) => panic!(),
104110
Ok(Ok(())) => {
105111
self.last += 1;
106-
assert_eq!(Ok(Async::Ready(())), self.tx.flush(&mut task::Context));
112+
assert_eq!(Ok(Async::Ready(())), self.tx.flush(cx));
107113
Ok(Async::Ready(Some(self.last)))
108114
}
109115
Ok(Err(_)) => {
@@ -117,20 +123,22 @@ impl Stream for TestSender {
117123
/// Single producers, single consumer
118124
#[bench]
119125
fn bounded_1_tx(b: &mut Bencher) {
126+
let notify = || notify_noop().into();
127+
let mut map = task::LocalMap::new();
128+
let mut cx = task::Context::new(&mut map, 0, &notify);
129+
120130
b.iter(|| {
121-
let (tx, rx) = channel(0);
131+
let (tx, mut rx) = channel(0);
122132

123-
let mut tx = task::spawn(TestSender {
133+
let mut tx = TestSender {
124134
tx: tx,
125135
last: 0,
126-
});
127-
128-
let mut rx = task::spawn(rx);
136+
};
129137

130138
for i in 0..1000 {
131-
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(&notify_noop(), 1));
132-
assert_eq!(Ok(Async::Pending), tx.poll_stream_notify(&notify_noop(), 1));
133-
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
139+
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll(&mut cx));
140+
assert_eq!(Ok(Async::Pending), tx.poll(&mut cx));
141+
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll(&mut cx));
134142
}
135143
})
136144
}
@@ -140,25 +148,26 @@ fn bounded_1_tx(b: &mut Bencher) {
140148
fn bounded_100_tx(b: &mut Bencher) {
141149
b.iter(|| {
142150
// Each sender can send one item after specified capacity
143-
let (tx, rx) = channel(0);
151+
let (tx, mut rx) = channel(0);
152+
let notify = || notify_noop().into();
153+
let mut map = task::LocalMap::new();
154+
let mut cx = task::Context::new(&mut map, 0, &notify);
144155

145156
let mut tx: Vec<_> = (0..100).map(|_| {
146-
task::spawn(TestSender {
157+
TestSender {
147158
tx: tx.clone(),
148159
last: 0
149-
})
160+
}
150161
}).collect();
151162

152-
let mut rx = task::spawn(rx);
153-
154163
for i in 0..10 {
155164
for j in 0..tx.len() {
156165
// Send an item
157-
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(&notify_noop(), 1));
166+
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll(&mut cx));
158167
// Then block
159-
assert_eq!(Ok(Async::Pending), tx[j].poll_stream_notify(&notify_noop(), 1));
168+
assert_eq!(Ok(Async::Pending), tx[j].poll(&mut cx));
160169
// Recv the item
161-
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
170+
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll(&mut cx));
162171
}
163172
}
164173
})

futures-channel/src/mpsc/mod.rs

Lines changed: 18 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -397,14 +397,14 @@ impl<T> Sender<T> {
397397
/// notified when the channel is no longer full.
398398
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
399399
// If the sender is currently blocked, reject the message
400-
if !self.poll_unparked(false).is_ready() {
400+
if !self.poll_unparked(None).is_ready() {
401401
return Err(TrySendError {
402402
kind: TrySendErrorKind::Full(msg),
403403
});
404404
}
405405

406406
// The channel has capacity to accept the message, so send it
407-
self.do_send(Some(msg), false)
407+
self.do_send(Some(msg), None)
408408
.map_err(|SendError(v)| {
409409
TrySendError {
410410
kind: TrySendErrorKind::Disconnected(v),
@@ -419,22 +419,24 @@ impl<T> Sender<T> {
419419
/// awoken when the channel is ready to receive more messages.
420420
///
421421
/// On successful completion, this function will return `Ok(Ok(()))`.
422-
pub fn start_send(&mut self, _cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
422+
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
423423
// If the sender is currently blocked, reject the message before doing
424424
// any work.
425-
if !self.poll_unparked(true).is_ready() {
425+
if !self.poll_unparked(Some(cx)).is_ready() {
426426
return Ok(Err(msg));
427427
}
428428

429429
// The channel has capacity to accept the message, so send it.
430-
self.do_send(Some(msg), true)?;
430+
self.do_send(Some(msg), Some(cx))?;
431431

432432
Ok(Ok(()))
433433
}
434434

435435
// Do the send without failing
436436
// None means close
437-
fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
437+
fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
438+
-> Result<(), SendError<T>>
439+
{
438440
// First, increment the number of messages contained by the channel.
439441
// This operation will also atomically determine if the sender task
440442
// should be parked.
@@ -469,7 +471,7 @@ impl<T> Sender<T> {
469471
// maintain internal consistency, a blank message is pushed onto the
470472
// parked task queue.
471473
if park_self {
472-
self.park(do_park);
474+
self.park(cx);
473475
}
474476

475477
self.queue_push_and_signal(msg);
@@ -573,14 +575,10 @@ impl<T> Sender<T> {
573575
}
574576
}
575577

576-
fn park(&mut self, can_park: bool) {
578+
fn park(&mut self, cx: Option<&mut task::Context>) {
577579
// TODO: clean up internal state if the task::current will fail
578580

579-
let task = if can_park {
580-
Some(task::current())
581-
} else {
582-
None
583-
};
581+
let task = cx.map(|cx| cx.waker());
584582

585583
{
586584
let mut sender = self.sender_task.lock().unwrap();
@@ -608,16 +606,16 @@ impl<T> Sender<T> {
608606
/// # Panics
609607
///
610608
/// This method will panic if called from outside the context of a task or future.
611-
pub fn poll_ready(&mut self, _cx: &mut task::Context) -> Poll<(), SendError<()>> {
609+
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError<()>> {
612610
let state = decode_state(self.inner.state.load(SeqCst));
613611
if !state.is_open {
614612
return Err(SendError(()));
615613
}
616614

617-
Ok(self.poll_unparked(true))
615+
Ok(self.poll_unparked(Some(cx)))
618616
}
619617

620-
fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
618+
fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> {
621619
// First check the `maybe_parked` variable. This avoids acquiring the
622620
// lock in most cases
623621
if self.maybe_parked {
@@ -635,11 +633,7 @@ impl<T> Sender<T> {
635633
//
636634
// Update the task in case the `Sender` has been moved to another
637635
// task
638-
task.task = if do_park {
639-
Some(task::current())
640-
} else {
641-
None
642-
};
636+
task.task = cx.map(|cx| cx.waker());
643637

644638
Async::Pending
645639
} else {
@@ -717,7 +711,7 @@ impl<T> Drop for Sender<T> {
717711
let prev = self.inner.num_senders.fetch_sub(1, SeqCst);
718712

719713
if prev == 1 {
720-
let _ = self.do_send(None, false);
714+
let _ = self.do_send(None, None);
721715
}
722716
}
723717
}
@@ -815,7 +809,7 @@ impl<T> Receiver<T> {
815809
}
816810

817811
// Try to park the receiver task
818-
fn try_park(&self, _cx: &mut task::Context) -> TryPark {
812+
fn try_park(&self, cx: &mut task::Context) -> TryPark {
819813
let curr = self.inner.state.load(SeqCst);
820814
let state = decode_state(curr);
821815

@@ -833,7 +827,7 @@ impl<T> Receiver<T> {
833827
return TryPark::NotEmpty;
834828
}
835829

836-
recv_task.task = Some(task::current());
830+
recv_task.task = Some(cx.waker());
837831
TryPark::Parked
838832
}
839833

futures-channel/src/oneshot.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,7 +158,7 @@ impl<T> Inner<T> {
158158
}
159159
}
160160

161-
fn poll_cancel(&self, _cx: &mut task::Context) -> Poll<(), ()> {
161+
fn poll_cancel(&self, cx: &mut task::Context) -> Poll<(), ()> {
162162
// Fast path up first, just read the flag and see if our other half is
163163
// gone. This flag is set both in our destructor and the oneshot
164164
// destructor, but our destructor hasn't run yet so if it's set then the
@@ -180,7 +180,7 @@ impl<T> Inner<T> {
180180
// may have been dropped. The first thing it does is set the flag, and
181181
// if it fails to acquire the lock it assumes that we'll see the flag
182182
// later on. So... we then try to see the flag later on!
183-
let handle = task::current();
183+
let handle = cx.waker();
184184
match self.tx_task.try_lock() {
185185
Some(mut p) => *p = Some(handle),
186186
None => return Ok(Async::Ready(())),
@@ -238,7 +238,7 @@ impl<T> Inner<T> {
238238
}
239239
}
240240

241-
fn recv(&self, _cx: &mut task::Context) -> Poll<T, Canceled> {
241+
fn recv(&self, cx: &mut task::Context) -> Poll<T, Canceled> {
242242
let mut done = false;
243243

244244
// Check to see if some data has arrived. If it hasn't then we need to
@@ -251,7 +251,7 @@ impl<T> Inner<T> {
251251
if self.complete.load(SeqCst) {
252252
done = true;
253253
} else {
254-
let task = task::current();
254+
let task = cx.waker();
255255
match self.rx_task.try_lock() {
256256
Some(mut slot) => *slot = Some(task),
257257
None => done = true,

futures-channel/tests/channel.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,7 @@ use std::sync::atomic::*;
66
use std::thread;
77

88
use futures::prelude::*;
9-
use futures::future::result;
10-
use futures::task;
9+
use futures::future::{result, poll_fn};
1110
use futures_executor::current_thread::run;
1211
use futures_channel::mpsc;
1312

@@ -42,10 +41,10 @@ fn sequence() {
4241
fn drop_sender() {
4342
let (tx, mut rx) = mpsc::channel::<u32>(1);
4443
drop(tx);
45-
match rx.poll(&mut task::Context) {
46-
Ok(Async::Ready(None)) => {}
47-
_ => panic!("channel should be done"),
48-
}
44+
let f = poll_fn(|cx| {
45+
rx.poll(cx)
46+
});
47+
assert_eq!(run(|c| c.block_on(f)).unwrap(), None)
4948
}
5049

5150
#[test]

0 commit comments

Comments
 (0)