Skip to content

Commit d8702f3

Browse files
Extract rabbit_queue_type_common:get_queue_type/2
The STOMP plugin still relies on `amqp_client` (for now), which cannot use `rabbit_vhost`, so we extract a new DQT resolution function that takes the virtual host DQT as an argument. The STOMP plugin then resolves the DQT using `rabbit_vhost` and `rabbit_registry`, passes it to `get_queue_type/2` and decides on the properties based on the canonicalized type name (a module name).
1 parent 8aa7d40 commit d8702f3

4 files changed

Lines changed: 58 additions & 32 deletions

File tree

deps/amqp_client/src/rabbit_routing_util.erl

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -146,11 +146,12 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) ->
146146
fun update_queue_declare_nowait/2]),
147147

148148
Arguments = proplists:get_value(arguments, Params, []),
149-
QueueType = rabbit_amqqueue:get_queue_type(Arguments),
150-
151-
Method3 = case QueueType of
152-
rabbit_stream_queue -> Method2#'queue.declare'{durable = true,
153-
exclusive = false};
149+
DefaultQueueType = proplists:get_value(default_queue_type, Params,
150+
rabbit_queue_type_common:default()),
151+
Method3 = case rabbit_queue_type_common:get_queue_type(Arguments, DefaultQueueType) of
152+
T when T =:= rabbit_stream_queue; T =:= rabbit_quorum_queue ->
153+
Method2#'queue.declare'{durable = true,
154+
exclusive = false};
154155
_ -> Method2
155156
end,
156157

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 5 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -247,27 +247,14 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args,
247247
end.
248248

249249
-spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type().
250-
%% This version is not virtual host metadata-aware but will use
251-
%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'.
252-
get_queue_type([]) ->
253-
rabbit_queue_type:default();
250+
%% This version is not virtual host metadata-aware.
254251
get_queue_type(Args) ->
255-
get_queue_type(Args, rabbit_queue_type:default()).
252+
rabbit_queue_type_common:get_queue_type(Args, rabbit_queue_type:default()).
256253

257-
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'
258-
get_queue_type([], DefaultQueueType) ->
259-
rabbit_queue_type:discover(DefaultQueueType);
254+
-spec get_queue_type(Args :: rabbit_framing:amqp_table(), DefaultQueueType :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
255+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'.
260256
get_queue_type(Args, DefaultQueueType) ->
261-
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
262-
undefined ->
263-
rabbit_queue_type:discover(DefaultQueueType);
264-
{longstr, undefined} ->
265-
rabbit_queue_type:discover(DefaultQueueType);
266-
{longstr, <<"undefined">>} ->
267-
rabbit_queue_type:discover(DefaultQueueType);
268-
{_, V} ->
269-
rabbit_queue_type:discover(V)
270-
end.
257+
rabbit_queue_type_common:get_queue_type(Args, DefaultQueueType).
271258

272259
-spec internal_declare(Queue, Recover) -> Ret when
273260
Queue :: amqqueue:amqqueue(),
Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_queue_type_common).
9+
10+
-export([get_queue_type/2, default/0]).
11+
12+
-spec get_queue_type(rabbit_framing:amqp_table(), module()) -> module().
13+
%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'.
14+
get_queue_type([], DefaultQueueType) ->
15+
discover(DefaultQueueType);
16+
get_queue_type(Args, DefaultQueueType) ->
17+
case rabbit_misc:table_lookup(Args, <<"x-queue-type">>) of
18+
undefined -> discover(DefaultQueueType);
19+
{longstr, undefined} -> discover(DefaultQueueType);
20+
{longstr, <<"undefined">>} -> discover(DefaultQueueType);
21+
{_, V} -> discover(V)
22+
end.
23+
24+
%% ------------------------------------------------------------------
25+
26+
-spec default() -> module().
27+
default() ->
28+
rabbit_data_coercion:to_atom(
29+
application:get_env(rabbit, default_queue_type, rabbit_classic_queue)).
30+
31+
discover(TypeDescriptor) ->
32+
{ok, Module} = rabbit_registry:lookup_type_module(queue, TypeDescriptor),
33+
Module.

deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@
3030
adapter_info, send_fun, ssl_login_name, peer_addr,
3131
%% see rabbitmq/rabbitmq-stomp#39
3232
trailing_lf, auth_mechanism, auth_login,
33-
default_topic_exchange, default_nack_requeue}).
33+
default_topic_exchange, default_nack_requeue,
34+
default_queue_type}).
3435

3536
-record(subscription, {dest_hdr, ack_mode, multi_ack, description}).
3637

@@ -606,10 +607,11 @@ do_login(Username, Passwd, VirtualHost, Heartbeat, AdapterInfo, Version,
606607
false -> [{?HEADER_SERVER, server_header()} | Headers]
607608
end,
608609
"",
609-
State#proc_state{session_id = SessionId,
610-
channel = Channel,
611-
connection = Connection,
612-
version = Version});
610+
State#proc_state{session_id = SessionId,
611+
channel = Channel,
612+
connection = Connection,
613+
version = Version,
614+
default_queue_type = rabbit_vhost:default_queue_type(VirtualHost)});
613615
{error, {auth_failure, _}} ->
614616
?LOG_WARNING("STOMP login failed for user '~ts': authentication failed", [Username]),
615617
error("Bad CONNECT", "Access refused for user '" ++
@@ -1128,7 +1130,8 @@ millis_to_seconds(M) -> M div 1000.
11281130
ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) ->
11291131
{error, {invalid_destination, "Destination cannot be blank"}};
11301132

1131-
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) ->
1133+
ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel,
1134+
State = #proc_state{default_queue_type = DQT}) ->
11321135
Params =
11331136
[{subscription_queue_name_gen,
11341137
fun () ->
@@ -1138,13 +1141,15 @@ ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) ->
11381141
{_, Name} = rabbit_routing_parser:parse_routing(EndPoint),
11391142
list_to_binary(rabbit_stomp_util:subscription_queue_name(Name, Id, Frame))
11401143
end
1141-
}] ++ rabbit_stomp_util:build_params(EndPoint, Headers),
1144+
},
1145+
{default_queue_type, DQT}] ++ rabbit_stomp_util:build_params(EndPoint, Headers),
11421146
Arguments = rabbit_stomp_util:build_arguments(Headers),
11431147
rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint,
11441148
[Arguments | Params], State);
11451149

1146-
ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel, State) ->
1147-
Params = rabbit_stomp_util:build_params(EndPoint, Headers),
1150+
ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel,
1151+
State = #proc_state{default_queue_type = DQT}) ->
1152+
Params = [{default_queue_type, DQT} | rabbit_stomp_util:build_params(EndPoint, Headers)],
11481153
Arguments = rabbit_stomp_util:build_arguments(Headers),
11491154
rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint,
11501155
[Arguments | Params], State).

0 commit comments

Comments
 (0)