Skip to content

Commit 070b8d1

Browse files
committed
Bugfix: store conflict sender before emit on local path
- Mirror the volume-side fix on the local-FS Stop branch in `conflict.rs`: store the oneshot sender in `state.conflict_resolution_tx` BEFORE emitting the `write-conflict` event, not after. A responder (the FE's `resolve_write_conflict`, or a sink that answers from within `emit_conflict`) can only answer a conflict it has observed; emitting before the slot was filled left a window where the take missed and `blocking_recv` would hang. - The store statement is self-contained (lock guard released as it ends), so the reorder changes no lock-hold ordering across the emit or the `blocking_recv`. - Add `stop_branch_store_before_emit_tests::stop_clash_answered_from_within_emit_resolves_without_hanging`: drives `resolve_conflict` directly (no runtime, so `blocking_recv` is legal) with a sink that answers synchronously inside `emit_conflict`. Passes instantly with the fix; deadlocks (nextest TIMEOUT) against the old emit-then-store order — verified by temporarily reverting. - Update `write_operations/CLAUDE.md` to describe store-before-emit as load-bearing on both the local and volume paths.
1 parent 9927147 commit 070b8d1

2 files changed

Lines changed: 120 additions & 4 deletions

File tree

apps/desktop/src-tauri/src/file_system/write_operations/CLAUDE.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ Rates and ETA are computed in the backend (`eta.rs`) and shipped on every `Write
9999

100100
**Two-layer cancellation.** `AtomicU8` (`OperationIntent`) for fast in-loop checks in local file operations. Volume operations (MTP, SMB) use the same `AtomicU8` checks but run on the async executor (no `spawn_blocking`). `run_cancellable` wraps blocking local operations (for example, network-mount copies that may block indefinitely) in a separate thread, polling the flag every 100 ms via `mpsc::channel`.
101101

102-
**Stop-mode conflict resolution.** Emits `write-conflict` event, then blocks on a `tokio::sync::oneshot` channel (`blocking_recv()` inside `spawn_blocking`). A new oneshot channel is created per conflict. Frontend calls `resolve_write_conflict(operation_id, resolution, apply_to_all)` which takes the stored `Sender` and sends the `ConflictResolutionResponse`. `cancel_write_operation` drops the sender, causing the receiver to return `Err` (interpreted as cancellation). This is strictly better than the old Condvar+timeout approach: no polling, no 30 s safety timeout needed, immediate unblock on cancel.
102+
**Stop-mode conflict resolution.** Creates a per-conflict `tokio::sync::oneshot` channel, **stores the sender BEFORE emitting the `write-conflict` event**, then blocks on the receiver (`blocking_recv()` inside `spawn_blocking`; the volume path `await`s instead). Store-before-emit is load-bearing: a responder can only answer a conflict it has observed, so if the event reached `resolve_write_conflict` (or a test responder sink) before the sender slot was filled, the take would miss and the recv would hang. Both the local-FS branch (`conflict.rs`) and the volume branch (`transfer/volume_conflict.rs`) order it this way. Frontend calls `resolve_write_conflict(operation_id, resolution, apply_to_all)` which takes the stored `Sender` and sends the `ConflictResolutionResponse`. `cancel_write_operation` drops the sender, causing the receiver to return `Err` (interpreted as cancellation). This is strictly better than the old Condvar+timeout approach: no polling, no 30 s safety timeout needed, immediate unblock on cancel. Pinned by `conflict.rs::stop_branch_store_before_emit_tests` (local) and the `ConflictResponderSink` suites (volume).
103103

104104
**Conflict-dispatch mutex (folder merges).** `WriteOperationState::conflict_dispatch_lock` (a `tokio::sync::Mutex`, next to `conflict_resolution_tx`) serializes the whole Stop-mode dispatch for an operation: there is exactly one human and one oneshot slot, so two tasks both hitting a Stop-mode clash at once — the concurrent volume-copy spawn loop, or two parallel deep directory merges — must queue rather than race to emit a `write-conflict` and clobber each other's sender. The dispatch sequence under the lock: check `is_cancelled` (bail with `Cancelled` so a queued task can't emit a prompt no one will answer after the dialog tears down — a hang), re-check the apply-to-all latch (a prior "…all" answer collapses the queued prompt), emit + await, store the latch, release. Released on every exit, NEVER held across the subsequent file write. Volume-side only today (the local-FS engine's per-file conflicts surface serially inside one `spawn_blocking`). See `transfer/CLAUDE.md` § "The conflict-dispatch mutex".
105105

apps/desktop/src-tauri/src/file_system/write_operations/conflict.rs

Lines changed: 119 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -156,12 +156,20 @@ pub(super) fn resolve_conflict(
156156
source_size_for_dir,
157157
destination_size_for_dir,
158158
);
159-
events.emit_conflict(event);
160-
161-
// Create a oneshot channel for this conflict resolution
159+
// Store the oneshot sender BEFORE emitting the event. A responder
160+
// (the FE's `resolve_write_conflict`, which takes the stored sender)
161+
// can only answer a conflict it has observed; if the event reached it
162+
// before the sender slot was filled, its take would miss and the
163+
// `blocking_recv` below would hang. Storing first makes the sender
164+
// available the instant the event is in the responder's hands. The
165+
// lock guard is released as the statement ends — never held across
166+
// the emit or the recv. Mirrors the volume-side Stop branch in
167+
// `transfer/volume_conflict.rs`.
162168
let (tx, rx) = tokio::sync::oneshot::channel();
163169
*state.conflict_resolution_tx.lock_ignore_poison() = Some(tx);
164170

171+
events.emit_conflict(event);
172+
165173
// Wait for user to call resolve_write_conflict.
166174
// The sender is dropped on cancel_write_operation, which unblocks the
167175
// receiver immediately. No timeout needed (the old 30s timeout was a
@@ -1273,3 +1281,111 @@ mod apply_to_all_tests {
12731281
);
12741282
}
12751283
}
1284+
1285+
#[cfg(test)]
1286+
mod stop_branch_store_before_emit_tests {
1287+
//! Pins the store-before-emit ordering of the local-FS Stop branch in
1288+
//! `resolve_conflict`: the oneshot sender must be stored in
1289+
//! `state.conflict_resolution_tx` BEFORE the `write-conflict` event is
1290+
//! emitted, so a responder that observes the event and answers it
1291+
//! synchronously (the FE's `resolve_write_conflict`, modeled here by a sink
1292+
//! that answers inside `emit_conflict`) finds the sender already present.
1293+
1294+
use super::*;
1295+
use crate::file_system::write_operations::state::{ConflictResolutionResponse, WriteOperationState};
1296+
use crate::file_system::write_operations::types::{
1297+
CollectorEventSink, ConflictInfo, ConflictResolution, DryRunResult, OperationEventSink, ScanProgressEvent,
1298+
WriteCancelledEvent, WriteCompleteEvent, WriteConflictEvent, WriteErrorEvent, WriteOperationConfig,
1299+
WriteProgressEvent, WriteSourceItemDoneEvent,
1300+
};
1301+
use std::sync::Arc;
1302+
use std::time::Duration;
1303+
use tempfile::TempDir;
1304+
1305+
/// A sink that answers a Stop-mode `write-conflict` synchronously the moment
1306+
/// it observes it, by taking the stored oneshot sender — exactly what the
1307+
/// FE's `resolve_write_conflict` does, but driven from inside `emit_conflict`.
1308+
/// Forwards every event to an inner `CollectorEventSink` for inspection.
1309+
///
1310+
/// This only resolves the conflict if the sender is ALREADY stored when the
1311+
/// event arrives. If the production code emitted before storing, the take
1312+
/// would miss, nothing would answer, and `resolve_conflict`'s
1313+
/// `rx.blocking_recv()` would deadlock — turning the ordering bug into a hang
1314+
/// instead of a wrong value. The store-before-emit fix is what keeps this
1315+
/// test from hanging.
1316+
struct AnswerOnConflictSink {
1317+
inner: CollectorEventSink,
1318+
state: Arc<WriteOperationState>,
1319+
resolution: ConflictResolution,
1320+
}
1321+
1322+
impl OperationEventSink for AnswerOnConflictSink {
1323+
fn emit_conflict(&self, e: WriteConflictEvent) {
1324+
self.inner.emit_conflict(e);
1325+
if let Some(tx) = self.state.conflict_resolution_tx.lock_ignore_poison().take() {
1326+
let _ = tx.send(ConflictResolutionResponse {
1327+
resolution: self.resolution,
1328+
apply_to_all: false,
1329+
});
1330+
}
1331+
}
1332+
fn emit_progress(&self, e: WriteProgressEvent) {
1333+
self.inner.emit_progress(e);
1334+
}
1335+
fn emit_complete(&self, e: WriteCompleteEvent) {
1336+
self.inner.emit_complete(e);
1337+
}
1338+
fn emit_cancelled(&self, e: WriteCancelledEvent) {
1339+
self.inner.emit_cancelled(e);
1340+
}
1341+
fn emit_error(&self, e: WriteErrorEvent) {
1342+
self.inner.emit_error(e);
1343+
}
1344+
fn emit_source_item_done(&self, _e: WriteSourceItemDoneEvent) {}
1345+
fn emit_scan_progress(&self, _e: ScanProgressEvent) {}
1346+
fn emit_scan_conflict(&self, _c: ConflictInfo) {}
1347+
fn emit_dry_run_complete(&self, _r: DryRunResult) {}
1348+
}
1349+
1350+
/// THE PIN: with the sender stored before the emit, a responder answering
1351+
/// synchronously inside `emit_conflict` resolves the local-FS Stop clash and
1352+
/// `resolve_conflict` returns the scripted resolution. Run WITHOUT a Tokio
1353+
/// runtime so the function's `rx.blocking_recv()` is legal; the answer is
1354+
/// already buffered in the oneshot by the time `blocking_recv` runs, so it
1355+
/// returns immediately. Against the pre-fix emit-then-store ordering the take
1356+
/// inside `emit_conflict` would miss and this test would deadlock.
1357+
#[test]
1358+
fn stop_clash_answered_from_within_emit_resolves_without_hanging() {
1359+
let dir = TempDir::new().unwrap();
1360+
let src = dir.path().join("src.txt");
1361+
let dst = dir.path().join("dst.txt");
1362+
fs::write(&src, b"SRC").unwrap();
1363+
fs::write(&dst, b"DEST").unwrap();
1364+
1365+
let state = Arc::new(WriteOperationState::new(Duration::from_millis(0)));
1366+
let events = AnswerOnConflictSink {
1367+
inner: CollectorEventSink::new(),
1368+
state: Arc::clone(&state),
1369+
resolution: ConflictResolution::Skip,
1370+
};
1371+
let config = WriteOperationConfig::default(); // Stop, overwrite=false
1372+
let mut latch = ApplyToAll::default();
1373+
1374+
let result = resolve_conflict(&src, &dst, &config, &events, "op-local-stop-pin", &state, &mut latch);
1375+
1376+
// Skip resolves to "skip this file" (None), proving the responder's
1377+
// answer reached the op — which is only possible if the sender was
1378+
// stored before the event was emitted.
1379+
assert!(
1380+
matches!(result, Ok(None)),
1381+
"Skip answer should resolve the clash to None, got {result:?}"
1382+
);
1383+
// Exactly one conflict event was recorded, and it's a file-vs-file clash.
1384+
let conflicts = events.inner.conflicts.lock_ignore_poison();
1385+
assert_eq!(conflicts.len(), 1, "exactly one Stop prompt for the file clash");
1386+
assert!(
1387+
!conflicts[0].source_is_directory && !conflicts[0].destination_is_directory,
1388+
"the clash is file-vs-file"
1389+
);
1390+
}
1391+
}

0 commit comments

Comments
 (0)