Skip to content

Commit 97e517b

Browse files
michaelklishinmergify[bot]
authored andcommitted
MOve rabbit_routing_util to STOMP #13009 #13016
This avoids making `amqp_client` aware of the DQT code paths in the server, plus its Dialyzer run now passes. (cherry picked from commit b2d5376)
1 parent ab75e63 commit 97e517b

8 files changed

Lines changed: 42 additions & 71 deletions

File tree

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,13 +248,26 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
248248

249249
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
250250
%% This version is not virtual host metadata-aware.
251+
get_queue_type([]) ->
252+
rabbit_queue_type:default();
251253
get_queue_type(Args) ->
252-
rabbit_queue_type_common:get_queue_type(Args, rabbit_queue_type:default()).
254+
get_queue_type(Args, rabbit_queue_type:default()).
253255

254256
-spec get_queue_type(Args :: rabbit_framing:amqp_table(), DefaultQueueType :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
255257
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'.
258+
get_queue_type([], DefaultQueueType) ->
259+
rabbit_queue_type:discover(DefaultQueueType);
256260
get_queue_type(Args, DefaultQueueType) ->
257-
rabbit_queue_type_common:get_queue_type(Args, DefaultQueueType).
261+
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
262+
undefined ->
263+
rabbit_queue_type:discover(DefaultQueueType);
264+
{longstr, undefined} ->
265+
rabbit_queue_type:discover(DefaultQueueType);
266+
{longstr, <<"undefined">>} ->
267+
rabbit_queue_type:discover(DefaultQueueType);
268+
{_, V} ->
269+
rabbit_queue_type:discover(V)
270+
end.
258271

259272
-spec internal_declare(Queue, Recover) -> Ret when
260273
Queue :: amqqueue:amqqueue(),

deps/rabbit_common/src/rabbit_queue_type_common.erl

Lines changed: 0 additions & 33 deletions
This file was deleted.

deps/amqp_client/include/rabbit_routing_prefixes.hrl renamed to deps/rabbitmq_stomp/include/rabbit_stomp_routing_prefixes.hrl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
77

88
-define(QUEUE_PREFIX, "/queue").

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 12 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -153,7 +153,7 @@ initial_state(Configuration,
153153
version = none,
154154
pending_receipts = undefined,
155155
config = Configuration,
156-
route_state = rabbit_routing_util:init_state(),
156+
route_state = rabbit_stomp_routing_util:init_state(),
157157
reply_queues = #{},
158158
frame_transformer = undefined,
159159
trailing_lf = application:get_env(rabbitmq_stomp, trailing_lf, true),
@@ -557,7 +557,7 @@ with_destination(Command, Frame, State, Fun) ->
557557
"'~ts' is not a valid destination.~n"
558558
"Valid destination types are: ~ts.~n",
559559
[Content,
560-
string:join(rabbit_routing_util:all_dest_prefixes(),
560+
string:join(rabbit_stomp_routing_util:all_dest_prefixes(),
561561
", ")], State)
562562
end;
563563
not_found ->
@@ -652,15 +652,14 @@ server_header() ->
652652

653653
do_subscribe(Destination, DestHdr, Frame,
654654
State = #proc_state{subscriptions = Subs,
655-
route_state = RouteState,
656655
channel = Channel,
657656
default_topic_exchange = DfltTopicEx}) ->
658657
check_subscription_access(Destination, State),
659658
Prefetch =
660659
rabbit_stomp_frame:integer_header(Frame, ?HEADER_PREFETCH_COUNT,
661660
undefined),
662661
{AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame),
663-
case ensure_endpoint(source, Destination, Frame, Channel, RouteState) of
662+
case ensure_endpoint(source, Destination, Frame, Channel, State) of
664663
{ok, Queue, RouteState1} ->
665664
{ok, ConsumerTag, Description} =
666665
rabbit_stomp_util:consumer_tag(Frame),
@@ -689,7 +688,7 @@ do_subscribe(Destination, DestHdr, Frame,
689688
exclusive = false,
690689
arguments = Arguments},
691690
self()),
692-
ok = rabbit_routing_util:ensure_binding(
691+
ok = rabbit_stomp_routing_util:ensure_binding(
693692
Queue, ExchangeAndKey, Channel)
694693
catch exit:Err ->
695694
%% it's safe to delete this queue, it
@@ -790,9 +789,8 @@ maybe_clean_up_queue(Queue, #proc_state{connection = Connection}) ->
790789
do_send(Destination, _DestHdr,
791790
Frame = #stomp_frame{body_iolist = BodyFragments},
792791
State = #proc_state{channel = Channel,
793-
route_state = RouteState,
794792
default_topic_exchange = DfltTopicEx}) ->
795-
case ensure_endpoint(dest, Destination, Frame, Channel, RouteState) of
793+
case ensure_endpoint(dest, Destination, Frame, Channel, State) of
796794

797795
{ok, _Q, RouteState1} ->
798796

@@ -914,7 +912,7 @@ ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) ->
914912
{Frame, State};
915913
{ok, ReplyTo} ->
916914
{ok, Destination} = rabbit_routing_parser:parse_endpoint(ReplyTo),
917-
case rabbit_routing_util:dest_temp_queue(Destination) of
915+
case rabbit_stomp_routing_util:dest_temp_queue(Destination) of
918916
none ->
919917
{Frame, State};
920918
TempQueueId ->
@@ -1131,7 +1129,7 @@ ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
11311129
{error, {invalid_destination, "Destination cannot be blank"}};
11321130

11331131
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel,
1134-
State = #proc_state{virtual_host = VHost}) ->
1132+
#proc_state{virtual_host = VHost, route_state = RouteState}) ->
11351133
Params =
11361134
[{subscription_queue_name_gen,
11371135
fun () ->
@@ -1145,16 +1143,16 @@ ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel,
11451143
{default_queue_type, rabbit_vhost:default_queue_type(VHost)}]
11461144
++ rabbit_stomp_util:build_params(EndPoint, Headers),
11471145
Arguments = rabbit_stomp_util:build_arguments(Headers),
1148-
rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint,
1149-
[Arguments | Params], State);
1146+
rabbit_stomp_routing_util:ensure_endpoint(source, Channel, EndPoint,
1147+
[Arguments | Params], RouteState);
11501148

11511149
ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel,
1152-
State = #proc_state{virtual_host = VHost}) ->
1150+
#proc_state{virtual_host = VHost, route_state = RouteState}) ->
11531151
Params = [{default_queue_type, rabbit_vhost:default_queue_type(VHost)}
11541152
| rabbit_stomp_util:build_params(EndPoint, Headers)],
11551153
Arguments = rabbit_stomp_util:build_arguments(Headers),
1156-
rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
1157-
[Arguments | Params], State).
1154+
rabbit_stomp_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
1155+
[Arguments | Params], RouteState).
11581156

11591157
build_subscription_id(Frame) ->
11601158
case rabbit_stomp_util:has_durable_header(Frame) of

deps/amqp_client/src/rabbit_routing_util.erl renamed to deps/rabbitmq_stomp/src/rabbit_stomp_routing_util.erl

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,17 @@
22
%% License, v. 2.0. If a copy of the MPL was not distributed with this
33
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
44
%%
5-
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
66
%%
77

8-
-module(rabbit_routing_util).
8+
-module(rabbit_stomp_routing_util).
99

1010
-export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]).
1111
-export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]).
1212
-export([dest_temp_queue/1]).
1313

14-
-include("amqp_client.hrl").
15-
-include("rabbit_routing_prefixes.hrl").
14+
-include_lib("amqp_client/include/amqp_client.hrl").
15+
-include("rabbit_stomp_routing_prefixes.hrl").
1616

1717
%%----------------------------------------------------------------------------
1818

@@ -147,8 +147,8 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) ->
147147

148148
Arguments = proplists:get_value(arguments, Params, []),
149149
DefaultQueueType = proplists:get_value(default_queue_type, Params,
150-
rabbit_queue_type_common:default()),
151-
Method3 = case rabbit_queue_type_common:get_queue_type(Arguments, DefaultQueueType) of
150+
rabbit_queue_type:default()),
151+
Method3 = case rabbit_amqqueue:get_queue_type(Arguments, DefaultQueueType) of
152152
T when T =:= rabbit_stream_queue; T =:= rabbit_quorum_queue ->
153153
Method2#'queue.declare'{durable = true,
154154
exclusive = false};

deps/rabbitmq_stomp/src/rabbit_stomp_util.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
-export([trim_headers/1]).
1818

1919
-include_lib("amqp_client/include/amqp_client.hrl").
20-
-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
20+
-include("rabbit_stomp_routing_prefixes.hrl").
2121
-include("rabbit_stomp_frame.hrl").
2222
-include("rabbit_stomp_headers.hrl").
2323

@@ -135,7 +135,7 @@ headers_extra(SessionId, AckMode, Version,
135135
end.
136136

137137
headers_post_process(Headers) ->
138-
Prefixes = rabbit_routing_util:dest_prefixes(),
138+
Prefixes = rabbit_stomp_routing_util:dest_prefixes(),
139139
[case Header of
140140
{?HEADER_REPLY_TO, V} ->
141141
case lists:any(fun (P) -> lists:prefix(P, V) end, Prefixes) of
@@ -333,7 +333,7 @@ default_params({exchange, _}) ->
333333
[{exclusive, true}, {auto_delete, true}];
334334

335335
default_params({topic, _}) ->
336-
[{exclusive, false}, {auto_delete, true}];
336+
[{auto_delete, true}];
337337

338338
default_params(_) ->
339339
[{exclusive, true},

deps/rabbitmq_stomp/test/python_SUITE.erl

Lines changed: 3 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,16 +46,9 @@ init_per_group(_, Config) ->
4646
{rmq_certspwd, "bunnychow"}
4747
]),
4848
rabbit_ct_helpers:log_environment(),
49-
%% Remove when transient_nonexcl_queues is removed entirely,
50-
%% and fix the Python tests.
51-
Config1 = rabbit_ct_helpers:merge_app_env(
52-
Config0,
53-
{rabbit,
54-
[{permit_deprecated_features, #{transient_nonexcl_queues => true}}]}),
55-
Config2 = rabbit_ct_helpers:run_setup_steps(
56-
Config1,
57-
rabbit_ct_broker_helpers:setup_steps()),
58-
Config2.
49+
rabbit_ct_helpers:run_setup_steps(
50+
Config0,
51+
rabbit_ct_broker_helpers:setup_steps()).
5952

6053
end_per_group(_, Config) ->
6154
rabbit_ct_helpers:run_teardown_steps(Config,

deps/rabbitmq_stomp/test/util_SUITE.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99

1010
-include_lib("eunit/include/eunit.hrl").
1111
-include_lib("amqp_client/include/amqp_client.hrl").
12-
-include_lib("amqp_client/include/rabbit_routing_prefixes.hrl").
12+
-include("rabbit_stomp_routing_prefixes.hrl").
1313
-include("rabbit_stomp_frame.hrl").
1414
-compile(export_all).
1515

@@ -149,7 +149,7 @@ headers_post_process_noop_replyto(_) ->
149149
[begin
150150
Headers = [{"reply-to", Prefix ++ "/something"}],
151151
Headers = rabbit_stomp_util:headers_post_process(Headers)
152-
end || Prefix <- rabbit_routing_util:dest_prefixes()].
152+
end || Prefix <- rabbit_stomp_routing_util:dest_prefixes()].
153153

154154
headers_post_process_noop2(_) ->
155155
Headers = [{"header1", "1"},

0 commit comments

Comments
 (0)