Skip to content

Reorganize the futures_core::task module #762

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 13, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 37 additions & 28 deletions futures-channel/benches/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,21 +31,22 @@ fn notify_noop() -> NotifyHandle {
#[bench]
fn unbounded_1_tx(b: &mut Bencher) {
b.iter(|| {
let (tx, rx) = unbounded();

let mut rx = task::spawn(rx);
let (tx, mut rx) = unbounded();
let mut notify = || notify_noop().into();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);

// 1000 iterations to avoid measuring overhead of initialization
// Result should be divided by 1000
for i in 0..1000 {

// Poll, not ready, park
assert_eq!(Ok(Async::Pending), rx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Pending), rx.poll(&mut cx));

UnboundedSender::unbounded_send(&tx, i).unwrap();

// Now poll ready
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
}
})
}
Expand All @@ -54,34 +55,39 @@ fn unbounded_1_tx(b: &mut Bencher) {
#[bench]
fn unbounded_100_tx(b: &mut Bencher) {
b.iter(|| {
let (tx, rx) = unbounded();

let mut rx = task::spawn(rx);
let (tx, mut rx) = unbounded();
let mut notify = || notify_noop().into();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);

let tx: Vec<_> = (0..100).map(|_| tx.clone()).collect();

// 1000 send/recv operations total, result should be divided by 1000
for _ in 0..10 {
for i in 0..tx.len() {
assert_eq!(Ok(Async::Pending), rx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Pending), rx.poll(&mut cx));

UnboundedSender::unbounded_send(&tx[i], i).unwrap();

assert_eq!(Ok(Async::Ready(Some(i))), rx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
}
}
})
}

#[bench]
fn unbounded_uncontended(b: &mut Bencher) {
let mut notify = || notify_noop().into();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);

b.iter(|| {
let (tx, mut rx) = unbounded();

for i in 0..1000 {
UnboundedSender::unbounded_send(&tx, i).expect("send");
// No need to create a task, because poll is not going to park.
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut task::Context));
assert_eq!(Ok(Async::Ready(Some(i))), rx.poll(&mut cx));
}
})
}
Expand All @@ -103,7 +109,7 @@ impl Stream for TestSender {
Err(_) => panic!(),
Ok(Ok(())) => {
self.last += 1;
assert_eq!(Ok(Async::Ready(())), self.tx.flush(&mut task::Context));
assert_eq!(Ok(Async::Ready(())), self.tx.flush(cx));
Ok(Async::Ready(Some(self.last)))
}
Ok(Err(_)) => {
Expand All @@ -117,20 +123,22 @@ impl Stream for TestSender {
/// Single producers, single consumer
#[bench]
fn bounded_1_tx(b: &mut Bencher) {
let mut notify = || notify_noop().into();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);

b.iter(|| {
let (tx, rx) = channel(0);
let (tx, mut rx) = channel(0);

let mut tx = task::spawn(TestSender {
let mut tx = TestSender {
tx: tx,
last: 0,
});

let mut rx = task::spawn(rx);
};

for i in 0..1000 {
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Pending), tx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx.poll(&mut cx));
assert_eq!(Ok(Async::Pending), tx.poll(&mut cx));
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll(&mut cx));
}
})
}
Expand All @@ -140,25 +148,26 @@ fn bounded_1_tx(b: &mut Bencher) {
fn bounded_100_tx(b: &mut Bencher) {
b.iter(|| {
// Each sender can send one item after specified capacity
let (tx, rx) = channel(0);
let (tx, mut rx) = channel(0);
let mut notify = || notify_noop().into();
let mut map = task::LocalMap::new();
let mut cx = task::Context::new(&mut map, 0, &mut notify);

let mut tx: Vec<_> = (0..100).map(|_| {
task::spawn(TestSender {
TestSender {
tx: tx.clone(),
last: 0
})
}
}).collect();

let mut rx = task::spawn(rx);

for i in 0..10 {
for j in 0..tx.len() {
// Send an item
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Ready(Some(i + 1))), tx[j].poll(&mut cx));
// Then block
assert_eq!(Ok(Async::Pending), tx[j].poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Pending), tx[j].poll(&mut cx));
// Recv the item
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll_stream_notify(&notify_noop(), 1));
assert_eq!(Ok(Async::Ready(Some(i + 1))), rx.poll(&mut cx));
}
}
})
Expand Down
48 changes: 21 additions & 27 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;

use futures_core::task::{self, Task};
use futures_core::task::{self, Waker};
use futures_core::{Async, Poll, Stream};

use mpsc::queue::{Queue, PopResult};
Expand Down Expand Up @@ -268,7 +268,7 @@ struct State {
#[derive(Debug)]
struct ReceiverTask {
unparked: bool,
task: Option<Task>,
task: Option<Waker>,
}

// Returned from Receiver::try_park()
Expand All @@ -295,7 +295,7 @@ const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
task: Option<Task>,
task: Option<Waker>,
is_parked: bool,
}

Expand Down Expand Up @@ -397,14 +397,14 @@ impl<T> Sender<T> {
/// notified when the channel is no longer full.
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(false).is_ready() {
if !self.poll_unparked(None).is_ready() {
return Err(TrySendError {
kind: TrySendErrorKind::Full(msg),
});
}

// The channel has capacity to accept the message, so send it
self.do_send(Some(msg), false)
self.do_send(Some(msg), None)
.map_err(|SendError(v)| {
TrySendError {
kind: TrySendErrorKind::Disconnected(v),
Expand All @@ -419,22 +419,24 @@ impl<T> Sender<T> {
/// awoken when the channel is ready to receive more messages.
///
/// On successful completion, this function will return `Ok(Ok(()))`.
pub fn start_send(&mut self, _cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
pub fn start_send(&mut self, cx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
// If the sender is currently blocked, reject the message before doing
// any work.
if !self.poll_unparked(true).is_ready() {
if !self.poll_unparked(Some(cx)).is_ready() {
return Ok(Err(msg));
}

// The channel has capacity to accept the message, so send it.
self.do_send(Some(msg), true)?;
self.do_send(Some(msg), Some(cx))?;

Ok(Ok(()))
}

// Do the send without failing
// None means close
fn do_send(&mut self, msg: Option<T>, do_park: bool) -> Result<(), SendError<T>> {
fn do_send(&mut self, msg: Option<T>, cx: Option<&mut task::Context>)
-> Result<(), SendError<T>>
{
// First, increment the number of messages contained by the channel.
// This operation will also atomically determine if the sender task
// should be parked.
Expand Down Expand Up @@ -469,7 +471,7 @@ impl<T> Sender<T> {
// maintain internal consistency, a blank message is pushed onto the
// parked task queue.
if park_self {
self.park(do_park);
self.park(cx);
}

self.queue_push_and_signal(msg);
Expand Down Expand Up @@ -573,14 +575,10 @@ impl<T> Sender<T> {
}
}

fn park(&mut self, can_park: bool) {
fn park(&mut self, cx: Option<&mut task::Context>) {
// TODO: clean up internal state if the task::current will fail

let task = if can_park {
Some(task::current())
} else {
None
};
let task = cx.map(|cx| cx.waker());

{
let mut sender = self.sender_task.lock().unwrap();
Expand Down Expand Up @@ -608,16 +606,16 @@ impl<T> Sender<T> {
/// # Panics
///
/// This method will panic if called from outside the context of a task or future.
pub fn poll_ready(&mut self, _cx: &mut task::Context) -> Poll<(), SendError<()>> {
pub fn poll_ready(&mut self, cx: &mut task::Context) -> Poll<(), SendError<()>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Err(SendError(()));
}

Ok(self.poll_unparked(true))
Ok(self.poll_unparked(Some(cx)))
}

fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
fn poll_unparked(&mut self, cx: Option<&mut task::Context>) -> Async<()> {
// First check the `maybe_parked` variable. This avoids acquiring the
// lock in most cases
if self.maybe_parked {
Expand All @@ -635,11 +633,7 @@ impl<T> Sender<T> {
//
// Update the task in case the `Sender` has been moved to another
// task
task.task = if do_park {
Some(task::current())
} else {
None
};
task.task = cx.map(|cx| cx.waker());

Async::Pending
} else {
Expand Down Expand Up @@ -717,7 +711,7 @@ impl<T> Drop for Sender<T> {
let prev = self.inner.num_senders.fetch_sub(1, SeqCst);

if prev == 1 {
let _ = self.do_send(None, false);
let _ = self.do_send(None, None);
}
}
}
Expand Down Expand Up @@ -815,7 +809,7 @@ impl<T> Receiver<T> {
}

// Try to park the receiver task
fn try_park(&self, _cx: &mut task::Context) -> TryPark {
fn try_park(&self, cx: &mut task::Context) -> TryPark {
let curr = self.inner.state.load(SeqCst);
let state = decode_state(curr);

Expand All @@ -833,7 +827,7 @@ impl<T> Receiver<T> {
return TryPark::NotEmpty;
}

recv_task.task = Some(task::current());
recv_task.task = Some(cx.waker());
TryPark::Parked
}

Expand Down
14 changes: 7 additions & 7 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::error::Error;
use std::fmt;

use futures_core::{Future, Poll, Async};
use futures_core::task::{self, Task};
use futures_core::task::{self, Waker};

use lock::Lock;

Expand Down Expand Up @@ -60,11 +60,11 @@ struct Inner<T> {
/// the `Lock` here, unlike in `data` above, is important to resolve races.
/// Both the `Receiver` and the `Sender` halves understand that if they
/// can't acquire the lock then some important interference is happening.
rx_task: Lock<Option<Task>>,
rx_task: Lock<Option<Waker>>,

/// Like `rx_task` above, except for the task blocked in
/// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
tx_task: Lock<Option<Task>>,
tx_task: Lock<Option<Waker>>,
}

/// Creates a new futures-aware, one-shot channel.
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<T> Inner<T> {
}
}

fn poll_cancel(&self, _cx: &mut task::Context) -> Poll<(), ()> {
fn poll_cancel(&self, cx: &mut task::Context) -> Poll<(), ()> {
// Fast path up first, just read the flag and see if our other half is
// gone. This flag is set both in our destructor and the oneshot
// destructor, but our destructor hasn't run yet so if it's set then the
Expand All @@ -180,7 +180,7 @@ impl<T> Inner<T> {
// may have been dropped. The first thing it does is set the flag, and
// if it fails to acquire the lock it assumes that we'll see the flag
// later on. So... we then try to see the flag later on!
let handle = task::current();
let handle = cx.waker();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
None => return Ok(Async::Ready(())),
Expand Down Expand Up @@ -238,7 +238,7 @@ impl<T> Inner<T> {
}
}

fn recv(&self, _cx: &mut task::Context) -> Poll<T, Canceled> {
fn recv(&self, cx: &mut task::Context) -> Poll<T, Canceled> {
let mut done = false;

// Check to see if some data has arrived. If it hasn't then we need to
Expand All @@ -251,7 +251,7 @@ impl<T> Inner<T> {
if self.complete.load(SeqCst) {
done = true;
} else {
let task = task::current();
let task = cx.waker();
match self.rx_task.try_lock() {
Some(mut slot) => *slot = Some(task),
None => done = true,
Expand Down
11 changes: 5 additions & 6 deletions futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,7 @@ use std::sync::atomic::*;
use std::thread;

use futures::prelude::*;
use futures::future::result;
use futures::task;
use futures::future::{result, poll_fn};
use futures_executor::current_thread::run;
use futures_channel::mpsc;

Expand Down Expand Up @@ -42,10 +41,10 @@ fn sequence() {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
match rx.poll(&mut task::Context) {
Ok(Async::Ready(None)) => {}
_ => panic!("channel should be done"),
}
let f = poll_fn(|cx| {
rx.poll(cx)
});
assert_eq!(run(|c| c.block_on(f)).unwrap(), None)
}

#[test]
Expand Down
Loading