Skip to content

Commit 9927147

Browse files
committed
Tests: event-driven conflict responder, sink-derived prompt counts
- Replace both copies of the racy polling `spawn_conflict_responder` (in `volume_merge_tests.rs` and `volume_rename_merge_tests.rs`) with one shared `ConflictResponderSink` in `conflict_responder_test_support.rs`. The sink wraps a `CollectorEventSink`, forwards every event, and answers a Stop-mode prompt synchronously the instant it observes the `write-conflict` event — no 2 ms polling loop, no parallel `AtomicUsize` answer counter. - Switch every `answered`-based assertion to sink-derived counts via `file_conflict_count(&events.inner)`. Counts are authoritative once the op future returns, and the assertions got stronger: the deep-clash test now pins the file-conflict count AND the path/flag shape, not just a counter. - Make the responder design sound by storing the oneshot sender BEFORE emitting the `write-conflict` event in `volume_conflict.rs`'s Stop branch. Previously the emit ran before the store, so a sink taking the sender inside `emit_conflict` would miss; storing first makes the take race-free for both the test sink and the real FE responder. - Drop the obsolete "spawn_conflict_responder must bump its counter before send" gotcha in `transfer/CLAUDE.md`; replace with a one-liner naming the responder-sink pattern. - Determinism: the previously-flaky case-fold and stop-clash tests now pass 40x at `-j 8`; the design is order-independent.
1 parent af25aa8 commit 9927147

6 files changed

Lines changed: 164 additions & 114 deletions

File tree

apps/desktop/src-tauri/src/file_system/write_operations/transfer/CLAUDE.md

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -182,5 +182,4 @@ Our chunked copy (1 MB read/write chunks) provides: identical speed for non-clon
182182
**Gotcha**: Cross-type Overwrite (file↔folder) is delete-first, NOT a merge or safe-replace.
183183
**Why**: A type swap can't temp-rename across backends, so `apply_volume_conflict_resolution` deletes the dest first (`delete_volume_path_recursive` for a folder dest, `Volume::delete` for a file dest) before the source materializes. These are rare and lower-stakes (a type mismatch already means wholesale content replacement). Same-type dir-vs-dir never reaches `apply_volume_conflict_resolution` for the folder — it short-circuits to merge in `resolve_volume_conflict` before any policy dispatch.
184184

185-
**Gotcha (test harness)**: `volume_rename_merge_tests.rs::spawn_conflict_responder` must bump its `answered` counter BEFORE `tx.send`, never after.
186-
**Why**: `tx.send` unblocks the operation's `rx.await` synchronously, so the op can run to completion — and the test can read `answered` — before the responder task is rescheduled to run the line after the send. Incrementing after the send left a window (negligible on an idle Mac, ~6% under the CPU contention of a full `rust-tests-linux` run) where the op finished with the counter still reading 0, producing a false "a clash must prompt exactly once" failure (`left: 0, right: 1`) on the case-fold and Stop-clash tests. The production rename-merge is correct; only the test's answer-accounting raced. Don't reorder it back.
185+
**Test harness: the conflict responder is an event sink, prompt counts come from the sink.** The folder-merge suites (`volume_merge_tests.rs`, `volume_rename_merge_tests.rs`) drive Stop-mode prompts with `ConflictResponderSink` (`conflict_responder_test_support.rs`): it wraps a `CollectorEventSink`, forwards every event, and the instant it observes a `write-conflict` it `take()`s `state.conflict_resolution_tx` and synchronously sends the scripted answer. This works because the Stop branch stores the sender BEFORE emitting the event (`volume_conflict.rs`), so the take can't miss. Assertions derive the prompt count from the recorded conflicts via `file_conflict_count(&events.inner)` — race-free once the op future returns — never from a side-channel counter. The pattern is order-independent by design, so there's no polling loop and no answer-accounting race to defend.
Lines changed: 111 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,111 @@
1+
//! Shared test support for driving Stop-mode conflict prompts in the
2+
//! folder-merge suites (`volume_merge_tests.rs` and `volume_rename_merge_tests.rs`).
3+
//!
4+
//! ## Why an event-driven responder, not a polling one
5+
//!
6+
//! A Stop-mode clash emits a `write-conflict` event and then blocks the
7+
//! operation on a `tokio::sync::oneshot` receiver until something fills the
8+
//! `state.conflict_resolution_tx` slot. The merge engine stores that sender
9+
//! BEFORE emitting the event (see `volume_conflict.rs`'s Stop branch), so by the
10+
//! time any sink observes the `emit_conflict` call the sender is already in the
11+
//! slot. [`ConflictResponderSink`] exploits exactly that: it wraps an inner
12+
//! [`CollectorEventSink`], forwards every event, and — the instant it sees a
13+
//! conflict — synchronously `take()`s the sender and sends the scripted
14+
//! [`ConflictResolutionResponse`]. The op's `rx.await` then returns immediately.
15+
//!
16+
//! This is order-independent by construction: there is no parallel counter to
17+
//! race against the op future, and no 2 ms polling loop. Once the driven op
18+
//! future completes, the inner collector's recorded conflicts ARE the
19+
//! authoritative, race-free prompt count — `events` carries the paths and
20+
//! file/folder flags too, so assertions derive from the sink, not a side-channel
21+
//! `AtomicUsize`. See [`file_conflict_count`].
22+
23+
use std::sync::Arc;
24+
25+
use super::super::state::{ConflictResolutionResponse, WriteOperationState};
26+
use super::super::types::{
27+
CollectorEventSink, ConflictInfo, ConflictResolution, DryRunResult, OperationEventSink, ScanProgressEvent,
28+
WriteCancelledEvent, WriteCompleteEvent, WriteConflictEvent, WriteErrorEvent, WriteProgressEvent,
29+
WriteSettledEvent, WriteSourceItemDoneEvent,
30+
};
31+
use crate::ignore_poison::IgnorePoison;
32+
33+
/// An event sink that auto-answers Stop-mode `write-conflict` prompts with a
34+
/// scripted resolution, the moment it observes them. Forwards every event to an
35+
/// inner [`CollectorEventSink`], so the driving test can derive race-free prompt
36+
/// counts (and richer path/flag assertions) from `sink.inner` after the op
37+
/// completes.
38+
///
39+
/// Use it as the operation's `events` sink directly — it replaces the old
40+
/// pattern of a `CollectorEventSink` plus a separately-spawned polling responder
41+
/// task. Because it answers synchronously inside `emit_conflict` (the sender is
42+
/// already stored by then), there is no task to abort and no polling window.
43+
pub(super) struct ConflictResponderSink {
44+
pub inner: CollectorEventSink,
45+
state: Arc<WriteOperationState>,
46+
resolution: ConflictResolution,
47+
apply_to_all: bool,
48+
}
49+
50+
impl ConflictResponderSink {
51+
/// Answers every prompt with `resolution` / `apply_to_all`.
52+
pub(super) fn new(state: &Arc<WriteOperationState>, resolution: ConflictResolution, apply_to_all: bool) -> Self {
53+
Self {
54+
inner: CollectorEventSink::new(),
55+
state: Arc::clone(state),
56+
resolution,
57+
apply_to_all,
58+
}
59+
}
60+
}
61+
62+
impl OperationEventSink for ConflictResponderSink {
63+
fn emit_progress(&self, event: WriteProgressEvent) {
64+
self.inner.emit_progress(event);
65+
}
66+
fn emit_complete(&self, e: WriteCompleteEvent) {
67+
self.inner.emit_complete(e);
68+
}
69+
fn emit_cancelled(&self, e: WriteCancelledEvent) {
70+
self.inner.emit_cancelled(e);
71+
}
72+
fn emit_error(&self, e: WriteErrorEvent) {
73+
self.inner.emit_error(e);
74+
}
75+
fn emit_conflict(&self, e: WriteConflictEvent) {
76+
// Record the prompt first (so the count is authoritative even if the
77+
// send below races teardown), then answer it.
78+
self.inner.emit_conflict(e);
79+
80+
// The sender was stored before this event was emitted, so the `take()`
81+
// can't miss. Sending unblocks the op's `rx.await` synchronously.
82+
if let Some(tx) = self.state.conflict_resolution_tx.lock_ignore_poison().take() {
83+
let _ = tx.send(ConflictResolutionResponse {
84+
resolution: self.resolution,
85+
apply_to_all: self.apply_to_all,
86+
});
87+
}
88+
}
89+
fn emit_source_item_done(&self, _e: WriteSourceItemDoneEvent) {}
90+
fn emit_scan_progress(&self, _e: ScanProgressEvent) {}
91+
fn emit_scan_conflict(&self, _c: ConflictInfo) {}
92+
fn emit_dry_run_complete(&self, _r: DryRunResult) {}
93+
fn emit_settled(&self, _e: WriteSettledEvent) {}
94+
}
95+
96+
/// Counts `write-conflict` events that are a FILE-vs-FILE clash (neither side a
97+
/// directory) — i.e. the per-file prompts a merge can legitimately raise. This
98+
/// is the authoritative, race-free prompt count once the driven op future has
99+
/// completed, replacing the old parallel `AtomicUsize` answer counter. Dir-vs-dir
100+
/// merges never emit a conflict at all (the resolver short-circuits before the
101+
/// emit), so this equals the total emitted conflicts in a pure file-clash merge;
102+
/// filtering to file-vs-file keeps it honest if a cross-type clash is ever mixed
103+
/// in.
104+
pub(super) fn file_conflict_count(events: &CollectorEventSink) -> usize {
105+
events
106+
.conflicts
107+
.lock_ignore_poison()
108+
.iter()
109+
.filter(|c| !c.source_is_directory && !c.destination_is_directory)
110+
.count()
111+
}

apps/desktop/src-tauri/src/file_system/write_operations/transfer/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@ pub(super) mod volume_strategy;
3131
#[allow(unused_imports, reason = "used by transaction_integration_test")]
3232
pub(crate) use super::state::CopyTransaction;
3333

34+
#[cfg(test)]
35+
pub(crate) mod conflict_responder_test_support;
3436
#[cfg(test)]
3537
mod copy_integration_test;
3638
#[cfg(test)]

apps/desktop/src-tauri/src/file_system/write_operations/transfer/volume_conflict.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -196,6 +196,16 @@ pub(super) async fn resolve_volume_conflict(
196196
_ => None,
197197
};
198198

199+
// Store the oneshot sender BEFORE emitting the event. A responder
200+
// (the FE's `resolve_write_conflict`, or a test responder sink that
201+
// takes the sender inside its `emit_conflict` callback) can only
202+
// answer a conflict it has observed; if the event reached it before
203+
// the sender slot was filled, its `take()` would miss and the op's
204+
// `rx.await` below would hang. Storing first makes the sender
205+
// available the instant the event is in the responder's hands.
206+
let (tx, rx) = tokio::sync::oneshot::channel();
207+
*state.conflict_resolution_tx.lock_ignore_poison() = Some(tx);
208+
199209
events.emit_conflict(WriteConflictEvent {
200210
operation_id: operation_id.to_string(),
201211
source_path: source_path.display().to_string(),
@@ -210,10 +220,6 @@ pub(super) async fn resolve_volume_conflict(
210220
destination_is_directory,
211221
});
212222

213-
// Create a oneshot channel for this conflict resolution
214-
let (tx, rx) = tokio::sync::oneshot::channel();
215-
*state.conflict_resolution_tx.lock_ignore_poison() = Some(tx);
216-
217223
// Wait for user to call resolve_write_conflict.
218224
match rx.await {
219225
Ok(response) => {

apps/desktop/src-tauri/src/file_system/write_operations/transfer/volume_merge_tests.rs

Lines changed: 23 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -9,17 +9,16 @@
99
//! exactly as in production. Shared fixtures `make_state` / `make_volumes` live in
1010
//! `volume_copy_tests.rs` (`super::tests`).
1111
12+
use super::super::conflict_responder_test_support::{ConflictResponderSink, file_conflict_count};
1213
use super::tests::{make_state, make_volumes};
1314
use super::*;
1415
use crate::file_system::volume::Volume;
15-
use crate::file_system::write_operations::state::{
16-
ConflictResolutionResponse, WRITE_OPERATION_STATE, cancel_write_operation,
17-
};
16+
use crate::file_system::write_operations::state::{WRITE_OPERATION_STATE, cancel_write_operation};
1817
use crate::file_system::write_operations::types::{
1918
CollectorEventSink, ConflictResolution, WriteCancelledEvent, WriteCompleteEvent, WriteConflictEvent,
2019
WriteErrorEvent, WriteSourceItemDoneEvent,
2120
};
22-
use std::sync::atomic::{AtomicU8, AtomicUsize};
21+
use std::sync::atomic::AtomicU8;
2322

2423
// ============================================================================
2524
// Helpers
@@ -95,35 +94,6 @@ async fn make_rich_merge() -> (Arc<dyn Volume>, Arc<dyn Volume>) {
9594
(source, dest)
9695
}
9796

98-
/// Background task that auto-answers EVERY Stop-mode `write-conflict` with a
99-
/// scripted response by polling `conflict_resolution_tx`. Runs until aborted by
100-
/// the caller (after the driven op returns), so it never blocks the test for a
101-
/// fixed duration — the count of prompts answered is read from the returned
102-
/// `Arc<AtomicUsize>`.
103-
fn spawn_conflict_responder(
104-
state: &Arc<WriteOperationState>,
105-
resolution: ConflictResolution,
106-
apply_to_all: bool,
107-
) -> (tokio::task::JoinHandle<()>, Arc<AtomicUsize>) {
108-
let state = Arc::clone(state);
109-
let answered = Arc::new(AtomicUsize::new(0));
110-
let answered_task = Arc::clone(&answered);
111-
let handle = tokio::spawn(async move {
112-
loop {
113-
let tx = state.conflict_resolution_tx.lock().unwrap().take();
114-
if let Some(tx) = tx {
115-
let _ = tx.send(ConflictResolutionResponse {
116-
resolution,
117-
apply_to_all,
118-
});
119-
answered_task.fetch_add(1, Ordering::Relaxed);
120-
}
121-
tokio::time::sleep(Duration::from_millis(2)).await;
122-
}
123-
});
124-
(handle, answered)
125-
}
126-
12797
/// Count `write-conflict` events whose paths refer to a DIRECTORY on either side
12898
/// — i.e. a folder-level prompt. The contract is ZERO of these for a dir-vs-dir
12999
/// merge (folders always merge silently).
@@ -169,9 +139,15 @@ async fn merge_never_deletes_unshadowed_dest_files_under_every_policy() {
169139
let (source, dest) = make_rich_merge().await;
170140
let state = make_state();
171141

172-
let responder = scripted.map(|answer| spawn_conflict_responder(&state, answer, true));
173-
174-
let events = Arc::new(CollectorEventSink::new());
142+
// The responder sink IS the events sink: it forwards every event to its
143+
// inner collector and auto-answers any Stop-mode prompt. For non-Stop
144+
// policies no prompt is ever emitted, so the scripted answer (defaulted
145+
// to Skip here) never fires — the sink is a plain collector in that case.
146+
let events = Arc::new(ConflictResponderSink::new(
147+
&state,
148+
scripted.unwrap_or(ConflictResolution::Skip),
149+
true,
150+
));
175151
let config = VolumeCopyConfig {
176152
conflict_resolution: *policy,
177153
progress_interval_ms: 0,
@@ -190,9 +166,6 @@ async fn merge_never_deletes_unshadowed_dest_files_under_every_policy() {
190166
)
191167
.await;
192168

193-
if let Some((handle, _)) = responder {
194-
handle.abort();
195-
}
196169
assert!(
197170
result.is_ok(),
198171
"policy {policy:?}/{scripted:?} should complete, got {result:?}"
@@ -224,7 +197,7 @@ async fn merge_never_deletes_unshadowed_dest_files_under_every_policy() {
224197

225198
// Zero folder-level prompts under EVERY policy, even Stop.
226199
assert_eq!(
227-
folder_conflict_count(&events),
200+
folder_conflict_count(&events.inner),
228201
0,
229202
"policy {policy:?}/{scripted:?}: a dir-vs-dir merge wrongly emitted a folder conflict"
230203
);
@@ -457,9 +430,7 @@ async fn stop_mode_deep_file_clash_emits_conflict_and_resumes() {
457430
.unwrap();
458431

459432
let state = make_state();
460-
let (responder, answered) = spawn_conflict_responder(&state, ConflictResolution::Overwrite, false);
461-
462-
let events = Arc::new(CollectorEventSink::new());
433+
let events = Arc::new(ConflictResponderSink::new(&state, ConflictResolution::Overwrite, false));
463434
let config = VolumeCopyConfig {
464435
conflict_resolution: ConflictResolution::Stop,
465436
progress_interval_ms: 0,
@@ -477,10 +448,11 @@ async fn stop_mode_deep_file_clash_emits_conflict_and_resumes() {
477448
&config,
478449
)
479450
.await;
480-
responder.abort();
481451
assert!(result.is_ok(), "expected Ok, got {result:?}");
452+
// The sink-recorded file prompts ARE the race-free count once the op future
453+
// has completed — exactly one Stop prompt for the deep clash.
482454
assert_eq!(
483-
answered.load(Ordering::Relaxed),
455+
file_conflict_count(&events.inner),
484456
1,
485457
"exactly one Stop prompt expected for the deep clash"
486458
);
@@ -491,7 +463,7 @@ async fn stop_mode_deep_file_clash_emits_conflict_and_resumes() {
491463
// `await_holding_lock` flags a guard alive across `.await` even with an
492464
// explicit `drop`, so we end the borrow by leaving the block instead).
493465
let (src_path, dst_path, src_is_dir, dst_is_dir, n_conflicts) = {
494-
let conflicts = events.conflicts.lock().unwrap();
466+
let conflicts = events.inner.conflicts.lock().unwrap();
495467
let c = conflicts.first().expect("exactly one deep file conflict");
496468
(
497469
c.source_path.clone(),
@@ -725,9 +697,7 @@ async fn concurrent_merge_with_two_deep_clashes_serializes_prompts() {
725697
source.create_file(Path::new("/three.txt"), b"THREE").await.unwrap();
726698

727699
let state = make_state();
728-
let (responder, answered) = spawn_conflict_responder(&state, ConflictResolution::Overwrite, false);
729-
730-
let events = Arc::new(CollectorEventSink::new());
700+
let events = Arc::new(ConflictResponderSink::new(&state, ConflictResolution::Overwrite, false));
731701
let config = VolumeCopyConfig {
732702
conflict_resolution: ConflictResolution::Stop,
733703
progress_interval_ms: 0,
@@ -749,9 +719,10 @@ async fn concurrent_merge_with_two_deep_clashes_serializes_prompts() {
749719
&config,
750720
)
751721
.await;
752-
responder.abort();
753722
assert!(result.is_ok(), "expected Ok, got {result:?}");
754-
let n = answered.load(Ordering::Relaxed);
723+
// Sink-derived: both deep clashes prompted (the dispatch mutex serialized
724+
// them through the single oneshot slot, each answered in turn).
725+
let n = file_conflict_count(&events.inner);
755726
assert_eq!(n, 2, "both deep clashes should prompt and be answered, got {n}");
756727

757728
// Both clashes overwritten (50_000 bytes of 1u8), both dest-only files kept.
@@ -790,9 +761,7 @@ async fn top_level_and_deep_clash_share_the_dispatch_mutex() {
790761
let state = make_state();
791762
// One "…all" answer collapses any queued prompt via the latch double-check,
792763
// so at most 2 prompts ever emit (top + deep), often just 1.
793-
let (responder, _answered) = spawn_conflict_responder(&state, ConflictResolution::Overwrite, true);
794-
795-
let events = Arc::new(CollectorEventSink::new());
764+
let events = Arc::new(ConflictResponderSink::new(&state, ConflictResolution::Overwrite, true));
796765
let config = VolumeCopyConfig {
797766
conflict_resolution: ConflictResolution::Stop,
798767
progress_interval_ms: 0,
@@ -814,7 +783,6 @@ async fn top_level_and_deep_clash_share_the_dispatch_mutex() {
814783
&config,
815784
)
816785
.await;
817-
responder.abort();
818786
assert!(result.is_ok(), "expected Ok, got {result:?}");
819787

820788
// Both clashes overwritten by the "…all" choice; dest-only file survives.

0 commit comments

Comments
 (0)