Skip to content

Commit 21bd419

Browse files
committed
fix(remote): align compression table ack semantics
1 parent 9f3fbab commit 21bd419

3 files changed

Lines changed: 63 additions & 10 deletions

File tree

modules/remote-adaptor-std/src/transport/tcp/compression_test.rs

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,6 +137,41 @@ fn inbound_manifest_advertisement_resolves_manifest_metadata() {
137137
));
138138
}
139139

140+
#[test]
141+
fn inbound_advertisement_accepts_peer_table_larger_than_local_max() {
142+
let config = RemoteCompressionConfig::new().with_actor_ref_max(max(1));
143+
let mut tables = TcpCompressionTables::new(config);
144+
let frame = WireFrame::Control(ControlPdu::CompressionAdvertisement {
145+
authority: "remote@host:1".to_string(),
146+
table_kind: CompressionTableKind::ActorRef,
147+
generation: 9,
148+
entries: vec![
149+
CompressionTableEntry::new(3, "/user/a".to_string()),
150+
CompressionTableEntry::new(4, "/user/b".to_string()),
151+
],
152+
});
153+
154+
let action = tables.handle_inbound_frame(frame, "local@host:2").unwrap();
155+
156+
assert!(matches!(
157+
action,
158+
InboundCompressionAction::Reply {
159+
pdu: ControlPdu::CompressionAck {
160+
authority,
161+
table_kind: CompressionTableKind::ActorRef,
162+
generation: 9,
163+
},
164+
authority: peer_authority,
165+
} if authority == "local@host:2" && peer_authority.authority() == "remote@host:1"
166+
));
167+
let action =
168+
tables.handle_inbound_frame(envelope_frame(CompressedText::table_ref(4), None, None), "local@host:2").unwrap();
169+
assert!(matches!(
170+
action,
171+
InboundCompressionAction::Forward(WireFrame::Envelope(pdu)) if pdu.recipient_path() == "/user/b"
172+
));
173+
}
174+
140175
#[test]
141176
fn outbound_acknowledged_metadata_uses_table_refs() {
142177
let mut tables = TcpCompressionTables::new(RemoteCompressionConfig::new());

modules/remote-core/src/wire/compression_table.rs

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -159,6 +159,8 @@ impl CompressionTable {
159159
if entry.advertised_generation == Some(generation) {
160160
entry.acknowledged_generation = Some(generation);
161161
applied = true;
162+
} else {
163+
entry.acknowledged_generation = None;
162164
}
163165
}
164166
if applied {
@@ -189,11 +191,6 @@ impl CompressionTable {
189191
if !self.is_enabled() {
190192
return Ok(false);
191193
}
192-
if let Some(max) = self.max
193-
&& entries.len() > max.get()
194-
{
195-
return Err(WireError::InvalidFormat);
196-
}
197194
if has_duplicate_entry_id(entries) {
198195
return Err(WireError::InvalidFormat);
199196
}

modules/remote-core/src/wire/compression_table_test.rs

Lines changed: 26 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use alloc::string::ToString;
22
use core::num::NonZeroUsize;
33

4-
use super::CompressionTable;
4+
use super::{CompressionTable, CompressionTableEntryState};
55
use crate::wire::{CompressionTableEntry, CompressionTableKind, WireError};
66

77
fn max(value: usize) -> Option<NonZeroUsize> {
@@ -117,6 +117,27 @@ fn stale_ack_is_ignored() {
117117
assert_eq!(table.latest_pending_generation(), Some(generation));
118118
}
119119

120+
#[test]
121+
fn ack_clears_entries_not_present_in_acknowledged_generation() {
122+
let mut table = CompressionTable::new(max(2));
123+
table.entries.push(CompressionTableEntryState {
124+
id: 1,
125+
literal: "/user/a".to_string(),
126+
hit_count: 2,
127+
advertised_generation: Some(1),
128+
acknowledged_generation: Some(1),
129+
});
130+
table.entries.push(CompressionTableEntryState::new(2, "/user/b".to_string()));
131+
table.entries[1].hit_count = 1;
132+
table.entries[1].advertised_generation = Some(2);
133+
table.latest_pending_generation = Some(2);
134+
135+
assert!(table.acknowledge(2));
136+
137+
assert_eq!(table.encode("/user/a").as_literal(), Some("/user/a"));
138+
assert_eq!(table.encode("/user/b").as_table_ref(), Some(2));
139+
}
140+
120141
#[test]
121142
fn inbound_advertisement_resolves_entry_ids() {
122143
let mut table = CompressionTable::new(max(4));
@@ -129,15 +150,15 @@ fn inbound_advertisement_resolves_entry_ids() {
129150
}
130151

131152
#[test]
132-
fn inbound_advertisement_rejects_entries_over_configured_max() {
153+
fn inbound_advertisement_accepts_entries_over_local_advertisement_bound() {
133154
let mut table = CompressionTable::new(max(1));
134155
let entries =
135156
[CompressionTableEntry::new(9, "/user/a".to_string()), CompressionTableEntry::new(10, "/user/b".to_string())];
136157

137-
let err = table.apply_advertisement(7, &entries).unwrap_err();
158+
assert_eq!(table.apply_advertisement(7, &entries), Ok(true));
138159

139-
assert_eq!(err, WireError::InvalidFormat);
140-
assert!(table.is_empty());
160+
assert_eq!(table.resolve(9), Some("/user/a"));
161+
assert_eq!(table.resolve(10), Some("/user/b"));
141162
}
142163

143164
#[test]

0 commit comments

Comments
 (0)