Skip to content

Commit 1b9db8a

Browse files
Make it possible for clients to declared server-named queues with custom prefixes
By specifying the prefix in the `name_prefix` optional queue argument. The default remains `amq.gen`. The prefix is meant to allow the user to use app-specific prefixes to eariest tell which app owns a given server-named queue. So the prefixes are meant to be relatively short, limited to 64 bytes (not UTF codepoints) by design since queue names have the 255 byte limit in AMQP 0-9-1.
1 parent ac4538c commit 1b9db8a

6 files changed

Lines changed: 423 additions & 6 deletions

deps/rabbit/Makefile

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -249,16 +249,16 @@ define ct_master.erl
249249
endef
250250

251251
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
252-
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
252+
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet server_named_queue_prefix_prop signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
253253
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control
254-
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
254+
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit server_named_queue_prefix
255255

256256
PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
257257
PARALLEL_CT_SET_2_B = crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2
258258
PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_log_management unit_operator_policy prevent_startup_if_node_was_reset
259259
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator
260260

261-
PARALLEL_CT_SET_3_A = unit_definitions definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue unit_rabbit_khepri_migrations
261+
PARALLEL_CT_SET_3_A = unit_definitions definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue unit_rabbit_khepri_migrations unit_server_named_queue
262262
PARALLEL_CT_SET_3_B = list_consumers_sanity_check list_queues_online_and_offline logging lqueue rabbit_fifo_q rabbit_fifo_pq
263263
PARALLEL_CT_SET_3_C = cli_forget_cluster_node mc_unit message_size_limit
264264
PARALLEL_CT_SET_3_D = metrics mirrored_supervisor proxy_protocol runtime_parameters unit_rabbit_vm unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@
5959
-export([collect_info_all/2]).
6060

6161
-export([is_policy_applicable/2, declare_args/0, consume_args/0]).
62-
-export([is_server_named_allowed/1]).
62+
-export([is_server_named_allowed/1, server_named_queue_prefix/1]).
6363

6464
-export([check_max_age/1]).
6565
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
@@ -362,6 +362,19 @@ is_server_named_allowed(Args) ->
362362
Type = get_queue_type(Args),
363363
rabbit_queue_type:is_server_named_allowed(Type).
364364

365+
-spec server_named_queue_prefix(rabbit_framing:amqp_table()) ->
366+
{ok, binary()} | {error, {prefix_too_long, binary()}}.
367+
server_named_queue_prefix(Args) ->
368+
case rabbit_misc:table_lookup(Args, <<"name_prefix">>) of
369+
{longstr, Prefix} when Prefix =/= <<>> ->
370+
case byte_size(Prefix) =< 64 of
371+
true -> {ok, Prefix};
372+
false -> {error, {prefix_too_long, Prefix}}
373+
end;
374+
_ ->
375+
{ok, <<"amq.gen">>}
376+
end.
377+
365378
-spec lookup(QueueName) -> Ret when
366379
QueueName :: name(),
367380
Ret :: rabbit_types:ok(amqqueue:amqqueue())

deps/rabbit/src/rabbit_channel.erl

Lines changed: 13 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2442,19 +2442,30 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
24422442
true -> ConnPid;
24432443
false -> none
24442444
end,
2445+
Args0Stripped = lists:keydelete(<<"name_prefix">>, 1, Args0),
24452446
Args = rabbit_amqqueue:augment_declare_args(VHostPath,
24462447
DurableDeclare,
24472448
ExclusiveDeclare,
24482449
AutoDelete,
2449-
Args0),
2450+
Args0Stripped),
24502451
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
24512452
Durable = DurableDeclare andalso not ExclusiveDeclare,
24522453
Kind = queue,
24532454
ActualNameBin = case StrippedQueueNameBin of
24542455
<<>> ->
24552456
case rabbit_amqqueue:is_server_named_allowed(Args) of
24562457
true ->
2457-
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen");
2458+
NamePrefix =
2459+
case rabbit_amqqueue:server_named_queue_prefix(Args0) of
2460+
{ok, P} ->
2461+
strip_cr_lf(P);
2462+
{error, {prefix_too_long, P}} ->
2463+
rabbit_misc:protocol_error(
2464+
precondition_failed,
2465+
"Queue name prefix '~ts' exceeds the 64-byte limit",
2466+
[strip_cr_lf(P)])
2467+
end,
2468+
rabbit_guid:binary(rabbit_guid:gen_secure(), NamePrefix);
24582469
false ->
24592470
rabbit_misc:protocol_error(
24602471
precondition_failed,
Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,153 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(server_named_queue_prefix_SUITE).
9+
10+
-include_lib("amqp_client/include/amqp_client.hrl").
11+
-include_lib("eunit/include/eunit.hrl").
12+
13+
-compile([export_all, nowarn_export_all]).
14+
15+
all() ->
16+
[
17+
{group, tests}
18+
].
19+
20+
groups() ->
21+
[
22+
{tests, [], [
23+
default_prefix,
24+
custom_prefix,
25+
custom_prefix_with_other_args,
26+
prefix_at_max_length,
27+
prefix_too_long_rejected,
28+
name_prefix_not_stored_in_queue_args,
29+
name_prefix_ignored_for_named_queue
30+
]}
31+
].
32+
33+
init_per_suite(Config) ->
34+
rabbit_ct_helpers:log_environment(),
35+
rabbit_ct_helpers:run_setup_steps(Config).
36+
37+
end_per_suite(Config) ->
38+
rabbit_ct_helpers:run_teardown_steps(Config).
39+
40+
init_per_group(_, Config) ->
41+
Config.
42+
43+
end_per_group(_, Config) ->
44+
Config.
45+
46+
init_per_testcase(Testcase, Config) ->
47+
rabbit_ct_helpers:testcase_started(Config, Testcase),
48+
Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, Testcase}]),
49+
rabbit_ct_helpers:run_steps(Config1,
50+
rabbit_ct_broker_helpers:setup_steps() ++
51+
rabbit_ct_client_helpers:setup_steps()).
52+
53+
end_per_testcase(Testcase, Config) ->
54+
Config1 = rabbit_ct_helpers:run_steps(Config,
55+
rabbit_ct_client_helpers:teardown_steps() ++
56+
rabbit_ct_broker_helpers:teardown_steps()),
57+
rabbit_ct_helpers:testcase_finished(Config1, Testcase).
58+
59+
%% Without name_prefix the queue name starts with "amq.gen-".
60+
default_prefix(Config) ->
61+
Ch = rabbit_ct_client_helpers:open_channel(Config),
62+
#'queue.declare_ok'{queue = QName} =
63+
amqp_channel:call(Ch, #'queue.declare'{queue = <<>>, exclusive = true}),
64+
?assertMatch(<<"amq.gen-", _/binary>>, QName),
65+
rabbit_ct_client_helpers:close_channel(Ch).
66+
67+
%% With name_prefix set, the generated name starts with "<prefix>-".
68+
custom_prefix(Config) ->
69+
Ch = rabbit_ct_client_helpers:open_channel(Config),
70+
#'queue.declare_ok'{queue = QName} =
71+
amqp_channel:call(Ch, #'queue.declare'{
72+
queue = <<>>,
73+
exclusive = true,
74+
arguments = [{<<"name_prefix">>, longstr, <<"myapp">>}]
75+
}),
76+
?assertMatch(<<"myapp-", _/binary>>, QName),
77+
rabbit_ct_client_helpers:close_channel(Ch).
78+
79+
%% name_prefix works alongside other queue arguments.
80+
custom_prefix_with_other_args(Config) ->
81+
Ch = rabbit_ct_client_helpers:open_channel(Config),
82+
#'queue.declare_ok'{queue = QName} =
83+
amqp_channel:call(Ch, #'queue.declare'{
84+
queue = <<>>,
85+
exclusive = true,
86+
arguments = [
87+
{<<"name_prefix">>, longstr, <<"svc">>},
88+
{<<"x-message-ttl">>, signedint, 60000}
89+
]
90+
}),
91+
?assertMatch(<<"svc-", _/binary>>, QName),
92+
rabbit_ct_client_helpers:close_channel(Ch).
93+
94+
%% A prefix of exactly 64 bytes is accepted.
95+
prefix_at_max_length(Config) ->
96+
Ch = rabbit_ct_client_helpers:open_channel(Config),
97+
Prefix = binary:copy(<<"x">>, 64),
98+
#'queue.declare_ok'{queue = QName} =
99+
amqp_channel:call(Ch, #'queue.declare'{
100+
queue = <<>>,
101+
exclusive = true,
102+
arguments = [{<<"name_prefix">>, longstr, Prefix}]
103+
}),
104+
ExpectedHead = <<Prefix/binary, "-">>,
105+
?assert(binary:part(QName, 0, byte_size(ExpectedHead)) =:= ExpectedHead),
106+
rabbit_ct_client_helpers:close_channel(Ch).
107+
108+
%% A prefix longer than 64 bytes causes the broker to close the channel.
109+
prefix_too_long_rejected(Config) ->
110+
Ch = rabbit_ct_client_helpers:open_channel(Config),
111+
TooLong = binary:copy(<<"x">>, 65),
112+
?assertExit(
113+
{{shutdown, {server_initiated_close, 406, _}}, _},
114+
amqp_channel:call(Ch, #'queue.declare'{
115+
queue = <<>>,
116+
exclusive = true,
117+
arguments = [{<<"name_prefix">>, longstr, TooLong}]
118+
})).
119+
120+
%% name_prefix is not stored in the declared queue's arguments.
121+
name_prefix_not_stored_in_queue_args(Config) ->
122+
Ch = rabbit_ct_client_helpers:open_channel(Config),
123+
#'queue.declare_ok'{queue = QName} =
124+
amqp_channel:call(Ch, #'queue.declare'{
125+
queue = <<>>,
126+
exclusive = true,
127+
arguments = [{<<"name_prefix">>, longstr, <<"myapp">>}]
128+
}),
129+
QRes = rabbit_misc:r(<<"/">>, queue, QName),
130+
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QRes]),
131+
StoredArgs = amqqueue:get_arguments(Q),
132+
?assertEqual(undefined, rabbit_misc:table_lookup(StoredArgs, <<"name_prefix">>)),
133+
rabbit_ct_client_helpers:close_channel(Ch).
134+
135+
%% `name_prefix` is ignored for explicitly named queues.
136+
%% Redeclaring with `name_prefix` also passes the equivalence check.
137+
name_prefix_ignored_for_named_queue(Config) ->
138+
Ch = rabbit_ct_client_helpers:open_channel(Config),
139+
QName = <<"my-explicit-queue">>,
140+
#'queue.declare_ok'{queue = QName} =
141+
amqp_channel:call(Ch, #'queue.declare'{
142+
queue = QName,
143+
exclusive = true,
144+
arguments = [{<<"name_prefix">>, longstr, <<"ignored">>}]
145+
}),
146+
%% Redeclare the same queue with a `name_prefix`.
147+
#'queue.declare_ok'{queue = QName} =
148+
amqp_channel:call(Ch, #'queue.declare'{
149+
queue = QName,
150+
exclusive = true,
151+
arguments = [{<<"name_prefix">>, longstr, <<"ignored">>}]
152+
}),
153+
rabbit_ct_client_helpers:close_channel(Ch).
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(server_named_queue_prefix_prop_SUITE).
9+
10+
-compile(export_all).
11+
12+
-include_lib("common_test/include/ct.hrl").
13+
-include_lib("proper/include/proper.hrl").
14+
15+
-define(NUM_TESTS, 1000).
16+
-define(MAX_PREFIX_LEN, 64).
17+
18+
all() ->
19+
[
20+
prop_valid_prefix_returned_verbatim,
21+
prop_too_long_prefix_yields_error,
22+
prop_no_valid_prefix_yields_default,
23+
prop_generated_name_starts_with_prefix,
24+
prop_generated_name_within_amqp_limit,
25+
prop_different_guids_yield_different_names,
26+
prop_different_prefixes_yield_different_names
27+
].
28+
29+
init_per_suite(Config) ->
30+
rabbit_ct_helpers:log_environment(),
31+
rabbit_ct_helpers:run_setup_steps(Config).
32+
33+
end_per_suite(Config) ->
34+
rabbit_ct_helpers:run_teardown_steps(Config).
35+
36+
init_per_testcase(Testcase, Config) ->
37+
rabbit_ct_helpers:testcase_started(Config, Testcase).
38+
39+
%% A non-empty prefix of at most 64 bytes is returned as {ok, Prefix}.
40+
prop_valid_prefix_returned_verbatim(Config) ->
41+
rabbit_ct_proper_helpers:run_proper(
42+
fun() -> prop_valid_prefix(Config) end, [], ?NUM_TESTS).
43+
44+
prop_valid_prefix(_Config) ->
45+
?FORALL(Prefix, valid_prefix_gen(),
46+
begin
47+
Args = [{<<"name_prefix">>, longstr, Prefix}],
48+
rabbit_amqqueue:server_named_queue_prefix(Args) =:= {ok, Prefix}
49+
end).
50+
51+
%% A prefix longer than 64 bytes yields {error, {prefix_too_long, Prefix}}.
52+
prop_too_long_prefix_yields_error(Config) ->
53+
rabbit_ct_proper_helpers:run_proper(
54+
fun() -> prop_too_long_prefix(Config) end, [], ?NUM_TESTS).
55+
56+
prop_too_long_prefix(_Config) ->
57+
?FORALL(Prefix, too_long_prefix_gen(),
58+
begin
59+
Args = [{<<"name_prefix">>, longstr, Prefix}],
60+
rabbit_amqqueue:server_named_queue_prefix(Args) =:=
61+
{error, {prefix_too_long, Prefix}}
62+
end).
63+
64+
%% Absent, empty, or wrong-type `name_prefix` yields {ok, <<"amq.gen">>}.
65+
prop_no_valid_prefix_yields_default(Config) ->
66+
rabbit_ct_proper_helpers:run_proper(
67+
fun() -> prop_default_fallback(Config) end, [], ?NUM_TESTS).
68+
69+
prop_default_fallback(_Config) ->
70+
?FORALL(Args, invalid_prefix_args_gen(),
71+
rabbit_amqqueue:server_named_queue_prefix(Args) =:= {ok, <<"amq.gen">>}).
72+
73+
%% rabbit_guid:binary/2 produces "prefix-<base64url>".
74+
prop_generated_name_starts_with_prefix(Config) ->
75+
rabbit_ct_proper_helpers:run_proper(
76+
fun() -> prop_name_starts_with_prefix(Config) end, [], ?NUM_TESTS).
77+
78+
prop_name_starts_with_prefix(_Config) ->
79+
?FORALL({Prefix, Guid}, {valid_prefix_gen(), binary(16)},
80+
begin
81+
Generated = rabbit_guid:binary(Guid, Prefix),
82+
ExpectedHead = <<Prefix/binary, "-">>,
83+
HeadLen = byte_size(ExpectedHead),
84+
byte_size(Generated) >= HeadLen andalso
85+
binary:part(Generated, 0, HeadLen) =:= ExpectedHead
86+
end).
87+
88+
%% The generated name fits within the 255-byte AMQP shortstr limit.
89+
prop_generated_name_within_amqp_limit(Config) ->
90+
rabbit_ct_proper_helpers:run_proper(
91+
fun() -> prop_name_within_limit(Config) end, [], ?NUM_TESTS).
92+
93+
prop_name_within_limit(_Config) ->
94+
?FORALL({Prefix, Guid}, {valid_prefix_gen(), binary(16)},
95+
byte_size(rabbit_guid:binary(Guid, Prefix)) =< 255).
96+
97+
%% Two distinct GUIDs with the same prefix produce different names.
98+
prop_different_guids_yield_different_names(Config) ->
99+
rabbit_ct_proper_helpers:run_proper(
100+
fun() -> prop_unique_names(Config) end, [], ?NUM_TESTS).
101+
102+
prop_unique_names(_Config) ->
103+
?FORALL({Prefix, G1, G2}, {valid_prefix_gen(), binary(16), binary(16)},
104+
G1 =:= G2 orelse
105+
rabbit_guid:binary(G1, Prefix) =/= rabbit_guid:binary(G2, Prefix)).
106+
107+
%% Two distinct prefixes with the same GUID produce different names.
108+
prop_different_prefixes_yield_different_names(Config) ->
109+
rabbit_ct_proper_helpers:run_proper(
110+
fun() -> prop_unique_prefixes(Config) end, [], ?NUM_TESTS).
111+
112+
prop_unique_prefixes(_Config) ->
113+
?FORALL({P1, P2, Guid}, {valid_prefix_gen(), valid_prefix_gen(), binary(16)},
114+
P1 =:= P2 orelse
115+
rabbit_guid:binary(Guid, P1) =/= rabbit_guid:binary(Guid, P2)).
116+
117+
%% -------------------------------------------------------------------
118+
%% Generators
119+
%% -------------------------------------------------------------------
120+
121+
valid_prefix_gen() ->
122+
?SUCHTHAT(P, non_empty(binary()), byte_size(P) =< ?MAX_PREFIX_LEN).
123+
124+
too_long_prefix_gen() ->
125+
?LET(Extra, binary(),
126+
<<(binary:copy(<<"x">>, ?MAX_PREFIX_LEN + 1))/binary, Extra/binary>>).
127+
128+
invalid_prefix_args_gen() ->
129+
oneof([
130+
%% no name_prefix at all
131+
list(unrelated_arg_gen()),
132+
%% empty binary
133+
?LET(Extra, list(unrelated_arg_gen()),
134+
[{<<"name_prefix">>, longstr, <<>>} | Extra]),
135+
%% non-longstr type
136+
?LET({Extra, Type},
137+
{list(unrelated_arg_gen()),
138+
oneof([signedint, bool, double, timestamp, byte, short, long,
139+
float, binary, void])},
140+
[{<<"name_prefix">>, Type, 0} | Extra])
141+
]).
142+
143+
unrelated_arg_gen() ->
144+
?LET(Key, oneof([<<"x-queue-type">>, <<"x-max-length">>, <<"x-expires">>]),
145+
{Key, longstr, <<"classic">>}).

0 commit comments

Comments
 (0)