diff --git a/deps/rabbit/src/rabbit_core_ff.erl b/deps/rabbit/src/rabbit_core_ff.erl index aaf41ee51861..d4efdc1f7dcb 100644 --- a/deps/rabbit/src/rabbit_core_ff.erl +++ b/deps/rabbit/src/rabbit_core_ff.erl @@ -244,3 +244,14 @@ {rabbit_db_queue, tie_binding_to_dest_with_keep_while_cond_enable}} }}). + +-rabbit_feature_flag( + {topic_binding_projection_v4, + #{desc => "Enable the topic binding Khepri projection v4", + stability => stable, + depends_on => ['rabbitmq_4.3.0'], + callbacks => #{enable => + {rabbit_khepri, topic_binding_projection_enable}, + post_enable => + {rabbit_khepri, topic_binding_projection_post_enable}} + }}). diff --git a/deps/rabbit/src/rabbit_db_topic_exchange.erl b/deps/rabbit/src/rabbit_db_topic_exchange.erl index 80067e4e0f69..10267ecd407a 100644 --- a/deps/rabbit/src/rabbit_db_topic_exchange.erl +++ b/deps/rabbit/src/rabbit_db_topic_exchange.erl @@ -14,7 +14,10 @@ %% Used by Khepri projections and the Mnesia-to-Khepri migration. -export([split_topic_key_binary/1]). --define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v3). +-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3). + +-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4). +-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4). -type match_result() :: [rabbit_types:binding_destination() | {rabbit_amqqueue:name(), rabbit_types:binding_key()}]. @@ -31,14 +34,20 @@ %% %% @private -match(XName, RoutingKey, Opts) -> +match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) -> BKeys = maps:get(return_binding_keys, Opts, false), Words = split_topic_key_binary(RoutingKey), - trie_match_in_khepri(XName, Words, BKeys). - -%% -------------------------------------------------------------- -%% split_topic_key_binary(). -%% -------------------------------------------------------------- + case rabbit_khepri:get_effective_topic_binding_projection_version() of + V when V >= 4 -> + try + trie_match({VHost, XName}, root, Words, BKeys, []) + catch + error:badarg -> + [] + end; + _ -> + trie_match_v3(X, Words, BKeys) + end. -spec split_topic_key_binary(RoutingKey) -> Words when RoutingKey :: binary(), @@ -58,71 +67,154 @@ split_topic_key_binary(RoutingKey) -> end, binary:split(RoutingKey, Pattern, [global]). -%% -------------------------------------------------------------- -%% Internal -%% -------------------------------------------------------------- +%% ============================================================== +%% Trie-based routing +%% +%% Uses two ETS tables: +%% +%% 1. Trie edges table (set): {Key, ChildNodeId, ChildCount} +%% Key = {XSrc, ParentNodeId, Word} +%% Navigation: ets:lookup_element/4 for O(1) edge traversal. +%% +%% 2. Leaf bindings table (ordered_set): {{NodeId, BindingKey, Dest}} +%% Collection: ets:next/2 probes for fanout 0-2 (fast path), then +%% ets:select/2 with a partially bound key for fanout > 2 (does an +%% O(log N) seek followed by O(F) range scan). +%% +%% Routing walks the trie (branching on literal word, <<"*">>, <<"#">>) +%% then collects destinations from the bindings table at each matching +%% leaf. This is O(depth * 3) for the trie walk, plus O(log N) per +%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2. +%% ============================================================== --spec add_matched([rabbit_types:binding_destination() | - {rabbit_types:binding_destination(), BindingArgs :: list()}], - ReturnBindingKeys :: boolean(), - match_result()) -> - match_result(). -add_matched(Destinations, false, Acc) -> - Destinations ++ Acc; -add_matched(DestinationsArgs, true, Acc) -> - lists:foldl( - fun({DestQ = #resource{kind = queue}, BindingArgs}, L) -> - case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of - {longstr, BKey} -> - [{DestQ, BKey} | L]; - _ -> - [DestQ | L] - end; - ({DestX, _BindingArgs}, L) -> - [DestX | L] - end, Acc, DestinationsArgs). +trie_match(XSrc, Node, [], BKeys, Acc0) -> + Acc1 = trie_bindings(Node, BKeys, Acc0), + trie_match_try(XSrc, Node, <<"#">>, + fun trie_match_skip_any/5, + [], BKeys, Acc1); +trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) -> + Acc1 = trie_match_try(XSrc, Node, W, + fun trie_match/5, + RestW, BKeys, Acc0), + Acc2 = trie_match_try(XSrc, Node, <<"*">>, + fun trie_match/5, + RestW, BKeys, Acc1), + trie_match_try(XSrc, Node, <<"#">>, + fun trie_match_skip_any/5, + Words, BKeys, Acc2). + +trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) -> + case ets:lookup_element(?TOPIC_TRIE_PROJECTION, + {XSrc, Node, Word}, 2, undefined) of + undefined -> + Acc; + NextNode -> + MatchFun(XSrc, NextNode, RestW, BKeys, Acc) + end. -%% Khepri topic graph +trie_match_skip_any(XSrc, Node, [], BKeys, Acc) -> + trie_match(XSrc, Node, [], BKeys, Acc); +trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) -> + trie_match_skip_any( + XSrc, Node, RestW, BKeys, + trie_match(XSrc, Node, Words, BKeys, Acc)). -trie_match_in_khepri(X, Words, BKeys) -> +%% Collect all destinations bound at the given trie node. +%% +%% Uses ets:next/2 for up to two elements (fast path for the common +%% fanout 0-2 cases), then switches to ets:select/2 when fanout > 2. +%% +%% ets:select/2 occurs the expensive match spec compilation overhead. +%% For larger fanouts, the cost for compiling the match spec amortises. +%% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan, +%% which is cheaper than F individual ets:next/2 calls +%% (each O(log N) due to CATree fresh-stack allocation). +trie_bindings(NodeId, BKeys, Acc) -> + StartKey = {NodeId, <<>>, {}}, + case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of + {NodeId, BKey1, Dest1} = Key1 -> + case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of + {NodeId, BKey2, Dest2} = Key2 -> + case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of + {NodeId, _, _} -> + collect_select(NodeId, BKeys, Acc); + _ -> + Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc), + collect_binding(Dest2, BKey2, BKeys, Acc1) + end; + _ -> + collect_binding(Dest1, BKey1, BKeys, Acc) + end; + _ -> + Acc + end. + +collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) -> + [{Dest, BindingKey} | Acc]; +collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) -> + [Dest | Acc]. + +collect_select(NodeId, false, Acc) -> + Dests = ets:select(?TOPIC_BINDING_PROJECTION, + [{{{NodeId, '_', '$1'}}, [], ['$1']}]), + Dests ++ Acc; +collect_select(NodeId, true, Acc) -> + DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION, + [{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]), + format_dest_bkeys(DestsAndBKeys, Acc). + +format_dest_bkeys([], Acc) -> + Acc; +format_dest_bkeys([{#resource{kind = queue} = Dest, BKey} | Rest], Acc) -> + format_dest_bkeys(Rest, [{Dest, BKey} | Acc]); +format_dest_bkeys([{Dest, _BKey} | Rest], Acc) -> + format_dest_bkeys(Rest, [Dest | Acc]). + +%% ============================================================== +%% Old v3 Khepri topic graph. +%% Delete these *_v3 functions when feature flag +%% topic_binding_projection_v4 becomes required. +%% ============================================================== + +trie_match_v3(X, Words, BKeys) -> try - trie_match_in_khepri(X, root, Words, BKeys, []) + trie_match_v3(X, root, Words, BKeys, []) catch error:badarg -> [] end. -trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) -> - Destinations = trie_bindings_in_khepri(X, Node, BKeys), - ResAcc = add_matched(Destinations, BKeys, ResAcc0), - trie_match_part_in_khepri( +trie_match_v3(X, Node, [], BKeys, ResAcc0) -> + Destinations = trie_bindings_v3(X, Node, BKeys), + ResAcc = add_matched_v3(Destinations, BKeys, ResAcc0), + trie_match_part_v3( X, Node, <<"#">>, - fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc); -trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) -> + fun trie_match_skip_any_v3/5, [], BKeys, ResAcc); +trie_match_v3(X, Node, [W | RestW] = Words, BKeys, ResAcc) -> lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) -> - trie_match_part_in_khepri( + trie_match_part_v3( X, Node, WArg, MatchFun, RestWArg, BKeys, Acc) - end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW}, - {<<"*">>, fun trie_match_in_khepri/5, RestW}, + end, ResAcc, [{W, fun trie_match_v3/5, RestW}, + {<<"*">>, fun trie_match_v3/5, RestW}, {<<"#">>, - fun trie_match_skip_any_in_khepri/5, Words}]). + fun trie_match_skip_any_v3/5, Words}]). -trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) -> - case trie_child_in_khepri(X, Node, Search) of +trie_match_part_v3(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) -> + case trie_child_v3(X, Node, Search) of {ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc); error -> ResAcc end. -trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) -> - trie_match_in_khepri(X, Node, [], BKeys, ResAcc); -trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) -> - trie_match_skip_any_in_khepri( +trie_match_skip_any_v3(X, Node, [], BKeys, ResAcc) -> + trie_match_v3(X, Node, [], BKeys, ResAcc); +trie_match_skip_any_v3(X, Node, [_ | RestW] = Words, BKeys, ResAcc) -> + trie_match_skip_any_v3( X, Node, RestW, BKeys, - trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)). + trie_match_v3(X, Node, Words, BKeys, ResAcc)). -trie_child_in_khepri(X, Node, Word) -> +trie_child_v3(X, Node, Word) -> case ets:lookup( - ?KHEPRI_PROJECTION, + ?KHEPRI_PROJECTION_V3, #trie_edge{exchange_name = X, node_id = Node, word = Word}) of @@ -130,9 +222,9 @@ trie_child_in_khepri(X, Node, Word) -> [] -> error end. -trie_bindings_in_khepri(X, Node, BKeys) -> +trie_bindings_v3(X, Node, BKeys) -> case ets:lookup( - ?KHEPRI_PROJECTION, + ?KHEPRI_PROJECTION_V3, #trie_edge{exchange_name = X, node_id = Node, word = bindings}) of @@ -147,3 +239,18 @@ trie_bindings_in_khepri(X, Node, BKeys) -> [] -> [] end. + +add_matched_v3(Destinations, false, Acc) -> + Destinations ++ Acc; +add_matched_v3(DestinationsArgs, true, Acc) -> + lists:foldl( + fun({DestQ = #resource{kind = queue}, BindingArgs}, L) -> + case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of + {longstr, BKey} -> + [{DestQ, BKey} | L]; + _ -> + [DestQ | L] + end; + ({DestX, _BindingArgs}, L) -> + [DestX | L] + end, Acc, DestinationsArgs). diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 7210d23c71a2..e7ee3e779a71 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -181,6 +181,13 @@ -export([supports_rabbit_khepri_topic_trie_v2/0, supports_rabbit_khepri_topic_trie_version/0]). +%% Called locally to determine which projection to use. +-export([get_effective_topic_binding_projection_version/0]). + +%% Used during topic binding projections related feature flags handling. +-export([topic_binding_projection_enable/1, + topic_binding_projection_post_enable/1]). + -ifdef(TEST). -export([register_projections/0]). -endif. @@ -1329,7 +1336,7 @@ register_projections() -> fun register_rabbit_bindings_projection/0, fun register_rabbit_route_by_source_key_projection/0, fun register_rabbit_route_by_source_projection/0, - fun register_rabbit_topic_graph_projection/0], + fun register_rabbit_topic_binding_projection/0], rabbit_misc:for_each_while_ok( fun(RegisterFun) -> case RegisterFun() of @@ -1549,6 +1556,17 @@ projection_fun_for_sets(MapFun) -> ok end. +register_rabbit_topic_binding_projection() -> + case get_effective_topic_binding_projection_version() of + V when V >= 4 -> register_rabbit_topic_trie_projection(); + _ -> register_rabbit_topic_graph_projection() + end. + +%% Topic routing via trie ETS projection (v3). +%% +%% This is kept for backward compatibility only. This projection is unused +%% after the `topic_binding_projection_v4' feature flag is enabled. It +%% can be removed once this feature flag becomes required. register_rabbit_topic_graph_projection() -> Name = rabbit_khepri_topic_trie_v3, %% This projection calls some external functions which are disallowed by @@ -1624,18 +1642,215 @@ register_rabbit_topic_graph_projection() -> _ = unregister_old_rabbit_topic_trie_projections(), khepri:register_projection(?STORE_ID, PathPattern, Projection). +%% Topic routing via trie + ordered_set ETS projections (v4). +%% Uses a single Khepri projection with two ETS tables: +%% +%% Trie edges table (set) for fast trie navigation during routing: +%% Row: {{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount} +%% +%% Leaf bindings table (ordered_set) for collecting destinations: +%% Row: {{NodeId, BindingKey, Dest}} +%% +%% XSrc = {VHost, ExchangeName} (binaries) +%% NodeId = root | reference() +%% Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>) +%% ChildCount = non_neg_integer() (number of outgoing edges) +%% Dest = #resource{} +%% +%% This projection is only used once the `topic_binding_projection_v4' feature +%% flag is enabled. +register_rabbit_topic_trie_projection() -> + ShouldProcessFun = + fun (rabbit_db_topic_exchange, split_topic_key_binary, 1, _From) -> + %% This function uses `persistent_term' to store a lazily compiled + %% binary pattern. + false; + (erlang, make_ref, 0, _From) -> + %% References are used for trie node IDs. They are non-deterministic + %% across cluster members, but that's fine since both tables live + %% only as long as the projection and are rebuilt on restart. + false; + (ets, _F, _A, _From) -> + false; + (M, F, A, From) -> + khepri_tx_adv:should_process_function(M, F, A, From) + end, + Opts = #{tables => #{rabbit_khepri_topic_trie_v4 => + #{type => set}, + rabbit_khepri_topic_binding_v4 => + #{type => ordered_set}}, + keypos => 1, + read_concurrency => true, + standalone_fun_options => #{should_process_function => ShouldProcessFun}}, + PFun = fun(Tables, Path, OldProps, NewProps) -> + #{rabbit_khepri_topic_trie_v4 := TrieTab, + rabbit_khepri_topic_binding_v4 := BindingTab} = Tables, + {VHost, ExchangeName, Kind, DstName, BindingKey} = + rabbit_db_binding:khepri_route_path_to_args(Path), + XSrc = {VHost, ExchangeName}, + Dest = rabbit_misc:r(VHost, Kind, DstName), + Words = rabbit_db_topic_exchange:split_topic_key_binary(BindingKey), + case {OldProps, NewProps} of + {_, #{data := _}} -> + LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Words), + ets:insert(BindingTab, {{LeafNodeId, BindingKey, Dest}}); + {#{data := _}, _} -> + case trie_follow_down_get_path(TrieTab, XSrc, Words) of + {ok, LeafNodeId, TriePath} -> + ets:delete(BindingTab, {LeafNodeId, BindingKey, Dest}), + trie_gc_path(TrieTab, BindingTab, TriePath); + error -> + ok + end; + {_, _} -> + ok + end + end, + Projection = khepri_projection:new(rabbit_khepri_topic_trie_v4, PFun, Opts), + PathPattern = topic_binding_path_pattern(), + unregister_old_rabbit_topic_trie_projections(), + khepri:register_projection(?STORE_ID, PathPattern, Projection). + +topic_binding_path_pattern() -> + rabbit_db_binding:khepri_route_path( + _VHost = ?KHEPRI_WILDCARD_STAR, + _Exchange = #if_data_matches{pattern = #exchange{type = topic, + _ = '_'}}, + _Kind = ?KHEPRI_WILDCARD_STAR, + _DstName = ?KHEPRI_WILDCARD_STAR, + _BindingKey = ?KHEPRI_WILDCARD_STAR). + +%% Walk down the trie following the given words, creating edges and +%% intermediate nodes as needed. Returns the leaf node ID. +%% +%% Each trie row is a 3-tuple: {Key, ChildNodeId, ChildCount}. +%% ChildCount tracks the number of outgoing edges from ChildNodeId. +%% It is incremented when a new edge is created, decremented during GC. +trie_follow_down_create(TrieTab, XSrc, Words) -> + trie_follow_down_create(TrieTab, XSrc, root, none, Words). + +trie_follow_down_create(_TrieTab, _XSrc, NodeId, _ParentKey, []) -> + NodeId; +trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) -> + Key = {XSrc, ParentId, Word}, + case ets:lookup_element(TrieTab, Key, 2, undefined) of + undefined -> + NewId = make_ref(), + ets:insert(TrieTab, {Key, NewId, 0}), + _ = case ParentKey of + none -> + ok; + _ -> + ets:update_counter(TrieTab, ParentKey, {3, 1}) + end, + trie_follow_down_create(TrieTab, XSrc, NewId, Key, Rest); + ChildId -> + trie_follow_down_create(TrieTab, XSrc, ChildId, Key, Rest) + end. + +%% Walk down the trie following the given words, collecting the path +%% for later GC. Returns {ok, LeafNodeId, Path} or error. +trie_follow_down_get_path(TrieTab, XSrc, Words) -> + trie_follow_down_get_path(TrieTab, XSrc, root, none, Words, []). + +trie_follow_down_get_path(_TrieTab, _XSrc, NodeId, _ParentKey, [], Path) -> + {ok, NodeId, Path}; +trie_follow_down_get_path(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest], Path) -> + Key = {XSrc, ParentId, Word}, + case ets:lookup_element(TrieTab, Key, 2, undefined) of + undefined -> + error; + ChildId -> + trie_follow_down_get_path(TrieTab, XSrc, ChildId, Key, Rest, + [{Key, ParentKey, ChildId} | Path]) + end. + +%% Walk the path bottom-up (path is already in leaf-to-root order). +%% At each level, if the child node has no outgoing edges and no bindings, +%% delete the edge, decrement the parent's count, and continue upward. +trie_gc_path(_TrieTab, _BindingTab, []) -> + ok; +trie_gc_path(TrieTab, BindingTab, [{Key, ParentEdgeKey, ChildId} | Rest]) -> + case trie_node_is_empty(TrieTab, BindingTab, Key, ChildId) of + true -> + ets:delete(TrieTab, Key), + _ = case ParentEdgeKey of + none -> + ok; + _ -> + ets:update_counter(TrieTab, ParentEdgeKey, {3, -1}) + end, + trie_gc_path(TrieTab, BindingTab, Rest); + false -> + ok + end. + +%% A trie node is empty when it has no outgoing edges (ChildCount = 0) and +%% no bindings in the bindings table. +trie_node_is_empty(TrieTab, BindingTab, Key, ChildId) -> + case ets:lookup_element(TrieTab, Key, 3, 0) of + 0 -> not trie_node_has_bindings(BindingTab, ChildId); + _ -> false + end. + +trie_node_has_bindings(BindingTab, NodeId) -> + case ets:next(BindingTab, {NodeId, <<>>, {}}) of + {NodeId, _, _} -> true; + _ -> false + end. + +-spec supports_rabbit_khepri_topic_trie_v2() -> boolean(). supports_rabbit_khepri_topic_trie_v2() -> true. +-spec supports_rabbit_khepri_topic_trie_version() -> non_neg_integer(). supports_rabbit_khepri_topic_trie_version() -> - 3. + 4. + +get_effective_topic_binding_projection_version() -> + IsEnabled = rabbit_feature_flags:is_enabled( + topic_binding_projection_v4, non_blocking), + case IsEnabled of + true -> 4; + _ -> 3 + end. + +topic_binding_projection_enable( + #{feature_name := topic_binding_projection_v4 = FeatureName}) -> + ?LOG_DEBUG( + "Feature flag `~s`: register topic binding projection v4", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + case register_rabbit_topic_trie_projection() of + ok -> + ok; + {error, {khepri, projection_already_exists, _Info}} -> + ok; + {error, _} = Error -> + Error + end. + +topic_binding_projection_post_enable( + #{feature_name := topic_binding_projection_v4 = FeatureName}) -> + ?LOG_DEBUG( + "Feature flag `~s`: unregister old topic binding projections", + [FeatureName], + #{domain => ?RMQLOG_DOMAIN_DB}), + unregister_old_rabbit_topic_trie_projections(), + ok. unregister_old_rabbit_topic_trie_projections() -> - OldProjections = [{1, rabbit_khepri_topic_trie}, - {2, rabbit_khepri_topic_trie_v2}], - lists:foreach( - fun unregister_old_rabbit_topic_trie_projection/1, - OldProjections). + OldProjections0 = [{1, rabbit_khepri_topic_trie}, + {2, rabbit_khepri_topic_trie_v2}], + OldProjections1 = case get_effective_topic_binding_projection_version() of + V when V >= 4 -> + OldProjections0 ++ + [{3, rabbit_khepri_topic_trie_v3}]; + _ -> + OldProjections0 + end, + lists:foreach(fun unregister_old_rabbit_topic_trie_projection/1, + OldProjections1). unregister_old_rabbit_topic_trie_projection({Version, ProjectionName}) -> Nodes = rabbit_nodes:list_members(), diff --git a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl index f069e04a130b..fb2e4aeea94e 100644 --- a/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl +++ b/deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl @@ -2,17 +2,21 @@ %% License, v. 2.0. If a copy of the MPL was not distributed with this %% file, You can obtain one at https://mozilla.org/MPL/2.0/. %% -%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. +%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved. %% -module(rabbit_db_topic_exchange_SUITE). +-include_lib("proper/include/proper.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl"). +-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4). +-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4). + -export([all/0, groups/0, init_per_suite/1, @@ -22,24 +26,52 @@ init_per_testcase/2, end_per_testcase/2, - topic_trie_cleanup/1 + topic_trie_cleanup/1, + topic_match_basic/1, + topic_match_wildcards/1, + topic_match_hash_middle/1, + topic_match_binding_keys/1, + topic_match_empty_routing_key/1, + topic_match_deep_hierarchy/1, + topic_match_all_wildcards/1, + topic_match_alternating_wildcards/1, + topic_match_overlapping_filters/1, + topic_match_empty_segments/1, + topic_match_large_fanout/1, + topic_match_exchange_to_exchange/1, + topic_match_unicode/1, + prop_topic_match/1, + prop_topic_binding_lifecycle/1 ]). --define(VHOST, <<"/">>). - all() -> [ - {group, khepri_store} + {group, cluster_size_3} ]. groups() -> [ - {khepri_store, [], khepri_tests()} + {cluster_size_3, [], khepri_tests()} ]. khepri_tests() -> [ - topic_trie_cleanup + topic_trie_cleanup, + topic_match_basic, + topic_match_wildcards, + topic_match_hash_middle, + topic_match_binding_keys, + topic_match_empty_routing_key, + topic_match_deep_hierarchy, + topic_match_all_wildcards, + topic_match_alternating_wildcards, + topic_match_overlapping_filters, + topic_match_empty_segments, + topic_match_large_fanout, + topic_match_exchange_to_exchange, + topic_match_unicode, + prop_topic_match, + prop_topic_binding_lifecycle ]. init_per_suite(Config) -> @@ -49,24 +81,29 @@ init_per_suite(Config) -> end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). -init_per_group(khepri_store = Group, Config0) -> - Config = rabbit_ct_helpers:set_config( - Config0, - [{rmq_nodes_count, 3}]), - init_per_group_common(Group, Config); -init_per_group(Group, Config0) -> - Config = rabbit_ct_helpers:set_config( - Config0, - [{rmq_nodes_count, 1}]), - init_per_group_common(Group, Config). - -init_per_group_common(Group, Config) -> - Config1 = rabbit_ct_helpers:set_config(Config, [ - {rmq_nodename_suffix, Group} - ]), - rabbit_ct_helpers:run_steps(Config1, - rabbit_ct_broker_helpers:setup_steps() ++ - rabbit_ct_client_helpers:setup_steps()). +init_per_group(cluster_size_3 = _Group, Config0) -> + Config = rabbit_ct_helpers:set_config(Config0, [{rmq_nodes_count, 3}]), + Config1 = rabbit_ct_helpers:run_steps( + Config, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()), + case Config1 of + _ when is_list(Config1) -> + Ret = rabbit_ct_broker_helpers:enable_feature_flag( + Config1, topic_binding_projection_v4), + case Ret of + ok -> + Config1; + {skip, _} = Skip -> + _ = rabbit_ct_helpers:run_steps( + Config1, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + Skip + end; + {skip, _} = Skip -> + Skip + end. end_per_group(_Group, Config) -> rabbit_ct_helpers:run_steps(Config, @@ -87,30 +124,26 @@ end_per_testcase(Testcase, Config) -> %% Khepri-specific Tests %% --------------------------------------------------------------------------- -% https://github.com/rabbitmq/rabbitmq-server/issues/15024 +%% https://github.com/rabbitmq/rabbitmq-server/issues/15024 topic_trie_cleanup(Config) -> [_, OldNode, NewNode] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - %% this test has to be isolated to avoid flakes + %% This test has to be isolated to avoid flakes. VHost = <<"test-vhost-topic-trie">>, ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_vhost, add, [VHost, <<"test-user">>]), - %% Create an exchange in the vhost ExchangeName = rabbit_misc:r(VHost, exchange, <<"test-topic-exchange">>), {ok, _Exchange} = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_exchange, declare, [ExchangeName, topic, _Durable = true, _AutoDelete = false, _Internal = false, _Args = [], <<"test-user">>]), - %% List of routing keys that exercise topic exchange functionality - RoutingKeys = [ - %% Exact patterns with common prefixes + BindingKeys = [ <<"a.b.c">>, <<"a.b.d">>, <<"a.b.e">>, <<"a.c.d">>, <<"a.c.e">>, <<"b.c.d">>, - %% Patterns with a single wildcard <<"a.*.c">>, <<"a.*.d">>, <<"*.b.c">>, @@ -121,7 +154,6 @@ topic_trie_cleanup(Config) -> <<"a.*">>, <<"*.b">>, <<"*">>, - %% Patterns with multiple wildcards <<"a.#">>, <<"a.b.#">>, <<"a.c.#">>, @@ -130,13 +162,11 @@ topic_trie_cleanup(Config) -> <<"#.b.d">>, <<"#">>, <<"#.#">>, - %% Mixed patterns <<"a.*.#">>, <<"*.b.#">>, <<"*.#">>, <<"#.*">>, <<"#.*.#">>, - %% More complex patterns with common prefixes <<"orders.created.#">>, <<"orders.updated.#">>, <<"orders.*.confirmed">>, @@ -146,10 +176,8 @@ topic_trie_cleanup(Config) -> <<"events.#">> ], - %% Shuffle the routing keys to test in random order - ShuffledRoutingKeys = [RK || {_, RK} <- lists:sort([{rand:uniform(), RK} || RK <- RoutingKeys])], + ShuffledBindingKeys = [BK || {_, BK} <- lists:sort([{rand:uniform(), BK} || BK <- BindingKeys])], - %% Create bindings for all routing keys Bindings = [begin QueueName = rabbit_misc:r(VHost, queue, list_to_binary("queue-" ++ integer_to_list(Idx))), @@ -161,108 +189,70 @@ topic_trie_cleanup(Config) -> {existing, _Q} -> ok end, #binding{source = ExchangeName, - key = RoutingKey, + key = BindingKey, destination = QueueName, args = []} - end || {Idx, RoutingKey} <- lists:enumerate(ShuffledRoutingKeys)], + end || {Idx, BindingKey} <- lists:enumerate(ShuffledBindingKeys)], - %% Add all bindings [ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, add, [B, <<"test-user">>]) || B <- Bindings], - %% Log entries that were added to the ETS table lists:foreach( fun(Node) -> - VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v3), - ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n", - [Node, length(Bindings), length(VHostEntriesAfterAdd)]) + TrieEntries = read_trie_table(Config, Node, VHost, ?TOPIC_TRIE_PROJECTION), + BindingEntries = read_binding_table(Config, Node, ?TOPIC_BINDING_PROJECTION), + ct:pal("Bindings added on node ~s: ~p, trie entries: ~p, binding entries: ~p~n", + [Node, length(Bindings), length(TrieEntries), length(BindingEntries)]) end, Nodes), - %% Shuffle bindings again for deletion in random order ShuffledBindings = [B || {_, B} <- lists:sort([{rand:uniform(), B} || B <- Bindings])], - %% Delete all bindings in random order [ok = rabbit_ct_broker_helpers:rpc(Config, OldNode, rabbit_binding, remove, [B, <<"test-user">>]) || B <- ShuffledBindings], - %% Verify that the projection ETS table doesn't contain any entries related - %% to this vhost try lists:foreach( fun(Node) -> - %% We read and check the new projection table only. It is - %% declared by the new node and is available everywhere. The - %% old projection table might be there in case of - %% mixed-version testing. This part will be tested in the - %% second part of the testcase. - VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v3), - ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]), - - % %% Assert that no entries were found for this vhost after deletion - % ?assertEqual([], VHostEntriesAfterDelete) - {ok, KhepriVersion0} = rabbit_ct_broker_helpers:rpc( - Config, Node, - application, get_key, - [khepri, vsn]), - KhepriVersion1 = rabbit_misc:format( - "~2..0s~2..0s~2..0s", - string:split(KhepriVersion0, ".", all)), - KhepriVersion2 = list_to_integer(KhepriVersion1), - ct:pal("Khepri version: ~b / ~s", [KhepriVersion2, KhepriVersion1]), - if - KhepriVersion2 >= 1704 -> - %% Await until no entries remain for this vhost after - %% deletion. The Khepri projection may lag on followers - %% in a mixed-version cluster. - ?awaitMatch( - [], - read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v3), - 30_000); - true -> - %% This branch uses a version of Khepri that has a bug, - %% so it won't delete the new `#topic_trie_edge_v2{}` - %% correctly. Therefore we filter those out and await only for the - %% legacy entries to be gone. - ct:pal("Consider #topic_trie_edge{} records only"), - ?awaitMatch( - [], - [E || #topic_trie_edge{} = E - <- read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v3)], - 30_000) - end + ?awaitMatch( + [], + begin + TrieAfterDelete = read_trie_table( + Config, Node, VHost, + ?TOPIC_TRIE_PROJECTION), + ct:pal( + "ETS entries after delete on node ~s: trie=~p", + [Node, length(TrieAfterDelete)]), + TrieAfterDelete + end, + 30000), + ?awaitMatch( + [], + begin + BindingsAfterDelete = read_binding_table( + Config, Node, + ?TOPIC_BINDING_PROJECTION), + ct:pal( + "ETS entries after delete on node ~s: bindings=~p", + [Node, length(BindingsAfterDelete)]), + BindingsAfterDelete + end, + 30000) end, Nodes), - %% If we reach this point, we know the new projection works as expected - %% and the leaked ETS entries are no more. - %% - %% Now, we want to test that after an upgrade, the old projection is - %% unregistered. + %% Check whether old projections exist and need cleanup. HasOldProjection = try - VHostEntriesInOldTable = read_topic_trie_table( + VHostEntriesInOldTable = read_trie_table( Config, OldNode, VHost, rabbit_khepri_topic_trie), ct:pal("Old ETS table entries after delete: ~p~n", [length(VHostEntriesInOldTable)]), - ?assertNotEqual([], VHostEntriesInOldTable), true catch error:{exception, badarg, _} -> - %% The old projection doesn't exist. The old - %% node, if we are in a mixed-version test, - %% also supports the new projection. There - %% is nothing more to test. ct:pal("The old projection was not registered, nothing to test"), false end, case HasOldProjection of true -> - %% The old projection is registered. Simulate an update by removing - %% node 1 (which is the old one in our mixed-version testing) from - %% the cluster, then restart node 2. On restart, it should - %% unregister the old projection. - %% - %% FIXME: The cluster is configured at the test group level. - %% Therefore, if we add more testcases to this group, following - %% testcases won't have the expected cluster. ?assertEqual(ok, rabbit_ct_broker_helpers:stop_broker(Config, OldNode)), ?assertEqual(ok, rabbit_ct_broker_helpers:forget_cluster_node(Config, NewNode, OldNode)), @@ -272,7 +262,7 @@ topic_trie_cleanup(Config) -> ct:pal("Wait for projections to be restored"), ?awaitMatch( Entries when is_list(Entries), - catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v3), + catch read_trie_table(Config, NewNode, VHost, ?TOPIC_TRIE_PROJECTION), 60000), ct:pal("Check that the old projections are gone"), @@ -280,35 +270,753 @@ topic_trie_cleanup(Config) -> fun(ProjectionName) -> ?assertError( {exception, badarg, _}, - read_topic_trie_table(Config, NewNode, VHost, ProjectionName)) + read_trie_table(Config, NewNode, VHost, ProjectionName)) end, [rabbit_khepri_topic_trie, - rabbit_khepri_topic_trie_v2]); + rabbit_khepri_topic_trie_v2, + rabbit_khepri_topic_trie_v3]); false -> ok end after - %% Clean up the vhost ok = rabbit_ct_broker_helpers:rpc(Config, NewNode, rabbit_vhost, delete, [VHost, <<"test-user">>]) end, passed. -read_topic_trie_table(Config, Node, VHost, Table) -> +topic_match_basic(Config) -> + VHost = <<"test-vhost-basic">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + QFoo = rabbit_misc:r(VHost, queue, <<"q-foo">>), + QBar = rabbit_misc:r(VHost, queue, <<"q-bar">>), + QBaz = rabbit_misc:r(VHost, queue, <<"q-baz">>), + + add_binding(Config, XName, <<"foo.bar">>, QFoo), + add_binding(Config, XName, <<"foo.baz">>, QBar), + add_binding(Config, XName, <<"other.key">>, QBaz), + + try + ?assertEqual([QFoo], + do_match(Config, XName, <<"foo.bar">>, #{})), + ?assertEqual([QBar], + do_match(Config, XName, <<"foo.baz">>, #{})), + ?assertEqual([QBaz], + do_match(Config, XName, <<"other.key">>, #{})), + ?assertEqual([], + do_match(Config, XName, <<"no.match">>, #{})) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +topic_match_wildcards(Config) -> + VHost = <<"test-vhost-wildcards">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + QStar = rabbit_misc:r(VHost, queue, <<"q-star">>), + QHash = rabbit_misc:r(VHost, queue, <<"q-hash">>), + QExact = rabbit_misc:r(VHost, queue, <<"q-exact">>), + QDeep = rabbit_misc:r(VHost, queue, <<"q-deep">>), + QAll = rabbit_misc:r(VHost, queue, <<"q-all">>), + + add_binding(Config, XName, <<"foo.*">>, QStar), + add_binding(Config, XName, <<"foo.#">>, QHash), + add_binding(Config, XName, <<"foo.bar">>, QExact), + add_binding(Config, XName, <<"foo.*.baz">>, QDeep), + add_binding(Config, XName, <<"#">>, QAll), + + try + ?assertEqual( + [QAll, QExact, QHash, QStar], + sort_dests(do_match(Config, XName, <<"foo.bar">>, #{}))), + ?assertEqual( + [QAll, QDeep, QHash], + sort_dests(do_match(Config, XName, <<"foo.bar.baz">>, #{}))), + ?assertEqual( + [QAll, QHash], + sort_dests(do_match(Config, XName, <<"foo.bar.baz.qux">>, #{}))), + ?assertEqual( + [QAll, QHash, QStar], + sort_dests(do_match(Config, XName, <<"foo.qux">>, #{}))), + ?assertEqual( + [QAll], + sort_dests(do_match(Config, XName, <<"bar.foo">>, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Tests '#' appearing in the middle and beginning of filters, which is +%% valid in RabbitMQ's AMQP implementation, but not in MQTT. +topic_match_hash_middle(Config) -> + VHost = <<"test-vhost-hash-middle">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + Q1 = rabbit_misc:r(VHost, queue, <<"q-hash-c">>), + Q2 = rabbit_misc:r(VHost, queue, <<"q-hash-bc">>), + Q3 = rabbit_misc:r(VHost, queue, <<"q-hash-mid">>), + Q4 = rabbit_misc:r(VHost, queue, <<"q-hash-star">>), + + add_binding(Config, XName, <<"#.c">>, Q1), + add_binding(Config, XName, <<"#.b.c">>, Q2), + add_binding(Config, XName, <<"a.#.d">>, Q3), + add_binding(Config, XName, <<"#.*.#">>, Q4), + + try + ?assertEqual( + sort_dests([Q1, Q2, Q4]), + sort_dests(do_match(Config, XName, <<"a.b.c">>, #{}))), + ?assertEqual( + sort_dests([Q1, Q4]), + sort_dests(do_match(Config, XName, <<"c">>, #{}))), + ?assertEqual( + sort_dests([Q1, Q2, Q4]), + sort_dests(do_match(Config, XName, <<"x.y.b.c">>, #{}))), + ?assertEqual( + sort_dests([Q3, Q4]), + sort_dests(do_match(Config, XName, <<"a.d">>, #{}))), + ?assertEqual( + sort_dests([Q3, Q4]), + sort_dests(do_match(Config, XName, <<"a.b.c.d">>, #{}))), + ?assertEqual( + sort_dests([Q4]), + sort_dests(do_match(Config, XName, <<"x">>, #{}))), + ?assertEqual( + sort_dests([Q1, Q4]), + sort_dests(do_match(Config, XName, <<"x.c">>, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +topic_match_binding_keys(Config) -> + VHost = <<"test-vhost-bkeys">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + QStar = rabbit_misc:r(VHost, queue, <<"q-star">>), + QExact = rabbit_misc:r(VHost, queue, <<"q-exact">>), + + add_binding(Config, XName, <<"foo.*">>, QStar), + add_binding(Config, XName, <<"foo.bar">>, QExact), + + try + Result = do_match(Config, XName, <<"foo.bar">>, #{return_binding_keys => true}), + ?assertEqual([{QExact, <<"foo.bar">>}, + {QStar, <<"foo.*">>}], + lists:sort(Result)) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +topic_match_empty_routing_key(Config) -> + VHost = <<"test-vhost-empty-rk">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + QHash = rabbit_misc:r(VHost, queue, <<"q-hash">>), + QExact = rabbit_misc:r(VHost, queue, <<"q-exact">>), + + add_binding(Config, XName, <<"#">>, QHash), + add_binding(Config, XName, <<"foo.bar">>, QExact), + + try + %% Empty routing key should only match '#' + ?assertEqual( + [QHash], + sort_dests(do_match(Config, XName, <<>>, #{}))), + %% Non-empty routing key should also match '#' + ?assertEqual( + sort_dests([QExact, QHash]), + sort_dests(do_match(Config, XName, <<"foo.bar">>, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Tests a deep 26-level topic hierarchy. +topic_match_deep_hierarchy(Config) -> + VHost = <<"test-vhost-deep">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + T = <<"a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z">>, + QAll = rabbit_misc:r(VHost, queue, <<"q-all">>), + QPrefix = rabbit_misc:r(VHost, queue, <<"q-prefix">>), + QPrefixStar = rabbit_misc:r(VHost, queue, <<"q-prefix-star">>), + + add_binding(Config, XName, <<"#">>, QAll), + add_binding(Config, XName, <>, QPrefix), + add_binding(Config, XName, <>, QPrefixStar), + + try + ?assertEqual( + sort_dests([QAll, QPrefix]), + sort_dests(do_match(Config, XName, T, #{}))), + ?assertEqual( + sort_dests([QAll, QPrefix, QPrefixStar]), + sort_dests(do_match(Config, XName, <>, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Tests a filter with 26 single-level wildcards followed by '#'. +topic_match_all_wildcards(Config) -> + VHost = <<"test-vhost-all-wild">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + T = <<"a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z">>, + W = <<"*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.*.#">>, + QWild = rabbit_misc:r(VHost, queue, <<"q-wild">>), + + add_binding(Config, XName, W, QWild), + + try + ?assertEqual( + [QWild], + sort_dests(do_match(Config, XName, T, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Tests alternating literal and wildcard segments. +topic_match_alternating_wildcards(Config) -> + VHost = <<"test-vhost-alt-wild">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + T = <<"a.b.c.d.e.f.g.h.i.j.k.l.m.n.o.p.q.r.s.t.u.v.w.x.y.z">>, + W = <<"a.*.c.*.e.*.g.*.i.*.k.*.m.*.o.*.q.*.s.*.u.*.w.*.y.*.#">>, + QAlt = rabbit_misc:r(VHost, queue, <<"q-alt">>), + + add_binding(Config, XName, W, QAlt), + + try + ?assertEqual( + [QAlt], + sort_dests(do_match(Config, XName, T, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Tests multiple overlapping wildcard filters against the same topic. +topic_match_overlapping_filters(Config) -> + VHost = <<"test-vhost-overlap">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + Q1 = rabbit_misc:r(VHost, queue, <<"q1">>), + Q2 = rabbit_misc:r(VHost, queue, <<"q2">>), + Q3 = rabbit_misc:r(VHost, queue, <<"q3">>), + Q4 = rabbit_misc:r(VHost, queue, <<"q4">>), + Q5 = rabbit_misc:r(VHost, queue, <<"q5">>), + Q6 = rabbit_misc:r(VHost, queue, <<"q6">>), + Q7 = rabbit_misc:r(VHost, queue, <<"q7">>), + Q8 = rabbit_misc:r(VHost, queue, <<"q8">>), + + add_binding(Config, XName, <<"a.b">>, Q1), + add_binding(Config, XName, <<"a.b.#">>, Q2), + add_binding(Config, XName, <<"a.b.c">>, Q3), + add_binding(Config, XName, <<"a.b.*">>, Q4), + add_binding(Config, XName, <<"a.b.d">>, Q5), + add_binding(Config, XName, <<"a.*.*">>, Q6), + add_binding(Config, XName, <<"a.*.#">>, Q7), + add_binding(Config, XName, <<"*.b.c">>, Q8), + + try + ?assertEqual( + sort_dests([Q2, Q3, Q4, Q6, Q7, Q8]), + sort_dests(do_match(Config, XName, <<"a.b.c">>, #{}))), + ?assertEqual( + sort_dests([Q1, Q2, Q7]), + sort_dests(do_match(Config, XName, <<"a.b">>, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Tests routing keys with empty segments (consecutive dots). +topic_match_empty_segments(Config) -> + VHost = <<"test-vhost-empty-seg">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + Q1 = rabbit_misc:r(VHost, queue, <<"q-hash">>), + Q2 = rabbit_misc:r(VHost, queue, <<"q-star">>), + Q3 = rabbit_misc:r(VHost, queue, <<"q-deep">>), + + %% Leading dot means the first segment is empty. + add_binding(Config, XName, <<".#">>, Q1), + add_binding(Config, XName, <<".*">>, Q2), + add_binding(Config, XName, <<".*.a.b.c">>, Q3), + + try + ?assertEqual( + sort_dests([Q1, Q3]), + sort_dests(do_match(Config, XName, <<".0.a.b.c">>, #{}))) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +topic_match_large_fanout(Config) -> + VHost = <<"test-vhost-fanout">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + NumQueues = 500, + Queues = [begin + QName = rabbit_misc:r(VHost, queue, + list_to_binary("q-fan-" ++ integer_to_list(I))), + add_binding(Config, XName, <<"events.user.login">>, QName), + QName + end || I <- lists:seq(1, NumQueues)], + + try + Result = do_match(Config, XName, <<"events.user.login">>, #{}), + ?assertEqual(lists:sort(Queues), + lists:sort(Result)) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Exchange-to-exchange topic bindings: a topic exchange routes to both +%% queue and exchange destinations. +topic_match_exchange_to_exchange(Config) -> + VHost = <<"test-vhost-e2e">>, + setup_vhost(Config, VHost), + XSrc = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + QDirect = rabbit_misc:r(VHost, queue, <<"q-direct">>), + XDest = rabbit_misc:r(VHost, exchange, <<"x-dest">>), + QViaExchange = rabbit_misc:r(VHost, queue, <<"q-via-exchange">>), + + add_binding(Config, XSrc, <<"events.*.critical">>, QDirect), + add_exchange_binding(Config, XSrc, <<"events.#">>, XDest), + add_binding(Config, XSrc, <<"events.app.critical">>, QViaExchange), + + try + Result = do_match(Config, XSrc, <<"events.app.critical">>, #{}), + ?assertEqual(sort_dests([QDirect, XDest, QViaExchange]), + sort_dests(Result)), + + ResultBKeys = do_match(Config, XSrc, <<"events.app.critical">>, + #{return_binding_keys => true}), + ?assert(lists:member( + {QDirect, <<"events.*.critical">>}, + ResultBKeys)), + ?assert(lists:member( + {QViaExchange, <<"events.app.critical">>}, + ResultBKeys)), + %% Exchange destinations do not carry a binding key + ?assert(lists:member(XDest, ResultBKeys)), + + ResultWild = do_match(Config, XSrc, <<"events.db.info">>, #{}), + ?assertEqual([XDest], ResultWild) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Unicode characters in both routing keys and binding keys. +topic_match_unicode(Config) -> + VHost = <<"test-vhost-unicode">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + QCjk = rabbit_misc:r(VHost, queue, <<"q-cjk">>), + QEmoji = rabbit_misc:r(VHost, queue, <<"q-emoji">>), + QCyrillic = rabbit_misc:r(VHost, queue, <<"q-cyrillic">>), + QMixed = rabbit_misc:r(VHost, queue, <<"q-mixed">>), + QWild = rabbit_misc:r(VHost, queue, <<"q-wild">>), + QHash = rabbit_misc:r(VHost, queue, <<"q-hash">>), + QAll = rabbit_misc:r(VHost, queue, <<"q-all">>), + + add_binding(Config, XName, <<"传感器.温度.摄氏"/utf8>>, QCjk), + add_binding(Config, XName, <<"🏠.🌡️.📊"/utf8>>, QEmoji), + add_binding(Config, XName, <<"датчик.температура.цельсий"/utf8>>, QCyrillic), + add_binding(Config, XName, <<"sensor.données.état"/utf8>>, QMixed), + add_binding(Config, XName, <<"传感器.*.摄氏"/utf8>>, QWild), + add_binding(Config, XName, <<"датчик.#"/utf8>>, QHash), + add_binding(Config, XName, <<"#">>, QAll), + + try + ?assertEqual( + sort_dests([QCjk, QWild, QAll]), + sort_dests(do_match(Config, XName, + <<"传感器.温度.摄氏"/utf8>>, #{}))), + + ?assertEqual( + sort_dests([QEmoji, QAll]), + sort_dests(do_match(Config, XName, + <<"🏠.🌡️.📊"/utf8>>, #{}))), + + ?assertEqual( + sort_dests([QCyrillic, QHash, QAll]), + sort_dests(do_match(Config, XName, + <<"датчик.температура.цельсий"/utf8>>, #{}))), + + ?assertEqual( + sort_dests([QMixed, QAll]), + sort_dests(do_match(Config, XName, + <<"sensor.données.état"/utf8>>, #{}))), + + ?assertEqual( + sort_dests([QWild, QAll]), + sort_dests(do_match(Config, XName, + <<"传感器.压力.摄氏"/utf8>>, #{}))), + + ?assertEqual( + sort_dests([QHash, QAll]), + sort_dests(do_match(Config, XName, + <<"датчик.давление"/utf8>>, #{}))), + + ResultBKeys = do_match(Config, XName, <<"传感器.温度.摄氏"/utf8>>, + #{return_binding_keys => true}), + ?assert(lists:member( + {QCjk, <<"传感器.温度.摄氏"/utf8>>}, + ResultBKeys)), + ?assert(lists:member( + {QWild, <<"传感器.*.摄氏"/utf8>>}, + ResultBKeys)), + ?assert(lists:member( + {QAll, <<"#">>}, + ResultBKeys)) + after + cleanup_vhost(Config, VHost) + end, + + passed. + +%% Generates random topic filters and routing keys, creates bindings +%% on the broker via RPC, and verifies the routing results match an +%% independent reference implementation. +prop_topic_match(Config) -> + VHost = <<"test-vhost-prop">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + try + run_proper( + fun() -> + ?FORALL( + {Filters, Topics}, + {resize(10, list(topic_filter_gen())), + resize(10, list(topic_gen()))}, + begin + NumberedFilters = lists:enumerate(Filters), + Queues = lists:map( + fun({N, FilterBin}) -> + QName = rabbit_misc:r( + VHost, queue, + <<"prop-q-", (integer_to_binary(N))/binary>>), + add_binding(Config, XName, FilterBin, QName), + {N, FilterBin, QName} + end, NumberedFilters), + Result = lists:all( + fun(TopicBin) -> + Actual = lists:usort( + do_match(Config, XName, TopicBin, #{})), + Expected = lists:usort( + [QName + || {_N, F, QName} <- Queues, + ref_match_filter(F, TopicBin)]), + case Actual =:= Expected of + true -> + true; + false -> + ct:pal("Topic: ~s~n" + "Filters: ~p~n" + "Expected: ~p~n" + "Actual: ~p~n", + [TopicBin, Filters, + Expected, Actual]), + false + end + end, Topics), + lists:foreach( + fun({_N, FilterBin, QName}) -> + B = #binding{source = XName, + key = FilterBin, + destination = QName, + args = []}, + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_binding, remove, [B, <<"test-user">>]) + end, Queues), + Result + end) + end) + after + cleanup_vhost(Config, VHost) + end, + passed. + +%% Property: inserting and deleting bindings maintains correct projection +%% table invariants. After inserting N unique bindings, the binding table +%% should contain exactly N entries for the vhost. After deleting all +%% bindings (in random order), both projection tables should be empty. +prop_topic_binding_lifecycle(Config) -> + VHost = <<"test-vhost-lifecycle">>, + setup_vhost(Config, VHost), + XName = rabbit_misc:r(VHost, exchange, <<"amq.topic">>), + + try + run_proper( + fun() -> + ?FORALL( + FilterQueuePairs, + resize(15, non_empty(list( + {topic_filter_gen(), + oneof([<<"q1">>, <<"q2">>, <<"q3">>])}))), + begin + Bindings = lists:map( + fun({FilterBin, QBin}) -> + QName = rabbit_misc:r(VHost, queue, QBin), + add_binding(Config, XName, FilterBin, QName), + #binding{source = XName, + key = FilterBin, + destination = QName, + args = []} + end, FilterQueuePairs), + + UniqueBindings = lists:usort( + [{F, Q} || #binding{key = F, destination = Q} <- Bindings]), + ExpectedBindingCount = length(UniqueBindings), + + HasNonEmptyKey = lists:any( + fun({F, _}) -> F =/= <<>> end, + UniqueBindings), + + BindingCount = count_binding_entries(Config, VHost), + TrieCount = count_trie_entries(Config, VHost), + + BindingCountOk = BindingCount =:= ExpectedBindingCount, + %% Empty binding keys (<<>>) bind at the root node + %% without creating any trie edges. + TrieCountOk = case HasNonEmptyKey of + true -> TrieCount > 0; + false -> TrieCount >= 0 + end, + + case BindingCountOk andalso TrieCountOk of + false -> + ct:pal("After insert:~n" + " Pairs: ~p~n" + " Expected binding count: ~b~n" + " Actual binding count: ~b~n" + " Trie entries: ~b~n", + [FilterQueuePairs, ExpectedBindingCount, + BindingCount, TrieCount]); + true -> + ok + end, + + ShuffledBindings = shuffle(lists:usort(Bindings)), + lists:foreach( + fun(B) -> + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_binding, remove, [B, <<"test-user">>]) + end, ShuffledBindings), + + BindingCountAfter = count_binding_entries(Config, VHost), + TrieCountAfter = count_trie_entries(Config, VHost), + + EmptyOk = BindingCountAfter =:= 0 andalso + TrieCountAfter =:= 0, + + case EmptyOk of + false -> + ct:pal("After delete:~n" + " Binding entries remaining: ~b~n" + " Trie entries remaining: ~b~n", + [BindingCountAfter, TrieCountAfter]); + true -> + ok + end, + + BindingCountOk andalso TrieCountOk andalso EmptyOk + end) + end) + after + cleanup_vhost(Config, VHost) + end, + passed. + +%% Independent reference implementation of AMQP topic matching. +ref_match_filter(FilterBin, TopicBin) -> + FilterWords = rabbit_db_topic_exchange:split_topic_key_binary(FilterBin), + TopicWords = rabbit_db_topic_exchange:split_topic_key_binary(TopicBin), + ref_match(FilterWords, TopicWords). + +ref_match([], []) -> + true; +ref_match([<<"#">>], _) -> + true; +ref_match([<<"#">> | FRest], Topic) -> + ref_match_hash(FRest, Topic); +ref_match([<<"*">> | FRest], [_ | TRest]) -> + ref_match(FRest, TRest); +ref_match([W | FRest], [W | TRest]) -> + ref_match(FRest, TRest); +ref_match(_, _) -> + false. + +ref_match_hash(FRest, Topic) -> + ref_match(FRest, Topic) + orelse + case Topic of + [_ | TRest] -> ref_match_hash(FRest, TRest); + [] -> false + end. + +run_proper(Fun) -> + ?assert(proper:counterexample( + Fun(), + [{numtests, 100}, + {on_output, fun(".", _) -> ok; + (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) + end}])). + +topic_gen() -> + ?LET(Words, resize(5, list(topic_word_gen())), + list_to_binary(lists:join(".", Words))). + +topic_filter_gen() -> + frequency([{3, ?LET(Pat, resize(5, list(mqtt_segment_gen())), + mk_filter_binary_mqtt(Pat))}, + {1, ?LET(Pat, resize(5, list(amqp_segment_gen())), + mk_filter_binary_amqp(Pat))}]). + +topic_word_gen() -> + oneof([<<"a">>, <<"b">>, <<"c">>, <<"d">>, + <<"foo">>, <<"bar">>, <<"baz">>, + <<>>]). + +%% MQTT-style: '#' can only appear at the end. +mqtt_segment_gen() -> + frequency([{5, literal}, + {2, star}, + {1, hash}]). + +%% AMQP-style: '#' can appear anywhere. +amqp_segment_gen() -> + frequency([{5, literal}, + {2, star}, + {2, hash}]). + +mk_filter_binary_mqtt(Pat) -> + mk_filter_binary_mqtt(Pat, []). + +mk_filter_binary_mqtt([], Acc) -> + list_to_binary(lists:join(".", lists:reverse(Acc))); +mk_filter_binary_mqtt([hash | _], Acc) -> + list_to_binary(lists:join(".", lists:reverse(["#" | Acc]))); +mk_filter_binary_mqtt([star | Rest], Acc) -> + mk_filter_binary_mqtt(Rest, ["*" | Acc]); +mk_filter_binary_mqtt([literal | Rest], Acc) -> + Word = lists:nth(rand:uniform(4), ["a", "b", "c", "d"]), + mk_filter_binary_mqtt(Rest, [Word | Acc]). + +mk_filter_binary_amqp(Pat) -> + Segments = [case S of + literal -> lists:nth(rand:uniform(4), ["a", "b", "c", "d"]); + star -> "*"; + hash -> "#" + end || S <- Pat], + list_to_binary(lists:join(".", Segments)). + +%% --------------------------------------------------------------------------- +%% Helpers +%% --------------------------------------------------------------------------- + +setup_vhost(Config, VHost) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, add, [VHost, <<"test-user">>]). + +cleanup_vhost(Config, VHost) -> + ok = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_vhost, delete, [VHost, <<"test-user">>]). + +add_binding(Config, XName, BindingKey, QName) -> + Ret = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_amqqueue, declare, + [QName, true, false, [], self(), <<"test-user">>]), + case Ret of + {new, _Q} -> ok; + {existing, _Q} -> ok + end, + Binding = #binding{source = XName, + key = BindingKey, + destination = QName, + args = []}, + ok = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, add, [Binding, <<"test-user">>]). + +add_exchange_binding(Config, XSrc, BindingKey, XDest) -> + {ok, _} = rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_exchange, declare, + [XDest, fanout, true, false, false, [], <<"test-user">>]), + Binding = #binding{source = XSrc, + key = BindingKey, + destination = XDest, + args = []}, + ok = rabbit_ct_broker_helpers:rpc(Config, 0, rabbit_binding, add, [Binding, <<"test-user">>]). + +do_match(Config, XName, RoutingKey, Opts) -> + rabbit_ct_broker_helpers:rpc( + Config, 0, + rabbit_db_topic_exchange, match, [XName, RoutingKey, Opts]). + +sort_dests(List) -> + lists:usort(List). + +read_trie_table(Config, Node, VHost, Table) -> Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]), [Entry || Entry <- Entries, - case Entry of - #topic_trie_edge{trie_edge = TrieEdge} -> - case TrieEdge of - #trie_edge{exchange_name = #resource{virtual_host = V}} -> - V =:= VHost; - _ -> - false - end; - #topic_trie_edge_v2{trie_edge = TrieEdge} -> - case TrieEdge of - #trie_edge{exchange_name = #resource{virtual_host = V}} -> - V =:= VHost; - _ -> - false - end - end]. + trie_entry_matches_vhost(Entry, VHost)]. + +trie_entry_matches_vhost(Entry, VHost) when is_tuple(Entry) -> + case element(1, Entry) of + {{V, _}, _, _} when is_binary(V) -> V =:= VHost; + _ -> false + end. + +read_binding_table(Config, Node, Table) -> + rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]). + +count_binding_entries(Config, VHost) -> + All = rabbit_ct_broker_helpers:rpc( + Config, 0, ets, tab2list, [?TOPIC_BINDING_PROJECTION]), + length([E || E = {{_NodeId, _BKey, #resource{virtual_host = V}}} <- All, + V =:= VHost]). + +count_trie_entries(Config, VHost) -> + All = rabbit_ct_broker_helpers:rpc( + Config, 0, ets, tab2list, [?TOPIC_TRIE_PROJECTION]), + length([E || E = {{XSrc, _NodeId, _Word}, _ChildId, _Count} <- All, + element(1, XSrc) =:= VHost]). + +shuffle(List) -> + [X || {_, X} <- lists:sort([{rand:uniform(), X} || X <- List])].