Skip to content

Commit 8a6671d

Browse files
committed
Viewer: session-level tail wiring + Tauri commands
- `ViewerSession` gains `tail_mode`, `watcher_stop`; spawns a per-session watcher manager thread that consumes WatcherEvents - Manager thread queues `Grew` into `pending_grew` when an upgrade or rebuild is in flight (drain-and-swap protocol); else extends the backend directly when tail mode is on; reload on `Shrunk`/`Replaced` - Adds `viewer_set_tail_mode` and `viewer_reload` Tauri commands, plus the FE bindings and IPC contract tests - Emits `viewer:file-changed:<sid>` so the FE can react with a reload toast (next commit) - Watcher subscribe happens AFTER `SESSIONS.insert` so a slow notify-debouncer setup can't race the upgrade thread into a stuck `upgrading: Some(_)` state - Registers `viewer_set_encoding` and `viewer_get_encoding_options` in `ipc.rs` (they were collected by specta but never wired into the runtime dispatch handler; tail-mode work surfaced the gap)
1 parent 9a7ef22 commit 8a6671d

8 files changed

Lines changed: 507 additions & 4 deletions

File tree

apps/desktop/src-tauri/src/commands/file_viewer.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -208,6 +208,24 @@ pub fn viewer_set_encoding(session_id: String, encoding: FileEncoding) -> Result
208208
file_viewer::set_encoding(&session_id, encoding).map_err(|e| e.to_string())
209209
}
210210

211+
/// Toggles tail mode for a viewer session. When enabled, the backend extends its line index
212+
/// in response to filesystem `Grew` events so the viewport can auto-follow new bytes.
213+
/// When disabled, the FE still receives `viewer:file-changed:<sid>` events and renders a
214+
/// persistent reload toast.
215+
#[tauri::command]
216+
#[specta::specta]
217+
pub fn viewer_set_tail_mode(session_id: String, enabled: bool) -> Result<(), String> {
218+
file_viewer::set_tail_mode(&session_id, enabled).map_err(|e| e.to_string())
219+
}
220+
221+
/// Reopens the viewer's backend against the file on disk under the session's current
222+
/// encoding. Called by the FE reload toast and on file rotation.
223+
#[tauri::command]
224+
#[specta::specta]
225+
pub fn viewer_reload(session_id: String) -> Result<(), String> {
226+
file_viewer::reload(&session_id).map_err(|e| e.to_string())
227+
}
228+
211229
/// Sets up a viewer-specific menu on the given window (adds "Word wrap" to View submenu).
212230
#[tauri::command]
213231
#[specta::specta]

apps/desktop/src-tauri/src/file_viewer/session.rs

Lines changed: 274 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
use std::collections::HashMap;
77
use std::path::PathBuf;
88
use std::sync::atomic::{AtomicBool, Ordering};
9-
use std::sync::{Arc, LazyLock, Mutex};
9+
use std::sync::{Arc, LazyLock, Mutex, RwLock};
1010
use std::thread;
1111
use std::time::Duration;
1212

@@ -16,18 +16,36 @@ use crate::commands::file_system::expand_tilde;
1616
use crate::ignore_poison::IgnorePoison;
1717
use log::debug;
1818
use serde::Serialize;
19+
use tauri::{AppHandle, Emitter};
1920

2021
use super::byte_seek::ByteSeekBackend;
2122
use super::encoding::{FileEncoding, detect, same_byte_layout};
2223
use super::full_load::FullLoadBackend;
2324
use super::line_index::LineIndexBackend;
2425
use super::range_read::{RangeEnd, read_range as do_read_range};
2526
use super::search_matcher::{Matcher, SearchMode};
27+
use super::watcher::{VIEWER_WATCHER_MANAGER, ViewerSubscription, WatcherEvent};
2628
use super::{
2729
BackendCapabilities, FULL_LOAD_THRESHOLD, FileViewerBackend, LineChunk, MAX_SEARCH_MATCHES, SearchMatch,
2830
SeekTarget, ViewerError,
2931
};
3032

33+
/// Process-wide AppHandle for emitting `viewer:file-changed:<sid>` events from
34+
/// background watcher threads. Set during app setup via [`init_app_handle`].
35+
static VIEWER_APP_HANDLE: LazyLock<RwLock<Option<AppHandle>>> = LazyLock::new(|| RwLock::new(None));
36+
37+
/// Stash the AppHandle so the per-session watcher manager threads can emit
38+
/// `viewer:file-changed:<session_id>` events.
39+
pub fn init_app_handle(handle: AppHandle) {
40+
if let Ok(mut guard) = VIEWER_APP_HANDLE.write() {
41+
*guard = Some(handle);
42+
}
43+
}
44+
45+
fn app_handle() -> Option<AppHandle> {
46+
VIEWER_APP_HANDLE.read().ok().and_then(|g| g.clone())
47+
}
48+
3149
/// Which backend strategy is active for a session.
3250
#[derive(Debug, Clone, Serialize, specta::Type)]
3351
#[serde(rename_all = "camelCase")]
@@ -170,11 +188,19 @@ struct ViewerSession {
170188
/// all under one mutex lock.
171189
pending_grew: Mutex<Option<u64>>,
172190
/// Current encoding. Updated atomically with the backend swap on `set_encoding`.
173-
#[allow(dead_code, reason = "milestone-3 watcher/tail extends usage")]
174191
encoding: Mutex<FileEncoding>,
175192
/// Detected encoding at open time (sticky; never changes after `open_session`).
176-
#[allow(dead_code, reason = "milestone-3 watcher/tail extends usage")]
177193
detected_encoding: FileEncoding,
194+
/// Tail mode flag: when true, `Grew` watcher events trigger a backend
195+
/// `extend_to` so the open viewport auto-follows newly appended bytes.
196+
/// When false, the FE still hears `viewer:file-changed:<sid>` events and
197+
/// renders the persistent reload toast.
198+
tail_mode: AtomicBool,
199+
/// Cancel flag the manager thread reads on session close. Dropping the
200+
/// session sets this flag; the manager thread observes it on its next
201+
/// receive cycle, drops its owned `ViewerSubscription`, and exits. The
202+
/// subscription's `Drop` then unwatches the path via the shared singleton.
203+
watcher_stop: Arc<AtomicBool>,
178204
/// Per-read cancel flags. Each `read_range` call inserts an entry keyed by the FE's
179205
/// `read_id`, removes it on completion (cancelled or not). The FE generates fresh
180206
/// ids per call (a monotonic counter is fine; uniqueness within a session is all
@@ -333,6 +359,9 @@ pub fn open_session(path: &str) -> Result<ViewerOpenResult, ViewerError> {
333359
});
334360
}
335361

362+
let watcher_stop = Arc::new(AtomicBool::new(false));
363+
let watcher_stop_for_thread = watcher_stop.clone();
364+
336365
let session = ViewerSession {
337366
backend,
338367
backend_type: Mutex::new(backend_type.clone()),
@@ -342,6 +371,8 @@ pub fn open_session(path: &str) -> Result<ViewerOpenResult, ViewerError> {
342371
pending_grew: Mutex::new(None),
343372
encoding: Mutex::new(detected_encoding),
344373
detected_encoding,
374+
tail_mode: AtomicBool::new(false),
375+
watcher_stop,
345376
active_reads: Mutex::new(HashMap::new()),
346377
path: file_path,
347378
};
@@ -374,7 +405,25 @@ pub fn open_session(path: &str) -> Result<ViewerOpenResult, ViewerError> {
374405
encoding: detected_encoding,
375406
};
376407

377-
SESSIONS.lock_ignore_poison().insert(session_id, session);
408+
let session_path = session.path.clone();
409+
SESSIONS.lock_ignore_poison().insert(session_id.clone(), session);
410+
411+
// Subscribe the session to filesystem events AFTER it's been inserted
412+
// into the SESSIONS map. notify-debouncer-full's `new_debouncer` /
413+
// `debouncer.watch` calls take long enough on macOS (FSEvents stream
414+
// setup) that doing them before insert opens a window where the ByteSeek
415+
// → LineIndex upgrade thread can finish and fail to find the session,
416+
// leaving `upgrading` stuck at `Some` forever. Best-effort: if the
417+
// watcher can't register, the session still opens; the tail toggle just
418+
// won't surface external changes.
419+
//
420+
// Tests can opt out via `CMDR_VIEWER_DISABLE_WATCHER=1`.
421+
if std::env::var("CMDR_VIEWER_DISABLE_WATCHER").is_err() {
422+
match VIEWER_WATCHER_MANAGER.subscribe(&session_path) {
423+
Ok(sub) => spawn_watcher_manager_thread(session_id, sub, watcher_stop_for_thread),
424+
Err(e) => debug!("viewer watcher subscribe failed for {}: {}", session_path.display(), e),
425+
}
426+
}
378427

379428
Ok(result)
380429
}
@@ -952,10 +1001,231 @@ pub fn close_session(session_id: &str) -> Result<(), ViewerError> {
9521001
if let Some(rebuild_cancel) = session.rebuilding.lock_ignore_poison().as_ref() {
9531002
rebuild_cancel.store(true, Ordering::Relaxed);
9541003
}
1004+
// Stop the watcher manager thread; dropping the Arc<ViewerSubscription>
1005+
// when `session` falls out of scope unregisters the underlying path.
1006+
session.watcher_stop.store(true, Ordering::Relaxed);
9551007
// Cancel any in-flight range reads so they exit promptly with `Cancelled`.
9561008
for flag in session.active_reads.lock_ignore_poison().values() {
9571009
flag.store(true, Ordering::Relaxed);
9581010
}
9591011
}
9601012
Ok(())
9611013
}
1014+
1015+
/// Toggle tail mode for a session. When enabled, future watcher `Grew` events
1016+
/// trigger an `extend_to` on the active backend so the open viewport
1017+
/// auto-follows newly appended bytes. When disabled, the FE still receives
1018+
/// `viewer:file-changed:<sid>` events and renders its persistent reload toast.
1019+
pub fn set_tail_mode(session_id: &str, enabled: bool) -> Result<(), ViewerError> {
1020+
let sessions = SESSIONS.lock_ignore_poison();
1021+
let session = sessions.get(session_id).ok_or_else(|| ViewerError::SessionNotFound {
1022+
session_id: session_id.to_string(),
1023+
})?;
1024+
session.tail_mode.store(enabled, Ordering::Relaxed);
1025+
debug!("set_tail_mode: session={}, enabled={}", session_id, enabled);
1026+
1027+
// If tail is being turned on and the file already grew on disk while tail
1028+
// was off, jump the backend to the on-disk EOF so the user doesn't have to
1029+
// wait for the next change to see the catch-up.
1030+
if enabled {
1031+
let path = session.path.clone();
1032+
let backend_arc = session.load_backend();
1033+
drop(sessions);
1034+
if let Ok(meta) = std::fs::metadata(&path) {
1035+
let on_disk = meta.len();
1036+
if on_disk > backend_arc.total_bytes() {
1037+
apply_tail_extend(session_id, on_disk);
1038+
}
1039+
}
1040+
}
1041+
Ok(())
1042+
}
1043+
1044+
/// Reopen the active backend from scratch with the session's current encoding.
1045+
/// Called by the FE's reload toast or by the watcher's rotation handler.
1046+
/// Choice of backend mirrors `open_session`: FullLoad under the threshold,
1047+
/// otherwise ByteSeek (an in-flight LineIndex upgrade isn't restarted here;
1048+
/// the next `get_lines` settles into the right backend).
1049+
pub fn reload(session_id: &str) -> Result<(), ViewerError> {
1050+
let path;
1051+
let encoding;
1052+
{
1053+
let sessions = SESSIONS.lock_ignore_poison();
1054+
let session = sessions.get(session_id).ok_or_else(|| ViewerError::SessionNotFound {
1055+
session_id: session_id.to_string(),
1056+
})?;
1057+
path = session.path.clone();
1058+
encoding = *session.encoding.lock_ignore_poison();
1059+
}
1060+
1061+
let metadata = std::fs::metadata(&path)?;
1062+
let file_size = metadata.len();
1063+
let new_backend: Box<dyn FileViewerBackend> = if file_size <= FULL_LOAD_THRESHOLD {
1064+
Box::new(FullLoadBackend::open_with_encoding(&path, encoding)?)
1065+
} else {
1066+
Box::new(ByteSeekBackend::open_with_encoding(&path, encoding)?)
1067+
};
1068+
let new_type = if file_size <= FULL_LOAD_THRESHOLD {
1069+
BackendType::FullLoad
1070+
} else {
1071+
BackendType::ByteSeek
1072+
};
1073+
1074+
let sessions = SESSIONS.lock_ignore_poison();
1075+
if let Some(session) = sessions.get(session_id) {
1076+
session.backend.store(Arc::new(new_backend));
1077+
*session.backend_type.lock_ignore_poison() = new_type;
1078+
// Reset the pending grew queue; the fresh backend covers what the queue
1079+
// was reserving for the old one.
1080+
*session.pending_grew.lock_ignore_poison() = None;
1081+
}
1082+
Ok(())
1083+
}
1084+
1085+
/// Manager thread spawned once per session. Owns the `ViewerSubscription`
1086+
/// (kept off `ViewerSession` because the channel receiver isn't `Sync`).
1087+
/// Drops the subscription when `stop` flips (set by `close_session`) or when
1088+
/// the upstream channel disconnects; the subscription's `Drop` then
1089+
/// unregisters the path from the shared singleton.
1090+
fn spawn_watcher_manager_thread(session_id: String, sub: ViewerSubscription, stop: Arc<AtomicBool>) {
1091+
thread::spawn(move || {
1092+
// Poll with a timeout so we periodically check `stop` even if the
1093+
// file's idle: this is the only way the manager exits when
1094+
// `close_session` runs without the file changing first.
1095+
const POLL: Duration = Duration::from_millis(200);
1096+
loop {
1097+
if stop.load(Ordering::Relaxed) {
1098+
return;
1099+
}
1100+
if let Some(event) = sub.recv_timeout(POLL) {
1101+
if stop.load(Ordering::Relaxed) {
1102+
return;
1103+
}
1104+
handle_watcher_event(&session_id, event);
1105+
}
1106+
}
1107+
});
1108+
}
1109+
1110+
fn emit_file_changed(session_id: &str, kind: &'static str, new_size: Option<u64>) {
1111+
let Some(handle) = app_handle() else {
1112+
return;
1113+
};
1114+
let event = format!("viewer:file-changed:{}", session_id);
1115+
let payload = serde_json::json!({
1116+
"kind": kind,
1117+
"newSize": new_size,
1118+
});
1119+
if let Err(e) = handle.emit(&event, payload) {
1120+
debug!("emit viewer:file-changed failed: {}", e);
1121+
}
1122+
}
1123+
1124+
fn handle_watcher_event(session_id: &str, event: WatcherEvent) {
1125+
match event {
1126+
WatcherEvent::MetadataOnly => {
1127+
// No bytes changed; nothing for the viewer to do.
1128+
}
1129+
WatcherEvent::Grew(new_size) => {
1130+
// Always tell the FE.
1131+
emit_file_changed(session_id, "grew", Some(new_size));
1132+
1133+
// Look up session state to decide whether to queue or apply.
1134+
let (queue, can_extend, is_tail) = {
1135+
let sessions = SESSIONS.lock_ignore_poison();
1136+
let Some(session) = sessions.get(session_id) else {
1137+
return;
1138+
};
1139+
let upgrading = session.upgrading.lock_ignore_poison().is_some();
1140+
let rebuilding = session.rebuilding.lock_ignore_poison().is_some();
1141+
(
1142+
upgrading || rebuilding,
1143+
!upgrading && !rebuilding,
1144+
session.tail_mode.load(Ordering::Relaxed),
1145+
)
1146+
};
1147+
1148+
if queue {
1149+
push_pending_grew(session_id, new_size);
1150+
return;
1151+
}
1152+
if can_extend && is_tail {
1153+
apply_tail_extend(session_id, new_size);
1154+
}
1155+
}
1156+
WatcherEvent::Shrunk | WatcherEvent::Replaced => {
1157+
emit_file_changed(session_id, "rotated", None);
1158+
// Best-effort reload; failure here surfaces on the next FE
1159+
// interaction.
1160+
let _ = reload(session_id);
1161+
}
1162+
}
1163+
}
1164+
1165+
/// Apply a single tail-mode extension under the drain-and-swap-under-lock
1166+
/// protocol. Re-reads the current backend on every call (no cached Arc) so
1167+
/// concurrent encoding rebuilds / upgrades don't write into a stale slot.
1168+
fn apply_tail_extend(session_id: &str, new_size: u64) {
1169+
let dummy_cancel = AtomicBool::new(false);
1170+
1171+
let sessions = SESSIONS.lock_ignore_poison();
1172+
let Some(session) = sessions.get(session_id) else {
1173+
return;
1174+
};
1175+
// Re-check: an upgrade or rebuild may have started between the watcher
1176+
// thread's read and this lock acquisition. Queue and bail in that case.
1177+
if session.upgrading.lock_ignore_poison().is_some() || session.rebuilding.lock_ignore_poison().is_some() {
1178+
let mut q = session.pending_grew.lock_ignore_poison();
1179+
let next = match *q {
1180+
Some(prev) => prev.max(new_size),
1181+
None => new_size,
1182+
};
1183+
*q = Some(next);
1184+
return;
1185+
}
1186+
// Fresh ArcSwap::load on every call: the watcher must observe whichever
1187+
// backend `set_encoding`/`upgrade` last installed.
1188+
let backend = session.backend.load_full();
1189+
let current_size = backend.total_bytes();
1190+
if new_size <= current_size {
1191+
return;
1192+
}
1193+
drop(sessions);
1194+
1195+
let extended = match backend.extend_to_boxed(new_size, &dummy_cancel) {
1196+
Ok(b) => b,
1197+
Err(_) => {
1198+
// The active backend can't extend (FullLoad). The viewer remains
1199+
// valid against the older byte range until the user reloads.
1200+
return;
1201+
}
1202+
};
1203+
1204+
let sessions = SESSIONS.lock_ignore_poison();
1205+
if let Some(session) = sessions.get(session_id) {
1206+
session.backend.store(Arc::new(extended));
1207+
}
1208+
}
1209+
1210+
fn push_pending_grew(session_id: &str, new_size: u64) {
1211+
let sessions = SESSIONS.lock_ignore_poison();
1212+
if let Some(session) = sessions.get(session_id) {
1213+
let mut q = session.pending_grew.lock_ignore_poison();
1214+
let next = match *q {
1215+
Some(prev) => prev.max(new_size),
1216+
None => new_size,
1217+
};
1218+
*q = Some(next);
1219+
}
1220+
}
1221+
1222+
/// Test-only helper: returns the current tail-mode flag.
1223+
#[cfg(test)]
1224+
#[allow(dead_code, reason = "consumed by session_test::tail_mode_can_be_toggled")]
1225+
pub fn test_only_tail_mode(session_id: &str) -> bool {
1226+
let sessions = SESSIONS.lock_ignore_poison();
1227+
sessions
1228+
.get(session_id)
1229+
.map(|s| s.tail_mode.load(Ordering::Relaxed))
1230+
.unwrap_or(false)
1231+
}

0 commit comments

Comments
 (0)