Skip to content

Commit bd5a38e

Browse files
committed
Handle stream SAC monitors in SAC coordinator
The stream SAC coordinator already keeps track of its monitors, but it delegates part of the work to the stream coordinator (through the stream coordinator's monitors map). This commit lets the SAC coordinator handle its monitors itself, which decouples the two coordinators. It also fixes a bug whereby the SAC coordinator would correctly re-issue monitors but would not update the monitors map accordingly; as a result, the SAC coordinator would then not be notified of the corresponding down messages. References rabbitmq/rabbitmq-stream-dotnet-client#447
1 parent 2d28353 commit bd5a38e

4 files changed

Lines changed: 61 additions & 26 deletions

File tree

deps/rabbit/src/rabbit_stream_coordinator.erl

Lines changed: 45 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@
8888
-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
8989
-define(V2_OR_MORE(Vsn), (Vsn >= 2)).
9090
-define(V5_OR_MORE(Vsn), (Vsn >= 5)).
91+
-define(V7_OR_MORE(Vsn), (Vsn >= 7)). %% SAC monitors no longer in monitors map
9192
-define(SAC_V4, rabbit_stream_sac_coordinator_v4).
9293
-define(SAC_CURRENT, rabbit_stream_sac_coordinator).
9394

@@ -543,7 +544,7 @@ reachable_coord_members() ->
543544
Nodes = rabbit_nodes:list_reachable(),
544545
[{?MODULE, Node} || Node <- Nodes].
545546

546-
version() -> 6.
547+
version() -> 7.
547548

548549
which_module(_) ->
549550
?MODULE.
@@ -588,14 +589,21 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
588589
{reply, Reply} ->
589590
return(Meta, State0, Reply, [])
590591
end;
591-
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
592-
monitors = Monitors0} = State0) ->
592+
apply(#{machine_version := Vsn} = Meta, {sac, SacCommand},
593+
#?MODULE{single_active_consumer = SacState0,
594+
monitors = Monitors0} = State0) ->
593595
Mod = sac_module(Meta),
594596
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
595597
{SacState2, Monitors1, Effects1} =
596598
Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
599+
Monitors2 = case ?V7_OR_MORE(Vsn) of
600+
true ->
601+
Monitors0;
602+
false ->
603+
Monitors1
604+
end,
597605
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
598-
monitors = Monitors1}, Reply, Effects1);
606+
monitors = Monitors2}, Reply, Effects1);
599607
apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
600608
#?MODULE{streams = Streams0,
601609
monitors = Monitors0,
@@ -607,6 +615,18 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
607615
_ ->
608616
[]
609617
end,
618+
{SacState1, SacEffects} =
619+
case Vsn >= 7 of
620+
true ->
621+
%% all down PIDs are submitted to SAC
622+
%% it filters out if not interested
623+
sac_handle_connection_down(Meta, SacState0,
624+
Pid, Reason, Vsn);
625+
false ->
626+
{SacState0, []}
627+
end,
628+
Effects1 = Effects0 ++ SacEffects,
629+
State1 = State#?MODULE{single_active_consumer = SacState1},
610630
case maps:take(Pid, Monitors0) of
611631
{{StreamId, listener}, Monitors} when Vsn < 2 ->
612632
Listeners = case maps:take(StreamId, StateListeners0) of
@@ -620,8 +640,8 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
620640
Listeners1#{StreamId => Pids}
621641
end
622642
end,
623-
return(Meta, State#?MODULE{listeners = Listeners,
624-
monitors = Monitors}, ok, Effects0);
643+
return(Meta, State1#?MODULE{listeners = Listeners,
644+
monitors = Monitors}, ok, Effects1);
625645
{{PidStreams, listener}, Monitors} when ?V2_OR_MORE(Vsn) ->
626646
Streams = maps:fold(
627647
fun(StreamId, _, Acc) ->
@@ -638,31 +658,34 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
638658
Acc
639659
end
640660
end, Streams0, PidStreams),
641-
return(Meta, State#?MODULE{streams = Streams,
642-
monitors = Monitors}, ok, Effects0);
661+
return(Meta, State1#?MODULE{streams = Streams,
662+
monitors = Monitors}, ok, Effects1);
643663
{{StreamId, member}, Monitors1} ->
644664
case Streams0 of
645665
#{StreamId := Stream0} ->
646666
Stream1 = update_stream(Meta, Cmd, Stream0),
647-
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0),
667+
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects1),
648668
Streams = Streams0#{StreamId => Stream},
649-
return(Meta, State#?MODULE{streams = Streams,
650-
monitors = Monitors1}, ok,
669+
return(Meta, State1#?MODULE{streams = Streams,
670+
monitors = Monitors1}, ok,
651671
Effects);
652672
_ ->
653673
%% stream not found, can happen if "late" downs are
654674
%% received
655-
return(Meta, State#?MODULE{streams = Streams0,
656-
monitors = Monitors1}, ok, Effects0)
675+
return(Meta, State1#?MODULE{streams = Streams0,
676+
monitors = Monitors1}, ok, Effects1)
657677
end;
658-
{sac, Monitors1} ->
659-
{SacState1, SacEffects} = sac_handle_connection_down(Meta, SacState0,
660-
Pid, Reason, Vsn),
661-
return(Meta, State#?MODULE{single_active_consumer = SacState1,
678+
{sac, Monitors1} when Vsn < 7 ->
679+
#?MODULE{single_active_consumer = SacSt0} = State1,
680+
{SacSt1, SacEfts} = sac_handle_connection_down(Meta, SacSt0,
681+
Pid, Reason, Vsn),
682+
return(Meta, State#?MODULE{single_active_consumer = SacSt1,
662683
monitors = Monitors1},
663-
ok, [Effects0 ++ SacEffects]);
684+
ok, [Effects1 ++ SacEfts]);
664685
error ->
665-
return(Meta, State, ok, Effects0)
686+
return(Meta, State1, ok, Effects1);
687+
_ ->
688+
return(Meta, State1, ok, Effects1)
666689
end;
667690
apply(#{machine_version := MachineVersion} = Meta,
668691
{register_listener, #{pid := Pid,
@@ -2304,6 +2327,9 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
23042327
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
23052328
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
23062329
{State#?MODULE{single_active_consumer = Sac1}, []};
2330+
machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) ->
2331+
Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0),
2332+
{State#?MODULE{monitors = Monitors}, []};
23072333
machine_version(From, To, State) ->
23082334
?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
23092335
[From, To]),

deps/rabbit/src/rabbit_stream_coordinator.hrl

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@
6363
monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1
6464
#{stream_id() => ok}, %% v2
6565
monitor_role()} |
66-
sac},
66+
sac %% before v7
67+
},
6768
%% not used as of v2
6869
listeners = #{} :: undefined | #{stream_id() =>
6970
#{pid() := queue_ref()}},

deps/rabbit/src/rabbit_stream_sac_coordinator.erl

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -955,7 +955,8 @@ list_nodes(#?MODULE{groups = Groups}) ->
955955

956956
-spec state_enter(ra_server:ra_state(), state() | term()) ->
957957
ra_machine:effects().
958-
state_enter(leader, #?MODULE{groups = Groups} = State)
958+
state_enter(leader, #?MODULE{groups = Groups,
959+
pids_groups = PidsGroups} = State)
959960
when ?IS_STATE_REC(State) ->
960961
%% becoming leader, we re-issue monitors and timers for connections with
961962
%% disconnected consumers
@@ -978,8 +979,9 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
978979
end, Acc, Cs)
979980
end, {#{}, #{}}, Groups),
980981
DisTimeout = disconnected_timeout(State),
981-
%% monitor involved nodes
982+
%% monitor connections and involved nodes
982983
%% reset a timer for disconnected connections
984+
[{monitor, process, P} || P <- lists:sort(maps:keys(PidsGroups))] ++
983985
[{monitor, node, N} || N <- lists:sort(maps:keys(Nodes))] ++
984986
[begin
985987
Time = case ts() - Ts of

deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1870,21 +1870,21 @@ state_enter_test(_) ->
18701870

18711871
assertEmpty(?MOD:state_enter(follower, #{})),
18721872

1873-
?assertEqual(mon_node_eff([N0, N1, N2]),
1873+
?assertEqual(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]),
18741874
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
18751875
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
18761876
Id2 => grp([csr(P2), csr(P2), csr(P2)])})),
18771877

1878-
?assertEqual(mon_node_eff([N0, N1]),
1878+
?assertEqual(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]),
18791879
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
18801880
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
18811881
Id2 => grp([csr(P0), csr(P1), csr(P1)])})),
18821882

1883-
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ [timer_eff(P1)]),
1883+
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]) ++ [timer_eff(P1)]),
18841884
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
18851885
Id2 => grp([csr(P0)])})),
18861886

1887-
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ timer_eff([P1, P2])),
1887+
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]) ++ timer_eff([P1, P2])),
18881888
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
18891889
Id1 => grp([csr(P0), csr(P2, {disconnected, waiting})]),
18901890
Id2 => grp([csr(P0), csr(P1, {disconnected, waiting})])})),
@@ -1898,6 +1898,12 @@ mon_node_eff(Nodes) when is_list(Nodes) ->
18981898
mon_node_eff(N) ->
18991899
{monitor, node, N}.
19001900

1901+
mon_proc_eff(Pids) when is_list(Pids) ->
1902+
lists:sort([mon_proc_eff(P) || P <- Pids]);
1903+
mon_proc_eff(Pid) ->
1904+
{monitor, process, Pid}.
1905+
1906+
19011907
timer_eff(Pids) when is_list(Pids) ->
19021908
lists:sort([timer_eff(Pid) || Pid <- Pids]);
19031909
timer_eff(Pid) ->

0 commit comments

Comments
 (0)