Skip to content

Commit 4557416

Browse files
committed
feat: move the forward_chain stage to pure-stage
Signed-off-by: etorreborre <etorreborre@yahoo.com>
1 parent 7f9be7d commit 4557416

28 files changed

Lines changed: 1018 additions & 807 deletions

crates/amaru-consensus/src/consensus/effects/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
// limitations under the License.
1414

1515
pub mod block_effects;
16+
pub mod network_effects;
1617
pub mod store_effects;
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2025 PRAGMA
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::consensus::errors::ProcessingFailed;
16+
use crate::consensus::tip::HeaderTip;
17+
use amaru_kernel::peer::Peer;
18+
use amaru_kernel::{Header, Point};
19+
use amaru_ouroboros_traits::IsHeader;
20+
use anyhow::anyhow;
21+
use async_trait::async_trait;
22+
use pure_stage::{ExternalEffect, ExternalEffectAPI, Resources};
23+
use serde::{Deserialize, Serialize};
24+
use std::fmt::Display;
25+
use std::sync::Arc;
26+
27+
pub type ResourceForwardEventListener = Arc<dyn ForwardEventListener + Send + Sync>;
28+
29+
/// A listener interface for forward events (new headers or rollbacks).
30+
/// These events are either caught for tests or forwarded to downstream peers (see the TcpForwardEventListener implementation).
31+
#[async_trait]
32+
pub trait ForwardEventListener {
33+
async fn send(&self, event: ForwardEvent) -> anyhow::Result<()>;
34+
}
35+
36+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37+
pub enum ForwardEvent {
38+
Forward(Header),
39+
Backward(HeaderTip),
40+
}
41+
42+
impl ForwardEvent {
43+
pub fn point(&self) -> Point {
44+
match self {
45+
ForwardEvent::Forward(header) => header.point(),
46+
ForwardEvent::Backward(tip) => tip.point(),
47+
}
48+
}
49+
}
50+
51+
impl Display for ForwardEvent {
52+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53+
match self {
54+
ForwardEvent::Forward(header) => write!(f, "Forward({})", header.point()),
55+
ForwardEvent::Backward(tip) => write!(f, "Backward({})", tip),
56+
}
57+
}
58+
}
59+
60+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61+
pub struct ForwardEventEffect {
62+
peer: Peer,
63+
event: ForwardEvent,
64+
}
65+
66+
impl ForwardEventEffect {
67+
pub fn new(peer: &Peer, event: ForwardEvent) -> Self {
68+
Self {
69+
peer: peer.clone(),
70+
event,
71+
}
72+
}
73+
}
74+
75+
impl ExternalEffect for ForwardEventEffect {
76+
#[expect(clippy::expect_used)]
77+
fn run(
78+
self: Box<Self>,
79+
resources: Resources,
80+
) -> pure_stage::BoxFuture<'static, Box<dyn pure_stage::SendData>> {
81+
Box::pin(async move {
82+
let listener = resources
83+
.get::<ResourceForwardEventListener>()
84+
.expect("ForwardEventEffect requires a ForwardEventListener")
85+
.clone();
86+
87+
let point = self.event.point();
88+
let result: <Self as ExternalEffectAPI>::Response =
89+
listener.send(self.event).await.map_err(|e| {
90+
ProcessingFailed::new(
91+
&self.peer,
92+
anyhow!("Cannot send the forward event {}: {e}", &point),
93+
)
94+
});
95+
Box::new(result) as Box<dyn pure_stage::SendData>
96+
})
97+
}
98+
}
99+
100+
impl ExternalEffectAPI for ForwardEventEffect {
101+
type Response = Result<(), ProcessingFailed>;
102+
}

crates/amaru-consensus/src/consensus/events.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -257,6 +257,16 @@ pub enum BlockValidationResult {
257257
},
258258
}
259259

260+
impl BlockValidationResult {
261+
pub fn peer(&self) -> Peer {
262+
match self {
263+
BlockValidationResult::BlockValidated { peer, .. } => peer.clone(),
264+
BlockValidationResult::BlockValidationFailed { peer, .. } => peer.clone(),
265+
BlockValidationResult::RolledBackTo { peer, .. } => peer.clone(),
266+
}
267+
}
268+
}
269+
260270
impl PartialEq for BlockValidationResult {
261271
fn eq(&self, other: &Self) -> bool {
262272
match (self, other) {
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
// Copyright 2025 PRAGMA
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::consensus::effects::network_effects::{ForwardEvent, ForwardEventEffect};
16+
use crate::consensus::errors::{ProcessingFailed, ValidationFailed};
17+
use crate::consensus::events::BlockValidationResult;
18+
use crate::consensus::span::adopt_current_span;
19+
use crate::consensus::tip::{AsHeaderTip, HeaderTip};
20+
use amaru_kernel::Point;
21+
use amaru_ouroboros_traits::IsHeader;
22+
use anyhow::anyhow;
23+
use pure_stage::{Effects, StageRef};
24+
use tracing::{Level, error, info, instrument, trace};
25+
26+
pub const EVENT_TARGET: &str = "amaru::consensus::forward_chain";
27+
28+
type State = (
29+
HeaderTip,
30+
StageRef<ValidationFailed>,
31+
StageRef<ProcessingFailed>,
32+
);
33+
34+
/// The forward chain stage forwards the headers of validated blocks to downstream peers, via the
35+
/// `ForwardEventEffect`. The current node tip is maintained in order to double check that the header
36+
/// we sent out is correct
37+
#[instrument(
38+
level = Level::TRACE,
39+
skip_all,
40+
name = "stage.forward_chain",
41+
)]
42+
pub async fn stage(
43+
state: State,
44+
msg: BlockValidationResult,
45+
eff: Effects<BlockValidationResult>,
46+
) -> State {
47+
adopt_current_span(&msg);
48+
let (mut our_tip, validation_errors, processing_errors) = state;
49+
match msg {
50+
BlockValidationResult::BlockValidated { peer, header, .. } => {
51+
// assert that the new tip is a direct successor of the old tip
52+
assert_eq!(header.block_height(), our_tip.block_height() + 1);
53+
match header.parent() {
54+
Some(parent) => assert_eq!(parent, our_tip.hash()),
55+
None => assert_eq!(our_tip, HeaderTip::new(Point::Origin, 0)),
56+
}
57+
our_tip = header.as_header_tip();
58+
trace!(
59+
target: EVENT_TARGET,
60+
tip = %header.point(),
61+
"tip_changed"
62+
);
63+
64+
if let Err(e) = eff
65+
.external(ForwardEventEffect::new(
66+
&peer,
67+
ForwardEvent::Forward(header.clone()),
68+
))
69+
.await
70+
{
71+
error!(
72+
target: EVENT_TARGET,
73+
%e,
74+
"failed to send forward event"
75+
);
76+
eff.send(&processing_errors, ProcessingFailed::new(&peer, anyhow!(e)))
77+
.await
78+
}
79+
}
80+
BlockValidationResult::RolledBackTo {
81+
peer,
82+
rollback_header,
83+
..
84+
} => {
85+
info!(
86+
target: EVENT_TARGET,
87+
point = %rollback_header.point(),
88+
"rolled_back_to"
89+
);
90+
91+
our_tip = rollback_header.as_header_tip();
92+
if let Err(e) = eff
93+
.external(ForwardEventEffect::new(
94+
&peer,
95+
ForwardEvent::Backward(rollback_header.as_header_tip()),
96+
))
97+
.await
98+
{
99+
error!(
100+
target: EVENT_TARGET,
101+
%e,
102+
"failed to send backward event"
103+
);
104+
eff.send(&processing_errors, ProcessingFailed::new(&peer, anyhow!(e)))
105+
.await
106+
}
107+
}
108+
BlockValidationResult::BlockValidationFailed { point, .. } => {
109+
error!(
110+
target: EVENT_TARGET,
111+
slot = %point.slot_or_default(),
112+
hash = %point.hash(),
113+
"block validation failed"
114+
);
115+
}
116+
}
117+
(our_tip, validation_errors, processing_errors)
118+
}

crates/amaru-consensus/src/consensus/stages/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
// limitations under the License.
1414

1515
pub mod fetch_block;
16+
pub mod forward_chain;
1617
pub mod receive_header;
1718
pub mod select_chain;
1819
pub mod store_block;

crates/amaru-consensus/src/consensus/tip.rs

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use amaru_kernel::{HEADER_HASH_SIZE, ORIGIN_HASH, Point, cbor};
15+
use amaru_kernel::{HEADER_HASH_SIZE, Header, ORIGIN_HASH, Point, cbor};
1616
use amaru_ouroboros_traits::is_header::IsHeader;
1717
use pallas_crypto::hash::Hash;
1818
use serde::{Deserialize, Serialize};
@@ -50,14 +50,21 @@ impl<H: IsHeader + Display> Display for Tip<H> {
5050
}
5151
}
5252

53-
impl<H> Tip<H> {
53+
impl<H: IsHeader> Tip<H> {
5454
/// Return the header for this Tip if the Tip doesn't represent the Genesis header.
5555
pub fn to_header(&self) -> Option<&H> {
5656
match self {
5757
Tip::Genesis => None,
5858
Tip::Hdr(h) => Some(h),
5959
}
6060
}
61+
62+
pub fn block_height(&self) -> u64 {
63+
match self {
64+
Tip::Genesis => 0,
65+
Tip::Hdr(h) => h.block_height(),
66+
}
67+
}
6168
}
6269

6370
impl<H: IsHeader, C> cbor::encode::Encode<C> for Tip<H>
@@ -137,3 +144,45 @@ impl<H: IsHeader> From<Option<H>> for Tip<H> {
137144
}
138145
}
139146
}
147+
148+
/// This is equivalent to pallas_network::miniprotocols::chainsync::protocol::Tip
149+
/// but does not incur the dependency on pallas_network.
150+
///
151+
/// This can also replace Tip<Header> in places where we only need the Point and block height,
152+
/// without needing the full Header.
153+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154+
pub struct HeaderTip(Point, u64);
155+
156+
impl Display for HeaderTip {
157+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
158+
write!(f, "{}.{}", self.1, self.0.hash())
159+
}
160+
}
161+
162+
impl HeaderTip {
163+
pub fn new(point: Point, block_height: u64) -> Self {
164+
Self(point, block_height)
165+
}
166+
167+
pub fn hash(&self) -> Hash<HEADER_HASH_SIZE> {
168+
self.0.hash()
169+
}
170+
171+
pub fn point(&self) -> Point {
172+
self.0.clone()
173+
}
174+
175+
pub fn block_height(&self) -> u64 {
176+
self.1
177+
}
178+
}
179+
180+
pub trait AsHeaderTip {
181+
fn as_header_tip(&self) -> HeaderTip;
182+
}
183+
184+
impl AsHeaderTip for Header {
185+
fn as_header_tip(&self) -> HeaderTip {
186+
HeaderTip::new(self.point(), self.block_height())
187+
}
188+
}

crates/amaru/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ sysinfo.workspace = true
3636
thiserror.workspace = true
3737
tokio = { workspace = true, features = [
3838
"fs",
39+
"net",
3940
"rt",
4041
"rt-multi-thread",
4142
"signal",

crates/amaru/src/stages/build_stage_graph.rs

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,14 @@
1414

1515
use amaru_consensus::consensus::effects::store_effects::ResourceHeaderValidation;
1616
use amaru_consensus::consensus::errors::{ProcessingFailed, ValidationFailed};
17-
use amaru_consensus::consensus::events::{BlockValidationResult, ChainSyncEvent};
17+
use amaru_consensus::consensus::events::ChainSyncEvent;
1818
use amaru_consensus::consensus::stages::select_chain::SelectChain;
1919
use amaru_consensus::consensus::stages::validate_header::ValidateHeader;
2020
use amaru_consensus::consensus::stages::{
21-
fetch_block, receive_header, select_chain, store_block, store_header, validate_block,
22-
validate_header,
21+
fetch_block, forward_chain, receive_header, select_chain, store_block, store_header,
22+
validate_block, validate_header,
2323
};
24+
use amaru_consensus::consensus::tip::HeaderTip;
2425
use amaru_kernel::protocol_parameters::GlobalParameters;
2526
use pure_stage::{StageGraph, StageRef};
2627

@@ -31,8 +32,8 @@ pub fn build_stage_graph(
3132
global_parameters: &GlobalParameters,
3233
header_validation: ResourceHeaderValidation,
3334
chain_selector: SelectChain,
35+
our_tip: HeaderTip,
3436
network: &mut impl StageGraph,
35-
outputs: StageRef<BlockValidationResult>,
3637
) -> StageRef<ChainSyncEvent> {
3738
let receive_header_stage = network.stage("receive_header", receive_header::stage);
3839
let store_header_stage = network.stage("store_header", store_header::stage);
@@ -41,6 +42,7 @@ pub fn build_stage_graph(
4142
let fetch_block_stage = network.stage("fetch_block", fetch_block::stage);
4243
let store_block_stage = network.stage("store_block", store_block::stage);
4344
let validate_block_stage = network.stage("validate_block", validate_block::stage);
45+
let forward_chain_stage = network.stage("forward_chain", forward_chain::stage);
4446

4547
// TODO: currently only validate_header errors, will need to grow into all error handling
4648
let validation_errors_stage = network.stage(
@@ -65,10 +67,18 @@ pub fn build_stage_graph(
6567
let validation_errors_stage = network.wire_up(validation_errors_stage, ());
6668
let processing_errors_stage = network.wire_up(processing_errors_stage, ());
6769

70+
let forward_chain_stage = network.wire_up(
71+
forward_chain_stage,
72+
(
73+
our_tip,
74+
validation_errors_stage.clone().without_state(),
75+
processing_errors_stage.clone().without_state(),
76+
),
77+
);
6878
let validate_block_stage = network.wire_up(
6979
validate_block_stage,
7080
(
71-
outputs,
81+
forward_chain_stage.without_state(),
7282
validation_errors_stage.clone().without_state(),
7383
processing_errors_stage.clone().without_state(),
7484
),

0 commit comments

Comments
 (0)