Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 69 additions & 9 deletions mempalace/backends/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,19 @@
import logging
import os
import sqlite3
from contextlib import contextmanager
from typing import Any, Optional

import chromadb

try:
import fcntl as _fcntl

_HAS_FLOCK = True
except ImportError:
_fcntl = None # type: ignore[assignment]
_HAS_FLOCK = False # Windows — no cross-process flock available

from .base import (
BaseBackend,
BaseCollection,
Expand Down Expand Up @@ -176,14 +185,61 @@ def _as_list(v: Any) -> list:
return [v]


@contextmanager
def _palace_write_lock(palace_path: Optional[str]):
    """Hold an exclusive, cross-process lock while a ChromaDB write runs.

    Why this exists: several processes can have the same palace directory
    open at once — Claude Code starts one mcp_server.py per open terminal,
    and stop hooks launch extra short-lived writers (diary writes, mine
    subprocesses). Each of them owns an independent PersistentClient.
    ChromaDB does no inter-process write locking, so concurrent
    col.add/upsert/update/delete calls from N processes corrupt the HNSW
    segment, and the next read then SIGSEGVs in chromadb_rust_bindings.

    How it works: every write takes ``flock(LOCK_EX)`` on a ``.write.lock``
    file inside the palace directory, which serializes the writers. flock
    is released automatically when the owning process dies, so a crash in
    the middle of a write cannot deadlock later writers.

    When locking is skipped (the body simply runs unlocked):

    * On Windows, where ``fcntl`` cannot be imported. Windows users running
      multiple MCP server processes remain exposed to the underlying
      ChromaDB concurrency issue. palace-daemon, which provides proper
      asyncio semaphores, is the recommended path for multi-client setups
      on any platform.
    * When ``palace_path`` is None or empty, e.g. the adapter wraps a
      collection whose owning palace path isn't known (tests).
    """
    if not (_HAS_FLOCK and palace_path):
        # No flock support, or no directory to anchor the lock file in.
        yield
        return

    # Best effort only: if the directory cannot be created, the open()
    # below is what surfaces the real error to the caller.
    try:
        os.makedirs(palace_path, exist_ok=True)
    except OSError:
        pass

    # Append mode creates the lock file on first use and never truncates it.
    with open(os.path.join(palace_path, ".write.lock"), "a") as lock_file:
        lock_fd = lock_file.fileno()
        _fcntl.flock(lock_fd, _fcntl.LOCK_EX)  # blocks until we own the lock
        try:
            yield
        finally:
            # Unlock explicitly, before the file is closed on leaving `with`.
            _fcntl.flock(lock_fd, _fcntl.LOCK_UN)


class ChromaCollection(BaseCollection):
"""Thin adapter translating ChromaDB dict returns into typed results."""
"""Thin adapter translating ChromaDB dict returns into typed results.

Wraps all write methods (add/upsert/update/delete) in a cross-process
flock so concurrent MCP servers + mine subprocesses cannot corrupt the
HNSW segment by racing their writes.
"""

def __init__(self, collection):
def __init__(self, collection, palace_path: Optional[str] = None):
self._collection = collection
self._palace_path = palace_path

# ------------------------------------------------------------------
# Writes
# Writes (serialized via cross-process flock on palace dir)
# ------------------------------------------------------------------

def add(self, *, documents, ids, metadatas=None, embeddings=None):
Expand All @@ -192,15 +248,17 @@ def add(self, *, documents, ids, metadatas=None, embeddings=None):
kwargs["metadatas"] = metadatas
if embeddings is not None:
kwargs["embeddings"] = embeddings
self._collection.add(**kwargs)
with _palace_write_lock(self._palace_path):
self._collection.add(**kwargs)

def upsert(self, *, documents, ids, metadatas=None, embeddings=None):
kwargs: dict[str, Any] = {"documents": documents, "ids": ids}
if metadatas is not None:
kwargs["metadatas"] = metadatas
if embeddings is not None:
kwargs["embeddings"] = embeddings
self._collection.upsert(**kwargs)
with _palace_write_lock(self._palace_path):
self._collection.upsert(**kwargs)

def update(
self,
Expand All @@ -219,7 +277,8 @@ def update(
kwargs["metadatas"] = metadatas
if embeddings is not None:
kwargs["embeddings"] = embeddings
self._collection.update(**kwargs)
with _palace_write_lock(self._palace_path):
self._collection.update(**kwargs)

# ------------------------------------------------------------------
# Reads
Expand Down Expand Up @@ -363,7 +422,8 @@ def delete(self, *, ids=None, where=None):
kwargs["ids"] = ids
if where is not None:
kwargs["where"] = where
self._collection.delete(**kwargs)
with _palace_write_lock(self._palace_path):
self._collection.delete(**kwargs)

def count(self):
return self._collection.count()
Expand Down Expand Up @@ -538,7 +598,7 @@ def get_collection(
)
else:
collection = client.get_collection(collection_name)
return ChromaCollection(collection)
return ChromaCollection(collection, palace_path=palace_path)

def close_palace(self, palace) -> None:
"""Drop cached handles for ``palace``. Accepts ``PalaceRef`` or legacy path str."""
Expand Down Expand Up @@ -581,7 +641,7 @@ def create_collection(
collection = self._client(palace_path).create_collection(
collection_name, metadata={"hnsw:space": hnsw_space}
)
return ChromaCollection(collection)
return ChromaCollection(collection, palace_path=palace_path)


def _normalize_get_collection_args(args, kwargs):
Expand Down
8 changes: 6 additions & 2 deletions mempalace/mcp_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -220,12 +220,16 @@ def _get_collection(create=False):
_collection_cache = ChromaCollection(
client.get_or_create_collection(
_config.collection_name, metadata={"hnsw:space": "cosine"}
)
),
palace_path=_config.palace_path,
)
_metadata_cache = None
_metadata_cache_time = 0
elif _collection_cache is None:
_collection_cache = ChromaCollection(client.get_collection(_config.collection_name))
_collection_cache = ChromaCollection(
client.get_collection(_config.collection_name),
palace_path=_config.palace_path,
)
_metadata_cache = None
_metadata_cache_time = 0
return _collection_cache
Expand Down
Loading