Skip to content

Commit 564e1b2

Browse files
authored
Merge pull request #15859 from rabbitmq/mergify/bp/v4.3.x/pr-15849
Ensure stable routing for `x-modulus-hash` exchange (backport #15849)
2 parents f5c2039 + 5dbdd00 commit 564e1b2

6 files changed

Lines changed: 350 additions & 216 deletions

File tree

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -271,7 +271,7 @@ PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_ty
271271
PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop rabbit_quorum_queue_prop direct_reply_to_amqpl direct_reply_to_amqp classic_queue
272272
PARALLEL_CT_SET_5_B = feature_flags_v2 backing_queue transactions
273273
PARALLEL_CT_SET_5_C = cluster_upgrade maintenance_mode
274-
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel
274+
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel rabbit_exchange_type_modulus_hash
275275

276276
PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
277277
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(rabbit_exchange_type_modulus_hash).
9+
10+
-behaviour(rabbit_exchange_type).
11+
12+
-include_lib("rabbit_common/include/rabbit.hrl").
13+
14+
-export([description/0,
15+
route/3,
16+
serialise_events/0,
17+
info/1,
18+
info/2,
19+
validate/1,
20+
validate_binding/2,
21+
create/2,
22+
delete/2,
23+
policy_changed/2,
24+
add_binding/3,
25+
remove_bindings/3,
26+
assert_args_equivalence/2]).
27+
28+
-rabbit_boot_step({?MODULE,
29+
[{description, "exchange type x-modulus-hash"},
30+
{mfa, {rabbit_registry, register,
31+
[exchange, <<"x-modulus-hash">>, ?MODULE]}},
32+
{requires, rabbit_registry},
33+
{enables, kernel_ready}]}).
34+
35+
%% 2^27
36+
-define(PHASH2_RANGE, 134217728).
37+
38+
description() ->
39+
[{description, <<"Modulus Hashing Exchange">>}].
40+
41+
route(#exchange{name = Name}, Msg, _Options) ->
42+
Destinations = rabbit_router:match_routing_key(Name, ['_']),
43+
case length(Destinations) of
44+
0 ->
45+
[];
46+
Len ->
47+
%% We sort to guarantee stable routing after node restarts.
48+
DestinationsSorted = lists:sort(Destinations),
49+
Hash = erlang:phash2(mc:routing_keys(Msg), ?PHASH2_RANGE),
50+
Destination = lists:nth(Hash rem Len + 1, DestinationsSorted),
51+
[Destination]
52+
end.
53+
54+
info(_) -> [].
55+
info(_, _) -> [].
56+
serialise_events() -> false.
57+
validate(_X) -> ok.
58+
validate_binding(_X, _B) -> ok.
59+
create(_Serial, _X) -> ok.
60+
delete(_Serial, _X) -> ok.
61+
policy_changed(_X1, _X2) -> ok.
62+
add_binding(_Serial, _X, _B) -> ok.
63+
remove_bindings(_Serial, _X, _Bs) -> ok.
64+
assert_args_equivalence(X, Args) ->
65+
rabbit_exchange:assert_args_equivalence(X, Args).
Lines changed: 277 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,277 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
-module(rabbit_exchange_type_modulus_hash_SUITE).
8+
9+
-compile(export_all).
10+
11+
-include_lib("amqp_client/include/amqp_client.hrl").
12+
-include_lib("eunit/include/eunit.hrl").
13+
14+
all() ->
15+
[
16+
{group, tests}
17+
].
18+
19+
groups() ->
20+
[
21+
{tests, [],
22+
[
23+
routed_to_zero_queue_test,
24+
routed_to_one_queue_test,
25+
routed_to_many_queue_test,
26+
stable_routing_across_restarts_test,
27+
weighted_routing_test
28+
]}
29+
].
30+
31+
%% -------------------------------------------------------------------
32+
%% Test suite setup/teardown
33+
%% -------------------------------------------------------------------
34+
35+
init_per_suite(Config) ->
36+
rabbit_ct_helpers:log_environment(),
37+
Config1 = rabbit_ct_helpers:set_config(Config, [
38+
{rmq_nodename_suffix, ?MODULE}
39+
]),
40+
rabbit_ct_helpers:run_setup_steps(Config1,
41+
rabbit_ct_broker_helpers:setup_steps() ++
42+
rabbit_ct_client_helpers:setup_steps()).
43+
44+
end_per_suite(Config) ->
45+
rabbit_ct_helpers:run_teardown_steps(Config,
46+
rabbit_ct_client_helpers:teardown_steps() ++
47+
rabbit_ct_broker_helpers:teardown_steps()).
48+
49+
init_per_group(_, Config) ->
50+
Config.
51+
52+
end_per_group(_, Config) ->
53+
Config.
54+
55+
init_per_testcase(Testcase, Config) ->
56+
TestCaseName = rabbit_ct_helpers:config_to_testcase_name(Config, Testcase),
57+
Config1 = rabbit_ct_helpers:set_config(Config, {test_resource_name,
58+
re:replace(TestCaseName, "/", "-", [global, {return, list}])}),
59+
rabbit_ct_helpers:testcase_started(Config1, Testcase).
60+
61+
end_per_testcase(Testcase, Config) ->
62+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
63+
64+
%% -------------------------------------------------------------------
65+
%% Test cases
66+
%% -------------------------------------------------------------------
67+
68+
routed_to_zero_queue_test(Config) ->
69+
ok = route(Config, [], 5, 0).
70+
71+
routed_to_one_queue_test(Config) ->
72+
ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 1, 1).
73+
74+
routed_to_many_queue_test(Config) ->
75+
ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 5, 5).
76+
77+
route(Config, Queues, PublishCount, ExpectedRoutedCount) ->
78+
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
79+
B = rabbit_ct_helpers:get_config(Config, test_resource_name),
80+
XNameBin = erlang:list_to_binary("x-" ++ B),
81+
82+
#'exchange.declare_ok'{} = amqp_channel:call(Chan,
83+
#'exchange.declare'{
84+
exchange = XNameBin,
85+
type = <<"x-modulus-hash">>,
86+
durable = true,
87+
auto_delete = true}),
88+
[begin
89+
#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{
90+
queue = Q,
91+
durable = true,
92+
exclusive = true}),
93+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{
94+
queue = Q,
95+
exchange = XNameBin})
96+
end
97+
|| Q <- Queues],
98+
99+
amqp_channel:call(Chan, #'confirm.select'{}),
100+
[amqp_channel:call(Chan,
101+
#'basic.publish'{exchange = XNameBin,
102+
routing_key = rnd()},
103+
#amqp_msg{props = #'P_basic'{},
104+
payload = <<>>}) ||
105+
_ <- lists:duplicate(PublishCount, const)],
106+
amqp_channel:wait_for_confirms_or_die(Chan),
107+
108+
Count = lists:foldl(
109+
fun(Q, Acc) ->
110+
#'queue.declare_ok'{message_count = M} = amqp_channel:call(
111+
Chan,
112+
#'queue.declare'{
113+
queue = Q,
114+
durable = true,
115+
exclusive = true}),
116+
Acc + M
117+
end, 0, Queues),
118+
?assertEqual(ExpectedRoutedCount, Count),
119+
120+
amqp_channel:call(Chan, #'exchange.delete'{exchange = XNameBin}),
121+
[amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues],
122+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan).
123+
124+
stable_routing_across_restarts_test(Config) ->
125+
{Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
126+
XNameBin = atom_to_binary(?FUNCTION_NAME),
127+
NumQs = 40,
128+
NumMsgs = 500,
129+
130+
#'exchange.declare_ok'{} = amqp_channel:call(Chan1,
131+
#'exchange.declare'{
132+
exchange = XNameBin,
133+
type = <<"x-modulus-hash">>,
134+
durable = true}),
135+
Queues = [erlang:list_to_binary("q-" ++ integer_to_list(I)) || I <- lists:seq(1, NumQs)],
136+
[begin
137+
#'queue.declare_ok'{} = amqp_channel:call(Chan1, #'queue.declare'{
138+
queue = Q,
139+
durable = true}),
140+
#'queue.bind_ok'{} = amqp_channel:call(Chan1, #'queue.bind'{
141+
queue = Q,
142+
exchange = XNameBin,
143+
%% The routing key shouldn't matter
144+
routing_key = rnd()})
145+
end
146+
|| Q <- Queues],
147+
148+
RoutingKeys = [rnd() || _ <- lists:seq(1, NumMsgs)],
149+
150+
amqp_channel:call(Chan1, #'confirm.select'{}),
151+
[amqp_channel:call(Chan1,
152+
#'basic.publish'{exchange = XNameBin,
153+
routing_key = RK},
154+
#amqp_msg{payload = RK})
155+
|| RK <- RoutingKeys],
156+
amqp_channel:wait_for_confirms_or_die(Chan1),
157+
158+
Map1 = consume_all(Chan1, Queues),
159+
160+
%% Assert at least two queues got messages
161+
NonEmptyQueues1 = maps:filter(fun(_Q, Msgs) -> length(Msgs) > 0 end, Map1),
162+
?assert(maps:size(NonEmptyQueues1) >= 2),
163+
164+
%% Assert all messages routed
165+
?assertEqual(NumMsgs, lists:sum([length(Msgs) || Msgs <- maps:values(Map1)])),
166+
167+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1),
168+
%% Restart node
169+
ok = rabbit_ct_broker_helpers:restart_node(Config, 0),
170+
{Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
171+
172+
amqp_channel:call(Chan2, #'confirm.select'{}),
173+
%% Publish the same messages again.
174+
[amqp_channel:call(Chan2,
175+
#'basic.publish'{exchange = XNameBin,
176+
routing_key = RK},
177+
#amqp_msg{payload = RK})
178+
|| RK <- RoutingKeys],
179+
amqp_channel:wait_for_confirms_or_die(Chan2),
180+
181+
Map2 = consume_all(Chan2, Queues),
182+
183+
%% Assert the same messages ended up in the same queues,
184+
%% i.e. that routing was stable.
185+
?assertEqual(Map1, Map2),
186+
187+
amqp_channel:call(Chan2, #'exchange.delete'{exchange = XNameBin}),
188+
[amqp_channel:call(Chan2, #'queue.delete'{queue = Q}) || Q <- Queues],
189+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2).
190+
191+
weighted_routing_test(Config) ->
192+
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
193+
XNameBin = atom_to_binary(?FUNCTION_NAME),
194+
Queues = [<<"q1">>, <<"q2">>, <<"q3">>],
195+
NumMsgs = 600,
196+
197+
#'exchange.declare_ok'{} = amqp_channel:call(Chan,
198+
#'exchange.declare'{
199+
exchange = XNameBin,
200+
type = <<"x-modulus-hash">>,
201+
durable = true}),
202+
203+
[#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{queue = Q,
204+
durable = true})
205+
|| Q <- Queues],
206+
207+
%% Bind q1 once
208+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q1">>,
209+
exchange = XNameBin}),
210+
211+
%% Bind q2 twice
212+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q2">>,
213+
exchange = XNameBin,
214+
routing_key = <<"a">>}),
215+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q2">>,
216+
exchange = XNameBin,
217+
routing_key = <<"b">>}),
218+
219+
%% Bind q3 three times
220+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q3">>,
221+
exchange = XNameBin,
222+
routing_key = <<"a">>}),
223+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q3">>,
224+
exchange = XNameBin,
225+
routing_key = <<"b">>}),
226+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{queue = <<"q3">>,
227+
exchange = XNameBin,
228+
routing_key = <<"c">>}),
229+
230+
amqp_channel:call(Chan, #'confirm.select'{}),
231+
[amqp_channel:call(Chan,
232+
#'basic.publish'{exchange = XNameBin,
233+
routing_key = integer_to_binary(I)},
234+
#amqp_msg{})
235+
|| I <- lists:seq(1, NumMsgs)],
236+
amqp_channel:wait_for_confirms_or_die(Chan),
237+
238+
Counts = lists:foldl(
239+
fun(Q, Acc) ->
240+
#'queue.declare_ok'{message_count = M} = amqp_channel:call(
241+
Chan,
242+
#'queue.declare'{queue = Q,
243+
durable = true}),
244+
maps:put(Q, M, Acc)
245+
end, #{}, Queues),
246+
247+
C1 = maps:get(<<"q1">>, Counts),
248+
C2 = maps:get(<<"q2">>, Counts),
249+
C3 = maps:get(<<"q3">>, Counts),
250+
ct:pal("q1: ~b, q2: ~b, q3: ~b", [C1, C2, C3]),
251+
252+
?assertEqual(NumMsgs, C1 + C2 + C3),
253+
%% Assert weighted distribution
254+
?assert(C1 < C2),
255+
?assert(C2 < C3),
256+
257+
amqp_channel:call(Chan, #'exchange.delete'{exchange = XNameBin}),
258+
[amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues],
259+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan).
260+
261+
consume_all(Chan, Queues) ->
262+
lists:foldl(fun(Q, Map) ->
263+
Msgs = consume_queue(Chan, Q, []),
264+
maps:put(Q, Msgs, Map)
265+
end, #{}, Queues).
266+
267+
consume_queue(Chan, Q, L) ->
268+
case amqp_channel:call(Chan, #'basic.get'{queue = Q,
269+
no_ack = true}) of
270+
#'basic.get_empty'{} ->
271+
L;
272+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload}} ->
273+
consume_queue(Chan, Q, L ++ [Payload])
274+
end.
275+
276+
rnd() ->
277+
integer_to_binary(rand:uniform(10_000_000)).

deps/rabbitmq_sharding/README.md

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,23 +48,21 @@ Do not use this plugin with quorum queues. Avoid classic mirrored queues in gene
4848

4949
## Messages Distribution Between Shards (Partitioning)
5050

51-
The exchanges that ship by default with RabbitMQ work in an "all or
52-
nothing" fashion, i.e: if a routing key matches a set of queues bound
53-
to the exchange, then RabbitMQ will route the message to all the
54-
queues in that set. For this plugin to work it is necessary to
55-
route messages to an exchange that would partition messages, so they
56-
are routed to _at most_ one queue (a subset).
57-
58-
The plugin provides a new exchange type, `"x-modulus-hash"`, that will use
51+
RabbitMQ provides a built-in exchange type, `"x-modulus-hash"`, that will use
5952
a hashing function to partition messages routed to a logical queue
60-
across a number of regular queues (shards).
53+
across a number of regular queues (shards). This exchange type is available
54+
in core RabbitMQ and does not require enabling this plugin to be used.
6155

6256
The `"x-modulus-hash"` exchange will hash the routing key used to
6357
publish the message and then it will apply a `Hash mod N` to pick the
6458
queue where to route the message, where N is the number of queues
6559
bound to the exchange. **This exchange will completely ignore the
6660
binding key used to bind the queue to the exchange**.
6761

62+
This exchange guarantees stable routing. As long as the bindings to the exchange remain the same,
63+
messages with the same routing key will always be routed to exactly the same destination queue,
64+
even across node restarts.
65+
6866
There are other exchanges with similar behaviour:
6967
the _Consistent Hash Exchange_ or the _Random Exchange_.
7068
Those were designed with regular queues in mind, not this plugin, so `"x-modulus-hash"`

0 commit comments

Comments
 (0)