Phase 17.5 - TemporalOrchestrator: Unified Temporal Control Plane (PHASE 17 COMPLETE 🎉) #446

@web3guru888

Phase 17.5 - TemporalOrchestrator

Phase: 17 - Temporal Reasoning & Predictive Cognition
Component: TemporalOrchestrator - unified control plane integrating all Phase 17 sub-components
Depends on: #434 (TemporalGraph), #437 (EventSequencer), #440 (PredictiveEngine), #443 (SchedulerCortex)
Completes Phase 17 🎉


1. Overview

TemporalOrchestrator is the Phase 17 capstone: a single entry point that composes EventSequencer, TemporalGraph, PredictiveEngine, and SchedulerCortex into a unified orchestration loop that drives temporal reasoning within the ASI-Build cognitive cycle.

CognitiveCycle
    │
    ▼
TemporalOrchestrator ──────────────────────────────────────┐
    │                                                      │
    ├─► EventSequencer (ingest events)                     │
    │        │                                             │
    │        ▼                                             │
    ├─► TemporalGraph (store time-ordered states)          │
    │        │                                             │
    │        ▼                                             │
    ├─► PredictiveEngine (train on windows, predict)       │
    │        │                                             │
    │        ▼                                             │
    └─► SchedulerCortex (schedule tasks from predictions) ─┘
              │
              ▼
       CognitiveCycle executor (run_tick dispatch)

2. OrchestratorPhase Enum (7-state lifecycle)

from enum import Enum, auto

class OrchestratorPhase(Enum):
    IDLE       = auto()   # waiting for next cycle trigger
    INGESTING  = auto()   # draining EventSequencer queue
    GRAPHING   = auto()   # updating TemporalGraph with new nodes/edges
    PREDICTING = auto()   # PredictiveEngine.predict() for active modules
    SCHEDULING = auto()   # SchedulerCortex.run_tick() + new task enqueue
    TICKING    = auto()   # executing dispatched tasks via CognitiveCycle
    SNAPSHOT   = auto()   # capturing OrchestratorSnapshot (periodic)

Mirrors the CycleState pattern from Phase 16 ReflectionCycle (7 clean states, no sub-states).
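
The lifecycle can be read as a fixed linear order that wraps from SNAPSHOT back to IDLE, which is the order run_cycle() walks through. The sketch below only illustrates that reading; PHASE_ORDER and next_phase are hypothetical helpers and are not part of the spec.

```python
from enum import Enum, auto


class OrchestratorPhase(Enum):
    IDLE = auto()
    INGESTING = auto()
    GRAPHING = auto()
    PREDICTING = auto()
    SCHEDULING = auto()
    TICKING = auto()
    SNAPSHOT = auto()


# Hypothetical helper: iterating an Enum yields members in definition order.
PHASE_ORDER = list(OrchestratorPhase)


def next_phase(phase: OrchestratorPhase) -> OrchestratorPhase:
    """Return the phase that follows `phase`, wrapping SNAPSHOT back to IDLE."""
    idx = PHASE_ORDER.index(phase)
    return PHASE_ORDER[(idx + 1) % len(PHASE_ORDER)]
```

A helper like this could back an assertion in tests that the orchestrator never skips or reorders a phase.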


3. Frozen Dataclasses

from __future__ import annotations
from dataclasses import dataclass

@dataclass(frozen=True)
class OrchestratorSnapshot:
    snapshot_id:       str
    timestamp_ns:      int
    graph_nodes:       int
    pending_events:    int
    predictions_made:  int
    scheduled_tasks:   int
    cycle_latency_ms:  float


@dataclass(frozen=True)
class TemporalConfig:
    sequencer_config:       SequencerConfig
    predictor_config:       PredictorConfig
    scheduler_config:       SchedulerConfig
    snapshot_interval_s:    float = 10.0
    enable_temporal_graph:  bool  = True

TemporalConfig is the single nested-config entry point: callers construct one object and pass it to make_temporal_orchestrator().
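
Because the config is frozen, callers cannot adjust a field after construction; a variant has to be derived as a new object. The sketch below shows one way to do that with dataclasses.replace(). The three empty sub-config classes are stand-ins assumed for illustration; their real fields are specified in #437, #440 and #443.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


# Stand-in sub-configs (assumption: the real classes carry their own fields).
@dataclass(frozen=True)
class SequencerConfig:
    pass


@dataclass(frozen=True)
class PredictorConfig:
    pass


@dataclass(frozen=True)
class SchedulerConfig:
    pass


@dataclass(frozen=True)
class TemporalConfig:
    sequencer_config: SequencerConfig
    predictor_config: PredictorConfig
    scheduler_config: SchedulerConfig
    snapshot_interval_s: float = 10.0
    enable_temporal_graph: bool = True


base = TemporalConfig(SequencerConfig(), PredictorConfig(), SchedulerConfig())

# Frozen instances reject attribute assignment.
try:
    base.snapshot_interval_s = 1.0  # type: ignore[misc]
    mutated = True
except FrozenInstanceError:
    mutated = False

# Variants are derived as new objects instead; `base` is left untouched.
no_graph = replace(base, enable_temporal_graph=False, snapshot_interval_s=1.0)
```

The same pattern suits tests that need a short snapshot interval without touching the defaults.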


4. TemporalOrchestrator Protocol

from typing import Protocol, runtime_checkable

@runtime_checkable
class TemporalOrchestrator(Protocol):
    async def ingest_event(self, event: CognitiveEvent) -> bool: ...
    async def run_cycle(self) -> OrchestratorSnapshot: ...
    def get_predictions(self, module: str) -> list[Prediction]: ...
    def get_schedule(self) -> list[ScheduleSlot]: ...
    async def stop(self) -> None: ...
    def stats(self) -> dict[str, int | float]: ...

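@runtime_checkable allows isinstance(obj, TemporalOrchestrator) in integration tests without importing the concrete class. The check is shallow: it verifies only that the six method names exist, not their signatures or whether they are coroutines.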
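
To make the scope of that isinstance() check concrete, the sketch below trims the protocol to two methods (an assumption for brevity; the spec defines six) and compares three hypothetical classes. A class with the right names but the wrong signatures still passes, so test 11 is best treated as a smoke test rather than a conformance check.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class TemporalOrchestrator(Protocol):
    # Trimmed to two methods for brevity.
    async def run_cycle(self) -> object: ...
    async def stop(self) -> None: ...


class Conforming:
    async def run_cycle(self) -> object:
        return None

    async def stop(self) -> None:
        return None


class WrongShape:
    # Same method names, but synchronous and with an extra required argument.
    def run_cycle(self, extra: int) -> object:
        return None

    def stop(self) -> None:
        return None


class MissingStop:
    async def run_cycle(self) -> object:
        return None


results = {
    "conforming": isinstance(Conforming(), TemporalOrchestrator),
    "wrong_shape": isinstance(WrongShape(), TemporalOrchestrator),
    "missing_stop": isinstance(MissingStop(), TemporalOrchestrator),
}
```

Signature mismatches such as WrongShape are caught by a static type checker, not by the runtime check.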


5. AsyncTemporalOrchestrator Implementation

import asyncio, time, uuid
from collections import deque

class AsyncTemporalOrchestrator:
    """Composes all Phase 17 sub-components into a single temporal control plane."""

    def __init__(self, config: TemporalConfig) -> None:
        self._cfg      = config
        self._seq      = AsyncEventSequencer(config.sequencer_config)
        self._graph    = DictTemporalGraph(TemporalGraphConfig()) if config.enable_temporal_graph else NullTemporalGraph()
        self._pred     = AdaptivePredictiveEngine(config.predictor_config)
        self._sched    = AsyncSchedulerCortex(config.scheduler_config)
        self._phase    = OrchestratorPhase.IDLE
        self._stop_evt = asyncio.Event()
        self._snapshots: deque[OrchestratorSnapshot] = deque(maxlen=100)
        self._last_snapshot_ts: float = 0.0
        self._cycles_total: int = 0

    async def ingest_event(self, event: CognitiveEvent) -> bool:
        return await self._seq.ingest(event)

    async def run_cycle(self) -> OrchestratorSnapshot:
        t0 = time.monotonic_ns()
        self._phase = OrchestratorPhase.INGESTING

        # 1. Drain EventSequencer → flush windows
        windows = await self._seq.drain()

        # 2. GRAPHING: add sequence windows as TemporalGraph nodes
        self._phase = OrchestratorPhase.GRAPHING
        if self._cfg.enable_temporal_graph:
            for w in windows:
                node = TemporalNode(
                    node_id=w.window_id,
                    module_id=w.module_id,
                    timestamp_ns=w.start_ns,
                    metrics=w.metrics,
                )
                await self._graph.add_node(node)

        # 3. PREDICTING: train + predict per module
        self._phase = OrchestratorPhase.PREDICTING
        predictions_made = 0
        for w in windows:
            await self._pred.train(w.module_id, w)
            p = await self._pred.predict(w.module_id)
            if p is not None:
                predictions_made += 1
                # Convert prediction to ScheduledTask if confidence sufficient
                if p.confidence >= 0.8:
                    task = ScheduledTask(
                        task_id=f"pred-{p.prediction_id}",
                        module_id=p.module_id,
                        priority=TaskPriority.NORMAL,
                        deadline_ns=p.predicted_start_ns,
                        estimated_duration_ms=p.estimated_duration_ms,
                        payload={"prediction_basis": p.basis_window_ids},
                    )
                    await self._sched.enqueue(task)

        # 4. SCHEDULING: run scheduler tick, collect ready tasks
        self._phase = OrchestratorPhase.SCHEDULING
        ready = await self._sched.run_tick()

        # 5. TICKING: dispatch ready tasks to CognitiveCycle executor
        self._phase = OrchestratorPhase.TICKING
        for task in ready:
            await self._dispatch(task)

        # 6. SNAPSHOT: periodic health capture
        self._phase = OrchestratorPhase.SNAPSHOT
        now = time.monotonic()
        latency_ms = (time.monotonic_ns() - t0) / 1e6
        self._cycles_total += 1

        snap = OrchestratorSnapshot(
            snapshot_id=str(uuid.uuid4()),
            timestamp_ns=time.time_ns(),
            graph_nodes=await self._graph.node_count(),
            pending_events=self._seq.pending_count(),
            predictions_made=predictions_made,
            scheduled_tasks=len(ready),
            cycle_latency_ms=latency_ms,
        )
        if now - self._last_snapshot_ts >= self._cfg.snapshot_interval_s:
            self._snapshots.append(snap)
            self._last_snapshot_ts = now

        self._phase = OrchestratorPhase.IDLE
        return snap

    async def _dispatch(self, task: ScheduledTask) -> None:
        """Hook for CognitiveCycle executor: override in subclass or inject callback."""
        pass

    def get_predictions(self, module: str) -> list[Prediction]:
        return self._pred.get_history(module)

    def get_schedule(self) -> list[ScheduleSlot]:
        return self._sched.peek_schedule()

    async def stop(self) -> None:
        self._stop_evt.set()
        await self._seq.flush()
        await self._sched.stop()

    def stats(self) -> dict[str, int | float]:
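        return {
            "cycles_total": self._cycles_total,
            # .value (an int from auto()) keeps the declared int | float value type;
            # OrchestratorPhase(value).name recovers the label for display.
            "phase": self._phase.value,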
            "snapshots_stored": len(self._snapshots),
            "graph_nodes": self._graph.node_count_sync(),
        }
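
The _dispatch() docstring leaves the hand-off to the executor open. The sketch below illustrates the "inject callback" option together with a simple driver loop that calls run_cycle() until stop() is requested. MiniOrchestrator, DispatchFn and run_forever are hypothetical names, tasks are plain strings standing in for ScheduledTask, and the sub-components are omitted entirely, so this is an outline of the control flow and not the specified implementation.

```python
import asyncio
from typing import Awaitable, Callable, List

# Hypothetical callback type: receives one ready task (a str stand-in here).
DispatchFn = Callable[[str], Awaitable[None]]


class MiniOrchestrator:
    """Stand-in that keeps only the dispatch path of AsyncTemporalOrchestrator."""

    def __init__(self, ready_tasks: List[str], dispatch: DispatchFn) -> None:
        self._ready = list(ready_tasks)
        self._dispatch_fn = dispatch          # injected instead of subclassing
        self._stop_evt = asyncio.Event()
        self.cycles_total = 0

    async def run_cycle(self) -> int:
        # Take everything that is ready and hand each task to the callback.
        ready, self._ready = self._ready, []
        for task in ready:
            await self._dispatch_fn(task)
        self.cycles_total += 1
        return len(ready)

    async def stop(self) -> None:
        self._stop_evt.set()

    async def run_forever(self, interval_s: float) -> None:
        """Hypothetical driver: repeat run_cycle() until stop() is requested."""
        while not self._stop_evt.is_set():
            await self.run_cycle()
            try:
                # Wait one interval, waking early if stop() fires.
                await asyncio.wait_for(self._stop_evt.wait(), timeout=interval_s)
            except asyncio.TimeoutError:
                pass


async def demo() -> tuple:
    seen: List[str] = []

    async def record(task: str) -> None:
        seen.append(task)

    orch = MiniOrchestrator(["pred-a", "pred-b"], record)
    driver = asyncio.create_task(orch.run_forever(interval_s=0.01))
    await asyncio.sleep(0.05)
    await orch.stop()
    await driver
    return seen, orch.cycles_total


seen, cycles = asyncio.run(demo())
```

Waiting on the stop event with a timeout, instead of a bare asyncio.sleep(), lets the loop exit promptly when stop() is called mid-interval.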

6. NullTemporalOrchestrator

class NullTemporalOrchestrator:
    """No-op orchestrator for tests and feature-flag disabled paths."""

    async def ingest_event(self, event: CognitiveEvent) -> bool:
        return False

    async def run_cycle(self) -> OrchestratorSnapshot:
        return OrchestratorSnapshot(
            snapshot_id="null", timestamp_ns=0,
            graph_nodes=0, pending_events=0,
            predictions_made=0, scheduled_tasks=0,
            cycle_latency_ms=0.0,
        )

    def get_predictions(self, module: str) -> list[Prediction]:
        return []

    def get_schedule(self) -> list[ScheduleSlot]:
        return []

    async def stop(self) -> None:
        pass

    def stats(self) -> dict[str, int | float]:
        return {}

7. make_temporal_orchestrator() Factory

def make_temporal_orchestrator(
    config: TemporalConfig | None = None,
    *,
    null: bool = False,
) -> TemporalOrchestrator:
    if null:
        return NullTemporalOrchestrator()
    cfg = config or TemporalConfig(
        sequencer_config=SequencerConfig(),
        predictor_config=PredictorConfig(),
        scheduler_config=SchedulerConfig(),
    )
    return AsyncTemporalOrchestrator(cfg)
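
A typical call site might let a feature flag choose between the live and no-op paths. The sketch below mirrors the factory's branching with heavily reduced stand-in classes (assumptions: a one-field TemporalConfig and orchestrators that only implement run_cycle()); build() and its feature_enabled parameter are hypothetical.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class TemporalConfig:  # stand-in; the full field list is in section 3
    snapshot_interval_s: float = 10.0


class NullTemporalOrchestrator:
    async def run_cycle(self) -> str:
        return "null"


class AsyncTemporalOrchestrator:
    def __init__(self, config: TemporalConfig) -> None:
        self.config = config

    async def run_cycle(self) -> str:
        return "live"


def make_temporal_orchestrator(
    config: Optional[TemporalConfig] = None, *, null: bool = False
):
    # Same branching as the factory above: the null flag wins over any config.
    if null:
        return NullTemporalOrchestrator()
    return AsyncTemporalOrchestrator(config or TemporalConfig())


def build(feature_enabled: bool):
    """Hypothetical call site: a feature flag selects the live or no-op path."""
    return make_temporal_orchestrator(null=not feature_enabled)


live = build(True)
noop = build(False)
outputs = (asyncio.run(live.run_cycle()), asyncio.run(noop.run_cycle()))
```

Callers treat both results identically, which is the point of returning the protocol type from the factory.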

8. Prometheus Metrics

| Metric | Type | Labels | Description |
|---|---|---|---|
| asibuild_orchestration_cycles_total | Counter | result (ok/error) | Completed orchestration cycles |
| asibuild_cycle_latency_seconds | Histogram | phase | Per-cycle wall-clock latency |
| asibuild_predictions_acted_on_total | Counter | module_id | Predictions converted to ScheduledTasks |
| asibuild_temporal_graph_nodes_gauge | Gauge | none | Current TemporalGraph node count |
| asibuild_schedule_queue_depth_gauge | Gauge | none | SchedulerCortex current queue depth |

9. Test Targets (12)

| # | Target | Assertion |
|---|---|---|
| 1 | test_ingest_event_returns_true | await orch.ingest_event(event) returns True |
| 2 | test_run_cycle_full_pipeline | snapshot predictions_made >= 0, no exception |
| 3 | test_run_cycle_graph_disabled | enable_temporal_graph=False → no graph nodes |
| 4 | test_low_confidence_prediction_not_scheduled | confidence < 0.8 → 0 scheduled tasks |
| 5 | test_high_confidence_prediction_scheduled | confidence >= 0.8 → ≥1 scheduled tasks |
| 6 | test_stop_drains_queue | stop() → sequencer queue empty |
| 7 | test_stop_idempotent | double stop() → no exception |
| 8 | test_snapshot_captured_on_interval | snapshot recorded when ≥ snapshot_interval_s elapsed |
| 9 | test_null_orchestrator_run_cycle | NullTemporalOrchestrator().run_cycle() returns zero snapshot |
| 10 | test_factory_null_flag | make_temporal_orchestrator(null=True) → NullTemporalOrchestrator |
| 11 | test_isinstance_protocol | isinstance(orch, TemporalOrchestrator) → True |
| 12 | test_stats_keys_present | stats() contains cycles_total, phase, snapshots_stored |
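
Targets 4, 5 and 8 hinge on two small rules inside run_cycle(): the 0.8 confidence cut-off and the snapshot interval check. The sketch below lifts both into standalone helpers so the boundary cases can be exercised without the sub-components. should_schedule and SnapshotGate are hypothetical extractions for illustration (the spec keeps this logic inline), and snapshots are plain strings here.

```python
from collections import deque

CONFIDENCE_THRESHOLD = 0.8  # the literal used in run_cycle()


def should_schedule(confidence: float) -> bool:
    """Mirror of the `p.confidence >= 0.8` check."""
    return confidence >= CONFIDENCE_THRESHOLD


class SnapshotGate:
    """Hypothetical extraction of the interval check at the end of run_cycle()."""

    def __init__(self, interval_s: float, maxlen: int = 100) -> None:
        self.interval_s = interval_s
        self.last_ts = 0.0
        self.stored = deque(maxlen=maxlen)

    def offer(self, snap: str, now: float) -> bool:
        # Store the snapshot only if a full interval has elapsed since the last one.
        if now - self.last_ts >= self.interval_s:
            self.stored.append(snap)
            self.last_ts = now
            return True
        return False


def test_low_confidence_prediction_not_scheduled() -> None:
    assert not should_schedule(0.79)


def test_high_confidence_prediction_scheduled() -> None:
    assert should_schedule(0.8)  # the boundary value itself is scheduled


def test_snapshot_captured_on_interval() -> None:
    gate = SnapshotGate(interval_s=10.0)
    assert gate.offer("s1", now=10.0)       # 10 s after the initial last_ts of 0.0
    assert not gate.offer("s2", now=15.0)   # only 5 s since s1
    assert gate.offer("s3", now=20.0)
    assert list(gate.stored) == ["s1", "s3"]
```

Passing the clock reading in as an argument keeps target 8 deterministic. Since time.monotonic() has an unspecified reference point, a test that relies on the real clock and the initial _last_snapshot_ts of 0.0 may not behave the same on every host.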

Phase 17 Sub-Phase Tracker

| Sub-Phase | Component | Issue | Status |
|---|---|---|---|
| 17.1 | TemporalGraph | #434 | 🟡 Spec'd |
| 17.2 | EventSequencer | #437 | 🟡 Spec'd |
| 17.3 | PredictiveEngine | #440 | 🟡 Spec'd |
| 17.4 | SchedulerCortex | #443 | 🟡 Spec'd |
| 17.5 | TemporalOrchestrator | #446 | 🟡 Spec'd (PHASE 17 COMPLETE 🎉) |

Labels: enhancement, phase-17, temporal-reasoning
Milestone: Phase 17 - Temporal Reasoning & Predictive Cognition
