Skip to content

Commit 97125bb

Browse files
committed
feat: improve with review comments
Signed-off-by: etorreborre <etorreborre@yahoo.com>
1 parent a6d039c commit 97125bb

3 files changed

Lines changed: 31 additions & 19 deletions

File tree

crates/amaru/src/stages/consensus/forward_chain/tcp_forward_chain_server.rs

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ impl TcpForwardChainServer {
5959
}
6060

6161
/// Creates a new TcpForwardChainServer instance with a provided Acto runtime and TcpListener.
62-
#[expect(clippy::unwrap_used)]
62+
#[expect(clippy::expect_used)]
6363
pub fn create(
6464
store: Arc<dyn ChainStore<Header>>,
6565
tcp_listener: TcpListener,
@@ -92,7 +92,10 @@ impl TcpForwardChainServer {
9292
// in particular, it isn’t possible to poll for new peers within the `schedule` method
9393
match PeerServer::accept(&tcp_listener, network_magic).await {
9494
Ok(peer) => {
95-
let our_tip = our_tip_clone.lock().unwrap().clone();
95+
let our_tip = our_tip_clone
96+
.lock()
97+
.expect("poisoned lock for our tip")
98+
.clone();
9699
clients_clone.send(ClientMsg::Peer(
97100
peer,
98101
Tip(our_tip.point().pallas_point(), our_tip.block_height()),
@@ -123,11 +126,14 @@ impl ForwardEventListener for TcpForwardChainServer {
123126
async fn send(&self, event: ForwardEvent) -> anyhow::Result<()> {
124127
match event {
125128
ForwardEvent::Forward(header) => {
126-
let mut our_tip = self
127-
.our_tip
128-
.lock()
129-
.map_err(|e| anyhow::anyhow!("Mutex poisoned: {}", e))?;
130-
*our_tip = header.as_header_tip();
129+
{
130+
let mut our_tip = self
131+
.our_tip
132+
.lock()
133+
.map_err(|e| anyhow::anyhow!("Mutex poisoned: {}", e))?;
134+
*our_tip = header.as_header_tip();
135+
};
136+
131137
self.clients.send(ClientMsg::Op(ClientOp::Forward(header)));
132138
Ok(())
133139
}

simulation/amaru-sim/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
14+
#![feature(generic_atomic)]
1415

1516
pub mod echo;
1617
pub mod simulator;

simulation/amaru-sim/src/simulator/run.rs

Lines changed: 17 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -51,9 +51,10 @@ use pure_stage::trace_buffer::TraceBuffer;
5151
use pure_stage::{Instant, Receiver, StageGraph, StageRef};
5252
use std::path::PathBuf;
5353
use std::sync::Arc;
54+
use std::sync::atomic::{AtomicU64, Ordering};
5455
use std::time::Duration;
5556
use tokio::runtime::Runtime;
56-
use tokio::sync::{Mutex, mpsc};
57+
use tokio::sync::mpsc;
5758
use tracing::{Span, info};
5859

5960
/// Run the full simulation:
@@ -294,7 +295,7 @@ pub struct MockForwardEventListener {
294295
node_id: String,
295296
number_of_downstream_peers: u8,
296297
sender: mpsc::Sender<Envelope<ChainSyncMessage>>,
297-
msg_id: Arc<Mutex<u64>>,
298+
msg_id: Arc<AtomicU64>,
298299
}
299300

300301
impl MockForwardEventListener {
@@ -306,7 +307,7 @@ impl MockForwardEventListener {
306307
Self {
307308
node_id,
308309
number_of_downstream_peers,
309-
msg_id: Arc::new(Mutex::new(0)),
310+
msg_id: Arc::new(AtomicU64::new(0)),
310311
sender,
311312
}
312313
}
@@ -337,16 +338,20 @@ impl ForwardEventListener for MockForwardEventListener {
337338
}
338339
}
339340

341+
// This allocates a range of message ids from
342+
// self.msg_id to self.msg_id + number_of_downstream_peers
343+
let base_msg_id = self
344+
.msg_id
345+
.fetch_add(self.number_of_downstream_peers as u64, Ordering::Relaxed);
346+
340347
for i in 1..=self.number_of_downstream_peers {
341-
let envelope = {
342-
let mut msg_id = self.msg_id.lock().await;
343-
let envelope = Envelope {
344-
src: self.node_id.clone(),
345-
dest: format!("c{}", i),
346-
body: message(&event, *msg_id),
347-
};
348-
*msg_id += 1;
349-
envelope
348+
let dest = format!("c{}", i);
349+
let msg_id = base_msg_id + i as u64;
350+
println!("msg id {}", msg_id);
351+
let envelope = Envelope {
352+
src: self.node_id.clone(),
353+
dest,
354+
body: message(&event, msg_id),
350355
};
351356
self.sender.send(envelope).await?;
352357
}

0 commit comments

Comments
 (0)