Skip to content

Commit 05a2191

Browse files
committed
Add new v5 projection for topic exchange routing
This commit introduces a new feature flag, `topic_binding_projection_v5`, and a corresponding Khepri projection version (v5) to deploy the exchange isolation fix for empty binding keys.
1 parent b1bff0a commit 05a2191

4 files changed

Lines changed: 123 additions & 65 deletions

File tree

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,3 +255,14 @@
255255
post_enable =>
256256
{rabbit_khepri, topic_binding_projection_post_enable}}
257257
}}).
258+
259+
-rabbit_feature_flag(
260+
{topic_binding_projection_v5,
261+
#{desc => "Enable the topic binding Khepri projection v5",
262+
stability => stable,
263+
depends_on => [topic_binding_projection_v4],
264+
callbacks => #{enable =>
265+
{rabbit_khepri, topic_binding_projection_v5_enable},
266+
post_enable =>
267+
{rabbit_khepri, topic_binding_projection_v5_post_enable}}
268+
}}).

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3).
1818

19-
-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
20-
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
21-
2219
-type match_result() :: [rabbit_types:binding_destination() |
2320
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
2421

@@ -40,8 +37,13 @@ match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
4037
case rabbit_khepri:get_effective_topic_binding_projection_version() of
4138
V when V >= 4 ->
4239
XSrc = {VHost, XName},
40+
{TrieTab, BindingTab} = rabbit_khepri:topic_trie_table_names(V),
41+
Root = case V of
42+
4 -> root;
43+
_ -> {root, XSrc}
44+
end,
4345
try
44-
trie_match(XSrc, {root, XSrc}, Words, BKeys, [])
46+
trie_match(XSrc, TrieTab, BindingTab, Root, Words, BKeys, [])
4547
catch
4648
error:badarg ->
4749
[]
@@ -88,37 +90,36 @@ split_topic_key_binary(RoutingKey) ->
8890
%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
8991
%% ==============================================================
9092

91-
trie_match(XSrc, Node, [], BKeys, Acc0) ->
92-
Acc1 = trie_bindings(Node, BKeys, Acc0),
93-
trie_match_try(XSrc, Node, <<"#">>,
94-
fun trie_match_skip_any/5,
93+
trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc0) ->
94+
Acc1 = trie_bindings(BindTab, Node, BKeys, Acc0),
95+
trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>,
96+
fun trie_match_skip_any/7,
9597
[], BKeys, Acc1);
96-
trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
97-
Acc1 = trie_match_try(XSrc, Node, W,
98-
fun trie_match/5,
98+
trie_match(XSrc, TrieTab, BindTab, Node, [W | RestW] = Words, BKeys, Acc0) ->
99+
Acc1 = trie_match_try(XSrc, TrieTab, BindTab, Node, W,
100+
fun trie_match/7,
99101
RestW, BKeys, Acc0),
100-
Acc2 = trie_match_try(XSrc, Node, <<"*">>,
101-
fun trie_match/5,
102+
Acc2 = trie_match_try(XSrc, TrieTab, BindTab, Node, <<"*">>,
103+
fun trie_match/7,
102104
RestW, BKeys, Acc1),
103-
trie_match_try(XSrc, Node, <<"#">>,
104-
fun trie_match_skip_any/5,
105+
trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>,
106+
fun trie_match_skip_any/7,
105107
Words, BKeys, Acc2).
106108

107-
trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
108-
case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
109-
{XSrc, Node, Word}, 2, undefined) of
109+
trie_match_try(XSrc, TrieTab, BindTab, Node, Word, MatchFun, RestW, BKeys, Acc) ->
110+
case ets:lookup_element(TrieTab, {XSrc, Node, Word}, 2, undefined) of
110111
undefined ->
111112
Acc;
112113
NextNode ->
113-
MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
114+
MatchFun(XSrc, TrieTab, BindTab, NextNode, RestW, BKeys, Acc)
114115
end.
115116

116-
trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
117-
trie_match(XSrc, Node, [], BKeys, Acc);
118-
trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
117+
trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc) ->
118+
trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc);
119+
trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [_ | RestW] = Words, BKeys, Acc) ->
119120
trie_match_skip_any(
120-
XSrc, Node, RestW, BKeys,
121-
trie_match(XSrc, Node, Words, BKeys, Acc)).
121+
XSrc, TrieTab, BindTab, Node, RestW, BKeys,
122+
trie_match(XSrc, TrieTab, BindTab, Node, Words, BKeys, Acc)).
122123

123124
%% Collect all destinations bound at the given trie node.
124125
%%
@@ -130,15 +131,15 @@ trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
130131
%% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan,
131132
%% which is cheaper than F individual ets:next/2 calls
132133
%% (each O(log N) due to CATree fresh-stack allocation).
133-
trie_bindings(NodeId, BKeys, Acc) ->
134+
trie_bindings(BindingTab, NodeId, BKeys, Acc) ->
134135
StartKey = {NodeId, <<>>, {}},
135-
case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
136+
case ets:next(BindingTab, StartKey) of
136137
{NodeId, BKey1, Dest1} = Key1 ->
137-
case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
138+
case ets:next(BindingTab, Key1) of
138139
{NodeId, BKey2, Dest2} = Key2 ->
139-
case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
140+
case ets:next(BindingTab, Key2) of
140141
{NodeId, _, _} ->
141-
collect_select(NodeId, BKeys, Acc);
142+
collect_select(BindingTab, NodeId, BKeys, Acc);
142143
_ ->
143144
Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
144145
collect_binding(Dest2, BKey2, BKeys, Acc1)
@@ -155,12 +156,12 @@ collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
155156
collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
156157
[Dest | Acc].
157158

158-
collect_select(NodeId, false, Acc) ->
159-
Dests = ets:select(?TOPIC_BINDING_PROJECTION,
159+
collect_select(BindingTab, NodeId, false, Acc) ->
160+
Dests = ets:select(BindingTab,
160161
[{{{NodeId, '_', '$1'}}, [], ['$1']}]),
161162
Dests ++ Acc;
162-
collect_select(NodeId, true, Acc) ->
163-
DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
163+
collect_select(BindingTab, NodeId, true, Acc) ->
164+
DestsAndBKeys = ets:select(BindingTab,
164165
[{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
165166
format_dest_bkeys(DestsAndBKeys, Acc).
166167

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 75 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,14 @@
182182
supports_rabbit_khepri_topic_trie_version/0]).
183183

184184
%% Called locally to determine which projection to use.
185-
-export([get_effective_topic_binding_projection_version/0]).
185+
-export([get_effective_topic_binding_projection_version/0,
186+
topic_trie_table_names/1]).
186187

187188
%% Used during topic binding projections related feature flags handling.
188189
-export([topic_binding_projection_enable/1,
189190
topic_binding_projection_post_enable/1]).
191+
-export([topic_binding_projection_v5_enable/1,
192+
topic_binding_projection_v5_post_enable/1]).
190193

191194
-ifdef(TEST).
192195
-export([register_projections/0,
@@ -1564,8 +1567,8 @@ projection_fun_for_sets(MapFun) ->
15641567

15651568
register_rabbit_topic_binding_projection() ->
15661569
case get_effective_topic_binding_projection_version() of
1567-
V when V >= 4 -> register_rabbit_topic_trie_projection();
1568-
_ -> register_rabbit_topic_graph_projection()
1570+
V when V >= 4 -> register_rabbit_topic_trie_projection(V);
1571+
_ -> register_rabbit_topic_graph_projection()
15691572
end.
15701573

15711574
%% Topic routing via trie ETS projection (v3).
@@ -1648,7 +1651,7 @@ register_rabbit_topic_graph_projection() ->
16481651
_ = unregister_old_rabbit_topic_trie_projections(),
16491652
khepri:register_projection(?STORE_ID, PathPattern, Projection).
16501653

1651-
%% Topic routing via trie + ordered_set ETS projections (v4).
1654+
%% Topic routing via trie + ordered_set ETS projections.
16521655
%% Uses a single Khepri projection with two ETS tables:
16531656
%%
16541657
%% Trie edges table (set) for fast trie navigation during routing:
@@ -1662,10 +1665,7 @@ register_rabbit_topic_graph_projection() ->
16621665
%% Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>)
16631666
%% ChildCount = non_neg_integer() (number of outgoing edges)
16641667
%% Dest = #resource{}
1665-
%%
1666-
%% This projection is only used once the `topic_binding_projection_v4' feature
1667-
%% flag is enabled.
1668-
register_rabbit_topic_trie_projection() ->
1668+
register_rabbit_topic_trie_projection(Vsn) ->
16691669
ShouldProcessFun =
16701670
fun (rabbit_db_topic_exchange, split_topic_key_binary, 1, _From) ->
16711671
%% This function uses `persistent_term' to store a lazily compiled
@@ -1681,27 +1681,30 @@ register_rabbit_topic_trie_projection() ->
16811681
(M, F, A, From) ->
16821682
khepri_tx_adv:should_process_function(M, F, A, From)
16831683
end,
1684-
Opts = #{tables => #{rabbit_khepri_topic_trie_v4 =>
1685-
#{type => set},
1686-
rabbit_khepri_topic_binding_v4 =>
1687-
#{type => ordered_set}},
1684+
{TrieTabName, BindingTabName} = topic_trie_table_names(Vsn),
1685+
Opts = #{tables => #{TrieTabName => #{type => set},
1686+
BindingTabName => #{type => ordered_set}},
16881687
keypos => 1,
16891688
read_concurrency => true,
16901689
standalone_fun_options => #{should_process_function => ShouldProcessFun}},
16911690
PFun = fun(Tables, Path, OldProps, NewProps) ->
1692-
#{rabbit_khepri_topic_trie_v4 := TrieTab,
1693-
rabbit_khepri_topic_binding_v4 := BindingTab} = Tables,
1691+
#{TrieTabName := TrieTab,
1692+
BindingTabName := BindingTab} = Tables,
16941693
{VHost, ExchangeName, Kind, DstName, BindingKey} =
16951694
rabbit_db_binding:khepri_route_path_to_args(Path),
16961695
XSrc = {VHost, ExchangeName},
16971696
Dest = rabbit_misc:r(VHost, Kind, DstName),
16981697
Words = rabbit_db_topic_exchange:split_topic_key_binary(BindingKey),
1698+
Root = case Vsn of
1699+
4 -> root;
1700+
_ -> {root, XSrc}
1701+
end,
16991702
case {OldProps, NewProps} of
17001703
{_, #{data := _}} ->
1701-
LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Words),
1704+
LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Root, Words),
17021705
ets:insert(BindingTab, {{LeafNodeId, BindingKey, Dest}});
17031706
{#{data := _}, _} ->
1704-
case trie_follow_down_get_path(TrieTab, XSrc, Words) of
1707+
case trie_follow_down_get_path(TrieTab, XSrc, Root, Words) of
17051708
{ok, LeafNodeId, TriePath} ->
17061709
ets:delete(BindingTab, {LeafNodeId, BindingKey, Dest}),
17071710
trie_gc_path(TrieTab, BindingTab, TriePath);
@@ -1712,7 +1715,7 @@ register_rabbit_topic_trie_projection() ->
17121715
ok
17131716
end
17141717
end,
1715-
Projection = khepri_projection:new(rabbit_khepri_topic_trie_v4, PFun, Opts),
1718+
Projection = khepri_projection:new(TrieTabName, PFun, Opts),
17161719
PathPattern = topic_binding_path_pattern(),
17171720
unregister_old_rabbit_topic_trie_projections(),
17181721
khepri:register_projection(?STORE_ID, PathPattern, Projection).
@@ -1732,8 +1735,8 @@ topic_binding_path_pattern() ->
17321735
%% Each trie row is a 3-tuple: {Key, ChildNodeId, ChildCount}.
17331736
%% ChildCount tracks the number of outgoing edges from ChildNodeId.
17341737
%% It is incremented when a new edge is created, decremented during GC.
1735-
trie_follow_down_create(TrieTab, XSrc, Words) ->
1736-
trie_follow_down_create(TrieTab, XSrc, {root, XSrc}, none, Words).
1738+
trie_follow_down_create(TrieTab, XSrc, Root, Words) ->
1739+
trie_follow_down_create(TrieTab, XSrc, Root, none, Words).
17371740

17381741
trie_follow_down_create(_TrieTab, _XSrc, NodeId, _ParentKey, []) ->
17391742
NodeId;
@@ -1756,8 +1759,8 @@ trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) ->
17561759

17571760
%% Walk down the trie following the given words, collecting the path
17581761
%% for later GC. Returns {ok, LeafNodeId, Path} or error.
1759-
trie_follow_down_get_path(TrieTab, XSrc, Words) ->
1760-
trie_follow_down_get_path(TrieTab, XSrc, {root, XSrc}, none, Words, []).
1762+
trie_follow_down_get_path(TrieTab, XSrc, Root, Words) ->
1763+
trie_follow_down_get_path(TrieTab, XSrc, Root, none, Words, []).
17611764

17621765
trie_follow_down_get_path(_TrieTab, _XSrc, NodeId, _ParentKey, [], Path) ->
17631766
{ok, NodeId, Path};
@@ -1811,14 +1814,44 @@ supports_rabbit_khepri_topic_trie_v2() ->
18111814

18121815
-spec supports_rabbit_khepri_topic_trie_version() -> non_neg_integer().
18131816
supports_rabbit_khepri_topic_trie_version() ->
1814-
4.
1817+
5.
1818+
1819+
-spec topic_trie_table_names(non_neg_integer()) -> {atom(), atom()}.
1820+
topic_trie_table_names(V) when V >= 5 ->
1821+
{rabbit_khepri_topic_trie_v5,
1822+
rabbit_khepri_topic_binding_v5};
1823+
topic_trie_table_names(4) ->
1824+
{rabbit_khepri_topic_trie_v4,
1825+
rabbit_khepri_topic_binding_v4}.
18151826

18161827
get_effective_topic_binding_projection_version() ->
1817-
IsEnabled = rabbit_feature_flags:is_enabled(
1818-
topic_binding_projection_v4, non_blocking),
1819-
case IsEnabled of
1820-
true -> 4;
1821-
_ -> 3
1828+
case rabbit_feature_flags:is_enabled(topic_binding_projection_v5,
1829+
non_blocking) of
1830+
true ->
1831+
5;
1832+
_ ->
1833+
case rabbit_feature_flags:is_enabled(topic_binding_projection_v4,
1834+
non_blocking) of
1835+
true ->
1836+
4;
1837+
_ ->
1838+
3
1839+
end
1840+
end.
1841+
1842+
topic_binding_projection_v5_enable(
1843+
#{feature_name := topic_binding_projection_v5 = FeatureName}) ->
1844+
?LOG_DEBUG(
1845+
"Feature flag `~s`: register topic binding projection v5",
1846+
[FeatureName],
1847+
#{domain => ?RMQLOG_DOMAIN_DB}),
1848+
case register_rabbit_topic_trie_projection(5) of
1849+
ok ->
1850+
ok;
1851+
{error, {khepri, projection_already_exists, _Info}} ->
1852+
ok;
1853+
{error, _} = Error ->
1854+
Error
18221855
end.
18231856

18241857
topic_binding_projection_enable(
@@ -1827,7 +1860,7 @@ topic_binding_projection_enable(
18271860
"Feature flag `~s`: register topic binding projection v4",
18281861
[FeatureName],
18291862
#{domain => ?RMQLOG_DOMAIN_DB}),
1830-
case register_rabbit_topic_trie_projection() of
1863+
case register_rabbit_topic_trie_projection(4) of
18311864
ok ->
18321865
ok;
18331866
{error, {khepri, projection_already_exists, _Info}} ->
@@ -1836,6 +1869,15 @@ topic_binding_projection_enable(
18361869
Error
18371870
end.
18381871

1872+
topic_binding_projection_v5_post_enable(
1873+
#{feature_name := topic_binding_projection_v5 = FeatureName}) ->
1874+
?LOG_DEBUG(
1875+
"Feature flag `~s`: unregister old topic binding projections",
1876+
[FeatureName],
1877+
#{domain => ?RMQLOG_DOMAIN_DB}),
1878+
unregister_old_rabbit_topic_trie_projections(),
1879+
ok.
1880+
18391881
topic_binding_projection_post_enable(
18401882
#{feature_name := topic_binding_projection_v4 = FeatureName}) ->
18411883
?LOG_DEBUG(
@@ -1849,7 +1891,11 @@ unregister_old_rabbit_topic_trie_projections() ->
18491891
OldProjections0 = [{1, rabbit_khepri_topic_trie},
18501892
{2, rabbit_khepri_topic_trie_v2}],
18511893
OldProjections1 = case get_effective_topic_binding_projection_version() of
1852-
V when V >= 4 ->
1894+
V when V >= 5 ->
1895+
OldProjections0 ++
1896+
[{3, rabbit_khepri_topic_trie_v3},
1897+
{4, rabbit_khepri_topic_trie_v4}];
1898+
4 ->
18531899
OldProjections0 ++
18541900
[{3, rabbit_khepri_topic_trie_v3}];
18551901
_ ->

deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@
1414

1515
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
1616

17-
-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
18-
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
17+
-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v5).
18+
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v5).
1919

2020
-export([all/0,
2121
groups/0,
@@ -90,7 +90,7 @@ init_per_group(cluster_size_3 = _Group, Config0) ->
9090
case Config1 of
9191
_ when is_list(Config1) ->
9292
Ret = rabbit_ct_broker_helpers:enable_feature_flag(
93-
Config1, topic_binding_projection_v4),
93+
Config1, topic_binding_projection_v5),
9494
case Ret of
9595
ok ->
9696
Config1;

0 commit comments

Comments
 (0)