Skip to content

Commit d16dda1

Browse files
Merge pull request #16287 from rabbitmq/mergify/bp/v4.2.x/pr-16286
By @lukebakken: Fix three stream argument validation bugs (backport #16285) (backport #16286)
2 parents 8c0a95a + 053f64c commit d16dda1

6 files changed

Lines changed: 320 additions & 8 deletions

deps/rabbit/Makefile

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -252,7 +252,7 @@ define ct_master.erl
252252
halt(0)
253253
endef
254254

255-
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
255+
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking unit_stream_arg_validation
256256
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
257257
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority default_queue_type_prop passive_declare_permission term_to_binary_compat_prop topic_permission unicode unit_access_control user_tags_count_limit
258258
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
@@ -272,7 +272,7 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
272272
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
273273
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_type rabbitmq_queues_cli_integration rabbitmq_streams_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
274274

275-
PARALLEL_CT_SET_5_A = consumer_recheck_prop rabbit_direct_reply_to_prop direct_reply_to_amqpl direct_reply_to_amqp classic_queue
275+
PARALLEL_CT_SET_5_A = consumer_recheck_prop rabbit_direct_reply_to_prop prop_stream_arg_validation direct_reply_to_amqpl direct_reply_to_amqp classic_queue
276276
PARALLEL_CT_SET_5_B = feature_flags_v2 backing_queue transactions
277277
PARALLEL_CT_SET_5_C = metadata_store_migration cluster_upgrade maintenance_mode
278278
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel

deps/rabbit/src/rabbit_amqqueue.erl

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1062,14 +1062,13 @@ check_max_age_arg({Type, _}, _Args) ->
10621062
{error, {unacceptable_type, Type}}.
10631063

10641064
check_max_age(MaxAge) ->
1065-
case re:run(MaxAge, "(^[0-9]*)(.*)", [{capture, all_but_first, list}]) of
1065+
case re:run(MaxAge, "(^[0-9]+)(.*)", [{capture, all_but_first, list}]) of
10661066
{match, [Value, Unit]} ->
10671067
case list_to_integer(Value) of
10681068
I when I > 0 ->
10691069
case lists:member(Unit, ["Y", "M", "D", "h", "m", "s"]) of
10701070
true ->
1071-
Int = list_to_integer(Value),
1072-
Int * unit_value_in_ms(Unit);
1071+
I * unit_value_in_ms(Unit);
10731072
false ->
10741073
{error, invalid_max_age}
10751074
end;

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,9 +178,14 @@ check_filter_size(Q) ->
178178
case rabbit_misc:table_lookup(Args, <<"x-stream-filter-size-bytes">>) of
179179
undefined ->
180180
ok;
181+
{Type, _Val} when Type =/= long, Type =/= short, Type =/= signedint,
182+
Type =/= unsignedint, Type =/= unsignedbyte,
183+
Type =/= unsignedshort, Type =/= byte ->
184+
{protocol_error, precondition_failed,
185+
"Invalid type for x-stream-filter-size-bytes", []};
181186
{_Type, Val} when Val > 255 orelse Val < 16 ->
182187
{protocol_error, precondition_failed,
183-
"Invalid value for x-stream-filter-size-bytes", []};
188+
"Invalid value for x-stream-filter-size-bytes", []};
184189
_ ->
185190
ok
186191
end.
@@ -1454,6 +1459,7 @@ capabilities() ->
14541459
<<"dead-letter-strategy">>, <<"target-group-size">>],
14551460
queue_arguments => [<<"x-max-length-bytes">>, <<"x-queue-type">>,
14561461
<<"x-max-age">>, <<"x-stream-max-segment-size-bytes">>,
1462+
<<"x-stream-filter-size-bytes">>,
14571463
<<"x-initial-cluster-size">>, <<"x-queue-leader-locator">>],
14581464
consumer_arguments => [<<"x-stream-offset">>,
14591465
<<"x-stream-filter">>,
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-module(prop_stream_arg_validation_SUITE).
8+
9+
-compile(nowarn_export_all).
10+
-compile(export_all).
11+
12+
-include_lib("proper/include/proper.hrl").
13+
-include_lib("common_test/include/ct.hrl").
14+
15+
-define(ITERATIONS, 1000).
16+
17+
all() ->
18+
[
19+
prop_valid_max_age,
20+
prop_invalid_max_age_no_leading_digits,
21+
prop_invalid_max_age_invalid_unit
22+
].
23+
24+
%% -------------------------------------------------------------------
25+
%% Suite setup/teardown.
26+
%% -------------------------------------------------------------------
27+
28+
init_per_suite(Config) ->
29+
rabbit_ct_helpers:log_environment(),
30+
rabbit_ct_helpers:run_setup_steps(Config).
31+
32+
end_per_suite(Config) ->
33+
rabbit_ct_helpers:run_teardown_steps(Config).
34+
35+
init_per_testcase(Testcase, Config) ->
36+
rabbit_ct_helpers:testcase_started(Config, Testcase).
37+
38+
end_per_testcase(Testcase, Config) ->
39+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
40+
41+
%% -------------------------------------------------------------------
42+
%% Properties: rabbit_amqqueue:check_max_age/1.
43+
%% Pure function; no broker required.
44+
%% -------------------------------------------------------------------
45+
46+
prop_valid_max_age(_Config) ->
47+
rabbit_ct_proper_helpers:run_proper(fun prop_valid_max_age_body/0, [], ?ITERATIONS).
48+
49+
prop_valid_max_age_body() ->
50+
ValidUnits = ["Y", "M", "D", "h", "m", "s"],
51+
?FORALL({N, Unit}, {pos_integer(), elements(ValidUnits)},
52+
begin
53+
MaxAge = list_to_binary(integer_to_list(N) ++ Unit),
54+
Result = rabbit_amqqueue:check_max_age(MaxAge),
55+
is_integer(Result) andalso Result > 0
56+
end).
57+
58+
prop_invalid_max_age_no_leading_digits(_Config) ->
59+
rabbit_ct_proper_helpers:run_proper(
60+
fun prop_invalid_max_age_no_leading_digits_body/0, [], ?ITERATIONS).
61+
62+
prop_invalid_max_age_no_leading_digits_body() ->
63+
%% ASCII digits occupy 48..57; any other first byte means no leading digits.
64+
NonDigitByte = ?SUCHTHAT(C, byte(), C < $0 orelse C > $9),
65+
?FORALL({First, Rest}, {NonDigitByte, binary()},
66+
begin
67+
MaxAge = <<First, Rest/binary>>,
68+
{error, invalid_max_age} =:= rabbit_amqqueue:check_max_age(MaxAge)
69+
end).
70+
71+
prop_invalid_max_age_invalid_unit(_Config) ->
72+
rabbit_ct_proper_helpers:run_proper(
73+
fun prop_invalid_max_age_invalid_unit_body/0, [], ?ITERATIONS).
74+
75+
prop_invalid_max_age_invalid_unit_body() ->
76+
ValidUnits = ["Y", "M", "D", "h", "m", "s"],
77+
BadUnitChar = ?SUCHTHAT(C, choose($A, $z), not lists:member([C], ValidUnits)),
78+
?FORALL({N, UnitChar}, {pos_integer(), BadUnitChar},
79+
begin
80+
MaxAge = list_to_binary(integer_to_list(N) ++ [UnitChar]),
81+
{error, invalid_max_age} =:= rabbit_amqqueue:check_max_age(MaxAge)
82+
end).

deps/rabbit/test/rabbit_stream_queue_SUITE.erl

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,13 @@ declare_max_age(Config) ->
344344
[{<<"x-queue-type">>, longstr, <<"stream">>},
345345
{<<"x-max-age">>, longstr, <<"1A">>}])),
346346

347+
%% "D" has no leading digits; must return a clean error, not crash
348+
?assertExit(
349+
{{shutdown, {server_initiated_close, 406, _}}, _},
350+
declare(Config, Server, Q,
351+
[{<<"x-queue-type">>, longstr, <<"stream">>},
352+
{<<"x-max-age">>, longstr, <<"D">>}])),
353+
347354
?assertEqual({'queue.declare_ok', Q, 0, 0},
348355
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
349356
{<<"x-max-age">>, longstr, <<"1Y">>}])),
@@ -401,11 +408,18 @@ declare_invalid_filter_size(Config) ->
401408
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
402409
Q = ?config(queue_name, Config),
403410

404-
ExpectedError = <<"PRECONDITION_FAILED - Invalid value for x-stream-filter-size-bytes">>,
411+
ExpectedError = <<"PRECONDITION_FAILED - Invalid value for x-stream-filter-size-bytes">>,
405412
?assertExit(
406413
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
407414
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
408-
{<<"x-stream-filter-size-bytes">>, long, 256}])).
415+
{<<"x-stream-filter-size-bytes">>, long, 256}])),
416+
417+
%% `x-stream-filter-size-bytes` is now in stream capabilities, so `check_non_neg_int_arg/2`
418+
%% runs during declare argument validation and rejects non-integer types
419+
?assertExit(
420+
{{shutdown, {server_initiated_close, 406, _}}, _},
421+
declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>},
422+
{<<"x-stream-filter-size-bytes">>, longstr, <<"128">>}])).
409423

410424
consume_invalid_arg(Config) ->
411425
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
Lines changed: 211 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,211 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
7+
-module(unit_stream_arg_validation_SUITE).
8+
9+
-compile(nowarn_export_all).
10+
-compile(export_all).
11+
12+
-include_lib("common_test/include/ct.hrl").
13+
-include_lib("eunit/include/eunit.hrl").
14+
-include_lib("amqp_client/include/amqp_client.hrl").
15+
16+
suite() ->
17+
[{timetrap, {minutes, 5}}].
18+
19+
all() ->
20+
[
21+
{group, unit},
22+
{group, integration}
23+
].
24+
25+
groups() ->
26+
[
27+
{unit, [parallel],
28+
[check_max_age_empty_string,
29+
check_max_age_no_leading_digits,
30+
check_max_age_zero_count,
31+
check_max_age_valid_units,
32+
check_max_age_invalid_unit]},
33+
{integration, [parallel],
34+
[filter_size_lower_bound,
35+
filter_size_upper_bound,
36+
filter_size_below_lower_bound,
37+
filter_size_zero,
38+
filter_size_float,
39+
filter_size_on_classic_queue]}
40+
].
41+
42+
%% -------------------------------------------------------------------
43+
%% Suite setup/teardown.
44+
%% -------------------------------------------------------------------
45+
46+
init_per_suite(Config) ->
47+
rabbit_ct_helpers:log_environment(),
48+
rabbit_ct_helpers:run_setup_steps(Config).
49+
50+
end_per_suite(Config) ->
51+
rabbit_ct_helpers:run_teardown_steps(Config).
52+
53+
init_per_group(unit, Config) ->
54+
Config;
55+
init_per_group(integration, Config) ->
56+
Config1 = rabbit_ct_helpers:set_config(Config,
57+
[{rmq_nodename_suffix, integration},
58+
{rmq_nodes_count, 1}]),
59+
rabbit_ct_helpers:run_steps(Config1,
60+
rabbit_ct_broker_helpers:setup_steps() ++
61+
rabbit_ct_client_helpers:setup_steps()).
62+
63+
end_per_group(unit, _Config) ->
64+
ok;
65+
end_per_group(integration, Config) ->
66+
rabbit_ct_helpers:run_steps(Config,
67+
rabbit_ct_client_helpers:teardown_steps() ++
68+
rabbit_ct_broker_helpers:teardown_steps()).
69+
70+
init_per_testcase(Testcase, Config) ->
71+
Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase),
72+
Q = rabbit_data_coercion:to_binary(Testcase),
73+
rabbit_ct_helpers:set_config(Config1, [{queue_name, Q}]).
74+
75+
end_per_testcase(Testcase, Config) ->
76+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
77+
78+
%% -------------------------------------------------------------------
79+
%% Unit tests: rabbit_amqqueue:check_max_age/1.
80+
%% Pure function; no broker required.
81+
%% -------------------------------------------------------------------
82+
83+
check_max_age_empty_string(_Config) ->
84+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<>>)).
85+
86+
check_max_age_no_leading_digits(_Config) ->
87+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"D">>)),
88+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"M">>)),
89+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"Year">>)),
90+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"-1D">>)).
91+
92+
check_max_age_zero_count(_Config) ->
93+
lists:foreach(
94+
fun(Unit) ->
95+
?assertEqual({error, invalid_max_age},
96+
rabbit_amqqueue:check_max_age(list_to_binary("0" ++ Unit)))
97+
end,
98+
["Y", "M", "D", "h", "m", "s"]).
99+
100+
check_max_age_valid_units(_Config) ->
101+
lists:foreach(
102+
fun(Unit) ->
103+
Result = rabbit_amqqueue:check_max_age(list_to_binary("10" ++ Unit)),
104+
?assert(is_integer(Result)),
105+
?assert(Result > 0)
106+
end,
107+
["Y", "M", "D", "h", "m", "s"]).
108+
109+
check_max_age_invalid_unit(_Config) ->
110+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"1A">>)),
111+
%% Lowercase y is not a valid unit; only uppercase Y is
112+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"1y">>)),
113+
%% Multiple trailing characters are not a valid unit
114+
?assertEqual({error, invalid_max_age}, rabbit_amqqueue:check_max_age(<<"1D2">>)).
115+
116+
%% -------------------------------------------------------------------
117+
%% Integration tests: x-stream-filter-size-bytes validation.
118+
%% These tests require a running broker.
119+
%% -------------------------------------------------------------------
120+
121+
filter_size_lower_bound(Config) ->
122+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
123+
Q = ?config(queue_name, Config),
124+
?assertEqual({'queue.declare_ok', Q, 0, 0},
125+
declare_stream(Config, Server, Q,
126+
[{<<"x-stream-filter-size-bytes">>, long, 16}])),
127+
delete_queue(Config, Q).
128+
129+
filter_size_upper_bound(Config) ->
130+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
131+
Q = ?config(queue_name, Config),
132+
?assertEqual({'queue.declare_ok', Q, 0, 0},
133+
declare_stream(Config, Server, Q,
134+
[{<<"x-stream-filter-size-bytes">>, long, 255}])),
135+
delete_queue(Config, Q).
136+
137+
filter_size_below_lower_bound(Config) ->
138+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
139+
Q = ?config(queue_name, Config),
140+
ExpectedError = <<"PRECONDITION_FAILED - Invalid value for x-stream-filter-size-bytes">>,
141+
?assertExit(
142+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
143+
declare_stream(Config, Server, Q,
144+
[{<<"x-stream-filter-size-bytes">>, long, 15}])).
145+
146+
filter_size_zero(Config) ->
147+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
148+
Q = ?config(queue_name, Config),
149+
%% 0 satisfies `check_non_neg_int_arg/2` but still falls below the valid range
150+
ExpectedError = <<"PRECONDITION_FAILED - Invalid value for x-stream-filter-size-bytes">>,
151+
?assertExit(
152+
{{shutdown, {server_initiated_close, 406, ExpectedError}}, _},
153+
declare_stream(Config, Server, Q,
154+
[{<<"x-stream-filter-size-bytes">>, long, 0}])).
155+
156+
filter_size_float(Config) ->
157+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
158+
Q = ?config(queue_name, Config),
159+
%% Before the fix, a float in [16.0, 255.0] was silently accepted because Erlang
160+
%% compares floats and integers numerically, bypassing the `x-stream-filter-size-bytes`
161+
%% range guard.
162+
?assertExit(
163+
{{shutdown, {server_initiated_close, 406, _}}, _},
164+
declare_stream(Config, Server, Q,
165+
[{<<"x-stream-filter-size-bytes">>, float, 32.0}])).
166+
167+
filter_size_on_classic_queue(Config) ->
168+
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
169+
Q = ?config(queue_name, Config),
170+
%% `x-stream-filter-size-bytes` is now in stream capabilities and therefore in the
171+
%% global queue_arguments union, so classic queues must reject it.
172+
?assertExit(
173+
{{shutdown, {server_initiated_close, 406, _}}, _},
174+
declare_classic(Config, Server, Q,
175+
[{<<"x-stream-filter-size-bytes">>, long, 32}])).
176+
177+
%% -------------------------------------------------------------------
178+
%% Helpers.
179+
%% -------------------------------------------------------------------
180+
181+
declare_stream(Config, Server, Q, ExtraArgs) ->
182+
call_declare(Config, Server,
183+
#'queue.declare'{queue = Q,
184+
durable = true,
185+
arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}
186+
| ExtraArgs]}).
187+
188+
declare_classic(Config, Server, Q, ExtraArgs) ->
189+
call_declare(Config, Server,
190+
#'queue.declare'{queue = Q,
191+
durable = true,
192+
arguments = [{<<"x-queue-type">>, longstr, <<"classic">>}
193+
| ExtraArgs]}).
194+
195+
call_declare(Config, Server, Cmd) ->
196+
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
197+
Reply = amqp_channel:call(Ch, Cmd),
198+
rabbit_ct_client_helpers:close_channel(Ch),
199+
Reply.
200+
201+
delete_queue(Config, Q) ->
202+
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, do_delete_queue, [Q]).
203+
204+
do_delete_queue(Name) ->
205+
QName = rabbit_misc:r(<<"/">>, queue, Name),
206+
case rabbit_amqqueue:lookup(QName) of
207+
{ok, Q} ->
208+
{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>);
209+
_ ->
210+
ok
211+
end.

0 commit comments

Comments
 (0)