Skip to content

Commit 6971ed6

Browse files
ansdmichaelklishin
andauthored
Fix routing for empty binding key to topic exchange (#16271)
* Fix routing for empty binding key to topic exchange Fixes #16221 ## What? Fix a bug in the Khepri v4 topic trie projection where messages published with an empty routing key to a topic exchange were incorrectly routed to queues bound to other topic exchanges with an empty binding key. ## Why? The MQTT 5.0 spec states: > All Topic Names and Topic Filters MUST be at least one character long In contrast, the AMQP 0.9.1 spec state: > The routing key used for a topic exchange MUST consist of **zero** or more words delimited by dots. Each word may contain the letters A-Z and a-z and digits 0-9. The routing pattern follows the same rules as the routing key with the addition that * matches a single word, and # matches zero or more words Hence zero words, i.e. empty routing keys and empty binding keys, are expliclity allowed for the topic exchange type. In the Khepri v4 projection, the topic trie used a global `root` atom as the initial node ID for all topic exchanges. When a binding was created with an empty binding key (`<<>>`), `split_topic_key_binary/1` returned an empty list (`[]`). The trie traversal stopped immediately at the root, meaning the binding was inserted into the `rabbit_khepri_topic_binding_v4` ETS table with the global `root` atom as the `LeafNodeId`. Because the `LeafNodeId` lacked any exchange-specific context, all empty bindings across all topic exchanges were attached to the exact same global `root` node. Consequently, when a message was published with an empty routing key to any topic exchange, the routing logic (`trie_bindings/3`) would scan the ETS table starting at the global `root` node and incorrectly match bindings belonging to completely unrelated topic exchanges. ## How? To fix this and ensure exchange isolation, the conceptual root of the trie was changed from the global `root` atom to an exchange-specific tuple: `{root, XSrc}` (where `XSrc` is `{VHost, ExchangeName}`). - `rabbit_khepri:trie_follow_down_create/3` and `trie_follow_down_get_path/3` now initialize their trie traversal with `{root, XSrc}`. - `rabbit_db_topic_exchange:trie_match/5` now initiates routing from `{root, {VHost, XName}}`. This structural change guarantees that empty bindings are isolated per exchange in the bindings ETS table, completely resolving the cross-exchange leakage while fully supporting AMQP 0-9-1's allowance for empty binding keys without breaking backward compatibility. * Add new v5 projection for topic exchange routing This commit introduces a new feature flag, `topic_binding_projection_v5`, and a corresponding Khepri projection version (v5) to deploy the exchange isolation fix for empty binding keys. * Fix style * Add 4.3.1 release notes * Exercise one more code path #16271 #16221 --------- Co-authored-by: Michael Klishin <michaelklishin@icloud.com>
1 parent e56c164 commit 6971ed6

6 files changed

Lines changed: 234 additions & 66 deletions

File tree

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -255,3 +255,14 @@
255255
post_enable =>
256256
{rabbit_khepri, topic_binding_projection_post_enable}}
257257
}}).
258+
259+
-rabbit_feature_flag(
260+
{topic_binding_projection_v5,
261+
#{desc => "Enable the topic binding Khepri projection v5",
262+
stability => stable,
263+
depends_on => [topic_binding_projection_v4],
264+
callbacks => #{enable =>
265+
{rabbit_khepri, topic_binding_projection_v5_enable},
266+
post_enable =>
267+
{rabbit_khepri, topic_binding_projection_v5_post_enable}}
268+
}}).

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,6 @@
1616

1717
-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3).
1818

19-
-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
20-
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
21-
2219
-type match_result() :: [rabbit_types:binding_destination() |
2320
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
2421

@@ -39,8 +36,14 @@ match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
3936
Words = split_topic_key_binary(RoutingKey),
4037
case rabbit_khepri:get_effective_topic_binding_projection_version() of
4138
V when V >= 4 ->
39+
XSrc = {VHost, XName},
40+
{TrieTab, BindingTab} = rabbit_khepri:topic_trie_table_names(V),
41+
Root = case V of
42+
4 -> root;
43+
_ -> {root, XSrc}
44+
end,
4245
try
43-
trie_match({VHost, XName}, root, Words, BKeys, [])
46+
trie_match(XSrc, TrieTab, BindingTab, Root, Words, BKeys, [])
4447
catch
4548
error:badarg ->
4649
[]
@@ -87,37 +90,36 @@ split_topic_key_binary(RoutingKey) ->
8790
%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
8891
%% ==============================================================
8992

90-
trie_match(XSrc, Node, [], BKeys, Acc0) ->
91-
Acc1 = trie_bindings(Node, BKeys, Acc0),
92-
trie_match_try(XSrc, Node, <<"#">>,
93-
fun trie_match_skip_any/5,
93+
trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc0) ->
94+
Acc1 = trie_bindings(BindTab, Node, BKeys, Acc0),
95+
trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>,
96+
fun trie_match_skip_any/7,
9497
[], BKeys, Acc1);
95-
trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
96-
Acc1 = trie_match_try(XSrc, Node, W,
97-
fun trie_match/5,
98+
trie_match(XSrc, TrieTab, BindTab, Node, [W | RestW] = Words, BKeys, Acc0) ->
99+
Acc1 = trie_match_try(XSrc, TrieTab, BindTab, Node, W,
100+
fun trie_match/7,
98101
RestW, BKeys, Acc0),
99-
Acc2 = trie_match_try(XSrc, Node, <<"*">>,
100-
fun trie_match/5,
102+
Acc2 = trie_match_try(XSrc, TrieTab, BindTab, Node, <<"*">>,
103+
fun trie_match/7,
101104
RestW, BKeys, Acc1),
102-
trie_match_try(XSrc, Node, <<"#">>,
103-
fun trie_match_skip_any/5,
105+
trie_match_try(XSrc, TrieTab, BindTab, Node, <<"#">>,
106+
fun trie_match_skip_any/7,
104107
Words, BKeys, Acc2).
105108

106-
trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
107-
case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
108-
{XSrc, Node, Word}, 2, undefined) of
109+
trie_match_try(XSrc, TrieTab, BindTab, Node, Word, MatchFun, RestW, BKeys, Acc) ->
110+
case ets:lookup_element(TrieTab, {XSrc, Node, Word}, 2, undefined) of
109111
undefined ->
110112
Acc;
111113
NextNode ->
112-
MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
114+
MatchFun(XSrc, TrieTab, BindTab, NextNode, RestW, BKeys, Acc)
113115
end.
114116

115-
trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
116-
trie_match(XSrc, Node, [], BKeys, Acc);
117-
trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
117+
trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc) ->
118+
trie_match(XSrc, TrieTab, BindTab, Node, [], BKeys, Acc);
119+
trie_match_skip_any(XSrc, TrieTab, BindTab, Node, [_ | RestW] = Words, BKeys, Acc) ->
118120
trie_match_skip_any(
119-
XSrc, Node, RestW, BKeys,
120-
trie_match(XSrc, Node, Words, BKeys, Acc)).
121+
XSrc, TrieTab, BindTab, Node, RestW, BKeys,
122+
trie_match(XSrc, TrieTab, BindTab, Node, Words, BKeys, Acc)).
121123

122124
%% Collect all destinations bound at the given trie node.
123125
%%
@@ -129,15 +131,15 @@ trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
129131
%% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan,
130132
%% which is cheaper than F individual ets:next/2 calls
131133
%% (each O(log N) due to CATree fresh-stack allocation).
132-
trie_bindings(NodeId, BKeys, Acc) ->
134+
trie_bindings(BindingTab, NodeId, BKeys, Acc) ->
133135
StartKey = {NodeId, <<>>, {}},
134-
case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
136+
case ets:next(BindingTab, StartKey) of
135137
{NodeId, BKey1, Dest1} = Key1 ->
136-
case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
138+
case ets:next(BindingTab, Key1) of
137139
{NodeId, BKey2, Dest2} = Key2 ->
138-
case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
140+
case ets:next(BindingTab, Key2) of
139141
{NodeId, _, _} ->
140-
collect_select(NodeId, BKeys, Acc);
142+
collect_select(BindingTab, NodeId, BKeys, Acc);
141143
_ ->
142144
Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
143145
collect_binding(Dest2, BKey2, BKeys, Acc1)
@@ -154,12 +156,12 @@ collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
154156
collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
155157
[Dest | Acc].
156158

157-
collect_select(NodeId, false, Acc) ->
158-
Dests = ets:select(?TOPIC_BINDING_PROJECTION,
159+
collect_select(BindingTab, NodeId, false, Acc) ->
160+
Dests = ets:select(BindingTab,
159161
[{{{NodeId, '_', '$1'}}, [], ['$1']}]),
160162
Dests ++ Acc;
161-
collect_select(NodeId, true, Acc) ->
162-
DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
163+
collect_select(BindingTab, NodeId, true, Acc) ->
164+
DestsAndBKeys = ets:select(BindingTab,
163165
[{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
164166
format_dest_bkeys(DestsAndBKeys, Acc).
165167

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 78 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -182,11 +182,14 @@
182182
supports_rabbit_khepri_topic_trie_version/0]).
183183

184184
%% Called locally to determine which projection to use.
185-
-export([get_effective_topic_binding_projection_version/0]).
185+
-export([get_effective_topic_binding_projection_version/0,
186+
topic_trie_table_names/1]).
186187

187188
%% Used during topic binding projections related feature flags handling.
188189
-export([topic_binding_projection_enable/1,
189190
topic_binding_projection_post_enable/1]).
191+
-export([topic_binding_projection_v5_enable/1,
192+
topic_binding_projection_v5_post_enable/1]).
190193

191194
-ifdef(TEST).
192195
-export([register_projections/0,
@@ -1564,8 +1567,8 @@ projection_fun_for_sets(MapFun) ->
15641567

15651568
register_rabbit_topic_binding_projection() ->
15661569
case get_effective_topic_binding_projection_version() of
1567-
V when V >= 4 -> register_rabbit_topic_trie_projection();
1568-
_ -> register_rabbit_topic_graph_projection()
1570+
V when V >= 4 -> register_rabbit_topic_trie_projection(V);
1571+
_ -> register_rabbit_topic_graph_projection()
15691572
end.
15701573

15711574
%% Topic routing via trie ETS projection (v3).
@@ -1648,7 +1651,7 @@ register_rabbit_topic_graph_projection() ->
16481651
_ = unregister_old_rabbit_topic_trie_projections(),
16491652
khepri:register_projection(?STORE_ID, PathPattern, Projection).
16501653

1651-
%% Topic routing via trie + ordered_set ETS projections (v4).
1654+
%% Topic routing via trie + ordered_set ETS projections.
16521655
%% Uses a single Khepri projection with two ETS tables:
16531656
%%
16541657
%% Trie edges table (set) for fast trie navigation during routing:
@@ -1658,14 +1661,11 @@ register_rabbit_topic_graph_projection() ->
16581661
%% Row: {{NodeId, BindingKey, Dest}}
16591662
%%
16601663
%% XSrc = {VHost, ExchangeName} (binaries)
1661-
%% NodeId = root | reference()
1664+
%% NodeId = {root, XSrc} (v5) | root (v4) | reference()
16621665
%% Word = binary() (a single topic segment, e.g. <<"foo">>, <<"*">>, <<"#">>)
16631666
%% ChildCount = non_neg_integer() (number of outgoing edges)
16641667
%% Dest = #resource{}
1665-
%%
1666-
%% This projection is only used once the `topic_binding_projection_v4' feature
1667-
%% flag is enabled.
1668-
register_rabbit_topic_trie_projection() ->
1668+
register_rabbit_topic_trie_projection(Vsn) ->
16691669
ShouldProcessFun =
16701670
fun (rabbit_db_topic_exchange, split_topic_key_binary, 1, _From) ->
16711671
%% This function uses `persistent_term' to store a lazily compiled
@@ -1681,27 +1681,30 @@ register_rabbit_topic_trie_projection() ->
16811681
(M, F, A, From) ->
16821682
khepri_tx_adv:should_process_function(M, F, A, From)
16831683
end,
1684-
Opts = #{tables => #{rabbit_khepri_topic_trie_v4 =>
1685-
#{type => set},
1686-
rabbit_khepri_topic_binding_v4 =>
1687-
#{type => ordered_set}},
1684+
{TrieTabName, BindingTabName} = topic_trie_table_names(Vsn),
1685+
Opts = #{tables => #{TrieTabName => #{type => set},
1686+
BindingTabName => #{type => ordered_set}},
16881687
keypos => 1,
16891688
read_concurrency => true,
16901689
standalone_fun_options => #{should_process_function => ShouldProcessFun}},
16911690
PFun = fun(Tables, Path, OldProps, NewProps) ->
1692-
#{rabbit_khepri_topic_trie_v4 := TrieTab,
1693-
rabbit_khepri_topic_binding_v4 := BindingTab} = Tables,
1691+
#{TrieTabName := TrieTab,
1692+
BindingTabName := BindingTab} = Tables,
16941693
{VHost, ExchangeName, Kind, DstName, BindingKey} =
16951694
rabbit_db_binding:khepri_route_path_to_args(Path),
16961695
XSrc = {VHost, ExchangeName},
16971696
Dest = rabbit_misc:r(VHost, Kind, DstName),
16981697
Words = rabbit_db_topic_exchange:split_topic_key_binary(BindingKey),
1698+
Root = case Vsn of
1699+
4 -> root;
1700+
_ -> {root, XSrc}
1701+
end,
16991702
case {OldProps, NewProps} of
17001703
{_, #{data := _}} ->
1701-
LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Words),
1704+
LeafNodeId = trie_follow_down_create(TrieTab, XSrc, Root, Words),
17021705
ets:insert(BindingTab, {{LeafNodeId, BindingKey, Dest}});
17031706
{#{data := _}, _} ->
1704-
case trie_follow_down_get_path(TrieTab, XSrc, Words) of
1707+
case trie_follow_down_get_path(TrieTab, XSrc, Root, Words) of
17051708
{ok, LeafNodeId, TriePath} ->
17061709
ets:delete(BindingTab, {LeafNodeId, BindingKey, Dest}),
17071710
trie_gc_path(TrieTab, BindingTab, TriePath);
@@ -1712,7 +1715,7 @@ register_rabbit_topic_trie_projection() ->
17121715
ok
17131716
end
17141717
end,
1715-
Projection = khepri_projection:new(rabbit_khepri_topic_trie_v4, PFun, Opts),
1718+
Projection = khepri_projection:new(TrieTabName, PFun, Opts),
17161719
PathPattern = topic_binding_path_pattern(),
17171720
unregister_old_rabbit_topic_trie_projections(),
17181721
khepri:register_projection(?STORE_ID, PathPattern, Projection).
@@ -1732,8 +1735,8 @@ topic_binding_path_pattern() ->
17321735
%% Each trie row is a 3-tuple: {Key, ChildNodeId, ChildCount}.
17331736
%% ChildCount tracks the number of outgoing edges from ChildNodeId.
17341737
%% It is incremented when a new edge is created, decremented during GC.
1735-
trie_follow_down_create(TrieTab, XSrc, Words) ->
1736-
trie_follow_down_create(TrieTab, XSrc, root, none, Words).
1738+
trie_follow_down_create(TrieTab, XSrc, Root, Words) ->
1739+
trie_follow_down_create(TrieTab, XSrc, Root, none, Words).
17371740

17381741
trie_follow_down_create(_TrieTab, _XSrc, NodeId, _ParentKey, []) ->
17391742
NodeId;
@@ -1756,8 +1759,8 @@ trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) ->
17561759

17571760
%% Walk down the trie following the given words, collecting the path
17581761
%% for later GC. Returns {ok, LeafNodeId, Path} or error.
1759-
trie_follow_down_get_path(TrieTab, XSrc, Words) ->
1760-
trie_follow_down_get_path(TrieTab, XSrc, root, none, Words, []).
1762+
trie_follow_down_get_path(TrieTab, XSrc, Root, Words) ->
1763+
trie_follow_down_get_path(TrieTab, XSrc, Root, none, Words, []).
17611764

17621765
trie_follow_down_get_path(_TrieTab, _XSrc, NodeId, _ParentKey, [], Path) ->
17631766
{ok, NodeId, Path};
@@ -1811,14 +1814,46 @@ supports_rabbit_khepri_topic_trie_v2() ->
18111814

18121815
-spec supports_rabbit_khepri_topic_trie_version() -> non_neg_integer().
18131816
supports_rabbit_khepri_topic_trie_version() ->
1814-
4.
1817+
5.
1818+
1819+
-spec topic_trie_table_names(non_neg_integer()) -> {atom(), atom()}.
1820+
topic_trie_table_names(V) when V >= 5 ->
1821+
{rabbit_khepri_topic_trie_v5,
1822+
rabbit_khepri_topic_binding_v5};
1823+
topic_trie_table_names(4) ->
1824+
{rabbit_khepri_topic_trie_v4,
1825+
rabbit_khepri_topic_binding_v4}.
18151826

18161827
get_effective_topic_binding_projection_version() ->
1817-
IsEnabled = rabbit_feature_flags:is_enabled(
1818-
topic_binding_projection_v4, non_blocking),
1819-
case IsEnabled of
1820-
true -> 4;
1821-
_ -> 3
1828+
IsV5 = rabbit_feature_flags:is_enabled(topic_binding_projection_v5,
1829+
non_blocking),
1830+
case IsV5 of
1831+
true ->
1832+
5;
1833+
_ ->
1834+
IsV4 = rabbit_feature_flags:is_enabled(topic_binding_projection_v4,
1835+
non_blocking),
1836+
case IsV4 of
1837+
true ->
1838+
4;
1839+
_ ->
1840+
3
1841+
end
1842+
end.
1843+
1844+
topic_binding_projection_v5_enable(
1845+
#{feature_name := topic_binding_projection_v5 = FeatureName}) ->
1846+
?LOG_DEBUG(
1847+
"Feature flag `~s`: register topic binding projection v5",
1848+
[FeatureName],
1849+
#{domain => ?RMQLOG_DOMAIN_DB}),
1850+
case register_rabbit_topic_trie_projection(5) of
1851+
ok ->
1852+
ok;
1853+
{error, {khepri, projection_already_exists, _Info}} ->
1854+
ok;
1855+
{error, _} = Error ->
1856+
Error
18221857
end.
18231858

18241859
topic_binding_projection_enable(
@@ -1827,7 +1862,7 @@ topic_binding_projection_enable(
18271862
"Feature flag `~s`: register topic binding projection v4",
18281863
[FeatureName],
18291864
#{domain => ?RMQLOG_DOMAIN_DB}),
1830-
case register_rabbit_topic_trie_projection() of
1865+
case register_rabbit_topic_trie_projection(4) of
18311866
ok ->
18321867
ok;
18331868
{error, {khepri, projection_already_exists, _Info}} ->
@@ -1836,6 +1871,15 @@ topic_binding_projection_enable(
18361871
Error
18371872
end.
18381873

1874+
topic_binding_projection_v5_post_enable(
1875+
#{feature_name := topic_binding_projection_v5 = FeatureName}) ->
1876+
?LOG_DEBUG(
1877+
"Feature flag `~s`: unregister old topic binding projections",
1878+
[FeatureName],
1879+
#{domain => ?RMQLOG_DOMAIN_DB}),
1880+
unregister_old_rabbit_topic_trie_projections(),
1881+
ok.
1882+
18391883
topic_binding_projection_post_enable(
18401884
#{feature_name := topic_binding_projection_v4 = FeatureName}) ->
18411885
?LOG_DEBUG(
@@ -1849,7 +1893,11 @@ unregister_old_rabbit_topic_trie_projections() ->
18491893
OldProjections0 = [{1, rabbit_khepri_topic_trie},
18501894
{2, rabbit_khepri_topic_trie_v2}],
18511895
OldProjections1 = case get_effective_topic_binding_projection_version() of
1852-
V when V >= 4 ->
1896+
V when V >= 5 ->
1897+
OldProjections0 ++
1898+
[{3, rabbit_khepri_topic_trie_v3},
1899+
{4, rabbit_khepri_topic_trie_v4}];
1900+
4 ->
18531901
OldProjections0 ++
18541902
[{3, rabbit_khepri_topic_trie_v3}];
18551903
_ ->

0 commit comments

Comments
 (0)