Skip to content

Commit aaef3ea

Browse files
committed
Handle stream SAC monitors in SAC coordinator
The stream SAC coordinator already keeps track of its monitors but delegates some of the work to the stream coordinator (in its monitors map). This commit changes this to let the SAC coordinator handle its monitors itself. This decouples the 2 coordinators. This also fixes a bug whereby the SAC coordinator would correctly re-issue monitors but would not modify the monitors map accordingly. The SAC coordinator would not then be notified of the corresponding down messages. References rabbitmq/rabbitmq-stream-dotnet-client#447
1 parent d725359 commit aaef3ea

6 files changed

Lines changed: 202 additions & 40 deletions

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 46 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
8989
-define(V2_OR_MORE(Vsn), (Vsn >= 2)).
9090
-define(V5_OR_MORE(Vsn), (Vsn >= 5)).
91+
-define(V7_OR_MORE(Vsn), (Vsn >= 7)). %% SAC monitors no longer in monitors map
9192
-define(SAC_V4, rabbit_stream_sac_coordinator_v4).
9293
-define(SAC_CURRENT, rabbit_stream_sac_coordinator).
9394

@@ -543,7 +544,7 @@ reachable_coord_members() ->
543544
Nodes = rabbit_nodes:list_reachable(),
544545
[{?MODULE, Node} || Node <- Nodes].
545546

546-
version() -> 6.
547+
version() -> 7.
547548

548549
which_module(_) ->
549550
?MODULE.
@@ -588,14 +589,21 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
588589
{reply, Reply} ->
589590
return(Meta, State0, Reply, [])
590591
end;
591-
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
592-
monitors = Monitors0} = State0) ->
592+
apply(#{machine_version := Vsn} = Meta, {sac, SacCommand},
593+
#?MODULE{single_active_consumer = SacState0,
594+
monitors = Monitors0} = State0) ->
593595
Mod = sac_module(Meta),
594596
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
595597
{SacState2, Monitors1, Effects1} =
596598
Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
599+
Monitors2 = case ?V7_OR_MORE(Vsn) of
600+
true ->
601+
Monitors0;
602+
false ->
603+
Monitors1
604+
end,
597605
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
598-
monitors = Monitors1}, Reply, Effects1);
606+
monitors = Monitors2}, Reply, Effects1);
599607
apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
600608
#?MODULE{streams = Streams0,
601609
monitors = Monitors0,
@@ -607,6 +615,18 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
607615
_ ->
608616
[]
609617
end,
618+
{SacState1, SacEffects} =
619+
case Vsn >= 7 of
620+
true ->
621+
%% all down PIDs are submitted to SAC
622+
%% it filters out if not interested
623+
sac_handle_connection_down(Meta, SacState0,
624+
Pid, Reason, Vsn);
625+
false ->
626+
{SacState0, []}
627+
end,
628+
Effects1 = Effects0 ++ SacEffects,
629+
State1 = State#?MODULE{single_active_consumer = SacState1},
610630
case maps:take(Pid, Monitors0) of
611631
{{StreamId, listener}, Monitors} when Vsn < 2 ->
612632
Listeners = case maps:take(StreamId, StateListeners0) of
@@ -620,8 +640,8 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
620640
Listeners1#{StreamId => Pids}
621641
end
622642
end,
623-
return(Meta, State#?MODULE{listeners = Listeners,
624-
monitors = Monitors}, ok, Effects0);
643+
return(Meta, State1#?MODULE{listeners = Listeners,
644+
monitors = Monitors}, ok, Effects1);
625645
{{PidStreams, listener}, Monitors} when ?V2_OR_MORE(Vsn) ->
626646
Streams = maps:fold(
627647
fun(StreamId, _, Acc) ->
@@ -638,31 +658,34 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
638658
Acc
639659
end
640660
end, Streams0, PidStreams),
641-
return(Meta, State#?MODULE{streams = Streams,
642-
monitors = Monitors}, ok, Effects0);
661+
return(Meta, State1#?MODULE{streams = Streams,
662+
monitors = Monitors}, ok, Effects1);
643663
{{StreamId, member}, Monitors1} ->
644664
case Streams0 of
645665
#{StreamId := Stream0} ->
646666
Stream1 = update_stream(Meta, Cmd, Stream0),
647-
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0),
667+
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects1),
648668
Streams = Streams0#{StreamId => Stream},
649-
return(Meta, State#?MODULE{streams = Streams,
650-
monitors = Monitors1}, ok,
669+
return(Meta, State1#?MODULE{streams = Streams,
670+
monitors = Monitors1}, ok,
651671
Effects);
652672
_ ->
653673
%% stream not found, can happen if "late" downs are
654674
%% received
655-
return(Meta, State#?MODULE{streams = Streams0,
656-
monitors = Monitors1}, ok, Effects0)
675+
return(Meta, State1#?MODULE{streams = Streams0,
676+
monitors = Monitors1}, ok, Effects1)
657677
end;
658-
{sac, Monitors1} ->
659-
{SacState1, SacEffects} = sac_handle_connection_down(Meta, SacState0,
660-
Pid, Reason, Vsn),
661-
return(Meta, State#?MODULE{single_active_consumer = SacState1,
662-
monitors = Monitors1},
663-
ok, [Effects0 ++ SacEffects]);
678+
{sac, Monitors1} when Vsn < 7 ->
679+
#?MODULE{single_active_consumer = SacSt0} = State1,
680+
{SacSt1, SacEfts} = sac_handle_connection_down(Meta, SacSt0,
681+
Pid, Reason, Vsn),
682+
return(Meta, State1#?MODULE{single_active_consumer = SacSt1,
683+
monitors = Monitors1},
684+
ok, Effects1 ++ SacEfts);
664685
error ->
665-
return(Meta, State, ok, Effects0)
686+
return(Meta, State1, ok, Effects1);
687+
_ ->
688+
return(Meta, State1, ok, Effects1)
666689
end;
667690
apply(#{machine_version := MachineVersion} = Meta,
668691
{register_listener, #{pid := Pid,
@@ -2304,6 +2327,9 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
23042327
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
23052328
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
23062329
{State#?MODULE{single_active_consumer = Sac1}, []};
2330+
machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) ->
2331+
Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0),
2332+
{State#?MODULE{monitors = Monitors}, []};
23072333
machine_version(From, To, State) ->
23082334
?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
23092335
[From, To]),

deps/rabbit/src/rabbit_stream_coordinator.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1
6464
#{stream_id() => ok}, %% v2
6565
monitor_role()} |
66-
sac},
66+
sac %% before v7
67+
},
6768
%% not used as of v2
6869
listeners = #{} :: undefined | #{stream_id() =>
6970
#{pid() := queue_ref()}},

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,8 @@ list_nodes(#?MODULE{groups = Groups}) ->
955955

956956
-spec state_enter(ra_server:ra_state(), state() | term()) ->
957957
ra_machine:effects().
958-
state_enter(leader, #?MODULE{groups = Groups} = State)
958+
state_enter(leader, #?MODULE{groups = Groups,
959+
pids_groups = PidsGroups} = State)
959960
when ?IS_STATE_REC(State) ->
960961
%% becoming leader, we re-issue monitors and timers for connections with
961962
%% disconnected consumers
@@ -978,8 +979,9 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
978979
end, Acc, Cs)
979980
end, {#{}, #{}}, Groups),
980981
DisTimeout = disconnected_timeout(State),
981-
%% monitor involved nodes
982+
%% monitor connections and involved nodes
982983
%% reset a timer for disconnected connections
984+
[{monitor, process, P} || P <- lists:sort(maps:keys(PidsGroups))] ++
983985
[{monitor, node, N} || N <- lists:sort(maps:keys(Nodes))] ++
984986
[begin
985987
Time = case ts() - Ts of

deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl

Lines changed: 136 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,11 @@ all_tests() ->
2727
listeners,
2828
machine_version_upgrade_to_2,
2929
machine_version_upgrade_to_3,
30+
machine_version_upgrade_to_7,
31+
sac_v7_down_handler_should_not_use_monitors_map,
32+
sac_v7_ensure_monitors_should_not_use_monitors_map,
33+
sac_pre_v7_down_handler_should_use_monitors_map,
34+
sac_pre_v7_ensure_monitors_should_use_monitors_map,
3035
new_stream,
3136
leader_down,
3237
leader_down_scenario_1,
@@ -60,9 +65,23 @@ init_per_group(_Group, Config) ->
6065
end_per_group(_Group, _Config) ->
6166
ok.
6267

68+
init_per_testcase(TestCase, Config)
69+
when TestCase =:= sac_v7_down_handler_should_not_use_monitors_map;
70+
TestCase =:= sac_v7_ensure_monitors_should_not_use_monitors_map;
71+
TestCase =:= sac_pre_v7_down_handler_should_use_monitors_map;
72+
TestCase =:= sac_pre_v7_ensure_monitors_should_use_monitors_map ->
73+
ok = meck:new(rabbit_stream_sac_coordinator, [no_link]),
74+
Config;
6375
init_per_testcase(_TestCase, Config) ->
6476
Config.
6577

78+
end_per_testcase(TestCase, _Config)
79+
when TestCase =:= sac_v7_down_handler_should_not_use_monitors_map;
80+
TestCase =:= sac_v7_ensure_monitors_should_not_use_monitors_map;
81+
TestCase =:= sac_pre_v7_down_handler_should_use_monitors_map;
82+
TestCase =:= sac_pre_v7_ensure_monitors_should_use_monitors_map ->
83+
meck:unload(rabbit_stream_sac_coordinator),
84+
ok;
6685
end_per_testcase(_TestCase, _Config) ->
6786
ok.
6887

@@ -253,6 +272,123 @@ machine_version_to_3(From) ->
253272
?assertEqual(Effects, []),
254273
ok.
255274

275+
machine_version_upgrade_to_7(_) ->
276+
Pid1 = spawn(fun() -> ok end),
277+
Pid2 = spawn(fun() -> ok end),
278+
Pid3 = spawn(fun() -> ok end),
279+
S = <<"stream">>,
280+
Monitors0 = #{Pid1 => sac,
281+
Pid2 => {S, member},
282+
Pid3 => sac},
283+
State0 = #?STATE{monitors = Monitors0},
284+
285+
{State1, ok, Effects} = apply_cmd(#{index => 42}, {machine_version, 6, 7}, State0),
286+
287+
?assertEqual(#{Pid2 => {S, member}}, State1#?STATE.monitors),
288+
?assertEqual([], Effects),
289+
ok.
290+
291+
sac_v7_down_handler_should_not_use_monitors_map(_) ->
292+
ConnectionPid = spawn(fun() -> ok end),
293+
SacState0 = fake_sac_state,
294+
SacState1 = updated_sac_state,
295+
meck:expect(rabbit_stream_sac_coordinator, handle_connection_down,
296+
fun(_Meta, Pid, normal, State) when Pid =:= ConnectionPid,
297+
State =:= SacState0 ->
298+
{SacState1, []}
299+
end),
300+
301+
OtherPid = spawn(fun() -> ok end),
302+
Monitors0 = #{OtherPid => {<<"other">>, member}},
303+
State0 = #?STATE{single_active_consumer = SacState0,
304+
monitors = Monitors0},
305+
306+
{State1, ok, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 7}),
307+
{down, ConnectionPid, normal}, State0),
308+
309+
?assert(meck:called(rabbit_stream_sac_coordinator, handle_connection_down,
310+
['_', ConnectionPid, normal, SacState0])),
311+
?assertEqual(SacState1, State1#?STATE.single_active_consumer),
312+
?assertEqual(Monitors0, State1#?STATE.monitors),
313+
ok.
314+
315+
sac_v7_ensure_monitors_should_not_use_monitors_map(_) ->
316+
ConnectionPid = self(),
317+
SacCmd = fake_sac_cmd,
318+
SacState0 = fake_sac_state,
319+
SacState1 = updated_sac_state,
320+
meck:expect(rabbit_stream_sac_coordinator, apply,
321+
fun(Cmd, State) when Cmd =:= SacCmd,
322+
State =:= SacState0 ->
323+
{SacState1, {ok, true}, []}
324+
end),
325+
meck:expect(rabbit_stream_sac_coordinator, ensure_monitors,
326+
fun(Cmd, State, Monitors, Effects) when Cmd =:= SacCmd,
327+
State =:= SacState1 ->
328+
{State, Monitors#{ConnectionPid => sac}, Effects}
329+
end),
330+
331+
State0 = #?STATE{single_active_consumer = SacState0,
332+
monitors = #{}},
333+
334+
{State1, {ok, true}, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 7}),
335+
{sac, SacCmd}, State0),
336+
337+
?assertEqual(#{}, State1#?STATE.monitors),
338+
?assertEqual(SacState1, State1#?STATE.single_active_consumer),
339+
ok.
340+
341+
sac_pre_v7_down_handler_should_use_monitors_map(_) ->
342+
ConnectionPid = spawn(fun() -> ok end),
343+
SacState0 = fake_sac_state,
344+
SacState1 = updated_sac_state,
345+
meck:expect(rabbit_stream_sac_coordinator, handle_connection_down,
346+
fun(_Meta, Pid, normal, State) when Pid =:= ConnectionPid,
347+
State =:= SacState0 ->
348+
{SacState1, []}
349+
end),
350+
351+
OtherPid = spawn(fun() -> ok end),
352+
Monitors0 = #{ConnectionPid => sac,
353+
OtherPid => {<<"other">>, member}},
354+
State0 = #?STATE{single_active_consumer = SacState0,
355+
monitors = Monitors0},
356+
357+
{State1, ok, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 6}),
358+
{down, ConnectionPid, normal}, State0),
359+
360+
?assert(meck:called(rabbit_stream_sac_coordinator, handle_connection_down,
361+
['_', ConnectionPid, normal, SacState0])),
362+
?assertEqual(SacState1, State1#?STATE.single_active_consumer),
363+
?assertEqual(#{OtherPid => {<<"other">>, member}}, State1#?STATE.monitors),
364+
ok.
365+
366+
sac_pre_v7_ensure_monitors_should_use_monitors_map(_) ->
367+
ConnectionPid = self(),
368+
SacCmd = fake_sac_cmd,
369+
SacState0 = fake_sac_state,
370+
SacState1 = updated_sac_state,
371+
meck:expect(rabbit_stream_sac_coordinator, apply,
372+
fun(Cmd, State) when Cmd =:= SacCmd,
373+
State =:= SacState0 ->
374+
{SacState1, {ok, true}, []}
375+
end),
376+
meck:expect(rabbit_stream_sac_coordinator, ensure_monitors,
377+
fun(Cmd, State, Monitors, Effects) when Cmd =:= SacCmd,
378+
State =:= SacState1 ->
379+
{State, Monitors#{ConnectionPid => sac}, Effects}
380+
end),
381+
382+
State0 = #?STATE{single_active_consumer = SacState0,
383+
monitors = #{}},
384+
385+
{State1, {ok, true}, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 6}),
386+
{sac, SacCmd}, State0),
387+
388+
?assertEqual(#{ConnectionPid => sac}, State1#?STATE.monitors),
389+
?assertEqual(SacState1, State1#?STATE.single_active_consumer),
390+
ok.
391+
256392
new_stream(_) ->
257393
[N1, N2, N3] = Nodes = [r@n1, r@n2, r@n3],
258394
StreamId = atom_to_list(?FUNCTION_NAME),

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -56,15 +56,6 @@ init_per_group(_Group, Config) ->
5656
end_per_group(_Group, _Config) ->
5757
ok.
5858

59-
init_per_testcase(_TestCase, Config) ->
60-
ok = meck:new(rabbit_feature_flags),
61-
meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end),
62-
Config.
63-
64-
end_per_testcase(_TestCase, _Config) ->
65-
meck:unload(),
66-
ok.
67-
6859
check_conf_test(_) ->
6960
K = disconnected_timeout,
7061
Def = 60_000,
@@ -1870,21 +1861,21 @@ state_enter_test(_) ->
18701861

18711862
assertEmpty(?MOD:state_enter(follower, #{})),
18721863

1873-
?assertEqual(mon_node_eff([N0, N1, N2]),
1864+
?assertEqual(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]),
18741865
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
18751866
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
18761867
Id2 => grp([csr(P2), csr(P2), csr(P2)])})),
18771868

1878-
?assertEqual(mon_node_eff([N0, N1]),
1869+
?assertEqual(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]),
18791870
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
18801871
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
18811872
Id2 => grp([csr(P0), csr(P1), csr(P1)])})),
18821873

1883-
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ [timer_eff(P1)]),
1874+
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]) ++ [timer_eff(P1)]),
18841875
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
18851876
Id2 => grp([csr(P0)])})),
18861877

1887-
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ timer_eff([P1, P2])),
1878+
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]) ++ timer_eff([P1, P2])),
18881879
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
18891880
Id1 => grp([csr(P0), csr(P2, {disconnected, waiting})]),
18901881
Id2 => grp([csr(P0), csr(P1, {disconnected, waiting})])})),
@@ -1898,6 +1889,12 @@ mon_node_eff(Nodes) when is_list(Nodes) ->
18981889
mon_node_eff(N) ->
18991890
{monitor, node, N}.
19001891

1892+
mon_proc_eff(Pids) when is_list(Pids) ->
1893+
lists:sort([mon_proc_eff(P) || P <- Pids]);
1894+
mon_proc_eff(Pid) ->
1895+
{monitor, process, Pid}.
1896+
1897+
19011898
timer_eff(Pids) when is_list(Pids) ->
19021899
lists:sort([timer_eff(Pid) || Pid <- Pids]);
19031900
timer_eff(Pid) ->

deps/rabbitmq_stream/docs/stream_coordinator.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -55,16 +55,16 @@ sequenceDiagram
5555
```mermaid
5656
flowchart TB
5757
A(monitor) --noconnection--> B(status = disconnected, set up timer)
58-
B -. timeout .-> C(status = forgotten)
58+
B -. timeout .-> C(status = presumed_down)
5959
B -. nodeup .-> D(reissue monitors, send msg to connections)
6060
D -. down .-> E(handle connection down)
6161
D -. connection response .-> F(evaluate impacted groups)
6262
```
6363

64-
* composite status for consumers: `{connected, active}`, `{disconnected,active}`, etc.
64+
* composite status for consumers: `{connected, active}`, `{disconnected, active}`, `{presumed_down, active}`, etc.
6565
* `disconnected` status can prevent rebalancing in a group, e.g. `{disconnected, active}` (it is impossible to tell the active consumer to step down)
66-
* consumers in `forgotten` status are ignored during rebalancing
67-
* it may be necessary to reconcile a group if a `{forgotten, active}` consumer comes back in a group ("evaluate impacted groups" box above).
66+
* consumers in `presumed_down` status are ignored during rebalancing (they are not eligible)
67+
* it may be necessary to reconcile a group if a `{presumed_down, active}` consumer comes back in a group ("evaluate impacted groups" box above).
6868
This is unlikely though.
6969

7070
### Stale Node Detection

0 commit comments

Comments
 (0)