diff --git a/integration_tests/src/base_node_process.rs b/integration_tests/src/base_node_process.rs index e4916678c9..006cef484e 100644 --- a/integration_tests/src/base_node_process.rs +++ b/integration_tests/src/base_node_process.rs @@ -282,26 +282,20 @@ impl BaseNodeProcess { pub fn kill(&mut self) { self.kill_signal.trigger(); - loop { - // lets wait till the port is cleared - if TcpListener::bind(("127.0.0.1", self.port)).is_ok() { - break; + let deadline = std::time::Instant::now() + Duration::from_secs(30); + for (label, port) in [("p2p", self.port), ("grpc", self.grpc_port), ("http", self.http_port)] { + while std::time::Instant::now() < deadline { + if TcpListener::bind(("127.0.0.1", port)).is_ok() { + break; + } + std::thread::sleep(Duration::from_millis(20)); } - std::thread::sleep(std::time::Duration::from_millis(20)); - } - loop { - // lets wait till the port is cleared - if TcpListener::bind(("127.0.0.1", self.grpc_port)).is_ok() { - break; - } - std::thread::sleep(std::time::Duration::from_millis(20)); - } - loop { - // lets wait till the http port is cleared - if TcpListener::bind(("127.0.0.1", self.http_port)).is_ok() { - break; + if TcpListener::bind(("127.0.0.1", port)).is_err() { + eprintln!( + "WARNING: base node '{}' {} port {} was not released within 30s timeout", + self.name, label, port + ); } - std::thread::sleep(std::time::Duration::from_millis(20)); } } } diff --git a/integration_tests/src/merge_mining_proxy.rs b/integration_tests/src/merge_mining_proxy.rs index bad81f8b92..9c0aa61cef 100644 --- a/integration_tests/src/merge_mining_proxy.rs +++ b/integration_tests/src/merge_mining_proxy.rs @@ -32,8 +32,7 @@ use tempfile::tempdir; use tokio::runtime; use tonic::transport::Channel; -use super::get_port; -use crate::{TariWorld, wait_for_service}; +use crate::{TariWorld, port_pool, wait_for_service}; #[derive(Clone, Debug)] pub struct MergeMiningProxyProcess { @@ -52,11 +51,14 @@ pub async fn register_merge_mining_proxy_process( wallet_name: String, origin_submission: bool, ) { + let proxy_port = port_pool::global_port_pool() + .allocate_merge_mining_proxy_port() + .expect("Port pool exhausted — too many concurrent merge mining proxies"); let merge_mining_proxy = MergeMiningProxyProcess { name: merge_mining_proxy_name.clone(), base_node_name, wallet_name, - port: get_port(world, 18000..18499).unwrap(), + port: proxy_port, origin_submission, id: 0, }; diff --git a/integration_tests/src/port_pool.rs b/integration_tests/src/port_pool.rs index c0da57f063..100b35465b 100644 --- a/integration_tests/src/port_pool.rs +++ b/integration_tests/src/port_pool.rs @@ -42,6 +42,8 @@ struct PortPoolInner { wallet_grpc_ports: VecDeque, /// Pre-validated available ports for wallet HTTP (20000-20499) wallet_http_ports: VecDeque, + /// Pre-validated available ports for merge mining proxy (20500-20999) + merge_mining_proxy_ports: VecDeque, } /// A set of ports allocated for a single base node. @@ -70,14 +72,17 @@ impl PortPool { let http_ports = scan_available_ports(19000, 19499, capacity); let wallet_grpc_ports = scan_available_ports(19500, 19999, capacity); let wallet_http_ports = scan_available_ports(20000, 20499, capacity); + let merge_mining_proxy_ports = scan_available_ports(20500, 20999, capacity); println!( - "PortPool initialized: {} p2p, {} grpc, {} http, {} wallet_grpc, {} wallet_http ports available", + "PortPool initialized: {} p2p, {} grpc, {} http, {} wallet_grpc, {} wallet_http, {} merge_mining_proxy \ + ports available", p2p_ports.len(), grpc_ports.len(), http_ports.len(), wallet_grpc_ports.len(), wallet_http_ports.len(), + merge_mining_proxy_ports.len(), ); Self { @@ -87,6 +92,7 @@ impl PortPool { http_ports, wallet_grpc_ports, wallet_http_ports, + merge_mining_proxy_ports, }), } } @@ -124,6 +130,18 @@ impl PortPool { inner.wallet_grpc_ports.push_back(ports.grpc); inner.wallet_http_ports.push_back(ports.http); } + + /// Allocate a port for a merge mining proxy. + pub fn allocate_merge_mining_proxy_port(&self) -> Option { + let mut inner = self.pools.lock().unwrap(); + inner.merge_mining_proxy_ports.pop_front() + } + + /// Return a merge mining proxy port to the pool. + pub fn return_merge_mining_proxy_port(&self, port: u16) { + let mut inner = self.pools.lock().unwrap(); + inner.merge_mining_proxy_ports.push_back(port); + } } /// Scan a port range and return up to `capacity` ports that are currently available. diff --git a/integration_tests/src/world.rs b/integration_tests/src/world.rs index ae140b8a8b..f3a3b6b450 100644 --- a/integration_tests/src/world.rs +++ b/integration_tests/src/world.rs @@ -331,7 +331,13 @@ impl TariWorld { pub async fn after(&mut self, _scenario: &Scenario) { let pool = crate::port_pool::global_port_pool(); - // Kill wallets first — they depend on base nodes + // Destroy FFI wallets first — they hold native resources and open connections + for (name, mut ffi_wallet) in self.ffi_wallets.drain(..) { + println!("Destroying FFI wallet {name}"); + ffi_wallet.destroy(); + } + + // Kill wallets — they depend on base nodes for (name, mut p) in self.wallets.drain(..) { println!("Shutting down wallet {name}"); let grpc_port = p.grpc_port; @@ -342,7 +348,20 @@ impl TariWorld { http: 0, // wallet doesn't own an http port }); } - // Then kill base nodes — kill() waits for ports to be released, + + // Shut down merge mining proxies and return ports to the pool + for (name, proxy) in self.merge_mining_proxies.drain(..) { + println!("Shutting down merge mining proxy {name}"); + pool.return_merge_mining_proxy_port(proxy.port); + } + + // Drop miners (they don't own ports or long-lived resources, but clear them + // so they don't hold references to base nodes or wallets) + for (name, _miner) in self.miners.drain(..) { + println!("Dropping miner {name}"); + } + + // Kill base nodes last — kill() waits for ports to be released, // preventing port conflicts with the next scenario for (name, mut p) in self.base_nodes.drain(..) { println!("Shutting down base node {name}");