Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -255,3 +255,14 @@
post_enable =>
{rabbit_khepri, topic_binding_projection_post_enable}}
}}).

-rabbit_feature_flag(
{topic_binding_projection_v5,
#{desc => "Enable the topic binding Khepri projection v5",
stability => stable,
depends_on => [topic_binding_projection_v4],
callbacks => #{enable =>
{rabbit_khepri, topic_binding_projection_v5_enable},
post_enable =>
{rabbit_khepri, topic_binding_projection_v5_post_enable}}
}}).
68 changes: 35 additions & 33 deletions deps/rabbit/src/rabbit_db_topic_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,6 @@

-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3).

-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).

-type match_result() :: [rabbit_types:binding_destination() |
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].

Expand All @@ -39,8 +36,14 @@ match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
Words = split_topic_key_binary(RoutingKey),
case rabbit_khepri:get_effective_topic_binding_projection_version() of
V when V >= 4 ->
XSrc = {VHost, XName},
{TrieTab, BindingTab} = rabbit_khepri:topic_trie_table_names(V),
Root = case V of
4 -> root;
_ -> {root, XSrc}
end,
try
trie_match({VHost, XName}, root, Words, BKeys, [])
trie_match(XSrc, TrieTab, BindingTab, Root, Words, BKeys, [])
catch
error:badarg ->
[]
Expand Down Expand Up @@ -87,37 +90,36 @@ split_topic_key_binary(RoutingKey) ->
%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
%% ==============================================================

trie_match(XSrc, Node, [], BKeys, Acc0) ->
Acc1 = trie_bindings(Node, BKeys, Acc0),
trie_match_try(XSrc, Node, <<"#">>,
fun trie_match_skip_any/5,
trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc0) ->
Acc1 = trie_bindings(BindTab, Node, BKeys, Acc0),
trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>,
fun trie_match_skip_any/7,
[], BKeys, Acc1);
trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
Acc1 = trie_match_try(XSrc, Node, W,
fun trie_match/5,
trie_match(XSrc, TrieTab, BindTab, Node, [W | RestW] = Words, BKeys, Acc0) ->
Acc1 = trie_match_try(XSrc, TrieTab, BindTab, Node, W,
fun trie_match/7,
RestW, BKeys, Acc0),
Acc2 = trie_match_try(XSrc, Node, <<"*">>,
fun trie_match/5,
Acc2 = trie_match_try(XSrc, TrieTab, BindTab, Node, <<"*">>,
fun trie_match/7,
RestW, BKeys, Acc1),
trie_match_try(XSrc, Node, <<"#">>,
fun trie_match_skip_any/5,
trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>,
fun trie_match_skip_any/7,
Words, BKeys, Acc2).

trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
{XSrc, Node, Word}, 2, undefined) of
trie_match_try(XSrc, TrieTab, BindTab, Node, Word, MatchFun, RestW, BKeys, Acc) ->
case ets:lookup_element(TrieTab, {XSrc, Node, Word}, 2, undefined) of
undefined ->
Acc;
NextNode ->
MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
MatchFun(XSrc, TrieTab, BindTab, NextNode, RestW, BKeys, Acc)
end.

trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
trie_match(XSrc, Node, [], BKeys, Acc);
trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc) ->
trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc);
trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [_ | RestW] = Words, BKeys, Acc) ->
trie_match_skip_any(
XSrc, Node, RestW, BKeys,
trie_match(XSrc, Node, Words, BKeys, Acc)).
XSrc, TrieTab, BindTab, Node, RestW, BKeys,
trie_match(XSrc, TrieTab, BindTab, Node, Words, BKeys, Acc)).

%% Collect all destinations bound at the given trie node.
%%
Expand All @@ -129,15 +131,15 @@ trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
%% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan,
%% which is cheaper than F individual ets:next/2 calls
%% (each O(log N) due to CATree fresh-stack allocation).
trie_bindings(NodeId, BKeys, Acc) ->
trie_bindings(BindingTab, NodeId, BKeys, Acc) ->
StartKey = {NodeId, <<>>, {}},
case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
case ets:next(BindingTab, StartKey) of
{NodeId, BKey1, Dest1} = Key1 ->
case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
case ets:next(BindingTab, Key1) of
{NodeId, BKey2, Dest2} = Key2 ->
case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
case ets:next(BindingTab, Key2) of
{NodeId, _, _} ->
collect_select(NodeId, BKeys, Acc);
collect_select(BindingTab, NodeId, BKeys, Acc);
_ ->
Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
collect_binding(Dest2, BKey2, BKeys, Acc1)
Expand All @@ -154,12 +156,12 @@ collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
[Dest | Acc].

collect_select(NodeId, false, Acc) ->
Dests = ets:select(?TOPIC_BINDING_PROJECTION,
collect_select(BindingTab, NodeId, false, Acc) ->
Dests = ets:select(BindingTab,
[{{{NodeId, '_', '$1'}}, [], ['$1']}]),
Dests ++ Acc;
collect_select(NodeId, true, Acc) ->
DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
collect_select(BindingTab, NodeId, true, Acc) ->
DestsAndBKeys = ets:select(BindingTab,
[{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
format_dest_bkeys(DestsAndBKeys, Acc).

Expand Down
108 changes: 78 additions & 30 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,14 @@
supports_rabbit_khepri_topic_trie_version/0]).

%% Called locally to determine which projection to use.
-export([get_effective_topic_binding_projection_version/0]).
-export([get_effective_topic_binding_projection_version/0,
topic_trie_table_names/1]).

%% Used during topic binding projections related feature flags handling.
-export([topic_binding_projection_enable/1,
topic_binding_projection_post_enable/1]).
-export([topic_binding_projection_v5_enable/1,
topic_binding_projection_v5_post_enable/1]).

-ifdef(TEST).
-export([register_projections/0,
Expand Down Expand Up @@ -1564,8 +1567,8 @@ projection_fun_for_sets(MapFun) ->

register_rabbit_topic_binding_projection() ->
case get_effective_topic_binding_projection_version() of
V when V >= 4 -> register_rabbit_topic_trie_projection();
_ -> register_rabbit_topic_graph_projection()
V when V >= 4 -> register_rabbit_topic_trie_projection(V);
_ -> register_rabbit_topic_graph_projection()
end.

%% Topic routing via trie ETS projection (v3).
Expand Down Expand Up @@ -1648,7 +1651,7 @@ register_rabbit_topic_graph_projection() ->
_ = unregister_old_rabbit_topic_trie_projections(),
khepri:register_projection(?STORE_ID, PathPattern, Projection).

%% Topic routing via trie + ordered_set ETS projections (v4).
%% Topic routing via trie + ordered_set ETS projections.
%% Uses a single Khepri projection with two ETS tables:
%%
%% Trie edges table (set) for fast trie navigation during routing:
Expand All @@ -1658,14 +1661,11 @@ register_rabbit_topic_graph_projection() ->
%% Row: {{NodeId, BindingKey, Dest}}
%%
%% XSrc = {VHost, ExchangeName} (binaries)
%% NodeId = root | reference()
%% NodeId = {root, XSrc} (v5) | root (v4) | reference()
%% Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>)
%% ChildCount = non_neg_integer() (number of outgoing edges)
%% Dest = #resource{}
%%
%% This projection is only used once the `topic_binding_projection_v4' feature
%% flag is enabled.
register_rabbit_topic_trie_projection() ->
register_rabbit_topic_trie_projection(Vsn) ->
ShouldProcessFun =
fun (rabbit_db_topic_exchange, split_topic_key_binary, 1, _From) ->
%% This function uses `persistent_term' to store a lazily compiled
Expand All @@ -1681,27 +1681,30 @@ register_rabbit_topic_trie_projection() ->
(M, F, A, From) ->
khepri_tx_adv:should_process_function(M, F, A, From)
end,
Opts = #{tables => #{rabbit_khepri_topic_trie_v4 =>
#{type => set},
rabbit_khepri_topic_binding_v4 =>
#{type => ordered_set}},
{TrieTabName, BindingTabName} = topic_trie_table_names(Vsn),
Opts = #{tables => #{TrieTabName => #{type => set},
BindingTabName => #{type => ordered_set}},
keypos => 1,
read_concurrency => true,
standalone_fun_options => #{should_process_function => ShouldProcessFun}},
PFun = fun(Tables, Path, OldProps, NewProps) ->
#{rabbit_khepri_topic_trie_v4 := TrieTab,
rabbit_khepri_topic_binding_v4 := BindingTab} = Tables,
#{TrieTabName := TrieTab,
BindingTabName := BindingTab} = Tables,
{VHost, ExchangeName, Kind, DstName, BindingKey} =
rabbit_db_binding:khepri_route_path_to_args(Path),
XSrc = {VHost, ExchangeName},
Dest = rabbit_misc:r(VHost, Kind, DstName),
Words = rabbit_db_topic_exchange:split_topic_key_binary(BindingKey),
Root = case Vsn of
4 -> root;
_ -> {root, XSrc}
end,
case {OldProps, NewProps} of
{_, #{data := _}} ->
LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Words),
LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Root, Words),
ets:insert(BindingTab, {{LeafNodeId, BindingKey, Dest}});
{#{data := _}, _} ->
case trie_follow_down_get_path(TrieTab, XSrc, Words) of
case trie_follow_down_get_path(TrieTab, XSrc, Root, Words) of
{ok, LeafNodeId, TriePath} ->
ets:delete(BindingTab, {LeafNodeId, BindingKey, Dest}),
trie_gc_path(TrieTab, BindingTab, TriePath);
Expand All @@ -1712,7 +1715,7 @@ register_rabbit_topic_trie_projection() ->
ok
end
end,
Projection = khepri_projection:new(rabbit_khepri_topic_trie_v4, PFun, Opts),
Projection = khepri_projection:new(TrieTabName, PFun, Opts),
PathPattern = topic_binding_path_pattern(),
unregister_old_rabbit_topic_trie_projections(),
khepri:register_projection(?STORE_ID, PathPattern, Projection).
Expand All @@ -1732,8 +1735,8 @@ topic_binding_path_pattern() ->
%% Each trie row is a 3-tuple: {Key, ChildNodeId, ChildCount}.
%% ChildCount tracks the number of outgoing edges from ChildNodeId.
%% It is incremented when a new edge is created, decremented during GC.
trie_follow_down_create(TrieTab, XSrc, Words) ->
trie_follow_down_create(TrieTab, XSrc, root, none, Words).
trie_follow_down_create(TrieTab, XSrc, Root, Words) ->
trie_follow_down_create(TrieTab, XSrc, Root, none, Words).

trie_follow_down_create(_TrieTab, _XSrc, NodeId, _ParentKey, []) ->
NodeId;
Expand All @@ -1756,8 +1759,8 @@ trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) ->

%% Walk down the trie following the given words, collecting the path
%% for later GC. Returns {ok, LeafNodeId, Path} or error.
trie_follow_down_get_path(TrieTab, XSrc, Words) ->
trie_follow_down_get_path(TrieTab, XSrc, root, none, Words, []).
trie_follow_down_get_path(TrieTab, XSrc, Root, Words) ->
trie_follow_down_get_path(TrieTab, XSrc, Root, none, Words, []).

trie_follow_down_get_path(_TrieTab, _XSrc, NodeId, _ParentKey, [], Path) ->
{ok, NodeId, Path};
Expand Down Expand Up @@ -1811,14 +1814,46 @@ supports_rabbit_khepri_topic_trie_v2() ->

-spec supports_rabbit_khepri_topic_trie_version() -> non_neg_integer().
supports_rabbit_khepri_topic_trie_version() ->
4.
5.

-spec topic_trie_table_names(non_neg_integer()) -> {atom(), atom()}.
topic_trie_table_names(V) when V >= 5 ->
{rabbit_khepri_topic_trie_v5,
rabbit_khepri_topic_binding_v5};
topic_trie_table_names(4) ->
{rabbit_khepri_topic_trie_v4,
rabbit_khepri_topic_binding_v4}.

get_effective_topic_binding_projection_version() ->
IsEnabled = rabbit_feature_flags:is_enabled(
topic_binding_projection_v4, non_blocking),
case IsEnabled of
true -> 4;
_ -> 3
IsV5 = rabbit_feature_flags:is_enabled(topic_binding_projection_v5,
non_blocking),
case IsV5 of
true ->
5;
_ ->
IsV4 = rabbit_feature_flags:is_enabled(topic_binding_projection_v4,
non_blocking),
case IsV4 of
true ->
4;
_ ->
3
end
end.

topic_binding_projection_v5_enable(
#{feature_name := topic_binding_projection_v5 = FeatureName}) ->
?LOG_DEBUG(
"Feature flag `~s`: register topic binding projection v5",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
case register_rabbit_topic_trie_projection(5) of
ok ->
ok;
{error, {khepri, projection_already_exists, _Info}} ->
ok;
{error, _} = Error ->
Error
end.

topic_binding_projection_enable(
Expand All @@ -1827,7 +1862,7 @@ topic_binding_projection_enable(
"Feature flag `~s`: register topic binding projection v4",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
case register_rabbit_topic_trie_projection() of
case register_rabbit_topic_trie_projection(4) of
ok ->
ok;
{error, {khepri, projection_already_exists, _Info}} ->
Expand All @@ -1836,6 +1871,15 @@ topic_binding_projection_enable(
Error
end.

topic_binding_projection_v5_post_enable(
#{feature_name := topic_binding_projection_v5 = FeatureName}) ->
?LOG_DEBUG(
"Feature flag `~s`: unregister old topic binding projections",
[FeatureName],
#{domain => ?RMQLOG_DOMAIN_DB}),
unregister_old_rabbit_topic_trie_projections(),
ok.

topic_binding_projection_post_enable(
#{feature_name := topic_binding_projection_v4 = FeatureName}) ->
?LOG_DEBUG(
Expand All @@ -1849,7 +1893,11 @@ unregister_old_rabbit_topic_trie_projections() ->
OldProjections0 = [{1, rabbit_khepri_topic_trie},
{2, rabbit_khepri_topic_trie_v2}],
OldProjections1 = case get_effective_topic_binding_projection_version() of
V when V >= 4 ->
V when V >= 5 ->
OldProjections0 ++
[{3, rabbit_khepri_topic_trie_v3},
{4, rabbit_khepri_topic_trie_v4}];
4 ->
OldProjections0 ++
[{3, rabbit_khepri_topic_trie_v3}];
_ ->
Expand Down
Loading
Loading