Skip to content

Commit 5b76361

Browse files
committed
Fix routing for empty binding key to topic exchange
Fixes #16221 ## What? Fix a bug in the Khepri v4 topic trie projection where messages published with an empty routing key to a topic exchange were incorrectly routed to queues bound to other topic exchanges with an empty binding key. ## Why? The MQTT 5.0 spec states: > All Topic Names and Topic Filters MUST be at least one character long In contrast, the AMQP 0.9.1 spec state: > The routing key used for a topic exchange MUST consist of **zero** or more words delimited by dots. Each word may contain the letters A-Z and a-z and digits 0-9. The routing pattern follows the same rules as the routing key with the addition that * matches a single word, and # matches zero or more words Hence zero words, i.e. empty routing keys and empty binding keys, are expliclity allowed for the topic exchange type. In the Khepri v4 projection, the topic trie used a global `root` atom as the initial node ID for all topic exchanges. When a binding was created with an empty binding key (`<<>>`), `split_topic_key_binary/1` returned an empty list (`[]`). The trie traversal stopped immediately at the root, meaning the binding was inserted into the `rabbit_khepri_topic_binding_v4` ETS table with the global `root` atom as the `LeafNodeId`. Because the `LeafNodeId` lacked any exchange-specific context, all empty bindings across all topic exchanges were attached to the exact same global `root` node. Consequently, when a message was published with an empty routing key to any topic exchange, the routing logic (`trie_bindings/3`) would scan the ETS table starting at the global `root` node and incorrectly match bindings belonging to completely unrelated topic exchanges. ## How? To fix this and ensure exchange isolation, the conceptual root of the trie was changed from the global `root` atom to an exchange-specific tuple: `{root, XSrc}` (where `XSrc` is `{VHost, ExchangeName}`). - `rabbit_khepri:trie_follow_down_create/3` and `trie_follow_down_get_path/3` now initialize their trie traversal with `{root, XSrc}`. - `rabbit_db_topic_exchange:trie_match/5` now initiates routing from `{root, {VHost, XName}}`. This structural change guarantees that empty bindings are isolated per exchange in the bindings ETS table, completely resolving the cross-exchange leakage while fully supporting AMQP 0-9-1's allowance for empty binding keys without breaking backward compatibility.
1 parent b1f3d4b commit 5b76361

3 files changed

Lines changed: 70 additions & 3 deletions

File tree

deps/rabbit/src/rabbit_db_topic_exchange.erl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,9 @@ match(#resource{virtual_host = VHost, name = XName} = X, RoutingKey, Opts) ->
3939
Words = split_topic_key_binary(RoutingKey),
4040
case rabbit_khepri:get_effective_topic_binding_projection_version() of
4141
V when V >= 4 ->
42+
XSrc = {VHost, XName},
4243
try
43-
trie_match({VHost, XName}, root, Words, BKeys, [])
44+
trie_match(XSrc, {root, XSrc}, Words, BKeys, [])
4445
catch
4546
error:badarg ->
4647
[]

deps/rabbit/src/rabbit_khepri.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1733,7 +1733,7 @@ topic_binding_path_pattern() ->
17331733
%% ChildCount tracks the number of outgoing edges from ChildNodeId.
17341734
%% It is incremented when a new edge is created, decremented during GC.
17351735
trie_follow_down_create(TrieTab, XSrc, Words) ->
1736-
trie_follow_down_create(TrieTab, XSrc, root, none, Words).
1736+
trie_follow_down_create(TrieTab, XSrc, {root, XSrc}, none, Words).
17371737

17381738
trie_follow_down_create(_TrieTab, _XSrc, NodeId, _ParentKey, []) ->
17391739
NodeId;
@@ -1757,7 +1757,7 @@ trie_follow_down_create(TrieTab, XSrc, ParentId, ParentKey, [Word | Rest]) ->
17571757
%% Walk down the trie following the given words, collecting the path
17581758
%% for later GC. Returns {ok, LeafNodeId, Path} or error.
17591759
trie_follow_down_get_path(TrieTab, XSrc, Words) ->
1760-
trie_follow_down_get_path(TrieTab, XSrc, root, none, Words, []).
1760+
trie_follow_down_get_path(TrieTab, XSrc, {root, XSrc}, none, Words, []).
17611761

17621762
trie_follow_down_get_path(_TrieTab, _XSrc, NodeId, _ParentKey, [], Path) ->
17631763
{ok, NodeId, Path};

deps/rabbit/test/bindings_SUITE.erl

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ all_tests() ->
4646
unbind_from_volatile_queue,
4747
binding_args_direct_exchange,
4848
binding_args_fanout_exchange,
49+
topic_exchange_zero_words,
4950

5051
%% Exchange bindings
5152
bind_and_unbind_direct_exchange,
@@ -706,6 +707,71 @@ binding_args(Exchange, Config) ->
706707
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
707708
amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = true})).
708709

710+
%% Test case for https://github.com/rabbitmq/rabbitmq-server/discussions/16221
711+
%%
712+
%% "The routing key used for a topic exchange MUST consist of zero or more words delimited by dots.
713+
%% Each word may contain the letters A-Z and a-z and digits 0-9.
714+
%% The routing pattern follows the same rules as the routing key with the addition
715+
%% that * matches a single word, and # matches zero or more words."
716+
%% [AMQP 0.9.1]
717+
%%
718+
%% Here, we test a zero words routing key and routing pattern.
719+
topic_exchange_zero_words(Config) ->
720+
ZeroWords = <<>>,
721+
722+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
723+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
724+
725+
X1 = <<"x1">>,
726+
X2 = <<"x2">>,
727+
Q1 = <<"q1">>,
728+
Q2 = <<"q2">>,
729+
730+
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X1,
731+
type = <<"topic">>}),
732+
#'exchange.declare_ok'{} = amqp_channel:call(Ch, #'exchange.declare'{exchange = X2,
733+
type = <<"topic">>}),
734+
#'queue.declare_ok'{} = declare(Ch, Q1, []),
735+
#'queue.declare_ok'{} = declare(Ch, Q2, []),
736+
737+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X1,
738+
queue = Q1,
739+
routing_key = ZeroWords}),
740+
#'queue.bind_ok'{} = amqp_channel:call(Ch, #'queue.bind'{exchange = X2,
741+
queue = Q2,
742+
routing_key = ZeroWords}),
743+
744+
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
745+
amqp_channel:register_confirm_handler(Ch, self()),
746+
ok = amqp_channel:cast(Ch,
747+
#'basic.publish'{exchange = X1,
748+
routing_key = ZeroWords},
749+
#amqp_msg{payload = <<"m1">>}),
750+
receive #'basic.ack'{} -> ok
751+
after 9000 -> ct:fail(confirm_timeout)
752+
end,
753+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m1">>}},
754+
amqp_channel:call(Ch, #'basic.get'{queue = Q1,
755+
no_ack = true})),
756+
757+
ok = amqp_channel:cast(Ch,
758+
#'basic.publish'{exchange = X2,
759+
routing_key = ZeroWords},
760+
#amqp_msg{payload = <<"m2">>}),
761+
receive #'basic.ack'{} -> ok
762+
after 9000 -> ct:fail(confirm_timeout)
763+
end,
764+
?assertMatch({#'basic.get_ok'{}, #amqp_msg{payload = <<"m2">>}},
765+
amqp_channel:call(Ch, #'basic.get'{queue = Q2, no_ack = true})),
766+
767+
?assertMatch(#'queue.delete_ok'{message_count = 0},
768+
amqp_channel:call(Ch, #'queue.delete'{queue = Q1})),
769+
?assertMatch(#'queue.delete_ok'{message_count = 0},
770+
amqp_channel:call(Ch, #'queue.delete'{queue = Q2})),
771+
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X1}),
772+
#'exchange.delete_ok'{} = amqp_channel:call(Ch, #'exchange.delete'{exchange = X2}),
773+
rabbit_ct_client_helpers:close_channel(Ch).
774+
709775
bind_and_unbind_direct_exchange(Config) ->
710776
bind_and_unbind_exchange(<<"direct">>, Config).
711777

0 commit comments

Comments
 (0)