Skip to content

Commit 681c7ba

Browse files
committed
feat: add downstream peer limiting
For now this is a simple cmdline argument that just ignores peers beyond a certain count. Proper handling will need to include idle timers etc. to kick out downstream peers that are just squatting. Signed-Off-By: Roland Kuhn <rk@rkuhn.info>
1 parent f570e78 commit 681c7ba

5 files changed

Lines changed: 25 additions & 7 deletions

File tree

crates/amaru/src/bin/amaru/cmd/daemon.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,10 @@ pub struct Args {
5454
/// The address to listen on for incoming connections.
5555
#[arg(long, value_name = "LISTEN_ADDRESS", default_value = super::DEFAULT_LISTEN_ADDRESS)]
5656
listen_address: String,
57+
58+
/// The maximum number of downstream peers to connect to.
59+
#[arg(long, value_name = "MAX_DOWNSTREAM_PEERS", default_value_t = 10)]
60+
max_downstream_peers: usize,
5761
}
5862

5963
pub async fn run(
@@ -112,5 +116,6 @@ fn parse_args(args: Args) -> Result<Config, Box<dyn std::error::Error>> {
112116
network: args.network,
113117
network_magic: args.network.to_network_magic(),
114118
listen_address: args.listen_address,
119+
max_downstream_peers: args.max_downstream_peers,
115120
})
116121
}

crates/amaru/src/stages/consensus/chain_forward.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ pub struct ForwardStage {
5353
pub runtime: AcTokio,
5454
pub listen_address: String,
5555
pub downstream: Option<ActoRef<ForwardEvent>>,
56+
pub max_peers: usize,
5657
}
5758

5859
#[derive(Debug, Clone)]
@@ -67,6 +68,7 @@ impl ForwardStage {
6768
store: Arc<Mutex<dyn ChainStore<Header>>>,
6869
network_magic: u64,
6970
listen_address: &str,
71+
max_peers: usize,
7072
) -> Self {
7173
#[allow(clippy::expect_used)]
7274
let runtime =
@@ -78,6 +80,7 @@ impl ForwardStage {
7880
runtime,
7981
listen_address: listen_address.to_string(),
8082
downstream,
83+
max_peers,
8184
}
8285
}
8386
}
@@ -116,7 +119,7 @@ impl gasket::framework::Worker<ForwardStage> for Worker {
116119
let clients = stage
117120
.runtime
118121
.spawn_actor("chain_forward", |cell| {
119-
client_supervisor(cell, stage.store.clone())
122+
client_supervisor(cell, stage.store.clone(), stage.max_peers)
120123
})
121124
.me;
122125

@@ -251,11 +254,17 @@ enum ClientMsg {
251254
async fn client_supervisor(
252255
mut cell: ActoCell<ClientMsg, impl ActoRuntime, Result<(), client_protocol::ClientError>>,
253256
store: Arc<Mutex<dyn ChainStore<Header>>>,
257+
max_peers: usize,
254258
) {
255259
let mut clients = HashMap::new();
256260
while let Some(msg) = cell.recv().await.has_senders() {
257261
match msg {
258262
ActoMsgSuper::Message(ClientMsg::Peer(peer, tip)) => {
263+
if clients.len() >= max_peers {
264+
tracing::warn!(target: EVENT_TARGET, "max peers reached, dropping peer");
265+
continue;
266+
}
267+
259268
let addr = peer
260269
.accepted_address()
261270
.map(|a| a.to_string())

crates/amaru/src/stages/consensus/chain_forward/tests.rs

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,15 @@ impl ChainStore<Header> for TestStore {
3737
}
3838

3939
fn get_nonces(&self, _header: &Hash<32>) -> Option<Nonces> {
40-
todo!()
40+
unimplemented!()
4141
}
4242

4343
fn put_nonces(&mut self, _header: &Hash<32>, _nonces: Nonces) -> Result<(), StoreError> {
44-
todo!()
44+
unimplemented!()
4545
}
4646

4747
fn era_history(&self) -> &slot_arithmetic::EraHistory {
48-
todo!()
48+
unimplemented!()
4949
}
5050
}
5151

@@ -177,8 +177,10 @@ fn find_headers_between_tip_and_lost() {
177177
let tip = point(TIP_41_SLOT, TIP_41);
178178
let points = [point(LOST_41_SLOT, LOST_41)];
179179

180-
let result = find_headers_between(&store, &tip, &points);
181-
assert!(result.is_none(), "{result:?}");
180+
let result = find_headers_between(&store, &tip, &points).unwrap();
181+
assert_eq!(result.0.len() as u64, TIP_41_HEIGHT);
182+
assert_eq!(result.1 .0, Point::Origin);
183+
assert_eq!(result.1 .1, 0);
182184
}
183185

184186
#[test]
@@ -212,7 +214,7 @@ fn test_chain_sync() {
212214

213215
// step 1: prepare the stage
214216
let (block_tx, block_rx) = mpsc::channel(1);
215-
let mut stage = ForwardStage::new(Some(downstream), store, 42, "127.0.0.1:0");
217+
let mut stage = ForwardStage::new(Some(downstream), store, 42, "127.0.0.1:0", 1);
216218
stage.upstream.connect(ChannelRecvAdapter::Mpsc(block_rx));
217219
let tether = spawn_stage(stage, Default::default());
218220

crates/amaru/src/stages/mod.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ pub struct Config {
4949
pub network: NetworkName,
5050
pub network_magic: u32,
5151
pub listen_address: String,
52+
pub max_downstream_peers: usize,
5253
}
5354

5455
/// A session with a peer, including the peer itself and a client to communicate with it.
@@ -102,6 +103,7 @@ pub fn bootstrap(
102103
chain_ref.clone(),
103104
config.network_magic as u64,
104105
&config.listen_address,
106+
config.max_downstream_peers,
105107
);
106108

107109
let (to_block_fetch, from_consensus_stage) = gasket::messaging::tokio::mpsc_channel(50);
File renamed without changes.

0 commit comments

Comments
 (0)