Overview
Phase 15.2 introduces HotSwapper — the component responsible for performing live, zero-downtime replacement of cognitive modules within a running CognitiveCycle. It consumes ModuleRegistry (Phase 15.1) to discover staged versions and executes the swap atomically with rollback on failure.
Motivation
Classical AI systems require full restart to deploy updated modules. ASI-Build's self-modification loop (Phase 14: CodeSynthesiser → SandboxRunner → TestHarness → PatchSelector → SynthesisAudit) produces validated, registered patches. HotSwapper closes the loop by applying those patches while the system is running — enabling continuous self-improvement without service interruption.
Core Types
from __future__ import annotations
import asyncio
import dataclasses
import enum
import importlib
import logging
import time
from typing import Any, Callable, Protocol, runtime_checkable
logger = logging.getLogger(__name__)
class SwapResult(str, enum.Enum):
    """Outcome of a single hot-swap attempt.

    The ``str`` mixin matches the "str Enum" entry in the design's
    Type-Safety Table: members compare equal to their wire value (for
    example ``SwapResult.SUCCESS == "success"``) and serialise as plain
    strings.  ``.value`` is unchanged, so existing callers keep working.
    """

    SUCCESS = "success"    # module replaced and validated
    ROLLBACK = "rollback"  # swap attempted, validation failed, reverted
    SKIPPED = "skipped"    # no staged version available
    ERROR = "error"        # unexpected exception during swap
@dataclasses.dataclass(frozen=True)
class SwapEvent:
    """Immutable record of one swap attempt for a single module."""

    # Name of the module the swap was attempted on.
    module_name: str
    # Version that was live before the attempt.
    from_version: int
    # Version the swap tried to promote.
    to_version: int
    # Outcome classification of the attempt.
    result: SwapResult
    # Elapsed time of the attempt, in milliseconds.
    duration_ms: float
    # Human-readable failure description; None when nothing went wrong.
    error: str | None = None
@dataclasses.dataclass(frozen=True)
class SwapConfig:
    """Immutable tuning knobs for a hot-swapper."""

    # Max seconds to run the post-swap smoke test (validator).
    validation_timeout_s: float = 5.0
    # Retry rollback if the first attempt fails, up to this many attempts.
    max_rollback_attempts: int = 3
    # Whether to forward each SwapEvent to SynthesisAudit.
    emit_audit_event: bool = True
Protocol
@runtime_checkable
class HotSwapper(Protocol):
    """Structural interface for components that replace live modules."""

    async def swap(
        self,
        module_name: str,
        loader: Callable[[int], Any],
        validator: Callable[[Any], bool],
    ) -> SwapEvent:
        """Atomically replace the live module with its staged version.

        Args:
            module_name: Module to swap.
            loader: Called with a version number; returns the loaded object.
            validator: Called with the loaded object; returns whether it is
                acceptable.

        Returns:
            A SwapEvent describing the outcome of the attempt.
        """
        ...

    async def swap_all_staged(
        self,
        loader: Callable[[str, int], Any],
        validator: Callable[[str, Any], bool],
    ) -> list[SwapEvent]:
        """Iterate over all STAGED modules and attempt a swap for each.

        Unlike ``swap``, both callables also receive the module name as
        their first argument.
        """
        ...

    def last_event(self, module_name: str) -> SwapEvent | None:
        """Return the most recent SwapEvent for ``module_name``, if any."""
        ...

    def stats(self) -> dict[str, int]:
        """Return swap outcome counts keyed by result value."""
        ...
Reference Implementation: LiveHotSwapper
class LiveHotSwapper:
    """Asyncio-native hot-swapper backed by ModuleRegistry.

    Swaps of the same module are serialised by a per-module
    ``asyncio.Lock``; swaps of different modules may interleave at await
    points.  ``asyncio.Lock`` is not thread-safe, so an instance must be
    driven from a single event loop rather than shared between threads.

    Only the most recent event per module is retained (see ``last_event``
    and ``stats``).  ``SwapConfig.emit_audit_event`` is not consulted by
    this class.
    """

    def __init__(self, registry: ModuleRegistry, config: SwapConfig | None = None) -> None:
        """Bind the swapper to ``registry``; ``config`` defaults to ``SwapConfig()``."""
        self._registry = registry
        self._cfg = config if config is not None else SwapConfig()
        self._locks: dict[str, asyncio.Lock] = {}
        self._history: dict[str, SwapEvent] = {}
        # Prometheus metric handles; _counter/_histogram are helpers defined
        # outside this class.
        self._swaps_total = _counter("asi_hotswapper_swaps_total", ["module", "result"])
        self._swap_duration = _histogram("asi_hotswapper_swap_duration_seconds", ["module"])

    def _lock_for(self, name: str) -> asyncio.Lock:
        """Return the lock guarding ``name``, creating it on first use."""
        lock = self._locks.get(name)
        if lock is None:
            lock = self._locks[name] = asyncio.Lock()
        return lock

    async def _run_validator(self, validator: Callable[[Any], bool], candidate: Any) -> bool:
        """Run ``validator`` in the default executor, bounded by the configured timeout.

        Raises:
            TimeoutError: with a descriptive message when the validator does
                not finish within ``validation_timeout_s``.
        """
        timeout = self._cfg.validation_timeout_s
        pending = asyncio.get_running_loop().run_in_executor(None, validator, candidate)
        try:
            return await asyncio.wait_for(pending, timeout=timeout)
        except asyncio.TimeoutError as exc:
            if str(exc):
                # The validator itself raised a timeout carrying a message; keep it.
                raise
            # wait_for's own TimeoutError has no message, which would leave
            # SwapEvent.error as an empty string.  The executor thread cannot
            # be interrupted, so the validator may still be running.
            raise TimeoutError(f"validator timed out after {timeout}s") from None

    def _mark_reverted(self, module_name: str, version: int) -> bool:
        """Set ``version`` to REVERTED, retrying up to ``max_rollback_attempts`` times.

        Returns:
            True once the registry accepts the status change, False when
            every attempt raised.  Failures are logged, never propagated, so
            the caller can still record the event and metrics.
        """
        attempts = max(1, self._cfg.max_rollback_attempts)
        for attempt in range(1, attempts + 1):
            try:
                self._registry.set_status(module_name, version, ModuleStatus.REVERTED)
            except Exception:
                logger.exception(
                    "HotSwapper rollback attempt %d/%d failed on %s v%d",
                    attempt, attempts, module_name, version,
                )
            else:
                return True
        return False

    async def swap(
        self,
        module_name: str,
        loader: Callable[[int], Any],
        validator: Callable[[Any], bool],
    ) -> SwapEvent:
        """Promote the latest STAGED version of ``module_name`` to ACTIVE.

        Args:
            module_name: Module to swap.
            loader: Called synchronously with the staged version number.
            validator: Called in the default executor with the loaded object;
                a truthy result commits the swap.

        Returns:
            A SwapEvent whose result is SKIPPED (nothing staged), SUCCESS,
            ROLLBACK (validator returned a falsy value) or ERROR (loader,
            validator or registry raised, or the validator timed out).
        """
        async with self._lock_for(module_name):
            staged = self._registry.latest_staged(module_name)
            if staged is None:
                # Nothing to do; SKIPPED events are recorded but not counted
                # in the Prometheus metrics.
                event = SwapEvent(
                    module_name=module_name, from_version=0,
                    to_version=0, result=SwapResult.SKIPPED,
                    duration_ms=0.0,
                )
                self._history[module_name] = event
                return event

            from_ver = self._registry.latest_version(module_name, status=ModuleStatus.ACTIVE)
            t0 = time.monotonic()
            try:
                new_obj = loader(staged.version)
                valid = await self._run_validator(validator, new_obj)
                if valid:
                    self._registry.set_status(module_name, staged.version, ModuleStatus.ACTIVE)
                    if from_ver:
                        self._registry.set_status(module_name, from_ver, ModuleStatus.ARCHIVED)
                    result, error = SwapResult.SUCCESS, None
                else:
                    result, error = SwapResult.ROLLBACK, "validator returned False"
            except Exception as exc:
                result = SwapResult.ERROR
                # Some exceptions stringify to ""; fall back to the type name.
                error = str(exc) or type(exc).__name__
                logger.exception("HotSwapper error on %s v%d", module_name, staged.version)

            if result is not SwapResult.SUCCESS:
                if not self._mark_reverted(module_name, staged.version):
                    attempts = max(1, self._cfg.max_rollback_attempts)
                    error = f"{error}; rollback failed after {attempts} attempt(s)"

            duration_ms = (time.monotonic() - t0) * 1000
            event = SwapEvent(
                module_name=module_name, from_version=from_ver or 0,
                to_version=staged.version, result=result,
                duration_ms=duration_ms, error=error,
            )
            self._history[module_name] = event
            self._swaps_total.labels(module=module_name, result=result.value).inc()
            self._swap_duration.labels(module=module_name).observe(duration_ms / 1000)
            return event

    async def swap_all_staged(
        self,
        loader: Callable[[str, int], Any],
        validator: Callable[[str, Any], bool],
    ) -> list[SwapEvent]:
        """Attempt a swap for every module the registry reports as staged.

        Returns:
            One SwapEvent per staged module, in the order returned by
            ``list_staged_modules()``.
        """
        pending = [
            self.swap(
                name,
                # Bind ``name`` as a default so each closure keeps its own module.
                lambda version, n=name: loader(n, version),
                lambda obj, n=name: validator(n, obj),
            )
            for name in self._registry.list_staged_modules()
        ]
        return list(await asyncio.gather(*pending))

    def last_event(self, module_name: str) -> SwapEvent | None:
        """Return the most recent SwapEvent for ``module_name``, or None."""
        return self._history.get(module_name)

    def stats(self) -> dict[str, int]:
        """Count the latest event of each module by result value.

        Only one event per module is retained, so this reflects current
        outcomes rather than a cumulative total.
        """
        from collections import Counter

        return dict(Counter(ev.result.value for ev in self._history.values()))
ModuleRegistry Extension (15.1 → 15.2)
HotSwapper requires two new methods on ModuleRegistry:
def latest_staged(self, module_name: str) -> ModuleVersion | None:
    """Return the STAGED entry with the highest version number, or None."""
    staged_entries = (
        entry
        for entry in self.list_versions(module_name)
        if entry.status == ModuleStatus.STAGED
    )
    # ``default=None`` covers modules that have no STAGED entry at all.
    return max(staged_entries, key=lambda entry: entry.version, default=None)
# Spec stub: the body is elided ("...") in this document; a concrete
# registry has to supply the implementation.
def list_staged_modules(self) -> list[str]:
"""Return names of all modules that have at least one STAGED version."""
...
CognitiveCycle Integration
# Integration sketch (pseudo-code): "..." marks parameters and statements
# that are elided in this document.
class CognitiveCycle:
def __init__(self, ..., swapper: HotSwapper | None = None) -> None:
...
# Optional hot-swapper; when None, _synthesis_step performs no swaps.
self._swapper = swapper
async def _synthesis_step(self) -> None:
# ... existing: synthesise → sandbox → harness → selector → audit → registry.register(STAGED)
# Only attempt swaps when a swapper was injected.
if self._swapper:
# _load_module / _validate_module are hooks on this class (not shown here).
events = await self._swapper.swap_all_staged(
loader=self._load_module,
validator=self._validate_module,
)
# One log line per attempted swap.
for ev in events:
logger.info("HotSwap %s v%d→v%d: %s", ev.module_name,
ev.from_version, ev.to_version, ev.result.value)
Data-Flow Diagram
SynthesisAudit (audit)
│ SYNTHESIS_COMPLETE
▼
ModuleRegistry.register(status=STAGED) ◄── Phase 15.1
│ list_staged_modules()
▼
HotSwapper.swap_all_staged()
│
├── loader(name, version) → new_obj
├── validator(name, new_obj) → bool
│
├─[valid]──► registry.set_status(ACTIVE) / old→ARCHIVED
└─[invalid]─► registry.set_status(REVERTED)
Prometheus Metrics
| Metric | Type | Labels |
|---|---|---|
| asi_hotswapper_swaps_total | Counter | module, result |
| asi_hotswapper_swap_duration_seconds | Histogram | module |
| asi_hotswapper_active_modules | Gauge | module |
| asi_hotswapper_staged_queue_depth | Gauge | — |
| asi_hotswapper_rollbacks_total | Counter | module |
Example PromQL
# Success rate over 5 min
rate(asi_hotswapper_swaps_total{result="success"}[5m])
/ rate(asi_hotswapper_swaps_total[5m])
# P95 swap latency
histogram_quantile(0.95, rate(asi_hotswapper_swap_duration_seconds_bucket[5m]))
Type-Safety Table
| Symbol | mypy annotation | isinstance check |
|---|---|---|
| HotSwapper | Protocol | ✅ (runtime_checkable) |
| SwapConfig | frozen dataclass | ✅ |
| SwapEvent | frozen dataclass | ✅ |
| SwapResult | str Enum | ✅ |
| loader | Callable[[int], Any] | — |
| validator | Callable[[Any], bool] | — |
Test Targets
test_swap_success — loader+validator both succeed → ACTIVE, old → ARCHIVED
test_swap_rollback — validator returns False → REVERTED, SwapResult.ROLLBACK
test_swap_error — loader raises → REVERTED, SwapResult.ERROR
test_swap_skipped — no STAGED version → SwapResult.SKIPPED
test_swap_timeout — validator times out → SwapResult.ERROR
test_swap_all_staged — multiple staged modules → correct events list
test_concurrent_swap — asyncio.gather on same module → serialised via lock
test_last_event — history populated correctly per module
test_stats_counter — stats() reflects outcome distribution
test_registry_integration — end-to-end: register STAGED → swap → ACTIVE
test_no_swapper_cognitive_cycle — swapper=None path is a no-op
test_prometheus_labels — metrics emitted with correct module+result labels
Implementation Order
- Add latest_staged() and list_staged_modules() to InMemoryModuleRegistry
- Define SwapResult enum and SwapEvent + SwapConfig frozen dataclasses
- Define HotSwapper Protocol
- Implement LiveHotSwapper.__init__() with lock dict and Prometheus helpers
- Implement LiveHotSwapper.swap() — lock acquire → staged lookup → load → validate → commit/revert
- Implement LiveHotSwapper.swap_all_staged() — gather over staged modules
- Implement last_event() and stats() accessors
- Wire HotSwapper into CognitiveCycle._synthesis_step()
- Add _load_module() and _validate_module() stub hooks to CognitiveCycle
- Register Prometheus metrics (Counter + Histogram + Gauge)
- Write unit tests (mock registry + mock loader/validator)
- Write integration test (real InMemoryModuleRegistry end-to-end)
- Add Grafana dashboard panel definitions (4 panels)
- Update wiki and sidebar
Phase 15 Sub-Phase Tracker
| Sub-phase | Component | Issue | Status |
|---|---|---|---|
| 15.1 | ModuleRegistry | #401 | 🟡 open |
| 15.2 | HotSwapper | this issue | 🟡 open |
| 15.3 | RollbackCoordinator | — | ⏳ |
| 15.4 | CapabilityIndex | — | ⏳ |
| 15.5 | SelfModificationAudit | — | ⏳ |
Part of the Phase 15 — Runtime Self-Modification & Hot-Reload Architecture track.
Overview
Phase 15.2 introduces HotSwapper — the component responsible for performing live, zero-downtime replacement of cognitive modules within a running CognitiveCycle. It consumes ModuleRegistry (Phase 15.1) to discover staged versions and executes the swap atomically with rollback on failure.
Motivation
Classical AI systems require full restart to deploy updated modules. ASI-Build's self-modification loop (Phase 14: CodeSynthesiser → SandboxRunner → TestHarness → PatchSelector → SynthesisAudit) produces validated, registered patches. HotSwapper closes the loop by applying those patches while the system is running — enabling continuous self-improvement without service interruption.
Core Types
Protocol
Reference Implementation: LiveHotSwapper
ModuleRegistry Extension (15.1 → 15.2)
HotSwapper requires two new methods on ModuleRegistry:
CognitiveCycle Integration
Data-Flow Diagram
Prometheus Metrics
- asi_hotswapper_swaps_total: module, result
- asi_hotswapper_swap_duration_seconds: module
- asi_hotswapper_active_modules: module
- asi_hotswapper_staged_queue_depth
- asi_hotswapper_rollbacks_total: module
Example PromQL
Type-Safety Table
isinstance check
- HotSwapper: Protocol
- SwapConfig: frozen dataclass
- SwapEvent: frozen dataclass
- SwapResult: str Enum
- loader: Callable[[int], Any]
- validator: Callable[[Any], bool]
Test Targets
- test_swap_success — loader+validator both succeed → ACTIVE, old → ARCHIVED
- test_swap_rollback — validator returns False → REVERTED, SwapResult.ROLLBACK
- test_swap_error — loader raises → REVERTED, SwapResult.ERROR
- test_swap_skipped — no STAGED version → SwapResult.SKIPPED
- test_swap_timeout — validator times out → SwapResult.ERROR
- test_swap_all_staged — multiple staged modules → correct events list
- test_concurrent_swap — asyncio.gather on same module → serialised via lock
- test_last_event — history populated correctly per module
- test_stats_counter — stats() reflects outcome distribution
- test_registry_integration — end-to-end: register STAGED → swap → ACTIVE
- test_no_swapper_cognitive_cycle — swapper=None path is a no-op
- test_prometheus_labels — metrics emitted with correct module+result labels
Implementation Order
- latest_staged() and list_staged_modules() to InMemoryModuleRegistry
- SwapResult enum and SwapEvent + SwapConfig frozen dataclasses
- HotSwapper Protocol
- LiveHotSwapper.__init__() with lock dict and Prometheus helpers
- LiveHotSwapper.swap() — lock acquire → staged lookup → load → validate → commit/revert
- LiveHotSwapper.swap_all_staged() — gather over staged modules
- last_event() and stats() accessors
- HotSwapper into CognitiveCycle._synthesis_step()
- _load_module() and _validate_module() stub hooks to CognitiveCycle
Phase 15 Sub-Phase Tracker
Part of the Phase 15 — Runtime Self-Modification & Hot-Reload Architecture track.