Skip to content

Commit ccd343f

Browse files
Merge pull request #16302 from rabbitmq/mergify/bp/v4.2.x/pr-16301
Apply the per-node `connection_max` limit to AMQP 1.0 connections (backport #16300) (backport #16301)
2 parents e52771e + 9842be1 commit ccd343f

6 files changed

Lines changed: 333 additions & 3 deletions

File tree

deps/rabbit/Makefile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
272272
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
273273
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info queue_type rabbitmq_queues_cli_integration rabbitmq_streams_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
274274

275-
PARALLEL_CT_SET_5_A = consumer_recheck_prop pid_codec pid_codec_prop rabbit_direct_reply_to_prop prop_stream_arg_validation direct_reply_to_amqpl direct_reply_to_amqp classic_queue
275+
PARALLEL_CT_SET_5_A = amqp10_connection_max unit_amqp10_connection_max consumer_recheck_prop pid_codec pid_codec_prop rabbit_direct_reply_to_prop prop_stream_arg_validation direct_reply_to_amqpl direct_reply_to_amqp classic_queue
276276
PARALLEL_CT_SET_5_B = feature_flags_v2 backing_queue transactions
277277
PARALLEL_CT_SET_5_C = metadata_store_migration cluster_upgrade maintenance_mode
278278
PARALLEL_CT_SET_5_D = rabbit_fifo_dlx_integration publisher_confirms_parallel

deps/rabbit/include/rabbit_amqp_reader.hrl

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,10 @@
4343
-record(v1,
4444
{parent :: pid(),
4545
helper_sup :: pid(),
46+
%% A Ranch listener reference, used to enforce the per-node
47+
%% `connection_max` limit.
48+
%% Will be `undefined` for AMQP-over-WebSockets connections.
49+
ranch_ref = undefined :: undefined | ranch:ref(),
4650
writer = none :: none | pid(),
4751
heartbeater = none :: none | rabbit_heartbeat:heartbeaters(),
4852
session_sup = none :: none | pid(),

deps/rabbit/src/rabbit_amqp_reader.erl

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,18 +29,24 @@
2929
handle_other/2,
3030
ensure_stats_timer/1]).
3131

32+
-ifdef(TEST).
33+
-export([check_node_connection_limit/1]).
34+
-endif.
35+
3236
-import(rabbit_amqp_util, [protocol_error/3]).
3337

3438
-define(IS_RUNNING(State), State#v1.connection_state =:= running).
3539

3640
unpack_from_0_9_1(
3741
{Sock, PendingRecv, SupPid, Buf, BufLen, ProxySocket,
38-
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer},
42+
ConnectionName, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer,
43+
RanchRef},
3944
Parent) ->
4045
logger:update_process_metadata(#{connection => ConnectionName}),
4146
#v1{parent = Parent,
4247
websocket = false,
4348
sock = Sock,
49+
ranch_ref = RanchRef,
4450
callback = {frame_header, sasl},
4551
pending_recv = PendingRecv,
4652
helper_sup = SupPid,
@@ -438,13 +444,16 @@ handle_connection_frame(
438444
user = User = #user{username = Username},
439445
auth_mechanism = {Mechanism, _Mod}
440446
},
447+
ranch_ref = RanchRef,
441448
helper_sup = HelperSupPid,
442449
sock = Sock} = State0) ->
443450
Vhost = vhost(Hostname),
444451
logger:update_process_metadata(#{amqp_container => ContainerId,
445452
vhost => Vhost,
446453
user => Username}),
447454
ok = check_user_loopback(State0),
455+
%% Enforced before per-vhost and per-user limits for efficiency.
456+
ok = check_node_connection_limit(RanchRef),
448457
ok = check_vhost_exists(Vhost, State0),
449458
ok = check_vhost_alive(Vhost),
450459
ok = rabbit_access_control:check_vhost_access(User, Vhost, {socket, Sock}, #{}),
@@ -889,6 +898,27 @@ check_vhost_alive(Vhost) ->
889898
[Vhost])
890899
end.
891900

901+
-spec check_node_connection_limit(ranch:ref() | undefined) -> ok.
902+
check_node_connection_limit(undefined) ->
903+
%% AMQP-over-WebSockets connections won't have an associated Ranch listener
904+
ok;
905+
check_node_connection_limit(RanchRef) ->
906+
case application:get_env(rabbit, connection_max, infinity) of
907+
infinity ->
908+
ok;
909+
Limit when is_integer(Limit), Limit >= 0 ->
910+
#{active_connections := ActiveConns} = ranch:info(RanchRef),
911+
case ActiveConns > Limit of
912+
false ->
913+
ok;
914+
true ->
915+
protocol_error(
916+
?V_1_0_AMQP_ERROR_RESOURCE_LIMIT_EXCEEDED,
917+
"connection refused: node connection limit (~p) is reached",
918+
[Limit])
919+
end
920+
end.
921+
892922
check_vhost_connection_limit(Vhost, Username) ->
893923
case rabbit_vhost_limit:is_over_connection_limit(Vhost) of
894924
false ->

deps/rabbit/src/rabbit_reader.erl

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1730,6 +1730,7 @@ become_10(State) ->
17301730
State#v1{connection_state = {become, Fun}}.
17311731

17321732
pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
1733+
ranch_ref = RanchRef,
17331734
pending_recv = PendingRecv,
17341735
helper_sup = {_HelperSup091, HelperSup10},
17351736
proxy_socket = ProxySocket,
@@ -1742,7 +1743,8 @@ pack_for_1_0(Buf, BufLen, #v1{sock = Sock,
17421743
peer_port = PeerPort,
17431744
connected_at = ConnectedAt}}) ->
17441745
{Sock, PendingRecv, HelperSup10, Buf, BufLen, ProxySocket,
1745-
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer}.
1746+
Name, Host, PeerHost, Port, PeerPort, ConnectedAt, StatsTimer,
1747+
RanchRef}.
17461748

17471749
respond_and_close(State, Channel, Protocol, Reason, LogErr) ->
17481750
log_hard_error(State, Channel, LogErr),
Lines changed: 184 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,184 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term “Broadcom” refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
6+
%%
7+
8+
-module(amqp10_connection_max_SUITE).
9+
10+
-compile([export_all, nowarn_export_all]).
11+
12+
-include_lib("common_test/include/ct.hrl").
13+
-include_lib("eunit/include/eunit.hrl").
14+
-include_lib("amqp_client/include/amqp_client.hrl").
15+
-include_lib("amqp10_common/include/amqp10_framing.hrl").
16+
17+
-define(TIMEOUT, 30_000).
18+
19+
all() ->
20+
[{group, single_node}].
21+
22+
groups() ->
23+
[{single_node, [],
24+
[
25+
node_connection_limit_amqp10,
26+
node_limit_infinity_allows_amqp10,
27+
node_limit_room_after_close,
28+
node_connection_limit_shared_with_amqp091,
29+
amqp10_connection_is_tracked
30+
]}].
31+
32+
suite() ->
33+
[{timetrap, {minutes, 3}}].
34+
35+
init_per_suite(Config) ->
36+
{ok, _} = application:ensure_all_started(amqp10_client),
37+
rabbit_ct_helpers:log_environment(),
38+
rabbit_ct_helpers:run_setup_steps(Config).
39+
40+
end_per_suite(Config) ->
41+
rabbit_ct_helpers:run_teardown_steps(Config).
42+
43+
init_per_group(Group, Config0) ->
44+
Config1 = rabbit_ct_helpers:set_config(Config0,
45+
[{rmq_nodename_suffix, Group},
46+
{rmq_nodes_count, 1}]),
47+
rabbit_ct_helpers:run_steps(Config1,
48+
rabbit_ct_broker_helpers:setup_steps() ++
49+
rabbit_ct_client_helpers:setup_steps()).
50+
51+
end_per_group(_Group, Config) ->
52+
rabbit_ct_helpers:run_steps(Config,
53+
rabbit_ct_client_helpers:teardown_steps() ++
54+
rabbit_ct_broker_helpers:teardown_steps()).
55+
56+
init_per_testcase(Testcase, Config) ->
57+
set_node_limit(Config, infinity),
58+
rabbit_ct_helpers:testcase_started(Config, Testcase).
59+
60+
end_per_testcase(Testcase, Config) ->
61+
set_node_limit(Config, infinity),
62+
rabbit_ct_helpers:testcase_finished(Config, Testcase).
63+
64+
%% -------------------------------------------------------------------
65+
%% Test cases
66+
%% -------------------------------------------------------------------
67+
68+
node_connection_limit_amqp10(Config) ->
69+
set_node_limit(Config, 0),
70+
{refused, _} = open_amqp10(Config),
71+
72+
set_node_limit(Config, 5),
73+
Conns = [open_amqp10_ok(Config) || _ <- lists:seq(1, 5)],
74+
{refused, _} = open_amqp10(Config),
75+
[amqp_utils:close_connection_sync(C) || C <- Conns],
76+
ok.
77+
78+
node_limit_infinity_allows_amqp10(Config) ->
79+
set_node_limit(Config, infinity),
80+
Conns = [open_amqp10_ok(Config) || _ <- lists:seq(1, 10)],
81+
[amqp_utils:close_connection_sync(C) || C <- Conns],
82+
ok.
83+
84+
node_limit_room_after_close(Config) ->
85+
set_node_limit(Config, 2),
86+
C1 = open_amqp10_ok(Config),
87+
C2 = open_amqp10_ok(Config),
88+
{refused, _} = open_amqp10(Config),
89+
amqp_utils:close_connection_sync(C1),
90+
{ok, C3} = retry_open_amqp10(Config, 30),
91+
[amqp_utils:close_connection_sync(C) || C <- [C2, C3]],
92+
ok.
93+
94+
%% Historically only AMQP 0-9-1 honored `connection_max`,
95+
%% so this example uses both protocols.
96+
node_connection_limit_shared_with_amqp091(Config) ->
97+
set_node_limit(Config, 2),
98+
C091 = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
99+
true = is_pid(C091),
100+
C10 = open_amqp10_ok(Config),
101+
{refused, _} = open_amqp10(Config),
102+
{error, not_allowed} = rabbit_ct_client_helpers:open_unmanaged_connection(Config, 0),
103+
amqp_utils:close_connection_sync(C10),
104+
rabbit_ct_client_helpers:close_connection(C091),
105+
ok.
106+
107+
amqp10_connection_is_tracked(Config) ->
108+
ok = event_recorder:start(Config),
109+
try
110+
C = open_amqp10_ok(Config),
111+
ServerPid = wait_for_created_pid(Config),
112+
Local = rabbit_ct_broker_helpers:rpc(
113+
Config, 0, rabbit_networking, local_connections, []),
114+
?assert(lists:member(ServerPid, Local)),
115+
amqp_utils:close_connection_sync(C),
116+
wait_for_event_types(Config, [connection_closed])
117+
after
118+
ok = event_recorder:stop(Config)
119+
end,
120+
ok.
121+
122+
%% -------------------------------------------------------------------
123+
%% Helpers
124+
%% -------------------------------------------------------------------
125+
126+
set_node_limit(Config, Limit) ->
127+
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
128+
application, set_env,
129+
[rabbit, connection_max, Limit]).
130+
131+
open_amqp10_ok(Config) ->
132+
{ok, Conn} = open_amqp10(Config),
133+
Conn.
134+
135+
open_amqp10(Config) ->
136+
{ok, Conn} = amqp10_client:open_connection(amqp_utils:connection_config(Config)),
137+
receive
138+
{amqp10_event, {connection, Conn, opened}} ->
139+
{ok, Conn};
140+
{amqp10_event, {connection, Conn, {closed, Reason}}} ->
141+
{refused, Reason}
142+
after ?TIMEOUT ->
143+
ct:fail({open_amqp10_timeout, Conn})
144+
end.
145+
146+
%% Covers the brief window between client-side close and Ranch
147+
%% actually dropping the active TCP connection.
148+
retry_open_amqp10(_Config, 0) ->
149+
ct:fail(retry_open_amqp10_exhausted);
150+
retry_open_amqp10(Config, Attempts) ->
151+
case open_amqp10(Config) of
152+
{ok, _} = Result -> Result;
153+
{refused, _} ->
154+
timer:sleep(100),
155+
retry_open_amqp10(Config, Attempts - 1)
156+
end.
157+
158+
wait_for_created_pid(Config) ->
159+
wait_for_created_pid(Config, [], 30).
160+
161+
wait_for_created_pid(_Config, _Acc, 0) ->
162+
ct:fail(missing_connection_created_event);
163+
wait_for_created_pid(Config, Acc, N) ->
164+
Acc1 = Acc ++ event_recorder:get_events(Config),
165+
case [proplists:get_value(pid, Props)
166+
|| #event{type = connection_created, props = Props} <- Acc1] of
167+
[Pid | _] when is_pid(Pid) -> Pid;
168+
[] -> wait_for_created_pid(Config, Acc1, N - 1)
169+
end.
170+
171+
wait_for_event_types(Config, Wanted) ->
172+
wait_for_event_types(Config, Wanted, 30).
173+
174+
wait_for_event_types(_Config, [], _N) ->
175+
ok;
176+
wait_for_event_types(_Config, Wanted, 0) ->
177+
ct:fail({missing_event_types, Wanted});
178+
wait_for_event_types(Config, Wanted, N) ->
179+
Seen = [T || #event{type = T} <- event_recorder:get_events(Config)],
180+
Remaining = [T || T <- Wanted, not lists:member(T, Seen)],
181+
case Remaining of
182+
[] -> ok;
183+
_ -> wait_for_event_types(Config, Remaining, N - 1)
184+
end.

0 commit comments

Comments
 (0)