Skip to content

Commit ef29e02

Browse files
jpheinclaude
andcommitted
fix: address Copilot review feedback on PRs #3 + #4
Seven findings, batched into one cleanup PR. PR #3 (file-watcher) findings: * watcher.py:30 — drop unused ``import time``. * watcher.py:336 — type ``_log_future_exception`` parameter as ``concurrent.futures.Future`` (NOT ``asyncio.Future``); that's what ``asyncio.run_coroutine_threadsafe`` returns. Catch its cancellation/state errors plus the asyncio variants so the callback can't itself crash on a concurrent cancellation. * main.py:560 — surface stderr/stdout when watcher mine returns non-zero. The rc alone hides 'No mempalace.yaml found' / python tracebacks operators need to diagnose. * main.py:1202 — ``GET /watch`` now double-checks ``watcher.is_running`` so a thread crash that flips is_running to False between startup and now is reflected in the endpoint response. Lifespan still gates publication; this adds defense in depth. PR #4 (mine-queue) findings: * main.py drain replay — re-apply optional ``extract`` and ``limit`` fields. The prior drain dropped them silently, so a queue entry with those fields got replayed without. Live /mine accepts them; replay must too. * main.py drain replay — enforce the same safety guards as the live /mine endpoint: ``isinstance(dir, str)``, absolute-path, no ``..`` traversal, ``mode in {convos, projects}``, ``extract in {exchange, general}`` if set, ``limit`` int-coercible if set. Without these a queue entry could smuggle through values the live endpoint would 400. Tests: +2 — replay-extract-and-limit (asserts the new fields make it onto the subprocess argv) and skips-invalid-payload-fields (5 invalid-payload variants, each correctly skipped pre-spawn). 35 daemon tests pass. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 924d078 commit ef29e02

3 files changed

Lines changed: 135 additions & 13 deletions

File tree

main.py

Lines changed: 66 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -385,17 +385,58 @@ async def _drain_pending_mines() -> int:
385385
failed_lines.append(line)
386386
# Replay in original order
387387
unique_entries.reverse()
388+
# Same valid-mode/extract sets as the live /mine endpoint —
389+
# apply them on replay too so a queue entry can't smuggle through
390+
# a value the live endpoint would reject (Copilot finding on
391+
# jphein/palace-daemon#4).
392+
VALID_MODES = {"convos", "projects"}
393+
VALID_EXTRACTS = {"exchange", "general"}
388394
for line, entry in unique_entries:
389395
try:
390396
payload = entry["payload"]
391-
directory = _translate_client_path(payload["dir"])
392-
if not Path(directory).is_dir():
397+
raw_dir = payload.get("dir")
398+
if not isinstance(raw_dir, str) or not raw_dir:
399+
_log.warning("drain-mine: skipping entry — invalid 'dir'")
400+
continue
401+
directory = _translate_client_path(raw_dir)
402+
dir_path = Path(directory)
403+
# Same path-shape gate as /mine (absolute + no traversal).
404+
if not dir_path.is_absolute() or ".." in dir_path.parts:
405+
_log.warning(
406+
"drain-mine: skipping %s — non-absolute or contains '..'", raw_dir
407+
)
408+
continue
409+
if not dir_path.is_dir():
393410
_log.warning("drain-mine: skipping %s — not a directory", directory)
394411
continue
395412
wing = payload.get("wing", "general")
396413
mode = payload.get("mode", "convos")
414+
if mode not in VALID_MODES:
415+
_log.warning("drain-mine: skipping %s — invalid mode %r", directory, mode)
416+
continue
417+
extract = payload.get("extract")
418+
if extract is not None and extract not in VALID_EXTRACTS:
419+
_log.warning(
420+
"drain-mine: skipping %s — invalid extract %r", directory, extract
421+
)
422+
continue
423+
limit = payload.get("limit")
424+
if limit is not None:
425+
try:
426+
limit = int(limit)
427+
except (TypeError, ValueError):
428+
_log.warning(
429+
"drain-mine: skipping %s — invalid limit %r", directory, limit
430+
)
431+
continue
397432
mempalace_bin = os.path.join(os.path.dirname(sys.executable), "mempalace")
398433
cmd = [mempalace_bin, "mine", directory, "--mode", mode, "--wing", wing]
434+
# Re-apply optional fields the original /mine accepted but
435+
# the prior drain dropped silently (Copilot finding on #4).
436+
if extract:
437+
cmd += ["--extract", extract]
438+
if limit:
439+
cmd += ["--limit", str(limit)]
399440
async with _mine_sem:
400441
proc = await asyncio.create_subprocess_exec(
401442
*cmd,
@@ -406,7 +447,12 @@ async def _drain_pending_mines() -> int:
406447
if proc.returncode == 0:
407448
count += 1
408449
else:
409-
_log.warning("drain-mine: replay returned %s for %s", proc.returncode, directory)
450+
_log.warning(
451+
"drain-mine: replay returned %s for %s\n stderr: %s",
452+
proc.returncode,
453+
directory,
454+
(stderr or b"").decode(errors="replace")[:300],
455+
)
410456
failed_lines.append(line)
411457
except Exception:
412458
_log.exception("drain-mine: entry replay raised")
@@ -659,7 +705,17 @@ async def _internal_mine(path: str, wing: str) -> None:
659705
)
660706
stdout, stderr = await proc.communicate()
661707
if proc.returncode != 0:
662-
logger.warning("watcher mine returned %s for %s", proc.returncode, path)
708+
# Surface the actual subprocess output — the rc alone hides
709+
# 'No mempalace.yaml found' / 'directory not readable' /
710+
# python tracebacks that operators need to diagnose.
711+
# Closes Copilot finding on jphein/palace-daemon#3.
712+
logger.warning(
713+
"watcher mine returned %s for %s\n stderr: %s\n stdout: %s",
714+
proc.returncode,
715+
path,
716+
(stderr or b"").decode(errors="replace")[:500],
717+
(stdout or b"").decode(errors="replace")[-500:],
718+
)
663719

664720
watcher = WatcherService(make_async_mine_fn(loop, _internal_mine))
665721
watcher.start(targets)
@@ -1325,7 +1381,12 @@ async def watch_list(x_api_key: str | None = Header(default=None)):
13251381
"""
13261382
_check_auth(x_api_key)
13271383
watcher = getattr(app.state, "watcher", None)
1328-
if watcher is None:
1384+
# Belt + suspenders: lifespan only publishes app.state.watcher when
1385+
# is_running, but check it again here so a thread crash that flips
1386+
# is_running to False between startup and now is reflected in the
1387+
# endpoint's running= field. Closes Copilot finding on
1388+
# jphein/palace-daemon#3.
1389+
if watcher is None or not getattr(watcher, "is_running", False):
13291390
return {"running": False, "targets": []}
13301391
return {"running": True, "targets": watcher.list_targets()}
13311392

tests/test_mine_queue.py

Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,62 @@ async def test_drain_empty_queue_returns_zero(self):
129129
count = await main._drain_pending_mines()
130130
self.assertEqual(count, 0)
131131

132+
async def test_drain_replays_extract_and_limit_options(self):
133+
"""Closes Copilot finding on jphein/palace-daemon#4 — drain
134+
previously dropped optional ``extract`` / ``limit`` fields,
135+
so a queue entry that included them got replayed without."""
136+
await main._enqueue_pending_mine({
137+
"dir": "/a", "wing": "wa", "mode": "convos",
138+
"extract": "exchange", "limit": 100,
139+
})
140+
141+
captured_argv = []
142+
143+
async def _fake_subprocess(*args, **kwargs):
144+
captured_argv.append(list(args))
145+
proc = MagicMock()
146+
proc.communicate = AsyncMock(return_value=(b"", b""))
147+
proc.returncode = 0
148+
return proc
149+
150+
with patch("asyncio.create_subprocess_exec", side_effect=_fake_subprocess):
151+
count = await main._drain_pending_mines()
152+
153+
self.assertEqual(count, 1)
154+
self.assertEqual(len(captured_argv), 1)
155+
argv = captured_argv[0]
156+
# extract and limit make it onto the replay command
157+
self.assertIn("--extract", argv)
158+
self.assertEqual(argv[argv.index("--extract") + 1], "exchange")
159+
self.assertIn("--limit", argv)
160+
self.assertEqual(argv[argv.index("--limit") + 1], "100")
161+
162+
async def test_drain_skips_invalid_payload_fields(self):
163+
"""Closes Copilot finding on jphein/palace-daemon#4 — drain
164+
previously skipped only is_dir() check; now also enforces
165+
the same valid-mode / valid-extract / int-limit / no-traversal
166+
guards as the live /mine endpoint."""
167+
for bad in (
168+
{"dir": "../../etc/passwd", "wing": "wa", "mode": "convos"}, # traversal
169+
{"dir": "/a", "wing": "wa", "mode": "wrong-mode"}, # invalid mode
170+
{"dir": "/a", "wing": "wa", "mode": "convos", "extract": "wrong"}, # invalid extract
171+
{"dir": "/a", "wing": "wa", "mode": "convos", "limit": "not-a-number"}, # invalid limit
172+
{"dir": None, "wing": "wa", "mode": "convos"}, # invalid dir type
173+
):
174+
await main._enqueue_pending_mine(bad)
175+
176+
async def _fake_subprocess(*args, **kwargs):
177+
proc = MagicMock()
178+
proc.communicate = AsyncMock(return_value=(b"", b""))
179+
proc.returncode = 0
180+
return proc
181+
182+
with patch("asyncio.create_subprocess_exec", side_effect=_fake_subprocess) as spawn:
183+
count = await main._drain_pending_mines()
184+
# All five entries skipped, no subprocess spawned, count = 0
185+
self.assertEqual(count, 0)
186+
self.assertEqual(spawn.call_count, 0)
187+
132188

133189
if __name__ == "__main__":
134190
unittest.main()

watcher.py

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,10 @@
2424
from __future__ import annotations
2525

2626
import asyncio
27+
import concurrent.futures
2728
import logging
2829
import os
2930
import threading
30-
import time
3131
from dataclasses import dataclass
3232
from pathlib import Path
3333
from typing import Callable
@@ -319,18 +319,23 @@ def list_targets(self) -> list[dict[str, str]]:
319319
return [{"path": str(t.path), "wing": t.wing} for t in self._targets]
320320

321321

322-
def _log_future_exception(future: "asyncio.Future") -> None:
322+
def _log_future_exception(future: "concurrent.futures.Future") -> None:
323323
"""Surface exceptions raised inside the scheduled mine coroutine.
324324
325-
`run_coroutine_threadsafe` returns a Future the caller must observe;
326-
if the coroutine raises and the Future is dropped, the exception is
327-
swallowed silently. Attaching this callback ensures watcher-driven
328-
mine failures show up in the daemon log instead of disappearing.
329-
Closes Copilot finding on jphein/palace-daemon#2.
325+
``asyncio.run_coroutine_threadsafe`` returns a
326+
``concurrent.futures.Future`` (NOT ``asyncio.Future``) — the
327+
callback receives the cross-thread variant. Catch its
328+
cancellation/state errors plus the asyncio variants so the
329+
callback can't itself crash on a concurrent cancellation.
330+
Closes Copilot finding on jphein/palace-daemon#3.
330331
"""
331332
try:
332333
exc = future.exception()
333-
except (asyncio.CancelledError, asyncio.InvalidStateError):
334+
except (
335+
concurrent.futures.CancelledError,
336+
concurrent.futures.InvalidStateError,
337+
asyncio.CancelledError,
338+
):
334339
return
335340
if exc is not None:
336341
_log.error("watcher-scheduled mine raised: %r", exc, exc_info=exc)

0 commit comments

Comments
 (0)