Skip to content

Commit f63b073

Browse files
committed
feat: Can fetch headers from peer
We fetch headers from peer to store them next to the snapshots. The bootstrap process will start from them instead of fetching them on demand. That way, we can move towards a peer-less bootstrap. Signed-off-by: Pascal Grange <pascal@grange.nom.fr>
1 parent 87d787d commit f63b073

13 files changed

Lines changed: 286 additions & 0 deletions

Makefile

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,12 @@ download-haskell-config: ## Download Cardano Haskell configuration for $NETWORK
8282
clear-dbs: ## Clear the databases
8383
@rm -rf $(LEDGER_DIR) $(CHAIN_DIR)
8484

85+
fetch-chain-headers: $(CONFIG_FOLDER)/$(NETWORK)/ ## Fetch chain headers from the network
86+
cargo run --profile $(BUILD_PROFILE) -- fetch-chain-headers \
87+
--peer-address $(PEER_ADDRESS) \
88+
--config-dir $(CONFIG_FOLDER) \
89+
--network $(NETWORK)
90+
8591
bootstrap: clear-dbs ## Bootstrap the node from scratch
8692
cargo run --profile $(BUILD_PROFILE) -- bootstrap \
8793
--peer-address $(PEER_ADDRESS) \
Lines changed: 265 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,265 @@
1+
// Copyright 2025 PRAGMA
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
use crate::cmd::{DEFAULT_NETWORK, connect_to_peer};
16+
use amaru::stages::pull;
17+
use amaru_consensus::IsHeader;
18+
use amaru_kernel::{Header, Point, from_cbor, network::NetworkName, peer::Peer};
19+
use amaru_network::session::PeerSession;
20+
use amaru_progress_bar::{ProgressBar, new_terminal_progress_bar};
21+
use clap::{Parser, arg};
22+
use gasket::framework::{AsWorkError, WorkerError};
23+
use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse};
24+
use std::{
25+
error::Error,
26+
fs::File,
27+
io::Write,
28+
path::{Path, PathBuf},
29+
sync::{Arc, RwLock},
30+
time::Duration,
31+
};
32+
use tokio::{sync::Mutex, time::timeout};
33+
34+
use tracing::info;
35+
36+
#[derive(Debug, Parser)]
37+
pub struct Args {
38+
/// Network to fetch chain headers for.
39+
///
40+
/// Should be one of 'mainnet', 'preprod', 'preview' or 'testnet:<magic>' where
41+
/// `magic` is a 32-bits unsigned value denoting a particular testnet.
42+
#[arg(
43+
long,
44+
value_name = "NETWORK",
45+
default_value_t = DEFAULT_NETWORK,
46+
)]
47+
network: NetworkName,
48+
49+
/// Path to directory containing per-network bootstrap configuration files.
50+
///
51+
/// This path will be used as a prefix to resolve per-network configuration files
52+
/// needed for bootstrapping and to store fetched chain headers. Given a source
53+
/// directory `data`, and a network name of `preview`, the expected layout for
54+
/// configuration files would be:
55+
///
56+
/// * `data/preview/snapshots.json`: a list of `Snapshot` vaalues,
57+
/// * `data/preview/nonces.json`: a list of `InitialNonces` values,
58+
/// * `data/preview/headers.json`: a list of `Point`s,
59+
/// * `data/preview/headers`: a directory where the fetched chain headers will be stored.
60+
#[arg(
61+
long,
62+
value_name = "DIRECTORY",
63+
default_value = "data",
64+
verbatim_doc_comment
65+
)]
66+
config_dir: PathBuf,
67+
68+
/// Address of the node to connect to for retrieving chain data.
69+
/// The node should be accessible via the node-2-node protocol, which
70+
/// means the remote node should be running as a validator and not
71+
/// as a client node.
72+
///
73+
/// Address is given in the usual `host:port` format, for example: "1.2.3.4:3000".
74+
#[arg(
75+
long,
76+
value_name = "NETWORK_ADDRESS",
77+
default_value = "127.0.0.1:3001",
78+
verbatim_doc_comment
79+
)]
80+
peer_address: String,
81+
}
82+
83+
pub async fn run(args: Args) -> Result<(), Box<dyn Error>> {
84+
info!(config=?args.config_dir, peer=%args.peer_address, network=%args.network,
85+
"fetching chain headers",
86+
);
87+
let network = args.network;
88+
let network_dir = args.config_dir.join(&*network.to_string());
89+
90+
fetch_headers_for_network(network, &args.peer_address, &network_dir).await?;
91+
92+
Ok(())
93+
}
94+
95+
async fn fetch_headers_for_network(
96+
network: NetworkName,
97+
peer_address: &str,
98+
config_dir: &Path,
99+
) -> Result<(), Box<dyn Error>> {
100+
let headers_file: PathBuf = config_dir.join("headers.json");
101+
let content = tokio::fs::read_to_string(headers_file).await?;
102+
let points: Vec<String> = serde_json::from_str(&content)?;
103+
let mut initial_headers = Vec::new();
104+
for point in points {
105+
match Point::try_from(point.as_str()) {
106+
Ok(point) => initial_headers.push(point),
107+
Err(e) => tracing::warn!("Ignoring malformed header point '{}': {}", point, e),
108+
}
109+
}
110+
for hdr in initial_headers {
111+
// FIXME: why do we only fetch 2 headers for each header listed in the
112+
// config file? The 2 headers make sense, but why starting from more than
113+
// one header?
114+
const NUM_HEADERS_TO_FETCH: usize = 2;
115+
fetch_headers(peer_address, network, config_dir, hdr, NUM_HEADERS_TO_FETCH).await?;
116+
}
117+
118+
Ok(())
119+
}
120+
121+
pub(crate) async fn fetch_headers(
122+
peer_address: &str,
123+
network_name: NetworkName,
124+
config_dir: &Path,
125+
point: Point,
126+
max: usize,
127+
) -> Result<(), Box<dyn Error>> {
128+
let peer_client = Arc::new(Mutex::new(
129+
connect_to_peer(peer_address, &network_name).await?,
130+
));
131+
132+
let peer_session = PeerSession {
133+
peer: Peer::new(peer_address),
134+
peer_client,
135+
};
136+
137+
let pull = pull::Stage::new(
138+
peer_session.clone(),
139+
vec![point.clone()],
140+
Arc::new(RwLock::new(true)),
141+
);
142+
143+
pull.find_intersection().await?;
144+
145+
let mut peer_client = peer_session.lock().await;
146+
let mut count = 0;
147+
148+
let client = (*peer_client).chainsync();
149+
150+
let mut progress: Option<Box<dyn ProgressBar>> = None;
151+
152+
// TODO: implement a proper pipelined client because this one is super slow
153+
// Pipelining in Haskell is single threaded which implies the code handles
154+
// scheduling between sending burst of MsgRequest and collecting responses.
155+
// Here we can do better thanks to gasket's workers: just spawn 2 workers,
156+
// one for sending requests and the other for handling responses, along
157+
// with a shared counter.
158+
// Pipelining stops when we reach the tip of the peer's chain.
159+
loop {
160+
let what = if client.has_agency() {
161+
request_next_block(client, config_dir, &mut count, &mut progress, max).await?
162+
} else {
163+
await_for_next_block(client, config_dir, &mut count, &mut progress, max).await?
164+
};
165+
match what {
166+
Continue => continue,
167+
Stop => {
168+
if let Some(progress) = progress {
169+
progress.clear()
170+
}
171+
break;
172+
}
173+
}
174+
}
175+
info!(total = count, "header_fetched");
176+
Ok(())
177+
}
178+
enum What {
179+
Continue,
180+
Stop,
181+
}
182+
183+
use What::*;
184+
185+
async fn request_next_block(
186+
client: &mut chainsync::Client<HeaderContent>,
187+
config_dir: &Path,
188+
count: &mut usize,
189+
progress: &mut Option<Box<dyn ProgressBar>>,
190+
max: usize,
191+
) -> Result<What, WorkerError> {
192+
let next = client.request_next().await.or_restart()?;
193+
handle_response(next, config_dir, count, progress, max)
194+
}
195+
196+
async fn await_for_next_block(
197+
client: &mut chainsync::Client<HeaderContent>,
198+
config_dir: &Path,
199+
count: &mut usize,
200+
progress: &mut Option<Box<dyn ProgressBar>>,
201+
max: usize,
202+
) -> Result<What, WorkerError> {
203+
match timeout(Duration::from_secs(1), client.recv_while_must_reply()).await {
204+
Ok(result) => result
205+
.map_err(|_| WorkerError::Recv)
206+
.and_then(|next| handle_response(next, config_dir, count, progress, max)),
207+
Err(_) => Err(WorkerError::Retry)?,
208+
}
209+
}
210+
211+
#[allow(clippy::unwrap_used)]
212+
fn handle_response(
213+
next: NextResponse<HeaderContent>,
214+
config_dir: &Path,
215+
count: &mut usize,
216+
progress: &mut Option<Box<dyn ProgressBar>>,
217+
max: usize,
218+
) -> Result<What, WorkerError> {
219+
match next {
220+
NextResponse::RollForward(content, tip) => {
221+
let header: Header = from_cbor(&content.cbor).unwrap();
222+
let hash = header.hash();
223+
let slot = header.slot();
224+
225+
let filename = format!("chain.{}.{}.cbor", slot, hex::encode(hash));
226+
let headers_dir = config_dir.join("headers");
227+
std::fs::create_dir_all(&headers_dir)
228+
.inspect_err(|reason| tracing::error!(dir = %headers_dir.display(), reason = %reason, "Failed to create headers directory"))
229+
.map_err(|_| WorkerError::Panic)?;
230+
let filepath = headers_dir.join(&filename);
231+
let mut file = File::create(&filepath)
232+
.inspect_err(|reason|tracing::error!(file = %filepath.display(), reason = %reason, "Failed to create file"))
233+
.map_err(|_| WorkerError::Panic)?;
234+
file.write_all(&content.cbor)
235+
.map_err(|_| WorkerError::Panic)?;
236+
237+
*count += 1;
238+
239+
let slot = header.slot();
240+
let tip_slot = tip.0.slot_or_default();
241+
242+
if let Some(progress) = progress {
243+
progress.tick(1)
244+
}
245+
246+
if *count >= max || slot == tip_slot {
247+
Ok(Stop)
248+
} else {
249+
Ok(Continue)
250+
}
251+
}
252+
#[allow(clippy::unwrap_used)]
253+
NextResponse::RollBackward(point, tip) => {
254+
info!(?point, ?tip, "roll_backward");
255+
if progress.is_none() {
256+
*progress = Some(new_terminal_progress_bar(
257+
max,
258+
" fetching headers (~{eta} left) {bar:70} {pos:>7}/{len:7} ({percent_precise}%)",
259+
));
260+
}
261+
Ok(Continue)
262+
}
263+
NextResponse::Await => Ok(Continue),
264+
}
265+
}

crates/amaru/src/bin/amaru/cmd/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ use amaru_kernel::network::NetworkName;
1616
use pallas_network::facades::PeerClient;
1717

1818
pub(crate) mod bootstrap;
19+
pub(crate) mod fetch_chain_headers;
1920
pub(crate) mod daemon;
2021
pub(crate) mod import_headers;
2122
pub(crate) mod import_ledger_state;

crates/amaru/src/bin/amaru/main.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ enum Command {
3737
/// **NOTE**: Only `preprod` network is supported for now.
3838
Bootstrap(cmd::bootstrap::Args),
3939

40+
FetchChainHeaders(cmd::fetch_chain_headers::Args),
41+
4042
/// Run the node in all its glory.
4143
Daemon(cmd::daemon::Args),
4244

@@ -114,6 +116,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
114116
Command::ImportHeaders(args) => cmd::import_headers::run(args).await,
115117
Command::ImportNonces(args) => cmd::import_nonces::run(args).await,
116118
Command::Bootstrap(args) => cmd::bootstrap::run(args).await,
119+
Command::FetchChainHeaders(args) => cmd::fetch_chain_headers::run(args).await,
117120
};
118121

119122
// TODO: we might also want to integrate this into a graceful shutdown system, and into a panic hook

data/README.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,3 +84,14 @@ For example, for PreProd, that means removing all immutable chunks after `03149`
8484
3. Start your node.
8585

8686
Both Ogmios and the fetch script will automatically connect to the node once ready and capture data from it when it becomes available. This should terminate once all the points have been processed.
87+
88+
4. Dump chain headers as CBOR
89+
90+
Go to home directory `cd ..` and make fetch-chain-header :
91+
92+
```
93+
cd ..
94+
make fetch-chain-header
95+
```
96+
97+
This will add `chain.<slot>.<hash>.cbor` files in the snapshot directory that can then be used to bootstraped the amaru node.

0 commit comments

Comments
 (0)