Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/labeler.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
R-loom-blocking:
- tokio/src/runtime/blocking/*
- tokio/src/runtime/blocking/**/*

R-loom-sync:
- tokio/src/sync/*
Expand Down
16 changes: 16 additions & 0 deletions .github/workflows/loom.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,22 @@ permissions:
contents: read

jobs:
loom-blocking:
name: loom tokio::runtime::spawn_blocking
# base_ref is null when it's not a pull request
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-blocking') || (github.base_ref == null))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v5
- name: Install Rust ${{ env.rust_stable }}
uses: dtolnay/rust-toolchain@master
with:
toolchain: ${{ env.rust_stable }}
- uses: Swatinem/rust-cache@v2
- name: run tests
run: cargo test --lib --release --features full -- --nocapture loom_blocking
working-directory: tokio

loom-sync:
name: loom tokio::sync
# base_ref is null when it's not a pull request
Expand Down
26 changes: 18 additions & 8 deletions tokio/src/runtime/blocking/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,8 @@ impl Inner {

let mut shared = self.shared.lock();
let mut join_on_thread = None;
// is this thread currently counted in `num_idle_threads`?
let mut is_counted_idle;

'main: loop {
// BUSY
Expand All @@ -520,6 +522,8 @@ impl Inner {

// IDLE
self.metrics.inc_num_idle_threads();
// mark this thread as currently counted in `num_idle_threads`.
is_counted_idle = true;

while !shared.shutdown {
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
Expand All @@ -532,6 +536,9 @@ impl Inner {
// acknowledge it by decrementing the counter
// and transition to the BUSY state.
shared.num_notify -= 1;
// since this is a legitimate wakeup,
// the `Spawner::spawn_task` has already decremented `num_idle_threads`.
is_counted_idle = false;
Comment thread
ADD-SP marked this conversation as resolved.
break;
}

Expand Down Expand Up @@ -568,14 +575,17 @@ impl Inner {
// Thread exit
self.metrics.dec_num_threads();

// `num_idle_threads` should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
let prev_idle = self.metrics.dec_num_idle_threads();
assert_ne!(
prev_idle, 0,
"`num_idle_threads` underflowed on thread exit"
);
// Is this thread currently counted in `num_idle_threads`?
if is_counted_idle {
// `num_idle_threads` should now be tracked exactly, panic
// with a descriptive message if it is not the
// case.
let prev_idle = self.metrics.dec_num_idle_threads();
assert_ne!(
prev_idle, 0,
"`num_idle_threads` underflowed on thread exit"
);
}

if shared.shutdown && self.metrics.num_threads() == 0 {
self.condvar.notify_one();
Expand Down
40 changes: 39 additions & 1 deletion tokio/src/runtime/tests/loom_blocking.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use crate::runtime::{self, Runtime};

use std::sync::Arc;
use std::time::Duration;

#[test]
fn blocking_shutdown() {
Expand Down Expand Up @@ -75,7 +76,6 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread

#[test]
fn spawn_blocking_when_paused() {
use std::time::Duration;
loom::model(|| {
let rt = crate::runtime::Builder::new_current_thread()
.enable_time()
Expand All @@ -94,6 +94,44 @@ fn spawn_blocking_when_paused() {
});
}

#[test]
/// See <https://github.com/tokio-rs/tokio/pull/7922>
fn spawn_blocking_then_shutdown() {
loom::model(|| {
let rt = crate::runtime::Builder::new_current_thread()
.max_blocking_threads(1)
.thread_keep_alive(Duration::from_secs(7200)) // don't let the thread exit on its own
.build()
.unwrap();
let rt_hdl = rt.handle().clone();

// Currently, there is no live blocking thread,
// so `spawn_blocking` will spawn a new blocking thread.
let jh0 = rt_hdl.spawn_blocking(|| {});
loom::future::block_on(jh0).unwrap();

// Now, there is a idle blocking threads park on the condvar,
// so the following `spawn_blocking` will decrease the `num_idle_threads`
// and then notify one of the idle threads to run the task.

// this will decrease the `num_idle_threads`
// and then notify one of the idle threads to run the task.
let jh3 = rt_hdl.spawn_blocking(|| {});

// shutdown the runtime, which also shutdown the blocking pool
drop(rt);

// loom will emulate two parrel operations:
//
// 1. the blocking thread is woken up on the condvar
// 2. the main thread is waiting for the blocking thread to finish the task
//
// So, if the `num_idle_threads` is not counted correctly,
// it will trigger the assertions inside the `Inner::run` function.
let _ = loom::future::block_on(jh3);
});
}

fn mk_runtime(num_threads: usize) -> Runtime {
runtime::Builder::new_multi_thread()
.worker_threads(num_threads)
Expand Down
Loading