Commit db28bf1

fix: paginate closet_llm col.get (#1073)
Mirror the pagination pattern that PR #851 landed in miner.py:status(). A single drawers_col.get(limit=total, ...) on palaces larger than SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) crashes inside chromadb. Fetch drawers in batch_size=5000 chunks, stepping offset until the collection is drained. by_source aggregation semantics are preserved exactly: grouping, the wing filter, and meta capture are all unchanged. Closes #1073. Related: #802, #850, #1016.
1 parent fdfaf01 · commit db28bf1

2 files changed: 198 additions & 11 deletions
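
In isolation, the pattern the commit message describes is a short drain loop. The sketch below is not code from this commit: it assumes only a chromadb-style collection exposing count() and get(limit=..., offset=..., include=...), and the iter_drawers helper name is hypothetical.

# Minimal sketch of the drain loop, under the assumptions above.
def iter_drawers(col, batch_size=5000):
    total = col.count()
    offset = 0
    while offset < total:
        batch = col.get(
            limit=batch_size,
            offset=offset,
            include=["documents", "metadatas"],
        )
        ids = batch["ids"]
        if not ids:  # collection shrank mid-scan; stop rather than spin
            break
        # Yield one (id, document, metadata) triple per drawer.
        yield from zip(ids, batch["documents"], batch["metadatas"])
        offset += len(ids)

Each query touches at most batch_size rows, which keeps chromadb comfortably under the 32766-variable bound however large the palace grows.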


mempalace/closet_llm.py

Lines changed: 22 additions & 11 deletions
@@ -221,17 +221,28 @@ def regenerate_closets(
         print("No drawers in palace.")
         return {"processed": 0}
 
-    all_data = drawers_col.get(limit=total, include=["documents", "metadatas"])
-    by_source = {}
-    for doc_id, doc, meta in zip(all_data["ids"], all_data["documents"], all_data["metadatas"]):
-        source = meta.get("source_file", "unknown")
-        w = meta.get("wing", "")
-        if wing and w != wing:
-            continue
-        if source not in by_source:
-            by_source[source] = {"drawer_ids": [], "content": [], "meta": meta}
-        by_source[source]["drawer_ids"].append(doc_id)
-        by_source[source]["content"].append(doc)
+    # Paginate the fetch — a single get(limit=total, ...) blows through
+    # SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) on large palaces and
+    # crashes inside chromadb (see #802, #850, #1073).
+    by_source: dict = {}
+    batch_size = 5000
+    offset = 0
+    while offset < total:
+        batch = drawers_col.get(limit=batch_size, offset=offset, include=["documents", "metadatas"])
+        ids = batch["ids"]
+        if not ids:
+            break
+        for doc_id, doc, meta in zip(ids, batch["documents"], batch["metadatas"]):
+            meta = meta or {}
+            source = meta.get("source_file", "unknown")
+            w = meta.get("wing", "")
+            if wing and w != wing:
+                continue
+            if source not in by_source:
+                by_source[source] = {"drawer_ids": [], "content": [], "meta": meta}
+            by_source[source]["drawer_ids"].append(doc_id)
+            by_source[source]["content"].append(doc)
+        offset += len(ids)
 
     sources = list(by_source.keys())
     if sample > 0:
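
For local verification, a hedged repro sketch: it is not part of the commit, assumes chromadb's PersistentClient API, and uses a throwaway path, a placeholder collection name, and explicit dummy embeddings (the latter only to avoid pulling in a default embedding model).

import chromadb

client = chromadb.PersistentClient(path="/tmp/palace-repro")
col = client.get_or_create_collection("drawers")

# Populate past SQLITE_MAX_VARIABLE_NUMBER (32766). Insert in chunks as
# well, to stay conservative on the write path.
for start in range(0, 40_000, 5000):
    chunk = list(range(start, start + 5000))
    col.add(
        ids=[f"d{i:05d}" for i in chunk],
        documents=["body"] * len(chunk),
        metadatas=[{"source_file": f"/src/file_{i % 3}.md"} for i in chunk],
        embeddings=[[0.0]] * len(chunk),  # dummy vectors; skips the embedder
    )

# col.get(limit=col.count()) is the one-shot shape that crashed before the
# fix; the paginated drain below mirrors the shape the diff above lands.
seen, offset = 0, 0
while offset < col.count():
    batch = col.get(limit=5000, offset=offset, include=["documents", "metadatas"])
    if not batch["ids"]:
        break
    seen += len(batch["ids"])
    offset += len(batch["ids"])
assert seen == 40_000

batch_size=5000 is a judgment call with generous headroom: even if a query expands several SQL variables per row, five thousand rows stays well below the 32766 cap.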

tests/test_closet_llm.py

Lines changed: 176 additions & 0 deletions
@@ -296,6 +296,182 @@ def fake_urlopen(req, timeout=None):
         assert meta.get("generated_by", "").startswith("llm:")
         assert meta.get("normalize_version") == NORMALIZE_VERSION
 
+    def test_regen_paginates_drawer_fetch(self, tmp_path):
+        """Regression for #1073: drawers_col.get must be paginated at
+        batch_size=5000. A single get(limit=total, ...) on a palace with
+        more than SQLite's SQLITE_MAX_VARIABLE_NUMBER (32766) drawers
+        blows up inside chromadb. Matches the miner.status pattern
+        introduced in #851 (see #802, #850, #1073)."""
+        from mempalace import closet_llm as closet_llm_mod
+
+        palace = str(tmp_path / "palace")
+
+        # Build a fake collection: 12_000 drawers across 3 source files,
+        # enough to force 3 batches of batch_size=5000 (5000 + 5000 + 2000).
+        n_drawers = 12_000
+        ids = [f"d{i:05d}" for i in range(n_drawers)]
+        docs = [f"doc body {i}" for i in range(n_drawers)]
+        metas = [
+            {
+                "wing": "w",
+                "room": "r",
+                "source_file": f"/src/file_{i % 3}.md",
+                "entities": "",
+            }
+            for i in range(n_drawers)
+        ]
+
+        get_calls: list = []
+
+        class FakeDrawersCol:
+            def count(self):
+                return n_drawers
+
+            def get(self, limit=None, offset=0, include=None, **kwargs):
+                get_calls.append({"limit": limit, "offset": offset, "include": include})
+                end = min(offset + (limit or n_drawers), n_drawers)
+                return {
+                    "ids": ids[offset:end],
+                    "documents": docs[offset:end],
+                    "metadatas": metas[offset:end],
+                }
+
+        class FakeClosetsCol:
+            """Accept the purge + upsert calls the success path makes."""
+
+            def get(self, *a, **kw):
+                return {"ids": [], "documents": [], "metadatas": []}
+
+            def delete(self, *a, **kw):
+                return None
+
+            def upsert(self, *a, **kw):
+                return None
+
+        fake_drawers = FakeDrawersCol()
+        fake_closets = FakeClosetsCol()
+
+        def fake_urlopen(req, timeout=None):
+            return _FakeResp(
+                {
+                    "choices": [
+                        {"message": {"content": '{"topics":["t1"],"quotes":[],"summary":""}'}}
+                    ],
+                    "usage": {"prompt_tokens": 1, "completion_tokens": 1},
+                }
+            )
+
+        cfg = LLMConfig(endpoint="http://local/v1", model="m")
+
+        with (
+            patch.object(closet_llm_mod, "get_collection", return_value=fake_drawers),
+            patch.object(closet_llm_mod, "get_closets_collection", return_value=fake_closets),
+            patch.object(closet_llm_mod, "purge_file_closets", return_value=None),
+            patch.object(closet_llm_mod, "upsert_closet_lines", return_value=None),
+            patch("urllib.request.urlopen", side_effect=fake_urlopen),
+        ):
+            result = regenerate_closets(palace, cfg=cfg, dry_run=True)
+
+        # Three paginated calls: (limit=5000, offset=0), (5000, 5000), (5000, 10000).
+        assert len(get_calls) == 3, f"expected 3 batched fetches, got {len(get_calls)}"
+        for call in get_calls:
+            assert (
+                call["limit"] == 5000
+            ), f"batch must be 5000 — got {call['limit']} (would risk SQLITE_MAX_VARIABLE_NUMBER)"
+            # include must still request both documents and metadatas
+            assert "documents" in call["include"]
+            assert "metadatas" in call["include"]
+        assert [c["offset"] for c in get_calls] == [0, 5000, 10_000]
+
+        # by_source aggregation must be preserved exactly across batches:
+        # 12_000 drawers, 3 source files → 4_000 drawers each.
+        # dry_run=True short-circuits LLM calls but still walks by_source.
+        assert result.get("processed", 0) == 0  # dry_run
+        # Verify no single call tried to pull more than batch_size.
+        assert max(c["limit"] for c in get_calls) <= 5000
+
+    def test_regen_by_source_aggregates_across_batches(self, tmp_path):
+        """Pagination must not change the by_source grouping — drawers for
+        the same source_file split across batches still land in one group."""
+        from mempalace import closet_llm as closet_llm_mod
+
+        palace = str(tmp_path / "palace")
+
+        # 7_500 drawers, alternating between two source files → forces
+        # splits across the 5000/2500 boundary. Each source ends up with
+        # 3_750 drawers after regrouping.
+        n_drawers = 7_500
+        ids = [f"d{i:05d}" for i in range(n_drawers)]
+        docs = [f"body-{i}" for i in range(n_drawers)]
+        metas = [
+            {
+                "wing": "w",
+                "room": "r",
+                "source_file": f"/src/file_{i % 2}.md",
+                "entities": "",
+            }
+            for i in range(n_drawers)
+        ]
+
+        captured_sources: dict = {}
+
+        class FakeDrawersCol:
+            def count(self):
+                return n_drawers
+
+            def get(self, limit=None, offset=0, include=None, **kwargs):
+                end = min(offset + (limit or n_drawers), n_drawers)
+                return {
+                    "ids": ids[offset:end],
+                    "documents": docs[offset:end],
+                    "metadatas": metas[offset:end],
+                }
+
+        class FakeClosetsCol:
+            def get(self, *a, **kw):
+                return {"ids": [], "documents": [], "metadatas": []}
+
+            def delete(self, *a, **kw):
+                return None
+
+            def upsert(self, *a, **kw):
+                return None
+
+        # Hook _call_llm to inspect what regenerate_closets aggregated
+        # per source before the HTTP boundary.
+        real_call_llm = closet_llm_mod._call_llm
+
+        def spying_call_llm(cfg, source_file, wing, room, content):
+            captured_sources[source_file] = content
+            return (
+                {"topics": ["t"], "quotes": [], "summary": ""},
+                {"prompt_tokens": 1, "completion_tokens": 1},
+            )
+
+        cfg = LLMConfig(endpoint="http://local/v1", model="m")
+
+        with (
+            patch.object(closet_llm_mod, "get_collection", return_value=FakeDrawersCol()),
+            patch.object(closet_llm_mod, "get_closets_collection", return_value=FakeClosetsCol()),
+            patch.object(closet_llm_mod, "purge_file_closets", return_value=None),
+            patch.object(closet_llm_mod, "upsert_closet_lines", return_value=None),
+            patch.object(closet_llm_mod, "_call_llm", side_effect=spying_call_llm),
+        ):
+            regenerate_closets(palace, cfg=cfg)
+
+        # Both sources survived the pagination boundary.
+        assert set(captured_sources.keys()) == {"/src/file_0.md", "/src/file_1.md"}
+        # Each source accumulated exactly 3_750 drawer bodies, concatenated
+        # with the "\n\n" separator the regenerate path uses.
+        for source, content in captured_sources.items():
+            assert content.count("\n\n") == 3_749, (
+                f"{source}: expected 3_750 chunks joined (3_749 separators), "
+                f"got {content.count(chr(10) + chr(10)) + 1}"
+            )
+
+        # Silence unused-var lint.
+        assert real_call_llm is not None
+
     def test_regen_uses_basename_not_split_slash(self, tmp_path, monkeypatch):
         """Regression: the old closet_id base used ``source.split('/')[-1]``
         which silently degrades on Windows paths (``C:\\proj\\a.md`` →
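
To run just the two regressions this diff adds, one option is to drive pytest from Python (assumes pytest is installed; the -k expression simply matches the new test names):

import sys

import pytest

# Select only the two tests added in this commit and run them quietly.
sys.exit(pytest.main([
    "tests/test_closet_llm.py",
    "-k", "paginates_drawer_fetch or aggregates_across_batches",
    "-q",
]))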
