Skip to content

Commit eeb55c7

Browse files
authored
runtime: steal tasks from the LIFO slot (#7431)
1 parent 1fc450a commit eeb55c7

10 files changed

Lines changed: 349 additions & 56 deletions

File tree

tokio/src/runtime/builder.rs

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1257,22 +1257,21 @@ impl Builder {
12571257
/// scheduled task being polled first.
12581258
///
12591259
/// To implement this heuristic, each worker thread has a slot which
1260-
/// holds the task that should be polled next. However, this slot cannot
1261-
/// be stolen by other worker threads, which can result in lower total
1262-
/// throughput when tasks tend to have longer poll times.
1260+
/// holds the task that should be polled next. In earlier versions of
1261+
/// Tokio, this slot could not be stolen by other worker threads, which
1262+
/// can result in lower total throughput when tasks tend to have longer
1263+
/// poll times.
12631264
///
12641265
/// This configuration option will disable this heuristic resulting in
1265-
/// all scheduled tasks being pushed into the worker-local queue, which
1266-
/// is stealable.
1267-
///
1268-
/// Consider trying this option when the task "scheduled" time is high
1269-
/// but the runtime is underutilized. Use [tokio-rs/tokio-metrics] to
1270-
/// collect this data.
1266+
/// all scheduled tasks being pushed into the worker-local queue. This
1267+
/// was intended as a workaround for the LIFO slot not being stealable.
1268+
/// As of Tokio 1.51, tasks can be stolen from the LIFO slot. In a
1269+
/// future version, this option may be deprecated.
12711270
///
12721271
/// # Unstable
12731272
///
1274-
/// This configuration option is considered a workaround for the LIFO
1275-
/// slot not being stealable. When the slot becomes stealable, we will
1273+
/// This configuration option was considered a workaround for the LIFO
1274+
/// slot not being stealable. Since this is no longer the case, we will
12761275
/// revisit whether or not this option is necessary. See
12771276
/// issue [tokio-rs/tokio#4941].
12781277
///

tokio/src/runtime/config.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,13 @@ pub(crate) struct Config {
3434

3535
/// The multi-threaded scheduler includes a per-worker LIFO slot used to
3636
/// store the last scheduled task. This can improve certain usage patterns,
37-
/// especially message passing between tasks. However, this LIFO slot is not
38-
/// currently stealable.
37+
/// especially message passing between tasks.
3938
///
40-
/// Eventually, the LIFO slot **will** become stealable, however as a
41-
/// stop-gap, this unstable option lets users disable the LIFO task.
39+
/// In Tokio versions before 1.51, tasks in the LIFO slot could not be
40+
/// stolen, which could cause issues in applications with long poll times.
41+
/// As a stop-gap, this unstable option lets users disable the LIFO task.
42+
/// Now that the LIFO slot is stealable, we may remove this option in a
43+
/// future version.
4244
pub(crate) disable_lifo_slot: bool,
4345

4446
/// Random number generator seed to configure runtimes to act in a

tokio/src/runtime/mod.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,8 @@
367367
//! three times in a row, it is temporarily disabled until the worker thread has
368368
//! scheduled a task that didn't come from the lifo slot. The lifo slot can be
369369
//! disabled using the [`disable_lifo_slot`] setting. The lifo slot is separate
370-
//! from the local queue, so other worker threads cannot steal the task in the
371-
//! lifo slot.
370+
//! from the local queue, and is stolen from by other worker threads only when
371+
//! a worker's local queue has been drained.
372372
//!
373373
//! When a task is woken from a thread that is not a worker thread, then the
374374
//! task is placed in the global queue.

tokio/src/runtime/scheduler/multi_thread/queue.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,13 @@ pub(crate) struct Inner<T: 'static> {
5252
/// Only updated by producer thread but read by many threads.
5353
tail: AtomicUnsignedShort,
5454

55+
/// When a task is scheduled from a worker, it is stored in this slot. The
56+
/// worker will check this slot for a task **before** checking the run
57+
/// queue. This effectively results in the **last** scheduled task to be run
58+
/// next (LIFO). This is an optimization for improving locality which
59+
/// benefits message passing patterns and helps to reduce latency.
60+
lifo: task::AtomicNotified<T>,
61+
5562
/// Elements
5663
buffer: Box<[UnsafeCell<MaybeUninit<task::Notified<T>>>; LOCAL_QUEUE_CAPACITY]>,
5764
}
@@ -92,6 +99,7 @@ pub(crate) fn local<T: 'static>() -> (Steal<T>, Local<T>) {
9299
let inner = Arc::new(Inner {
93100
head: AtomicUnsignedLong::new(0),
94101
tail: AtomicUnsignedShort::new(0),
102+
lifo: task::AtomicNotified::empty(),
95103
buffer: make_fixed_size(buffer.into_boxed_slice()),
96104
});
97105

@@ -108,9 +116,10 @@ impl<T> Local<T> {
108116
/// Returns the number of entries in the queue
109117
pub(crate) fn len(&self) -> usize {
110118
let (_, head) = unpack(self.inner.head.load(Acquire));
119+
let lifo = self.inner.lifo.is_some() as usize;
111120
// safety: this is the **only** thread that updates this cell.
112121
let tail = unsafe { self.inner.tail.unsync_load() };
113-
len(head, tail)
122+
len(head, tail) + lifo
114123
}
115124

116125
/// How many tasks can be pushed into the queue
@@ -388,14 +397,28 @@ impl<T> Local<T> {
388397

389398
Some(self.inner.buffer[idx].with(|ptr| unsafe { ptr::read(ptr).assume_init() }))
390399
}
400+
401+
/// Pushes a task to the LIFO slot, returning the task previously in the
402+
/// LIFO slot (if there was one).
403+
pub(crate) fn push_lifo(&self, task: task::Notified<T>) -> Option<task::Notified<T>> {
404+
self.inner.lifo.swap(Some(task))
405+
}
406+
407+
/// Pops the task currently held in the LIFO slot, if there is one;
408+
/// otherwise, returns `None`.
409+
pub(crate) fn pop_lifo(&self) -> Option<task::Notified<T>> {
410+
// LIFO-suction!
411+
self.inner.lifo.take()
412+
}
391413
}
392414

393415
impl<T> Steal<T> {
394416
/// Returns the number of entries in the queue
395417
pub(crate) fn len(&self) -> usize {
396418
let (_, head) = unpack(self.0.head.load(Acquire));
397419
let tail = self.0.tail.load(Acquire);
398-
len(head, tail)
420+
let lifo = self.0.lifo.is_some() as usize;
421+
len(head, tail) + lifo
399422
}
400423

401424
/// Return true if the queue is empty,
@@ -430,8 +453,14 @@ impl<T> Steal<T> {
430453
let mut n = self.steal_into2(dst, dst_tail);
431454

432455
if n == 0 {
433-
// No tasks were stolen
434-
return None;
456+
// If no tasks were stolen, let's see if there's one in the LIFO
457+
// slot.
458+
let lifo = self.0.lifo.take();
459+
if lifo.is_some() {
460+
dst_stats.incr_steal_count(1);
461+
dst_stats.incr_steal_operations();
462+
}
463+
return lifo;
435464
}
436465

437466
dst_stats.incr_steal_count(n as u16);
@@ -569,6 +598,7 @@ impl<T> Drop for Local<T> {
569598
fn drop(&mut self) {
570599
if !std::thread::panicking() {
571600
assert!(self.pop().is_none(), "queue not empty");
601+
assert!(self.pop_lifo().is_none(), "LIFO slot not empty");
572602
}
573603
}
574604
}

tokio/src/runtime/scheduler/multi_thread/worker.rs

Lines changed: 10 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -112,13 +112,6 @@ struct Core {
112112
/// Used to schedule bookkeeping tasks every so often.
113113
tick: u32,
114114

115-
/// When a task is scheduled from a worker, it is stored in this slot. The
116-
/// worker will check this slot for a task **before** checking the run
117-
/// queue. This effectively results in the **last** scheduled task to be run
118-
/// next (LIFO). This is an optimization for improving locality which
119-
/// benefits message passing patterns and helps to reduce latency.
120-
lifo_slot: Option<Notified>,
121-
122115
/// When `true`, locally scheduled tasks go to the LIFO slot. When `false`,
123116
/// they go to the back of the `run_queue`.
124117
lifo_enabled: bool,
@@ -280,7 +273,6 @@ pub(super) fn create(
280273

281274
cores.push(Box::new(Core {
282275
tick: 0,
283-
lifo_slot: None,
284276
lifo_enabled: !config.disable_lifo_slot,
285277
run_queue,
286278
#[cfg(all(tokio_unstable, feature = "time"))]
@@ -440,7 +432,7 @@ where
440432
// If we heavily call `spawn_blocking`, there might be no available thread to
441433
// run this core. Except for the task in the lifo_slot, all tasks can be
442434
// stolen, so we move the task out of the lifo_slot to the run_queue.
443-
if let Some(task) = core.lifo_slot.take() {
435+
if let Some(task) = core.run_queue.pop_lifo() {
444436
core.run_queue
445437
.push_back_or_overflow(task, &*cx.worker.handle, &mut core.stats);
446438
}
@@ -670,7 +662,7 @@ impl Context {
670662
};
671663

672664
// Check for a task in the LIFO slot
673-
let task = match core.lifo_slot.take() {
665+
let task = match core.run_queue.pop_lifo() {
674666
Some(task) => task,
675667
None => {
676668
self.reset_lifo_enabled(&mut core);
@@ -1079,7 +1071,7 @@ impl Core {
10791071
}
10801072

10811073
fn next_local_task(&mut self) -> Option<Notified> {
1082-
self.lifo_slot.take().or_else(|| self.run_queue.pop())
1074+
self.run_queue.pop_lifo().or_else(|| self.run_queue.pop())
10831075
}
10841076

10851077
/// Function responsible for stealing tasks from another worker
@@ -1135,7 +1127,7 @@ impl Core {
11351127
}
11361128

11371129
fn has_tasks(&self) -> bool {
1138-
self.lifo_slot.is_some() || self.run_queue.has_tasks()
1130+
self.run_queue.has_tasks()
11391131
}
11401132

11411133
fn should_notify_others(&self) -> bool {
@@ -1144,7 +1136,7 @@ impl Core {
11441136
if self.is_searching {
11451137
return false;
11461138
}
1147-
self.lifo_slot.is_some() as usize + self.run_queue.len() > 1
1139+
self.run_queue.len() > 1
11481140
}
11491141

11501142
/// Prepares the worker state for parking.
@@ -1306,29 +1298,23 @@ impl Handle {
13061298
// task must always be pushed to the back of the queue, enabling other
13071299
// tasks to be executed. If **not** a yield, then there is more
13081300
// flexibility and the task may go to the front of the queue.
1309-
let should_notify = if is_yield || !core.lifo_enabled {
1301+
if is_yield || !core.lifo_enabled {
13101302
core.run_queue
13111303
.push_back_or_overflow(task, self, &mut core.stats);
1312-
true
13131304
} else {
13141305
// Push to the LIFO slot
1315-
let prev = core.lifo_slot.take();
1316-
let ret = prev.is_some();
1317-
1318-
if let Some(prev) = prev {
1306+
if let Some(prev) = core.run_queue.push_lifo(task) {
1307+
// There was a previous task in the LIFO slot which needs
1308+
// to be pushed to the back of the run queue.
13191309
core.run_queue
13201310
.push_back_or_overflow(prev, self, &mut core.stats);
13211311
}
1322-
1323-
core.lifo_slot = Some(task);
1324-
1325-
ret
13261312
};
13271313

13281314
// Only notify if not currently parked. If `park` is `None`, then the
13291315
// scheduling is from a resource driver. As notifications often come in
13301316
// batches, the notification is delayed until the park is complete.
1331-
if should_notify && core.park.is_some() {
1317+
if core.park.is_some() {
13321318
self.notify_parked_local();
13331319
}
13341320
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use crate::loom::sync::atomic::AtomicPtr;
2+
use crate::runtime::task::{Header, Notified, RawTask};
3+
4+
use std::marker::PhantomData;
5+
use std::ptr;
6+
use std::ptr::NonNull;
7+
use std::sync::atomic::Ordering::SeqCst;
8+
9+
/// An atomic cell which can contain a pointer to a [`Notified`] task.
10+
///
11+
/// This is similar to the `crate::util::AtomicCell` type, but specialized to
12+
/// hold a task pointer --- this type "remembers" the task's scheduler generic
13+
/// when a task is stored in the cell, so that the pointer can be turned back
14+
/// into a [`Notified`] task with the correct generic type when it is retrieved.
15+
pub(crate) struct AtomicNotified<S: 'static> {
16+
task: AtomicPtr<Header>,
17+
_scheduler: PhantomData<S>,
18+
}
19+
20+
impl<S: 'static> AtomicNotified<S> {
21+
pub(crate) fn empty() -> Self {
22+
Self {
23+
task: AtomicPtr::new(ptr::null_mut()),
24+
_scheduler: PhantomData,
25+
}
26+
}
27+
28+
pub(crate) fn swap(&self, task: Option<Notified<S>>) -> Option<Notified<S>> {
29+
let new = task
30+
.map(|t| t.into_raw().header_ptr().as_ptr())
31+
.unwrap_or_else(ptr::null_mut);
32+
let old = self.task.swap(new, SeqCst);
33+
NonNull::new(old).map(|ptr| unsafe {
34+
// Safety: since we only allow tasks with the same scheduler type to
35+
// be placed in this cell, we know that the pointed task's scheduler
36+
// type matches the type parameter S.
37+
Notified::from_raw(RawTask::from_raw(ptr))
38+
})
39+
}
40+
41+
pub(crate) fn take(&self) -> Option<Notified<S>> {
42+
self.swap(None)
43+
}
44+
45+
pub(crate) fn is_some(&self) -> bool {
46+
!self.task.load(SeqCst).is_null()
47+
}
48+
}
49+
50+
unsafe impl<S: Send> Send for AtomicNotified<S> {}
51+
unsafe impl<S: Send> Sync for AtomicNotified<S> {}
52+
53+
impl<S> Drop for AtomicNotified<S> {
54+
fn drop(&mut self) {
55+
// Ensure the task reference is dropped if this cell is dropped.
56+
let _ = self.take();
57+
}
58+
}

tokio/src/runtime/task/mod.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,11 @@ pub(crate) use self::raw::RawTask;
209209
mod state;
210210
use self::state::State;
211211

212+
#[cfg(feature = "rt-multi-thread")]
213+
mod atomic_notified;
214+
#[cfg(feature = "rt-multi-thread")]
215+
pub(crate) use self::atomic_notified::AtomicNotified;
216+
212217
mod waker;
213218

214219
pub(crate) use self::spawn_location::SpawnLocation;

0 commit comments

Comments
 (0)