diff --git a/mempalace/backends/chroma.py b/mempalace/backends/chroma.py
index 3a0d2c3f9..971fca20c 100644
--- a/mempalace/backends/chroma.py
+++ b/mempalace/backends/chroma.py
@@ -4,10 +4,19 @@
 import logging
 import os
 import sqlite3
+from contextlib import contextmanager
 from typing import Any, Optional
 
 import chromadb
 
+try:
+    import fcntl as _fcntl
+
+    _HAS_FLOCK = True
+except ImportError:
+    _fcntl = None  # type: ignore[assignment]
+    _HAS_FLOCK = False  # Windows — no cross-process flock available
+
 from .base import (
     BaseBackend,
     BaseCollection,
@@ -176,14 +185,61 @@ def _as_list(v: Any) -> list:
     return [v]
 
 
+@contextmanager
+def _palace_write_lock(palace_path: Optional[str]):
+    """Cross-process exclusive write lock for ChromaDB writes.
+
+    Claude Code spawns one mcp_server.py per open terminal; stop hooks spawn
+    additional short-lived writers (diary writes, mine subprocesses). All open
+    independent PersistentClient instances against the same palace directory.
+    ChromaDB has no inter-process write locking — concurrent col.add/upsert/
+    update/delete from N processes corrupts the HNSW segment, causing the next
+    read to SIGSEGV in chromadb_rust_bindings.
+
+    Serializing all writes with flock(LOCK_EX) on a lock file in the palace
+    directory prevents the corruption. flock auto-releases on process death —
+    a mid-write crash cannot deadlock future writers.
+
+    On Windows, fcntl is unavailable — yields without locking. Windows users
+    running multiple MCP server processes remain exposed to the underlying
+    ChromaDB concurrency issue. palace-daemon, which provides proper asyncio
+    semaphores, is the recommended path for multi-client setups on any
+    platform.
+
+    palace_path may be None when the adapter is wrapping a collection whose
+    owning palace path isn't known (e.g. tests); in that case locking is
+    skipped.
+    """
+    if not _HAS_FLOCK or not palace_path:
+        yield
+        return
+    try:
+        os.makedirs(palace_path, exist_ok=True)
+    except OSError:
+        pass
+    lock_path = os.path.join(palace_path, ".write.lock")
+    with open(lock_path, "a") as _lf:
+        _fcntl.flock(_lf.fileno(), _fcntl.LOCK_EX)
+        try:
+            yield
+        finally:
+            _fcntl.flock(_lf.fileno(), _fcntl.LOCK_UN)
+
+
 class ChromaCollection(BaseCollection):
-    """Thin adapter translating ChromaDB dict returns into typed results."""
+    """Thin adapter translating ChromaDB dict returns into typed results.
+
+    Wraps all write methods (add/upsert/update/delete) in a cross-process
+    flock so concurrent MCP servers + mine subprocesses cannot corrupt the
+    HNSW segment by racing their writes.
+    """
 
-    def __init__(self, collection):
+    def __init__(self, collection, palace_path: Optional[str] = None):
         self._collection = collection
+        self._palace_path = palace_path
 
     # ------------------------------------------------------------------
-    # Writes
+    # Writes (serialized via cross-process flock on palace dir)
     # ------------------------------------------------------------------
 
     def add(self, *, documents, ids, metadatas=None, embeddings=None):
@@ -192,7 +248,8 @@ def add(self, *, documents, ids, metadatas=None, embeddings=None):
             kwargs["metadatas"] = metadatas
         if embeddings is not None:
             kwargs["embeddings"] = embeddings
-        self._collection.add(**kwargs)
+        with _palace_write_lock(self._palace_path):
+            self._collection.add(**kwargs)
 
     def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
         kwargs: dict[str, Any] = {"documents": documents, "ids": ids}
@@ -200,7 +257,8 @@ def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
             kwargs["metadatas"] = metadatas
         if embeddings is not None:
             kwargs["embeddings"] = embeddings
-        self._collection.upsert(**kwargs)
+        with _palace_write_lock(self._palace_path):
+            self._collection.upsert(**kwargs)
 
     def update(
         self,
@@ -219,7 +277,8 @@ def update(
             kwargs["metadatas"] = metadatas
         if embeddings is not None:
             kwargs["embeddings"] = embeddings
-        self._collection.update(**kwargs)
+        with _palace_write_lock(self._palace_path):
+            self._collection.update(**kwargs)
 
     # ------------------------------------------------------------------
     # Reads
@@ -363,7 +422,8 @@ def delete(self, *, ids=None, where=None):
             kwargs["ids"] = ids
         if where is not None:
             kwargs["where"] = where
-        self._collection.delete(**kwargs)
+        with _palace_write_lock(self._palace_path):
+            self._collection.delete(**kwargs)
 
     def count(self):
         return self._collection.count()
@@ -538,7 +598,7 @@ def get_collection(
             )
         else:
             collection = client.get_collection(collection_name)
-        return ChromaCollection(collection)
+        return ChromaCollection(collection, palace_path=palace_path)
 
     def close_palace(self, palace) -> None:
         """Drop cached handles for ``palace``. Accepts ``PalaceRef`` or legacy path str."""
@@ -581,7 +641,7 @@ def create_collection(
         collection = self._client(palace_path).create_collection(
             collection_name, metadata={"hnsw:space": hnsw_space}
         )
-        return ChromaCollection(collection)
+        return ChromaCollection(collection, palace_path=palace_path)
 
 
 def _normalize_get_collection_args(args, kwargs):
diff --git a/mempalace/mcp_server.py b/mempalace/mcp_server.py
index 2650e3073..e504029d2 100644
--- a/mempalace/mcp_server.py
+++ b/mempalace/mcp_server.py
@@ -220,12 +220,16 @@ def _get_collection(create=False):
         _collection_cache = ChromaCollection(
             client.get_or_create_collection(
                 _config.collection_name, metadata={"hnsw:space": "cosine"}
-            )
+            ),
+            palace_path=_config.palace_path,
         )
         _metadata_cache = None
         _metadata_cache_time = 0
     elif _collection_cache is None:
-        _collection_cache = ChromaCollection(
+        _collection_cache = ChromaCollection(
+            client.get_collection(_config.collection_name),
+            palace_path=_config.palace_path,
+        )
         _metadata_cache = None
         _metadata_cache_time = 0
     return _collection_cache