|
| 1 | +// Copyright 2025 PRAGMA |
| 2 | +// |
| 3 | +// Licensed under the Apache License, Version 2.0 (the "License"); |
| 4 | +// you may not use this file except in compliance with the License. |
| 5 | +// You may obtain a copy of the License at |
| 6 | +// |
| 7 | +// http://www.apache.org/licenses/LICENSE-2.0 |
| 8 | +// |
| 9 | +// Unless required by applicable law or agreed to in writing, software |
| 10 | +// distributed under the License is distributed on an "AS IS" BASIS, |
| 11 | +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| 12 | +// See the License for the specific language governing permissions and |
| 13 | +// limitations under the License. |
| 14 | + |
| 15 | +use crate::cmd::{DEFAULT_NETWORK, connect_to_peer}; |
| 16 | +use amaru::stages::pull; |
| 17 | +use amaru_consensus::IsHeader; |
| 18 | +use amaru_kernel::{Header, Point, from_cbor, network::NetworkName, peer::Peer}; |
| 19 | +use amaru_network::session::PeerSession; |
| 20 | +use amaru_progress_bar::{ProgressBar, new_terminal_progress_bar}; |
| 21 | +use clap::{Parser, arg}; |
| 22 | +use gasket::framework::{AsWorkError, WorkerError}; |
| 23 | +use pallas_network::miniprotocols::chainsync::{self, HeaderContent, NextResponse}; |
| 24 | +use std::{ |
| 25 | + error::Error, |
| 26 | + fs::File, |
| 27 | + io::Write, |
| 28 | + path::{Path, PathBuf}, |
| 29 | + sync::{Arc, RwLock}, |
| 30 | + time::Duration, |
| 31 | +}; |
| 32 | +use tokio::{sync::Mutex, time::timeout}; |
| 33 | + |
| 34 | +use tracing::info; |
| 35 | + |
| 36 | +#[derive(Debug, Parser)] |
| 37 | +pub struct Args { |
| 38 | + /// Network to fetch chain headers for. |
| 39 | + /// |
| 40 | + /// Should be one of 'mainnet', 'preprod', 'preview' or 'testnet:<magic>' where |
| 41 | + /// `magic` is a 32-bits unsigned value denoting a particular testnet. |
| 42 | + #[arg( |
| 43 | + long, |
| 44 | + value_name = "NETWORK", |
| 45 | + default_value_t = DEFAULT_NETWORK, |
| 46 | + )] |
| 47 | + network: NetworkName, |
| 48 | + |
| 49 | + /// Path to directory containing per-network bootstrap configuration files. |
| 50 | + /// |
| 51 | + /// This path will be used as a prefix to resolve per-network configuration files |
| 52 | + /// needed for bootstrapping and to store fetched chain headers. Given a source |
| 53 | + /// directory `data`, and a network name of `preview`, the expected layout for |
| 54 | + /// configuration files would be: |
| 55 | + /// |
| 56 | + /// * `data/preview/snapshots.json`: a list of `Snapshot` vaalues, |
| 57 | + /// * `data/preview/nonces.json`: a list of `InitialNonces` values, |
| 58 | + /// * `data/preview/headers.json`: a list of `Point`s, |
| 59 | + /// * `data/preview/headers`: a directory where the fetched chain headers will be stored. |
| 60 | + #[arg( |
| 61 | + long, |
| 62 | + value_name = "DIRECTORY", |
| 63 | + default_value = "data", |
| 64 | + verbatim_doc_comment |
| 65 | + )] |
| 66 | + config_dir: PathBuf, |
| 67 | + |
| 68 | + /// Address of the node to connect to for retrieving chain data. |
| 69 | + /// The node should be accessible via the node-2-node protocol, which |
| 70 | + /// means the remote node should be running as a validator and not |
| 71 | + /// as a client node. |
| 72 | + /// |
| 73 | + /// Address is given in the usual `host:port` format, for example: "1.2.3.4:3000". |
| 74 | + #[arg( |
| 75 | + long, |
| 76 | + value_name = "NETWORK_ADDRESS", |
| 77 | + default_value = "127.0.0.1:3001", |
| 78 | + verbatim_doc_comment |
| 79 | + )] |
| 80 | + peer_address: String, |
| 81 | +} |
| 82 | + |
| 83 | +pub async fn run(args: Args) -> Result<(), Box<dyn Error>> { |
| 84 | + info!(config=?args.config_dir, peer=%args.peer_address, network=%args.network, |
| 85 | + "fetching chain headers", |
| 86 | + ); |
| 87 | + let network = args.network; |
| 88 | + let network_dir = args.config_dir.join(&*network.to_string()); |
| 89 | + |
| 90 | + fetch_headers_for_network(network, &args.peer_address, &network_dir).await?; |
| 91 | + |
| 92 | + Ok(()) |
| 93 | +} |
| 94 | + |
| 95 | +async fn fetch_headers_for_network( |
| 96 | + network: NetworkName, |
| 97 | + peer_address: &str, |
| 98 | + config_dir: &Path, |
| 99 | +) -> Result<(), Box<dyn Error>> { |
| 100 | + let headers_file: PathBuf = config_dir.join("headers.json"); |
| 101 | + let content = tokio::fs::read_to_string(headers_file).await?; |
| 102 | + let points: Vec<String> = serde_json::from_str(&content)?; |
| 103 | + let mut initial_headers = Vec::new(); |
| 104 | + for point in points { |
| 105 | + match Point::try_from(point.as_str()) { |
| 106 | + Ok(point) => initial_headers.push(point), |
| 107 | + Err(e) => tracing::warn!("Ignoring malformed header point '{}': {}", point, e), |
| 108 | + } |
| 109 | + } |
| 110 | + for hdr in initial_headers { |
| 111 | + // FIXME: why do we only fetch 2 headers for each header listed in the |
| 112 | + // config file? The 2 headers make sense, but why starting from more than |
| 113 | + // one header? |
| 114 | + const NUM_HEADERS_TO_FETCH: usize = 2; |
| 115 | + fetch_headers(peer_address, network, config_dir, hdr, NUM_HEADERS_TO_FETCH).await?; |
| 116 | + } |
| 117 | + |
| 118 | + Ok(()) |
| 119 | +} |
| 120 | + |
| 121 | +pub(crate) async fn fetch_headers( |
| 122 | + peer_address: &str, |
| 123 | + network_name: NetworkName, |
| 124 | + config_dir: &Path, |
| 125 | + point: Point, |
| 126 | + max: usize, |
| 127 | +) -> Result<(), Box<dyn Error>> { |
| 128 | + let peer_client = Arc::new(Mutex::new( |
| 129 | + connect_to_peer(peer_address, &network_name).await?, |
| 130 | + )); |
| 131 | + |
| 132 | + let peer_session = PeerSession { |
| 133 | + peer: Peer::new(peer_address), |
| 134 | + peer_client, |
| 135 | + }; |
| 136 | + |
| 137 | + let pull = pull::Stage::new( |
| 138 | + peer_session.clone(), |
| 139 | + vec![point.clone()], |
| 140 | + Arc::new(RwLock::new(true)), |
| 141 | + ); |
| 142 | + |
| 143 | + pull.find_intersection().await?; |
| 144 | + |
| 145 | + let mut peer_client = peer_session.lock().await; |
| 146 | + let mut count = 0; |
| 147 | + |
| 148 | + let client = (*peer_client).chainsync(); |
| 149 | + |
| 150 | + let mut progress: Option<Box<dyn ProgressBar>> = None; |
| 151 | + |
| 152 | + // TODO: implement a proper pipelined client because this one is super slow |
| 153 | + // Pipelining in Haskell is single threaded which implies the code handles |
| 154 | + // scheduling between sending burst of MsgRequest and collecting responses. |
| 155 | + // Here we can do better thanks to gasket's workers: just spawn 2 workers, |
| 156 | + // one for sending requests and the other for handling responses, along |
| 157 | + // with a shared counter. |
| 158 | + // Pipelining stops when we reach the tip of the peer's chain. |
| 159 | + loop { |
| 160 | + let what = if client.has_agency() { |
| 161 | + request_next_block(client, config_dir, &mut count, &mut progress, max).await? |
| 162 | + } else { |
| 163 | + await_for_next_block(client, config_dir, &mut count, &mut progress, max).await? |
| 164 | + }; |
| 165 | + match what { |
| 166 | + Continue => continue, |
| 167 | + Stop => { |
| 168 | + if let Some(progress) = progress { |
| 169 | + progress.clear() |
| 170 | + } |
| 171 | + break; |
| 172 | + } |
| 173 | + } |
| 174 | + } |
| 175 | + info!(total = count, "header_fetched"); |
| 176 | + Ok(()) |
| 177 | +} |
/// Decision returned by the response handlers to the header-fetching loop.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum What {
    /// Keep exchanging messages with the peer.
    Continue,
    /// Leave the loop: no more headers are to be fetched.
    Stop,
}

// Bring the variants into scope so they can be used unqualified in this file.
use What::*;
| 184 | + |
| 185 | +async fn request_next_block( |
| 186 | + client: &mut chainsync::Client<HeaderContent>, |
| 187 | + config_dir: &Path, |
| 188 | + count: &mut usize, |
| 189 | + progress: &mut Option<Box<dyn ProgressBar>>, |
| 190 | + max: usize, |
| 191 | +) -> Result<What, WorkerError> { |
| 192 | + let next = client.request_next().await.or_restart()?; |
| 193 | + handle_response(next, config_dir, count, progress, max) |
| 194 | +} |
| 195 | + |
| 196 | +async fn await_for_next_block( |
| 197 | + client: &mut chainsync::Client<HeaderContent>, |
| 198 | + config_dir: &Path, |
| 199 | + count: &mut usize, |
| 200 | + progress: &mut Option<Box<dyn ProgressBar>>, |
| 201 | + max: usize, |
| 202 | +) -> Result<What, WorkerError> { |
| 203 | + match timeout(Duration::from_secs(1), client.recv_while_must_reply()).await { |
| 204 | + Ok(result) => result |
| 205 | + .map_err(|_| WorkerError::Recv) |
| 206 | + .and_then(|next| handle_response(next, config_dir, count, progress, max)), |
| 207 | + Err(_) => Err(WorkerError::Retry)?, |
| 208 | + } |
| 209 | +} |
| 210 | + |
| 211 | +#[allow(clippy::unwrap_used)] |
| 212 | +fn handle_response( |
| 213 | + next: NextResponse<HeaderContent>, |
| 214 | + config_dir: &Path, |
| 215 | + count: &mut usize, |
| 216 | + progress: &mut Option<Box<dyn ProgressBar>>, |
| 217 | + max: usize, |
| 218 | +) -> Result<What, WorkerError> { |
| 219 | + match next { |
| 220 | + NextResponse::RollForward(content, tip) => { |
| 221 | + let header: Header = from_cbor(&content.cbor).unwrap(); |
| 222 | + let hash = header.hash(); |
| 223 | + let slot = header.slot(); |
| 224 | + |
| 225 | + let filename = format!("chain.{}.{}.cbor", slot, hex::encode(hash)); |
| 226 | + let headers_dir = config_dir.join("headers"); |
| 227 | + std::fs::create_dir_all(&headers_dir) |
| 228 | + .inspect_err(|reason| tracing::error!(dir = %headers_dir.display(), reason = %reason, "Failed to create headers directory")) |
| 229 | + .map_err(|_| WorkerError::Panic)?; |
| 230 | + let filepath = headers_dir.join(&filename); |
| 231 | + let mut file = File::create(&filepath) |
| 232 | + .inspect_err(|reason|tracing::error!(file = %filepath.display(), reason = %reason, "Failed to create file")) |
| 233 | + .map_err(|_| WorkerError::Panic)?; |
| 234 | + file.write_all(&content.cbor) |
| 235 | + .map_err(|_| WorkerError::Panic)?; |
| 236 | + |
| 237 | + *count += 1; |
| 238 | + |
| 239 | + let slot = header.slot(); |
| 240 | + let tip_slot = tip.0.slot_or_default(); |
| 241 | + |
| 242 | + if let Some(progress) = progress { |
| 243 | + progress.tick(1) |
| 244 | + } |
| 245 | + |
| 246 | + if *count >= max || slot == tip_slot { |
| 247 | + Ok(Stop) |
| 248 | + } else { |
| 249 | + Ok(Continue) |
| 250 | + } |
| 251 | + } |
| 252 | + #[allow(clippy::unwrap_used)] |
| 253 | + NextResponse::RollBackward(point, tip) => { |
| 254 | + info!(?point, ?tip, "roll_backward"); |
| 255 | + if progress.is_none() { |
| 256 | + *progress = Some(new_terminal_progress_bar( |
| 257 | + max, |
| 258 | + " fetching headers (~{eta} left) {bar:70} {pos:>7}/{len:7} ({percent_precise}%)", |
| 259 | + )); |
| 260 | + } |
| 261 | + Ok(Continue) |
| 262 | + } |
| 263 | + NextResponse::Await => Ok(Continue), |
| 264 | + } |
| 265 | +} |
0 commit comments