Skip to content

Commit 94dfbb6

Browse files
jpheinclaude
andcommitted
feat(watcher): file-watcher service for auto-mining on file change
Closes the "automining when files are added or updated" half of JP's ask. The other half (CLI add/remove of mined data) ships in jphein/mempalace#4 as `mempalace mined` + `mempalace purge --source-file`. Adds a watchdog-based file watcher that runs inside the daemon's lifespan. Configured via `PALACE_WATCH_DIRS` env var: PALACE_WATCH_DIRS="/home/jp/Projects/realmwatch=wing_realmwatch, /home/jp/Projects/oracle=wing_oracle" Each entry is `path` or `path=wing`. Bare path derives wing via `mempalace.config.normalize_wing_name(basename)` to match the local-spawn miner default. Non-existent paths are warned-and-skipped, not fatal — a misconfigured env var doesn't kill startup. Architecture: * watcher.py — pure-Python module: parse_watch_dirs(), _DebouncedMineHandler (subclass of FileSystemEventHandler with a per-target debounce timer), WatcherService (lifecycle wrapper around watchdog.Observer). * main.py lifespan — instantiate WatcherService, schedule one recursive watch per target, register stop() on shutdown. Failures are non-fatal; the daemon serves traffic regardless. * main.py — `_internal_mine()` async closure runs the same `mempalace mine ... --mode projects --wing X` argv as the existing /mine endpoint, gated by the same `_mine_sem`. * main.py — new `GET /watch` endpoint surfaces the running target list. No POST/DELETE — runtime add/remove requires daemon restart (operator just edits the systemd unit env and restarts). Debounce: events on the same target collapse via a 2-second per-target timer. Catches editor write+rename storms and *.swp / *.lock churn. Also pre-filters by extension allowlist (29 extensions: code, configs, markdown). Inotify fires for everything; the handler discards non-watched extensions before the timer schedules. Path translation: when a watch path lives in a Syncthing-replicated directory, the daemon-side path is translated through `PALACE_DAEMON_PATH_MAP` before the mine subprocess runs (same mechanism used by /mine). Tests: 11 new unittest cases in tests/test_watcher.py: - TestParseWatchDirs (7 cases): empty, single-with-derived-wing, explicit-wing, multiple, skips-nonexistent, skips-files, env-fallback - TestDebouncedMineHandler (4 cases): single-event-fires, burst-collapses-to-one, skips-directory-events, skips-bad-extensions Stacked on #1 (path-translation). When that merges, this PR auto-rebases against main; reviewers see only the new commits in this PR. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
1 parent 625a47f commit 94dfbb6

4 files changed

Lines changed: 493 additions & 0 deletions

File tree

main.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -525,10 +525,52 @@ async def lifespan(app: FastAPI):
525525
asyncio.create_task(_watchdog_loop(wdog_secs))
526526
logger.info("Systemd watchdog active (interval=%ds, tick=%ds).", wdog_secs, max(10, wdog_secs // 2))
527527

528+
# File-watcher service: mines configured directories on file change.
529+
# Configured via PALACE_WATCH_DIRS (comma-separated path[=wing]).
530+
# Mirrors the pattern used by the /mine endpoint above.
531+
try:
532+
from watcher import WatcherService, make_async_mine_fn, parse_watch_dirs
533+
534+
targets = parse_watch_dirs()
535+
loop = asyncio.get_running_loop()
536+
537+
async def _internal_mine(path: str, wing: str) -> None:
538+
translated = _translate_client_path(path)
539+
dir_path = Path(translated)
540+
if not dir_path.is_dir():
541+
logger.warning("watcher: skipping mine for non-dir path %s", translated)
542+
return
543+
mempalace_bin = os.path.join(os.path.dirname(sys.executable), "mempalace")
544+
argv = [mempalace_bin, "mine", translated, "--mode", "projects", "--wing", wing]
545+
# Same pattern as /mine endpoint: list-form argv, no shell.
546+
async with _mine_sem:
547+
proc = await asyncio.create_subprocess_exec(
548+
*argv,
549+
stdout=asyncio.subprocess.PIPE,
550+
stderr=asyncio.subprocess.PIPE,
551+
)
552+
stdout, stderr = await proc.communicate()
553+
if proc.returncode != 0:
554+
logger.warning("watcher mine returned %s for %s", proc.returncode, translated)
555+
556+
watcher = WatcherService(make_async_mine_fn(loop, _internal_mine))
557+
watcher.start(targets)
558+
app.state.watcher = watcher
559+
except Exception as e:
560+
logger.warning("File-watcher startup failed (non-fatal): %s", e)
561+
app.state.watcher = None
562+
528563
yield
529564

530565
# --- Shutdown: Silent Save / Flush ---
531566
logger.info("Lifespan: shutting down, flushing memories...")
567+
try:
568+
watcher = getattr(app.state, "watcher", None)
569+
if watcher is not None:
570+
watcher.stop()
571+
logger.info("WatcherService stopped.")
572+
except Exception:
573+
logger.exception("WatcherService stop failed (non-fatal)")
532574
try:
533575
# We call mempalace_memories_filed_away which triggers a checkpoint in recent mempalace versions
534576
await _call({
@@ -1135,6 +1177,22 @@ async def mine(request: Request, x_api_key: str | None = Header(default=None)):
11351177
}
11361178

11371179

1180+
@app.get("/watch")
1181+
async def watch_list(x_api_key: str | None = Header(default=None)):
1182+
"""List the directories the file-watcher is currently monitoring.
1183+
1184+
Configured at startup via PALACE_WATCH_DIRS env var; runtime add /
1185+
remove requires a daemon restart. Returns an empty list when the
1186+
watcher isn't running (env unset, watchdog package missing, or
1187+
startup failed).
1188+
"""
1189+
_check_auth(x_api_key)
1190+
watcher = getattr(app.state, "watcher", None)
1191+
if watcher is None:
1192+
return {"running": False, "targets": []}
1193+
return {"running": True, "targets": watcher.list_targets()}
1194+
1195+
11381196
# ── Repair + silent-save ─────────────────────────────────────────────────────
11391197

11401198
@app.post("/silent-save")

requirements.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
11
mempalace>=3.3.2
22
fastapi>=0.136.0
33
uvicorn[standard]>=0.44.0
4+
watchdog>=3.0.0

tests/test_watcher.py

Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
"""Unit tests for the file-watcher service.
2+
3+
Run with::
4+
5+
python -m unittest tests.test_watcher -v
6+
7+
Tests are pure-function: parse_watch_dirs() reads the env var and
8+
returns a list, with no observer threads or filesystem watching
9+
involved. The Observer-side debouncing is tested in isolation via
10+
the _DebouncedMineHandler class — instantiated with a fake mine
11+
callback, fed events, asserted on call count after the debounce
12+
window has passed.
13+
"""
14+
import os
15+
import sys
16+
import threading
17+
import time
18+
import unittest
19+
from unittest.mock import patch
20+
21+
_HERE = os.path.dirname(os.path.abspath(__file__))
22+
_ROOT = os.path.dirname(_HERE)
23+
if _ROOT not in sys.path:
24+
sys.path.insert(0, _ROOT)
25+
26+
import watcher as _watcher # noqa: E402
27+
28+
29+
class TestParseWatchDirs(unittest.TestCase):
30+
def setUp(self):
31+
# Use the test-class temp dir so created paths exist for the
32+
# is_dir() guard in parse_watch_dirs.
33+
import tempfile
34+
35+
self._tmp = tempfile.TemporaryDirectory()
36+
self.tmp = self._tmp.name
37+
38+
def tearDown(self):
39+
self._tmp.cleanup()
40+
41+
def test_empty_returns_empty(self):
42+
self.assertEqual(_watcher.parse_watch_dirs(""), [])
43+
self.assertEqual(_watcher.parse_watch_dirs(" "), [])
44+
45+
def test_path_only_derives_wing(self):
46+
d = os.path.join(self.tmp, "my-project")
47+
os.mkdir(d)
48+
targets = _watcher.parse_watch_dirs(d)
49+
self.assertEqual(len(targets), 1)
50+
self.assertEqual(targets[0].wing, "my_project")
51+
self.assertEqual(str(targets[0].path), d)
52+
53+
def test_path_with_explicit_wing(self):
54+
d = os.path.join(self.tmp, "anything")
55+
os.mkdir(d)
56+
targets = _watcher.parse_watch_dirs(f"{d}=wing_specific")
57+
self.assertEqual(len(targets), 1)
58+
self.assertEqual(targets[0].wing, "wing_specific")
59+
60+
def test_multiple_entries(self):
61+
d1 = os.path.join(self.tmp, "alpha")
62+
d2 = os.path.join(self.tmp, "beta")
63+
os.mkdir(d1)
64+
os.mkdir(d2)
65+
targets = _watcher.parse_watch_dirs(f"{d1}=wing_a, {d2}")
66+
self.assertEqual(len(targets), 2)
67+
wings = {t.wing for t in targets}
68+
self.assertEqual(wings, {"wing_a", "beta"})
69+
70+
def test_skips_nonexistent_paths(self):
71+
d = os.path.join(self.tmp, "exists")
72+
os.mkdir(d)
73+
missing = os.path.join(self.tmp, "missing-dir-never-created")
74+
targets = _watcher.parse_watch_dirs(f"{d}=ok,{missing}=bad")
75+
self.assertEqual(len(targets), 1)
76+
self.assertEqual(targets[0].wing, "ok")
77+
78+
def test_skips_files(self):
79+
f = os.path.join(self.tmp, "regular-file")
80+
open(f, "w").close()
81+
targets = _watcher.parse_watch_dirs(f"{f}=should_skip")
82+
self.assertEqual(targets, [])
83+
84+
def test_reads_env_var_when_no_arg(self):
85+
d = os.path.join(self.tmp, "from-env")
86+
os.mkdir(d)
87+
with patch.dict(os.environ, {"PALACE_WATCH_DIRS": f"{d}=env_wing"}, clear=True):
88+
targets = _watcher.parse_watch_dirs()
89+
self.assertEqual(len(targets), 1)
90+
self.assertEqual(targets[0].wing, "env_wing")
91+
92+
93+
class TestDebouncedMineHandler(unittest.TestCase):
94+
"""Integration-ish: instantiate the real handler, feed events, watch
95+
the debounce timer fire."""
96+
97+
def _make_event(self, src_path: str, is_directory: bool = False):
98+
# Real watchdog event objects need the watchdog dep. Use a
99+
# minimal stand-in that satisfies the handler's attribute reads.
100+
class _E:
101+
def __init__(self):
102+
self.src_path = src_path
103+
self.is_directory = is_directory
104+
105+
return _E()
106+
107+
def test_single_event_fires_after_debounce(self):
108+
target = _watcher.WatchTarget(path=_watcher.Path("/x"), wing="w")
109+
fired: list[tuple[str, str]] = []
110+
cv = threading.Condition()
111+
112+
def mine_fn(p, w):
113+
with cv:
114+
fired.append((p, w))
115+
cv.notify_all()
116+
117+
# Shorten the debounce so the test stays fast.
118+
original = _watcher._DEBOUNCE_SECONDS
119+
_watcher._DEBOUNCE_SECONDS = 0.05
120+
try:
121+
handler = _watcher._DebouncedMineHandler(target, mine_fn)
122+
handler.on_any_event(self._make_event("/x/foo.md"))
123+
with cv:
124+
cv.wait(timeout=1.0)
125+
self.assertEqual(fired, [("/x", "w")])
126+
finally:
127+
_watcher._DEBOUNCE_SECONDS = original
128+
129+
def test_burst_collapses_to_one_fire(self):
130+
"""A storm of events on the same target produces exactly one mine."""
131+
target = _watcher.WatchTarget(path=_watcher.Path("/x"), wing="w")
132+
fired: list[tuple[str, str]] = []
133+
cv = threading.Condition()
134+
135+
def mine_fn(p, w):
136+
with cv:
137+
fired.append((p, w))
138+
cv.notify_all()
139+
140+
original = _watcher._DEBOUNCE_SECONDS
141+
_watcher._DEBOUNCE_SECONDS = 0.05
142+
try:
143+
handler = _watcher._DebouncedMineHandler(target, mine_fn)
144+
for i in range(20):
145+
handler.on_any_event(self._make_event(f"/x/file{i}.py"))
146+
time.sleep(0.005)
147+
with cv:
148+
cv.wait(timeout=1.0)
149+
self.assertEqual(len(fired), 1)
150+
finally:
151+
_watcher._DEBOUNCE_SECONDS = original
152+
153+
def test_skips_directory_events(self):
154+
target = _watcher.WatchTarget(path=_watcher.Path("/x"), wing="w")
155+
fired: list[tuple[str, str]] = []
156+
original = _watcher._DEBOUNCE_SECONDS
157+
_watcher._DEBOUNCE_SECONDS = 0.05
158+
try:
159+
handler = _watcher._DebouncedMineHandler(target, lambda p, w: fired.append((p, w)))
160+
handler.on_any_event(self._make_event("/x/subdir", is_directory=True))
161+
time.sleep(0.15)
162+
self.assertEqual(fired, [])
163+
finally:
164+
_watcher._DEBOUNCE_SECONDS = original
165+
166+
def test_skips_unwatched_extensions(self):
167+
target = _watcher.WatchTarget(path=_watcher.Path("/x"), wing="w")
168+
fired: list[tuple[str, str]] = []
169+
original = _watcher._DEBOUNCE_SECONDS
170+
_watcher._DEBOUNCE_SECONDS = 0.05
171+
try:
172+
handler = _watcher._DebouncedMineHandler(target, lambda p, w: fired.append((p, w)))
173+
# Extensions not in _WATCHABLE_EXTENSIONS — editor swap, lock,
174+
# build artifact, binary.
175+
for path in ("/x/file.swp", "/x/file.lock", "/x/file.pyc", "/x/file.png"):
176+
handler.on_any_event(self._make_event(path))
177+
time.sleep(0.15)
178+
self.assertEqual(fired, [])
179+
finally:
180+
_watcher._DEBOUNCE_SECONDS = original
181+
182+
183+
if __name__ == "__main__":
184+
unittest.main()

0 commit comments

Comments
 (0)