rt: Allow concurrent block_on's with basic_scheduler #2804

Merged: 12 commits, Sep 23, 2020
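For context, here is a minimal sketch of the usage this change enables: several threads sharing one basic-scheduler runtime and calling `block_on` concurrently. It assumes the 0.2-era builder API this PR was written against and the new `&self` signature for `block_on`; treat it as an illustration, not code from the PR.

use std::sync::Arc;
use std::thread;

fn main() {
    // A single-threaded (basic_scheduler) runtime shared across threads.
    let rt = Arc::new(
        tokio::runtime::Builder::new()
            .basic_scheduler()
            .enable_all()
            .build()
            .unwrap(),
    );

    let handles: Vec<_> = (0..2)
        .map(|i| {
            let rt = Arc::clone(&rt);
            thread::spawn(move || {
                // With this change, both calls can be in flight at once:
                // one thread steals the parker and drives the driver while
                // the other waits on the Notify, still polling its future.
                rt.block_on(async move { i * 2 })
            })
        })
        .collect();

    for handle in handles {
        println!("{}", handle.join().unwrap());
    }
}

Before this PR, a second concurrent caller fell back to a plain `enter.block_on` (visible in the removed `runtime/mod.rs` code below) that never drove the IO or time driver itself, relying entirely on the first thread to keep the driver running.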
192 changes: 154 additions & 38 deletions tokio/src/runtime/basic_scheduler.rs
@@ -1,22 +1,35 @@
use crate::future::poll_fn;
use crate::loom::sync::Mutex;
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::task::{self, JoinHandle, Schedule, Task};
use crate::sync::Notify;
use crate::util::linked_list::{Link, LinkedList};
use crate::util::{waker_ref, Wake};
use crate::util::{waker_ref, Wake, WakerRef};

use std::cell::RefCell;
use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::Poll::Ready;
use std::sync::{Arc, PoisonError};
use std::task::Poll::{Pending, Ready};
use std::time::Duration;

/// Executes tasks on the current thread
pub(crate) struct BasicScheduler<P>
where
P: Park,
{
pub(crate) struct BasicScheduler<P: Park> {
/// Inner state guarded by a mutex that is shared
/// between all `block_on` calls.
inner: Mutex<Option<Inner<P>>>,

/// Notifier for waking up other threads to steal the
/// parker.
notify: Notify,

/// Sendable task spawner
spawner: Spawner,
}

/// The inner scheduler that owns the task queue and the main parker P.
struct Inner<P: Park> {
/// Scheduler run queue
///
/// When the scheduler is executed, the queue is removed from `self` and
@@ -59,7 +72,7 @@ struct Shared {
unpark: Box<dyn Unpark>,
}

/// Thread-local context
/// Thread-local context.
struct Context {
/// Shared scheduler state
shared: Arc<Shared>,
@@ -68,38 +81,43 @@ struct Context {
tasks: RefCell<Tasks>,
}

/// Initial queue capacity
/// Initial queue capacity.
const INITIAL_CAPACITY: usize = 64;

/// Max number of tasks to poll per tick.
const MAX_TASKS_PER_TICK: usize = 61;

/// How often ot check the remote queue first
/// How often to check the remote queue first.
const REMOTE_FIRST_INTERVAL: u8 = 31;

// Tracks the current BasicScheduler
// Tracks the current BasicScheduler.
scoped_thread_local!(static CURRENT: Context);

impl<P> BasicScheduler<P>
where
P: Park,
{
impl<P: Park> BasicScheduler<P> {
pub(crate) fn new(park: P) -> BasicScheduler<P> {
let unpark = Box::new(park.unpark());

BasicScheduler {
let spawner = Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
};

let inner = Mutex::new(Some(Inner {
tasks: Some(Tasks {
owned: LinkedList::new(),
queue: VecDeque::with_capacity(INITIAL_CAPACITY),
}),
spawner: Spawner {
shared: Arc::new(Shared {
queue: Mutex::new(VecDeque::with_capacity(INITIAL_CAPACITY)),
unpark: unpark as Box<dyn Unpark>,
}),
},
spawner: spawner.clone(),
tick: 0,
park,
}));

BasicScheduler {
inner,
notify: Notify::new(),
spawner,
}
}

@@ -108,7 +126,6 @@ where
}

/// Spawns a future onto the thread pool
#[allow(dead_code)]
pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
@@ -117,13 +134,57 @@ where
self.spawner.spawn(future)
}

pub(crate) fn block_on<F>(&mut self, future: F) -> F::Output
where
F: Future,
{
pub(crate) fn block_on<F: Future>(&self, future: F) -> F::Output {
pin!(future);

// Attempt to steal the dedicated parker and block_on the future there if
// we can; otherwise, select on a notification that the parker is available
// or the future is complete.
loop {
if let Some(inner) = &mut self.take_inner() {
return inner.block_on(future);
} else {
let mut enter = crate::runtime::enter(false);

let notified = self.notify.notified();
pin!(notified);

if let Some(out) = enter
.block_on(poll_fn(|cx| {
if notified.as_mut().poll(cx).is_ready() {
return Ready(None);
}

if let Ready(out) = future.as_mut().poll(cx) {
return Ready(Some(out));
}

Pending
}))
.expect("Failed to `Enter::block_on`")
{
return out;
}
}
}
}

fn take_inner(&self) -> Option<InnerGuard<'_, P>> {
let inner = self.inner.lock().unwrap().take()?;

Some(InnerGuard {
inner: Some(inner),
basic_scheduler: &self,
})
}
}

impl<P: Park> Inner<P> {
/// Block on the future provided and drive the runtime's driver.
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
enter(self, |scheduler, context| {
let _enter = runtime::enter(false);
let waker = waker_ref(&scheduler.spawner.shared);
let _enter = crate::runtime::enter(false);
let waker = scheduler.spawner.waker_ref();
let mut cx = std::task::Context::from_waker(&waker);

pin!(future);
@@ -178,16 +239,16 @@

/// Enter the scheduler context. This sets the queue and other necessary
/// scheduler state in the thread-local
fn enter<F, R, P>(scheduler: &mut BasicScheduler<P>, f: F) -> R
fn enter<F, R, P>(scheduler: &mut Inner<P>, f: F) -> R
where
F: FnOnce(&mut BasicScheduler<P>, &Context) -> R,
F: FnOnce(&mut Inner<P>, &Context) -> R,
P: Park,
{
// Ensures the run queue is placed back in the `BasicScheduler` instance
// once `block_on` returns.
struct Guard<'a, P: Park> {
context: Option<Context>,
scheduler: &'a mut BasicScheduler<P>,
scheduler: &'a mut Inner<P>,
}

impl<P: Park> Drop for Guard<'_, P> {
@@ -214,12 +275,23 @@ where
CURRENT.set(context, || f(scheduler, context))
}

impl<P> Drop for BasicScheduler<P>
where
P: Park,
{
impl<P: Park> Drop for BasicScheduler<P> {
fn drop(&mut self) {
enter(self, |scheduler, context| {
// Avoid a double panic if we are currently panicking and
// the lock may be poisoned.

let mut inner = match self
.inner
.lock()
.unwrap_or_else(PoisonError::into_inner)
.take()
{
Some(inner) => inner,
None if std::thread::panicking() => return,
None => panic!("Oh no! We never placed the Inner state back, this is a bug!"),
};

enter(&mut inner, |scheduler, context| {
// Loop required here to ensure borrow is dropped between iterations
#[allow(clippy::while_let_loop)]
loop {
@@ -269,6 +341,10 @@ impl Spawner {
fn pop(&self) -> Option<task::Notified<Arc<Shared>>> {
self.shared.queue.lock().unwrap().pop_front()
}

fn waker_ref(&self) -> WakerRef<'_> {
waker_ref(&self.shared)
}
}

impl fmt::Debug for Spawner {
@@ -325,3 +401,43 @@ impl Wake for Shared {
arc_self.unpark.unpark();
}
}

// ===== InnerGuard =====

/// Used to ensure we always place the Inner value
/// back into its slot in `BasicScheduler`, even if the
/// future panics.
struct InnerGuard<'a, P: Park> {
inner: Option<Inner<P>>,
basic_scheduler: &'a BasicScheduler<P>,
}

impl<P: Park> InnerGuard<'_, P> {
fn block_on<F: Future>(&mut self, future: F) -> F::Output {
// The only time inner gets set to `None` is if we have dropped
// already so this unwrap is safe.
self.inner.as_mut().unwrap().block_on(future)
}
}

impl<P: Park> Drop for InnerGuard<'_, P> {
fn drop(&mut self) {
if let Some(scheduler) = self.inner.take() {
// We can ignore the poison error here since we are
// just replacing the state.
let mut lock = self
.basic_scheduler
.inner
.lock()
.unwrap_or_else(PoisonError::into_inner);

// Replace old scheduler back into the state to allow
// other threads to pick it up and drive it.
lock.replace(scheduler);

// Wake up other possible threads that could steal
// the dedicated parker P.
self.basic_scheduler.notify.notify_one()
}
}
}
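Two techniques in the diff above are worth pulling out. First, the fallback path in `block_on` races the caller's future against a `Notify` permit inside one `poll_fn`, so a thread that could not steal the parker still makes progress on its own future while waiting for the parker to be released. Second, `InnerGuard` is a small RAII pattern: a `Mutex<Option<T>>` slot plus a wakeup primitive, with a guard that always puts the value back and wakes one waiter, even on panic. Below is a standalone sketch of that slot pattern, using `std`'s blocking `Condvar` where the scheduler uses the async `Notify`; all names here are hypothetical, not tokio APIs.

use std::sync::{Condvar, Mutex, PoisonError};

/// A slot holding an exclusive resource that many threads may borrow in turn.
struct Slot<T> {
    inner: Mutex<Option<T>>,
    notify: Condvar,
}

impl<T> Slot<T> {
    fn new(value: T) -> Self {
        Slot {
            inner: Mutex::new(Some(value)),
            notify: Condvar::new(),
        }
    }

    /// Steal the resource, blocking until another borrower returns it.
    fn take(&self) -> SlotGuard<'_, T> {
        let mut lock = self.inner.lock().unwrap_or_else(PoisonError::into_inner);
        loop {
            if let Some(value) = lock.take() {
                return SlotGuard { slot: self, value: Some(value) };
            }
            // Park until a guard is dropped and the value is put back.
            lock = self.notify.wait(lock).unwrap_or_else(PoisonError::into_inner);
        }
    }
}

/// RAII guard that returns the resource to the slot and wakes one waiter,
/// even if the borrower panics; this mirrors `InnerGuard` above.
struct SlotGuard<'a, T> {
    slot: &'a Slot<T>,
    value: Option<T>,
}

impl<T> SlotGuard<'_, T> {
    fn value_mut(&mut self) -> &mut T {
        // `value` is only `None` after `drop` has run.
        self.value.as_mut().expect("value present until drop")
    }
}

impl<T> Drop for SlotGuard<'_, T> {
    fn drop(&mut self) {
        if let Some(value) = self.value.take() {
            // Ignore poisoning: we are only putting the state back.
            self.slot
                .inner
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
                .replace(value);
            self.slot.notify.notify_one();
        }
    }
}

The key difference from this sketch is that the scheduler's waiting threads never block on the notification alone: they select against their own future, so a `block_on` call can complete without ever acquiring the parker.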
2 changes: 1 addition & 1 deletion tokio/src/runtime/builder.rs
@@ -496,7 +496,7 @@ cfg_rt_core! {
let blocking_spawner = blocking_pool.spawner().clone();

Ok(Runtime {
kind: Kind::Basic(Mutex::new(Some(scheduler))),
kind: Kind::Basic(scheduler),
handle: Handle {
spawner,
io_handle: resources.io_handle,
23 changes: 3 additions & 20 deletions tokio/src/runtime/mod.rs
@@ -296,7 +296,7 @@ enum Kind {

/// Execute all tasks on the current-thread.
#[cfg(feature = "rt-core")]
Basic(Mutex<Option<BasicScheduler<driver::Driver>>>),
Basic(BasicScheduler<driver::Driver>),

/// Execute tasks across multiple threads.
#[cfg(feature = "rt-threaded")]
@@ -401,7 +401,7 @@ impl Runtime {
Kind::Shell(_) => panic!("task execution disabled"),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.spawn(future),
Kind::Basic(_exec) => self.handle.spawner.spawn(future),
Kind::Basic(exec) => exec.spawn(future),
}
}

@@ -461,24 +461,7 @@ impl Runtime {
}
}
#[cfg(feature = "rt-core")]
Kind::Basic(exec) => {
// TODO(lucio): clean this up and move this impl into
// `basic_scheduler.rs`, this is hacky and bad but will work for
// now.
let exec_temp = {
let mut lock = exec.lock().unwrap();
lock.take()
};

if let Some(mut exec_temp) = exec_temp {
let res = exec_temp.block_on(future);
exec.lock().unwrap().replace(exec_temp);
res
} else {
let mut enter = crate::runtime::enter(true);
enter.block_on(future).unwrap()
}
}
Kind::Basic(exec) => exec.block_on(future),
#[cfg(feature = "rt-threaded")]
Kind::ThreadPool(exec) => exec.block_on(future),
})
7 changes: 7 additions & 0 deletions tokio/src/sync/mod.rs
@@ -456,6 +456,13 @@ cfg_sync! {
pub mod watch;
}

cfg_not_sync! {
cfg_rt_core! {
mod notify;
pub(crate) use notify::Notify;
}
}

cfg_not_sync! {
cfg_atomic_waker_impl! {
mod task;
7 changes: 7 additions & 0 deletions tokio/src/sync/notify.rs
@@ -1,3 +1,10 @@
// Allow `unreachable_pub` warnings when sync is not enabled
// due to the usage of `Notify` within the `rt-core` feature set.
// When this module is compiled with `sync` enabled we will warn on
// this lint. When `rt-core` is enabled we use `pub(crate)` which
// triggers this warning but it is safe to ignore in this case.
#![cfg_attr(not(feature = "sync"), allow(unreachable_pub, dead_code))]

use crate::loom::sync::atomic::AtomicU8;
use crate::loom::sync::Mutex;
use crate::util::linked_list::{self, LinkedList};
4 changes: 4 additions & 0 deletions tokio/src/util/mod.rs
@@ -12,6 +12,10 @@ mod rand;
mod wake;
pub(crate) use wake::{waker_ref, Wake};

cfg_rt_core! {
pub(crate) use wake::WakerRef;
}

cfg_rt_threaded! {
pub(crate) use rand::FastRand;
