Skip to content

declare_stream does not set x-max-age from StreamParams::expiration, uses wrong key names for length args #118

@lukebakken

Description

@lukebakken

Describe the Bug

Client::declare_stream in both the async and blocking APIs silently drops StreamParams::expiration and uses incorrect x-argument key names for max_length_bytes and max_segment_length_bytes. This means streams declared via this client never have retention or size limits applied by the broker, even though the API call succeeds.

Reported downstream in rabbitmq/rabbitmqadmin-ng#145.

Root Cause

In both src/blocking_api/queues_and_streams.rs and src/api/queues_and_streams.rs, the declare_stream function builds an arguments map but:

  1. Never uses params.expiration — the field is set on StreamParams but never inserted into the arguments map as x-max-age
  2. Inserts max_length_bytes — should be x-max-length-bytes (the canonical x-argument recognized by rabbit_amqqueue:declare_args())
  3. Inserts max_segment_length_bytes — should be x-stream-max-segment-size-bytes (the canonical x-argument)

The broker silently accepts unknown argument keys (they pass through check_arguments_key because they are not in the known-but-wrong-type set), so the API call succeeds. However, rabbit_stream_queue:update_stream_conf/2 uses args_policy_lookup/3 which prepends x- to look up these values, so the incorrectly-named keys have no effect. The server-side log confirms retention => [].

Affected Code

// Both blocking_api and api versions are identical:
pub fn declare_stream(&self, vhost: &str, params: &StreamParams<'_>) -> Result<()> {
    let mut m: Map<String, Value> = Map::new();
    if let Some(m2) = params.arguments.clone() { m.extend(m2); };
    if let Some(val) = params.max_length_bytes {
        m.insert("max_length_bytes".to_owned(), json!(val));       // wrong key
    };
    if let Some(val) = params.max_segment_length_bytes {
        m.insert("max_segment_length_bytes".to_owned(), json!(val)); // wrong key
    };
    // params.expiration is never referenced                        // missing entirely
    let q_params = QueueParams::new_stream(params.name, Some(m));
    ...
}

Expected Fix

if !params.expiration.is_empty() {
    m.insert("x-max-age".to_owned(), json!(params.expiration));
}
if let Some(val) = params.max_length_bytes {
    m.insert("x-max-length-bytes".to_owned(), json!(val));
}
if let Some(val) = params.max_segment_length_bytes {
    m.insert("x-stream-max-segment-size-bytes".to_owned(), json!(val));
}

Verification

All claims verified from source:

  • rabbit_amqqueue:declare_args() lists {<<"x-max-age">>, ...}, {<<"x-max-length-bytes">>, ...}, {<<"x-stream-max-segment-size-bytes">>, ...} as the canonical x-argument names
  • rabbit_stream_queue:update_stream_conf/2 reads these via args_policy_lookup(<<"max-age">>, ...) etc., which auto-prepends x-
  • rabbit_misc:to_amqp_table/1 performs no key transformation — keys are passed through as-is
  • Existing tests in blocking_stream_tests.rs and async_stream_tests.rs only assert is_ok() on declare_stream — they never verify the arguments on the created stream

/cc @michaelklishin

Metadata

Metadata

Labels

bugSomething isn't working

Projects

No projects

Relationships

None yet

Development

No branches or pull requests

Issue actions