diff --git a/integration_tests/src/base_node_process.rs b/integration_tests/src/base_node_process.rs index 92bceba4bc..e4916678c9 100644 --- a/integration_tests/src/base_node_process.rs +++ b/integration_tests/src/base_node_process.rs @@ -21,7 +21,6 @@ // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. use std::{ - convert::TryInto, fmt::{Debug, Formatter}, net::TcpListener, path::PathBuf, @@ -47,14 +46,14 @@ use tari_shutdown::Shutdown; use tokio::task; use tonic::transport::Channel; -use crate::{TariWorld, get_peer_addresses, get_port, wait_for_service}; +use crate::{TariWorld, get_peer_addresses, wait_for_service}; #[derive(Clone)] pub struct BaseNodeProcess { pub name: String, - pub port: u64, - pub grpc_port: u64, - pub http_port: u64, + pub port: u16, + pub grpc_port: u16, + pub http_port: u16, pub identity: NodeIdentity, pub temp_dir_path: PathBuf, pub is_seed_node: bool, @@ -95,14 +94,11 @@ pub async fn spawn_base_node_with_config( peers: Vec, mut base_node_config: BaseNodeConfig, ) { - unsafe { - std::env::set_var("TARI_NETWORK", "localnet"); - } set_network_if_choice_valid(Network::LocalNet).unwrap(); - let port: u64; - let grpc_port: u64; - let http_port: u64; + let port: u16; + let grpc_port: u16; + let http_port: u16; let temp_dir_path: PathBuf; let base_node_identity: NodeIdentity; @@ -115,10 +111,18 @@ pub async fn spawn_base_node_with_config( base_node_identity = node_ps.identity.clone(); } else { - // each spawned base node will use different ports - port = get_port(world, 18000..18499).unwrap(); - grpc_port = get_port(world, 18500..18999).unwrap(); - http_port = get_port(world, 19000..19499).unwrap(); + // Allocate ports from the global pool (pre-scanned at startup, much faster than + // random scanning per node) + let ports = crate::port_pool::global_port_pool() + .allocate_base_node_ports() + .expect("Port pool exhausted — too many concurrent base nodes"); + port = ports.p2p; + grpc_port = ports.grpc; + http_port = 
ports.http; + // Track in world for backwards compatibility + world.assigned_ports.insert(port, port); + world.assigned_ports.insert(grpc_port, grpc_port); + world.assigned_ports.insert(http_port, http_port); // create a new temporary directory temp_dir_path = world .current_base_dir @@ -174,7 +178,7 @@ pub async fn spawn_base_node_with_config( base_node_config.base_node.grpc_address = Some(format!("/ip4/127.0.0.1/tcp/{grpc_port}").parse().unwrap()); base_node_config.base_node.report_grpc_error = true; base_node_config.base_node.metadata_auto_ping_interval = Duration::from_secs(3); - base_node_config.base_node.http_wallet_query_service.port = http_port.try_into().unwrap(); + base_node_config.base_node.http_wallet_query_service.port = http_port; base_node_config.base_node.http_wallet_query_service.listen_ip = Some("127.0.0.1".to_string().parse().unwrap()); base_node_config.base_node.http_wallet_query_service.external_address = Some(format!("http://127.0.0.1:{http_port}").parse().unwrap()); @@ -236,11 +240,13 @@ pub async fn spawn_base_node_with_config( .blocks_behind_before_considered_lagging = 1; base_node_config.base_node.state_machine.time_before_considered_lagging = Duration::from_secs(3); base_node_config.base_node.state_machine.initial_sync_peer_count = 1; - base_node_config - .base_node - .state_machine - .blockchain_sync_config - .num_initial_sync_rounds_seed_bootstrap = 1; + + // Tune blockchain sync for faster test execution + let sync_cfg = &mut base_node_config.base_node.state_machine.blockchain_sync_config; + sync_cfg.num_initial_sync_rounds_seed_bootstrap = 1; + sync_cfg.initial_max_sync_latency = Duration::from_secs(60); // prod: 240s — tests use local TCP, not tor + sync_cfg.rpc_deadline = Duration::from_secs(60); // prod: 240s + sync_cfg.short_ban_period = Duration::from_secs(30); // prod: 240s — faster retry after transient failures println!( "Initializing base node: name={name_cloned}; port={port}; grpc_port={grpc_port}; \ @@ -266,33 +272,33 @@ 
pub async fn spawn_base_node_with_config( impl BaseNodeProcess { pub async fn get_grpc_client(&self) -> anyhow::Result> { - Ok( - BaseNodeGrpcClient::connect(format!("http://127.0.0.1:{}", self.grpc_port)) - .await? - .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE) - .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE), - ) + let endpoint = tonic::transport::Endpoint::from_shared(format!("http://127.0.0.1:{}", self.grpc_port))? + .connect_timeout(Duration::from_secs(10)) + .timeout(Duration::from_secs(30)); + Ok(BaseNodeGrpcClient::new(endpoint.connect().await?) + .max_encoding_message_size(MAX_GRPC_MESSAGE_SIZE) + .max_decoding_message_size(MAX_GRPC_MESSAGE_SIZE)) } pub fn kill(&mut self) { self.kill_signal.trigger(); loop { // lets wait till the port is cleared - if TcpListener::bind(("127.0.0.1", self.port.try_into().unwrap())).is_ok() { + if TcpListener::bind(("127.0.0.1", self.port)).is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(20)); } loop { // lets wait till the port is cleared - if TcpListener::bind(("127.0.0.1", self.grpc_port.try_into().unwrap())).is_ok() { + if TcpListener::bind(("127.0.0.1", self.grpc_port)).is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(20)); } loop { // lets wait till the http port is cleared - if TcpListener::bind(("127.0.0.1", self.http_port.try_into().unwrap())).is_ok() { + if TcpListener::bind(("127.0.0.1", self.http_port)).is_ok() { break; } std::thread::sleep(std::time::Duration::from_millis(20)); diff --git a/integration_tests/src/lib.rs b/integration_tests/src/lib.rs index 5e945e849c..900abaf765 100644 --- a/integration_tests/src/lib.rs +++ b/integration_tests/src/lib.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use std::{convert::TryFrom, net::TcpListener, ops::Range, path::PathBuf, process, time::Duration}; +use std::{net::TcpListener, ops::Range, path::PathBuf, process, time::Duration}; use rand::Rng; @@ -33,25 +33,30 @@ pub mod transaction; pub mod wallet; pub mod wallet_ffi; pub mod wallet_process; +#[macro_use] +pub mod polling; +pub mod port_pool; +pub mod tx_event_stream; pub mod world; +pub use polling::{DEFAULT_TIMEOUT, SHORT_TIMEOUT, scaled_timeout, timeout_multiplier}; pub use world::TariWorld; -pub fn get_port(world: &mut TariWorld, range: Range) -> Option { +pub fn get_port(world: &mut TariWorld, range: Range) -> Option { let min = range.clone().min().expect("A minimum possible port number"); let max = range.max().expect("A maximum possible port number"); loop { let port = loop { let port = rand::thread_rng().gen_range(min..max); - if !world.assigned_ports.contains_key(&u64::from(port)) { + if !world.assigned_ports.contains_key(&port) { break port; } }; if TcpListener::bind(("127.0.0.1", port)).is_ok() { - world.assigned_ports.insert(u64::from(port), u64::from(port)); - return Some(u64::from(port)); + world.assigned_ports.insert(port, port); + return Some(port); } } } @@ -61,24 +66,16 @@ pub fn get_base_dir() -> PathBuf { crate_root.join(format!("tests/temp/cucumber_{}", process::id())) } -pub async fn wait_for_service(port: u64) { +pub async fn wait_for_service(port: u16) { // The idea is that if the port is taken it means the service is running. 
// If the port is not taken the service hasn't come up yet - let max_tries = 4 * 60; - let mut attempts = 0; - - loop { - if TcpListener::bind(("127.0.0.1", u16::try_from(port).unwrap())).is_err() { - return; - } - - if attempts >= max_tries { - panic!("Service on port {port} never started"); + wait_for!( + timeout: Duration::from_secs(60), + description: format!("service on port {port} to start"), + condition: async { + Ok(TcpListener::bind(("127.0.0.1", port)).is_err()) } - - tokio::time::sleep(Duration::from_millis(250)).await; - attempts += 1; - } + ); } pub async fn get_peer_addresses(world: &TariWorld, peers: &[String]) -> Vec { diff --git a/integration_tests/src/merge_mining_proxy.rs b/integration_tests/src/merge_mining_proxy.rs index 25db2277e2..bad81f8b92 100644 --- a/integration_tests/src/merge_mining_proxy.rs +++ b/integration_tests/src/merge_mining_proxy.rs @@ -40,7 +40,7 @@ pub struct MergeMiningProxyProcess { pub name: String, pub base_node_name: String, pub wallet_name: String, - pub port: u64, + pub port: u16, pub origin_submission: bool, id: u64, } @@ -69,9 +69,6 @@ pub async fn register_merge_mining_proxy_process( impl MergeMiningProxyProcess { pub async fn start(&self, world: &mut TariWorld) { - unsafe { - std::env::set_var("TARI_NETWORK", "localnet"); - } set_network_if_choice_valid(Network::LocalNet).unwrap(); let temp_dir = tempdir().unwrap(); diff --git a/integration_tests/src/miner.rs b/integration_tests/src/miner.rs index 51b449080a..9630d2635f 100644 --- a/integration_tests/src/miner.rs +++ b/integration_tests/src/miner.rs @@ -94,9 +94,6 @@ impl MinerProcess { miner_min_diff: Option, miner_max_diff: Option, ) { - unsafe { - std::env::set_var("TARI_NETWORK", "localnet"); - } set_network_if_choice_valid(Network::LocalNet).unwrap(); let pow_algo = match self.pow_algo { PowAlgorithm::RandomXM => { diff --git a/integration_tests/src/polling.rs b/integration_tests/src/polling.rs new file mode 100644 index 0000000000..a7f3d20629 --- /dev/null +++ 
b/integration_tests/src/polling.rs @@ -0,0 +1,165 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{sync::OnceLock, time::Duration}; + +/// Convenience: 2-minute timeout (matches the old `TWO_MINUTES_WITH_HALF_SECOND_SLEEP` pattern). +pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(120); + +/// Convenience: shorter timeout for operations that should be fast. 
+pub const SHORT_TIMEOUT: Duration = Duration::from_secs(30); + +/// Returns the timeout multiplier, read from `INTEGRATION_TEST_TIMEOUT_MULTIPLIER` env var. +/// +/// Defaults to 1.0. Set to e.g. 2.0 in CI to double all timeouts for slower environments, +/// or 0.5 locally to fail faster during development. +/// +/// The value is read once and cached for the lifetime of the process. +pub fn timeout_multiplier() -> f64 { + static MULTIPLIER: OnceLock = OnceLock::new(); + *MULTIPLIER.get_or_init(|| { + std::env::var("INTEGRATION_TEST_TIMEOUT_MULTIPLIER") + .ok() + .and_then(|v| v.parse::().ok()) + .map(|v| if v > 0.0 { v } else { 1.0 }) + .unwrap_or(1.0) + }) +} + +/// Apply the timeout multiplier to a duration. +pub fn scaled_timeout(base: Duration) -> Duration { + Duration::from_secs_f64(base.as_secs_f64() * timeout_multiplier()) +} + +/// Poll an async condition with exponential backoff until it succeeds or the timeout is reached. +/// +/// The `$condition` expression must evaluate to `Result`: +/// - `Ok(true)` — condition met, stop polling +/// - `Ok(false)` — not yet met, continue polling +/// - `Err(msg)` — not yet met, record `msg` for the timeout panic message +/// +/// The polling interval starts at 250ms and grows by 1.5x each iteration, capped at 4s. +/// +/// Timeouts are automatically scaled by `INTEGRATION_TEST_TIMEOUT_MULTIPLIER` env var +/// (default 1.0). Set to 2.0 in CI for slower environments. +/// +/// # Usage +/// ```ignore +/// wait_for!( +/// timeout: DEFAULT_TIMEOUT, +/// description: format!("node {} to reach height {}", node, height), +/// condition: async { +/// let tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); +/// let h = tip.metadata.unwrap().best_block_height; +/// if h >= height { Ok(true) } else { Err(format!("current height {h}")) } +/// } +/// ); +/// ``` +#[macro_export] +macro_rules! 
wait_for { + // Variant with custom max_interval for poll-sensitive operations + ( + timeout: $timeout:expr, + max_interval: $max_iv:expr, + description: $desc:expr, + condition: async $body:block + ) => {{ + let __start = ::tokio::time::Instant::now(); + let __timeout: ::std::time::Duration = $crate::polling::scaled_timeout($timeout); + let __desc = $desc; + let mut __interval = ::std::time::Duration::from_millis(250); + let __max_interval: ::std::time::Duration = $max_iv; + let mut __last_error: Option = None; + + loop { + let __result: Result = async $body .await; + match __result { + Ok(true) => break, + Ok(false) => {}, + Err(e) => { __last_error = Some(e); }, + } + + if __start.elapsed() >= __timeout { + let mut __msg = format!( + "Timed out after {:.1}s waiting for: {}", + __start.elapsed().as_secs_f64(), + __desc + ); + if let Some(ref e) = __last_error { + __msg.push_str(&format!(" (last state: {e})")); + } + panic!("{}", __msg); + } + + let __remaining = __timeout.saturating_sub(__start.elapsed()); + let __sleep_dur = __interval.min(__remaining); + ::tokio::time::sleep(__sleep_dur).await; + + __interval = ::std::time::Duration::from_secs_f64( + (__interval.as_secs_f64() * 1.5).min(__max_interval.as_secs_f64()) + ); + } + }}; + // Default variant — 4s max interval + ( + timeout: $timeout:expr, + description: $desc:expr, + condition: async $body:block + ) => {{ + let __start = ::tokio::time::Instant::now(); + let __timeout: ::std::time::Duration = $crate::polling::scaled_timeout($timeout); + let __desc = $desc; + let mut __interval = ::std::time::Duration::from_millis(250); + let __max_interval = ::std::time::Duration::from_secs(4); + let mut __last_error: Option = None; + + loop { + let __result: Result = async $body .await; + match __result { + Ok(true) => break, + Ok(false) => {}, + Err(e) => { __last_error = Some(e); }, + } + + if __start.elapsed() >= __timeout { + let mut __msg = format!( + "Timed out after {:.1}s waiting for: {}", + 
__start.elapsed().as_secs_f64(), + __desc + ); + if let Some(ref e) = __last_error { + __msg.push_str(&format!(" (last state: {e})")); + } + panic!("{}", __msg); + } + + let __remaining = __timeout.saturating_sub(__start.elapsed()); + let __sleep_dur = __interval.min(__remaining); + ::tokio::time::sleep(__sleep_dur).await; + + // Exponential backoff capped at max_interval + __interval = ::std::time::Duration::from_secs_f64( + (__interval.as_secs_f64() * 1.5).min(__max_interval.as_secs_f64()) + ); + } + }}; +} diff --git a/integration_tests/src/port_pool.rs b/integration_tests/src/port_pool.rs new file mode 100644 index 0000000000..c0da57f063 --- /dev/null +++ b/integration_tests/src/port_pool.rs @@ -0,0 +1,149 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. 
IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::{collections::VecDeque, net::TcpListener, sync::Mutex}; + +/// A thread-safe port pool that pre-validates port availability at construction time. +/// +/// This eliminates the per-scenario cost of random port scanning, which is especially slow +/// when many concurrent scenarios compete for ports. Ports are returned to the pool after +/// a scenario completes (via [`return_port`]). +pub struct PortPool { + pools: Mutex, +} + +struct PortPoolInner { + /// Pre-validated available ports for base node P2P (18000-18499) + p2p_ports: VecDeque, + /// Pre-validated available ports for gRPC (18500-18999) + grpc_ports: VecDeque, + /// Pre-validated available ports for HTTP (19000-19499) + http_ports: VecDeque, + /// Pre-validated available ports for wallet gRPC (19500-19999) + wallet_grpc_ports: VecDeque, + /// Pre-validated available ports for wallet HTTP (20000-20499) + wallet_http_ports: VecDeque, +} + +/// A set of ports allocated for a single base node. +#[derive(Debug, Clone)] +pub struct BaseNodePorts { + pub p2p: u16, + pub grpc: u16, + pub http: u16, +} + +/// A set of ports allocated for a single wallet. +#[derive(Debug, Clone)] +pub struct WalletPorts { + pub grpc: u16, + pub http: u16, +} + +impl PortPool { + /// Create a new port pool, pre-scanning for available ports. + /// + /// `capacity` is the number of ports to pre-validate per category. + /// A typical value is 50-100, covering most test scenarios. 
+ pub fn new(capacity: usize) -> Self { + let p2p_ports = scan_available_ports(18000, 18499, capacity); + let grpc_ports = scan_available_ports(18500, 18999, capacity); + let http_ports = scan_available_ports(19000, 19499, capacity); + let wallet_grpc_ports = scan_available_ports(19500, 19999, capacity); + let wallet_http_ports = scan_available_ports(20000, 20499, capacity); + + println!( + "PortPool initialized: {} p2p, {} grpc, {} http, {} wallet_grpc, {} wallet_http ports available", + p2p_ports.len(), + grpc_ports.len(), + http_ports.len(), + wallet_grpc_ports.len(), + wallet_http_ports.len(), + ); + + Self { + pools: Mutex::new(PortPoolInner { + p2p_ports, + grpc_ports, + http_ports, + wallet_grpc_ports, + wallet_http_ports, + }), + } + } + + /// Allocate a set of ports for a base node. + /// + /// Returns `None` if not enough ports are available (pool exhausted). + pub fn allocate_base_node_ports(&self) -> Option { + let mut inner = self.pools.lock().unwrap(); + let p2p = inner.p2p_ports.pop_front()?; + let grpc = inner.grpc_ports.pop_front()?; + let http = inner.http_ports.pop_front()?; + Some(BaseNodePorts { p2p, grpc, http }) + } + + /// Allocate a set of ports for a wallet. + pub fn allocate_wallet_ports(&self) -> Option { + let mut inner = self.pools.lock().unwrap(); + let grpc = inner.wallet_grpc_ports.pop_front()?; + let http = inner.wallet_http_ports.pop_front()?; + Some(WalletPorts { grpc, http }) + } + + /// Return base node ports to the pool for reuse by another scenario. + pub fn return_base_node_ports(&self, ports: BaseNodePorts) { + let mut inner = self.pools.lock().unwrap(); + inner.p2p_ports.push_back(ports.p2p); + inner.grpc_ports.push_back(ports.grpc); + inner.http_ports.push_back(ports.http); + } + + /// Return wallet ports to the pool. 
+ pub fn return_wallet_ports(&self, ports: WalletPorts) { + let mut inner = self.pools.lock().unwrap(); + inner.wallet_grpc_ports.push_back(ports.grpc); + inner.wallet_http_ports.push_back(ports.http); + } +} + +/// Scan a port range and return up to `capacity` ports that are currently available. +fn scan_available_ports(start: u16, end: u16, capacity: usize) -> VecDeque { + let mut ports = VecDeque::with_capacity(capacity); + for port in start..=end { + if ports.len() >= capacity { + break; + } + if TcpListener::bind(("127.0.0.1", port)).is_ok() { + ports.push_back(port); + } + } + ports +} + +/// Global port pool, initialized once on first access. +static GLOBAL_PORT_POOL: std::sync::OnceLock = std::sync::OnceLock::new(); + +/// Get the global port pool, initializing it on first access with 80 ports per category. +pub fn global_port_pool() -> &'static PortPool { + GLOBAL_PORT_POOL.get_or_init(|| PortPool::new(80)) +} diff --git a/integration_tests/src/tx_event_stream.rs b/integration_tests/src/tx_event_stream.rs new file mode 100644 index 0000000000..75bbc1d5b0 --- /dev/null +++ b/integration_tests/src/tx_event_stream.rs @@ -0,0 +1,274 @@ +// Copyright 2023. The Tari Project +// +// Redistribution and use in source and binary forms, with or without modification, are permitted provided that the +// following conditions are met: +// +// 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following +// disclaimer. +// +// 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the +// following disclaimer in the documentation and/or other materials provided with the distribution. +// +// 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote +// products derived from this software without specific prior written permission. 
+// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +// INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE +// DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR +// SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, +// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE +// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +use std::time::Duration; + +use minotari_app_grpc::tari_rpc::{self as grpc}; +use minotari_wallet_grpc_client::WalletGrpcClient; +use tonic::transport::Channel; + +use crate::polling::scaled_timeout; + +/// Wait for a specific transaction to reach a target status by subscribing to the wallet's +/// transaction event stream instead of polling. +/// +/// Falls back to polling via `GetTransactionInfo` if the stream is unavailable or the event +/// doesn't arrive within the timeout. +/// +/// `target_status` should be one of: "Pending", "Completed", "Broadcast", +/// "Mined_or_OneSidedUnconfirmed", "Mined_or_OneSidedConfirmed", "Coinbase" +pub async fn wait_for_tx_status( + client: &mut WalletGrpcClient, + tx_id: u64, + target_status: &str, + timeout: Duration, +) -> Result<(), String> { + let timeout = scaled_timeout(timeout); + let deadline = tokio::time::Instant::now() + timeout; + + // Try event stream first — if it's available, we get instant notifications. + // Timeout the stream open quickly so we don't waste time if the RPC isn't supported. 
+ let stream_result = tokio::time::timeout( + Duration::from_secs(5), + client.stream_transaction_events(grpc::TransactionEventRequest {}), + ) + .await + .ok() + .and_then(|r| r.ok()); + + if let Some(response) = stream_result { + let mut stream = response.into_inner(); + + // First check current state in case it already matches + if check_tx_status_matches(client, tx_id, target_status) + .await + .unwrap_or(false) + { + return Ok(()); + } + + // Listen for events until timeout + loop { + let now = tokio::time::Instant::now(); + if now >= deadline { + return Err(format!( + "Timed out after {:.1}s waiting for tx {tx_id} to reach status '{target_status}' via event stream", + timeout.as_secs_f64() + )); + } + + let remaining = deadline.saturating_duration_since(now); + match tokio::time::timeout(remaining, stream.message()).await { + Ok(Ok(Some(event_response))) => { + if let Some(event) = event_response.transaction { + // Check if this event is for our transaction + if let Ok(event_tx_id) = event.tx_id.parse::() && + event_tx_id == tx_id && + status_matches_target(&event.status, target_status) + { + return Ok(()); + } + } + }, + Ok(Ok(None)) => { + // Stream ended — fall back to polling + break; + }, + Ok(Err(_)) => { + // Stream error — fall back to polling + break; + }, + Err(_) => { + // Timeout + return Err(format!( + "Timed out after {:.1}s waiting for tx {tx_id} to reach status '{target_status}'", + timeout.as_secs_f64() + )); + }, + } + } + } + + // Fallback: poll via GetTransactionInfo (same as before but with exponential backoff) + let mut interval = Duration::from_millis(250); + let max_interval = Duration::from_secs(2); + let mut last_error: Option = None; + + loop { + match check_tx_status_matches(client, tx_id, target_status).await { + Ok(true) => return Ok(()), + Ok(false) => {}, + Err(e) => { + // gRPC errors are transient — keep retrying + last_error = Some(e); + }, + } + + if tokio::time::Instant::now() >= deadline { + let current = 
get_current_tx_status(client, tx_id).await; + let extra = last_error.map(|e| format!(", last error: {e}")).unwrap_or_default(); + return Err(format!( + "Timed out after {:.1}s waiting for tx {tx_id} to reach status '{target_status}' (current: \ + {current}{extra})", + timeout.as_secs_f64() + )); + } + + let remaining = deadline.saturating_duration_since(tokio::time::Instant::now()); + tokio::time::sleep(interval.min(remaining)).await; + interval = Duration::from_secs_f64((interval.as_secs_f64() * 1.5).min(max_interval.as_secs_f64())); + } +} + +/// Check if the current transaction status matches the target. +async fn check_tx_status_matches( + client: &mut WalletGrpcClient, + tx_id: u64, + target_status: &str, +) -> Result { + let request = grpc::GetTransactionInfoRequest { + transaction_ids: vec![tx_id], + }; + let tx_info = client + .get_transaction_info(request) + .await + .map_err(|e| format!("gRPC error getting tx info for {tx_id}: {e}"))? + .into_inner(); + + let Some(tx) = tx_info.transactions.first() else { + return Err(format!("No transaction info returned for tx_id {tx_id}")); + }; + + Ok(tx_status_matches(tx.status(), target_status)) +} + +/// Get the current status of a transaction as a string (for error messages). +async fn get_current_tx_status(client: &mut WalletGrpcClient, tx_id: u64) -> String { + let request = grpc::GetTransactionInfoRequest { + transaction_ids: vec![tx_id], + }; + match client.get_transaction_info(request).await { + Ok(resp) => { + let inner = resp.into_inner(); + inner + .transactions + .first() + .map(|tx| format!("{:?}", tx.status())) + .unwrap_or_else(|| "unknown".to_string()) + }, + Err(e) => format!("error: {e}"), + } +} + +/// Check if a gRPC TransactionStatus enum matches the string-based target. 
+pub fn tx_status_matches(status: grpc::TransactionStatus, target: &str) -> bool { + match target { + "Pending" => matches!( + status, + grpc::TransactionStatus::Pending | + grpc::TransactionStatus::Completed | + grpc::TransactionStatus::Broadcast | + grpc::TransactionStatus::MinedUnconfirmed | + grpc::TransactionStatus::MinedConfirmed | + grpc::TransactionStatus::OneSidedUnconfirmed | + grpc::TransactionStatus::OneSidedConfirmed | + grpc::TransactionStatus::CoinbaseUnconfirmed | + grpc::TransactionStatus::CoinbaseConfirmed + ), + "Completed" => matches!( + status, + grpc::TransactionStatus::Completed | + grpc::TransactionStatus::Broadcast | + grpc::TransactionStatus::MinedUnconfirmed | + grpc::TransactionStatus::MinedConfirmed | + grpc::TransactionStatus::OneSidedUnconfirmed | + grpc::TransactionStatus::OneSidedConfirmed | + grpc::TransactionStatus::CoinbaseUnconfirmed | + grpc::TransactionStatus::CoinbaseConfirmed + ), + "Broadcast" => matches!( + status, + grpc::TransactionStatus::Broadcast | + grpc::TransactionStatus::MinedUnconfirmed | + grpc::TransactionStatus::MinedConfirmed | + grpc::TransactionStatus::OneSidedUnconfirmed | + grpc::TransactionStatus::OneSidedConfirmed | + grpc::TransactionStatus::CoinbaseUnconfirmed | + grpc::TransactionStatus::CoinbaseConfirmed + ), + "Mined_or_OneSidedUnconfirmed" => matches!( + status, + grpc::TransactionStatus::MinedUnconfirmed | + grpc::TransactionStatus::MinedConfirmed | + grpc::TransactionStatus::OneSidedUnconfirmed | + grpc::TransactionStatus::OneSidedConfirmed | + grpc::TransactionStatus::CoinbaseUnconfirmed | + grpc::TransactionStatus::CoinbaseConfirmed + ), + "Mined_or_OneSidedConfirmed" => matches!( + status, + grpc::TransactionStatus::MinedConfirmed | + grpc::TransactionStatus::OneSidedConfirmed | + grpc::TransactionStatus::CoinbaseConfirmed + ), + "Coinbase" => matches!( + status, + grpc::TransactionStatus::CoinbaseConfirmed | grpc::TransactionStatus::CoinbaseUnconfirmed + ), + _ => false, + } +} + +/// Check 
if a string-based status (from TransactionEvent) matches the target. +fn status_matches_target(event_status: &str, target: &str) -> bool { + match target { + "Pending" => true, // Any status satisfies "at least Pending" + "Completed" => !matches!(event_status, "Pending"), + "Broadcast" => matches!( + event_status, + "Broadcast" | + "MinedUnconfirmed" | + "MinedConfirmed" | + "OneSidedUnconfirmed" | + "OneSidedConfirmed" | + "CoinbaseUnconfirmed" | + "CoinbaseConfirmed" + ), + "Mined_or_OneSidedUnconfirmed" => matches!( + event_status, + "MinedUnconfirmed" | + "MinedConfirmed" | + "OneSidedUnconfirmed" | + "OneSidedConfirmed" | + "CoinbaseUnconfirmed" | + "CoinbaseConfirmed" + ), + "Mined_or_OneSidedConfirmed" => { + matches!( + event_status, + "MinedConfirmed" | "OneSidedConfirmed" | "CoinbaseConfirmed" + ) + }, + "Coinbase" => matches!(event_status, "CoinbaseConfirmed" | "CoinbaseUnconfirmed"), + _ => false, + } +} diff --git a/integration_tests/src/wallet_process.rs b/integration_tests/src/wallet_process.rs index 9b28a8436c..0c6358b77d 100644 --- a/integration_tests/src/wallet_process.rs +++ b/integration_tests/src/wallet_process.rs @@ -20,7 +20,7 @@ // WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE // USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-use std::{path::PathBuf, str::FromStr, thread}; +use std::{path::PathBuf, str::FromStr, thread, time::Duration}; use minotari_app_utilities::common_cli_args::CommonCliArgs; use minotari_console_wallet::{Cli, run_wallet_with_cli}; @@ -34,18 +34,18 @@ use tari_shutdown::Shutdown; use tokio::runtime; use tonic::transport::Channel; -use crate::{TariWorld, get_peer_addresses, get_port, wait_for_service}; +use crate::{TariWorld, get_peer_addresses, wait_for_service}; #[derive(Clone, Debug)] pub struct WalletProcess { pub config: WalletConfig, - pub grpc_port: u64, + pub grpc_port: u16, pub kill_signal: Shutdown, pub name: String, pub temp_dir_path: PathBuf, pub base_node_name: Option, pub peer_seeds: Vec, - pub http_port: u64, + pub http_port: u16, is_running: bool, } @@ -64,12 +64,9 @@ pub async fn spawn_wallet( routing_mechanism: Option, cli: Option, ) { - unsafe { - std::env::set_var("TARI_NETWORK", "localnet"); - } set_network_if_choice_valid(Network::LocalNet).unwrap(); - let grpc_port: u64; + let grpc_port: u16; let temp_dir_path: PathBuf; let mut wallet_config: WalletConfig; @@ -81,8 +78,12 @@ pub async fn spawn_wallet( temp_dir_path = wallet_ps.temp_dir_path.clone(); wallet_config = wallet_ps.config.clone(); } else { - // each spawned wallet will use different ports - grpc_port = get_port(world, 18500..18999).unwrap(); + // Allocate port from the global pool (pre-scanned at startup) + let wallet_ports = crate::port_pool::global_port_pool() + .allocate_wallet_ports() + .expect("Port pool exhausted — too many concurrent wallets"); + grpc_port = wallet_ports.grpc; + world.assigned_ports.insert(grpc_port, grpc_port); temp_dir_path = world .current_base_dir @@ -130,6 +131,16 @@ pub async fn spawn_wallet( wallet_app_config.wallet.db_file = PathBuf::from("console_wallet.db"); wallet_app_config.wallet.http_server_url = format!("http://127.0.0.1:{http_port}"); wallet_app_config.wallet.fallback_http_server_url = format!("http://127.0.0.1:{http_port}"); + // Tune 
transaction service timing for faster test execution + let tx_cfg = &mut wallet_app_config.wallet.transaction_service_config; + tx_cfg.broadcast_monitoring_timeout = Duration::from_secs(5); // prod: 30s + tx_cfg.chain_monitoring_timeout = Duration::from_secs(10); // prod: 60s + tx_cfg.direct_send_timeout = Duration::from_secs(5); // prod: 20s + tx_cfg.broadcast_send_timeout = Duration::from_secs(10); // prod: 60s + tx_cfg.transaction_resend_period = Duration::from_secs(30); // prod: 600s + tx_cfg.resend_response_cooldown = Duration::from_secs(10); // prod: 300s + tx_cfg.transaction_mempool_resubmission_window = Duration::from_secs(30); // prod: 600s + if let Some(mech) = routing_mechanism { wallet_app_config .wallet @@ -137,6 +148,16 @@ pub async fn spawn_wallet( .transaction_routing_mechanism = mech; } + // Tune wallet base node monitoring for faster chain detection + wallet_app_config + .wallet + .base_node_service_config + .base_node_monitor_max_refresh_interval = Duration::from_secs(5); // prod: 30s + + // Tune balance/broadcast responsiveness + wallet_app_config.wallet.balance_enquiry_cooldown_period = Duration::from_secs(1); // prod: 5s + wallet_app_config.wallet.grpc_broadcast_confirmation = 1000; // 1s, prod: 5s + wallet_app_config.wallet.set_base_path(temp_dir_path.clone()); let rt = runtime::Builder::new_multi_thread() @@ -223,19 +244,35 @@ pub fn get_default_cli() -> Cli { } pub async fn create_wallet_client(world: &TariWorld, wallet_name: String) -> anyhow::Result> { - let wallet_grpc_port = world.wallets.get(&wallet_name).unwrap().grpc_port; + let wallet_grpc_port = world + .wallets + .get(&wallet_name) + .ok_or_else(|| anyhow::anyhow!("Wallet process '{wallet_name}' not found in world"))? + .grpc_port; let wallet_addr = format!("http://127.0.0.1:{wallet_grpc_port}"); eprintln!("Wallet GRPC at {wallet_addr}"); - Ok(WalletGrpcClient::connect(wallet_addr.as_str()).await?) 
+ tokio::time::timeout( + std::time::Duration::from_secs(10), + WalletGrpcClient::connect(wallet_addr.as_str()), + ) + .await + .map_err(|_| anyhow::anyhow!("Timed out connecting to wallet '{wallet_name}' gRPC at {wallet_addr}"))? + .map_err(|e| anyhow::anyhow!("Failed to connect to wallet '{wallet_name}' gRPC at {wallet_addr}: {e}")) } impl WalletProcess { #[allow(dead_code)] pub async fn get_grpc_client(&self) -> anyhow::Result> { let wallet_addr = format!("http://127.0.0.1:{}", self.grpc_port); - Ok(WalletGrpcClient::connect(wallet_addr.as_str()).await?) + tokio::time::timeout( + std::time::Duration::from_secs(10), + WalletGrpcClient::connect(wallet_addr.as_str()), + ) + .await + .map_err(|_| anyhow::anyhow!("Timed out connecting to wallet '{}' gRPC", self.name))? + .map_err(|e| anyhow::anyhow!("Failed to connect to wallet '{}' gRPC: {e}", self.name)) } pub fn is_running(&self) -> bool { @@ -244,6 +281,14 @@ impl WalletProcess { pub fn kill(&mut self) { self.kill_signal.trigger(); - self.is_running = !self.kill_signal.is_triggered(); + self.is_running = false; + // Wait for the gRPC port to be released so the next scenario doesn't hit port conflicts + let deadline = std::time::Instant::now() + std::time::Duration::from_secs(10); + while std::time::Instant::now() < deadline { + if std::net::TcpListener::bind(("127.0.0.1", self.grpc_port)).is_ok() { + break; + } + std::thread::sleep(std::time::Duration::from_millis(50)); + } } } diff --git a/integration_tests/src/world.rs b/integration_tests/src/world.rs index ae709d0cc3..ae140b8a8b 100644 --- a/integration_tests/src/world.rs +++ b/integration_tests/src/world.rs @@ -105,7 +105,7 @@ pub struct TariWorld { // This receiver wallet address will be used for default one-sided coinbase payments pub default_payment_address: TariAddress, pub consensus_manager: BaseNodeConsensusManager, - pub assigned_ports: IndexMap, + pub assigned_ports: IndexMap, /// Named benchmark timers, keyed by a label set in the feature file. 
/// Used by "I start benchmark timer {word}" / "I stop benchmark timer {word} and log elapsed time" steps. pub benchmark_timers: IndexMap, @@ -185,7 +185,11 @@ impl TariWorld { &self, name: &S, ) -> anyhow::Result> { - self.get_node(name)?.get_grpc_client().await + let node_name = name.as_ref(); + self.get_node(name)? + .get_grpc_client() + .await + .map_err(|e| anyhow::anyhow!("Failed to connect gRPC client to base node '{node_name}': {e}")) } pub async fn get_base_node_or_wallet_client>( @@ -205,19 +209,20 @@ impl TariWorld { if let Some(address) = self.wallet_addresses.get(name.as_ref()) { return Ok(address.clone()); } + let wallet_name = name.as_ref(); let address_bytes = match self.get_wallet_client(name).await { - Ok(wallet) => { - let mut wallet = wallet; - + Ok(mut wallet) => { wallet .get_address(minotari_wallet_grpc_client::grpc::Empty {}) .await - .unwrap() + .map_err(|e| anyhow::anyhow!("Failed to get address for wallet '{wallet_name}': {e}"))? .into_inner() .interactive_address }, Err(_) => { - let ffi_wallet = self.get_ffi_wallet(name).unwrap(); + let ffi_wallet = self + .get_ffi_wallet(name) + .map_err(|e| anyhow::anyhow!("No wallet or FFI wallet found for '{wallet_name}': {e}"))?; ffi_wallet.get_address().address().get_vec() }, @@ -227,19 +232,20 @@ impl TariWorld { } pub async fn get_wallet_one_sided_address>(&self, name: &S) -> anyhow::Result { + let wallet_name = name.as_ref(); let address_bytes = match self.get_wallet_client(name).await { - Ok(wallet) => { - let mut wallet = wallet; - + Ok(mut wallet) => { wallet .get_address(minotari_wallet_grpc_client::grpc::Empty {}) .await - .unwrap() + .map_err(|e| anyhow::anyhow!("Failed to get one-sided address for wallet '{wallet_name}': {e}"))? 
.into_inner() .one_sided_address }, Err(_) => { - let ffi_wallet = self.get_ffi_wallet(name).unwrap(); + let ffi_wallet = self + .get_ffi_wallet(name) + .map_err(|e| anyhow::anyhow!("No wallet or FFI wallet found for '{wallet_name}': {e}"))?; ffi_wallet.get_one_sided_address().address().get_vec() }, @@ -253,7 +259,11 @@ impl TariWorld { &self, name: &S, ) -> anyhow::Result> { - self.get_wallet(name)?.get_grpc_client().await + let wallet_name = name.as_ref(); + self.get_wallet(name)? + .get_grpc_client() + .await + .map_err(|e| anyhow::anyhow!("Failed to connect gRPC client to wallet '{wallet_name}': {e}")) } pub fn get_node>(&self, node_name: &S) -> anyhow::Result<&BaseNodeProcess> { @@ -319,14 +329,31 @@ impl TariWorld { } pub async fn after(&mut self, _scenario: &Scenario) { + let pool = crate::port_pool::global_port_pool(); + + // Kill wallets first — they depend on base nodes for (name, mut p) in self.wallets.drain(..) { println!("Shutting down wallet {name}"); - p.kill_signal.trigger(); + let grpc_port = p.grpc_port; + p.kill(); + // Return wallet gRPC port to pool for reuse + pool.return_wallet_ports(crate::port_pool::WalletPorts { + grpc: grpc_port, + http: 0, // wallet doesn't own an http port + }); } + // Then kill base nodes — kill() waits for ports to be released, + // preventing port conflicts with the next scenario for (name, mut p) in self.base_nodes.drain(..) 
{ println!("Shutting down base node {name}"); - // You have explicitly trigger the shutdown now because of the change to use Arc/Mutex in tari_shutdown - p.kill_signal.trigger(); + let ports = crate::port_pool::BaseNodePorts { + p2p: p.port, + grpc: p.grpc_port, + http: p.http_port, + }; + p.kill(); + // Return ports to pool for reuse by next scenario + pool.return_base_node_ports(ports); } } diff --git a/integration_tests/tests/cucumber.rs b/integration_tests/tests/cucumber.rs index 8c68b58c95..48c3442662 100644 --- a/integration_tests/tests/cucumber.rs +++ b/integration_tests/tests/cucumber.rs @@ -44,6 +44,12 @@ pub const LOG_TARGET: &str = "cucumber"; pub const LOG_TARGET_STDOUT: &str = "stdout"; fn main() { + // Set the network env var once at startup — safe because no other threads exist yet. + // This replaces the unsafe set_var calls that were scattered across spawn functions. + unsafe { + std::env::set_var("TARI_NETWORK", "localnet"); + } + initialize_logging( &PathBuf::from("log4rs/cucumber.yml"), &PathBuf::from("./"), @@ -55,9 +61,9 @@ fn main() { let runtime = Runtime::new().unwrap(); runtime.block_on(async { let world = TariWorld::cucumber() - .repeat_failed() + // .repeat_failed() — removed: retrying hides flaky tests instead of surfacing them // following config needed to use eprint statements in the tests - .max_concurrent_scenarios(5) + .max_concurrent_scenarios(10) .after(move |_feature, _rule, scenario, ev, maybe_world| { Box::pin(async move { match ev { diff --git a/integration_tests/tests/steps/mod.rs b/integration_tests/tests/steps/mod.rs index 5a7ca64b01..876ff3c81e 100644 --- a/integration_tests/tests/steps/mod.rs +++ b/integration_tests/tests/steps/mod.rs @@ -39,7 +39,10 @@ pub mod wallet_ffi_steps; pub mod wallet_steps; pub const CONFIRMATION_PERIOD: u64 = 4; +// Deprecated: use tari_integration_tests::wait_for_or_panic with DEFAULT_TIMEOUT instead +#[allow(dead_code)] pub const TWO_MINUTES_WITH_HALF_SECOND_SLEEP: u64 = 240; 
+#[allow(dead_code)] pub const HALF_SECOND: u64 = 500; #[when(expr = "I wait {int} seconds")] diff --git a/integration_tests/tests/steps/node_steps.rs b/integration_tests/tests/steps/node_steps.rs index b181693b9d..30a7882ab4 100644 --- a/integration_tests/tests/steps/node_steps.rs +++ b/integration_tests/tests/steps/node_steps.rs @@ -30,28 +30,27 @@ use std::{ use cucumber::{given, then, when}; use futures::StreamExt; use indexmap::IndexMap; -use minotari_app_grpc::{ - tari_rpc, - tari_rpc::{ - self as grpc, - GetBlocksRequest, - GetNewBlockTemplateWithCoinbasesRequest, - GetNewBlockWithCoinbasesRequest, - ListHeadersRequest, - NewBlockCoinbase, - NewBlockTemplateRequest, - PowAlgo, - pow_algo::PowAlgos, - }, +use minotari_app_grpc::tari_rpc::{ + self as grpc, + GetBlocksRequest, + GetNewBlockTemplateWithCoinbasesRequest, + GetNewBlockWithCoinbasesRequest, + ListHeadersRequest, + NewBlockCoinbase, + NewBlockTemplateRequest, + PowAlgo, + pow_algo::PowAlgos, }; use minotari_node::BaseNodeConfig; use minotari_wallet_grpc_client::grpc::Empty; use tari_common_types::tari_address::TariAddress; use tari_integration_tests::{ + DEFAULT_TIMEOUT, TariWorld, base_node_process::{spawn_base_node, spawn_base_node_with_config}, get_peer_addresses, miner::mine_block_before_submit, + wait_for, }; use tari_node_components::blocks::Block; use tari_transaction_components::{ @@ -60,8 +59,6 @@ use tari_transaction_components::{ weight::TransactionWeight, }; -use crate::steps::{HALF_SECOND, TWO_MINUTES_WITH_HALF_SECOND_SLEEP}; - #[given(expr = "I have a seed node {word}")] #[when(expr = "I have a seed node {word}")] async fn start_base_node(world: &mut TariWorld, name: String) { @@ -104,37 +101,33 @@ async fn base_node_pending_connection_to(world: &mut TariWorld, first_node: Stri let second_client_pubkey = second_client.identify(Empty {}).await.unwrap().into_inner().public_key; - for _i in 0..100 { - let res: tonic::Response = - node_client.list_connected_peers(Empty {}).await.unwrap(); 
- let res = res.into_inner(); - if res.connected_peers.iter().any(|p| p.public_key == second_client_pubkey) { - return; + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("base node {first_node} to connect to {second_node}"), + condition: async { + let res = node_client.list_connected_peers(Empty {}).await.unwrap().into_inner(); + Ok(res.connected_peers.iter().any(|p| p.public_key == second_client_pubkey)) } - - tokio::time::sleep(Duration::from_secs(1)).await; - } - - panic!("Peer was not connected in time"); + ); } #[when(expr = "I wait base node for {word} to have {int} base node connections")] async fn wait_for_node_have_x_connections(world: &mut TariWorld, node: String, num_connections: usize) { let mut node_client = world.get_node_client(&node).await.unwrap(); - let mut connected_peers = 0; - for _i in 0..60 { - let res: tonic::Response = - node_client.list_connected_peers(Empty {}).await.unwrap(); - let res = res.into_inner(); - connected_peers = res.connected_peers.len(); - if res.connected_peers.len() >= num_connections { - return; - } - - tokio::time::sleep(Duration::from_secs(1)).await; - } - panic!("Peer was not connected in time, connected to {connected_peers} peers"); + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("node {node} to have {num_connections} connections"), + condition: async { + let res = node_client.list_connected_peers(Empty {}).await.unwrap().into_inner(); + let count = res.connected_peers.len(); + if count >= num_connections { + Ok(true) + } else { + Err(format!("connected to {count} peers")) + } + } + ); } #[then(expr = "all nodes are on the same chain at height {int}")] @@ -145,31 +138,31 @@ async fn all_nodes_on_same_chain_at_height(world: &mut TariWorld, height: u64) { nodes_at_height.insert(name, (0, vec![])); } - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP * height) { - for (name, _) in nodes_at_height - .clone() - .iter() - .filter(|(_, (at_height, _))| at_height != &height) - { - let mut client 
= world.get_node_client(name).await.unwrap(); - - let chain_tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); - let metadata = chain_tip.metadata.unwrap(); - - nodes_at_height.insert(name, (metadata.best_block_height, metadata.best_block_hash)); - } + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("all nodes to synchronize at chain height {height}"), + condition: async { + for (name, _) in nodes_at_height + .clone() + .iter() + .filter(|(_, (at_height, _))| at_height != &height) + { + let mut client = world.get_node_client(name).await.unwrap(); + let chain_tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); + let metadata = chain_tip.metadata.unwrap(); + nodes_at_height.insert(name, (metadata.best_block_height, metadata.best_block_hash)); + } - if nodes_at_height - .values() - .all(|(h, block_hash)| h == &height && block_hash == &nodes_at_height.values().last().unwrap().1) - { - return; + let all_synced = nodes_at_height + .values() + .all(|(h, block_hash)| h == &height && block_hash == &nodes_at_height.values().last().unwrap().1); + if all_synced { + Ok(true) + } else { + Err(format!("{nodes_at_height:?}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("base nodes not successfully synchronized at height {height}, {nodes_at_height:?}"); + ); } #[then(expr = "all nodes are on the same network difficulty")] @@ -204,74 +197,76 @@ async fn all_nodes_are_at_height(world: &mut TariWorld, height: u64) { nodes_at_height.insert(name, 0); } - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP * 7) { - // ~14 minutes matching the original implementation timeout - for (name, _) in nodes_at_height - .clone() - .iter() - .filter(|(_, at_height)| at_height != &&height) - { - let Ok(mut client) = world.get_node_client(name).await else { - continue; - }; - - let Ok(chain_tip) = client.get_tip_info(Empty {}).await else { - // Node may be temporarily unavailable during sync/reorg, retry later - continue; 
- }; - let chain_hgt = chain_tip.into_inner().metadata.unwrap().best_block_height; - - nodes_at_height.insert(name, chain_hgt); - } + // Use a generous timeout (was ~14 minutes originally) + wait_for!( + timeout: Duration::from_secs(840), + description: format!("all nodes to reach height {height}"), + condition: async { + for (name, _) in nodes_at_height + .clone() + .iter() + .filter(|(_, at_height)| at_height != &&height) + { + let Ok(mut client) = world.get_node_client(name).await else { + continue; + }; + let Ok(chain_tip) = client.get_tip_info(Empty {}).await else { + continue; + }; + let chain_hgt = chain_tip.into_inner().metadata.unwrap().best_block_height; + nodes_at_height.insert(name, chain_hgt); + } - if nodes_at_height.values().all(|h| h == &height) { - return; + if nodes_at_height.values().all(|h| h == &height) { + Ok(true) + } else { + Err(format!("{nodes_at_height:?}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("base nodes not successfully synchronized at height {height}, {nodes_at_height:?}"); + ); } #[when(expr = "node {word} is at height {int}")] #[then(expr = "node {word} is at height {int}")] async fn node_is_at_height(world: &mut TariWorld, base_node: String, height: u64) { let mut client = world.get_node_client(&base_node).await.unwrap(); - let mut chain_hgt = 0; - for _ in 0..=(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { - let chain_tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); - chain_hgt = chain_tip.metadata.unwrap().best_block_height; - - if chain_hgt >= height { - return; + // Use a generous timeout — this step is used for reorg scenarios where peer discovery + + // chain sync can take significant time, especially on slow CI machines. + // Use 1s max poll interval to detect height changes promptly during sync. 
+ wait_for!( + timeout: Duration::from_secs(300), + max_interval: Duration::from_secs(1), + description: format!("node {base_node} to reach height {height}"), + condition: async { + let chain_tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); + let chain_hgt = chain_tip.metadata.unwrap().best_block_height; + if chain_hgt >= height { + Ok(true) + } else { + Err(format!("current height {chain_hgt}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - // base node didn't synchronize successfully at height, so we bail out - panic!("base node didn't synchronize successfully with height {height}, current chain height {chain_hgt}"); + ); } #[then(expr = "node {word} has a pruned height of {int}")] async fn pruned_height_of(world: &mut TariWorld, node: String, height: u64) { let mut client = world.get_node_client(&node).await.unwrap(); - let mut last_pruned_height = 0; - - for _ in 0..=TWO_MINUTES_WITH_HALF_SECOND_SLEEP { - let chain_tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); - last_pruned_height = chain_tip.metadata.unwrap().pruned_height; - if last_pruned_height == height { - return; + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("node {node} to reach pruned height {height}"), + condition: async { + let chain_tip = client.get_tip_info(Empty {}).await.unwrap().into_inner(); + let pruned_height = chain_tip.metadata.unwrap().pruned_height; + if pruned_height == height { + Ok(true) + } else { + Err(format!("current pruned height {pruned_height}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("Node {node} pruned height is {last_pruned_height} and never reached expected pruned height of {height}") + ); } #[given(expr = "I have a base node {word} connected to seed {word}")] @@ -306,55 +301,54 @@ async fn transaction_in_state( .get(&tx_name) .unwrap_or_else(|| panic!("Couldn't find transaction {tx_name}")); let sig = &tx.body.kernels()[0].excess_sig; - let 
mut last_state = "UNCHECKED: DEFAULT TEST STATE"; - - // Some state changes take up to 30 minutes to make - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP * 2) { - let resp = client - .transaction_state(grpc::TransactionStateRequest { - excess_sig: Some(sig.into()), - }) - .await?; - - let inner = resp.into_inner(); - - last_state = match inner.result { - 0 => "UNKNOWN", - 1 => "MEMPOOL", - 2 => "MINED", - 3 => "NOT_STORED", - _ => panic!("not getting a good result"), - }; - - if last_state == state { - return Ok(()); - } + let state_clone = state.clone(); + wait_for!( + timeout: Duration::from_secs(240), + description: format!("node {node} to have tx {tx_name} in state {state}"), + condition: async { + let resp = client + .transaction_state(grpc::TransactionStateRequest { + excess_sig: Some(sig.into()), + }) + .await + .map_err(|e| format!("gRPC error: {e}"))?; - tokio::time::sleep(Duration::from_millis(HALF_SECOND * 2)).await; - } + let inner = resp.into_inner(); + let current_state = match inner.result { + 0 => "UNKNOWN", + 1 => "MEMPOOL", + 2 => "MINED", + 3 => "NOT_STORED", + _ => panic!("not getting a good result"), + }; - panic!("The node {node} has tx {tx_name} in state {last_state} instead of the expected {state}"); + if current_state == state_clone { + Ok(true) + } else { + Err(format!("current state: {current_state}")) + } + } + ); + Ok(()) } #[then(expr = "I wait until base node {word} has {int} unconfirmed transactions in its mempool")] async fn base_node_has_unconfirmed_transaction_in_mempool(world: &mut TariWorld, node: String, num_transactions: u64) { let mut client = world.get_node_client(&node).await.unwrap(); - let mut unconfirmed_txs = 0; - - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { - let resp = client.get_mempool_stats(Empty {}).await.unwrap(); - let inner = resp.into_inner(); - - unconfirmed_txs = inner.unconfirmed_txs; - if inner.unconfirmed_txs == num_transactions { - return; + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: 
format!("node {node} to have {num_transactions} unconfirmed txs in mempool"), + condition: async { + let resp = client.get_mempool_stats(Empty {}).await.unwrap(); + let inner = resp.into_inner(); + if inner.unconfirmed_txs == num_transactions { + Ok(true) + } else { + Err(format!("has {} unconfirmed txs", inner.unconfirmed_txs)) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("The node {node} has {unconfirmed_txs} unconfirmed txs instead of the expected {num_transactions}"); + ); } #[then(expr = "{word} is in the {word} of all nodes")] @@ -386,44 +380,46 @@ async fn tx_in_state_all_nodes_with_allowed_failure( } let can_fail = ((can_fail_percent as f64 * nodes.len() as f64) / 100.0).ceil() as u64; + let pool_clone = pool.clone(); + + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("{tx_name} to be in {pool} of all nodes (allowing {can_fail_percent}% failure)"), + condition: async { + for (name, _) in node_pool_status + .clone() + .iter() + .filter(|(_, in_pool)| ***in_pool != pool_clone) + { + let mut client = world.get_node_client(name).await.map_err(|e| e.to_string())?; + + let resp = client + .transaction_state(grpc::TransactionStateRequest { + excess_sig: Some(sig.into()), + }) + .await + .map_err(|e| e.to_string())?; + + let inner = resp.into_inner(); + let res_state = match inner.result { + 0 => "UNKNOWN", + 1 => "MEMPOOL", + 2 => "MINED", + 3 => "NOT_STORED", + _ => panic!("not getting a good result"), + }; + + node_pool_status.insert(name, res_state); + } - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP / 2) { - for (name, _) in node_pool_status - .clone() - .iter() - .filter(|(_, in_pool)| ***in_pool != pool) - { - let mut client = world.get_node_client(name).await?; - - let resp = client - .transaction_state(grpc::TransactionStateRequest { - excess_sig: Some(sig.into()), - }) - .await?; - - let inner = resp.into_inner(); - - let res_state = match inner.result { - 0 => "UNKNOWN", - 1 => "MEMPOOL", - 2 => 
"MINED", - 3 => "NOT_STORED", - _ => panic!("not getting a good result"), - }; - - node_pool_status.insert(name, res_state); - } - - if node_pool_status.values().filter(|v| ***v == pool).count() >= (nodes_count - can_fail as usize) { - return Ok(()); + if node_pool_status.values().filter(|v| ***v == pool_clone).count() >= (nodes_count - can_fail as usize) { + Ok(true) + } else { + Err(format!("{node_pool_status:?}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND / 2)).await; - } - - panic!( - "More than {can_fail_percent}% ({can_fail} node(s)) failed to get {tx_name} in {pool}, {node_pool_status:?}" ); + Ok(()) } #[then(expr = "I submit transaction {word} to {word}")] @@ -511,18 +507,18 @@ async fn node_state(world: &mut TariWorld, node_name: String, state: String) { _ => panic!("Invalid state"), }; let mut node_client = world.get_node_client(&node_name).await.unwrap(); - let mut actual_state = 0i32; - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { - let tip = node_client.get_tip_info(Empty {}).await.unwrap().into_inner(); - actual_state = tip.base_node_state; - if actual_state == expected_state { - return; + + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("node {node_name} to reach state {state}"), + condition: async { + let tip = node_client.get_tip_info(Empty {}).await.unwrap().into_inner(); + if tip.base_node_state == expected_state { + Ok(true) + } else { + Err(format!("current state: {}", tip.base_node_state)) + } } - tokio::time::sleep(Duration::from_millis(500)).await; - } - panic!( - "Node {} is in state {} but expected state {}", - node_name, actual_state, expected_state ); } @@ -540,12 +536,12 @@ async fn base_node_is_at_same_height_as_node(world: &mut TariWorld, base_node: S .best_block_height; let mut base_node_client = world.get_node_client(&base_node).await.unwrap(); - let mut current_height = 0; - let num_retries = 100; - 'outer: for _ in 0..12 { - 'inner: for _ in 0..num_retries { - current_height = 
base_node_client + wait_for!( + timeout: Duration::from_secs(600), + description: format!("node {base_node} to reach same height as node {peer_node}"), + condition: async { + expected_height = peer_node_client .get_tip_info(req) .await .unwrap() @@ -553,41 +549,24 @@ async fn base_node_is_at_same_height_as_node(world: &mut TariWorld, base_node: S .metadata .unwrap() .best_block_height; - if current_height >= expected_height { - break 'inner; - } - tokio::time::sleep(Duration::from_secs(5)).await; - } - - expected_height = peer_node_client - .get_tip_info(req) - .await - .unwrap() - .into_inner() - .metadata - .unwrap() - .best_block_height; - - current_height = base_node_client - .get_tip_info(req) - .await - .unwrap() - .into_inner() - .metadata - .unwrap() - .best_block_height; + let current_height = base_node_client + .get_tip_info(req) + .await + .unwrap() + .into_inner() + .metadata + .unwrap() + .best_block_height; - if current_height == expected_height { - break 'outer; + if current_height >= expected_height { + Ok(true) + } else { + Err(format!("current {current_height}, expected {expected_height}")) + } } - } - - if current_height == expected_height { - println!("Base node {base_node} is at the same height {current_height} as node {peer_node}"); - } else { - panic!("Base node {base_node} failed to synchronize at the same height as node {peer_node}"); - } + ); + println!("Base node {base_node} is at the same height as node {peer_node}"); } #[given(expr = "I stop node {word}")] @@ -1183,21 +1162,20 @@ async fn lagging_delayed_node(world: &mut TariWorld, delayed_node: String, node: #[then(expr = "node {word} has reached initial sync")] async fn node_reached_sync(world: &mut TariWorld, node: String) { let mut client = world.get_node_client(&node).await.unwrap(); - let mut longest_chain = 0; - - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP * 11) { - let tip_info = client.get_tip_info(Empty {}).await.unwrap().into_inner(); - let metadata = 
tip_info.metadata.unwrap(); - longest_chain = metadata.best_block_height; - if tip_info.initial_sync_achieved { - return; + wait_for!( + timeout: Duration::from_secs(660), + description: format!("node {node} to reach initial sync"), + condition: async { + let tip_info = client.get_tip_info(Empty {}).await.unwrap().into_inner(); + let longest_chain = tip_info.metadata.unwrap().best_block_height; + if tip_info.initial_sync_achieved { + Ok(true) + } else { + Err(format!("stuck at tip {longest_chain}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("Node {node} never reached initial sync. Stuck at tip {longest_chain}") + ); } #[when(expr = "I have {int} base nodes with pruning horizon {int} force syncing on node {word}")] @@ -1222,51 +1200,51 @@ async fn force_sync_node_with_an_army_of_pruned_nodes( #[then(expr = "{word} has at least {int} peers")] async fn has_at_least_num_peers(world: &mut TariWorld, node: String, num_peers: u64) { let mut client = world.get_node_client(&node).await.unwrap(); - let mut last_num_of_peers = 0; - - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { - last_num_of_peers = 0; - - let mut peers_stream = client.get_peers(grpc::GetPeersRequest {}).await.unwrap().into_inner(); - while let Some(resp) = peers_stream.next().await { - if let Ok(resp) = resp && - let Some(_peer) = resp.peer - { - last_num_of_peers += 1 + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("node {node} to have at least {num_peers} peers"), + condition: async { + let mut count = 0usize; + let mut peers_stream = client.get_peers(grpc::GetPeersRequest {}).await.unwrap().into_inner(); + while let Some(resp) = peers_stream.next().await { + if let Ok(resp) = resp && + let Some(_peer) = resp.peer + { + count += 1 + } + } + if count >= usize::try_from(num_peers).unwrap() { + Ok(true) + } else { + Err(format!("has {count} peers")) } } - - if last_num_of_peers >= usize::try_from(num_peers).unwrap() { - return; - } - - 
tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("Node {node} only received {last_num_of_peers} of {num_peers} expected peers") + ); } #[when(expr = "I wait for base node {word} to have {int} base node connections")] async fn wait_for_base_node_connections(world: &mut TariWorld, node: String, num_connections: u64) { let mut client = world.get_node_client(&node).await.unwrap(); - let mut last_count = 0usize; - - for _ in 0..TWO_MINUTES_WITH_HALF_SECOND_SLEEP { - last_count = 0; - let mut peers_stream = client.get_peers(grpc::GetPeersRequest {}).await.unwrap().into_inner(); - while let Some(resp) = peers_stream.next().await { - if let Ok(resp) = resp && - resp.peer.is_some() - { - last_count += 1; + + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("node {node} to have at least {num_connections} connections"), + condition: async { + let mut count = 0usize; + let mut peers_stream = client.get_peers(grpc::GetPeersRequest {}).await.unwrap().into_inner(); + while let Some(resp) = peers_stream.next().await { + if let Ok(resp) = resp && + resp.peer.is_some() + { + count += 1; + } + } + if count >= num_connections as usize { + Ok(true) + } else { + Err(format!("has {count} connections")) } } - if last_count >= num_connections as usize { - return; - } - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!("Node {node} only has {last_count} connections, expected at least {num_connections}"); + ); } diff --git a/integration_tests/tests/steps/wallet_steps.rs b/integration_tests/tests/steps/wallet_steps.rs index f9bf848e23..fc043a0a18 100644 --- a/integration_tests/tests/steps/wallet_steps.rs +++ b/integration_tests/tests/steps/wallet_steps.rs @@ -45,7 +45,7 @@ use grpc::{ }; use minotari_app_grpc::{ tari_rpc, - tari_rpc::{self as grpc, GetBalanceResponse, GetStateRequest, TransactionStatus, TxOutputsToSpendTransfer}, + tari_rpc::{self as grpc, GetStateRequest, TransactionStatus, TxOutputsToSpendTransfer}, }; use 
minotari_console_wallet::{CliCommands, ExportUtxosArgs}; use minotari_wallet::transaction_service::config::TransactionRoutingMechanism; @@ -55,12 +55,15 @@ use tari_common_types::{ }; use tari_crypto::ristretto::pedersen::CompressedPedersenCommitment; use tari_integration_tests::{ + DEFAULT_TIMEOUT, + SHORT_TIMEOUT, TariWorld, transaction::{ build_transaction_with_output, build_transaction_with_output_and_fee_per_gram, build_transaction_with_output_and_lockheight, }, + wait_for, wallet_process::{create_wallet_client, get_default_cli, spawn_wallet}, }; use tari_script::{ExecutionStack, TariScript}; @@ -80,13 +83,7 @@ use tari_transaction_components::{ }; use tari_utilities::hex::Hex; -use crate::steps::{ - CONFIRMATION_PERIOD, - HALF_SECOND, - TWO_MINUTES_WITH_HALF_SECOND_SLEEP, - cucumber_steps_log, - mining_steps::create_miner, -}; +use crate::steps::{CONFIRMATION_PERIOD, cucumber_steps_log, mining_steps::create_miner}; pub const LOG_TARGET: &str = "cucumber::wallet_steps"; @@ -122,38 +119,31 @@ async fn start_wallet_connected_to_all_seed_nodes(world: &mut TariWorld, name: S #[then(expr = "I wait for wallet {word} to have at least {int} uT")] async fn wait_for_wallet_to_have_micro_tari(world: &mut TariWorld, wallet: String, amount: u64) { let wallet_ps = world.wallets.get(&wallet).unwrap(); - let num_retries = 100; - let mut client = wallet_ps.get_grpc_client().await.unwrap(); - let mut available_balance = 0; - for i in 0..=num_retries { - let _result = client.validate_all_transactions(ValidateRequest {}).await; - let balance = client - .get_balance(GetBalanceRequest { payment_id: None }) - .await - .unwrap() - .into_inner(); - available_balance = balance.available_balance; + // Wallet balance detection after mining requires blockchain scanning, which can be slow on CI. + // Original timeout was 100 retries × 2s = 200s. 
+ wait_for!( + timeout: Duration::from_secs(200), + description: format!("wallet {wallet} to have at least {amount} uT"), + condition: async { + let _result = client.validate_all_transactions(ValidateRequest {}).await; + let balance = client + .get_balance(GetBalanceRequest { payment_id: None }) + .await + .unwrap() + .into_inner(); - if available_balance >= amount { - cucumber_steps_log(format!( - "Wallet {wallet} needs at least available {amount} uT (DONE), has {balance:?}" - )); - return; - } else if i % 5 == 0 { - cucumber_steps_log(format!( - "Wallet {wallet} needs at least available {amount} uT, has {balance:?}" - )); - } else { - // Nothing here + if balance.available_balance >= amount { + cucumber_steps_log(format!( + "Wallet {wallet} needs at least available {amount} uT (DONE), has {balance:?}" + )); + Ok(true) + } else { + Err(format!("available balance: {}", balance.available_balance)) + } } - - tokio::time::sleep(Duration::from_secs(2)).await; - } - - // failed to get wallet right amount, so we panic - panic!("wallet {wallet} failed to get balance of at least amount {amount}, current amount is {available_balance}"); + ); } #[when(expr = "I remember wallet {word} balance {word}")] @@ -198,7 +188,6 @@ async fn have_wallet_connect_to_seed_node(world: &mut TariWorld, wallet: String, #[when(expr = "wallet {word} detects all transactions as {word}")] #[then(expr = "wallet {word} detects all transactions as {word}")] -#[allow(clippy::too_many_lines)] async fn wallet_detects_all_txs_as_mined_status(world: &mut TariWorld, wallet_name: String, status: String) { let mut client = create_wallet_client(world, wallet_name.clone()).await.unwrap(); @@ -212,98 +201,33 @@ async fn wallet_detects_all_txs_as_mined_status(world: &mut TariWorld, wallet_na .unwrap() .into_inner(); - let num_retries = 100; - + // Collect all tx_ids first, then wait for each + let mut tx_ids = Vec::new(); while let Some(tx_info) = completed_tx_stream.next().await { let tx_info = 
tx_info.unwrap(); - let tx_id = tx_info.transaction.unwrap().tx_id; + tx_ids.push(tx_info.transaction.unwrap().tx_id); + } + for tx_id in tx_ids { cucumber_steps_log(format!("waiting for tx with tx_id = {tx_id} to be {status}")); - for retry in 0..=num_retries { - let request = GetTransactionInfoRequest { - transaction_ids: vec![tx_id], - }; - let tx_info = client.get_transaction_info(request).await.unwrap().into_inner(); - let tx_info = tx_info.transactions.first().unwrap(); - - if retry == num_retries { - panic!( - "Wallet {} failed to detect tx with tx_id = {} to be {}, current status is {:?}", - wallet_name.as_str(), - tx_id, - status, - tx_info.status() - ); - } - match status.as_str() { - "Pending" => match tx_info.status() { - grpc::TransactionStatus::Pending | - grpc::TransactionStatus::Completed | - grpc::TransactionStatus::Broadcast | - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed | - grpc::TransactionStatus::CoinbaseUnconfirmed | - grpc::TransactionStatus::CoinbaseConfirmed => { - break; - }, - _ => (), - }, - "Completed" => match tx_info.status() { - grpc::TransactionStatus::Completed | - grpc::TransactionStatus::Broadcast | - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed | - grpc::TransactionStatus::CoinbaseUnconfirmed | - grpc::TransactionStatus::CoinbaseConfirmed => { - break; - }, - _ => (), - }, - "Broadcast" => match tx_info.status() { - grpc::TransactionStatus::Broadcast | - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed | - grpc::TransactionStatus::CoinbaseUnconfirmed | - grpc::TransactionStatus::CoinbaseConfirmed => { - break; - }, - _ => (), - 
}, - "Mined_or_OneSidedUnconfirmed" => match tx_info.status() { - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed | - grpc::TransactionStatus::CoinbaseUnconfirmed | - grpc::TransactionStatus::CoinbaseConfirmed => { - break; - }, - _ => (), - }, - "Mined_or_OneSidedConfirmed" => match tx_info.status() { - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedConfirmed | - grpc::TransactionStatus::CoinbaseConfirmed => { - break; - }, - _ => (), - }, - "Coinbase" => match tx_info.status() { - grpc::TransactionStatus::CoinbaseConfirmed | grpc::TransactionStatus::CoinbaseUnconfirmed => { - break; - }, - _ => (), - }, - _ => panic!("Unknown status {status}, don't know what to expect"), + let status_clone = status.clone(); + wait_for!( + timeout: Duration::from_secs(200), + max_interval: Duration::from_secs(2), + description: format!("wallet {wallet_name} tx {tx_id} to reach status {status}"), + condition: async { + let request = GetTransactionInfoRequest { + transaction_ids: vec![tx_id], + }; + let tx_info = client.get_transaction_info(request).await.unwrap().into_inner(); + let tx_info = tx_info.transactions.first().unwrap(); + if tari_integration_tests::tx_event_stream::tx_status_matches(tx_info.status(), &status_clone) { + Ok(true) + } else { + Err(format!("current status: {:?}", tx_info.status())) + } } - tokio::time::sleep(Duration::from_secs(1)).await; - } + ); } } @@ -325,72 +249,27 @@ async fn wallet_detects_all_txs_are_at_least_in_some_status( }, }; - let num_retries = 100; - for tx_id in &tx_ids { - cucumber_steps_log(format!("waiting for tx with tx_id = {tx_id} to be pending")); - for retry in 0..=num_retries { - let request = GetTransactionInfoRequest { - transaction_ids: vec![*tx_id], - }; - let tx_info = client.get_transaction_info(request).await.unwrap().into_inner(); - let tx_info = 
tx_info.transactions.first().unwrap(); - - if retry == num_retries { - panic!( - "Wallet {} failed to detect tx with tx_id = {} to be at least {}", - wallet_name.as_str(), - tx_id, - status - ); - } - match status.as_str() { - "Pending" => match tx_info.status() { - grpc::TransactionStatus::Pending | - grpc::TransactionStatus::Completed | - grpc::TransactionStatus::Broadcast | - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed => { - break; - }, - _ => (), - }, - "Completed" => match tx_info.status() { - grpc::TransactionStatus::Completed | - grpc::TransactionStatus::Broadcast | - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed => { - break; - }, - _ => (), - }, - "Broadcast" => match tx_info.status() { - grpc::TransactionStatus::Broadcast | - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed => { - break; - }, - _ => (), - }, - "Mined_or_OneSidedUnconfirmed" => match tx_info.status() { - grpc::TransactionStatus::MinedUnconfirmed | - grpc::TransactionStatus::MinedConfirmed | - grpc::TransactionStatus::OneSidedUnconfirmed | - grpc::TransactionStatus::OneSidedConfirmed => { - break; - }, - _ => (), - }, - _ => panic!("Unknown status {status}, don't know what to expect"), + cucumber_steps_log(format!("waiting for tx with tx_id = {tx_id} to be at least {status}")); + let status_clone = status.clone(); + let tx_id_val = *tx_id; + wait_for!( + timeout: Duration::from_secs(200), + max_interval: Duration::from_secs(2), + description: format!("wallet {wallet_name} tx {tx_id_val} to reach at least status {status}"), + condition: async { + let request = GetTransactionInfoRequest { + 
transaction_ids: vec![tx_id_val], + }; + let tx_info = client.get_transaction_info(request).await.unwrap().into_inner(); + let tx_info = tx_info.transactions.first().unwrap(); + if tari_integration_tests::tx_event_stream::tx_status_matches(tx_info.status(), &status_clone) { + Ok(true) + } else { + Err(format!("current status: {:?}", tx_info.status())) + } } - tokio::time::sleep(Duration::from_secs(1)).await; - } + ); } } @@ -576,38 +455,33 @@ async fn wallet_has_at_least_num_txs(world: &mut TariWorld, wallet: String, num_ _ => panic!("Invalid transaction status {transaction_status}"), }; - let num_retries = 100; - let mut current_status = 0; - let mut total_found = 0; - - for _ in 0..num_retries { - let mut txs = client - .get_completed_transactions(grpc::GetCompletedTransactionsRequest { - payment_id: None, - block_hash: None, - block_height: None, - }) - .await - .unwrap() - .into_inner(); - let mut found_tx = 0; - while let Some(tx) = txs.next().await { - let tx_info = tx.unwrap().transaction.unwrap(); - current_status = tx_info.status; - if current_status == transaction_status { - found_tx += 1; + // Original timeout was 100 retries × 2s = 200s. 
+ wait_for!( + timeout: Duration::from_secs(200), + description: format!("wallet {wallet} to have at least {num_txs} txs with status {transaction_status}"), + condition: async { + let mut txs = client + .get_completed_transactions(grpc::GetCompletedTransactionsRequest { + payment_id: None, + block_hash: None, + block_height: None, + }) + .await + .unwrap() + .into_inner(); + let mut found_tx = 0u64; + while let Some(tx) = txs.next().await { + let tx_info = tx.unwrap().transaction.unwrap(); + if tx_info.status == transaction_status { + found_tx += 1; + } + } + if found_tx >= num_txs { + Ok(true) + } else { + Err(format!("found {found_tx} matching txs")) } } - if found_tx >= num_txs { - return; - } - total_found += found_tx; - tokio::time::sleep(Duration::from_secs(2)).await; - } - - panic!( - "Wallet {wallet} failed to have at least num {num_txs} txs with status {transaction_status}, current status \ - is {current_status}, scanned txs {total_found}" ); } @@ -662,30 +536,27 @@ async fn wait_for_wallet_to_have_less_than_micro_tari(world: &mut TariWorld, wal let mut client = create_wallet_client(world, wallet.clone()).await.unwrap(); cucumber_steps_log(format!("Waiting for wallet {wallet} to have less than {amount} uT")); - let num_retries = 100; - for i in 0..num_retries { - let _result = client.validate_all_transactions(ValidateRequest {}).await; - let balance_res = client - .get_balance(GetBalanceRequest { payment_id: None }) - .await - .unwrap() - .into_inner(); - if balance_res.available_balance < amount { - cucumber_steps_log(format!( - "Wallet {wallet} needs less than available {amount} uT (DONE), has {balance_res:?}" - )); - return; - } else if i % 5 == 0 { - cucumber_steps_log(format!( - "Wallet {wallet} needs less than available {amount} uT, has {balance_res:?}" - )); - } else { - // Nothing here + // Original timeout was 100 retries × 2s = 200s. 
+ wait_for!( + timeout: Duration::from_secs(200), + description: format!("wallet {wallet} to have less than {amount} uT"), + condition: async { + let _result = client.validate_all_transactions(ValidateRequest {}).await; + let balance_res = client + .get_balance(GetBalanceRequest { payment_id: None }) + .await + .unwrap() + .into_inner(); + if balance_res.available_balance < amount { + cucumber_steps_log(format!( + "Wallet {wallet} needs less than available {amount} uT (DONE), has {balance_res:?}" + )); + Ok(true) + } else { + Err(format!("available balance: {}", balance_res.available_balance)) + } } - tokio::time::sleep(Duration::from_secs(2)).await; - } - - panic!("Wallet {wallet} didn't get less than {amount} after num_retries {num_retries}"); + ); } #[then(expr = "I wait for wallet {word} to have scanned to height {int}")] @@ -696,26 +567,22 @@ async fn wait_for_wallet_to_have_scanned_to_height(world: &mut TariWorld, wallet "Waiting for wallet {wallet} to have scanned to height {height}" )); - let num_retries = 40; - for i in 0..num_retries { - let _result = client.validate_all_transactions(ValidateRequest {}).await; - let state_res = client.get_state(GetStateRequest {}).await.unwrap().into_inner(); - if state_res.scanned_height == height { - cucumber_steps_log(format!( - "Wallet {wallet} needs to scan to height {height} (DONE), current {state_res:?}" - )); - return; - } else if i % 3 == 0 { - cucumber_steps_log(format!( - "Wallet {wallet} needs to scan to height {height}, current {state_res:?}" - )); - } else { - // Nothing here + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("wallet {wallet} to scan to height {height}"), + condition: async { + let _result = client.validate_all_transactions(ValidateRequest {}).await; + let state_res = client.get_state(GetStateRequest {}).await.unwrap().into_inner(); + if state_res.scanned_height == height { + cucumber_steps_log(format!( + "Wallet {wallet} needs to scan to height {height} (DONE), current 
{state_res:?}" + )); + Ok(true) + } else { + Err(format!("scanned height: {}", state_res.scanned_height)) + } } - tokio::time::sleep(Duration::from_secs(2)).await; - } - - panic!("Wallet {wallet} didn't scan to height {height} after num_retries {num_retries}"); + ); } #[then(expr = "all wallets validate their transactions")] @@ -3272,96 +3139,89 @@ async fn burn_transaction(world: &mut TariWorld, amount: u64, wallet: String, fe let result = client.create_burn_transaction(req).await.unwrap(); let tx_id = result.into_inner().transaction_id; - let mut last_status = 0; - for _ in 0..(TWO_MINUTES_WITH_HALF_SECOND_SLEEP) { - let result = client - .get_transaction_info(grpc::GetTransactionInfoRequest { - transaction_ids: vec![tx_id], - }) - .await - .unwrap(); - - last_status = result.into_inner().transactions.last().unwrap().status; + wait_for!( + timeout: DEFAULT_TIMEOUT, + description: format!("burn transaction from {wallet} to be broadcast/confirmed"), + condition: async { + let result = client + .get_transaction_info(grpc::GetTransactionInfoRequest { + transaction_ids: vec![tx_id], + }) + .await + .unwrap(); - if let 1 | 2 | 6 = last_status { - return; + let status = result.into_inner().transactions.last().unwrap().status; + if let 1 | 2 | 6 = status { + Ok(true) + } else { + Err(format!("status: {status}")) + } } - - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; - } - - panic!( - "Burn transaction has status {last_status} when we desired 1 (TRANSACTION_STATUS_BROADCAST), 2 \ - (TRANSACTION_STATUS_UNCONFIRMED), or 6 (TRANSACTION_STATUS_CONFIRMED)" - ) + ); } #[then(expr = "wallet {word} balance is {word}")] async fn wallet_has_balance(world: &mut TariWorld, wallet_name: String, balance_key: String) { let mut client = world.get_wallet_client(&wallet_name).await.unwrap(); - let balance = world.balance.get(&balance_key).unwrap(); - - let balance_res = GetBalanceResponse::default(); - let num_retries = 30; - for i in 0..num_retries { - let _result = 
client.validate_all_transactions(ValidateRequest {}).await; - let balance_res = client - .get_balance(GetBalanceRequest { payment_id: None }) - .await - .unwrap() - .into_inner(); - if &balance_res == balance { - cucumber_steps_log(format!( - "Wallet {wallet_name} needs balance {balance:?} (DONE), has {balance_res:?}" - )); - return; - } else if i % 3 == 0 { - cucumber_steps_log(format!( - "Wallet {wallet_name} needs balance {balance:?}, has {balance_res:?}" - )); - } else { - // Nothing here + let balance = world.balance.get(&balance_key).unwrap().clone(); + + wait_for!( + timeout: SHORT_TIMEOUT, + description: format!("wallet {wallet_name} to match balance {balance_key}"), + condition: async { + let _result = client.validate_all_transactions(ValidateRequest {}).await; + let balance_res = client + .get_balance(GetBalanceRequest { payment_id: None }) + .await + .unwrap() + .into_inner(); + if balance_res == balance { + cucumber_steps_log(format!( + "Wallet {wallet_name} needs balance {balance:?} (DONE), has {balance_res:?}" + )); + Ok(true) + } else { + Err(format!("current: {balance_res:?}")) + } } - tokio::time::sleep(Duration::from_secs(2)).await; - } - - panic!("Wallet {wallet_name} doesn't have the correct balance: expected {balance:?} current {balance_res:?}"); + ); } #[then(expr = "wallet {word} has {int} coinbase transactions")] async fn wallet_has_num_coinbase_transactions(world: &mut TariWorld, wallet_name: String, expected: u64) { let mut client = create_wallet_client(world, wallet_name.clone()).await.unwrap(); - let num_retries = 100; - let mut found = 0u64; - for _ in 0..num_retries { - let mut txs = client - .get_completed_transactions(GetCompletedTransactionsRequest { - payment_id: None, - block_hash: None, - block_height: None, - }) - .await - .unwrap() - .into_inner(); - found = 0; - while let Some(tx) = txs.next().await { - let tx_info = tx.unwrap().transaction.unwrap(); - let is_coinbase = tx_info.status == grpc::TransactionStatus::Coinbase as i32 ||
tx_info.status == grpc::TransactionStatus::CoinbaseConfirmed as i32 || - tx_info.status == grpc::TransactionStatus::CoinbaseUnconfirmed as i32 || - tx_info.status == grpc::TransactionStatus::CoinbaseNotInBlockChain as i32; - if is_coinbase { - found += 1; + + // Original timeout was 100 retries × 2s = 200s. + wait_for!( + timeout: Duration::from_secs(200), + description: format!("wallet {wallet_name} to have {expected} coinbase transactions"), + condition: async { + let mut txs = client + .get_completed_transactions(GetCompletedTransactionsRequest { + payment_id: None, + block_hash: None, + block_height: None, + }) + .await + .unwrap() + .into_inner(); + let mut found = 0u64; + while let Some(tx) = txs.next().await { + let tx_info = tx.unwrap().transaction.unwrap(); + let is_coinbase = tx_info.status == grpc::TransactionStatus::Coinbase as i32 || + tx_info.status == grpc::TransactionStatus::CoinbaseConfirmed as i32 || + tx_info.status == grpc::TransactionStatus::CoinbaseUnconfirmed as i32 || + tx_info.status == grpc::TransactionStatus::CoinbaseNotInBlockChain as i32; + if is_coinbase { + found += 1; + } + } + if found >= expected { + Ok(true) + } else { + Err(format!("found {found} coinbase txs")) } } - if found >= expected { - break; - } - tokio::time::sleep(Duration::from_secs(2)).await; - } - assert_eq!( - found, expected, - "Wallet {wallet_name} has {found} coinbase transactions, expected {expected}" ); } @@ -3589,7 +3449,7 @@ async fn wallet_has_payrefs_for_all_mined_transactions(world: &mut TariWorld, wa } }, } - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + tokio::time::sleep(Duration::from_millis(500)).await; } assert!( found, @@ -3615,7 +3475,7 @@ async fn wallet_has_historical_payrefs(world: &mut TariWorld, wallet_name: Strin // We poll until the wallet has processed the reorg. 
let mut found_any_history = false; for tx_id in &tx_ids { - let num_retries = TWO_MINUTES_WITH_HALF_SECOND_SLEEP; + let num_retries = 240; for retry in 0..num_retries { let resp = client .get_transaction_pay_refs(GetTransactionPayRefsRequest { transaction_id: *tx_id }) @@ -3650,7 +3510,7 @@ async fn wallet_has_historical_payrefs(world: &mut TariWorld, wallet_name: Strin } }, } - tokio::time::sleep(Duration::from_millis(HALF_SECOND)).await; + tokio::time::sleep(Duration::from_millis(500)).await; } if found_any_history { break;