Skip to content

Commit fc2b6ec

Browse files
authored
Merge pull request #15954 from rabbitmq/mergify/bp/v4.3.x/pr-14902
rabbit_db: Eliminate the `delete_queue` Khepri transaction (backport #14902)
2 parents ba1ec10 + 9ae6ece commit fc2b6ec

6 files changed

Lines changed: 519 additions & 52 deletions

File tree

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,3 +232,15 @@
232232
stability => stable,
233233
depends_on => ['rabbitmq_4.3.0']
234234
}}).
235+
236+
-rabbit_feature_flag(
237+
{tie_binding_to_dest_with_keep_while_cond,
238+
#{desc =>
239+
"Use keep_while condition to tie a binding record lifetime to its "
240+
"destination record in Khepri",
241+
stability => stable,
242+
depends_on => ['rabbitmq_4.3.0'],
243+
callbacks => #{enable =>
244+
{rabbit_db_queue,
245+
tie_binding_to_dest_with_keep_while_cond_enable}}
246+
}}).

deps/rabbit/src/rabbit_db_binding.erl

Lines changed: 136 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,9 @@
3131
delete_for_destination_in_khepri/2,
3232
delete_all_for_exchange_in_khepri/3,
3333
has_for_source_in_khepri/1,
34-
match_source_and_destination_in_khepri_tx/2
34+
match_source_and_destination_in_khepri_tx/2,
35+
khepri_ret_to_deletions/2,
36+
put_options/1
3537
]).
3638

3739
-export([
@@ -123,6 +125,12 @@ create(#binding{source = SrcName,
123125
case ChecksFun(Src, Dst) of
124126
ok ->
125127
RoutePath = khepri_route_path(Binding),
128+
FeatureFlag = rabbit_feature_flags:is_enabled(
129+
tie_binding_to_dest_with_keep_while_cond),
130+
PutOptions = case FeatureFlag of
131+
true -> put_options(DstName);
132+
false -> #{}
133+
end,
126134
MaybeSerial = rabbit_exchange:serialise_events(Src),
127135
Serial = rabbit_khepri:transaction(
128136
fun() ->
@@ -132,11 +140,17 @@ create(#binding{source = SrcName,
132140
true ->
133141
already_exists;
134142
false ->
135-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)),
143+
ok = khepri_tx:put(
144+
RoutePath,
145+
sets:add_element(Binding, Set),
146+
PutOptions),
136147
serial_in_khepri(MaybeSerial, Src)
137148
end;
138149
_ ->
139-
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))),
150+
ok = khepri_tx:put(
151+
RoutePath,
152+
sets:add_element(Binding, sets:new([{version, 2}])),
153+
PutOptions),
140154
serial_in_khepri(MaybeSerial, Src)
141155
end
142156
end, rw),
@@ -166,6 +180,30 @@ lookup_resource(#resource{kind = exchange} = Name) ->
166180
_ -> []
167181
end.
168182

183+
put_options(#resource{virtual_host = VHost, name = DstName, kind = Kind}) ->
184+
put_options(VHost, DstName, Kind);
185+
put_options(BindingPath) when is_list(BindingPath) ->
186+
{
187+
VHost,
188+
_SrcName,
189+
Kind,
190+
DstName,
191+
_RoutingKey
192+
} = rabbit_db_binding:khepri_route_path_to_args(BindingPath),
193+
put_options(VHost, DstName, Kind).
194+
195+
put_options(VHost, DstName, Kind) ->
196+
DstPath = case Kind of
197+
queue ->
198+
rabbit_db_queue:khepri_queue_path(
199+
VHost, DstName);
200+
exchange ->
201+
rabbit_db_exchange:khepri_exchange_path(
202+
VHost, DstName)
203+
end,
204+
KeepWhile = #{DstPath => #if_node_exists{}},
205+
#{keep_while => KeepWhile}.
206+
169207
serial_in_khepri(false, _) ->
170208
none;
171209
serial_in_khepri(true, X) ->
@@ -190,8 +228,18 @@ serial_in_khepri(true, X) ->
190228
%%
191229
%% @private
192230

193-
delete(#binding{source = SrcName,
194-
destination = DstName} = Binding, ChecksFun) ->
231+
delete(Binding, ChecksFun) ->
232+
FeatureFlag = rabbit_feature_flags:is_enabled(
233+
tie_binding_to_dest_with_keep_while_cond),
234+
case FeatureFlag of
235+
true ->
236+
delete_v2(Binding, ChecksFun);
237+
false ->
238+
delete_v1(Binding, ChecksFun)
239+
end.
240+
241+
delete_v1(#binding{source = SrcName,
242+
destination = DstName} = Binding, ChecksFun) ->
195243
Path = khepri_route_path(Binding),
196244
case rabbit_khepri:transaction(
197245
fun () ->
@@ -204,7 +252,7 @@ delete(#binding{source = SrcName,
204252
true ->
205253
case ChecksFun(Src, Dst) of
206254
ok ->
207-
ok = delete_in_khepri(Binding),
255+
ok = delete_in_khepri_tx_v1(Binding),
208256
maybe_auto_delete_exchange_in_khepri(Binding#binding.source, [Binding], rabbit_binding:new_deletions(), false);
209257
{error, _} = Err ->
210258
Err
@@ -224,6 +272,40 @@ delete(#binding{source = SrcName,
224272
{ok, Deletions}
225273
end.
226274

275+
delete_v2(#binding{source = SrcName,
276+
destination = DstName} = Binding, ChecksFun) ->
277+
Path = khepri_route_path(Binding),
278+
case rabbit_khepri:transaction(
279+
fun () ->
280+
case {lookup_resource_in_khepri_tx(SrcName),
281+
lookup_resource_in_khepri_tx(DstName)} of
282+
{[Src], [Dst]} ->
283+
case exists_in_khepri(Path, Binding) of
284+
false ->
285+
ok;
286+
true ->
287+
case ChecksFun(Src, Dst) of
288+
ok ->
289+
delete_in_khepri_tx_v2(Binding);
290+
{error, _} = Err ->
291+
Err
292+
end
293+
end;
294+
_Errs ->
295+
%% No absent queues, always present on disk
296+
ok
297+
end
298+
end) of
299+
ok ->
300+
ok;
301+
{error, _} = Err ->
302+
Err;
303+
{ok, Deleted} ->
304+
Deletions = khepri_ret_to_deletions(Deleted, false),
305+
ok = rabbit_binding:process_deletions(Deletions),
306+
{ok, Deletions}
307+
end.
308+
227309
exists_in_khepri(Path, Binding) ->
228310
case khepri_tx:get(Path) of
229311
{ok, Set} ->
@@ -232,7 +314,7 @@ exists_in_khepri(Path, Binding) ->
232314
false
233315
end.
234316

235-
delete_in_khepri(Binding) ->
317+
delete_in_khepri_tx_v1(Binding) ->
236318
Path = khepri_route_path(Binding),
237319
case khepri_tx:get(Path) of
238320
{ok, Set0} ->
@@ -247,6 +329,21 @@ delete_in_khepri(Binding) ->
247329
ok
248330
end.
249331

332+
delete_in_khepri_tx_v2(Binding) ->
333+
Path = khepri_route_path(Binding),
334+
case khepri_tx:get(Path) of
335+
{ok, Set0} ->
336+
Set = sets:del_element(Binding, Set0),
337+
case sets:is_empty(Set) of
338+
true ->
339+
khepri_tx_adv:delete(Path);
340+
false ->
341+
ok = khepri_tx:put(Path, Set)
342+
end;
343+
_ ->
344+
ok
345+
end.
346+
250347
maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
251348
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
252349
{not_deleted, undefined} ->
@@ -562,6 +659,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
562659
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
563660
lists:keysort(#binding.source, Bindings), OnlyDurable).
564661

662+
khepri_ret_to_deletions(Deleted, OnlyDurable) ->
663+
Bindings0 = maps:fold(
664+
fun(Path, Props, Acc) ->
665+
case {Path, Props} of
666+
{?RABBITMQ_KHEPRI_ROUTE_PATH(
667+
_VHost, _SrcName, _Kind, _Name, _RoutingKey),
668+
#{data := Set}} ->
669+
sets:to_list(Set) ++ Acc;
670+
{_, _} ->
671+
Acc
672+
end
673+
end, [], Deleted),
674+
Bindings1 = lists:keysort(#binding.source, Bindings0),
675+
rabbit_binding:group_bindings_fold(
676+
fun(XName, Bindings, Deletions, _OnlyDurable) ->
677+
ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName),
678+
case Deleted of
679+
#{ExchangePath := #{data := X}} ->
680+
rabbit_binding:add_deletion(
681+
XName, X, deleted, Bindings, Deletions);
682+
_ ->
683+
case rabbit_db_exchange:get(XName) of
684+
{ok, X} ->
685+
rabbit_binding:add_deletion(
686+
XName, X, not_deleted, Bindings, Deletions);
687+
_ ->
688+
Deletions
689+
end
690+
end
691+
end,
692+
Bindings1, OnlyDurable).
693+
565694
%% -------------------------------------------------------------------
566695
%% has_for_source_in_khepri().
567696
%% -------------------------------------------------------------------

deps/rabbit/src/rabbit_db_exchange.erl

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,8 @@
4141
get_in_khepri_tx/1,
4242
update_in_khepri_tx/2,
4343
clear_exchanges_in_khepri/0,
44-
clear_exchange_serials_in_khepri/0
44+
clear_exchange_serials_in_khepri/0,
45+
put_options/1
4546
]).
4647

4748
%% For testing
@@ -284,7 +285,13 @@ create_or_get(#exchange{name = XName} = X) ->
284285
Path0, [#if_any{conditions =
285286
[#if_node_exists{exists = false},
286287
#if_has_payload{has_payload = false}]}]),
287-
case rabbit_khepri:put(Path1, X) of
288+
FeatureFlag = rabbit_feature_flags:is_enabled(
289+
tie_binding_to_dest_with_keep_while_cond),
290+
PutOptions = case FeatureFlag of
291+
true -> put_options(X);
292+
false -> #{}
293+
end,
294+
case rabbit_khepri:put(Path1, X, PutOptions) of
288295
ok ->
289296
{new, X};
290297
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
@@ -293,6 +300,23 @@ create_or_get(#exchange{name = XName} = X) ->
293300
Err
294301
end.
295302

303+
put_options(Exchange) ->
304+
case Exchange of
305+
#exchange{name = #resource{virtual_host = VHost,
306+
name = Name},
307+
auto_delete = true} ->
308+
Path = rabbit_db_binding:khepri_route_path(
309+
VHost,
310+
Name,
311+
_Kind = ?KHEPRI_WILDCARD_STAR,
312+
_DstName = ?KHEPRI_WILDCARD_STAR,
313+
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
314+
KeepWhile = #{Path => #if_has_data{}},
315+
#{keep_while => KeepWhile};
316+
_ ->
317+
#{}
318+
end.
319+
296320
%% -------------------------------------------------------------------
297321
%% set().
298322
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)