Skip to content

Phase 17.2 — EventSequencer: streaming causal time-ordered event processor #437

@web3guru888

Description

@web3guru888

Phase 17.2 — EventSequencer 🕐

Phase: 17 — Temporal Reasoning & Predictive Cognition
Component: EventSequencer
Depends on: Phase 17.1 TemporalGraph (#434), Phase 13 WorldModel, Phase 16 ReflectionCycle (#430)
Labels: enhancement, phase-17, temporal-reasoning


Motivation

TemporalGraph stores Allen-relation edges between cognitive events, but something must feed it with a time-ordered, causally-validated stream. Raw events emitted by CognitiveCycle modules arrive concurrently and potentially out-of-order. EventSequencer is the gatekeeper: it buffers, reorders, validates causal chains, and windows events before handing them off to TemporalGraph.add_node().

Pipeline position:

CognitiveCycle modules
       │  (raw CognitiveEvents, unordered)
       ▼
 AsyncEventSequencer
   ├── heapq buffer (timestamp_ns ordering)
   ├── causal parent validation
   ├── sliding/tumbling window accumulator
   └── drain() → TemporalGraph.add_node()
       │
       ▼
  TemporalGraph
       │
       ▼
  PredictiveEngine (Phase 17.3)

Data Structures

OrderPolicy enum

from enum import Enum, auto

class OrderPolicy(Enum):
    STRICT      = auto()  # reject any out-of-order event (causal_check=True required)
    RELAXED     = auto()  # reorder up to max_buffer depth before emitting
    BEST_EFFORT = auto()  # emit as-arrived, reorder opportunistically

Frozen dataclasses

from __future__ import annotations
from dataclasses import dataclass, field
from typing import Any, Mapping

@dataclass(frozen=True)
class CognitiveEvent:
    event_id:         str
    source_module:    str
    event_type:       str
    payload:          Mapping[str, Any]
    timestamp_ns:     int                   # monotonic nanoseconds (no float rounding)
    causal_parent_id: str | None = None     # event_id of causal predecessor

    def __lt__(self, other: "CognitiveEvent") -> bool:
        return self.timestamp_ns < other.timestamp_ns


@dataclass(frozen=True)
class WindowedAggregate:
    window_id:   str                  # f"{start_ns}_{end_ns}"
    start_ns:    int
    end_ns:      int
    event_count: int
    event_types: frozenset[str]
    summary:     Mapping[str, Any]    # module → count, dominant event_type, etc.


@dataclass(frozen=True)
class SequencerConfig:
    window_size_ms:    int  = 500      # tumbling window width in milliseconds
    max_buffer:        int  = 10_000   # maximum heapq depth before eviction
    causal_check:      bool = True     # validate causal_parent_id chain
    drop_out_of_order: bool = False    # False→reorder, True→drop
    order_policy:      OrderPolicy = field(default=OrderPolicy.RELAXED)

Protocol

from typing import AsyncIterator, Protocol, runtime_checkable

@runtime_checkable
class EventSequencer(Protocol):
    async def ingest(self, event: CognitiveEvent) -> bool:
        """Buffer event; returns True if accepted, False if dropped."""
        ...

    async def drain(self) -> AsyncIterator[CognitiveEvent]:
        """Yield events in timestamp_ns order until buffer empty."""
        ...

    def get_window(self, window_id: str) -> WindowedAggregate | None:
        """Return accumulated window by ID, or None if not found."""
        ...

    def flush_windows(self) -> list[WindowedAggregate]:
        """Flush and return all completed windows."""
        ...

    def stats(self) -> dict[str, int | float]:
        """Return buffer_size, dropped, causal_violations, windows_flushed."""
        ...

Implementation: AsyncEventSequencer

import asyncio
import heapq
import time
from collections import defaultdict
from typing import AsyncIterator

class AsyncEventSequencer:
    def __init__(self, config: SequencerConfig) -> None:
        self._cfg = config
        self._heap: list[CognitiveEvent] = []          # min-heap by timestamp_ns
        self._lock = asyncio.Lock()
        self._seen_ids: set[str] = set()               # for causal parent validation
        self._windows: dict[str, dict] = {}            # window_id → accumulator
        self._window_size_ns = config.window_size_ms * 1_000_000
        # counters
        self._ingested  = 0
        self._dropped   = 0
        self._causal_viol = 0
        self._windows_flushed = 0

    async def ingest(self, event: CognitiveEvent) -> bool:
        async with self._lock:
            # causal check
            if self._cfg.causal_check and event.causal_parent_id is not None:
                if event.causal_parent_id not in self._seen_ids:
                    self._causal_viol += 1
                    if self._cfg.order_policy == OrderPolicy.STRICT:
                        self._dropped += 1
                        return False

            # buffer overflow → LRU eviction (drop oldest)
            if len(self._heap) >= self._cfg.max_buffer:
                heapq.heappop(self._heap)   # evict smallest timestamp
                self._dropped += 1

            heapq.heappush(self._heap, event)
            self._seen_ids.add(event.event_id)
            self._ingested += 1
            self._accumulate_window(event)
            return True

    def _accumulate_window(self, event: CognitiveEvent) -> None:
        """Assign event to its tumbling window bucket."""
        bucket = (event.timestamp_ns // self._window_size_ns) * self._window_size_ns
        wid = f"{bucket}_{bucket + self._window_size_ns}"
        if wid not in self._windows:
            self._windows[wid] = {
                "start_ns": bucket,
                "end_ns": bucket + self._window_size_ns,
                "event_count": 0,
                "event_types": set(),
                "module_counts": defaultdict(int),
            }
        w = self._windows[wid]
        w["event_count"] += 1
        w["event_types"].add(event.event_type)
        w["module_counts"][event.source_module] += 1

    async def drain(self) -> AsyncIterator[CognitiveEvent]:
        async with self._lock:
            while self._heap:
                yield heapq.heappop(self._heap)

    def get_window(self, window_id: str) -> WindowedAggregate | None:
        w = self._windows.get(window_id)
        if w is None:
            return None
        return WindowedAggregate(
            window_id=window_id,
            start_ns=w["start_ns"],
            end_ns=w["end_ns"],
            event_count=w["event_count"],
            event_types=frozenset(w["event_types"]),
            summary=dict(w["module_counts"]),
        )

    def flush_windows(self) -> list[WindowedAggregate]:
        now_ns = time.monotonic_ns()
        result = []
        to_delete = []
        for wid, w in self._windows.items():
            if w["end_ns"] <= now_ns:
                result.append(self.get_window(wid))
                to_delete.append(wid)
        for wid in to_delete:
            del self._windows[wid]
        self._windows_flushed += len(result)
        return result

    def stats(self) -> dict[str, int | float]:
        return {
            "ingested":        self._ingested,
            "dropped":         self._dropped,
            "causal_violations": self._causal_viol,
            "windows_flushed": self._windows_flushed,
            "buffer_size":     len(self._heap),
            "buffer_utilization": len(self._heap) / self._cfg.max_buffer,
        }

NullSequencer (no-op)

class NullSequencer:
    """No-op sequencer for testing / dependency injection."""

    async def ingest(self, event: CognitiveEvent) -> bool:
        return True

    async def drain(self) -> AsyncIterator[CognitiveEvent]:
        return
        yield  # make it an async generator

    def get_window(self, window_id: str) -> WindowedAggregate | None:
        return None

    def flush_windows(self) -> list[WindowedAggregate]:
        return []

    def stats(self) -> dict[str, int | float]:
        return {}

Prometheus Metrics

Metric Type Labels Description
asi_sequencer_events_ingested_total Counter source_module, event_type Events accepted into heapq buffer
asi_sequencer_events_dropped_total Counter reason (overflow/causal/strict) Events dropped for any reason
asi_sequencer_windows_flushed_total Counter Completed tumbling windows flushed
asi_sequencer_causal_violations_total Counter source_module Causal parent not found in seen set
asi_sequencer_buffer_utilization_ratio Gauge len(heap) / max_buffer

Test Targets (12)

  1. test_ingest_returns_true_on_success
  2. test_ingest_returns_false_on_strict_causal_violation
  3. test_ingest_drops_oldest_on_overflow
  4. test_drain_yields_timestamp_ordered
  5. test_drain_empty_buffer_yields_nothing
  6. test_causal_violation_increments_counter
  7. test_causal_parent_accepted_when_parent_seen_first
  8. test_window_accumulates_events_in_bucket
  9. test_flush_windows_returns_completed_only
  10. test_null_sequencer_always_accepts
  11. test_stats_reflects_all_counters
  12. test_order_policy_best_effort_never_drops

Integration: CognitiveCycle._run_step() hook

# In CognitiveCycle._run_step():
event = CognitiveEvent(
    event_id=str(uuid4()),
    source_module=module.name,
    event_type="module_output",
    payload=result,
    timestamp_ns=time.monotonic_ns(),
    causal_parent_id=prev_event_id,
)
accepted = await self._sequencer.ingest(event)
if accepted:
    async for ordered_event in self._sequencer.drain():
        await self._temporal_graph.add_node(ordered_event.event_id, ordered_event.timestamp_ns)

Phase 17 Sub-phase Tracker

Sub-phase Component Issue Status
17.1 TemporalGraph #434 🟡 In Progress
17.2 EventSequencer #437 🟡 In Progress
17.3 PredictiveEngine 🔲 Pending
17.4 TemporalCalibrator 🔲 Pending
17.5 TemporalOrchestrator 🔲 Pending

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions