|
13 | 13 | // limitations under the License. |
14 | 14 |
|
15 | 15 | use super::{ |
16 | | - import_headers::import_headers, |
17 | 16 | import_ledger_state::import_all_from_directory, |
18 | 17 | import_nonces::{InitialNonces, import_nonces}, |
19 | 18 | }; |
20 | 19 | use crate::cmd::DEFAULT_NETWORK; |
21 | 20 | use amaru::snapshots_dir; |
22 | | -use amaru_kernel::{Point, default_chain_dir, default_ledger_dir, network::NetworkName}; |
| 21 | +use amaru_consensus::IsHeader; |
| 22 | +use amaru_consensus::consensus::store::ChainStore; |
| 23 | +use amaru_kernel::{ |
| 24 | + Header, default_chain_dir, default_ledger_dir, from_cbor, network::NetworkName, |
| 25 | +}; |
| 26 | +use amaru_stores::rocksdb::consensus::RocksDBStore; |
23 | 27 | use async_compression::tokio::bufread::GzipDecoder; |
24 | 28 | use clap::{Parser, arg}; |
25 | 29 | use futures_util::TryStreamExt; |
| 30 | +use gasket::framework::WorkerError; |
26 | 31 | use serde::Deserialize; |
27 | 32 | use std::{ |
28 | 33 | error::Error, |
29 | | - io, |
| 34 | + io::{self}, |
30 | 35 | path::{Path, PathBuf}, |
31 | 36 | }; |
32 | 37 | use thiserror::Error; |
33 | 38 | use tokio::{ |
34 | 39 | fs::{self, File}, |
35 | | - io::BufReader, |
| 40 | + io::{AsyncReadExt, BufReader}, |
36 | 41 | }; |
37 | 42 | use tokio_util::io::StreamReader; |
38 | 43 | use tracing::info; |
@@ -116,34 +121,41 @@ pub async fn run(args: Args) -> Result<(), Box<dyn Error>> { |
116 | 121 |
|
117 | 122 | import_nonces_for_network(network, &network_dir, &chain_dir).await?; |
118 | 123 |
|
119 | | - import_headers_for_network(network, &args.peer_address, &network_dir, &chain_dir).await?; |
| 124 | + import_headers_for_network(network, &network_dir, &chain_dir).await?; |
120 | 125 |
|
121 | 126 | Ok(()) |
122 | 127 | } |
123 | 128 |
|
| 129 | +#[allow(clippy::unwrap_used)] |
124 | 130 | async fn import_headers_for_network( |
125 | 131 | network: NetworkName, |
126 | | - peer_address: &str, |
127 | 132 | config_dir: &Path, |
128 | 133 | chain_dir: &PathBuf, |
129 | 134 | ) -> Result<(), Box<dyn Error>> { |
130 | | - let headers_file: PathBuf = config_dir.join("headers.json"); |
131 | | - let content = tokio::fs::read_to_string(headers_file).await?; |
132 | | - let points: Vec<String> = serde_json::from_str(&content)?; |
133 | | - let mut initial_headers = Vec::new(); |
134 | | - for point in points { |
135 | | - match Point::try_from(point.as_str()) { |
136 | | - Ok(point) => initial_headers.push(point), |
137 | | - Err(e) => tracing::warn!("Ignoring malformed header point '{}': {}", point, e), |
| 135 | + let era_history = network.into(); |
| 136 | + let mut db = RocksDBStore::new(chain_dir, era_history)?; |
| 137 | + |
| 138 | + for entry in std::fs::read_dir(config_dir.join("headers"))? { |
| 139 | + let entry = entry?; |
| 140 | + let path = entry.path(); |
| 141 | + if path.is_file() |
| 142 | + && let Some(filename) = path.file_name().and_then(|f| f.to_str()) |
| 143 | + && filename.starts_with("chain.") |
| 144 | + && filename.ends_with(".cbor") |
| 145 | + { |
| 146 | + let mut file = File::open(&path).await |
| 147 | + .inspect_err(|reason| tracing::error!(file = %path.display(), reason = %reason, "Failed to open header file")) |
| 148 | + .map_err(|_| WorkerError::Panic)?; |
| 149 | + let mut cbor_data = Vec::new(); |
| 150 | + file.read_to_end(&mut cbor_data).await |
| 151 | + .inspect_err(|reason| tracing::error!(file = %path.display(), reason = %reason, "Failed to read header file")) |
| 152 | + .map_err(|_| WorkerError::Panic)?; |
| 153 | + let header_from_file: Header = from_cbor(&cbor_data).unwrap(); |
| 154 | + let hash = header_from_file.hash(); |
| 155 | + db.store_header(&hash, &header_from_file) |
| 156 | + .map_err(|_| WorkerError::Panic)?; |
138 | 157 | } |
139 | 158 | } |
140 | | - for hdr in initial_headers { |
141 | | - // FIXME: why do we only import 2 headers for each header listed in the |
142 | | - // config file? The 2 headers make sense, but why starting from more than |
143 | | - // one header? |
144 | | - const NUM_HEADERS_TO_IMPORT: usize = 2; |
145 | | - import_headers(peer_address, network, chain_dir, hdr, NUM_HEADERS_TO_IMPORT).await?; |
146 | | - } |
147 | 159 |
|
148 | 160 | Ok(()) |
149 | 161 | } |
|
0 commit comments