Skip to content

Commit 924d078

Browse files
jphein and claude authored
feat(mine): queue requests during repair-rebuild + drain after (#4)
Mirrors the existing /silent-save queue pattern for /mine. Closes a gap JP noticed: when the daemon is in repair-mode rebuild, hook fires that POST /mine fail outright (the rebuild replaces the collection mid-flight; running a concurrent mine subprocess would race the swap). The /silent-save queue covered diary writes; /mine had no equivalent, so transcript-ingest requests during a rebuild window were lost. Adds three pieces, all parallel to the silent-save infrastructure: * `_pending_mines_path()` — separate jsonl queue file (next to the silent-save pending file). * `_enqueue_pending_mine(payload)` — appends a /mine request body to the queue, off-loop via asyncio.to_thread. * `_drain_pending_mines()` — replays queued mines after rebuild via the same subprocess pattern the live /mine endpoint uses, gated by `_mine_sem`. Same rename-then-read pattern as `_drain_pending_writes` so concurrent /mine POSTs landing during the drain go to a fresh queue file. Dedup by (dir, wing, mode) before replay — a storm of hook fires queues the same target many times, but one mine catches up all of them via convo_miner's mtime-based dedup, so we don't need to run it N times. The /mine endpoint checks `_repair_state` and queues if in rebuild mode, returning `{"queued": true, "reason": "repair-in-progress"}` to signal the caller. After the rebuild completes, `_drain_pending_mines()` runs alongside `_drain_pending_writes()`. Also extends `/repair/status` to surface `pending_mines` count alongside `pending_writes`. Tests: 5 new unittest cases — pending-mines path is distinct from writes, enqueue→drain replays each target, drain dedups repeated targets (hook-storm scenario), failed replays quarantine to .failed-* files instead of getting lost, empty queue returns 0. NOT a fix for the HNSW corruption that prompted the current incident — that came from concurrent update_drawer calls hitting chromadb's HNSW concurrency hazards (CLAUDE.md row 15). 
The corruption-side fix is `PALACE_MAX_WRITE_CONCURRENCY=1` in the daemon env. This PR covers a separate failure mode: hook writes during repair windows. Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent dedface commit 924d078

2 files changed

Lines changed: 282 additions & 9 deletions

File tree

main.py

Lines changed: 148 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,108 @@ def _append():
320320
await asyncio.to_thread(_append)
321321

322322

323+
def _pending_mines_path() -> str:
    """Return the path of the jsonl queue file holding /mine requests that
    arrived while a rebuild was in progress.

    Kept distinct from the silent-save queue: queued mines are replayed as
    fire-and-forget subprocess invocations (the same pattern the live /mine
    endpoint uses), not through _do_silent_save_write.
    """
    # Queue lives next to the palace directory, i.e. in its parent.
    base_dir = os.path.dirname(_mp._config.palace_path.rstrip("/"))
    if not base_dir:
        # Degenerate palace_path (no parent component) — fall back to $HOME.
        base_dir = os.path.expanduser("~")
    return os.path.join(base_dir, "palace-daemon-pending-mines.jsonl")
333+
334+
335+
async def _enqueue_pending_mine(payload: dict) -> None:
    """Record a /mine request body in the pending-mines jsonl queue.

    The file append runs in a worker thread via asyncio.to_thread so the
    event loop never blocks on disk I/O.
    """
    queue_file = _pending_mines_path()
    record = {"payload": payload, "enqueued_at": datetime.now().isoformat()}
    serialized = json.dumps(record)

    def _write_record() -> None:
        # Append-only jsonl: one request per line.
        with open(queue_file, "a", encoding="utf-8") as fh:
            fh.write(serialized + "\n")

    await asyncio.to_thread(_write_record)
345+
346+
347+
async def _drain_pending_mines() -> int:
    """Replay queued /mine requests after a rebuild completes.

    Same rename-then-read pattern as _drain_pending_writes — the live queue
    file is renamed to a ``.processing`` sibling first, so concurrent /mine
    POSTs landing during the drain go to a fresh pending file instead of the
    one being consumed. Each surviving entry is replayed by spawning the
    same ``mempalace mine`` subprocess the live /mine endpoint would, gated
    by _mine_sem (presumably a semaphore bounding concurrent mine
    subprocesses, mirroring the live endpoint — confirm). Entries that fail
    to parse or whose replay fails are quarantined to a timestamped
    ``.failed-*`` file rather than lost.

    Returns:
        Number of entries successfully replayed (after dedup, so at most
        one per unique (dir, wing, mode) target).
    """
    path = _pending_mines_path()
    if not os.path.isfile(path):
        # Nothing was queued during the rebuild.
        return 0
    proc_path = path + ".processing"
    try:
        # Claim the queue by renaming it; an OSError (e.g. another drain
        # already took it) means there is nothing for us to do.
        os.rename(path, proc_path)
    except OSError:
        return 0
    count = 0
    # Raw jsonl lines (newline-terminated) destined for the quarantine file.
    failed_lines: list[str] = []
    try:
        with open(proc_path, encoding="utf-8") as f:
            lines = [ln for ln in f.readlines() if ln.strip()]
        # Dedup queued mines by (dir, wing, mode) — re-mining the same dir
        # is the goal, not running it N times. A storm of hook fires during
        # rebuild may have queued the same target dozens of times; one
        # successful drain replay covers them all.
        seen: set = set()
        unique_entries: list = []
        for line in reversed(lines):  # keep newest of each (dir, wing, mode)
            try:
                entry = json.loads(line)
                payload = entry.get("payload", {})
                key = (payload.get("dir"), payload.get("wing"), payload.get("mode", "convos"))
                if key in seen:
                    continue
                seen.add(key)
                unique_entries.append((line, entry))
            except json.JSONDecodeError:
                # Corrupt line — quarantine rather than silently drop.
                failed_lines.append(line)
        # Replay in original order (the reversed scan above collected
        # newest-first, so flip back).
        unique_entries.reverse()
        for line, entry in unique_entries:
            try:
                payload = entry["payload"]
                # "dir" was stored untranslated so the replay re-translates
                # against the current client-path mapping.
                directory = _translate_client_path(payload["dir"])
                if not Path(directory).is_dir():
                    # Target vanished since enqueue; skip without quarantining.
                    _log.warning("drain-mine: skipping %s — not a directory", directory)
                    continue
                wing = payload.get("wing", "general")
                mode = payload.get("mode", "convos")
                # Same binary the live /mine endpoint shells out to.
                mempalace_bin = os.path.join(os.path.dirname(sys.executable), "mempalace")
                # NOTE(review): queued payloads carry "extract" and "limit"
                # fields, but they are not forwarded here — confirm whether
                # dropping them on replay is intentional.
                cmd = [mempalace_bin, "mine", directory, "--mode", mode, "--wing", wing]
                async with _mine_sem:
                    proc = await asyncio.create_subprocess_exec(
                        *cmd,
                        stdout=asyncio.subprocess.PIPE,
                        stderr=asyncio.subprocess.PIPE,
                    )
                    stdout, stderr = await proc.communicate()
                if proc.returncode == 0:
                    count += 1
                else:
                    _log.warning("drain-mine: replay returned %s for %s", proc.returncode, directory)
                    failed_lines.append(line)
            except Exception:
                # Keep draining the remaining entries; quarantine this one.
                _log.exception("drain-mine: entry replay raised")
                failed_lines.append(line)
        if failed_lines:
            # Quarantine instead of re-queueing so a permanently broken
            # entry is not retried on every future drain.
            qpath = proc_path + ".failed-" + datetime.now().strftime("%Y%m%d%H%M%S")
            with open(qpath, "w", encoding="utf-8") as f:
                f.writelines(failed_lines)
            _log.warning("drain-mine: %d entries quarantined at %s", len(failed_lines), qpath)
        os.remove(proc_path)
    except Exception:
        # NOTE(review): a .processing file left behind here is never picked
        # up by later drains (they look only at the live queue path), so it
        # needs manual cleanup — confirm this is intended.
        _log.exception("drain-mine: read failed; leaving %s in place", proc_path)
    return count
423+
424+
323425
async def _drain_pending_writes() -> int:
324426
"""Replay queued silent-saves after a rebuild completes.
325427
@@ -1164,6 +1266,32 @@ async def mine(request: Request, x_api_key: str | None = Header(default=None)):
11641266
except (TypeError, ValueError):
11651267
raise HTTPException(status_code=400, detail="'limit' must be an integer")
11661268

1269+
# During /repair mode=rebuild, queue the mine instead of executing it.
1270+
# Mirrors the /silent-save queue pattern — the rebuild replaces the
1271+
# collection mid-flight, so any concurrent mine subprocess would race
1272+
# the swap. After repair completes, _drain_pending_mines() replays
1273+
# queued mines through the same code path. Pass-through fields preserve
1274+
# extract/limit on replay.
1275+
if (
1276+
_repair_state["in_progress"]
1277+
and _repair_state.get("mode") == "rebuild"
1278+
):
1279+
await _enqueue_pending_mine({
1280+
"dir": body.get("dir"), # original (untranslated) path so replay translates fresh
1281+
"wing": wing,
1282+
"mode": mode,
1283+
"extract": extract,
1284+
"limit": limit,
1285+
})
1286+
return {
1287+
"queued": True,
1288+
"reason": "repair-in-progress",
1289+
"systemMessage": (
1290+
"Mine queued — palace is rebuilding. Will replay automatically "
1291+
"when repair completes."
1292+
),
1293+
}
1294+
11671295
mempalace_bin = os.path.join(os.path.dirname(sys.executable), "mempalace")
11681296
cmd = [mempalace_bin, "mine", directory, "--mode", mode, "--wing", wing]
11691297
if extract:
@@ -1381,37 +1509,48 @@ async def repair(request: Request, x_api_key: str | None = Header(default=None))
13811509
_repair_state["mode"] = None
13821510
_repair_state["started_at"] = None
13831511

1512+
drained_mines = 0
13841513
if mode == "rebuild":
13851514
drained = await _drain_pending_writes()
1515+
# Also replay any /mine requests queued during the rebuild. Mirrors
1516+
# _drain_pending_writes — same rename-then-read, dedup-by-target,
1517+
# subprocess re-execution.
1518+
drained_mines = await _drain_pending_mines()
13861519

13871520
duration = (datetime.now() - start).total_seconds()
13881521
_log.info(messages.repair_complete(mode, drained, duration))
13891522
return {
13901523
"mode": mode,
13911524
"result": result,
13921525
"drained": drained,
1526+
"drained_mines": drained_mines,
13931527
"duration_s": round(duration, 3),
13941528
"systemMessage": messages.repair_complete(mode, drained, duration),
13951529
}
13961530

13971531

13981532
@app.get("/repair/status")
13991533
async def repair_status():
1400-
"""Current repair state + pending-writes queue depth."""
1401-
queue_path = _pending_writes_path()
1402-
pending = 0
1403-
if os.path.isfile(queue_path):
1534+
"""Current repair state + pending-writes + pending-mines queue depths."""
1535+
def _count_lines(path: str) -> int:
1536+
if not os.path.isfile(path):
1537+
return 0
14041538
try:
1405-
with open(queue_path, encoding="utf-8") as f:
1406-
pending = sum(1 for ln in f if ln.strip())
1539+
with open(path, encoding="utf-8") as f:
1540+
return sum(1 for ln in f if ln.strip())
14071541
except OSError:
1408-
pending = -1
1542+
return -1
1543+
1544+
writes_path = _pending_writes_path()
1545+
mines_path = _pending_mines_path()
14091546
return {
14101547
"in_progress": _repair_state["in_progress"],
14111548
"mode": _repair_state["mode"],
14121549
"started_at": _repair_state["started_at"],
1413-
"pending_writes": pending,
1414-
"pending_writes_path": queue_path,
1550+
"pending_writes": _count_lines(writes_path),
1551+
"pending_writes_path": writes_path,
1552+
"pending_mines": _count_lines(mines_path),
1553+
"pending_mines_path": mines_path,
14151554
}
14161555

14171556

tests/test_mine_queue.py

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,134 @@
1+
"""Tests for /mine queue + drain during repair=rebuild.
2+
3+
Mirrors the silent-save queue contract: while a rebuild is in progress,
4+
/mine requests are appended to a jsonl file at ``_pending_mines_path()``
5+
and replayed by ``_drain_pending_mines()`` after the rebuild completes.
6+
7+
Run with::
8+
9+
python -m unittest tests.test_mine_queue -v
10+
11+
Pure-function and pure-IO tests; no live daemon required. The drain
12+
test stubs the chromadb subprocess invocation by monkey-patching
13+
``asyncio.create_subprocess_exec``.
14+
"""
15+
import asyncio
16+
import json
17+
import os
18+
import sys
19+
import tempfile
20+
import unittest
21+
from unittest.mock import patch, AsyncMock, MagicMock
22+
23+
_HERE = os.path.dirname(os.path.abspath(__file__))
24+
_ROOT = os.path.dirname(_HERE)
25+
if _ROOT not in sys.path:
26+
sys.path.insert(0, _ROOT)
27+
28+
import main # noqa: E402
29+
30+
31+
class TestPendingMinesPath(unittest.TestCase):
    """Both queue files share the silent-save file's parent directory but
    have distinct basenames, so a daemon-busy save and a daemon-busy mine
    can queue side by side without clobbering each other."""

    def test_path_is_separate_from_writes_path(self):
        writes_queue = main._pending_writes_path()
        mines_queue = main._pending_mines_path()
        # Same parent (both derive from _config.palace_path), different file.
        self.assertNotEqual(writes_queue, mines_queue)
        self.assertTrue(mines_queue.endswith("palace-daemon-pending-mines.jsonl"))
42+
43+
44+
class TestEnqueueAndDrain(unittest.IsolatedAsyncioTestCase):
    """End-to-end: enqueue a few payloads, then drain — verify dedup + replay.

    Each drain test stubs ``asyncio.create_subprocess_exec`` so no real
    mempalace subprocess is spawned; the stub's returncode drives the
    success/failure paths of ``_drain_pending_mines``.
    """

    async def asyncSetUp(self):
        # Redirect the queue file into a throwaway tmp dir so tests never
        # touch the real palace-daemon-pending-mines.jsonl.
        self.tmp = tempfile.TemporaryDirectory()
        # Point the queue file inside the tmp dir
        self._queue_path = os.path.join(self.tmp.name, "pending-mines.jsonl")
        self._patches = [
            patch.object(main, "_pending_mines_path", return_value=self._queue_path),
            # Skip path translation for tests — identity function.
            patch.object(main, "_translate_client_path", side_effect=lambda p: p),
        ]
        for p in self._patches:
            p.start()
        # Make every replayed dir "exist" so the drain doesn't skip entries.
        self._is_dir_patch = patch("pathlib.Path.is_dir", return_value=True)
        self._is_dir_patch.start()

    async def asyncTearDown(self):
        for p in self._patches:
            p.stop()
        self._is_dir_patch.stop()
        self.tmp.cleanup()

    async def test_enqueue_then_drain_replays_each_target(self):
        """Two distinct targets queued → both replayed, queue file removed."""
        await main._enqueue_pending_mine({"dir": "/a", "wing": "wa", "mode": "convos"})
        await main._enqueue_pending_mine({"dir": "/b", "wing": "wb", "mode": "convos"})
        self.assertTrue(os.path.isfile(self._queue_path))

        # Stub the subprocess: each call returns rc=0 (successful mine).
        async def _fake_subprocess(*args, **kwargs):
            proc = MagicMock()
            proc.communicate = AsyncMock(return_value=(b"", b""))
            proc.returncode = 0
            return proc

        with patch("asyncio.create_subprocess_exec", side_effect=_fake_subprocess) as spawn:
            count = await main._drain_pending_mines()
        self.assertEqual(count, 2)
        # Queue file is gone after a clean drain
        self.assertFalse(os.path.isfile(self._queue_path))
        # Subprocess was invoked twice (once per unique target)
        self.assertEqual(spawn.call_count, 2)

    async def test_drain_dedups_repeated_target(self):
        """A storm of hook fires queues the same (dir, wing, mode) many times.
        Drain replays once per unique target — a single mine catches up all
        the queued requests via convo_miner's mtime-based dedup anyway."""
        for _ in range(10):
            await main._enqueue_pending_mine({"dir": "/a", "wing": "wa", "mode": "convos"})

        async def _fake_subprocess(*args, **kwargs):
            proc = MagicMock()
            proc.communicate = AsyncMock(return_value=(b"", b""))
            proc.returncode = 0
            return proc

        with patch("asyncio.create_subprocess_exec", side_effect=_fake_subprocess) as spawn:
            count = await main._drain_pending_mines()
        # Ten queued entries collapse into one replay of the unique target.
        self.assertEqual(count, 1)
        self.assertEqual(spawn.call_count, 1)

    async def test_drain_quarantines_failed_replays(self):
        """A non-zero subprocess exit doesn't lose the queue entry — it
        moves to a timestamped .failed-* file so the next drain doesn't
        replay it again."""
        await main._enqueue_pending_mine({"dir": "/a", "wing": "wa", "mode": "convos"})

        # rc=1 with stderr output simulates a failing mine subprocess.
        async def _fake_subprocess(*args, **kwargs):
            proc = MagicMock()
            proc.communicate = AsyncMock(return_value=(b"", b"boom"))
            proc.returncode = 1
            return proc

        with patch("asyncio.create_subprocess_exec", side_effect=_fake_subprocess):
            count = await main._drain_pending_mines()
        self.assertEqual(count, 0)
        # Original queue file was removed; a .failed-* sibling exists
        self.assertFalse(os.path.isfile(self._queue_path))
        siblings = os.listdir(self.tmp.name)
        failed = [s for s in siblings if ".failed-" in s]
        self.assertEqual(len(failed), 1)

    async def test_drain_empty_queue_returns_zero(self):
        """No queue file → no work → return 0, no error."""
        count = await main._drain_pending_mines()
        self.assertEqual(count, 0)
131+
132+
133+
if __name__ == "__main__":
134+
unittest.main()

0 commit comments

Comments
 (0)