Skip to content

Commit 6900588

Browse files
committed
Ensure stable routing for x-modulus-hash exchange
## What? This commit makes routing stable for the `x-modulus-hash` exchange: If the same destination queues stay bound to the exchange (i.e. do not bind or unbind queues after the "initial setup"), messages with the same domain entity (routing key) will always end up in the same destination queue, even across node restarts. ## How? With Mnesia this was guaranteed due to order_set tables. Khepri introduced a regression since it uses a bag ETS projection table. This commit simply sorts the destinations before picking the Nth destination. ## Why? Uses cases where message order matters are common. This commit allows to for example bind N quorum queues to an `x-modulus-hash` exchange instance (the binding key doesn't matter) and use the Single Active Consumer (SAC) feature on each quorum queue. This will provide * message ordering thanks to stable routing and SAC * concurrent consumption: N app instances can process messages in parallel * fault tolerance since the broker will deliver messages to another consumer when the active consumer crashes Using the consistent hash exchange is an alternative, but unnecessarily complex for this use case. Yet another alternative for this use case is using the murmur3 exchange type (#8319).
1 parent 8a41c1f commit 6900588

2 files changed

Lines changed: 164 additions & 98 deletions

File tree

deps/rabbitmq_sharding/src/rabbit_sharding_exchange_type_modulus_hash.erl

Lines changed: 30 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -11,40 +11,50 @@
1111

1212
-behaviour(rabbit_exchange_type).
1313

14-
-export([description/0, serialise_events/0, route/3, info/1, info/2]).
15-
-export([validate/1, validate_binding/2,
16-
create/2, delete/2, policy_changed/2,
17-
add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
14+
-export([description/0,
15+
route/3,
16+
serialise_events/0,
17+
info/1,
18+
info/2,
19+
validate/1,
20+
validate_binding/2,
21+
create/2,
22+
delete/2,
23+
policy_changed/2,
24+
add_binding/3,
25+
remove_bindings/3,
26+
assert_args_equivalence/2]).
1827

1928
-rabbit_boot_step(
2029
{rabbit_sharding_exchange_type_modulus_hash_registry,
2130
[{description, "exchange type x-modulus-hash: registry"},
22-
{mfa, {rabbit_registry, register,
23-
[exchange, <<"x-modulus-hash">>, ?MODULE]}},
24-
{cleanup, {rabbit_registry, unregister,
25-
[exchange, <<"x-modulus-hash">>]}},
26-
{requires, rabbit_registry},
27-
{enables, kernel_ready}]}).
31+
{mfa, {rabbit_registry, register, [exchange, <<"x-modulus-hash">>, ?MODULE]}},
32+
{cleanup, {rabbit_registry, unregister, [exchange, <<"x-modulus-hash">>]}},
33+
{requires, rabbit_registry},
34+
{enables, kernel_ready}]}).
2835

29-
-define(PHASH2_RANGE, 134217728). %% 2^27
36+
%% 2^27
37+
-define(PHASH2_RANGE, 134217728).
3038

3139
description() ->
3240
[{description, <<"Modulus Hashing Exchange">>}].
3341

34-
serialise_events() -> false.
35-
3642
route(#exchange{name = Name}, Msg, _Options) ->
37-
Routes = mc:routing_keys(Msg),
38-
Qs = rabbit_router:match_routing_key(Name, ['_']),
39-
case length(Qs) of
40-
0 -> [];
41-
N -> [lists:nth(hash_mod(Routes, N), Qs)]
43+
Destinations = rabbit_router:match_routing_key(Name, ['_']),
44+
case length(Destinations) of
45+
0 ->
46+
[];
47+
Len ->
48+
%% We sort to guarantee stable routing after node restarts.
49+
DestinationsSorted = lists:sort(Destinations),
50+
Hash = erlang:phash2(mc:routing_keys(Msg), ?PHASH2_RANGE),
51+
Destination = lists:nth(Hash rem Len + 1, DestinationsSorted),
52+
[Destination]
4253
end.
4354

4455
info(_) -> [].
45-
4656
info(_, _) -> [].
47-
57+
serialise_events() -> false.
4858
validate(_X) -> ok.
4959
validate_binding(_X, _B) -> ok.
5060
create(_Serial, _X) -> ok.
@@ -54,7 +64,3 @@ add_binding(_Serial, _X, _B) -> ok.
5464
remove_bindings(_Serial, _X, _Bs) -> ok.
5565
assert_args_equivalence(X, Args) ->
5666
rabbit_exchange:assert_args_equivalence(X, Args).
57-
58-
hash_mod(Routes, N) ->
59-
M = erlang:phash2(Routes, ?PHASH2_RANGE) rem N,
60-
M + 1. %% erlang lists are 1..N indexed.

deps/rabbitmq_sharding/test/rabbit_hash_exchange_SUITE.erl

Lines changed: 134 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,18 @@
1313

1414
all() ->
1515
[
16-
{group, non_parallel_tests}
16+
{group, tests}
1717
].
1818

1919
groups() ->
2020
[
21-
{non_parallel_tests, [], [
22-
routed_to_zero_queue_test,
23-
routed_to_one_queue_test,
24-
routed_to_many_queue_test
25-
]}
21+
{tests, [],
22+
[
23+
routed_to_zero_queue_test,
24+
routed_to_one_queue_test,
25+
routed_to_many_queue_test,
26+
stable_routing_across_restarts_test
27+
]}
2628
].
2729

2830
%% -------------------------------------------------------------------
@@ -63,84 +65,142 @@ end_per_testcase(Testcase, Config) ->
6365
%% -------------------------------------------------------------------
6466

6567
routed_to_zero_queue_test(Config) ->
66-
test0(Config, fun () ->
67-
#'basic.publish'{exchange = make_exchange_name(Config, "0"), routing_key = rnd()}
68-
end,
69-
fun() ->
70-
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
71-
end, [], 5, 0),
72-
73-
passed.
68+
ok = route(Config, [], 5, 0).
7469

7570
routed_to_one_queue_test(Config) ->
76-
test0(Config, fun () ->
77-
#'basic.publish'{exchange = make_exchange_name(Config, "0"), routing_key = rnd()}
78-
end,
79-
fun() ->
80-
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
81-
end, [<<"q1">>, <<"q2">>, <<"q3">>], 1, 1),
82-
83-
passed.
71+
ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 1, 1).
8472

8573
routed_to_many_queue_test(Config) ->
86-
test0(Config, fun () ->
87-
#'basic.publish'{exchange = make_exchange_name(Config, "0"), routing_key = rnd()}
88-
end,
89-
fun() ->
90-
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
91-
end, [<<"q1">>, <<"q2">>, <<"q3">>], 5, 5),
92-
93-
passed.
94-
95-
test0(Config, MakeMethod, MakeMsg, Queues, MsgCount, Count) ->
96-
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0),
97-
E = make_exchange_name(Config, "0"),
98-
99-
#'exchange.declare_ok'{} =
100-
amqp_channel:call(Chan,
101-
#'exchange.declare' {
102-
exchange = E,
103-
type = <<"x-modulus-hash">>,
104-
auto_delete = true
105-
}),
106-
[#'queue.declare_ok'{} =
107-
amqp_channel:call(Chan, #'queue.declare' {
108-
queue = Q, exclusive = true }) || Q <- Queues],
109-
[#'queue.bind_ok'{} =
110-
amqp_channel:call(Chan, #'queue.bind'{queue = Q,
111-
exchange = E,
112-
routing_key = <<"">>})
74+
ok = route(Config, [<<"q1">>, <<"q2">>, <<"q3">>], 5, 5).
75+
76+
route(Config, Queues, PublishCount, ExpectedRoutedCount) ->
77+
{Conn, Chan} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
78+
B = rabbit_ct_helpers:get_config(Config, test_resource_name),
79+
XNameBin = erlang:list_to_binary("x-" ++ B),
80+
81+
#'exchange.declare_ok'{} = amqp_channel:call(Chan,
82+
#'exchange.declare'{
83+
exchange = XNameBin,
84+
type = <<"x-modulus-hash">>,
85+
durable = true,
86+
auto_delete = true}),
87+
[begin
88+
#'queue.declare_ok'{} = amqp_channel:call(Chan, #'queue.declare'{
89+
queue = Q,
90+
durable = true,
91+
exclusive = true}),
92+
#'queue.bind_ok'{} = amqp_channel:call(Chan, #'queue.bind'{
93+
queue = Q,
94+
exchange = XNameBin})
95+
end
11396
|| Q <- Queues],
11497

11598
amqp_channel:call(Chan, #'confirm.select'{}),
116-
11799
[amqp_channel:call(Chan,
118-
MakeMethod(),
119-
MakeMsg()) || _ <- lists:duplicate(MsgCount, const)],
120-
121-
% ensure that the messages have been delivered to the queues before asking
122-
% for the message count
100+
#'basic.publish'{exchange = XNameBin,
101+
routing_key = rnd()},
102+
#amqp_msg{props = #'P_basic'{},
103+
payload = <<>>}) ||
104+
_ <- lists:duplicate(PublishCount, const)],
123105
amqp_channel:wait_for_confirms_or_die(Chan),
124106

125-
Counts =
126-
[begin
127-
#'queue.declare_ok'{message_count = M} =
128-
amqp_channel:call(Chan, #'queue.declare' {queue = Q,
129-
exclusive = true }),
130-
M
131-
end || Q <- Queues],
132-
133-
?assertEqual(Count, lists:sum(Counts)),
134-
135-
amqp_channel:call(Chan, #'exchange.delete' { exchange = E }),
136-
[amqp_channel:call(Chan, #'queue.delete' { queue = Q }) || Q <- Queues],
107+
Count = lists:foldl(
108+
fun(Q, Acc) ->
109+
#'queue.declare_ok'{message_count = M} = amqp_channel:call(
110+
Chan,
111+
#'queue.declare'{
112+
queue = Q,
113+
durable = true,
114+
exclusive = true}),
115+
Acc + M
116+
end, 0, Queues),
117+
?assertEqual(ExpectedRoutedCount, Count),
118+
119+
amqp_channel:call(Chan, #'exchange.delete'{exchange = XNameBin}),
120+
[amqp_channel:call(Chan, #'queue.delete'{queue = Q}) || Q <- Queues],
121+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan).
122+
123+
stable_routing_across_restarts_test(Config) ->
124+
{Conn1, Chan1} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
125+
XNameBin = atom_to_binary(?FUNCTION_NAME),
126+
NumQs = 40,
127+
NumMsgs = 500,
128+
129+
#'exchange.declare_ok'{} = amqp_channel:call(Chan1,
130+
#'exchange.declare'{
131+
exchange = XNameBin,
132+
type = <<"x-modulus-hash">>,
133+
durable = true}),
134+
Queues = [erlang:list_to_binary("q-" ++ integer_to_list(I)) || I <- lists:seq(1, NumQs)],
135+
[begin
136+
#'queue.declare_ok'{} = amqp_channel:call(Chan1, #'queue.declare'{
137+
queue = Q,
138+
durable = true}),
139+
#'queue.bind_ok'{} = amqp_channel:call(Chan1, #'queue.bind'{
140+
queue = Q,
141+
exchange = XNameBin,
142+
%% The routing key shouldn't matter
143+
routing_key = rnd()})
144+
end
145+
|| Q <- Queues],
137146

138-
rabbit_ct_client_helpers:close_connection_and_channel(Conn, Chan),
139-
ok.
147+
RoutingKeys = [rnd() || _ <- lists:seq(1, NumMsgs)],
148+
149+
amqp_channel:call(Chan1, #'confirm.select'{}),
150+
[amqp_channel:call(Chan1,
151+
#'basic.publish'{exchange = XNameBin,
152+
routing_key = RK},
153+
#amqp_msg{payload = RK})
154+
|| RK <- RoutingKeys],
155+
amqp_channel:wait_for_confirms_or_die(Chan1),
156+
157+
Map1 = consume_all(Chan1, Queues),
158+
159+
%% Assert at least two queues got messages
160+
NonEmptyQueues1 = maps:filter(fun(_Q, Msgs) -> length(Msgs) > 0 end, Map1),
161+
?assert(maps:size(NonEmptyQueues1) >= 2),
162+
163+
%% Assert all messages routed
164+
?assertEqual(NumMsgs, lists:sum([length(Msgs) || Msgs <- maps:values(Map1)])),
165+
166+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn1, Chan1),
167+
%% Restart node
168+
ok = rabbit_ct_broker_helpers:restart_node(Config, 0),
169+
{Conn2, Chan2} = rabbit_ct_client_helpers:open_connection_and_channel(Config),
170+
171+
amqp_channel:call(Chan2, #'confirm.select'{}),
172+
%% Publish the same messages again.
173+
[amqp_channel:call(Chan2,
174+
#'basic.publish'{exchange = XNameBin,
175+
routing_key = RK},
176+
#amqp_msg{payload = RK})
177+
|| RK <- RoutingKeys],
178+
amqp_channel:wait_for_confirms_or_die(Chan2),
179+
180+
Map2 = consume_all(Chan2, Queues),
181+
182+
%% Assert the same messages ended up in the same queues,
183+
%% i.e. that routing was stable.
184+
?assertEqual(Map1, Map2),
185+
186+
amqp_channel:call(Chan2, #'exchange.delete'{exchange = XNameBin}),
187+
[amqp_channel:call(Chan2, #'queue.delete'{queue = Q}) || Q <- Queues],
188+
ok = rabbit_ct_client_helpers:close_connection_and_channel(Conn2, Chan2).
189+
190+
consume_all(Chan, Queues) ->
191+
lists:foldl(fun(Q, Map) ->
192+
Msgs = consume_queue(Chan, Q, []),
193+
maps:put(Q, Msgs, Map)
194+
end, #{}, Queues).
195+
196+
consume_queue(Chan, Q, L) ->
197+
case amqp_channel:call(Chan, #'basic.get'{queue = Q,
198+
no_ack = true}) of
199+
#'basic.get_empty'{} ->
200+
L;
201+
{#'basic.get_ok'{}, #amqp_msg{payload = Payload}} ->
202+
consume_queue(Chan, Q, L ++ [Payload])
203+
end.
140204

141205
rnd() ->
142-
list_to_binary(integer_to_list(rand:uniform(1000000))).
143-
144-
make_exchange_name(Config, Suffix) ->
145-
B = rabbit_ct_helpers:get_config(Config, test_resource_name),
146-
erlang:list_to_binary("x-" ++ B ++ "-" ++ Suffix).
206+
integer_to_binary(rand:uniform(10_000_000)).

0 commit comments

Comments
 (0)