Skip to content

Commit 8929bc5

Browse files
authored
Replace Khepri topic routing projection with trie + ordered_set (v4) (#15619)
* Replace Khepri topic routing projection with trie + ordered_set (v4) Resolves #15588. The previous Khepri topic routing projection (v3) stored topic bindings as sets:set(#binding{}) inside trie leaf nodes. This design had a major performance drawback: On the insertion/deletion path (in the single Khepri Ra process), every binding change required a read-modify-write of the entire sets:set(), making it O(N) in the number of bindings at that leaf. With many MQTT clients connecting concurrently (each subscribing to the same topic filter), this made the Ra process a bottleneck. Another less severe performance issue was that the entire binding was being copied including the binding arguments containing the MQTT 5.0 subscription options such as: ``` {<<"x-mqtt-subscription-opts">>,table, [{<<"id">>,unsignedint,1}, {<<"no-local">>,bool,false}, {<<"qos">>,unsignedbyte,0}, {<<"retain-as-published">>,bool,false}, {<<"retain-handling">>,unsignedbyte,0}]}] ``` Replace the single ETS projection table with two purpose-built tables: 1. Trie edges table (ETS set, read_concurrency=true): - Row: `{{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}` - XSrc = {VHost, ExchangeName} (compact 2-tuple of binaries) - NodeId = root | reference() - ChildCount tracks outgoing edges for garbage collection 2. Leaf bindings table (ETS ordered_set, read_concurrency=true): - Key: {NodeId, BindingKey, Dest} - Stored as 1-tuples: {{NodeId, BindingKey, Dest}} - No value column; all data is in the key to minimize copying The trie structure preserves O(depth * 3) routing complexity regardless of the number of overall bindings or wildcard filters. At each trie level, we probe at most 3 edges (literal word, <<"*">>, <<"#">>), each via ets:lookup_element/4 which copies only the ChildNodeId (a reference). The ordered_set for bindings provides: - O(log N) insert and delete per binding (no read-modify-write) - The binding key (needed for MQTT subscription identifiers and topic aliases) is part of the key, so it is returned directly during destination collection without additional lookups Collecting destinations at a matched trie leaf uses a hybrid strategy: - Fanout 0-2 (the common case: unicast, device + stream): up to 3 ets:next/2 probes. Each ets:next/2 call costs O(log N) because the CATree (used with read_concurrency) allocates a fresh tree traversal stack on each call. - Fanout > 2: ets:select/2 with a partially bound key does an O(log N) seek followed by an O(F) range scan. The match spec compilation overhead amortises over the larger result set. ets:lookup_element/4 (OTP 26+) returns a default value on miss instead of throwing badarg, and copies only the requested element on hit. This avoids both exception overhead (misses are common during trie traversal of <<"*">> and <<"#">> branches) and unnecessary data copying (we only need the ChildNodeId, not the full row). Trie node IDs are ephemeral (the tables are rebuilt when the Khepri projection is re-registered). make_ref() is fast, globally unique within a node, and has good hash distribution for the ETS set table. When a binding is deleted, the trie path from root to leaf is collected in a single downward walk (trie_follow_down_get_path). Empty nodes are then pruned bottom-up: a node is empty when its ChildCount is 0 and it has no bindings in the ordered_set table. Benchmarks below were run with 500K routing operations per scenario (on the same machine, back-to-back between main (v3) and this commit. Significant insert/delete improvements: Churn insert (8K bindings, 4 filters/client): ~1,120 vs ~810 ops/s (+38%) v3 did a read-modify-write of sets:set() per binding; v4 does a single ets:insert into the ordered_set plus trie edge updates. MQTT device insert (20K bindings): ~650 vs ~420 ops/s (+55%) Same mechanism as churn insert. Particularly impactful when many clients share the same wildcard filter (e.g. "broadcast.#"), since v3's sets:set() grew with each client while v4 inserts are O(log N) regardless. Same-key fanout insert (10K): ~415 vs ~290 ops/s (+43%) The worst case for v3: all 10K bindings share the same key, so each insert copies and rebuilds the growing sets:set(). Routing improvements: MQTT unicast (10K devices, 20K bindings): ~460K vs ~250K ops/s (+80%) Each route matches 1 queue among 10K unique exact keys plus 10K queues sharing "broadcast.#". v3 stored bindings in the same ETS row as the trie edge, so every trie lookup copied the entire sets:set(). v4 separates trie edges (small rows, set table) from bindings (ordered_set), so the trie walk copies only references. Large fanout (10K queues, same key): ~3,100 vs ~1,170 ops/s (+165%) v3 copied a 10K-element sets:set() out of ETS in a single ets:lookup, then called sets:to_list/1. v4 uses ets:select/2 with a partially bound key, which does an O(log N) seek and then an efficient O(F) range scan without intermediate set conversion. MQTT broadcast (10K fanout): ~0.6 vs ~0.9 ms/route (+50%) Same mechanism as above. Scenarios with no significant change (within benchmark noise): Exact match, wildcard *, wildcard #, mixed wildcards, and many wildcard filters showed no clear difference. Both v3 and v4 use a trie walk, so routing speed is comparable when the fanout is small and the bottleneck is trie traversal rather than destination collection. * Improve function names Improve function names and add function specs.
1 parent 1898ac1 commit 8929bc5

4 files changed

Lines changed: 1230 additions & 189 deletions

File tree

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,3 +244,14 @@
244244
{rabbit_db_queue,
245245
tie_binding_to_dest_with_keep_while_cond_enable}}
246246
}}).
247+
248+
-rabbit_feature_flag(
249+
{topic_binding_projection_v4,
250+
#{desc => "Enable the topic binding Khepri projection v4",
251+
stability => stable,
252+
depends_on => ['rabbitmq_4.3.0'],
253+
callbacks => #{enable =>
254+
{rabbit_khepri, topic_binding_projection_enable},
255+
post_enable =>
256+
{rabbit_khepri, topic_binding_projection_post_enable}}
257+
}}).

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 160 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@
1414
%% Used by Khepri projections and the Mnesia-to-Khepri migration.
1515
-export([split_topic_key_binary/1]).
1616

17-
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v3).
17+
-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3).
18+
19+
-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
20+
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
1821

1922
-type match_result() :: [rabbit_types:binding_destination() |
2023
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
@@ -31,14 +34,20 @@
3134
%%
3235
%% @private
3336

34-
match(XName, RoutingKey, Opts) ->
37+
match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
3538
BKeys = maps:get(return_binding_keys, Opts, false),
3639
Words = split_topic_key_binary(RoutingKey),
37-
trie_match_in_khepri(XName, Words, BKeys).
38-
39-
%% --------------------------------------------------------------
40-
%% split_topic_key_binary().
41-
%% --------------------------------------------------------------
40+
case rabbit_khepri:get_effective_topic_binding_projection_version() of
41+
V when V >= 4 ->
42+
try
43+
trie_match({VHost, XName}, root, Words, BKeys, [])
44+
catch
45+
error:badarg ->
46+
[]
47+
end;
48+
_ ->
49+
trie_match_v3(X, Words, BKeys)
50+
end.
4251

4352
-spec split_topic_key_binary(RoutingKey) -> Words when
4453
RoutingKey :: binary(),
@@ -58,81 +67,164 @@ split_topic_key_binary(RoutingKey) ->
5867
end,
5968
binary:split(RoutingKey, Pattern, [global]).
6069

61-
%% --------------------------------------------------------------
62-
%% Internal
63-
%% --------------------------------------------------------------
70+
%% ==============================================================
71+
%% Trie-based routing
72+
%%
73+
%% Uses two ETS tables:
74+
%%
75+
%% 1. Trie edges table (set): {Key, ChildNodeId, ChildCount}
76+
%% Key = {XSrc, ParentNodeId, Word}
77+
%% Navigation: ets:lookup_element/4 for O(1) edge traversal.
78+
%%
79+
%% 2. Leaf bindings table (ordered_set): {{NodeId, BindingKey, Dest}}
80+
%% Collection: ets:next/2 probes for fanout 0-2 (fast path), then
81+
%% ets:select/2 with a partially bound key for fanout > 2 (does an
82+
%% O(log N) seek followed by O(F) range scan).
83+
%%
84+
%% Routing walks the trie (branching on literal word, <<"*">>, <<"#">>)
85+
%% then collects destinations from the bindings table at each matching
86+
%% leaf. This is O(depth * 3) for the trie walk, plus O(log N) per
87+
%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
88+
%% ==============================================================
6489

65-
-spec add_matched([rabbit_types:binding_destination() |
66-
{rabbit_types:binding_destination(), BindingArgs :: list()}],
67-
ReturnBindingKeys :: boolean(),
68-
match_result()) ->
69-
match_result().
70-
add_matched(Destinations, false, Acc) ->
71-
Destinations ++ Acc;
72-
add_matched(DestinationsArgs, true, Acc) ->
73-
lists:foldl(
74-
fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
75-
case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
76-
{longstr, BKey} ->
77-
[{DestQ, BKey} | L];
78-
_ ->
79-
[DestQ | L]
80-
end;
81-
({DestX, _BindingArgs}, L) ->
82-
[DestX | L]
83-
end, Acc, DestinationsArgs).
90+
trie_match(XSrc, Node, [], BKeys, Acc0) ->
91+
Acc1 = trie_bindings(Node, BKeys, Acc0),
92+
trie_match_try(XSrc, Node, <<"#">>,
93+
fun trie_match_skip_any/5,
94+
[], BKeys, Acc1);
95+
trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
96+
Acc1 = trie_match_try(XSrc, Node, W,
97+
fun trie_match/5,
98+
RestW, BKeys, Acc0),
99+
Acc2 = trie_match_try(XSrc, Node, <<"*">>,
100+
fun trie_match/5,
101+
RestW, BKeys, Acc1),
102+
trie_match_try(XSrc, Node, <<"#">>,
103+
fun trie_match_skip_any/5,
104+
Words, BKeys, Acc2).
105+
106+
trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
107+
case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
108+
{XSrc, Node, Word}, 2, undefined) of
109+
undefined ->
110+
Acc;
111+
NextNode ->
112+
MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
113+
end.
84114

85-
%% Khepri topic graph
115+
trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
116+
trie_match(XSrc, Node, [], BKeys, Acc);
117+
trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
118+
trie_match_skip_any(
119+
XSrc, Node, RestW, BKeys,
120+
trie_match(XSrc, Node, Words, BKeys, Acc)).
86121

87-
trie_match_in_khepri(X, Words, BKeys) ->
122+
%% Collect all destinations bound at the given trie node.
123+
%%
124+
%% Uses ets:next/2 for up to two elements (fast path for the common
125+
%% fanout 0-2 cases), then switches to ets:select/2 when fanout > 2.
126+
%%
127+
%% ets:select/2 occurs the expensive match spec compilation overhead.
128+
%% For larger fanouts, the cost for compiling the match spec amortises.
129+
%% ets:select/2 occurs an O(log N) seek followed by an O(F) range scan,
130+
%% which is cheaper than F individual ets:next/2 calls
131+
%% (each O(log N) due to CATree fresh-stack allocation).
132+
trie_bindings(NodeId, BKeys, Acc) ->
133+
StartKey = {NodeId, <<>>, {}},
134+
case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
135+
{NodeId, BKey1, Dest1} = Key1 ->
136+
case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
137+
{NodeId, BKey2, Dest2} = Key2 ->
138+
case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
139+
{NodeId, _, _} ->
140+
collect_select(NodeId, BKeys, Acc);
141+
_ ->
142+
Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
143+
collect_binding(Dest2, BKey2, BKeys, Acc1)
144+
end;
145+
_ ->
146+
collect_binding(Dest1, BKey1, BKeys, Acc)
147+
end;
148+
_ ->
149+
Acc
150+
end.
151+
152+
collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
153+
[{Dest, BindingKey} | Acc];
154+
collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
155+
[Dest | Acc].
156+
157+
collect_select(NodeId, false, Acc) ->
158+
Dests = ets:select(?TOPIC_BINDING_PROJECTION,
159+
[{{{NodeId, '_', '$1'}}, [], ['$1']}]),
160+
Dests ++ Acc;
161+
collect_select(NodeId, true, Acc) ->
162+
DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
163+
[{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
164+
format_dest_bkeys(DestsAndBKeys, Acc).
165+
166+
format_dest_bkeys([], Acc) ->
167+
Acc;
168+
format_dest_bkeys([{#resource{kind = queue} = Dest, BKey} | Rest], Acc) ->
169+
format_dest_bkeys(Rest, [{Dest, BKey} | Acc]);
170+
format_dest_bkeys([{Dest, _BKey} | Rest], Acc) ->
171+
format_dest_bkeys(Rest, [Dest | Acc]).
172+
173+
%% ==============================================================
174+
%% Old v3 Khepri topic graph.
175+
%% Delete these *_v3 functions when feature flag
176+
%% topic_binding_projection_v4 becomes required.
177+
%% ==============================================================
178+
179+
trie_match_v3(X, Words, BKeys) ->
88180
try
89-
trie_match_in_khepri(X, root, Words, BKeys, [])
181+
trie_match_v3(X, root, Words, BKeys, [])
90182
catch
91183
error:badarg ->
92184
[]
93185
end.
94186

95-
trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) ->
96-
Destinations = trie_bindings_in_khepri(X, Node, BKeys),
97-
ResAcc = add_matched(Destinations, BKeys, ResAcc0),
98-
trie_match_part_in_khepri(
187+
trie_match_v3(X, Node, [], BKeys, ResAcc0) ->
188+
Destinations = trie_bindings_v3(X, Node, BKeys),
189+
ResAcc = add_matched_v3(Destinations, BKeys, ResAcc0),
190+
trie_match_part_v3(
99191
X, Node, <<"#">>,
100-
fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc);
101-
trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
192+
fun trie_match_skip_any_v3/5, [], BKeys, ResAcc);
193+
trie_match_v3(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
102194
lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
103-
trie_match_part_in_khepri(
195+
trie_match_part_v3(
104196
X, Node, WArg, MatchFun, RestWArg, BKeys, Acc)
105-
end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW},
106-
{<<"*">>, fun trie_match_in_khepri/5, RestW},
197+
end, ResAcc, [{W, fun trie_match_v3/5, RestW},
198+
{<<"*">>, fun trie_match_v3/5, RestW},
107199
{<<"#">>,
108-
fun trie_match_skip_any_in_khepri/5, Words}]).
200+
fun trie_match_skip_any_v3/5, Words}]).
109201

110-
trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
111-
case trie_child_in_khepri(X, Node, Search) of
202+
trie_match_part_v3(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
203+
case trie_child_v3(X, Node, Search) of
112204
{ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc);
113205
error -> ResAcc
114206
end.
115207

116-
trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) ->
117-
trie_match_in_khepri(X, Node, [], BKeys, ResAcc);
118-
trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
119-
trie_match_skip_any_in_khepri(
208+
trie_match_skip_any_v3(X, Node, [], BKeys, ResAcc) ->
209+
trie_match_v3(X, Node, [], BKeys, ResAcc);
210+
trie_match_skip_any_v3(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
211+
trie_match_skip_any_v3(
120212
X, Node, RestW, BKeys,
121-
trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)).
213+
trie_match_v3(X, Node, Words, BKeys, ResAcc)).
122214

123-
trie_child_in_khepri(X, Node, Word) ->
215+
trie_child_v3(X, Node, Word) ->
124216
case ets:lookup(
125-
?KHEPRI_PROJECTION,
217+
?KHEPRI_PROJECTION_V3,
126218
#trie_edge{exchange_name = X,
127219
node_id = Node,
128220
word = Word}) of
129221
[#topic_trie_edge_v2{node_id = NextNode}] -> {ok, NextNode};
130222
[] -> error
131223
end.
132224

133-
trie_bindings_in_khepri(X, Node, BKeys) ->
225+
trie_bindings_v3(X, Node, BKeys) ->
134226
case ets:lookup(
135-
?KHEPRI_PROJECTION,
227+
?KHEPRI_PROJECTION_V3,
136228
#trie_edge{exchange_name = X,
137229
node_id = Node,
138230
word = bindings}) of
@@ -147,3 +239,18 @@ trie_bindings_in_khepri(X, Node, BKeys) ->
147239
[] ->
148240
[]
149241
end.
242+
243+
add_matched_v3(Destinations, false, Acc) ->
244+
Destinations ++ Acc;
245+
add_matched_v3(DestinationsArgs, true, Acc) ->
246+
lists:foldl(
247+
fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
248+
case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
249+
{longstr, BKey} ->
250+
[{DestQ, BKey} | L];
251+
_ ->
252+
[DestQ | L]
253+
end;
254+
({DestX, _BindingArgs}, L) ->
255+
[DestX | L]
256+
end, Acc, DestinationsArgs).

0 commit comments

Comments
 (0)