11//! Thread pool for blocking operations
22
3- use crate :: loom:: sync:: { Arc , Mutex } ;
3+ use crate :: loom:: sync:: { Arc , Condvar , Mutex } ;
44use crate :: loom:: thread;
55use crate :: runtime:: blocking:: schedule:: BlockingSchedule ;
6- use crate :: runtime:: blocking:: sharded_queue:: { ShardedQueue , WaitResult } ;
76use crate :: runtime:: blocking:: { shutdown, BlockingTask } ;
87use crate :: runtime:: builder:: ThreadNameFn ;
98use crate :: runtime:: task:: { self , JoinHandle } ;
109use crate :: runtime:: { Builder , Callback , Handle , BOX_FUTURE_THRESHOLD } ;
1110use crate :: util:: metric_atomics:: MetricAtomicUsize ;
1211use crate :: util:: trace:: { blocking_task, SpawnMeta } ;
1312
14- use std:: collections:: HashMap ;
13+ use std:: collections:: { HashMap , VecDeque } ;
1514use std:: fmt;
1615use std:: io;
1716use std:: sync:: atomic:: Ordering ;
@@ -75,12 +74,12 @@ impl SpawnerMetrics {
7574}
7675
7776struct Inner {
78- /// Sharded queue for task distribution.
79- queue : ShardedQueue ,
80-
81- /// State shared between worker threads (thread management only).
77+ /// State shared between worker threads.
8278 shared : Mutex < Shared > ,
8379
80+ /// Pool threads wait on this.
81+ condvar : Condvar ,
82+
8483 /// Spawned threads use this name.
8584 thread_name : ThreadNameFn ,
8685
@@ -104,6 +103,8 @@ struct Inner {
104103}
105104
106105struct Shared {
106+ queue : VecDeque < Task > ,
107+ num_notify : u32 ,
107108 shutdown : bool ,
108109 shutdown_tx : Option < shutdown:: Sender > ,
109110 /// Prior to shutdown, we clean up `JoinHandles` by having each timed-out
@@ -213,14 +214,16 @@ impl BlockingPool {
213214 BlockingPool {
214215 spawner : Spawner {
215216 inner : Arc :: new ( Inner {
216- queue : ShardedQueue :: new ( ) ,
217217 shared : Mutex :: new ( Shared {
218+ queue : VecDeque :: new ( ) ,
219+ num_notify : 0 ,
218220 shutdown : false ,
219221 shutdown_tx : Some ( shutdown_tx) ,
220222 last_exiting_thread : None ,
221223 worker_threads : HashMap :: new ( ) ,
222224 worker_thread_index : 0 ,
223225 } ) ,
226+ condvar : Condvar :: new ( ) ,
224227 thread_name : builder. thread_name . clone ( ) ,
225228 stack_size : builder. thread_stack_size ,
226229 after_start : builder. after_start . clone ( ) ,
@@ -250,7 +253,7 @@ impl BlockingPool {
250253
251254 shared. shutdown = true ;
252255 shared. shutdown_tx = None ;
253- self . spawner . inner . queue . shutdown ( ) ;
256+ self . spawner . inner . condvar . notify_all ( ) ;
254257
255258 let last_exited_thread = std:: mem:: take ( & mut shared. last_exiting_thread ) ;
256259 let workers = std:: mem:: take ( & mut shared. worker_threads ) ;
@@ -388,8 +391,9 @@ impl Spawner {
388391 }
389392
390393 fn spawn_task ( & self , task : Task , rt : & Handle ) -> Result < ( ) , SpawnError > {
391- // Check shutdown without holding the lock
392- if self . inner . queue . is_shutdown ( ) {
394+ let mut shared = self . inner . shared . lock ( ) ;
395+
396+ if shared. shutdown {
393397 // Shutdown the task: it's fine to shutdown this task (even if
394398 // mandatory) because it was scheduled after the shutdown of the
395399 // runtime began.
@@ -399,64 +403,52 @@ impl Spawner {
399403 return Err ( SpawnError :: ShuttingDown ) ;
400404 }
401405
402- // Push to the sharded queue
403- self . inner . queue . push ( task) ;
406+ shared. queue . push_back ( task) ;
404407 self . inner . metrics . inc_queue_depth ( ) ;
405408
406- // Check if we need to spawn a new thread or notify an idle one
407409 if self . inner . metrics . num_idle_threads ( ) == 0 {
408- // No idle threads - might need to spawn one
409- if self . inner . metrics . num_threads ( ) < self . inner . thread_cap {
410- // Try to spawn a new thread
411- let mut shared = self . inner . shared . lock ( ) ;
412-
413- // Double-check conditions after acquiring the lock
414- if shared. shutdown {
415- // Shutdown raced with our push. The task is in the
416- // sharded queue but workers may have already exited.
417- // Drain it here so mandatory tasks still run.
418- drop ( shared) ;
419- while let Some ( task) = self . inner . queue . pop ( 0 ) {
420- self . inner . metrics . dec_queue_depth ( ) ;
421- task. shutdown_or_run_if_mandatory ( ) ;
422- }
423- return Ok ( ( ) ) ;
424- }
410+ // No threads are able to process the task.
411+
412+ if self . inner . metrics . num_threads ( ) == self . inner . thread_cap {
413+ // At max number of threads
414+ } else {
415+ assert ! ( shared. shutdown_tx. is_some( ) ) ;
416+ let shutdown_tx = shared. shutdown_tx . clone ( ) ;
417+
418+ if let Some ( shutdown_tx) = shutdown_tx {
419+ let id = shared. worker_thread_index ;
425420
426- // Re-check thread count (another thread might have spawned one)
427- if self . inner . metrics . num_threads ( ) < self . inner . thread_cap {
428- if let Some ( shutdown_tx) = shared. shutdown_tx . clone ( ) {
429- let id = shared. worker_thread_index ;
430-
431- match self . spawn_thread ( shutdown_tx, rt, id) {
432- Ok ( handle) => {
433- self . inner . metrics . inc_num_threads ( ) ;
434- shared. worker_thread_index += 1 ;
435- shared. worker_threads . insert ( id, handle) ;
436- }
437- Err ( ref e)
438- if is_temporary_os_thread_error ( e)
439- && self . inner . metrics . num_threads ( ) > 0 =>
440- {
441- // OS temporarily failed to spawn a new thread.
442- // The task will be picked up eventually by a currently
443- // busy thread.
444- }
445- Err ( e) => {
446- // The OS refused to spawn the thread and there is no thread
447- // to pick up the task that has just been pushed to the queue.
448- return Err ( SpawnError :: NoThreads ( e) ) ;
449- }
421+ match self . spawn_thread ( shutdown_tx, rt, id) {
422+ Ok ( handle) => {
423+ self . inner . metrics . inc_num_threads ( ) ;
424+ shared. worker_thread_index += 1 ;
425+ shared. worker_threads . insert ( id, handle) ;
426+ }
427+ Err ( ref e)
428+ if is_temporary_os_thread_error ( e)
429+ && self . inner . metrics . num_threads ( ) > 0 =>
430+ {
431+ // OS temporarily failed to spawn a new thread.
432+ // The task will be picked up eventually by a currently
433+ // busy thread.
434+ }
435+ Err ( e) => {
436+ // The OS refused to spawn the thread and there is no thread
437+ // to pick up the task that has just been pushed to the queue.
438+ return Err ( SpawnError :: NoThreads ( e) ) ;
450439 }
451440 }
452441 }
453- } else {
454- // At max threads, notify anyway in case threads are waiting
455- self . inner . queue . notify_one ( ) ;
456442 }
457443 } else {
458- // There are idle threads waiting, notify one
459- self . inner . queue . notify_one ( ) ;
444+ // Notify an idle worker thread. The notification counter
445+ // tracks exactly how many notifications are still owed.
446+ // Thread libraries may generate spurious wakeups; a woken
447+ // thread only proceeds if the counter is non-zero, which
448+ // keeps us in a consistent state.
449+ self . inner . metrics . dec_num_idle_threads ( ) ;
450+ shared. num_notify += 1 ;
451+ self . inner . condvar . notify_one ( ) ;
460452 }
461453
462454 Ok ( ( ) )
@@ -513,62 +505,94 @@ impl Inner {
513505 f ( ) ;
514506 }
515507
516- // Use worker_thread_id as the preferred shard
517- let preferred_shard = worker_thread_id;
508+ let mut shared = self . shared . lock ( ) ;
518509 let mut join_on_thread = None ;
510+ // Tracks whether this thread is currently counted in `num_idle_threads`.
511+ let mut is_counted_idle;
519512
520513 ' main: loop {
521- // BUSY: Process tasks from the queue
522- while let Some ( task) = self . queue . pop ( preferred_shard ) {
514+ // BUSY
515+ while let Some ( task) = shared . queue . pop_front ( ) {
523516 self . metrics . dec_queue_depth ( ) ;
517+ drop ( shared) ;
524518 task. run ( ) ;
525- }
526519
527- // Check for shutdown before going idle
528- if self . queue . is_shutdown ( ) {
529- break ;
520+ shared = self . shared . lock ( ) ;
530521 }
531522
532- // IDLE: Wait for new tasks (spurious wakeups handled internally)
523+ // IDLE
533524 self . metrics . inc_num_idle_threads ( ) ;
534-
535- match self . queue . wait_for_task ( preferred_shard, self . keep_alive ) {
536- WaitResult :: Task ( task) => {
537- self . metrics . dec_num_idle_threads ( ) ;
538- self . metrics . dec_queue_depth ( ) ;
539- task. run ( ) ;
540- }
541- WaitResult :: Shutdown => {
542- self . metrics . dec_num_idle_threads ( ) ;
543- break ' main;
525+ // Mark this thread as currently counted in `num_idle_threads`.
526+ is_counted_idle = true ;
527+
528+ while !shared. shutdown {
529+ let lock_result = self . condvar . wait_timeout ( shared, self . keep_alive ) . unwrap ( ) ;
530+
531+ shared = lock_result. 0 ;
532+ let timeout_result = lock_result. 1 ;
533+
534+ if shared. num_notify != 0 {
535+ // We have received a legitimate wakeup,
536+ // acknowledge it by decrementing the counter
537+ // and transition to the BUSY state.
538+ shared. num_notify -= 1 ;
539+ // Since this is a legitimate wakeup, `Spawner::spawn_task`
540+ // has already decremented `num_idle_threads` on our behalf.
541+ is_counted_idle = false ;
542+ break ;
544543 }
545- WaitResult :: Timeout => {
546- self . metrics . dec_num_idle_threads ( ) ;
547-
548- // Clean up thread handle
549- let mut shared = self . shared . lock ( ) ;
550- if !shared. shutdown {
551- let my_handle = shared. worker_threads . remove ( & worker_thread_id) ;
552- join_on_thread =
553- std:: mem:: replace ( & mut shared. last_exiting_thread , my_handle) ;
554- }
544+
545+ // Even if the condvar "timed out", if the pool is entering the
546+ // shutdown phase, we want to perform the cleanup logic.
547+ if !shared. shutdown && timeout_result. timed_out ( ) {
548+ // We'll join the prior timed-out thread's JoinHandle after dropping the lock.
549+ // This isn't done when shutting down, because the thread calling shutdown will
550+ // handle joining everything.
551+ let my_handle = shared. worker_threads . remove ( & worker_thread_id) ;
552+ join_on_thread = std:: mem:: replace ( & mut shared. last_exiting_thread , my_handle) ;
555553
556554 break ' main;
557555 }
556+
557+ // Spurious wakeup detected, go back to sleep.
558558 }
559- }
560559
561- // Drain remaining tasks if shutting down
562- if self . queue . is_shutdown ( ) {
563- while let Some ( task) = self . queue . pop ( preferred_shard) {
564- self . metrics . dec_queue_depth ( ) ;
565- task. shutdown_or_run_if_mandatory ( ) ;
560+ if shared. shutdown {
561+ // Drain the queue
562+ while let Some ( task) = shared. queue . pop_front ( ) {
563+ self . metrics . dec_queue_depth ( ) ;
564+ drop ( shared) ;
565+
566+ task. shutdown_or_run_if_mandatory ( ) ;
567+
568+ shared = self . shared . lock ( ) ;
569+ }
570+
571+ break ;
566572 }
567573 }
568574
569575 // Thread exit
570576 self . metrics . dec_num_threads ( ) ;
571577
578+ // Is this thread currently counted in `num_idle_threads`?
579+ if is_counted_idle {
580+ // `num_idle_threads` should now be tracked exactly; panic
581+ // with a descriptive message if the counter would
582+ // underflow.
583+ let prev_idle = self . metrics . dec_num_idle_threads ( ) ;
584+ assert_ne ! (
585+ prev_idle, 0 ,
586+ "`num_idle_threads` underflowed on thread exit"
587+ ) ;
588+ }
589+
590+ if shared. shutdown && self . metrics . num_threads ( ) == 0 {
591+ self . condvar . notify_one ( ) ;
592+ }
593+
594+ drop ( shared) ;
595+
572596 if let Some ( f) = & self . before_stop {
573597 f ( ) ;
574598 }
0 commit comments