Skip to content

Commit 2e67a52

Browse files
call-me-ramhaofeif
andauthored
feat(inbox): add reconciliation sweep for orphaned PENDING messages (#266)
Inbox messages could stay PENDING forever when the receiving terminal was already idle: the single immediate (on POST) delivery attempt can observe a stale status and skip, and the log-watching observer never fires again because an idle agent produces no new log output. With both fast paths missed there was no fallback (issue #131). Add a provider-agnostic reconciliation daemon that runs on a slower interval than the watchdog and re-attempts delivery for any message left PENDING past a grace window. The grace window keeps the sweep from competing with the immediate and watchdog paths for freshly queued messages — it only adopts ones those paths have already missed. - list_pending_receiver_ids_older_than: provider-agnostic query for stuck receivers, joined against terminals so deleted receivers are skipped; the cutoff uses local-naive datetime.now() to match InboxModel.created_at. - reconcile_orphaned_messages: sweep body, reusing check_and_send_pending_messages. - inbox_reconciliation_daemon: background loop wired into the server lifespan. - INBOX_RECONCILE_INTERVAL / INBOX_RECONCILE_GRACE_SECONDS constants. The sweep reuses the existing delivery helper and so shares its known duplicate-wakeup race; making delivery atomic is left to the unified delivery engine tracked in GH #115. Documented in docs/inbox-delivery.md. Fixes #131 Co-authored-by: haofeif <56006724+haofeif@users.noreply.github.com>
1 parent ccc8a92 commit 2e67a52

8 files changed

Lines changed: 305 additions & 4 deletions

File tree

docs/inbox-delivery.md

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,3 +72,22 @@ The `_initialized` gate is important -- it prevents delivery during startup when
7272
| Message delivered during PROCESSING gets lost (agent errors mid-turn) | Low | Message status is DELIVERED; acceptable for v1 |
7373
| Watchdog fires every 5s during long turns | Medium (bounded) | One DB query + one tmux call per interval; no amplification |
7474
| Feature causes regression in non-eager providers | None | Provider flag defaults to False; only opt-in providers affected |
75+
76+
## Reconciliation Sweep
77+
78+
The immediate and watchdog paths can both miss a message when the receiving terminal is *already idle* when the message is queued:
79+
80+
- the single immediate attempt may observe a momentarily stale status and skip delivery, and
81+
- the watchdog only fires on log-file changes, which an already-idle agent that produces no further output never generates.
82+
83+
When both miss, the message would otherwise stay `PENDING` forever (issue #131).
84+
85+
A provider-agnostic background sweep closes this gap. Every `INBOX_RECONCILE_INTERVAL` (default 30s) it re-attempts delivery for any message left `PENDING` longer than `INBOX_RECONCILE_GRACE_SECONDS` (default 30s), routing it back through the same `check_and_send_pending_messages()` gate as the other paths. The work scales with the number of *backlogged* receivers, not the total agent count: when nothing is stuck the sweep runs one cheap query and returns.
86+
87+
### Grace Window
88+
89+
The sweep deliberately ignores messages younger than the grace window. The immediate and watchdog paths own delivery during that window; the sweep only adopts messages they have demonstrably had their chance at and missed. This keeps the sweep from competing with the fast paths on freshly queued messages and minimizes its overlap with them.
90+
91+
### Relationship to the OpenCode Poller
92+
93+
The sweep does not replace the OpenCode poller. They serve different roles: the OpenCode poller is a fast (5s) primary wakeup for a provider whose logs stop changing once its TUI settles, while the sweep is a slow, provider-agnostic safety net. Both reuse `check_and_send_pending_messages()` and so share its known duplicate-wakeup race; the grace window keeps the sweep from overlapping the fast paths in practice. GH #115 tracks unifying all of these wakeup sources into a single coordinated delivery engine that would make delivery atomic.

src/cli_agent_orchestrator/api/main.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
CORS_ORIGINS,
4343
DEFAULT_PROVIDER,
4444
INBOX_POLLING_INTERVAL,
45+
INBOX_RECONCILE_INTERVAL,
4546
SERVER_HOST,
4647
SERVER_PORT,
4748
SERVER_VERSION,
@@ -111,6 +112,24 @@ async def opencode_inbox_delivery_daemon(registry: PluginRegistry) -> None:
111112
logger.exception("OpenCode inbox delivery poller error")
112113

113114

115+
async def inbox_reconciliation_daemon(registry: PluginRegistry) -> None:
116+
"""Background task that recovers inbox messages the fast paths missed.
117+
118+
Safety net for issue #131: the immediate (on POST) and watchdog (on log
119+
change) delivery paths can both miss a message when the receiver is already
120+
idle, leaving it PENDING forever. This sweep runs on a slower interval than
121+
the watchdog and re-attempts delivery for anything left pending past the
122+
grace window.
123+
"""
124+
logger.info("Inbox reconciliation daemon started")
125+
while True:
126+
await asyncio.sleep(INBOX_RECONCILE_INTERVAL)
127+
try:
128+
await asyncio.to_thread(inbox_service.reconcile_orphaned_messages, registry)
129+
except Exception:
130+
logger.exception("Inbox reconciliation daemon error")
131+
132+
114133
# Response Models
115134
class TerminalOutputResponse(BaseModel):
116135
output: str
@@ -183,6 +202,10 @@ async def lifespan(app: FastAPI):
183202
# provider-specific wakeup path with a unified delivery engine.
184203
opencode_inbox_task = asyncio.create_task(opencode_inbox_delivery_daemon(registry))
185204

205+
# Start provider-agnostic reconciliation sweep for orphaned PENDING
206+
# messages the immediate and watchdog paths missed (issue #131).
207+
inbox_reconcile_task = asyncio.create_task(inbox_reconciliation_daemon(registry))
208+
186209
# Start inbox watcher
187210
inbox_observer = PollingObserver(timeout=INBOX_POLLING_INTERVAL)
188211
inbox_observer.schedule(LogFileHandler(registry), str(TERMINAL_LOG_DIR), recursive=False)
@@ -210,6 +233,13 @@ async def lifespan(app: FastAPI):
210233
except asyncio.CancelledError:
211234
pass
212235

236+
# Cancel inbox reconciliation sweep on shutdown
237+
inbox_reconcile_task.cancel()
238+
try:
239+
await inbox_reconcile_task
240+
except asyncio.CancelledError:
241+
pass
242+
213243
await registry.teardown()
214244
logger.info("Shutting down CLI Agent Orchestrator server...")
215245

src/cli_agent_orchestrator/clients/database.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
import logging
44
import uuid
5-
from datetime import datetime, timezone
5+
from datetime import datetime, timedelta, timezone
66
from typing import Any, Dict, List, Optional
77

88
from sqlalchemy import (
@@ -344,6 +344,36 @@ def list_pending_receiver_ids_by_provider(provider: str) -> List[str]:
344344
return [row[0] for row in rows]
345345

346346

347+
def list_pending_receiver_ids_older_than(min_age_seconds: int) -> List[str]:
348+
"""List receiver terminal IDs whose messages have been PENDING too long.
349+
350+
Returns the distinct receivers of any message still PENDING for longer than
351+
``min_age_seconds``. Used by the inbox reconciliation sweep to find messages
352+
the immediate and watchdog delivery paths missed, without competing with
353+
them for freshly queued ones (issue #131).
354+
355+
The join on ``terminals`` drops messages whose receiver terminal no longer
356+
exists, so the sweep does not keep retrying deliveries to deleted agents.
357+
358+
``created_at`` is stored local-naive (``InboxModel.created_at`` defaults to
359+
``datetime.now``), so the cutoff uses ``datetime.now()`` to match — the same
360+
convention as the retention query in ``cleanup_service.cleanup_old_data``.
361+
"""
362+
cutoff = datetime.now() - timedelta(seconds=min_age_seconds)
363+
with SessionLocal() as db:
364+
rows = (
365+
db.query(InboxModel.receiver_id)
366+
.join(TerminalModel, TerminalModel.id == InboxModel.receiver_id)
367+
.filter(
368+
InboxModel.status == MessageStatus.PENDING.value,
369+
InboxModel.created_at < cutoff,
370+
)
371+
.distinct()
372+
.all()
373+
)
374+
return [row[0] for row in rows]
375+
376+
347377
def delete_terminal(terminal_id: str) -> bool:
348378
"""Delete terminal metadata."""
349379
with SessionLocal() as db:

src/cli_agent_orchestrator/constants.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,25 @@
6666
# for capable providers (e.g., Claude Code).
6767
EAGER_INBOX_DELIVERY = os.environ.get("CAO_EAGER_INBOX_DELIVERY", "false").lower() == "true"
6868

69+
# Reconciliation sweep for orphaned inbox messages.
70+
# The immediate (on POST) and watchdog (on log-file change) delivery paths can
71+
# both miss a message when the receiving terminal is already idle: the single
72+
# immediate attempt may observe a stale status, and the watchdog only fires on
73+
# log output that an idle agent never produces. Those messages would otherwise
74+
# stay PENDING forever. A slow background sweep re-attempts delivery for any
75+
# message left pending past the grace window below, acting as a catch-all
76+
# fallback under the two fast paths (issue #131).
77+
#
78+
# The interval is deliberately much larger than INBOX_POLLING_INTERVAL: this is
79+
# a safety net, not a primary delivery path, so it trades latency for low load.
80+
INBOX_RECONCILE_INTERVAL = 30 # seconds between reconciliation sweeps
81+
82+
# Only reconcile messages older than this. Keeping the grace window >=
83+
# INBOX_POLLING_INTERVAL (the watchdog poll cadence) ensures the sweep never
84+
# competes with the immediate and watchdog paths for freshly queued messages —
85+
# it only adopts ones those paths have already had their chance at and missed.
86+
INBOX_RECONCILE_GRACE_SECONDS = 30
87+
6988
# =============================================================================
7089
# Cleanup Service Configuration
7190
# =============================================================================

src/cli_agent_orchestrator/services/inbox_service.py

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,14 @@
3131
from cli_agent_orchestrator.clients.database import (
3232
get_pending_messages,
3333
list_pending_receiver_ids_by_provider,
34+
list_pending_receiver_ids_older_than,
3435
update_message_status,
3536
)
36-
from cli_agent_orchestrator.constants import EAGER_INBOX_DELIVERY, TERMINAL_LOG_DIR
37+
from cli_agent_orchestrator.constants import (
38+
EAGER_INBOX_DELIVERY,
39+
INBOX_RECONCILE_GRACE_SECONDS,
40+
TERMINAL_LOG_DIR,
41+
)
3742
from cli_agent_orchestrator.models.inbox import MessageStatus, OrchestrationType
3843
from cli_agent_orchestrator.models.provider import ProviderType
3944
from cli_agent_orchestrator.models.terminal import TerminalStatus
@@ -161,6 +166,34 @@ def poll_opencode_pending_messages(registry: PluginRegistry | None = None) -> No
161166
logger.debug(f"OpenCode inbox poll failed for {terminal_id}: {e}")
162167

163168

169+
def reconcile_orphaned_messages(registry: PluginRegistry | None = None) -> None:
170+
"""Re-attempt delivery for messages stuck in PENDING past the grace window.
171+
172+
Provider-agnostic safety net for the gap described in issue #131: when a
173+
receiving terminal is already idle, the immediate (on POST) delivery path
174+
may miss on a stale status and the log-watching observer never fires again
175+
(an idle agent produces no new log output), leaving the message orphaned.
176+
This sweep finds any such message and routes it back through the normal
177+
delivery gate.
178+
179+
Only messages older than ``INBOX_RECONCILE_GRACE_SECONDS`` are considered,
180+
so the sweep never competes with the immediate and watchdog paths for
181+
freshly queued messages — it only adopts ones they have already missed.
182+
183+
Like ``poll_opencode_pending_messages``, this reuses ``check_and_send_pending_messages``
184+
and so inherits its known duplicate-wakeup race; the grace window keeps the
185+
sweep from overlapping the fast paths in practice, and GH #115 tracks the
186+
single coordinated delivery engine that would make delivery atomic.
187+
"""
188+
receiver_ids = list_pending_receiver_ids_older_than(INBOX_RECONCILE_GRACE_SECONDS)
189+
190+
for terminal_id in receiver_ids:
191+
try:
192+
check_and_send_pending_messages(terminal_id, registry=registry)
193+
except Exception as e:
194+
logger.debug(f"Inbox reconciliation failed for {terminal_id}: {e}")
195+
196+
164197
class LogFileHandler(FileSystemEventHandler):
165198
"""Handler for terminal log file changes."""
166199

test/api/test_api_endpoints.py

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,14 @@
1111

1212
import pytest
1313

14-
from cli_agent_orchestrator.api.main import app, flow_daemon, opencode_inbox_delivery_daemon
14+
from cli_agent_orchestrator.api.main import (
15+
app,
16+
flow_daemon,
17+
inbox_reconciliation_daemon,
18+
opencode_inbox_delivery_daemon,
19+
)
1520
from cli_agent_orchestrator.models.terminal import Terminal
21+
from cli_agent_orchestrator.services import inbox_service
1622
from cli_agent_orchestrator.utils.skills import SkillNameError
1723

1824
# ── Health endpoint ──────────────────────────────────────────────────
@@ -995,6 +1001,35 @@ async def fake_sleep(_seconds):
9951001
assert mock_to_thread.await_args.args[1] is registry
9961002

9971003

1004+
class TestInboxReconciliationDaemon:
1005+
"""Tests for the provider-agnostic inbox reconciliation sweep (issue #131)."""
1006+
1007+
@pytest.mark.asyncio
1008+
async def test_sweep_runs_one_iteration_then_cancels(self):
1009+
"""Daemon sleeps, runs the sync sweep in a thread, then handles cancellation."""
1010+
sleep_calls = 0
1011+
registry = MagicMock()
1012+
mock_to_thread = AsyncMock()
1013+
1014+
async def fake_sleep(_seconds):
1015+
nonlocal sleep_calls
1016+
sleep_calls += 1
1017+
if sleep_calls > 1:
1018+
raise asyncio.CancelledError
1019+
1020+
with (
1021+
patch("asyncio.sleep", new=fake_sleep),
1022+
patch("asyncio.to_thread", mock_to_thread),
1023+
):
1024+
with pytest.raises(asyncio.CancelledError):
1025+
await inbox_reconciliation_daemon(registry)
1026+
1027+
mock_to_thread.assert_awaited_once()
1028+
# The sweep, not some other sync function, must be the dispatched work.
1029+
assert mock_to_thread.await_args.args[0] is inbox_service.reconcile_orphaned_messages
1030+
assert mock_to_thread.await_args.args[1] is registry
1031+
1032+
9981033
# ── lifespan ─────────────────────────────────────────────────────────
9991034

10001035

@@ -1030,6 +1065,43 @@ async def fake_daemon():
10301065
mock_observer.stop.assert_called_once()
10311066
mock_observer.join.assert_called_once()
10321067

1068+
@pytest.mark.asyncio
1069+
async def test_lifespan_cancels_inbox_reconciliation_on_shutdown(self):
1070+
"""The reconciliation sweep task is cancelled when the server stops (issue #131)."""
1071+
from cli_agent_orchestrator.api.main import lifespan
1072+
1073+
mock_observer = MagicMock()
1074+
reconcile_cancelled = {"value": False}
1075+
1076+
async def fake_flow_daemon():
1077+
await asyncio.sleep(3600)
1078+
1079+
async def fake_reconcile(registry):
1080+
try:
1081+
await asyncio.sleep(3600)
1082+
except asyncio.CancelledError:
1083+
reconcile_cancelled["value"] = True
1084+
raise
1085+
1086+
with (
1087+
patch("cli_agent_orchestrator.api.main.setup_logging"),
1088+
patch("cli_agent_orchestrator.api.main.init_db"),
1089+
patch("cli_agent_orchestrator.api.main.cleanup_old_data"),
1090+
patch(
1091+
"cli_agent_orchestrator.api.main.PollingObserver",
1092+
return_value=mock_observer,
1093+
),
1094+
patch("cli_agent_orchestrator.api.main.flow_daemon", fake_flow_daemon),
1095+
patch(
1096+
"cli_agent_orchestrator.api.main.inbox_reconciliation_daemon",
1097+
fake_reconcile,
1098+
),
1099+
):
1100+
async with lifespan(app):
1101+
pass
1102+
1103+
assert reconcile_cancelled["value"] is True
1104+
10331105

10341106
# ── main() entry point ───────────────────────────────────────────────
10351107

test/clients/test_database.py

Lines changed: 68 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Tests for the database client."""
22

33
import tempfile
4-
from datetime import datetime
4+
from datetime import datetime, timedelta
55
from pathlib import Path
66
from unittest.mock import MagicMock, patch
77

@@ -27,6 +27,7 @@
2727
init_db,
2828
list_flows,
2929
list_pending_receiver_ids_by_provider,
30+
list_pending_receiver_ids_older_than,
3031
list_terminals_by_session,
3132
update_flow_enabled,
3233
update_flow_run_times,
@@ -234,6 +235,72 @@ def test_list_pending_receiver_ids_by_provider(self, mock_session_class):
234235

235236
assert result == ["receiver-1", "receiver-2"]
236237

238+
def test_list_pending_receiver_ids_older_than(self, test_db):
239+
"""Only messages pending past the grace window — whose receiver
240+
terminal still exists — are returned for reconciliation (issue #131).
241+
242+
Uses the real in-memory DB (not a mocked session) so the age cutoff,
243+
status filter, and terminal join are actually exercised.
244+
"""
245+
old = datetime.now() - timedelta(seconds=120)
246+
fresh = datetime.now()
247+
248+
with test_db() as seed:
249+
seed.add_all(
250+
[
251+
TerminalModel(
252+
id="term-old",
253+
tmux_session="cao-s",
254+
tmux_window="w",
255+
provider="kiro_cli",
256+
),
257+
TerminalModel(
258+
id="term-fresh",
259+
tmux_session="cao-s",
260+
tmux_window="w",
261+
provider="kiro_cli",
262+
),
263+
# Stuck long enough to reconcile, receiver still alive — kept.
264+
InboxModel(
265+
sender_id="a",
266+
receiver_id="term-old",
267+
message="m",
268+
status=MessageStatus.PENDING.value,
269+
created_at=old,
270+
),
271+
# Too recent — left to the immediate/watchdog paths.
272+
InboxModel(
273+
sender_id="a",
274+
receiver_id="term-fresh",
275+
message="m",
276+
status=MessageStatus.PENDING.value,
277+
created_at=fresh,
278+
),
279+
# Already delivered — not pending.
280+
InboxModel(
281+
sender_id="a",
282+
receiver_id="term-old",
283+
message="m",
284+
status=MessageStatus.DELIVERED.value,
285+
created_at=old,
286+
),
287+
# Receiver terminal is gone — dropped by the join.
288+
InboxModel(
289+
sender_id="a",
290+
receiver_id="term-ghost",
291+
message="m",
292+
status=MessageStatus.PENDING.value,
293+
created_at=old,
294+
),
295+
]
296+
)
297+
seed.commit()
298+
299+
with patch("cli_agent_orchestrator.clients.database.SessionLocal", test_db):
300+
result = list_pending_receiver_ids_older_than(30)
301+
302+
assert result == ["term-old"]
303+
237304
@patch("cli_agent_orchestrator.clients.database.SessionLocal")
238305
def test_delete_terminals_by_session(self, mock_session_class):
239306
"""Test deleting all terminals in a session."""

0 commit comments

Comments
 (0)