Phase 18.1 — HorizonPlanner: Multi-Horizon Goal Decomposition
Phase: 18.1 / 5 | Depends on: GoalRegistry (#319), TemporalGraph (#434)
Background
ASI:BUILD's current planning stack (Phase 10) operates on a flat goal queue with no notion of temporal scale. A goal like "optimise memory consolidation over the next hour" is treated identically to "respond to this sensor event in 50 ms". The HorizonPlanner introduces three planning horizons and routes goals into the correct horizon bucket, enabling the cognitive cycle to prioritise appropriately.
Design
Enums & Constants
from enum import Enum

class PlanningHorizon(Enum):
    SHORT = "short"    # delta ≤ HorizonConfig.short_horizon_ms (default 10 000 ms)
    MEDIUM = "medium"  # delta ≤ HorizonConfig.medium_horizon_ms (default 300 000 ms)
    LONG = "long"      # delta > HorizonConfig.medium_horizon_ms
Frozen Dataclasses
import time
from dataclasses import dataclass, field
from typing import FrozenSet

@dataclass(frozen=True)
class HorizonBucket:
    horizon: PlanningHorizon
    goal_ids: FrozenSet[str]   # IDs from GoalRegistry
    window_ms: int             # effective deadline window in ms
    created_at_ns: int = field(default_factory=time.time_ns)

@dataclass(frozen=True)
class HorizonConfig:
    short_horizon_ms: int = 10_000      # 10 s
    medium_horizon_ms: int = 300_000    # 5 min
    max_goals_per_bucket: int = 64
    rebalance_interval_s: float = 1.0
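Because HorizonConfig is frozen, alternative thresholds have to be derived as new instances rather than mutated in place. The following is a minimal sketch of that pattern; the dataclass is repeated so the snippet runs on its own, and `validate_config` is a hypothetical helper, not part of the design above.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class HorizonConfig:
    short_horizon_ms: int = 10_000
    medium_horizon_ms: int = 300_000
    max_goals_per_bucket: int = 64
    rebalance_interval_s: float = 1.0


def validate_config(cfg: HorizonConfig) -> HorizonConfig:
    """Hypothetical check: reject settings that would make a bucket unusable."""
    if not 0 < cfg.short_horizon_ms < cfg.medium_horizon_ms:
        raise ValueError("expected 0 < short_horizon_ms < medium_horizon_ms")
    if cfg.max_goals_per_bucket <= 0:
        raise ValueError("max_goals_per_bucket must be positive")
    return cfg


# Derive a tighter SHORT horizon without touching the defaults.
fast = validate_config(replace(HorizonConfig(), short_horizon_ms=2_000))

# In-place mutation is rejected by the frozen dataclass.
try:
    fast.short_horizon_ms = 1
    mutated = True
except FrozenInstanceError:
    mutated = False
```

Validating once at construction keeps the classification code free of defensive checks on the thresholds.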
Protocol
from typing import Protocol, runtime_checkable

@runtime_checkable
class HorizonPlanner(Protocol):
    async def classify(self, goal_id: str, deadline_ns: int) -> PlanningHorizon: ...
    async def get_bucket(self, horizon: PlanningHorizon) -> HorizonBucket: ...
    async def rebalance(self) -> None: ...  # re-classify stale goals
    async def drain(self, horizon: PlanningHorizon, n: int) -> list[str]: ...
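Since the protocol is marked `@runtime_checkable`, `isinstance()` can serve as a structural check, for example in tests. The sketch below uses a trimmed copy of the protocol (without `get_bucket`, to stay self-contained) and two hypothetical stub classes that are not part of the design. Note that `isinstance()` only verifies that the methods exist; it does not check signatures or whether they are coroutines.

```python
from enum import Enum
from typing import Protocol, runtime_checkable


class PlanningHorizon(Enum):
    SHORT = "short"
    MEDIUM = "medium"
    LONG = "long"


@runtime_checkable
class HorizonPlanner(Protocol):
    # Trimmed copy of the protocol above, for illustration only.
    async def classify(self, goal_id: str, deadline_ns: int) -> PlanningHorizon: ...
    async def rebalance(self) -> None: ...
    async def drain(self, horizon: PlanningHorizon, n: int) -> list[str]: ...


class StubPlanner:
    """Hypothetical stub providing every protocol method."""

    async def classify(self, goal_id, deadline_ns):
        return PlanningHorizon.SHORT

    async def rebalance(self):
        return None

    async def drain(self, horizon, n):
        return []


class IncompletePlanner:
    """Hypothetical stub that lacks drain()."""

    async def classify(self, goal_id, deadline_ns):
        return PlanningHorizon.SHORT

    async def rebalance(self):
        return None


conforms = isinstance(StubPlanner(), HorizonPlanner)
missing_method = isinstance(IncompletePlanner(), HorizonPlanner)
```

Here `conforms` is true and `missing_method` is false, because only the second class is missing one of the protocol's methods.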
PriorityHorizonPlanner (concrete implementation)
import asyncio
import heapq
import time

# HORIZON_CLASSIFIED, HORIZON_REBALANCED and HORIZON_DRAINED are assumed to be
# module-level Prometheus counters matching the "Prometheus Metrics" table.


class PriorityHorizonPlanner:
    """
    Classifies goals into SHORT/MEDIUM/LONG horizon buckets and
    drains them in deadline order within each bucket.
    """

    def __init__(self, config: HorizonConfig, goal_registry, temporal_graph):
        self._config = config
        self._registry = goal_registry
        self._graph = temporal_graph
        self._lock = asyncio.Lock()
        # horizon -> min-heap of (deadline_ns, goal_id)
        self._heaps: dict[PlanningHorizon, list] = {h: [] for h in PlanningHorizon}
        self._goal_horizon: dict[str, PlanningHorizon] = {}

    async def classify(self, goal_id: str, deadline_ns: int) -> PlanningHorizon:
        delta_ms = (deadline_ns - time.time_ns()) / 1_000_000
        horizon = self._classify_delta(delta_ms)
        async with self._lock:
            # NOTE: config.max_goals_per_bucket is not enforced here yet, and
            # classifying the same goal_id twice leaves two heap entries.
            heapq.heappush(self._heaps[horizon], (deadline_ns, goal_id))
            self._goal_horizon[goal_id] = horizon
        HORIZON_CLASSIFIED.labels(horizon=horizon.value).inc()
        return horizon

    async def get_bucket(self, horizon: PlanningHorizon) -> HorizonBucket:
        async with self._lock:
            ids = frozenset(gid for (_, gid) in self._heaps[horizon])
        window_ms = {
            PlanningHorizon.SHORT: self._config.short_horizon_ms,
            PlanningHorizon.MEDIUM: self._config.medium_horizon_ms,
            PlanningHorizon.LONG: 10 ** 9,  # sentinel for "unbounded" (about 11.6 days)
        }[horizon]
        return HorizonBucket(horizon=horizon, goal_ids=ids, window_ms=window_ms)

    async def rebalance(self) -> None:
        """Re-classify goals whose deadline has shifted horizon."""
        now_ns = time.time_ns()
        async with self._lock:
            for horizon in list(PlanningHorizon):
                surviving = []
                for (deadline_ns, gid) in self._heaps[horizon]:
                    delta_ms = (deadline_ns - now_ns) / 1_000_000
                    new_h = self._classify_delta(delta_ms)
                    if new_h != horizon:
                        # Move the goal to the heap of its new horizon.
                        heapq.heappush(self._heaps[new_h], (deadline_ns, gid))
                        self._goal_horizon[gid] = new_h
                        HORIZON_REBALANCED.labels(from_h=horizon.value, to_h=new_h.value).inc()
                    else:
                        surviving.append((deadline_ns, gid))
                # Rebuild the heap from the entries that stayed in this horizon.
                self._heaps[horizon] = surviving
                heapq.heapify(self._heaps[horizon])

    def _classify_delta(self, delta_ms: float) -> PlanningHorizon:
        # Overdue goals (negative delta) fall into SHORT.
        if delta_ms <= self._config.short_horizon_ms:
            return PlanningHorizon.SHORT
        if delta_ms <= self._config.medium_horizon_ms:
            return PlanningHorizon.MEDIUM
        return PlanningHorizon.LONG

    async def drain(self, horizon: PlanningHorizon, n: int) -> list[str]:
        """Pop up to n goal IDs from the given horizon, earliest deadline first."""
        result = []
        async with self._lock:
            for _ in range(min(n, len(self._heaps[horizon]))):
                _, gid = heapq.heappop(self._heaps[horizon])
                self._goal_horizon.pop(gid, None)
                result.append(gid)
        HORIZON_DRAINED.labels(horizon=horizon.value).inc(len(result))
        return result
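One consequence of the heap layout above is that calling classify() twice for the same goal_id, for instance after a deadline change, leaves two entries in the heaps, so drain() could return the same goal twice. A common remedy is the lazy-deletion pattern for `heapq`: record the live deadline per goal and skip stale entries on pop. The sketch below illustrates that pattern in isolation; `DeadlineHeap` is a hypothetical helper, not part of the proposed design.

```python
import heapq
from typing import Optional


class DeadlineHeap:
    """Min-heap of (deadline_ns, goal_id) that tolerates re-pushed goals."""

    def __init__(self) -> None:
        self._heap: list = []
        self._live: dict = {}  # goal_id -> deadline_ns of the current entry

    def push(self, goal_id: str, deadline_ns: int) -> None:
        # A re-push overrides the live deadline; the old heap entry goes stale.
        self._live[goal_id] = deadline_ns
        heapq.heappush(self._heap, (deadline_ns, goal_id))

    def discard(self, goal_id: str) -> None:
        # Mark the goal as removed; its heap entry is skipped later.
        self._live.pop(goal_id, None)

    def pop(self) -> Optional[str]:
        # Skip entries whose deadline no longer matches the live one.
        while self._heap:
            deadline_ns, goal_id = heapq.heappop(self._heap)
            if self._live.get(goal_id) == deadline_ns:
                del self._live[goal_id]
                return goal_id
        return None

    def __len__(self) -> int:
        return len(self._live)


heap = DeadlineHeap()
heap.push("goal-a", 50)
heap.push("goal-b", 20)
heap.push("goal-a", 10)  # deadline for goal-a moved earlier
order = [heap.pop(), heap.pop(), heap.pop()]
```

The trade-off is that stale entries stay in memory until they reach the top of the heap, which is usually acceptable when buckets are capped at a small size.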
NullHorizonPlanner
class NullHorizonPlanner:
    """No-op planner used when GoalRegistry or TemporalGraph is unavailable."""

    async def classify(self, goal_id, deadline_ns):
        return PlanningHorizon.SHORT

    async def get_bucket(self, horizon):
        return HorizonBucket(horizon=horizon, goal_ids=frozenset(), window_ms=0)

    async def rebalance(self):
        return None

    async def drain(self, horizon, n):
        return []
Factory
def make_horizon_planner(
    config: HorizonConfig | None = None,
    goal_registry=None,
    temporal_graph=None,
) -> HorizonPlanner:
    # Fall back to the no-op planner when either dependency is missing.
    if goal_registry is None or temporal_graph is None:
        return NullHorizonPlanner()
    return PriorityHorizonPlanner(config or HorizonConfig(), goal_registry, temporal_graph)
Prometheus Metrics
| Metric | Type | Labels |
|---|---|---|
| asi_horizon_classified_total | Counter | horizon |
| asi_horizon_rebalanced_total | Counter | from_h, to_h |
| asi_horizon_drained_total | Counter | horizon |
| asi_horizon_bucket_size | Gauge | horizon |
| asi_horizon_rebalance_duration_seconds | Histogram | (none) |
Test Targets (12)
test_short_deadline_classified_short — delta ≤ 10 s → SHORT
test_medium_deadline_classified_medium — 10 s < delta ≤ 5 min → MEDIUM
test_long_deadline_classified_long — delta > 5 min → LONG
test_get_bucket_contains_classified_goal — goal_id in HorizonBucket.goal_ids
test_drain_returns_deadline_ordered — earliest deadline returned first
test_drain_removes_from_heap — drain(SHORT, 1) removes one entry
test_rebalance_promotes_medium_to_short — goal crosses boundary after time passes
test_rebalance_demotes_long_to_medium — goal deadline shrinks
test_max_goals_per_bucket_cap — overflow raises or drops gracefully
test_concurrent_classify_safe — 50 concurrent classify() calls no corruption
test_null_planner_returns_empty — NullHorizonPlanner all paths safe
test_prometheus_metrics_increment — counters increment on classify/drain
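The three classification targets and the promotion target hinge on the boundary comparisons, so they can be exercised without a real clock. The sketch below is one possible shape for those tests. It assumes a standalone `classify_delta` that mirrors `_classify_delta` with the default thresholds and a hypothetical `horizon_at` helper that takes `now_ns` as a parameter, whereas the planner above reads `time.time_ns()` directly and would need its clock patched.

```python
from enum import Enum


class PlanningHorizon(Enum):
    SHORT = "short"
    MEDIUM = "medium"
    LONG = "long"


def classify_delta(delta_ms: float, short_ms: int = 10_000,
                   medium_ms: int = 300_000) -> PlanningHorizon:
    """Standalone mirror of _classify_delta using the default thresholds."""
    if delta_ms <= short_ms:
        return PlanningHorizon.SHORT
    if delta_ms <= medium_ms:
        return PlanningHorizon.MEDIUM
    return PlanningHorizon.LONG


def horizon_at(deadline_ns: int, now_ns: int) -> PlanningHorizon:
    """Hypothetical helper: classify a deadline against an injected clock."""
    return classify_delta((deadline_ns - now_ns) / 1_000_000)


def test_boundaries_are_inclusive():
    assert classify_delta(10_000) is PlanningHorizon.SHORT
    assert classify_delta(10_001) is PlanningHorizon.MEDIUM
    assert classify_delta(300_000) is PlanningHorizon.MEDIUM
    assert classify_delta(300_001) is PlanningHorizon.LONG
    # Overdue goals have a negative delta and land in SHORT.
    assert classify_delta(-5) is PlanningHorizon.SHORT


def test_goal_is_promoted_as_time_passes():
    start_ns = 1_000_000_000_000
    deadline_ns = start_ns + 12_000_000_000  # 12 s after the start
    assert horizon_at(deadline_ns, start_ns) is PlanningHorizon.MEDIUM
    # Three seconds later only 9 s remain, which is within the SHORT window.
    assert horizon_at(deadline_ns, start_ns + 3_000_000_000) is PlanningHorizon.SHORT
```

Injecting the clock keeps the boundary tests deterministic; the concurrency and metrics targets still need the real planner.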
Integration with CognitiveCycle
# In CognitiveCycle._temporal_step()
async def _temporal_step(self) -> None:
    await self._temporal_orchestrator.run_cycle()
    # NEW in Phase 18.1: drain SHORT-horizon goals first
    urgent = await self._horizon_planner.drain(PlanningHorizon.SHORT, n=8)
    for gid in urgent:
        goal = await self._goal_registry.get(gid)
        await self._plan_executor.submit(goal)
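The step above only drains. Nothing shown so far calls rebalance(), although HorizonConfig carries a rebalance_interval_s. One way to wire the two together is a background task that rebalances at that interval until it is asked to stop. This is a sketch under that assumption; `rebalance_loop` and the stop event are hypothetical names, not part of the CognitiveCycle API.

```python
import asyncio


async def rebalance_loop(planner, interval_s: float, stop: asyncio.Event) -> None:
    """Call planner.rebalance() every interval_s seconds until stop is set."""
    while not stop.is_set():
        await planner.rebalance()
        try:
            # Sleep for the interval, but wake immediately on shutdown.
            await asyncio.wait_for(stop.wait(), timeout=interval_s)
        except asyncio.TimeoutError:
            pass  # interval elapsed without a stop request
```

A caller would start this with `asyncio.create_task(rebalance_loop(planner, config.rebalance_interval_s, stop))` and set the event during shutdown. Waiting on the event rather than calling `asyncio.sleep()` avoids delaying shutdown by up to one interval.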
Phase 18 Sub-Phase Tracker
| Sub-phase | Component | Status |
|---|---|---|
| 18.1 | HorizonPlanner | 🟡 Open (this issue) |
| 18.2 | MemoryConsolidator | ⏳ Planned |
| 18.3 | DistributedTemporalSync | ⏳ Planned |
| 18.4 | CausalMemoryIndex | ⏳ Planned |
| 18.5 | TemporalCoherenceArbiter | ⏳ Planned |
Labels: enhancement, phase-18, temporal, planning