
Commit bf0b47f

Indexing: Fix missing dir sizes after replay
- Dirs created by reconciler/live events via `UpsertEntryV2` never got a `dir_stats` row, so they showed no sizes in the UI
- `UpsertEntryV2` now initializes a zero-valued `dir_stats` row when inserting a new directory
- Added `backfill_missing_dir_stats()` aggregator function that finds dirs without stats and computes them bottom-up
- `BackfillMissingDirStats` writer message triggers after post-scan reconciler replay and cold-start FSEvents replay
1 parent 5dbd4a5 commit bf0b47f

4 files changed

Lines changed: 245 additions & 3 deletions


apps/desktop/src-tauri/src/indexing/CLAUDE.md

Lines changed: 4 additions & 2 deletions
@@ -12,10 +12,10 @@ Full design: `docs/specs/drive-indexing/plan.md`
 - **store.rs** -- SQLite schema v2 (integer-keyed entries, dir_stats by entry_id, meta), platform_case collation, read queries, DB open/migrate. Schema version check: mismatch triggers drop+rebuild. Both path-keyed (backward compat) and integer-keyed APIs.
 - **path_resolver.rs** -- `PathResolver`: resolves filesystem paths to integer entry IDs via component-by-component walk with full-path LRU cache (50K entries). Case-aware `CacheKey` on macOS (NFD + case fold). Prefix-based invalidation for deletes/renames.
 - **memory_watchdog.rs** -- Background task monitoring resident memory via `mach_task_info` (macOS). Warns at 8 GB, stops indexing at 16 GB, emits `index-memory-warning` event to frontend. No-op stub on non-macOS. Started from `start_indexing()`.
-- **writer.rs** -- Single writer thread, owns the write connection, processes `WriteMessage` channel (bounded `sync_channel`, 20K capacity, backpressure via blocking). Priority: `UpdateDirStats` before `InsertEntries`. `Flush` variant + async `flush()` method let callers wait for all prior writes to commit. Has both integer-keyed variants (`InsertEntriesV2`, `UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`) and path-keyed backward-compat variants. The integer-keyed delete/subtree-delete handlers auto-propagate negative deltas via the `parent_id` chain (same pattern as the path-keyed variants). `propagate_delta_by_id` walks the parent chain using `get_parent_id` lookups. Maintains `AccumulatorMaps` during `InsertEntriesV2` processing (two HashMaps: direct children stats and child dir relationships + an `entries_inserted` counter), cleared on `TruncateData`. On `ComputeAllAggregates`, passes accumulated maps to `aggregator::compute_all_aggregates_with_maps()` to skip expensive full-table-scan SQL queries. Accepts an optional `AppHandle` at spawn time to emit `index-aggregation-progress` events during aggregation (phase, current, total). Also emits `saving_entries` phase progress during `InsertEntriesV2` processing when the expected total is set via `set_expected_total_entries()` (an `Arc<AtomicU64>` shared between the writer thread and the `IndexWriter` handle).
+- **writer.rs** -- Single writer thread, owns the write connection, processes `WriteMessage` channel (bounded `sync_channel`, 20K capacity, backpressure via blocking). Priority: `UpdateDirStats` before `InsertEntries`. `Flush` variant + async `flush()` method let callers wait for all prior writes to commit. Has both integer-keyed variants (`InsertEntriesV2`, `UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`) and path-keyed backward-compat variants. The integer-keyed delete/subtree-delete handlers auto-propagate negative deltas via the `parent_id` chain (same pattern as the path-keyed variants). `propagate_delta_by_id` walks the parent chain using `get_parent_id` lookups. `UpsertEntryV2` initializes a zero-valued `dir_stats` row when inserting a NEW directory, so enrichment always has a row (subsequent `PropagateDeltaById` calls update it incrementally). Maintains `AccumulatorMaps` during `InsertEntriesV2` processing (two HashMaps: direct children stats and child dir relationships + an `entries_inserted` counter), cleared on `TruncateData`. On `ComputeAllAggregates`, passes accumulated maps to `aggregator::compute_all_aggregates_with_maps()` to skip expensive full-table-scan SQL queries. Accepts an optional `AppHandle` at spawn time to emit `index-aggregation-progress` events during aggregation (phase, current, total). Also emits `saving_entries` phase progress during `InsertEntriesV2` processing when the expected total is set via `set_expected_total_entries()` (an `Arc<AtomicU64>` shared between the writer thread and the `IndexWriter` handle).
 - **scanner.rs** -- jwalk-based parallel directory walker. `scan_volume()` for full scan, `scan_subtree()` for micro-scans. Uses `ScanContext` (from store.rs) to assign integer IDs and parent IDs during the walk: maintains a `HashMap<PathBuf, i64>` mapping directory paths to assigned IDs. The scan root is mapped to `ROOT_ID` (1). Sends `InsertEntriesV2(Vec<EntryRow>)` batches to the writer. Platform-specific exclusion filters (macOS system paths, Linux virtual filesystems). Physical sizes (`st_blocks * 512`).
 - **micro_scan.rs** -- `MicroScanManager`: bounded task pool (default 3 concurrent), priority queue (`UserSelected` > `CurrentDir`), deduplication, cancellation. Skips after full scan completes.
-- **aggregator.rs** -- Dir stats computation. Bottom-up after full scan (O(N) single pass), per-subtree after micro-scan, incremental delta propagation up ancestor chain for watcher events. Two entry points for full aggregation: `compute_all_aggregates_reported` (loads maps from SQL) and `compute_all_aggregates_with_maps` (accepts pre-built maps from the writer). Both accept an `on_progress: &mut dyn FnMut(AggregationProgress)` callback and delegate to `compute_and_write()` for the shared topological sort + bottom-up computation + batch write. Progress is reported at phase transitions and every ~1% during compute/write loops. `AggregationPhase` enum: `SavingEntries` (flushing writer channel), `LoadingDirectories`, `Sorting`, `Computing`, `Writing`.
+- **aggregator.rs** -- Dir stats computation. Bottom-up after full scan (O(N) single pass), per-subtree after micro-scan, incremental delta propagation up ancestor chain for watcher events. Two entry points for full aggregation: `compute_all_aggregates_reported` (loads maps from SQL) and `compute_all_aggregates_with_maps` (accepts pre-built maps from the writer). Both accept an `on_progress: &mut dyn FnMut(AggregationProgress)` callback and delegate to `compute_and_write()` for the shared topological sort + bottom-up computation + batch write. Progress is reported at phase transitions and every ~1% during compute/write loops. `AggregationPhase` enum: `SavingEntries` (flushing writer channel), `LoadingDirectories`, `Sorting`, `Computing`, `Writing`. `backfill_missing_dir_stats` is a catch-up pass that finds directories without `dir_stats` rows and computes their stats bottom-up; triggered after reconciler replay and cold-start replay via `BackfillMissingDirStats` writer message.
 - **watcher.rs** -- Drive-level filesystem watcher. macOS: FSEvents via `cmdr-fsevent-stream` with event IDs and `sinceWhen` replay. Linux: `notify` crate (inotify backend) with recursive watching and synthetic event counter. Other platforms: stub. `supports_event_replay()` lets callers branch on whether journal replay is available.
 - **reconciler.rs** -- Buffers FSEvents during scan (capped at 500K events; overflow sets `buffer_overflow` flag forcing full rescan), replays after scan completes using event IDs to skip stale events. Processes live events for file creates/removes/modifies using integer-keyed write messages (`UpsertEntryV2`, `DeleteEntryById`, `DeleteSubtreeById`, `PropagateDeltaById`). Resolves filesystem paths to entry IDs via `store::resolve_path()` using a read connection passed by callers. Key functions (`process_fs_event`, `emit_dir_updated`) are `pub(super)` so `mod.rs` can call them directly during cold-start replay.
 - **firmlinks.rs** -- Parses `/usr/share/firmlinks`, builds prefix map, normalizes paths. Converts `/System/Volumes/Data/Users/foo` to `/Users/foo`.
@@ -146,6 +146,8 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b

 **`verifier.rs` is a placeholder**: Per-navigation readdir diff is a future milestone. Currently just a TODO comment.

+**Dirs created by reconciler/live events must get `dir_stats` immediately**: `UpsertEntryV2` inserts a zero-valued `dir_stats` row when creating a new directory. Without this, directories created after the last full aggregation have no `dir_stats` and show no sizes in the UI. `BackfillMissingDirStats` runs after reconciler replay and cold-start replay as a catch-up pass for any dirs that slipped through (for example, from older code before this fix). The zero-init + backfill combination guarantees every directory always has a `dir_stats` row.
+
 **Scan cancellation leaves partial data**: By design. `scan_completed_at` not set in meta, so next startup detects incomplete scan and runs fresh. No cleanup needed.

**`ReadPool` replaces `INDEXING` lock for all read-only DB access**: Enrichment (`enrich_entries_with_index`), verification Phase 1 (`verify_affected_dirs`), and background verification dir-stat reads all use `get_read_pool()` + `pool.with_conn()` — thread-local SQLite connections with no lock contention. The `INDEXING` mutex now guards only lifecycle transitions and IPC commands that need `PathResolver`. `with_conn` uses `thread_local!` storage, so callers must not have `.await` points between obtaining the pool and completing the closure (async task migration would break thread affinity).
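
To illustrate the zero-init rule from the `UpsertEntryV2` note above, here is a minimal in-memory sketch. It is not the project's code: `Index`, `DirStats`, `upsert_entry`, and `propagate_delta` are hypothetical names, and the tables are modeled as `HashMap`s rather than SQLite. Dropping a delta when no stats row exists is an assumption, made here to show why the row needs to exist up front.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for one `dir_stats` row.
#[derive(Debug, Default, Clone, Copy, PartialEq)]
pub struct DirStats {
    pub recursive_size: u64,
    pub recursive_file_count: u64,
    pub recursive_dir_count: u64,
}

/// Hypothetical in-memory model of the `entries` and `dir_stats` tables.
#[derive(Default)]
pub struct Index {
    /// entry id -> (parent id, is_directory)
    entries: HashMap<i64, (Option<i64>, bool)>,
    /// directory entry id -> aggregated stats
    dir_stats: HashMap<i64, DirStats>,
}

impl Index {
    /// Insert or update an entry. A directory seen for the first time gets a
    /// zero-valued stats row; an existing row is left untouched.
    pub fn upsert_entry(&mut self, id: i64, parent: Option<i64>, is_dir: bool) {
        let is_new = self.entries.insert(id, (parent, is_dir)).is_none();
        if is_new && is_dir {
            self.dir_stats.entry(id).or_default();
        }
    }

    /// Apply a size/file-count delta to every ancestor of `id` by walking the
    /// parent chain. Assumption: ancestors without a stats row are skipped.
    pub fn propagate_delta(&mut self, id: i64, size: i64, files: i64) {
        let mut cur = self.entries.get(&id).and_then(|e| e.0);
        while let Some(dir_id) = cur {
            if let Some(s) = self.dir_stats.get_mut(&dir_id) {
                s.recursive_size = s.recursive_size.saturating_add_signed(size);
                s.recursive_file_count = s.recursive_file_count.saturating_add_signed(files);
            }
            cur = self.entries.get(&dir_id).and_then(|e| e.0);
        }
    }

    /// Read the stats row for a directory, if one exists.
    pub fn stats(&self, id: i64) -> Option<DirStats> {
        self.dir_stats.get(&id).copied()
    }
}
```

In this model a directory created by a live event has a row immediately, so later deltas from files created inside it accumulate instead of being lost.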

apps/desktop/src-tauri/src/indexing/aggregator.rs

Lines changed: 162 additions & 0 deletions
@@ -301,6 +301,107 @@ pub fn compute_subtree_aggregates(conn: &Connection, root: &str) -> Result<u64,
     Ok(count)
 }

+/// Backfill `dir_stats` for directories that have entries but no stats row.
+///
+/// Finds all directories missing a `dir_stats` row and computes their stats
+/// bottom-up. This catches directories created by reconciler/live events
+/// after the last full aggregation. Returns the number of dirs backfilled.
+pub fn backfill_missing_dir_stats(conn: &Connection) -> Result<u64, IndexStoreError> {
+    // Find directories without dir_stats
+    let missing_ids = load_dirs_missing_stats(conn)?;
+    if missing_ids.is_empty() {
+        return Ok(0);
+    }
+
+    let start = std::time::Instant::now();
+    let count = missing_ids.len();
+    log::debug!("Backfill: {count} directories missing dir_stats, computing...");
+
+    // Load ALL directory entries for the topological sort (we need the full
+    // tree structure to compute bottom-up correctly, since a missing dir's
+    // children may also be missing).
+    let all_dir_entries = load_all_directory_ids(conn)?;
+
+    // Build direct_stats and child_dirs maps scoped to the missing dirs
+    // and their descendants. We use the full-table bulk queries since the
+    // missing dirs can be scattered across the tree.
+    let direct_stats = bulk_get_children_stats_by_id(conn)?;
+    let child_dirs_map = bulk_get_child_dir_ids(conn)?;
+
+    // Topological sort all dirs (we need correct ordering)
+    let sorted = topological_sort_bottom_up(&all_dir_entries);
+
+    // Build set of missing IDs for fast lookup
+    let missing_set: std::collections::HashSet<i64> = missing_ids.into_iter().collect();
+
+    // Compute stats bottom-up for ALL dirs, but only write the missing ones.
+    // We need to compute all because a missing dir's stats depend on its
+    // children (which might have existing stats in the DB or might also be
+    // missing).
+    let mut computed: HashMap<i64, DirStatsById> = HashMap::with_capacity(sorted.len());
+    let mut to_write: Vec<DirStatsById> = Vec::with_capacity(count);
+
+    for &dir_id in &sorted {
+        let (file_size_sum, file_count, child_dir_count) =
+            direct_stats.get(&dir_id).copied().unwrap_or((0, 0, 0));
+
+        let mut recursive_size = file_size_sum;
+        let mut recursive_file_count = file_count;
+        let mut recursive_dir_count = child_dir_count;
+
+        if let Some(children) = child_dirs_map.get(&dir_id) {
+            for &child_id in children {
+                // Prefer freshly computed stats, fall back to existing DB stats
+                if let Some(child_stats) = computed.get(&child_id) {
+                    recursive_size += child_stats.recursive_size;
+                    recursive_file_count += child_stats.recursive_file_count;
+                    recursive_dir_count += child_stats.recursive_dir_count;
+                } else if let Ok(Some(db_stats)) = IndexStore::get_dir_stats_by_id(conn, child_id) {
+                    recursive_size += db_stats.recursive_size;
+                    recursive_file_count += db_stats.recursive_file_count;
+                    recursive_dir_count += db_stats.recursive_dir_count;
+                }
+            }
+        }
+
+        let stats = DirStatsById {
+            entry_id: dir_id,
+            recursive_size,
+            recursive_file_count,
+            recursive_dir_count,
+        };
+
+        if missing_set.contains(&dir_id) {
+            to_write.push(stats.clone());
+        }
+        computed.insert(dir_id, stats);
+    }
+
+    // Batch-write only the missing stats
+    for chunk in to_write.chunks(1000) {
+        IndexStore::upsert_dir_stats_by_id(conn, chunk)?;
+    }
+
+    log::debug!(
+        "Backfill: wrote {} dir_stats rows in {:.1}s",
+        to_write.len(),
+        start.elapsed().as_secs_f64(),
+    );
+
+    Ok(to_write.len() as u64)
+}
+
+/// Load directory IDs that have entries but no `dir_stats` row.
+fn load_dirs_missing_stats(conn: &Connection) -> Result<Vec<i64>, IndexStoreError> {
+    let mut stmt = conn.prepare(
+        "SELECT e.id FROM entries e
+         LEFT JOIN dir_stats ds ON ds.entry_id = e.id
+         WHERE e.is_directory = 1 AND ds.entry_id IS NULL",
+    )?;
+    let rows = stmt.query_map([], |row| row.get::<_, i64>(0))?;
+    rows.collect::<Result<Vec<_>, _>>().map_err(Into::into)
+}
+
 // ── Internal helpers ─────────────────────────────────────────────────

 /// Load all directory `(id, parent_id)` pairs from the entries table.
@@ -690,6 +791,67 @@ mod tests {
         assert_eq!(count, 0);
     }

+    // ── backfill_missing_dir_stats tests ─────────────────────────────
+
+    #[test]
+    fn backfill_fills_missing_stats() {
+        let (conn, _dir) = open_temp_conn();
+
+        // Tree: /a (id=2) with /a/f.txt (id=3, 100 bytes), /a/sub (id=4), /a/sub/g.txt (id=5, 200)
+        insert_entries(
+            &conn,
+            &[
+                make_dir(2, ROOT_ID, "a"),
+                make_file(3, 2, "f.txt", 100),
+                make_dir(4, 2, "sub"),
+                make_file(5, 4, "g.txt", 200),
+            ],
+        );
+
+        // Only compute stats for /a/sub (id=4); leave /a (id=2) and root (id=1) missing
+        IndexStore::upsert_dir_stats_by_id(
+            &conn,
+            &[DirStatsById {
+                entry_id: 4,
+                recursive_size: 200,
+                recursive_file_count: 1,
+                recursive_dir_count: 0,
+            }],
+        )
+        .unwrap();
+
+        // Backfill should fill in root sentinel (id=1) and /a (id=2)
+        let count = backfill_missing_dir_stats(&conn).unwrap();
+        assert_eq!(count, 2); // root sentinel + /a
+
+        // /a should now have correct recursive stats
+        let a_stats = get_stats(&conn, 2).unwrap();
+        assert_eq!(a_stats.recursive_size, 300); // 100 + 200
+        assert_eq!(a_stats.recursive_file_count, 2);
+        assert_eq!(a_stats.recursive_dir_count, 1);
+
+        // Root sentinel should also be correct
+        let root_stats = get_stats(&conn, ROOT_ID).unwrap();
+        assert_eq!(root_stats.recursive_size, 300);
+    }
+
+    #[test]
+    fn backfill_noop_when_all_stats_present() {
+        let (conn, _dir) = open_temp_conn();
+
+        insert_entries(
+            &conn,
+            &[make_dir(2, ROOT_ID, "a"), make_file(3, 2, "f.txt", 100)],
+        );
+
+        // Compute all stats first
+        compute_all_aggregates(&conn).unwrap();
+
+        // Backfill should find nothing to do
+        let count = backfill_missing_dir_stats(&conn).unwrap();
+        assert_eq!(count, 0);
+    }
+
     // ── topological sort test ────────────────────────────────────────

     #[test]
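
The bottom-up pass in `backfill_missing_dir_stats` can be pictured with a small self-contained model. The sketch below is a simplification under stated assumptions, not the function from this commit: `Stats` and `backfill` are hypothetical, the tree lives in slices instead of SQLite, and directories are ordered by depth rather than with the project's `topological_sort_bottom_up`. Like the diff, it recomputes every directory but writes rows only for those that had none.

```rust
use std::cmp::Reverse;
use std::collections::{HashMap, HashSet};

/// Hypothetical recursive totals for one directory.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub struct Stats {
    pub size: u64,
    pub files: u64,
    pub dirs: u64,
}

/// `dirs`: (id, parent id) for every directory; `None` marks the root.
/// `files`: (parent directory id, size in bytes) for every file.
/// `existing`: stats rows already present; only missing rows are added.
/// Returns the number of rows written. Assumes the tree has no cycles.
pub fn backfill(
    dirs: &[(i64, Option<i64>)],
    files: &[(i64, u64)],
    existing: &mut HashMap<i64, Stats>,
) -> usize {
    // Anti-join: directories that have no stats row yet.
    let missing: HashSet<i64> = dirs
        .iter()
        .map(|d| d.0)
        .filter(|id| !existing.contains_key(id))
        .collect();
    if missing.is_empty() {
        return 0;
    }

    // Direct (non-recursive) contribution of each directory.
    let mut direct: HashMap<i64, Stats> = HashMap::new();
    for &(parent, size) in files {
        let s = direct.entry(parent).or_default();
        s.size += size;
        s.files += 1;
    }
    let mut children: HashMap<i64, Vec<i64>> = HashMap::new();
    for &(id, parent) in dirs {
        if let Some(p) = parent {
            children.entry(p).or_default().push(id);
            direct.entry(p).or_default().dirs += 1;
        }
    }

    // Bottom-up order: deepest directories first, so children precede parents.
    let parent_of: HashMap<i64, Option<i64>> = dirs.iter().copied().collect();
    let depth = |mut id: i64| -> usize {
        let mut d = 0;
        while let Some(Some(p)) = parent_of.get(&id) {
            id = *p;
            d += 1;
        }
        d
    };
    let mut order: Vec<i64> = dirs.iter().map(|d| d.0).collect();
    order.sort_by_key(|&id| Reverse(depth(id)));

    // Compute totals for all directories, but write only the missing ones.
    let mut computed: HashMap<i64, Stats> = HashMap::with_capacity(order.len());
    let mut written = 0;
    for id in order {
        let mut total = direct.get(&id).copied().unwrap_or_default();
        for child in children.get(&id).into_iter().flatten() {
            let c = computed.get(child).copied().unwrap_or_default();
            total.size += c.size;
            total.files += c.files;
            total.dirs += c.dirs;
        }
        computed.insert(id, total);
        if missing.contains(&id) {
            existing.insert(id, total);
            written += 1;
        }
    }
    written
}
```

With the tree from `backfill_fills_missing_stats` (root as 1, `/a` as 2, `/a/sub` as 4, and a pre-existing row only for 4), this model writes two rows and gives `/a` a recursive size of 300, in line with the assertions in that test.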

apps/desktop/src-tauri/src/indexing/mod.rs

Lines changed: 8 additions & 0 deletions
@@ -861,6 +861,10 @@ impl IndexManager {
         }
     }

+    // Backfill dir_stats for any directories created by the replay
+    // that didn't go through the full aggregation pass.
+    let _ = writer.send(WriteMessage::BackfillMissingDirStats);
+
     // Switch to live mode
     reconciler.switch_to_live();

@@ -1602,6 +1606,10 @@ async fn run_replay_event_loop(
         reconciler::emit_dir_updated(&app, affected_paths.iter().cloned().collect());
     }

+    // Backfill dir_stats for any directories created by the replay
+    // that didn't go through a full aggregation pass.
+    let _ = writer.send(WriteMessage::BackfillMissingDirStats);
+
     // ── Switch to live mode immediately (before verification) ────────

     log::debug!("Replay: switching to live mode");
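
Both call sites queue the catch-up pass with a fire-and-forget `send` after the replayed events. The sketch below shows that pattern with a plain FIFO `sync_channel` from the standard library. The reduced `WriteMessage` enum, `spawn_writer`, and `replay_then_backfill` are hypothetical, and strict arrival-order processing is an assumption: the real writer also applies message priorities, which this model ignores.

```rust
use std::sync::mpsc::{sync_channel, SyncSender};
use std::thread;

/// Hypothetical, reduced version of the writer's message enum.
#[derive(Debug)]
pub enum WriteMessage {
    UpsertEntry { id: i64 },
    BackfillMissingDirStats,
}

/// Spawns a writer thread that handles messages in arrival order. The join
/// handle yields a log of what was processed once every sender is dropped.
pub fn spawn_writer(
    capacity: usize,
) -> (SyncSender<WriteMessage>, thread::JoinHandle<Vec<String>>) {
    // Bounded channel: `send` blocks when the queue is full (backpressure).
    let (tx, rx) = sync_channel::<WriteMessage>(capacity);
    let handle = thread::spawn(move || {
        let mut log = Vec::new();
        for msg in rx {
            match msg {
                WriteMessage::UpsertEntry { id } => log.push(format!("upsert {id}")),
                WriteMessage::BackfillMissingDirStats => log.push("backfill".to_string()),
            }
        }
        log
    });
    (tx, handle)
}

/// Replays events, then queues the catch-up pass behind them.
pub fn replay_then_backfill(tx: &SyncSender<WriteMessage>, ids: &[i64]) {
    for &id in ids {
        let _ = tx.send(WriteMessage::UpsertEntry { id });
    }
    // Fire-and-forget: a send error only means the writer has shut down.
    let _ = tx.send(WriteMessage::BackfillMissingDirStats);
}
```

Because the backfill message is sent last on the same channel, in this model it is handled only after every replayed upsert has been processed.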
