Skip to content

Commit de43879

Browse files
committed
rabbit_db: Eliminate the delete_queue Khepri transaction
... by using `keep_while` conditions on bindings and auto-delete exchanges. [Why] The `delete_queue` transaction's anonymous function has to be extracted by Horus, like any Khepri transaction. This is an expensive operation, but Horus uses caching to avoid most work after the first extraction. Unfortunately, even with this caching, this transaction is still very expensive when there are massive simultaneous deletions of queues. For instance when the queues' lifetime is linked to that of many clients, and all these clients get disconnected at once. [How] This patch removes the transaction entirely. Instead, it uses `keep_while` conditions on bindings and auto-delete exchanges to let Khepri handle the deletion of semantically related tree nodes. RabbitMQ just has to make a simle "delete this queue" command. To maintain backward compatibility, we introduce the `tie_binding_to_dest_with_keep_while_cond` feature flag. Its `enable` callback will take care of rewriting all bindings and auto-delete exchanges record in the store to add the new `keep_while` conditions. The binding deletion transaction is also simplified because it benefits from the `keep_while` conditions. We may be able to replace this transaction is the future as well in the same manner.
1 parent 6eeb339 commit de43879

5 files changed

Lines changed: 518 additions & 51 deletions

File tree

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,15 @@
232232
stability => stable,
233233
depends_on => ['rabbitmq_4.3.0']
234234
}}).
235+
236+
-rabbit_feature_flag(
237+
{tie_binding_to_dest_with_keep_while_cond,
238+
#{desc =>
239+
"Use keep_while condition to tie a binding record lifetime to its "
240+
"destination record in Khepri",
241+
stability => stable,
242+
depends_on => ['rabbitmq_4.3.0'],
243+
callbacks => #{enable =>
244+
{rabbit_db_queue,
245+
tie_binding_to_dest_with_keep_while_cond_enable}}
246+
}}).

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 136 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
delete_for_destination_in_khepri/2,
3232
delete_all_for_exchange_in_khepri/3,
3333
has_for_source_in_khepri/1,
34-
match_source_and_destination_in_khepri_tx/2
34+
match_source_and_destination_in_khepri_tx/2,
35+
khepri_ret_to_deletions/2,
36+
put_options/1
3537
]).
3638

3739
-export([
@@ -123,6 +125,12 @@ create(#binding{source = SrcName,
123125
case ChecksFun(Src, Dst) of
124126
ok ->
125127
RoutePath = khepri_route_path(Binding),
128+
FeatureFlag = rabbit_feature_flags:is_enabled(
129+
tie_binding_to_dest_with_keep_while_cond),
130+
PutOptions = case FeatureFlag of
131+
true -> put_options(DstName);
132+
false -> #{}
133+
end,
126134
MaybeSerial = rabbit_exchange:serialise_events(Src),
127135
Serial = rabbit_khepri:transaction(
128136
fun() ->
@@ -132,11 +140,17 @@ create(#binding{source = SrcName,
132140
true ->
133141
already_exists;
134142
false ->
135-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)),
143+
ok = khepri_tx:put(
144+
RoutePath,
145+
sets:add_element(Binding, Set),
146+
PutOptions),
136147
serial_in_khepri(MaybeSerial, Src)
137148
end;
138149
_ ->
139-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))),
150+
ok = khepri_tx:put(
151+
RoutePath,
152+
sets:add_element(Binding, sets:new([{version, 2}])),
153+
PutOptions),
140154
serial_in_khepri(MaybeSerial, Src)
141155
end
142156
end, rw),
@@ -166,6 +180,30 @@ lookup_resource(#resource{kind = exchange} = Name) ->
166180
_ -> []
167181
end.
168182

183+
put_options(#resource{virtual_host = VHost, name = DstName, kind = Kind}) ->
184+
put_options(VHost, DstName, Kind);
185+
put_options(BindingPath) when is_list(BindingPath) ->
186+
{
187+
VHost,
188+
_SrcName,
189+
Kind,
190+
DstName,
191+
_RoutingKey
192+
} = rabbit_db_binding:khepri_route_path_to_args(BindingPath),
193+
put_options(VHost, DstName, Kind).
194+
195+
put_options(VHost, DstName, Kind) ->
196+
DstPath = case Kind of
197+
queue ->
198+
rabbit_db_queue:khepri_queue_path(
199+
VHost, DstName);
200+
exchange ->
201+
rabbit_db_exchange:khepri_exchange_path(
202+
VHost, DstName)
203+
end,
204+
KeepWhile = #{DstPath => #if_node_exists{}},
205+
#{keep_while => KeepWhile}.
206+
169207
serial_in_khepri(false, _) ->
170208
none;
171209
serial_in_khepri(true, X) ->
@@ -190,8 +228,18 @@ serial_in_khepri(true, X) ->
190228
%%
191229
%% @private
192230

193-
delete(#binding{source = SrcName,
194-
destination = DstName} = Binding, ChecksFun) ->
231+
delete(Binding, ChecksFun) ->
232+
FeatureFlag = rabbit_feature_flags:is_enabled(
233+
tie_binding_to_dest_with_keep_while_cond),
234+
case FeatureFlag of
235+
true ->
236+
delete_v2(Binding, ChecksFun);
237+
false ->
238+
delete_v1(Binding, ChecksFun)
239+
end.
240+
241+
delete_v1(#binding{source = SrcName,
242+
destination = DstName} = Binding, ChecksFun) ->
195243
Path = khepri_route_path(Binding),
196244
case rabbit_khepri:transaction(
197245
fun () ->
@@ -204,7 +252,7 @@ delete(#binding{source = SrcName,
204252
true ->
205253
case ChecksFun(Src, Dst) of
206254
ok ->
207-
ok = delete_in_khepri(Binding),
255+
ok = delete_in_khepri_tx_v1(Binding),
208256
maybe_auto_delete_exchange_in_khepri(Binding#binding.source, [Binding], rabbit_binding:new_deletions(), false);
209257
{error, _} = Err ->
210258
Err
@@ -224,6 +272,40 @@ delete(#binding{source = SrcName,
224272
{ok, Deletions}
225273
end.
226274

275+
delete_v2(#binding{source = SrcName,
276+
destination = DstName} = Binding, ChecksFun) ->
277+
Path = khepri_route_path(Binding),
278+
case rabbit_khepri:transaction(
279+
fun () ->
280+
case {lookup_resource_in_khepri_tx(SrcName),
281+
lookup_resource_in_khepri_tx(DstName)} of
282+
{[Src], [Dst]} ->
283+
case exists_in_khepri(Path, Binding) of
284+
false ->
285+
ok;
286+
true ->
287+
case ChecksFun(Src, Dst) of
288+
ok ->
289+
delete_in_khepri_tx_v2(Binding);
290+
{error, _} = Err ->
291+
Err
292+
end
293+
end;
294+
_Errs ->
295+
%% No absent queues, always present on disk
296+
ok
297+
end
298+
end) of
299+
ok ->
300+
ok;
301+
{error, _} = Err ->
302+
Err;
303+
{ok, Deleted} ->
304+
Deletions = khepri_ret_to_deletions(Deleted, false),
305+
ok = rabbit_binding:process_deletions(Deletions),
306+
{ok, Deletions}
307+
end.
308+
227309
exists_in_khepri(Path, Binding) ->
228310
case khepri_tx:get(Path) of
229311
{ok, Set} ->
@@ -232,7 +314,7 @@ exists_in_khepri(Path, Binding) ->
232314
false
233315
end.
234316

235-
delete_in_khepri(Binding) ->
317+
delete_in_khepri_tx_v1(Binding) ->
236318
Path = khepri_route_path(Binding),
237319
case khepri_tx:get(Path) of
238320
{ok, Set0} ->
@@ -247,6 +329,21 @@ delete_in_khepri(Binding) ->
247329
ok
248330
end.
249331

332+
delete_in_khepri_tx_v2(Binding) ->
333+
Path = khepri_route_path(Binding),
334+
case khepri_tx:get(Path) of
335+
{ok, Set0} ->
336+
Set = sets:del_element(Binding, Set0),
337+
case sets:is_empty(Set) of
338+
true ->
339+
khepri_tx_adv:delete(Path);
340+
false ->
341+
ok = khepri_tx:put(Path, Set)
342+
end;
343+
_ ->
344+
ok
345+
end.
346+
250347
maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
251348
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
252349
{not_deleted, undefined} ->
@@ -562,6 +659,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
562659
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
563660
lists:keysort(#binding.source, Bindings), OnlyDurable).
564661

662+
khepri_ret_to_deletions(Deleted, OnlyDurable) ->
663+
Bindings0 = maps:fold(
664+
fun(Path, Props, Acc) ->
665+
case {Path, Props} of
666+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
667+
_VHost, _SrcName, _Kind, _Name, _RoutingKey),
668+
#{data := Set}} ->
669+
sets:to_list(Set) ++ Acc;
670+
{_, _} ->
671+
Acc
672+
end
673+
end, [], Deleted),
674+
Bindings1 = lists:keysort(#binding.source, Bindings0),
675+
rabbit_binding:group_bindings_fold(
676+
fun(XName, Bindings, Deletions, _OnlyDurable) ->
677+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName),
678+
case Deleted of
679+
#{ExchangePath := #{data := X}} ->
680+
rabbit_binding:add_deletion(
681+
XName, X, deleted, Bindings, Deletions);
682+
_ ->
683+
case rabbit_db_exchange:get(XName) of
684+
{ok, X} ->
685+
rabbit_binding:add_deletion(
686+
XName, X, not_deleted, Bindings, Deletions);
687+
_ ->
688+
Deletions
689+
end
690+
end
691+
end,
692+
Bindings1, OnlyDurable).
693+
565694
%% -------------------------------------------------------------------
566695
%% has_for_source_in_khepri().
567696
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
get_in_khepri_tx/1,
4242
update_in_khepri_tx/2,
4343
clear_exchanges_in_khepri/0,
44-
clear_exchange_serials_in_khepri/0
44+
clear_exchange_serials_in_khepri/0,
45+
put_options/1
4546
]).
4647

4748
%% For testing
@@ -284,7 +285,13 @@ create_or_get(#exchange{name = XName} = X) ->
284285
Path0, [#if_any{conditions =
285286
[#if_node_exists{exists = false},
286287
#if_has_payload{has_payload = false}]}]),
287-
case rabbit_khepri:put(Path1, X) of
288+
FeatureFlag = rabbit_feature_flags:is_enabled(
289+
tie_binding_to_dest_with_keep_while_cond),
290+
PutOptions = case FeatureFlag of
291+
true -> put_options(X);
292+
false -> #{}
293+
end,
294+
case rabbit_khepri:put(Path1, X, PutOptions) of
288295
ok ->
289296
{new, X};
290297
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
@@ -293,6 +300,23 @@ create_or_get(#exchange{name = XName} = X) ->
293300
Err
294301
end.
295302

303+
put_options(Exchange) ->
304+
case Exchange of
305+
#exchange{name = #resource{virtual_host = VHost,
306+
name = Name},
307+
auto_delete = true} ->
308+
Path = rabbit_db_binding:khepri_route_path(
309+
VHost,
310+
Name,
311+
_Kind = ?KHEPRI_WILDCARD_STAR,
312+
_DstName = ?KHEPRI_WILDCARD_STAR,
313+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
314+
KeepWhile = #{Path => #if_has_data{}},
315+
#{keep_while => KeepWhile};
316+
_ ->
317+
#{}
318+
end.
319+
296320
%% -------------------------------------------------------------------
297321
%% set().
298322
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)