Skip to content
Merged
4 changes: 4 additions & 0 deletions crates/amaru-consensus/src/consensus/effects/store_effects.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ impl<T: SendData + Sync> ReadOnlyChainStore<BlockHeader> for Store<T> {
fn load_from_best_chain(&self, _point: &Point) -> Option<HeaderHash> {
None
}

fn next_best_chain(&self, _point: &Point) -> Option<Point> {
None
}
}

impl<T: SendData + Sync> ChainStore<BlockHeader> for Store<T> {
Expand Down
6 changes: 3 additions & 3 deletions crates/amaru-consensus/src/consensus/stages/validate_block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ pub async fn stage(
.await
}
Ok(Err(err)) => {
error!(?err, "Failed to validate a block");
error!(?err, %point, "Failed to validate a block");
eff.base()
.send(
&validation_errors,
Expand All @@ -75,7 +75,7 @@ pub async fn stage(
.await;
}
Err(err) => {
error!(?err, "Failed to roll forward block");
error!(?err, %point, "Failed to roll forward block");
eff.base()
.send(
&processing_errors,
Expand All @@ -92,7 +92,7 @@ pub async fn stage(
..
} => {
if let Err(err) = eff.ledger().rollback(&peer, &rollback_point).await {
error!(?err, "Failed to rollback");
error!(?err, %rollback_point, "Failed to rollback");
eff.base().send(&processing_errors, err).await;
} else {
eff.base()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ mod tests {
fn load_from_best_chain(&self, point: &Point) -> Option<HeaderHash> {
self.store.load_from_best_chain(point)
}

fn next_best_chain(&self, point: &Point) -> Option<Point> {
self.store.next_best_chain(point)
}
}

impl ChainStore<BlockHeader> for FailingStore {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,25 @@ impl<H: IsHeader + Clone + Send + Sync + 'static> ReadOnlyChainStore<H> for InMe
let inner = self.inner.lock().unwrap();
inner.chain.iter().find(|p| *p == point).map(|p| p.hash())
}

#[expect(clippy::unwrap_used)]
fn next_best_chain(&self, point: &Point) -> Option<Point> {
let inner = self.inner.lock().unwrap();
let min_slot = point.slot_or_default();

let next: Vec<&Point> = inner
.chain
.iter()
.filter(move |p| p.slot_or_default() > min_slot)
.take(1)
.collect();

if next.is_empty() {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

next.first().cloned()?

None
} else {
Some(next[0].clone())
}
}
}

impl<H: IsHeader + Send + Sync + Clone + 'static> ChainStore<H> for InMemConsensusStore<H> {
Expand Down
8 changes: 8 additions & 0 deletions crates/amaru-ouroboros-traits/src/stores/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ where
/// Returns `None` if the point is not in the best chain.
fn load_from_best_chain(&self, point: &Point) -> Option<HeaderHash>;

/// Return the next `Point` on the best chain following given
/// `Point`, if it exists.
fn next_best_chain(&self, point: &Point) -> Option<Point>;

fn load_block(&self, hash: &HeaderHash) -> Result<RawBlock, StoreError>;
fn get_nonces(&self, header: &HeaderHash) -> Option<Nonces>;
fn has_header(&self, hash: &HeaderHash) -> bool;
Expand Down Expand Up @@ -137,6 +141,10 @@ impl<H: IsHeader> ReadOnlyChainStore<H> for Box<dyn ChainStore<H>> {
fn load_from_best_chain(&self, point: &Point) -> Option<HeaderHash> {
self.as_ref().load_from_best_chain(point)
}

fn next_best_chain(&self, point: &Point) -> Option<Point> {
self.as_ref().next_best_chain(point)
}
}

/// A simple chain store interface that can store and retrieve headers indexed by their hash.
Expand Down
66 changes: 66 additions & 0 deletions crates/amaru-stores/src/rocksdb/consensus/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,25 @@ macro_rules! impl_ReadOnlyChainStore {
})
}

fn next_best_chain(&self, point: &Point) -> Option<Point> {
let readopts = ReadOptions::default();
let prefix = [&CHAIN_PREFIX[..], &(u64::from(point.slot_or_default()) + 1).to_be_bytes()].concat();
let mut iter = self.db.iterator_opt(IteratorMode::From(&prefix, rocksdb::Direction::Forward), readopts);

if let Some(Ok((k, v))) = iter.next() {
let slot_bytes = &k[CHAIN_PREFIX.len()..CHAIN_PREFIX.len() + 8];
let slot = u64::from_be_bytes(slot_bytes.try_into().unwrap());
if v.len() == HEADER_HASH_SIZE {
let hash = HeaderHash::from(v.as_ref());
Some(Point::Specific(slot, hash.to_vec()))
} else {
None
}
} else {
None
}
}
Comment on lines +253 to +270

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Implementation looks solid, but there's a potential panic lurking.

The logic is straightforward - start iterating from slot+1, grab the first result, parse it, done. However, line 260 has an .unwrap() that could panic if the key slice isn't exactly 8 bytes:

let slot = u64::from_be_bytes(slot_bytes.try_into().unwrap());

If the database is corrupted or the key format somehow changes, this'll bring the whole node down faster than you can say "crikey".

Consider replacing with:

-                    let slot = u64::from_be_bytes(slot_bytes.try_into().unwrap());
+                    let slot_array: [u8; 8] = match slot_bytes.try_into() {
+                        Ok(arr) => arr,
+                        Err(_) => {
+                            tracing::error!("Invalid slot bytes length in chain store");
+                            return None;
+                        }
+                    };
+                    let slot = u64::from_be_bytes(slot_array);

Or at minimum use .expect() with a descriptive message so you know what went wrong when investigating the crash.

Also, minor observation: the implementation relies on the iterator's prefix filtering to ensure keys start with CHAIN_PREFIX. This is fine, but if you're being defensive you could add an explicit check - though the iterator prefix mode should handle this already.


})*
}
}
Expand Down Expand Up @@ -729,6 +748,53 @@ pub mod test {
});
}

#[test]
fn next_best_chain_returns_successor_give_valid_point() {
with_db(|store| {
let chain = populate_db(store.clone());

let result = store
.next_best_chain(&chain[5].point())
.expect("should find successor");

assert_eq!(result, chain[6].point());
});
}

#[test]
fn next_best_chain_returns_first_point_on_chain_given_origin() {
with_db(|store| {
let chain = populate_db(store.clone());

let result = store
.next_best_chain(&Point::Origin)
.expect("should find successor");

assert_eq!(result, chain[0].point());
});
}

#[test]
fn next_best_chain_returns_none_given_point_is_not_on_chain() {
with_db(|store| {
let _chain = populate_db(store.clone());
let invalid_point = Point::Specific(100, random_hash().to_vec());

assert!(store.next_best_chain(&invalid_point).is_none());
});
}

#[test]
fn next_best_chain_returns_none_given_point_is_tip() {
with_db(|store| {
let _chain = populate_db(store.clone());
let tip = store.get_best_chain_hash();
let tip_header = store.load_header(&tip).unwrap();

assert!(store.next_best_chain(&tip_header.point()).is_none());
});
}

#[test]
fn raises_error_if_rollback_is_not_on_best_chain() {
with_db(|store| {
Expand Down
1 change: 1 addition & 0 deletions crates/amaru/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ rocksdb.workspace = true

# Internal dependencies ───────────────────────────────────────────────────────┐
amaru-iter-borrow.workspace = true
amaru-kernel = { workspace = true, features = ["test-utils"] }

[build-dependencies]
built = { workspace = true, features = ["git2"] }
Expand Down
Loading
Loading