Skip to content

Commit f21f269

Browse files
authored
runtime: fix race condition during the blocking pool shutdown (#7922)
1 parent d81e8f0 commit f21f269

4 files changed

Lines changed: 76 additions & 9 deletions

File tree

.github/labeler.yml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,6 @@
1+
R-loom-blocking:
2+
- tokio/src/runtime/blocking/*
3+
- tokio/src/runtime/blocking/**/*
14

25
R-loom-sync:
36
- tokio/src/sync/*

.github/workflows/loom.yml

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,22 @@ permissions:
2323
contents: read
2424

2525
jobs:
26+
loom-blocking:
27+
name: loom tokio::runtime::spawn_blocking
28+
# base_ref is null when it's not a pull request
29+
if: github.repository_owner == 'tokio-rs' && (contains(github.event.pull_request.labels.*.name, 'R-loom-blocking') || (github.base_ref == null))
30+
runs-on: ubuntu-latest
31+
steps:
32+
- uses: actions/checkout@v5
33+
- name: Install Rust ${{ env.rust_stable }}
34+
uses: dtolnay/rust-toolchain@master
35+
with:
36+
toolchain: ${{ env.rust_stable }}
37+
- uses: Swatinem/rust-cache@v2
38+
- name: run tests
39+
run: cargo test --lib --release --features full -- --nocapture loom_blocking
40+
working-directory: tokio
41+
2642
loom-sync:
2743
name: loom tokio::sync
2844
# base_ref is null when it's not a pull request

tokio/src/runtime/blocking/pool.rs

Lines changed: 18 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -507,6 +507,8 @@ impl Inner {
507507

508508
let mut shared = self.shared.lock();
509509
let mut join_on_thread = None;
510+
// is this thread currently counted in `num_idle_threads`?
511+
let mut is_counted_idle;
510512

511513
'main: loop {
512514
// BUSY
@@ -520,6 +522,8 @@ impl Inner {
520522

521523
// IDLE
522524
self.metrics.inc_num_idle_threads();
525+
// mark this thread as currently counted in `num_idle_threads`.
526+
is_counted_idle = true;
523527

524528
while !shared.shutdown {
525529
let lock_result = self.condvar.wait_timeout(shared, self.keep_alive).unwrap();
@@ -532,6 +536,9 @@ impl Inner {
532536
// acknowledge it by decrementing the counter
533537
// and transition to the BUSY state.
534538
shared.num_notify -= 1;
539+
// since this is a legitimate wakeup,
540+
// the `Spawner::spawn_task` has already decremented `num_idle_threads`.
541+
is_counted_idle = false;
535542
break;
536543
}
537544

@@ -568,14 +575,17 @@ impl Inner {
568575
// Thread exit
569576
self.metrics.dec_num_threads();
570577

571-
// `num_idle_threads` should now be tracked exactly, panic
572-
// with a descriptive message if it is not the
573-
// case.
574-
let prev_idle = self.metrics.dec_num_idle_threads();
575-
assert_ne!(
576-
prev_idle, 0,
577-
"`num_idle_threads` underflowed on thread exit"
578-
);
578+
// Is this thread currently counted in `num_idle_threads`?
579+
if is_counted_idle {
580+
// `num_idle_threads` should now be tracked exactly, panic
581+
// with a descriptive message if it is not the
582+
// case.
583+
let prev_idle = self.metrics.dec_num_idle_threads();
584+
assert_ne!(
585+
prev_idle, 0,
586+
"`num_idle_threads` underflowed on thread exit"
587+
);
588+
}
579589

580590
if shared.shutdown && self.metrics.num_threads() == 0 {
581591
self.condvar.notify_one();

tokio/src/runtime/tests/loom_blocking.rs

Lines changed: 39 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use crate::runtime::{self, Runtime};
22

33
use std::sync::Arc;
4+
use std::time::Duration;
45

56
#[test]
67
fn blocking_shutdown() {
@@ -75,7 +76,6 @@ fn spawn_mandatory_blocking_should_run_even_when_shutting_down_from_other_thread
7576

7677
#[test]
7778
fn spawn_blocking_when_paused() {
78-
use std::time::Duration;
7979
loom::model(|| {
8080
let rt = crate::runtime::Builder::new_current_thread()
8181
.enable_time()
@@ -94,6 +94,44 @@ fn spawn_blocking_when_paused() {
9494
});
9595
}
9696

97+
#[test]
98+
/// See <https://github.com/tokio-rs/tokio/pull/7922>
99+
fn spawn_blocking_then_shutdown() {
100+
loom::model(|| {
101+
let rt = crate::runtime::Builder::new_current_thread()
102+
.max_blocking_threads(1)
103+
.thread_keep_alive(Duration::from_secs(7200)) // don't let the thread exit on its own
104+
.build()
105+
.unwrap();
106+
let rt_hdl = rt.handle().clone();
107+
108+
// Currently, there is no live blocking thread,
109+
// so `spawn_blocking` will spawn a new blocking thread.
110+
let jh0 = rt_hdl.spawn_blocking(|| {});
111+
loom::future::block_on(jh0).unwrap();
112+
113+
// Now, there is a idle blocking threads park on the condvar,
114+
// so the following `spawn_blocking` will decrease the `num_idle_threads`
115+
// and then notify one of the idle threads to run the task.
116+
117+
// this will decrease the `num_idle_threads`
118+
// and then notify one of the idle threads to run the task.
119+
let jh3 = rt_hdl.spawn_blocking(|| {});
120+
121+
// shutdown the runtime, which also shutdown the blocking pool
122+
drop(rt);
123+
124+
// loom will emulate two parrel operations:
125+
//
126+
// 1. the blocking thread is woken up on the condvar
127+
// 2. the main thread is waiting for the blocking thread to finish the task
128+
//
129+
// So, if the `num_idle_threads` is not counted correctly,
130+
// it will trigger the assertions inside the `Inner::run` function.
131+
let _ = loom::future::block_on(jh3);
132+
});
133+
}
134+
97135
fn mk_runtime(num_threads: usize) -> Runtime {
98136
runtime::Builder::new_multi_thread()
99137
.worker_threads(num_threads)

0 commit comments

Comments
 (0)