diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index d4efdc1f7dcb..1a5c1c59c55b 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -255,3 +255,14 @@ post_enable => {rabbit_khepri, topic_binding_projection_post_enable}} }}). + +-rabbit_feature_flag( + {topic_binding_projection_v5, + #{desc => "Enable the topic binding Khepri projection v5", + stability => stable, + depends_on => [topic_binding_projection_v4], + callbacks => #{enable => + {rabbit_khepri, topic_binding_projection_v5_enable}, + post_enable => + {rabbit_khepri, topic_binding_projection_v5_post_enable}} + }}). diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl index 10267ecd407a..83ecedf2c80b 100644 --- a/deps/rabbit/src/rabbit_db_topic_exchange.erl +++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl @@ -16,9 +16,6 @@ -define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3). --define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4). --define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4). - -type match_result() :: [rabbit_types:binding_destination() | {rabbit_amqqueue:name(), rabbit_types:binding_key()}]. @@ -39,8 +36,14 @@ match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) -> Words = split_topic_key_binary(RoutingKey), case rabbit_khepri:get_effective_topic_binding_projection_version() of V when V >= 4 -> + XSrc = {VHost, XName}, + {TrieTab, BindingTab} = rabbit_khepri:topic_trie_table_names(V), + Root = case V of + 4 -> root; + _ -> {root, XSrc} + end, try - trie_match({VHost, XName}, root, Words, BKeys, []) + trie_match(XSrc, TrieTab, BindingTab, Root, Words, BKeys, []) catch error:badarg -> [] @@ -87,37 +90,36 @@ split_topic_key_binary(RoutingKey) -> %% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2. %% ============================================================== -trie_match(XSrc, Node, [], BKeys, Acc0) -> - Acc1 = trie_bindings(Node, BKeys, Acc0), - trie_match_try(XSrc, Node, <<"#">>, - fun trie_match_skip_any/5, +trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc0) -> + Acc1 = trie_bindings(BindTab, Node, BKeys, Acc0), + trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>, + fun trie_match_skip_any/7, [], BKeys, Acc1); -trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) -> - Acc1 = trie_match_try(XSrc, Node, W, - fun trie_match/5, +trie_match(XSrc, TrieTab, BindTab, Node, [W | RestW] = Words, BKeys, Acc0) -> + Acc1 = trie_match_try(XSrc, TrieTab, BindTab, Node, W, + fun trie_match/7, RestW, BKeys, Acc0), - Acc2 = trie_match_try(XSrc, Node, <<"*">>, - fun trie_match/5, + Acc2 = trie_match_try(XSrc, TrieTab, BindTab, Node, <<"*">>, + fun trie_match/7, RestW, BKeys, Acc1), - trie_match_try(XSrc, Node, <<"#">>, - fun trie_match_skip_any/5, + trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>, + fun trie_match_skip_any/7, Words, BKeys, Acc2). -trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) -> - case ets:lookup_element(?TOPIC_TRIE_PROJECTION, - {XSrc, Node, Word}, 2, undefined) of +trie_match_try(XSrc, TrieTab, BindTab, Node, Word, MatchFun, RestW, BKeys, Acc) -> + case ets:lookup_element(TrieTab, {XSrc, Node, Word}, 2, undefined) of undefined -> Acc; NextNode -> - MatchFun(XSrc, NextNode, RestW, BKeys, Acc) + MatchFun(XSrc, TrieTab, BindTab, NextNode, RestW, BKeys, Acc) end. -trie_match_skip_any(XSrc, Node, [], BKeys, Acc) -> - trie_match(XSrc, Node, [], BKeys, Acc); -trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) -> +trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc) -> + trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc); +trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [_ | RestW] = Words, BKeys, Acc) -> trie_match_skip_any( - XSrc, Node, RestW, BKeys, - trie_match(XSrc, Node, Words, BKeys, Acc)). + XSrc, TrieTab, BindTab, Node, RestW, BKeys, + trie_match(XSrc, TrieTab, BindTab, Node, Words, BKeys, Acc)). %% Collect all destinations bound at the given trie node. %% @@ -129,15 +131,15 @@ trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) -> %% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan, %% which is cheaper than F individual ets:next/2 calls %% (each O(log N) due to CATree fresh-stack allocation). -trie_bindings(NodeId, BKeys, Acc) -> +trie_bindings(BindingTab, NodeId, BKeys, Acc) -> StartKey = {NodeId, <<>>, {}}, - case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of + case ets:next(BindingTab, StartKey) of {NodeId, BKey1, Dest1} = Key1 -> - case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of + case ets:next(BindingTab, Key1) of {NodeId, BKey2, Dest2} = Key2 -> - case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of + case ets:next(BindingTab, Key2) of {NodeId, _, _} -> - collect_select(NodeId, BKeys, Acc); + collect_select(BindingTab, NodeId, BKeys, Acc); _ -> Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc), collect_binding(Dest2, BKey2, BKeys, Acc1) @@ -154,12 +156,12 @@ collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) -> collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) -> [Dest | Acc]. -collect_select(NodeId, false, Acc) -> - Dests = ets:select(?TOPIC_BINDING_PROJECTION, +collect_select(BindingTab, NodeId, false, Acc) -> + Dests = ets:select(BindingTab, [{{{NodeId, '_', '$1'}}, [], ['$1']}]), Dests ++ Acc; -collect_select(NodeId, true, Acc) -> - DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION, +collect_select(BindingTab, NodeId, true, Acc) -> + DestsAndBKeys = ets:select(BindingTab, [{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]), format_dest_bkeys(DestsAndBKeys, Acc). diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 17d4c498cffa..17a3771fb90a 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -182,11 +182,14 @@ supports_rabbit_khepri_topic_trie_version/0]). %% Called locally to determine which projection to use. --export([get_effective_topic_binding_projection_version/0]). +-export([get_effective_topic_binding_projection_version/0, + topic_trie_table_names/1]). %% Used during topic binding projections related feature flags handling. -export([topic_binding_projection_enable/1, topic_binding_projection_post_enable/1]). +-export([topic_binding_projection_v5_enable/1, + topic_binding_projection_v5_post_enable/1]). -ifdef(TEST). -export([register_projections/0, @@ -1564,8 +1567,8 @@ projection_fun_for_sets(MapFun) -> register_rabbit_topic_binding_projection() -> case get_effective_topic_binding_projection_version() of - V when V >= 4 -> register_rabbit_topic_trie_projection(); - _ -> register_rabbit_topic_graph_projection() + V when V >= 4 -> register_rabbit_topic_trie_projection(V); + _ -> register_rabbit_topic_graph_projection() end. %% Topic routing via trie ETS projection (v3). @@ -1648,7 +1651,7 @@ register_rabbit_topic_graph_projection() -> _ = unregister_old_rabbit_topic_trie_projections(), khepri:register_projection(?STORE_ID, PathPattern, Projection). -%% Topic routing via trie + ordered_set ETS projections (v4). +%% Topic routing via trie + ordered_set ETS projections. %% Uses a single Khepri projection with two ETS tables: %% %% Trie edges table (set) for fast trie navigation during routing: @@ -1658,14 +1661,11 @@ register_rabbit_topic_graph_projection() -> %% Row: {{NodeId, BindingKey, Dest}} %% %% XSrc = {VHost, ExchangeName} (binaries) -%% NodeId = root | reference() +%% NodeId = {root, XSrc} (v5) | root (v4) | reference() %% Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>) %% ChildCount = non_neg_integer() (number of outgoing edges) %% Dest = #resource{} -%% -%% This projection is only used once the `topic_binding_projection_v4' feature -%% flag is enabled. -register_rabbit_topic_trie_projection() -> +register_rabbit_topic_trie_projection(Vsn) -> ShouldProcessFun = fun (rabbit_db_topic_exchange, split_topic_key_binary, 1, _From) -> %% This function uses `persistent_term' to store a lazily compiled @@ -1681,27 +1681,30 @@ register_rabbit_topic_trie_projection() -> (M, F, A, From) -> khepri_tx_adv:should_process_function(M, F, A, From) end, - Opts = #{tables => #{rabbit_khepri_topic_trie_v4 => - #{type => set}, - rabbit_khepri_topic_binding_v4 => - #{type => ordered_set}}, + {TrieTabName, BindingTabName} = topic_trie_table_names(Vsn), + Opts = #{tables => #{TrieTabName => #{type => set}, + BindingTabName => #{type => ordered_set}}, keypos => 1, read_concurrency => true, standalone_fun_options => #{should_process_function => ShouldProcessFun}}, PFun = fun(Tables, Path, OldProps, NewProps) -> - #{rabbit_khepri_topic_trie_v4 := TrieTab, - rabbit_khepri_topic_binding_v4 := BindingTab} = Tables, + #{TrieTabName := TrieTab, + BindingTabName := BindingTab} = Tables, {VHost, ExchangeName, Kind, DstName, BindingKey} = rabbit_db_binding:khepri_route_path_to_args(Path), XSrc = {VHost, ExchangeName}, Dest = rabbit_misc:r(VHost, Kind, DstName), Words = rabbit_db_topic_exchange:split_topic_key_binary(BindingKey), + Root = case Vsn of + 4 -> root; + _ -> {root, XSrc} + end, case {OldProps, NewProps} of {_, #{data := _}} -> - LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Words), + LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Root, Words), ets:insert(BindingTab, {{LeafNodeId, BindingKey, Dest}}); {#{data := _}, _} -> - case trie_follow_down_get_path(TrieTab, XSrc, Words) of + case trie_follow_down_get_path(TrieTab, XSrc, Root, Words) of {ok, LeafNodeId, TriePath} -> ets:delete(BindingTab, {LeafNodeId, BindingKey, Dest}), trie_gc_path(TrieTab, BindingTab, TriePath); @@ -1712,7 +1715,7 @@ register_rabbit_topic_trie_projection() -> ok end end, - Projection = khepri_projection:new(rabbit_khepri_topic_trie_v4, PFun, Opts), + Projection = khepri_projection:new(TrieTabName, PFun, Opts), PathPattern = topic_binding_path_pattern(), unregister_old_rabbit_topic_trie_projections(), khepri:register_projection(?STORE_ID, PathPattern, Projection). @@ -1732,8 +1735,8 @@ topic_binding_path_pattern() -> %% Each trie row is a 3-tuple: {Key, ChildNodeId, ChildCount}. %% ChildCount tracks the number of outgoing edges from ChildNodeId. %% It is incremented when a new edge is created, decremented during GC. -trie_follow_down_create(TrieTab, XSrc, Words) -> - trie_follow_down_create(TrieTab, XSrc, root, none, Words). +trie_follow_down_create(TrieTab, XSrc, Root, Words) -> + trie_follow_down_create(TrieTab, XSrc, Root, none, Words). trie_follow_down_create(_TrieTab, _XSrc, NodeId, _ParentKey, []) -> NodeId; @@ -1756,8 +1759,8 @@ trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) -> %% Walk down the trie following the given words, collecting the path %% for later GC. Returns {ok, LeafNodeId, Path} or error. -trie_follow_down_get_path(TrieTab, XSrc, Words) -> - trie_follow_down_get_path(TrieTab, XSrc, root, none, Words, []). +trie_follow_down_get_path(TrieTab, XSrc, Root, Words) -> + trie_follow_down_get_path(TrieTab, XSrc, Root, none, Words, []). trie_follow_down_get_path(_TrieTab, _XSrc, NodeId, _ParentKey, [], Path) -> {ok, NodeId, Path}; @@ -1811,14 +1814,46 @@ supports_rabbit_khepri_topic_trie_v2() -> -spec supports_rabbit_khepri_topic_trie_version() -> non_neg_integer(). supports_rabbit_khepri_topic_trie_version() -> - 4. + 5. + +-spec topic_trie_table_names(non_neg_integer()) -> {atom(), atom()}. +topic_trie_table_names(V) when V >= 5 -> + {rabbit_khepri_topic_trie_v5, + rabbit_khepri_topic_binding_v5}; +topic_trie_table_names(4) -> + {rabbit_khepri_topic_trie_v4, + rabbit_khepri_topic_binding_v4}. get_effective_topic_binding_projection_version() -> - IsEnabled = rabbit_feature_flags:is_enabled( - topic_binding_projection_v4, non_blocking), - case IsEnabled of - true -> 4; - _ -> 3 + IsV5 = rabbit_feature_flags:is_enabled(topic_binding_projection_v5, + non_blocking), + case IsV5 of + true -> + 5; + _ -> + IsV4 = rabbit_feature_flags:is_enabled(topic_binding_projection_v4, + non_blocking), + case IsV4 of + true -> + 4; + _ -> + 3 + end + end. + +topic_binding_projection_v5_enable( + #{feature_name := topic_binding_projection_v5 = FeatureName}) -> + ?LOG_DEBUG( + "Feature flag `~s`: register topic binding projection v5", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + case register_rabbit_topic_trie_projection(5) of + ok -> + ok; + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error end. topic_binding_projection_enable( @@ -1827,7 +1862,7 @@ topic_binding_projection_enable( "Feature flag `~s`: register topic binding projection v4", [FeatureName], #{domain => ?RMQLOG_DOMAIN_DB}), - case register_rabbit_topic_trie_projection() of + case register_rabbit_topic_trie_projection(4) of ok -> ok; {error, {khepri, projection_already_exists, _Info}} -> @@ -1836,6 +1871,15 @@ topic_binding_projection_enable( Error end. +topic_binding_projection_v5_post_enable( + #{feature_name := topic_binding_projection_v5 = FeatureName}) -> + ?LOG_DEBUG( + "Feature flag `~s`: unregister old topic binding projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + unregister_old_rabbit_topic_trie_projections(), + ok. + topic_binding_projection_post_enable( #{feature_name := topic_binding_projection_v4 = FeatureName}) -> ?LOG_DEBUG( @@ -1849,7 +1893,11 @@ unregister_old_rabbit_topic_trie_projections() -> OldProjections0 = [{1, rabbit_khepri_topic_trie}, {2, rabbit_khepri_topic_trie_v2}], OldProjections1 = case get_effective_topic_binding_projection_version() of - V when V >= 4 -> + V when V >= 5 -> + OldProjections0 ++ + [{3, rabbit_khepri_topic_trie_v3}, + {4, rabbit_khepri_topic_trie_v4}]; + 4 -> OldProjections0 ++ [{3, rabbit_khepri_topic_trie_v3}]; _ -> diff --git a/deps/rabbit/test/bindings_SUITE.erl b/deps/rabbit/test/bindings_SUITE.erl index 79c1d43d4f45..2150bf38ba0c 100644 --- a/deps/rabbit/test/bindings_SUITE.erl +++ b/deps/rabbit/test/bindings_SUITE.erl @@ -46,6 +46,7 @@ all_tests() -> unbind_from_volatile_queue, binding_args_direct_exchange, binding_args_fanout_exchange, + topic_exchange_zero_words, %% Exchange bindings bind_and_unbind_direct_exchange, @@ -706,6 +707,96 @@ binding_args(Exchange, Config) -> ?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}}, amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})). +%% Test case for https://github.com/rabbitmq/rabbitmq-server/discussions/16221 +%% +%% "The routing key used for a topic exchange MUST consist of zero or more words delimited by dots. +%% Each word may contain the letters A-Z and a-z and digits 0-9. +%% The routing pattern follows the same rules as the routing key with the addition +%% that * matches a single word, and # matches zero or more words." +%% [AMQP 0.9.1] +%% +%% Here, we test a zero words routing key and routing pattern. +topic_exchange_zero_words(Config) -> + ZeroWords = <<>>, + + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + + X1 = <<"x1">>, + X2 = <<"x2">>, + Q1 = <<"q1">>, + Q2 = <<"q2">>, + + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X1, + type = <<"topic">>}), + #'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X2, + type = <<"topic">>}), + #'queue.declare_ok'{} = declare(Ch, Q1, []), + #'queue.declare_ok'{} = declare(Ch, Q2, []), + + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X1, + queue = Q1, + routing_key = ZeroWords}), + #'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X2, + queue = Q2, + routing_key = ZeroWords}), + + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Ch, self()), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = X1, + routing_key = ZeroWords}, + #amqp_msg{payload = <<"m1">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + %% m1 must arrive at Q1 and must not leak into Q2 + ?assertMatch(#'queue.declare_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.declare'{queue = Q1, passive = true})), + ?assertMatch(#'queue.declare_ok'{message_count = 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = Q2, passive = true})), + + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = X2, + routing_key = ZeroWords}, + #amqp_msg{payload = <<"m2">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + %% m2 must arrive at Q2 and must not leak into Q1. + ?assertMatch(#'queue.declare_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.declare'{queue = Q1, passive = true})), + ?assertMatch(#'queue.declare_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.declare'{queue = Q2, passive = true})), + + %% Unbinding the empty key stops message routing. + %% Zero words means the root is also the leaf, so `trie_follow_down_get_path/4` + %% returns an empty trie path and `trie_gc_path/3` will not have any edges to prune. + #'queue.unbind_ok'{} = amqp_channel:call(Ch, #'queue.unbind'{exchange = X1, + queue = Q1, + routing_key = ZeroWords}), + ok = amqp_channel:cast(Ch, + #'basic.publish'{exchange = X1, + routing_key = ZeroWords}, + #amqp_msg{payload = <<"m3">>}), + receive #'basic.ack'{} -> ok + after 9000 -> ct:fail(confirm_timeout) + end, + %% For completeness' sake: + %% m3 must not reach Q1 (unbound) or Q2 (bound to a different exchange) + ?assertMatch(#'queue.declare_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.declare'{queue = Q1, passive = true})), + ?assertMatch(#'queue.declare_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.declare'{queue = Q2, passive = true})), + + ?assertMatch(#'queue.delete_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q1})), + ?assertMatch(#'queue.delete_ok'{message_count = 1}, + amqp_channel:call(Ch, #'queue.delete'{queue = Q2})), + #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X1}), + #'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X2}), + rabbit_ct_client_helpers:close_channel(Ch). + bind_and_unbind_direct_exchange(Config) -> bind_and_unbind_exchange(<<"direct">>, Config). diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl index fb2e4aeea94e..d7bad013d916 100644 --- a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl @@ -14,8 +14,8 @@ -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). --define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4). --define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4). +-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v5). +-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v5). -export([all/0, groups/0, @@ -90,7 +90,7 @@ init_per_group(cluster_size_3 = _Group, Config0) -> case Config1 of _ when is_list(Config1) -> Ret = rabbit_ct_broker_helpers:enable_feature_flag( - Config1, topic_binding_projection_v4), + Config1, topic_binding_projection_v5), case Ret of ok -> Config1; diff --git a/release-notes/4.3.1.md b/release-notes/4.3.1.md new file mode 100644 index 000000000000..44cb5532f0bb --- /dev/null +++ b/release-notes/4.3.1.md @@ -0,0 +1,16 @@ +## RabbitMQ 4.3.1 + +RabbitMQ `4.3.1` is a maintenance release. + +### Core Server + +#### Bug Fixes + +* **Topic Exchange Routing with Empty Binding Keys** + + In RabbitMQ 4.3.0, the following bug was introduced after feature flag `topic_binding_projection_v4` got enabled: + If a queue was bound to a topic exchange using an empty binding key (`""`), messages published to **any** topic exchange with an empty routing key would be incorrectly routed to that queue. + + This issue has been fixed in 4.3.1. To apply the fix, enable the new `topic_binding_projection_v5` feature flag after upgrading. + + PR: [#16271](https://github.com/rabbitmq/rabbitmq-server/pull/16271)