Skip to content

Commit 323ae86

Browse files
committed
Indexing: Fix orphaned entries & missing dir sizes
`INSERT OR REPLACE` in `insert_entries_v2_batch` silently deleted old entries and re-inserted with new IDs during subtree rescans, orphaning all children (their `parent_id` pointed to the deleted old ID). This caused unbounded DB growth (5.5M → 21M entries) and zero `dir_stats`for any entry created after the initial full scan. - Subtree scans now send `DeleteDescendantsById` before inserting, ensuring a clean slate with no orphans - Subtree aggregation queries now use recursive CTEs scoped to the target subtree instead of full-table scans - Schema version bumped to v3 to force DB rebuild on existing installs - Added debug logging in `ScanContext::new` for path resolution failures
1 parent c224740 commit 323ae86

6 files changed

Lines changed: 161 additions & 35 deletions

File tree

apps/desktop/src-tauri/src/indexing/CLAUDE.md

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ All writes go through a dedicated `std::thread` via an unbounded mpsc channel. T
6161

6262
Reads happen on separate WAL connections (any thread). The global read-only store (`GLOBAL_INDEX_STORE`) provides enrichment without passing `AppHandle` through the listing pipeline.
6363

64-
### SQLite schema (v2: integer-keyed)
64+
### SQLite schema (v3: integer-keyed)
6565

6666
One DB per volume: `~/Library/Application Support/com.veszelovszki.cmdr/index-{volume_id}.db`
6767

@@ -72,7 +72,7 @@ Three tables:
7272

7373
WAL mode, 64 MB page cache. Custom `platform_case` collation registered on every connection: case-insensitive + NFD normalization on macOS, binary on Linux. **Opening the DB with the sqlite3 CLI will fail** on queries touching the name column (the collation isn't registered).
7474

75-
**Migration in progress**: Schema bumped from v1 to v2. Milestones 1-5 complete. Scanner, writer, aggregator, reconciler, enrichment, and IPC commands all fully migrated to integer keys. `IndexManager` owns a `PathResolver` for LRU-cached path→ID resolution in IPC commands (`get_dir_stats`, `get_dir_stats_batch`). Enrichment uses integer-keyed fast path: resolve parent once → batch child dir stats by ID. Reconciler sends integer-keyed messages exclusively. Old path-keyed `WriteMessage` variants and backward-compat shims (`ScannedEntry`, `DirStats`) still exist for post-replay verification — cleanup in milestone 6. All 848 tests pass.
75+
**Schema v3**: Bumped from v2 to force DB rebuild after fixing orphan entry bug. Scanner, writer, aggregator, reconciler, enrichment, and IPC commands all fully migrated to integer keys. `IndexManager` owns a `PathResolver` for LRU-cached path→ID resolution in IPC commands (`get_dir_stats`, `get_dir_stats_batch`). Enrichment uses integer-keyed fast path: resolve parent once → batch child dir stats by ID. Reconciler sends integer-keyed messages exclusively. Old path-keyed `WriteMessage` variants and backward-compat shims (`ScannedEntry`, `DirStats`) still exist for post-replay verification — cleanup in milestone 6.
7676

7777
## How to test
7878

@@ -103,6 +103,10 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b
103103

104104
**Physical sizes (`st_blocks * 512`)**: More meaningful for disk usage than logical size. May overcount ~10-20% for APFS clones (shared blocks). Volume usage bar uses `statfs()` for true totals.
105105

106+
**Subtree rescans delete descendants first**: `scan_subtree` sends `DeleteDescendantsById(root_id)` to the writer before inserting fresh entries. This prevents orphaned entries that previously caused DB bloat (4x) and missing dir_stats. The root entry is preserved (its existing ID is reused by `ScanContext`). The delete and subsequent inserts are serialized through the single writer channel, so no race conditions. `ComputeSubtreeAggregates` runs after the scan to recompute stats.
107+
108+
**Subtree aggregation uses scoped queries**: `scoped_get_children_stats_by_id` and `scoped_get_child_dir_ids` in `aggregator.rs` use recursive CTEs scoped to the target subtree, not full-table scans. This keeps subtree aggregation O(subtree_size) regardless of total DB size.
109+
106110
**Disposable cache pattern**: The index DB is a cache, not a source of truth. Any corruption or error triggers delete+rebuild. No user-facing errors for DB issues.
107111

108112
**cmdr-fsevent-stream fork (macOS only)**: Our fork of `fsevent-stream` (v0.3.0) provides direct access to FSEvents event IDs, `sinceWhen` replay, and `MustScanSubDirs` flags. Only used on macOS. On Linux, the `notify` crate (inotify backend) provides recursive directory watching with `RecursiveMode::Recursive`.
@@ -137,6 +141,8 @@ Key test files are alongside each module (test functions within `#[cfg(test)]` b
137141

138142
**Reconciler holds a read connection**: `process_fs_event`, `replay`, and `process_live_event` all require a `&Connection` parameter for path-to-ID resolution. Callers (event loops in mod.rs) open a read connection via `IndexStore::open_write_connection(writer.db_path())` at loop start and pass it through. This is a WAL-mode connection so it doesn't block the writer. The `IndexManager` also owns a `PathResolver` with LRU cache, used by IPC commands (`get_dir_stats`, `get_dir_stats_batch`) for cached resolution. The event loops don't use the `PathResolver` yet because they run in separate async tasks -- could be migrated in a future optimization pass.
139143

140-
**ScanContext maps scan root to ROOT_ID**: Both `scan_volume` and `scan_subtree` create a `ScanContext` that maps the scan root directory to `ROOT_ID` (1). This means all top-level entries under any scan root get `parent_id = ROOT_ID` in the DB. For subtree scans, this means the scan root itself isn't stored as an entry — only its children are. The `ScanContext` opens a temporary read connection to the DB to fetch `next_id` via `get_next_id()`.
144+
**ScanContext maps scan root to ROOT_ID**: Both `scan_volume` and `scan_subtree` create a `ScanContext` that maps the scan root directory to `ROOT_ID` (1). This means all top-level entries under any scan root get `parent_id = ROOT_ID` in the DB. For subtree scans, the root is resolved to its existing entry ID (not ROOT_ID), and `DeleteDescendantsById` is sent before the scan starts. The `ScanContext` opens a temporary read connection to the DB to fetch `next_id` via `get_next_id()`.
145+
146+
**Never use `INSERT OR REPLACE` on entries without deleting descendants first**: `INSERT OR REPLACE` on the `idx_parent_name` unique index silently deletes the old row and inserts a new one with a new ID. This orphans all children (their `parent_id` points to the deleted old ID) and orphans the old `dir_stats` row. The scanner's `insert_entries_v2_batch` still uses `INSERT OR REPLACE` as a safety net, but it's always preceded by `DeleteDescendantsById` for subtree scans, so no conflicts should occur in practice.
141147

142148
**IndexWriter exposes `db_path()`**: The scanner needs the DB path to open a temporary connection for `ScanContext::new()`. This path is stored on the `IndexWriter` handle and accessible via `db_path()`. The temporary connection is short-lived (only used to read `MAX(id)`).

apps/desktop/src-tauri/src/indexing/aggregator.rs

Lines changed: 55 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -127,16 +127,15 @@ pub fn compute_subtree_aggregates(conn: &Connection, root: &str) -> Result<u64,
127127
let dir_count = dir_entries.len();
128128
log::debug!("Subtree aggregation: starting bottom-up computation for {dir_count} directories under {root}");
129129

130-
// Load direct children stats scoped to this subtree
131-
let dir_id_set: std::collections::HashSet<i64> = dir_entries.iter().map(|&(id, _)| id).collect();
132-
let direct_stats = scoped_get_children_stats_by_id(conn, &dir_id_set)?;
130+
// Load direct children stats scoped to this subtree via recursive CTE
131+
let direct_stats = scoped_get_children_stats_by_id(conn, root_id)?;
133132
log::debug!(
134133
"Subtree aggregation: loaded stats for {} parent IDs in {:.1}ms",
135134
direct_stats.len(),
136135
start.elapsed().as_secs_f64() * 1000.0,
137136
);
138137

139-
let child_dirs_map = scoped_get_child_dir_ids(conn, &dir_id_set)?;
138+
let child_dirs_map = scoped_get_child_dir_ids(conn, root_id)?;
140139
log::debug!(
141140
"Subtree aggregation: loaded child dirs for {} parent IDs in {:.1}ms",
142141
child_dirs_map.len(),
@@ -325,35 +324,70 @@ fn bulk_get_child_dir_ids(conn: &Connection) -> Result<HashMap<i64, Vec<i64>>, I
325324
Ok(map)
326325
}
327326

328-
/// Load direct children stats scoped to a set of directory IDs.
327+
/// Load direct children stats scoped to a subtree via recursive CTE.
329328
///
330329
/// Returns a map: `parent_id -> (total_file_size, file_count, dir_count)`.
331-
/// Only includes results where `parent_id` is in the provided set.
330+
/// Only includes entries whose parent is within the subtree rooted at `root_id`.
332331
fn scoped_get_children_stats_by_id(
333332
conn: &Connection,
334-
dir_ids: &std::collections::HashSet<i64>,
333+
root_id: i64,
335334
) -> Result<HashMap<i64, (u64, u64, u64)>, IndexStoreError> {
336-
// Use bulk query and filter in memory (more efficient than N individual queries)
337-
let all_stats = bulk_get_children_stats_by_id(conn)?;
338-
Ok(all_stats
339-
.into_iter()
340-
.filter(|(parent_id, _)| dir_ids.contains(parent_id))
341-
.collect())
335+
let mut stmt = conn.prepare(
336+
"WITH RECURSIVE subtree(id) AS (
337+
SELECT id FROM entries WHERE id = ?1
338+
UNION ALL
339+
SELECT e.id FROM entries e JOIN subtree s ON e.parent_id = s.id
340+
)
341+
SELECT e.parent_id,
342+
COALESCE(SUM(CASE WHEN e.is_directory = 0 THEN e.size ELSE 0 END), 0),
343+
COALESCE(SUM(CASE WHEN e.is_directory = 0 THEN 1 ELSE 0 END), 0),
344+
COALESCE(SUM(CASE WHEN e.is_directory = 1 THEN 1 ELSE 0 END), 0)
345+
FROM entries e
346+
WHERE e.parent_id IN (SELECT id FROM subtree)
347+
GROUP BY e.parent_id",
348+
)?;
349+
let rows = stmt.query_map(params![root_id], |row| {
350+
Ok((
351+
row.get::<_, i64>(0)?,
352+
row.get::<_, u64>(1)?,
353+
row.get::<_, u64>(2)?,
354+
row.get::<_, u64>(3)?,
355+
))
356+
})?;
357+
let mut map = HashMap::new();
358+
for row in rows {
359+
let (parent_id, size, files, dirs) = row?;
360+
map.insert(parent_id, (size, files, dirs));
361+
}
362+
Ok(map)
342363
}
343364

344-
/// Load child directory IDs scoped to a set of parent directory IDs.
365+
/// Load child directory IDs scoped to a subtree via recursive CTE.
345366
///
346367
/// Returns a map: `parent_id -> Vec<child_dir_id>`.
347-
/// Only includes results where `parent_id` is in the provided set.
368+
/// Only includes entries whose parent is within the subtree rooted at `root_id`.
348369
fn scoped_get_child_dir_ids(
349370
conn: &Connection,
350-
dir_ids: &std::collections::HashSet<i64>,
371+
root_id: i64,
351372
) -> Result<HashMap<i64, Vec<i64>>, IndexStoreError> {
352-
let all_children = bulk_get_child_dir_ids(conn)?;
353-
Ok(all_children
354-
.into_iter()
355-
.filter(|(parent_id, _)| dir_ids.contains(parent_id))
356-
.collect())
373+
let mut stmt = conn.prepare(
374+
"WITH RECURSIVE subtree(id) AS (
375+
SELECT id FROM entries WHERE id = ?1
376+
UNION ALL
377+
SELECT e.id FROM entries e JOIN subtree s ON e.parent_id = s.id
378+
)
379+
SELECT e.parent_id, e.id FROM entries e
380+
WHERE e.parent_id IN (SELECT id FROM subtree) AND e.is_directory = 1",
381+
)?;
382+
let rows = stmt.query_map(params![root_id], |row| {
383+
Ok((row.get::<_, i64>(0)?, row.get::<_, i64>(1)?))
384+
})?;
385+
let mut map: HashMap<i64, Vec<i64>> = HashMap::new();
386+
for row in rows {
387+
let (parent_id, child_id) = row?;
388+
map.entry(parent_id).or_default().push(child_id);
389+
}
390+
Ok(map)
357391
}
358392

359393
// ── Tests ────────────────────────────────────────────────────────────

apps/desktop/src-tauri/src/indexing/mod.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1467,6 +1467,9 @@ fn verify_affected_dirs(affected_paths: &std::collections::HashSet<String>, writ
14671467
});
14681468

14691469
if is_dir {
1470+
log::debug!(
1471+
"verify_affected_dirs: new dir on disk: {normalized} (parent_id={parent_id})"
1472+
);
14701473
new_dir_paths.push(normalized);
14711474
} else if let Some(sz) = size {
14721475
// UpsertEntryV2 inserts the entry; propagate its size delta up the

apps/desktop/src-tauri/src/indexing/scanner.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,17 @@ fn run_scan(
274274
ScanContext::new(&conn, root, is_volume_root).map_err(|e| ScanError::WriterSend(e.to_string()))?
275275
};
276276

277+
// For subtree rescans, delete existing descendants first to prevent orphaned entries.
278+
// The scan will re-insert fresh children with correct parent-child relationships.
279+
// The root entry itself is preserved (ScanContext resolved its existing ID).
280+
if !is_volume_root
281+
&& let Some(&root_id) = scan_ctx.dir_ids.get(root)
282+
{
283+
writer
284+
.send(WriteMessage::DeleteDescendantsById(root_id))
285+
.map_err(|e| ScanError::WriterSend(e.to_string()))?;
286+
}
287+
277288
let walker = build_walker(root, num_threads, is_volume_root);
278289

279290
for entry_result in walker {

apps/desktop/src-tauri/src/indexing/store.rs

Lines changed: 59 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
use rusqlite::{Connection, OptionalExtension, params};
2020
use std::path::{Path, PathBuf};
2121

22-
const SCHEMA_VERSION: &str = "2";
22+
const SCHEMA_VERSION: &str = "3";
2323

2424
/// Root entry sentinel ID. All top-level entries have `parent_id = ROOT_ID`.
2525
pub const ROOT_ID: i64 = 1;
@@ -98,9 +98,35 @@ impl ScanContext {
9898
let root_id = if is_volume_root {
9999
ROOT_ID
100100
} else {
101-
match resolve_path(conn, &root.to_string_lossy())? {
101+
let root_str = root.to_string_lossy();
102+
match resolve_path(conn, &root_str)? {
102103
Some(id) => id,
103104
None => {
105+
// Diagnose which component is missing by walking the path
106+
let stripped = root_str.strip_prefix('/').unwrap_or(&root_str);
107+
let mut current_id = ROOT_ID;
108+
for component in stripped.split('/') {
109+
if component.is_empty() {
110+
continue;
111+
}
112+
match IndexStore::resolve_component(conn, current_id, component) {
113+
Ok(Some(id)) => current_id = id,
114+
Ok(None) => {
115+
log::debug!(
116+
"ScanContext::new: resolve_path({root_str}) failed at \
117+
component \"{component}\" (parent_id={current_id})"
118+
);
119+
break;
120+
}
121+
Err(e) => {
122+
log::debug!(
123+
"ScanContext::new: resolve_path({root_str}) errored at \
124+
component \"{component}\" (parent_id={current_id}): {e}"
125+
);
126+
break;
127+
}
128+
}
129+
}
104130
return Err(IndexStoreError::Sqlite(rusqlite::Error::QueryReturnedNoRows));
105131
}
106132
}
@@ -821,6 +847,33 @@ impl IndexStore {
821847
Ok(())
822848
}
823849

850+
/// Delete all descendants of an entry (but not the entry itself) using recursive CTE.
851+
///
852+
/// Used before subtree rescans to prevent orphaned entries. The root entry is kept
853+
/// because the scanner's `ScanContext` resolves it by path and uses its existing ID.
854+
pub fn delete_descendants_by_id(conn: &Connection, root_id: i64) -> Result<(), IndexStoreError> {
855+
// Collect descendant IDs (excluding root) then delete dir_stats and entries
856+
conn.execute(
857+
"WITH RECURSIVE descendants(id) AS (
858+
SELECT id FROM entries WHERE parent_id = ?1
859+
UNION ALL
860+
SELECT e.id FROM entries e JOIN descendants d ON e.parent_id = d.id
861+
)
862+
DELETE FROM dir_stats WHERE entry_id IN (SELECT id FROM descendants)",
863+
params![root_id],
864+
)?;
865+
conn.execute(
866+
"WITH RECURSIVE descendants(id) AS (
867+
SELECT id FROM entries WHERE parent_id = ?1
868+
UNION ALL
869+
SELECT e.id FROM entries e JOIN descendants d ON e.parent_id = d.id
870+
)
871+
DELETE FROM entries WHERE id IN (SELECT id FROM descendants)",
872+
params![root_id],
873+
)?;
874+
Ok(())
875+
}
876+
824877
/// Delete an entire subtree by root entry ID using recursive CTE.
825878
///
826879
/// No internal transaction: safe to call inside an outer `BEGIN IMMEDIATE`.
@@ -952,7 +1005,7 @@ mod tests {
9521005
fn schema_creation_and_version() {
9531006
let (store, _dir) = open_temp_store();
9541007
let status = store.get_index_status().unwrap();
955-
assert_eq!(status.schema_version.as_deref(), Some("2"));
1008+
assert_eq!(status.schema_version.as_deref(), Some(SCHEMA_VERSION));
9561009
}
9571010

9581011
#[test]
@@ -1120,7 +1173,7 @@ mod tests {
11201173

11211174
// Schema version should be re-stamped
11221175
let version = IndexStore::get_meta(&write_conn, "schema_version").unwrap();
1123-
assert_eq!(version.as_deref(), Some("2"));
1176+
assert_eq!(version.as_deref(), Some(SCHEMA_VERSION));
11241177

11251178
// Entries should be gone (except root sentinel)
11261179
let children = store.list_children(ROOT_ID).unwrap();
@@ -1142,7 +1195,7 @@ mod tests {
11421195
// Re-open: should detect mismatch and reset
11431196
let store = IndexStore::open(&db_path).unwrap();
11441197
let status = store.get_index_status().unwrap();
1145-
assert_eq!(status.schema_version.as_deref(), Some("2"));
1198+
assert_eq!(status.schema_version.as_deref(), Some(SCHEMA_VERSION));
11461199
}
11471200

11481201
#[test]
@@ -1156,7 +1209,7 @@ mod tests {
11561209
// open() should recover by deleting and recreating
11571210
let store = IndexStore::open(&db_path).unwrap();
11581211
let status = store.get_index_status().unwrap();
1159-
assert_eq!(status.schema_version.as_deref(), Some("2"));
1212+
assert_eq!(status.schema_version.as_deref(), Some(SCHEMA_VERSION));
11601213
}
11611214

11621215
#[test]

apps/desktop/src-tauri/src/indexing/writer.rs

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,9 @@ pub enum WriteMessage {
3535
DeleteEntryById(i64),
3636
/// Watcher: delete a subtree (directory removed with all children) by entry ID.
3737
DeleteSubtreeById(i64),
38+
/// Scanner: delete all descendants of an entry before a subtree rescan.
39+
/// Prevents orphaned entries when re-scanning an already-indexed subtree.
40+
DeleteDescendantsById(i64),
3841
/// Watcher: incremental delta propagation walking the parent_id chain.
3942
PropagateDeltaById {
4043
entry_id: i64,
@@ -182,7 +185,9 @@ impl WriterStats {
182185
WriteMessage::InsertEntriesV2(_) => self.current.insert_entries += 1,
183186
WriteMessage::UpsertEntryV2 { .. } => self.current.upsert_entry += 1,
184187
WriteMessage::DeleteEntryById(_) => self.current.delete_entry += 1,
185-
WriteMessage::DeleteSubtreeById(_) => self.current.delete_subtree += 1,
188+
WriteMessage::DeleteSubtreeById(_) | WriteMessage::DeleteDescendantsById(_) => {
189+
self.current.delete_subtree += 1;
190+
}
186191
WriteMessage::PropagateDeltaById { .. } => self.current.propagate_delta += 1,
187192
WriteMessage::ComputeAllAggregates | WriteMessage::ComputeSubtreeAggregates { .. } => {
188193
self.current.compute_aggregates += 1;
@@ -307,10 +312,17 @@ fn process_message(conn: &rusqlite::Connection, msg: WriteMessage, stats: &Write
307312
}
308313
}
309314
Ok(None) => {
310-
if let Err(e) =
311-
IndexStore::insert_entry_v2(conn, parent_id, &name, is_directory, is_symlink, size, modified_at)
312-
{
313-
log::warn!("Index writer: insert_entry_v2 failed for {name}: {e}");
315+
match IndexStore::insert_entry_v2(
316+
conn, parent_id, &name, is_directory, is_symlink, size, modified_at,
317+
) {
318+
Ok(new_id) => {
319+
log::debug!(
320+
"Writer: UpsertEntryV2 inserted \"{name}\" (parent_id={parent_id}) → id={new_id}"
321+
);
322+
}
323+
Err(e) => {
324+
log::warn!("Index writer: insert_entry_v2 failed for {name}: {e}");
325+
}
314326
}
315327
}
316328
Err(e) => {
@@ -349,6 +361,13 @@ fn process_message(conn: &rusqlite::Connection, msg: WriteMessage, stats: &Write
349361
propagate_delta_by_id(conn, pid, size_delta, file_delta, dir_delta);
350362
}
351363
}
364+
WriteMessage::DeleteDescendantsById(root_id) => {
365+
// No delta propagation: the subtree will be immediately re-scanned and
366+
// ComputeSubtreeAggregates will recompute stats for the subtree root.
367+
if let Err(e) = IndexStore::delete_descendants_by_id(conn, root_id) {
368+
log::warn!("Index writer: delete_descendants_by_id failed for id={root_id}: {e}");
369+
}
370+
}
352371
WriteMessage::PropagateDeltaById {
353372
entry_id,
354373
size_delta,

0 commit comments

Comments
 (0)