
Commit 99b02c8 (parent: a3633be)

Update messaging protocol design spec to propose queue-mediated agent architecture

Adds a detailed problem statement, design rationale, MVP scope, key assumptions, and open questions for implementing a robust task queue backed by SQLite. Documents at-least-once delivery, claim/requeue logic, and API adjustments. Addresses gaps in the current synchronous dispatch handling.

3 files changed: 105 additions, 32 deletions

docs/specs/02-messaging-update/SPEC.md (2 additions, 2 deletions)

```diff
@@ -343,7 +343,7 @@ Inherits all project conventions from `CLAUDE.md`:
 
 ### Ask First (Require Explicit Config)
 
-- `allow_close: true` — closing issues (unchanged from current behaviour).
+- `allow_close: true` — closing issues (unchanged from current behavior).
 - `max_retries` changes beyond the default — operators must set this deliberately.
 
 ### Never Do
@@ -359,7 +359,7 @@ Inherits all project conventions from `CLAUDE.md`:
 
 - Multiple agent containers per queue (no consumer groups).
 - External queue backends (Redis, NATS) — pluggable interface defined, SQLite only implemented.
-- Task prioritisation or ordering beyond FIFO.
+- Task prioritization or ordering beyond FIFO.
 - Monitoring UI.
 - `GET /queue/status` operator endpoint.
```
Second changed file (76 additions, 3 deletions)

```diff
@@ -1,6 +1,79 @@
-# Messaging update
+# Messaging Update: Queue-Mediated Agent Protocol
 
 ## Problem Statement
 
-The current wire protocol doesn't handle the case where a message is sent to a node that is not connected.
-This leads to missed events with no way to recover.
+How might we ensure GitHub events dispatched to agent containers are processed reliably,
+even when agents are temporarily unavailable?
+
+## Recommended Direction
+
+The harness owns a task queue (SQLite by default, pluggable interface).
+Events are enqueued before any dispatch attempt — the queue is the source of truth.
+`POST /task → 202 Accepted` becomes a nudge ("check your queue now"), not a delivery mechanism.
+Agents poll the queue at startup, on a background interval, and when nudged.
+
+Results flow symmetrically: the agent writes its DecisionMessage back to the queue,
+then POSTs to `POST /harness/result → 202 Accepted` to nudge the harness.
+The harness also has a background task that periodically checks for completed tasks.
+HTTP nudges are optimizations that degrade gracefully — the queue always wins.
+
+This preserves the core constraint: the harness owns all infrastructure.
+Agents embed a thin `foreman-client` library that handles queue I/O.
+Agent authors call `client.next_task()` and `client.complete_task(task_id, decision)`.
+They don't implement queue management.
+
```
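The nudge-and-poll protocol described above can be sketched agent-side. This is illustrative only: the in-memory `queue.Queue` stands in for the real SQLite-backed store, and `next_task`/`agent_loop` are hypothetical names echoing the spec's `foreman-client` API.

```python
import queue
import threading

# Stand-in for the harness-owned task queue; the real store is SQLite.
task_store: "queue.Queue[dict]" = queue.Queue()
nudge = threading.Event()  # set by the POST /task handler: "check your queue now"

def next_task(timeout: float = 0.1):
    """Claim the next pending task, or return None when the queue is empty."""
    try:
        return task_store.get(timeout=timeout)
    except queue.Empty:
        return None

def agent_loop(poll_interval: float, handle, stop: threading.Event) -> None:
    """Drain the queue at startup, then wake on a nudge or on the interval.

    A lost nudge is harmless: the interval poll picks the task up anyway,
    which is the "nudges degrade gracefully, the queue always wins" property.
    """
    while not stop.is_set():
        task = next_task()
        if task is not None:
            handle(task)  # agent work; result would go back via complete_task
            continue      # keep draining before sleeping again
        nudge.wait(poll_interval)  # early wake-up if the harness nudges us
        nudge.clear()
```

A nudge endpoint would simply call `nudge.set()` after the harness enqueues, so delivery never depends on the HTTP call succeeding.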
```diff
+## Key Assumptions to Validate
+
+- [ ] SQLite with WAL mode handles concurrent harness writes + agent reads
+      without contention — benchmark before committing the schema
+- [ ] Agents are Python (or can embed a Python client) — validate the agent
+      container build process supports a shared library dependency
+- [ ] 202 nudge + background poll provides acceptable end-to-end latency —
+      define "acceptable" explicitly (target: < 30s for MVP)
+- [ ] One agent per queue is sufficient for MVP — the queue abstraction must
+      not bake in single-consumer assumptions that block future fan-out
+
+## MVP Scope
+
+**In:**
+
+- `task_queue` table in existing `memory.db`: task_id, agent_url, status,
+  payload, created_at, claimed_at, completed_at, result, retry_count
+- Harness writes: enqueue on poll event; `POST /task → 202` nudge to agent;
+  `POST /result` endpoint for agent callback; background drain loop for
+  completed tasks; re-enqueue tasks claimed but not completed within timeout
+- Harness reads: poll queue for completed tasks on callback + interval
+- `foreman-client` lib: `next_task()`, `complete_task(task_id, decision)`,
+  `heartbeat(task_id)` — heartbeat resets the claim timeout clock
+- Agent protocol: `POST /task → 202` (nudge only); startup queue poll;
+  configurable background poll interval
+- Delivery guarantee: at-least-once; task_id is the idempotency key
+
```
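A sketch of what the `task_queue` table could look like in SQLite. Column types, epoch-seconds timestamps, and the index name are assumptions for illustration; the spec's §3.1 schema is authoritative.

```python
import sqlite3

SCHEMA = """
CREATE TABLE IF NOT EXISTS task_queue (
    task_id      TEXT PRIMARY KEY,                  -- idempotency key
    agent_url    TEXT NOT NULL,
    status       TEXT NOT NULL DEFAULT 'pending',   -- pending|claimed|completed|done|failed
    payload      TEXT NOT NULL,                     -- serialized event
    created_at   REAL NOT NULL DEFAULT (strftime('%s', 'now')),
    claimed_at   REAL,
    completed_at REAL,
    result       TEXT,                              -- serialized DecisionMessage
    retry_count  INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_task_queue_claim
    ON task_queue (agent_url, status, created_at);
"""

def init_queue(db_path: str = ":memory:") -> sqlite3.Connection:
    """Open the queue DB with WAL enabled for concurrent writers and readers."""
    conn = sqlite3.connect(db_path)
    conn.execute("PRAGMA journal_mode=WAL")
    conn.executescript(SCHEMA)
    return conn
```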
```diff
+**Out:**
+
+- Multiple agent containers per queue (no consumer groups in MVP)
+- External queue backends (Redis, NATS) — define pluggable interface,
+  implement SQLite only
+- Task prioritization or ordering beyond FIFO
+- Monitoring UI — structured log output only
+
+## Not Doing (and Why)
+
+- **Agent-owned queues** — every agent author would reimplement queue logic;
+  harness owns infrastructure
+- **Exactly-once delivery** — requires distributed coordination; at-least-once
+  + idempotency is sufficient and far simpler
+- **File-system queuing** — ephemeral in containers; shared volumes add
+  deployment surface for no real gain over SQLite
+- **Keep synchronous dispatch as fallback** — two delivery paths means neither
+  is authoritative; commit to queue-first fully
+
+## Open Questions
+
+- What is the claim timeout?
+  If an agent pulls a task and crashes before completing, the harness must detect and re-enqueue it —
+  define the TTL and re-enqueue logic before writing the schema.
+- Is `foreman-client` a separate PyPI package, part of the `foreman` package,
+  or vendored into each agent at build time?
+- Should `GET /queue/status` be exposed on the harness for operator visibility,
+  or is structured logging sufficient for MVP?
```

docs/specs/02-messaging-update/plan.md (27 additions, 27 deletions)

```diff
@@ -43,16 +43,16 @@ Update `config.example.yaml` with the new section (commented out, showing defaults):
 
 **Acceptance criteria:**
 
-- [ ] `QueueConfig` model exists with fields: `db_path: Path | None`, `claim_timeout_seconds: int = 300`,
+- [x] `QueueConfig` model exists with fields: `db_path: Path | None`, `claim_timeout_seconds: int = 300`,
   `max_retries: int = 3`, `drain_interval_seconds: int = 10`, `requeue_interval_seconds: int = 60`
-- [ ] `ForemanConfig.queue` defaults to a zero-config `QueueConfig()` when the section is absent
-- [ ] `${VAR}` references in `db_path` resolve correctly (inherits `_resolve_refs_in`)
-- [ ] Existing config tests still pass
+- [x] `ForemanConfig.queue` defaults to a zero-config `QueueConfig()` when the section is absent
+- [x] `${VAR}` references in `db_path` resolve correctly (inherits `_resolve_refs_in`)
+- [x] Existing config tests still pass
 
 **Verification:**
 
-- [ ] `uv run pytest --agent-digest=term tests/test_config.py`
-- [ ] `pre-commit run --all-files`
+- [x] `uv run pytest --agent-digest=term tests/test_config.py`
+- [x] `pre-commit run --all-files`
 
 **Dependencies:** None
```

```diff
@@ -77,20 +77,20 @@ or a `SELECT … FOR UPDATE` workaround to be concurrency-safe under multiple si…
 
 **Acceptance criteria:**
 
-- [ ] `queue.db` schema matches spec (§3.1): `task_queue` table with all columns + index
-- [ ] `enqueue()` inserts with `status=pending`
-- [ ] `claim_next()` atomically claims oldest pending task for the given `agent_url`; returns `None` when empty
-- [ ] `complete()` sets `status=completed` and stores the serialised `DecisionMessage`
-- [ ] `heartbeat()` updates `last_heartbeat`
-- [ ] `drain_completed()` returns all `completed` rows and sets them to `done`
-- [ ] `requeue_stale()` re-enqueues `claimed` tasks past the claim timeout; increments `retry_count`
-- [ ] `fail_exhausted()` marks tasks with `retry_count >= max_retries` as `failed`
-- [ ] DB file and parent directories are auto-created (matching `MemoryStore` behaviour)
+- [x] `queue.db` schema matches spec (§3.1): `task_queue` table with all columns + index
+- [x] `enqueue()` inserts with `status=pending`
+- [x] `claim_next()` atomically claims oldest pending task for the given `agent_url`; returns `None` when empty
+- [x] `complete()` sets `status=completed` and stores the serialised `DecisionMessage`
+- [x] `heartbeat()` updates `last_heartbeat`
+- [x] `drain_completed()` returns all `completed` rows and sets them to `done`
+- [x] `requeue_stale()` re-enqueues `claimed` tasks past the claim timeout; increments `retry_count`
+- [x] `fail_exhausted()` marks tasks with `retry_count >= max_retries` as `failed`
+- [x] DB file and parent directories are auto-created (matching `MemoryStore` behaviour)
 
 **Verification:**
 
-- [ ] `uv run pytest --agent-digest=term tests/test_queue.py` (written in Task 3)
-- [ ] `pre-commit run --all-files`
+- [x] `uv run pytest --agent-digest=term tests/test_queue.py` (written in Task 3)
+- [x] `pre-commit run --all-files`
 
 **Dependencies:** Task 1
```
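The atomic claim these criteria describe can be sketched with an immediate transaction plus `UPDATE … RETURNING` (needs SQLite ≥ 3.35). Table columns follow the MVP list and the function name mirrors the spec's `claim_next()`; treat it as a sketch, not the project's implementation.

```python
import sqlite3
import time

def claim_next(conn: sqlite3.Connection, agent_url: str):
    """Atomically claim the oldest pending task for agent_url; None when empty."""
    conn.execute("BEGIN IMMEDIATE")  # take the write lock before selecting
    try:
        row = conn.execute(
            """
            UPDATE task_queue
               SET status = 'claimed', claimed_at = ?
             WHERE task_id = (SELECT task_id FROM task_queue
                               WHERE agent_url = ? AND status = 'pending'
                               ORDER BY created_at LIMIT 1)
             RETURNING task_id, payload
            """,
            (time.time(), agent_url),
        ).fetchone()
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    return row  # (task_id, payload) or None
```

Doing the select inside the `UPDATE` means there is no read-then-write gap for a second claimer to slip through.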

```diff
@@ -108,19 +108,19 @@ Use `freezegun` or manual timestamp manipulation to test timeout-based behaviour
 
 **Acceptance criteria:**
 
-- [ ] Schema creation: `task_queue` table and index exist after init
-- [ ] `enqueue` + `claim_next` happy path: task round-trips correctly
-- [ ] `claim_next` returns `None` on empty queue
-- [ ] `complete` + `drain_completed`: completed task is returned and marked `done`
-- [ ] `requeue_stale`: task claimed but not heartbeated past timeout → re-enqueued, `retry_count` incremented
-- [ ] `fail_exhausted`: task at `max_retries` → `status=failed`
-- [ ] Concurrent claim: two threads call `claim_next()` simultaneously; only one receives the task
-- [ ] Coverage ≥85% line / ≥80% branch for `foreman/queue.py`
+- [x] Schema creation: `task_queue` table and index exist after init
+- [x] `enqueue` + `claim_next` happy path: task round-trips correctly
+- [x] `claim_next` returns `None` on empty queue
+- [x] `complete` + `drain_completed`: completed task is returned and marked `done`
+- [x] `requeue_stale`: task claimed but not heartbeated past timeout → re-enqueued, `retry_count` incremented
+- [x] `fail_exhausted`: task at `max_retries` → `status=failed`
+- [x] Concurrent claim: two threads call `claim_next()` simultaneously; only one receives the task
+- [x] Coverage ≥85% line / ≥80% branch for `foreman/queue.py`
 
 **Verification:**
 
-- [ ] `uv run pytest --agent-digest=term tests/test_queue.py --cov=foreman/queue.py`
-- [ ] `pre-commit run --all-files`
+- [x] `uv run pytest --agent-digest=term tests/test_queue.py --cov=foreman/queue.py`
+- [x] `pre-commit run --all-files`
 
 **Dependencies:** Task 2
```
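The concurrent-claim criterion above can be exercised by racing two threads against one pending row in a file-backed DB and asserting a single winner. The snippet below is a self-contained sketch with a simplified two-column schema (`RETURNING` needs SQLite ≥ 3.35); the real test would go through the store's `claim_next()`.

```python
import os
import sqlite3
import tempfile
import threading

def claim_one(db_path: str, results: list) -> None:
    conn = sqlite3.connect(db_path, timeout=5.0)
    conn.execute("BEGIN IMMEDIATE")  # the loser blocks here until the winner commits
    row = conn.execute(
        "UPDATE task_queue SET status = 'claimed' "
        "WHERE task_id = (SELECT task_id FROM task_queue "
        "                  WHERE status = 'pending' LIMIT 1) "
        "RETURNING task_id"
    ).fetchone()
    conn.commit()
    conn.close()
    results.append(row)

def race_two_claimers() -> list:
    """Two threads race for one pending task; exactly one should get it."""
    db_path = os.path.join(tempfile.mkdtemp(), "queue.db")
    conn = sqlite3.connect(db_path)
    conn.execute("CREATE TABLE task_queue (task_id TEXT PRIMARY KEY, status TEXT)")
    conn.execute("INSERT INTO task_queue VALUES ('t1', 'pending')")
    conn.commit()
    conn.close()
    results: list = []
    threads = [threading.Thread(target=claim_one, args=(db_path, results))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results  # one (task_id,) row and one None, in either order
```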
