Skip to content

Commit 7757809

Browse files
authored
Merge pull request #341 from pragma-org/stevan/concurrent-requests
Stevan/concurrent requests
2 parents 3d26591 + 84da4db commit 7757809

4 files changed

Lines changed: 308 additions & 141 deletions

File tree

simulation/amaru-sim/src/simulator/generate.rs

Lines changed: 81 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,23 @@
1212
// See the License for the specific language governing permissions and
1313
// limitations under the License.
1414

15-
use proptest::prelude::*;
1615
use pure_stage::Instant;
1716
use rand::rngs::StdRng;
1817
use rand::Rng;
19-
use rand::SeedableRng;
2018
use rand_distr::{Distribution, Exp};
2119
use serde::Deserialize;
2220
use serde_json::Result;
21+
use std::cmp::Reverse;
2322
use std::collections::{BTreeMap, BTreeSet};
2423
use std::fmt;
2524
use std::fs;
2625
use std::path::PathBuf;
2726
use std::time::Duration;
2827

28+
use crate::echo::Envelope;
29+
2930
use super::bytes::Bytes;
31+
use super::simulate::Entry;
3032
use super::sync::ChainSyncMessage;
3133
use slot_arithmetic::Slot;
3234

@@ -298,18 +300,6 @@ pub fn generate_inputs<R: Rng>(rng: &mut R, file_path: &PathBuf) -> Result<Vec<C
298300
}
299301
}
300302

301-
pub fn generate_inputs_strategy(
302-
file_path: &PathBuf,
303-
seed: Option<u64>,
304-
) -> impl Strategy<Value = Vec<ChainSyncMessage>> + use<'_> {
305-
any::<u64>().prop_map(move |s| {
306-
let seed = seed.unwrap_or(s);
307-
let mut rng = StdRng::seed_from_u64(seed);
308-
println!("seed {}", seed);
309-
generate_inputs(&mut rng, file_path).unwrap()
310-
})
311-
}
312-
313303
pub fn generate_arrival_times<R: Rng>(
314304
rng: &mut R,
315305
start_time: Instant,
@@ -329,12 +319,89 @@ pub fn generate_arrival_times<R: Rng>(
329319
arrival_times
330320
}
331321

322+
pub fn generate_entries<R: Rng>(
323+
file_path: &PathBuf,
324+
start_time: Instant,
325+
mean_millis: f64,
326+
number_of_upstream_peers: u8,
327+
) -> impl Fn(&mut R) -> Vec<Reverse<Entry<ChainSyncMessage>>> + use<'_, R> {
328+
move |rng| {
329+
let mut entries: Vec<Reverse<Entry<ChainSyncMessage>>> = vec![];
330+
for client in 1..=number_of_upstream_peers {
331+
let messages =
332+
generate_inputs(rng, file_path).expect("Failed to generate inputs from chain file");
333+
let arrival_times =
334+
generate_arrival_times(rng, start_time, mean_millis, messages.len());
335+
entries.extend(
336+
messages
337+
.into_iter()
338+
.enumerate()
339+
.map(|(idx, msg)| {
340+
Reverse(Entry {
341+
arrival_time: arrival_times[idx],
342+
envelope: Envelope {
343+
src: "c".to_owned() + &client.to_string(),
344+
dest: "n1".to_string(),
345+
body: msg,
346+
},
347+
})
348+
})
349+
.collect::<Vec<_>>(),
350+
);
351+
}
352+
entries
353+
}
354+
}
355+
356+
pub fn generate_vec<A>(
357+
size: usize,
358+
generator: impl Fn(&mut StdRng) -> A,
359+
) -> impl Fn(&mut StdRng) -> Vec<A> {
360+
move |rng| {
361+
let mut result = Vec::<A>::with_capacity(size);
362+
for _ in 0..size {
363+
result.push(generator(rng));
364+
}
365+
result
366+
}
367+
}
368+
369+
pub fn generate_u8_then<A>(low: u8, high: u8, then: impl Fn(u8) -> A) -> impl Fn(&mut StdRng) -> A {
370+
move |rng| {
371+
let x = rng.random_range(low..=high);
372+
then(x)
373+
}
374+
}
375+
376+
pub fn generate_u8(low: u8, high: u8) -> impl Fn(&mut StdRng) -> u8 {
377+
generate_u8_then(low, high, |x| x)
378+
}
379+
332380
#[cfg(test)]
333381
mod test {
382+
use rand::rngs::StdRng;
383+
use rand::SeedableRng;
334384
use std::path::Path;
335385

336386
use crate::simulator::generate::*;
337387

388+
#[test]
389+
fn test_generate_u8() {
390+
let seed = 1234;
391+
let mut rng = StdRng::seed_from_u64(seed);
392+
let mut counts = std::collections::BTreeMap::new();
393+
394+
for _i in 0..1000 {
395+
let x = generate_u8(1, 6)(&mut rng);
396+
*counts.entry(x).or_insert(0) += 1;
397+
}
398+
399+
assert_eq!(counts.len(), 6);
400+
for i in 1..=6 {
401+
assert!(counts.contains_key(&i), "value {} was never generated", i);
402+
}
403+
}
404+
338405
#[test]
339406
fn test_ancestors() {
340407
let a = Bytes::try_from("aa").unwrap();

simulation/amaru-sim/src/simulator/mod.rs

Lines changed: 47 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,13 @@ use amaru_stores::rocksdb::consensus::InMemConsensusStore;
3737
use anyhow::Error;
3838
use bytes::Bytes;
3939
use clap::Parser;
40-
use generate::{generate_inputs_strategy, parse_json, read_chain_json};
40+
use generate::{generate_entries, parse_json, read_chain_json};
4141
use ledger::{populate_chain_store, FakeStakeDistribution};
42-
use proptest::test_runner::Config;
4342
use pure_stage::{simulation::SimulationBuilder, trace_buffer::TraceBuffer, StageRef};
44-
use pure_stage::{Receiver, StageGraph, Void};
45-
use simulate::{pure_stage_node_handle, simulate, Trace};
43+
use pure_stage::{Instant, Receiver, StageGraph, Void};
44+
use rand::Rng;
45+
use simulate::{pure_stage_node_handle, simulate, History, SimulateConfig};
46+
use std::time::Duration;
4647
use std::{path::PathBuf, sync::Arc};
4748
use tokio::sync::Mutex;
4849
use tracing::{info, Span};
@@ -81,6 +82,18 @@ pub struct Args {
8182
#[arg(long, default_value_t = Hash::from([0; 32]))]
8283
pub start_header: Hash<32>,
8384

85+
/// Number of tests to run in simulation
86+
#[arg(long, default_value = "50")]
87+
pub number_of_tests: Option<u32>,
88+
89+
/// Number of nodes in simulation.
90+
#[arg(long, default_value = "1")]
91+
pub number_of_nodes: Option<u8>,
92+
93+
/// Number of upstream peers to simulate
94+
#[arg(long, default_value = "2")]
95+
pub number_of_upstream_peers: Option<u8>,
96+
8497
/// Seed for simulation testing.
8598
#[arg(long)]
8699
pub seed: Option<u64>,
@@ -108,8 +121,9 @@ fn init_node(args: &Args) -> (GlobalParameters, SelectChain, ValidateHeader) {
108121
let select_chain = SelectChain::new(make_chain_selector(
109122
Origin,
110123
&chain_store,
111-
// FIXME: Shouldn't be hardcoded!
112-
&vec![Peer::new("c1")],
124+
&(1..=args.number_of_upstream_peers.unwrap_or(2))
125+
.map(|i| Peer::new(&format!("c{}", i)))
126+
.collect::<Vec<_>>(),
113127
));
114128
let chain_ref = Arc::new(Mutex::new(chain_store));
115129
let validate_header = ValidateHeader::new(Arc::new(stake_distribution), chain_ref.clone());
@@ -334,7 +348,9 @@ fn spawn_node(
334348
}
335349

336350
pub fn run(rt: tokio::runtime::Runtime, args: Args) {
337-
let number_of_nodes = 1;
351+
let number_of_tests = args.number_of_tests.unwrap_or(50);
352+
let number_of_nodes = args.number_of_nodes.unwrap_or(1);
353+
let number_of_upstream_peers = args.number_of_upstream_peers.unwrap_or(2);
338354
let trace_buffer = Arc::new(parking_lot::Mutex::new(TraceBuffer::new(42, 1_000_000_000)));
339355

340356
let spawn = || {
@@ -344,11 +360,24 @@ pub fn run(rt: tokio::runtime::Runtime, args: Args) {
344360
pure_stage_node_handle(rx, receive, running).unwrap()
345361
};
346362

363+
let seed = args.seed.unwrap_or({
364+
let mut rng = rand::rng();
365+
rng.random::<u64>()
366+
});
367+
347368
simulate(
348-
Config::default(),
349-
number_of_nodes,
369+
SimulateConfig {
370+
number_of_tests,
371+
seed,
372+
number_of_nodes,
373+
},
350374
spawn,
351-
generate_inputs_strategy(&args.block_tree_file, args.seed),
375+
generate_entries(
376+
&args.block_tree_file,
377+
Instant::at_offset(Duration::from_secs(0)),
378+
200.0,
379+
number_of_upstream_peers,
380+
),
352381
chain_property(&args.block_tree_file),
353382
trace_buffer.clone(),
354383
args.persist_on_success,
@@ -357,15 +386,13 @@ pub fn run(rt: tokio::runtime::Runtime, args: Args) {
357386

358387
fn chain_property(
359388
chain_data_path: &PathBuf,
360-
) -> impl Fn(Trace<ChainSyncMessage>) -> Result<(), String> + use<'_> {
361-
move |trace| {
362-
match trace.0.last() {
363-
None => Err("impossible, no last entry in trace".to_string()),
389+
) -> impl Fn(History<ChainSyncMessage>) -> Result<(), String> + use<'_> {
390+
move |history| {
391+
match history.0.last() {
392+
None => Err("impossible, no last entry in history".to_string()),
364393
Some(entry) => {
365-
assert_eq!(entry.src, "n1", "entry: {:?}, trace: {:?}", entry, trace);
366-
assert_eq!(entry.dest, "c1");
367394
// FIXME: the property is wrong, we should check the property
368-
// that the output message trace is a prefix of the read chain
395+
// that the output message history is a prefix of the read chain
369396
let data = read_chain_json(chain_data_path);
370397
let blocks = parse_json(data.as_bytes()).map_err(|err| err.to_string())?;
371398
match &entry.body {
@@ -376,20 +403,13 @@ fn chain_property(
376403
.map(|block| (block.hash.clone(), Slot::from(block.slot)))
377404
.expect("empty chain data");
378405
if actual != expected {
379-
panic!(
406+
return Err(format!(
380407
"tip of chains don't match, expected {:?}, got {:?}",
381408
expected, actual
382-
);
383-
}
384-
info!("Success!")
385-
}
386-
_ => {
387-
info!("TRACE:");
388-
for entry in &trace.0 {
389-
info!("{:?}", entry);
409+
));
390410
}
391-
panic!("Last entry in trace isn't a forward")
392411
}
412+
_ => return Err("Last entry in history isn't a forward".to_string()),
393413
}
394414
Ok(())
395415
}

0 commit comments

Comments
 (0)