Skip to content

Commit fa4670b

Browse files
committed
rabbit_khepri: Optimise topic binding projection function
[Why] Before this patch, the projection function that managed the topic bindings graph only kept track of edges in the form "topic word -> topic word" = "this node ID". Reading and updating this graph was efficient. However, deleting a node in this graph was expensive because it used `ets:match/3` to determine whether an edge no longer pointed to anything and could be reclaimed. `ets:match/3` does a full scan of the table. On my laptop, this scan took 20 ms with 100k topic bindings, so deleting all of them took more than 30 minutes (100k deletions x 20 ms each). [How] The new projection function also tracks the number of children each target node has. This way, it knows that when this counter reaches 0, the edge can be reclaimed. The same test with 100k topic bindings now takes 3.5 seconds to delete them. (cherry picked from commit 612d93d)
1 parent 0b4bb69 commit fa4670b

4 files changed

Lines changed: 160 additions & 75 deletions

File tree

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
-define(MNESIA_NODE_TABLE, rabbit_topic_trie_node).
2727
-define(MNESIA_EDGE_TABLE, rabbit_topic_trie_edge).
2828
-define(MNESIA_BINDING_TABLE, rabbit_topic_trie_binding).
29-
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v2).
29+
-define(KHEPRI_PROJECTION, rabbit_khepri_topic_trie_v3).
3030

3131
-type match_result() :: [rabbit_types:binding_destination() |
3232
{rabbit_amqqueue:name(), rabbit_types:binding_key()}].
@@ -533,8 +533,8 @@ trie_child_in_khepri(X, Node, Word) ->
533533
#trie_edge{exchange_name = X,
534534
node_id = Node,
535535
word = Word}) of
536-
[#topic_trie_edge{node_id = NextNode}] -> {ok, NextNode};
537-
[] -> error
536+
[#topic_trie_edge_v2{node_id = NextNode}] -> {ok, NextNode};
537+
[] -> error
538538
end.
539539

540540
trie_bindings_in_khepri(X, Node, BKeys) ->
@@ -543,7 +543,7 @@ trie_bindings_in_khepri(X, Node, BKeys) ->
543543
#trie_edge{exchange_name = X,
544544
node_id = Node,
545545
word = bindings}) of
546-
[#topic_trie_edge{node_id = {bindings, Bindings}}] ->
546+
[#topic_trie_edge_v2{node_id = {bindings, Bindings}}] ->
547547
[case BKeys of
548548
true ->
549549
{Dest, Args};

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 100 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -176,7 +176,8 @@
176176
handle_fallback/1]).
177177

178178
%% Called remotely to handle unregistration of old projections.
179-
-export([supports_rabbit_khepri_topic_trie_v2/0]).
179+
-export([supports_rabbit_khepri_topic_trie_v2/0,
180+
supports_rabbit_khepri_topic_trie_version/0]).
180181

181182
-ifdef(TEST).
182183
-export([register_projections/0,
@@ -1544,7 +1545,7 @@ projection_fun_for_sets(MapFun) ->
15441545
end.
15451546

15461547
register_rabbit_topic_graph_projection() ->
1547-
Name = rabbit_khepri_topic_trie_v2,
1548+
Name = rabbit_khepri_topic_trie_v3,
15481549
%% This projection calls some external functions which are disallowed by
15491550
%% Horus because they interact with global or random state. We explicitly
15501551
%% allow them here for performance reasons.
@@ -1565,7 +1566,7 @@ register_rabbit_topic_graph_projection() ->
15651566
(M, F, A, From) ->
15661567
khepri_tx_adv:should_process_function(M, F, A, From)
15671568
end,
1568-
Options = #{keypos => #topic_trie_edge.trie_edge,
1569+
Options = #{keypos => #topic_trie_edge_v2.trie_edge,
15691570
standalone_fun_options =>
15701571
#{should_process_function => ShouldProcessFun},
15711572
read_concurrency => true},
@@ -1615,34 +1616,58 @@ register_rabbit_topic_graph_projection() ->
16151616
_Kind = ?KHEPRI_WILDCARD_STAR,
16161617
_DstName = ?KHEPRI_WILDCARD_STAR,
16171618
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
1618-
_ = unregister_rabbit_topic_trie_v1_projection(),
1619+
_ = unregister_old_rabbit_topic_trie_projections(),
16191620
khepri:register_projection(?STORE_ID, PathPattern, Projection).
16201621

16211622
supports_rabbit_khepri_topic_trie_v2() ->
16221623
true.
16231624

1624-
unregister_rabbit_topic_trie_v1_projection() ->
1625+
supports_rabbit_khepri_topic_trie_version() ->
1626+
3.
1627+
1628+
unregister_old_rabbit_topic_trie_projections() ->
1629+
OldProjections = [{1, rabbit_khepri_topic_trie},
1630+
{2, rabbit_khepri_topic_trie_v2}],
1631+
lists:foreach(
1632+
fun unregister_old_rabbit_topic_trie_projection/1,
1633+
OldProjections).
1634+
1635+
unregister_old_rabbit_topic_trie_projection({Version, ProjectionName}) ->
16251636
Nodes = rabbit_nodes:list_members(),
1626-
Rets = erpc:multicall(
1627-
Nodes,
1628-
?MODULE, supports_rabbit_khepri_topic_trie_v2, []),
1637+
Rets = case Version of
1638+
1 ->
1639+
erpc:multicall(
1640+
Nodes,
1641+
?MODULE, supports_rabbit_khepri_topic_trie_v2, []);
1642+
_ ->
1643+
erpc:multicall(
1644+
Nodes,
1645+
?MODULE, supports_rabbit_khepri_topic_trie_version, [])
1646+
end,
16291647
SupportedEverywhere = lists:all(
1630-
fun(Ret) ->
1631-
Ret =:= {ok, true}
1648+
fun
1649+
({ok, true}) ->
1650+
true;
1651+
({ok, RemoteVersion}) ->
1652+
RemoteVersion >= Version;
1653+
(_) ->
1654+
false
16321655
end, Rets),
16331656
case SupportedEverywhere of
16341657
true ->
16351658
?LOG_DEBUG(
1636-
"DB: unregister old `rabbit_khepri_topic_trie` Khepri "
1659+
"DB: unregister old `~s` Khepri "
16371660
"projection",
1661+
[ProjectionName],
16381662
#{domain => ?RMQLOG_DOMAIN_DB}),
16391663
khepri:unregister_projections(
1640-
?STORE_ID, [rabbit_khepri_topic_trie]);
1664+
?STORE_ID, [ProjectionName]);
16411665
false ->
16421666
?LOG_DEBUG(
16431667
"DB: skipping unregistration of old "
1644-
"`rabbit_khepri_topic_trie` Khepri because some RabbitMQ "
1668+
"`~s` Khepri because some RabbitMQ "
16451669
"nodes still use it",
1670+
[ProjectionName],
16461671
#{domain => ?RMQLOG_DOMAIN_DB}),
16471672
ok
16481673
end.
@@ -1666,70 +1691,89 @@ follow_down_update(Table, Exchange, Words, UpdateFn) ->
16661691
Words :: [binary()],
16671692
BindingsSet :: sets:set(rabbit_types:binding()),
16681693
UpdateFn :: fun((BindingsSet) -> BindingsSet),
1669-
Ret :: keep | delete.
1694+
Ret :: added | kept | deleted.
16701695

16711696
follow_down_update(Table, Exchange, FromNodeId, [To | Rest], UpdateFn) ->
16721697
TrieEdge = #trie_edge{exchange_name = Exchange,
16731698
node_id = FromNodeId,
16741699
word = To},
1675-
ToNodeId = case ets:lookup(Table, TrieEdge) of
1676-
[#topic_trie_edge{node_id = ExistingId}] ->
1677-
ExistingId;
1678-
[] ->
1679-
%% The Khepri topic graph table uses references for node
1680-
%% IDs instead of `rabbit_guid:gen/0' used by mnesia.
1681-
%% This is possible because the topic graph table is
1682-
%% never persisted to disk. References take up slightly
1683-
%% less memory and are very cheap to produce compared to
1684-
%% `rabbit_guid' (which requires the `rabbit_guid'
1685-
%% genserver to be online).
1686-
NewNodeId = make_ref(),
1687-
NewEdge = #topic_trie_edge{trie_edge = TrieEdge,
1688-
node_id = NewNodeId},
1689-
%% Create the intermediary node.
1690-
ets:insert(Table, NewEdge),
1691-
NewNodeId
1692-
end,
1700+
{ToNodeId, IsNew} = case ets:lookup(Table, TrieEdge) of
1701+
[#topic_trie_edge_v2{node_id = ExistingId}] ->
1702+
{ExistingId, false};
1703+
[] ->
1704+
%% The Khepri topic graph table uses references
1705+
%% for node IDs instead of `rabbit_guid:gen/0'
1706+
%% used by mnesia. This is possible because the
1707+
%% topic graph table is never persisted to
1708+
%% disk. References take up slightly less
1709+
%% memory and are very cheap to produce
1710+
%% compared to `rabbit_guid' (which requires
1711+
%% the `rabbit_guid' genserver to be online).
1712+
NewNodeId = make_ref(),
1713+
NewEdge = #topic_trie_edge_v2{
1714+
trie_edge = TrieEdge,
1715+
node_id = NewNodeId,
1716+
child_count = 0},
1717+
%% Create the intermediary node.
1718+
ets:insert(Table, NewEdge),
1719+
{NewNodeId, true}
1720+
end,
16931721
case follow_down_update(Table, Exchange, ToNodeId, Rest, UpdateFn) of
1694-
delete ->
1695-
OutEdgePattern = #topic_trie_edge{trie_edge =
1696-
TrieEdge#trie_edge{
1697-
node_id = ToNodeId,
1698-
word = '_'},
1699-
node_id = '_'},
1700-
case ets:match(Table, OutEdgePattern, 1) of
1701-
'$end_of_table' ->
1702-
ets:delete(Table, TrieEdge),
1703-
delete;
1704-
{_Match, _Continuation} ->
1705-
keep
1722+
added ->
1723+
_ = ets:update_counter(
1724+
Table, TrieEdge, {#topic_trie_edge_v2.child_count, 1}),
1725+
case IsNew of
1726+
true ->
1727+
added;
1728+
false ->
1729+
kept
17061730
end;
1707-
keep ->
1708-
keep
1731+
kept ->
1732+
false = IsNew, %% Assertion.
1733+
kept;
1734+
deleted ->
1735+
false = IsNew, %% Assertion.
1736+
NewCount = ets:update_counter(
1737+
Table, TrieEdge,
1738+
{#topic_trie_edge_v2.child_count, -1}),
1739+
if
1740+
NewCount > 0 ->
1741+
kept;
1742+
NewCount =:= 0 ->
1743+
ets:delete(Table, TrieEdge),
1744+
deleted
1745+
end
17091746
end;
17101747
follow_down_update(Table, Exchange, LeafNodeId, [], UpdateFn) ->
17111748
TrieEdge = #trie_edge{exchange_name = Exchange,
17121749
node_id = LeafNodeId,
17131750
word = bindings},
1714-
Bindings = case ets:lookup(Table, TrieEdge) of
1715-
[#topic_trie_edge{node_id =
1716-
{bindings, ExistingBindings}}] ->
1717-
ExistingBindings;
1718-
[] ->
1719-
sets:new([{version, 2}])
1720-
end,
1751+
{Bindings, IsNew} = case ets:lookup(Table, TrieEdge) of
1752+
[#topic_trie_edge_v2{
1753+
node_id = {bindings, ExistingBindings}}] ->
1754+
{ExistingBindings, false};
1755+
[] ->
1756+
{sets:new([{version, 2}]), true}
1757+
end,
17211758
NewBindings = UpdateFn(Bindings),
17221759
case sets:is_empty(NewBindings) of
17231760
true ->
17241761
%% If the bindings have been deleted, delete the trie edge and
17251762
%% any edges that no longer lead to any bindings or other edges.
17261763
ets:delete(Table, TrieEdge),
1727-
delete;
1764+
deleted;
17281765
false ->
17291766
ToNodeId = {bindings, NewBindings},
1730-
Edge = #topic_trie_edge{trie_edge = TrieEdge, node_id = ToNodeId},
1767+
Edge = #topic_trie_edge_v2{
1768+
trie_edge = TrieEdge,
1769+
node_id = ToNodeId},
17311770
ets:insert(Table, Edge),
1732-
keep
1771+
case IsNew of
1772+
true ->
1773+
added;
1774+
false ->
1775+
kept
1776+
end
17331777
end.
17341778

17351779
%% -------------------------------------------------------------------

deps/rabbit/test/rabbit_db_topic_exchange_SUITE.erl

Lines changed: 55 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -485,7 +485,7 @@ topic_trie_cleanup(Config) ->
485485
%% Log entries that were added to the ETS table
486486
lists:foreach(
487487
fun(Node) ->
488-
VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
488+
VHostEntriesAfterAdd = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v3),
489489
ct:pal("Bindings added on node ~s: ~p, ETS entries after add: ~p~n",
490490
[Node, length(Bindings), length(VHostEntriesAfterAdd)])
491491
end, Nodes),
@@ -507,11 +507,37 @@ topic_trie_cleanup(Config) ->
507507
%% old projection table might be there in case of
508508
%% mixed-version testing. This part will be tested in the
509509
%% second part of the testcase.
510-
VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v2),
510+
VHostEntriesAfterDelete = read_topic_trie_table(Config, Node, VHost, rabbit_khepri_topic_trie_v3),
511511
ct:pal("ETS entries after delete on node ~s: ~p~n", [Node, length(VHostEntriesAfterDelete)]),
512512

513-
%% Assert that no entries were found for this vhost after deletion
514-
?assertEqual([], VHostEntriesAfterDelete)
513+
% %% Assert that no entries were found for this vhost after deletion
514+
% ?assertEqual([], VHostEntriesAfterDelete)
515+
{ok, KhepriVersion0} = rabbit_ct_broker_helpers:rpc(
516+
Config, Node,
517+
application, get_key,
518+
[khepri, vsn]),
519+
KhepriVersion1 = rabbit_misc:format(
520+
"~2..0s~2..0s~2..0s",
521+
string:split(KhepriVersion0, ".", all)),
522+
KhepriVersion2 = list_to_integer(KhepriVersion1),
523+
ct:pal("Khepri version: ~b / ~s", [KhepriVersion2, KhepriVersion1]),
524+
if
525+
KhepriVersion2 >= 1704 ->
526+
%% Assert that no entries were found for this vhost
527+
%% after deletion.
528+
?assertEqual([], VHostEntriesAfterDelete);
529+
true ->
530+
%% Assert that no `#topic_trie_edge{}' entries remain for this vhost
531+
%% after deletion. This node does not have the fixed
532+
%% version of Khepri, so it won't delete the new
533+
%% `#topic_trie_edge_v2{}' correctly; that's why we
534+
%% filter them out.
535+
ct:pal("Consider #topic_trie_edge{} records only"),
536+
?assertEqual(
537+
[],
538+
[E || #topic_trie_edge{} = E
539+
<- VHostEntriesAfterDelete])
540+
end
515541
end, Nodes),
516542

517543
%% If we reach this point, we know the new projection works as expected
@@ -554,13 +580,17 @@ topic_trie_cleanup(Config) ->
554580
ct:pal("Wait for projections to be restored"),
555581
?awaitMatch(
556582
Entries when is_list(Entries),
557-
catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v2),
583+
catch read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie_v3),
558584
60000),
559585

560-
ct:pal("Check that the old projection is gone"),
561-
?assertError(
562-
{exception, badarg, _},
563-
read_topic_trie_table(Config, NewNode, VHost, rabbit_khepri_topic_trie));
586+
ct:pal("Check that the old projections are gone"),
587+
lists:foreach(
588+
fun(ProjectionName) ->
589+
?assertError(
590+
{exception, badarg, _},
591+
read_topic_trie_table(Config, NewNode, VHost, ProjectionName))
592+
end, [rabbit_khepri_topic_trie,
593+
rabbit_khepri_topic_trie_v2]);
564594
false ->
565595
ok
566596
end
@@ -573,12 +603,22 @@ topic_trie_cleanup(Config) ->
573603

574604
read_topic_trie_table(Config, Node, VHost, Table) ->
575605
Entries = rabbit_ct_broker_helpers:rpc(Config, Node, ets, tab2list, [Table]),
576-
[Entry || #topic_trie_edge{trie_edge = TrieEdge} = Entry <- Entries,
577-
case TrieEdge of
578-
#trie_edge{exchange_name = #resource{virtual_host = V}} ->
579-
V =:= VHost;
580-
_ ->
581-
false
606+
[Entry || Entry <- Entries,
607+
case Entry of
608+
#topic_trie_edge{trie_edge = TrieEdge} ->
609+
case TrieEdge of
610+
#trie_edge{exchange_name = #resource{virtual_host = V}} ->
611+
V =:= VHost;
612+
_ ->
613+
false
614+
end;
615+
#topic_trie_edge_v2{trie_edge = TrieEdge} ->
616+
case TrieEdge of
617+
#trie_edge{exchange_name = #resource{virtual_host = V}} ->
618+
V =:= VHost;
619+
_ ->
620+
false
621+
end
582622
end].
583623

584624
%% ---------------------------------------------------------------------------

deps/rabbit_common/include/rabbit.hrl

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,7 @@
101101

102102
-record(topic_trie_node, {trie_node, edge_count, binding_count}).
103103
-record(topic_trie_edge, {trie_edge, node_id}).
104+
-record(topic_trie_edge_v2, {trie_edge, node_id, child_count}).
104105
-record(topic_trie_binding, {trie_binding, value = const}).
105106

106107
-record(trie_node, {exchange_name, node_id}).

0 commit comments

Comments
 (0)