diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 4aac08d0f7fc..861825795d97 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -247,14 +247,14 @@ declare(QueueName = #resource{virtual_host = VHost}, Durable, AutoDelete, Args, end. -spec get_queue_type(Args :: rabbit_framing:amqp_table()) -> rabbit_queue_type:queue_type(). -%% This version is not virtual host metadata-aware but will use -%% the node-wide default type as well as 'rabbit_queue_type:fallback/0'. +%% This version is not virtual host metadata-aware. get_queue_type([]) -> rabbit_queue_type:default(); get_queue_type(Args) -> get_queue_type(Args, rabbit_queue_type:default()). -%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}' +-spec get_queue_type(Args :: rabbit_framing:amqp_table(), DefaultQueueType :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type(). +%% This version should be used together with 'rabbit_vhost:default_queue_type/{1,2}'. get_queue_type([], DefaultQueueType) -> rabbit_queue_type:discover(DefaultQueueType); get_queue_type(Args, DefaultQueueType) -> diff --git a/deps/amqp_client/include/rabbit_routing_prefixes.hrl b/deps/rabbitmq_stomp/include/rabbit_stomp_routing_prefixes.hrl similarity index 87% rename from deps/amqp_client/include/rabbit_routing_prefixes.hrl rename to deps/rabbitmq_stomp/include/rabbit_stomp_routing_prefixes.hrl index f3ea36612289..d0a34e467266 100644 --- a/deps/amqp_client/include/rabbit_routing_prefixes.hrl +++ b/deps/rabbitmq_stomp/include/rabbit_stomp_routing_prefixes.hrl @@ -2,7 +2,7 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -define(QUEUE_PREFIX, "/queue"). diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl index 1abd02d1e12a..b185cc25fb06 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_processor.erl @@ -30,7 +30,8 @@ adapter_info, send_fun, ssl_login_name, peer_addr, %% see rabbitmq/rabbitmq-stomp#39 trailing_lf, auth_mechanism, auth_login, - default_topic_exchange, default_nack_requeue}). + default_topic_exchange, default_nack_requeue, + virtual_host}). -record(subscription, {dest_hdr, ack_mode, multi_ack, description}). @@ -152,7 +153,7 @@ initial_state(Configuration, version = none, pending_receipts = undefined, config = Configuration, - route_state = rabbit_routing_util:init_state(), + route_state = rabbit_stomp_routing_util:init_state(), reply_queues = #{}, frame_transformer = undefined, trailing_lf = application:get_env(rabbitmq_stomp, trailing_lf, true), @@ -556,7 +557,7 @@ with_destination(Command, Frame, State, Fun) -> "'~ts' is not a valid destination.~n" "Valid destination types are: ~ts.~n", [Content, - string:join(rabbit_routing_util:all_dest_prefixes(), + string:join(rabbit_stomp_routing_util:all_dest_prefixes(), ", ")], State) end; not_found -> @@ -606,10 +607,11 @@ do_login(Username, Passwd, VirtualHost, Heartbeat, AdapterInfo, Version, false -> [{?HEADER_SERVER, server_header()} | Headers] end, "", - State#proc_state{session_id = SessionId, - channel = Channel, - connection = Connection, - version = Version}); + State#proc_state{session_id = SessionId, + channel = Channel, + connection = Connection, + version = Version, + virtual_host = VirtualHost}); {error, {auth_failure, _}} -> ?LOG_WARNING("STOMP login failed for user '~ts': authentication failed", [Username]), error("Bad CONNECT", "Access refused for user '" ++ @@ -650,7 +652,6 @@ server_header() -> do_subscribe(Destination, DestHdr, Frame, State = #proc_state{subscriptions = Subs, - route_state = RouteState, channel = Channel, default_topic_exchange = DfltTopicEx}) -> check_subscription_access(Destination, State), @@ -658,7 +659,7 @@ do_subscribe(Destination, DestHdr, Frame, rabbit_stomp_frame:integer_header(Frame, ?HEADER_PREFETCH_COUNT, undefined), {AckMode, IsMulti} = rabbit_stomp_util:ack_mode(Frame), - case ensure_endpoint(source, Destination, Frame, Channel, RouteState) of + case ensure_endpoint(source, Destination, Frame, Channel, State) of {ok, Queue, RouteState1} -> {ok, ConsumerTag, Description} = rabbit_stomp_util:consumer_tag(Frame), @@ -687,7 +688,7 @@ do_subscribe(Destination, DestHdr, Frame, exclusive = false, arguments = Arguments}, self()), - ok = rabbit_routing_util:ensure_binding( + ok = rabbit_stomp_routing_util:ensure_binding( Queue, ExchangeAndKey, Channel) catch exit:Err -> %% it's safe to delete this queue, it @@ -788,9 +789,8 @@ maybe_clean_up_queue(Queue, #proc_state{connection = Connection}) -> do_send(Destination, _DestHdr, Frame = #stomp_frame{body_iolist = BodyFragments}, State = #proc_state{channel = Channel, - route_state = RouteState, default_topic_exchange = DfltTopicEx}) -> - case ensure_endpoint(dest, Destination, Frame, Channel, RouteState) of + case ensure_endpoint(dest, Destination, Frame, Channel, State) of {ok, _Q, RouteState1} -> @@ -912,7 +912,7 @@ ensure_reply_to(Frame = #stomp_frame{headers = Headers}, State) -> {Frame, State}; {ok, ReplyTo} -> {ok, Destination} = rabbit_routing_parser:parse_endpoint(ReplyTo), - case rabbit_routing_util:dest_temp_queue(Destination) of + case rabbit_stomp_routing_util:dest_temp_queue(Destination) of none -> {Frame, State}; TempQueueId -> @@ -1128,7 +1128,8 @@ millis_to_seconds(M) -> M div 1000. ensure_endpoint(_Direction, {queue, []}, _Frame, _Channel, _State) -> {error, {invalid_destination, "Destination cannot be blank"}}; -ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) -> +ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, + #proc_state{virtual_host = VHost, route_state = RouteState}) -> Params = [{subscription_queue_name_gen, fun () -> @@ -1138,16 +1139,20 @@ ensure_endpoint(source, EndPoint, {_, _, Headers, _} = Frame, Channel, State) -> {_, Name} = rabbit_routing_parser:parse_routing(EndPoint), list_to_binary(rabbit_stomp_util:subscription_queue_name(Name, Id, Frame)) end - }] ++ rabbit_stomp_util:build_params(EndPoint, Headers), + }, + {default_queue_type, rabbit_vhost:default_queue_type(VHost)}] + ++ rabbit_stomp_util:build_params(EndPoint, Headers), Arguments = rabbit_stomp_util:build_arguments(Headers), - rabbit_routing_util:ensure_endpoint(source, Channel, EndPoint, - [Arguments | Params], State); + rabbit_stomp_routing_util:ensure_endpoint(source, Channel, EndPoint, + [Arguments | Params], RouteState); -ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel, State) -> - Params = rabbit_stomp_util:build_params(EndPoint, Headers), +ensure_endpoint(Direction, EndPoint, {_, _, Headers, _}, Channel, + #proc_state{virtual_host = VHost, route_state = RouteState}) -> + Params = [{default_queue_type, rabbit_vhost:default_queue_type(VHost)} + | rabbit_stomp_util:build_params(EndPoint, Headers)], Arguments = rabbit_stomp_util:build_arguments(Headers), - rabbit_routing_util:ensure_endpoint(Direction, Channel, EndPoint, - [Arguments | Params], State). + rabbit_stomp_routing_util:ensure_endpoint(Direction, Channel, EndPoint, + [Arguments | Params], RouteState). build_subscription_id(Frame) -> case rabbit_stomp_util:has_durable_header(Frame) of diff --git a/deps/amqp_client/src/rabbit_routing_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_routing_util.erl similarity index 86% rename from deps/amqp_client/src/rabbit_routing_util.erl rename to deps/rabbitmq_stomp/src/rabbit_stomp_routing_util.erl index af43e550e99d..66db2727ebdc 100644 --- a/deps/amqp_client/src/rabbit_routing_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_routing_util.erl @@ -2,17 +2,17 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% --module(rabbit_routing_util). +-module(rabbit_stomp_routing_util). -export([init_state/0, dest_prefixes/0, all_dest_prefixes/0]). -export([ensure_endpoint/4, ensure_endpoint/5, ensure_binding/3]). -export([dest_temp_queue/1]). --include("amqp_client.hrl"). --include("rabbit_routing_prefixes.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stomp_routing_prefixes.hrl"). %%---------------------------------------------------------------------------- @@ -137,17 +137,29 @@ queue_declare_method(#'queue.declare'{} = Method, Type, Params) -> false -> Method#'queue.declare'{auto_delete = true, exclusive = true} end, + %% set the rest of queue.declare fields from Params Method2 = lists:foldl(fun (F, Acc) -> F(Acc, Params) end, Method1, [fun update_queue_declare_arguments/2, fun update_queue_declare_exclusive/2, fun update_queue_declare_auto_delete/2, fun update_queue_declare_nowait/2]), + + Arguments = proplists:get_value(arguments, Params, []), + DefaultQueueType = proplists:get_value(default_queue_type, Params, + rabbit_queue_type:default()), + Method3 = case rabbit_amqqueue:get_queue_type(Arguments, DefaultQueueType) of + T when T =:= rabbit_stream_queue; T =:= rabbit_quorum_queue -> + Method2#'queue.declare'{durable = true, + exclusive = false}; + _ -> Method2 + end, + case {Type, proplists:get_value(subscription_queue_name_gen, Params)} of {topic, SQNG} when is_function(SQNG) -> - Method2#'queue.declare'{queue = SQNG()}; + Method3#'queue.declare'{queue = SQNG()}; {exchange, SQNG} when is_function(SQNG) -> - Method2#'queue.declare'{queue = SQNG()}; + Method3#'queue.declare'{queue = SQNG()}; _ -> - Method2 + Method3 end. diff --git a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl index 3e22a6aaa9b0..b7750cd47ca6 100644 --- a/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl +++ b/deps/rabbitmq_stomp/src/rabbit_stomp_util.erl @@ -17,7 +17,7 @@ -export([trim_headers/1]). -include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("amqp_client/include/rabbit_routing_prefixes.hrl"). +-include("rabbit_stomp_routing_prefixes.hrl"). -include("rabbit_stomp_frame.hrl"). -include("rabbit_stomp_headers.hrl"). @@ -135,7 +135,7 @@ headers_extra(SessionId, AckMode, Version, end. headers_post_process(Headers) -> - Prefixes = rabbit_routing_util:dest_prefixes(), + Prefixes = rabbit_stomp_routing_util:dest_prefixes(), [case Header of {?HEADER_REPLY_TO, V} -> case lists:any(fun (P) -> lists:prefix(P, V) end, Prefixes) of @@ -333,10 +333,11 @@ default_params({exchange, _}) -> [{exclusive, true}, {auto_delete, true}]; default_params({topic, _}) -> - [{exclusive, false}, {auto_delete, true}]; + [{auto_delete, true}]; default_params(_) -> - [{durable, false}]. + [{exclusive, true}, + {durable, false}]. string_to_boolean("True") -> true; diff --git a/deps/rabbitmq_stomp/test/python_SUITE.erl b/deps/rabbitmq_stomp/test/python_SUITE.erl index 1e056c816acb..63051d21fff3 100644 --- a/deps/rabbitmq_stomp/test/python_SUITE.erl +++ b/deps/rabbitmq_stomp/test/python_SUITE.erl @@ -46,16 +46,9 @@ init_per_group(_, Config) -> {rmq_certspwd, "bunnychow"} ]), rabbit_ct_helpers:log_environment(), - %% Remove when transient_nonexcl_queues is removed entirely, - %% and fix the Python tests. - Config1 = rabbit_ct_helpers:merge_app_env( - Config0, - {rabbit, - [{permit_deprecated_features, #{transient_nonexcl_queues => true}}]}), - Config2 = rabbit_ct_helpers:run_setup_steps( - Config1, - rabbit_ct_broker_helpers:setup_steps()), - Config2. + rabbit_ct_helpers:run_setup_steps( + Config0, + rabbit_ct_broker_helpers:setup_steps()). end_per_group(_, Config) -> rabbit_ct_helpers:run_teardown_steps(Config, diff --git a/deps/rabbitmq_stomp/test/util_SUITE.erl b/deps/rabbitmq_stomp/test/util_SUITE.erl index 8f5c411e7e4d..a0177e5eb1fd 100644 --- a/deps/rabbitmq_stomp/test/util_SUITE.erl +++ b/deps/rabbitmq_stomp/test/util_SUITE.erl @@ -9,7 +9,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("amqp_client/include/amqp_client.hrl"). --include_lib("amqp_client/include/rabbit_routing_prefixes.hrl"). +-include("rabbit_stomp_routing_prefixes.hrl"). -include("rabbit_stomp_frame.hrl"). -compile(export_all). @@ -149,7 +149,7 @@ headers_post_process_noop_replyto(_) -> [begin Headers = [{"reply-to", Prefix ++ "/something"}], Headers = rabbit_stomp_util:headers_post_process(Headers) - end || Prefix <- rabbit_routing_util:dest_prefixes()]. + end || Prefix <- rabbit_stomp_routing_util:dest_prefixes()]. headers_post_process_noop2(_) -> Headers = [{"header1", "1"},