Skip to content

Commit 946e0c9

Browse files
committed
chore: factorize header importation
import-headers and bootstrap both use the same peerless tactic to import headers from source `data` directory. Signed-off-by: Pascal Grange <pascal@grange.nom.fr>
1 parent e853d6d commit 946e0c9

4 files changed

Lines changed: 55 additions & 198 deletions

File tree

Makefile

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,9 +54,7 @@ import-headers: ## Import headers from $PEER_ADDRESS for demo
5454
cargo run --profile $(BUILD_PROFILE) -- import-headers \
5555
--network $(NETWORK) \
5656
--chain-dir $(CHAIN_DIR) \
57-
--peer-address $(PEER_ADDRESS) \
58-
--starting-point $$HEADER \
59-
--count 2; \
57+
--config-dir $(CONFIG_FOLDER); \
6058
done
6159

6260
import-nonces: ## Import nonces for demo

crates/amaru/src/bin/amaru/cmd/bootstrap.rs

Lines changed: 3 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -13,21 +13,16 @@
1313
// limitations under the License.
1414

1515
use super::{
16+
import_headers::import_headers_for_network,
1617
import_ledger_state::import_all_from_directory,
1718
import_nonces::{InitialNonces, import_nonces},
1819
};
1920
use crate::cmd::DEFAULT_NETWORK;
2021
use amaru::snapshots_dir;
21-
use amaru_consensus::IsHeader;
22-
use amaru_consensus::consensus::store::ChainStore;
23-
use amaru_kernel::{
24-
Header, default_chain_dir, default_ledger_dir, from_cbor, network::NetworkName,
25-
};
26-
use amaru_stores::rocksdb::consensus::RocksDBStore;
22+
use amaru_kernel::{default_chain_dir, default_ledger_dir, network::NetworkName};
2723
use async_compression::tokio::bufread::GzipDecoder;
2824
use clap::{Parser, arg};
2925
use futures_util::TryStreamExt;
30-
use gasket::framework::WorkerError;
3126
use serde::Deserialize;
3227
use std::{
3328
error::Error,
@@ -37,7 +32,7 @@ use std::{
3732
use thiserror::Error;
3833
use tokio::{
3934
fs::{self, File},
40-
io::{AsyncReadExt, BufReader},
35+
io::BufReader,
4136
};
4237
use tokio_util::io::StreamReader;
4338
use tracing::info;
@@ -112,40 +107,6 @@ pub async fn run(args: Args) -> Result<(), Box<dyn Error>> {
112107
Ok(())
113108
}
114109

115-
#[allow(clippy::unwrap_used)]
116-
async fn import_headers_for_network(
117-
network: NetworkName,
118-
config_dir: &Path,
119-
chain_dir: &PathBuf,
120-
) -> Result<(), Box<dyn Error>> {
121-
let era_history = network.into();
122-
let mut db = RocksDBStore::new(chain_dir, era_history)?;
123-
124-
for entry in std::fs::read_dir(config_dir.join("headers"))? {
125-
let entry = entry?;
126-
let path = entry.path();
127-
if path.is_file()
128-
&& let Some(filename) = path.file_name().and_then(|f| f.to_str())
129-
&& filename.starts_with("header.")
130-
&& filename.ends_with(".cbor")
131-
{
132-
let mut file = File::open(&path).await
133-
.inspect_err(|reason| tracing::error!(file = %path.display(), reason = %reason, "Failed to open header file"))
134-
.map_err(|_| WorkerError::Panic)?;
135-
let mut cbor_data = Vec::new();
136-
file.read_to_end(&mut cbor_data).await
137-
.inspect_err(|reason| tracing::error!(file = %path.display(), reason = %reason, "Failed to read header file"))
138-
.map_err(|_| WorkerError::Panic)?;
139-
let header_from_file: Header = from_cbor(&cbor_data).unwrap();
140-
let hash = header_from_file.hash();
141-
db.store_header(&hash, &header_from_file)
142-
.map_err(|_| WorkerError::Panic)?;
143-
}
144-
}
145-
146-
Ok(())
147-
}
148-
149110
async fn import_nonces_for_network(
150111
network: NetworkName,
151112
config_dir: &Path,

crates/amaru/src/bin/amaru/cmd/import_headers.rs

Lines changed: 50 additions & 152 deletions
Original file line numberDiff line numberDiff line change
@@ -12,31 +12,20 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use crate::cmd::connect_to_peer;
1615
use amaru_consensus::{IsHeader, consensus::store::ChainStore};
17-
use amaru_kernel::{Header, Point, default_chain_dir, from_cbor, network::NetworkName, peer::Peer};
18-
use amaru_network::chain_sync_client::ChainSyncClient;
19-
use amaru_progress_bar::{ProgressBar, new_terminal_progress_bar};
16+
use amaru_kernel::{Header, default_chain_dir, from_cbor, network::NetworkName};
2017
use amaru_stores::rocksdb::consensus::RocksDBStore;
2118
use clap::Parser;
2219
use gasket::framework::*;
23-
use pallas_network::miniprotocols::chainsync::{HeaderContent, NextResponse};
24-
use std::{error::Error, path::PathBuf, time::Duration};
25-
use tokio::time::timeout;
26-
use tracing::info;
20+
use std::{
21+
error::Error,
22+
path::{Path, PathBuf},
23+
};
24+
use tokio::{fs::File, io::AsyncReadExt};
2725

2826
#[derive(Debug, Parser)]
2927
pub struct Args {
30-
/// Address of the node to connect to for retrieving chain data.
31-
/// The node should be accessible via the node-2-node protocol, which
32-
/// means the remote node should be running as a validator and not
33-
/// as a client node.
34-
///
35-
/// Addressis given in the usual `host:port` format, for example: "1.2.3.4:3000".
36-
#[arg(long, value_name = "NETWORK_ADDRESS", verbatim_doc_comment)]
37-
peer_address: String,
38-
39-
/// Network to use for the connection.
28+
/// Network for which we are importing headers.
4029
///
4130
/// Should be one of 'mainnet', 'preprod', 'preview' or 'testnet:<magic>' where
4231
/// `magic` is a 32-bits unsigned value denoting a particular testnet.
@@ -54,152 +43,61 @@ pub struct Args {
5443
#[arg(long, value_name = "DIR", verbatim_doc_comment)]
5544
chain_dir: Option<PathBuf>,
5645

57-
/// Starting point of import.
46+
/// Path to directory containing per-network configuration files.
47+
///
48+
/// This path will be used as a prefix to resolve per-network configuration files
49+
/// needed for importing headers. Given a source directory `data`, and a
50+
/// a network name of `preview`, the expected layout for header files would be:
5851
///
59-
/// This is the "intersection" point which will be given to the peer as a starting point
60-
/// to import the chain database.
61-
#[arg(long, value_name = "POINT", verbatim_doc_comment, value_parser = |s: &str| Point::try_from(s)
52+
/// `data/preview/headers/header.*.*.cbor`
53+
#[arg(
54+
long,
55+
value_name = "DIRECTORY",
56+
default_value = "data",
57+
verbatim_doc_comment
6258
)]
63-
starting_point: Point,
64-
65-
/// Number of headers to import.
66-
/// Maximum number of headers to import from the `peer`.
67-
/// By default, it will retrieve all headers until it reaches the tip of the peer's chain.
68-
#[arg(long, value_name = "UINT", verbatim_doc_comment, default_value_t = usize::MAX)]
69-
count: usize,
70-
}
71-
72-
enum What {
73-
Continue,
74-
Stop,
59+
config_dir: PathBuf,
7560
}
7661

77-
use What::*;
78-
7962
pub async fn run(args: Args) -> Result<(), Box<dyn std::error::Error>> {
63+
let network = args.network;
64+
let network_dir = args.config_dir.join(&*network.to_string());
8065
let chain_dir = args
8166
.chain_dir
8267
.unwrap_or_else(|| default_chain_dir(args.network).into());
83-
import_headers(
84-
&args.peer_address,
85-
args.network,
86-
&chain_dir,
87-
args.starting_point,
88-
args.count,
89-
)
90-
.await
68+
import_headers_for_network(args.network, &network_dir, &chain_dir).await
9169
}
9270

93-
pub(crate) async fn import_headers(
94-
peer_address: &str,
95-
network_name: NetworkName,
96-
chain_db_dir: &PathBuf,
97-
point: Point,
98-
max: usize,
71+
#[allow(clippy::unwrap_used)]
72+
pub(crate) async fn import_headers_for_network(
73+
network: NetworkName,
74+
config_dir: &Path,
75+
chain_dir: &PathBuf,
9976
) -> Result<(), Box<dyn Error>> {
100-
let era_history = network_name.into();
101-
let mut db = RocksDBStore::new(chain_db_dir, era_history)?;
102-
103-
let peer_client = connect_to_peer(peer_address, &network_name).await?;
104-
let mut client = ChainSyncClient::new(
105-
Peer::new(peer_address),
106-
peer_client.chainsync,
107-
vec![point.clone()],
108-
);
109-
110-
client.find_intersection().await?;
111-
112-
let mut count = 0;
113-
114-
let mut progress: Option<Box<dyn ProgressBar>> = None;
115-
116-
loop {
117-
let what = if client.has_agency() {
118-
request_next_block(&mut client, &mut db, &mut count, &mut progress, max).await?
119-
} else {
120-
await_for_next_block(&mut client, &mut db, &mut count, &mut progress, max).await?
121-
};
122-
match what {
123-
Continue => continue,
124-
Stop => {
125-
if let Some(progress) = progress {
126-
progress.clear()
127-
}
128-
break;
129-
}
130-
}
131-
}
132-
info!(total = count, "header_imported");
133-
Ok(())
134-
}
135-
136-
async fn request_next_block(
137-
client: &mut ChainSyncClient,
138-
db: &mut RocksDBStore,
139-
count: &mut usize,
140-
progress: &mut Option<Box<dyn ProgressBar>>,
141-
max: usize,
142-
) -> Result<What, WorkerError> {
143-
let next = client.request_next().await.or_restart()?;
144-
handle_response(next, db, count, progress, max)
145-
}
146-
147-
async fn await_for_next_block(
148-
client: &mut ChainSyncClient,
149-
db: &mut RocksDBStore,
150-
count: &mut usize,
151-
progress: &mut Option<Box<dyn ProgressBar>>,
152-
max: usize,
153-
) -> Result<What, WorkerError> {
154-
match timeout(Duration::from_secs(1), client.await_next()).await {
155-
Ok(result) => result
156-
.map_err(|_| WorkerError::Recv)
157-
.and_then(|next| handle_response(next, db, count, progress, max)),
158-
Err(_) => Err(WorkerError::Retry)?,
159-
}
160-
}
161-
162-
#[expect(clippy::unwrap_used)]
163-
fn handle_response(
164-
next: NextResponse<HeaderContent>,
165-
db: &mut RocksDBStore,
166-
count: &mut usize,
167-
progress: &mut Option<Box<dyn ProgressBar>>,
168-
max: usize,
169-
) -> Result<What, WorkerError> {
170-
match next {
171-
NextResponse::RollForward(content, tip) => {
172-
let header: Header = from_cbor(&content.cbor).unwrap();
173-
let hash = header.hash();
174-
175-
db.store_header(&hash, &header)
77+
let era_history = network.into();
78+
let mut db = RocksDBStore::new(chain_dir, era_history)?;
79+
80+
for entry in std::fs::read_dir(config_dir.join("headers"))? {
81+
let entry = entry?;
82+
let path = entry.path();
83+
if path.is_file()
84+
&& let Some(filename) = path.file_name().and_then(|f| f.to_str())
85+
&& filename.starts_with("header.")
86+
&& filename.ends_with(".cbor")
87+
{
88+
let mut file = File::open(&path).await
89+
.inspect_err(|reason| tracing::error!(file = %path.display(), reason = %reason, "Failed to open header file"))
90+
.map_err(|_| WorkerError::Panic)?;
91+
let mut cbor_data = Vec::new();
92+
file.read_to_end(&mut cbor_data).await
93+
.inspect_err(|reason| tracing::error!(file = %path.display(), reason = %reason, "Failed to read header file"))
94+
.map_err(|_| WorkerError::Panic)?;
95+
let header_from_file: Header = from_cbor(&cbor_data).unwrap();
96+
let hash = header_from_file.hash();
97+
db.store_header(&hash, &header_from_file)
17698
.map_err(|_| WorkerError::Panic)?;
177-
178-
*count += 1;
179-
180-
let slot = header.slot();
181-
let tip_slot = tip.0.slot_or_default();
182-
183-
if let Some(progress) = progress {
184-
progress.tick(1)
185-
}
186-
187-
if *count >= max || slot == tip_slot {
188-
Ok(Stop)
189-
} else {
190-
Ok(Continue)
191-
}
192-
}
193-
NextResponse::RollBackward(point, tip) => {
194-
info!(?point, ?tip, "roll_backward");
195-
if progress.is_none() {
196-
*progress = Some(new_terminal_progress_bar(
197-
max,
198-
" importing headers (~{eta} left) {bar:70} {pos:>7}/{len:7} ({percent_precise}%)",
199-
));
200-
}
201-
Ok(Continue)
20299
}
203-
NextResponse::Await => Ok(Continue),
204100
}
101+
102+
Ok(())
205103
}

crates/amaru/src/bin/amaru/main.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ enum Command {
4646
#[clap(alias = "import")]
4747
ImportLedgerState(cmd::import_ledger_state::Args),
4848

49-
/// Import block headers from another (live) node.
49+
/// Import block headers from `${config_dir}/${network name}/`
5050
#[clap(alias = "import-chain-db")]
5151
ImportHeaders(cmd::import_headers::Args),
5252

0 commit comments

Comments
 (0)