Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -249,16 +249,16 @@ define ct_master.erl
endef

PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet server_named_queue_prefix_prop signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop term_to_binary_compat_prop topic_permission unicode unit_access_control user_tags_count_limit
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit server_named_queue_prefix

PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
PARALLEL_CT_SET_2_B = crashing_queues deprecated_features direct_exchange_routing_v2 disconnect_detected_during_alarm exchanges unit_gen_server2
PARALLEL_CT_SET_2_C = disk_monitor dynamic_qq unit_disk_monitor unit_log_management unit_operator_policy prevent_startup_if_node_was_reset
PARALLEL_CT_SET_2_D = queue_length_limits queue_parallel quorum_queue_member_reconciliation rabbit_fifo rabbit_fifo_dlx rabbit_stream_coordinator

PARALLEL_CT_SET_3_A = unit_definitions definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue unit_rabbit_khepri_migrations
PARALLEL_CT_SET_3_A = unit_definitions definition_import per_user_connection_channel_limit_partitions per_vhost_connection_limit_partitions policy priority_queue_recovery rabbit_fifo_v0 rabbit_stream_sac_coordinator_v4 rabbit_stream_sac_coordinator unit_credit_flow unit_queue_consumers unit_queue_location unit_quorum_queue unit_rabbit_khepri_migrations unit_server_named_queue
PARALLEL_CT_SET_3_B = list_consumers_sanity_check list_queues_online_and_offline logging lqueue rabbit_fifo_q rabbit_fifo_pq
PARALLEL_CT_SET_3_C = cli_forget_cluster_node mc_unit message_size_limit
PARALLEL_CT_SET_3_D = metrics mirrored_supervisor proxy_protocol runtime_parameters unit_rabbit_vm unit_stats_and_metrics unit_supervisor2 unit_vm_memory_monitor
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/src/rabbit_amqqueue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@
-export([collect_info_all/2]).

-export([is_policy_applicable/2, declare_args/0, consume_args/0]).
-export([is_server_named_allowed/1]).
-export([is_server_named_allowed/1, server_named_queue_prefix/1]).

-export([check_max_age/1]).
-export([get_queue_type/1, get_queue_type/2, get_resource_vhost_name/1, get_resource_name/1]).
Expand Down Expand Up @@ -362,6 +362,19 @@ is_server_named_allowed(Args) ->
Type = get_queue_type(Args),
rabbit_queue_type:is_server_named_allowed(Type).

-spec server_named_queue_prefix(rabbit_framing:amqp_table()) ->
{ok, binary()} | {error, {prefix_too_long, binary()}}.
server_named_queue_prefix(Args) ->
case rabbit_misc:table_lookup(Args, <<"x-name-prefix">>) of
{longstr, Prefix} when Prefix =/= <<>> ->
case byte_size(Prefix) =< 64 of
true -> {ok, Prefix};
false -> {error, {prefix_too_long, Prefix}}
end;
_ ->
{ok, <<"amq.gen">>}
end.

-spec lookup(QueueName) -> Ret when
QueueName :: name(),
Ret :: rabbit_types:ok(amqqueue:amqqueue())
Expand Down
15 changes: 13 additions & 2 deletions deps/rabbit/src/rabbit_channel.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2442,19 +2442,30 @@ handle_method(#'queue.declare'{queue = QueueNameBin,
true -> ConnPid;
false -> none
end,
Args0Stripped = lists:keydelete(<<"x-name-prefix">>, 1, Args0),
Args = rabbit_amqqueue:augment_declare_args(VHostPath,
DurableDeclare,
ExclusiveDeclare,
AutoDelete,
Args0),
Args0Stripped),
StrippedQueueNameBin = strip_cr_lf(QueueNameBin),
Durable = DurableDeclare andalso not ExclusiveDeclare,
Kind = queue,
ActualNameBin = case StrippedQueueNameBin of
<<>> ->
case rabbit_amqqueue:is_server_named_allowed(Args) of
true ->
rabbit_guid:binary(rabbit_guid:gen_secure(), "amq.gen");
NamePrefix =
case rabbit_amqqueue:server_named_queue_prefix(Args0) of
{ok, P} ->
strip_cr_lf(P);
{error, {prefix_too_long, P}} ->
rabbit_misc:protocol_error(
precondition_failed,
"Queue name prefix '~ts' exceeds the 64-byte limit",
[strip_cr_lf(P)])
end,
rabbit_guid:binary(rabbit_guid:gen_secure(), NamePrefix);
false ->
rabbit_misc:protocol_error(
precondition_failed,
Expand Down
153 changes: 153 additions & 0 deletions deps/rabbit/test/server_named_queue_prefix_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(server_named_queue_prefix_SUITE).

-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("eunit/include/eunit.hrl").

-compile([export_all, nowarn_export_all]).

all() ->
[
{group, tests}
].

groups() ->
[
{tests, [], [
default_prefix,
custom_prefix,
custom_prefix_with_other_args,
prefix_at_max_length,
prefix_too_long_rejected,
name_prefix_not_stored_in_queue_args,
name_prefix_ignored_for_named_queue
]}
].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_group(_, Config) ->
Config.

end_per_group(_, Config) ->
Config.

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase),
Config1 = rabbit_ct_helpers:set_config(Config, [{rmq_nodename_suffix, Testcase}]),
rabbit_ct_helpers:run_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).

end_per_testcase(Testcase, Config) ->
Config1 = rabbit_ct_helpers:run_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()),
rabbit_ct_helpers:testcase_finished(Config1, Testcase).

%% Without `x-name-prefix` the queue name starts with "amq.gen-".
default_prefix(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{queue = <<>>, exclusive = true}),
?assertMatch(<<"amq.gen-", _/binary>>, QName),
rabbit_ct_client_helpers:close_channel(Ch).

%% With `x-name-prefix` set, the generated name starts with "<prefix>-".
custom_prefix(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{
queue = <<>>,
exclusive = true,
arguments = [{<<"x-name-prefix">>, longstr, <<"myapp">>}]
}),
?assertMatch(<<"myapp-", _/binary>>, QName),
rabbit_ct_client_helpers:close_channel(Ch).

%% `x-name-prefix` works alongside other queue arguments.
custom_prefix_with_other_args(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{
queue = <<>>,
exclusive = true,
arguments = [
{<<"x-name-prefix">>, longstr, <<"svc">>},
{<<"x-message-ttl">>, signedint, 60000}
]
}),
?assertMatch(<<"svc-", _/binary>>, QName),
rabbit_ct_client_helpers:close_channel(Ch).

%% A prefix of exactly 64 bytes is accepted.
prefix_at_max_length(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
Prefix = binary:copy(<<"x">>, 64),
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{
queue = <<>>,
exclusive = true,
arguments = [{<<"x-name-prefix">>, longstr, Prefix}]
}),
ExpectedHead = <<Prefix/binary, "-">>,
?assert(binary:part(QName, 0, byte_size(ExpectedHead)) =:= ExpectedHead),
rabbit_ct_client_helpers:close_channel(Ch).

%% A prefix longer than 64 bytes causes the broker to close the channel.
prefix_too_long_rejected(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
TooLong = binary:copy(<<"x">>, 65),
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
amqp_channel:call(Ch, #'queue.declare'{
queue = <<>>,
exclusive = true,
arguments = [{<<"x-name-prefix">>, longstr, TooLong}]
})).

%% `x-name-prefix` is not stored in the declared queue's arguments.
name_prefix_not_stored_in_queue_args(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{
queue = <<>>,
exclusive = true,
arguments = [{<<"x-name-prefix">>, longstr, <<"myapp">>}]
}),
QRes = rabbit_misc:r(<<"/">>, queue, QName),
{ok, Q} = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_amqqueue, lookup, [QRes]),
StoredArgs = amqqueue:get_arguments(Q),
?assertEqual(undefined, rabbit_misc:table_lookup(StoredArgs, <<"x-name-prefix">>)),
rabbit_ct_client_helpers:close_channel(Ch).

%% `x-name-prefix` is ignored for explicitly named queues.
%% Redeclaring with `x-name-prefix` also passes the equivalence check.
name_prefix_ignored_for_named_queue(Config) ->
Ch = rabbit_ct_client_helpers:open_channel(Config),
QName = <<"my-explicit-queue">>,
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{
queue = QName,
exclusive = true,
arguments = [{<<"x-name-prefix">>, longstr, <<"ignored">>}]
}),
%% Redeclare the same queue with a `x-name-prefix`.
#'queue.declare_ok'{queue = QName} =
amqp_channel:call(Ch, #'queue.declare'{
queue = QName,
exclusive = true,
arguments = [{<<"x-name-prefix">>, longstr, <<"ignored">>}]
}),
rabbit_ct_client_helpers:close_channel(Ch).
145 changes: 145 additions & 0 deletions deps/rabbit/test/server_named_queue_prefix_prop_SUITE.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%

-module(server_named_queue_prefix_prop_SUITE).

-compile(export_all).

-include_lib("common_test/include/ct.hrl").
-include_lib("proper/include/proper.hrl").

-define(NUM_TESTS, 1000).
-define(MAX_PREFIX_LEN, 64).

all() ->
[
prop_valid_prefix_returned_verbatim,
prop_too_long_prefix_yields_error,
prop_no_valid_prefix_yields_default,
prop_generated_name_starts_with_prefix,
prop_generated_name_within_amqp_limit,
prop_different_guids_yield_different_names,
prop_different_prefixes_yield_different_names
].

init_per_suite(Config) ->
rabbit_ct_helpers:log_environment(),
rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
rabbit_ct_helpers:run_teardown_steps(Config).

init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).

%% A non-empty prefix of at most 64 bytes is returned as {ok, Prefix}.
prop_valid_prefix_returned_verbatim(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_valid_prefix(Config) end, [], ?NUM_TESTS).

prop_valid_prefix(_Config) ->
?FORALL(Prefix, valid_prefix_gen(),
begin
Args = [{<<"x-name-prefix">>, longstr, Prefix}],
rabbit_amqqueue:server_named_queue_prefix(Args) =:= {ok, Prefix}
end).

%% A prefix longer than 64 bytes yields {error, {prefix_too_long, Prefix}}.
prop_too_long_prefix_yields_error(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_too_long_prefix(Config) end, [], ?NUM_TESTS).

prop_too_long_prefix(_Config) ->
?FORALL(Prefix, too_long_prefix_gen(),
begin
Args = [{<<"x-name-prefix">>, longstr, Prefix}],
rabbit_amqqueue:server_named_queue_prefix(Args) =:=
{error, {prefix_too_long, Prefix}}
end).

%% Absent, empty, or wrong-type `x-name-prefix` yields {ok, <<"amq.gen">>}.
prop_no_valid_prefix_yields_default(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_default_fallback(Config) end, [], ?NUM_TESTS).

prop_default_fallback(_Config) ->
?FORALL(Args, invalid_prefix_args_gen(),
rabbit_amqqueue:server_named_queue_prefix(Args) =:= {ok, <<"amq.gen">>}).

%% rabbit_guid:binary/2 produces "prefix-<base64url>".
prop_generated_name_starts_with_prefix(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_name_starts_with_prefix(Config) end, [], ?NUM_TESTS).

prop_name_starts_with_prefix(_Config) ->
?FORALL({Prefix, Guid}, {valid_prefix_gen(), binary(16)},
begin
Generated = rabbit_guid:binary(Guid, Prefix),
ExpectedHead = <<Prefix/binary, "-">>,
HeadLen = byte_size(ExpectedHead),
byte_size(Generated) >= HeadLen andalso
binary:part(Generated, 0, HeadLen) =:= ExpectedHead
end).

%% The generated name fits within the 255-byte AMQP shortstr limit.
prop_generated_name_within_amqp_limit(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_name_within_limit(Config) end, [], ?NUM_TESTS).

prop_name_within_limit(_Config) ->
?FORALL({Prefix, Guid}, {valid_prefix_gen(), binary(16)},
byte_size(rabbit_guid:binary(Guid, Prefix)) =< 255).

%% Two distinct GUIDs with the same prefix produce different names.
prop_different_guids_yield_different_names(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_unique_names(Config) end, [], ?NUM_TESTS).

prop_unique_names(_Config) ->
?FORALL({Prefix, G1, G2}, {valid_prefix_gen(), binary(16), binary(16)},
G1 =:= G2 orelse
rabbit_guid:binary(G1, Prefix) =/= rabbit_guid:binary(G2, Prefix)).

%% Two distinct prefixes with the same GUID produce different names.
prop_different_prefixes_yield_different_names(Config) ->
rabbit_ct_proper_helpers:run_proper(
fun() -> prop_unique_prefixes(Config) end, [], ?NUM_TESTS).

prop_unique_prefixes(_Config) ->
?FORALL({P1, P2, Guid}, {valid_prefix_gen(), valid_prefix_gen(), binary(16)},
P1 =:= P2 orelse
rabbit_guid:binary(Guid, P1) =/= rabbit_guid:binary(Guid, P2)).

%% -------------------------------------------------------------------
%% Generators
%% -------------------------------------------------------------------

valid_prefix_gen() ->
?SUCHTHAT(P, non_empty(binary()), byte_size(P) =< ?MAX_PREFIX_LEN).

too_long_prefix_gen() ->
?LET(Extra, binary(),
<<(binary:copy(<<"x">>, ?MAX_PREFIX_LEN + 1))/binary, Extra/binary>>).

invalid_prefix_args_gen() ->
oneof([
%% no `x-name-prefix` at all
list(unrelated_arg_gen()),
%% empty binary
?LET(Extra, list(unrelated_arg_gen()),
[{<<"x-name-prefix">>, longstr, <<>>} | Extra]),
%% non-longstr type
?LET({Extra, Type},
{list(unrelated_arg_gen()),
oneof([signedint, bool, double, timestamp, byte, short, long,
float, binary, void])},
[{<<"x-name-prefix">>, Type, 0} | Extra])
]).

unrelated_arg_gen() ->
?LET(Key, oneof([<<"x-queue-type">>, <<"x-max-length">>, <<"x-expires">>]),
{Key, longstr, <<"classic">>}).
Loading
Loading