Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions deps/rabbit/src/rabbit_core_ff.erl
Original file line number Diff line number Diff line change
Expand Up @@ -232,3 +232,15 @@
stability => stable,
depends_on => ['rabbitmq_4.3.0']
}}).

-rabbit_feature_flag(
{tie_binding_to_dest_with_keep_while_cond,
#{desc =>
"Use keep_while condition to tie a binding record lifetime to its "
"destination record in Khepri",
stability => stable,
depends_on => ['rabbitmq_4.3.0'],
callbacks => #{enable =>
{rabbit_db_queue,
tie_binding_to_dest_with_keep_while_cond_enable}}
}}).
143 changes: 136 additions & 7 deletions deps/rabbit/src/rabbit_db_binding.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,9 @@
delete_for_destination_in_khepri/2,
delete_all_for_exchange_in_khepri/3,
has_for_source_in_khepri/1,
match_source_and_destination_in_khepri_tx/2
match_source_and_destination_in_khepri_tx/2,
khepri_ret_to_deletions/2,
put_options/1
]).

-export([
Expand Down Expand Up @@ -123,6 +125,12 @@ create(#binding{source = SrcName,
case ChecksFun(Src, Dst) of
ok ->
RoutePath = khepri_route_path(Binding),
FeatureFlag = rabbit_feature_flags:is_enabled(
tie_binding_to_dest_with_keep_while_cond),
PutOptions = case FeatureFlag of
true -> put_options(DstName);
false -> #{}
end,
MaybeSerial = rabbit_exchange:serialise_events(Src),
Serial = rabbit_khepri:transaction(
fun() ->
Expand All @@ -132,11 +140,17 @@ create(#binding{source = SrcName,
true ->
already_exists;
false ->
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, Set)),
ok = khepri_tx:put(
RoutePath,
sets:add_element(Binding, Set),
PutOptions),
serial_in_khepri(MaybeSerial, Src)
end;
_ ->
ok = khepri_tx:put(RoutePath, sets:add_element(Binding, sets:new([{version, 2}]))),
ok = khepri_tx:put(
RoutePath,
sets:add_element(Binding, sets:new([{version, 2}])),
PutOptions),
serial_in_khepri(MaybeSerial, Src)
end
end, rw),
Expand Down Expand Up @@ -166,6 +180,30 @@ lookup_resource(#resource{kind = exchange} = Name) ->
_ -> []
end.

put_options(#resource{virtual_host = VHost, name = DstName, kind = Kind}) ->
put_options(VHost, DstName, Kind);
put_options(BindingPath) when is_list(BindingPath) ->
{
VHost,
_SrcName,
Kind,
DstName,
_RoutingKey
} = rabbit_db_binding:khepri_route_path_to_args(BindingPath),
put_options(VHost, DstName, Kind).

put_options(VHost, DstName, Kind) ->
DstPath = case Kind of
queue ->
rabbit_db_queue:khepri_queue_path(
VHost, DstName);
exchange ->
rabbit_db_exchange:khepri_exchange_path(
VHost, DstName)
end,
KeepWhile = #{DstPath => #if_node_exists{}},
#{keep_while => KeepWhile}.

serial_in_khepri(false, _) ->
none;
serial_in_khepri(true, X) ->
Expand All @@ -190,8 +228,18 @@ serial_in_khepri(true, X) ->
%%
%% @private

delete(#binding{source = SrcName,
destination = DstName} = Binding, ChecksFun) ->
delete(Binding, ChecksFun) ->
FeatureFlag = rabbit_feature_flags:is_enabled(
tie_binding_to_dest_with_keep_while_cond),
case FeatureFlag of
true ->
delete_v2(Binding, ChecksFun);
false ->
delete_v1(Binding, ChecksFun)
end.

delete_v1(#binding{source = SrcName,
destination = DstName} = Binding, ChecksFun) ->
Path = khepri_route_path(Binding),
case rabbit_khepri:transaction(
fun () ->
Expand All @@ -204,7 +252,7 @@ delete(#binding{source = SrcName,
true ->
case ChecksFun(Src, Dst) of
ok ->
ok = delete_in_khepri(Binding),
ok = delete_in_khepri_tx_v1(Binding),
maybe_auto_delete_exchange_in_khepri(Binding#binding.source, [Binding], rabbit_binding:new_deletions(), false);
{error, _} = Err ->
Err
Expand All @@ -224,6 +272,40 @@ delete(#binding{source = SrcName,
{ok, Deletions}
end.

delete_v2(#binding{source = SrcName,
destination = DstName} = Binding, ChecksFun) ->
Path = khepri_route_path(Binding),
case rabbit_khepri:transaction(
fun () ->
case {lookup_resource_in_khepri_tx(SrcName),
lookup_resource_in_khepri_tx(DstName)} of
{[Src], [Dst]} ->
case exists_in_khepri(Path, Binding) of
false ->
ok;
true ->
case ChecksFun(Src, Dst) of
ok ->
delete_in_khepri_tx_v2(Binding);
{error, _} = Err ->
Err
end
end;
_Errs ->
%% No absent queues, always present on disk
ok
end
end) of
ok ->
ok;
{error, _} = Err ->
Err;
{ok, Deleted} ->
Deletions = khepri_ret_to_deletions(Deleted, false),
ok = rabbit_binding:process_deletions(Deletions),
{ok, Deletions}
end.

exists_in_khepri(Path, Binding) ->
case khepri_tx:get(Path) of
{ok, Set} ->
Expand All @@ -232,7 +314,7 @@ exists_in_khepri(Path, Binding) ->
false
end.

delete_in_khepri(Binding) ->
delete_in_khepri_tx_v1(Binding) ->
Path = khepri_route_path(Binding),
case khepri_tx:get(Path) of
{ok, Set0} ->
Expand All @@ -247,6 +329,21 @@ delete_in_khepri(Binding) ->
ok
end.

delete_in_khepri_tx_v2(Binding) ->
Path = khepri_route_path(Binding),
case khepri_tx:get(Path) of
{ok, Set0} ->
Set = sets:del_element(Binding, Set0),
case sets:is_empty(Set) of
true ->
khepri_tx_adv:delete(Path);
false ->
ok = khepri_tx:put(Path, Set)
end;
_ ->
ok
end.

maybe_auto_delete_exchange_in_khepri(XName, Bindings, Deletions, OnlyDurable) ->
case rabbit_db_exchange:maybe_auto_delete_in_khepri(XName, OnlyDurable) of
{not_deleted, undefined} ->
Expand Down Expand Up @@ -562,6 +659,38 @@ delete_for_destination_in_khepri(#resource{virtual_host = VHost, kind = Kind, na
rabbit_binding:group_bindings_fold(fun maybe_auto_delete_exchange_in_khepri/4,
lists:keysort(#binding.source, Bindings), OnlyDurable).

khepri_ret_to_deletions(Deleted, OnlyDurable) ->
Bindings0 = maps:fold(
fun(Path, Props, Acc) ->
case {Path, Props} of
{?RABBITMQ_KHEPRI_ROUTE_PATH(
_VHost, _SrcName, _Kind, _Name, _RoutingKey),
#{data := Set}} ->
sets:to_list(Set) ++ Acc;
{_, _} ->
Acc
end
end, [], Deleted),
Bindings1 = lists:keysort(#binding.source, Bindings0),
rabbit_binding:group_bindings_fold(
fun(XName, Bindings, Deletions, _OnlyDurable) ->
ExchangePath = rabbit_db_exchange:khepri_exchange_path(XName),
case Deleted of
#{ExchangePath := #{data := X}} ->
rabbit_binding:add_deletion(
XName, X, deleted, Bindings, Deletions);
_ ->
case rabbit_db_exchange:get(XName) of
{ok, X} ->
rabbit_binding:add_deletion(
XName, X, not_deleted, Bindings, Deletions);
_ ->
Deletions
end
end
end,
Bindings1, OnlyDurable).

%% -------------------------------------------------------------------
%% has_for_source_in_khepri().
%% -------------------------------------------------------------------
Expand Down
28 changes: 26 additions & 2 deletions deps/rabbit/src/rabbit_db_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
get_in_khepri_tx/1,
update_in_khepri_tx/2,
clear_exchanges_in_khepri/0,
clear_exchange_serials_in_khepri/0
clear_exchange_serials_in_khepri/0,
put_options/1
]).

%% For testing
Expand Down Expand Up @@ -284,7 +285,13 @@ create_or_get(#exchange{name = XName} = X) ->
Path0, [#if_any{conditions =
[#if_node_exists{exists = false},
#if_has_payload{has_payload = false}]}]),
case rabbit_khepri:put(Path1, X) of
FeatureFlag = rabbit_feature_flags:is_enabled(
tie_binding_to_dest_with_keep_while_cond),
PutOptions = case FeatureFlag of
true -> put_options(X);
false -> #{}
end,
case rabbit_khepri:put(Path1, X, PutOptions) of
ok ->
{new, X};
{error, {khepri, mismatching_node, #{node_props := #{data := ExistingX}}}} ->
Expand All @@ -293,6 +300,23 @@ create_or_get(#exchange{name = XName} = X) ->
Err
end.

put_options(Exchange) ->
case Exchange of
#exchange{name = #resource{virtual_host = VHost,
name = Name},
auto_delete = true} ->
Path = rabbit_db_binding:khepri_route_path(
VHost,
Name,
_Kind = ?KHEPRI_WILDCARD_STAR,
_DstName = ?KHEPRI_WILDCARD_STAR,
_RoutingKey = ?KHEPRI_WILDCARD_STAR),
KeepWhile = #{Path => #if_has_data{}},
#{keep_while => KeepWhile};
_ ->
#{}
end.

%% -------------------------------------------------------------------
%% set().
%% -------------------------------------------------------------------
Expand Down
Loading
Loading