Skip to content

Commit 9017fa4

Browse files
committed
add network tests for chain sync
1 parent 3c9b2a2 commit 9017fa4

7 files changed

Lines changed: 159 additions & 32 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ tokio-util = "0.7.12"
5252
tokio-stream = "0.1.17"
5353
tracing = "0.1.40"
5454
tracing-opentelemetry = "0.28.0"
55-
tracing-subscriber = "0.3.18"
55+
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
5656

5757
amaru-consensus = { path = "crates/amaru-consensus" }
5858
amaru-kernel = { path = "crates/amaru-kernel" }

crates/amaru-consensus/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,4 @@ proptest.workspace = true
4141
rand.workspace = true
4242
serde_json.workspace = true
4343
tempfile.workspace = true
44+
tracing-subscriber.workspace = true

crates/amaru-consensus/src/chain_forward.rs

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ use acto::{AcTokio, ActoCell, ActoMsgSuper, ActoRef, ActoRuntime};
1717
use amaru_kernel::{Hash, Header};
1818
use amaru_ledger::BlockValidationResult;
1919
use amaru_ouroboros::IsHeader;
20-
use client_protocol::{client_protocols, ChainSyncMsg};
20+
use client_protocol::{client_protocols, ClientProtocolMsg};
2121
use client_state::{to_pallas_point, ClientOp};
2222
use gasket::framework::*;
2323
use pallas_network::{
@@ -53,10 +53,18 @@ pub struct ForwardStage {
5353
pub network_magic: u64,
5454
pub runtime: AcTokio,
5555
pub listen_address: String,
56+
pub downstream: Option<ActoRef<ForwardEvent>>,
57+
}
58+
59+
#[derive(Debug, Clone)]
60+
pub enum ForwardEvent {
61+
Listening(u16),
62+
Forward(Point),
5663
}
5764

5865
impl ForwardStage {
5966
pub fn new(
67+
downstream: Option<ActoRef<ForwardEvent>>,
6068
store: Arc<Mutex<dyn ChainStore<Header>>>,
6169
network_magic: u64,
6270
listen_address: &str,
@@ -68,6 +76,7 @@ impl ForwardStage {
6876
network_magic,
6977
runtime,
7078
listen_address: listen_address.to_string(),
79+
downstream,
7180
}
7281
}
7382
}
@@ -94,6 +103,11 @@ impl Drop for Worker {
94103
impl gasket::framework::Worker<ForwardStage> for Worker {
95104
async fn bootstrap(stage: &ForwardStage) -> Result<Self, WorkerError> {
96105
let server = TcpListener::bind(&stage.listen_address).await.or_panic()?;
106+
if let Some(downstream) = &stage.downstream {
107+
tracing::debug!("sending listening event");
108+
downstream.send(ForwardEvent::Listening(server.local_addr().unwrap().port()));
109+
}
110+
97111
let (tx, incoming_peers) = mpsc::channel(10);
98112

99113
let clients = stage
@@ -175,6 +189,10 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
175189
self.clients.send(ClientMsg::Op(ClientOp::Forward(header)));
176190
}
177191

192+
if let Some(downstream) = &stage.downstream {
193+
downstream.send(ForwardEvent::Forward(to_pallas_point(point)));
194+
}
195+
178196
Ok(())
179197
}
180198
Unit::Block(BlockValidationResult::BlockForwardStorageFailed(point, span)) => {
@@ -255,7 +273,7 @@ async fn client_supervisor(
255273
}
256274
ActoMsgSuper::Message(ClientMsg::Op(op)) => {
257275
for client in clients.values() {
258-
client.send(ChainSyncMsg::Op(op.clone()));
276+
client.send(ClientProtocolMsg::Op(op.clone()));
259277
}
260278
}
261279
ActoMsgSuper::Supervision { id, name, result } => {

crates/amaru-consensus/src/chain_forward/client_protocol.rs

Lines changed: 13 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -26,41 +26,21 @@ pub enum ClientError {
2626
#[error("handler failure: {0}")]
2727
HandlerFailure(PanicOrAbort),
2828
#[error("chainsync error: {0}")]
29-
ChainSync(chainsync::ServerError),
29+
ChainSync(#[from] chainsync::ServerError),
3030
#[error("block fetch error: {0}")]
31-
BlockFetch(blockfetch::ServerError),
31+
BlockFetch(#[from] blockfetch::ServerError),
3232
#[error("tx submission error: {0}")]
33-
TxSubmission(txsubmission::Error),
33+
TxSubmission(#[from] txsubmission::Error),
3434
#[error("keep alive error: {0}")]
35-
KeepAlive(keepalive::ServerError),
35+
KeepAlive(#[from] keepalive::ServerError),
3636
}
3737

38-
impl From<chainsync::ServerError> for ClientError {
39-
fn from(value: chainsync::ServerError) -> Self {
40-
ClientError::ChainSync(value)
41-
}
42-
}
43-
44-
impl From<blockfetch::ServerError> for ClientError {
45-
fn from(value: blockfetch::ServerError) -> Self {
46-
ClientError::BlockFetch(value)
47-
}
48-
}
49-
50-
impl From<txsubmission::Error> for ClientError {
51-
fn from(value: txsubmission::Error) -> Self {
52-
ClientError::TxSubmission(value)
53-
}
54-
}
55-
56-
impl From<keepalive::ServerError> for ClientError {
57-
fn from(value: keepalive::ServerError) -> Self {
58-
ClientError::KeepAlive(value)
59-
}
38+
pub enum ClientProtocolMsg {
39+
Op(ClientOp),
6040
}
6141

6242
pub async fn client_protocols(
63-
mut cell: ActoCell<ChainSyncMsg, impl ActoRuntime, Result<(), ClientError>>,
43+
mut cell: ActoCell<ClientProtocolMsg, impl ActoRuntime, Result<(), ClientError>>,
6444
server: PeerServer,
6545
store: Arc<Mutex<dyn ChainStore<Header>>>,
6646
tip: Tip,
@@ -80,7 +60,9 @@ pub async fn client_protocols(
8060
});
8161

8262
while let ActoInput::Message(msg) = cell.recv().await {
83-
chain_sync.send(msg);
63+
match msg {
64+
ClientProtocolMsg::Op(op) => chain_sync.send(ChainSyncMsg::Op(op)),
65+
};
8466
}
8567

8668
Ok(())
@@ -105,11 +87,14 @@ async fn chain_sync(
10587
return Err(ClientError::EarlyRequestNext);
10688
};
10789

90+
tracing::debug!("finding headers between {:?} and {:?}", tip.0, req);
10891
let Some((intersection, client_at)) = find_headers_between(&*store.lock().await, &tip.0, &req)
10992
else {
93+
tracing::debug!("no intersection found");
11094
server.send_intersect_not_found(tip).await?;
11195
return Err(ClientError::NoIntersection);
11296
};
97+
tracing::debug!("intersection found: {client_at:?}");
11398
server
11499
.send_intersect_found(client_at.0.clone(), tip)
115100
.await?;

crates/amaru-consensus/src/chain_forward/tests.rs

Lines changed: 122 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,19 @@
1+
use super::{ForwardEvent, ForwardStage};
12
use crate::chain_forward::client_state::find_headers_between;
23
use crate::consensus::store::{ChainStore, Nonces, StoreError};
4+
use acto::{AcTokio, AcTokioRuntime, ActoCell, ActoInput, ActoRuntime};
35
use amaru_kernel::{Hash, Header};
4-
use pallas_network::miniprotocols::chainsync::Tip;
6+
use amaru_ledger::BlockValidationResult;
7+
use gasket::messaging::tokio::ChannelRecvAdapter;
8+
use gasket::runtime::spawn_stage;
9+
use pallas_network::facades::PeerClient;
10+
use pallas_network::miniprotocols::chainsync::{NextResponse, Tip};
511
use pallas_network::miniprotocols::Point;
12+
use std::future::Future;
13+
use std::sync::Arc;
614
use std::{collections::HashMap, fs::File, path::Path, str::FromStr};
15+
use tokio::sync::{mpsc, Mutex};
16+
use tracing_subscriber::EnvFilter;
717

818
impl ChainStore<Header> for HashMap<Hash<32>, Header> {
919
fn load_header(&self, hash: &Hash<32>) -> Option<Header> {
@@ -76,6 +86,10 @@ fn point(slot: u64, hash: &str) -> Point {
7686
Point::Specific(slot, hex(hash))
7787
}
7888

89+
fn amaru_point(slot: u64, hash: &str) -> amaru_kernel::Point {
90+
amaru_kernel::Point::Specific(slot, hex(hash))
91+
}
92+
7993
#[test]
8094
fn test_mk_store() {
8195
let store = mk_store(CHAIN_41);
@@ -151,3 +165,110 @@ fn find_headers_between_tip_and_lost() {
151165
let result = find_headers_between(&store, &tip, &points);
152166
assert!(result.is_none(), "{result:?}");
153167
}
168+
169+
#[test]
170+
fn test_chain_sync() {
171+
let _ = tracing_subscriber::fmt()
172+
.with_env_filter(EnvFilter::from_default_env())
173+
.with_test_writer()
174+
.try_init();
175+
176+
// step 0a: prepare the store
177+
let store = Arc::new(Mutex::new(mk_store(CHAIN_41)));
178+
179+
// step 0b: prepare actor to forward downstream traffic
180+
let runtime = AcTokio::new("test", 1).unwrap();
181+
let (port_tx, mut port_rx) = mpsc::channel(1);
182+
let downstream = runtime
183+
.spawn_actor(
184+
"test",
185+
|mut cell: ActoCell<ForwardEvent, AcTokioRuntime>| async move {
186+
while let ActoInput::Message(msg) = cell.recv().await {
187+
port_tx.send(msg).await.unwrap();
188+
}
189+
},
190+
)
191+
.me;
192+
193+
// step 0c: prepare a little utility
194+
fn block_on<F: Future>(runtime: &AcTokio, f: F) -> F::Output {
195+
runtime.with_rt(|rt| rt.block_on(f)).unwrap()
196+
}
197+
198+
// step 1: prepare the stage
199+
let (block_tx, block_rx) = mpsc::channel(1);
200+
let mut stage = ForwardStage::new(Some(downstream), store, 42, "127.0.0.1:0");
201+
stage.upstream.connect(ChannelRecvAdapter::Mpsc(block_rx));
202+
let tether = spawn_stage(stage, Default::default());
203+
204+
// step 2: wait for the listening event
205+
println!("stage state 1: {:?}", tether.check_state());
206+
let port = block_on(&runtime, port_rx.recv()).unwrap();
207+
let ForwardEvent::Listening(port) = port else {
208+
panic!("expected listening event, got {:?}", port);
209+
};
210+
assert_ne!(port, 0);
211+
println!("stage state 2: {:?}", tether.check_state());
212+
213+
// step 3: send the block validated event to inform the stage of the current tip
214+
let span = tracing::debug_span!("whatever");
215+
216+
let validated = block_on(&runtime, {
217+
let block_tx = &block_tx;
218+
async move {
219+
println!("sending block validated");
220+
block_tx
221+
.send(
222+
BlockValidationResult::BlockValidated(amaru_point(TIP_41_SLOT, TIP_41), span)
223+
.into(),
224+
)
225+
.await
226+
.unwrap();
227+
println!("waiting for forward event");
228+
port_rx.recv().await.unwrap()
229+
}
230+
});
231+
let ForwardEvent::Forward(p) = validated else {
232+
panic!("expected forward event, got {:?}", validated);
233+
};
234+
assert_eq!(p, point(TIP_41_SLOT, TIP_41));
235+
236+
// step 4a: connect to the stage and prove that it is still alive
237+
println!("stage state 3: {:?}", tether.check_state());
238+
let mut client = block_on(
239+
&runtime,
240+
PeerClient::connect(&format!("127.0.0.1:{port}"), 42),
241+
)
242+
.unwrap();
243+
244+
// step 4b: find the intersection point
245+
let (p, t) = block_on(&runtime, {
246+
client
247+
.chainsync()
248+
.find_intersect(vec![point(BRANCH_41_SLOT, BRANCH_41)])
249+
})
250+
.unwrap();
251+
252+
assert_eq!(p, Some(point(BRANCH_41_SLOT, BRANCH_41)));
253+
assert_eq!(t.0, point(TIP_41_SLOT, TIP_41));
254+
assert_eq!(t.1, TIP_41_HEIGHT);
255+
256+
// step 5: pull headers from the stage
257+
let headers = block_on(&runtime, async move {
258+
let mut headers = Vec::new();
259+
while let Ok(response) = client.chainsync().request_next().await {
260+
match response {
261+
NextResponse::RollForward(header, _) => headers.push(header),
262+
NextResponse::RollBackward(_, _) => panic!("unexpected roll backward"),
263+
NextResponse::Await => break,
264+
}
265+
}
266+
headers
267+
});
268+
assert_eq!(headers.len() as u64, TIP_41_HEIGHT - BRANCH_41_HEIGHT);
269+
270+
// prove that this is still alive - otherwise gasket will kill the stage
271+
drop(block_tx);
272+
273+
// Note: there’s no way to shut down the gasket stage without logging to ERRORs, sorry
274+
}

crates/amaru/src/sync.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ pub fn bootstrap(
8282

8383
let mut consensus_stage = HeaderStage::new(consensus);
8484
let mut block_forward = chain_forward::ForwardStage::new(
85+
None,
8586
chain_ref.clone(),
8687
config.network_magic as u64,
8788
&config.listen_address,

0 commit comments

Comments
 (0)