Skip to content

Commit 0f1b0dc

Browse files
feat: add sqlite peer_db (#6963)
Description --- - Added an sqlite implementation of peer_db. - Removed the lmdb implementation of peer_db. Motivation and Context --- The lmdb peer_db was memory and resource intensive. How Has This Been Tested? --- New unit tests Existing unit tests depending on the new peer_db implementation pass System-level testing passed What process can a PR reviewer use to test or verify this change? --- Code review System-level testing <!-- Checklist --> <!-- 1. Is the title of your PR in the form that would make nice release notes? The title, excluding the conventional commit tag, will be included exactly as is in the CHANGELOG, so please think about it carefully. --> Breaking Changes --- - [ ] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [X] Other - Please specify <!-- Does this include a breaking change? If so, include this line as a footer --> BREAKING CHANGES: - The existing lmbd peer_db will be discarded (left intact but not used anymore). - In PeerManager, the method `fn closest_peers` is replaced with `fn closest_n_active_peers` as amore flexible alternative. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Summary by CodeRabbit - **New Features** - Transitioned peer storage backend from LMDB to SQLite, enhancing reliability and simplifying setup. - Added test utilities for creating peers with address stats and recent last-seen timestamps. - Extended peer and address data structures with constructors, conversion methods, and reset functions. - **Bug Fixes** - Unified peer add/update operations into `add_or_update_peer` for consistency across modules. - Corrected configuration comments to specify SQLite database paths instead of LMDB. - **Refactor** - Removed legacy peer query framework and migration code; replaced with async wrappers over SQLite. - Enhanced peer selection, filtering, and banning logic to leverage new database queries. - Simplified batch peer retrieval in wallet and FFI layers with direct SQL queries. - Updated test setups to use SQLite, replacing in-memory or LMDB dependencies. - Replaced direct sync calls with async wrappers in discovery, network, and connection modules. - Improved asynchronous handling and batch processing for peer management. - **Chores** - Removed LMDB and related dependencies; added SQLite, Diesel, and migration crates. - Created SQL migration scripts for node identity, peers, and addresses schemas. - Managed temporary database cleanup in tests and connection pools. - Cleaned imports and streamlined peer manager initialization in examples and tests. - **Documentation** - Updated configuration comments to clarify transition from LMDB to SQLite storage paths. <!-- end of auto-generated comment: release notes by coderabbit.ai --> --------- Co-authored-by: Stan Bondi <sdbondi@users.noreply.github.com>
1 parent 8a6f229 commit 0f1b0dc

File tree

121 files changed

+6092
-2937
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

121 files changed

+6092
-2937
lines changed

Cargo.lock

Lines changed: 34 additions & 16 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

applications/minotari_console_wallet/src/automation/commands.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ use tari_common_types::{
8080
use tari_comms::{
8181
connectivity::{ConnectivityEvent, ConnectivityRequester},
8282
multiaddr::Multiaddr,
83-
peer_manager::{Peer, PeerQuery},
83+
peer_manager::Peer,
8484
types::CommsPublicKey,
8585
};
8686
use tari_comms_dht::{envelope::NodeDestination, DhtDiscoveryRequester};
@@ -2600,11 +2600,10 @@ pub async fn command_runner(
26002600
.map_err(|e| CommandError::General(e.to_string()))?;
26012601
// config
26022602

2603-
let query = PeerQuery::new().select_where(|p| p.is_seed());
26042603
let peer_seeds = wallet
26052604
.comms
26062605
.peer_manager()
2607-
.perform_query(query)
2606+
.get_seed_peers()
26082607
.await
26092608
.map_err(|e| CommandError::General(e.to_string()))?;
26102609
// config

applications/minotari_console_wallet/src/grpc/wallet_grpc_server.rs

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1205,15 +1205,29 @@ impl wallet_server::Wallet for WalletGrpcServer {
12051205
.await
12061206
.map_err(|err| Status::internal(err.to_string()))?;
12071207

1208-
let mut peers = Vec::with_capacity(connected_peers.len());
1209-
for conn in connected_peers {
1210-
peers.push(
1211-
peer_manager
1212-
.find_by_node_id(conn.peer_node_id())
1213-
.await
1214-
.map_err(|err| Status::internal(err.to_string()))?
1215-
.ok_or_else(|| Status::not_found(format!("Peer '{}' not found", conn.peer_node_id())))?,
1216-
);
1208+
let node_ids = connected_peers
1209+
.iter()
1210+
.map(|c| c.peer_node_id())
1211+
.cloned()
1212+
.collect::<Vec<_>>();
1213+
let peers = peer_manager
1214+
.get_peers_by_node_ids(&node_ids)
1215+
.await
1216+
.map_err(|err| Status::internal(err.to_string()))?;
1217+
if peers.len() != node_ids.len() {
1218+
let mut error_response = Vec::new();
1219+
node_ids.iter().for_each(|node_id| {
1220+
if !peers.iter().any(|p| p.node_id == *node_id) {
1221+
warn!(target: LOG_TARGET, "Peer '{}' not found", node_id);
1222+
error_response.push(format!("'{}'", node_id));
1223+
}
1224+
});
1225+
if !error_response.is_empty() {
1226+
return Err(Status::not_found(format!(
1227+
"Peer(s) not found: {}",
1228+
error_response.join(", ")
1229+
)));
1230+
}
12171231
}
12181232

12191233
let resp = tari_rpc::ListConnectedPeersResponse {

applications/minotari_console_wallet/src/init/mod.rs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ use tari_common_types::{
6161
};
6262
use tari_comms::{
6363
multiaddr::Multiaddr,
64-
peer_manager::{Peer, PeerFeatures, PeerQuery},
64+
peer_manager::{Peer, PeerFeatures},
6565
types::CommsPublicKey,
6666
NodeIdentity,
6767
};
@@ -347,11 +347,10 @@ pub async fn set_peer_and_get_base_node_peer_config(
347347
}
348348
}
349349
}
350-
let query = PeerQuery::new().select_where(|p| p.is_seed());
351-
let peer_seeds = wallet.comms.peer_manager().perform_query(query).await.map_err(|err| {
350+
let peer_seeds = wallet.comms.peer_manager().get_seed_peers().await.map_err(|err| {
352351
ExitError::new(
353352
ExitCode::InterfaceError,
354-
format!("Could net get seed peers from peer manager: {}", err),
353+
format!("Could not get seed peers from peer manager: {}", err),
355354
)
356355
})?;
357356
// config

applications/minotari_console_wallet/src/recovery.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ pub async fn wallet_recovery(
119119
);
120120
peer_public_keys.push(peer.public_key.clone());
121121
peer_manager
122-
.add_peer(peer)
122+
.add_or_update_peer(peer)
123123
.await
124124
.map_err(|err| ExitError::new(ExitCode::NetworkError, err))?;
125125
}

applications/minotari_console_wallet/src/ui/state/app_state.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -942,13 +942,15 @@ impl AppStateInner {
942942
self.refresh_network_id().await?;
943943
let connections = self.wallet.comms.connectivity().get_active_connections().await?;
944944
let peer_manager = self.wallet.comms.peer_manager();
945-
let mut peers = Vec::with_capacity(connections.len());
946-
for c in &connections {
947-
if let Ok(Some(p)) = peer_manager.find_by_node_id(c.peer_node_id()).await {
948-
peers.push(p);
949-
}
945+
let node_ids = connections
946+
.iter()
947+
.map(|c| c.peer_node_id())
948+
.cloned()
949+
.collect::<Vec<_>>();
950+
self.data.connected_peers = peer_manager.get_peers_by_node_ids(&node_ids).await?;
951+
if self.data.connected_peers.is_empty() {
952+
debug!(target: LOG_TARGET, "Failed to look up {} peers", node_ids.len());
950953
}
951-
self.data.connected_peers = peers;
952954
self.updated = true;
953955
Ok(())
954956
}

applications/minotari_console_wallet/src/ui/ui_error.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use minotari_wallet::{
2626
transaction_service::error::TransactionServiceError,
2727
};
2828
use tari_common_types::tari_address::TariAddressError;
29-
use tari_comms::connectivity::ConnectivityError;
29+
use tari_comms::{connectivity::ConnectivityError, peer_manager::PeerManagerError};
3030
use tari_contacts::contacts_service::error::ContactsServiceError;
3131
use tari_utilities::hex::HexError;
3232
use thiserror::Error;
@@ -47,6 +47,8 @@ pub enum UiError {
4747
WalletError(#[from] WalletError),
4848
#[error(transparent)]
4949
WalletStorageError(#[from] WalletStorageError),
50+
#[error(transparent)]
51+
PeerManagerError(#[from] PeerManagerError),
5052
#[error("Could not parse Tari Address: `{0}`")]
5153
TariAddressParseError(#[from] TariAddressError),
5254
#[error("Could not convert string into Net Address")]

applications/minotari_node/src/commands/command/add_peer.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ impl HandleCommand<ArgsAddPeer> for CommandContext {
6060
String::new(),
6161
);
6262
// If the peer exists, this will merge the given address
63-
peer_manager.add_peer(peer).await?;
63+
peer_manager.add_or_update_peer(peer).await?;
6464
println!("Peer with node id '{}' was added to the base node.", node_id);
6565
self.dial_peer(node_id).await?;
6666
Ok(())

applications/minotari_node/src/commands/command/list_connections.rs

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,26 @@ impl CommandContext {
5454
"Info",
5555
]);
5656
let peer_manager = self.comms.peer_manager();
57-
for conn in conns {
58-
let peer = peer_manager
59-
.find_by_node_id(conn.peer_node_id())
60-
.await
61-
.expect("Unexpected peer database error")
62-
.expect("Peer not found");
57+
let node_ids = conns
58+
.iter()
59+
.map(|conn| conn.peer_node_id())
60+
.cloned()
61+
.collect::<Vec<_>>();
62+
let peers = match peer_manager.get_peers_by_node_ids(&node_ids).await {
63+
Ok(val) => val,
64+
Err(e) => {
65+
println!("Error: Unexpected peer database error: {}", e);
66+
return;
67+
},
68+
};
69+
if peers.len() != node_ids.len() {
70+
println!("\nError: Peer manager returned fewer peers than requested\n");
71+
}
72+
for peer in peers {
73+
let conn = match conns.iter().find(|conn| conn.peer_node_id() == &peer.node_id) {
74+
None => continue,
75+
Some(val) => val,
76+
};
6377

6478
let chain_height = peer
6579
.get_metadata(1)

applications/minotari_node/src/commands/command/list_peers.rs

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use anyhow::Error;
2424
use async_trait::async_trait;
2525
use chrono::Utc;
2626
use clap::Parser;
27-
use tari_comms::peer_manager::PeerQuery;
27+
use tari_comms::peer_manager::PeerFeatures;
2828
use tari_core::base_node::state_machine_service::states::PeerMetadata;
2929

3030
use super::{CommandContext, HandleCommand};
@@ -45,16 +45,17 @@ impl HandleCommand<Args> for CommandContext {
4545

4646
impl CommandContext {
4747
pub async fn list_peers(&self, filter: Option<String>) -> Result<(), Error> {
48-
let mut query = PeerQuery::new();
49-
if let Some(f) = filter {
50-
let filter = f.to_lowercase();
51-
query = query.select_where(move |p| match filter.as_str() {
52-
"basenode" | "basenodes" | "base_node" | "base-node" | "bn" => p.features.is_node(),
53-
"wallet" | "wallets" | "w" => p.features.is_client(),
54-
_ => false,
55-
})
56-
}
57-
let mut peers = self.comms.peer_manager().perform_query(query).await?;
48+
let features = filter.as_ref().and_then(|value| match value.to_lowercase().as_str() {
49+
"basenode" | "basenodes" | "base_node" | "base-node" | "bn" => Some(PeerFeatures::COMMUNICATION_NODE),
50+
"wallet" | "wallets" | "w" => Some(PeerFeatures::COMMUNICATION_CLIENT),
51+
_ => {
52+
println!("Unknown filter '{:?}'; list-peers", filter);
53+
println!(" Try 'basenode', 'basenodes', 'base-node', 'base_node', 'wallet', 'wallets', 'w'");
54+
None
55+
},
56+
});
57+
58+
let mut peers = self.comms.peer_manager().all(features).await?;
5859
let num_peers = peers.len();
5960
println!();
6061
let mut table = Table::new();

0 commit comments

Comments
 (0)