Skip to content

Commit 53675dd

Browse files
authored
Merge pull request #1160 from mvalentsev/fix/mcp-kg-lazy-per-path-cache
fix(mcp): lazy per-path KnowledgeGraph cache (#1136)
2 parents 7ede231 + 75ad8ae commit 53675dd

4 files changed

Lines changed: 319 additions & 25 deletions

File tree

mempalace/mcp_server.py

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@
4646
import json # noqa: E402
4747
import logging # noqa: E402
4848
import hashlib # noqa: E402
49+
import sqlite3 # noqa: E402
50+
import threading # noqa: E402
4951
import time # noqa: E402
5052
from datetime import date, datetime # noqa: E402
5153
from pathlib import Path # noqa: E402
@@ -79,7 +81,7 @@
7981
follow_tunnels,
8082
)
8183

82-
from .knowledge_graph import KnowledgeGraph # noqa: E402
84+
from .knowledge_graph import KnowledgeGraph, DEFAULT_KG_PATH # noqa: E402
8385

8486
logging.basicConfig(level=logging.INFO, format="%(message)s", stream=sys.stderr)
8587
logger = logging.getLogger("mempalace_mcp")
@@ -104,12 +106,61 @@ def _parse_args():
104106
os.environ["MEMPALACE_PALACE_PATH"] = os.path.abspath(_args.palace)
105107

106108
_config = MempalaceConfig()
107-
# Only override KG path when --palace is explicitly provided; otherwise use
108-
# KnowledgeGraph's default (~/.mempalace/knowledge_graph.sqlite3).
109-
if _args.palace:
110-
_kg = KnowledgeGraph(db_path=os.path.join(_config.palace_path, "knowledge_graph.sqlite3"))
111-
else:
112-
_kg = KnowledgeGraph()
109+
110+
_kg_by_path: dict[str, KnowledgeGraph] = {}
111+
_kg_cache_lock = threading.Lock()
112+
_palace_flag_given: bool = bool(_args.palace)
113+
114+
115+
def _resolve_kg_path() -> str:
116+
if _palace_flag_given:
117+
return os.path.join(_config.palace_path, "knowledge_graph.sqlite3")
118+
return DEFAULT_KG_PATH
119+
120+
121+
def _get_kg() -> KnowledgeGraph:
122+
path = os.path.abspath(_resolve_kg_path())
123+
kg = _kg_by_path.get(path)
124+
if kg is not None:
125+
return kg
126+
with _kg_cache_lock:
127+
kg = _kg_by_path.get(path)
128+
if kg is None:
129+
kg = KnowledgeGraph(db_path=path)
130+
_kg_by_path[path] = kg
131+
return kg
132+
133+
134+
def _call_kg(op):
135+
"""Run ``op(kg)`` against the cached KG with one-shot retry on close.
136+
137+
Race we're guarding against: a handler grabs ``kg = _get_kg()`` and is
138+
about to call ``kg.add_triple(...)`` when ``tool_reconnect`` fires on
139+
another thread, drains ``_kg_by_path``, and closes the underlying
140+
sqlite3.Connection. The handler's call then raises
141+
``sqlite3.ProgrammingError: Cannot operate on a closed database`` and
142+
bubbles up as a -32000 to the MCP client even though the user just
143+
asked for a reconnect.
144+
145+
Catch that single class of error, evict the stale entry from the
146+
cache (only if it still points at the closed instance — another
147+
thread may have already replaced it), and try once more with a fresh
148+
KG. Beyond one retry give up: a second close means we're losing a
149+
sustained race we won't win in this loop, and a hung loop is worse
150+
than a clear failure surface.
151+
"""
152+
for attempt in range(2):
153+
kg = _get_kg()
154+
try:
155+
return op(kg)
156+
except sqlite3.ProgrammingError:
157+
if attempt == 0:
158+
path = os.path.abspath(_resolve_kg_path())
159+
with _kg_cache_lock:
160+
if _kg_by_path.get(path) is kg:
161+
_kg_by_path.pop(path, None)
162+
continue
163+
raise
113164

114165

115166
_client_cache = None
@@ -1065,7 +1116,7 @@ def tool_kg_query(entity: str, as_of: str = None, direction: str = "both"):
10651116
return {"error": str(e)}
10661117
if direction not in ("outgoing", "incoming", "both"):
10671118
return {"error": "direction must be 'outgoing', 'incoming', or 'both'"}
1068-
results = _kg.query_entity(entity, as_of=as_of, direction=direction)
1119+
results = _call_kg(lambda kg: kg.query_entity(entity, as_of=as_of, direction=direction))
10691120
return {"entity": entity, "as_of": as_of, "facts": results, "count": len(results)}
10701121

10711122

@@ -1111,15 +1162,17 @@ def tool_kg_add(
11111162
"source_drawer_id": source_drawer_id,
11121163
},
11131164
)
1114-
triple_id = _kg.add_triple(
1115-
subject,
1116-
predicate,
1117-
object,
1118-
valid_from=valid_from,
1119-
valid_to=valid_to,
1120-
source_closet=source_closet,
1121-
source_file=source_file,
1122-
source_drawer_id=source_drawer_id,
1165+
triple_id = _call_kg(
1166+
lambda kg: kg.add_triple(
1167+
subject,
1168+
predicate,
1169+
object,
1170+
valid_from=valid_from,
1171+
valid_to=valid_to,
1172+
source_closet=source_closet,
1173+
source_file=source_file,
1174+
source_drawer_id=source_drawer_id,
1175+
)
11231176
)
11241177
return {"success": True, "triple_id": triple_id, "fact": f"{subject}{predicate}{object}"}
11251178

@@ -1151,7 +1204,7 @@ def tool_kg_invalidate(subject: str, predicate: str, object: str, ended: str = N
11511204
"ended": resolved_ended,
11521205
},
11531206
)
1154-
_kg.invalidate(subject, predicate, object, ended=resolved_ended)
1207+
_call_kg(lambda kg: kg.invalidate(subject, predicate, object, ended=resolved_ended))
11551208
return {
11561209
"success": True,
11571210
"fact": f"{subject}{predicate}{object}",
@@ -1166,13 +1219,13 @@ def tool_kg_timeline(entity: str = None):
11661219
entity = sanitize_kg_value(entity, "entity")
11671220
except ValueError as e:
11681221
return {"error": str(e)}
1169-
results = _kg.timeline(entity)
1222+
results = _call_kg(lambda kg: kg.timeline(entity))
11701223
return {"entity": entity or "all", "timeline": results, "count": len(results)}
11711224

11721225

11731226
def tool_kg_stats():
11741227
"""Knowledge graph overview: entities, triples, relationship types."""
1175-
return _kg.stats()
1228+
return _call_kg(lambda kg: kg.stats())
11761229

11771230

11781231
# ==================== AGENT DIARY ====================
@@ -1404,10 +1457,11 @@ def tool_memories_filed_away():
14041457

14051458

14061459
def tool_reconnect():
1407-
"""Force the MCP server to drop the cached ChromaDB collection and reconnect.
1460+
"""Force the MCP server to drop cached ChromaDB + KnowledgeGraph state.
14081461
14091462
Use after external scripts or CLI commands modify the palace database
1410-
directly, which can leave the in-memory HNSW index stale.
1463+
or replace ``knowledge_graph.sqlite3`` directly, which can leave the
1464+
in-memory HNSW index stale or pin a closed-on-disk SQLite connection.
14111465
"""
14121466
global \
14131467
_client_cache, \
@@ -1425,6 +1479,15 @@ def tool_reconnect():
14251479
# still applies after the reconnect.
14261480
_vector_disabled = False
14271481
_vector_disabled_reason = ""
1482+
# Drain the per-path KnowledgeGraph cache so a replaced sqlite file is
1483+
# reopened on the next tool call rather than served from a stale handle.
1484+
with _kg_cache_lock:
1485+
for kg in _kg_by_path.values():
1486+
try:
1487+
kg.close()
1488+
except Exception:
1489+
pass
1490+
_kg_by_path.clear()
14281491
try:
14291492
col = _get_collection()
14301493
if col is None:

tests/benchmarks/test_mcp_bench.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,9 @@ def _patch_mcp_config(monkeypatch, palace_path, tmp_path):
4040

4141
import mempalace.mcp_server as mcp_mod
4242

43+
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
4344
monkeypatch.setattr(mcp_mod, "_config", cfg)
44-
monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")))
45+
monkeypatch.setattr(mcp_mod, "_get_kg", lambda: kg)
4546

4647

4748
def _get_rss_mb():

tests/benchmarks/test_memory_profile.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,8 +84,9 @@ def test_tool_status_repeated_calls(self, tmp_path, monkeypatch):
8484

8585
cfg = MempalaceConfig(config_dir=str(tmp_path / "cfg"))
8686
monkeypatch.setattr(cfg, "_file_config", {"palace_path": palace_path})
87+
kg = KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3"))
8788
monkeypatch.setattr(mcp_mod, "_config", cfg)
88-
monkeypatch.setattr(mcp_mod, "_kg", KnowledgeGraph(db_path=str(tmp_path / "kg.sqlite3")))
89+
monkeypatch.setattr(mcp_mod, "_get_kg", lambda: kg)
8990

9091
from mempalace.mcp_server import tool_status
9192

0 commit comments

Comments
 (0)