Phase 42.1 — ConsensusEngine
Overview
The ConsensusEngine implements distributed consensus protocols for ASI-Build's multi-node AI infrastructure. It provides Raft-based log replication, Paxos-based agreement, and PBFT extensions for Byzantine fault tolerance, ensuring all nodes agree on model state, training progress, and configuration changes.
Academic Foundation
- Lamport (1998) — Paxos: the foundational consensus algorithm for crash-tolerant distributed systems
- Ongaro & Ousterhout (2014) — Raft: an understandable consensus algorithm with strong leader, log replication, and membership changes
- Castro & Liskov (1999) — PBFT: practical Byzantine fault tolerance tolerating f < n/3 Byzantine nodes
- Fischer, Lynch & Paterson (1985) — FLP impossibility: no deterministic consensus in async systems with even one crash failure
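The fault bounds cited above translate into simple quorum arithmetic. The sketch below is illustrative only; the function names are hypothetical and not part of the ConsensusEngine interface. It assumes the usual majority quorum for crash-tolerant protocols (Raft, Paxos) and, for the Byzantine case, the smallest quorum size for which any two quorums share at least f + 1 nodes.

```python
def max_crash_faults(n: int) -> int:
    """Crash failures a majority-quorum protocol (Raft, Paxos) can tolerate."""
    return (n - 1) // 2


def crash_quorum(n: int) -> int:
    """Smallest majority of n nodes."""
    return n // 2 + 1


def max_byzantine_faults(n: int) -> int:
    """Largest f with f < n/3, i.e. n >= 3f + 1."""
    return (n - 1) // 3


def byzantine_quorum(n: int) -> int:
    """Smallest q such that two quorums overlap in at least f + 1 nodes.

    From 2q - n >= f + 1 we get q = ceil((n + f + 1) / 2).
    For n = 3f + 1 this reduces to the familiar 2f + 1.
    """
    f = max_byzantine_faults(n)
    return (n + f) // 2 + 1


if __name__ == "__main__":
    for n in (4, 5, 7):
        print(
            f"n={n}: crash f={max_crash_faults(n)}, majority={crash_quorum(n)}, "
            f"byzantine f={max_byzantine_faults(n)}, quorum={byzantine_quorum(n)}"
        )
```

With the default cluster_size of 5, this arithmetic gives a crash-tolerant majority of 3 and, if Byzantine tolerance is enabled with one faulty node, a quorum of 4.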
Architecture
ConsensusEngine
├── RaftConsensus
│   ├── LeaderElection        # Randomized timeouts, term management
│   ├── LogReplicator         # AppendEntries RPC, commit index tracking
│   ├── MembershipManager     # Joint consensus for config changes
│   └── SnapshotCompactor     # Log compaction via snapshots
├── PaxosConsensus
│   ├── Proposer              # Prepare/Accept phase management
│   ├── Acceptor              # Promise/Accept state machine
│   ├── Learner               # Value learning and dissemination
│   └── MultiPaxos            # Leader-based optimization for repeated consensus
├── ByzantineConsensus
│   ├── PBFTEngine            # Pre-prepare/Prepare/Commit protocol
│   ├── ViewChanger           # View change on leader failure
│   ├── MessageAuthenticator  # MAC-based message authentication
│   └── CheckpointProtocol    # Periodic state checkpoints for garbage collection
├── ModelAgreement
│   ├── GradientConsensus     # Agree on gradient updates across nodes
│   ├── ParameterSync         # Synchronized parameter server protocol
│   └── VersionVector         # Causal ordering of model versions
└── ConsensusMetrics
    ├── LatencyTracker        # Consensus round latency monitoring
    ├── ThroughputMeter       # Operations per second measurement
    └── SafetyVerifier        # Invariant checking (single leader, log matching)
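The VersionVector component is described only as providing causal ordering of model versions. As a rough illustration of what that could mean, here is a minimal version-vector sketch. The names `Ordering`, `bump`, `compare`, and `merge` are assumptions made for this example, and vectors are represented as plain dicts from node ID to counter.

```python
from enum import Enum


class Ordering(Enum):
    EQUAL = "equal"
    BEFORE = "before"          # a happened-before b
    AFTER = "after"            # b happened-before a
    CONCURRENT = "concurrent"  # neither dominates: conflicting versions


def bump(vector: dict[str, int], node_id: str) -> dict[str, int]:
    """Return a copy of the vector with node_id's counter incremented."""
    updated = dict(vector)
    updated[node_id] = updated.get(node_id, 0) + 1
    return updated


def compare(a: dict[str, int], b: dict[str, int]) -> Ordering:
    """Compare two version vectors; missing entries count as zero."""
    nodes = a.keys() | b.keys()
    a_le_b = all(a.get(n, 0) <= b.get(n, 0) for n in nodes)
    b_le_a = all(b.get(n, 0) <= a.get(n, 0) for n in nodes)
    if a_le_b and b_le_a:
        return Ordering.EQUAL
    if a_le_b:
        return Ordering.BEFORE
    if b_le_a:
        return Ordering.AFTER
    return Ordering.CONCURRENT


def merge(a: dict[str, int], b: dict[str, int]) -> dict[str, int]:
    """Pointwise maximum: the vector of a version that has seen both inputs."""
    return {n: max(a.get(n, 0), b.get(n, 0)) for n in a.keys() | b.keys()}
```

Two model versions produced independently from the same parent compare as CONCURRENT, which is the case a consensus round would need to resolve before the versions are merged.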
Interface Specification
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Optional, Callable
import asyncio


class ConsensusProtocol(Enum):
    RAFT = "raft"
    PAXOS = "paxos"
    PBFT = "pbft"
    MULTI_PAXOS = "multi_paxos"


class NodeState(Enum):
    FOLLOWER = "follower"
    CANDIDATE = "candidate"
    LEADER = "leader"


@dataclass
class LogEntry:
    term: int
    index: int
    command: Any
    timestamp: float
    client_id: str


@dataclass
class ConsensusConfig:
    protocol: ConsensusProtocol = ConsensusProtocol.RAFT
    cluster_size: int = 5
    election_timeout_ms: tuple[int, int] = (150, 300)  # (min, max), randomized per election
    heartbeat_interval_ms: int = 50
    max_log_entries_per_append: int = 100
    snapshot_threshold: int = 10000
    byzantine_tolerance: bool = False
    max_byzantine_nodes: int = 1  # f < n/3


class ConsensusEngine:
    """
    Distributed consensus engine supporting Raft, Paxos, and PBFT protocols.
    Provides strong consistency guarantees for distributed AI model state,
    training coordination, and configuration management.
    """

    def __init__(self, config: ConsensusConfig):
        self.config = config
        self.state = NodeState.FOLLOWER
        self.current_term = 0
        self.voted_for: Optional[str] = None
        self.log: list[LogEntry] = []
        self.commit_index = 0
        self.last_applied = 0

    async def start_node(self, node_id: str, peers: list[str]) -> None:
        """Initialize and start the consensus node."""
        ...

    async def propose(self, command: Any, client_id: str) -> LogEntry:
        """Propose a new command to the consensus group. Only the leader can accept proposals."""
        ...

    async def request_vote(self, candidate_id: str, term: int,
                           last_log_index: int, last_log_term: int) -> tuple[int, bool]:
        """Handle RequestVote RPC (Raft) or Prepare (Paxos)."""
        ...

    async def append_entries(self, leader_id: str, term: int,
                             prev_log_index: int, prev_log_term: int,
                             entries: list[LogEntry], leader_commit: int) -> tuple[int, bool]:
        """Handle AppendEntries RPC for log replication."""
        ...

    async def get_leader(self) -> Optional[str]:
        """Return the current leader node ID, or None if no leader is elected."""
        ...

    async def add_node(self, node_id: str) -> bool:
        """Add a new node via joint consensus (Raft) or reconfiguration."""
        ...

    async def remove_node(self, node_id: str) -> bool:
        """Remove a node from the consensus group."""
        ...

    async def create_snapshot(self) -> bytes:
        """Create a state snapshot for log compaction."""
        ...

    def get_metrics(self) -> dict:
        """Return consensus performance metrics."""
        ...
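The interface leaves the body of request_vote unspecified. As one possible reading, the sketch below applies the vote-granting rules from the Raft paper to a small standalone state object. `VoterState`, `handle_request_vote`, and `pick_election_timeout_ms` are hypothetical names introduced for this example; a real implementation would also persist the term and vote before replying and step down to follower when it sees a higher term.

```python
import random
from dataclasses import dataclass
from typing import Optional


@dataclass
class VoterState:
    """Hypothetical subset of node state needed to answer RequestVote."""
    current_term: int = 0
    voted_for: Optional[str] = None
    last_log_index: int = 0
    last_log_term: int = 0


def handle_request_vote(state: VoterState, candidate_id: str, term: int,
                        last_log_index: int, last_log_term: int) -> tuple[int, bool]:
    """Return (current_term, vote_granted) following Raft's voting rules."""
    # Reject candidates from a stale term.
    if term < state.current_term:
        return state.current_term, False

    # A newer term resets the vote cast in the older term.
    if term > state.current_term:
        state.current_term = term
        state.voted_for = None

    # The candidate's log must be at least as up to date as ours:
    # compare last terms first, then last indexes.
    log_ok = (last_log_term, last_log_index) >= (state.last_log_term, state.last_log_index)
    # At most one vote per term; repeating a vote for the same candidate is fine.
    can_vote = state.voted_for in (None, candidate_id)

    if log_ok and can_vote:
        state.voted_for = candidate_id
        return state.current_term, True
    return state.current_term, False


def pick_election_timeout_ms(bounds: tuple[int, int] = (150, 300)) -> float:
    """Draw a randomized election timeout within the configured (min, max) bounds."""
    low, high = bounds
    return random.uniform(low, high)
```

The return type matches the tuple[int, bool] in the interface, and the default bounds mirror election_timeout_ms in ConsensusConfig. Randomizing the timeout per node is what makes repeated split votes unlikely.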
Testing Requirements
class TestConsensusEngine:
    async def test_leader_election_single_candidate(self): ...
    async def test_leader_election_split_vote(self): ...
    async def test_log_replication_basic(self): ...
    async def test_log_replication_with_follower_lag(self): ...
    async def test_leader_failure_and_reelection(self): ...
    async def test_network_partition_minority_cannot_commit(self): ...
    async def test_network_partition_majority_continues(self): ...
    async def test_byzantine_node_detection(self): ...
    async def test_pbft_3f_plus_1_tolerance(self): ...
    async def test_snapshot_compaction(self): ...
    async def test_membership_change_joint_consensus(self): ...
    async def test_gradient_consensus_convergence(self): ...
    async def test_causal_ordering_version_vectors(self): ...
    async def test_concurrent_proposals_linearizability(self): ...
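Because the test methods above are coroutines, they need an async-aware test runner. The sketch below shows how two of the partition tests might look using the standard library's unittest.IsolatedAsyncioTestCase. `InMemoryCluster` is a hypothetical stand-in that models only the majority rule; it is not the ConsensusEngine itself, and a real test would drive actual nodes over a simulated network.

```python
import asyncio
import unittest


class InMemoryCluster:
    """Hypothetical fake: a proposal commits only if a majority is reachable."""

    def __init__(self, size: int) -> None:
        self.size = size
        self.reachable = size  # nodes on the leader's side, leader included

    def partition(self, reachable: int) -> None:
        """Simulate a partition that leaves `reachable` nodes with the leader."""
        self.reachable = reachable

    async def propose(self, command: str) -> bool:
        await asyncio.sleep(0)  # yield once, as a real RPC round would
        return self.reachable >= self.size // 2 + 1


class TestPartitionBehaviour(unittest.IsolatedAsyncioTestCase):
    async def asyncSetUp(self) -> None:
        self.cluster = InMemoryCluster(size=5)

    async def test_network_partition_minority_cannot_commit(self) -> None:
        self.cluster.partition(reachable=2)
        self.assertFalse(await self.cluster.propose("set lr=0.01"))

    async def test_network_partition_majority_continues(self) -> None:
        self.cluster.partition(reachable=3)
        self.assertTrue(await self.cluster.propose("set lr=0.01"))
```

The same structure would carry over to a pytest-based suite with an asyncio plugin; the standard-library runner is used here only to keep the sketch dependency-free.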
Acceptance Criteria
Dependencies
- Phase 38 (Federated Learning) — FederatedAggregator for aggregation patterns
- Phase 41 (Adversarial Robustness) — Byzantine detection techniques
References