Skip to content

Commit ef484f4

Browse files
committed
Upgrade support
1 parent 3e198a7 commit ef484f4

4 files changed

Lines changed: 379 additions & 15 deletions

File tree

deps/rabbit/src/rabbit_core_ff.erl

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,3 +241,14 @@
241241
callbacks => #{enable =>
242242
{rabbit_db_queue, replaced_delete_queue_transaction_enable}}
243243
}}).
244+
245+
-rabbit_feature_flag(
246+
{topic_binding_projection_v4,
247+
#{desc => "Enable the topic binding Khepri projection v4",
248+
stability => stable,
249+
depends_on => ['rabbitmq_4.3.0'],
250+
callbacks => #{enable =>
251+
{rabbit_khepri, topic_binding_projection_enable},
252+
post_enable =>
253+
{rabbit_khepri, topic_binding_projection_post_enable}}
254+
}}).

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414
%% Used by Khepri projections and the Mnesia-to-Khepri migration.
1515
-export([split_topic_key_binary/1]).
1616

17+
-define(KHEPRI_PROJECTION_V3, rabbit_khepri_topic_trie_v3).
18+
1719
-define(TOPIC_TRIE_PROJECTION, rabbit_khepri_topic_trie_v4).
1820
-define(TOPIC_BINDING_PROJECTION, rabbit_khepri_topic_binding_v4).
1921

@@ -32,14 +34,19 @@
3234
%%
3335
%% @private
3436

35-
match(#resource{virtual_host = VHost, name = XName}, RoutingKey, Opts) ->
37+
match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
3638
BKeys = maps:get(return_binding_keys, Opts, false),
3739
Words = split_topic_key_binary(RoutingKey),
38-
try
39-
trie_match({VHost, XName}, root, Words, BKeys, [])
40-
catch
41-
error:badarg ->
42-
[]
40+
case rabbit_khepri:get_effective_topic_binding_projection_version() of
41+
V when V >= 4 ->
42+
try
43+
trie_match({VHost, XName}, root, Words, BKeys, [])
44+
catch
45+
error:badarg ->
46+
[]
47+
end;
48+
_ ->
49+
trie_match_in_khepri(X, Words, BKeys)
4350
end.
4451

4552
-spec split_topic_key_binary(RoutingKey) -> Words when
@@ -162,3 +169,93 @@ format_dest_bkeys([{#resource{kind = queue} = Dest, BKey} | Rest], Acc) ->
162169
format_dest_bkeys(Rest, [{Dest, BKey} | Acc]);
163170
format_dest_bkeys([{Dest, _BKey} | Rest], Acc) ->
164171
format_dest_bkeys(Rest, [Dest | Acc]).
172+
173+
%% --------------------------------------------------------------
174+
%% Internal
175+
%% --------------------------------------------------------------
176+
177+
-spec add_matched([rabbit_types:binding_destination() |
178+
{rabbit_types:binding_destination(), BindingArgs :: list()}],
179+
ReturnBindingKeys :: boolean(),
180+
match_result()) ->
181+
match_result().
182+
add_matched(Destinations, false, Acc) ->
183+
Destinations ++ Acc;
184+
add_matched(DestinationsArgs, true, Acc) ->
185+
lists:foldl(
186+
fun({DestQ = #resource{kind = queue}, BindingArgs}, L) ->
187+
case rabbit_misc:table_lookup(BindingArgs, <<"x-binding-key">>) of
188+
{longstr, BKey} ->
189+
[{DestQ, BKey} | L];
190+
_ ->
191+
[DestQ | L]
192+
end;
193+
({DestX, _BindingArgs}, L) ->
194+
[DestX | L]
195+
end, Acc, DestinationsArgs).
196+
197+
%% Khepri topic graph
198+
199+
trie_match_in_khepri(X, Words, BKeys) ->
200+
try
201+
trie_match_in_khepri(X, root, Words, BKeys, [])
202+
catch
203+
error:badarg ->
204+
[]
205+
end.
206+
207+
trie_match_in_khepri(X, Node, [], BKeys, ResAcc0) ->
208+
Destinations = trie_bindings_in_khepri(X, Node, BKeys),
209+
ResAcc = add_matched(Destinations, BKeys, ResAcc0),
210+
trie_match_part_in_khepri(
211+
X, Node, <<"#">>,
212+
fun trie_match_skip_any_in_khepri/5, [], BKeys, ResAcc);
213+
trie_match_in_khepri(X, Node, [W | RestW] = Words, BKeys, ResAcc) ->
214+
lists:foldl(fun ({WArg, MatchFun, RestWArg}, Acc) ->
215+
trie_match_part_in_khepri(
216+
X, Node, WArg, MatchFun, RestWArg, BKeys, Acc)
217+
end, ResAcc, [{W, fun trie_match_in_khepri/5, RestW},
218+
{<<"*">>, fun trie_match_in_khepri/5, RestW},
219+
{<<"#">>,
220+
fun trie_match_skip_any_in_khepri/5, Words}]).
221+
222+
trie_match_part_in_khepri(X, Node, Search, MatchFun, RestW, BKeys, ResAcc) ->
223+
case trie_child_in_khepri(X, Node, Search) of
224+
{ok, NextNode} -> MatchFun(X, NextNode, RestW, BKeys, ResAcc);
225+
error -> ResAcc
226+
end.
227+
228+
trie_match_skip_any_in_khepri(X, Node, [], BKeys, ResAcc) ->
229+
trie_match_in_khepri(X, Node, [], BKeys, ResAcc);
230+
trie_match_skip_any_in_khepri(X, Node, [_ | RestW] = Words, BKeys, ResAcc) ->
231+
trie_match_skip_any_in_khepri(
232+
X, Node, RestW, BKeys,
233+
trie_match_in_khepri(X, Node, Words, BKeys, ResAcc)).
234+
235+
trie_child_in_khepri(X, Node, Word) ->
236+
case ets:lookup(
237+
?KHEPRI_PROJECTION_V3,
238+
#trie_edge{exchange_name = X,
239+
node_id = Node,
240+
word = Word}) of
241+
[#topic_trie_edge_v2{node_id = NextNode}] -> {ok, NextNode};
242+
[] -> error
243+
end.
244+
245+
trie_bindings_in_khepri(X, Node, BKeys) ->
246+
case ets:lookup(
247+
?KHEPRI_PROJECTION_V3,
248+
#trie_edge{exchange_name = X,
249+
node_id = Node,
250+
word = bindings}) of
251+
[#topic_trie_edge_v2{node_id = {bindings, Bindings}}] ->
252+
[case BKeys of
253+
true ->
254+
{Dest, Args};
255+
false ->
256+
Dest
257+
end || #binding{destination = Dest,
258+
args = Args} <- sets:to_list(Bindings)];
259+
[] ->
260+
[]
261+
end.

0 commit comments

Comments
 (0)