Skip to content

Commit ee852b8

Browse files
authored
Merge pull request #464 from pragma-org/etorreborre/refactor/move-forward-chain-to-pure-stage
refactor: move forward chain to pure stage
2 parents 5bb0573 + 97125bb commit ee852b8

35 files changed

Lines changed: 1100 additions & 938 deletions

crates/amaru-consensus/src/consensus/effects/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,5 @@
1313
// limitations under the License.
1414

1515
pub mod block_effects;
16+
pub mod network_effects;
1617
pub mod store_effects;
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
// Copyright 2025 PRAGMA
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::consensus::errors::ProcessingFailed;
16+
use crate::consensus::tip::HeaderTip;
17+
use amaru_kernel::peer::Peer;
18+
use amaru_kernel::{Header, Point};
19+
use amaru_ouroboros_traits::IsHeader;
20+
use anyhow::anyhow;
21+
use async_trait::async_trait;
22+
use pure_stage::{ExternalEffect, ExternalEffectAPI, Resources};
23+
use serde::{Deserialize, Serialize};
24+
use std::fmt::Display;
25+
use std::sync::Arc;
26+
27+
pub type ResourceForwardEventListener = Arc<dyn ForwardEventListener + Send + Sync>;
28+
29+
/// A listener interface for forward events (new headers or rollbacks).
30+
/// These events are either caught for tests or forwarded to downstream peers (see the TcpForwardEventListener implementation).
31+
#[async_trait]
32+
pub trait ForwardEventListener {
33+
async fn send(&self, event: ForwardEvent) -> anyhow::Result<()>;
34+
}
35+
36+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
37+
pub enum ForwardEvent {
38+
Forward(Header),
39+
Backward(HeaderTip),
40+
}
41+
42+
impl ForwardEvent {
43+
pub fn point(&self) -> Point {
44+
match self {
45+
ForwardEvent::Forward(header) => header.point(),
46+
ForwardEvent::Backward(tip) => tip.point(),
47+
}
48+
}
49+
}
50+
51+
impl Display for ForwardEvent {
52+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53+
match self {
54+
ForwardEvent::Forward(header) => write!(f, "Forward({})", header.point()),
55+
ForwardEvent::Backward(tip) => write!(f, "Backward({})", tip),
56+
}
57+
}
58+
}
59+
60+
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
61+
pub struct ForwardEventEffect {
62+
peer: Peer,
63+
event: ForwardEvent,
64+
}
65+
66+
impl ForwardEventEffect {
67+
pub fn new(peer: &Peer, event: ForwardEvent) -> Self {
68+
Self {
69+
peer: peer.clone(),
70+
event,
71+
}
72+
}
73+
}
74+
75+
impl ExternalEffect for ForwardEventEffect {
76+
#[expect(clippy::expect_used)]
77+
fn run(
78+
self: Box<Self>,
79+
resources: Resources,
80+
) -> pure_stage::BoxFuture<'static, Box<dyn pure_stage::SendData>> {
81+
Box::pin(async move {
82+
let listener = resources
83+
.get::<ResourceForwardEventListener>()
84+
.expect("ForwardEventEffect requires a ForwardEventListener")
85+
.clone();
86+
87+
let point = self.event.point();
88+
let result: <Self as ExternalEffectAPI>::Response =
89+
listener.send(self.event).await.map_err(|e| {
90+
ProcessingFailed::new(
91+
&self.peer,
92+
anyhow!("Cannot send the forward event {}: {e}", &point),
93+
)
94+
});
95+
Box::new(result) as Box<dyn pure_stage::SendData>
96+
})
97+
}
98+
}
99+
100+
impl ExternalEffectAPI for ForwardEventEffect {
101+
type Response = Result<(), ProcessingFailed>;
102+
}

crates/amaru-consensus/src/consensus/events.rs

Lines changed: 20 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ pub enum ValidateHeaderEvent {
148148
},
149149
Rollback {
150150
peer: Peer,
151-
rollback_point: Point,
151+
rollback_header: Header,
152152
#[serde(skip, default = "Span::none")]
153153
span: Span,
154154
},
@@ -166,7 +166,7 @@ pub enum ValidateBlockEvent {
166166
},
167167
Rollback {
168168
peer: Peer,
169-
rollback_point: Point,
169+
rollback_header: Header,
170170
#[serde(skip, default = "Span::none")]
171171
span: Span,
172172
},
@@ -192,7 +192,7 @@ impl Debug for ValidateBlockEvent {
192192
.finish(),
193193
ValidateBlockEvent::Rollback {
194194
peer,
195-
rollback_point,
195+
rollback_header: rollback_point,
196196
..
197197
} => f
198198
.debug_struct("Rollback")
@@ -210,25 +210,23 @@ impl PartialEq for ValidateBlockEvent {
210210
ValidateBlockEvent::Validated {
211211
peer: p1,
212212
header: h1,
213-
block: b1,
214213
..
215214
},
216215
ValidateBlockEvent::Validated {
217216
peer: p2,
218217
header: h2,
219-
block: b2,
220218
..
221219
},
222-
) => p1 == p2 && h1 == h2 && b1 == b2,
220+
) => p1 == p2 && h1 == h2,
223221
(
224222
ValidateBlockEvent::Rollback {
225223
peer: p1,
226-
rollback_point: rp1,
224+
rollback_header: rp1,
227225
..
228226
},
229227
ValidateBlockEvent::Rollback {
230228
peer: p2,
231-
rollback_point: rp2,
229+
rollback_header: rp2,
232230
..
233231
},
234232
) => p1 == p2 && rp1 == rp2,
@@ -242,11 +240,8 @@ pub enum BlockValidationResult {
242240
BlockValidated {
243241
peer: Peer,
244242
header: Header,
245-
#[serde(skip, default = "default_block")]
246-
block: RawBlock,
247243
#[serde(skip, default = "Span::none")]
248244
span: Span,
249-
block_height: u64,
250245
},
251246
BlockValidationFailed {
252247
peer: Peer,
@@ -256,31 +251,37 @@ pub enum BlockValidationResult {
256251
},
257252
RolledBackTo {
258253
peer: Peer,
259-
rollback_point: Point,
254+
rollback_header: Header,
260255
#[serde(skip, default = "Span::none")]
261256
span: Span,
262257
},
263258
}
264259

260+
impl BlockValidationResult {
261+
pub fn peer(&self) -> Peer {
262+
match self {
263+
BlockValidationResult::BlockValidated { peer, .. } => peer.clone(),
264+
BlockValidationResult::BlockValidationFailed { peer, .. } => peer.clone(),
265+
BlockValidationResult::RolledBackTo { peer, .. } => peer.clone(),
266+
}
267+
}
268+
}
269+
265270
impl PartialEq for BlockValidationResult {
266271
fn eq(&self, other: &Self) -> bool {
267272
match (self, other) {
268273
(
269274
BlockValidationResult::BlockValidated {
270275
peer: p1,
271276
header: hd1,
272-
block: b1,
273-
block_height: bh1,
274277
..
275278
},
276279
BlockValidationResult::BlockValidated {
277280
peer: p2,
278281
header: hd2,
279-
block: b2,
280-
block_height: bh2,
281282
..
282283
},
283-
) => p1 == p2 && hd1 == hd2 && b1 == b2 && bh1 == bh2,
284+
) => p1 == p2 && hd1 == hd2,
284285
(
285286
BlockValidationResult::BlockValidationFailed {
286287
peer: p1,
@@ -296,12 +297,12 @@ impl PartialEq for BlockValidationResult {
296297
(
297298
BlockValidationResult::RolledBackTo {
298299
peer: p1,
299-
rollback_point: rp1,
300+
rollback_header: rp1,
300301
..
301302
},
302303
BlockValidationResult::RolledBackTo {
303304
peer: p2,
304-
rollback_point: rp2,
305+
rollback_header: rp2,
305306
..
306307
},
307308
) => p1 == p2 && rp1 == rp2,

crates/amaru-consensus/src/consensus/headers_tree/data_generation/actions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -426,12 +426,12 @@ pub fn make_best_chains_from_results(results: &[SelectionResult]) -> Vec<Chain>
426426
| Back(RollbackChainSelection::SwitchToFork(fork)) => {
427427
let rollback_position = current_best_chain
428428
.iter()
429-
.position(|h| h.hash() == fork.rollback_point.hash());
429+
.position(|h| h.hash() == fork.rollback_header.hash());
430430
assert!(
431431
rollback_position.is_some(),
432432
"after the action {}, we have a rollback position that does not exist with hash {}",
433433
i + 1,
434-
fork.rollback_point.hash()
434+
fork.rollback_header.hash()
435435
);
436436
current_best_chain.truncate(rollback_position.unwrap() + 1);
437437
current_best_chain.extend(fork.fork.clone())

crates/amaru-consensus/src/consensus/headers_tree/headers_tree.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,7 @@ impl<H: IsHeader + Clone + Debug + PartialEq + Eq> HeadersTree<H> {
473473

474474
Fork {
475475
peer: best_peer.clone(),
476-
rollback_point: self.unsafe_get_header(&intersection_hash).point(),
476+
rollback_header: self.unsafe_get_header(&intersection_hash),
477477
fork: fork_fragment,
478478
}
479479
}
@@ -953,7 +953,7 @@ mod tests {
953953
let fork: Vec<TestHeader> = new_bob_headers;
954954
let fork = Fork {
955955
peer: bob.clone(),
956-
rollback_point: middle.point(),
956+
rollback_header: middle,
957957
fork,
958958
};
959959

@@ -981,7 +981,7 @@ mod tests {
981981
bob_headers.push(bob_new_tip);
982982
let fork = Fork {
983983
peer: bob.clone(),
984-
rollback_point: anchor.point(),
984+
rollback_header: *anchor,
985985
fork: bob_headers,
986986
};
987987

crates/amaru-consensus/src/consensus/stages/fetch_block.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ pub async fn stage(
6565
}
6666
ValidateHeaderEvent::Rollback {
6767
peer,
68-
rollback_point,
68+
rollback_header: rollback_point,
6969
span,
7070
..
7171
} => {
7272
eff.send(
7373
&downstream,
7474
ValidateBlockEvent::Rollback {
7575
peer,
76-
rollback_point,
76+
rollback_header: rollback_point,
7777
span,
7878
},
7979
)

0 commit comments

Comments
 (0)