Skip to content

Commit 37553ab

Browse files
Merge pull request #15809 from rabbitmq/mergify/bp/v4.3.x/pr-13016
Fix #13009, STOMP: default to exclusive queues except stream queues (backport #13016)
2 parents d157b66 + 97e517b commit 37553ab

7 files changed

Lines changed: 59 additions & 48 deletions

File tree

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -247,14 +247,14 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
247247
end.
248248

249249
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
250-
%% This version is not virtual host metadata-aware but will use
251-
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
250+
%% This version is not virtual host metadata-aware.
252251
get_queue_type([]) ->
253252
rabbit_queue_type:default();
254253
get_queue_type(Args) ->
255254
get_queue_type(Args, rabbit_queue_type:default()).
256255

257-
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
256+
-spec get_queue_type(Args :: rabbit_framing:amqp_table(), DefaultQueueType :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
257+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'.
258258
get_queue_type([], DefaultQueueType) ->
259259
rabbit_queue_type:discover(DefaultQueueType);
260260
get_queue_type(Args, DefaultQueueType) ->

deps/amqp_client/include/rabbit_routing_prefixes.hrl renamed to deps/rabbitmq_stomp/include/rabbit_stomp_routing_prefixes.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
77

88
-define(QUEUE_PREFIX, "/queue").

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 26 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
adapter_info, send_fun, ssl_login_name, peer_addr,
3131
%% see rabbitmq/rabbitmq-stomp#39
3232
trailing_lf, auth_mechanism, auth_login,
33-
default_topic_exchange, default_nack_requeue}).
33+
default_topic_exchange, default_nack_requeue,
34+
virtual_host}).
3435

3536
-record(subscription, {dest_hdr, ack_mode, multi_ack, description}).
3637

@@ -152,7 +153,7 @@ initial_state(Configuration,
152153
version = none,
153154
pending_receipts = undefined,
154155
config = Configuration,
155-
route_state = rabbit_routing_util:init_state(),
156+
route_state = rabbit_stomp_routing_util:init_state(),
156157
reply_queues = #{},
157158
frame_transformer = undefined,
158159
trailing_lf = application:get_env(rabbitmq_stomp, trailing_lf, true),
@@ -556,7 +557,7 @@ with_destination(Command, Frame, State, Fun) ->
556557
"'~ts' is not a valid destination.~n"
557558
"Valid destination types are: ~ts.~n",
558559
[Content,
559-
string:join(rabbit_routing_util:all_dest_prefixes(),
560+
string:join(rabbit_stomp_routing_util:all_dest_prefixes(),
560561
", ")], State)
561562
end;
562563
not_found ->
@@ -606,10 +607,11 @@ do_login(Username, Passwd, VirtualHost, Heartbeat, AdapterInfo, Version,
606607
false -> [{?HEADER_SERVER, server_header()} | Headers]
607608
end,
608609
"",
609-
State#proc_state{session_id = SessionId,
610-
channel = Channel,
611-
connection = Connection,
612-
version = Version});
610+
State#proc_state{session_id = SessionId,
611+
channel = Channel,
612+
connection = Connection,
613+
version = Version,
614+
virtual_host = VirtualHost});
613615
{error, {auth_failure, _}} ->
614616
?LOG_WARNING("STOMP login failed for user '~ts': authentication failed", [Username]),
615617
error("Bad CONNECT", "Access refused for user '" ++
@@ -650,15 +652,14 @@ server_header() ->
650652

651653
do_subscribe(Destination, DestHdr, Frame,
652654
State = #proc_state{subscriptions = Subs,
653-
route_state = RouteState,
654655
channel = Channel,
655656
default_topic_exchange = DfltTopicEx}) ->
656657
check_subscription_access(Destination, State),
657658
Prefetch =
658659
rabbit_stomp_frame:integer_header(Frame, ?HEADER_PREFETCH_COUNT,
659660
undefined),
660661
{AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
661-
case ensure_endpoint(source, Destination, Frame, Channel, RouteState) of
662+
case ensure_endpoint(source, Destination, Frame, Channel, State) of
662663
{ok, Queue, RouteState1} ->
663664
{ok, ConsumerTag, Description} =
664665
rabbit_stomp_util:consumer_tag(Frame),
@@ -687,7 +688,7 @@ do_subscribe(Destination, DestHdr, Frame,
687688
exclusive = false,
688689
arguments = Arguments},
689690
self()),
690-
ok = rabbit_routing_util:ensure_binding(
691+
ok = rabbit_stomp_routing_util:ensure_binding(
691692
Queue, ExchangeAndKey, Channel)
692693
catch exit:Err ->
693694
%% it's safe to delete this queue, it
@@ -788,9 +789,8 @@ maybe_clean_up_queue(Queue, #proc_state{connection = Connection}) ->
788789
do_send(Destination, _DestHdr,
789790
Frame = #stomp_frame{body_iolist = BodyFragments},
790791
State = #proc_state{channel = Channel,
791-
route_state = RouteState,
792792
default_topic_exchange = DfltTopicEx}) ->
793-
case ensure_endpoint(dest, Destination, Frame, Channel, RouteState) of
793+
case ensure_endpoint(dest, Destination, Frame, Channel, State) of
794794

795795
{ok, _Q, RouteState1} ->
796796

@@ -912,7 +912,7 @@ ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) ->
912912
{Frame, State};
913913
{ok, ReplyTo} ->
914914
{ok, Destination} = rabbit_routing_parser:parse_endpoint(ReplyTo),
915-
case rabbit_routing_util:dest_temp_queue(Destination) of
915+
case rabbit_stomp_routing_util:dest_temp_queue(Destination) of
916916
none ->
917917
{Frame, State};
918918
TempQueueId ->
@@ -1128,7 +1128,8 @@ millis_to_seconds(M) -> M div 1000.
11281128
ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
11291129
{error, {invalid_destination, "Destination cannot be blank"}};
11301130

1131-
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) ->
1131+
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel,
1132+
#proc_state{virtual_host = VHost, route_state = RouteState}) ->
11321133
Params =
11331134
[{subscription_queue_name_gen,
11341135
fun () ->
@@ -1138,16 +1139,20 @@ ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) ->
11381139
{_, Name} = rabbit_routing_parser:parse_routing(EndPoint),
11391140
list_to_binary(rabbit_stomp_util:subscription_queue_name(Name, Id, Frame))
11401141
end
1141-
}] ++ rabbit_stomp_util:build_params(EndPoint, Headers),
1142+
},
1143+
{default_queue_type, rabbit_vhost:default_queue_type(VHost)}]
1144+
++ rabbit_stomp_util:build_params(EndPoint, Headers),
11421145
Arguments = rabbit_stomp_util:build_arguments(Headers),
1143-
rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint,
1144-
[Arguments | Params], State);
1146+
rabbit_stomp_routing_util:ensure_endpoint(source, Channel, EndPoint,
1147+
[Arguments | Params], RouteState);
11451148

1146-
ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel, State) ->
1147-
Params = rabbit_stomp_util:build_params(EndPoint, Headers),
1149+
ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel,
1150+
#proc_state{virtual_host = VHost, route_state = RouteState}) ->
1151+
Params = [{default_queue_type, rabbit_vhost:default_queue_type(VHost)}
1152+
| rabbit_stomp_util:build_params(EndPoint, Headers)],
11481153
Arguments = rabbit_stomp_util:build_arguments(Headers),
1149-
rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
1150-
[Arguments | Params], State).
1154+
rabbit_stomp_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
1155+
[Arguments | Params], RouteState).
11511156

11521157
build_subscription_id(Frame) ->
11531158
case rabbit_stomp_util:has_durable_header(Frame) of

deps/amqp_client/src/rabbit_routing_util.erl renamed to deps/rabbitmq_stomp/src/rabbit_stomp_routing_util.erl

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
77

8-
-module(rabbit_routing_util).
8+
-module(rabbit_stomp_routing_util).
99

1010
-export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]).
1111
-export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]).
1212
-export([dest_temp_queue/1]).
1313

14-
-include("amqp_client.hrl").
15-
-include("rabbit_routing_prefixes.hrl").
14+
-include_lib("amqp_client/include/amqp_client.hrl").
15+
-include("rabbit_stomp_routing_prefixes.hrl").
1616

1717
%%----------------------------------------------------------------------------
1818

@@ -137,17 +137,29 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) ->
137137
false -> Method#'queue.declare'{auto_delete = true,
138138
exclusive = true}
139139
end,
140+
140141
%% set the rest of queue.declare fields from Params
141142
Method2 = lists:foldl(fun (F, Acc) -> F(Acc, Params) end,
142143
Method1, [fun update_queue_declare_arguments/2,
143144
fun update_queue_declare_exclusive/2,
144145
fun update_queue_declare_auto_delete/2,
145146
fun update_queue_declare_nowait/2]),
147+
148+
Arguments = proplists:get_value(arguments, Params, []),
149+
DefaultQueueType = proplists:get_value(default_queue_type, Params,
150+
rabbit_queue_type:default()),
151+
Method3 = case rabbit_amqqueue:get_queue_type(Arguments, DefaultQueueType) of
152+
T when T =:= rabbit_stream_queue; T =:= rabbit_quorum_queue ->
153+
Method2#'queue.declare'{durable = true,
154+
exclusive = false};
155+
_ -> Method2
156+
end,
157+
146158
case {Type, proplists:get_value(subscription_queue_name_gen, Params)} of
147159
{topic, SQNG} when is_function(SQNG) ->
148-
Method2#'queue.declare'{queue = SQNG()};
160+
Method3#'queue.declare'{queue = SQNG()};
149161
{exchange, SQNG} when is_function(SQNG) ->
150-
Method2#'queue.declare'{queue = SQNG()};
162+
Method3#'queue.declare'{queue = SQNG()};
151163
_ ->
152-
Method2
164+
Method3
153165
end.

deps/rabbitmq_stomp/src/rabbit_stomp_util.erl

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
-export([trim_headers/1]).
1818

1919
-include_lib("amqp_client/include/amqp_client.hrl").
20-
-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
20+
-include("rabbit_stomp_routing_prefixes.hrl").
2121
-include("rabbit_stomp_frame.hrl").
2222
-include("rabbit_stomp_headers.hrl").
2323

@@ -135,7 +135,7 @@ headers_extra(SessionId, AckMode, Version,
135135
end.
136136

137137
headers_post_process(Headers) ->
138-
Prefixes = rabbit_routing_util:dest_prefixes(),
138+
Prefixes = rabbit_stomp_routing_util:dest_prefixes(),
139139
[case Header of
140140
{?HEADER_REPLY_TO, V} ->
141141
case lists:any(fun (P) -> lists:prefix(P, V) end, Prefixes) of
@@ -333,10 +333,11 @@ default_params({exchange, _}) ->
333333
[{exclusive, true}, {auto_delete, true}];
334334

335335
default_params({topic, _}) ->
336-
[{exclusive, false}, {auto_delete, true}];
336+
[{auto_delete, true}];
337337

338338
default_params(_) ->
339-
[{durable, false}].
339+
[{exclusive, true},
340+
{durable, false}].
340341

341342
string_to_boolean("True") ->
342343
true;

deps/rabbitmq_stomp/test/python_SUITE.erl

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,9 @@ init_per_group(_, Config) ->
4646
{rmq_certspwd, "bunnychow"}
4747
]),
4848
rabbit_ct_helpers:log_environment(),
49-
%% Remove when transient_nonexcl_queues is removed entirely,
50-
%% and fix the Python tests.
51-
Config1 = rabbit_ct_helpers:merge_app_env(
52-
Config0,
53-
{rabbit,
54-
[{permit_deprecated_features, #{transient_nonexcl_queues => true}}]}),
55-
Config2 = rabbit_ct_helpers:run_setup_steps(
56-
Config1,
57-
rabbit_ct_broker_helpers:setup_steps()),
58-
Config2.
49+
rabbit_ct_helpers:run_setup_steps(
50+
Config0,
51+
rabbit_ct_broker_helpers:setup_steps()).
5952

6053
end_per_group(_, Config) ->
6154
rabbit_ct_helpers:run_teardown_steps(Config,

deps/rabbitmq_stomp/test/util_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-include_lib("eunit/include/eunit.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
12-
-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
12+
-include("rabbit_stomp_routing_prefixes.hrl").
1313
-include("rabbit_stomp_frame.hrl").
1414
-compile(export_all).
1515

@@ -149,7 +149,7 @@ headers_post_process_noop_replyto(_) ->
149149
[begin
150150
Headers = [{"reply-to", Prefix ++ "/something"}],
151151
Headers = rabbit_stomp_util:headers_post_process(Headers)
152-
end || Prefix <- rabbit_routing_util:dest_prefixes()].
152+
end || Prefix <- rabbit_stomp_routing_util:dest_prefixes()].
153153

154154
headers_post_process_noop2(_) ->
155155
Headers = [{"header1", "1"},

0 commit comments

Comments
 (0)