Skip to content

Commit 5cd14bd

Browse files
jphein and claude authored and committed
feat: concurrent mining with ThreadPoolExecutor + improved room routing
Mining: - Added _prepare_file() for thread-safe file processing (read/chunk/route) - mine() now supports --workers flag (default: min(8, cpu_count)) - Concurrent path: bulk mtime pre-fetch, parallel _prepare_file(), serialized ChromaDB writes in batches of 100. Sequential path unchanged (workers=1). Room routing: - Priority 1: exact folder match only (no substring) - Priority 2: exact filename match only - Content scan increased from 2KB to 5KB (full file if <10KB) - Keyword scoring uses word-boundary regex instead of substring count - Added 13 unit tests for detect_room covering all priority paths Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
1 parent fccb705 commit 5cd14bd

2 files changed

Lines changed: 290 additions & 48 deletions

File tree

mempalace/miner.py

Lines changed: 192 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
"""
99

1010
import os
11+
import re
1112
import sys
1213
import hashlib
1314
import fnmatch
@@ -17,7 +18,7 @@
1718

1819
import chromadb
1920

20-
from .palace import SKIP_DIRS, get_collection, file_already_mined
21+
from .palace import SKIP_DIRS, get_collection, file_already_mined, bulk_check_mined
2122

2223
READABLE_EXTENSIONS = {
2324
".txt",
@@ -279,34 +280,37 @@ def detect_room(filepath: Path, content: str, rooms: list, project_path: Path) -
279280
"""
280281
Route a file to the right room.
281282
Priority:
282-
1. Folder path matches a room name
283-
2. Filename matches a room name or keyword
284-
3. Content keyword scoring
283+
1. Folder path exactly matches a room name or keyword
284+
2. Filename exactly matches a room name or keyword
285+
3. Content keyword scoring (word-boundary matching)
285286
4. Fallback: "general"
286287
"""
287288
relative = str(filepath.relative_to(project_path)).lower()
288289
filename = filepath.stem.lower()
289-
content_lower = content[:2000].lower()
290+
# Use more content for keyword scoring: full file up to 10KB, else first 5KB
291+
scan_limit = len(content) if len(content) <= 10000 else 5000
292+
content_lower = content[:scan_limit].lower()
290293

291-
# Priority 1: folder path matches room name or keywords
294+
# Priority 1: folder path exactly matches room name or keywords
292295
path_parts = relative.replace("\\", "/").split("/")
293296
for part in path_parts[:-1]: # skip filename itself
294297
for room in rooms:
295298
candidates = [room["name"].lower()] + [k.lower() for k in room.get("keywords", [])]
296-
if any(part == c or c in part or part in c for c in candidates):
299+
if any(part == c for c in candidates):
297300
return room["name"]
298301

299-
# Priority 2: filename matches room name
302+
# Priority 2: filename exactly matches room name or keyword
300303
for room in rooms:
301-
if room["name"].lower() in filename or filename in room["name"].lower():
304+
candidates = [room["name"].lower()] + [k.lower() for k in room.get("keywords", [])]
305+
if any(filename == c for c in candidates):
302306
return room["name"]
303307

304-
# Priority 3: keyword scoring from room keywords + name
308+
# Priority 3: keyword scoring with word-boundary matching
305309
scores = defaultdict(int)
306310
for room in rooms:
307311
keywords = room.get("keywords", []) + [room["name"]]
308312
for kw in keywords:
309-
count = content_lower.count(kw.lower())
313+
count = len(re.findall(r'\b' + re.escape(kw.lower()) + r'\b', content_lower))
310314
scores[room["name"]] += count
311315

312316
if scores:
@@ -404,39 +408,36 @@ def add_drawer(
404408
# =============================================================================
405409

406410

407-
def process_file(
411+
def _prepare_file(
408412
filepath: Path,
409413
project_path: Path,
410-
collection,
411414
wing: str,
412415
rooms: list,
413416
agent: str,
414-
dry_run: bool,
415417
) -> tuple:
416-
"""Read, chunk, route, and file one file. Returns (drawer_count, room_name)."""
418+
"""Read, chunk, and route one file without writing to ChromaDB.
417419
418-
# Skip if already filed
420+
Returns (batch_docs, batch_ids, batch_metas, room) or (None, None, None, None)
421+
when the file should be skipped (unreadable, too small, etc.).
422+
This is the pure-computation half of process_file, safe for concurrent use.
423+
"""
419424
source_file = str(filepath)
420-
if not dry_run and file_already_mined(collection, source_file, check_mtime=True):
421-
return 0, None
422425

423426
try:
424427
content = filepath.read_text(encoding="utf-8", errors="replace")
425428
except OSError:
426-
return 0, None
429+
return None, None, None, None
427430

428431
content = content.strip()
429432
if len(content) < MIN_CHUNK_SIZE:
430-
return 0, None
433+
return None, None, None, None
431434

432435
room = detect_room(filepath, content, rooms, project_path)
433436
chunks = chunk_text(content, source_file)
434437

435-
if dry_run:
436-
print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
437-
return len(chunks), room
438+
if not chunks:
439+
return None, None, None, None
438440

439-
# Batch all chunks into a single upsert call per file
440441
batch_docs = []
441442
batch_ids = []
442443
batch_metas = []
@@ -461,12 +462,50 @@ def process_file(
461462
batch_ids.append(drawer_id)
462463
batch_metas.append(metadata)
463464

464-
if batch_docs:
465-
collection.upsert(
466-
documents=batch_docs,
467-
ids=batch_ids,
468-
metadatas=batch_metas,
469-
)
465+
return batch_docs, batch_ids, batch_metas, room
466+
467+
468+
def process_file(
    filepath: Path,
    project_path: Path,
    collection,
    wing: str,
    rooms: list,
    agent: str,
    dry_run: bool,
) -> tuple:
    """Read, chunk, route, and file one file. Returns (drawer_count, room_name).

    A file whose stored mtime still matches is skipped outright (unless
    dry_run). In dry-run mode nothing touches ChromaDB; the routing decision
    is only printed. Otherwise the pure computation is delegated to
    _prepare_file() and the result written in a single upsert.
    """
    source_file = str(filepath)

    # Already filed with an up-to-date mtime? Nothing to do.
    if not dry_run and file_already_mined(collection, source_file, check_mtime=True):
        return 0, None

    if dry_run:
        # Dry run still reads/chunks locally so the report mirrors a real run.
        try:
            content = filepath.read_text(encoding="utf-8", errors="replace")
        except OSError:
            return 0, None
        content = content.strip()
        if len(content) < MIN_CHUNK_SIZE:
            return 0, None
        room = detect_room(filepath, content, rooms, project_path)
        chunks = chunk_text(content, source_file)
        print(f" [DRY RUN] {filepath.name} → room:{room} ({len(chunks)} drawers)")
        return len(chunks), room

    # Real run: compute everything first, then do one batched write.
    batch_docs, batch_ids, batch_metas, room = _prepare_file(
        filepath, project_path, wing, rooms, agent
    )
    if batch_docs is None:
        # Unreadable, too small, or produced no chunks.
        return 0, None

    collection.upsert(documents=batch_docs, ids=batch_ids, metadatas=batch_metas)
    return len(batch_docs), room
472511

@@ -545,6 +584,26 @@ def scan_project(
545584
# =============================================================================
546585

547586

587+
def _is_already_mined(source_file: str, mined_map: dict) -> bool:
588+
"""Check if a file is already mined using the bulk-fetched mined_map.
589+
590+
Compares stored mtime against current file mtime, matching the logic
591+
in file_already_mined() but without per-file DB queries.
592+
"""
593+
stored_mtime = mined_map.get(source_file)
594+
if stored_mtime is None:
595+
return False
596+
try:
597+
current_mtime = os.path.getmtime(source_file)
598+
except OSError:
599+
return False
600+
return abs(stored_mtime - current_mtime) < 0.01
601+
602+
603+
# Maximum documents per ChromaDB upsert call
604+
_UPSERT_BATCH_SIZE = 100
605+
606+
548607
def mine(
549608
project_dir: str,
550609
palace_path: str,
@@ -554,8 +613,16 @@ def mine(
554613
dry_run: bool = False,
555614
respect_gitignore: bool = True,
556615
include_ignored: list = None,
616+
workers: int = 0,
557617
):
558-
"""Mine a project directory into the palace."""
618+
"""Mine a project directory into the palace.
619+
620+
When workers > 1, files are read/chunked/routed in parallel threads
621+
and then written to ChromaDB sequentially (the Python client is not
622+
thread-safe for concurrent writes to the same collection).
623+
"""
624+
import concurrent.futures
625+
import threading
559626

560627
project_path = Path(project_dir).expanduser().resolve()
561628
config = load_config(project_dir)
@@ -571,13 +638,18 @@ def mine(
571638
if limit > 0:
572639
files = files[:limit]
573640

641+
if workers <= 0:
642+
workers = min(8, os.cpu_count() or 4)
643+
574644
print(f"\n{'=' * 55}")
575645
print(" MemPalace Mine")
576646
print(f"{'=' * 55}")
577647
print(f" Wing: {wing}")
578648
print(f" Rooms: {', '.join(r['name'] for r in rooms)}")
579649
print(f" Files: {len(files)}")
580650
print(f" Palace: {palace_path}")
651+
if workers > 1:
652+
print(f" Workers: {workers}")
581653
if dry_run:
582654
print(" DRY RUN — nothing will be filed")
583655
if not respect_gitignore:
@@ -595,23 +667,96 @@ def mine(
595667
files_skipped = 0
596668
room_counts = defaultdict(int)
597669

598-
for i, filepath in enumerate(files, 1):
599-
drawers, room = process_file(
600-
filepath=filepath,
601-
project_path=project_path,
602-
collection=collection,
603-
wing=wing,
604-
rooms=rooms,
605-
agent=agent,
606-
dry_run=dry_run,
607-
)
608-
if drawers == 0 and not dry_run:
609-
files_skipped += 1
610-
else:
611-
total_drawers += drawers
670+
# --- Sequential path (workers=1 or dry_run) ---
671+
if workers <= 1 or dry_run:
672+
for i, filepath in enumerate(files, 1):
673+
drawers, room = process_file(
674+
filepath=filepath,
675+
project_path=project_path,
676+
collection=collection,
677+
wing=wing,
678+
rooms=rooms,
679+
agent=agent,
680+
dry_run=dry_run,
681+
)
682+
if drawers == 0 and not dry_run:
683+
files_skipped += 1
684+
else:
685+
total_drawers += drawers
686+
room_counts[room] += 1
687+
if not dry_run:
688+
print(f" \u2713 [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers}")
689+
else:
690+
# --- Concurrent path (workers > 1) ---
691+
692+
# Phase 0: bulk-fetch already-mined mtimes to skip files without
693+
# per-file DB queries.
694+
filepaths_str = [str(f) for f in files]
695+
mined_map = bulk_check_mined(collection, filepaths_str)
696+
697+
# Filter out already-mined files before spawning threads.
698+
files_to_process = []
699+
for filepath in files:
700+
if _is_already_mined(str(filepath), mined_map):
701+
files_skipped += 1
702+
else:
703+
files_to_process.append(filepath)
704+
705+
# Phase 1: parallel read/chunk/route
706+
counter_lock = threading.Lock()
707+
processed_count = 0
708+
709+
def prepare_one(filepath):
710+
return filepath, _prepare_file(filepath, project_path, wing, rooms, agent)
711+
712+
results = []
713+
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as pool:
714+
futures = {pool.submit(prepare_one, fp): fp for fp in files_to_process}
715+
for future in concurrent.futures.as_completed(futures):
716+
filepath, (batch_docs, batch_ids, batch_metas, room) = future.result()
717+
if batch_docs is None:
718+
with counter_lock:
719+
files_skipped += 1
720+
continue
721+
results.append((filepath, batch_docs, batch_ids, batch_metas, room))
722+
with counter_lock:
723+
processed_count += 1
724+
print(
725+
f" \u2713 [{processed_count:4}/{len(files_to_process)}] "
726+
f"{filepath.name[:50]:50} +{len(batch_docs)}"
727+
)
728+
729+
# Phase 2: sequential ChromaDB writes, batched across files
730+
pending_docs = []
731+
pending_ids = []
732+
pending_metas = []
733+
734+
for filepath, batch_docs, batch_ids, batch_metas, room in results:
735+
total_drawers += len(batch_docs)
612736
room_counts[room] += 1
613-
if not dry_run:
614-
print(f" ✓ [{i:4}/{len(files)}] {filepath.name[:50]:50} +{drawers}")
737+
738+
pending_docs.extend(batch_docs)
739+
pending_ids.extend(batch_ids)
740+
pending_metas.extend(batch_metas)
741+
742+
# Flush when batch is large enough
743+
if len(pending_docs) >= _UPSERT_BATCH_SIZE:
744+
collection.upsert(
745+
documents=pending_docs,
746+
ids=pending_ids,
747+
metadatas=pending_metas,
748+
)
749+
pending_docs = []
750+
pending_ids = []
751+
pending_metas = []
752+
753+
# Flush remainder
754+
if pending_docs:
755+
collection.upsert(
756+
documents=pending_docs,
757+
ids=pending_ids,
758+
metadatas=pending_metas,
759+
)
615760

616761
print(f"\n{'=' * 55}")
617762
print(" Done.")

0 commit comments

Comments (0)