Skip to content

Commit 7867d12

Browse files
fix: peer retention and connections (#7123)
Description --- - Fixed peer retention in the database by improving the logic that deletes all stale peers. - Improved error responses when peers could not be found in the database. - Improved synchronisation of the DHT connection pools with the peer database. Motivation and Context --- The DHT neighbour and random pools were not in sync with the actual connections and the peers in the peer db. Issue - Why `PeerManagerError(PeerNotFoundError)` ``` 2025-05-28 04:49:55.101009900 [comms::dht::connectivity] ERROR Error refreshing neighbour peer pool: PeerManagerError(PeerNotFoundError) 2025-05-28 04:49:55.101120100 [comms::connectivity::manager] TRACE Request (14743808475136314793): GetAllowList(Sender { inner: Some(Inner { state: State { is_complete: false, is_closed: false, is_rx_task_set: true, is_tx_task_set: false } }) }) 2025-05-28 04:49:55.101160300 [comms::connectivity::manager] TRACE Request (14743808475136314793) done 2025-05-28 04:49:55.104823200 [comms::dht::connectivity] ERROR Error refreshing random peer pool: PeerManagerError(PeerNotFoundError) ``` Issue - Why `0 connected` but `active DHT connections: 10/12` ``` 2025-05-28 04:49:55.104929100 [comms::dht::connectivity] DEBUG DHT connectivity status: neighbour pool: 8/8 (0 connected), random pool: 4/4 (0 connected, last refreshed 12777s ago), active DHT connections: 10/12 ``` Issue - Why `Inbound pipeline returned an error: 'The requested peer does not exist'` ``` 2025-05-28 10:42:21.447513700 [comms::pipeline::inbound] WARN Inbound pipeline returned an error: 'The requested peer does not exist' ``` How Has This Been Tested? --- - Adapted the unit test for the improved delete-all-stale-peers logic. - System-level testing [**TBD**] What process can a PR reviewer use to test or verify this change? --- - Code review. - System-level testing <!-- Checklist --> <!-- 1. Is the title of your PR in the form that would make nice release notes? 
The title, excluding the conventional commit tag, will be included exactly as is in the CHANGELOG, so please think about it carefully. --> Breaking Changes --- - [x] None - [ ] Requires data directory on base node to be deleted - [ ] Requires hard fork - [ ] Other - Please specify <!-- Does this include a breaking change? If so, include this line as a footer --> <!-- BREAKING CHANGE: Description what the user should do, e.g. delete a database, resync the chain --> <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit - **New Features** - Improved error messages now include detailed peer identifiers for easier troubleshooting. - Introduced distinct soft and hard peer deletion methods for better peer lifecycle management. - Support for duplicate peer addresses has been added, allowing more flexible network setups. - **Bug Fixes** - Enhanced error handling to skip peers with inaccessible addresses without failing operations. - **Database Migration** - Removed the unique constraint on peer addresses to support multiple peers sharing the same address. - **Tests** - Added and updated tests to validate peer deletion behaviors and duplicate address support. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent e5a0854 commit 7867d12

File tree

15 files changed

+303
-116
lines changed

15 files changed

+303
-116
lines changed

applications/minotari_node/src/commands/command/test_peer_liveness.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl HandleCommand<ArgsTestPeerLiveness> for CommandContext {
8686
let address_clone = args.address.clone();
8787

8888
// Remove the peer from the peer manager (not the peer db)
89-
let _res = peer_manager.delete_peer(&node_id).await;
89+
let _res = peer_manager.soft_delete_peer(&node_id).await;
9090

9191
// Create a new peer with the given address, if the peer exists, this will merge the given address
9292
let peer = Peer::new(

comms/core/src/connection_manager/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -510,7 +510,7 @@ where
510510
warn!(target: LOG_TARGET, "Peer not found for dial");
511511
if let Some(reply) = reply {
512512
let _result = reply.send(Err(ConnectionManagerError::PeerManagerError(
513-
PeerManagerError::PeerNotFoundError,
513+
PeerManagerError::peer_not_found(&node_id),
514514
)));
515515
}
516516
},

comms/core/src/connection_manager/tests/manager.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ async fn connect_to_nonexistent_peer() {
7878
rt_handle.spawn(connection_manager.run());
7979

8080
let err = requester.dial_peer(NodeId::default()).await.unwrap_err();
81-
unpack_enum!(ConnectionManagerError::PeerManagerError(PeerManagerError::PeerNotFoundError) = err);
81+
unpack_enum!(ConnectionManagerError::PeerManagerError(PeerManagerError::PeerNotFound(_)) = err);
8282

8383
shutdown.trigger();
8484
}

comms/core/src/connectivity/manager.rs

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -467,7 +467,12 @@ impl ConnectivityManagerActor {
467467

468468
async fn delete_stale_peers_from_db(&mut self, task_id: u64) {
469469
let start = Instant::now();
470-
match tokio::time::timeout(STALE_PEER_DELETE_TIMEOUT, self.peer_manager.delete_all_stale_peers()).await {
470+
match tokio::time::timeout(
471+
STALE_PEER_DELETE_TIMEOUT,
472+
self.peer_manager.hard_delete_all_stale_peers(),
473+
)
474+
.await
475+
{
471476
Ok(res) => match res {
472477
Ok(deleted) => {
473478
let len = deleted.len();
@@ -713,7 +718,7 @@ impl ConnectivityManagerActor {
713718
.map(|d| format!("{}s ago", d.as_secs()))
714719
.unwrap_or_else(|| "Never".to_string()),
715720
);
716-
self.peer_manager.delete_peer(node_id).await?;
721+
self.peer_manager.soft_delete_peer(node_id).await?;
717722
}
718723
}
719724
}
@@ -1108,7 +1113,7 @@ impl ConnectivityManagerActor {
11081113
format_duration(duration),
11091114
reason
11101115
);
1111-
self.peer_manager.ban_peer_by_node_id(node_id, duration, reason).await?;
1116+
let ban_result = self.peer_manager.ban_peer_by_node_id(node_id, duration, reason).await;
11121117

11131118
#[cfg(feature = "metrics")]
11141119
super::metrics::banned_peers_counter().inc();
@@ -1123,6 +1128,7 @@ impl ConnectivityManagerActor {
11231128
"Disconnected banned peer {}. The peer connection status is {}", node_id, status
11241129
);
11251130
}
1131+
ban_result?;
11261132
Ok(())
11271133
}
11281134

comms/core/src/peer_manager/error.rs

Lines changed: 57 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222

2323
// use std::sync::PoisonError;
2424

25+
use std::fmt;
26+
2527
use multiaddr::Multiaddr;
2628
use tari_common_sqlite::error::StorageError;
27-
use tari_utilities::hex::HexError;
29+
use tari_utilities::hex::{Hex, HexError};
2830
use thiserror::Error;
2931
use tokio::task::JoinError;
3032

@@ -33,8 +35,8 @@ use crate::peer_manager::NodeId;
3335
/// Error type for [PeerManager](super::PeerManager).
3436
#[derive(Debug, Error, Clone)]
3537
pub enum PeerManagerError {
36-
#[error("The requested peer does not exist")]
37-
PeerNotFoundError,
38+
#[error("The requested peer does not exist: {0}")]
39+
PeerNotFound(DisplayVec),
3840
#[error("DB Data inconsistency: {0}")]
3941
DataInconsistency(String),
4042
#[error("The peer has been banned")]
@@ -68,7 +70,6 @@ pub enum PeerManagerError {
6870
#[error("Process error: `{0}`")]
6971
ProcessError(String),
7072
}
71-
7273
impl From<JoinError> for PeerManagerError {
7374
fn from(err: JoinError) -> Self {
7475
PeerManagerError::JoinError(err.to_string())
@@ -81,13 +82,6 @@ impl From<StorageError> for PeerManagerError {
8182
}
8283
}
8384

84-
impl PeerManagerError {
85-
/// Returns true if this error indicates that the peer is not found, otherwise false
86-
pub fn is_peer_not_found(&self) -> bool {
87-
matches!(self, PeerManagerError::PeerNotFoundError)
88-
}
89-
}
90-
9185
impl From<HexError> for PeerManagerError {
9286
fn from(value: HexError) -> Self {
9387
PeerManagerError::HexError(value.to_string())
@@ -99,3 +93,55 @@ impl From<std::io::Error> for PeerManagerError {
9993
PeerManagerError::StorageError(value.to_string())
10094
}
10195
}
96+
97+
/// Display helper struct for a vector of node IDs, rendered as a bracketed, comma-separated list of hex strings.
98+
#[derive(Debug, Error, Clone)]
99+
pub struct DisplayVec(Vec<NodeId>);
100+
101+
impl fmt::Display for DisplayVec {
102+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
103+
write!(
104+
f,
105+
"[{}]",
106+
self.0.iter().map(|v| v.to_hex()).collect::<Vec<_>>().join(", ")
107+
)
108+
}
109+
}
110+
111+
impl PeerManagerError {
112+
pub fn peer_not_found(peer: &NodeId) -> Self {
113+
PeerManagerError::PeerNotFound(DisplayVec(vec![peer.clone()]))
114+
}
115+
116+
pub fn peers_not_found<T>(peers: T) -> Self
117+
where T: AsRef<[NodeId]> {
118+
PeerManagerError::PeerNotFound(DisplayVec(peers.as_ref().to_vec()))
119+
}
120+
}
121+
122+
#[cfg(test)]
123+
mod tests {
124+
use super::*;
125+
126+
#[test]
127+
fn test_peer_not_found() {
128+
let peers = [
129+
NodeId::from_hex("abb1556d806c2ff042f433ca0a").unwrap(),
130+
NodeId::from_hex("ba9ab662a6d974c5a607562326").unwrap(),
131+
NodeId::from_hex("97676095b1901327bdc36e8cb6").unwrap(),
132+
];
133+
134+
let error = PeerManagerError::peers_not_found(&peers);
135+
assert_eq!(
136+
error.to_string(),
137+
"The requested peer does not exist: [abb1556d806c2ff042f433ca0a, ba9ab662a6d974c5a607562326, \
138+
97676095b1901327bdc36e8cb6]"
139+
);
140+
141+
let error = PeerManagerError::peer_not_found(&peers[0]);
142+
assert_eq!(
143+
error.to_string(),
144+
"The requested peer does not exist: [abb1556d806c2ff042f433ca0a]"
145+
);
146+
}
147+
}

comms/core/src/peer_manager/manager.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -80,9 +80,9 @@ impl PeerManager {
8080
Ok(peer_id)
8181
}
8282

83-
/// The peer with the specified public_key will be removed from the PeerManager
84-
pub async fn delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
85-
self.peer_storage_sql.delete_peer(node_id)?;
83+
/// The peer with the specified node id will be soft deleted (marked as deleted)
84+
pub async fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
85+
self.peer_storage_sql.soft_delete_peer(node_id)?;
8686
#[cfg(feature = "metrics")]
8787
{
8888
let count = self.count().await;
@@ -93,8 +93,8 @@ impl PeerManager {
9393
}
9494

9595
/// Delete all stale peers, removing them from the database and returning their node_ids
96-
pub async fn delete_all_stale_peers(&self) -> Result<Vec<NodeId>, PeerManagerError> {
97-
let deleted_peers = self.peer_storage_sql.delete_all_stale_peers()?;
96+
pub async fn hard_delete_all_stale_peers(&self) -> Result<Vec<NodeId>, PeerManagerError> {
97+
let deleted_peers = self.peer_storage_sql.hard_delete_all_stale_peers()?;
9898
Ok(deleted_peers)
9999
}
100100

@@ -184,7 +184,7 @@ impl PeerManager {
184184
pub async fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Option<Peer>, PeerManagerError> {
185185
match self.peer_storage_sql.direct_identity_node_id(node_id) {
186186
Ok(peer) => Ok(Some(peer)),
187-
Err(PeerManagerError::PeerNotFoundError) | Err(PeerManagerError::BannedPeer) => Ok(None),
187+
Err(PeerManagerError::PeerNotFound(_)) | Err(PeerManagerError::BannedPeer) => Ok(None),
188188
Err(err) => Err(err),
189189
}
190190
}
@@ -196,7 +196,7 @@ impl PeerManager {
196196
) -> Result<Option<Peer>, PeerManagerError> {
197197
match self.peer_storage_sql.direct_identity_public_key(public_key) {
198198
Ok(peer) => Ok(Some(peer)),
199-
Err(PeerManagerError::PeerNotFoundError) | Err(PeerManagerError::BannedPeer) => Ok(None),
199+
Err(PeerManagerError::PeerNotFound(_)) | Err(PeerManagerError::BannedPeer) => Ok(None),
200200
Err(err) => Err(err),
201201
}
202202
}
@@ -296,7 +296,7 @@ impl PeerManager {
296296
let peer = self
297297
.find_by_node_id(node_id)
298298
.await?
299-
.ok_or(PeerManagerError::PeerNotFoundError)?;
299+
.ok_or(PeerManagerError::peer_not_found(node_id))?;
300300
Ok(peer.features)
301301
}
302302

@@ -308,7 +308,7 @@ impl PeerManager {
308308
let peer = self
309309
.find_by_node_id(node_id)
310310
.await?
311-
.ok_or(PeerManagerError::PeerNotFoundError)?;
311+
.ok_or(PeerManagerError::peer_not_found(node_id))?;
312312
Ok(peer.addresses)
313313
}
314314

@@ -324,7 +324,7 @@ impl PeerManager {
324324
}
325325
let peers = self.get_peers_by_node_ids(node_ids).await?;
326326
if peers.is_empty() {
327-
return Err(PeerManagerError::PeerNotFoundError);
327+
return Err(PeerManagerError::peers_not_found(node_ids));
328328
}
329329
let results = peers.into_iter().map(|p| (p.node_id, p.addresses)).collect::<Vec<_>>();
330330
Ok(results)

comms/core/src/peer_manager/or_not_found.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,20 +20,20 @@
2020
// WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
2121
// USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
2222

23-
use crate::peer_manager::PeerManagerError;
23+
use crate::peer_manager::{NodeId, PeerManagerError};
2424

2525
/// Extension trait for Result<Option<T>, PeerManagerError>.
2626
pub trait OrNotFound {
2727
type Value;
2828
type Error;
29-
fn or_not_found(self) -> Result<Self::Value, Self::Error>;
29+
fn or_not_found(self, node_id: &NodeId) -> Result<Self::Value, Self::Error>;
3030
}
3131

3232
impl<T> OrNotFound for Result<Option<T>, PeerManagerError> {
3333
type Error = PeerManagerError;
3434
type Value = T;
3535

36-
fn or_not_found(self) -> Result<Self::Value, Self::Error> {
37-
self.and_then(|val| val.ok_or(PeerManagerError::PeerNotFoundError))
36+
fn or_not_found(self, node_id: &NodeId) -> Result<Self::Value, Self::Error> {
37+
self.and_then(|val| val.ok_or(PeerManagerError::peer_not_found(node_id)))
3838
}
3939
}

comms/core/src/peer_manager/peer_storage_sql.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,9 @@ impl PeerStorageSql {
9999
.add_or_update_online_peer(pubkey, node_id, addresses, peer_features, source)?)
100100
}
101101

102-
/// The peer with the specified public_key will be removed from the PeerManager
103-
pub fn delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
104-
self.peer_db.set_deleted_at(node_id)?;
102+
/// The peer with the specified node id will be soft deleted (marked as deleted)
103+
pub fn soft_delete_peer(&self, node_id: &NodeId) -> Result<(), PeerManagerError> {
104+
self.peer_db.soft_delete_peer(node_id)?;
105105
Ok(())
106106
}
107107

@@ -159,7 +159,7 @@ impl PeerStorageSql {
159159
pub fn direct_identity_node_id(&self, node_id: &NodeId) -> Result<Peer, PeerManagerError> {
160160
let peer = self
161161
.get_peer_by_node_id(node_id)?
162-
.ok_or(PeerManagerError::PeerNotFoundError)?;
162+
.ok_or(PeerManagerError::peer_not_found(node_id))?;
163163

164164
if peer.is_banned() {
165165
Err(PeerManagerError::BannedPeer)
@@ -172,7 +172,7 @@ impl PeerStorageSql {
172172
pub fn direct_identity_public_key(&self, public_key: &CommsPublicKey) -> Result<Peer, PeerManagerError> {
173173
let peer = self
174174
.find_by_public_key(public_key)?
175-
.ok_or(PeerManagerError::PeerNotFoundError)?;
175+
.ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))?;
176176

177177
if peer.is_banned() {
178178
Err(PeerManagerError::BannedPeer)
@@ -244,10 +244,10 @@ impl PeerStorageSql {
244244
}
245245

246246
/// Delete all stale peers, removing them from the database and returning their node_ids
247-
pub fn delete_all_stale_peers(&self) -> Result<Vec<NodeId>, PeerManagerError> {
247+
pub fn hard_delete_all_stale_peers(&self) -> Result<Vec<NodeId>, PeerManagerError> {
248248
Ok(self
249249
.peer_db
250-
.delete_all_stale_peers(STALE_PEER_THRESHOLD_DURATION, MAX_NEIGHBOUR_WALLET_PEER_COUNT)?)
250+
.hard_delete_all_stale_peers(STALE_PEER_THRESHOLD_DURATION, MAX_NEIGHBOUR_WALLET_PEER_COUNT)?)
251251
}
252252

253253
/// Compile a random list of communication node peers of size _n_ that are not banned or offline
@@ -330,7 +330,7 @@ impl PeerStorageSql {
330330
let node_id = NodeId::from_key(public_key);
331331
self.peer_db
332332
.set_banned(&node_id, duration, reason)?
333-
.ok_or(PeerManagerError::PeerNotFoundError)
333+
.ok_or(PeerManagerError::peer_not_found(&NodeId::from_public_key(public_key)))
334334
}
335335

336336
/// Ban the peer for the given duration
@@ -342,13 +342,13 @@ impl PeerStorageSql {
342342
) -> Result<NodeId, PeerManagerError> {
343343
self.peer_db
344344
.set_banned(node_id, duration, reason)?
345-
.ok_or(PeerManagerError::PeerNotFoundError)
345+
.ok_or(PeerManagerError::peer_not_found(node_id))
346346
}
347347

348348
pub fn is_peer_banned(&self, node_id: &NodeId) -> Result<bool, PeerManagerError> {
349349
let peer = self
350350
.get_peer_by_node_id(node_id)?
351-
.ok_or(PeerManagerError::PeerNotFoundError)?;
351+
.ok_or(PeerManagerError::peer_not_found(node_id))?;
352352
Ok(peer.is_banned())
353353
}
354354

@@ -597,7 +597,7 @@ mod test {
597597
peer_storage.find_by_public_key(&peer3.public_key).unwrap().unwrap();
598598

599599
// Test delete of border case peer
600-
assert!(peer_storage.delete_peer(&peer3.node_id).is_ok());
600+
assert!(peer_storage.soft_delete_peer(&peer3.node_id).is_ok());
601601

602602
// It is a logical delete, so there should still be 3 peers in the db
603603
assert_eq!(peer_storage.peer_db.size(), 3);

0 commit comments

Comments
 (0)