Skip to content

Commit b8591ac

Browse files
Merge pull request #16253 from rabbitmq/mk-item-43
Refactor Direct Reply-to one more time
2 parents 40dda36 + 81e0f64 commit b8591ac

5 files changed

Lines changed: 409 additions & 24 deletions

File tree

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,7 +268,7 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
268268
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
269269
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_type rabbitmq_queues_cli_integration rabbitmq_streams_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
270270

271-
PARALLEL_CT_SET_5_A = consumer_recheck_prop rabbit_direct_reply_to_prop rabbit_quorum_queue_prop prop_stream_arg_validation direct_reply_to_amqpl direct_reply_to_amqp classic_queue
271+
PARALLEL_CT_SET_5_A = consumer_recheck_prop pid_codec pid_codec_prop rabbit_direct_reply_to_prop rabbit_quorum_queue_prop prop_stream_arg_validation direct_reply_to_amqpl direct_reply_to_amqp classic_queue
272272
PARALLEL_CT_SET_5_B = feature_flags_v2 backing_queue transactions
273273
PARALLEL_CT_SET_5_C = cluster_upgrade maintenance_mode
274274
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel rabbit_exchange_type_modulus_hash

deps/rabbit/src/rabbit_pid_codec.erl

Lines changed: 32 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,42 @@
2222
serial := non_neg_integer(),
2323
creation := non_neg_integer()}.
2424

25+
-type decomposed_pid_bin() :: #{node := binary(),
26+
id := non_neg_integer(),
27+
serial := non_neg_integer(),
28+
creation := non_neg_integer()}.
29+
30+
-export_type([decomposed_pid/0, decomposed_pid_bin/0]).
31+
2532
-spec decompose(pid()) -> decomposed_pid().
2633
decompose(Pid) ->
2734
Bin = term_to_binary(Pid, [{minor_version, 2}]),
28-
decompose_from_binary(Bin).
35+
{ok, #{node := NodeBin} = Parts} = decompose_from_binary(Bin),
36+
%% Safe: the input is a real local pid, so its node atom always exists.
37+
Parts#{node := binary_to_existing_atom(NodeBin, utf8)}.
2938

30-
-spec decompose_from_binary(binary()) -> decomposed_pid().
31-
decompose_from_binary(Bin) ->
32-
<<?TTB_PREFIX, ?NEW_PID_EXT, PidData/binary>> = Bin,
33-
{Node, Rest} = case PidData of
34-
<<?ATOM_UTF8_EXT, Len:16/integer, Node0:Len/binary, Rest1/binary>> ->
35-
{Node0, Rest1};
36-
<<?SMALL_ATOM_UTF8_EXT, Len/integer, Node0:Len/binary, Rest1/binary>> ->
37-
{Node0, Rest1}
38-
end,
39-
<<ID:32/integer, Serial:32/integer, Creation:32/integer>> = Rest,
40-
#{node => binary_to_atom(Node, utf8),
41-
id => ID,
42-
serial => Serial,
43-
creation => Creation}.
39+
%% Returns the node name as a binary and 'error' on malformed input.
40+
-spec decompose_from_binary(binary()) -> {ok, decomposed_pid_bin()} | error.
41+
decompose_from_binary(<<?TTB_PREFIX, ?NEW_PID_EXT,
42+
?ATOM_UTF8_EXT, Len:16/integer, Node:Len/binary,
43+
ID:32/integer, Serial:32/integer,
44+
Creation:32/integer>>) ->
45+
{ok,
46+
#{node => Node,
47+
id => ID,
48+
serial => Serial,
49+
creation => Creation}};
50+
decompose_from_binary(<<?TTB_PREFIX, ?NEW_PID_EXT,
51+
?SMALL_ATOM_UTF8_EXT, Len:8/integer, Node:Len/binary,
52+
ID:32/integer, Serial:32/integer,
53+
Creation:32/integer>>) ->
54+
{ok,
55+
#{node => Node,
56+
id => ID,
57+
serial => Serial,
58+
creation => Creation}};
59+
decompose_from_binary(_) ->
60+
error.
4461

4562
-spec recompose_to_binary(decomposed_pid()) -> binary().
4663
recompose_to_binary(#{node := Node,

deps/rabbit/src/rabbit_volatile_queue.erl

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -377,19 +377,30 @@ pid_from_name(<<?PREFIX, Bin/binary>>, CandidateNodes) ->
377377
try
378378
[PidBase64, _KeyBase64] = binary:split(Bin, Cp),
379379
PidBin = base64:decode(PidBase64),
380-
PidParts0 = #{node := ShortenedNodename} = rabbit_pid_codec:decompose_from_binary(PidBin),
381-
{_, NodeHash} = rabbit_nodes_common:parts(ShortenedNodename),
382-
case maps:get(list_to_integer(NodeHash), CandidateNodes, undefined) of
383-
undefined ->
384-
error;
385-
Candidate ->
386-
PidParts = maps:update(node, Candidate, PidParts0),
387-
{ok, rabbit_pid_codec:recompose(PidParts)}
380+
case rabbit_pid_codec:decompose_from_binary(PidBin) of
381+
{ok, #{node := ShortenedNodename} = PidParts0} ->
382+
NodeHash = node_hash_from_binary(ShortenedNodename),
383+
case maps:get(NodeHash, CandidateNodes, undefined) of
384+
undefined ->
385+
error;
386+
Candidate ->
387+
PidParts = maps:update(node, Candidate, PidParts0),
388+
{ok, rabbit_pid_codec:recompose(PidParts)}
389+
end;
390+
error ->
391+
error
388392
end
389393
catch error:_ -> error
390394
end;
391395
pid_from_name(_, _) ->
392396
error.
393397

398+
%% Returns the integer hash following "@" in a synthetic node-name binary
399+
%% such as <<"reply@1234567">>. Malformed input raises and is caught by
400+
%% the surrounding try.
401+
node_hash_from_binary(NodeBin) ->
402+
[_Prefix, Suffix] = binary:split(NodeBin, <<"@">>),
403+
binary_to_integer(Suffix).
404+
394405
nodes_with_hashes() ->
395406
#{erlang:phash2(Node) => Node || Node <- rabbit_nodes:list_members()}.
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(pid_codec_SUITE).
9+
10+
-compile(export_all).
11+
12+
-include_lib("common_test/include/ct.hrl").
13+
-include_lib("eunit/include/eunit.hrl").
14+
15+
-define(TTB_PREFIX, 131).
16+
-define(NEW_PID_EXT, 88).
17+
-define(ATOM_UTF8_EXT, 118).
18+
-define(SMALL_ATOM_UTF8_EXT, 119).
19+
20+
all() ->
21+
[
22+
safe_decoder_roundtrip_self,
23+
safe_decoder_returns_binary_node,
24+
safe_decoder_rejects_malformed,
25+
safe_decoder_no_atoms_for_unknown_node_names,
26+
safe_decoder_handles_long_node_names,
27+
pid_from_name_no_atoms_for_crafted_input,
28+
pid_from_name_rejects_malformed_node_name,
29+
pid_from_name_happy_path
30+
].
31+
32+
init_per_suite(Config) -> Config.
33+
end_per_suite(_Config) -> ok.
34+
35+
safe_decoder_roundtrip_self(_) ->
36+
Pid = self(),
37+
Bin = term_to_binary(Pid, [{minor_version, 2}]),
38+
{ok, Parts} = rabbit_pid_codec:decompose_from_binary(Bin),
39+
Recomposed = rabbit_pid_codec:recompose(Parts#{node := node()}),
40+
?assertEqual(Pid, Recomposed).
41+
42+
safe_decoder_returns_binary_node(_) ->
43+
Bin = term_to_binary(self(), [{minor_version, 2}]),
44+
{ok, #{node := NodeBin}} = rabbit_pid_codec:decompose_from_binary(Bin),
45+
?assert(is_binary(NodeBin)),
46+
?assertEqual(atom_to_binary(node()), NodeBin).
47+
48+
safe_decoder_rejects_malformed(_) ->
49+
Cases = [
50+
<<>>,
51+
<<0>>,
52+
<<?TTB_PREFIX>>,
53+
<<?TTB_PREFIX, 0>>,
54+
<<?TTB_PREFIX, ?NEW_PID_EXT>>,
55+
<<?TTB_PREFIX, ?NEW_PID_EXT, ?ATOM_UTF8_EXT>>,
56+
<<?TTB_PREFIX, ?NEW_PID_EXT, ?ATOM_UTF8_EXT, 0:16, 0:32, 0:32>>,
57+
<<(make_pid_bin(<<"x@1">>, 0, 0, 0))/binary, "trailing">>,
58+
<<?TTB_PREFIX, ?NEW_PID_EXT, ?ATOM_UTF8_EXT, 10:16, "abc",
59+
0:32, 0:32, 0:32>>,
60+
%% The legacy PID_EXT tag (103) is not accepted.
61+
<<?TTB_PREFIX, 103, 0, 0, 0, 0>>,
62+
%% TTB terms that are not pids.
63+
term_to_binary(an_atom),
64+
term_to_binary({a, tuple}),
65+
term_to_binary(42),
66+
%% Correct tags but the body is one byte short.
67+
<<?TTB_PREFIX, ?NEW_PID_EXT,
68+
?SMALL_ATOM_UTF8_EXT, 1:8, "x", 0:32, 0:32, 0:24>>
69+
],
70+
[?assertEqual(error, rabbit_pid_codec:decompose_from_binary(C))
71+
|| C <- Cases].
72+
73+
safe_decoder_no_atoms_for_unknown_node_names(_) ->
74+
Inputs = [{fresh_node_bytes(I), I} || I <- lists:seq(1, 1000)],
75+
Bins = [{Name, make_pid_bin(Name, I, I, I)} || {Name, I} <- Inputs],
76+
%% Warm up before sampling the atom count.
77+
{_, B0} = hd(Bins),
78+
{ok, _} = rabbit_pid_codec:decompose_from_binary(B0),
79+
Before = erlang:system_info(atom_count),
80+
lists:foreach(
81+
fun({Name, B}) ->
82+
{ok, #{node := Got}} =
83+
rabbit_pid_codec:decompose_from_binary(B),
84+
?assertEqual(Name, Got)
85+
end,
86+
Bins),
87+
?assertEqual(Before, erlang:system_info(atom_count)).
88+
89+
safe_decoder_handles_long_node_names(_) ->
90+
%% Names longer than 255 bytes use the ATOM_UTF8_EXT form.
91+
Long = binary:copy(<<"a">>, 65535),
92+
Bin = make_pid_bin(Long, 1, 2, 3),
93+
{ok, #{node := Got, id := 1, serial := 2, creation := 3}} =
94+
rabbit_pid_codec:decompose_from_binary(Bin),
95+
?assertEqual(Long, Got).
96+
97+
pid_from_name_no_atoms_for_crafted_input(_) ->
98+
CandidateNodes = #{},
99+
QNames = [craft_reply_to(fresh_node_bytes(I), I) || I <- lists:seq(1, 1000)],
100+
%% Warm up before sampling the atom count.
101+
error = rabbit_volatile_queue:pid_from_name(hd(QNames), CandidateNodes),
102+
Before = erlang:system_info(atom_count),
103+
lists:foreach(
104+
fun(QName) ->
105+
?assertEqual(error,
106+
rabbit_volatile_queue:pid_from_name(QName,
107+
CandidateNodes))
108+
end,
109+
QNames),
110+
?assertEqual(Before, erlang:system_info(atom_count)).
111+
112+
pid_from_name_rejects_malformed_node_name(_) ->
113+
Names = [<<"noatsign">>,
114+
<<"x@notdigits">>,
115+
<<"x@">>,
116+
<<"@123">>,
117+
<<>>,
118+
<<0, 1, 2, 3>>],
119+
[begin
120+
QName = craft_reply_to(N, 0),
121+
?assertEqual(error,
122+
rabbit_volatile_queue:pid_from_name(QName, #{}))
123+
end || N <- Names].
124+
125+
pid_from_name_happy_path(_) ->
126+
Hash = 4242,
127+
SyntheticNode = rabbit_nodes_common:make("reply", integer_to_list(Hash)),
128+
SyntheticNodeBin = atom_to_binary(SyntheticNode),
129+
PidBin = make_pid_bin(SyntheticNodeBin, 7, 0, 0),
130+
QName = <<"amq.rabbitmq.reply-to.",
131+
(base64:encode(PidBin))/binary, ".somekey">>,
132+
Candidate = node(),
133+
{ok, Pid} = rabbit_volatile_queue:pid_from_name(QName, #{Hash => Candidate}),
134+
?assertEqual(Candidate, node(Pid)).
135+
136+
%%% Helpers
137+
138+
make_pid_bin(NodeBin, ID, Serial, Creation) ->
139+
NodeLen = byte_size(NodeBin),
140+
case NodeLen =< 255 of
141+
true ->
142+
<<?TTB_PREFIX, ?NEW_PID_EXT,
143+
?SMALL_ATOM_UTF8_EXT, NodeLen:8, NodeBin/binary,
144+
ID:32, Serial:32, Creation:32>>;
145+
false ->
146+
<<?TTB_PREFIX, ?NEW_PID_EXT,
147+
?ATOM_UTF8_EXT, NodeLen:16, NodeBin/binary,
148+
ID:32, Serial:32, Creation:32>>
149+
end.
150+
151+
fresh_node_bytes(I) ->
152+
Uniq = erlang:unique_integer([positive]),
153+
iolist_to_binary(io_lib:format("codec-test-~b-~b@x", [Uniq, I])).
154+
155+
craft_reply_to(NodeBytes, ID) ->
156+
PidBin = make_pid_bin(NodeBytes, ID, 0, 0),
157+
<<"amq.rabbitmq.reply-to.", (base64:encode(PidBin))/binary, ".k">>.

0 commit comments

Comments
 (0)