Skip to content

Commit bbf51e7

Browse files
ansddeadtrickster
authored andcommitted
Support consistent hash exchange routing for MQTT 5.0
When a consistent hash exchange is bound to the MQTT topic exchange, MQTT 5.0 messages can be routed to queues consistently based on the Correlation-Data in the PUBLISH packet.
1 parent 60919f0 commit bbf51e7

3 files changed

Lines changed: 74 additions & 8 deletions

File tree

deps/rabbit/src/mc_amqp.erl

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ x_header(Key, Msg) ->
9595
property(correlation_id, #msg{properties = #'v1_0.properties'{correlation_id = Corr}}) ->
9696
Corr;
9797
property(message_id, #msg{properties = #'v1_0.properties'{message_id = MsgId}}) ->
98-
MsgId;
98+
MsgId;
9999
property(_Prop, #msg{}) ->
100100
undefined.
101101

deps/rabbitmq_mqtt/src/mc_mqtt.erl

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,8 @@ size_prop(_, _, Sum) ->
213213
x_header(_Key, #mqtt_msg{}) ->
214214
undefined.
215215

216+
property(correlation_id, #mqtt_msg{props = #{'Correlation-Data' := Corr}}) ->
217+
{binary, Corr};
216218
property(_Key, #mqtt_msg{}) ->
217219
undefined.
218220

deps/rabbitmq_mqtt/test/v5_SUITE.erl

Lines changed: 71 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,8 @@ cluster_size_1_tests() ->
128128
topic_alias_retained_message,
129129
topic_alias_disallowed_retained_message,
130130
extended_auth,
131-
headers_exchange
131+
headers_exchange,
132+
consistent_hash_exchange
132133
].
133134

134135
cluster_size_3_tests() ->
@@ -2050,12 +2051,9 @@ headers_exchange(Config) ->
20502051
Ch, #'exchange.bind'{destination = HeadersX,
20512052
source = <<"amq.topic">>,
20522053
routing_key = <<"my.topic">>}),
2053-
#'queue.declare_ok'{} = amqp_channel:call(
2054-
Ch, #'queue.declare'{queue = Q1,
2055-
durable = true}),
2056-
#'queue.declare_ok'{} = amqp_channel:call(
2057-
Ch, #'queue.declare'{queue = Q2,
2058-
durable = true}),
2054+
[#'queue.declare_ok'{} = amqp_channel:call(
2055+
Ch, #'queue.declare'{queue = Q,
2056+
durable = true}) || Q <- Qs],
20592057
#'queue.bind_ok'{} = amqp_channel:call(
20602058
Ch, #'queue.bind'{queue = Q1,
20612059
exchange = HeadersX,
@@ -2107,6 +2105,72 @@ headers_exchange(Config) ->
21072105
[#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Qs],
21082106
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0).
21092107

2108+
%% Binding a consistent hash exchange to the MQTT topic exchange should support
2109+
%% consistent routing based on Correlation-Data in the PUBLISH packet.
2110+
consistent_hash_exchange(Config) ->
2111+
ok = rabbit_ct_broker_helpers:enable_plugin(Config, 0, rabbitmq_consistent_hash_exchange),
2112+
HashX = <<"my-consistent-hash-exchange">>,
2113+
Q1 = <<"q1">>,
2114+
Q2 = <<"q2">>,
2115+
Qs = [Q1, Q2],
2116+
Ch = rabbit_ct_client_helpers:open_channel(Config),
2117+
2118+
#'exchange.declare_ok'{} = amqp_channel:call(
2119+
Ch, #'exchange.declare'{
2120+
exchange = HashX,
2121+
type = <<"x-consistent-hash">>,
2122+
arguments = [{<<"hash-property">>, longstr, <<"correlation_id">>}],
2123+
durable = true,
2124+
auto_delete = true}),
2125+
#'exchange.bind_ok'{} = amqp_channel:call(
2126+
Ch, #'exchange.bind'{destination = HashX,
2127+
source = <<"amq.topic">>,
2128+
routing_key = <<"a.*">>}),
2129+
[#'queue.declare_ok'{} = amqp_channel:call(
2130+
Ch, #'queue.declare'{queue = Q,
2131+
durable = true}) || Q <- Qs],
2132+
[#'queue.bind_ok'{} = amqp_channel:call(
2133+
Ch, #'queue.bind'{queue = Q,
2134+
exchange = HashX,
2135+
%% weight
2136+
routing_key = <<"1">>}) || Q <- Qs],
2137+
2138+
Rands = [integer_to_binary(rand:uniform(1000)) || _ <- lists:seq(1, 30)],
2139+
UniqRands = lists:uniq(Rands),
2140+
NumMsgs = 150,
2141+
C = connect(?FUNCTION_NAME, Config),
2142+
[begin
2143+
N = integer_to_binary(rand:uniform(1_000_000)),
2144+
Topic = <<"a/", N/binary>>,
2145+
{ok, _} = emqtt:publish(C, Topic,
2146+
#{'Correlation-Data' => lists:nth(rand:uniform(length(UniqRands)), UniqRands)},
2147+
N, [{qos, 1}])
2148+
end || _ <- lists:seq(1, NumMsgs)],
2149+
2150+
#'basic.consume_ok'{consumer_tag = Ctag1} = amqp_channel:subscribe(
2151+
Ch, #'basic.consume'{queue = Q1,
2152+
no_ack = true}, self()),
2153+
#'basic.consume_ok'{consumer_tag = Ctag2} = amqp_channel:subscribe(
2154+
Ch, #'basic.consume'{queue = Q2,
2155+
no_ack = true}, self()),
2156+
{N1, Corrs1} = receive_correlations(Ctag1, 0, sets:new([{version, 2}])),
2157+
{N2, Corrs2} = receive_correlations(Ctag2, 0, sets:new([{version, 2}])),
2158+
ct:pal("q1: ~b messages, ~b unique correlation-data", [N1, sets:size(Corrs1)]),
2159+
ct:pal("q2: ~b messages, ~b unique correlation-data", [N2, sets:size(Corrs2)]),
2160+
%% All messages should be routed.
2161+
?assertEqual(NumMsgs, N1 + N2),
2162+
%% Each of the 2 queues should have received at least 1 message.
2163+
?assert(sets:size(Corrs1) > 0),
2164+
?assert(sets:size(Corrs2) > 0),
2165+
%% Assert that the consistent hash exchange routed the given Correlation-Data consistently.
2166+
%% The same Correlation-Data should never be present in both queues.
2167+
Intersection = sets:intersection(Corrs1, Corrs2),
2168+
?assert(sets:is_empty(Intersection)),
2169+
2170+
ok = emqtt:disconnect(C),
2171+
[#'queue.delete_ok'{} = amqp_channel:call(Ch, #'queue.delete'{queue = Q}) || Q <- Qs],
2172+
ok = rabbit_ct_client_helpers:close_channels_and_connection(Config, 0).
2173+
21102174
%% -------------------------------------------------------------------
21112175
%% Helpers
21122176
%% -------------------------------------------------------------------

0 commit comments

Comments
 (0)