Skip to content

[WIP] Explicit task::Context argument to poll. #744

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 21 additions & 29 deletions futures-channel/src/mpsc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ use std::sync::{Arc, Mutex};
use std::thread;
use std::usize;

use futures_core::task::{self, Task};
use futures_core::task;
use futures_core::{Async, Poll, Stream};

use mpsc::queue::{Queue, PopResult};
Expand Down Expand Up @@ -268,7 +268,7 @@ struct State {
#[derive(Debug)]
struct ReceiverTask {
unparked: bool,
task: Option<Task>,
task: Option<task::Waker>,
}

// Returned from Receiver::try_park()
Expand All @@ -295,7 +295,7 @@ const MAX_BUFFER: usize = MAX_CAPACITY >> 1;
// Sent to the consumer to wake up blocked producers
#[derive(Debug)]
struct SenderTask {
task: Option<Task>,
task: Option<task::Waker>,
is_parked: bool,
}

Expand All @@ -311,7 +311,7 @@ impl SenderTask {
self.is_parked = false;

if let Some(task) = self.task.take() {
task.notify();
task.wake();
}
}
}
Expand Down Expand Up @@ -397,7 +397,7 @@ impl<T> Sender<T> {
/// notified when the channel is no longer full.
pub fn try_send(&mut self, msg: T) -> Result<(), TrySendError<T>> {
// If the sender is currently blocked, reject the message
if !self.poll_unparked(false).is_ready() {
if !self.poll_unparked(None).is_ready() {
return Err(TrySendError {
kind: TrySendErrorKind::Full(msg),
});
Expand All @@ -419,10 +419,10 @@ impl<T> Sender<T> {
/// awoken when the channel is ready to receive more messages.
///
/// On successful completion, this function will return `Ok(Ok(()))`.
pub fn start_send(&mut self, msg: T) -> Result<Result<(), T>, SendError<T>> {
pub fn start_send(&mut self, ctx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
// If the sender is currently blocked, reject the message before doing
// any work.
if !self.poll_unparked(true).is_ready() {
if !self.poll_unparked(Some(ctx)).is_ready() {
return Ok(Err(msg));
}

Expand Down Expand Up @@ -569,15 +569,15 @@ impl<T> Sender<T> {
};

if let Some(task) = task {
task.notify();
task.wake();
}
}

fn park(&mut self, can_park: bool) {
// TODO: clean up internal state if the task::current will fail

let task = if can_park {
Some(task::current())
None// TODO Some(task::current())
} else {
None
};
Expand All @@ -604,20 +604,16 @@ impl<T> Sender<T> {
/// Returns `Ok(Async::Ready(_))` if there is sufficient capacity, or returns
/// `Ok(Async::Pending)` if the channel is not guaranteed to have capacity. Returns
/// `Err(SendError(_))` if the receiver has been dropped.
///
/// # Panics
///
/// This method will panic if called from outside the context of a task or future.
pub fn poll_ready(&mut self) -> Poll<(), SendError<()>> {
pub fn poll_ready(&mut self, ctx: &mut task::Context) -> Poll<(), SendError<()>> {
let state = decode_state(self.inner.state.load(SeqCst));
if !state.is_open {
return Err(SendError(()));
}

Ok(self.poll_unparked(true))
Ok(self.poll_unparked(Some(ctx)))
}

fn poll_unparked(&mut self, do_park: bool) -> Async<()> {
fn poll_unparked(&mut self, ctx: Option<&mut task::Context>) -> Async<()> {
// First check the `maybe_parked` variable. This avoids acquiring the
// lock in most cases
if self.maybe_parked {
Expand All @@ -635,11 +631,7 @@ impl<T> Sender<T> {
//
// Update the task in case the `Sender` has been moved to another
// task
task.task = if do_park {
Some(task::current())
} else {
None
};
task.task = ctx.map(|c| c.waker());

Async::Pending
} else {
Expand All @@ -657,8 +649,8 @@ impl<T> UnboundedSender<T> {
///
/// On a successful `start_send`, this function will return
/// `Ok(Ok(())`.
pub fn start_send(&mut self, msg: T) -> Result<Result<(), T>, SendError<T>> {
self.0.start_send(msg)
pub fn start_send(&mut self, ctx: &mut task::Context, msg: T) -> Result<Result<(), T>, SendError<T>> {
self.0.start_send(ctx, msg)
}

/// Sends the provided message along this channel.
Expand Down Expand Up @@ -815,7 +807,7 @@ impl<T> Receiver<T> {
}

// Try to park the receiver task
fn try_park(&self) -> TryPark {
fn try_park(&self, ctx: &mut task::Context) -> TryPark {
let curr = self.inner.state.load(SeqCst);
let state = decode_state(curr);

Expand All @@ -833,7 +825,7 @@ impl<T> Receiver<T> {
return TryPark::NotEmpty;
}

recv_task.task = Some(task::current());
recv_task.task = Some(ctx.waker());
TryPark::Parked
}

Expand All @@ -858,7 +850,7 @@ impl<T> Stream for Receiver<T> {
type Item = T;
type Error = ();

fn poll(&mut self) -> Poll<Option<T>, ()> {
fn poll(&mut self, ctx: &mut task::Context) -> Poll<Option<T>, ()> {
loop {
// Try to read a message off of the message queue.
let msg = match self.next_message() {
Expand All @@ -867,7 +859,7 @@ impl<T> Stream for Receiver<T> {
// There are no messages to read, in this case, attempt to
// park. The act of parking will verify that the channel is
// still empty after the park operation has completed.
match self.try_park() {
match self.try_park(ctx) {
TryPark::Parked => {
// The task was parked, and the channel is still
// empty, return Pending.
Expand Down Expand Up @@ -925,8 +917,8 @@ impl<T> Stream for UnboundedReceiver<T> {
type Item = T;
type Error = ();

fn poll(&mut self) -> Poll<Option<T>, ()> {
self.0.poll()
fn poll(&mut self, ctx: &mut task::Context) -> Poll<Option<T>, ()> {
self.0.poll(ctx)
}
}

Expand Down
30 changes: 14 additions & 16 deletions futures-channel/src/oneshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::error::Error;
use std::fmt;

use futures_core::{Future, Poll, Async};
use futures_core::task::{self, Task};
use futures_core::task;

use lock::Lock;

Expand Down Expand Up @@ -60,11 +60,11 @@ struct Inner<T> {
/// the `Lock` here, unlike in `data` above, is important to resolve races.
/// Both the `Receiver` and the `Sender` halves understand that if they
/// can't acquire the lock then some important interference is happening.
rx_task: Lock<Option<Task>>,
rx_task: Lock<Option<task::Waker>>,

/// Like `rx_task` above, except for the task blocked in
/// `Sender::poll_cancel`. Additionally, `Lock` cannot be `UnsafeCell`.
tx_task: Lock<Option<Task>>,
tx_task: Lock<Option<task::Waker>>,
}

/// Creates a new futures-aware, one-shot channel.
Expand Down Expand Up @@ -158,7 +158,7 @@ impl<T> Inner<T> {
}
}

fn poll_cancel(&self) -> Poll<(), ()> {
fn poll_cancel(&self, ctx: &mut task::Context) -> Poll<(), ()> {
// Fast path up first, just read the flag and see if our other half is
// gone. This flag is set both in our destructor and the oneshot
// destructor, but our destructor hasn't run yet so if it's set then the
Expand All @@ -180,9 +180,8 @@ impl<T> Inner<T> {
// may have been dropped. The first thing it does is set the flag, and
// if it fails to acquire the lock it assumes that we'll see the flag
// later on. So... we then try to see the flag later on!
let handle = task::current();
match self.tx_task.try_lock() {
Some(mut p) => *p = Some(handle),
Some(mut p) => *p = Some(ctx.waker()),
None => return Ok(Async::Ready(())),
}
if self.complete.load(SeqCst) {
Expand Down Expand Up @@ -221,7 +220,7 @@ impl<T> Inner<T> {
if let Some(mut slot) = self.rx_task.try_lock() {
if let Some(task) = slot.take() {
drop(slot);
task.notify();
task.wake();
}
}
}
Expand All @@ -233,12 +232,12 @@ impl<T> Inner<T> {
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.notify()
task.wake()
}
}
}

fn recv(&self) -> Poll<T, Canceled> {
fn recv(&self, ctx: &mut task::Context) -> Poll<T, Canceled> {
let mut done = false;

// Check to see if some data has arrived. If it hasn't then we need to
Expand All @@ -251,9 +250,8 @@ impl<T> Inner<T> {
if self.complete.load(SeqCst) {
done = true;
} else {
let task = task::current();
match self.rx_task.try_lock() {
Some(mut slot) => *slot = Some(task),
Some(mut slot) => *slot = Some(ctx.waker()),
None => done = true,
}
}
Expand Down Expand Up @@ -306,7 +304,7 @@ impl<T> Inner<T> {
if let Some(mut handle) = self.tx_task.try_lock() {
if let Some(task) = handle.take() {
drop(handle);
task.notify()
task.wake()
}
}
}
Expand Down Expand Up @@ -351,8 +349,8 @@ impl<T> Sender<T> {
///
/// If you're calling this function from a context that does not have a
/// task, then you can use the `is_canceled` API instead.
pub fn poll_cancel(&mut self) -> Poll<(), ()> {
self.inner.poll_cancel()
pub fn poll_cancel(&mut self, ctx: &mut task::Context) -> Poll<(), ()> {
self.inner.poll_cancel(ctx)
}

/// Tests to see whether this `Sender`'s corresponding `Receiver`
Expand Down Expand Up @@ -412,8 +410,8 @@ impl<T> Future for Receiver<T> {
type Item = T;
type Error = Canceled;

fn poll(&mut self) -> Poll<T, Canceled> {
self.inner.recv()
fn poll(&mut self, ctx: &mut task::Context) -> Poll<T, Canceled> {
self.inner.recv(ctx)
}
}

Expand Down
2 changes: 1 addition & 1 deletion futures-channel/tests/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ fn sequence() {
fn drop_sender() {
let (tx, mut rx) = mpsc::channel::<u32>(1);
drop(tx);
match rx.poll() {
match rx.poll(&mut TaskContext::panicking()) {
Ok(Async::Ready(None)) => {}
_ => panic!("channel should be done"),
}
Expand Down
40 changes: 20 additions & 20 deletions futures-channel/tests/mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,29 +34,29 @@ fn send_recv_no_buffer() {

// Run on a task context
let f = lazy(move || {
assert!(tx.flush().unwrap().is_ready());
assert!(tx.poll_ready().unwrap().is_ready());
assert!(tx.flush(&mut TaskContext::panicking()).unwrap().is_ready());
assert!(tx.poll_ready(&mut TaskContext::panicking()).unwrap().is_ready());

// Send first message
let res = tx.start_send(1).unwrap();
let res = tx.start_send(&mut TaskContext::panicking(), 1).unwrap();
assert!(res.is_ok());
assert!(tx.poll_ready().unwrap().is_not_ready());
assert!(tx.poll_ready(&mut TaskContext::panicking()).unwrap().is_not_ready());

// Send second message
let res = tx.start_send(2).unwrap();
let res = tx.start_send(&mut TaskContext::panicking(), 2).unwrap();
assert!(res.is_err());

// Take the value
assert_eq!(rx.poll().unwrap(), Async::Ready(Some(1)));
assert!(tx.poll_ready().unwrap().is_ready());
assert_eq!(rx.poll(&mut TaskContext::panicking()).unwrap(), Async::Ready(Some(1)));
assert!(tx.poll_ready(&mut TaskContext::panicking()).unwrap().is_ready());

let res = tx.start_send(2).unwrap();
let res = tx.start_send(&mut TaskContext::panicking(), 2).unwrap();
assert!(res.is_ok());
assert!(tx.poll_ready().unwrap().is_not_ready());
assert!(tx.poll_ready(&mut TaskContext::panicking()).unwrap().is_not_ready());

// Take the value
assert_eq!(rx.poll().unwrap(), Async::Ready(Some(2)));
assert!(tx.poll_ready().unwrap().is_ready());
assert_eq!(rx.poll(&mut TaskContext::panicking()).unwrap(), Async::Ready(Some(2)));
assert!(tx.poll_ready(&mut TaskContext::panicking()).unwrap().is_ready());

Ok::<(), ()>(())
});
Expand Down Expand Up @@ -125,8 +125,8 @@ fn recv_close_gets_none() {
let f = lazy(move || {
rx.close();

assert_eq!(rx.poll(), Ok(Async::Ready(None)));
assert!(tx.poll_ready().is_err());
assert_eq!(rx.poll(&mut TaskContext::panicking()), Ok(Async::Ready(None)));
assert!(tx.poll_ready(&mut TaskContext::panicking()).is_err());

drop(tx);

Expand All @@ -142,8 +142,8 @@ fn tx_close_gets_none() {

// Run on a task context
let f = lazy(move || {
assert_eq!(rx.poll(), Ok(Async::Ready(None)));
assert_eq!(rx.poll(), Ok(Async::Ready(None)));
assert_eq!(rx.poll(&mut TaskContext::panicking()), Ok(Async::Ready(None)));
assert_eq!(rx.poll(&mut TaskContext::panicking()), Ok(Async::Ready(None)));

Ok::<(), ()>(())
});
Expand Down Expand Up @@ -322,7 +322,7 @@ fn stress_receiver_multi_task_bounded_hard() {
// Just poll
let n = n.clone();
let f = lazy(move || {
let r = match rx.poll().unwrap() {
let r = match rx.poll(&mut TaskContext::panicking()).unwrap() {
Async::Ready(Some(_)) => {
n.fetch_add(1, Ordering::Relaxed);
*lock = Some(rx);
Expand Down Expand Up @@ -443,12 +443,12 @@ fn stress_poll_ready() {
type Item = ();
type Error = ();

fn poll(&mut self) -> Poll<(), ()> {
fn poll(&mut self, ctx: &mut TaskContext) -> Poll<(), ()> {
// In a loop, check if the channel is ready. If so, push an item onto the channel
// (asserting that it doesn't attempt to block).
while self.count > 0 {
try_ready!(self.sender.poll_ready().map_err(|_| ()));
assert!(self.sender.start_send(self.count).unwrap().is_ok());
try_ready!(self.sender.poll_ready(ctx).map_err(|_| ()));
assert!(self.sender.start_send(ctx, self.count).unwrap().is_ok());
self.count -= 1;
}
Ok(Async::Ready(()))
Expand Down Expand Up @@ -523,7 +523,7 @@ fn try_send_2() {
let th = thread::spawn(|| {
run(|c| {
c.block_on(lazy(|| {
assert!(tx.start_send("fail").unwrap().is_err());
assert!(tx.start_send(&mut TaskContext::panicking(), "fail").unwrap().is_err());
Ok::<_, ()>(())
})).unwrap();

Expand Down
Loading