Skip to content

Commit 817a4d4

Browse files
authored
Quorum Queue notifies AMQP client of Single Active Consumer state change (#15736)
* Notify AMQP 1.0 clients of Single Active Consumer status changes Solves #14726 Quorum queues support the Single Active Consumer (SAC) feature, but until now clients had no reliable way to learn whether they are the active consumer or a waiting one. This commit adds consumer activity notifications so that AMQP 1.0 consumers learn about their active/inactive status after attaching and whenever it changes. The notification is pushed via the `properties` field of the AMQP 1.0 flow frame, using a key `rabbitmq:active` with a boolean value. This property is pushed to the client only upon SAC activity state change, i.e. not in every flow frame to keep network traffic low. On the server side, the existing `credit_reply` message sent from the queue process to the session process is extended from a 6-tuple to a 7-tuple, where the 7th element is a map of link state properties (e.g. `#{active => true}`). This reuses the existing communication mechanism rather than introducing a new Ra event, and maps naturally to the flow frame's `properties` field. It also allows future extensibility for other queue types to convey additional link state information. `rabbit_quorum_queue:consume/3` opts in by setting `link_state_properties => true` in the consumer metadata. `rabbit_fifo` uses this flag to decide whether to emit the 7-tuple (with properties) or the old 6-tuple (without). This is how mixed-version cluster compatibility is maintained: an old queue process sends the old 6-tuple which `rabbit_amqp_session` converts to a 7-tuple with an empty map, and a new queue process only sends the 7-tuple when the consumer opted in. On the client side, amqp10_client emits a generic `{amqp10_event, {link, LinkRef, {state_properties, Map}}}` event whenever the flow frame's properties field changes, without baking in any RabbitMQ-specific knowledge. * Send link state properties also for non-SAC QQ
1 parent 8c1041a commit 817a4d4

15 files changed

Lines changed: 877 additions & 198 deletions

deps/amqp10_client/src/amqp10_client.erl

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -230,7 +230,7 @@ attach_sender_link(Session, Name, Target, SettleMode, Durability) ->
230230
durable => Durability}},
231231
snd_settle_mode => SettleMode,
232232
rcv_settle_mode => first},
233-
amqp10_client_session:attach(Session, AttachArgs).
233+
attach_link(Session, AttachArgs).
234234

235235
%% @doc Attaches a receiver link to a source.
236236
%% This is asynchronous and will notify completion of the attach request to the
@@ -307,7 +307,7 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Prop
307307
filter => Filter,
308308
properties => Properties,
309309
raw_mode => RawMode},
310-
amqp10_client_session:attach(Session, AttachArgs).
310+
attach_link(Session, AttachArgs).
311311

312312
-spec attach_link(pid(), attach_args()) -> {ok, link_ref()}.
313313
attach_link(Session, AttachArgs) ->

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -101,7 +101,8 @@
101101
max_message_size => max_message_size(),
102102
handle => output_handle(),
103103
footer_opt => footer_opt(),
104-
raw_mode => boolean()
104+
raw_mode => boolean(),
105+
notify_when_state_properties_changed => boolean()
105106
}.
106107

107108
-type transfer_error() :: {error,
@@ -144,7 +145,9 @@
144145
Credit :: pos_integer()},
145146
incoming_unsettled = #{} :: #{delivery_number() => ok},
146147
footer_opt :: footer_opt() | undefined,
147-
raw_mode = false :: boolean()
148+
raw_mode = false :: boolean(),
149+
%% link state properties from the flow frame
150+
state_properties :: ignore | #{binary() => term()}
148151
}).
149152

150153
-record(state,
@@ -414,9 +417,9 @@ mapped(cast, #'v1_0.flow'{handle = {uint, InHandle}} = Flow,
414417
{ok, #link{output_handle = OutHandle} = Link0} =
415418
find_link_by_input_handle(InHandle, State),
416419

417-
% TODO: handle `send_flow` return tag
418-
{ok, Link} = handle_link_flow(Flow, Link0),
419-
ok = maybe_notify_link_credit(Link0, Link),
420+
{ok, Link1} = handle_link_flow(Flow, Link0),
421+
ok = maybe_notify_link_credit(Link0, Link1),
422+
Link = maybe_notify_link_state_properties(Flow, Link1),
420423
Links1 = Links#{OutHandle := Link},
421424
State1 = State#state{links = Links1},
422425
{keep_state, State1};
@@ -976,6 +979,10 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
976979
ok = Send(Attach, State),
977980

978981
Ref = make_link_ref(Role, self(), OutHandle),
982+
StateProps = case maps:get(notify_when_state_properties_changed, Args, false) of
983+
true -> maps:new();
984+
_ -> ignore
985+
end,
979986
Link = #link{name = Name,
980987
ref = Ref,
981988
output_handle = OutHandle,
@@ -987,8 +994,8 @@ send_attach(Send, #{name := Name, role := RoleTuple} = Args, {FromPid, _},
987994
delivery_count = unpack(InitialDeliveryCount),
988995
max_message_size = unpack(MaxMessageSize),
989996
footer_opt = maps:get(footer_opt, Args, undefined),
990-
raw_mode = maps:get(raw_mode, Args, false)},
991-
997+
raw_mode = maps:get(raw_mode, Args, false),
998+
state_properties = StateProps},
992999
{State#state{links = Links#{OutHandle => Link},
9931000
next_link_handle = NextLinkHandle,
9941001
link_index = LinkIndex#{{Role, Name} => OutHandle}}, Ref}.
@@ -1031,11 +1038,17 @@ handle_link_flow(#'v1_0.flow'{delivery_count = TheirDC,
10311038
link_credit = {uint, TheirCredit},
10321039
available = Available,
10331040
drain = Drain0},
1034-
Link0 = #link{role = receiver}) ->
1041+
#link{role = receiver,
1042+
link_credit = OurCredit} = Link0) ->
10351043
Drain = default(Drain0, false),
10361044
Link = case Drain andalso TheirCredit =:= 0 of
10371045
true ->
1038-
notify_credit_exhausted(Link0),
1046+
case OurCredit > 0 of
1047+
true ->
1048+
notify_credit_exhausted(Link0);
1049+
false ->
1050+
ok
1051+
end,
10391052
Link0#link{delivery_count = unpack(TheirDC),
10401053
link_credit = 0,
10411054
available = unpack(Available),
@@ -1056,9 +1069,11 @@ find_link_by_input_handle(InHandle, #state{link_handle_index = LHI,
10561069
case Links of
10571070
#{OutHandle := Link} ->
10581071
{ok, Link};
1059-
_ -> not_found
1072+
_ ->
1073+
not_found
10601074
end;
1061-
_ -> not_found
1075+
_ ->
1076+
not_found
10621077
end.
10631078

10641079
with_link(InHandle, State, Fun) ->
@@ -1120,6 +1135,19 @@ maybe_notify_link_credit(#link{role = sender,
11201135
maybe_notify_link_credit(_Old, _New) ->
11211136
ok.
11221137

1138+
maybe_notify_link_state_properties(#'v1_0.flow'{properties = {map, KVList}},
1139+
#link{state_properties = #{} = OldProps} = Link)
1140+
when is_list(KVList) ->
1141+
case #{Key => unpack(Val) || {{symbol, Key}, Val} <- KVList} of
1142+
OldProps ->
1143+
Link;
1144+
NewProps ->
1145+
notify_link(Link, {state_properties, NewProps}),
1146+
Link#link{state_properties = NewProps}
1147+
end;
1148+
maybe_notify_link_state_properties(#'v1_0.flow'{}, Link) ->
1149+
Link.
1150+
11231151
notify_link_attached(Link, Perf, #state{connection_config = Cfg}) ->
11241152
What = case Cfg of
11251153
#{notify_with_performative := true} ->
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
%% This Source Code Form is subject to the terms of the Mozilla Public
2+
%% License, v. 2.0. If a copy of the MPL was not distributed with this
3+
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
4+
%%
5+
%% Copyright (c) 2007-2026 Broadcom. All Rights Reserved. The term "Broadcom"
6+
%% refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
7+
8+
-include_lib("amqp10_common/include/amqp10_types.hrl").
9+
10+
%% see AMQP 1.0 §2.6.7
11+
-type delivery_count() :: sequence_no().
12+
-type credit() :: uint().
13+
14+
-type link_state_properties() :: #{atom() => term()}.
15+
16+
-record(credit_reply, {ctag :: rabbit_types:ctag(),
17+
delivery_count :: delivery_count(),
18+
credit :: credit(),
19+
available :: non_neg_integer(),
20+
drain :: boolean(),
21+
properties :: link_state_properties()}).

0 commit comments

Comments
 (0)