diff --git a/deps/rabbit/src/rabbit_stream_coordinator.erl b/deps/rabbit/src/rabbit_stream_coordinator.erl index ef8c7525a9eb..713944042787 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_coordinator.erl @@ -88,6 +88,7 @@ -define(REPLICA_FRESHNESS_LIMIT_MS, 10 * 1000). %% 10s -define(V2_OR_MORE(Vsn), (Vsn >= 2)). -define(V5_OR_MORE(Vsn), (Vsn >= 5)). +-define(V7_OR_MORE(Vsn), (Vsn >= 7)). %% SAC monitors no longer in monitors map -define(SAC_V4, rabbit_stream_sac_coordinator_v4). -define(SAC_CURRENT, rabbit_stream_sac_coordinator). @@ -543,7 +544,7 @@ reachable_coord_members() -> Nodes = rabbit_nodes:list_reachable(), [{?MODULE, Node} || Node <- Nodes]. -version() -> 6. +version() -> 7. which_module(_) -> ?MODULE. @@ -588,14 +589,21 @@ apply(#{index := _Idx, machine_version := MachineVersion} = Meta0, {reply, Reply} -> return(Meta, State0, Reply, []) end; -apply(Meta, {sac, SacCommand}, #?MODULE{single_active_consumer = SacState0, - monitors = Monitors0} = State0) -> +apply(#{machine_version := Vsn} = Meta, {sac, SacCommand}, + #?MODULE{single_active_consumer = SacState0, + monitors = Monitors0} = State0) -> Mod = sac_module(Meta), {SacState1, Reply, Effects0} = Mod:apply(SacCommand, SacState0), {SacState2, Monitors1, Effects1} = Mod:ensure_monitors(SacCommand, SacState1, Monitors0, Effects0), + Monitors2 = case ?V7_OR_MORE(Vsn) of + true -> + Monitors0; + false -> + Monitors1 + end, return(Meta, State0#?MODULE{single_active_consumer = SacState2, - monitors = Monitors1}, Reply, Effects1); + monitors = Monitors2}, Reply, Effects1); apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd, #?MODULE{streams = Streams0, monitors = Monitors0, @@ -607,6 +615,18 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd, _ -> [] end, + {SacState1, SacEffects} = + case Vsn >= 7 of + true -> + %% all down PIDs are submitted to SAC + %% it filters out if not interested + sac_handle_connection_down(Meta, SacState0, + Pid, Reason, Vsn); + false -> + {SacState0, []} + end, + Effects1 = Effects0 ++ SacEffects, + State1 = State#?MODULE{single_active_consumer = SacState1}, case maps:take(Pid, Monitors0) of {{StreamId, listener}, Monitors} when Vsn < 2 -> Listeners = case maps:take(StreamId, StateListeners0) of @@ -620,8 +640,8 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd, Listeners1#{StreamId => Pids} end end, - return(Meta, State#?MODULE{listeners = Listeners, - monitors = Monitors}, ok, Effects0); + return(Meta, State1#?MODULE{listeners = Listeners, + monitors = Monitors}, ok, Effects1); {{PidStreams, listener}, Monitors} when ?V2_OR_MORE(Vsn) -> Streams = maps:fold( fun(StreamId, _, Acc) -> @@ -638,31 +658,34 @@ apply(#{machine_version := Vsn} = Meta, {down, Pid, Reason} = Cmd, Acc end end, Streams0, PidStreams), - return(Meta, State#?MODULE{streams = Streams, - monitors = Monitors}, ok, Effects0); + return(Meta, State1#?MODULE{streams = Streams, + monitors = Monitors}, ok, Effects1); {{StreamId, member}, Monitors1} -> case Streams0 of #{StreamId := Stream0} -> Stream1 = update_stream(Meta, Cmd, Stream0), - {Stream, Effects} = evaluate_stream(Meta, Stream1, Effects0), + {Stream, Effects} = evaluate_stream(Meta, Stream1, Effects1), Streams = Streams0#{StreamId => Stream}, - return(Meta, State#?MODULE{streams = Streams, - monitors = Monitors1}, ok, + return(Meta, State1#?MODULE{streams = Streams, + monitors = Monitors1}, ok, Effects); _ -> %% stream not found, can happen if "late" downs are %% received - return(Meta, State#?MODULE{streams = Streams0, - monitors = Monitors1}, ok, Effects0) + return(Meta, State1#?MODULE{streams = Streams0, + monitors = Monitors1}, ok, Effects1) end; - {sac, Monitors1} -> - {SacState1, SacEffects} = sac_handle_connection_down(Meta, SacState0, - Pid, Reason, Vsn), - return(Meta, State#?MODULE{single_active_consumer = SacState1, - monitors = Monitors1}, - ok, [Effects0 ++ SacEffects]); + {sac, Monitors1} when Vsn < 7 -> + #?MODULE{single_active_consumer = SacSt0} = State1, + {SacSt1, SacEfts} = sac_handle_connection_down(Meta, SacSt0, + Pid, Reason, Vsn), + return(Meta, State1#?MODULE{single_active_consumer = SacSt1, + monitors = Monitors1}, + ok, Effects1 ++ SacEfts); error -> - return(Meta, State, ok, Effects0) + return(Meta, State1, ok, Effects1); + _ -> + return(Meta, State1, ok, Effects1) end; apply(#{machine_version := MachineVersion} = Meta, {register_listener, #{pid := Pid, @@ -2304,6 +2327,9 @@ machine_version(4 = From, 5, #?MODULE{single_active_consumer = Sac0} = State) -> SacExport = rabbit_stream_sac_coordinator_v4:state_to_map(Sac0), Sac1 = rabbit_stream_sac_coordinator:import_state(From, SacExport), {State#?MODULE{single_active_consumer = Sac1}, []}; +machine_version(6, 7, #?MODULE{monitors = Monitors0} = State) -> + Monitors = maps:filter(fun(_Key, Value) -> Value =/= sac end, Monitors0), + {State#?MODULE{monitors = Monitors}, []}; machine_version(From, To, State) -> ?LOG_INFO("Stream coordinator machine version changes from ~tp to ~tp, no state changes required.", [From, To]), diff --git a/deps/rabbit/src/rabbit_stream_coordinator.hrl b/deps/rabbit/src/rabbit_stream_coordinator.hrl index 3603be485835..df9bbe0bf3ac 100644 --- a/deps/rabbit/src/rabbit_stream_coordinator.hrl +++ b/deps/rabbit/src/rabbit_stream_coordinator.hrl @@ -63,7 +63,8 @@ monitors = #{} :: #{pid() => {stream_id() | %% v0 & v1 #{stream_id() => ok}, %% v2 monitor_role()} | - sac}, + sac %% before v7 + }, %% not used as of v2 listeners = #{} :: undefined | #{stream_id() => #{pid() := queue_ref()}}, diff --git a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl index b9c39e77bd52..967982bd650e 100644 --- a/deps/rabbit/src/rabbit_stream_sac_coordinator.erl +++ b/deps/rabbit/src/rabbit_stream_sac_coordinator.erl @@ -955,7 +955,8 @@ list_nodes(#?MODULE{groups = Groups}) -> -spec state_enter(ra_server:ra_state(), state() | term()) -> ra_machine:effects(). -state_enter(leader, #?MODULE{groups = Groups} = State) +state_enter(leader, #?MODULE{groups = Groups, + pids_groups = PidsGroups} = State) when ?IS_STATE_REC(State) -> %% becoming leader, we re-issue monitors and timers for connections with %% disconnected consumers @@ -978,8 +979,9 @@ state_enter(leader, #?MODULE{groups = Groups} = State) end, Acc, Cs) end, {#{}, #{}}, Groups), DisTimeout = disconnected_timeout(State), - %% monitor involved nodes + %% monitor connections and involved nodes %% reset a timer for disconnected connections + [{monitor, process, P} || P <- lists:sort(maps:keys(PidsGroups))] ++ [{monitor, node, N} || N <- lists:sort(maps:keys(Nodes))] ++ [begin Time = case ts() - Ts of diff --git a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl index 4c680971404f..3fb94dabfa6a 100644 --- a/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_coordinator_SUITE.erl @@ -27,6 +27,11 @@ all_tests() -> listeners, machine_version_upgrade_to_2, machine_version_upgrade_to_3, + machine_version_upgrade_to_7, + sac_v7_down_handler_should_not_use_monitors_map, + sac_v7_ensure_monitors_should_not_use_monitors_map, + sac_pre_v7_down_handler_should_use_monitors_map, + sac_pre_v7_ensure_monitors_should_use_monitors_map, new_stream, leader_down, leader_down_scenario_1, @@ -60,9 +65,23 @@ init_per_group(_Group, Config) -> end_per_group(_Group, _Config) -> ok. +init_per_testcase(TestCase, Config) + when TestCase =:= sac_v7_down_handler_should_not_use_monitors_map; + TestCase =:= sac_v7_ensure_monitors_should_not_use_monitors_map; + TestCase =:= sac_pre_v7_down_handler_should_use_monitors_map; + TestCase =:= sac_pre_v7_ensure_monitors_should_use_monitors_map -> + ok = meck:new(rabbit_stream_sac_coordinator, [no_link]), + Config; init_per_testcase(_TestCase, Config) -> Config. +end_per_testcase(TestCase, _Config) + when TestCase =:= sac_v7_down_handler_should_not_use_monitors_map; + TestCase =:= sac_v7_ensure_monitors_should_not_use_monitors_map; + TestCase =:= sac_pre_v7_down_handler_should_use_monitors_map; + TestCase =:= sac_pre_v7_ensure_monitors_should_use_monitors_map -> + meck:unload(rabbit_stream_sac_coordinator), + ok; end_per_testcase(_TestCase, _Config) -> ok. @@ -253,6 +272,123 @@ machine_version_to_3(From) -> ?assertEqual(Effects, []), ok. +machine_version_upgrade_to_7(_) -> + Pid1 = spawn(fun() -> ok end), + Pid2 = spawn(fun() -> ok end), + Pid3 = spawn(fun() -> ok end), + S = <<"stream">>, + Monitors0 = #{Pid1 => sac, + Pid2 => {S, member}, + Pid3 => sac}, + State0 = #?STATE{monitors = Monitors0}, + + {State1, ok, Effects} = apply_cmd(#{index => 42}, {machine_version, 6, 7}, State0), + + ?assertEqual(#{Pid2 => {S, member}}, State1#?STATE.monitors), + ?assertEqual([], Effects), + ok. + +sac_v7_down_handler_should_not_use_monitors_map(_) -> + ConnectionPid = spawn(fun() -> ok end), + SacState0 = fake_sac_state, + SacState1 = updated_sac_state, + meck:expect(rabbit_stream_sac_coordinator, handle_connection_down, + fun(_Meta, Pid, normal, State) when Pid =:= ConnectionPid, + State =:= SacState0 -> + {SacState1, []} + end), + + OtherPid = spawn(fun() -> ok end), + Monitors0 = #{OtherPid => {<<"other">>, member}}, + State0 = #?STATE{single_active_consumer = SacState0, + monitors = Monitors0}, + + {State1, ok, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 7}), + {down, ConnectionPid, normal}, State0), + + ?assert(meck:called(rabbit_stream_sac_coordinator, handle_connection_down, + ['_', ConnectionPid, normal, SacState0])), + ?assertEqual(SacState1, State1#?STATE.single_active_consumer), + ?assertEqual(Monitors0, State1#?STATE.monitors), + ok. + +sac_v7_ensure_monitors_should_not_use_monitors_map(_) -> + ConnectionPid = self(), + SacCmd = fake_sac_cmd, + SacState0 = fake_sac_state, + SacState1 = updated_sac_state, + meck:expect(rabbit_stream_sac_coordinator, apply, + fun(Cmd, State) when Cmd =:= SacCmd, + State =:= SacState0 -> + {SacState1, {ok, true}, []} + end), + meck:expect(rabbit_stream_sac_coordinator, ensure_monitors, + fun(Cmd, State, Monitors, Effects) when Cmd =:= SacCmd, + State =:= SacState1 -> + {State, Monitors#{ConnectionPid => sac}, Effects} + end), + + State0 = #?STATE{single_active_consumer = SacState0, + monitors = #{}}, + + {State1, {ok, true}, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 7}), + {sac, SacCmd}, State0), + + ?assertEqual(#{}, State1#?STATE.monitors), + ?assertEqual(SacState1, State1#?STATE.single_active_consumer), + ok. + +sac_pre_v7_down_handler_should_use_monitors_map(_) -> + ConnectionPid = spawn(fun() -> ok end), + SacState0 = fake_sac_state, + SacState1 = updated_sac_state, + meck:expect(rabbit_stream_sac_coordinator, handle_connection_down, + fun(_Meta, Pid, normal, State) when Pid =:= ConnectionPid, + State =:= SacState0 -> + {SacState1, []} + end), + + OtherPid = spawn(fun() -> ok end), + Monitors0 = #{ConnectionPid => sac, + OtherPid => {<<"other">>, member}}, + State0 = #?STATE{single_active_consumer = SacState0, + monitors = Monitors0}, + + {State1, ok, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 6}), + {down, ConnectionPid, normal}, State0), + + ?assert(meck:called(rabbit_stream_sac_coordinator, handle_connection_down, + ['_', ConnectionPid, normal, SacState0])), + ?assertEqual(SacState1, State1#?STATE.single_active_consumer), + ?assertEqual(#{OtherPid => {<<"other">>, member}}, State1#?STATE.monitors), + ok. + +sac_pre_v7_ensure_monitors_should_use_monitors_map(_) -> + ConnectionPid = self(), + SacCmd = fake_sac_cmd, + SacState0 = fake_sac_state, + SacState1 = updated_sac_state, + meck:expect(rabbit_stream_sac_coordinator, apply, + fun(Cmd, State) when Cmd =:= SacCmd, + State =:= SacState0 -> + {SacState1, {ok, true}, []} + end), + meck:expect(rabbit_stream_sac_coordinator, ensure_monitors, + fun(Cmd, State, Monitors, Effects) when Cmd =:= SacCmd, + State =:= SacState1 -> + {State, Monitors#{ConnectionPid => sac}, Effects} + end), + + State0 = #?STATE{single_active_consumer = SacState0, + monitors = #{}}, + + {State1, {ok, true}, _Effects} = apply_cmd(meta(#{index => 42, machine_version => 6}), + {sac, SacCmd}, State0), + + ?assertEqual(#{ConnectionPid => sac}, State1#?STATE.monitors), + ?assertEqual(SacState1, State1#?STATE.single_active_consumer), + ok. + new_stream(_) -> [N1, N2, N3] = Nodes = [r@n1, r@n2, r@n3], StreamId = atom_to_list(?FUNCTION_NAME), diff --git a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl index 2a1f72954a02..c8232c58d1e5 100644 --- a/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl +++ b/deps/rabbit/test/rabbit_stream_sac_coordinator_SUITE.erl @@ -56,15 +56,6 @@ init_per_group(_Group, Config) -> end_per_group(_Group, _Config) -> ok. -init_per_testcase(_TestCase, Config) -> - ok = meck:new(rabbit_feature_flags), - meck:expect(rabbit_feature_flags, is_enabled, fun (_) -> true end), - Config. - -end_per_testcase(_TestCase, _Config) -> - meck:unload(), - ok. - check_conf_test(_) -> K = disconnected_timeout, Def = 60_000, @@ -1870,21 +1861,21 @@ state_enter_test(_) -> assertEmpty(?MOD:state_enter(follower, #{})), - ?assertEqual(mon_node_eff([N0, N1, N2]), + ?assertEqual(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]), state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]), Id1 => grp([csr(P1), csr(P1), csr(P1)]), Id2 => grp([csr(P2), csr(P2), csr(P2)])})), - ?assertEqual(mon_node_eff([N0, N1]), + ?assertEqual(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]), state_enter_leader(#{Id0 => grp([csr(P0), csr(P0), csr(P0)]), Id1 => grp([csr(P1), csr(P1), csr(P1)]), Id2 => grp([csr(P0), csr(P1), csr(P1)])})), - ?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ [timer_eff(P1)]), + ?assertEqual(lists:sort(mon_node_eff([N0, N1]) ++ mon_proc_eff([P0, P1]) ++ [timer_eff(P1)]), state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]), Id2 => grp([csr(P0)])})), - ?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ timer_eff([P1, P2])), + ?assertEqual(lists:sort(mon_node_eff([N0, N1, N2]) ++ mon_proc_eff([P0, P1, P2]) ++ timer_eff([P1, P2])), state_enter_leader(#{Id0 => grp([csr(P0), csr(P1, {disconnected, waiting})]), Id1 => grp([csr(P0), csr(P2, {disconnected, waiting})]), Id2 => grp([csr(P0), csr(P1, {disconnected, waiting})])})), @@ -1898,6 +1889,12 @@ mon_node_eff(Nodes) when is_list(Nodes) -> mon_node_eff(N) -> {monitor, node, N}. +mon_proc_eff(Pids) when is_list(Pids) -> + lists:sort([mon_proc_eff(P) || P <- Pids]); +mon_proc_eff(Pid) -> + {monitor, process, Pid}. + + timer_eff(Pids) when is_list(Pids) -> lists:sort([timer_eff(Pid) || Pid <- Pids]); timer_eff(Pid) -> diff --git a/deps/rabbitmq_stream/docs/stream_coordinator.md b/deps/rabbitmq_stream/docs/stream_coordinator.md index 2904053d5760..c583f82c2e80 100644 --- a/deps/rabbitmq_stream/docs/stream_coordinator.md +++ b/deps/rabbitmq_stream/docs/stream_coordinator.md @@ -55,16 +55,16 @@ sequenceDiagram ```mermaid flowchart TB A(monitor) --noconnection--> B(status = disconnected, set up timer) - B -. timeout .-> C(status = forgotten) + B -. timeout .-> C(status = presumed_down) B -. nodeup .-> D(reissue monitors, send msg to connections) D -. down .-> E(handle connection down) D -. connection response .-> F(evaluate impacted groups) ``` -* composite status for consumers: `{connected, active}`, `{disconnected,active}`, etc. +* composite status for consumers: `{connected, active}`, `{disconnected, active}`, `{presumed_down, active}`, etc. * `disconnected` status can prevent rebalancing in a group, e.g. `{disconnected, active}` (it is impossible to tell the active consumer to step down) -* consumers in `forgotten` status are ignored during rebalancing -* it may be necessary to reconcile a group if a `{forgotten, active}` consumer comes back in a group ("evaluate impacted groups" box above). +* consumers in `presumed_down` status are ignored during rebalancing (they are not eligible) +* it may be necessary to reconcile a group if a `{presumed_down, active}` consumer comes back in a group ("evaluate impacted groups" box above). This is unlikely though. ### Stale Node Detection