Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 36 additions & 30 deletions integration_tests/src/base_node_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{
convert::TryInto,
fmt::{Debug, Formatter},
net::TcpListener,
path::PathBuf,
Expand All @@ -47,14 +46,14 @@ use tari_shutdown::Shutdown;
use tokio::task;
use tonic::transport::Channel;

use crate::{TariWorld, get_peer_addresses, get_port, wait_for_service};
use crate::{TariWorld, get_peer_addresses, wait_for_service};

#[derive(Clone)]
pub struct BaseNodeProcess {
pub name: String,
pub port: u64,
pub grpc_port: u64,
pub http_port: u64,
pub port: u16,
pub grpc_port: u16,
pub http_port: u16,
pub identity: NodeIdentity,
pub temp_dir_path: PathBuf,
pub is_seed_node: bool,
Expand Down Expand Up @@ -95,14 +94,11 @@ pub async fn spawn_base_node_with_config(
peers: Vec<String>,
mut base_node_config: BaseNodeConfig,
) {
unsafe {
std::env::set_var("TARI_NETWORK", "localnet");
}
set_network_if_choice_valid(Network::LocalNet).unwrap();

let port: u64;
let grpc_port: u64;
let http_port: u64;
let port: u16;
let grpc_port: u16;
let http_port: u16;
let temp_dir_path: PathBuf;
let base_node_identity: NodeIdentity;

Expand All @@ -115,10 +111,18 @@ pub async fn spawn_base_node_with_config(

base_node_identity = node_ps.identity.clone();
} else {
// each spawned base node will use different ports
port = get_port(world, 18000..18499).unwrap();
grpc_port = get_port(world, 18500..18999).unwrap();
http_port = get_port(world, 19000..19499).unwrap();
// Allocate ports from the global pool (pre-scanned at startup, much faster than
// random scanning per node)
let ports = crate::port_pool::global_port_pool()
.allocate_base_node_ports()
.expect("Port pool exhausted — too many concurrent base nodes");
port = ports.p2p;
grpc_port = ports.grpc;
http_port = ports.http;
// Track in world for backwards compatibility
world.assigned_ports.insert(port, port);
world.assigned_ports.insert(grpc_port, grpc_port);
world.assigned_ports.insert(http_port, http_port);
// create a new temporary directory
temp_dir_path = world
.current_base_dir
Expand Down Expand Up @@ -174,7 +178,7 @@ pub async fn spawn_base_node_with_config(
base_node_config.base_node.grpc_address = Some(format!("/ip4/127.0.0.1/tcp/{grpc_port}").parse().unwrap());
base_node_config.base_node.report_grpc_error = true;
base_node_config.base_node.metadata_auto_ping_interval = Duration::from_secs(3);
base_node_config.base_node.http_wallet_query_service.port = http_port.try_into().unwrap();
base_node_config.base_node.http_wallet_query_service.port = http_port;
base_node_config.base_node.http_wallet_query_service.listen_ip = Some("127.0.0.1".to_string().parse().unwrap());
base_node_config.base_node.http_wallet_query_service.external_address =
Some(format!("http://127.0.0.1:{http_port}").parse().unwrap());
Expand Down Expand Up @@ -236,11 +240,13 @@ pub async fn spawn_base_node_with_config(
.blocks_behind_before_considered_lagging = 1;
base_node_config.base_node.state_machine.time_before_considered_lagging = Duration::from_secs(3);
base_node_config.base_node.state_machine.initial_sync_peer_count = 1;
base_node_config
.base_node
.state_machine
.blockchain_sync_config
.num_initial_sync_rounds_seed_bootstrap = 1;

// Tune blockchain sync for faster test execution
let sync_cfg = &mut base_node_config.base_node.state_machine.blockchain_sync_config;
sync_cfg.num_initial_sync_rounds_seed_bootstrap = 1;
sync_cfg.initial_max_sync_latency = Duration::from_secs(60); // prod: 240s — tests use local TCP, not tor
sync_cfg.rpc_deadline = Duration::from_secs(60); // prod: 240s
sync_cfg.short_ban_period = Duration::from_secs(30); // prod: 240s — faster retry after transient failures

println!(
"Initializing base node: name={name_cloned}; port={port}; grpc_port={grpc_port}; \
Expand All @@ -266,33 +272,33 @@ pub async fn spawn_base_node_with_config(

impl BaseNodeProcess {
pub async fn get_grpc_client(&self) -> anyhow::Result<BaseNodeGrpcClient<Channel>> {
Ok(
BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port))
.await?
.max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE)
.max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE),
)
let endpoint = tonic::transport::Endpoint::from_shared(format!("http://127.0.0.1:{}", self.grpc_port))?
.connect_timeout(Duration::from_secs(10))
.timeout(Duration::from_secs(30));
Ok(BaseNodeGrpcClient::new(endpoint.connect().await?)
.max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE)
.max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE))
}

pub fn kill(&mut self) {
self.kill_signal.trigger();
loop {
// lets wait till the port is cleared
if TcpListener::bind(("127.0.0.1", self.port.try_into().unwrap())).is_ok() {
if TcpListener::bind(("127.0.0.1", self.port)).is_ok() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
loop {
// lets wait till the port is cleared
if TcpListener::bind(("127.0.0.1", self.grpc_port.try_into().unwrap())).is_ok() {
if TcpListener::bind(("127.0.0.1", self.grpc_port)).is_ok() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
loop {
// lets wait till the http port is cleared
if TcpListener::bind(("127.0.0.1", self.http_port.try_into().unwrap())).is_ok() {
if TcpListener::bind(("127.0.0.1", self.http_port)).is_ok() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(20));
Expand Down
37 changes: 17 additions & 20 deletions integration_tests/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

use std::{convert::TryFrom, net::TcpListener, ops::Range, path::PathBuf, process, time::Duration};
use std::{net::TcpListener, ops::Range, path::PathBuf, process, time::Duration};

use rand::Rng;

Expand All @@ -33,25 +33,30 @@ pub mod transaction;
pub mod wallet;
pub mod wallet_ffi;
pub mod wallet_process;
#[macro_use]
pub mod polling;
pub mod port_pool;
pub mod tx_event_stream;
pub mod world;

pub use polling::{DEFAULT_TIMEOUT, SHORT_TIMEOUT, scaled_timeout, timeout_multiplier};
pub use world::TariWorld;

pub fn get_port(world: &mut TariWorld, range: Range<u16>) -> Option<u64> {
pub fn get_port(world: &mut TariWorld, range: Range<u16>) -> Option<u16> {
let min = range.clone().min().expect("A minimum possible port number");
let max = range.max().expect("A maximum possible port number");

loop {
let port = loop {
let port = rand::thread_rng().gen_range(min..max);
if !world.assigned_ports.contains_key(&u64::from(port)) {
if !world.assigned_ports.contains_key(&port) {
break port;
}
};

if TcpListener::bind(("127.0.0.1", port)).is_ok() {
world.assigned_ports.insert(u64::from(port), u64::from(port));
return Some(u64::from(port));
world.assigned_ports.insert(port, port);
return Some(port);
}
}
}
Expand All @@ -61,24 +66,16 @@ pub fn get_base_dir() -> PathBuf {
crate_root.join(format!("tests/temp/cucumber_{}", process::id()))
}

pub async fn wait_for_service(port: u64) {
pub async fn wait_for_service(port: u16) {
// The idea is that if the port is taken it means the service is running.
// If the port is not taken the service hasn't come up yet
let max_tries = 4 * 60;
let mut attempts = 0;

loop {
if TcpListener::bind(("127.0.0.1", u16::try_from(port).unwrap())).is_err() {
return;
}

if attempts >= max_tries {
panic!("Service on port {port} never started");
wait_for!(
timeout: Duration::from_secs(60),
description: format!("service on port {port} to start"),
condition: async {
Ok(TcpListener::bind(("127.0.0.1", port)).is_err())
}

tokio::time::sleep(Duration::from_millis(250)).await;
attempts += 1;
}
);
}

pub async fn get_peer_addresses(world: &TariWorld, peers: &[String]) -> Vec<String> {
Expand Down
5 changes: 1 addition & 4 deletions integration_tests/src/merge_mining_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct MergeMiningProxyProcess {
pub name: String,
pub base_node_name: String,
pub wallet_name: String,
pub port: u64,
pub port: u16,
pub origin_submission: bool,
id: u64,
}
Expand Down Expand Up @@ -69,9 +69,6 @@ pub async fn register_merge_mining_proxy_process(

impl MergeMiningProxyProcess {
pub async fn start(&self, world: &mut TariWorld) {
unsafe {
std::env::set_var("TARI_NETWORK", "localnet");
}
set_network_if_choice_valid(Network::LocalNet).unwrap();

let temp_dir = tempdir().unwrap();
Expand Down
3 changes: 0 additions & 3 deletions integration_tests/src/miner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,6 @@ impl MinerProcess {
miner_min_diff: Option<u64>,
miner_max_diff: Option<u64>,
) {
unsafe {
std::env::set_var("TARI_NETWORK", "localnet");
}
set_network_if_choice_valid(Network::LocalNet).unwrap();
let pow_algo = match self.pow_algo {
PowAlgorithm::RandomXM => {
Expand Down
Loading
Loading