Skip to content

Commit 09ad536

Browse files
authored
runtime: avoid redundant unpark in current_thread scheduler (#7834)
1 parent 934f68d commit 09ad536

3 files changed

Lines changed: 111 additions & 7 deletions

File tree

tokio/src/runtime/scheduler/current_thread/mod.rs

Lines changed: 18 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::util::{waker_ref, RngSeedGenerator, Wake, WakerRef};
1515
use std::cell::RefCell;
1616
use std::collections::VecDeque;
1717
use std::future::{poll_fn, Future};
18-
use std::sync::atomic::Ordering::{AcqRel, Release};
18+
use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
1919
use std::task::Poll::{Pending, Ready};
2020
use std::task::Waker;
2121
use std::thread::ThreadId;
@@ -380,8 +380,8 @@ impl Context {
380380
core = c;
381381
}
382382

383-
// This check will fail if `before_park` spawns a task for us to run
384-
// instead of parking the thread
383+
// If `before_park` spawns a task (or otherwise schedules work for us), then we should not
384+
// park the thread.
385385
if !self.has_pending_work(&core) {
386386
// Park until the thread is signaled
387387
core.metrics.about_to_park();
@@ -415,7 +415,7 @@ impl Context {
415415
}
416416

417417
fn has_pending_work(&self, core: &Core) -> bool {
418-
!core.tasks.is_empty() || !self.defer.is_empty()
418+
!core.tasks.is_empty() || !self.defer.is_empty() || self.handle.shared.woken.load(Acquire)
419419
}
420420

421421
fn park_internal(
@@ -724,8 +724,20 @@ impl Wake for Handle {
724724

725725
/// Wake by reference
726726
fn wake_by_ref(arc_self: &Arc<Self>) {
727-
arc_self.shared.woken.store(true, Release);
728-
arc_self.driver.unpark();
727+
let already_woken = arc_self.shared.woken.swap(true, Release);
728+
729+
if !already_woken {
730+
use scheduler::Context::CurrentThread;
731+
732+
// If we are already running on the runtime, then it's not required to wake up the
733+
// runtime.
734+
context::with_scheduler(|maybe_cx| match maybe_cx {
735+
Some(CurrentThread(cx)) if Arc::ptr_eq(arc_self, &cx.handle) => {}
736+
_ => {
737+
arc_self.driver.unpark();
738+
}
739+
});
740+
}
729741
}
730742
}
731743

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
#![warn(rust_2018_idioms)]
2+
#![cfg(feature = "full")]
3+
#![cfg(not(target_os = "wasi"))] // Wasi doesn't support threads
4+
5+
use std::sync::atomic::{AtomicBool, Ordering};
6+
use std::sync::Arc;
7+
use tokio::runtime::Builder;
8+
use tokio::sync::Notify;
9+
10+
#[test]
11+
fn before_park_wakes_block_on_task() {
12+
let notify = Arc::new(Notify::new());
13+
let notify2 = notify.clone();
14+
let woken = Arc::new(AtomicBool::new(false));
15+
let woken2 = woken.clone();
16+
17+
let rt = Builder::new_current_thread()
18+
.enable_all()
19+
.on_thread_park(move || {
20+
// Only wake once to avoid busy loop if something goes wrong,
21+
// though in this test we expect it to unpark immediately.
22+
if !woken2.swap(true, Ordering::SeqCst) {
23+
notify2.notify_one();
24+
}
25+
})
26+
.build()
27+
.unwrap();
28+
29+
rt.block_on(async {
30+
// This will block until `notify` is notified.
31+
// `before_park` should run when the runtime is about to park.
32+
// It will notify `notify`, which should wake this task.
33+
// The runtime should then see the task is woken and NOT park.
34+
notify.notified().await;
35+
});
36+
37+
assert!(woken.load(Ordering::SeqCst));
38+
}
39+
40+
#[test]
41+
fn before_park_spawns_task() {
42+
let notify = Arc::new(Notify::new());
43+
let notify2 = notify.clone();
44+
let woken = Arc::new(AtomicBool::new(false));
45+
let woken2 = woken.clone();
46+
47+
let rt = Builder::new_current_thread()
48+
.enable_all()
49+
.on_thread_park(move || {
50+
if !woken2.swap(true, Ordering::SeqCst) {
51+
let notify = notify2.clone();
52+
tokio::spawn(async move {
53+
notify.notify_one();
54+
});
55+
}
56+
})
57+
.build()
58+
.unwrap();
59+
60+
rt.block_on(async {
61+
// This will block until `notify` is notified.
62+
// `before_park` should run when the runtime is about to park.
63+
// It will spawn a task that notifies `notify`.
64+
// The runtime should see the new task and NOT park.
65+
// If it parks, it will deadlock.
66+
notify.notified().await;
67+
});
68+
69+
assert!(woken.load(Ordering::SeqCst));
70+
}
71+
72+
#[test]
73+
fn wake_from_other_thread_block_on() {
74+
let rt = Builder::new_current_thread().enable_all().build().unwrap();
75+
let handle = rt.handle().clone();
76+
let notify = Arc::new(Notify::new());
77+
let notify2 = notify.clone();
78+
79+
let th = std::thread::spawn(move || {
80+
// Give the main thread time to park
81+
std::thread::sleep(std::time::Duration::from_millis(5));
82+
handle.block_on(async move {
83+
notify2.notify_one();
84+
});
85+
});
86+
87+
rt.block_on(async {
88+
notify.notified().await;
89+
});
90+
91+
th.join().unwrap();
92+
}

tokio/tests/rt_metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ fn worker_park_unpark_count() {
152152
let metrics = rt.metrics();
153153
rt.block_on(rt.spawn(async {})).unwrap();
154154
drop(rt);
155-
assert!(2 <= metrics.worker_park_unpark_count(0));
155+
assert_eq!(0, metrics.worker_park_unpark_count(0) % 2);
156156

157157
let rt = threaded();
158158
let metrics = rt.metrics();

0 commit comments

Comments
 (0)