Skip to content

Commit 99e8bf2

Browse files
committed
fix(remote): address wire compression review feedback
1 parent c46985f commit 99e8bf2

12 files changed

Lines changed: 200 additions & 108 deletions

File tree

modules/remote-adaptor-std/src/transport/tcp/base.rs

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,19 @@ impl TcpRemoteTransport {
275275
self.default_address.as_ref().map(ToString::to_string).unwrap_or_default()
276276
}
277277

278+
fn local_authority_from_addresses(addresses: &[Address], bound_port: u16) -> String {
279+
addresses
280+
.first()
281+
.map(|address| {
282+
if address.port() == 0 {
283+
Address::new(address.system(), address.host(), bound_port).to_string()
284+
} else {
285+
address.to_string()
286+
}
287+
})
288+
.unwrap_or_default()
289+
}
290+
278291
fn peer_key_for_address(address: &Address) -> String {
279292
alloc::format!("{}:{}", address.host(), address.port())
280293
}
@@ -405,11 +418,12 @@ impl RemoteTransport for TcpRemoteTransport {
405418
if self.running {
406419
return Err(TransportError::AlreadyRunning);
407420
}
421+
let configured_local_addresses = self.configured_local_addresses.clone();
408422
let bound_addr = self.server.start_with_remote_events(
409423
self.inbound_txs.clone(),
410424
self.remote_event_tx.clone(),
411425
self.monotonic_epoch,
412-
self.local_authority(),
426+
move |bound_port| Self::local_authority_from_addresses(&configured_local_addresses, bound_port),
413427
)?;
414428
self.apply_bound_port_to_advertised_addresses(bound_addr.port());
415429
self.running = true;

modules/remote-adaptor-std/src/transport/tcp/base_test.rs

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,3 +47,21 @@ fn outbound_lane_key_ignores_correlation_id_for_same_actor_pair() {
4747

4848
assert_eq!(outbound_lane_key_for_envelope(&first), outbound_lane_key_for_envelope(&second));
4949
}
50+
51+
#[test]
52+
fn local_authority_from_addresses_rewrites_ephemeral_port() {
53+
let addresses = vec![Address::new("local-sys", "127.0.0.1", 0)];
54+
55+
let authority = TcpRemoteTransport::local_authority_from_addresses(&addresses, 2551);
56+
57+
assert_eq!(authority, "local-sys@127.0.0.1:2551");
58+
}
59+
60+
#[test]
61+
fn local_authority_from_addresses_preserves_fixed_port() {
62+
let addresses = vec![Address::new("local-sys", "127.0.0.1", 2552)];
63+
64+
let authority = TcpRemoteTransport::local_authority_from_addresses(&addresses, 2551);
65+
66+
assert_eq!(authority, "local-sys@127.0.0.1:2552");
67+
}

modules/remote-adaptor-std/src/transport/tcp/compression.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,31 @@ impl TcpCompressionTables {
9898
EnvelopePdu::new_with_metadata(
9999
recipient_path,
100100
sender_path,
101-
(pdu.correlation_hi(), pdu.correlation_lo()),
101+
pdu.correlation_hi(),
102+
pdu.correlation_lo(),
102103
pdu.priority(),
103-
pdu.serializer_id(),
104+
EnvelopePayload::new(pdu.serializer_id(), None, pdu.payload().clone()),
104105
manifest,
105-
pdu.payload().clone(),
106106
)
107107
.with_redelivery_sequence(pdu.redelivery_sequence())
108108
}
109109

110110
fn resolve_inbound_envelope(&self, pdu: EnvelopePdu) -> Result<EnvelopePdu, WireError> {
111-
let recipient_path = resolve_text(&self.inbound_actor_refs, pdu.recipient_path_metadata())?;
111+
let recipient_path =
112+
CompressedText::literal(resolve_text(&self.inbound_actor_refs, pdu.recipient_path_metadata())?);
112113
let sender_path =
113114
pdu.sender_path_metadata().map(|metadata| resolve_text(&self.inbound_actor_refs, metadata)).transpose()?;
114115
let manifest =
115116
pdu.manifest_metadata().map(|metadata| resolve_text(&self.inbound_manifests, metadata)).transpose()?;
116117
Ok(
117-
EnvelopePdu::new(
118+
EnvelopePdu::new_with_metadata(
118119
recipient_path,
119-
sender_path,
120+
sender_path.map(CompressedText::literal),
120121
pdu.correlation_hi(),
121122
pdu.correlation_lo(),
122123
pdu.priority(),
123-
EnvelopePayload::new(pdu.serializer_id(), manifest, pdu.payload().clone()),
124+
EnvelopePayload::new(pdu.serializer_id(), None, pdu.payload().clone()),
125+
manifest.map(CompressedText::literal),
124126
)
125127
.with_redelivery_sequence(pdu.redelivery_sequence()),
126128
)

modules/remote-adaptor-std/src/transport/tcp/compression_test.rs

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,16 @@ fn envelope_frame(
2626
manifest: Option<CompressedText>,
2727
) -> WireFrame {
2828
WireFrame::Envelope(
29-
EnvelopePdu::new_with_metadata(recipient_path, sender_path, (1, 2), 1, 7, manifest, Bytes::from_static(b"hello"))
30-
.with_redelivery_sequence(None),
29+
EnvelopePdu::new_with_metadata(
30+
recipient_path,
31+
sender_path,
32+
1,
33+
2,
34+
1,
35+
EnvelopePayload::new(7, None, Bytes::from_static(b"hello")),
36+
manifest,
37+
)
38+
.with_redelivery_sequence(None),
3139
)
3240
}
3341

modules/remote-adaptor-std/src/transport/tcp/server.rs

Lines changed: 40 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,7 @@ use std::{
88
time::Instant,
99
};
1010

11-
use fraktor_remote_core_rs::{
12-
config::RemoteCompressionConfig, extension::RemoteEvent, transport::TransportError, wire::CompressionTableKind,
13-
};
11+
use fraktor_remote_core_rs::{config::RemoteCompressionConfig, extension::RemoteEvent, transport::TransportError};
1412
use futures::{SinkExt as _, StreamExt as _};
1513
use tokio::{
1614
net::{TcpListener, TcpStream},
@@ -21,11 +19,9 @@ use tokio::{
2119
use tokio_util::codec::Framed;
2220

2321
use super::{
22+
WireFrame,
2423
client::inbound_lane_index,
25-
compression::{
26-
InboundCompressionAction, TcpCompressionTables, compression_advertisement_interval,
27-
next_compression_advertisement_tick,
28-
},
24+
compression::{InboundCompressionAction, TcpCompressionTables},
2925
connection_loss_reporter::ConnectionLossReporter,
3026
frame_codec::WireFrameCodec,
3127
inbound_frame_event::InboundFrameEvent,
@@ -80,19 +76,22 @@ impl TcpServer {
8076
}
8177
}
8278

83-
pub(crate) fn start_with_remote_events(
79+
pub(crate) fn start_with_remote_events<F>(
8480
&mut self,
8581
inbound_txs: Vec<UnboundedSender<InboundFrameEvent>>,
8682
remote_event_tx: Option<Sender<RemoteEvent>>,
8783
monotonic_epoch: Instant,
88-
local_authority: String,
89-
) -> Result<SocketAddr, TransportError> {
84+
local_authority_for_bound_port: F,
85+
) -> Result<SocketAddr, TransportError>
86+
where
87+
F: FnOnce(u16) -> String, {
9088
if self.accept_task.is_some() {
9189
return Err(TransportError::AlreadyRunning);
9290
}
9391
let handle = Handle::try_current().map_err(|_| TransportError::NotAvailable)?;
9492
let listener = StdTcpListener::bind(&self.bind_addr).map_err(|_| TransportError::SendFailed)?;
9593
let bound_addr = listener.local_addr().map_err(|_| TransportError::SendFailed)?;
94+
let local_authority = local_authority_for_bound_port(bound_addr.port());
9695
listener.set_nonblocking(true).map_err(|_| TransportError::SendFailed)?;
9796
let listener = TcpListener::from_std(listener).map_err(|_| TransportError::SendFailed)?;
9897
let frame_codec = self.frame_codec;
@@ -164,69 +163,43 @@ async fn read_loop(
164163
let mut framed = Framed::new(stream, frame_codec);
165164
let mut authority = None;
166165
let mut compression_tables = TcpCompressionTables::new(compression_config);
167-
let mut actor_ref_advertisement_interval = compression_advertisement_interval(
168-
compression_config.actor_ref_max(),
169-
compression_config.actor_ref_advertisement_interval(),
170-
);
171-
let mut manifest_advertisement_interval = compression_advertisement_interval(
172-
compression_config.manifest_max(),
173-
compression_config.manifest_advertisement_interval(),
174-
);
175166
let exit_cause = loop {
176-
tokio::select! {
177-
next = framed.next() => match next {
178-
| Some(Ok(decoded)) => {
179-
let decoded = match compression_tables.handle_inbound_frame(decoded, &local_authority) {
180-
| Ok(InboundCompressionAction::Forward(frame)) => frame,
181-
| Ok(InboundCompressionAction::Reply(pdu)) => {
182-
if let Err(err) = framed.send(crate::transport::tcp::WireFrame::Control(pdu)).await {
183-
tracing::warn!(?err, peer = %peer, "tcp server compression ack write error");
184-
break Some(TransportError::SendFailed);
185-
}
186-
continue;
187-
},
188-
| Ok(InboundCompressionAction::Consumed) => continue,
189-
| Err(err) => {
190-
tracing::warn!(?err, peer = %peer, "tcp server compression frame error");
167+
match framed.next().await {
168+
| Some(Ok(decoded)) => {
169+
let decoded = match compression_tables.handle_inbound_frame(decoded, &local_authority) {
170+
| Ok(InboundCompressionAction::Forward(frame)) => frame,
171+
| Ok(InboundCompressionAction::Reply(pdu)) => {
172+
if let Err(err) = framed.send(WireFrame::Control(pdu)).await {
173+
tracing::warn!(?err, peer = %peer, "tcp server compression ack write error");
191174
break Some(TransportError::SendFailed);
192-
},
193-
};
194-
if let Some(frame_authority) = authority_for_frame(&decoded) {
195-
authority = Some(frame_authority);
196-
}
197-
let lane_index = inbound_lane_index(&peer, authority.as_ref(), &decoded, inbound_txs.len());
198-
let inbound_tx =
199-
inbound_txs.get(lane_index).expect("inbound_lane_index returns an index within the inbound_txs lane count");
200-
if inbound_tx
201-
.send(InboundFrameEvent { peer: peer.clone(), authority: authority.clone(), frame: decoded })
202-
.is_err()
203-
{
204-
// Receiver dropped — the transport is shutting down.
205-
break None;
206-
}
207-
},
208-
| Some(Err(err)) => {
209-
tracing::warn!(?err, peer = %peer, "tcp frame decode error");
210-
break Some(TransportError::SendFailed);
211-
},
212-
| None => break Some(TransportError::ConnectionClosed),
213-
},
214-
_ = next_compression_advertisement_tick(&mut actor_ref_advertisement_interval) => {
215-
if let Some(frame) = compression_tables.create_advertisement(CompressionTableKind::ActorRef, &local_authority)
216-
&& let Err(err) = framed.send(frame).await
217-
{
218-
tracing::warn!(?err, peer = %peer, "tcp server actor-ref compression advertisement write error");
219-
break Some(TransportError::SendFailed);
175+
}
176+
continue;
177+
},
178+
| Ok(InboundCompressionAction::Consumed) => continue,
179+
| Err(err) => {
180+
tracing::warn!(?err, peer = %peer, "tcp server compression frame error");
181+
break Some(TransportError::SendFailed);
182+
},
183+
};
184+
if let Some(frame_authority) = authority_for_frame(&decoded) {
185+
authority = Some(frame_authority);
220186
}
221-
},
222-
_ = next_compression_advertisement_tick(&mut manifest_advertisement_interval) => {
223-
if let Some(frame) = compression_tables.create_advertisement(CompressionTableKind::Manifest, &local_authority)
224-
&& let Err(err) = framed.send(frame).await
187+
let lane_index = inbound_lane_index(&peer, authority.as_ref(), &decoded, inbound_txs.len());
188+
let inbound_tx =
189+
inbound_txs.get(lane_index).expect("inbound_lane_index returns an index within the inbound_txs lane count");
190+
if inbound_tx
191+
.send(InboundFrameEvent { peer: peer.clone(), authority: authority.clone(), frame: decoded })
192+
.is_err()
225193
{
226-
tracing::warn!(?err, peer = %peer, "tcp server manifest compression advertisement write error");
227-
break Some(TransportError::SendFailed);
194+
// Receiver dropped — the transport is shutting down.
195+
break None;
228196
}
229197
},
198+
| Some(Err(err)) => {
199+
tracing::warn!(?err, peer = %peer, "tcp frame decode error");
200+
break Some(TransportError::SendFailed);
201+
},
202+
| None => break Some(TransportError::ConnectionClosed),
230203
}
231204
};
232205
if let (Some(cause), Some(authority), Some(sender)) = (exit_cause, authority, remote_event_tx) {

modules/remote-adaptor-std/src/transport/tcp_test.rs

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,9 @@ fn make_test_server() -> TcpServer {
161161

162162
fn start_test_server(server: &mut TcpServer, inbound_tx: UnboundedSender<InboundFrameEvent>) -> SocketAddr {
163163
server
164-
.start_with_remote_events(vec![inbound_tx], None, Instant::now(), String::from("local@127.0.0.1:0"))
164+
.start_with_remote_events(vec![inbound_tx], None, Instant::now(), |bound_port| {
165+
format!("local@127.0.0.1:{bound_port}")
166+
})
165167
.expect("server should bind to a system-assigned port")
166168
}
167169

@@ -859,8 +861,11 @@ async fn tcp_server_consumes_compression_advertisement_and_replies_with_ack() {
859861
ack,
860862
WireFrame::Control(ControlPdu::CompressionAck { table_kind: CompressionTableKind::ActorRef, generation: 7, .. })
861863
));
862-
let inbound = tokio::time::timeout(Duration::from_millis(100), server_inbound_rx.recv()).await;
863-
assert!(inbound.is_err(), "compression advertisement must not reach the inbound event loop");
864+
tokio::task::yield_now().await;
865+
assert!(
866+
matches!(server_inbound_rx.try_recv(), Err(mpsc::error::TryRecvError::Empty)),
867+
"compression advertisement must not reach the inbound event loop"
868+
);
864869
server.shutdown();
865870
}
866871

modules/remote-core/src/wire/compression_table.rs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,12 @@ impl CompressionTable {
106106
entry.hit_count = entry.hit_count.saturating_add(1);
107107
return;
108108
}
109+
let Some(max) = self.max else {
110+
return;
111+
};
112+
if self.entries.len() >= max.get() {
113+
return;
114+
}
109115
let entry_id = self.next_entry_id;
110116
self.next_entry_id = self.next_entry_id.saturating_add(1);
111117
let mut entry = CompressionTableEntryState::new(entry_id, literal.to_string());

modules/remote-core/src/wire/compression_table_test.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,19 @@ fn observe_updates_hit_count_without_duplicate_entry_ids() {
2020
assert_eq!(table.entry_id("/user/a"), Some(1));
2121
}
2222

23+
#[test]
24+
fn observe_stops_adding_entries_at_configured_max() {
25+
let mut table = CompressionTable::new(max(1));
26+
27+
table.observe("/user/a");
28+
table.observe("/user/b");
29+
table.observe("/user/a");
30+
31+
assert_eq!(table.len(), 1);
32+
assert_eq!(table.entry_id("/user/b"), None);
33+
assert_eq!(table.hit_count("/user/a"), Some(2));
34+
}
35+
2336
#[test]
2437
fn max_accessor_returns_configured_bound() {
2538
let table = CompressionTable::new(max(2));
@@ -48,7 +61,7 @@ fn disabled_table_does_not_track_hits_or_advertise() {
4861

4962
#[test]
5063
fn advertisement_is_bounded_and_deterministic() {
51-
let mut table = CompressionTable::new(max(2));
64+
let mut table = CompressionTable::new(max(3));
5265
table.observe("/user/a");
5366
table.observe("/user/b");
5467
table.observe("/user/b");
@@ -58,9 +71,10 @@ fn advertisement_is_bounded_and_deterministic() {
5871
let advertisement = table.create_advertisement(CompressionTableKind::ActorRef).unwrap();
5972

6073
assert_eq!(advertisement.generation(), 1);
61-
assert_eq!(advertisement.entries().len(), 2);
74+
assert_eq!(advertisement.entries().len(), 3);
6275
assert_eq!(advertisement.entries()[0].literal(), "/user/b");
6376
assert_eq!(advertisement.entries()[1].literal(), "/user/c");
77+
assert_eq!(advertisement.entries()[2].literal(), "/user/a");
6478
}
6579

6680
#[test]

modules/remote-core/src/wire/control_codec.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ const SUBKIND_FLUSH_REQUEST: u8 = 0x04;
2626
const SUBKIND_FLUSH_ACK: u8 = 0x05;
2727
const SUBKIND_COMPRESSION_ADVERTISEMENT: u8 = 0x06;
2828
const SUBKIND_COMPRESSION_ACK: u8 = 0x07;
29+
const MIN_COMPRESSION_ENTRY_BYTES: usize = 4 + 4;
2930

3031
/// Zero-sized codec for [`ControlPdu`].
3132
#[derive(Clone, Copy, Debug, Default)]
@@ -176,6 +177,9 @@ fn decode_compression_entries(buf: &mut Bytes) -> Result<Vec<CompressionTableEnt
176177
return Err(WireError::Truncated);
177178
}
178179
let entry_count = buf.get_u32() as usize;
180+
if entry_count > buf.remaining() / MIN_COMPRESSION_ENTRY_BYTES {
181+
return Err(WireError::Truncated);
182+
}
179183
let mut entries = Vec::with_capacity(entry_count);
180184
for _ in 0..entry_count {
181185
if buf.remaining() < 4 {

modules/remote-core/src/wire/envelope_codec.rs

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
compressed_text::{
1010
decode_compressed_text, decode_option_compressed_text, encode_compressed_text, encode_option_compressed_text,
1111
},
12+
envelope_payload::EnvelopePayload,
1213
envelope_pdu::EnvelopePdu,
1314
frame_header::KIND_ENVELOPE,
1415
primitives::{begin_frame, decode_bytes, encode_bytes, patch_frame_length, read_frame_header},
@@ -60,16 +61,18 @@ impl Codec<EnvelopePdu> for EnvelopeCodec {
6061
let serializer_id = buf.get_u32();
6162
let manifest = decode_option_compressed_text(buf)?;
6263
let payload = decode_bytes(buf)?;
63-
Ok(EnvelopePdu::new_with_metadata(
64-
recipient_path,
65-
sender_path,
66-
(correlation_hi, correlation_lo),
67-
priority,
68-
serializer_id,
69-
manifest,
70-
payload,
71-
))
72-
.map(|pdu| pdu.with_redelivery_sequence(redelivery_sequence))
64+
Ok(
65+
EnvelopePdu::new_with_metadata(
66+
recipient_path,
67+
sender_path,
68+
correlation_hi,
69+
correlation_lo,
70+
priority,
71+
EnvelopePayload::new(serializer_id, None, payload),
72+
manifest,
73+
)
74+
.with_redelivery_sequence(redelivery_sequence),
75+
)
7376
}
7477
}
7578

0 commit comments

Comments
 (0)