Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 46 additions & 20 deletions deps/rabbit/src/rabbit_stream_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
-define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s
-define(V2_OR_MORE(Vsn), (Vsn >= 2)).
-define(V5_OR_MORE(Vsn), (Vsn >= 5)).
-define(V7_OR_MORE(Vsn), (Vsn >= 7)). %% SAC monitors no longer in monitors map
-define(SAC_V4, rabbit_stream_sac_coordinator_v4).
-define(SAC_CURRENT, rabbit_stream_sac_coordinator).

Expand Down Expand Up @@ -543,7 +544,7 @@ reachable_coord_members() ->
Nodes = rabbit_nodes:list_reachable(),
[{?MODULE, Node} || Node <- Nodes].

version() -> 6.
version() -> 7.

%% Returns this module for any argument, i.e. the same implementation module
%% is used whatever value is passed in.
%% NOTE(review): presumably the ra_machine callback that maps a machine
%% version to the module implementing it - confirm.
which_module(_) ->
?MODULE.
Expand Down Expand Up @@ -588,14 +589,21 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0,
{reply, Reply} ->
return(Meta, State0, Reply, [])
end;
apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0,
monitors = Monitors0} = State0) ->
apply(#{machine_version := Vsn} = Meta, {sac, SacCommand},
#?MODULE{single_active_consumer = SacState0,
monitors = Monitors0} = State0) ->
Mod = sac_module(Meta),
{SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0),
{SacState2, Monitors1, Effects1} =
Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0),
Monitors2 = case ?V7_OR_MORE(Vsn) of
true ->
Monitors0;
false ->
Monitors1
end,
return(Meta, State0#?MODULE{single_active_consumer = SacState2,
monitors = Monitors1}, Reply, Effects1);
monitors = Monitors2}, Reply, Effects1);
apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
#?MODULE{streams = Streams0,
monitors = Monitors0,
Expand All @@ -607,6 +615,18 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
_ ->
[]
end,
{SacState1, SacEffects} =
case Vsn >= 7 of
true ->
%% every down PID is handed to the SAC coordinator,
%% which ignores the PIDs it is not interested in
sac_handle_connection_down(Meta, SacState0,
Pid, Reason, Vsn);
false ->
{SacState0, []}
end,
Effects1 = Effects0 ++ SacEffects,
State1 = State#?MODULE{single_active_consumer = SacState1},
case maps:take(Pid, Monitors0) of
{{StreamId, listener}, Monitors} when Vsn < 2 ->
Listeners = case maps:take(StreamId, StateListeners0) of
Expand All @@ -620,8 +640,8 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
Listeners1#{StreamId => Pids}
end
end,
return(Meta, State#?MODULE{listeners = Listeners,
monitors = Monitors}, ok, Effects0);
return(Meta, State1#?MODULE{listeners = Listeners,
monitors = Monitors}, ok, Effects1);
{{PidStreams, listener}, Monitors} when ?V2_OR_MORE(Vsn) ->
Streams = maps:fold(
fun(StreamId, _, Acc) ->
Expand All @@ -638,31 +658,34 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd,
Acc
end
end, Streams0, PidStreams),
return(Meta, State#?MODULE{streams = Streams,
monitors = Monitors}, ok, Effects0);
return(Meta, State1#?MODULE{streams = Streams,
monitors = Monitors}, ok, Effects1);
{{StreamId, member}, Monitors1} ->
case Streams0 of
#{StreamId := Stream0} ->
Stream1 = update_stream(Meta, Cmd, Stream0),
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0),
{Stream, Effects} = evaluate_stream(Meta, Stream1, Effects1),
Streams = Streams0#{StreamId => Stream},
return(Meta, State#?MODULE{streams = Streams,
monitors = Monitors1}, ok,
return(Meta, State1#?MODULE{streams = Streams,
monitors = Monitors1}, ok,
Effects);
_ ->
%% stream not found, can happen if "late" downs are
%% received
return(Meta, State#?MODULE{streams = Streams0,
monitors = Monitors1}, ok, Effects0)
return(Meta, State1#?MODULE{streams = Streams0,
monitors = Monitors1}, ok, Effects1)
end;
{sac, Monitors1} ->
{SacState1, SacEffects} = sac_handle_connection_down(Meta, SacState0,
Pid, Reason, Vsn),
return(Meta, State#?MODULE{single_active_consumer = SacState1,
monitors = Monitors1},
ok, [Effects0 ++ SacEffects]);
{sac, Monitors1} when Vsn < 7 ->
#?MODULE{single_active_consumer = SacSt0} = State1,
{SacSt1, SacEfts} = sac_handle_connection_down(Meta, SacSt0,
Pid, Reason, Vsn),
return(Meta, State1#?MODULE{single_active_consumer = SacSt1,
monitors = Monitors1},
ok, Effects1 ++ SacEfts);
error ->
return(Meta, State, ok, Effects0)
return(Meta, State1, ok, Effects1);
_ ->
return(Meta, State1, ok, Effects1)
end;
apply(#{machine_version := MachineVersion} = Meta,
{register_listener, #{pid := Pid,
Expand Down Expand Up @@ -2304,6 +2327,9 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) ->
SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0),
Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport),
{State#?MODULE{single_active_consumer = Sac1}, []};
%% v6 -> v7: drop every `sac` entry from the monitors map. All other monitor
%% entries are kept as they are and no effects are emitted.
machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) ->
Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0),
{State#?MODULE{monitors = Monitors}, []};
machine_version(From, To, State) ->
?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.",
[From, To]),
Expand Down
3 changes: 2 additions & 1 deletion deps/rabbit/src/rabbit_stream_coordinator.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,8 @@
monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1
#{stream_id() => ok}, %% v2
monitor_role()} |
sac},
sac %% before v7
},
%% not used as of v2
listeners = #{} :: undefined | #{stream_id() =>
#{pid() := queue_ref()}},
Expand Down
6 changes: 4 additions & 2 deletions deps/rabbit/src/rabbit_stream_sac_coordinator.erl
Original file line number Diff line number Diff line change
Expand Up @@ -955,7 +955,8 @@ list_nodes(#?MODULE{groups = Groups}) ->

-spec state_enter(ra_server:ra_state(), state() | term()) ->
ra_machine:effects().
state_enter(leader, #?MODULE{groups = Groups} = State)
state_enter(leader, #?MODULE{groups = Groups,
pids_groups = PidsGroups} = State)
when ?IS_STATE_REC(State) ->
%% becoming leader, we re-issue monitors and timers for connections with
%% disconnected consumers
Expand All @@ -978,8 +979,9 @@ state_enter(leader, #?MODULE{groups = Groups} = State)
end, Acc, Cs)
end, {#{}, #{}}, Groups),
DisTimeout = disconnected_timeout(State),
%% monitor involved nodes
%% monitor connections and involved nodes
%% reset a timer for disconnected connections
[{monitor, process, P} || P <- lists:sort(maps:keys(PidsGroups))] ++
[{monitor, node, N} || N <- lists:sort(maps:keys(Nodes))] ++
[begin
Time = case ts() - Ts of
Expand Down
136 changes: 136 additions & 0 deletions deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ all_tests() ->
listeners,
machine_version_upgrade_to_2,
machine_version_upgrade_to_3,
machine_version_upgrade_to_7,
sac_v7_down_handler_should_not_use_monitors_map,
sac_v7_ensure_monitors_should_not_use_monitors_map,
sac_pre_v7_down_handler_should_use_monitors_map,
sac_pre_v7_ensure_monitors_should_use_monitors_map,
new_stream,
leader_down,
leader_down_scenario_1,
Expand Down Expand Up @@ -60,9 +65,23 @@ init_per_group(_Group, Config) ->
%% No group-level teardown: always returns ok.
end_per_group(_Group, _Config) ->
ok.

%% Per-test-case setup: test cases listed in needs_sac_mock/1 get
%% rabbit_stream_sac_coordinator replaced by a meck mock (no_link option);
%% every other test case runs without any mock. Config is returned unchanged.
init_per_testcase(TestCase, Config) ->
    case needs_sac_mock(TestCase) of
        true ->
            ok = meck:new(rabbit_stream_sac_coordinator, [no_link]);
        false ->
            ok
    end,
    Config.

%% Per-test-case teardown: unloads the rabbit_stream_sac_coordinator mock for
%% exactly the test cases that installed it in init_per_testcase/2.
end_per_testcase(TestCase, _Config) ->
    case needs_sac_mock(TestCase) of
        true ->
            meck:unload(rabbit_stream_sac_coordinator);
        false ->
            ok
    end,
    ok.

%% Single source of truth for the test cases that mock
%% rabbit_stream_sac_coordinator. Setup and teardown both consult it, so the
%% two can no longer drift apart (a drift would leak a mock into later test
%% cases or unload a module that was never mocked).
needs_sac_mock(sac_v7_down_handler_should_not_use_monitors_map) -> true;
needs_sac_mock(sac_v7_ensure_monitors_should_not_use_monitors_map) -> true;
needs_sac_mock(sac_pre_v7_down_handler_should_use_monitors_map) -> true;
needs_sac_mock(sac_pre_v7_ensure_monitors_should_use_monitors_map) -> true;
needs_sac_mock(_TestCase) -> false.

Expand Down Expand Up @@ -253,6 +272,123 @@ machine_version_to_3(From) ->
?assertEqual(Effects, []),
ok.

%% Upgrading from machine version 6 to 7 must remove every `sac` entry from
%% the monitors map, keep the other entries and emit no effects.
machine_version_upgrade_to_7(_) ->
    [SacPid1, MemberPid, SacPid2] =
        [spawn(fun() -> ok end) || _ <- lists:seq(1, 3)],
    StreamId = <<"stream">>,
    MemberEntry = {StreamId, member},
    Before = #?STATE{monitors = #{SacPid1 => sac,
                                  MemberPid => MemberEntry,
                                  SacPid2 => sac}},

    UpgradeCmd = {machine_version, 6, 7},
    {After, ok, Effects} = apply_cmd(#{index => 42}, UpgradeCmd, Before),

    %% only the stream member monitor survives the upgrade
    ?assertEqual(#{MemberPid => MemberEntry}, After#?STATE.monitors),
    ?assertEqual([], Effects),
    ok.

%% With machine version 7, a `down` for a PID that is absent from the
%% monitors map must still reach the (mocked) SAC coordinator, the SAC state
%% it returns must be stored, and the monitors map must come out untouched.
sac_v7_down_handler_should_not_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    ConnPid = spawn(fun() -> ok end),
    OldSac = fake_sac_state,
    NewSac = updated_sac_state,
    %% the mock only accepts the expected connection PID and SAC state
    meck:expect(SacMod, handle_connection_down,
                fun(_Meta, DownPid, normal, Sac)
                      when DownPid =:= ConnPid andalso Sac =:= OldSac ->
                        {NewSac, []}
                end),

    MemberPid = spawn(fun() -> ok end),
    Monitors = #{MemberPid => {<<"other">>, member}},
    Before = #?STATE{single_active_consumer = OldSac,
                     monitors = Monitors},

    Meta = meta(#{index => 42, machine_version => 7}),
    {After, ok, _Effects} = apply_cmd(Meta, {down, ConnPid, normal}, Before),

    ?assert(meck:called(SacMod, handle_connection_down,
                        ['_', ConnPid, normal, OldSac])),
    ?assertEqual(NewSac, After#?STATE.single_active_consumer),
    ?assertEqual(Monitors, After#?STATE.monitors),
    ok.

%% With machine version 7, the monitors map handed back by the (mocked) SAC
%% coordinator's ensure_monitors/4 must be discarded: the monitors map stays
%% empty while the new SAC state is stored and the SAC reply is returned.
sac_v7_ensure_monitors_should_not_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    ConnPid = self(),
    Cmd = fake_sac_cmd,
    OldSac = fake_sac_state,
    NewSac = updated_sac_state,
    meck:expect(SacMod, apply,
                fun(C, Sac) when C =:= Cmd andalso Sac =:= OldSac ->
                        {NewSac, {ok, true}, []}
                end),
    %% the mock adds a `sac` monitor entry for the connection
    meck:expect(SacMod, ensure_monitors,
                fun(C, Sac, Mons, Effs)
                      when C =:= Cmd andalso Sac =:= NewSac ->
                        {Sac, Mons#{ConnPid => sac}, Effs}
                end),

    Before = #?STATE{single_active_consumer = OldSac,
                     monitors = #{}},
    Meta = meta(#{index => 42, machine_version => 7}),
    {After, {ok, true}, _Effects} = apply_cmd(Meta, {sac, Cmd}, Before),

    ?assertEqual(#{}, After#?STATE.monitors),
    ?assertEqual(NewSac, After#?STATE.single_active_consumer),
    ok.

%% With machine version 6, a `down` for a PID registered as `sac` in the
%% monitors map must reach the (mocked) SAC coordinator, the SAC state it
%% returns must be stored, and the PID must be removed from the monitors map.
sac_pre_v7_down_handler_should_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    ConnPid = spawn(fun() -> ok end),
    OldSac = fake_sac_state,
    NewSac = updated_sac_state,
    %% the mock only accepts the expected connection PID and SAC state
    meck:expect(SacMod, handle_connection_down,
                fun(_Meta, DownPid, normal, Sac)
                      when DownPid =:= ConnPid andalso Sac =:= OldSac ->
                        {NewSac, []}
                end),

    MemberPid = spawn(fun() -> ok end),
    MemberEntry = {<<"other">>, member},
    Before = #?STATE{single_active_consumer = OldSac,
                     monitors = #{ConnPid => sac,
                                  MemberPid => MemberEntry}},

    Meta = meta(#{index => 42, machine_version => 6}),
    {After, ok, _Effects} = apply_cmd(Meta, {down, ConnPid, normal}, Before),

    ?assert(meck:called(SacMod, handle_connection_down,
                        ['_', ConnPid, normal, OldSac])),
    ?assertEqual(NewSac, After#?STATE.single_active_consumer),
    %% only the unrelated member monitor is left
    ?assertEqual(#{MemberPid => MemberEntry}, After#?STATE.monitors),
    ok.

%% With machine version 6, the monitors map handed back by the (mocked) SAC
%% coordinator's ensure_monitors/4 must be stored in the state, along with
%% the new SAC state; the SAC reply is returned.
sac_pre_v7_ensure_monitors_should_use_monitors_map(_) ->
    SacMod = rabbit_stream_sac_coordinator,
    ConnPid = self(),
    Cmd = fake_sac_cmd,
    OldSac = fake_sac_state,
    NewSac = updated_sac_state,
    meck:expect(SacMod, apply,
                fun(C, Sac) when C =:= Cmd andalso Sac =:= OldSac ->
                        {NewSac, {ok, true}, []}
                end),
    %% the mock adds a `sac` monitor entry for the connection
    meck:expect(SacMod, ensure_monitors,
                fun(C, Sac, Mons, Effs)
                      when C =:= Cmd andalso Sac =:= NewSac ->
                        {Sac, Mons#{ConnPid => sac}, Effs}
                end),

    Before = #?STATE{single_active_consumer = OldSac,
                     monitors = #{}},
    Meta = meta(#{index => 42, machine_version => 6}),
    {After, {ok, true}, _Effects} = apply_cmd(Meta, {sac, Cmd}, Before),

    ?assertEqual(#{ConnPid => sac}, After#?STATE.monitors),
    ?assertEqual(NewSac, After#?STATE.single_active_consumer),
    ok.

new_stream(_) ->
[N1, N2, N3] = Nodes = [r@n1, r@n2, r@n3],
StreamId = atom_to_list(?FUNCTION_NAME),
Expand Down
23 changes: 10 additions & 13 deletions deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,6 @@ init_per_group(_Group, Config) ->
%% No group-level teardown: always returns ok.
end_per_group(_Group, _Config) ->
ok.

%% Mocks rabbit_feature_flags for every test case so that is_enabled/1
%% returns true whatever its argument. Config is returned unchanged.
init_per_testcase(_TestCase, Config) ->
    FlagsMod = rabbit_feature_flags,
    ok = meck:new(FlagsMod),
    meck:expect(FlagsMod, is_enabled, fun(_Flag) -> true end),
    Config.

%% Unloads every module currently mocked with meck (meck:unload/0 takes no
%% module argument), then returns ok.
end_per_testcase(_TestCase, _Config) ->
meck:unload(),
ok.

check_conf_test(_) ->
K = disconnected_timeout,
Def = 60_000,
Expand Down Expand Up @@ -1870,21 +1861,21 @@ state_enter_test(_) ->

assertEmpty(?MOD:state_enter(follower, #{})),

?assertEqual(mon_node_eff([N0, N1, N2]),
?assertEqual(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]),
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
Id2 => grp([csr(P2), csr(P2), csr(P2)])})),

?assertEqual(mon_node_eff([N0, N1]),
?assertEqual(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]),
state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]),
Id1 => grp([csr(P1), csr(P1), csr(P1)]),
Id2 => grp([csr(P0), csr(P1), csr(P1)])})),

?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ [timer_eff(P1)]),
?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]) ++ [timer_eff(P1)]),
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
Id2 => grp([csr(P0)])})),

?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ timer_eff([P1, P2])),
?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]) ++ timer_eff([P1, P2])),
state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]),
Id1 => grp([csr(P0), csr(P2, {disconnected, waiting})]),
Id2 => grp([csr(P0), csr(P1, {disconnected, waiting})])})),
Expand All @@ -1898,6 +1889,12 @@ mon_node_eff(Nodes) when is_list(Nodes) ->
mon_node_eff(N) ->
{monitor, node, N}.

%% Builds the expected process-monitor effect(s).
%% Given a list, returns the sorted list of `{monitor, process, Pid}` tuples,
%% one per element; given any other term, returns the single tuple for it.
mon_proc_eff(Pids) when is_list(Pids) ->
    Effects = lists:map(fun mon_proc_eff/1, Pids),
    lists:sort(Effects);
mon_proc_eff(Pid) ->
    {monitor, process, Pid}.


timer_eff(Pids) when is_list(Pids) ->
lists:sort([timer_eff(Pid) || Pid <- Pids]);
timer_eff(Pid) ->
Expand Down
8 changes: 4 additions & 4 deletions deps/rabbitmq_stream/docs/stream_coordinator.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@ sequenceDiagram
```mermaid
flowchart TB
A(monitor) --noconnection--> B(status = disconnected, set up timer)
B -. timeout .-> C(status = forgotten)
B -. timeout .-> C(status = presumed_down)
B -. nodeup .-> D(reissue monitors, send msg to connections)
D -. down .-> E(handle connection down)
D -. connection response .-> F(evaluate impacted groups)
```

* composite status for consumers: `{connected, active}`, `{disconnected,active}`, etc.
* composite status for consumers: `{connected, active}`, `{disconnected, active}`, `{presumed_down, active}`, etc.
* `disconnected` status can prevent rebalancing in a group, e.g. `{disconnected, active}` (it is impossible to tell the active consumer to step down)
* consumers in `forgotten` status are ignored during rebalancing
* it may be necessary to reconcile a group if a `{forgotten, active}` consumer comes back in a group ("evaluate impacted groups" box above).
* consumers in `presumed_down` status are ignored during rebalancing (they are not eligible)
* it may be necessary to reconcile a group if a `{presumed_down, active}` consumer comes back in a group ("evaluate impacted groups" box above).
This is unlikely though.

### Stale Node Detection
Expand Down
Loading