Commit 356d605

sokraclaude authored and committed
turbo-tasks-backend: stability fixes for task cancellation and error handling (#92254)
Bug fixes and a refactoring in `turbo-tasks-backend` targeting stability issues that surface when filesystem caching is enabled:

1. **Preserve `cell_type_max_index` on task error** — when a task fails partway through execution, `cell_counters` only reflects the partially-executed state. Previously, `cell_type_max_index` was updated from these incomplete counters, which removed entries for cell types not yet encountered. This caused `"Cell no longer exists"` hard errors for tasks that still held dependencies on those cells. The fix skips the `cell_type_max_index` update on error, keeping it consistent with the preserved cell data (which already wasn't cleared on error). This bug manifested specifically with `serialization = "hash"` cell types (e.g. `FileContent`), where cell data is transient and readers fall back to `cell_type_max_index` to decide whether to schedule recomputation.

2. **Fix shutdown hang and cache poisoning for cancelled tasks** — three related fixes for tasks cancelled during shutdown:
   - `task_execution_canceled` now drains and notifies all `InProgressCellState` events, preventing `stop_and_wait` from hanging on foreground jobs that wait on cells that will never be filled.
   - `try_read_task_cell` bails early (before calling `listen_to_cell`) when a task is in the `Canceled` state, avoiding pointless listener registrations that would never resolve.
   - Cancelled tasks are marked as session-dependent dirty, preventing cache poisoning where `"was canceled"` errors get persisted as task output and break subsequent builds. The session-dependent dirty flag causes the task to re-execute in the next session, invalidating stale dependents.

3. **Extract an `update_dirty_state` helper on `TaskGuard`** — the "read old dirty state → apply new state → propagate via `ComputeDirtyAndCleanUpdate`" pattern was duplicated between `task_execution_canceled` and `task_execution_completed_finish`. The new `update_dirty_state` default method on `TaskGuard` handles both transitions (to `SessionDependent` or to `None`), fires the `all_clean_event` when the task transitions to clean, and returns an optional `AggregationUpdateJob` that the caller must run to propagate the change to aggregating ancestors.

These bugs caused observable failures when using Turbopack with filesystem caching (`--cache` / persistent cache):

- `"Cell no longer exists"` panics/errors on incremental rebuilds after a task error.
- Hangs on `stop_and_wait` during dev server shutdown.
- Stale `"was canceled"` errors persisted in the cache, breaking subsequent builds until the cache is cleared.

Changes are in `turbopack/crates/turbo-tasks-backend/src/backend/`:

**`mod.rs`:**

- Guard the `cell_type_max_index` update block with `if result.is_ok()` to skip it on error, with a cross-reference comment to `task_execution_completed_cleanup` (which similarly skips cell data removal on error — the two must stay in sync).
- Move the `is_cancelled` bail in `try_read_task_cell` before the `listen_to_cell` call to avoid inserting phantom `InProgressCellState` events that would never be notified.
- In `task_execution_canceled`: switch to `TaskDataCategory::All` (needed for dirty state metadata access), notify all pending in-progress cell events, and mark the task as `SessionDependent` dirty via the new helper.
- In `task_execution_completed_finish`: replace ~77 lines of inline dirty state logic with a call to `task.update_dirty_state(new_dirtyness)`, moving the `all_clean_event` post-processing into the helper and keeping the `dirty_changed` variable under `#[cfg(feature = "verify_determinism")]`.

**`operation/mod.rs`:**

- Add an `update_dirty_state` default method on the `TaskGuard` trait (~70 lines), co-located with the existing `dirty_state()` reader. It takes an `Option<Dirtyness>`, applies the transition, builds a `ComputeDirtyAndCleanUpdate`, and returns an `Option<AggregationUpdateJob>` for the caller to run.
- Remove `ComputeDirtyAndCleanUpdate` from the public re-exports; it is now imported internally via `use self::aggregation_update::ComputeDirtyAndCleanUpdate`.

---------

Co-authored-by: Tobias Koppers <sokra@users.noreply.github.com>
Co-authored-by: Claude <noreply@anthropic.com>
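The cell-counter hazard in fix 1 can be sketched with a minimal, self-contained model. The types and the `update_cell_type_max_index` helper below are hypothetical simplifications for illustration, not the real turbo-tasks-backend API:

```rust
use std::collections::HashMap;

// Hypothetical simplified model: `stored` stands in for the task's persisted
// cell_type_max_index, `counters` for the cell_counters observed during the
// (possibly failed) execution.
fn update_cell_type_max_index(
    stored: &mut HashMap<&'static str, u32>,
    counters: &HashMap<&'static str, u32>,
    result_is_ok: bool,
) {
    // The fix: on error, cell_counters is incomplete, so leave the stored
    // indices consistent with the preserved cell data.
    if !result_is_ok {
        return;
    }
    let to_remove: Vec<&'static str> = stored
        .keys()
        .copied()
        .filter(|ty| !counters.contains_key(ty))
        .collect();
    for (&ty, &max) in counters {
        stored.insert(ty, max);
    }
    for ty in to_remove {
        // Cell types absent from the counters no longer exist after a
        // successful run, so their max-index entries are dropped.
        stored.remove(ty);
    }
}

fn main() {
    // A task normally fills "FileContent" and "Metadata" cells, but fails
    // after only touching "Metadata".
    let mut stored = HashMap::from([("FileContent", 3u32), ("Metadata", 1u32)]);
    let partial = HashMap::from([("Metadata", 1u32)]);

    update_cell_type_max_index(&mut stored, &partial, false);
    // Before the fix, "FileContent" would have been removed here, producing
    // "Cell no longer exists" for tasks still depending on those cells.
    assert!(stored.contains_key("FileContent"));

    update_cell_type_max_index(&mut stored, &partial, true);
    assert!(!stored.contains_key("FileContent")); // a successful run may prune
    println!("ok");
}
```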
1 parent 3b77a6e commit 356d605

File tree

  • turbopack/crates/turbo-tasks-backend/src/backend

2 files changed: +145 −100 lines changed

turbopack/crates/turbo-tasks-backend/src/backend/mod.rs

Lines changed: 71 additions & 98 deletions
@@ -55,10 +55,10 @@ use crate::{
     backend::{
         operation::{
             AggregationUpdateJob, AggregationUpdateQueue, ChildExecuteContext,
-            CleanupOldEdgesOperation, ComputeDirtyAndCleanUpdate, ConnectChildOperation,
-            ExecuteContext, ExecuteContextImpl, LeafDistanceUpdateQueue, Operation, OutdatedEdge,
-            TaskGuard, TaskType, connect_children, get_aggregation_number, get_uppers,
-            is_root_node, make_task_dirty_internal, prepare_new_children,
+            CleanupOldEdgesOperation, ConnectChildOperation, ExecuteContext, ExecuteContextImpl,
+            LeafDistanceUpdateQueue, Operation, OutdatedEdge, TaskGuard, TaskType,
+            connect_children, get_aggregation_number, get_uppers, is_root_node,
+            make_task_dirty_internal, prepare_new_children,
         },
         storage::Storage,
         storage_schema::{TaskStorage, TaskStorageAccessors},
@@ -939,7 +939,13 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }

         // Cell should exist, but data was dropped or is not serializable. We need to recompute the
-        // task the get the cell content.
+        // task to get the cell content.
+
+        // Bail early if the task was cancelled — no point in registering a listener
+        // on a task that won't execute again.
+        if is_cancelled {
+            bail!("{} was canceled", task.get_task_description());
+        }

         // Listen to the cell and potentially schedule the task
         let (listener, new_listener) = self.listen_to_cell(&mut task, task_id, &reader_task, cell);
@@ -955,11 +961,6 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             )
             .entered();

-        // Schedule the task, if not already scheduled
-        if is_cancelled {
-            bail!("{} was canceled", task.get_task_description());
-        }
-
         let _ = task.add_scheduled(
             TaskExecutionReason::CellNotAvailable,
             EventDescription::new(|| task.get_task_desc_fn()),
@@ -1812,7 +1813,7 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         turbo_tasks: &dyn TurboTasksBackendApi<TurboTasksBackend<B>>,
     ) {
         let mut ctx = self.execute_context(turbo_tasks);
-        let mut task = ctx.task(task_id, TaskDataCategory::Data);
+        let mut task = ctx.task(task_id, TaskDataCategory::All);
         if let Some(in_progress) = task.take_in_progress() {
             match in_progress {
                 InProgressState::Scheduled {
@@ -1825,8 +1826,35 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
                 InProgressState::Canceled => {}
             }
         }
+        // Notify any readers waiting on in-progress cells so their listeners
+        // resolve and foreground jobs can finish (prevents stop_and_wait hang).
+        let in_progress_cells = task.take_in_progress_cells();
+        if let Some(ref cells) = in_progress_cells {
+            for state in cells.values() {
+                state.event.notify(usize::MAX);
+            }
+        }
+
+        // Mark the cancelled task as session-dependent dirty so it will be re-executed
+        // in the next session. Without this, any reader that encounters the cancelled task
+        // records an error in its output. That error is persisted and would poison
+        // subsequent builds. By marking the task session-dependent dirty, the next build
+        // re-executes it, which invalidates dependents and corrects the stale errors.
+        let data_update = if self.should_track_dependencies() && !task_id.is_transient() {
+            task.update_dirty_state(Some(Dirtyness::SessionDependent))
+        } else {
+            None
+        };
+
         let old = task.set_in_progress(InProgressState::Canceled);
         debug_assert!(old.is_none(), "InProgress already exists");
+        drop(task);
+
+        if let Some(data_update) = data_update {
+            AggregationUpdateQueue::run(data_update, &mut ctx);
+        }
+
+        drop(in_progress_cells);
     }

     fn try_start_task_execution(
@@ -2145,23 +2173,33 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         }

         // handle cell counters: update max index and remove cells that are no longer used
-        let old_counters: FxHashMap<_, _> = task
-            .iter_cell_type_max_index()
-            .map(|(&k, &v)| (k, v))
-            .collect();
-        let mut counters_to_remove = old_counters.clone();
-
-        for (&cell_type, &max_index) in cell_counters.iter() {
-            if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
-                if old_max_index != max_index {
+        // On error, skip this update: the task may have failed before creating all cells it
+        // normally creates, so cell_counters is incomplete. Clearing cell_type_max_index entries
+        // based on partial counters would cause "cell no longer exists" errors for tasks that
+        // still hold dependencies on those cells. The old cell data is preserved on error
+        // (see task_execution_completed_cleanup), so keeping cell_type_max_index consistent with
+        // that data is correct.
+        // NOTE: This must stay in sync with task_execution_completed_cleanup, which similarly
+        // skips cell data removal on error.
+        if result.is_ok() {
+            let old_counters: FxHashMap<_, _> = task
+                .iter_cell_type_max_index()
+                .map(|(&k, &v)| (k, v))
+                .collect();
+            let mut counters_to_remove = old_counters.clone();
+
+            for (&cell_type, &max_index) in cell_counters.iter() {
+                if let Some(old_max_index) = counters_to_remove.remove(&cell_type) {
+                    if old_max_index != max_index {
                         task.insert_cell_type_max_index(cell_type, max_index);
                     }
-            } else {
-                task.insert_cell_type_max_index(cell_type, max_index);
+                } else {
+                    task.insert_cell_type_max_index(cell_type, max_index);
                 }
-        }
-        for (cell_type, _) in counters_to_remove {
-            task.remove_cell_type_max_index(&cell_type);
+            }
+            for (cell_type, _) in counters_to_remove {
+                task.remove_cell_type_max_index(&cell_type);
+            }
         }

         let mut queue = AggregationUpdateQueue::new();
@@ -2563,82 +2601,15 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
             }
         }

-        // Grab the old dirty state
-        let old_dirtyness = task.get_dirty().cloned();
-        let (old_self_dirty, old_current_session_self_clean) = match old_dirtyness {
-            None => (false, false),
-            Some(Dirtyness::Dirty(_)) => (true, false),
-            Some(Dirtyness::SessionDependent) => {
-                let clean_in_current_session = task.current_session_clean();
-                (true, clean_in_current_session)
-            }
-        };
-
-        // Compute the new dirty state
-        let (new_dirtyness, new_self_dirty, new_current_session_self_clean) = if session_dependent {
-            (Some(Dirtyness::SessionDependent), true, true)
-        } else {
-            (None, false, false)
-        };
-
-        // Update the dirty state
-        let dirty_changed = old_dirtyness != new_dirtyness;
-        if dirty_changed {
-            if let Some(value) = new_dirtyness {
-                task.set_dirty(value);
-            } else if old_dirtyness.is_some() {
-                task.take_dirty();
-            }
-        }
-        if old_current_session_self_clean != new_current_session_self_clean {
-            if new_current_session_self_clean {
-                task.set_current_session_clean(true);
-            } else if old_current_session_self_clean {
-                task.set_current_session_clean(false);
-            }
-        }
-
-        // Propagate dirtyness changes
-        let data_update = if old_self_dirty != new_self_dirty
-            || old_current_session_self_clean != new_current_session_self_clean
-        {
-            let dirty_container_count = task
-                .get_aggregated_dirty_container_count()
-                .cloned()
-                .unwrap_or_default();
-            let current_session_clean_container_count = task
-                .get_aggregated_current_session_clean_container_count()
-                .copied()
-                .unwrap_or_default();
-            let result = ComputeDirtyAndCleanUpdate {
-                old_dirty_container_count: dirty_container_count,
-                new_dirty_container_count: dirty_container_count,
-                old_current_session_clean_container_count: current_session_clean_container_count,
-                new_current_session_clean_container_count: current_session_clean_container_count,
-                old_self_dirty,
-                new_self_dirty,
-                old_current_session_self_clean,
-                new_current_session_self_clean,
-            }
-            .compute();
-            if result.dirty_count_update - result.current_session_clean_update < 0 {
-                // The task is clean now
-                if let Some(activeness_state) = task.get_activeness_mut() {
-                    activeness_state.all_clean_event.notify(usize::MAX);
-                    activeness_state.unset_active_until_clean();
-                    if activeness_state.is_empty() {
-                        task.take_activeness();
-                    }
-                }
-            }
-            result
-                .aggregated_update(task_id)
-                .and_then(|aggregated_update| {
-                    AggregationUpdateJob::data_update(&mut task, aggregated_update)
-                })
+        // Compute and apply the new dirty state, propagating to aggregating ancestors
+        let new_dirtyness = if session_dependent {
+            Some(Dirtyness::SessionDependent)
         } else {
             None
         };
+        #[cfg(feature = "verify_determinism")]
+        let dirty_changed = task.get_dirty().cloned() != new_dirtyness;
+        let data_update = task.update_dirty_state(new_dirtyness);

         #[cfg(feature = "verify_determinism")]
         let reschedule =
@@ -2681,6 +2652,8 @@ impl<B: BackingStorage> TurboTasksBackendInner<B> {
         // An error is potentially caused by a eventual consistency, so we avoid updating cells
         // after an error as it is likely transient and we want to keep the dependent tasks
         // clean to avoid re-executions.
+        // NOTE: This must stay in sync with task_execution_completed_prepare, which similarly
+        // skips cell_type_max_index updates on error.
         if !is_error {
             // Remove no longer existing cells and
             // find all outdated data items (removed cells, outdated edges)
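The shutdown-hang half of fix 2 comes down to notifying waiters on cancellation. A minimal sketch using a plain `Condvar`; the `CellEvent` type and its methods are hypothetical stand-ins for `InProgressCellState` and its event, not the real turbo-tasks types:

```rust
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// Hypothetical stand-in for an in-progress cell's event: a reader parks until
// the cell is filled or the owning task is cancelled.
struct CellEvent {
    resolved: Mutex<bool>,
    cond: Condvar,
}

impl CellEvent {
    fn wait(&self) {
        let mut resolved = self.resolved.lock().unwrap();
        while !*resolved {
            resolved = self.cond.wait(resolved).unwrap();
        }
    }
    fn notify_all(&self) {
        *self.resolved.lock().unwrap() = true;
        self.cond.notify_all(); // analogous to state.event.notify(usize::MAX)
    }
}

fn main() {
    let event = Arc::new(CellEvent {
        resolved: Mutex::new(false),
        cond: Condvar::new(),
    });
    let reader = {
        let event = Arc::clone(&event);
        // A foreground job blocked on a cell of the soon-to-be-cancelled task.
        thread::spawn(move || event.wait())
    };
    // task_execution_canceled drains the in-progress cells and notifies every
    // event; without this the reader (and stop_and_wait) would hang forever.
    event.notify_all();
    reader.join().unwrap();
    println!("shutdown completed without hanging");
}
```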

turbopack/crates/turbo-tasks-backend/src/backend/operation/mod.rs

Lines changed: 74 additions & 2 deletions
@@ -18,6 +18,7 @@ use turbo_tasks::{
     TurboTasksCallApi, TypedSharedReference, backend::CachedTaskType,
 };

+use self::aggregation_update::ComputeDirtyAndCleanUpdate;
 use crate::{
     backend::{
         EventDescription, OperationGuard, TaskDataCategory, TurboTasksBackend,
@@ -764,6 +765,77 @@ pub trait TaskGuard: Debug + TaskStorageAccessors {
             Some(Dirtyness::SessionDependent) => (true, self.current_session_clean()),
         }
     }
+    /// Update the task's dirty state to `new_dirtyness`, applying the change to stored fields,
+    /// computing the aggregated propagation update, and firing the `all_clean_event` if the task
+    /// transitioned to clean.
+    ///
+    /// Returns an optional `AggregationUpdateJob` that the caller must run via
+    /// `AggregationUpdateQueue::run` to propagate the change to aggregating ancestors.
+    fn update_dirty_state(
+        &mut self,
+        new_dirtyness: Option<Dirtyness>,
+    ) -> Option<AggregationUpdateJob>
+    where
+        Self: Sized,
+    {
+        let task_id = self.id();
+        let old_dirtyness = self.get_dirty().cloned();
+        let (old_self_dirty, old_current_session_self_clean) = self.dirty_state();
+        let (new_self_dirty, new_current_session_self_clean) = match new_dirtyness {
+            None => (false, false),
+            Some(Dirtyness::Dirty(_)) => (true, false),
+            Some(Dirtyness::SessionDependent) => (true, true),
+        };
+        if old_dirtyness != new_dirtyness {
+            if let Some(value) = new_dirtyness {
+                self.set_dirty(value);
+            } else {
+                self.take_dirty();
+            }
+        }
+        if old_current_session_self_clean != new_current_session_self_clean {
+            self.set_current_session_clean(new_current_session_self_clean);
+        }
+        if old_self_dirty == new_self_dirty
+            && old_current_session_self_clean == new_current_session_self_clean
+        {
+            return None;
+        }
+        let dirty_container_count = self
+            .get_aggregated_dirty_container_count()
+            .cloned()
+            .unwrap_or_default();
+        let current_session_clean_container_count = self
+            .get_aggregated_current_session_clean_container_count()
+            .copied()
+            .unwrap_or_default();
+        let result = ComputeDirtyAndCleanUpdate {
+            old_dirty_container_count: dirty_container_count,
+            new_dirty_container_count: dirty_container_count,
+            old_current_session_clean_container_count: current_session_clean_container_count,
+            new_current_session_clean_container_count: current_session_clean_container_count,
+            old_self_dirty,
+            new_self_dirty,
+            old_current_session_self_clean,
+            new_current_session_self_clean,
+        }
+        .compute();
+        // Fire the all_clean_event if the task transitioned to clean
+        if result.dirty_count_update - result.current_session_clean_update < 0
+            && let Some(activeness_state) = self.get_activeness_mut()
+        {
+            activeness_state.all_clean_event.notify(usize::MAX);
+            activeness_state.unset_active_until_clean();
+            if activeness_state.is_empty() {
+                self.take_activeness();
+            }
+        }
+        result
+            .aggregated_update(task_id)
+            .and_then(|aggregated_update| {
+                AggregationUpdateJob::data_update(self, aggregated_update)
+            })
+    }
     fn dirty_containers(&self) -> impl Iterator<Item = TaskId> {
         self.dirty_containers_with_count()
             .map(|(task_id, _)| task_id)
@@ -1106,8 +1178,8 @@ impl_operation!(LeafDistanceUpdate leaf_distance_update::LeafDistanceUpdateQueue
 pub use self::invalidate::TaskDirtyCause;
 pub use self::{
     aggregation_update::{
-        AggregatedDataUpdate, AggregationUpdateJob, ComputeDirtyAndCleanUpdate,
-        get_aggregation_number, get_uppers, is_aggregating_node, is_root_node,
+        AggregatedDataUpdate, AggregationUpdateJob, get_aggregation_number, get_uppers,
+        is_aggregating_node, is_root_node,
     },
     cleanup_old_edges::OutdatedEdge,
     connect_children::connect_children,
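The transition table at the core of the new `update_dirty_state` method can be modeled in isolation. The simplified `Dirtyness` enum and the `dirty_flags` function below are illustrative only; the real trait method additionally updates storage and computes the aggregation job:

```rust
// Simplified model (not the real turbo-tasks types) of how update_dirty_state
// maps a dirty state to its (self_dirty, current_session_self_clean) flags.
#[derive(Clone, Copy, PartialEq, Debug)]
enum Dirtyness {
    Dirty,
    SessionDependent,
}

fn dirty_flags(state: Option<Dirtyness>) -> (bool, bool) {
    match state {
        // Fully clean: not dirty, no per-session clean marker needed.
        None => (false, false),
        // Dirty in every session until recomputed.
        Some(Dirtyness::Dirty) => (true, false),
        // Dirty across sessions but clean in the current one. This is the
        // state cancelled tasks are moved to, so the next session re-executes
        // them instead of replaying a persisted "was canceled" error.
        Some(Dirtyness::SessionDependent) => (true, true),
    }
}

fn main() {
    assert_eq!(dirty_flags(None), (false, false));
    assert_eq!(dirty_flags(Some(Dirtyness::Dirty)), (true, false));
    assert_eq!(dirty_flags(Some(Dirtyness::SessionDependent)), (true, true));

    // Propagation to aggregating ancestors is only needed when the flags
    // actually change between old and new state:
    let old = dirty_flags(None);
    let new = dirty_flags(Some(Dirtyness::SessionDependent));
    assert_ne!(old, new);
    println!("ok");
}
```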
