Skip to content

Commit a0ba8dd

Browse files
committed
Convert TaskQueue tests to use context manager and update installed packages
1 parent 170d707 commit a0ba8dd

4 files changed

Lines changed: 370 additions & 321 deletions

File tree

docs/specs/02-messaging-update/plan.md

Lines changed: 27 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,10 @@ Use `freezegun` or manual timestamp manipulation to test timeout-based behaviour
132132

133133
### Checkpoint: Phase 1
134134

135-
- [ ] `uv run pytest --agent-digest=term` — all tests pass
136-
- [ ] `pre-commit run --all-files` — clean
137-
- [ ] `TaskQueue` is fully exercised; concurrent-claim test passes
138-
- [ ] Human review before proceeding
135+
- [x] `uv run pytest --agent-digest=term` — all tests pass
136+
- [x] `pre-commit run --all-files` — clean
137+
- [x] `TaskQueue` is fully exercised; concurrent-claim test passes
138+
- [x] Human review before proceeding
139139

140140
### Phase 2: Harness Queue API Endpoints
141141

@@ -156,17 +156,17 @@ The router receives a `TaskQueue` instance via FastAPI dependency injection (use
156156

157157
**Acceptance criteria:**
158158

159-
- [ ] `POST /queue/next` returns 200 + `TaskMessage` JSON when a task is available
160-
- [ ] `POST /queue/next` returns 204 when the queue is empty
161-
- [ ] `POST /queue/complete` stores the decision and returns 202
162-
- [ ] `POST /queue/heartbeat` updates `last_heartbeat` and returns 202
163-
- [ ] All endpoints return 202 immediately (no blocking on downstream work)
164-
- [ ] Router is included in `app` (registered in `server.py`)
159+
- [x] `POST /queue/next` returns 200 + `TaskMessage` JSON when a task is available
160+
- [x] `POST /queue/next` returns 204 when the queue is empty
161+
- [x] `POST /queue/complete` stores the decision and returns 202
162+
- [x] `POST /queue/heartbeat` updates `last_heartbeat` and returns 202
163+
- [x] All endpoints return 202 immediately (no blocking on downstream work)
164+
- [x] Router is included in `app` (registered in `server.py`)
165165

166166
**Verification:**
167167

168-
- [ ] `uv run pytest --agent-digest=term tests/test_queue_router.py` (written in Task 6)
169-
- [ ] `pre-commit run --all-files`
168+
- [x] `uv run pytest --agent-digest=term tests/test_queue_router.py` (written in Task 6)
169+
- [x] `pre-commit run --all-files`
170170

171171
**Dependencies:** Tasks 2, 3
172172

@@ -186,14 +186,14 @@ The trigger mechanism is an `asyncio.Event` set in the background loop and reset
186186

187187
**Acceptance criteria:**
188188

189-
- [ ] `POST /harness/result` accepts `{"task_id": "<uuid>"}` and returns 202 Accepted
190-
- [ ] Receiving the nudge triggers the drain loop event (verified by inspecting `app.state`)
191-
- [ ] Router is included in `app`
189+
- [x] `POST /harness/result` accepts `{"task_id": "<uuid>"}` and returns 202 Accepted
190+
- [x] Receiving the nudge triggers the drain loop event (verified by inspecting `app.state`)
191+
- [x] Router is included in `app`
192192

193193
**Verification:**
194194

195-
- [ ] `uv run pytest --agent-digest=term tests/test_result_router.py` (written in Task 6)
196-
- [ ] `pre-commit run --all-files`
195+
- [x] `uv run pytest --agent-digest=term tests/test_result_router.py` (written in Task 6)
196+
- [x] `pre-commit run --all-files`
197197

198198
**Dependencies:** Task 4
199199

@@ -212,16 +212,16 @@ Verify HTTP contracts only.
212212

213213
**Acceptance criteria:**
214214

215-
- [ ] `POST /queue/next` — 200 with task body when queue has a task
216-
- [ ] `POST /queue/next` — 204 when `claim_next()` returns `None`
217-
- [ ] `POST /queue/complete` — 202; `TaskQueue.complete()` called with correct args
218-
- [ ] `POST /queue/heartbeat` — 202; `TaskQueue.heartbeat()` called with correct `task_id`
219-
- [ ] `POST /harness/result` — 202; drain event is set
215+
- [x] `POST /queue/next` — 200 with task body when queue has a task
216+
- [x] `POST /queue/next` — 204 when `claim_next()` returns `None`
217+
- [x] `POST /queue/complete` — 202; `TaskQueue.complete()` called with correct args
218+
- [x] `POST /queue/heartbeat` — 202; `TaskQueue.heartbeat()` called with correct `task_id`
219+
- [x] `POST /harness/result` — 202; drain event is set
220220

221221
**Verification:**
222222

223-
- [ ] `uv run pytest --agent-digest=term tests/test_queue_router.py tests/test_result_router.py`
224-
- [ ] `pre-commit run --all-files`
223+
- [x] `uv run pytest --agent-digest=term tests/test_queue_router.py tests/test_result_router.py`
224+
- [x] `pre-commit run --all-files`
225225

226226
**Dependencies:** Tasks 4, 5
227227

@@ -234,9 +234,9 @@ Verify HTTP contracts only.
234234

235235
### Checkpoint: Phase 2
236236

237-
- [ ] `uv run pytest --agent-digest=term` — all tests pass
238-
- [ ] All three queue endpoints + `/harness/result` exist and return correct status codes
239-
- [ ] Human review before proceeding
237+
- [x] `uv run pytest --agent-digest=term` — all tests pass
238+
- [x] All three queue endpoints + `/harness/result` exist and return correct status codes
239+
- [x] Human review before proceeding
240240

241241
### Phase 3: `foreman-client` Package
242242

tests/test_memory.py

Lines changed: 18 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,19 +35,19 @@ def test_action_log_table_exists(self, tmp_path: Path) -> None:
3535
db_path = tmp_path / "memory.db"
3636
store = MemoryStore(db_path)
3737
store.close()
38-
with sqlite3.connect(db_path) as conn:
39-
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='action_log'").fetchall()
38+
conn = sqlite3.connect(db_path)
39+
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='action_log'").fetchall()
40+
conn.close()
4041
assert rows, "action_log table should exist"
4142

4243
def test_memory_summary_table_exists(self, tmp_path: Path) -> None:
4344
"""memory_summary table is created with the correct schema."""
4445
db_path = tmp_path / "memory.db"
4546
store = MemoryStore(db_path)
4647
store.close()
47-
with sqlite3.connect(db_path) as conn:
48-
rows = conn.execute(
49-
"SELECT name FROM sqlite_master WHERE type='table' AND name='memory_summary'"
50-
).fetchall()
48+
conn = sqlite3.connect(db_path)
49+
rows = conn.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='memory_summary'").fetchall()
50+
conn.close()
5151
assert rows, "memory_summary table should exist"
5252

5353

@@ -71,8 +71,9 @@ def test_log_action_inserts_row(self, store: MemoryStore) -> None:
7171
actions=[ActionItem(type="add_label", label="bug")],
7272
)
7373

74-
with sqlite3.connect(store.db_path) as conn:
75-
rows = conn.execute("SELECT * FROM action_log").fetchall()
74+
conn = sqlite3.connect(store.db_path)
75+
rows = conn.execute("SELECT * FROM action_log").fetchall()
76+
conn.close()
7677
assert len(rows) == 1
7778

7879
def test_log_action_stores_correct_values(self, store: MemoryStore) -> None:
@@ -87,10 +88,9 @@ def test_log_action_stores_correct_values(self, store: MemoryStore) -> None:
8788
actions=actions,
8889
)
8990

90-
with sqlite3.connect(store.db_path) as conn:
91-
row = conn.execute(
92-
"SELECT repo, issue_id, task_type, decision, rationale, actions FROM action_log"
93-
).fetchone()
91+
conn = sqlite3.connect(store.db_path)
92+
row = conn.execute("SELECT repo, issue_id, task_type, decision, rationale, actions FROM action_log").fetchone()
93+
conn.close()
9494

9595
assert row[0] == "owner/repo"
9696
assert row[1] == 7
@@ -113,8 +113,9 @@ def test_log_action_multiple_entries(self, store: MemoryStore) -> None:
113113
actions=[],
114114
)
115115

116-
with sqlite3.connect(store.db_path) as conn:
117-
count = conn.execute("SELECT COUNT(*) FROM action_log").fetchone()[0]
116+
conn = sqlite3.connect(store.db_path)
117+
count = conn.execute("SELECT COUNT(*) FROM action_log").fetchone()[0]
118+
conn.close()
118119
assert count == 3
119120

120121
def test_log_action_null_rationale(self, store: MemoryStore) -> None:
@@ -128,8 +129,9 @@ def test_log_action_null_rationale(self, store: MemoryStore) -> None:
128129
actions=[],
129130
)
130131

131-
with sqlite3.connect(store.db_path) as conn:
132-
row = conn.execute("SELECT rationale FROM action_log").fetchone()
132+
conn = sqlite3.connect(store.db_path)
133+
row = conn.execute("SELECT rationale FROM action_log").fetchone()
134+
conn.close()
133135
assert row[0] is None
134136

135137

tests/test_queue.py

Lines changed: 32 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,8 @@ def test_index_exists(self, queue: TaskQueue) -> None:
6969
def test_db_file_created(self, tmp_path: Path) -> None:
7070
"""DB file and parent directories are auto-created."""
7171
db_path = tmp_path / "nested" / "dir" / "queue.db"
72-
TaskQueue(db_path=db_path)
72+
with TaskQueue(db_path=db_path):
73+
pass
7374
assert db_path.exists()
7475

7576

@@ -216,24 +217,24 @@ class TestRequeueStale:
216217

217218
def test_requeue_stale_re_enqueues_timed_out_task(self, tmp_path: Path) -> None:
218219
"""A claimed task past the timeout is re-enqueued and retry_count incremented."""
219-
q = TaskQueue(db_path=tmp_path / "queue.db", claim_timeout_seconds=1)
220-
task = _make_task("t1")
221-
q.enqueue(task, agent_url="http://agent:9001")
222-
q.claim_next(agent_url="http://agent:9001")
220+
with TaskQueue(db_path=tmp_path / "queue.db", claim_timeout_seconds=1) as q:
221+
task = _make_task("t1")
222+
q.enqueue(task, agent_url="http://agent:9001")
223+
q.claim_next(agent_url="http://agent:9001")
223224

224-
# Force both claimed_at and last_heartbeat into the past to simulate timeout
225-
past = time.time() - 10
226-
q._conn.execute(
227-
"UPDATE task_queue SET claimed_at = ?, last_heartbeat = ? WHERE task_id = 't1'",
228-
(past, past),
229-
)
225+
# Force both claimed_at and last_heartbeat into the past to simulate timeout
226+
past = time.time() - 10
227+
q._conn.execute(
228+
"UPDATE task_queue SET claimed_at = ?, last_heartbeat = ? WHERE task_id = 't1'",
229+
(past, past),
230+
)
230231

231-
count = q.requeue_stale()
232-
assert count == 1
232+
count = q.requeue_stale()
233+
assert count == 1
233234

234-
row = q._conn.execute("SELECT status, retry_count FROM task_queue WHERE task_id = 't1'").fetchone()
235-
assert row[0] == "pending"
236-
assert row[1] == 1
235+
row = q._conn.execute("SELECT status, retry_count FROM task_queue WHERE task_id = 't1'").fetchone()
236+
assert row[0] == "pending"
237+
assert row[1] == 1
237238

238239
def test_requeue_stale_ignores_fresh_claims(self, queue: TaskQueue) -> None:
239240
"""A recently claimed task is not re-enqueued."""
@@ -245,21 +246,21 @@ def test_requeue_stale_ignores_fresh_claims(self, queue: TaskQueue) -> None:
245246

246247
def test_requeue_stale_ignores_heartbeated_task(self, tmp_path: Path) -> None:
247248
"""A task with a recent heartbeat is not re-enqueued even if claimed_at is old."""
248-
q = TaskQueue(db_path=tmp_path / "queue.db", claim_timeout_seconds=1)
249-
task = _make_task("t1")
250-
q.enqueue(task, agent_url="http://agent:9001")
251-
q.claim_next(agent_url="http://agent:9001")
252-
253-
# Age the claimed_at but keep last_heartbeat fresh
254-
now = time.time()
255-
q._conn.execute(
256-
"UPDATE task_queue SET claimed_at = ?, last_heartbeat = ? WHERE task_id = 't1'",
257-
(now - 10, now),
258-
)
259-
q._conn.commit()
260-
261-
count = q.requeue_stale()
262-
assert count == 0
249+
with TaskQueue(db_path=tmp_path / "queue.db", claim_timeout_seconds=1) as q:
250+
task = _make_task("t1")
251+
q.enqueue(task, agent_url="http://agent:9001")
252+
q.claim_next(agent_url="http://agent:9001")
253+
254+
# Age the claimed_at but keep last_heartbeat fresh
255+
now = time.time()
256+
q._conn.execute(
257+
"UPDATE task_queue SET claimed_at = ?, last_heartbeat = ? WHERE task_id = 't1'",
258+
(now - 10, now),
259+
)
260+
q._conn.commit()
261+
262+
count = q.requeue_stale()
263+
assert count == 0
263264

264265

265266
# ---------------------------------------------------------------------------

0 commit comments

Comments
 (0)