Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 12 additions & 18 deletions integration_tests/src/base_node_process.rs
Original file line number Diff line number Diff line change
Expand Up @@ -282,26 +282,20 @@ impl BaseNodeProcess {

pub fn kill(&mut self) {
self.kill_signal.trigger();
loop {
// lets wait till the port is cleared
if TcpListener::bind(("127.0.0.1", self.port)).is_ok() {
break;
let deadline = std::time::Instant::now() + Duration::from_secs(30);
for (label, port) in [("p2p", self.port), ("grpc", self.grpc_port), ("http", self.http_port)] {
while std::time::Instant::now() < deadline {
if TcpListener::bind(("127.0.0.1", port)).is_ok() {
break;
}
std::thread::sleep(Duration::from_millis(20));
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
loop {
// lets wait till the port is cleared
if TcpListener::bind(("127.0.0.1", self.grpc_port)).is_ok() {
break;
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
loop {
// lets wait till the http port is cleared
if TcpListener::bind(("127.0.0.1", self.http_port)).is_ok() {
break;
if TcpListener::bind(("127.0.0.1", port)).is_err() {
eprintln!(
"WARNING: base node '{}' {} port {} was not released within 30s timeout",
self.name, label, port
);
}
std::thread::sleep(std::time::Duration::from_millis(20));
}
}
}
8 changes: 5 additions & 3 deletions integration_tests/src/merge_mining_proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ use tempfile::tempdir;
use tokio::runtime;
use tonic::transport::Channel;

use super::get_port;
use crate::{TariWorld, wait_for_service};
use crate::{TariWorld, port_pool, wait_for_service};

#[derive(Clone, Debug)]
pub struct MergeMiningProxyProcess {
Expand All @@ -52,11 +51,14 @@ pub async fn register_merge_mining_proxy_process(
wallet_name: String,
origin_submission: bool,
) {
let proxy_port = port_pool::global_port_pool()
.allocate_merge_mining_proxy_port()
.expect("Port pool exhausted — too many concurrent merge mining proxies");
let merge_mining_proxy = MergeMiningProxyProcess {
name: merge_mining_proxy_name.clone(),
base_node_name,
wallet_name,
port: get_port(world, 18000..18499).unwrap(),
port: proxy_port,
origin_submission,
id: 0,
};
Expand Down
20 changes: 19 additions & 1 deletion integration_tests/src/port_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ struct PortPoolInner {
wallet_grpc_ports: VecDeque<u16>,
/// Pre-validated available ports for wallet HTTP (20000-20499)
wallet_http_ports: VecDeque<u16>,
/// Pre-validated available ports for merge mining proxy (20500-20999)
merge_mining_proxy_ports: VecDeque<u16>,
}

/// A set of ports allocated for a single base node.
Expand Down Expand Up @@ -70,14 +72,17 @@ impl PortPool {
let http_ports = scan_available_ports(19000, 19499, capacity);
let wallet_grpc_ports = scan_available_ports(19500, 19999, capacity);
let wallet_http_ports = scan_available_ports(20000, 20499, capacity);
let merge_mining_proxy_ports = scan_available_ports(20500, 20999, capacity);

println!(
"PortPool initialized: {} p2p, {} grpc, {} http, {} wallet_grpc, {} wallet_http ports available",
"PortPool initialized: {} p2p, {} grpc, {} http, {} wallet_grpc, {} wallet_http, {} merge_mining_proxy \
ports available",
p2p_ports.len(),
grpc_ports.len(),
http_ports.len(),
wallet_grpc_ports.len(),
wallet_http_ports.len(),
merge_mining_proxy_ports.len(),
);

Self {
Expand All @@ -87,6 +92,7 @@ impl PortPool {
http_ports,
wallet_grpc_ports,
wallet_http_ports,
merge_mining_proxy_ports,
}),
}
}
Expand Down Expand Up @@ -124,6 +130,18 @@ impl PortPool {
inner.wallet_grpc_ports.push_back(ports.grpc);
inner.wallet_http_ports.push_back(ports.http);
}

/// Allocate a port for a merge mining proxy.
pub fn allocate_merge_mining_proxy_port(&self) -> Option<u16> {
let mut inner = self.pools.lock().unwrap();
inner.merge_mining_proxy_ports.pop_front()
}

/// Return a merge mining proxy port to the pool.
pub fn return_merge_mining_proxy_port(&self, port: u16) {
let mut inner = self.pools.lock().unwrap();
inner.merge_mining_proxy_ports.push_back(port);
}
}

/// Scan a port range and return up to `capacity` ports that are currently available.
Expand Down
23 changes: 21 additions & 2 deletions integration_tests/src/world.rs
Original file line number Diff line number Diff line change
Expand Up @@ -331,7 +331,13 @@ impl TariWorld {
pub async fn after(&mut self, _scenario: &Scenario) {
let pool = crate::port_pool::global_port_pool();

// Kill wallets first — they depend on base nodes
// Destroy FFI wallets first — they hold native resources and open connections
for (name, mut ffi_wallet) in self.ffi_wallets.drain(..) {
println!("Destroying FFI wallet {name}");
ffi_wallet.destroy();
}

// Kill wallets — they depend on base nodes
for (name, mut p) in self.wallets.drain(..) {
println!("Shutting down wallet {name}");
let grpc_port = p.grpc_port;
Expand All @@ -342,7 +348,20 @@ impl TariWorld {
http: 0, // wallet doesn't own an http port
});
}
// Then kill base nodes — kill() waits for ports to be released,

// Shut down merge mining proxies and return ports to the pool
for (name, proxy) in self.merge_mining_proxies.drain(..) {
println!("Shutting down merge mining proxy {name}");
pool.return_merge_mining_proxy_port(proxy.port);
}

// Drop miners (they don't own ports or long-lived resources, but clear them
// so they don't hold references to base nodes or wallets)
for (name, _miner) in self.miners.drain(..) {
println!("Dropping miner {name}");
}

// Kill base nodes last — kill() waits for ports to be released,
// preventing port conflicts with the next scenario
for (name, mut p) in self.base_nodes.drain(..) {
println!("Shutting down base node {name}");
Expand Down
Loading