Skip to content

Commit 04b1eaf

Browse files
rkuhnabailly
authored andcommitted
chore: add debug logs to chainsync server and fix bugs
Signed-off-by: Roland Kuhn <rk@rkuhn.info>
1 parent e62e425 commit 04b1eaf

5 files changed

Lines changed: 145 additions & 17 deletions

File tree

crates/amaru/src/stages/consensus/chain_forward.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,14 +105,53 @@ impl Drop for Worker {
105105
}
106106

107107
#[allow(clippy::large_enum_variant)]
108-
#[derive(Debug, Clone)]
108+
#[derive(Clone)]
109109
enum ClientOp {
110110
/// the tip to go back to
111111
Backward(Tip),
112112
/// the header to go forward to and the tip we will be at after sending this header
113113
Forward(Header, Tip),
114114
}
115115

116+
impl std::fmt::Debug for ClientOp {
117+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
118+
match self {
119+
Self::Backward(tip) => f
120+
.debug_struct("Backward")
121+
.field("tip", &(tip.1, PrettyPoint(&tip.0)))
122+
.finish(),
123+
Self::Forward(header, tip) => f
124+
.debug_struct("Forward")
125+
.field(
126+
"header",
127+
&(header.block_height(), PrettyPoint(&header.pallas_point())),
128+
)
129+
.field("tip", &(tip.1, PrettyPoint(&tip.0)))
130+
.finish(),
131+
}
132+
}
133+
}
134+
135+
struct PrettyPoint<'a>(&'a Point);
136+
137+
impl std::fmt::Debug for PrettyPoint<'_> {
138+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
139+
write!(
140+
f,
141+
"({}, {})",
142+
self.0.slot_or_default(),
143+
hex::encode(hash_point(&self.0))
144+
)
145+
}
146+
}
147+
148+
fn hash_point(point: &Point) -> Hash<32> {
149+
match point {
150+
Point::Origin => Hash::from([0; 32]),
151+
Point::Specific(_slot, hash) => Hash::from(hash.as_slice()),
152+
}
153+
}
154+
116155
impl PartialEq for ClientOp {
117156
fn eq(&self, other: &Self) -> bool {
118157
match (self, other) {

crates/amaru/src/stages/consensus/chain_forward/client_protocol.rs

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,25 +107,34 @@ async fn chain_sync(
107107
let input = cell.recv().await;
108108
match input {
109109
ActoInput::Message(ChainSyncMsg::Op(op)) => {
110+
tracing::debug!("got op {op:?}");
110111
our_tip = op.tip();
111112
state.add_op(op);
112113
if waiting {
113114
if let Some(op) = state.next_op() {
115+
tracing::debug!("sending op {op:?} to waiting handler");
114116
waiting = false;
115117
handler.send(Some((op, our_tip.clone())));
116118
}
117119
}
118120
}
119121
ActoInput::Message(ChainSyncMsg::ReqNext) => {
122+
tracing::debug!("got req next");
120123
if let Some(op) = state.next_op() {
124+
tracing::debug!("sending op {op:?} to handler");
121125
handler.send(Some((op, our_tip.clone())));
122126
} else {
127+
tracing::debug!("sending await reply");
123128
handler.send(None);
124129
waiting = true;
125130
}
126131
}
127-
ActoInput::NoMoreSenders => return Ok(()),
132+
ActoInput::NoMoreSenders => {
133+
tracing::debug!("no more senders");
134+
return Ok(());
135+
}
128136
ActoInput::Supervision { result, .. } => {
137+
tracing::debug!("supervision result: {result:?}");
129138
return result
130139
.map_err(|e| anyhow::Error::from(ClientError::HandlerFailure(e.to_string())))
131140
.and_then(|x| x);
@@ -144,24 +153,30 @@ async fn chain_sync_handler(
144153
) -> anyhow::Result<()> {
145154
loop {
146155
let Some(req) = server.recv_while_idle().await? else {
156+
tracing::debug!("client terminated");
147157
return Err(ClientError::ClientTerminated.into());
148158
};
149159
if !matches!(req, ClientRequest::RequestNext) {
160+
tracing::debug!("late intersection");
150161
return Err(ClientError::LateIntersection.into());
151162
};
163+
tracing::debug!("got req next");
152164
parent.send(ChainSyncMsg::ReqNext);
153165

154166
if let ActoInput::Message(op) = cell.recv().await {
155167
match op {
156168
Some((ClientOp::Forward(header, _), tip)) => {
169+
tracing::debug!("sending roll forward");
157170
server
158171
.send_roll_forward(to_header_content(header), tip)
159172
.await?;
160173
}
161174
Some((ClientOp::Backward(point), tip)) => {
175+
tracing::debug!("sending roll backward");
162176
server.send_roll_backward(point.0, tip).await?;
163177
}
164178
None => {
179+
tracing::debug!("sending await reply");
165180
server.send_await_reply().await?;
166181
let ActoInput::Message(Some((op, tip))) = cell.recv().await else {
167182
return Ok(());
@@ -179,6 +194,7 @@ async fn chain_sync_handler(
179194
}
180195
}
181196
} else {
197+
tracing::debug!("parent terminated");
182198
// parent terminated
183199
return Ok(());
184200
}

crates/amaru/src/stages/consensus/chain_forward/client_state.rs

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
1-
use super::ClientOp;
1+
use super::{hash_point, ClientOp};
22
use crate::stages::AsTip;
33
use amaru_consensus::{consensus::store::ChainStore, IsHeader};
4-
use amaru_kernel::{Hash, Header};
4+
use amaru_kernel::Header;
55
use pallas_network::miniprotocols::{chainsync::Tip, Point};
66
use std::collections::VecDeque;
77

@@ -19,24 +19,30 @@ impl ClientState {
1919
}
2020

2121
pub fn next_op(&mut self) -> Option<ClientOp> {
22+
tracing::debug!("next_op: {:?}", self.ops.front());
2223
self.ops.pop_front()
2324
}
2425

2526
pub fn add_op(&mut self, op: ClientOp) {
27+
tracing::debug!("add_op: {:?}", op);
2628
match op {
2729
ClientOp::Backward(tip) => {
28-
if let Some(index) = self
29-
.ops
30-
.iter()
31-
.rposition(|op| matches!(op, ClientOp::Forward(_, tip) if &tip.0 == &tip.0))
30+
if let Some((index, _)) =
31+
self.ops.iter().enumerate().rfind(
32+
|(_, op)| matches!(op, ClientOp::Forward(_, tip2) if &tip2.0 == &tip.0),
33+
)
3234
{
35+
tracing::debug!("found backward op at index {index} in {:?}", self.ops);
3336
self.ops.truncate(index + 1);
37+
tracing::debug!("last after truncate: {:?}", self.ops.back());
3438
} else {
39+
tracing::debug!("clearing ops");
3540
self.ops.clear();
3641
self.ops.push_back(ClientOp::Backward(tip));
3742
}
3843
}
3944
op @ ClientOp::Forward(..) => {
45+
tracing::debug!("adding forward op");
4046
self.ops.push_back(op);
4147
}
4248
}
@@ -85,10 +91,3 @@ pub(super) fn find_headers_between(
8591
headers.reverse();
8692
Some((headers, Tip(Point::Origin, 0)))
8793
}
88-
89-
pub(super) fn hash_point(point: &Point) -> Hash<32> {
90-
match point {
91-
Point::Origin => Hash::from([0; 32]),
92-
Point::Specific(_slot, hash) => Hash::from(hash.as_slice()),
93-
}
94-
}

crates/amaru/src/stages/consensus/chain_forward/test_infra.rs

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#![allow(dead_code)]
22

3-
use super::{ForwardEvent, ForwardStage};
3+
use super::{ForwardEvent, ForwardStage, PrettyPoint};
44
use crate::stages::PallasPoint;
55
use acto::{AcTokio, AcTokioRuntime, ActoCell, ActoInput, ActoRuntime};
66
use amaru_consensus::consensus::store::{ChainStore, Nonces, StoreError};
@@ -269,6 +269,21 @@ impl Client {
269269
ops
270270
}
271271

272+
pub fn recv_n<const N: usize>(&mut self) -> [ClientMsg; N] {
273+
let mut ops = Vec::new();
274+
for _ in 0..N {
275+
let msg = block_on(&self.runtime, self.client.chainsync().request_next()).unwrap();
276+
match msg {
277+
NextResponse::RollForward(header, tip) => {
278+
ops.push(ClientMsg::Forward(from_cbor(&header.cbor).unwrap(), tip))
279+
}
280+
NextResponse::RollBackward(point, tip) => ops.push(ClientMsg::Backward(point, tip)),
281+
NextResponse::Await => break,
282+
}
283+
}
284+
ops.try_into().unwrap()
285+
}
286+
272287
pub fn recv_after_await(&mut self) -> ClientMsg {
273288
let msg = block_on(
274289
&self.runtime,
@@ -285,12 +300,32 @@ impl Client {
285300
}
286301
}
287302

288-
#[derive(Debug, Clone)]
303+
#[derive(Clone)]
289304
pub enum ClientMsg {
290305
Forward(Header, Tip),
291306
Backward(Point, Tip),
292307
}
293308

309+
impl std::fmt::Debug for ClientMsg {
310+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
311+
match self {
312+
Self::Forward(header, tip) => f
313+
.debug_struct("Forward")
314+
.field(
315+
"header",
316+
&(header.block_height(), PrettyPoint(&header.pallas_point())),
317+
)
318+
.field("tip", &(tip.1, PrettyPoint(&tip.0)))
319+
.finish(),
320+
Self::Backward(point, tip) => f
321+
.debug_struct("Backward")
322+
.field("point", &PrettyPoint(&point))
323+
.field("tip", &(tip.1, PrettyPoint(&tip.0)))
324+
.finish(),
325+
}
326+
}
327+
}
328+
294329
impl PartialEq for ClientMsg {
295330
fn eq(&self, other: &Self) -> bool {
296331
match (self, other) {

crates/amaru/src/stages/consensus/chain_forward/tests.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,3 +131,42 @@ fn test_chain_sync() {
131131

132132
// Note: there’s no way to shut down the gasket stage without logging to ERRORs, sorry
133133
}
134+
135+
#[test]
136+
fn test_sync_optimising_rollback() {
137+
let mut setup = Setup::new(LOST_47);
138+
let chain = setup.store.get_chain(TIP_47);
139+
let lost = setup.store.get(&hash(LOST_47)).unwrap().clone();
140+
141+
let mut client = setup.connect();
142+
client.find_intersect(vec![]).0.expect("no intersection");
143+
144+
let msgs = client.recv_n::<4>();
145+
assert_eq!(
146+
msgs,
147+
[
148+
ClientMsg::Forward(chain[0].clone(), lost.as_tip()),
149+
ClientMsg::Forward(chain[1].clone(), lost.as_tip()),
150+
ClientMsg::Forward(chain[2].clone(), lost.as_tip()),
151+
ClientMsg::Forward(chain[3].clone(), lost.as_tip()),
152+
]
153+
);
154+
155+
setup.send_backward(BRANCH_47);
156+
setup.send_validated(&chain[7].hash().to_string());
157+
setup.send_validated(&chain[8].hash().to_string());
158+
setup.send_validated(&chain[9].hash().to_string());
159+
160+
let msgs = client.recv_until_await();
161+
assert_eq!(
162+
msgs,
163+
[
164+
ClientMsg::Forward(chain[4].clone(), chain[9].as_tip()),
165+
ClientMsg::Forward(chain[5].clone(), chain[9].as_tip()),
166+
ClientMsg::Forward(chain[6].clone(), chain[9].as_tip()),
167+
ClientMsg::Forward(chain[7].clone(), chain[9].as_tip()),
168+
ClientMsg::Forward(chain[8].clone(), chain[9].as_tip()),
169+
ClientMsg::Forward(chain[9].clone(), chain[9].as_tip()),
170+
]
171+
);
172+
}

0 commit comments

Comments
 (0)