Skip to content

Commit 8fa1fd2

Browse files
authored
Merge pull request #15695 from rabbitmq/stream-sac-monitors-in-sac-coordinator
Handle stream SAC monitors in SAC coordinator
2 parents d725359 + 996c2c3 commit 8fa1fd2

6 files changed

Lines changed: 202 additions & 40 deletions

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
8989
-define(V2_OR_MORE(Vsn), (Vsn >= 2)).
9090
-define(V5_OR_MORE(Vsn), (Vsn >= 5)).
91+
-define(V7_OR_MORE(Vsn), (Vsn >= 7)). %% SAC monitors no longer in monitors map
9192
-define(SAC_V4, rabbit_stream_sac_coordinator_v4).
9293
-define(SAC_CURRENT, rabbit_stream_sac_coordinator).
9394

@@ -543,7 +544,7 @@ reachable_coord_members() ->
543544
Nodes = rabbit_nodes:list_reachable(),
544545
[{?MODULE, Node} || Node <- Nodes].
545546

546-
version() -> 6.
547+
version() -> 7.
547548

548549
%% Returns this module, whatever argument is passed in.
which_module(_Version) -> ?MODULE.
@@ -588,14 +589,21 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
588589
{reply, Reply} ->
589590
return(Meta, State0, Reply, [])
590591
end;
591-
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
592-
monitors = Monitors0} = State0) ->
592+
apply(#{machine_version := Vsn} = Meta, {sac, SacCommand},
593+
#?MODULE{single_active_consumer = SacState0,
594+
monitors = Monitors0} = State0) ->
593595
Mod = sac_module(Meta),
594596
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
595597
{SacState2, Monitors1, Effects1} =
596598
Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
599+
Monitors2 = case ?V7_OR_MORE(Vsn) of
600+
true ->
601+
Monitors0;
602+
false ->
603+
Monitors1
604+
end,
597605
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
598-
monitors = Monitors1}, Reply, Effects1);
606+
monitors = Monitors2}, Reply, Effects1);
599607
apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
600608
#?MODULE{streams = Streams0,
601609
monitors = Monitors0,
@@ -607,6 +615,18 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
607615
_ ->
608616
[]
609617
end,
618+
{SacState1, SacEffects} =
619+
case Vsn >= 7 of
620+
true ->
621+
%% all down PIDs are submitted to SAC
622+
%% it filters out if not interested
623+
sac_handle_connection_down(Meta, SacState0,
624+
Pid, Reason, Vsn);
625+
false ->
626+
{SacState0, []}
627+
end,
628+
Effects1 = Effects0 ++ SacEffects,
629+
State1 = State#?MODULE{single_active_consumer = SacState1},
610630
case maps:take(Pid, Monitors0) of
611631
{{StreamId, listener}, Monitors} when Vsn < 2 ->
612632
Listeners = case maps:take(StreamId, StateListeners0) of
@@ -620,8 +640,8 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
620640
Listeners1#{StreamId => Pids}
621641
end
622642
end,
623-
return(Meta, State#?MODULE{listeners = Listeners,
624-
monitors = Monitors}, ok, Effects0);
643+
return(Meta, State1#?MODULE{listeners = Listeners,
644+
monitors = Monitors}, ok, Effects1);
625645
{{PidStreams, listener}, Monitors} when ?V2_OR_MORE(Vsn) ->
626646
Streams = maps:fold(
627647
fun(StreamId, _, Acc) ->
@@ -638,31 +658,34 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
638658
Acc
639659
end
640660
end, Streams0, PidStreams),
641-
return(Meta, State#?MODULE{streams = Streams,
642-
monitors = Monitors}, ok, Effects0);
661+
return(Meta, State1#?MODULE{streams = Streams,
662+
monitors = Monitors}, ok, Effects1);
643663
{{StreamId, member}, Monitors1} ->
644664
case Streams0 of
645665
#{StreamId := Stream0} ->
646666
Stream1 = update_stream(Meta, Cmd, Stream0),
647-
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0),
667+
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects1),
648668
Streams = Streams0#{StreamId => Stream},
649-
return(Meta, State#?MODULE{streams = Streams,
650-
monitors = Monitors1}, ok,
669+
return(Meta, State1#?MODULE{streams = Streams,
670+
monitors = Monitors1}, ok,
651671
Effects);
652672
_ ->
653673
%% stream not found, can happen if "late" downs are
654674
%% received
655-
return(Meta, State#?MODULE{streams = Streams0,
656-
monitors = Monitors1}, ok, Effects0)
675+
return(Meta, State1#?MODULE{streams = Streams0,
676+
monitors = Monitors1}, ok, Effects1)
657677
end;
658-
{sac, Monitors1} ->
659-
{SacState1, SacEffects} = sac_handle_connection_down(Meta, SacState0,
660-
Pid, Reason, Vsn),
661-
return(Meta, State#?MODULE{single_active_consumer = SacState1,
662-
monitors = Monitors1},
663-
ok, [Effects0 ++ SacEffects]);
678+
{sac, Monitors1} when Vsn < 7 ->
679+
#?MODULE{single_active_consumer = SacSt0} = State1,
680+
{SacSt1, SacEfts} = sac_handle_connection_down(Meta, SacSt0,
681+
Pid, Reason, Vsn),
682+
return(Meta, State1#?MODULE{single_active_consumer = SacSt1,
683+
monitors = Monitors1},
684+
ok, Effects1 ++ SacEfts);
664685
error ->
665-
return(Meta, State, ok, Effects0)
686+
return(Meta, State1, ok, Effects1);
687+
_ ->
688+
return(Meta, State1, ok, Effects1)
666689
end;
667690
apply(#{machine_version := MachineVersion} = Meta,
668691
{register_listener, #{pid := Pid,
@@ -2304,6 +2327,9 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
23042327
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
23052328
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
23062329
{State#?MODULE{single_active_consumer = Sac1}, []};
2330+
machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) ->
2331+
Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0),
2332+
{State#?MODULE{monitors = Monitors}, []};
23072333
machine_version(From, To, State) ->
23082334
?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
23092335
[From, To]),

deps/rabbit/src/rabbit_stream_coordinator.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1
6464
#{stream_id() => ok}, %% v2
6565
monitor_role()} |
66-
sac},
66+
sac %% before v7
67+
},
6768
%% not used as of v2
6869
listeners = #{} :: undefined | #{stream_id() =>
6970
#{pid() := queue_ref()}},

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,8 @@ list_nodes(#?MODULE{groups = Groups}) ->
955955

956956
-spec state_enter(ra_server:ra_state(), state() | term()) ->
957957
ra_machine:effects().
958-
state_enter(leader, #?MODULE{groups = Groups} = State)
958+
state_enter(leader, #?MODULE{groups = Groups,
959+
pids_groups = PidsGroups} = State)
959960
when ?IS_STATE_REC(State) ->
960961
%% becoming leader, we re-issue monitors and timers for connections with
961962
%% disconnected consumers
@@ -978,8 +979,9 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
978979
end, Acc, Cs)
979980
end, {#{}, #{}}, Groups),
980981
DisTimeout = disconnected_timeout(State),
981-
%% monitor involved nodes
982+
%% monitor connections and involved nodes
982983
%% reset a timer for disconnected connections
984+
[{monitor, process, P} || P <- lists:sort(maps:keys(PidsGroups))] ++
983985
[{monitor, node, N} || N <- lists:sort(maps:keys(Nodes))] ++
984986
[begin
985987
Time = case ts() - Ts of

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ all_tests() ->
2727
listeners,
2828
machine_version_upgrade_to_2,
2929
machine_version_upgrade_to_3,
30+
machine_version_upgrade_to_7,
31+
sac_v7_down_handler_should_not_use_monitors_map,
32+
sac_v7_ensure_monitors_should_not_use_monitors_map,
33+
sac_pre_v7_down_handler_should_use_monitors_map,
34+
sac_pre_v7_ensure_monitors_should_use_monitors_map,
3035
new_stream,
3136
leader_down,
3237
leader_down_scenario_1,
@@ -60,9 +65,23 @@ init_per_group(_Group, Config) ->
6065
end_per_group(_Group, _Config) ->
6166
ok.
6267

68+
%% Per-testcase setup.
%% The four SAC delegation test cases listed below get a meck mock of
%% rabbit_stream_sac_coordinator (created with the `no_link' option);
%% every other test case needs no setup.
%% The configuration is always returned unchanged.
init_per_testcase(TestCase, Config) ->
    MockedCases = [sac_v7_down_handler_should_not_use_monitors_map,
                   sac_v7_ensure_monitors_should_not_use_monitors_map,
                   sac_pre_v7_down_handler_should_use_monitors_map,
                   sac_pre_v7_ensure_monitors_should_use_monitors_map],
    case lists:member(TestCase, MockedCases) of
        true ->
            %% crash the setup if the mock cannot be created
            ok = meck:new(rabbit_stream_sac_coordinator, [no_link]);
        false ->
            ok
    end,
    Config.
6577

78+
%% Per-testcase teardown.
%% Unloads the rabbit_stream_sac_coordinator mock for the four SAC
%% delegation test cases listed below; nothing to clean up for the others.
%% Always returns ok.
end_per_testcase(TestCase, _Config) ->
    MockedCases = [sac_v7_down_handler_should_not_use_monitors_map,
                   sac_v7_ensure_monitors_should_not_use_monitors_map,
                   sac_pre_v7_down_handler_should_use_monitors_map,
                   sac_pre_v7_ensure_monitors_should_use_monitors_map],
    case lists:member(TestCase, MockedCases) of
        true ->
            %% the result of unload is deliberately not matched
            meck:unload(rabbit_stream_sac_coordinator),
            ok;
        false ->
            ok
    end.
6887

@@ -253,6 +272,123 @@ machine_version_to_3(From) ->
253272
?assertEqual(Effects, []),
254273
ok.
255274

275+
%% Applies the {machine_version, 6, 7} command to a state whose monitors
%% map holds two `sac' entries and one stream member entry, then checks
%% that only the member entry remains and that no effects are returned.
machine_version_upgrade_to_7(_) ->
    [SacPid1, MemberPid, SacPid2] =
        [spawn(fun() -> ok end) || _ <- lists:seq(1, 3)],
    MemberMonitor = {<<"stream">>, member},
    Before = #?STATE{monitors = #{SacPid1 => sac,
                                  MemberPid => MemberMonitor,
                                  SacPid2 => sac}},
    Upgrade = {machine_version, 6, 7},

    {After, ok, Effects} = apply_cmd(#{index => 42}, Upgrade, Before),

    %% only the non-SAC monitor is expected to remain
    ?assertEqual(#{MemberPid => MemberMonitor}, After#?STATE.monitors),
    ?assertEqual([], Effects),
    ok.
290+
291+
%% Machine version 7: a `down' command for a pid that has no entry in the
%% monitors map. Checks that the mocked SAC coordinator's
%% handle_connection_down is called for that pid, that the SAC state it
%% returns ends up in the coordinator state, and that the monitors map is
%% unchanged.
sac_v7_down_handler_should_not_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    ConnPid = spawn(fun() -> ok end),
    SacBefore = fake_sac_state,
    SacAfter = updated_sac_state,
    %% the mock only accepts the expected pid, reason and SAC state
    OnDown = fun(_Meta, P, normal, S) when P =:= ConnPid,
                                           S =:= SacBefore ->
                     {SacAfter, []}
             end,
    meck:expect(SacMod, handle_connection_down, OnDown),

    MemberPid = spawn(fun() -> ok end),
    Monitors = #{MemberPid => {<<"other">>, member}},
    Before = #?STATE{single_active_consumer = SacBefore,
                     monitors = Monitors},

    Meta = meta(#{index => 42, machine_version => 7}),
    {After, ok, _Effects} = apply_cmd(Meta, {down, ConnPid, normal}, Before),

    ?assert(meck:called(SacMod, handle_connection_down,
                        ['_', ConnPid, normal, SacBefore])),
    ?assertEqual(SacAfter, After#?STATE.single_active_consumer),
    ?assertEqual(Monitors, After#?STATE.monitors),
    ok.
314+
315+
%% Machine version 7: a `sac' command whose mocked ensure_monitors adds a
%% `sac' entry to the monitors map it is given. Checks that the entry does
%% not end up in the coordinator state (the map stays empty), while the SAC
%% state returned by the mock is stored.
sac_v7_ensure_monitors_should_not_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    %% bound here so the mock closure captures the test process pid
    ConnPid = self(),
    Command = fake_sac_cmd,
    SacBefore = fake_sac_state,
    SacAfter = updated_sac_state,
    OnApply = fun(C, S) when C =:= Command,
                             S =:= SacBefore ->
                      {SacAfter, {ok, true}, []}
              end,
    OnEnsureMonitors = fun(C, S, Mons, Effs) when C =:= Command,
                                                  S =:= SacAfter ->
                               {S, Mons#{ConnPid => sac}, Effs}
                       end,
    meck:expect(SacMod, apply, OnApply),
    meck:expect(SacMod, ensure_monitors, OnEnsureMonitors),

    Before = #?STATE{single_active_consumer = SacBefore,
                     monitors = #{}},

    Meta = meta(#{index => 42, machine_version => 7}),
    {After, {ok, true}, _Effects} = apply_cmd(Meta, {sac, Command}, Before),

    ?assertEqual(#{}, After#?STATE.monitors),
    ?assertEqual(SacAfter, After#?STATE.single_active_consumer),
    ok.
340+
341+
%% Machine version 6: a `down' command for a pid registered as `sac' in the
%% monitors map. Checks that the mocked SAC coordinator's
%% handle_connection_down is called, that the SAC state it returns is
%% stored, and that the pid's entry is removed from the monitors map while
%% the unrelated member entry is kept.
sac_pre_v7_down_handler_should_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    ConnPid = spawn(fun() -> ok end),
    SacBefore = fake_sac_state,
    SacAfter = updated_sac_state,
    %% the mock only accepts the expected pid, reason and SAC state
    OnDown = fun(_Meta, P, normal, S) when P =:= ConnPid,
                                           S =:= SacBefore ->
                     {SacAfter, []}
             end,
    meck:expect(SacMod, handle_connection_down, OnDown),

    MemberPid = spawn(fun() -> ok end),
    MemberMonitors = #{MemberPid => {<<"other">>, member}},
    Before = #?STATE{single_active_consumer = SacBefore,
                     monitors = MemberMonitors#{ConnPid => sac}},

    Meta = meta(#{index => 42, machine_version => 6}),
    {After, ok, _Effects} = apply_cmd(Meta, {down, ConnPid, normal}, Before),

    ?assert(meck:called(SacMod, handle_connection_down,
                        ['_', ConnPid, normal, SacBefore])),
    ?assertEqual(SacAfter, After#?STATE.single_active_consumer),
    ?assertEqual(MemberMonitors, After#?STATE.monitors),
    ok.
365+
366+
%% Machine version 6: a `sac' command whose mocked ensure_monitors adds a
%% `sac' entry to the monitors map it is given. Checks that the entry is
%% kept in the coordinator state and that the SAC state returned by the
%% mock is stored.
sac_pre_v7_ensure_monitors_should_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    %% bound here so the mock closure captures the test process pid
    ConnPid = self(),
    Command = fake_sac_cmd,
    SacBefore = fake_sac_state,
    SacAfter = updated_sac_state,
    OnApply = fun(C, S) when C =:= Command,
                             S =:= SacBefore ->
                      {SacAfter, {ok, true}, []}
              end,
    OnEnsureMonitors = fun(C, S, Mons, Effs) when C =:= Command,
                                                  S =:= SacAfter ->
                               {S, Mons#{ConnPid => sac}, Effs}
                       end,
    meck:expect(SacMod, apply, OnApply),
    meck:expect(SacMod, ensure_monitors, OnEnsureMonitors),

    Before = #?STATE{single_active_consumer = SacBefore,
                     monitors = #{}},

    Meta = meta(#{index => 42, machine_version => 6}),
    {After, {ok, true}, _Effects} = apply_cmd(Meta, {sac, Command}, Before),

    ?assertEqual(#{ConnPid => sac}, After#?STATE.monitors),
    ?assertEqual(SacAfter, After#?STATE.single_active_consumer),
    ok.
391+
256392
new_stream(_) ->
257393
[N1, N2, N3] = Nodes = [r@n1, r@n2, r@n3],
258394
StreamId = atom_to_list(?FUNCTION_NAME),

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,6 @@ init_per_group(_Group, Config) ->
5656
end_per_group(_Group, _Config) ->
5757
ok.
5858

59-
init_per_testcase(_TestCase, Config) ->
60-
ok = meck:new(rabbit_feature_flags),
61-
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
62-
Config.
63-
64-
end_per_testcase(_TestCase, _Config) ->
65-
meck:unload(),
66-
ok.
67-
6859
check_conf_test(_) ->
6960
K = disconnected_timeout,
7061
Def = 60_000,
@@ -1870,21 +1861,21 @@ state_enter_test(_) ->
18701861

18711862
assertEmpty(?MOD:state_enter(follower, #{})),
18721863

1873-
?assertEqual(mon_node_eff([N0, N1, N2]),
1864+
?assertEqual(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]),
18741865
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
18751866
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
18761867
Id2 => grp([csr(P2), csr(P2), csr(P2)])})),
18771868

1878-
?assertEqual(mon_node_eff([N0, N1]),
1869+
?assertEqual(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]),
18791870
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
18801871
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
18811872
Id2 => grp([csr(P0), csr(P1), csr(P1)])})),
18821873

1883-
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ [timer_eff(P1)]),
1874+
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]) ++ [timer_eff(P1)]),
18841875
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
18851876
Id2 => grp([csr(P0)])})),
18861877

1887-
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ timer_eff([P1, P2])),
1878+
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]) ++ timer_eff([P1, P2])),
18881879
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
18891880
Id1 => grp([csr(P0), csr(P2, {disconnected, waiting})]),
18901881
Id2 => grp([csr(P0), csr(P1, {disconnected, waiting})])})),
@@ -1898,6 +1889,12 @@ mon_node_eff(Nodes) when is_list(Nodes) ->
18981889
mon_node_eff(N) ->
18991890
{monitor, node, N}.
19001891

1892+
%% Builds the expected process-monitor effect(s).
%% Given a list, returns the sorted list of effects for its elements;
%% given any other term, returns the single {monitor, process, Term} effect.
mon_proc_eff(Pids) when is_list(Pids) ->
    Effects = lists:map(fun mon_proc_eff/1, Pids),
    lists:sort(Effects);
mon_proc_eff(Pid) ->
    {monitor, process, Pid}.
1896+
1897+
19011898
timer_eff(Pids) when is_list(Pids) ->
19021899
lists:sort([timer_eff(Pid) || Pid <- Pids]);
19031900
timer_eff(Pid) ->

deps/rabbitmq_stream/docs/stream_coordinator.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,16 @@ sequenceDiagram
5555
```mermaid
5656
flowchart TB
5757
A(monitor) --noconnection--> B(status = disconnected, set up timer)
58-
B -. timeout .-> C(status = forgotten)
58+
B -. timeout .-> C(status = presumed_down)
5959
B -. nodeup .-> D(reissue monitors, send msg to connections)
6060
D -. down .-> E(handle connection down)
6161
D -. connection response .-> F(evaluate impacted groups)
6262
```
6363

64-
* composite status for consumers: `{connected, active}`, `{disconnected,active}`, etc.
64+
* composite status for consumers: `{connected, active}`, `{disconnected, active}`, `{presumed_down, active}`, etc.
6565
* `disconnected` status can prevent rebalancing in a group, e.g. `{disconnected, active}` (it is impossible to tell the active consumer to step down)
66-
* consumers in `forgotten` status are ignored during rebalancing
67-
* it may be necessary to reconcile a group if a `{forgotten, active}` consumer comes back in a group ("evaluate impacted groups" box above).
66+
* consumers in `presumed_down` status are ignored during rebalancing (they are not eligible)
67+
* it may be necessary to reconcile a group if a `{presumed_down, active}` consumer comes back into the group ("evaluate impacted groups" box above).
6868
This is unlikely though.
6969

7070
### Stale Node Detection

0 commit comments

Comments
 (0)