Overview
TemporalGraph is the foundational data structure for Phase 17 — Temporal Reasoning & Predictive Cognition. It maintains a directed acyclic graph (DAG) of time-stamped cognitive events and world-state snapshots, annotated with Allen interval relations to capture the qualitative temporal relationships between events.
Downstream components (EventSequencer, PredictiveEngine, SchedulerCortex) all query the TemporalGraph to obtain causal and temporal context.
Enum — AllenRelation
from enum import Enum, auto
class AllenRelation(Enum):
# 7 base relations (+ 6 inverses)
BEFORE = auto() # A ends before B starts
MEETS = auto() # A ends exactly when B starts
OVERLAPS = auto() # A starts before B, ends inside B
STARTS = auto() # A and B start together; A ends first
DURING = auto() # A is fully inside B
FINISHES = auto() # A and B end together; B starts first
EQUALS = auto() # A and B are identical intervals
# Inverses
AFTER = auto()
MET_BY = auto()
OVERLAPPED_BY = auto()
STARTED_BY = auto()
CONTAINS = auto()
FINISHED_BY = auto()
Frozen Dataclasses
from __future__ import annotations
from dataclasses import dataclass, field
from typing import FrozenSet, Any
@dataclass(frozen=True)
class TemporalNode:
node_id: str # UUID or stable hash
timestamp_ns: int # time.time_ns() at event creation
state_snapshot: dict[str, Any] # shallow copy of relevant state
tags: FrozenSet[str] = field(default_factory=frozenset)
@dataclass(frozen=True)
class TemporalEdge:
from_id: str
to_id: str
relation: AllenRelation
duration_ns: int # |to.timestamp_ns - from.timestamp_ns|
@dataclass(frozen=True)
class TemporalGraphConfig:
max_nodes: int = 10_000
consistency_check: bool = True # enforce DAG on add_edge
prune_horizon_s: float = 3600.0 # prune nodes older than this
Protocol — TemporalGraph
from typing import Protocol, runtime_checkable, Sequence
@runtime_checkable
class TemporalGraph(Protocol):
async def add_node(self, node: TemporalNode) -> None: ...
async def add_edge(self, edge: TemporalEdge) -> None: ...
async def get_successors(self, node_id: str) -> Sequence[TemporalNode]: ...
async def get_predecessors(self, node_id: str) -> Sequence[TemporalNode]: ...
async def check_consistency(self) -> bool: ...
async def prune(self, horizon_ns: int) -> int: ...
def stats(self) -> dict[str, int]: ...
Implementation — DictTemporalGraph
import asyncio
import time
from collections import defaultdict, deque
from typing import Sequence
class DictTemporalGraph:
def __init__(self, config: TemporalGraphConfig = TemporalGraphConfig()) -> None:
self._config = config
self._lock = asyncio.Lock()
self._nodes: dict[str, TemporalNode] = {}
self._edges: list[TemporalEdge] = []
self._successors: dict[str, list[str]] = defaultdict(list)
self._predecessors: dict[str, list[str]] = defaultdict(list)
self._insertion_order: deque[str] = deque()
self._nodes_added = 0
self._edges_added = 0
self._cycle_rejections = 0
self._prune_count = 0
async def add_node(self, node: TemporalNode) -> None:
async with self._lock:
if node.node_id in self._nodes:
return
if len(self._nodes) >= self._config.max_nodes:
oldest = self._insertion_order.popleft()
self._nodes.pop(oldest, None)
self._nodes[node.node_id] = node
self._insertion_order.append(node.node_id)
self._nodes_added += 1
async def add_edge(self, edge: TemporalEdge) -> None:
async with self._lock:
if edge.from_id not in self._nodes or edge.to_id not in self._nodes:
raise KeyError(f"Unknown node(s): {edge.from_id!r}, {edge.to_id!r}")
if self._config.consistency_check:
self._successors[edge.from_id].append(edge.to_id)
if self._has_cycle(edge.to_id):
self._successors[edge.from_id].remove(edge.to_id)
self._cycle_rejections += 1
raise ValueError(f"Edge would create a cycle")
self._successors[edge.from_id].append(edge.to_id)
self._predecessors[edge.to_id].append(edge.from_id)
self._edges.append(edge)
self._edges_added += 1
def _has_cycle(self, start: str) -> bool:
visited: set[str] = set()
in_stack: set[str] = set()
def dfs(node: str) -> bool:
visited.add(node)
in_stack.add(node)
for nxt in self._successors.get(node, []):
if nxt not in visited:
if dfs(nxt): return True
elif nxt in in_stack: return True
in_stack.discard(node)
return False
return dfs(start)
async def get_successors(self, node_id: str) -> Sequence[TemporalNode]:
async with self._lock:
return [self._nodes[nid] for nid in self._successors.get(node_id, []) if nid in self._nodes]
async def get_predecessors(self, node_id: str) -> Sequence[TemporalNode]:
async with self._lock:
return [self._nodes[nid] for nid in self._predecessors.get(node_id, []) if nid in self._nodes]
async def check_consistency(self) -> bool:
async with self._lock:
visited: set[str] = set()
in_stack: set[str] = set()
def dfs(node: str) -> bool:
visited.add(node); in_stack.add(node)
for nxt in self._successors.get(node, []):
if nxt not in visited:
if dfs(nxt): return True
elif nxt in in_stack: return True
in_stack.discard(node); return False
for nid in list(self._nodes):
if nid not in visited:
if dfs(nid): return False
return True
async def prune(self, horizon_ns: int) -> int:
async with self._lock:
stale = {nid for nid, n in self._nodes.items() if n.timestamp_ns < horizon_ns}
for nid in stale:
self._nodes.pop(nid)
self._successors.pop(nid, None)
self._predecessors.pop(nid, None)
self._insertion_order = deque(x for x in self._insertion_order if x not in stale)
self._edges = [e for e in self._edges if e.from_id not in stale and e.to_id not in stale]
self._prune_count += len(stale)
return len(stale)
def stats(self) -> dict[str, int]:
return {
"nodes": len(self._nodes),
"edges": len(self._edges),
"nodes_added_total": self._nodes_added,
"edges_added_total": self._edges_added,
"cycle_rejections": self._cycle_rejections,
"pruned_total": self._prune_count,
}
NullTemporalGraph — no-op sentinel
class NullTemporalGraph:
async def add_node(self, node: TemporalNode) -> None: pass
async def add_edge(self, edge: TemporalEdge) -> None: pass
async def get_successors(self, node_id: str) -> Sequence[TemporalNode]: return []
async def get_predecessors(self, node_id: str) -> Sequence[TemporalNode]: return []
async def check_consistency(self) -> bool: return True
async def prune(self, horizon_ns: int) -> int: return 0
def stats(self) -> dict[str, int]: return {}
Prometheus Metrics
| Metric |
Type |
Labels |
Description |
asi_temporal_graph_nodes_total |
Counter |
— |
Cumulative nodes added |
asi_temporal_graph_edges_total |
Counter |
— |
Cumulative edges added |
asi_temporal_graph_cycle_rejections_total |
Counter |
— |
Edges rejected due to cycle |
asi_temporal_graph_size |
Gauge |
— |
Current live node count |
asi_temporal_graph_prune_total |
Counter |
— |
Total nodes pruned |
12 Test Targets
test_add_node_increments_counter
test_duplicate_node_ignored
test_max_nodes_evicts_oldest
test_add_edge_unknown_node_raises
test_add_edge_creates_allen_relation
test_cycle_detection_raises
test_get_successors_returns_correct_nodes
test_get_predecessors_returns_correct_nodes
test_check_consistency_clean_dag
test_prune_removes_stale_nodes
test_prune_removes_dangling_edges
test_null_temporal_graph_no_ops
Phase 17 sub-phase tracker
Overview
TemporalGraphis the foundational data structure for Phase 17 — Temporal Reasoning & Predictive Cognition. It maintains a directed acyclic graph (DAG) of time-stamped cognitive events and world-state snapshots, annotated with Allen interval relations to capture the qualitative temporal relationships between events.Downstream components (
EventSequencer,PredictiveEngine,SchedulerCortex) all query theTemporalGraphto obtain causal and temporal context.Enum —
AllenRelationFrozen Dataclasses
Protocol —
TemporalGraphImplementation —
DictTemporalGraphNullTemporalGraph— no-op sentinelPrometheus Metrics
asi_temporal_graph_nodes_totalasi_temporal_graph_edges_totalasi_temporal_graph_cycle_rejections_totalasi_temporal_graph_sizeasi_temporal_graph_prune_total12 Test Targets
test_add_node_increments_countertest_duplicate_node_ignoredtest_max_nodes_evicts_oldesttest_add_edge_unknown_node_raisestest_add_edge_creates_allen_relationtest_cycle_detection_raisestest_get_successors_returns_correct_nodestest_get_predecessors_returns_correct_nodestest_check_consistency_clean_dagtest_prune_removes_stale_nodestest_prune_removes_dangling_edgestest_null_temporal_graph_no_opsPhase 17 sub-phase tracker