Phase 8.2 — CausalGraph: Temporal Cause-Effect Relationship Mapping
Phase: 8 — Introspection & Explainability
Sub-phase: 8.2
Depends on: #276 (Phase 8.1 DecisionTracer)
Labels: enhancement, phase-8, architecture
Overview
CausalGraph is a directed acyclic graph (DAG) structure that records temporal cause-effect relationships between DecisionTrace events produced by the DecisionTracer (Phase 8.1). It enables post-hoc reasoning about which past decisions influenced later outcomes and surfaces the graph for the ExplainAPI (Phase 8.3).
DecisionTracer ──produces──► DecisionTrace
│
CausalEdge (cause → effect, weight, lag_ms)
│
CausalGraph (DAG, topological order, cycle guard)
│
CausalGraphSnapshot ──► ExplainAPI (Phase 8.3)
New Types
CausalEdge (frozen dataclass)
@dataclass(frozen=True)
class CausalEdge:
cause_trace_id: str # DecisionTrace.trace_id of the cause
effect_trace_id: str # DecisionTrace.trace_id of the effect
weight: float # correlation strength in [0.0, 1.0]
lag_ms: float # temporal lag (effect_ts - cause_ts) in ms
edge_type: str = "temporal" # "temporal" | "saliency" | "attention" | "shapley"
CausalNode (frozen dataclass)
@dataclass(frozen=True)
class CausalNode:
trace_id: str
module_id: str
phase: str # CyclePhase value at decision time
timestamp_ms: float # epoch milliseconds
summary: str # short label for visualisation
CausalGraphSnapshot (dataclass)
@dataclass
class CausalGraphSnapshot:
nodes: list[CausalNode]
edges: list[CausalEdge]
captured_at: float # time.time()
max_depth: int # longest path length from any root
root_ids: list[str] # trace_ids with in-degree == 0
leaf_ids: list[str] # trace_ids with out-degree == 0
CausalGraph (main class)
class CausalGraph:
"""Thread-safe DAG of causal relationships between DecisionTrace events."""
def __init__(
self,
max_nodes: int = 1024,
eviction: Literal["lru", "fifo"] = "lru",
) -> None: ...
# Mutation
def add_node(self, node: CausalNode) -> None: ...
def add_edge(self, edge: CausalEdge) -> None: ... # raises CycleDetectedError
# Query
def ancestors(self, trace_id: str, depth: int = 3) -> list[CausalNode]: ...
def descendants(self, trace_id: str, depth: int = 3) -> list[CausalNode]: ...
def critical_path(self) -> list[CausalNode]: ... # longest path in DAG
def topological_sort(self) -> list[CausalNode]: ...
# Serialisation
def snapshot(self) -> CausalGraphSnapshot: ...
def to_dot(self) -> str: ... # Graphviz DOT language output
def to_json(self) -> str: ... # JSON adjacency list
CycleDetectedError(ValueError) — raised by add_edge() when adding an edge would create a cycle.
CausalGraphBuilder — automatic edge inference from DecisionTraces
class CausalGraphBuilder:
"""Watches a TraceStorage and infers CausalEdge entries automatically."""
def __init__(
self,
graph: CausalGraph,
storage: TraceStorage, # from Phase 8.1
window_ms: float = 5000.0, # correlation window
min_weight: float = 0.1, # discard weak edges
edge_strategy: Literal["saliency", "uniform", "attention"] = "saliency",
) -> None: ...
async def infer_edges(self, new_trace: DecisionTrace) -> int:
"""Scan recent traces in storage; add edges where contributor overlap
exceeds min_weight. Returns number of edges added."""
...
def build_node(self, trace: DecisionTrace) -> CausalNode: ...
build_causal_graph() factory
def build_causal_graph(
storage: TraceStorage,
*,
max_nodes: int = 1024,
eviction: Literal["lru", "fifo"] = "lru",
window_ms: float = 5000.0,
min_weight: float = 0.1,
edge_strategy: Literal["saliency", "uniform", "attention"] = "saliency",
) -> tuple[CausalGraph, CausalGraphBuilder]:
"""Return a (graph, builder) pair ready for integration."""
...
Cycle Detection Algorithm
CausalGraph uses DFS-based cycle detection on every add_edge call:
- Maintain
in_degree dict (updated on every add_edge call).
- Before committing a new edge
(u → v), run DFS from v to check if u is reachable from v. If yes → raise CycleDetectedError.
- Time complexity: O(V + E) per insertion (bounded by
max_nodes).
Node Eviction Strategy
When len(nodes) >= max_nodes:
| Policy |
Eviction target |
Use case |
lru |
Least-recently-accessed node and its incident edges |
Default; preserves recent causal chains |
fifo |
Oldest-inserted node by timestamp_ms |
Deterministic in tests |
Integration with DecisionTracer (Phase 8.1)
# In CognitiveCycle._run_phase8():
tracer: DecisionTracer = self._phase8_tracer
graph: CausalGraph = self._phase8_graph
builder: CausalGraphBuilder = self._phase8_builder
trace = await tracer.trace(context, strategy="saliency")
node = builder.build_node(trace)
graph.add_node(node)
edges_added = await builder.infer_edges(trace)
Prometheus Metrics
Pre-initialise all counters/gauges to 0 at module import to avoid "no data" gaps:
| Metric |
Type |
Labels |
Description |
asi_causal_nodes_total |
Gauge |
— |
Current live node count |
asi_causal_edges_total |
Gauge |
edge_type |
Current live edge count by type |
asi_causal_cycle_rejected_total |
Counter |
— |
Edges rejected due to cycle detection |
asi_causal_eviction_total |
Counter |
policy |
Nodes evicted (lru/fifo) |
asi_causal_infer_duration_seconds |
Histogram |
— |
infer_edges() wall-clock time |
Test Targets (12)
| # |
Test |
Covers |
| 1 |
test_add_node_deduplicates |
Same trace_id twice → still 1 node |
| 2 |
test_add_edge_cycle_raises |
A→B→C→A raises CycleDetectedError |
| 3 |
test_topological_sort_order |
DAG of 5 nodes → correct topo order |
| 4 |
test_ancestors_depth |
ancestors(leaf, depth=2) returns ≤2 hops |
| 5 |
test_descendants_depth |
descendants(root, depth=2) returns ≤2 hops |
| 6 |
test_critical_path_linear |
Linear DAG → path = all nodes |
| 7 |
test_lru_eviction |
max_nodes=3; access node A, add 2 more; node B (LRU) evicted |
| 8 |
test_fifo_eviction |
max_nodes=3; oldest inserted node evicted |
| 9 |
test_to_dot_output |
to_dot() contains digraph, node labels, edge arrows |
| 10 |
test_infer_edges_window |
Traces outside window_ms → no edge added |
| 11 |
test_infer_edges_min_weight |
Weak contributors below min_weight → no edge |
| 12 |
test_snapshot_fields |
snapshot() root_ids/leaf_ids correct for diamond DAG |
Module Placement
asi/
└── phase8/
├── decision_tracer.py # Phase 8.1 (existing)
└── causal_graph.py # Phase 8.2 (NEW)
Implementation Order
CausalEdge + CausalNode frozen dataclasses
CycleDetectedError
- Internal
_AdjList helper (dict[str, set[str]] for forward + reverse)
CausalGraph.__init__() — adjacency lists, in_degree, access-order tracking
CausalGraph.add_node() + eviction logic
CausalGraph.add_edge() + DFS cycle check
CausalGraph.ancestors() / descendants() (BFS up to depth)
CausalGraph.critical_path() (DP on DAG)
CausalGraph.topological_sort() (Kahn's algorithm)
CausalGraph.to_dot() / to_json() / snapshot()
CausalGraphBuilder + infer_edges()
build_causal_graph() factory
- Prometheus pre-init block
- Unit tests (12 targets above)
Phase 8.2 — CausalGraph: Temporal Cause-Effect Relationship Mapping
Phase: 8 — Introspection & Explainability
Sub-phase: 8.2
Depends on: #276 (Phase 8.1 DecisionTracer)
Labels:
enhancement,phase-8,architectureOverview
CausalGraphis a directed acyclic graph (DAG) structure that records temporal cause-effect relationships betweenDecisionTraceevents produced by theDecisionTracer(Phase 8.1). It enables post-hoc reasoning about which past decisions influenced later outcomes and surfaces the graph for theExplainAPI(Phase 8.3).New Types
CausalEdge(frozen dataclass)CausalNode(frozen dataclass)CausalGraphSnapshot(dataclass)CausalGraph(main class)CycleDetectedError(ValueError)— raised byadd_edge()when adding an edge would create a cycle.CausalGraphBuilder— automatic edge inference from DecisionTracesbuild_causal_graph()factoryCycle Detection Algorithm
CausalGraphuses DFS-based cycle detection on everyadd_edgecall:in_degreedict (updated on everyadd_edgecall).(u → v), run DFS fromvto check ifuis reachable fromv. If yes → raiseCycleDetectedError.max_nodes).Node Eviction Strategy
When
len(nodes) >= max_nodes:lrufifotimestamp_msIntegration with
DecisionTracer(Phase 8.1)Prometheus Metrics
Pre-initialise all counters/gauges to 0 at module import to avoid "no data" gaps:
asi_causal_nodes_totalasi_causal_edges_totaledge_typeasi_causal_cycle_rejected_totalasi_causal_eviction_totalpolicyasi_causal_infer_duration_secondsinfer_edges()wall-clock timeTest Targets (12)
test_add_node_deduplicatestrace_idtwice → still 1 nodetest_add_edge_cycle_raisesCycleDetectedErrortest_topological_sort_ordertest_ancestors_depthancestors(leaf, depth=2)returns ≤2 hopstest_descendants_depthdescendants(root, depth=2)returns ≤2 hopstest_critical_path_lineartest_lru_evictionmax_nodes=3; access node A, add 2 more; node B (LRU) evictedtest_fifo_evictionmax_nodes=3; oldest inserted node evictedtest_to_dot_outputto_dot()containsdigraph, node labels, edge arrowstest_infer_edges_windowwindow_ms→ no edge addedtest_infer_edges_min_weightmin_weight→ no edgetest_snapshot_fieldssnapshot()root_ids/leaf_ids correct for diamond DAGModule Placement
Implementation Order
CausalEdge+CausalNodefrozen dataclassesCycleDetectedError_AdjListhelper (dict[str, set[str]]for forward + reverse)CausalGraph.__init__()— adjacency lists,in_degree, access-order trackingCausalGraph.add_node()+ eviction logicCausalGraph.add_edge()+ DFS cycle checkCausalGraph.ancestors()/descendants()(BFS up todepth)CausalGraph.critical_path()(DP on DAG)CausalGraph.topological_sort()(Kahn's algorithm)CausalGraph.to_dot()/to_json()/snapshot()CausalGraphBuilder+infer_edges()build_causal_graph()factory