Commit fd4487d

ansddumbbell authored and committed
Replace Khepri topic routing projection with trie + ordered_set (v4)
Resolves #15588.

The previous Khepri topic routing projection (v3) stored topic bindings as a sets:set(#binding{}) inside trie leaf nodes. This design had a major performance drawback: on the insertion/deletion path (in the single Khepri Ra process), every binding change required a read-modify-write of the entire sets:set(), making it O(N) in the number of bindings at that leaf. With many MQTT clients connecting concurrently (each subscribing to the same topic filter), this made the Ra process a bottleneck.

A less severe performance issue was that the entire binding was copied, including the binding arguments carrying the MQTT 5.0 subscription options, such as:

```
{<<"x-mqtt-subscription-opts">>,table,
 [{<<"id">>,unsignedint,1},
  {<<"no-local">>,bool,false},
  {<<"qos">>,unsignedbyte,0},
  {<<"retain-as-published">>,bool,false},
  {<<"retain-handling">>,unsignedbyte,0}]}
```

Replace the single ETS projection table with two purpose-built tables:

1. Trie edges table (ETS set, read_concurrency=true):
   - Row: `{{XSrc, ParentNodeId, Word}, ChildNodeId, ChildCount}`
   - XSrc = {VHost, ExchangeName} (compact 2-tuple of binaries)
   - NodeId = root | reference()
   - ChildCount tracks outgoing edges for garbage collection

2. Leaf bindings table (ETS ordered_set, read_concurrency=true):
   - Key: {NodeId, BindingKey, Dest}
   - Stored as 1-tuples: {{NodeId, BindingKey, Dest}}
   - No value column; all data is in the key to minimize copying

The trie structure preserves O(depth * 3) routing complexity regardless of the overall number of bindings or wildcard filters. At each trie level, we probe at most 3 edges (the literal word, <<"*">>, and <<"#">>), each via ets:lookup_element/4, which copies only the ChildNodeId (a reference).
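The trie walk described above can be sketched in Python: a plain dict stands in for each ETS table, and `insert_filter`/`match` and the node-naming scheme are illustrative stand-ins, not the commit's Erlang API. The probe order per level mirrors the commit: literal word, then `*` (exactly one word), then `#` (zero or more words).

```python
from itertools import count

# {(parent_node, word): child_node} stands in for the trie-edges table;
# {node: [dest, ...]} stands in for the leaf-bindings table.
_node_ids = count(1)

def insert_filter(edges, bindings, words, dest):
    node = "root"
    for w in words:
        child = edges.get((node, w))
        if child is None:
            child = f"n{next(_node_ids)}"  # the commit uses make_ref() here
            edges[(node, w)] = child
        node = child
    bindings.setdefault(node, []).append(dest)

def match(edges, bindings, node, words, acc):
    if not words:
        acc += bindings.get(node, [])
        child = edges.get((node, "#"))  # "#" also matches zero words
        if child is not None:
            match_skip_any(edges, bindings, child, [], acc)
        return acc
    w, rest = words[0], words[1:]
    for probe, remaining in ((w, rest), ("*", rest)):
        child = edges.get((node, probe))
        if child is not None:
            match(edges, bindings, child, remaining, acc)
    child = edges.get((node, "#"))
    if child is not None:
        match_skip_any(edges, bindings, child, words, acc)
    return acc

def match_skip_any(edges, bindings, node, words, acc):
    # "#" consumed zero words: try matching here; then consume one and retry.
    match(edges, bindings, node, words, acc)
    if words:
        match_skip_any(edges, bindings, node, words[1:], acc)

edges, bindings = {}, {}
insert_filter(edges, bindings, ["broadcast", "#"], "q_all")
insert_filter(edges, bindings, ["device", "42", "temp"], "q_42")
print(match(edges, bindings, "root", ["device", "42", "temp"], []))  # ['q_42']
```

Note that the walk does at most three dict probes per level, so its cost depends on filter depth, not on how many bindings or filters exist elsewhere in the trie.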
The ordered_set for bindings provides:

- O(log N) insert and delete per binding (no read-modify-write)
- The binding key (needed for MQTT subscription identifiers and topic aliases) is part of the key, so it is returned directly during destination collection without additional lookups

Collecting destinations at a matched trie leaf uses a hybrid strategy:

- Fanout 0-2 (the common case: unicast, device + stream): up to 3 ets:next/2 probes. Each ets:next/2 call costs O(log N) because the CATree (used with read_concurrency) allocates a fresh tree traversal stack on each call.
- Fanout > 2: ets:select/2 with a partially bound key does an O(log N) seek followed by an O(F) range scan. The match spec compilation overhead amortises over the larger result set.

ets:lookup_element/4 (OTP 26+) returns a default value on miss instead of throwing badarg, and copies only the requested element on hit. This avoids both exception overhead (misses are common when probing the <<"*">> and <<"#">> branches during trie traversal) and unnecessary data copying (we only need the ChildNodeId, not the full row).

Trie node IDs are ephemeral (the tables are rebuilt when the Khepri projection is re-registered). make_ref() is fast, globally unique within a node, and has good hash distribution for the ETS set table.

When a binding is deleted, the trie path from root to leaf is collected in a single downward walk (trie_follow_down_get_path). Empty nodes are then pruned bottom-up: a node is empty when its ChildCount is 0 and it has no bindings in the ordered_set table.

Benchmarks below were run with 500K routing operations per scenario (on the same machine, back-to-back between main (v3) and this commit).

Significant insert/delete improvements:

Churn insert (8K bindings, 4 filters/client): ~1,120 vs ~810 ops/s (+38%)
v3 did a read-modify-write of sets:set() per binding; v4 does a single ets:insert into the ordered_set plus trie edge updates.
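The hybrid collection strategy above can be illustrated against a sorted Python list playing the role of the ordered_set, with `bisect` as the O(log N) seek. `next_key` and `collect` are hypothetical stand-ins for ets:next/2 and the fast-path/select split; the sentinel start key mirrors the commit's `{NodeId, <<>>, {}}`.

```python
import bisect

# Keys mimic the ordered_set rows {{NodeId, BindingKey, Dest}}:
# all data lives in the key, so a "row" is just the key itself.
table = sorted([
    (1, "a.b", "q1"),
    (2, "x.#", "q2"),
    (2, "x.#", "q3"),
    (3, "k", "q4"), (3, "k", "q5"), (3, "k", "q6"), (3, "k", "q7"),
])

def next_key(key):
    """Successor probe, like ets:next/2 on an ordered_set."""
    i = bisect.bisect_right(table, key)
    return table[i] if i < len(table) else None

def collect(node_id):
    # Fast path: up to 3 successor probes starting from a sentinel key
    # that sorts before every real key for this node.
    k1 = next_key((node_id, "", ()))
    if k1 is None or k1[0] != node_id:
        return []
    k2 = next_key(k1)
    if k2 is None or k2[0] != node_id:
        return [k1[2]]
    k3 = next_key(k2)
    if k3 is None or k3[0] != node_id:
        return [k1[2], k2[2]]
    # Fanout > 2: one O(log N) seek plus a contiguous range scan,
    # like ets:select/2 with a partially bound key.
    lo = bisect.bisect_left(table, (node_id, "", ()))
    hi = bisect.bisect_left(table, (node_id + 1, "", ()))
    return [dest for (_, _, dest) in table[lo:hi]]

print(collect(2))  # fanout 2: resolved by successor probes alone
print(collect(3))  # fanout 4: falls through to the range scan
```

The design point the sketch makes concrete: for fanout F > 2, one seek plus a linear scan over F adjacent keys beats F independent O(log N) successor probes.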
MQTT device insert (20K bindings): ~650 vs ~420 ops/s (+55%)
Same mechanism as churn insert. Particularly impactful when many clients share the same wildcard filter (e.g. "broadcast.#"), since v3's sets:set() grew with each client while v4 inserts are O(log N) regardless.

Same-key fanout insert (10K): ~415 vs ~290 ops/s (+43%)
The worst case for v3: all 10K bindings share the same key, so each insert copies and rebuilds the growing sets:set().

Routing improvements:

MQTT unicast (10K devices, 20K bindings): ~460K vs ~250K ops/s (+80%)
Each route matches 1 queue among 10K unique exact keys plus 10K queues sharing "broadcast.#". v3 stored bindings in the same ETS row as the trie edge, so every trie lookup copied the entire sets:set(). v4 separates trie edges (small rows, set table) from bindings (ordered_set), so the trie walk copies only references.

Large fanout (10K queues, same key): ~3,100 vs ~1,170 ops/s (+165%)
v3 copied a 10K-element sets:set() out of ETS in a single ets:lookup, then called sets:to_list/1. v4 uses ets:select/2 with a partially bound key, which does an O(log N) seek and then an efficient O(F) range scan without intermediate set conversion.

MQTT broadcast (10K fanout): ~0.6 vs ~0.9 ms/route (+50%)
Same mechanism as above.

Scenarios with no significant change (within benchmark noise):
Exact match, wildcard *, wildcard #, mixed wildcards, and many wildcard filters showed no clear difference. Both v3 and v4 use a trie walk, so routing speed is comparable when the fanout is small and the bottleneck is trie traversal rather than destination collection.
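The delete-path pruning described in the commit message (one downward walk collecting the root-to-leaf path, then bottom-up removal of empty nodes) can be sketched as follows. The `bind`/`unbind` helpers and dict tables are illustrative, not RabbitMQ code; the commit stores ChildCount in the edge row and uses make_ref() node IDs.

```python
edges = {}        # (parent_node, word) -> child_node
child_count = {}  # node -> number of outgoing edges
bindings = {}     # node -> set of (binding_key, dest)

def bind(words, bkey, dest):
    node = "root"
    for w in words:
        child = edges.get((node, w))
        if child is None:
            child = (node, w)  # any unique id works as a node id
            edges[(node, w)] = child
            child_count[node] = child_count.get(node, 0) + 1
        node = child
    bindings.setdefault(node, set()).add((bkey, dest))

def unbind(words, bkey, dest):
    # Single downward walk collecting the path
    # (like trie_follow_down_get_path in the commit).
    path, node = [], "root"
    for w in words:
        child = edges.get((node, w))
        if child is None:
            return
        path.append((node, w, child))
        node = child
    bindings.get(node, set()).discard((bkey, dest))
    # Bottom-up pruning: a node is empty when it has no outgoing
    # edges and no bindings left.
    for parent, w, child in reversed(path):
        if child_count.get(child, 0) == 0 and not bindings.get(child):
            del edges[(parent, w)]
            child_count[parent] -= 1
        else:
            break

bind(["a", "b"], "a.b", "q1")
bind(["a", "c"], "a.c", "q2")
unbind(["a", "b"], "a.b", "q1")
# The "a"->"b" edge is pruned, but node "a" survives because
# the "a"->"c" edge still hangs off it.
```

Because the path is captured on the way down, pruning needs no upward pointers and stops at the first non-empty ancestor.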
1 parent e4e4f78 commit fd4487d

3 files changed

Lines changed: 1073 additions & 387 deletions


deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 106 additions & 91 deletions
```diff
@@ -14,7 +14,8 @@
 %% Used by Khepri projections and the Mnesia-to-Khepri migration.
 -export([split_topic_key_binary/1]).
 
--define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v3).
+-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
+-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
 
 -type match_result() :: [rabbit_types:binding_destination() |
                          {rabbit_amqqueue:name(), rabbit_types:binding_key()}].
@@ -31,14 +32,15 @@
 %%
 %% @private
 
-match(XName, RoutingKey, Opts) ->
+match(#resource{virtual_host = VHost, name = XName}, RoutingKey, Opts) ->
     BKeys = maps:get(return_binding_keys, Opts, false),
     Words = split_topic_key_binary(RoutingKey),
-    trie_match_in_khepri(XName, Words, BKeys).
-
-%% --------------------------------------------------------------
-%% split_topic_key_binary().
-%% --------------------------------------------------------------
+    try
+        trie_match({VHost, XName}, root, Words, BKeys, [])
+    catch
+        error:badarg ->
+            []
+    end.
 
 -spec split_topic_key_binary(RoutingKey) -> Words when
       RoutingKey :: binary(),
@@ -58,92 +60,105 @@ split_topic_key_binary(RoutingKey) ->
               end,
     binary:split(RoutingKey, Pattern, [global]).
 
-%% --------------------------------------------------------------
-%% Internal
-%% --------------------------------------------------------------
-
--spec add_matched([rabbit_types:binding_destination() |
-                   {rabbit_types:binding_destination(), BindingArgs :: list()}],
-                  ReturnBindingKeys :: boolean(),
-                  match_result()) ->
-    match_result().
-add_matched(Destinations, false, Acc) ->
-    Destinations ++ Acc;
-add_matched(DestinationsArgs, true, Acc) ->
-    lists:foldl(
-      fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
-              case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
-                  {longstr, BKey} ->
-                      [{DestQ, BKey} | L];
-                  _ ->
-                      [DestQ | L]
-              end;
-         ({DestX, _BindingArgs}, L) ->
-              [DestX | L]
-      end, Acc, DestinationsArgs).
-
-%% Khepri topic graph
-
-trie_match_in_khepri(X, Words, BKeys) ->
-    try
-        trie_match_in_khepri(X, root, Words, BKeys, [])
-    catch
-        error:badarg ->
-            []
+%% ==============================================================
+%% Trie-based routing
+%%
+%% Uses two ETS tables:
+%%
+%% 1. Trie edges table (set): {Key, ChildNodeId, ChildCount}
+%%    Key = {XSrc, ParentNodeId, Word}
+%%    Navigation: ets:lookup_element/4 for O(1) edge traversal.
+%%
+%% 2. Leaf bindings table (ordered_set): {{NodeId, BindingKey, Dest}}
+%%    Collection: ets:next/2 probes for fanout 0-2 (fast path), then
+%%    ets:select/2 with a partially bound key for fanout > 2 (does an
+%%    O(log N) seek followed by O(F) range scan).
+%%
+%% Routing walks the trie (branching on literal word, <<"*">>, <<"#">>)
+%% then collects destinations from the bindings table at each matching
+%% leaf. This is O(depth * 3) for the trie walk, plus O(log N) per
+%% leaf for fanout 0-2, or O(log N + F) per leaf for fanout F > 2.
+%% ==============================================================
+
+trie_match(XSrc, Node, [], BKeys, Acc0) ->
+    Acc1 = trie_bindings(Node, BKeys, Acc0),
+    trie_match_try(XSrc, Node, <<"#">>,
+                   fun trie_match_skip_any/5,
+                   [], BKeys, Acc1);
+trie_match(XSrc, Node, [W | RestW] = Words, BKeys, Acc0) ->
+    Acc1 = trie_match_try(XSrc, Node, W,
+                          fun trie_match/5,
+                          RestW, BKeys, Acc0),
+    Acc2 = trie_match_try(XSrc, Node, <<"*">>,
+                          fun trie_match/5,
+                          RestW, BKeys, Acc1),
+    trie_match_try(XSrc, Node, <<"#">>,
+                   fun trie_match_skip_any/5,
+                   Words, BKeys, Acc2).
+
+trie_match_try(XSrc, Node, Word, MatchFun, RestW, BKeys, Acc) ->
+    case ets:lookup_element(?TOPIC_TRIE_PROJECTION,
+                            {XSrc, Node, Word}, 2, undefined) of
+        undefined ->
+            Acc;
+        NextNode ->
+            MatchFun(XSrc, NextNode, RestW, BKeys, Acc)
     end.
 
-trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) ->
-    Destinations = trie_bindings_in_khepri(X, Node, BKeys),
-    ResAcc = add_matched(Destinations, BKeys, ResAcc0),
-    trie_match_part_in_khepri(
-      X, Node, <<"#">>,
-      fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc);
-trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
-    lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
-                        trie_match_part_in_khepri(
-                          X, Node, WArg, MatchFun, RestWArg, BKeys, Acc)
-                end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW},
-                              {<<"*">>, fun trie_match_in_khepri/5, RestW},
-                              {<<"#">>,
-                               fun trie_match_skip_any_in_khepri/5, Words}]).
-
-trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
-    case trie_child_in_khepri(X, Node, Search) of
-        {ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc);
-        error -> ResAcc
-    end.
+trie_match_skip_any(XSrc, Node, [], BKeys, Acc) ->
+    trie_match(XSrc, Node, [], BKeys, Acc);
+trie_match_skip_any(XSrc, Node, [_ | RestW] = Words, BKeys, Acc) ->
+    trie_match_skip_any(
+      XSrc, Node, RestW, BKeys,
+      trie_match(XSrc, Node, Words, BKeys, Acc)).
 
-trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) ->
-    trie_match_in_khepri(X, Node, [], BKeys, ResAcc);
-trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
-    trie_match_skip_any_in_khepri(
-      X, Node, RestW, BKeys,
-      trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)).
-
-trie_child_in_khepri(X, Node, Word) ->
-    case ets:lookup(
-           ?KHEPRI_PROJECTION,
-           #trie_edge{exchange_name = X,
-                      node_id = Node,
-                      word = Word}) of
-        [#topic_trie_edge_v2{node_id = NextNode}] -> {ok, NextNode};
-        [] -> error
+%% Collect all destinations bound at the given trie node.
+%%
+%% Uses ets:next/2 for up to two elements (fast path for the common
+%% fanout 0-2 cases), then switches to ets:select/2 when fanout > 2.
+%%
+%% ets:select/2 incurs the expensive match spec compilation overhead.
+%% For larger fanouts, the cost of compiling the match spec amortises.
+%% ets:select/2 incurs an O(log N) seek followed by an O(F) range scan,
+%% which is cheaper than F individual ets:next/2 calls
+%% (each O(log N) due to CATree fresh-stack allocation).
+trie_bindings(NodeId, BKeys, Acc) ->
+    StartKey = {NodeId, <<>>, {}},
+    case ets:next(?TOPIC_BINDING_PROJECTION, StartKey) of
+        {NodeId, BKey1, Dest1} = Key1 ->
+            case ets:next(?TOPIC_BINDING_PROJECTION, Key1) of
+                {NodeId, BKey2, Dest2} = Key2 ->
+                    case ets:next(?TOPIC_BINDING_PROJECTION, Key2) of
+                        {NodeId, _, _} ->
+                            collect_select(NodeId, BKeys, Acc);
+                        _ ->
+                            Acc1 = collect_binding(Dest1, BKey1, BKeys, Acc),
+                            collect_binding(Dest2, BKey2, BKeys, Acc1)
+                    end;
+                _ ->
+                    collect_binding(Dest1, BKey1, BKeys, Acc)
+            end;
+        _ ->
+            Acc
     end.
 
-trie_bindings_in_khepri(X, Node, BKeys) ->
-    case ets:lookup(
-           ?KHEPRI_PROJECTION,
-           #trie_edge{exchange_name = X,
-                      node_id = Node,
-                      word = bindings}) of
-        [#topic_trie_edge_v2{node_id = {bindings, Bindings}}] ->
-            [case BKeys of
-                 true ->
-                     {Dest, Args};
-                 false ->
-                     Dest
-             end || #binding{destination = Dest,
-                             args = Args} <- sets:to_list(Bindings)];
-        [] ->
-            []
-    end.
+collect_binding(#resource{kind = queue} = Dest, BindingKey, true, Acc) ->
+    [{Dest, BindingKey} | Acc];
+collect_binding(Dest, _BindingKey, _ReturnBindingKeys, Acc) ->
+    [Dest | Acc].
+
+collect_select(NodeId, false, Acc) ->
+    Dests = ets:select(?TOPIC_BINDING_PROJECTION,
+                       [{{{NodeId, '_', '$1'}}, [], ['$1']}]),
+    Dests ++ Acc;
+collect_select(NodeId, true, Acc) ->
+    DestsAndBKeys = ets:select(?TOPIC_BINDING_PROJECTION,
+                               [{{{NodeId, '$1', '$2'}}, [], [{{'$2', '$1'}}]}]),
+    format_dest_bkeys(DestsAndBKeys, Acc).
+
+format_dest_bkeys([], Acc) ->
+    Acc;
+format_dest_bkeys([{#resource{kind = queue} = Dest, BKey} | Rest], Acc) ->
+    format_dest_bkeys(Rest, [{Dest, BKey} | Acc]);
+format_dest_bkeys([{Dest, _BKey} | Rest], Acc) ->
+    format_dest_bkeys(Rest, [Dest | Acc]).
```
