Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions src/replication/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,24 @@ pub struct ReplicationCoordinator {
/// tasks; the mutex serializes them — anti-entropy is sequential
/// per peer-pair by protocol).
session: Mutex<Session>,
/// Sender half of the inbound-message channel. The application's
/// [`Transport::listen`] loop calls
/// [`Self::deliver_inbound`] which routes here; the
/// [`super::scheduler::ReplicationScheduler`] reads the other
/// end via [`Self::recv_inbound`] to step a round between
/// `SendThenWait` and the next inbound. Bounded capacity 8 —
/// enough to absorb the few in-flight Summary / Diff / Deliver
/// messages without blocking the listen loop on a slow round.
inbound_tx: tokio::sync::mpsc::Sender<ReplicationMessage>,
inbound_rx: Mutex<tokio::sync::mpsc::Receiver<ReplicationMessage>>,
}

impl ReplicationCoordinator {
/// Default inbound mpsc capacity. A round in flight has at most
/// 3 messages queued (Summary + Diff + Deliver); 8 gives slack
/// for a slightly-late deliver while the scheduler is mid-step.
pub const INBOUND_CHANNEL_CAPACITY: usize = 8;

pub fn new(
transport: Arc<dyn Transport>,
peer_key_id: impl Into<String>,
Expand All @@ -126,6 +141,7 @@ impl ReplicationCoordinator {
provider: Arc<dyn StateProvider>,
applier: Arc<Mutex<dyn StateApplier>>,
) -> Self {
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(Self::INBOUND_CHANNEL_CAPACITY);
Self {
transport,
peer_key_id: peer_key_id.into(),
Expand All @@ -134,9 +150,34 @@ impl ReplicationCoordinator {
provider,
applier,
session: Mutex::new(Session::new(role, kind)),
inbound_tx,
inbound_rx: Mutex::new(inbound_rx),
}
}

/// Deliver an inbound replication message into this coordinator's
/// queue. Called by the application's [`Transport::listen`] loop
/// after [`Self::parse_inbound_bytes`] yields a
/// [`ReplicationMessage`].
///
/// Returns `Err(NoRoundInProgress)` if the inbound channel is full
/// (the scheduler isn't keeping up; back-pressure surfaces). The
/// listen loop typically logs + drops the frame.
pub fn deliver_inbound(&self, msg: ReplicationMessage) -> Result<(), CoordinatorError> {
self.inbound_tx
.try_send(msg)
.map_err(|_| CoordinatorError::NoRoundInProgress)
}

/// Wait for the next inbound replication message from the
/// listen-loop-fed queue. Returns `None` if the channel is
/// permanently closed (the coordinator is being dropped). The
/// scheduler awaits on this between `SendThenWait` and the
/// next round step.
pub async fn recv_inbound(&self) -> Option<ReplicationMessage> {
self.inbound_rx.lock().await.recv().await
}

/// Step the held [`Session`] one transition forward.
///
/// - `msg = None` — start a new round (initiator only). Returns
Expand Down
46 changes: 46 additions & 0 deletions src/replication/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,54 @@
//! counters for round counts, bytes transferred, diff sizes are
//! surfaced via `tracing` spans the binding PR will wire to a
//! metric backend.
//!
//! ## NAT traversal
//!
//! Edge does **not** implement STUN / TURN / ICE — and shouldn't.
//!
//! Reticulum (the canonical wire per MISSION §1.4) routes by
//! **cryptographic destination address** (`sha256(pubkey)[..16]`,
//! the same shape this crate's `transport::addressing` builds for
//! the packet-radio plug). Transit nodes carry packets by
//! destination-hash without decrypt capability — the relay can't
//! read what it's carrying, can't even tell which CEG namespace
//! it's in. As long as a NAT'd peer's announce graph reaches *any*
//! publicly-reachable transport node, mesh routing carries packets
//! both ways without endpoint-translation gymnastics.
//!
//! The federation topology itself supplies that public-side peer
//! set, by construction:
//!
//! - **Registry servers** — substrate of the federation; public by
//! definition (CEG 0.15 §0.4, registry-anchored normative refs).
//! - **CIRISLens (LensCore → edge)** — public observability surface
//! per the lens-opt-in model. The opt-in dimension is at the
//! policy layer (does lens get to see this peer's CEG envelopes
//! for telemetry?); the transit layer (Reticulum relay through
//! lens's edge instance) is orthogonal — lens can relay packets
//! it can't read.
//! - **Agent 2.9.6 (CEG/RET-native)** — community-server-opt-in
//! instances on public IPs join the transport graph for free.
//! Mobile agents behind NAT inherit the same benefit they give
//! to other NAT'd peers.
//! - **Any other CEG 0.15 community peer with a public interface.**
//!
//! Practical implication: the only operator-doc bit is "your peer
//! set should include at least one publicly-reachable CIRIS peer"
//! — which is *automatically* satisfied if the operator points at
//! the registry / lens / public-agent set at all.
//!
//! **HTTP transport** stays the one exception: its accept-side
//! still needs port-forward / reverse-proxy to be reachable from
//! outside the NAT. That's operator config (Cloudflare Tunnel,
//! nginx reverse proxy, etc.), not edge code. Operators behind
//! hard NATs should use Reticulum, which is what MISSION §1.4
//! designates canonical anyway.

pub mod coordinator;
pub mod directory;
pub mod protocol;
pub mod scheduler;
pub mod session;
pub mod summary;
pub mod wire_frame;
Expand All @@ -90,6 +134,8 @@ pub use protocol::{
SummaryMessage,
};
#[doc(inline)]
pub use scheduler::{ReplicationScheduler, RoundEvent, SchedulerConfig};
#[doc(inline)]
pub use session::{ReplicationOutcome, Session, SessionRole};
#[doc(inline)]
pub use summary::{LocalState, StalenessSignal, StateApplier, StateProvider};
Expand Down
Loading
Loading