Skip to content

Commit 08e20fc

Browse files
authored
fs: add support for non-threadpool executors (#1495)
Provides a thread pool dedicated to running blocking operations (#588) and update `tokio-fs` to use this pool. In an effort to make incremental progress, this is an initial step towards a final solution. First, it provides a very basic pool implementation with the intend that the pool will be replaced before the final release. Second, it updates `tokio-fs` to always use this blocking pool instead of conditionally using `threadpool::blocking`. Issue #588 contains additional discussion around potential improvements to the "blocking for all" strategy. The implementation provided here builds on work started in #954 and continued in #1045. The general idea is th same as #1045, but the PR improves on some of the details: * The number of explicit operations tracked by `File` is reduced only to the ones that could interact. All other ops are spawned on the blocking pool without being tracked by the `File` instance. * The `seek` implementation is not backed by a trait and `poll_seek` function. This avoids the question of how to model non-blocking seeks on top of a blocking file. In this patch, `seek` is represented as an `async fn`. If the associated future is dropped before the caller observes the return value, we make no effort to define the state in which the file ends up.
1 parent 08099bb commit 08e20fc

35 files changed

+1975
-264
lines changed

tokio-executor/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ keywords = ["futures", "tokio"]
2121
categories = ["concurrency", "asynchronous"]
2222

2323
[features]
24+
blocking = ["tokio-sync"]
2425
current-thread = ["crossbeam-channel"]
2526
threadpool = [
2627
"tokio-sync",

tokio-executor/src/blocking.rs

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
//! Thread pool for blocking operations
2+
3+
use tokio_sync::oneshot;
4+
5+
use lazy_static::lazy_static;
6+
use std::collections::VecDeque;
7+
use std::future::Future;
8+
use std::pin::Pin;
9+
use std::sync::{Condvar, Mutex};
10+
use std::task::{Context, Poll};
11+
use std::thread;
12+
use std::time::Duration;
13+
14+
struct Pool {
15+
shared: Mutex<Shared>,
16+
condvar: Condvar,
17+
}
18+
19+
struct Shared {
20+
queue: VecDeque<Box<dyn FnOnce() + Send>>,
21+
num_th: u32,
22+
num_idle: u32,
23+
}
24+
25+
lazy_static! {
26+
static ref POOL: Pool = Pool::new();
27+
}
28+
29+
const MAX_THREADS: u32 = 1_000;
30+
const KEEP_ALIVE: Duration = Duration::from_secs(10);
31+
32+
/// Result of a blocking operation running on the blocking thread pool.
33+
#[derive(Debug)]
34+
pub struct Blocking<T> {
35+
rx: oneshot::Receiver<T>,
36+
}
37+
38+
/// Run the provided function on a threadpool dedicated to blocking operations.
39+
pub fn run<F, R>(f: F) -> Blocking<R>
40+
where
41+
F: FnOnce() -> R + Send + 'static,
42+
R: Send + 'static,
43+
{
44+
let (tx, rx) = oneshot::channel();
45+
46+
let should_spawn = {
47+
let mut shared = POOL.shared.lock().unwrap();
48+
49+
shared.queue.push_back(Box::new(move || {
50+
// The receiver may have dropped
51+
let _ = tx.send(f());
52+
}));
53+
54+
if shared.num_idle == 0 {
55+
// No threads are able to process the task
56+
57+
if shared.num_th == MAX_THREADS {
58+
// At max number of threads
59+
false
60+
} else {
61+
shared.num_th += 1;
62+
true
63+
}
64+
} else {
65+
shared.num_idle -= 1;
66+
POOL.condvar.notify_one();
67+
false
68+
}
69+
};
70+
71+
if should_spawn {
72+
spawn_thread();
73+
}
74+
75+
Blocking { rx }
76+
}
77+
78+
impl<T> Future for Blocking<T> {
79+
type Output = T;
80+
81+
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
82+
use std::task::Poll::*;
83+
84+
match Pin::new(&mut self.rx).poll(cx) {
85+
Ready(Ok(v)) => Ready(v),
86+
Ready(Err(_)) => panic!(
87+
"the blocking operation has been dropped before completing. \
88+
This should not happen and is a bug."
89+
),
90+
Pending => Pending,
91+
}
92+
}
93+
}
94+
95+
fn spawn_thread() {
96+
thread::Builder::new()
97+
.name("tokio-blocking-driver".to_string())
98+
.spawn(|| {
99+
'outer: loop {
100+
let mut shared = POOL.shared.lock().unwrap();
101+
102+
if let Some(task) = shared.queue.pop_front() {
103+
drop(shared);
104+
run_task(task);
105+
continue;
106+
}
107+
108+
// IDLE
109+
shared.num_idle += 1;
110+
111+
loop {
112+
shared = POOL.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap().0;
113+
114+
if let Some(task) = shared.queue.pop_front() {
115+
drop(shared);
116+
run_task(task);
117+
continue 'outer;
118+
}
119+
}
120+
}
121+
})
122+
.unwrap();
123+
}
124+
125+
fn run_task(f: Box<dyn FnOnce() + Send>) {
126+
use std::panic::{catch_unwind, AssertUnwindSafe};
127+
128+
let _ = catch_unwind(AssertUnwindSafe(|| f()));
129+
}
130+
131+
impl Pool {
132+
fn new() -> Pool {
133+
Pool {
134+
shared: Mutex::new(Shared {
135+
queue: VecDeque::new(),
136+
num_th: 0,
137+
num_idle: 0,
138+
}),
139+
condvar: Condvar::new(),
140+
}
141+
}
142+
}

tokio-executor/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,9 @@ mod global;
6767
pub mod park;
6868
mod typed;
6969

70+
#[cfg(feature = "blocking")]
71+
pub mod blocking;
72+
7073
#[cfg(feature = "current-thread")]
7174
pub mod current_thread;
7275

tokio-fs/Cargo.toml

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,16 @@ categories = ["asynchronous", "network-programming", "filesystem"]
2323

2424
[dependencies]
2525
tokio-io = { version = "=0.2.0-alpha.2", features = ["util"], path = "../tokio-io" }
26-
tokio-executor = { version = "=0.2.0-alpha.2", features = ["threadpool"], path = "../tokio-executor" }
26+
tokio-executor = { version = "=0.2.0-alpha.2", features = ["blocking"], path = "../tokio-executor" }
27+
tokio-sync = { version = "=0.2.0-alpha.2", path = "../tokio-sync" }
2728

2829
futures-core-preview = "=0.3.0-alpha.18"
2930
futures-util-preview = "=0.3.0-alpha.18"
31+
lazy_static = "1.3.0"
3032

3133
[dev-dependencies]
32-
tokio = { version = "0.2.0-alpha.1", path = "../tokio" }
34+
tokio = { version = "=0.2.0-alpha.2", path = "../tokio" }
35+
tokio-test = { version = "=0.2.0-alpha.2", path = "../tokio-test" }
3336

3437
rand = "0.7"
3538
tempfile = "3"

0 commit comments

Comments
 (0)