Skip to content

Commit aa9905f

Browse files
committed
File viewer: Open instantly by backgrounding the FS watcher subscribe
- Move the blocking FSEvents subscribe off `open_session`'s critical path onto the watcher-manager thread (`spawn_watcher_manager`). Open drops from ~118ms idle (unbounded and fat-tailed under load: 195-730ms under a synthetic FS-event flood) to ~0.3ms regardless of system load. This is a prod win too: every viewer open previously paid this latency under the 2s `viewer_open` timeout, so a busy machine or slow/network path could time out the open. - Close the open→subscribe missed-append window with `catch_up_after_subscribe`: right after subscribing, re-stat the file and, if on-disk size exceeds what the live backend covers, drive the same `Grew` path a real event would. Comparing against live backend coverage is correct regardless of upgrade timing (mid-upgrade queues into `pending_grew`; post-upgrade tail-extends or emits). - Fix the `search_poll_no_active_search` 8s-nextest-cap flake: the test did ~zero CPU work; 100% of its time was that one `fseventsd`-bound subscribe, whose tail crossed 8s under the saturated check suite. - Tests that inject synthetic watcher events now call `wait_for_watcher_subscribed()`, since the watcher isn't live the instant open returns.
1 parent 2790735 commit aa9905f

3 files changed

Lines changed: 110 additions & 33 deletions

File tree

apps/desktop/src-tauri/src/file_viewer/CLAUDE.md

Lines changed: 15 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ Classification per debounce window:
9898
- `Replaced` when the inode changed (rename + atomic replace, log rotation).
9999
- `MetadataOnly` when nothing observable changed.
100100

101-
Per-session, a manager thread (`spawn_watcher_manager_thread`) consumes events on the subscription channel:
101+
Per-session, a manager thread (`spawn_watcher_manager`) does the FSEvents subscribe itself (see the gotcha below), then consumes events on the subscription channel:
102102

103103
- Always emits `viewer:file-changed:<sid>` with `{ kind: "grew", newSize }` or `{ kind: "rotated" }`.
104104
- `Grew` with `upgrading` or `rebuilding` in flight: queues `pending_grew = Some(new_size.max(prev))` (drain-and-swap
@@ -118,12 +118,20 @@ Per-session, a manager thread (`spawn_watcher_manager_thread`) consumes events o
118118

119119
## Gotchas (tail mode)
120120

121-
**Watcher subscribe happens AFTER `SESSIONS.insert` but BEFORE the upgrade spawn.** `notify-debouncer-full::new_debouncer`
122-
plus `debouncer.watch` need the session already in `SESSIONS` so the manager thread can look it up. They run before the
123-
upgrade thread spawn so the watcher captures any append that lands during the upgrade window; otherwise an append
124-
arriving between SESSIONS.insert and the watcher's first event would be observed by no one (the upgrade has already
125-
stored its LineIndex covering only the pre-append EOF, and no later FS event ever fires for that one append). Pinned by
126-
`tail_mode_on_extends_backend_when_watcher_reports_grew` and `test_append_during_upgrade_not_dropped`.
121+
**The FSEvents subscribe runs on the manager thread, NOT inline in `open_session`.** `notify-debouncer-full::new_debouncer`
122+
+ `debouncer.watch` is a blocking, `fseventsd`-bound call: ~100 ms idle on macOS and seconds under load (measured: a
123+
0.3 s test became >8 s under the full check suite, tripping the nextest cap; a synthetic FS-event flood pushed the
124+
subscribe alone from 118 ms to 730 ms). Doing it inline made every viewer open pay that latency and risked the 2 s
125+
`viewer_open` timeout on a busy system. So `open_session` spawns `spawn_watcher_manager`, which subscribes on its own
126+
thread and only then runs the poll loop. Open is now sub-millisecond regardless of system load. **Don't move the
127+
subscribe back inline.** Because the subscribe no longer precedes the upgrade thread, an append could land in the
128+
open→subscribe window and fire no event (the watcher's size baseline is the on-disk EOF at subscribe time). That window
129+
is closed by `catch_up_after_subscribe`: right after subscribing, it re-stats the file and, if the on-disk size exceeds
130+
what the active backend currently covers, drives the same `Grew` path a real event would — correct whether the
131+
ByteSeek→LineIndex upgrade has stored yet (mid-upgrade it queues into `pending_grew`; post-upgrade it tail-extends or
132+
emits). Tests that inject synthetic watcher events must call `wait_for_watcher_subscribed()` after `open_session` first,
133+
since the watcher isn't live the instant open returns. Pinned by `tail_mode_on_extends_backend_when_watcher_reports_grew`
134+
and `test_append_during_upgrade_not_dropped`.
127135

128136
**Tail-extend race against an encoding rebuild.** `apply_tail_extend` snapshots the active backend `Arc`, drops the
129137
SESSIONS lock, runs `extend_to_boxed` (multi-second on a multi-MB append), then re-acquires the lock. If an encoding

apps/desktop/src-tauri/src/file_viewer/session.rs

Lines changed: 69 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
//! API for the frontend. Sessions are cached by ID and cleaned up on close.
55
66
use std::collections::HashMap;
7-
use std::path::PathBuf;
7+
use std::path::{Path, PathBuf};
88
use std::sync::atomic::{AtomicBool, Ordering};
99
use std::sync::{Arc, LazyLock, Mutex, RwLock};
1010
use std::thread;
@@ -24,7 +24,7 @@ use super::full_load::FullLoadBackend;
2424
use super::line_index::LineIndexBackend;
2525
use super::range_read::{RangeEnd, read_range as do_read_range};
2626
use super::search_matcher::{Matcher, SearchMode};
27-
use super::watcher::{VIEWER_WATCHER_MANAGER, ViewerSubscription, WatcherEvent};
27+
use super::watcher::{VIEWER_WATCHER_MANAGER, WatcherEvent};
2828
use super::{
2929
BackendCapabilities, FULL_LOAD_THRESHOLD, FileViewerBackend, LineChunk, MAX_SEARCH_MATCHES, SearchMatch,
3030
SeekTarget, ViewerError,
@@ -361,26 +361,17 @@ pub fn open_session(path: &str) -> Result<ViewerOpenResult, ViewerError> {
361361
let session_path = session.path.clone();
362362
SESSIONS.lock_ignore_poison().insert(session_id.clone(), session);
363363

364-
// Subscribe to filesystem events BEFORE spawning the upgrade thread.
365-
// If the upgrade thread is spawned first and finishes between the spawn
366-
// and the watcher subscribe, any append on disk during that window is
367-
// observed by no one: the upgrade thread already stored its LineIndex
368-
// (covering only the pre-append EOF), and the watcher hasn't subscribed
369-
// yet so no event ever fires. Subscribing first means the watcher is
370-
// live throughout the upgrade window; any append queues into
371-
// `pending_grew` and the upgrade's drain-and-swap critical section
372-
// picks it up.
373-
//
374-
// The watcher manager thread itself locks SESSIONS to look up the
375-
// session on each event, so it gracefully tolerates the brief window
376-
// between this insert and the manager thread's first `recv`.
364+
// Attach the filesystem watcher off the critical path. The FSEvents
365+
// subscribe is a blocking, `fseventsd`-bound call (~100ms idle, seconds
366+
// under load), so doing it inline would make every viewer open pay that
367+
// latency and risk the 2s `viewer_open` timeout on a busy system. The
368+
// manager thread does the subscribe itself, then a catch-up re-stat closes
369+
// the open→subscribe window for any append that landed before the watcher
370+
// went live. See `spawn_watcher_manager`.
377371
//
378372
// Tests can opt out via `CMDR_VIEWER_DISABLE_WATCHER=1`.
379373
if std::env::var("CMDR_VIEWER_DISABLE_WATCHER").is_err() {
380-
match VIEWER_WATCHER_MANAGER.subscribe(&session_path) {
381-
Ok(sub) => spawn_watcher_manager_thread(session_id.clone(), sub, watcher_stop_for_thread),
382-
Err(e) => debug!("viewer watcher subscribe failed for {}: {}", session_path.display(), e),
383-
}
374+
spawn_watcher_manager(session_id.clone(), session_path, watcher_stop_for_thread);
384375
}
385376

386377
// If we're using ByteSeek, start background upgrade to LineIndex with timeout
@@ -822,7 +813,7 @@ pub fn write_range_to_file(
822813
read_id: u64,
823814
anchor: RangeEnd,
824815
focus: RangeEnd,
825-
dest_path: &std::path::Path,
816+
dest_path: &Path,
826817
) -> Result<(), ViewerError> {
827818
let text = read_range(session_id, read_id, anchor, focus)?;
828819

@@ -1189,13 +1180,38 @@ pub fn reload(session_id: &str) -> Result<(), ViewerError> {
11891180
Ok(())
11901181
}
11911182

1192-
/// Manager thread spawned once per session. Owns the `ViewerSubscription`
1193-
/// (kept off `ViewerSession` because the channel receiver isn't `Sync`).
1194-
/// Drops the subscription when `stop` flips (set by `close_session`) or when
1195-
/// the upstream channel disconnects; the subscription's `Drop` then
1196-
/// unregisters the path from the shared singleton.
1197-
fn spawn_watcher_manager_thread(session_id: String, sub: ViewerSubscription, stop: Arc<AtomicBool>) {
1183+
/// Manager thread spawned once per session. Does the (blocking,
1184+
/// `fseventsd`-bound) FSEvents subscribe itself — off `open_session`'s critical
1185+
/// path — then owns the resulting `ViewerSubscription` (kept off `ViewerSession`
1186+
/// because the channel receiver isn't `Sync`) and runs the event loop. Drops the
1187+
/// subscription when `stop` flips (set by `close_session`) or when the upstream
1188+
/// channel disconnects; the subscription's `Drop` then unregisters the path from
1189+
/// the shared singleton.
1190+
fn spawn_watcher_manager(session_id: String, path: PathBuf, stop: Arc<AtomicBool>) {
11981191
thread::spawn(move || {
1192+
// `close_session` may have run before this thread got scheduled; skip
1193+
// the blocking subscribe entirely in that case (nothing registered yet,
1194+
// so nothing to unregister).
1195+
if stop.load(Ordering::Relaxed) {
1196+
return;
1197+
}
1198+
1199+
let sub = match VIEWER_WATCHER_MANAGER.subscribe(&path) {
1200+
Ok(sub) => sub,
1201+
Err(e) => {
1202+
debug!("viewer watcher subscribe failed for {}: {}", path.display(), e);
1203+
return;
1204+
}
1205+
};
1206+
1207+
// `stop` may have flipped while we were subscribing. Returning here
1208+
// drops `sub`, which unregisters the path.
1209+
if stop.load(Ordering::Relaxed) {
1210+
return;
1211+
}
1212+
1213+
catch_up_after_subscribe(&session_id, &path);
1214+
11991215
// Poll with a timeout so we periodically check `stop` even if the
12001216
// file's idle: this is the only way the manager exits when
12011217
// `close_session` runs without the file changing first.
@@ -1214,6 +1230,33 @@ fn spawn_watcher_manager_thread(session_id: String, sub: ViewerSubscription, sto
12141230
});
12151231
}
12161232

1233+
/// Closes the open→subscribe missed-append window. Because the subscribe runs
1234+
/// in the background and blocks for an unbounded time, an append can land
1235+
/// between open and the watcher going live — and the watcher's size baseline is
1236+
/// the on-disk EOF *at subscribe time*, so that append would fire no event and
1237+
/// stay invisible. We compare the current on-disk size against what the active
1238+
/// backend covers and, if the file grew, drive the same path a real `Grew`
1239+
/// event would. Comparing against live backend coverage (rather than a captured
1240+
/// open-time EOF) makes this correct regardless of whether the ByteSeek →
1241+
/// LineIndex upgrade has already stored: mid-upgrade it queues into
1242+
/// `pending_grew` (drained by the swap), post-upgrade it tail-extends or emits.
1243+
fn catch_up_after_subscribe(session_id: &str, path: &Path) {
1244+
let Ok(meta) = std::fs::metadata(path) else {
1245+
return;
1246+
};
1247+
let on_disk = meta.len();
1248+
let covered = {
1249+
let sessions = SESSIONS.lock_ignore_poison();
1250+
let Some(session) = sessions.get(session_id) else {
1251+
return; // session closed while we were subscribing
1252+
};
1253+
session.backend.load_full().total_bytes()
1254+
};
1255+
if on_disk > covered {
1256+
handle_watcher_event(session_id, WatcherEvent::Grew(on_disk));
1257+
}
1258+
}
1259+
12171260
fn emit_file_changed(session_id: &str, kind: &'static str, new_size: Option<u64>) {
12181261
let Some(handle) = app_handle() else {
12191262
return;

apps/desktop/src-tauri/src/file_viewer/session_test.rs

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,23 @@ fn write_test_file(dir: &Path, name: &str, content: &str) -> PathBuf {
3535
file
3636
}
3737

38+
/// Wait until a freshly-opened session's watcher subscribe has landed, so
39+
/// `test_only_emit` reliably reaches a subscriber. The subscribe runs on a
40+
/// background thread off `open_session`'s critical path (see
41+
/// `spawn_watcher_manager`), so tests that inject synthetic watcher events must
42+
/// sync on it instead of assuming the watcher is live the moment open returns.
43+
/// Each test runs in its own nextest process, so `watch_count() > 0` reflects
44+
/// only the session(s) opened in this test.
45+
fn wait_for_watcher_subscribed() {
46+
for _ in 0..200 {
47+
if super::watcher::VIEWER_WATCHER_MANAGER.watch_count() > 0 {
48+
return;
49+
}
50+
thread::sleep(Duration::from_millis(10));
51+
}
52+
panic!("watcher subscribe did not land within 2s");
53+
}
54+
3855
#[test]
3956
fn open_small_file_uses_full_load() {
4057
let dir = create_test_dir("small");
@@ -1251,6 +1268,9 @@ fn tail_mode_on_extends_backend_when_watcher_reports_grew() {
12511268
let result = session::open_session(path.to_str().unwrap()).unwrap();
12521269
let sid = result.session_id.clone();
12531270
let original_bytes = result.total_bytes;
1271+
// Sync on the background subscribe before mutating, so the catch-up re-stat
1272+
// stays a no-op and the explicit emit below is the driver under test.
1273+
wait_for_watcher_subscribed();
12541274

12551275
// Wait for the background ByteSeek → LineIndex upgrade to finish so
12561276
// we're testing the post-upgrade fast path (extend an existing
@@ -1496,6 +1516,9 @@ fn test_session_emits_file_changed_on_append() {
14961516
let file = write_test_file(&dir, "emit.log", &initial);
14971517
let result = session::open_session(file.to_str().unwrap()).unwrap();
14981518
let sid = result.session_id.clone();
1519+
// Sync on the background subscribe before mutating the file, so the
1520+
// catch-up re-stat stays a no-op and the explicit emit below is the driver.
1521+
wait_for_watcher_subscribed();
14991522
session::set_tail_mode(&sid, true).unwrap();
15001523

15011524
let mut content = initial.clone();
@@ -1536,6 +1559,7 @@ fn test_session_tail_mode_off_does_not_extend_index() {
15361559
let file = write_test_file(&dir, "tailoff.log", initial);
15371560
let result = session::open_session(file.to_str().unwrap()).unwrap();
15381561
let sid = result.session_id.clone();
1562+
wait_for_watcher_subscribed();
15391563
assert!(!session::test_only_tail_mode(&sid));
15401564

15411565
let original_lines = session::get_session_status(&sid).unwrap().total_lines;
@@ -1580,6 +1604,7 @@ fn test_session_rotation_reopens_backend() {
15801604
let result = session::open_session(file.to_str().unwrap()).unwrap();
15811605
let sid = result.session_id.clone();
15821606
let original_bytes = result.total_bytes;
1607+
wait_for_watcher_subscribed();
15831608

15841609
// Replace the file with a longer one.
15851610
let new_path = dir.join("rot.log.new");
@@ -1619,6 +1644,7 @@ fn test_session_close_stops_watcher() {
16191644
let result = session::open_session(file.to_str().unwrap()).unwrap();
16201645
let sid = result.session_id.clone();
16211646

1647+
wait_for_watcher_subscribed();
16221648
let canonical = fs::canonicalize(&file).unwrap();
16231649
// The subscription is alive: a test-only emit reaches it.
16241650
let sent_before = super::watcher::test_only_emit(&canonical, super::watcher::WatcherEvent::MetadataOnly);

0 commit comments

Comments
 (0)