Skip to content

add an unstable task::blocking function #251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Oct 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/fs/read_dir.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::fs::DirEntry;
use crate::future::Future;
use crate::io;
use crate::stream::Stream;
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Returns a stream of entries in a directory.
///
Expand Down Expand Up @@ -71,7 +71,7 @@ pub struct ReadDir(State);
#[derive(Debug)]
enum State {
Idle(Option<std::fs::ReadDir>),
Busy(blocking::JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
Busy(JoinHandle<(std::fs::ReadDir, Option<io::Result<std::fs::DirEntry>>)>),
}

impl ReadDir {
Expand Down
4 changes: 2 additions & 2 deletions src/io/stderr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cfg_if::cfg_if;

use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Constructs a new handle to the standard error of the current process.
///
Expand Down Expand Up @@ -56,7 +56,7 @@ enum State {
/// The stderr is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stderr.
Busy(blocking::JoinHandle<State>),
Busy(JoinHandle<State>),
}

/// Inner representation of the asynchronous stderr.
Expand Down
4 changes: 2 additions & 2 deletions src/io/stdin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cfg_if::cfg_if;

use crate::future::{self, Future};
use crate::io::{self, Read};
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Constructs a new handle to the standard input of the current process.
///
Expand Down Expand Up @@ -57,7 +57,7 @@ enum State {
/// The stdin is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stdin.
Busy(blocking::JoinHandle<State>),
Busy(JoinHandle<State>),
}

/// Inner representation of the asynchronous stdin.
Expand Down
4 changes: 2 additions & 2 deletions src/io/stdout.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use cfg_if::cfg_if;

use crate::future::Future;
use crate::io::{self, Write};
use crate::task::{blocking, Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

/// Constructs a new handle to the standard output of the current process.
///
Expand Down Expand Up @@ -56,7 +56,7 @@ enum State {
/// The stdout is blocked on an asynchronous operation.
///
/// Awaiting this operation will result in the new state of the stdout.
Busy(blocking::JoinHandle<State>),
Busy(JoinHandle<State>),
}

/// Inner representation of the asynchronous stdout.
Expand Down
5 changes: 2 additions & 3 deletions src/net/addr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,7 @@ use cfg_if::cfg_if;

use crate::future::Future;
use crate::io;
use crate::task::blocking;
use crate::task::{Context, Poll};
use crate::task::{blocking, Context, JoinHandle, Poll};

cfg_if! {
if #[cfg(feature = "docs")] {
Expand Down Expand Up @@ -48,7 +47,7 @@ pub trait ToSocketAddrs {
#[allow(missing_debug_implementations)]
pub enum ToSocketAddrsFuture<'a, I> {
Phantom(PhantomData<&'a ()>),
Join(blocking::JoinHandle<io::Result<I>>),
Join(JoinHandle<io::Result<I>>),
Ready(Option<io::Result<I>>),
}

Expand Down
38 changes: 8 additions & 30 deletions src/task/blocking.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
//! A thread pool for running blocking functions asynchronously.

use std::fmt;
use std::pin::Pin;
use std::sync::atomic::{AtomicU64, Ordering};
use std::thread;
use std::time::Duration;
Expand All @@ -10,16 +8,16 @@ use crossbeam_channel::{bounded, Receiver, Sender};
use lazy_static::lazy_static;

use crate::future::Future;
use crate::task::{Context, Poll};
use crate::task::task::{JoinHandle, Tag};
use crate::utils::abort_on_panic;

const MAX_THREADS: u64 = 10_000;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);

struct Pool {
sender: Sender<async_task::Task<()>>,
receiver: Receiver<async_task::Task<()>>,
sender: Sender<async_task::Task<Tag>>,
receiver: Receiver<async_task::Task<Tag>>,
}

lazy_static! {
Expand Down Expand Up @@ -85,7 +83,7 @@ fn maybe_create_another_blocking_thread() {
// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
fn schedule(t: async_task::Task<()>) {
fn schedule(t: async_task::Task<Tag>) {
if let Err(err) = POOL.sender.try_send(t) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
Expand All @@ -98,35 +96,15 @@ fn schedule(t: async_task::Task<()>) {
/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
pub fn spawn<F, R>(future: F) -> JoinHandle<R>
pub(crate) fn spawn<F, R>(future: F) -> JoinHandle<R>
where
F: Future<Output = R> + Send + 'static,
R: Send + 'static,
{
let (task, handle) = async_task::spawn(future, schedule, ());
let tag = Tag::new(None);
let (task, handle) = async_task::spawn(future, schedule, tag);
task.schedule();
JoinHandle(handle)
}

/// A handle to a blocking task.
pub struct JoinHandle<R>(async_task::JoinHandle<R, ()>);

impl<R> Unpin for JoinHandle<R> {}

impl<R> Future for JoinHandle<R> {
type Output = R;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Pin::new(&mut self.0).poll(cx).map(|out| out.unwrap())
}
}

impl<R> fmt::Debug for JoinHandle<R> {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("JoinHandle")
.field("handle", &self.0)
.finish()
}
JoinHandle::new(handle)
}

/// Generates a random number in `0..n`.
Expand Down
16 changes: 16 additions & 0 deletions src/task/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,3 +48,19 @@ mod task_local;
mod worker;

pub(crate) mod blocking;

/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks.
// Once this function stabilizes we should merge `blocking::spawn` into this so
// all code in our crate uses `task::blocking` too.
#[cfg(any(feature = "unstable", feature = "docs"))]
#[cfg_attr(feature = "docs", doc(cfg(unstable)))]
#[inline]
pub fn blocking<F, R>(future: F) -> task::JoinHandle<R>
where
F: crate::future::Future<Output = R> + Send + 'static,
R: Send + 'static,
{
blocking::spawn(future)
}